Coverage for integrations / remote_desktop / orchestrator.py: 64.3%

297 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Remote Desktop Orchestrator — Unified AI-native coordinator for HARTOS Remote Desktop. 

3 

4This is THE single entry point for all remote desktop operations. It coordinates: 

5 - Engine selection (RustDesk vs Sunshine/Moonlight vs Native) 

6 - Service lifecycle (start, stop, health via ServiceManager) 

7 - Session management (create, authenticate, disconnect via SessionManager) 

8 - Cross-app clipboard bridge (works regardless of which engine) 

9 - File transfer (engine-specific routing) 

10 - AI-native features (context-aware engine switching, smart connect) 

11 

12Key insight: HARTOS doesn't replace RustDesk/Sunshine — it orchestrates them 

13as engines the way the coding agent orchestrates Aider/KiloCode/ClaudeCode. 

14The user never thinks about which engine is running. 

15""" 

16 

17import logging 

18import threading 

19import time 

20from typing import Any, Callable, Dict, List, Optional, Tuple 

21 

22logger = logging.getLogger('hevolve.remote_desktop') 

23 

24 

class RemoteDesktopOrchestrator:
    """AI-native remote desktop orchestrator.

    Composes ServiceManager, SessionManager, EngineSelector, ClipboardSync,
    and all engine bridges into a unified interface.

    Project modules are imported lazily inside the methods that use them, so
    constructing the orchestrator never triggers those imports.
    """

    def __init__(self):
        self._active_sessions: Dict[str, dict] = {}  # session_id → session_info
        self._clipboard_syncs: Dict[str, Any] = {}   # session_id → ClipboardSync
        # Guards both dicts above. It is a plain (non-reentrant) Lock, so it is
        # only held for dict access and never across calls into other methods.
        self._lock = threading.Lock()
        self._started = False

    # ── Lifecycle ─────────────────────────────────────────────

    def startup(self) -> dict:
        """Initialize the orchestrator at HARTOS boot.

        Starts every available engine through the service manager and
        registers the service manager with the watchdog.

        Returns:
            ``get_status()`` if already started; otherwise
            ``{'status': 'started', 'engines': ...}`` on success or
            ``{'status': 'error', 'error': ...}`` on failure.
        """
        if self._started:
            return self.get_status()

        try:
            from integrations.remote_desktop.service_manager import get_service_manager
            sm = get_service_manager()
            engine_status = sm.start_all_available()
            sm.register_with_watchdog()
        except Exception as e:
            logger.error("Orchestrator startup failed: %s", e)
            return {'status': 'error', 'error': str(e)}

        self._started = True
        logger.info("Remote Desktop Orchestrator started")
        return {
            'status': 'started',
            'engines': engine_status,
        }

    def shutdown(self) -> None:
        """Clean shutdown — disconnect all sessions, stop clipboard, stop engines.

        Best-effort: individual failures are logged at debug level and do not
        stop the remaining teardown steps.
        """
        # Snapshot under the lock; disconnect() mutates the dict.
        with self._lock:
            session_ids = list(self._active_sessions)
        for sid in session_ids:
            try:
                self.disconnect(sid)
            except Exception as e:
                logger.debug("Shutdown: disconnect of session %s failed: %s", sid, e)

        # Stop all engines
        try:
            from integrations.remote_desktop.service_manager import get_service_manager
            get_service_manager().stop_all()
        except Exception as e:
            logger.debug("Shutdown: stopping engines failed: %s", e)

        self._started = False
        logger.info("Remote Desktop Orchestrator shut down")

    # ── Host Operations ──────────────────────────────────────

    def start_hosting(self, engine: str = 'auto', allow_control: bool = True,
                      use_case: str = 'general', user_id: Optional[str] = None) -> dict:
        """Start hosting this device for remote desktop access.

        Args:
            engine: Engine to use ('auto', 'rustdesk', 'sunshine', 'native')
            allow_control: Allow remote input (False = view-only)
            use_case: Context hint for engine selection
            user_id: Host user ID for session tracking

        Returns:
            On success: {status='hosting', device_id, formatted_id, password,
            engine, session_id, mode}. On failure: {status='error', error, ...}.
        """
        # 1. Get device ID
        try:
            from integrations.remote_desktop.device_id import get_device_id, format_device_id
            device_id = get_device_id()
            formatted_id = format_device_id(device_id)
        except Exception as e:
            return {'status': 'error', 'error': f'Device ID unavailable: {e}'}

        # 2. Select engine
        selected_engine = self._resolve_engine(engine, use_case, role='host')

        # 3. Ensure engine is running (falls back to native if it is not)
        selected_engine, failure = self._ensure_engine_with_fallback(
            selected_engine,
            "Engine '%s' failed (%s), falling back to native transport",
        )
        if failure is not None:
            return failure

        # 4. Generate password (OTP)
        from integrations.remote_desktop.session_manager import (
            get_session_manager, SessionMode,
        )
        sm = get_session_manager()
        password = sm.generate_otp(device_id)

        # 5. Engine-specific setup (native needs none)
        engine_info = {}
        if selected_engine == 'rustdesk':
            engine_info = self._setup_rustdesk_host(password)
        elif selected_engine == 'sunshine':
            engine_info = self._setup_sunshine_host()

        # 6. Create session; the viewer is unknown until someone connects
        mode = SessionMode.FULL_CONTROL if allow_control else SessionMode.VIEW_ONLY
        session = sm.create_session(
            host_device_id=device_id,
            viewer_device_id='pending',
            mode=mode,
            host_user_id=user_id,
        )

        # 7. Track session. 'device_id' is what marks an entry as a host
        #    session (see stop_hosting).
        session_info = {
            'session_id': session.session_id,
            'device_id': device_id,
            'formatted_id': formatted_id,
            'password': password,
            'engine': selected_engine,
            'mode': mode.value,
            'user_id': user_id,
            'started_at': time.time(),
            **engine_info,
        }
        with self._lock:
            self._active_sessions[session.session_id] = session_info

        # 8. Audit
        self._audit('host_started', session.session_id, user_id or device_id,
                    f'Engine: {selected_engine}, Mode: {mode.value}')

        logger.info("Hosting started: %s via %s", formatted_id, selected_engine)
        return {
            'status': 'hosting',
            'device_id': device_id,
            'formatted_id': formatted_id,
            'password': password,
            'engine': selected_engine,
            'session_id': session.session_id,
            'mode': mode.value,
        }

    def stop_hosting(self, session_id: Optional[str] = None) -> bool:
        """Stop hosting a session or all sessions.

        Args:
            session_id: Session to stop. Falsy = stop every host session.

        Returns:
            Result of ``disconnect`` for a single session; True when stopping all.
        """
        if session_id:
            return self.disconnect(session_id)

        # Stop all hosting sessions
        with self._lock:
            host_sessions = [
                sid for sid, info in self._active_sessions.items()
                if info.get('device_id')  # Has device_id → is a host session
            ]
        for sid in host_sessions:
            self.disconnect(sid)
        return True

    # ── Viewer Operations ────────────────────────────────────

    def connect(self, device_id: str, password: str,
                mode: str = 'full_control', engine: str = 'auto',
                use_case: str = 'general', gui: bool = True,
                user_id: Optional[str] = None) -> dict:
        """Connect to a remote device.

        Args:
            device_id: Remote device's ID
            password: Access password / OTP
            mode: 'full_control', 'view_only', or 'file_transfer'
            engine: Engine preference
            use_case: Context hint for engine selection
            gui: Open GUI window (False for headless/agent use)
            user_id: Viewer user ID for session tracking

        Returns:
            On success: {status='connected', session_id, engine, mode}.
            On failure: {status='error'|'auth_failed', error, ...}.
        """
        # 1. Select engine
        if mode == 'file_transfer':
            use_case = 'file_transfer'
        selected_engine = self._resolve_engine(engine, use_case, role='viewer')

        # 2. Ensure engine is ready (falls back to native if it is not)
        selected_engine, failure = self._ensure_engine_with_fallback(
            selected_engine,
            "Viewer engine '%s' failed (%s), falling back to native",
        )
        if failure is not None:
            return failure

        # 3. Authenticate
        auth_ok, auth_msg = self._authenticate(device_id, password, user_id)
        if not auth_ok:
            return {'status': 'auth_failed', 'error': auth_msg}

        # 4. Engine-specific connection
        connect_result = self._connect_engine(selected_engine, device_id,
                                              password, mode, gui)
        if connect_result.get('status') == 'error':
            return connect_result

        # 5. Create session
        from integrations.remote_desktop.session_manager import (
            get_session_manager, SessionMode,
        )
        sm = get_session_manager()
        mode_enum = {
            'full_control': SessionMode.FULL_CONTROL,
            'view_only': SessionMode.VIEW_ONLY,
            'file_transfer': SessionMode.FILE_TRANSFER,
        }.get(mode, SessionMode.FULL_CONTROL)

        from integrations.remote_desktop.device_id import get_device_id
        local_device_id = get_device_id()

        session = sm.create_session(
            host_device_id=device_id,
            viewer_device_id=local_device_id,
            mode=mode_enum,
            viewer_user_id=user_id,
        )

        # 6. Start clipboard bridge
        self._start_clipboard_bridge(session.session_id, selected_engine)

        # 7. Track session. 'remote_device_id' marks an entry as a viewer session.
        session_info = {
            'session_id': session.session_id,
            'remote_device_id': device_id,
            'engine': selected_engine,
            'mode': mode,
            'user_id': user_id,
            'gui': gui,
            'connected_at': time.time(),
            **connect_result,
        }
        with self._lock:
            self._active_sessions[session.session_id] = session_info

        # 8. Audit
        self._audit('viewer_connected', session.session_id,
                    user_id or local_device_id,
                    f'Remote: {device_id}, Engine: {selected_engine}')

        logger.info("Connected to %s via %s", device_id, selected_engine)
        return {
            'status': 'connected',
            'session_id': session.session_id,
            'engine': selected_engine,
            'mode': mode,
        }

    def disconnect(self, session_id: Optional[str] = None) -> bool:
        """Disconnect a session or all sessions.

        Args:
            session_id: Specific session to disconnect. None = disconnect all.

        Returns:
            True when disconnecting all; otherwise whether the session existed.
        """
        if session_id is not None:
            return self._disconnect_one(session_id)

        # Snapshot under the lock; _disconnect_one mutates the dict.
        with self._lock:
            sessions = list(self._active_sessions)
        for sid in sessions:
            self._disconnect_one(sid)
        return True

    def _disconnect_one(self, session_id: str) -> bool:
        """Disconnect a single session; return False if it is not tracked."""
        with self._lock:
            info = self._active_sessions.pop(session_id, None)
            clipboard = self._clipboard_syncs.pop(session_id, None) if info else None

        if not info:
            return False

        # Stop clipboard sync. Done outside the lock because the clipboard
        # callback itself takes the lock. A failure here must not prevent the
        # session-manager teardown below.
        if clipboard:
            try:
                clipboard.stop_monitoring()
            except Exception as e:
                logger.warning("Clipboard bridge stop failed for session %s: %s",
                               session_id[:8], e)

        # Disconnect session manager
        try:
            from integrations.remote_desktop.session_manager import get_session_manager
            get_session_manager().disconnect_session(session_id)
        except Exception as e:
            logger.debug("Session manager disconnect failed for %s: %s",
                         session_id[:8], e)

        # Audit. 'user_id' is always present in session_info but may be None,
        # so a .get() default would never apply — use `or` instead.
        self._audit('session_disconnected', session_id,
                    info.get('user_id') or 'unknown',
                    f'Engine: {info.get("engine", "unknown")}')

        logger.info("Session %s disconnected", session_id[:8])
        return True

    # ── AI-Native Operations ─────────────────────────────────

    @staticmethod
    def _plan_from_context(context: Optional[dict]) -> Tuple[str, str, Any]:
        """Map a smart_connect context dict to (use_case, mode, gui)."""
        context = context or {}

        # Missing or None values fall back to neutral defaults so the
        # substring checks below never see a non-string.
        intent = str(context.get('intent') or 'general')
        app = str(context.get('app') or '')

        use_case = 'general'
        mode = 'full_control'

        if intent == 'file_transfer' or 'transfer' in intent:
            use_case = 'file_transfer'
            mode = 'file_transfer'
        elif intent in ('gaming', 'game') or 'game' in app.lower():
            use_case = 'gaming'
        elif intent == 'support':
            use_case = 'remote_support'
        elif intent in ('observe', 'view', 'watch'):
            mode = 'view_only'
        elif intent == 'vlm':
            use_case = 'vlm_computer_use'
        # 'app_streaming', 'tab_detach' and 'window' keep the defaults.

        return use_case, mode, context.get('gui', True)

    def smart_connect(self, device_id: str, password: str,
                      context: Optional[dict] = None,
                      user_id: Optional[str] = None) -> dict:
        """AI-driven connection — auto-select engine and mode from context.

        Context examples:
            {'intent': 'file_transfer'} → file_transfer use case and mode
            {'intent': 'gaming'}        → gaming use case
            {'intent': 'support'}       → remote_support use case
            {'intent': 'observe'}       → view-only mode
            {'app': 'game'}             → gaming use case
            {'intent': 'vlm'}           → vlm_computer_use use case

        The engine is always resolved with engine='auto'. ``context['gui']``
        (default True) is forwarded to ``connect``.

        Returns:
            Whatever ``connect`` returns.
        """
        use_case, mode, gui = self._plan_from_context(context)
        return self.connect(
            device_id=device_id,
            password=password,
            mode=mode,
            engine='auto',
            use_case=use_case,
            gui=gui,
            user_id=user_id,
        )

    def switch_engine(self, session_id: str, new_engine: str) -> dict:
        """Switch engine mid-session (e.g., RustDesk → Moonlight for gaming).

        The new engine is confirmed ready before the old connection is torn
        down, and the session is only marked as switched once the new
        connection succeeded. If the new connection fails, a best-effort
        reconnect through the old engine is attempted.

        Returns:
            {status='switched', old_engine, new_engine, session_id} on success,
            {status='no_change', engine} if already on ``new_engine``,
            {status='error', error, ...} otherwise.
        """
        with self._lock:
            live = self._active_sessions.get(session_id)
            # Work on a copy so later reads do not race with other threads.
            info = dict(live) if live else None
        if not info:
            return {'status': 'error', 'error': 'Session not found'}

        old_engine = info.get('engine', 'unknown')
        if old_engine == new_engine:
            return {'status': 'no_change', 'engine': old_engine}

        remote_device_id = info.get('remote_device_id')
        if not remote_device_id:
            return {'status': 'error', 'error': 'Not a viewer session'}

        # Ensure the new engine first: if it is unavailable the existing
        # connection is left untouched.
        from integrations.remote_desktop.service_manager import get_service_manager
        ready, msg = get_service_manager().ensure_engine(new_engine)
        if not ready:
            return {'status': 'error', 'error': f'Cannot switch to {new_engine}: {msg}'}

        # Disconnect old engine connection (keep session alive)
        self._disconnect_engine(old_engine, remote_device_id)

        # NOTE(review): connect() does not store the password in viewer
        # session_info, so this is always '' here — confirm whether engines
        # can re-attach without credentials.
        password = info.get('password', '')
        mode = info.get('mode', 'full_control')
        gui = info.get('gui', True)

        connect_result = self._connect_engine(new_engine, remote_device_id,
                                              password, mode, gui)
        if connect_result.get('status') == 'error':
            # Best-effort: put the old engine connection back.
            restored = self._connect_engine(old_engine, remote_device_id,
                                            password, mode, gui)
            logger.warning("Switch of session %s to %s failed: %s",
                           session_id[:8], new_engine, connect_result.get('error'))
            return {
                'status': 'error',
                'error': f"Cannot switch to {new_engine}: {connect_result.get('error')}",
                'engine': old_engine,
                'old_engine_restored': restored.get('status') == 'connected',
            }

        # Update session info
        with self._lock:
            if session_id in self._active_sessions:
                self._active_sessions[session_id]['engine'] = new_engine
                self._active_sessions[session_id]['switched_at'] = time.time()

        self._audit('engine_switched', session_id, info.get('user_id') or 'unknown',
                    f'{old_engine} → {new_engine}')

        logger.info("Session %s switched %s → %s", session_id[:8], old_engine, new_engine)
        return {
            'status': 'switched',
            'old_engine': old_engine,
            'new_engine': new_engine,
            'session_id': session_id,
        }

    def recommend_engine_switch(self, session_id: str) -> Optional[dict]:
        """AI-native: Check if a better engine is available for the current session.

        Returns:
            {recommend, reason, current} or None if no switch is recommended,
            the session is unknown, or the engine selector is unavailable.
        """
        with self._lock:
            info = self._active_sessions.get(session_id)
            if not info:
                return None
            current = info.get('engine', 'native')
            mode = info.get('mode', 'full_control')

        try:
            from integrations.remote_desktop.engine_selector import (
                get_available_engines, Engine,
            )
            available = get_available_engines()

            # File transfer on non-RustDesk → suggest RustDesk
            if mode == 'file_transfer' and current != 'rustdesk':
                if Engine.RUSTDESK in available:
                    return {
                        'recommend': 'rustdesk',
                        'reason': 'RustDesk has native file transfer support',
                        'current': current,
                    }

            # Interactive use on RustDesk → suggest Moonlight
            if current == 'rustdesk' and mode != 'file_transfer':
                if Engine.MOONLIGHT in available:
                    return {
                        'recommend': 'moonlight',
                        'reason': 'Moonlight offers lower latency for interactive use',
                        'current': current,
                    }
        except Exception as e:
            logger.debug("Engine recommendation unavailable: %s", e)

        return None

    # ── Cross-App Clipboard Bridge ───────────────────────────

    def _start_clipboard_bridge(self, session_id: str, engine: str) -> bool:
        """Start clipboard bridge for a session.

        Works across engine boundaries — HARTOS-level clipboard monitoring.

        Args:
            session_id: Session the bridge belongs to.
            engine: Engine at connect time; used until the session is tracked.

        Returns:
            The result of ``start_monitoring()``, or False on any failure.
        """
        try:
            from integrations.remote_desktop.clipboard_sync import ClipboardSync

            def on_clipboard_change(content):
                # Look up the engine at call time so the bridge follows
                # switch_engine(); fall back to the connect-time engine while
                # the session is not (or no longer) tracked.
                with self._lock:
                    live = self._active_sessions.get(session_id)
                    current_engine = live.get('engine', engine) if live else engine
                self._handle_clipboard_outbound(session_id, current_engine, content)

            sync = ClipboardSync(on_change=on_clipboard_change, dlp_enabled=True)
            started = sync.start_monitoring()
            if started:
                with self._lock:
                    self._clipboard_syncs[session_id] = sync
                logger.debug("Clipboard bridge started for session %s", session_id[:8])
            return started
        except Exception as e:
            logger.debug("Clipboard bridge failed: %s", e)
            return False

    def _handle_clipboard_outbound(self, session_id: str, engine: str,
                                   content: str) -> Optional[dict]:
        """Push local clipboard change to remote via active engine.

        Returns:
            {'synced': False, 'reason': 'DLP blocked'} when the DLP engine
            rejects the content; None otherwise.
        """
        # DLP scan clipboard content before sync. If the DLP engine cannot be
        # loaded or raises, the content is allowed through (existing policy).
        try:
            from security.dlp_engine import get_dlp_engine
            allowed, reason = get_dlp_engine().check_outbound(content)
        except Exception as e:
            logger.debug("Clipboard DLP scan skipped: %s", e)
        else:
            if not allowed:
                logger.warning("Clipboard content blocked by DLP: %s",
                               reason or 'PII detected')
                return {'synced': False, 'reason': 'DLP blocked'}

        # Each engine handles clipboard differently:
        #   - RustDesk: clipboard flows through its own protocol
        #   - Sunshine/Moonlight: no cross-clipboard API — use native fallback
        #   - Native: send via transport channel
        if engine == 'native':
            # Only logged here; no transport send happens in this method.
            logger.debug("Clipboard → remote via native transport (%d chars)",
                         len(content))
        # For RustDesk/Sunshine, clipboard sync is handled by the engine itself.
        return None

    # ── Window Streaming (Tab Detach) ─────────────────────────

    def list_remote_windows(self, session_id: Optional[str] = None) -> List[dict]:
        """List available windows on the local host for per-window streaming.

        ``session_id`` is accepted for interface compatibility but is not
        used: the list always comes from the local window session manager.

        Returns:
            The manager's window list, or ``[{'error': ...}]`` on failure.
        """
        try:
            from integrations.remote_desktop.window_session import (
                get_window_session_manager,
            )
            return get_window_session_manager().list_available_windows()
        except Exception as e:
            return [{'error': str(e)}]

    def stream_window(self, window_hwnd: int,
                      window_title: str = '',
                      transport=None) -> dict:
        """Start streaming a specific window (creates a sub-session).

        Returns:
            The window session manager's result, or {status='error', error}.
        """
        try:
            from integrations.remote_desktop.window_session import (
                get_window_session_manager,
            )
            return get_window_session_manager().start_window_session(
                window_hwnd=window_hwnd,
                window_title=window_title,
                transport=transport,
            )
        except Exception as e:
            return {'status': 'error', 'error': str(e)}

    def stop_window_stream(self, window_session_id: str) -> bool:
        """Stop a window stream; return False if the manager call fails."""
        try:
            from integrations.remote_desktop.window_session import (
                get_window_session_manager,
            )
            return get_window_session_manager().stop_window_session(
                window_session_id)
        except Exception as e:
            logger.debug("stop_window_stream failed: %s", e)
            return False

    def detach_tab(self, window_session_id: str) -> dict:
        """Detach a window stream into a standalone viewer.

        Returns:
            The window session manager's result, or {status='error', error}.
        """
        try:
            from integrations.remote_desktop.window_session import (
                get_window_session_manager,
            )
            return get_window_session_manager().detach_window(
                window_session_id)
        except Exception as e:
            return {'status': 'error', 'error': str(e)}

    def get_window_sessions(self) -> List[dict]:
        """Get all active window streaming sessions ([] if unavailable)."""
        try:
            from integrations.remote_desktop.window_session import (
                get_window_session_manager,
            )
            return get_window_session_manager().get_active_window_sessions()
        except Exception as e:
            logger.debug("get_window_sessions failed: %s", e)
            return []

    # ── Peripheral Forwarding ─────────────────────────────────

    def list_peripherals(self,
                         types: Optional[list] = None) -> List[dict]:
        """Discover locally connected peripherals (USB, BT, gamepad).

        Args:
            types: Optional filter passed through to the peripheral bridge.

        Returns:
            One dict per peripheral, or ``[{'error': ...}]`` on failure.
        """
        try:
            from integrations.remote_desktop.peripheral_bridge import (
                get_peripheral_bridge,
            )
            peripherals = get_peripheral_bridge().discover_peripherals(types)
            return [p.to_dict() for p in peripherals]
        except Exception as e:
            return [{'error': str(e)}]

    def forward_peripheral(self, session_id: str,
                           peripheral_id: str) -> dict:
        """Forward a local peripheral to the remote device in a session.

        Returns:
            The bridge's result, {status='error', error} if the session is
            unknown, or {success=False, error} if the bridge raises.
        """
        with self._lock:
            info = self._active_sessions.get(session_id)
        if not info:
            return {'status': 'error', 'error': 'Session not found'}

        try:
            from integrations.remote_desktop.peripheral_bridge import (
                get_peripheral_bridge,
            )
            # Get the transport for this session (native only)
            transport = info.get('transport')
            return get_peripheral_bridge().forward_peripheral(
                peripheral_id, transport, session_id)
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def stop_peripheral_forwarding(self,
                                   peripheral_id: str) -> bool:
        """Stop forwarding a peripheral; return False if the bridge call fails."""
        try:
            from integrations.remote_desktop.peripheral_bridge import (
                get_peripheral_bridge,
            )
            return get_peripheral_bridge().stop_forwarding(peripheral_id)
        except Exception as e:
            logger.debug("stop_peripheral_forwarding failed: %s", e)
            return False

    # ── DLNA Screen Casting ──────────────────────────────────

    def discover_cast_targets(self,
                              timeout: float = 5.0) -> List[dict]:
        """Discover DLNA/UPnP renderers on the local network.

        Returns:
            One dict per renderer, or ``[{'error': ...}]`` on failure.
        """
        try:
            from integrations.remote_desktop.dlna_bridge import get_dlna_bridge
            renderers = get_dlna_bridge().discover_renderers(timeout)
            return [r.to_dict() for r in renderers]
        except Exception as e:
            return [{'error': str(e)}]

    def cast_to_device(self, session_id: str,
                       renderer_id: str,
                       stream_port: int = 0) -> dict:
        """Cast a remote desktop session to a DLNA renderer.

        Returns:
            The DLNA bridge's result, or {success=False, error} on failure.
        """
        try:
            from integrations.remote_desktop.dlna_bridge import get_dlna_bridge
            return get_dlna_bridge().cast_session(
                session_id, renderer_id, stream_port=stream_port)
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def stop_cast(self, cast_session_id: str) -> bool:
        """Stop a DLNA cast session; return False if the bridge call fails."""
        try:
            from integrations.remote_desktop.dlna_bridge import get_dlna_bridge
            return get_dlna_bridge().stop_cast(cast_session_id)
        except Exception as e:
            logger.debug("stop_cast failed: %s", e)
            return False

    def get_cast_status(self) -> List[dict]:
        """Get status of all active DLNA cast sessions ([] if unavailable)."""
        try:
            from integrations.remote_desktop.dlna_bridge import get_dlna_bridge
            return get_dlna_bridge().get_cast_status()
        except Exception as e:
            logger.debug("get_cast_status failed: %s", e)
            return []

    # ── File Transfer ────────────────────────────────────────

    def transfer_file(self, session_id: str, local_path: str) -> dict:
        """Transfer a file to remote device via the session's engine.

        Only sessions on the RustDesk engine are supported; other engines get
        an error with a recommendation to switch.

        Returns:
            {status='transferring'|'error', message, engine} for RustDesk,
            {status='blocked', error} when the DLP scan rejects the file name,
            {status='error', error, ...} otherwise.
        """
        import os
        if not os.path.exists(local_path):
            return {'status': 'error', 'error': f'File not found: {local_path}'}

        with self._lock:
            info = self._active_sessions.get(session_id)
        if not info:
            return {'status': 'error', 'error': 'Session not found'}

        # DLP scan (on the file name only). If the scanner cannot be loaded
        # or raises, the transfer is allowed through (existing policy).
        try:
            from integrations.remote_desktop.security import scan_file_transfer
            allowed, reason = scan_file_transfer(os.path.basename(local_path))
        except Exception as e:
            logger.debug("File transfer DLP scan skipped: %s", e)
        else:
            if not allowed:
                return {'status': 'blocked', 'error': f'DLP: {reason}'}

        engine = info.get('engine', 'native')
        remote_id = info.get('remote_device_id', '')

        if engine == 'rustdesk':
            try:
                from integrations.remote_desktop.rustdesk_bridge import get_rustdesk_bridge
                ok, msg = get_rustdesk_bridge().connect(remote_id, file_transfer=True)
                return {'status': 'transferring' if ok else 'error',
                        'message': msg, 'engine': engine}
            except Exception as e:
                return {'status': 'error', 'error': str(e)}

        return {
            'status': 'error',
            'error': f'File transfer not supported via {engine}. Switch to RustDesk.',
            'recommendation': 'Use switch_engine() to switch to RustDesk for file transfer.',
        }

    # ── Status ───────────────────────────────────────────────

    def get_status(self) -> dict:
        """Get unified orchestrator status.

        Every sub-query is best-effort: a failing component yields an empty
        or error-valued entry instead of raising.

        Returns:
            Dict with keys: started, device_id, formatted_id, engines,
            sessions, active_session_count, window_sessions,
            window_session_count, peripherals, casts, cast_count.
        """
        # Engine status
        try:
            from integrations.remote_desktop.service_manager import get_service_manager
            engine_status = get_service_manager().get_all_status()
        except Exception as e:
            engine_status = {'error': str(e)}

        # Active sessions (summary only — no passwords)
        with self._lock:
            sessions = [
                {
                    'session_id': sid,
                    'engine': info.get('engine'),
                    'mode': info.get('mode'),
                    'remote_device_id': info.get('remote_device_id'),
                    'device_id': info.get('device_id'),
                    'clipboard_active': sid in self._clipboard_syncs,
                }
                for sid, info in self._active_sessions.items()
            ]

        # Device ID
        device_id = None
        formatted_id = None
        try:
            from integrations.remote_desktop.device_id import get_device_id, format_device_id
            device_id = get_device_id()
            formatted_id = format_device_id(device_id)
        except Exception as e:
            logger.debug("Device ID unavailable for status: %s", e)

        # Window sessions
        window_sessions = self.get_window_sessions()

        # Peripherals
        peripheral_status = {}
        try:
            from integrations.remote_desktop.peripheral_bridge import (
                get_peripheral_bridge,
            )
            peripheral_status = get_peripheral_bridge().get_status()
        except Exception as e:
            logger.debug("Peripheral status unavailable: %s", e)

        # DLNA casts
        cast_status = self.get_cast_status()

        return {
            'started': self._started,
            'device_id': device_id,
            'formatted_id': formatted_id,
            'engines': engine_status,
            'sessions': sessions,
            'active_session_count': len(sessions),
            'window_sessions': window_sessions,
            'window_session_count': len(window_sessions),
            'peripherals': peripheral_status,
            'casts': cast_status,
            'cast_count': len(cast_status),
        }

    def get_sessions(self) -> List[dict]:
        """Get list of active sessions.

        Returns shallow copies so callers cannot mutate the orchestrator's
        internal session table.
        """
        with self._lock:
            return [dict(info) for info in self._active_sessions.values()]

    # ── Internal Helpers ─────────────────────────────────────

    def _resolve_engine(self, engine: str, use_case: str, role: str) -> str:
        """Resolve 'auto' engine to a specific engine name.

        Any explicit engine name is returned unchanged. For 'auto' the engine
        selector decides; if it is unavailable or fails, 'native' is used.
        """
        if engine != 'auto':
            return engine

        try:
            from integrations.remote_desktop.engine_selector import (
                select_engine, UseCase,
            )
            uc = {
                'general': UseCase.GENERAL,
                'remote_support': UseCase.REMOTE_SUPPORT,
                'file_transfer': UseCase.FILE_TRANSFER,
                'gaming': UseCase.GAMING,
                'vlm_computer_use': UseCase.VLM_COMPUTER_USE,
            }.get(use_case, UseCase.GENERAL)

            return select_engine(use_case=uc, role=role).value
        except Exception:
            return 'native'

    def _ensure_engine_with_fallback(self, engine: str,
                                     fallback_log: str) -> Tuple[str, Optional[dict]]:
        """Make sure ``engine`` is ready, falling back to 'native' if it is not.

        Args:
            engine: Engine that was selected.
            fallback_log: %-style warning template taking (engine, message),
                logged when the fallback is attempted.

        Returns:
            (engine_in_use, None) when an engine is ready, or
            (engine_tried_last, error_dict) when nothing could be started.
        """
        from integrations.remote_desktop.service_manager import get_service_manager
        svc = get_service_manager()

        ready, msg = svc.ensure_engine(engine)
        if ready:
            return engine, None
        if engine == 'native':
            # Native is the fallback itself; nothing left to try.
            return engine, {'status': 'error', 'error': msg, 'engine': engine}

        logger.warning(fallback_log, engine, msg)
        ready, msg = svc.ensure_engine('native')
        if ready:
            return 'native', None
        return 'native', {'status': 'error',
                          'error': f'All engines failed. Last: {msg}',
                          'engine': 'native'}

    def _connect_engine(self, engine: str, device_id: str, password: str,
                        mode: str, gui: bool) -> dict:
        """Dispatch a viewer connection to the bridge for ``engine``.

        Returns the bridge helper's result, or {} for an engine that has no
        viewer-side connect step here.
        """
        if engine == 'rustdesk':
            return self._connect_rustdesk(device_id, password,
                                          mode == 'file_transfer')
        if engine == 'moonlight':
            return self._connect_moonlight(device_id, gui)
        if engine == 'native':
            return self._connect_native(device_id, password)
        return {}

    def _authenticate(self, device_id: str, password: str,
                      user_id: Optional[str] = None) -> Tuple[bool, str]:
        """Authenticate connection attempt.

        Returns:
            The (ok, message) pair from ``authenticate_connection``.
            If the security or device-id module cannot be imported, the
            connection is allowed with message 'auth_bypassed'.
            If verification itself raises, the attempt is denied.
        """
        try:
            from integrations.remote_desktop.security import authenticate_connection
            from integrations.remote_desktop.device_id import get_device_id
        except ImportError as e:
            # Module not installed: allow with a warning (documented policy).
            logger.warning("Auth module unavailable, allowing connection: %s", e)
            return True, 'auth_bypassed'

        try:
            local_id = get_device_id()
            return authenticate_connection(
                host_device_id=device_id,
                viewer_device_id=local_id,
                password=password,
                viewer_user_id=user_id,
            )
        except Exception as e:
            # Fail closed: an error while verifying credentials must never be
            # treated as a successful authentication.
            logger.error("Authentication check failed, denying connection: %s", e)
            return False, f'Authentication error: {e}'

    def _setup_rustdesk_host(self, password: str) -> dict:
        """Configure RustDesk for hosting.

        Returns:
            {'rustdesk_id': ...} on success, {} if setup fails.
        """
        try:
            from integrations.remote_desktop.rustdesk_bridge import get_rustdesk_bridge
            bridge = get_rustdesk_bridge()
            bridge.set_password(password)
            bridge.start_service()
            return {'rustdesk_id': bridge.get_id()}
        except Exception as e:
            logger.warning("RustDesk host setup: %s", e)
            return {}

    def _setup_sunshine_host(self) -> dict:
        """Ensure Sunshine is running for hosting.

        Returns:
            {'sunshine_clients': <paired client count>} on success, {} on failure.
        """
        try:
            from integrations.remote_desktop.sunshine_bridge import get_sunshine_bridge
            bridge = get_sunshine_bridge()
            if not bridge.is_running():
                bridge.start_service()
            clients = bridge.get_paired_clients() or []
            return {'sunshine_clients': len(clients)}
        except Exception as e:
            logger.warning("Sunshine host setup: %s", e)
            return {}

    def _connect_rustdesk(self, device_id: str, password: str,
                          file_transfer: bool = False) -> dict:
        """Connect via RustDesk.

        Returns:
            {status='connected', message} or {status='error', error}.
        """
        try:
            from integrations.remote_desktop.rustdesk_bridge import get_rustdesk_bridge
            ok, msg = get_rustdesk_bridge().connect(device_id, password=password,
                                                    file_transfer=file_transfer)
        except Exception as e:
            return {'status': 'error', 'error': str(e)}
        if ok:
            return {'status': 'connected', 'message': msg}
        return {'status': 'error', 'error': msg}

    def _connect_moonlight(self, host: str, gui: bool = True) -> dict:
        """Connect via Moonlight.

        ``gui`` is accepted but not passed to the bridge.

        Returns:
            {status='connected', message} or {status='error', error}.
        """
        try:
            from integrations.remote_desktop.sunshine_bridge import get_moonlight_bridge
            ok, msg = get_moonlight_bridge().stream(host)
        except Exception as e:
            return {'status': 'error', 'error': str(e)}
        if ok:
            return {'status': 'connected', 'message': msg}
        return {'status': 'error', 'error': msg}

    def _connect_native(self, device_id: str, password: str) -> dict:
        """Connect via native HARTOS transport (fallback).

        This method only reports success; it performs no connection itself.
        """
        # Native transport uses frame_capture + transport + input_handler
        # Full implementation in host_service.py / viewer_client.py
        return {'status': 'connected', 'transport': 'native'}

    def _disconnect_engine(self, engine: str, device_id: str) -> None:
        """Disconnect engine-specific connection (for engine switching).

        Only RustDesk has a teardown step here; other engines are a no-op.
        ``device_id`` is currently unused.
        """
        if engine != 'rustdesk':
            return
        try:
            from integrations.remote_desktop.rustdesk_bridge import get_rustdesk_bridge
            get_rustdesk_bridge().disconnect_all()
        except Exception as e:
            logger.debug("RustDesk disconnect failed: %s", e)

    def _audit(self, event_type: str, session_id: str, actor_id: str,
               detail: Optional[str] = None) -> None:
        """Audit log a session event (best-effort; failures never propagate)."""
        try:
            from integrations.remote_desktop.security import audit_session_event
            audit_session_event(event_type, session_id, actor_id, detail)
        except Exception as e:
            logger.debug("Audit event '%s' not recorded: %s", event_type, e)

922 

923 

924# ── Singleton ──────────────────────────────────────────────── 

925 

_orchestrator: Optional[RemoteDesktopOrchestrator] = None
# Serializes first-time creation so concurrent callers cannot end up with two
# orchestrators that each track their own sessions.
_orchestrator_lock = threading.Lock()


def get_orchestrator() -> RemoteDesktopOrchestrator:
    """Get or create the singleton RemoteDesktopOrchestrator.

    The instance is created lazily on first call; later calls return the
    same object.
    """
    global _orchestrator
    if _orchestrator is None:
        with _orchestrator_lock:
            # Re-check inside the lock: another thread may have created the
            # instance while this one was waiting.
            if _orchestrator is None:
                _orchestrator = RemoteDesktopOrchestrator()
    return _orchestrator