# integrations/channels/media/audio.py

"""
Audio processor for audio transcription.

Supports multiple providers: openai, deepgram, whisper-local.
"""

import asyncio
import logging
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union

logger = logging.getLogger(__name__)


class AudioProvider(Enum):
    """Supported audio transcription providers."""
    OPENAI = "openai"
    DEEPGRAM = "deepgram"
    WHISPER_LOCAL = "whisper-local"


@dataclass
class TranscriptionWord:
    """Individual word with timing information."""
    word: str
    start: float
    end: float
    confidence: float = 1.0

    def to_dict(self) -> Dict[str, Any]:
        return {
            "word": self.word,
            "start": self.start,
            "end": self.end,
            "confidence": self.confidence
        }


@dataclass
class TranscriptionSegment:
    """Segment of transcription with timing."""
    text: str
    start: float
    end: float
    speaker: Optional[str] = None
    confidence: float = 1.0
    words: List[TranscriptionWord] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        return {
            "text": self.text,
            "start": self.start,
            "end": self.end,
            "speaker": self.speaker,
            "confidence": self.confidence,
            "words": [w.to_dict() for w in self.words]
        }
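
# A minimal sketch of how the dataclasses nest; the values are
# illustrative, not taken from a real transcription:
#
#     word = TranscriptionWord(word="hello", start=0.0, end=0.4,
#                              confidence=0.98)
#     seg = TranscriptionSegment(text="hello world", start=0.0, end=1.1,
#                                speaker="Speaker_0", words=[word])
#     seg.to_dict()["words"][0]["word"]  # -> "hello"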


@dataclass
class TranscriptionResult:
    """Complete transcription result."""
    text: str
    language: Optional[str] = None
    confidence: float = 1.0
    duration: float = 0.0
    segments: List[TranscriptionSegment] = field(default_factory=list)
    speakers: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        return {
            "text": self.text,
            "language": self.language,
            "confidence": self.confidence,
            "duration": self.duration,
            "segments": [s.to_dict() for s in self.segments],
            "speakers": self.speakers,
            "metadata": self.metadata
        }

    def to_srt(self) -> str:
        """Export the transcription in SRT subtitle format."""
        lines = []
        for i, segment in enumerate(self.segments, 1):
            start_time = self._format_srt_time(segment.start)
            end_time = self._format_srt_time(segment.end)
            lines.append(str(i))
            lines.append(f"{start_time} --> {end_time}")
            lines.append(segment.text)
            lines.append("")
        return "\n".join(lines)

    def to_vtt(self) -> str:
        """Export the transcription in WebVTT subtitle format."""
        lines = ["WEBVTT", ""]
        for segment in self.segments:
            start_time = self._format_vtt_time(segment.start)
            end_time = self._format_vtt_time(segment.end)
            lines.append(f"{start_time} --> {end_time}")
            lines.append(segment.text)
            lines.append("")
        return "\n".join(lines)

    @staticmethod
    def _format_srt_time(seconds: float) -> str:
        """Format a time value for SRT (HH:MM:SS,mmm)."""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        millis = int((seconds % 1) * 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"

    @staticmethod
    def _format_vtt_time(seconds: float) -> str:
        """Format a time value for WebVTT (HH:MM:SS.mmm)."""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        millis = int((seconds % 1) * 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"
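
    # Worked example for the formatters above (values are illustrative):
    #
    #     _format_srt_time(3725.5)  # -> "01:02:05,500"
    #     _format_vtt_time(3725.5)  # -> "01:02:05.500"
    #
    # to_srt() then emits numbered cues such as:
    #
    #     1
    #     00:00:00,000 --> 00:00:02,500
    #     hello world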


@dataclass
class LanguageDetection:
    """Language detection result."""
    language: str
    confidence: float
    alternatives: List[Dict[str, Any]] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        return {
            "language": self.language,
            "confidence": self.confidence,
            "alternatives": self.alternatives
        }


class AudioProcessor:
    """
    Audio processor for transcription and analysis.

    Supports multiple providers for speech-to-text.
    """

    def __init__(
        self,
        provider: Union[AudioProvider, str] = AudioProvider.OPENAI,
        api_key: Optional[str] = None,
        model: Optional[str] = None,
        config: Optional[Dict[str, Any]] = None
    ):
        """
        Initialize the audio processor.

        Args:
            provider: Audio provider to use
            api_key: API key for the provider
            model: Specific model to use
            config: Additional configuration options
        """
        if isinstance(provider, str):
            provider = AudioProvider(provider.lower())

        self.provider = provider
        self.api_key = api_key
        self.config = config or {}

        # Set the default model per provider
        self.model = model or self._get_default_model()

        # Provider-specific client is created lazily on first use
        self._client = None
        self._initialized = False

    def _get_default_model(self) -> str:
        """Get the default model for the configured provider."""
        defaults = {
            AudioProvider.OPENAI: "whisper-1",
            AudioProvider.DEEPGRAM: "nova-2",
            AudioProvider.WHISPER_LOCAL: "base"
        }
        return defaults.get(self.provider, "default")
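
    # Construction sketch (the API key value is a placeholder):
    #
    #     processor = AudioProcessor(provider="deepgram", api_key="...")
    #     processor.model  # -> "nova-2" (per-provider default)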

    async def _ensure_initialized(self):
        """Ensure the provider client is initialized."""
        if self._initialized:
            return

        if self.provider == AudioProvider.OPENAI:
            # Would initialize OpenAI client
            pass
        elif self.provider == AudioProvider.DEEPGRAM:
            # Would initialize Deepgram client
            pass
        elif self.provider == AudioProvider.WHISPER_LOCAL:
            # Would load local whisper model
            pass

        self._initialized = True

    async def transcribe(
        self,
        audio: Union[str, bytes, Path],
        language: Optional[str] = None,
        prompt: Optional[str] = None,
        word_timestamps: bool = False,
        speaker_diarization: bool = False
    ) -> TranscriptionResult:
        """
        Transcribe audio to text.

        Args:
            audio: Audio file path, URL, or bytes
            language: Expected language (ISO code) or None for auto-detect
            prompt: Optional prompt to guide transcription
            word_timestamps: Whether to include word-level timestamps
            speaker_diarization: Whether to run speaker diarization via
                the external speaker_diarization service

        Returns:
            TranscriptionResult with the transcribed text and metadata
        """
        await self._ensure_initialized()

        import json as _json
        import os
        import tempfile

        # Resolve audio to a local file path for whisper_transcribe;
        # URLs and nonexistent paths leave audio_path as None.
        audio_path = None
        cleanup_path = False
        if isinstance(audio, (str, Path)):
            path = Path(audio)
            if path.exists():
                audio_path = str(path)
        elif isinstance(audio, bytes):
            # Write bytes to a temp file
            tmp = tempfile.NamedTemporaryFile(
                suffix='.wav', delete=False)
            tmp.write(audio)
            tmp.close()
            audio_path = tmp.name
            cleanup_path = True

        text = ""
        detected_language = language or "en"
        confidence = 0.0
        speakers = []

        try:
            # Transcribe with Whisper
            if audio_path:
                try:
                    from integrations.service_tools.whisper_tool import (
                        whisper_transcribe,
                    )
                    result_json = whisper_transcribe(audio_path, language)
                    parsed = _json.loads(result_json)
                    if 'error' not in parsed:
                        text = parsed.get('text', '')
                        detected_language = parsed.get(
                            'language', detected_language)
                        confidence = 0.9
                except ImportError:
                    logger.warning(
                        "whisper not available; transcription disabled")
                except Exception as e:
                    logger.warning(f"Whisper transcription failed: {e}")

            # Speaker diarization via WebSocket bridge
            if speaker_diarization and audio_path:
                diarization_url = os.environ.get('HEVOLVE_DIARIZATION_URL')
                if diarization_url:
                    diarization_result = await self._run_diarization(
                        audio_path, diarization_url)
                    if diarization_result:
                        n_speakers = diarization_result.get(
                            'no_of_speaker', 0)
                        speakers = [
                            f"Speaker_{i}" for i in range(n_speakers)]
        finally:
            if cleanup_path and audio_path:
                try:
                    os.unlink(audio_path)
                except OSError:
                    pass

        return TranscriptionResult(
            text=text,
            language=detected_language,
            confidence=confidence,
            duration=await self.get_duration(audio),
            speakers=speakers,
            metadata={
                "provider": self.provider.value,
                "model": self.model,
                "word_timestamps": word_timestamps,
                "speaker_diarization": speaker_diarization,
            },
        )
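
    # Usage sketch (hypothetical file name; needs asyncio.run or a running
    # event loop, and HEVOLVE_DIARIZATION_URL set for speaker labels):
    #
    #     processor = AudioProcessor(provider="whisper-local")
    #     result = asyncio.run(processor.transcribe(
    #         "meeting.wav", speaker_diarization=True))
    #     print(result.text, result.speakers)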

    async def _run_diarization(
        self, audio_path: str, diarization_url: str
    ) -> Optional[Dict[str, Any]]:
        """Send audio to the speaker_diarization service via WebSocket.

        The service expects PCM 16-bit mono 16kHz audio as hex-encoded
        chunks and returns ``{"no_of_speaker": N, "stop_mic": bool}``.

        Sends JSON (compatible with both the new sidecar and the old
        server). Parses the response as JSON first, falling back to
        ast.literal_eval for old servers that send a Python repr.
        """
        import ast
        import json as _json

        try:
            import websockets
        except ImportError:
            logger.warning("websockets not installed; diarization disabled")
            return None

        try:
            with open(audio_path, 'rb') as f:
                audio_bytes = f.read()

            async with websockets.connect(
                diarization_url, close_timeout=5
            ) as ws:
                msg = _json.dumps(
                    {'user_id': 0, 'chunk': audio_bytes.hex()})
                await ws.send(msg)
                response = await asyncio.wait_for(ws.recv(), timeout=10)
                # New sidecar sends JSON, old server sends Python repr
                try:
                    return _json.loads(response)
                except (_json.JSONDecodeError, ValueError):
                    return ast.literal_eval(response)
        except Exception as e:
            logger.debug(f"Diarization failed: {e}")
            return None
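
    # Wire-format sketch, matching the docstring above (hex payload
    # truncated for illustration):
    #
    #     sent:     {"user_id": 0, "chunk": "52494646..."}
    #     received: {"no_of_speaker": 2, "stop_mic": false}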

    async def detect_language(
        self,
        audio: Union[str, bytes, Path],
        max_alternatives: int = 3
    ) -> LanguageDetection:
        """
        Detect the language spoken in audio.

        Args:
            audio: Audio file path, URL, or bytes
            max_alternatives: Maximum number of languages to consider,
                including the primary detection

        Returns:
            LanguageDetection with detected language and confidence
        """
        await self._ensure_initialized()

        # Simulated language detection
        return LanguageDetection(
            language="en",
            confidence=0.95,
            alternatives=[
                {"language": "es", "confidence": 0.03},
                {"language": "fr", "confidence": 0.02}
            ][:max_alternatives - 1]
        )

    async def get_duration(
        self,
        audio: Union[str, bytes, Path]
    ) -> float:
        """
        Get audio duration in seconds.

        Args:
            audio: Audio file path, URL, or bytes

        Returns:
            Duration in seconds
        """
        # Would use an actual audio analysis library (pydub, librosa, etc.)
        # For now, return a simulated duration
        return 0.0

    async def transcribe_streaming(
        self,
        audio_stream,
        language: Optional[str] = None,
        on_partial: Optional[Callable] = None,
        on_final: Optional[Callable] = None
    ):
        """
        Transcribe an audio stream in real time.

        Args:
            audio_stream: Async iterator of audio chunks
            language: Expected language
            on_partial: Callback for partial results
            on_final: Callback for final results

        Yields:
            TranscriptionSegment objects as they become available
        """
        await self._ensure_initialized()

        # Streaming transcription is provider-specific (Deepgram excels
        # at streaming); no provider implementation yet.
        return
        yield  # unreachable, but makes this an async generator as documented
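
    # Intended consumption pattern once a provider implements streaming
    # (chunk_source() is a hypothetical async iterator of audio chunks):
    #
    #     async for segment in processor.transcribe_streaming(chunk_source()):
    #         print(segment.text)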

    async def translate(
        self,
        audio: Union[str, bytes, Path],
        target_language: str = "en"
    ) -> TranscriptionResult:
        """
        Transcribe and translate audio to the target language.

        Args:
            audio: Audio file path, URL, or bytes
            target_language: Target language for translation

        Returns:
            TranscriptionResult with translated text
        """
        await self._ensure_initialized()

        # OpenAI Whisper supports direct translation to English;
        # other providers would need a separate translation step.
        transcription = await self.transcribe(audio)

        if self.provider == AudioProvider.OPENAI and target_language == "en":
            # OpenAI can translate directly (not yet wired up)
            pass

        # Record the translation request in the result metadata
        transcription.metadata["translated"] = True
        transcription.metadata["target_language"] = target_language

        return transcription

    def get_supported_formats(self) -> List[str]:
        """Get the list of supported audio formats."""
        formats = {
            AudioProvider.OPENAI: [
                "mp3", "mp4", "mpeg", "mpga", "m4a", "wav", "webm"],
            AudioProvider.DEEPGRAM: [
                "mp3", "mp4", "wav", "flac", "ogg", "webm", "m4a"],
            AudioProvider.WHISPER_LOCAL: [
                "mp3", "wav", "flac", "ogg", "m4a"]
        }
        return formats.get(self.provider, ["mp3", "wav"])

    def get_max_audio_duration(self) -> int:
        """Get the maximum supported audio duration in seconds."""
        limits = {
            AudioProvider.OPENAI: 3600,         # 1 hour (25MB file limit)
            AudioProvider.DEEPGRAM: 7200,       # 2 hours
            AudioProvider.WHISPER_LOCAL: 14400  # 4 hours (hardware-dependent)
        }
        return limits.get(self.provider, 3600)

    def get_max_file_size(self) -> int:
        """Get the maximum supported file size in bytes."""
        limits = {
            AudioProvider.OPENAI: 25 * 1024 * 1024,          # 25MB
            AudioProvider.DEEPGRAM: 2 * 1024 * 1024 * 1024,  # 2GB
            AudioProvider.WHISPER_LOCAL: 500 * 1024 * 1024   # 500MB
        }
        return limits.get(self.provider, 25 * 1024 * 1024)
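
    # Pre-flight validation sketch (hypothetical path; checks a file
    # against the provider's format and size limits):
    #
    #     path = Path("podcast.mp3")
    #     ok = (path.suffix.lstrip(".") in processor.get_supported_formats()
    #           and path.stat().st_size <= processor.get_max_file_size())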

    def get_supported_languages(self) -> List[str]:
        """Get the list of supported languages."""
        # Common languages supported by most providers
        return [
            "en", "es", "fr", "de", "it", "pt", "nl", "pl", "ru",
            "zh", "ja", "ko", "ar", "hi", "tr", "vi", "th", "id"
        ]