Coverage for integrations / channels / extensions / telegram_user_adapter.py: 37.5%
136 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Telegram User Account Adapter
4Implements Telegram user account (not bot) integration using Telethon.
5Based on HevolveBot extension patterns.
7This allows using a regular Telegram user account as a messaging channel,
8which can access groups/channels that bots cannot.
10Features:
11- User account authentication
12- Access to all groups/channels
13- Send as user (not bot)
14- Full message history access
15- Docker-compatible
16"""
18from __future__ import annotations
20import asyncio
21import logging
22import os
23from typing import Optional, List, Dict, Any, Callable
24from datetime import datetime
25from dataclasses import dataclass, field
27try:
28 from telethon import TelegramClient, events
29 from telethon.sessions import StringSession
30 from telethon.tl.types import User, Chat, Channel
31 HAS_TELETHON = True
32except ImportError:
33 HAS_TELETHON = False
35from ..base import (
36 ChannelAdapter,
37 ChannelConfig,
38 ChannelStatus,
39 Message,
40 MessageType,
41 MediaAttachment,
42 SendResult,
43 ChannelConnectionError,
44 ChannelSendError,
45)
47logger = logging.getLogger(__name__)
@dataclass
class TelegramUserConfig(ChannelConfig):
    """Telegram user account configuration.

    Extends the shared ``ChannelConfig`` with the credentials Telethon needs
    to log in as a regular user (not a bot).
    """

    # Application id / hash pair passed to ``TelegramClient``.
    api_id: int = 0
    api_hash: str = ""
    # Telethon ``StringSession`` string; empty means "start a fresh session".
    session_string: str = ""
    # Phone number used for the initial interactive authentication.
    phone_number: str = ""
    # Optional proxy settings, forwarded unchanged to ``TelegramClient``.
    proxy: Optional[Dict[str, Any]] = None
    # When True, messages sent by this account are dispatched to handlers too.
    receive_own_messages: bool = False

    @classmethod
    def from_env(cls) -> "TelegramUserConfig":
        """Create config from environment variables.

        Reads ``TELEGRAM_API_ID``, ``TELEGRAM_API_HASH``, ``TELEGRAM_SESSION``
        and ``TELEGRAM_PHONE``. Missing variables fall back to ``0`` / ``""``.

        Raises:
            ValueError: if ``TELEGRAM_API_ID`` is set to a non-integer value.
        """
        # A variable that is defined but empty (or whitespace only) must be
        # treated like an unset one; int("") would otherwise raise.
        raw_api_id = (os.getenv("TELEGRAM_API_ID") or "").strip() or "0"
        try:
            api_id = int(raw_api_id)
        except ValueError as exc:
            raise ValueError(
                f"TELEGRAM_API_ID must be an integer, got {raw_api_id!r}"
            ) from exc

        return cls(
            api_id=api_id,
            api_hash=os.getenv("TELEGRAM_API_HASH", ""),
            session_string=os.getenv("TELEGRAM_SESSION", ""),
            phone_number=os.getenv("TELEGRAM_PHONE", ""),
        )
class TelegramUserAdapter(ChannelAdapter):
    """Telegram user account adapter using Telethon.

    Wraps a Telethon ``TelegramClient`` logged in as a regular user and
    exposes connect/disconnect, incoming-message dispatch and the usual
    send/edit/delete operations through the ``ChannelAdapter`` interface.
    """

    channel_type = "telegram_user"

    @property
    def name(self) -> str:
        """Get adapter name."""
        return self.channel_type

    def __init__(self, config: TelegramUserConfig):
        """Store the configuration; no network activity happens here.

        Raises:
            ImportError: if the optional ``telethon`` dependency is missing.
        """
        if not HAS_TELETHON:
            raise ImportError("telethon is required for TelegramUserAdapter")

        super().__init__(config)
        self.config: TelegramUserConfig = config
        self._client: Optional[TelegramClient] = None
        self._connected = False
        self._message_handlers: List[Callable] = []
        self._me: Optional[User] = None
        # Strong references to in-flight handler tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._handler_tasks: set = set()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _require_client(self) -> TelegramClient:
        """Return the live client or raise a clear "not connected" error."""
        if self._client is None:
            raise ChannelConnectionError("Telegram client is not connected")
        return self._client

    @staticmethod
    def _peer(chat_id: Any) -> Any:
        """Convert a chat id to what ``get_entity`` should receive.

        Numeric ids become ``int`` (as before); anything else, e.g. an
        ``@username``, is passed through unchanged for Telethon to resolve.
        """
        try:
            return int(chat_id)
        except (TypeError, ValueError):
            return chat_id

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current time as a timezone-aware UTC datetime.

        Used only as a fallback when Telegram supplies no date.
        NOTE(review): Telethon dates are expected to be UTC-aware, so the
        fallback is made aware too to avoid mixing naive and aware values.
        """
        from datetime import timezone

        return datetime.now(timezone.utc)

    async def _discard_client(self) -> None:
        """Drop the current client, closing it on a best-effort basis."""
        client, self._client = self._client, None
        self._connected = False
        if client is None:
            return
        try:
            await client.disconnect()
        except Exception as e:
            logger.debug("Error while closing Telegram client: %s", e)

    def _on_handler_done(self, task) -> None:
        """Forget a finished handler task and log its failure, if any."""
        self._handler_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error("Message handler failed: %s", error)

    def _dispatch(self, handler: Callable, message: Message) -> None:
        """Invoke one handler without letting it break the others.

        Coroutine handlers are scheduled as background tasks; plain
        callables have already completed once the call returns.
        """
        try:
            outcome = handler(message)
            if asyncio.iscoroutine(outcome):
                task = asyncio.create_task(outcome)
            elif asyncio.isfuture(outcome):
                task = outcome
            else:
                return
            self._handler_tasks.add(task)
            task.add_done_callback(self._on_handler_done)
        except Exception as e:
            logger.error("Error handling message: %s", e)

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------

    async def connect(self) -> bool:
        """Connect to Telegram as user.

        Returns:
            True once the client is connected and authorised.

        Raises:
            ChannelConnectionError: on any failure; the partially created
                client is closed before the error is raised.
        """
        try:
            # An empty session string yields a fresh, unauthenticated session.
            session = StringSession(self.config.session_string or None)

            self._client = TelegramClient(
                session,
                self.config.api_id,
                self.config.api_hash,
                proxy=self.config.proxy,
            )

            if self.config.phone_number:
                await self._client.start(phone=self.config.phone_number)
            else:
                # NOTE(review): Telethon's start() is understood to reject an
                # empty phone even when the saved session is already
                # authorised, so with no phone configured we only open the
                # connection and rely on the stored session.
                await self._client.connect()

            # get_me() yields None when the session is not logged in.
            self._me = await self._client.get_me()
            if self._me is None:
                raise ChannelConnectionError(
                    "Telegram session is not authorized; provide a valid "
                    "session string or a phone number"
                )

            # Register message handler
            @self._client.on(events.NewMessage)
            async def handler(event):
                await self._handle_message(event)

            self._connected = True
            self._status = ChannelStatus.CONNECTED
            logger.info(
                "Connected to Telegram as %s",
                self._me.username or self._me.first_name,
            )

            if not self.config.session_string:
                # The session string is a credential granting full access to
                # the account, so it must never be written to the logs.
                logger.info(
                    "New Telegram session created; call get_session_string() "
                    "to persist it"
                )

            return True

        except Exception as e:
            logger.error("Failed to connect to Telegram: %s", e)
            self._status = ChannelStatus.ERROR
            # Do not leak a half-open client / socket after a failed connect.
            await self._discard_client()
            raise ChannelConnectionError(str(e)) from e

    async def disconnect(self) -> None:
        """Disconnect from Telegram.

        Local state is reset even if closing the client raises; the error
        still propagates to the caller.
        """
        self._connected = False
        client, self._client = self._client, None

        try:
            if client is not None:
                await client.disconnect()
        finally:
            self._status = ChannelStatus.DISCONNECTED

        logger.info("Disconnected from Telegram user account")

    # ------------------------------------------------------------------
    # Incoming messages
    # ------------------------------------------------------------------

    async def _handle_message(self, event) -> None:
        """Handle incoming message event and fan it out to the handlers."""
        try:
            # Skip own messages unless configured otherwise
            if event.out and not self.config.receive_own_messages:
                return

            message = await self._parse_message(event)
        except Exception as e:
            logger.error("Error handling message: %s", e)
            return

        if not message:
            return

        # Iterate over a snapshot so a handler may register further handlers.
        for handler in tuple(self._message_handlers):
            self._dispatch(handler, message)

    async def _parse_message(self, event) -> Optional[Message]:
        """Parse Telethon event to unified Message.

        Returns:
            The converted ``Message``, or None if the event cannot be parsed.
        """
        try:
            sender = await event.get_sender()
            chat = await event.get_chat()

            # Prefer the username, then the first name; fall back to a title
            # for senders that carry neither.
            sender_name = ""
            if sender:
                sender_name = (
                    getattr(sender, "username", None)
                    or getattr(sender, "first_name", None)
                    or getattr(sender, "title", None)
                    or ""
                )

            chat_id = str(event.chat_id)

            # Determine message type; the first matching media kind wins.
            msg_type = MessageType.TEXT
            # NOTE: media is only classified here, not downloaded, so the
            # attachment list is always empty.
            attachments = []

            if event.photo:
                msg_type = MessageType.IMAGE
            elif event.video:
                msg_type = MessageType.VIDEO
            elif event.voice or event.audio:
                msg_type = MessageType.AUDIO
            elif event.document:
                msg_type = MessageType.FILE

            return Message(
                id=str(event.id),
                channel=self.channel_type,
                chat_id=chat_id,
                sender_id=str(sender.id) if sender else "",
                sender_name=sender_name,
                text=event.text or "",
                timestamp=event.date or self._utc_now(),
                message_type=msg_type,
                reply_to=str(event.reply_to_msg_id) if event.reply_to_msg_id else None,
                attachments=attachments,
                metadata={
                    "chat_type": type(chat).__name__.lower(),
                    "out": event.out,
                },
            )
        except Exception as e:
            logger.error("Error parsing message: %s", e)
            return None

    def on_message(self, handler: Callable) -> None:
        """Register message handler.

        The handler receives the parsed ``Message``; it may be a coroutine
        function or a plain callable.
        """
        self._message_handlers.append(handler)

    # ------------------------------------------------------------------
    # Outgoing operations
    # ------------------------------------------------------------------

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        **kwargs
    ) -> SendResult:
        """Send a message.

        Args:
            chat_id: Numeric chat id (as a string) or a username.
            text: Message body.
            reply_to: Optional id of the message being replied to.
            **kwargs: ``parse_mode`` is honoured (default ``"markdown"``).

        Raises:
            ChannelSendError: if the adapter is not connected or sending fails.
        """
        try:
            client = self._require_client()
            entity = await client.get_entity(self._peer(chat_id))

            result = await client.send_message(
                entity,
                text,
                reply_to=int(reply_to) if reply_to else None,
                parse_mode=kwargs.get("parse_mode", "markdown"),
            )

            return SendResult(
                success=True,
                message_id=str(result.id),
                timestamp=result.date or self._utc_now(),
            )

        except Exception as e:
            logger.error("Failed to send message: %s", e)
            raise ChannelSendError(str(e)) from e

    async def edit_message(
        self,
        chat_id: str,
        message_id: str,
        text: str,
        **kwargs
    ) -> bool:
        """Edit a message. Returns True on success, False on any failure."""
        try:
            client = self._require_client()
            entity = await client.get_entity(self._peer(chat_id))
            await client.edit_message(entity, int(message_id), text)
            return True
        except Exception as e:
            logger.error("Failed to edit message: %s", e)
            return False

    async def delete_message(self, chat_id: str, message_id: str, **kwargs) -> bool:
        """Delete a message. Returns True on success, False on any failure."""
        try:
            client = self._require_client()
            entity = await client.get_entity(self._peer(chat_id))
            await client.delete_messages(entity, [int(message_id)])
            return True
        except Exception as e:
            logger.error("Failed to delete message: %s", e)
            return False

    async def send_typing(self, chat_id: str, **kwargs) -> None:
        """Send typing action (best effort; failures are only logged)."""
        try:
            # NOTE(review): TelegramClient does not appear to offer a
            # send_typing() method, so the one-shot typing action is sent
            # with the raw SetTypingRequest instead.
            from telethon.tl.functions.messages import SetTypingRequest
            from telethon.tl.types import SendMessageTypingAction

            client = self._require_client()
            entity = await client.get_entity(self._peer(chat_id))
            await client(
                SetTypingRequest(peer=entity, action=SendMessageTypingAction())
            )
        except Exception as e:
            logger.debug("Failed to send typing: %s", e)

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Get chat information.

        Returns:
            A dict with ``id`` and ``type`` plus ``title``, ``username`` and
            ``name`` when the entity has those attributes, or None on failure.
        """
        try:
            client = self._require_client()
            entity = await client.get_entity(self._peer(chat_id))

            info = {
                "id": str(entity.id),
                "type": type(entity).__name__.lower(),
            }

            if hasattr(entity, 'title'):
                info["title"] = entity.title
            if hasattr(entity, 'username'):
                info["username"] = entity.username
            if hasattr(entity, 'first_name'):
                info["name"] = f"{entity.first_name or ''} {entity.last_name or ''}".strip()

            return info
        except Exception as e:
            logger.error("Failed to get chat info: %s", e)
            return None

    async def send_file(
        self,
        chat_id: str,
        file_path: str,
        caption: Optional[str] = None,
        **kwargs
    ) -> Optional[str]:
        """Send a file. Returns the new message id, or None on failure."""
        try:
            client = self._require_client()
            entity = await client.get_entity(self._peer(chat_id))
            result = await client.send_file(
                entity,
                file_path,
                caption=caption,
            )
            return str(result.id)
        except Exception as e:
            logger.error("Failed to send file: %s", e)
            return None

    def get_session_string(self) -> str:
        """Get current session string for persistence.

        The returned value is a credential; store it securely. Returns an
        empty string when there is no client.
        """
        if self._client:
            return self._client.session.save()
        return ""
def create_telegram_user_adapter(
    api_id: Optional[int] = None,
    api_hash: Optional[str] = None,
    session_string: Optional[str] = None,
    **kwargs
) -> TelegramUserAdapter:
    """Factory function to create a Telegram user adapter.

    Any argument left unset (or falsy) falls back to its environment
    variable: ``TELEGRAM_API_ID``, ``TELEGRAM_API_HASH``, ``TELEGRAM_SESSION``
    and, for ``phone_number``, ``TELEGRAM_PHONE``.

    Args:
        api_id: Telegram application id.
        api_hash: Telegram application hash.
        session_string: Telethon session string.
        **kwargs: Extra ``TelegramUserConfig`` fields (e.g. ``phone_number``,
            ``proxy``, ``receive_own_messages``).

    Raises:
        ValueError: if ``TELEGRAM_API_ID`` is needed but is not an integer.
        ImportError: raised by the adapter when ``telethon`` is missing.
    """
    if not api_id:
        # A defined-but-empty variable counts as unset; int("") would raise.
        raw_api_id = (os.getenv("TELEGRAM_API_ID") or "").strip() or "0"
        try:
            api_id = int(raw_api_id)
        except ValueError as exc:
            raise ValueError(
                f"TELEGRAM_API_ID must be an integer, got {raw_api_id!r}"
            ) from exc

    # Mirror TelegramUserConfig.from_env(), which also reads the phone number
    # from the environment; an explicitly passed value always wins.
    kwargs.setdefault("phone_number", os.getenv("TELEGRAM_PHONE", ""))

    config = TelegramUserConfig(
        api_id=api_id,
        api_hash=api_hash or os.getenv("TELEGRAM_API_HASH", ""),
        session_string=session_string or os.getenv("TELEGRAM_SESSION", ""),
        **kwargs
    )
    return TelegramUserAdapter(config)