Coverage for integrations / social / mention_service.py: 0.0%

104 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial — Mention service. 

3 

4Phase 7b. Plan reference: sunny-gliding-eich.md, Part E.5 + Part L. 

5 

6Transport: P2P-first via core/peer_link/message_bus.MessageBus.publish(...). 

7Falls back to WAMP push for offline recipients and HTTP API for sync/restore. 

8Central is the audit log + discovery index, not the primary router. 

9 

10Existing fan-out order (do not bypass): 

11 LOCAL → SSE → PEERLINK → CROSSBAR 

12 

13This module ONLY persists Mention rows + Notification rows + dispatches 

14agents through the existing agentic_router. It does NOT directly emit 

15WAMP or open PeerLink frames — transport selection is owned by 

16NotificationService (which already publishes via MessageBus). 

17 

18Agent topology (preserved exactly): 

19 When the mentioned user is an agent (User.user_type == 'agent'): 

20 1. Insert Mention row with mentioned_kind='agent' + agent_owner_id. 

21 2. Fire TWO notifications: one for the agent, one for the owning 

22 human. Both go through NotificationService.create which already 

23 publishes on the social.{user_id} WAMP topic AND fans out via 

24 MessageBus (LOCAL → SSE → PEERLINK → CROSSBAR). 

25 3. Dispatch through agentic_router.find_matching_agent so the 

26 reply is gated by the existing GuardrailEnforcer.before_dispatch 

27 and after_response (Constitutional Filter + Constructive Filter 

28 — security/hive_guardrails.py). No new privileged path. 

29 4. The agent's reply is posted as a regular Comment via the 

30 existing CommentService.create — the same path a human reply 

31 takes. This guarantees the reply gets DLP-redacted, classified 

32 (when moderation_v2 flag is on), and fanned out exactly like 

33 any other comment. 

34 

35Privacy: 

36 Friend graph affects mention delivery — a user blocked by the 

37 mentioned target is silently dropped (Mention row is still recorded 

38 for audit, but no Notification fires). Phase 7c's Block table 

39 is checked when present; pre-7c the check is a no-op. 

40""" 

41 

42from __future__ import annotations 

43 

44import logging 

45import re 

46from typing import List, Optional 

47 

48logger = logging.getLogger('hevolve_social') 

49 

50 

51# Mirrors the client-side regex in components/shared/MentionInput.js. 

52# Allows alphanumerics, underscore, dot, hyphen — same charset as 

53# username validation in UserService.register. 

54USERNAME_PATTERN = re.compile(r'(?<!\w)@([a-zA-Z0-9_.-]{2,40})') 

55 

56 

57def _existing_mentions(db, source_kind: str, source_id: str) -> dict: 

58 """Return {username_lower: Mention row} for an existing source. 

59 

60 Reads via raw SQL because the Mention model isn't an ORM class 

61 today (table created by migration v42 which lands with this 

62 phase). When v42 isn't yet applied we silently return empty. 

63 """ 

64 try: 

65 from sqlalchemy import text 

66 rows = db.execute(text( 

67 "SELECT m.id, m.mentioned_user_id, u.username " 

68 "FROM mentions m " 

69 "LEFT JOIN users u ON u.id = m.mentioned_user_id " 

70 "WHERE m.source_kind = :sk AND m.source_id = :sid"), 

71 {'sk': source_kind, 'sid': source_id} 

72 ).fetchall() 

73 except Exception: 

74 return {} 

75 out = {} 

76 for row in rows: 

77 if row[2]: 

78 out[row[2].lower()] = (row[0], row[1]) 

79 return out 

80 

81 

82def _is_blocked_either_way(db, x: str, y: str) -> bool: 

83 """Phase 7c block check — bidirectional. Best-effort: returns 

84 False if the Block table doesn't exist yet (pre-migration v43). 

85 

86 Suppresses notification if EITHER party blocks the other: 

87 - target blocked author → target doesn't want messages from author 

88 - author blocked target → author shouldn't be able to @-summon target 

89 """ 

90 try: 

91 from sqlalchemy import text 

92 result = db.execute(text( 

93 "SELECT 1 FROM blocks " 

94 "WHERE (blocker_id = :x AND blocked_id = :y) " 

95 "OR (blocker_id = :y AND blocked_id = :x) LIMIT 1"), 

96 {'x': x, 'y': y} 

97 ).fetchone() 

98 return result is not None 

99 except Exception: 

100 return False 

101 

102 

103class MentionService: 

104 """Parse @-mentions in user content and fire notifications + agent dispatch. 

105 

106 All public methods are static — they take an explicit `db` session 

107 and `tenant_id` so they can be called from any service without 

108 coupling to Flask `g`. 

109 """ 

110 

111 @staticmethod 

112 def parse(content: str) -> List[str]: 

113 """Extract every @-mentioned username from content. 

114 

115 Returns list of lowercased, deduped usernames in order of 

116 first occurrence. Used by the autocomplete UI to highlight 

117 accepted refs and by parse_and_record to look up users. 

118 """ 

119 if not content: 

120 return [] 

121 seen = set() 

122 out = [] 

123 for m in USERNAME_PATTERN.finditer(content): 

124 uname = m.group(1).lower() 

125 if uname not in seen: 

126 seen.add(uname) 

127 out.append(uname) 

128 return out 

129 

130 @staticmethod 

131 def parse_and_record(db, source_kind: str, source_id: str, 

132 content: str, author_id: str, 

133 tenant_id: Optional[str] = None, 

134 dispatch_agents: bool = True) -> List[dict]: 

135 """Parse content, insert Mention rows, fire Notifications, 

136 dispatch any mentioned agents through the existing 

137 agentic_router. 

138 

139 Returns a list of mention dicts the caller can include in the 

140 post / comment / message response payload: 

141 [{user_id, username, kind: 'human'|'agent'}] 

142 

143 Idempotent: re-running on the same source replaces the 

144 existing mention set (insert new, delete removed) — used by 

145 update_post / update_comment paths. 

146 """ 

147 from .models import User 

148 from .services import NotificationService 

149 

150 usernames = MentionService.parse(content) 

151 if not usernames: 

152 # Source was edited to remove all mentions — wipe any 

153 # existing Mention rows so they don't linger in the index. 

154 MentionService._wipe(db, source_kind, source_id) 

155 return [] 

156 

157 # Fetch users matching the parsed usernames (tenant-scoped if 

158 # the column is populated — flat/regional pass-through with 

159 # NULL tenant_id matches NULL rows). 

160 qry = db.query(User).filter(User.username.in_(usernames), 

161 User.is_banned == False) # noqa: E712 

162 if hasattr(User, 'tenant_id') and tenant_id: 

163 qry = qry.filter(User.tenant_id == tenant_id) 

164 matched = {u.username.lower(): u for u in qry.all()} 

165 

166 # Diff against existing mentions (idempotent edit support). 

167 existing = _existing_mentions(db, source_kind, source_id) 

168 

169 to_remove = set(existing.keys()) - set(matched.keys()) 

170 to_add = set(matched.keys()) - set(existing.keys()) 

171 

172 # Remove stale rows (silently — no notification on un-mention). 

173 # SQLAlchemy expanding bindparam works on every dialect. 

174 if to_remove: 

175 try: 

176 from sqlalchemy import text, bindparam 

177 ids = [existing[u][0] for u in to_remove] 

178 db.execute( 

179 text("DELETE FROM mentions WHERE id IN :ids").bindparams( 

180 bindparam('ids', expanding=True)), 

181 {'ids': ids}) 

182 db.commit() 

183 except Exception as e: 

184 logger.warning("MentionService: stale removal failed: %s", e) 

185 

186 out = [] 

187 # Insert new rows + notify. 

188 for uname in usernames: 

189 u = matched.get(uname) 

190 if not u: 

191 continue 

192 # Skip blocked targets — record-once-no-notify. 

193 # Bidirectional: either party blocking the other suppresses delivery. 

194 blocked = _is_blocked_either_way(db, u.id, author_id) 

195 

196 if uname in to_add: 

197 MentionService._insert_row( 

198 db, source_kind, source_id, u, author_id, 

199 tenant_id, suppress_notify=blocked) 

200 

201 kind = 'agent' if (getattr(u, 'user_type', '') == 'agent') else 'human' 

202 out.append({ 

203 'user_id': u.id, 

204 'username': u.username, 

205 'kind': kind, 

206 'agent_owner_id': getattr(u, 'owner_id', None), 

207 }) 

208 

209 # Agent dispatch (existing HARTOS topology — see plan B.4). 

210 if dispatch_agents and kind == 'agent' and uname in to_add and not blocked: 

211 MentionService._dispatch_agent( 

212 db, agent=u, source_kind=source_kind, 

213 source_id=source_id, content=content, 

214 author_id=author_id, tenant_id=tenant_id) 

215 

216 return out 

217 

218 @staticmethod 

219 def diff_and_update(db, source_kind: str, source_id: str, 

220 old_content: str, new_content: str, 

221 author_id: str, tenant_id: Optional[str] = None): 

222 """Convenience wrapper for edit paths. 

223 

224 We re-run parse_and_record on the new content; it handles 

225 insert/delete diffing internally. Old content is currently 

226 unused (kept in signature for future once-per-edit dispatch 

227 guarantees — Phase 7c may use it for re-notification policy). 

228 """ 

229 return MentionService.parse_and_record( 

230 db, source_kind, source_id, new_content, author_id, 

231 tenant_id=tenant_id) 

232 

233 # ─── Private helpers ──────────────────────────────────────── 

234 

235 @staticmethod 

236 def _insert_row(db, source_kind, source_id, mentioned_user, 

237 author_id, tenant_id, suppress_notify=False): 

238 """Insert one Mention row + one or two Notification rows 

239 (two if mentioned is an agent — agent + owner).""" 

240 import uuid 

241 from sqlalchemy import text 

242 from .services import NotificationService 

243 

244 mid = str(uuid.uuid4()) 

245 kind = 'agent' if (getattr(mentioned_user, 'user_type', '') == 'agent') else 'human' 

246 owner_id = getattr(mentioned_user, 'owner_id', None) if kind == 'agent' else None 

247 

248 try: 

249 db.execute(text( 

250 "INSERT INTO mentions " 

251 "(id, tenant_id, source_kind, source_id, " 

252 " mentioned_user_id, mentioned_kind, agent_owner_id, " 

253 " created_at) " 

254 "VALUES " 

255 "(:id, :tid, :sk, :sid, :muid, :mk, :aoi, " 

256 " CURRENT_TIMESTAMP)"), 

257 {'id': mid, 'tid': tenant_id, 'sk': source_kind, 

258 'sid': source_id, 'muid': mentioned_user.id, 

259 'mk': kind, 'aoi': owner_id} 

260 ) 

261 db.commit() 

262 except Exception as e: 

263 logger.warning("MentionService: insert failed: %s", e) 

264 return 

265 

266 if suppress_notify: 

267 return 

268 

269 # Notify the mentioned user via existing NotificationService — 

270 # which already publishes on social.{user_id} WAMP topic + the 

271 # MessageBus fan-out (LOCAL → SSE → PEERLINK → CROSSBAR). 

272 try: 

273 NotificationService.create( 

274 db, user_id=mentioned_user.id, type='mention', 

275 source_user_id=author_id, 

276 target_type=source_kind, target_id=source_id, 

277 message=f"You were mentioned in a {source_kind}") 

278 except Exception as e: 

279 logger.warning("MentionService: notify mentioned failed: %s", e) 

280 

281 # Dual-notify the owner when the mention is on an agent. 

282 if kind == 'agent' and owner_id: 

283 try: 

284 NotificationService.create( 

285 db, user_id=owner_id, type='agent_mention', 

286 source_user_id=author_id, 

287 target_type=source_kind, target_id=source_id, 

288 message=f"Your agent {mentioned_user.username} was mentioned") 

289 except Exception as e: 

290 logger.warning("MentionService: notify owner failed: %s", e) 

291 

292 @staticmethod 

293 def _wipe(db, source_kind, source_id): 

294 """Delete every Mention row for a source — used when an edit 

295 removes all @-mentions.""" 

296 try: 

297 from sqlalchemy import text 

298 db.execute(text( 

299 "DELETE FROM mentions " 

300 "WHERE source_kind = :sk AND source_id = :sid"), 

301 {'sk': source_kind, 'sid': source_id}) 

302 db.commit() 

303 except Exception: 

304 pass 

305 

306 @staticmethod 

307 def _dispatch_agent(db, agent, source_kind, source_id, content, 

308 author_id, tenant_id): 

309 """Dispatch the mentioned agent through the existing 

310 agentic_router. The router calls into autogen / LangChain 

311 with GuardrailEnforcer wrapping every step (security/ 

312 hive_guardrails.py — see plan Part B.4). 

313 

314 We do NOT post the agent's reply ourselves — agentic_router 

315 returns a plan / response which the existing agent runtime 

316 publishes back via the same channels a human reply would 

317 use. This keeps the agent topology unchanged; we just 

318 deliver the prompt. 

319 

320 If agentic_router is unavailable (HARTOS still booting, 

321 offline, or the import fails), we silently degrade: the 

322 Mention + Notification rows are still recorded so the agent 

323 can pick up the work asynchronously the next time it 

324 reconciles its inbox. 

325 """ 

326 try: 

327 from integrations import agentic_router 

328 except Exception: 

329 logger.info("MentionService: agentic_router unavailable; " 

330 "skipping agent dispatch for %s", agent.username) 

331 return 

332 

333 # Build the agent prompt context. Inline the surrounding text 

334 # so the agent has enough to reason about. 

335 prompt = ( 

336 f"You were mentioned in a {source_kind} (id={source_id}). " 

337 f"The author wrote:\n\n{content}\n\n" 

338 "Reply if appropriate; otherwise stay silent." 

339 ) 

340 

341 try: 

342 # agentic_router.dispatch_to_agent is the canonical hook 

343 # (Phase 7b — added in this session). It runs the prompt 

344 # through GuardrailEnforcer.before_dispatch + after_response 

345 # and posts the reply via CommentService.create — same path 

346 # any human reply takes (plan B.4: no privileged path). 

347 # The dispatch is async (daemon thread) so this call 

348 # returns immediately; the calling Flask request is not 

349 # blocked on the LLM round-trip. 

350 if hasattr(agentic_router, 'dispatch_to_agent'): 

351 agentic_router.dispatch_to_agent( 

352 agent_id=agent.id, prompt=prompt, 

353 context={'source_kind': source_kind, 

354 'source_id': source_id, 

355 'author_id': author_id, 

356 'tenant_id': tenant_id}) 

357 return 

358 # Older agentic_router build without the hook — Mention + 

359 # Notification rows are persisted upstream so the agent 

360 # runtime can pick up asynchronously next tick. 

361 logger.info("MentionService: queued mention for agent %s " 

362 "(no direct dispatcher) — runtime will pick up", 

363 agent.username) 

364 except Exception as e: 

365 # Never let agent dispatch failure break the post create. 

366 logger.warning("MentionService: agent dispatch failed for %s: %s", 

367 agent.username, e)