Coverage for integrations / channels / extensions / discord_user_adapter.py: 32.0%
194 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Discord User Account Adapter
4Implements Discord user account (self-bot) integration.
5Based on HevolveBot extension patterns.
7WARNING: Self-bots violate Discord's Terms of Service and may result in account termination.
8This adapter is provided for educational purposes and internal/private use only.
10Features:
11- User account authentication
12- Access to all servers/DMs
13- Full message history
14- Docker-compatible
15"""
17from __future__ import annotations
19import asyncio
20import logging
21import os
22import json
23try:
24 import aiohttp
25 HAS_AIOHTTP = True
26except ImportError:
27 HAS_AIOHTTP = False
28from typing import Optional, List, Dict, Any, Callable
29from datetime import datetime
30from dataclasses import dataclass, field
32try:
33 import websockets
34 HAS_WEBSOCKETS = True
35except ImportError:
36 HAS_WEBSOCKETS = False
38from ..base import (
39 ChannelAdapter,
40 ChannelConfig,
41 ChannelStatus,
42 Message,
43 MessageType,
44 SendResult,
45 ChannelConnectionError,
46 ChannelSendError,
47)
49logger = logging.getLogger(__name__)
@dataclass
class DiscordUserConfig(ChannelConfig):
    """Discord user account configuration.

    Extends ``ChannelConfig`` with the settings the user-account adapter
    reads: the account token plus the gateway and REST endpoints.
    """

    # User account token (NOT a bot token). Kept out of the dataclass-generated
    # repr so that logging or printing the config does not expose the credential.
    user_token: str = field(default="", repr=False)
    # When False, messages authored by the logged-in account are not dispatched.
    receive_own_messages: bool = False
    # WebSocket endpoint the adapter opens for gateway events.
    gateway_url: str = "wss://gateway.discord.gg/?v=10&encoding=json"
    # Prefix for every REST call the adapter makes.
    api_base: str = "https://discord.com/api/v10"
    # NOTE(review): not read by the adapter code visible in this file, which
    # takes the interval from the gateway Hello payload instead - confirm
    # whether anything else consumes it.
    heartbeat_interval: float = 41.25

    @classmethod
    def from_env(cls) -> "DiscordUserConfig":
        """Create config from environment variables.

        Only ``DISCORD_USER_TOKEN`` is read (empty string when unset); every
        other field keeps its default.
        """
        return cls(
            user_token=os.getenv("DISCORD_USER_TOKEN", ""),
        )
class DiscordUserAdapter(ChannelAdapter):
    """Discord user account adapter (self-bot).

    Verifies the user token over REST, then keeps a gateway WebSocket open in
    a background task to receive ``READY`` and ``MESSAGE_CREATE`` events.
    Outgoing operations (send / edit / delete / typing / channel lookup) go
    through a shared ``aiohttp`` session.
    """

    channel_type = "discord_user"

    # connect() polls for the gateway READY event: attempts * delay seconds.
    _READY_POLL_ATTEMPTS = 100
    _READY_POLL_DELAY = 0.1
    # Pause, in seconds, before the gateway loop opens a new socket.
    _RECONNECT_DELAY = 5

    @property
    def name(self) -> str:
        """Get adapter name."""
        return self.channel_type

    def __init__(self, config: DiscordUserConfig):
        """Store the configuration and initialise connection state.

        Raises:
            ImportError: if the ``websockets`` package is not installed.
        """
        if not HAS_WEBSOCKETS:
            raise ImportError("websockets is required for DiscordUserAdapter")

        super().__init__(config)
        self.config: DiscordUserConfig = config
        self._session: Optional["aiohttp.ClientSession"] = None
        self._ws: Optional[Any] = None
        self._ws_task: Optional[asyncio.Task] = None
        self._heartbeat_task: Optional[asyncio.Task] = None
        self._connected = False
        self._message_handlers: List[Callable] = []
        # Strong references to in-flight handler tasks; the event loop itself
        # only keeps weak references, so unreferenced tasks may be collected.
        self._handler_tasks: set = set()
        self._sequence: Optional[int] = None
        self._session_id: Optional[str] = None
        self._user_id: Optional[str] = None
        self._guilds: Dict[str, Dict] = {}
        self._channels: Dict[str, Dict] = {}

    def _get_headers(self) -> Dict[str, str]:
        """Get headers for API requests."""
        return {
            "Authorization": self.config.user_token,
            "Content-Type": "application/json",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        }

    def _require_session(self) -> "aiohttp.ClientSession":
        """Return the open HTTP session or raise if there is none.

        Raises:
            ChannelConnectionError: if no session has been opened by connect().
        """
        if self._session is None:
            raise ChannelConnectionError("Not connected to Discord")
        return self._session

    async def connect(self) -> bool:
        """Connect to Discord as user.

        Verifies the token against ``/users/@me``, starts the gateway loop and
        waits for the READY event to supply a session id.

        Returns:
            True once the gateway session is established.

        Raises:
            ChannelConnectionError: on any failure. The HTTP session and
                background tasks are cleaned up and the status is set to ERROR.
        """
        try:
            if not HAS_AIOHTTP:
                raise ChannelConnectionError(
                    "aiohttp is required for DiscordUserAdapter"
                )

            # Release anything left from an earlier connect() and forget the
            # old session id, otherwise the READY wait below would be
            # satisfied immediately by stale state.
            await self._teardown()
            self._session_id = None
            self._sequence = None

            self._session = aiohttp.ClientSession()

            # Verify token
            async with self._session.get(
                f"{self.config.api_base}/users/@me",
                headers=self._get_headers()
            ) as resp:
                if resp.status != 200:
                    raise ChannelConnectionError("Invalid user token")

                user_data = await resp.json()

            self._user_id = user_data["id"]
            # .get(): a missing display field must not abort the connection.
            logger.info(
                "Authenticated as %s#%s",
                user_data.get("username"),
                user_data.get("discriminator"),
            )

            # Connect to gateway
            self._ws_task = asyncio.create_task(self._gateway_loop())

            # Wait for ready; stop early if the gateway task has already died.
            for _ in range(self._READY_POLL_ATTEMPTS):
                if self._session_id or self._ws_task.done():
                    break
                await asyncio.sleep(self._READY_POLL_DELAY)

            if not self._session_id:
                raise ChannelConnectionError("Failed to establish gateway session")

            self._connected = True
            self._status = ChannelStatus.CONNECTED
            return True

        except Exception as e:
            logger.error("Failed to connect to Discord: %s", e)
            await self._teardown()
            self._status = ChannelStatus.ERROR
            raise ChannelConnectionError(str(e)) from e

    async def disconnect(self) -> None:
        """Disconnect from Discord."""
        await self._teardown()
        self._status = ChannelStatus.DISCONNECTED
        logger.info("Disconnected from Discord user account")

    async def _teardown(self) -> None:
        """Cancel background tasks and close the socket and HTTP session.

        Safe to call repeatedly and on a partially initialised adapter.
        """
        self._connected = False

        await self._stop_heartbeat()

        gateway_task, self._ws_task = self._ws_task, None
        await self._cancel_task(gateway_task)

        ws, self._ws = self._ws, None
        if ws is not None:
            try:
                await ws.close()
            except Exception as e:
                # A failing socket close must not prevent the session close.
                logger.debug("Error closing gateway socket: %s", e)

        session, self._session = self._session, None
        if session is not None:
            await session.close()

    async def _stop_heartbeat(self) -> None:
        """Cancel the current heartbeat task, if any, and wait for it to end."""
        task, self._heartbeat_task = self._heartbeat_task, None
        await self._cancel_task(task)

    @staticmethod
    async def _cancel_task(task: Optional[asyncio.Task]) -> None:
        """Cancel ``task`` and wait for it to finish; no-op for ``None``."""
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception as e:
            # The task had already failed on its own; nothing left to clean up.
            logger.debug("Background task ended with error: %s", e)

    async def _gateway_loop(self) -> None:
        """Main gateway WebSocket loop.

        Opens the socket, starts heartbeating from the Hello payload, sends
        Identify and dispatches every received frame. While the adapter is
        connected, a dropped socket is reopened after ``_RECONNECT_DELAY``
        seconds; before that (during connect()) any drop ends the loop.
        """
        while self._connected or not self._session_id:
            try:
                async with websockets.connect(self.config.gateway_url) as ws:
                    self._ws = ws
                    try:
                        # Receive Hello
                        hello = json.loads(await ws.recv())
                        if hello["op"] == 10:
                            interval = hello["d"]["heartbeat_interval"] / 1000
                            self._heartbeat_task = asyncio.create_task(
                                self._heartbeat_loop(interval)
                            )

                        # Send Identify
                        await self._send_identify()

                        # Message loop
                        async for message in ws:
                            await self._handle_gateway_message(json.loads(message))
                    finally:
                        # One heartbeat task per socket: without this every
                        # reconnect would leave the previous one running.
                        await self._stop_heartbeat()

            except websockets.exceptions.ConnectionClosed:
                if not self._connected:
                    break
                logger.warning("Gateway disconnected, reconnecting...")

            except Exception as e:
                logger.error("Gateway error: %s", e)
                if not self._connected:
                    break

            else:
                # The message loop ended without an exception. Handle it like
                # any other drop so the loop cannot spin without a delay.
                if not self._connected:
                    break
                logger.warning("Gateway disconnected, reconnecting...")

            await asyncio.sleep(self._RECONNECT_DELAY)

    async def _send_heartbeat(self) -> None:
        """Send one heartbeat frame carrying the last seen sequence number."""
        if self._ws:
            await self._ws.send(json.dumps({
                "op": 1,
                "d": self._sequence
            }))

    async def _heartbeat_loop(self, interval: float) -> None:
        """Send heartbeats to keep connection alive.

        Args:
            interval: seconds to sleep between heartbeats.
        """
        while True:
            try:
                await asyncio.sleep(interval)
                await self._send_heartbeat()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error("Heartbeat error: %s", e)

    async def _send_identify(self) -> None:
        """Send identify payload."""
        identify = {
            "op": 2,
            "d": {
                "token": self.config.user_token,
                "properties": {
                    "$os": "linux",
                    "$browser": "chrome",
                    "$device": "desktop"
                },
                "presence": {
                    "status": "online",
                    "since": 0,
                    "afk": False
                }
            }
        }
        await self._ws.send(json.dumps(identify))

    async def _handle_gateway_message(self, data: Dict[str, Any]) -> None:
        """Handle gateway message.

        Records the sequence number, answers a server heartbeat request
        (op 1), and for dispatch frames (op 0) processes ``READY`` and
        ``MESSAGE_CREATE``. Every other frame is ignored.
        """
        op = data.get("op")
        event = data.get("t")
        payload = data.get("d")

        # Compare against None so a sequence value of 0 is not discarded.
        sequence = data.get("s")
        if sequence is not None:
            self._sequence = sequence

        if op == 1:  # Server asked for an immediate heartbeat
            await self._send_heartbeat()
            return

        if op != 0:  # Only dispatch frames carry events
            return

        if event == "READY":
            self._session_id = payload["session_id"]
            self._user_id = payload["user"]["id"]

            # Cache guilds
            for guild in payload.get("guilds", []):
                self._guilds[guild["id"]] = guild

            logger.info("Discord gateway ready")

        elif event == "MESSAGE_CREATE":
            await self._handle_message(payload)

    async def _handle_message(self, data: Dict[str, Any]) -> None:
        """Handle incoming message.

        Parses the payload and passes the resulting ``Message`` to every
        registered handler. Handlers that return a coroutine or future are
        scheduled as tasks; plain callables are simply called. A handler that
        fails does not stop the remaining handlers from running.
        """
        try:
            # Skip own messages unless configured
            author_id = (data.get("author") or {}).get("id")
            if author_id == self._user_id and not self.config.receive_own_messages:
                return

            message = self._parse_message(data)
        except Exception as e:
            logger.error("Error handling message: %s", e)
            return

        if not message:
            return

        # Iterate over a copy so a handler may register further handlers.
        for handler in list(self._message_handlers):
            try:
                result = handler(message)
                if asyncio.iscoroutine(result) or asyncio.isfuture(result):
                    task = asyncio.ensure_future(result)
                    self._handler_tasks.add(task)
                    task.add_done_callback(self._on_handler_done)
            except Exception as e:
                logger.error("Error handling message: %s", e)

    def _on_handler_done(self, task: "asyncio.Future") -> None:
        """Drop a finished handler task and log the exception it raised, if any."""
        self._handler_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error("Error handling message: %s", error)

    def _parse_message(self, data: Dict[str, Any]) -> Optional[Message]:
        """Parse Discord message to unified Message.

        The message type is derived from the first attachment's content type
        (TEXT when there are no attachments, FILE when the type is not image,
        video or audio).

        Returns:
            The parsed ``Message``, or ``None`` if parsing raised.
        """
        try:
            # `or {}` / `or ""` also cover keys that are present but null.
            author = data.get("author") or {}

            # Determine message type
            msg_type = MessageType.TEXT
            attachments = data.get("attachments")
            if attachments:
                content_type = attachments[0].get("content_type") or ""
                if "image" in content_type:
                    msg_type = MessageType.IMAGE
                elif "video" in content_type:
                    msg_type = MessageType.VIDEO
                elif "audio" in content_type:
                    msg_type = MessageType.AUDIO
                else:
                    msg_type = MessageType.FILE

            raw_timestamp = data.get("timestamp")
            if raw_timestamp:
                timestamp = datetime.fromisoformat(
                    raw_timestamp.replace("Z", "+00:00")
                )
            else:
                # Timezone-aware fallback, so it is comparable with parsed
                # timestamps that carry a UTC offset.
                timestamp = datetime.now().astimezone()

            referenced = data.get("referenced_message")

            return Message(
                id=data.get("id", ""),
                channel=self.channel_type,
                chat_id=data.get("channel_id", ""),
                sender_id=author.get("id", ""),
                sender_name=author.get("username", ""),
                text=data.get("content", ""),
                timestamp=timestamp,
                message_type=msg_type,
                reply_to=referenced.get("id") if referenced else None,
                metadata={
                    "guild_id": data.get("guild_id"),
                    "discriminator": author.get("discriminator"),
                }
            )
        except Exception as e:
            logger.error("Error parsing message: %s", e)
            return None

    def on_message(self, handler: Callable) -> None:
        """Register message handler."""
        self._message_handlers.append(handler)

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        **kwargs
    ) -> SendResult:
        """Send a message.

        Args:
            chat_id: id of the channel to post into.
            text: message content.
            reply_to: optional id of the message being replied to.

        Returns:
            A successful ``SendResult`` carrying the new message id.

        Raises:
            ChannelSendError: if the adapter is not connected, the request
                fails, or the API answers with a status other than 200/201.
        """
        url = f"{self.config.api_base}/channels/{chat_id}/messages"

        payload: Dict[str, Any] = {"content": text}

        if reply_to:
            payload["message_reference"] = {"message_id": reply_to}

        try:
            session = self._require_session()
            async with session.post(
                url,
                json=payload,
                headers=self._get_headers()
            ) as resp:
                if resp.status not in (200, 201):
                    error = await resp.text()
                    raise ChannelSendError(f"Failed to send: {error}")

                data = await resp.json()

            return SendResult(
                success=True,
                message_id=data["id"],
                timestamp=datetime.now()
            )

        except ChannelSendError as e:
            # Already the right type: log once and keep the original error.
            logger.error("Failed to send message: %s", e)
            raise
        except Exception as e:
            logger.error("Failed to send message: %s", e)
            raise ChannelSendError(str(e)) from e

    async def edit_message(
        self,
        chat_id: str,
        message_id: str,
        text: str,
        **kwargs
    ) -> bool:
        """Edit a message.

        Returns:
            True if the API answered 200; False otherwise or on any error.
        """
        url = f"{self.config.api_base}/channels/{chat_id}/messages/{message_id}"

        try:
            session = self._require_session()
            async with session.patch(
                url,
                json={"content": text},
                headers=self._get_headers()
            ) as resp:
                return resp.status == 200
        except Exception as e:
            logger.error("Failed to edit message: %s", e)
            return False

    async def delete_message(self, chat_id: str, message_id: str, **kwargs) -> bool:
        """Delete a message.

        Returns:
            True if the API answered 204; False otherwise or on any error.
        """
        url = f"{self.config.api_base}/channels/{chat_id}/messages/{message_id}"

        try:
            session = self._require_session()
            async with session.delete(url, headers=self._get_headers()) as resp:
                return resp.status == 204
        except Exception as e:
            logger.error("Failed to delete message: %s", e)
            return False

    async def send_typing(self, chat_id: str, **kwargs) -> None:
        """Send typing indicator.

        Best effort: failures are logged at debug level and not raised.
        """
        url = f"{self.config.api_base}/channels/{chat_id}/typing"
        try:
            session = self._require_session()
            # Enter the response context so the connection is released back
            # to the pool instead of being held by an unread response.
            async with session.post(url, headers=self._get_headers()):
                pass
        except Exception as e:
            logger.debug("Failed to send typing: %s", e)

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Get channel information.

        Returns:
            A dict with ``id``, ``name`` ("DM" when the channel has no name),
            ``type`` and ``guild_id``; ``None`` on a non-200 answer or error.
        """
        url = f"{self.config.api_base}/channels/{chat_id}"

        try:
            session = self._require_session()
            async with session.get(url, headers=self._get_headers()) as resp:
                if resp.status != 200:
                    return None

                data = await resp.json()

            return {
                "id": data["id"],
                "name": data.get("name", "DM"),
                "type": data.get("type"),
                "guild_id": data.get("guild_id"),
            }
        except Exception as e:
            logger.error("Failed to get channel info: %s", e)
            return None
def create_discord_user_adapter(
    user_token: Optional[str] = None,
    **kwargs
) -> DiscordUserAdapter:
    """Build a ``DiscordUserAdapter`` from a token and optional config fields.

    Args:
        user_token: account token; when empty or ``None`` the value of the
            ``DISCORD_USER_TOKEN`` environment variable is used instead
            (empty string if that is unset too).
        **kwargs: forwarded unchanged to ``DiscordUserConfig``.

    Returns:
        An adapter constructed from the resulting configuration.
    """
    # Only consult the environment when no usable token was passed in.
    token = user_token if user_token else os.getenv("DISCORD_USER_TOKEN", "")
    settings = DiscordUserConfig(user_token=token, **kwargs)
    return DiscordUserAdapter(settings)