Coverage for integrations / channels / extensions / rocketchat_adapter.py: 34.3%

327 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

"""
Rocket.Chat Channel Adapter

Implements Rocket.Chat messaging integration using the REST API and the
Realtime API.
Based on HevolveBot extension patterns for Rocket.Chat.

Features:
- REST API for CRUD operations
- Realtime API (WebSocket) for live messaging
- Direct messages and channels
- File attachments
- Reactions and threads
- Slash commands
- User mentions
- Docker-compatible configuration
"""

17 

18from __future__ import annotations 

19 

20import asyncio 

21import logging 

22import os 

23import json 

24import hashlib 

25try: 

26 import aiohttp 

27 HAS_AIOHTTP = True 

28except ImportError: 

29 HAS_AIOHTTP = False 

30from typing import Optional, List, Dict, Any, Callable 

31from datetime import datetime 

32from dataclasses import dataclass, field 

33from urllib.parse import urljoin 

34 

35try: 

36 import websockets 

37 from websockets.exceptions import ConnectionClosed 

38 HAS_WEBSOCKETS = True 

39except ImportError: 

40 HAS_WEBSOCKETS = False 

41 

42from ..base import ( 

43 ChannelAdapter, 

44 ChannelConfig, 

45 ChannelStatus, 

46 Message, 

47 MessageType, 

48 MediaAttachment, 

49 SendResult, 

50 ChannelConnectionError, 

51 ChannelSendError, 

52 ChannelRateLimitError, 

53) 

54 

55logger = logging.getLogger(__name__) 

56 

57 

@dataclass
class RocketChatConfig(ChannelConfig):
    """Settings for a Rocket.Chat connection, layered on the shared ChannelConfig."""

    # Server location and credentials. Either username/password or a
    # pre-issued auth_token/user_id pair can be supplied.
    server_url: str = ""
    username: str = ""
    password: str = ""
    auth_token: str = ""
    user_id: str = ""

    # Feature switches read by the adapter.
    enable_realtime: bool = True
    enable_file_attachments: bool = True
    enable_threads: bool = True
    enable_reactions: bool = True

    # Realtime (WebSocket) tuning.
    reconnect_delay: float = 5.0
    max_reconnect_attempts: int = 10
    websocket_timeout: float = 30.0

    @classmethod
    def from_env(cls) -> "RocketChatConfig":
        """Build a config from the ROCKETCHAT_* environment variables.

        Only the connection/credential fields are read; every variable that
        is not set falls back to an empty string.
        """
        env_to_field = {
            "ROCKETCHAT_URL": "server_url",
            "ROCKETCHAT_USERNAME": "username",
            "ROCKETCHAT_PASSWORD": "password",
            "ROCKETCHAT_AUTH_TOKEN": "auth_token",
            "ROCKETCHAT_USER_ID": "user_id",
        }
        values = {
            field_name: os.getenv(env_name, "")
            for env_name, field_name in env_to_field.items()
        }
        return cls(**values)

84 

85 

@dataclass
class RocketChatRoom:
    """Snapshot of a Rocket.Chat room as cached by the adapter."""

    id: str
    name: str
    # Single-letter room kind: "c" = channel, "p" = private, "d" = direct.
    type: str
    topic: Optional[str] = None
    description: Optional[str] = None
    user_count: int = 0
    read_only: bool = False
    archived: bool = False

97 

98 

@dataclass
class RocketChatUser:
    """Identity and presence details for a Rocket.Chat user."""

    id: str
    username: str
    name: Optional[str] = None
    email: Optional[str] = None
    status: str = "offline"
    # A fresh list per instance, so role lists are never shared.
    roles: List[str] = field(default_factory=list)

108 

109 

@dataclass
class RocketChatMessage:
    """A single Rocket.Chat message together with its author and extras."""

    id: str
    room_id: str
    text: str
    user: RocketChatUser
    timestamp: datetime
    updated_at: Optional[datetime] = None
    # Set when the message belongs to a thread.
    thread_id: Optional[str] = None
    # Container fields default to fresh, per-instance objects.
    reactions: Dict[str, List[str]] = field(default_factory=dict)
    attachments: List[Dict[str, Any]] = field(default_factory=list)
    mentions: List[str] = field(default_factory=list)

123 

124 

class RocketChatAdapter(ChannelAdapter):
    """Rocket.Chat channel adapter with REST and Realtime API support.

    REST calls go through a single ``aiohttp.ClientSession`` created in
    :meth:`connect`. When realtime is enabled and ``websockets`` is
    installed, a background task keeps a DDP WebSocket open and forwards
    incoming room messages to the handlers registered via
    :meth:`on_message`.
    """

    channel_type = "rocketchat"

    # Upper bound on frames inspected while waiting for the DDP "connected"
    # reply, so a misbehaving server cannot stall the handshake forever.
    _DDP_HANDSHAKE_MAX_FRAMES = 5

    @property
    def name(self) -> str:
        """Get adapter name."""
        return self.channel_type

    def __init__(self, config: RocketChatConfig):
        """Store the configuration and initialise all connection state."""
        super().__init__(config)
        self.config: RocketChatConfig = config
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[Any] = None
        self._ws_task: Optional[asyncio.Task] = None
        self._auth_token: str = config.auth_token
        self._user_id: str = config.user_id
        self._connected = False
        self._reconnect_count = 0
        self._message_handlers: List[Callable] = []
        self._rooms_cache: Dict[str, RocketChatRoom] = {}
        self._users_cache: Dict[str, RocketChatUser] = {}
        self._ddp_session_id: Optional[str] = None
        self._msg_id_counter = 0
        # Strong references to in-flight handler tasks; the event loop only
        # keeps weak references, so unreferenced tasks can be collected
        # before they finish.
        self._handler_tasks: set = set()

    @property
    def base_url(self) -> str:
        """Get base API URL."""
        return urljoin(self.config.server_url, "/api/v1/")

    @property
    def ws_url(self) -> str:
        """Get WebSocket URL for Realtime API."""
        url = self.config.server_url.replace("http://", "ws://").replace("https://", "wss://")
        return urljoin(url, "/websocket")

    def _get_headers(self) -> Dict[str, str]:
        """Get headers for API requests.

        The auth headers are only added once both the token and the user id
        are known.
        """
        headers = {"Content-Type": "application/json"}
        if self._auth_token and self._user_id:
            headers["X-Auth-Token"] = self._auth_token
            headers["X-User-Id"] = self._user_id
        return headers

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------

    async def connect(self) -> bool:
        """Connect to Rocket.Chat.

        Opens the HTTP session, logs in when no token was supplied, verifies
        the credentials and, if enabled, starts the realtime loop.

        Returns:
            True on success.

        Raises:
            ChannelConnectionError: on any failure. The HTTP session opened
                by this call is closed again before raising.
        """
        try:
            if not HAS_AIOHTTP:
                raise ChannelConnectionError("aiohttp is required for the Rocket.Chat adapter")

            self._session = aiohttp.ClientSession()

            # Authenticate if needed
            if not self._auth_token:
                await self._authenticate()

            # Verify connection
            if not await self._verify_connection():
                raise ChannelConnectionError("Failed to verify Rocket.Chat connection")

            # The realtime loop runs "while self._connected", so the flag has
            # to be set before the task gets a chance to start.
            self._connected = True

            # Start Realtime API if enabled
            if self.config.enable_realtime and HAS_WEBSOCKETS:
                self._ws_task = asyncio.create_task(self._realtime_loop())

            self._status = ChannelStatus.CONNECTED
            logger.info("Connected to Rocket.Chat")
            return True

        except Exception as e:
            logger.error("Failed to connect to Rocket.Chat: %s", e)
            self._connected = False
            await self._close_session()
            self._status = ChannelStatus.ERROR
            raise ChannelConnectionError(str(e)) from e

    async def disconnect(self) -> None:
        """Disconnect from Rocket.Chat and release the socket and session."""
        self._connected = False

        if self._ws_task:
            self._ws_task.cancel()
            try:
                await self._ws_task
            except asyncio.CancelledError:
                pass
            self._ws_task = None

        if self._ws:
            await self._ws.close()
            self._ws = None

        await self._close_session()

        self._status = ChannelStatus.DISCONNECTED
        logger.info("Disconnected from Rocket.Chat")

    async def _close_session(self) -> None:
        """Close the HTTP session if one is open and forget it."""
        if self._session is not None:
            session, self._session = self._session, None
            await session.close()

    async def _authenticate(self) -> None:
        """Authenticate with username/password.

        Raises:
            ChannelConnectionError: if the server rejects the login.
        """
        url = urljoin(self.base_url, "login")
        payload = {
            "user": self.config.username,
            "password": self.config.password,
        }

        async with self._session.post(url, json=payload) as resp:
            if resp.status != 200:
                raise ChannelConnectionError("Authentication failed")

            data = await resp.json()
            if data.get("status") != "success":
                raise ChannelConnectionError("Authentication failed")

            self._auth_token = data["data"]["authToken"]
            self._user_id = data["data"]["userId"]
            logger.info("Authenticated as user %s", self._user_id)

    async def _verify_connection(self) -> bool:
        """Verify API connection by requesting the ``me`` endpoint."""
        url = urljoin(self.base_url, "me")
        async with self._session.get(url, headers=self._get_headers()) as resp:
            return resp.status == 200

    # ------------------------------------------------------------------
    # Realtime API (DDP over WebSocket)
    # ------------------------------------------------------------------

    async def _realtime_loop(self) -> None:
        """Main loop for Realtime API WebSocket.

        Reconnects with a linearly growing delay after the socket drops and
        gives up after ``max_reconnect_attempts`` consecutive failures.
        """
        while self._connected:
            try:
                async with websockets.connect(
                    self.ws_url,
                    ping_interval=25,
                    ping_timeout=self.config.websocket_timeout,
                ) as ws:
                    self._ws = ws

                    # Connect to DDP
                    await self._ddp_connect()

                    # Login via DDP
                    await self._ddp_login()

                    # Subscribe to messages
                    await self._subscribe_to_messages()

                    # Only a completed handshake counts as a successful
                    # connection; resetting earlier would let a socket that
                    # opens but dies mid-handshake retry forever.
                    self._reconnect_count = 0

                    # Message receive loop
                    async for message in ws:
                        await self._handle_ws_message(message)

            except (ConnectionClosed, asyncio.TimeoutError):
                if not self._connected:
                    break

                self._reconnect_count += 1
                if self._reconnect_count >= self.config.max_reconnect_attempts:
                    logger.error("Max reconnection attempts reached")
                    self._status = ChannelStatus.ERROR
                    break

                delay = self.config.reconnect_delay * self._reconnect_count
                logger.warning("WebSocket disconnected, reconnecting in %ss...", delay)
                await asyncio.sleep(delay)

            except Exception as e:
                logger.error("Realtime API error: %s", e)
                if not self._connected:
                    break
                await asyncio.sleep(self.config.reconnect_delay)

            finally:
                # The socket is closed once the "async with" exits; drop the
                # stale references so send_typing() does not use a dead socket.
                self._ws = None
                self._ddp_session_id = None

    def _next_msg_id(self) -> str:
        """Generate next DDP message ID."""
        self._msg_id_counter += 1
        return str(self._msg_id_counter)

    async def _ddp_connect(self) -> None:
        """Connect to DDP protocol and record the session id.

        Frames that arrive before the ``connected`` reply are skipped rather
        than mistaken for it (NOTE(review): Meteor-based servers are expected
        to send a ``{"server_id": ...}`` preamble first; confirm against the
        target server).

        Raises:
            ChannelConnectionError: if the server answers with ``failed``.
            asyncio.TimeoutError: if no frame arrives within
                ``websocket_timeout`` seconds.
        """
        msg = {
            "msg": "connect",
            "version": "1",
            "support": ["1"],
        }
        await self._ws.send(json.dumps(msg))

        # Wait for connect response
        for _ in range(self._DDP_HANDSHAKE_MAX_FRAMES):
            response = await asyncio.wait_for(
                self._ws.recv(), timeout=self.config.websocket_timeout
            )
            data = json.loads(response)
            if not isinstance(data, dict):
                continue

            kind = data.get("msg")
            if kind == "connected":
                self._ddp_session_id = data.get("session")
                logger.debug("DDP connected, session: %s", self._ddp_session_id)
                return
            if kind == "failed":
                raise ChannelConnectionError("DDP connection rejected by server")
            if kind == "ping":
                await self._ws.send(json.dumps({"msg": "pong"}))

        # Best effort, as before: carry on. A late "connected" frame is still
        # picked up by _handle_ws_message().
        logger.warning("DDP handshake did not confirm a session")

    async def _ddp_login(self) -> None:
        """Login via DDP with resume token."""
        msg = {
            "msg": "method",
            "method": "login",
            "id": self._next_msg_id(),
            "params": [{"resume": self._auth_token}],
        }
        await self._ws.send(json.dumps(msg))

    async def _subscribe_to_messages(self) -> None:
        """Subscribe to message stream."""
        msg = {
            "msg": "sub",
            "id": self._next_msg_id(),
            "name": "stream-room-messages",
            "params": ["__my_messages__", False],
        }
        await self._ws.send(json.dumps(msg))

    async def _handle_ws_message(self, raw_message: str) -> None:
        """Handle incoming WebSocket message.

        Answers DDP pings, records a late ``connected`` frame and dispatches
        ``stream-room-messages`` updates to the registered handlers. Errors
        are logged, never raised, so one bad frame cannot kill the loop.
        """
        try:
            data = json.loads(raw_message)
            if not isinstance(data, dict):
                return

            kind = data.get("msg")

            # Handle ping
            if kind == "ping":
                await self._ws.send(json.dumps({"msg": "pong"}))
                return

            # Handshake reply that arrived after _ddp_connect() stopped waiting
            if kind == "connected":
                self._ddp_session_id = data.get("session")
                return

            # Handle message stream
            if kind == "changed" and data.get("collection") == "stream-room-messages":
                payload_fields = data.get("fields", {})
                args = payload_fields.get("args", [])

                for msg_data in args:
                    if not isinstance(msg_data, dict):
                        continue
                    message = self._parse_message(msg_data)
                    if message:
                        self._dispatch(message)

        except json.JSONDecodeError:
            logger.warning("Invalid JSON in WebSocket message")
        except Exception as e:
            logger.error("Error handling WebSocket message: %s", e)

    def _dispatch(self, message: Message) -> None:
        """Hand ``message`` to every registered handler.

        Coroutine handlers run as background tasks; plain callables have
        already run by the time this returns. A failing handler does not
        prevent the remaining ones from being called.
        """
        for handler in list(self._message_handlers):
            try:
                outcome = handler(message)
            except Exception as e:
                logger.error("Message handler failed: %s", e)
                continue

            if asyncio.iscoroutine(outcome):
                task = asyncio.create_task(outcome)
                self._handler_tasks.add(task)
                task.add_done_callback(self._on_handler_done)

    def _on_handler_done(self, task: asyncio.Task) -> None:
        """Release a finished handler task and log its failure, if any."""
        self._handler_tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            logger.error("Message handler failed: %s", task.exception())

    @staticmethod
    def _parse_timestamp(raw: Any) -> datetime:
        """Convert a Rocket.Chat timestamp to a naive local ``datetime``.

        Accepts ``{"$date": <epoch milliseconds>}``, a bare number of
        milliseconds, or an ISO-8601 string (with or without a trailing
        ``Z``). Anything else, or an unparsable value, yields the current
        time. Results are naive local times to match the ``datetime.now()``
        values used elsewhere in this adapter.
        """
        value = raw.get("$date") if isinstance(raw, dict) else raw
        try:
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                return datetime.fromtimestamp(value / 1000)
            if isinstance(value, str) and value:
                # fromisoformat() only understands "Z" from Python 3.11 on.
                text = value[:-1] + "+00:00" if value.endswith("Z") else value
                parsed = datetime.fromisoformat(text)
                if parsed.tzinfo is not None:
                    parsed = parsed.astimezone().replace(tzinfo=None)
                return parsed
        except (ValueError, OverflowError, OSError):
            pass
        return datetime.now()

    def _parse_message(self, data: Dict[str, Any]) -> Optional[Message]:
        """Parse Rocket.Chat message to unified Message.

        Returns:
            The unified message, or None for the bot's own messages and for
            payloads that cannot be parsed.
        """
        try:
            user_data = data.get("u") or {}

            # Skip bot's own messages
            if user_data.get("_id") == self._user_id:
                return None

            return Message(
                id=data.get("_id", ""),
                channel=self.channel_type,
                chat_id=data.get("rid", ""),
                sender_id=user_data.get("_id", ""),
                sender_name=user_data.get("username", ""),
                text=data.get("msg", ""),
                timestamp=self._parse_timestamp(data.get("ts")),
                message_type=MessageType.TEXT,
                reply_to=data.get("tmid"),
                metadata={
                    "mentions": data.get("mentions", []),
                    "channels": data.get("channels", []),
                },
            )
        except Exception as e:
            logger.error("Error parsing message: %s", e)
            return None

    def on_message(self, handler: Callable) -> None:
        """Register message handler (coroutine function or plain callable)."""
        self._message_handlers.append(handler)

    # ------------------------------------------------------------------
    # REST helpers
    # ------------------------------------------------------------------

    async def _post_json(
        self, endpoint: str, payload: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """POST ``payload`` as JSON; return the decoded body on HTTP 200, else None."""
        url = urljoin(self.base_url, endpoint)
        async with self._session.post(
            url, json=payload, headers=self._get_headers()
        ) as resp:
            if resp.status != 200:
                return None
            return await resp.json()

    async def _get_json(
        self, endpoint: str, params: Optional[Dict[str, str]] = None
    ) -> Optional[Dict[str, Any]]:
        """GET ``endpoint``; return the decoded body on HTTP 200, else None.

        Query values go through ``params`` so they are URL-encoded rather
        than spliced into the URL verbatim.
        """
        url = urljoin(self.base_url, endpoint)
        async with self._session.get(
            url, params=params, headers=self._get_headers()
        ) as resp:
            if resp.status != 200:
                return None
            return await resp.json()

    @staticmethod
    def _room_summary(room: RocketChatRoom) -> Dict[str, Any]:
        """Return the public dict view of a cached room."""
        return {
            "id": room.id,
            "name": room.name,
            "type": room.type,
            "topic": room.topic,
            "user_count": room.user_count,
        }

    # ------------------------------------------------------------------
    # Messaging
    # ------------------------------------------------------------------

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        **kwargs
    ) -> SendResult:
        """Send a message to a room.

        Args:
            chat_id: Target room id.
            text: Message text.
            reply_to: Thread message id; only used when threads are enabled.
            **kwargs: ``attachments`` is forwarded to the API when non-empty.

        Raises:
            ChannelRateLimitError: on HTTP 429.
            ChannelSendError: on any other failure, including not being connected.
        """
        if self._session is None:
            raise ChannelSendError("Not connected to Rocket.Chat")

        url = urljoin(self.base_url, "chat.postMessage")

        payload: Dict[str, Any] = {
            "roomId": chat_id,
            "text": text,
        }

        # Thread support
        if reply_to and self.config.enable_threads:
            payload["tmid"] = reply_to

        # Attachments
        attachments = kwargs.get("attachments", [])
        if attachments:
            payload["attachments"] = attachments

        try:
            async with self._session.post(
                url,
                json=payload,
                headers=self._get_headers(),
            ) as resp:
                if resp.status == 429:
                    raise ChannelRateLimitError("Rate limited")

                if resp.status != 200:
                    data = await resp.json()
                    raise ChannelSendError(data.get("error", "Send failed"))

                data = await resp.json()
                if not data.get("success"):
                    raise ChannelSendError(data.get("error", "Send failed"))

                msg = data.get("message", {})
                return SendResult(
                    success=True,
                    message_id=msg.get("_id", ""),
                    timestamp=datetime.now(),
                )

        except (ChannelSendError, ChannelRateLimitError):
            raise
        except Exception as e:
            logger.error("Failed to send message: %s", e)
            raise ChannelSendError(str(e)) from e

    async def edit_message(
        self,
        chat_id: str,
        message_id: str,
        text: str,
        **kwargs
    ) -> bool:
        """Edit a message. Returns True only when the server reports success."""
        payload = {
            "roomId": chat_id,
            "msgId": message_id,
            "text": text,
        }

        try:
            data = await self._post_json("chat.update", payload)
            return bool(data and data.get("success", False))
        except Exception as e:
            logger.error("Failed to edit message: %s", e)
            return False

    async def delete_message(self, chat_id: str, message_id: str, **kwargs) -> bool:
        """Delete a message. Returns True only when the server reports success."""
        payload = {
            "roomId": chat_id,
            "msgId": message_id,
        }

        try:
            data = await self._post_json("chat.delete", payload)
            return bool(data and data.get("success", False))
        except Exception as e:
            logger.error("Failed to delete message: %s", e)
            return False

    async def send_typing(self, chat_id: str, **kwargs) -> None:
        """Send typing indicator (no-op without an established DDP session)."""
        if self._ws and self._ddp_session_id:
            msg = {
                "msg": "method",
                "method": "stream-notify-room",
                "id": self._next_msg_id(),
                "params": [f"{chat_id}/typing", self.config.username, True],
            }
            try:
                await self._ws.send(json.dumps(msg))
            except Exception as e:
                logger.debug("Failed to send typing indicator: %s", e)

    async def _set_reaction(self, message_id: str, emoji: str, should_react: bool) -> bool:
        """Add or remove ``emoji`` on a message via ``chat.react``."""
        payload = {
            "messageId": message_id,
            "emoji": emoji,
            "shouldReact": should_react,
        }
        data = await self._post_json("chat.react", payload)
        return bool(data and data.get("success", False))

    async def add_reaction(self, chat_id: str, message_id: str, emoji: str) -> bool:
        """Add reaction to a message. Returns False when reactions are disabled."""
        if not self.config.enable_reactions:
            return False

        try:
            return await self._set_reaction(message_id, emoji, True)
        except Exception as e:
            logger.error("Failed to add reaction: %s", e)
            return False

    async def remove_reaction(self, chat_id: str, message_id: str, emoji: str) -> bool:
        """Remove reaction from a message. Returns False when reactions are disabled."""
        # Same feature gate as add_reaction(), so the two stay symmetrical.
        if not self.config.enable_reactions:
            return False

        try:
            return await self._set_reaction(message_id, emoji, False)
        except Exception as e:
            logger.error("Failed to remove reaction: %s", e)
            return False

    # ------------------------------------------------------------------
    # Rooms, files and threads
    # ------------------------------------------------------------------

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Get room information.

        Served from the in-memory cache when available; otherwise fetched
        from ``rooms.info`` and cached.

        Returns:
            Dict with ``id``, ``name``, ``type``, ``topic`` and
            ``user_count``, or None when the lookup fails.
        """
        # Check cache
        cached = self._rooms_cache.get(chat_id)
        if cached is not None:
            return self._room_summary(cached)

        try:
            data = await self._get_json("rooms.info", {"roomId": chat_id})
            if not data or not data.get("success"):
                return None

            room_data = data.get("room", {})
            room = RocketChatRoom(
                id=room_data.get("_id", ""),
                name=room_data.get("name", ""),
                type=room_data.get("t", ""),
                topic=room_data.get("topic"),
                description=room_data.get("description"),
                user_count=room_data.get("usersCount", 0),
                read_only=room_data.get("ro", False),
                archived=room_data.get("archived", False),
            )

            self._rooms_cache[chat_id] = room
            return self._room_summary(room)
        except Exception as e:
            logger.error("Failed to get room info: %s", e)
            return None

    async def upload_file(
        self,
        chat_id: str,
        file_path: str,
        description: Optional[str] = None
    ) -> Optional[str]:
        """Upload a file to a room.

        Returns:
            The id of the message created for the upload, or None when
            attachments are disabled or the upload fails.
        """
        if not self.config.enable_file_attachments:
            return None

        url = urljoin(self.base_url, f"rooms.upload/{chat_id}")

        try:
            # The handle must stay open for the whole request and be closed
            # afterwards, whatever the outcome.
            with open(file_path, "rb") as file_handle:
                data = aiohttp.FormData()
                data.add_field(
                    "file",
                    file_handle,
                    filename=os.path.basename(file_path),
                )
                if description:
                    data.add_field("description", description)

                headers = self._get_headers()
                headers.pop("Content-Type", None)  # Let aiohttp set it

                async with self._session.post(url, data=data, headers=headers) as resp:
                    if resp.status != 200:
                        return None

                    result = await resp.json()
                    if result.get("success"):
                        return result.get("message", {}).get("_id")
                    return None

        except Exception as e:
            logger.error("Failed to upload file: %s", e)
            return None

    async def create_direct_message(self, username: str) -> Optional[str]:
        """Create a direct message room with a user; return its id or None."""
        try:
            data = await self._post_json("im.create", {"username": username})
            if data and data.get("success"):
                return data.get("room", {}).get("_id")
            return None
        except Exception as e:
            logger.error("Failed to create DM: %s", e)
            return None

    async def get_thread_messages(
        self,
        chat_id: str,
        thread_id: str,
        limit: int = 50
    ) -> List[RocketChatMessage]:
        """Get messages in a thread.

        Returns:
            Up to ``limit`` messages as returned by the server, or an empty
            list on any failure.
        """
        try:
            data = await self._get_json(
                "chat.getThreadMessages",
                {"tmid": thread_id, "count": str(limit)},
            )
            if not data or not data.get("success"):
                return []

            messages = []
            for msg in data.get("messages", []):
                user_data = msg.get("u") or {}
                messages.append(RocketChatMessage(
                    id=msg.get("_id", ""),
                    room_id=msg.get("rid", ""),
                    text=msg.get("msg", ""),
                    user=RocketChatUser(
                        id=user_data.get("_id", ""),
                        username=user_data.get("username", ""),
                        name=user_data.get("name"),
                    ),
                    # Use the message's own time rather than the fetch time.
                    timestamp=self._parse_timestamp(msg.get("ts")),
                    thread_id=thread_id,
                ))

            return messages
        except Exception as e:
            logger.error("Failed to get thread messages: %s", e)
            return []

711 

712 

def create_rocketchat_adapter(
    server_url: Optional[str] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    auth_token: Optional[str] = None,
    user_id: Optional[str] = None,
    **kwargs
) -> RocketChatAdapter:
    """Build a RocketChatAdapter from explicit values and/or the environment.

    Each connection argument that is missing or empty is looked up in its
    ROCKETCHAT_* environment variable, defaulting to an empty string.

    Args:
        server_url: Rocket.Chat server URL (env: ROCKETCHAT_URL)
        username: Login name (env: ROCKETCHAT_USERNAME)
        password: Login password (env: ROCKETCHAT_PASSWORD)
        auth_token: Existing auth token (env: ROCKETCHAT_AUTH_TOKEN)
        user_id: Existing user ID (env: ROCKETCHAT_USER_ID)
        **kwargs: Passed through to RocketChatConfig unchanged

    Returns:
        A RocketChatAdapter wrapping the assembled configuration
    """
    # field name -> (caller-supplied value, environment fallback)
    sources = {
        "server_url": (server_url, "ROCKETCHAT_URL"),
        "username": (username, "ROCKETCHAT_USERNAME"),
        "password": (password, "ROCKETCHAT_PASSWORD"),
        "auth_token": (auth_token, "ROCKETCHAT_AUTH_TOKEN"),
        "user_id": (user_id, "ROCKETCHAT_USER_ID"),
    }
    resolved = {
        field_name: explicit or os.getenv(env_name, "")
        for field_name, (explicit, env_name) in sources.items()
    }
    return RocketChatAdapter(RocketChatConfig(**resolved, **kwargs))