Coverage for integrations / service_tools / runtime_manager.py: 56.2%
276 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Runtime Tool Manager — orchestrates the full lifecycle of media tools.
4Manages: detect → download → start → register → stop → unload
5Persists state to ~/.hevolve/tool_state.json so restarts skip completed setup.
7All sidecar servers use dynamic port allocation (no fixed ports).
8Whisper runs in-process (no sidecar).
9"""
11import atexit
12import json
13import logging
14import os
15import subprocess
16import sys
17import time
18from pathlib import Path
19from threading import Lock, Thread
20from typing import Dict, Optional
22from .model_storage import ModelStorageManager, model_storage
23from .vram_manager import VRAMManager, vram_manager
24from .registry import service_tool_registry
logger = logging.getLogger(__name__)

# Persisted lifecycle state (which tools are downloaded / were running).
STATE_FILE = Path.home() / '.hevolve' / 'tool_state.json'
# Directory containing the bundled sidecar server scripts.
SERVERS_DIR = os.path.join(os.path.dirname(__file__), 'servers')

# Tool configuration: name → {repo_url, server_script, hf_repo_id, is_inprocess, catalog_id}
# catalog_id links RTM tools to ModelCatalog entries so the orchestrator stays in sync.
# None = no catalog entry (tool is a wrapper or resolved dynamically).
TOOL_CONFIGS = {
    # Video generation
    'wan2gp': {
        'repo_url': 'https://github.com/deepbeepmeep/Wan2GP',
        'server_script': os.path.join(SERVERS_DIR, 'wan2gp_server.py'),
        'download_type': 'git',
        'catalog_id': 'video_gen-wan2gp',
    },
    'ltx2': {
        # Fix: dropped the dead `if SERVERS_DIR else None` guard —
        # SERVERS_DIR is the result of os.path.join and is always a
        # non-empty (truthy) string, so the None branch was unreachable
        # and inconsistent with every other entry.
        'server_script': os.path.join(SERVERS_DIR, 'ltx2_server.py'),
        'hf_repo_id': 'Lightricks/LTX-Video',
        'download_type': 'hf',
        'catalog_id': 'video_gen-ltx2',
    },
    # Music / singing
    'acestep': {
        'repo_url': 'https://github.com/ace-step/ACE-Step-1.5',
        'download_type': 'git',
        # NOTE(review): the fixed port 8001 here contradicts the module
        # docstring's "no fixed ports" policy — confirm whether
        # acestep-api supports dynamic port allocation before changing.
        'run_command': ['uv', 'run', 'acestep-api', '--port', '8001'],
        'catalog_id': 'audio_gen-acestep',
    },
    'diffrhythm': {
        'hf_repo_id': 'DiffRhythm/diffrhythm-v1',
        'download_type': 'hf',
        'catalog_id': 'audio_gen-diffrhythm',
    },
    # TTS audio suite (multiple engines)
    'tts_audio_suite': {
        'repo_url': 'https://github.com/diodiogod/TTS-Audio-Suite',
        'server_script': os.path.join(SERVERS_DIR, 'tts_audio_suite_server.py'),
        'download_type': 'git',
        'catalog_id': None,
    },
    # STT
    'whisper': {
        'hf_repo_id': 'openai/whisper-base',
        'download_type': 'hf',
        'is_inprocess': True,  # loaded inside this process — no sidecar server
        'catalog_id': None,
    },
    # Vision
    'minicpm': {
        'hf_repo_id': 'openbmb/MiniCPM-V-2_6',
        'download_type': 'hf',
        'catalog_id': None,
    },
}
class RuntimeToolManager:
    """Central orchestrator for runtime media tool lifecycle.

    Coordinates four collaborators:
      * ModelStorageManager  — downloads git repos / HF weights to disk
      * VRAMManager          — budget checks and offload-mode hints
      * service_tool_registry — exposes running tools to agent frameworks
      * model_orchestrator catalog — the authority on "what is loaded"

    Sidecar servers are spawned as subprocesses and report a dynamically
    allocated port on stdout (``PORT=NNNNN``); whisper runs in-process.
    Lifecycle state is persisted to STATE_FILE so restarts can resume.
    """

    def __init__(self, storage: Optional[ModelStorageManager] = None,
                 vram: Optional[VRAMManager] = None):
        """Create a manager, defaulting to the module-level singletons."""
        self.storage = storage or model_storage
        self.vram = vram or vram_manager
        self._processes: Dict[str, subprocess.Popen] = {}
        self._ports: Dict[str, int] = {}
        self._lock = Lock()  # guards _processes / _ports mutation
        # Lifecycle hooks — ModelLifecycleManager subscribes to these
        self._lifecycle_hooks = {
            'on_tool_started': [],
            'on_tool_stopped': [],
        }

    def register_lifecycle_hook(self, event: str, callback) -> None:
        """Register a lifecycle event callback. Non-breaking addition.

        Unknown event names are silently ignored — only
        'on_tool_started' and 'on_tool_stopped' are supported.
        """
        if event in self._lifecycle_hooks:
            self._lifecycle_hooks[event].append(callback)

    def _notify_hooks(self, event: str, tool_name: str, **kwargs) -> None:
        """Fire all registered hooks for an event.

        Hook exceptions are swallowed (debug-logged) so a misbehaving
        subscriber cannot break the tool lifecycle.
        """
        for cb in self._lifecycle_hooks.get(event, []):
            try:
                cb(tool_name, **kwargs)
            except Exception as e:
                logger.debug(f"Lifecycle hook error ({event}, {tool_name}): {e}")

    # ── Tool lifecycle ───────────────────────────────────────────

    def setup_tool(self, tool_name: str) -> Dict:
        """Download + start + register a tool. Idempotent.

        Returns status dict with keys: downloaded, running, port,
        offload_mode — or 'error' on failure.
        """
        config = TOOL_CONFIGS.get(tool_name)
        if not config:
            return {'error': f'Unknown tool: {tool_name}'}

        result = {'tool': tool_name}

        # Step 1: Download if needed
        if not self.storage.is_downloaded(tool_name):
            dl_type = config.get('download_type', 'git')
            if dl_type == 'git':
                path = self.storage.clone_repo(tool_name, config['repo_url'])
            elif dl_type == 'hf':
                path = self.storage.download_hf_model(
                    tool_name, config['hf_repo_id'])
            else:
                return {'error': f'Unknown download_type: {dl_type}'}

            if path is None:
                return {'error': f'Download failed for {tool_name}'}

        # Fix: report 'downloaded' even when the download was skipped
        # because the tool was already on disk (the key was previously
        # missing in that case, which made the docstring's contract lie).
        result['downloaded'] = True

        # Step 2: Check VRAM and decide offload mode
        offload = self.vram.suggest_offload_mode(tool_name)
        result['offload_mode'] = offload

        # Step 3: Start server (or load in-process)
        if config.get('is_inprocess'):
            start_result = self._start_inprocess(tool_name, config)
        else:
            start_result = self._start_sidecar(tool_name, config, offload)

        result.update(start_result)

        # Step 4: Save state
        self.save_state()

        return result

    def start_tool(self, tool_name: str) -> Dict:
        """Start a tool that's already downloaded.

        Unlike setup_tool(), never downloads — returns an error dict if
        the tool is missing on disk or unknown.
        """
        if not self.storage.is_downloaded(tool_name):
            return {'error': f'{tool_name} not downloaded. Use setup_tool() first.'}

        config = TOOL_CONFIGS.get(tool_name)
        if not config:
            return {'error': f'Unknown tool: {tool_name}'}

        offload = self.vram.suggest_offload_mode(tool_name)

        if config.get('is_inprocess'):
            result = self._start_inprocess(tool_name, config)
        else:
            result = self._start_sidecar(tool_name, config, offload)

        self.save_state()
        return result

    def stop_tool(self, tool_name: str) -> Dict:
        """Stop a tool's server (or in-process model) and free VRAM."""
        config = TOOL_CONFIGS.get(tool_name)
        if config and config.get('is_inprocess'):
            result = self._stop_inprocess(tool_name)
            self._unsync_catalog(tool_name)
            self._notify_hooks('on_tool_stopped', tool_name)
            # Fix: persist state on this path too. Previously only the
            # sidecar branch saved, so a stopped in-process tool could be
            # wrongly restored as "was_running" after a restart.
            self.save_state()
            return result

        self._kill_server(tool_name)
        self.vram.release(tool_name)
        self._unsync_catalog(tool_name)
        self._notify_hooks('on_tool_stopped', tool_name)
        self.save_state()
        return {'tool': tool_name, 'status': 'stopped'}

    def unload_tool(self, tool_name: str) -> Dict:
        """Stop + deregister a tool."""
        self.stop_tool(tool_name)  # stop_tool already fires on_tool_stopped
        service_tool_registry.unregister_tool(tool_name)
        self.save_state()
        return {'tool': tool_name, 'status': 'unloaded'}

    def get_tool_status(self, tool_name: str) -> Dict:
        """Get full status for a single tool (or an error dict if unknown)."""
        config = TOOL_CONFIGS.get(tool_name)
        if not config:
            return {'error': f'Unknown tool: {tool_name}'}

        is_running = self._is_server_alive(tool_name)
        return {
            'tool': tool_name,
            'downloaded': self.storage.is_downloaded(tool_name),
            'running': is_running,
            'port': self._ports.get(tool_name),
            'is_inprocess': config.get('is_inprocess', False),
            'vram_allocated_gb': self.vram.get_allocations().get(tool_name, 0),
            'offload_mode': self.vram.suggest_offload_mode(tool_name),
        }

    # ── Bulk operations ──────────────────────────────────────────

    def setup_available_tools(self) -> Dict:
        """Setup all tools that can fit in available VRAM.

        Tools that don't fit get {'skipped': 'insufficient VRAM'}.
        """
        results = {}
        for name in TOOL_CONFIGS:
            if self.vram.can_fit(name):
                results[name] = self.setup_tool(name)
            else:
                results[name] = {'skipped': 'insufficient VRAM'}
        return results

    def get_all_status(self) -> Dict:
        """Dashboard view of all tools, plus 'vram' and 'storage' summaries.

        NOTE(review): 'vram'/'storage' share the key namespace with tool
        names — safe today since no tool is named that, but fragile.
        """
        status = {}
        for name in TOOL_CONFIGS:
            status[name] = self.get_tool_status(name)
        status['vram'] = self.vram.get_status()
        status['storage'] = {
            'total_size_gb': round(self.storage.get_total_size() / 1e9, 2),
            'base_dir': str(self.storage.base_dir),
        }
        return status

    def stop_all(self) -> None:
        """Graceful shutdown of all running tools (called via atexit)."""
        for name in list(self._processes.keys()):
            self.stop_tool(name)
        # Also stop in-process tools. Fix: route through stop_tool() so
        # catalog unsync and on_tool_stopped hooks fire for in-process
        # tools too — previously _stop_inprocess() was called directly
        # and those side effects were silently skipped.
        for name, config in TOOL_CONFIGS.items():
            if config.get('is_inprocess'):
                self.stop_tool(name)
        self.save_state()
        logger.info("All runtime tools stopped")

    # ── Catalog sync — single authority for model state ─────────
    # RTM is a process manager; the orchestrator's catalog is the
    # authority on "what is loaded." These methods bridge the gap.

    def _sync_catalog(self, tool_name: str, device: str = 'gpu',
                      catalog_id: Optional[str] = None) -> None:
        """Notify orchestrator catalog that a model is now loaded.

        No-op when the tool has no catalog_id; best-effort otherwise
        (import or orchestrator failures are debug-logged, not raised).
        """
        cid = catalog_id or TOOL_CONFIGS.get(tool_name, {}).get('catalog_id')
        if not cid:
            return
        try:
            # Local import avoids a circular dependency at module load.
            from .model_orchestrator import get_orchestrator
            orch = get_orchestrator()
            entry = orch._catalog.get(cid)
            if entry and not entry.loaded:
                orch._catalog.mark_loaded(cid, device=device)
                orch._register_vram(entry, device)
                orch._register_lifecycle(entry)
                orch._register_service_tool(entry)
                logger.info(f"Catalog synced: {cid} loaded via RTM")
        except Exception as e:
            logger.debug(f"Catalog sync skipped for {tool_name}: {e}")

    def _unsync_catalog(self, tool_name: str) -> None:
        """Notify orchestrator catalog that a model was unloaded.

        Mirror of _sync_catalog(); same no-op / best-effort semantics.
        """
        cid = TOOL_CONFIGS.get(tool_name, {}).get('catalog_id')
        if not cid:
            return
        try:
            from .model_orchestrator import get_orchestrator
            orch = get_orchestrator()
            entry = orch._catalog.get(cid)
            if entry and entry.loaded:
                orch._release_vram(entry)
                orch._deregister_service_tool(entry)
                orch._catalog.mark_unloaded(cid)
                logger.info(f"Catalog synced: {cid} unloaded via RTM")
        except Exception as e:
            logger.debug(f"Catalog unsync skipped for {tool_name}: {e}")

    # ── State persistence ────────────────────────────────────────

    def save_state(self) -> None:
        """Persist tool state to JSON. Best-effort: never raises."""
        state = {
            'tools': {},
            'ports': dict(self._ports),
        }
        for name in TOOL_CONFIGS:
            state['tools'][name] = {
                'downloaded': self.storage.is_downloaded(name),
                'was_running': self._is_server_alive(name),
                'port': self._ports.get(name),
            }

        # Fix: mkdir moved inside the try — a permission/IO error here
        # previously escaped the "best-effort" contract and propagated
        # to callers (including stop paths and the atexit handler).
        try:
            STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
            STATE_FILE.write_text(json.dumps(state, indent=2))
        except Exception as e:
            logger.warning(f"Failed to save tool state: {e}")

    def load_state(self) -> Dict:
        """Restore tool state from JSON. Re-starts previously running tools.

        Returns a dict of {tool_name: start_tool() result} for each tool
        that was restored; empty dict when there is nothing to restore.
        """
        if not STATE_FILE.exists():
            logger.info("No tool state to restore")
            return {}

        try:
            state = json.loads(STATE_FILE.read_text())
        except Exception as e:
            logger.warning(f"Failed to load tool state: {e}")
            return {}

        restored = {}
        for name, info in state.get('tools', {}).items():
            if info.get('was_running') and info.get('downloaded'):
                logger.info(f"Restoring {name}...")
                result = self.start_tool(name)
                restored[name] = result

        logger.info(f"Restored {len(restored)} tools from state")
        return restored

    # ── Server process management ────────────────────────────────

    def _start_sidecar(self, tool_name: str, config: Dict,
                       offload_mode: str) -> Dict:
        """Launch a sidecar server subprocess with dynamic port."""
        if self._is_server_alive(tool_name):
            port = self._ports.get(tool_name)
            return {'running': True, 'port': port, 'message': 'already running'}

        script = config.get('server_script')
        if not script or not os.path.exists(script):
            return {'error': f'Server script not found: {script}'}

        # Fail-early VRAM check: if the tool has a GPU offload mode and
        # the budget won't fit, refuse to spawn the proc rather than
        # letting it OOM mid-load. cpu_only offload mode bypasses this
        # check (CPU-only inference has no VRAM budget).
        if offload_mode != 'cpu_only' and not self.vram.can_fit(tool_name):
            free_gb = self.vram.get_free_vram()
            logger.warning(
                f"Refusing to start {tool_name}: won't fit "
                f"(free={free_gb:.1f}GB, offload={offload_mode})"
            )
            return {
                'error': f'Insufficient VRAM for {tool_name} '
                         f'(free={free_gb:.1f}GB); try cpu_only',
                'oom': True,
            }

        # Set environment for the child process: model directory and
        # offload mode are passed via TOOLNAME_MODEL_DIR / TOOLNAME_OFFLOAD.
        env = os.environ.copy()
        model_dir = str(self.storage.get_tool_dir(tool_name))
        env_key = f"{tool_name.upper()}_MODEL_DIR"
        env[env_key] = model_dir
        env[f"{tool_name.upper()}_OFFLOAD"] = offload_mode

        python_exe = sys.executable
        # In frozen builds (cx_Freeze), sys.executable is Nunba.exe — not a
        # Python interpreter. Use the bundled python-embed/ instead.
        if getattr(sys, 'frozen', False):
            app_dir = os.path.dirname(sys.executable)
            embed_python = os.path.join(app_dir, 'python-embed', 'python.exe')
            if os.path.isfile(embed_python):
                python_exe = embed_python

        port_timeout = 30  # seconds to wait for the PORT=NNNNN banner
        try:
            _popen_kwargs = dict(
                stdout=subprocess.PIPE,
                # NOTE(review): stderr=PIPE is never drained after startup;
                # a very chatty server could fill the pipe buffer and
                # block — confirm servers stay quiet on stderr.
                stderr=subprocess.PIPE,
                env=env,
                text=True,
            )
            if sys.platform == 'win32':
                _popen_kwargs['creationflags'] = subprocess.CREATE_NO_WINDOW
            proc = subprocess.Popen(
                [python_exe, script],
                **_popen_kwargs,
            )

            # Read PORT=NNNNN from stdout (bounded by port_timeout)
            port = self._read_port_from_stdout(proc, timeout=port_timeout)
            if port is None:
                proc.kill()
                # Fix: message now reflects the actual timeout value
                # instead of a hard-coded "30s" in a placeholder-less
                # f-string.
                return {'error': f'Server did not report port within {port_timeout}s'}

            with self._lock:
                self._processes[tool_name] = proc
                self._ports[tool_name] = port

            # Allocate VRAM — if rejected here the proc grabbed VRAM
            # concurrently (race), so we keep the proc but log the
            # inconsistency so the operator can re-check.
            if not self.vram.allocate(tool_name):
                logger.warning(
                    f"VRAM.allocate({tool_name}) returned False even "
                    f"though can_fit passed pre-spawn — concurrent race"
                )

            # Register with service_tool_registry
            self._register_tool_at_port(tool_name, port)

            logger.info(f"Started {tool_name} on port {port} (PID {proc.pid})")
            self._sync_catalog(tool_name, device='gpu')
            self._notify_hooks('on_tool_started', tool_name,
                               device='gpu', offload_mode=offload_mode)
            return {'running': True, 'port': port, 'pid': proc.pid}

        except Exception as e:
            logger.error(f"Failed to start {tool_name}: {e}")
            return {'error': str(e)}

    def _start_inprocess(self, tool_name: str, config: Dict) -> Dict:
        """Start an in-process tool (no server subprocess)."""
        if tool_name == 'whisper':
            try:
                from .whisper_tool import WhisperTool, select_whisper_model
                model_name = select_whisper_model()
                # For in-process whisper, VRAM is eagerly consumed by
                # faster-whisper on first call — honor the bool here so
                # a refusal is logged alongside the success path.
                WhisperTool.register_functions()
                if not self.vram.allocate(tool_name):
                    logger.warning(
                        f"Whisper in-process registered but VRAM budget "
                        f"refused — running in CPU fallback mode"
                    )
                logger.info(f"Whisper registered in-process (model: {model_name})")
                # Resolve catalog_id dynamically from selected model size
                self._sync_catalog(tool_name, device='cpu',
                                   catalog_id=f'stt-whisper-{model_name}')
                # Fix: hook now reports device='cpu', matching the
                # _sync_catalog call above (it previously said 'gpu',
                # leaving hook subscribers and the catalog disagreeing).
                self._notify_hooks('on_tool_started', tool_name,
                                   device='cpu', inprocess=True)
                return {'running': True, 'inprocess': True, 'model': model_name}
            except Exception as e:
                return {'error': f'Whisper init failed: {e}'}

        return {'error': f'No in-process handler for {tool_name}'}

    def _stop_inprocess(self, tool_name: str) -> Dict:
        """Stop an in-process tool and release its VRAM allocation."""
        if tool_name == 'whisper':
            try:
                from .whisper_tool import unload_whisper
                unload_whisper()
                self.vram.release(tool_name)
                return {'tool': tool_name, 'status': 'stopped'}
            except Exception as e:
                return {'error': str(e)}
        return {'error': f'No in-process handler for {tool_name}'}

    def _read_port_from_stdout(self, proc: subprocess.Popen,
                               timeout: int = 30) -> Optional[int]:
        """Read a ``PORT=NNNNN`` line from subprocess stdout.

        Returns the port as int, or None on timeout / process death.

        Fix: the previous implementation called proc.stdout.readline()
        directly, which blocks indefinitely — if the server stayed alive
        but never printed, the timeout was never honored and the caller
        hung. Lines are now pumped by a daemon reader thread into a
        queue, so this method can poll with a real deadline.
        """
        from queue import Empty, Queue  # stdlib; local to keep top imports unchanged

        lines: "Queue[str]" = Queue()

        def _pump() -> None:
            # Runs until stdout hits EOF (server exit); daemon=True means
            # it can never keep the interpreter alive.
            try:
                for raw in proc.stdout:
                    lines.put(raw)
            except Exception:
                pass

        Thread(target=_pump, daemon=True).start()

        deadline = time.time() + timeout
        while time.time() < deadline:
            if proc.poll() is not None:
                # Process died before announcing a port
                stderr = proc.stderr.read() if proc.stderr else ''
                logger.error(f"Server process died: {stderr[:300]}")
                return None

            try:
                line = lines.get(timeout=0.1).strip()
            except Empty:
                continue
            if line.startswith('PORT='):
                try:
                    return int(line.split('=', 1)[1])
                except ValueError:
                    pass  # malformed banner; keep reading

        return None

    def _register_tool_at_port(self, tool_name: str, port: int) -> None:
        """Register the tool wrapper with the discovered port."""
        base_url = f"http://127.0.0.1:{port}"

        if tool_name == 'wan2gp':
            from .wan2gp_tool import Wan2GPTool
            Wan2GPTool.register(base_url)
        elif tool_name == 'tts_audio_suite':
            from .tts_audio_suite_tool import TTSAudioSuiteTool
            TTSAudioSuiteTool.register(base_url)
        else:
            logger.warning(f"No tool wrapper for {tool_name}")

    def _kill_server(self, tool_name: str) -> None:
        """Kill a sidecar server process: terminate, then kill after 5 s."""
        with self._lock:
            proc = self._processes.pop(tool_name, None)
            self._ports.pop(tool_name, None)

        if proc and proc.poll() is None:
            try:
                proc.terminate()
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
                # Fix: reap the killed process so it doesn't linger as a
                # zombie (kill() alone does not wait for the child).
                proc.wait()
            logger.info(f"Killed {tool_name} server (PID {proc.pid})")

    def _is_server_alive(self, tool_name: str) -> bool:
        """Check if a sidecar server is still running.

        For `whisper` the "server" is the STT subprocess worker managed
        by `whisper_tool._stt_tool`; check its lifetime via ToolWorker.
        For other in-process tools there is no separate process to
        check — return False.
        """
        config = TOOL_CONFIGS.get(tool_name, {})
        if config.get('is_inprocess'):
            if tool_name == 'whisper':
                try:
                    from .whisper_tool import _stt_tool
                    return _stt_tool.is_alive()
                except Exception:
                    return False
            return False

        proc = self._processes.get(tool_name)
        if proc is None:
            return False
        return proc.poll() is None

    # ── AutoGen/LangChain helpers ────────────────────────────────

    def get_autogen_tools(self) -> Dict:
        """Get all running tools as AutoGen-compatible functions.

        Delegates to service_tool_registry which already handles this.
        """
        return service_tool_registry.get_all_tool_functions()

    def get_langchain_tools(self) -> list:
        """Get all running tools as LangChain Tool objects.

        Delegates to service_tool_registry which already handles this.
        """
        return service_tool_registry.get_langchain_tools()
# Global singleton — created at import time so every importer shares one
# manager (one set of subprocess handles, ports, and VRAM bookkeeping).
runtime_tool_manager = RuntimeToolManager()

# Ensure cleanup on process exit: stop_all() terminates sidecar servers
# and unloads in-process tools, then persists final state.
atexit.register(runtime_tool_manager.stop_all)