Coverage for integrations / remote_desktop / peripheral_backends / gamepad_backend.py: 39.0%
82 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Gamepad Backend — Forward gamepad input events over HARTOS transport.

Sunshine/Moonlight already forward gamepads natively. This backend provides
the same for the native HARTOS transport fallback.

Linux: evdev → read input events → transport → remote receives
Windows: Not yet supported (would need XInput/DirectInput)

Poll interval: ~4ms (250Hz) for responsive gamepad input.
"""
13import logging
14import platform
15import struct
16import threading
17import time
18from typing import Dict, List, Optional
# Module logger, registered under the remote-desktop integration's name.
logger = logging.getLogger('hevolve.remote_desktop')

# evdev is optional (used for Linux gamepads). ``_evdev`` is the imported
# module when the import succeeds and None otherwise, so the rest of the
# file can feature-test with ``if _evdev:``.
try:
    import evdev as _evdev_module
except ImportError:
    _evdev = None
else:
    _evdev = _evdev_module
class GamepadBackend:
    """Forward local gamepad input to a remote peer over a transport.

    Only the Linux ``evdev`` path is implemented. When the platform is not
    Linux, or ``evdev`` could not be imported, ``available`` is False,
    ``discover()`` returns an empty list and ``forward()`` returns False.

    Every forwarded device gets its own daemon relay thread. The bookkeeping
    dicts (``_forwarded``, ``_relay_threads``, ``_running``) are guarded by
    ``_lock`` because both callers and relay threads modify them.
    """

    # ~250Hz. Upper bound on how long a relay thread waits for input before
    # it re-checks whether it has been stopped.
    POLL_INTERVAL = 0.004

    # Seconds stop() waits for a relay thread to exit.
    _JOIN_TIMEOUT = 3

    # Linux input event type codes, as used in the evdev capability map.
    _EV_SYN = 0
    _EV_KEY = 1
    _EV_ABS = 3

    # BTN_JOYSTICK (0x120-0x12f) and BTN_GAMEPAD (0x130-0x13f) are adjacent,
    # so a single inclusive range covers both families.
    _BTN_FIRST = 0x120
    _BTN_LAST = 0x13f

    def __init__(self):
        # device_id -> {'info': PeripheralInfo, 'started_at': epoch seconds}
        self._forwarded: Dict[str, dict] = {}
        # device_id -> relay thread that currently owns the device
        self._relay_threads: Dict[str, threading.Thread] = {}
        # device_id -> True while the relay should keep running
        self._running: Dict[str, bool] = {}
        # Guards the three dicts above.
        self._lock = threading.Lock()

    def discover(self) -> list:
        """Enumerate connected gamepads.

        Returns:
            A list of PeripheralInfo objects, empty when the backend is not
            available on this system.
        """
        if not self.available:
            return []
        return self._discover_evdev()

    def forward(self, peripheral_info, transport) -> bool:
        """Start forwarding gamepad events over transport.

        Args:
            peripheral_info: PeripheralInfo for the gamepad.
            transport: TransportChannel to send events on.

        Returns:
            True if forwarding started (or was already active for this
            device); False if the backend is not available, in which case
            no relay thread is created.
        """
        device_id = peripheral_info.peripheral_id
        with self._lock:
            if device_id in self._forwarded:
                return True

            if not self.available:
                # Without evdev the relay could never read an event, so do
                # not pretend that forwarding started.
                logger.warning(
                    f"Gamepad forwarding unavailable for {peripheral_info.name}"
                )
                return False

            thread = threading.Thread(
                target=self._relay_loop,
                args=(device_id, transport),
                daemon=True,
                name=f'gamepad-{device_id[:12]}',
            )
            # Register ownership before the thread starts so the relay loop
            # sees itself as the active relay from its first check.
            self._forwarded[device_id] = {
                'info': peripheral_info,
                'started_at': time.time(),
            }
            self._relay_threads[device_id] = thread
            self._running[device_id] = True
            try:
                thread.start()
            except RuntimeError:
                # The thread never ran; drop the bookkeeping so a later
                # forward() can retry instead of reporting a dead relay.
                self._forget(device_id)
                raise

        logger.info(f"Gamepad relay started: {peripheral_info.name}")
        return True

    def stop(self, peripheral_id: str) -> bool:
        """Stop forwarding a gamepad.

        Safe to call for ids that are not being forwarded. Always returns
        True.
        """
        with self._lock:
            thread = self._relay_threads.pop(peripheral_id, None)
            self._running.pop(peripheral_id, None)
            self._forwarded.pop(peripheral_id, None)

        # Join outside the lock: the relay thread takes the lock while it
        # shuts down. Never join the current thread (stop() may be reached
        # from a transport callback running on the relay thread).
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=self._JOIN_TIMEOUT)
            if thread.is_alive():
                logger.warning(
                    f"Gamepad relay thread still running: {peripheral_id}"
                )

        logger.info(f"Gamepad relay stopped: {peripheral_id}")
        return True

    def stop_all(self) -> None:
        """Stop all gamepad relays."""
        with self._lock:
            device_ids = list(self._forwarded)
        for device_id in device_ids:
            self.stop(device_id)

    @property
    def available(self) -> bool:
        """Check if gamepad support is available (Linux with evdev)."""
        return platform.system() == 'Linux' and bool(_evdev)

    @property
    def peripheral_type_name(self) -> str:
        """Name of the peripheral type this backend handles."""
        return 'gamepad'

    # ── Internal bookkeeping ─────────────────────────────────

    def _forget(self, device_id: str) -> None:
        """Drop all bookkeeping for device_id. Caller must hold ``_lock``."""
        self._relay_threads.pop(device_id, None)
        self._running.pop(device_id, None)
        self._forwarded.pop(device_id, None)

    def _is_active_relay(self, device_id: str) -> bool:
        """Return True while the calling thread is the live relay for device_id.

        Checking thread ownership (not only the ``_running`` flag) makes a
        relay that was stopped exit even if forward() has already started a
        replacement relay for the same device.
        """
        with self._lock:
            if not self._running.get(device_id, False):
                return False
            return self._relay_threads.get(device_id) is threading.current_thread()

    def _release_if_owner(self, device_id: str) -> None:
        """Clear bookkeeping if the calling thread still owns device_id.

        Used when a relay ends by itself, so that a later forward() starts a
        new relay instead of returning True for a dead one.
        """
        with self._lock:
            if self._relay_threads.get(device_id) is threading.current_thread():
                self._forget(device_id)

    @staticmethod
    def _close_device(dev) -> None:
        """Close an evdev device, logging (not raising) on failure."""
        try:
            dev.close()
        except Exception as e:
            logger.debug(f"Error closing input device: {e}")

    # ── Linux evdev ──────────────────────────────────────────

    def _discover_evdev(self) -> list:
        """Discover gamepads via evdev.

        A device counts as a gamepad when it reports both EV_ABS and EV_KEY
        capabilities and at least one button code in the joystick/gamepad
        range. Devices that cannot be opened or queried are skipped.
        """
        from integrations.remote_desktop.peripheral_bridge import (
            PeripheralInfo, PeripheralType,
        )
        results = []
        for path in _evdev.list_devices():
            dev = None
            try:
                dev = _evdev.InputDevice(path)
                caps = dev.capabilities(verbose=False)

                # Analog axes and buttons are both required.
                if self._EV_ABS not in caps or self._EV_KEY not in caps:
                    continue

                buttons = caps.get(self._EV_KEY, [])
                if not any(self._BTN_FIRST <= b <= self._BTN_LAST for b in buttons):
                    continue

                results.append(PeripheralInfo(
                    peripheral_id=path,
                    name=dev.name,
                    peripheral_type=PeripheralType.GAMEPAD,
                    vendor_id=f'{dev.info.vendor:04x}' if dev.info else None,
                    product_id=f'{dev.info.product:04x}' if dev.info else None,
                    connected=True,
                ))
            except Exception as e:
                # Best effort: one unreadable device must not abort discovery.
                logger.debug(f"Skipping input device {path}: {e}")
            finally:
                # Discovery only inspects the device; release the handle.
                if dev is not None:
                    self._close_device(dev)
        return results

    def _relay_loop(self, device_id: str, transport) -> None:
        """Relay gamepad events to remote via transport.

        Runs on the relay thread created by forward(). Waits for input with a
        POLL_INTERVAL timeout so that stop() takes effect even while the
        gamepad is idle. On exit the device is closed and, if this thread
        still owns the device, its bookkeeping is cleared.
        """
        import select  # stdlib; only needed by the relay thread

        dev = None
        try:
            if not _evdev:
                return

            try:
                dev = _evdev.InputDevice(device_id)
            except Exception as e:
                logger.warning(f"Cannot open gamepad {device_id}: {e}")
                return

            # Resolve the sender once rather than per event. When the
            # transport cannot send, events are still read and then dropped.
            send_event = getattr(transport, 'send_event', None)

            while self._is_active_relay(device_id):
                readable, _, _ = select.select(
                    [dev.fd], [], [], self.POLL_INTERVAL,
                )
                if not readable:
                    continue
                try:
                    for ev in dev.read():
                        if ev.type == self._EV_SYN:
                            continue
                        if send_event is None:
                            continue
                        # Send gamepad event over transport
                        send_event({
                            'type': 'peripheral',
                            'subtype': 'gamepad',
                            'device_id': device_id,
                            'ev_type': ev.type,
                            'ev_code': ev.code,
                            'ev_value': ev.value,
                            'timestamp': ev.timestamp(),
                        })
                except BlockingIOError:
                    # python-evdev documents read() as raising this when no
                    # events are pending; treat it as "nothing to do".
                    continue
        except Exception as e:
            logger.warning(f"Gamepad relay error: {e}")
        finally:
            if dev is not None:
                self._close_device(dev)
            self._release_if_owner(device_id)