Coverage for integrations/agent_engine/dispatch.py: 85.0%
301 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Unified Agent Goal Engine - Chat Dispatch
4Sends agent goals to idle agents via the existing /chat endpoint
5(CREATE/REUSE pipeline). Dispatches with autonomous=True so the
6LLM auto-generates the agent config without user interaction.
8First dispatch = CREATE mode (gather_info + recipe creation).
9Subsequent dispatches = REUSE mode (recipe exists, 90% faster).
11DISTRIBUTED DISPATCH (automatic):
12When a shared Redis coordinator is reachable (i.e. the node is part
13of a hive with peers), goals are automatically submitted to the
14DistributedTaskCoordinator instead of local /chat. Worker nodes
15across the hive claim and execute tasks autonomously. No separate
16mode flag — distribution is an emergent property of having peers.
17Falls back to local /chat when Redis is unavailable.
18"""
19import os
20import logging
21import threading
22import requests
23from typing import Dict, List, Optional
25from core.http_pool import pooled_post
26from core.port_registry import get_port
logger = logging.getLogger('hevolve_social')

# ── HTTP circuit breaker ────────────────────────────────────────────────
# Fast-fail instead of waiting 120s against a dead llama-server.
# After _CB_THRESHOLD consecutive connection failures, skip HTTP dispatch
# for _CB_COOLDOWN_S seconds before retrying.
import time as _t

_CB_THRESHOLD = 3    # consecutive failures before opening the circuit
_CB_COOLDOWN_S = 60  # seconds to wait before letting calls through again
_cb_failures = 0      # consecutive failure count (guarded by _cb_lock)
_cb_open_until = 0.0  # epoch seconds until which the circuit stays open
_cb_lock = threading.Lock()


def _cb_record_success():
    """Reset the breaker after a successful HTTP dispatch (closes the circuit)."""
    global _cb_failures, _cb_open_until
    with _cb_lock:
        _cb_failures = 0
        _cb_open_until = 0.0


def _cb_record_failure():
    """Count one HTTP failure; open the circuit once _CB_THRESHOLD is reached.

    Every failure at/past the threshold extends the cooldown window by a
    fresh _CB_COOLDOWN_S from now.
    """
    global _cb_failures, _cb_open_until
    with _cb_lock:
        _cb_failures += 1
        if _cb_failures >= _CB_THRESHOLD:
            _cb_open_until = _t.time() + _CB_COOLDOWN_S
            logger.warning(f"Circuit breaker OPEN — {_cb_failures} consecutive "
                           f"HTTP failures, skipping dispatch for {_CB_COOLDOWN_S}s")


def _cb_is_open() -> bool:
    """True while the circuit is open (callers should skip HTTP dispatch).

    Once the cooldown expires the breaker is effectively half-open: ALL
    callers pass through (not just a single probe) until either a
    recorded failure re-opens the circuit or a recorded success fully
    closes it.
    """
    with _cb_lock:
        if _cb_failures < _CB_THRESHOLD:
            return False
        if _t.time() >= _cb_open_until:
            # Half-open: let calls through until a failure/success resolves it
            return False
        return True
# ── LLM concurrency control ──────────────────────────────────────────────
# Local llama-server degrades exponentially with concurrent requests
# (KV cache thrashing). Allow only N concurrent local LLM calls.
# This prevents the watchdog-restart cascade where restarted daemons
# pile up concurrent requests that each take longer, triggering more
# restarts.
# Overridable via the HEVOLVE_LOCAL_LLM_MAX_CONCURRENT env var; defaults
# to 1 (fully serialized local LLM access).
_LOCAL_LLM_MAX_CONCURRENT: int = int(os.environ.get('HEVOLVE_LOCAL_LLM_MAX_CONCURRENT', '1'))
# Acquired (with a short timeout) around every Tier-1 in-process dispatch.
_local_llm_semaphore = threading.Semaphore(_LOCAL_LLM_MAX_CONCURRENT)
# ── User-priority gate ──────────────────────────────────────────────────
# Human chat always wins over daemon dispatch for the LLM. We track the
# timestamp of the last user activity; daemons test its freshness.
import time as _time
_last_user_chat_at: float = 0.0
_USER_CHAT_COOLDOWN = 600  # 10 min — CREATE pipeline can take this long
_active_create_sessions: int = 0  # count of in-flight CREATE requests
_create_lock = threading.Lock()


def mark_user_chat_activity():
    """Call on every user /chat request (including autonomous CREATE)."""
    global _last_user_chat_at
    _last_user_chat_at = _time.time()


def mark_create_start():
    """Call when a CREATE pipeline starts; counts as fresh user activity."""
    global _active_create_sessions
    with _create_lock:
        _active_create_sessions += 1
    mark_user_chat_activity()


def mark_create_end():
    """Call when a CREATE pipeline finishes; the count never goes negative."""
    global _active_create_sessions
    with _create_lock:
        if _active_create_sessions > 0:
            _active_create_sessions -= 1


def is_user_recently_active() -> bool:
    """True if a CREATE pipeline is running OR the user chatted recently."""
    if _active_create_sessions:
        return True
    elapsed = _time.time() - _last_user_chat_at
    return elapsed < _USER_CHAT_COOLDOWN
def should_yield_to_user() -> bool:
    """Single canonical gate every background daemon must call.

    Returns True when the daemon must skip its tick/iteration and yield
    CPU, GIL, LLM, and GPU to the user-facing path. Three independent
    yield reasons, each best-effort (a missing module or failing signal
    never blocks a daemon — it simply does not trigger a yield):

    1. ``is_user_recently_active()`` — the user chatted within the last
       10 minutes, or a CREATE pipeline is running.
    2. ``model_lifecycle.get_system_pressure().throttle_factor < 0.1``
       — VRAM/CPU pressure has collapsed the LLM throttle; one more LLM
       call would saturate the system and starve the user.
    3. ``ResourceGovernor.get_throttle() < 0.3`` — generic CPU/RAM
       pressure that is NOT LLM-shaped (runaway Python loop, hammering
       daemon, OS-level memory pressure). The governor folds its mode
       (ACTIVE/IDLE/SLEEP), the model_lifecycle pressure, AND its own
       per-process pressure into one 0.0-1.0 factor (see
       ``core.resource_governor._calculate_throttle``). Below 0.3 means
       user-facing latency is at risk, so daemons yield even when the
       LLM-specific throttle looks fine. Added after the 2026-05-10
       incident: coding_daemon's autogen turns burned CPU/GIL while the
       user chatted, the LLM-only pressure gate passed, and the system
       slowed.

    This function is the single source of truth for daemon yield
    semantics — ``agent_daemon._tick``,
    ``agent_daemon._proactive_hive_tick``,
    ``hive_benchmark_prover._continuous_loop`` and ``coding_daemon._tick``
    all consult it. Add any fourth yield reason (battery-saver mode,
    network pressure, …) here, never per-daemon.
    """
    # Reason 1: recent user chat / in-flight CREATE pipeline.
    try:
        if is_user_recently_active():
            return True
    except Exception:
        pass
    # Reason 2: LLM-shaped pressure from the model lifecycle manager.
    try:
        from integrations.service_tools.model_lifecycle import (
            get_model_lifecycle_manager)
        pressure = get_model_lifecycle_manager().get_system_pressure()
        if pressure.get('throttle_factor', 1.0) < 0.1:
            return True
    except Exception:
        pass
    # Reason 3: generic system pressure from the resource governor.
    try:
        from core.resource_governor import get_governor
        governor = get_governor()
        if governor is not None and governor.get_throttle() < 0.3:
            return True
    except Exception:
        pass
    return False
def _notify_watchdog_llm_start():
    """Tell the watchdog the current thread is blocked on a legitimate LLM call.

    The watchdog extends the heartbeat threshold for threads flagged
    'in_llm_call' instead of restarting them. Entirely best-effort:
    every failure is swallowed.
    """
    try:
        from security.node_watchdog import get_watchdog
        watchdog = get_watchdog()
        if not watchdog:
            return
        current_name = threading.current_thread().name
        # Exact thread-name match — covers all daemon threads.
        if watchdog.is_registered(current_name):
            watchdog.mark_in_llm_call(current_name)
            return
        # Partial match (thread name may carry a suffix like 'agent_daemon-1').
        for registered in watchdog.registered_names():
            if registered in current_name:
                watchdog.mark_in_llm_call(registered)
                return
        # Fallback for in-process/bundled mode: dispatch may run on a
        # different thread (e.g. a Flask worker), so identify the calling
        # daemon via the threadlocal source hint when available.
        try:
            from threadlocal import get_task_source
            source = get_task_source()
            if source and watchdog.is_registered(source):
                watchdog.mark_in_llm_call(source)
                return
        except Exception:
            pass
    except Exception:
        pass
def _notify_watchdog_llm_end():
    """Clear the LLM-call marker and heartbeat every registered daemon.

    Best-effort: any failure (including the watchdog being absent) is
    swallowed.
    """
    try:
        from security.node_watchdog import get_watchdog
        watchdog = get_watchdog()
        if not watchdog:
            return
        for daemon_name in watchdog.registered_names():
            watchdog.clear_llm_call(daemon_name)
            watchdog.heartbeat(daemon_name)
    except Exception:
        pass
def _get_distributed_coordinator():
    """Fetch the shared DistributedTaskCoordinator, or None without Redis.

    Callers fall back to local /chat dispatch on None. There is no
    separate mode flag — a reachable coordinator IS the signal to
    distribute.
    """
    try:
        from integrations.distributed_agent.api import _get_coordinator
        coordinator = _get_coordinator()
    except Exception as e:
        logger.debug(f"Distributed coordinator unavailable: {e}")
        return None
    return coordinator
def _has_hive_peers() -> bool:
    """Whether this node has at least one other active peer in the hive.

    Distribution only makes sense when other nodes can pick up work;
    single-node setups always dispatch locally. Any DB error is treated
    as "no peers".
    """
    try:
        from integrations.social.models import db_session, PeerNode
        with db_session(commit=False) as db:
            active = (db.query(PeerNode)
                        .filter(PeerNode.status == 'active')
                        .count())
            # Self is also a row in the peer table, hence the strict > 1.
            return active > 1
    except Exception:
        return False
def _decompose_goal(prompt: str, goal_id: str, goal_type: str,
                    user_id: str) -> List[Dict]:
    """Break a goal into distributable sub-tasks.

    AgentGoal.context may carry explicit subtask definitions
    ({"tasks": [...], "parallel": true/false}). When present, SmartLedger
    builds a proper dependency graph (parallel fan-out or sequential
    chain). Otherwise — or on any error — the goal becomes one single
    task covering the (truncated) prompt.
    """
    try:
        from .parallel_dispatch import (
            extract_subtasks_from_context, decompose_goal_to_ledger)
        definitions = extract_subtasks_from_context(goal_id)
        tasks, _ = decompose_goal_to_ledger(
            prompt, goal_id, goal_type, user_id, definitions)
        return tasks
    except Exception:
        pass
    # Single-task fallback: the whole prompt as one unit of work.
    fallback_task = {
        'task_id': f'{goal_id}_task_0',
        'description': prompt[:500],
        'capabilities': [goal_type],
    }
    return [fallback_task]
def dispatch_goal_distributed(prompt: str, user_id: str, goal_id: str,
                              goal_type: str = 'marketing') -> Optional[str]:
    """Submit a goal to the distributed task coordinator.

    The goal is decomposed into sub-tasks and published to shared Redis;
    worker nodes across the hive claim and execute them.

    Returns:
        goal_id string on success, None on failure (the caller then
        falls back to local dispatch)
    """
    coordinator = _get_distributed_coordinator()
    if not coordinator:
        logger.warning(f"Distributed dispatch failed: coordinator unavailable, "
                       f"falling back to local for {goal_type} goal {goal_id}")
        return None

    tasks = _decompose_goal(prompt, goal_id, goal_type, user_id)
    submit_context = {
        'goal_type': goal_type,
        'user_id': user_id,
        'prompt': prompt,
        'source_node': os.environ.get('HEVOLVE_NODE_ID', 'unknown'),
        'task_source': 'hive',
    }
    try:
        distributed_id = coordinator.submit_goal(
            objective=prompt[:200],
            decomposed_tasks=tasks,
            context=submit_context,
        )
        logger.info(f"Distributed dispatch: goal {goal_id} submitted as "
                    f"{distributed_id} with {len(tasks)} tasks")
        return distributed_id
    except Exception as e:
        logger.warning(f"Distributed dispatch error for {goal_type} goal {goal_id}: {e}")
        return None
def _check_robot_capability_match(goal_type: str, goal_id: str) -> bool:
    """For robot goals, verify this node can handle the task.

    Compares the goal's required capabilities against the local robot
    capability advertiser. Non-robot goals always pass, as do robot
    goals without stated requirements — and any goal for which the
    check itself errors out (best-effort).

    Returns True when this node is capable, False when the goal should
    instead go to a more capable peer via distributed dispatch.
    """
    if goal_type != 'robot':
        return True

    try:
        from integrations.social.models import db_session, AgentGoal
        with db_session(commit=False) as db:
            goal = db.query(AgentGoal).filter_by(id=goal_id).first()
            if not goal:
                return True
            config = goal.config_json or {}
            required_caps = config.get('required_capabilities', [])
            if not required_caps:
                return True

            from integrations.robotics.capability_advertiser import (
                get_capability_advertiser,
            )
            advertiser = get_capability_advertiser()
            score = advertiser.matches_task_requirements({
                'required_capabilities': required_caps,
                'preferred_form_factor': config.get('preferred_form_factor'),
                'min_payload_kg': config.get('min_payload_kg'),
            })
        if score >= 0.5:
            return True
        logger.info(
            f"Robot goal {goal_id} capability mismatch "
            f"(score={score}), prefer distributed dispatch")
        return False
    except Exception as e:
        logger.debug(f"Robot capability check skipped: {e}")
        return True
def dispatch_goal(prompt: str, user_id: str, goal_id: str,
                  goal_type: str = 'marketing',
                  model_config: list = None) -> Optional[str]:
    """Send a goal prompt through the existing /chat pipeline.

    Uses autonomous=True so Phase 1 (gather_info) runs without
    human interaction — the LLM generates the agent config itself.

    GUARDRAILS enforced: GuardrailEnforcer.before_dispatch() on the way
    in, GuardrailEnforcer.after_response() on every returned response
    (both the Tier-1 in-process path and the Tier-2 HTTP path).

    When Redis is reachable and hive peers exist, goals are automatically
    submitted to the shared DistributedTaskCoordinator. Worker nodes
    across the hive claim and execute them. Falls back to local /chat
    when the coordinator is unavailable or no peers exist.

    For robot goals: capability matching ensures the task goes to a
    node with the right hardware (locomotion, manipulation, sensors).

    Args:
        prompt: The goal prompt (from build_prompt)
        user_id: The agent's user_id
        goal_id: The goal identifier
        goal_type: Goal type prefix for prompt_id
        model_config: Optional per-dispatch config_list override

    Returns:
        Response text or None on failure
    """
    # BUDGET GATE: check goal budget + platform affordability before dispatch
    try:
        from integrations.agent_engine.budget_gate import pre_dispatch_budget_gate
        bg_allowed, bg_reason = pre_dispatch_budget_gate(goal_id, prompt)
        if not bg_allowed:
            logger.warning(f"Dispatch blocked by budget gate for {goal_type} goal {goal_id}: {bg_reason}")
            return None
    except ImportError:
        pass  # Budget gate is optional — dispatch proceeds without it

    # TOOL ALLOWLIST: resolve model tier and attach to dispatch context.
    # Tier is sent to /chat as body['model_tier']; create_recipe uses it
    # to call filter_tools_for_model() when building the agent tool list.
    _dispatch_model_tier = None
    if model_config:
        try:
            from integrations.agent_engine.model_registry import model_registry
            first_model = model_config[0].get('model', '') if model_config else ''
            if first_model:
                info = model_registry.get(first_model)
                if info:
                    _dispatch_model_tier = (info.get('tier') or info.get('model_tier'))
                    if _dispatch_model_tier:
                        logger.info(f"Dispatch model tier: {_dispatch_model_tier.value} "
                                    f"for {goal_type} goal {goal_id}")
        except Exception:
            pass  # Model registry unavailable — no tier restriction

    # GUARDRAIL: full pre-dispatch gate (fail-closed: block if guardrails unavailable)
    # Note: before_dispatch may rewrite the prompt (sanitization), hence the rebind.
    try:
        from security.hive_guardrails import GuardrailEnforcer
        allowed, reason, prompt = GuardrailEnforcer.before_dispatch(prompt)
        if not allowed:
            logger.warning(f"Dispatch blocked for {goal_type} goal {goal_id}: {reason}")
            return None
    except ImportError:
        logger.error("CRITICAL: hive_guardrails not available — blocking dispatch")
        return None

    # AUDIT LOG: record goal dispatch
    try:
        from security.immutable_audit_log import get_audit_log
        get_audit_log().log_event(
            'goal_dispatched', actor_id=user_id,
            action=f'dispatch {goal_type} goal {goal_id}',
            target_id=goal_id)
    except Exception:
        pass  # Audit is best-effort

    # ROBOT: capability-matched dispatch — prefer distributed for hardware mismatches
    _tried_distributed = False
    if not _check_robot_capability_match(goal_type, goal_id):
        coordinator = _get_distributed_coordinator()
        if coordinator and _has_hive_peers():
            _tried_distributed = True
            result = dispatch_goal_distributed(prompt, user_id, goal_id, goal_type)
            if result is not None:
                return result
            # Fall through to local if no capable peer found

    # DISTRIBUTED: auto-distribute when coordinator is reachable and hive has peers
    # Skip if robot dispatch already tried distributed (avoid double submission)
    if not _tried_distributed:
        coordinator = _get_distributed_coordinator()
        if coordinator and _has_hive_peers():
            result = dispatch_goal_distributed(prompt, user_id, goal_id, goal_type)
            if result is not None:
                return result
            # Fall through to local dispatch if distributed fails
            logger.info(f"Distributed fallback -> local dispatch for {goal_type} goal {goal_id}")

    # Generate a NUMERIC prompt_id (same format as hart_intelligence_entry._next_prompt_id)
    # so it passes the isdigit() check in the adapter and /chat handler.
    # Use goal_id hash to ensure the SAME goal always gets the SAME prompt_id
    # across dispatches — this is what enables recipe reuse on subsequent ticks.
    # (md5 here is an ID hash, not a security primitive.)
    import hashlib
    _goal_hash = int(hashlib.md5(goal_id.encode()).hexdigest()[:10], 16) % 100_000_000_000
    prompt_id = str(max(1, _goal_hash))

    body = {
        'user_id': user_id,
        'prompt_id': prompt_id,
        'prompt': prompt,
        'create_agent': True,
        'autonomous': True,
        'casual_conv': False,
        'task_source': 'own',
    }
    if model_config:
        body['model_config'] = model_config
    if _dispatch_model_tier:
        body['model_tier'] = _dispatch_model_tier.value

    # 3-tier dispatch (same as hartos_backend_adapter.py):
    # Tier 1: Direct in-process import (no ports, no HTTP)
    # Tier 2: HTTP proxy to backend port
    # Tier 3: llama.cpp fallback (direct LLM, no agent pipeline)
    # NOTE(review): no Tier-3 code exists in this function — presumably the
    # adapter handles that fallback internally; confirm against the adapter.
    resp = None

    # Tier 1: Direct in-process import of hart_intelligence
    # Guarded by semaphore to prevent concurrent request pile-up on
    # local llama-server (causes exponential slowdown + watchdog restarts).
    try:
        try:
            from routes.hartos_backend_adapter import chat as hevolve_chat
        except ImportError:
            from hartos_backend_adapter import chat as hevolve_chat

        # USER PRIORITY: if user chatted recently, skip this tick — let user have the LLM
        if is_user_recently_active():
            logger.info(f"User active ({_USER_CHAT_COOLDOWN}s cooldown), deferring dispatch for goal {goal_id}")
            return None

        # Try to acquire the semaphore — blocks for up to 5s, then gives up
        # this tick rather than queueing behind an in-flight LLM call.
        if not _local_llm_semaphore.acquire(timeout=5):
            logger.info(f"LLM busy ({_LOCAL_LLM_MAX_CONCURRENT} in flight), "
                        f"skipping dispatch for goal {goal_id}")
            return None

        # Signal to watchdog that this thread is in a legitimate LLM call
        _notify_watchdog_llm_start()
        try:
            # Use a daemon-specific request_id so thinking traces from daemon
            # dispatch are isolated from user chat traces. Without this, daemon
            # traces leak into user responses via drain_thinking_traces().
            _daemon_request_id = f'daemon_{goal_id}'
            result = hevolve_chat(
                text=prompt, user_id=user_id,
                agent_id=prompt_id, create_agent=True, casual_conv=False,
                autonomous=True, request_id=_daemon_request_id,
            )
        finally:
            _local_llm_semaphore.release()
            try:
                _notify_watchdog_llm_end()
            except Exception:
                pass

        response = result.get('text') or result.get('response', '')
        if response:
            # GUARDRAIL: post-response check (fail-closed). Previously only
            # the Tier-2 HTTP path filtered responses; Tier-1 must match the
            # docstring's after_response() contract.
            try:
                from security.hive_guardrails import GuardrailEnforcer
                passed, reason = GuardrailEnforcer.after_response(response)
                if not passed:
                    logger.warning(f"Response filtered for goal {goal_id}: {reason}")
                    return None
            except ImportError:
                logger.error("CRITICAL: hive_guardrails not available — blocking response")
                return None
            # NOTE(review): world-model recording and the coding-goal
            # constitutional review currently run only on the Tier-2 path —
            # confirm whether the in-process adapter covers them itself.
            return response
        # Empty Tier-1 response falls through to Tier 2.
    except ImportError:
        pass  # Nunba adapter not available — fall through to Tier 2
    except Exception as e:
        logger.warning(f"Tier-1 dispatch failed for {goal_type} goal {goal_id}: {e}")

    # Tier 2: HTTP proxy to HARTOS backend port
    # Circuit breaker: skip HTTP if server recently unresponsive
    if _cb_is_open():
        logger.info(f"Circuit breaker open — skipping Tier-2 HTTP for goal {goal_id}")
        return None

    base_url = os.environ.get('HEVOLVE_BASE_URL', f'http://localhost:{get_port("backend")}')

    try:
        resp = pooled_post(f'{base_url}/chat', json=body, timeout=120)
        if resp.status_code == 200:
            _cb_record_success()
            result = resp.get_json() if hasattr(resp, 'get_json') else resp.json()
            response = result.get('response', '')

            # GUARDRAIL: post-response check (fail-closed)
            try:
                from security.hive_guardrails import GuardrailEnforcer
                passed, reason = GuardrailEnforcer.after_response(response)
                if not passed:
                    logger.warning(f"Response filtered for goal {goal_id}: {reason}")
                    return None
            except ImportError:
                logger.error("CRITICAL: hive_guardrails not available — blocking response")
                return None

            # GUARDRAIL: coding goals — no merge without constitutional review
            if goal_type == 'coding':
                try:
                    from security.hive_guardrails import ConstitutionalFilter
                    review_dict = {
                        'title': f'Code commit review: {goal_id}',
                        'description': response[:2000],
                        'goal_type': 'coding',
                    }
                    passed, reason = ConstitutionalFilter.check_goal(review_dict)
                    if not passed:
                        logger.warning(
                            f"Coding goal {goal_id} output blocked by "
                            f"constitutional review: {reason}")
                        return None
                except ImportError:
                    logger.error("CRITICAL: ConstitutionalFilter not available — blocking coding goal")
                    return None

            # Record to world model (training data for hive intelligence)
            try:
                from .world_model_bridge import get_world_model_bridge
                bridge = get_world_model_bridge()
                bridge.record_interaction(
                    user_id=user_id,
                    prompt_id=prompt_id,
                    prompt=prompt,
                    response=response,
                    goal_id=goal_id,
                )
            except Exception:
                pass  # World-model recording is best-effort

            return response
        else:
            # Non-200 response — log and queue transient errors for retry
            logger.warning(
                f"Goal dispatch got HTTP {resp.status_code} for {goal_type} "
                f"goal {goal_id}: {resp.text[:200]}")
            if resp.status_code in (429, 500, 502, 503):
                _cb_record_failure()  # Server errors count toward circuit breaker
                try:
                    from .instruction_queue import enqueue_instruction
                    enqueue_instruction(
                        user_id=user_id, text=prompt[:2000], priority=3,
                        tags=[goal_type],
                        context={'goal_id': goal_id, 'goal_type': goal_type,
                                 'queued_reason': f'http_{resp.status_code}'},
                        related_goal_id=goal_id,
                    )
                except Exception:
                    pass
    except requests.RequestException as e:
        _cb_record_failure()
        logger.warning(f"Goal dispatch failed for {goal_type} goal {goal_id}: {e}")

        # Queue the instruction for later execution when compute becomes available
        try:
            from .instruction_queue import enqueue_instruction
            enqueue_instruction(
                user_id=user_id,
                text=prompt[:2000],
                priority=3,
                tags=[goal_type],
                context={
                    'goal_id': goal_id,
                    'goal_type': goal_type,
                    'queued_reason': f'dispatch_failed: {e}',
                },
                related_goal_id=goal_id,
            )
            logger.info(f"Instruction queued for later: {goal_type} goal {goal_id}")
        except Exception as eq:
            logger.debug(f"Instruction queue unavailable: {eq}")

    return None
def _dispatch_single_instruction(base_url: str, user_id: str, inst,
                                 batch_id: str) -> tuple:
    """Dispatch one queued instruction via /chat.

    Returns an (instruction_id, response_text, error) triple — error is
    None on success, and response_text is None on failure. The stored
    result is truncated to 500 chars (queue bookkeeping, not transcript).
    """
    payload = {
        'user_id': user_id,
        'prompt_id': f'iq_{batch_id}_{inst.id[:8]}',
        'prompt': inst.text,
        'create_agent': True,
        'autonomous': True,
        'casual_conv': False,
        'task_source': 'own',
    }
    try:
        reply = pooled_post(f'{base_url}/chat', json=payload, timeout=300)
        if reply.status_code != 200:
            return (inst.id, None, f'HTTP {reply.status_code}')
        return (inst.id, reply.json().get('response', '')[:500], None)
    except requests.RequestException as e:
        return (inst.id, None, str(e))
def drain_instruction_queue(user_id: str, max_tokens: int = 8000) -> Optional[str]:
    """Pull and execute queued instructions with dependency-aware dispatch.

    Uses SmartLedger's dependency graph to determine execution order:
    - Independent instructions dispatch in parallel (concurrent threads)
    - Dependent instructions wait for prerequisites to complete first

    Execution proceeds in waves (each wave finishes before the next starts):
        Wave 0: all instructions with no dependencies → parallel dispatch
        Wave 1: instructions depending on wave 0 → parallel dispatch
        ...until all waves complete.

    Falls back to single-batch dispatch when SmartLedger is unavailable.

    Called by agent_daemon.py on idle tick, or manually via API.

    Args:
        user_id: User whose queue to drain
        max_tokens: Max tokens across all instructions

    Returns:
        Combined response text (results joined by '\\n---\\n'), or None if
        the queue is empty, another drain holds the lock, or all failed.
    """
    try:
        from .instruction_queue import get_queue
        q = get_queue(user_id)

        # Acquire drain lock — prevents concurrent drains for same user
        # (daemon tick + API call + another agent all trying simultaneously)
        if not q.acquire_drain_lock():
            logger.info(f"Drain skipped for {user_id}: another drain in progress")
            return None

        # Lock is held from here on; the finally below guarantees release
        # on every exit path (early returns included).
        try:
            # Try dependency-aware execution plan
            plan = q.pull_execution_plan(max_tokens=max_tokens)
            if plan is None:
                # Nothing drainable right now (empty queue / budget exhausted)
                return None

            base_url = os.environ.get('HEVOLVE_BASE_URL', f'http://localhost:{get_port("backend")}')
            all_results = []      # successful response texts, in completion order
            any_success = False   # True once at least one instruction succeeds

            logger.info(
                f"Draining instruction queue for {user_id}: "
                f"{plan.total_instructions} instructions in "
                f"{len(plan.waves)} waves"
            )

            # Waves run strictly in order; instructions WITHIN a wave are
            # independent of each other.
            for wave_idx, wave in enumerate(plan.waves):
                logger.info(
                    f"Wave {wave_idx + 1}/{len(plan.waves)}: "
                    f"{len(wave)} instruction(s)"
                )

                if len(wave) == 1:
                    # Single instruction — dispatch directly (no thread pool overhead)
                    inst = wave[0]
                    iid, result, error = _dispatch_single_instruction(
                        base_url, user_id, inst, plan.batch_id,
                    )
                    if error:
                        q.fail_instruction(iid, error)
                        logger.warning(f"Instruction [{iid}] failed: {error}")
                    else:
                        q.complete_instruction(iid, result)
                        all_results.append(result)
                        any_success = True
                else:
                    # Multiple independent instructions — dispatch in parallel.
                    #
                    # Thread safety:
                    # - _dispatch_single_instruction() is a pure HTTP call (no shared state)
                    # - Results collected via as_completed() on the CALLING thread
                    # - q.complete/fail_instruction() acquires q._lock (serialized)
                    # - SmartLedger mutations happen inside q._lock (no separate lock needed)
                    # - File I/O uses atomic write (temp + rename)
                    import concurrent.futures
                    with concurrent.futures.ThreadPoolExecutor(
                        max_workers=min(len(wave), 4),
                    ) as executor:
                        futures = {
                            executor.submit(
                                _dispatch_single_instruction,
                                base_url, user_id, inst, plan.batch_id,
                            ): inst
                            for inst in wave
                        }
                        for future in concurrent.futures.as_completed(futures):
                            iid, result, error = future.result()
                            if error:
                                q.fail_instruction(iid, error)
                                logger.warning(f"Instruction [{iid}] failed: {error}")
                            else:
                                q.complete_instruction(iid, result)
                                all_results.append(result)
                                any_success = True

            if any_success:
                combined = '\n---\n'.join(all_results)
                logger.info(
                    f"Plan {plan.batch_id} completed: "
                    f"{len(all_results)}/{plan.total_instructions} succeeded"
                )
                return combined
            return None
        finally:
            q.release_drain_lock()

    except Exception as e:
        # Best-effort drain: never let a queue failure crash the daemon tick
        logger.error(f"Queue drain error: {e}")
        return None