Coverage for integrations / remote_desktop / peripheral_backends / usbip_backend.py: 53.7%
67 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2USB over IP Backend — Wraps Linux usbip kernel module for USB device forwarding.
4Host side: `usbip bind -b <busid>` → exposes USB device
5Viewer side: `usbip attach -r <host> -b <busid>` → attaches remote USB device
7Requires: usbip kernel module loaded (usbip-core, usbip-host, vhci-hcd).
8NixOS: nixos/modules/hart-peripheral-bridge.nix loads these automatically.
9"""
11import logging
12import platform
13import subprocess
14from typing import List
16logger = logging.getLogger('hevolve.remote_desktop')
19class USBIPBackend:
20 """USB over IP via Linux usbip kernel module."""
22 def __init__(self):
23 self._forwarded = {} # busid → info
25 def discover(self) -> list:
26 """Discover locally connected USB devices via `usbip list -l`."""
27 if not self.available:
28 return []
30 try:
31 output = subprocess.check_output(
32 ['usbip', 'list', '-l'],
33 timeout=5,
34 text=True,
35 stderr=subprocess.DEVNULL,
36 )
37 return self._parse_usbip_list(output)
38 except Exception as e:
39 logger.debug(f"USB discovery failed: {e}")
40 return []
42 def forward(self, peripheral_info, transport) -> bool:
43 """Bind a USB device for remote access via usbip.
45 Args:
46 peripheral_info: PeripheralInfo with peripheral_id = bus_id.
47 transport: TransportChannel (used for signaling, not data).
49 Returns:
50 True if bind succeeded.
51 """
52 bus_id = peripheral_info.peripheral_id
53 try:
54 subprocess.check_call(
55 ['usbip', 'bind', '-b', bus_id],
56 timeout=10,
57 stdout=subprocess.DEVNULL,
58 stderr=subprocess.DEVNULL,
59 )
60 self._forwarded[bus_id] = peripheral_info
61 logger.info(f"USB device bound: {bus_id}")
63 # Notify remote via transport
64 if transport and hasattr(transport, 'send_event'):
65 transport.send_event({
66 'type': 'peripheral',
67 'action': 'USB_AVAILABLE',
68 'bus_id': bus_id,
69 'name': peripheral_info.name,
70 })
71 return True
72 except Exception as e:
73 logger.warning(f"USB bind failed for {bus_id}: {e}")
74 return False
76 def attach_remote(self, host_ip: str, bus_id: str) -> bool:
77 """Attach a remote USB device (viewer side).
79 Args:
80 host_ip: IP address of the host sharing the USB device.
81 bus_id: USB bus ID to attach.
83 Returns:
84 True if attach succeeded.
85 """
86 try:
87 subprocess.check_call(
88 ['usbip', 'attach', '-r', host_ip, '-b', bus_id],
89 timeout=10,
90 stdout=subprocess.DEVNULL,
91 stderr=subprocess.DEVNULL,
92 )
93 logger.info(f"USB device attached: {bus_id} from {host_ip}")
94 return True
95 except Exception as e:
96 logger.warning(f"USB attach failed: {e}")
97 return False
99 def stop(self, peripheral_id: str) -> bool:
100 """Unbind a USB device."""
101 try:
102 subprocess.check_call(
103 ['usbip', 'unbind', '-b', peripheral_id],
104 timeout=10,
105 stdout=subprocess.DEVNULL,
106 stderr=subprocess.DEVNULL,
107 )
108 self._forwarded.pop(peripheral_id, None)
109 logger.info(f"USB device unbound: {peripheral_id}")
110 return True
111 except Exception as e:
112 logger.debug(f"USB unbind failed: {e}")
113 return False
115 def stop_all(self) -> None:
116 """Unbind all forwarded USB devices."""
117 for bus_id in list(self._forwarded.keys()):
118 self.stop(bus_id)
120 @property
121 def available(self) -> bool:
122 """Check if usbip is available on this system."""
123 if platform.system() != 'Linux':
124 return False
125 try:
126 subprocess.check_call(
127 ['which', 'usbip'],
128 timeout=5,
129 stdout=subprocess.DEVNULL,
130 stderr=subprocess.DEVNULL,
131 )
132 return True
133 except Exception:
134 return False
136 @property
137 def peripheral_type_name(self) -> str:
138 return 'usb'
140 def _parse_usbip_list(self, output: str) -> list:
141 """Parse `usbip list -l` output into PeripheralInfo-like dicts."""
142 from integrations.remote_desktop.peripheral_bridge import (
143 PeripheralInfo, PeripheralType,
144 )
145 results = []
146 current_busid = None
147 current_name = ''
149 for line in output.split('\n'):
150 line = line.strip()
151 if line.startswith('-'):
152 # Bus ID line: " - busid 1-1 (0bda:8153)"
153 parts = line.split()
154 if len(parts) >= 3 and parts[1] == 'busid':
155 current_busid = parts[2]
156 # Vendor:product in parentheses
157 for p in parts:
158 if '(' in p:
159 vid_pid = p.strip('()')
160 break
161 elif current_busid and ':' in line:
162 # Description line
163 current_name = line.strip()
164 results.append(PeripheralInfo(
165 peripheral_id=current_busid,
166 name=current_name,
167 peripheral_type=PeripheralType.USB,
168 connected=True,
169 ))
170 current_busid = None
172 return results