Coverage for integrations / social / e2e_x3dh.py: 0.0%

137 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial — Phase 9.C X3DH initial key agreement. 

3 

4Plan reference: sunny-gliding-eich.md, Part K.4 + Phase 9. 

5 

6Implements libsignal-style X3DH (Extended Triple Diffie-Hellman) so 

7two parties can derive a Double Ratchet initial shared_secret 

8without an out-of-band exchange. Replaces the deterministic 

9placeholder in e2e_state_repo._derive_initial_shared_secret. 

10 

11Protocol summary 

12================ 

13 

14Each user publishes a prekey bundle: 

15 - IK_pub: long-lived identity X25519 public key 

16 - IK_sign_pub: long-lived identity Ed25519 public key (used to 

17 sign SPK; same root identity but separate keypair 

18 because X25519 + Ed25519 are different curves) 

19 - SPK_pub: signed prekey X25519 public key (rotated ~weekly) 

20 - SPK_sig: Ed25519 signature over SPK_pub bytes by IK_sign_priv 

21 - OPK_pub: optional one-time prekey X25519 public key (used once 

22 and discarded by the responder) 

23 

24Alice (initiator) handshake: 

25 1. Fetch Bob's bundle. Verify SPK_sig with IK_sign_pub. 

26 2. Generate ephemeral keypair (EK_priv_A, EK_pub_A). 

27 3. Compute four DHs: 

28 DH1 = DH(IK_priv_A, SPK_pub_B) 

29 DH2 = DH(EK_priv_A, IK_pub_B) 

30 DH3 = DH(EK_priv_A, SPK_pub_B) 

31 DH4 = DH(EK_priv_A, OPK_pub_B) [omitted if no OPK] 

32 4. SK = HKDF-SHA256(0xFF*32 || DH1 || DH2 || DH3 [|| DH4], 

33 info='hevolve-x3dh-v1', salt=32 zero bytes) 

34 5. Send to Bob a `PreKeyMessage`: 

35 - IK_pub_A, EK_pub_A, OPK_id_consumed (or None) 

36 - the first Double-Ratchet envelope encrypted under SK 

37 

38Bob (responder) handshake: 

39 1. On receiving a PreKeyMessage, look up OPK_priv_B by OPK_id and 

40 DELETE it from the prekey store (one-time semantics). 

41 2. Recompute DH1..DH4 with private keys reversed (ECDH symmetry): 

42 DH1' = DH(SPK_priv_B, IK_pub_A) 

43 DH2' = DH(IK_priv_B, EK_pub_A) 

44 DH3' = DH(SPK_priv_B, EK_pub_A) 

45 DH4' = DH(OPK_priv_B, EK_pub_A) 

46 By X25519 symmetry, DH1==DH1', etc., so SK matches. 

47 3. Initialize ratchet with SK; decrypt the first envelope. 

48 

49Forward secrecy 

50=============== 

51EK_priv_A is ephemeral (lives one session) and OPK_priv_B is 

52one-time (deleted on use). Compromise of either party's IK or SPK 

53LATER does NOT decrypt past sessions, because EK + OPK private bits 

54are gone. This is the property the Phase 9.B placeholder lacks. 

55 

56Scope of THIS file 

57================== 

58Pure-compute primitives. No schema, no endpoints, no wiring into 

59ConversationService.send_message — those are integration follow-ups 

60(Phase 9.D) that need: 

61 - prekey_bundles table (IK + SPK + OPKs per user) 

62 - one-time prekey delivery + deletion endpoints 

63 - PreKeyMessage envelope routing alongside RatchetEnvelope 

64""" 

65 

66from __future__ import annotations 

67 

68import os 

69import struct 

70from typing import Any, Dict, List, NamedTuple, Optional, Tuple 

71 

72 

class X3DHError(Exception):
    """Common ancestor of the X3DH-specific exceptions in this module."""

75 

76 

class X3DHUnavailable(X3DHError):
    """Signals that the `cryptography` package could not be imported."""

79 

80 

class X3DHSignatureError(X3DHError):
    """Signals that a signed-prekey (SPK) signature did not verify."""

83 

84 

85# Try to import the cryptography primitives. If absent, every API 

86# raises X3DHUnavailable so the caller can flag-gate. 

87try: 

88 from cryptography.hazmat.primitives.asymmetric.x25519 import ( # type: ignore 

89 X25519PrivateKey, X25519PublicKey, 

90 ) 

91 from cryptography.hazmat.primitives.asymmetric.ed25519 import ( # type: ignore 

92 Ed25519PrivateKey, Ed25519PublicKey, 

93 ) 

94 from cryptography.hazmat.primitives.kdf.hkdf import HKDF # type: ignore 

95 from cryptography.hazmat.primitives import hashes, serialization # type: ignore 

96 from cryptography.exceptions import InvalidSignature # type: ignore 

97 _HAS_CRYPTO = True 

98except Exception: # pragma: no cover — exercised only on bare deploys 

99 _HAS_CRYPTO = False 

100 X25519PrivateKey = X25519PublicKey = None 

101 Ed25519PrivateKey = Ed25519PublicKey = None 

102 HKDF = hashes = serialization = None 

103 InvalidSignature = Exception 

104 

105 

# Length in bytes of the shared secret produced by the X3DH KDF.
SHARED_SECRET_BYTES = 32

# HKDF `info` label binding derived keys to this protocol version.
HKDF_INFO = b'hevolve-x3dh-v1'

# 32 bytes of 0xFF that are prepended to the HKDF *input keying
# material* (not used as the HKDF salt, despite the constant's name).
# The X3DH spec prescribes this prefix for X25519 as domain separation.
HKDF_SALT_PREFIX = b'\xff' * 32

111 

112 

def _require_crypto() -> None:
    """Guard used by the public API: fail fast when crypto is missing.

    Raises:
        X3DHUnavailable: if the module-level `cryptography` import failed.
    """
    if _HAS_CRYPTO:
        return
    raise X3DHUnavailable(
        "X3DH requires the `cryptography` package; "
        "install via `pip install cryptography` or skip 9.C bootstrap")

118 

119 

120# ── Keypair helpers ──────────────────────────────────────────────── 

121 

122 

def _x25519_priv_from_bytes(b: bytes) -> X25519PrivateKey:
    """Load raw X25519 private-key bytes into a key object."""
    private_key = X25519PrivateKey.from_private_bytes(b)
    return private_key

125 

126 

def _x25519_pub_from_bytes(b: bytes) -> X25519PublicKey:
    """Load raw X25519 public-key bytes into a key object."""
    public_key = X25519PublicKey.from_public_bytes(b)
    return public_key

129 

130 

def _ed25519_priv_from_bytes(b: bytes) -> Ed25519PrivateKey:
    """Load raw Ed25519 private-key bytes into a signing-key object."""
    signing_key = Ed25519PrivateKey.from_private_bytes(b)
    return signing_key

133 

134 

def _ed25519_pub_from_bytes(b: bytes) -> Ed25519PublicKey:
    """Load raw Ed25519 public-key bytes into a verifying-key object."""
    verifying_key = Ed25519PublicKey.from_public_bytes(b)
    return verifying_key

137 

138 

def _x25519_pub_bytes(priv: X25519PrivateKey) -> bytes:
    """Return the raw-encoded public half of an X25519 private key object."""
    public_key = priv.public_key()
    raw_encoding = serialization.Encoding.Raw
    raw_format = serialization.PublicFormat.Raw
    return public_key.public_bytes(encoding=raw_encoding, format=raw_format)

144 

145 

def generate_x25519_keypair() -> Tuple[bytes, bytes]:
    """Create a fresh X25519 keypair.

    Returns:
        ``(private_bytes, public_bytes)``, both as raw 32-byte X25519
        encodings.

    Raises:
        X3DHUnavailable: if `cryptography` is not installed.
    """
    _require_crypto()
    private_key = X25519PrivateKey.generate()
    private_raw = private_key.private_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=serialization.NoEncryption(),
    )
    return private_raw, _x25519_pub_bytes(private_key)

158 

159 

def generate_ed25519_keypair() -> Tuple[bytes, bytes]:
    """Create a fresh Ed25519 signing keypair.

    Returns:
        ``(private_bytes, public_bytes)``, both as raw 32-byte Ed25519
        encodings.

    Raises:
        X3DHUnavailable: if `cryptography` is not installed.
    """
    _require_crypto()
    signing_key = Ed25519PrivateKey.generate()
    raw = serialization.Encoding.Raw
    private_raw = signing_key.private_bytes(
        encoding=raw,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=serialization.NoEncryption(),
    )
    public_raw = signing_key.public_key().public_bytes(
        encoding=raw,
        format=serialization.PublicFormat.Raw,
    )
    return private_raw, public_raw

175 

176 

def _dh(our_priv: bytes, their_pub: bytes) -> bytes:
    """X25519 Diffie-Hellman between a raw private and a raw public key."""
    private_key = _x25519_priv_from_bytes(our_priv)
    peer_key = _x25519_pub_from_bytes(their_pub)
    return private_key.exchange(peer_key)

180 

181 

182# ── Signatures ───────────────────────────────────────────────────── 

183 

184 

def sign_prekey(identity_sign_priv: bytes,
                signed_prekey_pub: bytes) -> bytes:
    """Produce the SPK signature published in a prekey bundle.

    Args:
        identity_sign_priv: raw Ed25519 identity signing private key.
        signed_prekey_pub: the SPK public-key bytes to be signed.

    Returns:
        The 64-byte Ed25519 signature over ``signed_prekey_pub``.

    Raises:
        X3DHUnavailable: if `cryptography` is not installed.
    """
    _require_crypto()
    signing_key = _ed25519_priv_from_bytes(identity_sign_priv)
    return signing_key.sign(signed_prekey_pub)

192 

193 

def verify_prekey_signature(identity_sign_pub: bytes,
                            signed_prekey_pub: bytes,
                            signature: bytes) -> bool:
    """Check an SPK signature without raising on failure.

    Args:
        identity_sign_pub: raw Ed25519 identity public key of the signer.
        signed_prekey_pub: the SPK public-key bytes that were signed.
        signature: the Ed25519 signature to check.

    Returns:
        True iff ``signature`` verifies over ``signed_prekey_pub`` under
        ``identity_sign_pub``; False for any verification failure. The
        caller decides whether to escalate to X3DHSignatureError.

    Raises:
        X3DHUnavailable: if `cryptography` is not installed.
    """
    _require_crypto()
    try:
        verifying_key = _ed25519_pub_from_bytes(identity_sign_pub)
        verifying_key.verify(signature, signed_prekey_pub)
    except Exception:
        # Deliberately broad: an invalid signature and any error while
        # loading the key or verifying are all reported as "not verified".
        return False
    return True

209 

210 

211# ── Prekey bundle shape ──────────────────────────────────────────── 

212 

213 

class PreKeyBundle(NamedTuple):
    """Public prekey material a responder publishes for initiators.

    An initiator fetches this bundle and feeds it to
    ``initiator_derive_shared_secret``. The one-time prekey pair of
    fields is optional.
    """
    # IK_pub: identity key (X25519, 32 bytes).
    identity_pub: bytes
    # IK_sign_pub: identity signing key (Ed25519, 32 bytes).
    identity_sign_pub: bytes
    # SPK_pub: signed prekey (X25519, 32 bytes).
    signed_prekey_pub: bytes
    # 64-byte Ed25519 signature over SPK_pub.
    signed_prekey_sig: bytes
    # OPK_pub: one-time prekey (X25519, 32 bytes), or None when absent.
    one_time_prekey_pub: Optional[bytes]
    # Opaque id the responder uses to find (and delete) the matching
    # OPK private key; it is carried back in the PreKeyMessage.
    one_time_prekey_id: Optional[str]

234 

235 

class PreKeyMessage(NamedTuple):
    """Handshake envelope sent from initiator to responder.

    It holds the public values the responder needs to recompute the
    same shared secret, together with the first Double-Ratchet
    envelope, which X3DH treats as an opaque blob.
    """
    # IK_pub_A: the initiator's identity public key.
    identity_pub_initiator: bytes
    # EK_pub_A: the initiator's ephemeral public key for this session.
    ephemeral_pub: bytes
    # Id of the one-time prekey the initiator consumed, or None.
    one_time_prekey_id: Optional[str]
    # Serialized first ratchet envelope (the first encrypted message).
    ratchet_envelope_blob: bytes

254 

255 

256# ── Initiator path ───────────────────────────────────────────────── 

257 

258 

def initiator_derive_shared_secret(
        identity_priv: bytes,
        bundle: PreKeyBundle,
        *,
        ephemeral_priv: Optional[bytes] = None,
        verify_sig: bool = True) -> Tuple[bytes, bytes, bytes]:
    """Perform the initiator (Alice) half of the X3DH handshake.

    Args:
        identity_priv: the initiator's raw X25519 identity private key.
        bundle: the responder's published prekey bundle.
        ephemeral_priv: testing seam — a raw X25519 private key to use
            as the ephemeral key. When None (the default) a fresh
            ephemeral keypair is generated for this session.
        verify_sig: when True (the default) the SPK signature in
            ``bundle`` must verify under ``bundle.identity_sign_pub``.
            Callers may disable this for legacy bundles, but production
            deploys MUST keep it on.

    Returns:
        ``(shared_secret, ephemeral_priv, ephemeral_pub)``. The caller
        places the ephemeral public key in the PreKeyMessage and feeds
        the shared secret into init_ratchet().

    Raises:
        X3DHUnavailable: if `cryptography` is not installed.
        X3DHSignatureError: if ``verify_sig`` is set and the SPK
            signature does not verify.
    """
    _require_crypto()
    if verify_sig and not verify_prekey_signature(
            bundle.identity_sign_pub,
            bundle.signed_prekey_pub,
            bundle.signed_prekey_sig):
        raise X3DHSignatureError(
            "SPK signature verification failed — bundle may be "
            "tampered or identity_sign_pub mismatched")

    if ephemeral_priv is not None:
        ephemeral_pub = _x25519_pub_bytes(
            _x25519_priv_from_bytes(ephemeral_priv))
    else:
        ephemeral_priv, ephemeral_pub = generate_x25519_keypair()

    # DH1..DH3 (plus DH4 when an OPK is offered) are concatenated in
    # this fixed order; the responder must mirror exactly this sequence.
    dh_outputs = [
        _dh(identity_priv, bundle.signed_prekey_pub),
        _dh(ephemeral_priv, bundle.identity_pub),
        _dh(ephemeral_priv, bundle.signed_prekey_pub),
    ]
    if bundle.one_time_prekey_pub is not None:
        dh_outputs.append(_dh(ephemeral_priv, bundle.one_time_prekey_pub))

    shared_secret = _x3dh_kdf(b''.join(dh_outputs))
    return shared_secret, ephemeral_priv, ephemeral_pub

307 

308 

309# ── Responder path ───────────────────────────────────────────────── 

310 

311 

def responder_derive_shared_secret(
        identity_priv: bytes,
        signed_prekey_priv: bytes,
        prekey_message: PreKeyMessage,
        *,
        one_time_prekey_priv: Optional[bytes] = None) -> bytes:
    """Perform the responder (Bob) half of the X3DH handshake.

    The caller is responsible for resolving
    ``prekey_message.one_time_prekey_id`` to the matching OPK private
    key and for DELETING that key from the prekey store (one-time
    semantics); this function does neither.

    Args:
        identity_priv: the responder's raw X25519 identity private key.
        signed_prekey_priv: the responder's raw X25519 SPK private key.
        prekey_message: the handshake envelope from the initiator.
        one_time_prekey_priv: raw X25519 OPK private key, required when
            the message names a one_time_prekey_id; None means the
            initiator did not consume an OPK.

    Returns:
        The shared secret, equal to the initiator's by ECDH symmetry.

    Raises:
        X3DHUnavailable: if `cryptography` is not installed.
        X3DHError: if the message references an OPK id but no OPK
            private key was supplied.
    """
    _require_crypto()
    initiator_ik = prekey_message.identity_pub_initiator
    initiator_ek = prekey_message.ephemeral_pub

    # Mirror of the initiator's DH1..DH3, with key roles swapped.
    dh_outputs = [
        _dh(signed_prekey_priv, initiator_ik),
        _dh(identity_priv, initiator_ek),
        _dh(signed_prekey_priv, initiator_ek),
    ]
    if prekey_message.one_time_prekey_id is not None:
        if one_time_prekey_priv is None:
            raise X3DHError(
                "PreKeyMessage references one_time_prekey_id but "
                "the responder didn't supply the matching priv — "
                "OPK already consumed or unknown id")
        dh_outputs.append(_dh(one_time_prekey_priv, initiator_ek))

    return _x3dh_kdf(b''.join(dh_outputs))

343 

344 

345# ── KDF ──────────────────────────────────────────────────────────── 

346 

347 

def _x3dh_kdf(ikm: bytes) -> bytes:
    """Derive the X3DH shared secret from the concatenated DH outputs.

    Runs HKDF-SHA256 with a 32-byte all-zero salt and ``HKDF_INFO`` as
    the info label. ``HKDF_SALT_PREFIX`` (32 bytes of 0xFF) is
    prepended to ``ikm`` as a domain-separation prefix on the input
    keying material — it is not the HKDF salt.

    Args:
        ikm: DH1 || DH2 || DH3 [|| DH4].

    Returns:
        ``SHARED_SECRET_BYTES`` bytes of derived key material.

    Raises:
        X3DHUnavailable: if `cryptography` is not installed.
    """
    _require_crypto()
    zero_salt = b'\x00' * 32
    kdf = HKDF(
        algorithm=hashes.SHA256(),
        length=SHARED_SECRET_BYTES,
        salt=zero_salt,
        info=HKDF_INFO,
    )
    return kdf.derive(HKDF_SALT_PREFIX + ikm)

362 

363 

364# ── PreKeyMessage wire format ────────────────────────────────────── 

365 

366 

def serialize_prekey_message(msg: PreKeyMessage) -> bytes:
    """Encode a PreKeyMessage into its compact wire format.

    Layout (all integers big-endian):
        [u8 version=1]
        [u8 opk_present (0 or 1)]
        [u8 opk_id_len][opk_id_bytes...]   — only if opk_present
        [u8 ik_len][ik_pub...]             — 32 bytes for X25519
        [u8 ek_len][ek_pub...]             — 32 bytes for X25519
        [u32 ratchet_len][ratchet_envelope_blob...]

    Args:
        msg: the message to encode.

    Returns:
        The encoded bytes.

    Raises:
        X3DHUnavailable: if `cryptography` is not installed (kept for
            parity with the rest of the public API, although no crypto
            primitive is used here).
        X3DHError: if any field is too long for its length prefix.
    """
    _require_crypto()
    out = bytearray()
    out.append(1)  # version

    if msg.one_time_prekey_id is not None:
        out.append(1)
        opk_id = msg.one_time_prekey_id.encode('utf-8')
        if len(opk_id) > 255:
            raise X3DHError(
                f"one_time_prekey_id too long ({len(opk_id)} > 255)")
        out.append(len(opk_id))
        out += opk_id
    else:
        out.append(0)

    # Both keys use a u8 length prefix; reject anything that would
    # overflow it instead of letting bytearray.append raise ValueError.
    for label, key in (
            ('identity_pub_initiator', msg.identity_pub_initiator),
            ('ephemeral_pub', msg.ephemeral_pub)):
        if len(key) > 255:
            raise X3DHError(f"{label} too long ({len(key)} > 255)")
        out.append(len(key))
        out += key

    blob = msg.ratchet_envelope_blob
    if len(blob) > 0xFFFFFFFF:
        # The u32 length prefix cannot represent this; struct.pack
        # would otherwise raise struct.error.
        raise X3DHError(
            f"ratchet_envelope_blob too long ({len(blob)} > 4294967295)")
    out += struct.pack('>I', len(blob))
    out += blob
    return bytes(out)

396 

397 

def deserialize_prekey_message(buf: bytes) -> PreKeyMessage:
    """Decode the wire format produced by ``serialize_prekey_message``.

    Layout (all integers big-endian):
        [u8 version=1]
        [u8 opk_present]
        [u8 opk_id_len][opk_id_bytes...]   — only if opk_present != 0
        [u8 ik_len][ik_pub...]
        [u8 ek_len][ek_pub...]
        [u32 ratchet_len][ratchet_envelope_blob...]

    The input is untrusted, so every field is bounds-checked and every
    malformed frame is reported as X3DHError rather than leaking
    IndexError / struct.error / UnicodeDecodeError or silently
    returning truncated fields. Bytes after the ratchet blob are
    ignored. Key lengths are not validated here.

    Args:
        buf: the encoded message.

    Returns:
        The decoded PreKeyMessage.

    Raises:
        X3DHError: if the buffer is too short or truncated, the version
            is unknown, or the OPK id is not valid UTF-8.
    """
    # Smallest frame the serializer emits with 32-byte keys: version,
    # OPK flag, two length-prefixed keys and the u32 blob length. The
    # optional opk_id_len byte is NOT counted, so a frame with no OPK
    # and an empty ratchet blob is accepted.
    min_len = 1 + 1 + (1 + 32) + (1 + 32) + 4
    total = len(buf)
    if total < min_len:
        raise X3DHError("PreKeyMessage too short")

    pos = 0

    def take(count: int, what: str) -> bytes:
        """Consume exactly `count` bytes or fail with X3DHError."""
        nonlocal pos
        end = pos + count
        if end > total:
            raise X3DHError(
                f"PreKeyMessage too short (truncated {what})")
        chunk = bytes(buf[pos:end])
        pos = end
        return chunk

    version = take(1, 'version')[0]
    if version != 1:
        raise X3DHError(f"unknown PreKeyMessage version: {version}")

    opk_id: Optional[str] = None
    if take(1, 'opk_present flag')[0]:
        opk_id_len = take(1, 'opk_id length')[0]
        try:
            opk_id = take(opk_id_len, 'opk_id').decode('utf-8')
        except UnicodeDecodeError as err:
            raise X3DHError(
                "PreKeyMessage one_time_prekey_id is not valid UTF-8"
            ) from err

    ik = take(take(1, 'identity key length')[0], 'identity key')
    ek = take(take(1, 'ephemeral key length')[0], 'ephemeral key')
    rk_len = int.from_bytes(take(4, 'ratchet length'), 'big')
    rk = take(rk_len, 'ratchet envelope')

    return PreKeyMessage(
        identity_pub_initiator=ik,
        ephemeral_pub=ek,
        one_time_prekey_id=opk_id,
        ratchet_envelope_blob=rk,
    )

423 

424 

# Public API of this module, one name per line, grouped by role.
__all__ = [
    # exceptions
    'X3DHError',
    'X3DHUnavailable',
    'X3DHSignatureError',
    # data shapes
    'PreKeyBundle',
    'PreKeyMessage',
    # key generation
    'generate_x25519_keypair',
    'generate_ed25519_keypair',
    # signed-prekey signatures
    'sign_prekey',
    'verify_prekey_signature',
    # handshake
    'initiator_derive_shared_secret',
    'responder_derive_shared_secret',
    # wire format
    'serialize_prekey_message',
    'deserialize_prekey_message',
    # constants
    'SHARED_SECRET_BYTES',
    'HKDF_INFO',
]