Coverage for integrations / social / livekit_transcript_subscriber.py: 54.3%
186 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2livekit_transcript_subscriber — UNIF-G7 / W1.7 Producer B.
4Subscribes to remote audio tracks in a LiveKit room, resamples each
5participant's PCM frames, and pushes them through the canonical
6streaming-STT WebSocket so Producer C's ``?call_id=&user_id=`` hook
7lands the final transcripts in
8``whisper_tool.enqueue_stt_segment(call_id, ...)``.
Single canonical sink: every audio source in HARTOS — Discord voice
receive (Producer A), LiveKit RTC (this module), and the RN mic stream
(Producer C's direct caller) — funnels into the SAME per-call STT queue
that ``agent_voice_bridge._tick`` drains. No parallel paths.

Lib gate:
    The module is importable even when ``livekit-rtc`` is NOT
    installed. ``HAS_LIVEKIT_RTC`` is False in that case, and (unless a
    ``room_factory`` is injected) ``LiveKitTranscriptSubscriber.start()``
    becomes a synchronous no-op, so today's bridge-worker behavior is
    preserved.

Threading model:
    ``livekit-rtc`` is asyncio-based. We run an asyncio event loop
    in a daemon thread (mirrors the existing ``start_stt_stream_server``
    pattern in ``whisper_tool``). Each subscribed ``RemoteAudioTrack``
    spawns a per-track frame-consumer task that pushes resampled PCM
    through a per-participant sync ``websockets`` client back into the
    local STT WS. No new event loops in the request hot path; ``stop()``
    tears down both the loop thread and the per-participant WS clients.

Resampling:
    LiveKit publishes 48kHz s16le mono per audio frame (default RTC
    config). Some senders may publish at other rates — we use stdlib
    ``audioop.ratecv`` to convert to 16kHz mono if needed (note that
    ``audioop`` is deprecated since Python 3.11 and was removed in 3.13,
    PEP 594). When the frame is already 16kHz mono we forward it as-is
    (zero-copy).
"""
36from __future__ import annotations
38import logging
39import threading
40from typing import Any, Callable, Dict, Optional
logger = logging.getLogger(__name__)

# Optional dependency: any failure while importing livekit-rtc simply
# disables the LiveKit integration instead of breaking this module.
try:
    from livekit import rtc as livekit_rtc  # type: ignore
except Exception:
    livekit_rtc = None  # type: ignore
HAS_LIVEKIT_RTC = livekit_rtc is not None

# Wire format the local STT WebSocket server expects: 16 kHz mono s16le.
_TARGET_RATE = 16_000
_TARGET_CHANNELS = 1
_SAMPLE_WIDTH = 2  # bytes per sample (s16le)
def _linear_resample_s16(samples, src_rate: int, dst_rate: int):
    """Linearly resample mono int16 ``samples`` (an ``array('h')``) from
    ``src_rate`` to ``dst_rate`` Hz.

    Pure-Python fallback for interpreters without ``audioop``. Source
    positions use exact integer arithmetic, so an integer decimation such
    as 48kHz -> 16kHz picks every 3rd sample starting at index 0. Returns
    a new ``array('h')`` holding ``len(samples) * dst_rate // src_rate``
    samples.
    """
    import array
    n_in = len(samples)
    n_out = n_in * dst_rate // src_rate
    last = n_in - 1
    out = array.array('h')
    for i in range(n_out):
        j, rem = divmod(i * src_rate, dst_rate)
        a = samples[j]
        b = samples[j + 1] if j < last else a
        # Floor of a value lying between a and b, so it stays in int16.
        out.append(a + (b - a) * rem // dst_rate)
    return out


def _resample_to_16k_mono(pcm: bytes, src_rate: int,
                          src_channels: int) -> bytes:
    """Convert interleaved PCM16 (s16le) to 16kHz mono.

    Multi-channel input is down-mixed by averaging the channels of each
    frame. Averaging (rather than summing) cannot clip, and it works for
    any channel count, not just stereo. Rate conversion uses stdlib
    ``audioop.ratecv`` when it can be imported. ``audioop`` was removed
    in Python 3.13 (PEP 594), so on those interpreters a pure-Python
    linear-interpolation resampler is used instead of dropping every
    non-16kHz frame.

    Args:
        pcm: interleaved signed 16-bit little-endian samples.
        src_rate: sample rate of ``pcm`` in Hz.
        src_channels: number of interleaved channels in ``pcm``.

    Returns:
        16kHz mono s16le bytes. ``pcm`` itself is returned unchanged (no
        copy) when it is already 16kHz mono. ``b''`` on empty input or on
        any conversion error (non-positive rate/channels, a length that is
        not a whole number of frames, ...).
    """
    if not pcm:
        return b''
    if src_rate == _TARGET_RATE and src_channels == _TARGET_CHANNELS:
        return pcm
    try:
        import array
        import sys
        if src_rate <= 0 or src_channels <= 0:
            raise ValueError(
                f'invalid format rate={src_rate} channels={src_channels}')
        if len(pcm) % (_SAMPLE_WIDTH * src_channels):
            raise ValueError('not a whole number of frames')
        # _SAMPLE_WIDTH is 2 bytes, i.e. array typecode 'h' (int16).
        samples = array.array('h')
        samples.frombytes(pcm)
        big_endian = sys.byteorder == 'big'
        if big_endian:
            samples.byteswap()  # s16le wire order -> native
        if src_channels > 1:
            per_channel = [samples[c::src_channels]
                           for c in range(src_channels)]
            samples = array.array(
                'h', (sum(frame) // src_channels
                      for frame in zip(*per_channel)))
        if src_rate != _TARGET_RATE:
            try:
                import audioop  # deprecated 3.11, removed in 3.13
            except Exception:
                audioop = None
            if audioop is not None:
                converted, _ = audioop.ratecv(
                    samples.tobytes(), _SAMPLE_WIDTH, _TARGET_CHANNELS,
                    src_rate, _TARGET_RATE, None)
                samples = array.array('h')
                samples.frombytes(converted)
            else:
                samples = _linear_resample_s16(
                    samples, src_rate, _TARGET_RATE)
        if big_endian:
            samples.byteswap()  # native -> s16le wire order
        return samples.tobytes()
    except Exception as e:
        logger.debug('livekit_transcript_subscriber: resample failed: %s', e)
        return b''
class LiveKitTranscriptSubscriber:
    """One subscriber per (call_id, livekit_room). Connects, listens,
    pipes PCM through the local STT WS, tears down on ``stop()``.

    Constructor seams allow tests to inject:
      - ``room_factory``     : zero-arg callable returning a Room-like
                               object (``on(event, cb)``, an awaitable
                               ``connect(url, token)`` and an optional
                               ``disconnect()``) instead of importing
                               livekit.
      - ``ws_connect``       : sync WS connect callable; defaults to
                               ``websockets.sync.client.connect``.
      - ``stt_port_provider``: returns the local STT WS port; defaults
                               to ``whisper_tool.get_stt_stream_port``,
                               falling back to ``start_stt_stream_server``.

    Threads: the owner calls ``start()``/``stop()``. LiveKit callbacks
    and per-track frame consumers run on a private asyncio loop in a
    daemon thread. Each per-participant WS gets a daemon drain thread.
    ``_lock`` guards ``_ws_per_participant`` across all of them.
    """

    def __init__(self, call_id: str, livekit_url: str, token: str,
                 room_factory: Optional[Callable[[], Any]] = None,
                 ws_connect: Optional[Callable[..., Any]] = None,
                 stt_port_provider: Optional[Callable[[], Optional[int]]] = None):
        self.call_id = str(call_id) if call_id is not None else ''
        self.livekit_url = livekit_url
        self.token = token
        self._room_factory = room_factory
        self._ws_connect = ws_connect
        self._stt_port_provider = stt_port_provider
        # Per-participant WS clients keyed by participant identity.
        self._ws_per_participant: Dict[str, Any] = {}
        # Guards _ws_per_participant (owner thread, loop thread and any
        # direct push_frame() caller).
        self._lock = threading.Lock()
        self._thread: Optional[threading.Thread] = None
        self._stop_evt = threading.Event()
        self._loop: Any = None  # asyncio.AbstractEventLoop, set in thread
        self._room: Any = None
        # Strong references to live per-track consumer tasks. asyncio only
        # keeps weak references to tasks, so an unreferenced task can be
        # garbage-collected mid-stream. Touched only on the loop thread.
        self._consumer_tasks: set = set()

    # ── Public API ──────────────────────────────────────────────

    def start(self) -> bool:
        """Spawn the daemon thread + asyncio loop.

        Returns:
            True iff a subscriber thread is running after the call (newly
            started or already alive). False when livekit-rtc isn't
            installed and no ``room_factory`` was injected, or when
            ``call_id`` / ``livekit_url`` / ``token`` is missing.
        """
        if not HAS_LIVEKIT_RTC and self._room_factory is None:
            logger.info(
                'livekit_transcript_subscriber: livekit-rtc not '
                'installed — start() is a no-op (call_id=%s)',
                self.call_id)
            return False
        if self._thread is not None and self._thread.is_alive():
            return True
        if not self.call_id or not self.livekit_url or not self.token:
            logger.warning(
                'livekit_transcript_subscriber: missing required '
                'fields (call_id=%r url=%r token=%r)',
                self.call_id, self.livekit_url, bool(self.token))
            return False
        self._stop_evt.clear()
        self._thread = threading.Thread(
            target=self._run, daemon=True,
            name=f'livekit-transcript-{self.call_id[:12]}',
        )
        self._thread.start()
        return True

    def stop(self, timeout: float = 5.0) -> None:
        """Tear the subscriber down. Idempotent.

        Sets the stop flag, closes every per-participant WS, then waits up
        to ``timeout`` seconds for the asyncio thread to finish its own
        teardown (cancel frame consumers, disconnect the room).

        The room is disconnected *on* the loop thread by ``_async_main``
        rather than scheduled from here. ``_async_main`` returns as soon as
        it sees the stop flag, so a coroutine scheduled from this thread
        could be stranded on a loop that has already stopped. The room
        would then never be disconnected, and this call would block for the
        whole timeout. Closing the WSs first also unblocks a consumer stuck
        in a synchronous ``send``.
        """
        self._stop_evt.set()
        with self._lock:
            wss = list(self._ws_per_participant.values())
            self._ws_per_participant.clear()
        for ws in wss:
            try:
                ws.close()
            except Exception:
                pass
        thread = self._thread
        if (thread is not None and thread.is_alive()
                and thread is not threading.current_thread()):
            thread.join(timeout)

    # ── Producer C bridge per participant ───────────────────────

    def _resolve_stt_port(self) -> Optional[int]:
        """Return the local STT WS port, or None if it can't be obtained."""
        if self._stt_port_provider is not None:
            try:
                return self._stt_port_provider()
            except Exception:
                return None
        try:
            from integrations.service_tools.whisper_tool import (
                get_stt_stream_port, start_stt_stream_server,
            )
            return get_stt_stream_port() or start_stt_stream_server()
        except Exception:
            return None

    def _resolve_ws_connect(self) -> Optional[Callable[..., Any]]:
        """Return the injected or default sync WS connect callable, or None."""
        if self._ws_connect is not None:
            return self._ws_connect
        try:
            from websockets.sync.client import connect  # type: ignore
            return connect
        except Exception:
            return None

    def _get_ws_for(self, participant_identity: str) -> Any:
        """Return the STT WS for ``participant_identity``, connecting lazily.

        Returns None when stopping, when no STT port or WS client is
        available, or when the connect fails. The connect runs outside
        ``_lock``. If another caller registered a WS for the same
        participant in the meantime, ours is closed and theirs is returned,
        so no client is leaked.
        """
        with self._lock:
            existing = self._ws_per_participant.get(participant_identity)
            if existing is not None:
                return existing
            if self._stop_evt.is_set():
                return None
        port = self._resolve_stt_port()
        if not port:
            return None
        connect = self._resolve_ws_connect()
        if connect is None:
            return None
        from urllib.parse import urlencode
        # Identities come from remote LiveKit peers: percent-encode them so
        # '&', '=', '#', spaces, etc. can't break or inject query params.
        query = urlencode([('call_id', self.call_id),
                           ('user_id', participant_identity)])
        url = f'ws://127.0.0.1:{port}/?{query}'
        try:
            ws = connect(url, max_size=2 * 1024 * 1024)
        except Exception as e:
            logger.warning(
                'livekit_transcript_subscriber: WS connect failed '
                '(%s): %s', url, e)
            return None
        with self._lock:
            if self._stop_evt.is_set():
                current = None
            else:
                current = self._ws_per_participant.setdefault(
                    participant_identity, ws)
        if current is not ws:
            # Stopped meanwhile, or lost the race to a concurrent caller.
            try:
                ws.close()
            except Exception:
                pass
            return current
        # Drain inbound responses so the WS keeps reading; Producer C
        # handles the enqueue server-side.
        threading.Thread(
            target=self._drain_loop, args=(ws,), daemon=True,
            name=f'lk-drain-{participant_identity[:12]}',
        ).start()
        return ws

    def _drain_loop(self, ws) -> None:
        """Read and discard server messages until the WS closes or the
        subscriber stops. Runs on its own daemon thread."""
        try:
            for _msg in ws:
                if self._stop_evt.is_set():
                    return
        except Exception:
            return

    def push_frame(self, participant_identity: str, pcm: bytes,
                   src_rate: int = 48000, src_channels: int = 1) -> bool:
        """Resample + forward one PCM frame for a participant.

        Public seam — tests can drive this directly without a real
        LiveKit Room. Returns True iff the frame was sent through
        the WS (False on no-op / error). On a send error the failed
        client is evicted and closed so the next frame reconnects.
        """
        if self._stop_evt.is_set():
            return False
        if not participant_identity or not pcm:
            return False
        out = _resample_to_16k_mono(pcm, src_rate, src_channels)
        if not out:
            return False
        ws = self._get_ws_for(participant_identity)
        if ws is None:
            return False
        try:
            ws.send(out)
            return True
        except Exception as e:
            logger.debug(
                'livekit_transcript_subscriber: send failed '
                '(participant=%s): %s', participant_identity, e)
            with self._lock:
                # Only evict the client that failed; a concurrent caller
                # may already have replaced it with a fresh one.
                if self._ws_per_participant.get(participant_identity) is ws:
                    del self._ws_per_participant[participant_identity]
            try:
                ws.close()
            except Exception:
                pass
            return False

    # ── asyncio-side LiveKit subscription ──────────────────────

    def _run(self) -> None:
        """Thread target: run ``_async_main`` on a fresh event loop, then
        close that loop."""
        try:
            import asyncio
        except Exception:
            return
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)
        try:
            self._loop.run_until_complete(self._async_main())
        except Exception as e:
            logger.warning(
                'livekit_transcript_subscriber: loop crashed: %s', e)
        finally:
            try:
                self._loop.close()
            except Exception:
                pass

    async def _async_main(self) -> None:
        """Build and connect the room, then idle until ``stop()``.

        Once the room exists, ``_async_teardown`` always runs on this loop
        before returning, whether connect failed or stop was signalled.
        Consumers are therefore finished and the room disconnected before
        ``_run`` closes the loop. ``_async_disconnect`` swallows errors, so
        disconnecting a room whose connect failed is harmless.
        """
        import asyncio as _asyncio
        room = self._make_room()
        if room is None:
            return
        self._room = room
        # Wire participant-track callbacks BEFORE connecting so we
        # don't miss the join-time tracks.
        try:
            room.on('track_subscribed', self._on_track_subscribed)
        except Exception as e:
            logger.debug(
                'livekit_transcript_subscriber: on() unavailable: %s',
                e)
        try:
            try:
                await room.connect(self.livekit_url, self.token)
            except Exception as e:
                logger.warning(
                    'livekit_transcript_subscriber: room connect '
                    'failed (call=%s): %s', self.call_id, e)
                return
            # Wait until stop() is signaled.
            while not self._stop_evt.is_set():
                await _asyncio.sleep(0.25)
        finally:
            await self._async_teardown(room)

    async def _async_teardown(self, room) -> None:
        """Cancel live frame consumers, wait for them to unwind, then
        disconnect ``room``. Runs on the subscriber's own loop."""
        import asyncio as _asyncio
        tasks = list(self._consumer_tasks)
        for task in tasks:
            task.cancel()
        if tasks:
            await _asyncio.gather(*tasks, return_exceptions=True)
        await self._async_disconnect(room)

    async def _async_disconnect(self, room) -> None:
        """Call ``room.disconnect()`` if present (awaiting it when it
        returns an awaitable); all errors are swallowed."""
        try:
            disconnect = getattr(room, 'disconnect', None)
            if disconnect is None:
                return
            res = disconnect()
            if hasattr(res, '__await__'):
                await res
        except Exception:
            pass

    def _make_room(self) -> Any:
        """Return a Room from ``room_factory`` or ``livekit_rtc.Room()``;
        None (after logging) when neither is available or construction
        fails."""
        if self._room_factory is not None:
            try:
                return self._room_factory()
            except Exception as e:
                logger.warning(
                    'livekit_transcript_subscriber: room_factory '
                    'failed: %s', e)
                return None
        if not HAS_LIVEKIT_RTC:
            return None
        try:
            return livekit_rtc.Room()  # type: ignore[union-attr]
        except Exception as e:
            logger.warning(
                'livekit_transcript_subscriber: livekit_rtc.Room() '
                'failed: %s', e)
            return None

    def _on_track_subscribed(self, track, publication, participant) -> None:
        """``track_subscribed`` handler: spawn a frame consumer per audio track.

        Expected to be invoked on the loop thread (``create_task`` needs a
        running loop in the calling thread). The task is kept in
        ``_consumer_tasks`` until it finishes so it can't be
        garbage-collected mid-stream. ``publication`` is unused but is part
        of the event signature.
        """
        import asyncio as _asyncio
        if self._stop_evt.is_set():
            return
        # Only audio tracks are interesting.
        kind = getattr(track, 'kind', None)
        try:
            audio_kind = livekit_rtc.TrackKind.KIND_AUDIO  # type: ignore
        except Exception:
            audio_kind = None
        if audio_kind is not None and kind != audio_kind:
            return
        identity = (getattr(participant, 'identity', None)
                    or getattr(participant, 'sid', None) or 'unknown')
        coro = self._consume_track(track, str(identity))
        try:
            task = _asyncio.create_task(coro)
        except Exception as e:
            coro.close()  # never scheduled; avoid "never awaited" warning
            logger.debug(
                'livekit_transcript_subscriber: cannot spawn consumer '
                '(%s): %s', identity, e)
            return
        self._consumer_tasks.add(task)
        task.add_done_callback(self._consumer_tasks.discard)

    async def _consume_track(self, track, participant_identity: str) -> None:
        """Iterate over audio frames from a LiveKit RemoteAudioTrack.

        livekit-rtc 0.x exposes ``AudioStream(track)`` whose async
        iteration yields ``AudioFrameEvent`` with a ``.frame`` carrying
        ``data`` (PCM bytes), ``sample_rate``, ``num_channels``. Older
        signatures vary; we read defensively. The stream is released via
        its ``aclose()`` (if it has one) however iteration ends.
        """
        try:
            stream = livekit_rtc.AudioStream(track)  # type: ignore
        except Exception as e:
            logger.debug(
                'livekit_transcript_subscriber: AudioStream init '
                'failed: %s', e)
            return
        try:
            async for evt in stream:
                if self._stop_evt.is_set():
                    return
                frame = getattr(evt, 'frame', None) or evt
                pcm = getattr(frame, 'data', None) or getattr(
                    frame, 'pcm', None)
                rate = (getattr(frame, 'sample_rate', None)
                        or getattr(frame, 'rate', None) or 48000)
                channels = (getattr(frame, 'num_channels', None)
                            or getattr(frame, 'channels', None) or 1)
                if pcm is None:
                    continue
                # ``pcm`` may be bytes OR a memoryview / np buffer; force
                # to bytes for the WS hand-off.
                if not isinstance(pcm, (bytes, bytearray)):
                    try:
                        pcm = bytes(pcm)
                    except Exception:
                        continue
                # NOTE(review): push_frame does synchronous WS connect/send
                # (to 127.0.0.1) on this loop thread; a stalled STT server
                # stalls this room's loop too.
                self.push_frame(
                    participant_identity, bytes(pcm),
                    src_rate=int(rate), src_channels=int(channels))
        except Exception as e:
            logger.debug(
                'livekit_transcript_subscriber: track stream ended '
                '(%s): %s', participant_identity, e)
        finally:
            aclose = getattr(stream, 'aclose', None)
            if aclose is not None:
                try:
                    res = aclose()
                    if hasattr(res, '__await__'):
                        await res
                except Exception:
                    pass