Coverage for integrations / agent_engine / parallel_dispatch.py: 96.4%
112 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Parallel Agent Dispatch — bridges SmartLedger task graph to concurrent execution.
4Uses the existing SmartLedger primitives:
5- get_parallel_executable_tasks() — returns all tasks ready to run NOW
6- get_next_executable_task() — respects dependencies, priorities, insertion order
7- complete_task_and_route() — auto-unblocks dependents on completion
9ThreadPoolExecutor pattern reused from speculative_dispatcher.py.
10"""
11import atexit
12import json
13import logging
14import os
15import threading
16from concurrent.futures import ThreadPoolExecutor, as_completed
17from typing import Any, Callable, Dict, List, Optional
19logger = logging.getLogger('hevolve_social')
21_executor = None
22_executor_lock = threading.Lock()
23MAX_PARALLEL_WORKERS = int(os.environ.get('HEVOLVE_PARALLEL_WORKERS', '8'))
def get_executor() -> ThreadPoolExecutor:
    """Return the process-wide worker pool, creating it on first use.

    Double-checked locking: the common already-created path returns
    without touching the lock; creation itself is serialized.  The pool
    is shut down (without waiting for stragglers) at interpreter exit.
    """
    global _executor
    if _executor is not None:
        return _executor
    with _executor_lock:
        if _executor is None:
            _executor = ThreadPoolExecutor(
                max_workers=MAX_PARALLEL_WORKERS,
                thread_name_prefix='hart-parallel')
            # Lambda reads the global lazily, so it always targets the
            # live singleton at shutdown time.
            atexit.register(lambda: _executor.shutdown(wait=False))
    return _executor
def dispatch_parallel_tasks(ledger, dispatch_fn: Callable,
                            max_concurrent: int = 8) -> Dict:
    """Fan-out all parallel-ready tasks from the ledger to concurrent workers.

    Args:
        ledger: SmartLedger instance with tasks loaded
        dispatch_fn: Callable(task) -> result dict (e.g., calls /chat)
        max_concurrent: Max simultaneous dispatches

    Returns:
        {completed: int, failed: int, results: {task_id: result}}
    """
    from agent_ledger.core import TaskStatus

    ready = ledger.get_parallel_executable_tasks()
    if not ready:
        return {'completed': 0, 'failed': 0, 'results': {}}

    pool = get_executor()
    outcome = {}
    pending = {}

    # Flag each task IN_PROGRESS before handing it to a worker so the
    # ledger never reports it as still dispatchable.
    for job in ready[:max_concurrent]:
        ledger.update_task_status(job.task_id, TaskStatus.IN_PROGRESS)
        pending[pool.submit(dispatch_fn, job)] = job

    ok_count = 0
    err_count = 0
    for done in as_completed(pending):
        job = pending[done]
        try:
            payload = done.result(timeout=300)
            if payload.get('success', True):
                # update_task_status triggers _handle_task_completion,
                # which properly unblocks dependent tasks.
                ledger.update_task_status(
                    job.task_id, TaskStatus.COMPLETED, result=payload)
                ok_count += 1
            else:
                ledger.update_task_status(
                    job.task_id, TaskStatus.FAILED,
                    error_message=str(payload.get('error', 'failed')))
                err_count += 1
            outcome[job.task_id] = payload
        except Exception as e:
            # Worker raised (or returned a non-dict): record the failure
            # against the task instead of aborting the whole batch.
            logger.warning(f"Parallel task {job.task_id} failed: {e}")
            ledger.update_task_status(
                job.task_id, TaskStatus.FAILED, error_message=str(e))
            outcome[job.task_id] = {'error': str(e)}
            err_count += 1

    ledger.save()
    return {'completed': ok_count, 'failed': err_count, 'results': outcome}
def dispatch_goal_with_ledger(ledger, dispatch_fn: Callable) -> Dict:
    """Execute ALL tasks in a ledger, respecting parallel/sequential ordering.

    Loops until no more executable tasks remain:
    1. Get parallel-ready tasks -> fan-out
    2. Get next sequential task -> execute
    3. complete_task_and_route() auto-unblocks dependents
    4. Repeat until done
    """
    from agent_ledger.core import TaskStatus

    done_total = 0
    fail_total = 0
    collected = {}

    for _ in range(100):  # safety cap against runaway scheduling loops
        # Prefer a parallel fan-out whenever several tasks are ready at once.
        if ledger.get_parallel_executable_tasks():
            batch = dispatch_parallel_tasks(
                ledger, dispatch_fn, max_concurrent=MAX_PARALLEL_WORKERS)
            done_total += batch['completed']
            fail_total += batch['failed']
            collected.update(batch['results'])
            continue

        # Otherwise run the single next sequential task, if any remains.
        task = ledger.get_next_executable_task()
        if not task:
            break

        ledger.update_task_status(task.task_id, TaskStatus.IN_PROGRESS)
        try:
            outcome = dispatch_fn(task)
            if outcome.get('success', True):
                ledger.update_task_status(
                    task.task_id, TaskStatus.COMPLETED, result=outcome)
                done_total += 1
            else:
                ledger.update_task_status(
                    task.task_id, TaskStatus.FAILED,
                    error_message=str(outcome.get('error', 'failed')))
                fail_total += 1
            collected[task.task_id] = outcome
        except Exception as e:
            logger.warning(f"Sequential task {task.task_id} failed: {e}")
            ledger.update_task_status(
                task.task_id, TaskStatus.FAILED, error_message=str(e))
            collected[task.task_id] = {'error': str(e)}
            fail_total += 1

    ledger.save()
    return {
        'completed': done_total,
        'failed': fail_total,
        'results': collected,
        'awareness': ledger.get_awareness(),
    }
def decompose_goal_to_ledger(prompt: str, goal_id: str, goal_type: str,
                             user_id: str, subtask_defs: Optional[Dict] = None):
    """Decompose a goal into a SmartLedger with parallel/sequential tasks.

    Args:
        prompt: Goal prompt text
        goal_id: Goal identifier
        goal_type: Goal type (marketing, coding, etc.)
        user_id: Owner user ID
        subtask_defs: Optional dict with 'tasks' list and 'parallel' bool.
            If None, creates a single root task (backward-compatible).

    Returns:
        (task_list, ledger) — task_list for coordinator compatibility,
        ledger for parallel dispatch. ledger is None for single-task goals.
    """
    try:
        from agent_ledger import SmartLedger, Task, TaskType, ExecutionMode

        ledger = SmartLedger(agent_id=user_id, session_id=str(goal_id))

        # Create root task (the goal itself)
        root = Task(
            task_id=f'{goal_id}_root',
            description=prompt[:500],  # description capped at 500 chars throughout
            task_type=TaskType.AUTONOMOUS,
            execution_mode=ExecutionMode.SEQUENTIAL,
        )
        ledger.add_task(root)

        # Multi-task path: taken only when the caller supplied MORE than one
        # subtask; a single-entry list falls through to the single-task return.
        if subtask_defs and isinstance(subtask_defs.get('tasks'), list) \
                and len(subtask_defs['tasks']) > 1:
            is_parallel = subtask_defs.get('parallel', False)
            tasks = subtask_defs['tasks']

            if is_parallel:
                # Fan-out: create sibling tasks under root
                siblings = ledger.create_sibling_tasks(
                    parent_task_id=root.task_id,
                    sibling_descriptions=[
                        # NOTE(review): a dict lacking 'description' falls back
                        # to the dict object itself — confirm that is intended.
                        t.get('description', t) if isinstance(t, dict)
                        else str(t)
                        for t in tasks
                    ],
                    task_type=TaskType.PRE_ASSIGNED,
                )
                # Mark siblings as PARALLEL + ready
                # (create_sibling_tasks defaults to SEQUENTIAL, pending_reason=None)
                for sib in siblings:
                    sib.execution_mode = ExecutionMode.PARALLEL
                    sib.pending_reason = 'ready'
                    # Ensure in task_order (create_sibling_tasks skips add_task)
                    if sib.task_id not in ledger.task_order:
                        ledger.task_order.append(sib.task_id)
            else:
                # Sequential chain under root
                seq_tasks = ledger.create_sequential_tasks(
                    [t.get('description', t) if isinstance(t, dict)
                     else str(t)
                     for t in tasks],
                    task_type=TaskType.PRE_ASSIGNED,
                    parent_task_id=root.task_id,
                )
                # Ensure in task_order (create_sequential_tasks skips add_task)
                for st in seq_tasks:
                    if st.task_id not in ledger.task_order:
                        ledger.task_order.append(st.task_id)

            ledger.save()

            # Convert to coordinator-compatible dicts
            result = []
            for tid in ledger.task_order:
                task = ledger.tasks[tid]
                result.append({
                    'task_id': tid,
                    'description': task.description[:500],
                    'capabilities': [goal_type],
                    # execution_mode may be an Enum or a plain string
                    'execution_mode': task.execution_mode.value
                    if hasattr(task.execution_mode, 'value')
                    else str(task.execution_mode),
                    'prerequisites': list(task.prerequisites),
                })
            return result, ledger

        # Single task — no ledger needed for parallel dispatch
        return [{
            'task_id': f'{goal_id}_task_0',
            'description': prompt[:500],
            'capabilities': [goal_type],
        }], None

    except ImportError:
        # Fallback: single-task decomposition when agent_ledger is unavailable
        return [{
            'task_id': f'{goal_id}_task_0',
            'description': prompt[:500],
            'capabilities': [goal_type],
        }], None
def extract_subtasks_from_context(goal_id: str) -> Optional[Dict]:
    """Check if the goal has explicit subtask definitions in its context.

    AgentGoal.context can contain:
    {"tasks": [{"description": "..."}, ...], "parallel": true/false}

    Args:
        goal_id: Primary key of the AgentGoal row to inspect.

    Returns:
        The context dict if it contains a 'tasks' key, None otherwise
        (including on any lookup/parse failure — this helper is best-effort
        and must never break dispatch).
    """
    try:
        from integrations.social.models import get_db, AgentGoal
        db = get_db()
        try:
            goal = db.query(AgentGoal).filter_by(id=goal_id).first()
            if goal and goal.context:
                # context may be stored as a JSON string or a native dict
                ctx = json.loads(goal.context) if isinstance(
                    goal.context, str) else goal.context
                if isinstance(ctx, dict) and 'tasks' in ctx:
                    return ctx
        finally:
            db.close()
    except Exception as e:
        # Was a bare `pass`: keep the best-effort contract but surface the
        # reason (import failure, DB error, malformed JSON) at debug level
        # so silent misconfigurations are diagnosable.
        logging.getLogger('hevolve_social').debug(
            f"extract_subtasks_from_context({goal_id}) skipped: {e}")
    return None