Coverage for integrations / remote_desktop / transport.py: 39.7%

282 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Transport Layer — 3-tier WebSocket/WAMP streaming (no STUN/TURN needed). 

3 

4Tier 1 (LAN): Direct WebSocket between devices (~1ms latency) 

5Tier 2 (WAN): WAMP relay through existing Crossbar router (~50-100ms) 

6Tier 3 (WAN P2P): WireGuard tunnel from compute mesh (~20ms) 

7 

8All tiers implement TransportChannel ABC so host/viewer code is tier-agnostic. 

9 

10Reuses: 

11 - crossbar_server.py → WAMP pub/sub for Tier 2 relay 

12 - vision_service.py:44 → WebSocket binary frame protocol pattern 

13 - compute_mesh_service.py:74-76 → WireGuard tunnel for Tier 3 

14 - channel_encryption.py → E2E encryption for all frames/events 

15 - message_queue.py:31 → PRIORITY queue for input events 

16""" 

17 

18import asyncio 

19import json 

20import logging 

21import socket 

22import struct 

23import threading 

24import time 

25from abc import ABC, abstractmethod 

26from enum import Enum 

27from typing import Any, Callable, Dict, List, Optional 

28 

logger = logging.getLogger('hevolve.remote_desktop')

# ── Optional dependencies ───────────────────────────────────────

# `websockets` is optional. `_websockets` is the imported module when the
# package is installed and None otherwise, so the rest of the module can
# feature-test it with a plain truthiness check.
try:
    import websockets
except ImportError:
    _websockets = None
else:
    _websockets = websockets

39 

40 

41# ── Enums ─────────────────────────────────────────────────────── 

42 

class TransportTier(Enum):
    """Transport tier a channel runs over; the value is the tier's string id."""

    # Direct WebSocket between devices.
    LAN_DIRECT = "lan_direct"
    # WebSocket carried over the WireGuard mesh.
    WIREGUARD_P2P = "wireguard_p2p"
    # Pub/sub relay through the WAMP router.
    WAMP_RELAY = "wamp_relay"

47 

48 

class MessageType(Enum):
    """Kinds of message exchanged over a transport channel.

    Binary kinds carry raw bytes; every other kind is a JSON payload.
    """

    # Binary screen frame (host→viewer)
    FRAME = "frame"
    # JSON mouse/keyboard event (viewer→host)
    INPUT = "input"
    # JSON clipboard content (bidirectional)
    CLIPBOARD = "clipboard"
    # JSON file transfer control
    FILE_CTRL = "file_ctrl"
    # Binary file chunk
    FILE_DATA = "file_data"
    # JSON session control (both)
    CONTROL = "control"
    # JSON list of available windows (host→viewer)
    WINDOW_LIST = "window_list"
    # Binary frame for specific window session
    WINDOW_FRAME = "window_frame"
    # JSON drag-and-drop event (bidirectional)
    DRAG_DROP = "drag_drop"
    # JSON peripheral device event (bidirectional)
    PERIPHERAL = "peripheral"

60 

61 

62# ── Transport Channel ABC ─────────────────────────────────────── 

63 

class TransportChannel(ABC):
    """Common interface implemented by every transport tier.

    Frames and file data travel as raw bytes; events, clipboard and
    control messages travel as JSON-serialisable dicts. Subclasses
    implement the sending side and update the shared counters and the
    connection flag kept here.
    """

    def __init__(self):
        # Receive-side callbacks, unset until on_frame()/on_event() is called.
        self._frame_callback: Optional[Callable[[bytes], None]] = None
        self._event_callback: Optional[Callable[[dict], None]] = None
        self._connected = False
        # Traffic counters reported by get_stats().
        self._bytes_sent = self._bytes_received = 0
        self._frames_sent = self._frames_received = 0

    @abstractmethod
    def send_frame(self, data: bytes) -> bool:
        """Send binary frame data (JPEG/H264). Returns success."""

    @abstractmethod
    def send_event(self, event: dict) -> bool:
        """Send JSON event (input, clipboard, file_ctrl, control). Returns success."""

    @abstractmethod
    def close(self) -> None:
        """Close the transport channel."""

    @property
    @abstractmethod
    def tier(self) -> TransportTier:
        """Return the transport tier."""

    def on_frame(self, callback: Callable[[bytes], None]) -> None:
        """Install the callable invoked with each received binary frame."""
        self._frame_callback = callback

    def on_event(self, callback: Callable[[dict], None]) -> None:
        """Install the callable invoked with each received JSON event."""
        self._event_callback = callback

    @property
    def connected(self) -> bool:
        """Whether the channel currently considers itself connected."""
        return self._connected

    def get_stats(self) -> dict:
        """Return the tier id, connection flag and traffic counters as a dict."""
        stats = {'tier': self.tier.value, 'connected': self._connected}
        for counter in ('bytes_sent', 'bytes_received',
                        'frames_sent', 'frames_received'):
            stats[counter] = getattr(self, f'_{counter}')
        return stats

122 

123 

124# ── Tier 1: Direct WebSocket (LAN) ───────────────────────────── 

125 

class DirectWebSocketTransport(TransportChannel):
    """Direct WebSocket between host and viewer (LAN or WireGuard IP).

    Protocol (mirrors vision_service.py binary frame pattern):
    - Binary message = frame data (JPEG bytes)
    - Text message = JSON event {type, ...}

    The asyncio event loop runs on a dedicated daemon thread. The public
    methods are synchronous and hand their work to that loop, so they can
    be called from any thread.
    """

    # Largest single WebSocket message accepted (frames can be large).
    MAX_MESSAGE_SIZE = 10 * 1024 * 1024
    # Seconds close() gives each socket to shut down before stopping the loop.
    CLOSE_TIMEOUT = 2.0

    def __init__(self, host_mode: bool = False, host: str = '0.0.0.0',
                 port: int = 0):
        """
        Args:
            host_mode: True when this side serves viewers and broadcasts to them.
            host: Interface address the server binds to.
            port: Port to bind; 0 lets the OS choose one.
        """
        super().__init__()
        self._host_mode = host_mode
        self._host = host
        self._port = port
        self._actual_port: Optional[int] = None
        self._ws = None
        self._server = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread: Optional[threading.Thread] = None
        self._clients: List[Any] = []

    @property
    def tier(self) -> TransportTier:
        return TransportTier.LAN_DIRECT

    @property
    def actual_port(self) -> Optional[int]:
        """Port the server is bound to, or None if it has not bound."""
        return self._actual_port

    # ── Startup ────────────────────────────────────────────────

    def start_server(self) -> int:
        """Start WebSocket server (host mode). Returns assigned port.

        Returns 0, and leaves the transport disconnected, when the server
        did not come up within about 5 seconds.

        Raises:
            RuntimeError: If the websockets library is not installed.
        """
        if not _websockets:
            raise RuntimeError("websockets library not installed")

        # Serving implies host mode; without it send_frame()/send_event()
        # would never broadcast to the connected clients.
        self._host_mode = True
        # Forget the port of any earlier run so the wait below cannot be
        # satisfied by stale state.
        self._actual_port = None

        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self._run_server_loop,
            daemon=True,
            name='ws-transport-server',
        )
        self._thread.start()

        if not self._wait_for_thread(lambda: bool(self._actual_port)):
            logger.error("WebSocket transport server failed to start")
            self.close()
            return 0

        self._connected = True
        logger.info(f"WebSocket transport server on port {self._actual_port}")
        return self._actual_port or 0

    def connect(self, url: str) -> bool:
        """Connect to WebSocket server (viewer mode).

        Args:
            url: WebSocket URL (e.g., 'ws://192.168.1.5:9876')

        Returns:
            True once connected; False if the library is missing or the
            connection was not established within about 5 seconds.
        """
        if not _websockets:
            logger.error("websockets library not installed")
            return False

        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self._run_client_loop,
            args=(url,),
            daemon=True,
            name='ws-transport-client',
        )
        self._thread.start()

        connected = self._wait_for_thread(lambda: self._connected)
        if not connected:
            # Do not leave a half-finished attempt running in the background.
            self.close()
        return connected

    def _wait_for_thread(self, ready: Callable[[], bool]) -> bool:
        """Poll `ready` for up to 5 seconds, giving up early if the worker died."""
        for _ in range(50):
            if ready():
                return True
            if not self._thread.is_alive():
                break
            time.sleep(0.1)
        return bool(ready())

    # ── Loop thread ────────────────────────────────────────────

    def _run_server_loop(self) -> None:
        self._run_until_done(self._serve(), 'server')

    def _run_client_loop(self, url: str) -> None:
        self._run_until_done(self._client_handler(url), 'client')

    def _run_until_done(self, coro, label: str) -> None:
        """Thread target: run `coro` on this transport's loop and log how it ended."""
        loop = self._loop
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(coro)
        except RuntimeError as exc:
            # close() stops the loop, which can interrupt the coroutine.
            logger.debug(f"WebSocket {label} loop stopped: {exc}")
        except Exception as exc:
            logger.error(f"WebSocket {label} loop failed: {exc}")
        finally:
            # Only clear the flag if a newer start/connect has not replaced us.
            if self._loop is loop:
                self._connected = False

    def _dispatch_message(self, message) -> None:
        """Update receive counters and pass one inbound message to its callback."""
        self._bytes_received += len(message)

        if isinstance(message, bytes):
            self._frames_received += 1
            if self._frame_callback:
                try:
                    self._frame_callback(message)
                except Exception:
                    # A faulty callback must not tear down the connection.
                    logger.exception("Frame callback failed")
            return

        if not self._event_callback:
            return
        try:
            event = json.loads(message)
        except json.JSONDecodeError as exc:
            logger.debug(f"Ignoring non-JSON text message: {exc}")
            return
        try:
            self._event_callback(event)
        except Exception:
            logger.exception("Event callback failed")

    async def _serve(self) -> None:
        """Run the server until it is closed, tracking connected clients."""
        async def handler(ws):
            self._clients.append(ws)
            try:
                async for message in ws:
                    self._dispatch_message(message)
            finally:
                if ws in self._clients:
                    self._clients.remove(ws)

        self._server = await _websockets.serve(
            handler, self._host, self._port,
            max_size=self.MAX_MESSAGE_SIZE,
        )
        self._actual_port = self._server.sockets[0].getsockname()[1]
        await self._server.wait_closed()

    async def _client_handler(self, url: str) -> None:
        """Connect to `url` and dispatch inbound messages until the link closes."""
        try:
            async with _websockets.connect(
                    url, max_size=self.MAX_MESSAGE_SIZE) as ws:
                self._ws = ws
                self._connected = True
                async for message in ws:
                    self._dispatch_message(message)
        except Exception as e:
            logger.error(f"WebSocket client error: {e}")
        finally:
            self._connected = False

    # ── Sending ────────────────────────────────────────────────

    @staticmethod
    def _log_send_result(future) -> None:
        """Done-callback: report a failed send instead of dropping it silently."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.debug(f"WebSocket send failed: {exc}")

    def _schedule_send(self, target, payload) -> None:
        """Queue `target.send(payload)` on the loop thread."""
        future = asyncio.run_coroutine_threadsafe(
            target.send(payload), self._loop
        )
        future.add_done_callback(self._log_send_result)

    def _send_to_peers(self, payload) -> None:
        """Broadcast to every client (host mode) or send to the server (viewer)."""
        if self._host_mode and self._clients:
            # Iterate a copy: handlers mutate the list from the loop thread.
            for client in self._clients[:]:
                try:
                    self._schedule_send(client, payload)
                except Exception as exc:
                    # One bad client must not block the others.
                    logger.debug(f"Send to client failed: {exc}")
        elif self._ws:
            self._schedule_send(self._ws, payload)

    def send_frame(self, data: bytes) -> bool:
        """Send one binary frame. Returns False when disconnected or on error."""
        if not self._connected:
            return False
        try:
            self._send_to_peers(data)
            self._bytes_sent += len(data)
            self._frames_sent += 1
            return True
        except Exception as e:
            logger.debug(f"Send frame failed: {e}")
            return False

    def send_event(self, event: dict) -> bool:
        """Send one event as compact JSON text. Returns False when disconnected or on error."""
        if not self._connected:
            return False
        try:
            msg = json.dumps(event, separators=(',', ':'))
            self._send_to_peers(msg)
            self._bytes_sent += len(msg)
            return True
        except Exception as e:
            logger.debug(f"Send event failed: {e}")
            return False

    # ── Shutdown ───────────────────────────────────────────────

    async def _shutdown(self, loop, server, ws) -> None:
        """Runs on the loop thread: close the sockets, then stop the loop."""
        try:
            if server is not None:
                server.close()
                await asyncio.wait_for(
                    server.wait_closed(), timeout=self.CLOSE_TIMEOUT
                )
            if ws is not None:
                await asyncio.wait_for(ws.close(), timeout=self.CLOSE_TIMEOUT)
        except Exception as exc:
            logger.debug(f"WebSocket shutdown incomplete: {exc}")
        finally:
            # Stop even if a socket refused to close in time, so the
            # worker thread always terminates.
            loop.stop()

    def close(self) -> None:
        """Disconnect and stop the background loop. Safe to call repeatedly."""
        self._connected = False
        loop, thread = self._loop, self._thread
        server, ws = self._server, self._ws
        self._ws = None
        self._server = None

        if loop is None or loop.is_closed():
            return
        if thread is None or not thread.is_alive():
            # Nothing is driving the loop, so there is nothing to shut down.
            return

        # Server and connection objects belong to the loop thread, so the
        # actual closing is done there rather than from the caller's thread.
        shutdown = self._shutdown(loop, server, ws)
        try:
            asyncio.run_coroutine_threadsafe(shutdown, loop)
        except RuntimeError as exc:
            shutdown.close()
            logger.debug(f"WebSocket close skipped: {exc}")

328 

329 

330# ── Tier 2: WAMP Relay ────────────────────────────────────────── 

331 

class WAMPRelayTransport(TransportChannel):
    """WAMP pub/sub relay through existing Crossbar router.

    Both devices already connected to ws://aws_rasa.hertzai.com:8088/ws.
    Frames published to com.hartos.remote_desktop.frames.{session_id}.
    Input events to com.hartos.remote_desktop.input.{session_id}.

    Reuses crossbar_server.py WAMP session.

    NOTE(review): this class only publishes. Nothing here subscribes to the
    topics, so the on_frame()/on_event() callbacks are never invoked by it.
    """

    TOPIC_PREFIX = 'com.hartos.remote_desktop'

    def __init__(self, session_id: str, role: str = 'host'):
        """
        Args:
            session_id: Unique session identifier
            role: 'host' (sends frames, receives input) or 'viewer' (vice versa)
        """
        super().__init__()
        self._session_id = session_id
        self._role = role
        self._wamp_session = None

    @property
    def tier(self) -> TransportTier:
        return TransportTier.WAMP_RELAY

    def start(self) -> bool:
        """Attach to the existing WAMP session.

        Returns:
            True when crossbar_server exposes a usable session, False when
            the module cannot be imported or its session is unset.
        """
        try:
            from crossbar_server import wamp_session
        except ImportError:
            logger.warning("crossbar_server not available for WAMP relay")
            return False

        self._wamp_session = wamp_session
        if not self._wamp_session:
            logger.warning("WAMP session not available")
            return False

        self._connected = True
        logger.info(f"WAMP relay transport started (session={self._session_id[:8]})")
        return True

    def _topic(self, channel: str) -> str:
        """Build the session-scoped topic URI for `channel`."""
        return f"{self.TOPIC_PREFIX}.{channel}.{self._session_id}"

    def _publish(self, channel: str, payload: str) -> None:
        """Publish `payload` on this session's `channel` topic."""
        result = self._wamp_session.publish(self._topic(channel), payload)
        # A session may return None from publish() when no acknowledgement
        # was requested (Autobahn does). Passing None to ensure_future()
        # would raise and report failure after the message already went out.
        if result is not None:
            asyncio.ensure_future(result)

    def send_frame(self, data: bytes) -> bool:
        """Publish one frame, base64-encoded, on the 'frames' topic."""
        if not self._connected or not self._wamp_session:
            return False
        try:
            # Encode binary as base64 for WAMP text transport
            import base64
            payload = base64.b64encode(data).decode('ascii')
            self._publish('frames', payload)
            self._bytes_sent += len(data)
            self._frames_sent += 1
            return True
        except Exception as e:
            logger.debug(f"WAMP send frame failed: {e}")
            return False

    def send_event(self, event: dict) -> bool:
        """Publish one event as compact JSON.

        Viewers publish on the 'input' topic, any other role on 'control'.
        """
        if not self._connected or not self._wamp_session:
            return False
        try:
            # Serialise once so the byte counter matches what was published.
            msg = json.dumps(event, separators=(',', ':'))
            channel = 'input' if self._role == 'viewer' else 'control'
            self._publish(channel, msg)
            self._bytes_sent += len(msg)
            return True
        except Exception as e:
            logger.debug(f"WAMP send event failed: {e}")
            return False

    def close(self) -> None:
        """Detach from the WAMP session; the shared session itself is left open."""
        self._connected = False
        self._wamp_session = None
        logger.info(f"WAMP relay transport closed (session={self._session_id[:8]})")

418 

419 

420# ── Tier 3: WireGuard P2P ────────────────────────────────────── 

421 

class WireGuardTransport(DirectWebSocketTransport):
    """Direct WebSocket through WireGuard tunnel.

    Reuses compute_mesh_service.py:74-76 (port 6795, subnet 10.99.0.0/16).
    Once tunnel established, behaves like DirectWebSocketTransport but
    over the WireGuard mesh IP.
    """

    def __init__(self, wg_ip: str, port: int = 0, host_mode: bool = False):
        """
        Args:
            wg_ip: WireGuard mesh IP; used as the bind address when serving.
            port: Port to bind when serving; 0 lets the OS choose one.
            host_mode: True for the serving side. Defaults to False (viewer),
                which was previously the only possible setting, so a
                WireGuard host could never broadcast to its clients.
        """
        super().__init__(host_mode=host_mode, host=wg_ip, port=port)
        self._wg_ip = wg_ip

    @property
    def tier(self) -> TransportTier:
        return TransportTier.WIREGUARD_P2P

437 

438 

439# ── Transport Negotiation ─────────────────────────────────────── 

440 

def get_local_ip() -> Optional[str]:
    """Get local LAN IP address.

    Connects a UDP socket towards 8.8.8.8 and reads back the source
    address the OS selected for it.

    Returns:
        The local IPv4 address as a string, or None if it cannot be determined.
    """
    try:
        # The context manager closes the socket even when connect() or
        # getsockname() raises; previously that path leaked it.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.settimeout(1)
            s.connect(('8.8.8.8', 80))
            return s.getsockname()[0]
    except OSError:
        return None

452 

453 

def probe_lan_connectivity(target_ip: str, port: int,
                           timeout: float = 2.0) -> bool:
    """Check if target is reachable on LAN via TCP probe.

    Args:
        target_ip: IPv4 address (or hostname) to probe.
        port: TCP port to connect to.
        timeout: Seconds to wait for the connection attempt.

    Returns:
        True if the TCP connection succeeded, False on any failure,
        including an unresolvable host or an invalid port.
    """
    try:
        # The context manager closes the socket even when connect_ex()
        # raises (bad hostname, out-of-range port); previously it leaked.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            return s.connect_ex((target_ip, port)) == 0
    except (OSError, OverflowError, TypeError, ValueError):
        return False

465 

466 

def probe_wireguard_connectivity(wg_ip: str, timeout: float = 2.0,
                                 port: int = 6796) -> bool:
    """Check if the WireGuard tunnel is usable via a TCP connect to the mesh IP.

    This is a TCP probe, not an ICMP ping.

    Args:
        wg_ip: WireGuard mesh IP of the peer.
        timeout: Seconds to wait for the connection attempt.
        port: TCP port to probe. Defaults to 6796, the port this function
            always used. NOTE(review): WireGuardTransport's docstring cites
            port 6795 for the compute mesh — confirm which one is intended.

    Returns:
        True if the TCP connection succeeded, False on any failure.
    """
    try:
        # The context manager closes the socket even when connect_ex()
        # raises; previously that path leaked it.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            return s.connect_ex((wg_ip, port)) == 0
    except (OSError, OverflowError, TypeError, ValueError):
        return False

478 

479 

def auto_negotiate_transport(
    session_id: str,
    host_offers: dict,
    role: str = 'viewer',
) -> Optional[TransportChannel]:
    """Auto-negotiate best transport tier.

    Tiers are tried in order of preference: LAN direct, then WireGuard
    P2P, then the WAMP relay. A tier whose connection attempt fails is
    closed before the next one is tried.

    Args:
        session_id: Session ID
        host_offers: {
            'lan_ip': '192.168.1.5',
            'lan_port': 9876,
            'wg_ip': '10.99.0.5',
            'wg_port': 9877,
            'wamp_available': True,
        }
        role: 'host' or 'viewer'

    Returns:
        Best available TransportChannel, or None if all fail.
    """
    # Tier 1: Try LAN direct
    lan_ip = host_offers.get('lan_ip')
    lan_port = host_offers.get('lan_port')
    if lan_ip and lan_port and _websockets:
        if probe_lan_connectivity(lan_ip, lan_port):
            transport = DirectWebSocketTransport()
            if transport.connect(f"ws://{lan_ip}:{lan_port}"):
                logger.info(f"Transport: Tier 1 LAN direct ({lan_ip}:{lan_port})")
                return transport
            # Release the worker thread/loop of the failed attempt.
            transport.close()

    # Tier 3: Try WireGuard P2P
    wg_ip = host_offers.get('wg_ip')
    wg_port = host_offers.get('wg_port')
    if wg_ip and wg_port and _websockets:
        if probe_wireguard_connectivity(wg_ip):
            transport = WireGuardTransport(wg_ip, wg_port)
            if transport.connect(f"ws://{wg_ip}:{wg_port}"):
                logger.info(f"Transport: Tier 3 WireGuard P2P ({wg_ip}:{wg_port})")
                return transport
            transport.close()

    # Tier 2: WAMP relay (assumed available unless the host says otherwise)
    if host_offers.get('wamp_available', True):
        transport = WAMPRelayTransport(session_id, role=role)
        if transport.start():
            logger.info("Transport: Tier 2 WAMP relay")
            return transport

    logger.error("All transport tiers failed")
    return None