Coverage for integrations / remote_desktop / transport.py: 39.7%
282 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
Transport Layer — 3-tier WebSocket/WAMP streaming (no STUN/TURN needed).

Tier 1 (LAN): Direct WebSocket between devices (~1ms latency)
Tier 2 (WAN): WAMP relay through existing Crossbar router (~50-100ms)
Tier 3 (WAN P2P): WireGuard tunnel from compute mesh (~20ms)

All tiers implement TransportChannel ABC so host/viewer code is tier-agnostic.

Reuses:
  - crossbar_server.py → WAMP pub/sub for Tier 2 relay
  - vision_service.py:44 → WebSocket binary frame protocol pattern
  - compute_mesh_service.py:74-76 → WireGuard tunnel for Tier 3
  - channel_encryption.py → E2E encryption for all frames/events
  - message_queue.py:31 → PRIORITY queue for input events
16"""
18import asyncio
19import json
20import logging
21import socket
22import struct
23import threading
24import time
25from abc import ABC, abstractmethod
26from enum import Enum
27from typing import Any, Callable, Dict, List, Optional
# Module-wide logger shared by every transport tier.
logger = logging.getLogger('hevolve.remote_desktop')

# ── Optional dependencies ───────────────────────────────────────
# `websockets` is optional: when it cannot be imported, `_websockets` stays
# None and the WebSocket-based tiers report themselves as unavailable.
try:
    import websockets
except ImportError:
    _websockets = None
else:
    _websockets = websockets
41# ── Enums ───────────────────────────────────────────────────────
class TransportTier(Enum):
    """Identifies which transport tier a channel is running on.

    The member values are the string identifiers that
    ``TransportChannel.get_stats()`` reports under the ``'tier'`` key.
    """

    # Tier 1: direct WebSocket between devices on the same LAN.
    LAN_DIRECT = 'lan_direct'
    # Tier 3: WebSocket carried over the WireGuard mesh tunnel.
    WIREGUARD_P2P = 'wireguard_p2p'
    # Tier 2: pub/sub relay through the Crossbar WAMP router.
    WAMP_RELAY = 'wamp_relay'
class MessageType(Enum):
    """Logical message kinds exchanged over a transport channel.

    Each member's value is its wire-level string identifier. The trailing
    comments give the payload encoding and the direction of travel.
    """

    # Screen content
    FRAME = 'frame'                # Binary screen frame (host→viewer)
    # Viewer interaction
    INPUT = 'input'                # JSON mouse/keyboard event (viewer→host)
    CLIPBOARD = 'clipboard'        # JSON clipboard content (bidirectional)
    # File transfer
    FILE_CTRL = 'file_ctrl'        # JSON file transfer control
    FILE_DATA = 'file_data'        # Binary file chunk
    # Session management
    CONTROL = 'control'            # JSON session control (both)
    # Per-window sessions
    WINDOW_LIST = 'window_list'    # JSON list of available windows (host→viewer)
    WINDOW_FRAME = 'window_frame'  # Binary frame for specific window session
    # Extended interaction
    DRAG_DROP = 'drag_drop'        # JSON drag-and-drop event (bidirectional)
    PERIPHERAL = 'peripheral'      # JSON peripheral device event (bidirectional)
62# ── Transport Channel ABC ───────────────────────────────────────
class TransportChannel(ABC):
    """Common interface implemented by every transport tier.

    Two payload families travel over a channel:
      - binary payloads (frames, file data) are passed as raw ``bytes``;
      - text payloads (events, clipboard, control) are passed as JSON dicts.

    Subclasses implement ``send_frame``, ``send_event``, ``close`` and the
    ``tier`` property, and are expected to maintain the ``_connected`` flag
    and the traffic counters that ``get_stats`` reports.
    """

    def __init__(self):
        # Traffic counters, all starting from zero.
        self._bytes_sent = 0
        self._bytes_received = 0
        self._frames_sent = 0
        self._frames_received = 0
        # Connection state; flipped by the concrete transport.
        self._connected = False
        # Receive hooks, unset until on_frame()/on_event() is called.
        self._frame_callback: Optional[Callable[[bytes], None]] = None
        self._event_callback: Optional[Callable[[dict], None]] = None

    @abstractmethod
    def send_frame(self, data: bytes) -> bool:
        """Transmit one binary frame (JPEG/H264); return True on success."""
        ...

    @abstractmethod
    def send_event(self, event: dict) -> bool:
        """Transmit one JSON event (input, clipboard, file_ctrl, control).

        Returns True on success.
        """
        ...

    @abstractmethod
    def close(self) -> None:
        """Shut the channel down."""
        ...

    @property
    @abstractmethod
    def tier(self) -> TransportTier:
        """The tier this channel operates on."""
        ...

    def on_frame(self, callback: Callable[[bytes], None]) -> None:
        """Install the callable invoked with each received binary frame."""
        self._frame_callback = callback

    def on_event(self, callback: Callable[[dict], None]) -> None:
        """Install the callable invoked with each received JSON event."""
        self._event_callback = callback

    @property
    def connected(self) -> bool:
        """Whether the channel currently considers itself connected."""
        return self._connected

    def get_stats(self) -> dict:
        """Return a snapshot of the tier name, connection flag and counters."""
        snapshot = {'tier': self.tier.value, 'connected': self._connected}
        # Counter keys mirror the private attribute names without the underscore.
        for counter in ('bytes_sent', 'bytes_received',
                        'frames_sent', 'frames_received'):
            snapshot[counter] = getattr(self, '_' + counter)
        return snapshot
124# ── Tier 1: Direct WebSocket (LAN) ─────────────────────────────
class DirectWebSocketTransport(TransportChannel):
    """Direct WebSocket between host and viewer (LAN or WireGuard IP).

    Protocol (mirrors vision_service.py binary frame pattern):
      - Binary message = frame data (JPEG bytes)
      - Text message   = JSON event {type, ...}

    Threading model: ``start_server()`` / ``connect()`` spin up a private
    asyncio event loop on a daemon thread. All socket I/O happens on that
    loop; ``send_frame`` / ``send_event`` / ``close`` may be called from any
    thread and hand their work to the loop thread-safely. Receive callbacks
    registered with ``on_frame`` / ``on_event`` run on the loop thread.
    """

    # Largest single WebSocket message accepted (frames can be large).
    _MAX_MESSAGE_SIZE = 10 * 1024 * 1024
    # Seconds start_server()/connect() wait for the background loop to report.
    _STARTUP_TIMEOUT = 5.0

    def __init__(self, host_mode: bool = False, host: str = '0.0.0.0',
                 port: int = 0):
        """
        Args:
            host_mode: True to broadcast sends to every connected client
                (server side); False to send on the single client socket.
            host: Interface the server binds to in ``start_server()``.
            port: Port the server binds to; 0 lets the OS pick one.
        """
        super().__init__()
        self._host_mode = host_mode
        self._host = host
        self._port = port
        self._actual_port: Optional[int] = None
        self._ws = None
        self._server = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread: Optional[threading.Thread] = None
        self._clients: List[Any] = []
        # Main coroutine task running on the background loop; cancelled by close().
        self._task: Optional[Any] = None
        # Set by the loop thread once startup has succeeded *or* failed.
        self._ready = threading.Event()

    @property
    def tier(self) -> TransportTier:
        return TransportTier.LAN_DIRECT

    @property
    def actual_port(self) -> Optional[int]:
        """Port the server is bound to, or None if it has not started."""
        return self._actual_port

    # ── Startup ──────────────────────────────────────────────────

    def start_server(self) -> int:
        """Start WebSocket server (host mode). Returns assigned port.

        Blocks for at most ``_STARTUP_TIMEOUT`` seconds while the background
        loop binds the listening socket.

        Returns:
            The bound port, or 0 if the server did not come up in time.

        Raises:
            RuntimeError: If the ``websockets`` library is not installed.
        """
        if not _websockets:
            raise RuntimeError("websockets library not installed")

        self._ready.clear()
        self._actual_port = None
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self._run_server_loop,
            daemon=True,
            name='ws-transport-server',
        )
        self._thread.start()

        # The loop thread sets the event on success and on failure, so a bind
        # error returns immediately instead of burning the whole timeout.
        self._ready.wait(self._STARTUP_TIMEOUT)

        if self._actual_port:
            logger.info(f"WebSocket transport server on port {self._actual_port}")
        else:
            # _connected is only raised by _serve() after a successful bind,
            # so a failed start no longer reports the channel as connected.
            logger.error("WebSocket transport server failed to start")
        return self._actual_port or 0

    def connect(self, url: str) -> bool:
        """Connect to WebSocket server (viewer mode).

        Args:
            url: WebSocket URL (e.g., 'ws://192.168.1.5:9876')

        Returns:
            True once the connection is established; False if the library is
            missing, the connection failed, or it did not complete within
            ``_STARTUP_TIMEOUT`` seconds.
        """
        if not _websockets:
            logger.error("websockets library not installed")
            return False

        self._ready.clear()
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self._run_client_loop,
            args=(url,),
            daemon=True,
            name='ws-transport-client',
        )
        self._thread.start()

        # Wait for connection (or for the loop thread to report failure).
        self._ready.wait(self._STARTUP_TIMEOUT)
        return self._connected

    # ── Background loop ──────────────────────────────────────────

    def _run_server_loop(self) -> None:
        """Thread target for host mode."""
        self._run_loop(self._serve())

    def _run_client_loop(self, url: str) -> None:
        """Thread target for viewer mode."""
        self._run_loop(self._client_handler(url))

    def _run_loop(self, main) -> None:
        """Drive ``main`` to completion on this thread's loop, then clean up.

        Always leaves the loop closed and the transport marked disconnected,
        whether ``main`` finished, failed, or was cancelled by ``close()``.
        """
        loop = self._loop
        asyncio.set_event_loop(loop)
        task = loop.create_task(main)
        self._task = task
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            pass  # close() cancelled the main task: normal shutdown
        except Exception as e:
            logger.error(f"WebSocket transport loop error: {e}")
        finally:
            self._connected = False
            # Release a caller still blocked in start_server()/connect().
            self._ready.set()
            try:
                # Sends scheduled just before shutdown may still be pending;
                # cancel and drain them so closing the loop does not warn.
                leftovers = asyncio.all_tasks(loop)
                for pending in leftovers:
                    pending.cancel()
                if leftovers:
                    loop.run_until_complete(
                        asyncio.gather(*leftovers, return_exceptions=True)
                    )
            finally:
                loop.close()

    async def _serve(self) -> None:
        """Run the server until the main task is cancelled or it closes."""
        async def handler(ws):
            self._clients.append(ws)
            try:
                async for message in ws:
                    self._dispatch(message)
            finally:
                self._clients.remove(ws)

        # The context manager closes the server and waits for it to finish
        # when this task is cancelled by close().
        async with _websockets.serve(
            handler, self._host, self._port,
            max_size=self._MAX_MESSAGE_SIZE,
        ) as server:
            self._server = server
            self._actual_port = server.sockets[0].getsockname()[1]
            self._connected = True
            self._ready.set()
            await server.wait_closed()

    async def _client_handler(self, url: str) -> None:
        """Hold the client connection open and pump received messages."""
        try:
            async with _websockets.connect(
                url, max_size=self._MAX_MESSAGE_SIZE
            ) as ws:
                self._ws = ws
                self._connected = True
                self._ready.set()
                async for message in ws:
                    self._dispatch(message)
        except Exception as e:
            logger.error(f"WebSocket client error: {e}")
        finally:
            self._connected = False
            self._ws = None
            self._ready.set()

    def _dispatch(self, message) -> None:
        """Account for one received message and hand it to the right callback.

        Binary messages go to the frame callback; text messages are parsed as
        JSON and go to the event callback. Malformed JSON is dropped. An
        exception raised by a callback is logged rather than propagated, so a
        faulty consumer does not tear down the connection.
        """
        self._bytes_received += len(message)
        if isinstance(message, bytes):
            self._frames_received += 1
            callback, payload = self._frame_callback, message
        else:
            callback = self._event_callback
            if not callback:
                return
            try:
                payload = json.loads(message)
            except json.JSONDecodeError:
                logger.debug("Dropping text message that is not valid JSON")
                return
        if not callback:
            return
        try:
            callback(payload)
        except Exception:
            logger.exception("Transport receive callback failed")

    # ── Sending ──────────────────────────────────────────────────

    def _submit(self, coro) -> bool:
        """Schedule ``coro`` on the background loop from any thread.

        Returns:
            True if the coroutine was handed to the loop, False if the loop
            is gone (in which case the coroutine is closed, not leaked).
        """
        loop = self._loop
        try:
            if loop is None or loop.is_closed():
                raise RuntimeError("event loop not available")
            future = asyncio.run_coroutine_threadsafe(coro, loop)
        except (RuntimeError, TypeError) as e:
            if asyncio.iscoroutine(coro):
                coro.close()  # avoid a "never awaited" warning
            logger.debug(f"WebSocket send not scheduled: {e}")
            return False
        future.add_done_callback(self._log_send_outcome)
        return True

    @staticmethod
    def _log_send_outcome(future) -> None:
        """Surface an asynchronous send failure that would otherwise be lost."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logger.debug(f"WebSocket send failed: {error}")

    def _send_to_peers(self, payload) -> bool:
        """Queue ``payload`` for every current peer.

        Returns:
            False only when peers exist and none of the sends could be
            scheduled. With no peer attached the call is a successful no-op.
        """
        if self._host_mode and self._clients:
            # Server: broadcast to all clients. Snapshot the list because the
            # loop thread adds/removes clients concurrently.
            peers = self._clients[:]
        elif self._ws:
            peers = [self._ws]
        else:
            return True
        # Build the full list first: every peer must be attempted.
        outcomes = [self._submit(peer.send(payload)) for peer in peers]
        return any(outcomes)

    def send_frame(self, data: bytes) -> bool:
        """Queue a binary frame for sending. Returns success.

        Success means the frame was handed to the background loop; the
        actual network write completes asynchronously.
        """
        if not self._connected:
            return False
        try:
            if not self._send_to_peers(data):
                return False
            self._bytes_sent += len(data)
            self._frames_sent += 1
            return True
        except Exception as e:
            logger.debug(f"Send frame failed: {e}")
            return False

    def send_event(self, event: dict) -> bool:
        """Queue a JSON event for sending. Returns success.

        Returns False if ``event`` cannot be JSON-serialised.
        """
        if not self._connected:
            return False
        try:
            msg = json.dumps(event, separators=(',', ':'))
            if not self._send_to_peers(msg):
                return False
            self._bytes_sent += len(msg)
            return True
        except Exception as e:
            logger.debug(f"Send event failed: {e}")
            return False

    # ── Shutdown ─────────────────────────────────────────────────

    def close(self) -> None:
        """Close the transport. Safe to call from any thread, repeatedly.

        Cancels the main task on the background loop. Cancellation unwinds
        the ``async with`` blocks in ``_serve`` / ``_client_handler``, which
        close the server or client socket on the loop thread itself; the loop
        thread then drains leftover tasks and closes its loop.
        """
        self._connected = False
        loop, task = self._loop, self._task
        self._ws = None
        self._server = None
        self._task = None
        if loop is None or task is None or loop.is_closed():
            return
        try:
            loop.call_soon_threadsafe(task.cancel)
        except RuntimeError:
            pass  # loop finished between the check above and this call
330# ── Tier 2: WAMP Relay ──────────────────────────────────────────
class WAMPRelayTransport(TransportChannel):
    """WAMP pub/sub relay through existing Crossbar router.

    Both devices already connected to ws://aws_rasa.hertzai.com:8088/ws.
    Frames published to com.hartos.remote_desktop.frames.{session_id}.
    Input events to com.hartos.remote_desktop.input.{session_id}.

    Reuses crossbar_server.py WAMP session.

    NOTE(review): this class only publishes. Nothing here subscribes to the
    session topics, so callbacks registered via ``on_frame`` / ``on_event``
    are never invoked by this transport — confirm whether the receive side
    lives elsewhere.
    """

    TOPIC_PREFIX = 'com.hartos.remote_desktop'

    def __init__(self, session_id: str, role: str = 'host'):
        """
        Args:
            session_id: Unique session identifier
            role: 'host' (sends frames, receives input) or 'viewer' (vice versa)
        """
        super().__init__()
        self._session_id = session_id
        self._role = role
        self._wamp_session = None

    @property
    def tier(self) -> TransportTier:
        return TransportTier.WAMP_RELAY

    def start(self) -> bool:
        """Attach to the WAMP session exported by ``crossbar_server``.

        Returns:
            True if a session object was found; False if the module cannot
            be imported or its ``wamp_session`` is empty.
        """
        try:
            from crossbar_server import wamp_session
        except ImportError:
            logger.warning("crossbar_server not available for WAMP relay")
            return False

        self._wamp_session = wamp_session
        if not self._wamp_session:
            logger.warning("WAMP session not available")
            return False

        self._connected = True
        logger.info(f"WAMP relay transport started (session={self._session_id[:8]})")
        return True

    def _topic(self, channel: str) -> str:
        """Build the fully qualified topic for ``channel`` in this session."""
        return f"{self.TOPIC_PREFIX}.{channel}.{self._session_id}"

    def _publish(self, topic: str, payload: str) -> None:
        """Publish ``payload`` and schedule the result only if it is awaitable.

        ``publish`` may hand back a coroutine/future or nothing at all
        (presumably Autobahn-style sessions return None for unacknowledged
        publishes — verify against crossbar_server). Passing None to
        ``asyncio.ensure_future`` raises TypeError, which previously made an
        already-sent message look like a failure.
        """
        outcome = self._wamp_session.publish(topic, payload)
        if asyncio.iscoroutine(outcome) or asyncio.isfuture(outcome):
            asyncio.ensure_future(outcome)

    def send_frame(self, data: bytes) -> bool:
        """Publish one frame on the session's ``frames`` topic.

        Returns:
            True if the publish call was issued without raising.
        """
        if not self._connected or not self._wamp_session:
            return False
        try:
            # Encode binary as base64 for WAMP text transport
            import base64
            payload = base64.b64encode(data).decode('ascii')
            self._publish(self._topic('frames'), payload)
            # Counts the raw frame size, not the base64-expanded payload.
            self._bytes_sent += len(data)
            self._frames_sent += 1
            return True
        except Exception as e:
            logger.debug(f"WAMP send frame failed: {e}")
            return False

    def send_event(self, event: dict) -> bool:
        """Publish one JSON event.

        Viewers publish on the ``input`` topic; any other role publishes on
        ``control``.

        Returns:
            True if the publish call was issued without raising; False also
            when ``event`` cannot be JSON-serialised.
        """
        if not self._connected or not self._wamp_session:
            return False
        try:
            channel = 'input' if self._role == 'viewer' else 'control'
            # Serialise once so the byte counter matches what is sent.
            payload = json.dumps(event, separators=(',', ':'))
            self._publish(self._topic(channel), payload)
            self._bytes_sent += len(payload)
            return True
        except Exception as e:
            logger.debug(f"WAMP send event failed: {e}")
            return False

    def close(self) -> None:
        """Detach from the shared WAMP session (the session itself stays open)."""
        self._connected = False
        self._wamp_session = None
        logger.info(f"WAMP relay transport closed (session={self._session_id[:8]})")
420# ── Tier 3: WireGuard P2P ──────────────────────────────────────
class WireGuardTransport(DirectWebSocketTransport):
    """WebSocket transport that runs across the WireGuard mesh.

    Reuses compute_mesh_service.py:74-76 (port 6795, subnet 10.99.0.0/16).
    With the tunnel up, everything is inherited from
    DirectWebSocketTransport; only the address (the mesh IP) and the
    reported tier differ.
    """

    def __init__(self, wg_ip: str, port: int = 0):
        """
        Args:
            wg_ip: Peer address inside the WireGuard mesh.
            port: Port forwarded to the parent transport (default 0).
        """
        # Client mode only: the mesh IP is passed through as the host address.
        super().__init__(False, wg_ip, port)
        self._wg_ip = wg_ip

    @property
    def tier(self) -> TransportTier:
        return TransportTier.WIREGUARD_P2P
439# ── Transport Negotiation ───────────────────────────────────────
def get_local_ip() -> Optional[str]:
    """Get local LAN IP address.

    Opens a UDP socket "connected" to a public address and reads back the
    source address the OS chose for it.

    Returns:
        The local IPv4 address as a string, or None if it cannot be
        determined (for example when there is no route).
    """
    try:
        # The context manager closes the socket even when connect() raises,
        # which the previous manual close() did not.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.settimeout(1)
            # connect() on a UDP socket only selects a route; nothing is sent.
            probe.connect(('8.8.8.8', 80))
            return probe.getsockname()[0]
    except OSError:
        return None
def probe_lan_connectivity(target_ip: str, port: int,
                           timeout: float = 2.0) -> bool:
    """Check if target is reachable on LAN via TCP probe.

    Args:
        target_ip: IPv4 address (or resolvable name) to probe.
        port: TCP port to connect to.
        timeout: Seconds to wait for the connection attempt.

    Returns:
        True if a TCP connection was established, False on refusal, timeout
        or any error (including an invalid address or port).
    """
    try:
        # The context manager closes the socket even when connect_ex() raises
        # (bad port, unresolvable host), which previously leaked it.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(timeout)
            return probe.connect_ex((target_ip, port)) == 0
    except Exception:
        # Best-effort probe: malformed caller input counts as "unreachable".
        return False
def probe_wireguard_connectivity(wg_ip: str, timeout: float = 2.0,
                                 port: int = 6796) -> bool:
    """Check if the WireGuard tunnel is usable with a TCP connect to the mesh IP.

    Despite the name this is not an ICMP ping: it attempts a TCP connection
    and reports whether it succeeded.

    Args:
        wg_ip: Peer address inside the WireGuard mesh.
        timeout: Seconds to wait for the connection attempt.
        port: TCP port to probe. Defaults to 6796, the port this probe has
            always used (NOTE(review): other comments in this file mention
            mesh port 6795 — confirm which service listens on 6796).

    Returns:
        True if the TCP connection was established, False otherwise.
    """
    try:
        # The context manager closes the socket even when connect_ex() raises,
        # which the previous manual close() did not.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(timeout)
            return probe.connect_ex((wg_ip, port)) == 0
    except Exception:
        # Best-effort probe: malformed caller input counts as "unreachable".
        return False
def auto_negotiate_transport(
    session_id: str,
    host_offers: dict,
    role: str = 'viewer',
) -> Optional[TransportChannel]:
    """Auto-negotiate best transport tier.

    Tiers are tried in order of expected latency: LAN direct, then WireGuard
    P2P, then the WAMP relay. The two WebSocket tiers are skipped when the
    ``websockets`` library is missing or the host did not offer an address
    and port for them.

    Args:
        session_id: Session ID
        host_offers: {
            'lan_ip': '192.168.1.5',
            'lan_port': 9876,
            'wg_ip': '10.99.0.5',
            'wg_port': 9877,
            'wamp_available': True,
        }
        role: 'host' or 'viewer' (only used by the WAMP relay tier)

    Returns:
        Best available TransportChannel, or None if all fail.
    """
    # Tier 1: Try LAN direct
    lan_ip = host_offers.get('lan_ip')
    lan_port = host_offers.get('lan_port')
    if lan_ip and lan_port and _websockets:
        if probe_lan_connectivity(lan_ip, lan_port):
            transport = DirectWebSocketTransport()
            if transport.connect(f"ws://{lan_ip}:{lan_port}"):
                logger.info(f"Transport: Tier 1 LAN direct ({lan_ip}:{lan_port})")
                return transport
            # connect() started a background thread and event loop; release
            # them instead of abandoning the failed transport.
            transport.close()

    # Tier 3: Try WireGuard P2P
    wg_ip = host_offers.get('wg_ip')
    wg_port = host_offers.get('wg_port')
    if wg_ip and wg_port and _websockets:
        if probe_wireguard_connectivity(wg_ip):
            transport = WireGuardTransport(wg_ip, wg_port)
            if transport.connect(f"ws://{wg_ip}:{wg_port}"):
                logger.info(f"Transport: Tier 3 WireGuard P2P ({wg_ip}:{wg_port})")
                return transport
            transport.close()  # same cleanup as Tier 1

    # Tier 2: WAMP relay (assumed available unless the host says otherwise)
    if host_offers.get('wamp_available', True):
        transport = WAMPRelayTransport(session_id, role=role)
        if transport.start():
            logger.info("Transport: Tier 2 WAMP relay")
            return transport

    logger.error("All transport tiers failed")
    return None