Coverage for integrations / channels / extensions / twitter_adapter.py: 25.2%
425 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Twitter/X Channel Adapter
4Implements Twitter/X Direct Messages and mentions handling.
5Based on SantaClaw extension patterns for Twitter.
7Features:
8- Direct Messages (DMs) send/receive
9- Mention tracking and replies
10- Media attachments
11- Typing indicators (conversation events)
12- Welcome messages
13- Quick replies for DMs
14- Account Activity API webhooks
15- OAuth 1.0a authentication
16"""
18from __future__ import annotations
20import asyncio
21import logging
22import os
23import json
24import hmac
25import hashlib
26import base64
27import time
28import urllib.parse
29from typing import Optional, List, Dict, Any, Callable
30from datetime import datetime
31from dataclasses import dataclass, field
32try:
33 import aiohttp
34 HAS_AIOHTTP = True
35except ImportError:
36 HAS_AIOHTTP = False
38from ..base import (
39 ChannelAdapter,
40 ChannelConfig,
41 ChannelStatus,
42 Message,
43 MessageType,
44 MediaAttachment,
45 SendResult,
46 ChannelConnectionError,
47 ChannelSendError,
48 ChannelRateLimitError,
49)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Twitter API endpoints
TWITTER_API_BASE = "https://api.twitter.com"
TWITTER_API_V2 = f"{TWITTER_API_BASE}/2"
TWITTER_API_V1 = f"{TWITTER_API_BASE}/1.1"

# DM endpoints (v1.1 still used for DMs)
TWITTER_DM_SEND = f"{TWITTER_API_V1}/direct_messages/events/new.json"
TWITTER_DM_LIST = f"{TWITTER_API_V1}/direct_messages/events/list.json"
TWITTER_DM_SHOW = f"{TWITTER_API_V1}/direct_messages/events/show.json"
TWITTER_DM_TYPING = f"{TWITTER_API_V1}/direct_messages/indicate_typing.json"
TWITTER_DM_MARK_READ = f"{TWITTER_API_V1}/direct_messages/mark_read.json"

# Tweet endpoints (v2)
TWITTER_TWEETS = f"{TWITTER_API_V2}/tweets"
TWITTER_USERS = f"{TWITTER_API_V2}/users"

# Media endpoints (note: served from a different host than TWITTER_API_BASE)
TWITTER_MEDIA_UPLOAD = "https://upload.twitter.com/1.1/media/upload.json"

# Webhook endpoints
TWITTER_WEBHOOKS = f"{TWITTER_API_V1}/account_activity/all"
@dataclass
class TwitterConfig(ChannelConfig):
    """Twitter-specific configuration.

    Adds OAuth 1.0a credentials and Twitter feature toggles on top of the
    fields inherited from ChannelConfig.
    """
    # OAuth 1.0a credentials. TwitterAdapter.connect() returns False when
    # any of these four values is empty.
    consumer_key: str = ""
    consumer_secret: str = ""
    access_token: str = ""
    access_token_secret: str = ""
    bearer_token: Optional[str] = None  # For app-only auth
    environment_name: str = "production"  # For Account Activity API
    # Gates processing of "direct_message_events" in handle_webhook().
    enable_dm: bool = True
    # Gates processing of "tweet_create_events" in handle_webhook().
    enable_mentions: bool = True
    # Welcome-message settings (presumably consumed outside this module — TODO confirm).
    enable_welcome_message: bool = False
    welcome_message_id: Optional[str] = None
@dataclass
class QuickReplyOption:
    """One selectable option shown with a DM quick-reply prompt."""
    label: str
    description: Optional[str] = None
    metadata: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the option to the API dict shape.

        The label is always emitted (cut to 36 characters). The
        description (72 characters) and metadata (1000 characters) are
        emitted only when they are non-empty.
        """
        payload: Dict[str, Any] = {"label": self.label[:36]}
        optional_fields = (
            ("description", self.description, 72),
            ("metadata", self.metadata, 1000),
        )
        for key, value, limit in optional_fields:
            if value:
                payload[key] = value[:limit]
        return payload
@dataclass
class CallToAction:
    """A call-to-action button attached to a DM."""
    type: str = "web_url"  # web_url
    label: str = ""
    url: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the button to the API dict shape; the label is cut to 36 characters."""
        trimmed_label = self.label[:36]
        return dict(type=self.type, label=trimmed_label, url=self.url)
class TwitterOAuth:
    """
    OAuth 1.0a helper for Twitter API authentication.

    Builds HMAC-SHA1 signed ``Authorization`` headers from a consumer
    key/secret pair and an access token/secret pair.
    """

    def __init__(
        self,
        consumer_key: str,
        consumer_secret: str,
        access_token: str,
        access_token_secret: str,
    ):
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.access_token = access_token
        self.access_token_secret = access_token_secret

    @staticmethod
    def _percent_encode(value: Any) -> str:
        """Percent-encode a value, escaping everything except unreserved characters."""
        if not isinstance(value, (str, bytes)):
            value = str(value)
        return urllib.parse.quote(value, safe='')

    def _generate_nonce(self) -> str:
        """Generate OAuth nonce (32 random hexadecimal characters)."""
        return os.urandom(16).hex()

    def _generate_timestamp(self) -> str:
        """Generate OAuth timestamp (whole seconds since the epoch, as a string)."""
        return str(int(time.time()))

    def _create_signature_base(
        self,
        method: str,
        url: str,
        params: Dict[str, str],
    ) -> str:
        """Create the signature base string.

        Args:
            method: HTTP method; upper-cased before use.
            url: Request URL without the query string.
            params: Every parameter that takes part in the signature
                (OAuth parameters plus query/form parameters).

        Returns:
            ``METHOD&encoded-url&encoded-parameter-string``.
        """
        encode = self._percent_encode
        # Encode first, then sort. A space must become %20 here (and %2520
        # after the second encoding pass below); form-style '+' encoding
        # would produce a signature the server rejects.
        encoded_pairs = sorted((encode(k), encode(v)) for k, v in params.items())
        normalized = "&".join(f"{k}={v}" for k, v in encoded_pairs)

        return "&".join((method.upper(), encode(url), encode(normalized)))

    def _create_signature(
        self,
        method: str,
        url: str,
        params: Dict[str, str],
    ) -> str:
        """Create OAuth signature (base64 of HMAC-SHA1 over the base string)."""
        base = self._create_signature_base(method, url, params)

        # Signing key is "<encoded consumer secret>&<encoded token secret>"
        key = f"{self._percent_encode(self.consumer_secret)}&{self._percent_encode(self.access_token_secret)}"

        digest = hmac.new(
            key.encode('utf-8'),
            base.encode('utf-8'),
            hashlib.sha1
        ).digest()

        return base64.b64encode(digest).decode('utf-8')

    def get_auth_header(
        self,
        method: str,
        url: str,
        body_params: Optional[Dict[str, str]] = None,
    ) -> str:
        """
        Generate OAuth 1.0a Authorization header.

        Args:
            method: HTTP method of the request being signed.
            url: Request URL without the query string.
            body_params: Extra parameters to include in the signature
                (form fields or query parameters). They are signed but
                not placed in the header.

        Returns:
            The header value, starting with ``OAuth ``.
        """
        oauth_params = {
            "oauth_consumer_key": self.consumer_key,
            "oauth_nonce": self._generate_nonce(),
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": self._generate_timestamp(),
            "oauth_token": self.access_token,
            "oauth_version": "1.0",
        }

        # Signature covers the OAuth parameters plus the request parameters
        all_params = {**oauth_params}
        if body_params:
            all_params.update(body_params)

        oauth_params["oauth_signature"] = self._create_signature(method, url, all_params)

        header_params = ', '.join(
            f'{self._percent_encode(k)}="{self._percent_encode(v)}"'
            for k, v in sorted(oauth_params.items())
        )

        return f"OAuth {header_params}"
222class TwitterAdapter(ChannelAdapter):
223 """
224 Twitter/X messaging adapter for DMs and mentions.
226 Usage:
227 config = TwitterConfig(
228 consumer_key="your-key",
229 consumer_secret="your-secret",
230 access_token="your-token",
231 access_token_secret="your-token-secret",
232 )
233 adapter = TwitterAdapter(config)
234 adapter.on_message(my_handler)
235 await adapter.start()
236 """
238 def __init__(self, config: TwitterConfig):
239 super().__init__(config)
240 self.twitter_config: TwitterConfig = config
241 self._oauth: Optional[TwitterOAuth] = None
242 self._session: Optional[aiohttp.ClientSession] = None
243 self._user_id: Optional[str] = None
244 self._username: Optional[str] = None
245 self._mention_handlers: List[Callable] = []
246 self._dm_handlers: Dict[str, Callable] = {}
247 self._user_cache: Dict[str, Dict[str, Any]] = {}
249 @property
250 def name(self) -> str:
251 return "twitter"
253 async def connect(self) -> bool:
254 """Initialize Twitter API connection."""
255 if not all([
256 self.twitter_config.consumer_key,
257 self.twitter_config.consumer_secret,
258 self.twitter_config.access_token,
259 self.twitter_config.access_token_secret,
260 ]):
261 logger.error("Twitter OAuth credentials required")
262 return False
264 try:
265 # Create OAuth helper
266 self._oauth = TwitterOAuth(
267 self.twitter_config.consumer_key,
268 self.twitter_config.consumer_secret,
269 self.twitter_config.access_token,
270 self.twitter_config.access_token_secret,
271 )
273 # Create HTTP session
274 self._session = aiohttp.ClientSession()
276 # Verify credentials
277 user_info = await self._verify_credentials()
278 if not user_info:
279 logger.error("Failed to verify Twitter credentials")
280 return False
282 self._user_id = user_info.get("id")
283 self._username = user_info.get("username")
285 self.status = ChannelStatus.CONNECTED
286 logger.info(f"Twitter adapter connected as: @{self._username}")
287 return True
289 except Exception as e:
290 logger.error(f"Failed to connect to Twitter: {e}")
291 self.status = ChannelStatus.ERROR
292 return False
294 async def disconnect(self) -> None:
295 """Disconnect Twitter adapter."""
296 if self._session:
297 await self._session.close()
298 self._session = None
300 self._oauth = None
301 self.status = ChannelStatus.DISCONNECTED
303 async def _verify_credentials(self) -> Optional[Dict[str, Any]]:
304 """Verify OAuth credentials and get user info."""
305 if not self._session or not self._oauth:
306 return None
308 try:
309 url = f"{TWITTER_API_V2}/users/me"
311 auth_header = self._oauth.get_auth_header("GET", url)
313 async with self._session.get(
314 url,
315 headers={"Authorization": auth_header}
316 ) as response:
317 if response.status == 200:
318 data = await response.json()
319 return data.get("data")
320 else:
321 data = await response.json()
322 logger.error(f"Failed to verify credentials: {data}")
323 return None
325 except Exception as e:
326 logger.error(f"Error verifying credentials: {e}")
327 return None
329 def verify_webhook(self, crc_token: str) -> str:
330 """
331 Generate CRC response for webhook verification.
332 Returns the response_token to send back.
333 """
334 signature = hmac.new(
335 self.twitter_config.consumer_secret.encode('utf-8'),
336 crc_token.encode('utf-8'),
337 hashlib.sha256
338 ).digest()
340 return f"sha256={base64.b64encode(signature).decode('utf-8')}"
342 def verify_signature(self, body: bytes, signature: str) -> bool:
343 """Verify webhook request signature."""
344 if not signature.startswith("sha256="):
345 return False
347 expected = hmac.new(
348 self.twitter_config.consumer_secret.encode('utf-8'),
349 body,
350 hashlib.sha256
351 ).digest()
353 expected_sig = f"sha256={base64.b64encode(expected).decode('utf-8')}"
354 return hmac.compare_digest(expected_sig, signature)
356 async def handle_webhook(self, body: str, signature: Optional[str] = None) -> None:
357 """
358 Handle incoming webhook from Twitter Account Activity API.
359 """
360 try:
361 # Verify signature
362 if signature and not self.verify_signature(body.encode('utf-8'), signature):
363 logger.error("Invalid webhook signature")
364 return
366 data = json.loads(body)
368 # Get user ID for this subscription
369 for_user_id = data.get("for_user_id")
371 # Handle direct message events
372 if "direct_message_events" in data and self.twitter_config.enable_dm:
373 for dm_event in data["direct_message_events"]:
374 await self._handle_dm_event(dm_event, data.get("users", {}))
376 # Handle direct message indicate typing
377 if "direct_message_indicate_typing_events" in data:
378 for event in data["direct_message_indicate_typing_events"]:
379 logger.debug(f"User typing: {event.get('sender_id')}")
381 # Handle direct message mark read
382 if "direct_message_mark_read_events" in data:
383 for event in data["direct_message_mark_read_events"]:
384 logger.debug(f"Messages read by: {event.get('sender_id')}")
386 # Handle tweet create events (mentions)
387 if "tweet_create_events" in data and self.twitter_config.enable_mentions:
388 for tweet in data["tweet_create_events"]:
389 await self._handle_mention(tweet, data.get("users", {}))
391 except Exception as e:
392 logger.error(f"Error handling webhook: {e}")
394 async def _handle_dm_event(
395 self,
396 event: Dict[str, Any],
397 users: Dict[str, Any],
398 ) -> None:
399 """Handle DM event from webhook."""
400 event_type = event.get("type")
402 if event_type != "message_create":
403 return
405 message_data = event.get("message_create", {})
406 sender_id = message_data.get("sender_id")
408 # Ignore own messages
409 if sender_id == self._user_id:
410 return
412 # Convert to unified message
413 message = self._convert_dm_message(event, users)
415 # Check for quick reply payload
416 quick_reply = message_data.get("message_data", {}).get("quick_reply_response", {})
417 if quick_reply.get("metadata"):
418 metadata = quick_reply["metadata"]
419 if metadata in self._dm_handlers:
420 handler = self._dm_handlers[metadata]
421 await handler(event)
422 return
424 await self._dispatch_message(message)
426 async def _handle_mention(
427 self,
428 tweet: Dict[str, Any],
429 users: Dict[str, Any],
430 ) -> None:
431 """Handle mention from webhook."""
432 # Ignore own tweets
433 user_data = tweet.get("user", {})
434 if user_data.get("id_str") == self._user_id:
435 return
437 # Check if it mentions the bot
438 mentions = tweet.get("entities", {}).get("user_mentions", [])
439 is_mentioned = any(m.get("id_str") == self._user_id for m in mentions)
441 if not is_mentioned:
442 return
444 # Dispatch to mention handlers
445 for handler in self._mention_handlers:
446 try:
447 result = handler(tweet)
448 if asyncio.iscoroutine(result):
449 await result
450 except Exception as e:
451 logger.error(f"Error in mention handler: {e}")
453 # Also dispatch as regular message if no specific handlers
454 if not self._mention_handlers:
455 message = self._convert_tweet_message(tweet)
456 await self._dispatch_message(message)
458 def _convert_dm_message(
459 self,
460 event: Dict[str, Any],
461 users: Dict[str, Any],
462 ) -> Message:
463 """Convert Twitter DM event to unified Message format."""
464 message_create = event.get("message_create", {})
465 message_data = message_create.get("message_data", {})
466 sender_id = message_create.get("sender_id", "")
467 target_id = message_create.get("target", {}).get("recipient_id", "")
469 # Get user info
470 sender_info = users.get(sender_id, {})
471 sender_name = sender_info.get("name") or sender_info.get("screen_name", "")
473 # Cache user
474 if sender_id:
475 self._user_cache[sender_id] = sender_info
477 # Extract text
478 text = message_data.get("text", "")
480 # Process attachments
481 media = []
482 attachment = message_data.get("attachment", {})
483 if attachment:
484 att_type = attachment.get("type")
485 att_media = attachment.get("media", {})
487 if att_type == "media":
488 media_type = att_media.get("type", "photo")
489 if media_type == "photo":
490 media.append(MediaAttachment(
491 type=MessageType.IMAGE,
492 url=att_media.get("media_url_https"),
493 ))
494 elif media_type in ("video", "animated_gif"):
495 # Get video URL from variants
496 variants = att_media.get("video_info", {}).get("variants", [])
497 video_url = next(
498 (v.get("url") for v in variants if v.get("content_type") == "video/mp4"),
499 None
500 )
501 media.append(MediaAttachment(
502 type=MessageType.VIDEO,
503 url=video_url or att_media.get("media_url_https"),
504 ))
505 elif att_type == "location":
506 shared_place = attachment.get("location", {}).get("shared_place", {})
507 name = shared_place.get("full_name", "")
508 coords = shared_place.get("coordinates", {}).get("coordinates", [])
509 if coords:
510 text = f"[location:{coords[1]},{coords[0]}] {name}"
512 # Handle quick reply
513 quick_reply = message_data.get("quick_reply_response", {})
514 if quick_reply.get("metadata"):
515 text = text or f"[quick_reply:{quick_reply['metadata']}]"
517 # Timestamp
518 created_at = int(event.get("created_timestamp", 0))
520 return Message(
521 id=event.get("id", ""),
522 channel=self.name,
523 sender_id=sender_id,
524 sender_name=sender_name,
525 chat_id=sender_id, # Use sender ID as chat ID for DMs
526 text=text,
527 media=media,
528 timestamp=datetime.fromtimestamp(created_at / 1000) if created_at else datetime.now(),
529 is_group=False,
530 raw={
531 'type': 'dm',
532 'target_id': target_id,
533 'quick_reply': quick_reply.get('metadata'),
534 },
535 )
537 def _convert_tweet_message(self, tweet: Dict[str, Any]) -> Message:
538 """Convert Twitter tweet to unified Message format."""
539 user_data = tweet.get("user", {})
540 sender_id = user_data.get("id_str", "")
541 sender_name = user_data.get("name", "")
542 screen_name = user_data.get("screen_name", "")
544 # Get text
545 text = tweet.get("text", "") or tweet.get("full_text", "")
547 # Remove @mention of bot from text
548 if self._username:
549 text = text.replace(f"@{self._username}", "").strip()
551 # Process media
552 media = []
553 entities = tweet.get("extended_entities", {}) or tweet.get("entities", {})
554 for entity_media in entities.get("media", []):
555 media_type = entity_media.get("type", "photo")
556 if media_type == "photo":
557 media.append(MediaAttachment(
558 type=MessageType.IMAGE,
559 url=entity_media.get("media_url_https"),
560 ))
561 elif media_type in ("video", "animated_gif"):
562 variants = entity_media.get("video_info", {}).get("variants", [])
563 video_url = next(
564 (v.get("url") for v in variants if v.get("content_type") == "video/mp4"),
565 None
566 )
567 media.append(MediaAttachment(
568 type=MessageType.VIDEO,
569 url=video_url,
570 ))
572 return Message(
573 id=tweet.get("id_str", ""),
574 channel=self.name,
575 sender_id=sender_id,
576 sender_name=sender_name,
577 chat_id=tweet.get("id_str", ""), # Tweet ID as chat ID
578 text=text,
579 media=media,
580 timestamp=datetime.now(), # Parse created_at if needed
581 is_group=True, # Mentions are public
582 is_bot_mentioned=True,
583 raw={
584 'type': 'mention',
585 'screen_name': screen_name,
586 'in_reply_to_status_id': tweet.get('in_reply_to_status_id_str'),
587 },
588 )
590 async def send_message(
591 self,
592 chat_id: str,
593 text: str,
594 reply_to: Optional[str] = None,
595 media: Optional[List[MediaAttachment]] = None,
596 buttons: Optional[List[Dict]] = None,
597 ) -> SendResult:
598 """
599 Send a message.
601 For DMs, chat_id is the user ID.
602 For replies to tweets, chat_id is the tweet ID (use reply_to_tweet instead).
603 """
604 if not self._session or not self._oauth:
605 return SendResult(success=False, error="Not connected")
607 # If it looks like a tweet ID (numeric string), reply to tweet
608 if reply_to and len(reply_to) > 15: # Tweet IDs are long
609 return await self.reply_to_tweet(reply_to, text, media)
611 # Otherwise send DM
612 return await self.send_dm(chat_id, text, media, buttons)
614 async def send_dm(
615 self,
616 user_id: str,
617 text: str,
618 media: Optional[List[MediaAttachment]] = None,
619 buttons: Optional[List[Dict]] = None,
620 ) -> SendResult:
621 """Send a direct message."""
622 if not self._session or not self._oauth:
623 return SendResult(success=False, error="Not connected")
625 try:
626 # Build message data
627 message_data: Dict[str, Any] = {"text": text}
629 # Handle media - need to upload first
630 if media and len(media) > 0:
631 media_id = await self._upload_media(media[0])
632 if media_id:
633 message_data["attachment"] = {
634 "type": "media",
635 "media": {"id": media_id}
636 }
638 # Handle buttons as quick replies
639 if buttons:
640 quick_replies = []
641 for btn in buttons[:3]: # Max 3 options
642 quick_replies.append({
643 "label": btn.get("text", "")[:36],
644 "metadata": btn.get("callback_data", btn.get("text", ""))[:1000],
645 })
646 message_data["quick_reply"] = {
647 "type": "options",
648 "options": quick_replies,
649 }
651 # Build event payload
652 payload = {
653 "event": {
654 "type": "message_create",
655 "message_create": {
656 "target": {"recipient_id": user_id},
657 "message_data": message_data,
658 }
659 }
660 }
662 auth_header = self._oauth.get_auth_header("POST", TWITTER_DM_SEND)
664 async with self._session.post(
665 TWITTER_DM_SEND,
666 headers={
667 "Authorization": auth_header,
668 "Content-Type": "application/json",
669 },
670 json=payload,
671 ) as response:
672 data = await response.json()
674 if response.status == 200:
675 event = data.get("event", {})
676 return SendResult(
677 success=True,
678 message_id=event.get("id"),
679 )
680 else:
681 errors = data.get("errors", [{}])
682 error_msg = errors[0].get("message", "Unknown error") if errors else "Unknown error"
684 # Check for rate limiting
685 if response.status == 429:
686 raise ChannelRateLimitError(60)
688 return SendResult(success=False, error=error_msg)
690 except ChannelRateLimitError:
691 raise
692 except Exception as e:
693 logger.error(f"Failed to send DM: {e}")
694 return SendResult(success=False, error=str(e))
696 async def _upload_media(self, media: MediaAttachment) -> Optional[str]:
697 """Upload media and return media_id."""
698 if not self._session or not self._oauth:
699 return None
701 try:
702 # Get media data
703 if media.file_path:
704 with open(media.file_path, 'rb') as f:
705 media_data = f.read()
706 elif media.url:
707 async with self._session.get(media.url) as resp:
708 media_data = await resp.read()
709 else:
710 return None
712 # Encode as base64
713 media_b64 = base64.b64encode(media_data).decode('utf-8')
715 # Determine media category
716 if media.type == MessageType.IMAGE:
717 media_category = "dm_image"
718 elif media.type == MessageType.VIDEO:
719 media_category = "dm_video"
720 elif media.type == MessageType.AUDIO:
721 media_category = "dm_video" # Twitter uses video for audio
722 else:
723 media_category = "dm_image"
725 # Upload
726 form_data = {
727 "media_data": media_b64,
728 "media_category": media_category,
729 }
731 auth_header = self._oauth.get_auth_header("POST", TWITTER_MEDIA_UPLOAD, form_data)
733 async with self._session.post(
734 TWITTER_MEDIA_UPLOAD,
735 headers={"Authorization": auth_header},
736 data=form_data,
737 ) as response:
738 if response.status == 200:
739 data = await response.json()
740 return data.get("media_id_string")
741 else:
742 data = await response.json()
743 logger.error(f"Media upload failed: {data}")
744 return None
746 except Exception as e:
747 logger.error(f"Error uploading media: {e}")
748 return None
750 async def reply_to_tweet(
751 self,
752 tweet_id: str,
753 text: str,
754 media: Optional[List[MediaAttachment]] = None,
755 ) -> SendResult:
756 """Reply to a tweet."""
757 if not self._session or not self._oauth:
758 return SendResult(success=False, error="Not connected")
760 try:
761 # Build tweet payload
762 payload: Dict[str, Any] = {
763 "text": text,
764 "reply": {
765 "in_reply_to_tweet_id": tweet_id,
766 }
767 }
769 # Handle media
770 if media and len(media) > 0:
771 media_id = await self._upload_media(media[0])
772 if media_id:
773 payload["media"] = {"media_ids": [media_id]}
775 auth_header = self._oauth.get_auth_header("POST", TWITTER_TWEETS)
777 async with self._session.post(
778 TWITTER_TWEETS,
779 headers={
780 "Authorization": auth_header,
781 "Content-Type": "application/json",
782 },
783 json=payload,
784 ) as response:
785 data = await response.json()
787 if response.status in (200, 201):
788 tweet_data = data.get("data", {})
789 return SendResult(
790 success=True,
791 message_id=tweet_data.get("id"),
792 )
793 else:
794 error = data.get("detail") or data.get("title") or "Unknown error"
795 if response.status == 429:
796 raise ChannelRateLimitError(60)
797 return SendResult(success=False, error=error)
799 except ChannelRateLimitError:
800 raise
801 except Exception as e:
802 logger.error(f"Failed to reply to tweet: {e}")
803 return SendResult(success=False, error=str(e))
805 async def edit_message(
806 self,
807 chat_id: str,
808 message_id: str,
809 text: str,
810 buttons: Optional[List[Dict]] = None,
811 ) -> SendResult:
812 """Twitter doesn't support message editing for DMs."""
813 logger.warning("Twitter doesn't support DM editing")
814 return await self.send_dm(chat_id, text, buttons=buttons)
816 async def delete_message(self, chat_id: str, message_id: str) -> bool:
817 """
818 Delete a DM.
819 Note: Only deletes for the authenticated user.
820 """
821 if not self._session or not self._oauth:
822 return False
824 try:
825 url = f"{TWITTER_API_V1}/direct_messages/events/destroy.json"
826 params = {"id": message_id}
828 auth_header = self._oauth.get_auth_header("DELETE", url, params)
830 async with self._session.delete(
831 url,
832 headers={"Authorization": auth_header},
833 params=params,
834 ) as response:
835 return response.status == 204
837 except Exception as e:
838 logger.error(f"Failed to delete message: {e}")
839 return False
841 async def send_typing(self, chat_id: str) -> None:
842 """Send typing indicator for DMs."""
843 if not self._session or not self._oauth:
844 return
846 try:
847 form_data = {"recipient_id": chat_id}
848 auth_header = self._oauth.get_auth_header("POST", TWITTER_DM_TYPING, form_data)
850 await self._session.post(
851 TWITTER_DM_TYPING,
852 headers={"Authorization": auth_header},
853 data=form_data,
854 )
855 except Exception:
856 pass
858 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
859 """Get user information."""
860 return await self.get_user(chat_id)
862 # Twitter-specific methods
864 def on_mention(self, handler: Callable[[Dict[str, Any]], Any]) -> None:
865 """Register a handler for mention events."""
866 self._mention_handlers.append(handler)
868 def register_quick_reply_handler(
869 self,
870 metadata: str,
871 handler: Callable[[Dict[str, Any]], Any],
872 ) -> None:
873 """Register a handler for quick reply selections."""
874 self._dm_handlers[metadata] = handler
876 async def get_user(self, user_id: str) -> Optional[Dict[str, Any]]:
877 """Get user information by ID."""
878 # Check cache
879 if user_id in self._user_cache:
880 return self._user_cache[user_id]
882 if not self._session or not self._oauth:
883 return None
885 try:
886 url = f"{TWITTER_API_V2}/users/{user_id}"
887 params = {"user.fields": "id,name,username,profile_image_url,description"}
889 auth_header = self._oauth.get_auth_header("GET", url)
891 async with self._session.get(
892 url,
893 headers={"Authorization": auth_header},
894 params=params,
895 ) as response:
896 if response.status == 200:
897 data = await response.json()
898 user_data = data.get("data")
899 if user_data:
900 self._user_cache[user_id] = user_data
901 return user_data
902 return None
904 except Exception as e:
905 logger.error(f"Error getting user: {e}")
906 return None
908 async def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]:
909 """Get user information by username."""
910 if not self._session or not self._oauth:
911 return None
913 try:
914 # Remove @ if present
915 username = username.lstrip('@')
917 url = f"{TWITTER_API_V2}/users/by/username/{username}"
918 params = {"user.fields": "id,name,username,profile_image_url,description"}
920 auth_header = self._oauth.get_auth_header("GET", url)
922 async with self._session.get(
923 url,
924 headers={"Authorization": auth_header},
925 params=params,
926 ) as response:
927 if response.status == 200:
928 data = await response.json()
929 return data.get("data")
930 return None
932 except Exception as e:
933 logger.error(f"Error getting user by username: {e}")
934 return None
936 async def mark_read(self, user_id: str, last_read_message_id: str) -> bool:
937 """Mark DM conversation as read."""
938 if not self._session or not self._oauth:
939 return False
941 try:
942 form_data = {
943 "recipient_id": user_id,
944 "last_read_event_id": last_read_message_id,
945 }
947 auth_header = self._oauth.get_auth_header("POST", TWITTER_DM_MARK_READ, form_data)
949 async with self._session.post(
950 TWITTER_DM_MARK_READ,
951 headers={"Authorization": auth_header},
952 data=form_data,
953 ) as response:
954 return response.status == 204
956 except Exception as e:
957 logger.error(f"Error marking read: {e}")
958 return False
960 async def send_dm_with_quick_replies(
961 self,
962 user_id: str,
963 text: str,
964 options: List[QuickReplyOption],
965 ) -> SendResult:
966 """Send a DM with quick reply options."""
967 buttons = [
968 {"text": opt.label, "callback_data": opt.metadata or opt.label}
969 for opt in options
970 ]
971 return await self.send_dm(user_id, text, buttons=buttons)
973 async def send_dm_with_cta(
974 self,
975 user_id: str,
976 text: str,
977 ctas: List[CallToAction],
978 ) -> SendResult:
979 """Send a DM with call-to-action buttons."""
980 if not self._session or not self._oauth:
981 return SendResult(success=False, error="Not connected")
983 try:
984 message_data = {
985 "text": text,
986 "ctas": [cta.to_dict() for cta in ctas[:3]],
987 }
989 payload = {
990 "event": {
991 "type": "message_create",
992 "message_create": {
993 "target": {"recipient_id": user_id},
994 "message_data": message_data,
995 }
996 }
997 }
999 auth_header = self._oauth.get_auth_header("POST", TWITTER_DM_SEND)
1001 async with self._session.post(
1002 TWITTER_DM_SEND,
1003 headers={
1004 "Authorization": auth_header,
1005 "Content-Type": "application/json",
1006 },
1007 json=payload,
1008 ) as response:
1009 data = await response.json()
1011 if response.status == 200:
1012 event = data.get("event", {})
1013 return SendResult(success=True, message_id=event.get("id"))
1014 else:
1015 errors = data.get("errors", [{}])
1016 error_msg = errors[0].get("message", "Unknown error") if errors else "Unknown error"
1017 return SendResult(success=False, error=error_msg)
1019 except Exception as e:
1020 logger.error(f"Failed to send DM with CTA: {e}")
1021 return SendResult(success=False, error=str(e))
def create_twitter_adapter(
    consumer_key: str = None,
    consumer_secret: str = None,
    access_token: str = None,
    access_token_secret: str = None,
    **kwargs
) -> TwitterAdapter:
    """
    Build a TwitterAdapter from explicit credentials or the environment.

    Args:
        consumer_key: Twitter API consumer key (or set TWITTER_CONSUMER_KEY env var)
        consumer_secret: Twitter API consumer secret (or set TWITTER_CONSUMER_SECRET env var)
        access_token: Twitter access token (or set TWITTER_ACCESS_TOKEN env var)
        access_token_secret: Twitter access token secret (or set TWITTER_ACCESS_TOKEN_SECRET env var)
        **kwargs: Additional config options passed to TwitterConfig

    Returns:
        Configured TwitterAdapter

    Raises:
        ValueError: when any of the four credentials is still empty
            after the environment fallback.
    """
    # Explicit arguments win; empty ones fall back to the environment
    credentials = {
        "consumer_key": consumer_key or os.getenv("TWITTER_CONSUMER_KEY"),
        "consumer_secret": consumer_secret or os.getenv("TWITTER_CONSUMER_SECRET"),
        "access_token": access_token or os.getenv("TWITTER_ACCESS_TOKEN"),
        "access_token_secret": access_token_secret or os.getenv("TWITTER_ACCESS_TOKEN_SECRET"),
    }

    if not all(credentials.values()):
        raise ValueError("Twitter OAuth credentials required")

    return TwitterAdapter(TwitterConfig(**credentials, **kwargs))