Coverage for integrations / social / hierarchy_service.py: 93.9%

179 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - 3-Tier Hierarchy Service 

3 

4Manages registration of regional/local nodes, region assignment, 

5tier-aware gossip targets, and capacity reporting. 

6 

7Tiers: central -> regional -> local (flat = backward-compatible) 

8""" 

9import logging 

10from datetime import datetime 

11from typing import Optional, Dict, List 

12 

13from sqlalchemy.orm import Session 

14from sqlalchemy import func 

15 

16logger = logging.getLogger('hevolve_social') 

17 

18 

19class HierarchyService: 

20 """Static-only service for 3-tier hierarchy operations.""" 

21 

22 # ─── Registration (central-only) ─── 

23 

24 @staticmethod 

25 def register_regional_host( 

26 db: Session, 

27 node_id: str, 

28 public_key_hex: str, 

29 region_name: str, 

30 compute_info: dict, 

31 certificate: dict, 

32 ) -> Dict: 

33 """Register a regional host. Called on central node. 

34 

35 Verifies certificate chain, creates PeerNode + Region entries. 

36 """ 

37 from .models import PeerNode, Region 

38 

39 # Verify certificate 

40 from security.key_delegation import verify_certificate_chain 

41 chain_result = verify_certificate_chain(certificate) 

42 if not chain_result['valid']: 

43 return {'registered': False, 

44 'error': f'Invalid certificate: {chain_result["details"]}'} 

45 

46 if certificate.get('tier') != 'regional': 

47 return {'registered': False, 

48 'error': 'Certificate tier must be regional'} 

49 

50 # Upsert PeerNode 

51 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

52 if not peer: 

53 peer = PeerNode( 

54 node_id=node_id, 

55 url=compute_info.get('url', ''), 

56 name=compute_info.get('name', f'regional-{node_id[:8]}'), 

57 version=compute_info.get('version', ''), 

58 status='active', 

59 ) 

60 db.add(peer) 

61 

62 peer.tier = 'regional' 

63 peer.public_key = public_key_hex 

64 peer.certificate_json = certificate 

65 peer.certificate_verified = True 

66 peer.compute_cpu_cores = compute_info.get('cpu_cores') 

67 peer.compute_ram_gb = compute_info.get('ram_gb') 

68 peer.compute_gpu_count = compute_info.get('gpu_count') 

69 peer.max_user_capacity = compute_info.get('max_users', 0) 

70 peer.dns_region = compute_info.get('dns_region', '') 

71 db.flush() 

72 

73 # Upsert Region 

74 region = db.query(Region).filter_by(name=region_name).first() 

75 if not region: 

76 region = Region( 

77 name=region_name, 

78 display_name=region_name.replace('-', ' ').title(), 

79 region_type='geographic', 

80 ) 

81 db.add(region) 

82 db.flush() 

83 

84 region.host_node_id = node_id 

85 region.capacity_cpu = compute_info.get('cpu_cores') 

86 region.capacity_ram_gb = compute_info.get('ram_gb') 

87 region.capacity_gpu = compute_info.get('gpu_count') 

88 region.central_approved = True 

89 region.is_accepting_nodes = True 

90 region.global_server_url = compute_info.get('url', '') 

91 db.flush() 

92 

93 logger.info(f"Regional host registered: {node_id[:8]} in {region_name}") 

94 return { 

95 'registered': True, 

96 'node_id': node_id, 

97 'region_id': region.id, 

98 'region_name': region_name, 

99 } 

100 

101 @staticmethod 

102 def register_local_node( 

103 db: Session, 

104 node_id: str, 

105 public_key_hex: str, 

106 compute_info: dict, 

107 geo_info: dict = None, 

108 ) -> Dict: 

109 """Register a local node and auto-assign to a region. 

110 

111 Called on central. Returns region assignment. 

112 """ 

113 from .models import PeerNode 

114 

115 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

116 if not peer: 

117 peer = PeerNode( 

118 node_id=node_id, 

119 url=compute_info.get('url', ''), 

120 name=compute_info.get('name', f'local-{node_id[:8]}'), 

121 version=compute_info.get('version', ''), 

122 status='active', 

123 ) 

124 db.add(peer) 

125 

126 peer.tier = 'local' 

127 peer.public_key = public_key_hex 

128 peer.compute_cpu_cores = compute_info.get('cpu_cores') 

129 peer.compute_ram_gb = compute_info.get('ram_gb') 

130 peer.compute_gpu_count = compute_info.get('gpu_count') 

131 peer.dns_region = (geo_info or {}).get('dns_region', '') 

132 db.flush() 

133 

134 # Auto-assign to best region 

135 assignment = HierarchyService.assign_to_region( 

136 db, node_id, compute_info, geo_info or {}) 

137 

138 return { 

139 'registered': True, 

140 'node_id': node_id, 

141 'assignment': assignment, 

142 } 

143 

144 # ─── Region Assignment (central-only) ─── 

145 

146 @staticmethod 

147 def assign_to_region( 

148 db: Session, 

149 local_node_id: str, 

150 compute_info: dict, 

151 geo_info: dict, 

152 ) -> Dict: 

153 """Auto-assign a local node to the best regional host. 

154 

155 Scoring: compute_headroom*0.4 + user_headroom*0.3 + geo_proximity*0.2 + dns_match*0.1 

156 """ 

157 from .models import PeerNode, Region, RegionAssignment 

158 

159 # Find all accepting regional hosts 

160 regionals = db.query(PeerNode).filter( 

161 PeerNode.tier == 'regional', 

162 PeerNode.status == 'active', 

163 ).all() 

164 

165 if not regionals: 

166 return {'assigned': False, 'error': 'No regional hosts available'} 

167 

168 # Get regions for accepting check 

169 best_score = -1 

170 best_regional = None 

171 best_region = None 

172 node_dns = geo_info.get('dns_region', '') or '' 

173 

174 for regional in regionals: 

175 region = db.query(Region).filter_by( 

176 host_node_id=regional.node_id).first() 

177 if region and not region.is_accepting_nodes: 

178 continue 

179 

180 score = 0.0 

181 

182 # Compute headroom (0.4 weight) 

183 if regional.max_user_capacity and regional.max_user_capacity > 0: 

184 used_pct = (regional.active_user_count or 0) / regional.max_user_capacity 

185 score += (1.0 - min(used_pct, 1.0)) * 0.4 

186 

187 # User headroom (0.3 weight) 

188 if regional.max_user_capacity and regional.max_user_capacity > 0: 

189 remaining = regional.max_user_capacity - (regional.active_user_count or 0) 

190 headroom = min(remaining / max(regional.max_user_capacity, 1), 1.0) 

191 score += headroom * 0.3 

192 

193 # DNS match (0.1 weight) 

194 if node_dns and regional.dns_region and node_dns == regional.dns_region: 

195 score += 0.1 

196 

197 # Geo proximity (0.2 weight) — simple: same dns_region prefix 

198 if node_dns and regional.dns_region: 

199 # Compare first segment (e.g., 'us' from 'us-east-1') 

200 node_prefix = node_dns.split('-')[0] if '-' in node_dns else node_dns 

201 reg_prefix = regional.dns_region.split('-')[0] if '-' in regional.dns_region else regional.dns_region 

202 if node_prefix == reg_prefix: 

203 score += 0.2 

204 

205 if score > best_score: 

206 best_score = score 

207 best_regional = regional 

208 best_region = region 

209 

210 if not best_regional: 

211 return {'assigned': False, 'error': 'No suitable regional host found'} 

212 

213 # Create assignment 

214 assignment = RegionAssignment( 

215 local_node_id=local_node_id, 

216 regional_node_id=best_regional.node_id, 

217 region_id=best_region.id if best_region else None, 

218 assigned_by='central_auto', 

219 status='active', 

220 approved_at=datetime.utcnow(), 

221 approved_by_central=True, 

222 compute_snapshot=compute_info, 

223 ) 

224 db.add(assignment) 

225 

226 # Update peer's assignment 

227 peer = db.query(PeerNode).filter_by(node_id=local_node_id).first() 

228 if peer: 

229 peer.region_assignment_id = assignment.id 

230 peer.parent_node_id = best_regional.node_id 

231 

232 # Atomic increment — prevents race when multiple nodes assigned concurrently 

233 db.query(PeerNode).filter_by(node_id=best_regional.node_id).update( 

234 {PeerNode.active_user_count: func.coalesce(PeerNode.active_user_count, 0) + 1} 

235 ) 

236 db.flush() 

237 

238 return { 

239 'assigned': True, 

240 'regional_node_id': best_regional.node_id, 

241 'regional_url': best_regional.url, 

242 'region_id': best_region.id if best_region else None, 

243 'region_name': best_region.name if best_region else None, 

244 'assignment_id': assignment.id, 

245 } 

246 

247 @staticmethod 

248 def switch_region( 

249 db: Session, 

250 local_node_id: str, 

251 new_region_id: str, 

252 requester: str, 

253 ) -> Dict: 

254 """Switch a local node to a different region.""" 

255 from .models import PeerNode, Region, RegionAssignment 

256 

257 new_region = db.query(Region).filter_by(id=new_region_id).first() 

258 if not new_region: 

259 return {'switched': False, 'error': 'Region not found'} 

260 

261 if not new_region.is_accepting_nodes: 

262 return {'switched': False, 'error': 'Region not accepting nodes'} 

263 

264 new_regional = db.query(PeerNode).filter_by( 

265 node_id=new_region.host_node_id).first() 

266 if not new_regional: 

267 return {'switched': False, 'error': 'Regional host not found'} 

268 

269 # Revoke old assignment 

270 old = db.query(RegionAssignment).filter_by( 

271 local_node_id=local_node_id, status='active').first() 

272 if old: 

273 old.status = 'revoked' 

274 # Decrement old regional user count 

275 old_regional = db.query(PeerNode).filter_by( 

276 node_id=old.regional_node_id).first() 

277 if old_regional and (old_regional.active_user_count or 0) > 0: 

278 old_regional.active_user_count -= 1 

279 

280 # Create new assignment 

281 assignment = RegionAssignment( 

282 local_node_id=local_node_id, 

283 regional_node_id=new_regional.node_id, 

284 region_id=new_region_id, 

285 assigned_by=requester, 

286 status='active', 

287 approved_at=datetime.utcnow(), 

288 approved_by_central=True, 

289 ) 

290 db.add(assignment) 

291 

292 # Update peer 

293 peer = db.query(PeerNode).filter_by(node_id=local_node_id).first() 

294 if peer: 

295 peer.region_assignment_id = assignment.id 

296 peer.parent_node_id = new_regional.node_id 

297 

298 new_regional.active_user_count = (new_regional.active_user_count or 0) + 1 

299 db.flush() 

300 

301 return { 

302 'switched': True, 

303 'regional_node_id': new_regional.node_id, 

304 'regional_url': new_regional.url, 

305 'region_id': new_region_id, 

306 'assignment_id': assignment.id, 

307 } 

308 

309 # ─── Tier-Aware Gossip Targets ─── 

310 

311 @staticmethod 

312 def get_gossip_targets( 

313 db: Session, 

314 node_id: str, 

315 tier: str, 

316 ) -> List[Dict]: 

317 """Return appropriate gossip targets based on node tier. 

318 

319 - central: only regional peers 

320 - regional: central + own local nodes 

321 - local: assigned regional only 

322 - flat: all peers (backward compat) 

323 """ 

324 from .models import PeerNode, RegionAssignment 

325 

326 if tier == 'flat': 

327 peers = db.query(PeerNode).filter( 

328 PeerNode.status != 'dead', 

329 PeerNode.node_id != node_id, 

330 ).all() 

331 return [p.to_dict() for p in peers] 

332 

333 if tier == 'central': 

334 peers = db.query(PeerNode).filter( 

335 PeerNode.tier == 'regional', 

336 PeerNode.status == 'active', 

337 ).all() 

338 return [p.to_dict() for p in peers] 

339 

340 if tier == 'regional': 

341 targets = [] 

342 # Central nodes 

343 centrals = db.query(PeerNode).filter( 

344 PeerNode.tier == 'central', 

345 PeerNode.status == 'active', 

346 ).all() 

347 targets.extend([p.to_dict() for p in centrals]) 

348 

349 # Own local nodes 

350 locals_ = db.query(PeerNode).filter( 

351 PeerNode.parent_node_id == node_id, 

352 PeerNode.tier == 'local', 

353 PeerNode.status == 'active', 

354 ).all() 

355 targets.extend([p.to_dict() for p in locals_]) 

356 return targets 

357 

358 if tier == 'local': 

359 # Find assigned regional 

360 assignment = db.query(RegionAssignment).filter_by( 

361 local_node_id=node_id, status='active').first() 

362 if assignment: 

363 regional = db.query(PeerNode).filter_by( 

364 node_id=assignment.regional_node_id).first() 

365 if regional: 

366 return [regional.to_dict()] 

367 return [] 

368 

369 return [] 

370 

371 # ─── Health & Capacity ─── 

372 

373 @staticmethod 

374 def report_node_capacity( 

375 db: Session, 

376 node_id: str, 

377 compute_info: dict, 

378 ) -> Dict: 

379 """Update a node's compute capacity info.""" 

380 from .models import PeerNode 

381 

382 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

383 if not peer: 

384 return {'updated': False, 'error': 'Node not found'} 

385 

386 peer.compute_cpu_cores = compute_info.get('cpu_cores', peer.compute_cpu_cores) 

387 peer.compute_ram_gb = compute_info.get('ram_gb', peer.compute_ram_gb) 

388 peer.compute_gpu_count = compute_info.get('gpu_count', peer.compute_gpu_count) 

389 peer.active_user_count = compute_info.get('active_users', peer.active_user_count) 

390 peer.max_user_capacity = compute_info.get('max_users', peer.max_user_capacity) 

391 db.flush() 

392 

393 return {'updated': True, 'node_id': node_id} 

394 

395 @staticmethod 

396 def get_region_health(db: Session, region_id: str) -> Optional[Dict]: 

397 """Get health/load info for a region.""" 

398 from .models import Region, PeerNode, RegionAssignment 

399 

400 region = db.query(Region).filter_by(id=region_id).first() 

401 if not region: 

402 return None 

403 

404 # Get host node info 

405 host = None 

406 if region.host_node_id: 

407 host = db.query(PeerNode).filter_by( 

408 node_id=region.host_node_id).first() 

409 

410 # Count assigned local nodes 

411 local_count = db.query(RegionAssignment).filter_by( 

412 region_id=region_id, status='active').count() 

413 

414 return { 

415 'region': region.to_dict(), 

416 'host_status': host.status if host else 'unknown', 

417 'host_url': host.url if host else None, 

418 'local_node_count': local_count, 

419 'is_accepting': region.is_accepting_nodes, 

420 'capacity_cpu': region.capacity_cpu, 

421 'capacity_ram_gb': region.capacity_ram_gb, 

422 'current_load_pct': region.current_load_pct, 

423 }