Coverage for integrations / social / livekit_transcript_subscriber.py: 54.3%

186 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2livekit_transcript_subscriber — UNIF-G7 / W1.7 Producer B. 

3 

4Subscribes to remote audio tracks in a LiveKit room, resamples each 

5participant's PCM frames, and pushes them through the canonical 

6streaming-STT WebSocket so Producer C's ``?call_id=&user_id=`` hook 

7lands the final transcripts in 

8``whisper_tool.enqueue_stt_segment(call_id, ...)``. 

9 

10Single canonical sink: every audio source in HARTOS — Discord voice 

11recv (Producer A), LiveKit RTC (this module), RN mic stream (Producer 

12C's direct caller) — funnels into the SAME per-call STT queue that 

13``agent_voice_bridge._tick`` drains. No parallel paths. 

14 

15Lib gate: 

16 The module is importable even when ``livekit-rtc`` is NOT 

17 installed. ``HAS_LIVEKIT_RTC`` is False in that case and 

18 ``LiveKitTranscriptSubscriber.start()`` becomes a synchronous 

19 no-op so today's bridge-worker behavior is preserved. 

20 

21Threading model: 

22 ``livekit-rtc`` is asyncio-based. We run an asyncio event loop 

23 in a daemon thread (mirrors the existing 

24 ``start_stt_stream_server`` pattern in ``whisper_tool``). Each subscribed 

25 ``RemoteAudioTrack`` spawns a per-track frame-consumer task that 

26 pushes resampled PCM through a per-participant sync ``websockets`` 

27 client back into the local STT WS. No new event loops in the 

28 request hot path; ``stop()`` cleanly tears down both. 

29 

30Resampling: 

31 LiveKit publishes 48kHz s16le mono per audio frame (default RTC 

32 config). Some senders may publish at other rates — we use stdlib 

33 ``audioop.ratecv`` to convert to 16kHz mono if needed. When the 

34 frame is already 16kHz mono we forward it as-is (zero-copy). 

35""" 

36from __future__ import annotations 

37 

38import logging 

39import threading 

40from typing import Any, Callable, Dict, Optional 

41 

42logger = logging.getLogger(__name__) 

43 

44 

45try: 

46 from livekit import rtc as livekit_rtc # type: ignore 

47 HAS_LIVEKIT_RTC = True 

48except Exception: 

49 livekit_rtc = None # type: ignore 

50 HAS_LIVEKIT_RTC = False 

51 

52 

53# Target STT-server format. 

54_TARGET_RATE = 16000 

55_TARGET_CHANNELS = 1 

56_SAMPLE_WIDTH = 2 # s16le 

57 

58 

59def _resample_to_16k_mono(pcm: bytes, src_rate: int, 

60 src_channels: int) -> bytes: 

61 """Convert arbitrary PCM16 to 16kHz mono. Stdlib audioop, no 

62 new dep. Returns ``b''`` on any conversion error. 

63 """ 

64 if not pcm: 

65 return b'' 

66 if src_rate == _TARGET_RATE and src_channels == _TARGET_CHANNELS: 

67 return pcm 

68 try: 

69 import audioop 

70 if src_channels > 1: 

71 pcm = audioop.tomono(pcm, _SAMPLE_WIDTH, 1.0, 1.0) 

72 if src_rate != _TARGET_RATE: 

73 pcm, _ = audioop.ratecv( 

74 pcm, _SAMPLE_WIDTH, _TARGET_CHANNELS, 

75 src_rate, _TARGET_RATE, None) 

76 return pcm 

77 except Exception as e: 

78 logger.debug('livekit_transcript_subscriber: resample failed: %s', e) 

79 return b'' 

80 

81 

class LiveKitTranscriptSubscriber:
    """One subscriber per (call_id, livekit_room). Connects, listens,
    pipes PCM through the local STT WS, tears down on ``stop()``.

    Constructor seams allow tests to inject:
      - ``room_factory``     : callable returning a Room-like object
                               (``on``, awaitable ``connect``, optional
                               ``disconnect``) instead of importing livekit.
      - ``ws_connect``       : sync WS connect callable; defaults to
                               ``websockets.sync.client.connect``.
      - ``stt_port_provider``: returns the local STT WS port; defaults
                               to ``whisper_tool.get_stt_stream_port``
                               (starting the server if it has no port yet).

    Lifecycle: ``start()`` spawns a daemon thread that owns an asyncio
    loop; that loop connects the room, keeps one consumer task per
    subscribed track, and performs the room disconnect itself once
    ``stop()`` has been signalled.
    """

    def __init__(self, call_id: str, livekit_url: str, token: str,
                 room_factory: Optional[Callable[[], Any]] = None,
                 ws_connect: Optional[Callable[..., Any]] = None,
                 stt_port_provider: Optional[Callable[[], Optional[int]]] = None):
        self.call_id = str(call_id) if call_id is not None else ''
        self.livekit_url = livekit_url
        self.token = token
        self._room_factory = room_factory
        self._ws_connect = ws_connect
        self._stt_port_provider = stt_port_provider
        # Per-participant WS clients keyed by participant identity.
        self._ws_per_participant: Dict[str, Any] = {}
        self._lock = threading.Lock()
        self._thread: Optional[threading.Thread] = None
        self._stop_evt = threading.Event()
        self._loop: Any = None  # asyncio.AbstractEventLoop, set in thread
        self._room: Any = None
        # Strong references to the per-track consumer tasks. asyncio only
        # keeps weak references, so an unreferenced task may be collected
        # mid-run. Only touched from the loop thread.
        self._tasks: set = set()

    # ── Public API ──────────────────────────────────────────────

    def start(self) -> bool:
        """Spawn the daemon thread + asyncio loop.

        Returns:
            True iff a worker thread is running after the call (newly
            started or already alive). False when livekit-rtc isn't
            installed and no ``room_factory`` was injected, or when
            ``call_id`` / ``livekit_url`` / ``token`` is missing.
        """
        if not HAS_LIVEKIT_RTC and self._room_factory is None:
            logger.info(
                'livekit_transcript_subscriber: livekit-rtc not '
                'installed — start() is a no-op (call_id=%s)',
                self.call_id)
            return False
        if self._thread is not None and self._thread.is_alive():
            return True
        if not self.call_id or not self.livekit_url or not self.token:
            # Only log whether a token is present, never the token itself.
            logger.warning(
                'livekit_transcript_subscriber: missing required '
                'fields (call_id=%r url=%r token=%r)',
                self.call_id, self.livekit_url, bool(self.token))
            return False
        self._stop_evt.clear()
        self._thread = threading.Thread(
            target=self._run, daemon=True,
            name=f'livekit-transcript-{self.call_id[:12]}',
        )
        self._thread.start()
        return True

    def stop(self) -> None:
        """Signal the worker to tear down and close all WS clients.

        Idempotent. The worker thread performs the room disconnect on
        its own loop when it observes the stop event; this method waits
        up to 5 seconds for it to finish. Scheduling the disconnect from
        here instead would race with the loop shutting itself down.
        """
        self._stop_evt.set()
        worker = self._thread
        if (worker is not None and worker.is_alive()
                and worker is not threading.current_thread()):
            worker.join(timeout=5)
        # Close any per-participant WS clients.
        with self._lock:
            wss = list(self._ws_per_participant.values())
            self._ws_per_participant.clear()
        for ws in wss:
            try:
                ws.close()
            except Exception as e:
                logger.debug(
                    'livekit_transcript_subscriber: WS close failed: %s', e)

    # ── Producer C bridge per participant ───────────────────────

    def _resolve_stt_port(self) -> Optional[int]:
        """Return the local STT WS port, or None when it can't be resolved."""
        if self._stt_port_provider is not None:
            try:
                return self._stt_port_provider()
            except Exception:
                return None
        try:
            from integrations.service_tools.whisper_tool import (
                get_stt_stream_port, start_stt_stream_server,
            )
            return get_stt_stream_port() or start_stt_stream_server()
        except Exception:
            return None

    def _resolve_ws_connect(self) -> Optional[Callable[..., Any]]:
        """Return the injected connect callable, else the ``websockets``
        sync client's ``connect``; None if that import fails."""
        if self._ws_connect is not None:
            return self._ws_connect
        try:
            from websockets.sync.client import connect  # type: ignore
            return connect
        except Exception:
            return None

    def _get_ws_for(self, participant_identity: str) -> Any:
        """Return the cached WS client for a participant, connecting on
        first use. Returns None after ``stop()`` or when no port /
        connect callable / connection is available."""
        with self._lock:
            existing = self._ws_per_participant.get(participant_identity)
            if existing is not None:
                return existing
            if self._stop_evt.is_set():
                return None
        port = self._resolve_stt_port()
        if not port:
            return None
        connect = self._resolve_ws_connect()
        if connect is None:
            return None
        # Percent-encode both values: an identity containing '&', '#',
        # '=' or spaces would otherwise corrupt the query string.
        from urllib.parse import quote, urlencode
        query = urlencode(
            {'call_id': self.call_id, 'user_id': participant_identity},
            quote_via=quote)
        url = f'ws://127.0.0.1:{port}/?{query}'
        try:
            ws = connect(url, max_size=2 * 1024 * 1024)
        except Exception as e:
            logger.warning(
                'livekit_transcript_subscriber: WS connect failed '
                '(%s): %s', url, e)
            return None
        # The connect above ran without the lock held, so stop() or a
        # concurrent caller may have won in the meantime. Keep exactly
        # one socket per participant and close the loser.
        with self._lock:
            if self._stop_evt.is_set():
                winner = None
            else:
                winner = self._ws_per_participant.setdefault(
                    participant_identity, ws)
        if winner is not ws:
            try:
                ws.close()
            except Exception as e:
                logger.debug(
                    'livekit_transcript_subscriber: WS close failed: %s', e)
            return winner
        # Drain inbound responses so the WS keeps reading; Producer C
        # handles the enqueue server-side.
        threading.Thread(
            target=self._drain_loop, args=(ws,), daemon=True,
            name=f'lk-drain-{participant_identity[:12]}',
        ).start()
        return ws

    def _drain_loop(self, ws) -> None:
        """Read and discard inbound WS messages until stop or error."""
        try:
            for _msg in ws:
                if self._stop_evt.is_set():
                    return
        except Exception:
            return

    def push_frame(self, participant_identity: str, pcm: bytes,
                   src_rate: int = 48000, src_channels: int = 1) -> bool:
        """Resample + forward one PCM frame for a participant.

        Public seam — tests can drive this directly without a real
        LiveKit Room. Returns True iff the frame was sent through
        the WS (False on no-op / error). A failed send closes and
        evicts that participant's socket so the next frame reconnects.
        """
        if self._stop_evt.is_set():
            return False
        if not participant_identity or not pcm:
            return False
        out = _resample_to_16k_mono(pcm, src_rate, src_channels)
        if not out:
            return False
        ws = self._get_ws_for(participant_identity)
        if ws is None:
            return False
        try:
            ws.send(out)
            return True
        except Exception as e:
            logger.debug(
                'livekit_transcript_subscriber: send failed '
                '(participant=%s): %s', participant_identity, e)
            with self._lock:
                # Only evict the socket that actually failed, not a
                # replacement that may have been installed since.
                if self._ws_per_participant.get(participant_identity) is ws:
                    del self._ws_per_participant[participant_identity]
            try:
                ws.close()
            except Exception:
                pass
            return False

    # ── asyncio-side LiveKit subscription ──────────────────────

    def _run(self) -> None:
        """Thread target: own an event loop for the subscriber's lifetime."""
        try:
            import asyncio
        except Exception:
            return
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)
        try:
            self._loop.run_until_complete(self._async_main())
        except Exception as e:
            logger.warning(
                'livekit_transcript_subscriber: loop crashed: %s', e)
        finally:
            try:
                self._loop.close()
            except Exception:
                pass

    async def _async_main(self) -> None:
        """Connect the room, idle until ``stop()``, then tear down."""
        import asyncio as _asyncio
        room = self._make_room()
        if room is None:
            return
        self._room = room
        # Wire participant-track callbacks BEFORE connecting so we
        # don't miss the join-time tracks.
        try:
            room.on('track_subscribed', self._on_track_subscribed)
        except Exception as e:
            logger.debug(
                'livekit_transcript_subscriber: on() unavailable: %s',
                e)
        try:
            await room.connect(self.livekit_url, self.token)
        except Exception as e:
            logger.warning(
                'livekit_transcript_subscriber: room connect '
                'failed (call=%s): %s', self.call_id, e)
            return
        try:
            # Wait until stop() is signaled.
            while not self._stop_evt.is_set():
                await _asyncio.sleep(0.25)
        finally:
            # Tear down on this loop, before run_until_complete returns
            # and the loop is closed, so nothing is left pending.
            await self._cancel_consumers()
            await self._async_disconnect(room)

    async def _cancel_consumers(self) -> None:
        """Cancel every live per-track consumer task and wait for them."""
        import asyncio as _asyncio
        pending = [task for task in self._tasks if not task.done()]
        for task in pending:
            task.cancel()
        if pending:
            await _asyncio.gather(*pending, return_exceptions=True)
        self._tasks.clear()

    async def _async_disconnect(self, room) -> None:
        """Best-effort ``room.disconnect()``; accepts sync or awaitable."""
        try:
            disconnect = getattr(room, 'disconnect', None)
            if disconnect is None:
                return
            res = disconnect()
            if hasattr(res, '__await__'):
                await res
        except Exception as e:
            logger.debug(
                'livekit_transcript_subscriber: room disconnect '
                'failed: %s', e)

    def _make_room(self) -> Any:
        """Build the Room via the injected factory or ``livekit_rtc.Room()``;
        returns None when neither is available or construction fails."""
        if self._room_factory is not None:
            try:
                return self._room_factory()
            except Exception as e:
                logger.warning(
                    'livekit_transcript_subscriber: room_factory '
                    'failed: %s', e)
                return None
        if not HAS_LIVEKIT_RTC:
            return None
        try:
            return livekit_rtc.Room()  # type: ignore[union-attr]
        except Exception as e:
            logger.warning(
                'livekit_transcript_subscriber: livekit_rtc.Room() '
                'failed: %s', e)
            return None

    def _on_track_subscribed(self, track, publication, participant) -> None:
        """``track_subscribed`` callback: spawn a per-track frame consumer
        on the running loop for audio tracks."""
        # Only audio tracks are interesting.
        kind = getattr(track, 'kind', None)
        try:
            audio_kind = livekit_rtc.TrackKind.KIND_AUDIO  # type: ignore
        except Exception:
            # livekit not importable (test seam): can't filter by kind.
            audio_kind = None
        if audio_kind is not None and kind != audio_kind:
            return
        identity = (getattr(participant, 'identity', None)
                    or getattr(participant, 'sid', None) or 'unknown')
        coro = self._consume_track(track, str(identity))
        try:
            import asyncio as _asyncio
            task = _asyncio.create_task(coro)
        except Exception as e:
            # No running loop in this thread: close the coroutine so it
            # does not trigger a "never awaited" warning.
            coro.close()
            logger.debug(
                'livekit_transcript_subscriber: cannot spawn consumer '
                '(%s): %s', identity, e)
            return
        # Hold a strong reference until the task finishes.
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _consume_track(self, track, participant_identity: str) -> None:
        """Iterate over audio frames from a LiveKit RemoteAudioTrack.

        livekit-rtc 0.x exposes ``AudioStream(track)`` whose async
        iteration yields ``AudioFrameEvent`` with a ``.frame`` carrying
        ``data`` (PCM bytes), ``sample_rate``, ``num_channels``. Older
        signatures vary; we read defensively. The stream is closed (when
        it offers ``aclose``) however the loop ends.
        """
        try:
            stream = livekit_rtc.AudioStream(track)  # type: ignore
        except Exception as e:
            logger.debug(
                'livekit_transcript_subscriber: AudioStream init '
                'failed: %s', e)
            return
        try:
            async for evt in stream:
                if self._stop_evt.is_set():
                    return
                # Explicit ``is None`` checks: buffer-like payloads may
                # be falsy or refuse truth-testing altogether.
                frame = getattr(evt, 'frame', None)
                if frame is None:
                    frame = evt
                pcm = getattr(frame, 'data', None)
                if pcm is None:
                    pcm = getattr(frame, 'pcm', None)
                if pcm is None:
                    continue
                rate = (getattr(frame, 'sample_rate', None)
                        or getattr(frame, 'rate', None) or 48000)
                channels = (getattr(frame, 'num_channels', None)
                            or getattr(frame, 'channels', None) or 1)
                # ``pcm`` may be bytes OR a memoryview / np buffer; force
                # to bytes for the WS hand-off.
                if not isinstance(pcm, bytes):
                    try:
                        pcm = bytes(pcm)
                    except Exception:
                        continue
                self.push_frame(
                    participant_identity, pcm,
                    src_rate=int(rate), src_channels=int(channels))
        except Exception as e:
            logger.debug(
                'livekit_transcript_subscriber: track stream ended '
                '(%s): %s', participant_identity, e)
        finally:
            closer = getattr(stream, 'aclose', None)
            if closer is not None:
                try:
                    res = closer()
                    if hasattr(res, '__await__'):
                        await res
                except Exception as e:
                    logger.debug(
                        'livekit_transcript_subscriber: AudioStream '
                        'close failed: %s', e)