Coverage for integrations / social / agent_voice_bridge.py: 54.4%
182 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
HevolveSocial — AgentVoiceBridge.  Phase 7d.B.

Plan reference: sunny-gliding-eich.md, Part E.12.

WHY this exists:
    Agents are first-class call participants per Plan A.3 #4 / B.4.
    But agents are LLMs + TTS pipelines, not WebRTC clients — they
    can't hold a media socket directly.  AgentVoiceBridge runs
    server-side (on the user's local node for flat / regional, on a
    tenant-isolated worker pod for central) and:

      - Holds the LiveKit publisher track on behalf of the agent.
      - Subscribes to other participants' audio + runs Whisper STT.
      - Feeds transcripts through the SAME `agentic_router.dispatch_to_agent`
        path used for post / comment / message mentions.  GuardrailEnforcer
        (before_dispatch + after_response) gates every agent action — the
        agent never has a privileged path.
      - TTS-ifies the agent's reply via PocketTTS and publishes the audio
        frames into the LiveKit room.

This module ships the SCAFFOLDING + the worker contract.  The actual
audio frame plumbing depends on the LiveKit Python SDK
(`livekit-api`, `livekit-rtc`) and the existing PocketTTS + Whisper
modules.  Each integration point is marked with a comment + a stub
return so the module is importable + testable without those packages
installed.

Lifecycle:
    AgentVoiceBridge.attach_agent(call_id, agent_id, owner_id, scope)
        → spawns one bridge worker per (call, agent) pair
        → registers a CallParticipant row with device_kind='agent_bridge'
        → returns the participant dict

    AgentVoiceBridge.detach_agent(call_id, agent_id)
        → flips the participant.left_at = now
        → signals the worker to stop on its next tick

    AgentVoiceBridge.list_active(call_id) → list of bridge workers

The worker loop runs in a daemon thread (not asyncio) to match
HARTOS's existing `agentic_router.dispatch_to_agent` daemon-thread
pattern.  Idempotent attach: re-attaching an already-active agent
returns the existing worker.

Transport: pure server-side.  The LiveKit room is the media plane
(SFU); PeerLink DISPATCH carries call signaling (mute, hangup);
this bridge bridges the gap.  No new transport invented.
49"""
51from __future__ import annotations
53import logging
54import threading
55import time
56from collections import deque
57from typing import Any, Deque, Dict, List, Optional
59logger = logging.getLogger('hevolve_social')
# Optional dependency: the LiveKit RTC SDK may be missing on flat /
# Nunba bundled deploys.  Any import failure leaves ``livekit_rtc`` as
# None and the availability flag False, so importing this module is
# always safe and the bridge degrades to a logging stub.
try:
    from livekit import rtc as livekit_rtc  # type: ignore
except Exception:
    livekit_rtc = None
_HAS_LIVEKIT_RTC = livekit_rtc is not None


# Registry of bridge workers, keyed by (call_id, agent_id).  It lives in
# module scope, so it is process-local and empties on restart.  The
# workers run as daemon threads and therefore never block process exit.
_ACTIVE_WORKERS: Dict[tuple, 'AgentBridgeWorker'] = {}
_WORKERS_LOCK = threading.Lock()


# ── TTS outbox: agent reply text → audio publisher ──────────────────
#
# Mirror image of whisper_tool's STT-segment queue.  For
# source_kind='call', agentic_router._post_agent_reply pushes the
# agent's reply text here; the audio publisher (the bridge worker's TTS
# half OR an external adapter — LiveKit-rtc producer / Discord voice
# client) drains it, synthesizes via PocketTTS, and publishes audio
# frames into the room.
#
# This is the single canonical home for outbound agent-text-on-calls:
# every producer pushes here and every consumer reads here — no
# parallel queues elsewhere, same as the STT side in whisper_tool.
#
# Each (call, agent) queue is bounded so a stalled audio adapter cannot
# grow it without limit; past the cap the oldest chunk is dropped with
# a WARN.
_TTS_OUTBOX: Dict[tuple, deque] = {}
_TTS_LOCK = threading.Lock()
_TTS_OUTBOX_CAP = 64  # chunks per (call, agent)


# Seconds between worker-loop ticks.  250ms trades STT latency against
# CPU cost; tests lower it to 50ms by overriding `_WORKER_TICK_S` on
# the module.
_WORKER_TICK_S = 0.25
def enqueue_tts_text(call_id: str, agent_id: str, text: str) -> bool:
    """Append agent reply text to the (call, agent) TTS outbox.

    Producer side: intended to be called by
    ``agentic_router._post_agent_reply`` for ``source_kind='call'`` with
    the LLM reply text, after ``GuardrailEnforcer.after_response`` has
    approved it.

    The text is stripped and stored together with an ``enqueued_at``
    wall-clock timestamp.  If the per-key queue grows past
    ``_TTS_OUTBOX_CAP`` the oldest chunks are evicted and a warning is
    logged for each one.

    Args:
        call_id: Call the reply belongs to.
        agent_id: Agent that produced the reply.
        text: Reply text to speak.

    Returns:
        True iff the text was enqueued.  False for a missing
        ``call_id`` / ``agent_id``, for empty or whitespace-only text,
        and for non-string ``text``.
    """
    if not call_id or not agent_id:
        return False
    # Reject non-string payloads up front: calling .strip() on them
    # would raise, and this helper is meant to be best-effort.
    if not isinstance(text, str):
        return False
    cleaned = text.strip()
    if not cleaned:
        return False

    key = (call_id, agent_id)
    evicted = []
    with _TTS_LOCK:
        q = _TTS_OUTBOX.setdefault(key, deque())
        q.append({'text': cleaned, 'enqueued_at': time.time()})
        while len(q) > _TTS_OUTBOX_CAP:
            evicted.append(q.popleft())

    # Log after releasing the lock so slow log handlers never stall
    # other producers / consumers of the outbox.
    for chunk in evicted:
        logger.warning(
            "AgentVoiceBridge.enqueue_tts_text: cap %d exceeded "
            "for call=%s agent=%s; evicted text=%r",
            _TTS_OUTBOX_CAP, call_id, agent_id,
            chunk['text'][:80])
    return True
def dequeue_tts_text(call_id: str, agent_id: str,
                     limit: int = 4) -> List[Dict[str, Any]]:
    """Pop up to `limit` pending reply chunks for (call, agent).

    Destructive read: returned chunks are removed from the outbox, and
    the per-key queue itself is dropped once it becomes empty.  Returns
    an empty list when either key part is falsy or nothing is pending.
    The bridge worker calls this once per tick.
    """
    if not (call_id and agent_id):
        return []

    key = (call_id, agent_id)
    taken: List[Dict[str, Any]] = []
    with _TTS_LOCK:
        pending = _TTS_OUTBOX.get(key)
        if pending:
            while pending and len(taken) < limit:
                taken.append(pending.popleft())
            if not pending:
                # Fully drained — forget the key so the dict stays small.
                del _TTS_OUTBOX[key]
    return taken
def tts_outbox_depth(call_id: str, agent_id: str) -> int:
    """Return how many reply chunks are pending for (call, agent).

    Intended for /health + tests; an unknown key reports 0.
    """
    with _TTS_LOCK:
        return len(_TTS_OUTBOX.get((call_id, agent_id)) or ())
class AgentBridgeError(Exception):
    """Raised when a bridge attach request cannot be honoured.

    Callers are expected to translate this into a 4xx response.
    """
class AgentBridgeWorker:
    """Bridge worker for a single (call, agent) pair.

    Runs a daemon thread that, once per ``_WORKER_TICK_S`` seconds:

      1. drains finalized STT segments for the call, persists them,
         refreshes the meet_copilot Liquid UI card and dispatches the
         text to the agent; then
      2. drains the agent's pending TTS reply text and hands it to
         ``_publish_audio_for``.

    Scaffolded — real audio frame I/O is not implemented in this class
    (``_publish_audio_for`` only logs).  The contract here lets the rest
    of the system call attach_agent / detach_agent today.
    """

    def __init__(self, call_id: str, agent_id: str, owner_id: str,
                 scope: Dict[str, Any]):
        """Store identifiers and initialise per-call rolling state.

        ``scope`` is shallow-copied (``None`` becomes ``{}``) so later
        mutation of the caller's dict does not affect the worker.
        """
        self.call_id = call_id
        self.agent_id = agent_id
        self.owner_id = owner_id
        self.scope = dict(scope or {})
        self._stop = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self.started_at = time.time()
        # Last STT segment id drained from
        # ``whisper_tool.dequeue_segments``.  Passed back as the
        # ``since`` watermark on the next tick to avoid re-processing a
        # segment if a producer re-emits it.
        self._last_stt_segment_id: Optional[int] = None
        # Rolling state for the Liquid UI meet_copilot card (UNIF-G5).
        # The transcript is capped at 10 lines to keep every emit
        # lightweight.  Nothing in this class appends to decisions /
        # action items yet; they are kept so the card payload is
        # forward-compatible.
        self._transcript_lines: Deque[Dict[str, Any]] = deque(maxlen=10)
        self._decisions: List[str] = []
        self._action_items: List[str] = []

    def start(self) -> None:
        """Start the worker thread; no-op if it is already running."""
        if self._thread and self._thread.is_alive():
            return
        self._thread = threading.Thread(
            target=self._loop,
            # str() first: ids are stringified everywhere else in this
            # class, so a non-string id must not break the slice here.
            name=f'agent_voice_bridge_{str(self.agent_id)[:8]}',
            daemon=True,
        )
        self._thread.start()

    def stop(self) -> None:
        """Signal the loop to exit; it stops at its next stop check."""
        self._stop.set()

    def is_alive(self) -> bool:
        """Return True iff the worker thread exists and is running."""
        return bool(self._thread and self._thread.is_alive())

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict snapshot of this worker's identity/state."""
        return {
            'call_id': self.call_id,
            'agent_id': self.agent_id,
            'owner_id': self.owner_id,
            'scope': self.scope,
            'started_at': self.started_at,
            'alive': self.is_alive(),
        }

    # ── Worker loop ─────────────────────────────────────────────

    def _loop(self) -> None:
        """Thread body: tick until ``stop()`` is called.

        Emits an initial meet_copilot card (state='live'), then calls
        ``_tick`` every ``_WORKER_TICK_S`` seconds.  A failing tick is
        logged and the loop continues.  On exit the card is re-emitted
        with state='ended'.
        """
        logger.info("AgentBridgeWorker starting: call=%s agent=%s",
                    self.call_id, self.agent_id)
        # Emit the initial card so the user sees the presence indicator
        # immediately on attach (UNIF-G5) — empty transcript_lines is
        # fine; subsequent ticks fill it.
        self._emit_meet_copilot(state='live')
        try:
            while not self._stop.is_set():
                try:
                    self._tick()
                except Exception as e:
                    logger.warning(
                        "AgentBridgeWorker tick error (call=%s, "
                        "agent=%s): %s — continuing",
                        self.call_id, self.agent_id, e)
                # Event.wait doubles as an interruptible sleep: stop()
                # wakes the loop immediately.
                self._stop.wait(_WORKER_TICK_S)
        finally:
            logger.info(
                "AgentBridgeWorker stopped: call=%s agent=%s",
                self.call_id, self.agent_id)
            # Flip state to 'ended' so the renderer can dismiss /
            # collapse the card.  Same emit path, full-state overwrite.
            try:
                self._emit_meet_copilot(state='ended')
            except Exception as e:
                logger.debug(
                    "AgentBridgeWorker: final meet_copilot emit failed "
                    "(call=%s, agent=%s): %s",
                    self.call_id, self.agent_id, e)

    def _tick(self) -> None:
        """Run one worker iteration: STT half, then TTS half.

        The TTS outbox is drained on EVERY tick, including ticks where
        the STT integrations are missing, the STT queue is empty, or the
        STT half raises.  The agent's reply arrives asynchronously, so
        it must not have to wait for the next inbound segment before
        being published.  Any exception from the STT half still
        propagates to ``_loop`` after the TTS half has run.
        """
        try:
            self._drain_stt_segments()
        finally:
            self._drain_tts_outbox()

    def _drain_stt_segments(self) -> None:
        """Half A: drain finalized STT segments and process each one.

        ``whisper_tool.dequeue_segments`` and
        ``chat_messages.persist_external_room_event`` are imported
        lazily so this module has no hard dependency on them; if the
        import fails the drain is skipped for this tick.
        """
        try:
            from integrations.service_tools.whisper_tool import (
                dequeue_segments,
            )
            from integrations.social.chat_messages import (
                persist_external_room_event,
            )
        except Exception as e:  # pragma: no cover — import-only failure
            logger.debug(
                "AgentBridgeWorker._tick: lazy imports unavailable "
                "(%s) — skipping drain", e)
            return

        # Drain everything newer than our watermark.
        try:
            segments = dequeue_segments(
                call_id=self.call_id,
                since=self._last_stt_segment_id,
            )
        except Exception as e:
            logger.warning(
                "AgentBridgeWorker._tick: dequeue_segments failed "
                "(call=%s): %s", self.call_id, e)
            return

        for seg in segments or ():
            seg_id = seg.get('segment_id')
            self._process_segment(seg, seg_id, persist_external_room_event)
            # Advance the watermark for every handled segment, whether
            # it was skipped, persisted, or dispatched.
            if seg_id is not None:
                self._last_stt_segment_id = seg_id

    def _process_segment(self, seg: Dict[str, Any], seg_id: Any,
                         persist_external_room_event: Any) -> None:
        """Persist, display and (unless self-authored) dispatch one segment.

        Interim (``is_final`` false) and empty-text segments are
        ignored.  Persist and dispatch failures are logged and do not
        stop processing.
        """
        # Defensive: producers should only enqueue final segments.
        if not seg.get('is_final', True):
            return

        author_id = seg.get('author_id')
        text = (seg.get('text') or '').strip()
        if not text:
            return

        # Persist for cross-channel recall (UNIF-G3).  Voice transcripts
        # carry kind='transcript_segment' + extra={t0, t1, speaker}.
        try:
            persist_external_room_event(
                user_id=str(self.owner_id),
                platform=str(self.scope.get('platform') or 'livekit'),
                room_id=str(self.call_id),
                sender_id=str(author_id or 'unknown'),
                text=text,
                kind='transcript_segment',
                timestamp=seg.get('t1') or seg.get('t0'),
                lang=seg.get('lang'),
                extra={
                    't0': seg.get('t0'),
                    't1': seg.get('t1'),
                    'speaker': seg.get('speaker'),
                },
            )
        except Exception as e:
            logger.warning(
                "AgentBridgeWorker._tick: persist failed "
                "(call=%s, seg=%s): %s",
                self.call_id, seg_id, e)

        # Append to the rolling transcript and push the full card state
        # (UNIF-G5).  This happens before the self-authored check, so
        # the agent's own speech also shows up in the transcript.
        self._transcript_lines.append({
            'speaker': seg.get('speaker') or author_id,
            'text': text,
            't0': seg.get('t0'),
            't1': seg.get('t1'),
        })
        self._emit_meet_copilot(state='live')

        # Skip self-authored — the agent shouldn't dispatch on its own
        # previous TTS-back-into-the-room.
        if author_id and str(author_id) == str(self.agent_id):
            return

        # Dispatch through the canonical agent router so call-driven
        # prompts take the same path as chat-driven ones.
        try:
            from integrations.agentic_router import dispatch_to_agent
            dispatch_to_agent(
                agent_id=self.agent_id,
                prompt=text,
                context={
                    'source_kind': 'call',
                    'source_id': self.call_id,
                    'author_id': author_id,
                    'owner_id': self.owner_id,
                    'tenant_id': self.scope.get('tenant_id'),
                    'platform': self.scope.get('platform') or 'livekit',
                    'lang': seg.get('lang'),
                },
                synchronous=False,
            )
        except Exception as e:
            logger.warning(
                "AgentBridgeWorker._tick: dispatch_to_agent failed "
                "(call=%s, agent=%s, seg=%s): %s",
                self.call_id, self.agent_id, seg_id, e)

    def _drain_tts_outbox(self) -> None:
        """Half B: pop up to two queued reply chunks and publish them.

        Failures while dequeuing or publishing are logged, never raised.
        """
        try:
            replies = dequeue_tts_text(self.call_id, self.agent_id,
                                       limit=2)
        except Exception as e:
            replies = []
            logger.warning(
                "AgentBridgeWorker._tick: dequeue_tts_text failed "
                "(call=%s, agent=%s): %s",
                self.call_id, self.agent_id, e)
        for r in replies:
            text = r.get('text', '')
            if not text:
                continue
            try:
                self._publish_audio_for(text)
            except Exception as e:
                logger.warning(
                    "AgentBridgeWorker._tick: _publish_audio_for "
                    "failed (call=%s, agent=%s): %s",
                    self.call_id, self.agent_id, e)

    def _publish_audio_for(self, text: str) -> None:
        """Publish `text` as agent speech — currently a logging stub.

        Synthesis via PocketTTS and frame publishing into the LiveKit
        room are not implemented here.  Today both branches only log the
        first 120 characters of the reply at INFO so call audit trails
        record the agent's contribution; the message differs depending
        on whether livekit-rtc was importable.
        """
        if not text:
            return
        if not _HAS_LIVEKIT_RTC:
            logger.info(
                "AgentBridgeWorker._publish_audio_for: livekit-rtc "
                "absent; reply queued but not synthesized — "
                "call=%s agent=%s text=%r",
                self.call_id, self.agent_id, text[:120])
            return
        # SDK present — synthesize + publish is still a stub, to be
        # filled in alongside the livekit-rtc room handle integration.
        logger.info(
            "AgentBridgeWorker._publish_audio_for: stub synthesize+"
            "publish — call=%s agent=%s text=%r",
            self.call_id, self.agent_id, text[:120])

    def _emit_meet_copilot(self, state: str = 'live') -> None:
        """Push the full meet_copilot card state to the Liquid UI service.

        Looks up ``LiquidUIService`` through ``ServiceRegistry`` and
        calls ``agent_ui_update`` with the complete card payload
        (transcript lines, decisions, action items, participants), so
        every emit overwrites the previous state (UNIF-G5).

        Best-effort: if the registry cannot be imported or queried, or
        the service is not registered, the emit is skipped; an
        ``agent_ui_update`` failure is logged as a warning.

        Args:
            state: Card state to report, e.g. 'live' or 'ended'.
        """
        try:
            from core.platform.service_registry import ServiceRegistry
            svc = ServiceRegistry.get('LiquidUIService')
        except Exception as e:
            logger.debug(
                "AgentBridgeWorker._emit_meet_copilot: ServiceRegistry "
                "unavailable (%s) — skipping emit", e)
            return
        if svc is None:
            return
        try:
            svc.agent_ui_update(
                str(self.agent_id),
                {
                    'type': 'meet_copilot',
                    'call_id': str(self.call_id),
                    'platform': str(
                        self.scope.get('platform') or 'livekit'),
                    'room_id': str(
                        self.scope.get('room_id') or self.call_id),
                    'state': state,
                    'transcript_lines': list(self._transcript_lines),
                    'decisions': list(self._decisions),
                    'action_items': list(self._action_items),
                    'participants': list(
                        self.scope.get('participants') or []),
                    'agent_role': str(
                        self.scope.get('role') or 'co_pilot'),
                },
            )
        except Exception as e:
            logger.warning(
                "AgentBridgeWorker._emit_meet_copilot: agent_ui_update "
                "failed (call=%s, agent=%s): %s",
                self.call_id, self.agent_id, e)
class AgentVoiceBridge:
    """Static facade over the worker registry.

    This is the public surface that api_calls.add_agent_to_call invokes.
    """

    @staticmethod
    def attach_agent(db, call_id: str, agent_id: str, owner_id: str,
                     scope: Dict[str, Any]) -> Dict[str, Any]:
        """Start (or reuse) the bridge worker for (call, agent).

        Idempotent per (call, agent): when a live worker is already
        registered its snapshot is returned and nothing new is spawned.
        A registered worker whose thread is no longer alive is replaced.

        The caller (CallService.attach_agent) must verify the
        AgentJoinGrant + scope.can_voice BEFORE calling; the scope dict
        is trusted as given.  ``db`` is accepted but not used here.

        Raises:
            AgentBridgeError: if call_id, agent_id or owner_id is falsy.
        """
        if not (call_id and agent_id and owner_id):
            raise AgentBridgeError(
                "call_id, agent_id, owner_id required")

        pair = (call_id, agent_id)
        with _WORKERS_LOCK:
            current = _ACTIVE_WORKERS.get(pair)
            if current and current.is_alive():
                return current.to_dict()
            fresh = AgentBridgeWorker(
                call_id=call_id,
                agent_id=agent_id,
                owner_id=owner_id,
                scope=scope,
            )
            fresh.start()
            _ACTIVE_WORKERS[pair] = fresh
            return fresh.to_dict()

    @staticmethod
    def detach_agent(call_id: str, agent_id: str) -> bool:
        """Unregister and stop the worker for (call, agent).

        Returns True iff a worker was registered and has been signalled
        to stop; with no registered worker this is a benign no-op that
        returns False.  Pending TTS chunks for the pair are discarded
        either way.
        """
        pair = (call_id, agent_id)
        with _WORKERS_LOCK:
            removed = _ACTIVE_WORKERS.pop(pair, None)
        # Discard queued reply text so a later attach with the same key
        # doesn't replay stale audio.
        with _TTS_LOCK:
            _TTS_OUTBOX.pop(pair, None)
        if removed is not None:
            removed.stop()
            return True
        return False

    @staticmethod
    def list_active(call_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return snapshots of registered workers.

        With ``call_id`` given, only workers of that call are included.
        Used by ops dashboards + integration tests.
        """
        with _WORKERS_LOCK:
            snapshot = tuple(_ACTIVE_WORKERS.values())
        return [
            w.to_dict()
            for w in snapshot
            if call_id is None or w.call_id == call_id
        ]

    @staticmethod
    def shutdown_all() -> int:
        """Stop every registered worker and clear all bridge state.

        Process-shutdown hook + test cleanup.  Empties both the worker
        registry and the TTS outbox, and returns how many workers were
        signalled to stop.
        """
        with _WORKERS_LOCK:
            doomed = list(_ACTIVE_WORKERS.values())
            _ACTIVE_WORKERS.clear()
        with _TTS_LOCK:
            _TTS_OUTBOX.clear()
        stopped = 0
        for w in doomed:
            w.stop()
            stopped += 1
        return stopped
# Names exported by ``from <this module> import *``.
__all__ = [
    'AgentVoiceBridge',
    'AgentBridgeError',
    'AgentBridgeWorker',
    'enqueue_tts_text',
    'dequeue_tts_text',
    'tts_outbox_depth',
]