# integrations/agent_engine/video_orchestrator.py

1""" 

2Video Generation Orchestrator — HARTOS-native video pipeline. 

3 

4Absorbs the orchestration logic from MakeItTalk's views_c.py (request parsing, 

5asset download/caching, text chunking, queue ETA, task dispatch) and dispatches 

6GPU-bound subtasks to hive mesh peers or local GPU servers. 

7 

8MakeItTalk on a no-GPU VM is a dead service — HARTOS handles the orchestration 

9and routes GPU work to whoever has a GPU (local, hive peer, or cloud). 

10 

11Pipeline: 

12 1. Parse request (avatar, voice, text, flags) 

13 2. Download/cache image + audio assets 

14 3. Chunk text for streaming playback 

15 4. Dispatch GPU subtasks: TTS + face crop (parallel) → lip-sync (sequential) 

16 5. Publish per-chunk results to Crossbar pupit.{user_id} for real-time playback 

17 6. Return 202 with queue position + ETA 

18 

19GPU subtasks dispatched via compute mesh: 

20 - audio_generation: Text → WAV (requires GPU for neural TTS) 

21 - crop_background_removal: Image → cropped face + bg removal (GPU optional) 

22 - lip_sync_generation: Audio + Image → video (requires GPU) 

23 - hd_upscale: Video → HD video (requires GPU, optional) 

24""" 

import hashlib
import json
import logging
import os
import re
import threading
import time
import uuid
from typing import Dict, List, Optional, Tuple

logger = logging.getLogger('hevolve.video_orchestrator')

# ─── Constants ──────────────────────────────────────────

# Text chunking parameters (from MakeItTalk merge_sentences)
MIN_CHUNK_LEN = 50
MAX_CHUNK_LEN = 60

# Hallo max duration (seconds) — longer clips fall back to MakeItTalk pipeline
HALLO_MAX_DURATION = 24

# Queue ETA buffer (seconds)
QUEUE_BUFFER_TIME = 300

# Asset download timeout (seconds)
ASSET_DOWNLOAD_TIMEOUT = 30

# Processing-time multiplier per second of audio for queue estimation
# (MakeItTalk estimates processing time as duration * 60)
PROCESSING_TIME_FACTOR = 60

# Asset cache directory
ASSET_CACHE_DIR = os.path.join(
    os.environ.get('HEVOLVE_DATA_DIR', os.path.expanduser('~/.hevolve')),
    'cache', 'video_assets',
)


# ─── Text Chunking ─────────────────────────────────────

def _sent_tokenize(text: str) -> List[str]:
    """Split text into sentences. Uses nltk if available, else regex fallback."""
    try:
        from nltk.tokenize import sent_tokenize
        return sent_tokenize(text)
    except (ImportError, LookupError):
        # LookupError: nltk is installed but punkt data was never downloaded.
        # Regex fallback — split on sentence-ending punctuation
        sentences = re.split(r'(?<=[.!?])\s+', text.strip())
        return [s for s in sentences if s.strip()]


def merge_sentences(sentences: List[str],
                    min_len: int = MIN_CHUNK_LEN,
                    max_len: int = MAX_CHUNK_LEN) -> List[str]:
    """Merge short sentences into chunks of min_len..max_len characters.

    Ported from MakeItTalk's merge_sentences() utility.
    Ensures each chunk is long enough for natural TTS output.
    """
    if not sentences:
        return []

    chunks = []
    current = ''

    for sent in sentences:
        candidate = (current + ' ' + sent).strip() if current else sent.strip()

        if len(candidate) > max_len and current:
            # Current chunk is full — flush it
            chunks.append(current.strip())
            current = sent.strip()
        else:
            current = candidate

    if current.strip():
        # Merge short trailing chunk with previous if possible
        if chunks and len(current.strip()) < min_len:
            chunks[-1] = (chunks[-1] + ' ' + current).strip()
        else:
            chunks.append(current.strip())

    return chunks


def chunk_text(text: str) -> List[str]:
    """Split text into TTS-friendly chunks for streaming playback."""
    sentences = _sent_tokenize(text)
    if not sentences:
        return [text] if text.strip() else []
    return merge_sentences(sentences)
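
# Illustrative trace (assumed input, worked by hand, not a recorded test run):
#   chunk_text('The quick brown fox jumps over the lazy dog. '
#              'Pack my box with five dozen liquor jugs. '
#              'How vexingly quick daft zebras jump!')
#   -> ['The quick brown fox jumps over the lazy dog.',
#       'Pack my box with five dozen liquor jugs. How vexingly quick daft zebras jump!']
# The 36-char trailing sentence is under MIN_CHUNK_LEN, so it folds into the
# previous chunk, which may therefore exceed MAX_CHUNK_LEN.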



# ─── Asset Management ──────────────────────────────────

def _ensure_cache_dir():
    """Create asset cache directories if needed."""
    os.makedirs(ASSET_CACHE_DIR, exist_ok=True)
    os.makedirs(os.path.join(ASSET_CACHE_DIR, 'images'), exist_ok=True)
    os.makedirs(os.path.join(ASSET_CACHE_DIR, 'audio'), exist_ok=True)


def _cache_key(url: str) -> str:
    """Deterministic cache key from URL."""
    return hashlib.sha256(url.encode()).hexdigest()[:16]


def download_asset(url: str, asset_type: str = 'images') -> Optional[str]:
    """Download and cache an asset (image or audio) from URL.

    Returns local file path, or None on failure.
    Uses URL-hash caching — the same URL returns the cached file instantly.
    """
    if not url:
        return None

    _ensure_cache_dir()

    # Determine extension from URL
    ext = ''
    url_path = url.split('?')[0]
    if '.' in url_path.split('/')[-1]:
        ext = '.' + url_path.split('/')[-1].rsplit('.', 1)[-1]
    if not ext:
        ext = '.png' if asset_type == 'images' else '.wav'

    cache_file = os.path.join(ASSET_CACHE_DIR, asset_type,
                              _cache_key(url) + ext)

    # Return cached file if it exists
    if os.path.exists(cache_file) and os.path.getsize(cache_file) > 0:
        return cache_file

    try:
        from core.http_pool import pooled_get
        resp = pooled_get(url, timeout=ASSET_DOWNLOAD_TIMEOUT)
        if resp.status_code == 200:
            with open(cache_file, 'wb') as f:
                f.write(resp.content)
            return cache_file
    except Exception as e:
        logger.warning("Asset download failed (%s): %s", url, e)

    return None
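
# Example usage (hypothetical URL; path shown for the default HEVOLVE_DATA_DIR):
#   path = download_asset('https://cdn.example.com/avatars/alice.png', 'images')
#   # -> ~/.hevolve/cache/video_assets/images/<sha256(url)[:16]>.png, or None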



# ─── Queue Estimation ──────────────────────────────────

def estimate_audio_duration(text: str) -> float:
    """Estimate audio duration from text length (seconds).

    Average speaking rate: ~150 words/minute = 2.5 words/second.
    """
    words = len(text.split())
    return max(2.0, words / 2.5)
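
# e.g. a 75-word reply -> 75 / 2.5 = 30.0 seconds of estimated audio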



def calculate_queue_eta(queue_depth: int, audio_duration: float) -> dict:
    """Calculate queue position and ETA for a video generation request.

    Ported from MakeItTalk's queue estimation logic.

    Returns:
        {
            'total_jobs_in_queue': int,
            'position': int,
            'estimated_seconds': int,
            'soft_time_limit': int,
            'hard_time_limit': int,
        }
    """
    position = queue_depth + 1
    time_per_job = audio_duration * PROCESSING_TIME_FACTOR
    estimated = int(position * time_per_job + QUEUE_BUFFER_TIME)
    soft_limit = estimated
    hard_limit = soft_limit + QUEUE_BUFFER_TIME

    return {
        'total_jobs_in_queue': queue_depth,
        'position': position,
        'estimated_seconds': estimated,
        'soft_time_limit': soft_limit,
        'hard_time_limit': hard_limit,
    }
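
# Worked example: queue_depth=2, audio_duration=10.0
#   position  = 3
#   estimated = int(3 * 10.0 * 60 + 300) = 2100 seconds
#   soft_time_limit = 2100, hard_time_limit = 2400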



# ─── Request Parsing ───────────────────────────────────

class VideoGenRequest:
    """Parsed video generation request.

    Normalizes the various parameter formats from MakeItTalk / chatbot_pipeline
    into a clean internal representation.
    """

    def __init__(self, data: dict):
        self.uid = data.get('uid') or f"{uuid.uuid4().hex[:8]}_{data.get('user_id', 'anon')}"
        self.user_id = str(data.get('user_id', ''))
        self.publish_id = str(data.get('publish_id', self.user_id))
        self.text = data.get('text', '')

        # Voice configuration
        self.voice_name = data.get('voiceName', data.get('voice_name', ''))
        self.gender = data.get('gender', 'male')
        self.openvoice = _to_bool(data.get('openvoice', False))
        self.chattts = _to_bool(data.get('chattts', False))
        self.kokuro = _to_bool(data.get('kokuro', False))

        # Image/avatar configuration
        self.avatar_id = data.get('avatar_id', '')
        self.image_url = data.get('image_url', '')
        self.audio_sample_url = data.get('audio_sample_url', '')

        # Processing flags
        self.flag_hallo = _to_bool(data.get('flag_hallo', False))
        self.hd_video = _to_bool(data.get('hd_vid', data.get('hd_video', False)))
        self.vtoonify = _to_bool(data.get('vtoonify', False))
        self.remove_bg = _to_bool(data.get('remove_bg', True))
        self.crop = _to_bool(data.get('crop', True))
        self.is_premium = _to_bool(data.get('is_premium', data.get('premium', False)))
        self.chunking = _to_bool(data.get('chunking', False))

        # Background options
        self.inpainting = _to_bool(data.get('inpainting', False))
        self.inpainting_prompt = data.get('prompt', '')
        self.gradient = _to_bool(data.get('gradient', False))
        self.solid_color = _to_bool(data.get('solid_color', False))
        self.background_path = data.get('background_path', '')

        # Request metadata
        self.request_id = data.get('request_id', '')

    def validate(self) -> Optional[str]:
        """Validate request. Returns error string or None if valid."""
        if not self.text and not self.audio_sample_url:
            return 'Either text or audio_sample_url is required'
        if not self.image_url and not self.avatar_id:
            return 'Either image_url or avatar_id is required'
        return None
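
# Example (hypothetical payload; only the listed keys matter here):
#   req = VideoGenRequest({'user_id': 42, 'text': 'Hello!',
#                          'image_url': 'https://cdn.example.com/a.png',
#                          'hd_vid': 'true', 'chunking': '1'})
#   req.validate() -> None; req.hd_video and req.chunking are both True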



def _to_bool(val) -> bool:
    """Convert various truthy representations to bool."""
    if isinstance(val, bool):
        return val
    if isinstance(val, str):
        return val.lower() in ('true', '1', 'yes')
    return bool(val)
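
# _to_bool('True') -> True; _to_bool('0') -> False; _to_bool(1) -> True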



# ─── GPU Subtask Dispatch ──────────────────────────────

class SubtaskResult:
    """Result from a dispatched GPU subtask."""

    def __init__(self, success: bool = False, data: Optional[dict] = None,
                 error: str = '', peer: str = 'local'):
        self.success = success
        self.data = data or {}
        self.error = error
        self.peer = peer


def _dispatch_gpu_subtask(
    task_type: str,
    task_params: dict,
    user_id: str = '',
    timeout: int = 300,
) -> SubtaskResult:
    """Dispatch a GPU subtask to local GPU or hive mesh peer.

    Follows the same routing chain as model_bus_service._route_video_gen():
        1. Local GPU server (health-cached, skip dead in 0ms)
        2. Hive mesh peer with GPU
        3. Error — no GPU available

    Args:
        task_type: 'audio_generation', 'crop_background', 'lip_sync', 'hd_upscale'
        task_params: Task-specific parameters dict
        user_id: For routing status publishing
        timeout: Max wait time (seconds)

    Returns:
        SubtaskResult with success/data/error
    """
    from core.port_registry import get_port

    # Map task types to local GPU endpoints
    local_endpoints = {
        'audio_generation': {
            'url': f"http://localhost:{get_port('tts_gpu', 5003)}",
            'path': '/synthesize',
        },
        'crop_background': {
            'url': f"http://localhost:{get_port('image_proc', 5004)}",
            'path': '/process',
        },
        'lip_sync': {
            'url': f"http://localhost:{get_port('lip_sync', 5005)}",
            'path': '/generate',
        },
        'hd_upscale': {
            'url': f"http://localhost:{get_port('video2x', 5006)}",
            'path': '/hd-video/',
        },
    }

    endpoint = local_endpoints.get(task_type, {})

    # 1. Try local GPU — health-cached
    if endpoint:
        try:
            from integrations.agent_engine.model_bus_service import get_model_bus
            bus = get_model_bus()
            local_url = endpoint['url']
            backend_name = f'video_{task_type}'

            if bus._is_backend_alive(backend_name, local_url):
                from core.http_pool import pooled_post
                resp = pooled_post(
                    f"{local_url}{endpoint['path']}",
                    json=task_params,
                    timeout=timeout,
                )
                if resp.status_code == 200:
                    bus._mark_backend_alive(backend_name)
                    return SubtaskResult(
                        success=True, data=resp.json(), peer='local_gpu')
                bus._mark_backend_dead(backend_name)
        except Exception as e:
            logger.debug("Local GPU %s unavailable: %s", task_type, e)

    # 2. Hive mesh peer
    try:
        from integrations.agent_engine.compute_config import get_compute_policy
        policy = get_compute_policy()
        if policy.get('compute_policy') != 'local_only':
            from integrations.agent_engine.compute_mesh_service import get_compute_mesh
            mesh = get_compute_mesh()
            result = mesh.offload_to_best_peer(
                model_type=task_type,
                prompt=json.dumps(task_params),
                options={'timeout': timeout, 'user_id': user_id},
            )
            if result and 'error' not in result:
                return SubtaskResult(
                    success=True,
                    data=result,
                    peer=result.get('offloaded_to', 'hive_peer'),
                )
            # Guard against a None result before reading its error field
            logger.info("Hive offload %s: %s", task_type,
                        result.get('error', 'no result') if result else 'no result')
    except Exception as e:
        logger.debug("Hive mesh %s unavailable: %s", task_type, e)

    return SubtaskResult(success=False, error=f'No GPU backend for {task_type}')
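
# Example call (params are illustrative; real payloads depend on the GPU services):
#   res = _dispatch_gpu_subtask('audio_generation',
#                               {'uid': 'job1_1', 'text': 'Hello'},
#                               user_id='42', timeout=300)
#   res.success, res.peer -> e.g. (True, 'local_gpu') or (False, 'local')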



def _dispatch_parallel(tasks: List[Tuple[str, dict, str, int]]) -> List[SubtaskResult]:
    """Dispatch multiple GPU subtasks in parallel.

    Args:
        tasks: List of (task_type, params, user_id, timeout) tuples

    Returns:
        List of SubtaskResult in the same order as the input
    """
    results: List[Optional[SubtaskResult]] = [None] * len(tasks)

    def _run(idx, task_type, params, user_id, timeout):
        results[idx] = _dispatch_gpu_subtask(task_type, params, user_id, timeout)

    threads = []
    for i, (ttype, params, uid, tout) in enumerate(tasks):
        t = threading.Thread(target=_run, args=(i, ttype, params, uid, tout),
                             daemon=True)
        t.start()
        threads.append(t)

    for t in threads:
        t.join(timeout=600)  # Per-thread join cap: 10 minutes

    # Replace None with an error result for timed-out tasks
    return [r or SubtaskResult(success=False, error='Task timed out')
            for r in results]
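
# Example: run TTS and face-crop concurrently (parameters are illustrative):
#   tts_res, crop_res = _dispatch_parallel([
#       ('audio_generation', {'uid': uid, 'text': text}, user_id, 300),
#       ('crop_background', {'uid': uid, 'image_path': img}, user_id, 120),
#   ])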



# ─── Crossbar Publishing ───────────────────────────────

def _publish_chunk_result(publish_id: str, chunk_data: dict):
    """Publish a completed chunk to pupit.{publish_id} for streaming playback.

    The client (WebWorker/React Native) picks up each chunk and plays
    the video segment immediately — streaming, not waiting for full video.
    """
    if not publish_id:
        return
    try:
        from core.safe_hartos_attr import safe_hartos_attr
        publish_async = safe_hartos_attr('publish_async')
        if publish_async is None:
            logger.debug(
                "Video chunk publish skipped: HARTOS publish_async "
                "unresolvable — publish_id=%s", publish_id,
            )
            return
        topic = f'com.hertzai.pupit.{publish_id}'
        publish_async(topic, json.dumps(chunk_data))
        logger.debug(
            "Video chunk published: publish_id=%s topic=%s",
            publish_id, topic,
        )
    except Exception as e:
        logger.debug(
            "Video chunk publish failed: publish_id=%s err=%s",
            publish_id, e,
        )
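
# A successful chunk payload (as built in _process_chunk below) looks like:
#   {'chunk_idx': 1, 'total_chunks': 3, 'status': 'completed', 'uid': '<job>_1',
#    'video_url': '...', 'audio_url': '...', 'peer': 'local_gpu'}
# and is published to topic 'com.hertzai.pupit.<publish_id>'.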



def _publish_status(user_id: str, message: str, request_id: str = ''):
    """Publish routing status to user's chat (thinking bubble)."""
    try:
        from integrations.agent_engine.model_bus_service import _publish_routing_status
        _publish_routing_status(user_id, message, request_id)
    except Exception:
        pass


# ─── Main Orchestrator ─────────────────────────────────

class VideoOrchestrator:
    """Orchestrates the video generation pipeline.

    Replaces MakeItTalk's views_c.py orchestration with HARTOS-native
    dispatch to local GPU or hive mesh peers.

    Usage:
        orch = get_video_orchestrator()
        result = orch.generate(request_data)
    """

    def __init__(self):
        self._active_jobs: Dict[str, dict] = {}
        self._lock = threading.Lock()
        self._job_counter = 0

    @property
    def queue_depth(self) -> int:
        """Current number of active jobs."""
        with self._lock:
            return len(self._active_jobs)

    def generate(self, data: dict) -> dict:
        """Main entry point — orchestrate a video generation request.

        Args:
            data: Raw request dict from API endpoint

        Returns:
            On success (HTTP 202 pattern):
                {
                    'status': 'accepted',
                    'uid': '<request-id>',
                    'total_jobs_in_queue': int,
                    'position': int,
                    'estimated_seconds': int,
                }
            On error:
                {'error': '<message>'}
        """
        req = VideoGenRequest(data)

        # Validate
        err = req.validate()
        if err:
            return {'error': err}

        # Estimate duration + queue position
        audio_duration = estimate_audio_duration(req.text)
        eta = calculate_queue_eta(self.queue_depth, audio_duration)

        # Enforce Hallo duration constraint
        if req.flag_hallo and audio_duration > HALLO_MAX_DURATION:
            req.flag_hallo = False
            logger.info("Hallo disabled — estimated %ds > %ds max",
                        audio_duration, HALLO_MAX_DURATION)

        # Register job
        job_id = req.uid
        with self._lock:
            self._active_jobs[job_id] = {
                'uid': job_id,
                'user_id': req.user_id,
                'started': time.time(),
                'status': 'queued',
            }

        # Dispatch asynchronously — don't block the API response
        thread = threading.Thread(
            target=self._execute_pipeline,
            args=(req, eta),
            daemon=True,
            name=f'video-gen-{job_id}',
        )
        thread.start()

        return {
            'status': 'accepted',
            'uid': job_id,
            'total_jobs_in_queue': eta['total_jobs_in_queue'],
            'position': eta['position'],
            'estimated_seconds': eta['estimated_seconds'],
        }

    def _execute_pipeline(self, req: VideoGenRequest, eta: dict):
        """Execute the full video generation pipeline in background.

        Pipeline stages:
            1. Download/cache assets (image, audio sample)
            2. Split text into chunks (if chunking enabled)
            3. For each chunk (or full text):
                a. Dispatch TTS audio generation (GPU)
                b. Dispatch face crop + bg removal (GPU, parallel with TTS)
                c. Dispatch lip-sync video generation (GPU, sequential after a+b)
                d. Optionally dispatch HD upscale (GPU)
                e. Publish chunk result to Crossbar pupit.{publish_id}
            4. Cleanup job tracking
        """
        job_id = req.uid
        try:
            self._update_job(job_id, 'processing')
            _publish_status(req.user_id,
                            'Starting video generation...', req.request_id)

            # 1. Download assets
            image_path = None
            audio_sample_path = None

            if req.image_url:
                _publish_status(req.user_id,
                                'Downloading image...', req.request_id)
                image_path = download_asset(req.image_url, 'images')
                if not image_path:
                    self._fail_job(job_id, req, 'Failed to download image')
                    return

            if req.audio_sample_url:
                audio_sample_path = download_asset(
                    req.audio_sample_url, 'audio')

            # 2. Chunk text for streaming
            if req.chunking and req.text:
                chunks = chunk_text(req.text)
            else:
                chunks = [req.text] if req.text else []

            if not chunks:
                self._fail_job(job_id, req, 'No text to process')
                return

            _publish_status(
                req.user_id,
                f'Processing {len(chunks)} segment{"s" if len(chunks) > 1 else ""}...',
                req.request_id,
            )

            # 3. Process each chunk
            for idx, chunk_text_str in enumerate(chunks, 1):
                chunk_uid = f"{job_id}_{idx}"

                success = self._process_chunk(
                    req=req,
                    chunk_uid=chunk_uid,
                    text_chunk=chunk_text_str,
                    chunk_idx=idx,
                    total_chunks=len(chunks),
                    image_path=image_path,
                    audio_sample_path=audio_sample_path,
                    eta=eta,
                )

                if not success:
                    # Continue with remaining chunks — partial delivery
                    # is better than no delivery
                    logger.warning("Chunk %d/%d failed for %s",
                                   idx, len(chunks), job_id)

            self._update_job(job_id, 'completed')
            _publish_status(req.user_id,
                            'Video generation complete.', req.request_id)

        except Exception as e:
            logger.error("Video pipeline failed for %s: %s", job_id, e,
                         exc_info=True)
            self._fail_job(job_id, req, str(e))
        finally:
            # Cleanup job tracking
            with self._lock:
                self._active_jobs.pop(job_id, None)

    def _process_chunk(
        self,
        req: VideoGenRequest,
        chunk_uid: str,
        text_chunk: str,
        chunk_idx: int,
        total_chunks: int,
        image_path: Optional[str],
        audio_sample_path: Optional[str],
        eta: dict,
    ) -> bool:
        """Process a single text chunk through the full GPU pipeline.

        Parallel dispatch: TTS + face crop run simultaneously.
        Sequential: lip-sync waits for both to complete.

        Returns True on success, False on failure.
        """
        _publish_status(
            req.user_id,
            f'Processing segment {chunk_idx}/{total_chunks}...',
            req.request_id,
        )

        # ── Stage 1: Parallel — TTS + face crop ──

        tts_params = {
            'uid': chunk_uid,
            'text': text_chunk,
            'gender': req.gender,
            'voice_name': req.voice_name,
            'openvoice': req.openvoice,
            'chattts': req.chattts,
            'kokuro': req.kokuro,
        }
        if audio_sample_path:
            tts_params['audio_sample_path'] = audio_sample_path

        crop_params = {
            'uid': chunk_uid,
            'image_path': image_path,
            'crop': req.crop,
            'remove_bg': req.remove_bg,
            'vtoonify': req.vtoonify,
            'flag_hallo': req.flag_hallo,
            'premium': req.is_premium,
            'inpainting': req.inpainting,
            'prompt': req.inpainting_prompt,
            'gradient': req.gradient,
            'solid_color': req.solid_color,
            'background_path': req.background_path,
        }

        parallel_tasks = [
            ('audio_generation', tts_params, req.user_id,
             eta.get('soft_time_limit', 300)),
            ('crop_background', crop_params, req.user_id, 120),
        ]

        results = _dispatch_parallel(parallel_tasks)
        audio_result, crop_result = results[0], results[1]

        if not audio_result.success:
            # Try offline TTS fallback (CPU) via model_bus_service
            _publish_status(req.user_id,
                            'GPU TTS unavailable, trying offline voice...',
                            req.request_id)
            audio_result = self._tts_cpu_fallback(text_chunk, chunk_uid, req)

        if not audio_result.success:
            logger.error("Audio generation failed for chunk %s: %s",
                         chunk_uid, audio_result.error)
            _publish_chunk_result(req.publish_id, {
                'chunk_idx': chunk_idx,
                'total_chunks': total_chunks,
                'status': 'error',
                'error': f'Audio generation failed: {audio_result.error}',
            })
            return False

        # Image crop failure is non-fatal — use original image
        effective_image = (crop_result.data if crop_result.success
                           else {'image_path': image_path})

        # ── Stage 2: Sequential — lip-sync generation ──

        _publish_status(
            req.user_id,
            f'Generating lip-sync video ({chunk_idx}/{total_chunks})...',
            req.request_id,
        )

        lip_sync_params = {
            'uid': chunk_uid,
            'audio_result': audio_result.data,
            'image_result': effective_image,
            'text': text_chunk,
            'flag_hallo': req.flag_hallo,
            'hd_video': req.hd_video,
        }

        lip_result = _dispatch_gpu_subtask(
            'lip_sync', lip_sync_params, req.user_id,
            eta.get('soft_time_limit', 300),
        )

        if not lip_result.success:
            logger.error("Lip sync failed for chunk %s: %s",
                         chunk_uid, lip_result.error)
            _publish_chunk_result(req.publish_id, {
                'chunk_idx': chunk_idx,
                'total_chunks': total_chunks,
                'status': 'error',
                'error': f'Video generation failed: {lip_result.error}',
            })
            return False

        # ── Stage 3: Optional HD upscale ──

        video_data = lip_result.data
        if req.hd_video:
            _publish_status(req.user_id,
                            'Upscaling to HD...', req.request_id)
            hd_result = _dispatch_gpu_subtask(
                'hd_upscale',
                {'uid': chunk_uid, 'video_result': video_data},
                req.user_id, 600,  # HD can take a while
            )
            if hd_result.success:
                video_data = hd_result.data
            else:
                logger.info("HD upscale skipped for %s: %s",
                            chunk_uid, hd_result.error)
                # Non-fatal — deliver SD video

        # ── Stage 4: Publish chunk to client ──

        chunk_result = {
            'chunk_idx': chunk_idx,
            'total_chunks': total_chunks,
            'status': 'completed',
            'uid': chunk_uid,
            'video_url': video_data.get('video_url', ''),
            'audio_url': audio_result.data.get(
                'audio_url', audio_result.data.get('gen_audio_url', '')),
            'peer': lip_result.peer,
        }
        _publish_chunk_result(req.publish_id, chunk_result)

        return True

    def _tts_cpu_fallback(self, text: str, uid: str,
                          req: VideoGenRequest) -> SubtaskResult:
        """CPU TTS fallback when no GPU is available for audio generation.

        Uses HARTOS's offline TTS (Pocket TTS / LuxTTS) instead of GPU neural TTS.
        """
        try:
            from integrations.agent_engine.model_bus_service import get_model_bus
            bus = get_model_bus()
            result = bus.infer(
                prompt=text,
                model_type='tts',
                options={
                    'user_id': req.user_id,
                    'voice': req.voice_name or 'alba',
                    'request_id': req.request_id,
                },
            )
            if result and result.get('response'):
                audio_path = result['response']
                return SubtaskResult(
                    success=True,
                    data={
                        'audio_path': audio_path,
                        'gen_audio_url': audio_path,
                    },
                    peer='local_cpu',
                )
        except Exception as e:
            logger.debug("CPU TTS fallback failed: %s", e)

        return SubtaskResult(success=False, error='All TTS backends failed')

    def _update_job(self, job_id: str, status: str):
        """Update job tracking status."""
        with self._lock:
            if job_id in self._active_jobs:
                self._active_jobs[job_id]['status'] = status

    def _fail_job(self, job_id: str, req: VideoGenRequest, error: str):
        """Mark job as failed and notify user."""
        self._update_job(job_id, 'failed')
        _publish_status(
            req.user_id,
            f"Video generation failed: {error}. "
            "No GPU is available locally or on the hive network. "
            "You can try again shortly, or connect a GPU device with: "
            "hart compute pair <device-address>",
            req.request_id,
        )
        _publish_chunk_result(req.publish_id, {
            'status': 'error',
            'error': error,
            'uid': job_id,
        })

    def get_job_status(self, job_id: str) -> Optional[dict]:
        """Get status of an active job."""
        with self._lock:
            return self._active_jobs.get(job_id)

    def get_stats(self) -> dict:
        """Get orchestrator statistics."""
        with self._lock:
            return {
                'active_jobs': len(self._active_jobs),
                'jobs': {k: v.get('status', 'unknown')
                         for k, v in self._active_jobs.items()},
            }



# ─── Singleton ──────────────────────────────────────────

_orchestrator: Optional[VideoOrchestrator] = None
_orch_lock = threading.Lock()


def get_video_orchestrator() -> VideoOrchestrator:
    """Get or create the singleton VideoOrchestrator."""
    global _orchestrator
    if _orchestrator is None:
        with _orch_lock:
            if _orchestrator is None:
                _orchestrator = VideoOrchestrator()
    return _orchestrator


def reset_video_orchestrator():
    """Reset singleton (testing only)."""
    global _orchestrator
    _orchestrator = None
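

if __name__ == '__main__':
    # Illustrative smoke test: exercises only the pure-Python helpers above;
    # no GPU services or HARTOS runtime are required to run it.
    sample = ('The quick brown fox jumps over the lazy dog. '
              'Pack my box with five dozen liquor jugs. '
              'How vexingly quick daft zebras jump!')
    for i, chunk in enumerate(chunk_text(sample), 1):
        print(f'chunk {i}: {chunk!r}')
    duration = estimate_audio_duration(sample)
    print(f'estimated audio duration: {duration:.1f}s')
    print('queue ETA:', calculate_queue_eta(queue_depth=2, audio_duration=duration))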