Coverage for integrations / channels / discord_voice_recv_sink.py: 79.7%
123 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
discord_voice_recv_sink — UNIF-G7 / W1.7 Producer A.

Optional Discord voice-receive bridge. When the
``discord-ext-voice-recv`` community library is installed,
``DiscordAdapter.join_room`` attaches a ``HevolveStreamingSink`` to
the connected ``VoiceClient`` so per-speaker PCM frames are piped
through the canonical streaming-STT WebSocket and final segments
land in ``whisper_tool.enqueue_stt_segment`` via Producer C's
``?call_id=`` hook.

Single canonical sink — every audio source (Discord voice, LiveKit
RTC, RN mic, future others) lands segments in the SAME per-call
queue, drained by ``agent_voice_bridge._tick``. No parallel paths.

Lib gate:
    The module is importable even when ``discord-ext-voice-recv``
    is NOT installed. ``HAS_VOICE_RECV`` is False in that case
    and ``maybe_attach_recv_sink`` becomes a no-op so today's
    voice-room presence-only behavior is preserved.

Threading model:
    discord-ext-voice-recv calls ``AudioSink.write`` from its own
    audio receiver thread (NOT the asyncio event loop). We open a
    sync ``websockets`` client per speaker on first frame, push the
    resampled PCM, and rely on Producer C's server-side ``?call_id=``
    hook to enqueue the final transcript. The WS response stream
    is drained in a daemon thread so back-pressure never blocks the
    audio-write path.

Resampling:
    Discord delivers 48kHz s16le stereo per voice packet. The STT
    server expects 16kHz s16le mono. We use ``audioop.tomono`` +
    ``audioop.ratecv`` (Python stdlib) — no new dependency.
    NOTE(review): ``audioop`` was deprecated in Python 3.11 and removed
    from the standard library in 3.13 (PEP 594); confirm the target
    interpreter still provides it.
"""
from __future__ import annotations

import logging
import threading
import time
from typing import Any, Callable, Dict, Optional
logger = logging.getLogger(__name__)


# Optional dependency — the entire feature is gated on this.
# The ``discord-ext-voice-recv`` distribution is a discord.py extension
# importable as ``discord.ext.voice_recv``; the flat
# ``discord_ext_voice_recv`` name is kept as a fallback for backward
# compatibility. ``Exception`` (not just ``ImportError``) is caught so any
# import-time failure degrades to the presence-only no-op instead of
# breaking importers of this module.
try:
    from discord.ext.voice_recv import (  # type: ignore
        AudioSink, VoiceRecvClient,
    )
    HAS_VOICE_RECV = True
except Exception:
    try:
        from discord_ext_voice_recv import (  # type: ignore
            AudioSink, VoiceRecvClient,
        )
        HAS_VOICE_RECV = True
    except Exception:
        # Stand-ins so ``HevolveStreamingSink(AudioSink)`` still defines.
        AudioSink = object  # type: ignore
        VoiceRecvClient = None  # type: ignore
        HAS_VOICE_RECV = False


# Discord audio frame format.
_DISCORD_RATE = 48000
_DISCORD_CHANNELS = 2
# Streaming-STT server input format.
_TARGET_RATE = 16000
_TARGET_CHANNELS = 1
_SAMPLE_WIDTH = 2  # bytes per sample (s16le)
def _downmix_decimate_s16(pcm: bytes, channels: int, factor: int) -> bytes:
    """Pure-Python fallback: average channels to mono, then box-decimate.

    ``pcm`` is little-endian signed 16-bit interleaved audio whose length
    is a whole number of ``channels``-sample frames. Each frame's samples
    are averaged into one mono sample; each run of ``factor`` consecutive
    mono samples is then averaged into one output sample (a trailing run
    shorter than ``factor`` is dropped). Returns little-endian s16 mono.
    """
    import sys
    from array import array

    swap = sys.byteorder != 'little'
    samples = array('h')
    samples.frombytes(pcm)
    if swap:
        samples.byteswap()
    mono = [sum(frame) // channels
            for frame in zip(*(samples[c::channels] for c in range(channels)))]
    whole = len(mono) - len(mono) % factor
    out = array('h', (sum(mono[i:i + factor]) // factor
                      for i in range(0, whole, factor)))
    if swap:
        out.byteswap()
    return out.tobytes()


def _resample_48k_stereo_to_16k_mono(pcm: bytes) -> bytes:
    """Discord 48kHz stereo s16le → STT-server-expected 16kHz mono s16le.

    Channels are averaged (0.5 / 0.5), not summed, so a mono voice that
    Discord duplicates into both channels keeps its level instead of
    doubling and clipping. A trailing partial frame is dropped rather
    than failing the whole packet.

    Uses stdlib ``audioop`` when importable. ``audioop`` was removed from
    the standard library in Python 3.13 (PEP 594); without it a
    pure-Python downmix + box-filter decimator is used instead.

    Returns ``b''`` for empty input or on any conversion error (sink will
    skip that packet — caller never sees an exception).
    """
    if not pcm:
        return b''
    try:
        frame_bytes = _SAMPLE_WIDTH * _DISCORD_CHANNELS
        usable = len(pcm) - len(pcm) % frame_bytes
        if usable <= 0:
            return b''
        frames = bytes(pcm[:usable])
        try:
            import audioop  # deprecated in 3.11, removed in 3.13
        except ImportError:
            return _downmix_decimate_s16(
                frames, _DISCORD_CHANNELS, _DISCORD_RATE // _TARGET_RATE)
        mono = audioop.tomono(frames, _SAMPLE_WIDTH, 0.5, 0.5)
        # Fresh ``state`` per packet, as before: each packet is resampled
        # independently.
        downsampled, _ = audioop.ratecv(
            mono, _SAMPLE_WIDTH, _TARGET_CHANNELS,
            _DISCORD_RATE, _TARGET_RATE, None)
        return downsampled
    except Exception as e:
        logger.debug('discord_voice_recv_sink: resample failed: %s', e)
        return b''
class HevolveStreamingSink(AudioSink):
    """One per Discord voice channel join. Forwards PCM frames per
    speaker to the canonical streaming-STT WS server.

    Each non-bot speaker gets its own WebSocket client, opened lazily on
    that speaker's first usable packet at
    ``ws://127.0.0.1:<stt-port>/?call_id=<call_id>&user_id=<speaker>``.
    A daemon thread per client reads and discards server responses.

    After a failed connect attempt the speaker enters a short back-off
    (``_RECONNECT_BACKOFF_S``). During it, packets are dropped instead of
    re-resolving the port and re-dialing (with a WARNING log) on every
    packet.

    See module docstring for design notes.
    """

    # Seconds to skip connect attempts for a speaker after one failed.
    _RECONNECT_BACKOFF_S = 2.0

    def __init__(self, call_id: str, bot_user_id: Optional[int] = None,
                 ws_connect: Optional[Callable[..., Any]] = None,
                 stt_port_provider: Optional[Callable[[], Optional[int]]] = None):
        """
        Args:
            call_id: Per-call key sent to the STT server as ``?call_id=``.
                ``None`` becomes ``''``, which makes ``write`` a no-op.
            bot_user_id: The bot's own Discord user id; its packets are
                dropped so the bot is never fed back to STT.
            ws_connect: Test seam replacing
                ``websockets.sync.client.connect``.
            stt_port_provider: Test seam returning the STT WS port.
        """
        # ``ws_connect`` and ``stt_port_provider`` are dependency-
        # injection seams for tests — production passes None and we
        # resolve them lazily.
        if HAS_VOICE_RECV:
            try:
                super().__init__()
            except Exception as e:  # best-effort: base init is optional
                logger.debug(
                    'discord_voice_recv_sink: AudioSink.__init__ failed: %s',
                    e)
        self.call_id = str(call_id) if call_id is not None else ''
        self.bot_user_id = bot_user_id
        self._ws_connect = ws_connect
        self._stt_port_provider = stt_port_provider
        # speaker_id -> open WS client (each with its own drain thread).
        self._ws_per_user: Dict[str, Any] = {}
        # speaker_id -> time.monotonic() deadline before which a failed
        # connect is not retried.
        self._retry_after: Dict[str, float] = {}
        # Serialises updates to _ws_per_user, _retry_after and _closed.
        self._lock = threading.Lock()
        self._closed = False

    # ── discord-ext-voice-recv contract ──────────────────────────

    def wants_opus(self) -> bool:
        """Receive raw PCM, not Opus packets."""
        return False

    def write(self, user, data) -> None:
        """Called per voice packet, on the recv lib's audio thread.

        Drops the packet if the sink is closed, has no call_id, the
        speaker is unknown or is the bot, or the packet has no PCM.
        Otherwise resamples it and sends it on the speaker's WS. A send
        failure discards that client so the next packet reconnects.
        """
        if self._closed or not self.call_id:
            return
        if user is None or data is None:
            return
        user_id = getattr(user, 'id', None)
        if user_id is None:
            return
        if self.bot_user_id is not None and user_id == self.bot_user_id:
            return  # don't echo bot back
        pcm = getattr(data, 'pcm', None) or getattr(data, 'data', None)
        if not pcm:
            return
        resampled = _resample_48k_stereo_to_16k_mono(pcm)
        if not resampled:
            return
        speaker_id = str(user_id)
        ws = self._get_ws(speaker_id)
        if ws is None:
            return
        try:
            ws.send(resampled)
        except Exception as e:
            logger.debug(
                'discord_voice_recv_sink: send failed for user=%s: %s',
                user_id, e)
            self._reset_ws(speaker_id)

    def cleanup(self) -> None:
        """Called by the recv lib on disconnect. Closes all per-speaker
        WS clients and forgets any connect back-off state; later writes
        are no-ops."""
        with self._lock:
            self._closed = True
            wss = list(self._ws_per_user.values())
            self._ws_per_user.clear()
            self._retry_after.clear()
        for ws in wss:
            self._close_quietly(ws)

    # ── Internals ────────────────────────────────────────────────

    @staticmethod
    def _close_quietly(ws) -> None:
        """Close ``ws``, ignoring any error (best-effort teardown)."""
        try:
            ws.close()
        except Exception:
            pass

    def _get_ws(self, speaker_id: str) -> Any:
        """Return ``speaker_id``'s WS client, connecting on first use.

        Returns None if the sink is closed, the speaker is still in
        connect back-off, or a new connection could not be opened.
        """
        with self._lock:
            existing = self._ws_per_user.get(speaker_id)
            if existing is not None:
                return existing
            if self._closed:
                return None
            deadline = self._retry_after.get(speaker_id)
            if deadline is not None and time.monotonic() < deadline:
                return None
        # Resolve port + connector and dial outside the lock — these may
        # block (server start-up, WS handshake).
        ws = self._open_ws(speaker_id)
        if ws is None:
            with self._lock:
                self._retry_after[speaker_id] = (
                    time.monotonic() + self._RECONNECT_BACKOFF_S)
            return None
        with self._lock:
            if self._closed:
                keep = None
            else:
                # setdefault: if another thread registered a client for
                # this speaker while we dialed, theirs wins.
                keep = self._ws_per_user.setdefault(speaker_id, ws)
                self._retry_after.pop(speaker_id, None)
        if keep is not ws:
            # Race — sink closed during connect, or we lost the
            # registration race. Close ours so it is not orphaned.
            self._close_quietly(ws)
            return keep
        # Drain inbound responses so back-pressure never blocks send().
        # Producer C does the enqueue server-side; we discard the
        # client-side echo.
        threading.Thread(
            target=self._drain_loop, args=(ws,), daemon=True,
            name=f'hevolve-discord-recv-drain-{speaker_id[:8]}',
        ).start()
        return ws

    def _open_ws(self, speaker_id: str) -> Any:
        """Dial the STT WS for ``speaker_id``; None if it cannot be opened."""
        port = self._resolve_stt_port()
        if not port:
            return None
        connect = self._resolve_ws_connect()
        if connect is None:
            return None
        url = (f'ws://127.0.0.1:{port}/?call_id={self.call_id}'
               f'&user_id={speaker_id}')
        try:
            return connect(url, max_size=2 * 1024 * 1024)
        except Exception as e:
            logger.warning(
                'discord_voice_recv_sink: WS connect failed (%s): %s',
                url, e)
            return None

    def _resolve_stt_port(self) -> Optional[int]:
        """Injected provider's port if given, else ``get_stt_stream_port()``
        falling back to ``start_stt_stream_server()``; None on error."""
        if self._stt_port_provider is not None:
            try:
                return self._stt_port_provider()
            except Exception:
                return None
        try:
            from integrations.service_tools.whisper_tool import (
                get_stt_stream_port, start_stt_stream_server,
            )
            return get_stt_stream_port() or start_stt_stream_server()
        except Exception as e:
            logger.debug(
                'discord_voice_recv_sink: cannot resolve STT port: %s', e)
            return None

    def _resolve_ws_connect(self) -> Optional[Callable[..., Any]]:
        """Injected connector if given, else ``websockets.sync.client.connect``
        (None when that import fails)."""
        if self._ws_connect is not None:
            return self._ws_connect
        try:
            from websockets.sync.client import connect  # type: ignore
            return connect
        except Exception as e:
            logger.debug(
                'discord_voice_recv_sink: websockets.sync.client '
                'unavailable: %s', e)
            return None

    def _drain_loop(self, ws) -> None:
        """Read and discard inbound messages until the connection ends or
        errors, or a message arrives after the sink was closed."""
        try:
            for _msg in ws:
                if self._closed:
                    return
        except Exception:
            return

    def _reset_ws(self, speaker_id: str) -> None:
        """Forget and close ``speaker_id``'s client (next packet redials)."""
        with self._lock:
            ws = self._ws_per_user.pop(speaker_id, None)
        if ws is not None:
            self._close_quietly(ws)
def maybe_attach_recv_sink(voice_client, call_id: str,
                           bot_user_id: Optional[int] = None) -> bool:
    """Attach a ``HevolveStreamingSink`` to a connected ``VoiceClient``.

    A sink is attached only when all of these hold:

    (a) the recv lib is installed;
    (b) ``voice_client`` exposes a callable ``listen`` — presumably
        because it is a ``VoiceRecvClient``;
    (c) ``call_id`` is non-empty.

    Returns True iff a sink was attached. False is the no-op fallback
    that preserves today's "presence only" voice-room behavior.

    Best-effort: any failure while building or attaching the sink is
    logged at WARNING and returns False, so the caller can continue.
    """
    # Guard call_id before stringifying it: str(None) would give 'None'
    # and attach a sink that streams audio to a bogus call queue, and an
    # empty id would attach a sink whose writes are all no-ops.
    if voice_client is None or call_id is None or str(call_id) == '':
        return False
    if not HAS_VOICE_RECV:
        return False
    listen = getattr(voice_client, 'listen', None)
    if not callable(listen):
        return False
    try:
        sink = HevolveStreamingSink(
            call_id=str(call_id), bot_user_id=bot_user_id)
        listen(sink)
        logger.info(
            'discord_voice_recv_sink: attached for call_id=%s '
            '(bot_user_id=%s)', call_id, bot_user_id)
        return True
    except Exception as e:
        logger.warning(
            'discord_voice_recv_sink: maybe_attach_recv_sink failed: %s',
            e)
        return False