Coverage for integrations / social / chat_messages.py: 24.8%

133 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1"""Chat-sync service — canonical writer + cursor-pull reader for 

2cross-device chat mirroring (U1-U9 workstream, task #389). 

3 

4Design principles (CLAUDE.md Gates): 

5 * Gate 2 (DRY): extends the existing ``ConversationEntry`` table — 

6 does NOT introduce a parallel ChatMessage model. Every chat turn 

7 (/chat hot path, channel adapters, LangChain prep_outputs) already 

8 lands here; this module gives it a stable API + cursor-pull + 

9 WAMP publish in one place. 

10 * Gate 3 (SRP): 

11 - ``persist(...)`` writes one row (per role) and returns the row. 

12 - ``pull_since(...)`` reads rows since a cursor, capped by row- + 

13 byte-budget from ``core.constants.CHAT_CURSOR_PULL_MAX_*``. 

14 - ``publish_new(...)`` fires the chat.new WAMP event (synchronous 

15 call — ``publish_async`` already routes its HTTP leg through 

16 its own executor, so the MessageBus hand-off is O(1)). 

17 - ``persist_and_publish_async(...)`` submits both to a module 

18 executor so callers on the chat hot path incur no latency. 

19 NO I/O mixing; the HTTP handler in Nunba's main.py stays pure glue. 

20 * Gate 4 (no parallel paths): ONE persistence helper — this one. 

21 ``world_model_bridge._persist_to_conversation_entry`` is preserved 

22 for now (other call sites), but new writes go through here. 

23 * Gate 8 (security): ``user_id`` is attacker-controllable via body; 

24 the HTTP gate (Nunba main.py) confirms JWT→user_id BEFORE calling 

25 ``persist`` / ``pull_since``. This module trusts its inputs. 

26 Dedup-on-msg_id verifies user_id matches before returning — a 

27 cross-user msg_id collision (astronomically unlikely) would NOT 

28 leak the other user's row. 

29 

30Public API 

31---------- 

32 persist(user_id, role, content, *, msg_id=None, agent_id=None, 

33 prompt_id=None, request_id=None, device_id=None, 

34 lang=None, attachments=None, channel_type='chat') -> dict|None 

35 Insert one ConversationEntry; return the ``to_dict()`` of the 

36 inserted row or None on failure. 

37 

38 pull_since(user_id, since_id=0, *, limit=None, channel_type=None, 

39 agent_id=None) -> list[dict] 

40 Cursor pull. 

41 

42 publish_new(row_dict) -> None 

43 Synchronous (but internally fast) WAMP publish. 

44 

45 persist_and_publish_async(...) -> None 

46 Fire-and-forget. Submits persist+publish to a module executor. 

47""" 

48 

49from __future__ import annotations 

50 

51import json 

52import logging 

53import os 

54import threading 

55import time 

56from concurrent.futures import ThreadPoolExecutor 

57from typing import Any 

58 

59logger = logging.getLogger(__name__) 

60 

# Lazy module cache — keep this file import-cheap so tests that only
# exercise ``_generate_msg_id`` don't need SQLAlchemy on the path.
# Keys: 'get_db' and 'ConversationEntry' (populated together in _models()).
_MODELS_CACHE: dict[str, Any] = {}
# Guards first population of _MODELS_CACHE; _models() double-checks under it.
_CACHE_LOCK = threading.Lock()

# Module-level executor for persist-and-publish off the hot path. Bound
# small (2 workers) — we are not CPU-bound here; the bottleneck is a DB
# round-trip. max_workers>1 lets a slow write (network hiccup to a
# regional MySQL) not block the next turn's persist. Thread-name prefix
# so it shows up distinctly in ``core.diag`` thread-dump output.
_EXECUTOR = ThreadPoolExecutor(max_workers=2, thread_name_prefix='chat-sync')

72 

73 

def _models():
    """Return ``(get_db, ConversationEntry)``, importing and caching lazily.

    The first caller pays the SQLAlchemy import cost; every later call is
    a plain dict lookup. Raises ImportError when the HARTOS social models
    cannot be loaded — callers are expected to degrade (skip the write),
    NOT crash.
    """
    def _cached():
        return _MODELS_CACHE['get_db'], _MODELS_CACHE['ConversationEntry']

    # Fast path: dict reads are atomic in CPython, no lock needed here.
    if 'get_db' in _MODELS_CACHE:
        return _cached()
    with _CACHE_LOCK:
        # Double-checked: another thread may have populated the cache
        # between our unlocked check and acquiring the lock.
        if 'get_db' not in _MODELS_CACHE:
            from integrations.social.models import ConversationEntry, get_db
            _MODELS_CACHE.update(
                get_db=get_db, ConversationEntry=ConversationEntry)
        return _cached()

89 

90 

91def _generate_msg_id() -> str: 

92 """ULID-like 16-char hex id. 

93 

94 48 bits of milliseconds (12 hex chars) + 16 bits random (4 hex). 

95 Sortable by embedding time first — not cryptographically secure, 

96 collision-resistant enough for per-user dedup. For busy multi-user 

97 servers, clients should supply their own msg_id; this is a fallback. 

98 """ 

99 ms = int(time.time() * 1000) & 0xFFFFFFFFFFFF # 48-bit cap 

100 rnd = int.from_bytes(os.urandom(2), 'big') 

101 return f"{ms:012x}{rnd:04x}" 

102 

103 

104def _payload_bytes(obj: Any) -> int: 

105 """Cheap JSON-encoded size estimate for the byte cap in pull_since.""" 

106 try: 

107 return len(json.dumps(obj, separators=(',', ':'), default=str)) 

108 except Exception: # noqa: BLE001 

109 return 0 

110 

111 

def persist(
    user_id: str,
    role: str,
    content: str,
    *,
    msg_id: str | None = None,
    agent_id: str | None = None,
    prompt_id: str | None = None,
    request_id: str | None = None,
    device_id: str | None = None,
    lang: str | None = None,
    attachments: list[dict] | None = None,
    channel_type: str = 'chat',
) -> dict | None:
    """Insert one ConversationEntry. Returns the row dict or None.

    Returns None (never raises) on: missing required field, invalid
    role, social models unavailable, cross-user msg_id collision, or
    any DB error — callers on the hot path must not crash.

    Deduplication: if ``msg_id`` already exists for THIS user, the
    existing row is returned without a second insert. Across users a
    msg_id collision (astronomically unlikely with 64 bits of entropy)
    is refused — we do not return another user's data.

    Identity provenance (per
    ``memory/feedback_unification_reuse_contract.md`` F1):
        For agent-authored entries (``role='assistant'``), populate:
          * ``agent_id`` → the agent's prompt_id (top-level column,
            already supported)
          * ``attachments`` → include
            ``{'kind': 'agent_provenance',
               'agent_id': '<prompt_id>',
               'bound_to': '<owner_user_id>'}``
        This lets receiving platforms render trust badges
        ("verified by Alice") and audit logs attribute agent actions
        to the human who owns the agent — without any new column,
        new transport, or new persist call. Single canonical writer
        contract: every agent-authored persist goes through here.
    """
    # Required-field guard: empty user_id/role/content is a silent no-op.
    if not user_id or not role or not content:
        return None
    if role not in ('user', 'assistant', 'system'):
        logger.debug("chat_messages.persist: invalid role %r", role)
        return None

    # Server-side fallback id when the client did not supply one.
    msg_id = msg_id or _generate_msg_id()
    try:
        get_db, ConversationEntry = _models()
    except ImportError:
        # Degrade: environment without the social models just skips the write.
        logger.debug("chat_messages.persist: social models unavailable")
        return None

    db = None
    try:
        db = get_db()
        # Dedup lookup is by msg_id ALONE (not user_id) so a cross-user
        # collision is detected and refused below rather than silently
        # inserting a duplicate msg_id.
        existing = db.query(ConversationEntry).filter(
            ConversationEntry.msg_id == msg_id
        ).first()
        if existing is not None:
            if str(existing.user_id) != str(user_id):
                # Cross-user msg_id collision: refuse rather than leak
                # the other user's row. Caller retries with a new id.
                logger.warning(
                    "chat_messages.persist: msg_id %r cross-user collision "
                    "(owner=%r, caller=%r); refusing",
                    msg_id, existing.user_id, user_id,
                )
                return None
            # Same user, same msg_id: idempotent — return the existing row.
            return existing.to_dict()

        entry = ConversationEntry(
            user_id=str(user_id),
            channel_type=channel_type,
            role=role,
            # Hard cap on stored content length; longer text is truncated,
            # not rejected.
            content=content[:10000],
            agent_id=agent_id,
            prompt_id=prompt_id,
            msg_id=msg_id,
            request_id=request_id,
            device_id=device_id,
            lang=lang,
            attachments=attachments,
        )
        db.add(entry)
        db.commit()
        # refresh() loads DB-assigned fields (autoincrement id, server
        # defaults) so to_dict() below is complete.
        db.refresh(entry)
        return entry.to_dict()
    except Exception as e:  # noqa: BLE001
        # Best-effort contract: log, roll back, return None — never raise.
        logger.debug("chat_messages.persist failed: %s", e)
        if db is not None:
            try:
                db.rollback()
            except Exception:  # noqa: BLE001
                pass
        return None
    finally:
        # Always release the session, even on the early dedup returns.
        if db is not None:
            try:
                db.close()
            except Exception:  # noqa: BLE001
                pass

210 

211 

def pull_since(
    user_id: str,
    since_id: int = 0,
    *,
    limit: int | None = None,
    channel_type: str | None = None,
    agent_id: str | None = None,
) -> list[dict]:
    """Return ConversationEntry rows with ``id > since_id`` for this user.

    Ordered by id ASC (monotonic cursor). Truncated at the row budget
    AND the byte budget — whichever fires first. Always returns at
    least one row if matches exist and the first row is smaller than
    the byte cap. Empty list on error or nothing-new.
    """
    # Budget constants, with hardcoded fallbacks (as locals) when
    # core.constants is not importable in this environment.
    try:
        from core.constants import (
            CHAT_CURSOR_PULL_MAX_BYTES,
            CHAT_CURSOR_PULL_MAX_ROWS,
        )
    except ImportError:
        CHAT_CURSOR_PULL_MAX_ROWS = 500
        CHAT_CURSOR_PULL_MAX_BYTES = 2 * 1024 * 1024

    # Sanitize caller-supplied limit: non-numeric → default; result is
    # clamped to [1, CHAT_CURSOR_PULL_MAX_ROWS].
    try:
        lim = int(limit) if limit is not None else CHAT_CURSOR_PULL_MAX_ROWS
    except (TypeError, ValueError):
        lim = CHAT_CURSOR_PULL_MAX_ROWS
    row_cap = min(max(1, lim), CHAT_CURSOR_PULL_MAX_ROWS)

    # Sanitize the cursor: non-numeric or negative → 0 (full history).
    try:
        since = int(since_id or 0)
    except (TypeError, ValueError):
        since = 0
    if since < 0:
        since = 0

    if not user_id:
        return []

    try:
        get_db, ConversationEntry = _models()
    except ImportError:
        # Degrade: no social models available → nothing to pull.
        return []

    db = None
    try:
        db = get_db()
        q = db.query(ConversationEntry).filter(
            ConversationEntry.user_id == str(user_id),
            ConversationEntry.id > since,
        )
        # Optional narrowing filters — only applied when truthy.
        if channel_type:
            q = q.filter(ConversationEntry.channel_type == channel_type)
        if agent_id:
            q = q.filter(ConversationEntry.agent_id == agent_id)
        # Row budget is enforced in SQL; the byte budget below is
        # enforced while iterating the (already capped) result.
        q = q.order_by(ConversationEntry.id.asc()).limit(row_cap)

        out: list[dict] = []
        running = 0  # cumulative JSON-encoded bytes of rows accepted so far
        for row in q.all():
            d = row.to_dict()
            # Back-fill msg_id for legacy rows written before v38 so
            # clients can dedup safely. Uses ``legacy-<seq>`` — never
            # collides with a real 16-char hex msg_id. Not written
            # back to DB — pulled-dict-only.
            if not d.get('msg_id'):
                d['msg_id'] = f"legacy-{d['id']}"
            size = _payload_bytes(d)
            # ``out and`` guarantees at least one row is returned even if
            # that single row alone exceeds the byte cap.
            if out and running + size > CHAT_CURSOR_PULL_MAX_BYTES:
                break
            out.append(d)
            running += size
        return out
    except Exception as e:  # noqa: BLE001
        # Best-effort contract: errors degrade to "nothing new".
        logger.debug("chat_messages.pull_since failed: %s", e)
        return []
    finally:
        if db is not None:
            try:
                db.close()
            except Exception:  # noqa: BLE001
                pass

295 

296 

def publish_new(row_dict: dict) -> None:
    """Publish chat.new for a freshly-persisted row.

    Routes through ``core.peer_link.message_bus`` — LOCAL in-memory
    delivery is synchronous (microseconds) while PeerLink + Crossbar
    fan-out runs on the bus's own executor, so this is safe to call on
    the hot path. We import the bus module rather than pulling
    ``hart_intelligence_entry`` because the latter drags in a heavy
    langchain/torch import chain — inappropriate for a synchronous
    helper and fatal to unit tests without GPU-grade memory.
    Best-effort: failures are logged at debug level, never raised.
    """
    uid = (row_dict or {}).get('user_id')
    if not uid:
        return
    try:
        from core.constants import CHAT_TOPIC_NEW
    except ImportError:
        # No topic constant → nothing sensible to publish to.
        return
    try:
        from core.peer_link.message_bus import get_message_bus
    except ImportError:
        logger.debug("chat_messages.publish_new: message_bus unavailable")
        return
    try:
        get_message_bus().publish(f"{CHAT_TOPIC_NEW}.{uid}", row_dict)
    except Exception as e:  # noqa: BLE001
        logger.debug("chat_messages.publish_new failed for user %s: %s",
                     uid, e)

327 

328 

def persist_and_publish_async(
    user_id: str,
    role: str,
    content: str,
    **kwargs: Any,
) -> None:
    """Fire-and-forget persist + publish; safe on the chat hot path.

    Hands the work to the bounded module executor so the caller returns
    with no DB or WAMP latency. On saturation the submit itself can
    block — but with max_workers=2 that only happens when two earlier
    persists are already stuck, which is itself a symptom of DB trouble;
    we deliberately let backpressure surface there rather than queue
    unbounded work.
    """
    def _persist_then_publish() -> None:
        # Publish only when the write actually landed (persist returns
        # None on any failure).
        row = persist(user_id, role, content, **kwargs)
        if row:
            publish_new(row)

    try:
        _EXECUTOR.submit(_persist_then_publish)
    except RuntimeError as e:
        # Executor already shut down (interpreter teardown). Degrade to
        # an inline best-effort run instead of dropping the message.
        logger.debug("chat_messages.persist_and_publish_async: executor "
                     "unavailable (%s); running inline", e)
        try:
            _persist_then_publish()
        except Exception:  # noqa: BLE001
            pass

359 

360 

def persist_external_room_event(
    user_id: str,
    platform: str,
    room_id: str,
    sender_id: str,
    text: str,
    *,
    kind: str = 'external_room_msg',
    timestamp: float | None = None,
    role: str | None = None,
    msg_id: str | None = None,
    lang: str | None = None,
    extra: dict | None = None,
) -> None:
    """Canonical SINGLE-WRITER for cross-channel external-room events
    (UNIF-G3). Channel adapters and the agent_voice_bridge worker MUST
    route inbound external-room messages through here (Discord/Slack
    channel msgs, Matrix room posts, Telegram super-group msgs, WhatsApp
    group msgs, voice transcript segments, ...).

    This thin wrapper exists — rather than adapters calling
    ``persist_and_publish_async`` directly — because it is the ONE place
    encoding the canonical (platform, room_id, sender, ts, kind) shape
    into the existing ``ConversationEntry`` columns. A new adapter wires
    up cross-channel ingest by importing this single helper; no
    per-adapter copy of the field mapping.

    Field mapping into ``ConversationEntry``:
      - ``channel_type``  ← ``f"{platform}:{room_id}"`` (composite — both
        platform AND room addressable in the recall surface)
      - ``request_id``    ← ``room_id`` (thread-style recall grouping)
      - ``device_id``     ← ``f"adapter:{platform}"`` (marks an external
        adapter origin, not a local device)
      - ``role``          ← caller-specified, else auto-derived from the
        sender ('agent:'-prefixed → 'assistant', otherwise 'user')
      - ``content``       ← the raw text of the external event
      - ``attachments``   ← ``[{kind, platform, room_id, author, t}]``
        for full provenance recall

    Voice transcript segments use ``kind='transcript_segment'`` with
    ``extra={'t0', 't1', 'speaker'}`` — the agent_voice_bridge worker
    pipes whisper_tool's streaming WebSocket output through here.

    Per ``feedback_test_what_we_ship.md`` — unit tests live in
    ``tests/unit/test_chat_messages_external.py``.

    Best-effort: never raises out of the hot path.
    """
    if not (user_id and platform and room_id and text):
        logger.debug(
            "chat_messages.persist_external_room_event: skipping "
            "(missing required field): platform=%r room=%r sender=%r",
            platform, room_id, sender_id)
        return

    if role is None:
        # Heuristic: an 'agent:'-prefixed sender is the agent itself →
        # 'assistant'; everything else defaults to 'user'.
        sender = str(sender_id or '')
        role = 'assistant' if sender.startswith('agent:') else 'user'

    provenance = {
        'kind': kind,
        'platform': platform,
        'room_id': room_id,
        'author': sender_id,
    }
    if timestamp is not None:
        provenance['t'] = timestamp
    # Merge caller-supplied per-kind metadata (e.g. transcript segment
    # t0/t1/speaker). setdefault keeps the canonical keys authoritative —
    # adapter-specific values never overwrite them.
    for key, value in (extra or {}).items():
        provenance.setdefault(key, value)

    persist_and_publish_async(
        user_id,
        role,
        text,
        msg_id=msg_id,
        request_id=room_id,
        device_id=f'adapter:{platform}',
        lang=lang,
        attachments=[provenance],
        channel_type=f'{platform}:{room_id}',
    )
452 )