Coverage for integrations / remote_desktop / dlna_bridge.py: 56.2%

256 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2DLNA Bridge — Cast HARTOS remote desktop sessions to DLNA/UPnP renderers. 

3 

4Architecture: 

5 Discovery: SSDP M-SEARCH (239.255.255.250:1900) for MediaRenderer devices 

6 Control: UPnP AVTransport SOAP (SetAVTransportURI + Play/Stop) 

7 Streaming: MJPEG HTTP server from FrameCapture/WindowCapture output 

8 

9HARTOS doesn't reimplement DLNA — it uses raw SSDP (stdlib socket) for discovery 

10and HTTP POST (urllib) for UPnP control. Optional async_upnp_client if available. 

11 

12Reuses: 

13 - frame_capture.py: FrameCapture as default frame source 

14 - window_capture.py: WindowCapture for per-window casting 

15 - security.py: audit_session_event() for cast audit 

16""" 

17 

18import http.server 

19import logging 

20import socket 

21import struct 

22import threading 

23import time 

24from dataclasses import dataclass, field 

25from typing import Callable, Dict, Generator, List, Optional 

26 

27logger = logging.getLogger('hevolve.remote_desktop') 

28 

29 

@dataclass
class DLNARenderer:
    """A DLNA/UPnP renderer found on the network (smart TV, speaker, etc.).

    The first four fields are required; everything else has a default so a
    renderer can be built from a partial device description.
    """
    device_id: str  # UDN (Unique Device Name)
    friendly_name: str
    ip: str
    port: int
    location: str = ''  # SSDP Location URL
    control_url: str = ''  # AVTransport control URL
    supports_video: bool = True
    supports_audio: bool = True
    manufacturer: str = ''
    model: str = ''

    def to_dict(self) -> dict:
        """Return a plain-dict summary of this renderer.

        ``location`` and ``control_url`` are not included in the summary.
        """
        exported = (
            'device_id',
            'friendly_name',
            'ip',
            'port',
            'supports_video',
            'supports_audio',
            'manufacturer',
            'model',
        )
        return {name: getattr(self, name) for name in exported}

55 

56 

@dataclass
class CastSession:
    """Bookkeeping record for one active DLNA cast."""
    cast_session_id: str  # Identifier of this cast
    renderer: DLNARenderer  # Renderer the stream is played on
    source_session_id: str  # Remote desktop session being cast
    stream_url: str  # MJPEG URL the renderer pulls from
    started_at: float = 0.0  # Start timestamp; 0.0 until set by the creator
    active: bool = True

66 

67 

class MJPEGStreamServer:
    """Lightweight HTTP server serving an MJPEG stream from a frame source.

    Binds to a local port and serves ``multipart/x-mixed-replace`` JPEG
    frames at ``/stream.mjpeg``; DLNA renderers pull from that URL.

    Requests are handled by a plain ``http.server.HTTPServer`` running in one
    background thread, so a client that is being streamed to occupies the
    server until it disconnects or ``stop()`` is called.
    """

    # Multipart boundary token. RFC 2046 delimits each part with "--" + token,
    # so the Content-Type header advertises the bare token while the body
    # writes "--frame".
    BOUNDARY = 'frame'
    # Pause between frame polls (~30fps cap).
    FRAME_INTERVAL = 0.033
    # Bind addresses meaning "all interfaces"; for these the advertised URL
    # has to use a concrete LAN address instead.
    _WILDCARD_HOSTS = ('', '0.0.0.0')

    def __init__(self, host: str = '0.0.0.0', port: int = 0):
        self._host = host
        self._port = port
        self._server: Optional[http.server.HTTPServer] = None
        self._thread: Optional[threading.Thread] = None
        self._frame_source: Optional[Callable] = None
        self._running = False
        self._current_frame: Optional[bytes] = None
        self._frame_lock = threading.Lock()

    def start(self, frame_source: Callable[[], Optional[bytes]],
              port: int = 0) -> str:
        """Start serving the MJPEG stream.

        Args:
            frame_source: Callable that returns JPEG bytes (or None when no
                frame is available).
            port: Port to bind (0 = use the constructor's port, or
                auto-assign when that is 0 too).

        Returns:
            Stream URL (e.g., http://192.168.1.10:8554/stream.mjpeg).

        Raises:
            OSError: If the port cannot be bound. The server is left in the
                stopped state in that case.
        """
        server_ref = self
        content_type = f'multipart/x-mixed-replace; boundary={self.BOUNDARY}'
        part_header = (
            f'--{self.BOUNDARY}\r\nContent-Type: image/jpeg\r\n'.encode())
        interval = self.FRAME_INTERVAL

        class MJPEGHandler(http.server.BaseHTTPRequestHandler):
            def do_GET(self):
                if self.path != '/stream.mjpeg':
                    self.send_error(404)
                    return

                self.send_response(200)
                self.send_header('Content-Type', content_type)
                self.send_header('Cache-Control', 'no-cache')
                self.end_headers()

                while server_ref._running:
                    frame = None
                    source = server_ref._frame_source
                    if source:
                        try:
                            frame = source()
                        except Exception:
                            # Deliberate best-effort: a failing capture only
                            # drops this frame, the stream stays open.
                            frame = None

                    if frame:
                        try:
                            self.wfile.write(
                                part_header
                                + f'Content-Length: {len(frame)}\r\n\r\n'
                                .encode())
                            self.wfile.write(frame)
                            self.wfile.write(b'\r\n')
                            self.wfile.flush()
                        except ConnectionError:
                            # Client went away (broken pipe, reset, abort).
                            break
                    time.sleep(interval)

            def log_message(self, format, *args):
                pass  # Suppress request logging

        # Advertise the bound address when it is a concrete one; only a
        # wildcard bind needs the LAN address lookup.
        if self._host in self._WILDCARD_HOSTS:
            advertised_host = self._get_local_ip()
        else:
            advertised_host = self._host

        bind_port = port or self._port or 0
        server = http.server.HTTPServer(
            (self._host, bind_port), MJPEGHandler)
        actual_port = server.server_address[1]

        # State is only updated once the bind succeeded, so a failed start()
        # never reports is_running == True.
        self._frame_source = frame_source
        self._server = server
        self._running = True

        self._thread = threading.Thread(
            target=server.serve_forever,
            daemon=True,
            name='mjpeg-stream',
        )
        self._thread.start()

        url = f'http://{advertised_host}:{actual_port}/stream.mjpeg'
        logger.info(f"MJPEG stream server started: {url}")
        return url

    def stop(self) -> None:
        """Stop the MJPEG stream server and release its listening socket.

        Safe to call when the server was never started.
        """
        # Clearing the flag first lets an in-flight streaming loop exit, which
        # is what allows shutdown() below to return.
        self._running = False
        server, self._server = self._server, None
        thread, self._thread = self._thread, None
        if server:
            server.shutdown()
        if thread:
            thread.join(timeout=5)
        if server:
            # shutdown() only ends serve_forever(); the socket must be closed
            # separately or the port stays bound.
            server.server_close()

    @property
    def is_running(self) -> bool:
        """Whether the server has been started and not yet stopped."""
        return self._running

    def _get_local_ip(self) -> str:
        """Get local IP address for LAN access.

        Connects a UDP socket towards a public address and reads back the
        local address chosen for it. Falls back to ``127.0.0.1`` on error.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect(('8.8.8.8', 80))
                return probe.getsockname()[0]
        except OSError:
            return '127.0.0.1'

183 

184 

class DLNABridge:
    """Cast HARTOS streams to DLNA/UPnP renderers.

    Discovery via SSDP M-SEARCH, control via UPnP AVTransport SOAP.
    Frames served as MJPEG over HTTP (renderers pull from URL).
    """

    SSDP_ADDR = '239.255.255.250'
    SSDP_PORT = 1900
    AVTRANSPORT_URN = 'urn:schemas-upnp-org:service:AVTransport:1'
    # Upper bound on a device description document; anything larger is
    # truncated and will simply fail to parse.
    MAX_DESCRIPTION_BYTES = 1024 * 1024

    @staticmethod
    def _default_stream_port():
        """Return the port registered for 'dlna_stream' in the port registry."""
        from core.port_registry import get_port
        return get_port('dlna_stream')

    def __init__(self):
        self._cast_sessions: Dict[str, 'CastSession'] = {}
        self._stream_servers: Dict[str, 'MJPEGStreamServer'] = {}
        self._renderer_cache: List['DLNARenderer'] = []
        self._lock = threading.Lock()

    def discover_renderers(self,
                           timeout: float = 5.0) -> List['DLNARenderer']:
        """Discover DLNA/UPnP MediaRenderer devices on the LAN.

        Uses SSDP M-SEARCH multicast to find devices advertising
        urn:schemas-upnp-org:device:MediaRenderer:1. The result replaces the
        renderer cache used by ``cast_session``.

        Args:
            timeout: Discovery timeout in seconds. Bounds how long responses
                are waited for; fetching a device description has its own
                5 second timeout.

        Returns:
            List of discovered DLNARenderer devices (empty on failure).
        """
        renderers = []

        m_search = (
            'M-SEARCH * HTTP/1.1\r\n'
            f'HOST: {self.SSDP_ADDR}:{self.SSDP_PORT}\r\n'
            'MAN: "ssdp:discover"\r\n'
            'MX: 3\r\n'
            'ST: urn:schemas-upnp-org:device:MediaRenderer:1\r\n'
            '\r\n'
        ).encode()

        try:
            # The context manager closes the socket on every exit path.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                               socket.IPPROTO_UDP) as sock:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Send M-SEARCH
                sock.sendto(m_search, (self.SSDP_ADDR, self.SSDP_PORT))

                # Collect responses
                seen = set()
                deadline = time.monotonic() + timeout

                while True:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        break
                    # Wait only for what is left of the budget, so a late
                    # response cannot stretch discovery to twice the timeout.
                    sock.settimeout(remaining)
                    try:
                        data, addr = sock.recvfrom(4096)
                        response = data.decode('utf-8', errors='ignore')

                        # Parse LOCATION header
                        location = self._parse_header(response, 'LOCATION')
                        if not location or location in seen:
                            continue
                        seen.add(location)

                        # Fetch device description
                        renderer = self._fetch_device_info(location, addr[0])
                        if renderer:
                            renderers.append(renderer)
                    except socket.timeout:
                        break
                    except Exception as e:
                        # One bad response must not abort the whole scan.
                        logger.debug(f"Ignoring SSDP response: {e}")
                        continue
        except Exception as e:
            logger.debug(f"SSDP discovery failed: {e}")

        self._renderer_cache = renderers
        logger.info(f"DLNA discovery found {len(renderers)} renderer(s)")
        return renderers

    def cast_session(self, session_id: str, renderer_id: str,
                     frame_source: Optional[Callable] = None,
                     stream_port: int = 0) -> dict:
        """Cast a session to a DLNA renderer.

        Args:
            session_id: Remote desktop session to cast.
            renderer_id: Target renderer device_id (must be in the cache
                filled by ``discover_renderers``).
            frame_source: Callable returning JPEG bytes. If None, uses
                FrameCapture (full-screen).
            stream_port: Port for MJPEG server (0 = the registered
                'dlna_stream' port).

        Returns:
            On success: {success, cast_session_id, renderer_name, stream_url}.
            On failure: {success: False, error}.
        """
        # Find renderer
        renderer = next(
            (r for r in self._renderer_cache if r.device_id == renderer_id),
            None)
        if not renderer:
            return {'success': False,
                    'error': f'Renderer not found: {renderer_id}'}

        # Default frame source: full-screen capture
        if frame_source is None:
            frame_source = self._default_frame_source()

        # Start MJPEG stream server. A bind failure (e.g. port already in
        # use by another cast) is reported through the result dict instead
        # of escaping as an exception.
        server = MJPEGStreamServer()
        try:
            port = stream_port or self._default_stream_port()
            stream_url = server.start(frame_source, port=port)
        except Exception as e:
            server.stop()
            logger.debug(f"Failed to start stream server: {e}")
            return {'success': False,
                    'error': f'Failed to start stream server: {e}'}

        # Send SetAVTransportURI to renderer
        ok = self._send_play(renderer, stream_url)
        if not ok:
            server.stop()
            return {'success': False,
                    'error': 'Failed to send play command to renderer'}

        # Track cast session
        import uuid
        cast_id = f'cast-{uuid.uuid4().hex[:8]}'
        cast = CastSession(
            cast_session_id=cast_id,
            renderer=renderer,
            source_session_id=session_id,
            stream_url=stream_url,
            started_at=time.time(),
        )

        with self._lock:
            self._cast_sessions[cast_id] = cast
            self._stream_servers[cast_id] = server

        # Audit
        self._audit('cast_started', session_id,
                    f'Renderer: {renderer.friendly_name}, URL: {stream_url}')

        logger.info(f"Casting to {renderer.friendly_name} ({stream_url})")
        return {
            'success': True,
            'cast_session_id': cast_id,
            'renderer_name': renderer.friendly_name,
            'stream_url': stream_url,
        }

    def stop_cast(self, cast_session_id: str) -> bool:
        """Stop a cast session.

        Returns:
            True if the session existed and was stopped, False if the id is
            unknown.
        """
        with self._lock:
            cast = self._cast_sessions.pop(cast_session_id, None)
            server = self._stream_servers.pop(cast_session_id, None)

        if not cast:
            return False

        # Send Stop to renderer (best-effort; the local side is torn down
        # regardless of the renderer's answer).
        self._send_stop(cast.renderer)

        # Stop MJPEG server
        if server:
            server.stop()

        self._audit('cast_stopped', cast.source_session_id,
                    f'Renderer: {cast.renderer.friendly_name}')

        logger.info(f"Cast stopped: {cast_session_id}")
        return True

    def stop_all(self) -> None:
        """Stop all cast sessions."""
        # Snapshot the ids under the lock; stop_cast() takes the lock itself.
        with self._lock:
            cast_ids = list(self._cast_sessions)
        for cid in cast_ids:
            self.stop_cast(cid)

    def get_cast_status(self) -> List[dict]:
        """Get status of all active cast sessions."""
        with self._lock:
            return [
                {
                    'cast_session_id': c.cast_session_id,
                    'renderer': c.renderer.friendly_name,
                    'source_session': c.source_session_id,
                    'stream_url': c.stream_url,
                    'started_at': c.started_at,
                    'active': c.active,
                }
                for c in self._cast_sessions.values()
            ]

    def get_cached_renderers(self) -> List['DLNARenderer']:
        """Get renderers from last discovery.

        Returns a copy, so callers cannot modify the internal cache.
        """
        return list(self._renderer_cache)

    # ── UPnP SOAP Control ─────────────────────────────────────

    @classmethod
    def _soap_envelope(cls, action: str, arguments: Dict[str, str]) -> str:
        """Build the SOAP envelope for one AVTransport action.

        Args:
            action: AVTransport action name (e.g. 'Play').
            arguments: Argument name -> value, emitted in dict order. Values
                are XML-escaped; names are used verbatim.
        """
        from xml.sax.saxutils import escape
        args_xml = ''.join(
            f'<{name}>{escape(str(value))}</{name}>'
            for name, value in arguments.items())
        return (
            '<?xml version="1.0" encoding="utf-8"?>'
            '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"'
            ' s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">'
            '<s:Body>'
            f'<u:{action} xmlns:u="{cls.AVTRANSPORT_URN}">'
            f'{args_xml}'
            f'</u:{action}>'
            '</s:Body></s:Envelope>'
        )

    def _avtransport_call(self, renderer: 'DLNARenderer', action: str,
                          arguments: Dict[str, str]) -> bool:
        """POST one AVTransport action to the renderer's control URL."""
        return self._soap_post(
            renderer.control_url,
            f'{self.AVTRANSPORT_URN}#{action}',
            self._soap_envelope(action, arguments),
        )

    def _send_play(self, renderer: 'DLNARenderer',
                   stream_url: str) -> bool:
        """Send SetAVTransportURI + Play SOAP commands to renderer.

        Returns:
            True only if both commands were accepted.
        """
        if not renderer.control_url:
            logger.debug("No control URL for renderer")
            return False

        # SetAVTransportURI
        uri_set = self._avtransport_call(renderer, 'SetAVTransportURI', {
            'InstanceID': '0',
            'CurrentURI': stream_url,
            'CurrentURIMetaData': '',
        })
        if not uri_set:
            return False

        # Play
        return self._avtransport_call(renderer, 'Play', {
            'InstanceID': '0',
            'Speed': '1',
        })

    def _send_stop(self, renderer: 'DLNARenderer') -> bool:
        """Send Stop SOAP command to renderer."""
        if not renderer.control_url:
            return False
        return self._avtransport_call(renderer, 'Stop', {'InstanceID': '0'})

    @staticmethod
    def _is_http_url(url: str) -> bool:
        """Return True for well-formed http(s) URLs with a host part."""
        from urllib.parse import urlparse
        try:
            parsed = urlparse(url)
        except ValueError:
            return False
        return parsed.scheme.lower() in ('http', 'https') and bool(
            parsed.netloc)

    def _soap_post(self, url: str, action: str, body: str) -> bool:
        """Send a SOAP POST to a UPnP control URL.

        Returns:
            True if the renderer answered with HTTP 200, False on any other
            status or on error.
        """
        # The control URL comes from a device description fetched off the
        # network; never let urllib follow it to file:// or ftp://.
        if not self._is_http_url(url):
            logger.debug(f"SOAP POST failed: unsupported URL {url}")
            return False
        try:
            import urllib.request
            req = urllib.request.Request(
                url,
                data=body.encode('utf-8'),
                headers={
                    'Content-Type': 'text/xml; charset="utf-8"',
                    'SOAPACTION': f'"{action}"',
                },
                method='POST',
            )
            with urllib.request.urlopen(req, timeout=5) as resp:
                return resp.status == 200
        except Exception as e:
            logger.debug(f"SOAP POST failed: {e}")
            return False

    # ── SSDP Helpers ──────────────────────────────────────────

    def _parse_header(self, response: str, header: str) -> Optional[str]:
        """Parse a header from an SSDP response.

        The header name is matched case-insensitively; both CRLF and bare LF
        line endings are accepted.

        Returns:
            The stripped header value, or None if the header is absent.
        """
        wanted = header.strip().upper()
        for line in response.splitlines():
            name, sep, value = line.partition(':')
            if sep and name.strip().upper() == wanted:
                return value.strip()
        return None

    def _fetch_device_info(self, location: str,
                           ip: str) -> Optional['DLNARenderer']:
        """Fetch and parse device description XML from Location URL.

        Returns:
            A DLNARenderer for MediaRenderer devices, or None when the URL is
            not http(s), the fetch fails, or the device is not a renderer.
        """
        # LOCATION is attacker-controllable (any host on the LAN can answer
        # the M-SEARCH), so only plain web URLs are fetched.
        if not self._is_http_url(location):
            logger.debug(
                f"Failed to fetch device info from {location}: "
                "unsupported URL")
            return None
        try:
            import urllib.request

            with urllib.request.urlopen(location, timeout=5) as resp:
                xml_data = resp.read(self.MAX_DESCRIPTION_BYTES)
            return self._parse_device_description(xml_data, location, ip)
        except Exception as e:
            logger.debug(f"Failed to fetch device info from {location}: {e}")
            return None

    def _parse_device_description(self, xml_data: bytes, location: str,
                                  ip: str) -> Optional['DLNARenderer']:
        """Turn a UPnP device description document into a DLNARenderer.

        Raises whatever the XML/URL parsers raise on malformed input; the
        caller treats that as "no renderer".
        """
        import xml.etree.ElementTree as ET
        from urllib.parse import urljoin, urlparse

        # NOTE(security): xml_data is untrusted and ElementTree does not
        # guard against entity-expansion attacks; the size cap applied by the
        # caller is the only mitigation here.
        root = ET.fromstring(xml_data)

        # Namespace
        ns = {'d': 'urn:schemas-upnp-org:device-1-0'}

        device = root.find('.//d:device', ns)
        if device is None:
            return None

        device_type = device.findtext('d:deviceType', '', ns)
        if 'MediaRenderer' not in device_type:
            return None

        friendly_name = device.findtext('d:friendlyName', 'Unknown', ns)
        udn = device.findtext('d:UDN', '', ns)
        manufacturer = device.findtext('d:manufacturer', '', ns)
        model = device.findtext('d:modelName', '', ns)

        # Find AVTransport control URL
        control_url = ''
        for service in device.findall('.//d:service', ns):
            stype = service.findtext('d:serviceType', '', ns)
            if 'AVTransport' not in stype:
                continue
            ctrl = service.findtext('d:controlURL', '', ns)
            if ctrl:
                # Make absolute
                control_url = urljoin(location, ctrl)
            break

        # Parse port from location
        port = urlparse(location).port or 80

        return DLNARenderer(
            device_id=udn or f'dlna-{ip}:{port}',
            friendly_name=friendly_name,
            ip=ip,
            port=port,
            location=location,
            control_url=control_url,
            manufacturer=manufacturer,
            model=model,
        )

    def _default_frame_source(self) -> Callable:
        """Create default frame source from FrameCapture.

        Falls back to a source that always returns None when FrameCapture
        cannot be imported or constructed.
        """
        try:
            from integrations.remote_desktop.frame_capture import FrameCapture
            capture = FrameCapture()
            return capture.capture_frame
        except Exception as e:
            logger.debug(f"FrameCapture unavailable, using blank source: {e}")

            # Return a blank frame generator
            def blank_frame():
                return None
            return blank_frame

    def _audit(self, event_type: str, session_id: str,
               detail: str) -> None:
        """Audit log cast events (best-effort; never raises)."""
        try:
            from integrations.remote_desktop.security import audit_session_event
            audit_session_event(event_type, session_id, 'dlna_bridge', detail)
        except Exception as e:
            # Auditing must not break casting, but leave a trace.
            logger.debug(f"Audit of {event_type} failed: {e}")

569 

570 

# ── Singleton ────────────────────────────────────────────────

# Module-wide bridge instance; stays None until the first get_dlna_bridge().
_dlna_bridge: Optional[DLNABridge] = None


def get_dlna_bridge() -> DLNABridge:
    """Return the shared DLNABridge, building it on the first call."""
    global _dlna_bridge
    bridge = _dlna_bridge
    if bridge is None:
        bridge = _dlna_bridge = DLNABridge()
    return bridge