Coverage for integrations / robotics / robot_prompt_builder.py: 91.0%
67 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Robot Goal Prompt Builder — Builds prompts for 'robot' goal type.
4Injects:
5 - Robot capabilities (what this robot can do)
6 - Available tools (what the agent can command)
7 - Safety constraints (workspace limits, E-stop status)
8 - Active sensor state (what the robot currently sees/feels)
10The prompt tells the LLM agent WHAT it can do.
11HevolveAI figures out HOW to do it.
12"""
13import json
14import logging
15from typing import Dict, Optional
17logger = logging.getLogger('hevolve_robotics')
20def build_robot_prompt(goal_dict: Dict,
21 product_dict: Optional[Dict] = None) -> str:
22 """Build a /chat prompt for a robot goal.
24 The agent receives:
25 1. What hardware is available (capabilities)
26 2. What tools it can use (navigate_to, move_joint, etc.)
27 3. Current safety status (E-stop, workspace limits)
28 4. Current sensor state (what's live right now)
29 5. The goal itself
31 The agent decides WHAT to do. HevolveAI decides HOW.
32 """
33 config = goal_dict.get('config', goal_dict.get('config_json', {})) or {}
35 # Guard: skip if no robot bridge is connected. Without an actual robot,
36 # the agent loops trying to call get_robot_status() which fails, wastes
37 # LLM budget, and gets killed by the watchdog.
38 try:
39 from integrations.robotics.capability_advertiser import get_capability_advertiser
40 caps = get_capability_advertiser().get_capabilities()
41 has_any = (caps.get('locomotion') or caps.get('manipulation')
42 or any(v for v in caps.get('sensors', {}).values()))
43 if not has_any:
44 import logging
45 logging.getLogger('hevolve_social').info(
46 f"Robot goal '{goal_dict.get('title', '')}': skipping — "
47 f"no locomotion, manipulation, or sensors detected")
48 return None
49 except Exception:
50 pass # Capability advertiser unavailable — continue with fallback prompt
52 # Capabilities
53 caps_section = _get_capabilities_section()
55 # Safety
56 safety_section = _get_safety_section()
58 # Sensors
59 sensor_section = _get_sensor_section()
61 return (
62 f"YOU ARE A ROBOT TASK AGENT.\n\n"
63 f"You control a physical robot through the HART platform. "
64 f"You decide WHAT the robot should do. The native embodiment layer "
65 f"(HevolveAI) handles HOW — motor control, path planning, sensor fusion, "
66 f"kinematics. You never compute trajectories or PID values yourself.\n\n"
67 f"{caps_section}\n"
68 f"{safety_section}\n"
69 f"{sensor_section}\n"
70 f"YOUR TOOLS:\n"
71 f" navigate_to(x, y, z) — Move the robot to a position\n"
72 f" move_joint(joint_id, position, velocity) — Move a specific joint\n"
73 f" execute_motion_sequence(steps) — Execute a series of actions\n"
74 f" read_sensor(sensor_id) — Read a sensor value\n"
75 f" get_sensor_window(sensor_id, duration_s) — Get sensor history\n"
76 f" get_robot_capabilities() — Query full capability set\n"
77 f" get_robot_status() — Get safety + sensor + bridge status\n"
78 f" configure_sensor(sensor_id, config) — Adjust sensor parameters\n\n"
79 f"GOAL:\n"
80 f" Title: {goal_dict.get('title', '')}\n"
81 f" Description: {goal_dict.get('description', '')}\n\n"
82 f"RULES:\n"
83 f" - ALWAYS check get_robot_status() before starting physical actions\n"
84 f" - If E-stop is active, do NOT send any motion commands\n"
85 f" - Use read_sensor() to verify outcomes after actions\n"
86 f" - If a motion fails, check safety status before retrying\n"
87 f" - Record outcomes with save_data_in_memory for recipe learning\n"
88 f" - Never compute trajectories — let the native layer handle it\n"
89 )
92def _get_capabilities_section() -> str:
93 """Build capabilities section from the advertiser."""
94 try:
95 from integrations.robotics.capability_advertiser import (
96 get_capability_advertiser,
97 )
98 adv = get_capability_advertiser()
99 caps = adv.get_capabilities()
101 lines = ["ROBOT CAPABILITIES:"]
102 lines.append(f" Form factor: {caps.get('form_factor', 'unknown')}")
104 if caps.get('locomotion'):
105 loc = caps['locomotion']
106 lines.append(f" Locomotion: {loc.get('type', 'yes')} "
107 f"(max speed: {loc.get('max_speed', 'N/A')})")
108 else:
109 lines.append(" Locomotion: NONE (stationary)")
111 if caps.get('manipulation'):
112 manip = caps['manipulation']
113 lines.append(f" Manipulation: {manip.get('arms', 0)} arm(s), "
114 f"{manip.get('grippers', 0)} gripper(s), "
115 f"{manip.get('dof', 'N/A')} DOF")
116 else:
117 lines.append(" Manipulation: NONE")
119 sensors = [k for k, v in caps.get('sensors', {}).items() if v]
120 lines.append(f" Sensors: {', '.join(sensors) if sensors else 'none detected'}")
122 if caps.get('actuators'):
123 lines.append(f" Actuators: {', '.join(caps['actuators'])}")
125 if caps.get('payload_kg') is not None:
126 lines.append(f" Payload: {caps['payload_kg']} kg")
128 if caps.get('native_skills'):
129 lines.append(f" Native skills: {', '.join(caps['native_skills'])}")
131 return '\n'.join(lines)
132 except Exception:
133 return "ROBOT CAPABILITIES: detection unavailable"
136def _get_safety_section() -> str:
137 """Build safety status section."""
138 try:
139 from integrations.robotics.safety_monitor import get_safety_monitor
140 monitor = get_safety_monitor()
141 status = monitor.get_safety_status()
143 lines = ["SAFETY STATUS:"]
144 if status.get('is_estopped'):
145 lines.append(f" *** E-STOP ACTIVE: {status.get('estop_reason', 'unknown')} ***")
146 lines.append(" NO MOTION COMMANDS WILL BE ACCEPTED")
147 else:
148 lines.append(" E-stop: CLEAR")
150 if status.get('workspace_limits'):
151 limits = status['workspace_limits']
152 lines.append(f" Workspace limits: "
153 f"x=[{limits.get('x_min', '-inf')}, {limits.get('x_max', 'inf')}] "
154 f"y=[{limits.get('y_min', '-inf')}, {limits.get('y_max', 'inf')}] "
155 f"z=[{limits.get('z_min', '-inf')}, {limits.get('z_max', 'inf')}]")
156 return '\n'.join(lines)
157 except Exception:
158 return "SAFETY STATUS: monitor unavailable"
161def _get_sensor_section() -> str:
162 """Build active sensor section."""
163 try:
164 from integrations.robotics.sensor_store import get_sensor_store
165 store = get_sensor_store()
166 active = store.active_sensors()
168 if not active:
169 return "ACTIVE SENSORS: none"
171 lines = ["ACTIVE SENSORS:"]
172 for sensor_id, info in active.items():
173 lines.append(f" {sensor_id}: type={info.get('sensor_type', '?')}, "
174 f"readings={info.get('count', 0)}")
175 return '\n'.join(lines)
176 except Exception:
177 return "ACTIVE SENSORS: store unavailable"