Coverage for integrations / social / e2e_ratchet.py: 0.0%

154 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial — Phase 9.B Double Ratchet for E2E DMs. 

3 

4Plan reference: sunny-gliding-eich.md, Part K.4 + Phase 9. 

5 

6Implements a libsignal-inspired Double Ratchet on top of the v51 

7key envelope schema (Phase 9.A). This is the cryptographic 

8follow-up to e2e_key_service which only handled identity bundles 

9and ciphertext storage. 

10 

11Cryptography: 

12 - X25519 ECDH for the asymmetric ratchet 

13 - HKDF-SHA256 for the symmetric ratchet (chain key + message key 

14 derivation) and the root-key advance 

15 - AES-256-GCM AEAD for message encryption with associated data 

16 binding the sender's public key + the message's chain index 

17 

18Property goals: 

19 - Forward secrecy: compromising a long-lived key MUST NOT 

20 decrypt past messages. 

21 - Post-compromise security: after a ratchet step, an attacker 

22 who held the previous keys cannot decrypt new messages. 

23 - Replay resistance: each message has a chain index; out-of-order 

24 arrival is allowed (receiver caches skipped message keys with 

25 a bounded budget) but identical (chain, index) pairs are rejected. 

26 

27Scope of THIS file: 

28 - Pure-compute primitives: encrypt_message, decrypt_message, 

29 advance_dh_ratchet (the X25519 step), derive_message_key, 

30 serialize/deserialize the wire envelope shape. 

31 - Stateless functions where possible. The ratchet STATE 

32 (current root key, sending chain key, receiving chain key, 

33 skipped-message-key cache) lives in a separate `RatchetState` 

34 NamedTuple that callers persist via the conversation_keys + 

35 message_envelopes tables. 

36 

37What's NOT here: 

38 - Persistence (caller's responsibility — see e2e_key_service). 

39 - Key registration / rotation orchestration. 

40 - The X3DH initial handshake (libsignal's first-message bootstrap; 

41 a Phase 9.C follow-up). For now we assume the two parties have 

42 already established an initial shared secret out-of-band. 

43 

44Best-effort import: if `cryptography` is not installed, the module 

45imports cleanly but every operation raises `RatchetUnavailable`. 

46This keeps `e2e_dms` an opt-in flag without forcing the dep on 

47flat / Nunba bundled deploys. 

48""" 

49 

50from __future__ import annotations 

51 

52import hashlib 

53import hmac 

54import json 

55import os 

56import struct 

57from typing import Any, Dict, List, NamedTuple, Optional, Tuple 

58 

59 

class RatchetError(Exception):
    """Common base class for the ratchet-specific exceptions defined in
    this module."""

62 

63 

class RatchetUnavailable(RatchetError):
    """Signals that the optional `cryptography` package is missing.

    Importing this module still succeeds without the package, so
    dependent code can flag-gate the feature and fall back to plaintext
    storage.
    """

68 

69 

class RatchetReplayError(RatchetError):
    """Signals that an envelope carries a chain index that was already
    consumed on the current receiving chain — a replay attack or a
    duplicate delivery."""

73 

74 

class RatchetSkippedMessageBudgetExceeded(RatchetError):
    """Signals that an out-of-order receive would need more than
    MAX_SKIPPED_MESSAGES message keys to be cached."""

78 

79 

80# Try to import the cryptography primitives. If absent, every API 

81# raises RatchetUnavailable so the caller can flag-gate. 

82try: 

83 from cryptography.hazmat.primitives.asymmetric.x25519 import ( # type: ignore 

84 X25519PrivateKey, X25519PublicKey, 

85 ) 

86 from cryptography.hazmat.primitives.ciphers.aead import ( # type: ignore 

87 AESGCM, 

88 ) 

89 from cryptography.hazmat.primitives.kdf.hkdf import HKDF # type: ignore 

90 from cryptography.hazmat.primitives import hashes, serialization # type: ignore 

91 _HAS_CRYPTO = True 

92except Exception: # pragma: no cover — exercised only on bare deploys 

93 _HAS_CRYPTO = False 

94 X25519PrivateKey = X25519PublicKey = None 

95 AESGCM = HKDF = hashes = serialization = None 

96 

97 

# Cap on the skipped-message-key cache, so that a single message
# claiming chain index 2**31 cannot make the receiver run out of memory.
MAX_SKIPPED_MESSAGES = 256

# Version tag of the wire envelope; bump it whenever the shape changes.
WIRE_VERSION = 1

# Root keys and chain keys are 256-bit HKDF outputs; AES-256-GCM takes
# a key of the same size.
KEY_BYTES = 256 // 8
# 96-bit nonce, the AES-GCM standard size.
NONCE_BYTES = 96 // 8

109 

110 

def _require_crypto() -> None:
    """Raise RatchetUnavailable unless the `cryptography` imports at
    module load succeeded (tracked by `_HAS_CRYPTO`)."""
    if _HAS_CRYPTO:
        return
    message = (
        "ratchet operations require the `cryptography` package; "
        "install via `pip install cryptography` or set e2e_dms=False"
    )
    raise RatchetUnavailable(message)

116 

117 

118# ── State shape ──────────────────────────────────────────────────── 

119 

120 

class RatchetState(NamedTuple):
    """One party's complete ratchet state for a conversation.

    The functions in this module never assign to a state's fields; they
    return updated copies built with `_replace`, and the caller is
    expected to persist whichever copy it keeps.
    """
    # 32-byte HKDF root key.
    root_key: bytes
    # 32-byte key of the outbound chain; stepped once per message sent.
    sending_chain_key: bytes
    # Number of messages sent on the current outbound chain.
    sending_index: int
    # 32-byte key of the inbound chain.
    receiving_chain_key: bytes
    # Number of messages received on the current inbound chain.
    receiving_index: int
    # Our X25519 private key, raw bytes.
    our_dh_priv: bytes
    # Our X25519 public key, raw bytes.
    our_dh_pub: bytes
    # The peer's X25519 public key (raw bytes); None until the first
    # DH-ratchet step has happened.
    their_dh_pub: Optional[bytes]
    # {(their_dh_pub, idx): message_key} — keys kept for messages that
    # have not arrived yet (out-of-order delivery); see
    # MAX_SKIPPED_MESSAGES for the budget.
    skipped_keys: Dict[Tuple[bytes, int], bytes]

147 

148 

149# ── KDF helpers ──────────────────────────────────────────────────── 

150 

151 

def _hkdf(input_key_material: bytes, *, info: bytes,
          salt: Optional[bytes] = None,
          length: int = KEY_BYTES) -> bytes:
    """Derive `length` bytes from `input_key_material` with HKDF-SHA256.

    `info` and `salt` are handed to HKDF unchanged. Raises
    RatchetUnavailable when the `cryptography` package is missing.
    """
    _require_crypto()
    kdf = HKDF(algorithm=hashes.SHA256(), length=length,
               salt=salt, info=info)
    return kdf.derive(input_key_material)

159 

160 

def _kdf_chain(chain_key: bytes) -> Tuple[bytes, bytes]:
    """Perform one symmetric-ratchet step.

    Both outputs are HMAC-SHA256 digests keyed with `chain_key`:
      - the message key is the MAC of the single byte 0x01
      - the next chain key is the MAC of the single byte 0x02

    These single-byte inputs are the ones the Double Ratchet reference
    uses for its chain KDF.

    Returns (next_chain_key, message_key) — note the order.
    """
    _require_crypto()
    message_key, successor = (
        hmac.new(chain_key, label, hashlib.sha256).digest()
        for label in (b'\x01', b'\x02')
    )
    return successor, message_key

175 

176 

def _kdf_root(root_key: bytes, dh_output: bytes) -> Tuple[bytes, bytes]:
    """Perform one asymmetric-ratchet (root key) step.

    Runs HKDF over the fresh DH output, salted with the current root
    key, and splits the 64-byte result in two.

    Returns (new_root_key, new_chain_key); the caller decides whether
    the chain key is used for sending or receiving.
    """
    _require_crypto()
    okm = _hkdf(dh_output, info=b'hevolve-e2e-ratchet-root',
                salt=root_key, length=2 * KEY_BYTES)
    next_root = okm[:KEY_BYTES]
    chain_key = okm[KEY_BYTES:]
    return next_root, chain_key

188 

189 

190# ── X25519 helpers ───────────────────────────────────────────────── 

191 

192 

def generate_dh_keypair() -> Tuple[bytes, bytes]:
    """Create a fresh X25519 keypair.

    Returns (private_bytes, public_bytes), both in raw encoding. Called
    by `advance_dh_ratchet` on every DH-ratchet step, and available to
    callers for conversation setup.
    """
    _require_crypto()
    raw = serialization.Encoding.Raw
    secret = X25519PrivateKey.generate()
    private_raw = secret.private_bytes(
        encoding=raw,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=serialization.NoEncryption(),
    )
    public_raw = secret.public_key().public_bytes(
        encoding=raw,
        format=serialization.PublicFormat.Raw,
    )
    return private_raw, public_raw

209 

210 

def _dh(our_priv: bytes, their_pub: bytes) -> bytes:
    """Return the X25519 shared secret between our raw private key and
    the peer's raw public key."""
    _require_crypto()
    return X25519PrivateKey.from_private_bytes(our_priv).exchange(
        X25519PublicKey.from_public_bytes(their_pub))

216 

217 

218# ── Initialisation ───────────────────────────────────────────────── 

219 

220 

def init_ratchet(*, shared_secret: bytes,
                 our_dh_priv: bytes, our_dh_pub: bytes,
                 their_dh_pub: Optional[bytes] = None) -> RatchetState:
    """Bootstrap a new ratchet state from an out-of-band shared secret
    (Phase 9.C X3DH output, or — for tests / Phase 9.B — a secret of at
    least 32 bytes negotiated externally).

    Only the first KEY_BYTES bytes of `shared_secret` are used.

    Two roles, selected by `their_dh_pub`:

      - `their_dh_pub` given: this party already knows the peer's DH
        key, so an initial sending chain is derived from the shared
        secret + DH(our_priv, their_pub) and it can encrypt right away.
      - `their_dh_pub` is None: this party must RECEIVE first.
        `encrypt_message` refuses to run until the peer's key is known;
        the first `decrypt_message` performs the DH-ratchet step that
        fills in both chains. Until then the chain keys are all-zero
        placeholders that are never used to protect a message.

    Raises:
        RatchetUnavailable: the `cryptography` package is missing.
        RatchetError: `shared_secret` is shorter than KEY_BYTES, or a
            supplied DH key is not a 32-byte raw X25519 key.
    """
    _require_crypto()
    if len(shared_secret) < KEY_BYTES:
        raise RatchetError(
            f"shared_secret must be at least {KEY_BYTES} bytes")

    # Raw X25519 keys are always 32 bytes. Reject anything else up front
    # instead of persisting a state that only fails at the next DH step.
    dh_key_len = 32
    supplied = {'our_dh_priv': our_dh_priv, 'our_dh_pub': our_dh_pub}
    if their_dh_pub is not None:
        supplied['their_dh_pub'] = their_dh_pub
    for label, key in supplied.items():
        if len(key) != dh_key_len:
            raise RatchetError(
                f"{label} must be exactly {dh_key_len} bytes")

    seed = shared_secret[:KEY_BYTES]
    placeholder_chain = b'\x00' * KEY_BYTES
    if their_dh_pub is None:
        # Receive-first party: the root is the bare shared secret; the
        # first inbound message triggers the DH ratchet.
        root, sending_chain = seed, placeholder_chain
    else:
        # Send-first party: mix DH(our_priv, their_pub) into the root
        # to obtain the initial sending chain.
        dh_out = _dh(our_dh_priv, their_dh_pub)
        root, sending_chain = _kdf_root(seed, dh_out)
    return RatchetState(
        root_key=root,
        sending_chain_key=sending_chain,
        sending_index=0,
        receiving_chain_key=placeholder_chain,
        receiving_index=0,
        our_dh_priv=our_dh_priv,
        our_dh_pub=our_dh_pub,
        their_dh_pub=their_dh_pub,
        skipped_keys={},
    )

257 

258 

259# ── DH ratchet step ──────────────────────────────────────────────── 

260 

261 

def advance_dh_ratchet(state: RatchetState,
                       their_new_dh_pub: bytes) -> RatchetState:
    """Step the DH ratchet after the peer presented a new public key.

    Three stages:
      1. Old root + DH(our current key, their new key) gives an
         intermediate root and the new receiving chain.
      2. A fresh DH keypair is generated for our side.
      3. Intermediate root + DH(our fresh key, their new key) gives the
         final root and the new sending chain.

    Both chain indices restart at 0 and `skipped_keys` is carried over
    untouched. Our fresh public key travels to the peer in the next
    envelope we encrypt.
    """
    _require_crypto()
    # Stage 1: receiving side, still using our current private key.
    root_after_recv, recv_chain = _kdf_root(
        state.root_key, _dh(state.our_dh_priv, their_new_dh_pub))
    # Stage 2: rotate our own keypair.
    fresh_priv, fresh_pub = generate_dh_keypair()
    # Stage 3: sending side, using the fresh private key.
    root_after_send, send_chain = _kdf_root(
        root_after_recv, _dh(fresh_priv, their_new_dh_pub))
    return state._replace(
        root_key=root_after_send,
        sending_chain_key=send_chain,
        sending_index=0,
        receiving_chain_key=recv_chain,
        receiving_index=0,
        our_dh_priv=fresh_priv,
        our_dh_pub=fresh_pub,
        their_dh_pub=their_new_dh_pub,
    )

289 

290 

291# ── Encrypt / decrypt ────────────────────────────────────────────── 

292 

293 

294def _aad_for(sender_pub: bytes, idx: int) -> bytes: 

295 """Bind sender pubkey + index into the AEAD AAD. 

296 

297 We deliberately do NOT bind the receiver's pubkey: when the 

298 receiver rotates their DH keypair on inbound (libsignal's lazy- 

299 rotate-on-receive pattern), the receiver's `our_dh_pub` changes 

300 between encrypt-time (sender's view) and decrypt-time (receiver's 

301 view). Binding the receiver pub would therefore break the AAD 

302 on every cross-rotation message. 

303 

304 Sender_pub + idx is sufficient because: 

305 - The chain key (which produces the AES-GCM message key) 

306 already encodes the (sender, receiver) pair via the DH 

307 ratchet — different pairs derive different chain keys. 

308 - idx prevents a relay from reordering messages within a chain. 

309 - sender_pub binds the envelope to the actual encryptor; an 

310 attacker who substitutes envelopes between conversations 

311 breaks GCM tag verification. 

312 """ 

313 return sender_pub + struct.pack('>Q', idx) 

314 

315 

def encrypt_message(state: RatchetState,
                    plaintext: bytes) -> Tuple[RatchetState, Dict[str, bytes]]:
    """Encrypt one message, stepping the sending chain once.

    Returns (new_state, envelope). The envelope dict holds:

      version:    WIRE_VERSION
      our_pub:    our current DH public key (raw bytes)
      idx:        the sending_index this message was encrypted under
      nonce:      NONCE_BYTES random bytes from os.urandom
      ciphertext: AES-GCM output (ciphertext with the tag appended)

    Raises RatchetError when the peer's DH public key is not known yet,
    and RatchetUnavailable when `cryptography` is missing.
    """
    _require_crypto()
    if state.their_dh_pub is None:
        raise RatchetError(
            "cannot encrypt before peer's DH public key is known")
    index = state.sending_index
    sender_pub = state.our_dh_pub
    # One symmetric step yields this message's key and the follow-up
    # chain key.
    advanced_chain, message_key = _kdf_chain(state.sending_chain_key)
    nonce = os.urandom(NONCE_BYTES)
    aad = _aad_for(sender_pub, index)
    sealed = AESGCM(message_key).encrypt(nonce, plaintext, aad)
    envelope = dict(
        version=WIRE_VERSION,
        our_pub=sender_pub,
        idx=index,
        nonce=nonce,
        ciphertext=sealed,
    )
    successor = state._replace(
        sending_chain_key=advanced_chain,
        sending_index=index + 1,
    )
    return successor, envelope

347 

348 

349def _try_skipped(state: RatchetState, their_pub: bytes, 

350 idx: int) -> Optional[bytes]: 

351 """If a previously-skipped message key matches (their_pub, idx), 

352 return it and pop from the cache. Else None.""" 

353 return state.skipped_keys.get((their_pub, idx)) 

354 

355 

def _skip_message_keys(state: RatchetState, their_pub: bytes,
                       up_to_idx: int) -> RatchetState:
    """Advance the receiving chain to `up_to_idx`, caching the message
    key of every index stepped over so that late (out-of-order)
    envelopes can still be decrypted.

    Two limits apply:
      - A single call may step over at most MAX_SKIPPED_MESSAGES
        indices; a larger gap raises
        RatchetSkippedMessageBudgetExceeded.
      - The cache as a whole is also held to MAX_SKIPPED_MESSAGES
        entries. When repeated gaps would push it past that, the
        oldest-inserted keys are dropped, and the messages they
        belonged to can no longer be decrypted. Without this the cache
        could grow without bound over a long-lived conversation.

    The input state is not modified; an updated copy is returned.
    """
    gap = up_to_idx - state.receiving_index
    if gap > MAX_SKIPPED_MESSAGES:
        raise RatchetSkippedMessageBudgetExceeded(
            f"would skip {gap} keys; "
            f"max {MAX_SKIPPED_MESSAGES}")
    skipped = dict(state.skipped_keys)
    chain = state.receiving_chain_key
    idx = state.receiving_index
    while idx < up_to_idx:
        chain, msg_key = _kdf_chain(chain)
        skipped[(their_pub, idx)] = msg_key
        idx += 1
    # Enforce the total budget. Dicts iterate in insertion order, so
    # the leading entries are the oldest; the keys added just above are
    # at the end and survive (there are at most MAX_SKIPPED_MESSAGES of
    # them, per the gap check).
    overflow = len(skipped) - MAX_SKIPPED_MESSAGES
    if overflow > 0:
        for stale in list(skipped)[:overflow]:
            del skipped[stale]
    return state._replace(
        receiving_chain_key=chain,
        receiving_index=idx,
        skipped_keys=skipped,
    )

376 

377 

def decrypt_message(state: RatchetState,
                    envelope: Dict[str, bytes]) -> Tuple[RatchetState, bytes]:
    """Decrypt one envelope and return (new_state, plaintext).

    Paths, tried in this order:
      1. Late arrival: a message key for (sender key, idx) is already
         in the skipped-key cache. It is used and then removed from the
         cache in the returned state.
      2. Sender key differs from the peer key in `state` (this
         includes the very first inbound message, when the stored key
         is None): the DH ratchet is stepped, then decryption
         continues on the new receiving chain.
      3. Same chain: the receiving chain is advanced to idx (caching
         the keys of any indices stepped over) and the message is
         decrypted.

    On a key change the ratchet is stepped directly; message keys for
    envelopes not yet received on the PREVIOUS chain are not cached
    here.

    Raises:
        RatchetReplayError: idx is below the receiving index of the
            chain in use.
        RatchetSkippedMessageBudgetExceeded: propagated from
            `_skip_message_keys` when the gap is too large.
        RatchetUnavailable: the `cryptography` package is missing.
    Failures raised by AESGCM.decrypt (e.g. a bad authentication tag)
    propagate unchanged. Since `state` is never modified in place, the
    caller's state is unaffected by any failure.
    """
    _require_crypto()
    their_pub = envelope['our_pub']  # the sender's key, as the sender named it
    idx = envelope['idx']
    nonce = envelope['nonce']
    ct = envelope['ciphertext']

    # Path 1: the key was set aside earlier for this exact message.
    stashed_key = _try_skipped(state, their_pub, idx)
    if stashed_key is not None:
        aad = _aad_for(their_pub, idx)
        plaintext = AESGCM(stashed_key).decrypt(nonce, ct, aad)
        remaining = dict(state.skipped_keys)
        remaining.pop((their_pub, idx), None)
        return state._replace(skipped_keys=remaining), plaintext

    # Path 2: unfamiliar sender key — step the DH ratchet first.
    current = state
    if state.their_dh_pub != their_pub:
        current = advance_dh_ratchet(state, their_pub)

    # Path 3: position the receiving chain on idx.
    expected = current.receiving_index
    if idx < expected:
        raise RatchetReplayError(
            f"already received idx {idx} on this chain")
    if idx > expected:
        current = _skip_message_keys(current, their_pub, idx)

    next_chain, msg_key = _kdf_chain(current.receiving_chain_key)
    aad = _aad_for(their_pub, idx)
    plaintext = AESGCM(msg_key).decrypt(nonce, ct, aad)
    updated = current._replace(
        receiving_chain_key=next_chain,
        receiving_index=idx + 1,
    )
    return updated, plaintext

422 

423 

424# ── Wire serialisation ───────────────────────────────────────────── 

425 

426 

def serialize_envelope(env: Dict[str, Any]) -> bytes:
    """Pack an envelope dict into the compact wire format.

    Layout (multi-byte integers are big-endian):
      [u8 version][u8 pub_len][pub_len bytes our_pub]
      [u64 idx][u8 nonce_len][nonce_len bytes nonce]
      [u32 ciphertext_len][ciphertext bytes]

    For envelopes produced by `encrypt_message`, pub_len is 32 and
    nonce_len is 12.
    """
    # Pieces are collected in wire order and joined once at the end.
    parts = [bytes([env['version']])]
    sender_pub = env['our_pub']
    parts += [bytes([len(sender_pub)]), sender_pub]
    parts.append(struct.pack('>Q', env['idx']))
    nonce = env['nonce']
    parts += [bytes([len(nonce)]), nonce]
    sealed = env['ciphertext']
    parts += [struct.pack('>I', len(sealed)), sealed]
    return b''.join(parts)

443 

444 

def deserialize_envelope(buf: bytes) -> Dict[str, Any]:
    """Parse the compact wire format back into an envelope dict.

    Layout (multi-byte integers are big-endian):
      [u8 version][u8 pub_len][pub_len bytes our_pub]
      [u64 idx][u8 nonce_len][nonce_len bytes nonce]
      [u32 ciphertext_len][ciphertext bytes]

    Every length field is checked against the bytes actually present,
    so a truncated or inconsistent buffer is rejected rather than
    yielding silently shortened fields. Bytes after the declared
    ciphertext are ignored.

    Raises:
        RatchetError: the buffer is shorter than the smallest standard
            envelope, carries an unknown version, or ends before a
            declared field does.
    """
    minimum = 1 + 1 + KEY_BYTES + 8 + 1 + NONCE_BYTES + 4
    if len(buf) < minimum:
        raise RatchetError("envelope too short")

    offset = 0

    def take(count: int, what: str) -> bytes:
        # Return the next `count` bytes, refusing to read past the end.
        nonlocal offset
        end = offset + count
        if end > len(buf):
            raise RatchetError(
                f"envelope truncated while reading {what}")
        chunk = bytes(buf[offset:end])
        offset = end
        return chunk

    version = take(1, 'version')[0]
    if version != WIRE_VERSION:
        raise RatchetError(
            f"unknown ratchet wire version: {version}")
    our_pub = take(take(1, 'public key length')[0], 'public key')
    idx = struct.unpack('>Q', take(8, 'index'))[0]
    nonce = take(take(1, 'nonce length')[0], 'nonce')
    ct_len = struct.unpack('>I', take(4, 'ciphertext length'))[0]
    ciphertext = take(ct_len, 'ciphertext')
    return {
        'version': version,
        'our_pub': our_pub,
        'idx': idx,
        'nonce': nonce,
        'ciphertext': ciphertext,
    }

467 

468 

# Public API of this module.
__all__ = [
    # State container and exception hierarchy
    'RatchetState',
    'RatchetError',
    'RatchetUnavailable',
    'RatchetReplayError',
    'RatchetSkippedMessageBudgetExceeded',
    # Ratchet operations
    'init_ratchet',
    'advance_dh_ratchet',
    'encrypt_message',
    'decrypt_message',
    'generate_dh_keypair',
    # Wire format
    'serialize_envelope',
    'deserialize_envelope',
    # Constants
    'MAX_SKIPPED_MESSAGES',
    'WIRE_VERSION',
]