Coverage for security / node_integrity.py: 92.1%
178 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Node Integrity: Ed25519 keypair management, code hashing, and signature operations.
3Provides cryptographic identity for peer verification in the HevolveSocial network.
4"""
5import os
6import json
7import hashlib
8import logging
9import shutil
10from pathlib import Path
11from typing import Optional, Tuple, Dict
13from cryptography.hazmat.primitives.asymmetric.ed25519 import (
14 Ed25519PrivateKey, Ed25519PublicKey,
15)
16from cryptography.hazmat.primitives import serialization
17from cryptography.exceptions import InvalidSignature
# Shared security logger; deliberately named 'hevolve_security' (not
# __name__) so all security modules log to one channel.
logger = logging.getLogger('hevolve_security')
def _resolve_key_dir():
    """Pick the directory that stores the node keypair files.

    Priority: explicit HEVOLVE_KEY_DIR override, then the directory of an
    absolute (non-:memory:) HEVOLVE_DB_PATH, else the relative
    'agent_data' folder.
    """
    override = os.environ.get('HEVOLVE_KEY_DIR')
    if override:
        return override
    db_location = os.environ.get('HEVOLVE_DB_PATH', '')
    if db_location not in ('', ':memory:') and os.path.isabs(db_location):
        return os.path.dirname(db_location)
    return 'agent_data'
# Key storage location, resolved once at import time.
_KEY_DIR = _resolve_key_dir()
_PRIVATE_KEY_FILE = 'node_private_key.pem'
_PUBLIC_KEY_FILE = 'node_public_key.pem'
# Root of the source tree hashed by compute_code_hash(); overridable via
# env var for bundled/frozen deployments where __file__ is not the
# project root. Default: parent of this file's directory.
_CODE_ROOT = os.environ.get('HEVOLVE_CODE_ROOT', os.path.dirname(
    os.path.dirname(os.path.abspath(__file__))))

# Module-level cache (populated lazily by get_or_create_keypair)
_private_key: Optional[Ed25519PrivateKey] = None
_public_key: Optional[Ed25519PublicKey] = None

# Directories excluded from code hash computation.
#
# Defense-in-depth (2026-04-19): when compute_code_hash is called in a
# cx_Freeze bundle and HEVOLVE_CODE_HASH_PRECOMPUTED is NOT set (e.g.,
# env var missing because app.py's setup block raised), the fallback
# walk runs against the install root. In a cx_Freeze layout that root
# contains `python-embed/` (stdlib + site-packages, 10k+ .py files),
# `lib/` (bundled .pyc modules), `lib_src/` (pycparser + cryptography
# source copies), `build/` (intermediate artifacts), `landing-page/`
# (React build output), `node_modules/` (already excluded). Without
# these in the exclude set, a single code-hash walk on cold cache
# takes 2-5 minutes per caller, and 5+ peer-discovery threads running
# in parallel stalled boot for 10+ minutes in startup_trace.log from
# 2026-04-19T17:00:29. The exclude-set expansion keeps that walk
# bounded to Nunba/HARTOS source only.
_EXCLUDE_DIRS = {
    '__pycache__', 'venv310', 'venv', '.venv', '.git', '.idea',
    'agent_data', 'tests', 'node_modules', 'hevolve_backend.egg-info',
    'autogen-0.2.37', '.pycharm_plugin',
    # cx_Freeze bundle dirs (Nunba desktop install) — defense-in-depth
    # in case HEVOLVE_CODE_HASH_PRECOMPUTED is not set by the host app.
    'python-embed', 'lib', 'lib_src', 'build', 'landing-page',
    'Output', 'dist', '.pytest_cache', '.ruff_cache', '.mypy_cache',
}
def get_or_create_keypair() -> Tuple[Ed25519PrivateKey, Ed25519PublicKey]:
    """Load existing keypair from disk or generate a new one on first start.

    Returns (private_key, public_key). The pair is cached at module level,
    so disk is only touched on the first call. A corrupt or unreadable key
    file is logged and replaced by a freshly generated pair.
    """
    global _private_key, _public_key
    if _private_key and _public_key:
        return _private_key, _public_key

    key_dir = Path(_KEY_DIR)
    key_dir.mkdir(parents=True, exist_ok=True)
    priv_path = key_dir / _PRIVATE_KEY_FILE
    pub_path = key_dir / _PUBLIC_KEY_FILE

    if priv_path.exists() and pub_path.exists():
        try:
            raw = priv_path.read_bytes()
            # Decrypt at rest — auto-detects encrypted vs plaintext PEM
            try:
                from security.crypto import decrypt_data
                raw = decrypt_data(raw)
            except ImportError:
                pass
            _private_key = serialization.load_pem_private_key(raw, password=None)
            _public_key = _private_key.public_key()
            logger.info(f"Node keypair loaded from {key_dir}")
            return _private_key, _public_key
        except Exception as e:
            # Any load failure (bad PEM, failed decrypt, I/O error) falls
            # through to regeneration rather than crashing boot.
            logger.warning(f"Failed to load keypair, regenerating: {e}")

    # Generate new keypair
    _private_key = Ed25519PrivateKey.generate()
    _public_key = _private_key.public_key()

    # Persist to disk — encrypted at rest when HEVOLVE_DATA_KEY is set
    priv_pem = _private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
    pub_pem = _public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    try:
        from security.crypto import encrypt_data
        priv_path.write_bytes(encrypt_data(priv_pem))
    except ImportError:
        priv_path.write_bytes(priv_pem)
    pub_path.write_bytes(pub_pem)  # Public key stays plaintext
    # Fix: restrict the private key file to owner read/write. Without this
    # it is created with the process umask (often world-readable). Best
    # effort: chmod is limited/no-op on some filesystems (e.g. Windows).
    try:
        os.chmod(priv_path, 0o600)
    except OSError as e:
        logger.warning(f"Could not restrict private key permissions: {e}")
    logger.info(f"Node keypair generated and saved to {key_dir}")
    return _private_key, _public_key
def get_public_key_bytes() -> bytes:
    """Return this node's raw 32-byte Ed25519 public key."""
    keypair = get_or_create_keypair()
    return keypair[1].public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw,
    )
def get_public_key_hex() -> str:
    """Hex-encode the raw public key for embedding in JSON payloads."""
    raw = get_public_key_bytes()
    return raw.hex()
def sign_message(message: bytes) -> bytes:
    """Sign arbitrary bytes with this node's Ed25519 private key."""
    private_key, _unused = get_or_create_keypair()
    return private_key.sign(message)
def sign_json_payload(payload: dict) -> str:
    """Sign a dict's canonical JSON form and return the hex signature.

    Canonical form: keys sorted, compact separators, with any existing
    'signature' key stripped first so verifiers can reproduce the bytes.
    """
    unsigned = {key: value for key, value in payload.items() if key != 'signature'}
    serialized = json.dumps(unsigned, sort_keys=True, separators=(',', ':'))
    return sign_message(serialized.encode('utf-8')).hex()
def verify_signature(public_key_hex: str, message: bytes, signature: bytes) -> bool:
    """Verify an Ed25519 signature from a peer node.

    Args:
        public_key_hex: hex-encoded 32-byte raw public key.
        message: the exact bytes that were signed.
        signature: raw signature bytes.

    Returns True only for a valid signature; any failure (bad hex, wrong
    key length, invalid signature, malformed input) yields False.
    """
    try:
        raw_key = bytes.fromhex(public_key_hex)
        pub = Ed25519PublicKey.from_public_bytes(raw_key)
        pub.verify(signature, message)
        return True
    except Exception:
        # Peer-supplied input must never crash verification, so this is a
        # deliberate catch-all. (The previous tuple listed InvalidSignature
        # and ValueError alongside Exception, which made them dead code.)
        return False
def verify_json_signature(public_key_hex: str, payload: dict,
                          signature_hex: str) -> bool:
    """Verify a hex signature over a JSON payload from a peer.

    Re-canonicalizes the payload exactly as sign_json_payload does
    (strips 'signature', sorted keys, compact separators) before checking.
    Returns False on any failure instead of raising.
    """
    try:
        clean = {k: v for k, v in payload.items() if k != 'signature'}
        canonical = json.dumps(clean, sort_keys=True, separators=(',', ':'))
        sig = bytes.fromhex(signature_hex)
        return verify_signature(public_key_hex, canonical.encode('utf-8'), sig)
    except Exception:
        # Deliberate catch-all: untrusted peer input (bad hex, unserializable
        # payload) must yield False, never an exception. (The previous tuple
        # `(ValueError, Exception)` made the ValueError entry dead code.)
        return False
def compute_code_hash(code_root: str = None) -> str:
    """Compute the SHA-256 manifest hash of all .py files in the project.

    Deterministic across identical deployments. Three tiers, fastest first:
      1. HEVOLVE_CODE_HASH_PRECOMPUTED env var — skip computation entirely
         (ROM/SD-card builds; set at build time from a known-good hash).
      2. File cache (agent_data/code_hash_cache.json) — reuse the cached
         hash when no tracked .py file is newer than the cache timestamp.
      3. Full walk — hash every tracked .py file into a sorted manifest.
    """
    # Tier 1: precomputed hash (ROM/read-only deployments)
    pinned = os.environ.get('HEVOLVE_CODE_HASH_PRECOMPUTED', '')
    if pinned:
        logger.debug(f"Code hash: using precomputed {pinned[:16]}...")
        return pinned

    root = Path(code_root or _CODE_ROOT)

    # Tier 2: file-based cache (skip recompute if .py files unchanged)
    cached_hash = _load_code_hash_cache(root)
    if cached_hash:
        return cached_hash

    # Tier 3: full computation over the sorted file list
    entries = [
        f"{rel_path}:{_hash_file(abs_path)}"
        for rel_path, abs_path in sorted(_collect_py_files(root, root))
    ]
    digest = hashlib.sha256('\n'.join(entries).encode('utf-8')).hexdigest()

    # Save to cache for next boot
    _save_code_hash_cache(root, digest)
    return digest
def _load_code_hash_cache(root: Path) -> Optional[str]:
    """Load the cached code hash if the tracked .py file set is unchanged.

    Staleness checks (single walk):
      1. mtime — any .py file newer than the cache timestamp.
      2. file count — a file was added or removed. Fix: mtime alone misses
         deletions, since removing a file leaves no newer mtime behind.
         Older cache files lack 'file_count'; the check is skipped for
         those, keeping the format backward compatible.

    Returns None when the cache is absent, unreadable, or stale.
    """
    cache_path = root / 'agent_data' / 'code_hash_cache.json'
    try:
        if not cache_path.exists():
            return None
        with open(cache_path, 'r') as f:
            cache = json.load(f)
        cached_hash = cache.get('code_hash', '')
        cached_at = cache.get('cached_at', 0)
        if not cached_hash or not cached_at:
            return None

        # Check mtimes and count the tracked files in one walk.
        seen = 0
        for _, file_path in _collect_py_files(root, root):
            seen += 1
            try:
                if file_path.stat().st_mtime > cached_at:
                    logger.debug("Code hash cache stale: .py file modified")
                    return None
            except OSError:
                continue

        cached_count = cache.get('file_count')
        if cached_count is not None and cached_count != seen:
            logger.debug("Code hash cache stale: .py file added/removed")
            return None

        logger.debug(f"Code hash: using cache {cached_hash[:16]}...")
        return cached_hash
    except (json.JSONDecodeError, OSError, KeyError):
        return None
def _save_code_hash_cache(root: Path, code_hash: str):
    """Save the code hash to the file cache for faster subsequent boots.

    Also records the number of tracked .py files so the loader can detect
    deletions (which leave no newer mtime behind). Loaders unaware of
    'file_count' simply ignore the extra key. Write failures (e.g.
    read-only filesystem) are logged at debug level and ignored.
    """
    import time
    cache_path = root / 'agent_data' / 'code_hash_cache.json'
    try:
        file_count = sum(1 for _ in _collect_py_files(root, root))
        cache_path.parent.mkdir(parents=True, exist_ok=True)
        with open(cache_path, 'w') as f:
            json.dump({
                'code_hash': code_hash,
                'cached_at': time.time(),
                'file_count': file_count,
            }, f)
    except (OSError, IOError) as e:
        # Read-only FS - silently skip
        logger.debug(f"Code hash cache write skipped: {e}")
def compute_file_manifest(code_root: str = None) -> Dict[str, str]:
    """Return {relative_path: sha256_hex} for every tracked source file."""
    base = Path(code_root or _CODE_ROOT)
    return {
        rel_path: _hash_file(abs_path)
        for rel_path, abs_path in sorted(_collect_py_files(base, base))
    }
def _collect_py_files(directory: Path, root: Path):
    """Recursively yield (relative_posix_path, absolute_path) for every .py
    file under *directory*, in sorted per-directory order, skipping any
    directory whose name is in _EXCLUDE_DIRS. Unreadable directories are
    silently skipped."""
    try:
        for child in sorted(directory.iterdir()):
            if child.is_dir():
                if child.name not in _EXCLUDE_DIRS:
                    yield from _collect_py_files(child, root)
            elif child.is_file() and child.suffix == '.py':
                yield (str(child.relative_to(root)).replace('\\', '/'), child)
    except (PermissionError, OSError):
        pass
def _hash_file(file_path: Path) -> str:
    """Return the hex SHA-256 digest of *file_path*'s contents.

    Best-effort: an unreadable file yields the digest of whatever bytes
    were read before the error (the empty-input digest when none were).
    """
    digest = hashlib.sha256()
    try:
        with open(file_path, 'rb') as fh:
            while True:
                block = fh.read(8192)
                if not block:
                    break
                digest.update(block)
    except (IOError, OSError):
        pass
    return digest.hexdigest()
def get_node_identity(code_root: str = None) -> dict:
    """Return consolidated node identity info for gossip and registration.

    Keys: node_id (first 16 hex chars of the public key), public_key
    (full hex), tier, certificate, code_hash.
    """
    from security.key_delegation import get_node_tier, load_node_certificate

    public_hex = get_public_key_hex()
    certificate = load_node_certificate()
    hashed_code = compute_code_hash(code_root)
    return {
        'node_id': public_hex[:16],
        'public_key': public_hex,
        'tier': get_node_tier(),
        'certificate': certificate,
        'code_hash': hashed_code,
    }
def reset_keypair():
    """Drop the module-level keypair cache so the next call to
    get_or_create_keypair() reloads from disk (test hook)."""
    global _private_key, _public_key
    _private_key = _public_key = None
def purge_pycache(code_root: str = None) -> int:
    """Delete all __pycache__ directories and prevent bytecode regeneration.

    Called at boot before the integrity manifest snapshot is taken.
    Blocks bytecode injection attacks where malicious .pyc files
    could be loaded by Python instead of the verified .py sources.

    Returns count of __pycache__ directories removed.
    """
    import sys
    # Fix: the env var only affects *child* processes; it is the runtime
    # flag sys.dont_write_bytecode that stops THIS interpreter from
    # re-creating .pyc files after the purge. Set both.
    os.environ['PYTHONDONTWRITEBYTECODE'] = '1'
    sys.dont_write_bytecode = True
    root = Path(code_root or _CODE_ROOT)
    count = 0
    try:
        for pycache_dir in root.rglob('__pycache__'):
            if pycache_dir.is_dir():
                shutil.rmtree(pycache_dir, ignore_errors=True)
                count += 1
        if count:
            logger.info(f"Boot integrity: purged {count} __pycache__ directories")
    except (PermissionError, OSError) as e:
        logger.warning(f"Boot integrity: pycache purge partial - {e}")
    return count