Coverage for integrations/distributed_agent/coordinator_backends.py: 79.2%
192 statements

coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Coordinator Backend Abstraction — pluggable task coordination without Redis. 

3 

4Three backends for DistributedTaskCoordinator: 

51. Redis — fast pub/sub, shared queue across nodes (production multi-node) 

62. In-Memory — thread-safe, single-process (default when Redis unavailable) 

73. Gossip — HTTP-based peer gossip for multi-node without Redis 

8 

9The coordinator doesn't know or care which backend it uses. All three 

10provide SmartLedger + TaskLock + TaskVerification + TaskBaseline. 

11 

12Philosophy: Redis is ONE transport, not THE transport. Distribution is 

13emergent from having peers. A single-node hive works with in-memory. 

14A multi-node hive can use Redis OR peer gossip. Every drop counts. 

15""" 

import logging
import os
import threading
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

logger = logging.getLogger('hevolve_social')


# ═══════════════════════════════════════════════════════════════
# In-Memory Task Lock (replaces DistributedTaskLock when no Redis)
# ═══════════════════════════════════════════════════════════════

class InMemoryTaskLock:
    """Thread-safe in-process task lock. Same interface as DistributedTaskLock.

    Uses a threading.Lock for atomic claim/release. TTL is enforced lazily:
    expired entries are treated as unclaimed on the next claim/lookup and
    swept by reclaim_stale_tasks(). Sufficient for single-node or testing.
    """

    LOCK_PREFIX = "agent_ledger:lock:"  # unused here; kept for parity with DistributedTaskLock
    DEFAULT_TTL = 300

    def __init__(self):
        self._locks: Dict[str, Dict[str, Any]] = {}  # task_id → {agent_id, expires_at}
        self._mu = threading.Lock()

    def try_claim_task(self, task_id: str, agent_id: str, ttl: Optional[int] = None,
                       heartbeat: bool = False) -> bool:
        # ``heartbeat`` is accepted for signature parity with
        # DistributedTaskLock.try_claim_task — in-memory locks don't
        # expire under Redis pressure, so there's nothing to renew, but
        # callers (task_coordinator) pass heartbeat=True uniformly.
        del heartbeat  # noqa: F841 — explicit-ignore for readers
        ttl = ttl if ttl is not None else self.DEFAULT_TTL
        now = time.time()
        with self._mu:
            existing = self._locks.get(task_id)
            if existing and existing['expires_at'] > now:
                return False  # Already claimed and not expired
            self._locks[task_id] = {
                'agent_id': agent_id,
                'expires_at': now + ttl,
            }
            return True

    def renew(self, task_id: str, agent_id: str, ttl: Optional[int] = None) -> bool:
        """Extend this lock's expiry. Mirrors DistributedTaskLock.renew
        so task_coordinator can treat both backends polymorphically.
        """
        ttl = ttl if ttl is not None else self.DEFAULT_TTL
        with self._mu:
            existing = self._locks.get(task_id)
            if not existing or existing.get('agent_id') != agent_id:
                return False
            existing['expires_at'] = time.time() + ttl
            return True

    def start_heartbeat(self, task_id: str, agent_id: str,
                        ttl: Optional[int] = None) -> bool:
        """No-op for in-memory backend — locks don't drop mid-flight
        without external pressure. Provided for API parity.
        """
        return False

    def stop_heartbeat(self, task_id: str, agent_id: str) -> bool:
        return False

    def stop_all_heartbeats(self, timeout: float = 5.0) -> None:
        return None

    def release_task(self, task_id: str, agent_id: str) -> bool:
        with self._mu:
            existing = self._locks.get(task_id)
            if existing and existing['agent_id'] == agent_id:
                del self._locks[task_id]
                return True
            return False

    def get_task_owner(self, task_id: str) -> Optional[str]:
        with self._mu:
            entry = self._locks.get(task_id)
            if entry and entry['expires_at'] > time.time():
                return entry['agent_id']
            return None

    def is_task_locked(self, task_id: str) -> bool:
        return self.get_task_owner(task_id) is not None

    def reclaim_stale_tasks(self, heartbeat=None,
                            known_task_ids: Optional[List[str]] = None) -> List[str]:
        # ``heartbeat`` and ``known_task_ids`` exist for signature parity
        # with DistributedTaskLock; the in-memory sweep only needs expiry.
        reclaimed = []
        now = time.time()
        with self._mu:
            expired = [tid for tid, v in self._locks.items() if v['expires_at'] <= now]
            for tid in expired:
                del self._locks[tid]
                reclaimed.append(tid)
        return reclaimed
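
# Illustrative sketch (added for documentation; never called by this module):
# the claim/renew/release lifecycle the coordinator relies on. The task and
# agent IDs are hypothetical placeholders.
def _example_lock_lifecycle() -> None:
    lock = InMemoryTaskLock()
    assert lock.try_claim_task('task-1', 'agent-a', ttl=60)  # first claim wins
    assert not lock.try_claim_task('task-1', 'agent-b')      # contested claim fails
    assert lock.get_task_owner('task-1') == 'agent-a'
    assert lock.renew('task-1', 'agent-a')                   # owner can extend the TTL
    assert not lock.renew('task-1', 'agent-b')               # non-owner cannot
    assert lock.release_task('task-1', 'agent-a')
    assert not lock.is_task_locked('task-1')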


# ═══════════════════════════════════════════════════════════════
# In-Memory Host Registry (replaces RegionalHostRegistry when no Redis)
# ═══════════════════════════════════════════════════════════════

class InMemoryHostRegistry:
    """Thread-safe in-process host registry. Same interface as RegionalHostRegistry."""

    def __init__(self, host_id: str, host_url: str = ""):
        self.host_id = host_id
        self.host_url = host_url
        self._hosts: Dict[str, Dict[str, Any]] = {}
        self._mu = threading.Lock()

    def register_host(self, capabilities: List[str],
                      compute_budget: Optional[Dict[str, Any]] = None,
                      agent_ids: Optional[List[str]] = None) -> bool:
        with self._mu:
            self._hosts[self.host_id] = {
                "host_id": self.host_id,
                "host_url": self.host_url,
                "capabilities": capabilities,
                "compute_budget": compute_budget or {},
                "agent_ids": agent_ids or [],
                "registered_at": datetime.now().isoformat(),
                "last_seen": datetime.now().isoformat(),
            }
            return True

    def deregister_host(self) -> bool:
        with self._mu:
            self._hosts.pop(self.host_id, None)
            return True

    def get_all_hosts(self) -> List[Dict[str, Any]]:
        with self._mu:
            return list(self._hosts.values())

    def get_hosts_with_capability(self, capability: str) -> List[Dict[str, Any]]:
        return [h for h in self.get_all_hosts()
                if capability in h.get("capabilities", [])]

    def get_host_info(self, host_id: str) -> Optional[Dict[str, Any]]:
        with self._mu:
            return self._hosts.get(host_id)

    def update_compute_usage(self, usage: Dict[str, Any]) -> None:
        with self._mu:
            if self.host_id in self._hosts:
                self._hosts[self.host_id]["compute_usage"] = usage
                self._hosts[self.host_id]["last_seen"] = datetime.now().isoformat()
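
# Illustrative sketch (never called by this module): self-registration plus a
# capability query. The host ID, URL, and the 'embeddings' capability name are
# hypothetical placeholders.
def _example_host_registry() -> None:
    registry = InMemoryHostRegistry(host_id='host-1', host_url='http://127.0.0.1:8080')
    registry.register_host(capabilities=['embeddings'], agent_ids=['agent-a'])
    matches = registry.get_hosts_with_capability('embeddings')
    assert matches and matches[0]['host_id'] == 'host-1'
    registry.update_compute_usage({'cpu_percent': 12.5})  # also refreshes last_seen
    registry.deregister_host()
    assert registry.get_all_hosts() == []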


# ═══════════════════════════════════════════════════════════════
# Gossip Task Bridge — multi-node without Redis
# ═══════════════════════════════════════════════════════════════

class GossipTaskBridge:
    """Propagate tasks to peers via existing HTTP gossip protocol.

    When a node submits a goal, the bridge announces it to known peers
    via POST /api/distributed/tasks/announce. Peers add the tasks to
    their local coordinator and can claim them.

    Beyond that initial announcement (a one-shot HTTP push), propagation
    is pull-based: peers poll each other's task lists via gossip exchange
    rounds, not push-based pub/sub.
    """

    def __init__(self):
        self._announced: Dict[str, float] = {}  # goal_id → timestamp
        self._mu = threading.Lock()

    def announce_goal(self, goal_id: str, objective: str,
                      tasks: List[Dict], context: Dict) -> int:
        """Announce a new goal to all known peers via HTTP POST.

        If the peer has an X25519 public key, the payload is E2E encrypted
        so neither network observers nor the hosting node see the task data.
        Falls back to plaintext for peers without X25519 keys (old nodes).

        Returns number of peers notified.
        """
        notified = 0
        peers = self._get_active_peers()

        payload = {
            'goal_id': goal_id,
            'objective': objective,
            'tasks': tasks,
            'context': context,
        }

        for peer in peers:
            peer_url = peer.get('host_url') or peer.get('url', '')
            if not peer_url:
                continue
            try:
                from core.http_pool import pooled_post
                send_payload = payload
                peer_x25519 = peer.get('x25519_public', '')
                if peer_x25519:
                    try:
                        from security.channel_encryption import encrypt_json_for_peer
                        send_payload = {'encrypted': True,
                                        'envelope': encrypt_json_for_peer(payload, peer_x25519)}
                    except Exception:
                        pass  # Encryption unavailable, send plaintext
                resp = pooled_post(
                    f'{peer_url}/api/distributed/tasks/announce',
                    json=send_payload,
                    timeout=5,
                )
                if resp.status_code == 200:
                    notified += 1
            except Exception:
                pass  # Peer unreachable or announce rejected; best-effort only

        with self._mu:
            self._announced[goal_id] = time.time()

        logger.debug(f"Gossip: announced goal {goal_id} to {notified}/{len(peers)} peers")
        return notified

    def pull_tasks_from_peers(self) -> List[Dict]:
        """Pull unclaimed tasks from known peers (gossip exchange).

        If the response is E2E encrypted, decrypts it using our X25519 key.
        """
        tasks = []
        peers = self._get_active_peers()

        for peer in peers:
            peer_url = peer.get('host_url') or peer.get('url', '')
            if not peer_url:
                continue
            try:
                from core.http_pool import pooled_get
                resp = pooled_get(
                    f'{peer_url}/api/distributed/tasks/available',
                    timeout=5,
                )
                if resp.status_code == 200:
                    data = resp.json()
                    # Handle E2E encrypted response
                    if data.get('encrypted') and data.get('envelope'):
                        try:
                            from security.channel_encryption import decrypt_json_from_peer
                            decrypted = decrypt_json_from_peer(data['envelope'])
                            if decrypted:
                                tasks.extend(decrypted.get('tasks', []))
                                continue
                        except Exception:
                            pass  # Decryption failed, try plaintext
                    tasks.extend(data.get('tasks', []))
            except Exception:
                pass  # Peer unreachable; skip and try the next one

        return tasks

    @staticmethod
    def _get_active_peers() -> List[Dict]:
        """Get active peers from the PeerNode table.

        Includes x25519_public for E2E encryption of task payloads.
        """
        try:
            from integrations.social.models import get_db, PeerNode
            db = get_db()
            try:
                peers = db.query(PeerNode).filter(
                    PeerNode.status == 'active'
                ).all()
                return [{'host_url': p.url, 'node_id': p.node_id,
                         'x25519_public': getattr(p, 'x25519_public', '') or ''}
                        for p in peers if p.url]
            finally:
                db.close()
        except Exception:
            return []
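
# Illustrative sketch (never called by this module): one announce/pull round.
# The goal ID, objective, and task dicts are hypothetical placeholders; the
# real task shape is defined by DistributedTaskCoordinator. With no active
# rows in the PeerNode table, both calls are harmless no-ops.
def _example_gossip_round() -> None:
    bridge = GossipTaskBridge()
    notified = bridge.announce_goal(
        goal_id='goal-1',
        objective="summarize the day's ledger entries",
        tasks=[{'task_id': 'task-1', 'description': 'collect entries'}],
        context={},
    )
    logger.debug(f"announced to {notified} peer(s)")
    for task in bridge.pull_tasks_from_peers():
        logger.debug(f"peer task available: {task.get('task_id')}")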


# ═══════════════════════════════════════════════════════════════
# Backend Factory — single function to create the best available backend
# ═══════════════════════════════════════════════════════════════

def create_coordinator(agent_id: Optional[str] = None):
    """Create a DistributedTaskCoordinator with the best available backend.

    Priority:
    1. Redis (if reachable) — shared across nodes, pub/sub, distributed locks
    2. In-memory (fallback) — single-node, thread-safe, no external deps

    Gossip bridge is always attached when peers exist, regardless of backend.
    This allows in-memory nodes to announce tasks to peers.

    Returns:
        (coordinator, backend_type) tuple, or (None, None) if creation fails
    """
    agent_id = agent_id or os.environ.get('HEVOLVE_AGENT_ID', 'local')

    # In bundled/desktop mode, Redis is never available — skip the attempt
    # entirely to avoid a 1-30s timeout stall on every startup.
    if not os.environ.get('NUNBA_BUNDLED'):
        coordinator = _try_redis_backend(agent_id)
        if coordinator:
            return coordinator, 'redis'

    # Fall back to in-memory + JSON file backend
    coordinator = _create_inmemory_backend(agent_id)
    if coordinator:
        return coordinator, 'inmemory'

    return None, None
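
# Illustrative sketch (never called by this module): typical bootstrap. The
# agent ID is a hypothetical placeholder; 'redis'/'inmemory' are the actual
# backend_type values returned above.
def _example_bootstrap() -> None:
    coordinator, backend_type = create_coordinator(agent_id='agent-a')
    if coordinator is None:
        raise RuntimeError("no coordinator backend could be created")
    logger.info(f"coordinator ready (backend={backend_type})")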


def _try_redis_backend(agent_id: str):
    """Try to create coordinator with Redis backend."""
    try:
        import redis
        host = os.environ.get('REDIS_HOST', 'localhost')
        port = int(os.environ.get('REDIS_PORT', 6379))

        # Quick connectivity check — fail fast, no retries
        r = redis.Redis(host=host, port=port, decode_responses=True,
                        socket_connect_timeout=1, socket_timeout=1,
                        retry_on_timeout=False)
        r.ping()

        from agent_ledger import SmartLedger, RedisBackend
        from agent_ledger.distributed import DistributedTaskLock
        from agent_ledger.verification import TaskVerification, TaskBaseline

        backend = RedisBackend(host=host, port=port)
        shared_redis = backend.redis_client

        ledger = SmartLedger(
            agent_id=agent_id,
            session_id='distributed',
            backend=backend,
        )
        ledger.enable_pubsub(shared_redis)

        from .task_coordinator import DistributedTaskCoordinator
        coordinator = DistributedTaskCoordinator(
            ledger=ledger,
            task_lock=DistributedTaskLock(shared_redis),
            verifier=TaskVerification(shared_redis),
            baseline=TaskBaseline(backend),
        )
        logger.info("Distributed coordinator: Redis backend active")
        return coordinator

    except Exception as e:
        logger.debug(f"Redis backend unavailable: {e}")
        return None
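
# Configuration note: the probe above is driven entirely by the REDIS_HOST /
# REDIS_PORT environment variables read at the top of the function, e.g.
# (hypothetical values and entry point)
#
#   REDIS_HOST=redis.internal REDIS_PORT=6380 python app.py
#
# and the 1-second socket timeouts bound the cost of a failed probe before
# create_coordinator falls back to the in-memory backend.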


def _create_inmemory_backend(agent_id: str):
    """Create coordinator with in-memory/JSON backend (no Redis needed)."""
    try:
        from agent_ledger import SmartLedger, JSONBackend
        from agent_ledger.verification import TaskVerification, TaskBaseline

        # Use agent_data directory for JSON persistence (must be absolute
        # and writable — never use relative paths, which resolve to the
        # read-only install dir in bundled mode).
        db_path = os.environ.get('HEVOLVE_DB_PATH', '')
        if db_path and db_path != ':memory:' and os.path.isabs(db_path):
            storage_dir = os.path.join(os.path.dirname(db_path), 'distributed_tasks')
        else:
            # Always fall back to user-writable data dir
            try:
                from core.platform_paths import get_agent_data_dir
                storage_dir = os.path.join(get_agent_data_dir(), 'distributed_tasks')
            except ImportError:
                storage_dir = os.path.join(os.path.expanduser('~'), 'Documents',
                                           'Nunba', 'data', 'agent_data',
                                           'distributed_tasks')
        os.makedirs(storage_dir, exist_ok=True)

        backend = JSONBackend(storage_dir=storage_dir)
        ledger = SmartLedger(
            agent_id=agent_id,
            session_id='distributed',
            backend=backend,
        )

        from .task_coordinator import DistributedTaskCoordinator
        coordinator = DistributedTaskCoordinator(
            ledger=ledger,
            task_lock=InMemoryTaskLock(),
            verifier=TaskVerification(),  # In-memory verification
            baseline=TaskBaseline(),      # In-memory baselines
        )
        logger.info("Distributed coordinator: in-memory backend active (no Redis)")
        return coordinator

    except Exception as e:
        logger.warning(f"In-memory backend creation failed: {e}")
        return None
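
# Storage layout note (paths are hypothetical examples): given the branches
# above, JSON state lands in
#   HEVOLVE_DB_PATH=/var/lib/nunba/hive.db  ->  /var/lib/nunba/distributed_tasks
#   unset, ':memory:', or a relative path   ->  <agent data dir>/distributed_tasks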