Coverage for integrations / channels / extensions / tlon_adapter.py: 46.2%
145 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Tlon (Urbit) Channel Adapter
4Implements Tlon/Urbit messaging integration.
5Based on HevolveBot extension patterns.
7Features:
8- Urbit API integration
9- Graph store messaging
10- Groups support
11- DMs and channels
12- Docker-compatible
13"""
15from __future__ import annotations
17import asyncio
18import logging
19import os
20import json
21try:
22 import aiohttp
23 HAS_AIOHTTP = True
24except ImportError:
25 HAS_AIOHTTP = False
26from typing import Optional, List, Dict, Any, Callable
27from datetime import datetime
28from dataclasses import dataclass, field
30from ..base import (
31 ChannelAdapter,
32 ChannelConfig,
33 ChannelStatus,
34 Message,
35 MessageType,
36 SendResult,
37 ChannelConnectionError,
38 ChannelSendError,
39)
41logger = logging.getLogger(__name__)
@dataclass
class TlonConfig(ChannelConfig):
    """Configuration for the Tlon/Urbit adapter.

    Extends ``ChannelConfig`` with the ship connection details and the
    adapter's feature/reconnect settings. Every field has a default, so
    the config can be built with no arguments.
    """

    ship_url: str = ""  # e.g., http://localhost:8080
    ship_name: str = ""  # e.g., ~zod
    ship_code: str = ""  # Login code
    default_channel: str = ""
    enable_groups: bool = True
    enable_dms: bool = True
    reconnect_delay: float = 5.0
    max_reconnect_attempts: int = 10

    @classmethod
    def from_env(cls) -> "TlonConfig":
        """Build a config from the ``URBIT_*`` environment variables.

        Reads ``URBIT_URL`` (falling back to ``http://localhost:8080``),
        ``URBIT_SHIP``, ``URBIT_CODE`` and ``URBIT_CHANNEL`` (each falling
        back to an empty string). All other fields keep their defaults.
        """
        # field name -> (environment variable, fallback when unset)
        env_sources = {
            "ship_url": ("URBIT_URL", "http://localhost:8080"),
            "ship_name": ("URBIT_SHIP", ""),
            "ship_code": ("URBIT_CODE", ""),
            "default_channel": ("URBIT_CHANNEL", ""),
        }
        settings = {
            attr: os.environ.get(variable, fallback)
            for attr, (variable, fallback) in env_sources.items()
        }
        return cls(**settings)
@dataclass
class TlonGroup:
    """Record describing a Tlon group.

    ``name``, ``ship`` and ``title`` are required; ``description`` and
    ``member_count`` are optional and default to ``""`` and ``0``.
    """

    # Required fields (positional order is part of the interface).
    name: str
    ship: str
    title: str

    # Optional fields.
    description: str = ""
    member_count: int = 0
@dataclass
class TlonChannel:
    """Record describing a Tlon channel.

    ``name``, ``group`` and ``title`` are required; ``channel_type`` is
    optional and defaults to ``"chat"``.
    """

    # Required fields (positional order is part of the interface).
    name: str
    group: str
    title: str

    # One of: chat, notebook, collection.
    channel_type: str = "chat"
class TlonAdapter(ChannelAdapter):
    """Tlon/Urbit channel adapter.

    Logs in to the ship with the configured code, opens an event channel,
    listens to that channel's server-sent events in a background task, and
    passes parsed chat messages to the handlers registered via
    ``on_message``. Outgoing messages are sent as ``graph-push-hook`` pokes.
    """

    channel_type = "tlon"

    @property
    def name(self) -> str:
        """Get adapter name."""
        return self.channel_type

    def __init__(self, config: TlonConfig):
        """Store the config and initialise the (not yet connected) state."""
        super().__init__(config)
        self.config: TlonConfig = config
        self._session: Optional[aiohttp.ClientSession] = None
        # Whatever ``resp.cookies.get(...)`` returned at login (or None).
        self._cookie: Optional[Any] = None
        self._connected = False
        self._sse_task: Optional[asyncio.Task] = None
        self._message_handlers: List[Callable] = []
        # Strong references to in-flight handler tasks; the event loop only
        # keeps weak references, so unreferenced tasks may be collected.
        self._handler_tasks: set = set()
        self._event_id = 0
        self._channel_id = f"hevolvebot-{datetime.now().timestamp()}"

    @property
    def _channel_url(self) -> str:
        """URL of this adapter's event channel on the ship."""
        return f"{self.config.ship_url}/~/channel/{self._channel_id}"

    def _next_event_id(self) -> int:
        """Increment the event counter and return the new value."""
        self._event_id += 1
        return self._event_id

    async def _close_session(self) -> None:
        """Close and forget the HTTP session, if there is one."""
        session, self._session = self._session, None
        if session is None:
            return
        try:
            await session.close()
        except Exception as e:
            # Best effort: a failing close must not mask the caller's error.
            logger.debug(f"Error closing Urbit session: {e}")

    async def connect(self) -> bool:
        """Connect to Urbit ship.

        Logs in, opens the event channel and starts the SSE listener task.

        Returns:
            True once the adapter is connected.

        Raises:
            ChannelConnectionError: if aiohttp is not installed, the login
                or channel open fails, or any other error occurs. The HTTP
                session is closed before the error is raised.
        """
        if not HAS_AIOHTTP:
            self._status = ChannelStatus.ERROR
            raise ChannelConnectionError(
                "aiohttp is required for the Tlon adapter but is not installed"
            )

        try:
            # unsafe=True lets the jar keep cookies from IP-address hosts;
            # the default jar drops them, which would lose the login cookie
            # when ship_url is e.g. http://127.0.0.1:8080.
            self._session = aiohttp.ClientSession(
                cookie_jar=aiohttp.CookieJar(unsafe=True)
            )

            # Login to ship
            login_url = f"{self.config.ship_url}/~/login"
            async with self._session.post(
                login_url,
                data={"password": self.config.ship_code},
            ) as resp:
                if resp.status != 204:
                    raise ChannelConnectionError("Failed to login to Urbit ship")

                # Get session cookie
                self._cookie = resp.cookies.get(
                    "urbauth-~" + self.config.ship_name.lstrip("~")
                )

            # Open event channel
            await self._open_channel()

            # Mark connected before the listener starts: its loop condition
            # is ``self._connected``.
            self._connected = True
            self._sse_task = asyncio.create_task(self._sse_loop())

            self._status = ChannelStatus.CONNECTED
            logger.info(f"Connected to Urbit ship {self.config.ship_name}")
            return True

        except Exception as e:
            logger.error(f"Failed to connect to Urbit: {e}")
            self._connected = False
            self._status = ChannelStatus.ERROR
            # Do not leak the session when the connection attempt fails.
            await self._close_session()
            raise ChannelConnectionError(str(e)) from e

    async def disconnect(self) -> None:
        """Disconnect from Urbit ship.

        Stops the SSE listener, asks the ship to delete the event channel
        (best effort) and closes the HTTP session.
        """
        self._connected = False

        if self._sse_task:
            self._sse_task.cancel()
            try:
                await self._sse_task
            except asyncio.CancelledError:
                pass
            self._sse_task = None

        # Close channel
        if self._session:
            try:
                # ``async with`` releases the response back to the pool.
                async with self._session.delete(self._channel_url):
                    pass
            except Exception as e:
                # Best effort: the session is closed regardless.
                logger.debug(f"Failed to delete Urbit channel: {e}")
            await self._close_session()

        self._status = ChannelStatus.DISCONNECTED
        logger.info("Disconnected from Urbit")

    async def _open_channel(self) -> None:
        """Open Urbit event channel.

        Sends a ``helm-hi`` poke to the channel URL.

        Raises:
            ChannelConnectionError: if the ship does not answer with 204.
        """
        poke = {
            "id": self._next_event_id(),
            "action": "poke",
            "ship": self.config.ship_name.lstrip("~"),
            "app": "hood",
            "mark": "helm-hi",
            "json": "HevolveBot connected",
        }

        async with self._session.put(self._channel_url, json=[poke]) as resp:
            if resp.status != 204:
                raise ChannelConnectionError("Failed to open channel")

    async def _sse_loop(self) -> None:
        """Listen for Server-Sent Events from Urbit.

        Reads ``data:`` lines from the channel stream and passes each decoded
        JSON payload to ``_handle_event``. When the stream ends or fails, it
        waits ``config.reconnect_delay`` seconds and reconnects. After more
        than ``config.max_reconnect_attempts`` consecutive attempts without
        a successfully decoded event, it gives up and marks the adapter as
        errored. Cancellation ends the loop quietly.
        """
        failed_attempts = 0
        try:
            while self._connected:
                try:
                    async with self._session.get(self._channel_url) as resp:
                        async for raw_line in resp.content:
                            if not self._connected:
                                break

                            line = raw_line.decode().strip()
                            if not line.startswith("data:"):
                                continue

                            try:
                                data = json.loads(line[5:])
                            except json.JSONDecodeError as e:
                                # One bad payload should not drop the stream.
                                logger.warning(f"Skipping malformed SSE payload: {e}")
                                continue

                            failed_attempts = 0
                            await self._handle_event(data)
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    logger.error(f"SSE error: {e}")

                if not self._connected:
                    break

                failed_attempts += 1
                if failed_attempts > self.config.max_reconnect_attempts:
                    logger.error(
                        "SSE reconnect attempts exhausted "
                        f"({self.config.max_reconnect_attempts}); giving up"
                    )
                    self._connected = False
                    self._status = ChannelStatus.ERROR
                    break

                # Also applies after a clean end of stream, so a ship that
                # keeps closing the connection cannot cause a busy loop.
                await asyncio.sleep(self.config.reconnect_delay)
        except asyncio.CancelledError:
            # Cancelled by disconnect(); finish without raising.
            pass

    def _dispatch(self, message: Message) -> None:
        """Pass ``message`` to every registered handler.

        Coroutine results are scheduled as tasks (with a reference kept until
        they finish); plain callables have already run by the time they
        return. A failing handler is logged and does not affect the others.
        """
        for handler in self._message_handlers:
            try:
                result = handler(message)
                if asyncio.iscoroutine(result):
                    task = asyncio.create_task(result)
                    self._handler_tasks.add(task)
                    task.add_done_callback(self._on_handler_done)
            except Exception as e:
                logger.error(f"Message handler failed: {e}")

    def _on_handler_done(self, task: asyncio.Task) -> None:
        """Drop a finished handler task and log the exception it raised, if any."""
        self._handler_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error(f"Message handler failed: {exc}")

    async def _handle_event(self, data: Dict[str, Any]) -> None:
        """Handle incoming Urbit event.

        For ``diff`` responses whose JSON payload contains a ``message``
        key, the message is parsed and dispatched to the handlers. Every
        event is then acknowledged on the channel. Errors are logged, not
        raised.
        """
        try:
            if data.get("response", "") == "diff":
                # Chat message
                json_data = data.get("json", {})
                if isinstance(json_data, dict) and "message" in json_data:
                    message = self._parse_message(json_data)
                    if message:
                        self._dispatch(message)

            # Acknowledge event
            ack = {
                "id": self._next_event_id(),
                "action": "ack",
                "event-id": data.get("id", 0),
            }
            # ``async with`` releases the response back to the pool.
            async with self._session.put(self._channel_url, json=[ack]):
                pass

        except Exception as e:
            logger.error(f"Error handling event: {e}")

    def _parse_message(self, data: Dict[str, Any]) -> Optional[Message]:
        """Parse Urbit message to unified Message.

        Joins the ``text`` entries of ``data["message"]["contents"]`` with
        spaces.

        Returns:
            The parsed ``Message``, or None when there is no text or parsing
            fails (the failure is logged).
        """
        try:
            msg = data.get("message", {})
            author = msg.get("author", "")
            contents = msg.get("contents", []) or []

            # Extract text from contents
            text = " ".join(
                item["text"]
                for item in contents
                if isinstance(item, dict) and "text" in item
            )

            if not text:
                return None

            return Message(
                id=str(msg.get("time-sent", datetime.now().timestamp())),
                channel=self.channel_type,
                chat_id=data.get("resource", ""),
                sender_id=author,
                sender_name=author,
                text=text,
                timestamp=datetime.now(),
                message_type=MessageType.TEXT,
            )
        except Exception as e:
            logger.error(f"Error parsing message: {e}")
            return None

    def on_message(self, handler: Callable) -> None:
        """Register message handler."""
        self._message_handlers.append(handler)

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        **kwargs
    ) -> SendResult:
        """Send a message to Urbit channel.

        Args:
            chat_id: Used as the ``name`` of the target graph resource.
            text: Message text.
            reply_to: Accepted for interface compatibility; not used.
            **kwargs: Accepted for interface compatibility; not used.

        Returns:
            A successful ``SendResult`` whose ``message_id`` is the event id
            of the poke.

        Raises:
            ChannelSendError: if the adapter has no session, the ship does
                not answer with 204, or any other error occurs.
        """
        try:
            if self._session is None:
                raise ChannelSendError("Not connected to Urbit ship")

            # Capture the id now: acks handled while awaiting the PUT also
            # advance ``self._event_id``.
            event_id = self._next_event_id()

            # One timestamp for the node key, the post index and time-sent,
            # so the three always agree.
            now_ms = int(datetime.now().timestamp() * 1000)
            index = f"/{now_ms}"

            poke = {
                "id": event_id,
                "action": "poke",
                "ship": self.config.ship_name.lstrip("~"),
                "app": "graph-push-hook",
                "mark": "graph-update-3",
                "json": {
                    "add-nodes": {
                        "resource": {
                            "ship": self.config.ship_name,
                            "name": chat_id,
                        },
                        "nodes": {
                            index: {
                                "post": {
                                    "author": self.config.ship_name,
                                    "index": index,
                                    "time-sent": now_ms,
                                    "contents": [{"text": text}],
                                    "hash": None,
                                    "signatures": [],
                                },
                                "children": None,
                            }
                        },
                    }
                },
            }

            async with self._session.put(self._channel_url, json=[poke]) as resp:
                if resp.status != 204:
                    raise ChannelSendError("Failed to send message")

            return SendResult(
                success=True,
                message_id=str(event_id),
                timestamp=datetime.now(),
            )

        except Exception as e:
            logger.error(f"Failed to send message: {e}")
            raise ChannelSendError(str(e)) from e

    async def edit_message(self, chat_id: str, message_id: str, text: str, **kwargs) -> bool:
        """Urbit doesn't support message editing."""
        return False

    async def delete_message(self, chat_id: str, message_id: str, **kwargs) -> bool:
        """Delete is not directly supported in graph-store."""
        return False

    async def send_typing(self, chat_id: str, **kwargs) -> None:
        """Urbit doesn't have typing indicators."""
        pass

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Get channel information.

        Returns a static description built from ``chat_id`` and the
        configured ship name; no request is made to the ship.
        """
        return {
            "id": chat_id,
            "type": "channel",
            "ship": self.config.ship_name,
        }
def create_tlon_adapter(
    ship_url: Optional[str] = None,
    ship_name: Optional[str] = None,
    ship_code: Optional[str] = None,
    **kwargs
) -> TlonAdapter:
    """Factory function to create a Tlon adapter.

    Args:
        ship_url: Ship URL; when falsy, ``URBIT_URL`` is used (falling back
            to ``http://localhost:8080``).
        ship_name: Ship name; when falsy, ``URBIT_SHIP`` is used.
        ship_code: Login code; when falsy, ``URBIT_CODE`` is used.
        **kwargs: Extra ``TlonConfig`` fields. ``default_channel`` defaults
            to ``URBIT_CHANNEL`` when not given, matching
            ``TlonConfig.from_env``.

    Returns:
        A ``TlonAdapter`` built from the resulting config.
    """
    # Keep the factory consistent with TlonConfig.from_env, which also reads
    # URBIT_CHANNEL; an explicit default_channel from the caller still wins.
    kwargs.setdefault("default_channel", os.getenv("URBIT_CHANNEL", ""))

    config = TlonConfig(
        ship_url=ship_url or os.getenv("URBIT_URL", "http://localhost:8080"),
        ship_name=ship_name or os.getenv("URBIT_SHIP", ""),
        ship_code=ship_code or os.getenv("URBIT_CODE", ""),
        **kwargs
    )
    return TlonAdapter(config)