Coverage for integrations / distributed_agent / task_coordinator.py: 88.2%

153 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Distributed Task Coordinator — cross-host task orchestration for ANY agent type. 

3 

4Composes existing components: 

5- SmartLedger (agent_ledger.core) for task state management 

6- DistributedTaskLock (agent_ledger.distributed) for atomic task claiming 

7- LedgerPubSub (agent_ledger.pubsub) for cross-host notifications 

8- TaskVerification + TaskBaseline (agent_ledger.verification) for trust 

9- RegionalHostRegistry (this package) for host discovery 

10 

11Workflow: 

121. Any host decomposes a goal into tasks (coding, research, music, etc.) 

132. Tasks are published to shared Redis 

143. Regional hosts claim tasks atomically (DistributedTaskLock) 

154. Results are verified via SHA-256 hashing 

165. Progress is baselined periodically 

17""" 

18 

19import logging 

20import uuid 

21from datetime import datetime 

22from typing import Any, Dict, List, Optional 

23 

24from agent_ledger.core import SmartLedger, Task, TaskType, TaskStatus 

25from agent_ledger.distributed import DistributedTaskLock 

26from agent_ledger.verification import TaskVerification, TaskBaseline 

27from core.constants import HIVE_DEPTH 

28 

29logger = logging.getLogger(__name__) 

30 

31 

class HiveDepthExceeded(ValueError):
    """A propagation request traveled more hops than HIVE_DEPTH allows.

    Hevolve's topology is a fixed 3-level pyramid (flat → regional →
    central); any deeper forwarding indicates either a cycle (a bug) or
    a policy violation. Raising a dedicated subclass — rather than a
    plain ValueError — lets callers catch it specifically and emit a
    structured audit log entry instead of swallowing a generic error.
    """

40 

41 

class DistributedTaskCoordinator:
    """
    Coordinates task delegation across regional hosts.

    Agent-type agnostic — works for coding agents, research agents,
    music agents, or any other domain. The `context` dict on each task
    carries domain-specific metadata (repo_url, genre, search_query, etc.).
    """

    def __init__(
        self,
        ledger: SmartLedger,
        task_lock: DistributedTaskLock,
        verifier: Optional[TaskVerification] = None,
        baseline: Optional[TaskBaseline] = None,
    ):
        """
        Args:
            ledger: Shared task ledger — the state store for all tasks.
            task_lock: Distributed lock used for atomic task claiming.
            verifier: Result-hash verifier; a default TaskVerification is
                created when omitted.
            baseline: Progress snapshotter; defaults to a TaskBaseline
                built on the ledger's storage backend.
        """
        self._ledger = ledger
        self._lock = task_lock
        self._verifier = verifier or TaskVerification()
        self._baseline = baseline or TaskBaseline(ledger.backend)

    def submit_goal(
        self,
        objective: str,
        decomposed_tasks: List[Dict[str, Any]],
        context: Optional[Dict[str, Any]] = None,
    ) -> str:
        """
        Submit a goal with pre-decomposed tasks.

        Args:
            objective: High-level goal description (any domain)
            decomposed_tasks: List of {"task_id": ..., "description": ..., "capabilities": [...]}
            context: Optional domain-specific metadata (e.g. repo_url, genre, dataset_path).
                Stored on the parent task and inherited by children.

        Returns:
            goal_id (parent task ID)

        Raises:
            HiveDepthExceeded: If context["hop"] >= HIVE_DEPTH, i.e. the
                submission would propagate deeper than the 3-level topology.
        """
        goal_id = f"goal_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
        # Copy the incoming dict: the hop stamp below must not leak back
        # into the caller's `context` argument (previously
        # `context["hop"] = hop` mutated the caller's dict in place).
        context = dict(context or {})

        # Enforce HIVE_DEPTH — reject propagations deeper than the
        # published 3-level topology (flat → regional → central). A
        # missing/zero hop is treated as the origin submission (hop=0);
        # any forwarder must increment before calling submit_goal again.
        hop = int(context.get("hop", 0) or 0)
        if hop >= HIVE_DEPTH:
            raise HiveDepthExceeded(
                f"submit_goal: hop={hop} >= HIVE_DEPTH={HIVE_DEPTH}; "
                f"objective={objective!r} rejected to prevent hive propagation cycles"
            )
        # Stamp the hop we just accepted so children inherit it.
        context["hop"] = hop

        # Create parent goal task
        parent = Task(
            task_id=goal_id,
            description=f"[GOAL] {objective}",
            task_type=TaskType.PRE_ASSIGNED,
        )
        parent.context["objective"] = objective
        parent.context.update(context)
        self._ledger.add_task(parent)
        self._ledger.update_task_status(goal_id, TaskStatus.IN_PROGRESS)

        # Create child tasks; each inherits the parent context so the
        # claiming agent knows the domain (repo_url, genre, hop, ...).
        for task_def in decomposed_tasks:
            child_id = task_def.get("task_id", f"{goal_id}_sub_{len(self._ledger.tasks)}")
            child = Task(
                task_id=child_id,
                description=task_def["description"],
                task_type=TaskType.AUTONOMOUS,
            )
            child.context["capabilities_required"] = task_def.get("capabilities", [])
            # Inherit parent context so children know the domain
            child.context.update(context)
            child.parent_task_id = goal_id
            self._ledger.add_task(child)
            parent.child_task_ids.append(child_id)

        self._ledger.save()

        # Create initial baseline
        self._baseline.create_snapshot(self._ledger, label=f"goal_submitted_{goal_id}")

        logger.info(f"Goal submitted: {goal_id} with {len(decomposed_tasks)} tasks")
        return goal_id

    def claim_next_task(
        self,
        agent_id: str,
        capabilities: Optional[List[str]] = None,
    ) -> Optional[Task]:
        """
        Atomically claim the next available task matching capabilities.

        Uses DistributedTaskLock to prevent double-claiming.

        Args:
            agent_id: Claiming agent's ID
            capabilities: Optional capability filter; tasks that require
                capabilities the agent lacks are skipped.

        Returns:
            The claimed Task, or None if nothing is available.
        """
        for task_id in self._ledger.task_order:
            task = self._ledger.get_task(task_id)
            if not task or task.status != TaskStatus.PENDING:
                continue

            # Check capability match — a task with no requirements is
            # claimable by anyone; otherwise any overlap suffices.
            if capabilities:
                required = task.context.get("capabilities_required", [])
                if required and not any(c in capabilities for c in required):
                    continue

            # Try atomic claim — with heartbeat so the lock is renewed
            # every HEARTBEAT_INTERVAL until submit_result / release_task.
            # Protects tasks that run longer than DEFAULT_TTL (5 min).
            if self._lock.try_claim_task(task_id, agent_id, heartbeat=True):
                self._ledger.update_task_status(task_id, TaskStatus.IN_PROGRESS)
                task.context["claimed_by"] = agent_id
                task.context["claimed_at"] = datetime.now().isoformat()
                self._ledger.save()
                logger.info(f"Task {task_id} claimed by {agent_id}")
                return task

        logger.debug(f"No available tasks for agent {agent_id}")
        return None

    def claim_parallel_batch(
        self,
        agent_id: str,
        max_tasks: int = 4,
        capabilities: Optional[List[str]] = None,
    ) -> List[Task]:
        """Atomically claim multiple parallel-ready tasks for batch execution.

        Only claims tasks with execution_mode='parallel' (set by SmartLedger
        create_sibling_tasks + parallel_dispatch). Sequential tasks are skipped.

        Args:
            agent_id: Claiming agent's ID
            max_tasks: Maximum tasks to claim in one batch
            capabilities: Optional capability filter

        Returns:
            List of claimed Task objects (may be empty)
        """
        from agent_ledger.core import ExecutionMode

        claimed = []
        for task_id in self._ledger.task_order:
            if len(claimed) >= max_tasks:
                break

            task = self._ledger.get_task(task_id)
            if not task or task.status != TaskStatus.PENDING:
                continue

            # Only claim parallel-mode tasks; getattr guards against
            # older Task objects that predate the execution_mode field.
            if getattr(task, 'execution_mode', None) != ExecutionMode.PARALLEL:
                continue

            # Check capability match
            if capabilities:
                required = task.context.get("capabilities_required", [])
                if required and not any(c in capabilities for c in required):
                    continue

            # Try atomic claim (heartbeat-protected, see claim_next_task)
            if self._lock.try_claim_task(task_id, agent_id, heartbeat=True):
                self._ledger.update_task_status(task_id, TaskStatus.IN_PROGRESS)
                task.context["claimed_by"] = agent_id
                task.context["claimed_at"] = datetime.now().isoformat()
                claimed.append(task)

        # One save for the whole batch instead of one per claim.
        if claimed:
            self._ledger.save()
            logger.info(
                f"{len(claimed)} parallel tasks claimed by {agent_id}: "
                f"{[t.task_id for t in claimed]}")

        return claimed

    def submit_result(
        self,
        task_id: str,
        agent_id: str,
        result: Any,
    ) -> Dict[str, Any]:
        """
        Submit a task result for verification.

        Computes SHA-256 hash and publishes verification request via PubSub.

        Args:
            task_id: ID of the completed task.
            agent_id: Agent submitting the result (must hold the claim lock).
            result: Domain-specific result payload; hashed for later
                verification by verify_result().

        Returns:
            Dict with task_id, result_hash, and status="completed".
        """
        result_hash = TaskVerification.compute_result_hash(result)

        self._ledger.complete_task(task_id, result=result)
        self._lock.release_task(task_id, agent_id)

        # Store result_hash so verify_result() can compare later
        task = self._ledger.get_task(task_id)
        if task:
            task.context["result_hash"] = result_hash
            self._ledger.save()

        # Publish verification request if pubsub is enabled
        if hasattr(self._ledger, '_pubsub') and self._ledger._pubsub:
            self._ledger._pubsub.publish_verification_request(task_id, result_hash)
            self._notify_goal_contribution(
                task_id, agent_id,
                task.description if task else "a task",
            )

        logger.info(f"Result submitted: task={task_id} agent={agent_id} hash={result_hash[:16]}...")
        return {
            "task_id": task_id,
            "result_hash": result_hash,
            "status": "completed",
        }

    def _notify_goal_contribution(self, task_id: str, agent_id: str, task_description: str):
        """Notify the user who owns the agent that their agent contributed to a goal.

        Best-effort: every failure is logged as a warning and swallowed so
        notification problems can never break result submission.
        """
        try:
            task = self._ledger.get_task(task_id)
            if not task or not task.parent_task_id:
                return
            parent = self._ledger.get_task(task.parent_task_id)
            if not parent:
                return

            objective = parent.context.get("objective", parent.description)
            user_id = agent_id  # agent_id IS str(g.user.id) — set in api.py

            # Use Flask request context db if available, else open a fresh session
            try:
                from flask import g as flask_g
                db = flask_g.db
                owns_session = False
            except (RuntimeError, AttributeError):
                from integrations.social.models import get_db
                db = get_db()
                owns_session = True

            from integrations.social.services import NotificationService
            message = f'Your agent contributed to "{objective}": completed "{task_description}"'
            try:
                notif = NotificationService.create(
                    db, user_id, 'goal_contribution',
                    source_user_id=None,
                    target_type='goal',
                    target_id=task.parent_task_id,
                    message=message,
                )
                if owns_session:
                    db.commit()
            finally:
                # Fix: previously a failure inside create() leaked the
                # self-owned session (commit/close were skipped and only
                # the outer warning fired). Always close what we opened.
                if owns_session:
                    db.close()

            # Push real-time notification via WAMP (fires silently if Crossbar unavailable)
            try:
                from integrations.social.realtime import on_notification
                on_notification(user_id, notif.to_dict())
            except Exception:
                pass

            logger.info(f"Notified user {user_id}: goal contribution for {task_id}")
        except Exception as e:
            logger.warning(f"Failed to notify goal contribution: {e}")

    def verify_result(self, task_id: str, verifying_agent: str) -> bool:
        """
        Verify another agent's task result.

        Re-computes hash and compares. Records verification.

        Args:
            task_id: Task whose stored result should be re-hashed.
            verifying_agent: ID of the agent performing verification.

        Returns:
            True if the recomputed hash matches the stored result_hash;
            False when the task is missing, has no result, or hashes differ.
        """
        task = self._ledger.get_task(task_id)
        if not task or task.result is None:
            logger.warning(f"Cannot verify task {task_id}: no result")
            return False

        current_hash = TaskVerification.compute_result_hash(task.result)
        stored_hash = task.context.get("result_hash", "")

        verified = current_hash == stored_hash
        self._verifier.record_verification(task_id, current_hash, verifying_agent, verified)

        logger.info(f"Verification: task={task_id} by={verifying_agent} passed={verified}")
        return verified

    def get_goal_progress(self, goal_id: str) -> Dict[str, Any]:
        """Get progress across all hosts for a goal.

        Returns a dict with objective, context, per-task status rows, and
        completion percentage; or {"error": ...} when the goal is unknown.
        """
        parent = self._ledger.get_task(goal_id)
        if not parent:
            return {"error": f"Goal {goal_id} not found"}

        child_ids = parent.child_task_ids
        children = []
        completed = 0

        for child_id in child_ids:
            child = self._ledger.get_task(child_id)
            if child:
                # Tolerate both enum and plain-string status values.
                status = child.status.value if hasattr(child.status, "value") else str(child.status)
                children.append({
                    "task_id": child_id,
                    "description": child.description,
                    "status": status,
                    "claimed_by": child.context.get("claimed_by"),
                    "result_hash": child.context.get("result_hash"),
                })
                if child.status == TaskStatus.COMPLETED:
                    completed += 1

        # `or 1` avoids ZeroDivisionError for goals with no children.
        total = len(child_ids) or 1
        return {
            "goal_id": goal_id,
            "objective": parent.context.get("objective", parent.description),
            "context": {k: v for k, v in parent.context.items() if k != "objective"},
            "total_tasks": len(child_ids),
            "completed": completed,
            "progress_pct": round(completed / total * 100, 1),
            "tasks": children,
        }

    def create_baseline(self, label: str = "") -> str:
        """Create a snapshot baseline of current progress."""
        return self._baseline.create_snapshot(self._ledger, label)

    def compare_to_baseline(self, snapshot_id: str) -> Dict[str, Any]:
        """Compare current state against a baseline."""
        return self._baseline.compare_to_snapshot(self._ledger, snapshot_id)