Coverage for integrations / distributed_agent / host_registry.py: 70.2%

84 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Regional Host Registry — tracks which hosts contribute compute. 

3 

4Stores host info in Redis hashes so all distributed agents can discover 

5each other. Reuses the same Redis connection used by RedisBackend / heartbeat. 

6""" 

7 

8import json 

9import logging 

10from datetime import datetime 

11from typing import Any, Dict, List, Optional 

12 

# Module-scoped logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)

# Maximum age, in seconds, of a host's ``last_seen`` timestamp before the
# entry is considered stale and purged from the registry.
STALE_THRESHOLD = 5 * 60  # 300 seconds

17 

18 

class RegionalHostRegistry:
    """
    Tracks regional hosts that contribute compute to distributed coding.

    Each host registers with capabilities (tools, skills) and compute budget.
    Other hosts discover peers via shared Redis: every host is one field of
    the ``HOSTS_HASH`` hash, keyed by host id, holding a JSON document.

    Host entries carry a ``last_seen`` timestamp. Entries older than
    ``STALE_THRESHOLD`` seconds are automatically purged when the host list
    is queried via ``get_all_hosts()`` or ``get_hosts_with_capability()``.

    Timestamps are written as timezone-aware UTC ISO-8601 strings so that
    staleness is computed correctly between machines whose local clocks use
    different time zones. Offset-less timestamps (the legacy format) are
    still accepted and are interpreted as this machine's local time.
    """

    HOSTS_HASH = "distributed_agent:hosts"
    # Not referenced inside this class; kept because it is part of the
    # public class namespace.
    HOST_PREFIX = "distributed_agent:host:"

    def __init__(self, redis_client, host_id: str, host_url: str = ""):
        """Bind the registry to a Redis client and to this host's identity.

        Args:
            redis_client: Client object exposing ``hset``, ``hget``, ``hdel``
                and ``hgetall`` (the only calls this class makes on it).
            host_id: Identifier used as this host's field in ``HOSTS_HASH``.
            host_url: URL stored alongside the host entry.
        """
        self._redis = redis_client
        self.host_id = host_id
        self.host_url = host_url

    # ── time helpers ──────────────────────────────────────────────
    @staticmethod
    def _now() -> datetime:
        """Return the current instant as a timezone-aware UTC datetime."""
        # Function-scope import: the module header only imports ``datetime``.
        from datetime import timezone

        return datetime.now(timezone.utc)

    @staticmethod
    def _parse_timestamp(value: Any) -> datetime:
        """Parse an ISO-8601 ``last_seen`` value into an aware datetime.

        Raises:
            TypeError: If ``value`` is not a string.
            ValueError: If ``value`` is not a valid ISO-8601 timestamp.
        """
        parsed = datetime.fromisoformat(value)
        if parsed.utcoffset() is None:
            # Legacy entries were written with naive local time; attach the
            # local zone so they can be compared with aware UTC values.
            parsed = parsed.astimezone()
        return parsed

    # ── registration ──────────────────────────────────────────────
    def register_host(
        self,
        capabilities: List[str],
        compute_budget: Optional[Dict[str, Any]] = None,
        agent_ids: Optional[List[str]] = None,
    ) -> bool:
        """Register this host as available for distributed work.

        Capabilities list contains flat strings ('coding', 'testing') AND
        auto-discovered model capabilities from ModelOrchestrator (e.g.
        'tts', 'audio_gen:music_gen', 'video_gen:txt2vid'). This allows
        distributed dispatch to route tasks to nodes with the right models.

        Args:
            capabilities: Capability names declared by the caller. ``None``
                is treated as empty and a single string as a one-item list.
            compute_budget: Optional budget document stored as-is.
            agent_ids: Optional list of agent ids stored as-is.

        Returns:
            True when the entry was written, False on any failure (the
            failure is logged, never raised).
        """
        # Auto-discover model/service capabilities from orchestrator
        model_caps = self._discover_model_capabilities()

        try:
            if isinstance(capabilities, str):
                # A bare string would otherwise be split into characters.
                capabilities = [capabilities]
            # De-duplicate; sort so the stored document is deterministic.
            all_caps = sorted(set(capabilities or ()) | set(model_caps))

            # One clock reading so both fields agree exactly.
            now_iso = self._now().isoformat()
            data = {
                "host_id": self.host_id,
                "host_url": self.host_url,
                "capabilities": all_caps,
                "compute_budget": compute_budget or {},
                "agent_ids": agent_ids or [],
                "registered_at": now_iso,
                "last_seen": now_iso,
            }
            self._redis.hset(self.HOSTS_HASH, self.host_id, json.dumps(data))
            logger.info(
                "Host registered: %s with %d capabilities",
                self.host_id, len(all_caps),
            )
            return True
        except Exception as e:
            logger.error("Failed to register host %s: %s", self.host_id, e)
            return False

    @staticmethod
    def _discover_model_capabilities() -> List[str]:
        """Auto-discover model capabilities from ModelOrchestrator.

        Returns flat strings like ['tts', 'audio_gen:music_gen', 'stt:realtime']
        that can be matched by get_hosts_with_capability(). Returns an empty
        list when the orchestrator cannot be imported or queried.
        """
        try:
            from integrations.service_tools.model_orchestrator import get_orchestrator
            caps = get_orchestrator().available_capabilities()
            flat: List[str] = []
            for model_type, info in (caps or {}).items():
                # Skip malformed or unavailable models instead of discarding
                # everything discovered so far.
                if not isinstance(info, dict) or not info.get('available'):
                    continue
                flat.append(model_type)
                # Also add type:capability pairs for fine-grained matching
                for cap_name in info.get('capabilities') or ():
                    flat.append(f"{model_type}:{cap_name}")
            return flat
        except Exception as e:
            # Best effort: discovery is optional, registration must proceed.
            logger.debug("Model capability discovery unavailable: %s", e)
            return []

    def deregister_host(self) -> bool:
        """Remove this host from the registry.

        Returns:
            True when the delete call succeeded, False if it raised.
        """
        try:
            self._redis.hdel(self.HOSTS_HASH, self.host_id)
            logger.info("Host deregistered: %s", self.host_id)
            return True
        except Exception as e:
            logger.error("Failed to deregister host %s: %s", self.host_id, e)
            return False

    # ── stale-host purging ────────────────────────────────────────
    def _scan_hosts(self):
        """Read the hosts hash once and classify every entry.

        Returns:
            A ``(live, doomed)`` pair. ``live`` is the list of decoded host
            documents whose ``last_seen`` is within ``STALE_THRESHOLD``.
            ``doomed`` is a list of ``(field, reason)`` pairs for entries
            that are stale, lack a timestamp, or cannot be decoded.

        Raises:
            Whatever the client's ``hgetall`` raises.
        """
        now = self._now()
        live: List[Dict[str, Any]] = []
        doomed: List[Any] = []
        for hid, raw in self._redis.hgetall(self.HOSTS_HASH).items():
            try:
                entry = json.loads(raw)
            except (TypeError, ValueError):
                entry = None
            if not isinstance(entry, dict):
                # Corrupt entry — remove it
                doomed.append((hid, "corrupt entry"))
                continue

            last_seen_str = entry.get("last_seen")
            if not last_seen_str:
                # No timestamp — treat as stale
                doomed.append((hid, "no last_seen timestamp"))
                continue

            try:
                age = (now - self._parse_timestamp(last_seen_str)).total_seconds()
            except (TypeError, ValueError, OverflowError, OSError):
                doomed.append((hid, "unparseable last_seen"))
                continue

            if age > STALE_THRESHOLD:
                doomed.append((hid, f"last seen {last_seen_str}"))
            else:
                live.append(entry)
        return live, doomed

    def _remove_entries(self, doomed) -> int:
        """Delete the given ``(field, reason)`` entries; return how many were removed.

        Stops at the first failing delete, logs it, and returns the count so far.
        """
        purged = 0
        try:
            for hid, reason in doomed:
                self._redis.hdel(self.HOSTS_HASH, hid)
                purged += 1
                logger.info("Purged stale host: %s (%s)", hid, reason)
        except Exception as e:
            logger.error("Failed to purge stale hosts: %s", e)
        return purged

    def _purge_stale(self) -> int:
        """Remove host entries whose ``last_seen`` is older than STALE_THRESHOLD.

        Entries without a timestamp or that cannot be decoded are removed too.

        Returns the number of purged entries.
        """
        try:
            _, doomed = self._scan_hosts()
        except Exception as e:
            logger.error("Failed to purge stale hosts: %s", e)
            return 0
        return self._remove_entries(doomed)

    # ── queries ───────────────────────────────────────────────────
    def get_all_hosts(self) -> List[Dict[str, Any]]:
        """List all registered hosts, purging stale entries in the same pass.

        Returns:
            Decoded host documents that are not stale. Empty when the
            registry cannot be read (the failure is logged).
        """
        try:
            live, doomed = self._scan_hosts()
        except Exception as e:
            logger.error("Failed to list hosts: %s", e)
            return []
        # Deletion is best effort; stale hosts are excluded from the result
        # even if removing them from Redis fails.
        self._remove_entries(doomed)
        return live

    def get_hosts_with_capability(self, capability: str) -> List[Dict[str, Any]]:
        """Find hosts that have a specific capability."""
        return [
            h for h in self.get_all_hosts()
            # ``or ()`` tolerates entries stored with ``"capabilities": null``.
            if capability in (h.get("capabilities") or ())
        ]

    def update_compute_usage(self, usage: Dict[str, Any]) -> None:
        """Report current compute usage (CPU, memory, active tasks).

        Also refreshes this host's ``last_seen``. Does nothing when this host
        has no entry in the registry. Failures are logged at debug level and
        never raised.
        """
        try:
            raw = self._redis.hget(self.HOSTS_HASH, self.host_id)
            if not raw:
                logger.debug(
                    "Host %s is not registered; compute usage not recorded",
                    self.host_id,
                )
                return
            data = json.loads(raw)
            data["compute_usage"] = usage
            data["last_seen"] = self._now().isoformat()
            self._redis.hset(self.HOSTS_HASH, self.host_id, json.dumps(data))
        except Exception as e:
            logger.debug("Failed to update compute usage: %s", e)

    def get_host_info(self, host_id: str) -> Optional[Dict[str, Any]]:
        """Get info for a specific host.

        No staleness check is applied here.

        Returns:
            The decoded entry, or None when the host is absent or the entry
            cannot be read or decoded.
        """
        try:
            raw = self._redis.hget(self.HOSTS_HASH, host_id)
            if raw:
                return json.loads(raw)
        except Exception as e:
            logger.debug("Failed to read host info for %s: %s", host_id, e)
        return None