Coverage for integrations / channels / extensions / twitch_adapter.py: 25.0%
520 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Twitch Channel Adapter

Implements Twitch chat integration with IRC and Helix API.
Based on HevolveBot extension patterns for Twitch.

Features:
- IRC chat integration (TMI.js compatible)
- Whispers (private messages)
- Chat commands with prefix support
- Bits/Cheers events
- Channel point redemptions
- Emote handling
- Subscriber detection
- VIP/Moderator detection
- Raid/Host events
- EventSub webhooks
- Reconnection with exponential backoff
"""
21from __future__ import annotations
23import asyncio
24import logging
25import os
26import json
27import re
28import ssl
29import time
30from typing import Optional, List, Dict, Any, Callable, Set
31from datetime import datetime
32from dataclasses import dataclass, field
33from enum import Enum
35try:
36 import aiohttp
37 import websockets
38 HAS_TWITCH = True
39except ImportError:
40 HAS_TWITCH = False
42from ..base import (
43 ChannelAdapter,
44 ChannelConfig,
45 ChannelStatus,
46 Message,
47 MessageType,
48 MediaAttachment,
49 SendResult,
50 ChannelConnectionError,
51 ChannelSendError,
52 ChannelRateLimitError,
53)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Twitch IRC server
# WebSocket endpoint that TwitchAdapter.connect() opens.
TWITCH_IRC_URL = "wss://irc-ws.chat.twitch.tv:443"
# Host/port pair; not referenced by the code visible in this module.
TWITCH_IRC_HOST = "irc.chat.twitch.tv"
TWITCH_IRC_PORT = 6697

# Twitch Helix API
# Base URL for Helix REST calls (users, channels, whispers).
TWITCH_HELIX_URL = "https://api.twitch.tv/helix"
# Token endpoint used by TwitchAdapter.refresh_token().
TWITCH_AUTH_URL = "https://id.twitch.tv/oauth2/token"
class TwitchUserType(Enum):
    """Primary role assigned to a chat user by this adapter."""

    # Each member's value is the short label for that role.
    NORMAL = "normal"
    VIP = "vip"
    MODERATOR = "mod"
    BROADCASTER = "broadcaster"
    SUBSCRIBER = "subscriber"
@dataclass
class TwitchConfig(ChannelConfig):
    """Settings for the Twitch adapter, extending the shared ChannelConfig."""

    # Application credentials and OAuth tokens.
    client_id: str = ""
    client_secret: str = ""
    access_token: str = ""
    refresh_token: str = ""

    # Bot login name and the channels joined during connect().
    bot_username: str = ""
    channels: List[str] = field(default_factory=list)

    # Leading text that marks a chat message as a command.
    command_prefix: str = "!"

    # Feature switches. Whispers and bits are checked by the adapter; the
    # channel-point and EventSub settings are not read by the code visible
    # in this module.
    enable_whispers: bool = True
    enable_bits: bool = True
    enable_channel_points: bool = True
    enable_eventsub: bool = False
    eventsub_callback_url: str = ""
    eventsub_secret: str = ""

    # Reconnect policy: attempt limit and base delay in seconds, which the
    # adapter doubles on each successive attempt.
    reconnect_attempts: int = 5
    reconnect_delay: float = 1.0
@dataclass
class TwitchUser:
    """Snapshot of a Twitch user as seen by the adapter."""

    # Identity.
    id: str
    login: str
    display_name: str

    # Single "highest" role plus the individual role flags.
    user_type: TwitchUserType = TwitchUserType.NORMAL
    is_subscriber: bool = False
    is_mod: bool = False
    is_vip: bool = False
    is_broadcaster: bool = False

    # Badge name -> badge version, and the user's chat colour if any.
    badges: Dict[str, str] = field(default_factory=dict)
    color: Optional[str] = None
@dataclass
class TwitchBitsEvent:
    """A cheer: one chat message that carried bits."""

    user: TwitchUser      # who cheered
    channel: str          # channel the cheer was sent in
    bits: int             # number of bits attached to the message
    message: str          # text of the cheering message
    timestamp: datetime   # when the adapter built the event
@dataclass
class TwitchRedemptionEvent:
    """A channel-point reward redemption."""

    user: TwitchUser                  # who redeemed the reward
    channel: str                      # channel the reward belongs to
    reward_id: str                    # identifier of the reward
    reward_title: str                 # title of the reward
    user_input: Optional[str] = None  # text supplied with the redemption, if any
    # Defaults to the moment the event object is created.
    timestamp: datetime = field(default_factory=datetime.now)
class TwitchAdapter(ChannelAdapter):
    """
    Channel adapter for Twitch chat, combining the IRC WebSocket gateway
    with the Helix REST API.

    Usage:
        config = TwitchConfig(
            client_id="your-client-id",
            client_secret="your-client-secret",
            access_token="your-oauth-token",
            bot_username="your_bot",
            channels=["channel1", "channel2"],
        )
        adapter = TwitchAdapter(config)
        adapter.on_message(my_handler)
        await adapter.start()
    """

    def __init__(self, config: TwitchConfig):
        """
        Store the configuration and initialise empty connection state.

        No network activity happens here; see connect().

        Raises:
            ImportError: if the optional ``websockets``/``aiohttp``
                dependencies could not be imported.
        """
        if not HAS_TWITCH:
            raise ImportError(
                "websockets and aiohttp not installed. "
                "Install with: pip install websockets aiohttp"
            )

        super().__init__(config)
        self.twitch_config: TwitchConfig = config

        # Network handles and the background tasks that service them.
        self._ws: Optional[websockets.WebSocketClientProtocol] = None
        self._session: Optional[aiohttp.ClientSession] = None
        self._read_task: Optional[asyncio.Task] = None
        self._ping_task: Optional[asyncio.Task] = None

        # Chat state: joined channels (stored with a leading "#"), the
        # per-channel time of the last sent message, and known users.
        self._joined_channels: Set[str] = set()
        self._last_message_time: Dict[str, float] = {}
        self._user_cache: Dict[str, TwitchUser] = {}

        # Registered callbacks.
        self._command_handlers: Dict[str, Callable] = {}
        self._bits_handlers: List[Callable] = []
        self._redemption_handlers: List[Callable] = []

        # Consecutive reconnect attempts since the last successful connect.
        self._reconnect_count: int = 0
170 @property
171 def name(self) -> str:
172 return "twitch"
174 async def connect(self) -> bool:
175 """Connect to Twitch IRC."""
176 if not self.twitch_config.access_token:
177 logger.error("Twitch access token required")
178 return False
180 if not self.twitch_config.bot_username:
181 logger.error("Twitch bot username required")
182 return False
184 try:
185 # Create aiohttp session for API calls
186 self._session = aiohttp.ClientSession()
188 # Connect to IRC
189 self._ws = await websockets.connect(
190 TWITCH_IRC_URL,
191 ssl=ssl.create_default_context(),
192 )
194 # Authenticate
195 await self._authenticate()
197 # Request capabilities
198 await self._request_capabilities()
200 # Join channels
201 for channel in self.twitch_config.channels:
202 await self.join_channel(channel)
204 # Start read loop
205 self._read_task = asyncio.create_task(self._read_loop())
207 # Start ping loop
208 self._ping_task = asyncio.create_task(self._ping_loop())
210 self.status = ChannelStatus.CONNECTED
211 self._reconnect_count = 0
212 logger.info(f"Twitch connected as {self.twitch_config.bot_username}")
213 return True
215 except Exception as e:
216 logger.error(f"Failed to connect to Twitch: {e}")
217 self.status = ChannelStatus.ERROR
218 return False
220 async def disconnect(self) -> None:
221 """Disconnect from Twitch IRC."""
222 if self._read_task:
223 self._read_task.cancel()
224 try:
225 await self._read_task
226 except asyncio.CancelledError:
227 pass
229 if self._ping_task:
230 self._ping_task.cancel()
231 try:
232 await self._ping_task
233 except asyncio.CancelledError:
234 pass
236 if self._ws:
237 await self._ws.close()
238 self._ws = None
240 if self._session:
241 await self._session.close()
242 self._session = None
244 self._joined_channels.clear()
245 self.status = ChannelStatus.DISCONNECTED
247 async def _authenticate(self) -> None:
248 """Authenticate with Twitch IRC."""
249 if not self._ws:
250 return
252 # OAuth token format
253 oauth_token = self.twitch_config.access_token
254 if not oauth_token.startswith("oauth:"):
255 oauth_token = f"oauth:{oauth_token}"
257 await self._ws.send(f"PASS {oauth_token}")
258 await self._ws.send(f"NICK {self.twitch_config.bot_username}")
260 async def _request_capabilities(self) -> None:
261 """Request Twitch IRC capabilities."""
262 if not self._ws:
263 return
265 # Request tags, commands, and membership capabilities
266 await self._ws.send("CAP REQ :twitch.tv/tags twitch.tv/commands twitch.tv/membership")
268 async def join_channel(self, channel: str) -> bool:
269 """Join a Twitch channel."""
270 if not self._ws:
271 return False
273 channel = channel.lower()
274 if not channel.startswith("#"):
275 channel = f"#{channel}"
277 try:
278 await self._ws.send(f"JOIN {channel}")
279 self._joined_channels.add(channel)
280 logger.info(f"Joined Twitch channel: {channel}")
281 return True
282 except Exception as e:
283 logger.error(f"Failed to join channel {channel}: {e}")
284 return False
286 async def leave_channel(self, channel: str) -> bool:
287 """Leave a Twitch channel."""
288 if not self._ws:
289 return False
291 channel = channel.lower()
292 if not channel.startswith("#"):
293 channel = f"#{channel}"
295 try:
296 await self._ws.send(f"PART {channel}")
297 self._joined_channels.discard(channel)
298 logger.info(f"Left Twitch channel: {channel}")
299 return True
300 except Exception as e:
301 logger.error(f"Failed to leave channel {channel}: {e}")
302 return False
304 async def _read_loop(self) -> None:
305 """Read messages from IRC connection."""
306 while self._ws and self.status == ChannelStatus.CONNECTED:
307 try:
308 raw_message = await self._ws.recv()
309 await self._handle_raw_message(raw_message)
311 except websockets.ConnectionClosed:
312 logger.warning("Twitch IRC connection closed")
313 await self._handle_disconnect()
314 break
316 except asyncio.CancelledError:
317 break
319 except Exception as e:
320 logger.error(f"Error reading Twitch message: {e}")
322 async def _ping_loop(self) -> None:
323 """Send periodic PINGs to keep connection alive."""
324 while self._ws and self.status == ChannelStatus.CONNECTED:
325 try:
326 await asyncio.sleep(60)
327 if self._ws:
328 await self._ws.send("PING :tmi.twitch.tv")
329 except asyncio.CancelledError:
330 break
331 except Exception as e:
332 logger.error(f"Ping error: {e}")
334 async def _handle_disconnect(self) -> None:
335 """Handle disconnection with reconnection logic."""
336 if self._reconnect_count < self.twitch_config.reconnect_attempts:
337 self._reconnect_count += 1
338 delay = self.twitch_config.reconnect_delay * (2 ** (self._reconnect_count - 1))
339 logger.info(f"Reconnecting to Twitch in {delay}s (attempt {self._reconnect_count})")
341 await asyncio.sleep(delay)
342 await self.connect()
343 else:
344 self.status = ChannelStatus.ERROR
345 logger.error("Max reconnection attempts reached")
347 async def _handle_raw_message(self, raw: str) -> None:
348 """Parse and handle raw IRC message."""
349 for line in raw.strip().split("\r\n"):
350 if not line:
351 continue
353 # Handle PING
354 if line.startswith("PING"):
355 await self._ws.send(line.replace("PING", "PONG"))
356 continue
358 # Parse IRC message
359 parsed = self._parse_irc_message(line)
360 if not parsed:
361 continue
363 command = parsed.get("command")
365 if command == "PRIVMSG":
366 await self._handle_privmsg(parsed)
367 elif command == "WHISPER":
368 await self._handle_whisper(parsed)
369 elif command == "USERNOTICE":
370 await self._handle_usernotice(parsed)
371 elif command == "CLEARCHAT":
372 await self._handle_clearchat(parsed)
373 elif command == "CLEARMSG":
374 await self._handle_clearmsg(parsed)
376 def _parse_irc_message(self, raw: str) -> Optional[Dict[str, Any]]:
377 """Parse IRC message into components."""
378 tags = {}
379 prefix = None
380 command = None
381 params = []
383 idx = 0
385 # Parse tags
386 if raw.startswith("@"):
387 space_idx = raw.index(" ")
388 tag_str = raw[1:space_idx]
389 for tag in tag_str.split(";"):
390 if "=" in tag:
391 key, value = tag.split("=", 1)
392 tags[key] = value.replace("\\s", " ").replace("\\n", "\n")
393 else:
394 tags[tag] = True
395 idx = space_idx + 1
397 # Parse prefix
398 if raw[idx] == ":":
399 space_idx = raw.index(" ", idx)
400 prefix = raw[idx + 1:space_idx]
401 idx = space_idx + 1
403 # Parse command
404 try:
405 space_idx = raw.index(" ", idx)
406 command = raw[idx:space_idx]
407 idx = space_idx + 1
408 except ValueError:
409 command = raw[idx:]
410 return {"tags": tags, "prefix": prefix, "command": command, "params": []}
412 # Parse params
413 while idx < len(raw):
414 if raw[idx] == ":":
415 params.append(raw[idx + 1:])
416 break
417 else:
418 try:
419 space_idx = raw.index(" ", idx)
420 params.append(raw[idx:space_idx])
421 idx = space_idx + 1
422 except ValueError:
423 params.append(raw[idx:])
424 break
426 return {
427 "tags": tags,
428 "prefix": prefix,
429 "command": command,
430 "params": params,
431 }
433 async def _handle_privmsg(self, parsed: Dict[str, Any]) -> None:
434 """Handle PRIVMSG (chat message)."""
435 tags = parsed["tags"]
436 prefix = parsed["prefix"]
437 params = parsed["params"]
439 if len(params) < 2:
440 return
442 channel = params[0]
443 text = params[1]
445 # Extract user info
446 username = prefix.split("!")[0] if "!" in prefix else prefix
447 user = self._parse_user_from_tags(tags, username)
449 # Check for bits
450 if "bits" in tags and self.twitch_config.enable_bits:
451 bits = int(tags["bits"])
452 await self._handle_bits(user, channel, bits, text)
454 # Create message
455 message = Message(
456 id=tags.get("id", str(int(time.time() * 1000))),
457 channel=self.name,
458 sender_id=tags.get("user-id", username),
459 sender_name=user.display_name,
460 chat_id=channel,
461 text=text,
462 timestamp=datetime.now(),
463 is_group=True,
464 is_bot_mentioned=self.twitch_config.bot_username.lower() in text.lower(),
465 raw={
466 "tags": tags,
467 "user": user,
468 "emotes": tags.get("emotes", ""),
469 },
470 )
472 # Check for command
473 if text.startswith(self.twitch_config.command_prefix):
474 await self._handle_command(message, user)
475 else:
476 await self._dispatch_message(message)
478 async def _handle_whisper(self, parsed: Dict[str, Any]) -> None:
479 """Handle WHISPER (private message)."""
480 if not self.twitch_config.enable_whispers:
481 return
483 tags = parsed["tags"]
484 prefix = parsed["prefix"]
485 params = parsed["params"]
487 if len(params) < 2:
488 return
490 text = params[1]
491 username = prefix.split("!")[0] if "!" in prefix else prefix
492 user = self._parse_user_from_tags(tags, username)
494 message = Message(
495 id=tags.get("message-id", str(int(time.time() * 1000))),
496 channel=self.name,
497 sender_id=tags.get("user-id", username),
498 sender_name=user.display_name,
499 chat_id=f"whisper:{username}",
500 text=text,
501 timestamp=datetime.now(),
502 is_group=False,
503 raw={
504 "tags": tags,
505 "user": user,
506 "is_whisper": True,
507 },
508 )
510 await self._dispatch_message(message)
512 async def _handle_usernotice(self, parsed: Dict[str, Any]) -> None:
513 """Handle USERNOTICE (subs, raids, etc.)."""
514 tags = parsed["tags"]
515 msg_id = tags.get("msg-id")
517 if msg_id == "sub" or msg_id == "resub":
518 logger.info(f"Subscription: {tags.get('display-name')}")
519 elif msg_id == "raid":
520 logger.info(f"Raid from {tags.get('display-name')} with {tags.get('msg-param-viewerCount')} viewers")
522 async def _handle_clearchat(self, parsed: Dict[str, Any]) -> None:
523 """Handle CLEARCHAT (timeout/ban)."""
524 tags = parsed["tags"]
525 params = parsed["params"]
527 if len(params) >= 2:
528 logger.info(f"User {params[1]} was timed out/banned")
530 async def _handle_clearmsg(self, parsed: Dict[str, Any]) -> None:
531 """Handle CLEARMSG (message deleted)."""
532 tags = parsed["tags"]
533 logger.info(f"Message deleted: {tags.get('target-msg-id')}")
535 def _parse_user_from_tags(self, tags: Dict[str, Any], username: str) -> TwitchUser:
536 """Parse TwitchUser from IRC tags."""
537 badges = {}
538 if "badges" in tags and tags["badges"]:
539 for badge in tags["badges"].split(","):
540 if "/" in badge:
541 name, version = badge.split("/", 1)
542 badges[name] = version
544 user_type = TwitchUserType.NORMAL
545 is_broadcaster = "broadcaster" in badges
546 is_mod = "moderator" in badges or tags.get("mod") == "1"
547 is_vip = "vip" in badges
548 is_subscriber = "subscriber" in badges or tags.get("subscriber") == "1"
550 if is_broadcaster:
551 user_type = TwitchUserType.BROADCASTER
552 elif is_mod:
553 user_type = TwitchUserType.MODERATOR
554 elif is_vip:
555 user_type = TwitchUserType.VIP
556 elif is_subscriber:
557 user_type = TwitchUserType.SUBSCRIBER
559 return TwitchUser(
560 id=tags.get("user-id", username),
561 login=username,
562 display_name=tags.get("display-name", username),
563 user_type=user_type,
564 is_subscriber=is_subscriber,
565 is_mod=is_mod,
566 is_vip=is_vip,
567 is_broadcaster=is_broadcaster,
568 badges=badges,
569 color=tags.get("color"),
570 )
572 async def _handle_command(self, message: Message, user: TwitchUser) -> None:
573 """Handle chat command."""
574 text = message.text[len(self.twitch_config.command_prefix):]
575 parts = text.split(maxsplit=1)
576 command = parts[0].lower()
577 args = parts[1] if len(parts) > 1 else ""
579 if command in self._command_handlers:
580 handler = self._command_handlers[command]
581 try:
582 await handler(message, user, args)
583 except Exception as e:
584 logger.error(f"Command handler error: {e}")
585 else:
586 # Dispatch as regular message
587 await self._dispatch_message(message)
589 async def _handle_bits(
590 self,
591 user: TwitchUser,
592 channel: str,
593 bits: int,
594 message: str,
595 ) -> None:
596 """Handle bits/cheer event."""
597 event = TwitchBitsEvent(
598 user=user,
599 channel=channel,
600 bits=bits,
601 message=message,
602 timestamp=datetime.now(),
603 )
605 for handler in self._bits_handlers:
606 try:
607 result = handler(event)
608 if asyncio.iscoroutine(result):
609 await result
610 except Exception as e:
611 logger.error(f"Bits handler error: {e}")
613 async def send_message(
614 self,
615 chat_id: str,
616 text: str,
617 reply_to: Optional[str] = None,
618 media: Optional[List[MediaAttachment]] = None,
619 buttons: Optional[List[Dict]] = None,
620 ) -> SendResult:
621 """Send a message to a Twitch channel."""
622 if not self._ws:
623 return SendResult(success=False, error="Not connected")
625 try:
626 channel = chat_id
627 if not channel.startswith("#"):
628 channel = f"#{channel}"
630 # Handle whispers
631 if chat_id.startswith("whisper:"):
632 username = chat_id.replace("whisper:", "")
633 return await self.send_whisper(username, text)
635 # Rate limiting (20 messages per 30 seconds for normal users)
636 now = time.time()
637 if channel in self._last_message_time:
638 elapsed = now - self._last_message_time[channel]
639 if elapsed < 1.5: # Simple rate limit
640 await asyncio.sleep(1.5 - elapsed)
642 # Send with reply if specified
643 if reply_to:
644 await self._ws.send(f"@reply-parent-msg-id={reply_to} PRIVMSG {channel} :{text}")
645 else:
646 await self._ws.send(f"PRIVMSG {channel} :{text}")
648 self._last_message_time[channel] = time.time()
649 return SendResult(success=True)
651 except Exception as e:
652 logger.error(f"Failed to send Twitch message: {e}")
653 return SendResult(success=False, error=str(e))
655 async def send_whisper(self, username: str, text: str) -> SendResult:
656 """Send a whisper (private message)."""
657 if not self._ws:
658 return SendResult(success=False, error="Not connected")
660 if not self.twitch_config.enable_whispers:
661 return SendResult(success=False, error="Whispers disabled")
663 try:
664 # Whispers are sent via PRIVMSG to #jtv (legacy) or API
665 # Modern approach uses Helix API
666 if self._session and self.twitch_config.client_id:
667 return await self._send_whisper_api(username, text)
668 else:
669 # Legacy IRC whisper (may not work)
670 await self._ws.send(f"PRIVMSG #jtv :/w {username} {text}")
671 return SendResult(success=True)
673 except Exception as e:
674 logger.error(f"Failed to send whisper: {e}")
675 return SendResult(success=False, error=str(e))
677 async def _send_whisper_api(self, username: str, text: str) -> SendResult:
678 """Send whisper via Helix API."""
679 if not self._session:
680 return SendResult(success=False, error="No session")
682 try:
683 # Get user ID
684 user_id = await self._get_user_id(username)
685 if not user_id:
686 return SendResult(success=False, error="User not found")
688 # Get bot user ID
689 bot_id = await self._get_user_id(self.twitch_config.bot_username)
690 if not bot_id:
691 return SendResult(success=False, error="Bot user ID not found")
693 headers = {
694 "Authorization": f"Bearer {self.twitch_config.access_token}",
695 "Client-Id": self.twitch_config.client_id,
696 "Content-Type": "application/json",
697 }
699 params = {
700 "from_user_id": bot_id,
701 "to_user_id": user_id,
702 }
704 data = {"message": text}
706 async with self._session.post(
707 f"{TWITCH_HELIX_URL}/whispers",
708 headers=headers,
709 params=params,
710 json=data,
711 ) as resp:
712 if resp.status == 204:
713 return SendResult(success=True)
714 else:
715 error = await resp.text()
716 return SendResult(success=False, error=error)
718 except Exception as e:
719 return SendResult(success=False, error=str(e))
721 async def _get_user_id(self, username: str) -> Optional[str]:
722 """Get user ID from username via Helix API."""
723 if username in self._user_cache:
724 return self._user_cache[username].id
726 if not self._session:
727 return None
729 try:
730 headers = {
731 "Authorization": f"Bearer {self.twitch_config.access_token}",
732 "Client-Id": self.twitch_config.client_id,
733 }
735 async with self._session.get(
736 f"{TWITCH_HELIX_URL}/users",
737 headers=headers,
738 params={"login": username},
739 ) as resp:
740 if resp.status == 200:
741 data = await resp.json()
742 if data.get("data"):
743 user_data = data["data"][0]
744 return user_data["id"]
746 except Exception as e:
747 logger.error(f"Failed to get user ID: {e}")
749 return None
751 async def edit_message(
752 self,
753 chat_id: str,
754 message_id: str,
755 text: str,
756 buttons: Optional[List[Dict]] = None,
757 ) -> SendResult:
758 """
759 Edit a Twitch message.
760 Note: Twitch doesn't support message editing.
761 """
762 logger.warning("Twitch doesn't support message editing")
763 return SendResult(success=False, error="Not supported")
765 async def delete_message(self, chat_id: str, message_id: str) -> bool:
766 """Delete a Twitch message (requires mod privileges)."""
767 if not self._ws:
768 return False
770 try:
771 channel = chat_id
772 if not channel.startswith("#"):
773 channel = f"#{channel}"
775 await self._ws.send(f"PRIVMSG {channel} :/delete {message_id}")
776 return True
778 except Exception as e:
779 logger.error(f"Failed to delete message: {e}")
780 return False
782 async def send_typing(self, chat_id: str) -> None:
783 """
784 Send typing indicator.
785 Note: Twitch doesn't support typing indicators.
786 """
787 pass
789 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
790 """Get information about a Twitch channel."""
791 if not self._session:
792 return None
794 channel = chat_id.lstrip("#")
796 try:
797 headers = {
798 "Authorization": f"Bearer {self.twitch_config.access_token}",
799 "Client-Id": self.twitch_config.client_id,
800 }
802 async with self._session.get(
803 f"{TWITCH_HELIX_URL}/channels",
804 headers=headers,
805 params={"broadcaster_login": channel},
806 ) as resp:
807 if resp.status == 200:
808 data = await resp.json()
809 if data.get("data"):
810 channel_data = data["data"][0]
811 return {
812 "id": channel_data["broadcaster_id"],
813 "name": channel_data["broadcaster_name"],
814 "title": channel_data["title"],
815 "game": channel_data.get("game_name"),
816 "language": channel_data.get("broadcaster_language"),
817 }
819 except Exception as e:
820 logger.error(f"Failed to get channel info: {e}")
822 return None
824 # Twitch-specific methods
826 def register_command(
827 self,
828 command: str,
829 handler: Callable[[Message, TwitchUser, str], Any],
830 ) -> None:
831 """Register a chat command handler."""
832 self._command_handlers[command.lower()] = handler
834 def on_bits(self, handler: Callable[[TwitchBitsEvent], Any]) -> None:
835 """Register a bits/cheer event handler."""
836 self._bits_handlers.append(handler)
838 def on_redemption(self, handler: Callable[[TwitchRedemptionEvent], Any]) -> None:
839 """Register a channel point redemption handler."""
840 self._redemption_handlers.append(handler)
842 async def timeout_user(
843 self,
844 channel: str,
845 username: str,
846 duration: int,
847 reason: str = "",
848 ) -> bool:
849 """Timeout a user (requires mod privileges)."""
850 if not self._ws:
851 return False
853 try:
854 if not channel.startswith("#"):
855 channel = f"#{channel}"
857 cmd = f"/timeout {username} {duration}"
858 if reason:
859 cmd += f" {reason}"
861 await self._ws.send(f"PRIVMSG {channel} :{cmd}")
862 return True
864 except Exception as e:
865 logger.error(f"Failed to timeout user: {e}")
866 return False
868 async def ban_user(
869 self,
870 channel: str,
871 username: str,
872 reason: str = "",
873 ) -> bool:
874 """Ban a user (requires mod privileges)."""
875 if not self._ws:
876 return False
878 try:
879 if not channel.startswith("#"):
880 channel = f"#{channel}"
882 cmd = f"/ban {username}"
883 if reason:
884 cmd += f" {reason}"
886 await self._ws.send(f"PRIVMSG {channel} :{cmd}")
887 return True
889 except Exception as e:
890 logger.error(f"Failed to ban user: {e}")
891 return False
893 async def unban_user(self, channel: str, username: str) -> bool:
894 """Unban a user (requires mod privileges)."""
895 if not self._ws:
896 return False
898 try:
899 if not channel.startswith("#"):
900 channel = f"#{channel}"
902 await self._ws.send(f"PRIVMSG {channel} :/unban {username}")
903 return True
905 except Exception as e:
906 logger.error(f"Failed to unban user: {e}")
907 return False
909 async def clear_chat(self, channel: str) -> bool:
910 """Clear chat (requires mod privileges)."""
911 if not self._ws:
912 return False
914 try:
915 if not channel.startswith("#"):
916 channel = f"#{channel}"
918 await self._ws.send(f"PRIVMSG {channel} :/clear")
919 return True
921 except Exception as e:
922 logger.error(f"Failed to clear chat: {e}")
923 return False
925 async def set_slow_mode(self, channel: str, seconds: int) -> bool:
926 """Set slow mode (requires mod privileges)."""
927 if not self._ws:
928 return False
930 try:
931 if not channel.startswith("#"):
932 channel = f"#{channel}"
934 if seconds > 0:
935 await self._ws.send(f"PRIVMSG {channel} :/slow {seconds}")
936 else:
937 await self._ws.send(f"PRIVMSG {channel} :/slowoff")
938 return True
940 except Exception as e:
941 logger.error(f"Failed to set slow mode: {e}")
942 return False
944 async def announce(self, channel: str, message: str, color: str = "") -> bool:
945 """Send an announcement (requires mod privileges)."""
946 if not self._ws:
947 return False
949 try:
950 if not channel.startswith("#"):
951 channel = f"#{channel}"
953 cmd = f"/announce {message}"
954 if color:
955 cmd = f"/announce{color} {message}"
957 await self._ws.send(f"PRIVMSG {channel} :{cmd}")
958 return True
960 except Exception as e:
961 logger.error(f"Failed to send announcement: {e}")
962 return False
964 async def refresh_token(self) -> bool:
965 """Refresh OAuth token using refresh token."""
966 if not self.twitch_config.refresh_token:
967 return False
969 if not self._session:
970 return False
972 try:
973 data = {
974 "grant_type": "refresh_token",
975 "refresh_token": self.twitch_config.refresh_token,
976 "client_id": self.twitch_config.client_id,
977 "client_secret": self.twitch_config.client_secret,
978 }
980 async with self._session.post(TWITCH_AUTH_URL, data=data) as resp:
981 if resp.status == 200:
982 token_data = await resp.json()
983 self.twitch_config.access_token = token_data["access_token"]
984 if "refresh_token" in token_data:
985 self.twitch_config.refresh_token = token_data["refresh_token"]
986 logger.info("Twitch token refreshed")
987 return True
988 else:
989 logger.error(f"Failed to refresh token: {await resp.text()}")
990 return False
992 except Exception as e:
993 logger.error(f"Token refresh error: {e}")
994 return False
def create_twitch_adapter(
    client_id: Optional[str] = None,
    client_secret: Optional[str] = None,
    access_token: Optional[str] = None,
    bot_username: Optional[str] = None,
    channels: Optional[List[str]] = None,
    **kwargs
) -> TwitchAdapter:
    """
    Factory function to create Twitch adapter.

    Each argument falls back to an environment variable when it is not
    given (or is empty).

    Args:
        client_id: Twitch app client ID (or set TWITCH_CLIENT_ID env var)
        client_secret: Twitch app client secret (or set TWITCH_CLIENT_SECRET env var)
        access_token: OAuth access token (or set TWITCH_ACCESS_TOKEN env var)
        bot_username: Bot's Twitch username (or set TWITCH_BOT_USERNAME env var)
        channels: List of channels to join (or set TWITCH_CHANNELS env var, comma-separated)
        **kwargs: Additional config options, passed through to TwitchConfig

    Returns:
        Configured TwitchAdapter

    Raises:
        ValueError: if no access token or no bot username is available.
    """
    client_id = client_id or os.getenv("TWITCH_CLIENT_ID")
    client_secret = client_secret or os.getenv("TWITCH_CLIENT_SECRET")
    access_token = access_token or os.getenv("TWITCH_ACCESS_TOKEN")
    bot_username = bot_username or os.getenv("TWITCH_BOT_USERNAME")

    # Only consult the environment when no list was passed at all; an
    # explicit empty list means "join nothing".
    if channels is None:
        channels_env = os.getenv("TWITCH_CHANNELS", "")
        channels = [c.strip() for c in channels_env.split(",") if c.strip()]

    if not access_token:
        raise ValueError("Twitch access token required")
    if not bot_username:
        raise ValueError("Twitch bot username required")

    config = TwitchConfig(
        client_id=client_id or "",
        client_secret=client_secret or "",
        access_token=access_token,
        bot_username=bot_username,
        channels=channels,
        **kwargs
    )
    return TwitchAdapter(config)