Coverage for integrations / social / e2e_x3dh.py: 0.0%
137 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial — Phase 9.C X3DH initial key agreement.
4Plan reference: sunny-gliding-eich.md, Part K.4 + Phase 9.
6Implements libsignal-style X3DH (Extended Triple Diffie-Hellman) so
7two parties can derive a Double Ratchet initial shared_secret
8without an out-of-band exchange. Replaces the deterministic
9placeholder in e2e_state_repo._derive_initial_shared_secret.
11Protocol summary
12================
14Each user publishes a prekey bundle:
15 - IK_pub: long-lived identity X25519 public key
16 - IK_sign_pub: long-lived identity Ed25519 public key (used to
17 sign SPK; same root identity but separate keypair
18 because X25519 + Ed25519 are different curves)
19 - SPK_pub: signed prekey X25519 public key (rotated ~weekly)
20 - SPK_sig: Ed25519 signature over SPK_pub bytes by IK_sign_priv
21 - OPK_pub: optional one-time prekey X25519 public key (used once
22 and discarded by the responder)
24Alice (initiator) handshake:
25 1. Fetch Bob's bundle. Verify SPK_sig with IK_sign_pub.
26 2. Generate ephemeral keypair (EK_priv_A, EK_pub_A).
27 3. Compute four DHs:
28 DH1 = DH(IK_priv_A, SPK_pub_B)
29 DH2 = DH(EK_priv_A, IK_pub_B)
30 DH3 = DH(EK_priv_A, SPK_pub_B)
31 DH4 = DH(EK_priv_A, OPK_pub_B) [omitted if no OPK]
32 4. SK = HKDF-SHA256(0xFF*32 || DH1 || DH2 || DH3 [|| DH4],
33 info='hevolve-x3dh-v1', salt=32 zero bytes)
34 5. Send to Bob a `PreKeyMessage`:
35 - IK_pub_A, EK_pub_A, OPK_id_consumed (or None)
36 - the first Double-Ratchet envelope encrypted under SK
38Bob (responder) handshake:
39 1. On receiving a PreKeyMessage, look up OPK_priv_B by OPK_id and
40 DELETE it from the prekey store (one-time semantics).
41 2. Recompute DH1..DH4 with private keys reversed (ECDH symmetry):
42 DH1' = DH(SPK_priv_B, IK_pub_A)
43 DH2' = DH(IK_priv_B, EK_pub_A)
44 DH3' = DH(SPK_priv_B, EK_pub_A)
45 DH4' = DH(OPK_priv_B, EK_pub_A)
46 By X25519 symmetry, DH1==DH1', etc., so SK matches.
47 3. Initialize ratchet with SK; decrypt the first envelope.
49Forward secrecy
50===============
51EK_priv_A is ephemeral (lives one session) and OPK_priv_B is
52one-time (deleted on use). Compromise of either party's IK or SPK
53LATER does NOT decrypt past sessions, because EK + OPK private bits
54are gone. This is the property the Phase 9.B placeholder lacks.
56Scope of THIS file
57==================
58Pure-compute primitives. No schema, no endpoints, no wiring into
59ConversationService.send_message — those are integration follow-ups
60(Phase 9.D) that need:
61 - prekey_bundles table (IK + SPK + OPKs per user)
62 - one-time prekey delivery + deletion endpoints
63 - PreKeyMessage envelope routing alongside RatchetEnvelope
64"""
66from __future__ import annotations
68import os
69import struct
70from typing import Any, Dict, List, NamedTuple, Optional, Tuple
class X3DHError(Exception):
    """Root of the X3DH exception hierarchy.

    Catching this type handles every handshake-level failure that this
    module raises on purpose.
    """


class X3DHUnavailable(X3DHError):
    """The `cryptography` package is not installed, so no X3DH
    primitive can run."""


class X3DHSignatureError(X3DHError):
    """The signed prekey (SPK) signature did not verify."""
85# Try to import the cryptography primitives. If absent, every API
86# raises X3DHUnavailable so the caller can flag-gate.
87try:
88 from cryptography.hazmat.primitives.asymmetric.x25519 import ( # type: ignore
89 X25519PrivateKey, X25519PublicKey,
90 )
91 from cryptography.hazmat.primitives.asymmetric.ed25519 import ( # type: ignore
92 Ed25519PrivateKey, Ed25519PublicKey,
93 )
94 from cryptography.hazmat.primitives.kdf.hkdf import HKDF # type: ignore
95 from cryptography.hazmat.primitives import hashes, serialization # type: ignore
96 from cryptography.exceptions import InvalidSignature # type: ignore
97 _HAS_CRYPTO = True
98except Exception: # pragma: no cover — exercised only on bare deploys
99 _HAS_CRYPTO = False
100 X25519PrivateKey = X25519PublicKey = None
101 Ed25519PrivateKey = Ed25519PublicKey = None
102 HKDF = hashes = serialization = None
103 InvalidSignature = Exception
# Size in bytes of the shared secret produced by the X3DH KDF.
SHARED_SECRET_BYTES = 32
# HKDF `info` label for this protocol version.
HKDF_INFO = b'hevolve-x3dh-v1'
# Fixed block of 32 0xFF bytes that _x3dh_kdf prepends to the HKDF
# input keying material. Despite the name this is NOT the HKDF salt:
# the salt passed to HKDF is 32 zero bytes.
HKDF_SALT_PREFIX = bytes([0xFF]) * 32
def _require_crypto() -> None:
    """Guard for the public API: raise X3DHUnavailable unless the
    module-level `cryptography` import succeeded.

    Raises:
        X3DHUnavailable: when `_HAS_CRYPTO` is false.
    """
    if _HAS_CRYPTO:
        return
    raise X3DHUnavailable(
        "X3DH requires the `cryptography` package; "
        "install via `pip install cryptography` or skip 9.C bootstrap")
120# ── Keypair helpers ────────────────────────────────────────────────
def _x25519_priv_from_bytes(b: bytes) -> X25519PrivateKey:
    """Load raw X25519 private-key bytes into a key object."""
    private_key = X25519PrivateKey.from_private_bytes(b)
    return private_key
def _x25519_pub_from_bytes(b: bytes) -> X25519PublicKey:
    """Load raw X25519 public-key bytes into a key object."""
    public_key = X25519PublicKey.from_public_bytes(b)
    return public_key
def _ed25519_priv_from_bytes(b: bytes) -> Ed25519PrivateKey:
    """Load raw Ed25519 private-key bytes into a signing-key object."""
    signing_key = Ed25519PrivateKey.from_private_bytes(b)
    return signing_key
def _ed25519_pub_from_bytes(b: bytes) -> Ed25519PublicKey:
    """Load raw Ed25519 public-key bytes into a verification-key object."""
    verify_key = Ed25519PublicKey.from_public_bytes(b)
    return verify_key
def _x25519_pub_bytes(priv: X25519PrivateKey) -> bytes:
    """Return the raw-encoded public key belonging to the X25519
    private key object `priv`."""
    public_key = priv.public_key()
    raw_encoding = serialization.Encoding.Raw
    raw_format = serialization.PublicFormat.Raw
    return public_key.public_bytes(encoding=raw_encoding, format=raw_format)
def generate_x25519_keypair() -> Tuple[bytes, bytes]:
    """Create a fresh X25519 keypair.

    Returns:
        (private_bytes, public_bytes), both in the raw encoding.

    Raises:
        X3DHUnavailable: if the `cryptography` package is missing.
    """
    _require_crypto()
    key = X25519PrivateKey.generate()
    no_encryption = serialization.NoEncryption()
    private_raw = key.private_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=no_encryption,
    )
    return private_raw, _x25519_pub_bytes(key)
def generate_ed25519_keypair() -> Tuple[bytes, bytes]:
    """Create a fresh Ed25519 signing keypair.

    Returns:
        (private_bytes, public_bytes), both in the raw encoding.

    Raises:
        X3DHUnavailable: if the `cryptography` package is missing.
    """
    _require_crypto()
    signing_key = Ed25519PrivateKey.generate()
    raw = serialization.Encoding.Raw
    private_raw = signing_key.private_bytes(
        encoding=raw,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=serialization.NoEncryption(),
    )
    verify_key = signing_key.public_key()
    public_raw = verify_key.public_bytes(
        encoding=raw,
        format=serialization.PublicFormat.Raw,
    )
    return private_raw, public_raw
def _dh(our_priv: bytes, their_pub: bytes) -> bytes:
    """X25519 exchange between our raw private key bytes and the
    peer's raw public key bytes; returns the exchange output."""
    private_key = _x25519_priv_from_bytes(our_priv)
    peer_key = _x25519_pub_from_bytes(their_pub)
    return private_key.exchange(peer_key)
182# ── Signatures ─────────────────────────────────────────────────────
def sign_prekey(identity_sign_priv: bytes,
                signed_prekey_pub: bytes) -> bytes:
    """Produce the SPK signature for a prekey bundle.

    Args:
        identity_sign_priv: raw Ed25519 identity signing key.
        signed_prekey_pub: SPK public key bytes to be signed.

    Returns:
        The Ed25519 signature (64 bytes) over `signed_prekey_pub`.

    Raises:
        X3DHUnavailable: if the `cryptography` package is missing.
    """
    _require_crypto()
    signer = _ed25519_priv_from_bytes(identity_sign_priv)
    return signer.sign(signed_prekey_pub)
def verify_prekey_signature(identity_sign_pub: bytes,
                            signed_prekey_pub: bytes,
                            signature: bytes) -> bool:
    """Check an SPK signature without raising on failure.

    Args:
        identity_sign_pub: raw Ed25519 identity public key.
        signed_prekey_pub: SPK public key bytes that were signed.
        signature: candidate Ed25519 signature.

    Returns:
        True only when `signature` verifies over `signed_prekey_pub`
        under `identity_sign_pub`; False for every failure. The caller
        decides whether a False result becomes X3DHSignatureError.

    Raises:
        X3DHUnavailable: if the `cryptography` package is missing.
    """
    _require_crypto()
    try:
        verifier = _ed25519_pub_from_bytes(identity_sign_pub)
        verifier.verify(signature, signed_prekey_pub)
    except Exception:
        # One handler on purpose: InvalidSignature and any error raised
        # while loading the key or checking the signature all mean
        # "not verified".
        return False
    return True
211# ── Prekey bundle shape ────────────────────────────────────────────
class PreKeyBundle(NamedTuple):
    """Public prekey bundle that a responder publishes and an
    initiator fetches before starting a handshake.

    Only the one-time prekey pair (`one_time_prekey_pub`,
    `one_time_prekey_id`) is optional.
    """
    # IK_pub: X25519 identity key, 32 bytes.
    identity_pub: bytes
    # IK_sign_pub: Ed25519 identity signing key, 32 bytes.
    identity_sign_pub: bytes
    # SPK_pub: X25519 signed prekey, 32 bytes.
    signed_prekey_pub: bytes
    # 64-byte Ed25519 signature over SPK_pub.
    signed_prekey_sig: bytes
    # OPK_pub: X25519 one-time prekey, 32 bytes, or None.
    one_time_prekey_pub: Optional[bytes]
    # Opaque id the responder uses to find (and delete) the matching
    # OPK private key; it is carried in the PreKeyMessage.
    one_time_prekey_id: Optional[str]
class PreKeyMessage(NamedTuple):
    """Handshake envelope sent from initiator to responder.

    It carries the public values the responder needs to recompute the
    same shared secret, plus the first Double-Ratchet envelope, which
    travels alongside them in the serialized form.
    """
    # IK_pub_A: the initiator's identity public key.
    identity_pub_initiator: bytes
    # EK_pub_A: the initiator's ephemeral public key for this session.
    ephemeral_pub: bytes
    # Id of the one-time prekey the initiator consumed, or None.
    one_time_prekey_id: Optional[str]
    # Serialized first ratchet envelope; opaque to X3DH.
    ratchet_envelope_blob: bytes
256# ── Initiator path ─────────────────────────────────────────────────
def initiator_derive_shared_secret(
        identity_priv: bytes,
        bundle: PreKeyBundle,
        *,
        ephemeral_priv: Optional[bytes] = None,
        verify_sig: bool = True) -> Tuple[bytes, bytes, bytes]:
    """Initiator half of the X3DH handshake.

    Args:
        identity_priv: initiator's raw X25519 identity private key.
        bundle: the responder's published PreKeyBundle.
        ephemeral_priv: testing seam; when None a fresh ephemeral
            keypair is generated for this session.
        verify_sig: when True (default) the handshake is refused
            unless the SPK signature verifies under
            `bundle.identity_sign_pub`. Disabling is meant for legacy
            bundles only; production deploys must leave it on.

    Returns:
        (shared_secret, ephemeral_priv, ephemeral_pub). The ephemeral
        public key goes into the PreKeyMessage; the shared secret
        seeds the ratchet.

    Raises:
        X3DHUnavailable: if the `cryptography` package is missing.
        X3DHSignatureError: if `verify_sig` is set and the SPK
            signature does not verify.
    """
    _require_crypto()
    if verify_sig:
        signature_ok = verify_prekey_signature(
            bundle.identity_sign_pub,
            bundle.signed_prekey_pub,
            bundle.signed_prekey_sig)
        if not signature_ok:
            raise X3DHSignatureError(
                "SPK signature verification failed — bundle may be "
                "tampered or identity_sign_pub mismatched")

    if ephemeral_priv is not None:
        ephemeral_key = _x25519_priv_from_bytes(ephemeral_priv)
        ephemeral_pub = _x25519_pub_bytes(ephemeral_key)
    else:
        ephemeral_priv, ephemeral_pub = generate_x25519_keypair()

    # DH outputs are gathered in the fixed order DH1, DH2, DH3[, DH4];
    # the responder concatenates its own results in the same order.
    dh_outputs = [
        _dh(identity_priv, bundle.signed_prekey_pub),
        _dh(ephemeral_priv, bundle.identity_pub),
        _dh(ephemeral_priv, bundle.signed_prekey_pub),
    ]
    if bundle.one_time_prekey_pub is not None:
        dh_outputs.append(_dh(ephemeral_priv, bundle.one_time_prekey_pub))

    shared_secret = _x3dh_kdf(b''.join(dh_outputs))
    return shared_secret, ephemeral_priv, ephemeral_pub
309# ── Responder path ─────────────────────────────────────────────────
def responder_derive_shared_secret(
        identity_priv: bytes,
        signed_prekey_priv: bytes,
        prekey_message: PreKeyMessage,
        *,
        one_time_prekey_priv: Optional[bytes] = None) -> bytes:
    """Responder half of the X3DH handshake.

    The caller must resolve `one_time_prekey_priv` from
    `prekey_message.one_time_prekey_id` and delete that key from its
    prekey store; this function does neither.

    Args:
        identity_priv: responder's raw X25519 identity private key.
        signed_prekey_priv: responder's raw signed-prekey private key.
        prekey_message: the PreKeyMessage received from the initiator.
        one_time_prekey_priv: private key for the consumed OPK, or
            None when the message names no OPK.

    Returns:
        The shared secret, equal to the initiator's by DH symmetry.

    Raises:
        X3DHUnavailable: if the `cryptography` package is missing.
        X3DHError: if the message names an OPK id but no matching
            private key was supplied.
    """
    _require_crypto()
    initiator_ik = prekey_message.identity_pub_initiator
    initiator_ek = prekey_message.ephemeral_pub
    dh_outputs = [
        _dh(signed_prekey_priv, initiator_ik),
        _dh(identity_priv, initiator_ek),
        _dh(signed_prekey_priv, initiator_ek),
    ]
    if prekey_message.one_time_prekey_id is not None:
        if one_time_prekey_priv is None:
            raise X3DHError(
                "PreKeyMessage references one_time_prekey_id but "
                "the responder didn't supply the matching priv — "
                "OPK already consumed or unknown id")
        dh_outputs.append(_dh(one_time_prekey_priv, initiator_ek))
    return _x3dh_kdf(b''.join(dh_outputs))
345# ── KDF ────────────────────────────────────────────────────────────
def _x3dh_kdf(ikm: bytes) -> bytes:
    """Derive the shared secret from the concatenated DH outputs.

    Runs HKDF-SHA256 with a salt of 32 zero bytes and `HKDF_INFO` as
    the info label. The HKDF input is `HKDF_SALT_PREFIX` (32 0xFF
    bytes) followed by `ikm`; the output is `SHARED_SECRET_BYTES` long.

    Raises:
        X3DHUnavailable: if the `cryptography` package is missing.
    """
    _require_crypto()
    kdf = HKDF(
        algorithm=hashes.SHA256(),
        length=SHARED_SECRET_BYTES,
        salt=bytes(32),
        info=HKDF_INFO,
    )
    keying_material = HKDF_SALT_PREFIX + ikm
    return kdf.derive(keying_material)
364# ── PreKeyMessage wire format ──────────────────────────────────────
def serialize_prekey_message(msg: PreKeyMessage) -> bytes:
    """Encode a PreKeyMessage into the compact wire format.

    Layout (the u32 is big-endian):
        [u8 version=1]
        [u8 opk_present (0 or 1)]
        [u8 opk_id_len][opk_id_bytes...]   — only if opk_present
        [u8 ik_len][ik_pub...]
        [u8 ek_len][ek_pub...]
        [u32 ratchet_len][ratchet_envelope_blob...]

    Raises:
        X3DHUnavailable: if the `cryptography` package is missing.
        X3DHError: if any field is too long for its length prefix
            (255 bytes for the id and both keys, 2**32 - 1 bytes for
            the ratchet envelope).
    """
    _require_crypto()
    out = bytearray()
    out.append(1)  # version
    if msg.one_time_prekey_id is not None:
        out.append(1)
        opk_id = msg.one_time_prekey_id.encode('utf-8')
        if len(opk_id) > 255:
            raise X3DHError(
                f"one_time_prekey_id too long ({len(opk_id)} > 255)")
        out.append(len(opk_id))
        out += opk_id
    else:
        out.append(0)

    # Both keys use a one-byte length prefix. Check explicitly so an
    # oversized field raises X3DHError like the opk_id case above,
    # not a bare ValueError from bytearray.append.
    for label, key in (
            ('identity_pub_initiator', msg.identity_pub_initiator),
            ('ephemeral_pub', msg.ephemeral_pub)):
        if len(key) > 255:
            raise X3DHError(f"{label} too long ({len(key)} > 255)")
        out.append(len(key))
        out += key

    blob = msg.ratchet_envelope_blob
    if len(blob) > 0xFFFFFFFF:
        # struct.pack('>I', ...) would otherwise raise struct.error.
        raise X3DHError(
            f"ratchet_envelope_blob too long ({len(blob)} > 4294967295)")
    out += struct.pack('>I', len(blob))
    out += blob
    return bytes(out)
def deserialize_prekey_message(buf: bytes) -> PreKeyMessage:
    """Decode the wire format written by serialize_prekey_message.

    Every length-prefixed field is bounds-checked against the buffer,
    so a truncated or corrupted message is rejected and never yields
    silently shortened keys or a shortened ratchet envelope. Bytes
    after the ratchet envelope are ignored.

    Args:
        buf: the serialized PreKeyMessage.

    Returns:
        The decoded PreKeyMessage.

    Raises:
        X3DHError: if the buffer is shorter than the minimum message,
            the version byte is not 1, a field runs past the end of
            the buffer, or the one-time prekey id is not valid UTF-8.
    """
    # Smallest well-formed message: version + opk flag + two 32-byte
    # keys with their length bytes + the u32 ratchet length.
    if len(buf) < 1 + 1 + 1 + 32 + 1 + 32 + 4:
        raise X3DHError("PreKeyMessage too short")
    total = len(buf)

    def take(start, count, what):
        # Return (bytes, new_offset); refuse to read past the end.
        stop = start + count
        if stop > total:
            raise X3DHError(
                f"PreKeyMessage truncated while reading {what}")
        return bytes(buf[start:stop]), stop

    header, p = take(0, 2, "header")
    version, opk_present = header[0], header[1]
    if version != 1:
        raise X3DHError(f"unknown PreKeyMessage version: {version}")

    opk_id = None
    if opk_present:
        size, p = take(p, 1, "one_time_prekey_id length")
        raw_id, p = take(p, size[0], "one_time_prekey_id")
        try:
            opk_id = raw_id.decode('utf-8')
        except UnicodeDecodeError as exc:
            raise X3DHError(
                "PreKeyMessage one_time_prekey_id is not valid UTF-8"
            ) from exc

    size, p = take(p, 1, "identity key length")
    ik, p = take(p, size[0], "identity key")
    size, p = take(p, 1, "ephemeral key length")
    ek, p = take(p, size[0], "ephemeral key")
    size, p = take(p, 4, "ratchet envelope length")
    (rk_len,) = struct.unpack('>I', size)
    rk, p = take(p, rk_len, "ratchet envelope")

    return PreKeyMessage(
        identity_pub_initiator=ik,
        ephemeral_pub=ek,
        one_time_prekey_id=opk_id,
        ratchet_envelope_blob=rk,
    )
# Public API of this module, grouped by role.
__all__ = [
    # exceptions
    'X3DHError',
    'X3DHUnavailable',
    'X3DHSignatureError',
    # message / bundle shapes
    'PreKeyBundle',
    'PreKeyMessage',
    # key generation
    'generate_x25519_keypair',
    'generate_ed25519_keypair',
    # SPK signatures
    'sign_prekey',
    'verify_prekey_signature',
    # handshake
    'initiator_derive_shared_secret',
    'responder_derive_shared_secret',
    # wire format
    'serialize_prekey_message',
    'deserialize_prekey_message',
    # constants
    'SHARED_SECRET_BYTES',
    'HKDF_INFO',
]