Coverage for integrations / service_tools / runtime_manager.py: 56.2%

276 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Runtime Tool Manager — orchestrates the full lifecycle of media tools. 

3 

4Manages: detect → download → start → register → stop → unload 

5Persists state to ~/.hevolve/tool_state.json so restarts skip completed setup. 

6 

7All sidecar servers use dynamic port allocation (no fixed ports). 

8Whisper runs in-process (no sidecar). 

9""" 

10 

11import atexit 

12import json 

13import logging 

14import os 

15import subprocess 

16import sys 

17import time 

18from pathlib import Path 

19from threading import Lock, Thread 

20from typing import Dict, Optional 

21 

22from .model_storage import ModelStorageManager, model_storage 

23from .vram_manager import VRAMManager, vram_manager 

24from .registry import service_tool_registry 

25 

26logger = logging.getLogger(__name__) 

27 

# Persisted run-state lives under the user's home so it survives reinstalls.
STATE_FILE = Path.home() / '.hevolve' / 'tool_state.json'
# Directory holding the bundled sidecar server scripts (always a non-empty
# path — it is derived from this module's own location).
SERVERS_DIR = os.path.join(os.path.dirname(__file__), 'servers')

# Tool configuration: name → {repo_url, server_script, hf_repo_id, is_inprocess, catalog_id}
# catalog_id links RTM tools to ModelCatalog entries so the orchestrator stays in sync.
# None = no catalog entry (tool is a wrapper or resolved dynamically).
#
# download_type: 'git' clones repo_url; 'hf' snapshots hf_repo_id.
# is_inprocess: tool loads into this process instead of a sidecar server.
TOOL_CONFIGS = {
    # Video generation
    'wan2gp': {
        'repo_url': 'https://github.com/deepbeepmeep/Wan2GP',
        'server_script': os.path.join(SERVERS_DIR, 'wan2gp_server.py'),
        'download_type': 'git',
        'catalog_id': 'video_gen-wan2gp',
    },
    'ltx2': {
        # Fix: dropped the dead `if SERVERS_DIR else None` guard —
        # SERVERS_DIR is always truthy, so the None branch was
        # unreachable and inconsistent with every other entry.
        'server_script': os.path.join(SERVERS_DIR, 'ltx2_server.py'),
        'hf_repo_id': 'Lightricks/LTX-Video',
        'download_type': 'hf',
        'catalog_id': 'video_gen-ltx2',
    },
    # Music / singing
    'acestep': {
        'repo_url': 'https://github.com/ace-step/ACE-Step-1.5',
        'download_type': 'git',
        # NOTE(review): 'run_command' is not consumed anywhere in this
        # module, and without a 'server_script' the sidecar launcher
        # refuses to start this tool — confirm intended launch path.
        'run_command': ['uv', 'run', 'acestep-api', '--port', '8001'],
        'catalog_id': 'audio_gen-acestep',
    },
    'diffrhythm': {
        'hf_repo_id': 'DiffRhythm/diffrhythm-v1',
        'download_type': 'hf',
        'catalog_id': 'audio_gen-diffrhythm',
    },
    # TTS audio suite (multiple engines)
    'tts_audio_suite': {
        'repo_url': 'https://github.com/diodiogod/TTS-Audio-Suite',
        'server_script': os.path.join(SERVERS_DIR, 'tts_audio_suite_server.py'),
        'download_type': 'git',
        'catalog_id': None,
    },
    # STT
    'whisper': {
        'hf_repo_id': 'openai/whisper-base',
        'download_type': 'hf',
        'is_inprocess': True,
        'catalog_id': None,
    },
    # Vision
    'minicpm': {
        'hf_repo_id': 'openbmb/MiniCPM-V-2_6',
        'download_type': 'hf',
        'catalog_id': None,
    },
}

81 

82 

class RuntimeToolManager:
    """Central orchestrator for runtime media tool lifecycle.

    Drives the pipeline for each entry in TOOL_CONFIGS:
    detect -> download -> start -> register -> stop -> unload.

    Sidecar tools run as subprocesses that report a dynamically chosen
    port by printing ``PORT=NNNNN`` to stdout; in-process tools (whisper)
    load directly into this process. Run state is persisted to STATE_FILE
    so a restart can re-launch previously running tools.
    """

    def __init__(self, storage: ModelStorageManager = None,
                 vram: VRAMManager = None):
        """Create a manager.

        Args:
            storage: download/location manager; defaults to the
                module-level ``model_storage`` singleton.
            vram: VRAM budget manager; defaults to the module-level
                ``vram_manager`` singleton.
        """
        self.storage = storage or model_storage
        self.vram = vram or vram_manager
        self._processes: Dict[str, subprocess.Popen] = {}
        self._ports: Dict[str, int] = {}
        self._lock = Lock()  # guards _processes and _ports
        # Lifecycle hooks — ModelLifecycleManager subscribes to these
        self._lifecycle_hooks = {
            'on_tool_started': [],
            'on_tool_stopped': [],
        }

    def register_lifecycle_hook(self, event: str, callback) -> None:
        """Register a lifecycle event callback. Non-breaking addition.

        Unknown event names are silently ignored so a typo'd event
        cannot break the manager.
        """
        if event in self._lifecycle_hooks:
            self._lifecycle_hooks[event].append(callback)

    def _notify_hooks(self, event: str, tool_name: str, **kwargs) -> None:
        """Fire all registered hooks for an event.

        Hook exceptions are swallowed (debug-logged): a broken
        subscriber must not abort a lifecycle transition.
        """
        for cb in self._lifecycle_hooks.get(event, []):
            try:
                cb(tool_name, **kwargs)
            except Exception as e:
                logger.debug(f"Lifecycle hook error ({event}, {tool_name}): {e}")

    # ── Tool lifecycle ───────────────────────────────────────────

    def setup_tool(self, tool_name: str) -> Dict:
        """Download + start + register a tool. Idempotent.

        Returns status dict with keys: downloaded, running, port, offload_mode.
        """
        config = TOOL_CONFIGS.get(tool_name)
        if not config:
            return {'error': f'Unknown tool: {tool_name}'}

        result = {'tool': tool_name}

        # Step 1: Download if needed
        if not self.storage.is_downloaded(tool_name):
            dl_type = config.get('download_type', 'git')
            if dl_type == 'git':
                path = self.storage.clone_repo(tool_name, config['repo_url'])
            elif dl_type == 'hf':
                path = self.storage.download_hf_model(
                    tool_name, config['hf_repo_id'])
            else:
                return {'error': f'Unknown download_type: {dl_type}'}

            if path is None:
                return {'error': f'Download failed for {tool_name}'}

        # Bug fix: 'downloaded' was previously set only on a fresh
        # download, so repeat (idempotent) calls were missing the
        # documented key even though the tool was downloaded.
        result['downloaded'] = True

        # Step 2: Check VRAM and decide offload mode
        offload = self.vram.suggest_offload_mode(tool_name)
        result['offload_mode'] = offload

        # Step 3: Start server (or load in-process)
        if config.get('is_inprocess'):
            start_result = self._start_inprocess(tool_name, config)
        else:
            start_result = self._start_sidecar(tool_name, config, offload)

        result.update(start_result)

        # Step 4: Save state
        self.save_state()

        return result

    def start_tool(self, tool_name: str) -> Dict:
        """Start a tool that's already downloaded."""
        if not self.storage.is_downloaded(tool_name):
            return {'error': f'{tool_name} not downloaded. Use setup_tool() first.'}

        config = TOOL_CONFIGS.get(tool_name)
        if not config:
            return {'error': f'Unknown tool: {tool_name}'}

        offload = self.vram.suggest_offload_mode(tool_name)

        if config.get('is_inprocess'):
            result = self._start_inprocess(tool_name, config)
        else:
            result = self._start_sidecar(tool_name, config, offload)

        self.save_state()
        return result

    def stop_tool(self, tool_name: str) -> Dict:
        """Stop a tool's server and free VRAM."""
        config = TOOL_CONFIGS.get(tool_name)
        if config and config.get('is_inprocess'):
            result = self._stop_inprocess(tool_name)
            self._unsync_catalog(tool_name)
            self._notify_hooks('on_tool_stopped', tool_name)
            # Bug fix: this branch previously returned without persisting,
            # so a restart could wrongly restore the stopped tool.
            self.save_state()
            return result

        self._kill_server(tool_name)
        self.vram.release(tool_name)
        self._unsync_catalog(tool_name)
        self._notify_hooks('on_tool_stopped', tool_name)
        self.save_state()
        return {'tool': tool_name, 'status': 'stopped'}

    def unload_tool(self, tool_name: str) -> Dict:
        """Stop + deregister a tool."""
        self.stop_tool(tool_name)  # stop_tool already fires on_tool_stopped
        service_tool_registry.unregister_tool(tool_name)
        self.save_state()
        return {'tool': tool_name, 'status': 'unloaded'}

    def get_tool_status(self, tool_name: str) -> Dict:
        """Get full status for a single tool."""
        config = TOOL_CONFIGS.get(tool_name)
        if not config:
            return {'error': f'Unknown tool: {tool_name}'}

        is_running = self._is_server_alive(tool_name)
        return {
            'tool': tool_name,
            'downloaded': self.storage.is_downloaded(tool_name),
            'running': is_running,
            'port': self._ports.get(tool_name),
            'is_inprocess': config.get('is_inprocess', False),
            'vram_allocated_gb': self.vram.get_allocations().get(tool_name, 0),
            'offload_mode': self.vram.suggest_offload_mode(tool_name),
        }

    # ── Bulk operations ──────────────────────────────────────────

    def setup_available_tools(self) -> Dict:
        """Setup all tools that can fit in available VRAM."""
        results = {}
        for name in TOOL_CONFIGS:
            if self.vram.can_fit(name):
                results[name] = self.setup_tool(name)
            else:
                results[name] = {'skipped': 'insufficient VRAM'}
        return results

    def get_all_status(self) -> Dict:
        """Dashboard view of all tools plus VRAM and storage summaries."""
        status = {}
        for name in TOOL_CONFIGS:
            status[name] = self.get_tool_status(name)
        status['vram'] = self.vram.get_status()
        status['storage'] = {
            'total_size_gb': round(self.storage.get_total_size() / 1e9, 2),
            'base_dir': str(self.storage.base_dir),
        }
        return status

    def stop_all(self) -> None:
        """Graceful shutdown of all running tools (also runs at exit)."""
        # Copy keys: stop_tool mutates self._processes via _kill_server.
        for name in list(self._processes.keys()):
            self.stop_tool(name)
        # Also stop in-process tools
        for name, config in TOOL_CONFIGS.items():
            if config.get('is_inprocess'):
                self._stop_inprocess(name)
        self.save_state()
        logger.info("All runtime tools stopped")

    # ── Catalog sync — single authority for model state ──────────
    # RTM is a process manager; the orchestrator's catalog is the
    # authority on "what is loaded." These methods bridge the gap.

    def _sync_catalog(self, tool_name: str, device: str = 'gpu',
                      catalog_id: str = None) -> None:
        """Notify orchestrator catalog that a model is now loaded.

        Best-effort: import/lookup failures are debug-logged and
        swallowed so RTM keeps working without the orchestrator.
        """
        cid = catalog_id or TOOL_CONFIGS.get(tool_name, {}).get('catalog_id')
        if not cid:
            return
        try:
            # Imported lazily to avoid a hard dependency / import cycle.
            from .model_orchestrator import get_orchestrator
            orch = get_orchestrator()
            entry = orch._catalog.get(cid)
            if entry and not entry.loaded:
                orch._catalog.mark_loaded(cid, device=device)
                orch._register_vram(entry, device)
                orch._register_lifecycle(entry)
                orch._register_service_tool(entry)
                logger.info(f"Catalog synced: {cid} loaded via RTM")
        except Exception as e:
            logger.debug(f"Catalog sync skipped for {tool_name}: {e}")

    def _unsync_catalog(self, tool_name: str) -> None:
        """Notify orchestrator catalog that a model was unloaded.

        Best-effort mirror of _sync_catalog; failures are swallowed.
        """
        cid = TOOL_CONFIGS.get(tool_name, {}).get('catalog_id')
        if not cid:
            return
        try:
            from .model_orchestrator import get_orchestrator
            orch = get_orchestrator()
            entry = orch._catalog.get(cid)
            if entry and entry.loaded:
                orch._release_vram(entry)
                orch._deregister_service_tool(entry)
                orch._catalog.mark_unloaded(cid)
                logger.info(f"Catalog synced: {cid} unloaded via RTM")
        except Exception as e:
            logger.debug(f"Catalog unsync skipped for {tool_name}: {e}")

    # ── State persistence ────────────────────────────────────────

    def save_state(self) -> None:
        """Persist tool state to JSON. Best-effort: never raises."""
        state = {
            'tools': {},
            'ports': dict(self._ports),
        }
        for name in TOOL_CONFIGS:
            state['tools'][name] = {
                'downloaded': self.storage.is_downloaded(name),
                'was_running': self._is_server_alive(name),
                'port': self._ports.get(name),
            }

        try:
            # Robustness fix: mkdir is now inside the try so a read-only
            # home directory degrades to a warning instead of propagating
            # out of every lifecycle method that calls save_state().
            STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
            STATE_FILE.write_text(json.dumps(state, indent=2))
        except Exception as e:
            logger.warning(f"Failed to save tool state: {e}")

    def load_state(self) -> Dict:
        """Restore tool state from JSON. Re-starts previously running tools."""
        if not STATE_FILE.exists():
            logger.info("No tool state to restore")
            return {}

        try:
            state = json.loads(STATE_FILE.read_text())
        except Exception as e:
            logger.warning(f"Failed to load tool state: {e}")
            return {}

        restored = {}
        for name, info in state.get('tools', {}).items():
            # Only restart tools that were running AND still downloaded.
            if info.get('was_running') and info.get('downloaded'):
                logger.info(f"Restoring {name}...")
                result = self.start_tool(name)
                restored[name] = result

        logger.info(f"Restored {len(restored)} tools from state")
        return restored

    # ── Server process management ────────────────────────────────

    @staticmethod
    def _drain_pipe(pipe) -> None:
        """Consume a child pipe to EOF, discarding output.

        Run on a daemon thread per pipe so the child can never block on
        a full stdout/stderr OS buffer after the port handshake.
        """
        try:
            for _ in iter(pipe.readline, ''):
                pass
        except Exception:
            pass  # pipe closed during shutdown — nothing to do

    def _start_sidecar(self, tool_name: str, config: Dict,
                       offload_mode: str) -> Dict:
        """Launch a sidecar server subprocess with dynamic port."""
        if self._is_server_alive(tool_name):
            port = self._ports.get(tool_name)
            return {'running': True, 'port': port, 'message': 'already running'}

        script = config.get('server_script')
        if not script or not os.path.exists(script):
            return {'error': f'Server script not found: {script}'}

        # Fail-early VRAM check: if the tool has a GPU offload mode and
        # the budget won't fit, refuse to spawn the proc rather than
        # letting it OOM mid-load. cpu_only offload mode bypasses this
        # check (CPU-only inference has no VRAM budget).
        if offload_mode != 'cpu_only' and not self.vram.can_fit(tool_name):
            free_gb = self.vram.get_free_vram()
            logger.warning(
                f"Refusing to start {tool_name}: won't fit "
                f"(free={free_gb:.1f}GB, offload={offload_mode})"
            )
            return {
                'error': f'Insufficient VRAM for {tool_name} '
                         f'(free={free_gb:.1f}GB); try cpu_only',
                'oom': True,
            }

        # Set environment for the child process
        env = os.environ.copy()
        model_dir = str(self.storage.get_tool_dir(tool_name))
        env_key = f"{tool_name.upper()}_MODEL_DIR"
        env[env_key] = model_dir
        env[f"{tool_name.upper()}_OFFLOAD"] = offload_mode

        python_exe = sys.executable
        # In frozen builds (cx_Freeze), sys.executable is Nunba.exe — not a
        # Python interpreter. Use the bundled python-embed/ instead.
        if getattr(sys, 'frozen', False):
            app_dir = os.path.dirname(sys.executable)
            embed_python = os.path.join(app_dir, 'python-embed', 'python.exe')
            if os.path.isfile(embed_python):
                python_exe = embed_python

        try:
            _popen_kwargs = dict(
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env,
                text=True,
            )
            if sys.platform == 'win32':
                _popen_kwargs['creationflags'] = subprocess.CREATE_NO_WINDOW
            proc = subprocess.Popen(
                [python_exe, script],
                **_popen_kwargs,
            )

            # Read PORT=NNNNN from stdout. Fix: the timeout is now a
            # single variable so the error message can't drift from the
            # actual value (it was hardcoded as '30s' in an f-string
            # with no placeholders).
            port_timeout = 30
            port = self._read_port_from_stdout(proc, timeout=port_timeout)
            if port is None:
                proc.kill()
                return {'error': f'Server did not report port within {port_timeout}s'}

            # Bug fix: both pipes are PIPEs but nothing read them after
            # the handshake — a chatty server eventually fills the OS
            # pipe buffer and deadlocks mid-write. Drain on daemon
            # threads for the life of the child.
            for pipe in (proc.stdout, proc.stderr):
                if pipe is not None:
                    Thread(target=self._drain_pipe, args=(pipe,),
                           daemon=True).start()

            with self._lock:
                self._processes[tool_name] = proc
                self._ports[tool_name] = port

            # Allocate VRAM — if rejected here the proc grabbed VRAM
            # concurrently (race), so we keep the proc but log the
            # inconsistency so the operator can re-check.
            if not self.vram.allocate(tool_name):
                logger.warning(
                    f"VRAM.allocate({tool_name}) returned False even "
                    f"though can_fit passed pre-spawn — concurrent race"
                )

            # Register with service_tool_registry
            self._register_tool_at_port(tool_name, port)

            logger.info(f"Started {tool_name} on port {port} (PID {proc.pid})")
            self._sync_catalog(tool_name, device='gpu')
            self._notify_hooks('on_tool_started', tool_name,
                               device='gpu', offload_mode=offload_mode)
            return {'running': True, 'port': port, 'pid': proc.pid}

        except Exception as e:
            logger.error(f"Failed to start {tool_name}: {e}")
            return {'error': str(e)}

    def _start_inprocess(self, tool_name: str, config: Dict) -> Dict:
        """Start an in-process tool (no server subprocess)."""
        if tool_name == 'whisper':
            try:
                from .whisper_tool import WhisperTool, select_whisper_model
                model_name = select_whisper_model()
                # For in-process whisper, VRAM is eagerly consumed by
                # faster-whisper on first call — honor the bool here so
                # a refusal is logged alongside the success path.
                WhisperTool.register_functions()
                if not self.vram.allocate(tool_name):
                    logger.warning(
                        f"Whisper in-process registered but VRAM budget "
                        f"refused — running in CPU fallback mode"
                    )
                logger.info(f"Whisper registered in-process (model: {model_name})")
                # Resolve catalog_id dynamically from selected model size
                self._sync_catalog(tool_name, device='cpu',
                                   catalog_id=f'stt-whisper-{model_name}')
                # Consistency fix: the hook previously reported
                # device='gpu' while the catalog sync above says 'cpu';
                # subscribers and catalog now agree.
                self._notify_hooks('on_tool_started', tool_name,
                                   device='cpu', inprocess=True)
                return {'running': True, 'inprocess': True, 'model': model_name}
            except Exception as e:
                return {'error': f'Whisper init failed: {e}'}

        return {'error': f'No in-process handler for {tool_name}'}

    def _stop_inprocess(self, tool_name: str) -> Dict:
        """Stop an in-process tool and release its VRAM allocation."""
        if tool_name == 'whisper':
            try:
                from .whisper_tool import unload_whisper
                unload_whisper()
                self.vram.release(tool_name)
                return {'tool': tool_name, 'status': 'stopped'}
            except Exception as e:
                return {'error': str(e)}
        return {'error': f'No in-process handler for {tool_name}'}

    def _read_port_from_stdout(self, proc: subprocess.Popen,
                               timeout: int = 30) -> Optional[int]:
        """Read PORT=NNNNN line from subprocess stdout.

        Returns the port, or None on death/timeout.

        NOTE(review): readline() blocks, so a live-but-silent child can
        hold this past `timeout`; the deadline is only honored between
        lines. A portable non-blocking read would fix this — confirm
        whether servers are guaranteed to emit output promptly.
        """
        deadline = time.time() + timeout
        while time.time() < deadline:
            if proc.poll() is not None:
                # Process died — surface a stderr excerpt for diagnosis.
                stderr = proc.stderr.read() if proc.stderr else ''
                logger.error(f"Server process died: {stderr[:300]}")
                return None

            line = proc.stdout.readline()
            if line:
                line = line.strip()
                if line.startswith('PORT='):
                    try:
                        return int(line.split('=', 1)[1])
                    except ValueError:
                        pass  # malformed PORT= line; keep reading
            else:
                time.sleep(0.1)

        return None

    def _register_tool_at_port(self, tool_name: str, port: int) -> None:
        """Register the tool wrapper with the discovered port."""
        base_url = f"http://127.0.0.1:{port}"

        if tool_name == 'wan2gp':
            from .wan2gp_tool import Wan2GPTool
            Wan2GPTool.register(base_url)
        elif tool_name == 'tts_audio_suite':
            from .tts_audio_suite_tool import TTSAudioSuiteTool
            TTSAudioSuiteTool.register(base_url)
        else:
            logger.warning(f"No tool wrapper for {tool_name}")

    def _kill_server(self, tool_name: str) -> None:
        """Kill a sidecar server process (terminate, then kill after 5s)."""
        with self._lock:
            proc = self._processes.pop(tool_name, None)
            self._ports.pop(tool_name, None)

        if proc and proc.poll() is None:
            try:
                proc.terminate()
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
            logger.info(f"Killed {tool_name} server (PID {proc.pid})")

    def _is_server_alive(self, tool_name: str) -> bool:
        """Check if a sidecar server is still running.

        For `whisper` the "server" is the STT subprocess worker managed
        by `whisper_tool._stt_tool`; check its lifetime via ToolWorker.
        For other in-process tools there is no separate process to
        check — return False.
        """
        config = TOOL_CONFIGS.get(tool_name, {})
        if config.get('is_inprocess'):
            if tool_name == 'whisper':
                try:
                    from .whisper_tool import _stt_tool
                    return _stt_tool.is_alive()
                except Exception:
                    return False
            return False

        proc = self._processes.get(tool_name)
        if proc is None:
            return False
        return proc.poll() is None

    # ── AutoGen/LangChain helpers ────────────────────────────────

    def get_autogen_tools(self) -> Dict:
        """Get all running tools as AutoGen-compatible functions.

        Delegates to service_tool_registry which already handles this.
        """
        return service_tool_registry.get_all_tool_functions()

    def get_langchain_tools(self) -> list:
        """Get all running tools as LangChain Tool objects.

        Delegates to service_tool_registry which already handles this.
        """
        return service_tool_registry.get_langchain_tools()

554 

555 

# Global singleton — the rest of the package imports this instance
# instead of constructing RuntimeToolManager itself.
runtime_tool_manager = RuntimeToolManager()

# Ensure cleanup on process exit: stop_all terminates every sidecar
# subprocess and unloads in-process tools so no orphaned servers linger.
atexit.register(runtime_tool_manager.stop_all)