Coverage for integrations / robotics / robot_prompt_builder.py: 91.0%

67 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Robot Goal Prompt Builder — Builds prompts for 'robot' goal type. 

3 

4Injects: 

5 - Robot capabilities (what this robot can do) 

6 - Available tools (what the agent can command) 

7 - Safety constraints (workspace limits, E-stop status) 

8 - Active sensor state (what the robot currently sees/feels) 

9 

10The prompt tells the LLM agent WHAT it can do. 

11HevolveAI figures out HOW to do it. 

12""" 

13import json 

14import logging 

15from typing import Dict, Optional 

16 

17logger = logging.getLogger('hevolve_robotics') 

18 

19 

20def build_robot_prompt(goal_dict: Dict, 

21 product_dict: Optional[Dict] = None) -> str: 

22 """Build a /chat prompt for a robot goal. 

23 

24 The agent receives: 

25 1. What hardware is available (capabilities) 

26 2. What tools it can use (navigate_to, move_joint, etc.) 

27 3. Current safety status (E-stop, workspace limits) 

28 4. Current sensor state (what's live right now) 

29 5. The goal itself 

30 

31 The agent decides WHAT to do. HevolveAI decides HOW. 

32 """ 

33 config = goal_dict.get('config', goal_dict.get('config_json', {})) or {} 

34 

35 # Guard: skip if no robot bridge is connected. Without an actual robot, 

36 # the agent loops trying to call get_robot_status() which fails, wastes 

37 # LLM budget, and gets killed by the watchdog. 

38 try: 

39 from integrations.robotics.capability_advertiser import get_capability_advertiser 

40 caps = get_capability_advertiser().get_capabilities() 

41 has_any = (caps.get('locomotion') or caps.get('manipulation') 

42 or any(v for v in caps.get('sensors', {}).values())) 

43 if not has_any: 

44 import logging 

45 logging.getLogger('hevolve_social').info( 

46 f"Robot goal '{goal_dict.get('title', '')}': skipping — " 

47 f"no locomotion, manipulation, or sensors detected") 

48 return None 

49 except Exception: 

50 pass # Capability advertiser unavailable — continue with fallback prompt 

51 

52 # Capabilities 

53 caps_section = _get_capabilities_section() 

54 

55 # Safety 

56 safety_section = _get_safety_section() 

57 

58 # Sensors 

59 sensor_section = _get_sensor_section() 

60 

61 return ( 

62 f"YOU ARE A ROBOT TASK AGENT.\n\n" 

63 f"You control a physical robot through the HART platform. " 

64 f"You decide WHAT the robot should do. The native embodiment layer " 

65 f"(HevolveAI) handles HOW — motor control, path planning, sensor fusion, " 

66 f"kinematics. You never compute trajectories or PID values yourself.\n\n" 

67 f"{caps_section}\n" 

68 f"{safety_section}\n" 

69 f"{sensor_section}\n" 

70 f"YOUR TOOLS:\n" 

71 f" navigate_to(x, y, z) — Move the robot to a position\n" 

72 f" move_joint(joint_id, position, velocity) — Move a specific joint\n" 

73 f" execute_motion_sequence(steps) — Execute a series of actions\n" 

74 f" read_sensor(sensor_id) — Read a sensor value\n" 

75 f" get_sensor_window(sensor_id, duration_s) — Get sensor history\n" 

76 f" get_robot_capabilities() — Query full capability set\n" 

77 f" get_robot_status() — Get safety + sensor + bridge status\n" 

78 f" configure_sensor(sensor_id, config) — Adjust sensor parameters\n\n" 

79 f"GOAL:\n" 

80 f" Title: {goal_dict.get('title', '')}\n" 

81 f" Description: {goal_dict.get('description', '')}\n\n" 

82 f"RULES:\n" 

83 f" - ALWAYS check get_robot_status() before starting physical actions\n" 

84 f" - If E-stop is active, do NOT send any motion commands\n" 

85 f" - Use read_sensor() to verify outcomes after actions\n" 

86 f" - If a motion fails, check safety status before retrying\n" 

87 f" - Record outcomes with save_data_in_memory for recipe learning\n" 

88 f" - Never compute trajectories — let the native layer handle it\n" 

89 ) 

90 

91 

92def _get_capabilities_section() -> str: 

93 """Build capabilities section from the advertiser.""" 

94 try: 

95 from integrations.robotics.capability_advertiser import ( 

96 get_capability_advertiser, 

97 ) 

98 adv = get_capability_advertiser() 

99 caps = adv.get_capabilities() 

100 

101 lines = ["ROBOT CAPABILITIES:"] 

102 lines.append(f" Form factor: {caps.get('form_factor', 'unknown')}") 

103 

104 if caps.get('locomotion'): 

105 loc = caps['locomotion'] 

106 lines.append(f" Locomotion: {loc.get('type', 'yes')} " 

107 f"(max speed: {loc.get('max_speed', 'N/A')})") 

108 else: 

109 lines.append(" Locomotion: NONE (stationary)") 

110 

111 if caps.get('manipulation'): 

112 manip = caps['manipulation'] 

113 lines.append(f" Manipulation: {manip.get('arms', 0)} arm(s), " 

114 f"{manip.get('grippers', 0)} gripper(s), " 

115 f"{manip.get('dof', 'N/A')} DOF") 

116 else: 

117 lines.append(" Manipulation: NONE") 

118 

119 sensors = [k for k, v in caps.get('sensors', {}).items() if v] 

120 lines.append(f" Sensors: {', '.join(sensors) if sensors else 'none detected'}") 

121 

122 if caps.get('actuators'): 

123 lines.append(f" Actuators: {', '.join(caps['actuators'])}") 

124 

125 if caps.get('payload_kg') is not None: 

126 lines.append(f" Payload: {caps['payload_kg']} kg") 

127 

128 if caps.get('native_skills'): 

129 lines.append(f" Native skills: {', '.join(caps['native_skills'])}") 

130 

131 return '\n'.join(lines) 

132 except Exception: 

133 return "ROBOT CAPABILITIES: detection unavailable" 

134 

135 

136def _get_safety_section() -> str: 

137 """Build safety status section.""" 

138 try: 

139 from integrations.robotics.safety_monitor import get_safety_monitor 

140 monitor = get_safety_monitor() 

141 status = monitor.get_safety_status() 

142 

143 lines = ["SAFETY STATUS:"] 

144 if status.get('is_estopped'): 

145 lines.append(f" *** E-STOP ACTIVE: {status.get('estop_reason', 'unknown')} ***") 

146 lines.append(" NO MOTION COMMANDS WILL BE ACCEPTED") 

147 else: 

148 lines.append(" E-stop: CLEAR") 

149 

150 if status.get('workspace_limits'): 

151 limits = status['workspace_limits'] 

152 lines.append(f" Workspace limits: " 

153 f"x=[{limits.get('x_min', '-inf')}, {limits.get('x_max', 'inf')}] " 

154 f"y=[{limits.get('y_min', '-inf')}, {limits.get('y_max', 'inf')}] " 

155 f"z=[{limits.get('z_min', '-inf')}, {limits.get('z_max', 'inf')}]") 

156 return '\n'.join(lines) 

157 except Exception: 

158 return "SAFETY STATUS: monitor unavailable" 

159 

160 

161def _get_sensor_section() -> str: 

162 """Build active sensor section.""" 

163 try: 

164 from integrations.robotics.sensor_store import get_sensor_store 

165 store = get_sensor_store() 

166 active = store.active_sensors() 

167 

168 if not active: 

169 return "ACTIVE SENSORS: none" 

170 

171 lines = ["ACTIVE SENSORS:"] 

172 for sensor_id, info in active.items(): 

173 lines.append(f" {sensor_id}: type={info.get('sensor_type', '?')}, " 

174 f"readings={info.get('count', 0)}") 

175 return '\n'.join(lines) 

176 except Exception: 

177 return "ACTIVE SENSORS: store unavailable"