Coverage for integrations / channels / extensions / matrix_adapter.py: 32.2%

323 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Matrix Protocol Channel Adapter 

3 

4Implements Matrix messaging with end-to-end encryption support. 

5Based on HevolveBot extension patterns for Matrix. 

6 

7Features: 

8- End-to-end encryption (E2EE) using Olm/Megolm 

9- Room management (create, join, invite, leave) 

10- Reactions 

11- Thread support (MSC3440) 

12- Read receipts 

13- Typing indicators 

14- Media upload/download 

15- Device verification 

16""" 

17 

18from __future__ import annotations 

19 

20import asyncio 

21import logging 

22import os 

23import json 

24from typing import Optional, List, Dict, Any, Callable 

25from datetime import datetime 

26from dataclasses import dataclass, field 

27 

28try: 

29 from nio import ( 

30 AsyncClient, 

31 AsyncClientConfig, 

32 LoginResponse, 

33 RoomMessageText, 

34 RoomMessageMedia, 

35 RoomMemberEvent, 

36 InviteMemberEvent, 

37 MatrixRoom, 

38 Event, 

39 SyncResponse, 

40 UploadResponse, 

41 RoomCreateResponse, 

42 JoinResponse, 

43 RoomSendResponse, 

44 RoomResolveAliasResponse, 

45 ToDeviceEvent, 

46 KeyVerificationEvent, 

47 MegolmEvent, 

48 ) 

49 from nio.store import SqliteStore 

50 HAS_MATRIX = True 

51except ImportError: 

52 HAS_MATRIX = False 

53 

54from ..base import ( 

55 ChannelAdapter, 

56 ChannelConfig, 

57 ChannelStatus, 

58 Message, 

59 MessageType, 

60 MediaAttachment, 

61 SendResult, 

62 ChannelConnectionError, 

63 ChannelSendError, 

64 ChannelRateLimitError, 

65) 

66from ..room_capable import RoomCapableAdapter, UnsupportedRoomError 

67 

68logger = logging.getLogger(__name__) 

69 

70 

@dataclass
class MatrixConfig(ChannelConfig):
    """Settings for the Matrix channel adapter, layered on ``ChannelConfig``."""

    # Base URL of the homeserver the client connects to.
    homeserver_url: str = "https://matrix.org"
    # Matrix ID the adapter acts as; also used to recognise its own events.
    user_id: str = ""
    # Device ID handed to the Matrix client; ``None`` leaves it unset.
    device_id: Optional[str] = None
    # Human-readable device label (not read by the adapter code in this
    # module -- presumably consumed elsewhere; verify before relying on it).
    device_name: str = "HevolveBotClient"
    # Directory created on connect and used for the E2EE key store.
    store_path: str = "./matrix_store"
    # Turns on encryption support in the client and the E2EE setup step.
    enable_e2ee: bool = True
    # When true, invites addressed to ``user_id`` are accepted automatically.
    auto_join_rooms: bool = True
    # When true, the adapter's own devices are recorded as trusted on connect.
    trust_own_devices: bool = True
    # Emoji-verification toggle (not read by the adapter code in this
    # module -- TODO confirm where it is consumed).
    verification_emoji: bool = True

83 

84 

@dataclass
class MatrixRoom:
    """Cached snapshot of a Matrix room as tracked by the adapter.

    NOTE: this definition shadows the ``MatrixRoom`` name imported from
    ``nio`` at the top of the module; from this point on the name refers to
    this dataclass.
    """

    # Room identifier; the only required field.
    room_id: str
    # Display name of the room, when known.
    name: Optional[str] = None
    # Room topic, when known.
    topic: Optional[str] = None
    # Whether the room is flagged as encrypted.
    is_encrypted: bool = False
    # Number of members in the room.
    member_count: int = 0
    # Whether the room is a direct (one-to-one) chat.
    is_direct: bool = False

94 

95 

@dataclass
class ThreadInfo:
    """Bookkeeping record for a Matrix thread (MSC3440)."""

    # Event ID of the message that started the thread; required.
    root_event_id: str
    # Event ID of the most recent reply, if any has been recorded.
    latest_event_id: Optional[str] = None
    # Count of replies recorded for the thread.
    reply_count: int = 0

102 

103 

class MatrixAdapter(ChannelAdapter, RoomCapableAdapter):
    """
    Matrix protocol messaging adapter with E2EE support.

    Usage:
        config = MatrixConfig(
            homeserver_url="https://matrix.org",
            user_id="@bot:matrix.org",
            token="access_token",
            enable_e2ee=True,
        )
        adapter = MatrixAdapter(config)
        adapter.on_message(my_handler)
        await adapter.start()
    """

    # Seconds to pause before retrying after a failed or rejected sync.
    _SYNC_RETRY_DELAY = 5

    def __init__(self, config: MatrixConfig):
        """Store the configuration and initialise empty adapter state.

        Raises:
            ImportError: if matrix-nio could not be imported.
        """
        if not HAS_MATRIX:
            raise ImportError(
                "matrix-nio not installed. "
                "Install with: pip install matrix-nio[e2e]"
            )

        super().__init__(config)
        self.matrix_config: MatrixConfig = config
        self._client: Optional[AsyncClient] = None
        self._sync_task: Optional[asyncio.Task] = None
        self._rooms: Dict[str, MatrixRoom] = {}
        self._threads: Dict[str, ThreadInfo] = {}
        self._reaction_handlers: List[Callable] = []
        # Entries are "<user_id>:<device_id>" strings.
        self._verified_devices: set = set()

    @property
    def name(self) -> str:
        """Channel identifier used in unified messages."""
        return "matrix"

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _is_error_response(response: Any) -> bool:
        """Return True when *response* is a matrix-nio error response.

        matrix-nio reports most server-side failures by returning an
        ``ErrorResponse`` subclass instead of raising, so callers must
        inspect the returned object.
        """
        # Local import: the module-level nio import is optional (HAS_MATRIX).
        from nio import ErrorResponse

        return isinstance(response, ErrorResponse)

    @staticmethod
    def _event_content(event: Any) -> Dict[str, Any]:
        """Return the content dict of *event*, or ``{}`` if unavailable.

        Looks at ``event.content`` first and falls back to
        ``event.source['content']``, which is where matrix-nio events keep
        the raw event content.
        """
        content = getattr(event, 'content', None)
        if isinstance(content, dict):
            return content
        source = getattr(event, 'source', None)
        if isinstance(source, dict):
            nested = source.get('content')
            if isinstance(nested, dict):
                return nested
        return {}

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------

    async def connect(self) -> bool:
        """Connect to Matrix homeserver with optional E2EE setup.

        Returns:
            True once the client is configured and the background sync task
            has been started; False when required settings are missing or
            setup raised (status is set to ERROR in the latter case).
        """
        cfg = self.matrix_config
        if not cfg.homeserver_url:
            logger.error("Matrix homeserver URL not provided")
            return False
        # Validate the token before building a client so a failed connect
        # does not leave a half-initialised client behind.
        if not cfg.token:
            logger.error("Matrix access token required")
            return False

        try:
            client_config = AsyncClientConfig(
                max_limit_exceeded=0,
                max_timeouts=0,
                store_sync_tokens=True,
                encryption_enabled=cfg.enable_e2ee,
            )

            # The client opens its own key store from store_path; building a
            # separate SqliteStore here would only leak a database handle.
            store_path = None
            if cfg.enable_e2ee:
                os.makedirs(cfg.store_path, exist_ok=True)
                store_path = cfg.store_path

            self._client = AsyncClient(
                homeserver=cfg.homeserver_url,
                user=cfg.user_id,
                device_id=cfg.device_id,
                store_path=store_path,
                config=client_config,
            )

            # Token-based session: no password login round-trip.
            self._client.access_token = cfg.token
            self._client.user_id = cfg.user_id
            if cfg.device_id:
                self._client.device_id = cfg.device_id

            if cfg.enable_e2ee:
                await self._setup_encryption()

            self._register_callbacks()

            # Mark connected before scheduling the sync loop, whose loop
            # condition checks the status.
            self.status = ChannelStatus.CONNECTED
            self._sync_task = asyncio.create_task(self._sync_forever())

            logger.info(f"Matrix connected as {cfg.user_id}")
            return True

        except Exception as e:
            logger.error(f"Failed to connect to Matrix: {e}")
            self.status = ChannelStatus.ERROR
            return False

    async def disconnect(self) -> None:
        """Disconnect from Matrix homeserver.

        Cancels the sync task, closes the client and sets the status to
        DISCONNECTED even if closing the client raises.
        """
        if self._sync_task:
            self._sync_task.cancel()
            try:
                await self._sync_task
            except asyncio.CancelledError:
                pass
            self._sync_task = None

        try:
            if self._client:
                await self._client.close()
        finally:
            self._client = None
            self.status = ChannelStatus.DISCONNECTED

    async def _setup_encryption(self) -> None:
        """Setup end-to-end encryption.

        Loads the client's key store (needed for a token-based session,
        which skips the login step that would normally load it) and, when
        configured, records the bot's own devices as trusted.
        """
        if not self._client:
            return

        if self.matrix_config.device_id:
            if getattr(self._client, 'olm', None) is None:
                try:
                    self._client.load_store()
                except Exception as e:
                    # Best effort: keep connecting even without a usable store.
                    logger.warning(f"Could not load Matrix E2EE store: {e}")
        else:
            logger.warning(
                "Matrix E2EE enabled without device_id; "
                "encryption store cannot be loaded"
            )

        if self.matrix_config.trust_own_devices:
            await self._trust_own_devices()

        logger.info("Matrix E2EE initialized")

    async def _trust_own_devices(self) -> None:
        """Trust all devices belonging to the bot user.

        Failures are logged and otherwise ignored (best effort).
        """
        if not self._client:
            return

        try:
            response = await self._client.devices()
            for device in getattr(response, 'devices', None) or []:
                # matrix-nio's Device exposes ``id``; accept ``device_id`` too.
                device_id = (
                    getattr(device, 'id', None)
                    or getattr(device, 'device_id', None)
                )
                if device_id:
                    # Same key format as verify_device().
                    self._verified_devices.add(
                        f"{self.matrix_config.user_id}:{device_id}"
                    )
        except Exception as e:
            logger.warning(f"Could not trust own devices: {e}")

    def _register_callbacks(self) -> None:
        """Register Matrix event callbacks on the client."""
        if not self._client:
            return

        # Plain-text messages (decrypted messages arrive as this type too).
        self._client.add_event_callback(
            self._handle_room_message,
            RoomMessageText
        )

        # Room invites
        self._client.add_event_callback(
            self._handle_invite,
            InviteMemberEvent
        )

        # Encrypted events the client could not decrypt
        if self.matrix_config.enable_e2ee:
            self._client.add_event_callback(
                self._handle_megolm_event,
                MegolmEvent
            )

    async def _sync_forever(self) -> None:
        """Continuously sync with homeserver until disconnected/cancelled."""
        # Full room state is only needed once; afterwards incremental syncs
        # are enough and far cheaper for the homeserver.
        first_sync = True
        while self._client and self.status == ChannelStatus.CONNECTED:
            try:
                sync_response = await self._client.sync(
                    timeout=30000,
                    full_state=first_sync,
                )

                if isinstance(sync_response, SyncResponse):
                    first_sync = False
                    for room_id in sync_response.rooms.join:
                        await self._update_room_info(room_id)
                else:
                    # Error responses return immediately; back off so the
                    # loop does not hammer the homeserver.
                    logger.error(f"Matrix sync failed: {sync_response}")
                    await asyncio.sleep(self._SYNC_RETRY_DELAY)

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Matrix sync error: {e}")
                await asyncio.sleep(self._SYNC_RETRY_DELAY)

    # ------------------------------------------------------------------
    # Event handlers
    # ------------------------------------------------------------------

    async def _handle_room_message(
        self,
        room: Any,
        event: RoomMessageText
    ) -> None:
        """Handle incoming room messages."""
        # Ignore own messages
        if event.sender == self.matrix_config.user_id:
            return

        message = self._convert_message(room, event)
        await self._dispatch_message(message)

    async def _handle_invite(self, room: Any, event: InviteMemberEvent) -> None:
        """Handle room invite events by auto-joining when configured."""
        if not self.matrix_config.auto_join_rooms or not self._client:
            return

        # Only react to the invite addressed to the bot itself.
        if event.state_key != self.matrix_config.user_id:
            return

        try:
            response = await self._client.join(room.room_id)
        except Exception as e:
            logger.error(f"Failed to join room: {e}")
            return

        if isinstance(response, JoinResponse):
            logger.info(f"Auto-joined room: {room.room_id}")
        else:
            logger.error(f"Failed to join room: {response}")

    async def _handle_megolm_event(self, room: Any, event: MegolmEvent) -> None:
        """Handle Megolm events delivered by the client.

        matrix-nio hands a ``MegolmEvent`` to callbacks only when it could
        not decrypt it; successfully decrypted messages arrive as their
        decrypted type (for example ``RoomMessageText``). Undecryptable
        events are therefore logged instead of being dispatched as empty
        messages.
        """
        if event.sender == self.matrix_config.user_id:
            return

        if not getattr(event, 'decrypted', False):
            logger.warning(
                "Matrix: could not decrypt event %s in room %s",
                getattr(event, 'event_id', None),
                getattr(room, 'room_id', None),
            )
            return

        message = self._convert_message(room, event)
        await self._dispatch_message(message)

    async def _update_room_info(self, room_id: str) -> None:
        """Update cached room information from the client's room state."""
        if not self._client:
            return

        try:
            room = self._client.rooms.get(room_id)
            if room:
                self._rooms[room_id] = MatrixRoom(
                    room_id=room_id,
                    name=getattr(room, 'display_name', None),
                    topic=getattr(room, 'topic', None),
                    is_encrypted=getattr(room, 'encrypted', False),
                    member_count=getattr(room, 'member_count', 0),
                    is_direct=getattr(room, 'is_direct', False),
                )
        except Exception as e:
            logger.warning(f"Could not update room info: {e}")

    def _convert_message(self, room: Any, event: Any) -> Message:
        """Convert Matrix event to unified Message format."""
        content = self._event_content(event)

        # Extract text content
        if hasattr(event, 'body'):
            text = event.body
        else:
            text = content.get('body', '')
        if not isinstance(text, str):
            text = "" if text is None else str(text)

        # Check for reply/thread
        reply_to_id = None
        relates_to = content.get('m.relates_to')
        if isinstance(relates_to, dict):
            in_reply_to = relates_to.get('m.in_reply_to')
            if isinstance(in_reply_to, dict):
                reply_to_id = in_reply_to.get('event_id')
            elif relates_to.get('rel_type') == 'm.thread':
                reply_to_id = relates_to.get('event_id')

        # Rooms are treated as groups unless the cache marks them direct.
        room_info = self._rooms.get(room.room_id)
        is_group = not (room_info and room_info.is_direct)

        # An empty user_id would match every text, so require a real ID.
        bot_id = self.matrix_config.user_id
        is_mentioned = bool(bot_id) and bot_id in text

        if hasattr(room, 'user_name'):
            # user_name() may return None for unknown users.
            sender_name = room.user_name(event.sender) or event.sender
        else:
            sender_name = event.sender

        if hasattr(event, 'server_timestamp'):
            # server_timestamp is in milliseconds.
            timestamp = datetime.fromtimestamp(event.server_timestamp / 1000)
        else:
            timestamp = datetime.now()

        return Message(
            id=event.event_id,
            channel=self.name,
            sender_id=event.sender,
            sender_name=sender_name,
            chat_id=room.room_id,
            text=text,
            reply_to_id=reply_to_id,
            timestamp=timestamp,
            is_group=is_group,
            is_bot_mentioned=is_mentioned,
            raw={
                'event_type': type(event).__name__,
                'room_id': room.room_id,
                'encrypted': room_info.is_encrypted if room_info else False,
            },
        )

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        media: Optional[List[MediaAttachment]] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Send a message to a Matrix room.

        When *media* is given only its first attachment is sent, with *text*
        as the caption. *buttons* is accepted for interface compatibility
        and is not used.
        """
        if not self._client:
            return SendResult(success=False, error="Not connected")

        try:
            # Media messages are built separately.
            if media:
                return await self._send_media(chat_id, text, media[0], reply_to)

            content = {
                'msgtype': 'm.text',
                'body': text,
            }

            # Heuristic: text containing angle brackets is sent as HTML too.
            if '<' in text and '>' in text:
                content['format'] = 'org.matrix.custom.html'
                content['formatted_body'] = text

            if reply_to:
                content['m.relates_to'] = {
                    'm.in_reply_to': {
                        'event_id': reply_to
                    }
                }

            response = await self._client.room_send(
                room_id=chat_id,
                message_type='m.room.message',
                content=content,
            )

            if isinstance(response, RoomSendResponse):
                return SendResult(
                    success=True,
                    message_id=response.event_id,
                )
            return SendResult(
                success=False,
                error=str(response),
            )

        except Exception as e:
            logger.error(f"Failed to send Matrix message: {e}")
            return SendResult(success=False, error=str(e))

    async def _send_media(
        self,
        chat_id: str,
        caption: str,
        media: MediaAttachment,
        reply_to: Optional[str] = None,
    ) -> SendResult:
        """Send media message to Matrix room.

        A local ``file_path`` is uploaded first; otherwise ``media.url`` is
        used as the content URL unchanged.
        """
        if not self._client:
            return SendResult(success=False, error="Not connected")

        try:
            if media.file_path:
                with open(media.file_path, 'rb') as f:
                    file_data = f.read()

                # matrix-nio expects a callable or file object as the data
                # provider, and returns a (response, decryption_keys) tuple.
                upload_result = await self._client.upload(
                    data_provider=lambda got_429, got_timeouts: file_data,
                    content_type=media.mime_type or 'application/octet-stream',
                    filename=media.file_name,
                    filesize=len(file_data),
                )
                if isinstance(upload_result, tuple):
                    upload_response = upload_result[0]
                else:
                    upload_response = upload_result

                if isinstance(upload_response, UploadResponse):
                    mxc_url = upload_response.content_uri
                else:
                    return SendResult(success=False, error="Upload failed")
            elif media.url:
                mxc_url = media.url
            else:
                return SendResult(success=False, error="No media source")

            # Determine message type
            msgtype = {
                MessageType.IMAGE: 'm.image',
                MessageType.VIDEO: 'm.video',
                MessageType.AUDIO: 'm.audio',
            }.get(media.type, 'm.file')

            content = {
                'msgtype': msgtype,
                'body': caption or media.file_name or 'attachment',
                'url': mxc_url,
            }

            if media.mime_type:
                content['info'] = {'mimetype': media.mime_type}

            if reply_to:
                content['m.relates_to'] = {
                    'm.in_reply_to': {'event_id': reply_to}
                }

            response = await self._client.room_send(
                room_id=chat_id,
                message_type='m.room.message',
                content=content,
            )

            if isinstance(response, RoomSendResponse):
                return SendResult(success=True, message_id=response.event_id)
            return SendResult(success=False, error=str(response))

        except Exception as e:
            logger.error(f"Failed to send Matrix media: {e}")
            return SendResult(success=False, error=str(e))

    async def edit_message(
        self,
        chat_id: str,
        message_id: str,
        text: str,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Edit an existing Matrix message via an ``m.replace`` relation.

        *buttons* is accepted for interface compatibility and is not used.
        """
        if not self._client:
            return SendResult(success=False, error="Not connected")

        try:
            content = {
                'msgtype': 'm.text',
                # Fallback body for clients that do not understand edits.
                'body': f'* {text}',
                'm.new_content': {
                    'msgtype': 'm.text',
                    'body': text,
                },
                'm.relates_to': {
                    'rel_type': 'm.replace',
                    'event_id': message_id,
                },
            }

            response = await self._client.room_send(
                room_id=chat_id,
                message_type='m.room.message',
                content=content,
            )

            if isinstance(response, RoomSendResponse):
                return SendResult(success=True, message_id=response.event_id)
            return SendResult(success=False, error=str(response))

        except Exception as e:
            logger.error(f"Failed to edit Matrix message: {e}")
            return SendResult(success=False, error=str(e))

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """Redact a Matrix message. Returns True only if the server accepted it."""
        if not self._client:
            return False

        try:
            response = await self._client.room_redact(
                room_id=chat_id,
                event_id=message_id,
                reason="Deleted by bot",
            )
        except Exception as e:
            logger.error(f"Failed to delete Matrix message: {e}")
            return False

        if self._is_error_response(response):
            logger.error(f"Failed to delete Matrix message: {response}")
            return False
        return True

    async def send_typing(self, chat_id: str) -> None:
        """Send typing indicator (best effort; failures are only logged)."""
        if not self._client:
            return
        try:
            await self._client.room_typing(
                room_id=chat_id,
                typing_state=True,
                timeout=30000,
            )
        except Exception as e:
            logger.debug(f"Failed to send Matrix typing indicator: {e}")

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Get cached information about a Matrix room, or None if unknown."""
        room_info = self._rooms.get(chat_id)
        if not room_info:
            return None
        return {
            'room_id': room_info.room_id,
            'name': room_info.name,
            'topic': room_info.topic,
            'encrypted': room_info.is_encrypted,
            'member_count': room_info.member_count,
            'is_direct': room_info.is_direct,
        }

    # ------------------------------------------------------------------
    # Matrix-specific methods
    # ------------------------------------------------------------------

    async def create_room(
        self,
        name: str,
        topic: Optional[str] = None,
        invite: Optional[List[str]] = None,
        is_direct: bool = False,
        encrypted: bool = True,
    ) -> Optional[str]:
        """Create a new Matrix room.

        Returns:
            The new room ID, or None when not connected or creation failed.
        """
        if not self._client:
            return None

        try:
            initial_state = []
            # Encryption is only requested when E2EE is enabled in the config.
            if encrypted and self.matrix_config.enable_e2ee:
                initial_state.append({
                    'type': 'm.room.encryption',
                    'content': {'algorithm': 'm.megolm.v1.aes-sha2'},
                })

            response = await self._client.room_create(
                name=name,
                topic=topic,
                invite=invite or [],
                is_direct=is_direct,
                initial_state=initial_state,
            )

            if isinstance(response, RoomCreateResponse):
                return response.room_id
            logger.error(f"Failed to create room: {response}")
            return None

        except Exception as e:
            logger.error(f"Failed to create room: {e}")
            return None

    async def join_room(self, room_id: str,
                        role: str = 'participant') -> bool:
        """Join a Matrix room (UNIF-G2 RoomCapableAdapter contract).

        Accepts both room ids (``!room:server``) and aliases
        (``#alias:server``). Role is informational only — Matrix
        permissions are server-side via power levels and the bot's
        send permissions are set by the room's existing ACL.
        """
        if not self._client or not room_id:
            return False
        try:
            response = await self._client.join(room_id)
            ok = isinstance(response, JoinResponse)
            if ok:
                logger.info(
                    "Matrix.join_room: %s joined (role=%s)",
                    room_id, role)
            return ok
        except Exception as e:
            logger.error(f"Matrix.join_room: failed for {room_id}: {e}")
            return False

    async def leave_room(self, room_id: str) -> bool:
        """Leave a Matrix room. Returns False when the server rejects it."""
        if not self._client or not room_id:
            return False
        try:
            response = await self._client.room_leave(room_id)
        except Exception as e:
            logger.error(f"Matrix.leave_room: failed for {room_id}: {e}")
            return False

        if self._is_error_response(response):
            logger.error(
                f"Matrix.leave_room: failed for {room_id}: {response}")
            return False
        return True

    async def list_room_members(self, room_id: str) -> List[Dict[str, Any]]:
        """List Matrix room members.

        Reads from the local sync state when available
        (``client.rooms[room_id].users``); falls back to ``[]`` when the
        client hasn't synced this room yet. Skips the bot's own
        user_id.
        """
        if not self._client or not room_id:
            return []
        try:
            room = (self._client.rooms or {}).get(room_id)
            if room is None:
                return []
            users = getattr(room, 'users', None) or {}
            self_uid = getattr(self._client, 'user_id', None)
            return [
                {
                    'id': str(uid),
                    'display_name': (
                        getattr(user, 'display_name', None)
                        or getattr(user, 'name', None) or str(uid)),
                    'is_bot': False,
                }
                for uid, user in users.items()
                if uid != self_uid
            ]
        except Exception as e:
            logger.error(
                f"Matrix.list_room_members: failed for {room_id}: {e}")
            return []

    async def invite_user(self, room_id: str, user_id: str) -> bool:
        """Invite a user to a Matrix room. Returns True only on success."""
        if not self._client:
            return False

        try:
            response = await self._client.room_invite(room_id, user_id)
        except Exception as e:
            logger.error(f"Failed to invite user: {e}")
            return False

        if self._is_error_response(response):
            logger.error(f"Failed to invite user: {response}")
            return False
        return True

    async def add_reaction(
        self,
        chat_id: str,
        message_id: str,
        emoji: str,
    ) -> bool:
        """Add a reaction (``m.annotation``) to a message."""
        if not self._client:
            return False

        try:
            content = {
                'm.relates_to': {
                    'rel_type': 'm.annotation',
                    'event_id': message_id,
                    'key': emoji,
                }
            }

            response = await self._client.room_send(
                room_id=chat_id,
                message_type='m.reaction',
                content=content,
            )
        except Exception as e:
            logger.error(f"Failed to add reaction: {e}")
            return False

        if self._is_error_response(response):
            logger.error(f"Failed to add reaction: {response}")
            return False
        return True

    async def send_thread_reply(
        self,
        chat_id: str,
        thread_root_id: str,
        text: str,
    ) -> SendResult:
        """Send a reply in a thread (MSC3440)."""
        if not self._client:
            return SendResult(success=False, error="Not connected")

        try:
            content = {
                'msgtype': 'm.text',
                'body': text,
                'm.relates_to': {
                    'rel_type': 'm.thread',
                    'event_id': thread_root_id,
                    # Reply fallback for clients without thread support.
                    'is_falling_back': True,
                    'm.in_reply_to': {
                        'event_id': thread_root_id
                    }
                }
            }

            response = await self._client.room_send(
                room_id=chat_id,
                message_type='m.room.message',
                content=content,
            )

            if isinstance(response, RoomSendResponse):
                return SendResult(success=True, message_id=response.event_id)
            return SendResult(success=False, error=str(response))

        except Exception as e:
            logger.error(f"Failed to send thread reply: {e}")
            return SendResult(success=False, error=str(e))

    async def send_read_receipt(self, chat_id: str, message_id: str) -> bool:
        """Send read receipt (and fully-read marker) for a message."""
        if not self._client:
            return False

        try:
            response = await self._client.room_read_markers(
                room_id=chat_id,
                fully_read_event=message_id,
                read_event=message_id,
            )
        except Exception as e:
            logger.error(f"Failed to send read receipt: {e}")
            return False

        if self._is_error_response(response):
            logger.error(f"Failed to send read receipt: {response}")
            return False
        return True

    async def verify_device(self, user_id: str, device_id: str) -> bool:
        """Verify a device for E2EE.

        Looks the device up in the client's device store and marks it as
        verified. Returns False when E2EE is disabled, the device is not
        known, or verification raised.
        """
        if not self._client or not self.matrix_config.enable_e2ee:
            return False

        try:
            # matrix-nio's verify_device() is synchronous and takes the
            # device object, not a (user_id, device_id) pair.
            device = self._client.device_store[user_id].get(device_id)
            if device is None:
                logger.error(
                    f"Failed to verify device: unknown device "
                    f"{user_id}:{device_id}"
                )
                return False

            self._client.verify_device(device)
            self._verified_devices.add(f"{user_id}:{device_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to verify device: {e}")
            return False

816 

817 

def create_matrix_adapter(
    homeserver_url: Optional[str] = None,
    user_id: Optional[str] = None,
    token: Optional[str] = None,
    **kwargs: Any,
) -> MatrixAdapter:
    """
    Factory function to create Matrix adapter.

    Explicit arguments win; any that are missing or empty fall back to the
    corresponding environment variable.

    Args:
        homeserver_url: Matrix homeserver URL (or set MATRIX_HOMESERVER_URL
            env var; defaults to "https://matrix.org" when neither is set)
        user_id: Bot user ID (or set MATRIX_USER_ID env var)
        token: Access token (or set MATRIX_ACCESS_TOKEN env var)
        **kwargs: Additional config options forwarded to MatrixConfig

    Returns:
        Configured MatrixAdapter

    Raises:
        ValueError: if no user ID or no access token could be resolved.
    """
    homeserver_url = homeserver_url or os.getenv(
        "MATRIX_HOMESERVER_URL", "https://matrix.org"
    )
    user_id = user_id or os.getenv("MATRIX_USER_ID")
    token = token or os.getenv("MATRIX_ACCESS_TOKEN")

    # Fail fast, before any config or client object is built.
    if not user_id:
        raise ValueError("Matrix user ID required")
    if not token:
        raise ValueError("Matrix access token required")

    config = MatrixConfig(
        homeserver_url=homeserver_url,
        user_id=user_id,
        token=token,
        **kwargs
    )
    return MatrixAdapter(config)