Coverage for integrations / channels / extensions / rocketchat_adapter.py: 34.3%
327 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Rocket.Chat Channel Adapter
4Implements Rocket.Chat messaging integration using REST API and Realtime API.
5Based on HevolveBot extension patterns for Rocket.Chat.
7Features:
8- REST API for CRUD operations
9- Realtime API (WebSocket) for live messaging
10- Direct messages and channels
11- File attachments
12- Reactions and threads
13- Slash commands
14- User mentions
15- Docker-compatible configuration
16"""
18from __future__ import annotations
20import asyncio
21import logging
22import os
23import json
24import hashlib
25try:
26 import aiohttp
27 HAS_AIOHTTP = True
28except ImportError:
29 HAS_AIOHTTP = False
30from typing import Optional, List, Dict, Any, Callable
31from datetime import datetime
32from dataclasses import dataclass, field
33from urllib.parse import urljoin
35try:
36 import websockets
37 from websockets.exceptions import ConnectionClosed
38 HAS_WEBSOCKETS = True
39except ImportError:
40 HAS_WEBSOCKETS = False
42from ..base import (
43 ChannelAdapter,
44 ChannelConfig,
45 ChannelStatus,
46 Message,
47 MessageType,
48 MediaAttachment,
49 SendResult,
50 ChannelConnectionError,
51 ChannelSendError,
52 ChannelRateLimitError,
53)
55logger = logging.getLogger(__name__)
58@dataclass
59class RocketChatConfig(ChannelConfig):
60 """Rocket.Chat-specific configuration."""
61 server_url: str = ""
62 username: str = ""
63 password: str = ""
64 auth_token: str = ""
65 user_id: str = ""
66 enable_realtime: bool = True
67 enable_file_attachments: bool = True
68 enable_threads: bool = True
69 enable_reactions: bool = True
70 reconnect_delay: float = 5.0
71 max_reconnect_attempts: int = 10
72 websocket_timeout: float = 30.0
74 @classmethod
75 def from_env(cls) -> "RocketChatConfig":
76 """Create config from environment variables (Docker-compatible)."""
77 return cls(
78 server_url=os.getenv("ROCKETCHAT_URL", ""),
79 username=os.getenv("ROCKETCHAT_USERNAME", ""),
80 password=os.getenv("ROCKETCHAT_PASSWORD", ""),
81 auth_token=os.getenv("ROCKETCHAT_AUTH_TOKEN", ""),
82 user_id=os.getenv("ROCKETCHAT_USER_ID", ""),
83 )
86@dataclass
87class RocketChatRoom:
88 """Rocket.Chat room information."""
89 id: str
90 name: str
91 type: str # c=channel, p=private, d=direct
92 topic: Optional[str] = None
93 description: Optional[str] = None
94 user_count: int = 0
95 read_only: bool = False
96 archived: bool = False
99@dataclass
100class RocketChatUser:
101 """Rocket.Chat user information."""
102 id: str
103 username: str
104 name: Optional[str] = None
105 email: Optional[str] = None
106 status: str = "offline"
107 roles: List[str] = field(default_factory=list)
110@dataclass
111class RocketChatMessage:
112 """Rocket.Chat message structure."""
113 id: str
114 room_id: str
115 text: str
116 user: RocketChatUser
117 timestamp: datetime
118 updated_at: Optional[datetime] = None
119 thread_id: Optional[str] = None
120 reactions: Dict[str, List[str]] = field(default_factory=dict)
121 attachments: List[Dict[str, Any]] = field(default_factory=list)
122 mentions: List[str] = field(default_factory=list)
125class RocketChatAdapter(ChannelAdapter):
126 """Rocket.Chat channel adapter with REST and Realtime API support."""
128 channel_type = "rocketchat"
130 @property
131 def name(self) -> str:
132 """Get adapter name."""
133 return self.channel_type
135 def __init__(self, config: RocketChatConfig):
136 super().__init__(config)
137 self.config: RocketChatConfig = config
138 self._session: Optional[aiohttp.ClientSession] = None
139 self._ws: Optional[Any] = None
140 self._ws_task: Optional[asyncio.Task] = None
141 self._auth_token: str = config.auth_token
142 self._user_id: str = config.user_id
143 self._connected = False
144 self._reconnect_count = 0
145 self._message_handlers: List[Callable] = []
146 self._rooms_cache: Dict[str, RocketChatRoom] = {}
147 self._users_cache: Dict[str, RocketChatUser] = {}
148 self._ddp_session_id: Optional[str] = None
149 self._msg_id_counter = 0
151 @property
152 def base_url(self) -> str:
153 """Get base API URL."""
154 return urljoin(self.config.server_url, "/api/v1/")
156 @property
157 def ws_url(self) -> str:
158 """Get WebSocket URL for Realtime API."""
159 url = self.config.server_url.replace("http://", "ws://").replace("https://", "wss://")
160 return urljoin(url, "/websocket")
162 def _get_headers(self) -> Dict[str, str]:
163 """Get headers for API requests."""
164 headers = {"Content-Type": "application/json"}
165 if self._auth_token and self._user_id:
166 headers["X-Auth-Token"] = self._auth_token
167 headers["X-User-Id"] = self._user_id
168 return headers
170 async def connect(self) -> bool:
171 """Connect to Rocket.Chat."""
172 try:
173 self._session = aiohttp.ClientSession()
175 # Authenticate if needed
176 if not self._auth_token:
177 await self._authenticate()
179 # Verify connection
180 if not await self._verify_connection():
181 raise ChannelConnectionError("Failed to verify Rocket.Chat connection")
183 # Start Realtime API if enabled
184 if self.config.enable_realtime and HAS_WEBSOCKETS:
185 self._ws_task = asyncio.create_task(self._realtime_loop())
187 self._connected = True
188 self._status = ChannelStatus.CONNECTED
189 logger.info("Connected to Rocket.Chat")
190 return True
192 except Exception as e:
193 logger.error(f"Failed to connect to Rocket.Chat: {e}")
194 self._status = ChannelStatus.ERROR
195 raise ChannelConnectionError(str(e))
197 async def disconnect(self) -> None:
198 """Disconnect from Rocket.Chat."""
199 self._connected = False
201 if self._ws_task:
202 self._ws_task.cancel()
203 try:
204 await self._ws_task
205 except asyncio.CancelledError:
206 pass
208 if self._ws:
209 await self._ws.close()
210 self._ws = None
212 if self._session:
213 await self._session.close()
214 self._session = None
216 self._status = ChannelStatus.DISCONNECTED
217 logger.info("Disconnected from Rocket.Chat")
219 async def _authenticate(self) -> None:
220 """Authenticate with username/password."""
221 url = urljoin(self.base_url, "login")
222 payload = {
223 "user": self.config.username,
224 "password": self.config.password
225 }
227 async with self._session.post(url, json=payload) as resp:
228 if resp.status != 200:
229 raise ChannelConnectionError("Authentication failed")
231 data = await resp.json()
232 if data.get("status") != "success":
233 raise ChannelConnectionError("Authentication failed")
235 self._auth_token = data["data"]["authToken"]
236 self._user_id = data["data"]["userId"]
237 logger.info(f"Authenticated as user {self._user_id}")
239 async def _verify_connection(self) -> bool:
240 """Verify API connection."""
241 url = urljoin(self.base_url, "me")
242 async with self._session.get(url, headers=self._get_headers()) as resp:
243 return resp.status == 200
245 async def _realtime_loop(self) -> None:
246 """Main loop for Realtime API WebSocket."""
247 while self._connected:
248 try:
249 async with websockets.connect(
250 self.ws_url,
251 ping_interval=25,
252 ping_timeout=self.config.websocket_timeout
253 ) as ws:
254 self._ws = ws
255 self._reconnect_count = 0
257 # Connect to DDP
258 await self._ddp_connect()
260 # Login via DDP
261 await self._ddp_login()
263 # Subscribe to messages
264 await self._subscribe_to_messages()
266 # Message receive loop
267 async for message in ws:
268 await self._handle_ws_message(message)
270 except (ConnectionClosed, asyncio.TimeoutError) as e:
271 if not self._connected:
272 break
274 self._reconnect_count += 1
275 if self._reconnect_count >= self.config.max_reconnect_attempts:
276 logger.error("Max reconnection attempts reached")
277 self._status = ChannelStatus.ERROR
278 break
280 delay = self.config.reconnect_delay * self._reconnect_count
281 logger.warning(f"WebSocket disconnected, reconnecting in {delay}s...")
282 await asyncio.sleep(delay)
284 except Exception as e:
285 logger.error(f"Realtime API error: {e}")
286 if not self._connected:
287 break
288 await asyncio.sleep(self.config.reconnect_delay)
290 def _next_msg_id(self) -> str:
291 """Generate next DDP message ID."""
292 self._msg_id_counter += 1
293 return str(self._msg_id_counter)
295 async def _ddp_connect(self) -> None:
296 """Connect to DDP protocol."""
297 msg = {
298 "msg": "connect",
299 "version": "1",
300 "support": ["1"]
301 }
302 await self._ws.send(json.dumps(msg))
304 # Wait for connect response
305 response = await self._ws.recv()
306 data = json.loads(response)
307 if data.get("msg") == "connected":
308 self._ddp_session_id = data.get("session")
309 logger.debug(f"DDP connected, session: {self._ddp_session_id}")
311 async def _ddp_login(self) -> None:
312 """Login via DDP with resume token."""
313 msg = {
314 "msg": "method",
315 "method": "login",
316 "id": self._next_msg_id(),
317 "params": [{"resume": self._auth_token}]
318 }
319 await self._ws.send(json.dumps(msg))
321 async def _subscribe_to_messages(self) -> None:
322 """Subscribe to message stream."""
323 msg = {
324 "msg": "sub",
325 "id": self._next_msg_id(),
326 "name": "stream-room-messages",
327 "params": ["__my_messages__", False]
328 }
329 await self._ws.send(json.dumps(msg))
331 async def _handle_ws_message(self, raw_message: str) -> None:
332 """Handle incoming WebSocket message."""
333 try:
334 data = json.loads(raw_message)
336 # Handle ping
337 if data.get("msg") == "ping":
338 await self._ws.send(json.dumps({"msg": "pong"}))
339 return
341 # Handle message stream
342 if data.get("msg") == "changed" and data.get("collection") == "stream-room-messages":
343 fields = data.get("fields", {})
344 args = fields.get("args", [])
346 for msg_data in args:
347 message = self._parse_message(msg_data)
348 if message:
349 for handler in self._message_handlers:
350 asyncio.create_task(handler(message))
352 except json.JSONDecodeError:
353 logger.warning(f"Invalid JSON in WebSocket message")
354 except Exception as e:
355 logger.error(f"Error handling WebSocket message: {e}")
357 def _parse_message(self, data: Dict[str, Any]) -> Optional[Message]:
358 """Parse Rocket.Chat message to unified Message."""
359 try:
360 user_data = data.get("u", {})
362 # Skip bot's own messages
363 if user_data.get("_id") == self._user_id:
364 return None
366 return Message(
367 id=data.get("_id", ""),
368 channel=self.channel_type,
369 chat_id=data.get("rid", ""),
370 sender_id=user_data.get("_id", ""),
371 sender_name=user_data.get("username", ""),
372 text=data.get("msg", ""),
373 timestamp=datetime.fromisoformat(
374 data.get("ts", {}).get("$date", datetime.now().isoformat())
375 ) if isinstance(data.get("ts"), dict) else datetime.now(),
376 message_type=MessageType.TEXT,
377 reply_to=data.get("tmid"),
378 metadata={
379 "mentions": data.get("mentions", []),
380 "channels": data.get("channels", []),
381 }
382 )
383 except Exception as e:
384 logger.error(f"Error parsing message: {e}")
385 return None
387 def on_message(self, handler: Callable) -> None:
388 """Register message handler."""
389 self._message_handlers.append(handler)
391 async def send_message(
392 self,
393 chat_id: str,
394 text: str,
395 reply_to: Optional[str] = None,
396 **kwargs
397 ) -> SendResult:
398 """Send a message to a room."""
399 url = urljoin(self.base_url, "chat.postMessage")
401 payload: Dict[str, Any] = {
402 "roomId": chat_id,
403 "text": text
404 }
406 # Thread support
407 if reply_to and self.config.enable_threads:
408 payload["tmid"] = reply_to
410 # Attachments
411 attachments = kwargs.get("attachments", [])
412 if attachments:
413 payload["attachments"] = attachments
415 try:
416 async with self._session.post(
417 url,
418 json=payload,
419 headers=self._get_headers()
420 ) as resp:
421 if resp.status == 429:
422 raise ChannelRateLimitError("Rate limited")
424 if resp.status != 200:
425 data = await resp.json()
426 raise ChannelSendError(data.get("error", "Send failed"))
428 data = await resp.json()
429 if not data.get("success"):
430 raise ChannelSendError(data.get("error", "Send failed"))
432 msg = data.get("message", {})
433 return SendResult(
434 success=True,
435 message_id=msg.get("_id", ""),
436 timestamp=datetime.now()
437 )
439 except (ChannelSendError, ChannelRateLimitError):
440 raise
441 except Exception as e:
442 logger.error(f"Failed to send message: {e}")
443 raise ChannelSendError(str(e))
445 async def edit_message(
446 self,
447 chat_id: str,
448 message_id: str,
449 text: str,
450 **kwargs
451 ) -> bool:
452 """Edit a message."""
453 url = urljoin(self.base_url, "chat.update")
455 payload = {
456 "roomId": chat_id,
457 "msgId": message_id,
458 "text": text
459 }
461 try:
462 async with self._session.post(
463 url,
464 json=payload,
465 headers=self._get_headers()
466 ) as resp:
467 if resp.status != 200:
468 return False
469 data = await resp.json()
470 return data.get("success", False)
471 except Exception as e:
472 logger.error(f"Failed to edit message: {e}")
473 return False
475 async def delete_message(self, chat_id: str, message_id: str, **kwargs) -> bool:
476 """Delete a message."""
477 url = urljoin(self.base_url, "chat.delete")
479 payload = {
480 "roomId": chat_id,
481 "msgId": message_id
482 }
484 try:
485 async with self._session.post(
486 url,
487 json=payload,
488 headers=self._get_headers()
489 ) as resp:
490 if resp.status != 200:
491 return False
492 data = await resp.json()
493 return data.get("success", False)
494 except Exception as e:
495 logger.error(f"Failed to delete message: {e}")
496 return False
498 async def send_typing(self, chat_id: str, **kwargs) -> None:
499 """Send typing indicator."""
500 if self._ws and self._ddp_session_id:
501 msg = {
502 "msg": "method",
503 "method": "stream-notify-room",
504 "id": self._next_msg_id(),
505 "params": [f"{chat_id}/typing", self.config.username, True]
506 }
507 try:
508 await self._ws.send(json.dumps(msg))
509 except Exception as e:
510 logger.debug(f"Failed to send typing indicator: {e}")
512 async def add_reaction(self, chat_id: str, message_id: str, emoji: str) -> bool:
513 """Add reaction to a message."""
514 if not self.config.enable_reactions:
515 return False
517 url = urljoin(self.base_url, "chat.react")
518 payload = {
519 "messageId": message_id,
520 "emoji": emoji,
521 "shouldReact": True
522 }
524 try:
525 async with self._session.post(
526 url,
527 json=payload,
528 headers=self._get_headers()
529 ) as resp:
530 if resp.status != 200:
531 return False
532 data = await resp.json()
533 return data.get("success", False)
534 except Exception as e:
535 logger.error(f"Failed to add reaction: {e}")
536 return False
538 async def remove_reaction(self, chat_id: str, message_id: str, emoji: str) -> bool:
539 """Remove reaction from a message."""
540 url = urljoin(self.base_url, "chat.react")
541 payload = {
542 "messageId": message_id,
543 "emoji": emoji,
544 "shouldReact": False
545 }
547 try:
548 async with self._session.post(
549 url,
550 json=payload,
551 headers=self._get_headers()
552 ) as resp:
553 if resp.status != 200:
554 return False
555 data = await resp.json()
556 return data.get("success", False)
557 except Exception as e:
558 logger.error(f"Failed to remove reaction: {e}")
559 return False
561 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
562 """Get room information."""
563 # Check cache
564 if chat_id in self._rooms_cache:
565 room = self._rooms_cache[chat_id]
566 return {
567 "id": room.id,
568 "name": room.name,
569 "type": room.type,
570 "topic": room.topic,
571 "user_count": room.user_count,
572 }
574 url = urljoin(self.base_url, f"rooms.info?roomId={chat_id}")
576 try:
577 async with self._session.get(url, headers=self._get_headers()) as resp:
578 if resp.status != 200:
579 return None
581 data = await resp.json()
582 if not data.get("success"):
583 return None
585 room_data = data.get("room", {})
586 room = RocketChatRoom(
587 id=room_data.get("_id", ""),
588 name=room_data.get("name", ""),
589 type=room_data.get("t", ""),
590 topic=room_data.get("topic"),
591 description=room_data.get("description"),
592 user_count=room_data.get("usersCount", 0),
593 read_only=room_data.get("ro", False),
594 archived=room_data.get("archived", False),
595 )
597 self._rooms_cache[chat_id] = room
599 return {
600 "id": room.id,
601 "name": room.name,
602 "type": room.type,
603 "topic": room.topic,
604 "user_count": room.user_count,
605 }
606 except Exception as e:
607 logger.error(f"Failed to get room info: {e}")
608 return None
610 async def upload_file(
611 self,
612 chat_id: str,
613 file_path: str,
614 description: Optional[str] = None
615 ) -> Optional[str]:
616 """Upload a file to a room."""
617 if not self.config.enable_file_attachments:
618 return None
620 url = urljoin(self.base_url, f"rooms.upload/{chat_id}")
622 try:
623 data = aiohttp.FormData()
624 data.add_field(
625 "file",
626 open(file_path, "rb"),
627 filename=os.path.basename(file_path)
628 )
629 if description:
630 data.add_field("description", description)
632 headers = self._get_headers()
633 del headers["Content-Type"] # Let aiohttp set it
635 async with self._session.post(url, data=data, headers=headers) as resp:
636 if resp.status != 200:
637 return None
639 result = await resp.json()
640 if result.get("success"):
641 return result.get("message", {}).get("_id")
642 return None
644 except Exception as e:
645 logger.error(f"Failed to upload file: {e}")
646 return None
648 async def create_direct_message(self, username: str) -> Optional[str]:
649 """Create a direct message room with a user."""
650 url = urljoin(self.base_url, "im.create")
651 payload = {"username": username}
653 try:
654 async with self._session.post(
655 url,
656 json=payload,
657 headers=self._get_headers()
658 ) as resp:
659 if resp.status != 200:
660 return None
662 data = await resp.json()
663 if data.get("success"):
664 return data.get("room", {}).get("_id")
665 return None
666 except Exception as e:
667 logger.error(f"Failed to create DM: {e}")
668 return None
670 async def get_thread_messages(
671 self,
672 chat_id: str,
673 thread_id: str,
674 limit: int = 50
675 ) -> List[RocketChatMessage]:
676 """Get messages in a thread."""
677 url = urljoin(
678 self.base_url,
679 f"chat.getThreadMessages?tmid={thread_id}&count={limit}"
680 )
682 try:
683 async with self._session.get(url, headers=self._get_headers()) as resp:
684 if resp.status != 200:
685 return []
687 data = await resp.json()
688 if not data.get("success"):
689 return []
691 messages = []
692 for msg in data.get("messages", []):
693 user_data = msg.get("u", {})
694 messages.append(RocketChatMessage(
695 id=msg.get("_id", ""),
696 room_id=msg.get("rid", ""),
697 text=msg.get("msg", ""),
698 user=RocketChatUser(
699 id=user_data.get("_id", ""),
700 username=user_data.get("username", ""),
701 name=user_data.get("name"),
702 ),
703 timestamp=datetime.now(),
704 thread_id=thread_id,
705 ))
707 return messages
708 except Exception as e:
709 logger.error(f"Failed to get thread messages: {e}")
710 return []
713def create_rocketchat_adapter(
714 server_url: Optional[str] = None,
715 username: Optional[str] = None,
716 password: Optional[str] = None,
717 auth_token: Optional[str] = None,
718 user_id: Optional[str] = None,
719 **kwargs
720) -> RocketChatAdapter:
721 """Factory function to create a Rocket.Chat adapter.
723 Args:
724 server_url: Rocket.Chat server URL
725 username: Username for authentication
726 password: Password for authentication
727 auth_token: Pre-existing auth token (optional)
728 user_id: Pre-existing user ID (optional)
729 **kwargs: Additional config options
731 Returns:
732 Configured RocketChatAdapter instance
733 """
734 config = RocketChatConfig(
735 server_url=server_url or os.getenv("ROCKETCHAT_URL", ""),
736 username=username or os.getenv("ROCKETCHAT_USERNAME", ""),
737 password=password or os.getenv("ROCKETCHAT_PASSWORD", ""),
738 auth_token=auth_token or os.getenv("ROCKETCHAT_AUTH_TOKEN", ""),
739 user_id=user_id or os.getenv("ROCKETCHAT_USER_ID", ""),
740 **kwargs
741 )
742 return RocketChatAdapter(config)