Coverage for integrations/social/hierarchy_service.py: 93.9%
179 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial - 3-Tier Hierarchy Service
4Manages registration of regional/local nodes, region assignment,
5tier-aware gossip targets, and capacity reporting.
7Tiers: central -> regional -> local (flat = backward-compatible)
8"""
9import logging
10from datetime import datetime
11from typing import Optional, Dict, List
13from sqlalchemy.orm import Session
14from sqlalchemy import func
16logger = logging.getLogger('hevolve_social')
19class HierarchyService:
20 """Static-only service for 3-tier hierarchy operations."""
22 # ─── Registration (central-only) ───
@staticmethod
def register_regional_host(
    db: Session,
    node_id: str,
    public_key_hex: str,
    region_name: str,
    compute_info: dict,
    certificate: dict,
) -> Dict:
    """Register (or refresh) a regional host and the region it serves.

    The certificate is checked first: nothing is written unless it passes
    ``verify_certificate_chain`` and declares the ``regional`` tier. After
    that, the PeerNode row for ``node_id`` and the Region row named
    ``region_name`` are each created if missing and then updated in place.

    Args:
        db: Session used for every lookup and write. Changes are flushed
            here but not committed.
        node_id: Identifier of the regional host.
        public_key_hex: Stored on the peer as ``public_key``.
        region_name: Lookup key for the Region row; also used to derive
            ``display_name`` when the region is created.
        compute_info: Keys read with ``.get``: ``cpu_cores``, ``ram_gb``,
            ``gpu_count``, ``max_users``, ``dns_region``, ``url``; plus
            ``name`` and ``version`` when the peer is first created.
        certificate: Passed to ``verify_certificate_chain``; its ``tier``
            entry must equal ``'regional'``.

    Returns:
        ``{'registered': False, 'error': <str>}`` when the certificate is
        rejected, otherwise a dict with ``registered`` (True), ``node_id``,
        ``region_id`` and ``region_name``.
    """
    from .models import PeerNode, Region
    from security.key_delegation import verify_certificate_chain

    # Guard clauses: bail out before any database access on a bad certificate.
    verdict = verify_certificate_chain(certificate)
    if not verdict['valid']:
        return {
            'registered': False,
            'error': f'Invalid certificate: {verdict["details"]}',
        }
    cert_tier = certificate.get('tier')
    if cert_tier != 'regional':
        return {
            'registered': False,
            'error': 'Certificate tier must be regional',
        }

    # Upsert the peer. url/name/version are only taken on first creation.
    host = db.query(PeerNode).filter_by(node_id=node_id).first()
    if not host:
        creation_kwargs = {
            'node_id': node_id,
            'url': compute_info.get('url', ''),
            'name': compute_info.get('name', f'regional-{node_id[:8]}'),
            'version': compute_info.get('version', ''),
            'status': 'active',
        }
        host = PeerNode(**creation_kwargs)
        db.add(host)

    # These fields are overwritten on every registration, new peer or not.
    host_fields = {
        'tier': 'regional',
        'public_key': public_key_hex,
        'certificate_json': certificate,
        'certificate_verified': True,
        'compute_cpu_cores': compute_info.get('cpu_cores'),
        'compute_ram_gb': compute_info.get('ram_gb'),
        'compute_gpu_count': compute_info.get('gpu_count'),
        'max_user_capacity': compute_info.get('max_users', 0),
        'dns_region': compute_info.get('dns_region', ''),
    }
    for attr, value in host_fields.items():
        setattr(host, attr, value)
    db.flush()

    # Upsert the region keyed by name.
    area = db.query(Region).filter_by(name=region_name).first()
    if not area:
        area = Region(
            name=region_name,
            display_name=region_name.replace('-', ' ').title(),
            region_type='geographic',
        )
        db.add(area)
        # Flush right away so the new row exists before it is updated below.
        db.flush()

    region_fields = (
        ('host_node_id', node_id),
        ('capacity_cpu', compute_info.get('cpu_cores')),
        ('capacity_ram_gb', compute_info.get('ram_gb')),
        ('capacity_gpu', compute_info.get('gpu_count')),
        ('central_approved', True),
        ('is_accepting_nodes', True),
        ('global_server_url', compute_info.get('url', '')),
    )
    for attr, value in region_fields:
        setattr(area, attr, value)
    db.flush()

    logger.info(f"Regional host registered: {node_id[:8]} in {region_name}")
    return {
        'registered': True,
        'node_id': node_id,
        'region_id': area.id,
        'region_name': region_name,
    }
@staticmethod
def register_local_node(
    db: Session,
    node_id: str,
    public_key_hex: str,
    compute_info: dict,
    geo_info: dict = None,
) -> Dict:
    """Create or refresh a local node, then assign it to a region.

    The PeerNode row for ``node_id`` is created when absent. Its tier, key,
    compute figures and ``dns_region`` are overwritten on every call, and
    the result of ``HierarchyService.assign_to_region`` is returned with it.

    Args:
        db: Session used for the lookup and writes; flushed, not committed.
        node_id: Identifier of the local node.
        public_key_hex: Stored on the peer as ``public_key``.
        compute_info: Keys read with ``.get``: ``cpu_cores``, ``ram_gb``,
            ``gpu_count``; plus ``url``, ``name`` and ``version`` when the
            peer is first created. Also forwarded to the assignment step.
        geo_info: Optional dict; only ``dns_region`` is read here. A falsy
            value is treated as an empty dict.

    Returns:
        ``{'registered': True, 'node_id': ..., 'assignment': ...}`` where
        ``assignment`` is whatever ``assign_to_region`` returned (which may
        itself report ``'assigned': False``).
    """
    from .models import PeerNode

    # Normalise once so both the peer update and the assignment see a dict.
    geo = geo_info or {}

    node = db.query(PeerNode).filter_by(node_id=node_id).first()
    if not node:
        node = PeerNode(
            node_id=node_id,
            url=compute_info.get('url', ''),
            name=compute_info.get('name', f'local-{node_id[:8]}'),
            version=compute_info.get('version', ''),
            status='active',
        )
        db.add(node)

    refreshed = (
        ('tier', 'local'),
        ('public_key', public_key_hex),
        ('compute_cpu_cores', compute_info.get('cpu_cores')),
        ('compute_ram_gb', compute_info.get('ram_gb')),
        ('compute_gpu_count', compute_info.get('gpu_count')),
        ('dns_region', geo.get('dns_region', '')),
    )
    for attr, value in refreshed:
        setattr(node, attr, value)
    db.flush()

    # Pick the best regional host for this node.
    placement = HierarchyService.assign_to_region(
        db, node_id, compute_info, geo)

    return {
        'registered': True,
        'node_id': node_id,
        'assignment': placement,
    }
144 # ─── Region Assignment (central-only) ───
@staticmethod
def assign_to_region(
    db: Session,
    local_node_id: str,
    compute_info: dict,
    geo_info: dict,
) -> Dict:
    """Assign a local node to the highest-scoring active regional host.

    Candidates are PeerNode rows with ``tier == 'regional'`` and
    ``status == 'active'``. A candidate is skipped when a Region row hosted
    by it exists and has ``is_accepting_nodes`` false; a candidate with no
    Region row at all stays eligible.

    Score per candidate (maximum 1.0):
      * 0.4 * unused share of ``max_user_capacity``
      * 0.3 * remaining user slots / ``max_user_capacity``
      * 0.1 when both ``dns_region`` values are identical
      * 0.2 when the first ``-``-separated segment of both ``dns_region``
        values matches (e.g. ``us`` from ``us-east-1``)

    Both headroom terms are computed from ``active_user_count`` and
    ``max_user_capacity``; a host without a positive ``max_user_capacity``
    receives neither. Ties go to the candidate encountered first.

    Any assignment still marked active for ``local_node_id`` is revoked
    (and its regional host's ``active_user_count`` decremented) before the
    new one is created, so a repeated call cannot leave two active
    assignments or count the node twice.

    Args:
        db: Session used for all queries and writes; flushed, not committed.
        local_node_id: Node being assigned.
        compute_info: Stored verbatim as the assignment's ``compute_snapshot``.
        geo_info: Dict whose ``dns_region`` entry is used for scoring.
            ``None`` is treated as an empty dict.

    Returns:
        ``{'assigned': False, 'error': <str>}`` when no host is usable,
        otherwise a dict with ``assigned`` (True), ``regional_node_id``,
        ``regional_url``, ``region_id``, ``region_name`` and
        ``assignment_id``.
    """
    from .models import PeerNode, Region, RegionAssignment

    # Find all active regional hosts.
    regionals = db.query(PeerNode).filter(
        PeerNode.tier == 'regional',
        PeerNode.status == 'active',
    ).all()

    if not regionals:
        return {'assigned': False, 'error': 'No regional hosts available'}

    best_score = -1
    best_regional = None
    best_region = None
    node_dns = (geo_info or {}).get('dns_region', '') or ''
    # Loop-invariant; split() returns the whole string when there is no '-'.
    node_prefix = node_dns.split('-')[0]

    for regional in regionals:
        region = db.query(Region).filter_by(
            host_node_id=regional.node_id).first()
        if region and not region.is_accepting_nodes:
            continue

        score = 0.0

        capacity = regional.max_user_capacity
        if capacity and capacity > 0:
            active = regional.active_user_count or 0
            # "Compute headroom" (0.4 weight) - derived from user load.
            used_pct = active / capacity
            score += (1.0 - min(used_pct, 1.0)) * 0.4
            # User headroom (0.3 weight) - negative when over capacity.
            remaining = capacity - active
            headroom = min(remaining / max(capacity, 1), 1.0)
            score += headroom * 0.3

        regional_dns = regional.dns_region
        if node_dns and regional_dns:
            # DNS match (0.1 weight)
            if node_dns == regional_dns:
                score += 0.1
            # Geo proximity (0.2 weight) - same leading dns_region segment
            if node_prefix == regional_dns.split('-')[0]:
                score += 0.2

        if score > best_score:
            best_score = score
            best_regional = regional
            best_region = region

    if not best_regional:
        return {'assigned': False, 'error': 'No suitable regional host found'}

    # Retire any earlier active assignment so the node is never counted on
    # two hosts. Done only after a replacement host is known, so a failed
    # selection leaves the existing assignment untouched.
    stale_assignments = db.query(RegionAssignment).filter_by(
        local_node_id=local_node_id, status='active').all()
    for stale in stale_assignments:
        stale.status = 'revoked'
        # Atomic, floor-at-zero decrement on the previous host.
        db.query(PeerNode).filter(
            PeerNode.node_id == stale.regional_node_id,
            PeerNode.active_user_count > 0,
        ).update(
            {PeerNode.active_user_count: PeerNode.active_user_count - 1}
        )

    # Create assignment
    # NOTE(review): datetime.utcnow() yields a naive timestamp and is
    # deprecated since Python 3.12; kept because the column's timezone
    # handling is not visible here.
    assignment = RegionAssignment(
        local_node_id=local_node_id,
        regional_node_id=best_regional.node_id,
        region_id=best_region.id if best_region else None,
        assigned_by='central_auto',
        status='active',
        approved_at=datetime.utcnow(),
        approved_by_central=True,
        compute_snapshot=compute_info,
    )
    db.add(assignment)
    # Flush explicitly so assignment.id is populated before it is copied
    # below, instead of depending on the next query to autoflush.
    db.flush()

    # Update peer's assignment
    peer = db.query(PeerNode).filter_by(node_id=local_node_id).first()
    if peer:
        peer.region_assignment_id = assignment.id
        peer.parent_node_id = best_regional.node_id

    # Atomic increment - prevents race when multiple nodes assigned concurrently
    db.query(PeerNode).filter_by(node_id=best_regional.node_id).update(
        {PeerNode.active_user_count: func.coalesce(PeerNode.active_user_count, 0) + 1}
    )
    db.flush()

    return {
        'assigned': True,
        'regional_node_id': best_regional.node_id,
        'regional_url': best_regional.url,
        'region_id': best_region.id if best_region else None,
        'region_name': best_region.name if best_region else None,
        'assignment_id': assignment.id,
    }
@staticmethod
def switch_region(
    db: Session,
    local_node_id: str,
    new_region_id: str,
    requester: str,
) -> Dict:
    """Move a local node to the regional host of another region.

    The target region must exist, be accepting nodes, and have a PeerNode
    row for its ``host_node_id``. Every assignment still marked active for
    the node is revoked and the corresponding host's ``active_user_count``
    is decremented (never below zero). A new active assignment is then
    created and the new host's count is incremented.

    Counter changes are issued as SQL ``UPDATE`` expressions, the same way
    ``assign_to_region`` increments, so concurrent switches cannot
    overwrite each other's counts with a stale in-memory value.

    Args:
        db: Session used for all queries and writes; flushed, not committed.
        local_node_id: Node being moved.
        new_region_id: Primary key of the destination Region.
        requester: Recorded on the new assignment as ``assigned_by``.

    Returns:
        ``{'switched': False, 'error': <str>}`` when the target is
        unusable, otherwise a dict with ``switched`` (True),
        ``regional_node_id``, ``regional_url``, ``region_id`` and
        ``assignment_id``.
    """
    from .models import PeerNode, Region, RegionAssignment

    new_region = db.query(Region).filter_by(id=new_region_id).first()
    if not new_region:
        return {'switched': False, 'error': 'Region not found'}

    if not new_region.is_accepting_nodes:
        return {'switched': False, 'error': 'Region not accepting nodes'}

    new_regional = db.query(PeerNode).filter_by(
        node_id=new_region.host_node_id).first()
    if not new_regional:
        return {'switched': False, 'error': 'Regional host not found'}

    # Revoke every active assignment, not just the first one found, so the
    # node ends up with exactly one active row afterwards.
    active_assignments = db.query(RegionAssignment).filter_by(
        local_node_id=local_node_id, status='active').all()
    for old in active_assignments:
        old.status = 'revoked'
        # Atomic decrement; the > 0 filter keeps the counter from going
        # negative and skips rows whose counter is NULL.
        db.query(PeerNode).filter(
            PeerNode.node_id == old.regional_node_id,
            PeerNode.active_user_count > 0,
        ).update(
            {PeerNode.active_user_count: PeerNode.active_user_count - 1}
        )

    # Create new assignment
    # NOTE(review): datetime.utcnow() yields a naive timestamp and is
    # deprecated since Python 3.12; kept because the column's timezone
    # handling is not visible here.
    assignment = RegionAssignment(
        local_node_id=local_node_id,
        regional_node_id=new_regional.node_id,
        region_id=new_region_id,
        assigned_by=requester,
        status='active',
        approved_at=datetime.utcnow(),
        approved_by_central=True,
    )
    db.add(assignment)
    # Flush explicitly so assignment.id is populated before it is copied
    # below, instead of depending on the next query to autoflush.
    db.flush()

    # Update peer
    peer = db.query(PeerNode).filter_by(node_id=local_node_id).first()
    if peer:
        peer.region_assignment_id = assignment.id
        peer.parent_node_id = new_regional.node_id

    # Atomic increment on the destination host.
    db.query(PeerNode).filter_by(node_id=new_regional.node_id).update(
        {PeerNode.active_user_count: func.coalesce(PeerNode.active_user_count, 0) + 1}
    )
    db.flush()

    return {
        'switched': True,
        'regional_node_id': new_regional.node_id,
        'regional_url': new_regional.url,
        'region_id': new_region_id,
        'assignment_id': assignment.id,
    }
309 # ─── Tier-Aware Gossip Targets ───
@staticmethod
def get_gossip_targets(
    db: Session,
    node_id: str,
    tier: str,
) -> List[Dict]:
    """List the peers a node should gossip with, given its tier.

    Selection rules:
      * ``flat``: every peer whose status is not ``dead``, except the
        caller itself.
      * ``central``: all active regional peers.
      * ``regional``: all active central peers, followed by the active
        local peers whose ``parent_node_id`` is the caller.
      * ``local``: the regional host named by the caller's active
        assignment, if both the assignment and the host row exist.
      * any other tier: nothing.

    Args:
        db: Session used for the queries.
        node_id: The calling node's identifier.
        tier: One of the tier names above.

    Returns:
        A list of ``to_dict()`` results, possibly empty.
    """
    from .models import PeerNode, RegionAssignment

    def _as_dicts(nodes):
        # Serialise a batch of PeerNode rows.
        return [node.to_dict() for node in nodes]

    if tier == 'flat':
        everyone_else = db.query(PeerNode).filter(
            PeerNode.status != 'dead',
            PeerNode.node_id != node_id,
        )
        return _as_dicts(everyone_else.all())

    if tier == 'central':
        active_regionals = db.query(PeerNode).filter(
            PeerNode.tier == 'regional',
            PeerNode.status == 'active',
        )
        return _as_dicts(active_regionals.all())

    if tier == 'regional':
        # Upstream first: central nodes.
        active_centrals = db.query(PeerNode).filter(
            PeerNode.tier == 'central',
            PeerNode.status == 'active',
        )
        result = _as_dicts(active_centrals.all())

        # Then downstream: local nodes parented to this regional host.
        own_locals = db.query(PeerNode).filter(
            PeerNode.parent_node_id == node_id,
            PeerNode.tier == 'local',
            PeerNode.status == 'active',
        )
        result += _as_dicts(own_locals.all())
        return result

    if tier == 'local':
        link = db.query(RegionAssignment).filter_by(
            local_node_id=node_id, status='active').first()
        if not link:
            return []
        parent = db.query(PeerNode).filter_by(
            node_id=link.regional_node_id).first()
        return [parent.to_dict()] if parent else []

    # Unknown tier: no targets.
    return []
371 # ─── Health & Capacity ───
@staticmethod
def report_node_capacity(
    db: Session,
    node_id: str,
    compute_info: dict,
) -> Dict:
    """Apply a capacity report to an existing node.

    Each recognised key in ``compute_info`` overwrites the matching column.
    A key that is absent leaves the stored value as it is. A key that is
    present with value ``None`` stores ``None``.

    Args:
        db: Session used for the lookup and write; flushed, not committed.
        node_id: Node whose figures are being reported.
        compute_info: May contain ``cpu_cores``, ``ram_gb``, ``gpu_count``,
            ``active_users`` and ``max_users``.

    Returns:
        ``{'updated': False, 'error': 'Node not found'}`` when no PeerNode
        row matches, otherwise ``{'updated': True, 'node_id': node_id}``.
    """
    from .models import PeerNode

    node = db.query(PeerNode).filter_by(node_id=node_id).first()
    if not node:
        return {'updated': False, 'error': 'Node not found'}

    # (report key, PeerNode attribute) pairs, applied in this order.
    reported_fields = (
        ('cpu_cores', 'compute_cpu_cores'),
        ('ram_gb', 'compute_ram_gb'),
        ('gpu_count', 'compute_gpu_count'),
        ('active_users', 'active_user_count'),
        ('max_users', 'max_user_capacity'),
    )
    for key, attr in reported_fields:
        setattr(node, attr, compute_info.get(key, getattr(node, attr)))
    db.flush()

    return {'updated': True, 'node_id': node_id}
@staticmethod
def get_region_health(db: Session, region_id: str) -> Optional[Dict]:
    """Summarise the state of one region.

    Args:
        db: Session used for the queries.
        region_id: Primary key of the Region to inspect.

    Returns:
        ``None`` when the region does not exist. Otherwise a dict with:
        ``region`` (the region's ``to_dict()``), ``host_status`` (the host
        peer's status, or ``'unknown'`` when no host row is found),
        ``host_url`` (or ``None``), ``local_node_count`` (number of active
        assignments for the region), ``is_accepting``, ``capacity_cpu``,
        ``capacity_ram_gb`` and ``current_load_pct``.
    """
    from .models import Region, PeerNode, RegionAssignment

    area = db.query(Region).filter_by(id=region_id).first()
    if not area:
        return None

    # The host lookup is skipped entirely when the region names no host.
    host_node = (
        db.query(PeerNode).filter_by(node_id=area.host_node_id).first()
        if area.host_node_id
        else None
    )

    # Number of local nodes currently assigned to this region.
    active_locals = db.query(RegionAssignment).filter_by(
        region_id=region_id, status='active').count()

    summary = {
        'region': area.to_dict(),
        'host_status': host_node.status if host_node else 'unknown',
        'host_url': host_node.url if host_node else None,
        'local_node_count': active_locals,
        'is_accepting': area.is_accepting_nodes,
        'capacity_cpu': area.capacity_cpu,
        'capacity_ram_gb': area.capacity_ram_gb,
        'current_load_pct': area.current_load_pct,
    }
    return summary