Coverage for integrations / robotics / sensor_adapters.py: 72.3%
119 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Sensor Adapters — Bridge existing hardware adapters to SensorStore.
4Each adapter converts hardware-specific data into SensorReading objects
5and pushes them to the SensorStore. This is the glue between:
6 - GPIO events → proximity/contact readings
7 - Serial messages → IMU/GPS/encoder readings
8 - ROS 2 topics → any sensor type
9 - WAMP IoT topics → any sensor type
11Usage:
12 from integrations.robotics.sensor_adapters import SerialSensorBridge
13 bridge = SerialSensorBridge(
14 port='/dev/ttyUSB0',
15 mappings=[{
16 'line_pattern': 'IMU:(.+),(.+),(.+)',
17 'sensor_id': 'imu_0',
18 'sensor_type': 'imu',
19 'fields': ['accel_x', 'accel_y', 'accel_z'],
20 }],
21 )
22 bridge.start()
23"""
24import json
25import logging
26import re
27import threading
28import time
29from typing import Any, Callable, Dict, List, Optional
31from .sensor_model import SensorReading
32from .sensor_store import get_sensor_store
34logger = logging.getLogger('hevolve_robotics')
37class SerialSensorBridge:
38 """Parse serial messages into SensorReading objects.
40 Configurable via JSON schema mapping — maps line patterns to sensor
41 readings with named fields.
42 """
44 def __init__(
45 self,
46 port: str = '',
47 baudrate: int = 115200,
48 mappings: Optional[List[Dict]] = None,
49 store=None,
50 ):
51 self._port = port
52 self._baudrate = baudrate
53 self._store = store or get_sensor_store()
54 self._running = False
55 self._thread: Optional[threading.Thread] = None
57 # Compile pattern mappings
58 self._mappings = []
59 for m in (mappings or []):
60 try:
61 compiled = re.compile(m.get('line_pattern', ''))
62 self._mappings.append({
63 'pattern': compiled,
64 'sensor_id': m.get('sensor_id', 'serial_0'),
65 'sensor_type': m.get('sensor_type', 'imu'),
66 'fields': m.get('fields', []),
67 'frame_id': m.get('frame_id', 'base_link'),
68 })
69 except re.error as e:
70 logger.warning(f"SerialSensorBridge: invalid pattern: {e}")
72 def parse_line(self, line: str) -> Optional[SensorReading]:
73 """Parse a single serial line into a SensorReading, or None."""
74 for mapping in self._mappings:
75 match = mapping['pattern'].search(line)
76 if match:
77 groups = match.groups()
78 data = {}
79 for i, field_name in enumerate(mapping['fields']):
80 if i < len(groups):
81 try:
82 data[field_name] = float(groups[i])
83 except (ValueError, TypeError):
84 data[field_name] = groups[i]
86 return SensorReading(
87 sensor_id=mapping['sensor_id'],
88 sensor_type=mapping['sensor_type'],
89 frame_id=mapping['frame_id'],
90 data=data,
91 source='serial',
92 )
93 return None
95 def start(self):
96 """Start reading serial port in a background thread."""
97 if self._running or not self._port:
98 return
99 self._running = True
100 self._thread = threading.Thread(
101 target=self._read_loop, name=f'serial_sensor_{self._port}', daemon=True,
102 )
103 self._thread.start()
105 def stop(self):
106 self._running = False
108 def _read_loop(self):
109 try:
110 import serial
111 except ImportError:
112 logger.warning("SerialSensorBridge: pyserial not installed")
113 return
115 while self._running:
116 try:
117 ser = serial.Serial(self._port, self._baudrate, timeout=0.1)
118 while self._running:
119 line = ser.readline().decode('utf-8', errors='ignore').strip()
120 if line:
121 reading = self.parse_line(line)
122 if reading:
123 self._store.put_reading(reading)
124 ser.close()
125 except Exception as e:
126 logger.debug(f"SerialSensorBridge read error: {e}")
127 time.sleep(1.0)
130class GPIOSensorBridge:
131 """Convert GPIO pin state changes to proximity/contact sensor readings."""
133 def __init__(
134 self,
135 pin_mappings: Optional[Dict[int, Dict]] = None,
136 store=None,
137 ):
138 """
139 Args:
140 pin_mappings: {pin: {'sensor_id': 'prox_0', 'sensor_type': 'proximity',
141 'active_low': True, 'distance_m': 0.01}}
142 """
143 self._store = store or get_sensor_store()
144 self._pin_mappings = pin_mappings or {}
146 def on_pin_change(self, pin: int, value: int):
147 """Called when a GPIO pin changes state.
149 Typically hooked into the existing GPIOAdapter's event system.
150 """
151 mapping = self._pin_mappings.get(pin)
152 if not mapping:
153 return
155 active_low = mapping.get('active_low', True)
156 is_active = (value == 0) if active_low else (value == 1)
158 sensor_type = mapping.get('sensor_type', 'contact')
159 data = {}
161 if sensor_type == 'proximity':
162 data['distance_m'] = mapping.get('distance_m', 0.0) if is_active else 999.0
163 data['object_detected'] = is_active
164 elif sensor_type == 'contact':
165 data['is_contact'] = is_active
166 data['force_n'] = mapping.get('force_n', 1.0) if is_active else 0.0
167 else:
168 data['value'] = value
169 data['active'] = is_active
171 reading = SensorReading(
172 sensor_id=mapping.get('sensor_id', f'gpio_{pin}'),
173 sensor_type=sensor_type,
174 data=data,
175 source='gpio',
176 )
177 self._store.put_reading(reading)
180class ROSSensorBridge:
181 """Subscribe to ROS 2 sensor topics and convert to SensorReadings.
183 Extends the existing ROSBridgeAdapter pattern for sensor_msgs types:
184 Imu, NavSatFix, LaserScan, JointState.
185 """
187 def __init__(
188 self,
189 topic_mappings: Optional[Dict[str, Dict]] = None,
190 store=None,
191 ):
192 """
193 Args:
194 topic_mappings: {'/imu/data': {'sensor_id': 'imu_0', 'sensor_type': 'imu'},
195 '/gps/fix': {'sensor_id': 'gps_0', 'sensor_type': 'gps'}}
196 """
197 self._store = store or get_sensor_store()
198 self._topic_mappings = topic_mappings or {}
200 def on_ros_message(self, topic: str, msg_data: Dict):
201 """Called when a ROS 2 message is received.
203 msg_data should be the deserialized message as a dict.
204 """
205 mapping = self._topic_mappings.get(topic)
206 if not mapping:
207 return
209 sensor_type = mapping.get('sensor_type', '')
210 sensor_id = mapping.get('sensor_id', topic.replace('/', '_').strip('_'))
212 data = self._extract_sensor_data(sensor_type, msg_data)
213 if data is None:
214 return
216 reading = SensorReading(
217 sensor_id=sensor_id,
218 sensor_type=sensor_type,
219 frame_id=msg_data.get('header', {}).get('frame_id', 'base_link'),
220 data=data,
221 source='ros',
222 )
223 self._store.put_reading(reading)
225 def _extract_sensor_data(self, sensor_type: str, msg: Dict) -> Optional[Dict]:
226 """Extract sensor-specific data from a ROS message dict."""
227 if sensor_type == 'imu':
228 lin = msg.get('linear_acceleration', {})
229 ang = msg.get('angular_velocity', {})
230 return {
231 'accel_x': lin.get('x', 0), 'accel_y': lin.get('y', 0),
232 'accel_z': lin.get('z', 0),
233 'gyro_x': ang.get('x', 0), 'gyro_y': ang.get('y', 0),
234 'gyro_z': ang.get('z', 0),
235 }
236 elif sensor_type == 'gps':
237 return {
238 'latitude': msg.get('latitude', 0),
239 'longitude': msg.get('longitude', 0),
240 'altitude': msg.get('altitude', 0),
241 }
242 elif sensor_type == 'lidar':
243 return {
244 'ranges': msg.get('ranges', []),
245 'angle_min': msg.get('angle_min', 0),
246 'angle_max': msg.get('angle_max', 0),
247 'angle_increment': msg.get('angle_increment', 0),
248 }
249 elif sensor_type == 'encoder':
250 positions = msg.get('position', [])
251 velocities = msg.get('velocity', [])
252 return {
253 'position_ticks': positions[0] if positions else 0,
254 'velocity_ticks_per_sec': velocities[0] if velocities else 0,
255 }
256 return msg # Pass through for unknown types
259class WAMPSensorBridge:
260 """Listen on WAMP topics for IoT sensor payloads."""
262 def __init__(
263 self,
264 topic_mappings: Optional[Dict[str, Dict]] = None,
265 store=None,
266 ):
267 """
268 Args:
269 topic_mappings: {'com.hart.sensors.imu': {'sensor_id': 'imu_0', 'sensor_type': 'imu'}}
270 """
271 self._store = store or get_sensor_store()
272 self._topic_mappings = topic_mappings or {}
274 def on_wamp_event(self, topic: str, payload: Dict):
275 """Called when a WAMP event is received on a sensor topic."""
276 mapping = self._topic_mappings.get(topic)
277 if not mapping:
278 return
280 reading = SensorReading(
281 sensor_id=mapping.get('sensor_id', topic.split('.')[-1]),
282 sensor_type=mapping.get('sensor_type', 'unknown'),
283 data=payload.get('data', payload),
284 quality=payload.get('quality', 1.0),
285 source='wamp',
286 )
287 self._store.put_reading(reading)