Coverage for integrations / social / agent_voice_bridge.py: 54.4%

182 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial — AgentVoiceBridge. Phase 7d.B. 

3 

4Plan reference: sunny-gliding-eich.md, Part E.12. 

5 

6WHY this exists: 

7 Agents are first-class call participants per Plan A.3 #4 / B.4. 

8 But agents are LLMs + TTS pipelines, not WebRTC clients — they 

9 can't hold a media socket directly. AgentVoiceBridge runs 

10 server-side (on the user's local node for flat / regional, on a 

11 tenant-isolated worker pod for central) and: 

12 

13 - Holds the LiveKit publisher track on behalf of the agent. 

14 - Subscribes to other participants' audio + runs Whisper STT. 

15 - Feeds transcripts through the SAME `agentic_router.dispatch_to_agent` 

16 path used for post / comment / message mentions. GuardrailEnforcer 

17 (before_dispatch + after_response) gates every agent action — the 

18 agent never has a privileged path. 

19 - TTS-ifies the agent's reply via PocketTTS and publishes the audio 

20 frames into the LiveKit room. 

21 

22This module ships the SCAFFOLDING + the worker contract. The actual 

23audio frame plumbing depends on the LiveKit Python SDK 

24(`livekit-api`, `livekit-rtc`) and the existing PocketTTS + Whisper 

25modules. Each integration point is marked with a comment + a stub 

26return so the module is importable + testable without those packages 

27installed. 

28 

29Lifecycle: 

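30 AgentVoiceBridge.attach_agent(db, call_id, agent_id, owner_id, scope) 

31 → spawns one bridge worker per (call, agent) pair 

32 → does not itself write the CallParticipant row (device_kind='agent_bridge'); 
   `db` is accepted but unused here, so that registration is presumably the caller's job 

33 → returns the worker's state dict (AgentBridgeWorker.to_dict()) 

34 

35 AgentVoiceBridge.detach_agent(call_id, agent_id) 

36 → signals the worker to stop on its next tick and drops pending TTS text for the pair 

37 → returns True iff a worker was registered; participant.left_at is not 
   updated in this module (presumably handled by the caller) 

38 

39 AgentVoiceBridge.list_active(call_id=None) → list of bridge worker dicts 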

40 

41The worker loop runs in a daemon thread (not asyncio) to match 

42HARTOS's existing `agentic_router.dispatch_to_agent` daemon-thread 

43pattern. Idempotent attach: re-attaching an already-active agent 

44returns the existing worker. 

45 

46Transport: pure server-side. The LiveKit room is the media plane 

47(SFU); PeerLink DISPATCH carries call signaling (mute, hangup); 

48this bridge bridges the gap. No new transport invented. 

49""" 

50 

51from __future__ import annotations 

52 

53import logging 

54import threading 

55import time 

56from collections import deque 

57from typing import Any, Deque, Dict, List, Optional 

58 

# Shared logger for the social integration package.
logger = logging.getLogger('hevolve_social')


# Best-effort imports — all of these may be absent on flat /
# Nunba bundled deploys. The bridge degrades to a no-op stub
# in that case so importing this module is always safe.
# NOTE(review): the broad ``except Exception`` (rather than ImportError)
# presumably also covers native-library load failures — confirm.
try:
    from livekit import rtc as livekit_rtc  # type: ignore
    _HAS_LIVEKIT_RTC = True
except Exception:
    livekit_rtc = None
    _HAS_LIVEKIT_RTC = False


# Active workers keyed by (call_id, agent_id). Module-level dict +
# lock — process-local, restart clears. Workers are daemon threads
# so process exit doesn't hang.
# Every read/write of _ACTIVE_WORKERS in this module happens while
# holding _WORKERS_LOCK.
_ACTIVE_WORKERS: Dict[tuple, 'AgentBridgeWorker'] = {}
_WORKERS_LOCK = threading.Lock()


# ── TTS outbox: agent reply text → audio publisher ──────────────────
#
# This is the symmetric counterpart of whisper_tool's STT-segment
# queue. agentic_router._post_agent_reply for source_kind='call'
# enqueues the agent's reply text here; the audio publisher (the
# bridge worker's TTS half OR an external adapter — LiveKit-rtc
# producer / Discord voice client) drains it, synthesizes via
# PocketTTS, and publishes audio frames into the room.
#
# Single canonical home for outbound agent-text-on-calls. No parallel
# queues elsewhere — every producer pushes here, every consumer reads
# here, just like the STT side in whisper_tool.
#
# Bounded per (call, agent) so a stalled audio adapter can't grow
# unbounded. Drops oldest with WARN if cap exceeded.
# Values are deques of {'text': str, 'enqueued_at': float} dicts, keyed
# by (call_id, agent_id); guarded by _TTS_LOCK.
_TTS_OUTBOX: Dict[tuple, deque] = {}
_TTS_LOCK = threading.Lock()
_TTS_OUTBOX_CAP = 64  # chunks per (call, agent)


# Polling cadence inside the worker loop. 250ms balances STT
# latency against CPU cost; lowered to 50ms for tests via the
# `_WORKER_TICK_S` module override.
# Read on every loop iteration, so an override takes effect on the
# next tick of already-running workers.
_WORKER_TICK_S = 0.25

104 

105 

def enqueue_tts_text(call_id: str, agent_id: str, text: str) -> bool:
    """Append agent reply text to the (call, agent) TTS outbox.

    Producer-side: agentic_router._post_agent_reply for source_kind
    ='call' calls this with the LLM reply text after GuardrailEnforcer
    .after_response has already approved it.

    Args:
        call_id: Call whose outbox receives the text.
        agent_id: Agent that produced the reply.
        text: Reply text; surrounding whitespace is stripped before
            it is stored.

    Returns:
        True iff the text was enqueued. False when either key is
        empty, when ``text`` is not a ``str``, or when it is blank.

    Best-effort: never raises for bad ``text`` payloads.
    """
    if not call_id or not agent_id:
        return False
    # A non-str payload (e.g. an int or dict leaking from the reply
    # pipeline) would otherwise hit ``.strip()`` and raise, breaking the
    # "never raises" contract the producer relies on.
    if not isinstance(text, str):
        return False
    cleaned = text.strip()
    if not cleaned:
        return False
    key = (call_id, agent_id)
    with _TTS_LOCK:
        q = _TTS_OUTBOX.setdefault(key, deque())
        q.append({'text': cleaned, 'enqueued_at': time.time()})
        # Bounded queue: evict oldest-first and leave a WARN breadcrumb
        # so a stalled audio adapter is visible in the logs.
        while len(q) > _TTS_OUTBOX_CAP:
            evicted = q.popleft()
            logger.warning(
                "AgentVoiceBridge.enqueue_tts_text: cap %d exceeded "
                "for call=%s agent=%s; evicted text=%r",
                _TTS_OUTBOX_CAP, call_id, agent_id,
                evicted['text'][:80])
    return True

132 

133 

def dequeue_tts_text(call_id: str, agent_id: str,
                     limit: int = 4) -> List[Dict[str, Any]]:
    """Pop up to ``limit`` pending reply chunks for (call, agent).

    The read is destructive: returned chunks are removed from the
    outbox, and the outbox entry itself is dropped once it runs empty.
    Returns an empty list when either key is falsy or nothing is queued.
    """
    if not (call_id and agent_id):
        return []
    outbox_key = (call_id, agent_id)
    with _TTS_LOCK:
        pending = _TTS_OUTBOX.get(outbox_key)
        if not pending:
            return []
        drained: List[Dict[str, Any]] = []
        while pending and len(drained) < limit:
            drained.append(pending.popleft())
        if not pending:
            # Fully drained — remove the key so the registry does not
            # accumulate empty deques for finished calls.
            del _TTS_OUTBOX[outbox_key]
    return drained

151 

152 

def tts_outbox_depth(call_id: str, agent_id: str) -> int:
    """Return how many reply chunks are queued for (call, agent).

    Non-destructive; intended for /health and tests. Unknown pairs
    report 0.
    """
    outbox_key = (call_id, agent_id)
    with _TTS_LOCK:
        return len(_TTS_OUTBOX.get(outbox_key) or ())

158 

159 

class AgentBridgeError(Exception):
    """Raised when a bridge attach request is invalid.

    Callers are expected to translate this into a 4xx response.
    """

162 

163 

class AgentBridgeWorker:
    """One bridge per (call, agent) pair. Holds the LiveKit
    publisher token, runs the STT/dispatch/TTS loop until detach.

    Scaffolded — actual frame I/O is wired alongside livekit-rtc +
    PocketTTS + whisper integrations. The contract here lets the
    rest of the system call attach_agent / detach_agent today.

    Threading model: ``start()`` spawns one daemon thread running
    ``_loop``; ``stop()`` only sets an event, so the thread exits on its
    next wake-up rather than synchronously.
    """

    def __init__(self, call_id: str, agent_id: str, owner_id: str,
                 scope: Dict[str, Any]):
        """Record identity + scope; does not start the thread.

        Args:
            call_id: Call / room the agent is attached to.
            agent_id: Agent on whose behalf the bridge runs.
            owner_id: User that owns the agent.
            scope: Grant scope; copied so later caller mutations do not
                leak in. Keys read by this class: ``platform``,
                ``tenant_id``, ``room_id``, ``participants``, ``role``.
        """
        self.call_id = call_id
        self.agent_id = agent_id
        self.owner_id = owner_id
        self.scope = dict(scope or {})
        self._stop = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self.started_at = time.time()
        # Last STT segment id we drained from
        # ``whisper_tool.dequeue_segments`` — monotonic int per call.
        # Passed back as the ``since`` watermark next tick to avoid
        # re-processing the same segment twice if a producer re-emits.
        self._last_stt_segment_id: Optional[int] = None
        # Rolling state for the Liquid UI meet_copilot card (UNIF-G5).
        # Capped to keep the card lightweight on every emit; the card
        # only shows the most-recent N lines anyway. Decisions and
        # action items are appended by future LLM-driven extraction
        # (out of W1.3 scope); kept here so the card is forward-
        # compatible. Participants list is populated by adapter
        # ``list_room_members`` calls in a follow-up.
        self._transcript_lines: Deque[Dict[str, Any]] = deque(maxlen=10)
        self._decisions: List[str] = []
        self._action_items: List[str] = []

    def start(self) -> None:
        """Spawn the daemon worker thread; no-op if already running."""
        if self._thread and self._thread.is_alive():
            return
        self._thread = threading.Thread(
            target=self._loop,
            # str() before slicing: every other use of agent_id in this
            # class coerces with str(), so a non-string id must not
            # blow up thread creation with a TypeError.
            name=f'agent_voice_bridge_{str(self.agent_id)[:8]}',
            daemon=True,
        )
        self._thread.start()

    def stop(self) -> None:
        """Signal the loop to exit; returns without joining the thread."""
        self._stop.set()

    def is_alive(self) -> bool:
        """True while the worker thread exists and is still running."""
        return bool(self._thread and self._thread.is_alive())

    def to_dict(self) -> Dict[str, Any]:
        """Snapshot of the worker's identity and liveness."""
        return {
            'call_id': self.call_id,
            'agent_id': self.agent_id,
            'owner_id': self.owner_id,
            'scope': self.scope,
            'started_at': self.started_at,
            'alive': self.is_alive(),
        }

    # ── Worker loop ─────────────────────────────────────────────

    def _loop(self) -> None:
        """Main bridge loop. Each tick:
          1. Check stop signal.
          2. Pull fresh STT segments from the per-call whisper queue.
          3. For each segment, dispatch through agentic_router (which
             gates via GuardrailEnforcer.before_dispatch + .after_response).
          4. Drain the TTS outbox and hand reply text to the publisher.

        All steps degrade to no-ops when their underlying integrations
        aren't installed (flat / regional / Nunba bundled).
        """
        logger.info("AgentBridgeWorker starting: call=%s agent=%s",
                    self.call_id, self.agent_id)
        # Emit the initial meet_copilot card so the user sees the
        # presence indicator immediately on attach (UNIF-G5) — empty
        # transcript_lines is fine; subsequent ticks fill it.
        self._emit_meet_copilot(state='live')
        try:
            while not self._stop.is_set():
                try:
                    self._tick()
                except Exception as e:
                    logger.warning(
                        "AgentBridgeWorker tick error (call=%s, "
                        "agent=%s): %s — continuing",
                        self.call_id, self.agent_id, e)
                # Event.wait doubles as the sleep, so stop() wakes the
                # loop immediately instead of after a full tick.
                self._stop.wait(_WORKER_TICK_S)
        finally:
            logger.info(
                "AgentBridgeWorker stopped: call=%s agent=%s",
                self.call_id, self.agent_id)
            # Flip state to 'ended' so the renderer can dismiss /
            # collapse the card. Idempotent overwrite — same canonical
            # emit path.
            try:
                self._emit_meet_copilot(state='ended')
            except Exception as e:
                # Still best-effort, but leave a trace instead of
                # swallowing silently.
                logger.debug(
                    "AgentBridgeWorker final meet_copilot emit failed "
                    "(call=%s, agent=%s): %s",
                    self.call_id, self.agent_id, e)

    def _tick(self) -> None:
        """One worker iteration: STT half, then TTS half.

        Half A (room → agent) drains finalized STT segments and
        dispatches them to the agent; see ``_drain_stt_segments``.

        Half B (agent → room) drains reply text from the TTS outbox
        and hands it to ``_publish_audio_for``; see
        ``_drain_tts_outbox``.

        Half B runs on EVERY tick, including ticks where the STT
        integrations are missing, the STT drain failed, or no new
        segment arrived. Replies are produced asynchronously
        (``synchronous=False`` dispatch), so they normally land in the
        outbox on a tick that has no fresh segments; gating Half B on
        Half A would delay them until somebody else speaks.
        """
        try:
            self._drain_stt_segments()
        finally:
            # Run even if Half A raised; the exception still propagates
            # to _loop, which logs it and keeps ticking.
            self._drain_tts_outbox()

    def _drain_stt_segments(self) -> None:
        """Half A: pull new STT segments and process each one.

        Integrations are imported lazily so unit tests of attach/detach
        don't need the LLM dispatch graph or audio toolchain. If the
        imports or the dequeue call fail, this half is skipped for the
        tick (logged) and the watermark is left untouched.
        """
        try:
            from integrations.service_tools.whisper_tool import (
                dequeue_segments,
            )
            from integrations.social.chat_messages import (
                persist_external_room_event,
            )
        except Exception as e:  # pragma: no cover — import-only failure
            logger.debug(
                "AgentBridgeWorker._tick: lazy imports unavailable "
                "(%s) — skipping drain", e)
            return

        # Drain the queue since our last watermark. whisper_tool returns
        # only segments with id > since AND prunes them from the queue;
        # no double-processing.
        try:
            segments = dequeue_segments(
                call_id=self.call_id,
                since=self._last_stt_segment_id,
            )
        except Exception as e:
            logger.warning(
                "AgentBridgeWorker._tick: dequeue_segments failed "
                "(call=%s): %s", self.call_id, e)
            return

        for seg in segments or ():
            self._handle_stt_segment(seg, persist_external_room_event)
            # Every handled segment (skipped or dispatched) advances the
            # watermark so it is never requested again.
            seg_id = seg.get('segment_id')
            if seg_id is not None:
                self._last_stt_segment_id = seg_id

    def _handle_stt_segment(self, seg: Dict[str, Any],
                            persist_event: Any) -> None:
        """Persist, surface, and (unless self-authored) dispatch one segment.

        Args:
            seg: Segment dict from ``dequeue_segments``. Keys read here:
                ``segment_id``, ``is_final``, ``author_id``, ``text``,
                ``t0``, ``t1``, ``lang``, ``speaker``.
            persist_event: The ``persist_external_room_event`` callable,
                passed in so the lazy import happens once per tick.
        """
        seg_id = seg.get('segment_id')
        # Defensive: producers should not enqueue interim segments, but
        # if they do we skip them.
        if not seg.get('is_final', True):
            return

        author_id = seg.get('author_id')
        text = (seg.get('text') or '').strip()
        if not text:
            return

        # Persist for cross-channel recall (UNIF-G3) — same canonical
        # writer used by adapter message handlers. Voice transcripts
        # carry kind='transcript_segment' + extra={t0, t1, speaker}
        # per the persist_external_room_event contract.
        try:
            persist_event(
                user_id=str(self.owner_id),
                platform=str(self.scope.get('platform') or 'livekit'),
                room_id=str(self.call_id),
                sender_id=str(author_id or 'unknown'),
                text=text,
                kind='transcript_segment',
                timestamp=seg.get('t1') or seg.get('t0'),
                lang=seg.get('lang'),
                extra={
                    't0': seg.get('t0'),
                    't1': seg.get('t1'),
                    'speaker': seg.get('speaker'),
                },
            )
        except Exception as e:
            logger.warning(
                "AgentBridgeWorker._tick: persist failed "
                "(call=%s, seg=%s): %s",
                self.call_id, seg_id, e)

        # Append to rolling transcript and emit the meet_copilot
        # Liquid UI card update (UNIF-G5). Idempotent overwrite —
        # each emit is the canonical full state, so stragglers /
        # re-orders self-correct on the next tick. Best-effort:
        # emit failures don't block.
        self._transcript_lines.append({
            'speaker': seg.get('speaker') or author_id,
            'text': text,
            't0': seg.get('t0'),
            't1': seg.get('t1'),
        })
        self._emit_meet_copilot(state='live')

        # Skip self-authored — the agent shouldn't dispatch on its
        # own previous TTS-back-into-the-room. (It is still persisted
        # and shown in the transcript above.)
        if author_id and str(author_id) == str(self.agent_id):
            return

        # Dispatch through the canonical agent router so existing
        # GuardrailEnforcer.before_dispatch / .after_response gates
        # apply identically to call-driven prompts as to chat-driven.
        try:
            from integrations.agentic_router import dispatch_to_agent
            dispatch_to_agent(
                agent_id=self.agent_id,
                prompt=text,
                context={
                    'source_kind': 'call',
                    'source_id': self.call_id,
                    'author_id': author_id,
                    'owner_id': self.owner_id,
                    'tenant_id': self.scope.get('tenant_id'),
                    'platform': self.scope.get('platform') or 'livekit',
                    'lang': seg.get('lang'),
                },
                synchronous=False,
            )
        except Exception as e:
            logger.warning(
                "AgentBridgeWorker._tick: dispatch_to_agent failed "
                "(call=%s, agent=%s, seg=%s): %s",
                self.call_id, self.agent_id, seg_id, e)

    def _drain_tts_outbox(self) -> None:
        """Half B: TTS outbox → audio publisher.

        Drains up to two reply chunks enqueued for this (call, agent)
        and hands each to ``_publish_audio_for``. Failures are logged
        per chunk and never abort the remaining chunks.
        """
        try:
            replies = dequeue_tts_text(self.call_id, self.agent_id,
                                       limit=2)
        except Exception as e:
            replies = []
            logger.warning(
                "AgentBridgeWorker._tick: dequeue_tts_text failed "
                "(call=%s, agent=%s): %s",
                self.call_id, self.agent_id, e)
        for r in replies:
            text = r.get('text', '')
            if not text:
                continue
            try:
                self._publish_audio_for(text)
            except Exception as e:
                logger.warning(
                    "AgentBridgeWorker._tick: _publish_audio_for "
                    "failed (call=%s, agent=%s): %s",
                    self.call_id, self.agent_id, e)

    def _publish_audio_for(self, text: str) -> None:
        """Synthesize `text` via PocketTTS + publish frames into the
        LiveKit room. Producer-adapter integration lands separately
        (livekit-rtc room handle wiring is async-heavy, kept out of
        this control-plane module so it can be reviewed in isolation).

        Today: logs the reply at INFO (truncated to 120 chars) so call
        audit trails record the agent's spoken contribution even before
        audio is published. No audio is produced on either branch yet.
        """
        if not text:
            return
        if not _HAS_LIVEKIT_RTC:
            logger.info(
                "AgentBridgeWorker._publish_audio_for: livekit-rtc "
                "absent; reply queued but not synthesized — "
                "call=%s agent=%s text=%r",
                self.call_id, self.agent_id, text[:120])
            return
        # SDK present — synthesize via PocketTTS + publish frames.
        # Filled in alongside the livekit-rtc room handle integration.
        logger.info(
            "AgentBridgeWorker._publish_audio_for: stub synthesize+"
            "publish — call=%s agent=%s text=%r",
            self.call_id, self.agent_id, text[:120])

    def _emit_meet_copilot(self, state: str = 'live') -> None:
        """Push the rolling meet_copilot card state to the user's Liquid UI
        surface (UNIF-G5).

        Single canonical emit site for the meet_copilot component-type
        defined in ``liquid_ui_service.py:COMPONENT_TYPES`` and rendered
        by web ``AgentOverlay.jsx`` + RN ``LiquidOverlay.js`` + iOS
        shared JS. Idempotent overwrite — every emit is the full state
        the renderer should display, so dropped / re-ordered events
        self-correct on the next tick.

        Best-effort: never raises out of the bridge loop. If the
        ServiceRegistry import fails the emit is a DEBUG-logged no-op;
        if the registry returns no ``LiquidUIService`` it is a silent
        no-op; a failing ``agent_ui_update`` call is logged at WARNING.

        Args:
            state: Card state to publish; this class uses ``'live'``
                and ``'ended'``.
        """
        try:
            from core.platform.service_registry import ServiceRegistry
            svc = ServiceRegistry.get('LiquidUIService')
        except Exception as e:
            logger.debug(
                "AgentBridgeWorker._emit_meet_copilot: ServiceRegistry "
                "unavailable (%s) — skipping emit", e)
            return
        if svc is None:
            return
        try:
            svc.agent_ui_update(
                str(self.agent_id),
                {
                    'type': 'meet_copilot',
                    'call_id': str(self.call_id),
                    'platform': str(
                        self.scope.get('platform') or 'livekit'),
                    'room_id': str(
                        self.scope.get('room_id') or self.call_id),
                    'state': state,
                    # Copies, so the payload is a stable snapshot even
                    # if the rolling state changes after the emit.
                    'transcript_lines': list(self._transcript_lines),
                    'decisions': list(self._decisions),
                    'action_items': list(self._action_items),
                    'participants': list(
                        self.scope.get('participants') or []),
                    'agent_role': str(
                        self.scope.get('role') or 'co_pilot'),
                },
            )
        except Exception as e:
            logger.warning(
                "AgentBridgeWorker._emit_meet_copilot: agent_ui_update "
                "failed (call=%s, agent=%s): %s",
                self.call_id, self.agent_id, e)

523 

524 

class AgentVoiceBridge:
    """Public entry points for managing bridge workers — the surface
    api_calls.add_agent_to_call invokes. All methods are static and
    operate on the module-level worker registry."""

    @staticmethod
    def attach_agent(db, call_id: str, agent_id: str, owner_id: str,
                     scope: Dict[str, Any]) -> Dict[str, Any]:
        """Ensure a running bridge worker exists for (call, agent).

        Idempotent per pair: if a live worker is already registered its
        dict is returned unchanged; otherwise a new worker is created,
        started, and registered (replacing any dead entry).

        Authorization is NOT checked here — the caller
        (CallService.attach_agent) must have verified the
        AgentJoinGrant + scope.can_voice beforehand; the scope dict is
        trusted as given. ``db`` is accepted but not used by this method.

        Raises:
            AgentBridgeError: if call_id, agent_id, or owner_id is empty.
        """
        if not (call_id and agent_id and owner_id):
            raise AgentBridgeError(
                "call_id, agent_id, owner_id required")
        pair = (call_id, agent_id)
        with _WORKERS_LOCK:
            current = _ACTIVE_WORKERS.get(pair)
            if current and current.is_alive():
                return current.to_dict()
            fresh = AgentBridgeWorker(
                call_id=call_id,
                agent_id=agent_id,
                owner_id=owner_id,
                scope=scope,
            )
            fresh.start()
            _ACTIVE_WORKERS[pair] = fresh
            return fresh.to_dict()

    @staticmethod
    def detach_agent(call_id: str, agent_id: str) -> bool:
        """Unregister and stop the worker for (call, agent).

        Returns True only when a registered worker was found and told
        to stop; with no registered worker this is a harmless no-op
        returning False. Pending TTS text for the pair is discarded
        either way.
        """
        pair = (call_id, agent_id)
        with _WORKERS_LOCK:
            removed = _ACTIVE_WORKERS.pop(pair, None)
        # Discard queued reply text so that a future attach with the
        # same key does not play back stale audio.
        with _TTS_LOCK:
            _TTS_OUTBOX.pop(pair, None)
        if removed is not None:
            removed.stop()
            return True
        return False

    @staticmethod
    def list_active(call_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return dicts for registered workers, optionally restricted
        to one call. Consumed by ops dashboards + integration tests."""
        with _WORKERS_LOCK:
            snapshot = list(_ACTIVE_WORKERS.values())
        return [
            worker.to_dict()
            for worker in snapshot
            if call_id is None or worker.call_id == call_id
        ]

    @staticmethod
    def shutdown_all() -> int:
        """Stop and unregister every bridge worker and clear all TTS
        outboxes. Meant for process shutdown and test cleanup.

        Returns the number of workers that were signalled to stop.
        """
        with _WORKERS_LOCK:
            doomed = list(_ACTIVE_WORKERS.values())
            _ACTIVE_WORKERS.clear()
        with _TTS_LOCK:
            _TTS_OUTBOX.clear()
        for worker in doomed:
            worker.stop()
        return len(doomed)

593 

594 

# Names exported by ``from <this module> import *``: the public bridge
# facade, its error and worker types, and the TTS outbox helpers.
__all__ = [
    'AgentVoiceBridge',
    'AgentBridgeError',
    'AgentBridgeWorker',
    'enqueue_tts_text',
    'dequeue_tts_text',
    'tts_outbox_depth',
]