1"""
2Video Generation Orchestrator — HARTOS-native video pipeline.
4Absorbs the orchestration logic from MakeItTalk's views_c.py (request parsing,
5asset download/caching, text chunking, queue ETA, task dispatch) and dispatches
6GPU-bound subtasks to hive mesh peers or local GPU servers.
8MakeItTalk on a no-GPU VM is a dead service — HARTOS handles the orchestration
9and routes GPU work to whoever has a GPU (local, hive peer, or cloud).
11Pipeline:
12 1. Parse request (avatar, voice, text, flags)
13 2. Download/cache image + audio assets
14 3. Chunk text for streaming playback
15 4. Dispatch GPU subtasks: TTS + face crop (parallel) → lip-sync (sequential)
16 5. Publish per-chunk results to Crossbar pupit.{user_id} for real-time playback
17 6. Return 202 with queue position + ETA
19GPU subtasks dispatched via compute mesh:
20 - audio_generation: Text → WAV (requires GPU for neural TTS)
21 - crop_background_removal: Image → cropped face + bg removal (GPU optional)
22 - lip_sync_generation: Audio + Image → video (requires GPU)
23 - hd_upscale: Video → HD video (requires GPU, optional)
24"""

import hashlib
import json
import logging
import os
import re
import tempfile
import threading
import time
import uuid
from typing import Any, Dict, List, Optional, Tuple

logger = logging.getLogger('hevolve.video_orchestrator')

# ─── Constants ────────────────────────────────────────────

# Text chunking parameters (from MakeItTalk merge_sentences)
MIN_CHUNK_LEN = 50
MAX_CHUNK_LEN = 60

# Hallo max duration (seconds) — longer clips fall back to MakeItTalk pipeline
HALLO_MAX_DURATION = 24

# Queue ETA buffer (seconds)
QUEUE_BUFFER_TIME = 300

# Asset download timeout (seconds)
ASSET_DOWNLOAD_TIMEOUT = 30

# Processing time per second of audio, used for queue estimation
# (MakeItTalk estimates processing time as audio duration * 60)
PROCESSING_TIME_FACTOR = 60

# Asset cache directory
ASSET_CACHE_DIR = os.path.join(
    os.environ.get('HEVOLVE_DATA_DIR', os.path.expanduser('~/.hevolve')),
    'cache', 'video_assets',
)


# ─── Text Chunking ───────────────────────────────────────

def _sent_tokenize(text: str) -> List[str]:
    """Split text into sentences. Uses nltk if available, else regex fallback."""
    try:
        from nltk.tokenize import sent_tokenize
        return sent_tokenize(text)
    except ImportError:
        # Regex fallback — split on sentence-ending punctuation
        sentences = re.split(r'(?<=[.!?])\s+', text.strip())
        return [s for s in sentences if s.strip()]


def merge_sentences(sentences: List[str],
                    min_len: int = MIN_CHUNK_LEN,
                    max_len: int = MAX_CHUNK_LEN) -> List[str]:
    """Merge short sentences into chunks of min_len..max_len characters.

    Ported from MakeItTalk's merge_sentences() utility.
    Ensures each chunk is long enough for natural TTS output.
    """
    if not sentences:
        return []

    chunks = []
    current = ''

    for sent in sentences:
        candidate = (current + ' ' + sent).strip() if current else sent.strip()

        if len(candidate) > max_len and current:
            # Current chunk is full — flush it
            chunks.append(current.strip())
            current = sent.strip()
        else:
            current = candidate

    if current.strip():
        # Merge short trailing chunk with previous if possible
        if chunks and len(current.strip()) < min_len:
            chunks[-1] = (chunks[-1] + ' ' + current).strip()
        else:
            chunks.append(current.strip())

    return chunks


def chunk_text(text: str) -> List[str]:
    """Split text into TTS-friendly chunks for streaming playback."""
    sentences = _sent_tokenize(text)
    if not sentences:
        return [text] if text.strip() else []
    return merge_sentences(sentences)
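
# Illustrative behavior (the strings below are made up): with three ~40-char
# sentences, no two fit within MAX_CHUNK_LEN together, so the first flushes
# alone; the trailing chunk is under MIN_CHUNK_LEN, so it merges backward:
#
#     chunk_text("This sentence is about forty chars long. "
#                "Here is another sentence of similar size. "
#                "And a third one to finish the paragraph.")
#     # -> ['This sentence is about forty chars long.',
#     #     'Here is another sentence of similar size. And a third one ...']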


# ─── Asset Management ────────────────────────────────────

def _ensure_cache_dir():
    """Create asset cache directory if needed."""
    os.makedirs(ASSET_CACHE_DIR, exist_ok=True)
    os.makedirs(os.path.join(ASSET_CACHE_DIR, 'images'), exist_ok=True)
    os.makedirs(os.path.join(ASSET_CACHE_DIR, 'audio'), exist_ok=True)


def _cache_key(url: str) -> str:
    """Deterministic cache key from URL."""
    return hashlib.sha256(url.encode()).hexdigest()[:16]
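
# e.g. _cache_key('https://cdn.example.com/a.png') (hypothetical URL) always
# yields the same 16-hex-char name, so repeat downloads of one URL share one
# cache slot.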


def download_asset(url: str, asset_type: str = 'images') -> Optional[str]:
    """Download and cache an asset (image or audio) from URL.

    Returns local file path, or None on failure.
    Uses URL-hash caching — the same URL returns the cached file instantly.
    """
    if not url:
        return None

    _ensure_cache_dir()

    # Determine extension from URL
    ext = ''
    url_path = url.split('?')[0]
    if '.' in url_path.split('/')[-1]:
        ext = '.' + url_path.split('/')[-1].rsplit('.', 1)[-1]
    if not ext:
        ext = '.png' if asset_type == 'images' else '.wav'

    cache_file = os.path.join(ASSET_CACHE_DIR, asset_type,
                              _cache_key(url) + ext)

    # Return cached file if exists
    if os.path.exists(cache_file) and os.path.getsize(cache_file) > 0:
        return cache_file

    try:
        from core.http_pool import pooled_get
        resp = pooled_get(url, timeout=ASSET_DOWNLOAD_TIMEOUT)
        if resp.status_code == 200:
            # Write to a temp file, then move into place atomically so a
            # partial download never poisons the cache
            fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(cache_file))
            with os.fdopen(fd, 'wb') as f:
                f.write(resp.content)
            os.replace(tmp_path, cache_file)
            return cache_file
    except Exception as e:
        logger.warning("Asset download failed (%s): %s", url, e)

    return None
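
# Usage sketch (URL hypothetical): the first call downloads to
# ~/.hevolve/cache/video_assets/images/<sha16>.png; repeat calls for the same
# URL return that path without touching the network.
#
#     path = download_asset('https://cdn.example.com/avatars/42.png', 'images')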


# ─── Queue Estimation ────────────────────────────────────

def estimate_audio_duration(text: str) -> float:
    """Estimate audio duration from text length (seconds).

    Average speaking rate: ~150 words/minute = 2.5 words/second.
    """
    words = len(text.split())
    return max(2.0, words / 2.5)


def calculate_queue_eta(queue_depth: int, audio_duration: float) -> dict:
    """Calculate queue position and ETA for a video generation request.

    Ported from MakeItTalk's queue estimation logic.

    Returns:
        {
            'total_jobs_in_queue': int,
            'position': int,
            'estimated_seconds': int,
            'soft_time_limit': int,
            'hard_time_limit': int,
        }
    """
    position = queue_depth + 1
    time_per_job = audio_duration * PROCESSING_TIME_FACTOR
    estimated = int(position * time_per_job + QUEUE_BUFFER_TIME)
    soft_limit = estimated
    hard_limit = soft_limit + QUEUE_BUFFER_TIME

    return {
        'total_jobs_in_queue': queue_depth,
        'position': position,
        'estimated_seconds': estimated,
        'soft_time_limit': soft_limit,
        'hard_time_limit': hard_limit,
    }
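
# Worked example: a 25-word reply gives estimate_audio_duration() =
# 25 / 2.5 = 10.0 s. With two jobs already queued:
#
#     calculate_queue_eta(queue_depth=2, audio_duration=10.0)
#     # position        = 2 + 1         = 3
#     # time_per_job    = 10.0 * 60     = 600 s
#     # estimated       = 3 * 600 + 300 = 2100 s  (also the soft_time_limit)
#     # hard_time_limit = 2100 + 300    = 2400 s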


# ─── Request Parsing ─────────────────────────────────────

class VideoGenRequest:
    """Parsed video generation request.

    Normalizes the various parameter formats from MakeItTalk / chatbot_pipeline
    into a clean internal representation.
    """

    def __init__(self, data: dict):
        self.uid = data.get('uid') or f"{uuid.uuid4().hex[:8]}_{data.get('user_id', 'anon')}"
        self.user_id = str(data.get('user_id', ''))
        self.publish_id = str(data.get('publish_id', self.user_id))
        self.text = data.get('text', '')

        # Voice configuration
        self.voice_name = data.get('voiceName', data.get('voice_name', ''))
        self.gender = data.get('gender', 'male')
        self.openvoice = _to_bool(data.get('openvoice', False))
        self.chattts = _to_bool(data.get('chattts', False))
        self.kokuro = _to_bool(data.get('kokuro', False))

        # Image/avatar configuration
        self.avatar_id = data.get('avatar_id', '')
        self.image_url = data.get('image_url', '')
        self.audio_sample_url = data.get('audio_sample_url', '')

        # Processing flags
        self.flag_hallo = _to_bool(data.get('flag_hallo', False))
        self.hd_video = _to_bool(data.get('hd_vid', data.get('hd_video', False)))
        self.vtoonify = _to_bool(data.get('vtoonify', False))
        self.remove_bg = _to_bool(data.get('remove_bg', True))
        self.crop = _to_bool(data.get('crop', True))
        self.is_premium = _to_bool(data.get('is_premium', data.get('premium', False)))
        self.chunking = _to_bool(data.get('chunking', False))

        # Background options
        self.inpainting = _to_bool(data.get('inpainting', False))
        self.inpainting_prompt = data.get('prompt', '')
        self.gradient = _to_bool(data.get('gradient', False))
        self.solid_color = _to_bool(data.get('solid_color', False))
        self.background_path = data.get('background_path', '')

        # Request metadata
        self.request_id = data.get('request_id', '')

    def validate(self) -> Optional[str]:
        """Validate request. Returns error string or None if valid."""
        if not self.text and not self.audio_sample_url:
            return 'Either text or audio_sample_url is required'
        if not self.image_url and not self.avatar_id:
            return 'Either image_url or avatar_id is required'
        return None
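
# validate() needs text or an audio sample, plus an image or an avatar
# (values below are hypothetical):
#
#     VideoGenRequest({'text': 'hi', 'image_url': 'https://x/a.png'}).validate()
#     # -> None (valid)
#     VideoGenRequest({'text': 'hi'}).validate()
#     # -> 'Either image_url or avatar_id is required'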


def _to_bool(val) -> bool:
    """Convert various truthy representations to bool."""
    if isinstance(val, bool):
        return val
    if isinstance(val, str):
        return val.lower() in ('true', '1', 'yes')
    return bool(val)
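
# e.g. _to_bool('True') -> True, _to_bool('1') -> True, _to_bool('0') -> False,
# _to_bool(0) -> False. Form-encoded flags arrive as strings, where plain
# bool() would be wrong: bool('false') is True.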


# ─── GPU Subtask Dispatch ────────────────────────────────

class SubtaskResult:
    """Result from a dispatched GPU subtask."""

    def __init__(self, success: bool = False, data: Optional[dict] = None,
                 error: str = '', peer: str = 'local'):
        self.success = success
        self.data = data or {}
        self.error = error
        self.peer = peer


def _dispatch_gpu_subtask(
    task_type: str,
    task_params: dict,
    user_id: str = '',
    timeout: int = 300,
) -> SubtaskResult:
    """Dispatch a GPU subtask to local GPU or hive mesh peer.

    Follows the same routing chain as model_bus_service._route_video_gen():
    1. Local GPU server (health-cached, skip dead in 0ms)
    2. Hive mesh peer with GPU
    3. Error — no GPU available

    Args:
        task_type: 'audio_generation', 'crop_background', 'lip_sync', 'hd_upscale'
        task_params: Task-specific parameters dict
        user_id: For routing status publishing
        timeout: Max wait time (seconds)

    Returns:
        SubtaskResult with success/data/error
    """
    from core.port_registry import get_port

    # Map task types to local GPU endpoints
    local_endpoints = {
        'audio_generation': {
            'url': f"http://localhost:{get_port('tts_gpu', 5003)}",
            'path': '/synthesize',
        },
        'crop_background': {
            'url': f"http://localhost:{get_port('image_proc', 5004)}",
            'path': '/process',
        },
        'lip_sync': {
            'url': f"http://localhost:{get_port('lip_sync', 5005)}",
            'path': '/generate',
        },
        'hd_upscale': {
            'url': f"http://localhost:{get_port('video2x', 5006)}",
            'path': '/hd-video/',
        },
    }

    endpoint = local_endpoints.get(task_type, {})

    # 1. Try local GPU — health-cached
    if endpoint:
        try:
            from integrations.agent_engine.model_bus_service import get_model_bus
            bus = get_model_bus()
            local_url = endpoint['url']
            backend_name = f'video_{task_type}'

            if bus._is_backend_alive(backend_name, local_url):
                from core.http_pool import pooled_post
                resp = pooled_post(
                    f"{local_url}{endpoint['path']}",
                    json=task_params,
                    timeout=timeout,
                )
                if resp.status_code == 200:
                    bus._mark_backend_alive(backend_name)
                    return SubtaskResult(
                        success=True, data=resp.json(), peer='local_gpu')
                bus._mark_backend_dead(backend_name)
        except Exception as e:
            logger.debug("Local GPU %s unavailable: %s", task_type, e)

    # 2. Hive mesh peer
    try:
        from integrations.agent_engine.compute_config import get_compute_policy
        policy = get_compute_policy()
        if policy.get('compute_policy') != 'local_only':
            from integrations.agent_engine.compute_mesh_service import get_compute_mesh
            mesh = get_compute_mesh()
            result = mesh.offload_to_best_peer(
                model_type=task_type,
                prompt=json.dumps(task_params),
                options={'timeout': timeout, 'user_id': user_id},
            )
            if result and 'error' not in result:
                return SubtaskResult(
                    success=True,
                    data=result,
                    peer=result.get('offloaded_to', 'hive_peer'),
                )
            # result may be None here — guard before .get()
            logger.info("Hive offload %s: %s", task_type,
                        result.get('error', 'no result') if result else 'no result')
    except Exception as e:
        logger.debug("Hive mesh %s unavailable: %s", task_type, e)

    return SubtaskResult(success=False, error=f'No GPU backend for {task_type}')
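
# Usage sketch (the params dict is hypothetical; each endpoint defines its own
# schema):
#
#     res = _dispatch_gpu_subtask('lip_sync', {'uid': 'job1_1', 'text': 'Hi'},
#                                 user_id='42', timeout=300)
#     if res.success:
#         print(res.peer)   # 'local_gpu' or a hive peer id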


def _dispatch_parallel(tasks: List[Tuple[str, dict, str, int]]) -> List[SubtaskResult]:
    """Dispatch multiple GPU subtasks in parallel.

    Args:
        tasks: List of (task_type, params, user_id, timeout) tuples

    Returns:
        List of SubtaskResult in same order as input
    """
    results: List[Optional[SubtaskResult]] = [None] * len(tasks)

    def _run(idx, task_type, params, user_id, timeout):
        results[idx] = _dispatch_gpu_subtask(task_type, params, user_id, timeout)

    threads = []
    for i, (ttype, params, uid, tout) in enumerate(tasks):
        t = threading.Thread(target=_run, args=(i, ttype, params, uid, tout),
                             daemon=True)
        t.start()
        threads.append(t)

    # Hard cap: 10 minutes total across all tasks (shared deadline, so the
    # per-thread joins don't accumulate)
    deadline = time.time() + 600
    for t in threads:
        t.join(timeout=max(0.0, deadline - time.time()))

    # Replace None with error for timed-out tasks
    return [r or SubtaskResult(success=False, error='Task timed out')
            for r in results]
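
# The pipeline's main use of this helper (see _process_chunk below): TTS and
# face crop run side by side, e.g.
#
#     tts, crop = _dispatch_parallel([
#         ('audio_generation', tts_params, user_id, 300),
#         ('crop_background', crop_params, user_id, 120),
#     ])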


# ─── Crossbar Publishing ─────────────────────────────────

def _publish_chunk_result(publish_id: str, chunk_data: dict):
    """Publish a completed chunk to pupit.{publish_id} for streaming playback.

    The client (WebWorker/React Native) picks up each chunk and plays
    the video segment immediately — streaming, not waiting for full video.
    """
    if not publish_id:
        return
    try:
        from core.safe_hartos_attr import safe_hartos_attr
        publish_async = safe_hartos_attr('publish_async')
        if publish_async is None:
            logger.debug(
                "Video chunk publish skipped: HARTOS publish_async "
                "unresolvable — publish_id=%s", publish_id,
            )
            return
        topic = f'com.hertzai.pupit.{publish_id}'
        publish_async(topic, json.dumps(chunk_data))
        logger.debug(
            "Video chunk published: publish_id=%s topic=%s",
            publish_id, topic,
        )
    except Exception as e:
        logger.debug(
            "Video chunk publish failed: publish_id=%s err=%s",
            publish_id, e,
        )
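
# What a subscriber on com.hertzai.pupit.{publish_id} receives per chunk
# (shape from _process_chunk below; URLs illustrative):
#
#     {'chunk_idx': 1, 'total_chunks': 3, 'status': 'completed',
#      'uid': '<job>_1', 'video_url': 'https://...', 'audio_url': 'https://...',
#      'peer': 'local_gpu'}
#
# or, on failure, a dict with 'status': 'error' and an 'error' message.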


def _publish_status(user_id: str, message: str, request_id: str = ''):
    """Publish routing status to user's chat (thinking bubble)."""
    try:
        from integrations.agent_engine.model_bus_service import _publish_routing_status
        _publish_routing_status(user_id, message, request_id)
    except Exception:
        pass


# ─── Main Orchestrator ───────────────────────────────────

class VideoOrchestrator:
    """Orchestrates video generation pipeline.

    Replaces MakeItTalk's views_c.py orchestration with HARTOS-native
    dispatch to local GPU or hive mesh peers.

    Usage:
        orch = get_video_orchestrator()
        result = orch.generate(request_data)
    """

    def __init__(self):
        self._active_jobs: Dict[str, dict] = {}
        self._lock = threading.Lock()
        self._job_counter = 0

    @property
    def queue_depth(self) -> int:
        """Current number of active jobs."""
        with self._lock:
            return len(self._active_jobs)

    def generate(self, data: dict) -> dict:
        """Main entry point — orchestrate a video generation request.

        Args:
            data: Raw request dict from API endpoint

        Returns:
            On success (HTTP 202 pattern):
                {
                    'status': 'accepted',
                    'uid': '<request-id>',
                    'total_jobs_in_queue': int,
                    'position': int,
                    'estimated_seconds': int,
                }
            On error:
                {'error': '<message>'}
        """
        req = VideoGenRequest(data)

        # Validate
        err = req.validate()
        if err:
            return {'error': err}

        # Estimate duration + queue position
        audio_duration = estimate_audio_duration(req.text)
        eta = calculate_queue_eta(self.queue_depth, audio_duration)

        # Enforce Hallo duration constraint
        if req.flag_hallo and audio_duration > HALLO_MAX_DURATION:
            req.flag_hallo = False
            logger.info("Hallo disabled — estimated %ds > %ds max",
                        audio_duration, HALLO_MAX_DURATION)

        # Register job
        job_id = req.uid
        with self._lock:
            self._active_jobs[job_id] = {
                'uid': job_id,
                'user_id': req.user_id,
                'started': time.time(),
                'status': 'queued',
            }

        # Dispatch asynchronously — don't block the API response
        thread = threading.Thread(
            target=self._execute_pipeline,
            args=(req, eta),
            daemon=True,
            name=f'video-gen-{job_id}',
        )
        thread.start()

        return {
            'status': 'accepted',
            'uid': job_id,
            'total_jobs_in_queue': eta['total_jobs_in_queue'],
            'position': eta['position'],
            'estimated_seconds': eta['estimated_seconds'],
        }

    def _execute_pipeline(self, req: VideoGenRequest, eta: dict):
        """Execute the full video generation pipeline in background.

        Pipeline stages:
        1. Download/cache assets (image, audio sample)
        2. Split text into chunks (if chunking enabled)
        3. For each chunk (or full text):
           a. Dispatch TTS audio generation (GPU)
           b. Dispatch face crop + bg removal (GPU, parallel with TTS)
           c. Dispatch lip-sync video generation (GPU, sequential after a+b)
           d. Optionally dispatch HD upscale (GPU)
           e. Publish chunk result to Crossbar pupit.{publish_id}
        4. Cleanup job tracking
        """
        job_id = req.uid
        try:
            self._update_job(job_id, 'processing')
            _publish_status(req.user_id,
                            'Starting video generation...', req.request_id)

            # 1. Download assets
            image_path = None
            audio_sample_path = None

            if req.image_url:
                _publish_status(req.user_id,
                                'Downloading image...', req.request_id)
                image_path = download_asset(req.image_url, 'images')
                if not image_path:
                    self._fail_job(job_id, req, 'Failed to download image')
                    return

            if req.audio_sample_url:
                audio_sample_path = download_asset(
                    req.audio_sample_url, 'audio')

            # 2. Chunk text for streaming
            if req.chunking and req.text:
                chunks = chunk_text(req.text)
            else:
                chunks = [req.text] if req.text else []

            if not chunks:
                self._fail_job(job_id, req, 'No text to process')
                return

            _publish_status(
                req.user_id,
                f'Processing {len(chunks)} segment{"s" if len(chunks) > 1 else ""}...',
                req.request_id,
            )

            # 3. Process each chunk
            for idx, chunk_text_str in enumerate(chunks, 1):
                chunk_uid = f"{job_id}_{idx}"

                success = self._process_chunk(
                    req=req,
                    chunk_uid=chunk_uid,
                    chunk_text=chunk_text_str,
                    chunk_idx=idx,
                    total_chunks=len(chunks),
                    image_path=image_path,
                    audio_sample_path=audio_sample_path,
                    eta=eta,
                )

                if not success:
                    # Continue with remaining chunks — partial delivery
                    # is better than no delivery
                    logger.warning("Chunk %d/%d failed for %s",
                                   idx, len(chunks), job_id)

            self._update_job(job_id, 'completed')
            _publish_status(req.user_id,
                            'Video generation complete.', req.request_id)

        except Exception as e:
            logger.error("Video pipeline failed for %s: %s", job_id, e,
                         exc_info=True)
            self._fail_job(job_id, req, str(e))
        finally:
            # Cleanup job tracking
            with self._lock:
                self._active_jobs.pop(job_id, None)

    def _process_chunk(
        self,
        req: VideoGenRequest,
        chunk_uid: str,
        chunk_text: str,
        chunk_idx: int,
        total_chunks: int,
        image_path: Optional[str],
        audio_sample_path: Optional[str],
        eta: dict,
    ) -> bool:
        """Process a single text chunk through the full GPU pipeline.

        Parallel dispatch: TTS + face crop run simultaneously.
        Sequential: lip-sync waits for both to complete.

        Returns True on success, False on failure.
        """
        _publish_status(
            req.user_id,
            f'Processing segment {chunk_idx}/{total_chunks}...',
            req.request_id,
        )

        # ── Stage 1: Parallel — TTS + face crop ──

        tts_params = {
            'uid': chunk_uid,
            'text': chunk_text,
            'gender': req.gender,
            'voice_name': req.voice_name,
            'openvoice': req.openvoice,
            'chattts': req.chattts,
            'kokuro': req.kokuro,
        }
        if audio_sample_path:
            tts_params['audio_sample_path'] = audio_sample_path

        crop_params = {
            'uid': chunk_uid,
            'image_path': image_path,
            'crop': req.crop,
            'remove_bg': req.remove_bg,
            'vtoonify': req.vtoonify,
            'flag_hallo': req.flag_hallo,
            'premium': req.is_premium,
            'inpainting': req.inpainting,
            'prompt': req.inpainting_prompt,
            'gradient': req.gradient,
            'solid_color': req.solid_color,
            'background_path': req.background_path,
        }

        parallel_tasks = [
            ('audio_generation', tts_params, req.user_id,
             eta.get('soft_time_limit', 300)),
            ('crop_background', crop_params, req.user_id, 120),
        ]

        results = _dispatch_parallel(parallel_tasks)
        audio_result, crop_result = results[0], results[1]

        if not audio_result.success:
            # Try offline TTS fallback (CPU) via model_bus_service
            _publish_status(req.user_id,
                            'GPU TTS unavailable, trying offline voice...',
                            req.request_id)
            audio_result = self._tts_cpu_fallback(chunk_text, chunk_uid, req)

        if not audio_result.success:
            logger.error("Audio generation failed for chunk %s: %s",
                         chunk_uid, audio_result.error)
            _publish_chunk_result(req.publish_id, {
                'chunk_idx': chunk_idx,
                'total_chunks': total_chunks,
                'status': 'error',
                'error': f'Audio generation failed: {audio_result.error}',
            })
            return False

        # Image crop failure is non-fatal — use original image
        effective_image = (crop_result.data if crop_result.success
                           else {'image_path': image_path})

        # ── Stage 2: Sequential — lip-sync generation ──

        _publish_status(
            req.user_id,
            f'Generating lip-sync video ({chunk_idx}/{total_chunks})...',
            req.request_id,
        )

        lip_sync_params = {
            'uid': chunk_uid,
            'audio_result': audio_result.data,
            'image_result': effective_image,
            'text': chunk_text,
            'flag_hallo': req.flag_hallo,
            'hd_video': req.hd_video,
        }

        lip_result = _dispatch_gpu_subtask(
            'lip_sync', lip_sync_params, req.user_id,
            eta.get('soft_time_limit', 300),
        )

        if not lip_result.success:
            logger.error("Lip sync failed for chunk %s: %s",
                         chunk_uid, lip_result.error)
            _publish_chunk_result(req.publish_id, {
                'chunk_idx': chunk_idx,
                'total_chunks': total_chunks,
                'status': 'error',
                'error': f'Video generation failed: {lip_result.error}',
            })
            return False

        # ── Stage 3: Optional HD upscale ──

        video_data = lip_result.data
        if req.hd_video:
            _publish_status(req.user_id,
                            'Upscaling to HD...', req.request_id)
            hd_result = _dispatch_gpu_subtask(
                'hd_upscale',
                {'uid': chunk_uid, 'video_result': video_data},
                req.user_id, 600,  # HD can take a while
            )
            if hd_result.success:
                video_data = hd_result.data
            else:
                logger.info("HD upscale skipped for %s: %s",
                            chunk_uid, hd_result.error)
                # Non-fatal — deliver SD video

        # ── Stage 4: Publish chunk to client ──

        chunk_result = {
            'chunk_idx': chunk_idx,
            'total_chunks': total_chunks,
            'status': 'completed',
            'uid': chunk_uid,
            'video_url': video_data.get('video_url', ''),
            'audio_url': audio_result.data.get('audio_url',
                                               audio_result.data.get('gen_audio_url', '')),
            'peer': lip_result.peer,
        }
        _publish_chunk_result(req.publish_id, chunk_result)

        return True

    def _tts_cpu_fallback(self, text: str, uid: str,
                          req: VideoGenRequest) -> SubtaskResult:
        """CPU TTS fallback when no GPU is available for audio generation.

        Uses HARTOS's offline TTS (Pocket TTS / LuxTTS) instead of GPU neural TTS.
        """
        try:
            from integrations.agent_engine.model_bus_service import get_model_bus
            bus = get_model_bus()
            result = bus.infer(
                prompt=text,
                model_type='tts',
                options={
                    'user_id': req.user_id,
                    'voice': req.voice_name or 'alba',
                    'request_id': req.request_id,
                },
            )
            if result and result.get('response'):
                audio_path = result['response']
                return SubtaskResult(
                    success=True,
                    data={
                        'audio_path': audio_path,
                        'gen_audio_url': audio_path,
                    },
                    peer='local_cpu',
                )
        except Exception as e:
            logger.debug("CPU TTS fallback failed: %s", e)

        return SubtaskResult(success=False, error='All TTS backends failed')

    def _update_job(self, job_id: str, status: str):
        """Update job tracking status."""
        with self._lock:
            if job_id in self._active_jobs:
                self._active_jobs[job_id]['status'] = status

    def _fail_job(self, job_id: str, req: VideoGenRequest, error: str):
        """Mark job as failed and notify user."""
        self._update_job(job_id, 'failed')
        _publish_status(
            req.user_id,
            f"Video generation failed: {error}. "
            "No GPU is available locally or on the hive network. "
            "You can try again shortly, or connect a GPU device with: "
            "hart compute pair <device-address>",
            req.request_id,
        )
        _publish_chunk_result(req.publish_id, {
            'status': 'error',
            'error': error,
            'uid': job_id,
        })

    def get_job_status(self, job_id: str) -> Optional[dict]:
        """Get status of an active job."""
        with self._lock:
            return self._active_jobs.get(job_id)

    def get_stats(self) -> dict:
        """Get orchestrator statistics."""
        with self._lock:
            return {
                'active_jobs': len(self._active_jobs),
                'jobs': {k: v.get('status', 'unknown')
                         for k, v in self._active_jobs.items()},
            }


# ─── Singleton ────────────────────────────────────────────

_orchestrator: Optional[VideoOrchestrator] = None
_orch_lock = threading.Lock()


def get_video_orchestrator() -> VideoOrchestrator:
    """Get or create the singleton VideoOrchestrator."""
    global _orchestrator
    if _orchestrator is None:
        with _orch_lock:
            if _orchestrator is None:
                _orchestrator = VideoOrchestrator()
    return _orchestrator


def reset_video_orchestrator():
    """Reset singleton (testing only)."""
    global _orchestrator
    _orchestrator = None