Coverage for integrations / social / sync_service.py: 0.0%
277 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial — Sync deltas + cursor (multi-device backfill / restore).
4Phase 7c.6. Plan reference: sunny-gliding-eich.md, Part R.3.
6GET /api/social/sync?since=<cursor>&kinds=<csv> returns the user's
7deltas across every social kind they own or can see, since the
8cursor. Used for:
10 - New-device login (cold pull): since=0 → full backfill.
11 - Reconnect after offline period: since=<last_seen_cursor>.
12 - WAMP push tells device "something changed", device pulls the delta.
13 - Background catch-up timer when peer-to-peer is silent.
15Cursor design (single-server flat/regional/central):
16 Cursor = max(timestamp) seen so far. Server stamps every social
17 write with `created_at` / `edited_at` (existing columns), so
18 ordering within a single server is monotonic.
20 For multi-server (Phase 8 cloud + 9 hardening) the cursor will
21 upgrade to a hybrid logical clock (HLC) tuple
22 (physical_ms, logical_counter, node_id)
23 to handle clock skew across nodes. This module is the
24 layering point — callers always pass an opaque string cursor;
25 the service decides how to interpret it.
27Per-kind queries respect existing privacy gates:
28 - conversations: only ones the user is a member of (memberships table)
29 - messages: only in conversations the user is a member of
30 - posts / comments: only in communities the user can see (delegate
31 to existing visibility checks)
32 - friendships / blocks / invites: only ones involving the user
33 - mentions: only ones targeting the user
34 - memberships: only the user's own rows
36Every delta carries the kind + payload + a `cursor` field so the
37client can advance its high-water mark deterministically.
39Transport: this is request-response only. The client polls (or is
40nudged by a WAMP push and then polls). Fan-out from each write to
41subscribed clients still happens through the existing MessageBus
42(LOCAL → SSE → PEERLINK → CROSSBAR) — sync is the catch-up path,
43not the live path.
45Zero-loss invariant:
46 Every social write that affects user U is tagged with a server
47 timestamp >= cursor. The sync query returns every row strictly
48 greater than the cursor. Therefore the union of (live fan-out
49 while online) + (sync deltas while offline) covers every state
50 change exactly once at the storage layer. Client-side dedup by
51 primary key removes any double-delivery from overlap.
52"""
54from __future__ import annotations
56import logging
57from datetime import datetime
58from typing import Any, Dict, List, Optional, Tuple
60logger = logging.getLogger('hevolve_social')
63# Default cursor when the client is making its first sync call.
64# 1970 epoch ensures every row qualifies for the initial backfill.
65_EPOCH_CURSOR = '1970-01-01 00:00:00'
68# Whitelisted kinds the sync endpoint can return. Each kind maps to a
69# private fetcher below. Caller-passed `kinds` is intersected with
70# this set so unknown kinds are silently dropped.
71_VALID_KINDS = (
72 'conversations', 'messages',
73 'friendships', 'blocks',
74 'invites',
75 'mentions',
76 'memberships',
77 'notifications',
78)
81def _parse_cursor(cursor: Optional[str]) -> Tuple[str, str]:
82 """Decode `<timestamp>|<id>` into (timestamp, id).
84 Both halves protect against the SQLite second-resolution problem:
85 multiple rows can share a `created_at` timestamp, and a cursor
86 that only stored the timestamp would either lose those siblings
87 (with `>`) or re-emit them forever (with `>=`). Encoding the
88 last-seen primary key as a tiebreaker fixes both — see fetcher
89 queries below for the compound `WHERE` shape.
91 Bad input → epoch (`'1970-01-01 00:00:00', ''`) so the next
92 sync is a full backfill. Never raises.
93 """
94 if not cursor or cursor in ('0', '', 'null'):
95 return _EPOCH_CURSOR, ''
96 raw = str(cursor)
97 # Split on '|'; missing delimiter means a legacy timestamp-only
98 # cursor — treat the id half as empty (still correct, just slightly
99 # less precise on tie-row continuation).
100 if '|' in raw:
101 ts_part, id_part = raw.split('|', 1)
102 else:
103 ts_part, id_part = raw, ''
104 ts = ts_part.replace('T', ' ').split('.')[0]
105 if not ts:
106 return _EPOCH_CURSOR, ''
107 # Pass-1 N1 fix: removed the dead `len(ts) == 10` date-only
108 # branch. Encoded cursors always include a time component
109 # because they come from `str(datetime)` which produces
110 # `YYYY-MM-DD HH:MM:SS`. Any caller passing a bare date string
111 # is upgrading to epoch via the ValueError path below, which is
112 # the same fallback the date-only branch produced. Simpler.
113 try:
114 datetime.strptime(ts, '%Y-%m-%d %H:%M:%S')
115 return ts, id_part
116 except (ValueError, TypeError):
117 return _EPOCH_CURSOR, ''
120def _normalize_cursor(cursor: Optional[str]) -> str:
121 """Backward-compat shim returning just the timestamp portion.
123 Existing callers + tests that compare cursor as a timestamp string
124 keep working; new fetchers pull the (ts, id) pair via _parse_cursor.
125 """
126 ts, _ = _parse_cursor(cursor)
127 return ts
130def _encode_cursor(ts: str, row_id: str) -> str:
131 """Encode the compound cursor for client return + next-call use."""
132 return f'{ts}|{row_id}' if row_id else ts
135def _cursor_predicate(activity_expr: str, cursor_id: str,
136 ts_param: str = 'ts',
137 id_param: str = 'id',
138 table_id_col: str = 'id') -> str:
139 """Build the strictly-greater-than cursor WHERE fragment.
141 Two shapes:
142 - cursor_id == '' → `activity > :ts` (strict, no id branch)
143 - cursor_id != '' → `(activity > :ts OR (activity = :ts AND id > :id))`
145 Reviewer H-NEW-1: when the prior cursor was a bare timestamp
146 (encoded as `<ts>` because no id-bearing rows shipped on the
147 previous call), the second branch with `:id = ''` matched every
148 row at the boundary timestamp because every non-empty id is
149 string-greater than ''. That re-emitted boundary rows on every
150 sync forever. Splitting the branch closes the hole.
151 """
152 if not cursor_id:
153 return f"({activity_expr} > :{ts_param})"
154 return (
155 f"({activity_expr} > :{ts_param} "
156 f"OR ({activity_expr} = :{ts_param} "
157 f"AND {table_id_col} > :{id_param}))"
158 )
161def _tenant_predicate(tenant_id: Optional[str],
162 alias: str = '') -> Tuple[str, Dict[str, Any]]:
163 """Return (`AND ...` SQL fragment, params dict) for tenant filtering.
165 When `tenant_id` is None (flat/regional pass-through) this is the
166 empty string — the WHERE clause is unchanged. When set, the
167 fragment enforces (col == tid OR col IS NULL) so legacy untenanted
168 rows remain visible to the tenant for backward compatibility, same
169 semantics the ORM-level `tenant_filter.py` listener uses.
171 `alias` lets callers prefix the column for joined queries
172 (e.g. `'msg'` → `msg.tenant_id`). Empty string → bare `tenant_id`.
174 The `:tid` placeholder is reserved; callers must avoid colliding
175 with their own `tid` parameter. This is checked in tests.
177 Pass-1 M6 note — f-string SQL safety:
178 The f-string interpolation in the returned fragment is intentional
179 and safe. `alias` is a static string fragment chosen by the
180 caller from a closed set ('msg', 'c', 'm', etc.) — never user
181 input. All user-controlled values (`tenant_id`) flow through the
182 `:tid` bind parameter which SQLAlchemy escapes. Future callers
183 adding new aliases must keep this invariant: no user data in
184 the `alias` argument.
186 Pass-5 F10 note — raw-SQL DRY:
187 This helper is the canonical tenant-scope predicate for raw
188 SQL queries that bypass the ORM tenant_filter listener. Every
189 raw-`text()` query in sync_service / e2e_key_service / call_service
190 uses it. When adding a new raw-SQL service, import this helper
191 rather than rolling your own — keeps the loose-mode/strict-mode
192 contract in one place (Phase 8 strict mode is enforced by the
193 ORM listener, not by this helper, so raw-SQL paths run in
194 effectively-loose mode regardless of strict-mode setting; that
195 tradeoff is documented at tenant_filter.py module docstring).
196 """
197 if not tenant_id:
198 return '', {}
199 prefix = f'{alias}.' if alias else ''
200 return (
201 f' AND ({prefix}tenant_id = :tid OR {prefix}tenant_id IS NULL)',
202 {'tid': tenant_id})
205def _max_cursor(*candidates: Optional[str]) -> str:
206 """Pick the largest non-null cursor by tuple-comparing (ts, id).
208 String compare is INCORRECT here: '|' (0x7C) sorts after every
209 digit (0x30-0x39), so a cursor `T0|<uuid>` would compare GREATER
210 than a strictly later cursor `T1` (no id) — causing the returned
211 cursor to advance to the wrong value and the next sync to either
212 re-emit shipped rows or skip future ones.
214 Reviewer-flagged bug, regression test in test_phase7c6_sync.py:
215 test_max_cursor_tuple_compares_correctly.
216 """
217 parsed = [_parse_cursor(c) for c in candidates if c]
218 if not parsed:
219 return _EPOCH_CURSOR
220 best_ts, best_id = max(parsed) # tuple compare: ts first, id tiebreaker
221 return _encode_cursor(best_ts, best_id)
224class SyncService:
225 """All public methods are static. Each takes an explicit `db` so
226 they can be called from any context without coupling to Flask `g`.
227 """
229 @staticmethod
230 def deltas(db, user_id: str,
231 since: Optional[str] = None,
232 kinds: Optional[List[str]] = None,
233 limit_per_kind: int = 200,
234 tenant_id: Optional[str] = None) -> Dict[str, Any]:
235 """Return a delta dict + a new cursor.
237 Cursor format: '<timestamp>|<row_id>' (opaque to client).
238 Both halves are required for correctness on dialects that
239 store sub-second-precision timestamps as second-resolution
240 (SQLite default) — without an id tiebreaker, sibling rows
241 sharing a timestamp would either be re-delivered forever
242 (`>=`) or silently lost (`>`).
243 """
244 cursor_ts, cursor_id = _parse_cursor(since)
245 if kinds:
246 wanted = set(k for k in kinds if k in _VALID_KINDS)
247 else:
248 wanted = set(_VALID_KINDS)
250 deltas: Dict[str, List[dict]] = {}
251 max_seen = _encode_cursor(cursor_ts, cursor_id)
252 any_capped = False
254 fetchers = {
255 'conversations': SyncService._conversations_since,
256 'messages': SyncService._messages_since,
257 'friendships': SyncService._friendships_since,
258 'blocks': SyncService._blocks_since,
259 'invites': SyncService._invites_since,
260 'mentions': SyncService._mentions_since,
261 'memberships': SyncService._memberships_since,
262 'notifications': SyncService._notifications_since,
263 }
264 for kind in wanted:
265 try:
266 # tenant_id is passed to every fetcher so the raw SQL
267 # WHERE clause can enforce isolation. The ORM-level
268 # tenant_filter listener does NOT fire on raw text()
269 # queries, so we have to do it explicitly here —
270 # critical privacy gate, reviewer-flagged.
271 rows, kind_max, capped = fetchers[kind](
272 db, user_id, cursor_ts, cursor_id, limit_per_kind,
273 tenant_id)
274 except Exception as e:
275 logger.warning(
276 "SyncService: %s fetch failed: %s", kind, e)
277 rows, kind_max, capped = [], max_seen, False
278 deltas[kind] = rows
279 max_seen = _max_cursor(max_seen, kind_max)
280 if capped:
281 any_capped = True
283 return {
284 'cursor': max_seen,
285 'has_more': any_capped,
286 'deltas': deltas,
287 }
289 # ── Per-kind fetchers ────────────────────────────────────────
290 #
291 # Contract: each fetcher receives (db, user_id, cursor, limit) and
292 # returns (rows, max_cursor_seen_in_this_kind, hit_limit_bool).
293 # rows is a list of plain dicts (JSON-serializable). The `cursor`
294 # passed in is the lower bound (strictly greater); rows are ordered
295 # by their relevant timestamp ascending so that paginating with the
296 # returned max_cursor yields a strict progression on the next call.
298 # Compound-cursor SQL pattern used by every fetcher below:
299 #
300 # WHERE activity_ts > :ts
301 # OR (activity_ts = :ts AND row_id > :id)
302 #
303 # The activity_ts is a per-table COALESCE expression that captures
304 # the row's "last meaningful change" (create + edit + accept +
305 # block + delete folded into one column). The id tiebreaker
306 # handles SQLite's second-resolution timestamps without losing
307 # rows. Each fetcher returns the encoded compound cursor of the
308 # last row it shipped so the next call resumes correctly.
310 @staticmethod
311 def _conversations_since(db, user_id, cursor_ts, cursor_id, limit,
312 tenant_id):
313 from sqlalchemy import text
314 tenant_sql, tenant_params = _tenant_predicate(tenant_id, alias='c')
315 cursor_sql = _cursor_predicate(
316 'COALESCE(c.last_message_at, c.created_at)',
317 cursor_id, table_id_col='c.id')
318 params = {'uid': user_id, 'ts': cursor_ts,
319 'lim': limit + 1}
320 if cursor_id:
321 params['id'] = cursor_id
322 params.update(tenant_params)
323 rows = db.execute(text(
324 "SELECT c.id, c.kind, c.title, c.created_by, "
325 " c.last_message_at, c.is_locked, c.is_archived, "
326 " c.created_at, "
327 " COALESCE(c.last_message_at, c.created_at) AS act_ts "
328 "FROM conversations c "
329 "JOIN memberships m ON m.parent_id = c.id "
330 "WHERE m.parent_kind = 'conversation' AND m.member_id = :uid "
331 f"AND {cursor_sql} "
332 f"{tenant_sql} "
333 "ORDER BY COALESCE(c.last_message_at, c.created_at) ASC, "
334 " c.id ASC "
335 "LIMIT :lim"),
336 params
337 ).fetchall()
338 capped = len(rows) > limit
339 rows = rows[:limit]
340 out = []
341 max_seen = _encode_cursor(cursor_ts, cursor_id)
342 for row in rows:
343 ts = str(row[8] or '')
344 max_seen = _max_cursor(max_seen, _encode_cursor(ts, row[0]))
345 out.append({
346 'id': row[0], 'kind': row[1], 'title': row[2],
347 'created_by': row[3],
348 'last_message_at': str(row[4]) if row[4] else None,
349 'is_locked': bool(row[5]), 'is_archived': bool(row[6]),
350 'created_at': str(row[7]) if row[7] else None,
351 })
352 return out, max_seen, capped
354 @staticmethod
355 def _messages_since(db, user_id, cursor_ts, cursor_id, limit,
356 tenant_id):
357 from sqlalchemy import text
358 tenant_sql, tenant_params = _tenant_predicate(tenant_id, alias='msg')
359 cursor_sql = _cursor_predicate(
360 'COALESCE(msg.edited_at, msg.created_at)',
361 cursor_id, table_id_col='msg.id')
362 params = {'uid': user_id, 'ts': cursor_ts,
363 'lim': limit + 1}
364 if cursor_id:
365 params['id'] = cursor_id
366 params.update(tenant_params)
367 rows = db.execute(text(
368 "SELECT msg.id, msg.parent_kind, msg.parent_id, "
369 " msg.thread_root_id, msg.author_id, msg.agent_kind, "
370 " msg.content, msg.depth, msg.edited_at, "
371 " msg.is_deleted, msg.metadata_json, msg.created_at, "
372 " COALESCE(msg.edited_at, msg.created_at) AS act_ts "
373 "FROM messages msg "
374 "JOIN memberships m ON m.parent_id = msg.parent_id "
375 "WHERE m.parent_kind = 'conversation' AND m.member_id = :uid "
376 "AND msg.parent_kind = 'conversation' "
377 f"AND {cursor_sql} "
378 f"{tenant_sql} "
379 "ORDER BY COALESCE(msg.edited_at, msg.created_at) ASC, "
380 " msg.id ASC "
381 "LIMIT :lim"),
382 params
383 ).fetchall()
384 capped = len(rows) > limit
385 rows = rows[:limit]
386 out = []
387 max_seen = _encode_cursor(cursor_ts, cursor_id)
388 for row in rows:
389 ts = str(row[12] or '')
390 max_seen = _max_cursor(max_seen, _encode_cursor(ts, row[0]))
391 out.append({
392 'id': row[0], 'parent_kind': row[1], 'parent_id': row[2],
393 'thread_root_id': row[3], 'author_id': row[4],
394 'agent_kind': row[5], 'content': row[6],
395 'depth': row[7],
396 'edited_at': str(row[8]) if row[8] else None,
397 'is_deleted': bool(row[9]),
398 'metadata_json': row[10],
399 'created_at': str(row[11]) if row[11] else None,
400 })
401 return out, max_seen, capped
403 @staticmethod
404 def _friendships_since(db, user_id, cursor_ts, cursor_id, limit,
405 tenant_id):
406 from sqlalchemy import text
407 tenant_sql, tenant_params = _tenant_predicate(tenant_id)
408 cursor_sql = _cursor_predicate(
409 'COALESCE(blocked_at, accepted_at, created_at)', cursor_id)
410 params = {'uid': user_id, 'ts': cursor_ts, 'lim': limit + 1}
411 if cursor_id:
412 params['id'] = cursor_id
413 params.update(tenant_params)
414 rows = db.execute(text(
415 "SELECT id, user_a_id, user_b_id, status, initiator_id, "
416 " created_at, accepted_at, blocked_at, "
417 " COALESCE(blocked_at, accepted_at, created_at) AS act_ts "
418 "FROM friendships "
419 "WHERE (user_a_id = :uid OR user_b_id = :uid) "
420 f"AND {cursor_sql} "
421 f"{tenant_sql} "
422 "ORDER BY COALESCE(blocked_at, accepted_at, created_at) ASC, "
423 " id ASC LIMIT :lim"),
424 params
425 ).fetchall()
426 capped = len(rows) > limit
427 rows = rows[:limit]
428 out = []
429 max_seen = _encode_cursor(cursor_ts, cursor_id)
430 for row in rows:
431 ts = str(row[8] or '')
432 max_seen = _max_cursor(max_seen, _encode_cursor(ts, row[0]))
433 out.append({
434 'id': row[0], 'user_a_id': row[1], 'user_b_id': row[2],
435 'status': row[3], 'initiator_id': row[4],
436 'created_at': str(row[5]) if row[5] else None,
437 'accepted_at': str(row[6]) if row[6] else None,
438 'blocked_at': str(row[7]) if row[7] else None,
439 })
440 return out, max_seen, capped
442 @staticmethod
443 def _blocks_since(db, user_id, cursor_ts, cursor_id, limit, tenant_id):
444 from sqlalchemy import text
445 tenant_sql, tenant_params = _tenant_predicate(tenant_id)
446 cursor_sql = _cursor_predicate('created_at', cursor_id)
447 params = {'uid': user_id, 'ts': cursor_ts, 'lim': limit + 1}
448 if cursor_id:
449 params['id'] = cursor_id
450 params.update(tenant_params)
451 rows = db.execute(text(
452 "SELECT id, blocker_id, blocked_id, reason, created_at "
453 "FROM blocks "
454 "WHERE blocker_id = :uid "
455 f"AND {cursor_sql} "
456 f"{tenant_sql} "
457 "ORDER BY created_at ASC, id ASC LIMIT :lim"),
458 params
459 ).fetchall()
460 capped = len(rows) > limit
461 rows = rows[:limit]
462 out = []
463 max_seen = _encode_cursor(cursor_ts, cursor_id)
464 for row in rows:
465 ts = str(row[4] or '')
466 max_seen = _max_cursor(max_seen, _encode_cursor(ts, row[0]))
467 out.append({
468 'id': row[0], 'blocker_id': row[1], 'blocked_id': row[2],
469 'reason': row[3],
470 'created_at': str(row[4]) if row[4] else None,
471 })
472 return out, max_seen, capped
474 @staticmethod
475 def _invites_since(db, user_id, cursor_ts, cursor_id, limit, tenant_id):
476 from sqlalchemy import text
477 from .models import User
478 u = db.query(User).filter(User.id == user_id).first()
479 email_clause = ""
480 params = {'uid': user_id, 'ts': cursor_ts, 'lim': limit + 1}
481 if cursor_id:
482 params['id'] = cursor_id
483 if u and u.email:
484 email_clause = " OR LOWER(invitee_email) = :em"
485 params['em'] = (u.email or '').lower()
486 tenant_sql, tenant_params = _tenant_predicate(tenant_id)
487 cursor_sql = _cursor_predicate(
488 'COALESCE(responded_at, created_at)', cursor_id)
489 params.update(tenant_params)
490 rows = db.execute(text(
491 f"SELECT id, parent_kind, parent_id, invitee_id, "
492 f" invitee_email, invite_code, invited_by, "
493 f" role_offered, status, created_at, "
494 f" expires_at, responded_at, "
495 f" COALESCE(responded_at, created_at) AS act_ts "
496 f"FROM invites "
497 f"WHERE ( invitee_id = :uid OR invited_by = :uid {email_clause} ) "
498 f"AND {cursor_sql} "
499 f"{tenant_sql} "
500 f"ORDER BY COALESCE(responded_at, created_at) ASC, "
501 f" id ASC LIMIT :lim"),
502 params
503 ).fetchall()
504 capped = len(rows) > limit
505 rows = rows[:limit]
506 out = []
507 max_seen = _encode_cursor(cursor_ts, cursor_id)
508 for row in rows:
509 ts = str(row[12] or '')
510 max_seen = _max_cursor(max_seen, _encode_cursor(ts, row[0]))
511 out.append({
512 'id': row[0], 'parent_kind': row[1], 'parent_id': row[2],
513 'invitee_id': row[3], 'invitee_email': row[4],
514 'invite_code': row[5], 'invited_by': row[6],
515 'role_offered': row[7], 'status': row[8],
516 'created_at': str(row[9]) if row[9] else None,
517 'expires_at': str(row[10]) if row[10] else None,
518 'responded_at': str(row[11]) if row[11] else None,
519 })
520 return out, max_seen, capped
522 @staticmethod
523 def _mentions_since(db, user_id, cursor_ts, cursor_id, limit, tenant_id):
524 from sqlalchemy import text
525 tenant_sql, tenant_params = _tenant_predicate(tenant_id)
526 cursor_sql = _cursor_predicate('created_at', cursor_id)
527 params = {'uid': user_id, 'ts': cursor_ts, 'lim': limit + 1}
528 if cursor_id:
529 params['id'] = cursor_id
530 params.update(tenant_params)
531 rows = db.execute(text(
532 "SELECT id, source_kind, source_id, mentioned_user_id, "
533 " mentioned_kind, agent_owner_id, created_at "
534 "FROM mentions "
535 "WHERE (mentioned_user_id = :uid OR agent_owner_id = :uid) "
536 f"AND {cursor_sql} "
537 f"{tenant_sql} "
538 "ORDER BY created_at ASC, id ASC LIMIT :lim"),
539 params
540 ).fetchall()
541 capped = len(rows) > limit
542 rows = rows[:limit]
543 out = []
544 max_seen = _encode_cursor(cursor_ts, cursor_id)
545 for row in rows:
546 ts = str(row[6] or '')
547 max_seen = _max_cursor(max_seen, _encode_cursor(ts, row[0]))
548 out.append({
549 'id': row[0], 'source_kind': row[1], 'source_id': row[2],
550 'mentioned_user_id': row[3], 'mentioned_kind': row[4],
551 'agent_owner_id': row[5],
552 'created_at': str(row[6]) if row[6] else None,
553 })
554 return out, max_seen, capped
556 @staticmethod
557 def _memberships_since(db, user_id, cursor_ts, cursor_id, limit,
558 tenant_id):
559 from sqlalchemy import text
560 tenant_sql, tenant_params = _tenant_predicate(tenant_id)
561 cursor_sql = _cursor_predicate('joined_at', cursor_id)
562 params = {'uid': user_id, 'ts': cursor_ts, 'lim': limit + 1}
563 if cursor_id:
564 params['id'] = cursor_id
565 params.update(tenant_params)
566 rows = db.execute(text(
567 "SELECT id, parent_kind, parent_id, member_id, agent_kind, "
568 " role, joined_at, muted_until, notification_pref "
569 "FROM memberships "
570 "WHERE member_id = :uid "
571 f"AND {cursor_sql} "
572 f"{tenant_sql} "
573 "ORDER BY joined_at ASC, id ASC LIMIT :lim"),
574 params
575 ).fetchall()
576 capped = len(rows) > limit
577 rows = rows[:limit]
578 out = []
579 max_seen = _encode_cursor(cursor_ts, cursor_id)
580 for row in rows:
581 ts = str(row[6] or '')
582 max_seen = _max_cursor(max_seen, _encode_cursor(ts, row[0]))
583 out.append({
584 'id': row[0], 'parent_kind': row[1], 'parent_id': row[2],
585 'member_id': row[3], 'agent_kind': row[4],
586 'role': row[5],
587 'joined_at': str(row[6]) if row[6] else None,
588 'muted_until': str(row[7]) if row[7] else None,
589 'notification_pref': row[8],
590 })
591 return out, max_seen, capped
593 @staticmethod
594 def _notifications_since(db, user_id, cursor_ts, cursor_id, limit,
595 tenant_id):
596 from sqlalchemy import text
597 tenant_sql, tenant_params = _tenant_predicate(tenant_id)
598 cursor_sql = _cursor_predicate('created_at', cursor_id)
599 params = {'uid': user_id, 'ts': cursor_ts, 'lim': limit + 1}
600 if cursor_id:
601 params['id'] = cursor_id
602 params.update(tenant_params)
603 rows = db.execute(text(
604 "SELECT id, type, source_user_id, target_type, target_id, "
605 " message, is_read, created_at "
606 "FROM notifications "
607 "WHERE user_id = :uid "
608 f"AND {cursor_sql} "
609 f"{tenant_sql} "
610 "ORDER BY created_at ASC, id ASC LIMIT :lim"),
611 params
612 ).fetchall()
613 capped = len(rows) > limit
614 rows = rows[:limit]
615 out = []
616 max_seen = _encode_cursor(cursor_ts, cursor_id)
617 for row in rows:
618 ts = str(row[7] or '')
619 max_seen = _max_cursor(max_seen, _encode_cursor(ts, row[0]))
620 out.append({
621 'id': row[0], 'type': row[1], 'source_user_id': row[2],
622 'target_type': row[3], 'target_id': row[4],
623 'message': row[5], 'is_read': bool(row[6]),
624 'created_at': str(row[7]) if row[7] else None,
625 })
626 return out, max_seen, capped
629# ── Unified inbox view (additive, layered on top of deltas) ──────────
630#
631# `SyncService.inbox_rows(...)` reshapes the per-kind buckets returned
632# by `deltas(...)` into a single chronological list of "InboxRow" dicts
633# the unified-inbox UI consumes directly. No new SQL — calls deltas()
634# internally and maps each per-kind row through a thin shaper.
635#
636# This keeps the per-kind fetchers + the existing /sync route + every
637# existing client untouched. /sync continues to return per-kind buckets
638# for callers that want them (multi-device replay, restore, etc.); the
639# new /sync/inbox route exposes the flattened view for the inbox UI
640# only.
641#
642# InboxRow shape (the contract every client renders directly):
643# {
644# 'id': str, # source-table id, kind-prefixed
645# # for client-side dedup ('msg_', 'mnt_', ...)
646# 'kind': str, # 'message'|'mention'|'invite'
647# # |'friend_request'|'notification'
648# 'parent_kind': Optional[str], # 'conversation'|'post'
649# # |'comment'|'community'|'message'
650# 'parent_id': Optional[str],
651# 'sender_id': Optional[str],
652# 'sender_kind': Optional[str], # 'human'|'agent'
653# 'content_preview': str, # 140 char max
654# 'is_unread': bool,
655# 'last_activity_at': Optional[str], # ISO ts, sortable
656# 'deep_link': str, # hevolveai://... for tap navigation
657# }
658#
659# Sort: ASC by last_activity_at then id (matches /sync cursor semantics
660# so the same `since` cursor advances cleanly).
663def _row_message(r: dict) -> dict:
664 parent_kind = r.get('parent_kind') or 'conversation'
665 parent_id = r.get('parent_id') or ''
666 return {
667 'id': f"msg_{r.get('id')}",
668 'kind': 'message',
669 'parent_kind': parent_kind,
670 'parent_id': parent_id,
671 'sender_id': r.get('author_id'),
672 'sender_kind': r.get('agent_kind') or 'human',
673 'content_preview': (r.get('content') or '')[:140],
674 'is_unread': not bool(r.get('is_deleted')),
675 'last_activity_at': r.get('edited_at') or r.get('created_at'),
676 'deep_link': f"hevolveai://{parent_kind}/{parent_id}",
677 }
680def _row_mention(r: dict) -> dict:
681 src_kind = r.get('source_kind') or ''
682 src_id = r.get('source_id') or ''
683 # @-mention: tap deep-links to wherever the user was mentioned
684 # (post / comment / message thread). Source author isn't carried
685 # on the Mention row — the renderer fetches it lazily if it wants
686 # to show the name; the row itself is enough for inbox ordering
687 # + tap-to-navigate.
688 return {
689 'id': f"mnt_{r.get('id')}",
690 'kind': 'mention',
691 'parent_kind': src_kind or None,
692 'parent_id': src_id or None,
693 'sender_id': None,
694 'sender_kind': None,
695 'content_preview': f"You were mentioned in {src_kind or 'a thread'}",
696 'is_unread': not bool(r.get('notified_at')),
697 'last_activity_at': r.get('created_at'),
698 'deep_link': f"hevolveai://{src_kind}/{src_id}" if src_kind and src_id
699 else "hevolveai://",
700 }
703def _row_invite(r: dict) -> dict:
704 pk = r.get('parent_kind') or ''
705 pid = r.get('parent_id') or ''
706 role = r.get('role_offered') or 'member'
707 code = r.get('invite_code') or ''
708 status = r.get('status') or 'pending'
709 return {
710 'id': f"inv_{r.get('id')}",
711 'kind': 'invite',
712 'parent_kind': pk or None,
713 'parent_id': pid or None,
714 'sender_id': r.get('invited_by'),
715 'sender_kind': 'human',
716 'content_preview': f"Invited as {role} ({status})",
717 'is_unread': status == 'pending',
718 'last_activity_at': r.get('responded_at') or r.get('created_at'),
719 'deep_link': (f"hevolveai://i/{code}" if code
720 else f"hevolveai://{pk}/{pid}"),
721 }
724def _row_friendship(r: dict, viewer_id: str) -> dict:
725 status = r.get('status') or ''
726 initiator = r.get('initiator_id')
727 other = (r.get('user_b_id') if r.get('user_a_id') == viewer_id
728 else r.get('user_a_id'))
729 if status == 'pending':
730 kind = 'friend_request'
731 preview = ('Sent a friend request' if initiator == viewer_id
732 else 'Wants to be friends')
733 is_unread = initiator != viewer_id
734 elif status == 'active':
735 kind = 'friend_request' # surfaces in inbox as 'friend_active'
736 preview = 'Friendship accepted'
737 is_unread = False
738 elif status == 'blocked':
739 kind = 'friend_request'
740 preview = 'Blocked'
741 is_unread = False
742 else:
743 kind = 'friend_request'
744 preview = f"Friendship {status}"
745 is_unread = False
746 return {
747 'id': f"frn_{r.get('id')}",
748 'kind': kind,
749 'parent_kind': 'user',
750 'parent_id': other,
751 'sender_id': initiator,
752 'sender_kind': 'human',
753 'content_preview': preview,
754 'is_unread': is_unread,
755 'last_activity_at': (r.get('blocked_at') or r.get('accepted_at')
756 or r.get('created_at')),
757 'deep_link': f"hevolveai://user/{other}" if other
758 else "hevolveai://friends",
759 }
762def _row_notification(r: dict) -> dict:
763 target_type = r.get('target_type') or ''
764 target_id = r.get('target_id') or ''
765 return {
766 'id': f"ntf_{r.get('id')}",
767 'kind': 'notification',
768 'parent_kind': target_type or None,
769 'parent_id': target_id or None,
770 'sender_id': r.get('source_user_id'),
771 'sender_kind': None,
772 'content_preview': (r.get('message') or '')[:140],
773 'is_unread': not bool(r.get('is_read')),
774 'last_activity_at': r.get('created_at'),
775 'deep_link': (f"hevolveai://{target_type}/{target_id}"
776 if target_type and target_id else "hevolveai://"),
777 }
780def _shape_to_inbox(deltas: Dict[str, list], viewer_id: str) -> list:
781 """Map per-kind buckets → flat InboxRow list, sorted ASC by
782 last_activity_at then id (matches /sync cursor sort)."""
783 rows: list = []
784 for r in deltas.get('messages') or []:
785 rows.append(_row_message(r))
786 for r in deltas.get('mentions') or []:
787 rows.append(_row_mention(r))
788 for r in deltas.get('invites') or []:
789 rows.append(_row_invite(r))
790 for r in deltas.get('friendships') or []:
791 rows.append(_row_friendship(r, viewer_id))
792 for r in deltas.get('notifications') or []:
793 rows.append(_row_notification(r))
794 # 'conversations', 'blocks', 'memberships' are state-tracking — not
795 # conversational content; they advance the /sync cursor for
796 # multi-device replay but are intentionally skipped from the inbox
797 # surface (the messages bucket already represents the user-visible
798 # conversation activity).
799 rows.sort(key=lambda r: ((r.get('last_activity_at') or ''),
800 (r.get('id') or '')))
801 return rows
804# Patch SyncService with the additional inbox_rows() method. Done as a
805# post-class assignment so the existing class body stays untouched —
806# any external caller doing `from .sync_service import SyncService`
807# picks up the new method automatically.
808def _inbox_rows(db, user_id: str,
809 since: Optional[str] = None,
810 limit_per_kind: int = 50,
811 tenant_id: Optional[str] = None) -> Dict[str, Any]:
812 """Flattened-inbox view of /sync deltas. Calls SyncService.deltas
813 internally — no parallel SQL path. Returns the same {cursor,
814 has_more, rows} shape every client renders directly.
816 Pagination semantics are inherited from /sync: pass back the
817 returned ``cursor`` as ``since`` to fetch the next page; the
818 underlying delta cursor is opaque to the client.
819 """
820 bundle = SyncService.deltas(
821 db, user_id=user_id, since=since,
822 # Subset of kinds that produce inbox rows. conversations /
823 # blocks / memberships keep advancing /sync but don't
824 # surface here.
825 kinds=['messages', 'mentions', 'invites', 'friendships',
826 'notifications'],
827 limit_per_kind=limit_per_kind,
828 tenant_id=tenant_id,
829 )
830 return {
831 'cursor': bundle.get('cursor'),
832 'has_more': bool(bundle.get('has_more')),
833 'rows': _shape_to_inbox(bundle.get('deltas') or {}, user_id),
834 }
837SyncService.inbox_rows = staticmethod(_inbox_rows)
840__all__ = ['SyncService']