Coverage for integrations / agent_engine / agent_daemon.py: 20.2%
531 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Unified Agent Goal Engine - Background Daemon
4Finds active goals, detects idle agents, and dispatches work through /chat.
5Skips CODING_GOAL_TYPES — those are handled by coding_daemon which adds
6idle-agent detection and benchmark sync for the coding backends.
8Uses prompt builder registry to build the right prompt per goal type.
9Dispatches with autonomous=True so agents self-configure.
10"""
11import os
12import time
13import random
14import logging
15import threading
16from datetime import datetime
# Module-level logger shared by the daemon and the HITL helper functions.
logger = logging.getLogger('hevolve_social')

# Lock protecting module-level mutable state accessed from daemon thread + API threads
_module_lock = threading.Lock()

# Track which tasks we've already sent HITL notifications for (avoid spam).
# Members are "<goal_id>:<task_id>" strings (built in _send_hitl_notification).
_hitl_notified: set = set()

# Goals that are budget-blocked are immediately paused (no retries).
# Previously we retried 3 times, but that just re-dispatches a blocked goal
# every 30 seconds with no chance of success (budget doesn't change between ticks).
# Members are str(goal.id); cleared when a goal passes the budget check again.
_budget_blocked_goals: set = set()

# Track dispatch failures per goal for exponential backoff.
# Maps goal_id → {'failures': int, 'skip_until': float}
_dispatch_backoff: dict = {}
36def _get_blocked_hitl_tasks(ledger, goal_id):
37 """Get tasks blocked with APPROVAL_REQUIRED under a goal."""
38 try:
39 parent = ledger.get_task(goal_id)
40 if not parent:
41 return []
42 blocked = []
43 for child_id in (parent.child_task_ids or []):
44 task = ledger.get_task(child_id)
45 if task and str(task.blocked_reason) == 'APPROVAL_REQUIRED':
46 blocked.append(task)
47 return blocked
48 except Exception:
49 return []
def _send_hitl_notification(db, goal, task):
    """Notify the goal owner — once per (goal, task) pair — about an approval-blocked task."""
    key = f"{goal.id}:{task.id}"
    with _module_lock:
        already_sent = key in _hitl_notified
        if not already_sent:
            _hitl_notified.add(key)
    if already_sent:
        return

    try:
        from integrations.social.services import NotificationService
        from integrations.social.realtime import on_notification
        recipient = goal.created_by or goal.owner_id
        if not recipient:
            return
        preview = (task.description or '')[:100]
        created = NotificationService.create(
            db, str(recipient), 'approval_required',
            target_type='thought_experiment',
            target_id=str(task.id),
            message=f'Agent needs your review: {preview}',
        )
        if hasattr(created, 'to_dict'):
            payload = created.to_dict()
        else:
            payload = {
                'type': 'approval_required', 'message': f'Agent needs your review: {preview}',
            }
        on_notification(str(recipient), payload)
        logger.info(f"HITL notification sent for goal={goal.id} task={task.id}")
    except Exception as e:
        logger.debug(f"HITL notification failed: {e}")
81class AgentDaemon:
82 """Background daemon: active goals (any type) + idle agents → /chat dispatch."""
84 def __init__(self):
85 self._interval = int(os.environ.get('HEVOLVE_AGENT_POLL_INTERVAL', '30'))
86 self._running = False
87 self._thread = None
88 self._lock = threading.Lock()
89 self._tick_count = 0
90 self._remediate_every = int(os.environ.get(
91 'HEVOLVE_REMEDIATE_INTERVAL_TICKS', '10'))
92 # Cache SmartLedger instances by (agent_id, session_id) to avoid
93 # re-creating them every tick (which spams "starting fresh" logs).
94 self._ledger_cache: dict = {}
95 # Exponential backoff on consecutive tick failures
96 self._consecutive_failures = 0
97 self._BACKOFF_MAX = 300 # 5 minute cap
98 # Proactive hive tick state
99 self._next_hive_explore_time = time.time() + random.randint(300, 1800)
100 self._base_interval = self._interval # remember original for optimizer
102 def start(self):
103 with self._lock:
104 if self._running:
105 return
106 self._running = True
107 self._consecutive_failures = 0
108 self._thread = threading.Thread(target=self._loop, daemon=True,
109 name='agent_daemon')
110 self._thread.start()
111 logger.info(f"Agent daemon started (interval={self._interval}s)")
113 def stop(self):
114 with self._lock:
115 self._running = False
116 if self._thread:
117 self._thread.join(timeout=10)
    def _try_parallel_dispatch(self, goal, idle_agents, dispatched, max_concurrent):
        """Check if a goal has parallel subtasks and dispatch them concurrently.

        Args:
            goal: active goal row (attributes read: id, goal_type, user_id).
            idle_agents: list of idle agent dicts from IdleDetectionService.
            dispatched: count of dispatches already made this tick — used to
                compute remaining agent/concurrency headroom.
            max_concurrent: tick-wide dispatch ceiling.

        Returns number of tasks dispatched (0 if no parallel tasks found).
        Best-effort: any failure is logged at DEBUG and reported as 0.
        """
        try:
            ledger = self._get_goal_ledger(goal)
            if not ledger:
                return 0

            parallel_tasks = ledger.get_parallel_executable_tasks()
            if not parallel_tasks:
                return 0

            from .parallel_dispatch import dispatch_parallel_tasks
            from .dispatch import dispatch_goal

            # Batch is capped by available tasks AND the remaining idle-agent
            # and concurrency headroom left over from earlier dispatches.
            batch_count = min(
                len(parallel_tasks),
                len(idle_agents) - dispatched,
                max_concurrent - dispatched,
            )
            if batch_count <= 0:
                return 0

            def _dispatch_task(task):
                """Dispatch a single parallel subtask via /chat."""
                # Defensive getattr-style reads: goal may be a partial stub.
                goal_id = str(goal.id) if hasattr(goal, 'id') else ''
                goal_type = goal.goal_type if hasattr(goal, 'goal_type') else 'marketing'
                user_id = str(goal.user_id) if hasattr(goal, 'user_id') else 'system'
                result = dispatch_goal(
                    task.description, user_id, goal_id, goal_type)
                return {'success': result is not None, 'response': result}

            result = dispatch_parallel_tasks(
                ledger, _dispatch_task, max_concurrent=batch_count)

            # Failed dispatches still consumed an agent slot, so both counts
            # feed into the tick's dispatched total.
            count = result['completed'] + result['failed']
            if count > 0:
                logger.info(
                    f"Parallel dispatch for goal {goal.id}: "
                    f"{result['completed']} completed, {result['failed']} failed")
            return count

        except Exception as e:
            logger.debug(f"Parallel dispatch check failed: {e}")
            return 0
167 def _get_goal_ledger(self, goal):
168 """Get a SmartLedger for a goal's task graph (if one exists).
170 Returns None when no ledger exists or it has only 1 task (no parallelism).
171 Ledger instances are cached on self._ledger_cache to avoid re-creating
172 (and re-logging "starting fresh") on every daemon tick.
173 """
174 try:
175 from agent_ledger import SmartLedger
176 goal_id = str(goal.id) if hasattr(goal, 'id') else ''
177 user_id = str(goal.user_id) if hasattr(goal, 'user_id') else 'system'
179 cache_key = (user_id, goal_id)
180 ledger = self._ledger_cache.get(cache_key)
181 if ledger is None:
182 ledger = SmartLedger(agent_id=user_id, session_id=str(goal_id))
183 self._ledger_cache[cache_key] = ledger
185 # Use the ledger's resolved dir (handles bundled/read-only fallback)
186 ledger_path = str(ledger.ledger_file)
187 if os.path.isfile(ledger_path):
188 ledger.load(ledger_path)
189 if len(ledger.tasks) > 1:
190 return ledger
191 except Exception:
192 pass
193 return None
195 def _wd_heartbeat(self):
196 """Send heartbeat to watchdog between potentially blocking operations."""
197 try:
198 from security.node_watchdog import get_watchdog
199 wd = get_watchdog()
200 if wd:
201 wd.heartbeat('agent_daemon')
202 except Exception:
203 pass
205 def _wd_sleep(self, seconds: float) -> None:
206 """Sleep for ``seconds`` while keeping the watchdog heartbeat fresh.
208 Delegates to the canonical ``NodeWatchdog.sleep_with_heartbeat``
209 helper so a 480s exponential-backoff sleep can't silently age
210 the heartbeat past the 300s frozen threshold. See the helper's
211 docstring for the full incident context. When the watchdog
212 isn't available (test mode, early boot) this falls back to a
213 plain ``time.sleep`` so behavior is unchanged.
214 """
215 try:
216 from security.node_watchdog import get_watchdog
217 wd = get_watchdog()
218 if wd is not None:
219 wd.sleep_with_heartbeat(
220 'agent_daemon', seconds,
221 stop_check=lambda: not self._running,
222 )
223 return
224 except Exception:
225 pass
226 time.sleep(seconds)
228 def _proactive_hive_tick(self):
229 """Proactive hive daemon tick — exploration, self-promotion, and compute optimization.
231 Runs alongside the main tick on a random 5-30 minute interval.
232 All imports are lazy (try/except) so missing modules never crash the daemon.
234 Feeds dense attribution chain to WorldModelBridge via agent_attribution.
235 """
236 # Single canonical daemon yield gate (user activity + system
237 # pressure). Without this the proactive tick saturates GIL with
238 # speculative dispatches (DB churn + LLM HTTP) while the user is
239 # actively chatting — py-spy showed 8s+ sampling lag.
240 from .dispatch import should_yield_to_user
241 if should_yield_to_user():
242 return
244 now = time.time()
246 # Attribution: track this tick as a long-horizon action
247 action_id = None
248 try:
249 from integrations.agent_engine.agent_attribution import (
250 begin_action, record_step, complete_action,
251 )
252 # Periodically clean expired actions (once per hour)
253 if int(now) % 3600 < 30:
254 from integrations.agent_engine.agent_attribution import get_attribution
255 get_attribution().cleanup_expired()
256 action_id = begin_action(
257 agent_id='agent_daemon',
258 action_type='proactive_hive_tick',
259 expected_outcome={'status': 'ok'},
260 acceptance_criteria=[
261 'benchmark_prover_reachable',
262 'optimizer_health_score > 0',
263 ],
264 )
265 except Exception:
266 pass # Attribution is best-effort — never break the tick
268 # ── 1. Random-interval hive exploration ──
269 if now >= self._next_hive_explore_time:
270 # Schedule next exploration (5-30 minutes from now)
271 self._next_hive_explore_time = now + random.randint(300, 1800)
272 if action_id:
273 try:
274 record_step(action_id, 'hive_exploration_scheduled',
275 state={'next_in_s': self._next_hive_explore_time - now},
276 decision='explore_now', confidence=0.9)
277 except Exception:
278 pass
280 # Check benchmark prover for active benchmark needs
281 benchmark_needs = None
282 try:
283 from integrations.agent_engine.hive_benchmark_prover import get_prover
284 prover = get_prover()
285 benchmark_needs = prover.get_status()
286 logger.info(
287 f"Proactive hive: benchmark prover checked "
288 f"(loop_running={benchmark_needs.get('loop_running', False)})")
289 except ImportError:
290 logger.debug("Proactive hive: hive_benchmark_prover not available")
291 except Exception as e:
292 logger.debug(f"Proactive hive: benchmark prover check failed: {e}")
294 # Check hive task protocol for unassigned tasks
295 pending_tasks = []
296 try:
297 from integrations.coding_agent.hive_task_protocol import get_dispatcher
298 dispatcher = get_dispatcher()
299 pending_tasks = dispatcher.get_pending_tasks()
300 if pending_tasks:
301 logger.info(
302 f"Proactive hive: found {len(pending_tasks)} pending "
303 f"hive tasks for dispatch")
304 except ImportError:
305 logger.debug("Proactive hive: hive_task_protocol not available")
306 except Exception as e:
307 logger.debug(f"Proactive hive: task protocol check failed: {e}")
309 # If idle and tasks exist, auto-dispatch to local Claude hive session
310 if pending_tasks:
311 try:
312 from integrations.coding_agent.claude_hive_session import get_blueprint
313 for task in pending_tasks[:3]: # Max 3 tasks per exploration
314 task_desc = getattr(task, 'description', '') or str(task)
315 logger.info(
316 f"Proactive hive: auto-dispatching task to local "
317 f"hive session: {task_desc[:100]}")
318 except ImportError:
319 logger.debug("Proactive hive: claude_hive_session not available")
320 except Exception as e:
321 logger.debug(f"Proactive hive: hive session dispatch failed: {e}")
323 # ── 2. Self-promotion on benchmark results ──
324 try:
325 from integrations.agent_engine.hive_benchmark_prover import get_prover
326 prover = get_prover()
327 history = prover.get_benchmark_history(limit=1)
328 if history and isinstance(history, list):
329 latest = history[0]
330 score = latest.get('score', 0)
331 benchmark = latest.get('benchmark', 'unknown')
332 node_count = latest.get('node_count', 1)
333 message = (
334 f"Hive benchmark result: {benchmark} — "
335 f"score={score:.2f} across {node_count} nodes"
336 )
338 # Post to all connected channels via signal bridge
339 try:
340 from integrations.channels.hive_signal_bridge import get_signal_bridge
341 bridge = get_signal_bridge()
342 bridge.broadcast_signal({
343 'type': 'benchmark_result',
344 'benchmark': benchmark,
345 'score': score,
346 'node_count': node_count,
347 'message': message,
348 })
349 logger.info(f"Proactive hive: broadcast benchmark result to channels")
350 except ImportError:
351 logger.debug("Proactive hive: hive_signal_bridge not available")
352 except Exception as e:
353 logger.debug(f"Proactive hive: signal broadcast failed: {e}")
355 # Emit EventBus event for LiquidUI dashboard
356 try:
357 from core.platform.events import emit_event
358 emit_event('hive.benchmark.completed', {
359 'benchmark': benchmark,
360 'score': score,
361 'node_count': node_count,
362 'message': message,
363 })
364 except ImportError:
365 pass
366 except Exception as e:
367 logger.debug(f"Proactive hive: EventBus emit failed: {e}")
368 except ImportError:
369 pass
370 except Exception as e:
371 logger.debug(f"Proactive hive: self-promotion check failed: {e}")
373 # ── 3. Compute optimizer integration ──
374 try:
375 from core.compute_optimizer import get_optimizer
376 optimizer = get_optimizer()
377 health = optimizer.get_health_score()
379 # Derive load level from health score: <0.3 = high load, >0.7 = idle
380 if health < 0.3:
381 load_level = 'high'
382 elif health > 0.7:
383 load_level = 'idle'
384 else:
385 load_level = 'normal'
387 if load_level == 'high':
388 # System under load: lengthen tick interval
389 new_interval = min(self._base_interval * 3, self._BACKOFF_MAX)
390 if self._interval != new_interval:
391 self._interval = new_interval
392 logger.info(
393 f"Proactive hive: system under load, "
394 f"lengthened tick interval to {new_interval}s")
395 elif load_level == 'idle':
396 # System idle: shorten tick interval, accept more tasks
397 new_interval = max(self._base_interval // 2, 10)
398 if self._interval != new_interval:
399 self._interval = new_interval
400 logger.info(
401 f"Proactive hive: system idle, "
402 f"shortened tick interval to {new_interval}s")
403 else:
404 # Normal: restore base interval
405 if self._interval != self._base_interval:
406 self._interval = self._base_interval
407 logger.debug(
408 f"Proactive hive: load normal, "
409 f"restored tick interval to {self._base_interval}s")
410 except ImportError:
411 pass # compute_optimizer not available yet
412 except Exception as e:
413 logger.debug(f"Proactive hive: compute optimizer check failed: {e}")
415 # ── Attribution: complete action with outcome summary ──
416 if action_id:
417 try:
418 complete_action(action_id, outcome={
419 'status': 'ok',
420 'interval_seconds': self._interval,
421 'completed_at': time.time(),
422 })
423 except Exception:
424 pass
    def _loop(self):
        """Daemon thread body: boot grace, then sleep/heartbeat/tick forever.

        Exits when stop() clears self._running. Tick failures increase
        self._consecutive_failures, which lengthens the inter-tick sleep.
        """
        # Boot grace period: let user chat have exclusive LLM access during
        # startup (default 300s via HEVOLVE_DAEMON_BOOT_DELAY).
        # Without this, daemon goals consume all llama-server slots immediately
        # and user /chat requests timeout (the 2026-04-12 slot starvation bug).
        _boot_grace = int(os.environ.get('HEVOLVE_DAEMON_BOOT_DELAY', '300'))
        if _boot_grace > 0:
            logger.info(f"Agent daemon: boot grace period {_boot_grace}s — "
                        f"user chat gets priority. Set HEVOLVE_DAEMON_BOOT_DELAY=0 to disable.")
            self._wd_sleep(_boot_grace)

        while self._running:
            # Exponential backoff: sleep longer on consecutive failures.
            # Uses sleep_with_heartbeat so the backoff can't age the
            # watchdog heartbeat past the frozen threshold — a 480s
            # backoff used to trigger a restart cascade because the
            # 300s threshold hit mid-sleep (2026-04-11 incident).
            if self._consecutive_failures > 0:
                backoff = min(
                    self._interval * (2 ** self._consecutive_failures),
                    self._BACKOFF_MAX)
                self._wd_sleep(backoff)
            else:
                self._wd_sleep(self._interval)
            # stop() may have been called during the sleep.
            if not self._running:
                break
            self._wd_heartbeat()

            # Proactive hive tick — exploration, self-promotion, compute
            # optimization. RUN ASYNC: a stuck call inside the proactive
            # path (WorldModelBridge.record_interaction was the culprit
            # 2026-04-29 — daemon restarted 9 times by watchdog without
            # ever reaching _tick() because complete_action blocked) must
            # NOT block the goal-dispatch tick. Fire-and-forget in a
            # daemon thread; watchdog heartbeat stays fresh because
            # _loop continues immediately to _tick.
            self._spawn_proactive_hive_tick_async()

            try:
                self._tick()
                self._consecutive_failures = 0
            except Exception as e:
                self._consecutive_failures += 1
                import traceback
                logger.error(f"Agent daemon tick error (backoff={self._consecutive_failures}): "
                             f"{e}\n{traceback.format_exc()}")
472 def _spawn_proactive_hive_tick_async(self) -> None:
473 """Run _proactive_hive_tick in a daemon thread, never blocking _loop.
475 If the prior iteration's thread is still alive (sub-call stuck on
476 sync I/O), we SKIP this iteration rather than pile up threads.
477 That makes the proactive cadence "best-effort" while keeping the
478 main goal-dispatch tick on its 30s rhythm regardless.
480 Watchdog heartbeat is owned by _loop, not the proactive thread —
481 so a stuck sub-call no longer ages the daemon's heartbeat past
482 the 300s frozen threshold.
483 """
484 prior = getattr(self, '_proactive_thread', None)
485 if prior is not None and prior.is_alive():
486 logger.warning(
487 "proactive_hive_tick from prior iteration still running — "
488 "skipping this cycle (likely a sub-call is blocked on I/O)")
489 return
491 def _runner():
492 try:
493 self._proactive_hive_tick()
494 except Exception as e:
495 logger.debug(f"Proactive hive tick error: {e}")
497 t = threading.Thread(
498 target=_runner, daemon=True, name='proactive_hive_tick')
499 t.start()
500 self._proactive_thread = t
502 def _tick(self):
503 """Find active goals, find idle agents, dispatch via /chat.
505 GUARDRAILS enforced at every layer:
506 - HiveCircuitBreaker.is_halted() → full stop
507 - GuardrailEnforcer.before_dispatch() → per-prompt gate
508 - Speculative dispatch when HEVOLVE_SPECULATIVE_ENABLED=true
509 """
510 self._tick_count += 1
512 # GUARDRAIL: circuit breaker — deterministic stop
513 try:
514 from security.hive_guardrails import HiveCircuitBreaker
515 if HiveCircuitBreaker.is_halted():
516 return
517 except ImportError:
518 pass
520 from integrations.social.models import get_db, AgentGoal, Product
521 from integrations.coding_agent.idle_detection import IdleDetectionService
522 from .goal_manager import GoalManager, CODING_GOAL_TYPES
523 from .dispatch import dispatch_goal, should_yield_to_user
525 # Single canonical yield gate — user activity + system pressure.
526 # See dispatch.should_yield_to_user() docstring for the contract.
527 if should_yield_to_user():
528 logger.debug("Agent daemon: yielding (user active or system pressure)")
529 return
530 # _throttle is consumed lower down for soft scaling decisions —
531 # re-read after the hard yield gate so we still have a value.
532 try:
533 from integrations.service_tools.model_lifecycle import (
534 get_model_lifecycle_manager)
535 _throttle = get_model_lifecycle_manager().get_system_pressure().get(
536 'throttle_factor', 1.0)
537 except Exception:
538 _throttle = 1.0
540 speculative_enabled = os.environ.get(
541 'HEVOLVE_SPECULATIVE_ENABLED', 'false').lower() == 'true'
543 db = get_db()
544 try:
545 # DETERMINISTIC STOP: no goals = no action = system is inert
546 # Skip CODING_GOAL_TYPES — coding_daemon handles those with
547 # idle-agent detection + benchmark sync for backend routing.
548 goals = db.query(AgentGoal).filter(
549 AgentGoal.status == 'active',
550 ~AgentGoal.goal_type.in_(CODING_GOAL_TYPES),
551 ).all()
552 if not goals:
553 return
555 # Use ``get_idle_agent_personas`` — agent-type users (Echo,
556 # Quest, Contest Curator, …) eligible for goal dispatch.
557 # Do NOT use ``get_idle_opted_in_agents`` here: that filter
558 # is the human-consent privacy gate for distributed compute
559 # sharing, not the agent-persona gate. Mismatch silently
560 # returned [] on installs where no human had opted in →
561 # daemon stalled with seeded personas never dispatching
562 # (root-cause logged 2026-05-01).
563 idle_agents = IdleDetectionService.get_idle_agent_personas(db)
564 if not idle_agents:
565 return
567 self._wd_heartbeat()
569 # Assign excess idle agents as exception watchers
570 try:
571 from .exception_watcher import ExceptionWatcher
572 watcher = ExceptionWatcher.get_instance()
573 if len(idle_agents) > len(goals):
574 excess = idle_agents[len(goals):]
575 for agent in excess:
576 watcher.assign_watcher(str(agent['user_id']), agent['username'])
577 if watcher.has_watchers():
578 watcher.process_exceptions(db)
579 except Exception as e:
580 logger.debug(f"Exception watcher: {e}")
582 dispatched = 0
583 used_agents = set()
584 max_concurrent = int(os.environ.get('HEVOLVE_AGENT_MAX_CONCURRENT', '10'))
585 # Reduce concurrency proportional to system pressure
586 max_concurrent = max(1, int(max_concurrent * _throttle))
588 # Minimum interval between dispatches for continuous goals.
589 # Without this, a continuous goal (e.g. autoresearch coordinator)
590 # gets re-dispatched every 30s tick even if the previous dispatch
591 # is still running — causing repeated identical actions.
592 _CONTINUOUS_COOLDOWN_S = 300 # 5 minutes
594 # Split goals into two queues:
595 # - CREATE queue: goals without recipes (need LLM planning, 1 at a time)
596 # - REUSE pool: goals with recipes (cheap replay, round-robin)
597 import hashlib as _hlib
598 _create_queue = []
599 _reuse_pool = []
600 for goal in goals:
601 _gh = int(_hlib.md5(str(goal.id).encode()).hexdigest()[:10], 16) % 100_000_000_000
602 _pid = str(max(1, _gh))
603 _recipe_path = os.path.join('prompts', f'{_pid}_0_recipe.json')
604 if os.path.exists(_recipe_path):
605 _reuse_pool.append(goal)
606 else:
607 _create_queue.append(goal)
609 # REUSE goals round-robin (cheap, can cycle through many per tick)
610 # CREATE goals sequential (1 at a time, rotated so each gets a turn)
611 if _create_queue:
612 _cr_offset = self._tick_count % len(_create_queue)
613 _create_queue = _create_queue[_cr_offset:] + _create_queue[:_cr_offset]
614 if _reuse_pool:
615 _re_offset = self._tick_count % len(_reuse_pool)
616 _reuse_pool = _reuse_pool[_re_offset:] + _reuse_pool[:_re_offset]
618 # Prioritize: 1 CREATE first (if any), then fill remaining slots with REUSE
619 goals = _create_queue[:1] + _reuse_pool + _create_queue[1:]
621 logger.debug(f"Goal split: {len(_create_queue)} need CREATE, "
622 f"{len(_reuse_pool)} have recipes (REUSE)")
624 for goal in goals:
625 if dispatched >= len(idle_agents) or dispatched >= max_concurrent:
626 break
628 # Skip continuous goals dispatched recently — let the previous
629 # dispatch complete before re-dispatching.
630 cfg = goal.config_json or {}
631 if cfg.get('continuous', False) and goal.last_dispatched_at:
632 elapsed = (datetime.utcnow() - goal.last_dispatched_at).total_seconds()
633 if elapsed < _CONTINUOUS_COOLDOWN_S:
634 logger.debug(
635 f"Continuous goal {goal.id} dispatched {elapsed:.0f}s ago "
636 f"(cooldown={_CONTINUOUS_COOLDOWN_S}s), skipping")
637 continue
639 # ── PARALLEL DISPATCH: check for goals with parallel subtasks ──
640 parallel_dispatched = self._try_parallel_dispatch(
641 goal, idle_agents, dispatched, max_concurrent)
642 if parallel_dispatched > 0:
643 dispatched += parallel_dispatched
644 continue
646 # Find next unused agent (don't skip goal if current agent is taken)
647 agent = None
648 while dispatched < len(idle_agents) and dispatched < max_concurrent:
649 candidate = idle_agents[dispatched]
650 if candidate['user_id'] not in used_agents:
651 agent = candidate
652 break
653 dispatched += 1
654 if agent is None:
655 break # No more available agents
656 used_agents.add(agent['user_id'])
658 # Load product if marketing goal
659 product_dict = None
660 if goal.product_id:
661 product = db.query(Product).filter_by(id=goal.product_id).first()
662 if product:
663 product_dict = product.to_dict()
665 # Build prompt using registered builder (guardrail: togetherness rewrite)
666 prompt = GoalManager.build_prompt(goal.to_dict(), product_dict)
667 if prompt is None:
668 # build_prompt returns None when EITHER:
669 # (a) the registered builder declined the goal (e.g.,
670 # robot goal on a non-robot host) — the builder
671 # logs its own reason at INFO; OR
672 # (b) hive_guardrails import failed in 'hard' mode —
673 # build_prompt logs CRITICAL itself.
674 # In both cases the meaningful signal is upstream; this
675 # is just dispatch-loop bookkeeping. DEBUG avoids the
676 # 28-per-8h chronic WARNING noise observed in production
677 # (seeded robot goals on hardware-less hosts) and the
678 # historical misattribution to "guardrails unavailable".
679 logger.debug(f"Goal {goal.id}: build_prompt returned None, skipping")
680 continue
682 # BUDGET PRE-CHECK: read-only check before dispatch.
683 # The actual atomic budget reservation happens inside dispatch_goal()
684 # via pre_dispatch_budget_gate(). This is just a quick reject to
685 # avoid wasting time on goals that are clearly over budget.
686 try:
687 from .budget_gate import estimate_llm_cost_spark, _resolve_model_name
688 goal_key = str(goal.id)
690 # Skip goals already known to be budget-blocked (avoids
691 # re-checking every 30s when nothing has changed).
692 with _module_lock:
693 if goal_key in _budget_blocked_goals:
694 continue
696 # Read-only check: compare remaining budget vs estimated cost
697 # without reserving (no spark_spent increment)
698 budget = goal.spark_budget or 0
699 spent = goal.spark_spent or 0
700 estimated = estimate_llm_cost_spark(prompt, _resolve_model_name('gpt-4o'))
701 bg_allowed = (budget - spent) >= estimated
702 bg_reason = f'insufficient_budget ({budget - spent} < {estimated})' if not bg_allowed else ''
703 if not bg_allowed:
704 # Immediately pause — budget won't change between daemon
705 # ticks, so retrying is wasteful.
706 goal.status = 'paused'
707 cfg = goal.config_json or {}
708 cfg['pause_reason'] = (
709 f'Auto-paused: budget gate blocked. '
710 f'Reason: {bg_reason}')
711 cfg['paused_at'] = datetime.utcnow().isoformat()
712 goal.config_json = cfg
713 with _module_lock:
714 _budget_blocked_goals.add(goal_key)
715 logger.info(
716 f"Goal {goal.id} AUTO-PAUSED by budget gate: "
717 f"{bg_reason}")
718 continue
719 else:
720 # Budget passed — clear from blocked set if it was
721 # previously blocked (e.g. goal was resumed with
722 # more budget).
723 with _module_lock:
724 _budget_blocked_goals.discard(goal_key)
725 except ImportError:
726 pass
728 # GUARDRAIL: full pre-dispatch gate
729 try:
730 from security.hive_guardrails import GuardrailEnforcer
731 allowed, reason, prompt = GuardrailEnforcer.before_dispatch(
732 prompt, goal.to_dict())
733 if not allowed:
734 logger.warning(f"Goal {goal.id} blocked by guardrail: {reason}")
735 continue
736 except ImportError:
737 logger.warning("hive_guardrails not available — dispatch proceeds without guardrail pre-check")
739 # Store prompt_id on goal for REUSE tracking
740 # Must be NUMERIC — the adapter and /chat handler reject non-integer prompt_ids.
741 # Use deterministic hash of goal.id so same goal always gets same prompt_id.
742 import hashlib
743 _gh = int(hashlib.md5(str(goal.id).encode()).hexdigest()[:10], 16) % 100_000_000_000
744 prompt_id = str(max(1, _gh))
745 if not goal.prompt_id or not str(goal.prompt_id).isdigit():
746 goal.prompt_id = prompt_id
748 # BACKOFF: skip goals that have failed repeatedly
749 goal_key = str(goal.id)
750 with _module_lock:
751 backoff_info = _dispatch_backoff.get(goal_key)
752 if backoff_info and time.time() < backoff_info.get('skip_until', 0):
753 logger.debug(
754 f"Skipping goal {goal_key}: backoff "
755 f"({backoff_info['failures']} failures, "
756 f"resume in {backoff_info['skip_until'] - time.time():.0f}s)")
757 continue
759 goal.last_dispatched_at = datetime.utcnow()
761 # Speculative dispatch if enabled and budget allows
762 if speculative_enabled:
763 try:
764 from .speculative_dispatcher import get_speculative_dispatcher
765 dispatcher = get_speculative_dispatcher()
766 if dispatcher.should_speculate(
767 str(agent['user_id']), prompt_id, prompt, goal.to_dict()):
768 dispatcher.dispatch_speculative(
769 prompt, str(agent['user_id']), prompt_id,
770 goal.id, goal.goal_type)
771 dispatched += 1
772 # Success — clear backoff
773 with _module_lock:
774 _dispatch_backoff.pop(goal_key, None)
775 continue
776 except ImportError:
777 pass
779 result = dispatch_goal(prompt, str(agent['user_id']), goal.id, goal.goal_type)
780 dispatched += 1
781 self._wd_heartbeat()
783 # Track failures for exponential backoff
784 if result is None:
785 with _module_lock:
786 info = _dispatch_backoff.get(goal_key, {'failures': 0})
787 info['failures'] = info.get('failures', 0) + 1
788 # Exponential backoff: 60s, 120s, 240s, 480s, max 900s (15 min)
789 delay = min(60 * (2 ** (info['failures'] - 1)), 900)
790 info['skip_until'] = time.time() + delay
791 _dispatch_backoff[goal_key] = info
792 failure_count = info['failures']
793 if failure_count >= 5:
794 # Auto-pause after 5 consecutive failures
795 goal.status = 'paused'
796 cfg = goal.config_json or {}
797 cfg['pause_reason'] = (
798 f'Auto-paused: {failure_count} consecutive '
799 f'dispatch failures')
800 cfg['paused_at'] = datetime.utcnow().isoformat()
801 goal.config_json = cfg
802 logger.warning(
803 f"Goal {goal_key} AUTO-PAUSED after "
804 f"{failure_count} dispatch failures")
805 else:
806 # Success — clear backoff
807 with _module_lock:
808 _dispatch_backoff.pop(goal_key, None)
810 # COMPLETION: dispatch returning non-None means the chat was
811 # handed off, NOT that the agent actually did the work — the
812 # work runs async after dispatch_goal returns. Auto-marking
813 # `completed` on dispatch produced the dashboard lie where
814 # ~280 goals showed 'Completed' with 0/N spark earned.
815 #
816 # Real completion gate: refresh from DB and require
817 # spark_spent > 0 (some real tool/LLM cost was incurred).
818 # Goals dispatched but doing zero work get a 'noop'
819 # counter; after 5 consecutive noops the goal is
820 # auto-paused so the daemon stops re-spinning it.
821 try:
822 db.refresh(goal)
823 except Exception:
824 pass # refresh failure → fall through to attribute read
825 cfg = goal.config_json or {}
826 is_continuous = cfg.get('continuous', False)
827 spark_spent = goal.spark_spent or 0
828 if is_continuous:
829 # Continuous goals never auto-complete; cooldown gate
830 # higher up already prevents re-dispatch storms.
831 pass
832 elif spark_spent > 0:
833 goal.status = 'completed'
834 cfg['completed_at'] = datetime.utcnow().isoformat()
835 cfg.pop('noop_dispatch_count', None)
836 goal.config_json = cfg
837 logger.info(
838 f"Goal {goal_key} COMPLETED "
839 f"(spark_spent={spark_spent})")
840 else:
841 # Dispatched but zero real work — track and back off.
842 noop_count = int(cfg.get('noop_dispatch_count', 0)) + 1
843 cfg['noop_dispatch_count'] = noop_count
844 cfg['last_noop_dispatch'] = datetime.utcnow().isoformat()
845 if noop_count >= 5:
846 goal.status = 'paused'
847 cfg['pause_reason'] = (
848 f'Auto-paused: {noop_count} consecutive '
849 f'dispatches produced 0 spark — work is not '
850 f'reaching tool execution. Investigate '
851 f'agent prompt or tool registration.')
852 cfg['paused_at'] = datetime.utcnow().isoformat()
853 logger.warning(
854 f"Goal {goal_key} AUTO-PAUSED after "
855 f"{noop_count} noop dispatches")
856 else:
857 logger.info(
858 f"Goal {goal_key} dispatched but "
859 f"spark_spent=0 (noop #{noop_count})")
860 goal.config_json = cfg
862 # ── HITL: notify owners of APPROVAL_REQUIRED tasks ──
863 try:
864 for goal in goals:
865 if goal.goal_type != 'thought_experiment':
866 continue
867 ledger = self._get_goal_ledger(goal)
868 if not ledger:
869 continue
870 ledger_tasks = _get_blocked_hitl_tasks(ledger, goal.id)
871 for task in ledger_tasks:
872 _send_hitl_notification(db, goal, task)
873 except Exception as e:
874 logger.debug(f"HITL notification check: {e}")
876 if dispatched > 0:
877 logger.info(f"Agent daemon: dispatched {dispatched} goal(s) to idle agents")
879 # Content gen monitor: check stuck games every 5 ticks (~2.5 min)
880 if self._tick_count % 5 == 0:
881 try:
882 from .content_gen_tracker import ContentGenTracker
883 stuck = ContentGenTracker.get_stuck_games(db)
884 for game in stuck:
885 game_id = game.get('game_id', '')
886 result = ContentGenTracker.attempt_unblock(db, game_id)
887 if result.get('success'):
888 logger.info(
889 f"Content gen unblocked {game_id}: "
890 f"{result.get('action_taken')}")
891 # Record progress snapshot for delta tracking
892 ContentGenTracker.record_progress_snapshot(db, game_id)
893 except Exception as e:
894 logger.debug(f"Content gen monitor: {e}")
896 # Outreach follow-up checker: send due follow-ups every 5 ticks
897 try:
898 from .outreach_crm_tools import check_pending_followups_daemon
899 followup_result = check_pending_followups_daemon()
900 sent = followup_result.get('sent', 0)
901 if sent > 0:
902 logger.info(f"Outreach follow-ups: sent {sent} follow-up email(s)")
903 except ImportError:
904 pass # outreach_crm_tools not available
905 except Exception as e:
906 logger.debug(f"Outreach follow-up check: {e}")
908 # Journey engine tick: stage transitions, A/B analysis, channel routing
909 try:
910 from .journey_engine import journey_daemon_tick
911 journey_result = journey_daemon_tick()
912 if journey_result.get('transitions', 0) > 0 or journey_result.get('actions', 0) > 0:
913 logger.info(f"Journey engine: {journey_result.get('transitions', 0)} transitions, "
914 f"{journey_result.get('actions', 0)} actions")
915 except ImportError:
916 pass # journey_engine not available
917 except Exception as e:
918 logger.debug(f"Journey engine tick: {e}")
920 # Periodic auto-remediation: scan loopholes every Nth tick
921 if self._tick_count % self._remediate_every == 0:
922 try:
923 from .goal_seeding import auto_remediate_loopholes
924 rem_count = auto_remediate_loopholes(db)
925 if rem_count > 0:
926 logger.info(f"Auto-remediation: created {rem_count} goal(s)")
927 except Exception as e:
928 logger.debug(f"Auto-remediation check failed: {e}")
930 # Intelligence milestone: auto-file patent when threshold reached
931 try:
932 from .ip_service import IPService
933 milestone = IPService.check_intelligence_milestone(db)
934 if milestone.get('triggered', False):
935 from integrations.social.models import AgentGoal
936 active_filing = db.query(AgentGoal).filter(
937 AgentGoal.status == 'active',
938 AgentGoal.goal_type == 'ip_protection',
939 ).all()
940 has_filing = any(
941 (g.config_json or {}).get('mode') == 'file'
942 for g in active_filing
943 )
944 if not has_filing:
945 from .goal_manager import GoalManager
946 GoalManager.create_goal(
947 db,
948 goal_type='ip_protection',
949 title='Auto-File Provisional Patent: Critical Intelligence Reached',
950 description=(
951 f'Intelligence milestone triggered: '
952 f'{milestone["consecutive_verified"]} consecutive verified days, '
953 f'moat catch-up: {milestone["moat_catch_up"]}. '
954 f'Use draft_patent_claims then draft_provisional_patent.'
955 ),
956 config={'mode': 'file', 'auto_triggered': True,
957 'milestone': milestone},
958 spark_budget=500,
959 created_by='intelligence_milestone',
960 )
961 logger.info("Intelligence milestone reached — auto-patent goal created")
962 except Exception as e:
963 logger.debug(f"Intelligence milestone check: {e}")
965 # Self-healing: create fix goals for recurring exceptions
966 try:
967 from .self_healing_dispatcher import SelfHealingDispatcher
968 healer = SelfHealingDispatcher.get_instance()
969 fix_count = healer.check_and_dispatch(db)
970 if fix_count > 0:
971 logger.info(f"Self-healing: created {fix_count} fix goal(s)")
972 except Exception as e:
973 logger.debug(f"Self-healing check: {e}")
975 # Revenue → trading funding: every 5th remediation cycle
976 if self._tick_count % (self._remediate_every * 5) == 0:
977 try:
978 from .revenue_aggregator import get_revenue_aggregator
979 rev = get_revenue_aggregator()
980 fund_result = rev.check_and_fund_trading(db)
981 if fund_result.get('funded'):
982 logger.info(
983 f"Revenue aggregator: funded trading with "
984 f"{fund_result['amount']} Spark")
985 except Exception as e:
986 logger.debug(f"Revenue funding check: {e}")
988 # Baseline intelligence check: re-snapshot when world model stats shift
989 if self._tick_count % (self._remediate_every * 2) == 0:
990 try:
991 from .agent_baseline_service import (
992 AgentBaselineService, capture_baseline_async)
993 from integrations.social.models import AgentGoal
994 active_goals = db.query(AgentGoal).filter(
995 AgentGoal.status.in_(['active', 'completed'])).all()
996 checked = set()
997 for goal in active_goals:
998 key = f'{goal.prompt_id}_{goal.flow_id or 0}'
999 if key in checked or not goal.prompt_id:
1000 continue
1001 checked.add(key)
1002 result = AgentBaselineService.validate_against_baseline(
1003 str(goal.prompt_id), goal.flow_id or 0)
1004 if result and not result.get('passed', True):
1005 capture_baseline_async(
1006 prompt_id=str(goal.prompt_id),
1007 flow_id=goal.flow_id or 0,
1008 trigger='intelligence_change')
1009 logger.info(
1010 f"Intelligence change detected for {key}: "
1011 f"{result.get('regressions', [])}")
1012 except Exception as e:
1013 logger.debug(f"Baseline intelligence check: {e}")
1015 self._wd_heartbeat()
1017 # Federation: aggregate learning deltas across peers every 2nd tick
1018 if self._tick_count % 2 == 0:
1019 try:
1020 from .federated_aggregator import get_federated_aggregator
1021 fed = get_federated_aggregator()
1022 fed_result = fed.tick()
1023 if fed_result.get('aggregated'):
1024 logger.info(
1025 f"Federation: epoch={fed_result.get('epoch')}, "
1026 f"convergence={fed_result.get('convergence', 0):.3f}")
1027 except Exception as e:
1028 logger.debug(f"Federation tick: {e}")
1029 self._wd_heartbeat()
1031 # Monthly API quota reset
1032 if self._tick_count == 1 or self._tick_count % (self._remediate_every * 10) == 0:
1033 try:
1034 from .commercial_api import CommercialAPIService
1035 reset_count = CommercialAPIService.reset_monthly_quotas(db)
1036 if reset_count > 0:
1037 logger.info(f"Reset monthly API quotas for {reset_count} keys")
1038 except Exception:
1039 pass
1041 # Marketplace payment reconciliation: retry failed Spark transfers.
1042 # Rows with status='pending' older than 15 minutes are re-attempted
1043 # through the canonical ResonanceService gateway. Prior to this
1044 # hook, a failed Spark transfer during install_app() was silently
1045 # dropped with no retry path — creators lost revenue on
1046 # transient DB errors.
1047 if self._tick_count % self._remediate_every == 0:
1048 try:
1049 from .app_marketplace import get_marketplace
1050 mp = get_marketplace()
1051 counters = mp.reconcile_pending_payments()
1052 if counters.get('retried', 0) > 0 or counters.get('failed', 0) > 0:
1053 logger.info(
1054 f"Marketplace reconcile: "
1055 f"retried={counters.get('retried', 0)}, "
1056 f"settled={counters.get('settled', 0)}, "
1057 f"failed={counters.get('failed', 0)}")
1058 except ImportError:
1059 pass
1060 except Exception as e:
1061 logger.debug(f"Marketplace reconcile failed: {e}")
1063 # CLEANUP: prune stale entries from module-level dicts every 100 ticks
1064 if self._tick_count % 100 == 0:
1065 active_goal_ids = {str(g.id) for g in goals}
1066 with _module_lock:
1067 stale_backoff = [k for k in _dispatch_backoff if k not in active_goal_ids]
1068 for k in stale_backoff:
1069 del _dispatch_backoff[k]
1070 stale_budget = _budget_blocked_goals - active_goal_ids
1071 _budget_blocked_goals.difference_update(stale_budget)
1072 # _hitl_notified: keep entries to avoid re-notifying on restart,
1073 # but cap size to prevent unbounded growth
1074 if len(_hitl_notified) > 10000:
1075 _hitl_notified.clear()
1076 if stale_backoff or stale_budget:
1077 logger.debug(f"Pruned {len(stale_backoff)} backoff + {len(stale_budget)} budget-blocked stale entries")
1078 # Evict completed/archived goals from ledger cache
1079 stale_cache = [k for k in self._ledger_cache if k[1] not in active_goal_ids]
1080 for k in stale_cache:
1081 del self._ledger_cache[k]
1083 db.commit()
1085 # INSTRUCTION QUEUE: drain queued instructions on idle ticks
1086 # When agents are idle and goals are dispatched, check if any
1087 # user has pending queued instructions that can be batch-executed
1088 if dispatched < len(idle_agents):
1089 try:
1090 from .instruction_queue import get_all_pending
1091 from .dispatch import drain_instruction_queue
1092 pending = get_all_pending()
1093 for uid in list(pending.keys())[:3]: # Max 3 users per tick
1094 drain_instruction_queue(uid)
1095 except Exception as eq:
1096 logger.debug(f"Instruction queue drain: {eq}")
1098 except Exception as e:
1099 db.rollback()
1100 # Clear module-level dicts to stay in sync with rolled-back DB state.
1101 # Without this, goals marked as budget-blocked or backed-off in memory
1102 # would be skipped even though their DB status was rolled back.
1103 with _module_lock:
1104 _budget_blocked_goals.clear()
1105 _dispatch_backoff.clear()
1106 logger.warning(f"Agent daemon tick error (state reset): {e}")
1107 finally:
1108 db.close()
# Module-level singleton: importers share this one AgentDaemon instance,
# so its background thread and tick counters are process-global.
agent_daemon = AgentDaemon()