Coverage for integrations / audio / diarization_service.py: 30.1%

113 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

"""
DiarizationService - manages the speaker diarization sidecar subprocess.

Mirrors VisionService's MiniCPM management pattern:
    DiarizationService.start()
      +-> diarization_server subprocess (WebSocket, port from env)
      +-> Readiness detection via stdout signal
      +-> atexit cleanup to prevent orphan processes
"""

10import atexit 

11import logging 

12import os 

13import subprocess 

14import sys 

15import threading 

16import time 

17from typing import Dict, Optional 

18 

logger = logging.getLogger('hevolve_diarization')


class DiarizationService:
    """Manages the speaker diarization WebSocket sidecar.

    Starts ``integrations.audio.diarization_server`` as a subprocess, watches
    its stdout for the ``DIARIZATION_READY`` signal, and provides lifecycle
    management (start/stop/health/status).

    State flags:
        _running: a start was requested and the sidecar has not been stopped
            or detected as crashed.
        _ready: the readiness signal has been seen on the sidecar's stdout.
    """

    def __init__(self, port: Optional[int] = None):
        """Resolve the port the sidecar will be asked to listen on.

        Precedence: the ``HEVOLVE_DIARIZATION_PORT`` environment variable,
        then the ``port`` argument, then ``get_port('diarization')``.

        Args:
            port: Preferred port; ignored when the environment variable is set.

        Raises:
            ValueError: If the resolved value cannot be converted to ``int``.
        """
        # Imported lazily, exactly as before, so importing this module does
        # not require core.port_registry.
        from core.port_registry import get_port
        self._port = int(os.environ.get(
            'HEVOLVE_DIARIZATION_PORT', port or get_port('diarization')))
        self._process: Optional[subprocess.Popen] = None
        self._running = False
        self._ready = False

    # ─── Public API ───

    def start(self):
        """Start the diarization sidecar (non-blocking).

        Does nothing (beyond logging) when the service is already running,
        when ``whisperx`` is not importable, or when no HuggingFace token can
        be found. Otherwise launches the subprocess and watches for readiness
        on a daemon thread.

        Raises:
            Exception: Whatever ``subprocess.Popen`` raises if the launch
                fails; the service is left in the stopped state.
        """
        if self._running:
            logger.warning("DiarizationService already running")
            return

        # Check if whisperx is available before starting subprocess
        if not self._is_whisperx_available():
            logger.info(
                "whisperx not installed - diarization sidecar disabled")
            return

        if not self._resolve_hf_token():
            logger.warning(
                "No HuggingFace token - diarization sidecar disabled. "
                "Set HEVOLVE_HF_TOKEN env var.")
            return

        self._running = True
        try:
            self._start_subprocess()
        except Exception:
            # A failed launch must not leave the service stuck in the
            # "already running" state with no process behind it.
            self._running = False
            raise

        # unregister() first so stop()/start() cycles never stack up
        # duplicate handlers (bound methods of one instance compare equal).
        atexit.unregister(self._cleanup_subprocess)
        atexit.register(self._cleanup_subprocess)

        # Wait for readiness in background
        threading.Thread(
            target=self._wait_for_ready,
            daemon=True, name='diarization-wait',
        ).start()

        logger.info("DiarizationService starting...")

    def stop(self):
        """Stop the diarization sidecar.

        Terminates the subprocess, escalating to ``kill()`` if it has not
        exited within 5 seconds. Safe to call when nothing is running.
        """
        self._running = False
        self._ready = False
        # Detach the handle first: background threads compare against
        # self._process to notice that they have become stale.
        proc, self._process = self._process, None
        if not proc:
            return

        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            try:
                # Reap the killed child so it does not linger as a zombie.
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                logger.warning(
                    "Diarization sidecar did not exit after kill()")
        logger.info("Diarization sidecar stopped")

    def is_ready(self) -> bool:
        """Check if the sidecar is ready to accept connections."""
        return self._ready and self._running

    @property
    def port(self) -> int:
        """Actual bound port (may differ from requested if dynamic)."""
        return self._port

    @property
    def ws_url(self) -> str:
        """WebSocket URL for connecting to the sidecar."""
        return f'ws://localhost:{self._port}'

    def get_status(self) -> Dict:
        """Return service status for health dashboards.

        Returns:
            Dict with ``running``, ``ready``, ``alive`` (subprocess exists and
            has not exited) and ``port``.
        """
        # Snapshot the handle: stop() may clear self._process between the
        # None check and the poll() call.
        proc = self._process
        alive = proc is not None and proc.poll() is None
        return {
            'running': self._running,
            'ready': self._ready,
            'alive': alive,
            'port': self._port,
        }

    # ─── Internal ───

    def _is_whisperx_available(self) -> bool:
        """Check if whisperx is importable."""
        try:
            import whisperx  # noqa: F401
            return True
        except ImportError:
            return False

    def _resolve_hf_token(self):
        """Look up the HuggingFace token.

        Checks the ``HEVOLVE_HF_TOKEN`` environment variable first, then the
        ``huggingface`` key of ``config.json`` in the working directory and of
        ``~/.hevolve/config.json``.

        Returns:
            The first truthy token value found, or ``''`` when none is
            configured. Unreadable or malformed config files are skipped.
        """
        token = os.environ.get('HEVOLVE_HF_TOKEN', '')
        if token:
            return token

        import json
        candidates = (
            'config.json',
            os.path.join(os.path.expanduser('~'), '.hevolve', 'config.json'),
        )
        for cfg_path in candidates:
            if not os.path.isfile(cfg_path):
                continue
            try:
                # utf-8-sig also accepts files saved with a BOM.
                with open(cfg_path, encoding='utf-8-sig') as f:
                    cfg = json.load(f)
            except (OSError, ValueError) as e:
                # Best-effort lookup: an unreadable/invalid file only means
                # "no token here", but leave a trace for debugging.
                logger.debug(f"Could not read {cfg_path}: {e}")
                continue
            token = cfg.get('huggingface', '') if isinstance(cfg, dict) else ''
            if token:
                return token
        return ''

    def _start_subprocess(self):
        """Launch the diarization server as a subprocess.

        stdout is piped (it carries the readiness signal); stderr is
        discarded.
        """
        # In frozen builds (cx_Freeze), sys.executable is Nunba.exe — using it
        # with -m would launch a full GUI instance instead of the module.
        # Use the bundled python interpreter from python-embed/ instead.
        python_exe = sys.executable
        if getattr(sys, 'frozen', False):
            app_dir = os.path.dirname(sys.executable)
            embed_python = os.path.join(app_dir, 'python-embed', 'python.exe')
            if os.path.isfile(embed_python):
                python_exe = embed_python
            else:
                logger.warning(
                    "python-embed/python.exe not found — "
                    "diarization sidecar may not start correctly")

        cmd = [
            python_exe, '-m',
            'integrations.audio.diarization_server',
            '--port', str(self._port),
        ]
        logger.info(f"Starting diarization sidecar: {' '.join(cmd)}")

        _popen_kw = dict(stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
        if sys.platform == 'win32':
            # Avoid flashing a console window for the child process.
            _popen_kw['creationflags'] = subprocess.CREATE_NO_WINDOW
        self._process = subprocess.Popen(cmd, **_popen_kw)

    def _wait_for_ready(self, timeout: float = 180):
        """Read stdout for DIARIZATION_READY signal.

        The server prints 'DIARIZATION_READY:<port>' after model load.
        Timeout is generous (180s) because model download can be slow.

        On readiness, records the reported port, sets ``_ready`` and exports
        ``HEVOLVE_DIARIZATION_URL``. If the sidecar exits before signalling,
        logs the exit code and clears ``_running``.

        NOTE: the timeout is only evaluated when a line arrives, so a sidecar
        that prints nothing keeps this (daemon) thread blocked until it
        prints or exits.

        Args:
            timeout: Seconds to wait for the readiness signal.
        """
        # Work on a snapshot: stop() sets self._process to None from another
        # thread, which previously crashed this watcher with AttributeError.
        proc = self._process
        if not proc or not proc.stdout:
            return

        started = time.monotonic()  # immune to wall-clock adjustments
        reached_eof = False
        try:
            for line in proc.stdout:
                if self._process is not proc:
                    # Service was stopped or restarted; this watcher is stale.
                    break

                if time.monotonic() - started > timeout:
                    logger.error(
                        f"Diarization sidecar not ready after {timeout}s")
                    break

                decoded = line.decode('utf-8', errors='replace').strip()
                if decoded.startswith('DIARIZATION_READY'):
                    # Parse actual port (for dynamic allocation)
                    parts = decoded.split(':')
                    if len(parts) >= 2:
                        try:
                            self._port = int(parts[1])
                        except ValueError:
                            # Malformed port: keep the requested one.
                            pass
                    self._ready = True
                    # Set env var so AudioProcessor finds it
                    os.environ['HEVOLVE_DIARIZATION_URL'] = self.ws_url
                    logger.info(
                        f"Diarization sidecar ready on port {self._port}")
                    break

                if not self._running:
                    break
            else:
                # Loop ran to exhaustion: the sidecar closed its stdout.
                reached_eof = True
        except Exception as e:
            logger.debug(f"Stdout read error: {e}")

        if self._process is not proc:
            # Never touch the state of a service that has since been
            # stopped or restarted with a different subprocess.
            return

        if not self._ready:
            if reached_eof:
                # EOF can be observed slightly before the exit status is
                # available; give the child a moment so poll() sees it.
                try:
                    proc.wait(timeout=1)
                except subprocess.TimeoutExpired:
                    pass
            # Check if process crashed
            if proc.poll() is not None:
                logger.error(
                    f"Diarization sidecar crashed "
                    f"(exit code {proc.returncode})")
                self._running = False
                return

        if self._running and not reached_eof and proc.poll() is None:
            # Keep consuming stdout; otherwise the sidecar blocks as soon as
            # its output fills the OS pipe buffer.
            threading.Thread(
                target=self._drain_stdout, args=(proc,),
                daemon=True, name='diarization-drain',
            ).start()

    @staticmethod
    def _drain_stdout(proc):
        """Discard the sidecar's remaining stdout until EOF (best-effort)."""
        try:
            for _ in proc.stdout:
                pass
        except Exception as e:
            # The pipe may be closed underneath us during shutdown.
            logger.debug(f"Stdout drain stopped: {e}")

    def _cleanup_subprocess(self):
        """atexit handler - kill orphan subprocess."""
        proc = self._process
        if not proc or proc.poll() is not None:
            return
        # Interpreter is shutting down: stay best-effort and never raise.
        try:
            proc.terminate()
            proc.wait(timeout=3)
        except Exception:
            try:
                proc.kill()
            except Exception:
                pass

220 except Exception: 

221 pass