Coverage for integrations / coding_agent / coding_daemon.py: 56.8%

132 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Coding Agent Daemon 

3 

4Background thread that finds active goals, detects idle agents, 

5and dispatches work through the existing /chat pipeline. 

6No separate task tracking — SmartLedger and ActionState handle that. 

7 

8Now also periodically syncs coding benchmark deltas via FederatedAggregator 

9for hive-wide tool routing intelligence (torrent-like, never interrupts user). 

10""" 

11import os 

12import time 

13import logging 

14import threading 

15from datetime import datetime 

16 

# Module logger. Uses the explicit 'hevolve_social' logger name (not
# __name__), so records from this module are emitted under that name.
logger = logging.getLogger('hevolve_social')

18 

19 

class CodingAgentDaemon:
    """Background daemon: active goals + idle agents → /chat dispatch.

    A single daemon thread (named ``coding_daemon``) wakes every
    ``HEVOLVE_CODING_POLL_INTERVAL`` seconds (default 30), runs one
    ``_tick`` and, every 10th tick, exports coding benchmark deltas.
    All project-level dependencies are imported lazily inside the methods
    that use them, so constructing the daemon has no side effects beyond
    reading the environment.
    """

    # Longest single ``time.sleep`` slice used by the ``_wd_sleep`` fallback.
    # Keeping it small lets ``stop()`` take effect within about a second even
    # when no watchdog is available to provide an interruptible sleep.
    _SLEEP_SLICE_SECONDS = 1.0

    def __init__(self):
        """Read the poll interval from the environment; start nothing yet."""
        self._interval = self._env_int('HEVOLVE_CODING_POLL_INTERVAL', 30)
        self._running = False
        self._thread = None
        self._lock = threading.Lock()
        self._tick_count = 0

    @staticmethod
    def _env_int(name, default):
        """Return environment variable *name* as an int, or *default*.

        A missing variable yields *default*. A value that ``int()`` rejects
        also yields *default* (with a warning) instead of raising, because
        the module-level singleton is constructed at import time and a typo
        in the environment should not make the module unimportable.
        """
        raw = os.environ.get(name)
        if raw is None:
            return default
        try:
            return int(raw)
        except ValueError:
            logger.warning(
                f"Invalid integer {raw!r} for {name}; using default {default}")
            return default

    def start(self):
        """Start the daemon thread; a no-op if it is already running."""
        with self._lock:
            if self._running:
                return
            self._running = True
            self._thread = threading.Thread(target=self._loop, daemon=True,
                                            name='coding_daemon')
            self._thread.start()
            logger.info(f"Coding daemon started (interval={self._interval}s)")

    def stop(self):
        """Ask the loop to exit and wait up to 10 seconds for the thread."""
        with self._lock:
            self._running = False
            thread = self._thread
        # A thread cannot join itself (``join`` raises RuntimeError), so skip
        # the wait when stop() is invoked from the daemon thread.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=10)

    def _wd_heartbeat(self):
        """Send heartbeat to watchdog between potentially blocking operations."""
        try:
            from security.node_watchdog import get_watchdog
            wd = get_watchdog()
            if wd:
                wd.heartbeat('coding_daemon')
        except Exception:
            # Best effort: a missing or failing watchdog must never break
            # the daemon loop.
            pass

    def _wd_sleep(self, seconds: float) -> None:
        """Sleep while keeping the coding_daemon heartbeat fresh.

        Delegates to ``NodeWatchdog.sleep_with_heartbeat`` — same
        single primitive the agent_daemon uses, so a long sleep
        (e.g. during platform-affordability back-off) can't age the
        heartbeat past the 300s frozen threshold. See the helper
        docstring for the 2026-04-11 incident context.

        When the watchdog is unavailable (import failure, ``None`` instance,
        or the call raising), falls back to plain sleeping. The fallback
        sleeps in short slices and returns as soon as ``self._running`` is
        false — the same stop condition handed to the watchdog — so
        ``stop()`` is not blocked behind a long boot-grace sleep.
        """
        try:
            from security.node_watchdog import get_watchdog
            wd = get_watchdog()
            if wd is not None:
                wd.sleep_with_heartbeat(
                    'coding_daemon', seconds,
                    stop_check=lambda: not self._running,
                )
                return
        except Exception:
            pass

        deadline = time.monotonic() + seconds
        while self._running:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(remaining, self._SLEEP_SLICE_SECONDS))

    def _loop(self):
        """Thread body: boot grace period, then sleep/tick until stopped."""
        # Boot grace period: let user chat have exclusive LLM access
        boot_grace = self._env_int('HEVOLVE_DAEMON_BOOT_DELAY', 300)
        if boot_grace > 0:
            self._wd_sleep(boot_grace)

        while self._running:
            self._wd_sleep(self._interval)
            if not self._running:
                break
            self._wd_heartbeat()
            try:
                self._tick()
            except Exception as e:
                # Never let one bad tick kill the thread; log and carry on.
                import traceback
                logger.error(f"Coding daemon tick error: {e}\n{traceback.format_exc()}")

    def _tick(self):
        """Find active coding goals, find idle agents, dispatch via /chat.

        Queries the unified AgentGoal table filtered by CODING_GOAL_TYPES.
        This daemon handles coding-related goals with idle-agent detection
        and benchmark sync; agent_daemon skips these types.

        Exceptions raised during the database phase are rolled back and
        logged here; exceptions from the gates propagate to ``_loop``
        (except import failures, which are tolerated).
        """
        from integrations.social.models import get_db, AgentGoal
        from .idle_detection import IdleDetectionService
        from integrations.agent_engine.goal_manager import GoalManager, CODING_GOAL_TYPES
        from .task_distributor import dispatch_to_chat

        self._tick_count += 1

        # USER-YIELD GATE — single canonical primitive that every
        # background daemon consults before burning CPU/GIL/LLM/GPU.
        # ``should_yield_to_user()`` covers BOTH user-activity (chat in
        # last 10 min OR live CREATE pipeline) AND system-pressure
        # (model_lifecycle.get_system_pressure().throttle_factor < 0.1)
        # in one call. agent_daemon._tick, agent_daemon._proactive_hive_tick,
        # and hive_benchmark_prover._continuous_loop already consult it
        # — coding_daemon was the only background loop missing it,
        # which is why py-spy showed full autogen turns running on the
        # daemon thread while the user was actively chatting. No new
        # throttle, no parallel resource-aware system — just plug into
        # the existing gate.
        try:
            from integrations.agent_engine.dispatch import should_yield_to_user
            if should_yield_to_user():
                return
        except Exception:
            pass  # gate import unavailable — fall through (fail-open)

        # BUDGET GATE: platform affordability check before dispatching coding tasks
        try:
            from integrations.agent_engine.budget_gate import check_platform_affordability
            can_afford, details = check_platform_affordability()
            if not can_afford:
                logger.warning(f"Coding daemon paused — platform not affordable: {details}")
                return
        except ImportError:
            pass

        db = get_db()
        try:
            # ORDER BY: never-dispatched goals (NULL last_dispatched_at)
            # jump to the front, then by oldest dispatch. Without this,
            # SQLite returns rows in insert/rowid order — a handful of
            # already-dispatched goals (whose 30s cooldown expires at
            # the same cadence as the daemon's tick interval) saturate
            # the limited idle-agent slots every tick, and never-yet-
            # dispatched goals at the back of the queue (e.g. the 49
            # self_heal goals observed live 2026-05-07) starve forever.
            #
            # SQLAlchemy emits `ORDER BY ... NULLS FIRST` for the
            # `nulls_first()` modifier; SQLite supports the syntax
            # natively since 3.30 (Python 3.11 ships 3.49+). For
            # backends without the modifier, the .nullsfirst() call
            # is a no-op and rows still order by last_dispatched_at
            # asc (NULLs land first or last depending on the SQL
            # dialect — both orderings prevent the starvation pattern
            # because never-dispatched goals are clearly distinguishable
            # from the ones that just dispatched 30s ago).
            from sqlalchemy import asc
            goals = db.query(AgentGoal).filter(
                AgentGoal.status == 'active',
                AgentGoal.goal_type.in_(CODING_GOAL_TYPES),
            ).order_by(
                asc(AgentGoal.last_dispatched_at).nulls_first(),
            ).all()
            if not goals:
                return

            # Use ``get_idle_agent_personas`` — same canonical gate the
            # agent_daemon uses (agent_daemon.py:555-563). CODING_GOAL_TYPES
            # are LOCAL maintenance goals (self_heal of THIS node's broken
            # venvs, autoresearch on THIS repo, code_evolution against THIS
            # codebase) — they fix this node, never need human-consent
            # routing. Do NOT use ``get_idle_opted_in_agents``: that's the
            # distributed-compute privacy gate for peer-share workloads,
            # which is gated downstream by ``dispatch_goal_distributed``
            # already. Mismatch silently returned [] on installs where no
            # human had opted-in → daemon stalled with self_heal goals
            # piling up (live-evidence 2026-05-07: 42 self_heal goals,
            # 0 with last_dispatched_at populated). Same root cause +
            # same fix as the agent_daemon's 2026-05-01 switch.
            idle_agents = IdleDetectionService.get_idle_agent_personas(db)
            if not idle_agents:
                return

            dispatched = 0
            agent_idx = 0
            used_agents = set()
            max_concurrent = self._env_int('HEVOLVE_CODING_MAX_CONCURRENT', 10)
            now = datetime.utcnow()

            for goal in goals:
                if dispatched >= max_concurrent:
                    break

                # Skip recently dispatched goals (cooldown == poll interval,
                # 30s by default)
                if goal.last_dispatched_at:
                    age = (now - goal.last_dispatched_at).total_seconds()
                    if age < self._interval:
                        continue

                # Find next available agent
                while agent_idx < len(idle_agents):
                    if idle_agents[agent_idx]['user_id'] not in used_agents:
                        break
                    agent_idx += 1
                if agent_idx >= len(idle_agents):
                    break
                agent = idle_agents[agent_idx]

                prompt = GoalManager.build_prompt(goal.to_dict())
                if prompt is None:
                    # Nothing to dispatch for this goal. The agent is NOT
                    # marked as used, so the next goal can still claim it.
                    continue
                used_agents.add(agent['user_id'])

                goal.last_dispatched_at = now
                result = dispatch_to_chat(prompt, str(agent['user_id']), goal.id,
                                          goal_type=goal.goal_type or 'coding')

                # Work on a COPY of the config and assign the copy back.
                # Mutating the loaded dict in place and re-assigning the same
                # object is not seen as a change by SQLAlchemy for a plain
                # JSON column, so the update could be silently dropped.
                # NOTE(review): the column type of ``config_json`` is not
                # visible here — the copy is harmless either way.
                cfg = dict(goal.config_json or {})
                if result is None:
                    # Dispatch failed — track for backoff
                    fails = cfg.get('_dispatch_failures', 0) + 1
                    cfg['_dispatch_failures'] = fails
                    if fails >= 5:
                        goal.status = 'paused'
                        cfg['pause_reason'] = f'Auto-paused: {fails} consecutive dispatch failures'
                        logger.warning(f"Coding goal {goal.id} AUTO-PAUSED after {fails} failures")
                else:
                    # Success — clear failure count
                    cfg.pop('_dispatch_failures', None)
                goal.config_json = cfg

                agent_idx += 1
                dispatched += 1
                self._wd_heartbeat()

            if dispatched > 0:
                logger.info(f"Coding daemon: dispatched {dispatched} goal(s) to idle agents")
            db.commit()
        except Exception as e:
            db.rollback()
            # Logged with traceback at warning level: a failing tick means no
            # goal is being dispatched, which should be visible in normal logs.
            logger.warning(f"Coding daemon error: {e}", exc_info=True)
        finally:
            db.close()

        # Every 10 ticks (~5 min): sync benchmark deltas to hive
        # Torrent-like: only during idle windows, never interrupts user
        # NOTE(review): every early ``return`` above (yield gate, budget
        # gate, no goals, no idle agents) skips this sync for that tick —
        # confirm that is intended.
        if self._tick_count % 10 == 0:
            self._sync_benchmark_deltas()

    def _sync_benchmark_deltas(self):
        """Export coding benchmark deltas for hive learning.

        Runs in the daemon thread (low priority, non-blocking).
        FederatedAggregator picks up the delta on its next tick.
        Any failure is logged at debug level and otherwise ignored.
        """
        try:
            from .benchmark_tracker import get_benchmark_tracker
            tracker = get_benchmark_tracker()
            delta = tracker.export_learning_delta()
            if delta:
                logger.debug(f"Coding benchmark delta exported: "
                             f"{len(delta.get('coding_benchmarks', {}))} task types")
        except Exception as e:
            logger.debug(f"Benchmark delta sync skipped: {e}")

269 

270 

# Module-level singleton. Constructed at import time; construction only reads
# the poll interval from the environment and sets up state — the background
# thread is not created until ``coding_daemon.start()`` is called.
coding_daemon = CodingAgentDaemon()