Coverage for integrations / service_tools / gpu_worker.py: 77.2%

600 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2gpu_worker.py — Persistent GPU subprocess worker for crash isolation. 

3 

4Problem: PyTorch / CUDA OOM and DLL segfaults are C-level aborts that 

5Python's try/except CANNOT catch. When an in-process model crashes, 

6it takes the entire parent process with it. This is especially bad 

7for long-running services (Nunba, HARTOS backend) where one TTS 

8crash kills chat, STT, agents, and everything else. 

9 

10Solution: run GPU models in a dedicated subprocess. When it crashes, 

11only the subprocess dies. The parent catches the exit code and falls 

12back to a CPU engine (Piper) or retries on the next request. 

13 

14Architecture: 

15 Parent (Nunba/HARTOS) 

16 

17 │ JSON request (stdin) 

18 

19 Worker subprocess (python-embed or sys.executable) 

20 │ ├── load model once on startup 

21 │ ├── serve forever via stdin/stdout JSON lines 

22 │ └── exit non-zero on fatal error 

23 

24 │ JSON response (stdout) OR non-zero exit code 

25 

26 Parent catches failure → fallback 

27 

28Usage: 

29 

30 # Parent side 

31 worker = GPUWorker( 

32 name='f5_tts', 

33 module='integrations.service_tools.f5_tts_worker', 

34 startup_timeout=60, # F5 takes ~40s to load 

35 request_timeout=120, # longest inference we tolerate 

36 ) 

37 worker.start() # spawns subprocess, waits for READY handshake 

38 try: 

39 result = worker.call({ 

40 'text': 'hello', 

41 'language': 'en', 

42 'voice': None, 

43 'output_path': '/tmp/out.wav', 

44 }) 

45 except WorkerCrash: 

46 # Subprocess died — fall back to Piper 

47 pass 

48 

49 # Worker side (in f5_tts_worker.py) 

50 from integrations.service_tools.gpu_worker import run_worker 

51 

52 def load_model(): 

53 from f5_tts.api import F5TTS 

54 return F5TTS() 

55 

56 def synthesize(model, req): 

57 wav, sr, _ = model.infer( 

58 ref_file=req.get('voice') or '', 

59 ref_text='', 

60 gen_text=req['text'], 

61 ) 

62 import soundfile as sf 

63 sf.write(req['output_path'], wav, sr) 

64 return { 

65 'path': req['output_path'], 

66 'duration': len(wav) / sr, 

67 'sample_rate': sr, 

68 } 

69 

70 if __name__ == '__main__': 

71 run_worker(name='f5_tts', load=load_model, handle=synthesize) 

72""" 

73 

74import json 

75import logging 

76import os 

77import queue 

78import subprocess 

79import sys 

80import threading 

81import time 

82import weakref 

83from pathlib import Path 

84from typing import Any, Callable, Dict, List, Optional 

85 

86logger = logging.getLogger(__name__) 

87 

88 

89# ═══════════════════════════════════════════════════════════════════ 

90# Cross-worker VRAM eviction registry 

91# ═══════════════════════════════════════════════════════════════════ 

92# 

93# Every ToolWorker instance registers itself here so that when a new 

94# worker needs GPU memory and the VRAM manager reports insufficient 

95# headroom, we can evict the least-recently-used OTHER worker(s) to 

96# make room. Without this, spawning F5 on top of a loaded Chatterbox 

97# ML would OOM the subprocess — correct crash isolation, but wasted 

98# time because nobody told Chatterbox to step aside. 

99 

_REGISTRY_LOCK = threading.Lock()
_ALL_WORKERS: "List[weakref.ref['ToolWorker']]" = []


def _register_tool_worker(tw: "ToolWorker") -> None:
    """Add a ToolWorker to the global eviction registry.

    Stored as a weakref: a ToolWorker that goes out of scope is not
    kept alive by the registry, and its stale entry is pruned on the
    next registration.
    """
    with _REGISTRY_LOCK:
        # Rebuild the list without dead references, then append the
        # newcomer — same net effect as prune-in-place + append.
        survivors = [ref for ref in _ALL_WORKERS if ref() is not None]
        survivors.append(weakref.ref(tw))
        _ALL_WORKERS[:] = survivors

113 

114 

def _live_tool_workers() -> "List[ToolWorker]":
    """Snapshot of registered ToolWorkers whose subprocess is currently up.

    Dead weakrefs and workers whose subprocess has exited are skipped;
    the registry itself is not mutated here.
    """
    with _REGISTRY_LOCK:
        return [
            worker
            for worker in (ref() for ref in _ALL_WORKERS)
            if worker is not None and worker.is_alive()
        ]

124 

125 

def try_free_vram(needed_gb: float, exclude_tool: Optional[str] = None) -> bool:
    """Evict least-recently-used workers until `needed_gb` GB of VRAM is free.

    Intended to run BEFORE spawning a heavy worker when the VRAM manager
    reports insufficient headroom. Walks the live workers from longest-idle
    to most-recent (skipping `exclude_tool`), stopping each in turn and
    re-checking free VRAM after every stop.

    Returns:
        True once free VRAM reaches the threshold (including the case where
        it was already sufficient), False if eviction could not free enough
        or the VRAM manager is unavailable.
    """
    try:
        from integrations.service_tools.vram_manager import vram_manager
    except ImportError:
        # No VRAM manager in this deployment — nothing we can do.
        return False

    if vram_manager.get_free_vram() >= needed_gb:
        return True

    # Longest-idle first: smallest _last_used timestamp means oldest use.
    idle_order = sorted(
        (w for w in _live_tool_workers() if w.tool_name != exclude_tool),
        key=lambda w: w._last_used or 0.0,
    )

    for victim in idle_order:
        logger.info(
            f"VRAM eviction: stopping {victim.tool_name} to free memory for "
            f"{exclude_tool or 'new worker'} "
            f"(need {needed_gb:.1f}GB, have {vram_manager.get_free_vram():.1f}GB)"
        )
        try:
            victim.stop()
        except Exception as e:
            logger.warning(f"eviction: failed to stop {victim.tool_name}: {e}")
            continue

        free_now = vram_manager.get_free_vram()
        if free_now >= needed_gb:
            logger.info(
                f"VRAM eviction: freed enough ({free_now:.1f}GB) for "
                f"{exclude_tool or 'new worker'}"
            )
            return True

    logger.warning(
        f"VRAM eviction: couldn't free enough ({vram_manager.get_free_vram():.1f}GB "
        f"free, need {needed_gb:.1f}GB)"
    )
    return False

177 

178 

179# ═══════════════════════════════════════════════════════════════════ 

180# Exceptions 

181# ═══════════════════════════════════════════════════════════════════ 

182 

class WorkerError(Exception):
    """Base for worker errors.

    Catch this to handle any worker failure uniformly; subclasses
    distinguish crash, timeout, and not-ready conditions.
    """

185 

186 

class WorkerCrash(WorkerError):
    """Subprocess died (exit code != 0 or stdout closed).

    Raised by the parent-side GPUWorker when the child process has
    exited or its pipes have failed; callers typically fall back to a
    CPU engine or respawn.
    """

189 

190 

class WorkerTimeout(WorkerError):
    """Worker did not respond within the timeout.

    Covers both startup (no READY marker within startup_timeout) and
    per-request (no response within request_timeout) deadlines.
    """

193 

194 

class WorkerNotReady(WorkerError):
    """Worker has not completed startup handshake.

    Raised by call() when the subprocess is not running or never
    reached the READY state.
    """

197 

198 

199# ═══════════════════════════════════════════════════════════════════ 

200# Parent-side: spawns and talks to the worker 

201# ═══════════════════════════════════════════════════════════════════ 

202 

class GPUWorker:
    """Persistent GPU subprocess worker.

    Thread-safe: one lock serializes requests (GPU inference is not
    parallelizable on a single device anyway).
    """

    # Signal line the worker prints to stdout when ready to serve.
    READY_MARKER = '__WORKER_READY__'
    # Emitted immediately after READY — carries the post-load GPU VRAM
    # measurement in GB as a float. Parent captures and forwards to
    # vram_manager.record_actual_usage. Workers that can't measure emit
    # 0.0 and the parent ignores the reading.
    VRAM_MARKER_PREFIX = '__WORKER_VRAM_GB__'

    def __init__(
        self,
        name: str,
        module: str,
        startup_timeout: float = 60.0,
        request_timeout: float = 120.0,
        python_exe: Optional[str] = None,
        env: Optional[Dict[str, str]] = None,
        args: Optional[list] = None,
    ):
        """
        Args:
            name: Worker name (for logs).
            module: Dotted module path to run via -m (e.g. 'tts.f5_worker').
            startup_timeout: Seconds to wait for READY marker.
            request_timeout: Seconds to wait for each request response.
            python_exe: Python to use. Defaults to python-embed if present,
                else sys.executable.
            env: Extra env vars to pass to the subprocess.
            args: Extra CLI args appended after `-m module` (for variant
                dispatch in modules with multiple worker entry points).
        """
        self.name = name
        self.module = module
        self.startup_timeout = startup_timeout
        self.request_timeout = request_timeout
        self.python_exe = python_exe or _resolve_python_exe()
        self.env = env or {}
        self.args = list(args) if args else []

        # Subprocess handle; None whenever no worker is running.
        self._proc: Optional[subprocess.Popen] = None
        # Serializes start/stop/call — one request in flight at a time.
        self._lock = threading.Lock()
        self._ready = False
        self._stderr_thread: Optional[threading.Thread] = None
        self._stdout_thread: Optional[threading.Thread] = None
        # Lines pumped from the child's stdout by _drain_stdout; the
        # None sentinel marks EOF.
        self._stdout_queue: queue.Queue = queue.Queue()

    # ── Lifecycle ──────────────────────────────────────────────────

    def start(self) -> None:
        """Spawn the subprocess and wait for READY handshake."""
        with self._lock:
            if self._proc and self._proc.poll() is None and self._ready:
                return  # already running

            self._spawn()
            self._wait_ready()

    def is_alive(self) -> bool:
        """True if subprocess is running and READY."""
        return (
            self._proc is not None
            and self._proc.poll() is None
            and self._ready
        )

    def stop(self, timeout: float = 5.0) -> None:
        """Gracefully stop the worker. Falls back to kill after timeout."""
        with self._lock:
            if not self._proc:
                return
            if self._proc.poll() is None:
                try:
                    # Send shutdown request. Worker exits on its own.
                    self._write_line(json.dumps({'op': 'shutdown'}))
                except Exception:
                    pass
                try:
                    self._proc.wait(timeout=timeout)
                except subprocess.TimeoutExpired:
                    logger.warning(f"{self.name}: shutdown timeout, killing")
                    self._proc.kill()
                    try:
                        self._proc.wait(timeout=2)
                    except subprocess.TimeoutExpired:
                        pass
            self._proc = None
            self._ready = False

    # ── Request/response ───────────────────────────────────────────

    def call(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Send a request to the worker and return the response.

        Holds self._lock for the entire round-trip, so concurrent
        callers queue up behind one another.

        Raises:
            WorkerCrash: subprocess died or closed stdout.
            WorkerTimeout: no response within request_timeout.
            WorkerNotReady: worker not started or crashed during startup.
        """
        with self._lock:
            if not self.is_alive():
                raise WorkerNotReady(f"{self.name}: worker is not running")

            # Send request
            payload = json.dumps(request)
            try:
                self._write_line(payload)
            except (BrokenPipeError, OSError) as e:
                self._reap()
                raise WorkerCrash(f"{self.name}: write failed: {e}")

            # Poll loop: wake up every 250ms to check if process died.
            # This lets us distinguish "timeout" (worker stuck) from
            # "crash" (worker died) without waiting the full request_timeout
            # when the subprocess has already exited.
            deadline = time.monotonic() + self.request_timeout
            while True:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    logger.error(f"{self.name}: request timeout, killing stuck worker")
                    self._reap(force=True)
                    raise WorkerTimeout(
                        f"{self.name}: no response within {self.request_timeout}s"
                    )

                line = self._read_line_with_timeout(min(0.25, remaining))
                if line is not None:
                    break  # got a response

                # No line yet — is the process still alive?
                # NOTE(review): _read_line_with_timeout returns None for
                # both "no line yet" and EOF; an EOF with the process
                # still alive keeps polling until the deadline.
                if self._proc is None or self._proc.poll() is not None:
                    code = self._proc.returncode if self._proc else -1
                    self._reap()
                    raise WorkerCrash(
                        f"{self.name}: subprocess died (exit={code})"
                    )

            try:
                return json.loads(line)
            except json.JSONDecodeError as e:
                raise WorkerError(
                    f"{self.name}: invalid JSON response: {e}: {line[:200]}"
                )

    # ── Internals ──────────────────────────────────────────────────

    def _spawn(self) -> None:
        """Launch the subprocess.

        IMPORTANT: propagates the parent process's current sys.path to the
        child via PYTHONPATH. Before the subprocess-isolation refactor, TTS
        engines ran in-process and inherited any runtime sys.path mutations
        automatically (e.g. Nunba's app.py prepends ~/.nunba/site-packages/
        where CUDA torch, regex, transformers, parler_tts actually live).
        Spawned subprocesses boot fresh from the python binary's default
        sys.path and can't see those entries — every transformers-based
        worker then crashes on `import regex` / `import transformers`.
        Fixing at the spawn layer means every worker (TTS, STT, VLM, any
        future engine) benefits without each one re-implementing the same
        path plumbing.
        """
        env = os.environ.copy()
        env.update(self.env)
        # Unbuffered so stdout/stderr come through immediately
        env['PYTHONUNBUFFERED'] = '1'

        # Propagate parent sys.path via PYTHONPATH so the child inherits
        # runtime-added package dirs (e.g. ~/.nunba/site-packages for CUDA
        # torch + TTS deps). Include real dirs AND zip / egg archives —
        # cx_Freeze bundles application packages (HARTOS's ``integrations``
        # tree) inside ``library.zip``; excluding files broke worker
        # spawn under frozen Nunba.exe with ModuleNotFoundError on the
        # central dispatcher itself. Preserve an existing PYTHONPATH by
        # appending our paths to the front (caller-set overrides last).
        _extra_paths = [
            p for p in sys.path
            if p and (
                os.path.isdir(p)
                or (os.path.isfile(p) and p.lower().endswith(('.zip', '.egg')))
            )
        ]
        if _extra_paths:
            _existing = env.get('PYTHONPATH', '')
            _joined = os.pathsep.join(_extra_paths)
            if _existing:
                env['PYTHONPATH'] = _joined + os.pathsep + _existing
            else:
                env['PYTHONPATH'] = _joined

        # Windows: don't pop a console window
        creationflags = 0
        if sys.platform == 'win32':
            creationflags = 0x08000000  # CREATE_NO_WINDOW

        cmd = [self.python_exe, '-u', '-m', self.module, *self.args]
        logger.info(f"{self.name}: spawning {' '.join(cmd)}")

        self._proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=env,
            creationflags=creationflags,
            bufsize=1,  # line-buffered
            text=True,
            encoding='utf-8',
            errors='replace',
        )

        # Drain stderr in a thread so it doesn't fill the pipe buffer.
        # Also forward worker log lines to our logger.
        self._stderr_thread = threading.Thread(
            target=self._drain_stderr,
            daemon=True,
        )
        self._stderr_thread.start()

        # Single persistent stdout reader thread that pumps lines into
        # a queue. Only ONE thread ever reads from proc.stdout — this
        # is essential because creating a new reader per-poll leaves
        # blocked readline() threads that corrupt subsequent reads.
        self._stdout_queue = queue.Queue()
        self._stdout_thread = threading.Thread(
            target=self._drain_stdout,
            daemon=True,
        )
        self._stdout_thread.start()
        self._ready = False

    def _drain_stderr(self) -> None:
        """Background thread: read worker stderr, log it, and surface
        structured failure signals to the central error_advice system.

        Specifically: when a worker subprocess dies because of a missing
        Python package (``ModuleNotFoundError: No module named 'X'``),
        we forward the event to:

        1. ``core.error_advice.handle_exception(agent_remediation=True)``
           — creates a ``goal_type='self_heal'`` AgentGoal so the
           local coding agent can investigate WHY the package wasn't
           pre-installed (improves the freeze pip plan over time).
        2. Nunba's deterministic self-heal (``tts.package_installer
           ._self_heal_missing_transitives`` if importable) — pip-
           installs the missing package immediately, so the next
           worker spawn succeeds without waiting for the agent loop.

        Why here and not in the worker subprocess: the worker is a
        sandboxed child with no DB session, no GoalManager, no agent
        engine. The parent has all of those, plus the ability to
        kick off a deterministic pip install. Single chokepoint per
        worker instance, idempotent via ``_self_heal_seen_modules``.

        See ``core/error_advice.py`` for the full advice fan-out
        contract. See ``tts/package_installer.py`` for the
        deterministic-install sibling.
        """
        if not self._proc or not self._proc.stderr:
            return
        # Lazily created so a worker that never spawns pays nothing.
        if not hasattr(self, '_self_heal_seen_modules'):
            self._self_heal_seen_modules = set()
        try:
            for line in self._proc.stderr:
                line = line.rstrip()
                if line:
                    logger.info(f"[{self.name}] {line}")
                    self._maybe_self_heal_from_line(line)
        except Exception:
            pass  # pipe closed, process dead

    def _maybe_self_heal_from_line(self, line: str) -> None:
        """Pattern-match worker stderr for ``ModuleNotFoundError`` and
        kick off agentic + deterministic remediation. Idempotent per
        ``(worker, package)`` so a flood of identical traceback frames
        produces exactly one heal attempt."""
        import re
        match = re.search(
            r"ModuleNotFoundError: No module named ['\"]([\w.\-]+)['\"]",
            line,
        )
        if not match:
            return
        pkg = match.group(1)
        if pkg in self._self_heal_seen_modules:
            return
        self._self_heal_seen_modules.add(pkg)

        logger.warning(
            f"{self.name}: subprocess missing Python package '{pkg}' — "
            f"dispatching to error_advice + deterministic self-heal"
        )

        # 1) Agentic side: create a self_heal goal for the coding agent.
        #    Throttled per-fingerprint inside error_advice; safe to call
        #    on every detected event.
        try:
            from core.error_advice import handle_exception
            synthetic = ModuleNotFoundError(f"No module named '{pkg}'")
            synthetic.name = pkg  # type: ignore[attr-defined]
            handle_exception(
                synthetic,
                category='subprocess.tool_load',
                severity='high',
                agent_remediation=True,
                context={
                    'worker_name': self.name,
                    'worker_module': self.module,
                    'missing_package': pkg,
                    'remediation_hint': (
                        f"Add '{pkg}' to the freeze pip plan (Nunba "
                        f"scripts/setup_freeze_nunba.py _tts_deps or "
                        f"the appropriate _<X>_deps tuple) so it's "
                        f"bundled into python-embed/Lib/site-packages "
                        f"on the next build, AND add it to "
                        f"tts/package_installer.py legacy fallback "
                        f"plan for runtime self-heal coverage."
                    ),
                },
            )
        except Exception as e:
            logger.debug(f"{self.name}: error_advice dispatch skipped: {e}")

        # 2) Deterministic fast-path: pip-install the missing package
        #    directly so the next worker spawn picks it up without
        #    waiting for the agentic loop. Uses the same
        #    ``self.python_exe`` the worker subprocess used (so the
        #    package lands in the SAME interpreter's site-packages,
        #    e.g. python-embed/Lib/site-packages on a frozen install,
        #    or the dev .venv on a source run). Spawned in a daemon
        #    thread so we don't block the stderr drain loop.
        def _install_async():
            try:
                # `--target` to user-site keeps it consistent with
                # tts.package_installer's existing pattern: bundled
                # python-embed is read-only on Program Files installs,
                # so user-writable site-packages is required. We rely
                # on the user-site already being on sys.path (set by
                # platform_paths.ensure_user_site_on_path at boot).
                target = self._user_site_packages_dir()
                pip_args = [
                    self.python_exe, '-m', 'pip', 'install',
                    '--no-build-isolation', '--progress-bar', 'off',
                    '--disable-pip-version-check',
                ]
                if target:
                    pip_args.extend(['--target', target])
                pip_args.append(pkg)
                logger.info(
                    f"{self.name}: deterministic pip install: {pkg} → "
                    f"{target or '<default site>'}"
                )
                rc = subprocess.run(
                    pip_args, capture_output=True, text=True, timeout=180,
                ).returncode
                if rc == 0:
                    logger.info(
                        f"{self.name}: '{pkg}' installed; respawn worker to "
                        f"retry tool load."
                    )
                else:
                    logger.warning(
                        f"{self.name}: pip install '{pkg}' rc={rc} — "
                        f"agent self-heal goal will pick up from here."
                    )
            except Exception as e:
                logger.debug(
                    f"{self.name}: deterministic pip install for '{pkg}' "
                    f"skipped: {e}"
                )

        threading.Thread(
            target=_install_async, daemon=True,
            name=f"self-heal-{self.name}-{pkg}",
        ).start()

    def _user_site_packages_dir(self) -> Optional[str]:
        """Return the user-writable site-packages dir for runtime
        installs, or None if Nunba's platform_paths helper isn't
        importable (HARTOS-only deploys).
        """
        try:
            from tts.package_installer import get_user_site_packages  # type: ignore
            return get_user_site_packages()
        except Exception:
            return None

    def _drain_stdout(self) -> None:
        """Background thread: read worker stdout into the line queue.

        Single reader thread guarantees no interleaved reads. Sentinel
        value None marks EOF so the main thread knows to stop waiting.
        """
        if not self._proc or not self._proc.stdout:
            return
        try:
            for line in self._proc.stdout:
                self._stdout_queue.put(line.rstrip('\n'))
        except Exception:
            pass  # pipe closed
        finally:
            self._stdout_queue.put(None)  # EOF sentinel

    def _wait_ready(self) -> None:
        """Block until worker prints READY marker or exits."""
        deadline = time.monotonic() + self.startup_timeout
        while time.monotonic() < deadline:
            if self._proc is None or self._proc.poll() is not None:
                code = self._proc.returncode if self._proc else -1
                # NOTE(review): raises without _reap(); the dead _proc
                # stays assigned, but start() respawns anyway because
                # _ready is still False and poll() is non-None.
                raise WorkerCrash(f"{self.name}: died during startup (exit={code})")

            line = self._read_line_with_timeout(0.5)
            if line is None:
                continue
            stripped = line.strip()
            if stripped == self.READY_MARKER:
                self._ready = True
                logger.info(f"{self.name}: worker ready")
                return
            # VRAM-measurement marker — emitted by workers BEFORE READY.
            # Forward the measurement and keep looping for READY.
            if stripped.startswith(self.VRAM_MARKER_PREFIX):
                self._handle_vram_marker(stripped)
                continue
            # Ignore any other startup chatter
            logger.debug(f"[{self.name}] startup: {stripped}")

        # Timeout
        self._reap(force=True)
        raise WorkerTimeout(f"{self.name}: startup timeout ({self.startup_timeout}s)")

    def _handle_vram_marker(self, stripped_line: str) -> None:
        """Parse a '__WORKER_VRAM_GB__<n>' startup marker and forward the
        measurement to vram_manager.record_actual_usage.

        Non-fatal on parse error: older workers don't emit this marker;
        a failed parse just means we fall back to the declared
        VRAM_BUDGETS entry.
        """
        payload = stripped_line[len(self.VRAM_MARKER_PREFIX):].strip()
        try:
            gb = float(payload)
        except ValueError:
            logger.debug(f"{self.name}: unparseable VRAM marker '{payload}'")
            return
        try:
            from integrations.service_tools.vram_manager import vram_manager
            vram_manager.record_actual_usage(self.name, gb)
        except Exception as e:
            logger.debug(f"{self.name}: VRAM marker forward failed: {e}")

    def _write_line(self, line: str) -> None:
        """Write one newline-terminated line to the worker's stdin and
        flush immediately.

        Raises:
            WorkerCrash: if the subprocess or its stdin pipe is gone.
        """
        if not self._proc or not self._proc.stdin:
            raise WorkerCrash(f"{self.name}: stdin closed")
        self._proc.stdin.write(line + '\n')
        self._proc.stdin.flush()

    def _read_line_with_timeout(self, timeout: float) -> Optional[str]:
        """Read one line from stdout with a timeout.

        Reads from the stdout queue populated by `_drain_stdout`. The
        queue handles the timeout natively and there's only ever one
        reader thread touching proc.stdout, so no corruption.

        Returns:
            The line (without trailing newline), or None if timeout or EOF.
        """
        try:
            line = self._stdout_queue.get(timeout=timeout)
        except queue.Empty:
            return None
        # None is the EOF sentinel from _drain_stdout
        if line is None:
            return None
        return line

    def _reap(self, force: bool = False) -> None:
        """Clean up after a crashed/stuck worker.

        Killing/terminating closes the child's pipes, which lets the
        drain threads exit via EOF on their read loops.
        """
        if not self._proc:
            return
        if self._proc.poll() is None:
            if force:
                self._proc.kill()
            else:
                self._proc.terminate()
            try:
                self._proc.wait(timeout=2)
            except subprocess.TimeoutExpired:
                self._proc.kill()
        self._proc = None
        self._ready = False

698 

699 

700# ═══════════════════════════════════════════════════════════════════ 

701# Worker-side: helper to run a model in a loop 

702# ═══════════════════════════════════════════════════════════════════ 

703 

def run_worker(
    name: str,
    load: Callable[[], Any],
    handle: Callable[[Any, Dict[str, Any]], Dict[str, Any]],
) -> None:
    """Main loop for a worker subprocess.

    Args:
        name: Worker name (for logs written to stderr).
        load: Called once to load the model. Return anything — it's
            passed to `handle` as the first arg.
        handle: Called once per request. Receives (model, request_dict)
            and must return a JSON-serializable dict.

    Protocol:
        1. Worker loads the model.
        2. Worker writes READY marker to the protected protocol channel.
        3. For each stdin line (JSON request):
           - If op == 'shutdown', exit cleanly.
           - Otherwise call handle(model, request), write JSON response.
        4. Load failure → exit code 2.
        5. Recoverable handler error → error JSON on protocol channel, continue.
        6. stdin EOF → exit code 0.
        7. Unexpected fatal error → exit code 3.

    Stdout isolation:
        Before loading the model, we duplicate fd 1 to a private file
        handle (_protocol) and redirect fd 1 + sys.stdout to stderr.
        This means ANY library inside the worker (torch, F5, HF,
        transformers, CUDA init messages) that writes to stdout is
        safely rerouted to stderr, where the parent drains it into a
        log stream. The protocol channel stays 100% clean for JSON.
    """
    # ── Stdout isolation (C5 fix) ──────────────────────────────────
    # Save a private writer pointing at the ORIGINAL fd 1, then
    # redirect fd 1 and sys.stdout to stderr so no stray prints
    # corrupt the JSON protocol. Order matters: dup BEFORE dup2.
    _protocol_fd = os.dup(1)
    _protocol = os.fdopen(_protocol_fd, 'w', buffering=1, encoding='utf-8')
    try:
        os.dup2(2, 1)  # fd 1 -> fd 2 (stderr) — catches C-level writes too
    except OSError:
        pass  # unusual platforms / closed stderr
    sys.stdout = sys.stderr  # catches Python-level print()

    def _emit(obj: Any) -> None:
        """Write one protocol message (JSON or marker string)."""
        if isinstance(obj, str):
            _protocol.write(obj + '\n')
        else:
            _protocol.write(json.dumps(obj) + '\n')
        _protocol.flush()

    # Route all logging to stderr to keep the protocol channel clean.
    # Explicit handler setup (not basicConfig) because other modules
    # imported by the worker may have already called basicConfig, which
    # would make ours a silent no-op.
    _root_logger = logging.getLogger()
    for h in list(_root_logger.handlers):
        _root_logger.removeHandler(h)
    _h = logging.StreamHandler(sys.stderr)
    _h.setFormatter(logging.Formatter(f'[{name}] %(message)s'))
    _root_logger.addHandler(_h)
    _root_logger.setLevel(logging.INFO)
    worker_log = logging.getLogger(f'worker.{name}')

    # Self-identify: which Python is running this worker, and which
    # site-packages it can see. Required for #92 — venv-spawn pickup
    # confirmation. Pre-2026-05-07 trace evidence (older
    # tts_chatterbox_turbo.err showed C:\miniconda3\Lib\unittest\mock.py
    # in the stack) suggested the worker was sometimes loaded from the
    # WRONG Python interpreter (host miniconda) instead of the engine's
    # quarantine venv (~/Documents/Nunba/data/venvs/<engine>/).
    # Logging sys.executable + sys.prefix at startup makes this
    # observable: every TTS engine's worker stderr should now show
    # the venv path, NOT the main interpreter.
    try:
        worker_log.info(
            f'startup: python={sys.executable} prefix={sys.prefix} '
            f'site_packages_top3={sys.path[:3]!r}'
        )
    except Exception:
        # Never let a logging blunder block model load
        pass

    # ── Phase 1: load model ────────────────────────────────────────
    try:
        worker_log.info('loading model...')
        t0 = time.time()
        model = load()
        worker_log.info(f'loaded in {time.time() - t0:.1f}s')
    except Exception as e:
        worker_log.exception(f'load failed: {e}')
        sys.exit(2)

    # ── Phase 2a: self-report post-load VRAM ───────────────────────
    # Emitted BEFORE READY so the parent's _wait_ready loop sees it
    # in-order on the stdout queue. (Placing it after READY races
    # against the next call(): if 'ready' arrives first and the
    # marker follows late, the marker ends up being consumed as a
    # response payload → invalid-JSON error.)
    #
    # Best-effort: workers that can't measure (CPU-only, Metal,
    # torch missing) emit 0.0 and the parent ignores the reading.
    # Lets us stub conservative budgets (e.g. new OmniVoice at
    # 3.0 GB) and auto-correct after the first real load.
    try:
        import torch as _torch_vram_probe
        if _torch_vram_probe.cuda.is_available():
            _measured = _torch_vram_probe.cuda.memory_allocated(0) / (1024 ** 3)
        else:
            _measured = 0.0
    except Exception:
        _measured = 0.0
    _emit(f'{GPUWorker.VRAM_MARKER_PREFIX}{_measured:.3f}')

    # ── Phase 2b: announce ready ───────────────────────────────────
    _emit(GPUWorker.READY_MARKER)

    # ── Phase 3: serve requests ────────────────────────────────────
    try:
        for line in sys.stdin:
            line = line.strip()
            if not line:
                continue

            try:
                req = json.loads(line)
            except json.JSONDecodeError as e:
                _emit({'error': f'invalid JSON: {e}'})
                continue

            if req.get('op') == 'shutdown':
                worker_log.info('shutdown requested')
                sys.exit(0)

            try:
                t0 = time.time()
                result = handle(model, req)
                if isinstance(result, dict):
                    # Handler may set latency itself; only fill if absent.
                    result.setdefault('latency_ms', round((time.time() - t0) * 1000, 1))
                else:
                    # Non-dict results are wrapped so the parent always
                    # receives a JSON object.
                    result = {'result': result}
            except Exception as e:
                worker_log.exception('handler failed')
                result = {'error': f'{type(e).__name__}: {e}'}

            # Serialize defensively — a non-JSON-serializable field in the
            # result would otherwise raise inside this try and take down the
            # worker. Catch that and return a structured error.
            try:
                _emit(result)
            except (TypeError, ValueError) as e:
                _emit({'error': f'response serialization failed: {e}'})

    except KeyboardInterrupt:
        pass
    except Exception as e:
        worker_log.exception(f'fatal: {e}')
        sys.exit(3)

    sys.exit(0)

866 

867 

868# ═══════════════════════════════════════════════════════════════════ 

869# Python resolver: prefer python-embed in frozen builds 

870# ═══════════════════════════════════════════════════════════════════ 

871 

def _resolve_python_exe() -> str:
    """Pick the Python interpreter used to spawn worker subprocesses.

    Resolution order:
      1. $HARTOS_WORKER_PYTHON env var (explicit override)
      2. python-embed next to the frozen exe (Nunba bundled build)
      3. sys.executable (dev mode)
    """
    env_override = os.environ.get('HARTOS_WORKER_PYTHON')
    if env_override and os.path.isfile(env_override):
        return env_override

    # Dev / unfrozen: the running interpreter is the right one.
    if not getattr(sys, 'frozen', False):
        return sys.executable

    # Frozen build: look for a python-embed folder beside the exe.
    exe_dir = os.path.dirname(os.path.abspath(sys.executable))
    embedded = os.path.join(exe_dir, 'python-embed', 'python.exe')
    return embedded if os.path.isfile(embedded) else sys.executable

892 

893 

def _resolve_backend_venv_python(tool_name: Optional[str]) -> Optional[str]:
    """Path to the per-backend venv's python.exe, or None if absent.

    Thin shim over ``core.venv_paths.venv_python_if_exists`` — kept as
    a module-local name so the spawn-site call signature is stable. The
    single source of truth lives in ``core.venv_paths`` and is shared
    with ``tts.backend_venv`` so the install and spawn paths can never
    drift apart.
    """
    from core.venv_paths import venv_python_if_exists as _venv_lookup
    return _venv_lookup(tool_name)

905 

906 

907# ═══════════════════════════════════════════════════════════════════ 

908# High-level helper: one-call tool wrapper 

909# ═══════════════════════════════════════════════════════════════════ 

910# 

911# Each GPU tool (F5, Chatterbox, CosyVoice, Indic Parler, Whisper, ...) 

912# needs the same parent-side boilerplate: 

913# - Lazy singleton GPUWorker 

914# - Thread-safe get-or-start 

915# - Call → catch crash → return transient error 

916# - VRAM budget allocation 

917# - Output path auto-generation 

918# - Uniform result JSON shape 

919# 

920# `ToolWorker` encapsulates ALL of it. Per-tool modules only specify: 

921# - worker module name 

922# - VRAM tool name (for budget allocation) 

923# - output subdir (for auto-generated paths) 

924# - engine display name 

925# 

926# Single responsibility: GPUWorker = IPC. ToolWorker = tool lifecycle. 

927 

class ToolWorker:
    """Reusable parent-side wrapper for GPU tools.

    Holds a lazily-started GPUWorker and exposes a single `synthesize()`
    method that handles: worker start, crash recovery, VRAM budget,
    output path generation, and result JSON shaping.

    One instance per tool, module-level singleton is typical.
    """

    # Central entry module — every worker spawns through here
    _DISPATCHER = 'integrations.service_tools.gpu_worker'

    def __init__(
        self,
        *,
        tool_name: str,
        tool_module: Optional[str] = None,
        vram_budget: str,
        output_subdir: str,
        engine: str,
        variant: Optional[str] = None,
        startup_timeout: float = 90.0,
        request_timeout: float = 120.0,
        idle_timeout: float = 300.0,
        python_exe: Optional[str] = None,
        # Back-compat aliases — callers on the old API still work
        worker_module: Optional[str] = None,
        worker_args: Optional[list] = None,
    ):
        """
        Args:
            tool_name: Logical tool name (e.g. 'f5_tts').
            tool_module: Dotted path to the library module that defines
                         `_load` / `_synthesize` (and variant-suffixed
                         versions). The parent spawns the centralized
                         dispatcher which imports this module in the
                         subprocess. No `__main__` block needed in the
                         tool module itself.
            vram_budget: Key in VRAM_BUDGETS (e.g. 'tts_f5').
            output_subdir: Subdir under ~/.hevolve/models for auto outputs.
            engine: Display name for the result JSON (e.g. 'f5-tts').
            variant: Optional variant suffix (e.g. 'turbo', 'ml'). When
                     set, the dispatcher picks `_load_<variant>` and
                     `_synthesize_<variant>` instead of the plain names.
            startup_timeout: Seconds to wait for worker READY handshake.
            request_timeout: Max seconds for a single request.
            idle_timeout: Seconds of inactivity after which the worker is
                          auto-stopped to free VRAM. Default 5 min.
                          Set to 0 to disable auto-stop.
            python_exe: Python interpreter to spawn the worker subprocess
                        under. None (default) = `_resolve_python_exe()`,
                        which picks python-embed when present else
                        sys.executable. Use this to run the worker inside
                        a per-engine venv (e.g. parler-tts needs
                        transformers==4.46.x while main has 5.x; pass
                        the venv's python.exe path here so the dispatch
                        subprocess sees the pinned deps). Read at
                        `_get_or_start` time, so callers may set
                        `tool.python_exe = '...'` after construction
                        before first synth.

            worker_module / worker_args: DEPRECATED — legacy aliases. If
                `tool_module` is not given, we fall back to `worker_module`.
                If `worker_args=['<variant>']` is given, we fall back to
                that for `variant`. New callers should use `tool_module`
                + `variant` directly.
        """
        # Resolve the library module the worker should import
        self.tool_module = tool_module or worker_module
        if self.tool_module is None:
            raise ValueError(
                f'{tool_name}: ToolWorker needs tool_module (library to run)'
            )

        # Resolve the variant — prefer explicit, fall back to legacy args.
        # (The previous inner `if worker_args else None` was dead code:
        # this branch only runs when worker_args is truthy.)
        if variant is None and worker_args:
            variant = worker_args[0]
        self.variant = variant

        self.tool_name = tool_name
        self.vram_budget = vram_budget
        self.output_subdir = output_subdir
        self.engine = engine
        self.startup_timeout = startup_timeout
        self.request_timeout = request_timeout
        self.idle_timeout = idle_timeout
        self.python_exe = python_exe

        self._worker: Optional[GPUWorker] = None
        self._lock = threading.Lock()
        # Init to NOW so a brand-new worker isn't LRU-evicted immediately.
        # The old 0.0 default made fresh workers look like the oldest idle
        # in the LRU sort, causing premature eviction before first use.
        self._last_used: float = time.monotonic()
        self._idle_timer: Optional[threading.Timer] = None
        # State change observers. Listeners receive (tool_name, event)
        # where event is 'spawned' | 'stopped' | 'crashed'. Fired AFTER
        # the state transition is complete so observers can probe
        # is_alive() and get the new state.
        self._observers: list = []

        # Register with the cross-worker registry so the LRU eviction
        # policy can see this worker when other tools need VRAM.
        _register_tool_worker(self)

    # Back-compat shims for properties the tests still read
    @property
    def worker_module(self) -> str:
        """Compat alias — old tests read this."""
        return self.tool_module

    @property
    def worker_args(self) -> list:
        """Compat alias — old tests read this as ['<variant>'] or []."""
        return [self.variant] if self.variant else []

    # ── Observer API ────────────────────────────────────────────────

    def add_observer(self, callback: Callable[[str, str], None]) -> None:
        """Register a state change listener.

        The callback will be invoked with (tool_name, event) on every
        state transition: 'spawned' when the worker subprocess becomes
        READY, 'stopped' when it's cleanly stopped (user action or
        idle auto-stop), 'crashed' when the subprocess dies mid-call.

        Called after the transition so listeners can call .is_alive()
        and see ground truth. Exceptions in listeners are swallowed —
        a broken observer must never take down the worker.
        """
        self._observers.append(callback)

    def remove_observer(self, callback: Callable[[str, str], None]) -> None:
        """Unregister a previously-added observer (no-op if not present)."""
        try:
            self._observers.remove(callback)
        except ValueError:
            pass

    def _notify(self, event: str) -> None:
        """Fire a state-change event to every registered observer."""
        # Iterate a snapshot so an observer that removes itself (or adds
        # another) mid-notification doesn't break iteration.
        for cb in list(self._observers):
            try:
                cb(self.tool_name, event)
            except Exception as e:
                logger.debug(f"observer {cb} failed for {event}: {e}")

    # ── Public API ──────────────────────────────────────────────────

    def call(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Send a request to the worker. Handles start, crash, timeout.

        Returns the worker's raw response dict (with an 'error' key on
        transient failure). Does NOT shape the result — callers that
        want uniform JSON output should use `synthesize()` instead.
        """
        # VRAM allocation moved INSIDE _get_or_start (after the _worker is
        # None check) to prevent double-counting when two concurrent call()
        # invocations both allocate before either enters the lock. See T138.
        try:
            worker = self._get_or_start()
        except (WorkerCrash, WorkerTimeout, WorkerError) as e:
            return {'error': f'{self.tool_name} worker startup failed: {e}'}

        try:
            result = worker.call(request)
        except (WorkerCrash, WorkerTimeout) as e:
            # Subprocess died. Worker will respawn on next call.
            logger.warning(f"{self.tool_name}: worker crash: {e}")
            self._notify('crashed')
            return {'error': f'{self.tool_name} crashed: {e}', 'transient': True}
        except WorkerError as e:
            return {'error': f'{self.tool_name} worker error: {e}'}

        # Request succeeded — reset idle timer
        self._touch_idle()
        return result

    def synthesize(
        self,
        *,
        text: str,
        language: str = 'en',
        voice: Optional[str] = None,
        output_path: Optional[str] = None,
        default_sample_rate: int = 24000,
        extra_request: Optional[Dict[str, Any]] = None,
    ) -> str:
        """Complete TTS synthesis with uniform JSON output.

        Handles all the per-tool boilerplate (text validation, output
        path generation, result shaping) so the tool module is ~10 lines.

        Returns a JSON string matching the original in-process tool
        output shape for drop-in compatibility.
        """
        if not text or not text.strip():
            return json.dumps({'error': 'Text is required'})

        if output_path is None:
            output_path = str(
                self._get_output_dir() / f'{self.tool_name}_{int(time.time() * 1000)}.wav'
            )

        request = {
            'text': text,
            'language': language,
            'voice': voice,
            'output_path': output_path,
        }
        if extra_request:
            request.update(extra_request)

        t0 = time.time()
        result = self.call(request)
        elapsed = time.time() - t0

        if 'error' in result:
            return json.dumps(result)

        duration = result.get('duration', 0)
        return json.dumps({
            'path': result.get('path', output_path),
            'duration': duration,
            'engine': result.get('engine', self.engine),
            'device': result.get('device', 'cuda'),
            'sample_rate': result.get('sample_rate', default_sample_rate),
            'voice': voice or 'default',
            'language': language,
            'latency_ms': round(elapsed * 1000, 1),
            'rtf': round(elapsed / duration, 4) if duration > 0 else 0,
        })

    def is_alive(self) -> bool:
        """True if the worker subprocess is running and READY."""
        # Snapshot the reference once so a concurrent stop() can't null
        # it between the None check and the is_alive() probe.
        w = self._worker
        return w is not None and w.is_alive()

    def set_idle_timeout(self, seconds: float) -> None:
        """Update the idle auto-stop threshold.

        Called by the model loader when the catalog entry's
        idle_timeout_s changes, so admin-UI edits take effect without
        restarting the worker. If a worker is currently running and
        the timeout was previously disabled, arms the timer now.
        """
        self.idle_timeout = max(0.0, float(seconds))
        if self.idle_timeout > 0 and self.is_alive():
            # Re-arm timer with the new deadline
            self._touch_idle()
        elif self.idle_timeout <= 0:
            with self._lock:
                # Re-check under the lock: stop()/_on_idle() may have
                # cleared the timer between an unlocked read and here.
                # The old code checked `is not None` OUTSIDE the lock
                # and could race into `None.cancel()` → AttributeError.
                if self._idle_timer is not None:
                    self._idle_timer.cancel()
                    self._idle_timer = None

    def stop(self) -> None:
        """Stop the worker and release VRAM."""
        was_running = False
        with self._lock:
            if self._idle_timer is not None:
                self._idle_timer.cancel()
                self._idle_timer = None
            if self._worker is not None:
                was_running = True
                try:
                    self._worker.stop()
                except Exception:
                    pass
                self._worker = None
                self._release_vram()
                logger.info(f"{self.tool_name}: worker stopped")
        # Notify OUTSIDE the lock (observers may call back into us)
        if was_running:
            self._notify('stopped')

    # ── Idle timeout ────────────────────────────────────────────────

    def _touch_idle(self) -> None:
        """Record that the worker was just used; (re)arm the idle timer."""
        if self.idle_timeout <= 0:
            return
        with self._lock:
            self._last_used = time.monotonic()
            if self._idle_timer is not None:
                self._idle_timer.cancel()
            self._idle_timer = threading.Timer(self.idle_timeout, self._on_idle)
            self._idle_timer.daemon = True
            self._idle_timer.start()

    def _on_idle(self) -> None:
        """Fired when idle_timeout elapses with no requests.

        TOCTOU fix (T138): the elapsed-time check AND the stop decision
        both run INSIDE the lock. The old code checked elapsed under the
        lock, then called stop() OUTSIDE it — a concurrent call() could
        update _last_used between the check and the stop, killing a
        worker that was actively serving a request.
        """
        if self.idle_timeout <= 0:
            return
        with self._lock:
            elapsed = time.monotonic() - self._last_used
            if elapsed + 0.5 < self.idle_timeout:
                return  # someone used it recently, skip
            # Stop INSIDE the lock so no concurrent call() can slip in
            # between the elapsed check and the actual termination.
            if self._worker is not None and self._worker.is_alive():
                logger.info(
                    f"{self.tool_name}: idle for {self.idle_timeout:.0f}s, "
                    f"stopping worker to free VRAM"
                )
                try:
                    self._worker.stop()
                except Exception:
                    pass
            self._worker = None
            self._release_vram()
        # Notify OUTSIDE the lock (observers may call back into us)
        self._notify('stopped')

    # ── Internals ───────────────────────────────────────────────────

    def _get_or_start(self) -> GPUWorker:
        """Lazy singleton: start the worker on first call or after a crash.

        Spawns the centralized dispatcher (`gpu_worker` module) with the
        target tool module (+ optional variant) as CLI args. Tool modules
        do NOT need their own `if __name__ == '__main__':` blocks — the
        dispatcher imports them and picks up `_load` / `_synthesize`.
        """
        spawned = False
        with self._lock:
            if self._worker is None or not self._worker.is_alive():
                # Allocate VRAM INSIDE the lock so concurrent call()
                # invocations don't double-count. T138 fix (c).
                # If allocate() returns False (GPU full), try evicting
                # LRU workers first, then retry. If still False after
                # eviction, the spawn will proceed but on CPU fallback
                # path (subprocess self-detects VRAM at _load time).
                allocated = self._allocate_vram()

                # Cross-worker VRAM eviction: if our budget won't fit
                # in current free VRAM, stop LRU other workers to make
                # room BEFORE spawning. Prevents a predictable OOM in
                # the new subprocess while older idle workers hold
                # memory they're not actively using.
                self._ensure_vram_headroom()

                # Retry allocate after eviction so the budget is tracked.
                if not allocated:
                    self._allocate_vram()

                cli_args = [self.tool_module]
                if self.variant:
                    cli_args.append(self.variant)
                # Resolve the spawn interpreter at start time, not at
                # ToolWorker.__init__ time: the per-backend venv is
                # typically created lazily on first install, AFTER the
                # ToolWorker singleton has been constructed. Order:
                #   1. Explicit self.python_exe (test override / caller-set)
                #   2. Per-backend venv at <data>/venvs/<tool_name>/
                #   3. GPUWorker default = _resolve_python_exe()
                spawn_python = (
                    self.python_exe
                    or _resolve_backend_venv_python(self.tool_name)
                )
                self._worker = GPUWorker(
                    name=self.tool_name,
                    module=self._DISPATCHER,
                    startup_timeout=self.startup_timeout,
                    request_timeout=self.request_timeout,
                    python_exe=spawn_python,
                    args=cli_args,
                )
                self._worker.start()
                spawned = True
            worker = self._worker
        if spawned:
            # Fire observer event OUTSIDE the lock so callbacks can
            # safely call back into this ToolWorker without deadlocking.
            self._notify('spawned')
        return worker

    def _ensure_vram_headroom(self) -> None:
        """Evict LRU workers if VRAM is too tight for this tool to fit.

        Looks up this tool's declared VRAM budget from vram_manager.
        If current free VRAM is below that budget, calls try_free_vram
        to stop other workers (oldest-idle first). Silent no-op when
        the tool has no registered budget or vram_manager is unavailable.
        """
        try:
            from integrations.service_tools.vram_manager import (
                vram_manager, VRAM_BUDGETS,
            )
        except ImportError:
            return
        budget = VRAM_BUDGETS.get(self.vram_budget)
        if not budget:
            return
        _min_vram, model_gb = budget
        free_gb = vram_manager.get_free_vram()
        if free_gb >= model_gb:
            return
        try_free_vram(needed_gb=model_gb, exclude_tool=self.tool_name)

    def _get_output_dir(self) -> Path:
        """Return (and create) the auto-output directory for this tool."""
        d = Path(os.environ.get(
            'HEVOLVE_MODEL_DIR',
            os.path.expanduser('~/.hevolve/models'),
        )) / self.output_subdir
        d.mkdir(parents=True, exist_ok=True)
        return d

    def _allocate_vram(self) -> bool:
        """Attempt to reserve VRAM. Returns True on success, False if
        the allocation was refused (caller may fall back to CPU).

        This honors the vram_manager.allocate contract: False means
        'won't fit', not 'error'. Exceptions during import/allocation
        are treated as 'unknown state -> allow', matching prior
        default-allow behavior while still surfacing real refusals.
        """
        try:
            from integrations.service_tools.vram_manager import get_vram_manager
            return bool(get_vram_manager().allocate(self.vram_budget))
        except ImportError:
            return True
        except Exception:
            return True

    def _release_vram(self) -> None:
        """Best-effort release of this tool's VRAM reservation."""
        try:
            from integrations.service_tools.vram_manager import get_vram_manager
            get_vram_manager().release(self.vram_budget)
        except Exception:
            # Best-effort: vram_manager may be missing (ImportError) or
            # broken — a failed release must never break shutdown. The
            # old `except (ImportError, Exception)` was redundant:
            # Exception already covers ImportError.
            pass

1365 

1366 

1367# ═══════════════════════════════════════════════════════════════════ 

1368# Centralized worker entry point 

1369# ═══════════════════════════════════════════════════════════════════ 

1370# 

1371# ONE `if __name__ == '__main__':` block for ALL GPU workers. When the 

1372# parent spawns `python -m integrations.service_tools.gpu_worker 

1373# <tool_module> [variant]`, this dispatcher: 

1374# 

1375# 1. Dynamically imports the tool module. 

1376# 2. Picks the right load/handle callbacks via convention: 

1377# - no variant → module._load, module._synthesize 

1378# - variant='turbo' → module._load_turbo, module._synthesize_turbo 

1379# - variant='<v>' → module._load_<v>, module._synthesize_<v> 

1380# (Fallback: if the variant-specific name doesn't exist, use the 

1381# plain _load / _synthesize.) 

1382# 3. Calls run_worker(...) — the infinite request loop. 

1383# 

1384# Tool modules do NOT need their own `if __name__ == '__main__':` block. 

1385# They just define `_load` and `_synthesize` (plus variants if needed). 

1386# This keeps the "entry point" concern in exactly one place. 

1387 

def _dispatch_and_run(tool_module_name: str, variant: Optional[str] = None) -> None:
    """Import a tool module and run its worker loop.

    When *variant* is set, prefers `_load_<variant>` +
    `_synthesize_<variant>`, falling back to the plain `_load` /
    `_synthesize`. The worker name is the module's last dotted segment
    plus the variant suffix.
    """
    import importlib

    try:
        mod = importlib.import_module(tool_module_name)
    except Exception as e:
        # Diagnostic to stderr + non-zero exit → parent sees WorkerCrash.
        print(f'[gpu_worker] import failed for {tool_module_name}: {e}', file=sys.stderr)
        sys.exit(2)

    suffix = f'_{variant}' if variant else ''
    load_name = f'_load{suffix}'
    handle_name = f'_synthesize{suffix}'

    # Variant-specific callback wins; plain name is the fallback. With
    # no variant the two names coincide, so one lookup would suffice.
    load_cb = getattr(mod, load_name, None) or getattr(mod, '_load', None)
    handle_cb = getattr(mod, handle_name, None) or getattr(mod, '_synthesize', None)

    missing = [
        name
        for name, cb in ((load_name, load_cb), (handle_name, handle_cb))
        if cb is None
    ]
    if missing:
        print(
            f'[gpu_worker] {tool_module_name} missing callbacks: {missing}',
            file=sys.stderr,
        )
        sys.exit(2)

    worker_name = tool_module_name.rsplit('.', 1)[-1] + suffix
    run_worker(name=worker_name, load=load_cb, handle=handle_cb)

1426 

1427 

1428# ── #58 Scope-2: reflection dispatch via catalog id ──────────────── 

1429# 

1430# The Python-tool path (above) covers every code-shipped engine — 

1431# each *_tool.py defines `_load[_<variant>]` + `_synthesize[_<variant>]` 

1432# by convention. Adding a NEW engine that fits a homogeneous load+ 

1433# synth API used to require writing a whole new *_tool.py just to wrap 

1434# the import + method call. This was friction for engines whose synth 

1435# API is reflection-friendly (Kokoro, Pocket-TTS, etc.). 

1436# 

1437# Reflection path: a catalog entry with no `tool_module` but the full 

1438# 5-field contract (`import_path`, `init_args`, `synth_method`, 

1439# `params_map`, `output_format`) is dispatchable via the catalog 

1440# alone. The 5-field contract lives in `tts_router._REFLECTION_FIELDS` 

1441# and the canonical output formats in `tts_router._OUTPUT_FORMATS`. 

1442# Validation fires at catalog-ingest (single source of truth in 

1443# `tts_router._validate_engine_caps`); the dispatcher trusts what 

1444# made it past the gate. 

1445 

def _normalize_to_wav_file(
    raw: Any,
    output_format: str,
    output_path: str,
    sample_rate: int = 24000,
) -> tuple:
    """Write a reflection-dispatched engine's raw output to disk as WAV.

    Returns ``(output_path, duration_seconds)``.

    Canonical ``output_format`` values mirror
    ``tts_router._OUTPUT_FORMATS``:

    * ``'wav_bytes'`` — bytes object holding a WAV byte stream;
      written verbatim.
    * ``'numpy_24k'`` — 1-D float32 numpy array @ 24 kHz mono;
      written via scipy.io.wavfile at 24000 Hz regardless of the
      caller's ``sample_rate`` arg (the contract pins the rate).
    * ``'file_path'`` — str path the engine already wrote to;
      copied to ``output_path`` if different.
    * ``'bytesio'`` — io.BytesIO containing wav bytes; written
      verbatim.

    Raises TypeError when the engine declared one format but returned
    another, and ValueError on an unknown ``output_format``, so the
    caller can surface a precise wire error rather than a silent
    miswrite.
    """
    import shutil

    if output_format == 'wav_bytes':
        if not isinstance(raw, (bytes, bytearray)):
            raise TypeError(
                f"output_format='wav_bytes' but raw is {type(raw).__name__}"
            )
        with open(output_path, 'wb') as sink:
            sink.write(raw)
    elif output_format == 'bytesio':
        import io as _io
        if not isinstance(raw, _io.BytesIO):
            raise TypeError(
                f"output_format='bytesio' but raw is {type(raw).__name__}"
            )
        with open(output_path, 'wb') as sink:
            sink.write(raw.getvalue())
    elif output_format == 'file_path':
        if not isinstance(raw, str) or not raw:
            raise TypeError(
                f"output_format='file_path' but raw is {type(raw).__name__!r}"
            )
        if raw != output_path:
            shutil.copyfile(raw, output_path)
    elif output_format == 'numpy_24k':
        try:
            import scipy.io.wavfile  # type: ignore
        except ImportError as e:
            raise RuntimeError(
                f"output_format='numpy_24k' requires scipy: {e}"
            )
        if not hasattr(raw, 'dtype'):
            raise TypeError(
                f"output_format='numpy_24k' but raw is {type(raw).__name__}"
            )
        # Pin to 24 kHz per contract; cast to float32 (canonical type).
        scipy.io.wavfile.write(output_path, 24000, raw.astype('float32'))
    else:
        raise ValueError(
            f"unknown output_format {output_format!r}; canonical set is "
            f"wav_bytes / numpy_24k / file_path / bytesio"
        )

    # Duration: try the stdlib wave header first (PCM only — wave raises
    # `wave.Error: unknown format: 3` on float WAV). scipy writes float32
    # for numpy_24k, so fall back to scipy.io.wavfile for that shape.
    # Only when both probes fail does duration default to 0.0.
    duration = 0.0
    try:
        import wave
        with wave.open(output_path, 'rb') as wf:
            rate = wf.getframerate()
            duration = wf.getnframes() / rate if rate > 0 else 0.0
    except Exception:
        try:
            import scipy.io.wavfile  # type: ignore
            rate, samples = scipy.io.wavfile.read(output_path)
            duration = len(samples) / rate if rate > 0 else 0.0
        except Exception:
            duration = 0.0

    return output_path, duration

1541 

1542 

def _build_reflection_callbacks(
    catalog_id: str,
    entry_capabilities: Dict[str, Any],
    output_dir: Optional[str] = None,
) -> tuple:
    """Build ``(load, handle)`` callbacks for a reflection-only catalog
    entry. No on-disk *_tool.py needed.

    Validation runs against `tts_router._validate_engine_caps` first;
    raises RuntimeError on any contract violation so the caller can
    exit cleanly via the shared error path (printing to stderr +
    sys.exit(2) — same shape as `_dispatch_and_run`'s missing-callbacks
    path).

    The 5-field contract:

    * ``import_path`` — ``'pkg.module:ClassName'``
    * ``init_args`` — ``{}`` kwargs for ``ClassName(**init_args)``
    * ``synth_method`` — instance method name on the loaded model
    * ``params_map`` — ``{payload_key → method_kwarg}`` translation
    * ``output_format`` — one of `tts_router._OUTPUT_FORMATS`

    The handler accepts the same wire payload existing TTS workers do —
    ``request['text']`` is required; optional ``output_path``,
    ``sample_rate``, plus any keys named in ``params_map``. Returns
    the same shape (``{path, duration, sample_rate, engine}``) so any
    caller routing through the reflection path is wire-compatible with
    the on-disk *_tool.py path.

    Returns:
        A ``(load, handle)`` pair of closures suitable for run_worker.

    Raises:
        RuntimeError: on contract-validation failure or a malformed
            ``import_path`` (both surfaced before the worker loop
            starts, so the parent sees a clean startup failure).
    """
    # Gate everything on the shared validator first — the closures below
    # trust that every contract field is present and well-formed.
    from integrations.channels.media.tts_router import _validate_engine_caps
    err = _validate_engine_caps(entry_capabilities)
    if err is not None:
        raise RuntimeError(f'catalog entry {catalog_id!r}: {err}')

    # The optional fields normalize falsy values (None, {}) to {} so the
    # closures never need their own None checks.
    import_path = entry_capabilities['import_path']
    init_args = entry_capabilities.get('init_args') or {}
    synth_method = entry_capabilities['synth_method']
    params_map = entry_capabilities.get('params_map') or {}
    output_format = entry_capabilities['output_format']

    if ':' not in import_path:
        raise RuntimeError(
            f"catalog entry {catalog_id!r}: import_path must be "
            f"'pkg.module:ClassName', got {import_path!r}"
        )
    mod_name, cls_name = import_path.split(':', 1)

    # Output dir resolution — mirrors pocket_tts_tool's fallback shape.
    if output_dir is None:
        try:
            from core.platform_paths import get_data_dir  # type: ignore
            output_dir = os.path.join(
                get_data_dir(), 'tts_outputs', 'reflection'
            )
        except Exception:
            output_dir = os.path.join(
                os.path.expanduser('~'), '.hevolve', 'models',
                'reflection_outputs',
            )
    try:
        os.makedirs(output_dir, exist_ok=True)
    except OSError:
        # ENOSPC / read-only home — fall back to tempdir so the worker
        # can still respond (a synth that can't write its WAV will fail
        # later with a clearer ENOSPC error than os.makedirs would).
        import tempfile as _tempfile
        output_dir = _tempfile.gettempdir()

    def _load_reflection() -> Any:
        """Import the declared class and instantiate it with init_args.

        Runs once at worker startup (inside the subprocess); an import
        or construction failure here aborts the worker before READY.
        """
        import importlib
        mod = importlib.import_module(mod_name)
        cls = getattr(mod, cls_name, None)
        if cls is None:
            raise RuntimeError(
                f'catalog entry {catalog_id!r}: class {cls_name!r} not '
                f'found in module {mod_name!r}'
            )
        return cls(**init_args)

    def _handle_reflection(model: Any, request: Dict[str, Any]) -> Dict[str, Any]:
        """Serve one synth request: map params, call the engine, write WAV.

        Returns either the uniform success dict or an ``{'error': ...}``
        dict — never raises, so the worker loop stays alive.
        """
        text = request.get('text', '')
        if not text or not isinstance(text, str):
            return {'error': 'text is required'}

        # Translate request → method kwargs via params_map. Keys NOT
        # named in params_map are ignored (engines opt into what they
        # see). Plus 'text' under whatever name the engine declared.
        kwargs: Dict[str, Any] = {}
        for payload_key, method_arg in params_map.items():
            if payload_key in request:
                kwargs[method_arg] = request[payload_key]
        # If the engine didn't declare 'text' in params_map, default to
        # passing it as 'text' — most reflection-friendly engines name
        # the arg that exact way.
        if 'text' not in params_map:
            kwargs.setdefault('text', text)

        method = getattr(model, synth_method, None)
        if method is None or not callable(method):
            return {'error': f'method {synth_method!r} not found on model'}

        # Output path: respect request override; else hash-derived under
        # the resolved output_dir. md5 is used purely as a short stable
        # filename digest here, not for anything security-sensitive.
        out_path = request.get('output_path')
        if not out_path:
            import hashlib as _h
            digest = _h.md5(
                f"{text[:50]}:{catalog_id}".encode()
            ).hexdigest()[:12]
            out_path = os.path.join(
                output_dir, f'{catalog_id}_{digest}.wav'
            )

        try:
            raw = method(**kwargs)
            path, duration = _normalize_to_wav_file(
                raw, output_format, out_path,
                sample_rate=request.get('sample_rate', 24000),
            )
        except Exception as e:
            # Tail-truncated traceback keeps the wire payload bounded
            # while preserving the most useful (innermost) frames.
            import traceback as _tb
            return {
                'error': f'{type(e).__name__}: {e}',
                'traceback': _tb.format_exc()[-2000:],
            }

        return {
            'path': path,
            'duration': round(duration, 3),
            'sample_rate': request.get('sample_rate', 24000),
            'engine': f'reflection:{catalog_id}',
        }

    return _load_reflection, _handle_reflection

1677 

1678 

def _dispatch_catalog_id(catalog_id: str) -> None:
    """Run a worker loop for a reflection-only catalog entry.

    Spawned by the parent process via:
        python -m integrations.service_tools.gpu_worker --catalog-id <id>

    Exit codes mirror `_dispatch_and_run`:
        2 — diagnostic on stderr (catalog unreachable, entry not found,
            validation failed); parent sees WorkerCrash on next call.
    """
    def _die(message: str) -> None:
        # One diagnostic line on stderr, then the fatal-setup exit code.
        print(message, file=sys.stderr)
        sys.exit(2)

    try:
        from integrations.service_tools.model_catalog import get_catalog
    except Exception as e:
        _die(f'[gpu_worker] cannot import model_catalog: {e}')

    try:
        entry = get_catalog().get(catalog_id)
    except Exception as e:
        _die(f'[gpu_worker] catalog load failed for {catalog_id!r}: {e}')

    if entry is None:
        _die(f'[gpu_worker] catalog has no entry {catalog_id!r}')

    try:
        load, handle = _build_reflection_callbacks(
            catalog_id, entry.capabilities or {},
        )
    except RuntimeError as e:
        _die(f'[gpu_worker] {e}')

    run_worker(name=f'reflection.{catalog_id}', load=load, handle=handle)

1724 

1725 

if __name__ == '__main__':
    # CLI entry point — two spawn modes:
    #   python -m integrations.service_tools.gpu_worker <module> [variant]
    #   python -m integrations.service_tools.gpu_worker --catalog-id <id>
    _args = sys.argv[1:]
    if len(_args) >= 2 and _args[0] == '--catalog-id':
        _dispatch_catalog_id(_args[1])
    elif _args and not _args[0].startswith('--'):
        # Positional form: module path plus an optional variant name.
        _dispatch_and_run(_args[0], _args[1] if len(_args) > 1 else None)
    else:
        print(
            'usage: python -m integrations.service_tools.gpu_worker '
            '<tool.module.path> [variant]\n'
            ' or: python -m integrations.service_tools.gpu_worker '
            '--catalog-id <id>',
            file=sys.stderr,
        )
        sys.exit(2)