Coverage for integrations / robotics / action_model.py: 100.0%
20 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Action Data Model — Universal representation for robot actions.
4Actions are the world model's predictions tested against reality.
5They flow from LLM-langchain's agentic layer through WorldModelBridge
6to HevolveAI's native embodiment where the actual execution happens.
8This is a data model only — no intelligence. The actual motor control,
9kinematics, PID loops live in HevolveAI (raw native intelligence).
11Action types:
12 motor_velocity, servo_position, gpio_output, gripper,
13 navigate_to, speak, emergency_stop
14"""
15import time
16from dataclasses import dataclass, field
17from typing import Any, Dict, Optional
20@dataclass
21class RobotAction:
22 """Universal action format.
24 Flows: Agent goal → dispatch → RobotAction → WorldModelBridge → HevolveAI
25 """
26 action_type: str # motor_velocity, servo_position, gpio_output, etc.
27 target: str # Actuator identifier (e.g., 'left_wheel', 'gripper_0')
28 params: Dict[str, Any] = field(default_factory=dict)
29 timestamp: float = field(default_factory=time.time)
30 priority: int = 0 # Higher = more urgent (safety actions get 999)
31 timeout_ms: float = 0 # 0 = no timeout
32 source: str = 'agent' # 'agent', 'recipe', 'safety', 'fleet_command'
34 def to_dict(self) -> Dict:
35 """Serialize for transport to HevolveAI via WorldModelBridge."""
36 return {
37 'type': self.action_type,
38 'target': self.target,
39 'params': self.params,
40 'timestamp': self.timestamp,
41 'priority': self.priority,
42 'timeout_ms': self.timeout_ms,
43 'source': self.source,
44 }
46 @classmethod
47 def from_dict(cls, d: Dict) -> 'RobotAction':
48 """Deserialize from dict."""
49 return cls(
50 action_type=d.get('type', d.get('action_type', '')),
51 target=d.get('target', ''),
52 params=d.get('params', {}),
53 timestamp=d.get('timestamp', time.time()),
54 priority=d.get('priority', 0),
55 timeout_ms=d.get('timeout_ms', 0),
56 source=d.get('source', 'agent'),
57 )
59 @classmethod
60 def emergency_stop_action(cls) -> 'RobotAction':
61 """Create an emergency stop action (highest priority)."""
62 return cls(
63 action_type='emergency_stop',
64 target='*',
65 params={'velocity': 0, 'force': 0},
66 priority=999,
67 source='safety',
68 )