Coverage for integrations / channels / hardware / gpio_adapter.py: 62.3%
138 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
GPIO Channel Adapter — Input/output pin control for embedded Linux boards.

Input pins generate Message events (button presses, sensors).
Output pins respond to agent commands (LEDs, relays, PWM servos).

Supports gpiod (modern Linux, preferred) and RPi.GPIO (legacy Raspberry Pi).

Usage:
    from integrations.channels.hardware.gpio_adapter import GPIOAdapter
    adapter = GPIOAdapter(input_pins=[17, 27], output_pins=[22, 23])
    adapter.on_message(handler)
    await adapter.start()
14"""
15import asyncio
16import json
17import logging
18import os
19import threading
20import time
21import uuid
22from typing import Callable, Dict, List, Optional, Any
24from integrations.channels.base import (
25 ChannelAdapter, ChannelConfig, ChannelStatus,
26 Message, SendResult,
27)
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)

# Debounce interval (ms) to prevent spurious triggers.
# Used as the default for GPIOAdapter's ``debounce_ms`` argument.
DEFAULT_DEBOUNCE_MS = 200
class GPIOAdapter(ChannelAdapter):
    """Channel adapter for GPIO pin I/O on embedded Linux boards.

    Input pins: trigger Message events on state change (HIGH→LOW or LOW→HIGH).
    Output pins: controlled via send_message
    (text = 'on'/'off'/'toggle'/'pwm:50').

    Backend notes:
        * ``rpigpio`` (RPi.GPIO) is the only backend that touches hardware
          in this class: pins are configured, read, driven and PWM'd.
        * ``gpiod`` is selected first when importable, but this class only
          tracks the requested pin states in memory for it; no gpiod line
          is requested, read or written here.
    """

    def __init__(
        self,
        input_pins: Optional[List[int]] = None,
        output_pins: Optional[List[int]] = None,
        debounce_ms: int = DEFAULT_DEBOUNCE_MS,
        config: Optional[ChannelConfig] = None,
    ):
        """Create the adapter.

        Args:
            input_pins: Pin numbers to poll for state changes. When omitted
                or empty, ``HEVOLVE_GPIO_INPUT_PINS`` (comma-separated) is
                used instead.
            output_pins: Pin numbers controllable via ``send_message``. When
                omitted or empty, ``HEVOLVE_GPIO_OUTPUT_PINS`` is used.
            debounce_ms: Minimum time between two reported events on the
                same input pin.
            config: Channel configuration; a default ``ChannelConfig()`` is
                created when omitted.
        """
        super().__init__(config or ChannelConfig())
        # Parse from env if not provided. NOTE: ``or`` means an explicitly
        # empty list also falls back to the environment variable.
        self._input_pins = input_pins or _parse_pin_list(
            os.environ.get('HEVOLVE_GPIO_INPUT_PINS', ''))
        self._output_pins = output_pins or _parse_pin_list(
            os.environ.get('HEVOLVE_GPIO_OUTPUT_PINS', ''))
        self._debounce_ms = debounce_ms
        self._gpio_lib = None  # 'gpiod' or 'rpigpio'
        self._gpio = None
        self._poll_thread = None
        self._pin_states = {}  # pin -> last known state
        self._pin_last_event = {}  # pin -> monotonic ms of last event
        # pin -> live RPi.GPIO PWM object. A reference must be kept: the
        # library stops PWM when the object is garbage-collected and refuses
        # to create a second PWM object for the same channel.
        self._pwm_channels = {}
        # Event loop captured in connect(); used to run async handlers that
        # are triggered from the polling thread.
        self._loop = None

    @property
    def name(self) -> str:
        """Channel identifier for this adapter."""
        return 'gpio'

    @staticmethod
    def is_available() -> bool:
        """Check if GPIO hardware is available on this system.

        Returns:
            True when ``gpiod`` or ``RPi.GPIO`` can be imported, or when the
            sysfs directory ``/sys/class/gpio`` exists.
        """
        try:
            import gpiod  # noqa: F401
            return True
        except ImportError:
            pass
        try:
            import RPi.GPIO  # noqa: F401
            return True
        except (ImportError, RuntimeError):
            # RPi.GPIO raises RuntimeError (not ImportError) when it is
            # installed but imported on a board that is not a Raspberry Pi.
            pass
        return os.path.isdir('/sys/class/gpio')

    async def connect(self) -> bool:
        """Initialize GPIO library and configure pins.

        Returns:
            True once pins are configured (and the polling thread is started
            when input pins exist); False when no GPIO library can be loaded
            or pin setup raises.
        """
        # Remember the loop so the polling thread can hand coroutines back.
        self._loop = asyncio.get_running_loop()

        # Try gpiod first (modern), then RPi.GPIO (legacy)
        try:
            import gpiod
            self._gpio_lib = 'gpiod'
            self._gpio = gpiod
            logger.info("GPIO adapter: using gpiod (modern Linux GPIO)")
        except ImportError:
            try:
                import RPi.GPIO as GPIO
                self._gpio_lib = 'rpigpio'
                self._gpio = GPIO
                GPIO.setmode(GPIO.BCM)
                GPIO.setwarnings(False)
                logger.info("GPIO adapter: using RPi.GPIO")
            except (ImportError, RuntimeError):
                # RuntimeError: RPi.GPIO present but not running on a Pi.
                self._gpio_lib = None
                self._gpio = None
                logger.error("GPIO adapter: no GPIO library available")
                return False

        # Configure pins
        try:
            self._setup_pins()
        except Exception as e:
            logger.error("GPIO adapter: pin setup failed: %s", e)
            return False

        # Start polling thread for input pins
        already_polling = (
            self._poll_thread is not None and self._poll_thread.is_alive()
        )
        if self._input_pins and not already_polling:
            # The poll loop spins on this flag; it must be set before the
            # thread starts or the loop body would never execute.
            self._running = True
            self._poll_thread = threading.Thread(
                target=self._poll_loop, daemon=True)
            self._poll_thread.start()

        self.status = ChannelStatus.CONNECTED
        logger.info(
            "GPIO adapter: inputs=%s, outputs=%s",
            self._input_pins, self._output_pins,
        )
        return True

    async def disconnect(self) -> None:
        """Release GPIO resources.

        Stops the polling loop, waits briefly for its thread to finish,
        stops any PWM outputs and calls ``GPIO.cleanup()`` for RPi.GPIO.
        """
        self._running = False

        # Let the poller finish its current pass before pins are released,
        # otherwise it may read pins that were just cleaned up.
        thread, self._poll_thread = self._poll_thread, None
        if thread is not None and thread is not threading.current_thread():
            try:
                loop = asyncio.get_running_loop()
                # Join off-loop so the event loop is not blocked.
                await loop.run_in_executor(None, thread.join, 1.0)
            except RuntimeError as e:
                logger.debug("GPIO adapter: poll thread join skipped: %s", e)

        for pin, pwm in list(self._pwm_channels.items()):
            try:
                pwm.stop()
            except Exception as e:
                logger.debug("GPIO adapter: PWM stop failed pin %s: %s", pin, e)
        self._pwm_channels.clear()

        if self._gpio_lib == 'rpigpio' and self._gpio:
            try:
                self._gpio.cleanup()
            except Exception as e:
                # Best effort: shutdown must not raise, but leave a trace.
                logger.warning("GPIO adapter: cleanup failed: %s", e)

    async def send_message(
        self, chat_id: str, text: str,
        reply_to: Optional[str] = None,
        media: Optional[List] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Control an output pin.

        Args:
            chat_id: pin number as string (e.g. "22")
            text: "on", "off", "toggle", or "pwm:0-100"
            reply_to: Unused.
            media: Unused.
            buttons: Unused.

        Returns:
            ``SendResult(success=True, message_id="gpio_<pin>")`` on success,
            otherwise ``SendResult(success=False, error=...)``. Errors are
            reported in the result rather than raised.
        """
        try:
            pin = int(chat_id)
        except (TypeError, ValueError):
            return SendResult(success=False, error=f"Invalid pin: {chat_id}")

        if pin not in self._output_pins:
            return SendResult(success=False, error=f"Pin {pin} not configured as output")

        # Treat a missing text as an empty (unknown) command, not a crash.
        cmd = (text or '').strip().lower()
        try:
            if cmd == 'on':
                self._set_pin(pin, True)
            elif cmd == 'off':
                self._set_pin(pin, False)
            elif cmd == 'toggle':
                current = self._pin_states.get(pin, False)
                self._set_pin(pin, not current)
            elif cmd.startswith('pwm:'):
                duty = int(cmd.split(':')[1])
                # Clamp to the valid duty-cycle range.
                self._set_pwm(pin, max(0, min(100, duty)))
            else:
                return SendResult(success=False, error=f"Unknown command: {cmd}")

            return SendResult(success=True, message_id=f"gpio_{pin}")
        except Exception as e:
            return SendResult(success=False, error=str(e))

    async def edit_message(self, chat_id: str, message_id: str,
                           text: str, buttons=None) -> SendResult:
        """Apply ``text`` as a new command to the pin; ``message_id`` is ignored."""
        return await self.send_message(chat_id, text)

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """Deletion has no meaning for GPIO; always returns False."""
        return False

    async def send_typing(self, chat_id: str) -> None:
        """No-op: GPIO has no typing indicator."""
        pass

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Describe a pin.

        Returns:
            A dict with ``pin``, ``type`` and ``state`` keys, or None when
            ``chat_id`` is not an integer string.
        """
        try:
            pin = int(chat_id)
            return {
                'pin': pin,
                # NOTE: any pin that is not an input is labelled 'output',
                # including pins that were never configured.
                'type': 'input' if pin in self._input_pins else 'output',
                'state': self._pin_states.get(pin),
            }
        except ValueError:
            return None

    # ─── Internal ───

    def _setup_pins(self):
        """Configure input and output pins."""
        if self._gpio_lib == 'rpigpio':
            for pin in self._input_pins:
                self._gpio.setup(pin, self._gpio.IN, pull_up_down=self._gpio.PUD_UP)
                self._pin_states[pin] = self._gpio.input(pin)
            for pin in self._output_pins:
                self._gpio.setup(pin, self._gpio.OUT, initial=self._gpio.LOW)
                self._pin_states[pin] = False
        else:
            # gpiod — just initialize state tracking
            for pin in self._input_pins:
                self._pin_states[pin] = None
            for pin in self._output_pins:
                self._pin_states[pin] = False

    def _set_pin(self, pin: int, high: bool):
        """Set an output pin high or low."""
        if self._gpio_lib == 'rpigpio':
            # A running PWM would keep driving the pin; stop it first.
            pwm = self._pwm_channels.pop(pin, None)
            if pwm is not None:
                pwm.stop()
            self._gpio.output(pin, self._gpio.HIGH if high else self._gpio.LOW)
        self._pin_states[pin] = high

    def _set_pwm(self, pin: int, duty: int):
        """Set PWM duty cycle (0-100) on a pin."""
        if self._gpio_lib == 'rpigpio':
            pwm = self._pwm_channels.get(pin)
            if pwm is None:
                pwm = self._gpio.PWM(pin, 1000)  # 1kHz
                pwm.start(duty)
                # Keep the object alive and reusable for later updates.
                self._pwm_channels[pin] = pwm
            else:
                pwm.ChangeDutyCycle(duty)
        self._pin_states[pin] = duty

    def _read_pin(self, pin: int) -> bool:
        """Read current state of an input pin."""
        if self._gpio_lib == 'rpigpio':
            return bool(self._gpio.input(pin))
        # Non-rpigpio backends: report the cached state only.
        return self._pin_states.get(pin, False)

    def _poll_loop(self):
        """Poll input pins for state changes with debounce.

        Runs in the background thread until ``self._running`` is falsy. The
        interval comes from ``HEVOLVE_GPIO_POLL_MS`` (default 50 ms).
        """
        raw_interval = os.environ.get('HEVOLVE_GPIO_POLL_MS', '50')
        try:
            poll_ms = int(raw_interval)
        except ValueError:
            logger.warning(
                "GPIO adapter: invalid HEVOLVE_GPIO_POLL_MS=%r, using 50",
                raw_interval)
            poll_ms = 50
        # At least 1 ms: zero would busy-loop, negative would make
        # time.sleep raise and kill the thread.
        poll_s = max(poll_ms, 1) / 1000.0

        while self._running:
            for pin in self._input_pins:
                try:
                    current = self._read_pin(pin)
                    previous = self._pin_states.get(pin)

                    if current != previous:
                        # Debounce check on the monotonic clock so that
                        # wall-clock adjustments cannot suppress events.
                        now = time.monotonic() * 1000
                        last = self._pin_last_event.get(pin)
                        if last is not None and now - last < self._debounce_ms:
                            continue

                        self._pin_states[pin] = current
                        self._pin_last_event[pin] = now
                        self._dispatch_gpio_event(pin, current)
                except Exception as e:
                    logger.debug("GPIO poll error pin %s: %s", pin, e)

            time.sleep(poll_s)

    def _dispatch_gpio_event(self, pin: int, state: bool):
        """Create Message from GPIO state change and dispatch.

        Each registered handler is called with the message. A handler that
        returns a coroutine is scheduled on the loop captured in
        ``connect()``; handler errors are logged and never propagate.
        """
        msg = Message(
            id=str(uuid.uuid4())[:8],
            channel='gpio',
            sender_id=f'gpio:{pin}',
            sender_name=f'GPIO pin {pin}',
            chat_id=str(pin),
            text=f'pin:{pin} state:{"HIGH" if state else "LOW"}',
            raw={'pin': pin, 'state': state, 'timestamp': time.time()},
        )

        for handler in self._message_handlers:
            try:
                outcome = handler(msg)
                if asyncio.iscoroutine(outcome):
                    self._schedule_coroutine(outcome)
            except Exception as e:
                logger.error(f"GPIO handler error: {e}")

    def _schedule_coroutine(self, coro):
        """Run an async handler's coroutine on the adapter's event loop."""
        loop = self._loop
        if loop is None or loop.is_closed():
            # Nothing can run it; close it to avoid a "never awaited" warning.
            coro.close()
            logger.warning(
                "GPIO adapter: async handler dropped, no event loop available")
            return
        future = asyncio.run_coroutine_threadsafe(coro, loop)
        future.add_done_callback(self._log_handler_failure)

    @staticmethod
    def _log_handler_failure(future):
        """Log an exception raised by a scheduled async handler."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logger.error(f"GPIO handler error: {error}")
269def _parse_pin_list(env_value: str) -> List[int]:
270 """Parse comma-separated pin numbers from env var."""
271 if not env_value:
272 return []
273 try:
274 return [int(p.strip()) for p in env_value.split(',') if p.strip()]
275 except ValueError:
276 return []