Coverage for integrations / channels / extensions / openprose_adapter.py: 45.3%
161 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Open Prose Channel Adapter
4Implements Open Prose messaging integration.
5Based on HevolveBot extension patterns.
7Open Prose is an open-source prose/document collaboration platform.
9Features:
10- Document collaboration
11- Real-time editing
12- Comments and discussions
13- Docker-compatible
14"""
16from __future__ import annotations
18import asyncio
19import logging
20import os
21import json
22try:
23 import aiohttp
24 HAS_AIOHTTP = True
25except ImportError:
26 HAS_AIOHTTP = False
27from typing import Optional, List, Dict, Any, Callable
28from datetime import datetime
29from dataclasses import dataclass, field
30from urllib.parse import urljoin
32from ..base import (
33 ChannelAdapter,
34 ChannelConfig,
35 ChannelStatus,
36 Message,
37 MessageType,
38 SendResult,
39 ChannelConnectionError,
40 ChannelSendError,
41)
43logger = logging.getLogger(__name__)
46@dataclass
47class OpenProseConfig(ChannelConfig):
48 """Open Prose-specific configuration."""
49 server_url: str = ""
50 api_key: str = ""
51 workspace_id: str = ""
52 enable_comments: bool = True
53 enable_suggestions: bool = True
54 poll_interval: float = 2.0
55 reconnect_delay: float = 5.0
57 @classmethod
58 def from_env(cls) -> "OpenProseConfig":
59 """Create config from environment variables."""
60 return cls(
61 server_url=os.getenv("OPENPROSE_URL", ""),
62 api_key=os.getenv("OPENPROSE_API_KEY", ""),
63 workspace_id=os.getenv("OPENPROSE_WORKSPACE", ""),
64 )
67@dataclass
68class Document:
69 """Open Prose document."""
70 id: str
71 title: str
72 content: str
73 author_id: str
74 created_at: datetime
75 updated_at: datetime
76 collaborators: List[str] = field(default_factory=list)
79@dataclass
80class Comment:
81 """Document comment/discussion."""
82 id: str
83 document_id: str
84 author_id: str
85 author_name: str
86 content: str
87 thread_id: Optional[str] = None
88 created_at: datetime = field(default_factory=datetime.now)
91class OpenProseAdapter(ChannelAdapter):
92 """Open Prose channel adapter."""
94 channel_type = "openprose"
96 @property
97 def name(self) -> str:
98 """Get adapter name."""
99 return self.channel_type
101 def __init__(self, config: OpenProseConfig):
102 super().__init__(config)
103 self.config: OpenProseConfig = config
104 self._session: Optional[aiohttp.ClientSession] = None
105 self._connected = False
106 self._poll_task: Optional[asyncio.Task] = None
107 self._message_handlers: List[Callable] = []
108 self._last_check: Dict[str, datetime] = {}
110 @property
111 def base_url(self) -> str:
112 """Get base API URL."""
113 return urljoin(self.config.server_url, "/api/v1/")
115 def _get_headers(self) -> Dict[str, str]:
116 """Get headers for API requests."""
117 return {
118 "Authorization": f"Bearer {self.config.api_key}",
119 "Content-Type": "application/json",
120 }
122 async def connect(self) -> bool:
123 """Connect to Open Prose server."""
124 try:
125 self._session = aiohttp.ClientSession()
127 # Verify connection
128 async with self._session.get(
129 urljoin(self.base_url, "health"),
130 headers=self._get_headers()
131 ) as resp:
132 if resp.status != 200:
133 raise ChannelConnectionError("Failed to connect to Open Prose")
135 # Start polling for comments
136 self._poll_task = asyncio.create_task(self._poll_loop())
138 self._connected = True
139 self._status = ChannelStatus.CONNECTED
140 logger.info("Connected to Open Prose")
141 return True
143 except Exception as e:
144 logger.error(f"Failed to connect to Open Prose: {e}")
145 self._status = ChannelStatus.ERROR
146 raise ChannelConnectionError(str(e))
148 async def disconnect(self) -> None:
149 """Disconnect from Open Prose."""
150 self._connected = False
152 if self._poll_task:
153 self._poll_task.cancel()
154 try:
155 await self._poll_task
156 except asyncio.CancelledError:
157 pass
159 if self._session:
160 await self._session.close()
161 self._session = None
163 self._status = ChannelStatus.DISCONNECTED
164 logger.info("Disconnected from Open Prose")
166 async def _poll_loop(self) -> None:
167 """Poll for new comments/discussions."""
168 while self._connected:
169 try:
170 await self._check_comments()
171 await asyncio.sleep(self.config.poll_interval)
172 except asyncio.CancelledError:
173 break
174 except Exception as e:
175 logger.error(f"Poll error: {e}")
176 await asyncio.sleep(self.config.reconnect_delay)
178 async def _check_comments(self) -> None:
179 """Check for new comments."""
180 if not self.config.workspace_id:
181 return
183 url = urljoin(self.base_url, f"workspaces/{self.config.workspace_id}/comments")
185 try:
186 async with self._session.get(url, headers=self._get_headers()) as resp:
187 if resp.status != 200:
188 return
190 data = await resp.json()
192 for comment_data in data.get("comments", []):
193 comment_id = comment_data.get("id", "")
194 created_at = datetime.fromisoformat(
195 comment_data.get("created_at", datetime.now().isoformat())
196 )
198 # Check if new
199 doc_id = comment_data.get("document_id", "")
200 last_check = self._last_check.get(doc_id, datetime.min)
202 if created_at > last_check:
203 message = self._parse_comment(comment_data)
204 if message:
205 for handler in self._message_handlers:
206 asyncio.create_task(handler(message))
208 self._last_check[doc_id] = datetime.now()
210 except Exception as e:
211 logger.error(f"Error checking comments: {e}")
213 def _parse_comment(self, data: Dict[str, Any]) -> Optional[Message]:
214 """Parse comment to unified Message."""
215 try:
216 return Message(
217 id=data.get("id", ""),
218 channel=self.channel_type,
219 chat_id=data.get("document_id", ""),
220 sender_id=data.get("author_id", ""),
221 sender_name=data.get("author_name", ""),
222 text=data.get("content", ""),
223 timestamp=datetime.fromisoformat(
224 data.get("created_at", datetime.now().isoformat())
225 ),
226 message_type=MessageType.TEXT,
227 reply_to=data.get("thread_id"),
228 metadata={
229 "document_title": data.get("document_title", ""),
230 "selection": data.get("selection"),
231 }
232 )
233 except Exception as e:
234 logger.error(f"Error parsing comment: {e}")
235 return None
237 def on_message(self, handler: Callable) -> None:
238 """Register message handler."""
239 self._message_handlers.append(handler)
241 async def send_message(
242 self,
243 chat_id: str,
244 text: str,
245 reply_to: Optional[str] = None,
246 **kwargs
247 ) -> SendResult:
248 """Add a comment to a document."""
249 url = urljoin(self.base_url, f"documents/{chat_id}/comments")
251 payload = {
252 "content": text,
253 "thread_id": reply_to,
254 }
256 # Add selection if provided
257 if "selection" in kwargs:
258 payload["selection"] = kwargs["selection"]
260 try:
261 async with self._session.post(
262 url,
263 json=payload,
264 headers=self._get_headers()
265 ) as resp:
266 if resp.status not in (200, 201):
267 raise ChannelSendError("Failed to add comment")
269 data = await resp.json()
270 return SendResult(
271 success=True,
272 message_id=data.get("id", ""),
273 timestamp=datetime.now()
274 )
276 except Exception as e:
277 logger.error(f"Failed to send comment: {e}")
278 raise ChannelSendError(str(e))
280 async def edit_message(
281 self,
282 chat_id: str,
283 message_id: str,
284 text: str,
285 **kwargs
286 ) -> bool:
287 """Edit a comment."""
288 url = urljoin(self.base_url, f"comments/{message_id}")
290 try:
291 async with self._session.patch(
292 url,
293 json={"content": text},
294 headers=self._get_headers()
295 ) as resp:
296 return resp.status == 200
297 except Exception as e:
298 logger.error(f"Failed to edit comment: {e}")
299 return False
301 async def delete_message(self, chat_id: str, message_id: str, **kwargs) -> bool:
302 """Delete a comment."""
303 url = urljoin(self.base_url, f"comments/{message_id}")
305 try:
306 async with self._session.delete(url, headers=self._get_headers()) as resp:
307 return resp.status in (200, 204)
308 except Exception as e:
309 logger.error(f"Failed to delete comment: {e}")
310 return False
312 async def send_typing(self, chat_id: str, **kwargs) -> None:
313 """Show typing/composing indicator."""
314 # Could implement presence API if available
315 pass
317 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
318 """Get document information."""
319 url = urljoin(self.base_url, f"documents/{chat_id}")
321 try:
322 async with self._session.get(url, headers=self._get_headers()) as resp:
323 if resp.status != 200:
324 return None
326 data = await resp.json()
327 return {
328 "id": data.get("id", ""),
329 "title": data.get("title", ""),
330 "author": data.get("author_name", ""),
331 "collaborators": data.get("collaborators", []),
332 }
333 except Exception as e:
334 logger.error(f"Failed to get document info: {e}")
335 return None
337 async def add_suggestion(
338 self,
339 document_id: str,
340 selection: Dict[str, Any],
341 suggested_text: str,
342 reason: Optional[str] = None
343 ) -> Optional[str]:
344 """Add a text suggestion to a document."""
345 if not self.config.enable_suggestions:
346 return None
348 url = urljoin(self.base_url, f"documents/{document_id}/suggestions")
350 payload = {
351 "selection": selection,
352 "suggested_text": suggested_text,
353 "reason": reason,
354 }
356 try:
357 async with self._session.post(
358 url,
359 json=payload,
360 headers=self._get_headers()
361 ) as resp:
362 if resp.status in (200, 201):
363 data = await resp.json()
364 return data.get("id")
365 return None
366 except Exception as e:
367 logger.error(f"Failed to add suggestion: {e}")
368 return None
371def create_openprose_adapter(
372 server_url: Optional[str] = None,
373 api_key: Optional[str] = None,
374 workspace_id: Optional[str] = None,
375 **kwargs
376) -> OpenProseAdapter:
377 """Factory function to create an Open Prose adapter."""
378 config = OpenProseConfig(
379 server_url=server_url or os.getenv("OPENPROSE_URL", ""),
380 api_key=api_key or os.getenv("OPENPROSE_API_KEY", ""),
381 workspace_id=workspace_id or os.getenv("OPENPROSE_WORKSPACE", ""),
382 **kwargs
383 )
384 return OpenProseAdapter(config)