Coverage for integrations / robotics / hardware_bridge.py: 49.8%

847 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Hardware Bridge -- Close the loop between physical robots and the hive. 

3 

4Soft agents think. Hard agents act. This bridge connects them: 

5 

6INBOUND (sensors -> hive): 

7 - Normalize heterogeneous sensor data into a common schema 

8 - Support: USB serial, GPIO, ROS topics, HTTP streams, WebSocket, MQTT 

9 - Buffer and batch sensor readings for efficiency 

10 - Push to intelligence_api.py for multi-intelligence fusion 

11 

12OUTBOUND (hive -> actuators): 

13 - Translate action plans into hardware-specific commands 

14 - Support: serial commands, GPIO, ROS cmd_vel, HTTP actuator APIs 

15 - Safety gate: every command passes through safety_monitor before execution 

16 - Feedback loop: actuator response feeds back as sensor data 

17 

18EXPERIENCE (learning loop): 

19 - Every sensor->action->outcome triple is an experience 

20 - Experiences queue to WorldModelBridge for HevolveAI training 

21 - The hive learns from every robot's actions 

22 - Better models pushed back to all robots via federation 

23 

24Usage: 

25 from integrations.robotics.hardware_bridge import get_bridge 

26 

27 bridge = get_bridge('robot_01') 

28 bridge.register_sensor(HTTPSensorAdapter( 

29 sensor_id='cam_front', sensor_type='camera', 

30 url='http://192.168.1.10/capture', 

31 )) 

32 bridge.register_actuator(SerialActuatorAdapter( 

33 actuator_id='motor_left', actuator_type='motor', 

34 port='/dev/ttyUSB0', 

35 )) 

36 bridge.start() 

37 

38 # Full cycle: sense -> think -> act -> learn 

39 result = bridge.think_and_act(context='navigate to charging station') 

40""" 

41 

42import json 

43import logging 

44import struct 

45import threading 

46import time 

47from abc import ABC, abstractmethod 

48from collections import deque 

49from dataclasses import dataclass, field 

50from typing import Any, Callable, Dict, List, Optional, Tuple 

51 

52logger = logging.getLogger(__name__) 

53 

54# ====================================================================== 

55# Data classes 

56# ====================================================================== 

57 

58 

@dataclass
class SensorReading:
    """One hardware sample expressed in the bridge's common schema.

    ``data`` holds the normalized value while ``raw`` keeps whatever the
    transport originally delivered, so downstream consumers can pick the
    representation they need.
    """

    # Identifier of the sensor that produced this sample.
    sensor_id: str
    # Sensor category: camera, lidar, imu, audio, touch, gps, temperature,
    # proximity.
    sensor_type: str
    # Normalized payload (dict, list, or scalar).
    data: Any
    # Creation time from time.time(); filled in automatically when omitted.
    timestamp: float = field(default_factory=time.time)
    # Original hardware-specific payload, when the adapter kept one.
    raw: Any = None

71 

72 

@dataclass
class ActuatorCommand:
    """An instruction addressed to one physical actuator.

    A freshly built command has ``safety_cleared == False``; the safety
    gate is the only component expected to flip it to True. Actuator
    adapters MUST reject any command whose flag is still False.
    """

    # Identifier of the target actuator.
    actuator_id: str
    # Actuator category: motor, servo, led, speaker, display, gripper.
    actuator_type: str
    # Command body, e.g. {action: 'move', params: {speed: 0.5, direction: 'forward'}}
    command: dict
    # Set to True once the command has passed the safety gate.
    safety_cleared: bool = False
    # Creation time from time.time(); filled in automatically when omitted.
    timestamp: float = field(default_factory=time.time)

86 

87 

@dataclass
class Experience:
    """A sensor -> action -> outcome triple for the learning loop.

    Per the module docstring, experiences are queued to WorldModelBridge
    for HevolveAI training so that improved models can be pushed back to
    all robots via federation.
    """

    # Robot that generated this experience.
    robot_id: str
    # Snapshot of all sensors at action time.
    sensor_state: dict
    # What the robot did.
    action_taken: dict
    # What happened (success/fail, sensor delta).
    outcome: dict
    # Computed reward signal.
    reward: float
    # Creation time from time.time(); filled in automatically when omitted.
    timestamp: float = field(default_factory=time.time)

102 

103 

104# ====================================================================== 

105# Adapter base classes 

106# ====================================================================== 

107 

108 

class SensorAdapter(ABC):
    """Abstract interface implemented by every sensor input adapter.

    A concrete adapter wraps one transport (serial, HTTP, MQTT, GPIO, ROS)
    and converts what it receives into the common SensorReading format.
    """

    def __init__(self, sensor_id: str, sensor_type: str):
        # Identity of the sensor this adapter represents.
        self.sensor_id, self.sensor_type = sensor_id, sensor_type

    @abstractmethod
    def read(self) -> Optional[SensorReading]:
        """Take one blocking sample from the sensor.

        Implementations return None when the sensor is unavailable or the
        read failed.
        """
        ...

    @abstractmethod
    def start_stream(self, callback: Callable[[SensorReading], None]) -> None:
        """Start continuous streaming, invoking ``callback`` per reading."""
        ...

    @abstractmethod
    def stop_stream(self) -> None:
        """Halt the stream previously started with ``start_stream``."""
        ...

137 

138 

class ActuatorAdapter(ABC):
    """Abstract interface implemented by every actuator output adapter.

    A concrete adapter turns ActuatorCommands into a hardware-specific
    protocol (serial bytes, HTTP POST, GPIO writes, etc.).
    """

    def __init__(self, actuator_id: str, actuator_type: str):
        # Identity of the actuator this adapter drives.
        self.actuator_id, self.actuator_type = actuator_id, actuator_type

    @abstractmethod
    def execute(self, command: ActuatorCommand) -> dict:
        """Run ``command``; the result dict carries at least ``{ok: bool}``."""
        ...

    @abstractmethod
    def get_state(self) -> dict:
        """Report the actuator's current state (position, speed, temperature, etc.)."""
        ...

    @abstractmethod
    def emergency_stop(self) -> None:
        """Halt the actuator immediately. Implementations must be idempotent."""
        ...

164 

165 

166# ====================================================================== 

167# Built-in sensor adapters 

168# ====================================================================== 

169 

170 

class SerialSensorAdapter(SensorAdapter):
    """Read sensors via USB serial (Arduino, ESP32, etc.).

    Expects line-oriented JSON from the device, e.g.::

        {"accel_x": 0.01, "accel_y": -9.81, "accel_z": 0.02}

    Falls back to a raw-line reading if JSON parsing fails.

    pyserial is imported lazily; when it is missing, ``read`` returns None
    and the stream thread exits after logging a warning.
    """

    def __init__(
        self,
        sensor_id: str,
        sensor_type: str,
        port: str = '',
        baudrate: int = 115200,
        timeout: float = 0.1,
    ):
        super().__init__(sensor_id, sensor_type)
        self._port = port
        self._baudrate = baudrate
        self._timeout = timeout
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._callback: Optional[Callable] = None

    def read(self) -> Optional[SensorReading]:
        """Open the port, read one line, and return it as a reading.

        Returns None when pyserial is missing, the line is empty (e.g. the
        read timed out), or any error occurs. The port is always closed
        before returning, including on error paths.
        """
        try:
            import serial as pyserial  # lazy -- optional dependency
        except ImportError:
            logger.debug("SerialSensorAdapter: pyserial not installed")
            return None
        ser = None
        try:
            ser = pyserial.Serial(self._port, self._baudrate, timeout=self._timeout)
            line = ser.readline().decode('utf-8', errors='ignore').strip()
            if not line:
                return None
            return self._parse_line(line)
        except Exception as exc:
            logger.debug("SerialSensorAdapter read error on %s: %s", self._port, exc)
            return None
        finally:
            # Release the port even when readline() raised; a leaked handle
            # can make the next open fail.
            self._close_quietly(ser)

    def start_stream(self, callback: Callable[[SensorReading], None]) -> None:
        """Start a daemon thread that feeds each parsed line to ``callback``.

        Calling this while a stream is already running is a no-op.
        """
        if self._running:
            return
        self._callback = callback
        self._running = True
        self._thread = threading.Thread(
            target=self._stream_loop,
            name=f'serial_sensor_{self.sensor_id}',
            daemon=True,
        )
        self._thread.start()

    def stop_stream(self) -> None:
        """Ask the stream thread to exit; does not wait for it to finish."""
        self._running = False

    def _stream_loop(self) -> None:
        """Thread body: keep the port open and dispatch readings.

        On any error (including one raised by the callback) the port is
        closed, the loop sleeps one second, and the port is reopened.
        """
        try:
            import serial as pyserial
        except ImportError:
            logger.warning("SerialSensorAdapter: pyserial not installed, stream aborted")
            return
        while self._running:
            ser = None
            failed = False
            try:
                ser = pyserial.Serial(self._port, self._baudrate, timeout=self._timeout)
                while self._running:
                    line = ser.readline().decode('utf-8', errors='ignore').strip()
                    if line:
                        reading = self._parse_line(line)
                        if reading and self._callback:
                            self._callback(reading)
            except Exception as exc:
                failed = True
                logger.debug("SerialSensorAdapter stream error: %s", exc)
            finally:
                # Close before backing off so the port is not held (or
                # leaked) across reconnect attempts.
                self._close_quietly(ser)
            if failed:
                time.sleep(1.0)

    @staticmethod
    def _close_quietly(ser) -> None:
        """Close ``ser`` if it was opened, logging instead of raising."""
        if ser is None:
            return
        try:
            ser.close()
        except Exception as exc:
            logger.debug("SerialSensorAdapter close error: %s", exc)

    def _parse_line(self, line: str) -> SensorReading:
        """Wrap one text line in a SensorReading.

        ``data`` is the decoded JSON value, or ``{'raw_line': line}`` when
        the line is not valid JSON; ``raw`` is always the original line.
        """
        raw = line
        try:
            data = json.loads(line)
        except (json.JSONDecodeError, ValueError):
            data = {'raw_line': line}
        return SensorReading(
            sensor_id=self.sensor_id,
            sensor_type=self.sensor_type,
            data=data,
            raw=raw,
        )

261 

262 

class HTTPSensorAdapter(SensorAdapter):
    """Read sensors via HTTP endpoint (IP cameras, REST APIs).

    GETs ``url`` and parses the JSON response as sensor data.
    For binary payloads (e.g. JPEG from an IP camera), the raw bytes
    are stored in ``raw`` and a description dict in ``data``.
    """

    def __init__(
        self,
        sensor_id: str,
        sensor_type: str,
        url: str = '',
        poll_interval: float = 1.0,
        timeout: float = 5.0,
        headers: Optional[dict] = None,
    ):
        super().__init__(sensor_id, sensor_type)
        self._url = url
        self._poll_interval = poll_interval
        self._timeout = timeout
        self._headers = headers or {}
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._callback: Optional[Callable] = None

    def read(self) -> Optional[SensorReading]:
        """GET the endpoint once and convert the response to a reading.

        Uses ``core.http_pool.pooled_get`` when importable, otherwise
        ``requests.get``. Returns None when neither is available or the
        request/parse raises.
        """
        try:
            from core.http_pool import pooled_get
        except ImportError:
            try:
                import requests
                pooled_get = requests.get
            except ImportError:
                logger.debug("HTTPSensorAdapter: no HTTP library available")
                return None
        try:
            resp = pooled_get(self._url, headers=self._headers, timeout=self._timeout)
            return self._parse_response(resp)
        except Exception as exc:
            logger.debug("HTTPSensorAdapter read error on %s: %s", self._url, exc)
            return None

    def start_stream(self, callback: Callable[[SensorReading], None]) -> None:
        """Start a daemon thread that polls the endpoint for ``callback``.

        Calling this while a stream is already running is a no-op.
        """
        if self._running:
            return
        self._callback = callback
        self._running = True
        self._thread = threading.Thread(
            target=self._poll_loop,
            name=f'http_sensor_{self.sensor_id}',
            daemon=True,
        )
        self._thread.start()

    def stop_stream(self) -> None:
        """Ask the poll thread to exit; does not wait for it to finish."""
        self._running = False

    def _poll_loop(self) -> None:
        """Thread body: read, dispatch, sleep ``poll_interval``, repeat."""
        while self._running:
            try:
                reading = self.read()
                if reading and self._callback:
                    self._callback(reading)
            except Exception as exc:
                # A failing callback must not kill the poll thread: the
                # thread would die with _running still True, and every
                # later start_stream() would silently do nothing.
                logger.warning("HTTPSensorAdapter poll error on %s: %s",
                               self._url, exc)
            time.sleep(self._poll_interval)

    def _parse_response(self, resp) -> Optional[SensorReading]:
        """Build a reading from an HTTP response object.

        When the Content-Type mentions ``json`` the body is decoded as
        JSON (falling back to the first 4096 characters of text). Any
        other content type is treated as binary and capped at 10 MB.
        """
        content_type = resp.headers.get('Content-Type', '')
        if 'json' in content_type:
            try:
                data = resp.json()
            except (ValueError, AttributeError):
                data = {'raw_text': resp.text[:4096]}
            return SensorReading(
                sensor_id=self.sensor_id,
                sensor_type=self.sensor_type,
                data=data,
                raw=resp.text[:4096],
            )
        # Binary payload (e.g. JPEG from camera)
        raw_bytes = resp.content[:10_000_000]  # cap at 10 MB
        return SensorReading(
            sensor_id=self.sensor_id,
            sensor_type=self.sensor_type,
            data={
                'content_type': content_type,
                'size_bytes': len(raw_bytes),
            },
            raw=raw_bytes,
        )

352 

353 

class MQTTSensorAdapter(SensorAdapter):
    """Read sensors via MQTT topics (IoT devices).

    Subscribes to ``topic`` and fires readings on every message.
    Requires paho-mqtt (optional dependency, imported lazily).
    """

    def __init__(
        self,
        sensor_id: str,
        sensor_type: str,
        broker: str = 'localhost',
        port: int = 1883,
        topic: str = '',
        username: str = '',
        password: str = '',
    ):
        super().__init__(sensor_id, sensor_type)
        self._broker = broker
        self._port = port
        self._topic = topic
        self._username = username
        self._password = password
        self._client = None
        self._callback: Optional[Callable] = None
        self._running = False

    def read(self) -> Optional[SensorReading]:
        """Connect, wait up to 5 seconds for one message, and disconnect.

        MQTT is event-driven, so a single read is emulated with a
        short-lived client. Returns None when paho-mqtt is missing, no
        message arrives in time, or an error occurs.
        """
        try:
            import paho.mqtt.client as mqtt
        except ImportError:
            logger.debug("MQTTSensorAdapter: paho-mqtt not installed")
            return None

        result_holder: List[Optional[SensorReading]] = [None]
        event = threading.Event()

        def on_message(_client, _userdata, msg):
            result_holder[0] = self._parse_payload(msg.payload)
            event.set()

        client = None
        try:
            client = mqtt.Client()
            if self._username:
                client.username_pw_set(self._username, self._password)
            client.on_message = on_message
            client.connect(self._broker, self._port, keepalive=10)
            client.subscribe(self._topic)
            client.loop_start()
            event.wait(timeout=5.0)
        except Exception as exc:
            logger.debug("MQTTSensorAdapter read error: %s", exc)
        finally:
            # Tear the client down on error paths too, so a failure after
            # connect() does not leave a connection or loop thread behind.
            if client is not None:
                self._shutdown_client(client)
        return result_holder[0]

    def start_stream(self, callback: Callable[[SensorReading], None]) -> None:
        """Subscribe to the topic and deliver each message to ``callback``.

        A no-op when already streaming or when paho-mqtt is missing. If
        client setup or connection fails, a warning is logged and the
        adapter is left in the stopped state so the call can be retried.
        """
        if self._running:
            return
        try:
            import paho.mqtt.client as mqtt
        except ImportError:
            logger.warning("MQTTSensorAdapter: paho-mqtt not installed, stream aborted")
            return

        self._callback = callback
        self._running = True

        def on_message(_client, _userdata, msg):
            reading = self._parse_payload(msg.payload)
            if reading and self._callback:
                self._callback(reading)

        try:
            # Client construction is inside the try so a failure here
            # cannot leave _running stuck at True.
            self._client = mqtt.Client()
            if self._username:
                self._client.username_pw_set(self._username, self._password)
            self._client.on_message = on_message
            self._client.connect(self._broker, self._port, keepalive=60)
            self._client.subscribe(self._topic)
            self._client.loop_start()
        except Exception as exc:
            logger.warning("MQTTSensorAdapter connect error: %s", exc)
            self._running = False
            failed_client, self._client = self._client, None
            if failed_client is not None:
                self._shutdown_client(failed_client)

    def stop_stream(self) -> None:
        """Stop the network loop and disconnect the streaming client."""
        self._running = False
        client, self._client = self._client, None
        if client:
            self._shutdown_client(client)

    @staticmethod
    def _shutdown_client(client) -> None:
        """Stop the loop and disconnect, logging rather than raising.

        Each step is attempted independently so a failure in ``loop_stop``
        does not prevent ``disconnect``.
        """
        for step in (client.loop_stop, client.disconnect):
            try:
                step()
            except Exception as exc:
                logger.debug("MQTTSensorAdapter shutdown error: %s", exc)

    def _parse_payload(self, payload: bytes) -> Optional[SensorReading]:
        """Wrap a message payload in a SensorReading.

        ``data`` is the decoded JSON value, or ``{'raw_bytes': <length>}``
        when the payload is not valid JSON; ``raw`` is the original bytes.
        """
        raw = payload
        try:
            text = payload.decode('utf-8', errors='ignore')
            data = json.loads(text)
        except (json.JSONDecodeError, ValueError, UnicodeDecodeError):
            data = {'raw_bytes': len(payload)}
        return SensorReading(
            sensor_id=self.sensor_id,
            sensor_type=self.sensor_type,
            data=data,
            raw=raw,
        )

464 

465 

class WebSocketSensorAdapter(SensorAdapter):
    """Read sensors via WebSocket stream.

    Connects to a WebSocket endpoint and receives JSON messages as
    sensor readings. Useful for high-frequency data (IMU streams,
    depth cameras, real-time telemetry).

    Requires the ``websocket-client`` package (optional dependency,
    imported lazily).
    """

    def __init__(
        self,
        sensor_id: str,
        sensor_type: str,
        url: str = '',
        headers: Optional[dict] = None,
        reconnect_delay: float = 2.0,
    ):
        super().__init__(sensor_id, sensor_type)
        self._url = url
        self._headers = headers or {}
        self._reconnect_delay = reconnect_delay
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._callback: Optional[Callable] = None
        self._ws = None
        self._last_reading: Optional[SensorReading] = None
        self._lock = threading.Lock()

    def read(self) -> Optional[SensorReading]:
        """Return the most recent reading received via the WebSocket.

        If nothing has been cached yet, attempts a one-shot connect,
        reads a single frame, and disconnects. Returns None when
        websocket-client is missing or the one-shot read fails.
        """
        # NOTE(review): the cached reading is returned even after the
        # stream has stopped, so it may be stale -- confirm this is intended.
        with self._lock:
            if self._last_reading is not None:
                return self._last_reading

        # One-shot read
        try:
            import websocket as ws_lib  # websocket-client
        except ImportError:
            logger.debug("WebSocketSensorAdapter: websocket-client not installed")
            return None

        ws = None
        try:
            ws = ws_lib.create_connection(
                self._url,
                header=self._headers,
                timeout=5.0,
            )
            return self._parse_frame(ws.recv())
        except Exception as exc:
            logger.debug("WebSocketSensorAdapter read error on %s: %s",
                         self._url, exc)
            return None
        finally:
            # Close even when recv() raised (e.g. on timeout) so the
            # socket is not leaked.
            self._close_quietly(ws)

    def start_stream(self, callback: Callable[[SensorReading], None]) -> None:
        """Start a daemon thread that feeds each frame to ``callback``.

        Calling this while a stream is already running is a no-op.
        """
        if self._running:
            return
        self._callback = callback
        self._running = True
        self._thread = threading.Thread(
            target=self._stream_loop,
            name=f'ws_sensor_{self.sensor_id}',
            daemon=True,
        )
        self._thread.start()

    def stop_stream(self) -> None:
        """Stop streaming and close the active connection, if any."""
        self._running = False
        ws = self._ws
        if ws is not None:
            self._close_quietly(ws)
            self._ws = None

    def _stream_loop(self) -> None:
        """Thread body: connect, dispatch frames, reconnect after errors.

        An empty frame or any exception ends the current connection; the
        loop then waits ``reconnect_delay`` seconds before reconnecting
        for as long as the stream is still running.
        """
        try:
            import websocket as ws_lib
        except ImportError:
            logger.warning("WebSocketSensorAdapter: websocket-client not installed, "
                           "stream aborted")
            return

        while self._running:
            ws = None
            try:
                ws = ws_lib.create_connection(
                    self._url,
                    header=self._headers,
                    timeout=10.0,
                )
                self._ws = ws
                while self._running:
                    frame = ws.recv()
                    if not frame:
                        break
                    reading = self._parse_frame(frame)
                    if reading:
                        with self._lock:
                            self._last_reading = reading
                        if self._callback:
                            self._callback(reading)
            except Exception as exc:
                logger.debug("WebSocketSensorAdapter stream error: %s", exc)
            finally:
                self._ws = None
                # Close on the error path as well; otherwise every recv()
                # failure or timeout leaks one socket per reconnect cycle.
                self._close_quietly(ws)

            if self._running:
                time.sleep(self._reconnect_delay)

    @staticmethod
    def _close_quietly(ws) -> None:
        """Close ``ws`` if it was opened, ignoring errors from close()."""
        if ws is None:
            return
        try:
            ws.close()
        except Exception:
            pass

    def _parse_frame(self, frame) -> Optional[SensorReading]:
        """Wrap one WebSocket frame (bytes or text) in a SensorReading.

        ``data`` is the decoded JSON value when the frame parses; otherwise
        ``{'raw_bytes': <length>}`` for bytes frames or ``{'raw_text': ...}``
        (first 4096 characters) for text frames. ``raw`` is the frame itself.
        """
        if isinstance(frame, bytes):
            raw = frame
            try:
                text = frame.decode('utf-8', errors='ignore')
                data = json.loads(text)
            except (json.JSONDecodeError, ValueError, UnicodeDecodeError):
                data = {'raw_bytes': len(frame)}
        else:
            raw = frame
            try:
                data = json.loads(frame)
            except (json.JSONDecodeError, ValueError):
                data = {'raw_text': str(frame)[:4096]}

        return SensorReading(
            sensor_id=self.sensor_id,
            sensor_type=self.sensor_type,
            data=data,
            raw=raw,
        )

605 

606 

607# ====================================================================== 

608# Built-in actuator adapters 

609# ====================================================================== 

610 

611 

class SerialActuatorAdapter(ActuatorAdapter):
    """Send commands via USB serial to motor controllers, servo boards, etc.

    Writes JSON-encoded commands to the serial port. The device is
    expected to respond with a JSON status line.

    Each operation opens the port, performs one exchange, and closes it
    again; a lock serializes access so exchanges never interleave.
    """

    def __init__(
        self,
        actuator_id: str,
        actuator_type: str,
        port: str = '',
        baudrate: int = 115200,
        timeout: float = 1.0,
    ):
        super().__init__(actuator_id, actuator_type)
        self._port = port
        self._baudrate = baudrate
        self._timeout = timeout
        self._lock = threading.Lock()

    def execute(self, command: ActuatorCommand) -> dict:
        """Send ``command`` to the device and return its reply.

        Returns ``{'ok': False, 'error': ...}`` when the command is not
        safety cleared, pyserial is missing, or the exchange raises.
        Otherwise returns ``{'ok': True, 'response': ...}`` where the
        response is the decoded JSON reply, the raw reply line when it is
        not JSON, or None when the device sent nothing before the timeout.
        """
        if not command.safety_cleared:
            return {'ok': False, 'error': 'command not safety cleared'}
        try:
            import serial as pyserial
        except ImportError:
            return {'ok': False, 'error': 'pyserial not installed'}
        try:
            payload = (json.dumps(command.command) + '\n').encode('utf-8')
            response_line = self._exchange(pyserial, payload)
            if not response_line:
                return {'ok': True, 'response': None}
            try:
                return {'ok': True, 'response': json.loads(response_line)}
            except (json.JSONDecodeError, ValueError):
                return {'ok': True, 'response': response_line}
        except Exception as exc:
            return {'ok': False, 'error': str(exc)}

    def get_state(self) -> dict:
        """Query the device with a ``get_state`` action.

        Returns the decoded JSON reply, ``{'raw': line}`` for a non-JSON
        reply, ``{}`` when there is no reply, or ``{'error': ...}`` on
        failure.
        """
        try:
            import serial as pyserial
        except ImportError:
            return {'error': 'pyserial not installed'}
        try:
            line = self._exchange(pyserial, b'{"action":"get_state"}\n')
            if not line:
                return {}
            try:
                return json.loads(line)
            except (json.JSONDecodeError, ValueError):
                return {'raw': line}
        except Exception as exc:
            return {'error': str(exc)}

    def emergency_stop(self) -> None:
        """Write an ``emergency_stop`` action to the device.

        Failures are logged at error level, not raised. Does nothing when
        pyserial is not installed.
        """
        try:
            import serial as pyserial
        except ImportError:
            return
        try:
            self._exchange(pyserial, b'{"action":"emergency_stop"}\n',
                           expect_reply=False)
        except Exception as exc:
            logger.error("SerialActuatorAdapter E-stop write failed: %s", exc)

    def _exchange(self, pyserial, payload: bytes, expect_reply: bool = True) -> str:
        """Open the port, write ``payload``, optionally read one reply line.

        Returns the stripped reply text ('' when no reply is expected or
        none arrived). The port is closed in ``finally`` so that a failed
        write or read cannot leave it open and block later commands.
        """
        with self._lock:
            ser = pyserial.Serial(self._port, self._baudrate, timeout=self._timeout)
            try:
                ser.write(payload)
                if not expect_reply:
                    return ''
                return ser.readline().decode('utf-8', errors='ignore').strip()
            finally:
                ser.close()

688 

689 

class HTTPActuatorAdapter(ActuatorAdapter):
    """Drive an actuator through an HTTP API (robot APIs, smart home devices).

    Commands are sent to ``url`` as a JSON POST body; state is fetched
    with a GET on the same ``url``.
    """

    def __init__(
        self,
        actuator_id: str,
        actuator_type: str,
        url: str = '',
        timeout: float = 5.0,
        headers: Optional[dict] = None,
    ):
        super().__init__(actuator_id, actuator_type)
        self._url, self._timeout = url, timeout
        self._headers = headers or {}

    def execute(self, command: ActuatorCommand) -> dict:
        """POST the command dict and summarize the reply.

        ``ok`` is True when the response status code is below 400.
        ``response`` is the JSON body, or ``{'status_code': ...}`` when
        the body is not JSON. Any exception yields ``{'ok': False, 'error': ...}``.
        """
        if not command.safety_cleared:
            return {'ok': False, 'error': 'command not safety cleared'}
        sender = self._get_post_fn()
        if sender is None:
            return {'ok': False, 'error': 'no HTTP library available'}
        try:
            reply = sender(
                self._url,
                json=command.command,
                headers=self._headers,
                timeout=self._timeout,
            )
            try:
                payload = reply.json()
            except (ValueError, AttributeError):
                payload = {'status_code': getattr(reply, 'status_code', None)}
            succeeded = getattr(reply, 'status_code', 500) < 400
            return {'ok': succeeded, 'response': payload}
        except Exception as exc:
            return {'ok': False, 'error': str(exc)}

    def get_state(self) -> dict:
        """GET ``url`` and return its JSON body.

        Falls back to ``{'raw': <first 4096 chars of text>}`` for a
        non-JSON body and to ``{'error': ...}`` on any failure.
        """
        fetcher = self._get_get_fn()
        if fetcher is None:
            return {'error': 'no HTTP library available'}
        try:
            reply = fetcher(
                self._url,
                headers=self._headers,
                timeout=self._timeout,
            )
            try:
                return reply.json()
            except (ValueError, AttributeError):
                return {'raw': getattr(reply, 'text', '')[:4096]}
        except Exception as exc:
            return {'error': str(exc)}

    def emergency_stop(self) -> None:
        """POST an ``emergency_stop`` action; failures are logged, not raised."""
        sender = self._get_post_fn()
        if sender is None:
            return
        stop_body = {'action': 'emergency_stop'}
        try:
            sender(
                self._url,
                json=stop_body,
                headers=self._headers,
                timeout=self._timeout,
            )
        except Exception as exc:
            logger.error("HTTPActuatorAdapter E-stop failed: %s", exc)

    @staticmethod
    def _get_post_fn():
        """Pick a POST callable: the pooled helper, else requests, else None."""
        try:
            from core.http_pool import pooled_post
        except ImportError:
            pass
        else:
            return pooled_post
        try:
            import requests
        except ImportError:
            return None
        return requests.post

    @staticmethod
    def _get_get_fn():
        """Pick a GET callable: the pooled helper, else requests, else None."""
        try:
            from core.http_pool import pooled_get
        except ImportError:
            pass
        else:
            return pooled_get
        try:
            import requests
        except ImportError:
            return None
        return requests.get

787 

788 

class MQTTActuatorAdapter(ActuatorAdapter):
    """Send commands via MQTT publish (IoT actuators, smart home devices).

    Publishes JSON-encoded commands to the configured ``topic``.
    Optionally subscribes to a ``response_topic`` for acknowledgment.

    Requires paho-mqtt (optional dependency, imported lazily). Every
    operation uses its own short-lived client; a lock serializes them.
    """

    def __init__(
        self,
        actuator_id: str,
        actuator_type: str,
        broker: str = 'localhost',
        port: int = 1883,
        topic: str = '',
        response_topic: str = '',
        username: str = '',
        password: str = '',
        qos: int = 1,
    ):
        super().__init__(actuator_id, actuator_type)
        self._broker = broker
        self._port = port
        self._topic = topic
        self._response_topic = response_topic
        self._username = username
        self._password = password
        self._qos = qos
        self._lock = threading.Lock()

    def execute(self, command: ActuatorCommand) -> dict:
        """Publish ``command`` and, if configured, wait for an ack.

        Returns ``{'ok': False, 'error': ...}`` when the command is not
        safety cleared, paho-mqtt is missing, or any step raises.
        Otherwise returns ``{'ok': True, 'response': ack, 'topic': topic}``
        where ``ack`` is the decoded message from ``response_topic`` or
        None when no response topic is set or no ack arrived within 5
        seconds.
        """
        if not command.safety_cleared:
            return {'ok': False, 'error': 'command not safety cleared'}

        try:
            import paho.mqtt.client as mqtt
        except ImportError:
            return {'ok': False, 'error': 'paho-mqtt not installed'}

        with self._lock:
            client = None
            try:
                # Serialize before touching the network so an
                # unserializable command fails without opening a
                # connection or starting a loop thread.
                payload = json.dumps(command.command).encode('utf-8')

                client = mqtt.Client()
                if self._username:
                    client.username_pw_set(self._username, self._password)

                # If response topic is set, subscribe for ack
                ack_holder: List[Optional[dict]] = [None]
                ack_event = threading.Event()

                if self._response_topic:
                    def on_message(_c, _u, msg):
                        text = msg.payload.decode('utf-8', errors='ignore')
                        try:
                            ack_holder[0] = json.loads(text)
                        except (json.JSONDecodeError, ValueError):
                            ack_holder[0] = {'raw': text}
                        ack_event.set()

                    client.on_message = on_message

                client.connect(self._broker, self._port, keepalive=10)

                if self._response_topic:
                    client.subscribe(self._response_topic)

                client.loop_start()
                client.publish(self._topic, payload, qos=self._qos)

                if self._response_topic:
                    ack_event.wait(timeout=5.0)

                return {
                    'ok': True,
                    'response': ack_holder[0],
                    'topic': self._topic,
                }
            except Exception as exc:
                return {'ok': False, 'error': str(exc)}
            finally:
                # Runs on success and failure alike so an error after
                # connect()/loop_start() cannot leak the client.
                if client is not None:
                    self._shutdown_client(client, stop_loop=True)

    def get_state(self) -> dict:
        """Return connection metadata; no state query is sent to the device."""
        # MQTT actuators typically don't support state queries.
        # Return connection metadata instead.
        return {
            'broker': self._broker,
            'topic': self._topic,
            'response_topic': self._response_topic or None,
        }

    def emergency_stop(self) -> None:
        """Publish an ``emergency_stop`` action with QoS 2.

        Failures are logged at error level, not raised. Does nothing when
        paho-mqtt is not installed.
        """
        try:
            import paho.mqtt.client as mqtt
        except ImportError:
            return

        with self._lock:
            client = None
            try:
                client = mqtt.Client()
                if self._username:
                    client.username_pw_set(self._username, self._password)
                client.connect(self._broker, self._port, keepalive=10)
                payload = json.dumps({'action': 'emergency_stop'})
                client.publish(self._topic, payload.encode('utf-8'), qos=2)
            except Exception as exc:
                logger.error("MQTTActuatorAdapter E-stop failed: %s", exc)
            finally:
                if client is not None:
                    # No loop was started here, so only disconnect.
                    self._shutdown_client(client, stop_loop=False)

    @staticmethod
    def _shutdown_client(client, stop_loop: bool) -> None:
        """Release a short-lived client, logging rather than raising."""
        steps = (client.loop_stop, client.disconnect) if stop_loop else (client.disconnect,)
        for step in steps:
            try:
                step()
            except Exception as exc:
                logger.debug("MQTTActuatorAdapter shutdown error: %s", exc)

902 

903 

904# ====================================================================== 

905# Safety Monitor (inline -- used when external safety_monitor is absent) 

906# ====================================================================== 

907 

908 

class SafetyMonitor:
    """Validates every actuator command before execution.

    Enforces:
      - Maximum velocity limits
      - Maximum force limits
      - Workspace bounds (3D bounding box)
      - Emergency stop detection and propagation

    This is a lightweight inline monitor. The full-featured
    ``integrations.robotics.safety_monitor.SafetyMonitor`` takes
    precedence when available (HardwareBridge._safety_gate checks
    for it first).

    E-stop state is guarded by an internal lock.
    """

    DEFAULT_MAX_VELOCITY: float = 2.0  # m/s
    DEFAULT_MAX_FORCE: float = 50.0  # Newtons
    DEFAULT_WORKSPACE_BOUNDS: Dict[str, Tuple[float, float]] = {
        'x': (-5.0, 5.0),
        'y': (-5.0, 5.0),
        'z': (0.0, 3.0),
    }

    def __init__(
        self,
        max_velocity: float = DEFAULT_MAX_VELOCITY,
        max_force: float = DEFAULT_MAX_FORCE,
        workspace_bounds: Optional[Dict[str, Tuple[float, float]]] = None,
    ):
        self._lock = threading.Lock()
        self.max_velocity = max_velocity
        self.max_force = max_force
        # Copy the defaults so per-instance edits never touch the class dict.
        self.workspace_bounds = workspace_bounds or dict(self.DEFAULT_WORKSPACE_BOUNDS)
        self._estop_active = False
        self._estop_reason = ''

    @property
    def is_estopped(self) -> bool:
        """Return True if emergency stop is active."""
        with self._lock:
            return self._estop_active

    def trigger_estop(self, reason: str = 'manual') -> None:
        """Activate emergency stop and record ``reason``."""
        with self._lock:
            self._estop_active = True
            self._estop_reason = reason
        logger.warning("SafetyMonitor: E-STOP triggered -- %s", reason)

    def clear_estop(self) -> None:
        """Clear emergency stop. Only human operators should call this."""
        with self._lock:
            self._estop_active = False
            self._estop_reason = ''
        logger.info("SafetyMonitor: E-STOP cleared")

    def check_command(self, command: "ActuatorCommand") -> Tuple[bool, str]:
        """Validate a command against all safety constraints.

        Checks, in order: active E-stop, ``speed`` and ``velocity``
        against ``max_velocity``, ``force`` and ``torque`` against
        ``max_force``, then workspace bounds. Every key that is present
        is checked, so one alias cannot hide an over-limit value under
        the other. Values that cannot be converted to float are skipped;
        NaN is rejected.

        Returns (safe, reason). If safe is False, reason explains why.
        """
        # Read flag and reason under one lock acquisition so they match.
        with self._lock:
            if self._estop_active:
                return False, f'E-STOP active: {self._estop_reason}'

        params = command.command.get('params', {})
        if params is None:
            params = {}
        elif not isinstance(params, dict):
            # Named limits cannot be checked on a non-mapping; refuse it.
            return False, 'malformed params (expected a dict)'

        # Velocity check
        for key in ('speed', 'velocity'):
            if self._exceeds_limit(params.get(key), self.max_velocity):
                return False, (
                    f'velocity {params[key]} exceeds limit {self.max_velocity} m/s')

        # Force check
        for key in ('force', 'torque'):
            if self._exceeds_limit(params.get(key), self.max_force):
                return False, (
                    f'force {params[key]} exceeds limit {self.max_force} N')

        # Workspace bounds check
        if not self.check_position_safe(params):
            return False, 'position outside workspace bounds'

        return True, ''

    @staticmethod
    def _exceeds_limit(value, limit: float) -> bool:
        """Return True when ``value`` is numeric and not within ``limit``.

        None and non-numeric values return False (nothing to compare).
        NaN returns True because ``abs(nan) <= limit`` is False.
        """
        if value is None:
            return False
        try:
            magnitude = abs(float(value))
        except (TypeError, ValueError):
            return False
        return not (magnitude <= limit)

    def check_position_safe(self, params: dict) -> bool:
        """Check whether position parameters fall within workspace bounds.

        Every key of ``workspace_bounds`` that is present in ``params`` is
        compared against its (low, high) range, so Cartesian (x, y, z) and
        joint (joint_N) keys are both supported when configured.
        Non-numeric values are skipped; NaN is treated as out of bounds.
        """
        for axis, (lo, hi) in self.workspace_bounds.items():
            val = params.get(axis)
            if val is None:
                continue
            try:
                position = float(val)
            except (TypeError, ValueError):
                continue
            # Written as a positive range test so NaN fails it.
            if not (lo <= position <= hi):
                return False
        return True

    def gate_commands(
        self, commands: List["ActuatorCommand"],
    ) -> List["ActuatorCommand"]:
        """Filter a batch of commands. Sets safety_cleared on passed ones.

        Blocked commands are logged with the reason and omitted.
        Returns only commands that passed.
        """
        cleared: List["ActuatorCommand"] = []
        for cmd in commands:
            safe, reason = self.check_command(cmd)
            if safe:
                cmd.safety_cleared = True
                cleared.append(cmd)
            else:
                logger.warning(
                    "SafetyMonitor blocked command for %s: %s",
                    cmd.actuator_id, reason,
                )
        return cleared

1037 

1038 

# Module-level inline safety monitor singleton
_inline_safety: Optional[SafetyMonitor] = None
_inline_safety_lock = threading.Lock()


def get_inline_safety_monitor() -> SafetyMonitor:
    """Return the shared inline SafetyMonitor, creating it on first use.

    The instance is stored in the module-level ``_inline_safety`` and
    creation is serialized by ``_inline_safety_lock``.
    """
    global _inline_safety
    existing = _inline_safety
    if existing is not None:
        return existing
    with _inline_safety_lock:
        # Re-check under the lock: another thread may have created the
        # instance between the unlocked read above and acquiring the lock.
        if _inline_safety is None:
            _inline_safety = SafetyMonitor()
        return _inline_safety

1052 

1053 

1054# ====================================================================== 

1055# Hardware Bridge 

1056# ====================================================================== 

1057 

# Experience buffer limits
_EXPERIENCE_MAX_SIZE = 10_000  # maxlen of the per-bridge experience deque
_EXPERIENCE_AUTO_FLUSH = 100   # buffer length that triggers an automatic flush


class HardwareBridge:
    """Close the loop between physical robots and the hive.

    Owns three responsibilities:

    1. **Inbound** -- collects sensor readings from registered adapters,
       normalizes them, and pushes them to SensorStore + EventBus.
    2. **Outbound** -- translates action plans into actuator commands,
       gates them through SafetyMonitor, and executes them.
    3. **Learning** -- records sensor->action->outcome triples as
       Experiences and flushes them to WorldModelBridge so the hive
       learns from every robot.

    Shared state (adapter registries, latest readings, experience buffer,
    stats) is guarded by ``self._lock``.  Adapter code (streams, execute,
    emergency stop) is invoked without holding that lock.
    """

    # Parameter names forwarded to an external monitor's
    # ``check_position_safe`` when it offers no ``check_command``.
    _POSITION_KEYS = frozenset({
        'x', 'y', 'z', 'joint_0', 'joint_1',
        'joint_2', 'joint_3', 'joint_4', 'joint_5',
    })
    # Speed ceiling used for the sanity check on that same fallback path.
    _EXTERNAL_MAX_SPEED = 10.0

    def __init__(self, robot_id: str):
        """Create an idle bridge for ``robot_id`` with empty registries."""
        self._robot_id = robot_id
        self._sensor_adapters: Dict[str, SensorAdapter] = {}
        self._actuator_adapters: Dict[str, ActuatorAdapter] = {}
        # Bounded buffer: once full, the oldest experiences are discarded.
        self._experience_buffer: deque = deque(maxlen=_EXPERIENCE_MAX_SIZE)
        self._running = False
        self._sensor_thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()
        self._latest_readings: Dict[str, SensorReading] = {}
        self._stats = {
            'readings_received': 0,
            'actions_executed': 0,
            'experiences_recorded': 0,
            'experiences_flushed': 0,
        }

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    def register_sensor(self, adapter: SensorAdapter) -> None:
        """Register a sensor input adapter.

        The adapter's ``sensor_id`` is used as the key.  Registering a
        second adapter with the same ID replaces the first (the old
        stream is stopped).  If the bridge is already running, the new
        adapter's stream is started immediately.
        """
        with self._lock:
            old = self._sensor_adapters.get(adapter.sensor_id)
            self._sensor_adapters[adapter.sensor_id] = adapter
            running = self._running

        # stop_stream() runs adapter code; call it without holding the
        # bridge lock so a sensor callback that needs the lock is never
        # blocked behind it.
        if old is not None:
            try:
                old.stop_stream()
            except Exception as exc:
                logger.debug(
                    "[HardwareBridge:%s] error stopping replaced sensor %s: %s",
                    self._robot_id, adapter.sensor_id, exc,
                )

        logger.info(
            "[HardwareBridge:%s] sensor registered: %s (%s)",
            self._robot_id, adapter.sensor_id, adapter.sensor_type,
        )
        # If already running, start the new adapter's stream immediately
        if running:
            self._start_adapter_stream(adapter)

    def register_actuator(self, adapter: ActuatorAdapter) -> None:
        """Register an actuator output adapter.

        The adapter's ``actuator_id`` is used as the key.  An adapter
        previously registered under the same ID is emergency-stopped
        and replaced.
        """
        with self._lock:
            old = self._actuator_adapters.get(adapter.actuator_id)
            self._actuator_adapters[adapter.actuator_id] = adapter

        if old is not None:
            try:
                old.emergency_stop()
            except Exception as exc:
                # A failed emergency stop is worth surfacing, but must
                # not prevent the replacement from being registered.
                logger.warning(
                    "[HardwareBridge:%s] emergency stop of replaced "
                    "actuator %s failed: %s",
                    self._robot_id, adapter.actuator_id, exc,
                )

        logger.info(
            "[HardwareBridge:%s] actuator registered: %s (%s)",
            self._robot_id, adapter.actuator_id, adapter.actuator_type,
        )

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def start(self) -> None:
        """Start all sensor streams and begin experience collection.

        Calling ``start`` on a bridge that is already running is a no-op.
        """
        with self._lock:
            if self._running:
                return
            self._running = True
            adapters = list(self._sensor_adapters.values())

        logger.info("[HardwareBridge:%s] starting", self._robot_id)

        for adapter in adapters:
            self._start_adapter_stream(adapter)

    def stop(self) -> None:
        """Stop all sensor streams, emergency-stop actuators, flush experiences.

        Calling ``stop`` on a bridge that is not running is a no-op.
        Errors from individual adapters are logged and do not prevent
        the remaining adapters from being stopped.
        """
        with self._lock:
            if not self._running:
                return
            self._running = False
            sensor_adapters = list(self._sensor_adapters.values())
            actuator_adapters = list(self._actuator_adapters.values())

        logger.info("[HardwareBridge:%s] stopping", self._robot_id)

        # Stop all sensor streams
        for adapter in sensor_adapters:
            try:
                adapter.stop_stream()
            except Exception as exc:
                logger.debug("Error stopping sensor %s: %s", adapter.sensor_id, exc)

        # Emergency-stop all actuators
        for adapter in actuator_adapters:
            try:
                adapter.emergency_stop()
            except Exception as exc:
                logger.warning("Error stopping actuator %s: %s", adapter.actuator_id, exc)

        # Flush remaining experiences
        self._flush_experiences()

    # ------------------------------------------------------------------
    # Sensor snapshot
    # ------------------------------------------------------------------

    def get_sensor_snapshot(self) -> dict:
        """Return the latest reading from every sensor that has reported.

        Returns a dict keyed by sensor_id; each value is a dict with
        ``sensor_id``, ``sensor_type``, ``data`` and ``timestamp``.
        """
        with self._lock:
            return {
                sid: {
                    'sensor_id': reading.sensor_id,
                    'sensor_type': reading.sensor_type,
                    'data': reading.data,
                    'timestamp': reading.timestamp,
                }
                for sid, reading in self._latest_readings.items()
            }

    # ------------------------------------------------------------------
    # Action execution
    # ------------------------------------------------------------------

    def execute_action(self, plan: dict) -> dict:
        """Execute an action plan from the intelligence layer.

        The plan is a dict with ``commands`` -- a list of dicts each
        having at least ``actuator_id`` and ``command``.  Example::

            {
                "commands": [
                    {"actuator_id": "motor_left", "command": {"action": "move", "params": {"speed": 0.5}}},
                    {"actuator_id": "motor_right", "command": {"action": "move", "params": {"speed": 0.5}}},
                ]
            }

        Every command passes through the safety gate before reaching
        the actuator.  Malformed entries and entries naming an unknown
        actuator are logged and skipped.  The sensor snapshot taken
        before execution and the one taken afterwards are recorded,
        together with the plan and the results, as one experience.

        Returns a result dict ``{'ok': bool, 'results': {...}}`` with
        per-actuator outcomes (plus ``'error'`` on early rejection).
        ``ok`` is True when at least one command succeeded.
        """
        # Resource governor check
        if not self._resource_ok():
            return {'ok': False, 'error': 'resource governor denied', 'results': {}}

        raw_commands = plan.get('commands') if isinstance(plan, dict) else None
        if not raw_commands or not isinstance(raw_commands, (list, tuple)):
            return {'ok': False, 'error': 'no commands in plan', 'results': {}}

        # Build ActuatorCommand objects
        commands: List[ActuatorCommand] = []
        for entry in raw_commands:
            if not isinstance(entry, dict):
                logger.warning(
                    "[HardwareBridge:%s] skipping malformed command entry: %r",
                    self._robot_id, entry,
                )
                continue
            aid = entry.get('actuator_id', '')
            try:
                with self._lock:
                    adapter = self._actuator_adapters.get(aid)
            except TypeError:
                # Unhashable actuator_id (e.g. a list) can never match.
                adapter = None
            if adapter is None:
                logger.warning(
                    "[HardwareBridge:%s] unknown actuator_id: %s",
                    self._robot_id, aid,
                )
                continue
            commands.append(ActuatorCommand(
                actuator_id=aid,
                actuator_type=adapter.actuator_type,
                command=entry.get('command', {}),
            ))

        if not commands:
            return {'ok': False, 'error': 'no valid actuator targets', 'results': {}}

        # Capture the pre-action state BEFORE anything is executed so the
        # experience really carries a before/after pair.
        sensor_before = self.get_sensor_snapshot()

        # Safety gate -- mandatory
        cleared = self._safety_gate(commands)

        # Execute cleared commands
        results: Dict[str, dict] = {}
        any_ok = False
        for cmd in cleared:
            with self._lock:
                adapter = self._actuator_adapters.get(cmd.actuator_id)
            if adapter is None:
                results[cmd.actuator_id] = {'ok': False, 'error': 'adapter gone'}
                continue
            try:
                result = adapter.execute(cmd)
                results[cmd.actuator_id] = result
                if result.get('ok'):
                    any_ok = True
            except Exception as exc:
                results[cmd.actuator_id] = {'ok': False, 'error': str(exc)}

        with self._lock:
            self._stats['actions_executed'] += 1

        # Emit event
        self._emit('robot.action_executed', {
            'robot_id': self._robot_id,
            'plan': plan,
            'results': results,
        })

        # Record experience.  The "after" snapshot is taken immediately
        # (no waiting); readings that arrive later show up in the next
        # cycle's "before" snapshot.
        outcome = {
            'results': results,
            'any_ok': any_ok,
            'sensor_after': self.get_sensor_snapshot(),
        }
        self._record_experience(
            sensor_state=sensor_before,
            action=plan,
            outcome=outcome,
        )

        return {'ok': any_ok, 'results': results}

    # ------------------------------------------------------------------
    # Think-and-act: full soft+hard cycle
    # ------------------------------------------------------------------

    def think_and_act(self, context: str = '') -> dict:
        """Full loop: sense -> think -> act -> learn.

        1. Read sensor snapshot
        2. Call the intelligence API (intelligence_api.think())
        3. Execute the resulting action plan
        4. Record the experience

        When the thought carries no usable action plan, a ``noop``
        experience is recorded instead so the observation is not lost.

        Args:
            context: Optional textual context for the intelligence layer
                (e.g. 'navigate to charging station').

        Returns:
            Dict with ``robot_id``, ``sensors``, ``thought``,
            ``action_result``, ``experience_count``.
        """
        # 1. Sense
        sensor_snapshot = self.get_sensor_snapshot()

        # 2. Think -- call intelligence API
        thought = self._call_intelligence(sensor_snapshot, context)

        # 3. Act -- execute the action plan from the intelligence layer.
        # Tolerate a thought (or plan) that is not a dict.
        action_plan = thought.get('action_plan') if isinstance(thought, dict) else None
        action_result = {}
        if isinstance(action_plan, dict) and action_plan.get('commands'):
            action_result = self.execute_action(action_plan)
        else:
            # No action plan, but still record the observation experience
            self._record_experience(
                sensor_state=sensor_snapshot,
                action={'noop': True, 'context': context},
                outcome={'thought': thought, 'action_result': None},
            )

        with self._lock:
            experience_count = len(self._experience_buffer)

        return {
            'robot_id': self._robot_id,
            'sensors': sensor_snapshot,
            'thought': thought,
            'action_result': action_result,
            'experience_count': experience_count,
        }

    # ------------------------------------------------------------------
    # Safety gate
    # ------------------------------------------------------------------

    def _safety_gate(self, commands: List[ActuatorCommand]) -> List[ActuatorCommand]:
        """Filter commands through SafetyMonitor.

        Every command must pass the safety check.  Commands that fail
        are logged and dropped -- they never reach an actuator.

        Prefers the full external SafetyMonitor from safety_monitor.py.
        Falls back to the inline SafetyMonitor defined in this module
        when the external one is not importable.  Safety is
        NON-NEGOTIABLE -- every command is gated.

        Returns the list of commands with ``safety_cleared=True``.
        """
        # Try the full-featured external monitor first
        monitor = None
        try:
            from integrations.robotics.safety_monitor import get_safety_monitor
            monitor = get_safety_monitor()
        except ImportError:
            pass

        # Fall back to inline SafetyMonitor
        if monitor is None:
            monitor = get_inline_safety_monitor()
            logger.debug(
                "[HardwareBridge:%s] using inline SafetyMonitor "
                "(external safety_monitor not available)", self._robot_id,
            )

        # Global E-stop check
        if monitor.is_estopped:
            logger.warning(
                "[HardwareBridge:%s] E-stop active -- all commands blocked",
                self._robot_id,
            )
            return []

        # A monitor exposing check_command does the comprehensive
        # validation itself; otherwise fall back to position + speed
        # checks performed here.
        has_check_command = hasattr(monitor, 'check_command')

        cleared: List[ActuatorCommand] = []
        for cmd in commands:
            if has_check_command:
                safe, reason = monitor.check_command(cmd)
            else:
                safe, reason = self._check_without_check_command(monitor, cmd)
            if not safe:
                logger.warning(
                    "[HardwareBridge:%s] command for %s blocked: %s",
                    self._robot_id, cmd.actuator_id, reason,
                )
                continue
            cmd.safety_cleared = True
            cleared.append(cmd)

        return cleared

    @classmethod
    def _check_without_check_command(cls, monitor, cmd) -> Tuple[bool, str]:
        """Validate ``cmd`` against a monitor that lacks ``check_command``.

        Forwards any position parameters to ``monitor.check_position_safe``
        and applies a speed sanity limit.  Fails closed: a malformed
        payload or a non-numeric/NaN speed is rejected.

        Returns ``(safe, reason)``; ``reason`` is ``''`` when safe.
        """
        payload = cmd.command
        if not isinstance(payload, dict):
            return False, 'command payload must be a mapping'
        params = payload.get('params')
        if params is None:
            params = {}
        if not isinstance(params, dict):
            return False, 'params must be a mapping'

        position = {k: v for k, v in params.items() if k in cls._POSITION_KEYS}
        if position and not monitor.check_position_safe(position):
            return False, 'position outside workspace limits'

        # Velocity sanity check -- every alias that is present is checked.
        for key in ('speed', 'velocity'):
            raw = params.get(key)
            if raw is None:
                continue
            try:
                value = float(raw)
            except (TypeError, ValueError):
                return False, f'{key} {raw!r} is not a number'
            # ``not <=`` also rejects NaN.
            if not abs(value) <= cls._EXTERNAL_MAX_SPEED:
                return False, f'speed {value:.2f} exceeds limit'

        return True, ''

    # ------------------------------------------------------------------
    # Experience recording + flushing
    # ------------------------------------------------------------------

    def _record_experience(
        self,
        sensor_state: dict,
        action: dict,
        outcome: dict,
    ) -> None:
        """Buffer a sensor->action->outcome triple.

        Computes the bootstrap reward, appends the experience, emits
        ``robot.experience_recorded`` and flushes the buffer once it
        holds at least ``_EXPERIENCE_AUTO_FLUSH`` experiences.
        """
        reward = self._compute_reward(action, outcome)

        exp = Experience(
            robot_id=self._robot_id,
            sensor_state=sensor_state,
            action_taken=action,
            outcome=outcome,
            reward=reward,
        )

        with self._lock:
            self._experience_buffer.append(exp)
            self._stats['experiences_recorded'] += 1
            buffer_len = len(self._experience_buffer)

        self._emit('robot.experience_recorded', {
            'robot_id': self._robot_id,
            'reward': reward,
            'buffer_size': buffer_len,
        })

        # Auto-flush when buffer reaches threshold
        if buffer_len >= _EXPERIENCE_AUTO_FLUSH:
            self._flush_experiences()

    def _flush_experiences(self) -> int:
        """Push buffered experiences to WorldModelBridge for HevolveAI training.

        The buffer is drained under the lock before anything is sent, so
        a batch that cannot be delivered (bridge not importable, or the
        bridge raising) is dropped rather than retried.

        Returns the number of experiences successfully flushed.
        """
        with self._lock:
            if not self._experience_buffer:
                return 0
            batch = list(self._experience_buffer)
            self._experience_buffer.clear()

        # Convert to dicts for transport
        experience_dicts = [
            {
                'robot_id': exp.robot_id,
                'sensor_state': exp.sensor_state,
                'action_taken': exp.action_taken,
                'outcome': exp.outcome,
                'reward': exp.reward,
                'timestamp': exp.timestamp,
            }
            for exp in batch
        ]

        flushed = 0
        try:
            from integrations.agent_engine.world_model_bridge import (
                get_world_model_bridge,
            )
            bridge = get_world_model_bridge()

            # Use ingest_sensor_batch for the sensor component
            sensor_readings = [
                reading_data
                for exp_dict in experience_dicts
                for reading_data in exp_dict.get('sensor_state', {}).values()
            ]
            if sensor_readings:
                bridge.ingest_sensor_batch(sensor_readings)

            # Send actions for the action component (noops carry no action)
            for exp_dict in experience_dicts:
                action = exp_dict.get('action_taken', {})
                if action and not action.get('noop'):
                    bridge.send_action(action)

            flushed = len(batch)
        except ImportError:
            logger.debug(
                "[HardwareBridge:%s] WorldModelBridge unavailable -- "
                "%d experiences dropped", self._robot_id, len(batch),
            )
        except Exception as exc:
            logger.warning(
                "[HardwareBridge:%s] experience flush failed: %s",
                self._robot_id, exc,
            )

        with self._lock:
            self._stats['experiences_flushed'] += flushed

        return flushed

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _start_adapter_stream(self, adapter: SensorAdapter) -> None:
        """Start a single sensor adapter's stream with our ingestion callback.

        A failure to start is logged and otherwise ignored so one bad
        adapter cannot prevent the others from starting.
        """
        def on_reading(reading: SensorReading):
            self._on_sensor_reading(reading)

        try:
            adapter.start_stream(on_reading)
        except Exception as exc:
            logger.warning(
                "[HardwareBridge:%s] failed to start stream for %s: %s",
                self._robot_id, adapter.sensor_id, exc,
            )

    def _on_sensor_reading(self, reading: SensorReading) -> None:
        """Handle an incoming sensor reading from any adapter.

        Stores the reading as the sensor's latest, forwards it to the
        SensorStore when that module is importable, and emits
        ``robot.sensor_reading``.  Readings are discarded while the
        resource governor denies processing.
        """
        # Resource governor gate
        if not self._resource_ok():
            return

        with self._lock:
            self._latest_readings[reading.sensor_id] = reading
            self._stats['readings_received'] += 1

        # Push to SensorStore (existing robotics infrastructure)
        try:
            from integrations.robotics.sensor_store import get_sensor_store
            from integrations.robotics.sensor_model import SensorReading as StoreSensorReading

            store = get_sensor_store()
            store_reading = StoreSensorReading(
                sensor_id=reading.sensor_id,
                sensor_type=reading.sensor_type,
                # The store model is given a dict; wrap scalar payloads.
                data=reading.data if isinstance(reading.data, dict) else {'value': reading.data},
                source='hardware_bridge',
            )
            store.put_reading(store_reading)
        except ImportError:
            pass
        except Exception as exc:
            logger.debug("SensorStore push failed: %s", exc)

        # Emit event
        self._emit('robot.sensor_reading', {
            'robot_id': self._robot_id,
            'sensor_id': reading.sensor_id,
            'sensor_type': reading.sensor_type,
            'data': reading.data,
            'timestamp': reading.timestamp,
        })

    def _call_intelligence(self, sensor_snapshot: dict, context: str) -> dict:
        """Call the intelligence API for multi-intelligence fusion.

        Falls back gracefully if the intelligence API is not available
        or raises: in both cases a thought with an empty ``action_plan``
        and an explanatory ``reasoning`` string is returned.
        """
        try:
            from integrations.robotics.intelligence_api import think
            return think(
                robot_id=self._robot_id,
                sensor_snapshot=sensor_snapshot,
                context=context,
            )
        except ImportError:
            logger.debug(
                "[HardwareBridge:%s] intelligence_api not available, "
                "returning empty thought", self._robot_id,
            )
            return {
                'action_plan': {},
                'reasoning': 'intelligence API not available',
            }
        except Exception as exc:
            logger.warning(
                "[HardwareBridge:%s] intelligence call failed: %s",
                self._robot_id, exc,
            )
            return {
                'action_plan': {},
                'reasoning': f'intelligence error: {exc}',
            }

    @staticmethod
    def _compute_reward(action: dict, outcome: dict) -> float:
        """Compute a simple reward signal from action and outcome.

        Basic heuristic: +1.0 for success, -0.5 for failure, 0.0 for noop
        (or when there are no per-actuator results); mixed outcomes are
        scaled linearly by the success ratio.  Real reward shaping
        belongs in HevolveAI -- this is just a bootstrap signal so the
        experience buffer carries a non-zero reward.
        """
        if action.get('noop'):
            return 0.0
        results = outcome.get('results', {})
        if not results:
            return 0.0
        successes = sum(1 for r in results.values() if r.get('ok'))
        ratio = successes / len(results)
        # Scale: all success = +1.0, all fail = -0.5, mixed = proportional
        return ratio * 1.5 - 0.5

    @staticmethod
    def _resource_ok() -> bool:
        """Check resource governor.  Returns True if processing is allowed."""
        try:
            from core.resource_governor import should_proceed
            return should_proceed('cpu_heavy')
        except ImportError:
            return True  # No governor = no throttling

    @staticmethod
    def _emit(topic: str, data: Any) -> None:
        """Emit an event on the platform EventBus (best-effort).

        Any failure -- including the event module not being importable
        -- is deliberately ignored so event delivery can never break
        the control loop.
        """
        try:
            from core.platform.events import emit_event
            emit_event(topic, data)
        except Exception:
            # ImportError is an Exception subclass, so this covers both
            # "no EventBus installed" and "EventBus raised".
            pass

    def get_stats(self) -> dict:
        """Return bridge statistics for monitoring.

        Includes the robot id, adapter counts, current experience buffer
        size, the running flag, and a copy of the cumulative counters.
        """
        with self._lock:
            return {
                'robot_id': self._robot_id,
                'sensors_registered': len(self._sensor_adapters),
                'actuators_registered': len(self._actuator_adapters),
                'experience_buffer_size': len(self._experience_buffer),
                'running': self._running,
                **dict(self._stats),
            }

1682 

1683 

1684# ====================================================================== 

1685# Module-level bridge registry 

1686# ====================================================================== 

1687 

_bridges: Dict[str, HardwareBridge] = {}
_bridges_lock = threading.Lock()


def get_bridge(robot_id: str) -> HardwareBridge:
    """Return the HardwareBridge for ``robot_id``, creating it on first access.

    Creation is serialized by ``_bridges_lock`` and re-checked under the
    lock, so concurrent first calls for the same ID share one bridge.
    """
    bridge = _bridges.get(robot_id)
    if bridge is None:
        with _bridges_lock:
            bridge = _bridges.get(robot_id)
            if bridge is None:
                bridge = HardwareBridge(robot_id)
                _bridges[robot_id] = bridge
    return bridge


def list_bridges() -> List[str]:
    """Return the robot IDs of all bridges created so far."""
    with _bridges_lock:
        return [*_bridges]

1708 

1709 

1710# ====================================================================== 

1711# Flask Blueprint 

1712# ====================================================================== 

1713 

1714 

def _create_blueprint():
    """Create the Flask blueprint for hardware bridge endpoints.

    Lazy import to avoid requiring Flask at module load time.

    Request bodies are untrusted: a body that is missing, unparsable,
    or not a JSON object is treated as ``{}``, and registration
    payloads with a non-string ``adapter_type`` or non-object
    ``config`` are rejected with HTTP 400.

    Returns:
        The ``hardware_bridge`` blueprint, or ``None`` when Flask is
        not installed.
    """
    try:
        from flask import Blueprint, jsonify, request
    except ImportError:
        return None

    hardware_bp = Blueprint('hardware_bridge', __name__)

    def _json_object() -> dict:
        """Return the request's JSON body, or ``{}`` if it is not an object."""
        body = request.get_json(silent=True)
        return body if isinstance(body, dict) else {}

    def _adapter_spec(body: dict):
        """Extract ``(adapter_type, config, error)`` from a registration body.

        ``error`` is ``None`` when both fields have usable types.
        """
        adapter_type = body.get('adapter_type', '')
        config = body.get('config')
        if config is None:
            config = {}
        if not isinstance(adapter_type, str):
            return adapter_type, config, 'adapter_type must be a string'
        if not isinstance(config, dict):
            return adapter_type, config, 'config must be an object'
        return adapter_type, config, None

    @hardware_bp.route('/api/robot/<robot_id>/sensors/register', methods=['POST'])
    def register_sensor_endpoint(robot_id: str):
        """Register a sensor adapter via HTTP.

        Body: {
            "sensor_id": "cam_front",
            "sensor_type": "camera",
            "adapter_type": "http",       # http | mqtt | serial | websocket
            "config": {                    # adapter-specific config
                "url": "http://...",
                "poll_interval": 1.0
            }
        }
        """
        body = _json_object()
        sensor_id = body.get('sensor_id', '')
        sensor_type = body.get('sensor_type', '')

        if not sensor_id or not sensor_type:
            return jsonify({'error': 'sensor_id and sensor_type required'}), 400

        adapter_type, config, error = _adapter_spec(body)
        if error is not None:
            return jsonify({'error': error}), 400

        adapter = _build_sensor_adapter(sensor_id, sensor_type, adapter_type, config)
        if adapter is None:
            return jsonify({'error': f'unknown adapter_type: {adapter_type}'}), 400

        get_bridge(robot_id).register_sensor(adapter)
        return jsonify({'ok': True, 'sensor_id': sensor_id}), 200

    @hardware_bp.route('/api/robot/<robot_id>/actuators/register', methods=['POST'])
    def register_actuator_endpoint(robot_id: str):
        """Register an actuator adapter via HTTP.

        Body: {
            "actuator_id": "motor_left",
            "actuator_type": "motor",
            "adapter_type": "serial",     # serial | http | mqtt
            "config": {
                "port": "/dev/ttyUSB0",
                "baudrate": 115200
            }
        }
        """
        body = _json_object()
        actuator_id = body.get('actuator_id', '')
        actuator_type = body.get('actuator_type', '')

        if not actuator_id or not actuator_type:
            return jsonify({'error': 'actuator_id and actuator_type required'}), 400

        adapter_type, config, error = _adapter_spec(body)
        if error is not None:
            return jsonify({'error': error}), 400

        adapter = _build_actuator_adapter(
            actuator_id, actuator_type, adapter_type, config,
        )
        if adapter is None:
            return jsonify({'error': f'unknown adapter_type: {adapter_type}'}), 400

        get_bridge(robot_id).register_actuator(adapter)
        return jsonify({'ok': True, 'actuator_id': actuator_id}), 200

    @hardware_bp.route('/api/robot/<robot_id>/act', methods=['POST'])
    def act_endpoint(robot_id: str):
        """Execute an action plan.

        Body: {
            "commands": [
                {"actuator_id": "motor_left", "command": {"action": "move", "params": {"speed": 0.5}}}
            ]
        }

        Responds 200 when the bridge reports ``ok``, otherwise 400.
        """
        result = get_bridge(robot_id).execute_action(_json_object())
        status = 200 if result.get('ok') else 400
        return jsonify(result), status

    @hardware_bp.route('/api/robot/<robot_id>/think_and_act', methods=['POST'])
    def think_and_act_endpoint(robot_id: str):
        """Full loop: sense -> think -> act -> learn.

        Body: {"context": "optional textual context"}
        """
        context = _json_object().get('context', '')
        result = get_bridge(robot_id).think_and_act(context=context)
        return jsonify(result), 200

    @hardware_bp.route('/api/robot/<robot_id>/sensors/snapshot', methods=['GET'])
    def sensors_snapshot_endpoint(robot_id: str):
        """Get current state of all sensors."""
        return jsonify(get_bridge(robot_id).get_sensor_snapshot()), 200

    @hardware_bp.route('/api/robot/<robot_id>/experience/stats', methods=['GET'])
    def experience_stats_endpoint(robot_id: str):
        """Get experience buffer statistics."""
        return jsonify(get_bridge(robot_id).get_stats()), 200

    return hardware_bp

1832 

1833 

def _build_sensor_adapter(
    sensor_id: str,
    sensor_type: str,
    adapter_type: str,
    config: dict,
) -> Optional[SensorAdapter]:
    """Factory for sensor adapters from HTTP registration payloads.

    The payload is untrusted, so a non-string ``adapter_type`` is
    treated as unknown and a non-dict ``config`` as empty instead of
    raising.

    Args:
        sensor_id: ID the adapter is registered under.
        sensor_type: Sensor type passed through to the adapter.
        adapter_type: Transport name (case-insensitive): ``http``,
            ``mqtt``, ``serial``, ``websocket`` or ``ws``.
        config: Adapter-specific settings; missing keys fall back to
            the defaults shown below.

    Returns:
        The constructed adapter, or ``None`` for an unknown adapter type.
    """
    kind = adapter_type.lower() if isinstance(adapter_type, str) else ''
    if not isinstance(config, dict):
        config = {}

    if kind == 'http':
        return HTTPSensorAdapter(
            sensor_id=sensor_id,
            sensor_type=sensor_type,
            url=config.get('url', ''),
            poll_interval=config.get('poll_interval', 1.0),
            timeout=config.get('timeout', 5.0),
            headers=config.get('headers'),
        )
    if kind == 'mqtt':
        return MQTTSensorAdapter(
            sensor_id=sensor_id,
            sensor_type=sensor_type,
            broker=config.get('broker', 'localhost'),
            port=config.get('port', 1883),
            topic=config.get('topic', ''),
            username=config.get('username', ''),
            password=config.get('password', ''),
        )
    if kind == 'serial':
        return SerialSensorAdapter(
            sensor_id=sensor_id,
            sensor_type=sensor_type,
            port=config.get('port', ''),
            baudrate=config.get('baudrate', 115200),
            timeout=config.get('timeout', 0.1),
        )
    if kind in ('websocket', 'ws'):
        return WebSocketSensorAdapter(
            sensor_id=sensor_id,
            sensor_type=sensor_type,
            url=config.get('url', ''),
            headers=config.get('headers'),
            reconnect_delay=config.get('reconnect_delay', 2.0),
        )
    return None

1878 

1879 

def _build_actuator_adapter(
    actuator_id: str,
    actuator_type: str,
    adapter_type: str,
    config: dict,
) -> Optional[ActuatorAdapter]:
    """Factory for actuator adapters from HTTP registration payloads.

    The payload is untrusted, so a non-string ``adapter_type`` is
    treated as unknown and a non-dict ``config`` as empty instead of
    raising.

    Args:
        actuator_id: ID the adapter is registered under.
        actuator_type: Actuator type passed through to the adapter.
        adapter_type: Transport name (case-insensitive): ``serial``,
            ``http`` or ``mqtt``.
        config: Adapter-specific settings; missing keys fall back to
            the defaults shown below.

    Returns:
        The constructed adapter, or ``None`` for an unknown adapter type.
    """
    kind = adapter_type.lower() if isinstance(adapter_type, str) else ''
    if not isinstance(config, dict):
        config = {}

    if kind == 'serial':
        return SerialActuatorAdapter(
            actuator_id=actuator_id,
            actuator_type=actuator_type,
            port=config.get('port', ''),
            baudrate=config.get('baudrate', 115200),
            timeout=config.get('timeout', 1.0),
        )
    if kind == 'http':
        return HTTPActuatorAdapter(
            actuator_id=actuator_id,
            actuator_type=actuator_type,
            url=config.get('url', ''),
            timeout=config.get('timeout', 5.0),
            headers=config.get('headers'),
        )
    if kind == 'mqtt':
        return MQTTActuatorAdapter(
            actuator_id=actuator_id,
            actuator_type=actuator_type,
            broker=config.get('broker', 'localhost'),
            port=config.get('port', 1883),
            topic=config.get('topic', ''),
            response_topic=config.get('response_topic', ''),
            username=config.get('username', ''),
            password=config.get('password', ''),
            qos=config.get('qos', 1),
        )
    return None

1917 

1918 

1919# ====================================================================== 

1920# Unified Robotics Blueprint (/api/robotics/...) 

1921# ====================================================================== 

1922 

1923 

def create_robotics_blueprint():
    """Create the Flask blueprint for unified robotics endpoints.

    Exposes hive-wide robotics operations at ``/api/robotics/...``.
    Per-robot endpoints live on the ``_create_blueprint()`` blueprint
    at ``/api/robot/<robot_id>/...``.

    Routes:
        GET  /api/robotics/status   -- all bridges status
        POST /api/robotics/think    -- trigger think_and_act
        GET  /api/robotics/sensors  -- all sensor readings (all robots)
        POST /api/robotics/command  -- direct actuator command (safety-gated)

    Returns:
        The configured ``flask.Blueprint``, or ``None`` when Flask cannot
        be imported.
    """
    try:
        from flask import Blueprint, jsonify, request
    except ImportError:
        return None

    robotics_bp = Blueprint('robotics', __name__)

    def _parse_robot_request():
        """Validate the JSON body shared by the POST endpoints.

        Returns a ``(body, robot_id, error)`` triple.  On success ``error``
        is ``None``; on failure ``body`` and ``robot_id`` are ``None`` and
        ``error`` is a ``(response, status)`` pair ready to be returned
        from the view.
        """
        # ``silent=True`` yields None for a missing/invalid payload; treat
        # that (and any other falsy payload) as an empty object.
        body = request.get_json(silent=True) or {}
        # Valid JSON is not necessarily an object: a list, string or number
        # has no ``.get`` and would otherwise surface as an HTTP 500.
        if not isinstance(body, dict):
            return None, None, (
                jsonify({'error': 'request body must be a JSON object'}),
                400,
            )
        robot_id = body.get('robot_id', '')
        if not robot_id:
            return None, None, (
                jsonify({'error': 'robot_id is required'}),
                400,
            )
        return body, robot_id, None

    @robotics_bp.route('/api/robotics/status', methods=['GET'])
    def robotics_status():
        """GET /api/robotics/status -- all bridges status."""
        robot_ids = list_bridges()
        bridge_stats = [get_bridge(rid).get_stats() for rid in robot_ids]
        return jsonify({
            'robot_count': len(robot_ids),
            'robots': bridge_stats,
        })

    @robotics_bp.route('/api/robotics/think', methods=['POST'])
    def robotics_think():
        """POST /api/robotics/think -- trigger think_and_act on a robot.

        Body: {
            "robot_id": "my-robot",
            "context": "navigate to kitchen"
        }

        Responds 400 when the body is not a JSON object or ``robot_id``
        is missing/empty.
        """
        body, robot_id, error = _parse_robot_request()
        if error is not None:
            return error
        context = body.get('context', '')
        bridge = get_bridge(robot_id)
        result = bridge.think_and_act(context=context)
        return jsonify(result)

    @robotics_bp.route('/api/robotics/sensors', methods=['GET'])
    def robotics_sensors():
        """GET /api/robotics/sensors -- all sensor readings from all robots."""
        robot_ids = list_bridges()
        all_readings: Dict[str, dict] = {}
        for rid in robot_ids:
            snapshot = get_bridge(rid).get_sensor_snapshot()
            # Robots with an empty snapshot are left out of ``readings``
            # but are still counted in ``robot_count``.
            if snapshot:
                all_readings[rid] = snapshot
        return jsonify({
            'robot_count': len(robot_ids),
            'readings': all_readings,
        })

    @robotics_bp.route('/api/robotics/command', methods=['POST'])
    def robotics_command():
        """POST /api/robotics/command -- direct actuator command (safety-gated).

        Body: {
            "robot_id": "my-robot",
            "commands": [
                {"actuator_id": "motor_left", "command": {"action": "move", "params": {"speed": 0.5}}}
            ]
        }

        Responds 400 when the body is not a JSON object, ``robot_id`` is
        missing/empty, or the bridge result has no truthy ``ok`` field.
        """
        body, robot_id, error = _parse_robot_request()
        if error is not None:
            return error
        bridge = get_bridge(robot_id)
        # The whole request body is handed to the bridge.
        # NOTE(review): safety gating is presumably applied inside
        # ``execute_action`` -- it is not visible from this function.
        result = bridge.execute_action(body)
        status = 200 if result.get('ok') else 400
        return jsonify(result), status

    return robotics_bp