Coverage for integrations / remote_desktop / host_service.py: 55.9%

152 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Host Service — Native screen sharing host for HARTOS Remote Desktop. 

3 

4Only used when engine_selector picks NATIVE (no RustDesk/Sunshine installed). 

5RustDesk/Sunshine handle their own hosting — this is the pure-Python fallback. 

6 

7Captures screen via frame_capture.py, streams over transport.py, 

8receives input events via input_handler.py. Supports multi-viewer. 

9 

10Reuses: 

11 - frame_capture.py: FrameCapture (mss/dxcam/pyautogui circuit breaker) 

12 - input_handler.py: InputHandler (pynput/pyautogui with security gating) 

13 - transport.py: DirectWebSocketTransport for LAN, WAMP for WAN 

14 - session_manager.py: Session lifecycle 

15 - security/node_watchdog.py: Heartbeat monitoring 

16""" 

17 

18import logging 

19import threading 

20import time 

21from typing import Any, Callable, Dict, List, Optional # noqa: F401 

22 

23logger = logging.getLogger('hevolve.remote_desktop') 

24 

25 

26class HostService: 

27 """Native host: captures screen, streams frames, receives input. 

28 

29 Lifecycle: 

30 start() → capture_loop sends frames → handle_viewer() dispatches input → stop() 

31 """ 

32 

33 def __init__(self): 

34 self._running = False 

35 self._capture = None 

36 self._input_handler = None 

37 self._transport = None 

38 self._capture_thread: Optional[threading.Thread] = None 

39 self._viewers: Dict[str, dict] = {} # session_id → viewer info 

40 self._lock = threading.Lock() 

41 self._device_id: Optional[str] = None 

42 self._password: Optional[str] = None 

43 self._config: dict = {} 

44 

45 def start(self, config: Optional[dict] = None) -> dict: 

46 """Start hosting — begin screen capture and transport server. 

47 

48 Args: 

49 config: Optional FrameConfig overrides (max_fps, quality, etc.) 

50 

51 Returns: 

52 {device_id, password, lan_port, status} 

53 """ 

54 if self._running: 

55 return {'status': 'already_running', 'device_id': self._device_id} 

56 

57 self._config = config or {} 

58 

59 # Get device ID 

60 try: 

61 from integrations.remote_desktop.device_id import get_device_id, format_device_id 

62 self._device_id = get_device_id() 

63 formatted_id = format_device_id(self._device_id) 

64 except Exception as e: 

65 return {'status': 'error', 'error': f'Device ID unavailable: {e}'} 

66 

67 # Generate password 

68 try: 

69 from integrations.remote_desktop.session_manager import get_session_manager 

70 sm = get_session_manager() 

71 self._password = sm.generate_otp(self._device_id) 

72 except Exception as e: 

73 return {'status': 'error', 'error': f'Password generation failed: {e}'} 

74 

75 # Initialize frame capture 

76 try: 

77 from integrations.remote_desktop.frame_capture import FrameCapture, FrameConfig 

78 fc = FrameConfig( 

79 max_fps=self._config.get('max_fps', 30), 

80 quality=self._config.get('quality', 80), 

81 ) 

82 self._capture = FrameCapture(config=fc) 

83 except Exception as e: 

84 return {'status': 'error', 'error': f'Frame capture init failed: {e}'} 

85 

86 # Initialize input handler 

87 try: 

88 from integrations.remote_desktop.input_handler import InputHandler 

89 allow_control = self._config.get('allow_control', True) 

90 self._input_handler = InputHandler(allow_control=allow_control) 

91 except Exception as e: 

92 logger.warning(f"Input handler unavailable: {e}") 

93 

94 # Start transport server 

95 lan_port = 0 

96 try: 

97 from integrations.remote_desktop.transport import DirectWebSocketTransport 

98 self._transport = DirectWebSocketTransport(host_mode=True) 

99 lan_port = self._transport.start_server() 

100 

101 # Register event handler for input 

102 self._transport.on_event(self._on_viewer_event) 

103 except Exception as e: 

104 logger.warning(f"WebSocket transport unavailable: {e}") 

105 

106 # Register with NodeWatchdog 

107 self._register_watchdog() 

108 

109 # Start capture loop 

110 self._running = True 

111 self._capture_thread = threading.Thread( 

112 target=self._capture_loop, 

113 daemon=True, 

114 name='host-capture-loop', 

115 ) 

116 self._capture_thread.start() 

117 

118 logger.info(f"Native host started: {formatted_id} on port {lan_port}") 

119 return { 

120 'status': 'hosting', 

121 'device_id': self._device_id, 

122 'formatted_id': formatted_id, 

123 'password': self._password, 

124 'lan_port': lan_port, 

125 } 

126 

127 def stop(self) -> None: 

128 """Stop hosting — disconnect all viewers, stop capture.""" 

129 self._running = False 

130 

131 if self._capture_thread: 

132 self._capture_thread.join(timeout=5) 

133 self._capture_thread = None 

134 

135 if self._capture: 

136 self._capture.stop() 

137 self._capture = None 

138 

139 if self._transport: 

140 self._transport.close() 

141 self._transport = None 

142 

143 self._input_handler = None 

144 self._viewers.clear() 

145 logger.info("Native host stopped") 

146 

147 def handle_viewer(self, session_id: str, viewer_device_id: str) -> None: 

148 """Register a new viewer for this host session.""" 

149 with self._lock: 

150 self._viewers[session_id] = { 

151 'viewer_device_id': viewer_device_id, 

152 'connected_at': time.time(), 

153 } 

154 logger.info(f"Viewer connected: {viewer_device_id[:12]} " 

155 f"(session {session_id[:8]})") 

156 

157 def remove_viewer(self, session_id: str) -> None: 

158 """Remove a viewer.""" 

159 with self._lock: 

160 self._viewers.pop(session_id, None) 

161 

162 def _capture_loop(self) -> None: 

163 """Main capture loop — sends frames to all viewers via transport.""" 

164 if not self._capture: 

165 return 

166 

167 logger.debug("Capture loop started") 

168 for frame in self._capture.capture_loop(): 

169 if not self._running: 

170 break 

171 

172 # Send frame to all connected viewers via transport 

173 if self._transport and self._transport.connected: 

174 self._transport.send_frame(frame) 

175 

176 logger.debug("Capture loop ended") 

177 

178 def _on_viewer_event(self, event: dict) -> None: 

179 """Handle incoming event from viewer (input, clipboard, file, control).""" 

180 event_type = event.get('type', '') 

181 

182 if event_type in ('click', 'rightclick', 'doubleclick', 'middleclick', 

183 'move', 'mouse_move', 'drag', 'scroll', 

184 'mouse_down', 'mouse_up', 'key', 'type', 'hotkey'): 

185 # Input event → dispatch to InputHandler 

186 if self._input_handler: 

187 result = self._input_handler.handle_input_event(event) 

188 if not result.get('success'): 

189 logger.debug(f"Input event rejected: {result.get('error')}") 

190 

191 elif event_type == 'clipboard': 

192 # Remote clipboard → apply locally 

193 content = event.get('content', '') 

194 if content: 

195 try: 

196 from integrations.remote_desktop.clipboard_sync import ClipboardSync 

197 sync = ClipboardSync() 

198 sync.apply_remote_clipboard(content) 

199 except Exception: 

200 pass 

201 

202 elif event_type == 'file_ctrl': 

203 # File transfer control 

204 try: 

205 from integrations.remote_desktop.file_transfer import FileTransfer 

206 ft = FileTransfer() 

207 ft.handle_event(event) 

208 except Exception: 

209 pass 

210 

211 elif event_type == 'drag_drop': 

212 # Cross-device drag-and-drop 

213 try: 

214 from integrations.remote_desktop.drag_drop import DragDropBridge 

215 bridge = DragDropBridge( 

216 transport=self._transport, 

217 input_handler=self._input_handler, 

218 ) 

219 bridge.handle_file_drop_on_host(event) 

220 except Exception as e: 

221 logger.debug(f"Drag-drop event handling failed: {e}") 

222 

223 elif event_type == 'file_drop': 

224 # File drop simulation at cursor position 

225 if self._input_handler: 

226 self._input_handler.handle_input_event(event) 

227 

228 elif event_type == 'list_windows': 

229 # Window enumeration request → return available windows 

230 windows = self.get_available_windows() 

231 if self._transport and self._transport.connected: 

232 self._transport.send_event({ 

233 'type': 'window_list', 

234 'windows': windows, 

235 }) 

236 

237 elif event_type == 'stream_window': 

238 # Start streaming a specific window 

239 hwnd = event.get('hwnd', 0) 

240 title = event.get('title', '') 

241 result = self.start_window_stream(hwnd, title) 

242 if self._transport and self._transport.connected: 

243 self._transport.send_event({ 

244 'type': 'window_stream_result', 

245 'result': result, 

246 }) 

247 

248 elif event_type == 'stop_window_stream': 

249 window_session_id = event.get('window_session_id', '') 

250 self.stop_window_stream(window_session_id) 

251 

252 elif event_type == 'control': 

253 action = event.get('action', '') 

254 if action == 'disconnect': 

255 session_id = event.get('session_id', '') 

256 self.remove_viewer(session_id) 

257 

258 def get_available_windows(self) -> List[dict]: 

259 """Return windows available for per-window streaming.""" 

260 try: 

261 from integrations.remote_desktop.window_session import ( 

262 get_window_session_manager, 

263 ) 

264 return get_window_session_manager().list_available_windows() 

265 except Exception as e: 

266 logger.debug(f"Window enumeration failed: {e}") 

267 return [] 

268 

269 def start_window_stream(self, hwnd: int, title: str = '') -> dict: 

270 """Start streaming a specific window.""" 

271 try: 

272 from integrations.remote_desktop.window_session import ( 

273 get_window_session_manager, 

274 ) 

275 wsm = get_window_session_manager() 

276 return wsm.start_window_session( 

277 window_hwnd=hwnd, 

278 window_title=title, 

279 transport=self._transport, 

280 ) 

281 except Exception as e: 

282 return {'status': 'error', 'error': str(e)} 

283 

284 def stop_window_stream(self, window_session_id: str) -> bool: 

285 """Stop a window stream.""" 

286 try: 

287 from integrations.remote_desktop.window_session import ( 

288 get_window_session_manager, 

289 ) 

290 return get_window_session_manager().stop_window_session( 

291 window_session_id) 

292 except Exception: 

293 return False 

294 

295 def _register_watchdog(self) -> None: 

296 """Register with NodeWatchdog for auto-restart.""" 

297 try: 

298 from security.node_watchdog import get_watchdog 

299 watchdog = get_watchdog() 

300 if watchdog: 

301 watchdog.register( 

302 'remote_desktop_host', 

303 expected_interval=30, 

304 restart_fn=lambda: self.start(self._config), 

305 stop_fn=self.stop, 

306 ) 

307 except Exception: 

308 pass 

309 

310 @property 

311 def is_running(self) -> bool: 

312 return self._running 

313 

314 def get_status(self) -> dict: 

315 """Get host service status.""" 

316 with self._lock: 

317 viewer_count = len(self._viewers) 

318 return { 

319 'running': self._running, 

320 'device_id': self._device_id, 

321 'password': self._password, 

322 'viewers': viewer_count, 

323 'transport_connected': self._transport.connected if self._transport else False, 

324 'capture_stats': self._capture.get_stats() if self._capture else None, 

325 } 

326 

327 

328# ── Singleton ──────────────────────────────────────────────── 

329 

330_host_service: Optional[HostService] = None 

331 

332 

333def get_host_service() -> HostService: 

334 """Get or create the singleton HostService.""" 

335 global _host_service 

336 if _host_service is None: 

337 _host_service = HostService() 

338 return _host_service