Coverage for integrations / social / e2e_dm_pipeline.py: 0.0%

110 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial — Phase 9.B E2E DM encrypt/decrypt pipeline. 

3 

4Plan reference: sunny-gliding-eich.md, Part K.4 + Phase 9. 

5 

6Glues the Double Ratchet primitives (e2e_ratchet.py), the per-pair 

7state repo (e2e_state_repo.py), and the envelope storage 

8(e2e_key_service.py) into a two-function pipeline: 

9 

10 - encrypt_for_conversation: sender encrypts plaintext into one 

11 envelope per active recipient member, advancing each pair's 

12 ratchet state. Stores envelopes via E2EKeyService.record_envelope. 

13 

14 - decrypt_for_recipient: receiver fetches their envelope for the 

15 message_id, decrypts it via their stored ratchet state, advances 

16 state, returns plaintext. 

17 

18When the `e2e_dms` feature flag is OFF or a conversation's 

19`settings.e2e_enabled` is not True, this module's hot path is never 

20called — ConversationService.send_message stores plaintext as 

21before. Zero regression on the flag-off path. 

22 

23Initial-secret bootstrap is the deterministic placeholder in 

24e2e_state_repo._derive_initial_shared_secret; X3DH (Phase 9.C) 

25will replace it without API change at this layer. 

26""" 

27 

28from __future__ import annotations 

29 

30import base64 

31import json 

32import logging 

33from typing import Any, Dict, List, Optional, Tuple 

34 

35from sqlalchemy import text 

36 

37from . import e2e_ratchet as r 

38from . import e2e_state_repo as state_repo 

39from .e2e_key_service import E2EKeyService 

40 

41logger = logging.getLogger('hevolve_social') 

42 

43 

44# ── Flag / settings resolution ────────────────────────────────────── 

45 

46 

def _conv_settings(db, conv_id: str) -> Dict[str, Any]:
    """Return the ``settings`` object stored on conversation ``conv_id``.

    The column value may arrive as serialized JSON (``str``/``bytes``)
    or, with drivers that decode native JSON column types (e.g.
    PostgreSQL JSON/JSONB), as an already-decoded ``dict``; both are
    accepted.

    Returns:
        The settings dict. An empty dict is returned when:
        - no row exists;
        - the column is NULL;
        - the value cannot be parsed;
        - the parsed JSON is not an object.
        This lets callers always use ``.get()``.
    """
    row = db.execute(
        text("SELECT settings FROM conversations WHERE id = :cid"),
        {'cid': conv_id},
    ).fetchone()
    if row is None or row[0] is None:
        return {}
    raw = row[0]
    # json.loads() on an already-decoded dict raises TypeError, which
    # previously made such settings silently read as {} (E2E never on).
    if isinstance(raw, dict):
        return raw
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):
        # ValueError covers JSONDecodeError and UnicodeDecodeError (bytes);
        # TypeError covers any other unexpected column type.
        return {}
    return parsed if isinstance(parsed, dict) else {}

59 

60 

61def _flag_on() -> bool: 

62 """Server-side `e2e_dms` flag from g.feature_flags. Defaults 

63 False — so opt-in via the explicit flag, not just by setting 

64 the conversation's settings.e2e_enabled (which a malicious client 

65 could try to spoof in a JSON payload).""" 

66 try: 

67 from flask import g, has_request_context 

68 if has_request_context(): 

69 flags = getattr(g, 'feature_flags', {}) or {} 

70 return bool(flags.get('e2e_dms', False)) 

71 except Exception: 

72 pass 

73 return False 

74 

75 

# Latch so the "cryptography missing" warning is emitted once per process
# instead of on every call (i.e. every message send). Unsynchronized: a
# race between threads can at worst log the warning more than once.
_crypto_missing_warned = False


def should_encrypt(db, conv_id: str) -> bool:
    """Decide whether a message in ``conv_id`` should be E2E-encrypted.

    Both the server flag AND the conversation setting must agree; if
    either is off, the message is stored as plaintext.

    Defense in depth:
    - A malicious client editing the conversation settings can't force
      encryption, because the server flag gates the path.
    - A server with the flag on, but a conversation that hasn't opted in,
      still stores plaintext, so legacy clients keep working.

    Args:
        db: Database handle passed through to the settings lookup.
        conv_id: Conversation whose ``settings.e2e_enabled`` is read.

    Returns:
        True only when all three hold:
        - the ``e2e_dms`` flag is on;
        - the ratchet module reports ``cryptography`` is available;
        - the conversation's settings have a truthy ``e2e_enabled``.
    """
    global _crypto_missing_warned
    if not _flag_on():
        return False
    if not r._HAS_CRYPTO:
        # Bare deploy without `cryptography` — degrade to plaintext.
        # Warn once so operators notice the gap without flooding logs.
        if not _crypto_missing_warned:
            _crypto_missing_warned = True
            logger.warning(
                "e2e_dms flag is on but `cryptography` package is missing; "
                "falling back to plaintext storage")
        return False
    settings = _conv_settings(db, conv_id)
    return bool(settings.get('e2e_enabled', False))

94 

95 

96# ── Recipient resolution ──────────────────────────────────────────── 

97 

98 

def _list_active_keys(db, conv_id: str,
                      tenant_id: Optional[str] = None
                      ) -> List[Dict[str, Any]]:
    """Return the active identity-key bundles published in ``conv_id``.

    Thin wrapper over ``E2EKeyService.get_active_keys``. Only members
    who have published a key receive an envelope.
    ``list_members_without_keys`` is the inverse query, used for
    caller-side UI warnings. Per the Phase 9.A schema, each
    (conversation, user) pair has at most one active key.
    """
    active_bundles = E2EKeyService.get_active_keys(
        db, conv_id, tenant_id=tenant_id)
    return active_bundles

107 

108 

109# ── Encrypt path ──────────────────────────────────────────────────── 

110 

111 

def _prime_responder_dh(state, sender_id: str, rid: str,
                        sender_key_b64: str, recipient_key_b64: str):
    """Return ``state`` with a deterministic stand-in peer DH public key.

    A responder-side state is bootstrapped with ``their_dh_pub=None``
    (libsignal convention) and so cannot send before it receives. To let
    it send first, the peer DH pub is re-derived from the deterministic
    bootstrap secret, which is the same on both sides. After the first
    inbound from the peer, the DH ratchet step replaces it with the real
    ephemeral key.
    """
    shared = state_repo._derive_initial_shared_secret(
        sender_id, rid, sender_key_b64, recipient_key_b64)
    _, peer_pub = state_repo._derive_initial_dh_keypair(shared, rid)
    return state._replace(their_dh_pub=peer_pub)


def encrypt_for_conversation(db, conv_id: str, sender_id: str,
                             message_id: str, plaintext: str,
                             *,
                             tenant_id: Optional[str] = None
                             ) -> List[str]:
    """Encrypt ``plaintext`` once per active recipient and store envelopes.

    For every active key holder other than the sender, this function:
    - loads or bootstraps the sender's ratchet state for that pair;
    - encrypts the message and persists the advanced state;
    - stores the serialized envelope (base64) via
      ``E2EKeyService.record_envelope``.

    The caller (ConversationService.send_message) replaces the plaintext
    content with a placeholder ('[encrypted]') so legacy readers and
    non-member admin tooling don't crash on null.

    Args:
        db: Database handle passed to the key service and state repo.
        conv_id: Conversation the message belongs to.
        sender_id: User sending the message; must have an active key.
        message_id: Message the envelopes are recorded against.
        plaintext: Message body; encoded as UTF-8 before encryption.
        tenant_id: Optional tenant scope forwarded to every lookup/write.

    Returns:
        The recipient user_ids that actually got an envelope. Excluded are:
        - the sender;
        - members without a published key;
        - recipients whose state bootstrap failed;
        - recipients whose envelope write failed.
        The latter two are logged.

    Raises:
        RatchetUnavailable: ``cryptography`` isn't installed (callers
            SHOULD pre-check with ``should_encrypt()``); also re-raised if
            a per-pair bootstrap reports it.
        RatchetError: the sender has no active published identity key.
        Errors from ``r.encrypt_message`` or ``state_repo.save`` are not
        caught and abort the remaining recipients.
    """
    r._require_crypto()
    keys = _list_active_keys(db, conv_id, tenant_id=tenant_id)
    sender_keys = [k for k in keys if k['user_id'] == sender_id]
    if not sender_keys:
        raise r.RatchetError(
            f"sender {sender_id} has no published identity key — call "
            f"E2EKeyService.publish_identity_key before sending")
    sender_key_b64 = sender_keys[0]['identity_key_b64']

    plaintext_bytes = plaintext.encode('utf-8')
    delivered: List[str] = []
    for recipient_key in keys:
        rid = recipient_key['user_id']
        if rid == sender_id:
            continue
        # Bootstrap (or load) the SENDER's view of the (sender→recipient)
        # pair; a failure only skips this recipient.
        try:
            recipient_key_b64 = recipient_key['identity_key_b64']
            state = state_repo.load_or_bootstrap(
                db, conv_id, sender_id, rid,
                sender_key_b64, recipient_key_b64,
                tenant_id=tenant_id)
        except r.RatchetUnavailable:
            raise
        except Exception as e:
            logger.warning(
                "encrypt_for_conversation: bootstrap failed for "
                "(%s→%s): %s", sender_id, rid, e)
            continue
        # The responder side (sender_id > rid lexicographically) has no
        # peer DH pub yet; prime one so a responder can send first.
        if state.their_dh_pub is None:
            state = _prime_responder_dh(
                state, sender_id, rid, sender_key_b64, recipient_key_b64)
        new_state, envelope = r.encrypt_message(state, plaintext_bytes)
        # Saved before the envelope is recorded: even if recording fails
        # below, the advanced state stays persisted, so the next send does
        # not start again from this pre-encryption state.
        state_repo.save(db, conv_id, sender_id, rid, new_state,
                        tenant_id=tenant_id)
        # The wire format (serialize_envelope) carries our_pub + index +
        # nonce, so the receiver can rebuild the envelope dict.
        ct_b64 = base64.b64encode(
            r.serialize_envelope(envelope)).decode('ascii')
        try:
            E2EKeyService.record_envelope(
                db, message_id, rid, ct_b64,
                tenant_id=tenant_id)
        except Exception as e:
            logger.warning(
                "encrypt_for_conversation: record_envelope failed for "
                "(%s, %s): %s", message_id, rid, e)
            continue
        delivered.append(rid)
    return delivered

193 

194 

195# ── Decrypt path ──────────────────────────────────────────────────── 

196 

197 

def decrypt_for_recipient(db, conv_id: str, message_id: str,
                          recipient_id: str,
                          *,
                          tenant_id: Optional[str] = None
                          ) -> Optional[str]:
    """Fetch and decrypt the envelope addressed to ``recipient_id``.

    Steps:
    1. Look up the recipient's envelope for ``message_id``.
    2. Resolve which peer sent it.
    3. Load the recipient's ratchet state for that peer, bootstrapping
       it on the first inbound.
    4. Decrypt and persist the advanced state.

    The caller (ConversationService.list_messages) substitutes the
    decrypted plaintext into the message dict ONLY for the requesting
    user. Other members' list_messages calls fetch their own envelopes;
    the stored ``messages.content`` placeholder is what every
    non-recipient observer sees.

    Args:
        db: Database handle passed to the key service and state repo.
        conv_id: Conversation the message belongs to.
        message_id: Message whose envelope is fetched.
        recipient_id: User the envelope is addressed to.
        tenant_id: Optional tenant scope forwarded to lookups/writes.

    Returns:
        The plaintext (invalid UTF-8 sequences replaced with U+FFFD), or
        None in any of these cases (all but the first and the replay case
        are logged):
        - there is no envelope: the recipient joined after the send, or
          had no published key at send time;
        - the envelope is malformed;
        - the sender cannot be resolved;
        - either party lacks an active key for the first-inbound
          bootstrap;
        - the message is a replay;
        - decryption fails.

    Raises:
        RatchetUnavailable: ``cryptography`` isn't installed (from
            ``r._require_crypto()``). Errors raised by the key service or
            the state repository are not caught.
    """
    r._require_crypto()
    env = E2EKeyService.fetch_envelope(
        db, message_id, recipient_id, tenant_id=tenant_id)
    if env is None:
        return None
    try:
        wire = base64.b64decode(env['ciphertext_b64'].encode('ascii'))
        envelope_dict = r.deserialize_envelope(wire)
        # Read inside the guard so an envelope lacking its DH header is
        # reported as malformed (None) instead of escaping as KeyError.
        sender_pub = envelope_dict['our_pub']
    except Exception as e:
        logger.warning(
            "decrypt_for_recipient: malformed envelope for "
            "(%s, %s): %s", message_id, recipient_id, e)
        return None
    # The sender is the other end of the recipient's pair. For DMs there
    # is exactly one peer. For groups (out of scope for 9.B, but the shape
    # is kept forward-compatible) the envelope's DH pub identifies which
    # peer sent it.
    sender_id = _resolve_sender(db, conv_id, recipient_id, sender_pub,
                                tenant_id=tenant_id)
    if sender_id is None:
        logger.warning(
            "decrypt_for_recipient: cannot resolve sender for "
            "envelope %s", message_id)
        return None
    # NOTE(review): every other state_repo call in this module passes
    # tenant_id, but load() does not — confirm whether it should.
    state = state_repo.load(db, conv_id, recipient_id, sender_id)
    if state is None:
        # First inbound from this peer — bootstrap this side of the pair.
        # A single key query serves both identity lookups.
        active_keys = _list_active_keys(db, conv_id, tenant_id=tenant_id)
        sender_key = next(
            (k for k in active_keys if k['user_id'] == sender_id), None)
        recipient_key = next(
            (k for k in active_keys if k['user_id'] == recipient_id), None)
        if sender_key is None or recipient_key is None:
            logger.warning(
                "decrypt_for_recipient: missing identity key to bootstrap "
                "(%s→%s) for envelope %s",
                sender_id, recipient_id, message_id)
            return None
        state = state_repo.bootstrap_pair(
            db, conv_id, recipient_id, sender_id,
            recipient_key['identity_key_b64'],
            sender_key['identity_key_b64'],
            tenant_id=tenant_id,
            # The lexicographically smaller user_id is the initiator.
            is_initiator=(str(recipient_id) < str(sender_id)))
    try:
        new_state, plaintext_bytes = r.decrypt_message(state, envelope_dict)
    except r.RatchetReplayError:
        # Double delivery: the caller already saw this message, so it is
        # safe to ignore. Return None so the API surface treats it like a
        # missing envelope.
        return None
    except Exception as e:
        logger.warning(
            "decrypt_for_recipient: decrypt failed for (%s, %s): %s",
            message_id, recipient_id, e)
        return None
    state_repo.save(db, conv_id, recipient_id, sender_id, new_state,
                    tenant_id=tenant_id)
    # errors='replace' yields the strict decode for valid UTF-8 and only
    # substitutes U+FFFD for invalid sequences.
    return plaintext_bytes.decode('utf-8', errors='replace')

276 

277 

def _resolve_sender(db, conv_id: str, recipient_id: str,
                    sender_pub: bytes,
                    tenant_id: Optional[str] = None) -> Optional[str]:
    """Identify which conversation member sent an envelope to ``recipient_id``.

    Resolution order:
    1. An active, non-recipient member whose ``identity_key_b64`` equals
       base64(``sender_pub``). Per the original design notes, this only
       matches before any DH ratchet step; afterwards ``sender_pub`` is a
       rotating ephemeral key.
    2. If exactly one non-recipient member is active (the DM case), that
       member.
    3. Otherwise (the group case), the peer whose persisted
       ``ratchet_states`` row for this recipient has ``their_dh_pub ==
       sender_pub``. Groups are expected to need a richer mapping
       (Phase 9.D).

    Returns:
        The sender's user_id, or None if no member is a candidate or
        nothing matches.
    """
    peers = [bundle
             for bundle in _list_active_keys(db, conv_id, tenant_id=tenant_id)
             if bundle['user_id'] != recipient_id]
    if not peers:
        return None

    no_match = object()
    try:
        pub_b64 = base64.b64encode(sender_pub).decode('ascii')
        hit = next((bundle['user_id'] for bundle in peers
                    if bundle['identity_key_b64'] == pub_b64), no_match)
    except Exception:
        # Best-effort: any failure here just falls through to the
        # fallbacks below.
        hit = no_match
    if hit is not no_match:
        return hit

    if len(peers) == 1:
        return peers[0]['user_id']

    # NOTE(review): this query is not scoped by tenant_id — confirm that
    # ratchet_states rows cannot collide across tenants.
    state_rows = db.execute(text(
        "SELECT peer_id, state_json FROM ratchet_states "
        "WHERE conversation_id = :cid AND user_id = :rid"),
        {'cid': conv_id, 'rid': recipient_id}
    ).fetchall()
    for peer_id, state_blob in state_rows:
        try:
            peer_state = state_repo.deserialize_state(state_blob)
            is_match = peer_state.their_dh_pub == sender_pub
        except Exception:
            # Unreadable state rows are skipped, not fatal.
            continue
        if is_match:
            return peer_id
    return None

326 

327 

# Names exported by `from ... import *`; the underscore-prefixed
# helpers stay internal.
__all__ = [
    'should_encrypt',
    'encrypt_for_conversation',
    'decrypt_for_recipient',
]