Coverage for integrations / service_tools / model_orchestrator.py: 57.2%
423 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2ModelOrchestrator — compute-aware model loading for ANY model type.
4Lives in HARTOS so all deployment targets (Nunba desktop, embedded, cloud)
5share the same orchestration logic. Application-specific loaders (LLM, TTS,
6STT, VLM) are registered as plugins at startup via register_loader().
8Bridges:
9 - ModelCatalog (what models exist)
10 - VRAMManager (how much GPU is free)
11 - ModelLifecycle (idle eviction, pressure response)
12 - Pluggable loaders (registered by the application)
14Usage:
15 from integrations.service_tools.model_orchestrator import get_orchestrator
17 orch = get_orchestrator()
19 # Register application-specific loaders (typically at app startup)
20 orch.register_loader('llm', my_llm_loader)
21 orch.register_loader('tts', my_tts_loader)
23 # Auto-select and load the best LLM for current hardware
24 entry = orch.auto_load('llm')
26 # Load a specific model by ID
27 entry = orch.load('tts-chatterbox-turbo')
28"""
30import logging
31import subprocess
32import sys
33import threading
34import time
35from typing import Optional, Dict, Any, List, Callable
37from integrations.service_tools.model_catalog import (
38 ModelCatalog, ModelEntry, get_catalog,
39)
41logger = logging.getLogger('ModelOrchestrator')
class ModelLoader:
    """Pluggable interface for subsystem-specific model loaders.

    An application implements this to teach the orchestrator how to
    load/unload models of one type (LLM, TTS, STT, VLM, ...). Every
    method receives the catalog entry; load/download/is_* answer with
    True/False, validate with an (ok, reason) pair.
    """

    def load(self, entry: ModelEntry, run_mode: str) -> bool:
        """Load the model. run_mode is 'gpu', 'cpu', or 'cpu_offload'."""
        raise NotImplementedError

    def unload(self, entry: ModelEntry) -> None:
        """Unload/release the model. The base implementation is a no-op."""
        pass

    def download(self, entry: ModelEntry) -> bool:
        """Download model files. Return True if successful."""
        return False

    def is_downloaded(self, entry: ModelEntry) -> bool:
        """Check if model files are on disk. Base class assumes they are not."""
        return False

    def is_loaded(self, entry: ModelEntry) -> bool:
        """Live probe — is the model actually running right now?

        The default trusts `entry.loaded` (the catalog flag), which can
        drift from reality after idle auto-stop, crashes, or external
        process kills. Subclasses that have a real liveness signal (a
        running subprocess, an HTTP health endpoint, etc.) SHOULD
        override this with an actual probe.
        """
        return bool(getattr(entry, 'loaded', False))

    def validate(self, entry: ModelEntry) -> tuple:
        """Post-load capability probe.

        Invoked by the install pipeline after a successful ``load()`` to
        prove the model actually answers for its declared capability —
        not merely "bytes on disk + subprocess alive".

        Modality-specific subclasses (VLMLoader, TTSLoader, STTLoader,
        LlamaLoader) SHOULD override this with a canned, deterministic
        round-trip (e.g. VLM: describe a 32×32 JPEG; TTS: synthesize a
        fixed phrase; STT: transcribe the TTS output; LLM: complete a
        canned prompt). Any override must be deterministic (fixed input)
        and fast (<5s wall clock) so the install-probe doesn't hang.

        The base implementation passes unconditionally so loaders
        without an override don't gate installs.

        Returns:
            (ok: bool, reason: str) — ``ok=True`` on pass; ``reason``
            is a one-line human-readable diagnostic used by the
            install-progress UI and the dispatcher's fallback logging.
        """
        return (True, 'no capability probe defined')
105class ModelOrchestrator:
106 """Compute-aware model loader that works for ANY model type.
108 Subsystem-specific loaders are registered as plugins. The orchestrator
109 handles compute state, VRAM tracking, lifecycle, and swap — the loaders
110 only handle the actual load/unload/download mechanics.
111 """
113 def __init__(self, catalog: Optional[ModelCatalog] = None):
114 self._catalog = catalog or get_catalog()
115 self._lock = threading.Lock()
116 self._loaders: Dict[str, ModelLoader] = {}
117 self._scan_downloaded()
119 # ── Loader registration ───────────────────────────────────────
121 def register_loader(self, model_type: str, loader: ModelLoader) -> None:
122 """Register a subsystem-specific loader for a model type.
124 Example:
125 orch.register_loader('llm', LlamaLoader())
126 orch.register_loader('tts', TTSLoader())
127 """
128 self._loaders[model_type] = loader
129 logger.info(f"Registered loader for model_type={model_type}: "
130 f"{loader.__class__.__name__}")
132 # ── Compute state ─────────────────────────────────────────────
134 def _get_compute_state(self) -> dict:
135 """Get current compute availability from VRAMManager singleton."""
136 state = {
137 'gpu_available': False,
138 'gpu_type': 'none',
139 'vram_total_gb': 0.0,
140 'vram_free_gb': 0.0,
141 'ram_free_gb': 4.0,
142 'allocations': {},
143 }
144 try:
145 from integrations.service_tools.vram_manager import vram_manager
146 gpu = vram_manager.detect_gpu()
147 state['gpu_available'] = gpu.get('cuda_available', False) or gpu.get('metal_available', False)
148 state['gpu_type'] = 'cuda' if gpu.get('cuda_available') else (
149 'metal' if gpu.get('metal_available') else 'none')
150 state['vram_total_gb'] = gpu.get('total_gb', 0.0)
151 state['vram_free_gb'] = vram_manager.get_free_vram()
152 state['allocations'] = vram_manager.get_allocations_display()
153 except ImportError:
154 pass
155 try:
156 import psutil
157 state['ram_free_gb'] = round(psutil.virtual_memory().available / (1024**3), 2)
158 except Exception:
159 pass
160 return state
162 # ── Auto-selection ────────────────────────────────────────────
164 def select_best(self, model_type: str, language: Optional[str] = None,
165 require_capability: Optional[Dict[str, Any]] = None,
166 ) -> Optional[ModelEntry]:
167 """Select the best model for a type given current compute state."""
168 cs = self._get_compute_state()
169 return self._catalog.select_best(
170 model_type=model_type,
171 budget_vram_gb=cs['vram_free_gb'],
172 budget_ram_gb=cs['ram_free_gb'],
173 gpu_available=cs['gpu_available'],
174 language=language,
175 require_capability=require_capability,
176 )
178 # ── Load / Unload ─────────────────────────────────────────────
180 def auto_load(self, model_type: str, language: Optional[str] = None,
181 **kwargs) -> Optional[ModelEntry]:
182 """Select the best model for a type and load it."""
183 entry = self.select_best(model_type, language=language, **kwargs)
184 if not entry:
185 logger.warning(f"No {model_type} model fits current compute")
186 return None
187 return self.load(entry.id)
189 def ensure_loaded_async(self, model_type: str,
190 language: Optional[str] = None,
191 caller: str = 'unknown',
192 **kwargs) -> None:
193 """Fire-and-forget wrapper around auto_load().
195 THE single "bring me a model that can do X" entry point for
196 every caller (chat fallback on cold LLM, TTS synth on cold
197 engine, VLM request on cold vision, etc.). Replaces the old
198 pattern where each model type had its own starter helper
199 (LlamaConfig.ensure_running_async, tts_engine._switch_backend,
200 …) — all of them did select_best → load under different names.
202 Capability/task hints flow through **kwargs to select_best so
203 language routing, voice_clone, emotion_tags, narration etc.
204 all funnel into the same selection logic the catalog already
205 indexes. No parallel path.
207 Runs in a daemon thread so the caller returns immediately.
208 Exceptions never escape — the caller already sent its
209 response and this runs in the background.
210 """
211 import threading
213 def _worker():
214 try:
215 entry = self.auto_load(model_type, language=language, **kwargs)
216 if entry:
217 logger.info(
218 f'ensure_loaded_async: loaded {entry.id} '
219 f'on {entry.device} (type={model_type}, caller={caller})'
220 )
221 else:
222 logger.warning(
223 f'ensure_loaded_async: no {model_type} fit '
224 f'(caller={caller})'
225 )
226 except Exception as e:
227 logger.error(f'ensure_loaded_async: worker crashed: {e}')
229 threading.Thread(
230 target=_worker, daemon=True,
231 name=f'ensure_loaded_{model_type}',
232 ).start()
    def load(self, model_id: str) -> Optional[ModelEntry]:
        """Load a specific model by ID. Downloads if needed.

        Pipeline:
          1. Stale-cache reconciliation — if the catalog claims loaded,
             consult the loader's is_loaded() probe and respawn if dead.
          2. Fit check ('gpu'/'cpu'/'cpu_offload'/'impossible') with one
             swap attempt when the GPU is full.
          3. VRAM reservation BEFORE dispatch, rolled back on failure.
          4. Dispatch to the registered loader; on success mark the
             catalog and register lifecycle + service tool.

        NOTE(review): nothing here calls download() directly — the
        download-on-demand mentioned above is presumably handled inside
        the loader's load(); confirm against the loader implementations.

        Returns:
            The ModelEntry on success, None on any failure (unknown ID,
            insufficient compute, VRAM full, loader failure/exception).
        """
        entry = self._catalog.get(model_id)
        if not entry:
            logger.error(f"Model not found in catalog: {model_id}")
            return None

        if entry.loaded:
            # Reconcile against the loader's live probe — entry.loaded
            # is a CACHE that drifts when the underlying process dies
            # externally (CUDA hang, OOM, manual kill, idle auto-stop
            # outside our lifecycle). Without this probe, the load()
            # short-circuits with "Model already loaded" and the user
            # sees a dead service ("Draft boot decision" loop on
            # 2026-05-04 — task #80). Loaders that override is_loaded()
            # (LlamaLoader, TTSLoader, STTLoader, VLMLoader) return the
            # actual liveness; the base class falls back to entry.loaded
            # so loaders without an override preserve current behavior.
            loader = self._loaders.get(entry.model_type)
            try:
                # No registered loader → no probe available; trust the cache.
                actually_loaded = (
                    loader.is_loaded(entry) if loader else True
                )
            except Exception as _probe_err:
                logger.warning(
                    f"is_loaded probe failed for {model_id}: "
                    f"{type(_probe_err).__name__}: {_probe_err} — "
                    f"trusting cache flag"
                )
                actually_loaded = True
            if actually_loaded:
                logger.info(f"Model already loaded: {model_id} ({entry.device})")
                return entry
            # Cache lies — process is dead. Reconcile state: release
            # VRAM, clear the catalog flag, fall through to a fresh load.
            logger.warning(
                f"Stale cache for {model_id}: catalog says loaded but "
                f"is_loaded() probe says dead — reconciling and "
                f"respawning"
            )
            try:
                self._release_vram(entry)
            except Exception:
                pass
            try:
                self._catalog.mark_unloaded(model_id)
            except Exception:
                pass

        cs = self._get_compute_state()
        fit = entry.matches_compute(
            cs['vram_free_gb'], cs['ram_free_gb'], cs['gpu_available'])
        if fit == 'impossible':
            # One eviction attempt: only worth it when a GPU exists and
            # this entry actually wants VRAM.
            if cs['gpu_available'] and entry.vram_gb > 0:
                swapped = self._attempt_swap(entry, cs)
                if swapped:
                    # Re-check with post-eviction numbers.
                    cs = self._get_compute_state()
                    fit = entry.matches_compute(
                        cs['vram_free_gb'], cs['ram_free_gb'], cs['gpu_available'])

        if fit == 'impossible':
            logger.error(f"Cannot load {model_id}: insufficient compute "
                         f"(need {entry.vram_gb}GB VRAM or {entry.ram_gb}GB RAM)")
            self._catalog.mark_error(model_id, 'Insufficient compute')
            return None

        logger.info(f"Loading {model_id} ({entry.model_type}) in {fit} mode...")

        # Allocate VRAM BEFORE load so other models see the reservation.
        # Rolled back on failure.
        if not self._register_vram(entry, fit):
            logger.warning(f"Skipping {model_id}: VRAM full")
            return None
        try:
            success = self._dispatch_load(entry, fit)
            if success:
                self._catalog.mark_loaded(model_id, device=fit)
                self._register_lifecycle(entry)
                self._register_service_tool(entry)
                logger.info(f"Loaded {model_id} on {fit}")
                return entry
            else:
                # Loader reported failure — roll back the VRAM reservation.
                self._release_vram(entry)
                self._catalog.mark_error(model_id, 'Loader returned failure')
                return None
        except Exception as e:
            # Loader raised — same rollback path, recording the exception text.
            self._release_vram(entry)
            logger.error(f"Failed to load {model_id}: {e}")
            self._catalog.mark_error(model_id, str(e))
            return None
325 def unload(self, model_id: str) -> bool:
326 """Unload a model and release its resources."""
327 entry = self._catalog.get(model_id)
328 if not entry or not entry.loaded:
329 return False
331 try:
332 self._dispatch_unload(entry)
333 except Exception as e:
334 logger.warning(f"Unload dispatch failed for {model_id}: {e}")
336 self._release_vram(entry)
337 self._deregister_service_tool(entry)
338 self._catalog.mark_unloaded(model_id)
339 logger.info(f"Unloaded {model_id}")
340 return True
342 def download(self, model_id: str) -> bool:
343 """Download a model without loading it."""
344 entry = self._catalog.get(model_id)
345 if not entry:
346 return False
347 if entry.downloaded:
348 return True
349 try:
350 success = self._dispatch_download(entry)
351 if success:
352 self._catalog.mark_downloaded(model_id)
353 return success
354 except Exception as e:
355 logger.error(f"Download failed for {model_id}: {e}")
356 self._catalog.mark_error(model_id, str(e))
357 return False
359 # ── Capability introspection ────────────────────────────────────
    def available_capabilities(self) -> Dict[str, Any]:
        """What this node can do right now — universal, any model type.

        Merges THREE sources into one capability map:
        1. ModelCatalog — static entries (LLM, TTS, STT, VLM, video_gen, audio_gen, etc.)
        2. ServiceToolRegistry — dynamic sidecars (ACE Step, DiffRhythm, txt2img, etc.)
        3. RuntimeToolManager — running services with health status

        Returns dict keyed by category (model_type OR service tag), each with:
            - available: bool
            - loaded: list of loaded model/service IDs
            - can_load: list that fit current compute but aren't loaded
            - capabilities: merged capability set across all entries
            - services: list of running dynamic services in this category

        New categories from dynamic services appear automatically —
        no code changes needed when a new service type registers.
        """
        cs = self._get_compute_state()
        result = {}

        # 1. Catalog models (static + dynamically registered)
        for mt in self._catalog.list_types():
            entries = self._catalog.list_by_type(mt)
            loaded = [e.id for e in entries if e.loaded]
            # 'can_load' = enabled, not loaded, and fits the live budget.
            can_load = [
                e.id for e in entries
                if not e.loaded and e.enabled
                and e.matches_compute(
                    cs['vram_free_gb'], cs['ram_free_gb'],
                    cs['gpu_available']) != 'impossible'
            ]
            # Merge capabilities from all available models
            merged_caps = {}
            for e in entries:
                if e.loaded or (e.enabled and e.matches_compute(
                        cs['vram_free_gb'], cs['ram_free_gb'],
                        cs['gpu_available']) != 'impossible'):
                    for k, v in e.capabilities.items():
                        if v:
                            merged_caps[k] = True
            result[mt] = {
                'available': bool(loaded or can_load),
                'loaded': loaded,
                'can_load': can_load,
                'capabilities': merged_caps,
                'services': [],
            }

        # 2. Dynamic services (ServiceToolRegistry) — may introduce NEW categories
        # NOTE(review): reads the registry's private `_tools` dict directly —
        # presumably no public iteration API exists; confirm.
        try:
            from integrations.service_tools.registry import service_tool_registry
            for name, tool_info in service_tool_registry._tools.items():
                tags = getattr(tool_info, 'tags', []) or []
                # Each tag can map to a category
                categories = set()
                for tag in tags:
                    if tag in result:
                        categories.add(tag)
                # If no existing category matches, use the tool name as category
                if not categories:
                    cat = tags[0] if tags else name
                    categories.add(cat)

                for cat in categories:
                    if cat not in result:
                        result[cat] = {
                            'available': True,
                            'loaded': [],
                            'can_load': [],
                            'capabilities': {},
                            'services': [],
                        }
                    result[cat]['available'] = True
                    result[cat]['services'].append(name)
        except Exception:
            pass

        # 3. Running services (RuntimeToolManager) — mark health status
        try:
            from integrations.service_tools.runtime_manager import runtime_tool_manager
            all_status = runtime_tool_manager.get_all_status()
            for tool_name, status in all_status.items():
                if status.get('running'):
                    # Find which category this tool belongs to
                    for cat, info in result.items():
                        if tool_name in info.get('services', []):
                            info['available'] = True
        except Exception:
            pass

        return result
454 def can_do(self, model_type: str, capability: str = None) -> bool:
455 """Quick check: can this node handle a task right now?
457 Works for ANY model type or service category — including ones
458 that only exist as dynamic services (not in the catalog).
460 Usage:
461 orchestrator.can_do('tts') # any TTS?
462 orchestrator.can_do('audio_gen', 'music_gen') # music specifically?
463 orchestrator.can_do('video_gen', 'txt2vid') # text-to-video?
464 orchestrator.can_do('robot_locomotion') # new category from service?
465 """
466 caps = self.available_capabilities()
467 type_info = caps.get(model_type, {})
468 if not type_info.get('available'):
469 return False
470 if not capability:
471 return True
472 # Check merged capabilities from models + services
473 if type_info.get('capabilities', {}).get(capability):
474 return True
475 # Deeper check: individual model entries
476 all_ids = type_info.get('loaded', []) + type_info.get('can_load', [])
477 for mid in all_ids:
478 entry = self._catalog.get(mid)
479 if entry and entry.capabilities.get(capability):
480 return True
481 return False
483 def capability_prompt(self) -> str:
484 """Auto-generate a compact capability summary for LLM prompt injection.
486 Dynamically built from live state — new services appear automatically,
487 offline services disappear. No hardcoding needed.
489 Returns empty string if nothing special is available (no prompt bloat).
490 Format designed for small LLMs: one line per capability, action-oriented.
491 """
492 caps = self.available_capabilities()
493 lines = []
495 for cat, info in caps.items():
496 if not info.get('available'):
497 continue
498 # Skip LLM (the agent IS the LLM) and embedding (internal)
499 if cat in ('llm', 'embedding'):
500 continue
502 cap_list = sorted(info.get('capabilities', {}).keys())
503 loaded = info.get('loaded', [])
504 services = info.get('services', [])
506 # Build human-readable one-liner
507 status_parts = []
508 if loaded:
509 status_parts.append(f"ready: {', '.join(loaded[:3])}")
510 elif services:
511 status_parts.append(f"via: {', '.join(services[:3])}")
512 elif info.get('can_load'):
513 status_parts.append(f"available: {', '.join(info['can_load'][:2])}")
514 cap_str = f" — {', '.join(cap_list[:5])}" if cap_list else ''
516 label = cat.replace('_', ' ')
517 status = f" ({'; '.join(status_parts)})" if status_parts else ''
518 lines.append(f"- {label}{cap_str}{status}")
520 if not lines:
521 return ''
523 return (
524 'Available capabilities on this node (use via tools — '
525 'call generate_media or synthesize_multilingual_audio):\n'
526 + '\n'.join(lines)
527 )
529 # ── Loader dispatch ───────────────────────────────────────────
531 def _dispatch_load(self, entry: ModelEntry, run_mode: str) -> bool:
532 """Route loading to the registered loader for this model type."""
533 loader = self._loaders.get(entry.model_type)
534 if loader:
535 return loader.load(entry, run_mode)
536 # Fallback: try RuntimeToolManager for sidecar-based tools
537 return self._load_generic(entry, run_mode)
539 def _dispatch_unload(self, entry: ModelEntry) -> None:
540 """Route unloading to the registered loader."""
541 loader = self._loaders.get(entry.model_type)
542 if loader:
543 loader.unload(entry)
545 def _dispatch_download(self, entry: ModelEntry) -> bool:
546 """Route downloading to the registered loader or generic fallback."""
547 loader = self._loaders.get(entry.model_type)
548 if loader:
549 return loader.download(entry)
550 if entry.source == 'pip':
551 return self._install_pip(entry)
552 return False
554 def _load_generic(self, entry: ModelEntry, run_mode: str) -> bool:
555 """Fallback: try RuntimeToolManager for sidecar-based tools."""
556 try:
557 from integrations.service_tools.runtime_manager import runtime_tool_manager
558 tool_name = entry.id.replace(f'{entry.model_type}-', '')
559 result = runtime_tool_manager.setup_tool(tool_name)
560 return result.get('running', False)
561 except Exception as e:
562 logger.warning(f"Generic load failed for {entry.id}: {e}")
563 return False
565 def _install_pip(self, entry: ModelEntry) -> bool:
566 """Install a pip package for a model backend."""
567 pkg = entry.files.get('package') or entry.repo_id
568 if not pkg:
569 return False
570 try:
571 _kw = dict(capture_output=True, text=True, timeout=300)
572 if sys.platform == 'win32':
573 _kw['creationflags'] = subprocess.CREATE_NO_WINDOW
574 result = subprocess.run(
575 [sys.executable, '-m', 'pip', 'install', pkg, '--quiet'],
576 **_kw)
577 return result.returncode == 0
578 except Exception as e:
579 logger.error(f"pip install failed for {pkg}: {e}")
580 return False
582 # ── VRAM integration ──────────────────────────────────────────
583 #
584 # KEY ALIGNMENT: VRAMManager._allocations and VRAM_BUDGETS use raw tool
585 # names ("whisper", "tts_chatterbox_turbo"). RuntimeToolManager (RTM) also
586 # uses these raw names. We use the SAME key convention to avoid
587 # double-counting when both RTM and the Orchestrator register the same model.
    # Translation table: catalog entry ID → VRAMManager allocation key.
    # Keys intentionally mirror the raw tool names that VRAMManager and
    # RuntimeToolManager already use (see the KEY ALIGNMENT note above)
    # so the same model is never double-counted.
    _CATALOG_TO_VRAM_KEY = {
        # STT — faster-whisper (primary engine)
        'stt-faster-whisper-tiny': 'whisper_tiny',
        'stt-faster-whisper-base': 'whisper_base',
        'stt-faster-whisper-small': 'whisper_small',
        'stt-faster-whisper-medium': 'whisper_medium',
        'stt-faster-whisper-large': 'whisper_large',
        # STT — sherpa-onnx (CPU-only ONNX, no GPU VRAM)
        'stt-sherpa-moonshine-tiny': 'sherpa_moonshine_tiny',
        'stt-sherpa-moonshine-base': 'sherpa_moonshine_base',
        'stt-sherpa-whisper-tiny': 'sherpa_whisper_tiny',
        'stt-sherpa-whisper-base': 'sherpa_whisper_base',
        'stt-sherpa-whisper-small': 'sherpa_whisper_small',
        'stt-sherpa-whisper-medium': 'sherpa_whisper_medium',
        # STT — legacy fallback IDs (used by old catalog entries)
        'stt-whisper-base': 'whisper_base',
        'stt-whisper-medium': 'whisper_medium',
        'stt-whisper-large': 'whisper_large',
        # TTS
        'tts-chatterbox-turbo': 'tts_chatterbox_turbo',
        'tts-f5-tts': 'tts_f5',
        'tts-indic-parler': 'tts_indic_parler',
        'tts-cosyvoice3': 'tts_cosyvoice3',
        'tts-chatterbox-ml': 'tts_chatterbox_ml',
        'tts-kokoro': 'tts_kokoro',
        # VLM
        'vlm-minicpm-v2': 'minicpm',
        'vlm-qwen3vl': 'qwen3vl',
        # VLM — CPU-only backends (no GPU VRAM tracking needed, included for completeness)
        'vlm-mobilevlm': 'mobilevlm',
        'vlm-clip': 'clip',
        # Video gen
        'video_gen-wan2gp': 'wan2gp',
        'video_gen-ltx2': 'ltx2',
    }
625 def _vram_key(self, entry: ModelEntry) -> str:
626 """Get the VRAMManager allocation key for a catalog entry.
628 For LLMs, always uses 'llm' — there's only one LLM loaded at a time
629 (llama-server is single-model). This makes registration idempotent
630 regardless of whether LlamaConfig or the Orchestrator registers first.
631 """
632 if entry.model_type == 'llm':
633 return 'llm'
634 return self._CATALOG_TO_VRAM_KEY.get(entry.id, entry.id)
636 def _register_vram(self, entry: ModelEntry, run_mode: str) -> bool:
637 """Register VRAM allocation. Returns False if GPU is full."""
638 if run_mode != 'gpu' or entry.vram_gb <= 0:
639 return True
640 try:
641 from integrations.service_tools.vram_manager import vram_manager
642 return vram_manager.allocate(self._vram_key(entry))
643 except ImportError:
644 return True
646 def _release_vram(self, entry: ModelEntry) -> None:
647 """Release VRAM allocation."""
648 try:
649 from integrations.service_tools.vram_manager import vram_manager
650 tool_key = self._vram_key(entry)
651 freed = vram_manager._allocations.pop(tool_key, 0)
652 if freed:
653 logger.info(f"VRAM released: {tool_key} = {freed}GB")
654 except ImportError:
655 pass
657 # ── Lifecycle integration ─────────────────────────────────────
    def _register_lifecycle(self, entry: ModelEntry) -> None:
        """Register model with ModelLifecycleManager — central tracker for ALL GPU models.

        Every model that loads on GPU MUST register here so the lifecycle
        manager can evict/offload/restore models when VRAM is needed.

        Two LLM-specific eviction policies are applied here based on the
        catalog entry id:

        * **Draft 0.8B classifier** (``llm-qwen3.5-0.8b*``) →
          ``pinned=True``. The 0.8B is first-contact for EVERY chat
          message (speculative_dispatcher.dispatch_draft_first) so
          evicting it on idle means every message cold-starts the
          classifier → full LangChain pipeline fallthrough. Pinning
          costs ~550 MB + mmproj permanently, which is cheaper than
          paying cold-start cost per request.

        * **Main+VLM LLMs** (``llm-qwen*-vl*``, or ``purposes``
          containing any of ``vision`` / ``caption`` / ``grounding``)
          → ``pinned=True``. The same llama-server serves both
          chat AND the VLM agentic loop; losing it kills both
          surfaces and any in-flight agentic loop has no recovery
          context. See 2026-05-03 incident: the 4B-VL died
          silently between 16:05 and 16:59, no event in any log
          (Nunba uses ``subprocess.PIPE`` for the main server's
          stdout and only drains it during startup, so any final
          output was lost), VRAM freed back to 7.83 GB / 8 GB
          despite the prior ``pressure_evict_only`` policy. Pin
          forces the model to survive every code path including
          phase-3 pressure eviction.

        * **Chat-only main LLMs** (``llm-qwen*-2b*`` /
          ``llm-qwen*-4b*`` without VL) → ``pressure_evict_only=
          True``. Survive passive idle sweep (phase 7), yield
          under real VRAM pressure (phase 3). A brief reload
          costs only chat continuity, no in-flight VLM loop.
          Before this distinction, the 2026-04-11 incident showed
          the 4B being killed every 5 minutes mid-session because
          its 340s idle exceeded the 300s timeout, even though
          nothing else needed the VRAM.

        * All other models (STT, TTS, standalone vision sidecars)
          keep the default passive idle eviction — their cold
          start is cheap and the VRAM is better spent on the
          model the user's about to use next.
        """
        try:
            from integrations.service_tools.model_lifecycle import (
                get_model_lifecycle_manager, ModelState, ModelDevice, ModelPriority)
            mlm = get_model_lifecycle_manager()
            # NOTE(review): both else-branches below yield CPU, so the
            # middle condition is redundant — 'cpu_offload' may have been
            # meant to map to a distinct device; confirm intent.
            device = (ModelDevice.GPU if entry.device == 'gpu'
                      else ModelDevice.CPU if entry.device in ('cpu', 'cpu_offload')
                      else ModelDevice.CPU)
            # Map catalog names to offload table names (e.g., 'stt-whisper-large' → 'whisper')
            offload_name = entry.id.split('-')[1] if '-' in entry.id else entry.id
            # Use the offload table key if it exists, otherwise the catalog ID
            from integrations.service_tools.model_lifecycle import CPU_OFFLOAD_TABLE
            if offload_name not in CPU_OFFLOAD_TABLE:
                offload_name = entry.id

            # Eviction-policy flags. See the ModelState docstring for the
            # full rationale and the 2026-04-12 incident context.
            _pinned = False
            _pressure_evict_only = False
            if entry.model_type == 'llm':
                # Use catalog purpose assignment (admin-configurable).
                # Fallback: pattern match on ID for backward compat.
                _purposes = getattr(entry, 'purposes', []) or []
                _id_lower = entry.id.lower()
                _is_draft = 'draft' in _purposes or (
                    not _purposes and any(
                        t in _id_lower for t in ('0.8b', 'draft', 'caption')
                    )
                )
                # Main LLM that ALSO serves VLM (one llama-server with
                # mmproj loaded — chat + agentic-loop on the same model)
                # gets pinned. See the docstring above for the 2026-05-03
                # silent-death incident.
                _is_vlm_main = (
                    not _is_draft and (
                        any(p in _purposes for p in ('vision', 'caption', 'grounding'))
                        or (not _purposes and any(
                            t in _id_lower for t in ('-vl-', '-vlm-', 'mmproj')
                        ))
                    )
                )
                if _is_draft or _is_vlm_main:
                    _pinned = True
                else:
                    # Chat-only main tier — survive idle sweeps, yield
                    # under real VRAM pressure.
                    _pressure_evict_only = True

            _priority = ModelPriority.ACTIVE if entry.model_type == 'llm' else ModelPriority.WARM
            mlm._models[offload_name] = ModelState(
                name=offload_name,
                device=device,
                priority=_priority,
                pinned=_pinned,
                pressure_evict_only=_pressure_evict_only,
            )
            # Touch the access clock so a fresh load isn't instantly idle.
            if hasattr(mlm, 'notify_access'):
                mlm.notify_access(offload_name)
            _policy = ('pinned' if _pinned
                       else 'pressure_evict_only' if _pressure_evict_only
                       else 'default_idle_evict')
            logger.info(
                f"Lifecycle: registered {offload_name} "
                f"(device={device}, policy={_policy})"
            )
        except ImportError:
            pass
        except Exception as e:
            logger.debug(f"Lifecycle registration failed for {entry.id}: {e}")
774 # ── Service tool registration ────────────────────────────────────
775 # When a model loads, register its corresponding service tool so
776 # the LLM sees the capability via get_tools() → {{tools}}.
777 # Each tool class (AceStepTool, CosyVoiceTool, etc.) self-registers
778 # with service_tool_registry — we just trigger the registration.
780 # Maps catalog model_type or id-prefix to the tool module + class
    # Maps a catalog-ID prefix to the (module path, class name) of the
    # service tool that should self-register when a matching model loads.
    _SERVICE_TOOL_MAP = {
        'audio_gen-acestep': ('integrations.service_tools.acestep_tool', 'AceStepTool'),
        'stt-whisper': ('integrations.service_tools.whisper_tool', 'WhisperTool'),
        'tts-cosyvoice3': ('integrations.service_tools.cosyvoice_tool', 'CosyVoiceTool'),
        'tts-f5': ('integrations.service_tools.f5_tts_tool', 'F5TTSTool'),
        'tts-indic-parler': ('integrations.service_tools.indic_parler_tool', 'IndicParlerTool'),
        'tts-pocket': ('integrations.service_tools.pocket_tts_tool', 'PocketTTSTool'),
    }
790 def _register_service_tool(self, entry: ModelEntry) -> None:
791 """Register loaded model with service_tool_registry."""
792 for prefix, (mod_path, cls_name) in self._SERVICE_TOOL_MAP.items():
793 if entry.id.startswith(prefix):
794 try:
795 import importlib
796 mod = importlib.import_module(mod_path)
797 tool_cls = getattr(mod, cls_name, None)
798 if tool_cls:
799 reg = getattr(tool_cls, 'register', None) or \
800 getattr(tool_cls, 'register_functions', None)
801 if reg:
802 reg()
803 logger.info(f"Service tool registered: {cls_name}")
804 except Exception as e:
805 logger.debug(f"Service tool registration skipped for {entry.id}: {e}")
806 return
808 def _deregister_service_tool(self, entry: ModelEntry) -> None:
809 """Remove tool from service_tool_registry on unload."""
810 for prefix, (mod_path, cls_name) in self._SERVICE_TOOL_MAP.items():
811 if entry.id.startswith(prefix):
812 try:
813 from integrations.service_tools.registry import service_tool_registry
814 # Extract tool name from class convention (AceStepTool → acestep)
815 tool_name = prefix.split('-', 1)[-1] if '-' in prefix else prefix
816 if tool_name in service_tool_registry._tools:
817 service_tool_registry._tools[tool_name].is_healthy = False
818 logger.info(f"Service tool deregistered: {tool_name}")
819 except Exception:
820 pass
821 return
823 # ── Model swapping ──────────────────────────────────────────────
825 def _attempt_swap(self, needed: ModelEntry, cs: dict) -> bool:
826 """Try to free GPU VRAM by evicting a lower-priority model."""
827 try:
828 from integrations.service_tools.model_lifecycle import (
829 get_model_lifecycle_manager)
830 mlm = get_model_lifecycle_manager()
831 swapped = mlm.request_swap(
832 needed_model=needed.id,
833 needed_type='gpu',
834 )
835 if swapped:
836 logger.info(f"Swap initiated to make room for {needed.id}")
837 time.sleep(1.0)
838 try:
839 from integrations.service_tools.vram_manager import vram_manager
840 vram_manager.refresh_gpu_info()
841 except Exception:
842 pass
843 return True
844 except ImportError:
845 pass
846 return False
848 # ── External sync — for bypass paths that load outside orchestrator ──
850 def notify_loaded(self, model_type: str, model_name: str,
851 device: str = 'gpu', vram_gb: float = 0) -> None:
852 """Called by subsystems that loaded a model outside the orchestrator."""
853 entry = self._find_entry_by_name(model_type, model_name)
854 if not entry:
855 return
856 self._catalog.mark_loaded(entry.id, device=device)
857 if not entry.downloaded:
858 self._catalog.mark_downloaded(entry.id)
859 self._register_vram(entry, device)
860 self._register_lifecycle(entry)
861 logger.info(f"Catalog synced: {entry.id} loaded on {device} (external)")
862 # Push capability notification to frontend via SSE + Liquid UI
863 _cap_event = {
864 'capability': model_type,
865 'status': 'ready',
866 'name': model_name,
867 }
868 from core.platform.events import broadcast_sse_safe
869 broadcast_sse_safe('capability_update', _cap_event)
870 try:
871 from core.platform.service_registry import ServiceRegistry
872 _lui = ServiceRegistry.get('LiquidUIService')
873 if _lui:
874 _lui.agent_ui_update('system', {
875 'type': 'notification',
876 'title': 'Capability Ready',
877 'message': f'{model_name} is ready',
878 'severity': 'success',
879 })
880 except Exception:
881 pass
883 def notify_unloaded(self, model_type: str, model_name: str) -> None:
884 """Called by subsystems that unloaded a model outside the orchestrator."""
885 entry = self._find_entry_by_name(model_type, model_name)
886 if not entry:
887 return
888 self._release_vram(entry)
889 self._catalog.mark_unloaded(entry.id)
890 logger.info(f"Catalog synced: {entry.id} unloaded (external)")
892 def notify_downloaded(self, model_type: str, model_name: str) -> None:
893 """Called when a model is downloaded outside the orchestrator."""
894 entry = self._find_entry_by_name(model_type, model_name)
895 if entry and not entry.downloaded:
896 self._catalog.mark_downloaded(entry.id)
897 logger.info(f"Catalog synced: {entry.id} downloaded (external)")
899 def _find_entry_by_name(self, model_type: str, model_name: str) -> Optional[ModelEntry]:
900 """Find a catalog entry by type + display name, partial name, or file name."""
901 name_lower = model_name.lower()
902 for entry in self._catalog.list_by_type(model_type):
903 if entry.name == model_name or model_name in entry.id:
904 return entry
905 if name_lower in entry.name.lower() or name_lower in entry.id:
906 return entry
907 if entry.files.get('model') and model_name in entry.files['model']:
908 return entry
909 return None
911 # ── Scan downloaded state ─────────────────────────────────────
913 def _scan_downloaded(self) -> None:
914 """Check which catalog entries have their files on disk."""
915 for entry in self._catalog.list_all():
916 if entry.source == 'api':
917 entry.downloaded = True
918 continue
919 # Delegate to registered loader if available
920 loader = self._loaders.get(entry.model_type)
921 if loader:
922 try:
923 entry.downloaded = loader.is_downloaded(entry)
924 except Exception:
925 pass
926 elif entry.source == 'pip':
927 pkg = entry.files.get('package') or entry.repo_id
928 if pkg:
929 import importlib.util
930 entry.downloaded = importlib.util.find_spec(
931 pkg.replace('-', '_')) is not None
933 # ── Dashboard ─────────────────────────────────────────────────
935 def reconcile_live_state(self) -> int:
936 """Sync catalog flags with the actual runtime state of each loader.
938 For every catalog entry, ask the matching loader "is this
939 actually loaded right now?" and update the catalog if the
940 answer differs from the stored flag. This catches:
942 - Workers that idle-auto-stopped (catalog says loaded, worker dead)
943 - Workers that crashed (same)
944 - Workers started outside the orchestrator (loaded but catalog False)
946 Returns the number of entries whose state changed.
947 """
948 changed = 0
949 for entry in self._catalog.list_all():
950 loader = self._loaders.get(entry.model_type)
951 if loader is None:
952 continue
953 try:
954 live = bool(loader.is_loaded(entry))
955 except Exception as e:
956 logger.debug(f"is_loaded probe failed for {entry.id}: {e}")
957 continue
958 if live != bool(entry.loaded):
959 if live:
960 self._catalog.mark_loaded(
961 entry.id, device=entry.device or 'unknown',
962 )
963 else:
964 self._catalog.mark_unloaded(entry.id)
965 self._release_vram(entry)
966 changed += 1
967 logger.info(
968 f"reconcile: {entry.id} catalog={entry.loaded} → live={live}"
969 )
970 return changed
972 def get_status(self) -> dict:
973 """Full system state for admin dashboard.
975 Reconciles catalog flags with live loader state before returning,
976 so the UI always sees reality (not a stale snapshot). Also tags
977 each entry with stale-state diagnostics (loader probe outcome,
978 worker health), so the UI can render warning pills.
979 """
980 reconcile_changed = self.reconcile_live_state()
982 cs = self._get_compute_state()
983 entries = self._catalog.to_json()
985 # Per-entry stale-state annotation. Four axes of suspicion:
986 # 1. loader_missing — catalog claims a model_type with no loader
987 # 2. probe_failed — is_loaded() raised (worker likely gone)
988 # 3. has_error — entry.error is set (last load failed)
989 # 4. download_missing — entry marked downloaded but path absent
990 # The UI reads `stale` (truthy) + `stale_reasons` (user-facing list).
991 stale_total = 0
992 for e in entries:
993 reasons = []
994 mtype = e.get('model_type')
995 loader = self._loaders.get(mtype)
996 if loader is None:
997 reasons.append(f"no loader for type '{mtype}'")
998 else:
999 try:
1000 live = bool(loader.is_loaded(self._catalog.get(e['id'])))
1001 if live != bool(e.get('loaded')):
1002 reasons.append('catalog/loader state drift')
1003 except Exception as ex:
1004 reasons.append(f'probe failed: {ex}')
1005 if e.get('error'):
1006 reasons.append(f"last error: {e['error']}")
1007 e['stale'] = bool(reasons)
1008 e['stale_reasons'] = reasons
1009 if reasons:
1010 stale_total += 1
1012 by_type = {}
1013 for e in entries:
1014 t = e.get('model_type', 'unknown')
1015 by_type.setdefault(t, []).append(e)
1017 loaded = [e for e in entries if e.get('loaded')]
1018 downloaded = [e for e in entries if e.get('downloaded')]
1020 return {
1021 'compute': cs,
1022 'total_models': len(entries),
1023 'loaded_count': len(loaded),
1024 'downloaded_count': len(downloaded),
1025 'stale_count': stale_total,
1026 'reconcile_changed': reconcile_changed,
1027 'models_by_type': by_type,
1028 'loaded_models': loaded,
1029 'all_models': entries,
1030 }
# ── Singleton ─────────────────────────────────────────────────────

# Lazily-created process-wide orchestrator instance, guarded by
# _orchestrator_lock so concurrent first calls build it only once.
_orchestrator_instance: Optional[ModelOrchestrator] = None
_orchestrator_lock = threading.Lock()


def get_orchestrator() -> ModelOrchestrator:
    """Get or create the global ModelOrchestrator singleton.

    Uses double-checked locking: the unlocked fast path skips lock
    contention once the instance exists; the re-check under the lock
    prevents two racing threads from each constructing an instance.
    """
    global _orchestrator_instance
    if _orchestrator_instance is None:
        with _orchestrator_lock:
            if _orchestrator_instance is None:
                _orchestrator_instance = ModelOrchestrator()
    return _orchestrator_instance