Coverage for integrations / remote_desktop / dlna_bridge.py: 56.2%
256 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
DLNA Bridge — Cast HARTOS remote desktop sessions to DLNA/UPnP renderers.

Architecture:
    Discovery: SSDP M-SEARCH (239.255.255.250:1900) for MediaRenderer devices
    Control:   UPnP AVTransport SOAP (SetAVTransportURI + Play/Stop)
    Streaming: MJPEG HTTP server from FrameCapture/WindowCapture output

HARTOS doesn't reimplement DLNA — it uses raw SSDP (stdlib socket) for discovery
and HTTP POST (urllib) for UPnP control. Optional async_upnp_client if available.

Reuses:
    - frame_capture.py: FrameCapture as default frame source
    - window_capture.py: WindowCapture for per-window casting
    - security.py: audit_session_event() for cast audit
16"""
18import http.server
19import logging
20import socket
21import struct
22import threading
23import time
24from dataclasses import dataclass, field
25from typing import Callable, Dict, Generator, List, Optional
27logger = logging.getLogger('hevolve.remote_desktop')
@dataclass
class DLNARenderer:
    """One DLNA/UPnP renderer found on the network (smart TV, speaker, ...)."""

    device_id: str  # UDN (Unique Device Name)
    friendly_name: str
    ip: str
    port: int
    location: str = ''  # SSDP Location URL
    control_url: str = ''  # AVTransport control URL
    supports_video: bool = True
    supports_audio: bool = True
    manufacturer: str = ''
    model: str = ''

    def to_dict(self) -> dict:
        """Return the renderer's summary fields as a plain dict.

        ``location`` and ``control_url`` are not part of the result.
        """
        exported = (
            'device_id',
            'friendly_name',
            'ip',
            'port',
            'supports_video',
            'supports_audio',
            'manufacturer',
            'model',
        )
        return {key: getattr(self, key) for key in exported}
@dataclass
class CastSession:
    """Bookkeeping record for a single DLNA cast in progress."""

    cast_session_id: str
    renderer: DLNARenderer
    source_session_id: str  # Remote desktop session being cast
    stream_url: str  # MJPEG URL the renderer pulls from
    started_at: float = 0.0
    active: bool = True
class MJPEGStreamServer:
    """Lightweight HTTP server serving an MJPEG stream from a frame source.

    Binds to a local port and answers ``GET /stream.mjpeg`` with a
    ``multipart/x-mixed-replace`` response whose parts are JPEG frames
    pulled from a caller-supplied callable. DLNA renderers pull from this
    URL.

    The server is a plain ``http.server.HTTPServer`` running on one
    background thread, so requests are handled one at a time: a streaming
    client occupies the server until it disconnects or ``stop()`` is called.
    """

    # Multipart boundary token. Every part is introduced by "--" + token
    # (RFC 2046 section 5.1.1), so the token itself carries no leading dashes.
    _BOUNDARY = 'frame'
    # Pause after each loop iteration; caps the stream at roughly 30 fps.
    _FRAME_INTERVAL = 0.033

    def __init__(self, host: str = '0.0.0.0', port: int = 0):
        """Store bind settings; nothing is bound until ``start()``.

        Args:
            host: Interface address to bind.
            port: Default port used when ``start()`` is not given one
                (0 = auto-assign).
        """
        self._host = host
        self._port = port
        self._server: Optional[http.server.HTTPServer] = None
        self._thread: Optional[threading.Thread] = None
        self._frame_source: Optional[Callable] = None
        self._running = False
        # The two attributes below are initialised here but not used by any
        # method of this class; kept so external readers keep working.
        self._current_frame: Optional[bytes] = None
        self._frame_lock = threading.Lock()

    def start(self, frame_source: Callable[[], Optional[bytes]],
              port: int = 0) -> str:
        """Start serving the MJPEG stream.

        Args:
            frame_source: Callable that returns JPEG bytes (or None).
            port: Port to bind (0 = fall back to the constructor port,
                then to auto-assign).

        Returns:
            Stream URL (e.g., http://192.168.1.10:8554/stream.mjpeg).

        Raises:
            OSError: If the listening socket cannot be bound. The server is
                left in the stopped state in that case.
        """
        # A repeated start() would otherwise orphan the previous listener.
        if self._server is not None:
            self.stop()

        bind_port = port or self._port or 0
        # Bind before touching any state so a failed bind does not leave
        # is_running reporting True.
        httpd = http.server.HTTPServer(
            (self._host, bind_port), self._make_handler())
        actual_port = httpd.server_address[1]

        self._frame_source = frame_source
        self._server = httpd
        # Must be True before the first client connects, otherwise the
        # handler's streaming loop exits immediately.
        self._running = True
        self._thread = threading.Thread(
            target=httpd.serve_forever,
            daemon=True,
            name='mjpeg-stream',
        )
        self._thread.start()

        url = f'http://{self._get_local_ip()}:{actual_port}/stream.mjpeg'
        logger.info(f"MJPEG stream server started: {url}")
        return url

    def stop(self) -> None:
        """Stop the MJPEG stream server and release its listening socket.

        Safe to call when the server was never started.
        """
        # Clearing the flag first lets an in-flight streaming handler leave
        # its loop, which in turn lets shutdown() return.
        self._running = False
        httpd, self._server = self._server, None
        worker, self._thread = self._thread, None
        if httpd is not None:
            httpd.shutdown()
            # shutdown() only ends serve_forever(); the socket itself must
            # be closed explicitly or the port stays bound.
            httpd.server_close()
        if worker is not None:
            worker.join(timeout=5)

    @property
    def is_running(self) -> bool:
        """True between a successful ``start()`` and the next ``stop()``."""
        return self._running

    def _make_handler(self):
        """Build the request-handler class bound to this server instance."""
        server_ref = self
        boundary = self._BOUNDARY
        part_header = (
            f'--{boundary}\r\n'
            'Content-Type: image/jpeg\r\n'
        ).encode()

        class MJPEGHandler(http.server.BaseHTTPRequestHandler):
            def do_GET(self):
                if self.path != '/stream.mjpeg':
                    self.send_error(404)
                    return

                self.send_response(200)
                self.send_header(
                    'Content-Type',
                    f'multipart/x-mixed-replace; boundary={boundary}')
                self.send_header('Cache-Control', 'no-cache')
                self.end_headers()

                while server_ref._running:
                    frame = server_ref._next_frame()
                    if frame:
                        try:
                            self.wfile.write(
                                part_header
                                + f'Content-Length: {len(frame)}\r\n\r\n'
                                .encode())
                            self.wfile.write(frame)
                            self.wfile.write(b'\r\n')
                            self.wfile.flush()
                        except ConnectionError:
                            # Client went away (broken pipe, reset, abort).
                            break
                    time.sleep(server_ref._FRAME_INTERVAL)

            def log_message(self, format, *args):
                pass  # Suppress request logging

        return MJPEGHandler

    def _next_frame(self) -> Optional[bytes]:
        """Pull one frame from the frame source; None if unavailable."""
        source = self._frame_source
        if not source:
            return None
        try:
            return source()
        except Exception as exc:
            # Best effort: a failing capture must not kill the stream.
            logger.debug("MJPEG frame source failed: %s", exc)
            return None

    def _get_local_ip(self) -> str:
        """Get local IP address for LAN access.

        Returns '127.0.0.1' when the address cannot be determined.
        """
        try:
            # The context manager closes the probe socket even when
            # connect() fails.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect(('8.8.8.8', 80))
                return probe.getsockname()[0]
        except OSError:
            return '127.0.0.1'
class DLNABridge:
    """Cast HARTOS streams to DLNA/UPnP renderers.

    Discovery via SSDP M-SEARCH, control via UPnP AVTransport SOAP.
    Frames served as MJPEG over HTTP (renderers pull from URL).
    """

    SSDP_ADDR = '239.255.255.250'
    SSDP_PORT = 1900

    # Service type used in every AVTransport SOAP body and SOAPACTION header.
    _AVTRANSPORT_URN = 'urn:schemas-upnp-org:service:AVTransport:1'
    # XML namespace of UPnP device description documents.
    _DEVICE_NS = 'urn:schemas-upnp-org:device-1-0'

    @staticmethod
    def _default_stream_port():
        """Return the 'dlna_stream' port from the project's port registry."""
        from core.port_registry import get_port
        return get_port('dlna_stream')

    @staticmethod
    def _is_http_url(url: str) -> bool:
        """Return True only for absolute http:// or https:// URLs."""
        from urllib.parse import urlparse
        try:
            parsed = urlparse(url)
        except ValueError:
            return False
        return parsed.scheme in ('http', 'https') and bool(parsed.netloc)

    def __init__(self):
        """Create an empty bridge: no casts, no servers, no cached renderers."""
        self._cast_sessions: Dict[str, CastSession] = {}
        self._stream_servers: Dict[str, MJPEGStreamServer] = {}
        self._renderer_cache: List[DLNARenderer] = []
        # Guards _cast_sessions and _stream_servers.
        self._lock = threading.Lock()

    def discover_renderers(self,
                           timeout: float = 5.0) -> List[DLNARenderer]:
        """Discover DLNA/UPnP MediaRenderer devices on the LAN.

        Uses SSDP M-SEARCH multicast to find devices advertising
        urn:schemas-upnp-org:device:MediaRenderer:1. The result replaces
        the renderer cache used by ``cast_session``.

        Args:
            timeout: Discovery timeout in seconds. Bounds the time spent
                waiting for SSDP responses.

        Returns:
            List of discovered DLNARenderer devices (empty on failure).
        """
        renderers: List[DLNARenderer] = []

        m_search = (
            'M-SEARCH * HTTP/1.1\r\n'
            f'HOST: {self.SSDP_ADDR}:{self.SSDP_PORT}\r\n'
            'MAN: "ssdp:discover"\r\n'
            'MX: 3\r\n'
            'ST: urn:schemas-upnp-org:device:MediaRenderer:1\r\n'
            '\r\n'
        ).encode()

        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                                 socket.IPPROTO_UDP)
            try:
                sock.settimeout(timeout)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Send M-SEARCH
                sock.sendto(m_search, (self.SSDP_ADDR, self.SSDP_PORT))

                # Collect responses
                self._collect_responses(sock, timeout, renderers)
            finally:
                # Close even when sendto()/recvfrom() raised.
                sock.close()
        except Exception as e:
            logger.debug(f"SSDP discovery failed: {e}")

        self._renderer_cache = renderers
        logger.info(f"DLNA discovery found {len(renderers)} renderer(s)")
        return renderers

    def _collect_responses(self, sock, timeout: float,
                           renderers: List[DLNARenderer]) -> None:
        """Read SSDP replies from ``sock`` until ``timeout`` seconds elapse.

        Each distinct LOCATION is fetched once; usable renderers are
        appended to ``renderers``.
        """
        seen = set()
        deadline = time.monotonic() + timeout

        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Shrink the per-read timeout so the wait for replies never
            # extends past the overall deadline.
            sock.settimeout(remaining)
            try:
                data, addr = sock.recvfrom(4096)
                response = data.decode('utf-8', errors='ignore')

                # Parse LOCATION header
                location = self._parse_header(response, 'LOCATION')
                if not location or location in seen:
                    continue
                seen.add(location)

                # Fetch device description
                renderer = self._fetch_device_info(location, addr[0])
                if renderer:
                    renderers.append(renderer)
            except socket.timeout:
                break
            except Exception as exc:
                # One bad reply must not abort the whole scan.
                logger.debug("Ignoring bad SSDP response: %s", exc)
                continue

    def cast_session(self, session_id: str, renderer_id: str,
                     frame_source: Optional[Callable] = None,
                     stream_port: int = 0) -> dict:
        """Cast a session to a DLNA renderer.

        Args:
            session_id: Remote desktop session to cast.
            renderer_id: Target renderer device_id; must be present in the
                cache filled by the last ``discover_renderers`` call.
            frame_source: Callable returning JPEG bytes. If None, uses
                FrameCapture (full-screen).
            stream_port: Port for the MJPEG server. 0 selects the port
                returned by ``_default_stream_port()``.

        Returns:
            On success: {success, cast_session_id, renderer_name, stream_url}.
            On failure: {success: False, error}.
        """
        # Find renderer
        renderer = next(
            (r for r in self._renderer_cache if r.device_id == renderer_id),
            None)
        if renderer is None:
            return {'success': False,
                    'error': f'Renderer not found: {renderer_id}'}

        # Default frame source: full-screen capture
        if frame_source is None:
            frame_source = self._default_frame_source()

        # Start MJPEG stream server
        server = MJPEGStreamServer()
        port = stream_port or self._default_stream_port()
        try:
            stream_url = server.start(frame_source, port=port)
        except OSError as exc:
            # Typically "address already in use"; report it through the
            # same error-dict contract as every other failure.
            server.stop()
            return {'success': False,
                    'error': f'Failed to start stream server: {exc}'}

        # Send SetAVTransportURI to renderer
        ok = self._send_play(renderer, stream_url)
        if not ok:
            server.stop()
            return {'success': False,
                    'error': 'Failed to send play command to renderer'}

        # Track cast session
        import uuid
        cast_id = f'cast-{uuid.uuid4().hex[:8]}'
        cast = CastSession(
            cast_session_id=cast_id,
            renderer=renderer,
            source_session_id=session_id,
            stream_url=stream_url,
            started_at=time.time(),
        )

        with self._lock:
            self._cast_sessions[cast_id] = cast
            self._stream_servers[cast_id] = server

        # Audit
        self._audit('cast_started', session_id,
                    f'Renderer: {renderer.friendly_name}, URL: {stream_url}')

        logger.info(f"Casting to {renderer.friendly_name} ({stream_url})")
        return {
            'success': True,
            'cast_session_id': cast_id,
            'renderer_name': renderer.friendly_name,
            'stream_url': stream_url,
        }

    def stop_cast(self, cast_session_id: str) -> bool:
        """Stop a cast session.

        Returns:
            True if the session existed and was stopped, False otherwise.
        """
        with self._lock:
            cast = self._cast_sessions.pop(cast_session_id, None)
            server = self._stream_servers.pop(cast_session_id, None)

        if not cast:
            return False

        # Send Stop to renderer
        self._send_stop(cast.renderer)

        # Stop MJPEG server
        if server:
            server.stop()

        self._audit('cast_stopped', cast.source_session_id,
                    f'Renderer: {cast.renderer.friendly_name}')

        logger.info(f"Cast stopped: {cast_session_id}")
        return True

    def stop_all(self) -> None:
        """Stop all cast sessions."""
        # Snapshot the ids under the lock; stop_cast() takes the lock itself.
        with self._lock:
            cast_ids = list(self._cast_sessions.keys())
        for cid in cast_ids:
            self.stop_cast(cid)

    def get_cast_status(self) -> List[dict]:
        """Get status of all active cast sessions."""
        with self._lock:
            return [
                {
                    'cast_session_id': c.cast_session_id,
                    'renderer': c.renderer.friendly_name,
                    'source_session': c.source_session_id,
                    'stream_url': c.stream_url,
                    'started_at': c.started_at,
                    'active': c.active,
                }
                for c in self._cast_sessions.values()
            ]

    def get_cached_renderers(self) -> List[DLNARenderer]:
        """Get renderers from last discovery."""
        return self._renderer_cache

    # ── UPnP SOAP Control ─────────────────────────────────────

    def _soap_envelope(self, action: str, arguments: str = '') -> str:
        """Build the SOAP envelope for one AVTransport action.

        Args:
            action: Action name, e.g. 'Play'.
            arguments: Already-serialised XML for the action's arguments.
        """
        return (
            '<?xml version="1.0" encoding="utf-8"?>'
            '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"'
            ' s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">'
            '<s:Body>'
            f'<u:{action} '
            f'xmlns:u="{self._AVTRANSPORT_URN}">'
            f'{arguments}'
            f'</u:{action}>'
            '</s:Body></s:Envelope>'
        )

    def _avtransport_action(self, renderer: DLNARenderer, action: str,
                            arguments: str) -> bool:
        """POST one AVTransport action to the renderer's control URL."""
        return self._soap_post(
            renderer.control_url,
            f'{self._AVTRANSPORT_URN}#{action}',
            self._soap_envelope(action, arguments),
        )

    def _send_play(self, renderer: DLNARenderer,
                   stream_url: str) -> bool:
        """Send SetAVTransportURI + Play SOAP commands to renderer.

        Returns:
            True only if both commands were accepted.
        """
        if not renderer.control_url:
            logger.debug("No control URL for renderer")
            return False

        # The URL is embedded in XML, so '&', '<' and '>' must be escaped.
        from xml.sax.saxutils import escape

        # SetAVTransportURI
        ok = self._avtransport_action(
            renderer,
            'SetAVTransportURI',
            '<InstanceID>0</InstanceID>'
            f'<CurrentURI>{escape(stream_url)}</CurrentURI>'
            '<CurrentURIMetaData></CurrentURIMetaData>',
        )
        if not ok:
            return False

        # Play
        return self._avtransport_action(
            renderer,
            'Play',
            '<InstanceID>0</InstanceID>'
            '<Speed>1</Speed>',
        )

    def _send_stop(self, renderer: DLNARenderer) -> bool:
        """Send Stop SOAP command to renderer."""
        if not renderer.control_url:
            return False

        return self._avtransport_action(
            renderer,
            'Stop',
            '<InstanceID>0</InstanceID>',
        )

    def _soap_post(self, url: str, action: str, body: str) -> bool:
        """Send a SOAP POST to a UPnP control URL.

        Returns:
            True if the renderer answered with HTTP 200, False on any
            other status or on any error.
        """
        try:
            import urllib.request
            req = urllib.request.Request(
                url,
                data=body.encode('utf-8'),
                headers={
                    'Content-Type': 'text/xml; charset="utf-8"',
                    'SOAPACTION': f'"{action}"',
                },
                method='POST',
            )
            resp = urllib.request.urlopen(req, timeout=5)
            try:
                return resp.status == 200
            finally:
                # Release the connection instead of leaving it to the GC.
                resp.close()
        except Exception as e:
            logger.debug(f"SOAP POST failed: {e}")
            return False

    # ── SSDP Helpers ──────────────────────────────────────────

    def _parse_header(self, response: str, header: str) -> Optional[str]:
        """Parse a header from an SSDP response.

        Matching is case-insensitive; both CRLF and bare LF line endings
        are accepted.

        Returns:
            The stripped header value, or None if the header is absent.
        """
        wanted = header.upper() + ':'
        for line in response.splitlines():
            if line.upper().startswith(wanted):
                return line.split(':', 1)[1].strip()
        return None

    def _fetch_device_info(self, location: str,
                           ip: str) -> Optional[DLNARenderer]:
        """Fetch and parse device description XML from Location URL.

        Args:
            location: LOCATION value taken from an SSDP response.
            ip: Source address of that SSDP response.

        Returns:
            A DLNARenderer for the first MediaRenderer device in the
            description (root or embedded), or None if there is none or
            anything fails.
        """
        try:
            import urllib.request
            import xml.etree.ElementTree as ET
            from urllib.parse import urljoin, urlparse

            # LOCATION comes from an arbitrary network peer; urlopen()
            # would also accept file:// and other schemes.
            if not self._is_http_url(location):
                logger.debug(
                    f"Ignoring non-HTTP device location: {location}")
                return None

            resp = urllib.request.urlopen(location, timeout=5)
            try:
                xml_data = resp.read()
            finally:
                resp.close()

            # NOTE(review): the document is untrusted and ElementTree is
            # not hardened against malicious XML (see the "XML
            # vulnerabilities" section of the Python docs).
            root = ET.fromstring(xml_data)

            # Namespace
            ns = {'d': self._DEVICE_NS}

            # Scan every <device> so a renderer embedded below a
            # non-renderer root device is still found.
            device = next(
                (dev for dev in root.iter(f'{{{self._DEVICE_NS}}}device')
                 if 'MediaRenderer' in dev.findtext('d:deviceType', '', ns)),
                None)
            if device is None:
                return None

            friendly_name = device.findtext('d:friendlyName', 'Unknown', ns)
            udn = device.findtext('d:UDN', '', ns)
            manufacturer = device.findtext('d:manufacturer', '', ns)
            model = device.findtext('d:modelName', '', ns)

            # Find AVTransport control URL
            control_url = ''
            for service in device.findall('.//d:service', ns):
                stype = service.findtext('d:serviceType', '', ns)
                if 'AVTransport' not in stype:
                    continue
                ctrl = service.findtext('d:controlURL', '', ns)
                if not ctrl:
                    continue
                # Make absolute
                candidate = urljoin(location, ctrl)
                if self._is_http_url(candidate):
                    control_url = candidate
                    break

            # Parse port from location
            port = urlparse(location).port or 80

            return DLNARenderer(
                device_id=udn or f'dlna-{ip}:{port}',
                friendly_name=friendly_name,
                ip=ip,
                port=port,
                location=location,
                control_url=control_url,
                manufacturer=manufacturer,
                model=model,
            )
        except Exception as e:
            logger.debug(f"Failed to fetch device info from {location}: {e}")
            return None

    def _default_frame_source(self) -> Callable:
        """Create default frame source from FrameCapture.

        Falls back to a callable that always returns None when
        FrameCapture cannot be imported or constructed.
        """
        try:
            from integrations.remote_desktop.frame_capture import FrameCapture
            capture = FrameCapture()
            return capture.capture_frame
        except Exception:
            # Return a blank frame generator
            def blank_frame():
                return None
            return blank_frame

    def _audit(self, event_type: str, session_id: str,
               detail: str) -> None:
        """Audit log cast events (best effort; never raises)."""
        try:
            from integrations.remote_desktop.security import audit_session_event
            audit_session_event(event_type, session_id, 'dlna_bridge', detail)
        except Exception as exc:
            # Auditing must not break casting; leave a trace for debugging.
            logger.debug("Cast audit failed: %s", exc)
571# ── Singleton ────────────────────────────────────────────────
# Module-wide bridge instance; stays None until first requested.
_dlna_bridge: Optional[DLNABridge] = None


def get_dlna_bridge() -> DLNABridge:
    """Return the shared DLNABridge, constructing it on the first call."""
    global _dlna_bridge
    bridge = _dlna_bridge
    if bridge is None:
        bridge = DLNABridge()
        _dlna_bridge = bridge
    return bridge