Coverage for integrations / agent_engine / agent_daemon.py: 20.2%

531 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Unified Agent Goal Engine - Background Daemon 

3 

4Finds active goals, detects idle agents, and dispatches work through /chat. 

5Skips CODING_GOAL_TYPES — those are handled by coding_daemon which adds 

6idle-agent detection and benchmark sync for the coding backends. 

7 

8Uses prompt builder registry to build the right prompt per goal type. 

9Dispatches with autonomous=True so agents self-configure. 

10""" 

11import os 

12import time 

13import random 

14import logging 

15import threading 

16from datetime import datetime 

17 

logger = logging.getLogger('hevolve_social')

# Lock protecting module-level mutable state accessed from daemon thread + API threads
# (guards _hitl_notified, _budget_blocked_goals and _dispatch_backoff below).
_module_lock = threading.Lock()

# Track which tasks we've already sent HITL notifications for (avoid spam)
# Entries are "<goal_id>:<task_id>" strings (see _send_hitl_notification).
_hitl_notified: set = set()

# Goals that are budget-blocked are immediately paused (no retries).
# Previously we retried 3 times, but that just re-dispatches a blocked goal
# every 30 seconds with no chance of success (budget doesn't change between ticks).
# Entries are str(goal.id).
_budget_blocked_goals: set = set()

# Track dispatch failures per goal for exponential backoff.
# Maps goal_id → {'failures': int, 'skip_until': float}
# (skip_until is an epoch timestamp; goals are skipped until it passes).
_dispatch_backoff: dict = {}

34 

35 

36def _get_blocked_hitl_tasks(ledger, goal_id): 

37 """Get tasks blocked with APPROVAL_REQUIRED under a goal.""" 

38 try: 

39 parent = ledger.get_task(goal_id) 

40 if not parent: 

41 return [] 

42 blocked = [] 

43 for child_id in (parent.child_task_ids or []): 

44 task = ledger.get_task(child_id) 

45 if task and str(task.blocked_reason) == 'APPROVAL_REQUIRED': 

46 blocked.append(task) 

47 return blocked 

48 except Exception: 

49 return [] 

50 

51 

def _send_hitl_notification(db, goal, task):
    """Send a one-time HITL notification for an approval-blocked task."""
    notif_key = f"{goal.id}:{task.id}"
    # Dedup under the module lock: first caller claims the key, later
    # callers (including other daemon ticks) bail out without re-notifying.
    with _module_lock:
        already_sent = notif_key in _hitl_notified
        if not already_sent:
            _hitl_notified.add(notif_key)
    if already_sent:
        return

    try:
        from integrations.social.services import NotificationService
        from integrations.social.realtime import on_notification
        owner_id = goal.created_by or goal.owner_id
        if not owner_id:
            return  # nobody to notify
        desc_preview = (task.description or '')[:100]
        notif = NotificationService.create(
            db, str(owner_id), 'approval_required',
            target_type='thought_experiment',
            target_id=str(task.id),
            message=f'Agent needs your review: {desc_preview}',
        )
        payload = notif.to_dict() if hasattr(notif, 'to_dict') else {
            'type': 'approval_required', 'message': f'Agent needs your review: {desc_preview}',
        }
        on_notification(str(owner_id), payload)
        logger.info(f"HITL notification sent for goal={goal.id} task={task.id}")
    except Exception as e:
        logger.debug(f"HITL notification failed: {e}")

79 

80 

81class AgentDaemon: 

82 """Background daemon: active goals (any type) + idle agents → /chat dispatch.""" 

83 

84 def __init__(self): 

85 self._interval = int(os.environ.get('HEVOLVE_AGENT_POLL_INTERVAL', '30')) 

86 self._running = False 

87 self._thread = None 

88 self._lock = threading.Lock() 

89 self._tick_count = 0 

90 self._remediate_every = int(os.environ.get( 

91 'HEVOLVE_REMEDIATE_INTERVAL_TICKS', '10')) 

92 # Cache SmartLedger instances by (agent_id, session_id) to avoid 

93 # re-creating them every tick (which spams "starting fresh" logs). 

94 self._ledger_cache: dict = {} 

95 # Exponential backoff on consecutive tick failures 

96 self._consecutive_failures = 0 

97 self._BACKOFF_MAX = 300 # 5 minute cap 

98 # Proactive hive tick state 

99 self._next_hive_explore_time = time.time() + random.randint(300, 1800) 

100 self._base_interval = self._interval # remember original for optimizer 

101 

102 def start(self): 

103 with self._lock: 

104 if self._running: 

105 return 

106 self._running = True 

107 self._consecutive_failures = 0 

108 self._thread = threading.Thread(target=self._loop, daemon=True, 

109 name='agent_daemon') 

110 self._thread.start() 

111 logger.info(f"Agent daemon started (interval={self._interval}s)") 

112 

113 def stop(self): 

114 with self._lock: 

115 self._running = False 

116 if self._thread: 

117 self._thread.join(timeout=10) 

118 

119 def _try_parallel_dispatch(self, goal, idle_agents, dispatched, max_concurrent): 

120 """Check if a goal has parallel subtasks and dispatch them concurrently. 

121 

122 Returns number of tasks dispatched (0 if no parallel tasks found). 

123 """ 

124 try: 

125 ledger = self._get_goal_ledger(goal) 

126 if not ledger: 

127 return 0 

128 

129 parallel_tasks = ledger.get_parallel_executable_tasks() 

130 if not parallel_tasks: 

131 return 0 

132 

133 from .parallel_dispatch import dispatch_parallel_tasks 

134 from .dispatch import dispatch_goal 

135 

136 batch_count = min( 

137 len(parallel_tasks), 

138 len(idle_agents) - dispatched, 

139 max_concurrent - dispatched, 

140 ) 

141 if batch_count <= 0: 

142 return 0 

143 

144 def _dispatch_task(task): 

145 """Dispatch a single parallel subtask via /chat.""" 

146 goal_id = str(goal.id) if hasattr(goal, 'id') else '' 

147 goal_type = goal.goal_type if hasattr(goal, 'goal_type') else 'marketing' 

148 user_id = str(goal.user_id) if hasattr(goal, 'user_id') else 'system' 

149 result = dispatch_goal( 

150 task.description, user_id, goal_id, goal_type) 

151 return {'success': result is not None, 'response': result} 

152 

153 result = dispatch_parallel_tasks( 

154 ledger, _dispatch_task, max_concurrent=batch_count) 

155 

156 count = result['completed'] + result['failed'] 

157 if count > 0: 

158 logger.info( 

159 f"Parallel dispatch for goal {goal.id}: " 

160 f"{result['completed']} completed, {result['failed']} failed") 

161 return count 

162 

163 except Exception as e: 

164 logger.debug(f"Parallel dispatch check failed: {e}") 

165 return 0 

166 

167 def _get_goal_ledger(self, goal): 

168 """Get a SmartLedger for a goal's task graph (if one exists). 

169 

170 Returns None when no ledger exists or it has only 1 task (no parallelism). 

171 Ledger instances are cached on self._ledger_cache to avoid re-creating 

172 (and re-logging "starting fresh") on every daemon tick. 

173 """ 

174 try: 

175 from agent_ledger import SmartLedger 

176 goal_id = str(goal.id) if hasattr(goal, 'id') else '' 

177 user_id = str(goal.user_id) if hasattr(goal, 'user_id') else 'system' 

178 

179 cache_key = (user_id, goal_id) 

180 ledger = self._ledger_cache.get(cache_key) 

181 if ledger is None: 

182 ledger = SmartLedger(agent_id=user_id, session_id=str(goal_id)) 

183 self._ledger_cache[cache_key] = ledger 

184 

185 # Use the ledger's resolved dir (handles bundled/read-only fallback) 

186 ledger_path = str(ledger.ledger_file) 

187 if os.path.isfile(ledger_path): 

188 ledger.load(ledger_path) 

189 if len(ledger.tasks) > 1: 

190 return ledger 

191 except Exception: 

192 pass 

193 return None 

194 

195 def _wd_heartbeat(self): 

196 """Send heartbeat to watchdog between potentially blocking operations.""" 

197 try: 

198 from security.node_watchdog import get_watchdog 

199 wd = get_watchdog() 

200 if wd: 

201 wd.heartbeat('agent_daemon') 

202 except Exception: 

203 pass 

204 

205 def _wd_sleep(self, seconds: float) -> None: 

206 """Sleep for ``seconds`` while keeping the watchdog heartbeat fresh. 

207 

208 Delegates to the canonical ``NodeWatchdog.sleep_with_heartbeat`` 

209 helper so a 480s exponential-backoff sleep can't silently age 

210 the heartbeat past the 300s frozen threshold. See the helper's 

211 docstring for the full incident context. When the watchdog 

212 isn't available (test mode, early boot) this falls back to a 

213 plain ``time.sleep`` so behavior is unchanged. 

214 """ 

215 try: 

216 from security.node_watchdog import get_watchdog 

217 wd = get_watchdog() 

218 if wd is not None: 

219 wd.sleep_with_heartbeat( 

220 'agent_daemon', seconds, 

221 stop_check=lambda: not self._running, 

222 ) 

223 return 

224 except Exception: 

225 pass 

226 time.sleep(seconds) 

227 

    def _proactive_hive_tick(self):
        """Proactive hive daemon tick — exploration, self-promotion, and compute optimization.

        Runs alongside the main tick on a random 5-30 minute interval.
        All imports are lazy (try/except) so missing modules never crash the daemon.

        Feeds dense attribution chain to WorldModelBridge via agent_attribution.
        """
        # Single canonical daemon yield gate (user activity + system
        # pressure). Without this the proactive tick saturates GIL with
        # speculative dispatches (DB churn + LLM HTTP) while the user is
        # actively chatting — py-spy showed 8s+ sampling lag.
        from .dispatch import should_yield_to_user
        if should_yield_to_user():
            return

        now = time.time()

        # Attribution: track this tick as a long-horizon action.
        # NOTE: record_step/complete_action (used further down) are only in
        # scope if this import succeeds — every later use is guarded by
        # ``if action_id`` which stays None when the import fails.
        action_id = None
        try:
            from integrations.agent_engine.agent_attribution import (
                begin_action, record_step, complete_action,
            )
            # Periodically clean expired actions (once per hour)
            if int(now) % 3600 < 30:
                from integrations.agent_engine.agent_attribution import get_attribution
                get_attribution().cleanup_expired()
            action_id = begin_action(
                agent_id='agent_daemon',
                action_type='proactive_hive_tick',
                expected_outcome={'status': 'ok'},
                acceptance_criteria=[
                    'benchmark_prover_reachable',
                    'optimizer_health_score > 0',
                ],
            )
        except Exception:
            pass  # Attribution is best-effort — never break the tick

        # ── 1. Random-interval hive exploration ──
        if now >= self._next_hive_explore_time:
            # Schedule next exploration (5-30 minutes from now)
            self._next_hive_explore_time = now + random.randint(300, 1800)
            if action_id:
                try:
                    record_step(action_id, 'hive_exploration_scheduled',
                                state={'next_in_s': self._next_hive_explore_time - now},
                                decision='explore_now', confidence=0.9)
                except Exception:
                    pass

            # Check benchmark prover for active benchmark needs
            benchmark_needs = None
            try:
                from integrations.agent_engine.hive_benchmark_prover import get_prover
                prover = get_prover()
                benchmark_needs = prover.get_status()
                logger.info(
                    f"Proactive hive: benchmark prover checked "
                    f"(loop_running={benchmark_needs.get('loop_running', False)})")
            except ImportError:
                logger.debug("Proactive hive: hive_benchmark_prover not available")
            except Exception as e:
                logger.debug(f"Proactive hive: benchmark prover check failed: {e}")

            # Check hive task protocol for unassigned tasks
            pending_tasks = []
            try:
                from integrations.coding_agent.hive_task_protocol import get_dispatcher
                dispatcher = get_dispatcher()
                pending_tasks = dispatcher.get_pending_tasks()
                if pending_tasks:
                    logger.info(
                        f"Proactive hive: found {len(pending_tasks)} pending "
                        f"hive tasks for dispatch")
            except ImportError:
                logger.debug("Proactive hive: hive_task_protocol not available")
            except Exception as e:
                logger.debug(f"Proactive hive: task protocol check failed: {e}")

            # If idle and tasks exist, auto-dispatch to local Claude hive session
            if pending_tasks:
                try:
                    # NOTE(review): get_blueprint is imported but never called —
                    # this loop only logs the intent to dispatch. Confirm whether
                    # the actual dispatch call is missing or handled elsewhere.
                    from integrations.coding_agent.claude_hive_session import get_blueprint
                    for task in pending_tasks[:3]:  # Max 3 tasks per exploration
                        task_desc = getattr(task, 'description', '') or str(task)
                        logger.info(
                            f"Proactive hive: auto-dispatching task to local "
                            f"hive session: {task_desc[:100]}")
                except ImportError:
                    logger.debug("Proactive hive: claude_hive_session not available")
                except Exception as e:
                    logger.debug(f"Proactive hive: hive session dispatch failed: {e}")

        # ── 2. Self-promotion on benchmark results ──
        try:
            from integrations.agent_engine.hive_benchmark_prover import get_prover
            prover = get_prover()
            history = prover.get_benchmark_history(limit=1)
            if history and isinstance(history, list):
                latest = history[0]
                score = latest.get('score', 0)
                benchmark = latest.get('benchmark', 'unknown')
                node_count = latest.get('node_count', 1)
                message = (
                    f"Hive benchmark result: {benchmark} — "
                    f"score={score:.2f} across {node_count} nodes"
                )

                # Post to all connected channels via signal bridge
                try:
                    from integrations.channels.hive_signal_bridge import get_signal_bridge
                    bridge = get_signal_bridge()
                    bridge.broadcast_signal({
                        'type': 'benchmark_result',
                        'benchmark': benchmark,
                        'score': score,
                        'node_count': node_count,
                        'message': message,
                    })
                    logger.info(f"Proactive hive: broadcast benchmark result to channels")
                except ImportError:
                    logger.debug("Proactive hive: hive_signal_bridge not available")
                except Exception as e:
                    logger.debug(f"Proactive hive: signal broadcast failed: {e}")

                # Emit EventBus event for LiquidUI dashboard
                try:
                    from core.platform.events import emit_event
                    emit_event('hive.benchmark.completed', {
                        'benchmark': benchmark,
                        'score': score,
                        'node_count': node_count,
                        'message': message,
                    })
                except ImportError:
                    pass
                except Exception as e:
                    logger.debug(f"Proactive hive: EventBus emit failed: {e}")
        except ImportError:
            pass
        except Exception as e:
            logger.debug(f"Proactive hive: self-promotion check failed: {e}")

        # ── 3. Compute optimizer integration ──
        try:
            from core.compute_optimizer import get_optimizer
            optimizer = get_optimizer()
            health = optimizer.get_health_score()

            # Derive load level from health score: <0.3 = high load, >0.7 = idle
            if health < 0.3:
                load_level = 'high'
            elif health > 0.7:
                load_level = 'idle'
            else:
                load_level = 'normal'

            if load_level == 'high':
                # System under load: lengthen tick interval
                new_interval = min(self._base_interval * 3, self._BACKOFF_MAX)
                if self._interval != new_interval:
                    self._interval = new_interval
                    logger.info(
                        f"Proactive hive: system under load, "
                        f"lengthened tick interval to {new_interval}s")
            elif load_level == 'idle':
                # System idle: shorten tick interval, accept more tasks
                new_interval = max(self._base_interval // 2, 10)
                if self._interval != new_interval:
                    self._interval = new_interval
                    logger.info(
                        f"Proactive hive: system idle, "
                        f"shortened tick interval to {new_interval}s")
            else:
                # Normal: restore base interval
                if self._interval != self._base_interval:
                    self._interval = self._base_interval
                    logger.debug(
                        f"Proactive hive: load normal, "
                        f"restored tick interval to {self._base_interval}s")
        except ImportError:
            pass  # compute_optimizer not available yet
        except Exception as e:
            logger.debug(f"Proactive hive: compute optimizer check failed: {e}")

        # ── Attribution: complete action with outcome summary ──
        # complete_action is bound by the attribution import at the top;
        # action_id truthy implies that import succeeded.
        if action_id:
            try:
                complete_action(action_id, outcome={
                    'status': 'ok',
                    'interval_seconds': self._interval,
                    'completed_at': time.time(),
                })
            except Exception:
                pass

425 

    def _loop(self):
        """Daemon main loop: boot grace → (sleep → heartbeat → async proactive
        tick → _tick) until stop() clears ``self._running``."""
        # Boot grace period: let user chat have exclusive LLM access at startup
        # (default 300s via HEVOLVE_DAEMON_BOOT_DELAY).
        # Without this, daemon goals consume all llama-server slots immediately
        # and user /chat requests timeout (the 2026-04-12 slot starvation bug).
        _boot_grace = int(os.environ.get('HEVOLVE_DAEMON_BOOT_DELAY', '300'))
        if _boot_grace > 0:
            logger.info(f"Agent daemon: boot grace period {_boot_grace}s — "
                        f"user chat gets priority. Set HEVOLVE_DAEMON_BOOT_DELAY=0 to disable.")
            self._wd_sleep(_boot_grace)

        while self._running:
            # Exponential backoff: sleep longer on consecutive failures.
            # Uses sleep_with_heartbeat so the backoff can't age the
            # watchdog heartbeat past the frozen threshold — a 480s
            # backoff used to trigger a restart cascade because the
            # 300s threshold hit mid-sleep (2026-04-11 incident).
            if self._consecutive_failures > 0:
                backoff = min(
                    self._interval * (2 ** self._consecutive_failures),
                    self._BACKOFF_MAX)
                self._wd_sleep(backoff)
            else:
                self._wd_sleep(self._interval)
            # Re-check after the sleep: stop() may have been called mid-sleep.
            if not self._running:
                break
            self._wd_heartbeat()

            # Proactive hive tick — exploration, self-promotion, compute
            # optimization. RUN ASYNC: a stuck call inside the proactive
            # path (WorldModelBridge.record_interaction was the culprit
            # 2026-04-29 — daemon restarted 9 times by watchdog without
            # ever reaching _tick() because complete_action blocked) must
            # NOT block the goal-dispatch tick. Fire-and-forget in a
            # daemon thread; watchdog heartbeat stays fresh because
            # _loop continues immediately to _tick.
            self._spawn_proactive_hive_tick_async()

            try:
                self._tick()
                self._consecutive_failures = 0
            except Exception as e:
                self._consecutive_failures += 1
                import traceback
                logger.error(f"Agent daemon tick error (backoff={self._consecutive_failures}): "
                             f"{e}\n{traceback.format_exc()}")

471 

472 def _spawn_proactive_hive_tick_async(self) -> None: 

473 """Run _proactive_hive_tick in a daemon thread, never blocking _loop. 

474 

475 If the prior iteration's thread is still alive (sub-call stuck on 

476 sync I/O), we SKIP this iteration rather than pile up threads. 

477 That makes the proactive cadence "best-effort" while keeping the 

478 main goal-dispatch tick on its 30s rhythm regardless. 

479 

480 Watchdog heartbeat is owned by _loop, not the proactive thread — 

481 so a stuck sub-call no longer ages the daemon's heartbeat past 

482 the 300s frozen threshold. 

483 """ 

484 prior = getattr(self, '_proactive_thread', None) 

485 if prior is not None and prior.is_alive(): 

486 logger.warning( 

487 "proactive_hive_tick from prior iteration still running — " 

488 "skipping this cycle (likely a sub-call is blocked on I/O)") 

489 return 

490 

491 def _runner(): 

492 try: 

493 self._proactive_hive_tick() 

494 except Exception as e: 

495 logger.debug(f"Proactive hive tick error: {e}") 

496 

497 t = threading.Thread( 

498 target=_runner, daemon=True, name='proactive_hive_tick') 

499 t.start() 

500 self._proactive_thread = t 

501 

502 def _tick(self): 

503 """Find active goals, find idle agents, dispatch via /chat. 

504 

505 GUARDRAILS enforced at every layer: 

506 - HiveCircuitBreaker.is_halted() → full stop 

507 - GuardrailEnforcer.before_dispatch() → per-prompt gate 

508 - Speculative dispatch when HEVOLVE_SPECULATIVE_ENABLED=true 

509 """ 

510 self._tick_count += 1 

511 

512 # GUARDRAIL: circuit breaker — deterministic stop 

513 try: 

514 from security.hive_guardrails import HiveCircuitBreaker 

515 if HiveCircuitBreaker.is_halted(): 

516 return 

517 except ImportError: 

518 pass 

519 

520 from integrations.social.models import get_db, AgentGoal, Product 

521 from integrations.coding_agent.idle_detection import IdleDetectionService 

522 from .goal_manager import GoalManager, CODING_GOAL_TYPES 

523 from .dispatch import dispatch_goal, should_yield_to_user 

524 

525 # Single canonical yield gate — user activity + system pressure. 

526 # See dispatch.should_yield_to_user() docstring for the contract. 

527 if should_yield_to_user(): 

528 logger.debug("Agent daemon: yielding (user active or system pressure)") 

529 return 

530 # _throttle is consumed lower down for soft scaling decisions — 

531 # re-read after the hard yield gate so we still have a value. 

532 try: 

533 from integrations.service_tools.model_lifecycle import ( 

534 get_model_lifecycle_manager) 

535 _throttle = get_model_lifecycle_manager().get_system_pressure().get( 

536 'throttle_factor', 1.0) 

537 except Exception: 

538 _throttle = 1.0 

539 

540 speculative_enabled = os.environ.get( 

541 'HEVOLVE_SPECULATIVE_ENABLED', 'false').lower() == 'true' 

542 

543 db = get_db() 

544 try: 

545 # DETERMINISTIC STOP: no goals = no action = system is inert 

546 # Skip CODING_GOAL_TYPES — coding_daemon handles those with 

547 # idle-agent detection + benchmark sync for backend routing. 

548 goals = db.query(AgentGoal).filter( 

549 AgentGoal.status == 'active', 

550 ~AgentGoal.goal_type.in_(CODING_GOAL_TYPES), 

551 ).all() 

552 if not goals: 

553 return 

554 

555 # Use ``get_idle_agent_personas`` — agent-type users (Echo, 

556 # Quest, Contest Curator, …) eligible for goal dispatch. 

557 # Do NOT use ``get_idle_opted_in_agents`` here: that filter 

558 # is the human-consent privacy gate for distributed compute 

559 # sharing, not the agent-persona gate. Mismatch silently 

560 # returned [] on installs where no human had opted in → 

561 # daemon stalled with seeded personas never dispatching 

562 # (root-cause logged 2026-05-01). 

563 idle_agents = IdleDetectionService.get_idle_agent_personas(db) 

564 if not idle_agents: 

565 return 

566 

567 self._wd_heartbeat() 

568 

569 # Assign excess idle agents as exception watchers 

570 try: 

571 from .exception_watcher import ExceptionWatcher 

572 watcher = ExceptionWatcher.get_instance() 

573 if len(idle_agents) > len(goals): 

574 excess = idle_agents[len(goals):] 

575 for agent in excess: 

576 watcher.assign_watcher(str(agent['user_id']), agent['username']) 

577 if watcher.has_watchers(): 

578 watcher.process_exceptions(db) 

579 except Exception as e: 

580 logger.debug(f"Exception watcher: {e}") 

581 

582 dispatched = 0 

583 used_agents = set() 

584 max_concurrent = int(os.environ.get('HEVOLVE_AGENT_MAX_CONCURRENT', '10')) 

585 # Reduce concurrency proportional to system pressure 

586 max_concurrent = max(1, int(max_concurrent * _throttle)) 

587 

588 # Minimum interval between dispatches for continuous goals. 

589 # Without this, a continuous goal (e.g. autoresearch coordinator) 

590 # gets re-dispatched every 30s tick even if the previous dispatch 

591 # is still running — causing repeated identical actions. 

592 _CONTINUOUS_COOLDOWN_S = 300 # 5 minutes 

593 

594 # Split goals into two queues: 

595 # - CREATE queue: goals without recipes (need LLM planning, 1 at a time) 

596 # - REUSE pool: goals with recipes (cheap replay, round-robin) 

597 import hashlib as _hlib 

598 _create_queue = [] 

599 _reuse_pool = [] 

600 for goal in goals: 

601 _gh = int(_hlib.md5(str(goal.id).encode()).hexdigest()[:10], 16) % 100_000_000_000 

602 _pid = str(max(1, _gh)) 

603 _recipe_path = os.path.join('prompts', f'{_pid}_0_recipe.json') 

604 if os.path.exists(_recipe_path): 

605 _reuse_pool.append(goal) 

606 else: 

607 _create_queue.append(goal) 

608 

609 # REUSE goals round-robin (cheap, can cycle through many per tick) 

610 # CREATE goals sequential (1 at a time, rotated so each gets a turn) 

611 if _create_queue: 

612 _cr_offset = self._tick_count % len(_create_queue) 

613 _create_queue = _create_queue[_cr_offset:] + _create_queue[:_cr_offset] 

614 if _reuse_pool: 

615 _re_offset = self._tick_count % len(_reuse_pool) 

616 _reuse_pool = _reuse_pool[_re_offset:] + _reuse_pool[:_re_offset] 

617 

618 # Prioritize: 1 CREATE first (if any), then fill remaining slots with REUSE 

619 goals = _create_queue[:1] + _reuse_pool + _create_queue[1:] 

620 

621 logger.debug(f"Goal split: {len(_create_queue)} need CREATE, " 

622 f"{len(_reuse_pool)} have recipes (REUSE)") 

623 

624 for goal in goals: 

625 if dispatched >= len(idle_agents) or dispatched >= max_concurrent: 

626 break 

627 

628 # Skip continuous goals dispatched recently — let the previous 

629 # dispatch complete before re-dispatching. 

630 cfg = goal.config_json or {} 

631 if cfg.get('continuous', False) and goal.last_dispatched_at: 

632 elapsed = (datetime.utcnow() - goal.last_dispatched_at).total_seconds() 

633 if elapsed < _CONTINUOUS_COOLDOWN_S: 

634 logger.debug( 

635 f"Continuous goal {goal.id} dispatched {elapsed:.0f}s ago " 

636 f"(cooldown={_CONTINUOUS_COOLDOWN_S}s), skipping") 

637 continue 

638 

639 # ── PARALLEL DISPATCH: check for goals with parallel subtasks ── 

640 parallel_dispatched = self._try_parallel_dispatch( 

641 goal, idle_agents, dispatched, max_concurrent) 

642 if parallel_dispatched > 0: 

643 dispatched += parallel_dispatched 

644 continue 

645 

646 # Find next unused agent (don't skip goal if current agent is taken) 

647 agent = None 

648 while dispatched < len(idle_agents) and dispatched < max_concurrent: 

649 candidate = idle_agents[dispatched] 

650 if candidate['user_id'] not in used_agents: 

651 agent = candidate 

652 break 

653 dispatched += 1 

654 if agent is None: 

655 break # No more available agents 

656 used_agents.add(agent['user_id']) 

657 

658 # Load product if marketing goal 

659 product_dict = None 

660 if goal.product_id: 

661 product = db.query(Product).filter_by(id=goal.product_id).first() 

662 if product: 

663 product_dict = product.to_dict() 

664 

665 # Build prompt using registered builder (guardrail: togetherness rewrite) 

666 prompt = GoalManager.build_prompt(goal.to_dict(), product_dict) 

667 if prompt is None: 

668 # build_prompt returns None when EITHER: 

669 # (a) the registered builder declined the goal (e.g., 

670 # robot goal on a non-robot host) — the builder 

671 # logs its own reason at INFO; OR 

672 # (b) hive_guardrails import failed in 'hard' mode — 

673 # build_prompt logs CRITICAL itself. 

674 # In both cases the meaningful signal is upstream; this 

675 # is just dispatch-loop bookkeeping. DEBUG avoids the 

676 # 28-per-8h chronic WARNING noise observed in production 

677 # (seeded robot goals on hardware-less hosts) and the 

678 # historical misattribution to "guardrails unavailable". 

679 logger.debug(f"Goal {goal.id}: build_prompt returned None, skipping") 

680 continue 

681 

682 # BUDGET PRE-CHECK: read-only check before dispatch. 

683 # The actual atomic budget reservation happens inside dispatch_goal() 

684 # via pre_dispatch_budget_gate(). This is just a quick reject to 

685 # avoid wasting time on goals that are clearly over budget. 

686 try: 

687 from .budget_gate import estimate_llm_cost_spark, _resolve_model_name 

688 goal_key = str(goal.id) 

689 

690 # Skip goals already known to be budget-blocked (avoids 

691 # re-checking every 30s when nothing has changed). 

692 with _module_lock: 

693 if goal_key in _budget_blocked_goals: 

694 continue 

695 

696 # Read-only check: compare remaining budget vs estimated cost 

697 # without reserving (no spark_spent increment) 

698 budget = goal.spark_budget or 0 

699 spent = goal.spark_spent or 0 

700 estimated = estimate_llm_cost_spark(prompt, _resolve_model_name('gpt-4o')) 

701 bg_allowed = (budget - spent) >= estimated 

702 bg_reason = f'insufficient_budget ({budget - spent} < {estimated})' if not bg_allowed else '' 

703 if not bg_allowed: 

704 # Immediately pause — budget won't change between daemon 

705 # ticks, so retrying is wasteful. 

706 goal.status = 'paused' 

707 cfg = goal.config_json or {} 

708 cfg['pause_reason'] = ( 

709 f'Auto-paused: budget gate blocked. ' 

710 f'Reason: {bg_reason}') 

711 cfg['paused_at'] = datetime.utcnow().isoformat() 

712 goal.config_json = cfg 

713 with _module_lock: 

714 _budget_blocked_goals.add(goal_key) 

715 logger.info( 

716 f"Goal {goal.id} AUTO-PAUSED by budget gate: " 

717 f"{bg_reason}") 

718 continue 

719 else: 

720 # Budget passed — clear from blocked set if it was 

721 # previously blocked (e.g. goal was resumed with 

722 # more budget). 

723 with _module_lock: 

724 _budget_blocked_goals.discard(goal_key) 

725 except ImportError: 

726 pass 

727 

728 # GUARDRAIL: full pre-dispatch gate 

729 try: 

730 from security.hive_guardrails import GuardrailEnforcer 

731 allowed, reason, prompt = GuardrailEnforcer.before_dispatch( 

732 prompt, goal.to_dict()) 

733 if not allowed: 

734 logger.warning(f"Goal {goal.id} blocked by guardrail: {reason}") 

735 continue 

736 except ImportError: 

737 logger.warning("hive_guardrails not available — dispatch proceeds without guardrail pre-check") 

738 

739 # Store prompt_id on goal for REUSE tracking 

740 # Must be NUMERIC — the adapter and /chat handler reject non-integer prompt_ids. 

741 # Use deterministic hash of goal.id so same goal always gets same prompt_id. 

742 import hashlib 

743 _gh = int(hashlib.md5(str(goal.id).encode()).hexdigest()[:10], 16) % 100_000_000_000 

744 prompt_id = str(max(1, _gh)) 

745 if not goal.prompt_id or not str(goal.prompt_id).isdigit(): 

746 goal.prompt_id = prompt_id 

747 

748 # BACKOFF: skip goals that have failed repeatedly 

749 goal_key = str(goal.id) 

750 with _module_lock: 

751 backoff_info = _dispatch_backoff.get(goal_key) 

752 if backoff_info and time.time() < backoff_info.get('skip_until', 0): 

753 logger.debug( 

754 f"Skipping goal {goal_key}: backoff " 

755 f"({backoff_info['failures']} failures, " 

756 f"resume in {backoff_info['skip_until'] - time.time():.0f}s)") 

757 continue 

758 

759 goal.last_dispatched_at = datetime.utcnow() 

760 

761 # Speculative dispatch if enabled and budget allows 

762 if speculative_enabled: 

763 try: 

764 from .speculative_dispatcher import get_speculative_dispatcher 

765 dispatcher = get_speculative_dispatcher() 

766 if dispatcher.should_speculate( 

767 str(agent['user_id']), prompt_id, prompt, goal.to_dict()): 

768 dispatcher.dispatch_speculative( 

769 prompt, str(agent['user_id']), prompt_id, 

770 goal.id, goal.goal_type) 

771 dispatched += 1 

772 # Success — clear backoff 

773 with _module_lock: 

774 _dispatch_backoff.pop(goal_key, None) 

775 continue 

776 except ImportError: 

777 pass 

778 

779 result = dispatch_goal(prompt, str(agent['user_id']), goal.id, goal.goal_type) 

780 dispatched += 1 

781 self._wd_heartbeat() 

782 

783 # Track failures for exponential backoff 

784 if result is None: 

785 with _module_lock: 

786 info = _dispatch_backoff.get(goal_key, {'failures': 0}) 

787 info['failures'] = info.get('failures', 0) + 1 

788 # Exponential backoff: 60s, 120s, 240s, 480s, max 900s (15 min) 

789 delay = min(60 * (2 ** (info['failures'] - 1)), 900) 

790 info['skip_until'] = time.time() + delay 

791 _dispatch_backoff[goal_key] = info 

792 failure_count = info['failures'] 

793 if failure_count >= 5: 

794 # Auto-pause after 5 consecutive failures 

795 goal.status = 'paused' 

796 cfg = goal.config_json or {} 

797 cfg['pause_reason'] = ( 

798 f'Auto-paused: {failure_count} consecutive ' 

799 f'dispatch failures') 

800 cfg['paused_at'] = datetime.utcnow().isoformat() 

801 goal.config_json = cfg 

802 logger.warning( 

803 f"Goal {goal_key} AUTO-PAUSED after " 

804 f"{failure_count} dispatch failures") 

805 else: 

806 # Success — clear backoff 

807 with _module_lock: 

808 _dispatch_backoff.pop(goal_key, None) 

809 

810 # COMPLETION: dispatch returning non-None means the chat was 

811 # handed off, NOT that the agent actually did the work — the 

812 # work runs async after dispatch_goal returns. Auto-marking 

813 # `completed` on dispatch produced the dashboard lie where 

814 # ~280 goals showed 'Completed' with 0/N spark earned. 

815 # 

816 # Real completion gate: refresh from DB and require 

817 # spark_spent > 0 (some real tool/LLM cost was incurred). 

818 # Goals dispatched but doing zero work get a 'noop' 

819 # counter; after 5 consecutive noops the goal is 

820 # auto-paused so the daemon stops re-spinning it. 

821 try: 

822 db.refresh(goal) 

823 except Exception: 

824 pass # refresh failure → fall through to attribute read 

825 cfg = goal.config_json or {} 

826 is_continuous = cfg.get('continuous', False) 

827 spark_spent = goal.spark_spent or 0 

828 if is_continuous: 

829 # Continuous goals never auto-complete; cooldown gate 

830 # higher up already prevents re-dispatch storms. 

831 pass 

832 elif spark_spent > 0: 

833 goal.status = 'completed' 

834 cfg['completed_at'] = datetime.utcnow().isoformat() 

835 cfg.pop('noop_dispatch_count', None) 

836 goal.config_json = cfg 

837 logger.info( 

838 f"Goal {goal_key} COMPLETED " 

839 f"(spark_spent={spark_spent})") 

840 else: 

841 # Dispatched but zero real work — track and back off. 

842 noop_count = int(cfg.get('noop_dispatch_count', 0)) + 1 

843 cfg['noop_dispatch_count'] = noop_count 

844 cfg['last_noop_dispatch'] = datetime.utcnow().isoformat() 

845 if noop_count >= 5: 

846 goal.status = 'paused' 

847 cfg['pause_reason'] = ( 

848 f'Auto-paused: {noop_count} consecutive ' 

849 f'dispatches produced 0 spark — work is not ' 

850 f'reaching tool execution. Investigate ' 

851 f'agent prompt or tool registration.') 

852 cfg['paused_at'] = datetime.utcnow().isoformat() 

853 logger.warning( 

854 f"Goal {goal_key} AUTO-PAUSED after " 

855 f"{noop_count} noop dispatches") 

856 else: 

857 logger.info( 

858 f"Goal {goal_key} dispatched but " 

859 f"spark_spent=0 (noop #{noop_count})") 

860 goal.config_json = cfg 

861 

862 # ── HITL: notify owners of APPROVAL_REQUIRED tasks ── 

863 try: 

864 for goal in goals: 

865 if goal.goal_type != 'thought_experiment': 

866 continue 

867 ledger = self._get_goal_ledger(goal) 

868 if not ledger: 

869 continue 

870 ledger_tasks = _get_blocked_hitl_tasks(ledger, goal.id) 

871 for task in ledger_tasks: 

872 _send_hitl_notification(db, goal, task) 

873 except Exception as e: 

874 logger.debug(f"HITL notification check: {e}") 

875 

876 if dispatched > 0: 

877 logger.info(f"Agent daemon: dispatched {dispatched} goal(s) to idle agents") 

878 

879 # Content gen monitor: check stuck games every 5 ticks (~2.5 min) 

880 if self._tick_count % 5 == 0: 

881 try: 

882 from .content_gen_tracker import ContentGenTracker 

883 stuck = ContentGenTracker.get_stuck_games(db) 

884 for game in stuck: 

885 game_id = game.get('game_id', '') 

886 result = ContentGenTracker.attempt_unblock(db, game_id) 

887 if result.get('success'): 

888 logger.info( 

889 f"Content gen unblocked {game_id}: " 

890 f"{result.get('action_taken')}") 

891 # Record progress snapshot for delta tracking 

892 ContentGenTracker.record_progress_snapshot(db, game_id) 

893 except Exception as e: 

894 logger.debug(f"Content gen monitor: {e}") 

895 

896 # Outreach follow-up checker: send due follow-ups every 5 ticks 

897 try: 

898 from .outreach_crm_tools import check_pending_followups_daemon 

899 followup_result = check_pending_followups_daemon() 

900 sent = followup_result.get('sent', 0) 

901 if sent > 0: 

902 logger.info(f"Outreach follow-ups: sent {sent} follow-up email(s)") 

903 except ImportError: 

904 pass # outreach_crm_tools not available 

905 except Exception as e: 

906 logger.debug(f"Outreach follow-up check: {e}") 

907 

908 # Journey engine tick: stage transitions, A/B analysis, channel routing 

909 try: 

910 from .journey_engine import journey_daemon_tick 

911 journey_result = journey_daemon_tick() 

912 if journey_result.get('transitions', 0) > 0 or journey_result.get('actions', 0) > 0: 

913 logger.info(f"Journey engine: {journey_result.get('transitions', 0)} transitions, " 

914 f"{journey_result.get('actions', 0)} actions") 

915 except ImportError: 

916 pass # journey_engine not available 

917 except Exception as e: 

918 logger.debug(f"Journey engine tick: {e}") 

919 

920 # Periodic auto-remediation: scan loopholes every Nth tick 

921 if self._tick_count % self._remediate_every == 0: 

922 try: 

923 from .goal_seeding import auto_remediate_loopholes 

924 rem_count = auto_remediate_loopholes(db) 

925 if rem_count > 0: 

926 logger.info(f"Auto-remediation: created {rem_count} goal(s)") 

927 except Exception as e: 

928 logger.debug(f"Auto-remediation check failed: {e}") 

929 

930 # Intelligence milestone: auto-file patent when threshold reached 

931 try: 

932 from .ip_service import IPService 

933 milestone = IPService.check_intelligence_milestone(db) 

934 if milestone.get('triggered', False): 

935 from integrations.social.models import AgentGoal 

936 active_filing = db.query(AgentGoal).filter( 

937 AgentGoal.status == 'active', 

938 AgentGoal.goal_type == 'ip_protection', 

939 ).all() 

940 has_filing = any( 

941 (g.config_json or {}).get('mode') == 'file' 

942 for g in active_filing 

943 ) 

944 if not has_filing: 

945 from .goal_manager import GoalManager 

946 GoalManager.create_goal( 

947 db, 

948 goal_type='ip_protection', 

949 title='Auto-File Provisional Patent: Critical Intelligence Reached', 

950 description=( 

951 f'Intelligence milestone triggered: ' 

952 f'{milestone["consecutive_verified"]} consecutive verified days, ' 

953 f'moat catch-up: {milestone["moat_catch_up"]}. ' 

954 f'Use draft_patent_claims then draft_provisional_patent.' 

955 ), 

956 config={'mode': 'file', 'auto_triggered': True, 

957 'milestone': milestone}, 

958 spark_budget=500, 

959 created_by='intelligence_milestone', 

960 ) 

961 logger.info("Intelligence milestone reached — auto-patent goal created") 

962 except Exception as e: 

963 logger.debug(f"Intelligence milestone check: {e}") 

964 

965 # Self-healing: create fix goals for recurring exceptions 

966 try: 

967 from .self_healing_dispatcher import SelfHealingDispatcher 

968 healer = SelfHealingDispatcher.get_instance() 

969 fix_count = healer.check_and_dispatch(db) 

970 if fix_count > 0: 

971 logger.info(f"Self-healing: created {fix_count} fix goal(s)") 

972 except Exception as e: 

973 logger.debug(f"Self-healing check: {e}") 

974 

975 # Revenue → trading funding: every 5th remediation cycle 

976 if self._tick_count % (self._remediate_every * 5) == 0: 

977 try: 

978 from .revenue_aggregator import get_revenue_aggregator 

979 rev = get_revenue_aggregator() 

980 fund_result = rev.check_and_fund_trading(db) 

981 if fund_result.get('funded'): 

982 logger.info( 

983 f"Revenue aggregator: funded trading with " 

984 f"{fund_result['amount']} Spark") 

985 except Exception as e: 

986 logger.debug(f"Revenue funding check: {e}") 

987 

988 # Baseline intelligence check: re-snapshot when world model stats shift 

989 if self._tick_count % (self._remediate_every * 2) == 0: 

990 try: 

991 from .agent_baseline_service import ( 

992 AgentBaselineService, capture_baseline_async) 

993 from integrations.social.models import AgentGoal 

994 active_goals = db.query(AgentGoal).filter( 

995 AgentGoal.status.in_(['active', 'completed'])).all() 

996 checked = set() 

997 for goal in active_goals: 

998 key = f'{goal.prompt_id}_{goal.flow_id or 0}' 

999 if key in checked or not goal.prompt_id: 

1000 continue 

1001 checked.add(key) 

1002 result = AgentBaselineService.validate_against_baseline( 

1003 str(goal.prompt_id), goal.flow_id or 0) 

1004 if result and not result.get('passed', True): 

1005 capture_baseline_async( 

1006 prompt_id=str(goal.prompt_id), 

1007 flow_id=goal.flow_id or 0, 

1008 trigger='intelligence_change') 

1009 logger.info( 

1010 f"Intelligence change detected for {key}: " 

1011 f"{result.get('regressions', [])}") 

1012 except Exception as e: 

1013 logger.debug(f"Baseline intelligence check: {e}") 

1014 

1015 self._wd_heartbeat() 

1016 

1017 # Federation: aggregate learning deltas across peers every 2nd tick 

1018 if self._tick_count % 2 == 0: 

1019 try: 

1020 from .federated_aggregator import get_federated_aggregator 

1021 fed = get_federated_aggregator() 

1022 fed_result = fed.tick() 

1023 if fed_result.get('aggregated'): 

1024 logger.info( 

1025 f"Federation: epoch={fed_result.get('epoch')}, " 

1026 f"convergence={fed_result.get('convergence', 0):.3f}") 

1027 except Exception as e: 

1028 logger.debug(f"Federation tick: {e}") 

1029 self._wd_heartbeat() 

1030 

1031 # Monthly API quota reset 

1032 if self._tick_count == 1 or self._tick_count % (self._remediate_every * 10) == 0: 

1033 try: 

1034 from .commercial_api import CommercialAPIService 

1035 reset_count = CommercialAPIService.reset_monthly_quotas(db) 

1036 if reset_count > 0: 

1037 logger.info(f"Reset monthly API quotas for {reset_count} keys") 

1038 except Exception: 

1039 pass 

1040 

1041 # Marketplace payment reconciliation: retry failed Spark transfers. 

1042 # Rows with status='pending' older than 15 minutes are re-attempted 

1043 # through the canonical ResonanceService gateway. Prior to this 

1044 # hook, a failed Spark transfer during install_app() was silently 

1045 # dropped with no retry path — creators lost revenue on 

1046 # transient DB errors. 

1047 if self._tick_count % self._remediate_every == 0: 

1048 try: 

1049 from .app_marketplace import get_marketplace 

1050 mp = get_marketplace() 

1051 counters = mp.reconcile_pending_payments() 

1052 if counters.get('retried', 0) > 0 or counters.get('failed', 0) > 0: 

1053 logger.info( 

1054 f"Marketplace reconcile: " 

1055 f"retried={counters.get('retried', 0)}, " 

1056 f"settled={counters.get('settled', 0)}, " 

1057 f"failed={counters.get('failed', 0)}") 

1058 except ImportError: 

1059 pass 

1060 except Exception as e: 

1061 logger.debug(f"Marketplace reconcile failed: {e}") 

1062 

1063 # CLEANUP: prune stale entries from module-level dicts every 100 ticks 

1064 if self._tick_count % 100 == 0: 

1065 active_goal_ids = {str(g.id) for g in goals} 

1066 with _module_lock: 

1067 stale_backoff = [k for k in _dispatch_backoff if k not in active_goal_ids] 

1068 for k in stale_backoff: 

1069 del _dispatch_backoff[k] 

1070 stale_budget = _budget_blocked_goals - active_goal_ids 

1071 _budget_blocked_goals.difference_update(stale_budget) 

1072 # _hitl_notified: keep entries to avoid re-notifying on restart, 

1073 # but cap size to prevent unbounded growth 

1074 if len(_hitl_notified) > 10000: 

1075 _hitl_notified.clear() 

1076 if stale_backoff or stale_budget: 

1077 logger.debug(f"Pruned {len(stale_backoff)} backoff + {len(stale_budget)} budget-blocked stale entries") 

1078 # Evict completed/archived goals from ledger cache 

1079 stale_cache = [k for k in self._ledger_cache if k[1] not in active_goal_ids] 

1080 for k in stale_cache: 

1081 del self._ledger_cache[k] 

1082 

1083 db.commit() 

1084 

1085 # INSTRUCTION QUEUE: drain queued instructions on idle ticks 

1086 # When agents are idle and goals are dispatched, check if any 

1087 # user has pending queued instructions that can be batch-executed 

1088 if dispatched < len(idle_agents): 

1089 try: 

1090 from .instruction_queue import get_all_pending 

1091 from .dispatch import drain_instruction_queue 

1092 pending = get_all_pending() 

1093 for uid in list(pending.keys())[:3]: # Max 3 users per tick 

1094 drain_instruction_queue(uid) 

1095 except Exception as eq: 

1096 logger.debug(f"Instruction queue drain: {eq}") 

1097 

1098 except Exception as e: 

1099 db.rollback() 

1100 # Clear module-level dicts to stay in sync with rolled-back DB state. 

1101 # Without this, goals marked as budget-blocked or backed-off in memory 

1102 # would be skipped even though their DB status was rolled back. 

1103 with _module_lock: 

1104 _budget_blocked_goals.clear() 

1105 _dispatch_backoff.clear() 

1106 logger.warning(f"Agent daemon tick error (state reset): {e}") 

1107 finally: 

1108 db.close() 

1109 

1110 

# Module-level singleton: the one shared AgentDaemon instance, created at
# import time. Callers should use this instance rather than constructing
# their own — the module-level state above (_module_lock, _hitl_notified,
# _budget_blocked_goals, _dispatch_backoff) assumes a single daemon.
# NOTE(review): constructing AgentDaemon at import time presumably has no
# side effects beyond attribute setup (thread start appears to happen
# elsewhere) — confirm against the class __init__, which is outside this view.

agent_daemon = AgentDaemon()