Coverage for integrations / channels / extensions / tlon_adapter.py: 46.2%

145 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Tlon (Urbit) Channel Adapter 

3 

4Implements Tlon/Urbit messaging integration. 

5Based on HevolveBot extension patterns. 

6 

7Features: 

8- Urbit API integration 

9- Graph store messaging 

10- Groups support 

11- DMs and channels 

12- Docker-compatible 

13""" 

14 

15from __future__ import annotations 

16 

17import asyncio 

18import logging 

19import os 

20import json 

21try: 

22 import aiohttp 

23 HAS_AIOHTTP = True 

24except ImportError: 

25 HAS_AIOHTTP = False 

26from typing import Optional, List, Dict, Any, Callable 

27from datetime import datetime 

28from dataclasses import dataclass, field 

29 

30from ..base import ( 

31 ChannelAdapter, 

32 ChannelConfig, 

33 ChannelStatus, 

34 Message, 

35 MessageType, 

36 SendResult, 

37 ChannelConnectionError, 

38 ChannelSendError, 

39) 

40 

41logger = logging.getLogger(__name__) 

42 

43 

44@dataclass 

45class TlonConfig(ChannelConfig): 

46 """Tlon/Urbit-specific configuration.""" 

47 ship_url: str = "" # e.g., http://localhost:8080 

48 ship_name: str = "" # e.g., ~zod 

49 ship_code: str = "" # Login code 

50 default_channel: str = "" 

51 enable_groups: bool = True 

52 enable_dms: bool = True 

53 reconnect_delay: float = 5.0 

54 max_reconnect_attempts: int = 10 

55 

56 @classmethod 

57 def from_env(cls) -> "TlonConfig": 

58 """Create config from environment variables.""" 

59 return cls( 

60 ship_url=os.getenv("URBIT_URL", "http://localhost:8080"), 

61 ship_name=os.getenv("URBIT_SHIP", ""), 

62 ship_code=os.getenv("URBIT_CODE", ""), 

63 default_channel=os.getenv("URBIT_CHANNEL", ""), 

64 ) 

65 

66 

67@dataclass 

68class TlonGroup: 

69 """Tlon group information.""" 

70 name: str 

71 ship: str 

72 title: str 

73 description: str = "" 

74 member_count: int = 0 

75 

76 

77@dataclass 

78class TlonChannel: 

79 """Tlon channel information.""" 

80 name: str 

81 group: str 

82 title: str 

83 channel_type: str = "chat" # chat, notebook, collection 

84 

85 

86class TlonAdapter(ChannelAdapter): 

87 """Tlon/Urbit channel adapter.""" 

88 

89 channel_type = "tlon" 

90 

91 @property 

92 def name(self) -> str: 

93 """Get adapter name.""" 

94 return self.channel_type 

95 

96 def __init__(self, config: TlonConfig): 

97 super().__init__(config) 

98 self.config: TlonConfig = config 

99 self._session: Optional[aiohttp.ClientSession] = None 

100 self._cookie: Optional[str] = None 

101 self._connected = False 

102 self._sse_task: Optional[asyncio.Task] = None 

103 self._message_handlers: List[Callable] = [] 

104 self._event_id = 0 

105 self._channel_id = f"hevolvebot-{datetime.now().timestamp()}" 

106 

107 async def connect(self) -> bool: 

108 """Connect to Urbit ship.""" 

109 try: 

110 self._session = aiohttp.ClientSession() 

111 

112 # Login to ship 

113 login_url = f"{self.config.ship_url}/~/login" 

114 async with self._session.post( 

115 login_url, 

116 data={"password": self.config.ship_code} 

117 ) as resp: 

118 if resp.status != 204: 

119 raise ChannelConnectionError("Failed to login to Urbit ship") 

120 

121 # Get session cookie 

122 self._cookie = resp.cookies.get("urbauth-~" + self.config.ship_name.lstrip("~")) 

123 

124 # Open event channel 

125 await self._open_channel() 

126 

127 # Start SSE listener 

128 self._sse_task = asyncio.create_task(self._sse_loop()) 

129 

130 self._connected = True 

131 self._status = ChannelStatus.CONNECTED 

132 logger.info(f"Connected to Urbit ship {self.config.ship_name}") 

133 return True 

134 

135 except Exception as e: 

136 logger.error(f"Failed to connect to Urbit: {e}") 

137 self._status = ChannelStatus.ERROR 

138 raise ChannelConnectionError(str(e)) 

139 

140 async def disconnect(self) -> None: 

141 """Disconnect from Urbit ship.""" 

142 self._connected = False 

143 

144 if self._sse_task: 

145 self._sse_task.cancel() 

146 try: 

147 await self._sse_task 

148 except asyncio.CancelledError: 

149 pass 

150 

151 # Close channel 

152 if self._session: 

153 try: 

154 await self._session.delete( 

155 f"{self.config.ship_url}/~/channel/{self._channel_id}" 

156 ) 

157 except Exception: 

158 pass 

159 await self._session.close() 

160 self._session = None 

161 

162 self._status = ChannelStatus.DISCONNECTED 

163 logger.info("Disconnected from Urbit") 

164 

165 async def _open_channel(self) -> None: 

166 """Open Urbit event channel.""" 

167 url = f"{self.config.ship_url}/~/channel/{self._channel_id}" 

168 

169 self._event_id += 1 

170 poke = { 

171 "id": self._event_id, 

172 "action": "poke", 

173 "ship": self.config.ship_name.lstrip("~"), 

174 "app": "hood", 

175 "mark": "helm-hi", 

176 "json": "HevolveBot connected" 

177 } 

178 

179 async with self._session.put(url, json=[poke]) as resp: 

180 if resp.status != 204: 

181 raise ChannelConnectionError("Failed to open channel") 

182 

183 async def _sse_loop(self) -> None: 

184 """Listen for Server-Sent Events from Urbit.""" 

185 url = f"{self.config.ship_url}/~/channel/{self._channel_id}" 

186 

187 while self._connected: 

188 try: 

189 async with self._session.get(url) as resp: 

190 async for line in resp.content: 

191 if not self._connected: 

192 break 

193 

194 line = line.decode().strip() 

195 if line.startswith("data:"): 

196 data = json.loads(line[5:]) 

197 await self._handle_event(data) 

198 

199 except asyncio.CancelledError: 

200 break 

201 except Exception as e: 

202 logger.error(f"SSE error: {e}") 

203 if self._connected: 

204 await asyncio.sleep(self.config.reconnect_delay) 

205 

206 async def _handle_event(self, data: Dict[str, Any]) -> None: 

207 """Handle incoming Urbit event.""" 

208 try: 

209 response = data.get("response", "") 

210 

211 if response == "diff": 

212 # Chat message 

213 json_data = data.get("json", {}) 

214 if isinstance(json_data, dict) and "message" in json_data: 

215 message = self._parse_message(json_data) 

216 if message: 

217 for handler in self._message_handlers: 

218 asyncio.create_task(handler(message)) 

219 

220 # Acknowledge event 

221 self._event_id += 1 

222 ack = {"id": self._event_id, "action": "ack", "event-id": data.get("id", 0)} 

223 await self._session.put( 

224 f"{self.config.ship_url}/~/channel/{self._channel_id}", 

225 json=[ack] 

226 ) 

227 

228 except Exception as e: 

229 logger.error(f"Error handling event: {e}") 

230 

231 def _parse_message(self, data: Dict[str, Any]) -> Optional[Message]: 

232 """Parse Urbit message to unified Message.""" 

233 try: 

234 msg = data.get("message", {}) 

235 author = msg.get("author", "") 

236 content = msg.get("contents", []) 

237 

238 # Extract text from contents 

239 text_parts = [] 

240 for item in content: 

241 if isinstance(item, dict) and "text" in item: 

242 text_parts.append(item["text"]) 

243 

244 text = " ".join(text_parts) 

245 

246 if not text: 

247 return None 

248 

249 return Message( 

250 id=str(msg.get("time-sent", datetime.now().timestamp())), 

251 channel=self.channel_type, 

252 chat_id=data.get("resource", ""), 

253 sender_id=author, 

254 sender_name=author, 

255 text=text, 

256 timestamp=datetime.now(), 

257 message_type=MessageType.TEXT, 

258 ) 

259 except Exception as e: 

260 logger.error(f"Error parsing message: {e}") 

261 return None 

262 

263 def on_message(self, handler: Callable) -> None: 

264 """Register message handler.""" 

265 self._message_handlers.append(handler) 

266 

267 async def send_message( 

268 self, 

269 chat_id: str, 

270 text: str, 

271 reply_to: Optional[str] = None, 

272 **kwargs 

273 ) -> SendResult: 

274 """Send a message to Urbit channel.""" 

275 try: 

276 self._event_id += 1 

277 

278 # Parse chat_id as resource path 

279 # Format: /ship/~ship-name/group-name/channel-name 

280 

281 poke = { 

282 "id": self._event_id, 

283 "action": "poke", 

284 "ship": self.config.ship_name.lstrip("~"), 

285 "app": "graph-push-hook", 

286 "mark": "graph-update-3", 

287 "json": { 

288 "add-nodes": { 

289 "resource": { 

290 "ship": self.config.ship_name, 

291 "name": chat_id 

292 }, 

293 "nodes": { 

294 f"/{int(datetime.now().timestamp() * 1000)}": { 

295 "post": { 

296 "author": self.config.ship_name, 

297 "index": f"/{int(datetime.now().timestamp() * 1000)}", 

298 "time-sent": int(datetime.now().timestamp() * 1000), 

299 "contents": [{"text": text}], 

300 "hash": None, 

301 "signatures": [] 

302 }, 

303 "children": None 

304 } 

305 } 

306 } 

307 } 

308 } 

309 

310 async with self._session.put( 

311 f"{self.config.ship_url}/~/channel/{self._channel_id}", 

312 json=[poke] 

313 ) as resp: 

314 if resp.status != 204: 

315 raise ChannelSendError("Failed to send message") 

316 

317 return SendResult( 

318 success=True, 

319 message_id=str(self._event_id), 

320 timestamp=datetime.now() 

321 ) 

322 

323 except Exception as e: 

324 logger.error(f"Failed to send message: {e}") 

325 raise ChannelSendError(str(e)) 

326 

327 async def edit_message(self, chat_id: str, message_id: str, text: str, **kwargs) -> bool: 

328 """Urbit doesn't support message editing.""" 

329 return False 

330 

331 async def delete_message(self, chat_id: str, message_id: str, **kwargs) -> bool: 

332 """Delete is not directly supported in graph-store.""" 

333 return False 

334 

335 async def send_typing(self, chat_id: str, **kwargs) -> None: 

336 """Urbit doesn't have typing indicators.""" 

337 pass 

338 

339 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]: 

340 """Get channel information.""" 

341 return { 

342 "id": chat_id, 

343 "type": "channel", 

344 "ship": self.config.ship_name, 

345 } 

346 

347 

348def create_tlon_adapter( 

349 ship_url: Optional[str] = None, 

350 ship_name: Optional[str] = None, 

351 ship_code: Optional[str] = None, 

352 **kwargs 

353) -> TlonAdapter: 

354 """Factory function to create a Tlon adapter.""" 

355 config = TlonConfig( 

356 ship_url=ship_url or os.getenv("URBIT_URL", "http://localhost:8080"), 

357 ship_name=ship_name or os.getenv("URBIT_SHIP", ""), 

358 ship_code=ship_code or os.getenv("URBIT_CODE", ""), 

359 **kwargs 

360 ) 

361 return TlonAdapter(config)