Coverage for integrations / social / api_conversations.py: 26.3%

167 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial — Conversations API blueprint. 

3 

4Phase 7c.3. Plan reference: sunny-gliding-eich.md, Part E.3. 

5 

6Mounted at /api/social/conversations. Distinct from 

7/api/social/channels (which is the legacy 31-channel external adapter 

8— Telegram / WhatsApp / Discord etc., served by api_channels.py). 

9The two blueprints coexist; their URL prefixes never overlap. 

10 

11Endpoints (all flag-gated by `conversations`; mutating endpoints return 

12503 when the flag is off, read endpoints return []): 

13 

14 GET / list current user's conversations 

15 POST / create (kind, member_ids, title?) 

16 GET /<id> detail + (no messages — caller 

17 fetches messages separately) 

18 GET /<id>/messages paginated message history 

19 POST /<id>/messages send a message 

20 PATCH /<id>/messages/<msg_id> edit own (≤24 h) 

21 DELETE /<id>/messages/<msg_id> soft-delete own 

22 POST /<id>/members add (group only, admin only) 

23 DELETE /<id>/members/<user_id> remove (admin removes others, self leaves) 

24 

25The blueprint is registered in social __init__.py / hartos_bootstrap 

26alongside the existing api.social_bp. 

27""" 

28 

29from __future__ import annotations 

30 

31import logging 

32from flask import Blueprint, request, jsonify, g 

33 

34from .auth import require_auth 

35 

36logger = logging.getLogger('hevolve_social') 

37 

38conversations_bp = Blueprint( 

39 'conversations', __name__, url_prefix='/api/social/conversations') 

40 

41 

42def _ok(data=None, status=200): 

43 r = {'success': True} 

44 if data is not None: 

45 r['data'] = data 

46 return jsonify(r), status 

47 

48 

49def _err(msg, status=400): 

50 return jsonify({'success': False, 'error': msg}), status 

51 

52 

53def _get_json(): 

54 return request.get_json(force=True, silent=True) or {} 

55 

56 

57def _flag_on() -> bool: 

58 flags = getattr(g, 'feature_flags', {}) or {} 

59 return bool(flags.get('conversations', False)) 

60 

61 

62# ── List + create ──────────────────────────────────────────────── 

63 

64@conversations_bp.route('', methods=['GET']) 

65@require_auth 

66def list_conversations(): 

67 if not _flag_on(): 

68 return _ok([]) 

69 include_archived = (request.args.get('include_archived', 

70 'false').lower() == 'true') 

71 limit = max(1, min(int(request.args.get('limit', 50)), 200)) 

72 from .conversation_service import ConversationService 

73 return _ok(ConversationService.list_for_user( 

74 g.db, g.user.id, include_archived=include_archived, limit=limit)) 

75 

76 

77@conversations_bp.route('', methods=['POST']) 

78@require_auth 

79def create_conversation(): 

80 if not _flag_on(): 

81 return _err("conversations feature flag is off", 503) 

82 data = _get_json() 

83 kind = data.get('kind') 

84 member_ids = data.get('member_ids') or [] 

85 title = data.get('title') 

86 if not kind: 

87 return _err("kind required ('dm' | 'group')") 

88 if not isinstance(member_ids, list): 

89 return _err("member_ids must be a list") 

90 try: 

91 from .conversation_service import ( 

92 ConversationService, ConversationError) 

93 result = ConversationService.create( 

94 g.db, kind=kind, member_ids=member_ids, 

95 created_by=g.user.id, title=title, 

96 tenant_id=getattr(g, 'tenant_id', None)) 

97 return _ok(result, status=201) 

98 except ConversationError as e: 

99 return _err(str(e), 400) 

100 

101 

102@conversations_bp.route('/<conv_id>', methods=['GET']) 

103@require_auth 

104def get_conversation(conv_id): 

105 if not _flag_on(): 

106 return _err("conversations feature flag is off", 503) 

107 from .conversation_service import ConversationService, _is_member 

108 if not _is_member(g.db, conv_id, g.user.id): 

109 return _err("not found", 404) # 404 (not 403) to avoid leak 

110 result = ConversationService.get(g.db, conv_id) 

111 if result is None: 

112 return _err("not found", 404) 

113 return _ok(result) 

114 

115 

116# ── Messages ───────────────────────────────────────────────────── 

117 

118@conversations_bp.route('/<conv_id>/messages', methods=['GET']) 

119@require_auth 

120def list_messages(conv_id): 

121 if not _flag_on(): 

122 return _ok([]) 

123 before = request.args.get('before') 

124 limit = max(1, min(int(request.args.get('limit', 50)), 200)) 

125 try: 

126 from .conversation_service import ( 

127 ConversationService, ConversationError) 

128 return _ok(ConversationService.list_messages( 

129 g.db, conv_id, requester_id=g.user.id, 

130 before=before, limit=limit)) 

131 except ConversationError as e: 

132 return _err(str(e), 403) 

133 

134 

135def _classify_message_if_enabled(message_id: str, content: str) -> None: 

136 """Phase 7e — run ContentClassifier on a freshly sent / edited 

137 conversation message when the moderation_v2 flag is on. 

138 

139 Same shape as create_post / create_comment in api.py: best-effort, 

140 decision row lands in the audit trail, mod queue can review. No 

141 visibility flip today — the messages table doesn't have an 

142 is_quarantined column, so soft-quarantine on a DM is purely an 

143 auditable signal (the mod queue is the human review surface). 

144 

145 Encrypted DMs (e2e_enabled conversations) classify the plaintext 

146 `content` from the request body — the same plaintext that 

147 MentionService already parses for @-mentions on the same path. 

148 The "encrypted at rest" property holds (server doesn't store 

149 plaintext); this is consistent with the existing send-path that 

150 sees plaintext during the request lifetime. 

151 

152 Guards: 

153 - moderation_v2 flag off → no-op 

154 - empty / missing message_id → no-op (defensive against weird 

155 ConversationService.send_message return shapes) 

156 - any classifier error → logged + swallowed (never breaks the 

157 send / edit response shape) 

158 """ 

159 if not g.feature_flags.get('moderation_v2', False): 

160 return 

161 if not message_id: 

162 return 

163 try: 

164 from .content_classifier import ContentClassifier 

165 ContentClassifier.classify_and_persist( 

166 g.db, source_kind='message', source_id=message_id, 

167 content=content, 

168 tenant_id=getattr(g, 'tenant_id', None)) 

169 except Exception as e: 

170 logger.warning( 

171 "classify_message: classify_and_persist failed for " 

172 "message=%s: %s", message_id, e) 

173 

174 

175@conversations_bp.route('/<conv_id>/messages', methods=['POST']) 

176@require_auth 

177def send_message(conv_id): 

178 if not _flag_on(): 

179 return _err("conversations feature flag is off", 503) 

180 data = _get_json() 

181 content = data.get('content') 

182 thread_root_id = data.get('thread_root_id') 

183 metadata = data.get('metadata') 

184 if not content: 

185 return _err("content required") 

186 try: 

187 from .conversation_service import ( 

188 ConversationService, ConversationError) 

189 result = ConversationService.send_message( 

190 g.db, conv_id=conv_id, author_id=g.user.id, 

191 content=content, thread_root_id=thread_root_id, 

192 metadata=metadata, 

193 tenant_id=getattr(g, 'tenant_id', None)) 

194 # Phase 7e moderation — same gate + classifier wiring used by 

195 # create_post / create_comment in api.py. Source_kind='message' 

196 # so decisions land alongside post / comment rows in the mod 

197 # queue (single canonical content_moderation_decisions table). 

198 _classify_message_if_enabled(result.get('id'), content) 

199 return _ok(result, status=201) 

200 except ConversationError as e: 

201 return _err(str(e), 400) 

202 

203 

204@conversations_bp.route('/<conv_id>/messages/<message_id>', 

205 methods=['PATCH']) 

206@require_auth 

207def edit_message(conv_id, message_id): 

208 if not _flag_on(): 

209 return _err("conversations feature flag is off", 503) 

210 data = _get_json() 

211 content = data.get('content') 

212 if not content: 

213 return _err("content required") 

214 try: 

215 from .conversation_service import ( 

216 ConversationService, ConversationError) 

217 result = ConversationService.edit_message( 

218 g.db, message_id=message_id, requester_id=g.user.id, 

219 new_content=content) 

220 # Re-classify on edit — content can change between send and 

221 # edit, so a previously-clean message might now be borderline. 

222 _classify_message_if_enabled(message_id, content) 

223 return _ok(result) 

224 except ConversationError as e: 

225 return _err(str(e), 400) 

226 

227 

228@conversations_bp.route('/<conv_id>/messages/<message_id>', 

229 methods=['DELETE']) 

230@require_auth 

231def delete_message(conv_id, message_id): 

232 if not _flag_on(): 

233 return _err("conversations feature flag is off", 503) 

234 try: 

235 from .conversation_service import ( 

236 ConversationService, ConversationError) 

237 return _ok(ConversationService.soft_delete_message( 

238 g.db, message_id=message_id, requester_id=g.user.id)) 

239 except ConversationError as e: 

240 return _err(str(e), 400) 

241 

242 

243# ── Members (group only) ───────────────────────────────────────── 

244 

245@conversations_bp.route('/<conv_id>/members', methods=['POST']) 

246@require_auth 

247def add_member(conv_id): 

248 if not _flag_on(): 

249 return _err("conversations feature flag is off", 503) 

250 data = _get_json() 

251 new_member_id = data.get('user_id') or data.get('member_id') 

252 if not new_member_id: 

253 return _err("user_id required") 

254 try: 

255 from .conversation_service import ( 

256 ConversationService, ConversationError) 

257 return _ok(ConversationService.add_member( 

258 g.db, conv_id=conv_id, requester_id=g.user.id, 

259 new_member_id=new_member_id, 

260 tenant_id=getattr(g, 'tenant_id', None))) 

261 except ConversationError as e: 

262 return _err(str(e), 400) 

263 

264 

265@conversations_bp.route('/<conv_id>/members/<user_id>', methods=['DELETE']) 

266@require_auth 

267def remove_member(conv_id, user_id): 

268 if not _flag_on(): 

269 return _err("conversations feature flag is off", 503) 

270 try: 

271 from .conversation_service import ( 

272 ConversationService, ConversationError) 

273 return _ok(ConversationService.remove_member( 

274 g.db, conv_id=conv_id, requester_id=g.user.id, 

275 target_id=user_id)) 

276 except ConversationError as e: 

277 return _err(str(e), 400) 

278 

279 

280# ── Typing + read-receipt (Phase 7c.7) ─────────────────────────── 

281 

282@conversations_bp.route('/<conv_id>/typing', methods=['POST']) 

283@require_auth 

284def emit_typing(conv_id): 

285 """Broadcast that the caller is typing in the conversation. 

286 

287 Body is empty; the typing payload is just (user_id, conv_id). 

288 Receivers see it through tenant.{tid}.conv.{id}.typing WAMP 

289 subscription. Pure broadcast — never persisted. 

290 """ 

291 if not _flag_on(): 

292 return _err("conversations feature flag is off", 503) 

293 try: 

294 from .conversation_service import ( 

295 ConversationService, ConversationError) 

296 return _ok(ConversationService.emit_typing( 

297 g.db, conv_id=conv_id, user_id=g.user.id, 

298 tenant_id=getattr(g, 'tenant_id', None))) 

299 except ConversationError as e: 

300 return _err(str(e), 400) 

301 

302 

303@conversations_bp.route('/<conv_id>/read-receipt', methods=['POST']) 

304@require_auth 

305def mark_read(conv_id): 

306 """Mark conversation read up to a specific message (or latest). 

307 

308 Body: { "message_id": "<id>" } — optional; if omitted we mark the 

309 most recent message read. Persists last_read_message_id + 

310 last_read_at on the user's memberships row AND broadcasts a 

311 receipt to other members via tenant.{tid}.conv.{id}.read. 

312 """ 

313 if not _flag_on(): 

314 return _err("conversations feature flag is off", 503) 

315 data = _get_json() 

316 try: 

317 from .conversation_service import ( 

318 ConversationService, ConversationError) 

319 return _ok(ConversationService.mark_read( 

320 g.db, conv_id=conv_id, user_id=g.user.id, 

321 message_id=data.get('message_id'), 

322 tenant_id=getattr(g, 'tenant_id', None))) 

323 except ConversationError as e: 

324 return _err(str(e), 400)