Coverage for integrations / robotics / recipe_adapter.py: 97.7%
44 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Robot Recipe Adapter — Physical action ↔ recipe step conversion.
4Bridges the CREATE/REUSE recipe system with physical robot actions:
5 - CREATE mode: records motor sequences with sensor context → recipe steps
6 - REUSE mode: replays recipe steps → RobotAction commands via WorldModelBridge
8HevolveAI handles real-time adaptation during replay (pause on obstacle,
9adjust trajectory). This adapter just converts the data format.
11NO intelligence here. Just format conversion between:
12 Recipe step (JSON in prompts/{id}_recipe.json)
13 ↔ RobotAction (integrations/robotics/action_model.py)
14 + sensor context from SensorStore
15"""
16import json
17import logging
18import time
19import uuid
20from typing import Any, Dict, List, Optional
22logger = logging.getLogger('hevolve_robotics')
class RobotRecipeAdapter:
    """Converts between physical actions and recipe steps.

    Recipe steps for physical actions include:
      - The action command (RobotAction.to_dict())
      - Sensor context at action time (what the robot "saw/felt")
      - Outcome (did it work? how far off was it?)

    This lets REUSE mode replay physical sequences, with HevolveAI
    providing real-time adaptation via its native intelligence.

    This class is pure format conversion: no I/O, no robot control,
    no intelligence. All methods are static and side-effect free.
    """

    @staticmethod
    def action_to_recipe_step(
        action: Dict,
        sensor_context: Optional[Dict] = None,
        outcome: Optional[Dict] = None,
    ) -> Dict:
        """Convert a physical action + context into a recipe step.

        Args:
            action: RobotAction.to_dict() or raw action dict
            sensor_context: Sensor readings at action time
            outcome: Result of the action (success, error, distance_error, etc.)

        Returns:
            Recipe step dict ready for storage in recipe JSON.
        """
        return {
            'step_type': 'robot_action',
            'action': action,
            'sensor_context': sensor_context or {},
            'outcome': outcome or {},
            'timestamp': time.time(),
            # 8 hex chars is enough to disambiguate steps within one recipe;
            # .hex matches the recipe_id style used in record_motion_sequence().
            'step_id': uuid.uuid4().hex[:8],
        }

    @staticmethod
    def recipe_step_to_action(step: Dict) -> Optional[Dict]:
        """Convert a recipe step back into an action dict for replay.

        Args:
            step: Recipe step dict (from action_to_recipe_step)

        Returns:
            Action dict suitable for WorldModelBridge.send_action(),
            or None if the step is not a valid robot_action step.
        """
        if step.get('step_type') != 'robot_action':
            return None

        action = step.get('action')
        if not action or not isinstance(action, dict):
            return None

        # A replayable action must at least declare its command type.
        if 'type' not in action:
            return None

        return action

    @staticmethod
    def record_motion_sequence(
        actions: List[Dict],
        sensor_log: Optional[List[Dict]] = None,
    ) -> Dict:
        """Record a full motion sequence as a recipe.

        Args:
            actions: List of entries, each either a structured dict with an
                'action' key (and optional 'sensor_context' / 'outcome' keys)
                or a plain action dict. Non-dict entries are silently skipped.
            sensor_log: Optional separate sensor log, indexed parallel to
                ``actions`` (note: structured and skipped entries still
                consume an index position).

        Returns:
            Recipe dict with steps and metadata.
        """
        steps = []
        for i, entry in enumerate(actions):
            if isinstance(entry, dict) and 'action' in entry:
                # Already has action/sensor_context/outcome structure.
                step = RobotRecipeAdapter.action_to_recipe_step(
                    action=entry['action'],
                    sensor_context=entry.get('sensor_context'),
                    outcome=entry.get('outcome'),
                )
            elif isinstance(entry, dict):
                # Plain action dict; pull sensor context from the parallel log.
                sensor_ctx = {}
                if sensor_log and i < len(sensor_log):
                    sensor_ctx = sensor_log[i]
                step = RobotRecipeAdapter.action_to_recipe_step(
                    action=entry,
                    sensor_context=sensor_ctx,
                )
            else:
                # Not a recognizable entry; best-effort recording skips it.
                continue
            steps.append(step)

        recipe_id = f"robot_sequence_{uuid.uuid4().hex[:8]}"
        return {
            'recipe_id': recipe_id,
            'recipe_type': 'robot_motion_sequence',
            'steps': steps,
            'step_count': len(steps),
            'created_at': time.time(),
        }

    @staticmethod
    def replay_motion_recipe(recipe: Dict) -> List[Dict]:
        """Extract the action sequence from a recipe for replay.

        The caller (or WorldModelBridge) sends each action in order.
        HevolveAI handles real-time adaptation during execution.

        Args:
            recipe: Recipe dict from record_motion_sequence()

        Returns:
            List of action dicts for send_action(); invalid or
            non-robot_action steps are filtered out.
        """
        actions = []
        for step in recipe.get('steps', []):
            action = RobotRecipeAdapter.recipe_step_to_action(step)
            if action is not None:
                actions.append(action)
        return actions