Coverage for integrations / distributed_agent / task_coordinator.py: 88.2%
153 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Distributed Task Coordinator — cross-host task orchestration for ANY agent type.

Composes existing components:
- SmartLedger (agent_ledger.core) for task state management
- DistributedTaskLock (agent_ledger.distributed) for atomic task claiming
- LedgerPubSub (agent_ledger.pubsub) for cross-host notifications
- TaskVerification + TaskBaseline (agent_ledger.verification) for trust
- RegionalHostRegistry (this package) for host discovery

Workflow:
1. Any host decomposes a goal into tasks (coding, research, music, etc.)
2. Tasks are published to shared Redis
3. Regional hosts claim tasks atomically (DistributedTaskLock)
4. Results are verified via SHA-256 hashing
5. Progress is baselined periodically
"""
19import logging
20import uuid
21from datetime import datetime
22from typing import Any, Dict, List, Optional
24from agent_ledger.core import SmartLedger, Task, TaskType, TaskStatus
25from agent_ledger.distributed import DistributedTaskLock
26from agent_ledger.verification import TaskVerification, TaskBaseline
27from core.constants import HIVE_DEPTH
29logger = logging.getLogger(__name__)
class HiveDepthExceeded(ValueError):
    """Signal that a propagation request would exceed HIVE_DEPTH hops.

    Hevolve's topology is a three-level pyramid (flat → regional → central);
    a request any deeper indicates either a cycle (a bug) or a policy
    violation. Raising this dedicated exception — instead of a bare
    ValueError — lets callers emit a structured audit event rather than
    silently absorbing a generic error.
    """
class DistributedTaskCoordinator:
    """
    Coordinates task delegation across regional hosts.

    Agent-type agnostic — works for coding agents, research agents,
    music agents, or any other domain. The `context` dict on each task
    carries domain-specific metadata (repo_url, genre, search_query, etc.).
    """

    def __init__(
        self,
        ledger: SmartLedger,
        task_lock: DistributedTaskLock,
        verifier: Optional[TaskVerification] = None,
        baseline: Optional[TaskBaseline] = None,
    ):
        # Pure composition — the coordinator holds no state of its own
        # beyond these collaborators.
        self._ledger = ledger
        self._lock = task_lock
        self._verifier = verifier or TaskVerification()
        self._baseline = baseline or TaskBaseline(ledger.backend)

    def submit_goal(
        self,
        objective: str,
        decomposed_tasks: List[Dict[str, Any]],
        context: Optional[Dict[str, Any]] = None,
    ) -> str:
        """
        Submit a goal with pre-decomposed tasks.

        Args:
            objective: High-level goal description (any domain)
            decomposed_tasks: List of {"task_id": ..., "description": ..., "capabilities": [...]}
            context: Optional domain-specific metadata (e.g. repo_url, genre, dataset_path).
                Stored on the parent task and inherited by children.

        Returns:
            goal_id (parent task ID)

        Raises:
            HiveDepthExceeded: if context["hop"] is already at HIVE_DEPTH,
                i.e. the request has been forwarded too many times.
        """
        goal_id = f"goal_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
        # Defensive copy: we stamp "hop" below and must never mutate the
        # caller's dict in place (previously this wrote into the argument).
        context = dict(context or {})

        # Enforce HIVE_DEPTH — reject propagations deeper than the
        # published 3-level topology (flat → regional → central). A
        # missing/zero hop is treated as the origin submission (hop=0);
        # any forwarder must increment before calling submit_goal again.
        hop = int(context.get("hop", 0) or 0)
        if hop >= HIVE_DEPTH:
            raise HiveDepthExceeded(
                f"submit_goal: hop={hop} >= HIVE_DEPTH={HIVE_DEPTH}; "
                f"objective={objective!r} rejected to prevent hive propagation cycles"
            )
        # Stamp the hop we just accepted so children inherit it.
        context["hop"] = hop

        # Create parent goal task
        parent = Task(
            task_id=goal_id,
            description=f"[GOAL] {objective}",
            task_type=TaskType.PRE_ASSIGNED,
        )
        parent.context["objective"] = objective
        parent.context.update(context)
        self._ledger.add_task(parent)
        self._ledger.update_task_status(goal_id, TaskStatus.IN_PROGRESS)

        # Create child tasks
        for task_def in decomposed_tasks:
            child_id = task_def.get("task_id", f"{goal_id}_sub_{len(self._ledger.tasks)}")
            child = Task(
                task_id=child_id,
                description=task_def["description"],
                task_type=TaskType.AUTONOMOUS,
            )
            child.context["capabilities_required"] = task_def.get("capabilities", [])
            # Inherit parent context so children know the domain
            child.context.update(context)
            child.parent_task_id = goal_id
            self._ledger.add_task(child)
            parent.child_task_ids.append(child_id)

        self._ledger.save()

        # Create initial baseline
        self._baseline.create_snapshot(self._ledger, label=f"goal_submitted_{goal_id}")

        logger.info("Goal submitted: %s with %d tasks", goal_id, len(decomposed_tasks))
        return goal_id

    @staticmethod
    def _matches_capabilities(task: Task, capabilities: Optional[List[str]]) -> bool:
        """Return True if `task` is claimable under the given capability filter.

        No filter (None/empty) matches everything; a task with no declared
        requirements matches any filter; otherwise at least one required
        capability must appear in the agent's capability list.
        """
        if not capabilities:
            return True
        required = task.context.get("capabilities_required", [])
        return not required or any(c in capabilities for c in required)

    def claim_next_task(
        self,
        agent_id: str,
        capabilities: Optional[List[str]] = None,
    ) -> Optional[Task]:
        """
        Atomically claim the next available task matching capabilities.

        Uses DistributedTaskLock to prevent double-claiming.

        Args:
            agent_id: Claiming agent's ID
            capabilities: Optional capability filter

        Returns:
            The claimed Task, or None if nothing was claimable.
        """
        for task_id in self._ledger.task_order:
            task = self._ledger.get_task(task_id)
            if not task or task.status != TaskStatus.PENDING:
                continue

            # Check capability match
            if not self._matches_capabilities(task, capabilities):
                continue

            # Try atomic claim — with heartbeat so the lock is renewed
            # every HEARTBEAT_INTERVAL until submit_result / release_task.
            # Protects tasks that run longer than DEFAULT_TTL (5 min).
            if self._lock.try_claim_task(task_id, agent_id, heartbeat=True):
                self._ledger.update_task_status(task_id, TaskStatus.IN_PROGRESS)
                task.context["claimed_by"] = agent_id
                task.context["claimed_at"] = datetime.now().isoformat()
                self._ledger.save()
                logger.info("Task %s claimed by %s", task_id, agent_id)
                return task

        logger.debug("No available tasks for agent %s", agent_id)
        return None

    def claim_parallel_batch(
        self,
        agent_id: str,
        max_tasks: int = 4,
        capabilities: Optional[List[str]] = None,
    ) -> List[Task]:
        """Atomically claim multiple parallel-ready tasks for batch execution.

        Only claims tasks with execution_mode='parallel' (set by SmartLedger
        create_sibling_tasks + parallel_dispatch). Sequential tasks are skipped.

        Args:
            agent_id: Claiming agent's ID
            max_tasks: Maximum tasks to claim in one batch
            capabilities: Optional capability filter

        Returns:
            List of claimed Task objects (may be empty)
        """
        # Local import to avoid widening the module's import surface for a
        # feature only this method uses.
        from agent_ledger.core import ExecutionMode

        claimed: List[Task] = []
        for task_id in self._ledger.task_order:
            if len(claimed) >= max_tasks:
                break

            task = self._ledger.get_task(task_id)
            if not task or task.status != TaskStatus.PENDING:
                continue

            # Only claim parallel-mode tasks
            if getattr(task, 'execution_mode', None) != ExecutionMode.PARALLEL:
                continue

            # Check capability match
            if not self._matches_capabilities(task, capabilities):
                continue

            # Try atomic claim (heartbeat-protected, see claim_next_task)
            if self._lock.try_claim_task(task_id, agent_id, heartbeat=True):
                self._ledger.update_task_status(task_id, TaskStatus.IN_PROGRESS)
                task.context["claimed_by"] = agent_id
                task.context["claimed_at"] = datetime.now().isoformat()
                claimed.append(task)

        # Single save for the whole batch — one durability point per claim round.
        if claimed:
            self._ledger.save()
            logger.info(
                "%d parallel tasks claimed by %s: %s",
                len(claimed), agent_id, [t.task_id for t in claimed])

        return claimed

    def submit_result(
        self,
        task_id: str,
        agent_id: str,
        result: Any,
    ) -> Dict[str, Any]:
        """
        Submit a task result for verification.

        Computes SHA-256 hash and publishes verification request via PubSub.

        Args:
            task_id: The completed task's ID
            agent_id: The agent submitting the result (must hold the claim lock)
            result: Domain-specific result payload (any hashable-by-serialization value)

        Returns:
            Dict with task_id, result_hash, and status="completed".
        """
        result_hash = TaskVerification.compute_result_hash(result)

        self._ledger.complete_task(task_id, result=result)
        self._lock.release_task(task_id, agent_id)

        # Store result_hash so verify_result() can compare later
        task = self._ledger.get_task(task_id)
        if task:
            task.context["result_hash"] = result_hash
            self._ledger.save()

        # Publish verification request if pubsub is enabled
        if hasattr(self._ledger, '_pubsub') and self._ledger._pubsub:
            self._ledger._pubsub.publish_verification_request(task_id, result_hash)
            self._notify_goal_contribution(
                task_id, agent_id,
                task.description if task else "a task",
            )

        logger.info("Result submitted: task=%s agent=%s hash=%s...", task_id, agent_id, result_hash[:16])
        return {
            "task_id": task_id,
            "result_hash": result_hash,
            "status": "completed",
        }

    def _notify_goal_contribution(self, task_id: str, agent_id: str, task_description: str):
        """Notify the user who owns the agent that their agent contributed to a goal.

        Best-effort: any failure is logged at WARNING and never propagated,
        so notification problems cannot break result submission.
        """
        try:
            task = self._ledger.get_task(task_id)
            if not task or not task.parent_task_id:
                return
            parent = self._ledger.get_task(task.parent_task_id)
            if not parent:
                return

            objective = parent.context.get("objective", parent.description)
            user_id = agent_id  # agent_id IS str(g.user.id) — set in api.py

            # Use Flask request context db if available, else open a fresh session
            try:
                from flask import g as flask_g
                db = flask_g.db
                owns_session = False
            except (RuntimeError, AttributeError):
                from integrations.social.models import get_db
                db = get_db()
                owns_session = True

            from integrations.social.services import NotificationService
            message = f'Your agent contributed to "{objective}": completed "{task_description}"'
            notif = NotificationService.create(
                db, user_id, 'goal_contribution',
                source_user_id=None,
                target_type='goal',
                target_id=task.parent_task_id,
                message=message,
            )

            # Only commit/close sessions we opened ourselves; request-scoped
            # sessions belong to the Flask teardown handler.
            if owns_session:
                db.commit()
                db.close()

            # Push real-time notification via WAMP (fires silently if Crossbar unavailable)
            try:
                from integrations.social.realtime import on_notification
                on_notification(user_id, notif.to_dict())
            except Exception:
                pass

            logger.info("Notified user %s: goal contribution for %s", user_id, task_id)
        except Exception as e:
            logger.warning("Failed to notify goal contribution: %s", e)

    def verify_result(self, task_id: str, verifying_agent: str) -> bool:
        """
        Verify another agent's task result.

        Re-computes hash and compares. Records verification.

        Args:
            task_id: Task whose stored result should be re-hashed
            verifying_agent: ID of the agent performing the check

        Returns:
            True if the recomputed hash matches the stored result_hash.
        """
        task = self._ledger.get_task(task_id)
        if not task or task.result is None:
            logger.warning(f"Cannot verify task {task_id}: no result")
            return False

        current_hash = TaskVerification.compute_result_hash(task.result)
        stored_hash = task.context.get("result_hash", "")

        verified = current_hash == stored_hash
        self._verifier.record_verification(task_id, current_hash, verifying_agent, verified)

        logger.info("Verification: task=%s by=%s passed=%s", task_id, verifying_agent, verified)
        return verified

    def get_goal_progress(self, goal_id: str) -> Dict[str, Any]:
        """Get progress across all hosts for a goal.

        Returns a dict with objective, context, per-child task status, and a
        completion percentage; or {"error": ...} if the goal is unknown.
        """
        parent = self._ledger.get_task(goal_id)
        if not parent:
            return {"error": f"Goal {goal_id} not found"}

        child_ids = parent.child_task_ids
        children = []
        completed = 0

        for child_id in child_ids:
            child = self._ledger.get_task(child_id)
            if child:
                # Status may be an Enum or a plain string depending on backend.
                status = child.status.value if hasattr(child.status, "value") else str(child.status)
                children.append({
                    "task_id": child_id,
                    "description": child.description,
                    "status": status,
                    "claimed_by": child.context.get("claimed_by"),
                    "result_hash": child.context.get("result_hash"),
                })
                if child.status == TaskStatus.COMPLETED:
                    completed += 1

        # Guard against division by zero for a goal with no children.
        total = len(child_ids) or 1
        return {
            "goal_id": goal_id,
            "objective": parent.context.get("objective", parent.description),
            "context": {k: v for k, v in parent.context.items() if k != "objective"},
            "total_tasks": len(child_ids),
            "completed": completed,
            "progress_pct": round(completed / total * 100, 1),
            "tasks": children,
        }

    def create_baseline(self, label: str = "") -> str:
        """Create a snapshot baseline of current progress."""
        return self._baseline.create_snapshot(self._ledger, label)

    def compare_to_baseline(self, snapshot_id: str) -> Dict[str, Any]:
        """Compare current state against a baseline."""
        return self._baseline.compare_to_snapshot(self._ledger, snapshot_id)