Coverage for integrations/agent_engine/model_bus_service.py: 32.0%

416 statements  

coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

"""
HART OS Model Bus Service — Unified AI Access for Every Application.

The Model Bus is the OS-level abstraction that makes AI a native capability.
Any application — Linux, Android, Windows, or Web — can access any deployed
model through a single unified interface.

Transports:
  - Unix socket: /run/hart/model-bus.sock (native Linux apps)
  - D-Bus: com.hart.ModelBus (desktop apps)
  - HTTP API: localhost:6790 (Android, Wine, Web apps)

The bus routes to the best available backend:
  1. Local llama.cpp (LLM)
  2. Local MiniCPM (vision)
  3. Local Whisper (STT) / Pocket TTS (TTS)
  4. Compute mesh peers (same user's other devices)
  5. Remote HevolveAI/hivemind (world model)
"""
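# Example (illustrative sketch, not executed here): any app can reach the bus
# over the HTTP transport described above. The endpoint shape follows the
# Flask routes defined in ModelBusService._create_flask_app(); the host and
# port are the defaults below and may differ per deployment.
#
#     import requests
#     resp = requests.post(
#         'http://localhost:6790/v1/chat',
#         json={'prompt': 'Summarize the system status', 'max_tokens': 128},
#         timeout=60,
#     )
#     print(resp.json().get('response', ''))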

import json
import logging
import os
import socket
import threading
import time
from typing import Any, Dict, List, Optional

from core.port_registry import get_port
from integrations.service_tools.model_catalog import ModelType

logger = logging.getLogger('hevolve.model_bus')

# ─── Routing Status Publisher ─────────────────────────────
# Pushes conversational "Thinking" bubbles to the user's chat topic
# so the client shows real-time routing progress.
# Uses the same Crossbar channel the frontend already subscribes to.

def _publish_routing_status(user_id: str, message: str, request_id: str = ''):
    """Push routing progress to user's UI via Crossbar thinking bubble.

    The client (WebWorker/React Native) renders these as thinking indicators
    so the user sees: "Processing locally..." → "Checking hive network..." etc.
    """
    from core.peer_link.crossbar_publish import publish_thinking_trace
    ok = publish_thinking_trace(
        text=message,
        user_id=user_id,
        request_id=request_id or '',
        bot_type='ComputeRouter',
    )
    if ok:
        logger.debug(
            "ComputeRouter status published: user=%s msg=%r",
            user_id, message[:60] if message else '',
        )
    elif user_id:
        # Helper returned False: HARTOS publish_async not yet resolvable.
        # Empty user_id is a clean no-op — don't log noise for that.
        logger.debug(
            "ComputeRouter status drop: user=%s msg=%r — "
            "HARTOS publish_async unresolvable (loader still init).",
            user_id, message[:60] if message else '',
        )

# ═══════════════════════════════════════════════════════════════
# Model Bus Service
# ═══════════════════════════════════════════════════════════════

class ModelBusService:
    """Unified model access — any app, any model, any device."""

    # Backend health cache — avoids wasting seconds on dead backends.
    # Key = backend name, Value = (is_alive: bool, last_checked: float)
    # TTL: alive backends re-checked every 60s, dead backends use exponential
    # backoff (15s → 30s → 60s → 120s → 300s cap) to avoid CPU-hogging retries.
    _health_cache: Dict[str, tuple] = {}
    _health_lock = threading.Lock()        # Guards _health_cache across threads
    _dead_backoff: Dict[str, float] = {}   # backend_name → current dead TTL
    _ALIVE_TTL = 60.0        # Re-probe alive backends every 60s
    _DEAD_TTL_INIT = 15.0    # First dead retry after 15s
    _DEAD_TTL_MAX = 300.0    # Cap dead retry at 5 minutes
    _PROBE_TIMEOUT = 1.5     # Health probe: 1.5s max (not 10-60s)
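    # Backoff behaviour (summary of the code below, illustrative): each failed
    # probe doubles the stored retry TTL, min(current * 2, _DEAD_TTL_MAX),
    # starting from _DEAD_TTL_INIT, and a single successful probe or request
    # clears it back to the default via _is_backend_alive() or
    # _mark_backend_alive().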

    def __init__(
        self,
        socket_path: str = '/run/hart/model-bus.sock',
        http_port: int = 6790,
        grpc_port: int = 6791,
        max_concurrent: int = 32,
        routing_strategy: str = 'speculative',
        llm_port: int = 8080,
        vision_port: int = 9891,
        backend_port: int = 6777,
    ):
        self.socket_path = socket_path
        self.http_port = http_port
        self.grpc_port = grpc_port
        self.max_concurrent = max_concurrent
        self.routing_strategy = routing_strategy
        self.llm_port = llm_port
        self.vision_port = vision_port
        self.backend_port = backend_port

        self._backends: Dict[str, dict] = {}
        self._request_count = 0
        self._semaphore = threading.Semaphore(max_concurrent)
        self._lock = threading.Lock()
        self._running = False

        logger.info(
            f"ModelBusService initialized: socket={socket_path}, "
            f"http={http_port}, strategy={routing_strategy}"
        )

    # ─── Fast Backend Health Check ────────────────────────────

    def _is_backend_alive(self, name: str, url: str,
                          health_path: str = '/health') -> bool:
        """Check if backend is alive using cached health probe.

        Returns instantly for cached results. Only probes when cache is stale.
        Dead backends use exponential backoff (15s → 300s cap) to avoid
        CPU-hogging retry storms when services aren't running.
        """
        now = time.time()
        with self._health_lock:
            cached = self._health_cache.get(name)
            dead_ttl = self._dead_backoff.get(name, self._DEAD_TTL_INIT)
            if cached:
                is_alive, last_checked = cached
                ttl = self._ALIVE_TTL if is_alive else dead_ttl
                if now - last_checked < ttl:
                    return is_alive

        # Cache miss or stale — quick probe (outside lock to avoid blocking)
        try:
            from core.http_pool import pooled_get
            resp = pooled_get(f'{url}{health_path}', timeout=self._PROBE_TIMEOUT)
            alive = resp.status_code < 500
        except Exception:
            alive = False

        with self._health_lock:
            self._health_cache[name] = (alive, now)
            if alive:
                # Reset backoff on recovery
                self._dead_backoff.pop(name, None)
            else:
                # Re-read current backoff inside lock (avoid stale local var race)
                current = self._dead_backoff.get(name, self._DEAD_TTL_INIT)
                new_ttl = min(current * 2, self._DEAD_TTL_MAX)
                self._dead_backoff[name] = new_ttl
                logger.debug("Backend %s at %s is DOWN (retry in %.0fs)",
                             name, url, new_ttl)
        return alive

    def _mark_backend_dead(self, name: str):
        """Mark a backend as dead after a request failure (instant skip next time)."""
        with self._health_lock:
            self._health_cache[name] = (False, time.time())
            # Escalate exponential backoff
            current = self._dead_backoff.get(name, self._DEAD_TTL_INIT)
            self._dead_backoff[name] = min(current * 2, self._DEAD_TTL_MAX)

    def _mark_backend_alive(self, name: str):
        """Mark backend alive after successful request."""
        with self._health_lock:
            self._health_cache[name] = (True, time.time())
            self._dead_backoff.pop(name, None)

    # ─── Backend Discovery ───────────────────────────────────

    def discover_backends(self) -> Dict[str, dict]:
        """Discover all available model backends."""
        from core.http_pool import pooled_get

        backends = {}

        # LLM (llama.cpp)
        try:
            resp = pooled_get(
                f'http://localhost:{self.llm_port}/health', timeout=3
            )
            if resp.status_code == 200:
                backends['llm'] = {
                    'type': ModelType.LLM,
                    'url': f'http://localhost:{self.llm_port}',
                    'status': 'ready',
                    'local': True,
                }
                logger.info(f"Backend discovered: llm (port {self.llm_port})")
        except Exception:
            pass

        # Vision (MiniCPM)
        try:
            resp = pooled_get(
                f'http://localhost:{self.vision_port}/health', timeout=3
            )
            if resp.status_code == 200:
                backends['vision'] = {
                    'type': 'vision',
                    'url': f'http://localhost:{self.vision_port}',
                    'status': 'ready',
                    'local': True,
                }
                logger.info(f"Backend discovered: vision (port {self.vision_port})")
        except Exception:
            pass

        # HART backend (for world model bridge)
        try:
            resp = pooled_get(
                f'http://localhost:{self.backend_port}/status', timeout=3
            )
            if resp.status_code == 200:
                backends['backend'] = {
                    'type': 'backend',
                    'url': f'http://localhost:{self.backend_port}',
                    'status': 'ready',
                    'local': True,
                }
                logger.info(f"Backend discovered: backend (port {self.backend_port})")
        except Exception:
            pass

        # Compute mesh peers
        try:
            resp = pooled_get(f'http://localhost:{get_port("mesh_relay")}/mesh/status', timeout=3)
            if resp.status_code == 200:
                mesh_data = resp.json()
                peer_count = mesh_data.get('peer_count', 0)
                if peer_count > 0:
                    backends['mesh'] = {
                        'type': 'mesh',
                        'url': f'http://localhost:{get_port("mesh_relay")}',
                        'status': 'ready',
                        'local': False,
                        'peers': peer_count,
                    }
                    logger.info(f"Backend discovered: mesh ({peer_count} peers)")
        except Exception:
            pass

        self._backends = backends
        return backends
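    # Shape of the mapping discover_backends() returns (illustrative values),
    # keyed by backend name:
    #
    #     {'llm':  {'type': ModelType.LLM, 'url': 'http://localhost:8080',
    #               'status': 'ready', 'local': True},
    #      'mesh': {'type': 'mesh', 'url': 'http://localhost:<mesh_relay_port>',
    #               'status': 'ready', 'local': False, 'peers': 2}}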

    # ─── Inference Routing ───────────────────────────────────

    def infer(
        self,
        model_type: str = ModelType.LLM,
        prompt: str = '',
        options: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Route inference to the best available backend.

        Args:
            model_type: 'llm', 'vision', 'tts', 'stt', 'video_gen', 'image_gen'
            prompt: The input text/query
            options: Additional options (image_path, voice, format, etc.)

        Returns:
            Dict with 'response', 'model', 'backend', 'latency_ms'
        """
        if not self._semaphore.acquire(timeout=30):
            return {'error': 'Model Bus overloaded — try again later'}

        try:
            start_time = time.time()
            options = options or {}

            # Apply guardrail check
            if not self._check_guardrails(prompt, model_type):
                return {'error': 'Request blocked by constitutional guardrails'}

            # Route based on model type
            if model_type == ModelType.LLM:
                result = self._route_llm(prompt, options)
            elif model_type == 'vision':
                result = self._route_vision(prompt, options)
            elif model_type == ModelType.TTS:
                result = self._route_tts(prompt, options)
            elif model_type == ModelType.STT:
                result = self._route_stt(prompt, options)
            elif model_type == ModelType.VIDEO_GEN:
                result = self._route_video_gen(prompt, options)
            elif model_type == ModelType.IMAGE_GEN:
                result = self._route_image_gen(prompt, options)
            else:
                result = {'error': f'Unknown model type: {model_type}'}

            latency_ms = int((time.time() - start_time) * 1000)
            result['latency_ms'] = latency_ms

            with self._lock:
                self._request_count += 1

            # Broadcast inference completion to EventBus
            try:
                from core.platform.events import emit_event
                emit_event('inference.completed', {
                    'model_type': model_type,
                    'model': result.get('model', ''),
                    'backend': result.get('backend', ''),
                    'latency_ms': latency_ms,
                    'success': 'error' not in result,
                })
            except Exception:
                pass

            return result
        finally:
            self._semaphore.release()
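    # In-process usage (illustrative): other HART services can call the bus
    # directly instead of going through the HTTP transport, e.g.
    #
    #     bus = ModelBusService()
    #     bus.discover_backends()
    #     result = bus.infer(ModelType.LLM, 'Hello', {'max_tokens': 64})
    #     result.get('response'), result.get('backend'), result.get('latency_ms')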

    def _route_llm(self, prompt: str, options: dict) -> dict:
        """Route LLM inference to best backend.

        Uses health cache to skip dead backends instantly (0ms) instead of
        waiting 10-60s for timeout. Marks backends dead on failure so
        subsequent requests don't waste time on them either.
        """
        from core.http_pool import pooled_post
        uid = options.get('user_id', '')
        rid = options.get('request_id', '')

        # Build candidate list — skip backends we KNOW are dead (instant)
        backends_to_try = []

        if 'llm' in self._backends:
            url = self._backends['llm']['url']
            if self._is_backend_alive('local_llm', url):
                backends_to_try.append(('local_llm', url))
            else:
                logger.debug("Skipping local_llm (health cache: dead)")
        if 'mesh' in self._backends:
            url = self._backends['mesh']['url']
            if self._is_backend_alive('mesh', url, '/health'):
                backends_to_try.append(('mesh', url))
        if 'backend' in self._backends:
            url = self._backends['backend']['url']
            if self._is_backend_alive('backend', url, '/status'):
                backends_to_try.append(('backend', url))

        if not backends_to_try:
            _publish_routing_status(uid,
                "Looking for an available AI backend...", rid)
            # Force re-discover (all cached as dead)
            self.discover_backends()
            # Rebuild from fresh discovery
            if 'llm' in self._backends:
                backends_to_try.append(('local_llm', self._backends['llm']['url']))
            if 'mesh' in self._backends:
                backends_to_try.append(('mesh', self._backends['mesh']['url']))
            if 'backend' in self._backends:
                backends_to_try.append(('backend', self._backends['backend']['url']))
            if not backends_to_try:
                return {'error': 'No LLM backend available', 'response': None}

        for backend_name, url in backends_to_try:
            try:
                if backend_name == 'local_llm':
                    # llama.cpp OpenAI-compatible API
                    resp = pooled_post(
                        f'{url}/v1/chat/completions',
                        json={
                            'model': 'local',
                            'messages': [{'role': 'user', 'content': prompt}],
                            'max_tokens': options.get('max_tokens', 512),
                        },
                        timeout=options.get('timeout', 60),
                    )
                    if resp.status_code == 200:
                        self._mark_backend_alive('local_llm')
                        data = resp.json()
                        choices = data.get('choices', [])
                        content = choices[0].get('message', {}).get('content', '') if choices else ''
                        return {
                            'response': content,
                            'model': data.get('model', 'llama.cpp'),
                            'backend': 'local_llm',
                        }
                elif backend_name == 'mesh':
                    _publish_routing_status(uid,
                        'Routing to hive peer...', rid)
                    # Compute mesh offload
                    resp = pooled_post(
                        f'{url}/mesh/infer',
                        json={'model_type': ModelType.LLM, 'prompt': prompt},
                        timeout=options.get('timeout', 120),
                    )
                    if resp.status_code == 200:
                        self._mark_backend_alive('mesh')
                        data = resp.json()
                        return {
                            'response': data.get('response', ''),
                            'model': data.get('model', 'mesh_peer'),
                            'backend': 'mesh',
                        }
                elif backend_name == 'backend':
                    _publish_routing_status(uid,
                        'Routing to cloud backend...', rid)
                    # HART backend (may route to cloud or world model)
                    resp = pooled_post(
                        f'{url}/chat',
                        json={'prompt': prompt, 'user_id': 'model_bus', 'prompt_id': 'bus'},
                        timeout=options.get('timeout', 60),
                    )
                    if resp.status_code == 200:
                        self._mark_backend_alive('backend')
                        data = resp.json()
                        return {
                            'response': data.get('response', str(data)),
                            'model': 'hart_backend',
                            'backend': 'backend',
                        }
            except Exception as e:
                self._mark_backend_dead(backend_name)
                logger.warning(f"Backend {backend_name} failed (marked dead): {e}")
                continue

        return {'error': 'All LLM backends failed', 'response': None}

    def _route_vision(self, prompt: str, options: dict) -> dict:
        """Route vision inference."""
        from core.http_pool import pooled_post

        image_path = options.get('image_path', '')
        uid = options.get('user_id', '')
        rid = options.get('request_id', '')
        if not image_path:
            return {'error': 'image_path required for vision inference'}

        if 'vision' in self._backends:
            try:
                with open(image_path, 'rb') as f:
                    resp = pooled_post(
                        f"{self._backends['vision']['url']}/describe",
                        files={'image': f},
                        data={'prompt': prompt},
                        timeout=60,
                    )
                if resp.status_code == 200:
                    return {
                        'response': resp.json().get('description', ''),
                        'model': 'minicpm',
                        'backend': 'local_vision',
                    }
            except Exception as e:
                logger.warning(f"Local vision failed: {e}")

        # Fallback to mesh
        if 'mesh' in self._backends:
            _publish_routing_status(uid,
                'No local vision model — checking hive network...', rid)
            try:
                resp = pooled_post(
                    f"{self._backends['mesh']['url']}/mesh/infer",
                    json={'model_type': 'vision', 'prompt': prompt, 'image_path': image_path},
                    timeout=120,
                )
                if resp.status_code == 200:
                    return {
                        'response': resp.json().get('response', ''),
                        'model': 'mesh_vision',
                        'backend': 'mesh',
                    }
            except Exception:
                pass

        _publish_routing_status(uid,
            "I can't analyze this image right now — no vision model available "
            "locally or on the hive. Try connecting a device with GPU.", rid)
        return {'error': 'No vision backend available', 'response': None}

    def _route_tts(self, prompt: str, options: dict) -> dict:
        """Route TTS through smart TTS router (language-aware, GPU/hive/CPU).

        Delegates to TTSRouter which considers language, GPU, VRAM, compute
        policy, hive peers, and urgency. Falls back to legacy chain if
        router is unavailable.
        """
        uid = options.get('user_id', '')
        rid = options.get('request_id', '')
        try:
            from integrations.channels.media.tts_router import get_tts_router
            router = get_tts_router()
            result = router.synthesize(
                text=prompt,
                language=options.get('language'),
                voice=options.get('voice_audio') or options.get('voice'),
                source=options.get('source', 'agent_tool'),
                engine_override=options.get('engine'),
            )
            if not result.error:
                return {
                    'response': result.path,
                    'model': f'{result.engine_id}',
                    'backend': 'local_tts' if result.location == 'local' else result.location,
                    'duration': result.duration,
                    'latency_ms': result.latency_ms,
                    'device': result.device,
                }
            logger.debug("TTS router failed: %s, falling back to legacy chain", result.error)
        except Exception as e:
            # Exception already covers ImportError, so one handler is enough.
            logger.debug("TTS router unavailable (%s), using legacy chain", e)

        # Legacy fallback: skip the full chain (luxtts → makeittalk → pocket)
        # which ignores language/GPU/policy. Go straight to pocket_tts
        # (guaranteed CPU, always available). The TTSRouter already handles
        # luxtts, makeittalk, and GPU engines with proper awareness.
        _publish_routing_status(uid, 'Generating speech (fallback)...', rid)
        return self._try_pocket_tts(prompt, options)
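    # Option keys consumed on the TTS path (from the code above): 'language',
    # 'voice' / 'voice_audio' (reference audio for cloning), 'source',
    # 'engine' (override), plus 'user_id' / 'request_id' for routing status.
    # Example call (illustrative values):
    #
    #     bus.infer(ModelType.TTS, 'Hello there', {'voice': 'alba', 'language': 'en'})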

    def _try_luxtts(self, prompt: str, options: dict) -> dict:
        """Try LuxTTS (48kHz, GPU/CPU, voice cloning)."""
        t0 = time.time()
        try:
            from integrations.service_tools.luxtts_tool import luxtts_synthesize
            voice = options.get('voice_audio') or options.get('voice')
            device = options.get('device')
            result = json.loads(luxtts_synthesize(
                prompt, voice_audio=voice, device=device,
            ))
            latency = (time.time() - t0) * 1000
            if 'error' in result:
                logger.info(f"LuxTTS unavailable: {result['error']}")
                return result
            return {
                'response': result.get('path', ''),
                'model': 'luxtts-48k',
                'backend': f"local_tts_{result.get('device', 'cpu')}",
                'voice': result.get('voice', ''),
                'duration': result.get('duration', 0),
                'sample_rate': 48000,
                'engine': 'luxtts',
                'rtf': result.get('rtf', 0),
                'realtime_factor': result.get('realtime_factor', 0),
                'latency_ms': round(latency, 1),
            }
        except ImportError:
            return {'error': 'LuxTTS not installed'}
        except Exception as e:
            logger.warning(f"LuxTTS failed: {e}")
            return {'error': f'LuxTTS error: {e}'}

    def _try_makeittalk_tts(self, prompt: str, options: dict, base_url: str) -> dict:
        """Try MakeItTalk cloud TTS (POST /video-gen/ with text, no image = audio-only)."""
        import requests as http_requests
        t0 = time.time()
        try:
            voice = options.get('voice', 'af_bella')
            payload = {
                'text': prompt,
                'uid': options.get('user_id', 'model_bus'),
                'voiceName': voice,
                'kokuro': 'true',
                'audio_only': True,  # hint: skip video pipeline
            }
            resp = http_requests.post(
                f'{base_url.rstrip("/")}/video-gen/',
                json=payload,
                timeout=options.get('timeout', 30),
            )
            latency = (time.time() - t0) * 1000
            if resp.status_code == 200:
                data = resp.json()
                return {
                    'response': data.get('audio_url', data.get('url', '')),
                    'model': 'makeittalk-cloud',
                    'backend': 'cloud_tts',
                    'voice': voice,
                    'engine': 'makeittalk',
                    'latency_ms': round(latency, 1),
                }
            logger.warning("MakeItTalk returned %d: %s", resp.status_code, resp.text[:200])
            return {'error': f'MakeItTalk HTTP {resp.status_code}'}
        except http_requests.ConnectionError:
            logger.info("MakeItTalk cloud connection refused at %s", base_url)
            return {'error': 'MakeItTalk connection failed'}
        except http_requests.Timeout:
            logger.info("MakeItTalk cloud timed out at %s", base_url)
            return {'error': 'MakeItTalk timeout'}
        except Exception as e:
            logger.warning("MakeItTalk cloud error: %s", e)
            return {'error': f'MakeItTalk error: {e}'}

    def _try_pocket_tts(self, prompt: str, options: dict) -> dict:
        """Pocket TTS offline fallback (always available, CPU, zero cost)."""
        t0 = time.time()
        try:
            from integrations.service_tools.pocket_tts_tool import pocket_tts_synthesize
            voice = options.get('voice', 'alba')
            output_path = options.get('output_path')
            result = json.loads(pocket_tts_synthesize(prompt, voice, output_path))
            latency = (time.time() - t0) * 1000
            if 'error' in result:
                return {'error': result['error'], 'response': None}
            return {
                'response': result.get('path', ''),
                'model': 'pocket-tts-100m',
                'backend': 'local_tts',
                'voice': result.get('voice', voice),
                'duration': result.get('duration', 0),
                'engine': result.get('engine', 'pocket-tts'),
                'latency_ms': round(latency, 1),
            }
        except ImportError:
            return {'error': 'Pocket TTS not available (pip install pocket-tts)', 'response': None}
        except Exception as e:
            return {'error': f'TTS inference failed: {e}', 'response': None}

    def _route_stt(self, prompt: str, options: dict) -> dict:
        """Route speech-to-text inference via Whisper (sherpa-onnx / openai-whisper)."""
        t0 = time.time()
        try:
            from integrations.service_tools.whisper_tool import whisper_transcribe
            audio_path = options.get('audio_path', prompt)
            language = options.get('language')
            result = json.loads(whisper_transcribe(audio_path, language))
            latency = (time.time() - t0) * 1000
            if 'error' in result:
                return {'error': result['error'], 'response': None}
            return {
                'response': result.get('text', ''),
                'model': 'whisper-stt-local',
                'backend': 'local_stt',
                'language': result.get('language', 'auto'),
                'latency_ms': round(latency, 1),
            }
        except ImportError:
            return {'error': 'Whisper STT not available (pip install sherpa-onnx)', 'response': None}
        except Exception as e:
            return {'error': f'STT inference failed: {e}', 'response': None}

    def _route_video_gen(self, prompt: str, options: dict) -> dict:
        """Route video generation: local GPU → hive mesh peer → cloud fallback.

        For no-GPU central instances, this offloads to a hive peer that has
        a GPU with LTX-2, ComfyUI, or Wan2GP loaded.

        Publishes routing status to the user's chat topic so the UI shows
        real-time progress ("Generating video locally..." → "Checking hive...").
        """
        from core.http_pool import pooled_post

        model = options.get('model', 'ltx2')
        timeout = options.get('timeout', 300)
        uid = options.get('user_id', '')
        rid = options.get('request_id', '')

        # 1. Try local GPU servers — skip instantly if health cache says dead
        local_servers = [
            ('ltx2', f"http://localhost:{get_port('ltx2_server', 5002)}"),
            ('comfyui', f"http://localhost:{get_port('comfyui', 8188)}"),
        ]
        for name, local_url in local_servers:
            if not self._is_backend_alive(f'video_{name}', local_url):
                continue  # 0ms — skip dead server
            _publish_routing_status(uid, 'Generating video on this device...', rid)
            try:
                resp = pooled_post(
                    f'{local_url}/generate',
                    json={'prompt': prompt, 'model': model},
                    timeout=timeout,
                )
                if resp.status_code == 200:
                    self._mark_backend_alive(f'video_{name}')
                    data = resp.json()
                    video_url = data.get('video_url') or data.get('output_url', '')
                    return {
                        'response': video_url,
                        'model': model,
                        'backend': 'local_gpu',
                    }
            except Exception:
                self._mark_backend_dead(f'video_{name}')
                continue

        # 2. Try hive mesh peer with GPU (central has no GPU → offload)
        _publish_routing_status(uid,
            'No local GPU available. Checking hive network for a device with GPU...', rid)
        try:
            from integrations.agent_engine.compute_config import get_compute_policy
            policy = get_compute_policy()
            if policy.get('compute_policy') != 'local_only':
                from integrations.agent_engine.compute_mesh_service import get_compute_mesh
                mesh = get_compute_mesh()
                result = mesh.offload_to_best_peer(
                    model_type=ModelType.VIDEO_GEN,
                    prompt=prompt,
                    options={**options, 'timeout': timeout},
                )
                if result and 'error' not in result:
                    peer = result.get('offloaded_to', 'peer')
                    _publish_routing_status(uid,
                        f'Video generation running on hive peer {peer}...', rid)
                    result['backend'] = 'hive_peer'
                    return result
                logger.info("Hive mesh video offload: %s",
                            (result or {}).get('error', 'no result'))
        except Exception as e:
            logger.debug("Hive mesh video offload unavailable: %s", e)

        # 3. Cloud fallback (MakeItTalk or external service) — skip if known dead
        makeittalk_url = os.environ.get('MAKEITTALK_API_URL')
        if makeittalk_url and self._is_backend_alive(
                'makeittalk_cloud', makeittalk_url, '/health'):
            _publish_routing_status(uid,
                'No hive peers with GPU. Sending to cloud service...', rid)
            try:
                resp = pooled_post(
                    f'{makeittalk_url.rstrip("/")}/video-gen/',
                    json={
                        'text': prompt,
                        'uid': options.get('user_id', 'model_bus'),
                        **{k: v for k, v in options.items()
                           if k in ('avatar_id', 'image_url', 'voice_id')},
                    },
                    timeout=min(timeout, 60),
                )
                if resp.status_code == 200:
                    self._mark_backend_alive('makeittalk_cloud')
                    data = resp.json()
                    return {
                        'response': data.get('video_url', data.get('url', '')),
                        'model': 'makeittalk-cloud',
                        'backend': 'cloud',
                    }
            except Exception as e:
                self._mark_backend_dead('makeittalk_cloud')
                logger.warning("MakeItTalk video cloud (marked dead): %s", e)

        # All paths exhausted — conversational fallback
        _publish_routing_status(uid,
            "I wasn't able to generate the video right now. "
            "There's no GPU available on this server, no hive peers are online "
            "with a free GPU, and the cloud video service didn't respond. "
            "You can try again shortly, or connect a GPU device to your hive "
            "with: hart compute pair <device-address>", rid)
        return {'error': 'No video generation backend available',
                'response': ("I couldn't generate the video — no GPU is available "
                             "locally or on the hive network right now. "
                             "Try again in a moment or pair a GPU device.")}

    def _route_image_gen(self, prompt: str, options: dict) -> dict:
        """Route image generation inference."""
        return {
            'response': 'Image generation not yet implemented',
            'model': 'none',
            'backend': 'stub',
        }

    # ─── Guardrail Gate ──────────────────────────────────────

    def _check_guardrails(self, prompt: str, model_type: str) -> bool:
        """Apply constitutional guardrail check to request."""
        try:
            from security.hive_guardrails import ConstitutionalFilter
            approved, reason = ConstitutionalFilter.check_prompt(prompt)
            if not approved:
                logger.warning(f"Guardrail blocked {model_type} request: {reason}")
                return False
        except ImportError:
            pass  # Guardrails not available — allow request
        return True

    # ─── Model Listing ───────────────────────────────────────

    def list_models(self) -> List[Dict[str, Any]]:
        """List all available models across all backends."""
        models = []

        if 'llm' in self._backends:
            models.append({
                'id': 'local-llm',
                'type': ModelType.LLM,
                'backend': 'llama.cpp',
                'local': True,
                'status': 'ready',
            })

        if 'vision' in self._backends:
            models.append({
                'id': 'local-vision',
                'type': 'vision',
                'backend': 'minicpm',
                'local': True,
                'status': 'ready',
            })

        if 'mesh' in self._backends:
            models.append({
                'id': 'mesh-models',
                'type': 'multiple',
                'backend': 'compute_mesh',
                'local': False,
                'status': 'ready',
                'peers': self._backends['mesh'].get('peers', 0),
            })

        # TTS — LuxTTS (if installed, GPU/CPU, 48kHz voice cloning)
        try:
            from zipvoice.luxvoice import LuxTTS as _LuxCheck  # noqa: F401
            import torch as _t
            _lux_device = 'cuda' if _t.cuda.is_available() else 'cpu'
            models.append({
                'id': 'luxtts-48k',
                'type': ModelType.TTS,
                'backend': 'luxtts',
                'local': True,
                'status': 'ready',
                'device': _lux_device,
                'features': ['voice_cloning', '48khz', 'gpu_accelerated', 'offline'],
            })
        except ImportError:
            pass

        # TTS — MakeItTalk cloud (if configured)
        makeittalk_url = os.environ.get('MAKEITTALK_API_URL')
        if makeittalk_url:
            models.append({
                'id': 'makeittalk-cloud',
                'type': ModelType.TTS,
                'backend': 'makeittalk',
                'local': False,
                'status': 'ready',
                'url': makeittalk_url,
                'features': ['video_gen', 'lip_sync', 'multi_voice', 'multi_language'],
            })

        # TTS — Pocket TTS (always local, CPU — fallback when cloud unavailable)
        models.append({
            'id': 'pocket-tts-100m',
            'type': ModelType.TTS,
            'backend': 'pocket_tts',
            'local': True,
            'status': 'ready',
            'features': ['voice_cloning', 'zero_shot', 'offline'],
        })

        # STT — Whisper (always local, sherpa-onnx or openai-whisper)
        models.append({
            'id': 'whisper-stt-local',
            'type': ModelType.STT,
            'backend': 'whisper',
            'local': True,
            'status': 'ready',
            'features': ['multilingual', 'offline'],
        })

        return models

    def get_status(self) -> Dict[str, Any]:
        """Get Model Bus status."""
        return {
            'status': 'running' if self._running else 'stopped',
            'backends': {k: v.get('status') for k, v in self._backends.items()},
            'backend_count': len(self._backends),
            'request_count': self._request_count,
            'max_concurrent': self.max_concurrent,
            'routing_strategy': self.routing_strategy,
        }

    # ─── HTTP Server ─────────────────────────────────────────

    def _create_flask_app(self):
        """Create Flask app for HTTP API."""
        from flask import Flask, request, jsonify

        app = Flask(__name__)

        @app.route('/v1/chat', methods=['POST'])
        def chat():
            data = request.get_json(force=True)
            result = self.infer(
                model_type=ModelType.LLM,
                prompt=data.get('prompt', ''),
                options=data,
            )
            return jsonify(result)

        @app.route('/v1/vision', methods=['POST'])
        def vision():
            prompt = request.form.get('prompt', 'Describe this image')
            image = request.files.get('image')
            if image:
                import tempfile
                with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as f:
                    image.save(f)
                result = self.infer('vision', prompt, {'image_path': f.name})
            else:
                result = {'error': 'No image provided'}
            return jsonify(result)

        @app.route('/v1/tts', methods=['POST'])
        def tts():
            data = request.get_json(force=True)
            result = self.infer(ModelType.TTS, data.get('text', ''), data)
            return jsonify(result)

        @app.route('/v1/stt', methods=['POST'])
        def stt():
            audio = request.files.get('audio')
            if audio:
                import tempfile
                with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as f:
                    audio.save(f)
                result = self.infer(ModelType.STT, '', {'audio_path': f.name})
            else:
                result = {'error': 'No audio provided'}
            return jsonify(result)

        @app.route('/v1/models', methods=['GET'])
        def models():
            return jsonify({'models': self.list_models()})

        @app.route('/v1/status', methods=['GET'])
        def status():
            return jsonify(self.get_status())

        @app.route('/v1/prefetch', methods=['POST'])
        def prefetch():
            data = request.get_json(force=True)
            model_type = data.get('model_type', ModelType.LLM)
            # Prefetch is a hint — trigger backend warmup
            logger.info(f"Prefetch request for {model_type}")
            return jsonify({'status': 'prefetch_acknowledged', 'model_type': model_type})

        @app.route('/health', methods=['GET'])
        def health():
            return jsonify({'status': 'ok', 'service': 'model-bus'})

        return app
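    # Client sketch (illustrative): the vision and STT routes above expect
    # multipart uploads rather than JSON, e.g.
    #
    #     import requests
    #     with open('photo.jpg', 'rb') as img:
    #         r = requests.post('http://localhost:6790/v1/vision',
    #                           files={'image': img},
    #                           data={'prompt': 'What is in this photo?'})
    #     print(r.json().get('response', ''))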

    # ─── Serve ───────────────────────────────────────────────

    def serve_forever(self):
        """Start the Model Bus service (HTTP + Unix socket)."""
        self._running = True

        # Initial backend discovery
        self.discover_backends()

        # Background: periodic backend re-discovery
        def _rediscover_loop():
            while self._running:
                time.sleep(30)
                try:
                    self.discover_backends()
                except Exception as e:
                    logger.error(f"Backend discovery error: {e}")

        discovery_thread = threading.Thread(target=_rediscover_loop, daemon=True)
        discovery_thread.start()

        # Start Flask HTTP server
        app = self._create_flask_app()
        logger.info(f"Model Bus HTTP API starting on port {self.http_port}")

        try:
            from waitress import serve
            serve(app, host='0.0.0.0', port=self.http_port, threads=8)
        except ImportError:
            app.run(host='0.0.0.0', port=self.http_port, threaded=True)

def start_dbus_bridge():
    """Start D-Bus bridge for com.hart.ModelBus (called via D-Bus activation)."""
    logger.info("D-Bus bridge for com.hart.ModelBus — delegating to HTTP API")
    # D-Bus bridge is a thin wrapper — actual logic in HTTP service.
    # This avoids running two Python processes.
    while True:
        time.sleep(3600)
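# Illustrative entry point (an assumption: the source does not show how the
# service is launched). If the bus is not already started by a HART OS
# supervisor, it can be run standalone for local testing.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    ModelBusService().serve_forever()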