Coverage for integrations / social / sync_service.py: 0.0%

277 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial — Sync deltas + cursor (multi-device backfill / restore). 

3 

4Phase 7c.6. Plan reference: sunny-gliding-eich.md, Part R.3. 

5 

6GET /api/social/sync?since=<cursor>&kinds=<csv> returns the user's 

7deltas across every social kind they own or can see, since the 

8cursor. Used for: 

9 

10 - New-device login (cold pull): since=0 → full backfill. 

11 - Reconnect after offline period: since=<last_seen_cursor>. 

12 - WAMP push tells device "something changed", device pulls the delta. 

13 - Background catch-up timer when peer-to-peer is silent. 

14 

15Cursor design (single-server flat/regional/central): 

16 Cursor = max(timestamp) seen so far. Server stamps every social 

17 write with `created_at` / `edited_at` (existing columns), so 

18 ordering within a single server is monotonic. 

19 

20 For multi-server (Phase 8 cloud + 9 hardening) the cursor will 

21 upgrade to a hybrid logical clock (HLC) tuple 

22 (physical_ms, logical_counter, node_id) 

23 to handle clock skew across nodes. This module is the 

24 layering point — callers always pass an opaque string cursor; 

25 the service decides how to interpret it. 

26 

27Per-kind queries respect existing privacy gates: 

28 - conversations: only ones the user is a member of (memberships table) 

29 - messages: only in conversations the user is a member of 

30 - posts / comments: only in communities the user can see (delegate 

31 to existing visibility checks) 

32 - friendships / blocks / invites: only ones involving the user 

33 - mentions: only ones targeting the user 

34 - memberships: only the user's own rows 

35 

36Every delta carries the kind + payload + a `cursor` field so the 

37client can advance its high-water mark deterministically. 

38 

39Transport: this is request-response only. The client polls (or is 

40nudged by a WAMP push and then polls). Fan-out from each write to 

41subscribed clients still happens through the existing MessageBus 

42(LOCAL → SSE → PEERLINK → CROSSBAR) — sync is the catch-up path, 

43not the live path. 

44 

45Zero-loss invariant: 

46 Every social write that affects user U is tagged with a server 

47 timestamp >= cursor. The sync query returns every row strictly 

48 greater than the cursor. Therefore the union of (live fan-out 

49 while online) + (sync deltas while offline) covers every state 

50 change exactly once at the storage layer. Client-side dedup by 

51 primary key removes any double-delivery from overlap. 

52""" 

53 

54from __future__ import annotations 

55 

56import logging 

57from datetime import datetime 

58from typing import Any, Dict, List, Optional, Tuple 

59 

60logger = logging.getLogger('hevolve_social') 

61 

62 

63# Default cursor when the client is making its first sync call. 

64# 1970 epoch ensures every row qualifies for the initial backfill. 

65_EPOCH_CURSOR = '1970-01-01 00:00:00' 

66 

67 

68# Whitelisted kinds the sync endpoint can return. Each kind maps to a 

69# private fetcher below. Caller-passed `kinds` is intersected with 

70# this set so unknown kinds are silently dropped. 

71_VALID_KINDS = ( 

72 'conversations', 'messages', 

73 'friendships', 'blocks', 

74 'invites', 

75 'mentions', 

76 'memberships', 

77 'notifications', 

78) 

79 

80 

81def _parse_cursor(cursor: Optional[str]) -> Tuple[str, str]: 

82 """Decode `<timestamp>|<id>` into (timestamp, id). 

83 

84 Both halves protect against the SQLite second-resolution problem: 

85 multiple rows can share a `created_at` timestamp, and a cursor 

86 that only stored the timestamp would either lose those siblings 

87 (with `>`) or re-emit them forever (with `>=`). Encoding the 

88 last-seen primary key as a tiebreaker fixes both — see fetcher 

89 queries below for the compound `WHERE` shape. 

90 

91 Bad input → epoch (`'1970-01-01 00:00:00', ''`) so the next 

92 sync is a full backfill. Never raises. 

93 """ 

94 if not cursor or cursor in ('0', '', 'null'): 

95 return _EPOCH_CURSOR, '' 

96 raw = str(cursor) 

97 # Split on '|'; missing delimiter means a legacy timestamp-only 

98 # cursor — treat the id half as empty (still correct, just slightly 

99 # less precise on tie-row continuation). 

100 if '|' in raw: 

101 ts_part, id_part = raw.split('|', 1) 

102 else: 

103 ts_part, id_part = raw, '' 

104 ts = ts_part.replace('T', ' ').split('.')[0] 

105 if not ts: 

106 return _EPOCH_CURSOR, '' 

107 # Pass-1 N1 fix: removed the dead `len(ts) == 10` date-only 

108 # branch. Encoded cursors always include a time component 

109 # because they come from `str(datetime)` which produces 

110 # `YYYY-MM-DD HH:MM:SS`. Any caller passing a bare date string 

111 # is upgrading to epoch via the ValueError path below, which is 

112 # the same fallback the date-only branch produced. Simpler. 

113 try: 

114 datetime.strptime(ts, '%Y-%m-%d %H:%M:%S') 

115 return ts, id_part 

116 except (ValueError, TypeError): 

117 return _EPOCH_CURSOR, '' 

118 

119 

def _normalize_cursor(cursor: Optional[str]) -> str:
    """Return only the timestamp half of a cursor (backward-compat shim).

    Kept for callers and tests that still treat the cursor as a plain
    timestamp string. Code that needs the id tiebreaker should call
    `_parse_cursor` and use the full `(ts, id)` pair.
    """
    return _parse_cursor(cursor)[0]

128 

129 

130def _encode_cursor(ts: str, row_id: str) -> str: 

131 """Encode the compound cursor for client return + next-call use.""" 

132 return f'{ts}|{row_id}' if row_id else ts 

133 

134 

135def _cursor_predicate(activity_expr: str, cursor_id: str, 

136 ts_param: str = 'ts', 

137 id_param: str = 'id', 

138 table_id_col: str = 'id') -> str: 

139 """Build the strictly-greater-than cursor WHERE fragment. 

140 

141 Two shapes: 

142 - cursor_id == '' → `activity > :ts` (strict, no id branch) 

143 - cursor_id != '' → `(activity > :ts OR (activity = :ts AND id > :id))` 

144 

145 Reviewer H-NEW-1: when the prior cursor was a bare timestamp 

146 (encoded as `<ts>` because no id-bearing rows shipped on the 

147 previous call), the second branch with `:id = ''` matched every 

148 row at the boundary timestamp because every non-empty id is 

149 string-greater than ''. That re-emitted boundary rows on every 

150 sync forever. Splitting the branch closes the hole. 

151 """ 

152 if not cursor_id: 

153 return f"({activity_expr} > :{ts_param})" 

154 return ( 

155 f"({activity_expr} > :{ts_param} " 

156 f"OR ({activity_expr} = :{ts_param} " 

157 f"AND {table_id_col} > :{id_param}))" 

158 ) 

159 

160 

161def _tenant_predicate(tenant_id: Optional[str], 

162 alias: str = '') -> Tuple[str, Dict[str, Any]]: 

163 """Return (`AND ...` SQL fragment, params dict) for tenant filtering. 

164 

165 When `tenant_id` is None (flat/regional pass-through) this is the 

166 empty string — the WHERE clause is unchanged. When set, the 

167 fragment enforces (col == tid OR col IS NULL) so legacy untenanted 

168 rows remain visible to the tenant for backward compatibility, same 

169 semantics the ORM-level `tenant_filter.py` listener uses. 

170 

171 `alias` lets callers prefix the column for joined queries 

172 (e.g. `'msg'` → `msg.tenant_id`). Empty string → bare `tenant_id`. 

173 

174 The `:tid` placeholder is reserved; callers must avoid colliding 

175 with their own `tid` parameter. This is checked in tests. 

176 

177 Pass-1 M6 note — f-string SQL safety: 

178 The f-string interpolation in the returned fragment is intentional 

179 and safe. `alias` is a static string fragment chosen by the 

180 caller from a closed set ('msg', 'c', 'm', etc.) — never user 

181 input. All user-controlled values (`tenant_id`) flow through the 

182 `:tid` bind parameter which SQLAlchemy escapes. Future callers 

183 adding new aliases must keep this invariant: no user data in 

184 the `alias` argument. 

185 

186 Pass-5 F10 note — raw-SQL DRY: 

187 This helper is the canonical tenant-scope predicate for raw 

188 SQL queries that bypass the ORM tenant_filter listener. Every 

189 raw-`text()` query in sync_service / e2e_key_service / call_service 

190 uses it. When adding a new raw-SQL service, import this helper 

191 rather than rolling your own — keeps the loose-mode/strict-mode 

192 contract in one place (Phase 8 strict mode is enforced by the 

193 ORM listener, not by this helper, so raw-SQL paths run in 

194 effectively-loose mode regardless of strict-mode setting; that 

195 tradeoff is documented at tenant_filter.py module docstring). 

196 """ 

197 if not tenant_id: 

198 return '', {} 

199 prefix = f'{alias}.' if alias else '' 

200 return ( 

201 f' AND ({prefix}tenant_id = :tid OR {prefix}tenant_id IS NULL)', 

202 {'tid': tenant_id}) 

203 

204 

def _max_cursor(*candidates: Optional[str]) -> str:
    """Return the greatest cursor among `candidates`, re-encoded.

    Falsy candidates are ignored. Each remaining cursor is parsed into
    a `(timestamp, id)` tuple and the tuples are compared, timestamp
    first and id as tiebreaker. With no usable candidate the epoch
    cursor is returned.

    Comparing the raw strings would be wrong: '|' (0x7C) sorts after
    every digit, so `T0|<uuid>` would rank above a strictly later bare
    `T1`, and the returned cursor would then re-emit or skip rows.
    """
    best = None
    for candidate in candidates:
        if not candidate:
            continue
        pair = _parse_cursor(candidate)
        if best is None or pair > best:
            best = pair
    if best is None:
        return _EPOCH_CURSOR
    best_ts, best_id = best
    return _encode_cursor(best_ts, best_id)

222 

223 

class SyncService:
    """Catch-up ("sync") queries over one user's social data.

    All public methods are static. Each takes an explicit `db` so they
    can be called from any context without coupling to Flask `g`.
    """

    @staticmethod
    def deltas(db, user_id: str,
               since: Optional[str] = None,
               kinds: Optional[List[str]] = None,
               limit_per_kind: int = 200,
               tenant_id: Optional[str] = None) -> Dict[str, Any]:
        """Return per-kind deltas newer than `since`, plus the next cursor.

        Args:
            db: session-like object exposing `execute()` (and `query()`
                for the invites fetcher).
            user_id: id of the user whose deltas are requested.
            since: opaque cursor from a previous call; falsy/invalid
                means full backfill.
            kinds: optional subset of kinds; unknown names are dropped.
                Falsy means every kind in `_VALID_KINDS`.
            limit_per_kind: maximum rows returned per kind. Values
                below 1 are raised to 1; non-numeric values fall back
                to 200.
            tenant_id: optional tenant scope applied in every fetcher.

        Returns:
            `{'cursor': str, 'has_more': bool, 'deltas': {kind: [row, ...]}}`

        Cursor format: '<timestamp>|<row_id>' (opaque to the client).
        The id half is a tiebreaker for rows sharing a timestamp;
        without it such rows would either be re-delivered forever
        (`>=`) or silently lost (`>`).

        Cursor advancement:
            - No kind hit its limit: the cursor is the greatest cursor
              seen across all kinds.
            - At least one kind hit its limit (`has_more` is True): the
              cursor is the smallest per-kind cursor among the capped
              kinds. Advancing to the global maximum would start the
              next call past rows the capped kind has not delivered
              yet, losing them. The price is that kinds which were not
              capped may re-deliver some rows on the next call; clients
              are expected to dedup by primary key.
        """
        cursor_ts, cursor_id = _parse_cursor(since)

        # A limit below 1 can never make progress (it would report
        # has_more forever, or turn into an unbounded LIMIT), so clamp.
        try:
            limit = max(1, int(limit_per_kind))
        except (TypeError, ValueError):
            limit = 200

        # Walk kinds in _VALID_KINDS order so the result layout is
        # deterministic; unknown requested kinds drop out here.
        requested = list(kinds) if kinds else None
        wanted = [k for k in _VALID_KINDS
                  if requested is None or k in requested]

        fetchers = {
            'conversations': SyncService._conversations_since,
            'messages': SyncService._messages_since,
            'friendships': SyncService._friendships_since,
            'blocks': SyncService._blocks_since,
            'invites': SyncService._invites_since,
            'mentions': SyncService._mentions_since,
            'memberships': SyncService._memberships_since,
            'notifications': SyncService._notifications_since,
        }

        start_cursor = _encode_cursor(cursor_ts, cursor_id)
        max_seen = start_cursor
        capped_cursors: List[str] = []
        deltas: Dict[str, List[dict]] = {}

        for kind in wanted:
            try:
                # tenant_id goes to every fetcher so the raw SQL WHERE
                # clause enforces isolation itself: ORM-level tenant
                # filtering does not apply to raw text() queries.
                rows, kind_max, capped = fetchers[kind](
                    db, user_id, cursor_ts, cursor_id, limit, tenant_id)
            except Exception as e:
                # Best-effort: one failing kind must not fail the whole
                # sync. NOTE(review): the cursor can still advance past
                # this kind's rows via the other kinds.
                logger.warning(
                    "SyncService: %s fetch failed: %s", kind, e)
                rows, kind_max, capped = [], start_cursor, False
            deltas[kind] = rows
            max_seen = _max_cursor(max_seen, kind_max)
            if capped:
                capped_cursors.append(kind_max)

        if capped_cursors:
            # Hold the cursor at the slowest capped kind (see docstring).
            low_ts, low_id = min(_parse_cursor(c) for c in capped_cursors)
            next_cursor = _encode_cursor(low_ts, low_id)
        else:
            next_cursor = max_seen

        return {
            'cursor': next_cursor,
            'has_more': bool(capped_cursors),
            'deltas': deltas,
        }

    # ── Shared fetcher plumbing ──────────────────────────────────

    @staticmethod
    def _bind_params(user_id, cursor_ts, cursor_id, limit, extra):
        """Build the bind-parameter dict shared by every fetcher.

        `lim` is `limit + 1`: the extra row only signals that more data
        exists and is never shipped. `id` is bound only when the cursor
        carries an id, matching the `_cursor_predicate` shape. `extra`
        holds additional parameters such as the tenant id.
        """
        params = {'uid': user_id, 'ts': cursor_ts, 'lim': limit + 1}
        if cursor_id:
            params['id'] = cursor_id
        params.update(extra)
        return params

    @staticmethod
    def _finish_page(rows, limit, cursor_ts, cursor_id, act_idx, shape):
        """Trim a fetched page and compute its cursor.

        Args:
            rows: result rows, fetched with `LIMIT limit + 1`.
            limit: maximum number of rows to ship.
            cursor_ts, cursor_id: the incoming cursor; it is the
                returned cursor when no row is shipped.
            act_idx: column index of the row's activity timestamp.
            shape: callable turning one result row into a plain dict.

        Returns:
            `(shaped_rows, max_cursor, capped)`; `capped` is True when
            the query returned more than `limit` rows.
        """
        capped = len(rows) > limit
        max_seen = _encode_cursor(cursor_ts, cursor_id)
        out = []
        for row in rows[:limit]:
            ts = str(row[act_idx] or '')
            max_seen = _max_cursor(max_seen, _encode_cursor(ts, row[0]))
            out.append(shape(row))
        return out, max_seen, capped

    # ── Per-kind fetchers ────────────────────────────────────────
    #
    # Contract: each fetcher receives
    # (db, user_id, cursor_ts, cursor_id, limit, tenant_id) and returns
    # (rows, max_cursor_seen_in_this_kind, hit_limit_bool). rows is a
    # list of plain dicts (JSON-serializable). The cursor passed in is
    # an exclusive lower bound; rows are ordered by their activity
    # timestamp, then id, ascending, so paginating with the returned
    # cursor progresses strictly.
    #
    # Compound-cursor SQL pattern used by every fetcher:
    #
    #     WHERE activity_ts > :ts
    #        OR (activity_ts = :ts AND row_id > :id)
    #
    # activity_ts is a per-table expression (often a COALESCE) for the
    # row's "last meaningful change". The id tiebreaker handles rows
    # that share a timestamp without losing any of them.

    @staticmethod
    def _conversations_since(db, user_id, cursor_ts, cursor_id, limit,
                             tenant_id):
        """Conversations the user is a member of, changed after the cursor."""
        from sqlalchemy import text
        tenant_sql, tenant_params = _tenant_predicate(tenant_id, alias='c')
        cursor_sql = _cursor_predicate(
            'COALESCE(c.last_message_at, c.created_at)',
            cursor_id, table_id_col='c.id')
        params = SyncService._bind_params(
            user_id, cursor_ts, cursor_id, limit, tenant_params)
        rows = db.execute(text(
            "SELECT c.id, c.kind, c.title, c.created_by, "
            " c.last_message_at, c.is_locked, c.is_archived, "
            " c.created_at, "
            " COALESCE(c.last_message_at, c.created_at) AS act_ts "
            "FROM conversations c "
            "JOIN memberships m ON m.parent_id = c.id "
            "WHERE m.parent_kind = 'conversation' AND m.member_id = :uid "
            f"AND {cursor_sql} "
            f"{tenant_sql} "
            "ORDER BY COALESCE(c.last_message_at, c.created_at) ASC, "
            " c.id ASC "
            "LIMIT :lim"),
            params
        ).fetchall()

        def shape(row):
            return {
                'id': row[0], 'kind': row[1], 'title': row[2],
                'created_by': row[3],
                'last_message_at': str(row[4]) if row[4] else None,
                'is_locked': bool(row[5]), 'is_archived': bool(row[6]),
                'created_at': str(row[7]) if row[7] else None,
            }

        return SyncService._finish_page(
            rows, limit, cursor_ts, cursor_id, 8, shape)

    @staticmethod
    def _messages_since(db, user_id, cursor_ts, cursor_id, limit,
                        tenant_id):
        """Messages in the user's conversations, created/edited after the cursor."""
        from sqlalchemy import text
        tenant_sql, tenant_params = _tenant_predicate(tenant_id, alias='msg')
        cursor_sql = _cursor_predicate(
            'COALESCE(msg.edited_at, msg.created_at)',
            cursor_id, table_id_col='msg.id')
        params = SyncService._bind_params(
            user_id, cursor_ts, cursor_id, limit, tenant_params)
        rows = db.execute(text(
            "SELECT msg.id, msg.parent_kind, msg.parent_id, "
            " msg.thread_root_id, msg.author_id, msg.agent_kind, "
            " msg.content, msg.depth, msg.edited_at, "
            " msg.is_deleted, msg.metadata_json, msg.created_at, "
            " COALESCE(msg.edited_at, msg.created_at) AS act_ts "
            "FROM messages msg "
            "JOIN memberships m ON m.parent_id = msg.parent_id "
            "WHERE m.parent_kind = 'conversation' AND m.member_id = :uid "
            "AND msg.parent_kind = 'conversation' "
            f"AND {cursor_sql} "
            f"{tenant_sql} "
            "ORDER BY COALESCE(msg.edited_at, msg.created_at) ASC, "
            " msg.id ASC "
            "LIMIT :lim"),
            params
        ).fetchall()

        def shape(row):
            return {
                'id': row[0], 'parent_kind': row[1], 'parent_id': row[2],
                'thread_root_id': row[3], 'author_id': row[4],
                'agent_kind': row[5], 'content': row[6],
                'depth': row[7],
                'edited_at': str(row[8]) if row[8] else None,
                'is_deleted': bool(row[9]),
                'metadata_json': row[10],
                'created_at': str(row[11]) if row[11] else None,
            }

        return SyncService._finish_page(
            rows, limit, cursor_ts, cursor_id, 12, shape)

    @staticmethod
    def _friendships_since(db, user_id, cursor_ts, cursor_id, limit,
                           tenant_id):
        """Friendships involving the user on either side, changed after the cursor."""
        from sqlalchemy import text
        tenant_sql, tenant_params = _tenant_predicate(tenant_id)
        cursor_sql = _cursor_predicate(
            'COALESCE(blocked_at, accepted_at, created_at)', cursor_id)
        params = SyncService._bind_params(
            user_id, cursor_ts, cursor_id, limit, tenant_params)
        rows = db.execute(text(
            "SELECT id, user_a_id, user_b_id, status, initiator_id, "
            " created_at, accepted_at, blocked_at, "
            " COALESCE(blocked_at, accepted_at, created_at) AS act_ts "
            "FROM friendships "
            "WHERE (user_a_id = :uid OR user_b_id = :uid) "
            f"AND {cursor_sql} "
            f"{tenant_sql} "
            "ORDER BY COALESCE(blocked_at, accepted_at, created_at) ASC, "
            " id ASC LIMIT :lim"),
            params
        ).fetchall()

        def shape(row):
            return {
                'id': row[0], 'user_a_id': row[1], 'user_b_id': row[2],
                'status': row[3], 'initiator_id': row[4],
                'created_at': str(row[5]) if row[5] else None,
                'accepted_at': str(row[6]) if row[6] else None,
                'blocked_at': str(row[7]) if row[7] else None,
            }

        return SyncService._finish_page(
            rows, limit, cursor_ts, cursor_id, 8, shape)

    @staticmethod
    def _blocks_since(db, user_id, cursor_ts, cursor_id, limit, tenant_id):
        """Blocks created by the user after the cursor."""
        from sqlalchemy import text
        tenant_sql, tenant_params = _tenant_predicate(tenant_id)
        cursor_sql = _cursor_predicate('created_at', cursor_id)
        params = SyncService._bind_params(
            user_id, cursor_ts, cursor_id, limit, tenant_params)
        rows = db.execute(text(
            "SELECT id, blocker_id, blocked_id, reason, created_at "
            "FROM blocks "
            "WHERE blocker_id = :uid "
            f"AND {cursor_sql} "
            f"{tenant_sql} "
            "ORDER BY created_at ASC, id ASC LIMIT :lim"),
            params
        ).fetchall()

        def shape(row):
            return {
                'id': row[0], 'blocker_id': row[1], 'blocked_id': row[2],
                'reason': row[3],
                'created_at': str(row[4]) if row[4] else None,
            }

        return SyncService._finish_page(
            rows, limit, cursor_ts, cursor_id, 4, shape)

    @staticmethod
    def _invites_since(db, user_id, cursor_ts, cursor_id, limit, tenant_id):
        """Invites sent by, addressed to, or emailed to the user, after the cursor."""
        from sqlalchemy import text
        from .models import User
        tenant_sql, tenant_params = _tenant_predicate(tenant_id)
        cursor_sql = _cursor_predicate(
            'COALESCE(responded_at, created_at)', cursor_id)
        params = SyncService._bind_params(
            user_id, cursor_ts, cursor_id, limit, tenant_params)

        # Also match invites addressed to the user's e-mail. The address
        # travels as a bind parameter; only the static clause text is
        # spliced into the SQL.
        email_clause = ""
        u = db.query(User).filter(User.id == user_id).first()
        if u and u.email:
            email_clause = " OR LOWER(invitee_email) = :em"
            params['em'] = (u.email or '').lower()

        rows = db.execute(text(
            "SELECT id, parent_kind, parent_id, invitee_id, "
            " invitee_email, invite_code, invited_by, "
            " role_offered, status, created_at, "
            " expires_at, responded_at, "
            " COALESCE(responded_at, created_at) AS act_ts "
            "FROM invites "
            f"WHERE ( invitee_id = :uid OR invited_by = :uid {email_clause} ) "
            f"AND {cursor_sql} "
            f"{tenant_sql} "
            "ORDER BY COALESCE(responded_at, created_at) ASC, "
            " id ASC LIMIT :lim"),
            params
        ).fetchall()

        def shape(row):
            return {
                'id': row[0], 'parent_kind': row[1], 'parent_id': row[2],
                'invitee_id': row[3], 'invitee_email': row[4],
                'invite_code': row[5], 'invited_by': row[6],
                'role_offered': row[7], 'status': row[8],
                'created_at': str(row[9]) if row[9] else None,
                'expires_at': str(row[10]) if row[10] else None,
                'responded_at': str(row[11]) if row[11] else None,
            }

        return SyncService._finish_page(
            rows, limit, cursor_ts, cursor_id, 12, shape)

    @staticmethod
    def _mentions_since(db, user_id, cursor_ts, cursor_id, limit, tenant_id):
        """Mentions targeting the user (or an agent they own), after the cursor."""
        from sqlalchemy import text
        tenant_sql, tenant_params = _tenant_predicate(tenant_id)
        cursor_sql = _cursor_predicate('created_at', cursor_id)
        params = SyncService._bind_params(
            user_id, cursor_ts, cursor_id, limit, tenant_params)
        rows = db.execute(text(
            "SELECT id, source_kind, source_id, mentioned_user_id, "
            " mentioned_kind, agent_owner_id, created_at "
            "FROM mentions "
            "WHERE (mentioned_user_id = :uid OR agent_owner_id = :uid) "
            f"AND {cursor_sql} "
            f"{tenant_sql} "
            "ORDER BY created_at ASC, id ASC LIMIT :lim"),
            params
        ).fetchall()

        def shape(row):
            return {
                'id': row[0], 'source_kind': row[1], 'source_id': row[2],
                'mentioned_user_id': row[3], 'mentioned_kind': row[4],
                'agent_owner_id': row[5],
                'created_at': str(row[6]) if row[6] else None,
            }

        return SyncService._finish_page(
            rows, limit, cursor_ts, cursor_id, 6, shape)

    @staticmethod
    def _memberships_since(db, user_id, cursor_ts, cursor_id, limit,
                           tenant_id):
        """The user's own membership rows joined after the cursor."""
        from sqlalchemy import text
        tenant_sql, tenant_params = _tenant_predicate(tenant_id)
        cursor_sql = _cursor_predicate('joined_at', cursor_id)
        params = SyncService._bind_params(
            user_id, cursor_ts, cursor_id, limit, tenant_params)
        rows = db.execute(text(
            "SELECT id, parent_kind, parent_id, member_id, agent_kind, "
            " role, joined_at, muted_until, notification_pref "
            "FROM memberships "
            "WHERE member_id = :uid "
            f"AND {cursor_sql} "
            f"{tenant_sql} "
            "ORDER BY joined_at ASC, id ASC LIMIT :lim"),
            params
        ).fetchall()

        def shape(row):
            return {
                'id': row[0], 'parent_kind': row[1], 'parent_id': row[2],
                'member_id': row[3], 'agent_kind': row[4],
                'role': row[5],
                'joined_at': str(row[6]) if row[6] else None,
                'muted_until': str(row[7]) if row[7] else None,
                'notification_pref': row[8],
            }

        return SyncService._finish_page(
            rows, limit, cursor_ts, cursor_id, 6, shape)

    @staticmethod
    def _notifications_since(db, user_id, cursor_ts, cursor_id, limit,
                             tenant_id):
        """Notifications addressed to the user, created after the cursor."""
        from sqlalchemy import text
        tenant_sql, tenant_params = _tenant_predicate(tenant_id)
        cursor_sql = _cursor_predicate('created_at', cursor_id)
        params = SyncService._bind_params(
            user_id, cursor_ts, cursor_id, limit, tenant_params)
        rows = db.execute(text(
            "SELECT id, type, source_user_id, target_type, target_id, "
            " message, is_read, created_at "
            "FROM notifications "
            "WHERE user_id = :uid "
            f"AND {cursor_sql} "
            f"{tenant_sql} "
            "ORDER BY created_at ASC, id ASC LIMIT :lim"),
            params
        ).fetchall()

        def shape(row):
            return {
                'id': row[0], 'type': row[1], 'source_user_id': row[2],
                'target_type': row[3], 'target_id': row[4],
                'message': row[5], 'is_read': bool(row[6]),
                'created_at': str(row[7]) if row[7] else None,
            }

        return SyncService._finish_page(
            rows, limit, cursor_ts, cursor_id, 7, shape)

627 

628 

629# ── Unified inbox view (additive, layered on top of deltas) ────────── 

630# 

631# `SyncService.inbox_rows(...)` reshapes the per-kind buckets returned 

632# by `deltas(...)` into a single chronological list of "InboxRow" dicts 

633# the unified-inbox UI consumes directly. No new SQL — calls deltas() 

634# internally and maps each per-kind row through a thin shaper. 

635# 

636# This keeps the per-kind fetchers + the existing /sync route + every 

637# existing client untouched. /sync continues to return per-kind buckets 

638# for callers that want them (multi-device replay, restore, etc.); the 

639# new /sync/inbox route exposes the flattened view for the inbox UI 

640# only. 

641# 

642# InboxRow shape (the contract every client renders directly): 

643# { 

644# 'id': str, # source-table id, kind-prefixed 

645# # for client-side dedup ('msg_', 'mnt_', ...) 

646# 'kind': str, # 'message'|'mention'|'invite' 

647# # |'friend_request'|'notification' 

648# 'parent_kind': Optional[str], # 'conversation'|'post' 

649# # |'comment'|'community'|'message' 

650# 'parent_id': Optional[str], 

651# 'sender_id': Optional[str], 

652# 'sender_kind': Optional[str], # 'human'|'agent' 

653# 'content_preview': str, # 140 char max 

654# 'is_unread': bool, 

655# 'last_activity_at': Optional[str], # ISO ts, sortable 

656# 'deep_link': str, # hevolveai://... for tap navigation 

657# } 

658# 

659# Sort: ASC by last_activity_at then id (matches /sync cursor semantics 

660# so the same `since` cursor advances cleanly). 

661 

662 

663def _row_message(r: dict) -> dict: 

664 parent_kind = r.get('parent_kind') or 'conversation' 

665 parent_id = r.get('parent_id') or '' 

666 return { 

667 'id': f"msg_{r.get('id')}", 

668 'kind': 'message', 

669 'parent_kind': parent_kind, 

670 'parent_id': parent_id, 

671 'sender_id': r.get('author_id'), 

672 'sender_kind': r.get('agent_kind') or 'human', 

673 'content_preview': (r.get('content') or '')[:140], 

674 'is_unread': not bool(r.get('is_deleted')), 

675 'last_activity_at': r.get('edited_at') or r.get('created_at'), 

676 'deep_link': f"hevolveai://{parent_kind}/{parent_id}", 

677 } 

678 

679 

680def _row_mention(r: dict) -> dict: 

681 src_kind = r.get('source_kind') or '' 

682 src_id = r.get('source_id') or '' 

683 # @-mention: tap deep-links to wherever the user was mentioned 

684 # (post / comment / message thread). Source author isn't carried 

685 # on the Mention row — the renderer fetches it lazily if it wants 

686 # to show the name; the row itself is enough for inbox ordering 

687 # + tap-to-navigate. 

688 return { 

689 'id': f"mnt_{r.get('id')}", 

690 'kind': 'mention', 

691 'parent_kind': src_kind or None, 

692 'parent_id': src_id or None, 

693 'sender_id': None, 

694 'sender_kind': None, 

695 'content_preview': f"You were mentioned in {src_kind or 'a thread'}", 

696 'is_unread': not bool(r.get('notified_at')), 

697 'last_activity_at': r.get('created_at'), 

698 'deep_link': f"hevolveai://{src_kind}/{src_id}" if src_kind and src_id 

699 else "hevolveai://", 

700 } 

701 

702 

703def _row_invite(r: dict) -> dict: 

704 pk = r.get('parent_kind') or '' 

705 pid = r.get('parent_id') or '' 

706 role = r.get('role_offered') or 'member' 

707 code = r.get('invite_code') or '' 

708 status = r.get('status') or 'pending' 

709 return { 

710 'id': f"inv_{r.get('id')}", 

711 'kind': 'invite', 

712 'parent_kind': pk or None, 

713 'parent_id': pid or None, 

714 'sender_id': r.get('invited_by'), 

715 'sender_kind': 'human', 

716 'content_preview': f"Invited as {role} ({status})", 

717 'is_unread': status == 'pending', 

718 'last_activity_at': r.get('responded_at') or r.get('created_at'), 

719 'deep_link': (f"hevolveai://i/{code}" if code 

720 else f"hevolveai://{pk}/{pid}"), 

721 } 

722 

723 

724def _row_friendship(r: dict, viewer_id: str) -> dict: 

725 status = r.get('status') or '' 

726 initiator = r.get('initiator_id') 

727 other = (r.get('user_b_id') if r.get('user_a_id') == viewer_id 

728 else r.get('user_a_id')) 

729 if status == 'pending': 

730 kind = 'friend_request' 

731 preview = ('Sent a friend request' if initiator == viewer_id 

732 else 'Wants to be friends') 

733 is_unread = initiator != viewer_id 

734 elif status == 'active': 

735 kind = 'friend_request' # surfaces in inbox as 'friend_active' 

736 preview = 'Friendship accepted' 

737 is_unread = False 

738 elif status == 'blocked': 

739 kind = 'friend_request' 

740 preview = 'Blocked' 

741 is_unread = False 

742 else: 

743 kind = 'friend_request' 

744 preview = f"Friendship {status}" 

745 is_unread = False 

746 return { 

747 'id': f"frn_{r.get('id')}", 

748 'kind': kind, 

749 'parent_kind': 'user', 

750 'parent_id': other, 

751 'sender_id': initiator, 

752 'sender_kind': 'human', 

753 'content_preview': preview, 

754 'is_unread': is_unread, 

755 'last_activity_at': (r.get('blocked_at') or r.get('accepted_at') 

756 or r.get('created_at')), 

757 'deep_link': f"hevolveai://user/{other}" if other 

758 else "hevolveai://friends", 

759 } 

760 

761 

762def _row_notification(r: dict) -> dict: 

763 target_type = r.get('target_type') or '' 

764 target_id = r.get('target_id') or '' 

765 return { 

766 'id': f"ntf_{r.get('id')}", 

767 'kind': 'notification', 

768 'parent_kind': target_type or None, 

769 'parent_id': target_id or None, 

770 'sender_id': r.get('source_user_id'), 

771 'sender_kind': None, 

772 'content_preview': (r.get('message') or '')[:140], 

773 'is_unread': not bool(r.get('is_read')), 

774 'last_activity_at': r.get('created_at'), 

775 'deep_link': (f"hevolveai://{target_type}/{target_id}" 

776 if target_type and target_id else "hevolveai://"), 

777 } 

778 

779 

780def _shape_to_inbox(deltas: Dict[str, list], viewer_id: str) -> list: 

781 """Map per-kind buckets → flat InboxRow list, sorted ASC by 

782 last_activity_at then id (matches /sync cursor sort).""" 

783 rows: list = [] 

784 for r in deltas.get('messages') or []: 

785 rows.append(_row_message(r)) 

786 for r in deltas.get('mentions') or []: 

787 rows.append(_row_mention(r)) 

788 for r in deltas.get('invites') or []: 

789 rows.append(_row_invite(r)) 

790 for r in deltas.get('friendships') or []: 

791 rows.append(_row_friendship(r, viewer_id)) 

792 for r in deltas.get('notifications') or []: 

793 rows.append(_row_notification(r)) 

794 # 'conversations', 'blocks', 'memberships' are state-tracking — not 

795 # conversational content; they advance the /sync cursor for 

796 # multi-device replay but are intentionally skipped from the inbox 

797 # surface (the messages bucket already represents the user-visible 

798 # conversation activity). 

799 rows.sort(key=lambda r: ((r.get('last_activity_at') or ''), 

800 (r.get('id') or ''))) 

801 return rows 

802 

803 

# inbox_rows() is attached to SyncService after the class definition so
# the class body itself stays untouched. Any caller doing
# `from .sync_service import SyncService` still gets the method.
def _inbox_rows(db, user_id: str,
                since: Optional[str] = None,
                limit_per_kind: int = 50,
                tenant_id: Optional[str] = None) -> Dict[str, Any]:
    """Return the flattened-inbox view of the /sync deltas.

    Delegates to `SyncService.deltas` for the inbox-producing kinds and
    reshapes the buckets with `_shape_to_inbox`; there is no separate
    SQL path.

    Args:
        db: session-like object handed through to `SyncService.deltas`.
        user_id: user whose inbox is requested (also the viewer used
            when shaping friendship rows).
        since: opaque cursor from a previous call, or None.
        limit_per_kind: per-kind row limit passed to `deltas`.
        tenant_id: optional tenant scope passed to `deltas`.

    Returns:
        `{'cursor': ..., 'has_more': bool, 'rows': [InboxRow, ...]}`.
        Pass the returned `cursor` back as `since` for the next page.
    """
    # Kinds that surface in the inbox. conversations / blocks /
    # memberships are not requested here.
    inbox_kinds = ['messages', 'mentions', 'invites', 'friendships',
                   'notifications']
    bundle = SyncService.deltas(
        db,
        user_id=user_id,
        since=since,
        kinds=inbox_kinds,
        limit_per_kind=limit_per_kind,
        tenant_id=tenant_id,
    )
    per_kind = bundle.get('deltas') or {}
    return {
        'cursor': bundle.get('cursor'),
        'has_more': bool(bundle.get('has_more')),
        'rows': _shape_to_inbox(per_kind, user_id),
    }


SyncService.inbox_rows = staticmethod(_inbox_rows)


__all__ = ['SyncService']