Coverage for integrations / social / mention_service.py: 0.0%
104 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial — Mention service.
4Phase 7b. Plan reference: sunny-gliding-eich.md, Part E.5 + Part L.
6Transport: P2P-first via core/peer_link/message_bus.MessageBus.publish(...).
7Falls back to WAMP push for offline recipients and HTTP API for sync/restore.
8Central is the audit log + discovery index, not the primary router.
10Existing fan-out order (do not bypass):
11 LOCAL → SSE → PEERLINK → CROSSBAR
13This module ONLY persists Mention rows + Notification rows + dispatches
14agents through the existing agentic_router. It does NOT directly emit
15WAMP or open PeerLink frames — transport selection is owned by
16NotificationService (which already publishes via MessageBus).
18Agent topology (preserved exactly):
19 When the mentioned user is an agent (User.user_type == 'agent'):
20 1. Insert Mention row with mentioned_kind='agent' + agent_owner_id.
21 2. Fire TWO notifications: one for the agent, one for the owning
22 human. Both go through NotificationService.create which already
23 publishes on the social.{user_id} WAMP topic AND fans out via
24 MessageBus (LOCAL → SSE → PEERLINK → CROSSBAR).
25 3. Dispatch through agentic_router.find_matching_agent so the
26 reply is gated by the existing GuardrailEnforcer.before_dispatch
27 and after_response (Constitutional Filter + Constructive Filter
28 — security/hive_guardrails.py). No new privileged path.
29 4. The agent's reply is posted as a regular Comment via the
30 existing CommentService.create — the same path a human reply
31 takes. This guarantees the reply gets DLP-redacted, classified
32 (when moderation_v2 flag is on), and fanned out exactly like
33 any other comment.
35Privacy:
36 Friend graph affects mention delivery — a user blocked by the
37 mentioned target is silently dropped (Mention row is still recorded
38 for audit, but no Notification fires). Phase 7c's Block table
39 is checked when present; pre-7c the check is a no-op.
40"""
42from __future__ import annotations
44import logging
45import re
46from typing import List, Optional
48logger = logging.getLogger('hevolve_social')
51# Mirrors the client-side regex in components/shared/MentionInput.js.
52# Allows alphanumerics, underscore, dot, hyphen — same charset as
53# username validation in UserService.register.
54USERNAME_PATTERN = re.compile(r'(?<!\w)@([a-zA-Z0-9_.-]{2,40})')
57def _existing_mentions(db, source_kind: str, source_id: str) -> dict:
58 """Return {username_lower: Mention row} for an existing source.
60 Reads via raw SQL because the Mention model isn't an ORM class
61 today (table created by migration v42 which lands with this
62 phase). When v42 isn't yet applied we silently return empty.
63 """
64 try:
65 from sqlalchemy import text
66 rows = db.execute(text(
67 "SELECT m.id, m.mentioned_user_id, u.username "
68 "FROM mentions m "
69 "LEFT JOIN users u ON u.id = m.mentioned_user_id "
70 "WHERE m.source_kind = :sk AND m.source_id = :sid"),
71 {'sk': source_kind, 'sid': source_id}
72 ).fetchall()
73 except Exception:
74 return {}
75 out = {}
76 for row in rows:
77 if row[2]:
78 out[row[2].lower()] = (row[0], row[1])
79 return out
82def _is_blocked_either_way(db, x: str, y: str) -> bool:
83 """Phase 7c block check — bidirectional. Best-effort: returns
84 False if the Block table doesn't exist yet (pre-migration v43).
86 Suppresses notification if EITHER party blocks the other:
87 - target blocked author → target doesn't want messages from author
88 - author blocked target → author shouldn't be able to @-summon target
89 """
90 try:
91 from sqlalchemy import text
92 result = db.execute(text(
93 "SELECT 1 FROM blocks "
94 "WHERE (blocker_id = :x AND blocked_id = :y) "
95 "OR (blocker_id = :y AND blocked_id = :x) LIMIT 1"),
96 {'x': x, 'y': y}
97 ).fetchone()
98 return result is not None
99 except Exception:
100 return False
103class MentionService:
104 """Parse @-mentions in user content and fire notifications + agent dispatch.
106 All public methods are static — they take an explicit `db` session
107 and `tenant_id` so they can be called from any service without
108 coupling to Flask `g`.
109 """
111 @staticmethod
112 def parse(content: str) -> List[str]:
113 """Extract every @-mentioned username from content.
115 Returns list of lowercased, deduped usernames in order of
116 first occurrence. Used by the autocomplete UI to highlight
117 accepted refs and by parse_and_record to look up users.
118 """
119 if not content:
120 return []
121 seen = set()
122 out = []
123 for m in USERNAME_PATTERN.finditer(content):
124 uname = m.group(1).lower()
125 if uname not in seen:
126 seen.add(uname)
127 out.append(uname)
128 return out
130 @staticmethod
131 def parse_and_record(db, source_kind: str, source_id: str,
132 content: str, author_id: str,
133 tenant_id: Optional[str] = None,
134 dispatch_agents: bool = True) -> List[dict]:
135 """Parse content, insert Mention rows, fire Notifications,
136 dispatch any mentioned agents through the existing
137 agentic_router.
139 Returns a list of mention dicts the caller can include in the
140 post / comment / message response payload:
141 [{user_id, username, kind: 'human'|'agent'}]
143 Idempotent: re-running on the same source replaces the
144 existing mention set (insert new, delete removed) — used by
145 update_post / update_comment paths.
146 """
147 from .models import User
148 from .services import NotificationService
150 usernames = MentionService.parse(content)
151 if not usernames:
152 # Source was edited to remove all mentions — wipe any
153 # existing Mention rows so they don't linger in the index.
154 MentionService._wipe(db, source_kind, source_id)
155 return []
157 # Fetch users matching the parsed usernames (tenant-scoped if
158 # the column is populated — flat/regional pass-through with
159 # NULL tenant_id matches NULL rows).
160 qry = db.query(User).filter(User.username.in_(usernames),
161 User.is_banned == False) # noqa: E712
162 if hasattr(User, 'tenant_id') and tenant_id:
163 qry = qry.filter(User.tenant_id == tenant_id)
164 matched = {u.username.lower(): u for u in qry.all()}
166 # Diff against existing mentions (idempotent edit support).
167 existing = _existing_mentions(db, source_kind, source_id)
169 to_remove = set(existing.keys()) - set(matched.keys())
170 to_add = set(matched.keys()) - set(existing.keys())
172 # Remove stale rows (silently — no notification on un-mention).
173 # SQLAlchemy expanding bindparam works on every dialect.
174 if to_remove:
175 try:
176 from sqlalchemy import text, bindparam
177 ids = [existing[u][0] for u in to_remove]
178 db.execute(
179 text("DELETE FROM mentions WHERE id IN :ids").bindparams(
180 bindparam('ids', expanding=True)),
181 {'ids': ids})
182 db.commit()
183 except Exception as e:
184 logger.warning("MentionService: stale removal failed: %s", e)
186 out = []
187 # Insert new rows + notify.
188 for uname in usernames:
189 u = matched.get(uname)
190 if not u:
191 continue
192 # Skip blocked targets — record-once-no-notify.
193 # Bidirectional: either party blocking the other suppresses delivery.
194 blocked = _is_blocked_either_way(db, u.id, author_id)
196 if uname in to_add:
197 MentionService._insert_row(
198 db, source_kind, source_id, u, author_id,
199 tenant_id, suppress_notify=blocked)
201 kind = 'agent' if (getattr(u, 'user_type', '') == 'agent') else 'human'
202 out.append({
203 'user_id': u.id,
204 'username': u.username,
205 'kind': kind,
206 'agent_owner_id': getattr(u, 'owner_id', None),
207 })
209 # Agent dispatch (existing HARTOS topology — see plan B.4).
210 if dispatch_agents and kind == 'agent' and uname in to_add and not blocked:
211 MentionService._dispatch_agent(
212 db, agent=u, source_kind=source_kind,
213 source_id=source_id, content=content,
214 author_id=author_id, tenant_id=tenant_id)
216 return out
218 @staticmethod
219 def diff_and_update(db, source_kind: str, source_id: str,
220 old_content: str, new_content: str,
221 author_id: str, tenant_id: Optional[str] = None):
222 """Convenience wrapper for edit paths.
224 We re-run parse_and_record on the new content; it handles
225 insert/delete diffing internally. Old content is currently
226 unused (kept in signature for future once-per-edit dispatch
227 guarantees — Phase 7c may use it for re-notification policy).
228 """
229 return MentionService.parse_and_record(
230 db, source_kind, source_id, new_content, author_id,
231 tenant_id=tenant_id)
233 # ─── Private helpers ────────────────────────────────────────
235 @staticmethod
236 def _insert_row(db, source_kind, source_id, mentioned_user,
237 author_id, tenant_id, suppress_notify=False):
238 """Insert one Mention row + one or two Notification rows
239 (two if mentioned is an agent — agent + owner)."""
240 import uuid
241 from sqlalchemy import text
242 from .services import NotificationService
244 mid = str(uuid.uuid4())
245 kind = 'agent' if (getattr(mentioned_user, 'user_type', '') == 'agent') else 'human'
246 owner_id = getattr(mentioned_user, 'owner_id', None) if kind == 'agent' else None
248 try:
249 db.execute(text(
250 "INSERT INTO mentions "
251 "(id, tenant_id, source_kind, source_id, "
252 " mentioned_user_id, mentioned_kind, agent_owner_id, "
253 " created_at) "
254 "VALUES "
255 "(:id, :tid, :sk, :sid, :muid, :mk, :aoi, "
256 " CURRENT_TIMESTAMP)"),
257 {'id': mid, 'tid': tenant_id, 'sk': source_kind,
258 'sid': source_id, 'muid': mentioned_user.id,
259 'mk': kind, 'aoi': owner_id}
260 )
261 db.commit()
262 except Exception as e:
263 logger.warning("MentionService: insert failed: %s", e)
264 return
266 if suppress_notify:
267 return
269 # Notify the mentioned user via existing NotificationService —
270 # which already publishes on social.{user_id} WAMP topic + the
271 # MessageBus fan-out (LOCAL → SSE → PEERLINK → CROSSBAR).
272 try:
273 NotificationService.create(
274 db, user_id=mentioned_user.id, type='mention',
275 source_user_id=author_id,
276 target_type=source_kind, target_id=source_id,
277 message=f"You were mentioned in a {source_kind}")
278 except Exception as e:
279 logger.warning("MentionService: notify mentioned failed: %s", e)
281 # Dual-notify the owner when the mention is on an agent.
282 if kind == 'agent' and owner_id:
283 try:
284 NotificationService.create(
285 db, user_id=owner_id, type='agent_mention',
286 source_user_id=author_id,
287 target_type=source_kind, target_id=source_id,
288 message=f"Your agent {mentioned_user.username} was mentioned")
289 except Exception as e:
290 logger.warning("MentionService: notify owner failed: %s", e)
292 @staticmethod
293 def _wipe(db, source_kind, source_id):
294 """Delete every Mention row for a source — used when an edit
295 removes all @-mentions."""
296 try:
297 from sqlalchemy import text
298 db.execute(text(
299 "DELETE FROM mentions "
300 "WHERE source_kind = :sk AND source_id = :sid"),
301 {'sk': source_kind, 'sid': source_id})
302 db.commit()
303 except Exception:
304 pass
306 @staticmethod
307 def _dispatch_agent(db, agent, source_kind, source_id, content,
308 author_id, tenant_id):
309 """Dispatch the mentioned agent through the existing
310 agentic_router. The router calls into autogen / LangChain
311 with GuardrailEnforcer wrapping every step (security/
312 hive_guardrails.py — see plan Part B.4).
314 We do NOT post the agent's reply ourselves — agentic_router
315 returns a plan / response which the existing agent runtime
316 publishes back via the same channels a human reply would
317 use. This keeps the agent topology unchanged; we just
318 deliver the prompt.
320 If agentic_router is unavailable (HARTOS still booting,
321 offline, or the import fails), we silently degrade: the
322 Mention + Notification rows are still recorded so the agent
323 can pick up the work asynchronously the next time it
324 reconciles its inbox.
325 """
326 try:
327 from integrations import agentic_router
328 except Exception:
329 logger.info("MentionService: agentic_router unavailable; "
330 "skipping agent dispatch for %s", agent.username)
331 return
333 # Build the agent prompt context. Inline the surrounding text
334 # so the agent has enough to reason about.
335 prompt = (
336 f"You were mentioned in a {source_kind} (id={source_id}). "
337 f"The author wrote:\n\n{content}\n\n"
338 "Reply if appropriate; otherwise stay silent."
339 )
341 try:
342 # agentic_router.dispatch_to_agent is the canonical hook
343 # (Phase 7b — added in this session). It runs the prompt
344 # through GuardrailEnforcer.before_dispatch + after_response
345 # and posts the reply via CommentService.create — same path
346 # any human reply takes (plan B.4: no privileged path).
347 # The dispatch is async (daemon thread) so this call
348 # returns immediately; the calling Flask request is not
349 # blocked on the LLM round-trip.
350 if hasattr(agentic_router, 'dispatch_to_agent'):
351 agentic_router.dispatch_to_agent(
352 agent_id=agent.id, prompt=prompt,
353 context={'source_kind': source_kind,
354 'source_id': source_id,
355 'author_id': author_id,
356 'tenant_id': tenant_id})
357 return
358 # Older agentic_router build without the hook — Mention +
359 # Notification rows are persisted upstream so the agent
360 # runtime can pick up asynchronously next tick.
361 logger.info("MentionService: queued mention for agent %s "
362 "(no direct dispatcher) — runtime will pick up",
363 agent.username)
364 except Exception as e:
365 # Never let agent dispatch failure break the post create.
366 logger.warning("MentionService: agent dispatch failed for %s: %s",
367 agent.username, e)