Coverage for integrations / social / regional_host_service.py: 66.8%
199 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Regional Host Service - Hybrid Approval for Regional Host Onboarding

Users request regional host status via the UI. The system auto-qualifies a
request when its compute tier is at least STANDARD and its trust score is at
least 2.5. A steward then approves it with one click, which triggers
certificate issuance, a GitHub repo invite, and registration in the hierarchy.

All methods follow the static service pattern: receive db: Session, return Dict.
"""
import json
import logging
import os
from datetime import datetime
from typing import Dict, List, Optional

logger = logging.getLogger('hevolve_social')

# Minimum requirements a request must meet to be auto-qualified.
MIN_COMPUTE_TIER = 'STANDARD'
MIN_TRUST_SCORE = 2.5

# Ordinal rank of each compute tier, weakest first; used to compare a
# detected tier against MIN_COMPUTE_TIER. Order of this tuple == rank.
_TIER_RANK = {
    tier_name: rank
    for rank, tier_name in enumerate((
        'OBSERVER',
        'BASIC',
        'STANDARD',
        'ADVANCED',
        'COMPUTE_HOST',
    ))
}
class RegionalHostService:
    """Static service for regional host application + hybrid approval.

    Request status transitions driven by this class:
      * request_regional_host creates 'pending_steward' (auto-qualified) or
        'rejected' (auto-disqualified).
      * approve_request moves 'pending' / 'pending_steward' -> 'approved'.
      * reject_request moves a not-yet-approved request -> 'rejected'.
      * revoke_regional_host moves a request -> 'revoked'.
    ('pending' is accepted alongside 'pending_steward' but never created here.)
    """

    # Utilization percent above which a region counts as overloaded / in need
    # of scaling. Shared by get_region_capacity, suggest_rebalance and
    # check_scaling_needed so the three always agree.
    _OVERLOAD_UTILIZATION = 80
    # Fraction of total capacity that rebalancing drains an overloaded region
    # down to AND fills an underloaded region up to (never past), so a
    # migration destination cannot itself become overloaded.
    _REBALANCE_TARGET = 0.7
    # Per-node user capacity assumed when a node does not report one; also the
    # users-per-host estimate used when recommending additional hosts.
    _DEFAULT_NODE_CAPACITY = 50

    # ------------------------------------------------------------------
    # Application
    # ------------------------------------------------------------------

    @staticmethod
    def request_regional_host(
        db, user_id: str, compute_info: dict,
        node_id: str = '', public_key_hex: str = '',
        github_username: str = '',
    ) -> Dict:
        """User clicks 'Request Regional Host' in the UI.

        1. If a pending or approved request already exists, report it.
        2. Detect compute tier server-side -> must be >= MIN_COMPUTE_TIER.
        3. Look up composite trust score -> must be >= MIN_TRUST_SCORE.
        4. Persist a RegionalHostRequest: 'pending_steward' when both checks
           pass, otherwise 'rejected' with the reasons recorded.

        Args:
            db: Session (query/add/flush are used).
            user_id: Requesting user.
            compute_info: Client-reported hardware info. Stored as JSON for
                display only; never used for qualification.
            node_id, public_key_hex, github_username: Optional identifiers
                stored on the request ('' is stored as NULL).

        Returns:
            Dict with 'qualified', 'request_id', 'status' and 'reason'. Newly
            created requests also carry 'compute_tier' and 'trust_score'.
        """
        from .models import RegionalHostRequest

        existing = db.query(RegionalHostRequest).filter(
            RegionalHostRequest.user_id == user_id,
            RegionalHostRequest.status.in_([
                'pending', 'pending_steward', 'approved']),
        ).first()
        if existing:
            return {
                'qualified': existing.status == 'approved',
                'request_id': existing.id,
                'status': existing.status,
                'reason': 'Request already exists',
            }

        compute_tier = RegionalHostService._detect_compute_tier()
        trust_score = RegionalHostService._lookup_trust_score(db, user_id)
        reasons = RegionalHostService._qualification_reasons(
            compute_tier, trust_score)

        qualified = not reasons
        status = 'pending_steward' if qualified else 'rejected'
        reason_text = '; '.join(reasons)

        request = RegionalHostRequest(
            user_id=user_id,
            node_id=node_id or None,
            public_key_hex=public_key_hex or None,
            compute_tier=compute_tier,
            compute_info_json=json.dumps(compute_info),
            trust_score=trust_score,
            status=status,
            github_username=github_username or None,
            requested_at=datetime.utcnow(),
            rejected_reason=reason_text or None,
        )
        db.add(request)
        db.flush()

        return {
            'qualified': qualified,
            'request_id': request.id,
            'status': status,
            'compute_tier': compute_tier,
            'trust_score': trust_score,
            'reason': reason_text or 'Auto-qualified',
        }

    @staticmethod
    def _detect_compute_tier() -> str:
        """Classify hardware via security.system_requirements; 'UNKNOWN' on failure.

        Client-reported compute_info is never consulted, and on failure we
        deliberately do NOT fall back to a client claim: unknown is safer.
        NOTE(review): detect_hardware() takes no node argument, so this
        classifies the machine running this service. Confirm this runs on the
        requesting node rather than on a central server.
        """
        try:
            from security.system_requirements import (
                detect_hardware, classify_tier)
            return classify_tier(detect_hardware())
        except Exception as e:
            logger.debug("Compute tier detection failed: %s", e)
            return 'UNKNOWN'

    @staticmethod
    def _lookup_trust_score(db, user_id: str) -> float:
        """Return the user's composite trust score, or 0.0 if unavailable.

        None, non-numeric and non-finite values all map to 0.0. This means the
        threshold comparison can neither raise nor be bypassed. NaN compares
        False against every bound, so it would otherwise skip the trust check.
        """
        import math
        try:
            from .rating_service import RatingService
            ts = RatingService.get_trust_score(db, user_id)
            score = float((ts or {}).get('composite_trust') or 0.0)
        except Exception as e:
            logger.debug("Trust score lookup failed for %s: %s", user_id, e)
            return 0.0
        return score if math.isfinite(score) else 0.0

    @staticmethod
    def _qualification_reasons(compute_tier: str,
                               trust_score: float) -> List[str]:
        """List the reasons a request fails auto-qualification (empty = qualified)."""
        reasons = []
        # Unrecognised tiers (including 'UNKNOWN') rank -1, always below minimum.
        tier_rank = _TIER_RANK.get(compute_tier, -1)
        min_rank = _TIER_RANK.get(MIN_COMPUTE_TIER, 2)
        if tier_rank < min_rank:
            reasons.append(
                f'Compute tier {compute_tier} below minimum '
                f'{MIN_COMPUTE_TIER}')
        if trust_score < MIN_TRUST_SCORE:
            reasons.append(
                f'Trust score {trust_score:.2f} below minimum '
                f'{MIN_TRUST_SCORE}')
        return reasons

    # ------------------------------------------------------------------
    # Steward decisions
    # ------------------------------------------------------------------

    @staticmethod
    def approve_request(
        db, request_id: str, steward_node_id: str,
        region_name: str,
    ) -> Dict:
        """Steward one-click approval.

        1. Issue a regional certificate via key_delegation.
        2. Invite the applicant's GitHub user to each private repo.
        3. Register the regional host via hierarchy_service.
        4. Mark the request approved and push a 'tier_promote' fleet command.

        Steps 1-3 and the fleet push are best-effort: failures are logged and
        approval still proceeds.

        Returns:
            On success: 'approved', 'request_id', 'certificate' (None if not
            issued), 'invite_sent', 'region_name', 'fleet_command_pushed'.
            An already-approved request returns 'approved': True with a
            'reason'. Otherwise the result is 'approved': False with an 'error'.
        """
        from .models import RegionalHostRequest

        request = db.query(RegionalHostRequest).get(request_id)
        if not request:
            return {'approved': False, 'error': 'Request not found'}
        if request.status == 'approved':
            return {'approved': True, 'request_id': request_id,
                    'reason': 'Already approved'}
        if request.status not in ('pending_steward', 'pending'):
            return {'approved': False,
                    'error': f'Cannot approve from status: {request.status}'}

        certificate = RegionalHostService._issue_certificate(
            request, region_name)

        invite_sent = False
        if request.github_username:
            invite_sent = RegionalHostService._send_github_invites(
                request.github_username)
            # Recorded even on partial success so revoke_regional_host knows
            # there is access to remove.
            request.github_invite_sent = invite_sent

        try:
            from .hierarchy_service import HierarchyService
            HierarchyService.register_regional_host(
                db,
                node_id=request.node_id or request.user_id,
                region_name=region_name,
                public_key_hex=request.public_key_hex,
            )
        except Exception as e:
            logger.debug("Hierarchy registration failed: %s", e)

        request.status = 'approved'
        request.region_name = region_name
        request.approved_at = datetime.utcnow()
        request.approved_by = steward_node_id
        db.flush()

        # Push tier_promote so the node auto-reloads as regional.
        cmd_pushed = False
        target_node = request.node_id or ''
        if target_node:
            try:
                from .fleet_command import FleetCommandService
                FleetCommandService.push_command(
                    db, target_node, 'tier_promote',
                    params={
                        'new_tier': 'regional',
                        'region_name': region_name,
                        'env_vars': {
                            'HEVOLVE_NODE_TIER': 'regional',
                        },
                        'restart_required': True,
                    },
                    issued_by=steward_node_id,
                )
                cmd_pushed = True
            except Exception as e:
                logger.debug(
                    "Fleet command push after approval failed: %s", e)

        return {
            'approved': True,
            'request_id': request_id,
            'certificate': certificate,
            'invite_sent': invite_sent,
            'region_name': region_name,
            'fleet_command_pushed': cmd_pushed,
        }

    @staticmethod
    def _issue_certificate(request, region_name: str):
        """Sign a regional child certificate for request.public_key_hex.

        Requires the request's public key and this node's private key. On
        success, stores the JSON on request.certificate_json and returns the
        certificate. Returns None (after logging a warning) if it cannot be
        issued.
        """
        if not request.public_key_hex:
            logger.warning(
                "Cannot issue certificate: no public_key_hex on request")
            return None
        try:
            from security.key_delegation import create_child_certificate
            from security.node_integrity import get_node_identity
            parent_private_key = get_node_identity().get('_private_key')
            if not parent_private_key:
                logger.warning("Cannot issue certificate: no node private key")
                return None
            cert = create_child_certificate(
                parent_private_key=parent_private_key,
                child_public_key_hex=request.public_key_hex,
                node_id=request.node_id or request.user_id,
                tier='regional',
                region_name=region_name,
            )
            request.certificate_json = json.dumps(cert)
        except Exception as e:
            logger.warning("Certificate issuance failed: %s", e)
            return None
        return cert

    @staticmethod
    def reject_request(
        db, request_id: str, reason: str = '',
    ) -> Dict:
        """Steward rejects a request that has not been approved.

        Returns {'rejected': True, 'request_id'} on success. Otherwise returns
        {'rejected': False, 'error'} when the request is missing or its status
        does not allow rejection.
        """
        from .models import RegionalHostRequest

        request = db.query(RegionalHostRequest).get(request_id)
        if not request:
            return {'rejected': False, 'error': 'Request not found'}
        if request.status not in ('pending', 'pending_steward', 'rejected'):
            # An approved host already holds a certificate and repo access;
            # relabelling it 'rejected' would leave those in place, so it must
            # go through revoke_regional_host instead.
            return {'rejected': False,
                    'error': f'Cannot reject from status: {request.status}'}

        request.status = 'rejected'
        request.rejected_reason = reason or 'Rejected by steward'
        db.flush()
        return {'rejected': True, 'request_id': request_id}

    @staticmethod
    def revoke_regional_host(
        db, request_id: str,
    ) -> Dict:
        """Revoke certificate + remove GitHub collaborator + downgrade.

        Returns {'revoked': True, 'request_id', 'fleet_command_pushed'}, or
        {'revoked': False, 'error'} when the request does not exist.
        NOTE(review): the hierarchy registration made by approve_request is
        not undone here. Confirm whether HierarchyService needs a matching call.
        """
        from .models import RegionalHostRequest

        request = db.query(RegionalHostRequest).get(request_id)
        if not request:
            return {'revoked': False, 'error': 'Request not found'}

        if request.github_username and request.github_invite_sent:
            # Keep the flag set if any removal failed, so a retried revoke
            # attempts it again instead of forgetting the outstanding access.
            request.github_invite_sent = (
                not RegionalHostService._revoke_github_access(
                    request.github_username))
        else:
            request.github_invite_sent = False

        request.status = 'revoked'
        request.certificate_json = None
        db.flush()

        # Push tier_demote so the node auto-reloads as flat.
        cmd_pushed = False
        target_node = request.node_id or ''
        if target_node:
            try:
                from .fleet_command import FleetCommandService
                FleetCommandService.push_command(
                    db, target_node, 'tier_demote',
                    params={
                        'new_tier': 'flat',
                        'reason': 'Regional host certificate revoked',
                        'env_vars': {
                            'HEVOLVE_NODE_TIER': 'flat',
                        },
                        'restart_required': True,
                    },
                )
                cmd_pushed = True
            except Exception as e:
                logger.debug("Fleet command push after revoke failed: %s", e)

        return {'revoked': True, 'request_id': request_id,
                'fleet_command_pushed': cmd_pushed}

    # ------------------------------------------------------------------
    # GitHub private-repo access
    # ------------------------------------------------------------------

    @staticmethod
    def _private_repo_urls() -> List[str]:
        """Return the non-empty, stripped URLs in HEVOLVE_PRIVATE_REPOS (comma-separated)."""
        raw = os.environ.get('HEVOLVE_PRIVATE_REPOS', '')
        return [url.strip() for url in raw.split(',') if url.strip()]

    @staticmethod
    def _send_github_invites(github_username: str) -> bool:
        """Invite github_username (push permission) to every configured private repo.

        Each repo is attempted independently, so one failure does not block
        the others. Returns True if at least one invite reported 'invited'.
        """
        try:
            from integrations.agent_engine.private_repo_access import (
                PrivateRepoAccessService)
        except Exception as e:
            logger.debug("GitHub invite failed: %s", e)
            return False

        invited_any = False
        for repo_url in RegionalHostService._private_repo_urls():
            try:
                result = PrivateRepoAccessService.send_github_invite(
                    repo_url, github_username, permission='push')
            except Exception as e:
                logger.debug("GitHub invite to %s failed: %s", repo_url, e)
                continue
            if result and result.get('invited'):
                invited_any = True
        return invited_any

    @staticmethod
    def _revoke_github_access(github_username: str) -> bool:
        """Remove github_username from every configured private repo.

        Each repo is attempted independently. Returns True only when every
        removal call completed without raising.
        """
        try:
            from integrations.agent_engine.private_repo_access import (
                PrivateRepoAccessService)
        except Exception as e:
            logger.warning("GitHub access revocation failed: %s", e)
            return False

        all_revoked = True
        for repo_url in RegionalHostService._private_repo_urls():
            try:
                PrivateRepoAccessService.revoke_github_access(
                    repo_url, github_username)
            except Exception as e:
                all_revoked = False
                logger.warning(
                    "GitHub access revocation for %s failed: %s", repo_url, e)
        return all_revoked

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    @staticmethod
    def list_pending_requests(db) -> List[Dict]:
        """Steward dashboard: list all pending requests, newest first."""
        from .models import RegionalHostRequest

        requests = db.query(RegionalHostRequest).filter(
            RegionalHostRequest.status.in_([
                'pending', 'pending_steward']),
        ).order_by(RegionalHostRequest.requested_at.desc()).all()
        return [r.to_dict() for r in requests]

    @staticmethod
    def get_request_status(db, user_id: str) -> Optional[Dict]:
        """Return the user's most recent request as a dict, or None if they have none."""
        from .models import RegionalHostRequest

        request = db.query(RegionalHostRequest).filter_by(
            user_id=user_id,
        ).order_by(RegionalHostRequest.requested_at.desc()).first()
        if not request:
            return None
        return request.to_dict()

    # ------------------------------------------------------------------
    # Capacity / elastic scaling
    # ------------------------------------------------------------------

    @staticmethod
    def get_region_capacity(db, region_name: str) -> Dict:
        """Get region capacity metrics: current load, max capacity, health.

        host_count counts approved regional hosts in the region. The capacity
        figures aggregate PeerNodes in the region whose status is 'active' or
        'online'. Node attributes that are missing or falsy fall back to
        defaults (capacity _DEFAULT_NODE_CAPACITY, everything else 0).
        """
        from .models import PeerNode, RegionalHostRequest

        hosts = db.query(RegionalHostRequest).filter(
            RegionalHostRequest.status == 'approved',
            RegionalHostRequest.region_name == region_name,
        ).all()

        nodes = db.query(PeerNode).filter(
            PeerNode.dns_region == region_name,
            PeerNode.status.in_(['active', 'online']),
        ).all()

        default_capacity = RegionalHostService._DEFAULT_NODE_CAPACITY
        total_capacity = 0
        current_load = 0
        compute_cores = 0
        compute_ram_gb = 0
        gpu_count = 0

        for node in nodes:
            total_capacity += (
                getattr(node, 'max_user_capacity', default_capacity)
                or default_capacity)
            current_load += getattr(node, 'active_user_count', 0) or 0
            compute_cores += getattr(node, 'compute_cpu_cores', 0) or 0
            compute_ram_gb += getattr(node, 'compute_ram_gb', 0) or 0
            gpu_count += getattr(node, 'compute_gpu_count', 0) or 0

        utilization = (
            current_load / total_capacity * 100 if total_capacity > 0 else 0)

        return {
            'region_name': region_name,
            'host_count': len(hosts),
            'active_node_count': len(nodes),
            'total_capacity': total_capacity,
            'current_load': current_load,
            'utilization_percent': round(utilization, 1),
            'compute_cores': compute_cores,
            'compute_ram_gb': round(compute_ram_gb, 1),
            'gpu_count': gpu_count,
            'status': (
                'critical' if utilization > 90 else
                'high' if utilization > 75 else
                'healthy' if utilization > 0 else
                'idle'
            ),
            'needs_scaling': (
                utilization > RegionalHostService._OVERLOAD_UTILIZATION),
        }

    @staticmethod
    def get_all_region_capacities(db) -> List[Dict]:
        """Capacity metrics for every region that has an approved host.

        Used by the elastic rebalancer. Sorted by utilization_percent, highest
        first.
        """
        from .models import RegionalHostRequest

        regions = db.query(RegionalHostRequest.region_name).filter(
            RegionalHostRequest.status == 'approved',
            RegionalHostRequest.region_name.isnot(None),
        ).distinct().all()

        capacities = [
            RegionalHostService.get_region_capacity(db, region_name)
            for (region_name,) in regions
            if region_name
        ]
        return sorted(
            capacities, key=lambda c: c['utilization_percent'], reverse=True)

    @staticmethod
    def suggest_rebalance(db, capacities: Optional[List[Dict]] = None) -> Dict:
        """Elastic rebalancing: suggest user migrations from overloaded regions.

        Overloaded regions (> _OVERLOAD_UTILIZATION %) are drained toward
        _REBALANCE_TARGET of their capacity. Users go into underloaded regions
        (< 50 %), each filled at most to _REBALANCE_TARGET of its capacity.
        A destination's remaining headroom is shared across all source regions,
        so it is never promised more users than it can take. Does NOT execute;
        returns suggestions for steward approval or auto-execution.

        Args:
            db: Session, used only when capacities is None.
            capacities: Optional precomputed get_all_region_capacities()
                result, letting callers avoid a second DB scan.
        """
        if capacities is None:
            capacities = RegionalHostService.get_all_region_capacities(db)
        if len(capacities) < 2:
            return {'suggestions': [],
                    'reason': 'Need at least 2 regions to rebalance'}

        target = RegionalHostService._REBALANCE_TARGET
        overloaded = [
            c for c in capacities
            if c['utilization_percent'] > RegionalHostService._OVERLOAD_UTILIZATION]
        underloaded = [c for c in capacities if c['utilization_percent'] < 50]

        # Remaining room in each underloaded region (same order as underloaded).
        headroom = [
            int(c['total_capacity'] * target) - c['current_load']
            for c in underloaded
        ]

        suggestions = []
        for over in overloaded:
            excess = over['current_load'] - int(over['total_capacity'] * target)
            for idx, under in enumerate(underloaded):
                if excess <= 0:
                    break
                available = headroom[idx]
                if available <= 0:
                    continue

                migrate_count = min(excess, available)
                suggestions.append({
                    'from_region': over['region_name'],
                    'to_region': under['region_name'],
                    'migrate_count': migrate_count,
                    'from_utilization': over['utilization_percent'],
                    'to_utilization': under['utilization_percent'],
                    'reason': f"{over['region_name']} at {over['utilization_percent']}% → "
                              f"migrate {migrate_count} users to {under['region_name']} "
                              f"({under['utilization_percent']}%)",
                })
                headroom[idx] -= migrate_count
                excess -= migrate_count

        return {
            'suggestions': suggestions,
            'total_regions': len(capacities),
            'overloaded_count': len(overloaded),
            'underloaded_count': len(underloaded),
            'capacities': capacities,
        }

    @staticmethod
    def check_scaling_needed(db,
                             capacities: Optional[List[Dict]] = None) -> Dict:
        """Check whether any region needs horizontal scaling (more hosts).

        Called periodically by a background task or fleet command.
        - Regions above _OVERLOAD_UTILIZATION % get 'recruit_hosts' if they
          have fewer than 3 hosts, else 'scale_compute'. They also get a
          recommended host count: load in excess of 60 % of capacity, divided
          by _DEFAULT_NODE_CAPACITY, with a minimum of 1.
        - Regions below 10 % with more than one host get 'consolidate'.

        Args:
            db: Session, used only when capacities is None.
            capacities: Optional precomputed get_all_region_capacities()
                result, letting callers avoid a second DB scan.
        """
        if capacities is None:
            capacities = RegionalHostService.get_all_region_capacities(db)

        per_host = RegionalHostService._DEFAULT_NODE_CAPACITY
        scaling_needed = []
        for cap in capacities:
            utilization = cap['utilization_percent']
            summary = {
                'region_name': cap['region_name'],
                'utilization_percent': utilization,
                'current_hosts': cap['host_count'],
                'current_load': cap['current_load'],
                'total_capacity': cap['total_capacity'],
            }
            if utilization > RegionalHostService._OVERLOAD_UTILIZATION:
                excess = cap['current_load'] - int(cap['total_capacity'] * 0.6)
                scaling_needed.append({
                    **summary,
                    'action': ('recruit_hosts' if cap['host_count'] < 3
                               else 'scale_compute'),
                    'recommended_additional_hosts': max(1, excess // per_host),
                })
            elif utilization < 10 and cap['host_count'] > 1:
                scaling_needed.append({
                    **summary,
                    'action': 'consolidate',
                    'reason': 'Very low utilization — consider consolidating hosts',
                })

        return {
            'scaling_needed': scaling_needed,
            'total_regions': len(capacities),
        }
490 return {
491 'scaling_needed': scaling_needed,
492 'total_regions': len(capacities),
493 }