Coverage for integrations / social / conversation_service.py: 0.0%
243 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
HevolveSocial — Conversations (DM/group) + Messages.

Phase 7c.3. Plan reference: sunny-gliding-eich.md, Part C.2 + Part E.3.

Naming distinctions (preserved exactly so we don't collide with existing
HARTOS surfaces):
  - `conversations` table (NEW, this module) — internal DM + group chat.
  - `conversation_entries` table (legacy, untouched) — HARTOS 31-channel
    external adapter (Telegram / WhatsApp / Discord / etc.).
  - `conversation` table (legacy, untouched) — single-user Q/A history.

Two kinds of conversation:
  - 'dm': exactly 2 members, no title, idempotent dedup via member_hash.
  - 'group': 2+ members, optional title, supports add/remove members.

Conversation membership lives in the polymorphic `memberships` table
(v41) with parent_kind='conversation'. One source of truth for community
+ conversation membership rosters.

Messages flow:
  - send_message(): insert Message row, update
    conversations.last_message_at, run MentionService.parse_and_record
    on the content (which fires agent dispatch + dual-notify), then fan
    out via NotificationService to every non-author member.
  - list_messages(): paginated by `before` cursor (created_at-based).
  - edit_message(): caller's own message, within 24h, updates content +
    edited_at.
  - soft_delete_message(): caller's own; sets is_deleted=True,
    content='[deleted]'.

Block check: a sender's message to a conversation that contains a user
who blocked them is allowed (group-message); the BLOCK semantics are
about not seeing/being notified, not about being silently hidden from
groups they're already in. (See plan K.2.) For DMs: blocking severs
the DM creation path — `ConversationService.create(kind='dm')` refuses
if either party blocks the other.

Transport: NotificationService.create publishes via MessageBus
(LOCAL → SSE → PEERLINK → CROSSBAR) — same fan-out the rest of the
social stack uses. Typing is a best-effort WAMP emit that is never
persisted; a read-receipt is stored on the reader's memberships row and
then broadcast best-effort (per plan E.3).

Agent reply:
  When a message contains an @-mention to an agent, MentionService's
  parse_and_record path fires `agentic_router.dispatch_to_agent` with
  source_kind='message'. The dispatch worker's `_post_agent_reply`
  handles the 'message' kind (added in this session) and posts the
  agent's reply as a new Message row authored by the agent.
"""
from __future__ import annotations

import hashlib
import json
import logging
import uuid
from datetime import datetime, timedelta
from typing import List, Optional, Tuple
# Shared logger for the whole social stack (same name as sibling services).
logger = logging.getLogger('hevolve_social')

# Conversation kinds accepted by ConversationService.create().
_VALID_KINDS = ('dm', 'group')
# How long after creation an author may still edit a message.
_EDIT_WINDOW = timedelta(hours=24)
# Upper bound (in characters) on the content accepted by send_message().
_MAX_MESSAGE_LEN = 8000
class ConversationError(ValueError):
    """Raised when a conversation or message operation is rejected.

    Covers validation failures (empty or over-long content, invalid kind),
    authorization failures (not a member, not the author, not an admin) and
    missing rows (conversation or message not found). Subclasses
    ``ValueError`` so callers that already catch ``ValueError`` keep working.
    """
def _member_hash(member_ids: List[str]) -> str:
    """Return the SHA-256 hex digest of the sorted, comma-joined member IDs.

    Used as a stable dedup key for DMs so re-creating a DM between A and B
    returns the existing row. IDs are stringified, de-duplicated and sorted,
    so neither order nor repetition changes the digest. Falsy entries
    (``None``, ``''``) are ignored, and ``member_ids=None`` is treated as an
    empty list.
    """
    unique_ids = {str(x) for x in (member_ids or ()) if x}
    raw = ','.join(sorted(unique_ids)).encode('utf-8')
    return hashlib.sha256(raw).hexdigest()
def _is_blocked_either_way(db, x: str, y: str) -> bool:
    """Return True when ``x`` blocks ``y`` or ``y`` blocks ``x``.

    Match the same helper used by InviteService / FriendService.

    The lookup is deliberately best-effort: if the query (or the SQLAlchemy
    import) fails, the pair is treated as *not* blocked and False is
    returned. That failure is logged so a broken block check is visible
    instead of silently allowing the action.
    """
    try:
        from sqlalchemy import text
        result = db.execute(text(
            "SELECT 1 FROM blocks "
            "WHERE (blocker_id = :x AND blocked_id = :y) "
            "OR (blocker_id = :y AND blocked_id = :x) LIMIT 1"),
            {'x': x, 'y': y}
        ).fetchone()
        return result is not None
    except Exception as e:
        # Fail open (unchanged behaviour), but leave a trace for operators.
        logger.warning(
            "_is_blocked_either_way: block lookup failed "
            "(treating as not blocked): %s", e)
        return False
def _ensure_member(db, conv_id: str, member_id: str,
                   tenant_id: Optional[str], role: str = 'member') -> None:
    """Insert a conversation membership row unless one already exists.

    Writes to the polymorphic ``memberships`` table with
    ``parent_kind='conversation'`` and ``agent_kind='human'``. SQLite gets
    ``INSERT OR IGNORE``; every other dialect gets ``ON CONFLICT DO
    NOTHING``. When the session has no bind, SQLite is assumed. The caller
    is responsible for committing.
    """
    from sqlalchemy import text

    bind = db.bind
    dialect = 'sqlite' if bind is None else bind.dialect.name

    # Column list + VALUES clause shared by both dialect variants.
    columns_and_values = (
        "(id, tenant_id, parent_kind, parent_id, member_id, "
        " agent_kind, role, joined_at) "
        "VALUES (:id, :tid, 'conversation', :pid, :mid, 'human', "
        " :role, CURRENT_TIMESTAMP)")
    if dialect == 'sqlite':
        stmt = "INSERT OR IGNORE INTO memberships " + columns_and_values
    else:
        stmt = ("INSERT INTO memberships " + columns_and_values
                + " ON CONFLICT DO NOTHING")

    params = {
        'id': str(uuid.uuid4()),
        'tid': tenant_id,
        'pid': conv_id,
        'mid': member_id,
        'role': role,
    }
    db.execute(text(stmt), params)
def _list_member_ids(db, conv_id: str) -> List[str]:
    """Return the member ids of a conversation, ordered by ``joined_at``."""
    from sqlalchemy import text

    query = text(
        "SELECT member_id FROM memberships "
        "WHERE parent_kind = 'conversation' AND parent_id = :pid "
        "ORDER BY joined_at")
    member_ids = []
    for record in db.execute(query, {'pid': conv_id}).fetchall():
        member_ids.append(record[0])
    return member_ids
def _is_member(db, conv_id: str, user_id: str) -> bool:
    """Return True when ``user_id`` has a membership row for ``conv_id``."""
    from sqlalchemy import text

    lookup = text(
        "SELECT 1 FROM memberships "
        "WHERE parent_kind='conversation' AND parent_id=:pid "
        "AND member_id=:mid LIMIT 1")
    hit = db.execute(lookup, {'pid': conv_id, 'mid': user_id}).fetchone()
    if hit is None:
        return False
    return True
class ConversationService:
    """Conversation (DM / group) and message operations over raw SQL.

    All public methods are static. Each takes an explicit `db` so
    they can be called from any context without coupling to Flask `g`.
    """

    # Roles that may add or remove *other* members of a group.
    _ADMIN_ROLES = ('admin', 'owner')

    # ── Create ───────────────────────────────────────────────────

    @staticmethod
    def create(db, kind: str, member_ids: List[str], created_by: str,
               title: Optional[str] = None,
               tenant_id: Optional[str] = None) -> dict:
        """Create a DM (idempotent dedup) or group conversation.

        DM rules:
          - exactly 2 distinct members once the caller is included
          - same A↔B pair returns the existing row
          - refuses if either party blocks the other

        Group rules:
          - 2+ members (caller is auto-added if missing)
          - the caller gets role 'admin', everyone else 'member'
          - title is stored as given (may be None)

        Returns the conversation dict produced by `get`.
        Raises ConversationError on an invalid kind, a bad member count,
        or a blocked DM pair.
        """
        from sqlalchemy import text
        if kind not in _VALID_KINDS:
            raise ConversationError(f"invalid kind: {kind}")

        # Member ids are compared as strings everywhere below, so the
        # caller id is normalised once. Comparing against the raw
        # `created_by` would never match for a non-str id, which picked
        # the wrong "other" party and skipped the admin role.
        creator = str(created_by)
        member_set = {str(m) for m in (member_ids or []) if m}
        member_set.add(creator)
        members = sorted(member_set)
        if len(members) < 2:
            raise ConversationError("conversations need >= 2 members")
        if kind == 'dm' and len(members) != 2:
            raise ConversationError("dm requires exactly 2 members")

        member_hash = None
        if kind == 'dm':
            other = next(m for m in members if m != creator)
            if _is_blocked_either_way(db, creator, other):
                raise ConversationError("cannot start DM with blocked user")

            # DM dedup: SELECT existing row first.
            member_hash = _member_hash(members)
            existing = db.execute(text(
                "SELECT id FROM conversations "
                "WHERE kind = 'dm' AND member_hash = :h"),
                {'h': member_hash}
            ).fetchone()
            if existing:
                return ConversationService.get(db, existing[0])

        cid = str(uuid.uuid4())
        db.execute(text(
            "INSERT INTO conversations "
            "(id, tenant_id, kind, title, created_by, member_hash, "
            " is_locked, is_archived, created_at) "
            "VALUES (:id, :tid, :kind, :title, :cb, :mh, 0, 0, "
            " CURRENT_TIMESTAMP)"),
            {'id': cid, 'tid': tenant_id, 'kind': kind,
             'title': title, 'cb': created_by, 'mh': member_hash})

        # Insert memberships. Owner = created_by, role='admin' for groups.
        for mid in members:
            is_group_creator = (kind == 'group' and mid == creator)
            role = 'admin' if is_group_creator else 'member'
            _ensure_member(db, cid, mid, tenant_id, role=role)

        db.commit()
        return ConversationService.get(db, cid)

    # ── Read ─────────────────────────────────────────────────────

    @staticmethod
    def _serialize_conversation(db, row) -> dict:
        """Map a conversations row to the API dict.

        `row` must carry the columns in this order: id, kind, title,
        created_by, last_message_at, is_locked, is_archived, created_at.
        The member list is fetched with one extra query.
        """
        return {
            'id': row[0], 'kind': row[1], 'title': row[2],
            'created_by': row[3],
            'last_message_at': str(row[4]) if row[4] else None,
            'is_locked': bool(row[5]), 'is_archived': bool(row[6]),
            'created_at': str(row[7]) if row[7] else None,
            'members': _list_member_ids(db, row[0]),
        }

    @staticmethod
    def get(db, conv_id: str) -> Optional[dict]:
        """Return the conversation dict for `conv_id`, or None if absent."""
        from sqlalchemy import text
        row = db.execute(text(
            "SELECT id, kind, title, created_by, last_message_at, "
            " is_locked, is_archived, created_at "
            "FROM conversations WHERE id = :id"),
            {'id': conv_id}
        ).fetchone()
        if row is None:
            return None
        return ConversationService._serialize_conversation(db, row)

    @staticmethod
    def list_for_user(db, user_id: str,
                      include_archived: bool = False,
                      limit: int = 50) -> List[dict]:
        """Conversations the user is a member of, newest first.

        Ordered by last_message_at, falling back to created_at for
        conversations that have no messages yet. Archived conversations
        are skipped unless `include_archived` is True.
        """
        from sqlalchemy import text
        # Join via memberships polymorphic table.
        where = "WHERE m.parent_kind = 'conversation' AND m.member_id = :uid"
        if not include_archived:
            where += " AND c.is_archived = 0"
        sql = (
            "SELECT c.id, c.kind, c.title, c.created_by, c.last_message_at, "
            " c.is_locked, c.is_archived, c.created_at "
            "FROM conversations c "
            "JOIN memberships m ON m.parent_id = c.id "
            f"{where} "
            "ORDER BY COALESCE(c.last_message_at, c.created_at) DESC "
            "LIMIT :lim")
        rows = db.execute(
            text(sql), {'uid': user_id, 'lim': limit}).fetchall()
        return [ConversationService._serialize_conversation(db, row)
                for row in rows]

    # ── Member management (group only) ───────────────────────────

    @staticmethod
    def _member_role(db, conv_id: str, user_id: str) -> Optional[str]:
        """Return the user's role in the conversation, or None if absent."""
        from sqlalchemy import text
        row = db.execute(text(
            "SELECT role FROM memberships "
            "WHERE parent_kind='conversation' AND parent_id=:pid "
            "AND member_id=:mid"),
            {'pid': conv_id, 'mid': user_id}
        ).fetchone()
        return None if row is None else row[0]

    @staticmethod
    def add_member(db, conv_id: str, requester_id: str, new_member_id: str,
                   tenant_id: Optional[str] = None) -> dict:
        """Add a member to a group conversation. Caller must be admin.

        Raises ConversationError if the conversation is missing, is a DM,
        or the requester's role is neither 'admin' nor 'owner'. Adding an
        existing member is a no-op. Returns the updated conversation dict.
        """
        conv = ConversationService.get(db, conv_id)
        if conv is None:
            raise ConversationError("conversation not found")
        if conv['kind'] != 'group':
            raise ConversationError("cannot add members to a DM")
        # Caller must be admin (or owner).
        role = ConversationService._member_role(db, conv_id, requester_id)
        if role not in ConversationService._ADMIN_ROLES:
            raise ConversationError("only admins can add members")
        _ensure_member(db, conv_id, new_member_id, tenant_id, role='member')
        db.commit()
        return ConversationService.get(db, conv_id)

    @staticmethod
    def remove_member(db, conv_id: str, requester_id: str,
                      target_id: str) -> dict:
        """Remove a member from a group.

        An admin/owner can remove anyone; any member can remove themselves
        (leave). Raises ConversationError if the conversation is missing,
        is a DM, or a non-admin tries to remove someone else. Returns the
        updated conversation dict.
        """
        from sqlalchemy import text
        conv = ConversationService.get(db, conv_id)
        if conv is None:
            raise ConversationError("conversation not found")
        if conv['kind'] != 'group':
            raise ConversationError("cannot remove members from a DM")
        if requester_id != target_id:
            role = ConversationService._member_role(
                db, conv_id, requester_id)
            if role not in ConversationService._ADMIN_ROLES:
                raise ConversationError("only admins can remove others")
        db.execute(text(
            "DELETE FROM memberships "
            "WHERE parent_kind='conversation' AND parent_id=:pid "
            "AND member_id=:mid"),
            {'pid': conv_id, 'mid': target_id})
        db.commit()
        return ConversationService.get(db, conv_id)

    # ── Send / list / edit / delete messages ─────────────────────

    @staticmethod
    def _rollback_quietly(db, where: str) -> None:
        """Roll the session back after a swallowed failure.

        A failed statement can leave the session/transaction unusable
        until it is rolled back; without this, the follow-up queries in
        the same request would raise after the message row is already
        committed. A failing rollback is itself only logged.
        """
        try:
            db.rollback()
        except Exception as e:
            logger.warning(
                "ConversationService.%s: rollback failed: %s", where, e)

    @staticmethod
    def send_message(db, conv_id: str, author_id: str, content: str,
                     thread_root_id: Optional[str] = None,
                     metadata: Optional[dict] = None,
                     tenant_id: Optional[str] = None) -> dict:
        """Insert a Message row + bump last_message_at + run
        MentionService + fire NotificationService for non-author members.

        Phase 9.B: when `e2e_dm_pipeline.should_encrypt` reports the
        conversation as encrypted, `messages.content` is stored as
        '[encrypted]' and the plaintext is handed to
        `e2e_dm_pipeline.encrypt_for_conversation` to write the
        per-recipient envelopes. Mention parsing still runs on the
        plaintext; the notification carries a neutral placeholder
        instead of the plaintext.

        Failures in the encryption-flag check, envelope write, or mention
        parsing are logged and do not abort the send. If the flag check
        fails, the message is stored as plaintext.

        Returns the message dict the caller embeds in the API response.
        It always includes `is_encrypted` and `recipients` (the value
        returned by encrypt_for_conversation, or [] when not encrypted or
        when the envelope write failed) and echoes the plaintext
        `content` back to the sender.

        Raises ConversationError for empty or over-long content, or when
        the author is not a member of the conversation.
        """
        from sqlalchemy import text
        if not content or not content.strip():
            raise ConversationError("empty message")
        if len(content) > _MAX_MESSAGE_LEN:
            raise ConversationError(
                f"message too long (>{_MAX_MESSAGE_LEN} chars)")
        if not _is_member(db, conv_id, author_id):
            raise ConversationError("not a conversation member")

        # Phase 9.B encryption decision. Pre-computed so we know
        # whether to store '[encrypted]' or the plaintext below.
        is_encrypted = False
        try:
            from . import e2e_dm_pipeline
            is_encrypted = e2e_dm_pipeline.should_encrypt(db, conv_id)
        except Exception as e:
            logger.warning(
                "ConversationService.send_message: e2e flag check "
                "failed (falling back to plaintext): %s", e)

        stored_content = '[encrypted]' if is_encrypted else content

        mid = str(uuid.uuid4())
        meta_json = json.dumps(metadata) if metadata else None
        db.execute(text(
            "INSERT INTO messages "
            "(id, tenant_id, parent_kind, parent_id, thread_root_id, "
            " author_id, agent_kind, content, depth, is_deleted, "
            " metadata_json, created_at) "
            "VALUES (:id, :tid, 'conversation', :pid, :tr, :aid, "
            " 'human', :c, 0, 0, :meta, CURRENT_TIMESTAMP)"),
            {'id': mid, 'tid': tenant_id, 'pid': conv_id,
             'tr': thread_root_id, 'aid': author_id, 'c': stored_content,
             'meta': meta_json})
        db.execute(text(
            "UPDATE conversations SET last_message_at = CURRENT_TIMESTAMP "
            "WHERE id = :pid"),
            {'pid': conv_id})
        db.commit()

        # Phase 9.B: write per-recipient envelopes AFTER the messages
        # row commits (envelopes FK to messages.id). Failures here
        # don't roll back the message — the response surfaces
        # `recipients` so the sender can retry the encrypt path if
        # needed. Only the uncommitted envelope work is rolled back, so
        # the session stays usable for the steps below.
        delivered = []
        if is_encrypted:
            try:
                from . import e2e_dm_pipeline
                delivered = e2e_dm_pipeline.encrypt_for_conversation(
                    db, conv_id, author_id, mid, content,
                    tenant_id=tenant_id)
                db.commit()
            except Exception as e:
                logger.warning(
                    "ConversationService.send_message: e2e encrypt "
                    "failed; envelopes incomplete (recipients may not "
                    "be able to decrypt): %s", e)
                ConversationService._rollback_quietly(db, 'send_message')

        # Mention parsing — same path post/comment use, source_kind='message'
        # so MentionService records rows AND dispatches @-mentioned agents.
        mentions = []
        try:
            from .mention_service import MentionService
            mentions = MentionService.parse_and_record(
                db, source_kind='message', source_id=mid,
                content=content, author_id=author_id,
                tenant_id=tenant_id)
        except Exception as e:
            logger.warning("ConversationService.send_message: mention "
                           "parse failed: %s", e)
            ConversationService._rollback_quietly(db, 'send_message')

        # Notify other members (in-app notification per plan B.5 + R.2).
        # For encrypted DMs a neutral placeholder is sent so plaintext
        # doesn't leak via the notification fan-out.
        notify_snippet = '[encrypted message]' if is_encrypted else content
        ConversationService._notify_members(
            db, conv_id, author_id, mid, notify_snippet)

        return {
            'id': mid, 'parent_kind': 'conversation', 'parent_id': conv_id,
            'thread_root_id': thread_root_id,
            'author_id': author_id, 'agent_kind': 'human',
            'content': content, 'mentions': mentions,
            'metadata': metadata,
            'is_encrypted': is_encrypted,
            'recipients': delivered,
        }

    @staticmethod
    def _decode_metadata(raw, message_id):
        """Decode a stored metadata_json value without failing the page.

        Falsy values map to None. Values that are already a dict or list
        are returned unchanged. An undecodable value is logged and
        reported as None so one bad row cannot abort a whole listing.
        """
        if not raw:
            return None
        if isinstance(raw, (dict, list)):
            return raw
        try:
            return json.loads(raw)
        except (TypeError, ValueError) as e:
            logger.warning(
                "ConversationService.list_messages: unreadable "
                "metadata_json for message %s: %s", message_id, e)
            return None

    @staticmethod
    def list_messages(db, conv_id: str, requester_id: str,
                      before: Optional[str] = None,
                      limit: int = 50) -> List[dict]:
        """Paginated message history, newest first.

        `before` is a created_at cursor (ISO datetime string); only
        messages strictly older than it are returned. `limit` is clamped
        to 1..200. Soft-deleted messages are excluded.

        Raises ConversationError when the requester is not a member.
        """
        from sqlalchemy import text
        if not _is_member(db, conv_id, requester_id):
            raise ConversationError("not a conversation member")
        params = {'pid': conv_id, 'lim': max(1, min(int(limit), 200))}
        where = ("WHERE parent_kind='conversation' AND parent_id=:pid "
                 "AND is_deleted = 0")
        if before:
            where += " AND created_at < :before"
            params['before'] = before
        sql = (
            "SELECT id, thread_root_id, author_id, agent_kind, content, "
            " edited_at, metadata_json, created_at "
            f"FROM messages {where} "
            "ORDER BY created_at DESC LIMIT :lim")
        rows = db.execute(text(sql), params).fetchall()

        # Phase 9.B: when this conversation is e2e-encrypted, the
        # stored content is '[encrypted]'. The requester's own
        # per-recipient envelope is decrypted into the response; other
        # readers of the raw messages table still see the placeholder.
        pipeline = None
        do_decrypt = False
        try:
            from . import e2e_dm_pipeline as pipeline
            do_decrypt = pipeline.should_encrypt(db, conv_id)
        except Exception:
            do_decrypt = False

        out = []
        for row in rows:
            mid, troot, aid, akind, content, edited, meta, created = row
            if do_decrypt and content == '[encrypted]':
                try:
                    plaintext = pipeline.decrypt_for_recipient(
                        db, conv_id, mid, requester_id)
                    if plaintext is not None:
                        content = plaintext
                except Exception as e:
                    logger.warning(
                        "ConversationService.list_messages: decrypt "
                        "failed for message %s: %s (returning placeholder)",
                        mid, e)
            out.append({
                'id': mid, 'parent_id': conv_id,
                'thread_root_id': troot,
                'author_id': aid, 'agent_kind': akind,
                'content': content,
                'edited_at': str(edited) if edited else None,
                'metadata': ConversationService._decode_metadata(meta, mid),
                'created_at': str(created) if created else None,
                'is_encrypted': do_decrypt,
            })
        return out

    @staticmethod
    def _created_at_as_naive_utc(created_at) -> datetime:
        """Normalise a stored created_at value to a naive UTC datetime.

        Accepts 'YYYY-MM-DD HH:MM:SS[.ffffff]' strings (with ' ' or 'T'
        as separator) and datetime objects; a timezone-aware datetime is
        converted to naive UTC. Anything else raises ConversationError:
        the safe default for an unreadable timestamp is "no edit", not
        "edit anytime" and not an unhandled TypeError.
        """
        if isinstance(created_at, str):
            try:
                return datetime.strptime(
                    created_at.split('.')[0].replace('T', ' '),
                    '%Y-%m-%d %H:%M:%S')
            except ValueError as exc:
                raise ConversationError(
                    "cannot determine edit window from stored timestamp"
                ) from exc
        if not isinstance(created_at, datetime):
            raise ConversationError(
                "cannot determine edit window from stored timestamp")
        offset = created_at.utcoffset()
        if offset is not None:
            # Aware value: shift to UTC wall time, then drop tzinfo so it
            # can be compared with datetime.utcnow().
            return (created_at - offset).replace(tzinfo=None)
        return created_at

    @staticmethod
    def edit_message(db, message_id: str, requester_id: str,
                     new_content: str) -> dict:
        """Replace the content of the caller's own message.

        Only the author may edit, only within 24 hours of creation, and
        only while the message is not soft-deleted. Sets edited_at and
        re-runs mention parsing on the new content (failures there are
        logged and ignored).

        Returns {'id', 'content', 'edited_at'}.
        Raises ConversationError for empty content, an unknown message, a
        non-author requester, a deleted message, an unreadable stored
        timestamp, or an expired edit window.
        """
        from sqlalchemy import text
        if not new_content or not new_content.strip():
            raise ConversationError("empty message")
        row = db.execute(text(
            "SELECT author_id, created_at, parent_id, is_deleted "
            "FROM messages WHERE id = :id"),
            {'id': message_id}
        ).fetchone()
        if row is None:
            raise ConversationError("message not found")
        author_id, created_at, is_deleted = row[0], row[1], row[3]
        if author_id != requester_id:
            raise ConversationError("only author can edit")
        # Editing a soft-deleted row would restore content on a message
        # nobody can list and re-fire mention dispatch for it.
        if is_deleted:
            raise ConversationError("cannot edit a deleted message")
        # 24h edit window. An unreadable created_at refuses the edit
        # rather than silently allowing it forever.
        created_dt = ConversationService._created_at_as_naive_utc(created_at)
        if datetime.utcnow() - created_dt > _EDIT_WINDOW:
            raise ConversationError("edit window has passed")
        # NOTE(review): for an e2e-enabled conversation this writes the
        # edited plaintext into messages.content, whereas send_message
        # stores '[encrypted]' — confirm whether such edits should be
        # refused or re-encrypted.
        db.execute(text(
            "UPDATE messages SET content = :c, edited_at = CURRENT_TIMESTAMP "
            "WHERE id = :id"),
            {'c': new_content, 'id': message_id})
        db.commit()

        # Re-run mention diff so newly-added @-mentions fire and
        # removed ones get cleaned up.
        try:
            from .mention_service import MentionService
            MentionService.parse_and_record(
                db, source_kind='message', source_id=message_id,
                content=new_content, author_id=requester_id)
        except Exception as e:
            logger.warning("ConversationService.edit_message: mention "
                           "diff failed: %s", e)

        return {'id': message_id, 'content': new_content,
                'edited_at': datetime.utcnow().isoformat()}

    @staticmethod
    def soft_delete_message(db, message_id: str, requester_id: str) -> dict:
        """Soft-delete: keep row for audit, blank content. Author only.

        Sets is_deleted = 1, content = '[deleted]' and bumps
        `edited_at = CURRENT_TIMESTAMP` so the change becomes visible to
        /sync via the COALESCE(edited_at, created_at) cursor expression —
        without this, offline clients would never see that the message
        was deleted (reviewer-flagged C3).

        Returns {'id', 'is_deleted': True}.
        Raises ConversationError for an unknown message or a non-author
        requester.
        """
        from sqlalchemy import text
        row = db.execute(text(
            "SELECT author_id FROM messages WHERE id = :id"),
            {'id': message_id}
        ).fetchone()
        if row is None:
            raise ConversationError("message not found")
        if row[0] != requester_id:
            raise ConversationError("only author can delete")
        db.execute(text(
            "UPDATE messages SET is_deleted = 1, content = '[deleted]', "
            " edited_at = CURRENT_TIMESTAMP "
            "WHERE id = :id"),
            {'id': message_id})
        db.commit()
        # Re-run the mention path with empty content so the mentions
        # index is meant to drop its pointers into the deleted message
        # (reviewer N-NEW-2). Failures are logged and ignored.
        try:
            from .mention_service import MentionService
            MentionService.parse_and_record(
                db, source_kind='message', source_id=message_id,
                content='', author_id=requester_id)
        except Exception as e:
            logger.warning(
                "soft_delete_message: mention sweep failed: %s", e)
        return {'id': message_id, 'is_deleted': True}

    # ── Typing + read-receipt (Phase 7c.7) ───────────────────────

    @staticmethod
    def emit_typing(db, conv_id: str, user_id: str,
                    tenant_id: Optional[str] = None) -> dict:
        """Fire a typing event for the conversation.

        Pure broadcast — nothing is persisted. Publishes on
        tenant.{tenant_id or "_"}.conv.{conv_id}.typing via
        realtime.publish_event. The publish is best-effort: a failure is
        only logged, and {'sent': True, ...} is returned either way.

        Raises ConversationError when the user is not a member.
        """
        if not _is_member(db, conv_id, user_id):
            raise ConversationError("not a conversation member")
        # Delegate to the existing realtime publisher.
        try:
            from .realtime import publish_event
            topic = f'tenant.{tenant_id or "_"}.conv.{conv_id}.typing'
            publish_event(topic, {
                'conv_id': conv_id, 'user_id': user_id,
                'kind': 'typing'},
                user_id=user_id)
        except Exception as e:
            logger.warning("ConversationService.emit_typing: publish "
                           "failed: %s", e)
        return {'sent': True, 'conv_id': conv_id, 'user_id': user_id}

    @staticmethod
    def mark_read(db, conv_id: str, user_id: str,
                  message_id: Optional[str] = None,
                  tenant_id: Optional[str] = None) -> dict:
        """Persist + broadcast a read-receipt for the user in the conv.

        Stores last_read_message_id + last_read_at on the user's
        memberships row. If message_id is omitted, the conversation's
        most recent non-deleted message is used (mark-all-read).

        The receipt is then published on
        tenant.{tenant_id or "_"}.conv.{conv_id}.read; the publish is
        best-effort (a failure is only logged).

        Pass-1 M3 contract — the returned dict has stable keys:
          - sent (bool): True once the receipt was persisted. False is
            a NO-OP success (no error), used when there is nothing to
            mark (empty conversation).
          - conv_id (str): always echoed back.
          - last_read_message_id (str | None): the message id marked,
            or None on no-op.
          - reason (str, optional): present iff sent=False, names the
            no-op cause for caller logging.
        Errors (not-a-member, message-not-in-conv) RAISE
        ConversationError; no-ops return the dict.
        """
        from sqlalchemy import text
        if not _is_member(db, conv_id, user_id):
            raise ConversationError("not a conversation member")

        # Default: pick the latest message if message_id not given.
        if not message_id:
            row = db.execute(text(
                "SELECT id FROM messages "
                "WHERE parent_kind='conversation' AND parent_id=:cid "
                "AND is_deleted = 0 "
                "ORDER BY created_at DESC LIMIT 1"),
                {'cid': conv_id}
            ).fetchone()
            if row is None:
                # Empty conversation — nothing to mark. Return the
                # canonical no-op shape so callers don't have to
                # special-case the response.
                return {
                    'sent': False,
                    'conv_id': conv_id,
                    'last_read_message_id': None,
                    'reason': 'empty conversation',
                }
            message_id = row[0]

        # Validate the message belongs to this conversation (avoid
        # cross-conv reference attacks).
        check = db.execute(text(
            "SELECT 1 FROM messages "
            "WHERE id = :mid AND parent_kind='conversation' "
            "AND parent_id = :cid LIMIT 1"),
            {'mid': message_id, 'cid': conv_id}
        ).fetchone()
        if check is None:
            raise ConversationError("message not in conversation")

        db.execute(text(
            "UPDATE memberships SET last_read_message_id = :mid, "
            " last_read_at = CURRENT_TIMESTAMP "
            "WHERE parent_kind='conversation' AND parent_id=:cid "
            "AND member_id=:uid"),
            {'mid': message_id, 'cid': conv_id, 'uid': user_id})
        db.commit()

        try:
            from .realtime import publish_event
            topic = f'tenant.{tenant_id or "_"}.conv.{conv_id}.read'
            publish_event(topic, {
                'conv_id': conv_id, 'user_id': user_id,
                'last_read_message_id': message_id,
                'kind': 'read'},
                user_id=user_id)
        except Exception as e:
            logger.warning("ConversationService.mark_read: publish "
                           "failed: %s", e)

        return {'sent': True, 'conv_id': conv_id,
                'last_read_message_id': message_id}

    # ── Internal helpers ─────────────────────────────────────────

    @staticmethod
    def _notify_members(db, conv_id: str, author_id: str,
                        message_id: str, content: str) -> None:
        """Fire one notification per non-author member.

        Each notification goes through NotificationService.create with
        the content truncated to 140 characters. A failure for one member
        is logged and does not stop the others. If NotificationService
        cannot be imported, the fan-out is skipped and a warning is
        logged. `message_id` is accepted for signature stability and is
        not used.
        """
        try:
            from .services import NotificationService
        except Exception as e:
            logger.warning(
                "ConversationService: NotificationService unavailable, "
                "skipping notifications: %s", e)
            return
        snippet = (content or '')[:140]
        # Membership ids are stored as strings; compare as strings so an
        # author passed as a non-str id is still skipped.
        author_key = str(author_id)
        for member_id in _list_member_ids(db, conv_id):
            if str(member_id) == author_key:
                continue
            try:
                NotificationService.create(
                    db, user_id=member_id, type='message',
                    source_user_id=author_id,
                    target_type='conversation', target_id=conv_id,
                    message=snippet)
            except Exception as e:
                logger.warning("ConversationService: notify failed: %s", e)