Coverage for integrations / robotics / capability_advertiser.py: 91.7%

109 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Robot Capability Advertiser — Discovery + Advertisement for fleet dispatch. 

3 

4Answers: "What can this robot DO?" — by combining: 

5 1. system_requirements.py HardwareProfile (CPU, RAM, GPU, sensors) 

6 2. SensorStore.active_sensors() (what sensors are currently live) 

7 3. robot_config.json (static configuration — arm DOF, payload, form factor) 

8 4. HevolveAI capability query via WorldModelBridge (what native skills exist) 

9 

10This is DISCOVERY, not intelligence. HevolveAI owns the actual capabilities. 

11We just ask it what it can do, and advertise that to the fleet. 

12 

13The match_score() method lets dispatch route tasks to the right robot: 

14 "Navigate to warehouse 3" → match against locomotion capability. 

15 "Pick up the box" → match against manipulation.gripper capability. 

16""" 

17import json 

18import logging 

19import os 

20from typing import Any, Dict, List, Optional 

21 

22logger = logging.getLogger('hevolve_robotics') 

23 

24 

25class RobotCapabilityAdvertiser: 

26 """Discovers and advertises robot capabilities for fleet dispatch. 

27 

28 NOT intelligence. Just a structured query of what this node can do. 

29 """ 

30 

31 def __init__(self): 

32 self._capabilities: Dict[str, Any] = {} 

33 self._detected = False 

34 

35 def detect_capabilities(self) -> Dict[str, Any]: 

36 """Detect all capabilities from hardware profile, sensors, config, HevolveAI. 

37 

38 Returns a structured dict: 

39 locomotion: {type, max_speed, ...} or None 

40 manipulation: {arms, grippers, dof, ...} or None 

41 sensors: {imu: True, gps: True, lidar: False, ...} 

42 actuators: [list of actuator IDs] 

43 workspace: {x_min, x_max, y_min, y_max, z_min, z_max} or None 

44 payload_kg: float or None 

45 battery: {voltage, capacity_wh} or None 

46 form_factor: str (rover, arm, drone, humanoid, stationary, unknown) 

47 native_skills: [list from HevolveAI] or [] 

48 """ 

49 caps: Dict[str, Any] = { 

50 'locomotion': None, 

51 'manipulation': None, 

52 'sensors': {}, 

53 'actuators': [], 

54 'workspace': None, 

55 'payload_kg': None, 

56 'battery': None, 

57 'form_factor': 'unknown', 

58 'native_skills': [], 

59 } 

60 

61 # 1. Hardware profile (system_requirements.py) 

62 self._detect_from_hardware_profile(caps) 

63 

64 # 2. Active sensors (SensorStore) 

65 self._detect_from_sensor_store(caps) 

66 

67 # 3. Static config (robot_config.json) 

68 self._detect_from_config_file(caps) 

69 

70 # 4. HevolveAI native capabilities (via WorldModelBridge) 

71 self._detect_from_hevolveai(caps) 

72 

73 self._capabilities = caps 

74 self._detected = True 

75 return caps 

76 

77 def get_capabilities(self) -> Dict[str, Any]: 

78 """Return cached capabilities, detecting if needed.""" 

79 if not self._detected: 

80 self.detect_capabilities() 

81 return self._capabilities 

82 

83 def get_gossip_payload(self) -> Dict[str, Any]: 

84 """Compact capability summary for gossip beacon. 

85 

86 Keeps it small for constrained bandwidth profiles. 

87 """ 

88 caps = self.get_capabilities() 

89 return { 

90 'form_factor': caps.get('form_factor', 'unknown'), 

91 'has_locomotion': caps.get('locomotion') is not None, 

92 'has_manipulation': caps.get('manipulation') is not None, 

93 'sensor_types': list( 

94 k for k, v in caps.get('sensors', {}).items() if v 

95 ), 

96 'native_skill_count': len(caps.get('native_skills', [])), 

97 } 

98 

99 def matches_task_requirements(self, task_reqs: Dict) -> float: 

100 """Score how well this robot matches a task's requirements. 

101 

102 Args: 

103 task_reqs: Dict with optional keys: 

104 required_capabilities: list of strings 

105 e.g. ['locomotion', 'gripper', 'gps'] 

106 preferred_form_factor: str 

107 min_payload_kg: float 

108 

109 Returns: 

110 0.0 (no match) to 1.0 (perfect match) 

111 """ 

112 caps = self.get_capabilities() 

113 

114 required = task_reqs.get('required_capabilities', []) 

115 if not required: 

116 return 0.5 # No requirements = neutral match 

117 

118 matched = 0 

119 total = len(required) 

120 

121 for req in required: 

122 if self._has_capability(caps, req): 

123 matched += 1 

124 

125 if total == 0: 

126 return 0.5 

127 

128 score = matched / total 

129 

130 # Bonus for matching form factor 

131 preferred = task_reqs.get('preferred_form_factor') 

132 if preferred and caps.get('form_factor') == preferred: 

133 score = min(1.0, score + 0.1) 

134 

135 # Penalty for insufficient payload 

136 min_payload = task_reqs.get('min_payload_kg') 

137 if min_payload and caps.get('payload_kg') is not None: 

138 if caps['payload_kg'] < min_payload: 

139 score *= 0.5 

140 

141 return round(score, 2) 

142 

143 # ── Private helpers ────────────────────────────────────────── 

144 

145 def _has_capability(self, caps: Dict, req: str) -> bool: 

146 """Check if a specific capability requirement is met.""" 

147 req_lower = req.lower() 

148 

149 if req_lower == 'locomotion': 

150 return caps.get('locomotion') is not None 

151 if req_lower in ('manipulation', 'arm'): 

152 return caps.get('manipulation') is not None 

153 if req_lower == 'gripper': 

154 manip = caps.get('manipulation') 

155 return manip is not None and manip.get('grippers', 0) > 0 

156 # Sensor types 

157 if req_lower in ('imu', 'gps', 'lidar', 'camera', 'depth', 

158 'encoder', 'force_torque', 'proximity', 'battery'): 

159 return caps.get('sensors', {}).get(req_lower, False) 

160 # Native skills 

161 if req_lower in caps.get('native_skills', []): 

162 return True 

163 return False 

164 

165 def _detect_from_hardware_profile(self, caps: Dict): 

166 """Pull sensor detection from system_requirements HardwareProfile.""" 

167 try: 

168 from security.system_requirements import detect_hardware 

169 hw = detect_hardware() 

170 caps['sensors']['imu'] = getattr(hw, 'has_imu', False) 

171 caps['sensors']['gps'] = getattr(hw, 'has_gps', False) 

172 caps['sensors']['lidar'] = getattr(hw, 'has_lidar', False) 

173 caps['sensors']['camera'] = getattr(hw, 'has_camera', False) 

174 except Exception as e: 

175 logger.debug(f"Hardware profile detection skipped: {e}") 

176 

177 def _detect_from_sensor_store(self, caps: Dict): 

178 """Check SensorStore for live sensors.""" 

179 try: 

180 from integrations.robotics.sensor_store import get_sensor_store 

181 store = get_sensor_store() 

182 for sensor_id, info in store.active_sensors().items(): 

183 sensor_type = info.get('sensor_type', '') 

184 if sensor_type: 

185 caps['sensors'][sensor_type] = True 

186 except Exception as e: 

187 logger.debug(f"SensorStore detection skipped: {e}") 

188 

189 def _detect_from_config_file(self, caps: Dict): 

190 """Load static robot configuration from robot_config.json.""" 

191 config_path = os.environ.get( 

192 'HEVOLVE_ROBOT_CONFIG', 

193 os.path.join('agent_data', 'robot_config.json'), 

194 ) 

195 try: 

196 with open(config_path, 'r') as f: 

197 config = json.load(f) 

198 except (FileNotFoundError, json.JSONDecodeError): 

199 return 

200 

201 caps['form_factor'] = config.get('form_factor', caps['form_factor']) 

202 caps['payload_kg'] = config.get('payload_kg') 

203 

204 if 'locomotion' in config: 

205 caps['locomotion'] = config['locomotion'] 

206 if 'manipulation' in config: 

207 caps['manipulation'] = config['manipulation'] 

208 if 'workspace' in config: 

209 caps['workspace'] = config['workspace'] 

210 if 'battery' in config: 

211 caps['battery'] = config['battery'] 

212 if 'actuators' in config: 

213 caps['actuators'] = config['actuators'] 

214 

215 def _detect_from_hevolveai(self, caps: Dict): 

216 """Query HevolveAI for native capabilities via WorldModelBridge.""" 

217 try: 

218 from integrations.agent_engine.world_model_bridge import ( 

219 get_world_model_bridge, 

220 ) 

221 bridge = get_world_model_bridge() 

222 health = bridge.check_health() 

223 if health and health.get('status') == 'ok': 

224 skills = health.get('native_skills', []) 

225 if isinstance(skills, list): 

226 caps['native_skills'] = skills 

227 except Exception as e: 

228 logger.debug(f"HevolveAI capability query skipped: {e}") 

229 

230 

231# ── Singleton ───────────────────────────────────────────────── 

232 

233_advertiser: Optional[RobotCapabilityAdvertiser] = None 

234 

235 

236def get_capability_advertiser() -> RobotCapabilityAdvertiser: 

237 """Get or create the singleton RobotCapabilityAdvertiser.""" 

238 global _advertiser 

239 if _advertiser is None: 

240 _advertiser = RobotCapabilityAdvertiser() 

241 return _advertiser