Coverage for integrations / social / conversation_service.py: 0.0%

243 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial — Conversations (DM/group) + Messages. 

3 

4Phase 7c.3. Plan reference: sunny-gliding-eich.md, Part C.2 + Part E.3. 

5 

6Naming distinctions (preserved exactly so we don't collide with existing 

7HARTOS surfaces): 

8 - `conversations` table (NEW, this module) — internal DM + group chat. 

9 - `conversation_entries` table (legacy, untouched) — HARTOS 31-channel 

10 external adapter (Telegram / WhatsApp / Discord / etc.). 

11 - `conversation` table (legacy, untouched) — single-user Q/A history. 

12 

13Two kinds of conversation: 

14 - 'dm': exactly 2 members, no title, idempotent dedup via member_hash. 

15 - 'group': 2+ members, has a title, supports add/remove members. 

16 

17Conversation membership lives in the polymorphic `memberships` table 

18(v41) with parent_kind='conversation'. One source of truth for community 

19+ conversation membership rosters. 

20 

21Messages flow: 

22 - send(): insert Message row, run MentionService.parse_and_record on 

23 the content (which fires agent dispatch + dual-notify), update 

24 conversations.last_message_at, fan out via NotificationService for 

25 offline members. 

26 - list(): paginated by `before` cursor (created_at-based). 

27 - edit(): caller's own message, within 24h, updates content + 

28 edited_at. 

29 - soft_delete(): caller's own; sets is_deleted=True, content='[deleted]'. 

30 

31Block check: a sender's message to a conversation that contains a user 

32who blocked them is allowed (group-message); the BLOCK semantics are 

33about not seeing/being notified, not about being silently hidden from 

34groups they're already in. (See plan K.2.) For DMs: blocking severs 

35the DM creation path — `create_dm` refuses if either party blocks the 

36other. 

37 

38Transport: NotificationService.create publishes via MessageBus 

39(LOCAL → SSE → PEERLINK → CROSSBAR) — same fan-out the rest of the 

40social stack uses. Typing + read-receipt are best-effort WAMP emits; 

41the DB never persists them (per plan E.3). 

42 

43Agent reply: 

44 When a message contains an @-mention to an agent, MentionService's 

45 parse_and_record path fires `agentic_router.dispatch_to_agent` with 

46 source_kind='message'. The dispatch worker's `_post_agent_reply` 

47 handles the 'message' kind (added in this session) and posts the 

48 agent's reply as a new Message row authored by the agent. 

49""" 

50 

51from __future__ import annotations 

52 

53import hashlib 

54import json 

55import logging 

56import uuid 

57from datetime import datetime, timedelta 

58from typing import List, Optional, Tuple 

59 

# Module logger; records are emitted under the 'hevolve_social' logger name.
logger = logging.getLogger('hevolve_social')

# Conversation kinds accepted by ConversationService.create.
_VALID_KINDS = ('dm', 'group')
# How long after creation an author may still edit their own message.
_EDIT_WINDOW = timedelta(hours=24)
# Upper bound on message length, in characters, enforced by send_message.
_MAX_MESSAGE_LEN = 8000

65 

66 

class ConversationError(ValueError):
    """Raised when a conversation or message operation is rejected.

    Subclasses ValueError, so callers that already catch ValueError
    also catch this.
    """

69 

70 

def _member_hash(member_ids: List[str]) -> str:
    """Return a stable SHA-256 hex digest identifying a set of members.

    Falsy entries are dropped, every remaining id is stringified, and
    duplicates collapse, so the digest depends only on the membership
    set and never on argument order. DM creation uses it as the dedup
    key so re-creating a DM between A and B finds the existing row.
    """
    unique_ids = {str(member) for member in member_ids if member}
    joined = ','.join(sorted(unique_ids))
    digest = hashlib.sha256()
    digest.update(joined.encode('utf-8'))
    return digest.hexdigest()

80 

81 

def _is_blocked_either_way(db, x: str, y: str) -> bool:
    """Return True when either user has blocked the other.

    Runs one lookup against the `blocks` table that covers both
    directions (x blocked y, or y blocked x). Mirrors the helper used by
    InviteService / FriendService.

    The check is best-effort: any failure (query error, SQLAlchemy not
    importable, ...) yields False so callers keep working. Because that
    means the check fails OPEN, the failure is logged instead of being
    discarded silently.
    """
    try:
        from sqlalchemy import text
        hit = db.execute(
            text(
                "SELECT 1 FROM blocks "
                "WHERE (blocker_id = :x AND blocked_id = :y) "
                "OR (blocker_id = :y AND blocked_id = :x) LIMIT 1"),
            {'x': x, 'y': y},
        ).fetchone()
    except Exception as e:
        logger.warning(
            "_is_blocked_either_way: block lookup failed "
            "(treating as not blocked): %s", e)
        return False
    return hit is not None

95 

96 

def _ensure_member(db, conv_id: str, member_id: str,
                   tenant_id: Optional[str], role: str = 'member') -> None:
    """Insert a conversation membership row unless one already exists.

    The row goes into the polymorphic `memberships` table with
    parent_kind='conversation' and agent_kind='human'. SQLite gets
    `INSERT OR IGNORE`; every other dialect gets `ON CONFLICT DO NOTHING`.
    When the session has no bind, SQLite syntax is assumed. The caller is
    responsible for committing.
    """
    from sqlalchemy import text

    # Column list + VALUES clause shared by both dialect variants.
    columns_and_values = (
        "(id, tenant_id, parent_kind, parent_id, member_id, "
        " agent_kind, role, joined_at) "
        "VALUES (:id, :tid, 'conversation', :pid, :mid, 'human', "
        " :role, CURRENT_TIMESTAMP)")

    bind = db.bind
    dialect_name = 'sqlite' if bind is None else bind.dialect.name
    if dialect_name == 'sqlite':
        sql = "INSERT OR IGNORE INTO memberships " + columns_and_values
    else:
        sql = ("INSERT INTO memberships " + columns_and_values
               + " ON CONFLICT DO NOTHING")

    params = {
        'id': str(uuid.uuid4()),
        'tid': tenant_id,
        'pid': conv_id,
        'mid': member_id,
        'role': role,
    }
    db.execute(text(sql), params)

118 

119 

def _list_member_ids(db, conv_id: str) -> List[str]:
    """Return the member ids of a conversation, ordered by join time."""
    from sqlalchemy import text
    query = text(
        "SELECT member_id FROM memberships "
        "WHERE parent_kind = 'conversation' AND parent_id = :pid "
        "ORDER BY joined_at")
    result = db.execute(query, {'pid': conv_id})
    member_ids = []
    for record in result.fetchall():
        member_ids.append(record[0])
    return member_ids

129 

130 

def _is_member(db, conv_id: str, user_id: str) -> bool:
    """Tell whether `user_id` has a membership row for the conversation."""
    from sqlalchemy import text
    lookup = text(
        "SELECT 1 FROM memberships "
        "WHERE parent_kind='conversation' AND parent_id=:pid "
        "AND member_id=:mid LIMIT 1")
    match = db.execute(lookup, {'pid': conv_id, 'mid': user_id}).fetchone()
    if match is None:
        return False
    return True

140 

141 

142class ConversationService: 

143 """All public methods are static. Each takes an explicit `db` so 

144 they can be called from any context without coupling to Flask `g`. 

145 """ 

146 

147 # ── Create ─────────────────────────────────────────────────── 

148 

149 @staticmethod 

150 def create(db, kind: str, member_ids: List[str], created_by: str, 

151 title: Optional[str] = None, 

152 tenant_id: Optional[str] = None) -> dict: 

153 """Create a DM (idempotent dedup) or group conversation. 

154 

155 DM rules: 

156 - exactly 2 members (caller's id must be in member_ids) 

157 - same A↔B pair returns the existing row 

158 - refuses if either party blocks the other 

159 

160 Group rules: 

161 - 2+ members (caller is auto-added if missing) 

162 - title can be empty; defaults to comma-joined display names 

163 """ 

164 from sqlalchemy import text 

165 if kind not in _VALID_KINDS: 

166 raise ConversationError(f"invalid kind: {kind}") 

167 

168 # Normalize member set, ensure caller is in it. 

169 member_set = set(str(m) for m in (member_ids or []) if m) 

170 member_set.add(str(created_by)) 

171 members = sorted(member_set) 

172 if len(members) < 2: 

173 raise ConversationError("conversations need >= 2 members") 

174 if kind == 'dm' and len(members) != 2: 

175 raise ConversationError("dm requires exactly 2 members") 

176 if kind == 'dm': 

177 other = next(m for m in members if m != created_by) 

178 if _is_blocked_either_way(db, created_by, other): 

179 raise ConversationError("cannot start DM with blocked user") 

180 

181 member_hash = _member_hash(members) if kind == 'dm' else None 

182 

183 # DM dedup: SELECT existing row first. 

184 if kind == 'dm': 

185 existing = db.execute(text( 

186 "SELECT id FROM conversations " 

187 "WHERE kind = 'dm' AND member_hash = :h"), 

188 {'h': member_hash} 

189 ).fetchone() 

190 if existing: 

191 return ConversationService.get(db, existing[0]) 

192 

193 cid = str(uuid.uuid4()) 

194 db.execute(text( 

195 "INSERT INTO conversations " 

196 "(id, tenant_id, kind, title, created_by, member_hash, " 

197 " is_locked, is_archived, created_at) " 

198 "VALUES (:id, :tid, :kind, :title, :cb, :mh, 0, 0, " 

199 " CURRENT_TIMESTAMP)"), 

200 {'id': cid, 'tid': tenant_id, 'kind': kind, 

201 'title': title, 'cb': created_by, 'mh': member_hash}) 

202 

203 # Insert memberships. Owner = created_by, role='admin' for groups. 

204 for mid in members: 

205 role = ('admin' if (kind == 'group' and mid == created_by) 

206 else 'member') 

207 _ensure_member(db, cid, mid, tenant_id, role=role) 

208 

209 db.commit() 

210 return ConversationService.get(db, cid) 

211 

212 # ── Read ───────────────────────────────────────────────────── 

213 

214 @staticmethod 

215 def get(db, conv_id: str) -> Optional[dict]: 

216 from sqlalchemy import text 

217 row = db.execute(text( 

218 "SELECT id, kind, title, created_by, last_message_at, " 

219 " is_locked, is_archived, created_at " 

220 "FROM conversations WHERE id = :id"), 

221 {'id': conv_id} 

222 ).fetchone() 

223 if row is None: 

224 return None 

225 return { 

226 'id': row[0], 'kind': row[1], 'title': row[2], 

227 'created_by': row[3], 

228 'last_message_at': str(row[4]) if row[4] else None, 

229 'is_locked': bool(row[5]), 'is_archived': bool(row[6]), 

230 'created_at': str(row[7]) if row[7] else None, 

231 'members': _list_member_ids(db, row[0]), 

232 } 

233 

234 @staticmethod 

235 def list_for_user(db, user_id: str, 

236 include_archived: bool = False, 

237 limit: int = 50) -> List[dict]: 

238 """Conversations the user is a member of, newest first.""" 

239 from sqlalchemy import text 

240 # Join via memberships polymorphic table. 

241 where = ("WHERE m.parent_kind = 'conversation' AND m.member_id = :uid") 

242 if not include_archived: 

243 where += " AND c.is_archived = 0" 

244 rows = db.execute(text( 

245 f"SELECT c.id, c.kind, c.title, c.created_by, c.last_message_at, " 

246 f" c.is_locked, c.is_archived, c.created_at " 

247 f"FROM conversations c " 

248 f"JOIN memberships m ON m.parent_id = c.id " 

249 f"{where} " 

250 f"ORDER BY COALESCE(c.last_message_at, c.created_at) DESC " 

251 f"LIMIT :lim"), 

252 {'uid': user_id, 'lim': limit} 

253 ).fetchall() 

254 out = [] 

255 for row in rows: 

256 out.append({ 

257 'id': row[0], 'kind': row[1], 'title': row[2], 

258 'created_by': row[3], 

259 'last_message_at': str(row[4]) if row[4] else None, 

260 'is_locked': bool(row[5]), 'is_archived': bool(row[6]), 

261 'created_at': str(row[7]) if row[7] else None, 

262 'members': _list_member_ids(db, row[0]), 

263 }) 

264 return out 

265 

266 # ── Member management (group only) ─────────────────────────── 

267 

268 @staticmethod 

269 def add_member(db, conv_id: str, requester_id: str, new_member_id: str, 

270 tenant_id: Optional[str] = None) -> dict: 

271 """Add a member to a group conversation. Caller must be admin.""" 

272 from sqlalchemy import text 

273 conv = ConversationService.get(db, conv_id) 

274 if conv is None: 

275 raise ConversationError("conversation not found") 

276 if conv['kind'] != 'group': 

277 raise ConversationError("cannot add members to a DM") 

278 # Caller must be admin (or owner). 

279 role_row = db.execute(text( 

280 "SELECT role FROM memberships " 

281 "WHERE parent_kind='conversation' AND parent_id=:pid " 

282 "AND member_id=:mid"), 

283 {'pid': conv_id, 'mid': requester_id} 

284 ).fetchone() 

285 if role_row is None or role_row[0] not in ('admin', 'owner'): 

286 raise ConversationError("only admins can add members") 

287 _ensure_member(db, conv_id, new_member_id, tenant_id, role='member') 

288 db.commit() 

289 return ConversationService.get(db, conv_id) 

290 

291 @staticmethod 

292 def remove_member(db, conv_id: str, requester_id: str, 

293 target_id: str) -> dict: 

294 """Remove a member from a group. Admin can remove anyone; member 

295 can remove themselves (leave).""" 

296 from sqlalchemy import text 

297 conv = ConversationService.get(db, conv_id) 

298 if conv is None: 

299 raise ConversationError("conversation not found") 

300 if conv['kind'] != 'group': 

301 raise ConversationError("cannot remove members from a DM") 

302 if requester_id != target_id: 

303 role_row = db.execute(text( 

304 "SELECT role FROM memberships " 

305 "WHERE parent_kind='conversation' AND parent_id=:pid " 

306 "AND member_id=:mid"), 

307 {'pid': conv_id, 'mid': requester_id} 

308 ).fetchone() 

309 if role_row is None or role_row[0] not in ('admin', 'owner'): 

310 raise ConversationError("only admins can remove others") 

311 db.execute(text( 

312 "DELETE FROM memberships " 

313 "WHERE parent_kind='conversation' AND parent_id=:pid " 

314 "AND member_id=:mid"), 

315 {'pid': conv_id, 'mid': target_id}) 

316 db.commit() 

317 return ConversationService.get(db, conv_id) 

318 

319 # ── Send / list / edit / delete messages ───────────────────── 

320 

    @staticmethod
    def send_message(db, conv_id: str, author_id: str, content: str,
                     thread_root_id: Optional[str] = None,
                     metadata: Optional[dict] = None,
                     tenant_id: Optional[str] = None) -> dict:
        """Store a message, record mentions, and notify the other members.

        Steps, in order:
          1. Validate: non-blank content, at most `_MAX_MESSAGE_LEN`
             characters, author is a conversation member.
          2. Ask `e2e_dm_pipeline.should_encrypt` whether the message is
             encrypted (Phase 9.B). If that check fails, the message is
             sent as plaintext.
          3. Insert the Message row and bump
             `conversations.last_message_at`, then commit. For encrypted
             messages the stored content is the literal '[encrypted]'
             so readers of the raw table never see null or plaintext.
          4. For encrypted messages, write the per-recipient envelopes
             via `e2e_dm_pipeline.encrypt_for_conversation` and commit.
          5. Run `MentionService.parse_and_record` on the PLAINTEXT.
          6. Notify every non-author member. Encrypted messages notify
             with the placeholder '[encrypted message]' instead of the
             plaintext; plaintext messages pass the content through
             (`_notify_members` truncates it to 140 characters).

        Failures in steps 4 and 5 are logged and do not undo the already
        committed message.

        Returns:
            The message dict embedded in the API response. `content` is
            always the caller's plaintext; `is_encrypted` reports the
            step-2 decision and `recipients` is whatever
            `encrypt_for_conversation` returned (empty list when the
            message is not encrypted or encryption failed).

        Raises:
            ConversationError: blank content, content too long, or the
                author is not a member of the conversation.
        """
        from sqlalchemy import text
        if not content or not content.strip():
            raise ConversationError("empty message")
        if len(content) > _MAX_MESSAGE_LEN:
            raise ConversationError(
                f"message too long (>{_MAX_MESSAGE_LEN} chars)")
        if not _is_member(db, conv_id, author_id):
            raise ConversationError("not a conversation member")

        # Phase 9.B encryption decision. Pre-computed so we know
        # whether to store '[encrypted]' or the plaintext below.
        is_encrypted = False
        try:
            from . import e2e_dm_pipeline
            is_encrypted = e2e_dm_pipeline.should_encrypt(db, conv_id)
        except Exception as e:
            logger.warning(
                "ConversationService.send_message: e2e flag check "
                "failed (falling back to plaintext): %s", e)

        stored_content = '[encrypted]' if is_encrypted else content

        mid = str(uuid.uuid4())
        # Falsy metadata (None or an empty dict) is stored as NULL.
        meta_json = json.dumps(metadata) if metadata else None
        db.execute(text(
            "INSERT INTO messages "
            "(id, tenant_id, parent_kind, parent_id, thread_root_id, "
            " author_id, agent_kind, content, depth, is_deleted, "
            " metadata_json, created_at) "
            "VALUES (:id, :tid, 'conversation', :pid, :tr, :aid, "
            " 'human', :c, 0, 0, :meta, CURRENT_TIMESTAMP)"),
            {'id': mid, 'tid': tenant_id, 'pid': conv_id,
             'tr': thread_root_id, 'aid': author_id, 'c': stored_content,
             'meta': meta_json})
        db.execute(text(
            "UPDATE conversations SET last_message_at = CURRENT_TIMESTAMP "
            "WHERE id = :pid"),
            {'pid': conv_id})
        db.commit()

        # Phase 9.B: write per-recipient envelopes AFTER the messages
        # row commits (envelopes FK to messages.id). Failures here
        # don't roll back the message — the caller's send_message
        # response surfaces `recipients` so the sender can retry the
        # encrypt path if needed. This matches the existing pattern
        # where mention parsing failures don't abort the send.
        delivered = []
        if is_encrypted:
            try:
                from . import e2e_dm_pipeline
                delivered = e2e_dm_pipeline.encrypt_for_conversation(
                    db, conv_id, author_id, mid, content,
                    tenant_id=tenant_id)
                db.commit()
            except Exception as e:
                logger.warning(
                    "ConversationService.send_message: e2e encrypt "
                    "failed; envelopes incomplete (recipients may not "
                    "be able to decrypt): %s", e)

        # Mention parsing — same path post/comment use, source_kind='message'
        # so MentionService records rows AND dispatches @-mentioned agents.
        # Note this receives the plaintext even for encrypted messages.
        mentions = []
        try:
            from .mention_service import MentionService
            mentions = MentionService.parse_and_record(
                db, source_kind='message', source_id=mid,
                content=content, author_id=author_id,
                tenant_id=tenant_id)
        except Exception as e:
            logger.warning("ConversationService.send_message: mention "
                           "parse failed: %s", e)

        # Notify other members (in-app notification per plan B.5 + R.2).
        # For encrypted DMs we send a neutral placeholder so plaintext
        # doesn't leak via the offline-recipient notification fan-out;
        # otherwise the content is passed on and truncated to 140 chars
        # inside _notify_members.
        notify_snippet = ('[encrypted message]' if is_encrypted
                          else content)
        ConversationService._notify_members(
            db, conv_id, author_id, mid, notify_snippet)

        return {
            'id': mid, 'parent_kind': 'conversation', 'parent_id': conv_id,
            'thread_root_id': thread_root_id,
            'author_id': author_id, 'agent_kind': 'human',
            'content': content, 'mentions': mentions,
            'metadata': metadata,
            'is_encrypted': is_encrypted,
            'recipients': delivered,
        }

436 

437 @staticmethod 

438 def list_messages(db, conv_id: str, requester_id: str, 

439 before: Optional[str] = None, 

440 limit: int = 50) -> List[dict]: 

441 """Paginated message history. `before` is a created_at cursor 

442 (ISO datetime string) — return messages strictly older than 

443 that. Default order: newest first. 

444 """ 

445 from sqlalchemy import text 

446 if not _is_member(db, conv_id, requester_id): 

447 raise ConversationError("not a conversation member") 

448 params = {'pid': conv_id, 'lim': max(1, min(int(limit), 200))} 

449 where = ("WHERE parent_kind='conversation' AND parent_id=:pid " 

450 "AND is_deleted = 0") 

451 if before: 

452 where += " AND created_at < :before" 

453 params['before'] = before 

454 rows = db.execute(text( 

455 f"SELECT id, thread_root_id, author_id, agent_kind, content, " 

456 f" edited_at, metadata_json, created_at " 

457 f"FROM messages {where} " 

458 f"ORDER BY created_at DESC LIMIT :lim"), 

459 params 

460 ).fetchall() 

461 

462 # Phase 9.B: when this conversation is e2e-encrypted, the 

463 # stored content is '[encrypted]' for every message. We 

464 # decrypt the requester's per-recipient envelope into the 

465 # response so each user sees their own plaintext. Other 

466 # callers (admin tooling, sync workers) that fetch via the 

467 # raw messages table still see the placeholder, preserving 

468 # the property that the server CAN'T read encrypted DMs 

469 # outside the requester's own message-list call. 

470 try: 

471 from . import e2e_dm_pipeline 

472 do_decrypt = e2e_dm_pipeline.should_encrypt(db, conv_id) 

473 except Exception: 

474 do_decrypt = False 

475 

476 out = [] 

477 for row in rows: 

478 mid, troot, aid, akind, content, edited, meta, created = row 

479 if do_decrypt and content == '[encrypted]': 

480 try: 

481 from . import e2e_dm_pipeline 

482 plaintext = e2e_dm_pipeline.decrypt_for_recipient( 

483 db, conv_id, mid, requester_id) 

484 if plaintext is not None: 

485 content = plaintext 

486 except Exception as e: 

487 logger.warning( 

488 "ConversationService.list_messages: decrypt " 

489 "failed for message %s: %s (returning placeholder)", 

490 mid, e) 

491 out.append({ 

492 'id': mid, 'parent_id': conv_id, 

493 'thread_root_id': troot, 

494 'author_id': aid, 'agent_kind': akind, 

495 'content': content, 

496 'edited_at': str(edited) if edited else None, 

497 'metadata': json.loads(meta) if meta else None, 

498 'created_at': str(created) if created else None, 

499 'is_encrypted': do_decrypt, 

500 }) 

501 return out 

502 

503 @staticmethod 

504 def edit_message(db, message_id: str, requester_id: str, 

505 new_content: str) -> dict: 

506 from sqlalchemy import text 

507 if not new_content or not new_content.strip(): 

508 raise ConversationError("empty message") 

509 row = db.execute(text( 

510 "SELECT author_id, created_at, parent_id " 

511 "FROM messages WHERE id = :id"), 

512 {'id': message_id} 

513 ).fetchone() 

514 if row is None: 

515 raise ConversationError("message not found") 

516 author_id, created_at, conv_id = row 

517 if author_id != requester_id: 

518 raise ConversationError("only author can edit") 

519 # 24h edit window. If the stored created_at is unparseable 

520 # (corrupted column or schema drift), refuse the edit rather 

521 # than silently allow forever — N4, reviewer-flagged. The 

522 # safe default is "no edit" not "edit anytime". 

523 if isinstance(created_at, str): 

524 try: 

525 created_dt = datetime.strptime( 

526 created_at.split('.')[0].replace('T', ' '), 

527 '%Y-%m-%d %H:%M:%S') 

528 except Exception: 

529 raise ConversationError( 

530 "cannot determine edit window from stored timestamp") 

531 else: 

532 created_dt = created_at 

533 if datetime.utcnow() - created_dt > _EDIT_WINDOW: 

534 raise ConversationError("edit window has passed") 

535 db.execute(text( 

536 "UPDATE messages SET content = :c, edited_at = CURRENT_TIMESTAMP " 

537 "WHERE id = :id"), 

538 {'c': new_content, 'id': message_id}) 

539 db.commit() 

540 

541 # Re-run mention diff so newly-added @-mentions fire and 

542 # removed ones get cleaned up. 

543 try: 

544 from .mention_service import MentionService 

545 MentionService.parse_and_record( 

546 db, source_kind='message', source_id=message_id, 

547 content=new_content, author_id=requester_id) 

548 except Exception as e: 

549 logger.warning("ConversationService.edit_message: mention " 

550 "diff failed: %s", e) 

551 

552 return {'id': message_id, 'content': new_content, 

553 'edited_at': datetime.utcnow().isoformat()} 

554 

555 @staticmethod 

556 def soft_delete_message(db, message_id: str, requester_id: str) -> dict: 

557 """Soft-delete: keep row for audit, blank content. Author only. 

558 

559 Bumps `edited_at = CURRENT_TIMESTAMP` so the change becomes 

560 visible to /sync via the COALESCE(edited_at, created_at) 

561 cursor expression — without this, offline clients would never 

562 see that the message was deleted (reviewer-flagged C3). 

563 """ 

564 from sqlalchemy import text 

565 row = db.execute(text( 

566 "SELECT author_id FROM messages WHERE id = :id"), 

567 {'id': message_id} 

568 ).fetchone() 

569 if row is None: 

570 raise ConversationError("message not found") 

571 if row[0] != requester_id: 

572 raise ConversationError("only author can delete") 

573 db.execute(text( 

574 "UPDATE messages SET is_deleted = 1, content = '[deleted]', " 

575 " edited_at = CURRENT_TIMESTAMP " 

576 "WHERE id = :id"), 

577 {'id': message_id}) 

578 db.commit() 

579 # Sweep out stale Mention rows so the mentions index doesn't 

580 # carry pointers into deleted content (reviewer N-NEW-2). We 

581 # invoke the same diff path edit_message uses, with empty 

582 # content — _wipe is the optimization for that case. 

583 try: 

584 from .mention_service import MentionService 

585 MentionService.parse_and_record( 

586 db, source_kind='message', source_id=message_id, 

587 content='', author_id=requester_id) 

588 except Exception as e: 

589 logger.warning( 

590 "soft_delete_message: mention sweep failed: %s", e) 

591 return {'id': message_id, 'is_deleted': True} 

592 

593 # ── Typing + read-receipt (Phase 7c.7) ─────────────────────── 

594 

595 @staticmethod 

596 def emit_typing(db, conv_id: str, user_id: str, 

597 tenant_id: Optional[str] = None) -> dict: 

598 """Fire a WAMP typing event for the conversation. 

599 

600 Pure broadcast — never persisted (typing is ephemeral state with 

601 a 5s TTL on the receive side). Refuses if the user isn't a 

602 conversation member. Returns {'sent': True} on success so the 

603 endpoint can confirm; receivers see it through the WAMP 

604 subscription. 

605 """ 

606 if not _is_member(db, conv_id, user_id): 

607 raise ConversationError("not a conversation member") 

608 # Delegate to the existing realtime publisher — same module 

609 # that NotificationService uses, same MessageBus fan-out. 

610 try: 

611 from .realtime import publish_event 

612 topic = f'tenant.{tenant_id or "_"}.conv.{conv_id}.typing' 

613 publish_event(topic, { 

614 'conv_id': conv_id, 'user_id': user_id, 

615 'kind': 'typing'}, 

616 user_id=user_id) 

617 except Exception as e: 

618 logger.warning("ConversationService.emit_typing: publish " 

619 "failed: %s", e) 

620 return {'sent': True, 'conv_id': conv_id, 'user_id': user_id} 

621 

622 @staticmethod 

623 def mark_read(db, conv_id: str, user_id: str, 

624 message_id: Optional[str] = None, 

625 tenant_id: Optional[str] = None) -> dict: 

626 """Persist + broadcast a read-receipt for the user in the conv. 

627 

628 Stores last_read_message_id + last_read_at on the user's 

629 memberships row. If message_id is omitted we use the 

630 conversation's most recent message (mark-all-read). 

631 

632 Other members of the conversation get the receipt via a WAMP 

633 broadcast on tenant.{tid}.conv.{id}.read. 

634 

635 Pass-1 M3 contract: 

636 Returns a dict with stable keys: 

637 - sent (bool): True iff a read-receipt was persisted + 

638 broadcast. False is a NO-OP success (no error), used 

639 when there's nothing to mark (empty conversation). 

640 - conv_id (str): always echoed back. 

641 - last_read_message_id (str | None): the message id we 

642 marked, or None on no-op. 

643 - reason (str, optional): present iff sent=False, names 

644 the no-op cause for caller logging. 

645 Errors (not-a-member, message-not-in-conv) RAISE 

646 ConversationError; no-ops return the dict. Two distinct 

647 shapes for two distinct concerns. 

648 """ 

649 from sqlalchemy import text 

650 if not _is_member(db, conv_id, user_id): 

651 raise ConversationError("not a conversation member") 

652 

653 # Default: pick the latest message if message_id not given. 

654 if not message_id: 

655 row = db.execute(text( 

656 "SELECT id FROM messages " 

657 "WHERE parent_kind='conversation' AND parent_id=:cid " 

658 "AND is_deleted = 0 " 

659 "ORDER BY created_at DESC LIMIT 1"), 

660 {'cid': conv_id} 

661 ).fetchone() 

662 if row is None: 

663 # Empty conversation — nothing to mark. Return the 

664 # canonical no-op shape (Pass-1 M3 fix: include 

665 # conv_id and last_read_message_id keys so callers 

666 # don't have to special-case the response). 

667 return { 

668 'sent': False, 

669 'conv_id': conv_id, 

670 'last_read_message_id': None, 

671 'reason': 'empty conversation', 

672 } 

673 message_id = row[0] 

674 

675 # Validate the message belongs to this conversation (avoid 

676 # cross-conv reference attacks). 

677 check = db.execute(text( 

678 "SELECT 1 FROM messages " 

679 "WHERE id = :mid AND parent_kind='conversation' " 

680 "AND parent_id = :cid LIMIT 1"), 

681 {'mid': message_id, 'cid': conv_id} 

682 ).fetchone() 

683 if check is None: 

684 raise ConversationError("message not in conversation") 

685 

686 db.execute(text( 

687 "UPDATE memberships SET last_read_message_id = :mid, " 

688 " last_read_at = CURRENT_TIMESTAMP " 

689 "WHERE parent_kind='conversation' AND parent_id=:cid " 

690 "AND member_id=:uid"), 

691 {'mid': message_id, 'cid': conv_id, 'uid': user_id}) 

692 db.commit() 

693 

694 try: 

695 from .realtime import publish_event 

696 topic = f'tenant.{tenant_id or "_"}.conv.{conv_id}.read' 

697 publish_event(topic, { 

698 'conv_id': conv_id, 'user_id': user_id, 

699 'last_read_message_id': message_id, 

700 'kind': 'read'}, 

701 user_id=user_id) 

702 except Exception as e: 

703 logger.warning("ConversationService.mark_read: publish " 

704 "failed: %s", e) 

705 

706 return {'sent': True, 'conv_id': conv_id, 

707 'last_read_message_id': message_id} 

708 

709 # ── Internal helpers ───────────────────────────────────────── 

710 

711 @staticmethod 

712 def _notify_members(db, conv_id: str, author_id: str, 

713 message_id: str, content: str) -> None: 

714 """Fire one notification per non-author member. Each notif 

715 publishes through MessageBus (LOCAL → SSE → PEERLINK → CROSSBAR). 

716 """ 

717 try: 

718 from .services import NotificationService 

719 except Exception: 

720 return 

721 for member_id in _list_member_ids(db, conv_id): 

722 if member_id == author_id: 

723 continue 

724 try: 

725 NotificationService.create( 

726 db, user_id=member_id, type='message', 

727 source_user_id=author_id, 

728 target_type='conversation', target_id=conv_id, 

729 message=(content or '')[:140]) 

730 except Exception as e: 

731 logger.warning("ConversationService: notify failed: %s", e)