Coverage for security / runtime_monitor.py: 78.5%
107 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Runtime Integrity Monitor: Background daemon that periodically re-checks code hash
3against the boot-time signed manifest. Detects tampering and disconnects from network.
4"""
5import os
6import time
7import logging
8import threading
9from typing import Optional
# Module logger, registered under the 'hevolve_security' name.
logger = logging.getLogger('hevolve_security')
# Module-wide monitor instance; stays None until start_monitor() assigns it.
_monitor: Optional['RuntimeIntegrityMonitor'] = None
class RuntimeIntegrityMonitor:
    """Background daemon that periodically re-checks code hash against manifest.

    A daemon thread wakes every ``check_interval`` seconds and runs three
    checks in order: the code hash against the manifest's ``code_hash``, the
    guardrail integrity check, and the origin attestation check. A code-hash
    or guardrail failure marks the monitor as tampered, stops the gossip
    protocol and ends the loop. An origin attestation failure is only logged.
    """

    _INTERVAL_ENV_VAR = 'HEVOLVE_TAMPER_CHECK_INTERVAL'
    _DEFAULT_CHECK_INTERVAL = 300

    def __init__(self, manifest: dict, check_interval: int = None, code_root: str = None):
        """Prepare the monitor; the thread is not started until ``start()``.

        Args:
            manifest: Boot-time manifest; its ``code_hash`` entry is the
                expected hash (empty string when the key is absent).
            check_interval: Seconds between checks. When omitted (or not
                positive) the ``HEVOLVE_TAMPER_CHECK_INTERVAL`` environment
                variable is used, defaulting to 300.
            code_root: Root directory handed to the integrity helpers.
        """
        self._manifest = manifest
        self._expected_hash = manifest.get('code_hash', '')
        self._check_interval = self._resolve_check_interval(check_interval)
        self._code_root = code_root
        self._running = False
        self._thread = None
        self._lock = threading.Lock()
        # Set by stop() so the loop wakes immediately instead of finishing
        # a full sleep interval.
        self._stop_event = threading.Event()
        self._tampered = False
        self._boot_manifest_snapshot = None
        # Purge __pycache__ before snapshot - blocks bytecode injection
        try:
            from security.node_integrity import purge_pycache
            purge_pycache(code_root)
        except Exception as e:
            # Best effort: the monitor must still come up without the purge.
            logger.warning(f"Runtime monitor could not purge __pycache__: {e}")
        # Snapshot file manifest at boot for diff on tamper
        try:
            from security.node_integrity import compute_file_manifest
            self._boot_manifest_snapshot = compute_file_manifest(code_root)
        except Exception as e:
            # Without a snapshot, tamper reports cannot list changed files.
            logger.warning(f"Runtime monitor could not snapshot file manifest: {e}")

    @classmethod
    def _resolve_check_interval(cls, check_interval=None):
        """Return the check interval in seconds.

        An explicit positive ``check_interval`` wins. Otherwise the
        environment variable is parsed; a malformed or non-positive value
        falls back to the default instead of raising or busy-looping.
        """
        if check_interval and check_interval > 0:
            return check_interval
        raw = os.environ.get(cls._INTERVAL_ENV_VAR, str(cls._DEFAULT_CHECK_INTERVAL))
        try:
            interval = int(raw)
        except (TypeError, ValueError):
            interval = 0
        if interval <= 0:
            logger.warning(
                f"Invalid {cls._INTERVAL_ENV_VAR}={raw!r}; "
                f"using {cls._DEFAULT_CHECK_INTERVAL}s")
            return cls._DEFAULT_CHECK_INTERVAL
        return interval

    def start(self) -> None:
        """Start the background monitoring thread (daemon=True).

        Calling it while the monitor is already running is a no-op.
        """
        with self._lock:
            if self._running:
                return
            # Clear a stop request left over from an earlier stop().
            self._stop_event.clear()
            self._running = True
            self._thread = threading.Thread(target=self._check_loop, daemon=True)
            self._thread.start()
            logger.info(f"Runtime integrity monitor started (interval={self._check_interval}s)")

    def stop(self) -> None:
        """Stop the monitor and wait up to 10 seconds for the thread to exit."""
        with self._lock:
            self._running = False
            thread = self._thread
        # Wake the loop if it is waiting between checks.
        self._stop_event.set()
        # Joining the current thread raises RuntimeError, so skip the join
        # when stop() is invoked from the monitor thread itself.
        if thread and thread is not threading.current_thread():
            thread.join(timeout=10)

    def _wd_heartbeat(self):
        """Send heartbeat to watchdog between potentially blocking checks."""
        try:
            from security.node_watchdog import get_watchdog
            wd = get_watchdog()
            if wd:
                wd.heartbeat('runtime_monitor')
        except Exception as e:
            # The watchdog is optional; a failed heartbeat must not stop checks.
            logger.debug(f"Watchdog heartbeat failed: {e}")

    def _check_loop(self) -> None:
        """Background loop: periodic code hash + guardrail hash verification.

        Each iteration waits one interval, then runs the code-hash,
        guardrail and origin checks with a watchdog heartbeat before each.
        The loop ends on stop() or after the first detected tampering.
        """
        while self._running:
            # wait() returns True as soon as stop() sets the event, so a
            # stop request does not have to sit out the whole interval.
            if self._stop_event.wait(self._check_interval):
                break
            if not self._running:
                break
            self._wd_heartbeat()
            if self._code_hash_tampered():
                return  # Stop checking after tamper
            self._wd_heartbeat()
            if self._guardrails_tampered():
                return
            self._wd_heartbeat()
            self._check_origin()

    def _code_hash_tampered(self) -> bool:
        """Compare the current code hash with the manifest.

        Returns True (after flagging and responding) on a mismatch, False
        when the hashes match or the check itself failed.
        """
        try:
            from security.node_integrity import compute_code_hash
            current_hash = compute_code_hash(self._code_root)
        except Exception as e:
            logger.warning(f"Runtime integrity check error: {e}")
            return False
        if current_hash == self._expected_hash:
            return False
        # Flag first so a logging problem can never hide the detection;
        # str() keeps the preview safe when a hash is not a string.
        self._tampered = True
        logger.critical(
            f"TAMPERING DETECTED: code hash changed from "
            f"{str(self._expected_hash)[:16]}... to {str(current_hash)[:16]}...")
        self._on_tamper_detected()
        return True

    def _guardrails_tampered(self) -> bool:
        """Run the guardrail integrity check.

        Returns True (after flagging and responding) when verification
        fails, False when it passes or the check itself errored.
        """
        try:
            from security.hive_guardrails import verify_guardrail_integrity
            intact = verify_guardrail_integrity()
        except Exception as e:
            logger.warning(f"Guardrail integrity check error: {e}")
            return False
        if intact:
            return False
        self._tampered = True
        logger.critical(
            "GUARDRAIL TAMPERING DETECTED: frozen values hash changed")
        self._on_tamper_detected()
        return True

    def _check_origin(self) -> None:
        """Origin attestation check - detect branding removal (log only)."""
        try:
            from security.origin_attestation import verify_origin
            origin = verify_origin(self._code_root)
            if not origin['genuine']:
                logger.critical(
                    f"ORIGIN ATTESTATION FAILED: {origin['details']}")
        except Exception as e:
            logger.warning(f"Origin attestation check error: {e}")

    def _on_tamper_detected(self) -> None:
        """Respond to tampering: stop gossip, log changed files."""
        # Log which files changed
        try:
            from security.node_integrity import compute_file_manifest
            if self._boot_manifest_snapshot:
                current = compute_file_manifest(self._code_root)
                for path, boot_hash in self._boot_manifest_snapshot.items():
                    # A file missing from the current manifest yields None
                    # here and is therefore reported as tampered too.
                    if current.get(path) != boot_hash:
                        logger.critical(f"TAMPERED FILE: {path}")
                for path in current:
                    if path not in self._boot_manifest_snapshot:
                        logger.critical(f"NEW FILE (post-boot): {path}")
        except Exception as e:
            logger.warning(f"Could not diff file manifest after tamper: {e}")

        # Stop gossip protocol
        try:
            from integrations.social.peer_discovery import gossip
            gossip.stop()
            logger.critical("Gossip protocol stopped due to code tampering")
        except Exception as e:
            # This is the network-disconnect response; a failure must be visible.
            logger.error(f"Failed to stop gossip protocol after tamper: {e}")

        self._running = False

    def _check_loop_once_for_test(self) -> None:
        """Run a single integrity check (for testing only).

        Only sets the tampered flag; it does not trigger the tamper response.
        """
        try:
            from security.node_integrity import compute_code_hash
            current_hash = compute_code_hash(self._code_root)
            if current_hash != self._expected_hash:
                self._tampered = True
        except Exception as e:
            logger.debug(f"Single integrity check error: {e}")

    @property
    def is_healthy(self) -> bool:
        """Returns False if tampering detected."""
        return not self._tampered
def start_monitor(manifest: dict, code_root: str = None) -> RuntimeIntegrityMonitor:
    """Create the module-wide integrity monitor, start it and return it.

    The new instance replaces whatever ``_monitor`` held before. The
    original docstring notes this is called from init_social().
    """
    global _monitor
    monitor = RuntimeIntegrityMonitor(manifest, code_root=code_root)
    # Publish the instance before starting it, as the original does.
    _monitor = monitor
    monitor.start()
    return monitor
def get_monitor() -> Optional[RuntimeIntegrityMonitor]:
    """Return the module-wide monitor, or None if none has been created."""
    current = _monitor
    return current
def is_code_healthy() -> bool:
    """Report whether the code is considered untampered.

    Safe to call before any monitor exists: with no monitor there is no
    tamper information, so the answer is True.
    """
    monitor = _monitor
    # No monitor = no tamper info, so report healthy.
    return monitor is None or monitor.is_healthy