Coverage for integrations / social / e2e_state_repo.py: 0.0%
90 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
HevolveSocial — Phase 9.B Double Ratchet state persistence.

Plan reference: sunny-gliding-eich.md, Part K.4 + Phase 9.

Bridges the stateless ratchet primitives in e2e_ratchet.py with the
v52 ratchet_states table. Handles:

 - load(db, conv_id, user_id, peer_id) → RatchetState | None
 - save(db, conv_id, user_id, peer_id, state) → upserts the row
 - bootstrap_pair(db, conv_id, user_id, peer_id, user_identity_b64,
   peer_identity_b64, *, is_initiator) →
   first-time init via a deterministic shared secret derived from
   both members' published identity keys. This is a STAND-IN for
   the proper X3DH handshake (Phase 9.C) — flagged inline so it's
   not mistaken for production-grade key agreement. Every input to
   that secret is public, so on its own it provides no
   confidentiality against anyone who can read those keys.

Wire format: state_json holds the JSON-encoded RatchetState with all
`bytes` fields base64-encoded (same convention as conversation_keys
and message_envelopes — TEXT columns travel cleanly across SQLite,
Postgres, MySQL). The skipped_keys cache (Dict[(bytes, int) → bytes])
serializes to a list of {peer_b64, idx, key_b64} triples, which
sidesteps JSON's string-keys-only constraint.

Transport: this module is pure DB persistence + serialization. No
WAMP, no PeerLink, no notification fan-out — those happen in
ConversationService.send_message, which CALLS this module. This keeps
the transport selection in MessageBus.publish per Plan R.8.
"""
30from __future__ import annotations
32import base64
33import hashlib
34import json
35import logging
36import uuid
37from typing import Any, Dict, List, Optional, Tuple
39from sqlalchemy import text
40from sqlalchemy.exc import IntegrityError
42from . import e2e_ratchet as r
logger = logging.getLogger('hevolve_social')


class E2EStateError(Exception):
    """Failure inside the ratchet-state repository.

    Raised for undecodable state blobs and failed upserts; callers
    translate it into a 4xx response.
    """
# ── Serialization ──────────────────────────────────────────────────
54def _b64(b: Optional[bytes]) -> Optional[str]:
55 if b is None:
56 return None
57 return base64.b64encode(b).decode('ascii')
60def _unb64(s: Optional[str]) -> Optional[bytes]:
61 if s is None:
62 return None
63 return base64.b64decode(s.encode('ascii'))
def serialize_state(state: r.RatchetState) -> str:
    """Encode a RatchetState as a JSON string (format version 1).

    Every ``bytes`` field is base64-encoded via ``_b64``. ``skipped_keys``
    is keyed by ``(bytes, int)`` tuples, which a JSON object cannot
    express, so it is emitted as a list of
    ``{'peer_b64', 'idx', 'key_b64'}`` records instead.
    """
    skipped_records = []
    for key_pair, message_key in state.skipped_keys.items():
        peer_part, index = key_pair
        skipped_records.append({
            'peer_b64': _b64(peer_part),
            'idx': index,
            'key_b64': _b64(message_key),
        })

    # Key order is part of the emitted text; keep it stable.
    payload = {
        'version': 1,
        'root_key': _b64(state.root_key),
        'sending_chain_key': _b64(state.sending_chain_key),
        'sending_index': state.sending_index,
        'receiving_chain_key': _b64(state.receiving_chain_key),
        'receiving_index': state.receiving_index,
        'our_dh_priv': _b64(state.our_dh_priv),
        'our_dh_pub': _b64(state.our_dh_pub),
        'their_dh_pub': _b64(state.their_dh_pub),
        'skipped_keys': skipped_records,
    }
    return json.dumps(payload)
def deserialize_state(blob: str) -> r.RatchetState:
    """Decode a JSON string produced by ``serialize_state``.

    Args:
        blob: The ``state_json`` column value.

    Returns:
        A ``RatchetState`` with every base64 field decoded back to bytes.
        A missing/null value for the required ``bytes`` fields decodes
        to ``b''``; ``their_dh_pub`` stays ``None`` when absent/null.
        Skipped-key entries lacking either ``peer_b64`` or ``key_b64``
        are dropped.

    Raises:
        E2EStateError: if the blob is empty, is not valid JSON, carries a
            ``version`` other than 1, or has malformed fields (missing
            required keys, bad base64, non-integer indices, malformed
            skipped-key entries). Everything is wrapped so callers can
            keep mapping this one exception type to a 4xx instead of
            leaking KeyError / binascii.Error / TypeError as a 500.
    """
    if not blob:
        raise E2EStateError("empty state blob")
    try:
        d = json.loads(blob)
    except Exception as e:
        raise E2EStateError(f"corrupt state JSON: {e}") from e
    if not isinstance(d, dict) or d.get('version') != 1:
        version = d.get('version') if isinstance(d, dict) else None
        raise E2EStateError(f"unknown state version: {version}")

    # binascii.Error and UnicodeEncodeError (from _unb64) are ValueError
    # subclasses; AttributeError covers non-dict skipped entries and
    # non-str base64 fields.
    try:
        skipped_dict: Dict[Tuple[bytes, int], bytes] = {}
        # `or []` also tolerates an explicit JSON null.
        for entry in d.get('skipped_keys') or []:
            peer = _unb64(entry.get('peer_b64'))
            idx = int(entry.get('idx', 0))
            msg_key = _unb64(entry.get('key_b64'))
            if peer is not None and msg_key is not None:
                skipped_dict[(peer, idx)] = msg_key
        root_key = _unb64(d['root_key']) or b''
        sending_chain_key = _unb64(d['sending_chain_key']) or b''
        sending_index = int(d.get('sending_index', 0))
        receiving_chain_key = _unb64(d['receiving_chain_key']) or b''
        receiving_index = int(d.get('receiving_index', 0))
        our_dh_priv = _unb64(d['our_dh_priv']) or b''
        our_dh_pub = _unb64(d['our_dh_pub']) or b''
        their_dh_pub = _unb64(d.get('their_dh_pub'))
    except (KeyError, TypeError, ValueError, AttributeError) as e:
        raise E2EStateError(
            f"malformed state blob: {type(e).__name__}: {e}") from e

    return r.RatchetState(
        root_key=root_key,
        sending_chain_key=sending_chain_key,
        sending_index=sending_index,
        receiving_chain_key=receiving_chain_key,
        receiving_index=receiving_index,
        our_dh_priv=our_dh_priv,
        our_dh_pub=our_dh_pub,
        their_dh_pub=their_dh_pub,
        skipped_keys=skipped_dict,
    )
# ── DB load / save ─────────────────────────────────────────────────
def load(db, conversation_id: str, user_id: str,
         peer_id: str) -> Optional[r.RatchetState]:
    """Return the stored ratchet state for (conversation, user, peer).

    ``None`` means no row exists yet; the caller chooses between
    bootstrapping and refusing to encrypt.

    Raises:
        E2EStateError: propagated from ``deserialize_state`` when the
            stored blob cannot be decoded.
    """
    query = text(
        "SELECT state_json FROM ratchet_states "
        "WHERE conversation_id = :cid AND user_id = :uid "
        "AND peer_id = :pid")
    params = {'cid': conversation_id, 'uid': user_id, 'pid': peer_id}
    found = db.execute(query, params).fetchone()
    return None if found is None else deserialize_state(found[0])
def _ratchet_upsert_sql(dialect: str) -> str:
    """Build the dialect-specific ratchet_states upsert used by ``save``."""
    insert = (
        "INSERT INTO ratchet_states "
        "(id, tenant_id, conversation_id, user_id, peer_id, "
        " state_json, created_at, updated_at) "
        "VALUES (:id, :tid, :cid, :uid, :pid, :blob, "
        " CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ")
    if dialect in ('mysql', 'mariadb'):
        # MySQL/MariaDB have no ON CONFLICT clause; ON DUPLICATE KEY
        # UPDATE fires on the UNIQUE(conversation_id, user_id, peer_id)
        # index. VALUES(col) is used (deprecated since MySQL 8.0.20 but
        # still supported) because MariaDB lacks the row-alias syntax.
        return (insert
                + "ON DUPLICATE KEY UPDATE "
                  "state_json = VALUES(state_json), "
                  "updated_at = CURRENT_TIMESTAMP")
    # SQLite and Postgres share the ON CONFLICT ... DO UPDATE form;
    # the `excluded` pseudo-table name is case-insensitive in both.
    return (insert
            + "ON CONFLICT (conversation_id, user_id, peer_id) DO UPDATE "
              "SET state_json = excluded.state_json, "
              "    updated_at = CURRENT_TIMESTAMP")


def save(db, conversation_id: str, user_id: str, peer_id: str,
         state: r.RatchetState,
         tenant_id: Optional[str] = None) -> None:
    """Upsert the persisted ratchet state for (conversation, user, peer).

    Idempotent: the row is keyed on the
    UNIQUE(conversation_id, user_id, peer_id) index from v52. A repeat
    call overwrites ``state_json`` and bumps ``updated_at``. ``id`` and
    ``tenant_id`` are only written on the first insert. This function
    does not call commit; transaction control stays with the caller.

    Args:
        db: Session/connection exposing ``execute``; its ``bind``
            (if any) selects the SQL dialect.
        conversation_id, user_id, peer_id: The row key.
        state: Ratchet state to persist (via ``serialize_state``).
        tenant_id: Stored on first insert only.

    Raises:
        E2EStateError: when the driver raises IntegrityError.
    """
    blob = serialize_state(state)
    # Without a usable bind, fall back to the SQLite/Postgres form.
    bind = getattr(db, 'bind', None)
    dialect = bind.dialect.name if bind is not None else 'sqlite'
    try:
        db.execute(text(_ratchet_upsert_sql(dialect)), {
            'id': str(uuid.uuid4()), 'tid': tenant_id,
            'cid': conversation_id, 'uid': user_id, 'pid': peer_id,
            'blob': blob})
    except IntegrityError as e:
        raise E2EStateError(f"could not save ratchet state: {e}") from e
def delete_for_conversation(db, conversation_id: str) -> int:
    """Remove every ratchet_states row belonging to ``conversation_id``.

    Intended for full conversation teardown, or for forcing a fresh
    bootstrap after a member rotates their identity key.

    Returns:
        The driver's ``rowcount`` for the DELETE, with a falsy value
        (e.g. ``None``) normalised to ``0``. Useful for caller logging.
    """
    statement = text("DELETE FROM ratchet_states WHERE conversation_id = :cid")
    outcome = db.execute(statement, {'cid': conversation_id})
    deleted = outcome.rowcount
    return deleted if deleted else 0
# ── Bootstrap (Phase 9.B placeholder for X3DH) ─────────────────────
195def _sorted_pair_id(a: str, b: str) -> bytes:
196 """Conversation-pair identifier, order-independent."""
197 parts = sorted([str(a).encode('utf-8'), str(b).encode('utf-8')])
198 return parts[0] + b'|' + parts[1]
def _derive_initial_shared_secret(user_id: str, peer_id: str,
                                  user_identity_b64: str,
                                  peer_identity_b64: str) -> bytes:
    """STAND-IN for X3DH (Phase 9.C).

    Returns SHA-256 over
    ``label | sorted(party IDs) | sorted(identity keys)``. Every component
    is order-independent, so both members compute the same 32-byte value
    independently from data already in conversation_keys. No out-of-band
    exchange is needed.

    SECURITY CAVEAT: every input is public. ANY party that can read both
    IDs and both published identity keys can recompute this value, so
    it is not secret against outside observers. The gap is wider than
    missing forward secrecy. Anything derived solely from it (including
    the bootstrap DH keypairs) is equally reproducible. Confidentiality
    has to come from later DH ratchet steps that use freshly generated
    keypairs. NOTE(review): how soon that happens depends on e2e_ratchet's
    advance logic, which is not visible here; confirm. Phase 9.C replaces
    this with X3DH using signed prekeys + one-time prekeys.
    """
    pair = _sorted_pair_id(user_id, peer_id)
    low_key, high_key = sorted((user_identity_b64.encode('ascii'),
                                peer_identity_b64.encode('ascii')))
    material = b'|'.join((b'hevolve-e2e-bootstrap-v9.B',
                          pair, low_key, high_key))
    return hashlib.sha256(material).digest()
def _derive_initial_dh_keypair(shared_secret: bytes,
                               party_id: str) -> Tuple[bytes, bytes]:
    """STAND-IN for X3DH (Phase 9.C).

    Deterministically derive ``party_id``'s bootstrap X25519 keypair from
    ``shared_secret``. Both members can evaluate this for either party
    ID, so each knows the other's bootstrap public key. By ECDH symmetry,
    DH(alice_priv, bob_pub) == DH(bob_priv, alice_pub). That makes
    alice's first sending chain match bob's first receiving chain.

    Args:
        shared_secret: Output of ``_derive_initial_shared_secret``.
        party_id: Member whose keypair to derive (``str()``-ed, UTF-8).

    Returns:
        ``(private_bytes, public_bytes)`` in raw X25519 encoding.

    SECURITY CAVEAT: the private key is a pure function of
    ``shared_secret`` and ``party_id``. ``shared_secret`` is itself
    computed only from public inputs (party IDs + published identity
    keys), so anyone who can read those can derive these private keys
    and the first DH output, not just the peer. This bootstrap gives
    NO confidentiality against outside observers for traffic keyed only
    off these keypairs. It must not be described as protecting against
    outside attackers. X3DH (Phase 9.C) is required for that.
    """
    # NOTE(review): presumably raises when `cryptography` is unavailable,
    # which is why the imports below are deferred until after it.
    r._require_crypto()
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric.x25519 import (
        X25519PrivateKey,
    )

    seed = (b'hevolve-bootstrap-dh-v9.B|' + shared_secret
            + b'|' + str(party_id).encode('utf-8'))
    # X25519 accepts any 32 bytes as a private scalar (the library clamps
    # internally); SHA-256 yields exactly 32.
    priv = X25519PrivateKey.from_private_bytes(hashlib.sha256(seed).digest())
    # Round-trip through the key API to get the library's raw encoding
    # of the private key and the matching public key.
    canonical_priv = priv.private_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=serialization.NoEncryption(),
    )
    pub = priv.public_key().public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw,
    )
    return canonical_priv, pub
def bootstrap_pair(db, conversation_id: str, user_id: str,
                   peer_id: str,
                   user_identity_b64: str, peer_identity_b64: str,
                   *,
                   tenant_id: Optional[str] = None,
                   is_initiator: bool) -> r.RatchetState:
    """Create, persist and return a fresh ratchet state for (user, peer).

    Follows the libsignal convention for ``is_initiator``:

    * Initiator: starts with ``their_dh_pub`` set to the peer's bootstrap
      public key, so it can send first. ``init_ratchet`` then derives
      the sending chain from DH(our_priv, their_pub).
    * Responder: starts with ``their_dh_pub=None`` and advances on its
      first inbound message. Its first ``advance_dh_ratchet`` derives the
      receiving chain from DH(our_priv, sender_pub), which matches by
      ECDH symmetry.

    Both bootstrap keypairs come deterministically from the shared
    secret + party ID. For DMs the caller picks the initiator as
    ``min(user_id, peer_id)`` so both sides agree without coordination.

    The state is written through ``save``, an upsert, so any existing
    row for this (conversation, user, peer) is overwritten.
    """
    r._require_crypto()
    seed = _derive_initial_shared_secret(
        user_id, peer_id, user_identity_b64, peer_identity_b64)
    # Derive BOTH bootstrap keypairs: ours to hold, and the peer's so we
    # know its public half.
    my_priv, my_pub = _derive_initial_dh_keypair(seed, user_id)
    peer_bootstrap_pub = _derive_initial_dh_keypair(seed, peer_id)[1]
    fresh_state = r.init_ratchet(
        shared_secret=seed,
        our_dh_priv=my_priv,
        our_dh_pub=my_pub,
        their_dh_pub=peer_bootstrap_pub if is_initiator else None,
    )
    save(db, conversation_id, user_id, peer_id, fresh_state,
         tenant_id=tenant_id)
    return fresh_state
def load_or_bootstrap(db, conversation_id: str, user_id: str,
                      peer_id: str,
                      user_identity_b64: str, peer_identity_b64: str,
                      *,
                      tenant_id: Optional[str] = None) -> r.RatchetState:
    """Return the stored state, bootstrapping it on first use.

    The side whose ``str(user_id)`` sorts first is the initiator, so both
    members agree without coordination.

    NOTE(review): the load and the bootstrap are not atomic. Two
    concurrent first-use calls for the same (conversation, user, peer)
    can both bootstrap. Because ``save`` upserts, the later write replaces
    the row, possibly clobbering a state the other caller already
    advanced and saved. Confirm callers serialize first use.
    """
    current = load(db, conversation_id, user_id, peer_id)
    if current is None:
        current = bootstrap_pair(
            db, conversation_id, user_id, peer_id,
            user_identity_b64, peer_identity_b64,
            tenant_id=tenant_id,
            is_initiator=str(user_id) < str(peer_id))
    return current
327__all__ = [
328 'E2EStateError',
329 'serialize_state', 'deserialize_state',
330 'load', 'save', 'delete_for_conversation',
331 'bootstrap_pair', 'load_or_bootstrap',
332]