Coverage for integrations / channels / hardware / wamp_iot_adapter.py: 63.2%
106 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
WAMP/Crossbar Channel Adapter — Pub/sub for IoT devices via existing Crossbar router.

Bridges IoT sensors and actuators to HART agents through the existing Crossbar
WAMP infrastructure (ws://host:8088/ws). Uses the same router that powers
agent multichat and channel bridging.

Subscribes to configurable WAMP topics, publishes agent responses.
ESP32/microcontrollers connect via WebSocket WAMP or through a serial→WAMP gateway.

Usage:
    from integrations.channels.hardware.wamp_iot_adapter import WAMPIoTAdapter
    adapter = WAMPIoTAdapter(topics=['com.hertzai.hevolve.iot.sensors'])
    adapter.on_message(handler)
    await adapter.start()

Environment:
    CBURL               Crossbar WebSocket URL (default: ws://localhost:8088/ws)
    CBREALM             WAMP realm (default: realm1)
    HEVOLVE_IOT_TOPICS  Comma-separated WAMP topics to subscribe
                        (default: com.hertzai.hevolve.iot.sensors)
21"""
22import asyncio
23import json
24import logging
25import os
26import threading
27import time
28import uuid
29from typing import Callable, Dict, List, Optional, Any
31from integrations.channels.base import (
32 ChannelAdapter, ChannelConfig, ChannelStatus,
33 Message, SendResult,
34)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# NOTE: the adapter class below is also exported as MQTTAdapter (alias at the
# end of this module) for backward compat with __init__.py registration
# (the auto_register function references MQTTAdapter).
class WAMPIoTAdapter(ChannelAdapter):
    """Channel adapter for IoT devices via Crossbar WAMP pub/sub.

    Events arriving on the subscribed topics are turned into ``Message``
    objects and handed to the registered message handlers.
    ``send_message`` publishes to a WAMP topic (``chat_id`` = topic URI).

    The autobahn component runs on a daemon thread with its own asyncio
    event loop; ``connect`` waits (without blocking the caller's loop) for
    the WAMP session to be joined.
    """

    # connect() polls for the joined session this many times, sleeping
    # _CONNECT_POLL_S between polls (50 * 0.1 s = up to 5 seconds).
    _CONNECT_POLL_ATTEMPTS = 50
    _CONNECT_POLL_S = 0.1

    def __init__(
        self,
        crossbar_url: str = '',
        realm: str = '',
        topics: Optional[List[str]] = None,
        config: Optional[ChannelConfig] = None,
    ):
        """Create the adapter; nothing is connected until ``connect``.

        Args:
            crossbar_url: Router WebSocket URL. Falls back to the ``CBURL``
                environment variable, then ``ws://localhost:8088/ws``.
            realm: WAMP realm. Falls back to ``CBREALM``, then ``realm1``.
            topics: Topic URIs to subscribe to. Falls back to the
                comma-separated ``HEVOLVE_IOT_TOPICS`` environment variable.
            config: Channel configuration; a default ``ChannelConfig`` is
                used when omitted.
        """
        super().__init__(config or ChannelConfig())
        self._crossbar_url = crossbar_url or os.environ.get(
            'CBURL', 'ws://localhost:8088/ws')
        self._realm = realm or os.environ.get('CBREALM', 'realm1')
        self._topics = topics or _parse_topic_list(
            os.environ.get('HEVOLVE_IOT_TOPICS',
                           'com.hertzai.hevolve.iot.sensors'))
        self._component = None  # autobahn Component, created in connect()
        self._session = None    # joined WAMP session, set by on_join
        self._loop = None       # event loop owned by the background thread
        self._thread = None     # daemon thread running the component

    @property
    def name(self) -> str:
        """Channel identifier of this adapter."""
        return 'wamp_iot'

    async def connect(self) -> bool:
        """Connect to Crossbar WAMP router and subscribe to IoT topics.

        Returns:
            True once a session has been joined, False if autobahn is not
            installed, no URL is configured, the session is not joined
            within the polling window, or setup raised.
        """
        try:
            from autobahn.asyncio.component import Component
        except ImportError:
            logger.error("WAMP IoT adapter: autobahn not installed")
            return False

        if not self._crossbar_url:
            logger.error("WAMP IoT adapter: no Crossbar URL configured")
            return False

        try:
            self._component = Component(
                transports=self._crossbar_url,
                realm=self._realm,
            )

            adapter_ref = self  # Capture for closures

            @self._component.on_join
            async def on_join(session, details):
                adapter_ref._session = session
                logger.info("WAMP IoT adapter: joined session")
                # Subscribe to IoT topics; one failing topic must not
                # prevent the remaining subscriptions.
                for topic in adapter_ref._topics:
                    try:
                        await session.subscribe(
                            adapter_ref._on_wamp_event, topic)
                        logger.debug(f"WAMP IoT subscribed: {topic}")
                    except Exception as e:
                        logger.error(f"WAMP subscribe failed for {topic}: {e}")

            @self._component.on_leave
            async def on_leave(session, details):
                adapter_ref._session = None
                if adapter_ref._running:
                    logger.warning("WAMP IoT adapter: session lost, will reconnect")

            # Run component in background thread with its own event loop
            self._thread = threading.Thread(
                target=self._run_component_loop, daemon=True)
            self._thread.start()

            # Wait for session (up to 5 seconds). This is a coroutine, so
            # yield to the caller's event loop instead of blocking it with
            # time.sleep().
            for _ in range(self._CONNECT_POLL_ATTEMPTS):
                if self._session is not None:
                    self.status = ChannelStatus.CONNECTED
                    logger.info(
                        f"WAMP IoT adapter: connected to {self._crossbar_url}, "
                        f"topics={self._topics}")
                    return True
                await asyncio.sleep(self._CONNECT_POLL_S)

            logger.warning("WAMP IoT adapter: connection timeout")
            return False
        except Exception as e:
            logger.error(f"WAMP IoT adapter: connect failed: {e}")
            self.status = ChannelStatus.ERROR
            return False

    def _run_component_loop(self):
        """Run WAMP component in a dedicated event loop thread."""
        try:
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)
            from autobahn.asyncio.component import run
            # NOTE(review): run() may try to install signal handlers, which
            # asyncio only permits on the main thread — confirm this works
            # on every target platform.
            run([self._component], log_level='warn')
        except Exception as e:
            # Errors during/after an intentional disconnect are expected.
            if self._running:
                logger.error(f"WAMP IoT component loop error: {e}")
        finally:
            # Release the loop if run() failed before closing it itself.
            loop = self._loop
            if (loop is not None and not loop.is_closed()
                    and not loop.is_running()):
                loop.close()

    async def disconnect(self) -> None:
        """Disconnect from Crossbar and stop the background component.

        The stop request is scheduled on the component's own event loop
        because asyncio objects must only be touched from their loop thread.
        """
        self._running = False
        self._session = None

        component, loop = self._component, self._loop
        if component is None or loop is None or loop.is_closed():
            return

        def _stop_component():
            try:
                component.stop()
            except Exception as e:
                logger.warning(f"WAMP IoT adapter: component stop failed: {e}")

        try:
            loop.call_soon_threadsafe(_stop_component)
        except RuntimeError:
            # Loop was closed between the check above and the call.
            pass

    async def send_message(
        self, chat_id: str, text: str,
        reply_to: Optional[str] = None,
        media: Optional[List] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Publish a message to a WAMP topic.

        Args:
            chat_id: WAMP topic URI
                (e.g. "com.hertzai.hevolve.iot.actuators.led1").
            text: Payload. Valid JSON is published as the decoded value;
                anything else is wrapped as ``{'text': text}``.
            reply_to: Unused by this adapter.
            media: Unused by this adapter.
            buttons: Unused by this adapter.

        Returns:
            A ``SendResult``; unsuccessful when no session is active or the
            publish call raised.
        """
        if not self._session:
            return SendResult(success=False, error="WAMP session not active")

        try:
            # Try to parse as JSON for structured messages
            try:
                payload = json.loads(text)
            except (json.JSONDecodeError, TypeError):
                payload = {'text': text}

            # NOTE(review): the session lives on the component's loop
            # thread; publishing from another thread may need to be routed
            # through that loop — confirm.
            self._session.publish(chat_id, payload)
            return SendResult(
                success=True,
                message_id=f"wamp_{uuid.uuid4().hex[:8]}",
            )
        except Exception as e:
            logger.error(f"WAMP publish error: {e}")
            return SendResult(success=False, error=str(e))

    async def edit_message(self, chat_id: str, message_id: str,
                           text: str, buttons=None) -> SendResult:
        """WAMP is pub/sub — edit means re-publish."""
        return await self.send_message(chat_id, text)

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """Always returns False; deletion is not applicable to WAMP pub/sub."""
        return False  # Not applicable to WAMP pub/sub

    async def send_typing(self, chat_id: str) -> None:
        """No-op; typing indicators are not applicable."""
        pass  # Not applicable

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Return connection details for ``chat_id`` (treated as a topic)."""
        return {
            'crossbar_url': self._crossbar_url,
            'realm': self._realm,
            'topic': chat_id,
            'connected': self._session is not None,
        }

    # ─── WAMP Event Handler ───

    def _on_wamp_event(self, *args, **kwargs):
        """Called when an event arrives on a subscribed WAMP topic.

        Builds a ``Message`` from the event payload and passes it to every
        registered handler. All errors are logged, never raised, so a bad
        event or handler cannot break the subscription.
        """
        try:
            # WAMP events can be positional args or kwargs. Every branch
            # yields a dict, so no non-dict handling is needed below.
            if args and isinstance(args[0], dict):
                data = args[0]
            elif kwargs:
                data = kwargs
            else:
                data = {'raw_args': list(args)}

            # Extract text and sender from payload. The JSON fallback is
            # only computed when there is no 'text', so a payload with
            # non-serializable extras is not dropped needlessly.
            if 'text' in data:
                text = data['text']
            else:
                text = json.dumps(data)
            sender = data.get('sender', data.get('node_id', 'wamp_device'))
            # NOTE(review): the topic is taken from the payload only; the
            # subscription's own topic is not available here, so chat_id is
            # 'unknown' unless the publisher includes a 'topic' field.
            topic = data.get('topic', 'unknown')

            msg = Message(
                id=str(uuid.uuid4())[:8],
                channel='wamp_iot',
                sender_id=sender,
                sender_name=f'WAMP {sender}',
                chat_id=topic,
                text=text,
                raw=data,
            )

            for handler in self._message_handlers:
                try:
                    handler(msg)
                except Exception as e:
                    logger.error(f"WAMP IoT handler error: {e}")

        except Exception as e:
            logger.error(f"WAMP event processing error: {e}")
# Backward compat alias — __init__.py references MQTTAdapter, so the WAMP
# adapter is also exposed under that name.
MQTTAdapter = WAMPIoTAdapter
251def _parse_topic_list(env_value: str) -> List[str]:
252 """Parse comma-separated WAMP topic URIs from env var."""
253 if not env_value:
254 return ['com.hertzai.hevolve.iot.sensors']
255 return [t.strip() for t in env_value.split(',') if t.strip()]