Coverage for integrations / service_tools / gpu_worker.py: 77.2%
600 statements
coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2gpu_worker.py — Persistent GPU subprocess worker for crash isolation.
4Problem: PyTorch / CUDA OOM and DLL segfaults are C-level aborts that
5Python's try/except CANNOT catch. When an in-process model crashes,
6it takes the entire parent process with it. This is especially bad
7for long-running services (Nunba, HARTOS backend) where one TTS
8crash kills chat, STT, agents, and everything else.
10Solution: run GPU models in a dedicated subprocess. When it crashes,
11only the subprocess dies. The parent catches the exit code and falls
12back to a CPU engine (Piper) or retries on the next request.
14Architecture:
15 Parent (Nunba/HARTOS)
16 │
17 │ JSON request (stdin)
18 ▼
19 Worker subprocess (python-embed or sys.executable)
20 │ ├── load model once on startup
21 │ ├── serve forever via stdin/stdout JSON lines
22 │ └── exit non-zero on fatal error
23 │
24 │ JSON response (stdout) OR non-zero exit code
25 ▼
26 Parent catches failure → fallback
28Usage:
30 # Parent side
31 worker = GPUWorker(
32 name='f5_tts',
33 module='integrations.service_tools.f5_tts_worker',
34 startup_timeout=60, # F5 takes ~40s to load
35 request_timeout=120, # longest inference we tolerate
36 )
37 worker.start() # spawns subprocess, waits for READY handshake
38 try:
39 result = worker.call({
40 'text': 'hello',
41 'language': 'en',
42 'voice': None,
43 'output_path': '/tmp/out.wav',
44 })
45 except WorkerCrash:
46 # Subprocess died — fall back to Piper
47 pass
49 # Worker side (in f5_tts_worker.py)
50 from integrations.service_tools.gpu_worker import run_worker
52 def load_model():
53 from f5_tts.api import F5TTS
54 return F5TTS()
56 def synthesize(model, req):
57 wav, sr, _ = model.infer(
58 ref_file=req.get('voice') or '',
59 ref_text='',
60 gen_text=req['text'],
61 )
62 import soundfile as sf
63 sf.write(req['output_path'], wav, sr)
64 return {
65 'path': req['output_path'],
66 'duration': len(wav) / sr,
67 'sample_rate': sr,
68 }
70 if __name__ == '__main__':
71 run_worker(name='f5_tts', load=load_model, handle=synthesize)
72"""
74import json
75import logging
76import os
77import queue
78import subprocess
79import sys
80import threading
81import time
82import weakref
83from pathlib import Path
84from typing import Any, Callable, Dict, List, Optional
86logger = logging.getLogger(__name__)
89# ═══════════════════════════════════════════════════════════════════
90# Cross-worker VRAM eviction registry
91# ═══════════════════════════════════════════════════════════════════
92#
93# Every ToolWorker instance registers itself here so that when a new
94# worker needs GPU memory and the VRAM manager reports insufficient
95# headroom, we can evict the least-recently-used OTHER worker(s) to
96# make room. Without this, spawning F5 on top of a loaded Chatterbox
97# ML would OOM the subprocess — correct crash isolation, but wasted
98# time because nobody told Chatterbox to step aside.
100_REGISTRY_LOCK = threading.Lock()
101_ALL_WORKERS: "List[weakref.ref['ToolWorker']]" = []
104def _register_tool_worker(tw: "ToolWorker") -> None:
105 """Register a ToolWorker so the cross-worker eviction sees it.
107 Uses weakref so dropping a ToolWorker doesn't keep it alive.
108 """
109 with _REGISTRY_LOCK:
110 # Prune dead refs opportunistically
111 _ALL_WORKERS[:] = [r for r in _ALL_WORKERS if r() is not None]
112 _ALL_WORKERS.append(weakref.ref(tw))
115def _live_tool_workers() -> "List[ToolWorker]":
116 """Return currently-alive ToolWorker instances (worker subprocess up)."""
117 with _REGISTRY_LOCK:
118 out = []
119 for r in _ALL_WORKERS:
120 tw = r()
121 if tw is not None and tw.is_alive():
122 out.append(tw)
123 return out
126def try_free_vram(needed_gb: float, exclude_tool: Optional[str] = None) -> bool:
127 """Stop LRU workers until `needed_gb` GB of GPU VRAM is free.
129 Called BEFORE spawning a heavy worker when the VRAM manager reports
130 insufficient free VRAM. Iterates live workers sorted by last-used
131 ascending (oldest first), skipping the worker identified by
132 `exclude_tool`, and stops them one at a time until free VRAM meets the
133 threshold — or the registry runs out.
135 Returns True if the required headroom was reached, False otherwise.
136 """
137 try:
138 from integrations.service_tools.vram_manager import vram_manager
139 except ImportError:
140 return False
142 free_gb = vram_manager.get_free_vram()
143 if free_gb >= needed_gb:
144 return True
146 # Sort LRU first (smallest _last_used = longest idle)
147 candidates = sorted(
148 (tw for tw in _live_tool_workers() if tw.tool_name != exclude_tool),
149 key=lambda w: w._last_used or 0.0,
150 )
152 for tw in candidates:
153 logger.info(
154 f"VRAM eviction: stopping {tw.tool_name} to free memory for "
155 f"{exclude_tool or 'new worker'} "
156 f"(need {needed_gb:.1f}GB, have {vram_manager.get_free_vram():.1f}GB)"
157 )
158 try:
159 tw.stop()
160 except Exception as e:
161 logger.warning(f"eviction: failed to stop {tw.tool_name}: {e}")
162 continue
164 free_gb = vram_manager.get_free_vram()
165 if free_gb >= needed_gb:
166 logger.info(
167 f"VRAM eviction: freed enough ({free_gb:.1f}GB) for "
168 f"{exclude_tool or 'new worker'}"
169 )
170 return True
172 logger.warning(
173 f"VRAM eviction: couldn't free enough ({vram_manager.get_free_vram():.1f}GB "
174 f"free, need {needed_gb:.1f}GB)"
175 )
176 return False
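# Hedged usage sketch — how a spawn site might call the eviction helper before
# loading a heavy model. The 6.0 GB figure and the 'f5_tts' name are
# illustrative, not real budget values:
#
#     if not try_free_vram(needed_gb=6.0, exclude_tool='f5_tts'):
#         logger.warning("VRAM still tight; worker may fall back to CPU")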
179# ═══════════════════════════════════════════════════════════════════
180# Exceptions
181# ═══════════════════════════════════════════════════════════════════
183class WorkerError(Exception):
184 """Base for worker errors."""
187class WorkerCrash(WorkerError):
188 """Subprocess died (exit code != 0 or stdout closed)."""
191class WorkerTimeout(WorkerError):
192 """Worker did not respond within the timeout."""
195class WorkerNotReady(WorkerError):
196 """Worker has not completed startup handshake."""
199# ═══════════════════════════════════════════════════════════════════
200# Parent-side: spawns and talks to the worker
201# ═══════════════════════════════════════════════════════════════════
203class GPUWorker:
204 """Persistent GPU subprocess worker.
206 Thread-safe: one lock serializes requests (GPU inference is not
207 parallelizable on a single device anyway).
208 """
210 # Signal line the worker prints to stdout when ready to serve.
211 READY_MARKER = '__WORKER_READY__'
212 # Emitted immediately before READY — carries the post-load GPU VRAM
213 # measurement in GB as a float. Parent captures and forwards to
214 # vram_manager.record_actual_usage. Workers that can't measure emit
215 # 0.0 and the parent ignores the reading.
216 VRAM_MARKER_PREFIX = '__WORKER_VRAM_GB__'
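# Hedged sketch of the startup handshake as the parent reads it from the
# worker's stdout queue (numbers illustrative): first the VRAM marker, then
# READY, then one JSON line per answered request.
#
#     __WORKER_VRAM_GB__2.413
#     __WORKER_READY__
#     {"path": "/tmp/out.wav", "duration": 1.92, "sample_rate": 24000}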
218 def __init__(
219 self,
220 name: str,
221 module: str,
222 startup_timeout: float = 60.0,
223 request_timeout: float = 120.0,
224 python_exe: Optional[str] = None,
225 env: Optional[Dict[str, str]] = None,
226 args: Optional[list] = None,
227 ):
228 """
229 Args:
230 name: Worker name (for logs).
231 module: Dotted module path to run via -m (e.g. 'tts.f5_worker').
232 startup_timeout: Seconds to wait for READY marker.
233 request_timeout: Seconds to wait for each request response.
234 python_exe: Python to use. Defaults to python-embed if present,
235 else sys.executable.
236 env: Extra env vars to pass to the subprocess.
237 args: Extra CLI args appended after `-m module` (for variant
238 dispatch in modules with multiple worker entry points).
239 """
240 self.name = name
241 self.module = module
242 self.startup_timeout = startup_timeout
243 self.request_timeout = request_timeout
244 self.python_exe = python_exe or _resolve_python_exe()
245 self.env = env or {}
246 self.args = list(args) if args else []
248 self._proc: Optional[subprocess.Popen] = None
249 self._lock = threading.Lock()
250 self._ready = False
251 self._stderr_thread: Optional[threading.Thread] = None
252 self._stdout_thread: Optional[threading.Thread] = None
253 self._stdout_queue: queue.Queue = queue.Queue()
255 # ── Lifecycle ──────────────────────────────────────────────────
257 def start(self) -> None:
258 """Spawn the subprocess and wait for READY handshake."""
259 with self._lock:
260 if self._proc and self._proc.poll() is None and self._ready:
261 return # already running
263 self._spawn()
264 self._wait_ready()
266 def is_alive(self) -> bool:
267 """True if subprocess is running and READY."""
268 return (
269 self._proc is not None
270 and self._proc.poll() is None
271 and self._ready
272 )
274 def stop(self, timeout: float = 5.0) -> None:
275 """Gracefully stop the worker. Falls back to kill after timeout."""
276 with self._lock:
277 if not self._proc:
278 return
279 if self._proc.poll() is None:
280 try:
281 # Send shutdown request. Worker exits on its own.
282 self._write_line(json.dumps({'op': 'shutdown'}))
283 except Exception:
284 pass
285 try:
286 self._proc.wait(timeout=timeout)
287 except subprocess.TimeoutExpired:
288 logger.warning(f"{self.name}: shutdown timeout, killing")
289 self._proc.kill()
290 try:
291 self._proc.wait(timeout=2)
292 except subprocess.TimeoutExpired:
293 pass
294 self._proc = None
295 self._ready = False
297 # ── Request/response ───────────────────────────────────────────
299 def call(self, request: Dict[str, Any]) -> Dict[str, Any]:
300 """Send a request to the worker and return the response.
302 Raises:
303 WorkerCrash: subprocess died or closed stdout.
304 WorkerTimeout: no response within request_timeout.
305 WorkerNotReady: worker not started or crashed during startup.
306 """
307 with self._lock:
308 if not self.is_alive():
309 raise WorkerNotReady(f"{self.name}: worker is not running")
311 # Send request
312 payload = json.dumps(request)
313 try:
314 self._write_line(payload)
315 except (BrokenPipeError, OSError) as e:
316 self._reap()
317 raise WorkerCrash(f"{self.name}: write failed: {e}")
319 # Poll loop: wake up every 250ms to check if process died.
320 # This lets us distinguish "timeout" (worker stuck) from
321 # "crash" (worker died) without waiting the full request_timeout
322 # when the subprocess has already exited.
323 deadline = time.monotonic() + self.request_timeout
324 while True:
325 remaining = deadline - time.monotonic()
326 if remaining <= 0:
327 logger.error(f"{self.name}: request timeout, killing stuck worker")
328 self._reap(force=True)
329 raise WorkerTimeout(
330 f"{self.name}: no response within {self.request_timeout}s"
331 )
333 line = self._read_line_with_timeout(min(0.25, remaining))
334 if line is not None:
335 break # got a response
337 # No line yet — is the process still alive?
338 if self._proc is None or self._proc.poll() is not None:
339 code = self._proc.returncode if self._proc else -1
340 self._reap()
341 raise WorkerCrash(
342 f"{self.name}: subprocess died (exit={code})"
343 )
345 try:
346 return json.loads(line)
347 except json.JSONDecodeError as e:
348 raise WorkerError(
349 f"{self.name}: invalid JSON response: {e}: {line[:200]}"
350 )
352 # ── Internals ──────────────────────────────────────────────────
354 def _spawn(self) -> None:
355 """Launch the subprocess.
357 IMPORTANT: propagates the parent process's current sys.path to the
358 child via PYTHONPATH. Before the subprocess-isolation refactor, TTS
359 engines ran in-process and inherited any runtime sys.path mutations
360 automatically (e.g. Nunba's app.py prepends ~/.nunba/site-packages/
361 where CUDA torch, regex, transformers, parler_tts actually live).
362 Spawned subprocesses boot fresh from the python binary's default
363 sys.path and can't see those entries — every transformers-based
364 worker then crashes on `import regex` / `import transformers`.
365 Fixing at the spawn layer means every worker (TTS, STT, VLM, any
366 future engine) benefits without each one re-implementing the same
367 path plumbing.
368 """
369 env = os.environ.copy()
370 env.update(self.env)
371 # Unbuffered so stdout/stderr come through immediately
372 env['PYTHONUNBUFFERED'] = '1'
374 # Propagate parent sys.path via PYTHONPATH so the child inherits
375 # runtime-added package dirs (e.g. ~/.nunba/site-packages for CUDA
376 # torch + TTS deps). Include real dirs AND zip / egg archives —
377 # cx_Freeze bundles application packages (HARTOS's ``integrations``
378 # tree) inside ``library.zip``; excluding files broke worker
379 # spawn under frozen Nunba.exe with ModuleNotFoundError on the
380 # central dispatcher itself. Preserve an existing PYTHONPATH by
381 # prepending our paths to it (caller-set entries are kept but searched last).
382 _extra_paths = [
383 p for p in sys.path
384 if p and (
385 os.path.isdir(p)
386 or (os.path.isfile(p) and p.lower().endswith(('.zip', '.egg')))
387 )
388 ]
389 if _extra_paths:
390 _existing = env.get('PYTHONPATH', '')
391 _joined = os.pathsep.join(_extra_paths)
392 if _existing:
393 env['PYTHONPATH'] = _joined + os.pathsep + _existing
394 else:
395 env['PYTHONPATH'] = _joined
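# Hedged example of the resulting child PYTHONPATH, assuming the parent
# prepended ~/.nunba/site-packages at runtime and the caller had already set
# PYTHONPATH=C:\extra\existing (all paths illustrative):
#
#     PYTHONPATH=C:\Users\me\.nunba\site-packages;C:\Program Files\Nunba\library.zip;C:\extra\existing
#
# Our runtime paths come first, then the caller's pre-existing entries.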
397 # Windows: don't pop a console window
398 creationflags = 0
399 if sys.platform == 'win32':
400 creationflags = 0x08000000 # CREATE_NO_WINDOW
402 cmd = [self.python_exe, '-u', '-m', self.module, *self.args]
403 logger.info(f"{self.name}: spawning {' '.join(cmd)}")
405 self._proc = subprocess.Popen(
406 cmd,
407 stdin=subprocess.PIPE,
408 stdout=subprocess.PIPE,
409 stderr=subprocess.PIPE,
410 env=env,
411 creationflags=creationflags,
412 bufsize=1, # line-buffered
413 text=True,
414 encoding='utf-8',
415 errors='replace',
416 )
418 # Drain stderr in a thread so it doesn't fill the pipe buffer.
419 # Also forward worker log lines to our logger.
420 self._stderr_thread = threading.Thread(
421 target=self._drain_stderr,
422 daemon=True,
423 )
424 self._stderr_thread.start()
426 # Single persistent stdout reader thread that pumps lines into
427 # a queue. Only ONE thread ever reads from proc.stdout — this
428 # is essential because creating a new reader per-poll leaves
429 # blocked readline() threads that corrupt subsequent reads.
430 self._stdout_queue = queue.Queue()
431 self._stdout_thread = threading.Thread(
432 target=self._drain_stdout,
433 daemon=True,
434 )
435 self._stdout_thread.start()
436 self._ready = False
438 def _drain_stderr(self) -> None:
439 """Background thread: read worker stderr, log it, and surface
440 structured failure signals to the central error_advice system.
442 Specifically: when a worker subprocess dies because of a missing
443 Python package (``ModuleNotFoundError: No module named 'X'``),
444 we forward the event to:
446 1. ``core.error_advice.handle_exception(agent_remediation=True)``
447 — creates a ``goal_type='self_heal'`` AgentGoal so the
448 local coding agent can investigate WHY the package wasn't
449 pre-installed (improves the freeze pip plan over time).
450 2. Nunba's deterministic self-heal (``tts.package_installer
451 ._self_heal_missing_transitives`` if importable) — pip-
452 installs the missing package immediately, so the next
453 worker spawn succeeds without waiting for the agent loop.
455 Why here and not in the worker subprocess: the worker is a
456 sandboxed child with no DB session, no GoalManager, no agent
457 engine. The parent has all of those, plus the ability to
458 kick off a deterministic pip install. Single chokepoint per
459 worker instance, idempotent via ``_self_heal_seen_modules``.
461 See ``core/error_advice.py`` for the full advice fan-out
462 contract. See ``tts/package_installer.py`` for the
463 deterministic-install sibling.
464 """
465 if not self._proc or not self._proc.stderr:
466 return
467 if not hasattr(self, '_self_heal_seen_modules'):
468 self._self_heal_seen_modules = set()
469 try:
470 for line in self._proc.stderr:
471 line = line.rstrip()
472 if line:
473 logger.info(f"[{self.name}] {line}")
474 self._maybe_self_heal_from_line(line)
475 except Exception:
476 pass # pipe closed, process dead
478 def _maybe_self_heal_from_line(self, line: str) -> None:
479 """Pattern-match worker stderr for ``ModuleNotFoundError`` and
480 kick off agentic + deterministic remediation. Idempotent per
481 ``(worker, package)`` so a flood of identical traceback frames
482 produces exactly one heal attempt."""
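# Hedged example of a stderr line that triggers this path (the package
# name is illustrative):
#
#     ModuleNotFoundError: No module named 'parler_tts'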
483 import re
484 match = re.search(
485 r"ModuleNotFoundError: No module named ['\"]([\w.\-]+)['\"]",
486 line,
487 )
488 if not match:
489 return
490 pkg = match.group(1)
491 if pkg in self._self_heal_seen_modules:
492 return
493 self._self_heal_seen_modules.add(pkg)
495 logger.warning(
496 f"{self.name}: subprocess missing Python package '{pkg}' — "
497 f"dispatching to error_advice + deterministic self-heal"
498 )
500 # 1) Agentic side: create a self_heal goal for the coding agent.
501 # Throttled per-fingerprint inside error_advice; safe to call
502 # on every detected event.
503 try:
504 from core.error_advice import handle_exception
505 synthetic = ModuleNotFoundError(f"No module named '{pkg}'")
506 synthetic.name = pkg # type: ignore[attr-defined]
507 handle_exception(
508 synthetic,
509 category='subprocess.tool_load',
510 severity='high',
511 agent_remediation=True,
512 context={
513 'worker_name': self.name,
514 'worker_module': self.module,
515 'missing_package': pkg,
516 'remediation_hint': (
517 f"Add '{pkg}' to the freeze pip plan (Nunba "
518 f"scripts/setup_freeze_nunba.py _tts_deps or "
519 f"the appropriate _<X>_deps tuple) so it's "
520 f"bundled into python-embed/Lib/site-packages "
521 f"on the next build, AND add it to "
522 f"tts/package_installer.py legacy fallback "
523 f"plan for runtime self-heal coverage."
524 ),
525 },
526 )
527 except Exception as e:
528 logger.debug(f"{self.name}: error_advice dispatch skipped: {e}")
530 # 2) Deterministic fast-path: pip-install the missing package
531 # directly so the next worker spawn picks it up without
532 # waiting for the agentic loop. Uses the same
533 # ``self.python_exe`` the worker subprocess used (so the
534 # package lands in the SAME interpreter's site-packages,
535 # e.g. python-embed/Lib/site-packages on a frozen install,
536 # or the dev .venv on a source run). Spawned in a daemon
537 # thread so we don't block the stderr drain loop.
538 def _install_async():
539 try:
540 # `--target` to user-site keeps it consistent with
541 # tts.package_installer's existing pattern: bundled
542 # python-embed is read-only on Program Files installs,
543 # so user-writable site-packages is required. We rely
544 # on the user-site already being on sys.path (set by
545 # platform_paths.ensure_user_site_on_path at boot).
546 target = self._user_site_packages_dir()
547 pip_args = [
548 self.python_exe, '-m', 'pip', 'install',
549 '--no-build-isolation', '--progress-bar', 'off',
550 '--disable-pip-version-check',
551 ]
552 if target:
553 pip_args.extend(['--target', target])
554 pip_args.append(pkg)
555 logger.info(
556 f"{self.name}: deterministic pip install: {pkg} → "
557 f"{target or '<default site>'}"
558 )
559 rc = subprocess.run(
560 pip_args, capture_output=True, text=True, timeout=180,
561 ).returncode
562 if rc == 0:
563 logger.info(
564 f"{self.name}: '{pkg}' installed; respawn worker to "
565 f"retry tool load."
566 )
567 else:
568 logger.warning(
569 f"{self.name}: pip install '{pkg}' rc={rc} — "
570 f"agent self-heal goal will pick up from here."
571 )
572 except Exception as e:
573 logger.debug(
574 f"{self.name}: deterministic pip install for '{pkg}' "
575 f"skipped: {e}"
576 )
578 threading.Thread(
579 target=_install_async, daemon=True,
580 name=f"self-heal-{self.name}-{pkg}",
581 ).start()
583 def _user_site_packages_dir(self) -> Optional[str]:
584 """Return the user-writable site-packages dir for runtime
585 installs, or None if Nunba's platform_paths helper isn't
586 importable (HARTOS-only deploys).
587 """
588 try:
589 from tts.package_installer import get_user_site_packages # type: ignore
590 return get_user_site_packages()
591 except Exception:
592 return None
594 def _drain_stdout(self) -> None:
595 """Background thread: read worker stdout into the line queue.
597 Single reader thread guarantees no interleaved reads. Sentinel
598 value None marks EOF so the main thread knows to stop waiting.
599 """
600 if not self._proc or not self._proc.stdout:
601 return
602 try:
603 for line in self._proc.stdout:
604 self._stdout_queue.put(line.rstrip('\n'))
605 except Exception:
606 pass # pipe closed
607 finally:
608 self._stdout_queue.put(None) # EOF sentinel
610 def _wait_ready(self) -> None:
611 """Block until worker prints READY marker or exits."""
612 deadline = time.monotonic() + self.startup_timeout
613 while time.monotonic() < deadline:
614 if self._proc is None or self._proc.poll() is not None:
615 code = self._proc.returncode if self._proc else -1
616 raise WorkerCrash(f"{self.name}: died during startup (exit={code})")
618 line = self._read_line_with_timeout(0.5)
619 if line is None:
620 continue
621 stripped = line.strip()
622 if stripped == self.READY_MARKER:
623 self._ready = True
624 logger.info(f"{self.name}: worker ready")
625 return
626 # VRAM-measurement marker — emitted by workers BEFORE READY.
627 # Forward the measurement and keep looping for READY.
628 if stripped.startswith(self.VRAM_MARKER_PREFIX):
629 self._handle_vram_marker(stripped)
630 continue
631 # Ignore any other startup chatter
632 logger.debug(f"[{self.name}] startup: {stripped}")
634 # Timeout
635 self._reap(force=True)
636 raise WorkerTimeout(f"{self.name}: startup timeout ({self.startup_timeout}s)")
638 def _handle_vram_marker(self, stripped_line: str) -> None:
639 """Parse a '__WORKER_VRAM_GB__<n>' startup marker and forward the
640 measurement to vram_manager.record_actual_usage.
642 Non-fatal on parse error: older workers don't emit this marker;
643 a failed parse just means we fall back to the declared
644 VRAM_BUDGETS entry.
645 """
646 payload = stripped_line[len(self.VRAM_MARKER_PREFIX):].strip()
647 try:
648 gb = float(payload)
649 except ValueError:
650 logger.debug(f"{self.name}: unparseable VRAM marker '{payload}'")
651 return
652 try:
653 from integrations.service_tools.vram_manager import vram_manager
654 vram_manager.record_actual_usage(self.name, gb)
655 except Exception as e:
656 logger.debug(f"{self.name}: VRAM marker forward failed: {e}")
658 def _write_line(self, line: str) -> None:
659 if not self._proc or not self._proc.stdin:
660 raise WorkerCrash(f"{self.name}: stdin closed")
661 self._proc.stdin.write(line + '\n')
662 self._proc.stdin.flush()
664 def _read_line_with_timeout(self, timeout: float) -> Optional[str]:
665 """Read one line from stdout with a timeout.
667 Reads from the stdout queue populated by `_drain_stdout`. The
668 queue handles the timeout natively and there's only ever one
669 reader thread touching proc.stdout, so no corruption.
671 Returns:
672 The line (without trailing newline), or None if timeout or EOF.
673 """
674 try:
675 line = self._stdout_queue.get(timeout=timeout)
676 except queue.Empty:
677 return None
678 # None is the EOF sentinel from _drain_stdout
679 if line is None:
680 return None
681 return line
683 def _reap(self, force: bool = False) -> None:
684 """Clean up after a crashed/stuck worker."""
685 if not self._proc:
686 return
687 if self._proc.poll() is None:
688 if force:
689 self._proc.kill()
690 else:
691 self._proc.terminate()
692 try:
693 self._proc.wait(timeout=2)
694 except subprocess.TimeoutExpired:
695 self._proc.kill()
696 self._proc = None
697 self._ready = False
700# ═══════════════════════════════════════════════════════════════════
701# Worker-side: helper to run a model in a loop
702# ═══════════════════════════════════════════════════════════════════
704def run_worker(
705 name: str,
706 load: Callable[[], Any],
707 handle: Callable[[Any, Dict[str, Any]], Dict[str, Any]],
708) -> None:
709 """Main loop for a worker subprocess.
711 Args:
712 name: Worker name (for logs written to stderr).
713 load: Called once to load the model. Return anything — it's
714 passed to `handle` as the first arg.
715 handle: Called once per request. Receives (model, request_dict)
716 and must return a JSON-serializable dict.
718 Protocol:
719 1. Worker loads the model.
720 2. Worker writes the READY marker to the private protocol channel.
721 3. For each stdin line (JSON request):
722 - If op == 'shutdown', exit cleanly.
723 - Otherwise call handle(model, request), write JSON response.
724 4. Load failure → exit code 2.
725 5. Recoverable handler error → error JSON on protocol channel, continue.
726 6. stdin EOF → exit code 0.
727 7. Unexpected fatal error → exit code 3.
729 Stdout isolation:
730 Before loading the model, we duplicate fd 1 to a private file
731 handle (_protocol) and redirect fd 1 + sys.stdout to stderr.
732 This means ANY library inside the worker (torch, F5, HF,
733 transformers, CUDA init messages) that writes to stdout is
734 safely rerouted to stderr, where the parent drains it into a
735 log stream. The protocol channel stays 100% clean for JSON.
736 """
737 # ── Stdout isolation (C5 fix) ──────────────────────────────────
738 # Save a private writer pointing at the ORIGINAL fd 1, then
739 # redirect fd 1 and sys.stdout to stderr so no stray prints
740 # corrupt the JSON protocol.
741 _protocol_fd = os.dup(1)
742 _protocol = os.fdopen(_protocol_fd, 'w', buffering=1, encoding='utf-8')
743 try:
744 os.dup2(2, 1) # fd 1 -> fd 2 (stderr) — catches C-level writes too
745 except OSError:
746 pass # unusual platforms / closed stderr
747 sys.stdout = sys.stderr # catches Python-level print()
749 def _emit(obj) -> None:
750 """Write one protocol message (JSON or marker string)."""
751 if isinstance(obj, str):
752 _protocol.write(obj + '\n')
753 else:
754 _protocol.write(json.dumps(obj) + '\n')
755 _protocol.flush()
757 # Route all logging to stderr to keep the protocol channel clean.
758 # Explicit handler setup (not basicConfig) because other modules
759 # imported by the worker may have already called basicConfig, which
760 # would make ours a silent no-op.
761 _root_logger = logging.getLogger()
762 for h in list(_root_logger.handlers):
763 _root_logger.removeHandler(h)
764 _h = logging.StreamHandler(sys.stderr)
765 _h.setFormatter(logging.Formatter(f'[{name}] %(message)s'))
766 _root_logger.addHandler(_h)
767 _root_logger.setLevel(logging.INFO)
768 worker_log = logging.getLogger(f'worker.{name}')
770 # Self-identify: which Python is running this worker, and which
771 # site-packages it can see. Required for #92 — venv-spawn pickup
772 # confirmation. Pre-2026-05-07 trace evidence (older
773 # tts_chatterbox_turbo.err showed C:\miniconda3\Lib\unittest\mock.py
774 # in the stack) suggested the worker was sometimes loaded from the
775 # WRONG Python interpreter (host miniconda) instead of the engine's
776 # quarantine venv (~/Documents/Nunba/data/venvs/<engine>/).
777 # Logging sys.executable + sys.prefix at startup makes this
778 # observable: every TTS engine's worker stderr should now show
779 # the venv path, NOT the main interpreter.
780 try:
781 worker_log.info(
782 f'startup: python={sys.executable} prefix={sys.prefix} '
783 f'site_packages_top3={sys.path[:3]!r}'
784 )
785 except Exception:
786 # Never let a logging blunder block model load
787 pass
789 # ── Phase 1: load model ────────────────────────────────────────
790 try:
791 worker_log.info('loading model...')
792 t0 = time.time()
793 model = load()
794 worker_log.info(f'loaded in {time.time() - t0:.1f}s')
795 except Exception as e:
796 worker_log.exception(f'load failed: {e}')
797 sys.exit(2)
799 # ── Phase 2a: self-report post-load VRAM ──────────────────────
800 # Emitted BEFORE READY so the parent's _wait_ready loop sees it
801 # in-order on the stdout queue. (Placing it after READY races
802 # against the next call(): if 'ready' arrives first and the
803 # marker follows late, the marker ends up being consumed as a
804 # response payload → invalid-JSON error.)
805 #
806 # Best-effort: workers that can't measure (CPU-only, Metal,
807 # torch missing) emit 0.0 and the parent ignores the reading.
808 # Lets us stub conservative budgets (e.g. new OmniVoice at
809 # 3.0 GB) and auto-correct after the first real load.
810 try:
811 import torch as _torch_vram_probe
812 if _torch_vram_probe.cuda.is_available():
813 _measured = _torch_vram_probe.cuda.memory_allocated(0) / (1024 ** 3)
814 else:
815 _measured = 0.0
816 except Exception:
817 _measured = 0.0
818 _emit(f'{GPUWorker.VRAM_MARKER_PREFIX}{_measured:.3f}')
820 # ── Phase 2b: announce ready ───────────────────────────────────
821 _emit(GPUWorker.READY_MARKER)
823 # ── Phase 3: serve requests ────────────────────────────────────
824 try:
825 for line in sys.stdin:
826 line = line.strip()
827 if not line:
828 continue
830 try:
831 req = json.loads(line)
832 except json.JSONDecodeError as e:
833 _emit({'error': f'invalid JSON: {e}'})
834 continue
836 if req.get('op') == 'shutdown':
837 worker_log.info('shutdown requested')
838 sys.exit(0)
840 try:
841 t0 = time.time()
842 result = handle(model, req)
843 if isinstance(result, dict):
844 result.setdefault('latency_ms', round((time.time() - t0) * 1000, 1))
845 else:
846 result = {'result': result}
847 except Exception as e:
848 worker_log.exception('handler failed')
849 result = {'error': f'{type(e).__name__}: {e}'}
851 # Serialize defensively — a non-JSON-serializable field in the
852 # result would otherwise raise inside this try and take down the
853 # worker. Catch that and return a structured error.
854 try:
855 _emit(result)
856 except (TypeError, ValueError) as e:
857 _emit({'error': f'response serialization failed: {e}'})
859 except KeyboardInterrupt:
860 pass
861 except Exception as e:
862 worker_log.exception(f'fatal: {e}')
863 sys.exit(3)
865 sys.exit(0)
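# Hedged sketch of one request/response exchange over the line protocol that
# run_worker() implements (field values illustrative; each message is a single
# JSON line):
#
#     parent -> worker stdin   {"text": "hello", "output_path": "/tmp/o.wav"}
#     worker -> protocol fd    {"path": "/tmp/o.wav", "duration": 1.9, "latency_ms": 812.4}
#     parent -> worker stdin   {"op": "shutdown"}
#     worker exits 0           (load failure would have exited 2; a fatal loop error exits 3)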
868# ═══════════════════════════════════════════════════════════════════
869# Python resolver: prefer python-embed in frozen builds
870# ═══════════════════════════════════════════════════════════════════
872def _resolve_python_exe() -> str:
873 """Return the Python executable to use for workers.
875 Preference:
876 1. $HARTOS_WORKER_PYTHON env var (explicit override)
877 2. python-embed next to the frozen exe (Nunba bundled build)
878 3. sys.executable (dev mode)
879 """
880 override = os.environ.get('HARTOS_WORKER_PYTHON')
881 if override and os.path.isfile(override):
882 return override
884 # Frozen build: python-embed sibling to Nunba.exe
885 if getattr(sys, 'frozen', False):
886 app_dir = os.path.dirname(os.path.abspath(sys.executable))
887 candidate = os.path.join(app_dir, 'python-embed', 'python.exe')
888 if os.path.isfile(candidate):
889 return candidate
891 return sys.executable
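# Hedged example of the explicit interpreter override (path illustrative) —
# set before the first spawn so _resolve_python_exe picks it up:
#
#     os.environ['HARTOS_WORKER_PYTHON'] = r'C:\venvs\tts\Scripts\python.exe'
#     # subsequent GPUWorker spawns now use that interpreter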
894def _resolve_backend_venv_python(tool_name: Optional[str]) -> Optional[str]:
895 """Return the per-backend venv's python.exe if one exists.
897 Thin shim over ``core.venv_paths.venv_python_if_exists`` — kept as
898 a module-local name so the spawn-site call signature is stable.
899 The single source of truth lives in ``core.venv_paths`` and is
900 shared with ``tts.backend_venv`` so install + spawn paths can
901 never drift apart.
902 """
903 from core.venv_paths import venv_python_if_exists
904 return venv_python_if_exists(tool_name)
907# ═══════════════════════════════════════════════════════════════════
908# High-level helper: one-call tool wrapper
909# ═══════════════════════════════════════════════════════════════════
910#
911# Each GPU tool (F5, Chatterbox, CosyVoice, Indic Parler, Whisper, ...)
912# needs the same parent-side boilerplate:
913# - Lazy singleton GPUWorker
914# - Thread-safe get-or-start
915# - Call → catch crash → return transient error
916# - VRAM budget allocation
917# - Output path auto-generation
918# - Uniform result JSON shape
919#
920# `ToolWorker` encapsulates ALL of it. Per-tool modules only specify:
921# - worker module name
922# - VRAM tool name (for budget allocation)
923# - output subdir (for auto-generated paths)
924# - engine display name
925#
926# Single responsibility: GPUWorker = IPC. ToolWorker = tool lifecycle.
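# Hedged example of a per-tool module built on this wrapper. The names below
# ('f5_tts', 'tts_f5', the module path) are illustrative — real tool modules
# define their own:
#
#     _tool = ToolWorker(
#         tool_name='f5_tts',
#         tool_module='integrations.service_tools.f5_tts_tool',
#         vram_budget='tts_f5',
#         output_subdir='f5_tts',
#         engine='f5-tts',
#     )
#
#     def synthesize_speech(text: str, language: str = 'en') -> str:
#         return _tool.synthesize(text=text, language=language)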
928class ToolWorker:
929 """Reusable parent-side wrapper for GPU tools.
931 Holds a lazily-started GPUWorker and exposes a single `synthesize()`
932 method that handles: worker start, crash recovery, VRAM budget,
933 output path generation, and result JSON shaping.
935 One instance per tool, module-level singleton is typical.
936 """
938 # Central entry module — every worker spawns through here
939 _DISPATCHER = 'integrations.service_tools.gpu_worker'
941 def __init__(
942 self,
943 *,
944 tool_name: str,
945 tool_module: Optional[str] = None,
946 vram_budget: str,
947 output_subdir: str,
948 engine: str,
949 variant: Optional[str] = None,
950 startup_timeout: float = 90.0,
951 request_timeout: float = 120.0,
952 idle_timeout: float = 300.0,
953 python_exe: Optional[str] = None,
954 # Back-compat aliases — callers on the old API still work
955 worker_module: Optional[str] = None,
956 worker_args: Optional[list] = None,
957 ):
958 """
959 Args:
960 tool_name: Logical tool name (e.g. 'f5_tts').
961 tool_module: Dotted path to the library module that defines
962 `_load` / `_synthesize` (and variant-suffixed
963 versions). The parent spawns the centralized
964 dispatcher which imports this module in the
965 subprocess. No `__main__` block needed in the
966 tool module itself.
967 vram_budget: Key in VRAM_BUDGETS (e.g. 'tts_f5').
968 output_subdir: Subdir under ~/.hevolve/models for auto outputs.
969 engine: Display name for the result JSON (e.g. 'f5-tts').
970 variant: Optional variant suffix (e.g. 'turbo', 'ml'). When
971 set, the dispatcher picks `_load_<variant>` and
972 `_synthesize_<variant>` instead of the plain names.
973 startup_timeout: Seconds to wait for worker READY handshake.
974 request_timeout: Max seconds for a single request.
975 idle_timeout: Seconds of inactivity after which the worker is
976 auto-stopped to free VRAM. Default 5 min.
977 Set to 0 to disable auto-stop.
978 python_exe: Python interpreter to spawn the worker subprocess
979 under. None (default) = `_resolve_python_exe()`,
980 which picks python-embed when present else
981 sys.executable. Use this to run the worker inside
982 a per-engine venv (e.g. parler-tts needs
983 transformers==4.46.x while main has 5.x; pass
984 the venv's python.exe path here so the dispatch
985 subprocess sees the pinned deps). Read at
986 `_get_or_start` time, so callers may set
987 `tool.python_exe = '...'` after construction
988 before first synth.
990 worker_module / worker_args: DEPRECATED — legacy aliases. If
991 `tool_module` is not given, we fall back to `worker_module`.
992 If `worker_args=['<variant>']` is given, we fall back to
993 that for `variant`. New callers should use `tool_module`
994 + `variant` directly.
995 """
996 # Resolve the library module the worker should import
997 self.tool_module = tool_module or worker_module
998 if self.tool_module is None:
999 raise ValueError(
1000 f'{tool_name}: ToolWorker needs tool_module (library to run)'
1001 )
1003 # Resolve the variant — prefer explicit, fall back to legacy args
1004 if variant is None and worker_args:
1005 variant = worker_args[0] if worker_args else None
1006 self.variant = variant
1008 self.tool_name = tool_name
1009 self.vram_budget = vram_budget
1010 self.output_subdir = output_subdir
1011 self.engine = engine
1012 self.startup_timeout = startup_timeout
1013 self.request_timeout = request_timeout
1014 self.idle_timeout = idle_timeout
1015 self.python_exe = python_exe
1017 self._worker: Optional[GPUWorker] = None
1018 self._lock = threading.Lock()
1019 # Init to NOW so a brand-new worker isn't LRU-evicted immediately.
1020 # The old 0.0 default made fresh workers look like the oldest idle
1021 # in the LRU sort, causing premature eviction before first use.
1022 self._last_used: float = time.monotonic()
1023 self._idle_timer: Optional[threading.Timer] = None
1024 # State change observers. Listeners receive (tool_name, event)
1025 # where event is 'spawned' | 'stopped' | 'crashed'. Fired AFTER
1026 # the state transition is complete so observers can probe
1027 # is_alive() and get the new state.
1028 self._observers: list = []
1030 # Register with the cross-worker registry so the LRU eviction
1031 # policy can see this worker when other tools need VRAM.
1032 _register_tool_worker(self)
1034 # Back-compat shims for properties the tests still read
1035 @property
1036 def worker_module(self) -> str:
1037 """Compat alias — old tests read this."""
1038 return self.tool_module
1040 @property
1041 def worker_args(self) -> list:
1042 """Compat alias — old tests read this as ['<variant>'] or []."""
1043 return [self.variant] if self.variant else []
1045 # ── Observer API ──────────────────────────────────────────────
1047 def add_observer(self, callback: Callable[[str, str], None]) -> None:
1048 """Register a state change listener.
1050 The callback will be invoked with (tool_name, event) on every
1051 state transition: 'spawned' when the worker subprocess becomes
1052 READY, 'stopped' when it's cleanly stopped (user action or
1053 idle auto-stop), 'crashed' when the subprocess dies mid-call.
1055 Called after the transition so listeners can call .is_alive()
1056 and see ground truth. Exceptions in listeners are swallowed —
1057 a broken observer must never take down the worker.
1058 """
1059 self._observers.append(callback)
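# Hedged example of a listener (say, a UI status panel) given a ToolWorker
# instance `tool` — the callback body is illustrative:
#
#     def _on_tool_state(tool_name: str, event: str) -> None:
#         print(f"{tool_name}: {event}")   # 'spawned' | 'stopped' | 'crashed'
#
#     tool.add_observer(_on_tool_state)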
1061 def remove_observer(self, callback: Callable[[str, str], None]) -> None:
1062 """Unregister a previously-added observer (no-op if not present)."""
1063 try:
1064 self._observers.remove(callback)
1065 except ValueError:
1066 pass
1068 def _notify(self, event: str) -> None:
1069 """Fire a state-change event to every registered observer."""
1070 for cb in list(self._observers):
1071 try:
1072 cb(self.tool_name, event)
1073 except Exception as e:
1074 logger.debug(f"observer {cb} failed for {event}: {e}")
1076 # ── Public API ────────────────────────────────────────────────
1078 def call(self, request: Dict[str, Any]) -> Dict[str, Any]:
1079 """Send a request to the worker. Handles start, crash, timeout.
1081 Returns the worker's raw response dict (with an 'error' key on
1082 transient failure). Does NOT shape the result — callers that
1083 want uniform JSON output should use `synthesize()` instead.
1084 """
1085 # VRAM allocation moved INSIDE _get_or_start (after the _worker is
1086 # None check) to prevent double-counting when two concurrent call()
1087 # invocations both allocate before either enters the lock. See T138.
1088 try:
1089 worker = self._get_or_start()
1090 except (WorkerCrash, WorkerTimeout, WorkerError) as e:
1091 return {'error': f'{self.tool_name} worker startup failed: {e}'}
1093 try:
1094 result = worker.call(request)
1095 except (WorkerCrash, WorkerTimeout) as e:
1096 # Subprocess died. Worker will respawn on next call.
1097 logger.warning(f"{self.tool_name}: worker crash: {e}")
1098 self._notify('crashed')
1099 return {'error': f'{self.tool_name} crashed: {e}', 'transient': True}
1100 except WorkerError as e:
1101 return {'error': f'{self.tool_name} worker error: {e}'}
1103 # Request succeeded — reset idle timer
1104 self._touch_idle()
1105 return result
1107 def synthesize(
1108 self,
1109 *,
1110 text: str,
1111 language: str = 'en',
1112 voice: Optional[str] = None,
1113 output_path: Optional[str] = None,
1114 default_sample_rate: int = 24000,
1115 extra_request: Optional[Dict[str, Any]] = None,
1116 ) -> str:
1117 """Complete TTS synthesis with uniform JSON output.
1119 Handles all the per-tool boilerplate (text validation, output
1120 path generation, result shaping) so the tool module is ~10 lines.
1122 Returns a JSON string matching the original in-process tool
1123 output shape for drop-in compatibility.
1124 """
1125 if not text or not text.strip():
1126 return json.dumps({'error': 'Text is required'})
1128 if output_path is None:
1129 output_path = str(
1130 self._get_output_dir() / f'{self.tool_name}_{int(time.time() * 1000)}.wav'
1131 )
1133 request = {
1134 'text': text,
1135 'language': language,
1136 'voice': voice,
1137 'output_path': output_path,
1138 }
1139 if extra_request:
1140 request.update(extra_request)
1142 t0 = time.time()
1143 result = self.call(request)
1144 elapsed = time.time() - t0
1146 if 'error' in result:
1147 return json.dumps(result)
1149 duration = result.get('duration', 0)
1150 return json.dumps({
1151 'path': result.get('path', output_path),
1152 'duration': duration,
1153 'engine': result.get('engine', self.engine),
1154 'device': result.get('device', 'cuda'),
1155 'sample_rate': result.get('sample_rate', default_sample_rate),
1156 'voice': voice or 'default',
1157 'language': language,
1158 'latency_ms': round(elapsed * 1000, 1),
1159 'rtf': round(elapsed / duration, 4) if duration > 0 else 0,
1160 })
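# Hedged example of the JSON string synthesize() returns on success (values
# illustrative):
#
#     {"path": ".../f5_tts_1715486400000.wav", "duration": 2.1,
#      "engine": "f5-tts", "device": "cuda", "sample_rate": 24000,
#      "voice": "default", "language": "en", "latency_ms": 842.3, "rtf": 0.4011}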
1162 def is_alive(self) -> bool:
1163 """True if the worker subprocess is running and READY."""
1164 w = self._worker
1165 return w is not None and w.is_alive()
1167 def set_idle_timeout(self, seconds: float) -> None:
1168 """Update the idle auto-stop threshold.
1170 Called by the model loader when the catalog entry's
1171 idle_timeout_s changes, so admin-UI edits take effect without
1172 restarting the worker. If a worker is currently running and
1173 the timeout was previously disabled, arms the timer now.
1174 """
1175 self.idle_timeout = max(0.0, float(seconds))
1176 if self.idle_timeout > 0 and self.is_alive():
1177 # Re-arm timer with the new deadline
1178 self._touch_idle()
1179 elif self.idle_timeout <= 0 and self._idle_timer is not None:
1180 with self._lock:
1181 self._idle_timer.cancel()
1182 self._idle_timer = None
1184 def stop(self) -> None:
1185 """Stop the worker and release VRAM."""
1186 was_running = False
1187 with self._lock:
1188 if self._idle_timer is not None:
1189 self._idle_timer.cancel()
1190 self._idle_timer = None
1191 if self._worker is not None:
1192 was_running = True
1193 try:
1194 self._worker.stop()
1195 except Exception:
1196 pass
1197 self._worker = None
1198 self._release_vram()
1199 logger.info(f"{self.tool_name}: worker stopped")
1200 if was_running:
1201 self._notify('stopped')
1203 # ── Idle timeout ─────────────────────────────────────────────
1205 def _touch_idle(self) -> None:
1206 """Record that the worker was just used; (re)arm the idle timer."""
1207 if self.idle_timeout <= 0:
1208 return
1209 with self._lock:
1210 self._last_used = time.monotonic()
1211 if self._idle_timer is not None:
1212 self._idle_timer.cancel()
1213 self._idle_timer = threading.Timer(self.idle_timeout, self._on_idle)
1214 self._idle_timer.daemon = True
1215 self._idle_timer.start()
1217 def _on_idle(self) -> None:
1218 """Fired when idle_timeout elapses with no requests.
1220 TOCTOU fix (T138): the elapsed-time check AND the stop decision
1221 both run INSIDE the lock. The old code checked elapsed under the
1222 lock, then called stop() OUTSIDE it — a concurrent call() could
1223 update _last_used between the check and the stop, killing a
1224 worker that was actively serving a request.
1225 """
1226 if self.idle_timeout <= 0:
1227 return
1228 with self._lock:
1229 elapsed = time.monotonic() - self._last_used
1230 if elapsed + 0.5 < self.idle_timeout:
1231 return # someone used it recently, skip
1232 # Stop INSIDE the lock so no concurrent call() can slip in
1233 # between the elapsed check and the actual termination.
1234 if self._worker is not None and self._worker.is_alive():
1235 logger.info(
1236 f"{self.tool_name}: idle for {self.idle_timeout:.0f}s, "
1237 f"stopping worker to free VRAM"
1238 )
1239 try:
1240 self._worker.stop()
1241 except Exception:
1242 pass
1243 self._worker = None
1244 self._release_vram()
1245 # Notify OUTSIDE the lock (observers may call back into us)
1246 self._notify('stopped')
1248 # ── Internals ─────────────────────────────────────────────────
1250 def _get_or_start(self) -> GPUWorker:
1251 """Lazy singleton: start the worker on first call or after a crash.
1253 Spawns the centralized dispatcher (`gpu_worker` module) with the
1254 target tool module (+ optional variant) as CLI args. Tool modules
1255 do NOT need their own `if __name__ == '__main__':` blocks — the
1256 dispatcher imports them and picks up `_load` / `_synthesize`.
1257 """
1258 spawned = False
1259 with self._lock:
1260 if self._worker is None or not self._worker.is_alive():
1261 # Allocate VRAM INSIDE the lock so concurrent call()
1262 # invocations don't double-count. T138 fix (c).
1263 # If allocate() returns False (GPU full), try evicting
1264 # LRU workers first, then retry. If still False after
1265 # eviction, the spawn will proceed but on CPU fallback
1266 # path (subprocess self-detects VRAM at _load time).
1267 allocated = self._allocate_vram()
1269 # Cross-worker VRAM eviction: if our budget won't fit
1270 # in current free VRAM, stop LRU other workers to make
1271 # room BEFORE spawning. Prevents a predictable OOM in
1272 # the new subprocess while older idle workers hold
1273 # memory they're not actively using.
1274 self._ensure_vram_headroom()
1276 # Retry allocate after eviction so the budget is tracked.
1277 if not allocated:
1278 self._allocate_vram()
1280 cli_args = [self.tool_module]
1281 if self.variant:
1282 cli_args.append(self.variant)
1283 # Resolve the spawn interpreter at start time, not at
1284 # ToolWorker.__init__ time: the per-backend venv is
1285 # typically created lazily on first install, AFTER the
1286 # ToolWorker singleton has been constructed. Order:
1287 # 1. Explicit self.python_exe (test override / caller-set)
1288 # 2. Per-backend venv at <data>/venvs/<tool_name>/ ← venv-installed deps
1289 # 3. GPUWorker default = _resolve_python_exe() (python-embed)
1290 spawn_python = (
1291 self.python_exe
1292 or _resolve_backend_venv_python(self.tool_name)
1293 )
1294 self._worker = GPUWorker(
1295 name=self.tool_name,
1296 module=self._DISPATCHER,
1297 startup_timeout=self.startup_timeout,
1298 request_timeout=self.request_timeout,
1299 python_exe=spawn_python,
1300 args=cli_args,
1301 )
1302 self._worker.start()
1303 spawned = True
1304 worker = self._worker
1305 if spawned:
1306 # Fire observer event OUTSIDE the lock so callbacks can
1307 # safely call back into this ToolWorker without deadlocking.
1308 self._notify('spawned')
1309 return worker
1311 def _ensure_vram_headroom(self) -> None:
1312 """Evict LRU workers if VRAM is too tight for this tool to fit.
1314 Looks up this tool's declared VRAM budget from vram_manager.
1315 If current free VRAM is below that budget, calls try_free_vram
1316 to stop other workers (oldest-idle first). Silent no-op when
1317 the tool has no registered budget or vram_manager is unavailable.
1318 """
1319 try:
1320 from integrations.service_tools.vram_manager import (
1321 vram_manager, VRAM_BUDGETS,
1322 )
1323 except ImportError:
1324 return
1325 budget = VRAM_BUDGETS.get(self.vram_budget)
1326 if not budget:
1327 return
1328 _min_vram, model_gb = budget
1329 free_gb = vram_manager.get_free_vram()
1330 if free_gb >= model_gb:
1331 return
1332 try_free_vram(needed_gb=model_gb, exclude_tool=self.tool_name)
1334 def _get_output_dir(self) -> Path:
1335 d = Path(os.environ.get(
1336 'HEVOLVE_MODEL_DIR',
1337 os.path.expanduser('~/.hevolve/models'),
1338 )) / self.output_subdir
1339 d.mkdir(parents=True, exist_ok=True)
1340 return d
1342 def _allocate_vram(self) -> bool:
1343 """Attempt to reserve VRAM. Returns True on success, False if
1344 the allocation was refused (caller may fall back to CPU).
1346 This honors the vram_manager.allocate contract: False means
1347 'won't fit', not 'error'. Exceptions during import/allocation
1348 are treated as 'unknown state -> allow', matching prior
1349 default-allow behavior while still surfacing real refusals.
1350 """
1351 try:
1352 from integrations.service_tools.vram_manager import get_vram_manager
1353 return bool(get_vram_manager().allocate(self.vram_budget))
1354 except ImportError:
1355 return True
1356 except Exception:
1357 return True
1359 def _release_vram(self) -> None:
1360 try:
1361 from integrations.service_tools.vram_manager import get_vram_manager
1362 get_vram_manager().release(self.vram_budget)
1363 except Exception:
1364 pass
1367# ═══════════════════════════════════════════════════════════════════
1368# Centralized worker entry point
1369# ═══════════════════════════════════════════════════════════════════
1370#
1371# ONE `if __name__ == '__main__':` block for ALL GPU workers. When the
1372# parent spawns `python -m integrations.service_tools.gpu_worker
1373# <tool_module> [variant]`, this dispatcher:
1374#
1375# 1. Dynamically imports the tool module.
1376# 2. Picks the right load/handle callbacks via convention:
1377# - no variant → module._load, module._synthesize
1378# - variant='turbo' → module._load_turbo, module._synthesize_turbo
1379# - variant='<v>' → module._load_<v>, module._synthesize_<v>
1380# (Fallback: if the variant-specific name doesn't exist, use the
1381# plain _load / _synthesize.)
1382# 3. Calls run_worker(...) — the infinite request loop.
1383#
1384# Tool modules do NOT need their own `if __name__ == '__main__':` block.
1385# They just define `_load` and `_synthesize` (plus variants if needed).
1386# This keeps the "entry point" concern in exactly one place.
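# Hedged sketch of a tool module the dispatcher can drive, including a 'turbo'
# variant. The module path, library import, and method names are illustrative:
#
#     # integrations/service_tools/example_tts_tool.py  (hypothetical)
#     def _load():
#         from example_tts import ExampleTTS      # assumed dependency
#         return ExampleTTS()
#
#     def _synthesize(model, req):
#         model.tts_to_file(req['text'], req['output_path'])
#         return {'path': req['output_path'], 'sample_rate': 24000}
#
#     def _load_turbo():
#         from example_tts import ExampleTTS
#         return ExampleTTS(profile='turbo')
#     # no _synthesize_turbo needed — the dispatcher falls back to _synthesize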
1388def _dispatch_and_run(tool_module_name: str, variant: Optional[str] = None) -> None:
1389 """Import a tool module and run its worker loop.
1391 Picks `_load_<variant>` + `_synthesize_<variant>` when variant is
1392 set, else `_load` + `_synthesize`. Worker name is the module's
1393 last segment plus the variant suffix.
1394 """
1395 import importlib
1397 try:
1398 mod = importlib.import_module(tool_module_name)
1399 except Exception as e:
1400 # Log to stderr and exit non-zero so parent sees WorkerCrash
1401 print(f'[gpu_worker] import failed for {tool_module_name}: {e}', file=sys.stderr)
1402 sys.exit(2)
1404 suffix = f'_{variant}' if variant else ''
1405 load_name = f'_load{suffix}'
1406 handle_name = f'_synthesize{suffix}'
1408 load = getattr(mod, load_name, None) or getattr(mod, '_load', None)
1409 handle = getattr(mod, handle_name, None) or getattr(mod, '_synthesize', None)
1411 if load is None or handle is None:
1412 missing = []
1413 if load is None:
1414 missing.append(load_name if variant else '_load')
1415 if handle is None:
1416 missing.append(handle_name if variant else '_synthesize')
1417 print(
1418 f'[gpu_worker] {tool_module_name} missing callbacks: {missing}',
1419 file=sys.stderr,
1420 )
1421 sys.exit(2)
1423 base_name = tool_module_name.rsplit('.', 1)[-1]
1424 worker_name = f'{base_name}{suffix}' if variant else base_name
1425 run_worker(name=worker_name, load=load, handle=handle)
1428# ── #58 Scope-2: reflection dispatch via catalog id ────────────────
1429#
1430# The Python-tool path (above) covers every code-shipped engine —
1431# each *_tool.py defines `_load[_<variant>]` + `_synthesize[_<variant>]`
1432# by convention. Adding a NEW engine that fits a homogeneous load+
1433# synth API used to require writing a whole new *_tool.py just to wrap
1434# the import + method call. This was friction for engines whose synth
1435# API is reflection-friendly (Kokoro, Pocket-TTS, etc.).
1436#
1437# Reflection path: a catalog entry with no `tool_module` but the full
1438# 5-field contract (`import_path`, `init_args`, `synth_method`,
1439# `params_map`, `output_format`) is dispatchable via the catalog
1440# alone. The 5-field contract lives in `tts_router._REFLECTION_FIELDS`
1441# and the canonical output formats in `tts_router._OUTPUT_FORMATS`.
1442# Validation fires at catalog-ingest (single source of truth in
1443# `tts_router._validate_engine_caps`); the dispatcher trusts what
1444# made it past the gate.
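# Hedged example of a reflection-only catalog entry carrying the 5-field
# contract. Every value below is illustrative, not a shipped engine:
#
#     {
#         "import_path": "kokoro_tts:KokoroPipeline",
#         "init_args": {"device": "cuda"},
#         "synth_method": "synthesize",
#         "params_map": {"text": "text", "voice": "voice"},
#         "output_format": "numpy_24k"
#     }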
1446def _normalize_to_wav_file(
1447 raw: Any,
1448 output_format: str,
1449 output_path: str,
1450 sample_rate: int = 24000,
1451) -> tuple:
1452 """Normalize a reflection-dispatched engine's raw output to a WAV
1453 file on disk. Returns ``(output_path, duration_seconds)``.
1455 Canonical ``output_format`` values mirror
1456 ``tts_router._OUTPUT_FORMATS``:
1458 * ``'wav_bytes'`` — bytes object holding a WAV byte stream;
1459 written verbatim.
1460 * ``'numpy_24k'`` — 1-D float32 numpy array @ 24 kHz mono;
1461 written via scipy.io.wavfile at 24000 Hz regardless of the
1462 caller's ``sample_rate`` arg (the contract pins the rate).
1463 * ``'file_path'`` — str path the engine already wrote to;
1464 copied to ``output_path`` if different.
1465 * ``'bytesio'`` — io.BytesIO containing wav bytes; written
1466 verbatim.
1468 Raises TypeError on shape mismatch (engine declared one format
1469 but returned another) and ValueError on unknown ``output_format``
1470 so the caller can surface a precise wire error rather than a
1471 silent miswrite.
1472 """
1473 import shutil
1475 if output_format == 'wav_bytes':
1476 if not isinstance(raw, (bytes, bytearray)):
1477 raise TypeError(
1478 f"output_format='wav_bytes' but raw is "
1479 f"{type(raw).__name__}"
1480 )
1481 with open(output_path, 'wb') as fh:
1482 fh.write(raw)
1483 elif output_format == 'bytesio':
1484 import io as _io
1485 if not isinstance(raw, _io.BytesIO):
1486 raise TypeError(
1487 f"output_format='bytesio' but raw is "
1488 f"{type(raw).__name__}"
1489 )
1490 with open(output_path, 'wb') as fh:
1491 fh.write(raw.getvalue())
1492 elif output_format == 'file_path':
1493 if not isinstance(raw, str) or not raw:
1494 raise TypeError(
1495 f"output_format='file_path' but raw is "
1496 f"{type(raw).__name__!r}"
1497 )
1498 if raw != output_path:
1499 shutil.copyfile(raw, output_path)
1500 elif output_format == 'numpy_24k':
1501 try:
1502 import scipy.io.wavfile # type: ignore
1503 except ImportError as e:
1504 raise RuntimeError(
1505 f"output_format='numpy_24k' requires scipy: {e}"
1506 )
1507 if not hasattr(raw, 'dtype'):
1508 raise TypeError(
1509 f"output_format='numpy_24k' but raw is "
1510 f"{type(raw).__name__}"
1511 )
1512 # Pin to 24 kHz per contract; cast to float32 (canonical type).
1513 scipy.io.wavfile.write(output_path, 24000, raw.astype('float32'))
1514 else:
1515 raise ValueError(
1516 f"unknown output_format {output_format!r}; canonical set is "
1517 f"wav_bytes / numpy_24k / file_path / bytesio"
1518 )
1520 # Duration: try wave header (PCM only — stdlib's wave module
1521 # rejects float WAV with `wave.Error: unknown format: 3`). scipy
1522 # writes float32 for numpy_24k, so fall back to scipy.io.wavfile
1523 # for that format. Either path lands on a real number; only when
1524 # both fail does duration default to 0.0.
1525 duration = 0.0
1526 try:
1527 import wave
1528 with wave.open(output_path, 'rb') as wf:
1529 n_frames = wf.getnframes()
1530 sr = wf.getframerate()
1531 duration = n_frames / sr if sr > 0 else 0.0
1532 except Exception:
1533 try:
1534 import scipy.io.wavfile # type: ignore
1535 sr, data = scipy.io.wavfile.read(output_path)
1536 duration = len(data) / sr if sr > 0 else 0.0
1537 except Exception:
1538 duration = 0.0
1540 return output_path, duration
1543def _build_reflection_callbacks(
1544 catalog_id: str,
1545 entry_capabilities: Dict[str, Any],
1546 output_dir: Optional[str] = None,
1547) -> tuple:
1548 """Build ``(load, handle)`` callbacks for a reflection-only catalog
1549 entry. No on-disk *_tool.py needed.
1551 Validation runs against `tts_router._validate_engine_caps` first;
1552 raises RuntimeError on any contract violation so the caller can
1553 exit cleanly via the shared error path (printing to stderr +
1554 sys.exit(2) — same shape as `_dispatch_and_run`'s missing-callbacks
1555 path).
1557 The 5-field contract:
1559 * ``import_path`` — ``'pkg.module:ClassName'``
1560 * ``init_args`` — ``{}`` kwargs for ``ClassName(**init_args)``
1561 * ``synth_method`` — instance method name on the loaded model
1562 * ``params_map`` — ``{payload_key → method_kwarg}`` translation
1563 * ``output_format`` — one of `tts_router._OUTPUT_FORMATS`
1565 The handler accepts the same wire payload existing TTS workers do —
1566 ``request['text']`` is required; optional ``output_path``,
1567 ``sample_rate``, plus any keys named in ``params_map``. Returns
1568 the same shape (``{path, duration, sample_rate, engine}``) so any
1569 caller routing through the reflection path is wire-compatible with
1570 the on-disk *_tool.py path.
1571 """
1572 from integrations.channels.media.tts_router import _validate_engine_caps
1573 err = _validate_engine_caps(entry_capabilities)
1574 if err is not None:
1575 raise RuntimeError(f'catalog entry {catalog_id!r}: {err}')
1577 import_path = entry_capabilities['import_path']
1578 init_args = entry_capabilities.get('init_args') or {}
1579 synth_method = entry_capabilities['synth_method']
1580 params_map = entry_capabilities.get('params_map') or {}
1581 output_format = entry_capabilities['output_format']
1583 if ':' not in import_path:
1584 raise RuntimeError(
1585 f"catalog entry {catalog_id!r}: import_path must be "
1586 f"'pkg.module:ClassName', got {import_path!r}"
1587 )
1588 mod_name, cls_name = import_path.split(':', 1)
1590 # Output dir resolution — mirrors pocket_tts_tool's fallback shape.
1591 if output_dir is None:
1592 try:
1593 from core.platform_paths import get_data_dir # type: ignore
1594 output_dir = os.path.join(
1595 get_data_dir(), 'tts_outputs', 'reflection'
1596 )
1597 except Exception:
1598 output_dir = os.path.join(
1599 os.path.expanduser('~'), '.hevolve', 'models',
1600 'reflection_outputs',
1601 )
1602 try:
1603 os.makedirs(output_dir, exist_ok=True)
1604 except OSError:
1605 # ENOSPC / read-only home — fall back to tempdir so the worker
1606 # can still respond (a synth that can't write its WAV will fail
1607 # later with a clearer ENOSPC error than os.makedirs would).
1608 import tempfile as _tempfile
1609 output_dir = _tempfile.gettempdir()
1611 def _load_reflection() -> Any:
1612 import importlib
1613 mod = importlib.import_module(mod_name)
1614 cls = getattr(mod, cls_name, None)
1615 if cls is None:
1616 raise RuntimeError(
1617 f'catalog entry {catalog_id!r}: class {cls_name!r} not '
1618 f'found in module {mod_name!r}'
1619 )
1620 return cls(**init_args)
1622 def _handle_reflection(model: Any, request: Dict[str, Any]) -> Dict[str, Any]:
1623 text = request.get('text', '')
1624 if not text or not isinstance(text, str):
1625 return {'error': 'text is required'}
1627 # Translate request → method kwargs via params_map. Keys NOT
1628 # named in params_map are ignored (engines opt into what they
1629 # see). Plus 'text' under whatever name the engine declared.
1630 kwargs: Dict[str, Any] = {}
1631 for payload_key, method_arg in params_map.items():
1632 if payload_key in request:
1633 kwargs[method_arg] = request[payload_key]
1634 # If the engine didn't declare 'text' in params_map, default to
1635 # passing it as 'text' — most reflection-friendly engines name
1636 # the arg that exact way.
1637 if 'text' not in params_map:
1638 kwargs.setdefault('text', text)
1640 method = getattr(model, synth_method, None)
1641 if method is None or not callable(method):
1642 return {'error': f'method {synth_method!r} not found on model'}
1644 # Output path: respect request override; else hash-derived under
1645 # the resolved output_dir.
1646 out_path = request.get('output_path')
1647 if not out_path:
1648 import hashlib as _h
1649 digest = _h.md5(
1650 f"{text[:50]}:{catalog_id}".encode()
1651 ).hexdigest()[:12]
1652 out_path = os.path.join(
1653 output_dir, f'{catalog_id}_{digest}.wav'
1654 )
1656 try:
1657 raw = method(**kwargs)
1658 path, duration = _normalize_to_wav_file(
1659 raw, output_format, out_path,
1660 sample_rate=request.get('sample_rate', 24000),
1661 )
1662 except Exception as e:
1663 import traceback as _tb
1664 return {
1665 'error': f'{type(e).__name__}: {e}',
1666 'traceback': _tb.format_exc()[-2000:],
1667 }
1669 return {
1670 'path': path,
1671 'duration': round(duration, 3),
1672 'sample_rate': request.get('sample_rate', 24000),
1673 'engine': f'reflection:{catalog_id}',
1674 }
1676 return _load_reflection, _handle_reflection
1679def _dispatch_catalog_id(catalog_id: str) -> None:
1680 """Run a worker loop for a reflection-only catalog entry.
1682 Spawned by the parent process via:
1683 python -m integrations.service_tools.gpu_worker --catalog-id <id>
1685 Exit codes mirror `_dispatch_and_run`:
1686 2 — diagnostic on stderr (catalog unreachable, entry not found,
1687 validation failed); parent sees WorkerCrash on next call.
1688 """
1689 try:
1690 from integrations.service_tools.model_catalog import get_catalog
1691 except Exception as e:
1692 print(
1693 f'[gpu_worker] cannot import model_catalog: {e}',
1694 file=sys.stderr,
1695 )
1696 sys.exit(2)
1698 try:
1699 catalog = get_catalog()
1700 entry = catalog.get(catalog_id)
1701 except Exception as e:
1702 print(
1703 f'[gpu_worker] catalog load failed for {catalog_id!r}: {e}',
1704 file=sys.stderr,
1705 )
1706 sys.exit(2)
1708 if entry is None:
1709 print(
1710 f'[gpu_worker] catalog has no entry {catalog_id!r}',
1711 file=sys.stderr,
1712 )
1713 sys.exit(2)
1715 try:
1716 load, handle = _build_reflection_callbacks(
1717 catalog_id, entry.capabilities or {},
1718 )
1719 except RuntimeError as e:
1720 print(f'[gpu_worker] {e}', file=sys.stderr)
1721 sys.exit(2)
1723 run_worker(name=f'reflection.{catalog_id}', load=load, handle=handle)
1726if __name__ == '__main__':
1727 # Usage:
1728 # python -m integrations.service_tools.gpu_worker <module> [variant]
1729 # python -m integrations.service_tools.gpu_worker --catalog-id <id>
1730 if len(sys.argv) >= 3 and sys.argv[1] == '--catalog-id':
1731 _dispatch_catalog_id(sys.argv[2])
1732 elif len(sys.argv) >= 2 and not sys.argv[1].startswith('--'):
1733 _tool_module = sys.argv[1]
1734 _variant = sys.argv[2] if len(sys.argv) > 2 else None
1735 _dispatch_and_run(_tool_module, _variant)
1736 else:
1737 print(
1738 'usage: python -m integrations.service_tools.gpu_worker '
1739 '<tool.module.path> [variant]\n'
1740 ' or: python -m integrations.service_tools.gpu_worker '
1741 '--catalog-id <id>',
1742 file=sys.stderr,
1743 )
1744 sys.exit(2)