Coverage for security / channel_encryption.py: 86.7%

120 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2E2E Encrypted Channels — Noise-like protocol for inter-node communication. 

3 

4When nodes exchange tasks, sync data, or gossip, payloads are encrypted 

5so that neither network observers nor the compute-hosting node can read 

6the task owner's data. 

7 

8Uses: X25519 key exchange + HKDF + AES-256-GCM (from ``cryptography`` library, 

9already a dependency via node_integrity.py). 

10 

11Flow: 

12 1. Node A wants to send encrypted data to Node B 

13 2. A generates ephemeral X25519 keypair 

14 3. A derives shared secret: ECDH(ephemeral_private, B_x25519_public) 

15 4. A derives AES key: HKDF(shared_secret, salt=nonce, info=b'hart-e2e-v1') 

16 5. A encrypts payload with AES-256-GCM(key, nonce, plaintext) 

17 6. A sends: {eph, nonce, ct, v} (all hex-encoded) 

18 7. B derives same shared secret: ECDH(B_x25519_private, ephemeral_public) 

19 8. B derives same AES key, decrypts payload 

20 

21Forward secrecy (sender side only): the ephemeral key is discarded after encryption, so 

22nothing the sender retains can decrypt past envelopes; the recipient's long-term key still can. 

23""" 

24 

25import json 

26import logging 

27import os 

28from typing import Dict, Optional, Tuple 

29 

30from cryptography.hazmat.primitives import hashes, serialization 

31from cryptography.hazmat.primitives.asymmetric.x25519 import ( 

32 X25519PrivateKey, 

33 X25519PublicKey, 

34) 

35from cryptography.hazmat.primitives.ciphers.aead import AESGCM 

36from cryptography.hazmat.primitives.kdf.hkdf import HKDF 

37 

# Module logger, registered under the 'hevolve_security' name.
logger = logging.getLogger('hevolve_security')

# Protocol identifier embedded in HKDF info — bump on breaking changes
_PROTOCOL_INFO = b'hart-e2e-v1'
# Version number written to the envelope's 'v' field by encrypt_for_peer()
# and compared against by is_encrypted_envelope().
_PROTOCOL_VERSION = 1

# ── X25519 Keypair Management ──────────────────────────────────────

# Process-wide keypair cache: filled by get_x25519_keypair() and cleared by
# reset_keypair_cache(). Both stay None until the first successful load/generate.
_x25519_private: Optional[X25519PrivateKey] = None
_x25519_public_bytes: Optional[bytes] = None

48 

49 

50def _resolve_key_dir() -> str: 

51 explicit = os.environ.get('HEVOLVE_KEY_DIR') 

52 if explicit: 

53 return explicit 

54 db_path = os.environ.get('HEVOLVE_DB_PATH', '') 

55 if db_path and db_path != ':memory:' and os.path.isabs(db_path): 

56 return os.path.dirname(db_path) 

57 return 'agent_data' 

58 

59 

def _write_private_key_file(path: str, data: bytes) -> None:
    """Write secret key bytes to *path*, creating the file owner-only (0o600).

    The mode is applied only when the file is created; a file that already
    exists keeps whatever permissions it had.
    """
    # O_BINARY exists only on Windows; elsewhere the flag contributes nothing.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
    fd = os.open(path, flags, 0o600)
    with os.fdopen(fd, 'wb') as f:
        f.write(data)


def get_x25519_keypair() -> Tuple[X25519PrivateKey, bytes]:
    """Get or create the node's X25519 keypair for ECDH key exchange.

    The key files live in the directory returned by ``_resolve_key_dir()``:
    ``node_x25519_private.key`` (passed through ``security.crypto`` for
    at-rest encryption when that module is importable) and
    ``node_x25519_public.key`` (raw 32-byte public key).

    The X25519 key is kept separate from the Ed25519 identity key: Ed25519
    works on the twisted Edwards form and X25519 on the Montgomery form, so
    the key material is not interchangeable as-is.

    If the private key file exists but cannot be loaded, a warning is logged
    and a new keypair is generated, overwriting the old file.

    Returns:
        ``(private_key, public_key_bytes)``. The result is cached at module
        level; the cache is populated only after the keypair has been fully
        loaded/generated and persisted, so a failed call never leaves a
        half-initialised cache behind.

    Raises:
        OSError: if the key directory or key files cannot be written.
    """
    global _x25519_private, _x25519_public_bytes
    if _x25519_private is not None and _x25519_public_bytes is not None:
        return _x25519_private, _x25519_public_bytes

    key_dir = _resolve_key_dir()
    x_priv_path = os.path.join(key_dir, 'node_x25519_private.key')
    x_pub_path = os.path.join(key_dir, 'node_x25519_public.key')

    # Work on locals; the module cache is assigned only at the very end.
    private_key: Optional[X25519PrivateKey] = None

    if os.path.isfile(x_priv_path):
        try:
            with open(x_priv_path, 'rb') as f:
                raw = f.read()
            # Decrypt at rest — auto-detects encrypted vs plaintext
            try:
                from security.crypto import decrypt_data
                raw = decrypt_data(raw)
            except ImportError:
                pass  # no at-rest crypto available: treat the file as plaintext
            private_key = X25519PrivateKey.from_private_bytes(raw)
            logger.info("X25519 keypair loaded from %s", key_dir)
        except Exception as e:
            logger.warning("Failed to load X25519 key, regenerating: %s", e)
            private_key = None

    if private_key is None:
        private_key = X25519PrivateKey.generate()
        os.makedirs(key_dir, exist_ok=True)
        raw = private_key.private_bytes(
            serialization.Encoding.Raw,
            serialization.PrivateFormat.Raw,
            serialization.NoEncryption(),
        )
        # Encrypt at rest when HEVOLVE_DATA_KEY is configured
        try:
            from security.crypto import encrypt_data
            stored = encrypt_data(raw)
        except ImportError:
            stored = raw
        _write_private_key_file(x_priv_path, stored)
        logger.info("X25519 keypair generated and saved to %s", key_dir)

    public_bytes = private_key.public_key().public_bytes(
        serialization.Encoding.Raw,
        serialization.PublicFormat.Raw,
    )

    # Persist public key for external use (beacon inclusion). Rewrite it
    # whenever the file is missing OR holds a different key — otherwise a
    # regenerated private key would leave the previous public key on disk.
    on_disk: Optional[bytes] = None
    if os.path.isfile(x_pub_path):
        with open(x_pub_path, 'rb') as f:
            on_disk = f.read()
    if on_disk != public_bytes:
        with open(x_pub_path, 'wb') as f:
            f.write(public_bytes)

    _x25519_private, _x25519_public_bytes = private_key, public_bytes
    return private_key, public_bytes

119 

120 

def get_x25519_public_hex() -> str:
    """Hex-encode this node's X25519 public key (used in gossip beacons)."""
    public_bytes = get_x25519_keypair()[1]
    return public_bytes.hex()

125 

126 

def reset_keypair_cache():
    """Forget the cached keypair so the next lookup starts fresh (for testing)."""
    global _x25519_private, _x25519_public_bytes
    _x25519_private = _x25519_public_bytes = None

132 

133 

134# ── Encrypt / Decrypt ────────────────────────────────────────────── 

135 

def _derive_aes_key(shared_secret: bytes, nonce: bytes) -> bytes:
    """Turn an ECDH shared secret into a 32-byte AES key.

    HKDF-SHA256 is run with the message nonce as salt and the module's
    protocol identifier as the ``info`` string.
    """
    kdf = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=nonce,
        info=_PROTOCOL_INFO,
    )
    return kdf.derive(shared_secret)

144 

145 

def encrypt_for_peer(plaintext: bytes,
                     peer_x25519_public_hex: str) -> Dict[str, str]:
    """Seal *plaintext* for one peer with a one-time ECDH key and AES-256-GCM.

    Args:
        plaintext: bytes to encrypt.
        peer_x25519_public_hex: recipient's X25519 public key, hex-encoded.

    Returns:
        Envelope dict:
          - eph:   hex of the one-time X25519 public key
          - nonce: hex of a random 12-byte nonce
          - ct:    hex of the AES-GCM output (ciphertext + tag)
          - v:     protocol version number

    The one-time private key exists only inside this call; it is neither
    stored nor returned.
    """
    recipient_key = X25519PublicKey.from_public_bytes(
        bytes.fromhex(peer_x25519_public_hex))

    # Fresh keypair per message.
    one_time_key = X25519PrivateKey.generate()
    secret = one_time_key.exchange(recipient_key)

    iv = os.urandom(12)
    cipher = AESGCM(_derive_aes_key(secret, iv))
    sealed = cipher.encrypt(iv, plaintext, None)

    one_time_public = one_time_key.public_key().public_bytes(
        serialization.Encoding.Raw,
        serialization.PublicFormat.Raw,
    )

    envelope = {}
    envelope['eph'] = one_time_public.hex()
    envelope['nonce'] = iv.hex()
    envelope['ct'] = sealed.hex()
    envelope['v'] = _PROTOCOL_VERSION
    return envelope

181 

182 

def decrypt_from_peer(envelope: Dict[str, str]) -> Optional[bytes]:
    """Open an envelope that a peer addressed to this node.

    The shared secret is rebuilt from this node's long-term X25519 private
    key and the sender's one-time public key carried in ``envelope['eph']``;
    the AES key is then derived with the envelope's nonce.

    Returns:
        The decrypted bytes, or None (after logging a warning) when any step
        fails — malformed envelope, bad hex, wrong key, or failed GCM tag.
    """
    try:
        sender_key = X25519PublicKey.from_public_bytes(
            bytes.fromhex(envelope['eph']))
        iv = bytes.fromhex(envelope['nonce'])
        sealed = bytes.fromhex(envelope['ct'])

        node_private = get_x25519_keypair()[0]
        key = _derive_aes_key(node_private.exchange(sender_key), iv)

        recovered = AESGCM(key).decrypt(iv, sealed, None)
    except Exception as e:
        logger.warning("E2E decrypt failed: %s", e)
        return None
    return recovered

206 

207 

208# ── JSON Convenience Wrappers ────────────────────────────────────── 

209 

def encrypt_json_for_peer(payload: dict,
                          peer_x25519_public_hex: str,
                          skip_egress_check: bool = False) -> Dict[str, str]:
    """Encrypt a JSON-serializable dict for a peer.

    Before encryption, this function:
      1. Runs ScopeGuard.check_egress() on payloads that carry a
         ``_privacy_scope`` key, and refuses to encrypt when the guard
         rejects them.
      2. Adds a ``_provenance`` entry (origin fingerprint, this node's
         X25519 public key, timestamp) to a copy of the payload.

    Both steps are best-effort with respect to availability: if
    ``security.edge_privacy`` cannot be imported the egress check is skipped
    (a warning is logged), and if provenance stamping fails for any reason
    the payload is sent unstamped (logged at debug level).

    Args:
        payload: JSON-serializable dict to encrypt. Not mutated.
        peer_x25519_public_hex: Recipient's X25519 public key
        skip_egress_check: Only True for internal key exchange / handshake messages

    Returns:
        The envelope produced by ``encrypt_for_peer``.

    Raises:
        ValueError: if ScopeGuard blocks the payload.
    """
    if not skip_egress_check and '_privacy_scope' in payload:
        # ── Egress check: only for scope-tagged data ──
        # Untagged data = legacy/internal — allowed through (encryption is the gate).
        # Tagged data = explicitly classified — ScopeGuard enforces the declared scope.
        try:
            from security.edge_privacy import ScopeGuard, PrivacyScope
            allowed, reason = ScopeGuard().check_egress(
                payload, PrivacyScope.TRUSTED_PEER)
        except ImportError as e:
            # Fail-open is the existing behaviour; make it visible in the logs.
            logger.warning(
                "ScopeGuard unavailable, egress check skipped: %s", e)
        else:
            if not allowed:
                logger.warning("Egress BLOCKED: %s", reason)
                raise ValueError(f"Egress blocked by ScopeGuard: {reason}")

    # ── Provenance stamp: who is sending this and are they genuine? ──
    # Copy so we don't mutate the caller's dict
    stamped = dict(payload)
    try:
        from security.origin_attestation import compute_origin_fingerprint
        import time as _time
        stamped['_provenance'] = {
            'origin_fingerprint': compute_origin_fingerprint(),
            'node_public_key': get_x25519_public_hex(),
            'timestamp': _time.time(),
        }
    except Exception as e:
        # Provenance is best-effort — don't block on it, but leave a trace.
        logger.debug("Provenance stamp skipped: %s", e)

    plaintext = json.dumps(stamped, separators=(',', ':')).encode('utf-8')
    return encrypt_for_peer(plaintext, peer_x25519_public_hex)

262 

263 

def decrypt_json_from_peer(envelope: Dict[str, str]) -> Optional[dict]:
    """Decrypt an envelope and parse it as a JSON object.

    After decryption the ``_provenance`` entry, if present, is removed from
    the payload and inspected:
      - its origin_fingerprint is compared with this node's own fingerprint
      - its timestamp is compared with the local clock (5 min tolerance)
    A mismatch is only logged — it does NOT block. The encryption itself is
    the primary auth; provenance is defense-in-depth for audit.

    Returns:
        The decoded dict (without ``_provenance``), or None when decryption
        fails, the plaintext is not valid JSON, or the JSON document is not
        an object.
    """
    plaintext = decrypt_from_peer(envelope)
    if plaintext is None:
        return None
    try:
        payload = json.loads(plaintext)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        logger.warning("E2E JSON decode failed: %s", e)
        return None

    # json.loads also accepts arrays, strings, numbers and null; those would
    # crash on .pop() below and do not satisfy the Optional[dict] contract.
    if not isinstance(payload, dict):
        logger.warning("E2E payload is not a JSON object (got %s)",
                       type(payload).__name__)
        return None

    # ── Verify provenance (audit, not blocking) ──
    prov = payload.pop('_provenance', None)
    if prov:
        try:
            from security.origin_attestation import compute_origin_fingerprint
            import time as _time
            peer_fp = prov.get('origin_fingerprint', '')
            our_fp = compute_origin_fingerprint()
            ts = prov.get('timestamp', 0)
            drift = abs(_time.time() - ts)

            if peer_fp != our_fp:
                # Different fingerprint = different HART OS build or a fork
                # Log but don't block — could be a legitimate older version
                logger.info("Provenance: peer fingerprint %s differs from ours %s",
                            peer_fp[:12], our_fp[:12])
            if drift > 300:  # 5 min
                logger.info("Provenance: timestamp drift %.0fs from peer %s",
                            drift, prov.get('node_public_key', '?')[:12])
        except Exception as e:
            # Best-effort: malformed provenance must never reject the payload.
            logger.debug("Provenance check skipped: %s", e)

    return payload

305 

306 

307# ── Utility: Check if payload is an encrypted envelope ───────────── 

308 

def is_encrypted_envelope(data: dict) -> bool:
    """Tell whether *data* has the shape of an E2E encrypted envelope.

    True only for a dict that has the 'eph', 'nonce' and 'ct' keys and whose
    'v' entry equals the current protocol version.
    """
    if not isinstance(data, dict):
        return False
    if any(field not in data for field in ('eph', 'nonce', 'ct')):
        return False
    return data.get('v') == _PROTOCOL_VERSION