Coverage for security / runtime_monitor.py: 78.5%

107 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Runtime Integrity Monitor: Background daemon that periodically re-checks code hash 

3against the boot-time signed manifest. Detects tampering and disconnects from network. 

4""" 

5import os 

6import time 

7import logging 

8import threading 

9from typing import Optional 

10 

11logger = logging.getLogger('hevolve_security') 

12 

13_monitor: Optional['RuntimeIntegrityMonitor'] = None 

14 

15 

class RuntimeIntegrityMonitor:
    """Background daemon that periodically re-checks code hash against manifest.

    Every ``check_interval`` seconds the worker thread recomputes the code
    hash and compares it with ``manifest['code_hash']``, then verifies the
    guardrail values and the origin attestation. A code-hash or guardrail
    mismatch marks the monitor as tampered, logs the files that differ from
    the boot-time snapshot, stops the gossip protocol and ends the loop.
    A failed origin attestation is only logged.
    """

    _DEFAULT_CHECK_INTERVAL = 300
    _INTERVAL_ENV_VAR = 'HEVOLVE_TAMPER_CHECK_INTERVAL'

    def __init__(self, manifest: dict, check_interval: Optional[int] = None,
                 code_root: Optional[str] = None):
        """Prepare the monitor; no thread is started until start() is called.

        Args:
            manifest: Boot-time manifest; its 'code_hash' entry is the
                reference value for later checks.
            check_interval: Seconds between checks. When falsy or not
                positive, HEVOLVE_TAMPER_CHECK_INTERVAL is used, and if that
                is missing or invalid, 300 seconds.
            code_root: Passed through unchanged to the hashing helpers.
        """
        self._manifest = manifest
        # A manifest may carry an explicit None; normalise it so comparing
        # and formatting the hash later never depends on its type.
        self._expected_hash = manifest.get('code_hash') or ''
        self._check_interval = self._resolve_interval(check_interval)
        self._code_root = code_root
        self._running = False
        self._thread = None
        self._lock = threading.Lock()
        # Set by stop() to wake the worker out of its inter-check wait.
        self._stop_event = threading.Event()
        self._tampered = False
        self._boot_manifest_snapshot = None
        # Purge __pycache__ before snapshot - blocks bytecode injection
        try:
            from security.node_integrity import purge_pycache
            purge_pycache(code_root)
        except Exception as e:
            # Best effort: the monitor must still come up without the purge.
            logger.debug(f"__pycache__ purge skipped: {e}")
        # Snapshot file manifest at boot for diff on tamper
        try:
            from security.node_integrity import compute_file_manifest
            self._boot_manifest_snapshot = compute_file_manifest(code_root)
        except Exception as e:
            # Without a snapshot tampering is still detected, just not
            # attributed to individual files.
            logger.debug(f"Boot manifest snapshot unavailable: {e}")

    @classmethod
    def _resolve_interval(cls, check_interval):
        """Return a positive check interval from the argument, env var or default."""
        if check_interval:
            if check_interval > 0:
                return check_interval
            logger.warning(
                f"Ignoring non-positive check_interval={check_interval!r}")
        raw = os.environ.get(cls._INTERVAL_ENV_VAR,
                             str(cls._DEFAULT_CHECK_INTERVAL))
        try:
            interval = int(raw)
        except ValueError:
            interval = 0
        if interval > 0:
            return interval
        # A bad setting must not disable monitoring or turn it into a busy loop.
        logger.warning(
            f"Invalid {cls._INTERVAL_ENV_VAR}={raw!r}; "
            f"using default {cls._DEFAULT_CHECK_INTERVAL}s")
        return cls._DEFAULT_CHECK_INTERVAL

    def start(self) -> None:
        """Start the background monitoring thread (daemon=True).

        Does nothing when the monitor is already running.
        """
        with self._lock:
            if self._running:
                return
            self._running = True
            # Fresh event per run: a worker from an earlier run that has not
            # exited yet keeps its own (already set) event and still stops.
            self._stop_event = threading.Event()
            self._thread = threading.Thread(
                target=self._check_loop,
                name='runtime-integrity-monitor',
                daemon=True)
            self._thread.start()
        logger.info(f"Runtime integrity monitor started (interval={self._check_interval}s)")

    def stop(self) -> None:
        """Stop the monitor and wait up to 10 seconds for the worker to exit."""
        with self._lock:
            self._running = False
            self._stop_event.set()  # wakes the worker if it is waiting
            worker = self._thread
        # Joining outside the lock avoids blocking start(); a thread cannot
        # join itself, so skip the join when called from the worker.
        if worker is not None and worker is not threading.current_thread():
            worker.join(timeout=10)

    def _wd_heartbeat(self):
        """Send heartbeat to watchdog between potentially blocking checks."""
        try:
            from security.node_watchdog import get_watchdog
            wd = get_watchdog()
            if wd:
                wd.heartbeat('runtime_monitor')
        except Exception as e:
            # The watchdog is optional; its absence must not stop the checks.
            logger.debug(f"Watchdog heartbeat skipped: {e}")

    def _check_loop(self) -> None:
        """Background loop: periodic code hash + guardrail hash verification.

        Returns when stop() is called or as soon as tampering is detected.
        """
        stop_event = self._stop_event
        while self._running:
            # Interruptible sleep: wait() returns True as soon as stop()
            # sets the event, False after the full interval has elapsed.
            if stop_event.wait(self._check_interval):
                break
            if not self._running:
                break
            self._wd_heartbeat()

            try:
                from security.node_integrity import compute_code_hash
                current_hash = compute_code_hash(self._code_root)
            except Exception as e:
                logger.warning(f"Runtime integrity check error: {e}")
            else:
                if current_hash != self._expected_hash:
                    # Record the result before logging so that a formatting
                    # problem can never hide a detected mismatch.
                    self._tampered = True
                    logger.critical(
                        f"TAMPERING DETECTED: code hash changed from "
                        f"{str(self._expected_hash)[:16]}... to "
                        f"{str(current_hash)[:16]}...")
                    self._on_tamper_detected()
                    return  # Stop checking after tamper

            self._wd_heartbeat()

            # Guardrail values integrity check
            try:
                from security.hive_guardrails import verify_guardrail_integrity
                guardrails_ok = verify_guardrail_integrity()
            except Exception as e:
                logger.warning(f"Guardrail integrity check error: {e}")
            else:
                if not guardrails_ok:
                    self._tampered = True
                    logger.critical(
                        "GUARDRAIL TAMPERING DETECTED: frozen values hash changed")
                    self._on_tamper_detected()
                    return

            self._wd_heartbeat()

            # Origin attestation check - detect branding removal.
            # A failure here is reported but does not stop the monitor.
            try:
                from security.origin_attestation import verify_origin
                origin = verify_origin(self._code_root)
                if not origin['genuine']:
                    logger.critical(
                        f"ORIGIN ATTESTATION FAILED: {origin['details']}")
            except Exception as e:
                logger.debug(f"Origin attestation check skipped: {e}")

    def _on_tamper_detected(self) -> None:
        """Respond to tampering: stop gossip, log changed files."""
        # Log which files changed
        try:
            from security.node_integrity import compute_file_manifest
            if self._boot_manifest_snapshot:
                current = compute_file_manifest(self._code_root)
                for path, boot_hash in self._boot_manifest_snapshot.items():
                    # A file missing from the current manifest yields None
                    # and is therefore reported here as well.
                    if current.get(path) != boot_hash:
                        logger.critical(f"TAMPERED FILE: {path}")
                for path in current:
                    if path not in self._boot_manifest_snapshot:
                        logger.critical(f"NEW FILE (post-boot): {path}")
        except Exception as e:
            logger.warning(f"Could not diff file manifest after tamper: {e}")

        # Stop gossip protocol
        try:
            from integrations.social.peer_discovery import gossip
            gossip.stop()
            logger.critical("Gossip protocol stopped due to code tampering")
        except Exception as e:
            # The node may still be gossiping after tampering - make it visible.
            logger.error(f"Failed to stop gossip protocol after tamper: {e}")

        self._running = False

    def _check_loop_once_for_test(self) -> None:
        """Run a single integrity check (for testing only).

        Only compares the code hash and sets the tampered flag; it does not
        log or trigger the tamper response.
        """
        try:
            from security.node_integrity import compute_code_hash
            current_hash = compute_code_hash(self._code_root)
            if current_hash != self._expected_hash:
                self._tampered = True
        except Exception as e:
            logger.debug(f"Single integrity check failed: {e}")

    @property
    def is_healthy(self) -> bool:
        """Returns False if tampering detected."""
        return not self._tampered

157 

158 

def start_monitor(manifest: dict, code_root: str = None) -> RuntimeIntegrityMonitor:
    """Create the module-wide runtime integrity monitor and start it.

    Called from init_social(). The new instance is stored in the module
    global before its thread is started, replacing any earlier reference,
    and is returned to the caller.
    """
    global _monitor
    instance = RuntimeIntegrityMonitor(manifest, code_root=code_root)
    _monitor = instance
    instance.start()
    return instance

165 

166 

def get_monitor() -> Optional[RuntimeIntegrityMonitor]:
    """Return the module-wide monitor instance, or None if none has been set."""
    return _monitor

170 

171 

def is_code_healthy() -> bool:
    """Report whether the code is considered untampered.

    Safe to call before any monitor exists: with no monitor there is no
    tamper information, so the answer is True. Otherwise the monitor's
    ``is_healthy`` value is returned.
    """
    monitor = _monitor
    return True if monitor is None else monitor.is_healthy