Coverage for integrations / channels / extensions / openprose_adapter.py: 45.3%

161 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Open Prose Channel Adapter 

3 

4Implements Open Prose messaging integration. 

5Based on HevolveBot extension patterns. 

6 

7Open Prose is an open-source prose/document collaboration platform. 

8 

9Features: 

10- Document collaboration 

11- Real-time editing 

12- Comments and discussions 

13- Docker-compatible 

14""" 

15 

16from __future__ import annotations 

17 

18import asyncio 

19import logging 

20import os 

21import json 

22try: 

23 import aiohttp 

24 HAS_AIOHTTP = True 

25except ImportError: 

26 HAS_AIOHTTP = False 

27from typing import Optional, List, Dict, Any, Callable 

28from datetime import datetime 

29from dataclasses import dataclass, field 

30from urllib.parse import urljoin 

31 

32from ..base import ( 

33 ChannelAdapter, 

34 ChannelConfig, 

35 ChannelStatus, 

36 Message, 

37 MessageType, 

38 SendResult, 

39 ChannelConnectionError, 

40 ChannelSendError, 

41) 

42 

43logger = logging.getLogger(__name__) 

44 

45 

@dataclass
class OpenProseConfig(ChannelConfig):
    """Settings for the Open Prose channel, layered on top of ``ChannelConfig``."""

    # Root URL of the Open Prose server; the adapter derives its API base from it.
    server_url: str = ""
    # Sent by the adapter as a Bearer token in the Authorization header.
    api_key: str = ""
    # Workspace whose comments are polled; polling is skipped while this is empty.
    workspace_id: str = ""
    # NOTE(review): not read by the adapter code in this module - confirm where it is used.
    enable_comments: bool = True
    # When False, the adapter's add_suggestion() returns None without calling the server.
    enable_suggestions: bool = True
    # Seconds slept between successful polls.
    poll_interval: float = 2.0
    # Seconds slept after a poll iteration raised.
    reconnect_delay: float = 5.0

    @classmethod
    def from_env(cls) -> "OpenProseConfig":
        """Build a config from ``OPENPROSE_*`` environment variables.

        Each unset variable falls back to an empty string; every other field
        keeps its declared default.
        """
        env_by_field = {
            "server_url": "OPENPROSE_URL",
            "api_key": "OPENPROSE_API_KEY",
            "workspace_id": "OPENPROSE_WORKSPACE",
        }
        values = {
            field_name: os.getenv(env_name, "")
            for field_name, env_name in env_by_field.items()
        }
        return cls(**values)

65 

66 

@dataclass
class Document:
    """Plain record describing one Open Prose document."""

    # Required fields, in positional-constructor order.
    id: str
    title: str
    content: str
    author_id: str
    created_at: datetime
    updated_at: datetime
    # Optional; each instance gets its own fresh empty list.
    collaborators: List[str] = field(default_factory=list)

77 

78 

@dataclass
class Comment:
    """Plain record describing one comment (or discussion entry) on a document."""

    # Required fields, in positional-constructor order.
    id: str
    document_id: str
    author_id: str
    author_name: str
    content: str
    # Optional thread reference; None when not supplied.
    thread_id: Optional[str] = None
    # Defaults to datetime.now() evaluated when the instance is created.
    created_at: datetime = field(default_factory=datetime.now)

89 

90 

class OpenProseAdapter(ChannelAdapter):
    """Channel adapter that exchanges messages with Open Prose as document comments.

    Incoming comments are discovered by polling the workspace comments
    endpoint; outgoing messages are posted as comments on a document. In this
    adapter a ``chat_id`` is used as a document id and a ``message_id`` as a
    comment id.
    """

    channel_type = "openprose"

    @property
    def name(self) -> str:
        """Return the adapter name (identical to ``channel_type``)."""
        return self.channel_type

    def __init__(self, config: OpenProseConfig):
        """Store the configuration and initialise disconnected state."""
        super().__init__(config)
        self.config: OpenProseConfig = config
        self._session: Optional[aiohttp.ClientSession] = None
        self._connected = False
        self._poll_task: Optional[asyncio.Task] = None
        self._message_handlers: List[Callable] = []
        # Per-document watermark: the newest comment timestamp already seen.
        self._last_check: Dict[str, datetime] = {}
        # Strong references to in-flight handler tasks; the event loop only
        # keeps weak references, so unreferenced tasks may be collected early.
        self._handler_tasks: set = set()

    @property
    def base_url(self) -> str:
        """Return the API root derived from ``config.server_url``.

        NOTE(review): joining with the absolute path ``/api/v1/`` replaces any
        path component present in ``server_url`` - confirm that is intended
        for servers hosted under a sub-path.
        """
        return urljoin(self.config.server_url, "/api/v1/")

    def _get_headers(self) -> Dict[str, str]:
        """Return the headers sent with every API request."""
        return {
            "Authorization": f"Bearer {self.config.api_key}",
            "Content-Type": "application/json",
        }

    @staticmethod
    def _parse_timestamp(raw: Any) -> datetime:
        """Parse an ISO-8601 timestamp, using the current time when absent.

        A trailing ``Z`` is rewritten to ``+00:00`` because
        ``datetime.fromisoformat`` only accepts it from Python 3.11 on.

        Raises:
            ValueError, TypeError: the value is present but not parseable.
        """
        if not raw:
            return datetime.now()
        if isinstance(raw, str) and raw[-1] in "Zz":
            raw = raw[:-1] + "+00:00"
        return datetime.fromisoformat(raw)

    @staticmethod
    def _is_after(candidate: datetime, reference: Optional[datetime]) -> bool:
        """Return True when ``candidate`` is newer than ``reference``.

        A missing reference counts as "older than everything". Naive and
        offset-aware datetimes cannot be ordered; in that case the candidate
        is treated as newer so that a comment is never silently dropped.
        """
        if reference is None:
            return True
        try:
            return candidate > reference
        except TypeError:
            return True

    async def connect(self) -> bool:
        """Open an HTTP session, verify the server and start polling.

        Returns:
            True once the health check succeeded and polling was scheduled.

        Raises:
            ChannelConnectionError: on any failure; the session created for
                the attempt is closed before raising.
        """
        try:
            self._session = aiohttp.ClientSession()

            # Verify connection
            async with self._session.get(
                urljoin(self.base_url, "health"),
                headers=self._get_headers(),
            ) as resp:
                if resp.status != 200:
                    raise ChannelConnectionError("Failed to connect to Open Prose")

            # The flag is set before the task is scheduled because
            # _poll_loop() exits as soon as it observes it being False.
            self._connected = True
            self._poll_task = asyncio.create_task(self._poll_loop())

            self._status = ChannelStatus.CONNECTED
            logger.info("Connected to Open Prose")
            return True

        except Exception as e:
            logger.error(f"Failed to connect to Open Prose: {e}")
            self._connected = False
            self._status = ChannelStatus.ERROR

            # Do not leak the session of a failed attempt; a failure while
            # closing must not mask the original error.
            if self._session is not None:
                try:
                    await self._session.close()
                except Exception as close_error:
                    logger.error(f"Error closing Open Prose session: {close_error}")
                self._session = None

            raise ChannelConnectionError(str(e)) from e

    async def disconnect(self) -> None:
        """Stop polling, close the HTTP session and mark the channel disconnected.

        Handler tasks that are still running are left to finish.
        """
        self._connected = False

        if self._poll_task:
            self._poll_task.cancel()
            try:
                await self._poll_task
            except asyncio.CancelledError:
                pass

        if self._session:
            await self._session.close()
            self._session = None

        self._status = ChannelStatus.DISCONNECTED
        logger.info("Disconnected from Open Prose")

    async def _poll_loop(self) -> None:
        """Poll for new comments until disconnected or cancelled."""
        while self._connected:
            try:
                await self._check_comments()
                await asyncio.sleep(self.config.poll_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Poll error: {e}")
                await asyncio.sleep(self.config.reconnect_delay)

    async def _check_comments(self) -> None:
        """Fetch the workspace comments and dispatch the ones not seen before.

        A comment is new when its ``created_at`` is later than the newest
        timestamp previously recorded for its document. Watermarks are taken
        from the comments themselves (not the local clock) and are only
        advanced after the whole batch was examined, so several new comments
        on one document in a single response are all delivered.

        Does nothing without a workspace id or an open session. Errors are
        logged and never raised.
        """
        if not self.config.workspace_id or self._session is None:
            return

        url = urljoin(self.base_url, f"workspaces/{self.config.workspace_id}/comments")

        try:
            async with self._session.get(url, headers=self._get_headers()) as resp:
                if resp.status != 200:
                    return

                data = await resp.json()

            newest: Dict[str, datetime] = {}
            for comment_data in data.get("comments", []):
                doc_id = comment_data.get("document_id", "")
                try:
                    created_at = self._parse_timestamp(comment_data.get("created_at"))
                except (TypeError, ValueError) as e:
                    # One malformed comment must not discard the rest of the batch.
                    logger.error(f"Error parsing comment: {e}")
                    continue

                seen_until = self._last_check.get(doc_id)

                # Compare against the watermark from before this batch.
                if self._is_after(created_at, seen_until):
                    message = self._parse_comment(comment_data)
                    if message:
                        self._dispatch(message)

                if self._is_after(created_at, newest.get(doc_id, seen_until)):
                    newest[doc_id] = created_at

            self._last_check.update(newest)

        except Exception as e:
            logger.error(f"Error checking comments: {e}")

    def _dispatch(self, message: Message) -> None:
        """Hand ``message`` to every registered handler.

        Coroutine results are scheduled as tasks and referenced until they
        finish; plain callables are simply called. A handler that raises is
        logged and does not prevent the remaining handlers from running.
        """
        for handler in self._message_handlers:
            try:
                result = handler(message)
            except Exception as e:
                logger.error(f"Message handler error: {e}")
                continue

            if asyncio.iscoroutine(result):
                task = asyncio.create_task(result)
                self._handler_tasks.add(task)
                task.add_done_callback(self._handler_tasks.discard)

    def _parse_comment(self, data: Dict[str, Any]) -> Optional[Message]:
        """Convert a comment payload into a unified ``Message``.

        Missing keys fall back to empty strings (``thread_id`` and
        ``selection`` to None, ``created_at`` to the current time).

        Returns:
            The message, or None when the payload could not be converted
            (the error is logged).
        """
        try:
            return Message(
                id=data.get("id", ""),
                channel=self.channel_type,
                chat_id=data.get("document_id", ""),
                sender_id=data.get("author_id", ""),
                sender_name=data.get("author_name", ""),
                text=data.get("content", ""),
                timestamp=self._parse_timestamp(data.get("created_at")),
                message_type=MessageType.TEXT,
                reply_to=data.get("thread_id"),
                metadata={
                    "document_title": data.get("document_title", ""),
                    "selection": data.get("selection"),
                },
            )
        except Exception as e:
            logger.error(f"Error parsing comment: {e}")
            return None

    def on_message(self, handler: Callable) -> None:
        """Register a callable invoked with each new incoming ``Message``."""
        self._message_handlers.append(handler)

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        **kwargs
    ) -> SendResult:
        """Add a comment to the document identified by ``chat_id``.

        Args:
            chat_id: Document id.
            text: Comment body.
            reply_to: Sent as ``thread_id`` in the payload.
            **kwargs: ``selection`` is forwarded in the payload when given;
                other keys are ignored.

        Returns:
            A successful ``SendResult`` carrying the id reported by the server.

        Raises:
            ChannelSendError: not connected, non-200/201 response, or any
                other failure while sending.
        """
        if self._session is None:
            raise ChannelSendError("Not connected to Open Prose")

        url = urljoin(self.base_url, f"documents/{chat_id}/comments")

        payload = {
            "content": text,
            "thread_id": reply_to,
        }

        # Add selection if provided
        if "selection" in kwargs:
            payload["selection"] = kwargs["selection"]

        try:
            async with self._session.post(
                url,
                json=payload,
                headers=self._get_headers(),
            ) as resp:
                if resp.status not in (200, 201):
                    raise ChannelSendError("Failed to add comment")

                data = await resp.json()
                return SendResult(
                    success=True,
                    message_id=data.get("id", ""),
                    timestamp=datetime.now(),
                )

        except Exception as e:
            logger.error(f"Failed to send comment: {e}")
            raise ChannelSendError(str(e)) from e

    async def edit_message(
        self,
        chat_id: str,
        message_id: str,
        text: str,
        **kwargs
    ) -> bool:
        """Replace the content of comment ``message_id``.

        ``chat_id`` is accepted for interface compatibility and not used.

        Returns:
            True on HTTP 200; False on any other status or any error
            (including when no session is open). Errors are logged.
        """
        url = urljoin(self.base_url, f"comments/{message_id}")

        try:
            async with self._session.patch(
                url,
                json={"content": text},
                headers=self._get_headers(),
            ) as resp:
                return resp.status == 200
        except Exception as e:
            logger.error(f"Failed to edit comment: {e}")
            return False

    async def delete_message(self, chat_id: str, message_id: str, **kwargs) -> bool:
        """Delete comment ``message_id``.

        ``chat_id`` is accepted for interface compatibility and not used.

        Returns:
            True on HTTP 200 or 204; False on any other status or any error
            (including when no session is open). Errors are logged.
        """
        url = urljoin(self.base_url, f"comments/{message_id}")

        try:
            async with self._session.delete(url, headers=self._get_headers()) as resp:
                return resp.status in (200, 204)
        except Exception as e:
            logger.error(f"Failed to delete comment: {e}")
            return False

    async def send_typing(self, chat_id: str, **kwargs) -> None:
        """Show a typing indicator; currently a no-op."""
        # Could implement presence API if available
        pass

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Fetch summary information about document ``chat_id``.

        Returns:
            A dict with ``id``, ``title``, ``author`` and ``collaborators``,
            or None on a non-200 response or any error (logged).
        """
        url = urljoin(self.base_url, f"documents/{chat_id}")

        try:
            async with self._session.get(url, headers=self._get_headers()) as resp:
                if resp.status != 200:
                    return None

                data = await resp.json()
                return {
                    "id": data.get("id", ""),
                    "title": data.get("title", ""),
                    "author": data.get("author_name", ""),
                    "collaborators": data.get("collaborators", []),
                }
        except Exception as e:
            logger.error(f"Failed to get document info: {e}")
            return None

    async def add_suggestion(
        self,
        document_id: str,
        selection: Dict[str, Any],
        suggested_text: str,
        reason: Optional[str] = None
    ) -> Optional[str]:
        """Post a text suggestion for part of a document.

        Args:
            document_id: Target document.
            selection: Forwarded unchanged as ``selection`` in the payload.
            suggested_text: Proposed replacement text.
            reason: Optional explanation, forwarded as ``reason``.

        Returns:
            The ``id`` from the server response on HTTP 200/201; None when
            suggestions are disabled in the config, on any other status, or
            on any error (logged).
        """
        if not self.config.enable_suggestions:
            return None

        url = urljoin(self.base_url, f"documents/{document_id}/suggestions")

        payload = {
            "selection": selection,
            "suggested_text": suggested_text,
            "reason": reason,
        }

        try:
            async with self._session.post(
                url,
                json=payload,
                headers=self._get_headers(),
            ) as resp:
                if resp.status in (200, 201):
                    data = await resp.json()
                    return data.get("id")
                return None
        except Exception as e:
            logger.error(f"Failed to add suggestion: {e}")
            return None

369 

370 

def create_openprose_adapter(
    server_url: Optional[str] = None,
    api_key: Optional[str] = None,
    workspace_id: Optional[str] = None,
    **kwargs
) -> OpenProseAdapter:
    """Build an ``OpenProseAdapter`` from explicit values with environment fallbacks.

    Any of ``server_url``, ``api_key`` and ``workspace_id`` that is missing or
    empty is read from ``OPENPROSE_URL``, ``OPENPROSE_API_KEY`` and
    ``OPENPROSE_WORKSPACE`` respectively (empty string when unset). Remaining
    keyword arguments are passed straight to ``OpenProseConfig``.
    """
    if not server_url:
        server_url = os.getenv("OPENPROSE_URL", "")
    if not api_key:
        api_key = os.getenv("OPENPROSE_API_KEY", "")
    if not workspace_id:
        workspace_id = os.getenv("OPENPROSE_WORKSPACE", "")

    settings = OpenProseConfig(
        server_url=server_url,
        api_key=api_key,
        workspace_id=workspace_id,
        **kwargs
    )
    return OpenProseAdapter(settings)