Coverage for integrations / channels / hardware / wamp_iot_adapter.py: 63.2%

106 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

"""
WAMP/Crossbar Channel Adapter — Pub/sub for IoT devices via existing Crossbar router.

Bridges IoT sensors and actuators to HART agents through the existing Crossbar
WAMP infrastructure (ws://host:8088/ws). Uses the same router that powers
agent multichat and channel bridging.

Subscribes to configurable WAMP topics, publishes agent responses.
ESP32/microcontrollers connect via WebSocket WAMP or through a serial→WAMP gateway.

Topics are subscribed with exact URI matching; MQTT-style wildcards such as
'#' are not interpreted.

Usage:
    from integrations.channels.hardware.wamp_iot_adapter import WAMPIoTAdapter
    adapter = WAMPIoTAdapter(topics=['com.hertzai.hevolve.iot.sensors'])
    adapter.on_message(handler)
    await adapter.start()

Environment:
    CBURL               Crossbar WebSocket URL (default: ws://localhost:8088/ws)
    CBREALM             WAMP realm (default: realm1)
    HEVOLVE_IOT_TOPICS  Comma-separated WAMP topics to subscribe
"""

22import asyncio 

23import json 

24import logging 

25import os 

26import threading 

27import time 

28import uuid 

29from typing import Callable, Dict, List, Optional, Any 

30 

31from integrations.channels.base import ( 

32 ChannelAdapter, ChannelConfig, ChannelStatus, 

33 Message, SendResult, 

34) 

35 

36logger = logging.getLogger(__name__) 

37 

# This adapter is also exported as MQTTAdapter (alias defined at the bottom of
# this module) for backward compat with __init__.py registration: the
# auto_register function references MQTTAdapter.

40 

41 

class WAMPIoTAdapter(ChannelAdapter):
    """Channel adapter for IoT devices via Crossbar WAMP pub/sub.

    Events arriving on the subscribed topics are converted to ``Message``
    objects and passed to every registered message handler.
    ``send_message`` publishes to a WAMP topic (``chat_id`` = topic URI).

    The autobahn component runs on its own event loop inside a daemon thread
    (see ``_run_component_loop``). Code running on any other thread therefore
    reaches the session through ``loop.call_soon_threadsafe``.

    ``self.status``, ``self._running`` and ``self._message_handlers`` are not
    defined in this class; they are presumably provided by ``ChannelAdapter``
    (NOTE(review): confirm against integrations.channels.base).
    """

    # connect() polls for the joined session: 50 polls x 0.1 s = 5 seconds.
    _CONNECT_POLLS = 50
    _CONNECT_POLL_INTERVAL = 0.1

    def __init__(
        self,
        crossbar_url: str = '',
        realm: str = '',
        topics: Optional[List[str]] = None,
        config: Optional[ChannelConfig] = None,
    ):
        """Store connection settings; no network activity happens here.

        Args:
            crossbar_url: Router WebSocket URL. Falls back to the ``CBURL``
                environment variable, then ``ws://localhost:8088/ws``.
            realm: WAMP realm. Falls back to ``CBREALM``, then ``realm1``.
            topics: Topic URIs to subscribe to. Falls back to the
                comma-separated ``HEVOLVE_IOT_TOPICS`` environment variable.
            config: Channel configuration; a default ``ChannelConfig()`` is
                used when omitted.
        """
        super().__init__(config or ChannelConfig())
        self._crossbar_url = crossbar_url or os.environ.get(
            'CBURL', 'ws://localhost:8088/ws')
        self._realm = realm or os.environ.get('CBREALM', 'realm1')
        self._topics = topics or _parse_topic_list(
            os.environ.get('HEVOLVE_IOT_TOPICS',
                           'com.hertzai.hevolve.iot.sensors'))
        self._component = None
        self._session = None
        self._loop = None
        self._thread = None
        # Last exception raised by the component loop thread, so connect()
        # can report why the session never appeared.
        self._loop_error = None

    @property
    def name(self) -> str:
        """Channel identifier for this adapter."""
        return 'wamp_iot'

    async def connect(self) -> bool:
        """Connect to Crossbar WAMP router and subscribe to IoT topics.

        Starts the autobahn component in a background thread and waits up to
        five seconds for the session to join.

        Returns:
            True once a session has joined; False if autobahn is missing, no
            URL is configured, the component thread exits early, the wait
            times out, or setup raises.
        """
        try:
            from autobahn.asyncio.component import Component
        except ImportError:
            logger.error("WAMP IoT adapter: autobahn not installed")
            return False

        if not self._crossbar_url:
            logger.error("WAMP IoT adapter: no Crossbar URL configured")
            return False

        try:
            self._component = Component(
                transports=self._crossbar_url,
                realm=self._realm,
            )

            adapter_ref = self  # Capture for closures

            @self._component.on_join
            async def on_join(session, details):
                adapter_ref._session = session
                logger.info("WAMP IoT adapter: joined session")
                # Subscribe to IoT topics; one failing topic must not stop
                # the remaining subscriptions.
                for topic in adapter_ref._topics:
                    try:
                        await session.subscribe(
                            adapter_ref._on_wamp_event, topic)
                        logger.debug(f"WAMP IoT subscribed: {topic}")
                    except Exception as e:
                        logger.error(f"WAMP subscribe failed for {topic}: {e}")

            @self._component.on_leave
            async def on_leave(session, details):
                adapter_ref._session = None
                if adapter_ref._running:
                    logger.warning("WAMP IoT adapter: session lost, will reconnect")

            # Run component in background thread with its own event loop
            self._loop_error = None
            self._thread = threading.Thread(
                target=self._run_component_loop, daemon=True)
            self._thread.start()

            # Wait for session (up to 5 seconds). asyncio.sleep keeps the
            # caller's event loop responsive; time.sleep would freeze it.
            for _ in range(self._CONNECT_POLLS):
                if self._session is not None:
                    self.status = ChannelStatus.CONNECTED
                    logger.info(
                        f"WAMP IoT adapter: connected to {self._crossbar_url}, "
                        f"topics={self._topics}")
                    return True
                if not self._thread.is_alive():
                    # The loop thread already exited; waiting longer cannot
                    # produce a session.
                    logger.warning(
                        "WAMP IoT adapter: component loop exited before "
                        f"joining: {self._loop_error}")
                    return False
                await asyncio.sleep(self._CONNECT_POLL_INTERVAL)

            logger.warning("WAMP IoT adapter: connection timeout")
            return False
        except Exception as e:
            logger.error(f"WAMP IoT adapter: connect failed: {e}")
            self.status = ChannelStatus.ERROR
            return False

    def _run_component_loop(self):
        """Run WAMP component in a dedicated event loop thread.

        Thread target started by ``connect``. Any exception is recorded in
        ``self._loop_error`` so ``connect`` can report it.
        """
        try:
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)
            from autobahn.asyncio.component import run
            # NOTE(review): autobahn's run() registers signal handlers on the
            # loop, which asyncio only permits on the main thread on Unix —
            # confirm this works off the main thread on the deployed
            # platform; Component.start(loop=...) is the alternative.
            run([self._component], log_level='warn')
        except Exception as e:
            self._loop_error = e
            if self._running:
                logger.error(f"WAMP IoT component loop error: {e}")
            else:
                # Still record the failure instead of dropping it silently.
                logger.debug(f"WAMP IoT component loop error: {e}")
        finally:
            loop = self._loop
            if (loop is not None and not loop.is_running()
                    and not loop.is_closed()):
                loop.close()

    async def disconnect(self) -> None:
        """Disconnect from Crossbar.

        Clears the session reference and asks the component, on its own loop
        thread, to leave the session (or to stop, if no session is joined).
        """
        self._running = False
        session, self._session = self._session, None
        loop = self._loop
        if loop is None or loop.is_closed():
            return
        try:
            if session is not None:
                # The session belongs to the component's loop, so the leave
                # must be scheduled there rather than called from here.
                loop.call_soon_threadsafe(session.leave)
            elif self._component is not None:
                loop.call_soon_threadsafe(self._component.stop)
        except RuntimeError as e:
            # The loop closed between the check above and the call.
            logger.debug(f"WAMP IoT adapter: disconnect after loop closed: {e}")

    @staticmethod
    def _publish_logged(session, topic, payload):
        """Publish on the loop thread, logging instead of raising."""
        try:
            session.publish(topic, payload)
        except Exception as e:
            logger.error(f"WAMP publish error: {e}")

    async def send_message(
        self, chat_id: str, text: str,
        reply_to: Optional[str] = None,
        media: Optional[List] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Publish a message to a WAMP topic.

        Args:
            chat_id: WAMP topic URI
                (e.g. "com.hertzai.hevolve.iot.actuators.led1").
            text: Payload. Valid JSON is published as the decoded value;
                anything else is wrapped as ``{'text': text}``.
            reply_to: Unused by this adapter.
            media: Unused by this adapter.
            buttons: Unused by this adapter.

        Returns:
            A failed ``SendResult`` when no session is active or publishing
            raises; otherwise success with a generated ``wamp_*`` message id.
            When called from a thread other than the component's loop thread
            the publish is queued on that loop, so success means "queued" and
            later publish errors are only logged.
        """
        # Read once: on_leave may clear self._session from the loop thread.
        session = self._session
        if not session:
            return SendResult(success=False, error="WAMP session not active")

        try:
            # Try to parse as JSON for structured messages
            try:
                payload = json.loads(text)
            except (json.JSONDecodeError, TypeError):
                payload = {'text': text}

            loop = self._loop
            if (loop is not None and loop.is_running()
                    and threading.current_thread() is not self._thread):
                # asyncio objects are not thread-safe; hand the publish over
                # to the loop that owns the session.
                loop.call_soon_threadsafe(
                    self._publish_logged, session, chat_id, payload)
            else:
                session.publish(chat_id, payload)
            return SendResult(
                success=True,
                message_id=f"wamp_{uuid.uuid4().hex[:8]}",
            )
        except Exception as e:
            logger.error(f"WAMP publish error: {e}")
            return SendResult(success=False, error=str(e))

    async def edit_message(self, chat_id: str, message_id: str,
                           text: str, buttons=None) -> SendResult:
        """WAMP is pub/sub — edit means re-publish."""
        return await self.send_message(chat_id, text)

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """Always returns False: deletion is not applicable to WAMP pub/sub."""
        return False

    async def send_typing(self, chat_id: str) -> None:
        """No-op: typing indicators are not applicable."""
        pass

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Return connection details with ``chat_id`` reported as the topic."""
        return {
            'crossbar_url': self._crossbar_url,
            'realm': self._realm,
            'topic': chat_id,
            'connected': self._session is not None,
        }

    # ─── WAMP Event Handler ───

    def _on_wamp_event(self, *args, **kwargs):
        """Called when an event arrives on a subscribed WAMP topic.

        Builds a ``Message`` from the event payload and passes it to every
        registered handler. Errors are logged, never raised, so one bad event
        or handler cannot break the subscription.
        """
        try:
            # WAMP events can be positional args or kwargs
            if args and isinstance(args[0], dict):
                data = args[0]
            elif kwargs:
                data = kwargs
            else:
                data = {'raw_args': list(args)}

            # Serialise the payload only when there is no explicit 'text', so
            # a non-JSON-serialisable field cannot discard an event that
            # already carries its text.
            if 'text' in data:
                text = data['text']
            else:
                try:
                    text = json.dumps(data)
                except (TypeError, ValueError):
                    text = str(data)
            sender = data.get('sender', data.get('node_id', 'wamp_device'))
            # The topic is taken from the payload; the subscription itself
            # does not pass the topic URI to this callback.
            topic = data.get('topic', 'unknown')

            msg = Message(
                id=str(uuid.uuid4())[:8],
                channel='wamp_iot',
                sender_id=sender,
                sender_name=f'WAMP {sender}',
                chat_id=topic,
                text=text,
                raw=data,
            )

            # NOTE(review): handlers are called synchronously; if the base
            # class allows coroutine handlers their result is never awaited
            # here — confirm against ChannelAdapter.on_message.
            for handler in self._message_handlers:
                try:
                    handler(msg)
                except Exception as e:
                    logger.error(f"WAMP IoT handler error: {e}")

        except Exception as e:
            logger.error(f"WAMP event processing error: {e}")

246 

# Backward compat alias — __init__.py references MQTTAdapter, so the old name
# is bound to the same class object as WAMPIoTAdapter.
MQTTAdapter = WAMPIoTAdapter

249 

250 

251def _parse_topic_list(env_value: str) -> List[str]: 

252 """Parse comma-separated WAMP topic URIs from env var.""" 

253 if not env_value: 

254 return ['com.hertzai.hevolve.iot.sensors'] 

255 return [t.strip() for t in env_value.split(',') if t.strip()]