Coverage for integrations / remote_desktop / signaling.py: 70.3%
118 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Signaling — WAMP-based connection negotiation for remote desktop.
4How two devices find each other and agree on transport tier:
5 1. Viewer sends connect_request to host's signal topic (includes OTP)
6 2. Host verifies OTP → sends connect_accept with transport offers
7 3. Both sides run auto_negotiate_transport() to pick best tier
8 4. Streaming begins on selected transport
10Channels:
11 WAMP topic: com.hartos.remote_desktop.signal.{device_id}
12 HTTP fallback: POST/GET /api/remote-desktop/signal/<device_id>
14Reuses:
15 - crossbar_server.py WAMP publish/subscribe
16 - security/channel_encryption.py encrypt_event() for encrypted signals
17 - /api/remote-desktop/signal endpoints (already wired in hart_intelligence)
18"""
20import json
21import logging
22import time
23import uuid
24from dataclasses import dataclass, field, asdict
25from enum import Enum
26from typing import Any, Callable, Dict, List, Optional
27from core.port_registry import get_port
28from core.http_pool import pooled_get, pooled_post
30logger = logging.getLogger('hevolve.remote_desktop')
33class SignalType(Enum):
34 CONNECT_REQUEST = 'connect_request'
35 CONNECT_ACCEPT = 'connect_accept'
36 CONNECT_REJECT = 'connect_reject'
37 TRANSPORT_OFFER = 'transport_offer'
38 BYE = 'bye'
41@dataclass
42class SignalingMessage:
43 """A signaling message between two devices."""
44 msg_type: str
45 sender_device_id: str
46 target_device_id: str
47 payload: Dict[str, Any] = field(default_factory=dict)
48 message_id: str = field(default_factory=lambda: str(uuid.uuid4())[:12])
49 timestamp: float = field(default_factory=time.time)
51 def to_dict(self) -> dict:
52 return asdict(self)
54 @classmethod
55 def from_dict(cls, data: dict) -> 'SignalingMessage':
56 return cls(
57 msg_type=data.get('msg_type', ''),
58 sender_device_id=data.get('sender_device_id', ''),
59 target_device_id=data.get('target_device_id', ''),
60 payload=data.get('payload', {}),
61 message_id=data.get('message_id', str(uuid.uuid4())[:12]),
62 timestamp=data.get('timestamp', time.time()),
63 )
66class SignalingChannel:
67 """WAMP-based signaling with HTTP fallback.
69 Subscribes to device's WAMP topic for incoming connection requests.
70 Publishes to target device's topic for outgoing signals.
71 Falls back to HTTP API when WAMP is unavailable.
72 """
74 TOPIC_PREFIX = 'com.hartos.remote_desktop.signal'
76 def __init__(self, device_id: str, api_base: Optional[str] = None):
77 """
78 Args:
79 device_id: This device's ID (subscribes to signal.{device_id})
80 api_base: HTTP API base URL (e.g., 'http://localhost:6777')
81 """
82 self._device_id = device_id
83 self._api_base = api_base or f'http://localhost:{get_port("backend")}'
84 self._wamp_session = None
85 self._callback: Optional[Callable[[SignalingMessage], None]] = None
86 self._pending_signals: List[SignalingMessage] = []
87 self._connected = False
89 def start(self) -> bool:
90 """Start listening for signals on WAMP topic."""
91 try:
92 from crossbar_server import wamp_session
93 self._wamp_session = wamp_session
94 if self._wamp_session:
95 topic = f"{self.TOPIC_PREFIX}.{self._device_id}"
96 self._wamp_session.subscribe(
97 self._on_wamp_signal, topic
98 )
99 self._connected = True
100 logger.info(f"Signaling channel started (WAMP: {topic})")
101 return True
102 except ImportError:
103 pass
105 # WAMP not available — use HTTP polling mode
106 logger.info("Signaling channel using HTTP fallback")
107 self._connected = True
108 return True
110 def send_signal(self, target_device_id: str,
111 message: SignalingMessage) -> bool:
112 """Send a signaling message to another device."""
113 message.sender_device_id = self._device_id
114 message.target_device_id = target_device_id
116 # Try WAMP first
117 if self._wamp_session:
118 return self._send_wamp(target_device_id, message)
120 # Fall back to HTTP
121 return self._send_http(target_device_id, message)
123 def on_signal(self, callback: Callable[[SignalingMessage], None]) -> None:
124 """Register handler for incoming signals."""
125 self._callback = callback
127 def get_pending(self) -> List[SignalingMessage]:
128 """Get and clear pending signals (for HTTP polling)."""
129 pending = list(self._pending_signals)
130 self._pending_signals.clear()
131 return pending
133 def close(self) -> None:
134 """Close the signaling channel."""
135 self._connected = False
136 self._wamp_session = None
137 self._callback = None
138 logger.info("Signaling channel closed")
140 # ── WAMP transport ───────────────────────────────────────
142 def _send_wamp(self, target_device_id: str,
143 message: SignalingMessage) -> bool:
144 """Send signal via WAMP publish."""
145 try:
146 topic = f"{self.TOPIC_PREFIX}.{target_device_id}"
147 payload = json.dumps(message.to_dict(), separators=(',', ':'))
149 # Encrypt if possible
150 try:
151 from integrations.remote_desktop.security import encrypt_event
152 encrypted = encrypt_event(message.to_dict(), None)
153 if encrypted:
154 payload = json.dumps(encrypted, separators=(',', ':'))
155 except Exception:
156 pass
158 import asyncio
159 asyncio.ensure_future(
160 self._wamp_session.publish(topic, payload)
161 )
162 logger.debug(f"Signal sent (WAMP): {message.msg_type} → {target_device_id[:8]}")
163 return True
164 except Exception as e:
165 logger.warning(f"WAMP signal send failed, trying HTTP: {e}")
166 return self._send_http(target_device_id, message)
168 def _on_wamp_signal(self, payload_str: str) -> None:
169 """Handle incoming WAMP signal."""
170 try:
171 data = json.loads(payload_str)
173 # Try decrypt
174 try:
175 from integrations.remote_desktop.security import decrypt_event
176 decrypted = decrypt_event(data)
177 if decrypted:
178 data = decrypted
179 except Exception:
180 pass
182 message = SignalingMessage.from_dict(data)
184 if self._callback:
185 self._callback(message)
186 else:
187 self._pending_signals.append(message)
188 except Exception as e:
189 logger.debug(f"Signal parse error: {e}")
191 # ── HTTP fallback ────────────────────────────────────────
193 def _send_http(self, target_device_id: str,
194 message: SignalingMessage) -> bool:
195 """Send signal via HTTP API endpoint."""
196 try:
197 resp = pooled_post(
198 f"{self._api_base}/api/remote-desktop/signal",
199 json={
200 'target_device_id': target_device_id,
201 'message': message.to_dict(),
202 },
203 timeout=5,
204 )
205 return resp.status_code in (200, 201)
206 except Exception as e:
207 logger.debug(f"HTTP signal send failed: {e}")
208 return False
210 def poll_signals_http(self) -> List[SignalingMessage]:
211 """Poll for incoming signals via HTTP (fallback for no WAMP)."""
212 try:
213 resp = pooled_get(
214 f"{self._api_base}/api/remote-desktop/signal/{self._device_id}",
215 timeout=5,
216 )
217 if resp.status_code == 200:
218 data = resp.json()
219 signals = data.get('signals', [])
220 return [SignalingMessage.from_dict(s) for s in signals]
221 except Exception as e:
222 logger.debug(f"HTTP signal poll failed: {e}")
223 return []
226# ── Convenience functions ─────────────────────────────────────
228def create_connect_request(sender_id: str, target_id: str,
229 password: str, mode: str = 'full_control',
230 session_id: Optional[str] = None) -> SignalingMessage:
231 """Create a CONNECT_REQUEST signaling message."""
232 return SignalingMessage(
233 msg_type=SignalType.CONNECT_REQUEST.value,
234 sender_device_id=sender_id,
235 target_device_id=target_id,
236 payload={
237 'password': password,
238 'mode': mode,
239 'session_id': session_id or str(uuid.uuid4()),
240 },
241 )
244def create_connect_accept(sender_id: str, target_id: str,
245 session_id: str,
246 transport_offers: Optional[dict] = None) -> SignalingMessage:
247 """Create a CONNECT_ACCEPT signaling message with transport offers."""
248 from integrations.remote_desktop.transport import get_local_ip
249 offers = transport_offers or {}
251 # Auto-populate LAN offer
252 if 'lan_ip' not in offers:
253 local_ip = get_local_ip()
254 if local_ip:
255 offers['lan_ip'] = local_ip
257 offers.setdefault('wamp_available', True)
259 return SignalingMessage(
260 msg_type=SignalType.CONNECT_ACCEPT.value,
261 sender_device_id=sender_id,
262 target_device_id=target_id,
263 payload={
264 'session_id': session_id,
265 'transport_offers': offers,
266 },
267 )
270def create_connect_reject(sender_id: str, target_id: str,
271 reason: str) -> SignalingMessage:
272 """Create a CONNECT_REJECT signaling message."""
273 return SignalingMessage(
274 msg_type=SignalType.CONNECT_REJECT.value,
275 sender_device_id=sender_id,
276 target_device_id=target_id,
277 payload={'reason': reason},
278 )
281def create_bye(sender_id: str, target_id: str,
282 session_id: str) -> SignalingMessage:
283 """Create a BYE signaling message (session end)."""
284 return SignalingMessage(
285 msg_type=SignalType.BYE.value,
286 sender_device_id=sender_id,
287 target_device_id=target_id,
288 payload={'session_id': session_id},
289 )