Coverage for integrations / channels / hardware / gpio_adapter.py: 62.3%

138 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2GPIO Channel Adapter — Input/output pin control for embedded Linux boards. 

3 

4Input pins generate Message events (button presses, sensors). 

5Output pins respond to agent commands (LEDs, relays, PWM servos). 

6 

7Supports gpiod (modern Linux, preferred) and RPi.GPIO (legacy Raspberry Pi). 

8 

9Usage: 

10 from integrations.channels.hardware.gpio_adapter import GPIOAdapter 

11 adapter = GPIOAdapter(input_pins=[17, 27], output_pins=[22, 23]) 

12 adapter.on_message(handler) 

13 await adapter.start() 

14""" 

15import asyncio 

16import json 

17import logging 

18import os 

19import threading 

20import time 

21import uuid 

22from typing import Callable, Dict, List, Optional, Any 

23 

24from integrations.channels.base import ( 

25 ChannelAdapter, ChannelConfig, ChannelStatus, 

26 Message, SendResult, 

27) 

28 

29logger = logging.getLogger(__name__) 

30 

31# Debounce interval (ms) to prevent spurious triggers 

32DEFAULT_DEBOUNCE_MS = 200 

33 

34 

35class GPIOAdapter(ChannelAdapter): 

36 """Channel adapter for GPIO pin I/O on embedded Linux boards. 

37 

38 Input pins: trigger Message events on state change (HIGH→LOW or LOW→HIGH). 

39 Output pins: controlled via send_message (text = 'on'/'off'/'pwm:50'). 

40 """ 

41 

42 def __init__( 

43 self, 

44 input_pins: List[int] = None, 

45 output_pins: List[int] = None, 

46 debounce_ms: int = DEFAULT_DEBOUNCE_MS, 

47 config: ChannelConfig = None, 

48 ): 

49 super().__init__(config or ChannelConfig()) 

50 # Parse from env if not provided 

51 self._input_pins = input_pins or _parse_pin_list( 

52 os.environ.get('HEVOLVE_GPIO_INPUT_PINS', '')) 

53 self._output_pins = output_pins or _parse_pin_list( 

54 os.environ.get('HEVOLVE_GPIO_OUTPUT_PINS', '')) 

55 self._debounce_ms = debounce_ms 

56 self._gpio_lib = None # 'gpiod' or 'rpigpio' 

57 self._gpio = None 

58 self._poll_thread = None 

59 self._pin_states = {} # pin -> last known state 

60 self._pin_last_event = {} # pin -> timestamp of last event 

61 

62 @property 

63 def name(self) -> str: 

64 return 'gpio' 

65 

66 @staticmethod 

67 def is_available() -> bool: 

68 """Check if GPIO hardware is available on this system.""" 

69 try: 

70 import gpiod 

71 return True 

72 except ImportError: 

73 pass 

74 try: 

75 import RPi.GPIO 

76 return True 

77 except ImportError: 

78 pass 

79 return os.path.isdir('/sys/class/gpio') 

80 

81 async def connect(self) -> bool: 

82 """Initialize GPIO library and configure pins.""" 

83 # Try gpiod first (modern), then RPi.GPIO (legacy) 

84 try: 

85 import gpiod 

86 self._gpio_lib = 'gpiod' 

87 self._gpio = gpiod 

88 logger.info("GPIO adapter: using gpiod (modern Linux GPIO)") 

89 except ImportError: 

90 try: 

91 import RPi.GPIO as GPIO 

92 self._gpio_lib = 'rpigpio' 

93 self._gpio = GPIO 

94 GPIO.setmode(GPIO.BCM) 

95 GPIO.setwarnings(False) 

96 logger.info("GPIO adapter: using RPi.GPIO") 

97 except ImportError: 

98 logger.error("GPIO adapter: no GPIO library available") 

99 return False 

100 

101 # Configure pins 

102 try: 

103 self._setup_pins() 

104 except Exception as e: 

105 logger.error(f"GPIO adapter: pin setup failed: {e}") 

106 return False 

107 

108 # Start polling thread for input pins 

109 if self._input_pins: 

110 self._poll_thread = threading.Thread( 

111 target=self._poll_loop, daemon=True) 

112 self._poll_thread.start() 

113 

114 self.status = ChannelStatus.CONNECTED 

115 logger.info( 

116 f"GPIO adapter: inputs={self._input_pins}, " 

117 f"outputs={self._output_pins}" 

118 ) 

119 return True 

120 

121 async def disconnect(self) -> None: 

122 """Release GPIO resources.""" 

123 self._running = False 

124 if self._gpio_lib == 'rpigpio' and self._gpio: 

125 try: 

126 self._gpio.cleanup() 

127 except Exception: 

128 pass 

129 

130 async def send_message( 

131 self, chat_id: str, text: str, 

132 reply_to: Optional[str] = None, 

133 media: Optional[List] = None, 

134 buttons: Optional[List[Dict]] = None, 

135 ) -> SendResult: 

136 """Control an output pin. 

137 

138 chat_id: pin number as string (e.g. "22") 

139 text: "on", "off", "toggle", or "pwm:0-100" 

140 """ 

141 try: 

142 pin = int(chat_id) 

143 except ValueError: 

144 return SendResult(success=False, error=f"Invalid pin: {chat_id}") 

145 

146 if pin not in self._output_pins: 

147 return SendResult(success=False, error=f"Pin {pin} not configured as output") 

148 

149 cmd = text.strip().lower() 

150 try: 

151 if cmd == 'on': 

152 self._set_pin(pin, True) 

153 elif cmd == 'off': 

154 self._set_pin(pin, False) 

155 elif cmd == 'toggle': 

156 current = self._pin_states.get(pin, False) 

157 self._set_pin(pin, not current) 

158 elif cmd.startswith('pwm:'): 

159 duty = int(cmd.split(':')[1]) 

160 self._set_pwm(pin, max(0, min(100, duty))) 

161 else: 

162 return SendResult(success=False, error=f"Unknown command: {cmd}") 

163 

164 return SendResult(success=True, message_id=f"gpio_{pin}") 

165 except Exception as e: 

166 return SendResult(success=False, error=str(e)) 

167 

168 async def edit_message(self, chat_id: str, message_id: str, 

169 text: str, buttons=None) -> SendResult: 

170 return await self.send_message(chat_id, text) 

171 

172 async def delete_message(self, chat_id: str, message_id: str) -> bool: 

173 return False 

174 

175 async def send_typing(self, chat_id: str) -> None: 

176 pass 

177 

178 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]: 

179 try: 

180 pin = int(chat_id) 

181 return { 

182 'pin': pin, 

183 'type': 'input' if pin in self._input_pins else 'output', 

184 'state': self._pin_states.get(pin), 

185 } 

186 except ValueError: 

187 return None 

188 

189 # ─── Internal ─── 

190 

191 def _setup_pins(self): 

192 """Configure input and output pins.""" 

193 if self._gpio_lib == 'rpigpio': 

194 for pin in self._input_pins: 

195 self._gpio.setup(pin, self._gpio.IN, pull_up_down=self._gpio.PUD_UP) 

196 self._pin_states[pin] = self._gpio.input(pin) 

197 for pin in self._output_pins: 

198 self._gpio.setup(pin, self._gpio.OUT, initial=self._gpio.LOW) 

199 self._pin_states[pin] = False 

200 else: 

201 # gpiod — just initialize state tracking 

202 for pin in self._input_pins: 

203 self._pin_states[pin] = None 

204 for pin in self._output_pins: 

205 self._pin_states[pin] = False 

206 

207 def _set_pin(self, pin: int, high: bool): 

208 """Set an output pin high or low.""" 

209 if self._gpio_lib == 'rpigpio': 

210 self._gpio.output(pin, self._gpio.HIGH if high else self._gpio.LOW) 

211 self._pin_states[pin] = high 

212 

213 def _set_pwm(self, pin: int, duty: int): 

214 """Set PWM duty cycle (0-100) on a pin.""" 

215 if self._gpio_lib == 'rpigpio': 

216 pwm = self._gpio.PWM(pin, 1000) # 1kHz 

217 pwm.start(duty) 

218 self._pin_states[pin] = duty 

219 

220 def _read_pin(self, pin: int) -> bool: 

221 """Read current state of an input pin.""" 

222 if self._gpio_lib == 'rpigpio': 

223 return bool(self._gpio.input(pin)) 

224 return self._pin_states.get(pin, False) 

225 

226 def _poll_loop(self): 

227 """Poll input pins for state changes with debounce.""" 

228 poll_ms = int(os.environ.get('HEVOLVE_GPIO_POLL_MS', '50')) 

229 while self._running: 

230 for pin in self._input_pins: 

231 try: 

232 current = self._read_pin(pin) 

233 previous = self._pin_states.get(pin) 

234 

235 if current != previous: 

236 # Debounce check 

237 now = time.time() * 1000 

238 last = self._pin_last_event.get(pin, 0) 

239 if now - last < self._debounce_ms: 

240 continue 

241 

242 self._pin_states[pin] = current 

243 self._pin_last_event[pin] = now 

244 self._dispatch_gpio_event(pin, current) 

245 except Exception as e: 

246 logger.debug(f"GPIO poll error pin {pin}: {e}") 

247 

248 time.sleep(poll_ms / 1000.0) 

249 

250 def _dispatch_gpio_event(self, pin: int, state: bool): 

251 """Create Message from GPIO state change and dispatch.""" 

252 msg = Message( 

253 id=str(uuid.uuid4())[:8], 

254 channel='gpio', 

255 sender_id=f'gpio:{pin}', 

256 sender_name=f'GPIO pin {pin}', 

257 chat_id=str(pin), 

258 text=f'pin:{pin} state:{"HIGH" if state else "LOW"}', 

259 raw={'pin': pin, 'state': state, 'timestamp': time.time()}, 

260 ) 

261 

262 for handler in self._message_handlers: 

263 try: 

264 handler(msg) 

265 except Exception as e: 

266 logger.error(f"GPIO handler error: {e}") 

267 

268 

269def _parse_pin_list(env_value: str) -> List[int]: 

270 """Parse comma-separated pin numbers from env var.""" 

271 if not env_value: 

272 return [] 

273 try: 

274 return [int(p.strip()) for p in env_value.split(',') if p.strip()] 

275 except ValueError: 

276 return []