Coverage for integrations / robotics / control_loop.py: 95.3%

64 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Control Loop Bridge — Timing bridge between agentic and native layers. 

3 

4NOT a PID controller. NOT intelligence. Just timing. 

5 

6HevolveAI owns the actual control loops (PID, motor control, kinematics). 

7This bridge ensures the LLM-langchain side sends sensor data and receives 

8feedback at the right cadence. It's the clock that keeps the agentic 

9and embodied sides in sync. 

10 

11Usage: 

12 from integrations.robotics.control_loop import ControlLoopBridge 

13 loop = ControlLoopBridge() 

14 loop.register_callback('imu_0', my_sensor_handler, hz=50) 

15 loop.start() 

16""" 

17import logging 

18import threading 

19import time 

20from typing import Callable, Dict, Optional 

21 

22logger = logging.getLogger('hevolve_robotics') 

23 

24 

25class ControlLoopBridge: 

26 """Timing bridge for sensor-action loops. 

27 

28 Registers callbacks at specified Hz. Each callback is called on its 

29 own timer with drift compensation. 

30 """ 

31 

32 def __init__(self): 

33 self._lock = threading.Lock() 

34 self._callbacks: Dict[str, Dict] = {} # name → {fn, hz, thread, running} 

35 self._stats: Dict[str, Dict] = {} # name → {calls, missed, jitter} 

36 

37 def register_callback( 

38 self, 

39 name: str, 

40 callback: Callable, 

41 hz: float = 10.0, 

42 ): 

43 """Register a timed callback. 

44 

45 Args: 

46 name: Unique callback name (e.g., 'imu_ingestion', 'feedback_poll'). 

47 callback: Function to call at the given frequency. 

48 hz: Target frequency in Hz. 

49 """ 

50 with self._lock: 

51 if name in self._callbacks: 

52 self.unregister_callback(name) 

53 self._callbacks[name] = { 

54 'fn': callback, 

55 'hz': hz, 

56 'thread': None, 

57 'running': False, 

58 } 

59 self._stats[name] = { 

60 'calls': 0, 

61 'missed_deadlines': 0, 

62 'total_jitter_ms': 0.0, 

63 'target_hz': hz, 

64 } 

65 

66 def unregister_callback(self, name: str): 

67 """Stop and remove a callback.""" 

68 with self._lock: 

69 cb = self._callbacks.get(name) 

70 if cb: 

71 cb['running'] = False 

72 self._callbacks.pop(name, None) 

73 

74 def start(self): 

75 """Start all registered callbacks.""" 

76 with self._lock: 

77 for name, cb in self._callbacks.items(): 

78 if not cb['running']: 

79 cb['running'] = True 

80 thread = threading.Thread( 

81 target=self._loop, args=(name,), 

82 name=f'control_loop_{name}', daemon=True, 

83 ) 

84 cb['thread'] = thread 

85 thread.start() 

86 

87 def stop(self): 

88 """Stop all callbacks.""" 

89 with self._lock: 

90 for cb in self._callbacks.values(): 

91 cb['running'] = False 

92 

93 def get_stats(self) -> Dict: 

94 """Get timing statistics for all callbacks.""" 

95 with self._lock: 

96 result = {} 

97 for name, stats in self._stats.items(): 

98 calls = stats['calls'] 

99 result[name] = { 

100 **stats, 

101 'avg_jitter_ms': ( 

102 stats['total_jitter_ms'] / calls if calls > 0 else 0 

103 ), 

104 } 

105 return result 

106 

107 def _loop(self, name: str): 

108 """Run a callback at the target Hz with drift compensation.""" 

109 cb = self._callbacks.get(name) 

110 if not cb: 

111 return 

112 

113 interval = 1.0 / cb['hz'] 

114 next_time = time.monotonic() + interval 

115 

116 while cb.get('running', False): 

117 now = time.monotonic() 

118 

119 # Call the callback 

120 try: 

121 cb['fn']() 

122 except Exception as e: 

123 logger.debug(f"Control loop '{name}' callback error: {e}") 

124 

125 # Track timing 

126 elapsed = time.monotonic() - now 

127 jitter_ms = abs(elapsed - interval) * 1000 

128 

129 with self._lock: 

130 stats = self._stats.get(name, {}) 

131 stats['calls'] = stats.get('calls', 0) + 1 

132 stats['total_jitter_ms'] = stats.get('total_jitter_ms', 0) + jitter_ms 

133 if elapsed > interval * 1.5: 

134 stats['missed_deadlines'] = stats.get('missed_deadlines', 0) + 1 

135 

136 # Drift-compensated sleep 

137 next_time += interval 

138 sleep_time = next_time - time.monotonic() 

139 if sleep_time > 0: 

140 time.sleep(sleep_time) 

141 else: 

142 # Fell behind — reset target 

143 next_time = time.monotonic() + interval