Coverage for integrations / channels / extensions / twitch_adapter.py: 25.0%

520 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Twitch Channel Adapter 

3 

4Implements Twitch chat integration with IRC and Helix API. 

5Based on HevolveBot extension patterns for Twitch. 

6 

7Features: 

8- IRC chat integration (TMI.js compatible) 

9- Whispers (private messages) 

10- Chat commands with prefix support 

11- Bits/Cheers events 

12- Channel point redemptions 

13- Emote handling 

14- Subscriber detection 

15- VIP/Moderator detection 

16- Raid/Host events 

17- EventSub webhooks 

18- Reconnection with exponential backoff 

19""" 

20 

21from __future__ import annotations 

22 

23import asyncio 

24import logging 

25import os 

26import json 

27import re 

28import ssl 

29import time 

30from typing import Optional, List, Dict, Any, Callable, Set 

31from datetime import datetime 

32from dataclasses import dataclass, field 

33from enum import Enum 

34 

35try: 

36 import aiohttp 

37 import websockets 

38 HAS_TWITCH = True 

39except ImportError: 

40 HAS_TWITCH = False 

41 

42from ..base import ( 

43 ChannelAdapter, 

44 ChannelConfig, 

45 ChannelStatus, 

46 Message, 

47 MessageType, 

48 MediaAttachment, 

49 SendResult, 

50 ChannelConnectionError, 

51 ChannelSendError, 

52 ChannelRateLimitError, 

53) 

54 

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Twitch chat endpoints: the WebSocket URL used by the adapter, plus a plain
# IRC host/port pair (not referenced by the code visible in this module).
TWITCH_IRC_URL = "wss://irc-ws.chat.twitch.tv:443"
TWITCH_IRC_HOST = "irc.chat.twitch.tv"
TWITCH_IRC_PORT = 6697

# Twitch HTTP endpoints: Helix REST base URL and the OAuth2 token endpoint.
TWITCH_HELIX_URL = "https://api.twitch.tv/helix"
TWITCH_AUTH_URL = "https://id.twitch.tv/oauth2/token"

65 

66 

class TwitchUserType(Enum):
    """Role a chatter holds in a Twitch channel."""

    # Each member's value is the short string identifier for that role.
    NORMAL = "normal"
    VIP = "vip"
    MODERATOR = "mod"
    BROADCASTER = "broadcaster"
    SUBSCRIBER = "subscriber"

74 

75 

@dataclass
class TwitchConfig(ChannelConfig):
    """Settings for the Twitch adapter, layered on top of ``ChannelConfig``."""

    # Application / OAuth credentials.
    client_id: str = ""
    client_secret: str = ""
    access_token: str = ""
    refresh_token: str = ""

    # Chat identity: the account the bot logs in as and the channels it
    # joins on connect.
    bot_username: str = ""
    channels: List[str] = field(default_factory=list)

    # Messages starting with this prefix are treated as commands.
    command_prefix: str = "!"

    # Feature switches.
    enable_whispers: bool = True
    enable_bits: bool = True
    enable_channel_points: bool = True
    enable_eventsub: bool = False
    eventsub_callback_url: str = ""
    eventsub_secret: str = ""

    # Reconnect policy: number of attempts and the base delay in seconds
    # (the adapter doubles the delay on each successive attempt).
    reconnect_attempts: int = 5
    reconnect_delay: float = 1.0

94 

95 

@dataclass
class TwitchUser:
    """A chatter's identity together with the roles they hold in a channel."""

    # Identity.
    id: str
    login: str
    display_name: str

    # Single summarised role, plus the individual role flags it is derived from.
    user_type: TwitchUserType = TwitchUserType.NORMAL
    is_subscriber: bool = False
    is_mod: bool = False
    is_vip: bool = False
    is_broadcaster: bool = False

    # Badge name -> badge version, and the chat colour if one was supplied.
    badges: Dict[str, str] = field(default_factory=dict)
    color: Optional[str] = None

109 

110 

@dataclass
class TwitchBitsEvent:
    """Payload handed to bits handlers when a chat message carries a cheer."""

    user: TwitchUser  # who cheered
    channel: str  # channel the cheer was posted in
    bits: int  # number of bits attached to the message
    message: str  # text of the chat message
    timestamp: datetime  # when the adapter created the event

119 

120 

@dataclass
class TwitchRedemptionEvent:
    """Payload describing a channel point reward redemption."""

    user: TwitchUser  # who redeemed the reward
    channel: str  # channel the redemption belongs to
    reward_id: str
    reward_title: str
    # Free-text input attached to the redemption, if any.
    user_input: Optional[str] = None
    # Defaults to the (naive, local) time the event object is created.
    timestamp: datetime = field(default_factory=datetime.now)

130 

131 

132class TwitchAdapter(ChannelAdapter): 

133 """ 

134 Twitch chat adapter with IRC and Helix API integration. 

135 

136 Usage: 

137 config = TwitchConfig( 

138 client_id="your-client-id", 

139 client_secret="your-client-secret", 

140 access_token="your-oauth-token", 

141 bot_username="your_bot", 

142 channels=["channel1", "channel2"], 

143 ) 

144 adapter = TwitchAdapter(config) 

145 adapter.on_message(my_handler) 

146 await adapter.start() 

147 """ 

148 

def __init__(self, config: TwitchConfig):
    """Initialise adapter state; no network I/O is performed here.

    Args:
        config: Twitch settings; also passed to the base adapter.

    Raises:
        ImportError: if ``websockets`` / ``aiohttp`` could not be imported.
    """
    # Fail fast when the optional networking dependencies are missing.
    if not HAS_TWITCH:
        raise ImportError(
            "websockets and aiohttp not installed. "
            "Install with: pip install websockets aiohttp"
        )

    super().__init__(config)
    self.twitch_config: TwitchConfig = config

    # Live connection objects and background tasks (populated by connect()).
    self._ws: Optional[websockets.WebSocketClientProtocol] = None
    self._session: Optional[aiohttp.ClientSession] = None
    self._read_task: Optional[asyncio.Task] = None
    self._ping_task: Optional[asyncio.Task] = None

    # Chat bookkeeping.
    self._joined_channels: Set[str] = set()
    self._reconnect_count: int = 0
    self._last_message_time: Dict[str, float] = {}
    self._user_cache: Dict[str, TwitchUser] = {}

    # Registered callbacks.
    self._command_handlers: Dict[str, Callable] = {}
    self._bits_handlers: List[Callable] = []
    self._redemption_handlers: List[Callable] = []

169 

@property
def name(self) -> str:
    """Identifier of this channel adapter; always ``"twitch"``."""
    adapter_name = "twitch"
    return adapter_name

173 

async def connect(self) -> bool:
    """Open the Twitch IRC WebSocket, log in and join the configured channels.

    This method is also re-entered by the reconnect logic, so it first
    releases whatever a previous connection left behind (HTTP session,
    socket, background tasks). Without that, every reconnect would leak an
    ``aiohttp`` session and leave an extra ping loop running.

    Returns:
        True when the connection was established. False when the access
        token or bot username is missing, or when any step failed (the
        status is then set to ``ChannelStatus.ERROR`` and the partially
        opened resources are closed).
    """
    if not self.twitch_config.access_token:
        logger.error("Twitch access token required")
        return False

    if not self.twitch_config.bot_username:
        logger.error("Twitch bot username required")
        return False

    async def release_connection() -> None:
        """Best-effort teardown of stale tasks, socket and HTTP session."""
        current = asyncio.current_task()
        stale_tasks = (self._read_task, self._ping_task)
        self._read_task = self._ping_task = None
        for task in stale_tasks:
            # On the reconnect path the old read task is the caller itself;
            # it must not cancel itself and exits on its own afterwards.
            if task is not None and task is not current and not task.done():
                task.cancel()

        stale_ws, self._ws = self._ws, None
        stale_session, self._session = self._session, None
        for resource in (stale_ws, stale_session):
            if resource is None:
                continue
            try:
                await resource.close()
            except Exception as close_error:
                logger.debug(f"Error releasing Twitch resource: {close_error}")

    try:
        await release_connection()

        # Create aiohttp session for API calls
        self._session = aiohttp.ClientSession()

        # Connect to IRC
        self._ws = await websockets.connect(
            TWITCH_IRC_URL,
            ssl=ssl.create_default_context(),
        )

        await self._authenticate()
        await self._request_capabilities()

        for channel in self.twitch_config.channels:
            await self.join_channel(channel)

        # Background loops; they only start running at the next await, by
        # which time the status below is already CONNECTED.
        self._read_task = asyncio.create_task(self._read_loop())
        self._ping_task = asyncio.create_task(self._ping_loop())

        self.status = ChannelStatus.CONNECTED
        self._reconnect_count = 0
        logger.info(f"Twitch connected as {self.twitch_config.bot_username}")
        return True

    except Exception as e:
        logger.error(f"Failed to connect to Twitch: {e}")
        # Do not leave a half-open session/socket behind after a failure.
        await release_connection()
        self.status = ChannelStatus.ERROR
        return False

219 

async def disconnect(self) -> None:
    """Stop the background loops and close the socket and HTTP session.

    Every teardown step runs even if an earlier one fails: an error while
    closing the WebSocket no longer prevents the HTTP session from being
    closed or the status from being reset. An exception raised by a close
    call still propagates to the caller after the remaining cleanup.
    """
    current = asyncio.current_task()
    read_task, self._read_task = self._read_task, None
    ping_task, self._ping_task = self._ping_task, None

    for task in (read_task, ping_task):
        if task is None or task is current:
            # When called from inside one of the loops (e.g. from a message
            # handler) the task cannot cancel/await itself; it stops on its
            # own once the socket is gone and the status has changed.
            continue
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"Twitch background task failed during shutdown: {e}")

    ws, self._ws = self._ws, None
    session, self._session = self._session, None
    try:
        if ws:
            await ws.close()
    finally:
        try:
            if session:
                await session.close()
        finally:
            self._joined_channels.clear()
            self.status = ChannelStatus.DISCONNECTED

246 

247 async def _authenticate(self) -> None: 

248 """Authenticate with Twitch IRC.""" 

249 if not self._ws: 

250 return 

251 

252 # OAuth token format 

253 oauth_token = self.twitch_config.access_token 

254 if not oauth_token.startswith("oauth:"): 

255 oauth_token = f"oauth:{oauth_token}" 

256 

257 await self._ws.send(f"PASS {oauth_token}") 

258 await self._ws.send(f"NICK {self.twitch_config.bot_username}") 

259 

260 async def _request_capabilities(self) -> None: 

261 """Request Twitch IRC capabilities.""" 

262 if not self._ws: 

263 return 

264 

265 # Request tags, commands, and membership capabilities 

266 await self._ws.send("CAP REQ :twitch.tv/tags twitch.tv/commands twitch.tv/membership") 

267 

async def join_channel(self, channel: str) -> bool:
    """Send JOIN for ``channel`` and remember it as joined.

    The name is lower-cased and given a leading ``#`` if it has none.

    Returns:
        True once the JOIN line was sent; False when not connected or when
        sending failed (the failure is logged).
    """
    if not self._ws:
        return False

    lowered = channel.lower()
    channel = lowered if lowered.startswith("#") else f"#{lowered}"

    try:
        await self._ws.send(f"JOIN {channel}")
        self._joined_channels.add(channel)
        logger.info(f"Joined Twitch channel: {channel}")
    except Exception as e:
        logger.error(f"Failed to join channel {channel}: {e}")
        return False
    else:
        return True

285 

async def leave_channel(self, channel: str) -> bool:
    """Send PART for ``channel`` and forget it as joined.

    The name is lower-cased and given a leading ``#`` if it has none.

    Returns:
        True once the PART line was sent; False when not connected or when
        sending failed (the failure is logged).
    """
    if not self._ws:
        return False

    lowered = channel.lower()
    channel = lowered if lowered.startswith("#") else f"#{lowered}"

    try:
        await self._ws.send(f"PART {channel}")
        self._joined_channels.discard(channel)
        logger.info(f"Left Twitch channel: {channel}")
    except Exception as e:
        logger.error(f"Failed to leave channel {channel}: {e}")
        return False
    else:
        return True

303 

async def _read_loop(self) -> None:
    """Receive frames from the socket and hand them to the raw-message handler.

    Runs while a socket exists and the status is CONNECTED. A closed
    connection triggers the reconnect logic and ends this loop;
    cancellation ends it quietly; any other error is logged and the loop
    keeps going.
    """
    while True:
        if not self._ws or self.status != ChannelStatus.CONNECTED:
            return

        try:
            frame = await self._ws.recv()
            await self._handle_raw_message(frame)

        except websockets.ConnectionClosed:
            logger.warning("Twitch IRC connection closed")
            await self._handle_disconnect()
            return

        except asyncio.CancelledError:
            return

        except Exception as e:
            logger.error(f"Error reading Twitch message: {e}")

321 

async def _ping_loop(self) -> None:
    """Send a PING every 60 seconds while the connection is up.

    Runs while a socket exists and the status is CONNECTED. Cancellation
    ends the loop; any other error is logged and the loop continues.
    """
    while True:
        if not self._ws or self.status != ChannelStatus.CONNECTED:
            return

        try:
            await asyncio.sleep(60)
            # The socket may have been dropped while this task was sleeping.
            if self._ws:
                await self._ws.send("PING :tmi.twitch.tv")
        except asyncio.CancelledError:
            return
        except Exception as e:
            logger.error(f"Ping error: {e}")

333 

async def _handle_disconnect(self) -> None:
    """Try to reconnect with exponential backoff until the budget is used up.

    Each attempt waits ``reconnect_delay * 2 ** (attempt - 1)`` seconds and
    then calls ``connect()``. Previously a single attempt was made per
    disconnect and a failed ``connect()`` was never retried, so the
    configured ``reconnect_attempts`` had no effect. Now attempts continue
    until one succeeds (``connect()`` then resets the counter) or the
    configured maximum is reached, at which point the status becomes
    ``ChannelStatus.ERROR``.
    """
    max_attempts = self.twitch_config.reconnect_attempts

    while self._reconnect_count < max_attempts:
        self._reconnect_count += 1
        delay = self.twitch_config.reconnect_delay * (2 ** (self._reconnect_count - 1))
        logger.info(f"Reconnecting to Twitch in {delay}s (attempt {self._reconnect_count})")

        await asyncio.sleep(delay)
        if await self.connect():
            return

    self.status = ChannelStatus.ERROR
    logger.error("Max reconnection attempts reached")

346 

347 async def _handle_raw_message(self, raw: str) -> None: 

348 """Parse and handle raw IRC message.""" 

349 for line in raw.strip().split("\r\n"): 

350 if not line: 

351 continue 

352 

353 # Handle PING 

354 if line.startswith("PING"): 

355 await self._ws.send(line.replace("PING", "PONG")) 

356 continue 

357 

358 # Parse IRC message 

359 parsed = self._parse_irc_message(line) 

360 if not parsed: 

361 continue 

362 

363 command = parsed.get("command") 

364 

365 if command == "PRIVMSG": 

366 await self._handle_privmsg(parsed) 

367 elif command == "WHISPER": 

368 await self._handle_whisper(parsed) 

369 elif command == "USERNOTICE": 

370 await self._handle_usernotice(parsed) 

371 elif command == "CLEARCHAT": 

372 await self._handle_clearchat(parsed) 

373 elif command == "CLEARMSG": 

374 await self._handle_clearmsg(parsed) 

375 

376 def _parse_irc_message(self, raw: str) -> Optional[Dict[str, Any]]: 

377 """Parse IRC message into components.""" 

378 tags = {} 

379 prefix = None 

380 command = None 

381 params = [] 

382 

383 idx = 0 

384 

385 # Parse tags 

386 if raw.startswith("@"): 

387 space_idx = raw.index(" ") 

388 tag_str = raw[1:space_idx] 

389 for tag in tag_str.split(";"): 

390 if "=" in tag: 

391 key, value = tag.split("=", 1) 

392 tags[key] = value.replace("\\s", " ").replace("\\n", "\n") 

393 else: 

394 tags[tag] = True 

395 idx = space_idx + 1 

396 

397 # Parse prefix 

398 if raw[idx] == ":": 

399 space_idx = raw.index(" ", idx) 

400 prefix = raw[idx + 1:space_idx] 

401 idx = space_idx + 1 

402 

403 # Parse command 

404 try: 

405 space_idx = raw.index(" ", idx) 

406 command = raw[idx:space_idx] 

407 idx = space_idx + 1 

408 except ValueError: 

409 command = raw[idx:] 

410 return {"tags": tags, "prefix": prefix, "command": command, "params": []} 

411 

412 # Parse params 

413 while idx < len(raw): 

414 if raw[idx] == ":": 

415 params.append(raw[idx + 1:]) 

416 break 

417 else: 

418 try: 

419 space_idx = raw.index(" ", idx) 

420 params.append(raw[idx:space_idx]) 

421 idx = space_idx + 1 

422 except ValueError: 

423 params.append(raw[idx:]) 

424 break 

425 

426 return { 

427 "tags": tags, 

428 "prefix": prefix, 

429 "command": command, 

430 "params": params, 

431 } 

432 

433 async def _handle_privmsg(self, parsed: Dict[str, Any]) -> None: 

434 """Handle PRIVMSG (chat message).""" 

435 tags = parsed["tags"] 

436 prefix = parsed["prefix"] 

437 params = parsed["params"] 

438 

439 if len(params) < 2: 

440 return 

441 

442 channel = params[0] 

443 text = params[1] 

444 

445 # Extract user info 

446 username = prefix.split("!")[0] if "!" in prefix else prefix 

447 user = self._parse_user_from_tags(tags, username) 

448 

449 # Check for bits 

450 if "bits" in tags and self.twitch_config.enable_bits: 

451 bits = int(tags["bits"]) 

452 await self._handle_bits(user, channel, bits, text) 

453 

454 # Create message 

455 message = Message( 

456 id=tags.get("id", str(int(time.time() * 1000))), 

457 channel=self.name, 

458 sender_id=tags.get("user-id", username), 

459 sender_name=user.display_name, 

460 chat_id=channel, 

461 text=text, 

462 timestamp=datetime.now(), 

463 is_group=True, 

464 is_bot_mentioned=self.twitch_config.bot_username.lower() in text.lower(), 

465 raw={ 

466 "tags": tags, 

467 "user": user, 

468 "emotes": tags.get("emotes", ""), 

469 }, 

470 ) 

471 

472 # Check for command 

473 if text.startswith(self.twitch_config.command_prefix): 

474 await self._handle_command(message, user) 

475 else: 

476 await self._dispatch_message(message) 

477 

478 async def _handle_whisper(self, parsed: Dict[str, Any]) -> None: 

479 """Handle WHISPER (private message).""" 

480 if not self.twitch_config.enable_whispers: 

481 return 

482 

483 tags = parsed["tags"] 

484 prefix = parsed["prefix"] 

485 params = parsed["params"] 

486 

487 if len(params) < 2: 

488 return 

489 

490 text = params[1] 

491 username = prefix.split("!")[0] if "!" in prefix else prefix 

492 user = self._parse_user_from_tags(tags, username) 

493 

494 message = Message( 

495 id=tags.get("message-id", str(int(time.time() * 1000))), 

496 channel=self.name, 

497 sender_id=tags.get("user-id", username), 

498 sender_name=user.display_name, 

499 chat_id=f"whisper:{username}", 

500 text=text, 

501 timestamp=datetime.now(), 

502 is_group=False, 

503 raw={ 

504 "tags": tags, 

505 "user": user, 

506 "is_whisper": True, 

507 }, 

508 ) 

509 

510 await self._dispatch_message(message) 

511 

512 async def _handle_usernotice(self, parsed: Dict[str, Any]) -> None: 

513 """Handle USERNOTICE (subs, raids, etc.).""" 

514 tags = parsed["tags"] 

515 msg_id = tags.get("msg-id") 

516 

517 if msg_id == "sub" or msg_id == "resub": 

518 logger.info(f"Subscription: {tags.get('display-name')}") 

519 elif msg_id == "raid": 

520 logger.info(f"Raid from {tags.get('display-name')} with {tags.get('msg-param-viewerCount')} viewers") 

521 

522 async def _handle_clearchat(self, parsed: Dict[str, Any]) -> None: 

523 """Handle CLEARCHAT (timeout/ban).""" 

524 tags = parsed["tags"] 

525 params = parsed["params"] 

526 

527 if len(params) >= 2: 

528 logger.info(f"User {params[1]} was timed out/banned") 

529 

async def _handle_clearmsg(self, parsed: Dict[str, Any]) -> None:
    """Log the id of a chat message that was deleted (CLEARMSG)."""
    deleted_id = parsed["tags"].get("target-msg-id")
    logger.info(f"Message deleted: {deleted_id}")

534 

def _parse_user_from_tags(self, tags: Dict[str, Any], username: str) -> TwitchUser:
    """Build a ``TwitchUser`` from the IRC tags of a message.

    Args:
        tags: Parsed tag mapping of the message.
        username: Sender nick from the prefix; used as fallback for the id
            and display name.

    Returns:
        The user with role flags taken from the badges (and the ``mod`` /
        ``subscriber`` tags). ``user_type`` is the highest role held, in
        the order broadcaster, moderator, VIP, subscriber, normal.

    Fix: tags that are present but empty (``display-name=``, ``user-id=``,
    ``color=``) used to be taken verbatim, giving users an empty display
    name. They now fall back to ``username`` (or None for the colour). A
    ``badges`` tag without a string value no longer raises.
    """
    badges: Dict[str, str] = {}
    raw_badges = tags.get("badges")
    if isinstance(raw_badges, str):
        # Format: "name/version,name/version"
        for badge in raw_badges.split(","):
            badge_name, slash, version = badge.partition("/")
            if slash:
                badges[badge_name] = version

    is_broadcaster = "broadcaster" in badges
    is_mod = "moderator" in badges or tags.get("mod") == "1"
    is_vip = "vip" in badges
    is_subscriber = "subscriber" in badges or tags.get("subscriber") == "1"

    # First matching role wins.
    role_precedence = (
        (is_broadcaster, TwitchUserType.BROADCASTER),
        (is_mod, TwitchUserType.MODERATOR),
        (is_vip, TwitchUserType.VIP),
        (is_subscriber, TwitchUserType.SUBSCRIBER),
    )
    user_type = next(
        (role for held, role in role_precedence if held),
        TwitchUserType.NORMAL,
    )

    return TwitchUser(
        id=tags.get("user-id") or username,
        login=username,
        display_name=tags.get("display-name") or username,
        user_type=user_type,
        is_subscriber=is_subscriber,
        is_mod=is_mod,
        is_vip=is_vip,
        is_broadcaster=is_broadcaster,
        badges=badges,
        color=tags.get("color") or None,
    )

571 

572 async def _handle_command(self, message: Message, user: TwitchUser) -> None: 

573 """Handle chat command.""" 

574 text = message.text[len(self.twitch_config.command_prefix):] 

575 parts = text.split(maxsplit=1) 

576 command = parts[0].lower() 

577 args = parts[1] if len(parts) > 1 else "" 

578 

579 if command in self._command_handlers: 

580 handler = self._command_handlers[command] 

581 try: 

582 await handler(message, user, args) 

583 except Exception as e: 

584 logger.error(f"Command handler error: {e}") 

585 else: 

586 # Dispatch as regular message 

587 await self._dispatch_message(message) 

588 

async def _handle_bits(
    self,
    user: TwitchUser,
    channel: str,
    bits: int,
    message: str,
) -> None:
    """Build a bits event and pass it to every registered bits handler.

    Handlers may be plain functions or coroutines. An error in one handler
    is logged and does not stop the remaining handlers.
    """
    cheer = TwitchBitsEvent(
        user=user,
        channel=channel,
        bits=bits,
        message=message,
        timestamp=datetime.now(),
    )

    for callback in self._bits_handlers:
        try:
            outcome = callback(cheer)
            if asyncio.iscoroutine(outcome):
                await outcome
        except Exception as e:
            logger.error(f"Bits handler error: {e}")

612 

async def send_message(
    self,
    chat_id: str,
    text: str,
    reply_to: Optional[str] = None,
    media: Optional[List[MediaAttachment]] = None,
    buttons: Optional[List[Dict]] = None,
) -> SendResult:
    """Send a chat message to a channel, or a whisper for ``whisper:<user>`` ids.

    Args:
        chat_id: Channel name (with or without ``#``) or ``"whisper:<user>"``.
        text: Message text.
        reply_to: Id of the message to reply to, if any.
        media: Accepted for interface compatibility; not used here.
        buttons: Accepted for interface compatibility; not used here.

    Returns:
        A ``SendResult``; failures are reported in the result rather than
        raised.

    Fixes:
    - CR/LF inside ``text`` (or ``reply_to``) would end the IRC line early
      and let the remainder be sent as a separate IRC command; line breaks
      are now collapsed to spaces (and stripped from the reply id);
    - the channel is lower-cased like ``join_channel`` does, so both the
      target and the rate-limit key are consistent;
    - only the leading ``whisper:`` marker is removed from the chat id.
    """
    if not self._ws:
        return SendResult(success=False, error="Not connected")

    try:
        # Handle whispers
        whisper_marker = "whisper:"
        if chat_id.startswith(whisper_marker):
            return await self.send_whisper(chat_id[len(whisper_marker):], text)

        channel = chat_id.lower()
        if not channel.startswith("#"):
            channel = f"#{channel}"

        # IRC is line-oriented: never let payload text introduce a new line.
        text = re.sub(r"[\r\n]+", " ", text)

        # Simple rate limit: keep at least 1.5s between sends per channel.
        now = time.time()
        last_sent = self._last_message_time.get(channel)
        if last_sent is not None:
            elapsed = now - last_sent
            if elapsed < 1.5:
                await asyncio.sleep(1.5 - elapsed)

        # Send with reply if specified
        if reply_to:
            reply_id = re.sub(r"[\s;]", "", reply_to)
            await self._ws.send(f"@reply-parent-msg-id={reply_id} PRIVMSG {channel} :{text}")
        else:
            await self._ws.send(f"PRIVMSG {channel} :{text}")

        self._last_message_time[channel] = time.time()
        return SendResult(success=True)

    except Exception as e:
        logger.error(f"Failed to send Twitch message: {e}")
        return SendResult(success=False, error=str(e))

654 

async def send_whisper(self, username: str, text: str) -> SendResult:
    """Send a private message to ``username``.

    Uses the Helix API when an HTTP session and a client id are available;
    otherwise falls back to a ``/w`` command sent over IRC. Failures are
    reported in the returned ``SendResult`` rather than raised.
    """
    if not self._ws:
        return SendResult(success=False, error="Not connected")

    if not self.twitch_config.enable_whispers:
        return SendResult(success=False, error="Whispers disabled")

    try:
        api_available = self._session and self.twitch_config.client_id
        if api_available:
            return await self._send_whisper_api(username, text)

        # Legacy IRC whisper (may not work)
        await self._ws.send(f"PRIVMSG #jtv :/w {username} {text}")
        return SendResult(success=True)

    except Exception as e:
        logger.error(f"Failed to send whisper: {e}")
        return SendResult(success=False, error=str(e))

676 

async def _send_whisper_api(self, username: str, text: str) -> SendResult:
    """Send a whisper through the Helix ``/whispers`` endpoint.

    Resolves the recipient and the bot to user ids first. A 204 response
    counts as success; any other status returns the response body as the
    error. Exceptions are converted into a failed ``SendResult``.
    """
    if not self._session:
        return SendResult(success=False, error="No session")

    try:
        recipient_id = await self._get_user_id(username)
        if not recipient_id:
            return SendResult(success=False, error="User not found")

        sender_id = await self._get_user_id(self.twitch_config.bot_username)
        if not sender_id:
            return SendResult(success=False, error="Bot user ID not found")

        async with self._session.post(
            f"{TWITCH_HELIX_URL}/whispers",
            headers={
                "Authorization": f"Bearer {self.twitch_config.access_token}",
                "Client-Id": self.twitch_config.client_id,
                "Content-Type": "application/json",
            },
            params={
                "from_user_id": sender_id,
                "to_user_id": recipient_id,
            },
            json={"message": text},
        ) as resp:
            if resp.status != 204:
                failure = await resp.text()
                return SendResult(success=False, error=failure)
            return SendResult(success=True)

    except Exception as e:
        return SendResult(success=False, error=str(e))

720 

721 async def _get_user_id(self, username: str) -> Optional[str]: 

722 """Get user ID from username via Helix API.""" 

723 if username in self._user_cache: 

724 return self._user_cache[username].id 

725 

726 if not self._session: 

727 return None 

728 

729 try: 

730 headers = { 

731 "Authorization": f"Bearer {self.twitch_config.access_token}", 

732 "Client-Id": self.twitch_config.client_id, 

733 } 

734 

735 async with self._session.get( 

736 f"{TWITCH_HELIX_URL}/users", 

737 headers=headers, 

738 params={"login": username}, 

739 ) as resp: 

740 if resp.status == 200: 

741 data = await resp.json() 

742 if data.get("data"): 

743 user_data = data["data"][0] 

744 return user_data["id"] 

745 

746 except Exception as e: 

747 logger.error(f"Failed to get user ID: {e}") 

748 

749 return None 

750 

async def edit_message(
    self,
    chat_id: str,
    message_id: str,
    text: str,
    buttons: Optional[List[Dict]] = None,
) -> SendResult:
    """Report that editing is unsupported.

    Twitch doesn't support message editing, so this always logs a warning
    and returns a failed ``SendResult``.
    """
    logger.warning("Twitch doesn't support message editing")
    unsupported = SendResult(success=False, error="Not supported")
    return unsupported

764 

async def delete_message(self, chat_id: str, message_id: str) -> bool:
    """Ask Twitch to delete a chat message (requires mod privileges).

    Sends a ``/delete`` chat command to the channel.

    Returns:
        True once the command was sent; False when not connected or when
        sending failed (the failure is logged).
    """
    # NOTE(review): this relies on slash commands being accepted over IRC —
    # confirm Twitch still honours them for this account.
    if not self._ws:
        return False

    try:
        channel = chat_id if chat_id.startswith("#") else f"#{chat_id}"
        await self._ws.send(f"PRIVMSG {channel} :/delete {message_id}")
    except Exception as e:
        logger.error(f"Failed to delete message: {e}")
        return False
    else:
        return True

781 

async def send_typing(self, chat_id: str) -> None:
    """Do nothing: Twitch doesn't support typing indicators."""
    return None

788 

async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
    """Fetch basic information about a Twitch channel from the Helix API.

    Args:
        chat_id: Channel name, with or without the leading ``#``.

    Returns:
        A dict with ``id``, ``name``, ``title``, ``game`` and ``language``,
        or None when there is no HTTP session, the channel could not be
        resolved, or the request failed (failures are logged).

    Fix: the Helix ``/channels`` endpoint identifies the broadcaster by
    ``broadcaster_id``; the earlier ``broadcaster_login`` parameter is not
    one it accepts. The login is now resolved to an id first.
    """
    if not self._session:
        return None

    login = chat_id.lstrip("#").lower()

    try:
        broadcaster_id = await self._get_user_id(login)
        if not broadcaster_id:
            return None

        headers = {
            "Authorization": f"Bearer {self.twitch_config.access_token}",
            "Client-Id": self.twitch_config.client_id,
        }

        async with self._session.get(
            f"{TWITCH_HELIX_URL}/channels",
            headers=headers,
            params={"broadcaster_id": broadcaster_id},
        ) as resp:
            if resp.status != 200:
                return None
            payload = await resp.json()

        records = payload.get("data")
        if not records:
            return None

        channel_data = records[0]
        return {
            "id": channel_data["broadcaster_id"],
            "name": channel_data["broadcaster_name"],
            "title": channel_data["title"],
            "game": channel_data.get("game_name"),
            "language": channel_data.get("broadcaster_language"),
        }

    except Exception as e:
        logger.error(f"Failed to get channel info: {e}")

    return None

823 

824 # Twitch-specific methods 

825 

def register_command(
    self,
    command: str,
    handler: Callable[[Message, TwitchUser, str], Any],
) -> None:
    """Register ``handler`` for a chat command.

    The command word is stored lower-cased; registering the same word
    again replaces the earlier handler. The handler is called with the
    message, the sending user and the argument string.
    """
    key = command.lower()
    self._command_handlers[key] = handler

833 

def on_bits(self, handler: Callable[[TwitchBitsEvent], Any]) -> None:
    """Add a handler that is called with each bits/cheer event."""
    registered = self._bits_handlers
    registered.append(handler)

837 

def on_redemption(self, handler: Callable[[TwitchRedemptionEvent], Any]) -> None:
    """Add a handler for channel point redemption events."""
    registered = self._redemption_handlers
    registered.append(handler)

841 

async def timeout_user(
    self,
    channel: str,
    username: str,
    duration: int,
    reason: str = "",
) -> bool:
    """Send a ``/timeout`` chat command for a user (requires mod privileges).

    Args:
        channel: Channel name, with or without the leading ``#``.
        username: User to time out.
        duration: Value placed after the user name in the command.
        reason: Optional reason appended to the command.

    Returns:
        True once the command was sent; False when not connected or when
        sending failed (the failure is logged).
    """
    if not self._ws:
        return False

    try:
        target = channel if channel.startswith("#") else f"#{channel}"
        cmd = f"/timeout {username} {duration}" + (f" {reason}" if reason else "")
        await self._ws.send(f"PRIVMSG {target} :{cmd}")
    except Exception as e:
        logger.error(f"Failed to timeout user: {e}")
        return False
    else:
        return True

867 

async def ban_user(
    self,
    channel: str,
    username: str,
    reason: str = "",
) -> bool:
    """Send a ``/ban`` chat command for a user (requires mod privileges).

    Args:
        channel: Channel name, with or without the leading ``#``.
        username: User to ban.
        reason: Optional reason appended to the command.

    Returns:
        True once the command was sent; False when not connected or when
        sending failed (the failure is logged).
    """
    if not self._ws:
        return False

    try:
        target = channel if channel.startswith("#") else f"#{channel}"
        cmd = f"/ban {username}" + (f" {reason}" if reason else "")
        await self._ws.send(f"PRIVMSG {target} :{cmd}")
    except Exception as e:
        logger.error(f"Failed to ban user: {e}")
        return False
    else:
        return True

892 

async def unban_user(self, channel: str, username: str) -> bool:
    """Send an ``/unban`` chat command for a user (requires mod privileges).

    Returns:
        True once the command was sent; False when not connected or when
        sending failed (the failure is logged).
    """
    if not self._ws:
        return False

    try:
        target = channel if channel.startswith("#") else f"#{channel}"
        await self._ws.send(f"PRIVMSG {target} :/unban {username}")
    except Exception as e:
        logger.error(f"Failed to unban user: {e}")
        return False
    else:
        return True

908 

async def clear_chat(self, channel: str) -> bool:
    """Send a ``/clear`` chat command to a channel (requires mod privileges).

    Returns:
        True once the command was sent; False when not connected or when
        sending failed (the failure is logged).
    """
    if not self._ws:
        return False

    try:
        target = channel if channel.startswith("#") else f"#{channel}"
        await self._ws.send(f"PRIVMSG {target} :/clear")
    except Exception as e:
        logger.error(f"Failed to clear chat: {e}")
        return False
    else:
        return True

924 

async def set_slow_mode(self, channel: str, seconds: int) -> bool:
    """Turn slow mode on or off in a channel (requires mod privileges).

    Args:
        channel: Channel name, with or without the leading ``#``.
        seconds: A positive value sends ``/slow <seconds>``; zero or a
            negative value sends ``/slowoff``.

    Returns:
        True once the command was sent; False when not connected or when
        sending failed (the failure is logged).
    """
    if not self._ws:
        return False

    try:
        target = channel if channel.startswith("#") else f"#{channel}"
        enable = seconds > 0
        line = f"PRIVMSG {target} :/slow {seconds}" if enable else f"PRIVMSG {target} :/slowoff"
        await self._ws.send(line)
    except Exception as e:
        logger.error(f"Failed to set slow mode: {e}")
        return False
    else:
        return True

943 

async def announce(self, channel: str, message: str, color: str = "") -> bool:
    """Send an ``/announce`` chat command (requires mod privileges).

    Args:
        channel: Channel name, with or without the leading ``#``.
        message: Announcement text.
        color: Optional suffix appended directly to ``/announce`` (so a
            value of ``"blue"`` yields ``/announceblue``).

    Returns:
        True once the command was sent; False when not connected or when
        sending failed (the failure is logged).
    """
    if not self._ws:
        return False

    try:
        target = channel if channel.startswith("#") else f"#{channel}"
        cmd = f"/announce{color} {message}" if color else f"/announce {message}"
        await self._ws.send(f"PRIVMSG {target} :{cmd}")
    except Exception as e:
        logger.error(f"Failed to send announcement: {e}")
        return False
    else:
        return True

963 

async def refresh_token(self) -> bool:
    """Exchange the stored refresh token for a new access token.

    On success the config's access token is replaced, and the refresh
    token too if the response contains a new one.

    Returns:
        True when the token was refreshed; False when no refresh token or
        HTTP session is available, or when the request failed (logged).
    """
    if not self.twitch_config.refresh_token or not self._session:
        return False

    try:
        form = {
            "grant_type": "refresh_token",
            "refresh_token": self.twitch_config.refresh_token,
            "client_id": self.twitch_config.client_id,
            "client_secret": self.twitch_config.client_secret,
        }

        async with self._session.post(TWITCH_AUTH_URL, data=form) as resp:
            if resp.status != 200:
                logger.error(f"Failed to refresh token: {await resp.text()}")
                return False

            token_data = await resp.json()
            self.twitch_config.access_token = token_data["access_token"]
            if "refresh_token" in token_data:
                self.twitch_config.refresh_token = token_data["refresh_token"]
            logger.info("Twitch token refreshed")
            return True

    except Exception as e:
        logger.error(f"Token refresh error: {e}")
        return False

995 

996 

def create_twitch_adapter(
    client_id: Optional[str] = None,
    client_secret: Optional[str] = None,
    access_token: Optional[str] = None,
    bot_username: Optional[str] = None,
    channels: Optional[List[str]] = None,
    **kwargs
) -> TwitchAdapter:
    """
    Factory function to create Twitch adapter.

    Each argument that is omitted (or empty) falls back to its environment
    variable. The parameters are annotated as Optional to match their
    ``None`` defaults; call sites are unaffected.

    Args:
        client_id: Twitch app client ID (or set TWITCH_CLIENT_ID env var)
        client_secret: Twitch app client secret (or set TWITCH_CLIENT_SECRET env var)
        access_token: OAuth access token (or set TWITCH_ACCESS_TOKEN env var)
        bot_username: Bot's Twitch username (or set TWITCH_BOT_USERNAME env var)
        channels: List of channels to join (or set TWITCH_CHANNELS env var, comma-separated)
        **kwargs: Additional config options

    Returns:
        Configured TwitchAdapter

    Raises:
        ValueError: if no access token or no bot username is available.
    """
    client_id = client_id or os.getenv("TWITCH_CLIENT_ID")
    client_secret = client_secret or os.getenv("TWITCH_CLIENT_SECRET")
    access_token = access_token or os.getenv("TWITCH_ACCESS_TOKEN")
    bot_username = bot_username or os.getenv("TWITCH_BOT_USERNAME")

    if channels is None:
        # Comma-separated list; surrounding whitespace and empty entries are dropped.
        channels_env = os.getenv("TWITCH_CHANNELS", "")
        channels = [entry.strip() for entry in channels_env.split(",") if entry.strip()]

    if not access_token:
        raise ValueError("Twitch access token required")
    if not bot_username:
        raise ValueError("Twitch bot username required")

    config = TwitchConfig(
        client_id=client_id or "",
        client_secret=client_secret or "",
        access_token=access_token,
        bot_username=bot_username,
        channels=channels,
        **kwargs
    )
    return TwitchAdapter(config)