Coverage for integrations / channels / extensions / telegram_user_adapter.py: 37.5%

136 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Telegram User Account Adapter 

3 

4Implements Telegram user account (not bot) integration using Telethon. 

5Based on HevolveBot extension patterns. 

6 

7This allows using a regular Telegram user account as a messaging channel, 

8which can access groups/channels that bots cannot. 

9 

10Features: 

11- User account authentication 

12- Access to all groups/channels 

13- Send as user (not bot) 

14- Full message history access 

15- Docker-compatible 

16""" 

17 

18from __future__ import annotations 

19 

20import asyncio 

21import logging 

22import os 

23from typing import Optional, List, Dict, Any, Callable 

24from datetime import datetime 

25from dataclasses import dataclass, field 

26 

27try: 

28 from telethon import TelegramClient, events 

29 from telethon.sessions import StringSession 

30 from telethon.tl.types import User, Chat, Channel 

31 HAS_TELETHON = True 

32except ImportError: 

33 HAS_TELETHON = False 

34 

35from ..base import ( 

36 ChannelAdapter, 

37 ChannelConfig, 

38 ChannelStatus, 

39 Message, 

40 MessageType, 

41 MediaAttachment, 

42 SendResult, 

43 ChannelConnectionError, 

44 ChannelSendError, 

45) 

46 

47logger = logging.getLogger(__name__) 

48 

49 

50@dataclass 

51class TelegramUserConfig(ChannelConfig): 

52 """Telegram user account configuration.""" 

53 api_id: int = 0 

54 api_hash: str = "" 

55 session_string: str = "" # Telethon session string 

56 phone_number: str = "" # For initial auth 

57 proxy: Optional[Dict[str, Any]] = None 

58 receive_own_messages: bool = False 

59 

60 @classmethod 

61 def from_env(cls) -> "TelegramUserConfig": 

62 """Create config from environment variables.""" 

63 return cls( 

64 api_id=int(os.getenv("TELEGRAM_API_ID", "0")), 

65 api_hash=os.getenv("TELEGRAM_API_HASH", ""), 

66 session_string=os.getenv("TELEGRAM_SESSION", ""), 

67 phone_number=os.getenv("TELEGRAM_PHONE", ""), 

68 ) 

69 

70 

71class TelegramUserAdapter(ChannelAdapter): 

72 """Telegram user account adapter using Telethon.""" 

73 

74 channel_type = "telegram_user" 

75 

76 @property 

77 def name(self) -> str: 

78 """Get adapter name.""" 

79 return self.channel_type 

80 

81 def __init__(self, config: TelegramUserConfig): 

82 if not HAS_TELETHON: 

83 raise ImportError("telethon is required for TelegramUserAdapter") 

84 

85 super().__init__(config) 

86 self.config: TelegramUserConfig = config 

87 self._client: Optional[TelegramClient] = None 

88 self._connected = False 

89 self._message_handlers: List[Callable] = [] 

90 self._me: Optional[User] = None 

91 

92 async def connect(self) -> bool: 

93 """Connect to Telegram as user.""" 

94 try: 

95 # Create client with session string 

96 session = StringSession(self.config.session_string) if self.config.session_string else StringSession() 

97 

98 self._client = TelegramClient( 

99 session, 

100 self.config.api_id, 

101 self.config.api_hash, 

102 proxy=self.config.proxy 

103 ) 

104 

105 await self._client.start(phone=self.config.phone_number) 

106 

107 # Get current user info 

108 self._me = await self._client.get_me() 

109 

110 # Register message handler 

111 @self._client.on(events.NewMessage) 

112 async def handler(event): 

113 await self._handle_message(event) 

114 

115 self._connected = True 

116 self._status = ChannelStatus.CONNECTED 

117 logger.info(f"Connected to Telegram as {self._me.username or self._me.first_name}") 

118 

119 # Save session string for future use 

120 if not self.config.session_string: 

121 new_session = self._client.session.save() 

122 logger.info(f"Session string (save this): {new_session}") 

123 

124 return True 

125 

126 except Exception as e: 

127 logger.error(f"Failed to connect to Telegram: {e}") 

128 self._status = ChannelStatus.ERROR 

129 raise ChannelConnectionError(str(e)) 

130 

131 async def disconnect(self) -> None: 

132 """Disconnect from Telegram.""" 

133 self._connected = False 

134 

135 if self._client: 

136 await self._client.disconnect() 

137 self._client = None 

138 

139 self._status = ChannelStatus.DISCONNECTED 

140 logger.info("Disconnected from Telegram user account") 

141 

142 async def _handle_message(self, event) -> None: 

143 """Handle incoming message event.""" 

144 try: 

145 # Skip own messages unless configured otherwise 

146 if event.out and not self.config.receive_own_messages: 

147 return 

148 

149 message = await self._parse_message(event) 

150 if message: 

151 for handler in self._message_handlers: 

152 asyncio.create_task(handler(message)) 

153 

154 except Exception as e: 

155 logger.error(f"Error handling message: {e}") 

156 

157 async def _parse_message(self, event) -> Optional[Message]: 

158 """Parse Telethon event to unified Message.""" 

159 try: 

160 sender = await event.get_sender() 

161 chat = await event.get_chat() 

162 

163 sender_name = "" 

164 if sender: 

165 if hasattr(sender, 'username') and sender.username: 

166 sender_name = sender.username 

167 elif hasattr(sender, 'first_name'): 

168 sender_name = sender.first_name or "" 

169 

170 chat_id = str(event.chat_id) 

171 

172 # Determine message type 

173 msg_type = MessageType.TEXT 

174 attachments = [] 

175 

176 if event.photo: 

177 msg_type = MessageType.IMAGE 

178 elif event.video: 

179 msg_type = MessageType.VIDEO 

180 elif event.voice or event.audio: 

181 msg_type = MessageType.AUDIO 

182 elif event.document: 

183 msg_type = MessageType.FILE 

184 

185 return Message( 

186 id=str(event.id), 

187 channel=self.channel_type, 

188 chat_id=chat_id, 

189 sender_id=str(sender.id) if sender else "", 

190 sender_name=sender_name, 

191 text=event.text or "", 

192 timestamp=event.date or datetime.now(), 

193 message_type=msg_type, 

194 reply_to=str(event.reply_to_msg_id) if event.reply_to_msg_id else None, 

195 attachments=attachments, 

196 metadata={ 

197 "chat_type": type(chat).__name__.lower(), 

198 "out": event.out, 

199 } 

200 ) 

201 except Exception as e: 

202 logger.error(f"Error parsing message: {e}") 

203 return None 

204 

205 def on_message(self, handler: Callable) -> None: 

206 """Register message handler.""" 

207 self._message_handlers.append(handler) 

208 

209 async def send_message( 

210 self, 

211 chat_id: str, 

212 text: str, 

213 reply_to: Optional[str] = None, 

214 **kwargs 

215 ) -> SendResult: 

216 """Send a message.""" 

217 try: 

218 entity = await self._client.get_entity(int(chat_id)) 

219 

220 result = await self._client.send_message( 

221 entity, 

222 text, 

223 reply_to=int(reply_to) if reply_to else None, 

224 parse_mode=kwargs.get("parse_mode", "markdown"), 

225 ) 

226 

227 return SendResult( 

228 success=True, 

229 message_id=str(result.id), 

230 timestamp=result.date or datetime.now() 

231 ) 

232 

233 except Exception as e: 

234 logger.error(f"Failed to send message: {e}") 

235 raise ChannelSendError(str(e)) 

236 

237 async def edit_message( 

238 self, 

239 chat_id: str, 

240 message_id: str, 

241 text: str, 

242 **kwargs 

243 ) -> bool: 

244 """Edit a message.""" 

245 try: 

246 entity = await self._client.get_entity(int(chat_id)) 

247 await self._client.edit_message( 

248 entity, 

249 int(message_id), 

250 text 

251 ) 

252 return True 

253 except Exception as e: 

254 logger.error(f"Failed to edit message: {e}") 

255 return False 

256 

257 async def delete_message(self, chat_id: str, message_id: str, **kwargs) -> bool: 

258 """Delete a message.""" 

259 try: 

260 entity = await self._client.get_entity(int(chat_id)) 

261 await self._client.delete_messages(entity, [int(message_id)]) 

262 return True 

263 except Exception as e: 

264 logger.error(f"Failed to delete message: {e}") 

265 return False 

266 

267 async def send_typing(self, chat_id: str, **kwargs) -> None: 

268 """Send typing action.""" 

269 try: 

270 entity = await self._client.get_entity(int(chat_id)) 

271 await self._client.send_typing(entity) 

272 except Exception as e: 

273 logger.debug(f"Failed to send typing: {e}") 

274 

275 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]: 

276 """Get chat information.""" 

277 try: 

278 entity = await self._client.get_entity(int(chat_id)) 

279 

280 info = { 

281 "id": str(entity.id), 

282 "type": type(entity).__name__.lower(), 

283 } 

284 

285 if hasattr(entity, 'title'): 

286 info["title"] = entity.title 

287 if hasattr(entity, 'username'): 

288 info["username"] = entity.username 

289 if hasattr(entity, 'first_name'): 

290 info["name"] = f"{entity.first_name or ''} {entity.last_name or ''}".strip() 

291 

292 return info 

293 except Exception as e: 

294 logger.error(f"Failed to get chat info: {e}") 

295 return None 

296 

297 async def send_file( 

298 self, 

299 chat_id: str, 

300 file_path: str, 

301 caption: Optional[str] = None, 

302 **kwargs 

303 ) -> Optional[str]: 

304 """Send a file.""" 

305 try: 

306 entity = await self._client.get_entity(int(chat_id)) 

307 result = await self._client.send_file( 

308 entity, 

309 file_path, 

310 caption=caption, 

311 ) 

312 return str(result.id) 

313 except Exception as e: 

314 logger.error(f"Failed to send file: {e}") 

315 return None 

316 

317 def get_session_string(self) -> str: 

318 """Get current session string for persistence.""" 

319 if self._client: 

320 return self._client.session.save() 

321 return "" 

322 

323 

324def create_telegram_user_adapter( 

325 api_id: Optional[int] = None, 

326 api_hash: Optional[str] = None, 

327 session_string: Optional[str] = None, 

328 **kwargs 

329) -> TelegramUserAdapter: 

330 """Factory function to create a Telegram user adapter.""" 

331 config = TelegramUserConfig( 

332 api_id=api_id or int(os.getenv("TELEGRAM_API_ID", "0")), 

333 api_hash=api_hash or os.getenv("TELEGRAM_API_HASH", ""), 

334 session_string=session_string or os.getenv("TELEGRAM_SESSION", ""), 

335 **kwargs 

336 ) 

337 return TelegramUserAdapter(config)