Coverage for integrations / social / e2e_key_service.py: 0.0%
104 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial — Phase 9 E2E DM key envelope service (schema-only stub).
4Plan reference: sunny-gliding-eich.md, Part K.4 + Phase 9.
6This is the SCHEMA-ONLY layer of the optional E2E DM stack. The
7actual libsignal-style double-ratchet implementation lives in a
8follow-up. This module ships:
  - publish_identity_key(db, conversation_id, user_id, identity_key_b64,
                         *, signed_prekey_b64, signed_prekey_signature_b64, ...)
      Member uploads their public identity bundle for a conversation.
  - rotate_identity_key(db, conversation_id, user_id)
      Soft-marks the existing key as `rotated_at = now()`. A fresh key is
      then uploaded separately via publish_identity_key.
18 - get_active_keys(conversation)
19 Returns each active member's current public bundle so a sender
20 can build envelopes.
22 - record_envelope(message_id, recipient_id, ciphertext_b64,
23 ratchet_header_b64)
24 Persists one envelope row per recipient.
26 - fetch_envelope(message_id, recipient_id)
27 Pulls the envelope a recipient should decrypt.
29What this module DOES NOT do (deferred to crypto follow-up):
30 - Generate / verify signatures
31 - Run the double-ratchet KDF chain
32 - Manage one-time prekeys (X3DH initial handshake)
33 - Key backup / escrow
35The schema is stable; the crypto can land later without further
36migration churn.
38Transport: same as the rest of HARTOS social — P2P-first via
39MessageBus.publish. Envelope distribution at message-send time
40flows through the existing Conversation send_message path; this
41module just stores the row.
42"""
44from __future__ import annotations
46import logging
47import uuid
48from typing import Any, Dict, List, Optional
50from sqlalchemy import text
51from sqlalchemy.exc import IntegrityError
53from .sync_service import _tenant_predicate
# Module logger. Uses the fixed 'hevolve_social' logger name rather than
# __name__, so records from this module land on that named logger.
logger = logging.getLogger('hevolve_social')
class E2EKeyError(Exception):
    """Service-level failure in the E2E key / envelope layer.

    Raised by the service for invalid input or unrecoverable write
    conflicts. Callers are expected to translate it into a 4xx HTTP
    response.
    """
63def _resolve_tenant(arg_tenant: Optional[str]) -> Optional[str]:
64 """Pass-5 F9 fix: prefer explicit `tenant_id` arg, fall back to
65 `g.tenant_id` from the active Flask request, then None. Closes
66 the foot-gun where a caller forgets to pass tenant_id and the
67 row gets stamped untenanted (invisible in strict mode = bricked).
68 """
69 if arg_tenant is not None:
70 return arg_tenant
71 try:
72 from flask import g, has_request_context
73 if has_request_context():
74 return getattr(g, 'tenant_id', None)
75 except Exception:
76 pass
77 return None
class E2EKeyService:
    """Storage operations for E2E DM identity keys and message envelopes.

    Every method is a ``@staticmethod`` taking the database handle ``db``
    as its first argument and issuing raw SQL via ``sqlalchemy.text``
    against the ``conversation_keys`` and ``message_envelopes`` tables
    (``list_members_without_keys`` also reads ``memberships``).

    ``db`` must provide ``execute``, ``begin_nested``, ``commit`` and
    ``rollback`` (presumably a SQLAlchemy ``Session`` — confirm against
    callers). No cryptography happens here: values are stored and returned
    exactly as received.

    Tenant scoping: each public method resolves its tenant through
    ``_resolve_tenant`` and appends the SQL fragment returned by
    ``_tenant_predicate`` to its WHERE / ON clause.
    NOTE(review): the fragment is assumed to be either empty or to start
    with its own leading `` AND`` — it is concatenated directly after the
    preceding condition with no separator.
    """

    @staticmethod
    def publish_identity_key(db, conversation_id: str, user_id: str,
                             identity_key_b64: str,
                             *,
                             signed_prekey_b64: Optional[str] = None,
                             signed_prekey_signature_b64: Optional[str] = None,
                             key_algorithm: str = 'x25519-ed25519',
                             tenant_id: Optional[str] = None
                             ) -> Dict[str, Any]:
        """Upload (or replace) a user's public identity bundle.

        Any currently active key for ``(conversation_id, user_id)`` is
        soft-rotated first, then the new bundle is inserted, so
        re-publishing rotates the previous key.

        Pass-5 F1 + F9: the tenant is resolved from ``g`` when not passed,
        and the rotate UPDATE is tenant-gated so a caller in another tenant
        cannot rotate this row. Pass-5 F2: the INSERT runs in a SAVEPOINT;
        on an ``IntegrityError`` (a concurrent publish won the race) the
        rotate + INSERT is retried exactly once.

        Args:
            db: database handle (see class docstring).
            conversation_id: conversation the key belongs to.
            user_id: member publishing the key.
            identity_key_b64: public identity key; must be non-empty.
            signed_prekey_b64: optional signed prekey.
            signed_prekey_signature_b64: optional signature over the prekey.
            key_algorithm: algorithm label stored with the key.
            tenant_id: explicit tenant; falls back to the request tenant.

        Returns:
            The stored key row as a dict (see ``_key_dict``).

        Raises:
            E2EKeyError: if ``identity_key_b64`` is empty, or if the retry
                also hits an ``IntegrityError``. In the latter case the
                transaction is rolled back before raising.
        """
        if not identity_key_b64:
            raise E2EKeyError("identity_key_b64 required")

        tid = _resolve_tenant(tenant_id)
        tenant_sql, tenant_params = _tenant_predicate(tid)

        # Tenant clause prevents cross-tenant mutation if the caller knows
        # the (conv, user) pair but lives in a different tenant.
        rotate_params = {'cid': conversation_id, 'uid': user_id}
        rotate_params.update(tenant_params)

        kid = str(uuid.uuid4())
        insert_params = {
            'id': kid, 'tid': tid, 'cid': conversation_id,
            'uid': user_id, 'ik': identity_key_b64,
            'spk': signed_prekey_b64,
            'sig': signed_prekey_signature_b64,
            'algo': key_algorithm,
        }

        E2EKeyService._rotate_active_keys(db, tenant_sql, rotate_params)
        try:
            E2EKeyService._insert_key(db, insert_params)
        except IntegrityError:
            # Pass-5 F2: concurrent publish raced us — retry the rotate
            # (the other publisher's row is now active) and re-INSERT.
            # One retry is enough; if it races again, surface as
            # E2EKeyError so the caller can choose.
            logger.info(
                "publish_identity_key concurrent race for (conv=%s,"
                " user=%s); retrying rotate+insert", conversation_id, user_id)
            E2EKeyService._rotate_active_keys(db, tenant_sql, rotate_params)
            try:
                # The retry is savepoint-wrapped too, so a second failure
                # does not leave the enclosing transaction in a failed
                # state.
                E2EKeyService._insert_key(db, insert_params)
            except IntegrityError as exc:
                # Undo the pending rotate UPDATEs: committing them without
                # a replacement row would leave the user with no active
                # key at all.
                db.rollback()
                raise E2EKeyError(
                    f"could not publish identity key: {exc}") from exc
        db.commit()
        return E2EKeyService._key_dict(db, kid)

    @staticmethod
    def rotate_identity_key(db, conversation_id: str, user_id: str,
                            *,
                            tenant_id: Optional[str] = None) -> bool:
        """Soft-mark the user's current key as rotated and commit.

        Tenant-scoped (Pass-5 F1).

        Returns:
            True iff at least one row was actually rotated.
        """
        tid = _resolve_tenant(tenant_id)
        tenant_sql, tenant_params = _tenant_predicate(tid)
        params = {'cid': conversation_id, 'uid': user_id}
        params.update(tenant_params)
        result = E2EKeyService._rotate_active_keys(db, tenant_sql, params)
        db.commit()
        # rowcount may be None on some drivers; treat that as "no rows".
        return (result.rowcount or 0) > 0

    @staticmethod
    def get_active_keys(db, conversation_id: str,
                        *,
                        tenant_id: Optional[str] = None
                        ) -> List[Dict[str, Any]]:
        """Return one dict per active (non-rotated) key in a conversation.

        A sender uses these to build per-recipient envelopes. Members
        without an active key are SKIPPED — see
        ``list_members_without_keys`` for the inverse list. Tenant-scoped
        (Pass-5 F1). Rows are ordered by ``created_at`` ascending.

        Returns:
            A list of dicts with keys ``id``, ``user_id``,
            ``identity_key_b64``, ``signed_prekey_b64``,
            ``signed_prekey_signature_b64``, ``key_algorithm`` and
            ``created_at`` (stringified, or None).
        """
        tid = _resolve_tenant(tenant_id)
        tenant_sql, tenant_params = _tenant_predicate(tid)
        params = {'cid': conversation_id}
        params.update(tenant_params)
        rows = db.execute(text(
            "SELECT id, user_id, identity_key_b64, signed_prekey_b64, "
            "       signed_prekey_signature_b64, key_algorithm, "
            "       created_at "
            "FROM conversation_keys "
            "WHERE conversation_id = :cid AND rotated_at IS NULL"
            f"{tenant_sql} "
            "ORDER BY created_at ASC"),
            params
        ).fetchall()
        return [
            {'id': r[0], 'user_id': r[1],
             'identity_key_b64': r[2],
             'signed_prekey_b64': r[3],
             'signed_prekey_signature_b64': r[4],
             'key_algorithm': r[5],
             'created_at': str(r[6]) if r[6] else None}
            for r in rows
        ]

    @staticmethod
    def list_members_without_keys(db, conversation_id: str,
                                  *,
                                  tenant_id: Optional[str] = None
                                  ) -> List[str]:
        """Return user_ids of members with no active identity key.

        Pass-5 F8 helper: a sender building envelopes uses this to surface
        a UI warning ("3 members can't read this message yet") before
        sending.

        Implemented as a LEFT JOIN from ``memberships`` (rows with
        ``parent_kind = 'conversation'``) to active ``conversation_keys``
        rows, keeping only memberships with no matching key. The tenant
        predicate is applied to both tables.

        Returns:
            user_ids only — the caller joins to user metadata as needed.
        """
        tid = _resolve_tenant(tenant_id)
        tenant_sql_m, tenant_params_m = _tenant_predicate(tid, alias='m')
        tenant_sql_k, tenant_params_k = _tenant_predicate(tid, alias='k')
        params = {'cid': conversation_id}
        params.update(tenant_params_m)
        # Merged explicitly for both aliases; on a key collision the
        # 'k' values win, exactly as before.
        params.update(tenant_params_k)
        rows = db.execute(text(
            "SELECT m.member_id FROM memberships m "
            "LEFT JOIN conversation_keys k "
            "  ON k.conversation_id = m.parent_id "
            " AND k.user_id = m.member_id "
            " AND k.rotated_at IS NULL"
            f"{tenant_sql_k} "
            "WHERE m.parent_kind = 'conversation' "
            "AND m.parent_id = :cid "
            "AND k.id IS NULL"
            f"{tenant_sql_m}"),
            params
        ).fetchall()
        return [r[0] for r in rows]

    @staticmethod
    def record_envelope(db, message_id: str, recipient_id: str,
                        ciphertext_b64: str,
                        *,
                        ratchet_header_b64: Optional[str] = None,
                        tenant_id: Optional[str] = None
                        ) -> Dict[str, Any]:
        """Persist one per-recipient encrypted payload and commit.

        Append-only; UNIQUE(message_id, recipient_id) enforces one
        envelope per (message, recipient).

        Pass-5 F7: an ``IntegrityError`` on insert is re-raised as a clean
        ``E2EKeyError`` (the original error is kept as ``__cause__``).
        Pass-5 F1: tenant_id resolves from ``g`` if not passed.

        Returns:
            The stored envelope row as a dict (see ``_envelope_dict``).

        Raises:
            E2EKeyError: if ``ciphertext_b64`` is empty or the insert
                violates an integrity constraint.
        """
        if not ciphertext_b64:
            raise E2EKeyError("ciphertext_b64 required")
        tid = _resolve_tenant(tenant_id)
        eid = str(uuid.uuid4())
        try:
            # SAVEPOINT keeps the outer transaction usable on conflict.
            with db.begin_nested():
                db.execute(text(
                    "INSERT INTO message_envelopes "
                    "(id, tenant_id, message_id, recipient_id, "
                    " ciphertext_b64, ratchet_header_b64) "
                    "VALUES (:id, :tid, :mid, :rid, :ct, :hdr)"),
                    {'id': eid, 'tid': tid, 'mid': message_id,
                     'rid': recipient_id, 'ct': ciphertext_b64,
                     'hdr': ratchet_header_b64})
        except IntegrityError as exc:
            raise E2EKeyError(
                "envelope already recorded for this (message, recipient)"
            ) from exc
        db.commit()
        return E2EKeyService._envelope_dict(db, eid)

    @staticmethod
    def fetch_envelope(db, message_id: str, recipient_id: str,
                       *,
                       tenant_id: Optional[str] = None
                       ) -> Optional[Dict[str, Any]]:
        """Return the envelope stored for ``(message_id, recipient_id)``.

        Returns None if no envelope exists for that pair (e.g. the
        recipient joined the conversation after this message, or the
        sender skipped them — see ``get_active_keys``).

        Pass-5 F1: tenant-scoped — a tenant-A user querying for a tenant-B
        envelope gets None, identical to a missing pair (no enumeration
        leak).
        """
        tid = _resolve_tenant(tenant_id)
        tenant_sql, tenant_params = _tenant_predicate(tid)
        params = {'mid': message_id, 'rid': recipient_id}
        params.update(tenant_params)
        row = db.execute(text(
            "SELECT id FROM message_envelopes "
            "WHERE message_id = :mid AND recipient_id = :rid"
            f"{tenant_sql}"),
            params
        ).fetchone()
        if row is None:
            return None
        return E2EKeyService._envelope_dict(db, row[0])

    # ── Internal helpers ────────────────────────────────────────────

    @staticmethod
    def _rotate_active_keys(db, tenant_sql: str, params: Dict[str, Any]):
        """Soft-rotate the active key row(s) matching ``params``.

        Executes the tenant-gated UPDATE and returns the execute result so
        callers can inspect ``rowcount``. Does not commit.
        """
        return db.execute(text(
            "UPDATE conversation_keys SET rotated_at = CURRENT_TIMESTAMP "
            "WHERE conversation_id = :cid AND user_id = :uid "
            "AND rotated_at IS NULL"
            f"{tenant_sql}"),
            params)

    @staticmethod
    def _insert_key(db, params: Dict[str, Any]) -> None:
        """INSERT one ``conversation_keys`` row inside a SAVEPOINT.

        An ``IntegrityError`` propagates to the caller after the nested
        transaction has been exited, leaving the outer transaction usable.
        Does not commit.
        """
        with db.begin_nested():
            db.execute(text(
                "INSERT INTO conversation_keys "
                "(id, tenant_id, conversation_id, user_id, "
                " identity_key_b64, signed_prekey_b64, "
                " signed_prekey_signature_b64, key_algorithm) "
                "VALUES (:id, :tid, :cid, :uid, :ik, :spk, :sig, :algo)"),
                params)

    @staticmethod
    def _key_dict(db, key_id: str) -> Dict[str, Any]:
        """Load one ``conversation_keys`` row by primary key as a dict.

        Returns an empty dict when no row has that id. Timestamps are
        stringified (or None when unset).
        """
        row = db.execute(text(
            "SELECT id, conversation_id, user_id, identity_key_b64, "
            "       signed_prekey_b64, signed_prekey_signature_b64, "
            "       key_algorithm, created_at, rotated_at "
            "FROM conversation_keys WHERE id = :id"),
            {'id': key_id}
        ).fetchone()
        if not row:
            return {}
        return {
            'id': row[0], 'conversation_id': row[1], 'user_id': row[2],
            'identity_key_b64': row[3],
            'signed_prekey_b64': row[4],
            'signed_prekey_signature_b64': row[5],
            'key_algorithm': row[6],
            'created_at': str(row[7]) if row[7] else None,
            'rotated_at': str(row[8]) if row[8] else None,
        }

    @staticmethod
    def _envelope_dict(db, env_id: str) -> Dict[str, Any]:
        """Load one ``message_envelopes`` row by primary key as a dict.

        Returns an empty dict when no row has that id. ``created_at`` is
        stringified (or None when unset).
        """
        row = db.execute(text(
            "SELECT id, message_id, recipient_id, ciphertext_b64, "
            "       ratchet_header_b64, created_at "
            "FROM message_envelopes WHERE id = :id"),
            {'id': env_id}
        ).fetchone()
        if not row:
            return {}
        return {
            'id': row[0], 'message_id': row[1], 'recipient_id': row[2],
            'ciphertext_b64': row[3],
            'ratchet_header_b64': row[4],
            'created_at': str(row[5]) if row[5] else None,
        }
# Public API of this module: the service class and its error type.
__all__ = ['E2EKeyService', 'E2EKeyError']