Coverage for security / source_protection.py: 79.7%

231 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Source Protection Service — HevolveAI integrity verification. 

3 

4Multi-layer defense for HevolveAI source code: 

5 1. pip install: SSH key required (git+ssh://) 

6 2. Nunba bundling: .pyc only (source stripped) 

7 3. Boot verification: hash manifest signed by build node 

8 4. Runtime gating: certificate tier + CCT gates feature access 

9 5. inspect.getsource() blocking: prevents runtime source extraction 

10 

11This module answers: 

12 - Is HevolveAI installed? How? (SSH, HTTPS, wheel, bundled) 

13 - Is the source code visible? (Should be False in production) 

14 - Does the installed code match the known-good manifest? 

15 

16If integrity check fails → disable in-process mode, force HTTP fallback. 

17""" 

18import hashlib 

19import importlib 

20import importlib.abc 

21import importlib.machinery 

22import inspect 

23import json 

24import logging 

25import os 

26import sys 

27import threading 

28import time 

29from pathlib import Path 

30from types import ModuleType 

31from typing import Callable, Dict, List, Optional 

32 

# Logger for this module; the name is a fixed string rather than __name__.
logger = logging.getLogger('hevolve_security')

# Path to the known-good manifest (populated by CI/CD build).  The
# HEVOLVE_HEVOLVEAI_MANIFEST environment variable, when set, overrides the
# default file that sits next to this module.
_MANIFEST_PATH = os.environ.get('HEVOLVE_HEVOLVEAI_MANIFEST')
if _MANIFEST_PATH is None:
    _MANIFEST_PATH = os.path.join(
        os.path.dirname(__file__), 'hevolveai_manifest.json')

40 

41 

class SourceProtectionService:
    """Verifies HevolveAI installation integrity.

    Called at boot and periodically to ensure the installed HevolveAI
    code matches the signed manifest. Mismatch → HTTP fallback only.

    NOTE(review): the manifest is described as signed, but nothing in this
    class checks a signature — confirm that it is verified elsewhere.
    """

    @staticmethod
    def _find_hevolveai_spec():
        """Return the import spec for ``hevolveai`` or None if unavailable."""
        # ``import importlib`` alone does not guarantee that the
        # ``importlib.util`` submodule is loaded, so import it explicitly.
        import importlib.util

        try:
            return importlib.util.find_spec('hevolveai')
        except (ImportError, ValueError):
            return None

    @staticmethod
    def _method_from_direct_url(dist) -> Optional[str]:
        """Classify an install from the distribution's ``direct_url.json``.

        Args:
            dist: an ``importlib.metadata`` distribution object.

        Returns:
            'git_ssh', 'git_https', or None when the file is absent,
            unreadable, or carries a URL of another kind.
        """
        try:
            raw = dist.read_text('direct_url.json')
            url = json.loads(raw).get('url', '') if raw else ''
        except (OSError, ValueError, AttributeError):
            # Unreadable file, invalid JSON, or JSON that is not an object.
            return None
        if not isinstance(url, str):
            return None
        if url.startswith('ssh://') or 'git@' in url:
            return 'git_ssh'
        if url.startswith('https://'):
            return 'git_https'
        return None

    @staticmethod
    def check_install_method() -> str:
        """Detect how HevolveAI was installed.

        Returns one of:
            'git_ssh'       — pip install from SSH URL
            'git_https'     — pip install from HTTPS URL
            'pip_wheel'     — installed from a wheel/sdist
            'bundled_pyc'   — .pyc only (Nunba build)
            'bundled_cython'— .so/.pyd (Cython compiled)
            'not_installed' — HevolveAI not found
            'unknown'       — detected but method unclear
        """
        spec = SourceProtectionService._find_hevolveai_spec()
        if spec is None:
            return 'not_installed'

        origin = spec.origin or ''

        # Compiled extensions
        if origin.endswith(('.so', '.pyd')):
            return 'bundled_cython'

        # Bytecode only
        if origin.endswith('.pyc'):
            return 'bundled_pyc'

        # Ask the installed distribution itself for its dist-info files.  The
        # directory name includes the version, so it cannot be derived from
        # the package path alone.
        try:
            from importlib.metadata import distribution
            dist = distribution('hevolveai')
        except (ImportError, OSError):
            # No importlib.metadata, or no metadata for the package
            # (PackageNotFoundError is an ImportError subclass).
            return 'unknown'

        # direct_url.json is written by pip for direct-URL / VCS installs.
        method = SourceProtectionService._method_from_direct_url(dist)
        if method:
            return method

        # Fallback: INSTALLER is a dist-info file naming the installing tool.
        try:
            installer = (dist.read_text('INSTALLER') or '').strip()
        except (OSError, ValueError):
            installer = ''
        return 'pip_wheel' if installer else 'unknown'

    @staticmethod
    def is_source_visible() -> bool:
        """Check if HevolveAI .py source files are present.

        In production (Nunba builds), only .pyc should exist.
        Returns True if .py source is found (bad for production).
        """
        spec = SourceProtectionService._find_hevolveai_spec()
        if spec is None or not spec.origin:
            return False

        # If the spec origin itself is .py, source is visible
        if spec.origin.endswith('.py'):
            return True

        # Otherwise scan the package directories; __init__.py stubs are
        # tolerated (often left as .py).  any() stops at the first hit.
        for loc in spec.submodule_search_locations or ():
            loc_path = Path(loc)
            if not loc_path.exists():
                continue
            if any(f.name != '__init__.py'
                   for f in loc_path.glob('**/*.py')):
                return True
        return False

    @staticmethod
    def verify_hevolveai_integrity() -> Dict:
        """Verify installed HevolveAI against known-good manifest.

        Fails closed: a missing, unreadable, malformed or empty manifest
        leaves ``verified`` False and sets an ``error`` entry.

        Returns:
            {
                'verified': bool,
                'install_method': str,
                'source_visible': bool,
                'mismatched_files': list,
                'missing_files': list,
                'extra_files': list,
                'error': str,   # only present when verification was aborted
            }
        """
        result: Dict = {
            'verified': False,
            'install_method': SourceProtectionService.check_install_method(),
            'source_visible': SourceProtectionService.is_source_visible(),
            'mismatched_files': [],
            'missing_files': [],
            'extra_files': [],
        }

        if result['install_method'] == 'not_installed':
            result['error'] = 'HevolveAI not installed'
            return result

        # No manifest = cannot verify = fail-closed
        manifest = SourceProtectionService._load_manifest()
        if manifest is None:
            result['error'] = 'manifest not found or invalid'
            return result

        # Find HevolveAI package root
        try:
            spec = SourceProtectionService._find_hevolveai_spec()
            if spec is None or not spec.submodule_search_locations:
                result['error'] = 'cannot locate HevolveAI package'
                return result
            pkg_root = Path(list(spec.submodule_search_locations)[0])
        except Exception as e:
            result['error'] = f'package location error: {e}'
            return result

        # A manifest that lists nothing would otherwise "verify" any package,
        # because there would be no mismatches and nothing missing.
        expected = manifest.get('files')
        if not isinstance(expected, dict) or not expected:
            result['error'] = 'manifest lists no files'
            return result

        # Compare file hashes; whatever remains in ``actual`` afterwards is
        # present on disk but unknown to the manifest.
        actual = SourceProtectionService._compute_package_hashes(pkg_root)
        for rel_path, expected_hash in expected.items():
            actual_hash = actual.pop(rel_path, None)
            if actual_hash is None:
                result['missing_files'].append(rel_path)
            elif actual_hash != expected_hash:
                result['mismatched_files'].append(rel_path)

        result['extra_files'] = list(actual)

        # Verified if no mismatches or missing files (extra files are
        # reported but do not fail verification).
        result['verified'] = (
            not result['mismatched_files'] and not result['missing_files']
        )
        return result

    @staticmethod
    def _load_manifest() -> Optional[Dict]:
        """Load the manifest file from ``_MANIFEST_PATH``.

        Returns:
            The parsed JSON object, or None when the file is missing,
            unreadable, not valid UTF-8 JSON, or not a JSON object.
        """
        try:
            with open(_MANIFEST_PATH, 'r', encoding='utf-8') as f:
                manifest = json.load(f)
        except (OSError, ValueError):
            # OSError: missing / permission / is-a-directory.
            # ValueError: JSONDecodeError and UnicodeDecodeError.
            return None
        return manifest if isinstance(manifest, dict) else None

    @staticmethod
    def _compute_package_hashes(pkg_root: Path) -> Dict[str, str]:
        """Compute SHA-256 hashes for all files in the package.

        Args:
            pkg_root: directory to walk recursively.

        Returns:
            Mapping of '/'-separated path relative to ``pkg_root`` to hex
            digest.  Files whose name starts with '.' are skipped; files
            that cannot be read are omitted (and therefore show up as
            missing when compared against a manifest).
        """
        hashes: Dict[str, str] = {}
        if not pkg_root.exists():
            return hashes

        for path in sorted(pkg_root.rglob('*')):
            if not path.is_file() or path.name.startswith('.'):
                continue
            rel = str(path.relative_to(pkg_root)).replace('\\', '/')
            digest = hashlib.sha256()
            try:
                with open(path, 'rb') as f:
                    for chunk in iter(lambda: f.read(8192), b''):
                        digest.update(chunk)
            except OSError as e:
                logger.debug(
                    f"[SourceProtection] cannot hash {rel}: {e}")
                continue
            hashes[rel] = digest.hexdigest()
        return hashes

240 

241 

def compute_dependency_hash(package_name: str) -> Optional[str]:
    """Compute a combined SHA-256 hash of all files in an installed package.

    Useful for node_integrity to include dependency hashes in the
    overall code hash for tamper detection.

    The digest covers file *contents* only, fed in sorted path order; file
    names are not mixed in.  Files whose name starts with '.' and files that
    cannot be read are skipped.

    Args:
        package_name: pip package name (e.g. 'hevolveai' / HevolveAI)

    Returns:
        hex digest string, or None if the package is not found, is not a
        package with a directory on disk, or no bytes could be hashed
    """
    # ``import importlib`` alone does not guarantee that the
    # ``importlib.util`` submodule is loaded, so import it explicitly.
    import importlib.util

    try:
        spec = importlib.util.find_spec(package_name)
    except (ImportError, ValueError):
        # ImportError also covers a dotted name whose parent fails to import.
        return None

    if spec is None or not spec.submodule_search_locations:
        return None

    pkg_root = Path(list(spec.submodule_search_locations)[0])
    if not pkg_root.exists():
        return None

    combined = hashlib.sha256()
    for path in sorted(pkg_root.rglob('*')):
        if not path.is_file() or path.name.startswith('.'):
            continue
        try:
            with open(path, 'rb') as f:
                for chunk in iter(lambda: f.read(8192), b''):
                    combined.update(chunk)
        except OSError:
            # Best effort: an unreadable file simply does not contribute.
            continue

    digest = combined.hexdigest()
    # The digest of zero bytes means nothing was hashed; report "not found".
    return digest if digest != hashlib.sha256().hexdigest() else None

278 

279 

class CrawlIntegrityWatcher:
    """Periodic re-verification of HevolveAI package integrity post-boot.

    Mirrors RuntimeIntegrityMonitor's pattern but scoped to the HevolveAI
    package only. On tamper detection, fires registered callbacks instead
    of halting the hive — callers decide how to respond (e.g. disable
    in-process mode, fall back to HTTP).

    Env vars:
        HEVOLVE_TAMPER_CHECK_INTERVAL — seconds between checks (default 300)
    """

    _DEFAULT_CHECK_INTERVAL = 300

    def __init__(self, check_interval: Optional[int] = None):
        """Create the watcher and snapshot the current package hash.

        Args:
            check_interval: seconds between checks.  None or a non-positive
                value falls back to HEVOLVE_TAMPER_CHECK_INTERVAL, then 300.
        """
        if check_interval is not None and check_interval > 0:
            self._check_interval = check_interval
        else:
            self._check_interval = self._interval_from_env()
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()
        # Set to wake the background thread immediately when stopping.
        self._stop_event = threading.Event()
        self._tampered = False
        self._callbacks: List[Callable] = []
        # Snapshot the hash at construction (boot) time
        self._boot_hash: str = self._compute_current_hash()

    @classmethod
    def _interval_from_env(cls) -> int:
        """Read the check interval from the environment, defaulting safely."""
        raw = os.environ.get(
            'HEVOLVE_TAMPER_CHECK_INTERVAL', str(cls._DEFAULT_CHECK_INTERVAL))
        try:
            value = int(raw)
        except (TypeError, ValueError):
            value = 0
        if value <= 0:
            logger.warning(
                f"[CrawlIntegrityWatcher] Invalid "
                f"HEVOLVE_TAMPER_CHECK_INTERVAL={raw!r}; "
                f"using {cls._DEFAULT_CHECK_INTERVAL}s")
            return cls._DEFAULT_CHECK_INTERVAL
        return value

    # ── Public API ──────────────────────────────────────────────

    def register_tamper_callback(self, callback: Callable) -> None:
        """Register a callable invoked when tampering is detected.

        Called exactly once per watcher lifetime (stops after first detection).
        """
        with self._lock:
            self._callbacks.append(callback)

    def start(self) -> None:
        """Start the background monitoring thread (daemon=True).

        Does nothing if the watcher is already running.
        """
        with self._lock:
            if self._running:
                return
            self._running = True
            # A fresh event per run, so a thread left over from an earlier
            # run keeps seeing its own (already set) stop signal.
            self._stop_event = threading.Event()
            self._thread = threading.Thread(
                target=self._check_loop, args=(self._stop_event,),
                daemon=True, name='crawl_integrity_watcher')
            self._thread.start()
        logger.info(
            f"[CrawlIntegrityWatcher] Started "
            f"(interval={self._check_interval}s, "
            f"boot_hash={self._boot_hash[:16]}...)"
            if self._boot_hash else
            "[CrawlIntegrityWatcher] Started (HevolveAI not installed)")

    def stop(self) -> None:
        """Stop the watcher gracefully.

        Wakes the background thread so it exits without waiting out the
        rest of its interval, then joins it for up to 10 seconds.
        """
        with self._lock:
            self._running = False
            self._stop_event.set()
            thread = self._thread
        # A tamper callback may call stop() from the watcher thread itself;
        # a thread cannot join itself.
        if (thread and thread.is_alive()
                and thread is not threading.current_thread()):
            thread.join(timeout=10)

    @property
    def is_healthy(self) -> bool:
        """False if tampering was detected."""
        return not self._tampered

    # ── Internal loop ────────────────────────────────────────────

    def _check_loop(self, stop_event: Optional[threading.Event] = None) -> None:
        """Background loop: re-hash HevolveAI every interval.

        Args:
            stop_event: event that ends the loop when set; defaults to the
                watcher's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event
        while self._running:
            # wait() returns True as soon as the event is set, so stop()
            # does not have to sit through a full interval.
            if stop_event.wait(self._check_interval):
                break
            if not self._running:
                break
            try:
                current = self._detect_changed_hash()
                if current:
                    logger.critical(
                        f"[CrawlIntegrityWatcher] TAMPERING DETECTED: "
                        f"HevolveAI hash changed from "
                        f"{self._boot_hash[:16]}... "
                        f"to {current[:16]}...")
                    self._tampered = True
                    self._on_tamper_detected()
                    return  # Stop after first detection
            except Exception as e:
                logger.warning(
                    f"[CrawlIntegrityWatcher] Integrity check error: {e}")

    def _detect_changed_hash(self) -> str:
        """Return the current hash if it differs from the boot hash, else ''.

        A missing boot hash or a missing current hash is not treated as a
        change.
        """
        current = self._compute_current_hash()
        if current and self._boot_hash and current != self._boot_hash:
            return current
        return ''

    def _on_tamper_detected(self) -> None:
        """Fire all registered callbacks and mark the watcher as stopped."""
        with self._lock:
            callbacks = list(self._callbacks)
            self._running = False
            self._stop_event.set()
        # Run callbacks outside the lock so they may call back into the
        # watcher; one failing callback must not prevent the others.
        for cb in callbacks:
            try:
                cb()
            except Exception as e:
                logger.warning(
                    f"[CrawlIntegrityWatcher] Callback error: {e}")

    def _compute_current_hash(self) -> str:
        """Compute combined SHA-256 over all HevolveAI package files."""
        return compute_dependency_hash('hevolveai') or ''

    # ── Test helper ──────────────────────────────────────────────

    def _check_once_for_test(self) -> None:
        """Run a single hash comparison without sleeping (testing only)."""
        try:
            if self._detect_changed_hash():
                self._tampered = True
                self._on_tamper_detected()
        except Exception:
            # Test-only helper: errors are deliberately ignored here.
            pass

392 

393 

394# ── Runtime Source Extraction Blocking ──────────────────────── 

395# 

396# Even if .py files somehow survive stripping, this prevents 

397# inspect.getsource(), inspect.getsourcelines(), and 

398# inspect.getsourcefile() from returning hevolveai code. 

399 

# Top-level package names whose source must not be returned by inspect.
_PROTECTED_PACKAGES = ('hevolveai',)

# References to the inspect functions as they are when this module is
# imported; the guarded replacements delegate to these for everything that
# is not protected.
(
    _original_getsource,
    _original_getsourcelines,
    _original_getsourcefile,
    _original_findsource,
) = (
    inspect.getsource,
    inspect.getsourcelines,
    inspect.getsourcefile,
    inspect.findsource,
)

406 

407 

def _is_protected_module_name(name) -> bool:
    """Return True if *name* is a protected package or one of its submodules."""
    if not isinstance(name, str):
        return False
    return any(name == pkg or name.startswith(pkg + '.')
               for pkg in _PROTECTED_PACKAGES)


def _is_protected_object(obj) -> bool:
    """Check if obj belongs to a protected package.

    Handles every kind of object the inspect source functions accept:
    modules, classes, methods and functions (via ``__module__`` /
    ``__name__``), plus tracebacks, frames and code objects, which carry no
    ``__module__`` attribute.

    Args:
        obj: the object handed to an ``inspect`` source function.

    Returns:
        True when the object is attributed to a package listed in
        ``_PROTECTED_PACKAGES``.
    """
    # A traceback is judged by the frame it refers to.
    if inspect.istraceback(obj):
        obj = obj.tb_frame

    # A frame belongs to the module whose globals it executes in.
    if inspect.isframe(obj):
        return _is_protected_module_name(obj.f_globals.get('__name__'))

    # A code object only knows its file name; match it against the files of
    # the protected modules that are currently loaded (best effort: the
    # comparison is an exact string match).
    if inspect.iscode(obj):
        filename = obj.co_filename
        return any(
            _is_protected_module_name(name)
            and getattr(mod, '__file__', None) == filename
            for name, mod in list(sys.modules.items()))

    if _is_protected_module_name(getattr(obj, '__module__', None)):
        return True
    # For modules directly
    if isinstance(obj, ModuleType):
        return _is_protected_module_name(getattr(obj, '__name__', ''))
    return False

421 

422 

def _guarded_getsource(obj):
    """Stand-in for inspect.getsource that refuses protected packages.

    Raises OSError for objects flagged by ``_is_protected_object``; every
    other object is passed through to the saved original function.
    """
    if not _is_protected_object(obj):
        return _original_getsource(obj)
    raise OSError(f"source code not available for {getattr(obj, '__name__', obj)}")

428 

429 

def _guarded_getsourcelines(obj):
    """Stand-in for inspect.getsourcelines that refuses protected packages.

    Raises OSError for objects flagged by ``_is_protected_object``; every
    other object is passed through to the saved original function.
    """
    if not _is_protected_object(obj):
        return _original_getsourcelines(obj)
    raise OSError(f"source code not available for {getattr(obj, '__name__', obj)}")

435 

436 

def _guarded_getsourcefile(obj):
    """Stand-in for inspect.getsourcefile that hides protected packages.

    Returns None for objects flagged by ``_is_protected_object`` and the
    saved original function's result for everything else.
    """
    hidden = _is_protected_object(obj)
    return None if hidden else _original_getsourcefile(obj)

442 

443 

def _guarded_findsource(obj):
    """Stand-in for inspect.findsource that refuses protected packages.

    Raises OSError for objects flagged by ``_is_protected_object``; every
    other object is passed through to the saved original function.
    """
    if not _is_protected_object(obj):
        return _original_findsource(obj)
    raise OSError(f"source code not available for {getattr(obj, '__name__', obj)}")

449 

450 

def install_source_guards():
    """Monkey-patch inspect module to block source extraction for protected packages.

    Replaces inspect.getsource, getsourcelines, getsourcefile and findsource
    with the guarded versions defined in this module, then logs the
    protected package list.

    Call this at application boot (after imports, before serving requests).
    Safe to call multiple times (idempotent).
    """
    replacements = (
        ('getsource', _guarded_getsource),
        ('getsourcelines', _guarded_getsourcelines),
        ('getsourcefile', _guarded_getsourcefile),
        ('findsource', _guarded_findsource),
    )
    for attr_name, guard in replacements:
        setattr(inspect, attr_name, guard)
    logger.info("[SourceProtection] inspect.getsource() guards installed "
                f"for packages: {_PROTECTED_PACKAGES}")