Coverage for integrations / audio / diarization_service.py: 30.1%
113 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2DiarizationService - manages the speaker diarization sidecar subprocess.
4Mirrors VisionService's MiniCPM management pattern:
5 DiarizationService.start()
6 +-> diarization_server subprocess (WebSocket, port from env)
7 +-> Readiness detection via stdout signal
8 +-> atexit cleanup to prevent orphan processes
9"""
10import atexit
11import logging
12import os
13import subprocess
14import sys
15import threading
16import time
17from typing import Dict, Optional
19logger = logging.getLogger('hevolve_diarization')
class DiarizationService:
    """Manages the speaker diarization WebSocket sidecar.

    Starts diarization_server.py as a subprocess, waits for readiness,
    and provides lifecycle management (start/stop/health/status).

    State flags:
        _running -- start() was requested and stop()/crash has not happened.
        _ready   -- the sidecar printed its DIARIZATION_READY signal.
    """

    def __init__(self, port: Optional[int] = None):
        """Resolve the sidecar port and initialise lifecycle state.

        Port precedence: the HEVOLVE_DIARIZATION_PORT environment variable,
        then the *port* argument, then the registry's 'diarization' port.
        """
        from core.port_registry import get_port
        self._port = int(os.environ.get(
            'HEVOLVE_DIARIZATION_PORT', port or get_port('diarization')))
        self._process: Optional[subprocess.Popen] = None
        self._running = False
        self._ready = False
        # Ensures the atexit hook is installed once, not once per restart.
        self._atexit_registered = False

    # ─── Public API ───

    def start(self):
        """Start the diarization sidecar (non-blocking).

        Does nothing (beyond logging) when the service is already running,
        when whisperx is not importable, or when no HuggingFace token can be
        found. Exceptions raised while launching the subprocess propagate to
        the caller, but the service is left in a restartable state.
        """
        if self._running:
            logger.warning("DiarizationService already running")
            return

        # Check if whisperx is available before starting subprocess
        if not self._is_whisperx_available():
            logger.info(
                "whisperx not installed - diarization sidecar disabled")
            return

        # Check HF token
        if not self._find_hf_token():
            logger.warning(
                "No HuggingFace token - diarization sidecar disabled. "
                "Set HEVOLVE_HF_TOKEN env var.")
            return

        self._running = True
        try:
            self._start_subprocess()
        except Exception:
            # Without this reset a failed launch would make every later
            # start() bail out with "already running".
            self._running = False
            raise

        # getattr: instances created without __init__ lack the flag.
        if not getattr(self, '_atexit_registered', False):
            atexit.register(self._cleanup_subprocess)
            self._atexit_registered = True

        # Wait for readiness in background
        threading.Thread(
            target=self._wait_for_ready,
            daemon=True, name='diarization-wait',
        ).start()

        logger.info("DiarizationService starting...")

    def stop(self):
        """Stop the diarization sidecar.

        Sends terminate, waits up to 5 seconds, then escalates to kill and
        reaps the child so it does not linger as a zombie.
        """
        self._running = False
        self._ready = False
        proc = self._process
        if not proc:
            return
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            try:
                # Reap the killed child; bounded so stop() cannot hang.
                proc.wait(timeout=2)
            except subprocess.TimeoutExpired:
                logger.warning(
                    "Diarization sidecar did not exit after kill")
        self._process = None
        logger.info("Diarization sidecar stopped")

    def is_ready(self) -> bool:
        """Check if the sidecar is ready to accept connections."""
        return self._ready and self._running

    @property
    def port(self) -> int:
        """Actual bound port (may differ from requested if dynamic)."""
        return self._port

    @property
    def ws_url(self) -> str:
        """WebSocket URL for connecting to the sidecar."""
        return f'ws://localhost:{self._port}'

    def get_status(self) -> Dict:
        """Return service status for health dashboards.

        Keys: 'running', 'ready', 'alive' (subprocess exists and has not
        exited) and 'port'.
        """
        # Snapshot: stop() may clear self._process between the two checks.
        proc = self._process
        alive = proc is not None and proc.poll() is None
        return {
            'running': self._running,
            'ready': self._ready,
            'alive': alive,
            'port': self._port,
        }

    # ─── Internal ───

    def _is_whisperx_available(self) -> bool:
        """Check if whisperx is importable."""
        try:
            import whisperx  # noqa: F401
            return True
        except ImportError:
            return False

    def _find_hf_token(self):
        """Return the HuggingFace token, or '' when none is configured.

        Looks at the HEVOLVE_HF_TOKEN environment variable first, then at
        the 'huggingface' key of ./config.json and ~/.hevolve/config.json.
        Unreadable or malformed config files are skipped (best effort).
        """
        token = os.environ.get('HEVOLVE_HF_TOKEN', '')
        if token:
            return token

        import json
        candidates = [
            'config.json',
            os.path.join(
                os.path.expanduser('~'), '.hevolve', 'config.json'),
        ]
        for cfg_path in candidates:
            if not os.path.isfile(cfg_path):
                continue
            try:
                with open(cfg_path, encoding='utf-8') as f:
                    cfg = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers JSONDecodeError and UnicodeDecodeError.
                logger.debug(f"Could not read {cfg_path}: {e}")
                continue
            if isinstance(cfg, dict):
                token = cfg.get('huggingface', '')
                if token:
                    return token
        return ''

    def _start_subprocess(self):
        """Launch the diarization server as a subprocess.

        Stores the Popen handle in self._process. stdout is piped so the
        readiness signal can be read; stderr is discarded.
        """
        # In frozen builds (cx_Freeze), sys.executable is Nunba.exe — using it
        # with -m would launch a full GUI instance instead of the module.
        # Use the bundled python interpreter from python-embed/ instead.
        python_exe = sys.executable
        if getattr(sys, 'frozen', False):
            app_dir = os.path.dirname(sys.executable)
            embed_python = os.path.join(app_dir, 'python-embed', 'python.exe')
            if os.path.isfile(embed_python):
                python_exe = embed_python
            else:
                logger.warning(
                    "python-embed/python.exe not found — "
                    "diarization sidecar may not start correctly")

        cmd = [
            python_exe, '-m',
            'integrations.audio.diarization_server',
            '--port', str(self._port),
        ]
        logger.info(f"Starting diarization sidecar: {' '.join(cmd)}")

        _popen_kw = dict(stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
        if sys.platform == 'win32':
            # Avoid flashing a console window for the child on Windows.
            _popen_kw['creationflags'] = subprocess.CREATE_NO_WINDOW
        self._process = subprocess.Popen(cmd, **_popen_kw)

    def _wait_for_ready(self, timeout: float = 180):
        """Read stdout for DIARIZATION_READY signal.

        The server prints 'DIARIZATION_READY:<port>' after model load.
        Timeout is generous (180s) because model download can be slow.
        The timeout is only evaluated when a line arrives, because reading
        the pipe blocks.

        After the wait ends the pipe keeps being drained for as long as the
        service is running, so the child can never block on a full stdout
        pipe. The method therefore returns only when the stream closes or
        the service is stopped; it is meant to run on a background thread.
        """
        # Work on a snapshot: stop() may set self._process to None at any
        # time, and a restart may replace it with a different process.
        proc = self._process
        if not proc or not proc.stdout:
            return

        started = time.monotonic()
        try:
            for line in proc.stdout:
                if time.monotonic() - started > timeout:
                    logger.error(
                        f"Diarization sidecar not ready after {timeout}s")
                    break

                decoded = line.decode('utf-8', errors='replace').strip()
                if decoded.startswith('DIARIZATION_READY'):
                    # Parse actual port (for dynamic allocation)
                    parts = decoded.split(':')
                    if len(parts) >= 2:
                        try:
                            self._port = int(parts[1])
                        except ValueError:
                            pass  # keep the configured port
                    self._ready = True
                    # Set env var so AudioProcessor finds it
                    os.environ['HEVOLVE_DIARIZATION_URL'] = self.ws_url
                    logger.info(
                        f"Diarization sidecar ready on port {self._port}")
                    break

                if not self._running:
                    break

            # Keep consuming (and discarding) output while the service runs.
            if self._running:
                for _ in proc.stdout:
                    if not self._running:
                        break
        except Exception as e:
            logger.debug(f"Stdout read error: {e}")

        # Report a crash only for the process this thread was started for,
        # and only when nobody asked for a stop in the meantime.
        if (not self._ready and self._running
                and self._process is proc
                and proc.poll() is not None):
            logger.error(
                f"Diarization sidecar crashed "
                f"(exit code {proc.returncode})")
            self._running = False

    def _cleanup_subprocess(self):
        """atexit handler - kill orphan subprocess."""
        proc = self._process
        if not proc or proc.poll() is not None:
            return
        try:
            proc.terminate()
            proc.wait(timeout=3)
        except Exception:
            # Best effort at interpreter exit: escalate, never raise.
            try:
                proc.kill()
            except Exception:
                pass