Coverage for integrations / remote_desktop / peripheral_bridge.py: 73.1%
108 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
Peripheral Bridge — Orchestrates peripheral forwarding across HARTOS transport.

Wraps platform tools for USB, Bluetooth, and Gamepad forwarding:
  USB:     usbip (Linux kernel module)
  BT:      HID event relay via dbus/evdev
  Gamepad: evdev → transport → remote

HARTOS doesn't reimplement device drivers — it orchestrates existing system
tools the same way it orchestrates RustDesk/Sunshine for remote desktop.

Reuses:
  - transport.py: TransportChannel for event relay
  - security.py: audit_session_event() for peripheral audit
  - service_manager.py: lifecycle pattern (singleton + NodeWatchdog)
16"""
18import logging
19import threading
20import time
21from dataclasses import dataclass, field
22from enum import Enum
23from typing import Callable, Dict, List, Optional
25logger = logging.getLogger('hevolve.remote_desktop')
class PeripheralType(Enum):
    """Kinds of peripheral the bridge can forward.

    Each member's value is the string used as the backend key in
    ``PeripheralBridge`` and as the ``'type'`` field of serialized
    peripheral info.
    """

    USB = 'usb'
    BLUETOOTH = 'bluetooth'
    GAMEPAD = 'gamepad'
    GENERIC_HID = 'generic_hid'
@dataclass
class PeripheralInfo:
    """Discovered peripheral device.

    Instances are mutable: ``forwarded`` is flipped by the bridge as
    forwarding starts.
    """

    peripheral_id: str  # Unique ID (USB busid, BT MAC, evdev path)
    name: str
    # Quoted so the dataclass does not need the enum to be resolvable at
    # class-creation time.
    peripheral_type: "PeripheralType"
    vendor_id: Optional[str] = None
    product_id: Optional[str] = None
    connected: bool = False
    forwarded: bool = False

    def to_dict(self) -> dict:
        """Return a plain-dict view of this peripheral.

        The peripheral type is emitted under the key ``'type'`` as the
        enum's string value rather than the enum member itself.
        """
        return {
            'peripheral_id': self.peripheral_id,
            'name': self.name,
            'type': self.peripheral_type.value,
            'vendor_id': self.vendor_id,
            'product_id': self.product_id,
            'connected': self.connected,
            'forwarded': self.forwarded,
        }
class PeripheralBridge:
    """Orchestrates peripheral forwarding across HARTOS transport.

    Discovers local peripherals (USB, BT, gamepad) and forwards selected
    devices to the remote host over the HARTOS transport channel.

    State:
        _backends:  backend objects keyed by peripheral type name
                    ('usb', 'bluetooth', 'gamepad'); only backends that
                    report ``available`` are registered.
        _forwarded: peripherals currently being forwarded, keyed by
                    peripheral ID. Access is guarded by ``_lock``.
    """

    def __init__(self):
        self._forwarded: Dict[str, "PeripheralInfo"] = {}
        self._backends: Dict[str, object] = {}
        self._lock = threading.Lock()
        self._init_backends()

    def _register_backend(self, type_name: str, backend_cls) -> None:
        """Instantiate ``backend_cls`` and register it if it is available."""
        backend = backend_cls()
        if backend.available:
            self._backends[type_name] = backend

    def _init_backends(self) -> None:
        """Initialize available backends.

        Every backend is optional: an import or construction failure only
        means that peripheral type cannot be forwarded on this host, so the
        failure is logged at debug level and the next backend is tried.
        """
        try:
            from integrations.remote_desktop.peripheral_backends.usbip_backend import (
                USBIPBackend,
            )
            self._register_backend('usb', USBIPBackend)
        except Exception as e:
            logger.debug("USB backend unavailable: %s", e)

        try:
            from integrations.remote_desktop.peripheral_backends.bluetooth_backend import (
                BluetoothBackend,
            )
            self._register_backend('bluetooth', BluetoothBackend)
        except Exception as e:
            logger.debug("Bluetooth backend unavailable: %s", e)

        try:
            from integrations.remote_desktop.peripheral_backends.gamepad_backend import (
                GamepadBackend,
            )
            self._register_backend('gamepad', GamepadBackend)
        except Exception as e:
            logger.debug("Gamepad backend unavailable: %s", e)

    def discover_peripherals(self,
                             types: Optional[List[str]] = None) -> List["PeripheralInfo"]:
        """Enumerate connected peripherals.

        Args:
            types: Filter by type names (e.g., ['usb', 'gamepad']).
                   None (or an empty list) = discover all types.

        Returns:
            List of PeripheralInfo for discovered devices. Devices that are
            currently forwarded by this bridge have ``forwarded`` set to
            True. A backend whose discovery raises contributes nothing.
        """
        # Snapshot under the lock so the forwarded check is consistent with
        # concurrent forward/stop calls.
        with self._lock:
            forwarded_ids = set(self._forwarded)

        results = []
        for type_name, backend in self._backends.items():
            if types and type_name not in types:
                continue
            try:
                discovered = backend.discover()
                # Mark forwarded status
                for p in discovered:
                    if p.peripheral_id in forwarded_ids:
                        p.forwarded = True
                results.extend(discovered)
            except Exception as e:
                logger.debug("Discovery failed for %s: %s", type_name, e)
        return results

    def forward_peripheral(self, peripheral_id: str,
                           transport, session_id: str = '') -> dict:
        """Start forwarding a peripheral to the remote device.

        Args:
            peripheral_id: ID of the peripheral to forward.
            transport: TransportChannel to send events on.
            session_id: Remote desktop session ID (for audit).

        Returns:
            On success: {success, peripheral_id, name, type}; when the
            device was already being forwarded the dict additionally
            carries ``note``. On failure: {success: False, error}.
        """
        # Find the peripheral
        target = next(
            (p for p in self.discover_peripherals()
             if p.peripheral_id == peripheral_id),
            None,
        )
        if target is None:
            return {'success': False, 'error': f'Peripheral not found: {peripheral_id}'}

        type_name = target.peripheral_type.value

        if target.forwarded:
            # Same shape as a fresh success so callers can read the
            # identifying fields without special-casing this path.
            return {
                'success': True,
                'note': 'Already forwarding',
                'peripheral_id': peripheral_id,
                'name': target.name,
                'type': type_name,
            }

        # Get backend for this type
        backend = self._backends.get(type_name)
        if backend is None:
            return {'success': False,
                    'error': f'No backend for {type_name}'}

        # Forward
        if not backend.forward(target, transport):
            return {'success': False, 'error': 'Backend forward failed'}

        with self._lock:
            target.forwarded = True
            self._forwarded[peripheral_id] = target

        # Audit
        self._audit('peripheral_forward', session_id,
                    f'{type_name}: {target.name}')

        return {
            'success': True,
            'peripheral_id': peripheral_id,
            'name': target.name,
            'type': type_name,
        }

    def stop_forwarding(self, peripheral_id: str) -> bool:
        """Stop forwarding a specific peripheral.

        Returns:
            True if the backend reported a successful stop; False if the
            peripheral was not being forwarded, its backend is no longer
            registered, or the backend reported failure.
        """
        with self._lock:
            info = self._forwarded.pop(peripheral_id, None)
        if info is None:
            return False

        # The device is no longer tracked, so its flag must not keep
        # claiming that it is being forwarded.
        info.forwarded = False

        backend = self._backends.get(info.peripheral_type.value)
        if backend is None:
            return False
        return bool(backend.stop(peripheral_id))

    def stop_all(self) -> None:
        """Stop all peripheral forwarding.

        Best-effort: a failure while stopping one peripheral is logged and
        does not prevent the remaining peripherals from being stopped.
        """
        # Copy the IDs first; stop_forwarding() takes the lock itself.
        with self._lock:
            ids = list(self._forwarded)
        for pid in ids:
            try:
                self.stop_forwarding(pid)
            except Exception as e:
                logger.warning("Failed to stop forwarding %s: %s", pid, e)

    def get_status(self) -> dict:
        """Get peripheral bridge status.

        Returns:
            {backends_available, forwarded_count, forwarded} where
            ``forwarded`` is a list of ``to_dict()`` snapshots.
        """
        with self._lock:
            forwarded = [p.to_dict() for p in self._forwarded.values()]
        return {
            'backends_available': list(self._backends.keys()),
            'forwarded_count': len(forwarded),
            'forwarded': forwarded,
        }

    def get_available_backends(self) -> List[str]:
        """Get list of available backend types."""
        return list(self._backends.keys())

    def _audit(self, event_type: str, session_id: str, detail: str) -> None:
        """Audit log peripheral events.

        Auditing is best-effort: a missing or failing security module must
        not break forwarding, so failures are only logged at debug level.
        """
        try:
            from integrations.remote_desktop.security import audit_session_event
            audit_session_event(event_type, session_id, 'peripheral_bridge', detail)
        except Exception as e:
            logger.debug("Peripheral audit failed for %s: %s", event_type, e)
# ── Singleton ────────────────────────────────────────────────

_peripheral_bridge: Optional["PeripheralBridge"] = None
# Serializes first-time construction so concurrent callers cannot each build
# their own bridge (and their own set of backends).
_peripheral_bridge_lock = threading.Lock()


def get_peripheral_bridge() -> "PeripheralBridge":
    """Get or create the singleton PeripheralBridge.

    The instance is created lazily on first call and reused afterwards.
    """
    global _peripheral_bridge
    if _peripheral_bridge is None:
        with _peripheral_bridge_lock:
            # Re-check: another thread may have created the bridge while
            # this one was waiting for the lock.
            if _peripheral_bridge is None:
                _peripheral_bridge = PeripheralBridge()
    return _peripheral_bridge