Coverage for security / node_watchdog.py: 89.0%

219 statements  

coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Node Watchdog - Frozen Thread Auto-Detection and Restart 

3 

4Monitors all background daemon threads via heartbeat protocol. 

5Detects frozen/crashed threads and auto-restarts them. 

6 

7Each daemon calls watchdog.heartbeat('name') every loop iteration. 

8If a heartbeat is older than 2× the expected interval, the thread 

9is considered frozen and gets auto-restarted. 

10 

11After 5 consecutive restart failures, the thread is marked 'dead'. 

12""" 

13import logging 

14import os 

15import threading 

16import time 

17from dataclasses import dataclass, field 

18from datetime import datetime, timezone 

19from typing import Callable, Dict, List, Optional 

20 

21logger = logging.getLogger('hevolve_security') 

22 

23MAX_CONSECUTIVE_FAILURES = 5 

24# Grace period after registration/restart before monitoring begins. 

25# Prevents false FROZEN alerts during startup when daemons are still 

26# initialising and haven't entered their heartbeat loop yet. 

27STARTUP_GRACE_SECONDS = 60 

28# LLM calls can take 30-120s on local models. When a thread is marked 

29# in_llm_call, the frozen threshold is multiplied by this factor instead 

30# of the normal frozen_multiplier. Prevents restart cascade. 

31# Autonomous gather_info + recipe() on local LLM routinely takes 5-10 min. 

32# 300s was too tight — caused watchdog restart loops where each restart 

33# re-dispatched the same goal, producing duplicate messages. 

34LLM_CALL_TIMEOUT_SECONDS = int(os.environ.get('HEVOLVE_LLM_CALL_TIMEOUT', '900')) 

35# Fleet-wide restart budget: hard ceiling on cross-thread restart storms. 

36# The per-thread budget (3 restarts / 5min → mark dead) still applies first; 

37# this is a second-line escalation when many different threads are flapping 

38# in parallel (e.g. cascade failure from a shared resource exhaustion). 

39# When the ceiling is breached the whole watchdog halts further restarts 

40# until manual intervention. Prior to this guard an infinite restart loop 

41# was possible when N threads rotated through fresh per-thread budgets. 

42FLEET_RESTART_WINDOW_SECONDS = int( 

43 os.environ.get('HEVOLVE_WATCHDOG_FLEET_WINDOW_S', '60')) 

44FLEET_RESTART_MAX = int(os.environ.get('HEVOLVE_WATCHDOG_FLEET_MAX', '10')) 

45 
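# Illustrative sketch only: the constants above are read from the environment,
# so deployments can tune them without code changes. Example overrides (values
# are examples, not recommendations), applied before this module is imported:
#
#     import os
#     os.environ.setdefault('HEVOLVE_LLM_CALL_TIMEOUT', '1200')        # 20 min LLM budget
#     os.environ.setdefault('HEVOLVE_WATCHDOG_FLEET_MAX', '20')        # raise fleet ceiling
#     os.environ.setdefault('HEVOLVE_WATCHDOG_FLEET_WINDOW_S', '120')  # widen fleet window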


@dataclass
class ThreadInfo:
    """Tracked state for a single monitored daemon thread."""
    name: str
    expected_interval: float
    restart_fn: Callable
    stop_fn: Optional[Callable] = None
    last_heartbeat: float = field(default_factory=time.time)
    status: str = 'healthy'  # healthy | frozen | restarting | dead | in_llm_call
    restart_count: int = 0
    last_restart_at: Optional[float] = None
    consecutive_failures: int = 0
    # Track recent restart times to detect rapid-restart loops
    recent_restart_times: list = field(default_factory=list)
    # LLM call awareness: when True, the thread is blocked on a legitimate
    # LLM inference call. Watchdog uses LLM_CALL_TIMEOUT_SECONDS as the
    # frozen threshold instead of restarting.
    in_llm_call: bool = False
    llm_call_started_at: Optional[float] = None


class NodeWatchdog:
    """Monitors background daemon threads via heartbeat protocol.

    Usage:
        watchdog = NodeWatchdog()
        watchdog.register('gossip', expected_interval=60,
                          restart_fn=gossip.start, stop_fn=gossip.stop)
        watchdog.start()

        # In daemon loops:
        watchdog.heartbeat('gossip')
    """

    def __init__(self, check_interval: int = None, frozen_multiplier: float = 10.0):
        import os
        self._check_interval = check_interval or int(
            os.environ.get('HEVOLVE_WATCHDOG_INTERVAL', '30'))
        self._frozen_multiplier = frozen_multiplier
        self._threads: Dict[str, ThreadInfo] = {}
        self._lock = threading.Lock()
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._restart_log: List[Dict] = []
        self._started_at: Optional[float] = None
        # Fleet-wide restart budget — timestamps (monotonic seconds) of every
        # restart across every thread. Rolling window pruned in _restart_thread.
        # Breach → self._fleet_halted = True; no further restarts until cleared.
        self._fleet_restart_times: List[float] = []
        self._fleet_halted: bool = False

    def register(self, name: str, expected_interval: float,
                 restart_fn: Callable, stop_fn: Callable = None) -> None:
        """Register a daemon thread to be monitored.

        A grace period delays monitoring so startup initialisation doesn't
        trigger false FROZEN alerts before the daemon enters its loop.
        """
        with self._lock:
            self._threads[name] = ThreadInfo(
                name=name,
                expected_interval=expected_interval,
                restart_fn=restart_fn,
                stop_fn=stop_fn,
                # Pretend the last heartbeat is in the future so the grace
                # period must elapse before we consider the thread frozen.
                last_heartbeat=time.time() + STARTUP_GRACE_SECONDS,
            )
        logger.info(f"Watchdog: registered thread '{name}' "
                    f"(interval={expected_interval}s)")

    def unregister(self, name: str) -> None:
        """Remove a thread from monitoring."""
        with self._lock:
            self._threads.pop(name, None)

    def heartbeat(self, name: str) -> None:
        """Called by daemon threads each cycle to signal they are alive."""
        with self._lock:
            info = self._threads.get(name)
            if info:
                info.last_heartbeat = time.time()

    def is_registered(self, name: str) -> bool:
        """Check if a thread name is registered."""
        with self._lock:
            return name in self._threads

    def registered_names(self) -> list:
        """Return list of all registered thread names."""
        with self._lock:
            return list(self._threads.keys())

    def mark_in_llm_call(self, name: str) -> None:
        """Mark a thread as blocked on a legitimate LLM inference call.

        The watchdog will use LLM_CALL_TIMEOUT_SECONDS instead of the
        normal frozen threshold, preventing false restarts during long
        inference calls.
        """
        with self._lock:
            info = self._threads.get(name)
            if info:
                info.in_llm_call = True
                info.llm_call_started_at = time.time()
                info.last_heartbeat = time.time()  # refresh heartbeat

    def clear_llm_call(self, name: str) -> None:
        """Clear the LLM call marker after inference completes."""
        with self._lock:
            info = self._threads.get(name)
            if info:
                info.in_llm_call = False
                info.llm_call_started_at = None

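    # Illustrative daemon-side sketch of the intended pairing (the names
    # `watchdog`, `run_inference`, and `prompt` are hypothetical):
    #
    #     watchdog.mark_in_llm_call('agent_daemon')
    #     try:
    #         reply = run_inference(prompt)  # may legitimately block for minutes
    #     finally:
    #         watchdog.clear_llm_call('agent_daemon')
    #
    # The try/finally ensures the marker is cleared even when inference raises,
    # so the thread drops back to the normal frozen threshold afterwards.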

    def sleep_with_heartbeat(
        self,
        name: str,
        total_seconds: float,
        chunk_seconds: float = 10.0,
        stop_check: Optional[Callable[[], bool]] = None,
    ) -> None:
        """Sleep for ``total_seconds`` while keeping the daemon's heartbeat fresh.

        A background daemon looks frozen to the watchdog when a single
        ``time.sleep`` or ``time.sleep + blocking call`` exceeds
        ``expected_interval * frozen_multiplier`` (default 300s for a 30s
        interval). The exponential-backoff path in agent_daemon /
        coding_daemon could issue ``time.sleep(480)`` on its 4th consecutive
        failure, which aged the heartbeat past the 300s threshold and
        triggered a restart CASCADE — the exact symptom in 2026-04-11
        langchain.log (9 restarts of auto_discovery, agent_daemon,
        coding_daemon in a single session).

        This helper breaks the sleep into short chunks (default 10s each)
        and calls :meth:`heartbeat` after every chunk, so:

        - A 30s sleep produces 3 heartbeats.
        - A 480s backoff produces 48 heartbeats, none aged past 10s.
        - The watchdog sees the thread as healthy throughout the sleep.

        If ``stop_check`` is provided, the sleep exits early when it
        returns True — this lets callers shut down cleanly without
        waiting for the full duration.

        The heartbeat calls are harmless no-ops when ``name`` isn't
        registered (during tests or when the watchdog isn't running), so
        daemons can use this helper unconditionally without guarding on
        ``get_watchdog() is not None``.

        Args:
            name: Thread name registered via :meth:`register`.
            total_seconds: Total wall-clock time to sleep.
            chunk_seconds: Size of each sleep chunk. Keep this below
                ``expected_interval`` so the heartbeat stays fresh.
                10s is a good default for the standard 30s daemon
                interval (the heartbeat stays at most ~10s old, far
                below the 300s frozen threshold).
            stop_check: Optional callable that returns True to exit
                the sleep early. Called between chunks.
        """
        if total_seconds <= 0:
            self.heartbeat(name)
            return
        end = time.monotonic() + total_seconds
        # Guarantee at least one heartbeat at the start so callers that
        # were blocked before calling sleep_with_heartbeat reset their age.
        self.heartbeat(name)
        while True:
            if stop_check is not None and stop_check():
                return
            remaining = end - time.monotonic()
            if remaining <= 0:
                return
            chunk = min(chunk_seconds, remaining)
            time.sleep(chunk)
            self.heartbeat(name)

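    # Illustrative sketch of a daemon loop built on this helper instead of a
    # single long time.sleep (`stop_event`, `do_one_cycle`, and
    # `backoff_seconds` are hypothetical names):
    #
    #     while not stop_event.is_set():
    #         do_one_cycle()
    #         watchdog.sleep_with_heartbeat('agent_daemon',
    #                                       total_seconds=backoff_seconds,
    #                                       stop_check=stop_event.is_set)
    #
    # Even a 480s backoff keeps the heartbeat at most ~10s old throughout.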

    def start(self) -> None:
        """Start the watchdog background thread. Call LAST after all daemons."""
        with self._lock:
            if self._running:
                return
            self._running = True
            self._started_at = time.time()
            self._thread = threading.Thread(target=self._check_loop, daemon=True)
            self._thread.start()
        logger.info(f"NodeWatchdog started (interval={self._check_interval}s, "
                    f"multiplier={self._frozen_multiplier}x)")

    def stop(self) -> None:
        """Stop the watchdog."""
        with self._lock:
            self._running = False
        if self._thread:
            self._thread.join(timeout=10)

    def get_health(self) -> Dict:
        """Return health status of all monitored threads."""
        now = time.time()
        threads = {}
        with self._lock:
            for name, info in self._threads.items():
                age = now - info.last_heartbeat
                threads[name] = {
                    'status': info.status,
                    'last_heartbeat_age_s': round(age, 1),
                    'last_heartbeat_iso': datetime.fromtimestamp(
                        info.last_heartbeat, tz=timezone.utc).isoformat(),
                    'expected_interval': info.expected_interval,
                    'restart_count': info.restart_count,
                    'consecutive_failures': info.consecutive_failures,
                }
                if info.last_restart_at:
                    threads[name]['last_restart_iso'] = datetime.fromtimestamp(
                        info.last_restart_at, tz=timezone.utc).isoformat()

        uptime = round(now - self._started_at, 1) if self._started_at else 0
        with self._lock:
            fleet_recent = len(self._fleet_restart_times)
            fleet_halted = self._fleet_halted
        return {
            'watchdog': 'healthy' if self._running else 'stopped',
            'uptime_seconds': uptime,
            'threads': threads,
            'restart_log': list(self._restart_log[-20:]),  # last 20 events
            'fleet_restarts_in_window': fleet_recent,
            'fleet_restart_window_s': FLEET_RESTART_WINDOW_SECONDS,
            'fleet_restart_ceiling': FLEET_RESTART_MAX,
            'fleet_halted': fleet_halted,
        }

    def clear_fleet_halt(self) -> None:
        """Clear the fleet-wide restart halt flag (manual recovery).

        Call after diagnosing + fixing the cascade failure that triggered
        the halt. Does NOT revive threads marked 'dead' — those still
        require a process restart.
        """
        with self._lock:
            self._fleet_halted = False
            self._fleet_restart_times.clear()
        logger.critical("Watchdog: fleet halt cleared — restarts re-enabled")

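    # Illustrative operator sketch for recovering from a fleet halt after the
    # underlying cascade has been diagnosed and fixed (e.g. from a debug shell):
    #
    #     wd = get_watchdog()
    #     health = wd.get_health()
    #     if health['fleet_halted']:
    #         # inspect health['threads'] and health['restart_log'] first
    #         wd.clear_fleet_halt()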

    def _check_loop(self) -> None:
        """Background loop: check heartbeats, restart frozen threads."""
        while self._running:
            time.sleep(self._check_interval)
            if not self._running:
                break
            self._check_all()

    def _check_all(self) -> None:
        """Single check pass over all threads."""
        now = time.time()
        to_restart = []
        with self._lock:
            for name, info in self._threads.items():
                if info.status == 'dead':
                    continue
                age = now - info.last_heartbeat
                # Negative age means we're still in the grace period
                if age < 0:
                    continue
                # Use the extended timeout when the thread is in a legitimate LLM call
                if info.in_llm_call:
                    threshold = LLM_CALL_TIMEOUT_SECONDS
                else:
                    threshold = info.expected_interval * self._frozen_multiplier
                if age > threshold and info.status in ('healthy', 'frozen', 'in_llm_call'):
                    # Detect rapid-restart loop: 3+ restarts in the last 5 minutes
                    # means the thread keeps dying — stop restarting it
                    recent = [t for t in info.recent_restart_times
                              if now - t < 300]
                    if len(recent) >= 3:
                        info.status = 'dead'
                        logger.warning(
                            f"Watchdog: thread '{name}' stuck in restart loop "
                            f"({len(recent)} restarts in 5min) — marking dormant. "
                            f"Will not restart again until app is restarted.")
                        continue
                    logger.critical(
                        f"Watchdog: thread '{name}' FROZEN - no heartbeat "
                        f"for {age:.0f}s (threshold: {threshold:.0f}s)")
                    info.status = 'frozen'
                    to_restart.append(name)

        # Best-effort thread-stack dump BEFORE restart — capture the
        # live frame state of the frozen thread so we can diagnose
        # WHY it stalled. Uses the unified `core.diag.dump_all_thread_stacks`
        # canonical helper (refactor: replaces a 30-line module-lookup +
        # inline-fallback that silently dropped dumps in some bundle layouts).
        # The dump goes to BOTH the logger AND startup_trace.log
        # (flushes immediately, survives GIL-held hangs).
        if to_restart:
            try:
                _dumper = None
                # Preferred: direct import from Nunba's core package (works in
                # dev + most frozen layouts because Nunba's `core/` is on the
                # default sys.path).
                try:
                    from core.diag import dump_all_thread_stacks as _dumper
                except Exception:
                    pass
                # Fallback: builtin published by core.diag at import time
                # (`builtins._nunba_dump_threads`). Guarantees the dumper is
                # reachable from any frozen-bundle topology where direct
                # import paths break.
                if _dumper is None:
                    import builtins as _b
                    _dumper = getattr(_b, '_nunba_dump_threads', None)
                if _dumper:
                    _dumper(
                        f"NodeWatchdog FROZEN restart: {','.join(to_restart)}",
                    )
                else:
                    # Last-resort fallback if even the builtin is missing
                    # (e.g., HARTOS standalone, no Nunba in process). Keep
                    # this minimal — the unified path is the supported one.
                    import sys as _sys
                    import traceback as _tb
                    logger.error(
                        "NodeWatchdog: dumping thread stacks before "
                        "restart (core.diag unavailable):",
                    )
                    for _tid, _frame in _sys._current_frames().items():
                        try:
                            _stack = ''.join(_tb.format_stack(_frame))
                            logger.error(
                                f"  Thread id={_tid}:\n{_stack}",
                            )
                        except Exception:
                            pass
            except Exception as _de:
                logger.debug(f"Thread-stack dump failed: {_de}")

        # Restart outside the lock to avoid deadlocks
        for name in to_restart:
            self._restart_thread(name)

    def _restart_thread(self, name: str) -> bool:
        """Stop and restart a frozen thread. Returns True on success."""
        with self._lock:
            info = self._threads.get(name)
            if not info:
                return False
            # Fleet-wide restart budget: hard ceiling across all threads.
            # Prune entries outside the window, then check against the ceiling.
            now_mono = time.monotonic()
            cutoff = now_mono - FLEET_RESTART_WINDOW_SECONDS
            self._fleet_restart_times = [
                t for t in self._fleet_restart_times if t > cutoff]
            if self._fleet_halted:
                # Already escalated — don't restart anything until manual clear
                return False
            if len(self._fleet_restart_times) >= FLEET_RESTART_MAX:
                self._fleet_halted = True
                info.status = 'dead'
                logger.critical(
                    f"Watchdog: FLEET RESTART BUDGET EXCEEDED — "
                    f"{len(self._fleet_restart_times)} restarts in "
                    f"{FLEET_RESTART_WINDOW_SECONDS}s (ceiling="
                    f"{FLEET_RESTART_MAX}). Halting all further restarts; "
                    f"thread '{name}' marked DEAD. Investigate cascade "
                    f"failure before restarting the process.")
                return False
            if info.consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
                info.status = 'dead'
                logger.critical(
                    f"Watchdog: thread '{name}' marked DEAD after "
                    f"{MAX_CONSECUTIVE_FAILURES} consecutive restart failures")
                return False
            # Record the restart attempt in the fleet window before dispatch
            self._fleet_restart_times.append(now_mono)
            info.status = 'restarting'
            stop_fn = info.stop_fn
            restart_fn = info.restart_fn

        # Stop the frozen thread
        if stop_fn:
            try:
                stop_fn()
            except Exception as e:
                logger.warning(f"Watchdog: error stopping '{name}': {e}")

        # Restart it
        try:
            restart_fn()
            with self._lock:
                info = self._threads.get(name)
                if info:
                    info.status = 'healthy'
                    # Give the restarted thread a grace period before
                    # monitoring resumes (same as initial registration)
                    info.last_heartbeat = time.time() + STARTUP_GRACE_SECONDS
                    info.restart_count += 1
                    info.last_restart_at = time.time()
                    info.consecutive_failures = 0
                    info.recent_restart_times.append(time.time())
                    # Prune entries older than 5 minutes
                    cutoff = time.time() - 300
                    info.recent_restart_times = [
                        t for t in info.recent_restart_times if t > cutoff]
                    self._restart_log.append({
                        'name': name,
                        'time': datetime.now(timezone.utc).isoformat(),
                        'restart_count': info.restart_count,
                    })
                    # Cap at 100 entries to prevent a memory leak on long-running
                    # instances with flapping daemons (SRE audit finding).
                    if len(self._restart_log) > 100:
                        self._restart_log = self._restart_log[-100:]
            _count = info.restart_count if info else '?'
            logger.critical(
                f"Watchdog: thread '{name}' RESTARTED successfully "
                f"(total restarts: {_count})")
            return True
        except Exception as e:
            with self._lock:
                info = self._threads.get(name)
                if info:
                    info.consecutive_failures += 1
                    if info.consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
                        info.status = 'dead'
                    else:
                        info.status = 'frozen'
            logger.critical(f"Watchdog: FAILED to restart '{name}': {e}")
            return False


# ─── Module singleton ───

_watchdog: Optional[NodeWatchdog] = None


def start_watchdog(check_interval: int = None) -> NodeWatchdog:
    """Create and return the global watchdog instance.

    Idempotent: if a watchdog already exists, return it. Without this
    guard, a second call (e.g. init_social called from both
    hart_intelligence_entry AND Nunba main.py) replaces the global
    singleton. The first watchdog's check-loop thread keeps running
    with stale ThreadInfo entries that never receive heartbeats,
    causing it to false-FROZEN every daemon every 300 s — the exact
    6-minute restart cascade seen since 2026-04-11.
    """
    global _watchdog
    if _watchdog is not None:
        return _watchdog
    _watchdog = NodeWatchdog(check_interval=check_interval)
    return _watchdog


def get_watchdog() -> Optional[NodeWatchdog]:
    """Get the current watchdog instance (or None)."""
    return _watchdog
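

# End-to-end usage sketch, combining the module singleton with the heartbeat
# helpers (`gossip`, `stop_event`, and `gossip_cycle` are hypothetical names;
# any object exposing start/stop callables works as a daemon here):
#
#     watchdog = start_watchdog()
#     watchdog.register('gossip', expected_interval=60,
#                       restart_fn=gossip.start, stop_fn=gossip.stop)
#     watchdog.start()  # call last, after every daemon is registered
#
#     # Inside the gossip daemon's loop:
#     while not stop_event.is_set():
#         gossip_cycle()
#         watchdog.sleep_with_heartbeat('gossip', total_seconds=60,
#                                       stop_check=stop_event.is_set)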