Coverage for integrations / social / chat_messages.py: 24.8%
133 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""Chat-sync service — canonical writer + cursor-pull reader for
2cross-device chat mirroring (U1-U9 workstream, task #389).
4Design principles (CLAUDE.md Gates):
5 * Gate 2 (DRY): extends the existing ``ConversationEntry`` table —
6 does NOT introduce a parallel ChatMessage model. Every chat turn
7 (/chat hot path, channel adapters, LangChain prep_outputs) already
8 lands here; this module gives it a stable API + cursor-pull +
9 WAMP publish in one place.
10 * Gate 3 (SRP):
11 - ``persist(...)`` writes one row (per role) and returns the row.
12 - ``pull_since(...)`` reads rows since a cursor, capped by row- +
13 byte-budget from ``core.constants.CHAT_CURSOR_PULL_MAX_*``.
14 - ``publish_new(...)`` fires the chat.new WAMP event (synchronous
15 call — ``publish_async`` already routes its HTTP leg through
16 its own executor, so the MessageBus hand-off is O(1)).
17 - ``persist_and_publish_async(...)`` submits both to a module
18 executor so callers on the chat hot path incur no latency.
19 NO I/O mixing; the HTTP handler in Nunba's main.py stays pure glue.
20 * Gate 4 (no parallel paths): ONE persistence helper — this one.
21 ``world_model_bridge._persist_to_conversation_entry`` is preserved
22 for now (other call sites), but new writes go through here.
23 * Gate 8 (security): ``user_id`` is attacker-controllable via body;
24 the HTTP gate (Nunba main.py) confirms JWT→user_id BEFORE calling
25 ``persist`` / ``pull_since``. This module trusts its inputs.
26 Dedup-on-msg_id verifies user_id matches before returning — a
27 cross-user msg_id collision (astronomically unlikely) would NOT
28 leak the other user's row.
30Public API
31----------
32 persist(user_id, role, content, *, msg_id=None, agent_id=None,
33 prompt_id=None, request_id=None, device_id=None,
34 lang=None, attachments=None, channel_type='chat') -> dict|None
35 Insert one ConversationEntry; return the ``to_dict()`` of the
36 inserted row or None on failure.
38 pull_since(user_id, since_id=0, *, limit=None, channel_type=None,
39 agent_id=None) -> list[dict]
40 Cursor pull.
42 publish_new(row_dict) -> None
43 Synchronous (but internally fast) WAMP publish.
45 persist_and_publish_async(...) -> None
46 Fire-and-forget. Submits persist+publish to a module executor.
47"""
49from __future__ import annotations
51import json
52import logging
53import os
54import threading
55import time
56from concurrent.futures import ThreadPoolExecutor
57from typing import Any
# Module logger — all diagnostics in this file are debug-level, so the
# module stays quiet in production unless explicitly enabled.
logger = logging.getLogger(__name__)

# Lazy module cache — keep this file import-cheap so tests that only
# exercise ``_generate_msg_id`` don't need SQLAlchemy on the path.
_MODELS_CACHE: dict[str, Any] = {}
# Guards first population of ``_MODELS_CACHE`` (double-checked locking).
_CACHE_LOCK = threading.Lock()

# Module-level executor for persist-and-publish off the hot path. Bound
# small (2 workers) — we are not CPU-bound here; the bottleneck is a DB
# round-trip. max_workers>1 lets a slow write (network hiccup to a
# regional MySQL) not block the next turn's persist. Thread-name prefix
# so it shows up distinctly in ``core.diag`` thread-dump output.
_EXECUTOR = ThreadPoolExecutor(max_workers=2, thread_name_prefix='chat-sync')
def _models():
    """Resolve ``(get_db, ConversationEntry)`` lazily; cache on first call.

    Double-checked locking on ``_CACHE_LOCK`` ensures the import runs at
    most once.  Raises ImportError if the HARTOS social models can't be
    loaded, in which case the caller should degrade (skip the write, do
    NOT crash).
    """
    def _cached():
        return _MODELS_CACHE['get_db'], _MODELS_CACHE['ConversationEntry']

    # Fast path: already populated, no lock needed.
    if 'get_db' in _MODELS_CACHE:
        return _cached()
    with _CACHE_LOCK:
        # Re-check under the lock — another thread may have won the race.
        if 'get_db' not in _MODELS_CACHE:
            from integrations.social.models import ConversationEntry, get_db
            _MODELS_CACHE['get_db'] = get_db
            _MODELS_CACHE['ConversationEntry'] = ConversationEntry
        return _cached()
91def _generate_msg_id() -> str:
92 """ULID-like 16-char hex id.
94 48 bits of milliseconds (12 hex chars) + 16 bits random (4 hex).
95 Sortable by embedding time first — not cryptographically secure,
96 collision-resistant enough for per-user dedup. For busy multi-user
97 servers, clients should supply their own msg_id; this is a fallback.
98 """
99 ms = int(time.time() * 1000) & 0xFFFFFFFFFFFF # 48-bit cap
100 rnd = int.from_bytes(os.urandom(2), 'big')
101 return f"{ms:012x}{rnd:04x}"
104def _payload_bytes(obj: Any) -> int:
105 """Cheap JSON-encoded size estimate for the byte cap in pull_since."""
106 try:
107 return len(json.dumps(obj, separators=(',', ':'), default=str))
108 except Exception: # noqa: BLE001
109 return 0
def persist(
    user_id: str,
    role: str,
    content: str,
    *,
    msg_id: str | None = None,
    agent_id: str | None = None,
    prompt_id: str | None = None,
    request_id: str | None = None,
    device_id: str | None = None,
    lang: str | None = None,
    attachments: list[dict] | None = None,
    channel_type: str = 'chat',
) -> dict | None:
    """Insert one ConversationEntry. Returns the row dict or None.

    Validation: ``user_id``/``role``/``content`` must all be truthy and
    ``role`` must be one of 'user'/'assistant'/'system'; otherwise no
    write happens and None is returned.  ``content`` is truncated to
    10,000 characters at insert time.

    Deduplication: if ``msg_id`` already exists for THIS user, the
    existing row is returned without a second insert. Across users a
    msg_id collision is refused — we do not return another user's data.
    NOTE(review): the fallback ids from ``_generate_msg_id`` carry only
    16 random bits on top of a 48-bit millisecond timestamp, so the
    collision odds rest largely on clients supplying their own
    high-entropy msg_ids.

    Identity provenance (per
    ``memory/feedback_unification_reuse_contract.md`` F1):
    For agent-authored entries (``role='assistant'``), populate:
      * ``agent_id`` → the agent's prompt_id (top-level column,
        already supported)
      * ``attachments`` → include
        ``{'kind': 'agent_provenance',
           'agent_id': '<prompt_id>',
           'bound_to': '<owner_user_id>'}``
    This lets receiving platforms render trust badges
    ("verified by Alice") and audit logs attribute agent actions
    to the human who owns the agent — without any new column,
    new transport, or new persist call. Single canonical writer
    contract: every agent-authored persist goes through here.

    Best-effort: any failure rolls back, logs at debug, and returns
    None; the DB session is always closed in ``finally``.
    """
    if not user_id or not role or not content:
        return None
    if role not in ('user', 'assistant', 'system'):
        logger.debug("chat_messages.persist: invalid role %r", role)
        return None

    # Fallback id when the client did not supply one.
    msg_id = msg_id or _generate_msg_id()
    try:
        get_db, ConversationEntry = _models()
    except ImportError:
        # Social models not importable (stripped-down env) → degrade.
        logger.debug("chat_messages.persist: social models unavailable")
        return None

    db = None
    try:
        db = get_db()
        # Dedup lookup is by msg_id alone; ownership is verified below.
        existing = db.query(ConversationEntry).filter(
            ConversationEntry.msg_id == msg_id
        ).first()
        if existing is not None:
            if str(existing.user_id) != str(user_id):
                # Cross-user msg_id collision: refuse rather than leak
                # the other user's row. Caller retries with a new id.
                logger.warning(
                    "chat_messages.persist: msg_id %r cross-user collision "
                    "(owner=%r, caller=%r); refusing",
                    msg_id, existing.user_id, user_id,
                )
                return None
            # Same user re-sent the same msg_id: idempotent return.
            return existing.to_dict()

        entry = ConversationEntry(
            user_id=str(user_id),
            channel_type=channel_type,
            role=role,
            content=content[:10000],  # hard 10k-char cap on stored content
            agent_id=agent_id,
            prompt_id=prompt_id,
            msg_id=msg_id,
            request_id=request_id,
            device_id=device_id,
            lang=lang,
            attachments=attachments,
        )
        db.add(entry)
        db.commit()
        db.refresh(entry)  # pick up DB-assigned id/defaults before to_dict
        return entry.to_dict()
    except Exception as e:  # noqa: BLE001
        logger.debug("chat_messages.persist failed: %s", e)
        if db is not None:
            try:
                db.rollback()
            except Exception:  # noqa: BLE001
                pass
        return None
    finally:
        if db is not None:
            try:
                db.close()
            except Exception:  # noqa: BLE001
                pass
def pull_since(
    user_id: str,
    since_id: int = 0,
    *,
    limit: int | None = None,
    channel_type: str | None = None,
    agent_id: str | None = None,
) -> list[dict]:
    """Return ConversationEntry rows with ``id > since_id`` for this user.

    Ordered by id ASC (monotonic cursor). Truncated at the row budget
    AND the byte budget — whichever fires first. The FIRST matching row
    is always returned even if it alone exceeds the byte cap (the byte
    check only skips rows once the output is non-empty), so one
    oversized row can never wedge the cursor. Empty list on error or
    nothing-new.

    ``limit`` is clamped to [1, CHAT_CURSOR_PULL_MAX_ROWS]; non-numeric
    ``limit``/``since_id`` fall back to defaults. Optional
    ``channel_type``/``agent_id`` narrow the query.
    """
    try:
        from core.constants import (
            CHAT_CURSOR_PULL_MAX_BYTES,
            CHAT_CURSOR_PULL_MAX_ROWS,
        )
    except ImportError:
        # Conservative fallbacks when core.constants isn't importable.
        CHAT_CURSOR_PULL_MAX_ROWS = 500
        CHAT_CURSOR_PULL_MAX_BYTES = 2 * 1024 * 1024

    # Sanitize the row budget: caller value, clamped to [1, MAX_ROWS].
    try:
        lim = int(limit) if limit is not None else CHAT_CURSOR_PULL_MAX_ROWS
    except (TypeError, ValueError):
        lim = CHAT_CURSOR_PULL_MAX_ROWS
    row_cap = min(max(1, lim), CHAT_CURSOR_PULL_MAX_ROWS)

    # Sanitize the cursor: non-numeric or negative → start from 0.
    try:
        since = int(since_id or 0)
    except (TypeError, ValueError):
        since = 0
    if since < 0:
        since = 0

    if not user_id:
        return []

    try:
        get_db, ConversationEntry = _models()
    except ImportError:
        return []

    db = None
    try:
        db = get_db()
        q = db.query(ConversationEntry).filter(
            ConversationEntry.user_id == str(user_id),
            ConversationEntry.id > since,
        )
        if channel_type:
            q = q.filter(ConversationEntry.channel_type == channel_type)
        if agent_id:
            q = q.filter(ConversationEntry.agent_id == agent_id)
        q = q.order_by(ConversationEntry.id.asc()).limit(row_cap)

        out: list[dict] = []
        running = 0  # JSON-size running total for the byte budget
        for row in q.all():
            d = row.to_dict()
            # Back-fill msg_id for legacy rows written before v38 so
            # clients can dedup safely. Uses ``legacy-<seq>`` — never
            # collides with a real 16-char hex msg_id. Not written
            # back to DB — pulled-dict-only.
            if not d.get('msg_id'):
                d['msg_id'] = f"legacy-{d['id']}"
            size = _payload_bytes(d)
            if out and running + size > CHAT_CURSOR_PULL_MAX_BYTES:
                break
            out.append(d)
            running += size
        return out
    except Exception as e:  # noqa: BLE001
        logger.debug("chat_messages.pull_since failed: %s", e)
        return []
    finally:
        if db is not None:
            try:
                db.close()
            except Exception:  # noqa: BLE001
                pass
def publish_new(row_dict: dict) -> None:
    """Publish the chat.new event for a freshly-persisted row.

    Delivery routes through ``core.peer_link.message_bus``: LOCAL
    in-memory subscribers are handled synchronously (microseconds),
    while PeerLink + Crossbar fan-out runs asynchronously on the bus's
    own executor — so this is safe to call on the hot path.  We import
    the bus module rather than ``hart_intelligence_entry`` because the
    latter drags in a heavy langchain/torch chain, inappropriate for a
    synchronous helper and fatal to memory-constrained unit tests.
    Best-effort: never raises.
    """
    # Guard: nothing to publish without a row and a user to address.
    if not row_dict or not row_dict.get('user_id'):
        return
    try:
        from core.constants import CHAT_TOPIC_NEW
    except ImportError:
        return
    topic = f"{CHAT_TOPIC_NEW}.{row_dict['user_id']}"
    try:
        from core.peer_link.message_bus import get_message_bus
    except ImportError:
        logger.debug("chat_messages.publish_new: message_bus unavailable")
        return
    try:
        get_message_bus().publish(topic, row_dict)
    except Exception as e:  # noqa: BLE001
        logger.debug("chat_messages.publish_new failed for user %s: %s",
                     row_dict.get('user_id'), e)
def persist_and_publish_async(
    user_id: str,
    role: str,
    content: str,
    **kwargs: Any,
) -> None:
    """Persist one chat turn and publish it, entirely off the hot path.

    The persist+publish pair is handed to the bounded module executor,
    so the caller pays neither DB nor WAMP latency.  With max_workers=2
    a saturated executor means two earlier persists are already stuck —
    itself a symptom of DB trouble — so we deliberately let that
    backpressure surface at ``submit`` rather than queue unbounded.
    If the executor was shut down (interpreter exit), the work runs
    inline as a best-effort fallback and swallows any error.
    """
    def _persist_then_publish() -> None:
        row = persist(user_id, role, content, **kwargs)
        if row:
            publish_new(row)

    try:
        _EXECUTOR.submit(_persist_then_publish)
    except RuntimeError as e:
        # Executor already shut down — degrade to a synchronous run.
        logger.debug("chat_messages.persist_and_publish_async: executor "
                     "unavailable (%s); running inline", e)
        try:
            _persist_then_publish()
        except Exception:  # noqa: BLE001
            pass
def persist_external_room_event(
    user_id: str,
    platform: str,
    room_id: str,
    sender_id: str,
    text: str,
    *,
    kind: str = 'external_room_msg',
    timestamp: float | None = None,
    role: str | None = None,
    msg_id: str | None = None,
    lang: str | None = None,
    extra: dict | None = None,
) -> None:
    """Canonical SINGLE-WRITER for cross-channel external-room events
    (UNIF-G3).

    Channel adapters (Discord/Slack channel msgs, Matrix room posts,
    Telegram super-groups, WhatsApp groups) and the agent_voice_bridge
    worker MUST route inbound external-room messages through this one
    helper, so the mapping onto the existing ``ConversationEntry``
    columns lives in exactly one place — a new adapter wires up ingest
    by importing this helper, with no per-adapter copy of the logic:

      * ``channel_type`` ← ``f"{platform}:{room_id}"`` (composite — both
        platform AND room are addressable in the recall surface)
      * ``request_id``   ← ``room_id`` (thread-style grouping of all
        events from the same room)
      * ``device_id``    ← ``f"adapter:{platform}"`` (marks the event as
        adapter-sourced, not a local device)
      * ``role``         ← caller override, else 'assistant' for sender
        ids prefixed ``agent:`` and 'user' otherwise
      * ``content``      ← the raw text of the external event
      * ``attachments``  ← one provenance dict
        ``{kind, platform, room_id, author[, t][, extra keys]}``

    Voice transcript segments use ``kind='transcript_segment'`` with
    ``extra={'t0', 't1', 'speaker'}`` — agent_voice_bridge pipes
    whisper_tool's streaming WebSocket output through here.

    Per ``feedback_test_what_we_ship.md`` — unit tests live in
    ``tests/unit/test_chat_messages_external.py``.

    Best-effort fire-and-forget: never raises out of the hot path.
    """
    # Guard: all four of user/platform/room/text are required.
    if not (user_id and platform and room_id and text):
        logger.debug(
            "chat_messages.persist_external_room_event: skipping "
            "(missing required field): platform=%r room=%r sender=%r",
            platform, room_id, sender_id)
        return

    if role is None:
        # Heuristic: sender ids of the form "agent:..." are the agent
        # itself → 'assistant'; everything else defaults to 'user'.
        sender = str(sender_id or '')
        role = 'assistant' if sender.startswith('agent:') else 'user'

    provenance = {
        'kind': kind,
        'platform': platform,
        'room_id': room_id,
        'author': sender_id,
    }
    if timestamp is not None:
        provenance['t'] = timestamp
    # Merge caller-supplied per-kind metadata (e.g. transcript segment
    # t0/t1/speaker); canonical keys always win over adapter extras.
    for key, value in (extra or {}).items():
        provenance.setdefault(key, value)

    persist_and_publish_async(
        user_id,
        role,
        text,
        msg_id=msg_id,
        request_id=room_id,
        device_id=f'adapter:{platform}',
        lang=lang,
        attachments=[provenance],
        channel_type=f'{platform}:{room_id}',
    )