Coverage for integrations/channels/media/audio.py: 34.1% (185 statements)
coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Audio Processor for audio transcription.
4Supports multiple providers: openai, deepgram, whisper-local
5"""
7import asyncio
8from dataclasses import dataclass, field
9from enum import Enum
10from typing import Optional, List, Dict, Any, Union
11from pathlib import Path
12import logging
14logger = logging.getLogger(__name__)
17class AudioProvider(Enum):
18 """Supported audio transcription providers."""
19 OPENAI = "openai"
20 DEEPGRAM = "deepgram"
21 WHISPER_LOCAL = "whisper-local"
24@dataclass
25class TranscriptionWord:
26 """Individual word with timing information."""
27 word: str
28 start: float
29 end: float
30 confidence: float = 1.0
32 def to_dict(self) -> Dict[str, Any]:
33 return {
34 "word": self.word,
35 "start": self.start,
36 "end": self.end,
37 "confidence": self.confidence
38 }
41@dataclass
42class TranscriptionSegment:
43 """Segment of transcription with timing."""
44 text: str
45 start: float
46 end: float
47 speaker: Optional[str] = None
48 confidence: float = 1.0
49 words: List[TranscriptionWord] = field(default_factory=list)
51 def to_dict(self) -> Dict[str, Any]:
52 return {
53 "text": self.text,
54 "start": self.start,
55 "end": self.end,
56 "speaker": self.speaker,
57 "confidence": self.confidence,
58 "words": [w.to_dict() for w in self.words]
59 }
62@dataclass
63class TranscriptionResult:
64 """Complete transcription result."""
65 text: str
66 language: Optional[str] = None
67 confidence: float = 1.0
68 duration: float = 0.0
69 segments: List[TranscriptionSegment] = field(default_factory=list)
70 speakers: List[str] = field(default_factory=list)
71 metadata: Dict[str, Any] = field(default_factory=dict)
73 def to_dict(self) -> Dict[str, Any]:
74 return {
75 "text": self.text,
76 "language": self.language,
77 "confidence": self.confidence,
78 "duration": self.duration,
79 "segments": [s.to_dict() for s in self.segments],
80 "speakers": self.speakers,
81 "metadata": self.metadata
82 }
84 def to_srt(self) -> str:
85 """Export transcription as SRT subtitle format."""
86 lines = []
87 for i, segment in enumerate(self.segments, 1):
88 start_time = self._format_srt_time(segment.start)
89 end_time = self._format_srt_time(segment.end)
90 lines.append(str(i))
91 lines.append(f"{start_time} --> {end_time}")
92 lines.append(segment.text)
93 lines.append("")
94 return "\n".join(lines)
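
    # Example (illustrative): one segment spanning 0.0-2.5s renders as
    #   1
    #   00:00:00,000 --> 00:00:02,500
    #   <segment text>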

    def to_vtt(self) -> str:
        """Export transcription as WebVTT subtitle format."""
        lines = ["WEBVTT", ""]
        for segment in self.segments:
            start_time = self._format_vtt_time(segment.start)
            end_time = self._format_vtt_time(segment.end)
            lines.append(f"{start_time} --> {end_time}")
            lines.append(segment.text)
            lines.append("")
        return "\n".join(lines)

    @staticmethod
    def _format_srt_time(seconds: float) -> str:
        """Format time for SRT format (HH:MM:SS,mmm)."""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        millis = int((seconds % 1) * 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"

    @staticmethod
    def _format_vtt_time(seconds: float) -> str:
        """Format time for VTT format (HH:MM:SS.mmm)."""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        millis = int((seconds % 1) * 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"


@dataclass
class LanguageDetection:
    """Language detection result."""
    language: str
    confidence: float
    alternatives: List[Dict[str, Any]] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        return {
            "language": self.language,
            "confidence": self.confidence,
            "alternatives": self.alternatives
        }


class AudioProcessor:
    """
    Audio processor for transcription and analysis.

    Supports multiple providers for speech-to-text.
    """

    def __init__(
        self,
        provider: Union[AudioProvider, str] = AudioProvider.OPENAI,
        api_key: Optional[str] = None,
        model: Optional[str] = None,
        config: Optional[Dict[str, Any]] = None
    ):
        """
        Initialize audio processor.

        Args:
            provider: Audio provider to use
            api_key: API key for the provider
            model: Specific model to use
            config: Additional configuration options
        """
        if isinstance(provider, str):
            provider = AudioProvider(provider.lower())

        self.provider = provider
        self.api_key = api_key
        self.config = config or {}

        # Set default models per provider
        self.model = model or self._get_default_model()

        # Initialize provider-specific client
        self._client = None
        self._initialized = False

    def _get_default_model(self) -> str:
        """Get default model for provider."""
        defaults = {
            AudioProvider.OPENAI: "whisper-1",
            AudioProvider.DEEPGRAM: "nova-2",
            AudioProvider.WHISPER_LOCAL: "base"
        }
        return defaults.get(self.provider, "default")

    async def _ensure_initialized(self):
        """Ensure provider client is initialized."""
        if self._initialized:
            return

        if self.provider == AudioProvider.OPENAI:
            # Would initialize OpenAI client
            pass
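            # Hedged sketch of what that could look like (assumes the
            # openai>=1.0 SDK; illustrative only):
            #   from openai import AsyncOpenAI
            #   self._client = AsyncOpenAI(api_key=self.api_key)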
        elif self.provider == AudioProvider.DEEPGRAM:
            # Would initialize Deepgram client
            pass
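            # Hedged sketch (assumes the deepgram-sdk v3 package;
            # illustrative only):
            #   from deepgram import DeepgramClient
            #   self._client = DeepgramClient(self.api_key)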
        elif self.provider == AudioProvider.WHISPER_LOCAL:
            # Would load local whisper model
            pass
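            # Hedged sketch (assumes openai-whisper is installed;
            # illustrative only):
            #   import whisper
            #   self._client = whisper.load_model(self.model)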

        self._initialized = True

    async def transcribe(
        self,
        audio: Union[str, bytes, Path],
        language: Optional[str] = None,
        prompt: Optional[str] = None,
        word_timestamps: bool = False,
        speaker_diarization: bool = False
    ) -> TranscriptionResult:
        """
        Transcribe audio to text using Whisper.

        Args:
            audio: Audio file path, URL, or bytes
            language: Expected language (ISO code) or None for auto-detect
            prompt: Optional prompt to guide transcription
            word_timestamps: Whether to include word-level timestamps
            speaker_diarization: Whether to query the speaker_diarization service

        Returns:
            TranscriptionResult with transcribed text and metadata
        """
        await self._ensure_initialized()

        import json as _json
        import os
        import tempfile

        # Resolve audio to a file path for whisper_transcribe
        audio_path = None
        cleanup_path = False
        if isinstance(audio, (str, Path)):
            path = Path(audio)
            if path.exists():
                audio_path = str(path)
        elif isinstance(audio, bytes):
            # Write bytes to a temp file
            tmp = tempfile.NamedTemporaryFile(
                suffix='.wav', delete=False)
            tmp.write(audio)
            tmp.close()
            audio_path = tmp.name
            cleanup_path = True

        text = ""
        detected_language = language or "en"
        confidence = 0.0
        speakers = []

        try:
            # Transcribe with Whisper
            if audio_path:
                try:
                    from integrations.service_tools.whisper_tool import (
                        whisper_transcribe,
                    )
                    result_json = whisper_transcribe(audio_path, language)
                    parsed = _json.loads(result_json)
                    if 'error' not in parsed:
                        text = parsed.get('text', '')
                        detected_language = parsed.get(
                            'language', detected_language)
                        confidence = 0.9
                except ImportError:
                    logger.warning(
                        "whisper not available — transcription disabled")
                except Exception as e:
                    logger.warning(f"Whisper transcription failed: {e}")

            # Speaker diarization via WebSocket bridge
            if speaker_diarization and audio_path:
                diarization_url = os.environ.get('HEVOLVE_DIARIZATION_URL')
                if diarization_url:
                    diarization_result = await self._run_diarization(
                        audio_path, diarization_url)
                    if diarization_result:
                        n_speakers = diarization_result.get(
                            'no_of_speaker', 0)
                        speakers = [
                            f"Speaker_{i}" for i in range(n_speakers)]
        finally:
            if cleanup_path and audio_path:
                try:
                    os.unlink(audio_path)
                except OSError:
                    pass

        return TranscriptionResult(
            text=text,
            language=detected_language,
            confidence=confidence,
            duration=await self.get_duration(audio),
            speakers=speakers,
            metadata={
                "provider": self.provider.value,
                "model": self.model,
                "word_timestamps": word_timestamps,
                "speaker_diarization": speaker_diarization,
            },
        )

    async def _run_diarization(
        self, audio_path: str, diarization_url: str
    ) -> Optional[Dict[str, Any]]:
        """Send audio to speaker_diarization service via WebSocket.

        The service expects PCM 16-bit mono 16kHz audio as hex-encoded
        chunks and returns ``{"no_of_speaker": N, "stop_mic": bool}``.

        Sends JSON format (compatible with both the new sidecar and the
        old server). Parses the response as JSON first, falling back to
        ast.literal_eval for old servers that send a Python repr.
        """
        import ast
        import json as _json

        try:
            import websockets
        except ImportError:
            logger.warning("websockets not installed — diarization disabled")
            return None

        try:
            with open(audio_path, 'rb') as f:
                audio_bytes = f.read()

            async with websockets.connect(
                diarization_url, close_timeout=5
            ) as ws:
                msg = _json.dumps(
                    {'user_id': 0, 'chunk': audio_bytes.hex()})
                await ws.send(msg)
                response = await asyncio.wait_for(ws.recv(), timeout=10)
                # New sidecar sends JSON, old server sends Python repr
                try:
                    return _json.loads(response)
                except (_json.JSONDecodeError, ValueError):
                    return ast.literal_eval(response)
        except Exception as e:
            logger.debug(f"Diarization failed: {e}")
            return None

    async def detect_language(
        self,
        audio: Union[str, bytes, Path],
        max_alternatives: int = 3
    ) -> LanguageDetection:
        """
        Detect the language spoken in audio.

        Args:
            audio: Audio file path, URL, or bytes
            max_alternatives: Maximum number of alternative languages

        Returns:
            LanguageDetection with detected language and confidence
        """
        await self._ensure_initialized()

        # Simulated language detection
        return LanguageDetection(
            language="en",
            confidence=0.95,
            alternatives=[
                {"language": "es", "confidence": 0.03},
                {"language": "fr", "confidence": 0.02}
            ][:max_alternatives - 1]
        )

    async def get_duration(
        self,
        audio: Union[str, bytes, Path]
    ) -> float:
        """
        Get audio duration in seconds.

        Args:
            audio: Audio file path, URL, or bytes

        Returns:
            Duration in seconds
        """
        # Would use an actual audio analysis library (pydub, librosa, etc.)
        # For now, return a simulated duration
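        # A hedged sketch of a real implementation, assuming pydub and an
        # on-disk file path (illustrative, not the module's implementation):
        #   from pydub import AudioSegment
        #   return AudioSegment.from_file(str(audio)).duration_seconds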
        return 0.0

    async def transcribe_streaming(
        self,
        audio_stream,
        language: Optional[str] = None,
        on_partial: Optional[callable] = None,
        on_final: Optional[callable] = None
    ):
        """
        Transcribe audio stream in real-time.

        Args:
            audio_stream: Async iterator of audio chunks
            language: Expected language
            on_partial: Callback for partial results
            on_final: Callback for final results

        Yields:
            TranscriptionSegment as they become available
        """
        await self._ensure_initialized()

        # Would implement streaming transcription
        # This is provider-specific (Deepgram excels at streaming)
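        # A hedged sketch of the intended shape; `_client.feed` below is a
        # hypothetical method, not a real provider API:
        #   async for chunk in audio_stream:
        #       segment = await self._client.feed(chunk)
        #       if segment:
        #           if on_partial:
        #               on_partial(segment)
        #           yield segment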
        return

    async def translate(
        self,
        audio: Union[str, bytes, Path],
        target_language: str = "en"
    ) -> TranscriptionResult:
        """
        Transcribe and translate audio to target language.

        Args:
            audio: Audio file path, URL, or bytes
            target_language: Target language for translation

        Returns:
            TranscriptionResult with translated text
        """
        await self._ensure_initialized()

        # OpenAI Whisper supports direct translation to English;
        # other providers may need a separate translation step
        transcription = await self.transcribe(audio)

        if self.provider == AudioProvider.OPENAI and target_language == "en":
            # OpenAI can translate directly
            pass

        transcription.metadata["translated"] = True
        transcription.metadata["target_language"] = target_language

        return transcription

    def get_supported_formats(self) -> List[str]:
        """Get list of supported audio formats."""
        formats = {
            AudioProvider.OPENAI: [
                "mp3", "mp4", "mpeg", "mpga", "m4a", "wav", "webm"],
            AudioProvider.DEEPGRAM: [
                "mp3", "mp4", "wav", "flac", "ogg", "webm", "m4a"],
            AudioProvider.WHISPER_LOCAL: [
                "mp3", "wav", "flac", "ogg", "m4a"]
        }
        return formats.get(self.provider, ["mp3", "wav"])

    def get_max_audio_duration(self) -> int:
        """Get maximum supported audio duration in seconds."""
        limits = {
            AudioProvider.OPENAI: 3600,          # 1 hour (25MB file limit)
            AudioProvider.DEEPGRAM: 7200,        # 2 hours
            AudioProvider.WHISPER_LOCAL: 14400   # 4 hours (depends on hardware)
        }
        return limits.get(self.provider, 3600)

    def get_max_file_size(self) -> int:
        """Get maximum supported file size in bytes."""
        limits = {
            AudioProvider.OPENAI: 25 * 1024 * 1024,          # 25MB
            AudioProvider.DEEPGRAM: 2 * 1024 * 1024 * 1024,  # 2GB
            AudioProvider.WHISPER_LOCAL: 500 * 1024 * 1024   # 500MB
        }
        return limits.get(self.provider, 25 * 1024 * 1024)

    def get_supported_languages(self) -> List[str]:
        """Get list of supported languages."""
        # Common languages supported by most providers
        return [
            "en", "es", "fr", "de", "it", "pt", "nl", "pl", "ru",
            "zh", "ja", "ko", "ar", "hi", "tr", "vi", "th", "id"
        ]
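

# Illustrative usage sketch (file names and provider choice are
# placeholders; a minimal example, not part of the module under test):
#
#   async def _demo():
#       processor = AudioProcessor(provider="whisper-local", model="base")
#       result = await processor.transcribe(
#           "meeting.wav", word_timestamps=True)
#       print(result.text)
#       Path("meeting.srt").write_text(result.to_srt())
#
#   asyncio.run(_demo())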