Coverage for integrations / social / api_conversations.py: 26.3%
167 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial — Conversations API blueprint.
4Phase 7c.3. Plan reference: sunny-gliding-eich.md, Part E.3.
6Mounted at /api/social/conversations. Distinct from
7/api/social/channels (which is the legacy 31-channel external adapter
8— Telegram / WhatsApp / Discord etc., served by api_channels.py).
9The two blueprints coexist; their URL prefixes never overlap.
Endpoints (all flag-gated by `conversations`; when the flag is off the two
list endpoints — GET / and GET /<id>/messages — return [], and every other
endpoint returns 503):

  GET    /                          list current user's conversations
  POST   /                          create (kind, member_ids, title?)
  GET    /<id>                      detail (no messages — caller
                                    fetches messages separately)
  GET    /<id>/messages             paginated message history
  POST   /<id>/messages             send a message
  PATCH  /<id>/messages/<msg_id>    edit own (≤24 h)
  DELETE /<id>/messages/<msg_id>    soft-delete own
  POST   /<id>/members              add (group only, admin only)
  DELETE /<id>/members/<user_id>    remove (admin removes others, self leaves)
  POST   /<id>/typing               broadcast a typing indicator
  POST   /<id>/read-receipt         mark read up to a message (or latest)
25The blueprint is registered in social __init__.py / hartos_bootstrap
26alongside the existing api.social_bp.
27"""
29from __future__ import annotations
31import logging
32from flask import Blueprint, request, jsonify, g
34from .auth import require_auth
# Fixed logger name ('hevolve_social') rather than this module's __name__.
logger = logging.getLogger('hevolve_social')

# Every route in this module is registered on this blueprint, so all route
# paths below are relative to the URL prefix given here.
conversations_bp = Blueprint(
    name='conversations',
    import_name=__name__,
    url_prefix='/api/social/conversations',
)
def _ok(data=None, status=200):
    """Build a success response.

    Returns a ``(response, status)`` tuple. The JSON body always carries
    ``'success': True``; a ``'data'`` key is added only when *data* is not
    None (so an empty list or dict is still included).
    """
    envelope = {'success': True}
    if data is None:
        return jsonify(envelope), status
    envelope['data'] = data
    return jsonify(envelope), status
def _err(msg, status=400):
    """Build a failure response.

    Returns a ``(response, status)`` tuple whose JSON body has
    ``'success': False`` and the given *msg* under ``'error'``.
    """
    envelope = {'success': False, 'error': msg}
    return jsonify(envelope), status
def _get_json():
    """Return the request body parsed as a JSON object, or ``{}``.

    The body is parsed regardless of the Content-Type header (``force=True``)
    and parse failures are suppressed (``silent=True``). Anything that is
    not a JSON object — a missing or invalid body, or valid JSON such as a
    list, string or number — yields an empty dict.
    """
    payload = request.get_json(force=True, silent=True)
    # Callers immediately do `data.get(...)`; a non-dict payload (e.g. a
    # JSON array) would raise AttributeError there and surface as a 500,
    # so it is treated the same as an absent body.
    return payload if isinstance(payload, dict) else {}
def _flag_on() -> bool:
    """Tell whether the `conversations` feature flag is on for this request.

    Flags are read from ``g.feature_flags``; a missing or empty value
    counts as "off".
    """
    request_flags = getattr(g, 'feature_flags', None)
    if not request_flags:
        return False
    return bool(request_flags.get('conversations', False))
62# ── List + create ────────────────────────────────────────────────
@conversations_bp.route('', methods=['GET'])
@require_auth
def list_conversations():
    """List the current user's conversations.

    Query parameters:
        include_archived: ``'true'`` (case-insensitive) to include archived
            conversations; anything else means false.
        limit: page size, clamped to the range 1..200. Defaults to 50; a
            value that is not an integer also falls back to 50.

    Returns an empty list (HTTP 200) when the `conversations` flag is off.
    """
    if not _flag_on():
        return _ok([])
    include_archived = (
        request.args.get('include_archived', 'false').lower() == 'true')
    # `type=int` makes the lookup return the default when conversion fails,
    # so `?limit=abc` no longer raises ValueError (and a 500) here.
    requested = request.args.get('limit', 50, type=int)
    limit = max(1, min(requested, 200))
    from .conversation_service import ConversationService
    return _ok(ConversationService.list_for_user(
        g.db, g.user.id, include_archived=include_archived, limit=limit))
@conversations_bp.route('', methods=['POST'])
@require_auth
def create_conversation():
    """Create a conversation on behalf of the current user.

    JSON body: ``kind`` (required), ``member_ids`` (list, defaults to
    empty) and optional ``title``.

    Responses: 201 with the service result on success; 400 for a missing
    ``kind``, a non-list ``member_ids`` or a ConversationError from the
    service; 503 when the `conversations` flag is off.
    """
    if not _flag_on():
        return _err("conversations feature flag is off", 503)

    body = _get_json()
    kind = body.get('kind')
    if not kind:
        return _err("kind required ('dm' | 'group')")

    # A missing / null / empty member_ids is normalised to an empty list
    # before the type check.
    member_ids = body.get('member_ids') or []
    if not isinstance(member_ids, list):
        return _err("member_ids must be a list")

    try:
        from .conversation_service import (
            ConversationService, ConversationError)
        created = ConversationService.create(
            g.db,
            kind=kind,
            member_ids=member_ids,
            created_by=g.user.id,
            title=body.get('title'),
            tenant_id=getattr(g, 'tenant_id', None),
        )
    except ConversationError as exc:
        return _err(str(exc), 400)
    return _ok(created, status=201)
@conversations_bp.route('/<conv_id>', methods=['GET'])
@require_auth
def get_conversation(conv_id):
    """Return the detail record for one conversation.

    Responses: 200 with the service result; 404 when the caller is not a
    member or the service returns None; 503 when the `conversations` flag
    is off.
    """
    if not _flag_on():
        return _err("conversations feature flag is off", 503)

    from .conversation_service import ConversationService, _is_member

    # Non-members receive the same 404 as a missing conversation (rather
    # than a 403) so the response does not reveal that the id exists.
    if _is_member(g.db, conv_id, g.user.id):
        conversation = ConversationService.get(g.db, conv_id)
        if conversation is not None:
            return _ok(conversation)
    return _err("not found", 404)
116# ── Messages ─────────────────────────────────────────────────────
@conversations_bp.route('/<conv_id>/messages', methods=['GET'])
@require_auth
def list_messages(conv_id):
    """Return a page of message history for a conversation.

    Query parameters:
        before: optional cursor, passed to the service unchanged.
        limit: page size, clamped to the range 1..200. Defaults to 50; a
            value that is not an integer also falls back to 50.

    Responses: 200 with the service result; 200 with an empty list when
    the `conversations` flag is off; 403 when the service raises
    ConversationError.
    """
    if not _flag_on():
        return _ok([])
    before = request.args.get('before')
    # `type=int` makes the lookup return the default when conversion fails,
    # so `?limit=abc` no longer raises ValueError (and a 500) here.
    requested = request.args.get('limit', 50, type=int)
    limit = max(1, min(requested, 200))
    try:
        from .conversation_service import (
            ConversationService, ConversationError)
        return _ok(ConversationService.list_messages(
            g.db, conv_id, requester_id=g.user.id,
            before=before, limit=limit))
    except ConversationError as e:
        return _err(str(e), 403)
def _classify_message_if_enabled(message_id: str, content: str) -> None:
    """Phase 7e — run ContentClassifier on a freshly sent / edited
    conversation message when the moderation_v2 flag is on.

    Same shape as create_post / create_comment in api.py: best-effort,
    decision row lands in the audit trail, mod queue can review. No
    visibility flip today — the messages table doesn't have an
    is_quarantined column, so soft-quarantine on a DM is purely an
    auditable signal (the mod queue is the human review surface).

    Encrypted DMs (e2e_enabled conversations) classify the plaintext
    `content` from the request body — the same plaintext that
    MentionService already parses for @-mentions on the same path.
    The "encrypted at rest" property holds (server doesn't store
    plaintext); this is consistent with the existing send-path that
    sees plaintext during the request lifetime.

    Guards:
      - moderation_v2 flag off, or no feature flags on `g` → no-op
      - empty / missing message_id → no-op (defensive against weird
        ConversationService.send_message return shapes)
      - any classifier error → logged + swallowed (never breaks the
        send / edit response shape)
    """
    # Read flags the same tolerant way `_flag_on` does: this lookup runs
    # outside the try block below, so a bare `g.feature_flags` would raise
    # AttributeError and break the "never breaks the response" guarantee.
    flags = getattr(g, 'feature_flags', None) or {}
    if not flags.get('moderation_v2', False):
        return
    if not message_id:
        return
    try:
        from .content_classifier import ContentClassifier
        ContentClassifier.classify_and_persist(
            g.db, source_kind='message', source_id=message_id,
            content=content,
            tenant_id=getattr(g, 'tenant_id', None))
    except Exception as e:
        # Deliberately broad: moderation is best-effort and must not turn
        # a successful send / edit into an error response.
        logger.warning(
            "classify_message: classify_and_persist failed for "
            "message=%s: %s", message_id, e)
@conversations_bp.route('/<conv_id>/messages', methods=['POST'])
@require_auth
def send_message(conv_id):
    """Send a message to a conversation as the current user.

    JSON body: ``content`` (required), optional ``thread_root_id`` and
    ``metadata``; the optional values are passed to the service unchanged.

    Responses: 201 with the service result; 400 when ``content`` is
    missing / empty or the service raises ConversationError; 503 when the
    `conversations` flag is off.
    """
    if not _flag_on():
        return _err("conversations feature flag is off", 503)
    data = _get_json()
    content = data.get('content')
    thread_root_id = data.get('thread_root_id')
    metadata = data.get('metadata')
    if not content:
        return _err("content required")
    try:
        from .conversation_service import (
            ConversationService, ConversationError)
        result = ConversationService.send_message(
            g.db, conv_id=conv_id, author_id=g.user.id,
            content=content, thread_root_id=thread_root_id,
            metadata=metadata,
            tenant_id=getattr(g, 'tenant_id', None))
        # Phase 7e moderation — same gate + classifier wiring used by
        # create_post / create_comment in api.py. Source_kind='message'
        # so decisions land alongside post / comment rows in the mod
        # queue (single canonical content_moderation_decisions table).
        #
        # Only read the id from a dict result: calling `.get` on None or
        # another shape would raise AttributeError after send_message has
        # already returned, turning a completed send into a 500. The
        # classifier helper treats a missing id as a no-op.
        message_id = result.get('id') if isinstance(result, dict) else None
        _classify_message_if_enabled(message_id, content)
        return _ok(result, status=201)
    except ConversationError as e:
        return _err(str(e), 400)
@conversations_bp.route('/<conv_id>/messages/<message_id>',
                        methods=['PATCH'])
@require_auth
def edit_message(conv_id, message_id):
    """Replace the content of an existing message.

    JSON body: ``content`` (required). Note that ``conv_id`` is part of
    the route but is not passed to the service; the edit is addressed by
    ``message_id`` and the requesting user only.

    Responses: 200 with the service result; 400 when ``content`` is
    missing / empty or the service raises ConversationError; 503 when the
    `conversations` flag is off.
    """
    if not _flag_on():
        return _err("conversations feature flag is off", 503)

    new_content = _get_json().get('content')
    if not new_content:
        return _err("content required")

    try:
        from .conversation_service import (
            ConversationService, ConversationError)
        edited = ConversationService.edit_message(
            g.db,
            message_id=message_id,
            requester_id=g.user.id,
            new_content=new_content,
        )
        # The text may have changed since it was first classified, so the
        # edited content is run through moderation again.
        _classify_message_if_enabled(message_id, new_content)
    except ConversationError as exc:
        return _err(str(exc), 400)
    return _ok(edited)
@conversations_bp.route('/<conv_id>/messages/<message_id>',
                        methods=['DELETE'])
@require_auth
def delete_message(conv_id, message_id):
    """Soft-delete a message on behalf of the current user.

    ``conv_id`` is part of the route but is not passed to the service;
    the delete is addressed by ``message_id`` and the requesting user.

    Responses: 200 with the service result; 400 when the service raises
    ConversationError; 503 when the `conversations` flag is off.
    """
    if not _flag_on():
        return _err("conversations feature flag is off", 503)

    try:
        from .conversation_service import (
            ConversationService, ConversationError)
        outcome = ConversationService.soft_delete_message(
            g.db,
            message_id=message_id,
            requester_id=g.user.id,
        )
    except ConversationError as exc:
        return _err(str(exc), 400)
    return _ok(outcome)
243# ── Members (group only) ─────────────────────────────────────────
@conversations_bp.route('/<conv_id>/members', methods=['POST'])
@require_auth
def add_member(conv_id):
    """Add a user to a conversation.

    JSON body: the new member's id under ``user_id``; ``member_id`` is
    accepted as a fallback key when ``user_id`` is absent or empty.

    Responses: 200 with the service result; 400 when no id is supplied or
    the service raises ConversationError; 503 when the `conversations`
    flag is off.
    """
    if not _flag_on():
        return _err("conversations feature flag is off", 503)

    body = _get_json()
    candidate_id = body.get('user_id') or body.get('member_id')
    if not candidate_id:
        return _err("user_id required")

    try:
        from .conversation_service import (
            ConversationService, ConversationError)
        outcome = ConversationService.add_member(
            g.db,
            conv_id=conv_id,
            requester_id=g.user.id,
            new_member_id=candidate_id,
            tenant_id=getattr(g, 'tenant_id', None),
        )
    except ConversationError as exc:
        return _err(str(exc), 400)
    return _ok(outcome)
@conversations_bp.route('/<conv_id>/members/<user_id>', methods=['DELETE'])
@require_auth
def remove_member(conv_id, user_id):
    """Remove ``user_id`` from a conversation at the current user's request.

    Responses: 200 with the service result; 400 when the service raises
    ConversationError; 503 when the `conversations` flag is off.
    """
    if not _flag_on():
        return _err("conversations feature flag is off", 503)

    try:
        from .conversation_service import (
            ConversationService, ConversationError)
        outcome = ConversationService.remove_member(
            g.db,
            conv_id=conv_id,
            requester_id=g.user.id,
            target_id=user_id,
        )
    except ConversationError as exc:
        return _err(str(exc), 400)
    return _ok(outcome)
280# ── Typing + read-receipt (Phase 7c.7) ───────────────────────────
@conversations_bp.route('/<conv_id>/typing', methods=['POST'])
@require_auth
def emit_typing(conv_id):
    """Announce that the caller is typing in the conversation.

    No request body is read; the service is given only the conversation
    id, the caller's user id and the tenant id. Receivers see the event
    through the tenant.{tid}.conv.{id}.typing WAMP subscription. Pure
    broadcast — never persisted.

    Responses: 200 with the service result; 400 when the service raises
    ConversationError; 503 when the `conversations` flag is off.
    """
    if not _flag_on():
        return _err("conversations feature flag is off", 503)

    try:
        from .conversation_service import (
            ConversationService, ConversationError)
        outcome = ConversationService.emit_typing(
            g.db,
            conv_id=conv_id,
            user_id=g.user.id,
            tenant_id=getattr(g, 'tenant_id', None),
        )
    except ConversationError as exc:
        return _err(str(exc), 400)
    return _ok(outcome)
@conversations_bp.route('/<conv_id>/read-receipt', methods=['POST'])
@require_auth
def mark_read(conv_id):
    """Record that the caller has read the conversation up to a message.

    Body: { "message_id": "<id>" } — optional; when omitted the service
    receives None and marks the most recent message read. Persists
    last_read_message_id + last_read_at on the user's memberships row AND
    broadcasts a receipt to other members via tenant.{tid}.conv.{id}.read.

    Responses: 200 with the service result; 400 when the service raises
    ConversationError; 503 when the `conversations` flag is off.
    """
    if not _flag_on():
        return _err("conversations feature flag is off", 503)

    up_to_message_id = _get_json().get('message_id')
    try:
        from .conversation_service import (
            ConversationService, ConversationError)
        outcome = ConversationService.mark_read(
            g.db,
            conv_id=conv_id,
            user_id=g.user.id,
            message_id=up_to_message_id,
            tenant_id=getattr(g, 'tenant_id', None),
        )
    except ConversationError as exc:
        return _err(str(exc), 400)
    return _ok(outcome)