Coverage for integrations / channels / discord_voice_recv_sink.py: 79.7%

123 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

"""
discord_voice_recv_sink — UNIF-G7 / W1.7 Producer A.

Optional Discord voice-receive bridge. When the
``discord-ext-voice-recv`` community library is installed,
``DiscordAdapter.join_room`` attaches a ``HevolveStreamingSink`` to
the connected ``VoiceClient`` so per-speaker PCM frames are piped
through the canonical streaming-STT WebSocket and final segments
land in ``whisper_tool.enqueue_stt_segment`` via Producer C's
``?call_id=`` hook.

Single canonical sink — every audio source (Discord voice, LiveKit
RTC, RN mic, future others) lands segments in the SAME per-call
queue, drained by ``agent_voice_bridge._tick``. No parallel paths.

Lib gate:
    The module is importable even when ``discord-ext-voice-recv``
    is NOT installed. ``HAS_VOICE_RECV`` is False in that case
    and ``maybe_attach_recv_sink`` becomes a no-op so today's
    voice-room presence-only behavior is preserved. The library is
    imported from its documented ``discord.ext.voice_recv`` path,
    falling back to the flat ``discord_ext_voice_recv`` name.

Threading model:
    discord-ext-voice-recv calls ``AudioSink.write`` from its own
    audio receiver thread (NOT the asyncio event loop). We open a
    sync ``websockets`` client per speaker on first frame, push the
    resampled PCM, and rely on Producer C's server-side ``?call_id=``
    hook to enqueue the final transcript. The WS response stream
    is drained in a daemon thread so unread responses never pile up
    behind the audio-write path.

Resampling:
    Discord delivers 48kHz s16le stereo per voice packet. The STT
    server expects 16kHz s16le mono. ``_resample_48k_stereo_to_16k_mono``
    performs that conversion without any third-party dependency.
"""
from __future__ import annotations

import logging
import threading
from typing import Any, Callable, Dict, Optional

logger = logging.getLogger(__name__)


# Optional dependency — the entire feature is gated on this.
# discord-ext-voice-recv is a discord.py extension and lives in the
# ``discord.ext`` namespace (``from discord.ext import voice_recv``).
# The flat module name used previously is kept as a fallback so any
# environment that happened to satisfy it keeps working. Any import
# error (not just ImportError) disables the feature instead of
# breaking module import.
try:
    from discord.ext.voice_recv import (  # type: ignore
        AudioSink, VoiceRecvClient,
    )
    HAS_VOICE_RECV = True
except Exception:
    try:
        from discord_ext_voice_recv import (  # type: ignore
            AudioSink, VoiceRecvClient,
        )
        HAS_VOICE_RECV = True
    except Exception:
        AudioSink = object  # type: ignore
        VoiceRecvClient = None  # type: ignore
        HAS_VOICE_RECV = False


# Discord audio frame format (what the recv lib hands to ``write``).
_DISCORD_RATE = 48000
_DISCORD_CHANNELS = 2
# Format the streaming-STT server expects.
_TARGET_RATE = 16000
_TARGET_CHANNELS = 1
_SAMPLE_WIDTH = 2  # bytes per sample: s16le

63 

64 

def _resample_48k_stereo_to_16k_mono(pcm: bytes) -> bytes:
    """Discord 48kHz stereo s16le → STT-server-expected 16kHz mono s16le.

    Pure-Python conversion built on the stdlib ``array`` module.
    ``audioop`` is deliberately not used: it was removed in Python 3.13
    (PEP 594), and the failed import used to be swallowed, so every
    packet was silently dropped.

    Each output sample is the integer (floor) mean of one window of
    ``_DISCORD_CHANNELS * (_DISCORD_RATE // _TARGET_RATE)`` interleaved
    input samples, i.e. both channels of 3 consecutive frames for
    48k → 16k. Averaging the channels (instead of summing them) can
    never clip, and the 3-frame box average is a cheap low-pass before
    decimation.

    Trailing bytes that do not form a whole stereo frame are ignored. A
    trailing partial window of whole frames is averaged over the frames
    it contains.

    Returns ``b''`` for empty input, for input shorter than one frame,
    or on any conversion error. The sink skips that packet, so the
    caller never sees an exception.
    """
    if not pcm:
        return b''
    try:
        import sys
        from array import array

        frame_bytes = _SAMPLE_WIDTH * _DISCORD_CHANNELS
        usable = len(pcm) - len(pcm) % frame_bytes
        if usable <= 0:
            return b''
        # 'h' = signed 16-bit C short, matching _SAMPLE_WIDTH == 2.
        samples = array('h')
        samples.frombytes(pcm[:usable])
        big_endian_host = sys.byteorder != 'little'
        if big_endian_host:
            samples.byteswap()  # wire format is little-endian
        # Assumes the rate ratio is an integer (48000 / 16000 == 3).
        window = _DISCORD_CHANNELS * max(1, _DISCORD_RATE // _TARGET_RATE)
        out = array('h')
        for start in range(0, len(samples), window):
            chunk = samples[start:start + window]
            # Mean of int16 values always fits back into int16.
            out.append(sum(chunk) // len(chunk))
        if big_endian_host:
            out.byteswap()
        return out.tobytes()
    except Exception as e:
        logger.debug('discord_voice_recv_sink: resample failed: %s', e)
        return b''

84 

85 

class HevolveStreamingSink(AudioSink):
    """One per Discord voice channel join. Forwards PCM frames per
    speaker to the canonical streaming-STT WS server.

    Implements the discord-ext-voice-recv sink contract
    (``wants_opus`` / ``write`` / ``cleanup``). ``write`` runs on the
    recv lib's audio thread; the per-speaker tables are guarded by
    ``_lock``.

    Each speaker gets its own WS (``?call_id=…&user_id=…``), opened on
    their first resampled frame. When the STT port, the WS client or the
    connection itself is unavailable, that speaker enters a cooldown of
    ``reconnect_backoff_s`` seconds. This keeps a dead STT server from
    costing a connect attempt (and a warning) on every packet.

    See module docstring for design notes.
    """

    def __init__(self, call_id: str, bot_user_id: Optional[int] = None,
                 ws_connect: Optional[Callable[..., Any]] = None,
                 stt_port_provider: Optional[Callable[[], Optional[int]]] = None,
                 reconnect_backoff_s: float = 1.0):
        """
        Args:
            call_id: Routing key sent to the STT server as ``call_id``.
                ``None`` becomes ``''``, which turns ``write`` into a no-op.
            bot_user_id: The bot's own Discord user id; packets from it
                are ignored so the bot never transcribes itself.
            ws_connect: Test seam replacing
                ``websockets.sync.client.connect``.
            stt_port_provider: Test seam returning the STT WS port.
            reconnect_backoff_s: Seconds before retrying a speaker whose
                WS could not be opened. ``0`` retries on the very next
                packet (the previous behavior).
        """
        # ``ws_connect`` and ``stt_port_provider`` are dependency-
        # injection seams for tests — production passes None and we
        # resolve them lazily.
        if HAS_VOICE_RECV:
            try:
                super().__init__()
            except Exception:
                # Best-effort: a base-class init failure must not stop
                # the sink from being constructed.
                pass
        self.call_id = str(call_id) if call_id is not None else ''
        self.bot_user_id = bot_user_id
        self._ws_connect = ws_connect
        self._stt_port_provider = stt_port_provider
        self._reconnect_backoff_s = max(0.0, float(reconnect_backoff_s))
        # Per-speaker WS client; each one has a daemon drain thread.
        self._ws_per_user: Dict[str, Any] = {}
        # speaker_id -> time.monotonic() deadline before which no new
        # connect attempt is made for that speaker.
        self._retry_after: Dict[str, float] = {}
        self._lock = threading.Lock()
        self._closed = False

    # ── discord-ext-voice-recv contract ──────────────────────────

    def wants_opus(self) -> bool:
        """Receive raw PCM, not Opus packets."""
        return False

    def write(self, user, data) -> None:
        """Called per voice packet, on the recv lib's audio thread.

        Silently drops the packet when the sink is closed, has no
        ``call_id``, the speaker is unknown or is the bot itself, the
        packet carries no PCM, or no WS is available. A failed ``send``
        discards that speaker's WS so a later packet reconnects.
        """
        if self._closed or not self.call_id:
            return
        if user is None or data is None:
            return
        user_id = getattr(user, 'id', None)
        if user_id is None:
            return
        if self.bot_user_id is not None and user_id == self.bot_user_id:
            return  # don't echo bot back
        pcm = getattr(data, 'pcm', None) or getattr(data, 'data', None)
        if not pcm:
            return
        resampled = _resample_48k_stereo_to_16k_mono(pcm)
        if not resampled:
            return
        speaker_id = str(user_id)
        ws = self._get_ws(speaker_id)
        if ws is None:
            return
        try:
            ws.send(resampled)
        except Exception as e:
            logger.debug(
                'discord_voice_recv_sink: send failed for user=%s: %s',
                user_id, e)
            self._reset_ws(speaker_id)

    def cleanup(self) -> None:
        """Called by the recv lib on disconnect. Closes all per-speaker
        WS clients; subsequent ``write`` calls become no-ops."""
        with self._lock:
            self._closed = True
            sockets = list(self._ws_per_user.values())
            self._ws_per_user.clear()
            self._retry_after.clear()
        # Close outside the lock: a close handshake may take a while.
        for ws in sockets:
            self._close_quietly(ws)

    # ── Internals ────────────────────────────────────────────────

    @staticmethod
    def _close_quietly(ws: Any) -> None:
        """Close ``ws``, ignoring any error (it may already be dead)."""
        try:
            ws.close()
        except Exception:
            pass

    def _start_cooldown(self, speaker_id: str) -> None:
        """Suppress connect attempts for ``speaker_id`` for the backoff."""
        import time

        with self._lock:
            self._retry_after[speaker_id] = (
                time.monotonic() + self._reconnect_backoff_s)

    def _get_ws(self, speaker_id: str) -> Any:
        """Return ``speaker_id``'s open WS, connecting on first use.

        Returns None when the sink is closed, the speaker is still in
        its reconnect cooldown, or the STT port / WS client / connection
        is unavailable. Those last three cases start a new cooldown.
        """
        import time
        from urllib.parse import urlencode

        with self._lock:
            existing = self._ws_per_user.get(speaker_id)
            if existing is not None:
                return existing
            if self._closed:
                return None
            deadline = self._retry_after.get(speaker_id)
            if deadline is not None and time.monotonic() < deadline:
                return None
        # Resolve port + connector lazily — not under the lock.
        port = self._resolve_stt_port()
        if not port:
            self._start_cooldown(speaker_id)
            return None
        connect = self._resolve_ws_connect()
        if connect is None:
            self._start_cooldown(speaker_id)
            return None
        # URL-encode so a call_id containing '&', '=', spaces, etc.
        # cannot corrupt the query string.
        query = urlencode({'call_id': self.call_id, 'user_id': speaker_id})
        url = f'ws://127.0.0.1:{port}/?{query}'
        try:
            ws = connect(url, max_size=2 * 1024 * 1024)
        except Exception as e:
            logger.warning(
                'discord_voice_recv_sink: WS connect failed (%s): %s',
                url, e)
            self._start_cooldown(speaker_id)
            return None
        with self._lock:
            if self._closed:
                # Race — sink closed between check and connect.
                spare, result = ws, None
            else:
                current = self._ws_per_user.get(speaker_id)
                if current is not None:
                    # Another caller connected this speaker meanwhile;
                    # keep theirs and drop ours instead of leaking it.
                    spare, result = ws, current
                else:
                    self._ws_per_user[speaker_id] = ws
                    self._retry_after.pop(speaker_id, None)
                    spare, result = None, ws
        if spare is not None:
            self._close_quietly(spare)
            return result
        # Drain inbound responses so back-pressure never blocks send().
        # Producer C does the enqueue server-side; we discard the
        # client-side echo.
        threading.Thread(
            target=self._drain_loop, args=(ws,), daemon=True,
            name=f'hevolve-discord-recv-drain-{speaker_id[:8]}',
        ).start()
        return ws

    def _resolve_stt_port(self) -> Optional[int]:
        """Return the STT WS port from the injected provider, or from
        ``whisper_tool`` (starting the server if it has no port yet).
        Any error yields None."""
        if self._stt_port_provider is not None:
            try:
                return self._stt_port_provider()
            except Exception:
                return None
        try:
            from integrations.service_tools.whisper_tool import (
                get_stt_stream_port, start_stt_stream_server,
            )
            return get_stt_stream_port() or start_stt_stream_server()
        except Exception as e:
            logger.debug(
                'discord_voice_recv_sink: cannot resolve STT port: %s', e)
            return None

    def _resolve_ws_connect(self) -> Optional[Callable[..., Any]]:
        """Return the injected connector, else
        ``websockets.sync.client.connect``, else None if unavailable."""
        if self._ws_connect is not None:
            return self._ws_connect
        try:
            from websockets.sync.client import connect  # type: ignore
            return connect
        except Exception as e:
            logger.debug(
                'discord_voice_recv_sink: websockets.sync.client '
                'unavailable: %s', e)
            return None

    def _drain_loop(self, ws) -> None:
        """Read and discard inbound messages until ``ws`` ends, errors,
        or the sink is closed."""
        try:
            for _msg in ws:
                if self._closed:
                    return
        except Exception:
            return

    def _reset_ws(self, speaker_id: str) -> None:
        """Forget and close ``speaker_id``'s WS; the next packet
        reconnects."""
        with self._lock:
            ws = self._ws_per_user.pop(speaker_id, None)
        if ws is not None:
            self._close_quietly(ws)

247 

248 

def maybe_attach_recv_sink(voice_client, call_id: str,
                           bot_user_id: Optional[int] = None) -> bool:
    """Attach a ``HevolveStreamingSink`` to a connected ``VoiceClient``
    if (a) the recv lib is installed AND (b) the voice_client supports
    listening (i.e. it exposes a callable ``listen``, as
    ``VoiceRecvClient`` does) AND (c) ``call_id`` is non-empty.

    Returns True iff a sink was attached. False is the no-op fallback
    that preserves today's "presence only" voice-room behavior.

    A None or empty ``call_id`` attaches nothing: such a sink could not
    route transcripts. Previously ``str(None)`` produced the literal
    ``'None'``, and audio was streamed under that bogus call id.

    Best-effort: any failure while constructing or attaching the sink
    logs a warning and returns False so the caller can continue.
    """
    if not HAS_VOICE_RECV or voice_client is None:
        return False
    if call_id is None or str(call_id) == '':
        logger.debug(
            'discord_voice_recv_sink: no call_id; not attaching sink')
        return False
    listen = getattr(voice_client, 'listen', None)
    if not callable(listen):
        return False
    try:
        sink = HevolveStreamingSink(
            call_id=str(call_id), bot_user_id=bot_user_id)
        listen(sink)
    except Exception as e:
        logger.warning(
            'discord_voice_recv_sink: maybe_attach_recv_sink failed: %s',
            e)
        return False
    logger.info(
        'discord_voice_recv_sink: attached for call_id=%s '
        '(bot_user_id=%s)', call_id, bot_user_id)
    return True