Coverage for security/node_watchdog.py: 89.0% of 219 statements (coverage.py v7.14.0, created at 2026-05-12 04:49 +0000)
1"""
2Node Watchdog - Frozen Thread Auto-Detection and Restart
4Monitors all background daemon threads via heartbeat protocol.
5Detects frozen/crashed threads and auto-restarts them.
7Each daemon calls watchdog.heartbeat('name') every loop iteration.
8If a heartbeat is older than 2× the expected interval, the thread
9is considered frozen and gets auto-restarted.
11After 5 consecutive restart failures, the thread is marked 'dead'.
12"""
import logging
import os
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Callable, Dict, List, Optional

logger = logging.getLogger('hevolve_security')

MAX_CONSECUTIVE_FAILURES = 5
# Grace period after registration/restart before monitoring begins.
# Prevents false FROZEN alerts during startup when daemons are still
# initialising and haven't entered their heartbeat loop yet.
STARTUP_GRACE_SECONDS = 60
# LLM calls can take 30-120s on local models. When a thread is marked
# in_llm_call, this absolute timeout replaces the normal
# expected_interval * frozen_multiplier threshold. Prevents restart cascades.
# Autonomous gather_info + recipe() on a local LLM routinely takes 5-10 min.
# 300s was too tight — caused watchdog restart loops where each restart
# re-dispatched the same goal, producing duplicate messages.
LLM_CALL_TIMEOUT_SECONDS = int(os.environ.get('HEVOLVE_LLM_CALL_TIMEOUT', '900'))
# Fleet-wide restart budget: hard ceiling on cross-thread restart storms.
# The per-thread budget (3 restarts / 5 min → mark dead) still applies first;
# this is a second-line escalation when many different threads are flapping
# in parallel (e.g. cascade failure from shared resource exhaustion).
# When the ceiling is breached, the whole watchdog halts further restarts
# until manual intervention. Prior to this guard an infinite restart loop
# was possible when N threads rotated through fresh per-thread budgets.
FLEET_RESTART_WINDOW_SECONDS = int(
    os.environ.get('HEVOLVE_WATCHDOG_FLEET_WINDOW_S', '60'))
FLEET_RESTART_MAX = int(os.environ.get('HEVOLVE_WATCHDOG_FLEET_MAX', '10'))
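
# Worked example of the thresholds above (illustrative sketch, assuming a
# daemon registered with expected_interval=30 and the default settings):
#   normal:          30s * frozen_multiplier (10.0)      -> frozen after 300s
#   during LLM call: LLM_CALL_TIMEOUT_SECONDS (absolute) -> frozen after 900s
#   fleet ceiling:   once FLEET_RESTART_MAX (10) restarts land in any rolling
#                    FLEET_RESTART_WINDOW_SECONDS (60s) window, the next
#                    restart attempt halts the fleet until clear_fleet_halt().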


@dataclass
class ThreadInfo:
    """Tracked state for a single monitored daemon thread."""
    name: str
    expected_interval: float
    restart_fn: Callable
    stop_fn: Optional[Callable] = None
    last_heartbeat: float = field(default_factory=time.time)
    status: str = 'healthy'  # healthy | frozen | restarting | dead | in_llm_call
    restart_count: int = 0
    last_restart_at: Optional[float] = None
    consecutive_failures: int = 0
    # Track recent restart times to detect rapid-restart loops
    recent_restart_times: list = field(default_factory=list)
    # LLM call awareness: when True, the thread is blocked on a legitimate
    # LLM inference call. The watchdog applies LLM_CALL_TIMEOUT_SECONDS as
    # the frozen threshold instead of restarting.
    in_llm_call: bool = False
    llm_call_started_at: Optional[float] = None


class NodeWatchdog:
    """Monitors background daemon threads via heartbeat protocol.

    Usage:
        watchdog = NodeWatchdog()
        watchdog.register('gossip', expected_interval=60,
                          restart_fn=gossip.start, stop_fn=gossip.stop)
        watchdog.start()

        # In daemon loops:
        watchdog.heartbeat('gossip')
    """

    def __init__(self, check_interval: Optional[int] = None,
                 frozen_multiplier: float = 10.0):
        self._check_interval = check_interval or int(
            os.environ.get('HEVOLVE_WATCHDOG_INTERVAL', '30'))
        self._frozen_multiplier = frozen_multiplier
        self._threads: Dict[str, ThreadInfo] = {}
        self._lock = threading.Lock()
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._restart_log: List[Dict] = []
        self._started_at: Optional[float] = None
        # Fleet-wide restart budget — timestamps (monotonic seconds) of every
        # restart across every thread. Rolling window pruned in _restart_thread.
        # Breach → self._fleet_halted = True; no further restarts until cleared.
        self._fleet_restart_times: List[float] = []
        self._fleet_halted: bool = False

    def register(self, name: str, expected_interval: float,
                 restart_fn: Callable, stop_fn: Optional[Callable] = None) -> None:
        """Register a daemon thread to be monitored.

        A grace period delays monitoring so startup initialisation doesn't
        trigger false FROZEN alerts before the daemon enters its loop.
        """
        with self._lock:
            self._threads[name] = ThreadInfo(
                name=name,
                expected_interval=expected_interval,
                restart_fn=restart_fn,
                stop_fn=stop_fn,
                # Pretend the last heartbeat is in the future so the grace
                # period must elapse before we consider the thread frozen.
                last_heartbeat=time.time() + STARTUP_GRACE_SECONDS,
            )
        logger.info(f"Watchdog: registered thread '{name}' "
                    f"(interval={expected_interval}s)")

    def unregister(self, name: str) -> None:
        """Remove a thread from monitoring."""
        with self._lock:
            self._threads.pop(name, None)

    def heartbeat(self, name: str) -> None:
        """Called by daemon threads each cycle to signal they are alive."""
        with self._lock:
            info = self._threads.get(name)
            if info:
                info.last_heartbeat = time.time()

    def is_registered(self, name: str) -> bool:
        """Check if a thread name is registered."""
        with self._lock:
            return name in self._threads

    def registered_names(self) -> list:
        """Return list of all registered thread names."""
        with self._lock:
            return list(self._threads.keys())

    def mark_in_llm_call(self, name: str) -> None:
        """Mark a thread as blocked on a legitimate LLM inference call.

        The watchdog will use LLM_CALL_TIMEOUT_SECONDS instead of the
        normal frozen threshold, preventing false restarts during long
        inference calls.
        """
        with self._lock:
            info = self._threads.get(name)
            if info:
                info.in_llm_call = True
                info.llm_call_started_at = time.time()
                info.last_heartbeat = time.time()  # refresh heartbeat

    def clear_llm_call(self, name: str) -> None:
        """Clear the LLM call marker after inference completes."""
        with self._lock:
            info = self._threads.get(name)
            if info:
                info.in_llm_call = False
                info.llm_call_started_at = None
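
    # Illustrative caller pattern for the LLM-call markers above (sketch:
    # 'agent_daemon' and run_inference() are hypothetical names; only the
    # watchdog calls are this module's real API):
    #
    #     wd = get_watchdog()
    #     if wd:
    #         wd.mark_in_llm_call('agent_daemon')
    #     try:
    #         result = run_inference(prompt)   # may block for several minutes
    #     finally:
    #         if wd:
    #             wd.clear_llm_call('agent_daemon')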

    def sleep_with_heartbeat(
        self,
        name: str,
        total_seconds: float,
        chunk_seconds: float = 10.0,
        stop_check: Optional[Callable[[], bool]] = None,
    ) -> None:
        """Sleep for ``total_seconds`` while keeping the daemon's heartbeat fresh.

        Background daemons get flagged FROZEN by the watchdog when a single
        ``time.sleep`` or ``time.sleep + blocking call`` exceeds
        ``expected_interval * frozen_multiplier`` (default 300s for a 30s
        interval). The exponential-backoff path in agent_daemon /
        coding_daemon could issue ``time.sleep(480)`` on its 4th consecutive
        failure, which aged the heartbeat past the 300s threshold and
        triggered a restart CASCADE — the exact symptom in 2026-04-11
        langchain.log (9 restarts of auto_discovery, agent_daemon and
        coding_daemon in a single session).

        This helper breaks the sleep into short chunks (default 10s each)
        and calls :meth:`heartbeat` after every chunk, so:

        - A 30s sleep produces 3 heartbeats.
        - A 480s backoff produces 48 heartbeats, none aged past 10s.
        - The watchdog sees the thread as healthy throughout the sleep.

        If ``stop_check`` is provided, the sleep exits early when it
        returns True — this lets callers shut down cleanly without
        waiting for the full duration.

        The heartbeat calls are harmless no-ops when ``name`` isn't
        registered (during tests or when the watchdog isn't running), so
        daemons can use this unconditionally without guarding on
        ``get_watchdog() is not None``.

        Args:
            name: Thread name registered via :meth:`register`.
            total_seconds: Total wall-clock time to sleep.
            chunk_seconds: Size of each sleep chunk. Keep this below
                ``expected_interval`` so the heartbeat stays fresh.
                10s is a good default for the standard 30s daemon
                interval, keeping the heartbeat age far below the
                300s frozen threshold.
            stop_check: Optional callable that returns True to exit
                the sleep early. Called between chunks.
        """
        if total_seconds <= 0:
            self.heartbeat(name)
            return
        end = time.monotonic() + total_seconds
        # Guarantee at least one heartbeat at the start so callers that
        # were blocked before calling sleep_with_heartbeat reset their age.
        self.heartbeat(name)
        while True:
            if stop_check is not None and stop_check():
                return
            remaining = end - time.monotonic()
            if remaining <= 0:
                return
            chunk = min(chunk_seconds, remaining)
            time.sleep(chunk)
            self.heartbeat(name)
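
    # Illustrative daemon loop using sleep_with_heartbeat (sketch: the
    # 'agent_daemon' name, stop_event and do_work() are hypothetical):
    #
    #     while not stop_event.is_set():
    #         watchdog.heartbeat('agent_daemon')
    #         do_work()
    #         watchdog.sleep_with_heartbeat('agent_daemon', total_seconds=480,
    #                                       stop_check=stop_event.is_set)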

    def start(self) -> None:
        """Start the watchdog background thread. Call LAST, after all daemons."""
        with self._lock:
            if self._running:
                return
            self._running = True
            self._started_at = time.time()
            self._thread = threading.Thread(target=self._check_loop, daemon=True)
            self._thread.start()
        logger.info(f"NodeWatchdog started (interval={self._check_interval}s, "
                    f"multiplier={self._frozen_multiplier}x)")

    def stop(self) -> None:
        """Stop the watchdog."""
        with self._lock:
            self._running = False
        if self._thread:
            self._thread.join(timeout=10)

    def get_health(self) -> Dict:
        """Return health status of all monitored threads."""
        now = time.time()
        threads = {}
        with self._lock:
            for name, info in self._threads.items():
                age = now - info.last_heartbeat
                threads[name] = {
                    'status': info.status,
                    'last_heartbeat_age_s': round(age, 1),
                    'last_heartbeat_iso': datetime.fromtimestamp(
                        info.last_heartbeat, tz=timezone.utc).isoformat(),
                    'expected_interval': info.expected_interval,
                    'restart_count': info.restart_count,
                    'consecutive_failures': info.consecutive_failures,
                }
                if info.last_restart_at:
                    threads[name]['last_restart_iso'] = datetime.fromtimestamp(
                        info.last_restart_at, tz=timezone.utc).isoformat()

        uptime = round(now - self._started_at, 1) if self._started_at else 0
        with self._lock:
            fleet_recent = len(self._fleet_restart_times)
            fleet_halted = self._fleet_halted
        return {
            'watchdog': 'healthy' if self._running else 'stopped',
            'uptime_seconds': uptime,
            'threads': threads,
            'restart_log': list(self._restart_log[-20:]),  # last 20 events
            'fleet_restarts_in_window': fleet_recent,
            'fleet_restart_window_s': FLEET_RESTART_WINDOW_SECONDS,
            'fleet_restart_ceiling': FLEET_RESTART_MAX,
            'fleet_halted': fleet_halted,
        }

    def clear_fleet_halt(self) -> None:
        """Clear the fleet-wide restart halt flag (manual recovery).

        Call after diagnosing and fixing the cascade failure that triggered
        the halt. Does NOT revive threads marked 'dead' — those still
        require a process restart.
        """
        with self._lock:
            self._fleet_halted = False
            self._fleet_restart_times.clear()
        logger.critical("Watchdog: fleet halt cleared — restarts re-enabled")

    def _check_loop(self) -> None:
        """Background loop: check heartbeats, restart frozen threads."""
        while self._running:
            time.sleep(self._check_interval)
            if not self._running:
                break
            self._check_all()

    def _check_all(self) -> None:
        """Single check pass over all threads."""
        now = time.time()
        to_restart = []
        with self._lock:
            for name, info in self._threads.items():
                if info.status == 'dead':
                    continue
                age = now - info.last_heartbeat
                # Negative age means we're still in the grace period
                if age < 0:
                    continue
                # Use extended timeout when thread is in a legitimate LLM call
                if info.in_llm_call:
                    threshold = LLM_CALL_TIMEOUT_SECONDS
                else:
                    threshold = info.expected_interval * self._frozen_multiplier
                if age > threshold and info.status in ('healthy', 'frozen', 'in_llm_call'):
                    # Detect rapid-restart loop: 3+ restarts in the last 5 minutes
                    # means the thread keeps dying — stop restarting it
                    recent = [t for t in info.recent_restart_times
                              if now - t < 300]
                    if len(recent) >= 3:
                        info.status = 'dead'
                        logger.warning(
                            f"Watchdog: thread '{name}' stuck in restart loop "
                            f"({len(recent)} restarts in 5min) — marking dormant. "
                            f"Will not restart again until app is restarted.")
                        continue
                    logger.critical(
                        f"Watchdog: thread '{name}' FROZEN - no heartbeat "
                        f"for {age:.0f}s (threshold: {threshold:.0f}s)")
                    info.status = 'frozen'
                    to_restart.append(name)

        # Best-effort thread-stack dump BEFORE restart — capture the
        # live frame state of the frozen thread so we can diagnose
        # WHY it stalled. Uses the unified `core.diag.dump_all_thread_stacks`
        # canonical helper (refactor: replaces a 30-line module-lookup +
        # inline-fallback that silently dropped dumps in some bundle layouts).
        # The dump goes to BOTH the logger AND startup_trace.log
        # (flushes immediately, survives GIL-held hangs).
        if to_restart:
            try:
                _dumper = None
                # Preferred: direct import from Nunba's core package (works in
                # dev + most frozen layouts because Nunba's `core/` is on the
                # default sys.path).
                try:
                    from core.diag import dump_all_thread_stacks as _dumper
                except Exception:
                    pass
                # Fallback: builtin published by core.diag at import time
                # (`builtins._nunba_dump_threads`). Guarantees the dumper is
                # reachable from any frozen-bundle topology where direct
                # import paths break.
                if _dumper is None:
                    import builtins as _b
                    _dumper = getattr(_b, '_nunba_dump_threads', None)
                if _dumper:
                    _dumper(
                        f"NodeWatchdog FROZEN restart: {','.join(to_restart)}",
                    )
                else:
                    # Last-resort fallback if even the builtin is missing
                    # (e.g., HARTOS standalone, no Nunba in process). Keep
                    # this minimal — the unified path is the supported one.
                    import sys as _sys
                    import traceback as _tb
                    logger.error(
                        "NodeWatchdog: dumping thread stacks before "
                        "restart (core.diag unavailable):",
                    )
                    for _tid, _frame in _sys._current_frames().items():
                        try:
                            _stack = ''.join(_tb.format_stack(_frame))
                            logger.error(
                                f"  Thread id={_tid}:\n{_stack}",
                            )
                        except Exception:
                            pass
            except Exception as _de:
                logger.debug(f"Thread-stack dump failed: {_de}")

        # Restart outside the lock to avoid deadlocks
        for name in to_restart:
            self._restart_thread(name)

    def _restart_thread(self, name: str) -> bool:
        """Stop and restart a frozen thread. Returns True on success."""
        with self._lock:
            info = self._threads.get(name)
            if not info:
                return False
            # Fleet-wide restart budget: hard ceiling across all threads.
            # Prune outside the window, then check against the ceiling.
            now_mono = time.monotonic()
            cutoff = now_mono - FLEET_RESTART_WINDOW_SECONDS
            self._fleet_restart_times = [
                t for t in self._fleet_restart_times if t > cutoff]
            if self._fleet_halted:
                # Already escalated — don't restart anything until manual clear
                return False
            if len(self._fleet_restart_times) >= FLEET_RESTART_MAX:
                self._fleet_halted = True
                info.status = 'dead'
                logger.critical(
                    f"Watchdog: FLEET RESTART BUDGET EXCEEDED — "
                    f"{len(self._fleet_restart_times)} restarts in "
                    f"{FLEET_RESTART_WINDOW_SECONDS}s (ceiling="
                    f"{FLEET_RESTART_MAX}). Halting all further restarts; "
                    f"thread '{name}' marked DEAD. Investigate cascade "
                    f"failure before restarting the process.")
                return False
            if info.consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
                info.status = 'dead'
                logger.critical(
                    f"Watchdog: thread '{name}' marked DEAD after "
                    f"{MAX_CONSECUTIVE_FAILURES} consecutive restart failures")
                return False
            # Record the restart attempt in the fleet window before dispatch
            self._fleet_restart_times.append(now_mono)
            info.status = 'restarting'
            stop_fn = info.stop_fn
            restart_fn = info.restart_fn

        # Stop the frozen thread
        if stop_fn:
            try:
                stop_fn()
            except Exception as e:
                logger.warning(f"Watchdog: error stopping '{name}': {e}")

        # Restart it
        try:
            restart_fn()
            with self._lock:
                info = self._threads.get(name)
                if info:
                    info.status = 'healthy'
                    # Give the restarted thread a grace period before
                    # monitoring resumes (same as initial registration)
                    info.last_heartbeat = time.time() + STARTUP_GRACE_SECONDS
                    info.restart_count += 1
                    info.last_restart_at = time.time()
                    info.consecutive_failures = 0
                    info.recent_restart_times.append(time.time())
                    # Prune entries older than 5 minutes
                    cutoff = time.time() - 300
                    info.recent_restart_times = [
                        t for t in info.recent_restart_times if t > cutoff]
                    self._restart_log.append({
                        'name': name,
                        'time': datetime.now(timezone.utc).isoformat(),
                        'restart_count': info.restart_count,
                    })
                    # Cap at 100 entries to prevent memory leak on long-running
                    # instances with flapping daemons (SRE audit finding).
                    if len(self._restart_log) > 100:
                        self._restart_log = self._restart_log[-100:]
            _count = info.restart_count if info else '?'
            logger.critical(
                f"Watchdog: thread '{name}' RESTARTED successfully "
                f"(total restarts: {_count})")
            return True
        except Exception as e:
            with self._lock:
                info = self._threads.get(name)
                if info:
                    info.consecutive_failures += 1
                    if info.consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
                        info.status = 'dead'
                    else:
                        info.status = 'frozen'
            logger.critical(f"Watchdog: FAILED to restart '{name}': {e}")
            return False


# ─── Module singleton ───

_watchdog: Optional[NodeWatchdog] = None


def start_watchdog(check_interval: Optional[int] = None) -> NodeWatchdog:
    """Create and return the global watchdog instance.

    Idempotent: if a watchdog instance already exists, return it.
    Without this guard, a second call (e.g. init_social called from both
    hart_intelligence_entry AND Nunba main.py) replaces the global
    singleton. The first watchdog's check-loop thread keeps running
    with stale ThreadInfo entries that never receive heartbeats,
    causing it to false-FROZEN every daemon every 300 s — the exact
    6-minute restart cascade seen since 2026-04-11.
    """
    global _watchdog
    if _watchdog is not None:
        return _watchdog
    _watchdog = NodeWatchdog(check_interval=check_interval)
    return _watchdog


def get_watchdog() -> Optional[NodeWatchdog]:
    """Get the current watchdog instance (or None)."""
    return _watchdog
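

# Minimal usage sketch (illustrative only; the 'demo' daemon and _demo_loop
# below are hypothetical and not part of the production runtime). It shows
# the intended call order: register() each daemon, start() the watchdog last,
# heartbeat()/sleep_with_heartbeat() inside the daemon loop, and get_health()
# for introspection.
if __name__ == '__main__':  # pragma: no cover
    def _demo_loop() -> None:
        wd = get_watchdog()
        for _ in range(3):
            wd.heartbeat('demo')
            wd.sleep_with_heartbeat('demo', total_seconds=1, chunk_seconds=0.5)

    watchdog = start_watchdog(check_interval=5)
    watchdog.register(
        'demo', expected_interval=1,
        restart_fn=lambda: threading.Thread(target=_demo_loop, daemon=True).start())
    threading.Thread(target=_demo_loop, daemon=True).start()
    watchdog.start()
    time.sleep(4)
    print(watchdog.get_health())
    watchdog.stop()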