Coverage for integrations / robotics / sensor_model.py: 97.3%

37 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Sensor Data Model — Unified representation for all sensor types. 

3 

4Every sensor reading is a SensorReading dataclass with timestamps, 

5frame IDs, covariance, and type-specific data. This is the format 

6that flows through SensorStore → WorldModelBridge → HevolveAI. 

7 

8HevolveAI's world model operates in one latent space: text, sensors, 

9motors are all representations of the same world. This model ensures 

10sensor data is structured consistently for that unified space. 

11 

12Supported sensor types: 

13 imu, gps, encoder, force_torque, proximity, temperature, 

14 camera, depth, lidar, contact, battery 

15""" 

16import time 

17from dataclasses import dataclass, field 

18from typing import Any, Dict, List, Optional 

19 

20 

21# ── Predefined sensor data schemas ────────────────────────────── 

22 

23SENSOR_SCHEMAS = { 

24 'imu': { 

25 'required': [], 

26 'optional': [ 

27 'accel_x', 'accel_y', 'accel_z', 

28 'gyro_x', 'gyro_y', 'gyro_z', 

29 'mag_x', 'mag_y', 'mag_z', 

30 ], 

31 }, 

32 'gps': { 

33 'required': ['latitude', 'longitude'], 

34 'optional': ['altitude', 'fix_quality', 'hdop', 'num_satellites'], 

35 }, 

36 'encoder': { 

37 'required': ['position_ticks'], 

38 'optional': ['velocity_ticks_per_sec', 'direction'], 

39 }, 

40 'force_torque': { 

41 'required': [], 

42 'optional': ['fx', 'fy', 'fz', 'tx', 'ty', 'tz'], 

43 }, 

44 'proximity': { 

45 'required': ['distance_m'], 

46 'optional': ['angle_rad', 'object_detected'], 

47 }, 

48 'temperature': { 

49 'required': ['celsius'], 

50 'optional': ['sensor_location'], 

51 }, 

52 'camera': { 

53 'required': [], 

54 'optional': ['frame_base64', 'width', 'height', 'encoding', 'description'], 

55 }, 

56 'depth': { 

57 'required': [], 

58 'optional': ['depth_map_base64', 'width', 'height', 'min_depth', 'max_depth'], 

59 }, 

60 'lidar': { 

61 'required': [], 

62 'optional': [ 

63 'ranges', 'angle_min', 'angle_max', 'angle_increment', 

64 'range_min', 'range_max', 

65 ], 

66 }, 

67 'contact': { 

68 'required': ['is_contact'], 

69 'optional': ['force_n', 'location'], 

70 }, 

71 'battery': { 

72 'required': ['voltage'], 

73 'optional': ['current_a', 'percentage', 'temperature_c', 'charging'], 

74 }, 

75} 

76 

77# Default TTL per sensor type (seconds) 

78DEFAULT_TTL = { 

79 'imu': 0.5, 

80 'gps': 5.0, 

81 'encoder': 0.2, 

82 'force_torque': 0.5, 

83 'proximity': 1.0, 

84 'temperature': 30.0, 

85 'camera': 2.0, 

86 'depth': 2.0, 

87 'lidar': 1.0, 

88 'contact': 0.5, 

89 'battery': 60.0, 

90} 

91 

92 

93@dataclass 

94class SensorReading: 

95 """Universal sensor data format. 

96 

97 This is the atom of sensor data that flows through the system: 

98 SensorStore → WorldModelBridge → HevolveAI latent space. 

99 """ 

100 sensor_id: str # e.g., 'imu_0', 'gps_0', 'lidar_front' 

101 sensor_type: str # One of SENSOR_SCHEMAS keys 

102 timestamp: float = field(default_factory=time.time) 

103 frame_id: str = 'base_link' # Coordinate frame reference 

104 data: Dict[str, Any] = field(default_factory=dict) 

105 covariance: Optional[List[float]] = None # Flattened uncertainty matrix 

106 quality: float = 1.0 # 0.0-1.0, signal quality metric 

107 source: str = 'local' # 'local', 'ros', 'serial', 'gpio', 'wamp' 

108 

109 def to_dict(self) -> Dict: 

110 """Serialize to dict for JSON/gossip transport.""" 

111 d = { 

112 'sensor_id': self.sensor_id, 

113 'sensor_type': self.sensor_type, 

114 'timestamp': self.timestamp, 

115 'frame_id': self.frame_id, 

116 'data': self.data, 

117 'quality': self.quality, 

118 'source': self.source, 

119 } 

120 if self.covariance is not None: 

121 d['covariance'] = self.covariance 

122 return d 

123 

124 @classmethod 

125 def from_dict(cls, d: Dict) -> 'SensorReading': 

126 """Deserialize from dict.""" 

127 return cls( 

128 sensor_id=d['sensor_id'], 

129 sensor_type=d['sensor_type'], 

130 timestamp=d.get('timestamp', time.time()), 

131 frame_id=d.get('frame_id', 'base_link'), 

132 data=d.get('data', {}), 

133 covariance=d.get('covariance'), 

134 quality=d.get('quality', 1.0), 

135 source=d.get('source', 'local'), 

136 ) 

137 

138 

139def validate_reading(reading: SensorReading) -> bool: 

140 """Validate a SensorReading against its type schema. 

141 

142 Returns True if the reading has valid structure. 

143 Lenient: unknown sensor types are accepted (extensible). 

144 """ 

145 if not reading.sensor_id or not reading.sensor_type: 

146 return False 

147 

148 if not 0.0 <= reading.quality <= 1.0: 

149 return False 

150 

151 if reading.timestamp <= 0: 

152 return False 

153 

154 schema = SENSOR_SCHEMAS.get(reading.sensor_type) 

155 if schema is None: 

156 return True # Unknown types are valid (extensible) 

157 

158 # Check required fields 

159 for req in schema.get('required', []): 

160 if req not in reading.data: 

161 return False 

162 

163 return True