Coverage for integrations / channels / extensions / matrix_adapter.py: 32.2%
323 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Matrix Protocol Channel Adapter
4Implements Matrix messaging with end-to-end encryption support.
5Based on HevolveBot extension patterns for Matrix.
7Features:
8- End-to-end encryption (E2EE) using Olm/Megolm
9- Room management (create, join, invite, leave)
10- Reactions
11- Thread support (MSC3440)
12- Read receipts
13- Typing indicators
14- Media upload/download
15- Device verification
16"""
from __future__ import annotations

import asyncio
import json
import logging
import os
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

try:
    from nio import (
        AsyncClient,
        AsyncClientConfig,
        ErrorResponse,
        LoginResponse,
        RoomMessageText,
        RoomMessageMedia,
        RoomMemberEvent,
        InviteMemberEvent,
        MatrixRoom,
        Event,
        SyncResponse,
        UploadResponse,
        RoomCreateResponse,
        JoinResponse,
        RoomSendResponse,
        RoomResolveAliasResponse,
        ToDeviceEvent,
        KeyVerificationEvent,
        MegolmEvent,
    )
    from nio.store import SqliteStore
    HAS_MATRIX = True
except ImportError:
    HAS_MATRIX = False

from ..base import (
    ChannelAdapter,
    ChannelConfig,
    ChannelStatus,
    Message,
    MessageType,
    MediaAttachment,
    SendResult,
    ChannelConnectionError,
    ChannelSendError,
    ChannelRateLimitError,
)
from ..room_capable import RoomCapableAdapter, UnsupportedRoomError
68logger = logging.getLogger(__name__)
@dataclass
class MatrixConfig(ChannelConfig):
    """Configuration values that are specific to the Matrix channel."""

    # Base URL of the homeserver; handed to the client as ``homeserver``.
    homeserver_url: str = "https://matrix.org"
    # Full Matrix ID of the bot account; also used to recognise the bot's
    # own events and mentions.
    user_id: str = ""
    # Device identifier passed to the client; ``None`` leaves it unset.
    device_id: Optional[str] = None
    # Human-readable device label.
    device_name: str = "HevolveBotClient"
    # Directory created and used as the client store when E2EE is enabled.
    store_path: str = "./matrix_store"
    # Turns encryption support on in the client configuration.
    enable_e2ee: bool = True
    # When true, invites addressed to the bot are accepted automatically.
    auto_join_rooms: bool = True
    # When true, the bot's own devices are recorded as trusted on connect.
    trust_own_devices: bool = True
    # Emoji-verification preference flag.
    verification_emoji: bool = True
@dataclass
class MatrixRoom:
    """Cached snapshot of what the adapter knows about one Matrix room."""

    # Room identifier; the only required field.
    room_id: str
    # Display name of the room, if known.
    name: Optional[str] = None
    # Room topic, if known.
    topic: Optional[str] = None
    # Whether the room is encrypted.
    is_encrypted: bool = False
    # Number of members in the room.
    member_count: int = 0
    # Whether the room is a direct (one-to-one) chat.
    is_direct: bool = False
@dataclass
class ThreadInfo:
    """Bookkeeping record for a Matrix thread (MSC3440)."""

    # Event ID of the message that started the thread.
    root_event_id: str
    # Event ID of the most recent reply, if any.
    latest_event_id: Optional[str] = None
    # Number of replies counted in the thread.
    reply_count: int = 0
class MatrixAdapter(ChannelAdapter, RoomCapableAdapter):
    """
    Matrix protocol messaging adapter with E2EE support.

    Usage:
        config = MatrixConfig(
            homeserver_url="https://matrix.org",
            user_id="@bot:matrix.org",
            token="access_token",
            enable_e2ee=True,
        )
        adapter = MatrixAdapter(config)
        adapter.on_message(my_handler)
        await adapter.start()

    Note:
        matrix-nio reports most request failures by *returning* an
        ``ErrorResponse`` object instead of raising. The methods below
        therefore inspect the returned object in addition to catching
        exceptions.
    """

    def __init__(self, config: MatrixConfig):
        """Store the configuration and initialise empty caches.

        Raises:
            ImportError: if matrix-nio could not be imported.
        """
        if not HAS_MATRIX:
            raise ImportError(
                "matrix-nio not installed. "
                "Install with: pip install matrix-nio[e2e]"
            )

        super().__init__(config)
        self.matrix_config: MatrixConfig = config
        self._client: Optional[AsyncClient] = None
        self._sync_task: Optional[asyncio.Task] = None
        # Room cache keyed by room id (values are this module's MatrixRoom).
        self._rooms: Dict[str, MatrixRoom] = {}
        self._threads: Dict[str, ThreadInfo] = {}
        self._reaction_handlers: List[Callable] = []
        # Entries are "<user_id>:<device_id>" strings.
        self._verified_devices: set = set()

    @property
    def name(self) -> str:
        """Channel name used in unified messages."""
        return "matrix"

    @staticmethod
    def _failed(response: Any) -> bool:
        """Return True when *response* is a matrix-nio error response."""
        return isinstance(response, ErrorResponse)

    @staticmethod
    def _read_file_bytes(path: str) -> bytes:
        """Read a whole file as bytes (run in an executor by callers)."""
        with open(path, 'rb') as fh:
            return fh.read()

    @staticmethod
    def _event_content(event: Any) -> Dict[str, Any]:
        """Return the content dict of *event*, or ``{}`` if unavailable.

        Looks at ``event.content`` first and falls back to
        ``event.source['content']``, which is where matrix-nio event
        objects keep the raw payload.
        """
        content = getattr(event, 'content', None)
        if isinstance(content, dict):
            return content
        source = getattr(event, 'source', None)
        if isinstance(source, dict):
            content = source.get('content')
            if isinstance(content, dict):
                return content
        return {}

    async def connect(self) -> bool:
        """Connect to Matrix homeserver with optional E2EE setup.

        Returns:
            True when the client was created and the sync task started;
            False when configuration is incomplete or setup raised (in
            which case ``status`` is set to ``ChannelStatus.ERROR``).
        """
        cfg = self.matrix_config
        if not cfg.homeserver_url:
            logger.error("Matrix homeserver URL not provided")
            return False
        # Validate the token up front so nothing is created on a config error.
        if not cfg.token:
            logger.error("Matrix access token required")
            return False

        try:
            # Configure client
            client_config = AsyncClientConfig(
                max_limit_exceeded=0,
                max_timeouts=0,
                store_sync_tokens=True,
                encryption_enabled=cfg.enable_e2ee,
            )

            # The client opens its own store under store_path; only the
            # directory has to exist beforehand.
            store_path = None
            if cfg.enable_e2ee:
                os.makedirs(cfg.store_path, exist_ok=True)
                store_path = cfg.store_path

            # Create client
            self._client = AsyncClient(
                homeserver=cfg.homeserver_url,
                user=cfg.user_id,
                device_id=cfg.device_id,
                store_path=store_path,
                config=client_config,
            )

            if cfg.enable_e2ee and cfg.user_id and cfg.device_id:
                # restore_login also loads the crypto store, which a plain
                # attribute assignment does not do.
                self._client.restore_login(
                    user_id=cfg.user_id,
                    device_id=cfg.device_id,
                    access_token=cfg.token,
                )
            else:
                if cfg.enable_e2ee:
                    logger.warning(
                        "Matrix E2EE enabled without user_id/device_id; "
                        "encryption store will not be loaded"
                    )
                self._client.access_token = cfg.token
                self._client.user_id = cfg.user_id
                if cfg.device_id:
                    self._client.device_id = cfg.device_id

            # Setup E2EE if enabled
            if cfg.enable_e2ee:
                await self._setup_encryption()

            # Register event callbacks
            self._register_callbacks()

            # Mark connected before the sync task starts: its loop
            # condition checks the status.
            self.status = ChannelStatus.CONNECTED
            self._sync_task = asyncio.create_task(self._sync_forever())

            logger.info("Matrix connected as %s", cfg.user_id)
            return True

        except Exception as e:
            logger.error("Failed to connect to Matrix: %s", e)
            self.status = ChannelStatus.ERROR
            return False

    async def disconnect(self) -> None:
        """Disconnect from Matrix homeserver.

        Cancels the sync task, closes the client and sets the status to
        ``ChannelStatus.DISCONNECTED`` (even if closing raises).
        """
        sync_task, self._sync_task = self._sync_task, None
        if sync_task:
            sync_task.cancel()
            try:
                await sync_task
            except asyncio.CancelledError:
                pass

        client, self._client = self._client, None
        try:
            if client:
                await client.close()
        finally:
            self.status = ChannelStatus.DISCONNECTED

    async def _setup_encryption(self) -> None:
        """Setup end-to-end encryption."""
        if not self._client:
            return

        # Trust own devices if configured
        if self.matrix_config.trust_own_devices:
            await self._trust_own_devices()

        logger.info("Matrix E2EE initialized")

    async def _trust_own_devices(self) -> None:
        """Record all devices belonging to the bot user as verified.

        Entries use the same ``"<user_id>:<device_id>"`` key format as
        ``verify_device``. Failures are logged and otherwise ignored.
        """
        if not self._client:
            return

        try:
            # Get own devices
            devices = await self._client.devices()
            own_user = self.matrix_config.user_id
            for device in getattr(devices, 'devices', None) or []:
                # nio's Device exposes ``id``; ``device_id`` is accepted too.
                device_id = (
                    getattr(device, 'device_id', None)
                    or getattr(device, 'id', None)
                )
                if device_id:
                    self._verified_devices.add(f"{own_user}:{device_id}")
        except Exception as e:
            logger.warning("Could not trust own devices: %s", e)

    def _register_callbacks(self) -> None:
        """Register Matrix event callbacks on the client."""
        if not self._client:
            return

        # Message events
        self._client.add_event_callback(
            self._handle_room_message,
            RoomMessageText
        )

        # Room invites
        self._client.add_event_callback(
            self._handle_invite,
            InviteMemberEvent
        )

        # Megolm events (see _handle_megolm_event)
        if self.matrix_config.enable_e2ee:
            self._client.add_event_callback(
                self._handle_megolm_event,
                MegolmEvent
            )

    async def _sync_forever(self) -> None:
        """Continuously sync with homeserver until disconnected/cancelled.

        Full state is requested only until the first successful sync;
        later syncs are incremental. Error responses and exceptions are
        followed by a 5 second pause.
        """
        need_full_state = True
        while self._client and self.status == ChannelStatus.CONNECTED:
            try:
                sync_response = await self._client.sync(
                    timeout=30000,
                    full_state=need_full_state,
                )

                if isinstance(sync_response, SyncResponse):
                    need_full_state = False
                    # Update room list
                    for room_id in sync_response.rooms.join:
                        await self._update_room_info(room_id)
                elif self._failed(sync_response):
                    # Back off instead of retrying in a tight loop.
                    logger.error("Matrix sync error: %s", sync_response)
                    await asyncio.sleep(5)

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error("Matrix sync error: %s", e)
                await asyncio.sleep(5)

    async def _handle_room_message(
        self,
        room: Any,
        event: RoomMessageText
    ) -> None:
        """Handle incoming room messages and dispatch them."""
        # Ignore own messages
        if event.sender == self.matrix_config.user_id:
            return

        # Convert to unified message
        message = self._convert_message(room, event)
        await self._dispatch_message(message)

    async def _handle_invite(self, room: Any, event: InviteMemberEvent) -> None:
        """Auto-join a room when the bot itself is invited (if enabled)."""
        if not self.matrix_config.auto_join_rooms or not self._client:
            return
        if event.state_key != self.matrix_config.user_id:
            return

        try:
            response = await self._client.join(room.room_id)
        except Exception as e:
            logger.error("Failed to join room: %s", e)
            return

        if self._failed(response):
            logger.error("Failed to join room: %s", response)
        else:
            logger.info("Auto-joined room: %s", room.room_id)

    async def _handle_megolm_event(self, room: Any, event: MegolmEvent) -> None:
        """Handle Megolm events delivered by the client.

        Only events flagged as decrypted are dispatched; anything else is
        ciphertext without a readable body and is skipped.
        """
        if not getattr(event, 'decrypted', False):
            logger.debug(
                "Skipping undecrypted Megolm event in %s",
                getattr(room, 'room_id', None),
            )
            return

        if event.sender == self.matrix_config.user_id:
            return

        message = self._convert_message(room, event)
        await self._dispatch_message(message)

    async def _update_room_info(self, room_id: str) -> None:
        """Update cached room information from the client's room state."""
        if not self._client:
            return

        try:
            room = self._client.rooms.get(room_id)
            if room:
                self._rooms[room_id] = MatrixRoom(
                    room_id=room_id,
                    name=getattr(room, 'display_name', None),
                    topic=getattr(room, 'topic', None),
                    is_encrypted=getattr(room, 'encrypted', False),
                    member_count=getattr(room, 'member_count', 0),
                    is_direct=getattr(room, 'is_direct', False),
                )
        except Exception as e:
            logger.warning("Could not update room info: %s", e)

    def _convert_message(self, room: Any, event: Any) -> Message:
        """Convert Matrix event to unified Message format.

        Args:
            room: object exposing ``room_id`` (and optionally ``user_name``).
            event: object exposing ``event_id`` and ``sender``; ``body``,
                ``content``/``source`` and ``server_timestamp`` are used
                when present.
        """
        content = self._event_content(event)

        # Extract text content
        if hasattr(event, 'body'):
            text = event.body
        else:
            text = content.get('body', '')
        if text is None:
            text = ""

        # Check for reply/thread (an explicit reply wins over the thread root)
        reply_to_id = None
        relates_to = content.get('m.relates_to')
        if isinstance(relates_to, dict):
            in_reply_to = relates_to.get('m.in_reply_to')
            if isinstance(in_reply_to, dict):
                reply_to_id = in_reply_to.get('event_id')
            elif relates_to.get('rel_type') == 'm.thread':
                reply_to_id = relates_to.get('event_id')

        # Determine if group: only rooms cached as direct are not groups
        room_info = self._rooms.get(room.room_id)
        is_group = not (room_info and room_info.is_direct)

        # Check for bot mention; an empty user_id must not match everything
        bot_id = self.matrix_config.user_id
        is_mentioned = bool(bot_id) and bot_id in text

        if hasattr(room, 'user_name'):
            sender_name = room.user_name(event.sender)
        else:
            sender_name = event.sender

        if hasattr(event, 'server_timestamp'):
            # server_timestamp is in milliseconds
            timestamp = datetime.fromtimestamp(event.server_timestamp / 1000)
        else:
            timestamp = datetime.now()

        return Message(
            id=event.event_id,
            channel=self.name,
            sender_id=event.sender,
            sender_name=sender_name,
            chat_id=room.room_id,
            text=text,
            reply_to_id=reply_to_id,
            timestamp=timestamp,
            is_group=is_group,
            is_bot_mentioned=is_mentioned,
            raw={
                'event_type': type(event).__name__,
                'room_id': room.room_id,
                'encrypted': room_info.is_encrypted if room_info else False,
            },
        )

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        media: Optional[List[MediaAttachment]] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Send a message to a Matrix room.

        Args:
            chat_id: target room id.
            text: message body; also sent as ``formatted_body`` when it
                contains both ``<`` and ``>``.
            reply_to: event id to reply to, if any.
            media: attachments; only the first one is sent, with *text*
                as its caption.
            buttons: accepted for interface compatibility; not used.

        Returns:
            SendResult with the new event id on success, or the error text.
        """
        if not self._client:
            return SendResult(success=False, error="Not connected")

        try:
            # Handle media
            if media:
                if len(media) > 1:
                    logger.warning(
                        "Matrix send_message: %d attachments supplied, "
                        "only the first is sent", len(media))
                return await self._send_media(chat_id, text, media[0], reply_to)

            # Build message content
            content: Dict[str, Any] = {
                'msgtype': 'm.text',
                'body': text,
            }

            # Add formatted body (HTML)
            if '<' in text and '>' in text:
                content['format'] = 'org.matrix.custom.html'
                content['formatted_body'] = text

            # Add reply relation
            if reply_to:
                content['m.relates_to'] = {
                    'm.in_reply_to': {
                        'event_id': reply_to
                    }
                }

            # Send message
            response = await self._client.room_send(
                room_id=chat_id,
                message_type='m.room.message',
                content=content,
            )

            if isinstance(response, RoomSendResponse):
                return SendResult(
                    success=True,
                    message_id=response.event_id,
                )
            return SendResult(
                success=False,
                error=str(response),
            )

        except Exception as e:
            logger.error("Failed to send Matrix message: %s", e)
            return SendResult(success=False, error=str(e))

    async def _send_media(
        self,
        chat_id: str,
        caption: str,
        media: MediaAttachment,
        reply_to: Optional[str] = None,
    ) -> SendResult:
        """Send media message to Matrix room.

        A local ``file_path`` is uploaded first; otherwise ``media.url``
        is used as the content URL as-is.
        """
        if not self._client:
            return SendResult(success=False, error="Not connected")

        try:
            # Upload media first
            if media.file_path:
                # Read off the event loop so large files do not block it.
                loop = asyncio.get_running_loop()
                file_data = await loop.run_in_executor(
                    None, self._read_file_bytes, media.file_path)

                upload_result = await self._client.upload(
                    data_provider=file_data,
                    content_type=media.mime_type or 'application/octet-stream',
                    filename=media.file_name,
                )
                # matrix-nio returns (response, decryption_keys); accept a
                # bare response as well.
                if isinstance(upload_result, tuple):
                    upload_response = upload_result[0]
                else:
                    upload_response = upload_result

                if isinstance(upload_response, UploadResponse):
                    mxc_url = upload_response.content_uri
                else:
                    logger.error("Matrix upload failed: %s", upload_response)
                    return SendResult(success=False, error="Upload failed")
            elif media.url:
                mxc_url = media.url
            else:
                return SendResult(success=False, error="No media source")

            # Determine message type
            msgtype = 'm.file'
            if media.type == MessageType.IMAGE:
                msgtype = 'm.image'
            elif media.type == MessageType.VIDEO:
                msgtype = 'm.video'
            elif media.type == MessageType.AUDIO:
                msgtype = 'm.audio'

            # Build content
            content: Dict[str, Any] = {
                'msgtype': msgtype,
                'body': caption or media.file_name or 'attachment',
                'url': mxc_url,
            }

            if media.mime_type:
                content['info'] = {'mimetype': media.mime_type}

            if reply_to:
                content['m.relates_to'] = {
                    'm.in_reply_to': {'event_id': reply_to}
                }

            # Send
            response = await self._client.room_send(
                room_id=chat_id,
                message_type='m.room.message',
                content=content,
            )

            if isinstance(response, RoomSendResponse):
                return SendResult(success=True, message_id=response.event_id)
            return SendResult(success=False, error=str(response))

        except Exception as e:
            logger.error("Failed to send Matrix media: %s", e)
            return SendResult(success=False, error=str(e))

    async def edit_message(
        self,
        chat_id: str,
        message_id: str,
        text: str,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Edit an existing Matrix message via an ``m.replace`` relation.

        ``buttons`` is accepted for interface compatibility; not used.
        """
        if not self._client:
            return SendResult(success=False, error="Not connected")

        try:
            content = {
                'msgtype': 'm.text',
                # Fallback body, prefixed with "* "
                'body': f'* {text}',
                'm.new_content': {
                    'msgtype': 'm.text',
                    'body': text,
                },
                'm.relates_to': {
                    'rel_type': 'm.replace',
                    'event_id': message_id,
                },
            }

            response = await self._client.room_send(
                room_id=chat_id,
                message_type='m.room.message',
                content=content,
            )

            if isinstance(response, RoomSendResponse):
                return SendResult(success=True, message_id=response.event_id)
            return SendResult(success=False, error=str(response))

        except Exception as e:
            logger.error("Failed to edit Matrix message: %s", e)
            return SendResult(success=False, error=str(e))

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """Redact a Matrix message.

        Returns:
            True unless the client is missing, the call raised, or the
            server answered with an error response.
        """
        if not self._client:
            return False

        try:
            response = await self._client.room_redact(
                room_id=chat_id,
                event_id=message_id,
                reason="Deleted by bot",
            )
        except Exception as e:
            logger.error("Failed to delete Matrix message: %s", e)
            return False

        if self._failed(response):
            logger.error("Failed to delete Matrix message: %s", response)
            return False
        return True

    async def send_typing(self, chat_id: str) -> None:
        """Send typing indicator (best effort; failures are only logged)."""
        if not self._client:
            return
        try:
            await self._client.room_typing(
                room_id=chat_id,
                typing_state=True,
                timeout=30000,
            )
        except Exception as e:
            logger.debug("Matrix typing indicator failed: %s", e)

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Get cached information about a Matrix room, or None if unknown."""
        room_info = self._rooms.get(chat_id)
        if not room_info:
            return None
        return {
            'room_id': room_info.room_id,
            'name': room_info.name,
            'topic': room_info.topic,
            'encrypted': room_info.is_encrypted,
            'member_count': room_info.member_count,
            'is_direct': room_info.is_direct,
        }

    # Matrix-specific methods

    async def create_room(
        self,
        name: str,
        topic: Optional[str] = None,
        invite: Optional[List[str]] = None,
        is_direct: bool = False,
        encrypted: bool = True,
    ) -> Optional[str]:
        """Create a new Matrix room.

        The room is created encrypted only when both ``encrypted`` and the
        adapter's ``enable_e2ee`` setting are true.

        Returns:
            The new room id, or None on any failure.
        """
        if not self._client:
            return None

        try:
            initial_state = []
            if encrypted and self.matrix_config.enable_e2ee:
                initial_state.append({
                    'type': 'm.room.encryption',
                    'content': {'algorithm': 'm.megolm.v1.aes-sha2'},
                })

            response = await self._client.room_create(
                name=name,
                topic=topic,
                invite=invite or [],
                is_direct=is_direct,
                initial_state=initial_state,
            )

            if isinstance(response, RoomCreateResponse):
                return response.room_id
            return None

        except Exception as e:
            logger.error("Failed to create room: %s", e)
            return None

    async def join_room(self, room_id: str,
                        role: str = 'participant') -> bool:
        """Join a Matrix room (UNIF-G2 RoomCapableAdapter contract).

        Accepts both room ids (``!room:server``) and aliases
        (``#alias:server``). Role is informational only — Matrix
        permissions are server-side via power levels and the bot's
        send permissions are set by the room's existing ACL.
        """
        if not self._client:
            return False
        if not room_id:
            return False
        try:
            response = await self._client.join(room_id)
            ok = isinstance(response, JoinResponse)
            if ok:
                logger.info(
                    "Matrix.join_room: %s joined (role=%s)",
                    room_id, role)
            return ok
        except Exception as e:
            logger.error("Matrix.join_room: failed for %s: %s", room_id, e)
            return False

    async def leave_room(self, room_id: str) -> bool:
        """Leave a Matrix room. Idempotent on already-absent.

        An error response from the server is logged but still reported as
        True so the idempotent contract is kept; only a raised exception
        or missing client/room id yields False.
        """
        if not self._client or not room_id:
            return False
        try:
            response = await self._client.room_leave(room_id)
        except Exception as e:
            logger.error("Matrix.leave_room: failed for %s: %s", room_id, e)
            return False
        if self._failed(response):
            logger.warning(
                "Matrix.leave_room: server reported %s for %s",
                response, room_id)
        return True

    async def list_room_members(self, room_id: str) -> List[Dict[str, Any]]:
        """List Matrix room members.

        Reads from the local sync state when available
        (``client.rooms[room_id].users``); falls back to ``[]`` when the
        client hasn't synced this room yet. Skips the bot's own
        user_id.
        """
        if not self._client or not room_id:
            return []
        try:
            room = (self._client.rooms or {}).get(room_id)
            if room is None:
                return []
            users = getattr(room, 'users', None) or {}
            self_uid = getattr(self._client, 'user_id', None)
            return [
                {
                    'id': str(uid),
                    'display_name': (
                        getattr(user, 'display_name', None)
                        or getattr(user, 'name', None) or str(uid)),
                    'is_bot': False,
                }
                for uid, user in users.items()
                if uid != self_uid
            ]
        except Exception as e:
            logger.error(
                "Matrix.list_room_members: failed for %s: %s", room_id, e)
            return []

    async def invite_user(self, room_id: str, user_id: str) -> bool:
        """Invite a user to a Matrix room.

        Returns False when the call raised or the server returned an
        error response.
        """
        if not self._client:
            return False

        try:
            response = await self._client.room_invite(room_id, user_id)
        except Exception as e:
            logger.error("Failed to invite user: %s", e)
            return False

        if self._failed(response):
            logger.error("Failed to invite user: %s", response)
            return False
        return True

    async def add_reaction(
        self,
        chat_id: str,
        message_id: str,
        emoji: str,
    ) -> bool:
        """Add a reaction (``m.annotation``) to a message."""
        if not self._client:
            return False

        try:
            content = {
                'm.relates_to': {
                    'rel_type': 'm.annotation',
                    'event_id': message_id,
                    'key': emoji,
                }
            }

            response = await self._client.room_send(
                room_id=chat_id,
                message_type='m.reaction',
                content=content,
            )
        except Exception as e:
            logger.error("Failed to add reaction: %s", e)
            return False

        if self._failed(response):
            logger.error("Failed to add reaction: %s", response)
            return False
        return True

    async def send_thread_reply(
        self,
        chat_id: str,
        thread_root_id: str,
        text: str,
    ) -> SendResult:
        """Send a reply in a thread (MSC3440).

        The relation also carries an ``m.in_reply_to`` pointing at the
        thread root, with ``is_falling_back`` set.
        """
        if not self._client:
            return SendResult(success=False, error="Not connected")

        try:
            content = {
                'msgtype': 'm.text',
                'body': text,
                'm.relates_to': {
                    'rel_type': 'm.thread',
                    'event_id': thread_root_id,
                    'is_falling_back': True,
                    'm.in_reply_to': {
                        'event_id': thread_root_id
                    }
                }
            }

            response = await self._client.room_send(
                room_id=chat_id,
                message_type='m.room.message',
                content=content,
            )

            if isinstance(response, RoomSendResponse):
                return SendResult(success=True, message_id=response.event_id)
            return SendResult(success=False, error=str(response))

        except Exception as e:
            logger.error("Failed to send thread reply: %s", e)
            return SendResult(success=False, error=str(e))

    async def send_read_receipt(self, chat_id: str, message_id: str) -> bool:
        """Send read receipt (and fully-read marker) for a message."""
        if not self._client:
            return False

        try:
            response = await self._client.room_read_markers(
                room_id=chat_id,
                fully_read_event=message_id,
                read_event=message_id,
            )
        except Exception as e:
            logger.error("Failed to send read receipt: %s", e)
            return False

        if self._failed(response):
            logger.error("Failed to send read receipt: %s", response)
            return False
        return True

    async def verify_device(self, user_id: str, device_id: str) -> bool:
        """Verify a device for E2EE.

        Looks the device up in the client's device store and marks it as
        verified. Returns False when E2EE is disabled, the device is not
        known to the store, or the store is unavailable.
        """
        if not self._client or not self.matrix_config.enable_e2ee:
            return False

        try:
            device = self._client.device_store[user_id].get(device_id)
            if device is None:
                logger.error(
                    "Failed to verify device: unknown device %s for %s",
                    device_id, user_id)
                return False
            # Synchronous in matrix-nio; takes the device object itself.
            self._client.verify_device(device)
            self._verified_devices.add(f"{user_id}:{device_id}")
            return True
        except Exception as e:
            logger.error("Failed to verify device: %s", e)
            return False
def create_matrix_adapter(
    homeserver_url: Optional[str] = None,
    user_id: Optional[str] = None,
    token: Optional[str] = None,
    **kwargs: Any,
) -> MatrixAdapter:
    """
    Factory function to create Matrix adapter.

    Args:
        homeserver_url: Matrix homeserver URL (or set MATRIX_HOMESERVER_URL env var)
        user_id: Bot user ID (or set MATRIX_USER_ID env var)
        token: Access token (or set MATRIX_ACCESS_TOKEN env var)
        **kwargs: Additional config options

    Returns:
        Configured MatrixAdapter

    Raises:
        ValueError: if no user ID or no access token is available from
            either the arguments or the environment.
    """
    # ``or`` (rather than a getenv default) also covers an env var that is
    # set but empty, which would otherwise yield an unusable empty URL.
    homeserver_url = (
        homeserver_url
        or os.getenv("MATRIX_HOMESERVER_URL")
        or "https://matrix.org"
    )
    user_id = user_id or os.getenv("MATRIX_USER_ID")
    token = token or os.getenv("MATRIX_ACCESS_TOKEN")

    if not user_id:
        raise ValueError("Matrix user ID required")
    if not token:
        raise ValueError("Matrix access token required")

    config = MatrixConfig(
        homeserver_url=homeserver_url,
        user_id=user_id,
        token=token,
        **kwargs
    )
    return MatrixAdapter(config)