Coverage for integrations / robotics / robot_tools.py: 74.7%
91 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Robot AutoGen Tools — Agentic interface to physical actions.
4Every tool is a thin routing wrapper:
5 validate input → WorldModelBridge → return result
7NO intelligence here. No path planning. No sensor fusion.
8HevolveAI owns those. These tools let an LLM agent say "go there"
9and the bridge routes it to HevolveAI which figures out how.
11Tool categories:
12 - Query: get_robot_capabilities, read_sensor, get_sensor_window, get_robot_status
13 - Action: navigate_to, move_joint, execute_motion_sequence
14 - Config: configure_sensor
15"""
16import json
17import logging
18from typing import Any, Dict
20logger = logging.getLogger('hevolve_robotics')
def get_robot_capabilities(**kwargs) -> str:
    """Report the capabilities this robot advertises.

    Extra keyword arguments are accepted and ignored.

    Returns the advertiser's capability data encoded as JSON (values
    that are not JSON-native are stringified), or a JSON object with
    an 'error' key if the advertiser cannot be loaded or queried.
    """
    try:
        # Imported lazily so a missing robotics stack surfaces as an
        # error payload instead of breaking module import.
        from integrations.robotics.capability_advertiser import (
            get_capability_advertiser,
        )
        advertiser = get_capability_advertiser()
        encoded = json.dumps(advertiser.get_capabilities(), default=str)
    except Exception as exc:
        encoded = json.dumps({'error': str(exc)})
    return encoded
def read_sensor(sensor_id: str = '', **kwargs) -> str:
    """Fetch the most recent reading of one sensor.

    Args:
        sensor_id: The sensor identifier (e.g. 'imu_0', 'gps_0').

    Returns the reading's ``to_dict()`` encoded as JSON, or a JSON
    object with an 'error' key when the id is missing, the store has
    no data for it, or the lookup fails.
    """
    if sensor_id:
        try:
            from integrations.robotics.sensor_store import get_sensor_store
            latest = get_sensor_store().get_latest(sensor_id)
            if latest is not None:
                return json.dumps(latest.to_dict())
            message = f'No data for sensor {sensor_id}'
        except Exception as exc:
            message = str(exc)
    else:
        message = 'sensor_id is required'
    return json.dumps({'error': message})
def get_sensor_window(sensor_id: str = '', duration_s: float = 1.0,
                      **kwargs) -> str:
    """Get a time window of sensor readings.

    Args:
        sensor_id: The sensor identifier.
        duration_s: Window duration in seconds (default 1.0). Numeric
            strings are accepted, since tool arguments may arrive as
            text; the value must be finite and non-negative.

    Returns JSON list of SensorReading dicts, or a JSON object with an
    'error' key when validation or the store lookup fails.
    """
    if not sensor_id:
        return json.dumps({'error': 'sensor_id is required'})
    try:
        window_s = float(duration_s)
    except (TypeError, ValueError, OverflowError):
        return json.dumps({'error': 'duration_s must be a number'})
    # The chained comparison is False for NaN, +/-inf and negatives.
    if not 0.0 <= window_s < float('inf'):
        return json.dumps(
            {'error': 'duration_s must be a finite, non-negative number'})
    try:
        from integrations.robotics.sensor_store import get_sensor_store
        store = get_sensor_store()
        readings = store.get_window(sensor_id, window_s)
        return json.dumps([r.to_dict() for r in readings])
    except Exception as e:
        return json.dumps({'error': str(e)})
def get_robot_status(**kwargs) -> str:
    """Get overall robot status: safety, sensors, bridge health.

    Each subsystem is queried independently and best-effort, so one
    unavailable subsystem never hides the others.

    Returns JSON with the keys 'safety', 'active_sensors',
    'sensor_stats' and 'bridge'. A subsystem that cannot be queried is
    reported as {'available': False} ('active_sensors' falls back to an
    empty list). Values that are not JSON-native are stringified.
    """
    status: Dict[str, Any] = {}

    # Safety
    try:
        from integrations.robotics.safety_monitor import get_safety_monitor
        monitor = get_safety_monitor()
        status['safety'] = monitor.get_safety_status()
    except Exception:
        status['safety'] = {'available': False}

    # Sensors
    try:
        from integrations.robotics.sensor_store import get_sensor_store
        store = get_sensor_store()
        status['active_sensors'] = list(store.active_sensors().keys())
        status['sensor_stats'] = store.stats()
    except Exception:
        # Keep whatever was already collected (e.g. the sensor list when
        # only stats() failed) and fill in the rest, so both keys are
        # always present in the result.
        status.setdefault('active_sensors', [])
        status.setdefault('sensor_stats', {'available': False})

    # Bridge
    try:
        from integrations.agent_engine.world_model_bridge import (
            get_world_model_bridge,
        )
        bridge = get_world_model_bridge()
        status['bridge'] = bridge.get_stats()
    except Exception:
        status['bridge'] = {'available': False}

    return json.dumps(status, default=str)
def navigate_to(x: float = 0.0, y: float = 0.0, z: float = 0.0,
                **kwargs) -> str:
    """Send a navigate_to action through the world model bridge.

    HevolveAI owns the actual path planning, obstacle avoidance, and
    motor control. This tool just says "go to (x, y, z)".

    Args:
        x, y, z: Target position in robot frame (meters). Each value is
            coerced with float() and must be finite; NaN and infinity
            are rejected before anything is sent to the bridge.

    Returns JSON with success status, or a JSON object with an 'error'
    key when a coordinate is invalid or the bridge call fails.
    """
    import math

    try:
        target = {'x': float(x), 'y': float(y), 'z': float(z)}
    except (TypeError, ValueError, OverflowError) as e:
        return json.dumps({'error': str(e)})
    # float() happily accepts 'nan' / 'inf'; never forward those as a
    # motion target.
    if not all(math.isfinite(value) for value in target.values()):
        return json.dumps({'error': 'x, y, z must be finite numbers'})
    try:
        from integrations.agent_engine.world_model_bridge import (
            get_world_model_bridge,
        )
        bridge = get_world_model_bridge()
        result = bridge.send_action({
            'type': 'navigate_to',
            'target': 'base',
            'params': target,
        })
        return json.dumps({'success': result})
    except Exception as e:
        return json.dumps({'error': str(e)})
def move_joint(joint_id: str = '', position: float = 0.0,
               velocity: float = 0.0, **kwargs) -> str:
    """Send a joint move command through the world model bridge.

    HevolveAI owns the actual kinematics and PID control.
    This tool just says "move joint X to position Y".

    Args:
        joint_id: Joint identifier (e.g. 'shoulder', 'elbow', 'wrist').
        position: Target position (radians or meters depending on joint
            type). Must be a finite number.
        velocity: Optional velocity limit. A falsy value (0, None) means
            no limit is sent; otherwise it must be a finite number.

    Returns JSON with success status, or a JSON object with an 'error'
    key when an argument is invalid or the bridge call fails.
    """
    import math

    if not joint_id:
        return json.dumps({'error': 'joint_id is required'})
    try:
        params = {'position': float(position)}
        if velocity:
            params['velocity'] = float(velocity)
    except (TypeError, ValueError, OverflowError) as e:
        return json.dumps({'error': str(e)})
    # float() accepts NaN / infinity; never forward those to an actuator.
    for name, value in params.items():
        if not math.isfinite(value):
            return json.dumps({'error': f'{name} must be a finite number'})
    try:
        from integrations.agent_engine.world_model_bridge import (
            get_world_model_bridge,
        )
        bridge = get_world_model_bridge()
        result = bridge.send_action({
            'type': 'servo_position',
            'target': joint_id,
            'params': params,
        })
        return json.dumps({'success': result})
    except Exception as e:
        return json.dumps({'error': str(e)})
def execute_motion_sequence(steps: str = '[]', **kwargs) -> str:
    """Execute a sequence of actions through the world model bridge.

    Each step is sent in order. HevolveAI handles the actual execution,
    timing, and real-time adaptation (pause on obstacle, etc.).

    The whole sequence is validated before the first step is sent, so a
    malformed later step cannot leave the robot part-way through a
    motion. Sending stops at the first step the bridge reports as failed.

    Args:
        steps: JSON string — list of action dicts, each with
            {type, target, params}. An already-decoded list is also
            accepted.

    Returns JSON with results per step. On error returns a JSON object
    with an 'error' key; if the error occurred after some steps were
    already sent, their results are included under 'results'.
    """
    try:
        step_list = json.loads(steps) if isinstance(steps, str) else steps
    except json.JSONDecodeError:
        return json.dumps({'error': 'steps must be valid JSON array'})

    if not isinstance(step_list, list) or not step_list:
        return json.dumps({'error': 'steps must be a non-empty list'})

    # Validate everything up front: nothing moves unless every step is
    # at least shaped like an action.
    for i, step in enumerate(step_list):
        if not isinstance(step, dict):
            return json.dumps(
                {'error': f'step {i} must be an action object'})

    results = []
    try:
        from integrations.agent_engine.world_model_bridge import (
            get_world_model_bridge,
        )
        bridge = get_world_model_bridge()
        for i, step in enumerate(step_list):
            ok = bridge.send_action(step)
            results.append({'step': i, 'success': ok})
            if not ok:
                break  # Stop sequence on failure
        return json.dumps({'results': results})
    except Exception as e:
        failure: Dict[str, Any] = {'error': str(e)}
        if results:
            # Steps already sent may have moved the robot; report them.
            failure['results'] = results
        # default=str: the failure may itself have been a non-serializable
        # bridge result, which must not raise a second time here.
        return json.dumps(failure, default=str)
def configure_sensor(sensor_id: str = '', config: str = '{}',
                     **kwargs) -> str:
    """Configure a sensor's parameters via the world model bridge.

    The actual sensor configuration is handled by HevolveAI's native layer.
    This tool routes the configuration request through the bridge.

    Args:
        sensor_id: The sensor to configure.
        config: JSON string with configuration parameters. It must
            decode to a JSON object; an already-decoded dict is also
            accepted.

    Returns JSON with success status, or a JSON object with an 'error'
    key when an argument is invalid or the bridge call fails.
    """
    if not sensor_id:
        return json.dumps({'error': 'sensor_id is required'})
    try:
        cfg = json.loads(config) if isinstance(config, str) else config
    except json.JSONDecodeError:
        return json.dumps({'error': 'config must be valid JSON'})
    # Valid JSON is not enough: '[1]', 'null' or '3' would otherwise be
    # forwarded as the action's params.
    if not isinstance(cfg, dict):
        return json.dumps({'error': 'config must be a JSON object'})

    try:
        from integrations.agent_engine.world_model_bridge import (
            get_world_model_bridge,
        )
        bridge = get_world_model_bridge()
        result = bridge.send_action({
            'type': 'sensor_config',
            'target': sensor_id,
            'params': cfg,
        })
        return json.dumps({'success': result})
    except Exception as e:
        return json.dumps({'error': str(e)})
251# ── Tool registration list for AutoGen ─────────────────────────
# One entry per tool: 'function' is the callable, 'name' is the tool
# name (identical to the function's name), and 'description' is the
# human-readable summary of what the tool does. Descriptions must
# match what the function actually returns.
ROBOT_TOOLS = [
    {
        'function': get_robot_capabilities,
        'name': 'get_robot_capabilities',
        'description': (
            'Get this robot\'s hardware capabilities: locomotion, '
            'manipulation, sensors, actuators, form factor, native skills.'
        ),
    },
    {
        'function': read_sensor,
        'name': 'read_sensor',
        'description': 'Read the latest value from a specific sensor by ID.',
    },
    {
        'function': get_sensor_window,
        'name': 'get_sensor_window',
        'description': (
            'Get a time window of sensor readings for analysis. '
            'Returns the last N seconds of data.'
        ),
    },
    {
        'function': get_robot_status,
        'name': 'get_robot_status',
        # get_robot_status reports safety, active_sensors, sensor_stats
        # and bridge; it does not include a capability summary (use
        # get_robot_capabilities for that).
        'description': (
            'Get overall robot status: safety state, active sensors, '
            'sensor statistics, and bridge health.'
        ),
    },
    {
        'function': navigate_to,
        'name': 'navigate_to',
        'description': (
            'Navigate the robot to a target position (x, y, z). '
            'Path planning and obstacle avoidance are handled by the '
            'native embodiment layer.'
        ),
    },
    {
        'function': move_joint,
        'name': 'move_joint',
        'description': (
            'Move a specific joint to a target position. '
            'Kinematics and PID control are handled natively.'
        ),
    },
    {
        'function': execute_motion_sequence,
        'name': 'execute_motion_sequence',
        'description': (
            'Execute a sequence of motor actions in order. '
            'Each step is sent to the native layer for execution.'
        ),
    },
    {
        'function': configure_sensor,
        'name': 'configure_sensor',
        'description': (
            'Configure a sensor\'s parameters (sample rate, range, etc). '
            'Configuration is applied by the native layer.'
        ),
    },
]