Coverage for integrations / robotics / sensor_model.py: 97.3%
37 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Sensor Data Model — Unified representation for all sensor types.
4Every sensor reading is a SensorReading dataclass with timestamps,
5frame IDs, covariance, and type-specific data. This is the format
6that flows through SensorStore → WorldModelBridge → HevolveAI.
8HevolveAI's world model operates in one latent space: text, sensors,
9motors are all representations of the same world. This model ensures
10sensor data is structured consistently for that unified space.
12Supported sensor types:
13 imu, gps, encoder, force_torque, proximity, temperature,
14 camera, depth, lidar, contact, battery
15"""
16import time
17from dataclasses import dataclass, field
18from typing import Any, Dict, List, Optional
21# ── Predefined sensor data schemas ──────────────────────────────
23SENSOR_SCHEMAS = {
24 'imu': {
25 'required': [],
26 'optional': [
27 'accel_x', 'accel_y', 'accel_z',
28 'gyro_x', 'gyro_y', 'gyro_z',
29 'mag_x', 'mag_y', 'mag_z',
30 ],
31 },
32 'gps': {
33 'required': ['latitude', 'longitude'],
34 'optional': ['altitude', 'fix_quality', 'hdop', 'num_satellites'],
35 },
36 'encoder': {
37 'required': ['position_ticks'],
38 'optional': ['velocity_ticks_per_sec', 'direction'],
39 },
40 'force_torque': {
41 'required': [],
42 'optional': ['fx', 'fy', 'fz', 'tx', 'ty', 'tz'],
43 },
44 'proximity': {
45 'required': ['distance_m'],
46 'optional': ['angle_rad', 'object_detected'],
47 },
48 'temperature': {
49 'required': ['celsius'],
50 'optional': ['sensor_location'],
51 },
52 'camera': {
53 'required': [],
54 'optional': ['frame_base64', 'width', 'height', 'encoding', 'description'],
55 },
56 'depth': {
57 'required': [],
58 'optional': ['depth_map_base64', 'width', 'height', 'min_depth', 'max_depth'],
59 },
60 'lidar': {
61 'required': [],
62 'optional': [
63 'ranges', 'angle_min', 'angle_max', 'angle_increment',
64 'range_min', 'range_max',
65 ],
66 },
67 'contact': {
68 'required': ['is_contact'],
69 'optional': ['force_n', 'location'],
70 },
71 'battery': {
72 'required': ['voltage'],
73 'optional': ['current_a', 'percentage', 'temperature_c', 'charging'],
74 },
75}
77# Default TTL per sensor type (seconds)
78DEFAULT_TTL = {
79 'imu': 0.5,
80 'gps': 5.0,
81 'encoder': 0.2,
82 'force_torque': 0.5,
83 'proximity': 1.0,
84 'temperature': 30.0,
85 'camera': 2.0,
86 'depth': 2.0,
87 'lidar': 1.0,
88 'contact': 0.5,
89 'battery': 60.0,
90}
93@dataclass
94class SensorReading:
95 """Universal sensor data format.
97 This is the atom of sensor data that flows through the system:
98 SensorStore → WorldModelBridge → HevolveAI latent space.
99 """
100 sensor_id: str # e.g., 'imu_0', 'gps_0', 'lidar_front'
101 sensor_type: str # One of SENSOR_SCHEMAS keys
102 timestamp: float = field(default_factory=time.time)
103 frame_id: str = 'base_link' # Coordinate frame reference
104 data: Dict[str, Any] = field(default_factory=dict)
105 covariance: Optional[List[float]] = None # Flattened uncertainty matrix
106 quality: float = 1.0 # 0.0-1.0, signal quality metric
107 source: str = 'local' # 'local', 'ros', 'serial', 'gpio', 'wamp'
109 def to_dict(self) -> Dict:
110 """Serialize to dict for JSON/gossip transport."""
111 d = {
112 'sensor_id': self.sensor_id,
113 'sensor_type': self.sensor_type,
114 'timestamp': self.timestamp,
115 'frame_id': self.frame_id,
116 'data': self.data,
117 'quality': self.quality,
118 'source': self.source,
119 }
120 if self.covariance is not None:
121 d['covariance'] = self.covariance
122 return d
124 @classmethod
125 def from_dict(cls, d: Dict) -> 'SensorReading':
126 """Deserialize from dict."""
127 return cls(
128 sensor_id=d['sensor_id'],
129 sensor_type=d['sensor_type'],
130 timestamp=d.get('timestamp', time.time()),
131 frame_id=d.get('frame_id', 'base_link'),
132 data=d.get('data', {}),
133 covariance=d.get('covariance'),
134 quality=d.get('quality', 1.0),
135 source=d.get('source', 'local'),
136 )
139def validate_reading(reading: SensorReading) -> bool:
140 """Validate a SensorReading against its type schema.
142 Returns True if the reading has valid structure.
143 Lenient: unknown sensor types are accepted (extensible).
144 """
145 if not reading.sensor_id or not reading.sensor_type:
146 return False
148 if not 0.0 <= reading.quality <= 1.0:
149 return False
151 if reading.timestamp <= 0:
152 return False
154 schema = SENSOR_SCHEMAS.get(reading.sensor_type)
155 if schema is None:
156 return True # Unknown types are valid (extensible)
158 # Check required fields
159 for req in schema.get('required', []):
160 if req not in reading.data:
161 return False
163 return True