Coverage for integrations / channels / imessage_adapter.py: 29.1%

254 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2iMessage Channel Adapter 

3 

4Implements iMessage messaging using BlueBubbles API. 

5Designed for Docker-compatible deployments with cross-platform access. 

6 

7Features: 

8- BlueBubbles API integration (cross-platform iMessage access) 

9- Group chats 

10- Tapbacks (reactions) 

11- Attachments 

12- Read receipts 

13- Typing indicators 

14 

15Requirements: 

16- BlueBubbles server running on a Mac (https://bluebubbles.app/) 

17- API access configured 

18 

19Note: BlueBubbles requires a Mac running as a server to relay iMessages. 

20This adapter connects to that Mac's BlueBubbles API from Docker/Linux. 

21""" 

22 

23from __future__ import annotations 

24 

25import asyncio 

26import logging 

27import os 

28import base64 

29import mimetypes 

30from typing import Optional, List, Dict, Any 

31from datetime import datetime 

32from pathlib import Path 

33 

34try: 

35 import aiohttp 

36 HAS_AIOHTTP = True 

37except ImportError: 

38 HAS_AIOHTTP = False 

39 

40try: 

41 import socketio 

42 HAS_SOCKETIO = True 

43except ImportError: 

44 HAS_SOCKETIO = False 

45 

46from .base import ( 

47 ChannelAdapter, 

48 ChannelConfig, 

49 ChannelStatus, 

50 Message, 

51 MessageType, 

52 MediaAttachment, 

53 SendResult, 

54 ChannelConnectionError, 

55 ChannelSendError, 

56 ChannelRateLimitError, 

57) 

58 

logger = logging.getLogger(__name__)


# Tapback names mapped to the numeric reaction codes used in react payloads.
# The codes are consecutive, starting at 2000, in the order listed here.
TAPBACK_MAP = dict(
    zip(
        ("love", "like", "dislike", "laugh", "emphasize", "question"),
        range(2000, 2006),
    )
)

# Emoji-style aliases translated to the canonical tapback names above.
TAPBACK_EMOJI_MAP = dict(
    [
        ("heart", "love"),
        ("thumbsup", "like"),
        ("thumbsdown", "dislike"),
        ("haha", "laugh"),
        ("!!", "emphasize"),
        ("?", "question"),
    ]
)

80 

81 

class IMessageAdapter(ChannelAdapter):
    """
    iMessage adapter using BlueBubbles API.

    Incoming messages arrive over Socket.IO when python-socketio is
    installed; otherwise (or when the Socket.IO connection cannot be set
    up) the adapter polls the REST API. Outgoing operations are plain
    HTTP requests made through a shared aiohttp session.

    Usage:
        config = ChannelConfig(
            token="your_bluebubbles_password",
            extra={
                "api_url": "http://your-mac:1234",
            }
        )
        adapter = IMessageAdapter(config)
        adapter.on_message(my_handler)
        await adapter.start()
    """

    def __init__(self, config: ChannelConfig):
        """Store connection settings from *config*.

        Raises:
            ImportError: if aiohttp is not installed.
        """
        if not HAS_AIOHTTP:
            raise ImportError(
                "aiohttp not installed. "
                "Install with: pip install aiohttp"
            )

        super().__init__(config)
        self._password = config.token
        self._api_url = config.extra.get("api_url", "http://localhost:1234")
        self._session: Optional[aiohttp.ClientSession] = None
        self._sio: Optional[Any] = None
        self._running = False
        self._reconnect_delay = 5
        self._max_reconnect_delay = 300
        self._last_message_guid: Optional[str] = None
        # Strong references to background tasks (polling / reconnect). The
        # event loop only keeps weak references, so an unreferenced task can
        # be garbage-collected before it finishes.
        self._tasks: set = set()

    @property
    def name(self) -> str:
        """Channel identifier used in converted messages."""
        return "imessage"

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _spawn(self, coro: Any) -> asyncio.Task:
        """Schedule *coro* as a task and keep a reference until it is done."""
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def _discard_session(self) -> None:
        """Best-effort close of the HTTP session after a failed connect."""
        session, self._session = self._session, None
        if session is None:
            return
        try:
            await session.close()
        except Exception as e:
            # Cleanup must not mask the connection failure being reported.
            logger.debug(f"Failed to close BlueBubbles session: {e}")

    async def _set_typing(self, chat_id: str, active: bool) -> None:
        """POST the typing status for *chat_id* and release the response."""
        async with self._session.post(
            f"{self._api_url}/api/v1/chat/{chat_id}/typing",
            json={"status": active},
        ):
            # Nothing to read; leaving the block releases the connection.
            pass

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------

    async def connect(self) -> bool:
        """Connect to BlueBubbles API.

        Opens the HTTP session, checks ``/api/v1/server/info`` and then
        starts message delivery (Socket.IO, or polling as a fallback).

        Returns:
            True when connected; False when the password is missing, the
            server does not answer 200, or an error occurs. On failure the
            HTTP session created here is closed again.
        """
        if not self._password:
            logger.error("BlueBubbles password not provided")
            return False

        try:
            # Create session with auth
            self._session = aiohttp.ClientSession(
                headers={"Authorization": self._password}
            )

            # Verify API connection
            async with self._session.get(
                f"{self._api_url}/api/v1/server/info"
            ) as response:
                reachable = response.status == 200
                info = await response.json() if reachable else None

            if not reachable:
                logger.error("BlueBubbles API not available")
                await self._discard_session()
                return False

            logger.info(f"Connected to BlueBubbles v{info.get('data', {}).get('server_version', 'unknown')}")

            # Set up Socket.IO for real-time messages
            if HAS_SOCKETIO:
                await self._setup_socketio()
            else:
                # Fall back to polling
                logger.warning("python-socketio not installed, using polling mode")
                self._spawn(self._poll_messages())

            # No await happens between spawning the poll task and this
            # assignment, so the task sees _running == True on first run.
            self._running = True
            self.status = ChannelStatus.CONNECTED
            return True

        except aiohttp.ClientError as e:
            logger.error(f"Failed to connect to BlueBubbles: {e}")
            self.status = ChannelStatus.ERROR
            await self._discard_session()
            return False
        except Exception as e:
            logger.error(f"BlueBubbles connection error: {e}")
            self.status = ChannelStatus.ERROR
            await self._discard_session()
            return False

    async def _setup_socketio(self) -> None:
        """Set up Socket.IO connection for real-time messages.

        Any failure is logged and polling is started instead; this method
        does not raise.
        """
        try:
            self._sio = socketio.AsyncClient()

            # NOTE: @event registers a handler under the function's own
            # name, so "connect" and "disconnect" must keep these names.
            @self._sio.event
            async def connect():
                logger.info("Socket.IO connected to BlueBubbles")
                # Subscribe to new messages
                await self._sio.emit("subscribe", {"topic": "new-message"})

            @self._sio.event
            async def disconnect():
                logger.warning("Socket.IO disconnected from BlueBubbles")
                # Only reconnect when the drop was not requested by us.
                if self._running:
                    self._spawn(self._reconnect_socketio())

            @self._sio.on("new-message")
            async def on_new_message(data):
                message = self._convert_message(data)
                if message:
                    await self._dispatch_message(message)

            @self._sio.on("message-send-error")
            async def on_send_error(data):
                logger.error(f"Message send error: {data}")

            # Connect with auth
            await self._sio.connect(
                self._api_url,
                auth={"password": self._password},
                transports=["websocket", "polling"],
            )

        except Exception as e:
            logger.error(f"Failed to setup Socket.IO: {e}")
            # Fall back to polling
            self._spawn(self._poll_messages())

    async def _reconnect_socketio(self) -> None:
        """Attempt to reconnect Socket.IO.

        Retries while the adapter is running, doubling the wait after each
        failure up to ``_max_reconnect_delay`` seconds.
        """
        delay = self._reconnect_delay

        while self._running:
            try:
                await asyncio.sleep(delay)
                sio = self._sio
                if sio is None:
                    # disconnect() cleared the client while we were waiting.
                    return
                await sio.connect(
                    self._api_url,
                    auth={"password": self._password},
                )
                break
            except Exception as e:
                logger.error(f"Socket.IO reconnection failed: {e}")
                delay = min(delay * 2, self._max_reconnect_delay)

    async def _poll_messages(self) -> None:
        """Poll for new messages (fallback when Socket.IO unavailable).

        Requests up to 50 messages every 2 seconds and dispatches the ones
        not sent by this account, oldest first. Errors are logged and
        retried with a doubling delay capped at ``_max_reconnect_delay``.
        """
        reconnect_delay = self._reconnect_delay

        while self._running:
            try:
                params = {"limit": 50, "sort": "DESC"}
                if self._last_message_guid:
                    params["after"] = self._last_message_guid

                async with self._session.get(
                    f"{self._api_url}/api/v1/message",
                    params=params,
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        messages = data.get("data", [])

                        # Process in chronological order
                        for msg_data in reversed(messages):
                            # Skip sent messages (is_from_me)
                            if msg_data.get("is_from_me"):
                                continue

                            message = self._convert_message(msg_data)
                            if message:
                                await self._dispatch_message(message)
                                self._last_message_guid = msg_data.get("guid")

                        # A successful poll resets the error back-off.
                        reconnect_delay = self._reconnect_delay

                await asyncio.sleep(2)  # Poll interval

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Polling error: {e}")
                await asyncio.sleep(reconnect_delay)
                reconnect_delay = min(reconnect_delay * 2, self._max_reconnect_delay)

    async def disconnect(self) -> None:
        """Disconnect from BlueBubbles API.

        Stops background tasks, disconnects Socket.IO and closes the HTTP
        session. The session is closed and the status updated even if the
        Socket.IO disconnect raises.
        """
        self._running = False

        # Stop polling / reconnect loops before their session goes away.
        # The current task is skipped so that a disconnect requested from
        # inside a message handler does not cancel itself.
        current = asyncio.current_task()
        for task in list(self._tasks):
            if task is not current:
                task.cancel()

        sio, self._sio = self._sio, None
        try:
            if sio is not None and sio.connected:
                await sio.disconnect()
        finally:
            session, self._session = self._session, None
            if session is not None:
                await session.close()

            self.status = ChannelStatus.DISCONNECTED
            logger.info("Disconnected from BlueBubbles")

    # ------------------------------------------------------------------
    # Message conversion
    # ------------------------------------------------------------------

    def _convert_message(self, msg_data: Dict[str, Any]) -> Optional[Message]:
        """Convert BlueBubbles message to unified Message format.

        Returns:
            A Message, or None for messages that have neither text nor
            attachments and for messages sent by this account.
        """
        # Skip system messages or empty messages
        if not msg_data.get("text") and not msg_data.get("attachments"):
            return None

        # Skip messages from self
        if msg_data.get("is_from_me"):
            return None

        # "or" fallbacks (rather than .get defaults) also cover keys that
        # are present but null in the JSON payload.
        handle = msg_data.get("handle") or {}
        chats = msg_data.get("chats")
        chat = msg_data.get("chat") or (chats[0] if chats else {}) or {}

        # Get sender info
        sender_id = handle.get("address", "") or handle.get("id", "")
        sender_name = handle.get("displayName") or sender_id

        # Determine chat type
        is_group = chat.get("style") == 43  # Group chat style

        # Get chat ID (GUID)
        chat_id = chat.get("guid", "") or msg_data.get("chatGuid", "")

        # Process attachments
        media = [
            MediaAttachment(
                type=self._get_media_type(att.get("mime_type") or ""),
                file_id=att.get("guid"),
                file_name=att.get("transfer_name"),
                mime_type=att.get("mime_type"),
                file_size=att.get("total_bytes"),
            )
            for att in (msg_data.get("attachments") or [])
        ]

        # Parse timestamp
        timestamp = msg_data.get("date_created")
        if isinstance(timestamp, (int, float)):
            # BlueBubbles uses milliseconds
            timestamp = datetime.fromtimestamp(timestamp / 1000)
        else:
            timestamp = datetime.now()

        return Message(
            id=msg_data.get("guid", ""),
            channel=self.name,
            sender_id=sender_id,
            sender_name=sender_name,
            chat_id=chat_id,
            # Attachment-only messages may carry a null text.
            text=msg_data.get("text") or "",
            media=media,
            reply_to_id=msg_data.get("thread_origin_guid"),
            timestamp=timestamp,
            is_group=is_group,
            is_bot_mentioned=False,  # iMessage doesn't have @mentions
            raw=msg_data,
        )

    def _get_media_type(self, mime_type: str) -> MessageType:
        """Get MessageType from MIME type.

        image/*, video/* and audio/* map to their own types; anything else
        (including an empty string) is a DOCUMENT.
        """
        if mime_type.startswith("image/"):
            return MessageType.IMAGE
        if mime_type.startswith("video/"):
            return MessageType.VIDEO
        if mime_type.startswith("audio/"):
            return MessageType.AUDIO
        return MessageType.DOCUMENT

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        media: Optional[List[MediaAttachment]] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Send an iMessage.

        Args:
            chat_id: Chat GUID to send to.
            text: Message text.
            reply_to: GUID of the message being replied to, if any.
            media: Attachments; when non-empty the message goes through
                the attachment endpoint instead of the text endpoint.
            buttons: Accepted for interface compatibility; not used.

        Returns:
            SendResult with success=False and an error string on failure;
            this method does not raise.
        """
        if not self._session:
            return SendResult(success=False, error="Not connected")

        try:
            # Handle attachments
            if media:
                return await self._send_with_attachments(chat_id, text, media, reply_to)

            payload = {
                "chatGuid": chat_id,
                "message": text,
                "method": "private-api",  # Use private API for better delivery
            }

            # Handle reply/thread
            if reply_to:
                payload["selectedMessageGuid"] = reply_to

            async with self._session.post(
                f"{self._api_url}/api/v1/message/text",
                json=payload,
            ) as response:
                if response.status in (200, 201):
                    data = await response.json()
                    return SendResult(
                        success=True,
                        message_id=data.get("data", {}).get("guid", ""),
                        raw=data,
                    )

                error_text = await response.text()
                logger.error(f"Failed to send iMessage: {error_text}")
                return SendResult(success=False, error=error_text)

        except Exception as e:
            logger.error(f"Error sending iMessage: {e}")
            return SendResult(success=False, error=str(e))

    async def _send_with_attachments(
        self,
        chat_id: str,
        text: str,
        media: List[MediaAttachment],
        reply_to: Optional[str],
    ) -> SendResult:
        """Send message with attachments.

        Each attachment is read from ``file_path`` when that file exists,
        otherwise downloaded from ``url``. Attachments that can be loaded
        neither way are skipped with a warning.
        """
        try:
            # BlueBubbles uses multipart form for attachments
            data = aiohttp.FormData()
            data.add_field("chatGuid", chat_id)
            if text:
                data.add_field("message", text)
            if reply_to:
                data.add_field("selectedMessageGuid", reply_to)

            for m in media:
                if m.file_path and Path(m.file_path).exists():
                    path = Path(m.file_path)
                    content = path.read_bytes()
                    data.add_field(
                        "attachment",
                        content,
                        filename=m.file_name or path.name,
                        content_type=m.mime_type or mimetypes.guess_type(str(path))[0],
                    )
                elif m.url:
                    # Download and attach
                    async with self._session.get(m.url) as response:
                        if response.status == 200:
                            content = await response.read()
                            data.add_field(
                                "attachment",
                                content,
                                filename=m.file_name or "attachment",
                                content_type=m.mime_type or response.content_type,
                            )
                        else:
                            logger.warning(
                                f"Skipping attachment, download returned HTTP {response.status}"
                            )
                else:
                    logger.warning("Skipping attachment without readable file or URL")

            async with self._session.post(
                f"{self._api_url}/api/v1/message/attachment",
                data=data,
            ) as response:
                if response.status in (200, 201):
                    result = await response.json()
                    return SendResult(
                        success=True,
                        message_id=result.get("data", {}).get("guid", ""),
                        raw=result,
                    )

                error_text = await response.text()
                return SendResult(success=False, error=error_text)

        except Exception as e:
            logger.error(f"Failed to send attachment: {e}")
            return SendResult(success=False, error=str(e))

    async def edit_message(
        self,
        chat_id: str,
        message_id: str,
        text: str,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Edit an existing message (requires iOS 16+).

        When the edit request fails or is rejected, a new message prefixed
        with "[Edit] " is sent to the chat instead.
        """
        if not self._session:
            return SendResult(success=False, error="Not connected")

        try:
            payload = {
                "editedMessage": text,
                "backwardsCompatMessage": f"[Edited] {text}",
            }

            async with self._session.post(
                f"{self._api_url}/api/v1/message/{message_id}/edit",
                json=payload,
            ) as response:
                if response.status in (200, 201):
                    data = await response.json()
                    return SendResult(
                        success=True,
                        message_id=message_id,
                        raw=data,
                    )

                # Fall back to sending edit indicator
                return await self.send_message(chat_id, f"[Edit] {text}")

        except Exception as e:
            logger.error(f"Failed to edit message: {e}")
            return await self.send_message(chat_id, f"[Edit] {text}")

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """Unsend a message (requires iOS 16+).

        Returns:
            True when the server answers 200, 201 or 204; False otherwise,
            including when not connected or on error.
        """
        if not self._session:
            return False

        try:
            async with self._session.post(
                f"{self._api_url}/api/v1/message/{message_id}/unsend"
            ) as response:
                return response.status in (200, 201, 204)

        except Exception as e:
            logger.error(f"Failed to unsend message: {e}")
            return False

    async def send_typing(self, chat_id: str) -> None:
        """Send typing indicator (best effort; failures are only logged)."""
        if not self._session:
            return

        try:
            await self._set_typing(chat_id, True)
        except Exception as e:
            logger.debug(f"Failed to send typing indicator: {e}")

    async def stop_typing(self, chat_id: str) -> None:
        """Stop typing indicator (best effort; failures are only logged)."""
        if not self._session:
            return

        try:
            await self._set_typing(chat_id, False)
        except Exception as e:
            logger.debug(f"Failed to stop typing indicator: {e}")

    # ------------------------------------------------------------------
    # Chat operations
    # ------------------------------------------------------------------

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Get information about a chat.

        Returns:
            A dict with ``id``, ``type`` ("group" or "direct"),
            ``display_name`` and ``participants`` (addresses), or None when
            not connected, on a non-200 response, or on error.
        """
        if not self._session:
            return None

        try:
            async with self._session.get(
                f"{self._api_url}/api/v1/chat/{chat_id}"
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    chat = data.get("data", {})
                    return {
                        "id": chat.get("guid"),
                        "type": "group" if chat.get("style") == 43 else "direct",
                        "display_name": chat.get("displayName"),
                        "participants": [
                            p.get("address") for p in chat.get("participants", [])
                        ],
                    }

        except Exception as e:
            logger.error(f"Failed to get chat info: {e}")

        return None

    async def send_tapback(
        self,
        chat_id: str,
        message_id: str,
        tapback: str,
        remove: bool = False,
    ) -> bool:
        """
        Send a tapback (reaction) to a message.

        Args:
            chat_id: Chat GUID
            message_id: Message GUID to react to
            tapback: Tapback type (love, like, dislike, laugh, emphasize, question)
                or one of the aliases in TAPBACK_EMOJI_MAP
            remove: Whether to remove the tapback

        Returns:
            True when the server answers 200 or 201; False for an unknown
            tapback, when not connected, or on error.
        """
        if not self._session:
            return False

        # Map emoji-style names to tapback names
        tapback = TAPBACK_EMOJI_MAP.get(tapback, tapback)

        if tapback not in TAPBACK_MAP:
            logger.error(f"Invalid tapback type: {tapback}")
            return False

        try:
            payload = {
                "selectedMessageGuid": message_id,
                # Removal uses the same code shifted up by 1000.
                "reaction": TAPBACK_MAP[tapback] + (1000 if remove else 0),
            }

            async with self._session.post(
                f"{self._api_url}/api/v1/message/react",
                json=payload,
            ) as response:
                return response.status in (200, 201)

        except Exception as e:
            logger.error(f"Failed to send tapback: {e}")
            return False

    async def mark_read(self, chat_id: str) -> bool:
        """Mark chat as read.

        Returns:
            True when the server answers 200, 201 or 204; False otherwise.
        """
        if not self._session:
            return False

        try:
            async with self._session.post(
                f"{self._api_url}/api/v1/chat/{chat_id}/read"
            ) as response:
                return response.status in (200, 201, 204)

        except Exception as e:
            logger.error(f"Failed to mark chat as read: {e}")
            return False

    async def create_group(
        self,
        participants: List[str],
        name: Optional[str] = None,
    ) -> Optional[str]:
        """Create a new group chat.

        Args:
            participants: Addresses to include in the chat.
            name: Optional group name.

        Returns:
            The new chat's GUID, or None when not connected, on a
            non-200/201 response, or on error.
        """
        if not self._session:
            return None

        try:
            payload = {
                "addresses": participants,
            }
            if name:
                payload["name"] = name

            async with self._session.post(
                f"{self._api_url}/api/v1/chat/new",
                json=payload,
            ) as response:
                if response.status in (200, 201):
                    data = await response.json()
                    return data.get("data", {}).get("guid")

        except Exception as e:
            logger.error(f"Failed to create group: {e}")

        return None

    async def download_attachment(
        self,
        attachment_id: str,
        destination: str,
    ) -> bool:
        """Download an attachment.

        Args:
            attachment_id: Attachment GUID.
            destination: File path the content is written to.

        Returns:
            True when the file was written; False when not connected, on a
            non-200 response, or on error.
        """
        if not self._session:
            return False

        try:
            async with self._session.get(
                f"{self._api_url}/api/v1/attachment/{attachment_id}/download"
            ) as response:
                if response.status == 200:
                    content = await response.read()
                    Path(destination).write_bytes(content)
                    return True

        except Exception as e:
            logger.error(f"Failed to download attachment: {e}")

        return False

658 

659 

def create_imessage_adapter(
    password: Optional[str] = None,
    api_url: Optional[str] = None,
    **kwargs
) -> IMessageAdapter:
    """
    Factory function to create iMessage adapter.

    Args:
        password: BlueBubbles server password (or set BLUEBUBBLES_PASSWORD env var)
        api_url: BlueBubbles API URL (or set BLUEBUBBLES_URL env var);
            defaults to http://localhost:1234 when neither is given
        **kwargs: Additional config options passed to ChannelConfig. An
            ``extra`` mapping is merged into the config's ``extra`` dict and
            its keys take precedence over ``api_url``; ``extra=None`` is
            treated as empty.

    Returns:
        Configured IMessageAdapter

    Raises:
        ValueError: if no password is given and BLUEBUBBLES_PASSWORD is unset
            or empty.
    """
    password = password or os.getenv("BLUEBUBBLES_PASSWORD")
    if not password:
        raise ValueError("BlueBubbles password required")

    api_url = api_url or os.getenv("BLUEBUBBLES_URL", "http://localhost:1234")

    # Separate "extra" from the remaining options; "or {}" tolerates an
    # explicit extra=None, which cannot be unpacked with **.
    options = dict(kwargs)
    extra = options.pop("extra", None) or {}

    config = ChannelConfig(
        token=password,
        extra={"api_url": api_url, **extra},
        **options,
    )
    return IMessageAdapter(config)