Coverage for integrations / agent_engine / parallel_dispatch.py: 96.4%

112 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Parallel Agent Dispatch — bridges SmartLedger task graph to concurrent execution. 

3 

4Uses the existing SmartLedger primitives: 

5- get_parallel_executable_tasks() — returns all tasks ready to run NOW 

6- get_next_executable_task() — respects dependencies, priorities, insertion order 

7- complete_task_and_route() — auto-unblocks dependents on completion 

8 

9ThreadPoolExecutor pattern reused from speculative_dispatcher.py. 

10""" 

11import atexit 

12import json 

13import logging 

14import os 

15import threading 

16from concurrent.futures import ThreadPoolExecutor, as_completed 

17from typing import Any, Callable, Dict, List, Optional 

18 

# Logger shared by every helper in this module.
logger = logging.getLogger('hevolve_social')

# Process-wide executor; stays None until get_executor() builds it.
_executor = None
# Guards creation of the executor.
_executor_lock = threading.Lock()


def _read_max_workers(default: int = 8) -> int:
    """Return the worker count configured in HEVOLVE_PARALLEL_WORKERS.

    Falls back to *default* (with a warning) when the variable is set to
    something that is not an integer or is smaller than 1, so a bad
    environment value cannot break module import or produce an unusable
    thread pool size.
    """
    raw = os.environ.get('HEVOLVE_PARALLEL_WORKERS')
    if raw is None:
        return default
    try:
        workers = int(raw)
    except ValueError:
        logger.warning(
            "Ignoring non-integer HEVOLVE_PARALLEL_WORKERS=%r; using %d",
            raw, default)
        return default
    if workers < 1:
        # ThreadPoolExecutor rejects max_workers <= 0.
        logger.warning(
            "Ignoring HEVOLVE_PARALLEL_WORKERS=%r (must be >= 1); using %d",
            raw, default)
        return default
    return workers


MAX_PARALLEL_WORKERS = _read_max_workers()

24 

25 

def get_executor() -> ThreadPoolExecutor:
    """Return the module's shared ThreadPoolExecutor, building it on first use.

    An existing pool is returned straight away without taking the lock.
    Otherwise the pool is created while holding ``_executor_lock``, after
    checking once more that no other caller created it in the meantime.
    When the pool is created, a non-blocking ``shutdown`` is registered
    with ``atexit``.
    """
    global _executor
    if _executor is not None:
        return _executor

    with _executor_lock:
        # Re-check: another thread may have built the pool while we waited.
        if _executor is None:
            _executor = ThreadPoolExecutor(
                thread_name_prefix='hart-parallel',
                max_workers=MAX_PARALLEL_WORKERS,
            )
            atexit.register(lambda: _executor.shutdown(wait=False))
    return _executor

37 

38 

def dispatch_parallel_tasks(ledger, dispatch_fn: Callable,
                            max_concurrent: int = 8) -> Dict:
    """Run one batch of parallel-ready ledger tasks on the shared executor.

    Args:
        ledger: Object providing get_parallel_executable_tasks(),
            update_task_status() and save() (a SmartLedger).
        dispatch_fn: Called with a task on a worker thread; expected to
            return a dict. A falsy 'success' entry marks the task failed;
            a missing 'success' counts as success.
        max_concurrent: Upper bound on how many of the ready tasks are
            submitted in this batch.

    Returns:
        {'completed': int, 'failed': int, 'results': {task_id: result}}.
        For a task whose dispatch raised, the result is {'error': str(exc)}.
    """
    from agent_ledger.core import TaskStatus

    ready = ledger.get_parallel_executable_tasks()
    if not ready:
        return {'completed': 0, 'failed': 0, 'results': {}}

    batch = ready[:max_concurrent]
    pool = get_executor()
    outcomes = {}
    in_flight = {}

    # Each task is flagged IN_PROGRESS right before it is handed to the pool.
    for item in batch:
        ledger.update_task_status(item.task_id, TaskStatus.IN_PROGRESS)
        in_flight[pool.submit(dispatch_fn, item)] = item

    n_completed = 0
    n_failed = 0
    for done in as_completed(in_flight):
        task_id = in_flight[done].task_id
        try:
            # NOTE(review): as_completed() only yields finished futures, so
            # this timeout never fires; the loop itself waits without a
            # deadline.
            outcome = done.result(timeout=300)
            if not outcome.get('success', True):
                ledger.update_task_status(
                    task_id, TaskStatus.FAILED,
                    error_message=str(outcome.get('error', 'failed')))
                n_failed += 1
            else:
                # Completion goes through update_task_status so the ledger
                # can run its own completion handling for dependents.
                ledger.update_task_status(
                    task_id, TaskStatus.COMPLETED, result=outcome)
                n_completed += 1
            outcomes[task_id] = outcome
        except Exception as exc:
            logger.warning(f"Parallel task {task_id} failed: {exc}")
            ledger.update_task_status(
                task_id, TaskStatus.FAILED, error_message=str(exc))
            outcomes[task_id] = {'error': str(exc)}
            n_failed += 1

    ledger.save()
    return {'completed': n_completed, 'failed': n_failed, 'results': outcomes}

95 

96 

def dispatch_goal_with_ledger(ledger, dispatch_fn: Callable) -> Dict:
    """Execute ledger tasks until none are executable, or the cap is hit.

    Each iteration:
      1. If the ledger reports parallel-ready tasks, fan them out through
         dispatch_parallel_tasks() and start the next iteration.
      2. Otherwise take the next sequential task and run it inline.
      3. Record the outcome with update_task_status() (COMPLETED or
         FAILED); unblocking of dependents is left to the ledger.
      4. Stop when no task is executable. As a safety net the loop also
         stops after ``max_iterations`` passes and logs a warning, since
         the ledger may then still hold unexecuted tasks.

    Args:
        ledger: SmartLedger-like object with tasks loaded.
        dispatch_fn: Callable(task) -> result dict. A falsy 'success'
            entry marks the task failed; a missing one counts as success.

    Returns:
        {'completed': int, 'failed': int, 'results': {task_id: result},
         'awareness': ledger.get_awareness()}.
    """
    from agent_ledger.core import TaskStatus

    total_completed = 0
    total_failed = 0
    all_results = {}
    max_iterations = 100  # Safety cap

    for _ in range(max_iterations):
        # Parallel-ready work takes precedence over sequential work.
        if ledger.get_parallel_executable_tasks():
            batch_result = dispatch_parallel_tasks(
                ledger, dispatch_fn, max_concurrent=MAX_PARALLEL_WORKERS)
            total_completed += batch_result['completed']
            total_failed += batch_result['failed']
            all_results.update(batch_result['results'])
            continue

        # Fall back to sequential execution.
        next_task = ledger.get_next_executable_task()
        if not next_task:
            break

        ledger.update_task_status(next_task.task_id, TaskStatus.IN_PROGRESS)
        try:
            result = dispatch_fn(next_task)
            if result.get('success', True):
                ledger.update_task_status(
                    next_task.task_id, TaskStatus.COMPLETED, result=result)
                total_completed += 1
            else:
                ledger.update_task_status(
                    next_task.task_id, TaskStatus.FAILED,
                    error_message=str(result.get('error', 'failed')))
                total_failed += 1
            all_results[next_task.task_id] = result
        except Exception as e:
            logger.warning(f"Sequential task {next_task.task_id} failed: {e}")
            ledger.update_task_status(
                next_task.task_id, TaskStatus.FAILED, error_message=str(e))
            all_results[next_task.task_id] = {'error': str(e)}
            total_failed += 1
    else:
        # Reached only when the loop never hit ``break``: the cap was
        # exhausted while the ledger kept offering work.
        logger.warning(
            "dispatch_goal_with_ledger stopped after %d iterations; "
            "the ledger may still contain executable tasks",
            max_iterations)

    ledger.save()
    return {
        'completed': total_completed,
        'failed': total_failed,
        'results': all_results,
        'awareness': ledger.get_awareness(),
    }

156 

157 

def _single_task_plan(prompt, goal_id, goal_type):
    """Build the (task_list, None) pair used when no ledger is returned."""
    return [{
        'task_id': f'{goal_id}_task_0',
        'description': prompt[:500],
        'capabilities': [goal_type],
    }], None


def _subtask_description(entry) -> str:
    """Return a subtask entry's description, always as a string."""
    if isinstance(entry, dict):
        # A dict without 'description' falls back to its own text form
        # instead of leaking the dict object into the ledger.
        return str(entry.get('description', entry))
    return str(entry)


def decompose_goal_to_ledger(prompt: str, goal_id: str, goal_type: str,
                             user_id: str, subtask_defs: Optional[Dict] = None):
    """Decompose a goal into a SmartLedger with parallel/sequential tasks.

    Args:
        prompt: Goal prompt text (truncated to 500 characters in
            descriptions).
        goal_id: Goal identifier; used to derive task ids and the ledger
            session id.
        goal_type: Goal type (marketing, coding, etc.); reported as the
            single capability of every returned task.
        user_id: Owner user ID; used as the ledger's agent id.
        subtask_defs: Optional dict with a 'tasks' list and a 'parallel'
            bool. Entries of 'tasks' may be dicts with a 'description' key
            or plain values; each is converted to a string description.
            With None, or fewer than two tasks, a single-task plan is
            returned (backward-compatible).

    Returns:
        (task_list, ledger): task_list is a list of coordinator-compatible
        dicts; ledger is the SmartLedger for parallel dispatch, or None
        for single-task goals and when agent_ledger cannot be imported.
    """
    try:
        from agent_ledger import SmartLedger, Task, TaskType, ExecutionMode

        ledger = SmartLedger(agent_id=user_id, session_id=str(goal_id))

        # Root task represents the goal itself.
        root = Task(
            task_id=f'{goal_id}_root',
            description=prompt[:500],
            task_type=TaskType.AUTONOMOUS,
            execution_mode=ExecutionMode.SEQUENTIAL,
        )
        ledger.add_task(root)

        tasks = subtask_defs.get('tasks') if subtask_defs else None
        if not isinstance(tasks, list) or len(tasks) <= 1:
            # Single task: no ledger needed for parallel dispatch.
            return _single_task_plan(prompt, goal_id, goal_type)

        descriptions = [_subtask_description(t) for t in tasks]

        if subtask_defs.get('parallel', False):
            # Fan-out: sibling tasks under the root.
            siblings = ledger.create_sibling_tasks(
                parent_task_id=root.task_id,
                sibling_descriptions=descriptions,
                task_type=TaskType.PRE_ASSIGNED,
            )
            for sib in siblings:
                # Siblings are switched to PARALLEL and flagged ready here.
                sib.execution_mode = ExecutionMode.PARALLEL
                sib.pending_reason = 'ready'
                # Make sure the task is listed in task_order.
                if sib.task_id not in ledger.task_order:
                    ledger.task_order.append(sib.task_id)
        else:
            # Sequential chain under the root.
            seq_tasks = ledger.create_sequential_tasks(
                descriptions,
                task_type=TaskType.PRE_ASSIGNED,
                parent_task_id=root.task_id,
            )
            for st in seq_tasks:
                # Make sure the task is listed in task_order.
                if st.task_id not in ledger.task_order:
                    ledger.task_order.append(st.task_id)

        ledger.save()

        # Convert to coordinator-compatible dicts, in ledger order.
        plan = []
        for tid in ledger.task_order:
            task = ledger.tasks[tid]
            mode = task.execution_mode
            plan.append({
                'task_id': tid,
                'description': task.description[:500],
                'capabilities': [goal_type],
                'execution_mode': mode.value if hasattr(mode, 'value')
                else str(mode),
                'prerequisites': list(task.prerequisites),
            })
        return plan, ledger

    except ImportError:
        # agent_ledger unavailable: single-task decomposition.
        return _single_task_plan(prompt, goal_id, goal_type)

257 

258 

def extract_subtasks_from_context(goal_id: str) -> Optional[Dict]:
    """Return the goal's context dict when it defines explicit subtasks.

    AgentGoal.context can contain:
        {"tasks": [{"description": "..."}, ...], "parallel": true/false}
    either as a JSON string or as an already-decoded value.

    This is a best-effort lookup: any failure (models module missing,
    database error, malformed JSON) is logged at debug level and reported
    as "no subtasks" rather than raised.

    Args:
        goal_id: Identifier of the AgentGoal row to look up.

    Returns:
        The context dict if it is a dict containing a 'tasks' key,
        otherwise None.
    """
    try:
        from integrations.social.models import get_db, AgentGoal
        db = get_db()
        try:
            goal = db.query(AgentGoal).filter_by(id=goal_id).first()
            if not goal or not goal.context:
                return None
            ctx = goal.context
            if isinstance(ctx, str):
                ctx = json.loads(ctx)
            if isinstance(ctx, dict) and 'tasks' in ctx:
                return ctx
        finally:
            # Runs on every path once the session was obtained.
            db.close()
    except Exception as exc:
        # Same named logger the module uses; keeps the lookup best-effort
        # while leaving a trace of why it yielded nothing.
        logging.getLogger('hevolve_social').debug(
            "Could not load subtask context for goal %s: %s", goal_id, exc)
    return None