Coverage for integrations / robotics / capability_advertiser.py: 91.7%
109 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Robot Capability Advertiser — Discovery + Advertisement for fleet dispatch.
4Answers: "What can this robot DO?" — by combining:
5 1. system_requirements.py HardwareProfile (CPU, RAM, GPU, sensors)
6 2. SensorStore.active_sensors() (what sensors are currently live)
7 3. robot_config.json (static configuration — arm DOF, payload, form factor)
8 4. HevolveAI capability query via WorldModelBridge (what native skills exist)
10This is DISCOVERY, not intelligence. HevolveAI owns the actual capabilities.
11We just ask it what it can do, and advertise that to the fleet.
13The matches_task_requirements() method lets dispatch route tasks to the right robot:
14 "Navigate to warehouse 3" → match against locomotion capability.
15 "Pick up the box" → match against manipulation.gripper capability.
16"""
17import json
18import logging
19import os
20from typing import Any, Dict, List, Optional
22logger = logging.getLogger('hevolve_robotics')
class RobotCapabilityAdvertiser:
    """Discovers and advertises robot capabilities for fleet dispatch.

    NOT intelligence. Just a structured query of what this node can do.
    Capabilities are gathered once by detect_capabilities() and cached on
    the instance; get_capabilities() triggers detection on first use.
    """

    # Requirement names that always refer to a sensor. A requirement in this
    # set is answered from the sensor map only and never falls through to the
    # native-skill lookup.
    _SENSOR_TYPES = frozenset({
        'imu', 'gps', 'lidar', 'camera', 'depth',
        'encoder', 'force_torque', 'proximity', 'battery',
    })

    # Config keys copied verbatim into the capability dict when present.
    _CONFIG_PASSTHROUGH_KEYS = (
        'locomotion', 'manipulation', 'workspace', 'battery', 'actuators',
    )

    def __init__(self):
        self._capabilities: Dict[str, Any] = {}
        self._detected = False

    def detect_capabilities(self) -> Dict[str, Any]:
        """Detect all capabilities from hardware profile, sensors, config, HevolveAI.

        Runs the four detection steps in order, stores the result on the
        instance and marks detection as done.

        Returns a structured dict:
            locomotion: {type, max_speed, ...} or None
            manipulation: {arms, grippers, dof, ...} or None
            sensors: {imu: True, gps: True, lidar: False, ...}
            actuators: [list of actuator IDs]
            workspace: {x_min, x_max, y_min, y_max, z_min, z_max} or None
            payload_kg: float or None
            battery: {voltage, capacity_wh} or None
            form_factor: str (rover, arm, drone, humanoid, stationary, unknown)
            native_skills: [list from HevolveAI] or []
        """
        caps: Dict[str, Any] = {
            'locomotion': None,
            'manipulation': None,
            'sensors': {},
            'actuators': [],
            'workspace': None,
            'payload_kg': None,
            'battery': None,
            'form_factor': 'unknown',
            'native_skills': [],
        }

        # 1. Hardware profile (system_requirements.py)
        self._detect_from_hardware_profile(caps)

        # 2. Active sensors (SensorStore)
        self._detect_from_sensor_store(caps)

        # 3. Static config (robot_config.json)
        self._detect_from_config_file(caps)

        # 4. HevolveAI native capabilities (via WorldModelBridge)
        self._detect_from_hevolveai(caps)

        self._capabilities = caps
        self._detected = True
        return caps

    def get_capabilities(self) -> Dict[str, Any]:
        """Return cached capabilities, detecting if needed.

        The returned dict is the cached object itself, not a copy.
        """
        if not self._detected:
            self.detect_capabilities()
        return self._capabilities

    def get_gossip_payload(self) -> Dict[str, Any]:
        """Compact capability summary for gossip beacon.

        Keeps it small for constrained bandwidth profiles: only the form
        factor, two presence flags, the names of sensors whose flag is
        truthy, and the number of native skills.
        """
        caps = self.get_capabilities()
        sensors = caps.get('sensors') or {}
        skills = caps.get('native_skills') or []
        return {
            'form_factor': caps.get('form_factor', 'unknown'),
            'has_locomotion': caps.get('locomotion') is not None,
            'has_manipulation': caps.get('manipulation') is not None,
            'sensor_types': [name for name, present in sensors.items() if present],
            'native_skill_count': len(skills),
        }

    def matches_task_requirements(self, task_reqs: Dict) -> float:
        """Score how well this robot matches a task's requirements.

        Args:
            task_reqs: Dict with optional keys:
                required_capabilities: list of strings
                    e.g. ['locomotion', 'gripper', 'gps'].
                    A single bare string is treated as a one-item list.
                preferred_form_factor: str
                min_payload_kg: float

        Returns:
            0.0 (no match) to 1.0 (perfect match), rounded to two decimals.
            0.5 when no capabilities are required (neutral match).
        """
        caps = self.get_capabilities()
        task_reqs = task_reqs or {}

        required = task_reqs.get('required_capabilities') or []
        if isinstance(required, str):
            # Iterating a bare string would test each character separately.
            required = [required]
        if not required:
            return 0.5  # No requirements = neutral match

        matched = sum(1 for req in required if self._has_capability(caps, req))
        score = matched / len(required)

        # Bonus for matching form factor
        preferred = task_reqs.get('preferred_form_factor')
        if preferred and caps.get('form_factor') == preferred:
            score = min(1.0, score + 0.1)

        # Penalty for insufficient payload; an unknown payload is not penalised.
        min_payload = task_reqs.get('min_payload_kg')
        payload = caps.get('payload_kg')
        if min_payload and payload is not None and payload < min_payload:
            score *= 0.5

        return round(score, 2)

    # ── Private helpers ──────────────────────────────────────────

    @staticmethod
    def _has_gripper(manip: Any) -> bool:
        """Return True when a manipulation entry reports at least one gripper.

        Accepts 'grippers' either as a count or as a sized collection;
        anything else (including a non-dict manipulation entry) is False.
        """
        if not isinstance(manip, dict):
            return False
        grippers = manip.get('grippers', 0)
        if isinstance(grippers, (int, float)):
            return grippers > 0
        try:
            return len(grippers) > 0
        except TypeError:
            return False

    def _has_capability(self, caps: Dict, req: str) -> bool:
        """Check if a specific capability requirement is met.

        Matching is case-insensitive. The requirement is tried, in order,
        as a structural capability (locomotion / manipulation / arm /
        gripper), as a sensor type with a truthy flag, and finally as a
        native skill name. Non-string requirements never match.
        """
        if not isinstance(req, str):
            return False
        wanted = req.lower()

        if wanted == 'locomotion':
            return caps.get('locomotion') is not None
        if wanted in ('manipulation', 'arm'):
            return caps.get('manipulation') is not None
        if wanted == 'gripper':
            return self._has_gripper(caps.get('manipulation'))

        # Sensor types: includes types added at runtime from the sensor
        # store, not only the well-known names.
        sensors = caps.get('sensors') or {}
        live = {str(name).lower() for name, present in sensors.items() if present}
        if wanted in live:
            return True
        if wanted in self._SENSOR_TYPES:
            # A well-known sensor name must not be satisfied by a skill.
            return False

        # Native skills
        skills = caps.get('native_skills') or []
        return any(
            isinstance(skill, str) and skill.lower() == wanted
            for skill in skills
        )

    def _detect_from_hardware_profile(self, caps: Dict):
        """Pull sensor detection from system_requirements HardwareProfile.

        Best-effort: any failure is logged at debug level and ignored.
        """
        try:
            from security.system_requirements import detect_hardware
            hw = detect_hardware()
            for sensor in ('imu', 'gps', 'lidar', 'camera'):
                caps['sensors'][sensor] = getattr(hw, f'has_{sensor}', False)
        except Exception as e:
            logger.debug("Hardware profile detection skipped: %s", e)

    def _detect_from_sensor_store(self, caps: Dict):
        """Check SensorStore for live sensors.

        Every active sensor with a non-empty 'sensor_type' marks that type
        as present. Best-effort: any failure is logged at debug level and
        ignored.
        """
        try:
            from integrations.robotics.sensor_store import get_sensor_store
            store = get_sensor_store()
            for _sensor_id, info in store.active_sensors().items():
                sensor_type = info.get('sensor_type', '')
                if sensor_type:
                    caps['sensors'][sensor_type] = True
        except Exception as e:
            logger.debug("SensorStore detection skipped: %s", e)

    def _detect_from_config_file(self, caps: Dict):
        """Load static robot configuration from robot_config.json.

        The path comes from the HEVOLVE_ROBOT_CONFIG environment variable,
        defaulting to agent_data/robot_config.json. A missing file is
        silently ignored; an unreadable file, invalid JSON, or a top-level
        value that is not a JSON object is logged at debug level and
        ignored so detection as a whole keeps going.
        """
        config_path = os.environ.get(
            'HEVOLVE_ROBOT_CONFIG',
            os.path.join('agent_data', 'robot_config.json'),
        )
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
        except FileNotFoundError:
            return
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.debug("Robot config %s skipped: %s", config_path, e)
            return

        if not isinstance(config, dict):
            logger.debug(
                "Robot config %s skipped: top-level JSON value is not an object",
                config_path,
            )
            return

        caps['form_factor'] = config.get('form_factor', caps['form_factor'])
        caps['payload_kg'] = config.get('payload_kg')

        for key in self._CONFIG_PASSTHROUGH_KEYS:
            if key in config:
                caps[key] = config[key]

    def _detect_from_hevolveai(self, caps: Dict):
        """Query HevolveAI for native capabilities via WorldModelBridge.

        Skills are taken from the health response only when its status is
        'ok' and 'native_skills' is a list. Best-effort: any failure is
        logged at debug level and ignored.
        """
        try:
            from integrations.agent_engine.world_model_bridge import (
                get_world_model_bridge,
            )
            bridge = get_world_model_bridge()
            health = bridge.check_health()
            if health and health.get('status') == 'ok':
                skills = health.get('native_skills', [])
                if isinstance(skills, list):
                    caps['native_skills'] = skills
        except Exception as e:
            logger.debug("HevolveAI capability query skipped: %s", e)
# ── Singleton ─────────────────────────────────────────────────

# Module-level holder for the shared advertiser; stays None until first request.
_advertiser: Optional[RobotCapabilityAdvertiser] = None


def get_capability_advertiser() -> RobotCapabilityAdvertiser:
    """Return the shared RobotCapabilityAdvertiser, creating it on first call."""
    global _advertiser
    if _advertiser is not None:
        return _advertiser
    _advertiser = RobotCapabilityAdvertiser()
    return _advertiser