Coverage for integrations / robotics / control_loop.py: 95.3%
64 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Control Loop Bridge — Timing bridge between agentic and native layers.
4NOT a PID controller. NOT intelligence. Just timing.
6HevolveAI owns the actual control loops (PID, motor control, kinematics).
7This bridge ensures the LLM-langchain side sends sensor data and receives
8feedback at the right cadence. It's the clock that keeps the agentic
9and embodied sides in sync.
11Usage:
12 from integrations.robotics.control_loop import ControlLoopBridge
13 loop = ControlLoopBridge()
14 loop.register_callback('imu_0', my_sensor_handler, hz=50)
15 loop.start()
16"""
17import logging
18import threading
19import time
20from typing import Callable, Dict, Optional
22logger = logging.getLogger('hevolve_robotics')
class ControlLoopBridge:
    """Timing bridge for sensor-action loops.

    Registers callbacks at specified Hz. Each callback is called on its
    own daemon thread with drift compensation. This class only provides
    the cadence; it performs no control computation itself.
    """

    def __init__(self):
        # Guards both dictionaries below. This is a plain (non-reentrant)
        # Lock, so methods that hold it must never call another public
        # method that also acquires it.
        self._lock = threading.Lock()
        # name -> {'fn', 'hz', 'thread', 'running'}
        self._callbacks: Dict[str, Dict] = {}
        # name -> {'calls', 'missed_deadlines', 'total_jitter_ms', 'target_hz'}
        self._stats: Dict[str, Dict] = {}

    def register_callback(
        self,
        name: str,
        callback: Callable,
        hz: float = 10.0,
    ):
        """Register a timed callback.

        Re-registering an existing name stops the previous registration and
        resets its statistics. The new registration is not running until
        ``start()`` is called.

        Args:
            name: Unique callback name (e.g., 'imu_ingestion', 'feedback_poll').
            callback: Function to call at the given frequency.
            hz: Target frequency in Hz. Must be greater than zero.

        Raises:
            ValueError: If ``hz`` is not a positive number.
        """
        # `not hz > 0` (rather than `hz <= 0`) also rejects NaN. Without this
        # check the worker thread would die on 1.0 / 0 or spin without
        # sleeping on a negative interval.
        if not hz > 0:
            raise ValueError(f"hz must be a positive number, got {hz!r}")

        with self._lock:
            # Do NOT call unregister_callback() here: it would try to take
            # self._lock again and deadlock on the non-reentrant lock.
            self._remove_locked(name)
            self._callbacks[name] = {
                'fn': callback,
                'hz': hz,
                'thread': None,
                'running': False,
            }
            self._stats[name] = {
                'calls': 0,
                'missed_deadlines': 0,
                'total_jitter_ms': 0.0,
                'target_hz': hz,
            }

    def unregister_callback(self, name: str):
        """Stop and remove a callback.

        Unknown names are ignored. Statistics collected for the callback are
        kept and remain visible through ``get_stats()``.
        """
        with self._lock:
            self._remove_locked(name)

    def _remove_locked(self, name: str):
        """Drop ``name`` from the registry; caller must hold ``self._lock``."""
        entry = self._callbacks.pop(name, None)
        if entry:
            # The worker thread holds a reference to this same dict and exits
            # the next time it checks the flag.
            entry['running'] = False

    def start(self):
        """Start all registered callbacks that are not already running."""
        with self._lock:
            # Snapshot the items because entries are replaced inside the loop.
            for name, cb in list(self._callbacks.items()):
                if cb['running']:
                    continue
                thread = threading.Thread(
                    target=self._loop, args=(name,),
                    name=f'control_loop_{name}', daemon=True,
                )
                # Install a fresh state dict for every run. A worker from a
                # previous start()/stop() cycle may still be asleep; it keeps
                # the old dict (running=False) and exits, instead of seeing
                # the flag flip back to True and running alongside the new
                # thread.
                self._callbacks[name] = {**cb, 'running': True, 'thread': thread}
                thread.start()

    def stop(self):
        """Stop all callbacks.

        Only signals the workers; it does not wait for them. Each worker
        exits the next time it checks its flag.
        """
        with self._lock:
            for cb in self._callbacks.values():
                cb['running'] = False

    def get_stats(self) -> Dict:
        """Get timing statistics for all callbacks.

        Returns:
            Mapping of callback name to a copy of its counters plus
            ``avg_jitter_ms`` (0 when the callback has not run yet).
        """
        with self._lock:
            result = {}
            for name, stats in self._stats.items():
                calls = stats['calls']
                result[name] = {
                    **stats,
                    'avg_jitter_ms': (
                        stats['total_jitter_ms'] / calls if calls > 0 else 0
                    ),
                }
            return result

    def _loop(self, name: str):
        """Run a callback at the target Hz with drift compensation.

        Jitter is the absolute difference between the scheduled and the
        actual start of each call. A deadline counts as missed when the
        callback itself takes longer than 1.5 intervals.
        """
        cb = self._callbacks.get(name)
        if not cb:
            return

        interval = 1.0 / cb['hz']
        # The first tick is due immediately. Every later tick is scheduled
        # exactly one interval after the previous *scheduled* time, so sleep
        # inaccuracies do not accumulate.
        next_time = time.monotonic()

        while cb.get('running', False):
            started = time.monotonic()
            jitter_ms = abs(started - next_time) * 1000

            # Call the callback; a failing callback must not kill the loop.
            try:
                cb['fn']()
            except Exception as e:
                logger.debug("Control loop '%s' callback error: %s", name, e)

            # Track timing
            elapsed = time.monotonic() - started

            with self._lock:
                stats = self._stats.get(name)
                if stats is not None:
                    stats['calls'] = stats.get('calls', 0) + 1
                    stats['total_jitter_ms'] = (
                        stats.get('total_jitter_ms', 0) + jitter_ms
                    )
                    if elapsed > interval * 1.5:
                        stats['missed_deadlines'] = (
                            stats.get('missed_deadlines', 0) + 1
                        )

            # Stopped during the call: exit now rather than sleeping first.
            if not cb.get('running', False):
                break

            # Drift-compensated sleep
            next_time += interval
            sleep_time = next_time - time.monotonic()
            if sleep_time > 0:
                time.sleep(sleep_time)
            else:
                # Fell behind — run the next tick right away and restart the
                # schedule from now instead of trying to catch up.
                next_time = time.monotonic()