Coverage for integrations / openclaw / gateway_bridge.py: 45.5%

110 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Gateway Bridge — Connect HART OS to OpenClaw's WebSocket control plane. 

3 

4OpenClaw runs a Gateway at ws://127.0.0.1:18789. We connect to it 

5bidirectionally: 

6 

7 Inbound: HART agents can send messages through OpenClaw's channels 

8 (WhatsApp, Telegram, Discord, etc.) 

9 Outbound: OpenClaw agents can invoke HART recipes, tools, and agents 

10 

11This makes HART OS the superset — it can use all of OpenClaw's 20+ 

12channel adapters natively, plus its own 30+ channel integrations. 

13""" 

14 

15import asyncio 

16import json 

17import logging 

18import os 

19import shutil 

20import subprocess 

21import threading 

22from typing import Any, Callable, Dict, List, Optional 

23 

logger = logging.getLogger(__name__)

# Address the bridge dials when neither the constructor argument nor the
# OPENCLAW_GATEWAY_URL environment variable supplies one.
DEFAULT_GATEWAY_URL = 'ws://127.0.0.1:18789'


class OpenClawGatewayBridge:
    """Bidirectional bridge to the OpenClaw Gateway WebSocket.

    A daemon thread owns a private asyncio event loop that keeps one
    WebSocket connection open (reconnecting after failures). The public
    methods are synchronous and hand work to that loop.

    Lifecycle:
        bridge = OpenClawGatewayBridge()
        bridge.connect()                    # Connect to running gateway
        bridge.send_message(channel, msg)   # Send via OpenClaw channel
        bridge.on_message(handler)          # Receive from OpenClaw
        bridge.disconnect()
    """

    def __init__(self, gateway_url: Optional[str] = None):
        """Create an unconnected bridge.

        Args:
            gateway_url: WebSocket URL of the gateway. Falls back to the
                OPENCLAW_GATEWAY_URL environment variable, then to
                DEFAULT_GATEWAY_URL.
        """
        self._url = gateway_url or os.environ.get(
            'OPENCLAW_GATEWAY_URL', DEFAULT_GATEWAY_URL
        )
        self._ws = None
        self._connected = False
        self._handlers: List[Callable] = []
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread: Optional[threading.Thread] = None
        # The task running _ws_loop() on self._loop; cancelled by disconnect().
        self._task: Optional[asyncio.Future] = None

    @property
    def connected(self) -> bool:
        """True while a WebSocket connection to the gateway is open."""
        return self._connected

    @property
    def gateway_url(self) -> str:
        """The WebSocket URL this bridge connects to."""
        return self._url

    def on_message(self, handler: Callable[[Dict[str, Any]], None]):
        """Register a handler for inbound OpenClaw messages.

        Handlers are called on the background WebSocket thread with the
        decoded JSON message, in registration order.
        """
        self._handlers.append(handler)

    def connect(self) -> bool:
        """Connect to the OpenClaw Gateway.

        Returns True if connected, False if gateway is unavailable.
        Starts a background thread for the WebSocket event loop; the
        thread keeps retrying after this call returns False, so a later
        call (or a later check of ``connected``) may succeed.
        """
        if self._connected:
            return True

        try:
            import websockets  # noqa: F401
        except ImportError:
            logger.warning("websockets not installed, OpenClaw bridge unavailable")
            return False

        import time

        # Reuse a worker that is still retrying instead of stacking another
        # thread and event loop on every failed connect() call.
        worker = self._thread
        if worker is None or not worker.is_alive():
            loop = asyncio.new_event_loop()
            # The task is created before the thread starts so disconnect()
            # always has a handle to cancel, even if called immediately.
            task = loop.create_task(self._ws_loop())
            self._loop = loop
            self._task = task
            self._thread = threading.Thread(
                target=self._run_loop, args=(loop, task),
                daemon=True, name='openclaw-gateway'
            )
            self._thread.start()

        # Wait briefly (up to ~2s) for the connection to come up.
        for _ in range(20):
            if self._connected:
                return True
            time.sleep(0.1)

        return self._connected

    def _run_loop(self, loop=None, task=None):
        """Thread target: run the WebSocket task until it ends, then close the loop.

        Args:
            loop: Event loop to run; defaults to ``self._loop``.
            task: Task wrapping ``_ws_loop()``; created here when omitted.
        """
        if loop is None:
            loop = self._loop
        asyncio.set_event_loop(loop)
        if task is None:
            task = loop.create_task(self._ws_loop())
        try:
            try:
                loop.run_until_complete(task)
            except asyncio.CancelledError:
                # Normal shutdown path: disconnect() cancelled the task.
                logger.debug("OpenClaw gateway worker cancelled")
            except RuntimeError as e:
                # The loop was stopped from outside while the task was still
                # pending; cancel and drain it so nothing is left dangling.
                logger.debug("OpenClaw gateway loop stopped: %s", e)
                task.cancel()
                try:
                    loop.run_until_complete(task)
                except asyncio.CancelledError:
                    pass
        finally:
            loop.close()

    def _dispatch(self, raw):
        """Decode one raw gateway frame and pass it to every handler.

        Non-JSON frames are logged and dropped. A handler that raises is
        logged and skipped so it cannot break the connection or starve the
        handlers registered after it.
        """
        try:
            msg = json.loads(raw)
        except (json.JSONDecodeError, UnicodeDecodeError):
            logger.debug("Non-JSON from gateway: %s", raw[:100])
            return

        # Iterate over a snapshot: on_message() may be called from another
        # thread while a message is being delivered.
        for handler in tuple(self._handlers):
            try:
                handler(msg)
            except Exception:
                logger.exception("OpenClaw message handler %r failed", handler)

    async def _ws_loop(self):
        """WebSocket connection loop with auto-reconnect.

        Runs until cancelled. After any disconnect (error or clean close)
        the connection state is cleared and a new attempt is made 5s later.
        """
        try:
            import websockets
        except ImportError:
            return

        while True:
            ws = None
            try:
                async with websockets.connect(self._url) as ws:
                    self._ws = ws
                    self._connected = True
                    logger.info("Connected to OpenClaw gateway at %s", self._url)

                    # Announce HART OS as a tool provider
                    await ws.send(json.dumps({
                        'type': 'hart_announce',
                        'capabilities': [
                            'recipe_execution',
                            'agent_dispatch',
                            'model_bus',
                            'compute_mesh',
                            'vision',
                            'tts',
                        ],
                    }))

                    async for raw in ws:
                        self._dispatch(raw)

                # Reaching here means the server closed the socket cleanly.
                logger.debug("Gateway closed the connection, retrying in 5s")
            except asyncio.CancelledError:
                # disconnect() is shutting this worker down. Only clear state
                # that belongs to this connection, so a newer worker started
                # by a subsequent connect() is not clobbered.
                if ws is not None and self._ws is ws:
                    self._ws = None
                    self._connected = False
                raise
            except Exception as e:
                logger.debug("Gateway connection lost: %s, retrying in 5s", e)

            self._connected = False
            self._ws = None
            await asyncio.sleep(5)

    def _send_payload(self, payload: Dict[str, Any], timeout: float) -> None:
        """Serialize ``payload`` and send it on the worker loop, blocking until done.

        Raises:
            Exception: whatever serialization, scheduling, the send itself,
                or the timeout raises; callers translate this into their
                own failure value.
        """
        # Snapshot both references: the worker thread may clear them at any time.
        ws, loop = self._ws, self._loop
        if ws is None or loop is None:
            raise ConnectionError("OpenClaw gateway connection closed")

        future = asyncio.run_coroutine_threadsafe(
            ws.send(json.dumps(payload)), loop
        )
        try:
            future.result(timeout=timeout)
        except Exception:
            # Do not let a timed-out send go out later, after the caller has
            # already been told it failed.
            future.cancel()
            raise

    def send_message(self, channel: str, message: str,
                     recipient: Optional[str] = None) -> bool:
        """Send a message through OpenClaw's channel system.

        Args:
            channel: Channel name (whatsapp, telegram, discord, slack, etc.)
            message: Message text
            recipient: Optional recipient ID for DM channels

        Returns:
            True once the frame has been handed to the WebSocket, False if
            the bridge is not connected or the send failed / timed out (5s).
        """
        if not self._connected or not self._ws:
            logger.warning("Not connected to OpenClaw gateway")
            return False

        payload = {
            'type': 'send_message',
            'channel': channel,
            'message': message,
        }
        if recipient:
            payload['recipient'] = recipient

        try:
            self._send_payload(payload, timeout=5)
            return True
        except Exception as e:
            logger.error("Failed to send via OpenClaw: %s", e)
            return False

    def invoke_tool(self, tool_name: str, args: Dict[str, Any]) -> Optional[str]:
        """Invoke an OpenClaw tool through the gateway.

        This lets HART agents use OpenClaw's built-in and skill tools.

        Returns:
            The JSON string ``{"status": "dispatched"}`` once the request
            has been sent (the tool's result is not awaited here), or None
            if the bridge is not connected or the send failed / timed out
            (10s).
        """
        if not self._connected or not self._ws:
            return None

        payload = {
            'type': 'tool_invoke',
            'tool': tool_name,
            'args': args,
        }

        try:
            self._send_payload(payload, timeout=10)
            return '{"status": "dispatched"}'
        except Exception as e:
            logger.error("Tool invoke failed: %s", e)
            return None

    def disconnect(self):
        """Disconnect from the OpenClaw gateway and stop the worker thread.

        Safe to call when never connected. The worker is shut down by
        cancelling its task, which lets the WebSocket context manager close
        the socket and lets the thread close its event loop. ``connect()``
        may be called again afterwards.
        """
        loop, task = self._loop, self._task
        self._connected = False
        self._ws = None
        self._loop = None
        self._task = None
        self._thread = None

        if loop is None:
            return
        try:
            if task is not None:
                loop.call_soon_threadsafe(task.cancel)
            else:
                # No task handle recorded for this loop: fall back to
                # stopping the loop directly.
                loop.call_soon_threadsafe(loop.stop)
        except RuntimeError:
            # The loop is already closed, i.e. the worker has already exited.
            pass

    def health(self) -> Dict[str, Any]:
        """Health check for the gateway connection.

        Returns:
            Dict with ``connected``, ``gateway_url`` and
            ``openclaw_installed`` (whether an ``openclaw`` executable is
            on PATH).
        """
        return {
            'connected': self._connected,
            'gateway_url': self._url,
            'openclaw_installed': shutil.which('openclaw') is not None,
        }

214 

215 

216# ── OpenClaw Process Management ─────────────────────────────────── 

217 

def is_openclaw_installed() -> bool:
    """Report whether an ``openclaw`` executable can be found on PATH."""
    executable = shutil.which('openclaw')
    return executable is not None

221 

222 

def get_openclaw_version() -> Optional[str]:
    """Return the output of ``openclaw --version``, or None if unavailable.

    None is returned when OpenClaw is not installed, when the command
    exits with a non-zero status, or when running it fails for any reason
    (including exceeding the 5 second timeout).
    """
    if not is_openclaw_installed():
        return None

    command = ['openclaw', '--version']
    try:
        completed = subprocess.run(
            command, capture_output=True, text=True, timeout=5
        )
        if completed.returncode != 0:
            return None
        return completed.stdout.strip()
    except Exception:
        # Best-effort probe: any failure just means "version unknown".
        return None

235 

236 

def start_openclaw_gateway(port: int = 18789) -> Optional[subprocess.Popen]:
    """Launch ``openclaw gateway --port <port>`` as a child process.

    HART OS manages OpenClaw as a native app — we start/stop it like
    any other service (RustDesk, Sunshine, llama.cpp).

    Args:
        port: TCP port passed to the gateway's ``--port`` option.

    Returns:
        The running ``subprocess.Popen`` handle, or None if OpenClaw is
        not installed or the process could not be started.

    Note:
        stdout and stderr are attached as pipes and this function never
        reads them. NOTE(review): a long-running child can block once an
        unread pipe fills up, so the caller presumably needs to drain
        ``proc.stdout`` / ``proc.stderr`` — confirm against callers.
    """
    if is_openclaw_installed():
        try:
            command = ['openclaw', 'gateway', '--port', str(port)]
            gateway = subprocess.Popen(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            logger.info(
                "Started OpenClaw gateway on port %d (PID %d)", port, gateway.pid
            )
            return gateway
        except Exception as exc:
            logger.error("Failed to start OpenClaw gateway: %s", exc)
            return None

    logger.warning("OpenClaw not installed, cannot start gateway")
    return None

258 

259 

260# ── Singleton ────────────────────────────────────────────────────── 

261 

# Process-wide bridge instance, created on first request.
_bridge: Optional[OpenClawGatewayBridge] = None


def get_gateway_bridge() -> OpenClawGatewayBridge:
    """Return the shared gateway bridge, creating it on the first call.

    The bridge is built with default settings and is not connected here;
    callers invoke ``connect()`` themselves.
    """
    global _bridge
    if _bridge is not None:
        return _bridge
    _bridge = OpenClawGatewayBridge()
    return _bridge

270 return _bridge