Coverage for integrations / robotics / robot_boot.py: 79.7%

79 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Robot Boot — Initializes all robotics subsystems. 

3 

4Called from embedded_main.py when HEVOLVE_ROBOT_ENABLED=true. 

5Connects: 

6 - SafetyMonitor (from Batch 1) 

7 - SensorStore + adapters (from Batch 2) 

8 - WorldModelBridge embodied extensions (from Batch 3) 

9 - ControlLoopBridge timing (from Batch 3) 

10 - CapabilityAdvertiser (from Batch 4) 

11 

12Does NOT start any intelligence. Just wires the routing layer. 

13HevolveAI starts its own intelligence when it receives data. 

14""" 

15import json 

16import logging 

17import os 

18from typing import Any, Dict, Optional 

19 

20logger = logging.getLogger('hevolve_robotics') 

21 

22 

23def boot_robotics(caps: Any = None) -> Dict: 

24 """Initialize all robotics subsystems. 

25 

26 Args: 

27 caps: SystemCheckResult from system_requirements.run_system_check() 

28 (optional — used for hardware detection hints) 

29 

30 Returns: 

31 Dict with initialization status for each subsystem. 

32 """ 

33 status: Dict[str, Any] = { 

34 'safety': False, 

35 'sensor_store': False, 

36 'control_loop': False, 

37 'capability_advertiser': False, 

38 'bridge_ready': False, 

39 } 

40 

41 # 1. Safety monitor (may already be running from _boot_safety) 

42 try: 

43 from integrations.robotics.safety_monitor import get_safety_monitor 

44 monitor = get_safety_monitor() 

45 status['safety'] = True 

46 logger.info("Robot boot: safety monitor ready") 

47 except Exception as e: 

48 logger.warning(f"Robot boot: safety monitor failed: {e}") 

49 

50 # 2. Sensor store 

51 try: 

52 from integrations.robotics.sensor_store import get_sensor_store 

53 store = get_sensor_store() 

54 status['sensor_store'] = True 

55 logger.info("Robot boot: sensor store ready") 

56 except Exception as e: 

57 logger.warning(f"Robot boot: sensor store failed: {e}") 

58 

59 # 3. Connect hardware adapters to sensor store 

60 _connect_sensor_adapters(caps) 

61 

62 # 4. Control loop bridge 

63 try: 

64 from integrations.robotics.control_loop import ControlLoopBridge 

65 loop = ControlLoopBridge() 

66 

67 # Register sensor ingestion callback 

68 hz = float(os.environ.get('HEVOLVE_SENSOR_INGEST_HZ', '10')) 

69 loop.register_callback('sensor_ingest', _sensor_ingest_tick, hz=hz) 

70 

71 # Register learning feedback poll 

72 feedback_hz = float(os.environ.get('HEVOLVE_FEEDBACK_HZ', '1')) 

73 loop.register_callback('feedback_poll', _feedback_poll_tick, 

74 hz=feedback_hz) 

75 

76 loop.start() 

77 status['control_loop'] = True 

78 logger.info(f"Robot boot: control loop started " 

79 f"(sensor={hz}Hz, feedback={feedback_hz}Hz)") 

80 except Exception as e: 

81 logger.warning(f"Robot boot: control loop failed: {e}") 

82 

83 # 5. Capability advertiser 

84 try: 

85 from integrations.robotics.capability_advertiser import ( 

86 get_capability_advertiser, 

87 ) 

88 adv = get_capability_advertiser() 

89 adv.detect_capabilities() 

90 status['capability_advertiser'] = True 

91 caps_summary = adv.get_gossip_payload() 

92 logger.info(f"Robot boot: capabilities detected: {caps_summary}") 

93 except Exception as e: 

94 logger.warning(f"Robot boot: capability detection failed: {e}") 

95 

96 # 6. Verify WorldModelBridge is operational 

97 try: 

98 from integrations.agent_engine.world_model_bridge import ( 

99 get_world_model_bridge, 

100 ) 

101 bridge = get_world_model_bridge() 

102 stats = bridge.get_stats() 

103 status['bridge_ready'] = True 

104 logger.info("Robot boot: world model bridge ready") 

105 except Exception as e: 

106 logger.warning(f"Robot boot: bridge check failed: {e}") 

107 

108 booted = sum(1 for v in status.values() if v) 

109 total = len(status) 

110 logger.info(f"Robot boot complete: {booted}/{total} subsystems ready") 

111 

112 return status 

113 

114 

115def _connect_sensor_adapters(caps: Any = None): 

116 """Wire hardware adapters to SensorStore based on available hardware.""" 

117 hw = getattr(caps, 'hardware', None) if caps else None 

118 

119 # Serial adapter 

120 if hw and getattr(hw, 'has_serial', False): 

121 try: 

122 from integrations.robotics.sensor_adapters import ( 

123 SerialSensorBridge, 

124 ) 

125 port = os.environ.get('HEVOLVE_SERIAL_PORT', '') 

126 if port: 

127 bridge = SerialSensorBridge(port=port) 

128 logger.info(f"Robot boot: serial sensor bridge on {port}") 

129 except Exception as e: 

130 logger.debug(f"Serial sensor bridge skipped: {e}") 

131 

132 # GPIO adapter 

133 if hw and getattr(hw, 'has_gpio', False): 

134 try: 

135 from integrations.robotics.sensor_adapters import ( 

136 GPIOSensorBridge, 

137 ) 

138 pins_raw = os.environ.get('HEVOLVE_GPIO_SENSOR_PINS', '') 

139 if pins_raw: 

140 pins = [int(p.strip()) for p in pins_raw.split(',') 

141 if p.strip().isdigit()] 

142 bridge = GPIOSensorBridge(pins=pins) 

143 logger.info(f"Robot boot: GPIO sensor bridge on pins {pins}") 

144 except Exception as e: 

145 logger.debug(f"GPIO sensor bridge skipped: {e}") 

146 

147 

148def _sensor_ingest_tick(): 

149 """Periodic callback: flush sensor store readings to HevolveAI.""" 

150 try: 

151 from integrations.robotics.sensor_store import get_sensor_store 

152 from integrations.agent_engine.world_model_bridge import ( 

153 get_world_model_bridge, 

154 ) 

155 store = get_sensor_store() 

156 bridge = get_world_model_bridge() 

157 

158 # Get all latest readings and batch-send to bridge 

159 latest = store.get_all_latest() 

160 if latest: 

161 readings = [r.to_dict() for r in latest.values()] 

162 bridge.ingest_sensor_batch(readings) 

163 except Exception as e: 

164 logger.debug(f"Sensor tick error: {e}") 

165 

166 

167def _feedback_poll_tick(): 

168 """Periodic callback: poll HevolveAI for learning feedback.""" 

169 try: 

170 from integrations.agent_engine.world_model_bridge import ( 

171 get_world_model_bridge, 

172 ) 

173 bridge = get_world_model_bridge() 

174 feedback = bridge.get_learning_feedback() 

175 if feedback: 

176 logger.debug(f"Learning feedback: {feedback}") 

177 except Exception as e: 

178 logger.debug(f"Feedback poll error: {e}")