Coverage for integrations / channels / hardware / __init__.py: 87.1%
31 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Hardware I/O Channel Adapters — Serial, GPIO, WAMP IoT, ROS bridge.
4Auto-detects available hardware at boot and registers appropriate adapters.
5All follow the ChannelAdapter base pattern from integrations/channels/base.py.
7IoT pub/sub uses existing Crossbar/WAMP infrastructure (not MQTT).
8"""
9import logging
10import os
12logger = logging.getLogger(__name__)
15def auto_register_hardware_adapters(registry=None):
16 """Detect hardware at boot and register appropriate adapters.
18 Args:
19 registry: Optional ChannelRegistry to register adapters with.
20 If None, returns a list of adapter classes that are available.
22 Returns:
23 List of (adapter_name, adapter_class) tuples for available hardware.
24 """
25 available = []
27 # Serial adapter — available if pyserial installed or /dev/tty* exists
28 try:
29 from .serial_adapter import SerialAdapter
30 available.append(('serial', SerialAdapter))
31 logger.info("Hardware adapter available: serial")
32 except ImportError:
33 pass
35 # GPIO adapter — available on Linux with gpiod or RPi.GPIO
36 try:
37 from .gpio_adapter import GPIOAdapter
38 if GPIOAdapter.is_available():
39 available.append(('gpio', GPIOAdapter))
40 logger.info("Hardware adapter available: gpio")
41 except ImportError:
42 pass
44 # WAMP IoT adapter — uses existing Crossbar router for IoT pub/sub
45 # Registered when CBURL is set or IoT topics are configured
46 crossbar_url = os.environ.get('CBURL', '')
47 iot_topics = os.environ.get('HEVOLVE_IOT_TOPICS', '')
48 if crossbar_url or iot_topics:
49 try:
50 from .wamp_iot_adapter import WAMPIoTAdapter
51 available.append(('wamp_iot', WAMPIoTAdapter))
52 logger.info(f"Hardware adapter available: wamp_iot (crossbar={crossbar_url or 'default'})")
53 except ImportError:
54 logger.debug("WAMP IoT configured but autobahn not installed")
56 # ROS bridge — only if explicitly enabled (pulls ~500MB deps)
57 if os.environ.get('HEVOLVE_ROS_BRIDGE_ENABLED', '').lower() == 'true':
58 try:
59 from .ros_bridge import ROSBridgeAdapter
60 available.append(('ros', ROSBridgeAdapter))
61 logger.info("Hardware adapter available: ros")
62 except ImportError:
63 logger.debug("ROS bridge enabled but rclpy not installed")
65 if registry is not None:
66 for name, adapter_class in available:
67 try:
68 registry.register(name, adapter_class)
69 except Exception as e:
70 logger.warning(f"Failed to register {name} adapter: {e}")
72 return available