Coverage for integrations / robotics / sensor_adapters.py: 72.3%

119 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Sensor Adapters — Bridge existing hardware adapters to SensorStore. 

3 

4Each adapter converts hardware-specific data into SensorReading objects 

5and pushes them to the SensorStore. This is the glue between: 

6 - GPIO events → proximity/contact readings 

7 - Serial messages → IMU/GPS/encoder readings 

8 - ROS 2 topics → any sensor type 

9 - WAMP IoT topics → any sensor type 

10 

11Usage: 

12 from integrations.robotics.sensor_adapters import SerialSensorBridge 

13 bridge = SerialSensorBridge( 

14 port='/dev/ttyUSB0', 

15 mappings=[{ 

16 'line_pattern': 'IMU:(.+),(.+),(.+)', 

17 'sensor_id': 'imu_0', 

18 'sensor_type': 'imu', 

19 'fields': ['accel_x', 'accel_y', 'accel_z'], 

20 }], 

21 ) 

22 bridge.start() 

23""" 

24import json 

25import logging 

26import re 

27import threading 

28import time 

29from typing import Any, Callable, Dict, List, Optional 

30 

31from .sensor_model import SensorReading 

32from .sensor_store import get_sensor_store 

33 

34logger = logging.getLogger('hevolve_robotics') 

35 

36 

37class SerialSensorBridge: 

38 """Parse serial messages into SensorReading objects. 

39 

40 Configurable via JSON schema mapping — maps line patterns to sensor 

41 readings with named fields. 

42 """ 

43 

44 def __init__( 

45 self, 

46 port: str = '', 

47 baudrate: int = 115200, 

48 mappings: Optional[List[Dict]] = None, 

49 store=None, 

50 ): 

51 self._port = port 

52 self._baudrate = baudrate 

53 self._store = store or get_sensor_store() 

54 self._running = False 

55 self._thread: Optional[threading.Thread] = None 

56 

57 # Compile pattern mappings 

58 self._mappings = [] 

59 for m in (mappings or []): 

60 try: 

61 compiled = re.compile(m.get('line_pattern', '')) 

62 self._mappings.append({ 

63 'pattern': compiled, 

64 'sensor_id': m.get('sensor_id', 'serial_0'), 

65 'sensor_type': m.get('sensor_type', 'imu'), 

66 'fields': m.get('fields', []), 

67 'frame_id': m.get('frame_id', 'base_link'), 

68 }) 

69 except re.error as e: 

70 logger.warning(f"SerialSensorBridge: invalid pattern: {e}") 

71 

72 def parse_line(self, line: str) -> Optional[SensorReading]: 

73 """Parse a single serial line into a SensorReading, or None.""" 

74 for mapping in self._mappings: 

75 match = mapping['pattern'].search(line) 

76 if match: 

77 groups = match.groups() 

78 data = {} 

79 for i, field_name in enumerate(mapping['fields']): 

80 if i < len(groups): 

81 try: 

82 data[field_name] = float(groups[i]) 

83 except (ValueError, TypeError): 

84 data[field_name] = groups[i] 

85 

86 return SensorReading( 

87 sensor_id=mapping['sensor_id'], 

88 sensor_type=mapping['sensor_type'], 

89 frame_id=mapping['frame_id'], 

90 data=data, 

91 source='serial', 

92 ) 

93 return None 

94 

95 def start(self): 

96 """Start reading serial port in a background thread.""" 

97 if self._running or not self._port: 

98 return 

99 self._running = True 

100 self._thread = threading.Thread( 

101 target=self._read_loop, name=f'serial_sensor_{self._port}', daemon=True, 

102 ) 

103 self._thread.start() 

104 

105 def stop(self): 

106 self._running = False 

107 

108 def _read_loop(self): 

109 try: 

110 import serial 

111 except ImportError: 

112 logger.warning("SerialSensorBridge: pyserial not installed") 

113 return 

114 

115 while self._running: 

116 try: 

117 ser = serial.Serial(self._port, self._baudrate, timeout=0.1) 

118 while self._running: 

119 line = ser.readline().decode('utf-8', errors='ignore').strip() 

120 if line: 

121 reading = self.parse_line(line) 

122 if reading: 

123 self._store.put_reading(reading) 

124 ser.close() 

125 except Exception as e: 

126 logger.debug(f"SerialSensorBridge read error: {e}") 

127 time.sleep(1.0) 

128 

129 

130class GPIOSensorBridge: 

131 """Convert GPIO pin state changes to proximity/contact sensor readings.""" 

132 

133 def __init__( 

134 self, 

135 pin_mappings: Optional[Dict[int, Dict]] = None, 

136 store=None, 

137 ): 

138 """ 

139 Args: 

140 pin_mappings: {pin: {'sensor_id': 'prox_0', 'sensor_type': 'proximity', 

141 'active_low': True, 'distance_m': 0.01}} 

142 """ 

143 self._store = store or get_sensor_store() 

144 self._pin_mappings = pin_mappings or {} 

145 

146 def on_pin_change(self, pin: int, value: int): 

147 """Called when a GPIO pin changes state. 

148 

149 Typically hooked into the existing GPIOAdapter's event system. 

150 """ 

151 mapping = self._pin_mappings.get(pin) 

152 if not mapping: 

153 return 

154 

155 active_low = mapping.get('active_low', True) 

156 is_active = (value == 0) if active_low else (value == 1) 

157 

158 sensor_type = mapping.get('sensor_type', 'contact') 

159 data = {} 

160 

161 if sensor_type == 'proximity': 

162 data['distance_m'] = mapping.get('distance_m', 0.0) if is_active else 999.0 

163 data['object_detected'] = is_active 

164 elif sensor_type == 'contact': 

165 data['is_contact'] = is_active 

166 data['force_n'] = mapping.get('force_n', 1.0) if is_active else 0.0 

167 else: 

168 data['value'] = value 

169 data['active'] = is_active 

170 

171 reading = SensorReading( 

172 sensor_id=mapping.get('sensor_id', f'gpio_{pin}'), 

173 sensor_type=sensor_type, 

174 data=data, 

175 source='gpio', 

176 ) 

177 self._store.put_reading(reading) 

178 

179 

180class ROSSensorBridge: 

181 """Subscribe to ROS 2 sensor topics and convert to SensorReadings. 

182 

183 Extends the existing ROSBridgeAdapter pattern for sensor_msgs types: 

184 Imu, NavSatFix, LaserScan, JointState. 

185 """ 

186 

187 def __init__( 

188 self, 

189 topic_mappings: Optional[Dict[str, Dict]] = None, 

190 store=None, 

191 ): 

192 """ 

193 Args: 

194 topic_mappings: {'/imu/data': {'sensor_id': 'imu_0', 'sensor_type': 'imu'}, 

195 '/gps/fix': {'sensor_id': 'gps_0', 'sensor_type': 'gps'}} 

196 """ 

197 self._store = store or get_sensor_store() 

198 self._topic_mappings = topic_mappings or {} 

199 

200 def on_ros_message(self, topic: str, msg_data: Dict): 

201 """Called when a ROS 2 message is received. 

202 

203 msg_data should be the deserialized message as a dict. 

204 """ 

205 mapping = self._topic_mappings.get(topic) 

206 if not mapping: 

207 return 

208 

209 sensor_type = mapping.get('sensor_type', '') 

210 sensor_id = mapping.get('sensor_id', topic.replace('/', '_').strip('_')) 

211 

212 data = self._extract_sensor_data(sensor_type, msg_data) 

213 if data is None: 

214 return 

215 

216 reading = SensorReading( 

217 sensor_id=sensor_id, 

218 sensor_type=sensor_type, 

219 frame_id=msg_data.get('header', {}).get('frame_id', 'base_link'), 

220 data=data, 

221 source='ros', 

222 ) 

223 self._store.put_reading(reading) 

224 

225 def _extract_sensor_data(self, sensor_type: str, msg: Dict) -> Optional[Dict]: 

226 """Extract sensor-specific data from a ROS message dict.""" 

227 if sensor_type == 'imu': 

228 lin = msg.get('linear_acceleration', {}) 

229 ang = msg.get('angular_velocity', {}) 

230 return { 

231 'accel_x': lin.get('x', 0), 'accel_y': lin.get('y', 0), 

232 'accel_z': lin.get('z', 0), 

233 'gyro_x': ang.get('x', 0), 'gyro_y': ang.get('y', 0), 

234 'gyro_z': ang.get('z', 0), 

235 } 

236 elif sensor_type == 'gps': 

237 return { 

238 'latitude': msg.get('latitude', 0), 

239 'longitude': msg.get('longitude', 0), 

240 'altitude': msg.get('altitude', 0), 

241 } 

242 elif sensor_type == 'lidar': 

243 return { 

244 'ranges': msg.get('ranges', []), 

245 'angle_min': msg.get('angle_min', 0), 

246 'angle_max': msg.get('angle_max', 0), 

247 'angle_increment': msg.get('angle_increment', 0), 

248 } 

249 elif sensor_type == 'encoder': 

250 positions = msg.get('position', []) 

251 velocities = msg.get('velocity', []) 

252 return { 

253 'position_ticks': positions[0] if positions else 0, 

254 'velocity_ticks_per_sec': velocities[0] if velocities else 0, 

255 } 

256 return msg # Pass through for unknown types 

257 

258 

259class WAMPSensorBridge: 

260 """Listen on WAMP topics for IoT sensor payloads.""" 

261 

262 def __init__( 

263 self, 

264 topic_mappings: Optional[Dict[str, Dict]] = None, 

265 store=None, 

266 ): 

267 """ 

268 Args: 

269 topic_mappings: {'com.hart.sensors.imu': {'sensor_id': 'imu_0', 'sensor_type': 'imu'}} 

270 """ 

271 self._store = store or get_sensor_store() 

272 self._topic_mappings = topic_mappings or {} 

273 

274 def on_wamp_event(self, topic: str, payload: Dict): 

275 """Called when a WAMP event is received on a sensor topic.""" 

276 mapping = self._topic_mappings.get(topic) 

277 if not mapping: 

278 return 

279 

280 reading = SensorReading( 

281 sensor_id=mapping.get('sensor_id', topic.split('.')[-1]), 

282 sensor_type=mapping.get('sensor_type', 'unknown'), 

283 data=payload.get('data', payload), 

284 quality=payload.get('quality', 1.0), 

285 source='wamp', 

286 ) 

287 self._store.put_reading(reading)