Coverage for integrations/agent_engine/dispatch.py: 85.0% (301 statements)
coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Unified Agent Goal Engine - Chat Dispatch 

3 

4Sends agent goals to idle agents via the existing /chat endpoint 

5(CREATE/REUSE pipeline). Dispatches with autonomous=True so the 

6LLM auto-generates the agent config without user interaction. 

7 

8First dispatch = CREATE mode (gather_info + recipe creation). 

9Subsequent dispatches = REUSE mode (recipe exists, 90% faster). 

10 

11DISTRIBUTED DISPATCH (automatic): 

12When a shared Redis coordinator is reachable (i.e. the node is part 

13of a hive with peers), goals are automatically submitted to the 

14DistributedTaskCoordinator instead of local /chat. Worker nodes 

15across the hive claim and execute tasks autonomously. No separate 

16mode flag — distribution is an emergent property of having peers. 

17Falls back to local /chat when Redis is unavailable. 

18""" 

import os
import logging
import threading
import requests
from typing import Dict, List, Optional

from core.http_pool import pooled_post
from core.port_registry import get_port

logger = logging.getLogger('hevolve_social')

# ── HTTP circuit breaker ────────────────────────────────────────────────
# Fast-fail instead of waiting 120s against a dead llama-server.
# After _CB_THRESHOLD consecutive connection failures, skip HTTP dispatch
# for _CB_COOLDOWN_S seconds before retrying.
_CB_THRESHOLD = 3     # failures before opening circuit
_CB_COOLDOWN_S = 60   # seconds to wait before retrying
_cb_failures = 0
_cb_open_until = 0.0
_cb_lock = threading.Lock()


def _cb_record_success():
    global _cb_failures, _cb_open_until
    with _cb_lock:
        _cb_failures = 0
        _cb_open_until = 0.0


def _cb_record_failure():
    global _cb_failures, _cb_open_until
    with _cb_lock:
        _cb_failures += 1
        if _cb_failures >= _CB_THRESHOLD:
            import time as _t
            _cb_open_until = _t.time() + _CB_COOLDOWN_S
            logger.warning(f"Circuit breaker OPEN — {_cb_failures} consecutive "
                           f"HTTP failures, skipping dispatch for {_CB_COOLDOWN_S}s")


def _cb_is_open() -> bool:
    with _cb_lock:
        if _cb_failures < _CB_THRESHOLD:
            return False
        import time as _t
        if _t.time() >= _cb_open_until:
            # Half-open: allow one probe
            return False
        return True
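# Illustrative sketch of the intended breaker call pattern (the Tier-2
# block in dispatch_goal() below is the real consumer; url/body here are
# placeholders):
#
#     if _cb_is_open():
#         return None                    # fast-fail, server recently dead
#     try:
#         resp = pooled_post(url, json=body, timeout=120)
#         if resp.status_code == 200:
#             _cb_record_success()       # resets the failure counter
#         elif resp.status_code in (429, 500, 502, 503):
#             _cb_record_failure()       # 3rd consecutive failure opens the circuit
#     except requests.RequestException:
#         _cb_record_failure()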

# ── LLM concurrency control ──────────────────────────────────────────────
# Local llama-server degrades exponentially with concurrent requests
# (KV cache thrashing). Allow only N concurrent local LLM calls.
# This prevents the watchdog-restart cascade where restarted daemons
# pile up concurrent requests that each take longer, triggering more
# restarts.
_LOCAL_LLM_MAX_CONCURRENT = int(os.environ.get('HEVOLVE_LOCAL_LLM_MAX_CONCURRENT', '1'))
_local_llm_semaphore = threading.Semaphore(_LOCAL_LLM_MAX_CONCURRENT)


# ── User-priority gate ──────────────────────────────────────────────────
# When a human user is chatting, daemon dispatch must yield the LLM.
# Tracked via timestamp of last user activity — daemon checks freshness.
import time as _time
_last_user_chat_at: float = 0.0
_USER_CHAT_COOLDOWN = 600         # 10 min — CREATE pipeline can take this long
_active_create_sessions: int = 0  # count of in-flight CREATE requests
_create_lock = threading.Lock()

def mark_user_chat_activity():
    """Call on every user /chat request (including autonomous CREATE)."""
    global _last_user_chat_at
    _last_user_chat_at = _time.time()


def mark_create_start():
    """Call when a CREATE pipeline starts."""
    global _active_create_sessions
    with _create_lock:
        _active_create_sessions += 1
    mark_user_chat_activity()


def mark_create_end():
    """Call when a CREATE pipeline finishes."""
    global _active_create_sessions
    with _create_lock:
        _active_create_sessions = max(0, _active_create_sessions - 1)
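# Illustrative caller-side sketch (the CREATE pipeline itself lives outside
# this module; the /chat handler is assumed to do the bracketing):
#
#     mark_create_start()
#     try:
#         ...  # run the CREATE pipeline (gather_info + recipe creation)
#     finally:
#         mark_create_end()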

def is_user_recently_active() -> bool:
    """True if user chatted recently OR a CREATE pipeline is running."""
    if _active_create_sessions > 0:
        return True
    return (_time.time() - _last_user_chat_at) < _USER_CHAT_COOLDOWN


def should_yield_to_user() -> bool:
    """Single canonical gate every background daemon must call.

    Returns True when the daemon must skip its tick / iteration
    (yield CPU, GIL, LLM, GPU to the user-facing path). Three
    independent yield reasons:

    1. ``is_user_recently_active()`` — user chatted in the last 10
       minutes or a CREATE pipeline is running.
    2. ``model_lifecycle.get_system_pressure()['throttle_factor'] < 0.1``
       — VRAM/CPU pressure is so high the LLM throttle factor has
       collapsed; running another LLM call would saturate the
       system and starve the user.
    3. ``ResourceGovernor.get_throttle() < 0.3`` — generic CPU/RAM
       pressure that is NOT LLM-shaped (e.g., a runaway Python loop,
       a hammering background daemon, OS-level memory pressure).
       The governor combines its mode (ACTIVE/IDLE/SLEEP), the
       model_lifecycle pressure, AND its own per-process pressure
       calculation into a single 0.0-1.0 throttle factor (see
       ``core.resource_governor._calculate_throttle``). Below 0.3
       means "the system is hot enough that user-facing latency is
       at risk" — daemons must yield even if the LLM-specific
       throttle is fine. This captures the case the user flagged on
       2026-05-10: coding_daemon's autogen turns were burning CPU/GIL
       while the user was actively chatting; model_lifecycle's
       LLM-only pressure didn't see it, the gate passed, and the
       system slowed down.

    All three checks are best-effort — failure to import or read any
    signal returns False (don't block daemons on a missing module).
    The function is the single source of truth for daemon yield
    semantics: ``agent_daemon._tick``,
    ``agent_daemon._proactive_hive_tick``,
    ``hive_benchmark_prover._continuous_loop``, and
    ``coding_daemon._tick`` all consult it, so adding a fourth
    yield reason (e.g. battery-saver mode, network pressure)
    means editing exactly this function — no per-daemon copy-paste.
    """
    try:
        if is_user_recently_active():
            return True
    except Exception:
        pass
    try:
        from integrations.service_tools.model_lifecycle import (
            get_model_lifecycle_manager)
        _pressure = get_model_lifecycle_manager().get_system_pressure()
        if _pressure.get('throttle_factor', 1.0) < 0.1:
            return True
    except Exception:
        pass
    try:
        from core.resource_governor import get_governor
        _gov = get_governor()
        if _gov is not None and _gov.get_throttle() < 0.3:
            return True
    except Exception:
        pass
    return False
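# Illustrative sketch of the consumer side (the real daemons live in their
# own modules; the tick body below is hypothetical):
#
#     def _tick(self):                  # e.g. agent_daemon._tick
#         if should_yield_to_user():
#             return                    # skip this iteration entirely
#         dispatch_goal(prompt, user_id, goal_id)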

def _notify_watchdog_llm_start():
    """Tell the watchdog the current thread is blocked on a legitimate LLM call.

    The watchdog will extend the heartbeat threshold for threads marked
    'in_llm_call' instead of restarting them.
    """
    try:
        from security.node_watchdog import get_watchdog
        wd = get_watchdog()
        if not wd:
            return
        thread_name = threading.current_thread().name
        # Match by thread name — works for all daemon threads
        if wd.is_registered(thread_name):
            wd.mark_in_llm_call(thread_name)
            return
        # Partial match (thread name might have suffix like 'agent_daemon-1')
        for name in wd.registered_names():
            if name in thread_name:
                wd.mark_in_llm_call(name)
                return
        # Fallback: in-process/bundled mode — dispatch runs on a
        # different thread (e.g. Flask worker). Mark the calling daemon
        # via threadlocal source hint if available.
        try:
            from threadlocal import get_task_source
            source = get_task_source()
            if source and wd.is_registered(source):
                wd.mark_in_llm_call(source)
                return
        except Exception:
            pass
    except Exception:
        pass


def _notify_watchdog_llm_end():
    """Clear the LLM call marker and send a heartbeat for all registered daemons."""
    try:
        from security.node_watchdog import get_watchdog
        wd = get_watchdog()
        if wd:
            for name in wd.registered_names():
                wd.clear_llm_call(name)
                wd.heartbeat(name)
    except Exception:
        pass

def _get_distributed_coordinator():
    """Get the shared DistributedTaskCoordinator if Redis is reachable.

    Returns None when Redis is unavailable — caller falls back to local.
    No separate mode flag needed: if Redis exists, distribute.
    """
    try:
        from integrations.distributed_agent.api import _get_coordinator
        return _get_coordinator()
    except Exception as e:
        logger.debug(f"Distributed coordinator unavailable: {e}")
        return None


def _has_hive_peers() -> bool:
    """Check if this node has active peers in the hive.

    Distribution only makes sense when there are other nodes to
    pick up work. Single-node setups always dispatch locally.
    """
    try:
        from integrations.social.models import db_session, PeerNode
        with db_session(commit=False) as db:
            count = db.query(PeerNode).filter(
                PeerNode.status == 'active'
            ).count()
            return count > 1  # >1 because self is in the table too
    except Exception:
        return False

def _decompose_goal(prompt: str, goal_id: str, goal_type: str,
                    user_id: str) -> List[Dict]:
    """Decompose a goal into distributable sub-tasks.

    Checks AgentGoal.context for explicit subtask definitions:
        {"tasks": [...], "parallel": true/false}

    When subtasks are present, uses SmartLedger to create a proper
    dependency graph (parallel fan-out or sequential chain).
    Falls back to single-task decomposition when no subtasks defined.
    """
    try:
        from .parallel_dispatch import (
            extract_subtasks_from_context, decompose_goal_to_ledger)

        subtask_defs = extract_subtasks_from_context(goal_id)
        tasks, _ledger = decompose_goal_to_ledger(
            prompt, goal_id, goal_type, user_id, subtask_defs)
        return tasks
    except Exception:
        pass

    return [{
        'task_id': f'{goal_id}_task_0',
        'description': prompt[:500],
        'capabilities': [goal_type],
    }]
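# Illustrative sketch of the two decomposition outcomes. The context shape
# follows the docstring above; the per-task keys inside "tasks" are an
# assumption; the authoritative schema lives in parallel_dispatch.
#
#     # AgentGoal.context with explicit subtasks -> SmartLedger dependency graph
#     {"tasks": [{"description": "draft post"}, {"description": "schedule post"}],
#      "parallel": true}
#
#     # No subtasks defined -> single-task fallback, e.g. for goal_id 'g1':
#     [{'task_id': 'g1_task_0', 'description': prompt[:500], 'capabilities': ['marketing']}]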

def dispatch_goal_distributed(prompt: str, user_id: str, goal_id: str,
                              goal_type: str = 'marketing') -> Optional[str]:
    """Submit a goal to the distributed task coordinator.

    The goal is decomposed into sub-tasks, published to shared Redis,
    and worker nodes across the hive will claim and execute them.

    Returns:
        goal_id string on success, None on failure
    """
    coordinator = _get_distributed_coordinator()
    if not coordinator:
        logger.warning(f"Distributed dispatch failed: coordinator unavailable, "
                       f"falling back to local for {goal_type} goal {goal_id}")
        return None

    tasks = _decompose_goal(prompt, goal_id, goal_type, user_id)
    context = {
        'goal_type': goal_type,
        'user_id': user_id,
        'prompt': prompt,
        'source_node': os.environ.get('HEVOLVE_NODE_ID', 'unknown'),
        'task_source': 'hive',
    }

    try:
        distributed_goal_id = coordinator.submit_goal(
            objective=prompt[:200],
            decomposed_tasks=tasks,
            context=context,
        )
        logger.info(f"Distributed dispatch: goal {goal_id} submitted as "
                    f"{distributed_goal_id} with {len(tasks)} tasks")
        return distributed_goal_id
    except Exception as e:
        logger.warning(f"Distributed dispatch error for {goal_type} goal {goal_id}: {e}")
        return None

def _check_robot_capability_match(goal_type: str, goal_id: str) -> bool:
    """For robot goals, verify this node can handle the task.

    Checks task requirements against local robot capabilities.
    Non-robot goals always pass. Robot goals without requirements pass.

    Returns True if the node is capable, False if it should be
    dispatched to a more capable peer via distributed dispatch.
    """
    if goal_type != 'robot':
        return True

    try:
        from integrations.social.models import db_session, AgentGoal
        with db_session(commit=False) as db:
            goal = db.query(AgentGoal).filter_by(id=goal_id).first()
            if not goal:
                return True
            config = goal.config_json or {}
            required_caps = config.get('required_capabilities', [])
            if not required_caps:
                return True

            from integrations.robotics.capability_advertiser import (
                get_capability_advertiser,
            )
            adv = get_capability_advertiser()
            score = adv.matches_task_requirements({
                'required_capabilities': required_caps,
                'preferred_form_factor': config.get('preferred_form_factor'),
                'min_payload_kg': config.get('min_payload_kg'),
            })
            if score < 0.5:
                logger.info(
                    f"Robot goal {goal_id} capability mismatch "
                    f"(score={score}), prefer distributed dispatch")
                return False
            return True
    except Exception as e:
        logger.debug(f"Robot capability check skipped: {e}")
        return True

def dispatch_goal(prompt: str, user_id: str, goal_id: str,
                  goal_type: str = 'marketing',
                  model_config: list = None) -> Optional[str]:
    """Send a goal prompt through the existing /chat pipeline.

    Uses autonomous=True so Phase 1 (gather_info) runs without
    human interaction — the LLM generates the agent config itself.

    GUARDRAILS enforced: GuardrailEnforcer.before_dispatch() + after_response().

    When Redis is reachable and hive peers exist, goals are automatically
    submitted to the shared DistributedTaskCoordinator. Worker nodes
    across the hive claim and execute them. Falls back to local /chat
    when the coordinator is unavailable or no peers exist.

    For robot goals: capability matching ensures the task goes to a
    node with the right hardware (locomotion, manipulation, sensors).

    Args:
        prompt: The goal prompt (from build_prompt)
        user_id: The agent's user_id
        goal_id: The goal identifier
        goal_type: Goal type prefix for prompt_id
        model_config: Optional per-dispatch config_list override

    Returns:
        Response text or None on failure
    """
    # BUDGET GATE: check goal budget + platform affordability before dispatch
    try:
        from integrations.agent_engine.budget_gate import pre_dispatch_budget_gate
        bg_allowed, bg_reason = pre_dispatch_budget_gate(goal_id, prompt)
        if not bg_allowed:
            logger.warning(f"Dispatch blocked by budget gate for {goal_type} goal {goal_id}: {bg_reason}")
            return None
    except ImportError:
        pass

    # TOOL ALLOWLIST: resolve model tier and attach to dispatch context.
    # Tier is sent to /chat as body['model_tier']; create_recipe uses it
    # to call filter_tools_for_model() when building the agent tool list.
    _dispatch_model_tier = None
    if model_config:
        try:
            from integrations.agent_engine.model_registry import model_registry
            first_model = model_config[0].get('model', '') if model_config else ''
            if first_model:
                info = model_registry.get(first_model)
                if info:
                    _dispatch_model_tier = (info.get('tier') or info.get('model_tier'))
            if _dispatch_model_tier:
                logger.info(f"Dispatch model tier: {_dispatch_model_tier.value} "
                            f"for {goal_type} goal {goal_id}")
        except Exception:
            pass  # Model registry unavailable — no tier restriction

    # GUARDRAIL: full pre-dispatch gate (fail-closed: block if guardrails unavailable)
    try:
        from security.hive_guardrails import GuardrailEnforcer
        allowed, reason, prompt = GuardrailEnforcer.before_dispatch(prompt)
        if not allowed:
            logger.warning(f"Dispatch blocked for {goal_type} goal {goal_id}: {reason}")
            return None
    except ImportError:
        logger.error("CRITICAL: hive_guardrails not available — blocking dispatch")
        return None

    # AUDIT LOG: record goal dispatch
    try:
        from security.immutable_audit_log import get_audit_log
        get_audit_log().log_event(
            'goal_dispatched', actor_id=user_id,
            action=f'dispatch {goal_type} goal {goal_id}',
            target_id=goal_id)
    except Exception:
        pass  # Audit is best-effort

    # ROBOT: capability-matched dispatch — prefer distributed for hardware mismatches
    _tried_distributed = False
    if not _check_robot_capability_match(goal_type, goal_id):
        coordinator = _get_distributed_coordinator()
        if coordinator and _has_hive_peers():
            _tried_distributed = True
            result = dispatch_goal_distributed(prompt, user_id, goal_id, goal_type)
            if result is not None:
                return result
            # Fall through to local if no capable peer found

    # DISTRIBUTED: auto-distribute when coordinator is reachable and hive has peers
    # Skip if robot dispatch already tried distributed (avoid double submission)
    if not _tried_distributed:
        coordinator = _get_distributed_coordinator()
        if coordinator and _has_hive_peers():
            result = dispatch_goal_distributed(prompt, user_id, goal_id, goal_type)
            if result is not None:
                return result
            # Fall through to local dispatch if distributed fails
            logger.info(f"Distributed fallback -> local dispatch for {goal_type} goal {goal_id}")

    # Generate a NUMERIC prompt_id (same format as hart_intelligence_entry._next_prompt_id)
    # so it passes the isdigit() check in the adapter and /chat handler.
    # Use goal_id hash to ensure the SAME goal always gets the SAME prompt_id
    # across dispatches — this is what enables recipe reuse on subsequent ticks.
    import hashlib
    _goal_hash = int(hashlib.md5(goal_id.encode()).hexdigest()[:10], 16) % 100_000_000_000
    prompt_id = str(max(1, _goal_hash))

    body = {
        'user_id': user_id,
        'prompt_id': prompt_id,
        'prompt': prompt,
        'create_agent': True,
        'autonomous': True,
        'casual_conv': False,
        'task_source': 'own',
    }
    if model_config:
        body['model_config'] = model_config
    if _dispatch_model_tier:
        body['model_tier'] = _dispatch_model_tier.value

    # 3-tier dispatch (same as hartos_backend_adapter.py):
    # Tier 1: Direct in-process import (no ports, no HTTP)
    # Tier 2: HTTP proxy to backend port
    # Tier 3: llama.cpp fallback (direct LLM, no agent pipeline)
    resp = None

    # Tier 1: Direct in-process import of hart_intelligence
    # Guarded by semaphore to prevent concurrent request pile-up on
    # local llama-server (causes exponential slowdown + watchdog restarts).
    try:
        try:
            from routes.hartos_backend_adapter import chat as hevolve_chat
        except ImportError:
            from hartos_backend_adapter import chat as hevolve_chat

        # USER PRIORITY: if user chatted recently, skip this tick — let user have the LLM
        if is_user_recently_active():
            logger.info(f"User active ({_USER_CHAT_COOLDOWN}s cooldown), deferring dispatch for goal {goal_id}")
            return None

        # Try to acquire the semaphore (wait up to 5s, then give up)
        if not _local_llm_semaphore.acquire(timeout=5):
            logger.info(f"LLM busy ({_LOCAL_LLM_MAX_CONCURRENT} in flight), "
                        f"skipping dispatch for goal {goal_id}")
            return None

        # Signal to watchdog that this thread is in a legitimate LLM call
        _notify_watchdog_llm_start()
        try:
            # Use a daemon-specific request_id so thinking traces from daemon
            # dispatch are isolated from user chat traces. Without this, daemon
            # traces leak into user responses via drain_thinking_traces().
            _daemon_request_id = f'daemon_{goal_id}'
            result = hevolve_chat(
                text=prompt, user_id=user_id,
                agent_id=prompt_id, create_agent=True, casual_conv=False,
                autonomous=True, request_id=_daemon_request_id,
            )
        finally:
            _local_llm_semaphore.release()
            try:
                _notify_watchdog_llm_end()
            except Exception:
                pass

        response = result.get('text') or result.get('response', '')
        if response:
            return response
    except ImportError:
        pass  # Nunba adapter not available — fall through to Tier 2
    except Exception as e:
        logger.warning(f"Tier-1 dispatch failed for {goal_type} goal {goal_id}: {e}")

    # Tier 2: HTTP proxy to HARTOS backend port
    # Circuit breaker: skip HTTP if server recently unresponsive
    if _cb_is_open():
        logger.info(f"Circuit breaker open — skipping Tier-2 HTTP for goal {goal_id}")
        return None

    base_url = os.environ.get('HEVOLVE_BASE_URL', f'http://localhost:{get_port("backend")}')

    try:
        resp = pooled_post(f'{base_url}/chat', json=body, timeout=120)
        if resp.status_code == 200:
            _cb_record_success()
            result = resp.get_json() if hasattr(resp, 'get_json') else resp.json()
            response = result.get('response', '')

            # GUARDRAIL: post-response check (fail-closed)
            try:
                from security.hive_guardrails import GuardrailEnforcer
                passed, reason = GuardrailEnforcer.after_response(response)
                if not passed:
                    logger.warning(f"Response filtered for goal {goal_id}: {reason}")
                    return None
            except ImportError:
                logger.error("CRITICAL: hive_guardrails not available — blocking response")
                return None

            # GUARDRAIL: coding goals — no merge without constitutional review
            if goal_type == 'coding':
                try:
                    from security.hive_guardrails import ConstitutionalFilter
                    review_dict = {
                        'title': f'Code commit review: {goal_id}',
                        'description': response[:2000],
                        'goal_type': 'coding',
                    }
                    passed, reason = ConstitutionalFilter.check_goal(review_dict)
                    if not passed:
                        logger.warning(
                            f"Coding goal {goal_id} output blocked by "
                            f"constitutional review: {reason}")
                        return None
                except ImportError:
                    logger.error("CRITICAL: ConstitutionalFilter not available — blocking coding goal")
                    return None

            # Record to world model (training data for hive intelligence)
            try:
                from .world_model_bridge import get_world_model_bridge
                bridge = get_world_model_bridge()
                bridge.record_interaction(
                    user_id=user_id,
                    prompt_id=prompt_id,
                    prompt=prompt,
                    response=response,
                    goal_id=goal_id,
                )
            except Exception:
                pass

            return response
        else:
            # Non-200 response — log and queue transient errors for retry
            logger.warning(
                f"Goal dispatch got HTTP {resp.status_code} for {goal_type} "
                f"goal {goal_id}: {resp.text[:200]}")
            if resp.status_code in (429, 500, 502, 503):
                _cb_record_failure()  # Server errors count toward circuit breaker
                try:
                    from .instruction_queue import enqueue_instruction
                    enqueue_instruction(
                        user_id=user_id, text=prompt[:2000], priority=3,
                        tags=[goal_type],
                        context={'goal_id': goal_id, 'goal_type': goal_type,
                                 'queued_reason': f'http_{resp.status_code}'},
                        related_goal_id=goal_id,
                    )
                except Exception:
                    pass
    except requests.RequestException as e:
        _cb_record_failure()
        logger.warning(f"Goal dispatch failed for {goal_type} goal {goal_id}: {e}")

        # Queue the instruction for later execution when compute becomes available
        try:
            from .instruction_queue import enqueue_instruction
            enqueue_instruction(
                user_id=user_id,
                text=prompt[:2000],
                priority=3,
                tags=[goal_type],
                context={
                    'goal_id': goal_id,
                    'goal_type': goal_type,
                    'queued_reason': f'dispatch_failed: {e}',
                },
                related_goal_id=goal_id,
            )
            logger.info(f"Instruction queued for later: {goal_type} goal {goal_id}")
        except Exception as eq:
            logger.debug(f"Instruction queue unavailable: {eq}")

    return None

def _dispatch_single_instruction(base_url: str, user_id: str, inst,
                                 batch_id: str) -> tuple:
    """Dispatch one instruction via /chat. Returns (instruction_id, response_text, error)."""
    body = {
        'user_id': user_id,
        'prompt_id': f'iq_{batch_id}_{inst.id[:8]}',
        'prompt': inst.text,
        'create_agent': True,
        'autonomous': True,
        'casual_conv': False,
        'task_source': 'own',
    }
    try:
        resp = pooled_post(f'{base_url}/chat', json=body, timeout=300)
        if resp.status_code == 200:
            result_text = resp.json().get('response', '')
            return (inst.id, result_text[:500], None)
        return (inst.id, None, f'HTTP {resp.status_code}')
    except requests.RequestException as e:
        return (inst.id, None, str(e))

def drain_instruction_queue(user_id: str, max_tokens: int = 8000) -> Optional[str]:
    """Pull and execute queued instructions with dependency-aware dispatch.

    Uses SmartLedger's dependency graph to determine execution order:
    - Independent instructions dispatch in parallel (concurrent threads)
    - Dependent instructions wait for prerequisites to complete first

    Execution proceeds in waves:
        Wave 0: all instructions with no dependencies → parallel dispatch
        Wave 1: instructions depending on wave 0 → parallel dispatch
        ...until all waves complete.

    Falls back to single-batch dispatch when SmartLedger is unavailable.

    Called by agent_daemon.py on idle tick, or manually via API.

    Args:
        user_id: User whose queue to drain
        max_tokens: Max tokens across all instructions

    Returns:
        Combined response text, or None if queue empty or all failed
    """
    try:
        from .instruction_queue import get_queue
        q = get_queue(user_id)

        # Acquire drain lock — prevents concurrent drains for same user
        # (daemon tick + API call + another agent all trying simultaneously)
        if not q.acquire_drain_lock():
            logger.info(f"Drain skipped for {user_id}: another drain in progress")
            return None

        try:
            # Try dependency-aware execution plan
            plan = q.pull_execution_plan(max_tokens=max_tokens)
            if plan is None:
                return None

            base_url = os.environ.get('HEVOLVE_BASE_URL', f'http://localhost:{get_port("backend")}')
            all_results = []
            any_success = False

            logger.info(
                f"Draining instruction queue for {user_id}: "
                f"{plan.total_instructions} instructions in "
                f"{len(plan.waves)} waves"
            )

            for wave_idx, wave in enumerate(plan.waves):
                logger.info(
                    f"Wave {wave_idx + 1}/{len(plan.waves)}: "
                    f"{len(wave)} instruction(s)"
                )

                if len(wave) == 1:
                    # Single instruction — dispatch directly (no thread pool overhead)
                    inst = wave[0]
                    iid, result, error = _dispatch_single_instruction(
                        base_url, user_id, inst, plan.batch_id,
                    )
                    if error:
                        q.fail_instruction(iid, error)
                        logger.warning(f"Instruction [{iid}] failed: {error}")
                    else:
                        q.complete_instruction(iid, result)
                        all_results.append(result)
                        any_success = True
                else:
                    # Multiple independent instructions — dispatch in parallel.
                    #
                    # Thread safety:
                    # - _dispatch_single_instruction() is a pure HTTP call (no shared state)
                    # - Results collected via as_completed() on the CALLING thread
                    # - q.complete/fail_instruction() acquires q._lock (serialized)
                    # - SmartLedger mutations happen inside q._lock (no separate lock needed)
                    # - File I/O uses atomic write (temp + rename)
                    import concurrent.futures
                    with concurrent.futures.ThreadPoolExecutor(
                        max_workers=min(len(wave), 4),
                    ) as executor:
                        futures = {
                            executor.submit(
                                _dispatch_single_instruction,
                                base_url, user_id, inst, plan.batch_id,
                            ): inst
                            for inst in wave
                        }
                        for future in concurrent.futures.as_completed(futures):
                            iid, result, error = future.result()
                            if error:
                                q.fail_instruction(iid, error)
                                logger.warning(f"Instruction [{iid}] failed: {error}")
                            else:
                                q.complete_instruction(iid, result)
                                all_results.append(result)
                                any_success = True

            if any_success:
                combined = '\n---\n'.join(all_results)
                logger.info(
                    f"Plan {plan.batch_id} completed: "
                    f"{len(all_results)}/{plan.total_instructions} succeeded"
                )
                return combined
            return None
        finally:
            q.release_drain_lock()

    except Exception as e:
        logger.error(f"Queue drain error: {e}")
        return None
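# Illustrative end-to-end sketch of how this module is typically driven
# (caller side; identifiers like 'agent_7' and 'goal_42' are hypothetical;
# the real loop lives in agent_daemon.py):
#
#     from integrations.agent_engine.dispatch import (
#         dispatch_goal, drain_instruction_queue, should_yield_to_user)
#
#     if not should_yield_to_user():
#         # First dispatch for a goal runs CREATE; later ticks hit REUSE
#         # because the md5-derived prompt_id is stable per goal_id.
#         dispatch_goal(prompt, user_id='agent_7', goal_id='goal_42',
#                       goal_type='marketing')
#         # On idle ticks, also work through anything queued by failed dispatches.
#         drain_instruction_queue('agent_7')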