Coverage for integrations / remote_desktop / peripheral_bridge.py: 73.1%

108 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Peripheral Bridge — Orchestrates peripheral forwarding across HARTOS transport. 

3 

4Wraps platform tools for USB, Bluetooth, and Gamepad forwarding: 

5 USB: usbip (Linux kernel module) 

6 BT: HID event relay via dbus/evdev 

7 Gamepad: evdev → transport → remote 

8 

9HARTOS doesn't reimplement device drivers — it orchestrates existing system 

10tools the same way it orchestrates RustDesk/Sunshine for remote desktop. 

11 

12Reuses: 

13 - transport.py: TransportChannel for event relay 

14 - security.py: audit_session_event() for peripheral audit 

15 - service_manager.py: lifecycle pattern (singleton + NodeWatchdog) 

16""" 

17 

18import logging 

19import threading 

20import time 

21from dataclasses import dataclass, field 

22from enum import Enum 

23from typing import Callable, Dict, List, Optional 

24 

25logger = logging.getLogger('hevolve.remote_desktop') 

26 

27 

28class PeripheralType(Enum): 

29 USB = 'usb' 

30 BLUETOOTH = 'bluetooth' 

31 GAMEPAD = 'gamepad' 

32 GENERIC_HID = 'generic_hid' 

33 

34 

35@dataclass 

36class PeripheralInfo: 

37 """Discovered peripheral device.""" 

38 peripheral_id: str # Unique ID (USB busid, BT MAC, evdev path) 

39 name: str 

40 peripheral_type: PeripheralType 

41 vendor_id: Optional[str] = None 

42 product_id: Optional[str] = None 

43 connected: bool = False 

44 forwarded: bool = False 

45 

46 def to_dict(self) -> dict: 

47 return { 

48 'peripheral_id': self.peripheral_id, 

49 'name': self.name, 

50 'type': self.peripheral_type.value, 

51 'vendor_id': self.vendor_id, 

52 'product_id': self.product_id, 

53 'connected': self.connected, 

54 'forwarded': self.forwarded, 

55 } 

56 

57 

58class PeripheralBridge: 

59 """Orchestrates peripheral forwarding across HARTOS transport. 

60 

61 Discovers local peripherals (USB, BT, gamepad) and forwards selected 

62 devices to the remote host over the HARTOS transport channel. 

63 """ 

64 

65 def __init__(self): 

66 self._forwarded: Dict[str, PeripheralInfo] = {} 

67 self._backends: Dict[str, object] = {} 

68 self._lock = threading.Lock() 

69 self._init_backends() 

70 

71 def _init_backends(self) -> None: 

72 """Initialize available backends.""" 

73 try: 

74 from integrations.remote_desktop.peripheral_backends.usbip_backend import ( 

75 USBIPBackend, 

76 ) 

77 backend = USBIPBackend() 

78 if backend.available: 

79 self._backends['usb'] = backend 

80 except Exception: 

81 pass 

82 

83 try: 

84 from integrations.remote_desktop.peripheral_backends.bluetooth_backend import ( 

85 BluetoothBackend, 

86 ) 

87 backend = BluetoothBackend() 

88 if backend.available: 

89 self._backends['bluetooth'] = backend 

90 except Exception: 

91 pass 

92 

93 try: 

94 from integrations.remote_desktop.peripheral_backends.gamepad_backend import ( 

95 GamepadBackend, 

96 ) 

97 backend = GamepadBackend() 

98 if backend.available: 

99 self._backends['gamepad'] = backend 

100 except Exception: 

101 pass 

102 

103 def discover_peripherals(self, 

104 types: Optional[List[str]] = None) -> List[PeripheralInfo]: 

105 """Enumerate connected peripherals. 

106 

107 Args: 

108 types: Filter by type names (e.g., ['usb', 'gamepad']). 

109 None = discover all types. 

110 

111 Returns: 

112 List of PeripheralInfo for discovered devices. 

113 """ 

114 results = [] 

115 for type_name, backend in self._backends.items(): 

116 if types and type_name not in types: 

117 continue 

118 try: 

119 discovered = backend.discover() 

120 # Mark forwarded status 

121 for p in discovered: 

122 if p.peripheral_id in self._forwarded: 

123 p.forwarded = True 

124 results.extend(discovered) 

125 except Exception as e: 

126 logger.debug(f"Discovery failed for {type_name}: {e}") 

127 return results 

128 

129 def forward_peripheral(self, peripheral_id: str, 

130 transport, session_id: str = '') -> dict: 

131 """Start forwarding a peripheral to the remote device. 

132 

133 Args: 

134 peripheral_id: ID of the peripheral to forward. 

135 transport: TransportChannel to send events on. 

136 session_id: Remote desktop session ID (for audit). 

137 

138 Returns: 

139 {success, peripheral_id, name, type} 

140 """ 

141 # Find the peripheral 

142 all_peripherals = self.discover_peripherals() 

143 target = None 

144 for p in all_peripherals: 

145 if p.peripheral_id == peripheral_id: 

146 target = p 

147 break 

148 

149 if not target: 

150 return {'success': False, 'error': f'Peripheral not found: {peripheral_id}'} 

151 

152 if target.forwarded: 

153 return {'success': True, 'note': 'Already forwarding'} 

154 

155 # Get backend for this type 

156 backend = self._backends.get(target.peripheral_type.value) 

157 if not backend: 

158 return {'success': False, 

159 'error': f'No backend for {target.peripheral_type.value}'} 

160 

161 # Forward 

162 ok = backend.forward(target, transport) 

163 if ok: 

164 with self._lock: 

165 target.forwarded = True 

166 self._forwarded[peripheral_id] = target 

167 

168 # Audit 

169 self._audit('peripheral_forward', session_id, 

170 f'{target.peripheral_type.value}: {target.name}') 

171 

172 return { 

173 'success': True, 

174 'peripheral_id': peripheral_id, 

175 'name': target.name, 

176 'type': target.peripheral_type.value, 

177 } 

178 

179 return {'success': False, 'error': 'Backend forward failed'} 

180 

181 def stop_forwarding(self, peripheral_id: str) -> bool: 

182 """Stop forwarding a specific peripheral.""" 

183 with self._lock: 

184 info = self._forwarded.pop(peripheral_id, None) 

185 if not info: 

186 return False 

187 

188 backend = self._backends.get(info.peripheral_type.value) 

189 if backend: 

190 return backend.stop(peripheral_id) 

191 return False 

192 

193 def stop_all(self) -> None: 

194 """Stop all peripheral forwarding.""" 

195 with self._lock: 

196 ids = list(self._forwarded.keys()) 

197 for pid in ids: 

198 self.stop_forwarding(pid) 

199 

200 def get_status(self) -> dict: 

201 """Get peripheral bridge status.""" 

202 with self._lock: 

203 forwarded = [p.to_dict() for p in self._forwarded.values()] 

204 return { 

205 'backends_available': list(self._backends.keys()), 

206 'forwarded_count': len(forwarded), 

207 'forwarded': forwarded, 

208 } 

209 

210 def get_available_backends(self) -> List[str]: 

211 """Get list of available backend types.""" 

212 return list(self._backends.keys()) 

213 

214 def _audit(self, event_type: str, session_id: str, detail: str) -> None: 

215 """Audit log peripheral events.""" 

216 try: 

217 from integrations.remote_desktop.security import audit_session_event 

218 audit_session_event(event_type, session_id, 'peripheral_bridge', detail) 

219 except Exception: 

220 pass 

221 

222 

223# ── Singleton ──────────────────────────────────────────────── 

224 

225_peripheral_bridge: Optional[PeripheralBridge] = None 

226 

227 

228def get_peripheral_bridge() -> PeripheralBridge: 

229 """Get or create the singleton PeripheralBridge.""" 

230 global _peripheral_bridge 

231 if _peripheral_bridge is None: 

232 _peripheral_bridge = PeripheralBridge() 

233 return _peripheral_bridge