Coverage for integrations / robotics / sensor_store.py: 94.4%
107 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2SensorStore — Thread-safe multi-modal sensor data store.
4Follows the FrameStore pattern (RLock, per-key bounded deques, TTL).
5Stores SensorReadings from all modalities: IMU, GPS, LiDAR, encoders,
6force/torque, proximity, temperature, etc.
8HevolveAI's world model operates in one latent space — this store is the
9buffer between hardware adapters and the WorldModelBridge that feeds
10sensor data into that unified latent space.
12Usage:
13 from integrations.robotics.sensor_store import SensorStore
14 store = SensorStore()
15 store.put_reading(SensorReading(sensor_id='imu_0', sensor_type='imu',
16 data={'accel_x': 0.1, 'accel_y': -9.8}))
17 latest = store.get_latest('imu_0')
18"""
19import logging
20import os
21import threading
22import time
23from collections import deque
24from typing import Dict, List, Optional
26from .sensor_model import SensorReading, DEFAULT_TTL
28logger = logging.getLogger('hevolve_robotics')
30# Opt-in broadcast: sensor data can be high-frequency, so PeerLink
31# broadcast is gated behind an env var. Set HEVOLVE_SENSOR_BROADCAST=true
32# to publish every reading to the MessageBus 'sensor.reading' topic.
33_SENSOR_BROADCAST_ENABLED: Optional[bool] = None
36def _is_sensor_broadcast_enabled() -> bool:
37 """Check HEVOLVE_SENSOR_BROADCAST env var (cached after first call)."""
38 global _SENSOR_BROADCAST_ENABLED
39 if _SENSOR_BROADCAST_ENABLED is None:
40 _SENSOR_BROADCAST_ENABLED = os.environ.get(
41 'HEVOLVE_SENSOR_BROADCAST', ''
42 ).lower() in ('true', '1', 'yes')
43 return _SENSOR_BROADCAST_ENABLED
45# Singleton
46_store = None
47_store_lock = threading.Lock()
50def get_sensor_store() -> 'SensorStore':
51 """Get or create the singleton SensorStore."""
52 global _store
53 if _store is None:
54 with _store_lock:
55 if _store is None:
56 _store = SensorStore()
57 return _store
60class SensorStore:
61 """Thread-safe multi-modal sensor data store.
63 Per-sensor bounded deque with configurable max entries and TTL.
64 Automatic cleanup of expired entries on read.
65 """
67 def __init__(
68 self,
69 max_entries_per_sensor: int = 100,
70 ttl_overrides: Optional[Dict[str, float]] = None,
71 ):
72 self._lock = threading.RLock()
73 self._max_entries = max_entries_per_sensor
74 self._ttl = dict(DEFAULT_TTL)
75 if ttl_overrides:
76 self._ttl.update(ttl_overrides)
77 self._default_ttl = 5.0 # Default for unknown types
79 # Per-sensor storage: sensor_id → deque of SensorReading
80 self._sensors: Dict[str, deque] = {}
81 # Per-sensor counters
82 self._counts: Dict[str, int] = {}
84 def put_reading(self, reading: SensorReading):
85 """Store a sensor reading with auto-cleanup of expired entries.
87 When HEVOLVE_SENSOR_BROADCAST=true, also publishes to the
88 MessageBus 'sensor.reading' topic so peer nodes receive the
89 data over PeerLink (skip_crossbar — sensor data stays local/P2P).
90 """
91 with self._lock:
92 sid = reading.sensor_id
93 if sid not in self._sensors:
94 self._sensors[sid] = deque(maxlen=self._max_entries)
95 self._counts[sid] = 0
96 self._sensors[sid].append(reading)
97 self._counts[sid] = self._counts.get(sid, 0) + 1
99 # Best-effort PeerLink broadcast (same pattern as
100 # safety_monitor._gossip_estop — fire-and-forget, never blocks store)
101 if _is_sensor_broadcast_enabled():
102 try:
103 from core.peer_link.message_bus import get_message_bus
104 bus = get_message_bus()
105 bus.publish(
106 'sensor.reading',
107 {
108 'sensor_id': reading.sensor_id,
109 'sensor_type': reading.sensor_type,
110 'value': reading.data,
111 'timestamp': reading.timestamp,
112 'quality': reading.quality,
113 'source': reading.source,
114 'frame_id': reading.frame_id,
115 },
116 skip_crossbar=True, # Sensor data stays on PeerLink, not Crossbar
117 )
118 except Exception:
119 pass # Broadcast failure must not block sensor storage
121 def get_latest(self, sensor_id: str) -> Optional[SensorReading]:
122 """Get the latest reading for a sensor if within TTL."""
123 with self._lock:
124 buf = self._sensors.get(sensor_id)
125 if not buf:
126 return None
127 reading = buf[-1]
128 ttl = self._ttl.get(reading.sensor_type, self._default_ttl)
129 if time.time() - reading.timestamp > ttl:
130 return None # Expired
131 return reading
133 def get_window(self, sensor_id: str, duration_sec: float) -> List[SensorReading]:
134 """Get all readings within a time window (newest first)."""
135 cutoff = time.time() - duration_sec
136 with self._lock:
137 buf = self._sensors.get(sensor_id)
138 if not buf:
139 return []
140 return [r for r in reversed(buf) if r.timestamp >= cutoff]
142 def get_all_latest(self) -> Dict[str, SensorReading]:
143 """Snapshot of latest reading per sensor (respecting TTL)."""
144 result = {}
145 now = time.time()
146 with self._lock:
147 for sid, buf in self._sensors.items():
148 if buf:
149 reading = buf[-1]
150 ttl = self._ttl.get(reading.sensor_type, self._default_ttl)
151 if now - reading.timestamp <= ttl:
152 result[sid] = reading
153 return result
155 def active_sensors(self) -> List[str]:
156 """List sensor IDs with recent (non-expired) readings."""
157 return list(self.get_all_latest().keys())
159 def has_sensor(self, sensor_id: str) -> bool:
160 """Check if a sensor has any readings (expired or not)."""
161 with self._lock:
162 buf = self._sensors.get(sensor_id)
163 return bool(buf)
165 def get_ttl(self, sensor_type: str) -> float:
166 """Get the TTL for a sensor type."""
167 return self._ttl.get(sensor_type, self._default_ttl)
169 def set_ttl(self, sensor_type: str, ttl: float):
170 """Override TTL for a sensor type."""
171 with self._lock:
172 self._ttl[sensor_type] = ttl
174 def stats(self) -> Dict:
175 """Per-sensor statistics: count, rate, staleness, active."""
176 now = time.time()
177 result = {}
178 with self._lock:
179 for sid, buf in self._sensors.items():
180 if not buf:
181 continue
182 latest = buf[-1]
183 ttl = self._ttl.get(latest.sensor_type, self._default_ttl)
184 staleness = now - latest.timestamp
185 # Estimate rate from last N readings
186 rate = 0.0
187 if len(buf) >= 2:
188 span = buf[-1].timestamp - buf[0].timestamp
189 if span > 0:
190 rate = (len(buf) - 1) / span
192 result[sid] = {
193 'sensor_type': latest.sensor_type,
194 'total_count': self._counts.get(sid, 0),
195 'buffered': len(buf),
196 'staleness_sec': round(staleness, 3),
197 'rate_hz': round(rate, 1),
198 'active': staleness <= ttl,
199 'source': latest.source,
200 }
201 return result
203 def clear(self, sensor_id: Optional[str] = None):
204 """Clear readings for a specific sensor or all sensors."""
205 with self._lock:
206 if sensor_id:
207 self._sensors.pop(sensor_id, None)
208 self._counts.pop(sensor_id, None)
209 else:
210 self._sensors.clear()
211 self._counts.clear()