1"""
2STT tool — in-process speech-to-text.
4Engine priority (first available wins):
5 1. faster-whisper (CTranslate2) — preferred, 4x faster than openai-whisper,
6 multilingual, GPU+CPU, auto-downloads models from HuggingFace.
7 2. sherpa-onnx — lightweight ONNX alternative, no PyTorch dependency.
8 3. openai-whisper — legacy fallback (requires PyTorch).
10Model selection by hardware (via select_whisper_model):
11 - CPU, low RAM → tiny / moonshine-tiny (English, fastest)
12 - CPU, 4-8GB → base / whisper-tiny (multilingual)
13 - GPU, 2-5GB → small (multilingual)
14 - GPU, 5-10GB → medium (multilingual)
15 - GPU, 10+GB → large-v3 (multilingual, best accuracy)
17Models downloaded lazily on first use to ~/.hevolve/models/stt/
18100% local, zero cloud costs — Nunba is forever free.
19"""

import json
import logging
import os
import tarfile
import urllib.request
from pathlib import Path
from typing import Optional

from .registry import ServiceToolInfo, service_tool_registry

logger = logging.getLogger(__name__)


# ═══════════════════════════════════════════════════════════════
# Model registry — sherpa-onnx model configurations
# ═══════════════════════════════════════════════════════════════

_SHERPA_MODEL_BASE = (
    "https://github.com/k2-fsa/sherpa-onnx/releases/download/asr-models"
)

_SHERPA_MODELS = {
    "moonshine-tiny": {
        "type": "moonshine",
        "archive": "sherpa-onnx-moonshine-tiny-en-int8.tar.bz2",
        "dir": "sherpa-onnx-moonshine-tiny-en-int8",
        "files": {
            "preprocessor": "preprocess.onnx",
            "encoder": "encode.int8.onnx",
            "uncached_decoder": "uncached_decode.int8.onnx",
            "cached_decoder": "cached_decode.int8.onnx",
            "tokens": "tokens.txt",
        },
        "multilingual": False,
    },
    "moonshine-base": {
        "type": "moonshine",
        "archive": "sherpa-onnx-moonshine-base-en-int8.tar.bz2",
        "dir": "sherpa-onnx-moonshine-base-en-int8",
        "files": {
            "preprocessor": "preprocess.onnx",
            "encoder": "encode.int8.onnx",
            "uncached_decoder": "uncached_decode.int8.onnx",
            "cached_decoder": "cached_decode.int8.onnx",
            "tokens": "tokens.txt",
        },
        "multilingual": False,
    },
    "whisper-tiny": {
        "type": "whisper",
        "archive": "sherpa-onnx-whisper-tiny.tar.bz2",
        "dir": "sherpa-onnx-whisper-tiny",
        "files": {
            "encoder": "tiny-encoder.int8.onnx",
            "decoder": "tiny-decoder.int8.onnx",
            "tokens": "tiny-tokens.txt",
        },
        "multilingual": True,
    },
    "whisper-base": {
        "type": "whisper",
        "archive": "sherpa-onnx-whisper-base.tar.bz2",
        "dir": "sherpa-onnx-whisper-base",
        "files": {
            "encoder": "base-encoder.int8.onnx",
            "decoder": "base-decoder.int8.onnx",
            "tokens": "base-tokens.txt",
        },
        "multilingual": True,
    },
    "whisper-small": {
        "type": "whisper",
        "archive": "sherpa-onnx-whisper-small.tar.bz2",
        "dir": "sherpa-onnx-whisper-small",
        "files": {
            "encoder": "small-encoder.int8.onnx",
            "decoder": "small-decoder.int8.onnx",
            "tokens": "small-tokens.txt",
        },
        "multilingual": True,
    },
    "whisper-medium": {
        "type": "whisper",
        "archive": "sherpa-onnx-whisper-medium.tar.bz2",
        "dir": "sherpa-onnx-whisper-medium",
        "files": {
            "encoder": "medium-encoder.int8.onnx",
            "decoder": "medium-decoder.int8.onnx",
            "tokens": "medium-tokens.txt",
        },
        "multilingual": True,
    },
}
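
# Illustrative sketch (not executed): how a _SHERPA_MODELS entry maps to its
# download URL and on-disk layout. The real storage root comes from
# model_storage at runtime (see _get_stt_dir below); the ~/.hevolve path is
# the documented default and is assumed here for illustration.
#
#     cfg = _SHERPA_MODELS["whisper-tiny"]
#     url = f"{_SHERPA_MODEL_BASE}/{cfg['archive']}"
#     # → .../releases/download/asr-models/sherpa-onnx-whisper-tiny.tar.bz2
#     root = Path.home() / ".hevolve" / "models" / "stt" / cfg["dir"]
#     files = {role: root / fname for role, fname in cfg["files"].items()}
#     # files["tokens"] → ~/.hevolve/models/stt/sherpa-onnx-whisper-tiny/tiny-tokens.txt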

# ═══════════════════════════════════════════════════════════════
# Cached recognizers (avoid reloading on every call)
# ═══════════════════════════════════════════════════════════════

_sherpa_recognizer = None
_sherpa_model_name = None

# Legacy openai-whisper fallback
_whisper_model = None
_whisper_model_name = None

# faster-whisper (CTranslate2) — preferred engine
_faster_whisper_model = None
_faster_whisper_model_size = None

# Backoff + circuit-breaker for the model-load retry storm.
# Symptom #10 (Stage-A, 2026-04-16): frozen_debug.log showed ~2Hz
# 'faster-whisper transcription failed: HF_HUB_OFFLINE' spam because
# every realtime transcribe call re-hit the network. Now:
#   - backoff: 1s -> 2s -> 4s -> ... capped at 300s
#   - breaker: after 5 consecutive model-load failures, OPEN for 300s
# The backoff is KEYED by model_size so changing size resets the timer.
# The breaker is a process-wide single instance (one faster-whisper
# model at a time in this module).
try:
    from core.circuit_breaker import CircuitBreaker, PeerBackoff
    _whisper_load_backoff = PeerBackoff(initial=1.0, maximum=300.0)
    _whisper_load_breaker = CircuitBreaker(
        name='faster_whisper_load', threshold=5, cooldown=300.0,
    )
except ImportError:  # dev tree without HARTOS core — defense-in-depth
    _whisper_load_backoff = None
    _whisper_load_breaker = None

# Track the last user-visible error so /api/admin/stt/status can
# surface it instead of the UI silently limping along. This is the
# 'visible error to the user' half of the symptom #10 mandate.
_whisper_last_error: Optional[str] = None


def get_whisper_last_error() -> Optional[str]:
    """Return the most recent faster-whisper failure reason, or None.

    Exposed so the admin/status UI can show a concrete error to the
    user instead of letting the retry storm accumulate silently in
    the log. Cleared on the next successful load or transcribe.
    """
    return _whisper_last_error


def _record_whisper_failure(reason: str) -> None:
    """Record a whisper failure across backoff + breaker + last-error."""
    global _whisper_last_error
    _whisper_last_error = reason
    if _whisper_load_breaker is not None:
        _whisper_load_breaker.record_failure()
    if _whisper_load_backoff is not None:
        _whisper_load_backoff.record_failure('faster_whisper')


def _record_whisper_success() -> None:
    """Reset backoff + breaker + clear last-error on success."""
    global _whisper_last_error
    _whisper_last_error = None
    if _whisper_load_breaker is not None:
        _whisper_load_breaker.record_success()
    if _whisper_load_backoff is not None:
        _whisper_load_backoff.record_success('faster_whisper')
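
# For readers without the HARTOS core tree: a minimal sketch of the interface
# this module assumes from core.circuit_breaker, inferred purely from the
# calls above and below (is_open / get_stats / record_* / is_backed_off).
# NOT the real implementation — shapes only.
#
#     class CircuitBreaker:
#         def __init__(self, name: str, threshold: int, cooldown: float): ...
#         def is_open(self) -> bool: ...       # True while the breaker is OPEN
#         def get_stats(self) -> dict: ...     # diagnostic snapshot
#         def record_failure(self) -> None: ...
#         def record_success(self) -> None: ...
#
#     class PeerBackoff:
#         def __init__(self, initial: float, maximum: float): ...
#         def is_backed_off(self, key: str) -> bool: ...
#         def record_failure(self, key: str) -> None: ...  # grows the window
#         def record_success(self, key: str) -> None: ...  # resets the window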

# ═══════════════════════════════════════════════════════════════
# faster-whisper (primary engine)
# ═══════════════════════════════════════════════════════════════

# Default faster-whisper model size. Can be overridden by the user via the
# admin Model Management UI, which sets HEVOLVE_STT_MODEL_SIZE in the
# orchestrator and then stops the worker so the next call respawns with
# the new value picked up at subprocess startup.
_FASTER_WHISPER_MODEL_SIZE = os.environ.get(
    'HEVOLVE_STT_MODEL_SIZE', 'base',
)  # CPU int8 — preserves GPU VRAM for TTS/VLM


def _get_faster_whisper_model(model_size: str = "base"):
    """Lazy-load faster-whisper model (CTranslate2, auto-downloads from HuggingFace).

    Device selection:
      - NVIDIA GPU (CUDA): CTranslate2 uses its own CUDA runtime (not torch)
      - AMD GPU: CTranslate2 doesn't support ROCm/Vulkan -> CPU fallback
      - CPU: int8 quantization for speed

    Retry semantics (Symptom #10, 2026-04-16):
      - Backoff-gated: if the last load failed, refuses to retry until
        the exponential backoff window elapses.
      - Circuit-breaker-gated: after 5 consecutive failures, the
        breaker OPENS for 300s; calls raise RuntimeError instead of
        hitting the network on every streaming chunk.
      - Raises on refusal so the caller can log ONCE and skip the
        realtime path cleanly instead of absorbing the exception 2x
        per second.
    """
    global _faster_whisper_model, _faster_whisper_model_size
    if _faster_whisper_model is not None and _faster_whisper_model_size == model_size:
        return _faster_whisper_model

    # Gate 1: circuit breaker. If OPEN, refuse without even importing.
    if _whisper_load_breaker is not None and _whisper_load_breaker.is_open():
        raise RuntimeError(
            f"faster-whisper circuit breaker OPEN (stats="
            f"{_whisper_load_breaker.get_stats()}, last_error="
            f"{_whisper_last_error!r})"
        )

    # Gate 2: exponential backoff keyed by model name. If it recently
    # failed, refuse until the window elapses. Caller gets a clear
    # single log line per refusal, not 2Hz spam.
    if (_whisper_load_backoff is not None
            and _whisper_load_backoff.is_backed_off('faster_whisper')):
        raise RuntimeError(
            f"faster-whisper load is backed off (last_error="
            f"{_whisper_last_error!r})"
        )

    try:
        from faster_whisper import WhisperModel
    except ImportError as e:
        _record_whisper_failure(f"faster_whisper import failed: {e}")
        raise

    # Detect if CUDA is available for CTranslate2 (separate from torch CUDA)
    device = "cpu"
    compute_type = "int8"
    try:
        import ctranslate2
        # A CUDA device CTranslate2 can actually use (independent of torch)
        if ctranslate2.get_cuda_device_count() > 0:
            device = "cuda"
            compute_type = "float16"
            logger.info("CTranslate2 CUDA available — loading faster-whisper on GPU")
    except Exception:
        pass

    logger.info(f"Loading faster-whisper model '{model_size}' on {device} ({compute_type})...")
    try:
        _faster_whisper_model = WhisperModel(
            model_size, device=device, compute_type=compute_type
        )
    except Exception as e:
        reason = f"WhisperModel({model_size}, {device}, {compute_type}) failed: {e}"
        logger.warning(reason)
        _record_whisper_failure(reason)
        raise
    _faster_whisper_model_size = model_size
    logger.info(f"faster-whisper model '{model_size}' loaded on {device}")
    _record_whisper_success()

    # Register with central lifecycle tracker via orchestrator
    try:
        from .model_orchestrator import get_orchestrator
        get_orchestrator().notify_loaded(
            'stt', f'whisper-{model_size}',
            device=device, vram_gb=3.0 if device == 'cuda' else 0)
    except Exception:
        pass

    return _faster_whisper_model
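
# Hypothetical caller pattern (illustration only): a RuntimeError from
# _get_faster_whisper_model is a deliberate refusal (breaker OPEN or backoff
# window active) — log once and skip the tick, never retry in a tight loop.
#
#     try:
#         model = _get_faster_whisper_model(_FASTER_WHISPER_MODEL_SIZE)
#     except RuntimeError as refusal:
#         logger.info("STT skipped this tick: %s", refusal)
#         model = None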

def _faster_whisper_transcribe(audio_path: str, language: Optional[str] = None) -> Optional[str]:
    """Transcribe using faster-whisper. Returns JSON string or None on failure.

    Symptom #10 guard (2026-04-16): each call is gated by the module
    circuit breaker, so after N consecutive load failures we refuse
    silently (one log every cooldown window, not 2Hz spam). The
    backoff is shared with _get_faster_whisper_model so a model-load
    refusal here short-circuits the full transcribe path too.
    """
    # Fast-path refusal: if the breaker is OPEN, skip the whole try block.
    if _whisper_load_breaker is not None and _whisper_load_breaker.is_open():
        return None

    try:
        model = _get_faster_whisper_model(_FASTER_WHISPER_MODEL_SIZE)
    except Exception as e:
        # _get_faster_whisper_model already records the failure +
        # emits one warning. Don't re-log at 2Hz here.
        logger.debug(f"faster-whisper unavailable: {e}")
        return None

    try:
        kwargs = {"beam_size": 5}
        if language:
            kwargs["language"] = language
        segments, info = model.transcribe(audio_path, **kwargs)
        text = " ".join(seg.text for seg in segments).strip()
        _record_whisper_success()
        return json.dumps({
            "text": text,
            "language": info.language if info.language else "unknown",
        })
    except Exception as e:
        reason = f"transcribe({audio_path}) failed: {e}"
        logger.warning(f"faster-whisper transcription failed: {e}")
        _record_whisper_failure(reason)
        return None
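
# Shape of a successful return value (illustrative transcript):
#
#     '{"text": "hello world", "language": "en"}'
#
# A None return means "engine unavailable or failed", which lets
# _transcribe_impl fall through to the next engine in the priority chain.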

# ═══════════════════════════════════════════════════════════════
# Model download (sherpa-onnx)
# ═══════════════════════════════════════════════════════════════

def _get_stt_dir() -> Path:
    """Get the STT model storage directory."""
    from .model_storage import model_storage
    stt_dir = model_storage.get_tool_dir("stt")
    stt_dir.mkdir(parents=True, exist_ok=True)
    return stt_dir


def _download_model(model_name: str) -> Path:
    """Download and extract a sherpa-onnx model if not already present.

    Returns the path to the extracted model directory.
    """
    cfg = _SHERPA_MODELS[model_name]
    stt_dir = _get_stt_dir()
    model_dir = stt_dir / cfg["dir"]

    if model_dir.exists() and (model_dir / cfg["files"]["tokens"]).exists():
        return model_dir

    archive_url = f"{_SHERPA_MODEL_BASE}/{cfg['archive']}"
    archive_path = stt_dir / cfg["archive"]

    logger.info(f"Downloading STT model '{model_name}' from {archive_url}...")
    try:
        urllib.request.urlretrieve(archive_url, str(archive_path))
        logger.info(f"Extracting {cfg['archive']}...")
        with tarfile.open(str(archive_path), "r:bz2") as tar:
            tar.extractall(path=str(stt_dir))
        # Clean up archive
        archive_path.unlink(missing_ok=True)
        logger.info(f"STT model '{model_name}' ready at {model_dir}")
    except Exception as e:
        logger.error(f"Failed to download model '{model_name}': {e}")
        archive_path.unlink(missing_ok=True)
        raise

    return model_dir


# ═══════════════════════════════════════════════════════════════
# sherpa-onnx recognizer creation
# ═══════════════════════════════════════════════════════════════

def _get_sherpa_recognizer(model_name: str = "whisper-tiny"):
    """Create or return cached sherpa-onnx OfflineRecognizer."""
    global _sherpa_recognizer, _sherpa_model_name
    if _sherpa_recognizer is not None and _sherpa_model_name == model_name:
        return _sherpa_recognizer

    import sherpa_onnx

    cfg = _SHERPA_MODELS[model_name]
    model_dir = _download_model(model_name)

    num_threads = min(os.cpu_count() or 2, 4)

    if cfg["type"] == "moonshine":
        _sherpa_recognizer = sherpa_onnx.OfflineRecognizer.from_moonshine(
            preprocessor=str(model_dir / cfg["files"]["preprocessor"]),
            encoder=str(model_dir / cfg["files"]["encoder"]),
            uncached_decoder=str(model_dir / cfg["files"]["uncached_decoder"]),
            cached_decoder=str(model_dir / cfg["files"]["cached_decoder"]),
            tokens=str(model_dir / cfg["files"]["tokens"]),
            num_threads=num_threads,
        )
    elif cfg["type"] == "whisper":
        _sherpa_recognizer = sherpa_onnx.OfflineRecognizer.from_whisper(
            encoder=str(model_dir / cfg["files"]["encoder"]),
            decoder=str(model_dir / cfg["files"]["decoder"]),
            tokens=str(model_dir / cfg["files"]["tokens"]),
            num_threads=num_threads,
        )
    else:
        raise ValueError(f"Unknown model type: {cfg['type']}")

    _sherpa_model_name = model_name
    logger.info(f"sherpa-onnx recognizer ready: {model_name}")
    return _sherpa_recognizer


def _sherpa_transcribe(audio_path: str, model_name: str) -> Optional[str]:
    """Transcribe using sherpa-onnx. Returns JSON string or None on failure."""
    try:
        recognizer = _get_sherpa_recognizer(model_name)
        stream = recognizer.create_stream()
        stream.accept_wave_file(audio_path)
        recognizer.decode_stream(stream)
        text = stream.result.text.strip()

        # Language: Moonshine is English-only, Whisper auto-detects
        cfg = _SHERPA_MODELS.get(model_name, {})
        lang = "en" if not cfg.get("multilingual") else "auto"

        return json.dumps({"text": text, "language": lang})
    except Exception as e:
        logger.warning(f"sherpa-onnx transcription failed ({model_name}): {e}")
        return None


# ═══════════════════════════════════════════════════════════════
# Legacy openai-whisper fallback
# ═══════════════════════════════════════════════════════════════

def _get_whisper_model(model_name: str = "base"):
    """Lazy-load openai-whisper model (fallback if sherpa-onnx unavailable)."""
    global _whisper_model, _whisper_model_name
    if _whisper_model is not None and _whisper_model_name == model_name:
        return _whisper_model

    import whisper

    from .model_storage import model_storage
    model_dir = model_storage.get_tool_dir("whisper")
    model_dir.mkdir(parents=True, exist_ok=True)

    os.environ.setdefault("XDG_CACHE_HOME", str(model_dir.parent))
    logger.info(f"Loading openai-whisper model '{model_name}' (fallback)...")
    _whisper_model = whisper.load_model(model_name, download_root=str(model_dir))
    _whisper_model_name = model_name
    logger.info(f"openai-whisper model '{model_name}' loaded")
    return _whisper_model


def _legacy_transcribe(audio_path: str, language: Optional[str] = None) -> Optional[str]:
    """Transcribe using openai-whisper (fallback). Returns JSON string or None."""
    try:
        model_name = _select_legacy_model()
        model = _get_whisper_model(model_name)
        kwargs = {}
        if language:
            kwargs["language"] = language
        result = model.transcribe(audio_path, **kwargs)
        return json.dumps({
            "text": result["text"].strip(),
            "language": result.get("language", "unknown"),
        })
    except ImportError:
        return None
    except Exception as e:
        logger.warning(f"openai-whisper fallback failed: {e}")
        return None


def _select_legacy_model() -> str:
    """Select openai-whisper model by VRAM (legacy path)."""
    try:
        from .vram_manager import vram_manager
        gpu = vram_manager.detect_gpu()
        if not gpu["cuda_available"]:
            return "base"
        free = vram_manager.get_free_vram()
        if free >= 10:
            return "large-v3"
        elif free >= 5:
            return "medium"
        elif free >= 2:
            return "small"
    except Exception:
        pass
    return "base"

# ═══════════════════════════════════════════════════════════════
# Public API (same interface for all callers)
# ═══════════════════════════════════════════════════════════════

def populate_stt_catalog(catalog) -> int:
    """Register all STT model variants into the ModelCatalog.

    Called by ModelCatalog._populate_stt_models() so the catalog is the
    single source of truth for model names, VRAM requirements, and tier gates.
    Replaces the hardcoded VRAM thresholds in select_whisper_model().

    Returns number of new entries added.
    """
    from integrations.service_tools.model_catalog import ModelEntry, ModelType

    # (id, name, vram_gb, ram_gb, disk_gb, quality, speed, tags, min_tier)
    models = [
        # faster-whisper (primary engine, CTranslate2 INT8)
        ('stt-faster-whisper-tiny', 'Whisper Tiny (faster-whisper)', 0.0, 0.3, 0.15,
         0.60, 0.98, ['multilingual', 'cpu-friendly', 'faster-whisper'], 'lite'),
        ('stt-faster-whisper-base', 'Whisper Base (faster-whisper)', 0.2, 0.5, 0.30,
         0.72, 0.95, ['multilingual', 'cpu-friendly', 'faster-whisper'], 'lite'),
        ('stt-faster-whisper-small', 'Whisper Small (faster-whisper)', 0.5, 1.0, 0.46,
         0.80, 0.85, ['multilingual', 'faster-whisper'], 'lite'),
        ('stt-faster-whisper-medium', 'Whisper Medium (faster-whisper)', 1.5, 2.0, 1.50,
         0.87, 0.72, ['multilingual', 'faster-whisper'], 'standard'),
        ('stt-faster-whisper-large', 'Whisper Large v3 (faster-whisper)', 3.0, 4.0, 3.10,
         0.94, 0.55, ['multilingual', 'faster-whisper'], 'full'),
        # sherpa-onnx (lightweight ONNX, no PyTorch)
        ('stt-sherpa-moonshine-tiny', 'Moonshine Tiny (sherpa-onnx, EN)', 0.0, 0.2, 0.08,
         0.62, 0.99, ['english-only', 'onnx', 'sherpa-onnx', 'cpu-friendly'], 'lite'),
        ('stt-sherpa-moonshine-base', 'Moonshine Base (sherpa-onnx, EN)', 0.0, 0.3, 0.15,
         0.68, 0.96, ['english-only', 'onnx', 'sherpa-onnx', 'cpu-friendly'], 'lite'),
        ('stt-sherpa-whisper-tiny', 'Whisper Tiny (sherpa-onnx)', 0.0, 0.3, 0.15,
         0.61, 0.97, ['multilingual', 'onnx', 'sherpa-onnx', 'cpu-friendly'], 'lite'),
        ('stt-sherpa-whisper-base', 'Whisper Base (sherpa-onnx)', 0.0, 0.4, 0.30,
         0.72, 0.92, ['multilingual', 'onnx', 'sherpa-onnx'], 'lite'),
        ('stt-sherpa-whisper-small', 'Whisper Small (sherpa-onnx)', 0.0, 0.7, 0.46,
         0.79, 0.80, ['multilingual', 'onnx', 'sherpa-onnx'], 'lite'),
        ('stt-sherpa-whisper-medium', 'Whisper Medium (sherpa-onnx)', 0.0, 1.5, 1.50,
         0.86, 0.65, ['multilingual', 'onnx', 'sherpa-onnx'], 'standard'),
    ]

    added = 0
    for (mid, name, vram, ram, disk, quality, speed, tags, min_tier) in models:
        if catalog.get(mid) is not None:
            continue
        entry = ModelEntry(
            id=mid, name=name, model_type=ModelType.STT,
            source='github' if 'sherpa' in mid else 'huggingface',
            vram_gb=vram, ram_gb=ram, disk_gb=disk,
            min_capability_tier=min_tier,
            backend='onnx' if 'sherpa' in mid else 'torch',
            supports_gpu=(vram > 0), supports_cpu=True,
            supports_cpu_offload=False,
            idle_timeout_s=300,
            capabilities={
                'realtime': True,
                'diarization': False,
                'multilingual': ('multilingual' in tags),
            },
            quality_score=quality, speed_score=speed,
            languages=['multilingual'] if 'multilingual' in tags else ['en'],
            tags=tags,
        )
        catalog.register(entry, persist=False)
        added += 1
    return added
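
# Hypothetical wiring sketch: the expected call site is
# ModelCatalog._populate_stt_models() passing the live catalog instance.
# The constructor shown is an assumption for illustration; re-running is
# idempotent because existing IDs are skipped.
#
#     from integrations.service_tools.model_catalog import ModelCatalog
#     catalog = ModelCatalog()               # assumed constructor
#     added = populate_stt_catalog(catalog)  # 11 on first run, 0 after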

# ── Catalog-aware model name → sherpa-onnx key mapping ─────────────────────
_CATALOG_ID_TO_SHERPA = {
    'stt-sherpa-moonshine-tiny': 'moonshine-tiny',
    'stt-sherpa-moonshine-base': 'moonshine-base',
    'stt-sherpa-whisper-tiny': 'whisper-tiny',
    'stt-sherpa-whisper-base': 'whisper-base',
    'stt-sherpa-whisper-small': 'whisper-small',
    'stt-sherpa-whisper-medium': 'whisper-medium',
}

_CATALOG_ID_TO_FASTER_WHISPER_SIZE = {
    'stt-faster-whisper-tiny': 'tiny',
    'stt-faster-whisper-base': 'base',
    'stt-faster-whisper-small': 'small',
    'stt-faster-whisper-medium': 'medium',
    'stt-faster-whisper-large': 'large-v3',
}


def select_whisper_model() -> str:
    """Select best STT model for this hardware.

    Tries ModelCatalog first (single source of truth for VRAM thresholds).
    Falls back to direct VRAM query if catalog is unavailable.

    Returns a sherpa-onnx model key (from _SHERPA_MODELS) when sherpa-onnx
    is available, or an openai-whisper model name as a legacy fallback.
    """
    # ── Primary path: ask the catalog ───────────────────────────────────────
    try:
        from integrations.service_tools.model_orchestrator import get_orchestrator
        orch = get_orchestrator()
        entry = orch.select_best('stt')
        if entry:
            # Map catalog entry ID back to the engine-specific key
            sherpa_key = _CATALOG_ID_TO_SHERPA.get(entry.id)
            if sherpa_key and sherpa_key in _SHERPA_MODELS:
                try:
                    import sherpa_onnx  # noqa: F401
                    return sherpa_key
                except ImportError:
                    pass
            # faster-whisper size
            fw_size = _CATALOG_ID_TO_FASTER_WHISPER_SIZE.get(entry.id)
            if fw_size:
                return fw_size
    except Exception:
        pass

    # ── Fallback: direct VRAM query (no catalog dependency) ─────────────────
    try:
        import sherpa_onnx  # noqa: F401 — check availability
    except ImportError:
        return _select_legacy_model()

    from .vram_manager import vram_manager
    gpu = vram_manager.detect_gpu()

    if gpu["cuda_available"]:
        free = vram_manager.get_free_vram()
        if free >= 5:
            return "whisper-medium"
        elif free >= 2:
            return "whisper-small"
        else:
            return "whisper-base"
    else:
        # CPU-only: prefer Moonshine (fastest) for English,
        # Whisper tiny for multilingual.
        # Caller can override with a language hint.
        return "moonshine-tiny"


# ═══════════════════════════════════════════════════════════════
# Subprocess isolation
# ═══════════════════════════════════════════════════════════════
#
# STT engines (faster-whisper / sherpa-onnx / openai-whisper) all have
# native C runtimes that can crash the parent process on CUDA OOM,
# DLL conflicts, or audio decoder edge cases. Running them in a worker
# subprocess contains those crashes: the worker dies, the parent catches
# the exit code and returns an error JSON without bringing Nunba down.

from .gpu_worker import ToolWorker  # noqa: E402

_stt_tool = ToolWorker(
    tool_name='whisper',
    tool_module='integrations.service_tools.whisper_tool',
    vram_budget='whisper_base',
    output_subdir='stt',  # not used — STT doesn't generate files
    engine='whisper',
    startup_timeout=60.0,
    request_timeout=180.0,  # long audio files can take a while
    idle_timeout=300.0,  # free the model after 5 min idle
)


def _transcribe_impl(audio_path: str, language: Optional[str] = None) -> str:
    """Transcribe audio — runs inside the worker subprocess.

    Engine priority: faster-whisper → sherpa-onnx → openai-whisper.

    Returns JSON string with 'text' and 'language' keys.
    """
    # 1. Try faster-whisper (preferred — CTranslate2, 4x faster, multilingual)
    try:
        import faster_whisper  # noqa: F401
        result = _faster_whisper_transcribe(audio_path, language)
        if result:
            return result
    except ImportError:
        pass
    except Exception as e:
        logger.warning(f"faster-whisper failed, trying fallback: {e}")

    # 2. Try sherpa-onnx (lightweight ONNX, no PyTorch)
    try:
        import sherpa_onnx  # noqa: F401

        model_name = select_whisper_model()

        # If a non-English language is explicitly requested and the selected
        # model is English-only (Moonshine), switch to multilingual Whisper
        cfg = _SHERPA_MODELS.get(model_name, {})
        if language and language != "en" and not cfg.get("multilingual"):
            model_name = "whisper-tiny"

        result = _sherpa_transcribe(audio_path, model_name)
        if result:
            return result
    except ImportError:
        pass
    except Exception as e:
        logger.warning(f"sherpa-onnx failed, trying openai-whisper: {e}")

    # 3. Fallback: openai-whisper (PyTorch) — the risky path
    result = _legacy_transcribe(audio_path, language)
    if result:
        return result

    return json.dumps({"error": "No STT engine available (install faster-whisper)"})

def whisper_transcribe(audio_path: str, language: Optional[str] = None) -> str:
    """Transcribe audio file to text (subprocess-isolated).

    Runs the STT engine chain in a dedicated worker subprocess so that
    CUDA OOM / CTranslate2 crashes / PyTorch DLL segfaults can't kill
    the parent process.

    Args:
        audio_path: Path to audio file (WAV, MP3, WebM, etc.)
        language: Optional language code. Auto-detect if None.

    Returns:
        JSON string with 'text' and 'language' keys.
    """
    result = _stt_tool.call({
        'op': 'transcribe',
        'audio_path': audio_path,
        'language': language,
    })
    if 'error' in result:
        return json.dumps(result)
    return result.get('raw_json', json.dumps(result))
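
# Usage sketch (illustrative path): callers always get a JSON string back,
# even on failure, so one json.loads covers both cases.
#
#     result = json.loads(whisper_transcribe("/tmp/clip.wav"))
#     if "error" in result:
#         logger.warning("STT failed: %s", result["error"])
#     else:
#         print(result["text"], result["language"])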

def _detect_language_impl(audio_path: str) -> str:
    """Language detection — runs inside the worker subprocess.

    Returns JSON string with 'language' and 'probability' keys.
    """
    # Try faster-whisper first (has built-in language detection)
    try:
        from faster_whisper import WhisperModel  # noqa: F401
        model = _get_faster_whisper_model(_FASTER_WHISPER_MODEL_SIZE)
        _, info = model.transcribe(audio_path, beam_size=1)
        return json.dumps({
            "language": info.language if info.language else "unknown",
            "probability": round(info.language_probability, 4) if info.language_probability else 0.0,
        })
    except ImportError:
        pass
    except Exception as e:
        logger.debug(f"faster-whisper language detection failed: {e}")

    try:
        import whisper
        model = _get_whisper_model()
        audio = whisper.load_audio(audio_path)
        audio = whisper.pad_or_trim(audio)
        mel = whisper.log_mel_spectrogram(audio).to(model.device)
        _, probs = model.detect_language(mel)
        lang = max(probs, key=probs.get)
        return json.dumps({
            "language": lang,
            "probability": round(probs[lang], 4),
        })
    except ImportError:
        # No openai-whisper — transcribe with multilingual Whisper and infer
        try:
            import sherpa_onnx  # noqa: F401
            result = _sherpa_transcribe(audio_path, "whisper-tiny")
            if result:
                parsed = json.loads(result)
                return json.dumps({
                    "language": parsed.get("language", "unknown"),
                    "probability": 0.8,
                })
        except Exception:
            pass
        return json.dumps({"error": "Language detection unavailable"})
    except Exception as e:
        return json.dumps({"error": f"Language detection failed: {e}"})


def whisper_detect_language(audio_path: str) -> str:
    """Detect the language of an audio file (subprocess-isolated)."""
    result = _stt_tool.call({
        'op': 'detect_language',
        'audio_path': audio_path,
    })
    if 'error' in result:
        return json.dumps(result)
    return result.get('raw_json', json.dumps(result))


def unload_whisper():
    """Unload all STT models to free memory.

    The actual models live inside the `_stt_tool` subprocess (see the
    SUBPROCESS ISOLATION section above), so the authoritative unload is
    stopping that worker. We ALSO clear the parent-side globals in case
    a worker-side reference leaked into the parent module state during
    an import.
    """
    # 1. Stop the worker subprocess — this is the real VRAM release.
    try:
        _stt_tool.stop()
    except Exception as e:
        logger.warning(f"failed to stop STT worker: {e}")

    # 2. Also clear parent-side caches (only ever populated in the worker,
    # but defensive in case something called a legacy helper in-process).
    global _sherpa_recognizer, _sherpa_model_name
    global _whisper_model, _whisper_model_name
    global _faster_whisper_model, _faster_whisper_model_size

    _faster_whisper_model = None
    _faster_whisper_model_size = None
    _sherpa_recognizer = None
    _sherpa_model_name = None
    _whisper_model = None
    _whisper_model_name = None

    from .vram_manager import clear_cuda_cache
    clear_cuda_cache()
    logger.info("STT models unloaded")

# ═══════════════════════════════════════════════════════════════
# Streaming STT WebSocket Server (faster-whisper with VAD)
#
# Pattern: same as diarization_server.py — standalone asyncio WebSocket
# server started as a daemon thread by a DiarizationService-style manager.
#
# Protocol:
#   Client → Server: binary PCM16 audio chunks (16kHz mono) OR
#                    binary WebM/Opus blobs (auto-detected, converted via ffmpeg)
#   Server → Client: JSON {"text": "...", "language": "en", "is_final": true/false}
#
# The server accumulates audio in a per-connection buffer. When VAD detects
# a speech pause (or the buffer exceeds 30s), it transcribes the buffer with
# faster-whisper and sends back the result. Partial results are sent
# every 2s of accumulated audio for low-latency interim display.
# ═══════════════════════════════════════════════════════════════

_stt_ws_server = None
_stt_ws_port = None

STREAM_SAMPLE_RATE = 16000
STREAM_BYTES_PER_SAMPLE = 2
STREAM_CHANNELS = 1
# Transcribe every 2s of audio for interim results
# (16000 Hz × 2 B/sample × 1 ch × 2 s = 64,000 bytes)
STREAM_CHUNK_SECONDS = 2
STREAM_CHUNK_BYTES = STREAM_SAMPLE_RATE * STREAM_BYTES_PER_SAMPLE * STREAM_CHANNELS * STREAM_CHUNK_SECONDS
# Max buffer before forced transcription (30s ≈ 960,000 bytes)
STREAM_MAX_BUFFER_BYTES = STREAM_SAMPLE_RATE * STREAM_BYTES_PER_SAMPLE * STREAM_CHANNELS * 30
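
# Minimal client sketch (illustration only; assumes the `websockets` package
# and a server started via start_stt_stream_server). Streams raw PCM16
# chunks, forces a final transcription, and reads results until the final
# one arrives. `pcm_chunks` is a placeholder name.
#
#     import asyncio
#     import websockets
#
#     async def demo_stream(pcm_chunks, port):
#         async with websockets.connect(f"ws://127.0.0.1:{port}") as ws:
#             for chunk in pcm_chunks:               # bytes: 16kHz mono PCM16
#                 await ws.send(chunk)
#             await ws.send(json.dumps({"control": "final"}))
#             while True:
#                 msg = json.loads(await ws.recv())  # interim or final result
#                 print(msg["text"], msg["is_final"])
#                 if msg["is_final"]:
#                     break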

def _ws_path(websocket) -> str:
    """Best-effort URL-path extractor across websockets lib versions.

    websockets 11+ moved the request to ``websocket.request.path``;
    earlier versions exposed ``websocket.path`` directly. We probe
    both and fall back to an empty string when neither is available.
    Never raises.
    """
    for getter in (
        lambda ws: ws.request.path,
        lambda ws: ws.path,
    ):
        try:
            val = getter(websocket)
            if isinstance(val, str):
                return val
        except Exception:
            continue
    return ''


def _parse_call_context(ws_path: str) -> tuple:
    """Parse ``?call_id=<id>&user_id=<u>`` from a WS request path.

    UNIF-G7 / W1.7 Producer C — the RN mic stream (and any other
    browser/mobile audio source) opens the streaming-STT WebSocket
    with these query params attached when the audio belongs to a
    voice room. Without the params, behavior is unchanged (today's
    one-shot transcription clients still work).

    Returns ``(call_id, user_id)`` — either may be ``None``. Never
    raises.
    """
    if not ws_path:
        return (None, None)
    from urllib.parse import urlparse, parse_qs
    try:
        parsed = urlparse(ws_path)
        qs = parse_qs(parsed.query or '')
        call_id = (qs.get('call_id') or [None])[0]
        user_id = (qs.get('user_id') or [None])[0]
        return (call_id or None, user_id or None)
    except Exception:
        return (None, None)
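
# Behavior examples (values illustrative):
#
#     _parse_call_context('/stt?call_id=room-42&user_id=u7')  # → ('room-42', 'u7')
#     _parse_call_context('/stt')                             # → (None, None)
#     _parse_call_context('')                                 # → (None, None)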

def _maybe_enqueue_call_segment(
    call_id: Optional[str],
    user_id: Optional[str],
    text: str,
    lang: str,
    is_final: bool,
) -> None:
    """If the WS client opted in via ``?call_id=`` AND this is a final
    segment, land it in the canonical per-call queue so the
    AgentBridgeWorker can drain it (UNIF-G3 / W1.2 consumer).

    Single canonical writer for browser/mobile-mic-driven transcripts —
    same sink as the future Discord-voice-recv (Producer A) and
    LiveKit-RTC (Producer B) audio paths. No parallel queue.

    Best-effort: never raises out of the WS handler hot path.
    """
    if not call_id or not is_final or not text:
        return
    try:
        enqueue_stt_segment(call_id, {
            'text': text,
            'lang': lang,
            'author_id': user_id or 'unknown',
            'is_final': True,
            # t0/t1/speaker stay None — the RN mic stream is single-speaker
            # by definition (the user speaking into the SPA); future
            # multi-speaker producers will set speaker.
        })
    except Exception as e:
        logger.debug(
            "whisper_tool._maybe_enqueue_call_segment failed "
            "(call=%s): %s", call_id, e)

async def _stt_stream_handler(websocket):
    """Handle a single streaming STT WebSocket connection.

    Accepts:
      - Raw PCM16 16kHz mono binary frames
      - WebM/Opus blobs (auto-converted to PCM via temp file + faster-whisper)
      - JSON {"control": "reset"} to clear the buffer
      - JSON {"control": "final"} to force a final transcription

    Sends back:
      - {"text": "...", "language": "en", "is_final": false} for interim
      - {"text": "...", "language": "en", "is_final": true} for final (pause detected)

    Optional UNIF-G7 hook (Producer C):
      The connection URL MAY include ``?call_id=<id>&user_id=<u>``.
      When present, every final segment is ALSO landed in the per-call
      STT queue (whisper_tool.enqueue_stt_segment) so the
      AgentBridgeWorker can drain it and emit the meet_copilot card.
      Absence of the params preserves today's behavior exactly — RN
      one-shot transcription clients are unaffected.

    CRASH ISOLATION:
      - Model crashes are isolated: _transcribe_buffer routes through
        _stt_tool.call() which handles subprocess crashes and returns
        empty strings on failure.
      - Protocol-level code (websocket frames, VAD, buffering) runs
        in the daemon thread with an outer try/except (below) so
        Python exceptions are caught and the connection closes cleanly.
      - Remaining risks are C-level crashes in the websockets library
        or audio decoders — low-probability, and would require moving
        the entire server into a subprocess with per-frame IPC, which
        costs realtime latency. Deferred until evidence of actual crashes.
    """
    import io

    audio_buffer = io.BytesIO()
    last_transcribe_size = 0
    # UNIF-G7 Producer C: extract optional call context from the
    # WS request path. When absent (call_id is None), the
    # _maybe_enqueue_call_segment helper degrades to a no-op so plain
    # transcription clients see ZERO behavior change.
    call_id, user_id = _parse_call_context(_ws_path(websocket))

    try:
        async for message in websocket:
            # Control messages (JSON)
            if isinstance(message, str):
                try:
                    ctrl = json.loads(message)
                    if ctrl.get('control') == 'reset':
                        audio_buffer = io.BytesIO()
                        last_transcribe_size = 0
                        continue
                    if ctrl.get('control') == 'final':
                        # Force final transcription of the remaining buffer
                        text, lang = _transcribe_buffer(audio_buffer)
                        if text:
                            await websocket.send(json.dumps({
                                'text': text, 'language': lang, 'is_final': True,
                            }))
                            _maybe_enqueue_call_segment(
                                call_id, user_id, text, lang, True)
                        audio_buffer = io.BytesIO()
                        last_transcribe_size = 0
                        continue
                except (json.JSONDecodeError, ValueError):
                    pass
                continue

            # Binary audio data
            if not isinstance(message, (bytes, bytearray)):
                continue

            # Detect format: WebM/Opus starts with 0x1A45DFA3 (EBML header)
            # or "OggS" (Ogg container). Raw PCM has no header.
            is_container = (
                message[:4] == b'\x1a\x45\xdf\xa3' or  # WebM/Matroska
                message[:4] == b'OggS' or              # Ogg/Opus
                message[:4] == b'RIFF'                 # WAV
            )

            if is_container:
                # Save to temp file, let ffmpeg/faster-whisper handle decoding
                pcm_bytes = _container_to_pcm(message)
                if pcm_bytes:
                    audio_buffer.write(pcm_bytes)
            else:
                # Raw PCM16 mono 16kHz
                audio_buffer.write(message)

            buf_size = audio_buffer.getbuffer().nbytes

            # Force transcription if the buffer exceeds the max
            if buf_size >= STREAM_MAX_BUFFER_BYTES:
                text, lang = _transcribe_buffer(audio_buffer)
                if text:
                    await websocket.send(json.dumps({
                        'text': text, 'language': lang, 'is_final': True,
                    }))
                    _maybe_enqueue_call_segment(
                        call_id, user_id, text, lang, True)
                audio_buffer = io.BytesIO()
                last_transcribe_size = 0
                continue

            # Interim transcription every STREAM_CHUNK_BYTES
            if buf_size - last_transcribe_size >= STREAM_CHUNK_BYTES:
                text, lang = _transcribe_buffer(audio_buffer, keep_buffer=True)
                last_transcribe_size = buf_size
                if text:
                    await websocket.send(json.dumps({
                        'text': text, 'language': lang, 'is_final': False,
                    }))

    except Exception as e:
        logger.debug(f"STT stream connection ended: {e}")

def _container_to_pcm(data: bytes) -> Optional[bytes]:
    """Convert a WebM/Opus/WAV container to raw PCM16 16kHz mono via temp file.

    faster-whisper can read any ffmpeg-supported format, so we save to a
    temp file, transcribe, and extract the raw audio. But for streaming we
    need raw PCM — use an ffmpeg subprocess if available, else return the
    raw bytes and let faster-whisper handle them at transcribe time.
    """
    import subprocess as _sp
    import tempfile

    tmp_in = None
    tmp_out = None
    try:
        tmp_in = tempfile.NamedTemporaryFile(suffix='.webm', delete=False)
        tmp_in.write(data)
        tmp_in.close()

        tmp_out = tempfile.NamedTemporaryFile(suffix='.pcm', delete=False)
        tmp_out.close()

        _kw = dict(capture_output=True, timeout=10)
        if hasattr(_sp, 'CREATE_NO_WINDOW'):
            _kw['creationflags'] = _sp.CREATE_NO_WINDOW

        result = _sp.run([
            'ffmpeg', '-y', '-i', tmp_in.name,
            '-ar', str(STREAM_SAMPLE_RATE), '-ac', '1', '-f', 's16le',
            tmp_out.name,
        ], **_kw)

        if result.returncode == 0:
            with open(tmp_out.name, 'rb') as f:
                return f.read()
    except FileNotFoundError:
        # ffmpeg not available — pass the raw container bytes through;
        # _transcribe_buffer will write them to a temp file for faster-whisper
        return data
    except Exception as e:
        logger.debug(f"PCM conversion failed: {e}")
    finally:
        for p in (tmp_in, tmp_out):
            if p:
                try:
                    os.unlink(p.name)
                except Exception:
                    pass
    return None


def _transcribe_buffer(audio_buffer, keep_buffer: bool = False) -> tuple:
    """Transcribe the accumulated audio buffer via the subprocess STT worker.

    Returns a (text, language) tuple.

    Runs through `_stt_tool` so CUDA OOM or faster-whisper/CTranslate2
    crashes on the realtime path only kill the worker subprocess — the
    streaming WebSocket server (and the whole Nunba process) stay alive.
    """
    import tempfile

    buf_bytes = audio_buffer.getvalue()
    if len(buf_bytes) < STREAM_SAMPLE_RATE * 2:  # < 1s of audio, skip
        return ('', 'unknown')

    if not keep_buffer:
        audio_buffer.seek(0)
        audio_buffer.truncate(0)

    # Write PCM to a temp WAV file — the subprocess worker reads by
    # path (simpler than shipping raw bytes over JSON). For realtime
    # workloads this is still cheap (local disk, a few KB per chunk).
    tmp = None
    try:
        import wave
        tmp = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
        with wave.open(tmp.name, 'wb') as wf:
            wf.setnchannels(STREAM_CHANNELS)
            wf.setsampwidth(STREAM_BYTES_PER_SAMPLE)
            wf.setframerate(STREAM_SAMPLE_RATE)
            wf.writeframes(buf_bytes)
        tmp.close()

        result = _stt_tool.call({
            'op': 'transcribe',
            'audio_path': tmp.name,
            'language': None,
        })
        if 'error' in result and not result.get('raw_json'):
            logger.debug(f"Streaming transcribe failed: {result.get('error')}")
            return ('', 'unknown')
        raw = result.get('raw_json') or json.dumps(result)
        try:
            parsed = json.loads(raw)
            return (parsed.get('text', ''), parsed.get('language', 'unknown'))
        except json.JSONDecodeError:
            return ('', 'unknown')
    except Exception as e:
        logger.debug(f"Streaming transcribe failed: {e}")
        return ('', 'unknown')
    finally:
        if tmp is not None:
            try:
                os.unlink(tmp.name)
            except Exception:
                pass


def start_stt_stream_server(port: int = 0) -> Optional[int]:
    """Start the streaming STT WebSocket server in a daemon thread.

    Same pattern as DiarizationService — asyncio event loop in a thread.

    Args:
        port: Port to bind (0 = auto-select from port registry or dynamic)

    Returns:
        Actual port number, or None if failed.
    """
    global _stt_ws_server, _stt_ws_port

    if _stt_ws_port is not None:
        return _stt_ws_port  # already running

    if port == 0:
        try:
            from core.port_registry import get_port
            port = get_port('stt_stream')
        except Exception:
            port = 8005  # default fallback

    import asyncio
    import threading

    def _run_server():
        global _stt_ws_server, _stt_ws_port
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        try:
            import websockets

            async def _serve():
                global _stt_ws_server, _stt_ws_port
                server = await websockets.serve(
                    _stt_stream_handler, '127.0.0.1', port,
                    max_size=2 * 1024 * 1024,  # 2MB max message (30s audio ~960KB)
                )
                actual_port = port
                if server.sockets:
                    actual_port = server.sockets[0].getsockname()[1]
                _stt_ws_server = server
                _stt_ws_port = actual_port
                logger.info(f"Streaming STT WebSocket server on ws://127.0.0.1:{actual_port}")
                await asyncio.Future()  # run forever

            loop.run_until_complete(_serve())
        except Exception as e:
            logger.error(f"STT stream server failed: {e}")
            _stt_ws_port = None

    thread = threading.Thread(target=_run_server, daemon=True, name='stt-stream-ws')
    thread.start()

    # Wait for the port to be assigned
    import time
    for _ in range(30):
        if _stt_ws_port is not None:
            return _stt_ws_port
        time.sleep(0.1)

    logger.warning("STT stream server did not start within 3s")
    return None


def get_stt_stream_port() -> Optional[int]:
    """Get the port of the running streaming STT WebSocket server."""
    return _stt_ws_port

# ═══════════════════════════════════════════════════════════════
# Per-call STT segment queue (UNIF-G3 / W1.2)
# ═══════════════════════════════════════════════════════════════
#
# When a call has subscribers (LiveKit room, Discord voice channel,
# Teams meet, etc.), an audio-frame producer feeds frames through the
# streaming STT WebSocket above and receives ``{text, language,
# is_final}`` events back. ``enqueue_stt_segment`` is the canonical
# place to land FINAL segments so the AgentBridgeWorker (which doesn't
# care which adapter produced the audio) can drain them via
# ``dequeue_segments`` from its tick loop.
#
# This is the single canonical home for STT-segment buffering — every
# audio-source adapter (LiveKit subscriber, Discord voice receiver,
# RN mic stream) lands segments here; every consumer (agent_voice_bridge
# worker, transcript recorder) reads here. No parallel queues.

import threading
from collections import deque
from typing import Any, Dict, List

# Per-call queues of (segment_id, segment_dict) tuples. Bounded by the
# bridge worker drain cadence (~250ms), so unbounded growth is a bug
# elsewhere; we still keep a soft cap to defend against producer leaks.
_STT_SEGMENT_QUEUE: Dict[str, deque] = {}
_STT_SEGMENT_NEXT_ID: Dict[str, int] = {}
_STT_SEGMENT_LOCK = threading.Lock()
_STT_SEGMENT_CAP_PER_CALL = 1024  # segments; older are evicted with WARN

def enqueue_stt_segment(call_id: str, segment: Dict[str, Any]) -> int:
    """Append a final STT segment for ``call_id``.

    Producer-side: any audio adapter that has decoded a final transcript
    chunk calls this. ``segment`` SHOULD include:
      - ``text``      : transcript text
      - ``lang``      : detected language (BCP-47-ish)
      - ``t0``, ``t1``: float seconds (segment span on the call timeline)
      - ``speaker``   : optional speaker id / name (None ⇒ unknown)
      - ``author_id`` : caller-supplied participant identifier — used by
        the consumer to skip self-authored segments
      - ``is_final``  : optional, defaults True on enqueue (interim
        segments don't belong here)

    Returns the assigned segment_id (monotonic int per call) so the
    producer can correlate downstream events.

    Best-effort: never raises. The caller's ``call_id`` is required.
    """
    if not call_id:
        return -1
    seg = dict(segment or {})
    seg.setdefault('is_final', True)
    with _STT_SEGMENT_LOCK:
        next_id = _STT_SEGMENT_NEXT_ID.get(call_id, 0) + 1
        _STT_SEGMENT_NEXT_ID[call_id] = next_id
        seg['segment_id'] = next_id
        q = _STT_SEGMENT_QUEUE.setdefault(call_id, deque())
        q.append((next_id, seg))
        # Evict oldest if the soft cap is exceeded — defends against a
        # leaked producer that never has its consumer attach. Real flows
        # drain at 250ms cadence so this should never fire.
        while len(q) > _STT_SEGMENT_CAP_PER_CALL:
            _evicted = q.popleft()
            logger.warning(
                "whisper_tool.enqueue_stt_segment: call=%s queue cap "
                "%d exceeded; evicted seg_id=%s",
                call_id, _STT_SEGMENT_CAP_PER_CALL, _evicted[0])
    return next_id

def dequeue_segments(
    call_id: str,
    since: Optional[int] = None,
) -> List[Dict[str, Any]]:
    """Drain final STT segments for ``call_id`` newer than ``since``.

    Consumer-side: the agent_voice_bridge worker calls this on each
    tick. Returns segments in arrival order (FIFO). Each returned
    dict carries the ``segment_id`` the producer was given so the
    caller can update its ``since`` watermark for the next tick.

    ``since=None`` returns all queued segments and resets the queue
    for that call. ``since=N`` returns only segments with id > N
    AND prunes those segments from the queue.

    Best-effort: missing call_id → ``[]``.
    """
    if not call_id:
        return []
    with _STT_SEGMENT_LOCK:
        q = _STT_SEGMENT_QUEUE.get(call_id)
        if not q:
            return []
        if since is None:
            drained = [seg for (_sid, seg) in q]
            q.clear()
            return drained
        # Prune everything ≤ since, return everything > since.
        drained: List[Dict[str, Any]] = []
        # Iterate from the left; any item with id ≤ since is consumed but
        # not returned (already acked). Items > since are returned
        # AND removed (so the next dequeue with the same since is a
        # no-op, but normally callers update their watermark).
        keep: deque = deque()
        for sid, seg in q:
            if sid > since:
                drained.append(seg)
            # else: already acked; drop
        # Replace the queue contents with whatever's still > since
        # but un-drained (none currently — we drained ALL items > since).
        # Future-proofing for partial drains: keep is empty here, but
        # the structure makes the intent explicit.
        q.clear()
        q.extend(keep)
        return drained
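
# Producer/consumer roundtrip sketch (hypothetical call id, illustration only):
#
#     sid = enqueue_stt_segment('call-1', {
#         'text': 'hi', 'lang': 'en', 'author_id': 'u1',
#     })
#     dequeue_segments('call-1', since=None)  # → [{'text': 'hi', ..., 'segment_id': sid}]
#     dequeue_segments('call-1', since=sid)   # → [] — queue drained, watermark current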

def reset_stt_segment_queue(call_id: str) -> None:
    """Drop all queued segments for a call (called on detach / hangup).

    Best-effort: a missing call_id is a no-op.
    """
    if not call_id:
        return
    with _STT_SEGMENT_LOCK:
        _STT_SEGMENT_QUEUE.pop(call_id, None)
        _STT_SEGMENT_NEXT_ID.pop(call_id, None)

# ═══════════════════════════════════════════════════════════════
# Service tool registration
# ═══════════════════════════════════════════════════════════════

class WhisperTool:
    """Register STT as an in-process service tool.

    Unlike other tools, STT runs in-process (no sidecar server).
    The tool functions are registered directly as callables.
    """

    @classmethod
    def register_functions(cls):
        """Register STT functions directly with service_tool_registry."""
        whisper_transcribe.__name__ = "whisper_transcribe"
        whisper_transcribe.__doc__ = (
            "Transcribe audio file to text using STT. "
            "Input: audio_path (string path to WAV/MP3/WebM file), "
            "language (optional language code like 'en'). "
            "Returns JSON with 'text' and 'language'."
        )

        whisper_detect_language.__name__ = "whisper_detect_language"
        whisper_detect_language.__doc__ = (
            "Detect the language spoken in an audio file. "
            "Input: audio_path (string path to audio file). "
            "Returns JSON with 'language' code and 'probability'."
        )

        tool_info = ServiceToolInfo(
            name="whisper",
            description=(
                "Speech-to-text transcription. Converts audio files to text "
                "using faster-whisper, sherpa-onnx (Moonshine/Whisper ONNX), "
                "or OpenAI Whisper. "
                "Supports 100+ languages with automatic language detection."
            ),
            base_url="inprocess://whisper",
            endpoints={
                "transcribe": {
                    "path": "/transcribe",
                    "method": "POST",
                    "description": whisper_transcribe.__doc__,
                    "params_schema": {
                        "audio_path": {"type": "string", "description": "Path to audio file"},
                        "language": {"type": "string", "description": "Language code (optional)"},
                    },
                },
                "detect_language": {
                    "path": "/detect_language",
                    "method": "POST",
                    "description": whisper_detect_language.__doc__,
                    "params_schema": {
                        "audio_path": {"type": "string", "description": "Path to audio file"},
                    },
                },
            },
            health_endpoint="/health",
            tags=["stt", "speech", "transcription", "audio", "whisper", "sherpa-onnx"],
            timeout=60,
        )
        tool_info.is_healthy = True
        service_tool_registry._tools["whisper"] = tool_info
        return True

# ═══════════════════════════════════════════════════════════════
# Worker callbacks (picked up by the centralized gpu_worker dispatcher)
# ═══════════════════════════════════════════════════════════════
#
# The STT worker has no upfront load — each transcribe call lazy-
# initializes the appropriate engine (faster-whisper / sherpa / legacy)
# inside the subprocess. `_load` is a no-op; `_synthesize` dispatches
# on the request op so one worker handles transcribe + detect_language.

def _load():
    """No upfront load — engines lazy-initialize per request."""
    return None


def _synthesize(_model, req: dict) -> dict:
    """Dispatch STT requests inside the worker subprocess."""
    op = req.get('op', 'transcribe')
    if op == 'transcribe':
        raw = _transcribe_impl(req.get('audio_path'), req.get('language'))
    elif op == 'detect_language':
        raw = _detect_language_impl(req.get('audio_path'))
    else:
        return {'error': f'Unknown op: {op}'}
    # Return both the raw JSON (for pass-through) and parsed fields
    try:
        return {'raw_json': raw, **json.loads(raw)}
    except json.JSONDecodeError:
        return {'error': f'Invalid engine response: {raw[:200]}'}
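
# Contract sketch (illustration of the dispatcher handshake, not the real
# gpu_worker code): the worker calls _load once on spawn, then _synthesize
# per request; 'raw_json' is the pass-through payload callers unwrap.
#
#     model = _load()                     # → None (engines lazy-initialize)
#     resp = _synthesize(model, {
#         'op': 'transcribe',
#         'audio_path': '/tmp/clip.wav',  # hypothetical path
#         'language': None,
#     })
#     resp.get('raw_json')                # '{"text": ..., "language": ...}'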

# NOTE: no `if __name__ == '__main__':` block — the centralized
# dispatcher at integrations.service_tools.gpu_worker imports this
# module and calls _load / _synthesize on spawn.