Coverage for integrations / social / e2e_state_repo.py: 0.0%

90 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial — Phase 9.B Double Ratchet state persistence. 

3 

4Plan reference: sunny-gliding-eich.md, Part K.4 + Phase 9. 

5 

6Bridges the stateless ratchet primitives in e2e_ratchet.py with the 

7v52 ratchet_states table. Handles: 

8 

9 - load(db, conv_id, user_id, peer_id) → RatchetState | None 

10 - save(db, conv_id, user_id, peer_id, state) → upserts the row 

11 - bootstrap(db, conv_id, user_id, peer_id, identity_keys) → 

12 first-time init via a deterministic shared secret derived from 

13 both members' published identity keys. This is a STAND-IN for 

14 the proper X3DH handshake (Phase 9.C) — flagged inline so it's 

15 not mistaken for production-grade key agreement. 

16 

17Wire format: state_json holds the JSON-encoded RatchetState with all 

18`bytes` fields base64-encoded (same convention as conversation_keys 

19+ message_envelopes — TEXT columns travel cleanly across SQLite, 

20Postgres, MySQL). The skipped_keys cache (Dict[(bytes,int)→bytes]) 

21serializes to a list of {peer_b64, idx, key_b64} triples so JSON's 

22string-keys-only constraint isn't an issue. 

23 

24Transport: this module is pure DB persistence + serialization. No 

25WAMP, no PeerLink, no notification fan-out — those happen in 

26ConversationService.send_message which CALLS this module. Keeps 

27the transport selection in MessageBus.publish per Plan R.8. 

28""" 

29 

30from __future__ import annotations 

31 

32import base64 

33import hashlib 

34import json 

35import logging 

36import uuid 

37from typing import Any, Dict, List, Optional, Tuple 

38 

39from sqlalchemy import text 

40from sqlalchemy.exc import IntegrityError 

41 

42from . import e2e_ratchet as r 

43 

# Module logger. Uses the shared 'hevolve_social' channel rather than
# __name__ so this module's records land alongside the rest of the
# social integration's logging.
logger = logging.getLogger(name='hevolve_social')

45 

46 

class E2EStateError(Exception):
    """Failure inside the ratchet-state repository.

    Raised for unreadable/corrupt persisted state and for rejected
    writes. Callers are expected to translate it into a 4xx response.
    """

49 

50 

51# ── Serialization ────────────────────────────────────────────────── 

52 

53 

54def _b64(b: Optional[bytes]) -> Optional[str]: 

55 if b is None: 

56 return None 

57 return base64.b64encode(b).decode('ascii') 

58 

59 

60def _unb64(s: Optional[str]) -> Optional[bytes]: 

61 if s is None: 

62 return None 

63 return base64.b64decode(s.encode('ascii')) 

64 

65 

def serialize_state(state: r.RatchetState) -> str:
    """Encode a RatchetState as the JSON text stored in ``state_json``.

    Every ``bytes`` field is base64-encoded (``None`` stays JSON null).
    The output carries ``'version': 1`` so ``deserialize_state`` can
    reject blobs written in an unknown layout.
    """
    # JSON object keys must be strings, so the skipped-key cache's
    # (peer_pub, idx) tuple keys are flattened into a list of
    # {peer_b64, idx, key_b64} records.
    flattened_skipped = [
        {
            'peer_b64': _b64(peer_pub),
            'idx': position,
            'key_b64': _b64(cached_key),
        }
        for (peer_pub, position), cached_key in state.skipped_keys.items()
    ]
    document = {
        'version': 1,
        'root_key': _b64(state.root_key),
        'sending_chain_key': _b64(state.sending_chain_key),
        'sending_index': state.sending_index,
        'receiving_chain_key': _b64(state.receiving_chain_key),
        'receiving_index': state.receiving_index,
        'our_dh_priv': _b64(state.our_dh_priv),
        'our_dh_pub': _b64(state.our_dh_pub),
        'their_dh_pub': _b64(state.their_dh_pub),
        'skipped_keys': flattened_skipped,
    }
    return json.dumps(document)

88 

89 

def deserialize_state(blob: str) -> r.RatchetState:
    """JSON string → RatchetState, all bytes decoded.

    Inverse of ``serialize_state``: base64 fields are decoded back to
    ``bytes`` and the flattened ``skipped_keys`` list of
    ``{peer_b64, idx, key_b64}`` records is rebuilt into the
    ``{(peer_pub, idx): msg_key}`` dict. Skipped-key records missing
    either ``peer_b64`` or ``key_b64`` are silently dropped.

    Args:
        blob: the ``state_json`` column value.

    Returns:
        The reconstructed ``RatchetState``. A JSON null in a root/chain/
        DH key field decodes to ``b''``; ``their_dh_pub`` stays ``None``
        when absent or null.

    Raises:
        E2EStateError: for an empty blob, invalid JSON, an unknown
            ``version``, a missing required field, a wrongly-typed field,
            or invalid base64. All malformed-row failures are funnelled
            into this one type so callers can map them to a 4xx instead
            of surfacing a bare KeyError / binascii.Error.
    """
    if not blob:
        raise E2EStateError("empty state blob")
    try:
        d = json.loads(blob)
    except Exception as e:
        raise E2EStateError(f"corrupt state JSON: {e}") from e
    if not isinstance(d, dict) or d.get('version') != 1:
        raise E2EStateError(
            f"unknown state version: {d.get('version') if isinstance(d, dict) else None}")
    try:
        skipped_dict: Dict[Tuple[bytes, int], bytes] = {}
        # `or []` also tolerates an explicit JSON null for the list.
        for entry in d.get('skipped_keys') or []:
            peer = _unb64(entry.get('peer_b64'))
            idx = int(entry.get('idx', 0))
            msg_key = _unb64(entry.get('key_b64'))
            if peer is not None and msg_key is not None:
                skipped_dict[(peer, idx)] = msg_key
        root_key = _unb64(d['root_key']) or b''
        sending_chain_key = _unb64(d['sending_chain_key']) or b''
        sending_index = int(d.get('sending_index', 0))
        receiving_chain_key = _unb64(d['receiving_chain_key']) or b''
        receiving_index = int(d.get('receiving_index', 0))
        our_dh_priv = _unb64(d['our_dh_priv']) or b''
        our_dh_pub = _unb64(d['our_dh_pub']) or b''
        their_dh_pub = _unb64(d.get('their_dh_pub'))
    except (KeyError, TypeError, ValueError, AttributeError) as e:
        # KeyError: required field missing. TypeError/AttributeError:
        # field of the wrong JSON type (e.g. a number where base64 text
        # or a record dict is expected). ValueError: covers
        # binascii.Error (bad base64), UnicodeEncodeError (non-ASCII in a
        # base64 field) and non-numeric index values.
        raise E2EStateError(f"malformed state field: {e!r}") from e
    return r.RatchetState(
        root_key=root_key,
        sending_chain_key=sending_chain_key,
        sending_index=sending_index,
        receiving_chain_key=receiving_chain_key,
        receiving_index=receiving_index,
        our_dh_priv=our_dh_priv,
        our_dh_pub=our_dh_pub,
        their_dh_pub=their_dh_pub,
        skipped_keys=skipped_dict,
    )

119 

120 

121# ── DB load / save ───────────────────────────────────────────────── 

122 

123 

def load(db, conversation_id: str, user_id: str,
         peer_id: str) -> Optional[r.RatchetState]:
    """Read the persisted ratchet state for (conversation, user, peer).

    Returns ``None`` when no row exists yet; the caller decides whether
    to bootstrap or refuse the encrypt. A stored blob that cannot be
    decoded propagates whatever ``deserialize_state`` raises.
    """
    query = text(
        "SELECT state_json FROM ratchet_states "
        "WHERE conversation_id = :cid AND user_id = :uid "
        "AND peer_id = :pid")
    params = {'cid': conversation_id, 'uid': user_id, 'pid': peer_id}
    found = db.execute(query, params).fetchone()
    return None if found is None else deserialize_state(found[0])

138 

139 

def save(db, conversation_id: str, user_id: str, peer_id: str,
         state: r.RatchetState,
         tenant_id: Optional[str] = None) -> None:
    """Upsert the persisted state. Idempotent — row keyed on the
    UNIQUE(conversation_id, user_id, peer_id) index from v52.

    On first save a row is inserted with a fresh uuid4 ``id``. On a
    conflict only ``state_json`` and ``updated_at`` are overwritten, so
    an existing row keeps its ``id``, ``tenant_id`` and ``created_at``.

    The upsert clause is picked from the session's SQL dialect:
      * ``mysql`` / ``mariadb``: ``ON DUPLICATE KEY UPDATE`` (these
        engines do not support ``ON CONFLICT``);
      * anything else (SQLite, Postgres): ``ON CONFLICT (...) DO UPDATE``.
    A session with no bind is treated as SQLite.

    Raises:
        E2EStateError: if the DB rejects the write with an IntegrityError.
            NOTE: no rollback is done here; presumably the caller owns the
            transaction and must roll it back.
    """
    blob = serialize_state(state)
    dialect = db.bind.dialect.name if db.bind is not None else 'sqlite'
    insert_clause = (
        "INSERT INTO ratchet_states "
        "(id, tenant_id, conversation_id, user_id, peer_id, "
        " state_json, created_at, updated_at) "
        "VALUES (:id, :tid, :cid, :uid, :pid, :blob, "
        " CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ")
    if dialect in ('mysql', 'mariadb'):
        # MySQL/MariaDB upsert via ON DUPLICATE KEY UPDATE, which fires on
        # a collision with any UNIQUE index (here the v52 triple index).
        # VALUES(col) is used instead of the MySQL 8.0.19+ row-alias form
        # because MariaDB does not accept the alias syntax.
        upsert_clause = (
            "ON DUPLICATE KEY UPDATE "
            "state_json = VALUES(state_json), "
            "updated_at = CURRENT_TIMESTAMP")
    else:
        # SQLite (3.24+) and Postgres share this form; `excluded` is the
        # row that failed to insert. Postgres requires a unique index on
        # exactly these columns, which v52 provides.
        upsert_clause = (
            "ON CONFLICT (conversation_id, user_id, peer_id) DO UPDATE "
            "SET state_json = excluded.state_json, "
            " updated_at = CURRENT_TIMESTAMP")
    try:
        db.execute(text(insert_clause + upsert_clause), {
            'id': str(uuid.uuid4()), 'tid': tenant_id,
            'cid': conversation_id, 'uid': user_id, 'pid': peer_id,
            'blob': blob})
    except IntegrityError as e:
        raise E2EStateError(f"could not save ratchet state: {e}") from e

179 

180 

def delete_for_conversation(db, conversation_id: str) -> int:
    """Remove the ratchet state of every pair in ``conversation_id``.

    Intended for when a conversation is closed/deleted, or when a member
    rotates their identity key and every pair must re-bootstrap.

    Returns:
        The driver-reported number of deleted rows, with a falsy
        ``rowcount`` (``None``/0) normalised to 0, for caller logging.
    """
    outcome = db.execute(
        text("DELETE FROM ratchet_states WHERE conversation_id = :cid"),
        {'cid': conversation_id})
    deleted = outcome.rowcount
    return deleted if deleted else 0

190 

191 

192# ── Bootstrap (Phase 9.B placeholder for X3DH) ───────────────────── 

193 

194 

195def _sorted_pair_id(a: str, b: str) -> bytes: 

196 """Conversation-pair identifier, order-independent.""" 

197 parts = sorted([str(a).encode('utf-8'), str(b).encode('utf-8')]) 

198 return parts[0] + b'|' + parts[1] 

199 

200 

def _derive_initial_shared_secret(user_id: str, peer_id: str,
                                  user_identity_b64: str,
                                  peer_identity_b64: str) -> bytes:
    """STAND-IN for X3DH (Phase 9.C): deterministic 32-byte bootstrap secret.

    Computes SHA-256 over four parts joined by ``|``:
      * a domain-separation label;
      * the order-independent pair id of the two members;
      * the two published identity keys (base64 text, sorted).
    Sorting makes the value independent of which side computes it, so
    both members reach the same secret from data already in
    conversation_keys, with no out-of-band exchange.

    SECURITY CAVEAT: every input is public, so this secret provides NO
    confidentiality. Anyone who knows both ids and both published
    identity keys can recompute it, and through
    ``_derive_initial_dh_keypair`` both bootstrap keypairs too. Treat
    traffic protected only by bootstrap-derived keys as readable by such
    a party. Protection is expected to recover once the Double Ratchet
    rotates to fresh DH keys; that logic lives in e2e_ratchet, which is
    not visible here, so confirm it. Phase 9.C replaces this with X3DH
    using signed prekeys + one-time prekeys.
    """
    pair = _sorted_pair_id(user_id, peer_id)
    lo_key, hi_key = sorted((user_identity_b64.encode('ascii'),
                             peer_identity_b64.encode('ascii')))
    material = b'|'.join((b'hevolve-e2e-bootstrap-v9.B', pair, lo_key, hi_key))
    return hashlib.sha256(material).digest()

226 

227 

def _derive_initial_dh_keypair(shared_secret: bytes,
                               party_id: str) -> Tuple[bytes, bytes]:
    """STAND-IN for X3DH (Phase 9.C): deterministic bootstrap X25519 keypair.

    Returns ``(priv, pub)`` as raw X25519 encodings for ``party_id``. The
    private scalar is SHA-256 over a domain-separation label, the
    ``shared_secret`` and the UTF-8 ``party_id``. X25519 clamps scalars
    internally, so any 32 bytes are accepted.

    Each side runs this for BOTH ids, so each knows the other's bootstrap
    pub. ECDH symmetry then makes the initiator's DH(our_priv, their_pub)
    equal the responder's DH(our_priv, sender_pub). That is what lines up
    the initiator's sending chain with the responder's receiving chain.

    SECURITY CAVEAT: the derived private key is not secret.
    ``shared_secret`` is computed by ``_derive_initial_shared_secret``
    from the two ids and the two *published* identity keys. Anyone who
    knows those values (the peer, but equally any outside party with
    access to them) can recompute both bootstrap keypairs and anything
    derived from them. There is no confidentiality or impersonation
    resistance from this bootstrap until X3DH lands.
    """
    r._require_crypto()
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric.x25519 import (
        X25519PrivateKey,
    )
    seed = b''.join((
        b'hevolve-bootstrap-dh-v9.B|',
        shared_secret,
        b'|',
        str(party_id).encode('utf-8'),
    ))
    private_key = X25519PrivateKey.from_private_bytes(
        hashlib.sha256(seed).digest())
    # Re-export through the library's Raw encoding so both sides persist
    # exactly what the library reports. Both parties run this same code,
    # so they agree whether or not the export is clamped.
    raw_priv = private_key.private_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=serialization.NoEncryption(),
    )
    raw_pub = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw,
    )
    return raw_priv, raw_pub

269 

270 

def bootstrap_pair(db, conversation_id: str, user_id: str,
                   peer_id: str,
                   user_identity_b64: str, peer_identity_b64: str,
                   *,
                   tenant_id: Optional[str] = None,
                   is_initiator: bool) -> r.RatchetState:
    """Create and persist a fresh ratchet state for (user, peer).

    Follows the libsignal convention for ``is_initiator``:
      * the initiator starts with ``their_dh_pub`` set to the peer's
        bootstrap pub, so it can send first;
      * the responder starts with ``their_dh_pub=None`` and advances on
        its first inbound message.
    Both bootstrap keypairs come deterministically from the shared secret
    plus each party id. init_ratchet on the initiator therefore derives
    its sending chain from DH(our_priv, their_pub). The responder's first
    advance_dh_ratchet derives the matching receiving chain from
    DH(our_priv, sender_pub).

    For DMs the caller picks initiator = ``min(user_id, peer_id)`` so both
    sides agree without coordination. The new state is written via
    ``save`` (an upsert, so any existing row for the triple is
    overwritten) and returned.

    See ``_derive_initial_shared_secret`` for the security caveats of this
    stand-in bootstrap.
    """
    r._require_crypto()
    seed = _derive_initial_shared_secret(
        user_id, peer_id, user_identity_b64, peer_identity_b64)
    # Derive BOTH parties' bootstrap keypairs so we know the peer's pub.
    my_priv, my_pub = _derive_initial_dh_keypair(seed, user_id)
    peer_pub = _derive_initial_dh_keypair(seed, peer_id)[1]
    state = r.init_ratchet(
        shared_secret=seed,
        our_dh_priv=my_priv,
        our_dh_pub=my_pub,
        their_dh_pub=peer_pub if is_initiator else None,
    )
    save(db, conversation_id, user_id, peer_id, state, tenant_id=tenant_id)
    return state

308 

309 

def load_or_bootstrap(db, conversation_id: str, user_id: str,
                      peer_id: str,
                      user_identity_b64: str, peer_identity_b64: str,
                      *,
                      tenant_id: Optional[str] = None) -> r.RatchetState:
    """Return the stored state, bootstrapping it on first use.

    The initiator is chosen deterministically: this side initiates when
    ``str(user_id) < str(peer_id)`` (lexicographic min), so both members
    agree without coordinating.

    NOTE(review): the load and the bootstrap are not atomic. If two
    callers race on the same (conversation, user, peer) triple, the later
    bootstrap's upsert overwrites whatever the other wrote, including a
    state that may already have advanced. Whether this can happen depends
    on the caller's transaction isolation / locking; confirm.
    """
    state = load(db, conversation_id, user_id, peer_id)
    if state is None:
        state = bootstrap_pair(
            db, conversation_id, user_id, peer_id,
            user_identity_b64, peer_identity_b64,
            tenant_id=tenant_id,
            is_initiator=str(user_id) < str(peer_id))
    return state

325 

326 

__all__ = [
    # Error type callers map to 4xx.
    'E2EStateError',
    # RatchetState <-> state_json (de)serialization.
    'serialize_state',
    'deserialize_state',
    # Row-level persistence.
    'load',
    'save',
    'delete_for_conversation',
    # First-use initialization.
    'bootstrap_pair',
    'load_or_bootstrap',
]