Coverage for integrations / remote_desktop / peripheral_backends / bluetooth_backend.py: 38.8%

98 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Bluetooth HID Backend — Relay BT HID events over HARTOS transport. 

3 

4Strategy: Enumerate paired BT devices, relay HID input events (not full BT stack). 

5Linux: dbus org.bluez for discovery, evdev for HID event reading. 

6Windows: Not yet supported (would need win32 Bluetooth APIs). 

7 

8This is event-level forwarding, not full Bluetooth device redirection. 

9""" 

10 

11import logging 

12import platform 

13import threading 

14from typing import List 

15 

# Logger shared by the remote-desktop integration (fixed name, not __name__).
logger = logging.getLogger('hevolve.remote_desktop')

# Optional dependency: dbus, used for BlueZ discovery.
# `_dbus` is the imported module, or None when the package is not installed.
try:
    import dbus as _dbus_module
except ImportError:
    _dbus = None
else:
    _dbus = _dbus_module

# Optional dependency: evdev, used to read Linux HID input events.
# `_evdev` is the imported module, or None when the package is not installed.
try:
    import evdev as _evdev_module
except ImportError:
    _evdev = None
else:
    _evdev = _evdev_module

33 

34 

class BluetoothBackend:
    """Bluetooth HID event relay over HARTOS transport.

    Discovers paired BT devices via BlueZ D-Bus, then relays HID
    events (keyboard/mouse/gamepad) from the BT device over the
    HARTOS transport channel to the remote host.

    One daemon thread is started per forwarded device. No locking is
    done around the bookkeeping dicts, so callers are presumably
    expected to drive forward()/stop() from a single thread — confirm
    against the peripheral bridge.
    """

    # evdev event type of EV_SYN events; these are skipped by the relay.
    _EV_SYN = 0

    # Seconds stop() waits for a relay thread to finish.
    _JOIN_TIMEOUT = 3

    def __init__(self):
        self._forwarded = {}      # device_id → peripheral_info given to forward()
        self._relay_threads = {}  # device_id → relay Thread
        self._running = {}        # device_id → bool flag polled by the relay loop

    def discover(self) -> list:
        """Enumerate paired Bluetooth devices.

        Linux: Uses D-Bus BlueZ interface.

        Returns:
            A list of PeripheralInfo objects, or an empty list when the
            backend is not available on this platform.
        """
        if not self.available:
            return []

        if platform.system() == 'Linux' and _dbus:
            return self._discover_bluez()
        return []

    def forward(self, peripheral_info, transport) -> bool:
        """Start relaying HID events from a BT device.

        Args:
            peripheral_info: PeripheralInfo with peripheral_id = BT MAC.
            transport: TransportChannel to send events on.

        Returns:
            True if relay started (or is already running).
        """
        device_id = peripheral_info.peripheral_id
        if device_id in self._forwarded:
            existing = self._relay_threads.get(device_id)
            if existing is None or existing.is_alive():
                return True  # Already forwarding
            # The previous relay thread ended on its own (no evdev node,
            # read error, device disconnect). Treating that as "already
            # forwarding" would make the relay impossible to restart
            # without an explicit stop(), so start a fresh one.
            logger.debug(
                f"BT HID relay for {device_id} is no longer running; restarting"
            )

        # The flag must be set before the thread starts: the relay loop
        # exits as soon as it reads a false value.
        self._running[device_id] = True
        self._forwarded[device_id] = peripheral_info

        # Start relay thread
        thread = threading.Thread(
            target=self._relay_loop,
            args=(device_id, transport),
            daemon=True,
            name=f'bt-relay-{device_id[:8]}',
        )
        self._relay_threads[device_id] = thread
        thread.start()

        logger.info(f"BT HID relay started: {device_id}")
        return True

    def stop(self, peripheral_id: str) -> bool:
        """Stop relaying a BT device.

        Clears the running flag, waits up to ``_JOIN_TIMEOUT`` seconds for
        the relay thread and drops all bookkeeping for the device.

        Args:
            peripheral_id: Device id previously passed to forward().

        Returns:
            Always True, including for ids that were never forwarded.
        """
        # Signal first so the loop can observe the flag while we wait.
        self._running[peripheral_id] = False
        thread = self._relay_threads.pop(peripheral_id, None)
        # A thread cannot join itself; skip the wait in that case.
        if thread and thread is not threading.current_thread():
            thread.join(timeout=self._JOIN_TIMEOUT)
        self._forwarded.pop(peripheral_id, None)
        # Drop the flag so ids do not accumulate forever. A relay thread
        # that outlives the join timeout reads a missing flag as False.
        self._running.pop(peripheral_id, None)
        logger.info(f"BT HID relay stopped: {peripheral_id}")
        return True

    def stop_all(self) -> None:
        """Stop all BT relays."""
        # Snapshot the keys: stop() mutates the dict while we iterate.
        for device_id in list(self._forwarded):
            self.stop(device_id)

    @property
    def available(self) -> bool:
        """Check if BT backend dependencies are available."""
        if platform.system() != 'Linux':
            return False
        return _dbus is not None

    @property
    def peripheral_type_name(self) -> str:
        """Name of the peripheral type handled by this backend."""
        return 'bluetooth'

    # ── BlueZ discovery ──────────────────────────────────────

    def _discover_bluez(self) -> list:
        """Discover BT devices via BlueZ D-Bus interface.

        Only devices whose ``Paired`` property is true are returned.
        Any failure while talking to D-Bus is logged at debug level and
        whatever was collected so far is returned.
        """
        # Imported here rather than at module level; presumably to avoid
        # an import cycle with the peripheral bridge — confirm.
        from integrations.remote_desktop.peripheral_bridge import (
            PeripheralInfo, PeripheralType,
        )
        results = []
        try:
            bus = _dbus.SystemBus()
            manager = _dbus.Interface(
                bus.get_object('org.bluez', '/'),
                'org.freedesktop.DBus.ObjectManager',
            )
            objects = manager.GetManagedObjects()

            for path, interfaces in objects.items():
                # Skip managed objects that are not devices.
                if 'org.bluez.Device1' not in interfaces:
                    continue
                props = interfaces['org.bluez.Device1']
                address = str(props.get('Address', ''))
                name = str(props.get('Name', 'Unknown BT Device'))
                paired = bool(props.get('Paired', False))
                connected = bool(props.get('Connected', False))

                if paired:
                    results.append(PeripheralInfo(
                        peripheral_id=address,
                        name=name,
                        peripheral_type=PeripheralType.BLUETOOTH,
                        connected=connected,
                    ))
        except Exception as e:
            # Best-effort: discovery problems must not break the caller.
            logger.debug(f"BlueZ discovery failed: {e}")

        return results

    # ── HID relay ────────────────────────────────────────────

    def _relay_loop(self, device_id: str, transport) -> None:
        """Relay HID input events from a BT device over transport.

        Uses evdev to read from the corresponding /dev/input/event* device.
        Runs until the device's running flag is cleared or reading fails;
        the input device is closed on every exit path.

        NOTE(review): the running flag is only checked when an event
        arrives, so an idle device can keep this thread alive past
        stop()'s join timeout — confirm whether that is acceptable.
        """
        if not _evdev:
            logger.debug("evdev not available for BT HID relay")
            return

        # Find evdev device matching the BT address
        input_device = self._find_evdev_device(device_id)
        if not input_device:
            logger.debug(f"No evdev device found for BT {device_id}")
            return

        try:
            for ev in input_device.read_loop():
                if not self._running.get(device_id, False):
                    break
                if ev.type == self._EV_SYN:
                    continue

                # Send HID event over transport
                if transport and hasattr(transport, 'send_event'):
                    transport.send_event({
                        'type': 'peripheral',
                        'subtype': 'bt_hid',
                        'device_id': device_id,
                        'ev_type': ev.type,
                        'ev_code': ev.code,
                        'ev_value': ev.value,
                    })
        except Exception as e:
            logger.debug(f"BT HID relay error: {e}")
        finally:
            # Release the device so its file descriptor is not leaked.
            self._close_quietly(input_device)

    def _find_evdev_device(self, bt_address: str):
        """Find the evdev input device for a BT address.

        Args:
            bt_address: Bluetooth address; matched case-insensitively and
                ignoring ':' separators against each device's ``uniq``.

        Returns:
            The matching, still-open input device, or None when evdev is
            unavailable or no device matches. The caller owns the
            returned device and is responsible for closing it.
        """
        if not _evdev:
            return None

        # Normalize address for matching
        wanted = bt_address.upper().replace(':', '')

        for path in _evdev.list_devices():
            candidate = None
            try:
                candidate = _evdev.InputDevice(path)
                uniq = candidate.uniq
                # Check if device's uniq matches BT address
                if uniq and uniq.upper().replace(':', '') == wanted:
                    return candidate
            except Exception as exc:
                # Best-effort scan: a node that cannot be opened or
                # queried is skipped.
                logger.debug(f"Skipping evdev device {path}: {exc}")
            # Not the device we want: close it so the scan does not leak
            # one open handle per input node.
            self._close_quietly(candidate)
        return None

    @staticmethod
    def _close_quietly(device) -> None:
        """Close an input device, logging instead of raising on failure."""
        if device is None:
            return
        try:
            device.close()
        except Exception as exc:
            logger.debug(f"Failed to close evdev device: {exc}")