Coverage for integrations / robotics / hardware_bridge.py: 49.8%
847 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Hardware Bridge -- Close the loop between physical robots and the hive.
4Soft agents think. Hard agents act. This bridge connects them:
6INBOUND (sensors -> hive):
7 - Normalize heterogeneous sensor data into a common schema
8 - Support: USB serial, GPIO, ROS topics, HTTP streams, WebSocket, MQTT
9 - Buffer and batch sensor readings for efficiency
10 - Push to intelligence_api.py for multi-intelligence fusion
12OUTBOUND (hive -> actuators):
13 - Translate action plans into hardware-specific commands
14 - Support: serial commands, GPIO, ROS cmd_vel, HTTP actuator APIs
15 - Safety gate: every command passes through safety_monitor before execution
16 - Feedback loop: actuator response feeds back as sensor data
18EXPERIENCE (learning loop):
19 - Every sensor->action->outcome triple is an experience
20 - Experiences queue to WorldModelBridge for HevolveAI training
21 - The hive learns from every robot's actions
22 - Better models pushed back to all robots via federation
24Usage:
25 from integrations.robotics.hardware_bridge import get_bridge
27 bridge = get_bridge('robot_01')
28 bridge.register_sensor(HTTPSensorAdapter(
29 sensor_id='cam_front', sensor_type='camera',
30 url='http://192.168.1.10/capture',
31 ))
32 bridge.register_actuator(SerialActuatorAdapter(
33 actuator_id='motor_left', actuator_type='motor',
34 port='/dev/ttyUSB0',
35 ))
36 bridge.start()
38 # Full cycle: sense -> think -> act -> learn
39 result = bridge.think_and_act(context='navigate to charging station')
40"""
42import json
43import logging
44import struct
45import threading
46import time
47from abc import ABC, abstractmethod
48from collections import deque
49from dataclasses import dataclass, field
50from typing import Any, Callable, Dict, List, Optional, Tuple
52logger = logging.getLogger(__name__)
54# ======================================================================
55# Data classes
56# ======================================================================
59@dataclass
60class SensorReading:
61 """Normalized sensor reading from any hardware source.
63 Carries both the normalized ``data`` and the original ``raw`` payload
64 so that downstream intelligence can choose the representation it needs.
65 """
66 sensor_id: str
67 sensor_type: str # camera, lidar, imu, audio, touch, gps, temperature, proximity
68 data: Any # Normalized data (dict, list, scalar)
69 timestamp: float = field(default_factory=time.time)
70 raw: Any = None # Original hardware-specific data
73@dataclass
74class ActuatorCommand:
75 """Command destined for a physical actuator.
77 ``safety_cleared`` starts False and is only set True by the safety
78 gate. Actuator adapters MUST refuse to execute commands where this
79 flag is still False.
80 """
81 actuator_id: str
82 actuator_type: str # motor, servo, led, speaker, display, gripper
83 command: dict # e.g. {action: 'move', params: {speed: 0.5, direction: 'forward'}}
84 safety_cleared: bool = False
85 timestamp: float = field(default_factory=time.time)
88@dataclass
89class Experience:
90 """Sensor->action->outcome triple that feeds the learning loop.
92 The hive learns from every robot's actions. Each experience is
93 flushed to WorldModelBridge -> HevolveAI so better models can be
94 pushed back to all robots via federation.
95 """
96 robot_id: str
97 sensor_state: dict # Snapshot of all sensors at action time
98 action_taken: dict # What the robot did
99 outcome: dict # What happened (success/fail, sensor delta)
100 reward: float # Computed reward signal
101 timestamp: float = field(default_factory=time.time)
104# ======================================================================
105# Adapter base classes
106# ======================================================================
109class SensorAdapter(ABC):
110 """Base for all sensor input adapters.
112 Subclasses bridge a specific transport (serial, HTTP, MQTT, GPIO, ROS)
113 to the common SensorReading format.
114 """
116 def __init__(self, sensor_id: str, sensor_type: str):
117 self.sensor_id = sensor_id
118 self.sensor_type = sensor_type
120 @abstractmethod
121 def read(self) -> Optional[SensorReading]:
122 """Read a single sensor value (blocking).
124 Returns None if the sensor is unavailable or the read failed.
125 """
126 ...
128 @abstractmethod
129 def start_stream(self, callback: Callable[[SensorReading], None]) -> None:
130 """Begin continuous streaming; call ``callback`` for each reading."""
131 ...
133 @abstractmethod
134 def stop_stream(self) -> None:
135 """Stop the continuous stream started by ``start_stream``."""
136 ...
139class ActuatorAdapter(ABC):
140 """Base for all actuator output adapters.
142 Subclasses translate ActuatorCommands into hardware-specific protocols
143 (serial bytes, HTTP POST, GPIO writes, etc.).
144 """
146 def __init__(self, actuator_id: str, actuator_type: str):
147 self.actuator_id = actuator_id
148 self.actuator_type = actuator_type
150 @abstractmethod
151 def execute(self, command: ActuatorCommand) -> dict:
152 """Execute a command. Returns a result dict with at least ``{ok: bool}``."""
153 ...
155 @abstractmethod
156 def get_state(self) -> dict:
157 """Return current actuator state (position, speed, temperature, etc.)."""
158 ...
160 @abstractmethod
161 def emergency_stop(self) -> None:
162 """Immediately halt this actuator. Must be idempotent."""
163 ...
166# ======================================================================
167# Built-in sensor adapters
168# ======================================================================
171class SerialSensorAdapter(SensorAdapter):
172 """Read sensors via USB serial (Arduino, ESP32, etc.).
174 Expects line-oriented JSON from the device, e.g.::
176 {\"accel_x\": 0.01, \"accel_y\": -9.81, \"accel_z\": 0.02}
178 Falls back to a raw-line reading if JSON parsing fails.
179 """
181 def __init__(
182 self,
183 sensor_id: str,
184 sensor_type: str,
185 port: str = '',
186 baudrate: int = 115200,
187 timeout: float = 0.1,
188 ):
189 super().__init__(sensor_id, sensor_type)
190 self._port = port
191 self._baudrate = baudrate
192 self._timeout = timeout
193 self._running = False
194 self._thread: Optional[threading.Thread] = None
195 self._callback: Optional[Callable] = None
197 def read(self) -> Optional[SensorReading]:
198 try:
199 import serial as pyserial # lazy -- optional dependency
200 except ImportError:
201 logger.debug("SerialSensorAdapter: pyserial not installed")
202 return None
203 try:
204 ser = pyserial.Serial(self._port, self._baudrate, timeout=self._timeout)
205 line = ser.readline().decode('utf-8', errors='ignore').strip()
206 ser.close()
207 if not line:
208 return None
209 return self._parse_line(line)
210 except Exception as exc:
211 logger.debug("SerialSensorAdapter read error on %s: %s", self._port, exc)
212 return None
214 def start_stream(self, callback: Callable[[SensorReading], None]) -> None:
215 if self._running:
216 return
217 self._callback = callback
218 self._running = True
219 self._thread = threading.Thread(
220 target=self._stream_loop,
221 name=f'serial_sensor_{self.sensor_id}',
222 daemon=True,
223 )
224 self._thread.start()
226 def stop_stream(self) -> None:
227 self._running = False
229 def _stream_loop(self) -> None:
230 try:
231 import serial as pyserial
232 except ImportError:
233 logger.warning("SerialSensorAdapter: pyserial not installed, stream aborted")
234 return
235 while self._running:
236 try:
237 ser = pyserial.Serial(self._port, self._baudrate, timeout=self._timeout)
238 while self._running:
239 line = ser.readline().decode('utf-8', errors='ignore').strip()
240 if line:
241 reading = self._parse_line(line)
242 if reading and self._callback:
243 self._callback(reading)
244 ser.close()
245 except Exception as exc:
246 logger.debug("SerialSensorAdapter stream error: %s", exc)
247 time.sleep(1.0)
249 def _parse_line(self, line: str) -> SensorReading:
250 raw = line
251 try:
252 data = json.loads(line)
253 except (json.JSONDecodeError, ValueError):
254 data = {'raw_line': line}
255 return SensorReading(
256 sensor_id=self.sensor_id,
257 sensor_type=self.sensor_type,
258 data=data,
259 raw=raw,
260 )
263class HTTPSensorAdapter(SensorAdapter):
264 """Read sensors via HTTP endpoint (IP cameras, REST APIs).
266 GETs ``url`` and parses the JSON response as sensor data.
267 For binary payloads (e.g. JPEG from an IP camera), the raw bytes
268 are stored in ``raw`` and a description dict in ``data``.
269 """
271 def __init__(
272 self,
273 sensor_id: str,
274 sensor_type: str,
275 url: str = '',
276 poll_interval: float = 1.0,
277 timeout: float = 5.0,
278 headers: Optional[dict] = None,
279 ):
280 super().__init__(sensor_id, sensor_type)
281 self._url = url
282 self._poll_interval = poll_interval
283 self._timeout = timeout
284 self._headers = headers or {}
285 self._running = False
286 self._thread: Optional[threading.Thread] = None
287 self._callback: Optional[Callable] = None
289 def read(self) -> Optional[SensorReading]:
290 try:
291 from core.http_pool import pooled_get
292 except ImportError:
293 try:
294 import requests
295 pooled_get = requests.get
296 except ImportError:
297 logger.debug("HTTPSensorAdapter: no HTTP library available")
298 return None
299 try:
300 resp = pooled_get(self._url, headers=self._headers, timeout=self._timeout)
301 return self._parse_response(resp)
302 except Exception as exc:
303 logger.debug("HTTPSensorAdapter read error on %s: %s", self._url, exc)
304 return None
306 def start_stream(self, callback: Callable[[SensorReading], None]) -> None:
307 if self._running:
308 return
309 self._callback = callback
310 self._running = True
311 self._thread = threading.Thread(
312 target=self._poll_loop,
313 name=f'http_sensor_{self.sensor_id}',
314 daemon=True,
315 )
316 self._thread.start()
318 def stop_stream(self) -> None:
319 self._running = False
321 def _poll_loop(self) -> None:
322 while self._running:
323 reading = self.read()
324 if reading and self._callback:
325 self._callback(reading)
326 time.sleep(self._poll_interval)
328 def _parse_response(self, resp) -> Optional[SensorReading]:
329 content_type = resp.headers.get('Content-Type', '')
330 if 'json' in content_type:
331 try:
332 data = resp.json()
333 except (ValueError, AttributeError):
334 data = {'raw_text': resp.text[:4096]}
335 return SensorReading(
336 sensor_id=self.sensor_id,
337 sensor_type=self.sensor_type,
338 data=data,
339 raw=resp.text[:4096],
340 )
341 # Binary payload (e.g. JPEG from camera)
342 raw_bytes = resp.content[:10_000_000] # cap at 10 MB
343 return SensorReading(
344 sensor_id=self.sensor_id,
345 sensor_type=self.sensor_type,
346 data={
347 'content_type': content_type,
348 'size_bytes': len(raw_bytes),
349 },
350 raw=raw_bytes,
351 )
354class MQTTSensorAdapter(SensorAdapter):
355 """Read sensors via MQTT topics (IoT devices).
357 Subscribes to ``topic`` and fires readings on every message.
358 Requires paho-mqtt (optional dependency).
359 """
361 def __init__(
362 self,
363 sensor_id: str,
364 sensor_type: str,
365 broker: str = 'localhost',
366 port: int = 1883,
367 topic: str = '',
368 username: str = '',
369 password: str = '',
370 ):
371 super().__init__(sensor_id, sensor_type)
372 self._broker = broker
373 self._port = port
374 self._topic = topic
375 self._username = username
376 self._password = password
377 self._client = None
378 self._callback: Optional[Callable] = None
379 self._running = False
381 def read(self) -> Optional[SensorReading]:
382 # MQTT is event-driven; single read not naturally supported.
383 # Do a quick subscribe, grab one message, and disconnect.
384 try:
385 import paho.mqtt.client as mqtt
386 except ImportError:
387 logger.debug("MQTTSensorAdapter: paho-mqtt not installed")
388 return None
390 result_holder: List[Optional[SensorReading]] = [None]
391 event = threading.Event()
393 def on_message(_client, _userdata, msg):
394 result_holder[0] = self._parse_payload(msg.payload)
395 event.set()
397 try:
398 client = mqtt.Client()
399 if self._username:
400 client.username_pw_set(self._username, self._password)
401 client.on_message = on_message
402 client.connect(self._broker, self._port, keepalive=10)
403 client.subscribe(self._topic)
404 client.loop_start()
405 event.wait(timeout=5.0)
406 client.loop_stop()
407 client.disconnect()
408 except Exception as exc:
409 logger.debug("MQTTSensorAdapter read error: %s", exc)
410 return result_holder[0]
412 def start_stream(self, callback: Callable[[SensorReading], None]) -> None:
413 if self._running:
414 return
415 try:
416 import paho.mqtt.client as mqtt
417 except ImportError:
418 logger.warning("MQTTSensorAdapter: paho-mqtt not installed, stream aborted")
419 return
421 self._callback = callback
422 self._running = True
424 def on_message(_client, _userdata, msg):
425 reading = self._parse_payload(msg.payload)
426 if reading and self._callback:
427 self._callback(reading)
429 self._client = mqtt.Client()
430 if self._username:
431 self._client.username_pw_set(self._username, self._password)
432 self._client.on_message = on_message
433 try:
434 self._client.connect(self._broker, self._port, keepalive=60)
435 self._client.subscribe(self._topic)
436 self._client.loop_start()
437 except Exception as exc:
438 logger.warning("MQTTSensorAdapter connect error: %s", exc)
439 self._running = False
441 def stop_stream(self) -> None:
442 self._running = False
443 if self._client:
444 try:
445 self._client.loop_stop()
446 self._client.disconnect()
447 except Exception:
448 pass
449 self._client = None
451 def _parse_payload(self, payload: bytes) -> Optional[SensorReading]:
452 raw = payload
453 try:
454 text = payload.decode('utf-8', errors='ignore')
455 data = json.loads(text)
456 except (json.JSONDecodeError, ValueError, UnicodeDecodeError):
457 data = {'raw_bytes': len(payload)}
458 return SensorReading(
459 sensor_id=self.sensor_id,
460 sensor_type=self.sensor_type,
461 data=data,
462 raw=raw,
463 )
466class WebSocketSensorAdapter(SensorAdapter):
467 """Read sensors via WebSocket stream.
469 Connects to a WebSocket endpoint and receives JSON messages as
470 sensor readings. Useful for high-frequency data (IMU streams,
471 depth cameras, real-time telemetry).
473 Requires the ``websocket-client`` package (optional dependency).
474 """
476 def __init__(
477 self,
478 sensor_id: str,
479 sensor_type: str,
480 url: str = '',
481 headers: Optional[dict] = None,
482 reconnect_delay: float = 2.0,
483 ):
484 super().__init__(sensor_id, sensor_type)
485 self._url = url
486 self._headers = headers or {}
487 self._reconnect_delay = reconnect_delay
488 self._running = False
489 self._thread: Optional[threading.Thread] = None
490 self._callback: Optional[Callable] = None
491 self._ws = None
492 self._last_reading: Optional[SensorReading] = None
493 self._lock = threading.Lock()
495 def read(self) -> Optional[SensorReading]:
496 """Return the most recent reading received via the WebSocket.
498 If the stream is not active, attempts a one-shot connect,
499 reads a single frame, and disconnects.
500 """
501 # If streaming, return cached latest
502 with self._lock:
503 if self._last_reading is not None:
504 return self._last_reading
506 # One-shot read
507 try:
508 import websocket as ws_lib # websocket-client
509 except ImportError:
510 logger.debug("WebSocketSensorAdapter: websocket-client not installed")
511 return None
513 try:
514 ws = ws_lib.create_connection(
515 self._url,
516 header=self._headers,
517 timeout=5.0,
518 )
519 frame = ws.recv()
520 ws.close()
521 return self._parse_frame(frame)
522 except Exception as exc:
523 logger.debug("WebSocketSensorAdapter read error on %s: %s",
524 self._url, exc)
525 return None
527 def start_stream(self, callback: Callable[[SensorReading], None]) -> None:
528 if self._running:
529 return
530 self._callback = callback
531 self._running = True
532 self._thread = threading.Thread(
533 target=self._stream_loop,
534 name=f'ws_sensor_{self.sensor_id}',
535 daemon=True,
536 )
537 self._thread.start()
539 def stop_stream(self) -> None:
540 self._running = False
541 ws = self._ws
542 if ws is not None:
543 try:
544 ws.close()
545 except Exception:
546 pass
547 self._ws = None
549 def _stream_loop(self) -> None:
550 try:
551 import websocket as ws_lib
552 except ImportError:
553 logger.warning("WebSocketSensorAdapter: websocket-client not installed, "
554 "stream aborted")
555 return
557 while self._running:
558 try:
559 ws = ws_lib.create_connection(
560 self._url,
561 header=self._headers,
562 timeout=10.0,
563 )
564 self._ws = ws
565 while self._running:
566 frame = ws.recv()
567 if not frame:
568 break
569 reading = self._parse_frame(frame)
570 if reading:
571 with self._lock:
572 self._last_reading = reading
573 if self._callback:
574 self._callback(reading)
575 ws.close()
576 except Exception as exc:
577 logger.debug("WebSocketSensorAdapter stream error: %s", exc)
578 finally:
579 self._ws = None
581 if self._running:
582 time.sleep(self._reconnect_delay)
584 def _parse_frame(self, frame) -> Optional[SensorReading]:
585 if isinstance(frame, bytes):
586 raw = frame
587 try:
588 text = frame.decode('utf-8', errors='ignore')
589 data = json.loads(text)
590 except (json.JSONDecodeError, ValueError, UnicodeDecodeError):
591 data = {'raw_bytes': len(frame)}
592 else:
593 raw = frame
594 try:
595 data = json.loads(frame)
596 except (json.JSONDecodeError, ValueError):
597 data = {'raw_text': str(frame)[:4096]}
599 return SensorReading(
600 sensor_id=self.sensor_id,
601 sensor_type=self.sensor_type,
602 data=data,
603 raw=raw,
604 )
607# ======================================================================
608# Built-in actuator adapters
609# ======================================================================
612class SerialActuatorAdapter(ActuatorAdapter):
613 """Send commands via USB serial to motor controllers, servo boards, etc.
615 Writes JSON-encoded commands to the serial port. The device is
616 expected to respond with a JSON status line.
617 """
619 def __init__(
620 self,
621 actuator_id: str,
622 actuator_type: str,
623 port: str = '',
624 baudrate: int = 115200,
625 timeout: float = 1.0,
626 ):
627 super().__init__(actuator_id, actuator_type)
628 self._port = port
629 self._baudrate = baudrate
630 self._timeout = timeout
631 self._lock = threading.Lock()
633 def execute(self, command: ActuatorCommand) -> dict:
634 if not command.safety_cleared:
635 return {'ok': False, 'error': 'command not safety cleared'}
636 try:
637 import serial as pyserial
638 except ImportError:
639 return {'ok': False, 'error': 'pyserial not installed'}
640 with self._lock:
641 try:
642 ser = pyserial.Serial(self._port, self._baudrate, timeout=self._timeout)
643 payload = json.dumps(command.command) + '\n'
644 ser.write(payload.encode('utf-8'))
645 response_line = ser.readline().decode('utf-8', errors='ignore').strip()
646 ser.close()
647 if response_line:
648 try:
649 return {'ok': True, 'response': json.loads(response_line)}
650 except (json.JSONDecodeError, ValueError):
651 return {'ok': True, 'response': response_line}
652 return {'ok': True, 'response': None}
653 except Exception as exc:
654 return {'ok': False, 'error': str(exc)}
656 def get_state(self) -> dict:
657 try:
658 import serial as pyserial
659 except ImportError:
660 return {'error': 'pyserial not installed'}
661 with self._lock:
662 try:
663 ser = pyserial.Serial(self._port, self._baudrate, timeout=self._timeout)
664 ser.write(b'{"action":"get_state"}\n')
665 line = ser.readline().decode('utf-8', errors='ignore').strip()
666 ser.close()
667 if line:
668 try:
669 return json.loads(line)
670 except (json.JSONDecodeError, ValueError):
671 return {'raw': line}
672 return {}
673 except Exception as exc:
674 return {'error': str(exc)}
676 def emergency_stop(self) -> None:
677 try:
678 import serial as pyserial
679 except ImportError:
680 return
681 with self._lock:
682 try:
683 ser = pyserial.Serial(self._port, self._baudrate, timeout=self._timeout)
684 ser.write(b'{"action":"emergency_stop"}\n')
685 ser.close()
686 except Exception as exc:
687 logger.error("SerialActuatorAdapter E-stop write failed: %s", exc)
690class HTTPActuatorAdapter(ActuatorAdapter):
691 """Send commands via HTTP POST (robot APIs, smart home devices).
693 POSTs the command dict as JSON to ``url``.
694 """
696 def __init__(
697 self,
698 actuator_id: str,
699 actuator_type: str,
700 url: str = '',
701 timeout: float = 5.0,
702 headers: Optional[dict] = None,
703 ):
704 super().__init__(actuator_id, actuator_type)
705 self._url = url
706 self._timeout = timeout
707 self._headers = headers or {}
709 def execute(self, command: ActuatorCommand) -> dict:
710 if not command.safety_cleared:
711 return {'ok': False, 'error': 'command not safety cleared'}
712 post_fn = self._get_post_fn()
713 if post_fn is None:
714 return {'ok': False, 'error': 'no HTTP library available'}
715 try:
716 resp = post_fn(
717 self._url,
718 json=command.command,
719 headers=self._headers,
720 timeout=self._timeout,
721 )
722 try:
723 body = resp.json()
724 except (ValueError, AttributeError):
725 body = {'status_code': getattr(resp, 'status_code', None)}
726 ok = getattr(resp, 'status_code', 500) < 400
727 return {'ok': ok, 'response': body}
728 except Exception as exc:
729 return {'ok': False, 'error': str(exc)}
731 def get_state(self) -> dict:
732 get_fn = self._get_get_fn()
733 if get_fn is None:
734 return {'error': 'no HTTP library available'}
735 try:
736 resp = get_fn(
737 self._url,
738 headers=self._headers,
739 timeout=self._timeout,
740 )
741 try:
742 return resp.json()
743 except (ValueError, AttributeError):
744 return {'raw': getattr(resp, 'text', '')[:4096]}
745 except Exception as exc:
746 return {'error': str(exc)}
748 def emergency_stop(self) -> None:
749 post_fn = self._get_post_fn()
750 if post_fn is None:
751 return
752 try:
753 post_fn(
754 self._url,
755 json={'action': 'emergency_stop'},
756 headers=self._headers,
757 timeout=self._timeout,
758 )
759 except Exception as exc:
760 logger.error("HTTPActuatorAdapter E-stop failed: %s", exc)
762 @staticmethod
763 def _get_post_fn():
764 try:
765 from core.http_pool import pooled_post
766 return pooled_post
767 except ImportError:
768 pass
769 try:
770 import requests
771 return requests.post
772 except ImportError:
773 return None
775 @staticmethod
776 def _get_get_fn():
777 try:
778 from core.http_pool import pooled_get
779 return pooled_get
780 except ImportError:
781 pass
782 try:
783 import requests
784 return requests.get
785 except ImportError:
786 return None
789class MQTTActuatorAdapter(ActuatorAdapter):
790 """Send commands via MQTT publish (IoT actuators, smart home devices).
792 Publishes JSON-encoded commands to the configured ``topic``.
793 Optionally subscribes to a ``response_topic`` for acknowledgment.
795 Requires paho-mqtt (optional dependency).
796 """
798 def __init__(
799 self,
800 actuator_id: str,
801 actuator_type: str,
802 broker: str = 'localhost',
803 port: int = 1883,
804 topic: str = '',
805 response_topic: str = '',
806 username: str = '',
807 password: str = '',
808 qos: int = 1,
809 ):
810 super().__init__(actuator_id, actuator_type)
811 self._broker = broker
812 self._port = port
813 self._topic = topic
814 self._response_topic = response_topic
815 self._username = username
816 self._password = password
817 self._qos = qos
818 self._lock = threading.Lock()
820 def execute(self, command: ActuatorCommand) -> dict:
821 if not command.safety_cleared:
822 return {'ok': False, 'error': 'command not safety cleared'}
824 try:
825 import paho.mqtt.client as mqtt
826 except ImportError:
827 return {'ok': False, 'error': 'paho-mqtt not installed'}
829 with self._lock:
830 try:
831 client = mqtt.Client()
832 if self._username:
833 client.username_pw_set(self._username, self._password)
835 # If response topic is set, subscribe for ack
836 ack_holder: List[Optional[dict]] = [None]
837 ack_event = threading.Event()
839 if self._response_topic:
840 def on_message(_c, _u, msg):
841 try:
842 ack_holder[0] = json.loads(
843 msg.payload.decode('utf-8', errors='ignore'))
844 except (json.JSONDecodeError, ValueError):
845 ack_holder[0] = {'raw': msg.payload.decode(
846 'utf-8', errors='ignore')}
847 ack_event.set()
849 client.on_message = on_message
851 client.connect(self._broker, self._port, keepalive=10)
853 if self._response_topic:
854 client.subscribe(self._response_topic)
856 client.loop_start()
858 payload = json.dumps(command.command)
859 client.publish(self._topic, payload.encode('utf-8'),
860 qos=self._qos)
862 if self._response_topic:
863 ack_event.wait(timeout=5.0)
865 client.loop_stop()
866 client.disconnect()
868 return {
869 'ok': True,
870 'response': ack_holder[0],
871 'topic': self._topic,
872 }
873 except Exception as exc:
874 return {'ok': False, 'error': str(exc)}
876 def get_state(self) -> dict:
877 # MQTT actuators typically don't support state queries.
878 # Return connection metadata instead.
879 return {
880 'broker': self._broker,
881 'topic': self._topic,
882 'response_topic': self._response_topic or None,
883 }
885 def emergency_stop(self) -> None:
886 try:
887 import paho.mqtt.client as mqtt
888 except ImportError:
889 return
891 with self._lock:
892 try:
893 client = mqtt.Client()
894 if self._username:
895 client.username_pw_set(self._username, self._password)
896 client.connect(self._broker, self._port, keepalive=10)
897 payload = json.dumps({'action': 'emergency_stop'})
898 client.publish(self._topic, payload.encode('utf-8'), qos=2)
899 client.disconnect()
900 except Exception as exc:
901 logger.error("MQTTActuatorAdapter E-stop failed: %s", exc)
904# ======================================================================
905# Safety Monitor (inline -- used when external safety_monitor is absent)
906# ======================================================================
909class SafetyMonitor:
910 """Validates every actuator command before execution.
912 Enforces:
913 - Maximum velocity limits
914 - Maximum force limits
915 - Workspace bounds (3D bounding box)
916 - Emergency stop detection and propagation
918 This is a lightweight inline monitor. The full-featured
919 ``integrations.robotics.safety_monitor.SafetyMonitor`` takes
920 precedence when available (HardwareBridge._safety_gate checks
921 for it first).
923 Thread-safe.
924 """
926 DEFAULT_MAX_VELOCITY: float = 2.0 # m/s
927 DEFAULT_MAX_FORCE: float = 50.0 # Newtons
928 DEFAULT_WORKSPACE_BOUNDS: Dict[str, Tuple[float, float]] = {
929 'x': (-5.0, 5.0),
930 'y': (-5.0, 5.0),
931 'z': (0.0, 3.0),
932 }
934 def __init__(
935 self,
936 max_velocity: float = DEFAULT_MAX_VELOCITY,
937 max_force: float = DEFAULT_MAX_FORCE,
938 workspace_bounds: Optional[Dict[str, Tuple[float, float]]] = None,
939 ):
940 self._lock = threading.Lock()
941 self.max_velocity = max_velocity
942 self.max_force = max_force
943 self.workspace_bounds = workspace_bounds or dict(self.DEFAULT_WORKSPACE_BOUNDS)
944 self._estop_active = False
945 self._estop_reason = ''
947 @property
948 def is_estopped(self) -> bool:
949 """Return True if emergency stop is active."""
950 with self._lock:
951 return self._estop_active
953 def trigger_estop(self, reason: str = 'manual') -> None:
954 """Activate emergency stop."""
955 with self._lock:
956 self._estop_active = True
957 self._estop_reason = reason
958 logger.warning("SafetyMonitor: E-STOP triggered -- %s", reason)
960 def clear_estop(self) -> None:
961 """Clear emergency stop. Only human operators should call this."""
962 with self._lock:
963 self._estop_active = False
964 self._estop_reason = ''
965 logger.info("SafetyMonitor: E-STOP cleared")
967 def check_command(self, command: ActuatorCommand) -> Tuple[bool, str]:
968 """Validate a command against all safety constraints.
970 Returns (safe, reason). If safe is False, reason explains why.
971 """
972 if self.is_estopped:
973 return False, f'E-STOP active: {self._estop_reason}'
975 params = command.command.get('params', {})
977 # Velocity check
978 speed = params.get('speed', params.get('velocity'))
979 if speed is not None:
980 try:
981 if abs(float(speed)) > self.max_velocity:
982 return False, (
983 f'velocity {speed} exceeds limit {self.max_velocity} m/s')
984 except (TypeError, ValueError):
985 pass
987 # Force check
988 force = params.get('force', params.get('torque'))
989 if force is not None:
990 try:
991 if abs(float(force)) > self.max_force:
992 return False, (
993 f'force {force} exceeds limit {self.max_force} N')
994 except (TypeError, ValueError):
995 pass
997 # Workspace bounds check
998 if not self.check_position_safe(params):
999 return False, 'position outside workspace bounds'
1001 return True, ''
1003 def check_position_safe(self, params: dict) -> bool:
1004 """Check whether position parameters fall within workspace bounds.
1006 Supports both Cartesian (x,y,z) and joint (joint_N) keys.
1007 """
1008 for axis, (lo, hi) in self.workspace_bounds.items():
1009 val = params.get(axis)
1010 if val is not None:
1011 try:
1012 if float(val) < lo or float(val) > hi:
1013 return False
1014 except (TypeError, ValueError):
1015 pass
1016 return True
1018 def gate_commands(
1019 self, commands: List[ActuatorCommand],
1020 ) -> List[ActuatorCommand]:
1021 """Filter a batch of commands. Sets safety_cleared on passed ones.
1023 Returns only commands that passed.
1024 """
1025 cleared: List[ActuatorCommand] = []
1026 for cmd in commands:
1027 safe, reason = self.check_command(cmd)
1028 if safe:
1029 cmd.safety_cleared = True
1030 cleared.append(cmd)
1031 else:
1032 logger.warning(
1033 "SafetyMonitor blocked command for %s: %s",
1034 cmd.actuator_id, reason,
1035 )
1036 return cleared
1039# Module-level inline safety monitor singleton
1040_inline_safety: Optional[SafetyMonitor] = None
1041_inline_safety_lock = threading.Lock()
1044def get_inline_safety_monitor() -> SafetyMonitor:
1045 """Get or create the inline SafetyMonitor singleton."""
1046 global _inline_safety
1047 if _inline_safety is None:
1048 with _inline_safety_lock:
1049 if _inline_safety is None:
1050 _inline_safety = SafetyMonitor()
1051 return _inline_safety
1054# ======================================================================
1055# Hardware Bridge
1056# ======================================================================
1058# Experience buffer limits
1059_EXPERIENCE_MAX_SIZE = 10_000
1060_EXPERIENCE_AUTO_FLUSH = 100
1063class HardwareBridge:
1064 """Close the loop between physical robots and the hive.
1066 Owns three responsibilities:
1068 1. **Inbound** -- collects sensor readings from registered adapters,
1069 normalizes them, and pushes them to SensorStore + EventBus.
1070 2. **Outbound** -- translates action plans into actuator commands,
1071 gates them through SafetyMonitor, and executes them.
1072 3. **Learning** -- records sensor->action->outcome triples as
1073 Experiences and flushes them to WorldModelBridge so the hive
1074 learns from every robot.
1075 """
1077 def __init__(self, robot_id: str):
1078 self._robot_id = robot_id
1079 self._sensor_adapters: Dict[str, SensorAdapter] = {}
1080 self._actuator_adapters: Dict[str, ActuatorAdapter] = {}
1081 self._experience_buffer: deque = deque(maxlen=_EXPERIENCE_MAX_SIZE)
1082 self._running = False
1083 self._sensor_thread: Optional[threading.Thread] = None
1084 self._lock = threading.Lock()
1085 self._latest_readings: Dict[str, SensorReading] = {}
1086 self._stats = {
1087 'readings_received': 0,
1088 'actions_executed': 0,
1089 'experiences_recorded': 0,
1090 'experiences_flushed': 0,
1091 }
1093 # ------------------------------------------------------------------
1094 # Registration
1095 # ------------------------------------------------------------------
1097 def register_sensor(self, adapter: SensorAdapter) -> None:
1098 """Register a sensor input adapter.
1100 The adapter's ``sensor_id`` is used as the key. Registering a
1101 second adapter with the same ID replaces the first (the old
1102 stream is stopped).
1103 """
1104 with self._lock:
1105 old = self._sensor_adapters.get(adapter.sensor_id)
1106 if old is not None:
1107 try:
1108 old.stop_stream()
1109 except Exception:
1110 pass
1111 self._sensor_adapters[adapter.sensor_id] = adapter
1112 logger.info(
1113 "[HardwareBridge:%s] sensor registered: %s (%s)",
1114 self._robot_id, adapter.sensor_id, adapter.sensor_type,
1115 )
1116 # If already running, start the new adapter's stream immediately
1117 if self._running:
1118 self._start_adapter_stream(adapter)
1120 def register_actuator(self, adapter: ActuatorAdapter) -> None:
1121 """Register an actuator output adapter.
1123 The adapter's ``actuator_id`` is used as the key.
1124 """
1125 with self._lock:
1126 old = self._actuator_adapters.get(adapter.actuator_id)
1127 if old is not None:
1128 try:
1129 old.emergency_stop()
1130 except Exception:
1131 pass
1132 self._actuator_adapters[adapter.actuator_id] = adapter
1133 logger.info(
1134 "[HardwareBridge:%s] actuator registered: %s (%s)",
1135 self._robot_id, adapter.actuator_id, adapter.actuator_type,
1136 )
1138 # ------------------------------------------------------------------
1139 # Lifecycle
1140 # ------------------------------------------------------------------
1142 def start(self) -> None:
1143 """Start all sensor streams and begin experience collection."""
1144 with self._lock:
1145 if self._running:
1146 return
1147 self._running = True
1149 logger.info("[HardwareBridge:%s] starting", self._robot_id)
1151 with self._lock:
1152 adapters = list(self._sensor_adapters.values())
1154 for adapter in adapters:
1155 self._start_adapter_stream(adapter)
1157 def stop(self) -> None:
1158 """Stop all sensor streams, emergency-stop actuators, flush experiences."""
1159 with self._lock:
1160 if not self._running:
1161 return
1162 self._running = False
1164 logger.info("[HardwareBridge:%s] stopping", self._robot_id)
1166 # Stop all sensor streams
1167 with self._lock:
1168 sensor_adapters = list(self._sensor_adapters.values())
1169 actuator_adapters = list(self._actuator_adapters.values())
1171 for adapter in sensor_adapters:
1172 try:
1173 adapter.stop_stream()
1174 except Exception as exc:
1175 logger.debug("Error stopping sensor %s: %s", adapter.sensor_id, exc)
1177 # Emergency-stop all actuators
1178 for adapter in actuator_adapters:
1179 try:
1180 adapter.emergency_stop()
1181 except Exception as exc:
1182 logger.debug("Error stopping actuator %s: %s", adapter.actuator_id, exc)
1184 # Flush remaining experiences
1185 self._flush_experiences()
1187 # ------------------------------------------------------------------
1188 # Sensor snapshot
1189 # ------------------------------------------------------------------
1191 def get_sensor_snapshot(self) -> dict:
1192 """Return the latest reading from every registered sensor.
1194 Returns a dict keyed by sensor_id, values are reading dicts.
1195 """
1196 with self._lock:
1197 snapshot = {}
1198 for sid, reading in self._latest_readings.items():
1199 snapshot[sid] = {
1200 'sensor_id': reading.sensor_id,
1201 'sensor_type': reading.sensor_type,
1202 'data': reading.data,
1203 'timestamp': reading.timestamp,
1204 }
1205 return snapshot
1207 # ------------------------------------------------------------------
1208 # Action execution
1209 # ------------------------------------------------------------------
1211 def execute_action(self, plan: dict) -> dict:
1212 """Execute an action plan from the intelligence layer.
1214 The plan is a dict with ``commands`` -- a list of dicts each
1215 having at least ``actuator_id`` and ``command``. Example::
1217 {
1218 "commands": [
1219 {"actuator_id": "motor_left", "command": {"action": "move", "params": {"speed": 0.5}}},
1220 {"actuator_id": "motor_right", "command": {"action": "move", "params": {"speed": 0.5}}},
1221 ]
1222 }
1224 Every command passes through the safety gate before reaching
1225 the actuator.
1227 Returns a result dict with per-actuator outcomes.
1228 """
1229 # Resource governor check
1230 if not self._resource_ok():
1231 return {'ok': False, 'error': 'resource governor denied', 'results': {}}
1233 raw_commands = plan.get('commands', [])
1234 if not raw_commands:
1235 return {'ok': False, 'error': 'no commands in plan', 'results': {}}
1237 # Build ActuatorCommand objects
1238 commands: List[ActuatorCommand] = []
1239 for entry in raw_commands:
1240 aid = entry.get('actuator_id', '')
1241 with self._lock:
1242 adapter = self._actuator_adapters.get(aid)
1243 if adapter is None:
1244 logger.warning(
1245 "[HardwareBridge:%s] unknown actuator_id: %s",
1246 self._robot_id, aid,
1247 )
1248 continue
1249 commands.append(ActuatorCommand(
1250 actuator_id=aid,
1251 actuator_type=adapter.actuator_type,
1252 command=entry.get('command', {}),
1253 ))
1255 if not commands:
1256 return {'ok': False, 'error': 'no valid actuator targets', 'results': {}}
1258 # Safety gate -- mandatory
1259 cleared = self._safety_gate(commands)
1261 # Execute cleared commands
1262 results: Dict[str, dict] = {}
1263 any_ok = False
1264 for cmd in cleared:
1265 with self._lock:
1266 adapter = self._actuator_adapters.get(cmd.actuator_id)
1267 if adapter is None:
1268 results[cmd.actuator_id] = {'ok': False, 'error': 'adapter gone'}
1269 continue
1270 try:
1271 result = adapter.execute(cmd)
1272 results[cmd.actuator_id] = result
1273 if result.get('ok'):
1274 any_ok = True
1275 except Exception as exc:
1276 results[cmd.actuator_id] = {'ok': False, 'error': str(exc)}
1278 with self._lock:
1279 self._stats['actions_executed'] += 1
1281 # Emit event
1282 self._emit('robot.action_executed', {
1283 'robot_id': self._robot_id,
1284 'plan': plan,
1285 'results': results,
1286 })
1288 # Record experience: snapshot sensors before and after
1289 sensor_before = self.get_sensor_snapshot()
1290 # Small pause to let sensors update with post-action readings
1291 # (non-blocking, real feedback comes on next sensor cycle)
1292 outcome = {
1293 'results': results,
1294 'any_ok': any_ok,
1295 'sensor_after': self.get_sensor_snapshot(),
1296 }
1297 self._record_experience(
1298 sensor_state=sensor_before,
1299 action=plan,
1300 outcome=outcome,
1301 )
1303 return {'ok': any_ok, 'results': results}
1305 # ------------------------------------------------------------------
1306 # Think-and-act: full soft+hard cycle
1307 # ------------------------------------------------------------------
1309 def think_and_act(self, context: str = '') -> dict:
1310 """Full loop: sense -> think -> act -> learn.
1312 1. Read sensor snapshot
1313 2. Call the intelligence API (intelligence_api.think())
1314 3. Execute the resulting action plan
1315 4. Record the experience
1317 This is the complete soft+hard cycle.
1319 Args:
1320 context: Optional textual context for the intelligence layer
1321 (e.g. 'navigate to charging station').
1323 Returns:
1324 Dict with ``sensors``, ``thought``, ``action_result``, ``experience_count``.
1325 """
1326 # 1. Sense
1327 sensor_snapshot = self.get_sensor_snapshot()
1329 # 2. Think -- call intelligence API
1330 thought = self._call_intelligence(sensor_snapshot, context)
1332 # 3. Act -- execute the action plan from the intelligence layer
1333 action_plan = thought.get('action_plan', {})
1334 action_result = {}
1335 if action_plan and action_plan.get('commands'):
1336 action_result = self.execute_action(action_plan)
1337 else:
1338 # No action plan, but still record the observation experience
1339 self._record_experience(
1340 sensor_state=sensor_snapshot,
1341 action={'noop': True, 'context': context},
1342 outcome={'thought': thought, 'action_result': None},
1343 )
1345 return {
1346 'robot_id': self._robot_id,
1347 'sensors': sensor_snapshot,
1348 'thought': thought,
1349 'action_result': action_result,
1350 'experience_count': len(self._experience_buffer),
1351 }
1353 # ------------------------------------------------------------------
1354 # Safety gate
1355 # ------------------------------------------------------------------
1357 def _safety_gate(self, commands: List[ActuatorCommand]) -> List[ActuatorCommand]:
1358 """Filter commands through SafetyMonitor.
1360 Every command must pass the safety check. Commands that fail
1361 are logged and dropped -- they never reach an actuator.
1363 Prefers the full external SafetyMonitor from safety_monitor.py.
1364 Falls back to the inline SafetyMonitor defined in this module
1365 when the external one is not importable. Safety is
1366 NON-NEGOTIABLE -- every command is gated.
1368 Returns the list of commands with ``safety_cleared=True``.
1369 """
1370 # Try the full-featured external monitor first
1371 monitor = None
1372 try:
1373 from integrations.robotics.safety_monitor import get_safety_monitor
1374 monitor = get_safety_monitor()
1375 except ImportError:
1376 pass
1378 # Fall back to inline SafetyMonitor
1379 if monitor is None:
1380 monitor = get_inline_safety_monitor()
1381 logger.debug(
1382 "[HardwareBridge:%s] using inline SafetyMonitor "
1383 "(external safety_monitor not available)", self._robot_id,
1384 )
1386 # Global E-stop check
1387 if monitor.is_estopped:
1388 logger.warning(
1389 "[HardwareBridge:%s] E-stop active -- all commands blocked",
1390 self._robot_id,
1391 )
1392 return []
1394 # Gate each command through the monitor
1395 cleared: List[ActuatorCommand] = []
1396 for cmd in commands:
1397 # If the monitor has check_command (inline SafetyMonitor),
1398 # use it for comprehensive validation.
1399 if hasattr(monitor, 'check_command'):
1400 safe, reason = monitor.check_command(cmd)
1401 if not safe:
1402 logger.warning(
1403 "[HardwareBridge:%s] command for %s blocked: %s",
1404 self._robot_id, cmd.actuator_id, reason,
1405 )
1406 continue
1407 cmd.safety_cleared = True
1408 cleared.append(cmd)
1409 else:
1410 # External monitor: position + velocity checks
1411 params = cmd.command.get('params', {})
1412 position_keys = {'x', 'y', 'z', 'joint_0', 'joint_1',
1413 'joint_2', 'joint_3', 'joint_4', 'joint_5'}
1414 position = {k: v for k, v in params.items()
1415 if k in position_keys}
1417 if position:
1418 if not monitor.check_position_safe(position):
1419 logger.warning(
1420 "[HardwareBridge:%s] command for %s blocked: "
1421 "position outside workspace limits",
1422 self._robot_id, cmd.actuator_id,
1423 )
1424 continue
1426 # Velocity sanity check
1427 speed = params.get('speed', params.get('velocity'))
1428 if speed is not None:
1429 try:
1430 if abs(float(speed)) > 10.0:
1431 logger.warning(
1432 "[HardwareBridge:%s] command for %s blocked: "
1433 "speed %.2f exceeds limit",
1434 self._robot_id, cmd.actuator_id, float(speed),
1435 )
1436 continue
1437 except (TypeError, ValueError):
1438 pass
1440 cmd.safety_cleared = True
1441 cleared.append(cmd)
1443 return cleared
1445 # ------------------------------------------------------------------
1446 # Experience recording + flushing
1447 # ------------------------------------------------------------------
1449 def _record_experience(
1450 self,
1451 sensor_state: dict,
1452 action: dict,
1453 outcome: dict,
1454 ) -> None:
1455 """Buffer a sensor->action->outcome triple.
1457 Auto-flushes every ``_EXPERIENCE_AUTO_FLUSH`` experiences.
1458 """
1459 reward = self._compute_reward(action, outcome)
1461 exp = Experience(
1462 robot_id=self._robot_id,
1463 sensor_state=sensor_state,
1464 action_taken=action,
1465 outcome=outcome,
1466 reward=reward,
1467 )
1469 with self._lock:
1470 self._experience_buffer.append(exp)
1471 self._stats['experiences_recorded'] += 1
1472 buffer_len = len(self._experience_buffer)
1474 self._emit('robot.experience_recorded', {
1475 'robot_id': self._robot_id,
1476 'reward': reward,
1477 'buffer_size': buffer_len,
1478 })
1480 # Auto-flush when buffer reaches threshold
1481 if buffer_len >= _EXPERIENCE_AUTO_FLUSH:
1482 self._flush_experiences()
1484 def _flush_experiences(self) -> int:
1485 """Push buffered experiences to WorldModelBridge for HevolveAI training.
1487 Returns the number of experiences successfully flushed.
1488 """
1489 with self._lock:
1490 if not self._experience_buffer:
1491 return 0
1492 batch = list(self._experience_buffer)
1493 self._experience_buffer.clear()
1495 # Convert to dicts for transport
1496 experience_dicts = []
1497 for exp in batch:
1498 experience_dicts.append({
1499 'robot_id': exp.robot_id,
1500 'sensor_state': exp.sensor_state,
1501 'action_taken': exp.action_taken,
1502 'outcome': exp.outcome,
1503 'reward': exp.reward,
1504 'timestamp': exp.timestamp,
1505 })
1507 flushed = 0
1508 try:
1509 from integrations.agent_engine.world_model_bridge import (
1510 get_world_model_bridge,
1511 )
1512 bridge = get_world_model_bridge()
1514 # Use ingest_sensor_batch for the sensor component
1515 sensor_readings = []
1516 for exp_dict in experience_dicts:
1517 for sid, reading_data in exp_dict.get('sensor_state', {}).items():
1518 sensor_readings.append(reading_data)
1519 if sensor_readings:
1520 bridge.ingest_sensor_batch(sensor_readings)
1522 # Send actions for the action component
1523 for exp_dict in experience_dicts:
1524 action = exp_dict.get('action_taken', {})
1525 if action and not action.get('noop'):
1526 bridge.send_action(action)
1528 flushed = len(batch)
1529 except ImportError:
1530 logger.debug(
1531 "[HardwareBridge:%s] WorldModelBridge unavailable -- "
1532 "%d experiences dropped", self._robot_id, len(batch),
1533 )
1534 except Exception as exc:
1535 logger.warning(
1536 "[HardwareBridge:%s] experience flush failed: %s",
1537 self._robot_id, exc,
1538 )
1540 with self._lock:
1541 self._stats['experiences_flushed'] += flushed
1543 return flushed
1545 # ------------------------------------------------------------------
1546 # Internal helpers
1547 # ------------------------------------------------------------------
1549 def _start_adapter_stream(self, adapter: SensorAdapter) -> None:
1550 """Start a single sensor adapter's stream with our ingestion callback."""
1551 def on_reading(reading: SensorReading):
1552 self._on_sensor_reading(reading)
1554 try:
1555 adapter.start_stream(on_reading)
1556 except Exception as exc:
1557 logger.warning(
1558 "[HardwareBridge:%s] failed to start stream for %s: %s",
1559 self._robot_id, adapter.sensor_id, exc,
1560 )
1562 def _on_sensor_reading(self, reading: SensorReading) -> None:
1563 """Handle an incoming sensor reading from any adapter."""
1564 # Resource governor gate
1565 if not self._resource_ok():
1566 return
1568 with self._lock:
1569 self._latest_readings[reading.sensor_id] = reading
1570 self._stats['readings_received'] += 1
1572 # Push to SensorStore (existing robotics infrastructure)
1573 try:
1574 from integrations.robotics.sensor_store import get_sensor_store
1575 from integrations.robotics.sensor_model import SensorReading as StoreSensorReading
1577 store = get_sensor_store()
1578 store_reading = StoreSensorReading(
1579 sensor_id=reading.sensor_id,
1580 sensor_type=reading.sensor_type,
1581 data=reading.data if isinstance(reading.data, dict) else {'value': reading.data},
1582 source='hardware_bridge',
1583 )
1584 store.put_reading(store_reading)
1585 except ImportError:
1586 pass
1587 except Exception as exc:
1588 logger.debug("SensorStore push failed: %s", exc)
1590 # Emit event
1591 self._emit('robot.sensor_reading', {
1592 'robot_id': self._robot_id,
1593 'sensor_id': reading.sensor_id,
1594 'sensor_type': reading.sensor_type,
1595 'data': reading.data,
1596 'timestamp': reading.timestamp,
1597 })
1599 def _call_intelligence(self, sensor_snapshot: dict, context: str) -> dict:
1600 """Call the intelligence API for multi-intelligence fusion.
1602 Falls back gracefully if the intelligence API is not available.
1603 """
1604 try:
1605 from integrations.robotics.intelligence_api import think
1606 return think(
1607 robot_id=self._robot_id,
1608 sensor_snapshot=sensor_snapshot,
1609 context=context,
1610 )
1611 except ImportError:
1612 logger.debug(
1613 "[HardwareBridge:%s] intelligence_api not available, "
1614 "returning empty thought", self._robot_id,
1615 )
1616 return {
1617 'action_plan': {},
1618 'reasoning': 'intelligence API not available',
1619 }
1620 except Exception as exc:
1621 logger.warning(
1622 "[HardwareBridge:%s] intelligence call failed: %s",
1623 self._robot_id, exc,
1624 )
1625 return {
1626 'action_plan': {},
1627 'reasoning': f'intelligence error: {exc}',
1628 }
1630 @staticmethod
1631 def _compute_reward(action: dict, outcome: dict) -> float:
1632 """Compute a simple reward signal from action and outcome.
1634 Basic heuristic: +1.0 for success, -0.5 for failure, 0.0 for noop.
1635 Real reward shaping belongs in HevolveAI -- this is just a bootstrap
1636 signal so the experience buffer carries a non-zero reward.
1637 """
1638 if action.get('noop'):
1639 return 0.0
1640 results = outcome.get('results', {})
1641 if not results:
1642 return 0.0
1643 successes = sum(1 for r in results.values() if r.get('ok'))
1644 total = len(results)
1645 if total == 0:
1646 return 0.0
1647 ratio = successes / total
1648 # Scale: all success = +1.0, all fail = -0.5, mixed = proportional
1649 return ratio * 1.5 - 0.5
1651 @staticmethod
1652 def _resource_ok() -> bool:
1653 """Check resource governor. Returns True if processing is allowed."""
1654 try:
1655 from core.resource_governor import should_proceed
1656 return should_proceed('cpu_heavy')
1657 except ImportError:
1658 return True # No governor = no throttling
1660 @staticmethod
1661 def _emit(topic: str, data: Any) -> None:
1662 """Emit an event on the platform EventBus (best-effort)."""
1663 try:
1664 from core.platform.events import emit_event
1665 emit_event(topic, data)
1666 except ImportError:
1667 pass
1668 except Exception:
1669 pass
1671 def get_stats(self) -> dict:
1672 """Return bridge statistics for monitoring."""
1673 with self._lock:
1674 return {
1675 'robot_id': self._robot_id,
1676 'sensors_registered': len(self._sensor_adapters),
1677 'actuators_registered': len(self._actuator_adapters),
1678 'experience_buffer_size': len(self._experience_buffer),
1679 'running': self._running,
1680 **dict(self._stats),
1681 }
1684# ======================================================================
1685# Module-level bridge registry
1686# ======================================================================
1688_bridges: Dict[str, HardwareBridge] = {}
1689_bridges_lock = threading.Lock()
1692def get_bridge(robot_id: str) -> HardwareBridge:
1693 """Get or create a HardwareBridge for the given robot ID.
1695 Thread-safe. Creates a new bridge on first access for each robot_id.
1696 """
1697 if robot_id not in _bridges:
1698 with _bridges_lock:
1699 if robot_id not in _bridges:
1700 _bridges[robot_id] = HardwareBridge(robot_id)
1701 return _bridges[robot_id]
1704def list_bridges() -> List[str]:
1705 """Return all registered robot IDs."""
1706 with _bridges_lock:
1707 return list(_bridges.keys())
1710# ======================================================================
1711# Flask Blueprint
1712# ======================================================================
1715def _create_blueprint():
1716 """Create the Flask blueprint for hardware bridge endpoints.
1718 Lazy import to avoid requiring Flask at module load time.
1719 """
1720 try:
1721 from flask import Blueprint, jsonify, request
1722 except ImportError:
1723 return None
1725 hardware_bp = Blueprint('hardware_bridge', __name__)
1727 @hardware_bp.route('/api/robot/<robot_id>/sensors/register', methods=['POST'])
1728 def register_sensor_endpoint(robot_id: str):
1729 """Register a sensor adapter via HTTP.
1731 Body: {
1732 "sensor_id": "cam_front",
1733 "sensor_type": "camera",
1734 "adapter_type": "http", # http | mqtt | serial
1735 "config": { # adapter-specific config
1736 "url": "http://...",
1737 "poll_interval": 1.0
1738 }
1739 }
1740 """
1741 body = request.get_json(silent=True) or {}
1742 sensor_id = body.get('sensor_id', '')
1743 sensor_type = body.get('sensor_type', '')
1744 adapter_type = body.get('adapter_type', '')
1745 config = body.get('config', {})
1747 if not sensor_id or not sensor_type:
1748 return jsonify({'error': 'sensor_id and sensor_type required'}), 400
1750 adapter = _build_sensor_adapter(sensor_id, sensor_type, adapter_type, config)
1751 if adapter is None:
1752 return jsonify({'error': f'unknown adapter_type: {adapter_type}'}), 400
1754 bridge = get_bridge(robot_id)
1755 bridge.register_sensor(adapter)
1756 return jsonify({'ok': True, 'sensor_id': sensor_id}), 200
1758 @hardware_bp.route('/api/robot/<robot_id>/actuators/register', methods=['POST'])
1759 def register_actuator_endpoint(robot_id: str):
1760 """Register an actuator adapter via HTTP.
1762 Body: {
1763 "actuator_id": "motor_left",
1764 "actuator_type": "motor",
1765 "adapter_type": "serial", # serial | http
1766 "config": {
1767 "port": "/dev/ttyUSB0",
1768 "baudrate": 115200
1769 }
1770 }
1771 """
1772 body = request.get_json(silent=True) or {}
1773 actuator_id = body.get('actuator_id', '')
1774 actuator_type = body.get('actuator_type', '')
1775 adapter_type = body.get('adapter_type', '')
1776 config = body.get('config', {})
1778 if not actuator_id or not actuator_type:
1779 return jsonify({'error': 'actuator_id and actuator_type required'}), 400
1781 adapter = _build_actuator_adapter(
1782 actuator_id, actuator_type, adapter_type, config,
1783 )
1784 if adapter is None:
1785 return jsonify({'error': f'unknown adapter_type: {adapter_type}'}), 400
1787 bridge = get_bridge(robot_id)
1788 bridge.register_actuator(adapter)
1789 return jsonify({'ok': True, 'actuator_id': actuator_id}), 200
1791 @hardware_bp.route('/api/robot/<robot_id>/act', methods=['POST'])
1792 def act_endpoint(robot_id: str):
1793 """Execute an action plan.
1795 Body: {
1796 "commands": [
1797 {"actuator_id": "motor_left", "command": {"action": "move", "params": {"speed": 0.5}}}
1798 ]
1799 }
1800 """
1801 body = request.get_json(silent=True) or {}
1802 bridge = get_bridge(robot_id)
1803 result = bridge.execute_action(body)
1804 status = 200 if result.get('ok') else 400
1805 return jsonify(result), status
1807 @hardware_bp.route('/api/robot/<robot_id>/think_and_act', methods=['POST'])
1808 def think_and_act_endpoint(robot_id: str):
1809 """Full loop: sense -> think -> act -> learn.
1811 Body: {"context": "optional textual context"}
1812 """
1813 body = request.get_json(silent=True) or {}
1814 context = body.get('context', '')
1815 bridge = get_bridge(robot_id)
1816 result = bridge.think_and_act(context=context)
1817 return jsonify(result), 200
1819 @hardware_bp.route('/api/robot/<robot_id>/sensors/snapshot', methods=['GET'])
1820 def sensors_snapshot_endpoint(robot_id: str):
1821 """Get current state of all sensors."""
1822 bridge = get_bridge(robot_id)
1823 return jsonify(bridge.get_sensor_snapshot()), 200
1825 @hardware_bp.route('/api/robot/<robot_id>/experience/stats', methods=['GET'])
1826 def experience_stats_endpoint(robot_id: str):
1827 """Get experience buffer statistics."""
1828 bridge = get_bridge(robot_id)
1829 return jsonify(bridge.get_stats()), 200
1831 return hardware_bp
1834def _build_sensor_adapter(
1835 sensor_id: str,
1836 sensor_type: str,
1837 adapter_type: str,
1838 config: dict,
1839) -> Optional[SensorAdapter]:
1840 """Factory for sensor adapters from HTTP registration payloads."""
1841 adapter_type = adapter_type.lower()
1842 if adapter_type == 'http':
1843 return HTTPSensorAdapter(
1844 sensor_id=sensor_id,
1845 sensor_type=sensor_type,
1846 url=config.get('url', ''),
1847 poll_interval=config.get('poll_interval', 1.0),
1848 timeout=config.get('timeout', 5.0),
1849 headers=config.get('headers'),
1850 )
1851 elif adapter_type == 'mqtt':
1852 return MQTTSensorAdapter(
1853 sensor_id=sensor_id,
1854 sensor_type=sensor_type,
1855 broker=config.get('broker', 'localhost'),
1856 port=config.get('port', 1883),
1857 topic=config.get('topic', ''),
1858 username=config.get('username', ''),
1859 password=config.get('password', ''),
1860 )
1861 elif adapter_type == 'serial':
1862 return SerialSensorAdapter(
1863 sensor_id=sensor_id,
1864 sensor_type=sensor_type,
1865 port=config.get('port', ''),
1866 baudrate=config.get('baudrate', 115200),
1867 timeout=config.get('timeout', 0.1),
1868 )
1869 elif adapter_type in ('websocket', 'ws'):
1870 return WebSocketSensorAdapter(
1871 sensor_id=sensor_id,
1872 sensor_type=sensor_type,
1873 url=config.get('url', ''),
1874 headers=config.get('headers'),
1875 reconnect_delay=config.get('reconnect_delay', 2.0),
1876 )
1877 return None
1880def _build_actuator_adapter(
1881 actuator_id: str,
1882 actuator_type: str,
1883 adapter_type: str,
1884 config: dict,
1885) -> Optional[ActuatorAdapter]:
1886 """Factory for actuator adapters from HTTP registration payloads."""
1887 adapter_type = adapter_type.lower()
1888 if adapter_type == 'serial':
1889 return SerialActuatorAdapter(
1890 actuator_id=actuator_id,
1891 actuator_type=actuator_type,
1892 port=config.get('port', ''),
1893 baudrate=config.get('baudrate', 115200),
1894 timeout=config.get('timeout', 1.0),
1895 )
1896 elif adapter_type == 'http':
1897 return HTTPActuatorAdapter(
1898 actuator_id=actuator_id,
1899 actuator_type=actuator_type,
1900 url=config.get('url', ''),
1901 timeout=config.get('timeout', 5.0),
1902 headers=config.get('headers'),
1903 )
1904 elif adapter_type == 'mqtt':
1905 return MQTTActuatorAdapter(
1906 actuator_id=actuator_id,
1907 actuator_type=actuator_type,
1908 broker=config.get('broker', 'localhost'),
1909 port=config.get('port', 1883),
1910 topic=config.get('topic', ''),
1911 response_topic=config.get('response_topic', ''),
1912 username=config.get('username', ''),
1913 password=config.get('password', ''),
1914 qos=config.get('qos', 1),
1915 )
1916 return None
1919# ======================================================================
1920# Unified Robotics Blueprint (/api/robotics/...)
1921# ======================================================================
1924def create_robotics_blueprint():
1925 """Create the Flask blueprint for unified robotics endpoints.
1927 Exposes hive-wide robotics operations at ``/api/robotics/...``.
1928 Per-robot endpoints live on the ``_create_blueprint()`` blueprint above
1929 at ``/api/robot/<robot_id>/...``.
1931 Routes:
1932 GET /api/robotics/status -- all bridges status
1933 POST /api/robotics/think -- trigger think_and_act
1934 GET /api/robotics/sensors -- all sensor readings (all robots)
1935 POST /api/robotics/command -- direct actuator command (safety-gated)
1936 """
1937 try:
1938 from flask import Blueprint, jsonify, request
1939 except ImportError:
1940 return None
1942 robotics_bp = Blueprint('robotics', __name__)
1944 @robotics_bp.route('/api/robotics/status', methods=['GET'])
1945 def robotics_status():
1946 """GET /api/robotics/status -- all bridges status."""
1947 robot_ids = list_bridges()
1948 bridge_stats = []
1949 for rid in robot_ids:
1950 bridge = get_bridge(rid)
1951 bridge_stats.append(bridge.get_stats())
1952 return jsonify({
1953 'robot_count': len(robot_ids),
1954 'robots': bridge_stats,
1955 })
1957 @robotics_bp.route('/api/robotics/think', methods=['POST'])
1958 def robotics_think():
1959 """POST /api/robotics/think -- trigger think_and_act on a robot.
1961 Body: {
1962 "robot_id": "my-robot",
1963 "context": "navigate to kitchen"
1964 }
1965 """
1966 body = request.get_json(silent=True) or {}
1967 robot_id = body.get('robot_id', '')
1968 if not robot_id:
1969 return jsonify({'error': 'robot_id is required'}), 400
1970 context = body.get('context', '')
1971 bridge = get_bridge(robot_id)
1972 result = bridge.think_and_act(context=context)
1973 return jsonify(result)
1975 @robotics_bp.route('/api/robotics/sensors', methods=['GET'])
1976 def robotics_sensors():
1977 """GET /api/robotics/sensors -- all sensor readings from all robots."""
1978 robot_ids = list_bridges()
1979 all_readings: Dict[str, dict] = {}
1980 for rid in robot_ids:
1981 bridge = get_bridge(rid)
1982 snapshot = bridge.get_sensor_snapshot()
1983 if snapshot:
1984 all_readings[rid] = snapshot
1985 return jsonify({
1986 'robot_count': len(robot_ids),
1987 'readings': all_readings,
1988 })
1990 @robotics_bp.route('/api/robotics/command', methods=['POST'])
1991 def robotics_command():
1992 """POST /api/robotics/command -- direct actuator command (safety-gated).
1994 Body: {
1995 "robot_id": "my-robot",
1996 "commands": [
1997 {"actuator_id": "motor_left", "command": {"action": "move", "params": {"speed": 0.5}}}
1998 ]
1999 }
2000 """
2001 body = request.get_json(silent=True) or {}
2002 robot_id = body.get('robot_id', '')
2003 if not robot_id:
2004 return jsonify({'error': 'robot_id is required'}), 400
2005 bridge = get_bridge(robot_id)
2006 result = bridge.execute_action(body)
2007 status = 200 if result.get('ok') else 400
2008 return jsonify(result), status
2010 return robotics_bp