Coverage for integrations / robotics / action_model.py: 100.0%

20 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Action Data Model — Universal representation for robot actions. 

3 

4Actions are the world model's predictions tested against reality. 

5They flow from LLM-langchain's agentic layer through WorldModelBridge 

6to HevolveAI's native embodiment where the actual execution happens. 

7 

8This is a data model only — no intelligence. The actual motor control, 

9kinematics, PID loops live in HevolveAI (raw native intelligence). 

10 

11Action types: 

12 motor_velocity, servo_position, gpio_output, gripper, 

13 navigate_to, speak, emergency_stop 

14""" 

15import time 

16from dataclasses import dataclass, field 

17from typing import Any, Dict, Optional 

18 

19 

20@dataclass 

21class RobotAction: 

22 """Universal action format. 

23 

24 Flows: Agent goal → dispatch → RobotAction → WorldModelBridge → HevolveAI 

25 """ 

26 action_type: str # motor_velocity, servo_position, gpio_output, etc. 

27 target: str # Actuator identifier (e.g., 'left_wheel', 'gripper_0') 

28 params: Dict[str, Any] = field(default_factory=dict) 

29 timestamp: float = field(default_factory=time.time) 

30 priority: int = 0 # Higher = more urgent (safety actions get 999) 

31 timeout_ms: float = 0 # 0 = no timeout 

32 source: str = 'agent' # 'agent', 'recipe', 'safety', 'fleet_command' 

33 

34 def to_dict(self) -> Dict: 

35 """Serialize for transport to HevolveAI via WorldModelBridge.""" 

36 return { 

37 'type': self.action_type, 

38 'target': self.target, 

39 'params': self.params, 

40 'timestamp': self.timestamp, 

41 'priority': self.priority, 

42 'timeout_ms': self.timeout_ms, 

43 'source': self.source, 

44 } 

45 

46 @classmethod 

47 def from_dict(cls, d: Dict) -> 'RobotAction': 

48 """Deserialize from dict.""" 

49 return cls( 

50 action_type=d.get('type', d.get('action_type', '')), 

51 target=d.get('target', ''), 

52 params=d.get('params', {}), 

53 timestamp=d.get('timestamp', time.time()), 

54 priority=d.get('priority', 0), 

55 timeout_ms=d.get('timeout_ms', 0), 

56 source=d.get('source', 'agent'), 

57 ) 

58 

59 @classmethod 

60 def emergency_stop_action(cls) -> 'RobotAction': 

61 """Create an emergency stop action (highest priority).""" 

62 return cls( 

63 action_type='emergency_stop', 

64 target='*', 

65 params={'velocity': 0, 'force': 0}, 

66 priority=999, 

67 source='safety', 

68 )