Coverage for integrations / remote_desktop / signaling.py: 70.3%

118 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Signaling — WAMP-based connection negotiation for remote desktop. 

3 

4How two devices find each other and agree on transport tier: 

5 1. Viewer sends connect_request to host's signal topic (includes OTP) 

6 2. Host verifies OTP → sends connect_accept with transport offers 

7 3. Both sides run auto_negotiate_transport() to pick best tier 

8 4. Streaming begins on selected transport 

9 

10Channels: 

11 WAMP topic: com.hartos.remote_desktop.signal.{device_id} 

12 HTTP fallback: POST/GET /api/remote-desktop/signal/<device_id> 

13 

14Reuses: 

15 - crossbar_server.py WAMP publish/subscribe 

16 - security/channel_encryption.py encrypt_event() for encrypted signals 

17 - /api/remote-desktop/signal endpoints (already wired in hart_intelligence) 

18""" 

19 

20import json 

21import logging 

22import time 

23import uuid 

24from dataclasses import dataclass, field, asdict 

25from enum import Enum 

26from typing import Any, Callable, Dict, List, Optional 

27from core.port_registry import get_port 

28from core.http_pool import pooled_get, pooled_post 

29 

30logger = logging.getLogger('hevolve.remote_desktop') 

31 

32 

33class SignalType(Enum): 

34 CONNECT_REQUEST = 'connect_request' 

35 CONNECT_ACCEPT = 'connect_accept' 

36 CONNECT_REJECT = 'connect_reject' 

37 TRANSPORT_OFFER = 'transport_offer' 

38 BYE = 'bye' 

39 

40 

41@dataclass 

42class SignalingMessage: 

43 """A signaling message between two devices.""" 

44 msg_type: str 

45 sender_device_id: str 

46 target_device_id: str 

47 payload: Dict[str, Any] = field(default_factory=dict) 

48 message_id: str = field(default_factory=lambda: str(uuid.uuid4())[:12]) 

49 timestamp: float = field(default_factory=time.time) 

50 

51 def to_dict(self) -> dict: 

52 return asdict(self) 

53 

54 @classmethod 

55 def from_dict(cls, data: dict) -> 'SignalingMessage': 

56 return cls( 

57 msg_type=data.get('msg_type', ''), 

58 sender_device_id=data.get('sender_device_id', ''), 

59 target_device_id=data.get('target_device_id', ''), 

60 payload=data.get('payload', {}), 

61 message_id=data.get('message_id', str(uuid.uuid4())[:12]), 

62 timestamp=data.get('timestamp', time.time()), 

63 ) 

64 

65 

66class SignalingChannel: 

67 """WAMP-based signaling with HTTP fallback. 

68 

69 Subscribes to device's WAMP topic for incoming connection requests. 

70 Publishes to target device's topic for outgoing signals. 

71 Falls back to HTTP API when WAMP is unavailable. 

72 """ 

73 

74 TOPIC_PREFIX = 'com.hartos.remote_desktop.signal' 

75 

76 def __init__(self, device_id: str, api_base: Optional[str] = None): 

77 """ 

78 Args: 

79 device_id: This device's ID (subscribes to signal.{device_id}) 

80 api_base: HTTP API base URL (e.g., 'http://localhost:6777') 

81 """ 

82 self._device_id = device_id 

83 self._api_base = api_base or f'http://localhost:{get_port("backend")}' 

84 self._wamp_session = None 

85 self._callback: Optional[Callable[[SignalingMessage], None]] = None 

86 self._pending_signals: List[SignalingMessage] = [] 

87 self._connected = False 

88 

89 def start(self) -> bool: 

90 """Start listening for signals on WAMP topic.""" 

91 try: 

92 from crossbar_server import wamp_session 

93 self._wamp_session = wamp_session 

94 if self._wamp_session: 

95 topic = f"{self.TOPIC_PREFIX}.{self._device_id}" 

96 self._wamp_session.subscribe( 

97 self._on_wamp_signal, topic 

98 ) 

99 self._connected = True 

100 logger.info(f"Signaling channel started (WAMP: {topic})") 

101 return True 

102 except ImportError: 

103 pass 

104 

105 # WAMP not available — use HTTP polling mode 

106 logger.info("Signaling channel using HTTP fallback") 

107 self._connected = True 

108 return True 

109 

110 def send_signal(self, target_device_id: str, 

111 message: SignalingMessage) -> bool: 

112 """Send a signaling message to another device.""" 

113 message.sender_device_id = self._device_id 

114 message.target_device_id = target_device_id 

115 

116 # Try WAMP first 

117 if self._wamp_session: 

118 return self._send_wamp(target_device_id, message) 

119 

120 # Fall back to HTTP 

121 return self._send_http(target_device_id, message) 

122 

123 def on_signal(self, callback: Callable[[SignalingMessage], None]) -> None: 

124 """Register handler for incoming signals.""" 

125 self._callback = callback 

126 

127 def get_pending(self) -> List[SignalingMessage]: 

128 """Get and clear pending signals (for HTTP polling).""" 

129 pending = list(self._pending_signals) 

130 self._pending_signals.clear() 

131 return pending 

132 

133 def close(self) -> None: 

134 """Close the signaling channel.""" 

135 self._connected = False 

136 self._wamp_session = None 

137 self._callback = None 

138 logger.info("Signaling channel closed") 

139 

140 # ── WAMP transport ─────────────────────────────────────── 

141 

142 def _send_wamp(self, target_device_id: str, 

143 message: SignalingMessage) -> bool: 

144 """Send signal via WAMP publish.""" 

145 try: 

146 topic = f"{self.TOPIC_PREFIX}.{target_device_id}" 

147 payload = json.dumps(message.to_dict(), separators=(',', ':')) 

148 

149 # Encrypt if possible 

150 try: 

151 from integrations.remote_desktop.security import encrypt_event 

152 encrypted = encrypt_event(message.to_dict(), None) 

153 if encrypted: 

154 payload = json.dumps(encrypted, separators=(',', ':')) 

155 except Exception: 

156 pass 

157 

158 import asyncio 

159 asyncio.ensure_future( 

160 self._wamp_session.publish(topic, payload) 

161 ) 

162 logger.debug(f"Signal sent (WAMP): {message.msg_type} → {target_device_id[:8]}") 

163 return True 

164 except Exception as e: 

165 logger.warning(f"WAMP signal send failed, trying HTTP: {e}") 

166 return self._send_http(target_device_id, message) 

167 

168 def _on_wamp_signal(self, payload_str: str) -> None: 

169 """Handle incoming WAMP signal.""" 

170 try: 

171 data = json.loads(payload_str) 

172 

173 # Try decrypt 

174 try: 

175 from integrations.remote_desktop.security import decrypt_event 

176 decrypted = decrypt_event(data) 

177 if decrypted: 

178 data = decrypted 

179 except Exception: 

180 pass 

181 

182 message = SignalingMessage.from_dict(data) 

183 

184 if self._callback: 

185 self._callback(message) 

186 else: 

187 self._pending_signals.append(message) 

188 except Exception as e: 

189 logger.debug(f"Signal parse error: {e}") 

190 

191 # ── HTTP fallback ──────────────────────────────────────── 

192 

193 def _send_http(self, target_device_id: str, 

194 message: SignalingMessage) -> bool: 

195 """Send signal via HTTP API endpoint.""" 

196 try: 

197 resp = pooled_post( 

198 f"{self._api_base}/api/remote-desktop/signal", 

199 json={ 

200 'target_device_id': target_device_id, 

201 'message': message.to_dict(), 

202 }, 

203 timeout=5, 

204 ) 

205 return resp.status_code in (200, 201) 

206 except Exception as e: 

207 logger.debug(f"HTTP signal send failed: {e}") 

208 return False 

209 

210 def poll_signals_http(self) -> List[SignalingMessage]: 

211 """Poll for incoming signals via HTTP (fallback for no WAMP).""" 

212 try: 

213 resp = pooled_get( 

214 f"{self._api_base}/api/remote-desktop/signal/{self._device_id}", 

215 timeout=5, 

216 ) 

217 if resp.status_code == 200: 

218 data = resp.json() 

219 signals = data.get('signals', []) 

220 return [SignalingMessage.from_dict(s) for s in signals] 

221 except Exception as e: 

222 logger.debug(f"HTTP signal poll failed: {e}") 

223 return [] 

224 

225 

226# ── Convenience functions ───────────────────────────────────── 

227 

228def create_connect_request(sender_id: str, target_id: str, 

229 password: str, mode: str = 'full_control', 

230 session_id: Optional[str] = None) -> SignalingMessage: 

231 """Create a CONNECT_REQUEST signaling message.""" 

232 return SignalingMessage( 

233 msg_type=SignalType.CONNECT_REQUEST.value, 

234 sender_device_id=sender_id, 

235 target_device_id=target_id, 

236 payload={ 

237 'password': password, 

238 'mode': mode, 

239 'session_id': session_id or str(uuid.uuid4()), 

240 }, 

241 ) 

242 

243 

244def create_connect_accept(sender_id: str, target_id: str, 

245 session_id: str, 

246 transport_offers: Optional[dict] = None) -> SignalingMessage: 

247 """Create a CONNECT_ACCEPT signaling message with transport offers.""" 

248 from integrations.remote_desktop.transport import get_local_ip 

249 offers = transport_offers or {} 

250 

251 # Auto-populate LAN offer 

252 if 'lan_ip' not in offers: 

253 local_ip = get_local_ip() 

254 if local_ip: 

255 offers['lan_ip'] = local_ip 

256 

257 offers.setdefault('wamp_available', True) 

258 

259 return SignalingMessage( 

260 msg_type=SignalType.CONNECT_ACCEPT.value, 

261 sender_device_id=sender_id, 

262 target_device_id=target_id, 

263 payload={ 

264 'session_id': session_id, 

265 'transport_offers': offers, 

266 }, 

267 ) 

268 

269 

270def create_connect_reject(sender_id: str, target_id: str, 

271 reason: str) -> SignalingMessage: 

272 """Create a CONNECT_REJECT signaling message.""" 

273 return SignalingMessage( 

274 msg_type=SignalType.CONNECT_REJECT.value, 

275 sender_device_id=sender_id, 

276 target_device_id=target_id, 

277 payload={'reason': reason}, 

278 ) 

279 

280 

281def create_bye(sender_id: str, target_id: str, 

282 session_id: str) -> SignalingMessage: 

283 """Create a BYE signaling message (session end).""" 

284 return SignalingMessage( 

285 msg_type=SignalType.BYE.value, 

286 sender_device_id=sender_id, 

287 target_device_id=target_id, 

288 payload={'session_id': session_id}, 

289 )