Coverage for security / node_integrity.py: 92.1%

178 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Node Integrity: Ed25519 keypair management, code hashing, and signature operations. 

3Provides cryptographic identity for peer verification in the HevolveSocial network. 

4""" 

5import os 

6import json 

7import hashlib 

8import logging 

9import shutil 

10from pathlib import Path 

11from typing import Optional, Tuple, Dict 

12 

13from cryptography.hazmat.primitives.asymmetric.ed25519 import ( 

14 Ed25519PrivateKey, Ed25519PublicKey, 

15) 

16from cryptography.hazmat.primitives import serialization 

17from cryptography.exceptions import InvalidSignature 

18 

19logger = logging.getLogger('hevolve_security') 

20 

def _resolve_key_dir():
    """Pick the directory in which the node key files are stored.

    Resolution order:
      1. ``HEVOLVE_KEY_DIR`` when set to a non-empty value.
      2. The directory containing ``HEVOLVE_DB_PATH`` when that value is a
         non-empty absolute path other than ``:memory:``.
      3. The relative directory ``'agent_data'``.
    """
    env = os.environ

    override = env.get('HEVOLVE_KEY_DIR')
    if override:
        return override

    db_location = env.get('HEVOLVE_DB_PATH', '')
    # Only an absolute, on-disk database path gives a usable directory.
    on_disk_abs = (
        bool(db_location)
        and db_location != ':memory:'
        and os.path.isabs(db_location)
    )
    return os.path.dirname(db_location) if on_disk_abs else 'agent_data'

29 

# Directory that holds the node's PEM key files; resolved once at import time.
_KEY_DIR = _resolve_key_dir()
# File names of the private / public key PEMs inside _KEY_DIR.
_PRIVATE_KEY_FILE = 'node_private_key.pem'
_PUBLIC_KEY_FILE = 'node_public_key.pem'
# Default root for code hashing: HEVOLVE_CODE_ROOT when set, otherwise the
# parent of the directory that contains this file.
_CODE_ROOT = os.environ.get('HEVOLVE_CODE_ROOT', os.path.dirname(
    os.path.dirname(os.path.abspath(__file__))))

# Module-level cache of the loaded/generated keypair (None until first use).
_private_key: Optional[Ed25519PrivateKey] = None
_public_key: Optional[Ed25519PublicKey] = None

39 

# Directory names skipped while walking the tree for the code hash.
#
# Defense-in-depth (2026-04-19): if compute_code_hash runs inside a
# cx_Freeze bundle while HEVOLVE_CODE_HASH_PRECOMPUTED is NOT set (e.g. the
# env var is missing because app.py's setup block raised), the fallback walk
# runs against the install root. In a cx_Freeze layout that root contains
# `python-embed/` (stdlib + site-packages, 10k+ .py files), `lib/` (bundled
# .pyc modules), `lib_src/` (pycparser + cryptography source copies),
# `build/` (intermediate artifacts), `landing-page/` (React build output)
# and `node_modules/` (already excluded). Without these names in the set, a
# single cold-cache walk takes 2-5 minutes per caller, and 5+ peer-discovery
# threads running in parallel stalled boot for 10+ minutes (see
# startup_trace.log from 2026-04-19T17:00:29). Excluding them keeps the walk
# bounded to Nunba/HARTOS source only.
_EXCLUDE_DIRS = {
    # Interpreter caches, virtualenvs, VCS and IDE metadata
    '__pycache__',
    'venv310',
    'venv',
    '.venv',
    '.git',
    '.idea',
    '.pycharm_plugin',
    # Runtime data, tests, vendored / third-party trees
    'agent_data',
    'tests',
    'node_modules',
    'hevolve_backend.egg-info',
    'autogen-0.2.37',
    # cx_Freeze bundle dirs (Nunba desktop install) — defense-in-depth
    # in case HEVOLVE_CODE_HASH_PRECOMPUTED is not set by the host app.
    'python-embed',
    'lib',
    'lib_src',
    'build',
    'landing-page',
    # Build outputs and tool caches
    'Output',
    'dist',
    '.pytest_cache',
    '.ruff_cache',
    '.mypy_cache',
}

64 

65 

def _write_private_key_file(path: Path, data: bytes) -> None:
    """Write *data* to *path* with owner-only (0600) permissions.

    The file is created with mode 0600 rather than written first and
    restricted afterwards, so the key material is never on disk with
    umask-default (often world-readable) permissions. A file that already
    exists keeps its old mode on ``os.open``, so it is tightened with
    ``os.chmod`` before any secret bytes are written. Permission changes are
    best-effort: platforms where they are unsupported only log at debug level.

    Raises:
        OSError: if the file cannot be opened or written.
    """
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
    fd = os.open(str(path), flags, 0o600)
    with os.fdopen(fd, 'wb') as fh:
        try:
            os.chmod(str(path), 0o600)
        except OSError as e:
            logger.debug(f"Could not restrict permissions on {path}: {e}")
        fh.write(data)


def get_or_create_keypair() -> Tuple[Ed25519PrivateKey, Ed25519PublicKey]:
    """Load existing keypair from disk or generate a new one on first start.

    The keypair is cached in module globals, so disk is only touched on the
    first successful call (or after ``reset_keypair()``).

    Load path: when both PEM files exist in ``_KEY_DIR`` the private key file
    is read, passed through ``security.crypto.decrypt_data`` if that module is
    importable, and parsed as an unencrypted PEM. The public key is derived
    from the private key; the public PEM file itself is not read.

    Generate path: when the files are missing, or loading fails for any
    reason, a fresh Ed25519 keypair is generated and written to ``_KEY_DIR``
    (overwriting existing files). The private PEM goes through
    ``security.crypto.encrypt_data`` when available and is written with
    owner-only permissions; the public PEM is written in plaintext.

    Returns:
        ``(private_key, public_key)``.

    Raises:
        OSError: if the key directory or key files cannot be created/written.
    """
    global _private_key, _public_key
    if _private_key and _public_key:
        return _private_key, _public_key

    key_dir = Path(_KEY_DIR)
    key_dir.mkdir(parents=True, exist_ok=True)
    priv_path = key_dir / _PRIVATE_KEY_FILE
    pub_path = key_dir / _PUBLIC_KEY_FILE

    # NOTE(review): both files must exist for the load path to be taken even
    # though only the private key is read; a missing public PEM therefore
    # triggers regeneration. Confirm this is intended.
    if priv_path.exists() and pub_path.exists():
        try:
            raw = priv_path.read_bytes()
            # Decrypt at rest — auto-detects encrypted vs plaintext PEM
            try:
                from security.crypto import decrypt_data
                raw = decrypt_data(raw)
            except ImportError:
                pass
            _private_key = serialization.load_pem_private_key(raw, password=None)
            _public_key = _private_key.public_key()
            logger.info(f"Node keypair loaded from {key_dir}")
            return _private_key, _public_key
        except Exception as e:
            # Deliberately broad: any load/decrypt/parse failure falls
            # through to regeneration below.
            logger.warning(f"Failed to load keypair, regenerating: {e}")

    # Generate new keypair
    _private_key = Ed25519PrivateKey.generate()
    _public_key = _private_key.public_key()

    # Persist to disk — encrypted at rest when HEVOLVE_DATA_KEY is set
    priv_pem = _private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
    pub_pem = _public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    try:
        from security.crypto import encrypt_data
        priv_blob = encrypt_data(priv_pem)
    except ImportError:
        priv_blob = priv_pem
    # Owner-only permissions: the PEM may be plaintext when no encryption
    # layer is available.
    _write_private_key_file(priv_path, priv_blob)
    pub_path.write_bytes(pub_pem)  # Public key stays plaintext
    logger.info(f"Node keypair generated and saved to {key_dir}")
    return _private_key, _public_key

115 

116 

def get_public_key_bytes() -> bytes:
    """Return the node's public key in raw (32-byte) encoding."""
    public_key = get_or_create_keypair()[1]
    raw_encoding = serialization.Encoding.Raw
    raw_format = serialization.PublicFormat.Raw
    return public_key.public_bytes(encoding=raw_encoding, format=raw_format)

124 

125 

def get_public_key_hex() -> str:
    """Return the raw public key as a hex string, suitable for JSON payloads."""
    key_bytes = get_public_key_bytes()
    return key_bytes.hex()

129 

130 

def sign_message(message: bytes) -> bytes:
    """Sign *message* with the node's private key and return the signature bytes."""
    private_key = get_or_create_keypair()[0]
    signature = private_key.sign(message)
    return signature

135 

136 

def sign_json_payload(payload: dict) -> str:
    """Sign the canonical JSON form of *payload* and return the hex signature.

    Canonical form: keys sorted, no whitespace between tokens, UTF-8 encoded.
    A top-level 'signature' key, if present, is left out of the signed data,
    so callers need not remove it themselves.
    """
    unsigned = {
        name: value for name, value in payload.items() if name != 'signature'
    }
    canonical_bytes = json.dumps(
        unsigned, sort_keys=True, separators=(',', ':')
    ).encode('utf-8')
    return sign_message(canonical_bytes).hex()

144 

145 

def verify_signature(public_key_hex: str, message: bytes, signature: bytes) -> bool:
    """Check *signature* over *message* against a peer's hex-encoded public key.

    Returns True only when verification succeeds. Any failure — malformed
    hex, an invalid key, a bad signature or any other exception — yields
    False instead of raising.
    """
    try:
        peer_key = Ed25519PublicKey.from_public_bytes(bytes.fromhex(public_key_hex))
        peer_key.verify(signature, message)
    except Exception:
        # Exception already covers InvalidSignature and ValueError.
        return False
    return True

155 

156 

def verify_json_signature(public_key_hex: str, payload: dict,
                          signature_hex: str) -> bool:
    """Verify a hex signature over the canonical JSON form of *payload*.

    The payload is canonicalized the same way as in ``sign_json_payload``
    (top-level 'signature' key dropped, keys sorted, compact separators).
    Returns False on any error rather than raising.
    """
    try:
        unsigned = {
            name: value for name, value in payload.items() if name != 'signature'
        }
        canonical = json.dumps(unsigned, sort_keys=True, separators=(',', ':'))
        signature = bytes.fromhex(signature_hex)
        message = canonical.encode('utf-8')
        return verify_signature(public_key_hex, message, signature)
    except Exception:
        # Exception already covers ValueError (e.g. malformed hex).
        return False

167 

168 

def compute_code_hash(code_root: str = None) -> str:
    """Return the SHA-256 manifest hash over the project's .py files.

    The manifest is one ``relative_path:sha256`` line per file, sorted and
    joined with newlines; the result is the SHA-256 hex digest of that text.

    Resolution tiers, cheapest first:
      1. ``HEVOLVE_CODE_HASH_PRECOMPUTED`` — returned verbatim when set
         (intended for ROM / read-only deployments, set at build time).
      2. File cache (``<root>/agent_data/code_hash_cache.json``) — reused
         when no .py file has an mtime newer than the cache timestamp.
      3. Full walk and hash of the tree; the result is then cached.

    Args:
        code_root: Directory to hash; defaults to the module's ``_CODE_ROOT``.
    """
    # Tier 1: precomputed hash
    precomputed = os.environ.get('HEVOLVE_CODE_HASH_PRECOMPUTED', '')
    if precomputed:
        logger.debug(f"Code hash: using precomputed {precomputed[:16]}...")
        return precomputed

    root = Path(code_root or _CODE_ROOT)

    # Tier 2: file cache
    cached = _load_code_hash_cache(root)
    if cached:
        return cached

    # Tier 3: full computation
    manifest = '\n'.join(
        f"{rel_path}:{_hash_file(file_path)}"
        for rel_path, file_path in sorted(_collect_py_files(root, root))
    )
    digest = hashlib.sha256(manifest.encode('utf-8')).hexdigest()

    # Remember the result for the next boot.
    _save_code_hash_cache(root, digest)
    return digest

207 

208 

def _load_code_hash_cache(root: Path) -> Optional[str]:
    """Load cached code hash if no .py file has changed since cache was written.

    Reads ``<root>/agent_data/code_hash_cache.json`` and returns its
    ``code_hash`` when every .py file found under *root* has an mtime not
    newer than the stored ``cached_at`` timestamp.

    Returns None (forcing a recompute) when the cache file is missing,
    unreadable, not valid JSON, not a JSON object, holds a non-string or
    empty hash, holds a non-numeric or zero timestamp, or is stale.

    NOTE: staleness is judged by mtime alone, so a deleted file, or a file
    added with an mtime older than the cache, does not invalidate the cache.
    """
    cache_path = root / 'agent_data' / 'code_hash_cache.json'
    try:
        if not cache_path.exists():
            return None
        with open(cache_path, 'r', encoding='utf-8') as f:
            cache = json.load(f)
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return None

    # A corrupted cache can still be valid JSON; validate its shape so the
    # code below never raises AttributeError/TypeError on it.
    if not isinstance(cache, dict):
        return None
    cached_hash = cache.get('code_hash', '')
    cached_at = cache.get('cached_at', 0)
    if not isinstance(cached_hash, str) or not cached_hash:
        return None
    if isinstance(cached_at, bool) or not isinstance(cached_at, (int, float)):
        return None
    if not cached_at:
        return None

    # Check if any .py file is newer than the cache
    for _, file_path in _collect_py_files(root, root):
        try:
            if file_path.stat().st_mtime > cached_at:
                logger.debug("Code hash cache stale: .py file modified")
                return None
        except OSError:
            # File vanished or is unreadable; ignore it for the check.
            continue

    logger.debug(f"Code hash: using cache {cached_hash[:16]}...")
    return cached_hash

235 

236 

def _save_code_hash_cache(root: Path, code_hash: str):
    """Persist *code_hash* with the current timestamp for faster later boots.

    Writes ``<root>/agent_data/code_hash_cache.json`` (creating the directory
    if needed). Write failures, such as a read-only filesystem, are logged at
    debug level and otherwise ignored.
    """
    import time

    target = root / 'agent_data' / 'code_hash_cache.json'
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, 'w') as handle:
            record = {'code_hash': code_hash, 'cached_at': time.time()}
            handle.write(json.dumps(record))
    except OSError as e:
        # Read-only FS - skip quietly (IOError is an alias of OSError).
        logger.debug(f"Code hash cache write skipped: {e}")

248 

249 

def compute_file_manifest(code_root: str = None) -> Dict[str, str]:
    """Map each tracked .py file's relative path to its SHA-256 hex digest.

    Args:
        code_root: Directory to scan; defaults to the module's ``_CODE_ROOT``.

    Returns:
        Dict ordered by sorted relative path.
    """
    root = Path(code_root or _CODE_ROOT)
    tracked = sorted(_collect_py_files(root, root))
    return {rel_path: _hash_file(file_path) for rel_path, file_path in tracked}

257 

258 

def _collect_py_files(directory: Path, root: Path):
    """Recursively yield ``(relative_path, path)`` for .py files under *directory*.

    Entries are visited in sorted order. Sub-directories whose name is in
    ``_EXCLUDE_DIRS`` are skipped. Relative paths are computed against *root*
    and always use forward slashes. Directories that cannot be listed are
    skipped silently.
    """
    try:
        children = sorted(directory.iterdir())
        for child in children:
            if child.is_dir():
                if child.name not in _EXCLUDE_DIRS:
                    yield from _collect_py_files(child, root)
                continue
            if child.is_file() and child.suffix == '.py':
                # Normalize Windows separators so manifests match across OSes.
                rel_path = str(child.relative_to(root)).replace('\\', '/')
                yield (rel_path, child)
    except OSError:
        # PermissionError is a subclass of OSError.
        pass

272 

273 

def _hash_file(file_path: Path) -> str:
    """Return the SHA-256 hex digest of the file's contents.

    The file is read in 8 KiB chunks. If it cannot be opened or read, the
    error is swallowed and the digest of whatever was read so far is returned
    (the empty-input digest when nothing was read).
    """
    digest = hashlib.sha256()
    try:
        with open(file_path, 'rb') as stream:
            while True:
                block = stream.read(8192)
                if block == b'':
                    break
                digest.update(block)
    except OSError:
        # IOError is an alias of OSError; best-effort by design.
        pass
    return digest.hexdigest()

284 

285 

def get_node_identity(code_root: str = None) -> dict:
    """Return consolidated node identity info for gossip and registration.

    Args:
        code_root: Passed through to ``compute_code_hash``.

    Returns:
        Dict with keys:
          - ``node_id``: first 16 hex characters of the public key
          - ``public_key``: full hex-encoded public key
          - ``tier``: result of ``get_node_tier()``
          - ``certificate``: result of ``load_node_certificate()``
          - ``code_hash``: result of ``compute_code_hash(code_root)``
    """
    from security.key_delegation import get_node_tier, load_node_certificate

    pub_hex = get_public_key_hex()
    certificate = load_node_certificate()
    code_hash = compute_code_hash(code_root)
    tier = get_node_tier()

    return dict(
        node_id=pub_hex[:16],
        public_key=pub_hex,
        tier=tier,
        certificate=certificate,
        code_hash=code_hash,
    )

305 

306 

def reset_keypair():
    """Drop the module-level keypair cache (for testing).

    The next ``get_or_create_keypair()`` call goes back to disk.
    """
    global _private_key, _public_key
    _private_key = _public_key = None

312 

313 

def purge_pycache(code_root: str = None) -> int:
    """Delete all __pycache__ directories and prevent bytecode regeneration.

    Called at boot before the integrity manifest snapshot is taken.
    Blocks bytecode injection attacks where malicious .pyc files
    could be loaded by Python instead of the verified .py sources.

    Bytecode writing is disabled in two ways: ``PYTHONDONTWRITEBYTECODE`` is
    exported so interpreters started later (child processes) inherit it, and
    ``sys.dont_write_bytecode`` is set because the environment variable is
    only read at interpreter startup and has no effect on the running
    process.

    Args:
        code_root: Directory to purge; defaults to the module's ``_CODE_ROOT``.

    Returns:
        Count of __pycache__ directories actually removed. Directories whose
        removal failed are not counted.
    """
    import sys

    os.environ['PYTHONDONTWRITEBYTECODE'] = '1'
    sys.dont_write_bytecode = True

    root = Path(code_root or _CODE_ROOT)
    count = 0
    try:
        for pycache_dir in root.rglob('__pycache__'):
            if not pycache_dir.is_dir():
                continue
            shutil.rmtree(pycache_dir, ignore_errors=True)
            # rmtree swallows errors, so confirm the directory is really gone
            # before counting it as removed.
            if not pycache_dir.exists():
                count += 1
        if count:
            logger.info(f"Boot integrity: purged {count} __pycache__ directories")
    except (PermissionError, OSError) as e:
        logger.warning(f"Boot integrity: pycache purge partial - {e}")
    return count