Coverage for integrations / remote_desktop / peripheral_backends / gamepad_backend.py: 39.0%

82 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Gamepad Backend — Forward gamepad input events over HARTOS transport. 

3 

4Sunshine/Moonlight already forward gamepads natively. This backend provides 

5the same for the native HARTOS transport fallback. 

6 

7Linux: evdev → read input events → transport → remote receives 

8Windows: Not yet supported (would need XInput/DirectInput) 

9 

10Poll interval: ~4ms (250Hz) for responsive gamepad input. 

11""" 

12 

13import logging 

14import platform 

15import struct 

16import threading 

17import time 

18from typing import Dict, List, Optional 

19 

20logger = logging.getLogger('hevolve.remote_desktop') 

21 

22# Optional: evdev for Linux gamepad 

23_evdev = None 

24try: 

25 import evdev as _evdev_module 

26 _evdev = _evdev_module 

27except ImportError: 

28 pass 

29 

30 

31class GamepadBackend: 

32 """Gamepad forwarding via evdev (Linux) or XInput (Windows).""" 

33 

34 POLL_INTERVAL = 0.004 # ~250Hz 

35 

36 def __init__(self): 

37 self._forwarded: Dict[str, dict] = {} 

38 self._relay_threads: Dict[str, threading.Thread] = {} 

39 self._running: Dict[str, bool] = {} 

40 

41 def discover(self) -> list: 

42 """Enumerate connected gamepads.""" 

43 if not self.available: 

44 return [] 

45 

46 if platform.system() == 'Linux' and _evdev: 

47 return self._discover_evdev() 

48 return [] 

49 

50 def forward(self, peripheral_info, transport) -> bool: 

51 """Start forwarding gamepad events over transport. 

52 

53 Args: 

54 peripheral_info: PeripheralInfo for the gamepad. 

55 transport: TransportChannel to send events on. 

56 

57 Returns: 

58 True if forwarding started. 

59 """ 

60 device_id = peripheral_info.peripheral_id 

61 if device_id in self._forwarded: 

62 return True 

63 

64 self._running[device_id] = True 

65 self._forwarded[device_id] = { 

66 'info': peripheral_info, 

67 'started_at': time.time(), 

68 } 

69 

70 thread = threading.Thread( 

71 target=self._relay_loop, 

72 args=(device_id, transport), 

73 daemon=True, 

74 name=f'gamepad-{device_id[:12]}', 

75 ) 

76 self._relay_threads[device_id] = thread 

77 thread.start() 

78 

79 logger.info(f"Gamepad relay started: {peripheral_info.name}") 

80 return True 

81 

82 def stop(self, peripheral_id: str) -> bool: 

83 """Stop forwarding a gamepad.""" 

84 self._running[peripheral_id] = False 

85 thread = self._relay_threads.pop(peripheral_id, None) 

86 if thread: 

87 thread.join(timeout=3) 

88 self._forwarded.pop(peripheral_id, None) 

89 logger.info(f"Gamepad relay stopped: {peripheral_id}") 

90 return True 

91 

92 def stop_all(self) -> None: 

93 """Stop all gamepad relays.""" 

94 for device_id in list(self._forwarded.keys()): 

95 self.stop(device_id) 

96 

97 @property 

98 def available(self) -> bool: 

99 """Check if gamepad support is available.""" 

100 if platform.system() == 'Linux' and _evdev: 

101 return True 

102 return False 

103 

104 @property 

105 def peripheral_type_name(self) -> str: 

106 return 'gamepad' 

107 

108 # ── Linux evdev ────────────────────────────────────────── 

109 

110 def _discover_evdev(self) -> list: 

111 """Discover gamepads via evdev.""" 

112 from integrations.remote_desktop.peripheral_bridge import ( 

113 PeripheralInfo, PeripheralType, 

114 ) 

115 results = [] 

116 for path in _evdev.list_devices(): 

117 try: 

118 dev = _evdev.InputDevice(path) 

119 caps = dev.capabilities(verbose=False) 

120 

121 # Check for gamepad-like capabilities: 

122 # EV_ABS (3) for analog sticks, EV_KEY (1) for buttons 

123 has_abs = 3 in caps # EV_ABS 

124 has_key = 1 in caps # EV_KEY 

125 if has_abs and has_key: 

126 # Check for gamepad buttons (BTN_GAMEPAD range: 0x130-0x13f) 

127 buttons = caps.get(1, []) 

128 has_gamepad_btn = any(0x130 <= b <= 0x13f for b in buttons) 

129 # Also check for joystick buttons (BTN_JOYSTICK: 0x120-0x12f) 

130 has_joystick_btn = any(0x120 <= b <= 0x12f for b in buttons) 

131 

132 if has_gamepad_btn or has_joystick_btn: 

133 results.append(PeripheralInfo( 

134 peripheral_id=path, 

135 name=dev.name, 

136 peripheral_type=PeripheralType.GAMEPAD, 

137 vendor_id=f'{dev.info.vendor:04x}' if dev.info else None, 

138 product_id=f'{dev.info.product:04x}' if dev.info else None, 

139 connected=True, 

140 )) 

141 except Exception: 

142 continue 

143 return results 

144 

145 def _relay_loop(self, device_id: str, transport) -> None: 

146 """Relay gamepad events to remote via transport.""" 

147 if not _evdev: 

148 return 

149 

150 try: 

151 dev = _evdev.InputDevice(device_id) 

152 except Exception as e: 

153 logger.debug(f"Cannot open gamepad {device_id}: {e}") 

154 return 

155 

156 try: 

157 for ev in dev.read_loop(): 

158 if not self._running.get(device_id, False): 

159 break 

160 if ev.type == 0: # EV_SYN 

161 continue 

162 

163 # Send gamepad event over transport 

164 if transport and hasattr(transport, 'send_event'): 

165 transport.send_event({ 

166 'type': 'peripheral', 

167 'subtype': 'gamepad', 

168 'device_id': device_id, 

169 'ev_type': ev.type, 

170 'ev_code': ev.code, 

171 'ev_value': ev.value, 

172 'timestamp': ev.timestamp(), 

173 }) 

174 except Exception as e: 

175 logger.debug(f"Gamepad relay error: {e}")