Coverage for integrations / channels / extensions / twitter_adapter.py: 25.2%

425 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Twitter/X Channel Adapter 

3 

4Implements Twitter/X Direct Messages and mentions handling. 

5Based on SantaClaw extension patterns for Twitter. 

6 

7Features: 

8- Direct Messages (DMs) send/receive 

9- Mention tracking and replies 

10- Media attachments 

11- Typing indicators (conversation events) 

12- Welcome messages 

13- Quick replies for DMs 

14- Account Activity API webhooks 

15- OAuth 1.0a authentication 

16""" 

17 

18from __future__ import annotations 

19 

20import asyncio 

21import logging 

22import os 

23import json 

24import hmac 

25import hashlib 

26import base64 

27import time 

28import urllib.parse 

29from typing import Optional, List, Dict, Any, Callable 

30from datetime import datetime 

31from dataclasses import dataclass, field 

32try: 

33 import aiohttp 

34 HAS_AIOHTTP = True 

35except ImportError: 

36 HAS_AIOHTTP = False 

37 

38from ..base import ( 

39 ChannelAdapter, 

40 ChannelConfig, 

41 ChannelStatus, 

42 Message, 

43 MessageType, 

44 MediaAttachment, 

45 SendResult, 

46 ChannelConnectionError, 

47 ChannelSendError, 

48 ChannelRateLimitError, 

49) 

50 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Twitter API roots: one host, two API generations.
TWITTER_API_BASE = "https://api.twitter.com"
TWITTER_API_V2 = TWITTER_API_BASE + "/2"
TWITTER_API_V1 = TWITTER_API_BASE + "/1.1"

# Direct-message endpoints (v1.1 still used for DMs).
_DM_ROOT = TWITTER_API_V1 + "/direct_messages"
TWITTER_DM_SEND = _DM_ROOT + "/events/new.json"
TWITTER_DM_LIST = _DM_ROOT + "/events/list.json"
TWITTER_DM_SHOW = _DM_ROOT + "/events/show.json"
TWITTER_DM_TYPING = _DM_ROOT + "/indicate_typing.json"
TWITTER_DM_MARK_READ = _DM_ROOT + "/mark_read.json"

# Tweet and user endpoints (v2).
TWITTER_TWEETS = TWITTER_API_V2 + "/tweets"
TWITTER_USERS = TWITTER_API_V2 + "/users"

# Media upload lives on its own host.
TWITTER_MEDIA_UPLOAD = "https://upload.twitter.com/1.1/media/upload.json"

# Account Activity API (webhook) endpoint root.
TWITTER_WEBHOOKS = TWITTER_API_V1 + "/account_activity/all"

75 

76 

@dataclass
class TwitterConfig(ChannelConfig):
    """Twitter-specific settings layered on top of ``ChannelConfig``.

    The four OAuth 1.0a values are all required by
    ``TwitterAdapter.connect``; the remaining fields are optional switches.
    """

    # OAuth 1.0a application (consumer) credentials.
    consumer_key: str = ""
    consumer_secret: str = ""

    # OAuth 1.0a user credentials.
    access_token: str = ""
    access_token_secret: str = ""

    # For app-only auth.
    bearer_token: Optional[str] = None

    # For Account Activity API.
    environment_name: str = "production"

    # When False, direct_message_events in webhook payloads are ignored.
    enable_dm: bool = True

    # When False, tweet_create_events in webhook payloads are ignored.
    enable_mentions: bool = True

    # NOTE(review): not read anywhere in this module; presumably consumed
    # by welcome-message setup elsewhere -- confirm.
    enable_welcome_message: bool = False
    welcome_message_id: Optional[str] = None

90 

91 

@dataclass
class QuickReplyOption:
    """One selectable quick-reply entry attached to a direct message."""

    label: str
    description: Optional[str] = None
    metadata: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialise the option into the dict shape sent to the API.

        ``label`` is always present (cut to 36 characters). ``description``
        (cut to 72) and ``metadata`` (cut to 1000) are included only when
        they are non-empty.
        """
        payload: Dict[str, Any] = {"label": self.label[:36]}

        # (key, value, maximum length) for each optional field, in output order.
        optional_fields = (
            ("description", self.description, 72),
            ("metadata", self.metadata, 1000),
        )
        for key, value, limit in optional_fields:
            if value:
                payload[key] = value[:limit]

        return payload

107 

108 

@dataclass
class CallToAction:
    """A call-to-action button that can be attached to a direct message."""

    type: str = "web_url"  # web_url
    label: str = ""
    url: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Serialise the button into the dict shape sent to the API.

        The label is cut to 36 characters; ``type`` and ``url`` are passed
        through unchanged.
        """
        return dict(type=self.type, label=self.label[:36], url=self.url)

123 

124 

class TwitterOAuth:
    """
    OAuth 1.0a helper for Twitter API authentication.

    Builds the ``Authorization: OAuth ...`` header for a request, signing it
    with HMAC-SHA1 over the method, URL and request parameters.
    """

    def __init__(
        self,
        consumer_key: str,
        consumer_secret: str,
        access_token: str,
        access_token_secret: str,
    ):
        """Store the application and user credentials used for signing."""
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.access_token = access_token
        self.access_token_secret = access_token_secret

    def _generate_nonce(self) -> str:
        """Generate OAuth nonce.

        Returns:
            32 random hexadecimal characters. Hex output is always
            alphanumeric and fixed-length, unlike filtered base64, which
            could come out short or carry ``=`` padding.
        """
        return os.urandom(16).hex()

    def _generate_timestamp(self) -> str:
        """Generate OAuth timestamp (whole seconds since the epoch)."""
        return str(int(time.time()))

    def _create_signature_base(
        self,
        method: str,
        url: str,
        params: Dict[str, str],
    ) -> str:
        """Create the signature base string.

        Args:
            method: HTTP method; upper-cased before use.
            url: Request URL without its query string.
            params: Every parameter covered by the signature (OAuth
                parameters plus query/form parameters).

        Returns:
            ``METHOD&encoded-url&encoded-parameter-string``.
        """
        # quote_via=quote gives RFC 3986 percent-encoding (space -> %20).
        # urlencode's default, quote_plus, emits '+' for a space, which is
        # then re-quoted to %2B and yields a signature the server rejects.
        normalized = urllib.parse.urlencode(
            sorted(params.items()),
            safe='',
            quote_via=urllib.parse.quote,
        )

        return "&".join((
            method.upper(),
            urllib.parse.quote(url, safe=''),
            urllib.parse.quote(normalized, safe=''),
        ))

    def _create_signature(
        self,
        method: str,
        url: str,
        params: Dict[str, str],
    ) -> str:
        """Create OAuth signature.

        Returns:
            Base64 text of the HMAC-SHA1 digest of the signature base
            string, keyed with ``consumer_secret&access_token_secret``
            (each percent-encoded).
        """
        base = self._create_signature_base(method, url, params)

        # Signing key: both secrets, percent-encoded, joined by '&'.
        key = (
            f"{urllib.parse.quote(self.consumer_secret, safe='')}"
            f"&{urllib.parse.quote(self.access_token_secret, safe='')}"
        )

        digest = hmac.new(
            key.encode('utf-8'),
            base.encode('utf-8'),
            hashlib.sha1,
        ).digest()

        return base64.b64encode(digest).decode('utf-8')

    def get_auth_header(
        self,
        method: str,
        url: str,
        body_params: Optional[Dict[str, str]] = None,
    ) -> str:
        """
        Generate OAuth 1.0a Authorization header.

        Args:
            method: HTTP method of the request being signed.
            url: Request URL without its query string.
            body_params: Extra request parameters to cover with the
                signature (query-string or form fields). They influence
                the signature only and are not emitted in the header.

        Returns:
            The complete header value, starting with ``"OAuth "``.
        """
        oauth_params = {
            "oauth_consumer_key": self.consumer_key,
            "oauth_nonce": self._generate_nonce(),
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": self._generate_timestamp(),
            "oauth_token": self.access_token,
            "oauth_version": "1.0",
        }

        # The signature covers the OAuth parameters plus the request's own.
        signed_params = {**oauth_params}
        if body_params:
            signed_params.update(body_params)

        oauth_params["oauth_signature"] = self._create_signature(method, url, signed_params)

        header_params = ', '.join(
            f'{urllib.parse.quote(k, safe="")}="{urllib.parse.quote(v, safe="")}"'
            for k, v in sorted(oauth_params.items())
        )

        return f"OAuth {header_params}"

220 

221 

class TwitterAdapter(ChannelAdapter):
    """
    Twitter/X messaging adapter for DMs and mentions.

    Inbound events are fed in through ``handle_webhook``; outbound traffic
    goes through ``send_dm``, ``reply_to_tweet`` and the helpers built on
    them. Every API request is signed with ``TwitterOAuth``.

    Usage:
        config = TwitterConfig(
            consumer_key="your-key",
            consumer_secret="your-secret",
            access_token="your-token",
            access_token_secret="your-token-secret",
        )
        adapter = TwitterAdapter(config)
        adapter.on_message(my_handler)
        await adapter.start()
    """

    def __init__(self, config: TwitterConfig):
        """Store the configuration and initialise empty connection state.

        No network activity happens here; the OAuth signer and the HTTP
        session are created by ``connect``.
        """
        super().__init__(config)
        self.twitter_config: TwitterConfig = config
        self._oauth: Optional[TwitterOAuth] = None
        self._session: Optional[aiohttp.ClientSession] = None
        # Identity of the authenticated account, filled in by connect().
        self._user_id: Optional[str] = None
        self._username: Optional[str] = None
        self._mention_handlers: List[Callable] = []
        # Quick-reply metadata string -> handler.
        self._dm_handlers: Dict[str, Callable] = {}
        # User id -> user record (from webhook payloads or user lookups).
        self._user_cache: Dict[str, Dict[str, Any]] = {}

    @property
    def name(self) -> str:
        """Channel identifier of this adapter."""
        return "twitter"

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------

    async def _close_session(self) -> None:
        """Close and forget the HTTP session, if one is open."""
        session, self._session = self._session, None
        if session is not None:
            await session.close()

    async def connect(self) -> bool:
        """Initialize Twitter API connection.

        Creates the OAuth signer and HTTP session, then verifies the
        credentials by fetching the authenticated user.

        Returns:
            True when the credentials were verified; False when any
            credential is missing, verification fails, or an error occurs.
            On failure the HTTP session is closed so it does not leak.
        """
        cfg = self.twitter_config
        if not all([
            cfg.consumer_key,
            cfg.consumer_secret,
            cfg.access_token,
            cfg.access_token_secret,
        ]):
            logger.error("Twitter OAuth credentials required")
            return False

        try:
            self._oauth = TwitterOAuth(
                cfg.consumer_key,
                cfg.consumer_secret,
                cfg.access_token,
                cfg.access_token_secret,
            )
            self._session = aiohttp.ClientSession()

            user_info = await self._verify_credentials()
            if not user_info:
                logger.error("Failed to verify Twitter credentials")
                # Release the session opened above; it would otherwise
                # stay open with no owner.
                await self._close_session()
                return False

            self._user_id = user_info.get("id")
            self._username = user_info.get("username")

            self.status = ChannelStatus.CONNECTED
            logger.info("Twitter adapter connected as: @%s", self._username)
            return True

        except Exception as e:
            logger.error("Failed to connect to Twitter: %s", e)
            self.status = ChannelStatus.ERROR
            await self._close_session()
            return False

    async def disconnect(self) -> None:
        """Disconnect Twitter adapter: close the session and drop the signer."""
        await self._close_session()
        self._oauth = None
        self.status = ChannelStatus.DISCONNECTED

    async def _verify_credentials(self) -> Optional[Dict[str, Any]]:
        """Verify OAuth credentials and get user info.

        Returns:
            The ``data`` object of the ``/2/users/me`` response on HTTP 200,
            otherwise None (not connected, non-200 status, or any error).
        """
        if not self._session or not self._oauth:
            return None

        try:
            url = f"{TWITTER_API_V2}/users/me"
            auth_header = self._oauth.get_auth_header("GET", url)

            async with self._session.get(
                url,
                headers={"Authorization": auth_header},
            ) as response:
                data = await response.json()
                if response.status == 200:
                    return data.get("data")
                logger.error("Failed to verify credentials: %s", data)
                return None

        except Exception as e:
            logger.error("Error verifying credentials: %s", e)
            return None

    # ------------------------------------------------------------------
    # Webhook handling
    # ------------------------------------------------------------------

    def verify_webhook(self, crc_token: str) -> str:
        """
        Generate CRC response for webhook verification.

        Returns the response_token to send back: ``sha256=`` followed by the
        base64 HMAC-SHA256 of ``crc_token`` keyed with the consumer secret.
        """
        signature = hmac.new(
            self.twitter_config.consumer_secret.encode('utf-8'),
            crc_token.encode('utf-8'),
            hashlib.sha256,
        ).digest()

        return f"sha256={base64.b64encode(signature).decode('utf-8')}"

    def verify_signature(self, body: bytes, signature: str) -> bool:
        """Verify webhook request signature.

        Args:
            body: Raw request body.
            signature: Signature value received with the request; must
                start with ``sha256=``.

        Returns:
            True when ``signature`` matches the HMAC-SHA256 of ``body``
            keyed with the consumer secret.
        """
        if not signature.startswith("sha256="):
            return False

        expected = hmac.new(
            self.twitter_config.consumer_secret.encode('utf-8'),
            body,
            hashlib.sha256,
        ).digest()
        expected_sig = f"sha256={base64.b64encode(expected).decode('utf-8')}"

        # Compare as bytes: compare_digest() raises TypeError on non-ASCII
        # str input, and the signature is attacker-controlled.
        return hmac.compare_digest(
            expected_sig.encode('utf-8'),
            signature.encode('utf-8'),
        )

    async def handle_webhook(self, body: str, signature: Optional[str] = None) -> None:
        """
        Handle incoming webhook from Twitter Account Activity API.

        Args:
            body: JSON request body as text.
            signature: Signature received with the request. When given it
                must verify, otherwise the payload is dropped.

        Errors are logged, never raised.
        """
        try:
            # NOTE(review): a missing signature skips verification entirely,
            # so unsigned payloads are processed. Callers exposed to the
            # internet should always pass the signature -- confirm whether
            # unsigned requests should be rejected here.
            if signature and not self.verify_signature(body.encode('utf-8'), signature):
                logger.error("Invalid webhook signature")
                return

            data = json.loads(body)

            # Direct messages
            if "direct_message_events" in data and self.twitter_config.enable_dm:
                for dm_event in data["direct_message_events"]:
                    await self._handle_dm_event(dm_event, data.get("users", {}))

            # Typing indicators (logged only)
            if "direct_message_indicate_typing_events" in data:
                for event in data["direct_message_indicate_typing_events"]:
                    logger.debug("User typing: %s", event.get('sender_id'))

            # Read receipts (logged only)
            if "direct_message_mark_read_events" in data:
                for event in data["direct_message_mark_read_events"]:
                    logger.debug("Messages read by: %s", event.get('sender_id'))

            # Tweets (mentions)
            if "tweet_create_events" in data and self.twitter_config.enable_mentions:
                for tweet in data["tweet_create_events"]:
                    await self._handle_mention(tweet, data.get("users", {}))

        except Exception as e:
            logger.error("Error handling webhook: %s", e)

    async def _handle_dm_event(
        self,
        event: Dict[str, Any],
        users: Dict[str, Any],
    ) -> None:
        """Handle DM event from webhook.

        Only ``message_create`` events from other users are processed. A
        quick-reply response whose metadata has a registered handler goes to
        that handler; everything else is dispatched as a regular message.
        """
        if event.get("type") != "message_create":
            return

        message_data = event.get("message_create", {})

        # Ignore own messages
        if message_data.get("sender_id") == self._user_id:
            return

        # Convert first: this also records the sender in the user cache.
        message = self._convert_dm_message(event, users)

        quick_reply = message_data.get("message_data", {}).get("quick_reply_response", {})
        metadata = quick_reply.get("metadata")
        if metadata and metadata in self._dm_handlers:
            outcome = self._dm_handlers[metadata](event)
            # Handlers are typed as plain callables, so accept both sync
            # and async ones instead of awaiting unconditionally.
            if hasattr(outcome, "__await__"):
                await outcome
            return

        await self._dispatch_message(message)

    async def _handle_mention(
        self,
        tweet: Dict[str, Any],
        users: Dict[str, Any],
    ) -> None:
        """Handle mention from webhook.

        Tweets authored by this account, or that do not mention it, are
        ignored. Mentions go to every registered mention handler; when none
        is registered the tweet is dispatched as a regular message instead.
        """
        # Ignore own tweets
        if tweet.get("user", {}).get("id_str") == self._user_id:
            return

        mentions = tweet.get("entities", {}).get("user_mentions", [])
        if not any(m.get("id_str") == self._user_id for m in mentions):
            return

        for handler in self._mention_handlers:
            try:
                result = handler(tweet)
                if asyncio.iscoroutine(result):
                    await result
            except Exception as e:
                # One failing handler must not block the others.
                logger.error("Error in mention handler: %s", e)

        if not self._mention_handlers:
            await self._dispatch_message(self._convert_tweet_message(tweet))

    # ------------------------------------------------------------------
    # Conversion to the unified Message format
    # ------------------------------------------------------------------

    @staticmethod
    def _first_mp4_url(media_info: Dict[str, Any]) -> Optional[str]:
        """Return the URL of the first ``video/mp4`` variant, or None."""
        variants = media_info.get("video_info", {}).get("variants", [])
        return next(
            (v.get("url") for v in variants if v.get("content_type") == "video/mp4"),
            None,
        )

    def _convert_dm_message(
        self,
        event: Dict[str, Any],
        users: Dict[str, Any],
    ) -> Message:
        """Convert Twitter DM event to unified Message format.

        Args:
            event: One entry of ``direct_message_events``.
            users: The payload's ``users`` mapping, looked up by sender id.

        Side effect: caches the sender's user record in ``_user_cache``.
        """
        message_create = event.get("message_create", {})
        message_data = message_create.get("message_data", {})
        sender_id = message_create.get("sender_id", "")
        target_id = message_create.get("target", {}).get("recipient_id", "")

        sender_info = users.get(sender_id, {})
        sender_name = sender_info.get("name") or sender_info.get("screen_name", "")

        if sender_id:
            self._user_cache[sender_id] = sender_info

        text = message_data.get("text", "")

        media = []
        attachment = message_data.get("attachment", {})
        if attachment:
            att_type = attachment.get("type")
            att_media = attachment.get("media", {})

            if att_type == "media":
                media_type = att_media.get("type", "photo")
                if media_type == "photo":
                    media.append(MediaAttachment(
                        type=MessageType.IMAGE,
                        url=att_media.get("media_url_https"),
                    ))
                elif media_type in ("video", "animated_gif"):
                    # Prefer an MP4 variant; otherwise use media_url_https.
                    media.append(MediaAttachment(
                        type=MessageType.VIDEO,
                        url=self._first_mp4_url(att_media) or att_media.get("media_url_https"),
                    ))
            elif att_type == "location":
                shared_place = attachment.get("location", {}).get("shared_place", {})
                place_name = shared_place.get("full_name", "")
                coords = shared_place.get("coordinates", {}).get("coordinates", [])
                if coords:
                    # The two coordinates are emitted in swapped order.
                    text = f"[location:{coords[1]},{coords[0]}] {place_name}"

        # A quick reply with no text gets a placeholder text.
        quick_reply = message_data.get("quick_reply_response", {})
        if quick_reply.get("metadata"):
            text = text or f"[quick_reply:{quick_reply['metadata']}]"

        # created_timestamp is divided by 1000 below, i.e. read as ms.
        created_at = int(event.get("created_timestamp", 0))

        return Message(
            id=event.get("id", ""),
            channel=self.name,
            sender_id=sender_id,
            sender_name=sender_name,
            chat_id=sender_id,  # Use sender ID as chat ID for DMs
            text=text,
            media=media,
            timestamp=datetime.fromtimestamp(created_at / 1000) if created_at else datetime.now(),
            is_group=False,
            raw={
                'type': 'dm',
                'target_id': target_id,
                'quick_reply': quick_reply.get('metadata'),
            },
        )

    def _convert_tweet_message(self, tweet: Dict[str, Any]) -> Message:
        """Convert Twitter tweet to unified Message format.

        The bot's own ``@username`` is stripped from the text. The timestamp
        is the time of conversion, not the tweet's creation time.
        """
        user_data = tweet.get("user", {})
        sender_id = user_data.get("id_str", "")
        sender_name = user_data.get("name", "")
        screen_name = user_data.get("screen_name", "")

        text = tweet.get("text", "") or tweet.get("full_text", "")

        # Remove @mention of bot from text
        if self._username:
            text = text.replace(f"@{self._username}", "").strip()

        media = []
        entities = tweet.get("extended_entities", {}) or tweet.get("entities", {})
        for entity_media in entities.get("media", []):
            media_type = entity_media.get("type", "photo")
            if media_type == "photo":
                media.append(MediaAttachment(
                    type=MessageType.IMAGE,
                    url=entity_media.get("media_url_https"),
                ))
            elif media_type in ("video", "animated_gif"):
                media.append(MediaAttachment(
                    type=MessageType.VIDEO,
                    url=self._first_mp4_url(entity_media),
                ))

        return Message(
            id=tweet.get("id_str", ""),
            channel=self.name,
            sender_id=sender_id,
            sender_name=sender_name,
            chat_id=tweet.get("id_str", ""),  # Tweet ID as chat ID
            text=text,
            media=media,
            timestamp=datetime.now(),  # Parse created_at if needed
            is_group=True,  # Mentions are public
            is_bot_mentioned=True,
            raw={
                'type': 'mention',
                'screen_name': screen_name,
                'in_reply_to_status_id': tweet.get('in_reply_to_status_id_str'),
            },
        )

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------

    @staticmethod
    def _build_dm_payload(user_id: str, message_data: Dict[str, Any]) -> Dict[str, Any]:
        """Wrap ``message_data`` in the ``message_create`` event envelope."""
        return {
            "event": {
                "type": "message_create",
                "message_create": {
                    "target": {"recipient_id": user_id},
                    "message_data": message_data,
                }
            }
        }

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        media: Optional[List[MediaAttachment]] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """
        Send a message.

        For DMs, chat_id is the user ID.
        For replies to tweets, chat_id is the tweet ID (use reply_to_tweet instead).

        Routing: when ``reply_to`` is longer than 15 characters it is treated
        as a tweet ID and the text is posted as a reply (``buttons`` are not
        used on that path); otherwise a DM is sent to ``chat_id``.
        """
        if not self._session or not self._oauth:
            return SendResult(success=False, error="Not connected")

        # Heuristic: tweet IDs are long
        if reply_to and len(reply_to) > 15:
            return await self.reply_to_tweet(reply_to, text, media)

        return await self.send_dm(chat_id, text, media, buttons)

    async def send_dm(
        self,
        user_id: str,
        text: str,
        media: Optional[List[MediaAttachment]] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Send a direct message.

        Args:
            user_id: Recipient's user ID.
            text: Message text.
            media: Attachments; only the first one is uploaded and sent.
            buttons: Dicts with ``text`` and optional ``callback_data``;
                the first three become quick-reply options.

        Returns:
            A ``SendResult`` carrying the new event ID on HTTP 200, or the
            error message otherwise.

        Raises:
            ChannelRateLimitError: On HTTP 429.
        """
        if not self._session or not self._oauth:
            return SendResult(success=False, error="Not connected")

        try:
            message_data: Dict[str, Any] = {"text": text}

            # Media has to be uploaded first; a failed upload sends text only.
            if media and len(media) > 0:
                media_id = await self._upload_media(media[0])
                if media_id:
                    message_data["attachment"] = {
                        "type": "media",
                        "media": {"id": media_id}
                    }

            # Buttons are rendered as quick replies.
            if buttons:
                message_data["quick_reply"] = {
                    "type": "options",
                    "options": [
                        {
                            "label": btn.get("text", "")[:36],
                            "metadata": btn.get("callback_data", btn.get("text", ""))[:1000],
                        }
                        for btn in buttons[:3]  # Max 3 options
                    ],
                }

            payload = self._build_dm_payload(user_id, message_data)

            # JSON bodies are not part of the OAuth signature.
            auth_header = self._oauth.get_auth_header("POST", TWITTER_DM_SEND)

            async with self._session.post(
                TWITTER_DM_SEND,
                headers={
                    "Authorization": auth_header,
                    "Content-Type": "application/json",
                },
                json=payload,
            ) as response:
                data = await response.json()

                if response.status == 200:
                    return SendResult(
                        success=True,
                        message_id=data.get("event", {}).get("id"),
                    )

                errors = data.get("errors", [{}])
                error_msg = errors[0].get("message", "Unknown error") if errors else "Unknown error"

                if response.status == 429:
                    raise ChannelRateLimitError(60)

                return SendResult(success=False, error=error_msg)

        except ChannelRateLimitError:
            raise
        except Exception as e:
            logger.error("Failed to send DM: %s", e)
            return SendResult(success=False, error=str(e))

    async def _upload_media(self, media: MediaAttachment) -> Optional[str]:
        """Upload media and return media_id.

        The bytes come from ``media.file_path`` when set, otherwise they are
        downloaded from ``media.url``.

        Returns:
            The ``media_id_string`` of the upload, or None when there is no
            source, the download or upload fails, or the adapter is not
            connected.
        """
        if not self._session or not self._oauth:
            return None

        try:
            if media.file_path:
                with open(media.file_path, 'rb') as f:
                    media_data = f.read()
            elif media.url:
                async with self._session.get(media.url) as resp:
                    # Do not forward an error page as if it were the media.
                    if resp.status >= 400:
                        logger.error("Media download failed with HTTP %s", resp.status)
                        return None
                    media_data = await resp.read()
            else:
                return None

            media_b64 = base64.b64encode(media_data).decode('utf-8')

            # Audio is sent under the video category; unknown types as image.
            if media.type in (MessageType.VIDEO, MessageType.AUDIO):
                media_category = "dm_video"
            else:
                media_category = "dm_image"

            form_data = {
                "media_data": media_b64,
                "media_category": media_category,
            }

            # Form fields are covered by the signature.
            auth_header = self._oauth.get_auth_header("POST", TWITTER_MEDIA_UPLOAD, form_data)

            async with self._session.post(
                TWITTER_MEDIA_UPLOAD,
                headers={"Authorization": auth_header},
                data=form_data,
            ) as response:
                data = await response.json()
                if response.status == 200:
                    return data.get("media_id_string")
                logger.error("Media upload failed: %s", data)
                return None

        except Exception as e:
            logger.error("Error uploading media: %s", e)
            return None

    async def reply_to_tweet(
        self,
        tweet_id: str,
        text: str,
        media: Optional[List[MediaAttachment]] = None,
    ) -> SendResult:
        """Reply to a tweet.

        Args:
            tweet_id: ID of the tweet being replied to.
            text: Reply text.
            media: Attachments; only the first one is uploaded and sent.

        Returns:
            A ``SendResult`` carrying the new tweet ID on HTTP 200/201, or
            the error detail otherwise.

        Raises:
            ChannelRateLimitError: On HTTP 429.
        """
        if not self._session or not self._oauth:
            return SendResult(success=False, error="Not connected")

        try:
            payload: Dict[str, Any] = {
                "text": text,
                "reply": {
                    "in_reply_to_tweet_id": tweet_id,
                }
            }

            if media and len(media) > 0:
                media_id = await self._upload_media(media[0])
                if media_id:
                    payload["media"] = {"media_ids": [media_id]}

            auth_header = self._oauth.get_auth_header("POST", TWITTER_TWEETS)

            async with self._session.post(
                TWITTER_TWEETS,
                headers={
                    "Authorization": auth_header,
                    "Content-Type": "application/json",
                },
                json=payload,
            ) as response:
                data = await response.json()

                if response.status in (200, 201):
                    return SendResult(
                        success=True,
                        message_id=data.get("data", {}).get("id"),
                    )

                error = data.get("detail") or data.get("title") or "Unknown error"
                if response.status == 429:
                    raise ChannelRateLimitError(60)
                return SendResult(success=False, error=error)

        except ChannelRateLimitError:
            raise
        except Exception as e:
            logger.error("Failed to reply to tweet: %s", e)
            return SendResult(success=False, error=str(e))

    async def edit_message(
        self,
        chat_id: str,
        message_id: str,
        text: str,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Twitter doesn't support message editing for DMs.

        Logs a warning and sends ``text`` as a new DM instead; ``message_id``
        is not used.
        """
        logger.warning("Twitter doesn't support DM editing")
        return await self.send_dm(chat_id, text, buttons=buttons)

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """
        Delete a DM.
        Note: Only deletes for the authenticated user.

        Returns True when the API answers HTTP 204; ``chat_id`` is not used.
        """
        if not self._session or not self._oauth:
            return False

        try:
            url = f"{TWITTER_API_V1}/direct_messages/events/destroy.json"
            params = {"id": message_id}

            # Query parameters are covered by the signature.
            auth_header = self._oauth.get_auth_header("DELETE", url, params)

            async with self._session.delete(
                url,
                headers={"Authorization": auth_header},
                params=params,
            ) as response:
                return response.status == 204

        except Exception as e:
            logger.error("Failed to delete message: %s", e)
            return False

    async def send_typing(self, chat_id: str) -> None:
        """Send typing indicator for DMs.

        Best-effort: failures are logged at debug level and never raised.
        """
        if not self._session or not self._oauth:
            return

        try:
            form_data = {"recipient_id": chat_id}
            auth_header = self._oauth.get_auth_header("POST", TWITTER_DM_TYPING, form_data)

            # Using the response as a context manager releases the
            # connection; a bare ``await post(...)`` left it unreleased.
            async with self._session.post(
                TWITTER_DM_TYPING,
                headers={"Authorization": auth_header},
                data=form_data,
            ) as response:
                if response.status >= 400:
                    logger.debug("Typing indicator rejected with HTTP %s", response.status)
        except Exception as e:
            logger.debug("Failed to send typing indicator: %s", e)

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Get user information (a DM chat is identified by its user ID)."""
        return await self.get_user(chat_id)

    # ------------------------------------------------------------------
    # Twitter-specific methods
    # ------------------------------------------------------------------

    def on_mention(self, handler: Callable[[Dict[str, Any]], Any]) -> None:
        """Register a handler for mention events.

        The handler receives the raw tweet dict and may be sync or async.
        """
        self._mention_handlers.append(handler)

    def register_quick_reply_handler(
        self,
        metadata: str,
        handler: Callable[[Dict[str, Any]], Any],
    ) -> None:
        """Register a handler for quick reply selections.

        The handler receives the raw DM event whose quick-reply metadata
        equals ``metadata``; a later registration for the same metadata
        replaces the earlier one.
        """
        self._dm_handlers[metadata] = handler

    async def get_user(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Get user information by ID.

        Served from the in-memory cache when available; otherwise fetched
        from the API and cached. Returns None when not connected, on a
        non-200 response, or on error.
        """
        if user_id in self._user_cache:
            return self._user_cache[user_id]

        if not self._session or not self._oauth:
            return None

        try:
            url = f"{TWITTER_API_V2}/users/{user_id}"
            params = {"user.fields": "id,name,username,profile_image_url,description"}

            # The query parameters are sent with the request, so they must
            # be part of the OAuth signature too.
            auth_header = self._oauth.get_auth_header("GET", url, params)

            async with self._session.get(
                url,
                headers={"Authorization": auth_header},
                params=params,
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    user_data = data.get("data")
                    if user_data:
                        self._user_cache[user_id] = user_data
                    return user_data
                return None

        except Exception as e:
            logger.error("Error getting user: %s", e)
            return None

    async def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]:
        """Get user information by username (a leading ``@`` is ignored).

        Returns None when not connected, on a non-200 response, or on error.
        The result is not cached.
        """
        if not self._session or not self._oauth:
            return None

        try:
            # Remove @ if present
            username = username.lstrip('@')

            url = f"{TWITTER_API_V2}/users/by/username/{username}"
            params = {"user.fields": "id,name,username,profile_image_url,description"}

            # Sign the query parameters along with the request.
            auth_header = self._oauth.get_auth_header("GET", url, params)

            async with self._session.get(
                url,
                headers={"Authorization": auth_header},
                params=params,
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    return data.get("data")
                return None

        except Exception as e:
            logger.error("Error getting user by username: %s", e)
            return None

    async def mark_read(self, user_id: str, last_read_message_id: str) -> bool:
        """Mark DM conversation as read.

        Returns True when the API answers HTTP 204.
        """
        if not self._session or not self._oauth:
            return False

        try:
            form_data = {
                "recipient_id": user_id,
                "last_read_event_id": last_read_message_id,
            }

            auth_header = self._oauth.get_auth_header("POST", TWITTER_DM_MARK_READ, form_data)

            async with self._session.post(
                TWITTER_DM_MARK_READ,
                headers={"Authorization": auth_header},
                data=form_data,
            ) as response:
                return response.status == 204

        except Exception as e:
            logger.error("Error marking read: %s", e)
            return False

    async def send_dm_with_quick_replies(
        self,
        user_id: str,
        text: str,
        options: List[QuickReplyOption],
    ) -> SendResult:
        """Send a DM with quick reply options.

        Each option becomes a button whose callback data is the option's
        metadata, or its label when no metadata is set. ``send_dm`` does the
        sending, so its limits and errors apply.
        """
        buttons = [
            {"text": opt.label, "callback_data": opt.metadata or opt.label}
            for opt in options
        ]
        return await self.send_dm(user_id, text, buttons=buttons)

    async def send_dm_with_cta(
        self,
        user_id: str,
        text: str,
        ctas: List[CallToAction],
    ) -> SendResult:
        """Send a DM with call-to-action buttons (at most the first three).

        Returns a ``SendResult`` carrying the new event ID on HTTP 200, or
        the error message otherwise. Never raises.
        """
        if not self._session or not self._oauth:
            return SendResult(success=False, error="Not connected")

        try:
            message_data = {
                "text": text,
                "ctas": [cta.to_dict() for cta in ctas[:3]],
            }
            payload = self._build_dm_payload(user_id, message_data)

            auth_header = self._oauth.get_auth_header("POST", TWITTER_DM_SEND)

            async with self._session.post(
                TWITTER_DM_SEND,
                headers={
                    "Authorization": auth_header,
                    "Content-Type": "application/json",
                },
                json=payload,
            ) as response:
                data = await response.json()

                if response.status == 200:
                    return SendResult(success=True, message_id=data.get("event", {}).get("id"))

                errors = data.get("errors", [{}])
                error_msg = errors[0].get("message", "Unknown error") if errors else "Unknown error"
                return SendResult(success=False, error=error_msg)

        except Exception as e:
            logger.error("Failed to send DM with CTA: %s", e)
            return SendResult(success=False, error=str(e))

1022 

1023 

def create_twitter_adapter(
    consumer_key: str = None,
    consumer_secret: str = None,
    access_token: str = None,
    access_token_secret: str = None,
    **kwargs
) -> TwitterAdapter:
    """
    Build a ``TwitterAdapter`` from explicit credentials or the environment.

    Each credential that is not passed (or is empty) is read from its
    environment variable instead.

    Args:
        consumer_key: Twitter API consumer key (or set TWITTER_CONSUMER_KEY env var)
        consumer_secret: Twitter API consumer secret (or set TWITTER_CONSUMER_SECRET env var)
        access_token: Twitter access token (or set TWITTER_ACCESS_TOKEN env var)
        access_token_secret: Twitter access token secret (or set TWITTER_ACCESS_TOKEN_SECRET env var)
        **kwargs: Additional config options, forwarded to ``TwitterConfig``

    Returns:
        Configured TwitterAdapter

    Raises:
        ValueError: If any of the four credentials is still missing.
    """
    # (config field, explicit argument, fallback environment variable)
    sources = (
        ("consumer_key", consumer_key, "TWITTER_CONSUMER_KEY"),
        ("consumer_secret", consumer_secret, "TWITTER_CONSUMER_SECRET"),
        ("access_token", access_token, "TWITTER_ACCESS_TOKEN"),
        ("access_token_secret", access_token_secret, "TWITTER_ACCESS_TOKEN_SECRET"),
    )
    credentials = {
        field_name: explicit or os.getenv(env_var)
        for field_name, explicit, env_var in sources
    }

    if not all(credentials.values()):
        raise ValueError("Twitter OAuth credentials required")

    return TwitterAdapter(TwitterConfig(**credentials, **kwargs))