Coverage for integrations / remote_desktop / peripheral_backends / bluetooth_backend.py: 38.8%
98 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Bluetooth HID Backend — Relay BT HID events over HARTOS transport.
4Strategy: Enumerate paired BT devices, relay HID input events (not full BT stack).
5Linux: dbus org.bluez for discovery, evdev for HID event reading.
6Windows: Not yet supported (would need win32 Bluetooth APIs).
8This is event-level forwarding, not full Bluetooth device redirection.
9"""
11import logging
12import platform
13import threading
14from typing import List
16logger = logging.getLogger('hevolve.remote_desktop')
18# Optional: dbus for BlueZ discovery
19_dbus = None
20try:
21 import dbus as _dbus_module
22 _dbus = _dbus_module
23except ImportError:
24 pass
26# Optional: evdev for Linux HID input
27_evdev = None
28try:
29 import evdev as _evdev_module
30 _evdev = _evdev_module
31except ImportError:
32 pass
35class BluetoothBackend:
36 """Bluetooth HID event relay over HARTOS transport.
38 Discovers paired BT devices via BlueZ D-Bus, then relays HID
39 events (keyboard/mouse/gamepad) from the BT device over the
40 HARTOS transport channel to the remote host.
41 """
43 def __init__(self):
44 self._forwarded = {} # device_id → relay thread
45 self._relay_threads = {} # device_id → Thread
46 self._running = {} # device_id → bool
48 def discover(self) -> list:
49 """Enumerate paired Bluetooth devices.
51 Linux: Uses D-Bus BlueZ interface.
52 """
53 if not self.available:
54 return []
56 if platform.system() == 'Linux' and _dbus:
57 return self._discover_bluez()
58 return []
60 def forward(self, peripheral_info, transport) -> bool:
61 """Start relaying HID events from a BT device.
63 Args:
64 peripheral_info: PeripheralInfo with peripheral_id = BT MAC.
65 transport: TransportChannel to send events on.
67 Returns:
68 True if relay started.
69 """
70 device_id = peripheral_info.peripheral_id
71 if device_id in self._forwarded:
72 return True # Already forwarding
74 self._running[device_id] = True
75 self._forwarded[device_id] = peripheral_info
77 # Start relay thread
78 thread = threading.Thread(
79 target=self._relay_loop,
80 args=(device_id, transport),
81 daemon=True,
82 name=f'bt-relay-{device_id[:8]}',
83 )
84 self._relay_threads[device_id] = thread
85 thread.start()
87 logger.info(f"BT HID relay started: {device_id}")
88 return True
90 def stop(self, peripheral_id: str) -> bool:
91 """Stop relaying a BT device."""
92 self._running[peripheral_id] = False
93 thread = self._relay_threads.pop(peripheral_id, None)
94 if thread:
95 thread.join(timeout=3)
96 self._forwarded.pop(peripheral_id, None)
97 logger.info(f"BT HID relay stopped: {peripheral_id}")
98 return True
100 def stop_all(self) -> None:
101 """Stop all BT relays."""
102 for device_id in list(self._forwarded.keys()):
103 self.stop(device_id)
105 @property
106 def available(self) -> bool:
107 """Check if BT backend dependencies are available."""
108 if platform.system() != 'Linux':
109 return False
110 return _dbus is not None
112 @property
113 def peripheral_type_name(self) -> str:
114 return 'bluetooth'
116 # ── BlueZ discovery ──────────────────────────────────────
118 def _discover_bluez(self) -> list:
119 """Discover BT devices via BlueZ D-Bus interface."""
120 from integrations.remote_desktop.peripheral_bridge import (
121 PeripheralInfo, PeripheralType,
122 )
123 results = []
124 try:
125 bus = _dbus.SystemBus()
126 manager = _dbus.Interface(
127 bus.get_object('org.bluez', '/'),
128 'org.freedesktop.DBus.ObjectManager',
129 )
130 objects = manager.GetManagedObjects()
132 for path, interfaces in objects.items():
133 if 'org.bluez.Device1' not in interfaces:
134 continue
135 props = interfaces['org.bluez.Device1']
136 address = str(props.get('Address', ''))
137 name = str(props.get('Name', 'Unknown BT Device'))
138 paired = bool(props.get('Paired', False))
139 connected = bool(props.get('Connected', False))
141 if paired:
142 results.append(PeripheralInfo(
143 peripheral_id=address,
144 name=name,
145 peripheral_type=PeripheralType.BLUETOOTH,
146 connected=connected,
147 ))
148 except Exception as e:
149 logger.debug(f"BlueZ discovery failed: {e}")
151 return results
153 # ── HID relay ────────────────────────────────────────────
155 def _relay_loop(self, device_id: str, transport) -> None:
156 """Relay HID input events from a BT device over transport.
158 Uses evdev to read from the corresponding /dev/input/event* device.
159 """
160 if not _evdev:
161 logger.debug("evdev not available for BT HID relay")
162 return
164 # Find evdev device matching the BT address
165 input_device = self._find_evdev_device(device_id)
166 if not input_device:
167 logger.debug(f"No evdev device found for BT {device_id}")
168 return
170 try:
171 for ev in input_device.read_loop():
172 if not self._running.get(device_id, False):
173 break
174 if ev.type == 0: # EV_SYN
175 continue
177 # Send HID event over transport
178 if transport and hasattr(transport, 'send_event'):
179 transport.send_event({
180 'type': 'peripheral',
181 'subtype': 'bt_hid',
182 'device_id': device_id,
183 'ev_type': ev.type,
184 'ev_code': ev.code,
185 'ev_value': ev.value,
186 })
187 except Exception as e:
188 logger.debug(f"BT HID relay error: {e}")
190 def _find_evdev_device(self, bt_address: str):
191 """Find the evdev input device for a BT address."""
192 if not _evdev:
193 return None
195 # Normalize address for matching
196 normalized = bt_address.upper().replace(':', '')
198 for path in _evdev.list_devices():
199 try:
200 dev = _evdev.InputDevice(path)
201 # Check if device's uniq matches BT address
202 if dev.uniq and dev.uniq.upper().replace(':', '') == normalized:
203 return dev
204 except Exception:
205 continue
206 return None