Coverage for integrations / coding_agent / coding_daemon.py: 56.8%
132 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
HevolveSocial - Coding Agent Daemon

Background thread that finds active goals, detects idle agents,
and dispatches work through the existing /chat pipeline.
No separate task tracking — SmartLedger and ActionState handle that.

Now also periodically syncs coding benchmark deltas via FederatedAggregator
for hive-wide tool routing intelligence (torrent-like, never interrupts user).
"""
11import os
12import time
13import logging
14import threading
15from datetime import datetime
17logger = logging.getLogger('hevolve_social')
class CodingAgentDaemon:
    """Background daemon: active goals + idle agents → /chat dispatch.

    One daemon thread wakes every ``HEVOLVE_CODING_POLL_INTERVAL`` seconds
    (default 30) and runs a single ``_tick()``: it loads active coding
    goals, pairs each with an idle agent persona and hands the goal prompt
    to ``dispatch_to_chat``. Every 10th tick that reaches the end of
    ``_tick()`` also exports coding benchmark deltas.

    Environment variables read by this class (all integers; a malformed
    value logs a warning and falls back to the default):

    * ``HEVOLVE_CODING_POLL_INTERVAL`` — seconds between ticks, also used
      as the per-goal re-dispatch cooldown (default 30).
    * ``HEVOLVE_DAEMON_BOOT_DELAY`` — seconds to wait before the first
      tick (default 300).
    * ``HEVOLVE_CODING_MAX_CONCURRENT`` — max dispatch attempts per tick
      (default 10).
    """

    def __init__(self):
        self._interval = self._env_int('HEVOLVE_CODING_POLL_INTERVAL', 30)
        self._running = False
        self._thread = None
        self._lock = threading.Lock()
        self._tick_count = 0

    @staticmethod
    def _env_int(name, default):
        """Return environment variable ``name`` parsed as int, else ``default``.

        A missing variable yields ``default`` silently. A value that
        ``int()`` rejects is logged and also yields ``default`` — the
        module-level singleton is built at import time, so raising here
        would make the whole module unimportable over a config typo.
        """
        raw = os.environ.get(name)
        if raw is None:
            return default
        try:
            return int(raw)
        except ValueError:
            logger.warning(f"Invalid integer for {name}={raw!r}; using default {default}")
            return default

    def start(self):
        """Start the background thread. A no-op if already running."""
        with self._lock:
            if self._running:
                return
            self._running = True
            self._thread = threading.Thread(target=self._loop, daemon=True,
                                            name='coding_daemon')
            self._thread.start()
            logger.info(f"Coding daemon started (interval={self._interval}s)")

    def stop(self):
        """Signal the loop to exit and wait up to 10 seconds for the thread."""
        with self._lock:
            self._running = False
        if self._thread:
            self._thread.join(timeout=10)

    def _wd_heartbeat(self):
        """Send heartbeat to watchdog between potentially blocking operations."""
        try:
            from security.node_watchdog import get_watchdog
            wd = get_watchdog()
            if wd:
                wd.heartbeat('coding_daemon')
        except Exception:
            # Best-effort: a missing or failing watchdog must never stop the daemon.
            pass

    def _wd_sleep(self, seconds: float) -> None:
        """Sleep while keeping the coding_daemon heartbeat fresh.

        Delegates to ``NodeWatchdog.sleep_with_heartbeat`` — same
        single primitive the agent_daemon uses, so a long sleep
        (e.g. during platform-affordability back-off) can't age the
        heartbeat past the 300s frozen threshold. See the helper
        docstring for the 2026-04-11 incident context.

        When the watchdog is unavailable the fallback sleeps in slices of
        at most one second and re-checks ``self._running`` between slices,
        so ``stop()`` is honoured here just as it is through the
        ``stop_check`` callback on the watchdog path.
        """
        try:
            from security.node_watchdog import get_watchdog
            wd = get_watchdog()
            if wd is not None:
                wd.sleep_with_heartbeat(
                    'coding_daemon', seconds,
                    stop_check=lambda: not self._running,
                )
                return
        except Exception:
            pass

        deadline = time.monotonic() + seconds
        while self._running:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(remaining, 1.0))

    def _loop(self):
        """Thread body: boot grace period, then sleep → tick until stopped."""
        # Boot grace period: let user chat have exclusive LLM access
        boot_grace = self._env_int('HEVOLVE_DAEMON_BOOT_DELAY', 300)
        if boot_grace > 0:
            self._wd_sleep(boot_grace)

        while self._running:
            self._wd_sleep(self._interval)
            if not self._running:
                break
            self._wd_heartbeat()
            try:
                self._tick()
            except Exception as e:
                # One bad tick must not kill the thread; log and keep looping.
                import traceback
                logger.error(f"Coding daemon tick error: {e}\n{traceback.format_exc()}")

    def _tick(self):
        """Find active coding goals, find idle agents, dispatch via /chat.

        Queries the unified AgentGoal table filtered by CODING_GOAL_TYPES.
        This daemon handles coding-related goals with idle-agent detection
        and benchmark sync; agent_daemon skips these types.

        NOTE(review): every early ``return`` below (yield gate, budget
        gate, no goals, no idle agents) also skips the every-10-ticks
        benchmark sync at the end of this method — confirm that is intended.
        """
        from integrations.social.models import get_db, AgentGoal
        from .idle_detection import IdleDetectionService
        from integrations.agent_engine.goal_manager import GoalManager, CODING_GOAL_TYPES
        from .task_distributor import dispatch_to_chat

        self._tick_count += 1

        # USER-YIELD GATE — single canonical primitive that every
        # background daemon consults before burning CPU/GIL/LLM/GPU.
        # ``should_yield_to_user()`` covers BOTH user-activity (chat in
        # last 10 min OR live CREATE pipeline) AND system-pressure
        # (model_lifecycle.get_system_pressure().throttle_factor < 0.1)
        # in one call. agent_daemon._tick, agent_daemon._proactive_hive_tick,
        # and hive_benchmark_prover._continuous_loop already consult it
        # — coding_daemon was the only background loop missing it,
        # which is why py-spy showed full autogen turns running on the
        # daemon thread while the user was actively chatting. No new
        # throttle, no parallel resource-aware system — just plug into
        # the existing gate.
        try:
            from integrations.agent_engine.dispatch import should_yield_to_user
            if should_yield_to_user():
                return
        except Exception:
            pass  # gate import unavailable — fall through (fail-open)

        # BUDGET GATE: platform affordability check before dispatching coding tasks
        try:
            from integrations.agent_engine.budget_gate import check_platform_affordability
            can_afford, details = check_platform_affordability()
            if not can_afford:
                logger.warning(f"Coding daemon paused — platform not affordable: {details}")
                return
        except ImportError:
            pass

        db = get_db()
        try:
            # ORDER BY: never-dispatched goals (NULL last_dispatched_at)
            # jump to the front, then by oldest dispatch. Without this,
            # SQLite returns rows in insert/rowid order — a handful of
            # already-dispatched goals (whose 30s cooldown expires at
            # the same cadence as the daemon's tick interval) saturate
            # the limited idle-agent slots every tick, and never-yet-
            # dispatched goals at the back of the queue (e.g. the 49
            # self_heal goals observed live 2026-05-07) starve forever.
            #
            # SQLAlchemy emits `ORDER BY ... NULLS FIRST` for the
            # `nulls_first()` modifier; SQLite supports the syntax
            # natively since 3.30 (Python 3.11 ships 3.49+). For
            # backends without the modifier, the .nullsfirst() call
            # is a no-op and rows still order by last_dispatched_at
            # asc (NULLs land first or last depending on the SQL
            # dialect — both orderings prevent the starvation pattern
            # because never-dispatched goals are clearly distinguishable
            # from the ones that just dispatched 30s ago).
            from sqlalchemy import asc
            goals = db.query(AgentGoal).filter(
                AgentGoal.status == 'active',
                AgentGoal.goal_type.in_(CODING_GOAL_TYPES),
            ).order_by(
                asc(AgentGoal.last_dispatched_at).nulls_first(),
            ).all()
            if not goals:
                return

            # Use ``get_idle_agent_personas`` — same canonical gate the
            # agent_daemon uses (agent_daemon.py:555-563). CODING_GOAL_TYPES
            # are LOCAL maintenance goals (self_heal of THIS node's broken
            # venvs, autoresearch on THIS repo, code_evolution against THIS
            # codebase) — they fix this node, never need human-consent
            # routing. Do NOT use ``get_idle_opted_in_agents``: that's the
            # distributed-compute privacy gate for peer-share workloads,
            # which is gated downstream by ``dispatch_goal_distributed``
            # already. Mismatch silently returned [] on installs where no
            # human had opted-in → daemon stalled with self_heal goals
            # piling up (live-evidence 2026-05-07: 42 self_heal goals,
            # 0 with last_dispatched_at populated). Same root cause +
            # same fix as the agent_daemon's 2026-05-01 switch.
            idle_agents = IdleDetectionService.get_idle_agent_personas(db)
            if not idle_agents:
                return

            dispatched = 0
            agent_idx = 0
            used_agents = set()
            max_concurrent = self._env_int('HEVOLVE_CODING_MAX_CONCURRENT', 10)
            # NOTE(review): naive UTC; presumably matches how
            # last_dispatched_at is stored — confirm against the model.
            now = datetime.utcnow()

            for goal in goals:
                if dispatched >= max_concurrent:
                    break

                # Skip recently dispatched goals (cooldown == poll interval,
                # 30s by default)
                if goal.last_dispatched_at:
                    age = (now - goal.last_dispatched_at).total_seconds()
                    if age < self._interval:
                        continue

                # Find next available agent (skip user_ids already used this tick)
                while agent_idx < len(idle_agents):
                    if idle_agents[agent_idx]['user_id'] not in used_agents:
                        break
                    agent_idx += 1
                if agent_idx >= len(idle_agents):
                    break
                agent = idle_agents[agent_idx]

                prompt = GoalManager.build_prompt(goal.to_dict())
                if prompt is None:
                    # Nothing to dispatch for this goal. The agent has NOT
                    # been claimed yet, so it stays available for the next goal.
                    continue

                # Claim the agent only once there is real work for it.
                used_agents.add(agent['user_id'])
                goal.last_dispatched_at = now
                result = dispatch_to_chat(prompt, str(agent['user_id']), goal.id,
                                          goal_type=goal.goal_type or 'coding')
                self._record_dispatch_result(goal, result)

                agent_idx += 1
                dispatched += 1  # counts attempts, including failed dispatches
                self._wd_heartbeat()

            if dispatched > 0:
                logger.info(f"Coding daemon: dispatched {dispatched} goal(s) to idle agents")
                db.commit()
        except Exception as e:
            # NOTE(review): the rollback also discards last_dispatched_at for
            # goals dispatched earlier in this same tick.
            db.rollback()
            # Logged at warning with traceback so a stalled daemon is visible
            # in normal logs instead of only at debug level.
            logger.warning(f"Coding daemon error: {e}", exc_info=True)
        finally:
            db.close()

        # Every 10 ticks (~5 min): sync benchmark deltas to hive
        # Torrent-like: only during idle windows, never interrupts user
        if self._tick_count % 10 == 0:
            self._sync_benchmark_deltas()

    @staticmethod
    def _record_dispatch_result(goal, result):
        """Update ``goal``'s failure bookkeeping after one dispatch attempt.

        ``result is None`` is treated as a failed dispatch: the
        ``_dispatch_failures`` counter in ``goal.config_json`` is
        incremented, and at 5 the goal is set to ``'paused'`` with a
        ``pause_reason``. Any other result clears the counter.

        A fresh dict is always assigned back to ``goal.config_json``.
        Mutating the existing dict in place and re-assigning the same
        object can go undetected by SQLAlchemy change tracking on a plain
        JSON column (assumes the column is not wrapped in ``MutableDict``
        — TODO confirm), which would silently drop the counter.
        """
        cfg = dict(goal.config_json or {})
        if result is None:
            # Dispatch failed — track for backoff
            fails = cfg.get('_dispatch_failures', 0) + 1
            cfg['_dispatch_failures'] = fails
            if fails >= 5:
                goal.status = 'paused'
                cfg['pause_reason'] = f'Auto-paused: {fails} consecutive dispatch failures'
                logger.warning(f"Coding goal {goal.id} AUTO-PAUSED after {fails} failures")
        else:
            # Success — clear failure count
            cfg.pop('_dispatch_failures', None)
        goal.config_json = cfg

    def _sync_benchmark_deltas(self):
        """Export coding benchmark deltas for hive learning.

        Runs in the daemon thread (low priority, non-blocking).
        FederatedAggregator picks up the delta on its next tick.
        Any failure is logged at debug level and otherwise ignored.
        """
        try:
            from .benchmark_tracker import get_benchmark_tracker
            tracker = get_benchmark_tracker()
            delta = tracker.export_learning_delta()
            if delta:
                logger.debug(f"Coding benchmark delta exported: "
                             f"{len(delta.get('coding_benchmarks', {}))} task types")
        except Exception as e:
            logger.debug(f"Benchmark delta sync skipped: {e}")


# Module-level singleton
coding_daemon = CodingAgentDaemon()