Coverage for integrations / service_tools / model_catalog.py: 69.6%

388 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2ModelCatalog — single source of truth for ALL model types. 

3 

4One schema covers LLM, TTS, STT, VLM, image gen, video gen, etc. 

5JSON-backed so the admin UI can CRUD entries at runtime. 

6 

7Adding a new model of ANY type: 

8 1. catalog.register(ModelEntry(...)) — programmatic 

9 2. POST /api/admin/models — via admin UI 

10 3. Edit model_catalog.json in the data dir — manual 

11 

12The catalog does NOT load/unload models — that's the orchestrator's job. 

13This is purely metadata + state tracking. 

14""" 

15 

16import json 

17import logging 

18import os 

19import threading 

20import time 

21from dataclasses import dataclass, field, asdict 

22from enum import Enum 

23from pathlib import Path 

24from typing import Dict, List, Optional, Any 

25 

# Module logger. NOTE: named 'ModelCatalog' rather than __name__, so log
# config must target that literal name to adjust this module's verbosity.
logger = logging.getLogger('ModelCatalog')

27 

28 

29# ── Model type enum — single source of truth ───────────────────── 

30# Use ModelType.LLM (not 'llm') everywhere. The .value IS the string 

31# so JSON serialization and dict key usage work unchanged. 

32# 

33# Usage: 

34# ModelType.LLM → <ModelType.LLM: 'llm'> 

35# ModelType.LLM.value → 'llm' 

36# ModelType.LLM.label → 'Large Language Model' 

37# ModelType('llm') → ModelType.LLM (lookup from string) 

38# str(ModelType.LLM) → 'llm' (clean string for JSON/logs) 

39 

class ModelType(str, Enum):
    """Canonical identifiers for every model category in the catalog.

    Mixes in ``str`` so each member compares equal to its plain value
    (``ModelType.LLM == 'llm'`` is True) — existing string comparisons,
    dict keys, and JSON payloads keep working unchanged.

    Quick reference::

        ModelType.LLM          -> <ModelType.LLM: 'llm'>
        ModelType.LLM.value    -> 'llm'
        ModelType.LLM.label    -> 'Large Language Model'
        ModelType('llm')       -> ModelType.LLM   (lookup from string)
        str(ModelType.LLM)     -> 'llm'           (clean form for JSON/logs)
    """

    LLM = 'llm'
    TTS = 'tts'
    STT = 'stt'
    VLM = 'vlm'
    IMAGE_GEN = 'image_gen'
    VIDEO_GEN = 'video_gen'
    AUDIO_GEN = 'audio_gen'
    EMBEDDING = 'embedding'

    def __str__(self) -> str:
        # Emit the bare value (not "ModelType.LLM") in logs and JSON.
        return self.value

    @property
    def label(self) -> str:
        # Human-readable display name, e.g. 'Large Language Model'.
        return _MODEL_TYPE_LABELS[self]

60 

61 

# Display labels for each ModelType, surfaced via the ModelType.label property.
# Every member must have an entry here, or .label raises KeyError.
_MODEL_TYPE_LABELS = {
    ModelType.LLM: 'Large Language Model',
    ModelType.TTS: 'Text-to-Speech',
    ModelType.STT: 'Speech-to-Text',
    ModelType.VLM: 'Vision-Language Model',
    ModelType.IMAGE_GEN: 'Image Generation',
    ModelType.VIDEO_GEN: 'Video Generation',
    ModelType.AUDIO_GEN: 'Audio/Music Generation',
    ModelType.EMBEDDING: 'Embedding Model',
}

# Backwards-compatible dict for code that iterates MODEL_TYPES
# (plain {'llm': 'Large Language Model', ...} — no enum types leak out).
MODEL_TYPES = {mt.value: mt.label for mt in ModelType}

75 

# Backend runtimes — valid values for ModelEntry.backend (not enforced here;
# validation, if any, happens at the admin-UI / orchestrator layer).
BACKENDS = {
    'llama.cpp': 'llama.cpp server (GGUF)',
    'torch': 'PyTorch (HuggingFace)',
    'onnx': 'ONNX Runtime',
    'piper': 'Piper TTS (ONNX, CPU)',
    'api': 'Remote API endpoint',
    'sidecar': 'Subprocess sidecar',
    'in_process': 'In-process Python module',
}

# Download sources — valid values for ModelEntry.source.
SOURCES = {
    'huggingface': 'HuggingFace Hub',
    'ollama': 'Ollama registry',
    'github': 'GitHub release',
    'pip': 'Python package (pip)',
    'api': 'Remote API (no download)',
    'local': 'Already on disk',
    'custom_url': 'Custom download URL',
}

97 

98 

@dataclass
class ModelEntry:
    """One catalog record describing a model of any kind.

    A single schema covers LLM, TTS, STT, VLM, image/video/audio gen and
    embeddings. Pure metadata plus a few runtime flags — loading and
    unloading happen elsewhere, never through this record.
    """

    # -- Identity ------------------------------------------------------
    id: str                     # unique slug, e.g. "qwen3.5-4b-vl"
    name: str                   # human-readable display name
    model_type: str             # one of the MODEL_TYPES keys
    version: str = '1.0'        # semver or commit hash

    # -- Source & files ------------------------------------------------
    source: str = 'huggingface'  # one of the SOURCES keys
    repo_id: str = ''            # HF repo / Ollama name / pip package
    files: Dict[str, str] = field(default_factory=dict)
    download_url: str = ''       # only used when source == 'custom_url'

    # -- Compute requirements ------------------------------------------
    vram_gb: float = 0.0         # GPU VRAM needed (0 means CPU-capable)
    ram_gb: float = 1.0          # system RAM needed
    disk_gb: float = 0.0         # disk footprint of the model files
    min_capability_tier: str = 'lite'  # 'lite' | 'standard' | 'full'

    # -- Runtime -------------------------------------------------------
    backend: str = 'torch'       # one of the BACKENDS keys
    supports_gpu: bool = True
    supports_cpu: bool = True
    supports_cpu_offload: bool = False
    cpu_offload_method: str = 'none'  # 'torch_to_cpu' | 'restart_cpu' | 'none'
    idle_timeout_s: float = 600.0
    min_build: Optional[int] = None

    # -- Capabilities (free-form key/value) ----------------------------
    capabilities: Dict[str, Any] = field(default_factory=dict)

    # -- Selection metadata --------------------------------------------
    quality_score: float = 0.5
    speed_score: float = 0.5
    cost_per_1k: float = 0.0
    priority: int = 50

    # -- Routing (language-based TTS/STT routing) ----------------------
    languages: List[str] = field(default_factory=list)
    language_priority: Dict[str, int] = field(default_factory=dict)

    # -- Runtime state (never written to the JSON catalog) -------------
    downloaded: bool = False
    loaded: bool = False
    device: str = 'unloaded'
    active_since: Optional[float] = None
    error: Optional[str] = None

    # -- Filtering tags ------------------------------------------------
    tags: List[str] = field(default_factory=list)

    # -- User-configurable flags ---------------------------------------
    enabled: bool = True
    auto_load: bool = False
    pinned: bool = False
    purposes: List[str] = field(default_factory=list)  # e.g. ['draft', 'main', 'caption']

    def to_dict(self) -> dict:
        """Serialize to a JSON-safe dict, dropping runtime-only state."""
        runtime_only = {'downloaded', 'loaded', 'device', 'active_since', 'error'}
        return {k: v for k, v in asdict(self).items() if k not in runtime_only}

    @classmethod
    def from_dict(cls, d: dict) -> 'ModelEntry':
        """Build an entry from a JSON dict, silently dropping unknown keys."""
        known = cls.__dataclass_fields__
        return cls(**{k: v for k, v in d.items() if k in known})

    def matches_compute(self, budget_vram_gb: float, budget_ram_gb: float,
                        gpu_available: bool) -> str:
        """Decide how this model could run under the given compute budget.

        Checked in preference order — full GPU, then partial GPU with CPU
        offload (heuristic: half the nominal VRAM), then pure CPU.

        Returns: 'gpu', 'cpu', 'cpu_offload', or 'impossible'
        """
        if gpu_available:
            if budget_vram_gb >= self.vram_gb:
                return 'gpu'
            if self.supports_cpu_offload and budget_vram_gb >= self.vram_gb * 0.5:
                return 'cpu_offload'
        if self.supports_cpu and budget_ram_gb >= self.ram_gb:
            return 'cpu'
        return 'impossible'

186 

187 

class ModelCatalog:
    """Central registry of all models across all subsystems.

    JSON-persisted. Thread-safe for concurrent reads; write-locked for mutations.

    Subsystem population is pluggable: call register_populator() to add
    a callback that discovers models from a subsystem (LLM presets, TTS engines,
    etc.). This avoids hard dependencies on application-layer modules.

    Locking note: self._lock is a non-reentrant threading.Lock, and _save()
    acquires it internally — so every mutator releases the lock BEFORE
    calling _save(). Keep that ordering when editing these methods.
    """

    def __init__(self, catalog_path: Optional[str] = None):
        """Resolve the catalog file path and load any persisted entries.

        catalog_path: explicit JSON path; when None, uses the platform data
        dir (falling back to ~/Documents/Nunba/data if core.platform_paths
        is not importable, e.g. in standalone/test runs).
        """
        try:
            from core.platform_paths import get_db_dir
            data_dir = Path(get_db_dir())
        except ImportError:
            data_dir = Path.home() / 'Documents' / 'Nunba' / 'data'
        data_dir.mkdir(parents=True, exist_ok=True)
        self._path = Path(catalog_path) if catalog_path else data_dir / 'model_catalog.json'
        self._entries: Dict[str, ModelEntry] = {}
        self._lock = threading.Lock()
        self._dirty = False
        self._populators: List = []  # list of (name, callable)
        self._load()

    # ── Populator registration ─────────────────────────────────────

    def register_populator(self, name: str, fn) -> None:
        """Register a subsystem populator callback.

        The callback receives the catalog as its only argument and should call
        catalog.register(entry, persist=False) for each model it discovers.
        It must return the count of new entries added.
        """
        self._populators.append((name, fn))

    # ── CRUD ──────────────────────────────────────────────────────

    def register(self, entry: ModelEntry, persist: bool = True) -> None:
        """Add or update a model entry (last writer wins on duplicate id)."""
        with self._lock:
            self._entries[entry.id] = entry
            self._dirty = True
        # _save() re-acquires the lock, so it must run after the with-block.
        if persist:
            self._save()
        logger.info(f"Registered model: {entry.id} ({entry.model_type}, {entry.backend})")

    def unregister(self, model_id: str, persist: bool = True) -> bool:
        """Remove a model entry. Returns True if found."""
        with self._lock:
            removed = self._entries.pop(model_id, None)
            if removed:
                self._dirty = True
        if removed and persist:
            self._save()
            logger.info(f"Unregistered model: {model_id}")
        return removed is not None

    def override(self, model_id: str, *, persist: bool = False, **fields) -> bool:
        """Apply field-level overrides to an already-registered entry.

        Use this when one populator needs to narrow another populator's
        entry (e.g. Nunba's populate_media_gen amending HARTOS's fallback
        audio_gen-acestep surface). Unlike direct ``entry.field = value``
        mutation, override() takes the catalog lock, validates field names
        against the ModelEntry dataclass, sets the dirty flag, and logs
        the change — so the single-writer semantics of register/unregister
        extend to cross-populator amendments.

        Unknown fields raise ValueError. Returns False if model_id is not
        registered (no-op). Defaults to persist=False because overrides
        typically happen during populator boot (same convention as
        register(persist=False)).
        """
        # Validate before taking the lock — bad input never mutates state.
        allowed = set(ModelEntry.__dataclass_fields__)
        unknown = [k for k in fields if k not in allowed]
        if unknown:
            raise ValueError(
                f"override(): unknown field(s) for ModelEntry: {sorted(unknown)}",
            )
        with self._lock:
            entry = self._entries.get(model_id)
            if entry is None:
                return False
            for key, value in fields.items():
                setattr(entry, key, value)
            self._dirty = True
        if persist:
            self._save()
        logger.info(
            f"Overrode model {model_id} fields: {sorted(fields.keys())}",
        )
        return True

    def get(self, model_id: str) -> Optional[ModelEntry]:
        """Get a model by ID (lock-free read)."""
        return self._entries.get(model_id)

    def list_all(self) -> List[ModelEntry]:
        """All registered models (includes disabled ones)."""
        return list(self._entries.values())

    def list_types(self) -> List[str]:
        """All distinct model types that have at least one enabled entry."""
        return list({e.model_type for e in self._entries.values() if e.enabled})

    def list_by_type(self, model_type: str) -> List[ModelEntry]:
        """All enabled models of a given type (e.g. 'tts', 'llm')."""
        return [e for e in self._entries.values()
                if e.model_type == model_type and e.enabled]

    def list_by_tag(self, tag: str) -> List[ModelEntry]:
        """All models with a given tag (disabled entries included)."""
        return [e for e in self._entries.values() if tag in e.tags]

    # ── Compute-aware selection ───────────────────────────────────

    def select_best(self, model_type: str, budget_vram_gb: float = 0,
                    budget_ram_gb: float = 4, gpu_available: bool = False,
                    language: Optional[str] = None,
                    require_capability: Optional[Dict[str, Any]] = None,
                    ) -> Optional[ModelEntry]:
        """Select the best model of a given type for current compute.

        Selection priority:
        1. Filter by type + enabled + compute fit + capability tier
        2. If language specified, prefer models that serve it
        3. Sort by quality_score * speed_score * priority
        4. Return top pick
        """
        candidates = self.list_by_type(model_type)

        # Get current capability tier to enforce min_capability_tier
        current_tier = self._get_capability_tier()

        # Filter by compute fit + capability tier
        scored = []
        for entry in candidates:
            # Capability tier gate
            if not self._tier_sufficient(current_tier, entry.min_capability_tier):
                continue

            # Already-loaded models always fit — they're using resources we
            # already allocated, so never skip them due to budget calculations
            if entry.loaded:
                fit = entry.device or 'cpu'
            else:
                fit = entry.matches_compute(budget_vram_gb, budget_ram_gb, gpu_available)
                if fit == 'impossible':
                    continue

            score = entry.quality_score * 100 + entry.priority

            if fit == 'gpu':
                score += 200
            elif fit == 'cpu_offload':
                score += 100

            if language and entry.languages:
                if language in entry.languages:
                    lang_prio = entry.language_priority.get(language, 50)
                    # Language preference is dominant — rank 0 (preferred engine
                    # for this language) gets +300, rank 1 gets +270, default +150.
                    # This ensures tts_router's LANG_ENGINE_PREFERENCE order wins
                    # over small quality_score differences between engines.
                    score += (300 - lang_prio * 3)
                else:
                    score -= 500

            # Hard capability gate — entries missing a required capability
            # are dropped outright (no scoring compensation possible).
            if require_capability:
                cap_match = all(
                    entry.capabilities.get(k) == v
                    for k, v in require_capability.items()
                )
                if not cap_match:
                    continue

            if entry.downloaded:
                score += 50

            # Strongly prefer already-loaded models — avoids downloading a
            # second model when one of the same type is already running
            if entry.loaded:
                score += 1000

            scored.append((score, fit, entry))

        if not scored:
            return None

        scored.sort(key=lambda x: x[0], reverse=True)
        best_score, best_fit, best = scored[0]
        logger.info(f"Selected {best.id} ({best.model_type}) — "
                    f"fit={best_fit}, score={best_score:.0f}")
        return best

    def select_all_fitting(self, model_type: str, budget_vram_gb: float = 0,
                           budget_ram_gb: float = 4, gpu_available: bool = False,
                           ) -> List[tuple]:
        """Return all fitting models as (entry, fit_mode) pairs, best first.

        Unlike select_best(), this applies no tier gate, language bonus, or
        loaded-model preference — it is a plain compute-fit filter.
        """
        candidates = self.list_by_type(model_type)
        result = []
        for entry in candidates:
            fit = entry.matches_compute(budget_vram_gb, budget_ram_gb, gpu_available)
            if fit != 'impossible':
                result.append((entry, fit))
        result.sort(key=lambda x: x[0].quality_score * 100 + x[0].priority, reverse=True)
        return result

    # ── State updates ─────────────────────────────────────────────
    # These mutate in-memory runtime state only — no lock, no persistence
    # (runtime flags are stripped from the JSON by ModelEntry.to_dict()).

    def mark_downloaded(self, model_id: str, downloaded: bool = True) -> None:
        entry = self._entries.get(model_id)
        if entry:
            entry.downloaded = downloaded

    def mark_loaded(self, model_id: str, device: str = 'gpu') -> None:
        entry = self._entries.get(model_id)
        if entry:
            entry.loaded = True
            entry.device = device
            entry.active_since = time.time()
            entry.error = None  # a successful load clears any prior error

    def mark_unloaded(self, model_id: str) -> None:
        entry = self._entries.get(model_id)
        if entry:
            entry.loaded = False
            entry.device = 'unloaded'
            entry.active_since = None

    def mark_error(self, model_id: str, error: str) -> None:
        entry = self._entries.get(model_id)
        if entry:
            entry.error = error
            entry.loaded = False

    # ── Purpose assignment ──────────────────────────────────────

    # Universal purpose list — every task Nunba supports. A single model
    # can serve ANY combination (e.g. Qwen3.5-0.8B → ['draft','caption',
    # 'grounding']; Qwen3-4B-Omni → ['main','tts','stt']).
    # Purposes are NOT gated by model_type — model capabilities drive it,
    # not an artificial type label.
    ALL_PURPOSES: List[str] = [
        'draft',        # Fast classifier / speculative decode LLM
        'main',         # Primary LLM for chat/reasoning
        'vision',       # Image understanding (VLM — generic)
        'caption',      # Image/video captioning (VLM)
        'grounding',    # GUI element grounding (VLM — click targets)
        'tts',          # Text-to-speech
        'stt',          # Speech-to-text / ASR
        'diarization',  # Speaker segmentation
        'vad',          # Voice activity detection
        'embedding',    # Text embeddings (retrieval, RAG)
        'rerank',       # Cross-encoder reranking for retrieval
        'ocr',          # Text extraction from images
        'music',        # Music generation
        'image-gen',    # Text-to-image
        'video-gen',    # Text-to-video
        'translate',    # Machine translation (when dedicated model)
    ]

    def get_by_purpose(self, purpose: str) -> Optional[ModelEntry]:
        """Return the model assigned to *purpose*, or None.

        Returns the first enabled match — set_purpose() enforces at most
        one model per purpose, so iteration order does not matter.
        """
        for entry in self._entries.values():
            if purpose in entry.purposes and entry.enabled:
                return entry
        return None

    def set_purpose(self, model_id: str, purpose: str, enabled: bool = True) -> bool:
        """Toggle a purpose on/off for a model.

        When enabling: clears the same purpose from any other model
        (one model per purpose globally), then adds it.
        When disabling: removes the purpose from this model.
        Persists to disk. Returns True on success.
        """
        with self._lock:
            entry = self._entries.get(model_id)
            if entry is None:
                return False
            if purpose not in self.ALL_PURPOSES:
                return False
            if enabled:
                # Clear the same purpose from any other model
                for other in self._entries.values():
                    if other.id != model_id and purpose in other.purposes:
                        other.purposes = [p for p in other.purposes if p != purpose]
                if purpose not in entry.purposes:
                    entry.purposes.append(purpose)
            else:
                entry.purposes = [p for p in entry.purposes if p != purpose]
            self._dirty = True
        self._save()
        logger.info(f"Model {model_id} purpose {purpose!r} {'enabled' if enabled else 'disabled'} "
                    f"→ purposes={entry.purposes}")
        return True

    # ── Auto-populate from registered subsystem populators ─────────

    def populate_from_subsystems(self) -> int:
        """Run all registered populators + built-in STT/VLM entries.

        Called on first run or when catalog is empty. Does NOT overwrite
        existing entries (user edits via admin UI are preserved).
        Returns number of new entries added.
        """
        # Snapshot IDs BEFORE populator run so we can detect stale auto-entries
        ids_before = set(self._entries.keys())

        added = 0
        # Run application-registered populators (LLM, TTS, etc.)
        for name, fn in self._populators:
            try:
                count = fn(self)
                added += count
                if count:
                    logger.info(f"Populator '{name}' added {count} entries")
            except Exception as e:
                # Best-effort by design: one broken populator must not block
                # the others. Logged at debug only.
                logger.debug(f"Populator '{name}' failed: {e}")
        # Built-in entries that don't depend on application modules
        added += self._populate_tts_models()
        added += self._populate_stt_models()
        added += self._populate_vlm_models()
        added += self._populate_videogen_models()
        added += self._populate_audiogen_models()

        # Cleanup: remove stale auto-entries that no populator emitted this boot.
        # An entry is "auto-populated" if its ID starts with a known prefix and
        # it wasn't modified by the user (no custom tags, no non-default purposes,
        # not pinned). Stale = prefix-matched but not re-registered this boot.
        ids_after = set(self._entries.keys())
        touched_this_boot = ids_after - ids_before  # new in this run
        # For entries that existed before AND still exist, populator.register()
        # would have overwritten them — so check timestamps on _entries that
        # weren't touched but have auto-populatable prefixes.
        AUTO_PREFIXES = ('tts-', 'stt-', 'vlm-', 'video_gen-', 'audio_gen-')
        stale = []
        for eid, entry in list(self._entries.items()):
            if eid in touched_this_boot:
                continue  # freshly registered this boot
            if not any(eid.startswith(p) for p in AUTO_PREFIXES):
                continue  # not an auto-prefix (e.g. llm-* user-registered)
            # NOTE(review): this default-tag set omits 'video_gen' and
            # 'audio_gen', which the fallback media entries carry — so those
            # entries always look user-customized and are never reaped here.
            # Confirm whether that is intended.
            if entry.pinned or entry.purposes or (entry.tags and set(entry.tags) - {'local', 'tts', 'stt', 'vision', 'cpu-friendly'}):
                continue  # user customized — preserve
            stale.append(eid)

        if stale:
            for eid in stale:
                self._entries.pop(eid, None)
            self._dirty = True
            logger.info(f"Cleaned {len(stale)} stale auto-entries: {stale}")

        if added > 0 or stale:
            self._save()
            logger.info(f"Auto-populated {added} entries, cleaned {len(stale)} stale")
        return added

    def _populate_tts_models(self) -> int:
        """Populate TTS engine entries from tts_router.ENGINE_REGISTRY.

        Lazy-imports populate_tts_catalog to avoid circular imports at
        module load time. tts_router → model_catalog direction is only
        present inside function bodies (never at module scope).
        """
        try:
            from integrations.channels.media.tts_router import populate_tts_catalog
            return populate_tts_catalog(self)
        except Exception as e:
            logger.debug(f"TTS catalog population skipped: {e}")
            return 0

    def _populate_stt_models(self) -> int:
        """STT model entries — delegated to whisper_tool.populate_stt_catalog().

        whisper_tool is the single source of truth for STT model specs
        (engine names, VRAM thresholds, sherpa-onnx archive mappings).
        Falls back to a minimal inline set if whisper_tool is unavailable.
        """
        try:
            from integrations.service_tools.whisper_tool import populate_stt_catalog
            return populate_stt_catalog(self)
        except Exception as e:
            logger.debug(f"STT catalog population via whisper_tool skipped: {e}")

        # Minimal fallback (whisper_tool not yet importable at catalog init time)
        added = 0
        # (id, name, vram_gb, ram_gb, quality_score, speed_score)
        _fallback = [
            ('stt-whisper-base', 'Whisper Base (faster-whisper)', 0.2, 0.5, 0.75, 0.9),
            ('stt-whisper-medium', 'Whisper Medium (faster-whisper)', 1.5, 2.0, 0.85, 0.7),
            ('stt-whisper-large', 'Whisper Large v3 (faster-whisper)', 3.0, 4.0, 0.93, 0.5),
        ]
        for mid, name, vram, ram, quality, speed in _fallback:
            if mid in self._entries:
                continue  # never clobber an existing (possibly user-edited) entry
            entry = ModelEntry(
                id=mid, name=name, model_type=ModelType.STT,
                source='huggingface',
                vram_gb=vram, ram_gb=ram,
                backend='torch', supports_gpu=vram > 0, supports_cpu=True,
                supports_cpu_offload=True, cpu_offload_method='torch_to_cpu',
                idle_timeout_s=300,
                capabilities={'realtime': True, 'diarization': False,
                              'multilingual': True},
                quality_score=quality, speed_score=speed,
                languages=['multilingual'],
                tags=['local', 'stt', 'cpu-friendly'],
            )
            self.register(entry, persist=False)
            added += 1
        return added

    def _populate_vlm_models(self) -> int:
        """VLM model entries — delegated to lightweight_backend.populate_vlm_catalog().

        lightweight_backend is the single source of truth for VLM backend names
        and hardware tier thresholds. Falls back to MiniCPM only if unavailable.
        """
        try:
            from integrations.vision.lightweight_backend import populate_vlm_catalog
            return populate_vlm_catalog(self)
        except Exception as e:
            logger.debug(f"VLM catalog population via lightweight_backend skipped: {e}")

        # Minimal fallback — MiniCPM only
        added = 0
        if 'vlm-minicpm-v2' not in self._entries:
            entry = ModelEntry(
                id='vlm-minicpm-v2', name='MiniCPM-V-2',  # 4GB VRAM → standard tier
                model_type=ModelType.VLM, source='huggingface',
                repo_id='openbmb/MiniCPM-V-2',
                vram_gb=4.0, ram_gb=4.0, disk_gb=4.0,
                min_capability_tier='standard',  # 4GB VRAM = standard, not full
                backend='sidecar', supports_gpu=True, supports_cpu=False,
                idle_timeout_s=900,
                capabilities={'image_input': True, 'video_input': False,
                              'description_loop': True},
                quality_score=0.8, speed_score=0.7,
                tags=['local', 'vision'],
            )
            self.register(entry, persist=False)
            added += 1
        return added

    def _populate_videogen_models(self) -> int:
        """Video generation model entries — delegated to media_agent.populate_videogen_catalog().

        media_agent is the single source of truth for video gen tool names
        and VRAM routing thresholds. Falls back to inline entries if unavailable.
        """
        try:
            from integrations.service_tools.media_agent import populate_videogen_catalog
            return populate_videogen_catalog(self)
        except Exception as e:
            logger.debug(f"Video gen catalog population via media_agent skipped: {e}")

        # Minimal fallback
        added = 0
        # (id, name, vram_gb, ram_gb, quality_score, speed_score)
        _fallback = [
            ('video_gen-wan2gp', 'Wan2GP', 8.0, 12.0, 0.88, 0.65),
            ('video_gen-ltx2', 'LTX2', 4.0, 8.0, 0.75, 0.80),
        ]
        for mid, name, vram, ram, quality, speed in _fallback:
            if mid in self._entries:
                continue
            entry = ModelEntry(
                id=mid, name=name, model_type=ModelType.VIDEO_GEN,
                source='huggingface',
                vram_gb=vram, ram_gb=ram,
                backend='sidecar', supports_gpu=True,
                supports_cpu=(vram < 6),  # small models are CPU-viable
                supports_cpu_offload=(vram < 6),
                idle_timeout_s=600,
                capabilities={'txt2vid': True, 'img2vid': False},
                quality_score=quality, speed_score=speed,
                tags=['local', 'video_gen'],
            )
            self.register(entry, persist=False)
            added += 1
        return added

    def _populate_audiogen_models(self) -> int:
        """Audio/music generation entries — delegated to media_agent.populate_audiogen_catalog().

        media_agent is the single source of truth for audio gen tool names
        (ACE Step, DiffRhythm) and capability routing. Falls back to inline entries.
        Removes stale entries from previous catalog versions.
        """
        # Clean up stale entries with no capabilities (from old catalog JSON)
        for old_id in list(self._entries.keys()):
            if old_id.startswith('audio_gen-') and not self._entries[old_id].capabilities:
                del self._entries[old_id]

        try:
            from integrations.service_tools.media_agent import populate_audiogen_catalog
            return populate_audiogen_catalog(self)
        except Exception as e:
            logger.debug(f"Audio gen catalog population via media_agent skipped: {e}")

        # Minimal fallback
        added = 0
        # (id, name, vram_gb, ram_gb, quality_score, speed_score)
        _fallback = [
            ('audio_gen-acestep', 'ACE-Step 1.5', 6.0, 6.0, 0.85, 0.90),
            ('audio_gen-diffrhythm', 'DiffRhythm v1.2', 4.0, 4.0, 0.80, 0.75),
        ]
        for mid, name, vram, ram, quality, speed in _fallback:
            if mid in self._entries:
                continue
            entry = ModelEntry(
                id=mid, name=name, model_type=ModelType.AUDIO_GEN,
                source='huggingface',
                vram_gb=vram, ram_gb=ram,
                backend='sidecar', supports_gpu=True,
                supports_cpu=(vram < 5),
                supports_cpu_offload=(vram < 5),
                idle_timeout_s=600,
                capabilities={'music_gen': 'acestep' in mid,
                              'singing': True, 'lyrics_input': True},
                quality_score=quality, speed_score=speed,
                tags=['local', 'audio_gen'],
            )
            self.register(entry, persist=False)
            added += 1
        return added

    # ── Capability tier helpers ────────────────────────────────────

    # Ordered node capability tiers; higher rank ⇒ more capable hardware.
    _TIER_RANK = {'embedded': 0, 'observer': 1, 'lite': 2, 'standard': 3,
                  'full': 4, 'compute_host': 5}

    def _get_capability_tier(self) -> str:
        """Get current node capability tier, or 'full' as fallback."""
        try:
            from security.system_requirements import get_tier_name, _capabilities
            # 'embedded' with _capabilities unset means probing hasn't run
            # yet — assume 'full' rather than over-restricting selection.
            tier_name = get_tier_name()
            if tier_name == 'embedded' and _capabilities is None:
                return 'full'
            return tier_name
        except ImportError:
            return 'full'

    @classmethod
    def _tier_sufficient(cls, current: str, required: str) -> bool:
        """Check if current capability tier meets the model's minimum requirement."""
        # Unknown current tier defaults to 'full' rank (permissive);
        # unknown required tier defaults to rank 0 (always satisfiable).
        cur_rank = cls._TIER_RANK.get(current, 4)
        req_rank = cls._TIER_RANK.get(required, 0)
        return cur_rank >= req_rank

    # ── Persistence ───────────────────────────────────────────────

    def _load(self) -> None:
        """Load catalog from JSON file.

        On load, ALL entries have their ``loaded`` state cleared to False
        and ``device`` reset to 'unloaded'. This prevents stale
        "loaded" markers from a previous Nunba session from surviving
        across restarts — the old state claimed models were loaded even
        though the llama-server processes died with the previous session.
        ``ensure_loaded_async`` then trusted the stale state and skipped
        ``start_server()``, leaving the LLM down. See T21 #164.

        ``downloaded`` is NOT cleared — model files persist on disk
        across restarts and the catalog's downloaded flag is still valid.
        """
        if not self._path.exists():
            logger.info(f"No catalog at {self._path} — will auto-populate on first use")
            return
        try:
            with open(self._path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            _cleared = 0
            for d in data.get('models', []):
                try:
                    entry = ModelEntry.from_dict(d)
                    # Clear stale loaded state from previous session.
                    # Processes don't survive restart; loaded markers must
                    # not either. The eager-boot + ensure_loaded_async
                    # paths will re-mark as loaded once the server is
                    # actually alive and verified via /v1/models.
                    if entry.loaded:
                        entry.loaded = False
                        entry.device = 'unloaded'
                        entry.active_since = None
                        _cleared += 1
                    self._entries[entry.id] = entry
                except Exception as e:
                    # One malformed record must not abort the whole load.
                    logger.warning(f"Skipped malformed catalog entry: {e}")
            if _cleared:
                logger.info(
                    f"Loaded {len(self._entries)} models from catalog "
                    f"(cleared {_cleared} stale loaded markers)")
                self._save()  # Persist the cleared state
            else:
                logger.info(f"Loaded {len(self._entries)} models from catalog")
        except Exception as e:
            logger.error(f"Failed to load catalog: {e}")

    def _save(self) -> None:
        """Persist catalog to JSON (atomic write via temp file + replace)."""
        # Snapshot under the lock; file I/O happens after release.
        # NOTE(review): the write itself is outside the lock, so two
        # concurrent _save() calls may race on ordering — last replace wins.
        with self._lock:
            data = {
                'version': 1,
                'updated_at': time.time(),
                'models': [e.to_dict() for e in self._entries.values()],
            }
        try:
            # Use unique temp file per save to prevent WinError 32 when
            # multiple populators call _save() concurrently.
            import tempfile
            fd, tmp_path = tempfile.mkstemp(
                suffix='.tmp', prefix='model_catalog_',
                dir=str(self._path.parent))
            try:
                with os.fdopen(fd, 'w', encoding='utf-8') as f:
                    json.dump(data, f, indent=2, ensure_ascii=False)
                # Path.replace is atomic on POSIX and overwrites on Windows.
                Path(tmp_path).replace(self._path)
            except Exception:
                # Clean up temp file on failure
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass
                raise
            self._dirty = False
        except Exception as e:
            logger.error(f"Failed to save catalog: {e}")

    def to_json(self) -> list:
        """Return all entries as JSON-safe list (for API responses).

        Unlike the persisted form, this re-attaches the runtime state
        fields (downloaded/loaded/device/error) for UI display.
        """
        result = []
        for entry in self._entries.values():
            d = entry.to_dict()
            d['downloaded'] = entry.downloaded
            d['loaded'] = entry.loaded
            d['device'] = entry.device
            d['error'] = entry.error
            result.append(d)
        return result

826 

827 

# ── Singleton ───────────────────────────────────────────────────
_catalog_instance: Optional[ModelCatalog] = None
_catalog_lock = threading.Lock()


def get_catalog() -> ModelCatalog:
    """Return the process-wide ModelCatalog, creating it on first call.

    Double-checked locking: the unlocked fast path serves the common case;
    the lock guards one-time construction. A brand-new (empty) catalog is
    auto-populated from the registered subsystem populators.
    """
    global _catalog_instance
    if _catalog_instance is not None:
        return _catalog_instance
    with _catalog_lock:
        if _catalog_instance is None:
            _catalog_instance = ModelCatalog()
            if not _catalog_instance.list_all():
                _catalog_instance.populate_from_subsystems()
    return _catalog_instance

842 return _catalog_instance