Coverage for integrations / social / regional_host_service.py: 66.8%

199 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

"""
Regional Host Service - Hybrid Approval for Regional Host Onboarding

Users request regional host status via the UI. The system auto-qualifies a
request based on compute tier (>= STANDARD, classified from server-side
hardware detection; client-supplied compute info is stored but never trusted)
and trust score (composite_trust >= 2.5). Requests that fall short are stored
with status 'rejected' and the reasons. A steward then one-click approves a
qualified request, which triggers certificate issuance, a GitHub repo invite,
registration in the hierarchy, and a 'tier_promote' fleet command to the node.
Revocation removes GitHub access, clears the stored certificate and pushes a
'tier_demote' fleet command.

The service also reports per-region capacity and suggests (but does not
execute) load rebalancing and scaling actions.

All methods follow the static service pattern: they receive the database
session (db) as the first argument and return plain dicts (or lists of dicts).
"""

11import json 

12import logging 

13import os 

14from datetime import datetime 

15from typing import Dict, List, Optional 

16 

logger = logging.getLogger('hevolve_social')

# Thresholds an applicant must meet to be auto-qualified for steward review.
MIN_COMPUTE_TIER = 'STANDARD'
MIN_TRUST_SCORE = 2.5

# Compute tiers listed weakest to strongest. A tier's rank is its position in
# this sequence, so tiers can be compared numerically (higher = more capable).
_TIER_RANK = {
    tier: rank
    for rank, tier in enumerate(
        ('OBSERVER', 'BASIC', 'STANDARD', 'ADVANCED', 'COMPUTE_HOST'))
}

31 

32 

33class RegionalHostService: 

34 """Static service for regional host application + hybrid approval.""" 

35 

36 @staticmethod 

37 def request_regional_host( 

38 db, user_id: str, compute_info: dict, 

39 node_id: str = '', public_key_hex: str = '', 

40 github_username: str = '', 

41 ) -> Dict: 

42 """User clicks 'Request Regional Host' in UI. 

43 

44 1. Check compute tier via system_requirements → must be >= STANDARD 

45 2. Check trust score → must be >= 2.5 composite_trust 

46 3. If qualifies: create RegionalHostRequest(status='pending_steward') 

47 4. If not: return {qualified: False, reason: ...} 

48 """ 

49 from .models import RegionalHostRequest 

50 

51 # Check for existing pending/approved request 

52 existing = db.query(RegionalHostRequest).filter( 

53 RegionalHostRequest.user_id == user_id, 

54 RegionalHostRequest.status.in_([ 

55 'pending', 'pending_steward', 'approved']), 

56 ).first() 

57 if existing: 

58 return { 

59 'qualified': existing.status == 'approved', 

60 'request_id': existing.id, 

61 'status': existing.status, 

62 'reason': 'Request already exists', 

63 } 

64 

65 # Detect compute tier — server-side only (never trust client claims) 

66 compute_tier = 'UNKNOWN' 

67 try: 

68 from security.system_requirements import ( 

69 detect_hardware, classify_tier) 

70 hw = detect_hardware() 

71 # NOTE: client compute_info is stored for display but NEVER 

72 # used to override server-detected hardware for tier classification 

73 compute_tier = classify_tier(hw) 

74 except Exception as e: 

75 logger.debug(f"Compute tier detection failed: {e}") 

76 # Do NOT fall back to client-provided tier — unknown is safer 

77 compute_tier = 'UNKNOWN' 

78 

79 # Check trust score 

80 trust_score = 0.0 

81 try: 

82 from .rating_service import RatingService 

83 ts = RatingService.get_trust_score(db, user_id) 

84 if ts: 

85 trust_score = ts.get('composite_trust', 0.0) 

86 except Exception: 

87 pass 

88 

89 # Evaluate qualification 

90 reasons = [] 

91 tier_rank = _TIER_RANK.get(compute_tier, -1) 

92 min_rank = _TIER_RANK.get(MIN_COMPUTE_TIER, 2) 

93 if tier_rank < min_rank: 

94 reasons.append( 

95 f'Compute tier {compute_tier} below minimum ' 

96 f'{MIN_COMPUTE_TIER}') 

97 if trust_score < MIN_TRUST_SCORE: 

98 reasons.append( 

99 f'Trust score {trust_score:.2f} below minimum ' 

100 f'{MIN_TRUST_SCORE}') 

101 

102 qualified = len(reasons) == 0 

103 status = 'pending_steward' if qualified else 'rejected' 

104 

105 request = RegionalHostRequest( 

106 user_id=user_id, 

107 node_id=node_id or None, 

108 public_key_hex=public_key_hex or None, 

109 compute_tier=compute_tier, 

110 compute_info_json=json.dumps(compute_info), 

111 trust_score=trust_score, 

112 status=status, 

113 github_username=github_username or None, 

114 requested_at=datetime.utcnow(), 

115 rejected_reason='; '.join(reasons) if reasons else None, 

116 ) 

117 db.add(request) 

118 db.flush() 

119 

120 return { 

121 'qualified': qualified, 

122 'request_id': request.id, 

123 'status': status, 

124 'compute_tier': compute_tier, 

125 'trust_score': trust_score, 

126 'reason': '; '.join(reasons) if reasons else 'Auto-qualified', 

127 } 

128 

129 @staticmethod 

130 def approve_request( 

131 db, request_id: str, steward_node_id: str, 

132 region_name: str, 

133 ) -> Dict: 

134 """Steward one-click approval. 

135 

136 1. Issue certificate via key_delegation 

137 2. Trigger GitHub repo invite (coding agent) 

138 3. Register regional host via hierarchy_service 

139 """ 

140 from .models import RegionalHostRequest 

141 

142 request = db.query(RegionalHostRequest).get(request_id) 

143 if not request: 

144 return {'approved': False, 'error': 'Request not found'} 

145 if request.status == 'approved': 

146 return {'approved': True, 'request_id': request_id, 

147 'reason': 'Already approved'} 

148 if request.status not in ('pending_steward', 'pending'): 

149 return {'approved': False, 

150 'error': f'Cannot approve from status: {request.status}'} 

151 

152 # Issue certificate — requires valid public key and this node's private key 

153 certificate = None 

154 if not request.public_key_hex: 

155 logger.warning("Cannot issue certificate: no public_key_hex on request") 

156 else: 

157 try: 

158 from security.key_delegation import create_child_certificate 

159 from security.node_integrity import get_node_identity 

160 identity = get_node_identity() 

161 parent_private_key = identity.get('_private_key') 

162 if not parent_private_key: 

163 logger.warning("Cannot issue certificate: no node private key") 

164 else: 

165 cert = create_child_certificate( 

166 parent_private_key=parent_private_key, 

167 child_public_key_hex=request.public_key_hex, 

168 node_id=request.node_id or request.user_id, 

169 tier='regional', 

170 region_name=region_name, 

171 ) 

172 certificate = cert 

173 request.certificate_json = json.dumps(cert) 

174 except Exception as e: 

175 logger.warning(f"Certificate issuance failed: {e}") 

176 

177 # Send GitHub repo invite via coding agent 

178 invite_sent = False 

179 if request.github_username: 

180 try: 

181 from integrations.agent_engine.private_repo_access import ( 

182 PrivateRepoAccessService) 

183 repos = os.environ.get( 

184 'HEVOLVE_PRIVATE_REPOS', '').split(',') 

185 for repo_url in repos: 

186 repo_url = repo_url.strip() 

187 if repo_url: 

188 result = PrivateRepoAccessService.send_github_invite( 

189 repo_url, request.github_username, 

190 permission='push') 

191 if result.get('invited'): 

192 invite_sent = True 

193 request.github_invite_sent = invite_sent 

194 except Exception as e: 

195 logger.debug(f"GitHub invite failed: {e}") 

196 

197 # Register in hierarchy 

198 try: 

199 from .hierarchy_service import HierarchyService 

200 HierarchyService.register_regional_host( 

201 db, 

202 node_id=request.node_id or request.user_id, 

203 region_name=region_name, 

204 public_key_hex=request.public_key_hex, 

205 ) 

206 except Exception as e: 

207 logger.debug(f"Hierarchy registration failed: {e}") 

208 

209 request.status = 'approved' 

210 request.region_name = region_name 

211 request.approved_at = datetime.utcnow() 

212 request.approved_by = steward_node_id 

213 db.flush() 

214 

215 # Push tier_promote fleet command to the node so it auto-reloads as regional 

216 cmd_pushed = False 

217 target_node = request.node_id or '' 

218 if target_node: 

219 try: 

220 from .fleet_command import FleetCommandService 

221 FleetCommandService.push_command( 

222 db, target_node, 'tier_promote', 

223 params={ 

224 'new_tier': 'regional', 

225 'region_name': region_name, 

226 'env_vars': { 

227 'HEVOLVE_NODE_TIER': 'regional', 

228 }, 

229 'restart_required': True, 

230 }, 

231 issued_by=steward_node_id, 

232 ) 

233 cmd_pushed = True 

234 except Exception as e: 

235 logger.debug(f"Fleet command push after approval failed: {e}") 

236 

237 return { 

238 'approved': True, 

239 'request_id': request_id, 

240 'certificate': certificate, 

241 'invite_sent': invite_sent, 

242 'region_name': region_name, 

243 'fleet_command_pushed': cmd_pushed, 

244 } 

245 

246 @staticmethod 

247 def reject_request( 

248 db, request_id: str, reason: str = '', 

249 ) -> Dict: 

250 """Steward rejects a request.""" 

251 from .models import RegionalHostRequest 

252 

253 request = db.query(RegionalHostRequest).get(request_id) 

254 if not request: 

255 return {'rejected': False, 'error': 'Request not found'} 

256 

257 request.status = 'rejected' 

258 request.rejected_reason = reason or 'Rejected by steward' 

259 db.flush() 

260 return {'rejected': True, 'request_id': request_id} 

261 

262 @staticmethod 

263 def revoke_regional_host( 

264 db, request_id: str, 

265 ) -> Dict: 

266 """Revoke certificate + remove GitHub collaborator + downgrade.""" 

267 from .models import RegionalHostRequest 

268 

269 request = db.query(RegionalHostRequest).get(request_id) 

270 if not request: 

271 return {'revoked': False, 'error': 'Request not found'} 

272 

273 # Revoke GitHub access 

274 if request.github_username and request.github_invite_sent: 

275 try: 

276 from integrations.agent_engine.private_repo_access import ( 

277 PrivateRepoAccessService) 

278 repos = os.environ.get( 

279 'HEVOLVE_PRIVATE_REPOS', '').split(',') 

280 for repo_url in repos: 

281 repo_url = repo_url.strip() 

282 if repo_url: 

283 PrivateRepoAccessService.revoke_github_access( 

284 repo_url, request.github_username) 

285 except Exception as e: 

286 logger.debug(f"GitHub access revocation failed: {e}") 

287 

288 request.status = 'revoked' 

289 request.github_invite_sent = False 

290 request.certificate_json = None 

291 db.flush() 

292 

293 # Push tier_demote fleet command so the node auto-reloads as flat 

294 target_node = request.node_id or '' 

295 if target_node: 

296 try: 

297 from .fleet_command import FleetCommandService 

298 FleetCommandService.push_command( 

299 db, target_node, 'tier_demote', 

300 params={ 

301 'new_tier': 'flat', 

302 'reason': 'Regional host certificate revoked', 

303 'env_vars': { 

304 'HEVOLVE_NODE_TIER': 'flat', 

305 }, 

306 'restart_required': True, 

307 }, 

308 ) 

309 except Exception as e: 

310 logger.debug(f"Fleet command push after revoke failed: {e}") 

311 

312 return {'revoked': True, 'request_id': request_id} 

313 

314 @staticmethod 

315 def list_pending_requests(db) -> List[Dict]: 

316 """Steward dashboard: list all pending requests.""" 

317 from .models import RegionalHostRequest 

318 

319 requests = db.query(RegionalHostRequest).filter( 

320 RegionalHostRequest.status.in_([ 

321 'pending', 'pending_steward']), 

322 ).order_by(RegionalHostRequest.requested_at.desc()).all() 

323 return [r.to_dict() for r in requests] 

324 

325 @staticmethod 

326 def get_request_status(db, user_id: str) -> Optional[Dict]: 

327 """User checks their latest request status.""" 

328 from .models import RegionalHostRequest 

329 

330 request = db.query(RegionalHostRequest).filter_by( 

331 user_id=user_id, 

332 ).order_by(RegionalHostRequest.requested_at.desc()).first() 

333 if not request: 

334 return None 

335 return request.to_dict() 

336 

337 @staticmethod 

338 def get_region_capacity(db, region_name: str) -> Dict: 

339 """Get region capacity metrics — current load, max capacity, health.""" 

340 from .models import PeerNode, RegionalHostRequest 

341 

342 # Find all approved regional hosts in this region 

343 hosts = db.query(RegionalHostRequest).filter( 

344 RegionalHostRequest.status == 'approved', 

345 RegionalHostRequest.region_name == region_name, 

346 ).all() 

347 

348 # Find all peer nodes in this region 

349 nodes = db.query(PeerNode).filter( 

350 PeerNode.dns_region == region_name, 

351 PeerNode.status.in_(['active', 'online']), 

352 ).all() 

353 

354 total_capacity = 0 

355 current_load = 0 

356 compute_cores = 0 

357 compute_ram_gb = 0 

358 gpu_count = 0 

359 

360 for node in nodes: 

361 max_users = getattr(node, 'max_user_capacity', 50) or 50 

362 active_users = getattr(node, 'active_user_count', 0) or 0 

363 total_capacity += max_users 

364 current_load += active_users 

365 compute_cores += getattr(node, 'compute_cpu_cores', 0) or 0 

366 compute_ram_gb += getattr(node, 'compute_ram_gb', 0) or 0 

367 gpu_count += getattr(node, 'compute_gpu_count', 0) or 0 

368 

369 utilization = (current_load / total_capacity * 100) if total_capacity > 0 else 0 

370 

371 return { 

372 'region_name': region_name, 

373 'host_count': len(hosts), 

374 'active_node_count': len(nodes), 

375 'total_capacity': total_capacity, 

376 'current_load': current_load, 

377 'utilization_percent': round(utilization, 1), 

378 'compute_cores': compute_cores, 

379 'compute_ram_gb': round(compute_ram_gb, 1), 

380 'gpu_count': gpu_count, 

381 'status': ( 

382 'critical' if utilization > 90 else 

383 'high' if utilization > 75 else 

384 'healthy' if utilization > 0 else 

385 'idle' 

386 ), 

387 'needs_scaling': utilization > 80, 

388 } 

389 

390 @staticmethod 

391 def get_all_region_capacities(db) -> List[Dict]: 

392 """Get capacity metrics for ALL regions — used by elastic rebalancer.""" 

393 from .models import RegionalHostRequest 

394 

395 # Get unique region names 

396 regions = db.query(RegionalHostRequest.region_name).filter( 

397 RegionalHostRequest.status == 'approved', 

398 RegionalHostRequest.region_name.isnot(None), 

399 ).distinct().all() 

400 

401 capacities = [] 

402 for (region_name,) in regions: 

403 if region_name: 

404 cap = RegionalHostService.get_region_capacity(db, region_name) 

405 capacities.append(cap) 

406 

407 return sorted(capacities, key=lambda c: c['utilization_percent'], reverse=True) 

408 

409 @staticmethod 

410 def suggest_rebalance(db) -> Dict: 

411 """Elastic rebalancing: identify overloaded regions and suggest migrations. 

412 

413 Returns a list of suggested user migrations from overloaded to underloaded regions. 

414 Does NOT execute — returns suggestions for steward approval or auto-execution. 

415 """ 

416 capacities = RegionalHostService.get_all_region_capacities(db) 

417 if len(capacities) < 2: 

418 return {'suggestions': [], 'reason': 'Need at least 2 regions to rebalance'} 

419 

420 overloaded = [c for c in capacities if c['utilization_percent'] > 80] 

421 underloaded = [c for c in capacities if c['utilization_percent'] < 50] 

422 

423 suggestions = [] 

424 for over in overloaded: 

425 excess = over['current_load'] - int(over['total_capacity'] * 0.7) 

426 if excess <= 0: 

427 continue 

428 

429 for under in underloaded: 

430 available = under['total_capacity'] - under['current_load'] 

431 if available <= 0: 

432 continue 

433 

434 migrate_count = min(excess, available) 

435 suggestions.append({ 

436 'from_region': over['region_name'], 

437 'to_region': under['region_name'], 

438 'migrate_count': migrate_count, 

439 'from_utilization': over['utilization_percent'], 

440 'to_utilization': under['utilization_percent'], 

441 'reason': f"{over['region_name']} at {over['utilization_percent']}% → " 

442 f"migrate {migrate_count} users to {under['region_name']} " 

443 f"({under['utilization_percent']}%)", 

444 }) 

445 excess -= migrate_count 

446 if excess <= 0: 

447 break 

448 

449 return { 

450 'suggestions': suggestions, 

451 'total_regions': len(capacities), 

452 'overloaded_count': len(overloaded), 

453 'underloaded_count': len(underloaded), 

454 'capacities': capacities, 

455 } 

456 

457 @staticmethod 

458 def check_scaling_needed(db) -> Dict: 

459 """Check if any region needs horizontal scaling (more hosts). 

460 

461 Called periodically by a background task or fleet command. 

462 Returns regions that need more hosts and optionally auto-posts 

463 recruitment requests. 

464 """ 

465 capacities = RegionalHostService.get_all_region_capacities(db) 

466 scaling_needed = [] 

467 

468 for cap in capacities: 

469 if cap['utilization_percent'] > 80: 

470 scaling_needed.append({ 

471 'region_name': cap['region_name'], 

472 'utilization_percent': cap['utilization_percent'], 

473 'current_hosts': cap['host_count'], 

474 'current_load': cap['current_load'], 

475 'total_capacity': cap['total_capacity'], 

476 'action': 'recruit_hosts' if cap['host_count'] < 3 else 'scale_compute', 

477 'recommended_additional_hosts': max(1, (cap['current_load'] - int(cap['total_capacity'] * 0.6)) // 50), 

478 }) 

479 elif cap['utilization_percent'] < 10 and cap['host_count'] > 1: 

480 scaling_needed.append({ 

481 'region_name': cap['region_name'], 

482 'utilization_percent': cap['utilization_percent'], 

483 'current_hosts': cap['host_count'], 

484 'current_load': cap['current_load'], 

485 'total_capacity': cap['total_capacity'], 

486 'action': 'consolidate', 

487 'reason': 'Very low utilization — consider consolidating hosts', 

488 }) 

489 

490 return { 

491 'scaling_needed': scaling_needed, 

492 'total_regions': len(capacities), 

493 }