Coverage for integrations / robotics / recipe_adapter.py: 97.7%

44 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

Robot Recipe Adapter — Physical action ↔ recipe step conversion.

Bridges the CREATE/REUSE recipe system with physical robot actions:
  - CREATE mode: records motor sequences with sensor context → recipe steps
  - REUSE mode: replays recipe steps → RobotAction commands via WorldModelBridge

HevolveAI handles real-time adaptation during replay (pause on obstacle,
adjust trajectory). This adapter just converts the data format.

NO intelligence here. Just format conversion between:
  Recipe step (JSON in prompts/{id}_recipe.json)
    ↔ RobotAction (integrations/robotics/action_model.py)
    + sensor context from SensorStore

15""" 

16import json 

17import logging 

18import time 

19import uuid 

20from typing import Any, Dict, List, Optional 

21 

# Module-level logger. The name is a fixed string rather than __name__;
# presumably it is shared with other robotics integration modules so they can
# be configured together — confirm before changing it.
logger = logging.getLogger('hevolve_robotics')

23 

24 

class RobotRecipeAdapter:
    """Converts between physical actions and recipe steps.

    Recipe steps for physical actions include:
      - The action command (RobotAction.to_dict())
      - Sensor context at action time (what the robot "saw/felt")
      - Outcome (did it work? how far off was it?)

    This lets REUSE mode replay physical sequences, with HevolveAI
    providing real-time adaptation via its native intelligence.

    Every method is a ``@staticmethod``; the class keeps no state and only
    reshapes dictionaries.
    """

    @staticmethod
    def action_to_recipe_step(
        action: Dict,
        sensor_context: Optional[Dict] = None,
        outcome: Optional[Dict] = None,
    ) -> Dict:
        """Convert a physical action + context into a recipe step.

        Args:
            action: RobotAction.to_dict() or raw action dict. Stored by
                reference (not copied).
            sensor_context: Sensor readings at action time. ``None`` (or any
                falsy value) is stored as ``{}``.
            outcome: Result of the action (success, error, distance_error,
                etc.). ``None`` (or any falsy value) is stored as ``{}``.

        Returns:
            Recipe step dict ready for storage in recipe JSON, with keys
            ``step_type``, ``action``, ``sensor_context``, ``outcome``,
            ``timestamp`` (``time.time()``) and ``step_id`` (first 8
            characters of a random UUID4 string).
        """
        return {
            'step_type': 'robot_action',
            'action': action,
            'sensor_context': sensor_context or {},
            'outcome': outcome or {},
            'timestamp': time.time(),
            'step_id': str(uuid.uuid4())[:8],
        }

    @staticmethod
    def recipe_step_to_action(step: Dict) -> Optional[Dict]:
        """Convert a recipe step back into an action dict for replay.

        Args:
            step: Recipe step dict (from action_to_recipe_step).

        Returns:
            Action dict suitable for WorldModelBridge.send_action(), or None
            if the step is not a dict, is not a ``robot_action`` step, or its
            action is not a dict carrying a ``type`` field.
        """
        # A stored recipe may contain null or scalar entries; treat them as
        # "not a robot action" instead of failing on .get().
        if not isinstance(step, dict):
            return None

        if step.get('step_type') != 'robot_action':
            return None

        action = step.get('action')
        # 'type' is the one field required for replay; an empty dict fails
        # this check as well.
        if not isinstance(action, dict) or 'type' not in action:
            return None

        return action

    @staticmethod
    def record_motion_sequence(
        actions: List[Any],
        sensor_log: Optional[List[Dict]] = None,
    ) -> Dict:
        """Record a full motion sequence as a recipe.

        Each entry of ``actions`` may be one of:
          - a dict with an ``'action'`` key (plus optional
            ``'sensor_context'`` / ``'outcome'``), used as-is;
          - a plain action dict, paired with ``sensor_log[i]`` when the log
            has an entry at the same index;
          - an ``(action_dict, sensor_context, outcome)`` tuple or list,
            where the trailing members may be omitted.
        Anything else is skipped.

        Args:
            actions: Sequence of entries as described above. ``None`` is
                treated as an empty sequence.
            sensor_log: Optional separate sensor log (parallel to actions).
                Only consulted for plain action dicts.

        Returns:
            Recipe dict with ``recipe_id``, ``recipe_type``, ``steps``,
            ``step_count`` and ``created_at``.
        """
        to_step = RobotRecipeAdapter.action_to_recipe_step
        steps = []
        for index, entry in enumerate(actions or []):
            if isinstance(entry, dict):
                if 'action' in entry:
                    # Already has action/sensor_context/outcome structure.
                    steps.append(to_step(
                        action=entry['action'],
                        sensor_context=entry.get('sensor_context'),
                        outcome=entry.get('outcome'),
                    ))
                else:
                    # Plain action dict: take context from the parallel log
                    # when it is long enough.
                    sensor_ctx = None
                    if sensor_log and index < len(sensor_log):
                        sensor_ctx = sensor_log[index]
                    steps.append(to_step(
                        action=entry,
                        sensor_context=sensor_ctx,
                    ))
            elif (
                isinstance(entry, (tuple, list))
                and entry
                and isinstance(entry[0], dict)
            ):
                # Documented tuple form. Pad so that (action,) and
                # (action, ctx) are accepted too; extra members are ignored.
                action, sensor_ctx, outcome = (list(entry) + [None, None])[:3]
                steps.append(to_step(
                    action=action,
                    sensor_context=sensor_ctx,
                    outcome=outcome,
                ))
            # Unrecognised entries are skipped.

        recipe_id = f"robot_sequence_{uuid.uuid4().hex[:8]}"
        return {
            'recipe_id': recipe_id,
            'recipe_type': 'robot_motion_sequence',
            'steps': steps,
            'step_count': len(steps),
            'created_at': time.time(),
        }

    @staticmethod
    def replay_motion_recipe(recipe: Dict) -> List[Dict]:
        """Extract action sequence from a recipe for replay.

        The caller (or WorldModelBridge) sends each action in order.
        HevolveAI handles real-time adaptation during execution.

        Args:
            recipe: Recipe dict from record_motion_sequence().

        Returns:
            List of action dicts for send_action(), in recipe order. Steps
            that recipe_step_to_action() rejects are omitted; a missing or
            null ``'steps'`` value yields an empty list.
        """
        replayable = []
        # `or []` also covers an explicit null for 'steps'.
        for step in recipe.get('steps') or []:
            action = RobotRecipeAdapter.recipe_step_to_action(step)
            if action is not None:
                replayable.append(action)
        return replayable