Coverage for integrations / robotics / sensor_store.py: 94.4%

107 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2SensorStore — Thread-safe multi-modal sensor data store. 

3 

4Follows the FrameStore pattern (RLock, per-key bounded deques, TTL). 

5Stores SensorReadings from all modalities: IMU, GPS, LiDAR, encoders, 

6force/torque, proximity, temperature, etc. 

7 

8HevolveAI's world model operates in one latent space — this store is the 

9buffer between hardware adapters and the WorldModelBridge that feeds 

10sensor data into that unified latent space. 

11 

12Usage: 

13 from integrations.robotics.sensor_store import SensorStore 

14 store = SensorStore() 

15 store.put_reading(SensorReading(sensor_id='imu_0', sensor_type='imu', 

16 data={'accel_x': 0.1, 'accel_y': -9.8})) 

17 latest = store.get_latest('imu_0') 

18""" 

19import logging 

20import os 

21import threading 

22import time 

23from collections import deque 

24from typing import Dict, List, Optional 

25 

26from .sensor_model import SensorReading, DEFAULT_TTL 

27 

28logger = logging.getLogger('hevolve_robotics') 

29 

30# Opt-in broadcast: sensor data can be high-frequency, so PeerLink 

31# broadcast is gated behind an env var. Set HEVOLVE_SENSOR_BROADCAST=true 

32# to publish every reading to the MessageBus 'sensor.reading' topic. 

33_SENSOR_BROADCAST_ENABLED: Optional[bool] = None 

34 

35 

36def _is_sensor_broadcast_enabled() -> bool: 

37 """Check HEVOLVE_SENSOR_BROADCAST env var (cached after first call).""" 

38 global _SENSOR_BROADCAST_ENABLED 

39 if _SENSOR_BROADCAST_ENABLED is None: 

40 _SENSOR_BROADCAST_ENABLED = os.environ.get( 

41 'HEVOLVE_SENSOR_BROADCAST', '' 

42 ).lower() in ('true', '1', 'yes') 

43 return _SENSOR_BROADCAST_ENABLED 

44 

45# Singleton 

46_store = None 

47_store_lock = threading.Lock() 

48 

49 

50def get_sensor_store() -> 'SensorStore': 

51 """Get or create the singleton SensorStore.""" 

52 global _store 

53 if _store is None: 

54 with _store_lock: 

55 if _store is None: 

56 _store = SensorStore() 

57 return _store 

58 

59 

60class SensorStore: 

61 """Thread-safe multi-modal sensor data store. 

62 

63 Per-sensor bounded deque with configurable max entries and TTL. 

64 Automatic cleanup of expired entries on read. 

65 """ 

66 

67 def __init__( 

68 self, 

69 max_entries_per_sensor: int = 100, 

70 ttl_overrides: Optional[Dict[str, float]] = None, 

71 ): 

72 self._lock = threading.RLock() 

73 self._max_entries = max_entries_per_sensor 

74 self._ttl = dict(DEFAULT_TTL) 

75 if ttl_overrides: 

76 self._ttl.update(ttl_overrides) 

77 self._default_ttl = 5.0 # Default for unknown types 

78 

79 # Per-sensor storage: sensor_id → deque of SensorReading 

80 self._sensors: Dict[str, deque] = {} 

81 # Per-sensor counters 

82 self._counts: Dict[str, int] = {} 

83 

84 def put_reading(self, reading: SensorReading): 

85 """Store a sensor reading with auto-cleanup of expired entries. 

86 

87 When HEVOLVE_SENSOR_BROADCAST=true, also publishes to the 

88 MessageBus 'sensor.reading' topic so peer nodes receive the 

89 data over PeerLink (skip_crossbar — sensor data stays local/P2P). 

90 """ 

91 with self._lock: 

92 sid = reading.sensor_id 

93 if sid not in self._sensors: 

94 self._sensors[sid] = deque(maxlen=self._max_entries) 

95 self._counts[sid] = 0 

96 self._sensors[sid].append(reading) 

97 self._counts[sid] = self._counts.get(sid, 0) + 1 

98 

99 # Best-effort PeerLink broadcast (same pattern as 

100 # safety_monitor._gossip_estop — fire-and-forget, never blocks store) 

101 if _is_sensor_broadcast_enabled(): 

102 try: 

103 from core.peer_link.message_bus import get_message_bus 

104 bus = get_message_bus() 

105 bus.publish( 

106 'sensor.reading', 

107 { 

108 'sensor_id': reading.sensor_id, 

109 'sensor_type': reading.sensor_type, 

110 'value': reading.data, 

111 'timestamp': reading.timestamp, 

112 'quality': reading.quality, 

113 'source': reading.source, 

114 'frame_id': reading.frame_id, 

115 }, 

116 skip_crossbar=True, # Sensor data stays on PeerLink, not Crossbar 

117 ) 

118 except Exception: 

119 pass # Broadcast failure must not block sensor storage 

120 

121 def get_latest(self, sensor_id: str) -> Optional[SensorReading]: 

122 """Get the latest reading for a sensor if within TTL.""" 

123 with self._lock: 

124 buf = self._sensors.get(sensor_id) 

125 if not buf: 

126 return None 

127 reading = buf[-1] 

128 ttl = self._ttl.get(reading.sensor_type, self._default_ttl) 

129 if time.time() - reading.timestamp > ttl: 

130 return None # Expired 

131 return reading 

132 

133 def get_window(self, sensor_id: str, duration_sec: float) -> List[SensorReading]: 

134 """Get all readings within a time window (newest first).""" 

135 cutoff = time.time() - duration_sec 

136 with self._lock: 

137 buf = self._sensors.get(sensor_id) 

138 if not buf: 

139 return [] 

140 return [r for r in reversed(buf) if r.timestamp >= cutoff] 

141 

142 def get_all_latest(self) -> Dict[str, SensorReading]: 

143 """Snapshot of latest reading per sensor (respecting TTL).""" 

144 result = {} 

145 now = time.time() 

146 with self._lock: 

147 for sid, buf in self._sensors.items(): 

148 if buf: 

149 reading = buf[-1] 

150 ttl = self._ttl.get(reading.sensor_type, self._default_ttl) 

151 if now - reading.timestamp <= ttl: 

152 result[sid] = reading 

153 return result 

154 

155 def active_sensors(self) -> List[str]: 

156 """List sensor IDs with recent (non-expired) readings.""" 

157 return list(self.get_all_latest().keys()) 

158 

159 def has_sensor(self, sensor_id: str) -> bool: 

160 """Check if a sensor has any readings (expired or not).""" 

161 with self._lock: 

162 buf = self._sensors.get(sensor_id) 

163 return bool(buf) 

164 

165 def get_ttl(self, sensor_type: str) -> float: 

166 """Get the TTL for a sensor type.""" 

167 return self._ttl.get(sensor_type, self._default_ttl) 

168 

169 def set_ttl(self, sensor_type: str, ttl: float): 

170 """Override TTL for a sensor type.""" 

171 with self._lock: 

172 self._ttl[sensor_type] = ttl 

173 

174 def stats(self) -> Dict: 

175 """Per-sensor statistics: count, rate, staleness, active.""" 

176 now = time.time() 

177 result = {} 

178 with self._lock: 

179 for sid, buf in self._sensors.items(): 

180 if not buf: 

181 continue 

182 latest = buf[-1] 

183 ttl = self._ttl.get(latest.sensor_type, self._default_ttl) 

184 staleness = now - latest.timestamp 

185 # Estimate rate from last N readings 

186 rate = 0.0 

187 if len(buf) >= 2: 

188 span = buf[-1].timestamp - buf[0].timestamp 

189 if span > 0: 

190 rate = (len(buf) - 1) / span 

191 

192 result[sid] = { 

193 'sensor_type': latest.sensor_type, 

194 'total_count': self._counts.get(sid, 0), 

195 'buffered': len(buf), 

196 'staleness_sec': round(staleness, 3), 

197 'rate_hz': round(rate, 1), 

198 'active': staleness <= ttl, 

199 'source': latest.source, 

200 } 

201 return result 

202 

203 def clear(self, sensor_id: Optional[str] = None): 

204 """Clear readings for a specific sensor or all sensors.""" 

205 with self._lock: 

206 if sensor_id: 

207 self._sensors.pop(sensor_id, None) 

208 self._counts.pop(sensor_id, None) 

209 else: 

210 self._sensors.clear() 

211 self._counts.clear()