Coverage for integrations / distributed_agent / api.py: 29.4%

214 statements  

coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

"""
Distributed Agent API — generic endpoints for ANY agent type.

Not tied to coding agents. Any agent (coding, research, music, teaching)
can use these endpoints for distributed task coordination, host registration,
verification, and baselining.

Backend-agnostic: coordinator auto-selects Redis when available, falls
back to in-memory + JSON. No external dependency required for single-node.
Multi-node without Redis uses peer gossip (HTTP REST).

Routes: /api/distributed/*
Blueprint: distributed_agent_bp
"""

import os
import time
import logging
import threading
from collections import deque
from flask import Blueprint, request, jsonify, g

from integrations.social.auth import require_auth, require_admin

logger = logging.getLogger(__name__)

distributed_agent_bp = Blueprint('distributed_agent', __name__)

# Track which backend is active
_coordinator_backend_type = None

# ─── Delegation subscriber state ───────────────────────────────────
# When Redis is available, a daemon thread subscribes to
# `LedgerPubSub.CHANNEL_DELEGATION` so distributed nodes see real-time
# delegation broadcasts from create_recipe / reuse_recipe ledgers.
# Ring buffer capped at 100 so memory cannot grow unbounded.
# Gated entirely on Redis presence — no Redis → no subscriber, and the
# rest of /api/distributed/* keeps working via in-memory coordinator.
_delegation_sub_started: bool = False
_delegation_sub_lock = threading.Lock()
_recent_delegations: "deque[dict]" = deque(maxlen=100)

def _ensure_delegation_subscriber() -> bool:
    """Lazy-start the CHANNEL_DELEGATION subscriber.

    Idempotent (the `_delegation_sub_started` flag prevents double
    starts). Gated on:

    * Redis reachable via `_get_redis_client()` + `.ping()`.
    * `agent_ledger.pubsub` importable.

    Fails silently at debug level. Returns True iff the subscriber is
    active after the call; callers (endpoints, tests) can use the
    return value to surface status without raising.
    """
    global _delegation_sub_started
    with _delegation_sub_lock:
        if _delegation_sub_started:
            return True
        redis_client = _get_redis_client()
        if not redis_client:
            return False
        try:
            redis_client.ping()
        except Exception as e:
            logger.debug(f"[delegation_subscriber] redis ping failed: {e}")
            return False
        try:
            from agent_ledger.pubsub import LedgerPubSub
        except Exception as e:
            logger.debug(f"[delegation_subscriber] pubsub import failed: {e}")
            return False
        try:
            node_id = os.environ.get('HEVOLVE_NODE_ID', 'distributed_node')
            pubsub = LedgerPubSub(redis_client, agent_id=node_id)

            def _on_message(channel: str, data: dict) -> None:
                try:
                    _recent_delegations.append({
                        'channel': channel,
                        'data': data,
                        'received_at': time.time(),
                    })
                    logger.info(
                        f"[delegation_subscriber] {channel}: "
                        f"{str(data)[:160]}"
                    )
                except Exception as e:
                    logger.debug(
                        f"[delegation_subscriber] handler error: {e}"
                    )

            pubsub.subscribe(
                [LedgerPubSub.CHANNEL_DELEGATION], _on_message,
            )
            _delegation_sub_started = True
            logger.info(
                f"[delegation_subscriber] started as node={node_id}"
            )
            return True
        except Exception as e:
            logger.debug(f"[delegation_subscriber] start failed: {e}")
            return False
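
# Illustrative sketch (hypothetical helper, not part of the blueprint): how a
# test or admin script might exercise the lazy subscriber and inspect the ring
# buffer. With no reachable Redis the call simply returns False and the buffer
# stays empty.
def _example_delegation_probe() -> dict:
    active = _ensure_delegation_subscriber()  # idempotent; safe to call repeatedly
    return {
        'subscriber_active': active,
        'buffered': len(_recent_delegations),      # ring buffer, capped at 100
        'latest': list(_recent_delegations)[-1:],  # newest broadcast, if any
    }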

@distributed_agent_bp.before_request
def _bp_before_request_ensure_subscriber():
    """One-shot subscriber bootstrap on the first /api/distributed/*
    request. After it succeeds the guard short-circuits cheaply —
    no per-request overhead in steady state."""
    if not _delegation_sub_started:
        _ensure_delegation_subscriber()


# ─── Shared helpers ───

def _get_redis_client():
    """Get Redis client from environment or return None."""
    try:
        import redis
        host = os.environ.get('REDIS_HOST', 'localhost')
        port = int(os.environ.get('REDIS_PORT', 6379))
        return redis.Redis(host=host, port=port, decode_responses=True,
                           socket_connect_timeout=1, socket_timeout=1,
                           retry_on_timeout=False)
    except Exception:
        return None
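
# Illustrative sketch (hypothetical values): the client above is driven purely
# by the environment. With neither variable set it tries localhost:6379; any
# failure (missing redis package, unreachable server, timeout) degrades to None
# and the coordinator / host registry fall back to their in-memory backends.
#
#     os.environ.setdefault('REDIS_HOST', '10.0.0.12')  # hypothetical host
#     os.environ.setdefault('REDIS_PORT', '6380')       # hypothetical port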

def _get_coordinator():
    """Lazy-init DistributedTaskCoordinator (singleton).

    Backend priority: Redis → in-memory. No Redis required.
    Single-node hives work with in-memory backend.
    """
    global _coordinator_backend_type
    if not hasattr(_get_coordinator, '_instance'):
        from .coordinator_backends import create_coordinator
        coordinator, backend_type = create_coordinator()
        _get_coordinator._instance = coordinator
        _coordinator_backend_type = backend_type
    return _get_coordinator._instance


def get_coordinator_backend_type() -> str:
    """Return which backend the coordinator is using ('redis', 'inmemory', or None)."""
    return _coordinator_backend_type


def _get_host_registry(host_id: str = "query", host_url: str = ""):
    """Get host registry (Redis-backed or in-memory)."""
    redis_client = _get_redis_client()
    if redis_client:
        from .host_registry import RegionalHostRegistry
        return RegionalHostRegistry(redis_client, host_id=host_id, host_url=host_url)
    else:
        from .coordinator_backends import InMemoryHostRegistry
        # Singleton in-memory registry
        if not hasattr(_get_host_registry, '_instance'):
            _get_host_registry._instance = InMemoryHostRegistry(
                host_id=host_id, host_url=host_url
            )
        return _get_host_registry._instance


def _no_coordinator():
    return jsonify({
        'success': False,
        'error': 'Coordinator not available (neither Redis nor in-memory could initialize)',
    }), 503

# ─── Task announcement (gossip-based distribution) ───

@distributed_agent_bp.route('/api/distributed/tasks/announce', methods=['POST'])
@require_auth
def announce_tasks():
    """Receive task announcements from peer nodes (gossip protocol).
    Supports E2E encrypted envelopes."""
    coordinator = _get_coordinator()
    if not coordinator:
        return _no_coordinator()

    data = request.get_json() or {}
    # Decrypt E2E encrypted task announcement
    if data.get('encrypted') and data.get('envelope'):
        try:
            from security.channel_encryption import decrypt_json_from_peer
            decrypted = decrypt_json_from_peer(data['envelope'])
            if decrypted:
                data = decrypted
        except Exception:
            pass  # Decryption failed, try using data as-is
    goal_id = data.get('goal_id')
    objective = data.get('objective', '')
    tasks = data.get('tasks', [])
    context = data.get('context', {})

    sender_host = data.get('sender_host', '')

    if not goal_id or not tasks:
        return jsonify({'success': False, 'error': 'goal_id and tasks required'}), 400

    # Add tasks to local coordinator (idempotent — skip if goal already exists)
    try:
        existing = coordinator.get_goal_progress(goal_id)
        if 'error' not in existing:
            return jsonify({
                'success': True,
                'message': 'goal already known',
                'goal_id': goal_id,
                'local_goal_id': goal_id,
            })
    except Exception:
        pass

    try:
        # Preserve the sender's goal_id so multi-node coordination can
        # correlate tasks across peers. submit_goal accepts an optional
        # goal_id when the caller already has one (gossip case).
        local_goal_id = coordinator.submit_goal(objective, tasks, context, goal_id=goal_id)
        return jsonify({
            'success': True,
            'goal_id': goal_id,
            'local_goal_id': local_goal_id,
            'sender_host': sender_host,
        })
    except TypeError:
        # Fallback: coordinator.submit_goal does not accept goal_id kwarg yet
        local_goal_id = coordinator.submit_goal(objective, tasks, context)
        return jsonify({
            'success': True,
            'goal_id': goal_id,
            'local_goal_id': local_goal_id,
            'sender_host': sender_host,
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
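
# Illustrative sketch of the sender side of the gossip announcement (not part
# of this module). Assumes the `requests` package, a hypothetical peer URL and
# token, and bearer-token auth; the payload is plaintext, i.e. no
# `encrypted`/`envelope` fields. Field names mirror the handler above.
def _example_announce_to_peer(peer_url: str, token: str, goal_id: str,
                              objective: str, tasks: list) -> bool:
    import requests  # hypothetical client dependency, not needed by this module
    resp = requests.post(
        f"{peer_url}/api/distributed/tasks/announce",
        json={
            'goal_id': goal_id,          # sender's goal_id, preserved by the peer
            'objective': objective,
            'tasks': tasks,              # already-decomposed task dicts
            'context': {},
            'sender_host': os.environ.get('HEVOLVE_HOST_ID', ''),
        },
        headers={'Authorization': f'Bearer {token}'},  # assumes bearer auth
        timeout=5,
    )
    return resp.ok and resp.json().get('success', False)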

@distributed_agent_bp.route('/api/distributed/tasks/available', methods=['GET'])
@require_auth
def list_available_tasks():
    """List unclaimed tasks (for gossip pull by peers)."""
    coordinator = _get_coordinator()
    if not coordinator:
        return _no_coordinator()

    from agent_ledger.core import TaskStatus
    available = []
    for task_id in coordinator._ledger.task_order:
        task = coordinator._ledger.get_task(task_id)
        if task and task.status == TaskStatus.PENDING:
            available.append({
                'task_id': task.task_id,
                'description': task.description,
                'capabilities_required': task.context.get('capabilities_required', []),
                'context': task.context,
            })

    return jsonify({'success': True, 'tasks': available})


# ─── Hosts ───

@distributed_agent_bp.route('/api/distributed/hosts', methods=['GET'])
@require_auth
def list_hosts():
    """List all regional hosts contributing compute."""
    registry = _get_host_registry()
    hosts = registry.get_all_hosts()
    return jsonify({'success': True, 'hosts': hosts})


@distributed_agent_bp.route('/api/distributed/hosts/register', methods=['POST'])
@require_auth
def register_host():
    """Register this node as a compute contributor."""
    data = request.get_json() or {}
    host_id = data.get('host_id', os.environ.get('HEVOLVE_HOST_ID', 'unknown'))
    host_url = data.get('host_url', '')
    capabilities = data.get('capabilities', [])
    compute_budget = data.get('compute_budget', {})

    registry = _get_host_registry(host_id=host_id, host_url=host_url)
    success = registry.register_host(capabilities, compute_budget)
    return jsonify({'success': success, 'host_id': host_id})
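
# Illustrative request body for POST /api/distributed/hosts/register. The field
# names come from the handler above; the concrete host values, capability
# strings and budget keys are hypothetical and depend on the host registry
# backend.
_EXAMPLE_REGISTER_PAYLOAD = {
    'host_id': 'eu-west-node-1',              # hypothetical
    'host_url': 'https://node1.example.org',  # hypothetical
    'capabilities': ['research', 'teaching'],
    'compute_budget': {'max_concurrent_tasks': 2},  # hypothetical budget key
}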

# ─── Tasks ───

@distributed_agent_bp.route('/api/distributed/tasks/claim', methods=['POST'])
@require_auth
def claim_task():
    """Claim the next available task matching this agent's capabilities."""
    coordinator = _get_coordinator()
    if not coordinator:
        return _no_coordinator()

    data = request.get_json() or {}
    agent_id = data.get('agent_id', str(g.user.id))
    if agent_id != str(g.user.id) and not getattr(g.user, 'is_admin', False):
        return jsonify({'success': False, 'error': 'Cannot act as another agent'}), 403
    capabilities = data.get('capabilities', [])

    task = coordinator.claim_next_task(agent_id, capabilities)
    if task:
        return jsonify({
            'success': True,
            'task_id': task.task_id,
            'description': task.description,
            'context': task.context,
        })
    return jsonify({'success': True, 'task_id': None, 'message': 'No tasks available'})


@distributed_agent_bp.route('/api/distributed/tasks/<task_id>/submit', methods=['POST'])
@require_auth
def submit_task_result(task_id):
    """Submit a task result for verification."""
    coordinator = _get_coordinator()
    if not coordinator:
        return _no_coordinator()

    data = request.get_json() or {}
    agent_id = data.get('agent_id', str(g.user.id))
    if agent_id != str(g.user.id) and not getattr(g.user, 'is_admin', False):
        return jsonify({'success': False, 'error': 'Cannot act as another agent'}), 403
    result = data.get('result')

    if result is None:
        return jsonify({'success': False, 'error': 'result is required'}), 400

    info = coordinator.submit_result(task_id, agent_id, result)
    return jsonify({'success': True, **info})


@distributed_agent_bp.route('/api/distributed/tasks/<task_id>/verify', methods=['POST'])
@require_auth
def verify_task_result(task_id):
    """Verify another agent's task result."""
    coordinator = _get_coordinator()
    if not coordinator:
        return _no_coordinator()

    data = request.get_json() or {}
    verifying_agent = data.get('agent_id', str(g.user.id))
    if verifying_agent != str(g.user.id) and not getattr(g.user, 'is_admin', False):
        return jsonify({'success': False, 'error': 'Cannot act as another agent'}), 403

    passed = coordinator.verify_result(task_id, verifying_agent)
    return jsonify({'success': True, 'task_id': task_id, 'verified': passed})
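
# Illustrative worker loop tying the task endpoints together: claim, execute,
# submit. Verification of the result is expected from a different agent via the
# /verify endpoint above. The `execute` callable, the `requests` transport and
# bearer-token auth are hypothetical; a co-located worker could call the
# coordinator object directly instead.
def _example_worker_loop(base_url: str, token: str, capabilities: list,
                         execute, max_tasks: int = 10) -> None:
    import requests  # hypothetical client dependency
    headers = {'Authorization': f'Bearer {token}'}  # assumes bearer auth
    for _ in range(max_tasks):
        claim = requests.post(f"{base_url}/api/distributed/tasks/claim",
                              json={'capabilities': capabilities},
                              headers=headers, timeout=10).json()
        task_id = claim.get('task_id')
        if not task_id:
            break  # no matching pending task remains
        result = execute(claim['description'], claim.get('context', {}))
        requests.post(f"{base_url}/api/distributed/tasks/{task_id}/submit",
                      json={'result': result}, headers=headers, timeout=10)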

# ─── Goals ───

@distributed_agent_bp.route('/api/distributed/goals', methods=['POST'])
@require_auth
def submit_goal():
    """Submit a goal with decomposed tasks. Works for any agent type."""
    coordinator = _get_coordinator()
    if not coordinator:
        return _no_coordinator()

    data = request.get_json() or {}
    objective = data.get('objective')
    tasks = data.get('tasks', [])
    context = data.get('context', {})

    if not objective:
        return jsonify({'success': False, 'error': 'objective is required'}), 400
    if not tasks:
        return jsonify({'success': False, 'error': 'tasks list is required'}), 400

    goal_id = coordinator.submit_goal(objective, tasks, context)

    # Announce to peers via gossip if we have peers
    try:
        from .coordinator_backends import GossipTaskBridge
        bridge = GossipTaskBridge()
        bridge.announce_goal(goal_id, objective, tasks, context)
    except Exception:
        pass

    return jsonify({'success': True, 'goal_id': goal_id})
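
# Illustrative request body for POST /api/distributed/goals. The per-task shape
# mirrors what list_available_tasks exposes (a description plus a context dict
# carrying 'capabilities_required'); the exact task schema accepted by
# coordinator.submit_goal is defined by the coordinator backend. Objective and
# context values here are hypothetical.
_EXAMPLE_GOAL_PAYLOAD = {
    'objective': 'Summarize three papers and draft a study guide',
    'tasks': [
        {'description': 'Summarize paper A',
         'context': {'capabilities_required': ['research']}},
        {'description': 'Draft study guide from summaries',
         'context': {'capabilities_required': ['teaching']}},
    ],
    'context': {'requested_by': 'example-user'},
}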

@distributed_agent_bp.route('/api/distributed/goals/<goal_id>/progress', methods=['GET'])
@require_auth
def goal_progress(goal_id):
    """Get distributed progress for a goal."""
    coordinator = _get_coordinator()
    if not coordinator:
        return _no_coordinator()

    progress = coordinator.get_goal_progress(goal_id)
    return jsonify({'success': True, **progress})


# ─── Baselines ───

@distributed_agent_bp.route('/api/distributed/baselines', methods=['POST'])
@require_auth
def create_baseline():
    """Create a progress baseline snapshot."""
    coordinator = _get_coordinator()
    if not coordinator:
        return _no_coordinator()

    data = request.get_json() or {}
    label = data.get('label', '')

    snapshot_id = coordinator.create_baseline(label)
    return jsonify({'success': True, 'snapshot_id': snapshot_id})


# ─── Status ───

@distributed_agent_bp.route('/api/distributed/status', methods=['GET'])
@require_auth
def coordinator_status():
    """Report coordinator status and backend type."""
    coordinator = _get_coordinator()
    # Attempt lazy subscriber start here too — /status is typically the
    # first endpoint admin UIs hit, so this doubles as a bootstrap.
    _ensure_delegation_subscriber()
    return jsonify({
        'success': True,
        'coordinator_active': coordinator is not None,
        'backend_type': _coordinator_backend_type,
        'delegation_subscriber_active': _delegation_sub_started,
    })


@distributed_agent_bp.route('/api/distributed/delegations/recent', methods=['GET'])
@require_auth
def recent_delegations():
    """Return the most recent delegation broadcasts received over the
    ``agent_ledger:delegation`` Redis channel.

    Response shape::

        {
            "success": true,
            "subscriber_active": bool,
            "delegations": [
                {"channel": str, "data": {...}, "received_at": float},
                ...
            ]
        }

    When Redis is unavailable the subscriber never starts and the list
    stays empty — this is the correct degradation for single-node /
    flat-tier deployments (Nunba desktop). Admin UIs can poll this to
    visualize the distributed ledger traffic.
    """
    # Best-effort start on read so admins can see status even if no
    # prior distributed endpoint has been hit yet.
    _ensure_delegation_subscriber()
    return jsonify({
        'success': True,
        'subscriber_active': _delegation_sub_started,
        'delegations': list(_recent_delegations),
    })
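
# Illustrative admin-side poller for the two status endpoints above
# (hypothetical transport and bearer-token auth; an admin UI would typically do
# this from the browser instead).
def _example_poll_status(base_url: str, token: str) -> dict:
    import requests  # hypothetical client dependency
    headers = {'Authorization': f'Bearer {token}'}  # assumes bearer auth
    status = requests.get(f"{base_url}/api/distributed/status",
                          headers=headers, timeout=5).json()
    recent = requests.get(f"{base_url}/api/distributed/delegations/recent",
                          headers=headers, timeout=5).json()
    return {
        'backend_type': status.get('backend_type'),          # 'redis' or 'inmemory'
        'subscriber_active': recent.get('subscriber_active'),
        'delegations': recent.get('delegations', []),
    }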