Coverage for integrations / robotics / robot_tools.py: 74.7%

91 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Robot AutoGen Tools — Agentic interface to physical actions. 

3 

4Every tool is a thin routing wrapper: 

5 validate input → WorldModelBridge → return result 

6 

7NO intelligence here. No path planning. No sensor fusion. 

8HevolveAI owns those. These tools let an LLM agent say "go there" 

9and the bridge routes it to HevolveAI which figures out how. 

10 

11Tool categories: 

12 - Query: get_robot_capabilities, read_sensor, get_sensor_window, get_robot_status 

13 - Action: navigate_to, move_joint, execute_motion_sequence 

14 - Config: configure_sensor 

15""" 

16import json 

17import logging 

18from typing import Any, Dict 

19 

20logger = logging.getLogger('hevolve_robotics') 

21 

22 

def get_robot_capabilities(**kwargs) -> str:
    """Report the capabilities this robot advertises.

    The capability advertiser is imported lazily, so a missing robotics
    package surfaces as an error payload rather than an import failure
    at module load.

    Args:
        **kwargs: Accepted and ignored.

    Returns:
        JSON string of whatever ``get_capabilities()`` returns (documented
        as locomotion, manipulation, sensors, actuators, form_factor,
        native_skills, workspace, payload_kg, battery). Values that JSON
        cannot encode natively are stringified. If anything raises, the
        result is ``{"error": "<message>"}`` instead.
    """
    try:
        from integrations.robotics.capability_advertiser import (
            get_capability_advertiser,
        )
        capabilities = get_capability_advertiser().get_capabilities()
        # Serialise inside the try so encoding failures are reported too.
        payload = json.dumps(capabilities, default=str)
    except Exception as exc:
        payload = json.dumps({'error': str(exc)})
    return payload

38 

39 

def read_sensor(sensor_id: str = '', **kwargs) -> str:
    """Fetch the most recent reading for one sensor.

    Args:
        sensor_id: Identifier of the sensor to read (e.g. 'imu_0', 'gps_0').
        **kwargs: Accepted and ignored.

    Returns:
        JSON string of the latest reading's ``to_dict()`` output, or
        ``{"error": ...}`` when ``sensor_id`` is empty, the store has no
        reading for it, or the lookup raised.
    """
    if not sensor_id:
        encoded = json.dumps({'error': 'sensor_id is required'})
    else:
        try:
            from integrations.robotics.sensor_store import get_sensor_store
            latest = get_sensor_store().get_latest(sensor_id)
            if latest is None:
                body = {'error': f'No data for sensor {sensor_id}'}
            else:
                body = latest.to_dict()
            # Encoding stays inside the try so serialisation errors are
            # returned as an error payload as well.
            encoded = json.dumps(body)
        except Exception as exc:
            encoded = json.dumps({'error': str(exc)})
    return encoded

59 

60 

def get_sensor_window(sensor_id: str = '', duration_s: float = 1.0,
                      **kwargs) -> str:
    """Get a time window of sensor readings.

    Args:
        sensor_id: The sensor identifier.
        duration_s: Window duration in seconds (default 1.0). Numeric
            strings are accepted and coerced, since LLM callers often
            pass numbers as text.
        **kwargs: Accepted and ignored.

    Returns:
        JSON list of ``to_dict()`` outputs for the readings the store
        returns, or ``{"error": ...}`` when ``sensor_id`` is empty,
        ``duration_s`` is not a non-negative finite number, or the
        lookup raised.
    """
    import math

    if not sensor_id:
        return json.dumps({'error': 'sensor_id is required'})

    # Validate before touching the store: a NaN, infinite or negative
    # window has no meaningful interpretation.
    try:
        window_s = float(duration_s)
    except (TypeError, ValueError, OverflowError):
        window_s = float('nan')
    if not math.isfinite(window_s) or window_s < 0:
        return json.dumps(
            {'error': 'duration_s must be a non-negative finite number'})

    try:
        from integrations.robotics.sensor_store import get_sensor_store
        store = get_sensor_store()
        readings = store.get_window(sensor_id, window_s)
        return json.dumps([r.to_dict() for r in readings])
    except Exception as e:
        return json.dumps({'error': str(e)})

80 

81 

def get_robot_status(**kwargs) -> str:
    """Get overall robot status: safety, sensors, bridge health.

    Each subsystem is queried independently and best-effort: a failure
    in one section yields that section's fallback value and never
    prevents the others from being reported.

    Args:
        **kwargs: Accepted and ignored.

    Returns:
        JSON object with:
          - ``safety``: the safety monitor's status, or
            ``{"available": false}`` if it could not be queried.
          - ``active_sensors``: list of active sensor ids (``[]`` if the
            sensor store could not be queried).
          - ``sensor_stats``: store statistics; present only when they
            were obtained successfully.
          - ``bridge``: world model bridge stats, or
            ``{"available": false}`` if it could not be queried.
        If the collected status cannot be serialised, returns
        ``{"error": ...}`` instead.
    """
    # Same logger object as the module-level one (getLogger caches by name).
    log = logging.getLogger('hevolve_robotics')
    status: Dict[str, Any] = {}

    # Safety
    try:
        from integrations.robotics.safety_monitor import get_safety_monitor
        monitor = get_safety_monitor()
        status['safety'] = monitor.get_safety_status()
    except Exception:
        log.debug('Safety monitor status unavailable', exc_info=True)
        status['safety'] = {'available': False}

    # Sensors. The fallback is set up front so that a failure in
    # store.stats() cannot wipe out an already-collected sensor list.
    status['active_sensors'] = []
    try:
        from integrations.robotics.sensor_store import get_sensor_store
        store = get_sensor_store()
        status['active_sensors'] = list(store.active_sensors().keys())
        status['sensor_stats'] = store.stats()
    except Exception:
        log.debug('Sensor store status incomplete', exc_info=True)

    # Bridge
    try:
        from integrations.agent_engine.world_model_bridge import (
            get_world_model_bridge,
        )
        bridge = get_world_model_bridge()
        status['bridge'] = bridge.get_stats()
    except Exception:
        log.debug('World model bridge stats unavailable', exc_info=True)
        status['bridge'] = {'available': False}

    # Like every other tool in this module, report failures as JSON
    # rather than raising into the agent runtime.
    try:
        return json.dumps(status, default=str)
    except Exception as e:
        return json.dumps({'error': str(e)})

117 

118 

def navigate_to(x: float = 0.0, y: float = 0.0, z: float = 0.0,
                **kwargs) -> str:
    """Send a navigate_to action through the world model bridge.

    HevolveAI owns the actual path planning, obstacle avoidance, and
    motor control. This tool just says "go to (x, y, z)".

    Args:
        x, y, z: Target position in robot frame (meters). Numeric
            strings are coerced to float.
        **kwargs: Accepted and ignored.

    Returns:
        ``{"success": <bridge result>}`` on dispatch, or
        ``{"error": ...}`` when a coordinate is not a finite number or
        the bridge call raised.
    """
    import math

    # Validate before dispatch: NaN/inf cannot denote a target position
    # and must never reach the motion layer.
    try:
        target = {
            axis: float(value)
            for axis, value in (('x', x), ('y', y), ('z', z))
        }
    except (TypeError, ValueError, OverflowError):
        return json.dumps({'error': 'x, y, z must be finite numbers'})
    if not all(math.isfinite(v) for v in target.values()):
        return json.dumps({'error': 'x, y, z must be finite numbers'})

    try:
        from integrations.agent_engine.world_model_bridge import (
            get_world_model_bridge,
        )
        bridge = get_world_model_bridge()
        result = bridge.send_action({
            'type': 'navigate_to',
            'target': 'base',
            'params': target,
        })
        return json.dumps({'success': result})
    except Exception as e:
        return json.dumps({'error': str(e)})

144 

145 

def move_joint(joint_id: str = '', position: float = 0.0,
               velocity: float = 0.0, **kwargs) -> str:
    """Send a joint move command through the world model bridge.

    HevolveAI owns the actual kinematics and PID control.
    This tool just says "move joint X to position Y".

    Args:
        joint_id: Joint identifier (e.g. 'shoulder', 'elbow', 'wrist').
        position: Target position (radians or meters depending on joint type).
        velocity: Optional velocity limit. A value of zero (or any falsy
            value) means "no limit" and is omitted from the action.
        **kwargs: Accepted and ignored.

    Returns:
        ``{"success": <bridge result>}`` on dispatch, or
        ``{"error": ...}`` when ``joint_id`` is empty, ``position`` or
        ``velocity`` is not a finite number, or the bridge call raised.
    """
    import math

    if not joint_id:
        return json.dumps({'error': 'joint_id is required'})

    # Validate before dispatch: NaN/inf must never reach an actuator.
    try:
        target_position = float(position)
        velocity_limit = float(velocity) if velocity else 0.0
    except (TypeError, ValueError, OverflowError):
        return json.dumps(
            {'error': 'position and velocity must be finite numbers'})
    if not (math.isfinite(target_position) and math.isfinite(velocity_limit)):
        return json.dumps(
            {'error': 'position and velocity must be finite numbers'})

    params = {'position': target_position}
    # Test the converted value so inputs like the string '0' are treated
    # as "no limit" rather than sent as a zero velocity cap.
    if velocity_limit:
        params['velocity'] = velocity_limit

    try:
        from integrations.agent_engine.world_model_bridge import (
            get_world_model_bridge,
        )
        bridge = get_world_model_bridge()
        result = bridge.send_action({
            'type': 'servo_position',
            'target': joint_id,
            'params': params,
        })
        return json.dumps({'success': result})
    except Exception as e:
        return json.dumps({'error': str(e)})

178 

179 

def execute_motion_sequence(steps: str = '[]', **kwargs) -> str:
    """Execute a sequence of actions through the world model bridge.

    Each step is sent in order. HevolveAI handles the actual execution,
    timing, and real-time adaptation (pause on obstacle, etc.).

    Args:
        steps: JSON string — list of action dicts, each with
            {type, target, params}. An already-decoded list is also
            accepted.
        **kwargs: Accepted and ignored.

    Returns:
        ``{"results": [{"step": i, "success": ok}, ...]}``; the sequence
        stops at the first step the bridge reports as unsuccessful.
        ``{"error": ...}`` when ``steps`` is not valid JSON, is not a
        non-empty list, or contains a non-dict step (nothing is sent in
        these cases). If the bridge raises mid-sequence, the error
        payload also carries ``"results"`` for the steps already sent,
        so the caller knows how far the robot got.
    """
    try:
        step_list = json.loads(steps) if isinstance(steps, str) else steps
    except json.JSONDecodeError:
        return json.dumps({'error': 'steps must be valid JSON array'})

    if not isinstance(step_list, list) or not step_list:
        return json.dumps({'error': 'steps must be a non-empty list'})

    # Validate the whole sequence before moving anything, so a malformed
    # later step cannot leave the robot part-way through a motion.
    for i, step in enumerate(step_list):
        if not isinstance(step, dict):
            return json.dumps(
                {'error': f'step {i} must be an action object'})

    results = []
    try:
        from integrations.agent_engine.world_model_bridge import (
            get_world_model_bridge,
        )
        bridge = get_world_model_bridge()
        for i, step in enumerate(step_list):
            ok = bridge.send_action(step)
            results.append({'step': i, 'success': ok})
            if not ok:
                break  # Stop sequence on failure
        return json.dumps({'results': results})
    except Exception as e:
        # Keep the record of steps that already reached the bridge;
        # default=str guards against a non-serialisable bridge result.
        return json.dumps({'error': str(e), 'results': results}, default=str)

214 

215 

def configure_sensor(sensor_id: str = '', config: str = '{}',
                     **kwargs) -> str:
    """Configure a sensor's parameters via the world model bridge.

    The actual sensor configuration is handled by HevolveAI's native layer.
    This tool routes the configuration request through the bridge.

    Args:
        sensor_id: The sensor to configure.
        config: JSON string with configuration parameters; must decode
            to a JSON object. An already-decoded dict is also accepted.
        **kwargs: Accepted and ignored.

    Returns:
        ``{"success": <bridge result>}`` on dispatch, or
        ``{"error": ...}`` when ``sensor_id`` is empty, ``config`` is
        not valid JSON or not a JSON object, or the bridge call raised.
    """
    if not sensor_id:
        return json.dumps({'error': 'sensor_id is required'})
    try:
        cfg = json.loads(config) if isinstance(config, str) else config
    except json.JSONDecodeError:
        return json.dumps({'error': 'config must be valid JSON'})

    # Valid JSON is not enough: a list, number or null is not a set of
    # named parameters and must not be forwarded as the action's params.
    if not isinstance(cfg, dict):
        return json.dumps({'error': 'config must be a JSON object'})

    try:
        from integrations.agent_engine.world_model_bridge import (
            get_world_model_bridge,
        )
        bridge = get_world_model_bridge()
        result = bridge.send_action({
            'type': 'sensor_config',
            'target': sensor_id,
            'params': cfg,
        })
        return json.dumps({'success': result})
    except Exception as e:
        return json.dumps({'error': str(e)})

249 

250 

251# ── Tool registration list for AutoGen ───────────────────────── 

252 

# Each entry pairs a tool callable with the name and the natural-language
# description presented to the agent. Descriptions must match what the
# callable actually accepts and returns, since the agent plans from them.
ROBOT_TOOLS = [
    # ── Query tools ──
    {
        'function': get_robot_capabilities,
        'name': 'get_robot_capabilities',
        'description': (
            'Get this robot\'s hardware capabilities: locomotion, '
            'manipulation, sensors, actuators, form factor, native skills.'
        ),
    },
    {
        'function': read_sensor,
        'name': 'read_sensor',
        'description': 'Read the latest value from a specific sensor by ID.',
    },
    {
        'function': get_sensor_window,
        'name': 'get_sensor_window',
        'description': (
            'Get a time window of sensor readings for analysis. '
            'Returns the last N seconds of data.'
        ),
    },
    {
        'function': get_robot_status,
        'name': 'get_robot_status',
        # get_robot_status reports safety, sensors and bridge only; it
        # does not include capabilities (use get_robot_capabilities).
        'description': (
            'Get overall robot status: safety state, active sensors, '
            'sensor statistics, and bridge health.'
        ),
    },
    # ── Action tools ──
    {
        'function': navigate_to,
        'name': 'navigate_to',
        'description': (
            'Navigate the robot to a target position (x, y, z). '
            'Path planning and obstacle avoidance are handled by the '
            'native embodiment layer.'
        ),
    },
    {
        'function': move_joint,
        'name': 'move_joint',
        'description': (
            'Move a specific joint to a target position. '
            'Kinematics and PID control are handled natively.'
        ),
    },
    {
        'function': execute_motion_sequence,
        'name': 'execute_motion_sequence',
        'description': (
            'Execute a sequence of motor actions in order. '
            'Pass steps as a JSON array of {type, target, params} objects. '
            'Each step is sent to the native layer for execution.'
        ),
    },
    # ── Config tools ──
    {
        'function': configure_sensor,
        'name': 'configure_sensor',
        'description': (
            'Configure a sensor\'s parameters (sample rate, range, etc). '
            'Pass config as a JSON object string. '
            'Configuration is applied by the native layer.'
        ),
    },
]
316]