Coverage for integrations / channels / imessage_adapter.py: 29.1%
254 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
iMessage Channel Adapter

Implements iMessage messaging using the BlueBubbles API.
Designed for Docker-compatible deployments with cross-platform access.

Features:
- BlueBubbles API integration (cross-platform iMessage access)
- Group chats
- Tapbacks (reactions)
- Attachments
- Read receipts
- Typing indicators

Requirements:
- BlueBubbles server running on a Mac (https://bluebubbles.app/)
- API access configured

Note: BlueBubbles requires a Mac running as a server to relay iMessages.
This adapter connects to that Mac's BlueBubbles API from Docker/Linux.
21"""
23from __future__ import annotations
25import asyncio
26import logging
27import os
28import base64
29import mimetypes
30from typing import Optional, List, Dict, Any
31from datetime import datetime
32from pathlib import Path
34try:
35 import aiohttp
36 HAS_AIOHTTP = True
37except ImportError:
38 HAS_AIOHTTP = False
40try:
41 import socketio
42 HAS_SOCKETIO = True
43except ImportError:
44 HAS_SOCKETIO = False
46from .base import (
47 ChannelAdapter,
48 ChannelConfig,
49 ChannelStatus,
50 Message,
51 MessageType,
52 MediaAttachment,
53 SendResult,
54 ChannelConnectionError,
55 ChannelSendError,
56 ChannelRateLimitError,
57)
logger = logging.getLogger(__name__)


# Tapback reaction mappings.
# Names are listed in code order: the first entry maps to 2000 and each
# following entry to the next integer.
_TAPBACK_BASE_CODE = 2000
_TAPBACK_NAMES = ("love", "like", "dislike", "laugh", "emphasize", "question")

TAPBACK_MAP = {
    tapback_name: _TAPBACK_BASE_CODE + offset
    for offset, tapback_name in enumerate(_TAPBACK_NAMES)
}

# Alternative spellings accepted by callers, translated to the canonical
# tapback names used as keys of TAPBACK_MAP.
TAPBACK_EMOJI_MAP = dict(
    zip(
        ("heart", "thumbsup", "thumbsdown", "haha", "!!", "?"),
        _TAPBACK_NAMES,
    )
)
class IMessageAdapter(ChannelAdapter):
    """
    iMessage adapter using BlueBubbles API.

    Incoming messages are received over Socket.IO when python-socketio is
    installed; otherwise (or when the Socket.IO setup fails) a polling loop
    against the REST API is used. Outgoing operations are REST calls made
    through one shared aiohttp session that carries the server password in
    its ``Authorization`` header.

    Usage:
        config = ChannelConfig(
            token="your_bluebubbles_password",
            extra={
                "api_url": "http://your-mac:1234",
            }
        )
        adapter = IMessageAdapter(config)
        adapter.on_message(my_handler)
        await adapter.start()
    """

    # Chat "style" value this adapter treats as a group conversation.
    _GROUP_CHAT_STYLE = 43

    def __init__(self, config: ChannelConfig):
        """
        Store connection settings; no network I/O happens here.

        Args:
            config: Channel configuration. ``config.token`` is the
                BlueBubbles password and ``config.extra["api_url"]`` the
                server base URL (defaults to ``http://localhost:1234``).

        Raises:
            ImportError: If aiohttp is not installed.
        """
        if not HAS_AIOHTTP:
            raise ImportError(
                "aiohttp not installed. "
                "Install with: pip install aiohttp"
            )

        super().__init__(config)
        self._password = config.token
        extra = config.extra or {}
        # Trailing slashes are dropped so endpoint URLs never contain "//".
        self._api_url = (extra.get("api_url") or "http://localhost:1234").rstrip("/")
        self._session: Optional[aiohttp.ClientSession] = None
        self._sio: Optional[Any] = None
        self._running = False
        self._reconnect_delay = 5
        self._max_reconnect_delay = 300
        self._last_message_guid: Optional[str] = None
        # Strong references to background tasks: the event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._background_tasks: set = set()

    @property
    def name(self) -> str:
        """Channel identifier stamped on every message from this adapter."""
        return "imessage"

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _pick(data: Dict[str, Any], *keys: str, default: Any = None) -> Any:
        """Return the first value in ``data`` under ``keys`` that is not None."""
        for key in keys:
            value = data.get(key)
            if value is not None:
                return value
        return default

    def _spawn(self, coro: Any) -> Any:
        """Schedule ``coro`` as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def _close_session(self) -> None:
        """Close and forget the HTTP session, if one is open."""
        session, self._session = self._session, None
        if session is not None:
            await session.close()

    async def _connect_socketio(self, sio: Any) -> None:
        """Open the Socket.IO connection with the adapter's credentials."""
        await sio.connect(
            self._api_url,
            auth={"password": self._password},
            transports=["websocket", "polling"],
        )

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------

    async def connect(self) -> bool:
        """
        Connect to BlueBubbles API.

        Verifies the server with ``GET /api/v1/server/info`` and then starts
        the message receiver (Socket.IO, or polling as a fallback).

        Returns:
            True when the server answered with HTTP 200 and the receiver was
            started; False otherwise. On every failure path the HTTP session
            is closed again so nothing is leaked.
        """
        if not self._password:
            logger.error("BlueBubbles password not provided")
            return False

        # A repeated connect() must not orphan a previously opened session.
        await self._close_session()

        try:
            # Create session with auth
            self._session = aiohttp.ClientSession(
                headers={"Authorization": self._password}
            )

            # Verify API connection
            async with self._session.get(
                f"{self._api_url}/api/v1/server/info"
            ) as response:
                available = response.status == 200
                info = await response.json() if available else {}

            if not available:
                logger.error("BlueBubbles API not available")
                await self._close_session()
                return False

            version = (info.get("data") or {}).get("server_version", "unknown")
            logger.info(f"Connected to BlueBubbles v{version}")

            # The flag is raised before any receiver starts because the
            # polling and reconnect loops both run "while self._running".
            self._running = True

            if HAS_SOCKETIO:
                await self._setup_socketio()
            else:
                # Fall back to polling
                logger.warning("python-socketio not installed, using polling mode")
                self._spawn(self._poll_messages())

            self.status = ChannelStatus.CONNECTED
            return True

        except aiohttp.ClientError as e:
            logger.error(f"Failed to connect to BlueBubbles: {e}")
        except Exception as e:
            logger.error(f"BlueBubbles connection error: {e}")

        # Shared failure path for both handlers above.
        self._running = False
        await self._close_session()
        self.status = ChannelStatus.ERROR
        return False

    async def _setup_socketio(self) -> None:
        """
        Set up Socket.IO connection for real-time messages.

        Registers the event handlers and connects. Any failure is logged and
        the polling loop is started instead, so this method never raises.
        """
        try:
            sio = socketio.AsyncClient()
            self._sio = sio

            # NOTE: @sio.event derives the event name from the function
            # name, so "connect" and "disconnect" must keep these names.
            @sio.event
            async def connect():
                logger.info("Socket.IO connected to BlueBubbles")
                # Subscribe to new messages
                await sio.emit("subscribe", {"topic": "new-message"})

            @sio.event
            async def disconnect():
                logger.warning("Socket.IO disconnected from BlueBubbles")
                if self._running:
                    self._spawn(self._reconnect_socketio())

            @sio.on("new-message")
            async def on_new_message(data):
                message = self._convert_message(data)
                if message:
                    await self._dispatch_message(message)

            @sio.on("message-send-error")
            async def on_send_error(data):
                logger.error(f"Message send error: {data}")

            # Connect with auth
            await self._connect_socketio(sio)

        except Exception as e:
            logger.error(f"Failed to setup Socket.IO: {e}")
            # Fall back to polling
            self._spawn(self._poll_messages())

    async def _reconnect_socketio(self) -> None:
        """
        Attempt to reconnect Socket.IO.

        Retries with exponential backoff (capped at ``_max_reconnect_delay``)
        until a connection is established, the client turns out to be
        connected already, or the adapter is stopped.
        """
        delay = self._reconnect_delay

        while self._running:
            try:
                await asyncio.sleep(delay)
                sio = self._sio
                # disconnect() may have run during the sleep and cleared the
                # client, or the client may have reconnected by itself.
                if not self._running or sio is None or sio.connected:
                    break
                await self._connect_socketio(sio)
                break
            except Exception as e:
                logger.error(f"Socket.IO reconnection failed: {e}")
                delay = min(delay * 2, self._max_reconnect_delay)

    async def _poll_messages(self) -> None:
        """
        Poll for new messages (fallback when Socket.IO unavailable).

        Every two seconds the latest messages are requested from
        ``/api/v1/message`` and dispatched oldest-first. The GUID of the last
        dispatched message is sent as the ``after`` parameter of the next
        request. Errors trigger an exponential backoff.
        """
        reconnect_delay = self._reconnect_delay

        while self._running:
            try:
                session = self._session
                if session is None:
                    # disconnect() closed the session; nothing left to poll.
                    break

                params: Dict[str, Any] = {"limit": 50, "sort": "DESC"}
                if self._last_message_guid:
                    params["after"] = self._last_message_guid

                async with session.get(
                    f"{self._api_url}/api/v1/message",
                    params=params,
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        messages = data.get("data") or []
                    else:
                        logger.warning(
                            f"Polling returned HTTP {response.status}"
                        )
                        messages = []

                # Process in chronological order (the server returns DESC).
                # Messages sent by this account are filtered out inside
                # _convert_message.
                for msg_data in reversed(messages):
                    message = self._convert_message(msg_data)
                    if message:
                        await self._dispatch_message(message)
                        guid = msg_data.get("guid")
                        if guid:
                            self._last_message_guid = guid

                reconnect_delay = self._reconnect_delay

                await asyncio.sleep(2)  # Poll interval

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Polling error: {e}")
                await asyncio.sleep(reconnect_delay)
                reconnect_delay = min(reconnect_delay * 2, self._max_reconnect_delay)

    async def disconnect(self) -> None:
        """
        Disconnect from BlueBubbles API.

        Stops the background loops, closes the Socket.IO client and the HTTP
        session, and sets the status to DISCONNECTED.
        """
        self._running = False

        # Cancel receivers so they do not keep using a closed session. The
        # current task is skipped in case disconnect() is called from a
        # message handler that itself runs inside one of these tasks.
        current = asyncio.current_task()
        for task in list(self._background_tasks):
            if task is not current:
                task.cancel()

        sio, self._sio = self._sio, None
        if sio is not None and sio.connected:
            try:
                await sio.disconnect()
            except Exception as e:
                # Keep going: the HTTP session still has to be closed.
                logger.warning(f"Error while closing Socket.IO: {e}")

        await self._close_session()

        self.status = ChannelStatus.DISCONNECTED
        logger.info("Disconnected from BlueBubbles")

    # ------------------------------------------------------------------
    # Incoming messages
    # ------------------------------------------------------------------

    def _convert_message(self, msg_data: Dict[str, Any]) -> Optional[Message]:
        """
        Convert BlueBubbles message to unified Message format.

        Args:
            msg_data: Raw message dictionary from the REST API or Socket.IO.

        Returns:
            A ``Message``, or None when the payload has neither text nor
            attachments, or was sent by this account.
        """
        if not isinstance(msg_data, dict):
            return None

        # NOTE(review): the rest of this file addresses the server with
        # camelCase keys ("chatGuid", "displayName"), so message payloads are
        # presumably camelCase too. Both spellings are accepted here, with
        # the original snake_case first - confirm against the server version.
        text = msg_data.get("text") or ""
        attachments = msg_data.get("attachments") or []

        # Skip system messages or empty messages
        if not text and not attachments:
            return None

        # Skip messages from self
        if self._pick(msg_data, "is_from_me", "isFromMe"):
            return None

        # "or {}" also covers keys that are present with a JSON null value.
        handle = msg_data.get("handle") or {}
        chats = msg_data.get("chats") or []
        chat = msg_data.get("chat") or (chats[0] if chats else None) or {}

        # Get sender info
        sender_id = handle.get("address") or handle.get("id") or ""
        sender_name = handle.get("displayName") or sender_id

        # Determine chat type
        is_group = chat.get("style") == self._GROUP_CHAT_STYLE

        # Get chat ID (GUID)
        chat_id = chat.get("guid") or msg_data.get("chatGuid") or ""

        # Process attachments
        media = []
        for att in attachments:
            mime_type = self._pick(att, "mime_type", "mimeType")
            media.append(MediaAttachment(
                type=self._get_media_type(mime_type or ""),
                file_id=att.get("guid"),
                file_name=self._pick(att, "transfer_name", "transferName"),
                mime_type=mime_type,
                file_size=self._pick(att, "total_bytes", "totalBytes"),
            ))

        # Parse timestamp; numeric values are treated as epoch milliseconds.
        created = self._pick(msg_data, "date_created", "dateCreated")
        timestamp = None
        if isinstance(created, (int, float)):
            try:
                timestamp = datetime.fromtimestamp(created / 1000)
            except (OverflowError, OSError, ValueError):
                timestamp = None
        if timestamp is None:
            timestamp = datetime.now()

        return Message(
            id=msg_data.get("guid") or "",
            channel=self.name,
            sender_id=sender_id,
            sender_name=sender_name,
            chat_id=chat_id,
            text=text,
            media=media,
            reply_to_id=self._pick(
                msg_data, "thread_origin_guid", "threadOriginatorGuid"
            ),
            timestamp=timestamp,
            is_group=is_group,
            is_bot_mentioned=False,  # iMessage doesn't have @mentions
            raw=msg_data,
        )

    def _get_media_type(self, mime_type: str) -> MessageType:
        """
        Get MessageType from MIME type.

        ``image/*``, ``video/*`` and ``audio/*`` map to their own types;
        anything else (including an empty or missing value) is a DOCUMENT.
        """
        mime_type = mime_type or ""
        if mime_type.startswith("image/"):
            return MessageType.IMAGE
        if mime_type.startswith("video/"):
            return MessageType.VIDEO
        if mime_type.startswith("audio/"):
            return MessageType.AUDIO
        return MessageType.DOCUMENT

    # ------------------------------------------------------------------
    # Outgoing messages
    # ------------------------------------------------------------------

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        media: Optional[List[MediaAttachment]] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """
        Send an iMessage.

        Args:
            chat_id: GUID of the target chat.
            text: Message body.
            reply_to: GUID of the message to reply to, if any.
            media: Attachments; when given, the message is sent through the
                attachment endpoint instead of the text endpoint.
            buttons: Accepted for interface compatibility; not used.

        Returns:
            A ``SendResult``. Failures are reported in the result, never
            raised.
        """
        if not self._session:
            return SendResult(success=False, error="Not connected")

        # Handle attachments
        if media:
            return await self._send_with_attachments(chat_id, text, media, reply_to)

        try:
            payload = {
                "chatGuid": chat_id,
                "message": text,
                "method": "private-api",  # Use private API for better delivery
            }

            # Handle reply/thread
            if reply_to:
                payload["selectedMessageGuid"] = reply_to

            async with self._session.post(
                f"{self._api_url}/api/v1/message/text",
                json=payload,
            ) as response:
                if response.status in (200, 201):
                    data = await response.json()
                    return SendResult(
                        success=True,
                        # "or {}" tolerates a null "data" field so a sent
                        # message is not reported as failed.
                        message_id=(data.get("data") or {}).get("guid", ""),
                        raw=data,
                    )

                error_text = await response.text()
                logger.error(f"Failed to send iMessage: {error_text}")
                return SendResult(success=False, error=error_text)

        except Exception as e:
            logger.error(f"Error sending iMessage: {e}")
            return SendResult(success=False, error=str(e))

    async def _fetch_attachment_url(self, url: str) -> Optional[tuple]:
        """
        Download ``url`` and return ``(content, content_type)`` or None.

        The authenticated session is only used for URLs on the BlueBubbles
        server itself. Every other host is fetched with a throwaway session
        so the server password in the default headers is never sent to it.
        """
        same_server = url == self._api_url or url.startswith(f"{self._api_url}/")
        if same_server:
            async with self._session.get(url) as response:
                if response.status != 200:
                    return None
                return await response.read(), response.content_type

        async with aiohttp.ClientSession() as plain_session:
            async with plain_session.get(url) as response:
                if response.status != 200:
                    return None
                return await response.read(), response.content_type

    async def _send_with_attachments(
        self,
        chat_id: str,
        text: str,
        media: List[MediaAttachment],
        reply_to: Optional[str],
    ) -> SendResult:
        """
        Send message with attachments.

        Each attachment is taken from ``file_path`` when that file exists,
        otherwise downloaded from ``url``. Attachments that cannot be loaded
        are skipped with a warning; if none can be loaded, nothing is sent
        and a failed ``SendResult`` is returned.
        """
        if not self._session:
            return SendResult(success=False, error="Not connected")

        try:
            # BlueBubbles uses multipart form for attachments
            data = aiohttp.FormData()
            data.add_field("chatGuid", chat_id)
            if text:
                data.add_field("message", text)
            if reply_to:
                data.add_field("selectedMessageGuid", reply_to)

            loop = asyncio.get_running_loop()
            attached = 0

            for m in media:
                path = Path(m.file_path) if m.file_path else None
                if path is not None and path.exists():
                    # File reads block, so they run in the default executor.
                    content = await loop.run_in_executor(None, path.read_bytes)
                    data.add_field(
                        "attachment",
                        content,
                        filename=m.file_name or path.name,
                        content_type=m.mime_type or mimetypes.guess_type(str(path))[0],
                    )
                    attached += 1
                elif m.url:
                    # Download and attach
                    fetched = await self._fetch_attachment_url(m.url)
                    if fetched is None:
                        logger.warning(f"Could not download attachment: {m.url}")
                        continue
                    content, content_type = fetched
                    data.add_field(
                        "attachment",
                        content,
                        filename=m.file_name or "attachment",
                        content_type=m.mime_type or content_type,
                    )
                    attached += 1

            if not attached:
                return SendResult(
                    success=False, error="No attachment could be loaded"
                )

            async with self._session.post(
                f"{self._api_url}/api/v1/message/attachment",
                data=data,
            ) as response:
                if response.status in (200, 201):
                    result = await response.json()
                    return SendResult(
                        success=True,
                        message_id=(result.get("data") or {}).get("guid", ""),
                        raw=result,
                    )

                error_text = await response.text()
                return SendResult(success=False, error=error_text)

        except Exception as e:
            logger.error(f"Failed to send attachment: {e}")
            return SendResult(success=False, error=str(e))

    async def edit_message(
        self,
        chat_id: str,
        message_id: str,
        text: str,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """
        Edit an existing message (requires iOS 16+).

        When the edit request fails or raises, a new message prefixed with
        ``[Edit]`` is sent to the chat instead and its result is returned.

        Args:
            chat_id: GUID of the chat (used for the fallback message).
            message_id: GUID of the message to edit.
            text: Replacement text.
            buttons: Accepted for interface compatibility; not used.
        """
        if not self._session:
            return SendResult(success=False, error="Not connected")

        try:
            payload = {
                "editedMessage": text,
                "backwardsCompatMessage": f"[Edited] {text}",
            }

            async with self._session.post(
                f"{self._api_url}/api/v1/message/{message_id}/edit",
                json=payload,
            ) as response:
                if response.status in (200, 201):
                    data = await response.json()
                    return SendResult(
                        success=True,
                        message_id=message_id,
                        raw=data,
                    )

        except Exception as e:
            logger.error(f"Failed to edit message: {e}")

        # Fall back to sending edit indicator
        return await self.send_message(chat_id, f"[Edit] {text}")

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """
        Unsend a message (requires iOS 16+).

        Returns:
            True when the server answered 200, 201 or 204; False when not
            connected, on any other status, or on an error.
        """
        if not self._session:
            return False

        try:
            async with self._session.post(
                f"{self._api_url}/api/v1/message/{message_id}/unsend"
            ) as response:
                return response.status in (200, 201, 204)

        except Exception as e:
            logger.error(f"Failed to unsend message: {e}")
            return False

    async def _set_typing(self, chat_id: str, active: bool) -> None:
        """Post the typing status for ``chat_id``; failures are only logged."""
        if not self._session:
            return

        try:
            # "async with" releases the response so the pooled connection is
            # returned instead of leaking.
            async with self._session.post(
                f"{self._api_url}/api/v1/chat/{chat_id}/typing",
                json={"status": active},
            ):
                pass
        except Exception as e:
            action = "send" if active else "stop"
            logger.debug(f"Failed to {action} typing indicator: {e}")

    async def send_typing(self, chat_id: str) -> None:
        """Send typing indicator (best effort, never raises)."""
        await self._set_typing(chat_id, True)

    async def stop_typing(self, chat_id: str) -> None:
        """Stop typing indicator (best effort, never raises)."""
        await self._set_typing(chat_id, False)

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """
        Get information about a chat.

        Returns:
            A dict with ``id``, ``type`` ("group" or "direct"),
            ``display_name`` and ``participants`` (list of addresses), or
            None when not connected, on a non-200 response, or on an error.
        """
        if not self._session:
            return None

        try:
            async with self._session.get(
                f"{self._api_url}/api/v1/chat/{chat_id}"
            ) as response:
                if response.status != 200:
                    return None
                data = await response.json()

            chat = data.get("data") or {}
            is_group = chat.get("style") == self._GROUP_CHAT_STYLE
            return {
                "id": chat.get("guid"),
                "type": "group" if is_group else "direct",
                "display_name": chat.get("displayName"),
                "participants": [
                    p.get("address") for p in chat.get("participants") or []
                ],
            }

        except Exception as e:
            logger.error(f"Failed to get chat info: {e}")

        return None

    async def send_tapback(
        self,
        chat_id: str,
        message_id: str,
        tapback: str,
        remove: bool = False,
    ) -> bool:
        """
        Send a tapback (reaction) to a message.

        Args:
            chat_id: Chat GUID
            message_id: Message GUID to react to
            tapback: Tapback type (love, like, dislike, laugh, emphasize,
                question) or one of the aliases in TAPBACK_EMOJI_MAP
            remove: Whether to remove the tapback

        Returns:
            True when the server answered 200 or 201; False when not
            connected, for an unknown tapback name, or on an error.
        """
        if not self._session:
            return False

        # Map emoji-style names to tapback names
        tapback = TAPBACK_EMOJI_MAP.get(tapback, tapback)

        if tapback not in TAPBACK_MAP:
            logger.error(f"Invalid tapback type: {tapback}")
            return False

        try:
            payload = {
                # The chat GUID was accepted by this method but never sent.
                "chatGuid": chat_id,
                "selectedMessageGuid": message_id,
                # Removal codes are the add codes shifted by 1000.
                "reaction": TAPBACK_MAP[tapback] + (1000 if remove else 0),
            }

            async with self._session.post(
                f"{self._api_url}/api/v1/message/react",
                json=payload,
            ) as response:
                return response.status in (200, 201)

        except Exception as e:
            logger.error(f"Failed to send tapback: {e}")
            return False

    async def mark_read(self, chat_id: str) -> bool:
        """
        Mark chat as read.

        Returns:
            True when the server answered 200, 201 or 204; False otherwise.
        """
        if not self._session:
            return False

        try:
            async with self._session.post(
                f"{self._api_url}/api/v1/chat/{chat_id}/read"
            ) as response:
                return response.status in (200, 201, 204)

        except Exception as e:
            logger.error(f"Failed to mark chat as read: {e}")
            return False

    async def create_group(
        self,
        participants: List[str],
        name: Optional[str] = None,
    ) -> Optional[str]:
        """
        Create a new group chat.

        Args:
            participants: Addresses to add to the chat.
            name: Optional display name of the group.

        Returns:
            The GUID of the new chat, or None when not connected, on a
            non-2xx response, or on an error.
        """
        if not self._session:
            return None

        try:
            payload: Dict[str, Any] = {
                "addresses": participants,
            }
            if name:
                payload["name"] = name

            async with self._session.post(
                f"{self._api_url}/api/v1/chat/new",
                json=payload,
            ) as response:
                if response.status in (200, 201):
                    data = await response.json()
                    return (data.get("data") or {}).get("guid")

        except Exception as e:
            logger.error(f"Failed to create group: {e}")

        return None

    async def download_attachment(
        self,
        attachment_id: str,
        destination: str,
    ) -> bool:
        """
        Download an attachment.

        Args:
            attachment_id: GUID of the attachment.
            destination: Path of the file to write.

        Returns:
            True when the attachment was fetched and written; False when not
            connected, on a non-200 response, or on an error.
        """
        if not self._session:
            return False

        try:
            async with self._session.get(
                f"{self._api_url}/api/v1/attachment/{attachment_id}/download"
            ) as response:
                if response.status != 200:
                    return False
                content = await response.read()

            # The file write blocks, so it runs in the default executor.
            await asyncio.get_running_loop().run_in_executor(
                None, Path(destination).write_bytes, content
            )
            return True

        except Exception as e:
            logger.error(f"Failed to download attachment: {e}")

        return False
def create_imessage_adapter(
    password: Optional[str] = None,
    api_url: Optional[str] = None,
    **kwargs
) -> "IMessageAdapter":
    """
    Factory function to create iMessage adapter.

    Args:
        password: BlueBubbles server password (or set BLUEBUBBLES_PASSWORD env var)
        api_url: BlueBubbles API URL (or set BLUEBUBBLES_URL env var);
            defaults to ``http://localhost:1234``
        **kwargs: Additional config options passed to ``ChannelConfig``.
            An ``extra`` dict is merged over ``{"api_url": ...}``, so an
            ``api_url`` key inside it takes precedence.

    Returns:
        Configured IMessageAdapter

    Raises:
        ValueError: If no password is given and BLUEBUBBLES_PASSWORD is unset
            or empty.
    """
    password = password or os.getenv("BLUEBUBBLES_PASSWORD")
    if not password:
        raise ValueError("BlueBubbles password required")

    # "or" chaining also covers an environment variable set to "".
    api_url = api_url or os.getenv("BLUEBUBBLES_URL") or "http://localhost:1234"

    options = dict(kwargs)
    # extra=None is treated like a missing "extra" instead of failing on **None.
    caller_extra = options.pop("extra", None) or {}

    config = ChannelConfig(
        token=password,
        extra={"api_url": api_url, **caller_extra},
        **options,
    )
    return IMessageAdapter(config)