Coverage for security / channel_encryption.py: 86.7%
120 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2E2E Encrypted Channels — Noise-like protocol for inter-node communication.
4When nodes exchange tasks, sync data, or gossip, payloads are encrypted
5so that neither network observers nor the compute-hosting node can read
6the task owner's data.
8Uses: X25519 key exchange + HKDF + AES-256-GCM (from ``cryptography`` library,
9already a dependency via node_integrity.py).
11Flow:
12 1. Node A wants to send encrypted data to Node B
13 2. A generates ephemeral X25519 keypair
14 3. A derives shared secret: ECDH(ephemeral_private, B_x25519_public)
15 4. A derives AES key: HKDF(shared_secret, salt=nonce, info=b'hart-e2e-v1')
16 5. A encrypts payload with AES-256-GCM(key, nonce, plaintext)
17 6. A sends: {eph, nonce, ct, v} (all hex-encoded)
18 7. B derives same shared secret: ECDH(B_x25519_private, ephemeral_public)
19 8. B derives same AES key, decrypts payload
21Forward secrecy: ephemeral key discarded after encryption. Compromise of
22node's long-term key cannot decrypt past sessions.
23"""
25import json
26import logging
27import os
28from typing import Dict, Optional, Tuple
30from cryptography.hazmat.primitives import hashes, serialization
31from cryptography.hazmat.primitives.asymmetric.x25519 import (
32 X25519PrivateKey,
33 X25519PublicKey,
34)
35from cryptography.hazmat.primitives.ciphers.aead import AESGCM
36from cryptography.hazmat.primitives.kdf.hkdf import HKDF
38logger = logging.getLogger('hevolve_security')
40# Protocol identifier embedded in HKDF info — bump on breaking changes
41_PROTOCOL_INFO = b'hart-e2e-v1'
42_PROTOCOL_VERSION = 1
44# ── X25519 Keypair Management ──────────────────────────────────────
46_x25519_private: Optional[X25519PrivateKey] = None
47_x25519_public_bytes: Optional[bytes] = None
50def _resolve_key_dir() -> str:
51 explicit = os.environ.get('HEVOLVE_KEY_DIR')
52 if explicit:
53 return explicit
54 db_path = os.environ.get('HEVOLVE_DB_PATH', '')
55 if db_path and db_path != ':memory:' and os.path.isabs(db_path):
56 return os.path.dirname(db_path)
57 return 'agent_data'
60def get_x25519_keypair() -> Tuple[X25519PrivateKey, bytes]:
61 """Get or create X25519 keypair for ECDH key exchange.
63 The keypair is persisted alongside the Ed25519 identity keys.
64 Separate from Ed25519 because Ed25519 (twisted Edwards) and
65 X25519 (Montgomery) operate on different curves.
66 """
67 global _x25519_private, _x25519_public_bytes
68 if _x25519_private is not None:
69 return _x25519_private, _x25519_public_bytes
71 key_dir = _resolve_key_dir()
72 x_priv_path = os.path.join(key_dir, 'node_x25519_private.key')
73 x_pub_path = os.path.join(key_dir, 'node_x25519_public.key')
75 if os.path.isfile(x_priv_path):
76 try:
77 with open(x_priv_path, 'rb') as f:
78 raw = f.read()
79 # Decrypt at rest — auto-detects encrypted vs plaintext
80 try:
81 from security.crypto import decrypt_data
82 raw = decrypt_data(raw)
83 except ImportError:
84 pass
85 _x25519_private = X25519PrivateKey.from_private_bytes(raw)
86 logger.info("X25519 keypair loaded from %s", key_dir)
87 except Exception as e:
88 logger.warning("Failed to load X25519 key, regenerating: %s", e)
89 _x25519_private = None
91 if _x25519_private is None:
92 _x25519_private = X25519PrivateKey.generate()
93 os.makedirs(key_dir, exist_ok=True)
94 raw = _x25519_private.private_bytes(
95 serialization.Encoding.Raw,
96 serialization.PrivateFormat.Raw,
97 serialization.NoEncryption(),
98 )
99 # Encrypt at rest when HEVOLVE_DATA_KEY is configured
100 try:
101 from security.crypto import encrypt_data
102 with open(x_priv_path, 'wb') as f:
103 f.write(encrypt_data(raw))
104 except ImportError:
105 with open(x_priv_path, 'wb') as f:
106 f.write(raw)
107 logger.info("X25519 keypair generated and saved to %s", key_dir)
109 _x25519_public_bytes = _x25519_private.public_key().public_bytes(
110 serialization.Encoding.Raw,
111 serialization.PublicFormat.Raw,
112 )
113 # Persist public key for external use (beacon inclusion)
114 if not os.path.isfile(x_pub_path):
115 with open(x_pub_path, 'wb') as f:
116 f.write(_x25519_public_bytes)
118 return _x25519_private, _x25519_public_bytes
121def get_x25519_public_hex() -> str:
122 """Return hex-encoded X25519 public key for inclusion in gossip beacons."""
123 _, pub = get_x25519_keypair()
124 return pub.hex()
127def reset_keypair_cache():
128 """Reset the cached keypair (for testing)."""
129 global _x25519_private, _x25519_public_bytes
130 _x25519_private = None
131 _x25519_public_bytes = None
134# ── Encrypt / Decrypt ──────────────────────────────────────────────
136def _derive_aes_key(shared_secret: bytes, nonce: bytes) -> bytes:
137 """Derive a 256-bit AES key from ECDH shared secret via HKDF."""
138 return HKDF(
139 algorithm=hashes.SHA256(),
140 length=32,
141 salt=nonce,
142 info=_PROTOCOL_INFO,
143 ).derive(shared_secret)
146def encrypt_for_peer(plaintext: bytes,
147 peer_x25519_public_hex: str) -> Dict[str, str]:
148 """Encrypt data for a specific peer using ephemeral ECDH + AES-256-GCM.
150 Returns envelope dict with hex-encoded fields:
151 - eph: ephemeral X25519 public key (32 bytes)
152 - nonce: random 12-byte nonce
153 - ct: ciphertext + GCM tag
154 - v: protocol version
156 Forward secrecy: ephemeral key is discarded after this call.
157 """
158 peer_pub = X25519PublicKey.from_public_bytes(
159 bytes.fromhex(peer_x25519_public_hex))
161 # Ephemeral keypair — used once and discarded
162 ephemeral = X25519PrivateKey.generate()
163 shared_secret = ephemeral.exchange(peer_pub)
165 nonce = os.urandom(12)
166 aes_key = _derive_aes_key(shared_secret, nonce)
168 ciphertext = AESGCM(aes_key).encrypt(nonce, plaintext, None)
170 eph_pub = ephemeral.public_key().public_bytes(
171 serialization.Encoding.Raw,
172 serialization.PublicFormat.Raw,
173 )
175 return {
176 'eph': eph_pub.hex(),
177 'nonce': nonce.hex(),
178 'ct': ciphertext.hex(),
179 'v': _PROTOCOL_VERSION,
180 }
183def decrypt_from_peer(envelope: Dict[str, str]) -> Optional[bytes]:
184 """Decrypt an envelope sent by a peer.
186 Uses our long-term X25519 private key + the sender's ephemeral public key
187 to reconstruct the shared secret and decrypt.
189 Returns plaintext bytes, or None if decryption fails.
190 """
191 try:
192 eph_pub = X25519PublicKey.from_public_bytes(
193 bytes.fromhex(envelope['eph']))
194 nonce = bytes.fromhex(envelope['nonce'])
195 ciphertext = bytes.fromhex(envelope['ct'])
197 our_private, _ = get_x25519_keypair()
198 shared_secret = our_private.exchange(eph_pub)
200 aes_key = _derive_aes_key(shared_secret, nonce)
202 return AESGCM(aes_key).decrypt(nonce, ciphertext, None)
203 except Exception as e:
204 logger.warning("E2E decrypt failed: %s", e)
205 return None
208# ── JSON Convenience Wrappers ──────────────────────────────────────
210def encrypt_json_for_peer(payload: dict,
211 peer_x25519_public_hex: str,
212 skip_egress_check: bool = False) -> Dict[str, str]:
213 """Encrypt a JSON-serializable dict for a peer.
215 Before encryption, this function:
216 1. Runs ScopeGuard.check_egress() — blocks data that exceeds its privacy scope
217 2. Stamps origin provenance (node fingerprint + timestamp) so the receiver
218 can verify who sent it and that it's from a genuine HART OS node
220 These checks happen at the encryption layer (not the caller) so they
221 cannot be bypassed. Every outbound encrypted payload is scope-checked
222 and provenance-stamped.
224 Args:
225 payload: JSON-serializable dict to encrypt
226 peer_x25519_public_hex: Recipient's X25519 public key
227 skip_egress_check: Only True for internal key exchange / handshake messages
228 """
229 if not skip_egress_check and '_privacy_scope' in payload:
230 # ── Egress check: only for scope-tagged data ──
231 # Untagged data = legacy/internal — allowed through (encryption is the gate).
232 # Tagged data = explicitly classified — ScopeGuard enforces the declared scope.
233 try:
234 from security.edge_privacy import ScopeGuard, PrivacyScope
235 guard = ScopeGuard()
236 dest = PrivacyScope.TRUSTED_PEER
237 allowed, reason = guard.check_egress(payload, dest)
238 if not allowed:
239 logger.warning("Egress BLOCKED: %s", reason)
240 raise ValueError(f"Egress blocked by ScopeGuard: {reason}")
241 except ImportError:
242 pass
243 except ValueError:
244 raise
246 # ── Provenance stamp: who is sending this and are they genuine? ──
247 # Copy so we don't mutate the caller's dict
248 stamped = dict(payload)
249 try:
250 from security.origin_attestation import compute_origin_fingerprint
251 import time as _time
252 stamped['_provenance'] = {
253 'origin_fingerprint': compute_origin_fingerprint(),
254 'node_public_key': get_x25519_public_hex(),
255 'timestamp': _time.time(),
256 }
257 except Exception:
258 pass # Provenance is best-effort — don't block on it
260 plaintext = json.dumps(stamped, separators=(',', ':')).encode('utf-8')
261 return encrypt_for_peer(plaintext, peer_x25519_public_hex)
264def decrypt_json_from_peer(envelope: Dict[str, str]) -> Optional[dict]:
265 """Decrypt an envelope and parse as JSON.
267 After decryption, verifies origin provenance if present:
268 - origin_fingerprint must match a known HART OS fingerprint
269 - timestamp must be within acceptable drift (5 min)
270 Provenance failure logs a warning but does NOT block — the encryption
271 itself is the primary auth. Provenance is defense-in-depth for audit.
272 """
273 plaintext = decrypt_from_peer(envelope)
274 if plaintext is None:
275 return None
276 try:
277 payload = json.loads(plaintext)
278 except (json.JSONDecodeError, UnicodeDecodeError) as e:
279 logger.warning("E2E JSON decode failed: %s", e)
280 return None
282 # ── Verify provenance (audit, not blocking) ──
283 prov = payload.pop('_provenance', None)
284 if prov:
285 try:
286 from security.origin_attestation import compute_origin_fingerprint
287 import time as _time
288 peer_fp = prov.get('origin_fingerprint', '')
289 our_fp = compute_origin_fingerprint()
290 ts = prov.get('timestamp', 0)
291 drift = abs(_time.time() - ts)
293 if peer_fp != our_fp:
294 # Different fingerprint = different HART OS build or a fork
295 # Log but don't block — could be a legitimate older version
296 logger.info("Provenance: peer fingerprint %s differs from ours %s",
297 peer_fp[:12], our_fp[:12])
298 if drift > 300: # 5 min
299 logger.info("Provenance: timestamp drift %.0fs from peer %s",
300 drift, prov.get('node_public_key', '?')[:12])
301 except Exception:
302 pass # Best-effort
304 return payload
307# ── Utility: Check if payload is an encrypted envelope ─────────────
309def is_encrypted_envelope(data: dict) -> bool:
310 """Check if a dict looks like an E2E encrypted envelope."""
311 return (isinstance(data, dict)
312 and 'eph' in data
313 and 'nonce' in data
314 and 'ct' in data
315 and data.get('v') == _PROTOCOL_VERSION)