Coverage for integrations / social / e2e_dm_pipeline.py: 0.0%
110 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial — Phase 9.B E2E DM encrypt/decrypt pipeline.
4Plan reference: sunny-gliding-eich.md, Part K.4 + Phase 9.
6Glues the Double Ratchet primitives (e2e_ratchet.py), the per-pair
7state repo (e2e_state_repo.py), and the envelope storage
8(e2e_key_service.py) into a two-function pipeline:
10 - encrypt_for_conversation: sender encrypts plaintext into one
11 envelope per active recipient member, advancing each pair's
12 ratchet state. Stores envelopes via E2EKeyService.record_envelope.
14 - decrypt_for_recipient: receiver fetches their envelope for the
15 message_id, decrypts it via their stored ratchet state, advances
16 state, returns plaintext.
18When the `e2e_dms` feature flag is OFF or a conversation's
19`settings.e2e_enabled` is not True, this module's hot path is never
20called — ConversationService.send_message stores plaintext as
21before. Zero regression on the flag-off path.
23Initial-secret bootstrap is the deterministic placeholder in
24e2e_state_repo._derive_initial_shared_secret; X3DH (Phase 9.C)
25will replace it without API change at this layer.
26"""
28from __future__ import annotations
30import base64
31import json
32import logging
33from typing import Any, Dict, List, Optional, Tuple
35from sqlalchemy import text
37from . import e2e_ratchet as r
38from . import e2e_state_repo as state_repo
39from .e2e_key_service import E2EKeyService
41logger = logging.getLogger('hevolve_social')
44# ── Flag / settings resolution ──────────────────────────────────────
def _conv_settings(db, conv_id: str) -> Dict[str, Any]:
    """Return the conversation's ``settings`` column as a dict.

    Args:
        db: Session/connection object exposing ``execute(...)`` whose
            result supports ``fetchone()``.
        conv_id: Value bound to ``conversations.id`` in the lookup.

    Returns:
        The settings mapping, or ``{}`` when the row does not exist, the
        column is NULL, the stored value cannot be decoded as JSON, or
        the decoded value is not a JSON object.
    """
    row = db.execute(
        text("SELECT settings FROM conversations WHERE id = :cid"),
        {'cid': conv_id},
    ).fetchone()
    if row is None or row[0] is None:
        return {}
    raw = row[0]
    # Some drivers (e.g. psycopg2 with json/jsonb columns) hand back an
    # already-decoded object. json.loads() raises TypeError on a dict, and
    # mapping that to {} would silently report e2e_enabled as off for a
    # conversation that opted in — so accept a dict as-is.
    if isinstance(raw, dict):
        return raw
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError, RecursionError):
        # Undecodable settings are treated as "no settings" (best-effort).
        return {}
    return parsed if isinstance(parsed, dict) else {}
61def _flag_on() -> bool:
62 """Server-side `e2e_dms` flag from g.feature_flags. Defaults
63 False — so opt-in via the explicit flag, not just by setting
64 the conversation's settings.e2e_enabled (which a malicious client
65 could try to spoof in a JSON payload)."""
66 try:
67 from flask import g, has_request_context
68 if has_request_context():
69 flags = getattr(g, 'feature_flags', {}) or {}
70 return bool(flags.get('e2e_dms', False))
71 except Exception:
72 pass
73 return False
# Set the first time the missing-`cryptography` warning is emitted so the
# message is logged once per process instead of once per call.
_MISSING_CRYPTO_WARNED = False


def should_encrypt(db, conv_id: str) -> bool:
    """Decide whether messages in ``conv_id`` take the E2E path.

    Both the server flag AND the conversation settings must agree;
    if either is off the answer is ``False`` (plaintext).

    Defense in depth: a malicious client editing the conversation
    settings can't force encryption (the server flag gates the path);
    a server with the flag on but a conversation that hasn't opted in
    still stores plaintext (so legacy clients keep working).

    Args:
        db: Session/connection passed through to ``_conv_settings``.
        conv_id: Conversation whose ``settings.e2e_enabled`` is checked.

    Returns:
        ``True`` only when ``_flag_on()`` is true, ``r._HAS_CRYPTO`` is
        truthy, and the conversation's ``e2e_enabled`` setting is truthy.
    """
    global _MISSING_CRYPTO_WARNED
    if not _flag_on():
        return False
    if not r._HAS_CRYPTO:
        # Bare deploy without `cryptography` — degrade to plaintext.
        # Warn only on the first occurrence: this check runs per call,
        # and repeating the same message on every call would flood the
        # log without telling operators anything new.
        if not _MISSING_CRYPTO_WARNED:
            _MISSING_CRYPTO_WARNED = True
            logger.warning(
                "e2e_dms flag is on but `cryptography` package is missing; "
                "falling back to plaintext storage")
        return False
    settings = _conv_settings(db, conv_id)
    return bool(settings.get('e2e_enabled', False))
96# ── Recipient resolution ────────────────────────────────────────────
def _list_active_keys(db, conv_id: str,
                      tenant_id: Optional[str] = None
                      ) -> List[Dict[str, Any]]:
    """Return the active identity-key entries for a conversation.

    Thin pass-through to ``E2EKeyService.get_active_keys``. Callers in
    this module read ``'user_id'`` and ``'identity_key_b64'`` from each
    entry. Only members who have published a key get an envelope;
    ``list_members_without_keys`` is described as the inverse, for caller
    UI warnings. The Phase 9.A schema is said to enforce a UNIQUE active
    key per (conv, user) — not verifiable from this module.
    """
    active_entries = E2EKeyService.get_active_keys(
        db, conv_id, tenant_id=tenant_id)
    return active_entries
109# ── Encrypt path ────────────────────────────────────────────────────
def encrypt_for_conversation(db, conv_id: str, sender_id: str,
                             message_id: str, plaintext: str,
                             *,
                             tenant_id: Optional[str] = None
                             ) -> List[str]:
    """Encrypt ``plaintext`` once per active recipient of ``conv_id``.

    For every active key entry other than the sender's, the sender-side
    ratchet state for the (sender, recipient) pair is loaded or
    bootstrapped, one message is encrypted, the advanced state is saved,
    and the serialized envelope is stored (base64) via
    ``E2EKeyService.record_envelope``.

    The caller (ConversationService.send_message) replaces the
    plaintext content with a placeholder ('[encrypted]') so legacy
    readers / non-member admin tooling don't crash on null.

    Args:
        db: Session/connection handed to the key service and state repo.
        conv_id: Conversation the message belongs to.
        sender_id: User sending the message; must have an active key.
        message_id: Identifier the envelopes are recorded under.
        plaintext: Message text; encoded as UTF-8 before encryption.
        tenant_id: Optional tenant scope forwarded to the helpers.

    Returns:
        The recipient ids whose envelope was recorded successfully. The
        sender, recipients whose bootstrap failed, and recipients whose
        ``record_envelope`` call failed are not included.

    Raises:
        r.RatchetError: the sender has no active identity key.
        r.RatchetUnavailable: re-raised if bootstrap reports it; the
            initial ``r._require_crypto()`` call is expected to raise it
            too when `cryptography` is missing, so callers SHOULD
            pre-check via ``should_encrypt()``.
        Exception: errors from ``r.encrypt_message`` / ``state_repo.save``
            are not caught here and propagate to the caller.
    """
    r._require_crypto()
    keys = _list_active_keys(db, conv_id, tenant_id=tenant_id)
    sender_entry = next(
        (k for k in keys if k['user_id'] == sender_id), None)
    if sender_entry is None:
        raise r.RatchetError(
            f"sender {sender_id} has no published identity key — call "
            f"E2EKeyService.publish_identity_key before sending")
    sender_key_b64 = sender_entry['identity_key_b64']

    plaintext_bytes = plaintext.encode('utf-8')
    delivered: List[str] = []
    for recipient_key in keys:
        rid = recipient_key['user_id']
        if rid == sender_id:
            continue
        # Bootstrap (or load) the SENDER's view of the (sender→recipient)
        # pair. A failure here skips this recipient only.
        try:
            state = state_repo.load_or_bootstrap(
                db, conv_id, sender_id, rid,
                sender_key_b64, recipient_key['identity_key_b64'],
                tenant_id=tenant_id)
        except r.RatchetUnavailable:
            raise
        except Exception as e:
            logger.warning(
                "encrypt_for_conversation: bootstrap failed for "
                "(%s→%s): %s", sender_id, rid, e)
            continue
        # The responder side (sender_id > rid lex) bootstraps with
        # their_dh_pub=None (libsignal convention) — they can't send
        # before they receive. If a responder tries to send first, derive
        # a peer DH pub from the bootstrap (deterministic for both sides)
        # so the first send succeeds. After the first inbound from the
        # peer, advance_dh_ratchet replaces it with the real ephemeral.
        if state.their_dh_pub is None:
            # Uses the module-level `state_repo` import; no need to
            # re-import the same module on every loop iteration.
            shared = state_repo._derive_initial_shared_secret(
                sender_id, rid,
                sender_key_b64, recipient_key['identity_key_b64'])
            _, peer_pub = state_repo._derive_initial_dh_keypair(
                shared, rid)
            state = state._replace(their_dh_pub=peer_pub)
        new_state, envelope = r.encrypt_message(state, plaintext_bytes)
        # The advanced state is saved BEFORE the envelope is recorded, so
        # a failed record_envelope leaves the state advanced (presumably
        # preferable to re-using a chain step — confirm with e2e_ratchet).
        state_repo.save(db, conv_id, sender_id, rid, new_state,
                        tenant_id=tenant_id)
        # The serialized envelope is what the receiver later feeds to
        # r.deserialize_envelope; it is stored base64-encoded.
        wire = r.serialize_envelope(envelope)
        ct_b64 = base64.b64encode(wire).decode('ascii')
        try:
            E2EKeyService.record_envelope(
                db, message_id, rid, ct_b64,
                tenant_id=tenant_id)
        except Exception as e:
            logger.warning(
                "encrypt_for_conversation: record_envelope failed for "
                "(%s, %s): %s", message_id, rid, e)
        else:
            delivered.append(rid)
    return delivered
195# ── Decrypt path ────────────────────────────────────────────────────
def decrypt_for_recipient(db, conv_id: str, message_id: str,
                          recipient_id: str,
                          *,
                          tenant_id: Optional[str] = None
                          ) -> Optional[str]:
    """Fetch and decrypt the envelope addressed to ``recipient_id``.

    Caller (ConversationService.list_messages) substitutes the
    decrypted plaintext into the message dict ONLY for the requesting
    user — other members' list_messages calls fetch their own
    envelopes; the stored `messages.content` placeholder is what
    every non-recipient observer sees.

    Args:
        db: Session/connection handed to the key service and state repo.
        conv_id: Conversation the message belongs to.
        message_id: Message whose envelope is fetched.
        recipient_id: User the envelope is addressed to.
        tenant_id: Optional tenant scope forwarded to the helpers.

    Returns:
        The plaintext, or ``None`` when there is no envelope (recipient
        joined after the send, or had no published key at send-time),
        the envelope is malformed, the sender cannot be resolved, either
        party lacks an active key during first-inbound bootstrap, the
        ratchet reports a replay, or decryption fails. Bytes that are not
        valid UTF-8 are decoded with replacement characters.

    NOTE(review): a successful call advances and saves the ratchet state.
    A later call for the same ``message_id`` presumably ends in the
    ``RatchetReplayError`` branch and returns ``None`` — confirm that
    callers keep the plaintext if re-reads are expected.
    """
    r._require_crypto()
    env = E2EKeyService.fetch_envelope(
        db, message_id, recipient_id, tenant_id=tenant_id)
    if env is None:
        return None
    try:
        wire = base64.b64decode(env['ciphertext_b64'].encode('ascii'))
        envelope_dict = r.deserialize_envelope(wire)
    except Exception as e:
        logger.warning(
            "decrypt_for_recipient: malformed envelope for "
            "(%s, %s): %s", message_id, recipient_id, e)
        return None
    # The sender is the OTHER end of the pair. For DMs there's exactly
    # one peer; for groups (out of scope for 9.B, shape kept
    # forward-compatible) the `our_pub` in the envelope identifies it.
    sender_id = _resolve_sender(db, conv_id, recipient_id,
                                envelope_dict['our_pub'],
                                tenant_id=tenant_id)
    if sender_id is None:
        logger.warning(
            "decrypt_for_recipient: cannot resolve sender for "
            "envelope %s", message_id)
        return None
    # NOTE(review): load() is called without tenant_id, unlike
    # bootstrap_pair()/save() below — confirm this is intentional.
    state = state_repo.load(db, conv_id, recipient_id, sender_id)
    if state is None:
        # First inbound from this peer — bootstrap responder-side.
        # One key-list fetch serves both lookups.
        active_keys = _list_active_keys(db, conv_id, tenant_id=tenant_id)
        sender_entry = next(
            (k for k in active_keys if k['user_id'] == sender_id), None)
        recipient_entry = next(
            (k for k in active_keys if k['user_id'] == recipient_id), None)
        if sender_entry is None or recipient_entry is None:
            return None
        state = state_repo.bootstrap_pair(
            db, conv_id, recipient_id, sender_id,
            recipient_entry['identity_key_b64'],
            sender_entry['identity_key_b64'],
            tenant_id=tenant_id,
            # The lexicographically smaller id acts as initiator.
            is_initiator=(str(recipient_id) < str(sender_id)))
    try:
        new_state, plaintext_bytes = r.decrypt_message(state, envelope_dict)
    except r.RatchetReplayError:
        # Double-delivery — caller already saw this message; safe to
        # ignore. Return None so the API surface treats it like a
        # missing envelope.
        return None
    except Exception as e:
        logger.warning(
            "decrypt_for_recipient: decrypt failed for (%s, %s): %s",
            message_id, recipient_id, e)
        return None
    state_repo.save(db, conv_id, recipient_id, sender_id, new_state,
                    tenant_id=tenant_id)
    # errors='replace' gives the same result as a strict decode for valid
    # UTF-8 and substitutes U+FFFD otherwise, so one call covers both.
    return plaintext_bytes.decode('utf-8', errors='replace')
def _resolve_sender(db, conv_id: str, recipient_id: str,
                    sender_pub: bytes,
                    tenant_id: Optional[str] = None) -> Optional[str]:
    """Work out which conversation member sent an envelope.

    Resolution order, considering only active key entries whose
    ``user_id`` differs from ``recipient_id``:

    1. An entry whose ``identity_key_b64`` equals base64(``sender_pub``).
       This can only match before any DH step has happened, because
       ``sender_pub`` is a rotating ephemeral key, not the identity key.
    2. DM fallback: if exactly one such entry exists, that user.
    3. Group case: a row in ``ratchet_states`` for this recipient whose
       deserialized state's ``their_dh_pub`` equals ``sender_pub``.

    Returns ``None`` when there are no candidate entries or none of the
    steps above produce a match. Groups need a richer mapping (Phase 9.D
    when groups land).
    """
    peers = [
        entry
        for entry in _list_active_keys(db, conv_id, tenant_id=tenant_id)
        if entry['user_id'] != recipient_id
    ]
    if not peers:
        return None

    # Step 1 — identity-key match. Any error here (e.g. sender_pub not
    # being bytes-like) just moves on to the next strategy.
    try:
        encoded_pub = base64.b64encode(sender_pub).decode('ascii')
        for entry in peers:
            if entry['identity_key_b64'] == encoded_pub:
                return entry['user_id']
    except Exception:
        pass

    # Step 2 — DM fallback: exactly one non-recipient peer.
    if len(peers) == 1:
        return peers[0]['user_id']

    # Step 3 — group case: compare against each persisted peer state.
    persisted = db.execute(
        text(
            "SELECT peer_id, state_json FROM ratchet_states "
            "WHERE conversation_id = :cid AND user_id = :rid"),
        {'cid': conv_id, 'rid': recipient_id},
    ).fetchall()
    for peer_id, serialized in persisted:
        try:
            peer_state = state_repo.deserialize_state(serialized)
            is_match = bool(peer_state.their_dh_pub == sender_pub)
        except Exception:
            # Unreadable state rows are skipped, not fatal.
            continue
        if is_match:
            return peer_id
    return None
# Public surface of this module: the gate check plus the two pipeline
# entry points. Everything else is an internal helper.
__all__ = [
    'should_encrypt', 'encrypt_for_conversation', 'decrypt_for_recipient',
]