Coverage for integrations / social / e2e_key_service.py: 0.0%

104 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial — Phase 9 E2E DM key envelope service (schema-only stub). 

3 

4Plan reference: sunny-gliding-eich.md, Part K.4 + Phase 9. 

5 

6This is the SCHEMA-ONLY layer of the optional E2E DM stack. The 

7actual libsignal-style double-ratchet implementation lives in a 

8follow-up. This module ships: 

9 

10 - publish_identity_key(db, conversation_id, user_id, identity_key_b64, 

11 *, signed_prekey_b64, signed_prekey_signature_b64, key_algorithm, tenant_id) 

12 Member uploads their public identity bundle for a conversation. 

13 

14 - rotate_identity_key(db, conversation_id, user_id, *, tenant_id) 

15 Soft-marks the existing key as `rotated_at = now()` and accepts 

16 a fresh one via publish_identity_key. 

17 

18 - get_active_keys(conversation) 

19 Returns each active member's current public bundle so a sender 

20 can build envelopes. 

21 

22 - record_envelope(message_id, recipient_id, ciphertext_b64, 

23 ratchet_header_b64) 

24 Persists one envelope row per recipient. 

25 

26 - fetch_envelope(message_id, recipient_id) 

27 Pulls the envelope a recipient should decrypt. 

28 

29What this module DOES NOT do (deferred to crypto follow-up): 

30 - Generate / verify signatures 

31 - Run the double-ratchet KDF chain 

32 - Manage one-time prekeys (X3DH initial handshake) 

33 - Key backup / escrow 

34 

35The schema is stable; the crypto can land later without further 

36migration churn. 

37 

38Transport: same as the rest of HARTOS social — P2P-first via 

39MessageBus.publish. Envelope distribution at message-send time 

40flows through the existing Conversation send_message path; this 

41module just stores the row. 

42""" 

43 

44from __future__ import annotations 

45 

46import logging 

47import uuid 

48from typing import Any, Dict, List, Optional 

49 

50from sqlalchemy import text 

51from sqlalchemy.exc import IntegrityError 

52 

53from .sync_service import _tenant_predicate 

54 

# Module-wide logger; uses the fixed 'hevolve_social' channel name rather
# than __name__.
logger = logging.getLogger('hevolve_social')

56 

57 

class E2EKeyError(Exception):
    """Service-level failure in the E2E key / envelope layer.

    Raised by ``E2EKeyService`` for bad input and for constraint
    conflicts; the caller is expected to translate it into a 4xx HTTP
    response.
    """

61 

62 

def _resolve_tenant(arg_tenant: Optional[str]) -> Optional[str]:
    """Pick the tenant id a query or row should be scoped to.

    Pass-5 F9 fix: prefer the explicit ``tenant_id`` argument, fall back
    to ``g.tenant_id`` from the active Flask request, then ``None``.
    Closes the foot-gun where a caller forgets to pass tenant_id and the
    row gets stamped untenanted (invisible in strict mode = bricked).

    Args:
        arg_tenant: Tenant id supplied by the caller, or ``None`` to ask
            for the request-scoped value. Any non-``None`` value
            (including the empty string) is returned unchanged.

    Returns:
        The explicit tenant id, else ``g.tenant_id`` when a Flask request
        context is active and the attribute is set, else ``None``.
    """
    if arg_tenant is not None:
        return arg_tenant
    # Flask is imported lazily so this module stays usable where Flask is
    # not installed. Only the import failure is tolerated here: any other
    # error must surface instead of silently producing an untenanted row.
    try:
        from flask import g, has_request_context
    except ImportError:
        return None
    if has_request_context():
        return getattr(g, 'tenant_id', None)
    return None

78 

79 

class E2EKeyService:
    """Persistence operations for E2E DM identity keys and envelopes.

    Schema-only layer: every method is a ``@staticmethod`` that issues raw
    SQL against the ``conversation_keys`` / ``message_envelopes`` tables
    through the ``db`` handle passed as first argument. ``db`` must expose
    ``execute``, ``begin_nested``, ``commit`` and ``rollback`` (presumably
    a SQLAlchemy ``Session`` — confirm against callers). No cryptography
    is performed here.
    """

    # ── Shared SQL helpers ──────────────────────────────────────────

    @staticmethod
    def _rotate_active(db, params: Dict[str, Any], tenant_sql: str):
        """Stamp ``rotated_at`` on the active key row(s) of one member.

        ``params`` must carry ``cid`` and ``uid`` plus whatever bind
        parameters ``tenant_sql`` references. Returns the execute result
        so callers can inspect ``rowcount``. Does not commit.
        """
        return db.execute(text(
            "UPDATE conversation_keys SET rotated_at = CURRENT_TIMESTAMP "
            "WHERE conversation_id = :cid AND user_id = :uid "
            "AND rotated_at IS NULL"
            f"{tenant_sql}"),
            params)

    @staticmethod
    def _insert_key(db, values: Dict[str, Any]) -> None:
        """INSERT one ``conversation_keys`` row inside a SAVEPOINT.

        The SAVEPOINT confines a constraint failure to this statement, so
        an ``IntegrityError`` propagates without poisoning the enclosing
        transaction. Does not commit.
        """
        with db.begin_nested():
            db.execute(text(
                "INSERT INTO conversation_keys "
                "(id, tenant_id, conversation_id, user_id, "
                " identity_key_b64, signed_prekey_b64, "
                " signed_prekey_signature_b64, key_algorithm) "
                "VALUES (:id, :tid, :cid, :uid, :ik, :spk, :sig, :algo)"),
                values)

    # ── Public API ──────────────────────────────────────────────────

    @staticmethod
    def publish_identity_key(db, conversation_id: str, user_id: str,
                             identity_key_b64: str,
                             *,
                             signed_prekey_b64: Optional[str] = None,
                             signed_prekey_signature_b64: Optional[str] = None,
                             key_algorithm: str = 'x25519-ed25519',
                             tenant_id: Optional[str] = None
                             ) -> Dict[str, Any]:
        """Upload (or replace) this user's public identity bundle for
        the given conversation. Idempotent on `(conversation_id,
        user_id)` — re-publishing rotates the previous key.

        Pass-5 F1 + F9: tenant_id is resolved from g if not passed,
        and the rotate UPDATE is gated by tenant_id so we can't
        accidentally rotate a row in another tenant. Each INSERT attempt
        runs in a SAVEPOINT (F2) so a concurrent publish race recovers
        gracefully.

        Args:
            db: Database handle (see class docstring).
            conversation_id: Conversation the key belongs to.
            user_id: Member publishing the key.
            identity_key_b64: Public identity key; must be non-empty.
            signed_prekey_b64: Optional signed prekey, stored as given.
            signed_prekey_signature_b64: Optional signature over the
                prekey, stored as given (not verified here).
            key_algorithm: Algorithm label stored with the row.
            tenant_id: Explicit tenant; resolved via ``_resolve_tenant``
                when omitted.

        Returns:
            The freshly inserted key row as a dict (see ``_key_dict``).

        Raises:
            E2EKeyError: If ``identity_key_b64`` is empty, or if the
                INSERT still violates a constraint after one retry. In
                the latter case the transaction is rolled back first so
                the pending rotation is not left behind.
        """
        if not identity_key_b64:
            raise E2EKeyError("identity_key_b64 required")
        tid = _resolve_tenant(tenant_id)
        tenant_sql, tenant_params = _tenant_predicate(tid)

        # Tenant clause prevents cross-tenant mutation if the caller
        # knows the (conv, user) but lives in a different tenant.
        rotate_params = {'cid': conversation_id, 'uid': user_id}
        rotate_params.update(tenant_params)

        kid = str(uuid.uuid4())
        key_values = {
            'id': kid, 'tid': tid, 'cid': conversation_id,
            'uid': user_id, 'ik': identity_key_b64,
            'spk': signed_prekey_b64,
            'sig': signed_prekey_signature_b64,
            'algo': key_algorithm,
        }

        # Soft-rotate existing active row(s), then insert the new one.
        E2EKeyService._rotate_active(db, rotate_params, tenant_sql)
        try:
            E2EKeyService._insert_key(db, key_values)
        except IntegrityError:
            # Pass-5 F2: concurrent publish raced us — retry the
            # rotate (the other publisher's row is now active) and
            # re-INSERT. One retry is enough; if it races again,
            # surface as E2EKeyError so the caller can choose.
            logger.info(
                "publish_identity_key concurrent race for (conv=%s,"
                " user=%s); retrying rotate+insert", conversation_id, user_id)
            E2EKeyService._rotate_active(db, rotate_params, tenant_sql)
            try:
                E2EKeyService._insert_key(db, key_values)
            except IntegrityError as exc:
                # Discard the pending rotate UPDATEs: committing them
                # without a replacement row would leave the member with
                # no active key.
                db.rollback()
                raise E2EKeyError(
                    f"could not publish identity key: {exc}") from exc
        db.commit()
        return E2EKeyService._key_dict(db, kid)

    @staticmethod
    def rotate_identity_key(db, conversation_id: str, user_id: str,
                            *,
                            tenant_id: Optional[str] = None) -> bool:
        """Soft-mark the user's current key as rotated and commit.

        Tenant-scoped (Pass-5 F1).

        Returns:
            True iff at least one row was actually rotated.
        """
        tid = _resolve_tenant(tenant_id)
        tenant_sql, tenant_params = _tenant_predicate(tid)
        params = {'cid': conversation_id, 'uid': user_id}
        params.update(tenant_params)
        result = E2EKeyService._rotate_active(db, params, tenant_sql)
        db.commit()
        # rowcount may be None on some drivers; treat that as "nothing".
        return (result.rowcount or 0) > 0

    @staticmethod
    def get_active_keys(db, conversation_id: str,
                        *,
                        tenant_id: Optional[str] = None
                        ) -> List[Dict[str, Any]]:
        """Return one dict per active (non-rotated) member key, oldest
        first. A sender uses these to build per-recipient envelopes.

        Members without an active key are SKIPPED — see
        list_members_without_keys (F8 follow-up) for the inverse list.
        Tenant-scoped (Pass-5 F1).

        Returns:
            Dicts with keys ``id``, ``user_id``, ``identity_key_b64``,
            ``signed_prekey_b64``, ``signed_prekey_signature_b64``,
            ``key_algorithm`` and ``created_at`` (stringified, or None).
        """
        tid = _resolve_tenant(tenant_id)
        tenant_sql, tenant_params = _tenant_predicate(tid)
        params = {'cid': conversation_id}
        params.update(tenant_params)
        rows = db.execute(text(
            "SELECT id, user_id, identity_key_b64, signed_prekey_b64, "
            "       signed_prekey_signature_b64, key_algorithm, "
            "       created_at "
            "FROM conversation_keys "
            "WHERE conversation_id = :cid AND rotated_at IS NULL"
            f"{tenant_sql} "
            "ORDER BY created_at ASC"),
            params
        ).fetchall()
        return [
            {'id': r[0], 'user_id': r[1],
             'identity_key_b64': r[2],
             'signed_prekey_b64': r[3],
             'signed_prekey_signature_b64': r[4],
             'key_algorithm': r[5],
             'created_at': str(r[6]) if r[6] else None}
            for r in rows
        ]

    @staticmethod
    def list_members_without_keys(db, conversation_id: str,
                                  *,
                                  tenant_id: Optional[str] = None
                                  ) -> List[str]:
        """Pass-5 F8 helper: return user_ids of conversation members
        who have NOT published an active identity key. A sender
        building envelopes uses this to surface a UI warning ("3
        members can't read this message yet") before sending.

        Implemented as an anti-join: ``memberships`` LEFT JOIN active
        ``conversation_keys``, keeping rows where no key matched.

        Returns user_ids only — caller joins to user metadata as
        needed. Tenant-scoped on both the memberships and keys side.
        """
        tid = _resolve_tenant(tenant_id)
        tenant_sql_m, tenant_params_m = _tenant_predicate(tid, alias='m')
        tenant_sql_k, tenant_params_k = _tenant_predicate(tid, alias='k')
        params = {'cid': conversation_id}
        # Both aliases are built from the same tid, so merging their bind
        # parameters into one dict is safe; later keys simply overwrite.
        params.update(tenant_params_m)
        params.update(tenant_params_k)
        rows = db.execute(text(
            "SELECT m.member_id FROM memberships m "
            "LEFT JOIN conversation_keys k "
            "  ON k.conversation_id = m.parent_id "
            "  AND k.user_id = m.member_id "
            "  AND k.rotated_at IS NULL"
            f"{tenant_sql_k} "
            "WHERE m.parent_kind = 'conversation' "
            "AND m.parent_id = :cid "
            "AND k.id IS NULL"
            f"{tenant_sql_m}"),
            params
        ).fetchall()
        return [r[0] for r in rows]

    @staticmethod
    def record_envelope(db, message_id: str, recipient_id: str,
                        ciphertext_b64: str,
                        *,
                        ratchet_header_b64: Optional[str] = None,
                        tenant_id: Optional[str] = None
                        ) -> Dict[str, Any]:
        """Persist a per-recipient encrypted payload. Append-only;
        UNIQUE(message_id, recipient_id) enforces one envelope per
        (message, recipient).

        Pass-5 F7: a duplicate (message, recipient) raises a clean
        E2EKeyError instead of leaking SQLAlchemy's IntegrityError.
        Pass-5 F1: tenant_id resolves from g if not passed.

        Returns:
            The stored envelope as a dict (see ``_envelope_dict``).

        Raises:
            E2EKeyError: If ``ciphertext_b64`` is empty or the INSERT
                violates a constraint.
        """
        if not ciphertext_b64:
            raise E2EKeyError("ciphertext_b64 required")
        tid = _resolve_tenant(tenant_id)
        eid = str(uuid.uuid4())
        try:
            # SAVEPOINT keeps a constraint failure from aborting the
            # caller's enclosing transaction.
            with db.begin_nested():
                db.execute(text(
                    "INSERT INTO message_envelopes "
                    "(id, tenant_id, message_id, recipient_id, "
                    " ciphertext_b64, ratchet_header_b64) "
                    "VALUES (:id, :tid, :mid, :rid, :ct, :hdr)"),
                    {'id': eid, 'tid': tid, 'mid': message_id,
                     'rid': recipient_id, 'ct': ciphertext_b64,
                     'hdr': ratchet_header_b64})
        except IntegrityError as exc:
            raise E2EKeyError(
                "envelope already recorded for this (message, recipient)"
            ) from exc
        db.commit()
        return E2EKeyService._envelope_dict(db, eid)

    @staticmethod
    def fetch_envelope(db, message_id: str, recipient_id: str,
                       *,
                       tenant_id: Optional[str] = None
                       ) -> Optional[Dict[str, Any]]:
        """Return the envelope `(message_id, recipient_id)` should
        decrypt, or None if no envelope exists for that pair (e.g.
        the recipient joined the conversation after this message,
        or the sender skipped them — see get_active_keys).

        Pass-5 F1: tenant-scoped — a tenant-A user querying for a
        tenant-B envelope by guessed id gets None, identical to
        a missing pair (no enumeration leak)."""
        tid = _resolve_tenant(tenant_id)
        tenant_sql, tenant_params = _tenant_predicate(tid)
        params = {'mid': message_id, 'rid': recipient_id}
        params.update(tenant_params)
        row = db.execute(text(
            "SELECT id FROM message_envelopes "
            "WHERE message_id = :mid AND recipient_id = :rid"
            f"{tenant_sql}"),
            params
        ).fetchone()
        if row is None:
            return None
        return E2EKeyService._envelope_dict(db, row[0])

    # ── Internal helpers ────────────────────────────────────────────

    @staticmethod
    def _key_dict(db, key_id: str) -> Dict[str, Any]:
        """Load one ``conversation_keys`` row by primary key.

        Returns an empty dict when no row has that id. Timestamps are
        stringified; a NULL timestamp becomes None.
        """
        row = db.execute(text(
            "SELECT id, conversation_id, user_id, identity_key_b64, "
            "       signed_prekey_b64, signed_prekey_signature_b64, "
            "       key_algorithm, created_at, rotated_at "
            "FROM conversation_keys WHERE id = :id"),
            {'id': key_id}
        ).fetchone()
        if not row:
            return {}
        return {
            'id': row[0], 'conversation_id': row[1], 'user_id': row[2],
            'identity_key_b64': row[3],
            'signed_prekey_b64': row[4],
            'signed_prekey_signature_b64': row[5],
            'key_algorithm': row[6],
            'created_at': str(row[7]) if row[7] else None,
            'rotated_at': str(row[8]) if row[8] else None,
        }

    @staticmethod
    def _envelope_dict(db, env_id: str) -> Dict[str, Any]:
        """Load one ``message_envelopes`` row by primary key.

        Returns an empty dict when no row has that id. ``created_at`` is
        stringified; NULL becomes None.
        """
        row = db.execute(text(
            "SELECT id, message_id, recipient_id, ciphertext_b64, "
            "       ratchet_header_b64, created_at "
            "FROM message_envelopes WHERE id = :id"),
            {'id': env_id}
        ).fetchone()
        if not row:
            return {}
        return {
            'id': row[0], 'message_id': row[1], 'recipient_id': row[2],
            'ciphertext_b64': row[3],
            'ratchet_header_b64': row[4],
            'created_at': str(row[5]) if row[5] else None,
        }

354 

355 

# Public API of this module; _resolve_tenant stays private.
__all__ = [
    'E2EKeyService',
    'E2EKeyError',
]