Coverage for integrations / channels / extensions / instagram_adapter.py: 26.6%
338 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Instagram Direct Messages Channel Adapter
4Implements Instagram Direct Messages API via Meta Graph API.
5Based on SantaClaw extension patterns for Instagram.
7Features:
8- Instagram DM API (via Facebook Graph API)
9- Message types (text, image, video, story share)
10- Ice breakers (conversation starters)
11- Quick replies
12- Generic templates
13- Typing indicators
14- Read receipts
15- Story mentions
16- Comment mentions
17- Webhook handling
18"""
20from __future__ import annotations
22import asyncio
23import logging
24import os
25import json
26import hashlib
27import hmac
28from typing import Optional, List, Dict, Any, Callable
29from datetime import datetime
30from dataclasses import dataclass, field
31try:
32 import aiohttp
33 HAS_AIOHTTP = True
34except ImportError:
35 HAS_AIOHTTP = False
37from ..base import (
38 ChannelAdapter,
39 ChannelConfig,
40 ChannelStatus,
41 Message,
42 MessageType,
43 MediaAttachment,
44 SendResult,
45 ChannelConnectionError,
46 ChannelSendError,
47 ChannelRateLimitError,
48)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Meta Graph API endpoints: the default API version and the base URL built on it.
GRAPH_API_VERSION = "v18.0"
GRAPH_API_BASE = "https://graph.facebook.com/" + GRAPH_API_VERSION
@dataclass
class InstagramConfig(ChannelConfig):
    """Configuration values specific to the Instagram channel.

    Extends ``ChannelConfig`` with the credentials and feature switches that
    ``InstagramAdapter`` reads.
    """

    # Facebook page token linked to Instagram; connect() refuses to run without it.
    page_access_token: str = ""
    # App secret used to compute the expected webhook signature.
    app_secret: str = ""
    # Token compared against the one supplied in webhook subscription requests.
    verify_token: str = ""
    # Overwritten by connect() with the id returned by the account lookup.
    instagram_account_id: Optional[str] = None
    # When False, "story_insights" change events are ignored.
    enable_story_replies: bool = True
    # When False, "comments" change events are ignored.
    enable_comment_replies: bool = True
    # Graph API version used to build the adapter's base URL.
    api_version: str = GRAPH_API_VERSION
@dataclass
class IceBreaker:
    """One ice breaker (conversation starter) entry.

    ``question`` is the text of the starter; ``payload`` is passed through
    unchanged in the serialized form.
    """

    question: str
    payload: str

    def to_dict(self) -> Dict[str, str]:
        """Return the API mapping, with the question cut to at most 80 chars."""
        shortened_question = self.question[:80]
        return dict(question=shortened_question, payload=self.payload)
@dataclass
class QuickReply:
    """A single quick reply button attached to an outgoing message."""

    content_type: str = "text"
    title: Optional[str] = None
    payload: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the API mapping for this button.

        ``content_type`` is always present. ``title`` (cut to 20 characters)
        and ``payload`` are included only when they are truthy.
        """
        serialized = {"content_type": self.content_type}
        if self.title:
            serialized.update(title=self.title[:20])
        if self.payload:
            serialized.update(payload=self.payload)
        return serialized
@dataclass
class GenericElement:
    """One element (card) of a generic template message."""

    title: str
    subtitle: Optional[str] = None
    image_url: Optional[str] = None
    buttons: List[Dict[str, Any]] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return the API mapping for this element.

        The title and subtitle are cut to 80 characters and at most the first
        three buttons are kept. Optional parts are omitted when falsy.
        """
        serialized = {"title": self.title[:80]}
        if self.subtitle:
            serialized.update(subtitle=self.subtitle[:80])
        if self.image_url:
            serialized.update(image_url=self.image_url)
        if self.buttons:
            serialized.update(buttons=self.buttons[:3])
        return serialized
class InstagramAdapter(ChannelAdapter):
    """
    Channel adapter for Instagram Direct Messages, built on the Meta Graph API.

    Note: Instagram Messaging API requires:
    - A Facebook Page linked to the Instagram Business/Creator account
    - Approved Instagram Messaging permission
    - Business verification for some features

    Usage:
        config = InstagramConfig(
            page_access_token="your-page-token",
            app_secret="your-app-secret",
            verify_token="your-verify-token",
        )
        adapter = InstagramAdapter(config)
        adapter.on_message(my_handler)
        # Use with webhook endpoint
    """

    def __init__(self, config: InstagramConfig):
        """Store the configuration and set up empty handler registries.

        Args:
            config: Instagram settings; its ``api_version`` determines the
                Graph API base URL used for every request.
        """
        super().__init__(config)
        self.instagram_config: InstagramConfig = config
        # No HTTP session yet; connect() creates one.
        self._session: Optional[aiohttp.ClientSession] = None
        # Postback handlers are keyed by payload; story/comment handlers are lists.
        self._postback_handlers: Dict[str, Callable] = {}
        self._story_handlers: List[Callable] = []
        self._comment_handlers: List[Callable] = []
        self._api_base: str = "https://graph.facebook.com/{}".format(config.api_version)
150 @property
151 def name(self) -> str:
152 return "instagram"
154 async def connect(self) -> bool:
155 """Initialize Instagram API connection."""
156 if not self.instagram_config.page_access_token:
157 logger.error("Instagram page access token required")
158 return False
160 try:
161 # Create HTTP session
162 self._session = aiohttp.ClientSession()
164 # Get Instagram account ID
165 account_info = await self._get_instagram_account()
166 if not account_info:
167 logger.error("Failed to get Instagram account info")
168 return False
170 self.instagram_config.instagram_account_id = account_info.get("id")
172 self.status = ChannelStatus.CONNECTED
173 username = account_info.get("username", "Unknown")
174 logger.info(f"Instagram adapter connected as: @{username}")
175 return True
177 except Exception as e:
178 logger.error(f"Failed to connect to Instagram: {e}")
179 self.status = ChannelStatus.ERROR
180 return False
182 async def disconnect(self) -> None:
183 """Disconnect Instagram adapter."""
184 if self._session:
185 await self._session.close()
186 self._session = None
188 self.status = ChannelStatus.DISCONNECTED
190 async def _get_instagram_account(self) -> Optional[Dict[str, Any]]:
191 """Get Instagram Business Account info linked to the page."""
192 if not self._session:
193 return None
195 try:
196 # First get the page's Instagram account
197 url = f"{self._api_base}/me"
198 params = {
199 "access_token": self.instagram_config.page_access_token,
200 "fields": "instagram_business_account{id,username,profile_picture_url,name}",
201 }
203 async with self._session.get(url, params=params) as response:
204 if response.status == 200:
205 data = await response.json()
206 return data.get("instagram_business_account")
207 else:
208 data = await response.json()
209 logger.error(f"Failed to get Instagram account: {data}")
210 return None
212 except Exception as e:
213 logger.error(f"Error getting Instagram account: {e}")
214 return None
216 def verify_webhook(self, mode: str, token: str, challenge: str) -> Optional[str]:
217 """
218 Verify webhook subscription request.
219 Returns challenge string if valid, None if invalid.
220 """
221 if mode == "subscribe" and token == self.instagram_config.verify_token:
222 return challenge
223 return None
225 def verify_signature(self, body: bytes, signature: str) -> bool:
226 """Verify webhook request signature."""
227 if not self.instagram_config.app_secret:
228 return True
230 if not signature.startswith("sha256="):
231 return False
233 expected = "sha256=" + hmac.new(
234 self.instagram_config.app_secret.encode('utf-8'),
235 body,
236 hashlib.sha256
237 ).hexdigest()
239 return hmac.compare_digest(expected, signature)
241 async def handle_webhook(self, body: str, signature: Optional[str] = None) -> None:
242 """
243 Handle incoming webhook POST request from Instagram.
244 Should be called from your webhook endpoint.
245 """
246 try:
247 # Verify signature
248 if signature and not self.verify_signature(body.encode('utf-8'), signature):
249 logger.error("Invalid webhook signature")
250 return
252 data = json.loads(body)
254 # Verify it's an Instagram webhook
255 if data.get("object") != "instagram":
256 return
258 # Process each entry
259 for entry in data.get("entry", []):
260 # Handle messaging events
261 for messaging in entry.get("messaging", []):
262 await self._process_messaging_event(messaging)
264 # Handle changes (comments, story mentions)
265 for change in entry.get("changes", []):
266 await self._process_change_event(change)
268 except Exception as e:
269 logger.error(f"Error handling webhook: {e}")
271 async def _process_messaging_event(self, event: Dict[str, Any]) -> None:
272 """Process a messaging event."""
273 if "message" in event:
274 await self._handle_message(event)
275 elif "postback" in event:
276 await self._handle_postback(event)
277 elif "read" in event:
278 logger.debug(f"Message read: {event}")
279 elif "reaction" in event:
280 await self._handle_reaction(event)
282 async def _process_change_event(self, change: Dict[str, Any]) -> None:
283 """Process a change event (comments, story mentions)."""
284 field = change.get("field")
286 if field == "story_insights":
287 # Story mention
288 if self.instagram_config.enable_story_replies:
289 await self._handle_story_mention(change.get("value", {}))
290 elif field == "comments":
291 # Comment on post
292 if self.instagram_config.enable_comment_replies:
293 await self._handle_comment(change.get("value", {}))
295 async def _handle_message(self, event: Dict[str, Any]) -> None:
296 """Handle incoming message event."""
297 message = self._convert_message(event)
298 await self._dispatch_message(message)
300 async def _handle_postback(self, event: Dict[str, Any]) -> None:
301 """Handle postback event."""
302 payload = event.get("postback", {}).get("payload")
303 sender_id = event.get("sender", {}).get("id")
305 if payload in self._postback_handlers:
306 handler = self._postback_handlers[payload]
307 await handler(event)
308 else:
309 # Convert to message
310 message = Message(
311 id=f"postback_{int(datetime.now().timestamp() * 1000)}",
312 channel=self.name,
313 sender_id=sender_id,
314 chat_id=sender_id,
315 text=f"[postback:{payload}]",
316 timestamp=datetime.fromtimestamp(event.get("timestamp", 0) / 1000),
317 is_group=False,
318 raw={'postback': event.get('postback')},
319 )
320 await self._dispatch_message(message)
322 async def _handle_reaction(self, event: Dict[str, Any]) -> None:
323 """Handle message reaction event."""
324 reaction = event.get("reaction", {})
325 logger.debug(f"Reaction received: {reaction}")
327 async def _handle_story_mention(self, data: Dict[str, Any]) -> None:
328 """Handle story mention event."""
329 for handler in self._story_handlers:
330 try:
331 result = handler(data)
332 if asyncio.iscoroutine(result):
333 await result
334 except Exception as e:
335 logger.error(f"Error in story handler: {e}")
337 async def _handle_comment(self, data: Dict[str, Any]) -> None:
338 """Handle comment event."""
339 for handler in self._comment_handlers:
340 try:
341 result = handler(data)
342 if asyncio.iscoroutine(result):
343 await result
344 except Exception as e:
345 logger.error(f"Error in comment handler: {e}")
347 def _convert_message(self, event: Dict[str, Any]) -> Message:
348 """Convert Instagram event to unified Message format."""
349 sender_id = event.get("sender", {}).get("id", "")
350 message_data = event.get("message", {})
351 timestamp = event.get("timestamp", int(datetime.now().timestamp() * 1000))
353 msg_id = message_data.get("mid", "")
354 text = message_data.get("text", "")
356 # Process attachments
357 media = []
358 for attachment in message_data.get("attachments", []):
359 att_type = attachment.get("type")
360 payload = attachment.get("payload", {})
362 if att_type == "image":
363 media.append(MediaAttachment(
364 type=MessageType.IMAGE,
365 url=payload.get("url"),
366 ))
367 elif att_type == "video":
368 media.append(MediaAttachment(
369 type=MessageType.VIDEO,
370 url=payload.get("url"),
371 ))
372 elif att_type == "audio":
373 media.append(MediaAttachment(
374 type=MessageType.AUDIO,
375 url=payload.get("url"),
376 ))
377 elif att_type == "share":
378 # Shared post/reel
379 url = payload.get("url", "")
380 text = f"[shared:{url}]"
381 elif att_type == "story_mention":
382 # Story mention
383 url = payload.get("url", "")
384 text = f"[story_mention:{url}]"
385 elif att_type == "reel":
386 media.append(MediaAttachment(
387 type=MessageType.VIDEO,
388 url=payload.get("url"),
389 ))
391 # Handle quick reply
392 quick_reply = message_data.get("quick_reply", {})
393 if quick_reply.get("payload"):
394 text = text or f"[quick_reply:{quick_reply['payload']}]"
396 # Check if it's a story reply
397 reply_to = message_data.get("reply_to", {})
398 is_story_reply = reply_to.get("story", {}).get("url") is not None
400 return Message(
401 id=msg_id,
402 channel=self.name,
403 sender_id=sender_id,
404 chat_id=sender_id,
405 text=text,
406 media=media,
407 timestamp=datetime.fromtimestamp(timestamp / 1000),
408 is_group=False,
409 raw={
410 'is_story_reply': is_story_reply,
411 'story': reply_to.get('story'),
412 'is_deleted': message_data.get('is_deleted', False),
413 'is_unsupported': message_data.get('is_unsupported', False),
414 },
415 )
417 async def send_message(
418 self,
419 chat_id: str,
420 text: str,
421 reply_to: Optional[str] = None,
422 media: Optional[List[MediaAttachment]] = None,
423 buttons: Optional[List[Dict]] = None,
424 ) -> SendResult:
425 """Send a message to an Instagram user."""
426 if not self._session:
427 return SendResult(success=False, error="Not connected")
429 try:
430 # Handle media
431 if media and len(media) > 0:
432 return await self._send_media_message(chat_id, media[0])
434 # Build message
435 message_data: Dict[str, Any] = {"text": text}
437 return await self._send_api_request(chat_id, message_data)
439 except Exception as e:
440 logger.error(f"Failed to send Instagram message: {e}")
441 return SendResult(success=False, error=str(e))
443 async def _send_media_message(
444 self,
445 chat_id: str,
446 media: MediaAttachment,
447 ) -> SendResult:
448 """Send a media message."""
449 try:
450 # Determine attachment type
451 if media.type == MessageType.IMAGE:
452 message_data = {
453 "attachment": {
454 "type": "image",
455 "payload": {"url": media.url}
456 }
457 }
458 elif media.type in (MessageType.VIDEO, MessageType.AUDIO):
459 message_data = {
460 "attachment": {
461 "type": "video",
462 "payload": {"url": media.url}
463 }
464 }
465 else:
466 # Instagram only supports image and video
467 return SendResult(success=False, error="Unsupported media type")
469 return await self._send_api_request(chat_id, message_data)
471 except Exception as e:
472 logger.error(f"Failed to send media: {e}")
473 return SendResult(success=False, error=str(e))
475 async def _send_api_request(
476 self,
477 recipient_id: str,
478 message: Dict[str, Any],
479 ) -> SendResult:
480 """Send a request to the Instagram Send API."""
481 if not self._session:
482 return SendResult(success=False, error="Not connected")
484 try:
485 url = f"{self._api_base}/me/messages"
486 params = {"access_token": self.instagram_config.page_access_token}
488 payload = {
489 "recipient": {"id": recipient_id},
490 "message": message,
491 }
493 async with self._session.post(url, params=params, json=payload) as response:
494 data = await response.json()
496 if response.status == 200:
497 return SendResult(
498 success=True,
499 message_id=data.get("message_id"),
500 )
501 else:
502 error = data.get("error", {})
503 error_code = error.get("code")
504 error_msg = error.get("message", "Unknown error")
506 if error_code == 613:
507 raise ChannelRateLimitError(60)
509 # Handle 24-hour messaging window
510 if error_code == 10:
511 return SendResult(
512 success=False,
513 error="24-hour messaging window expired"
514 )
516 return SendResult(success=False, error=error_msg)
518 except ChannelRateLimitError:
519 raise
520 except Exception as e:
521 logger.error(f"API request failed: {e}")
522 return SendResult(success=False, error=str(e))
524 async def edit_message(
525 self,
526 chat_id: str,
527 message_id: str,
528 text: str,
529 buttons: Optional[List[Dict]] = None,
530 ) -> SendResult:
531 """Instagram doesn't support message editing."""
532 logger.warning("Instagram doesn't support message editing")
533 return await self.send_message(chat_id, text)
535 async def delete_message(self, chat_id: str, message_id: str) -> bool:
536 """Instagram doesn't support message deletion by bots."""
537 logger.warning("Instagram doesn't support message deletion")
538 return False
540 async def send_typing(self, chat_id: str) -> None:
541 """Send typing indicator."""
542 if not self._session:
543 return
545 try:
546 url = f"{self._api_base}/me/messages"
547 params = {"access_token": self.instagram_config.page_access_token}
549 payload = {
550 "recipient": {"id": chat_id},
551 "sender_action": "typing_on",
552 }
554 await self._session.post(url, params=params, json=payload)
556 except Exception:
557 pass
559 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
560 """Get user profile information."""
561 return await self.get_user_profile(chat_id)
563 # Instagram-specific methods
565 def register_postback_handler(
566 self,
567 payload: str,
568 handler: Callable[[Dict[str, Any]], Any],
569 ) -> None:
570 """Register a handler for postback events."""
571 self._postback_handlers[payload] = handler
573 def on_story_mention(self, handler: Callable[[Dict[str, Any]], Any]) -> None:
574 """Register a handler for story mention events."""
575 self._story_handlers.append(handler)
577 def on_comment(self, handler: Callable[[Dict[str, Any]], Any]) -> None:
578 """Register a handler for comment events."""
579 self._comment_handlers.append(handler)
581 async def get_user_profile(
582 self,
583 user_id: str,
584 ) -> Optional[Dict[str, Any]]:
585 """
586 Get user profile information.
588 Note: Limited fields available for Instagram users.
589 """
590 if not self._session:
591 return None
593 try:
594 url = f"{self._api_base}/{user_id}"
595 params = {
596 "access_token": self.instagram_config.page_access_token,
597 "fields": "id,name,username,profile_pic",
598 }
600 async with self._session.get(url, params=params) as response:
601 if response.status == 200:
602 return await response.json()
603 return None
605 except Exception as e:
606 logger.error(f"Error getting user profile: {e}")
607 return None
609 async def send_quick_replies(
610 self,
611 chat_id: str,
612 text: str,
613 quick_replies: List[QuickReply],
614 ) -> SendResult:
615 """Send a message with quick reply buttons."""
616 message_data = {
617 "text": text,
618 "quick_replies": [qr.to_dict() for qr in quick_replies[:13]],
619 }
621 return await self._send_api_request(chat_id, message_data)
623 async def send_generic_template(
624 self,
625 chat_id: str,
626 elements: List[GenericElement],
627 ) -> SendResult:
628 """Send a generic template (carousel)."""
629 message_data = {
630 "attachment": {
631 "type": "template",
632 "payload": {
633 "template_type": "generic",
634 "elements": [elem.to_dict() for elem in elements[:10]],
635 }
636 }
637 }
639 return await self._send_api_request(chat_id, message_data)
641 async def send_heart_reaction(self, chat_id: str, message_id: str) -> bool:
642 """Send a heart reaction to a message."""
643 if not self._session:
644 return False
646 try:
647 url = f"{self._api_base}/me/messages"
648 params = {"access_token": self.instagram_config.page_access_token}
650 payload = {
651 "recipient": {"id": chat_id},
652 "sender_action": "react",
653 "payload": {
654 "message_id": message_id,
655 "reaction": "love",
656 }
657 }
659 async with self._session.post(url, params=params, json=payload) as response:
660 return response.status == 200
662 except Exception as e:
663 logger.error(f"Failed to send reaction: {e}")
664 return False
666 async def set_ice_breakers(self, ice_breakers: List[IceBreaker]) -> bool:
667 """
668 Set ice breakers (conversation starters) for the Instagram account.
670 Ice breakers appear when a user opens a new conversation.
671 """
672 if not self._session or not self.instagram_config.instagram_account_id:
673 return False
675 try:
676 url = f"{self._api_base}/{self.instagram_config.instagram_account_id}/messenger_profile"
677 params = {"access_token": self.instagram_config.page_access_token}
679 payload = {
680 "ice_breakers": [ib.to_dict() for ib in ice_breakers[:4]], # Max 4
681 }
683 async with self._session.post(url, params=params, json=payload) as response:
684 data = await response.json()
685 return data.get("result") == "success"
687 except Exception as e:
688 logger.error(f"Error setting ice breakers: {e}")
689 return False
691 async def delete_ice_breakers(self) -> bool:
692 """Delete ice breakers."""
693 if not self._session or not self.instagram_config.instagram_account_id:
694 return False
696 try:
697 url = f"{self._api_base}/{self.instagram_config.instagram_account_id}/messenger_profile"
698 params = {"access_token": self.instagram_config.page_access_token}
700 payload = {"fields": ["ice_breakers"]}
702 async with self._session.delete(url, params=params, json=payload) as response:
703 data = await response.json()
704 return data.get("result") == "success"
706 except Exception as e:
707 logger.error(f"Error deleting ice breakers: {e}")
708 return False
710 async def reply_to_story(
711 self,
712 chat_id: str,
713 story_id: str,
714 text: str,
715 ) -> SendResult:
716 """Reply to a user's story."""
717 message_data = {
718 "text": text,
719 "reply_to": {
720 "story_id": story_id,
721 }
722 }
724 return await self._send_api_request(chat_id, message_data)
726 async def reply_to_comment(
727 self,
728 comment_id: str,
729 text: str,
730 ) -> SendResult:
731 """Reply to a comment on a post."""
732 if not self._session:
733 return SendResult(success=False, error="Not connected")
735 try:
736 url = f"{self._api_base}/{comment_id}/replies"
737 params = {"access_token": self.instagram_config.page_access_token}
739 payload = {"message": text}
741 async with self._session.post(url, params=params, json=payload) as response:
742 data = await response.json()
744 if response.status == 200:
745 return SendResult(success=True, message_id=data.get("id"))
746 else:
747 error = data.get("error", {}).get("message", "Unknown error")
748 return SendResult(success=False, error=error)
750 except Exception as e:
751 logger.error(f"Failed to reply to comment: {e}")
752 return SendResult(success=False, error=str(e))
754 async def mark_seen(self, chat_id: str) -> bool:
755 """Mark messages as seen."""
756 if not self._session:
757 return False
759 try:
760 url = f"{self._api_base}/me/messages"
761 params = {"access_token": self.instagram_config.page_access_token}
763 payload = {
764 "recipient": {"id": chat_id},
765 "sender_action": "mark_seen",
766 }
768 async with self._session.post(url, params=params, json=payload) as response:
769 return response.status == 200
771 except Exception:
772 return False
def create_instagram_adapter(
    page_access_token: Optional[str] = None,
    app_secret: Optional[str] = None,
    verify_token: Optional[str] = None,
    **kwargs
) -> InstagramAdapter:
    """
    Factory function to create Instagram adapter.

    Each credential falls back to an environment variable when the argument
    is missing or empty.

    Args:
        page_access_token: Facebook page access token (or set INSTAGRAM_PAGE_TOKEN env var)
        app_secret: Facebook app secret (or set INSTAGRAM_APP_SECRET env var)
        verify_token: Webhook verification token (or set INSTAGRAM_VERIFY_TOKEN env var)
        **kwargs: Additional config options

    Returns:
        Configured InstagramAdapter

    Raises:
        ValueError: when no page access token is given and none is found in
            the environment.
    """
    page_access_token = page_access_token or os.getenv("INSTAGRAM_PAGE_TOKEN")
    app_secret = app_secret or os.getenv("INSTAGRAM_APP_SECRET")
    verify_token = verify_token or os.getenv("INSTAGRAM_VERIFY_TOKEN")

    if not page_access_token:
        raise ValueError("Instagram page access token required")

    config = InstagramConfig(
        page_access_token=page_access_token,
        # The config fields are plain strings, so absent values become "".
        app_secret=app_secret or "",
        verify_token=verify_token or "",
        **kwargs
    )
    return InstagramAdapter(config)