Coverage for integrations / distributed_agent / host_registry.py: 70.2%
84 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Regional Host Registry — tracks which hosts contribute compute.
4Stores host info in Redis hashes so all distributed agents can discover
5each other. Reuses the same Redis connection used by RedisBackend / heartbeat.
6"""
8import json
9import logging
10from datetime import datetime
11from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)

# Hosts not seen within this window (seconds) are considered stale and purged.
STALE_THRESHOLD = 300  # 5 minutes


class RegionalHostRegistry:
    """
    Tracks regional hosts that contribute compute to distributed coding.

    Each host registers with capabilities (tools, skills) and compute budget.
    Other hosts discover peers via shared Redis.

    Host entries carry a ``last_seen`` timestamp. Entries older than
    ``STALE_THRESHOLD`` seconds are automatically purged when the host list
    is queried via ``get_all_hosts()`` or ``get_hosts_with_capability()``.

    Timestamps are stored as timezone-aware UTC ISO-8601 strings, so hosts
    running in different local time zones agree on how old an entry is.
    Entries without a UTC offset were written by older versions using
    ``datetime.now()``. They are interpreted in the reading host's local
    time zone.
    """

    HOSTS_HASH = "distributed_agent:hosts"
    HOST_PREFIX = "distributed_agent:host:"

    def __init__(self, redis_client, host_id: str, host_url: str = ""):
        self._redis = redis_client
        self.host_id = host_id
        self.host_url = host_url

    # ── time helpers ──────────────────────────────────────────────
    @staticmethod
    def _now() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        from datetime import timezone

        return datetime.now(timezone.utc)

    @staticmethod
    def _parse_timestamp(value: str) -> datetime:
        """Parse an ISO-8601 ``last_seen`` string into an aware UTC datetime.

        Raises:
            ValueError: *value* is not a valid ISO-8601 string.
            TypeError: *value* is not a string.
        """
        from datetime import timezone

        parsed = datetime.fromisoformat(value)
        # For naive (legacy) values astimezone() assumes local time; aware
        # values are simply converted. Either way the result is comparable
        # with _now() without raising TypeError.
        return parsed.astimezone(timezone.utc)

    # ── registration ──────────────────────────────────────────────
    def register_host(
        self,
        capabilities: List[str],
        compute_budget: Optional[Dict[str, Any]] = None,
        agent_ids: Optional[List[str]] = None,
    ) -> bool:
        """Register (or re-register) this host as available for distributed work.

        The stored capability list merges the caller's flat strings
        ('coding', 'testing') with capabilities auto-discovered from
        ModelOrchestrator (e.g. 'tts', 'audio_gen:music_gen'). Dispatch can
        then route tasks to nodes that have the right models.

        Args:
            capabilities: Caller-declared capability strings.
            compute_budget: Optional budget description stored verbatim.
            agent_ids: Optional list of agent ids hosted here.

        Returns:
            True if the entry was written to Redis, False on any Redis error.
        """
        model_caps = self._discover_model_capabilities()
        # De-duplicate while keeping a deterministic, caller-first order.
        all_caps = list(dict.fromkeys(list(capabilities or ()) + model_caps))

        try:
            now = self._now().isoformat()
            data = {
                "host_id": self.host_id,
                "host_url": self.host_url,
                "capabilities": all_caps,
                "compute_budget": compute_budget or {},
                "agent_ids": agent_ids or [],
                "registered_at": now,
                "last_seen": now,
            }
            self._redis.hset(self.HOSTS_HASH, self.host_id, json.dumps(data))
            logger.info("Host registered: %s with %d capabilities", self.host_id, len(all_caps))
            return True
        except Exception as e:
            logger.error("Failed to register host %s: %s", self.host_id, e)
            return False

    @staticmethod
    def _discover_model_capabilities() -> List[str]:
        """Auto-discover model capabilities from ModelOrchestrator.

        Returns flat strings like ['tts', 'audio_gen:music_gen', 'stt:realtime']
        that can be matched by get_hosts_with_capability(). Only model types
        whose info reports ``available`` are included. Discovery is
        best-effort: any failure (including the orchestrator module not
        being importable) yields an empty list.
        """
        try:
            from integrations.service_tools.model_orchestrator import get_orchestrator

            caps = get_orchestrator().available_capabilities() or {}
            flat: List[str] = []
            for model_type, info in caps.items():
                if not info.get("available"):
                    continue
                flat.append(model_type)
                # Also add type:capability pairs for fine-grained matching.
                # `or {}` tolerates an explicit `capabilities: None`.
                for cap_name in info.get("capabilities") or {}:
                    flat.append(f"{model_type}:{cap_name}")
            return flat
        except Exception as e:
            logger.debug("Model capability discovery unavailable: %s", e)
            return []

    def deregister_host(self) -> bool:
        """Remove this host from the registry. Returns False on Redis errors."""
        try:
            self._redis.hdel(self.HOSTS_HASH, self.host_id)
            logger.info("Host deregistered: %s", self.host_id)
            return True
        except Exception as e:
            logger.error("Failed to deregister host: %s", e)
            return False

    # ── stale-host purging ────────────────────────────────────────
    def _purge_stale(self) -> int:
        """Remove host entries whose ``last_seen`` is older than STALE_THRESHOLD.

        These entries are also removed, because they can never become fresh
        again:
          * entries without a ``last_seen`` value;
          * entries that are not valid JSON;
          * entries that are not JSON objects;
          * entries whose timestamp cannot be parsed.

        Returns the number of purged entries. A Redis failure is logged and
        stops the sweep early; the count so far is still returned.
        """
        purged = 0
        try:
            now = self._now()
            all_data = self._redis.hgetall(self.HOSTS_HASH)
            for hid, raw in all_data.items():
                last_seen_str = None
                try:
                    entry = json.loads(raw)
                    last_seen_str = entry.get("last_seen")  # AttributeError if not a dict
                    stale = (not last_seen_str) or (
                        (now - self._parse_timestamp(last_seen_str)).total_seconds()
                        > STALE_THRESHOLD
                    )
                except (ValueError, TypeError, AttributeError):
                    # Corrupt / unparseable entry (JSONDecodeError is a ValueError).
                    stale = True
                if not stale:
                    continue
                self._redis.hdel(self.HOSTS_HASH, hid)
                purged += 1
                if last_seen_str:
                    logger.info("Purged stale host: %s (last seen %s)", hid, last_seen_str)
        except Exception as e:
            logger.error("Failed to purge stale hosts: %s", e)
        return purged

    # ── queries ───────────────────────────────────────────────────
    def get_all_hosts(self) -> List[Dict[str, Any]]:
        """List all registered hosts (purges stale entries first).

        Only entries that decode to JSON objects are returned. An entry that
        became corrupt after the purge is skipped rather than truncating the
        result. A Redis failure is logged and yields an empty list.
        """
        self._purge_stale()
        hosts: List[Dict[str, Any]] = []
        try:
            all_data = self._redis.hgetall(self.HOSTS_HASH)
        except Exception as e:
            logger.error("Failed to list hosts: %s", e)
            return hosts
        for raw in all_data.values():
            try:
                entry = json.loads(raw)
            except (ValueError, TypeError):
                continue
            if isinstance(entry, dict):
                hosts.append(entry)
        return hosts

    def get_hosts_with_capability(self, capability: str) -> List[Dict[str, Any]]:
        """Find live hosts whose capability list contains *capability* exactly."""
        return [
            h for h in self.get_all_hosts()
            if capability in (h.get("capabilities") or [])
        ]

    def update_compute_usage(self, usage: Dict[str, Any]) -> None:
        """Report current compute usage (CPU, memory, active tasks) and refresh ``last_seen``.

        Does nothing if this host has no entry, for example because it was
        never registered or was purged as stale. Failures are logged at
        debug level and never raised.
        """
        try:
            raw = self._redis.hget(self.HOSTS_HASH, self.host_id)
            if not raw:
                # NOTE(review): callers presumably re-register in this case; confirm.
                logger.debug("Host %s not registered; compute usage not recorded", self.host_id)
                return
            data = json.loads(raw)
            data["compute_usage"] = usage
            data["last_seen"] = self._now().isoformat()
            self._redis.hset(self.HOSTS_HASH, self.host_id, json.dumps(data))
        except Exception as e:
            logger.debug("Failed to update compute usage: %s", e)

    def get_host_info(self, host_id: str) -> Optional[Dict[str, Any]]:
        """Get the stored entry for *host_id*, or None if missing or unreadable.

        Unlike get_all_hosts(), this does not purge or check staleness.
        """
        try:
            raw = self._redis.hget(self.HOSTS_HASH, host_id)
            return json.loads(raw) if raw else None
        except Exception as e:
            logger.debug("Failed to read host %s: %s", host_id, e)
            return None