Coverage for integrations / distributed_agent / api.py: 29.4%
214 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Distributed Agent API — generic endpoints for ANY agent type.
4Not tied to coding agents. Any agent (coding, research, music, teaching)
5can use these endpoints for distributed task coordination, host registration,
6verification, and baselining.
8Backend-agnostic: coordinator auto-selects Redis when available, falls
9back to in-memory + JSON. No external dependency required for single-node.
10Multi-node without Redis uses peer gossip (HTTP REST).
12Routes: /api/distributed/*
13Blueprint: distributed_agent_bp
14"""
16import os
17import time
18import logging
19import threading
20from collections import deque
21from flask import Blueprint, request, jsonify, g
23from integrations.social.auth import require_auth, require_admin
25logger = logging.getLogger(__name__)
27distributed_agent_bp = Blueprint('distributed_agent', __name__)
# Which coordinator backend is active ('redis' or 'inmemory'); stays
# None until _get_coordinator() lazily initializes the singleton.
_coordinator_backend_type = None

# ─── Delegation subscriber state ───────────────────────────────────
# When Redis is available, a daemon thread subscribes to
# `LedgerPubSub.CHANNEL_DELEGATION` so distributed nodes see real-time
# delegation broadcasts from create_recipe / reuse_recipe ledgers.
# Ring buffer capped at 100 so memory cannot grow unbounded.
# Gated entirely on Redis presence — no Redis → no subscriber, and the
# rest of /api/distributed/* keeps working via in-memory coordinator.
# _delegation_sub_started is only flipped under _delegation_sub_lock.
_delegation_sub_started: bool = False
_delegation_sub_lock = threading.Lock()
_recent_delegations: "deque[dict]" = deque(maxlen=100)
def _ensure_delegation_subscriber() -> bool:
    """Start the CHANNEL_DELEGATION subscriber if it is not already running.

    Idempotent: once `_delegation_sub_started` is set, the call is a
    cheap no-op. Startup only proceeds when Redis answers a ping and
    `agent_ledger.pubsub` imports cleanly. Every failure is logged at
    debug level and reported through the boolean return value instead
    of raising, so endpoints and tests can surface status safely.

    Returns:
        True iff the subscriber is active after this call.
    """
    global _delegation_sub_started
    with _delegation_sub_lock:
        if _delegation_sub_started:
            return True

        client = _get_redis_client()
        if not client:
            return False
        try:
            client.ping()
        except Exception as e:
            logger.debug(f"[delegation_subscriber] redis ping failed: {e}")
            return False

        try:
            from agent_ledger.pubsub import LedgerPubSub
        except Exception as e:
            logger.debug(f"[delegation_subscriber] pubsub import failed: {e}")
            return False

        try:
            node_id = os.environ.get('HEVOLVE_NODE_ID', 'distributed_node')
            subscriber = LedgerPubSub(client, agent_id=node_id)

            def _record(channel: str, data: dict) -> None:
                # Append into the bounded ring buffer; never let a
                # handler error escape into the pubsub thread.
                try:
                    _recent_delegations.append({
                        'channel': channel,
                        'data': data,
                        'received_at': time.time(),
                    })
                    logger.info(
                        f"[delegation_subscriber] {channel}: "
                        f"{str(data)[:160]}"
                    )
                except Exception as e:
                    logger.debug(
                        f"[delegation_subscriber] handler error: {e}"
                    )

            subscriber.subscribe(
                [LedgerPubSub.CHANNEL_DELEGATION], _record,
            )
            _delegation_sub_started = True
            logger.info(
                f"[delegation_subscriber] started as node={node_id}"
            )
            return True
        except Exception as e:
            logger.debug(f"[delegation_subscriber] start failed: {e}")
            return False
@distributed_agent_bp.before_request
def _bp_before_request_ensure_subscriber():
    """Bootstrap the delegation subscriber on the first request to any
    /api/distributed/* route. Once it has started, this guard is a
    single boolean check — no per-request overhead in steady state."""
    if _delegation_sub_started:
        return
    _ensure_delegation_subscriber()
116# ─── Shared helpers ───
118def _get_redis_client():
119 """Get Redis client from environment or return None."""
120 try:
121 import redis
122 host = os.environ.get('REDIS_HOST', 'localhost')
123 port = int(os.environ.get('REDIS_PORT', 6379))
124 return redis.Redis(host=host, port=port, decode_responses=True,
125 socket_connect_timeout=1, socket_timeout=1,
126 retry_on_timeout=False)
127 except Exception:
128 return None
def _get_coordinator():
    """Return the process-wide DistributedTaskCoordinator singleton,
    creating it on first use.

    Backend selection is delegated to create_coordinator(): Redis when
    available, otherwise in-memory — single-node hives need no external
    service. The chosen backend name is recorded in the module-level
    `_coordinator_backend_type`.
    """
    global _coordinator_backend_type
    if hasattr(_get_coordinator, '_instance'):
        return _get_coordinator._instance
    from .coordinator_backends import create_coordinator
    instance, backend = create_coordinator()
    _get_coordinator._instance = instance
    _coordinator_backend_type = backend
    return instance
def get_coordinator_backend_type() -> "str | None":
    """Return which backend the coordinator is using.

    Returns:
        'redis' or 'inmemory' once _get_coordinator() has initialized
        the singleton, or None before first initialization.
        (Annotation corrected from plain ``str``: the module-level
        default for `_coordinator_backend_type` is None.)
    """
    return _coordinator_backend_type
def _get_host_registry(host_id: str = "query", host_url: str = ""):
    """Return a host registry: Redis-backed when Redis is reachable,
    otherwise a process-wide in-memory singleton.

    NOTE: the in-memory singleton is seeded with the first caller's
    host_id/host_url; later arguments are ignored on that path.
    """
    client = _get_redis_client()
    if client:
        from .host_registry import RegionalHostRegistry
        return RegionalHostRegistry(client, host_id=host_id, host_url=host_url)

    from .coordinator_backends import InMemoryHostRegistry
    if not hasattr(_get_host_registry, '_instance'):
        _get_host_registry._instance = InMemoryHostRegistry(
            host_id=host_id, host_url=host_url
        )
    return _get_host_registry._instance
def _no_coordinator():
    """Standard 503 response used when no coordinator backend could
    be initialized (neither Redis nor in-memory)."""
    body = {
        'success': False,
        'error': 'Coordinator not available (neither Redis nor in-memory could initialize)',
    }
    return jsonify(body), 503
174# ─── Task announcement (gossip-based distribution) ───
@distributed_agent_bp.route('/api/distributed/tasks/announce', methods=['POST'])
@require_auth
def announce_tasks():
    """Receive task announcements from peer nodes (gossip protocol).
    Supports E2E encrypted envelopes.

    Expects JSON with: goal_id (required), tasks (required, non-empty
    list), objective, context, sender_host; or an encrypted form with
    `encrypted` + `envelope` keys, which is decrypted in place first.

    Idempotent per goal_id: a goal the local coordinator already knows
    is acknowledged without resubmission.

    Returns 400 when goal_id/tasks are missing, 503 when no coordinator
    backend is available, 500 when submission raises.
    """
    coordinator = _get_coordinator()
    if not coordinator:
        return _no_coordinator()

    data = request.get_json() or {}
    # Decrypt E2E encrypted task announcement; on any failure fall
    # through and treat the payload as plaintext (best-effort).
    if data.get('encrypted') and data.get('envelope'):
        try:
            from security.channel_encryption import decrypt_json_from_peer
            decrypted = decrypt_json_from_peer(data['envelope'])
            if decrypted:
                data = decrypted
        except Exception:
            pass  # Decryption failed, try using data as-is
    goal_id = data.get('goal_id')
    objective = data.get('objective', '')
    tasks = data.get('tasks', [])
    context = data.get('context', {})

    sender_host = data.get('sender_host', '')

    if not goal_id or not tasks:
        return jsonify({'success': False, 'error': 'goal_id and tasks required'}), 400

    # Add tasks to local coordinator (idempotent — skip if goal already exists).
    # get_goal_progress returns a dict with an 'error' key for unknown goals;
    # any exception here is treated the same as "goal unknown".
    try:
        existing = coordinator.get_goal_progress(goal_id)
        if 'error' not in existing:
            return jsonify({
                'success': True,
                'message': 'goal already known',
                'goal_id': goal_id,
                'local_goal_id': goal_id,
            })
    except Exception:
        pass

    try:
        # Preserve the sender's goal_id so multi-node coordination can
        # correlate tasks across peers. submit_goal accepts an optional
        # goal_id when the caller already has one (gossip case).
        local_goal_id = coordinator.submit_goal(objective, tasks, context, goal_id=goal_id)
        return jsonify({
            'success': True,
            'goal_id': goal_id,
            'local_goal_id': local_goal_id,
            'sender_host': sender_host,
        })
    except TypeError:
        # Fallback: coordinator.submit_goal does not accept goal_id kwarg yet
        # (older backend) — submit without it; local and remote ids may differ.
        local_goal_id = coordinator.submit_goal(objective, tasks, context)
        return jsonify({
            'success': True,
            'goal_id': goal_id,
            'local_goal_id': local_goal_id,
            'sender_host': sender_host,
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
@distributed_agent_bp.route('/api/distributed/tasks/available', methods=['GET'])
@require_auth
def list_available_tasks():
    """List unclaimed tasks (for gossip pull by peers)."""
    coordinator = _get_coordinator()
    if not coordinator:
        return _no_coordinator()

    from agent_ledger.core import TaskStatus
    # Walk the ledger in task order; only PENDING tasks are offered
    # to peers for pulling.
    tasks = (
        coordinator._ledger.get_task(tid)
        for tid in coordinator._ledger.task_order
    )
    available = [
        {
            'task_id': t.task_id,
            'description': t.description,
            'capabilities_required': t.context.get('capabilities_required', []),
            'context': t.context,
        }
        for t in tasks
        if t and t.status == TaskStatus.PENDING
    ]

    return jsonify({'success': True, 'tasks': available})
265# ─── Hosts ───
@distributed_agent_bp.route('/api/distributed/hosts', methods=['GET'])
@require_auth
def list_hosts():
    """List all regional hosts contributing compute."""
    return jsonify({
        'success': True,
        'hosts': _get_host_registry().get_all_hosts(),
    })
@distributed_agent_bp.route('/api/distributed/hosts/register', methods=['POST'])
@require_auth
def register_host():
    """Register this node as a compute contributor."""
    payload = request.get_json() or {}
    # host_id priority: request body, then HEVOLVE_HOST_ID env, then 'unknown'.
    host_id = payload.get('host_id', os.environ.get('HEVOLVE_HOST_ID', 'unknown'))

    registry = _get_host_registry(
        host_id=host_id,
        host_url=payload.get('host_url', ''),
    )
    ok = registry.register_host(
        payload.get('capabilities', []),
        payload.get('compute_budget', {}),
    )
    return jsonify({'success': ok, 'host_id': host_id})
291# ─── Tasks ───
@distributed_agent_bp.route('/api/distributed/tasks/claim', methods=['POST'])
@require_auth
def claim_task():
    """Claim the next available task matching this agent's capabilities."""
    coordinator = _get_coordinator()
    if not coordinator:
        return _no_coordinator()

    payload = request.get_json() or {}
    agent_id = payload.get('agent_id', str(g.user.id))
    # Only admins may claim on behalf of a different agent.
    if agent_id != str(g.user.id) and not getattr(g.user, 'is_admin', False):
        return jsonify({'success': False, 'error': 'Cannot act as another agent'}), 403

    task = coordinator.claim_next_task(agent_id, payload.get('capabilities', []))
    if not task:
        return jsonify({'success': True, 'task_id': None, 'message': 'No tasks available'})
    return jsonify({
        'success': True,
        'task_id': task.task_id,
        'description': task.description,
        'context': task.context,
    })
@distributed_agent_bp.route('/api/distributed/tasks/<task_id>/submit', methods=['POST'])
@require_auth
def submit_task_result(task_id):
    """Submit a task result for verification."""
    coordinator = _get_coordinator()
    if not coordinator:
        return _no_coordinator()

    body = request.get_json() or {}
    agent_id = body.get('agent_id', str(g.user.id))
    # Only admins may submit on behalf of a different agent.
    if agent_id != str(g.user.id) and not getattr(g.user, 'is_admin', False):
        return jsonify({'success': False, 'error': 'Cannot act as another agent'}), 403

    result = body.get('result')
    if result is None:
        return jsonify({'success': False, 'error': 'result is required'}), 400

    outcome = coordinator.submit_result(task_id, agent_id, result)
    return jsonify({'success': True, **outcome})
@distributed_agent_bp.route('/api/distributed/tasks/<task_id>/verify', methods=['POST'])
@require_auth
def verify_task_result(task_id):
    """Verify another agent's task result."""
    coordinator = _get_coordinator()
    if not coordinator:
        return _no_coordinator()

    body = request.get_json() or {}
    verifier_id = body.get('agent_id', str(g.user.id))
    # Non-admins may only verify under their own identity.
    if verifier_id != str(g.user.id) and not getattr(g.user, 'is_admin', False):
        return jsonify({'success': False, 'error': 'Cannot act as another agent'}), 403

    outcome = coordinator.verify_result(task_id, verifier_id)
    return jsonify({'success': True, 'task_id': task_id, 'verified': outcome})
356# ─── Goals ───
@distributed_agent_bp.route('/api/distributed/goals', methods=['POST'])
@require_auth
def submit_goal():
    """Submit a goal with decomposed tasks. Works for any agent type."""
    coordinator = _get_coordinator()
    if not coordinator:
        return _no_coordinator()

    body = request.get_json() or {}
    objective = body.get('objective')
    tasks = body.get('tasks', [])
    context = body.get('context', {})

    # Both the objective and at least one task are mandatory.
    if not objective:
        return jsonify({'success': False, 'error': 'objective is required'}), 400
    if not tasks:
        return jsonify({'success': False, 'error': 'tasks list is required'}), 400

    goal_id = coordinator.submit_goal(objective, tasks, context)

    # Best-effort gossip announcement to peers; the local submission
    # already succeeded, so any failure here is swallowed.
    try:
        from .coordinator_backends import GossipTaskBridge
        GossipTaskBridge().announce_goal(goal_id, objective, tasks, context)
    except Exception:
        pass

    return jsonify({'success': True, 'goal_id': goal_id})
@distributed_agent_bp.route('/api/distributed/goals/<goal_id>/progress', methods=['GET'])
@require_auth
def goal_progress(goal_id):
    """Get distributed progress for a goal."""
    coordinator = _get_coordinator()
    if not coordinator:
        return _no_coordinator()
    return jsonify({'success': True, **coordinator.get_goal_progress(goal_id)})
401# ─── Baselines ───
@distributed_agent_bp.route('/api/distributed/baselines', methods=['POST'])
@require_auth
def create_baseline():
    """Create a progress baseline snapshot."""
    coordinator = _get_coordinator()
    if not coordinator:
        return _no_coordinator()

    label = (request.get_json() or {}).get('label', '')
    return jsonify({
        'success': True,
        'snapshot_id': coordinator.create_baseline(label),
    })
418# ─── Status ───
@distributed_agent_bp.route('/api/distributed/status', methods=['GET'])
@require_auth
def coordinator_status():
    """Report coordinator status and backend type."""
    coordinator = _get_coordinator()
    # /status is usually the first endpoint admin UIs hit, so use it to
    # lazily bootstrap the delegation subscriber as well.
    _ensure_delegation_subscriber()
    return jsonify({
        'success': True,
        'coordinator_active': coordinator is not None,
        'backend_type': _coordinator_backend_type,
        'delegation_subscriber_active': _delegation_sub_started,
    })
@distributed_agent_bp.route('/api/distributed/delegations/recent', methods=['GET'])
@require_auth
def recent_delegations():
    """Return the most recent delegation broadcasts received over the
    ``agent_ledger:delegation`` Redis channel.

    Each entry in ``delegations`` has ``channel``, ``data`` and
    ``received_at`` keys; the list is bounded by the module-level ring
    buffer. Without Redis the subscriber never starts and the list
    stays empty — the intended degradation for single-node / flat-tier
    deployments. Admin UIs can poll this endpoint to visualize ledger
    traffic.
    """
    # Best-effort start on read so admins see live status even when no
    # other distributed endpoint has run yet.
    _ensure_delegation_subscriber()
    return jsonify({
        'success': True,
        'subscriber_active': _delegation_sub_started,
        'delegations': list(_recent_delegations),
    })