Coverage for integrations/service_tools/whisper_tool.py: 41.4% (497 statements)
coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2STT tool — in-process speech-to-text. 

3 

4Engine priority (first available wins): 

5 1. faster-whisper (CTranslate2) — preferred, 4x faster than openai-whisper, 

6 multilingual, GPU+CPU, auto-downloads models from HuggingFace. 

7 2. sherpa-onnx — lightweight ONNX alternative, no PyTorch dependency. 

8 3. openai-whisper — legacy fallback (requires PyTorch). 

9 

10Model selection by hardware (via select_whisper_model): 

11 - CPU, low RAM → tiny / moonshine-tiny (English, fastest) 

12 - CPU, 4-8GB → base / whisper-tiny (multilingual) 

13 - GPU, 2-5GB → small (multilingual) 

14 - GPU, 5-10GB → medium (multilingual) 

15 - GPU, 10+GB → large-v3 (multilingual, best accuracy) 

16 

17Models downloaded lazily on first use to ~/.hevolve/models/stt/ 

18100% local, zero cloud costs — Nunba is forever free. 

19""" 

import json
import logging
import os
import tarfile
import urllib.request
from pathlib import Path
from typing import Optional

from .registry import ServiceToolInfo, service_tool_registry

logger = logging.getLogger(__name__)

# ═══════════════════════════════════════════════════════════════
# Model registry — sherpa-onnx model configurations
# ═══════════════════════════════════════════════════════════════

_SHERPA_MODEL_BASE = (
    "https://github.com/k2-fsa/sherpa-onnx/releases/download/asr-models"
)

_SHERPA_MODELS = {
    "moonshine-tiny": {
        "type": "moonshine",
        "archive": "sherpa-onnx-moonshine-tiny-en-int8.tar.bz2",
        "dir": "sherpa-onnx-moonshine-tiny-en-int8",
        "files": {
            "preprocessor": "preprocess.onnx",
            "encoder": "encode.int8.onnx",
            "uncached_decoder": "uncached_decode.int8.onnx",
            "cached_decoder": "cached_decode.int8.onnx",
            "tokens": "tokens.txt",
        },
        "multilingual": False,
    },
    "moonshine-base": {
        "type": "moonshine",
        "archive": "sherpa-onnx-moonshine-base-en-int8.tar.bz2",
        "dir": "sherpa-onnx-moonshine-base-en-int8",
        "files": {
            "preprocessor": "preprocess.onnx",
            "encoder": "encode.int8.onnx",
            "uncached_decoder": "uncached_decode.int8.onnx",
            "cached_decoder": "cached_decode.int8.onnx",
            "tokens": "tokens.txt",
        },
        "multilingual": False,
    },
    "whisper-tiny": {
        "type": "whisper",
        "archive": "sherpa-onnx-whisper-tiny.tar.bz2",
        "dir": "sherpa-onnx-whisper-tiny",
        "files": {
            "encoder": "tiny-encoder.int8.onnx",
            "decoder": "tiny-decoder.int8.onnx",
            "tokens": "tiny-tokens.txt",
        },
        "multilingual": True,
    },
    "whisper-base": {
        "type": "whisper",
        "archive": "sherpa-onnx-whisper-base.tar.bz2",
        "dir": "sherpa-onnx-whisper-base",
        "files": {
            "encoder": "base-encoder.int8.onnx",
            "decoder": "base-decoder.int8.onnx",
            "tokens": "base-tokens.txt",
        },
        "multilingual": True,
    },
    "whisper-small": {
        "type": "whisper",
        "archive": "sherpa-onnx-whisper-small.tar.bz2",
        "dir": "sherpa-onnx-whisper-small",
        "files": {
            "encoder": "small-encoder.int8.onnx",
            "decoder": "small-decoder.int8.onnx",
            "tokens": "small-tokens.txt",
        },
        "multilingual": True,
    },
    "whisper-medium": {
        "type": "whisper",
        "archive": "sherpa-onnx-whisper-medium.tar.bz2",
        "dir": "sherpa-onnx-whisper-medium",
        "files": {
            "encoder": "medium-encoder.int8.onnx",
            "decoder": "medium-decoder.int8.onnx",
            "tokens": "medium-tokens.txt",
        },
        "multilingual": True,
    },
}

# ═══════════════════════════════════════════════════════════════
# Cached recognizers (avoid reloading on every call)
# ═══════════════════════════════════════════════════════════════

_sherpa_recognizer = None
_sherpa_model_name = None

# Legacy openai-whisper fallback
_whisper_model = None
_whisper_model_name = None

# faster-whisper (CTranslate2) — preferred engine
_faster_whisper_model = None
_faster_whisper_model_size = None

# Backoff + circuit-breaker for the model-load retry storm.
# Symptom #10 (Stage-A, 2026-04-16): frozen_debug.log showed ~2Hz
# 'faster-whisper transcription failed: HF_HUB_OFFLINE' spam because
# every realtime transcribe call re-hit the network. Now:
#   - backoff: 1s -> 2s -> 4s -> ... capped at 300s
#   - breaker: after 5 consecutive model-load failures, OPEN for 300s
# The backoff is KEYED by model_size so changing size resets the timer.
# The breaker is a process-wide single instance (one faster-whisper
# model at a time in this module).
try:
    from core.circuit_breaker import CircuitBreaker, PeerBackoff
    _whisper_load_backoff = PeerBackoff(initial=1.0, maximum=300.0)
    _whisper_load_breaker = CircuitBreaker(
        name='faster_whisper_load', threshold=5, cooldown=300.0,
    )
except ImportError:  # dev tree without HARTOS core — defense-in-depth
    _whisper_load_backoff = None
    _whisper_load_breaker = None
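
# Illustrative backoff schedule (a sketch — assumes PeerBackoff doubles from
# `initial` and clamps at `maximum`, matching the comment above; the real
# implementation lives in core.circuit_breaker):
#
#     delays = [min(300.0, 1.0 * 2 ** n) for n in range(10)]
#     # -> [1, 2, 4, 8, 16, 32, 64, 128, 256, 300] seconds between retries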

# Track the last user-visible error so /api/admin/stt/status can
# surface it instead of the UI silently limping along. This is the
# 'visible error to the user' half of the symptom #10 mandate.
_whisper_last_error: Optional[str] = None


def get_whisper_last_error() -> Optional[str]:
    """Return the most recent faster-whisper failure reason, or None.

    Exposed so the admin/status UI can show a concrete error to the
    user instead of letting the retry storm accumulate silently in
    the log. Cleared on the next successful load or transcribe.
    """
    return _whisper_last_error


def _record_whisper_failure(reason: str) -> None:
    """Record a whisper failure across backoff + breaker + last-error."""
    global _whisper_last_error
    _whisper_last_error = reason
    if _whisper_load_breaker is not None:
        _whisper_load_breaker.record_failure()
    if _whisper_load_backoff is not None:
        _whisper_load_backoff.record_failure('faster_whisper')


def _record_whisper_success() -> None:
    """Reset backoff + breaker + clear last-error on success."""
    global _whisper_last_error
    _whisper_last_error = None
    if _whisper_load_breaker is not None:
        _whisper_load_breaker.record_success()
    if _whisper_load_backoff is not None:
        _whisper_load_backoff.record_success('faster_whisper')

183 

184# ═══════════════════════════════════════════════════════════════ 

185# faster-whisper (primary engine) 

186# ═══════════════════════════════════════════════════════════════ 

187 

188# Default faster-whisper model size. Can be overridden by the user via the 

189# admin Model Management UI, which sets HEVOLVE_STT_MODEL_SIZE in the 

190# orchestrator and then stops the worker so the next call respawns with 

191# the new value picked up at subprocess startup. 

192_FASTER_WHISPER_MODEL_SIZE = os.environ.get( 

193 'HEVOLVE_STT_MODEL_SIZE', 'base', 

194) # CPU int8 — preserves GPU VRAM for TTS/VLM 
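
# Override sketch (illustrative; the env var and module global are the ones
# defined above — the new value only takes effect when the worker subprocess
# respawns and re-imports this module):
#
#     os.environ['HEVOLVE_STT_MODEL_SIZE'] = 'small'
#     # after respawn: _FASTER_WHISPER_MODEL_SIZE == 'small'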


def _get_faster_whisper_model(model_size: str = "base"):
    """Lazy-load faster-whisper model (CTranslate2, auto-downloads from HuggingFace).

    Device selection:
      - NVIDIA GPU (CUDA): CTranslate2 uses its own CUDA runtime (not torch)
      - AMD GPU: CTranslate2 doesn't support ROCm/Vulkan -> CPU fallback
      - CPU: int8 quantization for speed

    Retry semantics (Symptom #10, 2026-04-16):
      - Backoff-gated: if the last load failed, refuses to retry until
        the exponential backoff window elapses.
      - Circuit-breaker-gated: after 5 consecutive failures, the
        breaker OPENS for 300s; calls raise RuntimeError instead of
        hitting the network on every streaming chunk.
      - Raises on refusal so the caller can log ONCE and skip the
        realtime path cleanly instead of absorbing the exception 2x
        per second.
    """
    global _faster_whisper_model, _faster_whisper_model_size
    if _faster_whisper_model is not None and _faster_whisper_model_size == model_size:
        return _faster_whisper_model

    # Gate 1: circuit breaker. If OPEN, refuse without even importing.
    if _whisper_load_breaker is not None and _whisper_load_breaker.is_open():
        raise RuntimeError(
            f"faster-whisper circuit breaker OPEN (stats="
            f"{_whisper_load_breaker.get_stats()}, last_error="
            f"{_whisper_last_error!r})"
        )

    # Gate 2: exponential backoff keyed by model name. If recently
    # failed, refuse until the window elapses. Caller gets a clear
    # single log line per refusal, not 2Hz spam.
    if (_whisper_load_backoff is not None
            and _whisper_load_backoff.is_backed_off('faster_whisper')):
        raise RuntimeError(
            f"faster-whisper load is backed off (last_error="
            f"{_whisper_last_error!r})"
        )

    try:
        from faster_whisper import WhisperModel
    except ImportError as e:
        _record_whisper_failure(f"faster_whisper import failed: {e}")
        raise

    # Detect if CUDA is available for CTranslate2 (separate from torch CUDA).
    device = "cpu"
    compute_type = "int8"
    try:
        import ctranslate2
        # get_supported_compute_types('cuda') returns compute-type names
        # ('float16', 'int8', ...), never the literal 'cuda' — probe the
        # CUDA device count instead to decide whether a GPU is usable.
        if ctranslate2.get_cuda_device_count() > 0:
            device = "cuda"
            compute_type = "float16"
            logger.info("CTranslate2 CUDA available — loading faster-whisper on GPU")
    except Exception:
        pass

    logger.info(f"Loading faster-whisper model '{model_size}' on {device} ({compute_type})...")
    try:
        _faster_whisper_model = WhisperModel(
            model_size, device=device, compute_type=compute_type
        )
    except Exception as e:
        reason = f"WhisperModel({model_size}, {device}, {compute_type}) failed: {e}"
        logger.warning(reason)
        _record_whisper_failure(reason)
        raise
    _faster_whisper_model_size = model_size
    logger.info(f"faster-whisper model '{model_size}' loaded on {device}")
    _record_whisper_success()

    # Register with central lifecycle tracker via orchestrator
    try:
        from .model_orchestrator import get_orchestrator
        get_orchestrator().notify_loaded('stt', f'whisper-{model_size}',
                                         device=device, vram_gb=3.0 if device == 'cuda' else 0)
    except Exception:
        pass

    return _faster_whisper_model


def _faster_whisper_transcribe(audio_path: str, language: str = None) -> Optional[str]:
    """Transcribe using faster-whisper. Returns JSON string or None on failure.

    Symptom #10 guard (2026-04-16): each call is gated by the module
    circuit breaker, so after N consecutive load failures we refuse
    silently (one log every cooldown window, not 2Hz spam). The
    backoff is shared with _get_faster_whisper_model so a model-load
    refusal here short-circuits the full transcribe path too.
    """
    # Fast-path refusal: if breaker is OPEN, skip the whole try block.
    if _whisper_load_breaker is not None and _whisper_load_breaker.is_open():
        return None

    try:
        model = _get_faster_whisper_model(_FASTER_WHISPER_MODEL_SIZE)
    except Exception as e:
        # _get_faster_whisper_model already records the failure +
        # emits one warning. Don't re-log at 2Hz here.
        logger.debug(f"faster-whisper unavailable: {e}")
        return None

    try:
        kwargs = {"beam_size": 5}
        if language:
            kwargs["language"] = language
        segments, info = model.transcribe(audio_path, **kwargs)
        text = " ".join(seg.text for seg in segments).strip()
        _record_whisper_success()
        return json.dumps({
            "text": text,
            "language": info.language if info.language else "unknown",
        })
    except Exception as e:
        reason = f"transcribe({audio_path}) failed: {e}"
        logger.warning(f"faster-whisper transcription failed: {e}")
        _record_whisper_failure(reason)
        return None


# ═══════════════════════════════════════════════════════════════
# Model download (sherpa-onnx)
# ═══════════════════════════════════════════════════════════════

def _get_stt_dir() -> Path:
    """Get the STT model storage directory."""
    from .model_storage import model_storage
    stt_dir = model_storage.get_tool_dir("stt")
    stt_dir.mkdir(parents=True, exist_ok=True)
    return stt_dir


def _download_model(model_name: str) -> Path:
    """Download and extract a sherpa-onnx model if not already present.

    Returns the path to the extracted model directory.
    """
    cfg = _SHERPA_MODELS[model_name]
    stt_dir = _get_stt_dir()
    model_dir = stt_dir / cfg["dir"]

    if model_dir.exists() and (model_dir / cfg["files"]["tokens"]).exists():
        return model_dir

    archive_url = f"{_SHERPA_MODEL_BASE}/{cfg['archive']}"
    archive_path = stt_dir / cfg["archive"]

    logger.info(f"Downloading STT model '{model_name}' from {archive_url}...")
    try:
        urllib.request.urlretrieve(archive_url, str(archive_path))
        logger.info(f"Extracting {cfg['archive']}...")
        with tarfile.open(str(archive_path), "r:bz2") as tar:
            tar.extractall(path=str(stt_dir))
        # Clean up archive
        archive_path.unlink(missing_ok=True)
        logger.info(f"STT model '{model_name}' ready at {model_dir}")
    except Exception as e:
        logger.error(f"Failed to download model '{model_name}': {e}")
        archive_path.unlink(missing_ok=True)
        raise

    return model_dir
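
# Resulting on-disk layout (illustrative, for the 'whisper-tiny' entry of
# _SHERPA_MODELS; the stt/ root comes from model_storage, per the module
# docstring):
#
#     ~/.hevolve/models/stt/sherpa-onnx-whisper-tiny/
#         tiny-encoder.int8.onnx
#         tiny-decoder.int8.onnx
#         tiny-tokens.txt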


# ═══════════════════════════════════════════════════════════════
# sherpa-onnx recognizer creation
# ═══════════════════════════════════════════════════════════════

def _get_sherpa_recognizer(model_name: str = "whisper-tiny"):
    """Create or return cached sherpa-onnx OfflineRecognizer."""
    global _sherpa_recognizer, _sherpa_model_name
    if _sherpa_recognizer is not None and _sherpa_model_name == model_name:
        return _sherpa_recognizer

    import sherpa_onnx

    cfg = _SHERPA_MODELS[model_name]
    model_dir = _download_model(model_name)

    num_threads = min(os.cpu_count() or 2, 4)

    if cfg["type"] == "moonshine":
        _sherpa_recognizer = sherpa_onnx.OfflineRecognizer.from_moonshine(
            preprocessor=str(model_dir / cfg["files"]["preprocessor"]),
            encoder=str(model_dir / cfg["files"]["encoder"]),
            uncached_decoder=str(model_dir / cfg["files"]["uncached_decoder"]),
            cached_decoder=str(model_dir / cfg["files"]["cached_decoder"]),
            tokens=str(model_dir / cfg["files"]["tokens"]),
            num_threads=num_threads,
        )
    elif cfg["type"] == "whisper":
        _sherpa_recognizer = sherpa_onnx.OfflineRecognizer.from_whisper(
            encoder=str(model_dir / cfg["files"]["encoder"]),
            decoder=str(model_dir / cfg["files"]["decoder"]),
            tokens=str(model_dir / cfg["files"]["tokens"]),
            num_threads=num_threads,
        )
    else:
        raise ValueError(f"Unknown model type: {cfg['type']}")

    _sherpa_model_name = model_name
    logger.info(f"sherpa-onnx recognizer ready: {model_name}")
    return _sherpa_recognizer


def _sherpa_transcribe(audio_path: str, model_name: str) -> Optional[str]:
    """Transcribe using sherpa-onnx. Returns JSON string or None on failure."""
    try:
        recognizer = _get_sherpa_recognizer(model_name)
        stream = recognizer.create_stream()
        stream.accept_wave_file(audio_path)
        recognizer.decode_stream(stream)
        text = stream.result.text.strip()

        # Language: Moonshine is English-only, Whisper auto-detects
        cfg = _SHERPA_MODELS.get(model_name, {})
        lang = "en" if not cfg.get("multilingual") else "auto"

        return json.dumps({"text": text, "language": lang})
    except Exception as e:
        logger.warning(f"sherpa-onnx transcription failed ({model_name}): {e}")
        return None


# ═══════════════════════════════════════════════════════════════
# Legacy openai-whisper fallback
# ═══════════════════════════════════════════════════════════════

def _get_whisper_model(model_name: str = "base"):
    """Lazy-load openai-whisper model (fallback if sherpa-onnx unavailable)."""
    global _whisper_model, _whisper_model_name
    if _whisper_model is not None and _whisper_model_name == model_name:
        return _whisper_model

    import whisper

    from .model_storage import model_storage
    model_dir = model_storage.get_tool_dir("whisper")
    model_dir.mkdir(parents=True, exist_ok=True)

    os.environ.setdefault("XDG_CACHE_HOME", str(model_dir.parent))
    logger.info(f"Loading openai-whisper model '{model_name}' (fallback)...")
    _whisper_model = whisper.load_model(model_name, download_root=str(model_dir))
    _whisper_model_name = model_name
    logger.info(f"openai-whisper model '{model_name}' loaded")
    return _whisper_model


def _legacy_transcribe(audio_path: str, language: str = None) -> Optional[str]:
    """Transcribe using openai-whisper (fallback). Returns JSON string or None."""
    try:
        model_name = _select_legacy_model()
        model = _get_whisper_model(model_name)
        kwargs = {}
        if language:
            kwargs["language"] = language
        result = model.transcribe(audio_path, **kwargs)
        return json.dumps({
            "text": result["text"].strip(),
            "language": result.get("language", "unknown"),
        })
    except ImportError:
        return None
    except Exception as e:
        logger.warning(f"openai-whisper fallback failed: {e}")
        return None


def _select_legacy_model() -> str:
    """Select openai-whisper model by VRAM (legacy path)."""
    try:
        from .vram_manager import vram_manager
        gpu = vram_manager.detect_gpu()
        if not gpu["cuda_available"]:
            return "base"
        free = vram_manager.get_free_vram()
        if free >= 10:
            return "large-v3"
        elif free >= 5:
            return "medium"
        elif free >= 2:
            return "small"
    except Exception:
        pass
    return "base"


# ═══════════════════════════════════════════════════════════════
# Public API (same interface for all callers)
# ═══════════════════════════════════════════════════════════════

def populate_stt_catalog(catalog) -> int:
    """Register all STT model variants into the ModelCatalog.

    Called by ModelCatalog._populate_stt_models() so the catalog is the
    single source of truth for model names, VRAM requirements, and tier gates.
    Replaces the hardcoded VRAM thresholds in select_whisper_model().

    Returns number of new entries added.
    """
    from integrations.service_tools.model_catalog import ModelEntry, ModelType

    # (id, name, vram_gb, ram_gb, disk_gb, quality, speed, tags, min_tier)
    models = [
        # faster-whisper (primary engine, CTranslate2 INT8)
        ('stt-faster-whisper-tiny', 'Whisper Tiny (faster-whisper)', 0.0, 0.3, 0.15,
         0.60, 0.98, ['multilingual', 'cpu-friendly', 'faster-whisper'], 'lite'),
        ('stt-faster-whisper-base', 'Whisper Base (faster-whisper)', 0.2, 0.5, 0.30,
         0.72, 0.95, ['multilingual', 'cpu-friendly', 'faster-whisper'], 'lite'),
        ('stt-faster-whisper-small', 'Whisper Small (faster-whisper)', 0.5, 1.0, 0.46,
         0.80, 0.85, ['multilingual', 'faster-whisper'], 'lite'),
        ('stt-faster-whisper-medium', 'Whisper Medium (faster-whisper)', 1.5, 2.0, 1.50,
         0.87, 0.72, ['multilingual', 'faster-whisper'], 'standard'),
        ('stt-faster-whisper-large', 'Whisper Large v3 (faster-whisper)', 3.0, 4.0, 3.10,
         0.94, 0.55, ['multilingual', 'faster-whisper'], 'full'),
        # sherpa-onnx (lightweight ONNX, no PyTorch)
        ('stt-sherpa-moonshine-tiny', 'Moonshine Tiny (sherpa-onnx, EN)', 0.0, 0.2, 0.08,
         0.62, 0.99, ['english-only', 'onnx', 'sherpa-onnx', 'cpu-friendly'], 'lite'),
        ('stt-sherpa-moonshine-base', 'Moonshine Base (sherpa-onnx, EN)', 0.0, 0.3, 0.15,
         0.68, 0.96, ['english-only', 'onnx', 'sherpa-onnx', 'cpu-friendly'], 'lite'),
        ('stt-sherpa-whisper-tiny', 'Whisper Tiny (sherpa-onnx)', 0.0, 0.3, 0.15,
         0.61, 0.97, ['multilingual', 'onnx', 'sherpa-onnx', 'cpu-friendly'], 'lite'),
        ('stt-sherpa-whisper-base', 'Whisper Base (sherpa-onnx)', 0.0, 0.4, 0.30,
         0.72, 0.92, ['multilingual', 'onnx', 'sherpa-onnx'], 'lite'),
        ('stt-sherpa-whisper-small', 'Whisper Small (sherpa-onnx)', 0.0, 0.7, 0.46,
         0.79, 0.80, ['multilingual', 'onnx', 'sherpa-onnx'], 'lite'),
        ('stt-sherpa-whisper-medium', 'Whisper Medium (sherpa-onnx)', 0.0, 1.5, 1.50,
         0.86, 0.65, ['multilingual', 'onnx', 'sherpa-onnx'], 'standard'),
    ]

    added = 0
    for (mid, name, vram, ram, disk, quality, speed, tags, min_tier) in models:
        if catalog.get(mid) is not None:
            continue
        entry = ModelEntry(
            id=mid, name=name, model_type=ModelType.STT,
            source='github' if 'sherpa' in mid else 'huggingface',
            vram_gb=vram, ram_gb=ram, disk_gb=disk,
            min_capability_tier=min_tier,
            backend='onnx' if 'sherpa' in mid else 'torch',
            supports_gpu=(vram > 0), supports_cpu=True,
            supports_cpu_offload=False,
            idle_timeout_s=300,
            capabilities={
                'realtime': True,
                'diarization': False,
                'multilingual': ('multilingual' in tags),
            },
            quality_score=quality, speed_score=speed,
            languages=['multilingual'] if 'multilingual' in tags else ['en'],
            tags=tags,
        )
        catalog.register(entry, persist=False)
        added += 1
    return added


# ── Catalog-aware model name → sherpa-onnx key mapping ─────────────────────
_CATALOG_ID_TO_SHERPA = {
    'stt-sherpa-moonshine-tiny': 'moonshine-tiny',
    'stt-sherpa-moonshine-base': 'moonshine-base',
    'stt-sherpa-whisper-tiny': 'whisper-tiny',
    'stt-sherpa-whisper-base': 'whisper-base',
    'stt-sherpa-whisper-small': 'whisper-small',
    'stt-sherpa-whisper-medium': 'whisper-medium',
}

_CATALOG_ID_TO_FASTER_WHISPER_SIZE = {
    'stt-faster-whisper-tiny': 'tiny',
    'stt-faster-whisper-base': 'base',
    'stt-faster-whisper-small': 'small',
    'stt-faster-whisper-medium': 'medium',
    'stt-faster-whisper-large': 'large-v3',
}


def select_whisper_model() -> str:
    """Select best STT model for this hardware.

    Tries ModelCatalog first (single source of truth for VRAM thresholds).
    Falls back to direct VRAM query if catalog is unavailable.

    Returns a sherpa-onnx model key (from _SHERPA_MODELS) when sherpa-onnx
    is available, a faster-whisper size (e.g. 'base') when the catalog
    selects a faster-whisper entry, or an openai-whisper model name as a
    legacy fallback.
    """
    # ── Primary path: ask the catalog ───────────────────────────────────────
    try:
        from integrations.service_tools.model_orchestrator import get_orchestrator
        orch = get_orchestrator()
        entry = orch.select_best('stt')
        if entry:
            # Map catalog entry ID back to the engine-specific key
            sherpa_key = _CATALOG_ID_TO_SHERPA.get(entry.id)
            if sherpa_key and sherpa_key in _SHERPA_MODELS:
                try:
                    import sherpa_onnx  # noqa: F401
                    return sherpa_key
                except ImportError:
                    pass
            # faster-whisper size
            fw_size = _CATALOG_ID_TO_FASTER_WHISPER_SIZE.get(entry.id)
            if fw_size:
                return fw_size
    except Exception:
        pass

    # ── Fallback: direct VRAM query (no catalog dependency) ─────────────────
    try:
        import sherpa_onnx  # noqa: F401 — check availability
    except ImportError:
        return _select_legacy_model()

    from .vram_manager import vram_manager
    gpu = vram_manager.detect_gpu()

    if gpu["cuda_available"]:
        free = vram_manager.get_free_vram()
        if free >= 5:
            return "whisper-medium"
        elif free >= 2:
            return "whisper-small"
        else:
            return "whisper-base"
    else:
        # CPU-only: prefer Moonshine (fastest) for English,
        # Whisper tiny for multilingual.
        # Caller can override with language hint.
        return "moonshine-tiny"


# ═══════════════════════════════════════════════════════════════
# Subprocess isolation
# ═══════════════════════════════════════════════════════════════
#
# STT engines (faster-whisper / sherpa-onnx / openai-whisper) all have
# native C runtimes that can crash the parent process on CUDA OOM,
# DLL conflicts, or audio decoder edge cases. Running them in a worker
# subprocess contains those crashes: the worker dies, the parent catches
# the exit code and returns an error JSON without bringing Nunba down.

from .gpu_worker import ToolWorker  # noqa: E402

_stt_tool = ToolWorker(
    tool_name='whisper',
    tool_module='integrations.service_tools.whisper_tool',
    vram_budget='whisper_base',
    output_subdir='stt',  # not used — STT doesn't generate files
    engine='whisper',
    startup_timeout=60.0,
    request_timeout=180.0,  # long audio files can take a while
    idle_timeout=300.0,  # free the model after 5 min idle
)


def _transcribe_impl(audio_path: str, language: str = None) -> str:
    """Transcribe audio — runs inside the worker subprocess.

    Engine priority: faster-whisper → sherpa-onnx → openai-whisper.

    Returns JSON string with 'text' and 'language' keys.
    """
    # 1. Try faster-whisper (preferred — CTranslate2, 4x faster, multilingual)
    try:
        import faster_whisper  # noqa: F401
        result = _faster_whisper_transcribe(audio_path, language)
        if result:
            return result
    except ImportError:
        pass
    except Exception as e:
        logger.warning(f"faster-whisper failed, trying fallback: {e}")

    # 2. Try sherpa-onnx (lightweight ONNX, no PyTorch)
    try:
        import sherpa_onnx  # noqa: F401

        model_name = select_whisper_model()

        # If a non-English language is explicitly requested and the selected
        # model is English-only (Moonshine), switch to multilingual Whisper
        cfg = _SHERPA_MODELS.get(model_name, {})
        if language and language != "en" and not cfg.get("multilingual"):
            model_name = "whisper-tiny"

        result = _sherpa_transcribe(audio_path, model_name)
        if result:
            return result
    except ImportError:
        pass
    except Exception as e:
        logger.warning(f"sherpa-onnx failed, trying openai-whisper: {e}")

    # 3. Fallback: openai-whisper (PyTorch) — the risky path
    result = _legacy_transcribe(audio_path, language)
    if result:
        return result

    return json.dumps({"error": "No STT engine available (install faster-whisper)"})


def whisper_transcribe(audio_path: str, language: str = None) -> str:
    """Transcribe audio file to text (subprocess-isolated).

    Runs the STT engine chain in a dedicated worker subprocess so that
    CUDA OOM / CTranslate2 crashes / PyTorch DLL segfaults can't kill
    the parent process.

    Args:
        audio_path: Path to audio file (WAV, MP3, WebM, etc.)
        language: Optional language code. Auto-detect if None.

    Returns:
        JSON string with 'text' and 'language' keys.
    """
    result = _stt_tool.call({
        'op': 'transcribe',
        'audio_path': audio_path,
        'language': language,
    })
    if 'error' in result:
        return json.dumps(result)
    return result.get('raw_json', json.dumps(result))
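
# Caller-side sketch (illustrative path; the expected keys follow the
# docstring above):
#
#     raw = whisper_transcribe('/tmp/clip.wav')
#     data = json.loads(raw)
#     if 'error' not in data:
#         print(data['text'], data['language'])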


def _detect_language_impl(audio_path: str) -> str:
    """Language detection — runs inside the worker subprocess.

    Returns JSON string with 'language' and 'probability' keys.
    """
    # Try faster-whisper first (has built-in language detection)
    try:
        from faster_whisper import WhisperModel  # noqa: F401
        model = _get_faster_whisper_model(_FASTER_WHISPER_MODEL_SIZE)
        _, info = model.transcribe(audio_path, beam_size=1)
        return json.dumps({
            "language": info.language if info.language else "unknown",
            "probability": round(info.language_probability, 4) if info.language_probability else 0.0,
        })
    except ImportError:
        pass
    except Exception as e:
        logger.debug(f"faster-whisper language detection failed: {e}")

    try:
        import whisper
        model = _get_whisper_model()
        audio = whisper.load_audio(audio_path)
        audio = whisper.pad_or_trim(audio)
        mel = whisper.log_mel_spectrogram(audio).to(model.device)
        _, probs = model.detect_language(mel)
        lang = max(probs, key=probs.get)
        return json.dumps({
            "language": lang,
            "probability": round(probs[lang], 4),
        })
    except ImportError:
        # No openai-whisper — transcribe with multilingual Whisper and infer
        try:
            import sherpa_onnx  # noqa: F401
            result = _sherpa_transcribe(audio_path, "whisper-tiny")
            if result:
                parsed = json.loads(result)
                return json.dumps({
                    "language": parsed.get("language", "unknown"),
                    "probability": 0.8,
                })
        except Exception:
            pass
        return json.dumps({"error": "Language detection unavailable"})
    except Exception as e:
        return json.dumps({"error": f"Language detection failed: {e}"})


def whisper_detect_language(audio_path: str) -> str:
    """Detect the language of an audio file (subprocess-isolated)."""
    result = _stt_tool.call({
        'op': 'detect_language',
        'audio_path': audio_path,
    })
    if 'error' in result:
        return json.dumps(result)
    return result.get('raw_json', json.dumps(result))
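
# Example response shape (illustrative values only):
#
#     whisper_detect_language('/tmp/clip.wav')
#     # -> '{"language": "en", "probability": 0.9734}'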


def unload_whisper():
    """Unload all STT models to free memory.

    The actual models live inside the `_stt_tool` subprocess (see the
    SUBPROCESS ISOLATION section above), so the authoritative unload is
    stopping that worker. We ALSO clear the parent-side globals in case
    a worker-side reference leaked into the parent module state during
    an import.
    """
    # 1. Stop the worker subprocess — this is the real VRAM release.
    try:
        _stt_tool.stop()
    except Exception as e:
        logger.warning(f"failed to stop STT worker: {e}")

    # 2. Also clear parent-side caches (only ever populated in the worker,
    #    but defensive in case something called a legacy helper in-process).
    global _sherpa_recognizer, _sherpa_model_name
    global _whisper_model, _whisper_model_name
    global _faster_whisper_model, _faster_whisper_model_size

    _faster_whisper_model = None
    _faster_whisper_model_size = None
    _sherpa_recognizer = None
    _sherpa_model_name = None
    _whisper_model = None
    _whisper_model_name = None

    from .vram_manager import clear_cuda_cache
    clear_cuda_cache()
    logger.info("STT models unloaded")


# ═══════════════════════════════════════════════════════════════
# Streaming STT WebSocket Server (faster-whisper with VAD)
#
# Pattern: same as diarization_server.py — standalone asyncio WebSocket
# server started as a daemon thread by a DiarizationService-style manager.
#
# Protocol:
#   Client → Server: binary PCM16 audio chunks (16kHz mono) OR
#                    binary WebM/Opus blobs (auto-detected, converted via ffmpeg)
#   Server → Client: JSON {"text": "...", "language": "en", "is_final": true/false}
#
# The server accumulates audio in a per-connection buffer. When VAD detects
# a speech pause (or the buffer exceeds 30s), it transcribes the buffer with
# faster-whisper and sends back the result. Partial results are sent
# every 2s of accumulated audio for low-latency interim display.
# ═══════════════════════════════════════════════════════════════
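
# Minimal client sketch for the protocol above (illustrative only — not used
# by production code; assumes the `websockets` package and a server already
# running on the port returned by get_stt_stream_port()):
#
#     import asyncio, json, websockets
#
#     async def _demo_stream(pcm_chunks):
#         port = get_stt_stream_port()
#         async with websockets.connect(f'ws://127.0.0.1:{port}') as ws:
#             for chunk in pcm_chunks:        # bytes: PCM16, 16kHz, mono
#                 await ws.send(chunk)
#             await ws.send(json.dumps({'control': 'final'}))
#             async for msg in ws:            # interim results may arrive first
#                 evt = json.loads(msg)
#                 if evt.get('is_final'):
#                     print(evt['text'])
#                     break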

_stt_ws_server = None
_stt_ws_port = None

STREAM_SAMPLE_RATE = 16000
STREAM_BYTES_PER_SAMPLE = 2
STREAM_CHANNELS = 1
# Transcribe every 2s of audio for interim results
STREAM_CHUNK_SECONDS = 2
STREAM_CHUNK_BYTES = STREAM_SAMPLE_RATE * STREAM_BYTES_PER_SAMPLE * STREAM_CHANNELS * STREAM_CHUNK_SECONDS
# Max buffer before forced transcription (30s)
STREAM_MAX_BUFFER_BYTES = STREAM_SAMPLE_RATE * STREAM_BYTES_PER_SAMPLE * STREAM_CHANNELS * 30
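# Worked sizes: 16000 samples/s × 2 bytes × 1 channel = 32,000 B/s, so an
# interim chunk is 64,000 B (2s) and the forced-flush cap is 960,000 B (30s).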


def _ws_path(websocket) -> str:
    """Best-effort URL-path extractor across websockets lib versions.

    websockets 11+ moved the request to ``websocket.request.path``;
    earlier versions exposed ``websocket.path`` directly. We probe
    both and fall back to empty string when neither is available.
    Never raises.
    """
    for getter in (
        lambda ws: ws.request.path,
        lambda ws: ws.path,
    ):
        try:
            val = getter(websocket)
            if isinstance(val, str):
                return val
        except Exception:
            continue
    return ''


def _parse_call_context(ws_path: str) -> tuple:
    """Parse ``?call_id=<id>&user_id=<u>`` from a WS request path.

    UNIF-G7 / W1.7 Producer C — the RN mic stream (and any other
    browser/mobile audio source) opens the streaming-STT WebSocket
    with these query params attached when the audio belongs to a
    voice room. Without the params, behavior is unchanged (today's
    one-shot transcription clients still work).

    Returns ``(call_id, user_id)`` — either may be ``None``. Never
    raises.
    """
    if not ws_path:
        return (None, None)
    from urllib.parse import urlparse, parse_qs
    try:
        parsed = urlparse(ws_path)
        qs = parse_qs(parsed.query or '')
        call_id = (qs.get('call_id') or [None])[0]
        user_id = (qs.get('user_id') or [None])[0]
        return (call_id or None, user_id or None)
    except Exception:
        return (None, None)
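
# e.g. (illustrative inputs/outputs):
#     _parse_call_context('/stt?call_id=abc&user_id=u1')  # -> ('abc', 'u1')
#     _parse_call_context('/stt')                         # -> (None, None)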


def _maybe_enqueue_call_segment(
    call_id: Optional[str],
    user_id: Optional[str],
    text: str,
    lang: str,
    is_final: bool,
) -> None:
    """If the WS client opted in via ``?call_id=`` AND this is a final
    segment, land it in the canonical per-call queue so the
    AgentBridgeWorker can drain it (UNIF-G3 / W1.2 consumer).

    Single canonical writer for browser/mobile-mic-driven transcripts —
    same sink as the future Discord-voice-recv (Producer A) and
    LiveKit-RTC (Producer B) audio paths. No parallel queue.

    Best-effort: never raises out of the WS handler hot path.
    """
    if not call_id or not is_final or not text:
        return
    try:
        enqueue_stt_segment(call_id, {
            'text': text,
            'lang': lang,
            'author_id': user_id or 'unknown',
            'is_final': True,
            # t0/t1/speaker stay None — the RN mic stream is single-speaker
            # by definition (the user speaking into the SPA); future
            # multi-speaker producers will set speaker.
        })
    except Exception as e:
        logger.debug(
            "whisper_tool._maybe_enqueue_call_segment failed "
            "(call=%s): %s", call_id, e)


async def _stt_stream_handler(websocket):
    """Handle a single streaming STT WebSocket connection.

    Accepts:
      - Raw PCM16 16kHz mono binary frames
      - WebM/Opus blobs (auto-converted to PCM via temp file + ffmpeg,
        falling back to raw container bytes when ffmpeg is unavailable)
      - JSON {"control": "reset"} to clear the buffer
      - JSON {"control": "final"} to force final transcription

    Sends back:
      - {"text": "...", "language": "en", "is_final": false} for interim
      - {"text": "...", "language": "en", "is_final": true} for final (pause detected)

    Optional UNIF-G7 hook (Producer C):
      The connection URL MAY include ``?call_id=<id>&user_id=<u>``.
      When present, every final segment is ALSO landed in the per-call
      STT queue (whisper_tool.enqueue_stt_segment) so the
      AgentBridgeWorker can drain it and emit the meet_copilot card.
      Absence of the params preserves today's behavior exactly — RN
      one-shot transcription clients are unaffected.

    CRASH ISOLATION:
      - Model crashes are isolated: _transcribe_buffer routes through
        _stt_tool.call(), which handles subprocess crashes and returns
        empty strings on failure.
      - Protocol-level code (websocket frames, VAD, buffering) runs
        in the daemon thread with an outer try/except (below), so
        Python exceptions are caught and the connection closes cleanly.
      - Remaining risks are C-level crashes in the websockets library
        or audio decoders — low-probability, and fixing them would
        require moving the entire server into a subprocess with
        per-frame IPC, which costs realtime latency. Deferred until
        there is evidence of actual crashes.
    """
    import io

    audio_buffer = io.BytesIO()
    last_transcribe_size = 0
    # UNIF-G7 Producer C: extract optional call context from the
    # WS request path. When absent (call_id is None), the
    # _maybe_enqueue_call_segment helper degrades to a no-op so plain
    # transcription clients see ZERO behavior change.
    call_id, user_id = _parse_call_context(_ws_path(websocket))

    try:
        async for message in websocket:
            # Control messages (JSON)
            if isinstance(message, str):
                try:
                    ctrl = json.loads(message)
                    if ctrl.get('control') == 'reset':
                        audio_buffer = io.BytesIO()
                        last_transcribe_size = 0
                        continue
                    if ctrl.get('control') == 'final':
                        # Force final transcription of remaining buffer
                        text, lang = _transcribe_buffer(audio_buffer)
                        if text:
                            await websocket.send(json.dumps({
                                'text': text, 'language': lang, 'is_final': True,
                            }))
                            _maybe_enqueue_call_segment(
                                call_id, user_id, text, lang, True)
                        audio_buffer = io.BytesIO()
                        last_transcribe_size = 0
                        continue
                except (json.JSONDecodeError, ValueError):
                    pass
                continue

            # Binary audio data
            if not isinstance(message, (bytes, bytearray)):
                continue

            # Detect format: WebM/Opus starts with 0x1A45DFA3 (EBML header)
            # or "OggS" (Ogg container). Raw PCM has no header.
            is_container = (
                message[:4] == b'\x1a\x45\xdf\xa3' or  # WebM/Matroska
                message[:4] == b'OggS' or              # Ogg/Opus
                message[:4] == b'RIFF'                 # WAV
            )

            if is_container:
                # Save to temp file, let ffmpeg handle the decoding
                pcm_bytes = _container_to_pcm(message)
                if pcm_bytes:
                    audio_buffer.write(pcm_bytes)
            else:
                # Raw PCM16 mono 16kHz
                audio_buffer.write(message)

            buf_size = audio_buffer.getbuffer().nbytes

            # Force transcription if buffer exceeds max
            if buf_size >= STREAM_MAX_BUFFER_BYTES:
                text, lang = _transcribe_buffer(audio_buffer)
                if text:
                    await websocket.send(json.dumps({
                        'text': text, 'language': lang, 'is_final': True,
                    }))
                    _maybe_enqueue_call_segment(
                        call_id, user_id, text, lang, True)
                audio_buffer = io.BytesIO()
                last_transcribe_size = 0
                continue

            # Interim transcription every STREAM_CHUNK_BYTES
            if buf_size - last_transcribe_size >= STREAM_CHUNK_BYTES:
                text, lang = _transcribe_buffer(audio_buffer, keep_buffer=True)
                last_transcribe_size = buf_size
                if text:
                    await websocket.send(json.dumps({
                        'text': text, 'language': lang, 'is_final': False,
                    }))

    except Exception as e:
        logger.debug(f"STT stream connection ended: {e}")


def _container_to_pcm(data: bytes) -> Optional[bytes]:
    """Convert a WebM/Opus/WAV container to raw PCM16 16kHz mono via temp file.

    For streaming we need raw PCM, so we use an ffmpeg subprocess if it is
    available. If ffmpeg is missing, return the raw container bytes and let
    faster-whisper (which reads any ffmpeg-supported format) handle them at
    transcribe time.
    """
    import subprocess as _sp
    import tempfile

    tmp_in = None
    tmp_out = None
    try:
        tmp_in = tempfile.NamedTemporaryFile(suffix='.webm', delete=False)
        tmp_in.write(data)
        tmp_in.close()

        tmp_out = tempfile.NamedTemporaryFile(suffix='.pcm', delete=False)
        tmp_out.close()

        _kw = dict(capture_output=True, timeout=10)
        if hasattr(_sp, 'CREATE_NO_WINDOW'):
            _kw['creationflags'] = _sp.CREATE_NO_WINDOW

        result = _sp.run([
            'ffmpeg', '-y', '-i', tmp_in.name,
            '-ar', str(STREAM_SAMPLE_RATE), '-ac', '1', '-f', 's16le',
            tmp_out.name,
        ], **_kw)

        if result.returncode == 0:
            with open(tmp_out.name, 'rb') as f:
                return f.read()
    except FileNotFoundError:
        # ffmpeg not available — return the raw container bytes;
        # _transcribe_buffer will write them to a temp file for faster-whisper
        return data
    except Exception as e:
        logger.debug(f"PCM conversion failed: {e}")
    finally:
        for p in (tmp_in, tmp_out):
            if p:
                try:
                    os.unlink(p.name)
                except Exception:
                    pass
    return None


def _transcribe_buffer(audio_buffer, keep_buffer: bool = False) -> tuple:
    """Transcribe the accumulated audio buffer via the subprocess STT worker.

    Returns a (text, language) tuple.

    Runs through `_stt_tool` so CUDA OOM or faster-whisper/CTranslate2
    crashes on the realtime path only kill the worker subprocess — the
    streaming WebSocket server (and the whole Nunba process) stay alive.
    """
    import tempfile

    buf_bytes = audio_buffer.getvalue()
    if len(buf_bytes) < STREAM_SAMPLE_RATE * 2:  # < 1s of audio, skip
        return ('', 'unknown')

    if not keep_buffer:
        audio_buffer.seek(0)
        audio_buffer.truncate(0)

    # Write PCM to a temp WAV file — the subprocess worker reads by
    # path (simpler than shipping raw bytes over JSON). For realtime
    # workloads this is still cheap (local disk, a few KB per chunk).
    tmp = None
    try:
        import wave
        tmp = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
        with wave.open(tmp.name, 'wb') as wf:
            wf.setnchannels(STREAM_CHANNELS)
            wf.setsampwidth(STREAM_BYTES_PER_SAMPLE)
            wf.setframerate(STREAM_SAMPLE_RATE)
            wf.writeframes(buf_bytes)
        tmp.close()

        result = _stt_tool.call({
            'op': 'transcribe',
            'audio_path': tmp.name,
            'language': None,
        })
        if 'error' in result and not result.get('raw_json'):
            logger.debug(f"Streaming transcribe failed: {result.get('error')}")
            return ('', 'unknown')
        raw = result.get('raw_json') or json.dumps(result)
        try:
            parsed = json.loads(raw)
            return (parsed.get('text', ''), parsed.get('language', 'unknown'))
        except json.JSONDecodeError:
            return ('', 'unknown')
    except Exception as e:
        logger.debug(f"Streaming transcribe failed: {e}")
        return ('', 'unknown')
    finally:
        if tmp is not None:
            try:
                os.unlink(tmp.name)
            except Exception:
                pass


def start_stt_stream_server(port: int = 0) -> Optional[int]:
    """Start the streaming STT WebSocket server in a daemon thread.

    Same pattern as DiarizationService — asyncio event loop in a thread.

    Args:
        port: Port to bind (0 = auto-select from port registry or dynamic)

    Returns:
        Actual port number, or None if failed.
    """
    global _stt_ws_server, _stt_ws_port

    if _stt_ws_port is not None:
        return _stt_ws_port  # already running

    if port == 0:
        try:
            from core.port_registry import get_port
            port = get_port('stt_stream')
        except Exception:
            port = 8005  # default fallback

    import asyncio
    import threading

    def _run_server():
        global _stt_ws_server, _stt_ws_port
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        try:
            import websockets

            async def _serve():
                global _stt_ws_server, _stt_ws_port
                server = await websockets.serve(
                    _stt_stream_handler, '127.0.0.1', port,
                    max_size=2 * 1024 * 1024,  # 2MB max message (30s audio ~960KB)
                )
                actual_port = port
                if server.sockets:
                    actual_port = server.sockets[0].getsockname()[1]
                _stt_ws_server = server
                _stt_ws_port = actual_port
                logger.info(f"Streaming STT WebSocket server on ws://127.0.0.1:{actual_port}")
                await asyncio.Future()  # run forever

            loop.run_until_complete(_serve())
        except Exception as e:
            logger.error(f"STT stream server failed: {e}")
            _stt_ws_port = None

    thread = threading.Thread(target=_run_server, daemon=True, name='stt-stream-ws')
    thread.start()

    # Wait for the port to be assigned
    import time
    for _ in range(30):
        if _stt_ws_port is not None:
            return _stt_ws_port
        time.sleep(0.1)

    logger.warning("STT stream server did not start within 3s")
    return None


def get_stt_stream_port() -> Optional[int]:
    """Get the port of the running streaming STT WebSocket server."""
    return _stt_ws_port


# ═══════════════════════════════════════════════════════════════
# Per-call STT segment queue (UNIF-G3 / W1.2)
# ═══════════════════════════════════════════════════════════════
#
# When a call has subscribers (LiveKit room, Discord voice channel,
# Teams meet, etc.), an audio-frame producer feeds frames through the
# streaming STT WebSocket above and receives ``{text, language,
# is_final}`` events back. ``enqueue_stt_segment`` is the canonical
# place to land FINAL segments so the AgentBridgeWorker (which doesn't
# care which adapter produced the audio) can drain them via
# ``dequeue_segments`` from its tick loop.
#
# This is the single canonical home for STT-segment buffering — every
# audio-source adapter (LiveKit subscriber, Discord voice receiver,
# RN mic stream) lands segments here; every consumer (agent_voice_bridge
# worker, transcript recorder) reads here. No parallel queues. A
# producer/consumer sketch follows below.

import threading
from collections import deque
from typing import Any, Dict, List

# Per-call queues of (segment_id, segment_dict) tuples. Bounded by the
# bridge worker drain cadence (~250ms) so unbounded growth is a bug
# elsewhere; we still keep a soft cap to defend against producer leaks.
_STT_SEGMENT_QUEUE: Dict[str, deque] = {}
_STT_SEGMENT_NEXT_ID: Dict[str, int] = {}
_STT_SEGMENT_LOCK = threading.Lock()
_STT_SEGMENT_CAP_PER_CALL = 1024  # segments; older are evicted with WARN


def enqueue_stt_segment(call_id: str, segment: Dict[str, Any]) -> int:
    """Append a final STT segment for ``call_id``.

    Producer-side: any audio adapter that has decoded a final transcript
    chunk calls this. ``segment`` SHOULD include:
      - ``text``      : transcript text
      - ``lang``      : detected language (BCP-47-ish)
      - ``t0``, ``t1``: float seconds (segment span on the call timeline)
      - ``speaker``   : optional speaker id / name (None ⇒ unknown)
      - ``author_id`` : caller-supplied participant identifier — used by
        the consumer to skip self-authored segments
      - ``is_final``  : optional, defaults True on enqueue (interim
        segments don't belong here)

    Returns the assigned segment_id (monotonic int per call) so the
    producer can correlate downstream events.

    Best-effort: never raises. Caller's ``call_id`` is required.
    """
    if not call_id:
        return -1
    seg = dict(segment or {})
    seg.setdefault('is_final', True)
    with _STT_SEGMENT_LOCK:
        next_id = _STT_SEGMENT_NEXT_ID.get(call_id, 0) + 1
        _STT_SEGMENT_NEXT_ID[call_id] = next_id
        seg['segment_id'] = next_id
        q = _STT_SEGMENT_QUEUE.setdefault(call_id, deque())
        q.append((next_id, seg))
        # Evict oldest if the soft cap is exceeded — defends against a
        # leaked producer that never has its consumer attach. Real flows
        # drain at 250ms cadence so this should never fire.
        while len(q) > _STT_SEGMENT_CAP_PER_CALL:
            _evicted = q.popleft()
            logger.warning(
                "whisper_tool.enqueue_stt_segment: call=%s queue cap "
                "%d exceeded; evicted seg_id=%s",
                call_id, _STT_SEGMENT_CAP_PER_CALL, _evicted[0])
        return next_id


def dequeue_segments(
    call_id: str,
    since: int | None = None,
) -> List[Dict[str, Any]]:
    """Drain final STT segments for ``call_id`` newer than ``since``.

    Consumer-side: the agent_voice_bridge worker calls this on each
    tick. Returns segments in arrival order (FIFO). Each returned
    dict carries the ``segment_id`` the producer was given so the
    caller can update its ``since`` watermark for the next tick.

    ``since=None`` returns all queued segments and resets the queue
    for that call. ``since=N`` returns only segments with id > N
    AND prunes those segments from the queue.

    Best-effort: missing call_id → ``[]``.
    """
    if not call_id:
        return []
    with _STT_SEGMENT_LOCK:
        q = _STT_SEGMENT_QUEUE.get(call_id)
        if not q:
            return []
        if since is None:
            drained = [seg for (_sid, seg) in q]
            q.clear()
            return drained
        # Prune everything ≤ since, return everything > since.
        drained: List[Dict[str, Any]] = []
        # Iterate from the left; any item with id ≤ since is consumed but
        # not returned (already acked). Items > since are returned
        # AND removed (so the next dequeue with the same since is a
        # no-op, but normally callers update their watermark).
        keep: deque = deque()
        for sid, seg in q:
            if sid > since:
                drained.append(seg)
            # else: already acked; drop
        # Replace the queue contents with whatever's still > since
        # but un-drained (none currently — we drained ALL items > since).
        # Future-proofing for partial drains: keep is empty here, but
        # the structure makes the intent explicit.
        q.clear()
        q.extend(keep)
        return drained


def reset_stt_segment_queue(call_id: str) -> None:
    """Drop all queued segments for a call (called on detach / hangup).

    Best-effort: missing call_id is a no-op.
    """
    if not call_id:
        return
    with _STT_SEGMENT_LOCK:
        _STT_SEGMENT_QUEUE.pop(call_id, None)
        _STT_SEGMENT_NEXT_ID.pop(call_id, None)


# ═══════════════════════════════════════════════════════════════
# Service tool registration
# ═══════════════════════════════════════════════════════════════

class WhisperTool:
    """Register STT as an in-process service tool.

    Unlike other tools, STT runs in-process (no sidecar server).
    The tool functions are registered directly as callables.
    """

    @classmethod
    def register_functions(cls):
        """Register STT functions directly with service_tool_registry."""
        whisper_transcribe.__name__ = "whisper_transcribe"
        whisper_transcribe.__doc__ = (
            "Transcribe audio file to text using STT. "
            "Input: audio_path (string path to WAV/MP3/WebM file), "
            "language (optional language code like 'en'). "
            "Returns JSON with 'text' and 'language'."
        )

        whisper_detect_language.__name__ = "whisper_detect_language"
        whisper_detect_language.__doc__ = (
            "Detect the language spoken in an audio file. "
            "Input: audio_path (string path to audio file). "
            "Returns JSON with 'language' code and 'probability'."
        )

        tool_info = ServiceToolInfo(
            name="whisper",
            description=(
                "Speech-to-text transcription. Converts audio files to text "
                "using sherpa-onnx (Moonshine/Whisper ONNX) or OpenAI Whisper. "
                "Supports 100+ languages with automatic language detection."
            ),
            base_url="inprocess://whisper",
            endpoints={
                "transcribe": {
                    "path": "/transcribe",
                    "method": "POST",
                    "description": whisper_transcribe.__doc__,
                    "params_schema": {
                        "audio_path": {"type": "string", "description": "Path to audio file"},
                        "language": {"type": "string", "description": "Language code (optional)"},
                    },
                },
                "detect_language": {
                    "path": "/detect_language",
                    "method": "POST",
                    "description": whisper_detect_language.__doc__,
                    "params_schema": {
                        "audio_path": {"type": "string", "description": "Path to audio file"},
                    },
                },
            },
            health_endpoint="/health",
            tags=["stt", "speech", "transcription", "audio", "whisper", "sherpa-onnx"],
            timeout=60,
        )
        tool_info.is_healthy = True
        service_tool_registry._tools["whisper"] = tool_info
        return True
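
# Registration sketch (illustrative — uses only what register_functions
# sets directly above):
#
#     WhisperTool.register_functions()
#     info = service_tool_registry._tools['whisper']
#     assert info.base_url == 'inprocess://whisper' and info.is_healthy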


# ═══════════════════════════════════════════════════════════════
# Worker callbacks (picked up by the centralized gpu_worker dispatcher)
# ═══════════════════════════════════════════════════════════════
#
# The STT worker has no upfront load — each transcribe call lazy-
# initializes the appropriate engine (faster-whisper / sherpa / legacy)
# inside the subprocess. `_load` is a no-op; `_synthesize` dispatches
# on the request op so one worker handles transcribe + detect_language.

def _load():
    """No upfront load — engines lazy-initialize per request."""
    return None


def _synthesize(_model, req: dict) -> dict:
    """Dispatch STT requests inside the worker subprocess."""
    op = req.get('op', 'transcribe')
    if op == 'transcribe':
        raw = _transcribe_impl(req.get('audio_path'), req.get('language'))
    elif op == 'detect_language':
        raw = _detect_language_impl(req.get('audio_path'))
    else:
        return {'error': f'Unknown op: {op}'}
    # Return both the raw JSON (for pass-through) and parsed fields
    try:
        return {'raw_json': raw, **json.loads(raw)}
    except json.JSONDecodeError:
        return {'error': f'Invalid engine response: {raw[:200]}'}
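
# Dispatch example (illustrative request/response shapes, matching the
# code above):
#
#     _synthesize(None, {'op': 'transcribe', 'audio_path': '/tmp/clip.wav'})
#     # -> {'raw_json': '{"text": "...", "language": "en"}',
#     #     'text': '...', 'language': 'en'}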

# NOTE: no `if __name__ == '__main__':` block — the centralized
# dispatcher at integrations.service_tools.gpu_worker imports this
# module and calls _load / _synthesize on spawn.