Coverage for security / source_protection.py: 79.7%
231 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
Source Protection Service — HevolveAI integrity verification.

Multi-layer defense for HevolveAI source code:
  1. pip install: SSH key required (git+ssh://)
  2. Nunba bundling: .pyc only (source stripped)
  3. Boot verification: hash manifest signed by build node
  4. Runtime gating: certificate tier + CCT gates feature access
  5. inspect.getsource() blocking: prevents runtime source extraction

This module answers:
  - Is HevolveAI installed? How? (SSH, HTTPS, wheel, bundled)
  - Is the source code visible? (Should be False in production)
  - Does the installed code match the known-good manifest?

If the integrity check fails → disable in-process mode, force HTTP fallback.
17"""
18import hashlib
19import importlib
20import importlib.abc
21import importlib.machinery
22import inspect
23import json
24import logging
25import os
26import sys
27import threading
28import time
29from pathlib import Path
30from types import ModuleType
31from typing import Callable, Dict, List, Optional
# Module logger, registered under the name 'hevolve_security'.
logger = logging.getLogger('hevolve_security')

# Location of the known-good manifest (populated by the CI/CD build).
# The HEVOLVE_HEVOLVEAI_MANIFEST environment variable overrides the default,
# which is a file named hevolveai_manifest.json next to this module.
_DEFAULT_MANIFEST_PATH = os.path.join(
    os.path.dirname(__file__), 'hevolveai_manifest.json')
_MANIFEST_PATH = os.environ.get(
    'HEVOLVE_HEVOLVEAI_MANIFEST', _DEFAULT_MANIFEST_PATH)
class SourceProtectionService:
    """Verifies HevolveAI installation integrity.

    Called at boot and periodically to ensure the installed HevolveAI
    code matches the signed manifest. Mismatch → HTTP fallback only.

    Every method is a static method; the class only groups them.
    """

    @staticmethod
    def _find_hevolveai_spec():
        """Return the import spec of ``hevolveai``, or None if it is not found.

        The package is located without being imported.
        """
        # ``import importlib`` does not load the ``util`` submodule, so it is
        # imported explicitly instead of relying on another module having
        # loaded it first.
        import importlib.util

        try:
            return importlib.util.find_spec('hevolveai')
        except (ImportError, ValueError):
            return None

    @staticmethod
    def _classify_direct_url(url: str) -> Optional[str]:
        """Map the ``url`` field of direct_url.json to an install method.

        Returns 'git_ssh', 'git_https', or None when the URL matches neither.
        """
        # The scheme is checked first so that an HTTPS URL carrying a user
        # name (https://git@host/...) is not mistaken for an SSH URL.
        if url.startswith(('ssh://', 'git+ssh://')):
            return 'git_ssh'
        if url.startswith(('https://', 'git+https://')):
            return 'git_https'
        if 'git@' in url:
            return 'git_ssh'
        return None

    @staticmethod
    def check_install_method() -> str:
        """Detect how HevolveAI was installed.

        Returns one of:
            'git_ssh'       — pip install from SSH URL
            'git_https'     — pip install from HTTPS URL
            'pip_wheel'     — installed from a wheel/sdist
            'bundled_pyc'   — .pyc only (Nunba build)
            'bundled_cython'— .so/.pyd (Cython compiled)
            'not_installed' — HevolveAI not found
            'unknown'       — detected but method unclear
        """
        spec = SourceProtectionService._find_hevolveai_spec()
        if spec is None:
            return 'not_installed'

        origin = spec.origin or ''

        # Compiled extension module
        if origin.endswith(('.so', '.pyd')):
            return 'bundled_cython'

        # Bytecode only
        if origin.endswith('.pyc'):
            return 'bundled_pyc'

        # Everything else is decided from the installed distribution's
        # metadata directory, located through importlib.metadata (the
        # directory name contains the version, so it cannot be guessed from
        # the package path).
        try:
            from importlib.metadata import distribution
            dist = distribution('hevolveai')
        except (ImportError, OSError, ValueError):
            # No importlib.metadata, or no distribution metadata on sys.path.
            return 'unknown'

        # direct_url.json is written by pip for direct-URL / VCS installs.
        try:
            raw = dist.read_text('direct_url.json')
            if raw:
                url = json.loads(raw).get('url', '')
                method = SourceProtectionService._classify_direct_url(url)
                if method is not None:
                    return method
        except (OSError, ValueError, AttributeError, TypeError):
            # Unreadable or malformed file: fall through to the installer check.
            pass

        # Fallback: the INSTALLER file names the tool that performed the
        # install. It is a separate file, not a METADATA header.
        try:
            installer = (dist.read_text('INSTALLER') or '').strip()
        except (OSError, ValueError):
            installer = ''
        if installer:
            return 'pip_wheel'

        return 'unknown'

    @staticmethod
    def is_source_visible() -> bool:
        """Check if HevolveAI .py source files are present.

        In production (Nunba builds), only .pyc should exist.
        Returns True if .py source is found (bad for production), False
        otherwise — including when the package cannot be located or its spec
        has no origin.
        """
        spec = SourceProtectionService._find_hevolveai_spec()
        if spec is None or not spec.origin:
            return False

        # If the spec origin itself is .py, source is visible
        if spec.origin.endswith('.py'):
            return True

        # Otherwise look for .py files anywhere below the package directories.
        for loc in spec.submodule_search_locations or ():
            loc_path = Path(loc)
            if not loc_path.exists():
                continue
            # __init__.py stubs are tolerated (often left as .py)
            if any(f.name != '__init__.py' for f in loc_path.glob('**/*.py')):
                return True
        return False

    @staticmethod
    def verify_hevolveai_integrity() -> Dict:
        """Verify installed HevolveAI against known-good manifest.

        Returns:
            {
                'verified': bool,
                'install_method': str,
                'source_visible': bool,
                'mismatched_files': list,
                'missing_files': list,
                'extra_files': list,
            }
            plus an 'error' string whenever verification could not be
            carried out. 'verified' is True only when every file listed in
            the manifest is present with a matching hash; files on disk that
            the manifest does not list are reported in 'extra_files' but do
            not affect 'verified'.
        """
        service = SourceProtectionService
        result: Dict = {
            'verified': False,
            'install_method': service.check_install_method(),
            'source_visible': service.is_source_visible(),
            'mismatched_files': [],
            'missing_files': [],
            'extra_files': [],
        }

        if result['install_method'] == 'not_installed':
            result['error'] = 'HevolveAI not installed'
            return result

        # Load manifest. No manifest = cannot verify = fail-closed.
        manifest = service._load_manifest()
        if manifest is None:
            result['error'] = 'manifest not found or invalid'
            return result

        # A manifest that lists no files would otherwise "verify" any
        # installation vacuously, so it is treated as unverifiable too.
        expected = manifest.get('files')
        if not isinstance(expected, dict) or not expected:
            result['error'] = 'manifest lists no files'
            return result

        # Find HevolveAI package root
        try:
            spec = service._find_hevolveai_spec()
            if spec is None or not spec.submodule_search_locations:
                result['error'] = 'cannot locate HevolveAI package'
                return result
            pkg_root = Path(list(spec.submodule_search_locations)[0])
        except Exception as e:
            result['error'] = f'package location error: {e}'
            return result

        # Compare file hashes. Entries are popped from ``actual`` as they are
        # matched, so whatever remains afterwards is not in the manifest.
        actual = service._compute_package_hashes(pkg_root)
        for rel_path, expected_hash in expected.items():
            actual_hash = actual.pop(rel_path, None)
            if actual_hash is None:
                result['missing_files'].append(rel_path)
            elif actual_hash != expected_hash:
                result['mismatched_files'].append(rel_path)

        result['extra_files'] = list(actual.keys())

        # Verified if no mismatches or missing files
        result['verified'] = (
            not result['mismatched_files'] and not result['missing_files']
        )
        return result

    @staticmethod
    def _load_manifest() -> Optional[Dict]:
        """Load the manifest file from ``_MANIFEST_PATH``.

        Returns the parsed JSON object, or None when the file is missing,
        unreadable, not valid JSON, or not a JSON object. This method only
        parses the file; it performs no signature check.
        """
        try:
            with open(_MANIFEST_PATH, 'r', encoding='utf-8') as f:
                manifest = json.load(f)
        except (OSError, ValueError):
            # OSError covers missing/unreadable paths; ValueError covers both
            # JSON syntax errors and undecodable bytes.
            return None
        return manifest if isinstance(manifest, dict) else None

    @staticmethod
    def _compute_package_hashes(pkg_root: Path) -> Dict[str, str]:
        """Compute SHA-256 hashes for all files in the package.

        Returns a dict mapping each file's path relative to ``pkg_root``
        ('/'-separated) to its hex digest. Files whose name starts with '.'
        are skipped. A file that cannot be read is left out, which makes it
        show up as missing when compared against a manifest.
        """
        hashes: Dict[str, str] = {}
        if not pkg_root.exists():
            return hashes

        for path in sorted(pkg_root.rglob('*')):
            if not path.is_file() or path.name.startswith('.'):
                continue
            rel = path.relative_to(pkg_root).as_posix()
            h = hashlib.sha256()
            try:
                with open(path, 'rb') as f:
                    for chunk in iter(lambda: f.read(8192), b''):
                        h.update(chunk)
            except OSError:
                continue
            hashes[rel] = h.hexdigest()
        return hashes
def compute_dependency_hash(package_name: str) -> Optional[str]:
    """Compute a combined SHA-256 hash of all files in an installed package.

    Useful for node_integrity to include dependency hashes in the
    overall code hash for tamper detection.

    The package is located without being imported. Its files are visited in
    sorted path order and their raw bytes are fed into one SHA-256; files
    whose name starts with '.' and files that cannot be read are skipped.
    Only file contents contribute to the digest, not file names.

    Args:
        package_name: pip package name (e.g. 'hevolveai' / HevolveAI)

    Returns:
        hex digest string, or None if the package is not found, is not a
        package with a directory on disk, or no bytes could be hashed
    """
    # ``import importlib`` does not load the ``util`` submodule, so it is
    # imported explicitly instead of relying on another module having loaded
    # it first.
    import importlib.util

    if not isinstance(package_name, str) or not package_name:
        return None

    try:
        spec = importlib.util.find_spec(package_name)
    except (ImportError, ValueError):
        # ImportError also covers ModuleNotFoundError (missing parent package)
        # and relative names given without an anchor package.
        return None

    if spec is None or not spec.submodule_search_locations:
        return None

    pkg_root = Path(list(spec.submodule_search_locations)[0])
    if not pkg_root.exists():
        return None

    combined = hashlib.sha256()
    for path in sorted(pkg_root.rglob('*')):
        if not path.is_file() or path.name.startswith('.'):
            continue
        try:
            with open(path, 'rb') as f:
                for chunk in iter(lambda: f.read(8192), b''):
                    combined.update(chunk)
        except OSError:
            # Best effort: an unreadable file simply does not contribute.
            continue

    digest = combined.hexdigest()
    # A digest equal to that of empty input means nothing was hashed.
    return digest if digest != hashlib.sha256().hexdigest() else None
class CrawlIntegrityWatcher:
    """Periodic re-verification of HevolveAI package integrity post-boot.

    Mirrors RuntimeIntegrityMonitor's pattern but scoped to the HevolveAI
    package only. On tamper detection, fires registered callbacks instead
    of halting the hive — callers decide how to respond (e.g. disable
    in-process mode, fall back to HTTP).

    A change is reported only when both the boot-time hash and the current
    hash are non-empty and differ; an empty hash on either side is never
    reported.

    Env vars:
        HEVOLVE_TAMPER_CHECK_INTERVAL — seconds between checks (default 300)
    """

    # Interval used when neither the argument nor the env var is usable.
    _DEFAULT_CHECK_INTERVAL = 300

    def __init__(self, check_interval: Optional[int] = None):
        """Create a stopped watcher and snapshot the current package hash.

        Args:
            check_interval: seconds between checks. When omitted, zero or
                otherwise unusable, HEVOLVE_TAMPER_CHECK_INTERVAL is used,
                and 300 if that is unusable as well.
        """
        self._check_interval = self._resolve_interval(check_interval)
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()
        # Set by stop() to wake the loop out of its wait immediately.
        self._stop_event = threading.Event()
        self._tampered = False
        self._callbacks: List[Callable] = []
        # Snapshot the hash at construction (boot) time
        self._boot_hash: str = self._compute_current_hash()

    @staticmethod
    def _resolve_interval(check_interval):
        """Pick a positive check interval from the argument, env var or default."""
        default = CrawlIntegrityWatcher._DEFAULT_CHECK_INTERVAL
        if check_interval:
            if isinstance(check_interval, (int, float)) and check_interval > 0:
                return check_interval
            logger.warning(
                "[CrawlIntegrityWatcher] Ignoring invalid check_interval=%r",
                check_interval)

        raw = os.environ.get(
            'HEVOLVE_TAMPER_CHECK_INTERVAL', str(default))
        try:
            value = int(raw)
        except (TypeError, ValueError):
            value = 0
        if value <= 0:
            # A non-numeric or non-positive value must not break construction
            # or make the loop's wait fail.
            logger.warning(
                "[CrawlIntegrityWatcher] Invalid "
                "HEVOLVE_TAMPER_CHECK_INTERVAL=%r, using %ss", raw, default)
            return default
        return value

    # ── Public API ──────────────────────────────────────────────

    def register_tamper_callback(self, callback: Callable) -> None:
        """Register a callable invoked when tampering is detected.

        Called exactly once per watcher lifetime (stops after first detection).
        The callable is invoked with no arguments; an exception it raises is
        logged and does not prevent the remaining callbacks from running.
        """
        with self._lock:
            self._callbacks.append(callback)

    def start(self) -> None:
        """Start the background monitoring thread (daemon=True).

        Does nothing if the watcher is already running.
        """
        with self._lock:
            if self._running:
                return
            self._running = True
            # Each run gets its own event, so a previous loop that is still
            # finishing a hash keeps seeing its own (set) event and exits
            # instead of being revived by this restart.
            self._stop_event = threading.Event()
            self._thread = threading.Thread(
                target=self._check_loop, daemon=True,
                name='crawl_integrity_watcher')
            self._thread.start()
        if self._boot_hash:
            logger.info(
                f"[CrawlIntegrityWatcher] Started "
                f"(interval={self._check_interval}s, "
                f"boot_hash={self._boot_hash[:16]}...)")
        else:
            logger.info(
                "[CrawlIntegrityWatcher] Started (HevolveAI not installed)")

    def stop(self) -> None:
        """Stop the watcher gracefully.

        Wakes the loop and waits up to 10 seconds for the thread to finish.
        Safe to call from a tamper callback: a thread cannot join itself, so
        no join is attempted when called from the watcher thread.
        """
        with self._lock:
            self._running = False
            self._stop_event.set()
            thread = self._thread
        if thread is None or thread is threading.current_thread():
            return
        if thread.is_alive():
            thread.join(timeout=10)

    @property
    def is_healthy(self) -> bool:
        """False if tampering was detected."""
        return not self._tampered

    # ── Internal loop ────────────────────────────────────────────

    def _check_loop(self) -> None:
        """Background loop: re-hash HevolveAI every interval."""
        stop_event = self._stop_event
        while self._running and not stop_event.is_set():
            # wait() returns True as soon as stop() sets the event, so the
            # loop ends promptly instead of sleeping out the interval.
            if stop_event.wait(self._check_interval):
                break
            if not self._running:
                break
            try:
                current = self._changed_hash()
                if current:
                    logger.critical(
                        f"[CrawlIntegrityWatcher] TAMPERING DETECTED: "
                        f"HevolveAI hash changed from "
                        f"{self._boot_hash[:16]}... "
                        f"to {current[:16]}...")
                    self._tampered = True
                    self._on_tamper_detected()
                    return  # Stop after first detection
            except Exception as e:
                logger.warning(
                    f"[CrawlIntegrityWatcher] Integrity check error: {e}")

    def _changed_hash(self) -> str:
        """Return the current hash if it differs from the boot hash, else ''.

        An empty current or boot hash is never treated as a change.
        """
        current = self._compute_current_hash()
        if current and self._boot_hash and current != self._boot_hash:
            return current
        return ''

    def _on_tamper_detected(self) -> None:
        """Mark the watcher as stopped and fire all registered callbacks."""
        with self._lock:
            callbacks = list(self._callbacks)
            self._running = False
        # Callbacks run outside the lock so they may call back into the
        # watcher (e.g. stop()).
        for cb in callbacks:
            try:
                cb()
            except Exception as e:
                logger.warning(
                    f"[CrawlIntegrityWatcher] Callback error: {e}")

    def _compute_current_hash(self) -> str:
        """Compute combined SHA-256 over all HevolveAI package files.

        Returns '' when compute_dependency_hash() yields no digest.
        """
        return compute_dependency_hash('hevolveai') or ''

    # ── Test helper ──────────────────────────────────────────────

    def _check_once_for_test(self) -> None:
        """Run a single hash comparison without sleeping (testing only).

        Errors raised while hashing are swallowed.
        """
        try:
            if self._changed_hash():
                self._tampered = True
                self._on_tamper_detected()
        except Exception:
            pass
# ── Runtime Source Extraction Blocking ────────────────────────
#
# Even if .py files somehow survive stripping, this prevents
# inspect.getsource(), inspect.getsourcelines(),
# inspect.getsourcefile() and inspect.findsource() from returning
# hevolveai code.

# Top-level package names whose objects are refused by the guards.
_PROTECTED_PACKAGES = ('hevolveai',)

# References to the inspect functions as they are when this module is
# imported; the guarded replacements delegate to these.
(
    _original_getsource,
    _original_getsourcelines,
    _original_getsourcefile,
    _original_findsource,
) = (
    inspect.getsource,
    inspect.getsourcelines,
    inspect.getsourcefile,
    inspect.findsource,
)
def _is_protected_filename(filename) -> bool:
    """Check if *filename* lies inside a loaded protected package.

    Only packages currently present in ``sys.modules`` are considered; their
    ``__path__`` entries (or ``__file__`` for a single-file module) are
    compared against the absolute form of *filename*.
    """
    if not filename or not isinstance(filename, str):
        return False
    target = os.path.normcase(os.path.abspath(filename))
    for pkg in _PROTECTED_PACKAGES:
        module = sys.modules.get(pkg)
        if module is None:
            continue
        for root in list(getattr(module, '__path__', None) or ()):
            root = os.path.normcase(os.path.abspath(root)).rstrip(os.sep)
            if target == root or target.startswith(root + os.sep):
                return True
        single = getattr(module, '__file__', None)
        if single and os.path.normcase(os.path.abspath(single)) == target:
            return True
    return False


def _is_protected_object(obj) -> bool:
    """Check if obj belongs to a protected package.

    Handles every kind of object the inspect source functions accept:
    anything carrying ``__module__`` (functions, classes, methods), modules,
    and also tracebacks, frames and code objects. The latter three have no
    ``__module__``; frames are matched through the ``__name__`` of their
    globals and code objects through their ``co_filename``.
    """
    def _protected_name(name) -> bool:
        return isinstance(name, str) and any(
            name == pkg or name.startswith(pkg + '.')
            for pkg in _PROTECTED_PACKAGES)

    if _protected_name(getattr(obj, '__module__', None)):
        return True

    # For modules directly
    if isinstance(obj, ModuleType):
        return _protected_name(getattr(obj, '__name__', ''))

    # Walk traceback → frame → code, checking at each step.
    if inspect.istraceback(obj):
        obj = obj.tb_frame
    if inspect.isframe(obj):
        if _protected_name(obj.f_globals.get('__name__')):
            return True
        obj = obj.f_code
    if inspect.iscode(obj):
        # inspect.getmodule() is deliberately not used here: it calls
        # inspect.getsourcefile(), which is one of the guarded functions.
        return _is_protected_filename(obj.co_filename)
    return False
def _guarded_getsource(obj):
    """Stand-in for inspect.getsource that refuses protected packages.

    Raises OSError for an object of a protected package; every other object
    is passed to the original inspect.getsource unchanged.
    """
    if not _is_protected_object(obj):
        return _original_getsource(obj)
    label = getattr(obj, '__name__', obj)
    raise OSError(f"source code not available for {label}")
def _guarded_getsourcelines(obj):
    """Stand-in for inspect.getsourcelines that refuses protected packages.

    Raises OSError for an object of a protected package; every other object
    is passed to the original inspect.getsourcelines unchanged.
    """
    if not _is_protected_object(obj):
        return _original_getsourcelines(obj)
    label = getattr(obj, '__name__', obj)
    raise OSError(f"source code not available for {label}")
def _guarded_getsourcefile(obj):
    """Stand-in for inspect.getsourcefile that refuses protected packages.

    Returns None for an object of a protected package; every other object is
    passed to the original inspect.getsourcefile unchanged.
    """
    protected = _is_protected_object(obj)
    return None if protected else _original_getsourcefile(obj)
def _guarded_findsource(obj):
    """Stand-in for inspect.findsource that refuses protected packages.

    Raises OSError for an object of a protected package; every other object
    is passed to the original inspect.findsource unchanged.
    """
    if not _is_protected_object(obj):
        return _original_findsource(obj)
    label = getattr(obj, '__name__', obj)
    raise OSError(f"source code not available for {label}")
def install_source_guards():
    """Monkey-patch inspect module to block source extraction for protected packages.

    Replaces inspect.getsource, inspect.getsourcelines,
    inspect.getsourcefile and inspect.findsource with the guarded versions
    and logs the list of protected packages.

    Call this at application boot (after imports, before serving requests).
    Safe to call multiple times (idempotent).
    """
    replacements = (
        ('getsource', _guarded_getsource),
        ('getsourcelines', _guarded_getsourcelines),
        ('getsourcefile', _guarded_getsourcefile),
        ('findsource', _guarded_findsource),
    )
    for attr_name, guard in replacements:
        setattr(inspect, attr_name, guard)

    message = ("[SourceProtection] inspect.getsource() guards installed "
               f"for packages: {_PROTECTED_PACKAGES}")
    logger.info(message)