Coverage for integrations/agent_engine/model_bus_service.py: 32.0% (416 statements)
1"""
2HART OS Model Bus Service — Unified AI Access for Every Application.
4The Model Bus is the OS-level abstraction that makes AI a native capability.
5Any application — Linux, Android, Windows, or Web — can access any deployed
6model through a single unified interface.
8Transports:
9 - Unix socket: /run/hart/model-bus.sock (native Linux apps)
10 - D-Bus: com.hart.ModelBus (desktop apps)
11 - HTTP API: localhost:6790 (Android, Wine, Web apps)
13The bus routes to the best available backend:
14 1. Local llama.cpp (LLM)
15 2. Local MiniCPM (vision)
16 3. Local Whisper (STT) / Pocket TTS (TTS)
17 4. Compute mesh peers (same user's other devices)
18 5. Remote HevolveAI/hivemind (world model)
19"""
20import json
21import logging
22import os
23import socket
24import threading
25import time
27from core.port_registry import get_port
28from integrations.service_tools.model_catalog import ModelType
29from typing import Any, Dict, List, Optional
31logger = logging.getLogger('hevolve.model_bus')
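
# Illustrative client call (a sketch, not part of this module): the endpoint
# and payload shapes follow the HTTP routes defined in _create_flask_app()
# below, on the default http_port=6790.
#
#   import requests
#   resp = requests.post('http://localhost:6790/v1/chat',
#                        json={'prompt': 'Summarize my inbox'}, timeout=60)
#   print(resp.json().get('response'))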

# ─── Routing Status Publisher ─────────────────────────────
# Pushes conversational "Thinking" bubbles to the user's chat topic
# so the client shows real-time routing progress.
# Uses the same Crossbar channel the frontend already subscribes to.
def _publish_routing_status(user_id: str, message: str, request_id: str = ''):
    """Push routing progress to user's UI via Crossbar thinking bubble.

    The client (WebWorker/React Native) renders these as thinking indicators
    so the user sees: "Processing locally..." → "Checking hive network..." etc.
    """
    from core.peer_link.crossbar_publish import publish_thinking_trace
    ok = publish_thinking_trace(
        text=message,
        user_id=user_id,
        request_id=request_id or '',
        bot_type='ComputeRouter',
    )
    if ok:
        logger.debug(
            "ComputeRouter status published: user=%s msg=%r",
            user_id, message[:60] if message else '',
        )
    elif user_id:
        # Helper returned False: HARTOS publish_async not yet resolvable.
        # Empty user_id is a clean no-op — don't log noise for that.
        logger.debug(
            "ComputeRouter status drop: user=%s msg=%r — "
            "HARTOS publish_async unresolvable (loader still init).",
            user_id, message[:60] if message else '',
        )

# ═══════════════════════════════════════════════════════════════
# Model Bus Service
# ═══════════════════════════════════════════════════════════════
class ModelBusService:
    """Unified model access — any app, any model, any device."""

    # Backend health cache — avoids wasting seconds on dead backends.
    # Key = backend name, Value = (is_alive: bool, last_checked: float)
    # TTL: alive backends re-checked every 60s, dead backends use exponential
    # backoff (15s → 30s → 60s → 120s → 300s cap) to avoid CPU-hogging retries.
    _health_cache: Dict[str, tuple] = {}
    _health_lock = threading.Lock()  # Guards _health_cache across threads
    _dead_backoff: Dict[str, float] = {}  # backend_name → current dead TTL
    _ALIVE_TTL = 60.0       # Re-probe alive backends every 60s
    _DEAD_TTL_INIT = 15.0   # First dead retry after 15s
    _DEAD_TTL_MAX = 300.0   # Cap dead retry at 5 minutes
    _PROBE_TIMEOUT = 1.5    # Health probe: 1.5s max (not 10-60s)

    def __init__(
        self,
        socket_path: str = '/run/hart/model-bus.sock',
        http_port: int = 6790,
        grpc_port: int = 6791,
        max_concurrent: int = 32,
        routing_strategy: str = 'speculative',
        llm_port: int = 8080,
        vision_port: int = 9891,
        backend_port: int = 6777,
    ):
        self.socket_path = socket_path
        self.http_port = http_port
        self.grpc_port = grpc_port
        self.max_concurrent = max_concurrent
        self.routing_strategy = routing_strategy
        self.llm_port = llm_port
        self.vision_port = vision_port
        self.backend_port = backend_port

        self._backends: Dict[str, dict] = {}
        self._request_count = 0
        self._semaphore = threading.Semaphore(max_concurrent)
        self._lock = threading.Lock()
        self._running = False

        logger.info(
            f"ModelBusService initialized: socket={socket_path}, "
            f"http={http_port}, strategy={routing_strategy}"
        )

    # ─── Fast Backend Health Check ────────────────────────────

    def _is_backend_alive(self, name: str, url: str,
                          health_path: str = '/health') -> bool:
        """Check whether a backend is alive, using a cached health probe.

        Returns instantly for cached results and only probes when the cache
        is stale. Dead backends use exponential backoff (15s → 300s cap) to
        avoid CPU-hogging retry storms when services aren't running.
        """
        now = time.time()
        with self._health_lock:
            cached = self._health_cache.get(name)
            dead_ttl = self._dead_backoff.get(name, self._DEAD_TTL_INIT)
            if cached:
                is_alive, last_checked = cached
                ttl = self._ALIVE_TTL if is_alive else dead_ttl
                if now - last_checked < ttl:
                    return is_alive

        # Cache miss or stale — quick probe (outside the lock to avoid blocking)
        try:
            from core.http_pool import pooled_get
            resp = pooled_get(f'{url}{health_path}', timeout=self._PROBE_TIMEOUT)
            alive = resp.status_code < 500
        except Exception:
            alive = False

        with self._health_lock:
            self._health_cache[name] = (alive, now)
            if alive:
                # Reset backoff on recovery
                self._dead_backoff.pop(name, None)
            else:
                # Re-read current backoff inside the lock (avoid stale local var
                # race). First failure starts at _DEAD_TTL_INIT; later failures
                # double, capped at _DEAD_TTL_MAX.
                current = self._dead_backoff.get(name)
                new_ttl = (min(current * 2, self._DEAD_TTL_MAX)
                           if current else self._DEAD_TTL_INIT)
                self._dead_backoff[name] = new_ttl
                logger.debug("Backend %s at %s is DOWN (retry in %.0fs)",
                             name, url, new_ttl)
        return alive
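
    # Worked example of the backoff above: with the defaults (_DEAD_TTL_INIT=15,
    # _DEAD_TTL_MAX=300), a backend that never recovers is re-probed after
    # roughly 15s, 30s, 60s, 120s, 240s, then every 300s; a single healthy
    # probe clears the backoff and returns it to the 60s alive cadence.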

    def _mark_backend_dead(self, name: str):
        """Mark a backend as dead after a request failure (instant skip next time)."""
        with self._health_lock:
            self._health_cache[name] = (False, time.time())
            # Escalate exponential backoff (first failure starts at _DEAD_TTL_INIT)
            current = self._dead_backoff.get(name)
            self._dead_backoff[name] = (min(current * 2, self._DEAD_TTL_MAX)
                                        if current else self._DEAD_TTL_INIT)

    def _mark_backend_alive(self, name: str):
        """Mark backend alive after successful request."""
        with self._health_lock:
            self._health_cache[name] = (True, time.time())
            self._dead_backoff.pop(name, None)

    # ─── Backend Discovery ───────────────────────────────────

    def discover_backends(self) -> Dict[str, dict]:
        """Discover all available model backends."""
        from core.http_pool import pooled_get

        backends = {}

        # LLM (llama.cpp)
        try:
            resp = pooled_get(
                f'http://localhost:{self.llm_port}/health', timeout=3
            )
            if resp.status_code == 200:
                backends['llm'] = {
                    'type': ModelType.LLM,
                    'url': f'http://localhost:{self.llm_port}',
                    'status': 'ready',
                    'local': True,
                }
                logger.info(f"Backend discovered: llm (port {self.llm_port})")
        except Exception:
            pass

        # Vision (MiniCPM)
        try:
            resp = pooled_get(
                f'http://localhost:{self.vision_port}/health', timeout=3
            )
            if resp.status_code == 200:
                backends['vision'] = {
                    'type': 'vision',
                    'url': f'http://localhost:{self.vision_port}',
                    'status': 'ready',
                    'local': True,
                }
                logger.info(f"Backend discovered: vision (port {self.vision_port})")
        except Exception:
            pass

        # HART backend (for world model bridge)
        try:
            resp = pooled_get(
                f'http://localhost:{self.backend_port}/status', timeout=3
            )
            if resp.status_code == 200:
                backends['backend'] = {
                    'type': 'backend',
                    'url': f'http://localhost:{self.backend_port}',
                    'status': 'ready',
                    'local': True,
                }
                logger.info(f"Backend discovered: backend (port {self.backend_port})")
        except Exception:
            pass

        # Compute mesh peers
        try:
            resp = pooled_get(
                f'http://localhost:{get_port("mesh_relay")}/mesh/status', timeout=3
            )
            if resp.status_code == 200:
                mesh_data = resp.json()
                peer_count = mesh_data.get('peer_count', 0)
                if peer_count > 0:
                    backends['mesh'] = {
                        'type': 'mesh',
                        'url': f'http://localhost:{get_port("mesh_relay")}',
                        'status': 'ready',
                        'local': False,
                        'peers': peer_count,
                    }
                    logger.info(f"Backend discovered: mesh ({peer_count} peers)")
        except Exception:
            pass

        self._backends = backends
        return backends
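
    # Example return value (illustrative — keys depend on what is actually
    # running; the mesh port comes from get_port("mesh_relay")):
    #   {'llm':  {'type': ModelType.LLM, 'url': 'http://localhost:8080',
    #             'status': 'ready', 'local': True},
    #    'mesh': {'type': 'mesh', 'url': 'http://localhost:<mesh_relay>',
    #             'status': 'ready', 'local': False, 'peers': 2}}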

    # ─── Inference Routing ───────────────────────────────────

    def infer(
        self,
        model_type: str = ModelType.LLM,
        prompt: str = '',
        options: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Route inference to the best available backend.

        Args:
            model_type: 'llm', 'vision', 'tts', 'stt', 'video_gen', 'image_gen'
            prompt: The input text/query
            options: Additional options (image_path, voice, format, etc.)

        Returns:
            Dict with 'response', 'model', 'backend', 'latency_ms'
        """
        if not self._semaphore.acquire(timeout=30):
            return {'error': 'Model Bus overloaded — try again later'}

        try:
            start_time = time.time()
            options = options or {}

            # Apply guardrail check
            if not self._check_guardrails(prompt, model_type):
                return {'error': 'Request blocked by constitutional guardrails'}

            # Route based on model type
            if model_type == ModelType.LLM:
                result = self._route_llm(prompt, options)
            elif model_type == 'vision':
                result = self._route_vision(prompt, options)
            elif model_type == ModelType.TTS:
                result = self._route_tts(prompt, options)
            elif model_type == ModelType.STT:
                result = self._route_stt(prompt, options)
            elif model_type == ModelType.VIDEO_GEN:
                result = self._route_video_gen(prompt, options)
            elif model_type == ModelType.IMAGE_GEN:
                result = self._route_image_gen(prompt, options)
            else:
                result = {'error': f'Unknown model type: {model_type}'}

            latency_ms = int((time.time() - start_time) * 1000)
            result['latency_ms'] = latency_ms

            with self._lock:
                self._request_count += 1

            # Broadcast inference completion to EventBus
            try:
                from core.platform.events import emit_event
                emit_event('inference.completed', {
                    'model_type': model_type,
                    'model': result.get('model', ''),
                    'backend': result.get('backend', ''),
                    'latency_ms': latency_ms,
                    'success': 'error' not in result,
                })
            except Exception:
                pass

            return result
        finally:
            self._semaphore.release()
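
    # Illustrative in-process use (a sketch — assumes at least one backend is
    # reachable; result keys follow the contract in the docstring above):
    #
    #   bus = ModelBusService()
    #   bus.discover_backends()
    #   out = bus.infer(ModelType.LLM, 'Hello', {'max_tokens': 64})
    #   print(out.get('backend'), out.get('latency_ms'))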

    def _route_llm(self, prompt: str, options: dict) -> dict:
        """Route LLM inference to best backend.

        Uses health cache to skip dead backends instantly (0ms) instead of
        waiting 10-60s for timeout. Marks backends dead on failure so
        subsequent requests don't waste time on them either.
        """
        from core.http_pool import pooled_post

        uid = options.get('user_id', '')
        rid = options.get('request_id', '')

        # Build candidate list — skip backends we KNOW are dead (instant)
        backends_to_try = []

        if 'llm' in self._backends:
            url = self._backends['llm']['url']
            if self._is_backend_alive('local_llm', url):
                backends_to_try.append(('local_llm', url))
            else:
                logger.debug("Skipping local_llm (health cache: dead)")
        if 'mesh' in self._backends:
            url = self._backends['mesh']['url']
            if self._is_backend_alive('mesh', url, '/health'):
                backends_to_try.append(('mesh', url))
        if 'backend' in self._backends:
            url = self._backends['backend']['url']
            if self._is_backend_alive('backend', url, '/status'):
                backends_to_try.append(('backend', url))

        if not backends_to_try:
            _publish_routing_status(uid,
                                    "Looking for an available AI backend...", rid)
            # Force re-discover (all cached as dead)
            self.discover_backends()
            # Rebuild from fresh discovery
            if 'llm' in self._backends:
                backends_to_try.append(('local_llm', self._backends['llm']['url']))
            if 'mesh' in self._backends:
                backends_to_try.append(('mesh', self._backends['mesh']['url']))
            if 'backend' in self._backends:
                backends_to_try.append(('backend', self._backends['backend']['url']))
            if not backends_to_try:
                return {'error': 'No LLM backend available', 'response': None}

        for backend_name, url in backends_to_try:
            try:
                if backend_name == 'local_llm':
                    # llama.cpp OpenAI-compatible API
                    resp = pooled_post(
                        f'{url}/v1/chat/completions',
                        json={
                            'model': 'local',
                            'messages': [{'role': 'user', 'content': prompt}],
                            'max_tokens': options.get('max_tokens', 512),
                        },
                        timeout=options.get('timeout', 60),
                    )
                    if resp.status_code == 200:
                        self._mark_backend_alive('local_llm')
                        data = resp.json()
                        choices = data.get('choices', [])
                        content = choices[0].get('message', {}).get('content', '') if choices else ''
                        return {
                            'response': content,
                            'model': data.get('model', 'llama.cpp'),
                            'backend': 'local_llm',
                        }
                elif backend_name == 'mesh':
                    _publish_routing_status(uid, 'Routing to hive peer...', rid)
                    # Compute mesh offload
                    resp = pooled_post(
                        f'{url}/mesh/infer',
                        json={'model_type': ModelType.LLM, 'prompt': prompt},
                        timeout=options.get('timeout', 120),
                    )
                    if resp.status_code == 200:
                        self._mark_backend_alive('mesh')
                        data = resp.json()
                        return {
                            'response': data.get('response', ''),
                            'model': data.get('model', 'mesh_peer'),
                            'backend': 'mesh',
                        }
                elif backend_name == 'backend':
                    _publish_routing_status(uid, 'Routing to cloud backend...', rid)
                    # HART backend (may route to cloud or world model)
                    resp = pooled_post(
                        f'{url}/chat',
                        json={'prompt': prompt, 'user_id': 'model_bus', 'prompt_id': 'bus'},
                        timeout=options.get('timeout', 60),
                    )
                    if resp.status_code == 200:
                        self._mark_backend_alive('backend')
                        data = resp.json()
                        return {
                            'response': data.get('response', str(data)),
                            'model': 'hart_backend',
                            'backend': 'backend',
                        }
            except Exception as e:
                self._mark_backend_dead(backend_name)
                logger.warning(f"Backend {backend_name} failed (marked dead): {e}")
                continue

        return {'error': 'All LLM backends failed', 'response': None}

    def _route_vision(self, prompt: str, options: dict) -> dict:
        """Route vision inference."""
        from core.http_pool import pooled_post

        image_path = options.get('image_path', '')
        uid = options.get('user_id', '')
        rid = options.get('request_id', '')
        if not image_path:
            return {'error': 'image_path required for vision inference'}

        if 'vision' in self._backends:
            try:
                with open(image_path, 'rb') as f:
                    resp = pooled_post(
                        f"{self._backends['vision']['url']}/describe",
                        files={'image': f},
                        data={'prompt': prompt},
                        timeout=60,
                    )
                if resp.status_code == 200:
                    return {
                        'response': resp.json().get('description', ''),
                        'model': 'minicpm',
                        'backend': 'local_vision',
                    }
            except Exception as e:
                logger.warning(f"Local vision failed: {e}")

        # Fallback to mesh
        if 'mesh' in self._backends:
            _publish_routing_status(uid,
                                    'No local vision model — checking hive network...', rid)
            try:
                resp = pooled_post(
                    f"{self._backends['mesh']['url']}/mesh/infer",
                    json={'model_type': 'vision', 'prompt': prompt, 'image_path': image_path},
                    timeout=120,
                )
                if resp.status_code == 200:
                    return {
                        'response': resp.json().get('response', ''),
                        'model': 'mesh_vision',
                        'backend': 'mesh',
                    }
            except Exception:
                pass

        _publish_routing_status(uid,
                                "I can't analyze this image right now — no vision model available "
                                "locally or on the hive. Try connecting a device with GPU.", rid)
        return {'error': 'No vision backend available', 'response': None}

    def _route_tts(self, prompt: str, options: dict) -> dict:
        """Route TTS through the smart TTS router (language-aware, GPU/hive/CPU).

        Delegates to TTSRouter, which considers language, GPU, VRAM, compute
        policy, hive peers, and urgency. Falls back to the legacy chain if the
        router is unavailable.
        """
        uid = options.get('user_id', '')
        rid = options.get('request_id', '')
        try:
            from integrations.channels.media.tts_router import get_tts_router
            router = get_tts_router()
            result = router.synthesize(
                text=prompt,
                language=options.get('language'),
                voice=options.get('voice_audio') or options.get('voice'),
                source=options.get('source', 'agent_tool'),
                engine_override=options.get('engine'),
            )
            if not result.error:
                return {
                    'response': result.path,
                    'model': result.engine_id,
                    'backend': 'local_tts' if result.location == 'local' else result.location,
                    'duration': result.duration,
                    'latency_ms': result.latency_ms,
                    'device': result.device,
                }
            logger.debug("TTS router failed: %s, falling back to legacy chain", result.error)
        except Exception as e:  # includes ImportError — the router is optional
            logger.debug("TTS router unavailable (%s), using legacy chain", e)

        # Legacy fallback: skip the full chain (luxtts → makeittalk → pocket),
        # which ignores language/GPU/policy. Go straight to pocket_tts
        # (guaranteed CPU, always available). The TTSRouter already handles
        # luxtts, makeittalk, and GPU engines with proper awareness.
        _publish_routing_status(uid, 'Generating speech (fallback)...', rid)
        return self._try_pocket_tts(prompt, options)
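
    # Illustrative TTS call through the bus (a sketch — available voices depend
    # on the installed engines; 'alba' is the Pocket TTS default used below):
    #
    #   out = bus.infer(ModelType.TTS, 'Hello there', {'voice': 'alba'})
    #   audio_path = out.get('response')  # path to the synthesized audio file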

    def _try_luxtts(self, prompt: str, options: dict) -> dict:
        """Try LuxTTS (48kHz, GPU/CPU, voice cloning)."""
        t0 = time.time()
        try:
            from integrations.service_tools.luxtts_tool import luxtts_synthesize
            voice = options.get('voice_audio') or options.get('voice')
            device = options.get('device')
            result = json.loads(luxtts_synthesize(
                prompt, voice_audio=voice, device=device,
            ))
            latency = (time.time() - t0) * 1000
            if 'error' in result:
                logger.info(f"LuxTTS unavailable: {result['error']}")
                return result
            return {
                'response': result.get('path', ''),
                'model': 'luxtts-48k',
                'backend': f"local_tts_{result.get('device', 'cpu')}",
                'voice': result.get('voice', ''),
                'duration': result.get('duration', 0),
                'sample_rate': 48000,
                'engine': 'luxtts',
                'rtf': result.get('rtf', 0),
                'realtime_factor': result.get('realtime_factor', 0),
                'latency_ms': round(latency, 1),
            }
        except ImportError:
            return {'error': 'LuxTTS not installed'}
        except Exception as e:
            logger.warning(f"LuxTTS failed: {e}")
            return {'error': f'LuxTTS error: {e}'}

    def _try_makeittalk_tts(self, prompt: str, options: dict, base_url: str) -> dict:
        """Try MakeItTalk cloud TTS (POST /video-gen/ with text, no image = audio-only)."""
        import requests as http_requests
        t0 = time.time()
        try:
            voice = options.get('voice', 'af_bella')
            payload = {
                'text': prompt,
                'uid': options.get('user_id', 'model_bus'),
                'voiceName': voice,
                'kokuro': 'true',
                'audio_only': True,  # hint: skip video pipeline
            }
            resp = http_requests.post(
                f'{base_url.rstrip("/")}/video-gen/',
                json=payload,
                timeout=options.get('timeout', 30),
            )
            latency = (time.time() - t0) * 1000
            if resp.status_code == 200:
                data = resp.json()
                return {
                    'response': data.get('audio_url', data.get('url', '')),
                    'model': 'makeittalk-cloud',
                    'backend': 'cloud_tts',
                    'voice': voice,
                    'engine': 'makeittalk',
                    'latency_ms': round(latency, 1),
                }
            logger.warning("MakeItTalk returned %d: %s", resp.status_code, resp.text[:200])
            return {'error': f'MakeItTalk HTTP {resp.status_code}'}
        except http_requests.ConnectionError:
            logger.info("MakeItTalk cloud connection refused at %s", base_url)
            return {'error': 'MakeItTalk connection failed'}
        except http_requests.Timeout:
            logger.info("MakeItTalk cloud timed out at %s", base_url)
            return {'error': 'MakeItTalk timeout'}
        except Exception as e:
            logger.warning("MakeItTalk cloud error: %s", e)
            return {'error': f'MakeItTalk error: {e}'}

    def _try_pocket_tts(self, prompt: str, options: dict) -> dict:
        """Pocket TTS offline fallback (always available, CPU, zero cost)."""
        t0 = time.time()
        try:
            from integrations.service_tools.pocket_tts_tool import pocket_tts_synthesize
            voice = options.get('voice', 'alba')
            output_path = options.get('output_path')
            result = json.loads(pocket_tts_synthesize(prompt, voice, output_path))
            latency = (time.time() - t0) * 1000
            if 'error' in result:
                return {'error': result['error'], 'response': None}
            return {
                'response': result.get('path', ''),
                'model': 'pocket-tts-100m',
                'backend': 'local_tts',
                'voice': result.get('voice', voice),
                'duration': result.get('duration', 0),
                'engine': result.get('engine', 'pocket-tts'),
                'latency_ms': round(latency, 1),
            }
        except ImportError:
            return {'error': 'Pocket TTS not available (pip install pocket-tts)', 'response': None}
        except Exception as e:
            return {'error': f'TTS inference failed: {e}', 'response': None}

    def _route_stt(self, prompt: str, options: dict) -> dict:
        """Route speech-to-text inference via Whisper (sherpa-onnx / openai-whisper)."""
        t0 = time.time()
        try:
            from integrations.service_tools.whisper_tool import whisper_transcribe
            audio_path = options.get('audio_path', prompt)
            language = options.get('language')
            result = json.loads(whisper_transcribe(audio_path, language))
            latency = (time.time() - t0) * 1000
            if 'error' in result:
                return {'error': result['error'], 'response': None}
            return {
                'response': result.get('text', ''),
                'model': 'whisper-stt-local',
                'backend': 'local_stt',
                'language': result.get('language', 'auto'),
                'latency_ms': round(latency, 1),
            }
        except ImportError:
            return {'error': 'Whisper STT not available (pip install sherpa-onnx)', 'response': None}
        except Exception as e:
            return {'error': f'STT inference failed: {e}', 'response': None}

    def _route_video_gen(self, prompt: str, options: dict) -> dict:
        """Route video generation: local GPU → hive mesh peer → cloud fallback.

        For no-GPU central instances, this offloads to a hive peer that has
        a GPU with LTX-2, ComfyUI, or Wan2GP loaded.

        Publishes routing status to the user's chat topic so the UI shows
        real-time progress ("Generating video locally..." → "Checking hive...").
        """
        from core.http_pool import pooled_post

        model = options.get('model', 'ltx2')
        timeout = options.get('timeout', 300)
        uid = options.get('user_id', '')
        rid = options.get('request_id', '')

        # 1. Try local GPU servers — skip instantly if health cache says dead
        local_servers = [
            ('ltx2', f"http://localhost:{get_port('ltx2_server', 5002)}"),
            ('comfyui', f"http://localhost:{get_port('comfyui', 8188)}"),
        ]
        for name, local_url in local_servers:
            if not self._is_backend_alive(f'video_{name}', local_url):
                continue  # 0ms — skip dead server
            _publish_routing_status(uid, 'Generating video on this device...', rid)
            try:
                resp = pooled_post(
                    f'{local_url}/generate',
                    json={'prompt': prompt, 'model': model},
                    timeout=timeout,
                )
                if resp.status_code == 200:
                    self._mark_backend_alive(f'video_{name}')
                    data = resp.json()
                    video_url = data.get('video_url') or data.get('output_url', '')
                    return {
                        'response': video_url,
                        'model': model,
                        'backend': 'local_gpu',
                    }
            except Exception:
                self._mark_backend_dead(f'video_{name}')
                continue

        # 2. Try hive mesh peer with GPU (central has no GPU → offload)
        _publish_routing_status(uid,
                                'No local GPU available. Checking hive network for a device with GPU...', rid)
        try:
            from integrations.agent_engine.compute_config import get_compute_policy
            policy = get_compute_policy()
            if policy.get('compute_policy') != 'local_only':
                from integrations.agent_engine.compute_mesh_service import get_compute_mesh
                mesh = get_compute_mesh()
                result = mesh.offload_to_best_peer(
                    model_type=ModelType.VIDEO_GEN,
                    prompt=prompt,
                    options={**options, 'timeout': timeout},
                )
                if result and 'error' not in result:
                    peer = result.get('offloaded_to', 'peer')
                    _publish_routing_status(uid,
                                            f'Video generation running on hive peer {peer}...', rid)
                    result['backend'] = 'hive_peer'
                    return result
                logger.info("Hive mesh video offload: %s",
                            (result or {}).get('error', 'no result'))
        except Exception as e:
            logger.debug("Hive mesh video offload unavailable: %s", e)

        # 3. Cloud fallback (MakeItTalk or external service) — skip if known dead
        makeittalk_url = os.environ.get('MAKEITTALK_API_URL')
        if makeittalk_url and self._is_backend_alive(
                'makeittalk_cloud', makeittalk_url, '/health'):
            _publish_routing_status(uid,
                                    'No hive peers with GPU. Sending to cloud service...', rid)
            try:
                resp = pooled_post(
                    f'{makeittalk_url.rstrip("/")}/video-gen/',
                    json={
                        'text': prompt,
                        'uid': options.get('user_id', 'model_bus'),
                        **{k: v for k, v in options.items()
                           if k in ('avatar_id', 'image_url', 'voice_id')},
                    },
                    timeout=min(timeout, 60),
                )
                if resp.status_code == 200:
                    self._mark_backend_alive('makeittalk_cloud')
                    data = resp.json()
                    return {
                        'response': data.get('video_url', data.get('url', '')),
                        'model': 'makeittalk-cloud',
                        'backend': 'cloud',
                    }
            except Exception as e:
                self._mark_backend_dead('makeittalk_cloud')
                logger.warning("MakeItTalk video cloud (marked dead): %s", e)

        # All paths exhausted — conversational fallback
        _publish_routing_status(uid,
                                "I wasn't able to generate the video right now. "
                                "There's no GPU available on this server, no hive peers are online "
                                "with a free GPU, and the cloud video service didn't respond. "
                                "You can try again shortly, or connect a GPU device to your hive "
                                "with: hart compute pair <device-address>", rid)
        return {'error': 'No video generation backend available',
                'response': ("I couldn't generate the video — no GPU is available "
                             "locally or on the hive network right now. "
                             "Try again in a moment or pair a GPU device.")}

    def _route_image_gen(self, prompt: str, options: dict) -> dict:
        """Route image generation inference."""
        return {
            'response': 'Image generation not yet implemented',
            'model': 'none',
            'backend': 'stub',
        }

    # ─── Guardrail Gate ──────────────────────────────────────

    def _check_guardrails(self, prompt: str, model_type: str) -> bool:
        """Apply constitutional guardrail check to request."""
        try:
            from security.hive_guardrails import ConstitutionalFilter
            approved, reason = ConstitutionalFilter.check_prompt(prompt)
            if not approved:
                logger.warning(f"Guardrail blocked {model_type} request: {reason}")
                return False
        except ImportError:
            pass  # Guardrails not available — allow request
        return True

    # ─── Model Listing ───────────────────────────────────────

    def list_models(self) -> List[Dict[str, Any]]:
        """List all available models across all backends."""
        models = []

        if 'llm' in self._backends:
            models.append({
                'id': 'local-llm',
                'type': ModelType.LLM,
                'backend': 'llama.cpp',
                'local': True,
                'status': 'ready',
            })

        if 'vision' in self._backends:
            models.append({
                'id': 'local-vision',
                'type': 'vision',
                'backend': 'minicpm',
                'local': True,
                'status': 'ready',
            })

        if 'mesh' in self._backends:
            models.append({
                'id': 'mesh-models',
                'type': 'multiple',
                'backend': 'compute_mesh',
                'local': False,
                'status': 'ready',
                'peers': self._backends['mesh'].get('peers', 0),
            })

        # TTS — LuxTTS (if installed, GPU/CPU, 48kHz voice cloning)
        try:
            from zipvoice.luxvoice import LuxTTS as _LuxCheck  # noqa: F401
            import torch as _t
            _lux_device = 'cuda' if _t.cuda.is_available() else 'cpu'
            models.append({
                'id': 'luxtts-48k',
                'type': ModelType.TTS,
                'backend': 'luxtts',
                'local': True,
                'status': 'ready',
                'device': _lux_device,
                'features': ['voice_cloning', '48khz', 'gpu_accelerated', 'offline'],
            })
        except ImportError:
            pass

        # TTS — MakeItTalk cloud (if configured)
        makeittalk_url = os.environ.get('MAKEITTALK_API_URL')
        if makeittalk_url:
            models.append({
                'id': 'makeittalk-cloud',
                'type': ModelType.TTS,
                'backend': 'makeittalk',
                'local': False,
                'status': 'ready',
                'url': makeittalk_url,
                'features': ['video_gen', 'lip_sync', 'multi_voice', 'multi_language'],
            })

        # TTS — Pocket TTS (always local, CPU — fallback when cloud unavailable)
        models.append({
            'id': 'pocket-tts-100m',
            'type': ModelType.TTS,
            'backend': 'pocket_tts',
            'local': True,
            'status': 'ready',
            'features': ['voice_cloning', 'zero_shot', 'offline'],
        })

        # STT — Whisper (always local, sherpa-onnx or openai-whisper)
        models.append({
            'id': 'whisper-stt-local',
            'type': ModelType.STT,
            'backend': 'whisper',
            'local': True,
            'status': 'ready',
            'features': ['multilingual', 'offline'],
        })

        return models

    def get_status(self) -> Dict[str, Any]:
        """Get Model Bus status."""
        return {
            'status': 'running' if self._running else 'stopped',
            'backends': {k: v.get('status') for k, v in self._backends.items()},
            'backend_count': len(self._backends),
            'request_count': self._request_count,
            'max_concurrent': self.max_concurrent,
            'routing_strategy': self.routing_strategy,
        }

    # ─── HTTP Server ─────────────────────────────────────────

    def _create_flask_app(self):
        """Create Flask app for HTTP API."""
        from flask import Flask, request, jsonify

        app = Flask(__name__)

        @app.route('/v1/chat', methods=['POST'])
        def chat():
            data = request.get_json(force=True)
            result = self.infer(
                model_type=ModelType.LLM,
                prompt=data.get('prompt', ''),
                options=data,
            )
            return jsonify(result)

        @app.route('/v1/vision', methods=['POST'])
        def vision():
            prompt = request.form.get('prompt', 'Describe this image')
            image = request.files.get('image')
            if image:
                import tempfile
                with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as f:
                    image.save(f)
                result = self.infer('vision', prompt, {'image_path': f.name})
            else:
                result = {'error': 'No image provided'}
            return jsonify(result)

        @app.route('/v1/tts', methods=['POST'])
        def tts():
            data = request.get_json(force=True)
            result = self.infer(ModelType.TTS, data.get('text', ''), data)
            return jsonify(result)

        @app.route('/v1/stt', methods=['POST'])
        def stt():
            audio = request.files.get('audio')
            if audio:
                import tempfile
                with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as f:
                    audio.save(f)
                result = self.infer(ModelType.STT, '', {'audio_path': f.name})
            else:
                result = {'error': 'No audio provided'}
            return jsonify(result)

        @app.route('/v1/models', methods=['GET'])
        def models():
            return jsonify({'models': self.list_models()})

        @app.route('/v1/status', methods=['GET'])
        def status():
            return jsonify(self.get_status())

        @app.route('/v1/prefetch', methods=['POST'])
        def prefetch():
            data = request.get_json(force=True)
            model_type = data.get('model_type', ModelType.LLM)
            # Prefetch is a hint — trigger backend warmup
            logger.info(f"Prefetch request for {model_type}")
            return jsonify({'status': 'prefetch_acknowledged', 'model_type': model_type})

        @app.route('/health', methods=['GET'])
        def health():
            return jsonify({'status': 'ok', 'service': 'model-bus'})

        return app
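
    # Quick smoke test against the routes above (illustrative; assumes the
    # service is listening on the default port 6790):
    #
    #   import requests
    #   assert requests.get('http://localhost:6790/health').json()['status'] == 'ok'
    #   print(requests.get('http://localhost:6790/v1/models').json()['models'])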

    # ─── Serve ───────────────────────────────────────────────

    def serve_forever(self):
        """Start the Model Bus service (HTTP + Unix socket)."""
        self._running = True

        # Initial backend discovery
        self.discover_backends()

        # Background: periodic backend re-discovery
        def _rediscover_loop():
            while self._running:
                time.sleep(30)
                try:
                    self.discover_backends()
                except Exception as e:
                    logger.error(f"Backend discovery error: {e}")

        discovery_thread = threading.Thread(target=_rediscover_loop, daemon=True)
        discovery_thread.start()

        # Start Flask HTTP server
        app = self._create_flask_app()
        logger.info(f"Model Bus HTTP API starting on port {self.http_port}")

        try:
            from waitress import serve
            serve(app, host='0.0.0.0', port=self.http_port, threads=8)
        except ImportError:
            app.run(host='0.0.0.0', port=self.http_port, threaded=True)


def start_dbus_bridge():
    """Start D-Bus bridge for com.hart.ModelBus (called via D-Bus activation)."""
    logger.info("D-Bus bridge for com.hart.ModelBus — delegating to HTTP API")
    # D-Bus bridge is a thin wrapper — actual logic lives in the HTTP service.
    # This avoids running two Python processes. `time` is already imported at
    # module level, so no local import is needed.
    while True:
        time.sleep(3600)
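

# Illustrative entrypoint (a sketch — how this service is actually launched
# in production is not shown in this file):
#
#   if __name__ == '__main__':
#       logging.basicConfig(level=logging.INFO)
#       ModelBusService().serve_forever()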