Coverage for integrations/service_tools/model_orchestrator.py: 57.2% (423 statements)
coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2ModelOrchestrator — compute-aware model loading for ANY model type. 

3 

4Lives in HARTOS so all deployment targets (Nunba desktop, embedded, cloud) 

5share the same orchestration logic. Application-specific loaders (LLM, TTS, 

6STT, VLM) are registered as plugins at startup via register_loader(). 

7 

8Bridges: 

9 - ModelCatalog (what models exist) 

10 - VRAMManager (how much GPU is free) 

11 - ModelLifecycle (idle eviction, pressure response) 

12 - Pluggable loaders (registered by the application) 

13 

14Usage: 

15 from integrations.service_tools.model_orchestrator import get_orchestrator 

16 

17 orch = get_orchestrator() 

18 

19 # Register application-specific loaders (typically at app startup) 

20 orch.register_loader('llm', my_llm_loader) 

21 orch.register_loader('tts', my_tts_loader) 

22 

23 # Auto-select and load the best LLM for current hardware 

24 entry = orch.auto_load('llm') 

25 

26 # Load a specific model by ID 

27 entry = orch.load('tts-chatterbox-turbo') 

28""" 

import logging
import subprocess
import sys
import threading
import time
from typing import Optional, Dict, Any, List, Callable

from integrations.service_tools.model_catalog import (
    ModelCatalog, ModelEntry, get_catalog,
)

logger = logging.getLogger('ModelOrchestrator')


class ModelLoader:
    """Interface for subsystem-specific model loaders.

    Applications implement this to teach the orchestrator how to load/unload
    models of a specific type. Each method receives the catalog entry and
    should return True/False for success.
    """

    def load(self, entry: ModelEntry, run_mode: str) -> bool:
        """Load the model. run_mode is 'gpu', 'cpu', or 'cpu_offload'."""
        raise NotImplementedError

    def unload(self, entry: ModelEntry) -> None:
        """Unload/release the model."""
        pass

    def download(self, entry: ModelEntry) -> bool:
        """Download model files. Return True if successful."""
        return False

    def is_downloaded(self, entry: ModelEntry) -> bool:
        """Check if model files are on disk."""
        return False

    def is_loaded(self, entry: ModelEntry) -> bool:
        """Live probe — is the model actually running right now?

        Default reads `entry.loaded` (the catalog flag). Subclasses that
        have a real liveness signal (a running subprocess, an HTTP
        health endpoint, etc.) SHOULD override this. Catalog state alone
        can drift from reality after idle auto-stop, crashes, or
        external process kills.
        """
        return bool(getattr(entry, 'loaded', False))

    def validate(self, entry: ModelEntry) -> tuple:
        """Post-load capability probe.

        Called by the install pipeline after a successful ``load()`` to
        prove the loaded model actually answers for its declared
        capability — not merely "bytes on disk + subprocess alive".

        Modality-specific subclasses (VLMLoader, TTSLoader, STTLoader,
        LlamaLoader) SHOULD override this with a canned, deterministic
        round-trip (e.g. VLM: describe a 32×32 JPEG; TTS: synthesize a
        fixed phrase; STT: transcribe the TTS output; LLM: complete a
        canned prompt).

        Default returns ``(True, 'no capability probe defined')`` so
        loaders without an override don't gate installs. Any override
        must be deterministic (fixed input) and fast (<5s wall clock)
        so the install-probe doesn't hang.

        Returns:
            (ok: bool, reason: str) — ``ok=True`` on pass; ``reason``
            is a one-line human-readable diagnostic used by the
            install-progress UI and the dispatcher's fallback logging.
        """
        return (True, 'no capability probe defined')
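

# --- Illustrative example (sketch only, never instantiated by this module) ---
# A minimal ModelLoader implementation for a hypothetical subprocess-backed
# engine, showing the two overrides the docstrings above ask for: a real
# is_loaded() liveness probe and a deterministic validate() round-trip.
# The 'example-server' binary and the health URL are placeholders, not a
# real service in this codebase.
class _ExampleSidecarLoader(ModelLoader):
    def __init__(self):
        self._proc = None

    def load(self, entry: ModelEntry, run_mode: str) -> bool:
        # Spawn a hypothetical sidecar process for this catalog entry.
        cmd = ['example-server', '--model', entry.files.get('model', ''),
               '--device', 'cuda' if run_mode == 'gpu' else 'cpu']
        self._proc = subprocess.Popen(cmd)
        return self.is_loaded(entry)

    def unload(self, entry: ModelEntry) -> None:
        if self._proc is not None:
            self._proc.terminate()
            self._proc = None

    def is_loaded(self, entry: ModelEntry) -> bool:
        # Liveness = the subprocess is still running, not the cached catalog flag.
        return self._proc is not None and self._proc.poll() is None

    def validate(self, entry: ModelEntry) -> tuple:
        # Deterministic, <5s probe against a hypothetical health endpoint.
        import urllib.request
        try:
            with urllib.request.urlopen('http://127.0.0.1:8199/health', timeout=5) as r:
                return (r.status == 200, f'health endpoint returned {r.status}')
        except Exception as e:
            return (False, f'probe failed: {e}')
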

class ModelOrchestrator:
    """Compute-aware model loader that works for ANY model type.

    Subsystem-specific loaders are registered as plugins. The orchestrator
    handles compute state, VRAM tracking, lifecycle, and swap — the loaders
    only handle the actual load/unload/download mechanics.
    """

    def __init__(self, catalog: Optional[ModelCatalog] = None):
        self._catalog = catalog or get_catalog()
        self._lock = threading.Lock()
        self._loaders: Dict[str, ModelLoader] = {}
        self._scan_downloaded()

    # ── Loader registration ───────────────────────────────────────

    def register_loader(self, model_type: str, loader: ModelLoader) -> None:
        """Register a subsystem-specific loader for a model type.

        Example:
            orch.register_loader('llm', LlamaLoader())
            orch.register_loader('tts', TTSLoader())
        """
        self._loaders[model_type] = loader
        logger.info(f"Registered loader for model_type={model_type}: "
                    f"{loader.__class__.__name__}")

    # ── Compute state ─────────────────────────────────────────────

    def _get_compute_state(self) -> dict:
        """Get current compute availability from VRAMManager singleton."""
        state = {
            'gpu_available': False,
            'gpu_type': 'none',
            'vram_total_gb': 0.0,
            'vram_free_gb': 0.0,
            'ram_free_gb': 4.0,
            'allocations': {},
        }
        try:
            from integrations.service_tools.vram_manager import vram_manager
            gpu = vram_manager.detect_gpu()
            state['gpu_available'] = gpu.get('cuda_available', False) or gpu.get('metal_available', False)
            state['gpu_type'] = 'cuda' if gpu.get('cuda_available') else (
                'metal' if gpu.get('metal_available') else 'none')
            state['vram_total_gb'] = gpu.get('total_gb', 0.0)
            state['vram_free_gb'] = vram_manager.get_free_vram()
            state['allocations'] = vram_manager.get_allocations_display()
        except ImportError:
            pass
        try:
            import psutil
            state['ram_free_gb'] = round(psutil.virtual_memory().available / (1024**3), 2)
        except Exception:
            pass
        return state

    # ── Auto-selection ────────────────────────────────────────────

    def select_best(self, model_type: str, language: Optional[str] = None,
                    require_capability: Optional[Dict[str, Any]] = None,
                    ) -> Optional[ModelEntry]:
        """Select the best model for a type given current compute state."""
        cs = self._get_compute_state()
        return self._catalog.select_best(
            model_type=model_type,
            budget_vram_gb=cs['vram_free_gb'],
            budget_ram_gb=cs['ram_free_gb'],
            gpu_available=cs['gpu_available'],
            language=language,
            require_capability=require_capability,
        )

    # ── Load / Unload ─────────────────────────────────────────────

    def auto_load(self, model_type: str, language: Optional[str] = None,
                  **kwargs) -> Optional[ModelEntry]:
        """Select the best model for a type and load it."""
        entry = self.select_best(model_type, language=language, **kwargs)
        if not entry:
            logger.warning(f"No {model_type} model fits current compute")
            return None
        return self.load(entry.id)

    def ensure_loaded_async(self, model_type: str,
                            language: Optional[str] = None,
                            caller: str = 'unknown',
                            **kwargs) -> None:
        """Fire-and-forget wrapper around auto_load().

        THE single "bring me a model that can do X" entry point for
        every caller (chat fallback on cold LLM, TTS synth on cold
        engine, VLM request on cold vision, etc.). Replaces the old
        pattern where each model type had its own starter helper
        (LlamaConfig.ensure_running_async, tts_engine._switch_backend,
        …) — all of them did select_best → load under different names.

        Capability/task hints flow through **kwargs to select_best so
        language routing, voice_clone, emotion_tags, narration etc.
        all funnel into the same selection logic the catalog already
        indexes. No parallel path.

        Runs in a daemon thread so the caller returns immediately.
        Exceptions never escape — the caller already sent its
        response and this runs in the background.
        """
        import threading

        def _worker():
            try:
                entry = self.auto_load(model_type, language=language, **kwargs)
                if entry:
                    logger.info(
                        f'ensure_loaded_async: loaded {entry.id} '
                        f'on {entry.device} (type={model_type}, caller={caller})'
                    )
                else:
                    logger.warning(
                        f'ensure_loaded_async: no {model_type} fit '
                        f'(caller={caller})'
                    )
            except Exception as e:
                logger.error(f'ensure_loaded_async: worker crashed: {e}')

        threading.Thread(
            target=_worker, daemon=True,
            name=f'ensure_loaded_{model_type}',
        ).start()
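
    # Illustrative caller-side usage (sketch only; the caller label and the
    # capability hint below are hypothetical, mirroring the docstring above):
    #
    #     orch = get_orchestrator()
    #     if not orch.can_do('tts', 'voice_clone'):
    #         orch.ensure_loaded_async(
    #             'tts', language='hi', caller='tts_engine.synthesize',
    #             require_capability={'voice_clone': True},
    #         )
    #     # Returns immediately; the daemon thread runs select_best → load.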

    def load(self, model_id: str) -> Optional[ModelEntry]:
        """Load a specific model by ID. Downloads if needed."""
        entry = self._catalog.get(model_id)
        if not entry:
            logger.error(f"Model not found in catalog: {model_id}")
            return None

        if entry.loaded:
            # Reconcile against the loader's live probe — entry.loaded
            # is a CACHE that drifts when the underlying process dies
            # externally (CUDA hang, OOM, manual kill, idle auto-stop
            # outside our lifecycle). Without this probe, the load()
            # short-circuits with "Model already loaded" and the user
            # sees a dead service ("Draft boot decision" loop on
            # 2026-05-04 — task #80). Loaders that override is_loaded()
            # (LlamaLoader, TTSLoader, STTLoader, VLMLoader) return the
            # actual liveness; the base class falls back to entry.loaded
            # so loaders without an override preserve current behavior.
            loader = self._loaders.get(entry.model_type)
            try:
                actually_loaded = (
                    loader.is_loaded(entry) if loader else True
                )
            except Exception as _probe_err:
                logger.warning(
                    f"is_loaded probe failed for {model_id}: "
                    f"{type(_probe_err).__name__}: {_probe_err} — "
                    f"trusting cache flag"
                )
                actually_loaded = True
            if actually_loaded:
                logger.info(f"Model already loaded: {model_id} ({entry.device})")
                return entry
            # Cache lies — process is dead. Reconcile state: release
            # VRAM, clear the catalog flag, fall through to a fresh load.
            logger.warning(
                f"Stale cache for {model_id}: catalog says loaded but "
                f"is_loaded() probe says dead — reconciling and "
                f"respawning"
            )
            try:
                self._release_vram(entry)
            except Exception:
                pass
            try:
                self._catalog.mark_unloaded(model_id)
            except Exception:
                pass

        cs = self._get_compute_state()
        fit = entry.matches_compute(
            cs['vram_free_gb'], cs['ram_free_gb'], cs['gpu_available'])
        if fit == 'impossible':
            if cs['gpu_available'] and entry.vram_gb > 0:
                swapped = self._attempt_swap(entry, cs)
                if swapped:
                    cs = self._get_compute_state()
                    fit = entry.matches_compute(
                        cs['vram_free_gb'], cs['ram_free_gb'], cs['gpu_available'])

        if fit == 'impossible':
            logger.error(f"Cannot load {model_id}: insufficient compute "
                         f"(need {entry.vram_gb}GB VRAM or {entry.ram_gb}GB RAM)")
            self._catalog.mark_error(model_id, 'Insufficient compute')
            return None

        logger.info(f"Loading {model_id} ({entry.model_type}) in {fit} mode...")

        # Allocate VRAM BEFORE load so other models see the reservation.
        # Rolled back on failure.
        if not self._register_vram(entry, fit):
            logger.warning(f"Skipping {model_id}: VRAM full")
            return None
        try:
            success = self._dispatch_load(entry, fit)
            if success:
                self._catalog.mark_loaded(model_id, device=fit)
                self._register_lifecycle(entry)
                self._register_service_tool(entry)
                logger.info(f"Loaded {model_id} on {fit}")
                return entry
            else:
                self._release_vram(entry)
                self._catalog.mark_error(model_id, 'Loader returned failure')
                return None
        except Exception as e:
            self._release_vram(entry)
            logger.error(f"Failed to load {model_id}: {e}")
            self._catalog.mark_error(model_id, str(e))
            return None

    def unload(self, model_id: str) -> bool:
        """Unload a model and release its resources."""
        entry = self._catalog.get(model_id)
        if not entry or not entry.loaded:
            return False

        try:
            self._dispatch_unload(entry)
        except Exception as e:
            logger.warning(f"Unload dispatch failed for {model_id}: {e}")

        self._release_vram(entry)
        self._deregister_service_tool(entry)
        self._catalog.mark_unloaded(model_id)
        logger.info(f"Unloaded {model_id}")
        return True

    def download(self, model_id: str) -> bool:
        """Download a model without loading it."""
        entry = self._catalog.get(model_id)
        if not entry:
            return False
        if entry.downloaded:
            return True
        try:
            success = self._dispatch_download(entry)
            if success:
                self._catalog.mark_downloaded(model_id)
            return success
        except Exception as e:
            logger.error(f"Download failed for {model_id}: {e}")
            self._catalog.mark_error(model_id, str(e))
            return False

    # ── Capability introspection ──────────────────────────────────

    def available_capabilities(self) -> Dict[str, Any]:
        """What this node can do right now — universal, any model type.

        Merges THREE sources into one capability map:
          1. ModelCatalog — static entries (LLM, TTS, STT, VLM, video_gen, audio_gen, etc.)
          2. ServiceToolRegistry — dynamic sidecars (ACE Step, DiffRhythm, txt2img, etc.)
          3. RuntimeToolManager — running services with health status

        Returns dict keyed by category (model_type OR service tag), each with:
          - available: bool
          - loaded: list of loaded model/service IDs
          - can_load: list that fit current compute but aren't loaded
          - capabilities: merged capability set across all entries
          - services: list of running dynamic services in this category

        New categories from dynamic services appear automatically —
        no code changes needed when a new service type registers.
        """
        cs = self._get_compute_state()
        result = {}

        # 1. Catalog models (static + dynamically registered)
        for mt in self._catalog.list_types():
            entries = self._catalog.list_by_type(mt)
            loaded = [e.id for e in entries if e.loaded]
            can_load = [
                e.id for e in entries
                if not e.loaded and e.enabled
                and e.matches_compute(
                    cs['vram_free_gb'], cs['ram_free_gb'],
                    cs['gpu_available']) != 'impossible'
            ]
            # Merge capabilities from all available models
            merged_caps = {}
            for e in entries:
                if e.loaded or (e.enabled and e.matches_compute(
                        cs['vram_free_gb'], cs['ram_free_gb'],
                        cs['gpu_available']) != 'impossible'):
                    for k, v in e.capabilities.items():
                        if v:
                            merged_caps[k] = True
            result[mt] = {
                'available': bool(loaded or can_load),
                'loaded': loaded,
                'can_load': can_load,
                'capabilities': merged_caps,
                'services': [],
            }

        # 2. Dynamic services (ServiceToolRegistry) — may introduce NEW categories
        try:
            from integrations.service_tools.registry import service_tool_registry
            for name, tool_info in service_tool_registry._tools.items():
                tags = getattr(tool_info, 'tags', []) or []
                # Each tag can map to a category
                categories = set()
                for tag in tags:
                    if tag in result:
                        categories.add(tag)
                # If no existing category matches, use the tool name as category
                if not categories:
                    cat = tags[0] if tags else name
                    categories.add(cat)

                for cat in categories:
                    if cat not in result:
                        result[cat] = {
                            'available': True,
                            'loaded': [],
                            'can_load': [],
                            'capabilities': {},
                            'services': [],
                        }
                    result[cat]['available'] = True
                    result[cat]['services'].append(name)
        except Exception:
            pass

        # 3. Running services (RuntimeToolManager) — mark health status
        try:
            from integrations.service_tools.runtime_manager import runtime_tool_manager
            all_status = runtime_tool_manager.get_all_status()
            for tool_name, status in all_status.items():
                if status.get('running'):
                    # Find which category this tool belongs to
                    for cat, info in result.items():
                        if tool_name in info.get('services', []):
                            info['available'] = True
        except Exception:
            pass

        return result
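
    # Illustrative shape of the returned map (the values below are made up
    # for the sketch; real IDs come from the catalog and service registry):
    #
    #     {
    #         'tts': {
    #             'available': True,
    #             'loaded': ['tts-kokoro'],
    #             'can_load': ['tts-chatterbox-turbo'],
    #             'capabilities': {'voice_clone': True, 'narration': True},
    #             'services': [],
    #         },
    #         'audio_gen': {
    #             'available': True,
    #             'loaded': [],
    #             'can_load': [],
    #             'capabilities': {},
    #             'services': ['acestep'],
    #         },
    #     }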

    def can_do(self, model_type: str, capability: str = None) -> bool:
        """Quick check: can this node handle a task right now?

        Works for ANY model type or service category — including ones
        that only exist as dynamic services (not in the catalog).

        Usage:
            orchestrator.can_do('tts')                      # any TTS?
            orchestrator.can_do('audio_gen', 'music_gen')   # music specifically?
            orchestrator.can_do('video_gen', 'txt2vid')     # text-to-video?
            orchestrator.can_do('robot_locomotion')         # new category from service?
        """
        caps = self.available_capabilities()
        type_info = caps.get(model_type, {})
        if not type_info.get('available'):
            return False
        if not capability:
            return True
        # Check merged capabilities from models + services
        if type_info.get('capabilities', {}).get(capability):
            return True
        # Deeper check: individual model entries
        all_ids = type_info.get('loaded', []) + type_info.get('can_load', [])
        for mid in all_ids:
            entry = self._catalog.get(mid)
            if entry and entry.capabilities.get(capability):
                return True
        return False

    def capability_prompt(self) -> str:
        """Auto-generate a compact capability summary for LLM prompt injection.

        Dynamically built from live state — new services appear automatically,
        offline services disappear. No hardcoding needed.

        Returns empty string if nothing special is available (no prompt bloat).
        Format designed for small LLMs: one line per capability, action-oriented.
        """
        caps = self.available_capabilities()
        lines = []

        for cat, info in caps.items():
            if not info.get('available'):
                continue
            # Skip LLM (the agent IS the LLM) and embedding (internal)
            if cat in ('llm', 'embedding'):
                continue

            cap_list = sorted(info.get('capabilities', {}).keys())
            loaded = info.get('loaded', [])
            services = info.get('services', [])

            # Build human-readable one-liner
            status_parts = []
            if loaded:
                status_parts.append(f"ready: {', '.join(loaded[:3])}")
            elif services:
                status_parts.append(f"via: {', '.join(services[:3])}")
            elif info.get('can_load'):
                status_parts.append(f"available: {', '.join(info['can_load'][:2])}")
            cap_str = f" — {', '.join(cap_list[:5])}" if cap_list else ''

            label = cat.replace('_', ' ')
            status = f" ({'; '.join(status_parts)})" if status_parts else ''
            lines.append(f"- {label}{cap_str}{status}")

        if not lines:
            return ''

        return (
            'Available capabilities on this node (use via tools — '
            'call generate_media or synthesize_multilingual_audio):\n'
            + '\n'.join(lines)
        )
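
    # Illustrative output of capability_prompt() (the lines below are
    # hypothetical; real content depends on what is loaded at call time):
    #
    #     Available capabilities on this node (use via tools — call
    #     generate_media or synthesize_multilingual_audio):
    #     - tts — narration, voice_clone (ready: tts-kokoro)
    #     - audio gen — music_gen (via: acestep)
    #     - video gen (available: video_gen-wan2gp, video_gen-ltx2)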

    # ── Loader dispatch ───────────────────────────────────────────

    def _dispatch_load(self, entry: ModelEntry, run_mode: str) -> bool:
        """Route loading to the registered loader for this model type."""
        loader = self._loaders.get(entry.model_type)
        if loader:
            return loader.load(entry, run_mode)
        # Fallback: try RuntimeToolManager for sidecar-based tools
        return self._load_generic(entry, run_mode)

    def _dispatch_unload(self, entry: ModelEntry) -> None:
        """Route unloading to the registered loader."""
        loader = self._loaders.get(entry.model_type)
        if loader:
            loader.unload(entry)

    def _dispatch_download(self, entry: ModelEntry) -> bool:
        """Route downloading to the registered loader or generic fallback."""
        loader = self._loaders.get(entry.model_type)
        if loader:
            return loader.download(entry)
        if entry.source == 'pip':
            return self._install_pip(entry)
        return False

    def _load_generic(self, entry: ModelEntry, run_mode: str) -> bool:
        """Fallback: try RuntimeToolManager for sidecar-based tools."""
        try:
            from integrations.service_tools.runtime_manager import runtime_tool_manager
            tool_name = entry.id.replace(f'{entry.model_type}-', '')
            result = runtime_tool_manager.setup_tool(tool_name)
            return result.get('running', False)
        except Exception as e:
            logger.warning(f"Generic load failed for {entry.id}: {e}")
            return False

    def _install_pip(self, entry: ModelEntry) -> bool:
        """Install a pip package for a model backend."""
        pkg = entry.files.get('package') or entry.repo_id
        if not pkg:
            return False
        try:
            _kw = dict(capture_output=True, text=True, timeout=300)
            if sys.platform == 'win32':
                _kw['creationflags'] = subprocess.CREATE_NO_WINDOW
            result = subprocess.run(
                [sys.executable, '-m', 'pip', 'install', pkg, '--quiet'],
                **_kw)
            return result.returncode == 0
        except Exception as e:
            logger.error(f"pip install failed for {pkg}: {e}")
            return False

    # ── VRAM integration ──────────────────────────────────────────
    #
    # KEY ALIGNMENT: VRAMManager._allocations and VRAM_BUDGETS use raw tool
    # names ("whisper", "tts_chatterbox_turbo"). RuntimeToolManager (RTM) also
    # uses these raw names. We use the SAME key convention to avoid
    # double-counting when both RTM and the Orchestrator register the same model.

    _CATALOG_TO_VRAM_KEY = {
        # STT — faster-whisper (primary engine)
        'stt-faster-whisper-tiny': 'whisper_tiny',
        'stt-faster-whisper-base': 'whisper_base',
        'stt-faster-whisper-small': 'whisper_small',
        'stt-faster-whisper-medium': 'whisper_medium',
        'stt-faster-whisper-large': 'whisper_large',
        # STT — sherpa-onnx (CPU-only ONNX, no GPU VRAM)
        'stt-sherpa-moonshine-tiny': 'sherpa_moonshine_tiny',
        'stt-sherpa-moonshine-base': 'sherpa_moonshine_base',
        'stt-sherpa-whisper-tiny': 'sherpa_whisper_tiny',
        'stt-sherpa-whisper-base': 'sherpa_whisper_base',
        'stt-sherpa-whisper-small': 'sherpa_whisper_small',
        'stt-sherpa-whisper-medium': 'sherpa_whisper_medium',
        # STT — legacy fallback IDs (used by old catalog entries)
        'stt-whisper-base': 'whisper_base',
        'stt-whisper-medium': 'whisper_medium',
        'stt-whisper-large': 'whisper_large',
        # TTS
        'tts-chatterbox-turbo': 'tts_chatterbox_turbo',
        'tts-f5-tts': 'tts_f5',
        'tts-indic-parler': 'tts_indic_parler',
        'tts-cosyvoice3': 'tts_cosyvoice3',
        'tts-chatterbox-ml': 'tts_chatterbox_ml',
        'tts-kokoro': 'tts_kokoro',
        # VLM
        'vlm-minicpm-v2': 'minicpm',
        'vlm-qwen3vl': 'qwen3vl',
        # VLM — CPU-only backends (no GPU VRAM tracking needed, included for completeness)
        'vlm-mobilevlm': 'mobilevlm',
        'vlm-clip': 'clip',
        # Video gen
        'video_gen-wan2gp': 'wan2gp',
        'video_gen-ltx2': 'ltx2',
    }

    def _vram_key(self, entry: ModelEntry) -> str:
        """Get the VRAMManager allocation key for a catalog entry.

        For LLMs, always uses 'llm' — there's only one LLM loaded at a time
        (llama-server is single-model). This makes registration idempotent
        regardless of whether LlamaConfig or the Orchestrator registers first.
        """
        if entry.model_type == 'llm':
            return 'llm'
        return self._CATALOG_TO_VRAM_KEY.get(entry.id, entry.id)

    def _register_vram(self, entry: ModelEntry, run_mode: str) -> bool:
        """Register VRAM allocation. Returns False if GPU is full."""
        if run_mode != 'gpu' or entry.vram_gb <= 0:
            return True
        try:
            from integrations.service_tools.vram_manager import vram_manager
            return vram_manager.allocate(self._vram_key(entry))
        except ImportError:
            return True

    def _release_vram(self, entry: ModelEntry) -> None:
        """Release VRAM allocation."""
        try:
            from integrations.service_tools.vram_manager import vram_manager
            tool_key = self._vram_key(entry)
            freed = vram_manager._allocations.pop(tool_key, 0)
            if freed:
                logger.info(f"VRAM released: {tool_key} = {freed}GB")
        except ImportError:
            pass

    # ── Lifecycle integration ─────────────────────────────────────

    def _register_lifecycle(self, entry: ModelEntry) -> None:
        """Register model with ModelLifecycleManager — central tracker for ALL GPU models.

        Every model that loads on GPU MUST register here so the lifecycle
        manager can evict/offload/restore models when VRAM is needed.

        Two LLM-specific eviction policies are applied here based on the
        catalog entry id:

        * **Draft 0.8B classifier** (``llm-qwen3.5-0.8b*``) →
          ``pinned=True``. The 0.8B is first-contact for EVERY chat
          message (speculative_dispatcher.dispatch_draft_first) so
          evicting it on idle means every message cold-starts the
          classifier → full LangChain pipeline fallthrough. Pinning
          costs ~550 MB + mmproj permanently, which is cheaper than
          paying cold-start cost per request.

        * **Main+VLM LLMs** (``llm-qwen*-vl*``, or ``purposes``
          containing any of ``vision`` / ``caption`` / ``grounding``)
          → ``pinned=True``. The same llama-server serves both
          chat AND the VLM agentic loop; losing it kills both
          surfaces and any in-flight agentic loop has no recovery
          context. See 2026-05-03 incident: the 4B-VL died
          silently between 16:05 and 16:59, no event in any log
          (Nunba uses ``subprocess.PIPE`` for the main server's
          stdout and only drains it during startup, so any final
          output was lost), VRAM freed back to 7.83 GB / 8 GB
          despite the prior ``pressure_evict_only`` policy. Pin
          forces the model to survive every code path including
          phase-3 pressure eviction.

        * **Chat-only main LLMs** (``llm-qwen*-2b*`` /
          ``llm-qwen*-4b*`` without VL) → ``pressure_evict_only=True``.
          Survive passive idle sweep (phase 7), yield
          under real VRAM pressure (phase 3). A brief reload
          costs only chat continuity, no in-flight VLM loop.
          Before this distinction, the 2026-04-11 incident showed
          the 4B being killed every 5 minutes mid-session because
          its 340s idle exceeded the 300s timeout, even though
          nothing else needed the VRAM.

        * All other models (STT, TTS, standalone vision sidecars)
          keep the default passive idle eviction — their cold
          start is cheap and the VRAM is better spent on the
          model the user's about to use next.
        """
        try:
            from integrations.service_tools.model_lifecycle import (
                get_model_lifecycle_manager, ModelState, ModelDevice, ModelPriority)
            mlm = get_model_lifecycle_manager()
            device = (ModelDevice.GPU if entry.device == 'gpu'
                      else ModelDevice.CPU if entry.device in ('cpu', 'cpu_offload')
                      else ModelDevice.CPU)
            # Map catalog names to offload table names (e.g., 'stt-whisper-large' → 'whisper')
            offload_name = entry.id.split('-')[1] if '-' in entry.id else entry.id
            # Use the offload table key if it exists, otherwise the catalog ID
            from integrations.service_tools.model_lifecycle import CPU_OFFLOAD_TABLE
            if offload_name not in CPU_OFFLOAD_TABLE:
                offload_name = entry.id

            # Eviction-policy flags. See the ModelState docstring for the
            # full rationale and the 2026-04-12 incident context.
            _pinned = False
            _pressure_evict_only = False
            if entry.model_type == 'llm':
                # Use catalog purpose assignment (admin-configurable).
                # Fallback: pattern match on ID for backward compat.
                _purposes = getattr(entry, 'purposes', []) or []
                _id_lower = entry.id.lower()
                _is_draft = 'draft' in _purposes or (
                    not _purposes and any(
                        t in _id_lower for t in ('0.8b', 'draft', 'caption')
                    )
                )
                # Main LLM that ALSO serves VLM (one llama-server with
                # mmproj loaded — chat + agentic-loop on the same model)
                # gets pinned. See the docstring above for the 2026-05-03
                # silent-death incident.
                _is_vlm_main = (
                    not _is_draft and (
                        any(p in _purposes for p in ('vision', 'caption', 'grounding'))
                        or (not _purposes and any(
                            t in _id_lower for t in ('-vl-', '-vlm-', 'mmproj')
                        ))
                    )
                )
                if _is_draft or _is_vlm_main:
                    _pinned = True
                else:
                    # Chat-only main tier — survive idle sweeps, yield
                    # under real VRAM pressure.
                    _pressure_evict_only = True

            _priority = ModelPriority.ACTIVE if entry.model_type == 'llm' else ModelPriority.WARM
            mlm._models[offload_name] = ModelState(
                name=offload_name,
                device=device,
                priority=_priority,
                pinned=_pinned,
                pressure_evict_only=_pressure_evict_only,
            )
            if hasattr(mlm, 'notify_access'):
                mlm.notify_access(offload_name)
            _policy = ('pinned' if _pinned
                       else 'pressure_evict_only' if _pressure_evict_only
                       else 'default_idle_evict')
            logger.info(
                f"Lifecycle: registered {offload_name} "
                f"(device={device}, policy={_policy})"
            )
        except ImportError:
            pass
        except Exception as e:
            logger.debug(f"Lifecycle registration failed for {entry.id}: {e}")

    # ── Service tool registration ─────────────────────────────────
    # When a model loads, register its corresponding service tool so
    # the LLM sees the capability via get_tools() → {{tools}}.
    # Each tool class (AceStepTool, CosyVoiceTool, etc.) self-registers
    # with service_tool_registry — we just trigger the registration.

    # Maps catalog model_type or id-prefix to the tool module + class
    _SERVICE_TOOL_MAP = {
        'audio_gen-acestep': ('integrations.service_tools.acestep_tool', 'AceStepTool'),
        'stt-whisper': ('integrations.service_tools.whisper_tool', 'WhisperTool'),
        'tts-cosyvoice3': ('integrations.service_tools.cosyvoice_tool', 'CosyVoiceTool'),
        'tts-f5': ('integrations.service_tools.f5_tts_tool', 'F5TTSTool'),
        'tts-indic-parler': ('integrations.service_tools.indic_parler_tool', 'IndicParlerTool'),
        'tts-pocket': ('integrations.service_tools.pocket_tts_tool', 'PocketTTSTool'),
    }

    def _register_service_tool(self, entry: ModelEntry) -> None:
        """Register loaded model with service_tool_registry."""
        for prefix, (mod_path, cls_name) in self._SERVICE_TOOL_MAP.items():
            if entry.id.startswith(prefix):
                try:
                    import importlib
                    mod = importlib.import_module(mod_path)
                    tool_cls = getattr(mod, cls_name, None)
                    if tool_cls:
                        reg = getattr(tool_cls, 'register', None) or \
                              getattr(tool_cls, 'register_functions', None)
                        if reg:
                            reg()
                            logger.info(f"Service tool registered: {cls_name}")
                except Exception as e:
                    logger.debug(f"Service tool registration skipped for {entry.id}: {e}")
                return

    def _deregister_service_tool(self, entry: ModelEntry) -> None:
        """Remove tool from service_tool_registry on unload."""
        for prefix, (mod_path, cls_name) in self._SERVICE_TOOL_MAP.items():
            if entry.id.startswith(prefix):
                try:
                    from integrations.service_tools.registry import service_tool_registry
                    # Extract tool name from class convention (AceStepTool → acestep)
                    tool_name = prefix.split('-', 1)[-1] if '-' in prefix else prefix
                    if tool_name in service_tool_registry._tools:
                        service_tool_registry._tools[tool_name].is_healthy = False
                        logger.info(f"Service tool deregistered: {tool_name}")
                except Exception:
                    pass
                return

    # ── Model swapping ────────────────────────────────────────────

    def _attempt_swap(self, needed: ModelEntry, cs: dict) -> bool:
        """Try to free GPU VRAM by evicting a lower-priority model."""
        try:
            from integrations.service_tools.model_lifecycle import (
                get_model_lifecycle_manager)
            mlm = get_model_lifecycle_manager()
            swapped = mlm.request_swap(
                needed_model=needed.id,
                needed_type='gpu',
            )
            if swapped:
                logger.info(f"Swap initiated to make room for {needed.id}")
                time.sleep(1.0)
                try:
                    from integrations.service_tools.vram_manager import vram_manager
                    vram_manager.refresh_gpu_info()
                except Exception:
                    pass
                return True
        except ImportError:
            pass
        return False

    # ── External sync — for bypass paths that load outside orchestrator ──

    def notify_loaded(self, model_type: str, model_name: str,
                      device: str = 'gpu', vram_gb: float = 0) -> None:
        """Called by subsystems that loaded a model outside the orchestrator."""
        entry = self._find_entry_by_name(model_type, model_name)
        if not entry:
            return
        self._catalog.mark_loaded(entry.id, device=device)
        if not entry.downloaded:
            self._catalog.mark_downloaded(entry.id)
        self._register_vram(entry, device)
        self._register_lifecycle(entry)
        logger.info(f"Catalog synced: {entry.id} loaded on {device} (external)")
        # Push capability notification to frontend via SSE + Liquid UI
        _cap_event = {
            'capability': model_type,
            'status': 'ready',
            'name': model_name,
        }
        from core.platform.events import broadcast_sse_safe
        broadcast_sse_safe('capability_update', _cap_event)
        try:
            from core.platform.service_registry import ServiceRegistry
            _lui = ServiceRegistry.get('LiquidUIService')
            if _lui:
                _lui.agent_ui_update('system', {
                    'type': 'notification',
                    'title': 'Capability Ready',
                    'message': f'{model_name} is ready',
                    'severity': 'success',
                })
        except Exception:
            pass

    def notify_unloaded(self, model_type: str, model_name: str) -> None:
        """Called by subsystems that unloaded a model outside the orchestrator."""
        entry = self._find_entry_by_name(model_type, model_name)
        if not entry:
            return
        self._release_vram(entry)
        self._catalog.mark_unloaded(entry.id)
        logger.info(f"Catalog synced: {entry.id} unloaded (external)")

    def notify_downloaded(self, model_type: str, model_name: str) -> None:
        """Called when a model is downloaded outside the orchestrator."""
        entry = self._find_entry_by_name(model_type, model_name)
        if entry and not entry.downloaded:
            self._catalog.mark_downloaded(entry.id)
            logger.info(f"Catalog synced: {entry.id} downloaded (external)")

    def _find_entry_by_name(self, model_type: str, model_name: str) -> Optional[ModelEntry]:
        """Find a catalog entry by type + display name, partial name, or file name."""
        name_lower = model_name.lower()
        for entry in self._catalog.list_by_type(model_type):
            if entry.name == model_name or model_name in entry.id:
                return entry
            if name_lower in entry.name.lower() or name_lower in entry.id:
                return entry
            if entry.files.get('model') and model_name in entry.files['model']:
                return entry
        return None

    # ── Scan downloaded state ─────────────────────────────────────

    def _scan_downloaded(self) -> None:
        """Check which catalog entries have their files on disk."""
        for entry in self._catalog.list_all():
            if entry.source == 'api':
                entry.downloaded = True
                continue
            # Delegate to registered loader if available
            loader = self._loaders.get(entry.model_type)
            if loader:
                try:
                    entry.downloaded = loader.is_downloaded(entry)
                except Exception:
                    pass
            elif entry.source == 'pip':
                pkg = entry.files.get('package') or entry.repo_id
                if pkg:
                    import importlib.util
                    entry.downloaded = importlib.util.find_spec(
                        pkg.replace('-', '_')) is not None

    # ── Dashboard ─────────────────────────────────────────────────

    def reconcile_live_state(self) -> int:
        """Sync catalog flags with the actual runtime state of each loader.

        For every catalog entry, ask the matching loader "is this
        actually loaded right now?" and update the catalog if the
        answer differs from the stored flag. This catches:

        - Workers that idle-auto-stopped (catalog says loaded, worker dead)
        - Workers that crashed (same)
        - Workers started outside the orchestrator (loaded but catalog False)

        Returns the number of entries whose state changed.
        """
        changed = 0
        for entry in self._catalog.list_all():
            loader = self._loaders.get(entry.model_type)
            if loader is None:
                continue
            try:
                live = bool(loader.is_loaded(entry))
            except Exception as e:
                logger.debug(f"is_loaded probe failed for {entry.id}: {e}")
                continue
            if live != bool(entry.loaded):
                if live:
                    self._catalog.mark_loaded(
                        entry.id, device=entry.device or 'unknown',
                    )
                else:
                    self._catalog.mark_unloaded(entry.id)
                    self._release_vram(entry)
                changed += 1
                logger.info(
                    f"reconcile: {entry.id} catalog={entry.loaded} → live={live}"
                )
        return changed
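
    # Illustrative background wiring (sketch only; the loop below is
    # hypothetical — in practice get_status() already reconciles on demand):
    #
    #     def _reconcile_loop(orch, interval_s=60):
    #         while True:
    #             time.sleep(interval_s)
    #             changed = orch.reconcile_live_state()
    #             if changed:
    #                 logger.info(f"background reconcile: {changed} entries updated")
    #
    #     threading.Thread(target=_reconcile_loop, args=(get_orchestrator(),),
    #                      daemon=True, name='orchestrator_reconcile').start()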

    def get_status(self) -> dict:
        """Full system state for admin dashboard.

        Reconciles catalog flags with live loader state before returning,
        so the UI always sees reality (not a stale snapshot). Also tags
        each entry with stale-state diagnostics (loader probe outcome,
        worker health), so the UI can render warning pills.
        """
        reconcile_changed = self.reconcile_live_state()

        cs = self._get_compute_state()
        entries = self._catalog.to_json()

        # Per-entry stale-state annotation. Four axes of suspicion:
        #   1. loader_missing — catalog claims a model_type with no loader
        #   2. probe_failed — is_loaded() raised (worker likely gone)
        #   3. has_error — entry.error is set (last load failed)
        #   4. download_missing — entry marked downloaded but path absent
        # The UI reads `stale` (truthy) + `stale_reasons` (user-facing list).
        stale_total = 0
        for e in entries:
            reasons = []
            mtype = e.get('model_type')
            loader = self._loaders.get(mtype)
            if loader is None:
                reasons.append(f"no loader for type '{mtype}'")
            else:
                try:
                    live = bool(loader.is_loaded(self._catalog.get(e['id'])))
                    if live != bool(e.get('loaded')):
                        reasons.append('catalog/loader state drift')
                except Exception as ex:
                    reasons.append(f'probe failed: {ex}')
            if e.get('error'):
                reasons.append(f"last error: {e['error']}")
            e['stale'] = bool(reasons)
            e['stale_reasons'] = reasons
            if reasons:
                stale_total += 1

        by_type = {}
        for e in entries:
            t = e.get('model_type', 'unknown')
            by_type.setdefault(t, []).append(e)

        loaded = [e for e in entries if e.get('loaded')]
        downloaded = [e for e in entries if e.get('downloaded')]

        return {
            'compute': cs,
            'total_models': len(entries),
            'loaded_count': len(loaded),
            'downloaded_count': len(downloaded),
            'stale_count': stale_total,
            'reconcile_changed': reconcile_changed,
            'models_by_type': by_type,
            'loaded_models': loaded,
            'all_models': entries,
        }


# ── Singleton ─────────────────────────────────────────────────────
_orchestrator_instance: Optional[ModelOrchestrator] = None
_orchestrator_lock = threading.Lock()


def get_orchestrator() -> ModelOrchestrator:
    """Get or create the global ModelOrchestrator singleton."""
    global _orchestrator_instance
    if _orchestrator_instance is None:
        with _orchestrator_lock:
            if _orchestrator_instance is None:
                _orchestrator_instance = ModelOrchestrator()
    return _orchestrator_instance
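

# --- Illustrative startup wiring (sketch only, not executed here) ---
# Loader class names follow register_loader()'s docstring example; their
# import path is hypothetical and depends on the embedding application.
#
#     from app.loaders import LlamaLoader, TTSLoader, STTLoader   # hypothetical
#
#     orch = get_orchestrator()
#     orch.register_loader('llm', LlamaLoader())
#     orch.register_loader('tts', TTSLoader())
#     orch.register_loader('stt', STTLoader())
#
#     orch.auto_load('llm')                        # best LLM for this hardware
#     system_prompt += orch.capability_prompt()    # advertise node capabilities
#     dashboard_state = orch.get_status()          # reconciled admin view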