Coverage for integrations / robotics / robot_boot.py: 79.7%
79 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Robot Boot — Initializes all robotics subsystems.

Called from embedded_main.py when HEVOLVE_ROBOT_ENABLED=true.
Connects:
  - SafetyMonitor (from Batch 1)
  - SensorStore + adapters (from Batch 2)
  - WorldModelBridge embodied extensions (from Batch 3)
  - ControlLoopBridge timing (from Batch 3)
  - CapabilityAdvertiser (from Batch 4)

Does NOT start any intelligence. Just wires the routing layer.
HevolveAI starts its own intelligence when it receives data.
"""
15import json
16import logging
17import os
18from typing import Any, Dict, Optional
20logger = logging.getLogger('hevolve_robotics')
def _read_rate_hz(env_name: str, default: float) -> float:
    """Read a callback rate in Hz from the environment.

    Returns ``default`` when the variable is unset. When it is set to text
    that does not parse as a float, or to NaN / infinity, a warning is
    logged and ``default`` is returned, so a single mistyped setting does
    not prevent the control loop from starting.

    Zero and negative values are passed through unchanged.
    """
    import math

    raw = os.environ.get(env_name)
    if raw is None:
        return default
    try:
        hz = float(raw)
    except ValueError:
        hz = float('nan')
    if not math.isfinite(hz):
        logger.warning("Robot boot: invalid %s=%r, using default %sHz",
                       env_name, raw, default)
        return default
    # NOTE(review): hz <= 0 is handed to ControlLoopBridge as-is — confirm
    # how register_callback treats it.
    return hz


def boot_robotics(caps: Any = None) -> Dict:
    """Initialize all robotics subsystems.

    Every subsystem is brought up inside its own try/except: a failure is
    logged as a warning and leaves that subsystem's status entry False,
    and the remaining steps still run.

    Args:
        caps: SystemCheckResult from system_requirements.run_system_check()
            (optional — used for hardware detection hints)

    Returns:
        Dict with initialization status for each subsystem.
    """
    status: Dict[str, Any] = {
        'safety': False,
        'sensor_store': False,
        'control_loop': False,
        'capability_advertiser': False,
        'bridge_ready': False,
    }

    # 1. Safety monitor (may already be running from _boot_safety)
    try:
        from integrations.robotics.safety_monitor import get_safety_monitor
        # Only checks that the monitor can be obtained; the instance
        # itself is not needed here.
        get_safety_monitor()
        status['safety'] = True
        logger.info("Robot boot: safety monitor ready")
    except Exception as e:
        logger.warning("Robot boot: safety monitor failed: %s", e)

    # 2. Sensor store
    try:
        from integrations.robotics.sensor_store import get_sensor_store
        get_sensor_store()
        status['sensor_store'] = True
        logger.info("Robot boot: sensor store ready")
    except Exception as e:
        logger.warning("Robot boot: sensor store failed: %s", e)

    # 3. Connect hardware adapters to sensor store (not tracked in status;
    #    the helper handles and logs its own failures)
    _connect_sensor_adapters(caps)

    # 4. Control loop bridge
    try:
        from integrations.robotics.control_loop import ControlLoopBridge
        loop = ControlLoopBridge()

        # Register sensor ingestion callback
        hz = _read_rate_hz('HEVOLVE_SENSOR_INGEST_HZ', 10.0)
        loop.register_callback('sensor_ingest', _sensor_ingest_tick, hz=hz)

        # Register learning feedback poll
        feedback_hz = _read_rate_hz('HEVOLVE_FEEDBACK_HZ', 1.0)
        loop.register_callback('feedback_poll', _feedback_poll_tick,
                               hz=feedback_hz)

        # NOTE(review): the ControlLoopBridge instance is neither stored
        # nor returned — confirm it keeps itself alive once started.
        loop.start()
        status['control_loop'] = True
        logger.info("Robot boot: control loop started "
                    "(sensor=%sHz, feedback=%sHz)", hz, feedback_hz)
    except Exception as e:
        logger.warning("Robot boot: control loop failed: %s", e)

    # 5. Capability advertiser
    try:
        from integrations.robotics.capability_advertiser import (
            get_capability_advertiser,
        )
        adv = get_capability_advertiser()
        adv.detect_capabilities()
        status['capability_advertiser'] = True
        # Marked ready before the payload is built, so a payload failure
        # below is logged as a warning but the status entry stays True.
        caps_summary = adv.get_gossip_payload()
        logger.info("Robot boot: capabilities detected: %s", caps_summary)
    except Exception as e:
        logger.warning("Robot boot: capability detection failed: %s", e)

    # 6. Verify WorldModelBridge is operational
    try:
        from integrations.agent_engine.world_model_bridge import (
            get_world_model_bridge,
        )
        # get_stats() is called purely as a liveness probe; its result is
        # not inspected.
        get_world_model_bridge().get_stats()
        status['bridge_ready'] = True
        logger.info("Robot boot: world model bridge ready")
    except Exception as e:
        logger.warning("Robot boot: bridge check failed: %s", e)

    booted = sum(1 for ready in status.values() if ready)
    logger.info("Robot boot complete: %s/%s subsystems ready",
                booted, len(status))

    return status
def _connect_sensor_adapters(caps: Any = None):
    """Wire hardware adapters to SensorStore based on available hardware.

    Does nothing unless ``caps`` carries a truthy ``hardware`` attribute.
    For each kind of hardware that object reports (``has_serial``,
    ``has_gpio``) and whose environment variable is set, the matching
    bridge from ``integrations.robotics.sensor_adapters`` is constructed.
    Import or construction failures are logged at debug level and the
    adapter is skipped.

    Args:
        caps: Optional system-check result; only its ``hardware``
            attribute is read.
    """
    hw = getattr(caps, 'hardware', None) if caps else None
    if not hw:
        return

    # NOTE(review): neither bridge instance is retained or explicitly
    # attached to the sensor store here — presumably the constructors
    # register themselves; confirm in sensor_adapters.

    # Serial adapter
    if getattr(hw, 'has_serial', False):
        try:
            from integrations.robotics.sensor_adapters import (
                SerialSensorBridge,
            )
            port = os.environ.get('HEVOLVE_SERIAL_PORT', '')
            if port:
                SerialSensorBridge(port=port)
                logger.info(f"Robot boot: serial sensor bridge on {port}")
        except Exception as e:
            logger.debug(f"Serial sensor bridge skipped: {e}")

    # GPIO adapter
    if getattr(hw, 'has_gpio', False):
        try:
            from integrations.robotics.sensor_adapters import (
                GPIOSensorBridge,
            )
            pins_raw = os.environ.get('HEVOLVE_GPIO_SENSOR_PINS', '')
            if pins_raw:
                tokens = [t.strip() for t in pins_raw.split(',')]
                # isdecimal(), unlike isdigit(), only accepts characters
                # that int() can parse (isdigit() also accepts e.g. '²').
                pins = [int(t) for t in tokens if t.isdecimal()]
                rejected = [t for t in tokens if t and not t.isdecimal()]
                if rejected:
                    logger.warning(
                        "Robot boot: ignoring invalid entries %s in "
                        "HEVOLVE_GPIO_SENSOR_PINS", rejected)
                if pins:
                    GPIOSensorBridge(pins=pins)
                    logger.info(
                        f"Robot boot: GPIO sensor bridge on pins {pins}")
                else:
                    # Avoid creating a bridge that watches no pins while
                    # logging that it is up.
                    logger.warning(
                        "Robot boot: HEVOLVE_GPIO_SENSOR_PINS has no valid "
                        "pins; GPIO sensor bridge not created")
        except Exception as e:
            logger.debug(f"GPIO sensor bridge skipped: {e}")
def _sensor_ingest_tick():
    """Control-loop callback that forwards the newest sensor readings.

    Fetches the latest readings from the sensor store, converts each one
    with ``to_dict()`` and hands the list to the world model bridge in a
    single ``ingest_sensor_batch`` call. Nothing is sent when the store
    has no readings. Any exception is logged at debug level and swallowed.
    """
    try:
        from integrations.robotics.sensor_store import get_sensor_store
        from integrations.agent_engine.world_model_bridge import (
            get_world_model_bridge,
        )
        sensor_store = get_sensor_store()
        world_bridge = get_world_model_bridge()

        snapshot = sensor_store.get_all_latest()
        if not snapshot:
            return

        # One dict per reading, sent to the bridge as a single batch.
        batch = []
        for reading in snapshot.values():
            batch.append(reading.to_dict())
        world_bridge.ingest_sensor_batch(batch)
    except Exception as e:
        logger.debug(f"Sensor tick error: {e}")
def _feedback_poll_tick():
    """Control-loop callback that asks the world model bridge for feedback.

    Calls ``get_learning_feedback()`` on the bridge and, when the result
    is truthy, writes it to the debug log. Any exception is logged at
    debug level and swallowed.
    """
    try:
        from integrations.agent_engine.world_model_bridge import (
            get_world_model_bridge,
        )
        feedback = get_world_model_bridge().get_learning_feedback()
        if not feedback:
            return
        logger.debug(f"Learning feedback: {feedback}")
    except Exception as e:
        logger.debug(f"Feedback poll error: {e}")