Coverage for integrations/distributed_agent/coordinator_backends.py: 79.2% (192 statements)
coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Coordinator Backend Abstraction — pluggable task coordination without Redis.
4Three backends for DistributedTaskCoordinator:
51. Redis — fast pub/sub, shared queue across nodes (production multi-node)
62. In-Memory — thread-safe, single-process (default when Redis unavailable)
73. Gossip — HTTP-based peer gossip for multi-node without Redis
9The coordinator doesn't know or care which backend it uses. All three
10provide SmartLedger + TaskLock + TaskVerification + TaskBaseline.
12Philosophy: Redis is ONE transport, not THE transport. Distribution is
13emergent from having peers. A single-node hive works with in-memory.
14A multi-node hive can use Redis OR peer gossip. Every drop counts.
15"""
import json
import logging
import os
import sys
import threading
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

logger = logging.getLogger('hevolve_social')
# ═══════════════════════════════════════════════════════════════
# In-Memory Task Lock (replaces DistributedTaskLock when no Redis)
# ═══════════════════════════════════════════════════════════════
class InMemoryTaskLock:
    """Thread-safe in-process task lock. Same interface as DistributedTaskLock.

    Uses a threading.Lock for atomic claim/release. TTLs are enforced
    lazily: expired entries are overwritten on claim, skipped on lookup,
    and swept by reclaim_stale_tasks(). Sufficient for single-node or testing.
    """

    LOCK_PREFIX = "agent_ledger:lock:"
    DEFAULT_TTL = 300
    def __init__(self):
        self._locks: Dict[str, Dict[str, Any]] = {}  # task_id → {agent_id, expires_at}
        self._mu = threading.Lock()

    def try_claim_task(self, task_id: str, agent_id: str, ttl: Optional[int] = None,
                       heartbeat: bool = False) -> bool:
        # ``heartbeat`` is accepted for signature parity with
        # DistributedTaskLock.try_claim_task — in-memory locks don't
        # expire under Redis pressure, so there's nothing to renew, but
        # callers (task_coordinator) pass heartbeat=True uniformly.
        del heartbeat  # noqa: F841 — explicit-ignore for readers
        ttl = ttl if ttl is not None else self.DEFAULT_TTL
        now = time.time()
        with self._mu:
            existing = self._locks.get(task_id)
            if existing and existing['expires_at'] > now:
                return False  # Already claimed and not expired
            self._locks[task_id] = {
                'agent_id': agent_id,
                'expires_at': now + ttl,
            }
            return True

    def renew(self, task_id: str, agent_id: str, ttl: Optional[int] = None) -> bool:
        """Extend this lock's expiry. Mirrors DistributedTaskLock.renew
        so task_coordinator can treat both backends polymorphically.
        """
        ttl = ttl if ttl is not None else self.DEFAULT_TTL
        with self._mu:
            existing = self._locks.get(task_id)
            if not existing or existing.get('agent_id') != agent_id:
                return False
            existing['expires_at'] = time.time() + ttl
            return True

    def start_heartbeat(self, task_id: str, agent_id: str,
                        ttl: Optional[int] = None) -> bool:
        """No-op for in-memory backend — locks don't drop mid-flight
        without external pressure. Provided for API parity.
        """
        return False

    def stop_heartbeat(self, task_id: str, agent_id: str) -> bool:
        return False

    def stop_all_heartbeats(self, timeout: float = 5.0) -> None:
        return None

    def release_task(self, task_id: str, agent_id: str) -> bool:
        with self._mu:
            existing = self._locks.get(task_id)
            if existing and existing['agent_id'] == agent_id:
                del self._locks[task_id]
                return True
            return False

    def get_task_owner(self, task_id: str) -> Optional[str]:
        with self._mu:
            entry = self._locks.get(task_id)
            if entry and entry['expires_at'] > time.time():
                return entry['agent_id']
            return None

    def is_task_locked(self, task_id: str) -> bool:
        return self.get_task_owner(task_id) is not None

    def reclaim_stale_tasks(self, heartbeat=None,
                            known_task_ids: Optional[List[str]] = None) -> List[str]:
        reclaimed = []
        now = time.time()
        with self._mu:
            expired = [tid for tid, v in self._locks.items() if v['expires_at'] <= now]
            for tid in expired:
                del self._locks[tid]
                reclaimed.append(tid)
        return reclaimed
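# Example (illustrative claim → renew → release round-trip):
#
#   lock = InMemoryTaskLock()
#   lock.try_claim_task('task-1', 'agent-a', ttl=60)   # → True
#   lock.try_claim_task('task-1', 'agent-b')           # → False, already held
#   lock.renew('task-1', 'agent-a')                    # → True, expiry extended
#   lock.release_task('task-1', 'agent-a')             # → True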
# ═══════════════════════════════════════════════════════════════
# In-Memory Host Registry (replaces RegionalHostRegistry when no Redis)
# ═══════════════════════════════════════════════════════════════
class InMemoryHostRegistry:
    """Thread-safe in-process host registry. Same interface as RegionalHostRegistry."""

    def __init__(self, host_id: str, host_url: str = ""):
        self.host_id = host_id
        self.host_url = host_url
        self._hosts: Dict[str, Dict[str, Any]] = {}
        self._mu = threading.Lock()

    def register_host(self, capabilities: List[str],
                      compute_budget: Optional[Dict[str, Any]] = None,
                      agent_ids: Optional[List[str]] = None) -> bool:
        with self._mu:
            self._hosts[self.host_id] = {
                "host_id": self.host_id,
                "host_url": self.host_url,
                "capabilities": capabilities,
                "compute_budget": compute_budget or {},
                "agent_ids": agent_ids or [],
                "registered_at": datetime.now().isoformat(),
                "last_seen": datetime.now().isoformat(),
            }
        return True

    def deregister_host(self) -> bool:
        with self._mu:
            self._hosts.pop(self.host_id, None)
        return True

    def get_all_hosts(self) -> List[Dict[str, Any]]:
        with self._mu:
            return list(self._hosts.values())

    def get_hosts_with_capability(self, capability: str) -> List[Dict[str, Any]]:
        return [h for h in self.get_all_hosts()
                if capability in h.get("capabilities", [])]

    def get_host_info(self, host_id: str) -> Optional[Dict[str, Any]]:
        with self._mu:
            return self._hosts.get(host_id)

    def update_compute_usage(self, usage: Dict[str, Any]) -> None:
        with self._mu:
            if self.host_id in self._hosts:
                self._hosts[self.host_id]["compute_usage"] = usage
                self._hosts[self.host_id]["last_seen"] = datetime.now().isoformat()
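# Example (illustrative register/query cycle):
#
#   registry = InMemoryHostRegistry(host_id='node-1', host_url='http://127.0.0.1:8080')
#   registry.register_host(capabilities=['inference'], agent_ids=['agent-a'])
#   registry.get_hosts_with_capability('inference')   # → [{'host_id': 'node-1', ...}]
#   registry.update_compute_usage({'cpu_pct': 42})
#   registry.deregister_host()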
# ═══════════════════════════════════════════════════════════════
# Gossip Task Bridge — multi-node without Redis
# ═══════════════════════════════════════════════════════════════
class GossipTaskBridge:
    """Propagate tasks to peers via the existing HTTP gossip protocol.

    When a node submits a goal, the bridge announces it to known peers
    via POST /api/distributed/tasks/announce. Peers add the tasks to
    their local coordinator and can claim them.

    Delivery is best-effort: announcements are fire-and-forget POSTs,
    and peers additionally pull each other's task lists during gossip
    exchange rounds — there is no push-based pub/sub channel.
    """

    def __init__(self):
        self._announced: Dict[str, float] = {}  # goal_id → timestamp
        self._mu = threading.Lock()

    def announce_goal(self, goal_id: str, objective: str,
                      tasks: List[Dict], context: Dict) -> int:
        """Announce a new goal to all known peers via HTTP POST.

        If the peer has an X25519 public key, the payload is E2E encrypted
        so neither network observers nor the hosting node see the task data.
        Falls back to plaintext for peers without X25519 keys (old nodes).

        Returns the number of peers notified.
        """
        notified = 0
        peers = self._get_active_peers()

        payload = {
            'goal_id': goal_id,
            'objective': objective,
            'tasks': tasks,
            'context': context,
        }

        for peer in peers:
            peer_url = peer.get('host_url') or peer.get('url', '')
            if not peer_url:
                continue
            try:
                from core.http_pool import pooled_post
                send_payload = payload
                peer_x25519 = peer.get('x25519_public', '')
                if peer_x25519:
                    try:
                        from security.channel_encryption import encrypt_json_for_peer
                        send_payload = {'encrypted': True,
                                        'envelope': encrypt_json_for_peer(payload, peer_x25519)}
                    except Exception:
                        pass  # Encryption unavailable, send plaintext
                resp = pooled_post(
                    f'{peer_url}/api/distributed/tasks/announce',
                    json=send_payload,
                    timeout=5,
                )
                if resp.status_code == 200:
                    notified += 1
            except Exception:
                pass  # Peer unreachable — best-effort, skip it

        with self._mu:
            self._announced[goal_id] = time.time()

        logger.debug(f"Gossip: announced goal {goal_id} to {notified}/{len(peers)} peers")
        return notified

    def pull_tasks_from_peers(self) -> List[Dict]:
        """Pull unclaimed tasks from known peers (gossip exchange).

        If a response is E2E encrypted, decrypts it with our X25519 key.
        """
        tasks = []
        peers = self._get_active_peers()

        for peer in peers:
            peer_url = peer.get('host_url') or peer.get('url', '')
            if not peer_url:
                continue
            try:
                from core.http_pool import pooled_get
                resp = pooled_get(
                    f'{peer_url}/api/distributed/tasks/available',
                    timeout=5,
                )
                if resp.status_code == 200:
                    data = resp.json()
                    # Handle E2E encrypted response
                    if data.get('encrypted') and data.get('envelope'):
                        try:
                            from security.channel_encryption import decrypt_json_from_peer
                            decrypted = decrypt_json_from_peer(data['envelope'])
                            if decrypted:
                                tasks.extend(decrypted.get('tasks', []))
                                continue
                        except Exception:
                            pass  # Decryption failed, try plaintext
                    tasks.extend(data.get('tasks', []))
            except Exception:
                pass  # Peer unreachable — skip

        return tasks

    @staticmethod
    def _get_active_peers() -> List[Dict]:
        """Get active peers from the PeerNode table.

        Includes x25519_public for E2E encryption of task payloads.
        """
        try:
            from integrations.social.models import get_db, PeerNode
            db = get_db()
            try:
                peers = db.query(PeerNode).filter(
                    PeerNode.status == 'active'
                ).all()
                return [{'host_url': p.url, 'node_id': p.node_id,
                         'x25519_public': getattr(p, 'x25519_public', '') or ''}
                        for p in peers if p.url]
            finally:
                db.close()
        except Exception:
            return []
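# Example (illustrative — the task dict shape below is an assumption; the
# bridge forwards whatever task dicts the coordinator produces):
#
#   bridge = GossipTaskBridge()
#   notified = bridge.announce_goal(
#       goal_id='goal-1', objective='summarize repo',
#       tasks=[{'task_id': 't1', 'description': 'read README'}], context={})
#   inbound = bridge.pull_tasks_from_peers()   # list of task dicts from peers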
# ═══════════════════════════════════════════════════════════════
# Backend Factory — single function to create the best available backend
# ═══════════════════════════════════════════════════════════════
def create_coordinator(agent_id: Optional[str] = None):
    """Create a DistributedTaskCoordinator with the best available backend.

    Priority:
    1. Redis (if reachable) — shared across nodes, pub/sub, distributed locks
    2. In-memory (fallback) — single-node, thread-safe, no external deps

    The gossip bridge is always attached when peers exist, regardless of
    backend. This allows in-memory nodes to announce tasks to peers.

    Returns:
        (coordinator, backend_type) tuple, or (None, None) if creation fails
    """
    agent_id = agent_id or os.environ.get('HEVOLVE_AGENT_ID', 'local')

    # In bundled/desktop mode, Redis is never available — skip the attempt
    # entirely to avoid a 1-30s timeout stall on every startup.
    if not os.environ.get('NUNBA_BUNDLED'):
        coordinator = _try_redis_backend(agent_id)
        if coordinator:
            return coordinator, 'redis'

    # Fall back to in-memory + JSON file backend
    coordinator = _create_inmemory_backend(agent_id)
    if coordinator:
        return coordinator, 'inmemory'

    return None, None
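# Environment knobs read by the factory and its helpers below:
#   NUNBA_BUNDLED      — truthy: skip the Redis probe entirely (desktop mode)
#   HEVOLVE_AGENT_ID   — default agent identity ('local' if unset)
#   REDIS_HOST/PORT    — where _try_redis_backend probes (localhost:6379)
#   HEVOLVE_DB_PATH    — anchors the JSON storage dir for the in-memory backend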
def _try_redis_backend(agent_id: str):
    """Try to create a coordinator with the Redis backend."""
    try:
        import redis
        host = os.environ.get('REDIS_HOST', 'localhost')
        port = int(os.environ.get('REDIS_PORT', 6379))

        # Quick connectivity check — fail fast, no retries
        r = redis.Redis(host=host, port=port, decode_responses=True,
                        socket_connect_timeout=1, socket_timeout=1,
                        retry_on_timeout=False)
        r.ping()

        from agent_ledger import SmartLedger, RedisBackend
        from agent_ledger.distributed import DistributedTaskLock
        from agent_ledger.verification import TaskVerification, TaskBaseline

        backend = RedisBackend(host=host, port=port)
        shared_redis = backend.redis_client

        ledger = SmartLedger(
            agent_id=agent_id,
            session_id='distributed',
            backend=backend,
        )
        ledger.enable_pubsub(shared_redis)

        from .task_coordinator import DistributedTaskCoordinator
        coordinator = DistributedTaskCoordinator(
            ledger=ledger,
            task_lock=DistributedTaskLock(shared_redis),
            verifier=TaskVerification(shared_redis),
            baseline=TaskBaseline(backend),
        )
        logger.info("Distributed coordinator: Redis backend active")
        return coordinator

    except Exception as e:
        logger.debug(f"Redis backend unavailable: {e}")
        return None
def _create_inmemory_backend(agent_id: str):
    """Create a coordinator with the in-memory/JSON backend (no Redis needed)."""
    try:
        from agent_ledger import SmartLedger, JSONBackend
        from agent_ledger.verification import TaskVerification, TaskBaseline

        # Use the agent_data directory for JSON persistence (must be absolute
        # and writable — never use relative paths, which resolve to the
        # read-only install dir in bundled mode).
        db_path = os.environ.get('HEVOLVE_DB_PATH', '')
        if db_path and db_path != ':memory:' and os.path.isabs(db_path):
            storage_dir = os.path.join(os.path.dirname(db_path), 'distributed_tasks')
        else:
            # Always fall back to a user-writable data dir
            try:
                from core.platform_paths import get_agent_data_dir
                storage_dir = os.path.join(get_agent_data_dir(), 'distributed_tasks')
            except ImportError:
                storage_dir = os.path.join(os.path.expanduser('~'), 'Documents',
                                           'Nunba', 'data', 'agent_data',
                                           'distributed_tasks')
        os.makedirs(storage_dir, exist_ok=True)

        backend = JSONBackend(storage_dir=storage_dir)
        ledger = SmartLedger(
            agent_id=agent_id,
            session_id='distributed',
            backend=backend,
        )

        from .task_coordinator import DistributedTaskCoordinator
        coordinator = DistributedTaskCoordinator(
            ledger=ledger,
            task_lock=InMemoryTaskLock(),
            verifier=TaskVerification(),   # In-memory verification
            baseline=TaskBaseline(),       # In-memory baselines
        )
        logger.info("Distributed coordinator: in-memory backend active (no Redis)")
        return coordinator

    except Exception as e:
        logger.warning(f"In-memory backend creation failed: {e}")
        return None
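# Manual smoke test (sketch — the relative import of task_coordinator means
# these calls must run inside the package, not as a standalone script):
#
#   coord, kind = create_coordinator(agent_id='smoke-test')
#   print(kind)   # 'redis', 'inmemory', or None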