Coverage for integrations / channels / hardware / __init__.py: 87.1%

31 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Hardware I/O Channel Adapters — Serial, GPIO, WAMP IoT, ROS bridge. 

3 

4Auto-detects available hardware at boot and registers appropriate adapters. 

5All follow the ChannelAdapter base pattern from integrations/channels/base.py. 

6 

7IoT pub/sub uses existing Crossbar/WAMP infrastructure (not MQTT). 

8""" 

9import logging 

10import os 

11 

12logger = logging.getLogger(__name__) 

13 

14 

def auto_register_hardware_adapters(registry=None):
    """Detect hardware adapters at boot and optionally register them.

    Every adapter module is imported lazily inside its own ``try`` block so
    that a missing optional dependency disables only that one adapter.

    Args:
        registry: Optional registry object. When given, each detected
            adapter is passed to ``registry.register(name, adapter_class)``.
            A failure to register one adapter is logged as a warning and
            does not prevent the remaining adapters from being registered.

    Returns:
        List of ``(adapter_name, adapter_class)`` tuples for the adapters
        that were detected, in detection order (serial, gpio, wamp_iot,
        ros). The same list is returned whether or not ``registry`` was
        supplied.
    """
    available = []

    # Serial adapter: treated as available whenever its module imports.
    # This function does no device probing of its own.
    try:
        from .serial_adapter import SerialAdapter
    except ImportError as exc:
        # Logged (not silently dropped) to match the WAMP/ROS branches.
        logger.debug("Hardware adapter unavailable: serial (%s)", exc)
    else:
        available.append(('serial', SerialAdapter))
        logger.info("Hardware adapter available: serial")

    # GPIO adapter: the module must import AND report itself usable on
    # this host via GPIOAdapter.is_available().
    try:
        from .gpio_adapter import GPIOAdapter
        if GPIOAdapter.is_available():
            available.append(('gpio', GPIOAdapter))
            logger.info("Hardware adapter available: gpio")
    except ImportError as exc:
        logger.debug("Hardware adapter unavailable: gpio (%s)", exc)

    # WAMP IoT adapter: uses the existing Crossbar router for IoT pub/sub.
    # Only attempted when CBURL is set or IoT topics are configured.
    crossbar_url = os.environ.get('CBURL', '')
    iot_topics = os.environ.get('HEVOLVE_IOT_TOPICS', '')
    if crossbar_url or iot_topics:
        try:
            from .wamp_iot_adapter import WAMPIoTAdapter
        except ImportError:
            logger.debug("WAMP IoT configured but autobahn not installed")
        else:
            available.append(('wamp_iot', WAMPIoTAdapter))
            logger.info(
                "Hardware adapter available: wamp_iot (crossbar=%s)",
                crossbar_url or 'default',
            )

    # ROS bridge: opt-in only (pulls ~500MB deps), so the import is not
    # even attempted unless the flag is exactly "true" (case-insensitive).
    if os.environ.get('HEVOLVE_ROS_BRIDGE_ENABLED', '').lower() == 'true':
        try:
            from .ros_bridge import ROSBridgeAdapter
        except ImportError:
            logger.debug("ROS bridge enabled but rclpy not installed")
        else:
            available.append(('ros', ROSBridgeAdapter))
            logger.info("Hardware adapter available: ros")

    if registry is not None:
        for name, adapter_class in available:
            # Best-effort boundary: one bad adapter must not abort boot.
            try:
                registry.register(name, adapter_class)
            except Exception as e:
                logger.warning("Failed to register %s adapter: %s", name, e)

    return available

70 logger.warning(f"Failed to register {name} adapter: {e}") 

71 

72 return available