Coverage for integrations / channels / extensions / instagram_adapter.py: 26.6%

338 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Instagram Direct Messages Channel Adapter 

3 

4Implements Instagram Direct Messages API via Meta Graph API. 

5Based on SantaClaw extension patterns for Instagram. 

6 

7Features: 

8- Instagram DM API (via Facebook Graph API) 

9- Message types (text, image, video, story share) 

10- Ice breakers (conversation starters) 

11- Quick replies 

12- Generic templates 

13- Typing indicators 

14- Read receipts 

15- Story mentions 

16- Comment mentions 

17- Webhook handling 

18""" 

19 

20from __future__ import annotations 

21 

22import asyncio 

23import logging 

24import os 

25import json 

26import hashlib 

27import hmac 

28from typing import Optional, List, Dict, Any, Callable 

29from datetime import datetime 

30from dataclasses import dataclass, field 

31try: 

32 import aiohttp 

33 HAS_AIOHTTP = True 

34except ImportError: 

35 HAS_AIOHTTP = False 

36 

37from ..base import ( 

38 ChannelAdapter, 

39 ChannelConfig, 

40 ChannelStatus, 

41 Message, 

42 MessageType, 

43 MediaAttachment, 

44 SendResult, 

45 ChannelConnectionError, 

46 ChannelSendError, 

47 ChannelRateLimitError, 

48) 

49 

50logger = logging.getLogger(__name__) 

51 

52 

53# Meta Graph API endpoints 

54GRAPH_API_VERSION = "v18.0" 

55GRAPH_API_BASE = f"https://graph.facebook.com/{GRAPH_API_VERSION}" 

56 

57 

58@dataclass 

59class InstagramConfig(ChannelConfig): 

60 """Instagram-specific configuration.""" 

61 page_access_token: str = "" # Facebook page token linked to Instagram 

62 app_secret: str = "" 

63 verify_token: str = "" 

64 instagram_account_id: Optional[str] = None 

65 enable_story_replies: bool = True 

66 enable_comment_replies: bool = True 

67 api_version: str = GRAPH_API_VERSION 

68 

69 

70@dataclass 

71class IceBreaker: 

72 """Ice breaker (conversation starter) configuration.""" 

73 question: str 

74 payload: str 

75 

76 def to_dict(self) -> Dict[str, str]: 

77 """Convert to API format.""" 

78 return { 

79 "question": self.question[:80], # Max 80 chars 

80 "payload": self.payload, 

81 } 

82 

83 

84@dataclass 

85class QuickReply: 

86 """Quick reply button for Instagram.""" 

87 content_type: str = "text" 

88 title: Optional[str] = None 

89 payload: Optional[str] = None 

90 

91 def to_dict(self) -> Dict[str, Any]: 

92 """Convert to API format.""" 

93 result = {"content_type": self.content_type} 

94 if self.title: 

95 result["title"] = self.title[:20] 

96 if self.payload: 

97 result["payload"] = self.payload 

98 return result 

99 

100 

101@dataclass 

102class GenericElement: 

103 """Generic template element for Instagram.""" 

104 title: str 

105 subtitle: Optional[str] = None 

106 image_url: Optional[str] = None 

107 buttons: List[Dict[str, Any]] = field(default_factory=list) 

108 

109 def to_dict(self) -> Dict[str, Any]: 

110 """Convert to API format.""" 

111 result = {"title": self.title[:80]} 

112 if self.subtitle: 

113 result["subtitle"] = self.subtitle[:80] 

114 if self.image_url: 

115 result["image_url"] = self.image_url 

116 if self.buttons: 

117 result["buttons"] = self.buttons[:3] 

118 return result 

119 

120 

121class InstagramAdapter(ChannelAdapter): 

122 """ 

123 Instagram Direct Messages adapter using Meta Graph API. 

124 

125 Note: Instagram Messaging API requires: 

126 - A Facebook Page linked to the Instagram Business/Creator account 

127 - Approved Instagram Messaging permission 

128 - Business verification for some features 

129 

130 Usage: 

131 config = InstagramConfig( 

132 page_access_token="your-page-token", 

133 app_secret="your-app-secret", 

134 verify_token="your-verify-token", 

135 ) 

136 adapter = InstagramAdapter(config) 

137 adapter.on_message(my_handler) 

138 # Use with webhook endpoint 

139 """ 

140 

141 def __init__(self, config: InstagramConfig): 

142 super().__init__(config) 

143 self.instagram_config: InstagramConfig = config 

144 self._session: Optional[aiohttp.ClientSession] = None 

145 self._postback_handlers: Dict[str, Callable] = {} 

146 self._story_handlers: List[Callable] = [] 

147 self._comment_handlers: List[Callable] = [] 

148 self._api_base: str = f"https://graph.facebook.com/{config.api_version}" 

149 

150 @property 

151 def name(self) -> str: 

152 return "instagram" 

153 

154 async def connect(self) -> bool: 

155 """Initialize Instagram API connection.""" 

156 if not self.instagram_config.page_access_token: 

157 logger.error("Instagram page access token required") 

158 return False 

159 

160 try: 

161 # Create HTTP session 

162 self._session = aiohttp.ClientSession() 

163 

164 # Get Instagram account ID 

165 account_info = await self._get_instagram_account() 

166 if not account_info: 

167 logger.error("Failed to get Instagram account info") 

168 return False 

169 

170 self.instagram_config.instagram_account_id = account_info.get("id") 

171 

172 self.status = ChannelStatus.CONNECTED 

173 username = account_info.get("username", "Unknown") 

174 logger.info(f"Instagram adapter connected as: @{username}") 

175 return True 

176 

177 except Exception as e: 

178 logger.error(f"Failed to connect to Instagram: {e}") 

179 self.status = ChannelStatus.ERROR 

180 return False 

181 

182 async def disconnect(self) -> None: 

183 """Disconnect Instagram adapter.""" 

184 if self._session: 

185 await self._session.close() 

186 self._session = None 

187 

188 self.status = ChannelStatus.DISCONNECTED 

189 

190 async def _get_instagram_account(self) -> Optional[Dict[str, Any]]: 

191 """Get Instagram Business Account info linked to the page.""" 

192 if not self._session: 

193 return None 

194 

195 try: 

196 # First get the page's Instagram account 

197 url = f"{self._api_base}/me" 

198 params = { 

199 "access_token": self.instagram_config.page_access_token, 

200 "fields": "instagram_business_account{id,username,profile_picture_url,name}", 

201 } 

202 

203 async with self._session.get(url, params=params) as response: 

204 if response.status == 200: 

205 data = await response.json() 

206 return data.get("instagram_business_account") 

207 else: 

208 data = await response.json() 

209 logger.error(f"Failed to get Instagram account: {data}") 

210 return None 

211 

212 except Exception as e: 

213 logger.error(f"Error getting Instagram account: {e}") 

214 return None 

215 

216 def verify_webhook(self, mode: str, token: str, challenge: str) -> Optional[str]: 

217 """ 

218 Verify webhook subscription request. 

219 Returns challenge string if valid, None if invalid. 

220 """ 

221 if mode == "subscribe" and token == self.instagram_config.verify_token: 

222 return challenge 

223 return None 

224 

225 def verify_signature(self, body: bytes, signature: str) -> bool: 

226 """Verify webhook request signature.""" 

227 if not self.instagram_config.app_secret: 

228 return True 

229 

230 if not signature.startswith("sha256="): 

231 return False 

232 

233 expected = "sha256=" + hmac.new( 

234 self.instagram_config.app_secret.encode('utf-8'), 

235 body, 

236 hashlib.sha256 

237 ).hexdigest() 

238 

239 return hmac.compare_digest(expected, signature) 

240 

241 async def handle_webhook(self, body: str, signature: Optional[str] = None) -> None: 

242 """ 

243 Handle incoming webhook POST request from Instagram. 

244 Should be called from your webhook endpoint. 

245 """ 

246 try: 

247 # Verify signature 

248 if signature and not self.verify_signature(body.encode('utf-8'), signature): 

249 logger.error("Invalid webhook signature") 

250 return 

251 

252 data = json.loads(body) 

253 

254 # Verify it's an Instagram webhook 

255 if data.get("object") != "instagram": 

256 return 

257 

258 # Process each entry 

259 for entry in data.get("entry", []): 

260 # Handle messaging events 

261 for messaging in entry.get("messaging", []): 

262 await self._process_messaging_event(messaging) 

263 

264 # Handle changes (comments, story mentions) 

265 for change in entry.get("changes", []): 

266 await self._process_change_event(change) 

267 

268 except Exception as e: 

269 logger.error(f"Error handling webhook: {e}") 

270 

271 async def _process_messaging_event(self, event: Dict[str, Any]) -> None: 

272 """Process a messaging event.""" 

273 if "message" in event: 

274 await self._handle_message(event) 

275 elif "postback" in event: 

276 await self._handle_postback(event) 

277 elif "read" in event: 

278 logger.debug(f"Message read: {event}") 

279 elif "reaction" in event: 

280 await self._handle_reaction(event) 

281 

282 async def _process_change_event(self, change: Dict[str, Any]) -> None: 

283 """Process a change event (comments, story mentions).""" 

284 field = change.get("field") 

285 

286 if field == "story_insights": 

287 # Story mention 

288 if self.instagram_config.enable_story_replies: 

289 await self._handle_story_mention(change.get("value", {})) 

290 elif field == "comments": 

291 # Comment on post 

292 if self.instagram_config.enable_comment_replies: 

293 await self._handle_comment(change.get("value", {})) 

294 

295 async def _handle_message(self, event: Dict[str, Any]) -> None: 

296 """Handle incoming message event.""" 

297 message = self._convert_message(event) 

298 await self._dispatch_message(message) 

299 

300 async def _handle_postback(self, event: Dict[str, Any]) -> None: 

301 """Handle postback event.""" 

302 payload = event.get("postback", {}).get("payload") 

303 sender_id = event.get("sender", {}).get("id") 

304 

305 if payload in self._postback_handlers: 

306 handler = self._postback_handlers[payload] 

307 await handler(event) 

308 else: 

309 # Convert to message 

310 message = Message( 

311 id=f"postback_{int(datetime.now().timestamp() * 1000)}", 

312 channel=self.name, 

313 sender_id=sender_id, 

314 chat_id=sender_id, 

315 text=f"[postback:{payload}]", 

316 timestamp=datetime.fromtimestamp(event.get("timestamp", 0) / 1000), 

317 is_group=False, 

318 raw={'postback': event.get('postback')}, 

319 ) 

320 await self._dispatch_message(message) 

321 

322 async def _handle_reaction(self, event: Dict[str, Any]) -> None: 

323 """Handle message reaction event.""" 

324 reaction = event.get("reaction", {}) 

325 logger.debug(f"Reaction received: {reaction}") 

326 

327 async def _handle_story_mention(self, data: Dict[str, Any]) -> None: 

328 """Handle story mention event.""" 

329 for handler in self._story_handlers: 

330 try: 

331 result = handler(data) 

332 if asyncio.iscoroutine(result): 

333 await result 

334 except Exception as e: 

335 logger.error(f"Error in story handler: {e}") 

336 

337 async def _handle_comment(self, data: Dict[str, Any]) -> None: 

338 """Handle comment event.""" 

339 for handler in self._comment_handlers: 

340 try: 

341 result = handler(data) 

342 if asyncio.iscoroutine(result): 

343 await result 

344 except Exception as e: 

345 logger.error(f"Error in comment handler: {e}") 

346 

347 def _convert_message(self, event: Dict[str, Any]) -> Message: 

348 """Convert Instagram event to unified Message format.""" 

349 sender_id = event.get("sender", {}).get("id", "") 

350 message_data = event.get("message", {}) 

351 timestamp = event.get("timestamp", int(datetime.now().timestamp() * 1000)) 

352 

353 msg_id = message_data.get("mid", "") 

354 text = message_data.get("text", "") 

355 

356 # Process attachments 

357 media = [] 

358 for attachment in message_data.get("attachments", []): 

359 att_type = attachment.get("type") 

360 payload = attachment.get("payload", {}) 

361 

362 if att_type == "image": 

363 media.append(MediaAttachment( 

364 type=MessageType.IMAGE, 

365 url=payload.get("url"), 

366 )) 

367 elif att_type == "video": 

368 media.append(MediaAttachment( 

369 type=MessageType.VIDEO, 

370 url=payload.get("url"), 

371 )) 

372 elif att_type == "audio": 

373 media.append(MediaAttachment( 

374 type=MessageType.AUDIO, 

375 url=payload.get("url"), 

376 )) 

377 elif att_type == "share": 

378 # Shared post/reel 

379 url = payload.get("url", "") 

380 text = f"[shared:{url}]" 

381 elif att_type == "story_mention": 

382 # Story mention 

383 url = payload.get("url", "") 

384 text = f"[story_mention:{url}]" 

385 elif att_type == "reel": 

386 media.append(MediaAttachment( 

387 type=MessageType.VIDEO, 

388 url=payload.get("url"), 

389 )) 

390 

391 # Handle quick reply 

392 quick_reply = message_data.get("quick_reply", {}) 

393 if quick_reply.get("payload"): 

394 text = text or f"[quick_reply:{quick_reply['payload']}]" 

395 

396 # Check if it's a story reply 

397 reply_to = message_data.get("reply_to", {}) 

398 is_story_reply = reply_to.get("story", {}).get("url") is not None 

399 

400 return Message( 

401 id=msg_id, 

402 channel=self.name, 

403 sender_id=sender_id, 

404 chat_id=sender_id, 

405 text=text, 

406 media=media, 

407 timestamp=datetime.fromtimestamp(timestamp / 1000), 

408 is_group=False, 

409 raw={ 

410 'is_story_reply': is_story_reply, 

411 'story': reply_to.get('story'), 

412 'is_deleted': message_data.get('is_deleted', False), 

413 'is_unsupported': message_data.get('is_unsupported', False), 

414 }, 

415 ) 

416 

417 async def send_message( 

418 self, 

419 chat_id: str, 

420 text: str, 

421 reply_to: Optional[str] = None, 

422 media: Optional[List[MediaAttachment]] = None, 

423 buttons: Optional[List[Dict]] = None, 

424 ) -> SendResult: 

425 """Send a message to an Instagram user.""" 

426 if not self._session: 

427 return SendResult(success=False, error="Not connected") 

428 

429 try: 

430 # Handle media 

431 if media and len(media) > 0: 

432 return await self._send_media_message(chat_id, media[0]) 

433 

434 # Build message 

435 message_data: Dict[str, Any] = {"text": text} 

436 

437 return await self._send_api_request(chat_id, message_data) 

438 

439 except Exception as e: 

440 logger.error(f"Failed to send Instagram message: {e}") 

441 return SendResult(success=False, error=str(e)) 

442 

443 async def _send_media_message( 

444 self, 

445 chat_id: str, 

446 media: MediaAttachment, 

447 ) -> SendResult: 

448 """Send a media message.""" 

449 try: 

450 # Determine attachment type 

451 if media.type == MessageType.IMAGE: 

452 message_data = { 

453 "attachment": { 

454 "type": "image", 

455 "payload": {"url": media.url} 

456 } 

457 } 

458 elif media.type in (MessageType.VIDEO, MessageType.AUDIO): 

459 message_data = { 

460 "attachment": { 

461 "type": "video", 

462 "payload": {"url": media.url} 

463 } 

464 } 

465 else: 

466 # Instagram only supports image and video 

467 return SendResult(success=False, error="Unsupported media type") 

468 

469 return await self._send_api_request(chat_id, message_data) 

470 

471 except Exception as e: 

472 logger.error(f"Failed to send media: {e}") 

473 return SendResult(success=False, error=str(e)) 

474 

475 async def _send_api_request( 

476 self, 

477 recipient_id: str, 

478 message: Dict[str, Any], 

479 ) -> SendResult: 

480 """Send a request to the Instagram Send API.""" 

481 if not self._session: 

482 return SendResult(success=False, error="Not connected") 

483 

484 try: 

485 url = f"{self._api_base}/me/messages" 

486 params = {"access_token": self.instagram_config.page_access_token} 

487 

488 payload = { 

489 "recipient": {"id": recipient_id}, 

490 "message": message, 

491 } 

492 

493 async with self._session.post(url, params=params, json=payload) as response: 

494 data = await response.json() 

495 

496 if response.status == 200: 

497 return SendResult( 

498 success=True, 

499 message_id=data.get("message_id"), 

500 ) 

501 else: 

502 error = data.get("error", {}) 

503 error_code = error.get("code") 

504 error_msg = error.get("message", "Unknown error") 

505 

506 if error_code == 613: 

507 raise ChannelRateLimitError(60) 

508 

509 # Handle 24-hour messaging window 

510 if error_code == 10: 

511 return SendResult( 

512 success=False, 

513 error="24-hour messaging window expired" 

514 ) 

515 

516 return SendResult(success=False, error=error_msg) 

517 

518 except ChannelRateLimitError: 

519 raise 

520 except Exception as e: 

521 logger.error(f"API request failed: {e}") 

522 return SendResult(success=False, error=str(e)) 

523 

524 async def edit_message( 

525 self, 

526 chat_id: str, 

527 message_id: str, 

528 text: str, 

529 buttons: Optional[List[Dict]] = None, 

530 ) -> SendResult: 

531 """Instagram doesn't support message editing.""" 

532 logger.warning("Instagram doesn't support message editing") 

533 return await self.send_message(chat_id, text) 

534 

535 async def delete_message(self, chat_id: str, message_id: str) -> bool: 

536 """Instagram doesn't support message deletion by bots.""" 

537 logger.warning("Instagram doesn't support message deletion") 

538 return False 

539 

540 async def send_typing(self, chat_id: str) -> None: 

541 """Send typing indicator.""" 

542 if not self._session: 

543 return 

544 

545 try: 

546 url = f"{self._api_base}/me/messages" 

547 params = {"access_token": self.instagram_config.page_access_token} 

548 

549 payload = { 

550 "recipient": {"id": chat_id}, 

551 "sender_action": "typing_on", 

552 } 

553 

554 await self._session.post(url, params=params, json=payload) 

555 

556 except Exception: 

557 pass 

558 

559 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]: 

560 """Get user profile information.""" 

561 return await self.get_user_profile(chat_id) 

562 

563 # Instagram-specific methods 

564 

565 def register_postback_handler( 

566 self, 

567 payload: str, 

568 handler: Callable[[Dict[str, Any]], Any], 

569 ) -> None: 

570 """Register a handler for postback events.""" 

571 self._postback_handlers[payload] = handler 

572 

573 def on_story_mention(self, handler: Callable[[Dict[str, Any]], Any]) -> None: 

574 """Register a handler for story mention events.""" 

575 self._story_handlers.append(handler) 

576 

577 def on_comment(self, handler: Callable[[Dict[str, Any]], Any]) -> None: 

578 """Register a handler for comment events.""" 

579 self._comment_handlers.append(handler) 

580 

581 async def get_user_profile( 

582 self, 

583 user_id: str, 

584 ) -> Optional[Dict[str, Any]]: 

585 """ 

586 Get user profile information. 

587 

588 Note: Limited fields available for Instagram users. 

589 """ 

590 if not self._session: 

591 return None 

592 

593 try: 

594 url = f"{self._api_base}/{user_id}" 

595 params = { 

596 "access_token": self.instagram_config.page_access_token, 

597 "fields": "id,name,username,profile_pic", 

598 } 

599 

600 async with self._session.get(url, params=params) as response: 

601 if response.status == 200: 

602 return await response.json() 

603 return None 

604 

605 except Exception as e: 

606 logger.error(f"Error getting user profile: {e}") 

607 return None 

608 

609 async def send_quick_replies( 

610 self, 

611 chat_id: str, 

612 text: str, 

613 quick_replies: List[QuickReply], 

614 ) -> SendResult: 

615 """Send a message with quick reply buttons.""" 

616 message_data = { 

617 "text": text, 

618 "quick_replies": [qr.to_dict() for qr in quick_replies[:13]], 

619 } 

620 

621 return await self._send_api_request(chat_id, message_data) 

622 

623 async def send_generic_template( 

624 self, 

625 chat_id: str, 

626 elements: List[GenericElement], 

627 ) -> SendResult: 

628 """Send a generic template (carousel).""" 

629 message_data = { 

630 "attachment": { 

631 "type": "template", 

632 "payload": { 

633 "template_type": "generic", 

634 "elements": [elem.to_dict() for elem in elements[:10]], 

635 } 

636 } 

637 } 

638 

639 return await self._send_api_request(chat_id, message_data) 

640 

641 async def send_heart_reaction(self, chat_id: str, message_id: str) -> bool: 

642 """Send a heart reaction to a message.""" 

643 if not self._session: 

644 return False 

645 

646 try: 

647 url = f"{self._api_base}/me/messages" 

648 params = {"access_token": self.instagram_config.page_access_token} 

649 

650 payload = { 

651 "recipient": {"id": chat_id}, 

652 "sender_action": "react", 

653 "payload": { 

654 "message_id": message_id, 

655 "reaction": "love", 

656 } 

657 } 

658 

659 async with self._session.post(url, params=params, json=payload) as response: 

660 return response.status == 200 

661 

662 except Exception as e: 

663 logger.error(f"Failed to send reaction: {e}") 

664 return False 

665 

666 async def set_ice_breakers(self, ice_breakers: List[IceBreaker]) -> bool: 

667 """ 

668 Set ice breakers (conversation starters) for the Instagram account. 

669 

670 Ice breakers appear when a user opens a new conversation. 

671 """ 

672 if not self._session or not self.instagram_config.instagram_account_id: 

673 return False 

674 

675 try: 

676 url = f"{self._api_base}/{self.instagram_config.instagram_account_id}/messenger_profile" 

677 params = {"access_token": self.instagram_config.page_access_token} 

678 

679 payload = { 

680 "ice_breakers": [ib.to_dict() for ib in ice_breakers[:4]], # Max 4 

681 } 

682 

683 async with self._session.post(url, params=params, json=payload) as response: 

684 data = await response.json() 

685 return data.get("result") == "success" 

686 

687 except Exception as e: 

688 logger.error(f"Error setting ice breakers: {e}") 

689 return False 

690 

691 async def delete_ice_breakers(self) -> bool: 

692 """Delete ice breakers.""" 

693 if not self._session or not self.instagram_config.instagram_account_id: 

694 return False 

695 

696 try: 

697 url = f"{self._api_base}/{self.instagram_config.instagram_account_id}/messenger_profile" 

698 params = {"access_token": self.instagram_config.page_access_token} 

699 

700 payload = {"fields": ["ice_breakers"]} 

701 

702 async with self._session.delete(url, params=params, json=payload) as response: 

703 data = await response.json() 

704 return data.get("result") == "success" 

705 

706 except Exception as e: 

707 logger.error(f"Error deleting ice breakers: {e}") 

708 return False 

709 

710 async def reply_to_story( 

711 self, 

712 chat_id: str, 

713 story_id: str, 

714 text: str, 

715 ) -> SendResult: 

716 """Reply to a user's story.""" 

717 message_data = { 

718 "text": text, 

719 "reply_to": { 

720 "story_id": story_id, 

721 } 

722 } 

723 

724 return await self._send_api_request(chat_id, message_data) 

725 

726 async def reply_to_comment( 

727 self, 

728 comment_id: str, 

729 text: str, 

730 ) -> SendResult: 

731 """Reply to a comment on a post.""" 

732 if not self._session: 

733 return SendResult(success=False, error="Not connected") 

734 

735 try: 

736 url = f"{self._api_base}/{comment_id}/replies" 

737 params = {"access_token": self.instagram_config.page_access_token} 

738 

739 payload = {"message": text} 

740 

741 async with self._session.post(url, params=params, json=payload) as response: 

742 data = await response.json() 

743 

744 if response.status == 200: 

745 return SendResult(success=True, message_id=data.get("id")) 

746 else: 

747 error = data.get("error", {}).get("message", "Unknown error") 

748 return SendResult(success=False, error=error) 

749 

750 except Exception as e: 

751 logger.error(f"Failed to reply to comment: {e}") 

752 return SendResult(success=False, error=str(e)) 

753 

754 async def mark_seen(self, chat_id: str) -> bool: 

755 """Mark messages as seen.""" 

756 if not self._session: 

757 return False 

758 

759 try: 

760 url = f"{self._api_base}/me/messages" 

761 params = {"access_token": self.instagram_config.page_access_token} 

762 

763 payload = { 

764 "recipient": {"id": chat_id}, 

765 "sender_action": "mark_seen", 

766 } 

767 

768 async with self._session.post(url, params=params, json=payload) as response: 

769 return response.status == 200 

770 

771 except Exception: 

772 return False 

773 

774 

775def create_instagram_adapter( 

776 page_access_token: str = None, 

777 app_secret: str = None, 

778 verify_token: str = None, 

779 **kwargs 

780) -> InstagramAdapter: 

781 """ 

782 Factory function to create Instagram adapter. 

783 

784 Args: 

785 page_access_token: Facebook page access token (or set INSTAGRAM_PAGE_TOKEN env var) 

786 app_secret: Facebook app secret (or set INSTAGRAM_APP_SECRET env var) 

787 verify_token: Webhook verification token (or set INSTAGRAM_VERIFY_TOKEN env var) 

788 **kwargs: Additional config options 

789 

790 Returns: 

791 Configured InstagramAdapter 

792 """ 

793 page_access_token = page_access_token or os.getenv("INSTAGRAM_PAGE_TOKEN") 

794 app_secret = app_secret or os.getenv("INSTAGRAM_APP_SECRET") 

795 verify_token = verify_token or os.getenv("INSTAGRAM_VERIFY_TOKEN") 

796 

797 if not page_access_token: 

798 raise ValueError("Instagram page access token required") 

799 

800 config = InstagramConfig( 

801 page_access_token=page_access_token, 

802 app_secret=app_secret or "", 

803 verify_token=verify_token or "", 

804 **kwargs 

805 ) 

806 return InstagramAdapter(config)