Coverage for integrations / channels / extensions / discord_user_adapter.py: 32.0%

194 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Discord User Account Adapter 

3 

4Implements Discord user account (self-bot) integration. 

5Based on HevolveBot extension patterns. 

6 

7WARNING: Self-bots violate Discord's Terms of Service and may result in account termination. 

8This adapter is provided for educational purposes and internal/private use only. 

9 

10Features: 

11- User account authentication 

12- Access to all servers/DMs 

13- Full message history 

14- Docker-compatible 

15""" 

16 

17from __future__ import annotations 

18 

19import asyncio 

20import logging 

21import os 

22import json 

23try: 

24 import aiohttp 

25 HAS_AIOHTTP = True 

26except ImportError: 

27 HAS_AIOHTTP = False 

28from typing import Optional, List, Dict, Any, Callable 

29from datetime import datetime 

30from dataclasses import dataclass, field 

31 

32try: 

33 import websockets 

34 HAS_WEBSOCKETS = True 

35except ImportError: 

36 HAS_WEBSOCKETS = False 

37 

38from ..base import ( 

39 ChannelAdapter, 

40 ChannelConfig, 

41 ChannelStatus, 

42 Message, 

43 MessageType, 

44 SendResult, 

45 ChannelConnectionError, 

46 ChannelSendError, 

47) 

48 

49logger = logging.getLogger(__name__) 

50 

51 

52@dataclass 

53class DiscordUserConfig(ChannelConfig): 

54 """Discord user account configuration.""" 

55 user_token: str = "" # User account token (NOT bot token) 

56 receive_own_messages: bool = False 

57 gateway_url: str = "wss://gateway.discord.gg/?v=10&encoding=json" 

58 api_base: str = "https://discord.com/api/v10" 

59 heartbeat_interval: float = 41.25 

60 

61 @classmethod 

62 def from_env(cls) -> "DiscordUserConfig": 

63 """Create config from environment variables.""" 

64 return cls( 

65 user_token=os.getenv("DISCORD_USER_TOKEN", ""), 

66 ) 

67 

68 

69class DiscordUserAdapter(ChannelAdapter): 

70 """Discord user account adapter (self-bot).""" 

71 

72 channel_type = "discord_user" 

73 

74 @property 

75 def name(self) -> str: 

76 """Get adapter name.""" 

77 return self.channel_type 

78 

79 def __init__(self, config: DiscordUserConfig): 

80 if not HAS_WEBSOCKETS: 

81 raise ImportError("websockets is required for DiscordUserAdapter") 

82 

83 super().__init__(config) 

84 self.config: DiscordUserConfig = config 

85 self._session: Optional[aiohttp.ClientSession] = None 

86 self._ws: Optional[Any] = None 

87 self._ws_task: Optional[asyncio.Task] = None 

88 self._heartbeat_task: Optional[asyncio.Task] = None 

89 self._connected = False 

90 self._message_handlers: List[Callable] = [] 

91 self._sequence: Optional[int] = None 

92 self._session_id: Optional[str] = None 

93 self._user_id: Optional[str] = None 

94 self._guilds: Dict[str, Dict] = {} 

95 self._channels: Dict[str, Dict] = {} 

96 

97 def _get_headers(self) -> Dict[str, str]: 

98 """Get headers for API requests.""" 

99 return { 

100 "Authorization": self.config.user_token, 

101 "Content-Type": "application/json", 

102 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", 

103 } 

104 

105 async def connect(self) -> bool: 

106 """Connect to Discord as user.""" 

107 try: 

108 self._session = aiohttp.ClientSession() 

109 

110 # Verify token 

111 async with self._session.get( 

112 f"{self.config.api_base}/users/@me", 

113 headers=self._get_headers() 

114 ) as resp: 

115 if resp.status != 200: 

116 raise ChannelConnectionError("Invalid user token") 

117 

118 user_data = await resp.json() 

119 self._user_id = user_data["id"] 

120 logger.info(f"Authenticated as {user_data['username']}#{user_data['discriminator']}") 

121 

122 # Connect to gateway 

123 self._ws_task = asyncio.create_task(self._gateway_loop()) 

124 

125 # Wait for ready 

126 for _ in range(100): 

127 if self._session_id: 

128 break 

129 await asyncio.sleep(0.1) 

130 

131 if not self._session_id: 

132 raise ChannelConnectionError("Failed to establish gateway session") 

133 

134 self._connected = True 

135 self._status = ChannelStatus.CONNECTED 

136 return True 

137 

138 except Exception as e: 

139 logger.error(f"Failed to connect to Discord: {e}") 

140 self._status = ChannelStatus.ERROR 

141 raise ChannelConnectionError(str(e)) 

142 

143 async def disconnect(self) -> None: 

144 """Disconnect from Discord.""" 

145 self._connected = False 

146 

147 if self._heartbeat_task: 

148 self._heartbeat_task.cancel() 

149 try: 

150 await self._heartbeat_task 

151 except asyncio.CancelledError: 

152 pass 

153 

154 if self._ws_task: 

155 self._ws_task.cancel() 

156 try: 

157 await self._ws_task 

158 except asyncio.CancelledError: 

159 pass 

160 

161 if self._ws: 

162 await self._ws.close() 

163 self._ws = None 

164 

165 if self._session: 

166 await self._session.close() 

167 self._session = None 

168 

169 self._status = ChannelStatus.DISCONNECTED 

170 logger.info("Disconnected from Discord user account") 

171 

172 async def _gateway_loop(self) -> None: 

173 """Main gateway WebSocket loop.""" 

174 while self._connected or not self._session_id: 

175 try: 

176 async with websockets.connect(self.config.gateway_url) as ws: 

177 self._ws = ws 

178 

179 # Receive Hello 

180 hello = json.loads(await ws.recv()) 

181 if hello["op"] == 10: 

182 interval = hello["d"]["heartbeat_interval"] / 1000 

183 self._heartbeat_task = asyncio.create_task( 

184 self._heartbeat_loop(interval) 

185 ) 

186 

187 # Send Identify 

188 await self._send_identify() 

189 

190 # Message loop 

191 async for message in ws: 

192 await self._handle_gateway_message(json.loads(message)) 

193 

194 except websockets.exceptions.ConnectionClosed: 

195 if not self._connected: 

196 break 

197 logger.warning("Gateway disconnected, reconnecting...") 

198 await asyncio.sleep(5) 

199 

200 except Exception as e: 

201 logger.error(f"Gateway error: {e}") 

202 if not self._connected: 

203 break 

204 await asyncio.sleep(5) 

205 

206 async def _heartbeat_loop(self, interval: float) -> None: 

207 """Send heartbeats to keep connection alive.""" 

208 while True: 

209 try: 

210 await asyncio.sleep(interval) 

211 if self._ws: 

212 await self._ws.send(json.dumps({ 

213 "op": 1, 

214 "d": self._sequence 

215 })) 

216 except asyncio.CancelledError: 

217 break 

218 except Exception as e: 

219 logger.error(f"Heartbeat error: {e}") 

220 

221 async def _send_identify(self) -> None: 

222 """Send identify payload.""" 

223 identify = { 

224 "op": 2, 

225 "d": { 

226 "token": self.config.user_token, 

227 "properties": { 

228 "$os": "linux", 

229 "$browser": "chrome", 

230 "$device": "desktop" 

231 }, 

232 "presence": { 

233 "status": "online", 

234 "since": 0, 

235 "afk": False 

236 } 

237 } 

238 } 

239 await self._ws.send(json.dumps(identify)) 

240 

241 async def _handle_gateway_message(self, data: Dict[str, Any]) -> None: 

242 """Handle gateway message.""" 

243 op = data.get("op") 

244 event = data.get("t") 

245 payload = data.get("d") 

246 

247 if data.get("s"): 

248 self._sequence = data["s"] 

249 

250 if op == 0: # Dispatch 

251 if event == "READY": 

252 self._session_id = payload["session_id"] 

253 self._user_id = payload["user"]["id"] 

254 

255 # Cache guilds 

256 for guild in payload.get("guilds", []): 

257 self._guilds[guild["id"]] = guild 

258 

259 logger.info("Discord gateway ready") 

260 

261 elif event == "MESSAGE_CREATE": 

262 await self._handle_message(payload) 

263 

264 async def _handle_message(self, data: Dict[str, Any]) -> None: 

265 """Handle incoming message.""" 

266 try: 

267 # Skip own messages unless configured 

268 if data.get("author", {}).get("id") == self._user_id: 

269 if not self.config.receive_own_messages: 

270 return 

271 

272 message = self._parse_message(data) 

273 if message: 

274 for handler in self._message_handlers: 

275 asyncio.create_task(handler(message)) 

276 

277 except Exception as e: 

278 logger.error(f"Error handling message: {e}") 

279 

280 def _parse_message(self, data: Dict[str, Any]) -> Optional[Message]: 

281 """Parse Discord message to unified Message.""" 

282 try: 

283 author = data.get("author", {}) 

284 

285 # Determine message type 

286 msg_type = MessageType.TEXT 

287 if data.get("attachments"): 

288 attachment = data["attachments"][0] 

289 content_type = attachment.get("content_type", "") 

290 if "image" in content_type: 

291 msg_type = MessageType.IMAGE 

292 elif "video" in content_type: 

293 msg_type = MessageType.VIDEO 

294 elif "audio" in content_type: 

295 msg_type = MessageType.AUDIO 

296 else: 

297 msg_type = MessageType.FILE 

298 

299 return Message( 

300 id=data.get("id", ""), 

301 channel=self.channel_type, 

302 chat_id=data.get("channel_id", ""), 

303 sender_id=author.get("id", ""), 

304 sender_name=author.get("username", ""), 

305 text=data.get("content", ""), 

306 timestamp=datetime.fromisoformat( 

307 data.get("timestamp", datetime.now().isoformat()).replace("Z", "+00:00") 

308 ), 

309 message_type=msg_type, 

310 reply_to=data.get("referenced_message", {}).get("id") if data.get("referenced_message") else None, 

311 metadata={ 

312 "guild_id": data.get("guild_id"), 

313 "discriminator": author.get("discriminator"), 

314 } 

315 ) 

316 except Exception as e: 

317 logger.error(f"Error parsing message: {e}") 

318 return None 

319 

320 def on_message(self, handler: Callable) -> None: 

321 """Register message handler.""" 

322 self._message_handlers.append(handler) 

323 

324 async def send_message( 

325 self, 

326 chat_id: str, 

327 text: str, 

328 reply_to: Optional[str] = None, 

329 **kwargs 

330 ) -> SendResult: 

331 """Send a message.""" 

332 url = f"{self.config.api_base}/channels/{chat_id}/messages" 

333 

334 payload: Dict[str, Any] = {"content": text} 

335 

336 if reply_to: 

337 payload["message_reference"] = {"message_id": reply_to} 

338 

339 try: 

340 async with self._session.post( 

341 url, 

342 json=payload, 

343 headers=self._get_headers() 

344 ) as resp: 

345 if resp.status not in (200, 201): 

346 error = await resp.text() 

347 raise ChannelSendError(f"Failed to send: {error}") 

348 

349 data = await resp.json() 

350 return SendResult( 

351 success=True, 

352 message_id=data["id"], 

353 timestamp=datetime.now() 

354 ) 

355 

356 except Exception as e: 

357 logger.error(f"Failed to send message: {e}") 

358 raise ChannelSendError(str(e)) 

359 

360 async def edit_message( 

361 self, 

362 chat_id: str, 

363 message_id: str, 

364 text: str, 

365 **kwargs 

366 ) -> bool: 

367 """Edit a message.""" 

368 url = f"{self.config.api_base}/channels/{chat_id}/messages/{message_id}" 

369 

370 try: 

371 async with self._session.patch( 

372 url, 

373 json={"content": text}, 

374 headers=self._get_headers() 

375 ) as resp: 

376 return resp.status == 200 

377 except Exception as e: 

378 logger.error(f"Failed to edit message: {e}") 

379 return False 

380 

381 async def delete_message(self, chat_id: str, message_id: str, **kwargs) -> bool: 

382 """Delete a message.""" 

383 url = f"{self.config.api_base}/channels/{chat_id}/messages/{message_id}" 

384 

385 try: 

386 async with self._session.delete(url, headers=self._get_headers()) as resp: 

387 return resp.status == 204 

388 except Exception as e: 

389 logger.error(f"Failed to delete message: {e}") 

390 return False 

391 

392 async def send_typing(self, chat_id: str, **kwargs) -> None: 

393 """Send typing indicator.""" 

394 url = f"{self.config.api_base}/channels/{chat_id}/typing" 

395 try: 

396 await self._session.post(url, headers=self._get_headers()) 

397 except Exception as e: 

398 logger.debug(f"Failed to send typing: {e}") 

399 

400 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]: 

401 """Get channel information.""" 

402 url = f"{self.config.api_base}/channels/{chat_id}" 

403 

404 try: 

405 async with self._session.get(url, headers=self._get_headers()) as resp: 

406 if resp.status != 200: 

407 return None 

408 

409 data = await resp.json() 

410 return { 

411 "id": data["id"], 

412 "name": data.get("name", "DM"), 

413 "type": data.get("type"), 

414 "guild_id": data.get("guild_id"), 

415 } 

416 except Exception as e: 

417 logger.error(f"Failed to get channel info: {e}") 

418 return None 

419 

420 

421def create_discord_user_adapter( 

422 user_token: Optional[str] = None, 

423 **kwargs 

424) -> DiscordUserAdapter: 

425 """Factory function to create a Discord user adapter.""" 

426 config = DiscordUserConfig( 

427 user_token=user_token or os.getenv("DISCORD_USER_TOKEN", ""), 

428 **kwargs 

429 ) 

430 return DiscordUserAdapter(config)