Coverage for integrations / social / e2e_ratchet.py: 0.0%
154 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial — Phase 9.B Double Ratchet for E2E DMs.
4Plan reference: sunny-gliding-eich.md, Part K.4 + Phase 9.
6Implements a libsignal-inspired Double Ratchet on top of the v51
7key envelope schema (Phase 9.A). This is the cryptographic
8follow-up to e2e_key_service which only handled identity bundles
9and ciphertext storage.
11Cryptography:
12 - X25519 ECDH for the asymmetric ratchet
13 - HKDF-SHA256 for the symmetric ratchet (chain key + message key
14 derivation) and the root-key advance
15 - AES-256-GCM AEAD for message encryption with associated data
16 binding the sender's public key + the message's chain index
18Property goals:
19 - Forward secrecy: compromising a long-lived key MUST NOT
20 decrypt past messages.
21 - Post-compromise security: after a ratchet step, an attacker
22 who held the previous keys cannot decrypt new messages.
23 - Replay resistance: each message has a chain index; out-of-order
24 arrival is allowed (receiver caches skipped message keys with
25 a bounded budget) but identical (chain, index) pairs are rejected.
27Scope of THIS file:
28 - Pure-compute primitives: encrypt_message, decrypt_message,
29 advance_dh_ratchet (the X25519 step), derive_message_key,
30 serialize/deserialize the wire envelope shape.
31 - Stateless functions where possible. The ratchet STATE
32 (current root key, sending chain key, receiving chain key,
33 skipped-message-key cache) lives in a separate `RatchetState`
34 NamedTuple that callers persist via the conversation_keys +
35 message_envelopes tables.
37What's NOT here:
38 - Persistence (caller's responsibility — see e2e_key_service).
39 - Key registration / rotation orchestration.
40 - The X3DH initial handshake (libsignal's first-message bootstrap;
41 a Phase 9.C follow-up). For now we assume the two parties have
42 already established an initial shared secret out-of-band.
44Best-effort import: if `cryptography` is not installed, the module
45imports cleanly but every operation raises `RatchetUnavailable`.
46This keeps `e2e_dms` an opt-in flag without forcing the dep on
47flat / Nunba bundled deploys.
48"""
50from __future__ import annotations
52import hashlib
53import hmac
54import json
55import os
56import struct
57from typing import Any, Dict, List, NamedTuple, Optional, Tuple
class RatchetError(Exception):
    """Root of the ratchet exception hierarchy.

    Every error raised explicitly by this module derives from this
    class, so callers can catch it to handle any ratchet-level failure.
    """
class RatchetUnavailable(RatchetError):
    """The optional `cryptography` package could not be imported.

    The module itself still imports in that situation, which lets
    dependent code flag-gate the ratchet and fall back to plaintext
    storage instead of failing at import time.
    """
class RatchetReplayError(RatchetError):
    """A message index that was already consumed on the current
    receiving chain showed up again — either a replay attack or a
    duplicate delivery."""
class RatchetSkippedMessageBudgetExceeded(RatchetError):
    """An out-of-order receive would need to cache more than
    MAX_SKIPPED_MESSAGES message keys in one step."""
80# Try to import the cryptography primitives. If absent, every API
81# raises RatchetUnavailable so the caller can flag-gate.
82try:
83 from cryptography.hazmat.primitives.asymmetric.x25519 import ( # type: ignore
84 X25519PrivateKey, X25519PublicKey,
85 )
86 from cryptography.hazmat.primitives.ciphers.aead import ( # type: ignore
87 AESGCM,
88 )
89 from cryptography.hazmat.primitives.kdf.hkdf import HKDF # type: ignore
90 from cryptography.hazmat.primitives import hashes, serialization # type: ignore
91 _HAS_CRYPTO = True
92except Exception: # pragma: no cover — exercised only on bare deploys
93 _HAS_CRYPTO = False
94 X25519PrivateKey = X25519PublicKey = None
95 AESGCM = HKDF = hashes = serialization = None
# Cap on how many message keys a single out-of-order receive may
# cache. Without it, one message carrying chain index 2**31 would be
# enough to exhaust our memory.
MAX_SKIPPED_MESSAGES = 256

# Version tag written into every envelope; bump it whenever the
# envelope layout changes.
WIRE_VERSION = 1

# 32 bytes = 256 bits: the HKDF output size used for root and chain
# keys, and the key size AES-256-GCM expects.
KEY_BYTES = 32
NONCE_BYTES = 12  # AES-GCM standard
def _require_crypto() -> None:
    """Guard used at the top of every ratchet operation.

    Raises:
        RatchetUnavailable: the module-level import of the
            `cryptography` primitives did not succeed.
    """
    if _HAS_CRYPTO:
        return
    raise RatchetUnavailable(
        "ratchet operations require the `cryptography` package; "
        "install via `pip install cryptography` or set e2e_dms=False")
118# ── State shape ────────────────────────────────────────────────────
class RatchetState(NamedTuple):
    """Immutable snapshot of one party's ratchet state for a
    conversation.

    Operations in this module never modify a state in place; they
    return an updated copy (via `_replace`) that the caller is
    expected to persist.
    """

    # 32-byte root key, re-derived on every DH-ratchet step.
    root_key: bytes
    # 32-byte chain key for outbound messages; stepped once per send.
    sending_chain_key: bytes
    # Number of messages sent on the current sending chain.
    sending_index: int
    # 32-byte chain key for inbound messages.
    receiving_chain_key: bytes
    # Number of messages consumed on the current receiving chain.
    receiving_index: int
    # Our X25519 private key, raw bytes.
    our_dh_priv: bytes
    # Our X25519 public key, raw bytes.
    our_dh_pub: bytes
    # Peer's X25519 public key (raw bytes); None until the first
    # DH-ratchet step has happened.
    their_dh_pub: Optional[bytes]
    # {(their_dh_pub, idx): message_key} — keys derived for messages
    # that have not arrived yet (out-of-order delivery).
    skipped_keys: Dict[Tuple[bytes, int], bytes]
149# ── KDF helpers ────────────────────────────────────────────────────
def _hkdf(input_key_material: bytes, *, info: bytes,
          salt: Optional[bytes] = None,
          length: int = KEY_BYTES) -> bytes:
    """Run HKDF-SHA256 over `input_key_material`.

    Args:
        input_key_material: Secret input to the KDF.
        info: Context string passed to HKDF as `info`.
        salt: Optional HKDF salt.
        length: Number of output bytes (defaults to KEY_BYTES).

    Returns:
        `length` bytes of derived key material.

    Raises:
        RatchetUnavailable: `cryptography` is not installed.
    """
    _require_crypto()
    kdf = HKDF(algorithm=hashes.SHA256(), length=length,
               salt=salt, info=info)
    return kdf.derive(input_key_material)
def _kdf_chain(chain_key: bytes) -> Tuple[bytes, bytes]:
    """Perform one symmetric-ratchet step.

    Both outputs are HMAC-SHA256 digests keyed with the current chain
    key: the message key is the MAC of the single byte 0x01, the next
    chain key is the MAC of the single byte 0x02. HMAC (rather than
    HKDF) is used here to mirror the reference Double Ratchet.

    Returns:
        (next_chain_key, message_key) — note the order.

    Raises:
        RatchetUnavailable: `cryptography` is not installed.
    """
    _require_crypto()

    def _mac(label: bytes) -> bytes:
        return hmac.new(chain_key, label, hashlib.sha256).digest()

    message_key = _mac(b'\x01')
    advanced_chain = _mac(b'\x02')
    return advanced_chain, message_key
def _kdf_root(root_key: bytes, dh_output: bytes) -> Tuple[bytes, bytes]:
    """Perform one asymmetric-ratchet (root key) step.

    HKDF is run over the fresh DH output, salted with the current
    root key, to produce 2 * KEY_BYTES of material which is split in
    half.

    Returns:
        (new_root_key, new_chain_key). Whether the chain key is used
        for sending or receiving is decided by the caller.

    Raises:
        RatchetUnavailable: `cryptography` is not installed.
    """
    _require_crypto()
    material = _hkdf(dh_output, info=b'hevolve-e2e-ratchet-root',
                     salt=root_key, length=2 * KEY_BYTES)
    next_root = material[:KEY_BYTES]
    chain_key = material[KEY_BYTES:]
    return next_root, chain_key
190# ── X25519 helpers ─────────────────────────────────────────────────
def generate_dh_keypair() -> Tuple[bytes, bytes]:
    """Create a fresh X25519 keypair.

    Called by `advance_dh_ratchet` on every DH-ratchet step, and
    available to callers for conversation setup.

    Returns:
        (private_bytes, public_bytes), both in raw encoding.

    Raises:
        RatchetUnavailable: `cryptography` is not installed.
    """
    _require_crypto()
    raw = serialization.Encoding.Raw
    private_key = X25519PrivateKey.generate()
    private_raw = private_key.private_bytes(
        encoding=raw,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=serialization.NoEncryption(),
    )
    public_raw = private_key.public_key().public_bytes(
        encoding=raw,
        format=serialization.PublicFormat.Raw,
    )
    return private_raw, public_raw
def _dh(our_priv: bytes, their_pub: bytes) -> bytes:
    """X25519 Diffie-Hellman between our raw private key and the
    peer's raw public key; returns the shared secret bytes.

    Raises:
        RatchetUnavailable: `cryptography` is not installed.
    """
    _require_crypto()
    local_key = X25519PrivateKey.from_private_bytes(our_priv)
    remote_key = X25519PublicKey.from_public_bytes(their_pub)
    shared = local_key.exchange(remote_key)
    return shared
218# ── Initialisation ─────────────────────────────────────────────────
def init_ratchet(*, shared_secret: bytes,
                 our_dh_priv: bytes, our_dh_pub: bytes,
                 their_dh_pub: Optional[bytes] = None) -> RatchetState:
    """Bootstrap a new ratchet state from an out-of-band shared
    secret (Phase 9.C X3DH output, or — for tests / Phase 9.B —
    a secret negotiated externally).

    The two parties of a conversation initialise differently:

    * `their_dh_pub` given — this side already knows the peer's DH
      public key. The root key and the first sending chain are
      derived from the shared secret plus DH(our_dh_priv,
      their_dh_pub), so this side can encrypt immediately.
    * `their_dh_pub` is None — the root key is the (truncated) shared
      secret and both chain keys are zero-filled placeholders.
      `encrypt_message` refuses to run while `their_dh_pub` is None,
      so this side must first decrypt an inbound message; that
      receive performs the DH-ratchet step which installs real chain
      keys.

    Args:
        shared_secret: At least KEY_BYTES bytes; only the first
            KEY_BYTES are used.
        our_dh_priv: Our X25519 private key, raw bytes.
        our_dh_pub: Our X25519 public key, raw bytes.
        their_dh_pub: Peer's X25519 public key (raw bytes) or None.

    Returns:
        A fresh `RatchetState` with both indices at 0 and an empty
        skipped-key cache.

    Raises:
        RatchetUnavailable: `cryptography` is not installed.
        RatchetError: `shared_secret` is shorter than KEY_BYTES.
    """
    _require_crypto()
    if len(shared_secret) < KEY_BYTES:
        raise RatchetError(
            f"shared_secret must be at least {KEY_BYTES} bytes")

    seed = shared_secret[:KEY_BYTES]
    # Placeholder only: a zero chain is never used to encrypt or
    # decrypt, because a DH-ratchet step replaces it first.
    blank_chain = b'\x00' * KEY_BYTES

    if their_dh_pub is None:
        # Receive-first side: nothing to derive until the peer's
        # first message arrives.
        root = seed
        sending_chain = blank_chain
    else:
        # Send-first side: derive the initial sending chain from the
        # shared secret + first DH(our_priv, their_pub).
        dh_out = _dh(our_dh_priv, their_dh_pub)
        root, sending_chain = _kdf_root(seed, dh_out)

    return RatchetState(
        root_key=root,
        sending_chain_key=sending_chain,
        sending_index=0,
        receiving_chain_key=blank_chain,
        receiving_index=0,
        our_dh_priv=our_dh_priv,
        our_dh_pub=our_dh_pub,
        their_dh_pub=their_dh_pub,
        skipped_keys={},
    )
259# ── DH ratchet step ────────────────────────────────────────────────
def advance_dh_ratchet(state: RatchetState,
                       their_new_dh_pub: bytes) -> RatchetState:
    """Step the DH ratchet after the peer presented a new DH key.

    Sequence:
      1. New receiving chain: root step over
         DH(our current private key, their new public key).
      2. A fresh X25519 keypair is generated for us.
      3. New sending chain: a second root step over
         DH(our new private key, their new public key).

    Both message counters are reset to 0. The returned state carries
    our new public key, which `encrypt_message` places in the next
    outgoing envelope. The skipped-key cache is left untouched.

    Raises:
        RatchetUnavailable: `cryptography` is not installed.
    """
    _require_crypto()

    # Step 1 — inbound chain, using the keypair the peer encrypted to.
    root_after_recv, recv_chain = _kdf_root(
        state.root_key, _dh(state.our_dh_priv, their_new_dh_pub))

    # Steps 2 + 3 — rotate our keypair and derive the outbound chain.
    fresh_priv, fresh_pub = generate_dh_keypair()
    root_after_send, send_chain = _kdf_root(
        root_after_recv, _dh(fresh_priv, their_new_dh_pub))

    return state._replace(
        root_key=root_after_send,
        sending_chain_key=send_chain,
        sending_index=0,
        receiving_chain_key=recv_chain,
        receiving_index=0,
        our_dh_priv=fresh_priv,
        our_dh_pub=fresh_pub,
        their_dh_pub=their_new_dh_pub,
    )
291# ── Encrypt / decrypt ──────────────────────────────────────────────
294def _aad_for(sender_pub: bytes, idx: int) -> bytes:
295 """Bind sender pubkey + index into the AEAD AAD.
297 We deliberately do NOT bind the receiver's pubkey: when the
298 receiver rotates their DH keypair on inbound (libsignal's lazy-
299 rotate-on-receive pattern), the receiver's `our_dh_pub` changes
300 between encrypt-time (sender's view) and decrypt-time (receiver's
301 view). Binding the receiver pub would therefore break the AAD
302 on every cross-rotation message.
304 Sender_pub + idx is sufficient because:
305 - The chain key (which produces the AES-GCM message key)
306 already encodes the (sender, receiver) pair via the DH
307 ratchet — different pairs derive different chain keys.
308 - idx prevents a relay from reordering messages within a chain.
309 - sender_pub binds the envelope to the actual encryptor; an
310 attacker who substitutes envelopes between conversations
311 breaks GCM tag verification.
312 """
313 return sender_pub + struct.pack('>Q', idx)
def encrypt_message(state: RatchetState,
                    plaintext: bytes) -> Tuple[RatchetState, Dict[str, bytes]]:
    """Encrypt one message and step the sending chain.

    Returns:
        (new_state, envelope). The envelope is a dict with:

          version:    WIRE_VERSION
          our_pub:    our current DH public key (raw bytes)
          idx:        sending index used for this message
          nonce:      NONCE_BYTES random bytes
          ciphertext: AES-GCM output (ciphertext followed by the tag)

        `new_state` has the advanced sending chain key and
        `sending_index` incremented by one.

    Raises:
        RatchetUnavailable: `cryptography` is not installed.
        RatchetError: the peer's DH public key is not known yet.
    """
    _require_crypto()
    if state.their_dh_pub is None:
        raise RatchetError(
            "cannot encrypt before peer's DH public key is known")

    index = state.sending_index
    sender_pub = state.our_dh_pub

    advanced_chain, message_key = _kdf_chain(state.sending_chain_key)
    nonce = os.urandom(NONCE_BYTES)
    aad = _aad_for(sender_pub, index)
    sealed = AESGCM(message_key).encrypt(nonce, plaintext, aad)

    envelope = {
        'version': WIRE_VERSION,
        'our_pub': sender_pub,
        'idx': index,
        'nonce': nonce,
        'ciphertext': sealed,
    }
    stepped = state._replace(
        sending_chain_key=advanced_chain,
        sending_index=index + 1,
    )
    return stepped, envelope
349def _try_skipped(state: RatchetState, their_pub: bytes,
350 idx: int) -> Optional[bytes]:
351 """If a previously-skipped message key matches (their_pub, idx),
352 return it and pop from the cache. Else None."""
353 return state.skipped_keys.get((their_pub, idx))
def _skip_message_keys(state: RatchetState, their_pub: bytes,
                       up_to_idx: int) -> RatchetState:
    """Advance the receiving chain to `up_to_idx`, caching the
    message key of every index stepped over so that late (out-of-
    order) arrivals can still be decrypted.

    Two bounds are enforced:
      * A single call may skip at most MAX_SKIPPED_MESSAGES keys;
        a larger gap raises.
      * The cache as a whole never holds more than
        MAX_SKIPPED_MESSAGES entries. When repeated gaps would push
        it past that size, the oldest entries (dict insertion order)
        are dropped. A message whose key was dropped can no longer
        be decrypted.

    Args:
        state: Current ratchet state (not modified).
        their_pub: Peer DH public key identifying the chain; used as
            the first half of each cache key.
        up_to_idx: Index of the message about to be decrypted; keys
            for receiving_index .. up_to_idx - 1 are cached.

    Returns:
        A new state with the advanced receiving chain key,
        `receiving_index` moved to `up_to_idx` (unchanged if it was
        already there or beyond), and the updated cache.

    Raises:
        RatchetSkippedMessageBudgetExceeded: the gap is larger than
            MAX_SKIPPED_MESSAGES.
    """
    gap = up_to_idx - state.receiving_index
    if gap > MAX_SKIPPED_MESSAGES:
        raise RatchetSkippedMessageBudgetExceeded(
            f"would skip {gap} keys; "
            f"max {MAX_SKIPPED_MESSAGES}")

    # Work on a copy so the caller's state stays intact if anything
    # below raises.
    skipped = dict(state.skipped_keys)
    chain = state.receiving_chain_key
    idx = state.receiving_index
    while idx < up_to_idx:
        chain, msg_key = _kdf_chain(chain)
        skipped[(their_pub, idx)] = msg_key
        idx += 1

    # Enforce the total budget. New keys were inserted last, and at
    # most MAX_SKIPPED_MESSAGES of them, so only older entries are
    # evicted here.
    overflow = len(skipped) - MAX_SKIPPED_MESSAGES
    if overflow > 0:
        for stale in list(skipped)[:overflow]:
            del skipped[stale]

    return state._replace(
        receiving_chain_key=chain,
        receiving_index=idx,
        skipped_keys=skipped,
    )
def decrypt_message(state: RatchetState,
                    envelope: Dict[str, bytes]) -> Tuple[RatchetState, bytes]:
    """Decrypt one envelope and return (new_state, plaintext).

    Paths, checked in this order:
      1. The (sender pub, idx) key is already in the skipped-key
         cache (late out-of-order arrival): decrypt with it and
         return a state with that cache entry removed.
      2. The envelope's sender pub differs from the peer key we hold:
         step the DH ratchet, then continue as in 3 on the new chain.
      3. Current chain: reject indices below `receiving_index`,
         cache keys for any gap above it, then advance the chain by
         one and decrypt.

    The input state is never modified; if decryption raises, the
    caller still holds the previous state.

    Raises:
        RatchetUnavailable: `cryptography` is not installed.
        RatchetReplayError: idx was already consumed on this chain.
        RatchetSkippedMessageBudgetExceeded: the gap to idx is too
            large to cache.
    """
    _require_crypto()
    # 'our_pub' is named from the sender's point of view: it is the
    # peer's DH public key for us.
    peer_pub = envelope['our_pub']
    idx = envelope['idx']
    nonce = envelope['nonce']
    sealed = envelope['ciphertext']

    stored_key = _try_skipped(state, peer_pub, idx)
    if stored_key is not None:
        aad = _aad_for(peer_pub, idx)
        plaintext = AESGCM(stored_key).decrypt(nonce, sealed, aad)
        remaining = dict(state.skipped_keys)
        remaining.pop((peer_pub, idx), None)
        return state._replace(skipped_keys=remaining), plaintext

    if state.their_dh_pub == peer_pub:
        working = state
    else:
        # Peer rotated their DH key: step the DH ratchet. Entries
        # already in the skipped-key cache are carried over as-is;
        # no additional keys from the previous chain are derived
        # at this point.
        working = advance_dh_ratchet(state, peer_pub)

    expected = working.receiving_index
    if idx < expected:
        raise RatchetReplayError(
            f"already received idx {idx} on this chain")
    if idx > expected:
        working = _skip_message_keys(working, peer_pub, idx)

    advanced_chain, message_key = _kdf_chain(working.receiving_chain_key)
    aad = _aad_for(peer_pub, idx)
    plaintext = AESGCM(message_key).decrypt(nonce, sealed, aad)
    stepped = working._replace(
        receiving_chain_key=advanced_chain,
        receiving_index=idx + 1,
    )
    return stepped, plaintext
424# ── Wire serialisation ─────────────────────────────────────────────
def serialize_envelope(env: Dict[str, Any]) -> bytes:
    """Pack an envelope dict into the compact wire format.

    Layout (multi-byte integers are big-endian):
      [u8 version][u8 pub_len=32][32 bytes our_pub]
      [u64 idx][u8 nonce_len=12][12 bytes nonce]
      [u32 ciphertext_len][ciphertext bytes]

    The two length bytes are taken from the actual field lengths, so
    each of `our_pub` and `nonce` must be at most 255 bytes long.
    """
    # The list is built in wire order, one element per field.
    parts = [
        bytes([env['version']]),
        bytes([len(env['our_pub'])]),
        env['our_pub'],
        struct.pack('>Q', env['idx']),
        bytes([len(env['nonce'])]),
        env['nonce'],
        struct.pack('>I', len(env['ciphertext'])),
        env['ciphertext'],
    ]
    return b''.join(parts)
def deserialize_envelope(buf: bytes) -> Dict[str, Any]:
    """Parse the compact wire format back into an envelope dict.

    Layout (multi-byte integers are big-endian):
      [u8 version][u8 pub_len][our_pub]
      [u64 idx][u8 nonce_len][nonce]
      [u32 ciphertext_len][ciphertext]

    The length fields come from the (untrusted) buffer, so every
    field is bounds-checked against the bytes actually present. A
    buffer that ends before a declared field does is rejected rather
    than yielding a silently shortened key, nonce or ciphertext.
    Bytes after the declared ciphertext are ignored.

    Returns:
        Dict with keys 'version', 'our_pub', 'idx', 'nonce',
        'ciphertext'.

    Raises:
        RatchetError: the buffer is shorter than the minimum
            envelope, carries an unknown version, or is truncated
            relative to one of its declared lengths.
    """
    # Smallest well-formed envelope: 32-byte key, 12-byte nonce and
    # an empty ciphertext.
    min_len = 1 + 1 + 32 + 8 + 1 + 12 + 4
    if len(buf) < min_len:
        raise RatchetError("envelope too short")

    version = buf[0]
    if version != WIRE_VERSION:
        raise RatchetError(
            f"unknown ratchet wire version: {version}")

    total = len(buf)

    def _take(offset: int, count: int, what: str) -> Tuple[bytes, int]:
        # Return (`count` bytes starting at `offset`, next offset),
        # refusing to read past the end of the buffer.
        end = offset + count
        if end > total:
            raise RatchetError(
                f"envelope truncated while reading {what}")
        return bytes(buf[offset:end]), end

    pos = 1
    pub_len_raw, pos = _take(pos, 1, "public key length")
    our_pub, pos = _take(pos, pub_len_raw[0], "public key")
    idx_raw, pos = _take(pos, 8, "message index")
    (idx,) = struct.unpack('>Q', idx_raw)
    nonce_len_raw, pos = _take(pos, 1, "nonce length")
    nonce, pos = _take(pos, nonce_len_raw[0], "nonce")
    ct_len_raw, pos = _take(pos, 4, "ciphertext length")
    (ct_len,) = struct.unpack('>I', ct_len_raw)
    ciphertext, pos = _take(pos, ct_len, "ciphertext")

    return {
        'version': version,
        'our_pub': our_pub,
        'idx': idx,
        'nonce': nonce,
        'ciphertext': ciphertext,
    }
# Public API of this module.
__all__ = [
    # state + errors
    'RatchetState',
    'RatchetError',
    'RatchetUnavailable',
    'RatchetReplayError',
    'RatchetSkippedMessageBudgetExceeded',
    # ratchet lifecycle
    'init_ratchet',
    'advance_dh_ratchet',
    # message operations
    'encrypt_message',
    'decrypt_message',
    # key generation
    'generate_dh_keypair',
    # wire format
    'serialize_envelope',
    'deserialize_envelope',
    # constants
    'MAX_SKIPPED_MESSAGES',
    'WIRE_VERSION',
]