Coverage for integrations / remote_desktop / peripheral_backends / usbip_backend.py: 53.7%

67 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2USB over IP Backend — Wraps Linux usbip kernel module for USB device forwarding. 

3 

4Host side: `usbip bind -b <busid>` → exposes USB device 

5Viewer side: `usbip attach -r <host> -b <busid>` → attaches remote USB device 

6 

7Requires: usbip kernel module loaded (usbip-core, usbip-host, vhci-hcd). 

8NixOS: nixos/modules/hart-peripheral-bridge.nix loads these automatically. 

9""" 

10 

11import logging 

12import platform 

13import subprocess 

14from typing import List 

15 

16logger = logging.getLogger('hevolve.remote_desktop') 

17 

18 

19class USBIPBackend: 

20 """USB over IP via Linux usbip kernel module.""" 

21 

22 def __init__(self): 

23 self._forwarded = {} # busid → info 

24 

25 def discover(self) -> list: 

26 """Discover locally connected USB devices via `usbip list -l`.""" 

27 if not self.available: 

28 return [] 

29 

30 try: 

31 output = subprocess.check_output( 

32 ['usbip', 'list', '-l'], 

33 timeout=5, 

34 text=True, 

35 stderr=subprocess.DEVNULL, 

36 ) 

37 return self._parse_usbip_list(output) 

38 except Exception as e: 

39 logger.debug(f"USB discovery failed: {e}") 

40 return [] 

41 

42 def forward(self, peripheral_info, transport) -> bool: 

43 """Bind a USB device for remote access via usbip. 

44 

45 Args: 

46 peripheral_info: PeripheralInfo with peripheral_id = bus_id. 

47 transport: TransportChannel (used for signaling, not data). 

48 

49 Returns: 

50 True if bind succeeded. 

51 """ 

52 bus_id = peripheral_info.peripheral_id 

53 try: 

54 subprocess.check_call( 

55 ['usbip', 'bind', '-b', bus_id], 

56 timeout=10, 

57 stdout=subprocess.DEVNULL, 

58 stderr=subprocess.DEVNULL, 

59 ) 

60 self._forwarded[bus_id] = peripheral_info 

61 logger.info(f"USB device bound: {bus_id}") 

62 

63 # Notify remote via transport 

64 if transport and hasattr(transport, 'send_event'): 

65 transport.send_event({ 

66 'type': 'peripheral', 

67 'action': 'USB_AVAILABLE', 

68 'bus_id': bus_id, 

69 'name': peripheral_info.name, 

70 }) 

71 return True 

72 except Exception as e: 

73 logger.warning(f"USB bind failed for {bus_id}: {e}") 

74 return False 

75 

76 def attach_remote(self, host_ip: str, bus_id: str) -> bool: 

77 """Attach a remote USB device (viewer side). 

78 

79 Args: 

80 host_ip: IP address of the host sharing the USB device. 

81 bus_id: USB bus ID to attach. 

82 

83 Returns: 

84 True if attach succeeded. 

85 """ 

86 try: 

87 subprocess.check_call( 

88 ['usbip', 'attach', '-r', host_ip, '-b', bus_id], 

89 timeout=10, 

90 stdout=subprocess.DEVNULL, 

91 stderr=subprocess.DEVNULL, 

92 ) 

93 logger.info(f"USB device attached: {bus_id} from {host_ip}") 

94 return True 

95 except Exception as e: 

96 logger.warning(f"USB attach failed: {e}") 

97 return False 

98 

99 def stop(self, peripheral_id: str) -> bool: 

100 """Unbind a USB device.""" 

101 try: 

102 subprocess.check_call( 

103 ['usbip', 'unbind', '-b', peripheral_id], 

104 timeout=10, 

105 stdout=subprocess.DEVNULL, 

106 stderr=subprocess.DEVNULL, 

107 ) 

108 self._forwarded.pop(peripheral_id, None) 

109 logger.info(f"USB device unbound: {peripheral_id}") 

110 return True 

111 except Exception as e: 

112 logger.debug(f"USB unbind failed: {e}") 

113 return False 

114 

115 def stop_all(self) -> None: 

116 """Unbind all forwarded USB devices.""" 

117 for bus_id in list(self._forwarded.keys()): 

118 self.stop(bus_id) 

119 

120 @property 

121 def available(self) -> bool: 

122 """Check if usbip is available on this system.""" 

123 if platform.system() != 'Linux': 

124 return False 

125 try: 

126 subprocess.check_call( 

127 ['which', 'usbip'], 

128 timeout=5, 

129 stdout=subprocess.DEVNULL, 

130 stderr=subprocess.DEVNULL, 

131 ) 

132 return True 

133 except Exception: 

134 return False 

135 

136 @property 

137 def peripheral_type_name(self) -> str: 

138 return 'usb' 

139 

140 def _parse_usbip_list(self, output: str) -> list: 

141 """Parse `usbip list -l` output into PeripheralInfo-like dicts.""" 

142 from integrations.remote_desktop.peripheral_bridge import ( 

143 PeripheralInfo, PeripheralType, 

144 ) 

145 results = [] 

146 current_busid = None 

147 current_name = '' 

148 

149 for line in output.split('\n'): 

150 line = line.strip() 

151 if line.startswith('-'): 

152 # Bus ID line: " - busid 1-1 (0bda:8153)" 

153 parts = line.split() 

154 if len(parts) >= 3 and parts[1] == 'busid': 

155 current_busid = parts[2] 

156 # Vendor:product in parentheses 

157 for p in parts: 

158 if '(' in p: 

159 vid_pid = p.strip('()') 

160 break 

161 elif current_busid and ':' in line: 

162 # Description line 

163 current_name = line.strip() 

164 results.append(PeripheralInfo( 

165 peripheral_id=current_busid, 

166 name=current_name, 

167 peripheral_type=PeripheralType.USB, 

168 connected=True, 

169 )) 

170 current_busid = None 

171 

172 return results