# Coverage for integrations/openclaw/gateway_bridge.py: 45.5% (110 statements)
# Report generated by coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Gateway Bridge — Connect HART OS to OpenClaw's WebSocket control plane.

OpenClaw runs a Gateway at ws://127.0.0.1:18789. We connect to it
bidirectionally:

    Inbound:  HART agents can send messages through OpenClaw's channels
              (WhatsApp, Telegram, Discord, etc.)
    Outbound: OpenClaw agents can invoke HART recipes, tools, and agents

This makes HART OS the superset — it can use all of OpenClaw's 20+
channel adapters natively, plus its own 30+ channel integrations.
"""
15import asyncio
16import json
17import logging
18import os
19import shutil
20import subprocess
21import threading
22from typing import Any, Callable, Dict, List, Optional
# Module-level logger shared by the bridge class and the process helpers.
logger = logging.getLogger(__name__)

# Gateway address used when neither the constructor argument nor the
# OPENCLAW_GATEWAY_URL environment variable provides one.
DEFAULT_GATEWAY_URL = 'ws://127.0.0.1:18789'
class OpenClawGatewayBridge:
    """Bidirectional bridge to the OpenClaw Gateway WebSocket.

    Lifecycle:
        bridge = OpenClawGatewayBridge()
        bridge.connect()                    # Connect to running gateway
        bridge.send_message(channel, msg)   # Send via OpenClaw channel
        bridge.on_message(handler)          # Receive from OpenClaw
        bridge.disconnect()

    Threading model: ``connect()`` starts one daemon thread that owns a
    private asyncio event loop. All WebSocket I/O happens on that loop;
    the public send methods hand their coroutine to it with
    ``asyncio.run_coroutine_threadsafe``. Registered handlers are called
    on the worker thread, not on the caller's thread.
    """

    def __init__(self, gateway_url: Optional[str] = None):
        """Create an unconnected bridge.

        Args:
            gateway_url: WebSocket URL of the gateway. When omitted (or
                empty) the OPENCLAW_GATEWAY_URL environment variable is
                used, falling back to DEFAULT_GATEWAY_URL.
        """
        self._url = gateway_url or os.environ.get(
            'OPENCLAW_GATEWAY_URL', DEFAULT_GATEWAY_URL
        )
        self._ws = None
        self._connected = False
        self._handlers: List[Callable] = []
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread: Optional[threading.Thread] = None

    @property
    def connected(self) -> bool:
        """True while a gateway WebSocket session is open."""
        return self._connected

    @property
    def gateway_url(self) -> str:
        """The gateway URL this bridge connects to."""
        return self._url

    def on_message(self, handler: Callable[[Dict[str, Any]], None]):
        """Register a handler for inbound OpenClaw messages.

        The handler receives each decoded JSON message and runs on the
        bridge's worker thread.
        """
        self._handlers.append(handler)

    def connect(self) -> bool:
        """Connect to the OpenClaw Gateway.

        Returns True if connected, False if gateway is unavailable.
        Starts a background thread for the WebSocket event loop; the
        thread keeps retrying after this call returns False, so a later
        ``connect()`` re-uses it instead of starting a second one.
        """
        import time

        if self._connected:
            return True

        try:
            import websockets  # noqa: F401
        except ImportError:
            logger.warning("websockets not installed, OpenClaw bridge unavailable")
            return False

        # Only spawn a worker when none is running: a previous connect()
        # that timed out leaves its thread alive in the reconnect loop.
        if self._thread is None or not self._thread.is_alive():
            self._loop = asyncio.new_event_loop()
            self._thread = threading.Thread(
                target=self._run_loop, daemon=True, name='openclaw-gateway'
            )
            self._thread.start()

        # Wait briefly (up to ~2s) for the worker to establish the session.
        for _ in range(20):
            if self._connected:
                return True
            time.sleep(0.1)

        return self._connected

    def _run_loop(self):
        """Thread target: run the WebSocket loop until it is cancelled."""
        loop = self._loop
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(self._ws_loop())
        except asyncio.CancelledError:
            # Normal shutdown path: disconnect() cancelled the task.
            pass
        finally:
            # Reset bridge state only if a newer connect() has not already
            # replaced this loop with its own.
            if self._loop is loop:
                self._connected = False
                self._ws = None
                self._loop = None
                self._thread = None
            try:
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                loop.close()

    def _dispatch(self, raw) -> None:
        """Decode one inbound frame and pass it to every handler.

        Frames that are not valid JSON are logged and dropped. A handler
        that raises is logged and skipped so that one faulty handler
        cannot drop the gateway connection or starve the other handlers.
        """
        try:
            msg = json.loads(raw)
        except ValueError:
            # Covers JSONDecodeError and the UnicodeDecodeError raised for
            # undecodable binary frames (both are ValueError subclasses).
            logger.debug("Non-JSON from gateway: %s", raw[:100])
            return

        for handler in self._handlers:
            try:
                handler(msg)
            except Exception:
                logger.exception("OpenClaw message handler failed: %r", handler)

    async def _ws_loop(self):
        """WebSocket connection loop with auto-reconnect.

        Runs until the task is cancelled. After any session ends, whether
        through an error or a clean close by the gateway, it waits 5
        seconds before reconnecting.
        """
        try:
            import websockets
        except ImportError:
            return

        while True:
            try:
                async with websockets.connect(self._url) as ws:
                    try:
                        self._ws = ws
                        self._connected = True
                        logger.info("Connected to OpenClaw gateway at %s", self._url)

                        # Announce HART OS as a tool provider
                        await ws.send(json.dumps({
                            'type': 'hart_announce',
                            'capabilities': [
                                'recipe_execution',
                                'agent_dispatch',
                                'model_bus',
                                'compute_mesh',
                                'vision',
                                'tts',
                            ],
                        }))

                        async for raw in ws:
                            self._dispatch(raw)
                    finally:
                        # Clear the state on every exit path, but never
                        # clobber a socket published by someone else.
                        if self._ws is ws:
                            self._ws = None
                            self._connected = False
                logger.debug("Gateway closed the connection, retrying in 5s")
            except asyncio.CancelledError:
                # Shutdown request; must not be treated as a lost connection.
                raise
            except Exception as e:
                logger.debug("Gateway connection lost: %s, retrying in 5s", e)

            await asyncio.sleep(5)

    def _send_json(self, payload: Dict[str, Any], timeout: float) -> None:
        """Serialise *payload* and send it on the worker loop.

        Blocks the calling thread for at most *timeout* seconds.

        Raises:
            ConnectionError: no socket or loop is currently available.
            Exception: whatever serialisation or the send itself raised,
                including the timeout error from waiting on the future.
        """
        # Snapshot both references: disconnect() may clear them from
        # another thread between the caller's check and this call.
        ws, loop = self._ws, self._loop
        if ws is None or loop is None:
            raise ConnectionError("Not connected to OpenClaw gateway")

        data = json.dumps(payload)
        future = asyncio.run_coroutine_threadsafe(ws.send(data), loop)
        try:
            future.result(timeout=timeout)
        except Exception:
            # Do not leave a timed-out send queued on the loop.
            future.cancel()
            raise

    def send_message(self, channel: str, message: str,
                     recipient: Optional[str] = None) -> bool:
        """Send a message through OpenClaw's channel system.

        Args:
            channel: Channel name (whatsapp, telegram, discord, slack, etc.)
            message: Message text
            recipient: Optional recipient ID for DM channels

        Returns:
            True once the frame was handed to the socket, False when the
            bridge is not connected or the send failed / timed out (5s).
        """
        if not self._connected or not self._ws:
            logger.warning("Not connected to OpenClaw gateway")
            return False

        payload = {
            'type': 'send_message',
            'channel': channel,
            'message': message,
        }
        if recipient:
            payload['recipient'] = recipient

        try:
            self._send_json(payload, timeout=5)
        except Exception as e:
            logger.error("Failed to send via OpenClaw: %s", e)
            return False
        return True

    def invoke_tool(self, tool_name: str, args: Dict[str, Any]) -> Optional[str]:
        """Invoke an OpenClaw tool through the gateway.

        This lets HART agents use OpenClaw's built-in and skill tools.

        Returns:
            The JSON string '{"status": "dispatched"}' once the request
            was sent (the tool's own result is not awaited here), or None
            when the bridge is not connected or the send failed / timed
            out (10s).
        """
        if not self._connected or not self._ws:
            return None

        payload = {
            'type': 'tool_invoke',
            'tool': tool_name,
            'args': args,
        }

        try:
            self._send_json(payload, timeout=10)
        except Exception as e:
            logger.error("Tool invoke failed: %s", e)
            return None
        return '{"status": "dispatched"}'

    def disconnect(self):
        """Disconnect from the OpenClaw gateway.

        Cancels the worker's tasks instead of stopping the loop under
        them: cancellation unwinds the ``async with`` around the socket so
        it is closed, and lets the worker thread finish and close its
        loop. Does not wait for the worker thread to exit. Safe to call
        when not connected and safe to call repeatedly.
        """
        self._connected = False
        loop = self._loop
        self._ws = None
        self._loop = None
        self._thread = None
        if loop is None:
            return

        def _cancel_tasks():
            # Runs on the worker loop's own thread.
            for task in asyncio.all_tasks(loop):
                task.cancel()

        try:
            loop.call_soon_threadsafe(_cancel_tasks)
        except RuntimeError:
            # The loop has already been closed by its worker thread.
            pass

    def health(self) -> Dict[str, Any]:
        """Health check for the gateway connection.

        Returns a dict with the keys 'connected', 'gateway_url' and
        'openclaw_installed' (whether an `openclaw` executable is on PATH).
        """
        return {
            'connected': self._connected,
            'gateway_url': self._url,
            'openclaw_installed': shutil.which('openclaw') is not None,
        }
# ── OpenClaw Process Management ───────────────────────────────────
def is_openclaw_installed() -> bool:
    """Check if OpenClaw is installed on the system.

    Returns True when an executable named `openclaw` can be found on the
    current PATH.
    """
    return shutil.which('openclaw') is not None
def get_openclaw_version() -> Optional[str]:
    """Get installed OpenClaw version.

    Runs `openclaw --version` with a 5 second timeout.

    Returns:
        The stripped stdout of the command, or None when OpenClaw is not
        installed, the command fails or times out, exits non-zero, or
        prints nothing.
    """
    if not is_openclaw_installed():
        return None

    try:
        result = subprocess.run(
            ['openclaw', '--version'],
            capture_output=True, text=True, timeout=5
        )
    except Exception as e:
        # Best-effort probe: never raise, but leave a trace for debugging.
        logger.debug("Could not query OpenClaw version: %s", e)
        return None

    if result.returncode != 0:
        return None
    # An empty version string carries no information; report it as None.
    return result.stdout.strip() or None
def start_openclaw_gateway(port: int = 18789, *,
                           capture_output: bool = True
                           ) -> Optional[subprocess.Popen]:
    """Start the OpenClaw gateway process.

    HART OS manages OpenClaw as a native app — we start/stop it like
    any other service (RustDesk, Sunshine, llama.cpp).

    Args:
        port: TCP port passed to `openclaw gateway --port`.
        capture_output: When True (the default, matching the historical
            behaviour) the child's stdout and stderr are connected to
            pipes exposed on the returned Popen object. Nothing in this
            function reads those pipes, so the caller must drain them or
            the gateway can block once a pipe buffer fills. Pass False to
            discard the output instead.

    Returns:
        The Popen handle of the started gateway, or None when OpenClaw is
        not installed or the process could not be started.
    """
    if not is_openclaw_installed():
        logger.warning("OpenClaw not installed, cannot start gateway")
        return None

    sink = subprocess.PIPE if capture_output else subprocess.DEVNULL
    try:
        proc = subprocess.Popen(
            ['openclaw', 'gateway', '--port', str(port)],
            stdout=sink,
            stderr=sink,
        )
    except Exception as e:
        logger.error("Failed to start OpenClaw gateway: %s", e)
        return None

    logger.info("Started OpenClaw gateway on port %d (PID %d)", port, proc.pid)
    return proc
# ── Singleton ──────────────────────────────────────────────────────
# Lazily created process-wide bridge; stays None until first requested.
_bridge: Optional[OpenClawGatewayBridge] = None


def get_gateway_bridge() -> OpenClawGatewayBridge:
    """Get the singleton gateway bridge.

    The bridge is created on first call with default settings and is
    returned unconnected; callers are responsible for `connect()`.

    NOTE(review): creation is not synchronised, so two threads making the
    very first call at the same time could each build an instance.
    """
    global _bridge
    if _bridge is None:
        _bridge = OpenClawGatewayBridge()
    return _bridge