Coverage for integrations / channels / telegram_adapter.py: 36.9%

241 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Telegram Channel Adapter 

3 

4Implements Telegram messaging using python-telegram-bot library. 

5Supports both polling and webhook modes. 

6 

7Features: 

8- Text messages 

9- Media (images, videos, documents, voice) 

10- Inline keyboards 

11- Reply threading 

12- Group/DM detection 

13- Bot mention detection 

14""" 

15 

16from __future__ import annotations 

17 

18import asyncio 

19import logging 

20import os 

21from typing import Optional, List, Dict, Any 

22from datetime import datetime 

23 

24try: 

25 from telegram import ( 

26 Update, 

27 Bot, 

28 InlineKeyboardButton, 

29 InlineKeyboardMarkup, 

30 InputMediaPhoto, 

31 InputMediaVideo, 

32 InputMediaDocument, 

33 InputMediaAudio, 

34 ) 

35 from telegram.ext import ( 

36 Application, 

37 ApplicationBuilder, 

38 CommandHandler, 

39 MessageHandler, 

40 CallbackQueryHandler, 

41 filters, 

42 ContextTypes, 

43 ) 

44 from telegram.constants import ChatAction, ParseMode 

45 from telegram.error import TelegramError, RetryAfter 

46 HAS_TELEGRAM = True 

47except ImportError: 

48 HAS_TELEGRAM = False 

49 

50from .base import ( 

51 ChannelAdapter, 

52 ChannelConfig, 

53 ChannelStatus, 

54 Message, 

55 MessageType, 

56 MediaAttachment, 

57 SendResult, 

58 ChannelConnectionError, 

59 ChannelSendError, 

60 ChannelRateLimitError, 

61) 

62from .room_capable import RoomCapableAdapter, UnsupportedRoomError 

63 

64logger = logging.getLogger(__name__) 

65 

66 

class TelegramAdapter(ChannelAdapter, RoomCapableAdapter):
    """
    Telegram messaging adapter built on python-telegram-bot.

    Incoming updates are received through long polling, converted to the
    unified ``Message`` type and handed to ``_dispatch_message``. Outgoing
    operations return ``SendResult`` / ``bool`` instead of raising, except
    for rate limiting on ``send_message`` which raises
    ``ChannelRateLimitError``.

    Usage:
        config = ChannelConfig(token="BOT_TOKEN")
        adapter = TelegramAdapter(config)
        adapter.on_message(my_handler)
        await adapter.start()
    """

    # (media type, Bot method name, keyword that carries the payload).
    # Kept as a tuple and matched with ``==`` so lookup semantics are the
    # same as a plain if/elif chain on ``MediaAttachment.type``.
    _MEDIA_SENDERS = (
        (MessageType.IMAGE, "send_photo", "photo"),
        (MessageType.VIDEO, "send_video", "video"),
        (MessageType.DOCUMENT, "send_document", "document"),
        (MessageType.AUDIO, "send_audio", "audio"),
        (MessageType.VOICE, "send_voice", "voice"),
    )

    def __init__(self, config: ChannelConfig):
        """Create a disconnected adapter.

        Args:
            config: Channel configuration; ``config.token`` is read by
                ``connect``.

        Raises:
            ImportError: If python-telegram-bot could not be imported.
        """
        if not HAS_TELEGRAM:
            raise ImportError(
                "python-telegram-bot not installed. "
                "Install with: pip install python-telegram-bot"
            )

        super().__init__(config)
        self._app: Optional[Application] = None
        self._bot: Optional[Bot] = None
        self._bot_username: Optional[str] = None
        # Filled in by connect(); list_room_members() uses it to leave the
        # bot itself out of the member list.
        self._bot_user_id: Optional[int] = None

    @property
    def name(self) -> str:
        """Channel identifier stamped on every ``Message`` from this adapter."""
        return "telegram"

    @staticmethod
    def _coerce_chat_id(chat_id: Any) -> Any:
        """Return ``chat_id`` as an int when it is numeric, else unchanged.

        Non-numeric targets such as ``@channelusername`` are passed through
        as-is instead of failing in ``int()``.
        """
        try:
            return int(chat_id)
        except (TypeError, ValueError):
            return chat_id

    async def connect(self) -> bool:
        """Connect to Telegram using the bot token and start polling.

        Returns:
            True when the application is running and polling has started;
            False when no token is configured or Telegram reported an error
            (in the latter case ``status`` becomes ``ChannelStatus.ERROR``).
        """
        if not self.config.token:
            logger.error("Telegram bot token not provided")
            return False

        # A repeated connect() must not leave an earlier application
        # polling with the same token.
        if self._app is not None:
            await self._shutdown_app()

        try:
            self._app = ApplicationBuilder().token(self.config.token).build()
            self._bot = self._app.bot

            # Fetch the bot identity (used for mention detection and for
            # filtering the bot out of member lists).
            bot_info = await self._bot.get_me()
            self._bot_username = bot_info.username
            self._bot_user_id = bot_info.id
            logger.info("Connected as @%s", self._bot_username)

            self._register_handlers()

            await self._app.initialize()
            await self._app.start()
            # Awaited directly (not fire-and-forget) so that a start-up
            # failure is reported here rather than lost in a background task.
            await self._app.updater.start_polling(drop_pending_updates=True)
        except TelegramError as e:
            logger.error("Failed to connect to Telegram: %s", e)
            # Release whatever was started before the failure.
            await self._shutdown_app()
            self.status = ChannelStatus.ERROR
            return False

        self.status = ChannelStatus.CONNECTED
        return True

    async def _shutdown_app(self) -> None:
        """Stop polling and the application, then drop the references.

        Best effort: errors are logged, never raised. Components that are
        not running are skipped so that a partially started application can
        still be shut down.
        """
        app = self._app
        if app is None:
            return
        try:
            updater = app.updater
            if updater is not None and updater.running:
                await updater.stop()
            if app.running:
                await app.stop()
            await app.shutdown()
        except Exception as e:
            logger.error("Error disconnecting: %s", e)
        finally:
            self._app = None
            self._bot = None

    async def disconnect(self) -> None:
        """Disconnect from Telegram; a no-op when not connected."""
        if not self._app:
            return
        await self._shutdown_app()
        self.status = ChannelStatus.DISCONNECTED

    def _register_handlers(self) -> None:
        """Register command, message, media and callback handlers on the app."""
        # Built-in commands.
        for command, callback in (
            ("start", self._handle_start),
            ("help", self._handle_help),
            ("status", self._handle_status),
        ):
            self._app.add_handler(CommandHandler(command, callback))

        # Plain text that is not a command.
        self._app.add_handler(MessageHandler(
            filters.TEXT & ~filters.COMMAND,
            self._handle_message,
        ))

        # Media. ``filters.Document.ALL`` is the documented v20+ filter for
        # "any document".
        media_filter = (
            filters.PHOTO
            | filters.VIDEO
            | filters.Document.ALL
            | filters.AUDIO
            | filters.VOICE
        )
        self._app.add_handler(MessageHandler(media_filter, self._handle_media))

        # Inline keyboard presses.
        self._app.add_handler(CallbackQueryHandler(self._handle_callback))

    async def _handle_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle /start command."""
        # effective_message also covers edited messages, where
        # update.message is None.
        await update.effective_message.reply_text(
            "Hello! I'm your AI assistant. Send me a message to get started.\n\n"
            "Commands:\n"
            "/start - Start the bot\n"
            "/help - Show help\n"
            "/status - Check bot status"
        )

    async def _handle_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle /help command."""
        await update.effective_message.reply_text(
            "I can help you with various tasks. Just send me a message!\n\n"
            "I support:\n"
            "- Text messages\n"
            "- Images and photos\n"
            "- Documents\n"
            "- Voice messages"
        )

    async def _handle_status(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle /status command."""
        await update.effective_message.reply_text(
            f"Bot Status: {self.status.value}\n"
            f"Bot Username: @{self._bot_username}"
        )

    async def _handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle incoming text messages."""
        tg_message = update.effective_message
        if tg_message is None:
            return
        await self._dispatch_message(self._convert_message(tg_message))

    async def _handle_media(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle incoming media messages."""
        tg_message = update.effective_message
        if tg_message is None:
            return
        await self._dispatch_message(self._convert_message(tg_message))

    async def _handle_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle callback queries from inline keyboards.

        The press is acknowledged and then dispatched as a ``Message`` whose
        text is ``[callback:<data>]``.
        """
        query = update.callback_query
        await query.answer()

        origin = query.message
        if origin is None:
            # No originating message means there is no chat to attribute
            # the event to, so it cannot be dispatched.
            logger.debug(
                "Telegram callback %s has no originating message; ignored",
                query.id)
            return

        # Create a message-like event for the callback.
        message = Message(
            id=str(query.id),
            channel=self.name,
            sender_id=str(query.from_user.id),
            sender_name=query.from_user.full_name,
            chat_id=str(origin.chat.id),
            text=f"[callback:{query.data}]",
            is_group=origin.chat.type != "private",
            raw={"callback_query": query.to_dict()},
        )
        await self._dispatch_message(message)

    def _convert_message(self, tg_message) -> Message:
        """Convert a Telegram message to the unified ``Message`` format.

        Text falls back to the caption, then to ``""``. Every attachment
        kind present on the message (photo, video, document, audio, voice)
        becomes one ``MediaAttachment``; for photos only the largest size is
        kept.
        """
        chat = tg_message.chat
        user = tg_message.from_user

        text = tg_message.text or tg_message.caption or ""

        # Mention detection ignores case, so "@mybot" and "@MyBot" both count.
        is_mentioned = False
        if self._bot_username:
            is_mentioned = f"@{self._bot_username}".casefold() in text.casefold()

        media = []
        if tg_message.photo:
            # Telegram delivers several sizes; keep the largest one.
            largest = max(tg_message.photo, key=lambda p: p.width * p.height)
            media.append(MediaAttachment(
                type=MessageType.IMAGE,
                file_id=largest.file_id,
                caption=tg_message.caption,
            ))
        if tg_message.video:
            video = tg_message.video
            media.append(MediaAttachment(
                type=MessageType.VIDEO,
                file_id=video.file_id,
                file_name=video.file_name,
                mime_type=video.mime_type,
                caption=tg_message.caption,
            ))
        if tg_message.document:
            document = tg_message.document
            media.append(MediaAttachment(
                type=MessageType.DOCUMENT,
                file_id=document.file_id,
                file_name=document.file_name,
                mime_type=document.mime_type,
            ))
        if tg_message.audio:
            audio = tg_message.audio
            media.append(MediaAttachment(
                type=MessageType.AUDIO,
                file_id=audio.file_id,
                file_name=audio.file_name,
                mime_type=audio.mime_type,
            ))
        if tg_message.voice:
            voice = tg_message.voice
            media.append(MediaAttachment(
                type=MessageType.VOICE,
                file_id=voice.file_id,
                mime_type=voice.mime_type,
            ))

        # from_user can be absent (e.g. channel posts); attribute the
        # message to the chat itself in that case.
        # NOTE(review): assumes Message accepts "" as sender_name - confirm.
        if user is not None:
            sender_id = str(user.id)
            sender_name = user.full_name
        else:
            sender_id = str(chat.id)
            sender_name = getattr(chat, "title", None) or ""

        replied = tg_message.reply_to_message
        return Message(
            id=str(tg_message.message_id),
            channel=self.name,
            sender_id=sender_id,
            sender_name=sender_name,
            chat_id=str(chat.id),
            text=text,
            media=media,
            reply_to_id=str(replied.message_id) if replied else None,
            timestamp=tg_message.date,
            is_group=chat.type in ("group", "supergroup"),
            is_bot_mentioned=is_mentioned,
            raw=tg_message.to_dict(),
        )

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        media: Optional[List[MediaAttachment]] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Send a message to a Telegram chat.

        Args:
            chat_id: Target chat id (numeric string) or ``@username``.
            text: Message text; used as the caption when ``media`` is given.
            reply_to: Id of the message to reply to, if any.
            media: Attachments; when non-empty the send is delegated to
                ``_send_media``.
            buttons: Inline keyboard button definitions, see
                ``_build_keyboard``.

        Returns:
            ``SendResult`` describing success or the Telegram error.

        Raises:
            ChannelRateLimitError: When Telegram answers with ``RetryAfter``.
        """
        if not self._bot:
            return SendResult(success=False, error="Not connected")

        try:
            keyboard = self._build_keyboard(buttons) if buttons else None

            if media:
                return await self._send_media(chat_id, text, media, reply_to, keyboard)

            msg = await self._bot.send_message(
                chat_id=self._coerce_chat_id(chat_id),
                text=text,
                reply_to_message_id=int(reply_to) if reply_to else None,
                reply_markup=keyboard,
                parse_mode=ParseMode.MARKDOWN,
            )
        except RetryAfter as e:
            raise ChannelRateLimitError(retry_after=e.retry_after) from e
        except TelegramError as e:
            logger.error("Failed to send message: %s", e)
            return SendResult(success=False, error=str(e))

        return SendResult(
            success=True,
            message_id=str(msg.message_id),
            raw=msg.to_dict(),
        )

    async def _send_media(
        self,
        chat_id: str,
        caption: str,
        media: List[MediaAttachment],
        reply_to: Optional[str],
        keyboard: Optional[InlineKeyboardMarkup],
    ) -> SendResult:
        """Send the first attachment of ``media`` with ``caption``.

        The payload is the attachment's ``file_id``, else ``file_path``,
        else ``url``. Only ``media[0]`` is sent.

        Returns:
            ``SendResult``; unsuccessful for unsupported media types and for
            Telegram errors.

        Raises:
            ChannelRateLimitError: When Telegram answers with ``RetryAfter``
                (same contract as the text path of ``send_message``).
        """
        first_media = media[0]
        if len(media) > 1:
            logger.warning(
                "Telegram._send_media: only the first of %d attachments is sent",
                len(media))

        sender = next(
            ((method, kwarg)
             for media_type, method, kwarg in self._MEDIA_SENDERS
             if first_media.type == media_type),
            None,
        )
        if sender is None:
            return SendResult(success=False, error=f"Unsupported media type: {first_media.type}")
        method_name, payload_kwarg = sender

        payload = first_media.file_id or first_media.file_path or first_media.url
        try:
            msg = await getattr(self._bot, method_name)(
                chat_id=self._coerce_chat_id(chat_id),
                caption=caption,
                reply_to_message_id=int(reply_to) if reply_to else None,
                reply_markup=keyboard,
                **{payload_kwarg: payload},
            )
        except RetryAfter as e:
            # RetryAfter is a TelegramError, so it must be handled first to
            # reach callers as a rate-limit error.
            raise ChannelRateLimitError(retry_after=e.retry_after) from e
        except TelegramError as e:
            logger.error("Failed to send media: %s", e)
            return SendResult(success=False, error=str(e))

        return SendResult(
            success=True,
            message_id=str(msg.message_id),
            raw=msg.to_dict(),
        )

    def _build_keyboard(self, buttons: List[Dict]) -> InlineKeyboardMarkup:
        """Build an inline keyboard from button definitions.

        Each definition needs ``"text"``. A truthy ``"url"`` makes a link
        button; otherwise a callback button is created whose data is
        ``"callback_data"`` or, when absent, the text. A row is closed when
        the button has a truthy ``"row_break"`` or the row holds 3 buttons.
        """
        rows = []
        current = []

        for btn in buttons:
            if btn.get("url"):
                button = InlineKeyboardButton(text=btn["text"], url=btn["url"])
            else:
                button = InlineKeyboardButton(
                    text=btn["text"],
                    callback_data=btn.get("callback_data", btn["text"]),
                )
            current.append(button)

            if btn.get("row_break", False) or len(current) >= 3:
                rows.append(current)
                current = []

        # Flush the trailing, partially filled row.
        if current:
            rows.append(current)

        return InlineKeyboardMarkup(rows)

    async def edit_message(
        self,
        chat_id: str,
        message_id: str,
        text: str,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Edit the text (and optionally the keyboard) of an existing message.

        Returns:
            ``SendResult``; unsuccessful when not connected or when Telegram
            reports an error.
        """
        if not self._bot:
            return SendResult(success=False, error="Not connected")

        try:
            keyboard = self._build_keyboard(buttons) if buttons else None

            msg = await self._bot.edit_message_text(
                chat_id=self._coerce_chat_id(chat_id),
                message_id=int(message_id),
                text=text,
                reply_markup=keyboard,
                parse_mode=ParseMode.MARKDOWN,
            )
        except TelegramError as e:
            logger.error("Failed to edit message: %s", e)
            return SendResult(success=False, error=str(e))

        # The call may return a plain bool instead of a message object;
        # fall back to the id that was passed in.
        edited_id = getattr(msg, "message_id", None)
        return SendResult(
            success=True,
            message_id=str(edited_id) if edited_id is not None else str(message_id),
            raw=msg.to_dict() if hasattr(msg, 'to_dict') else None,
        )

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """Delete a message; return True on success, False otherwise."""
        if not self._bot:
            return False

        try:
            await self._bot.delete_message(
                chat_id=self._coerce_chat_id(chat_id),
                message_id=int(message_id),
            )
        except TelegramError as e:
            logger.error("Failed to delete message: %s", e)
            return False
        return True

    async def send_typing(self, chat_id: str) -> None:
        """Send a typing indicator; failures are logged and ignored."""
        if not self._bot:
            return
        try:
            await self._bot.send_chat_action(
                chat_id=self._coerce_chat_id(chat_id),
                action=ChatAction.TYPING,
            )
        except TelegramError as e:
            # Purely cosmetic, so never propagate - but leave a trace.
            logger.debug("Failed to send typing indicator to %s: %s", chat_id, e)

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Get information about a chat.

        Returns:
            Dict with ``id``, ``type``, ``title``, ``username``,
            ``first_name`` and ``last_name``, or None when not connected or
            when Telegram reports an error.
        """
        if not self._bot:
            return None

        try:
            chat = await self._bot.get_chat(self._coerce_chat_id(chat_id))
        except TelegramError as e:
            logger.error("Failed to get chat info: %s", e)
            return None

        return {
            "id": chat.id,
            "type": chat.type,
            "title": chat.title,
            "username": chat.username,
            "first_name": chat.first_name,
            "last_name": chat.last_name,
        }

    async def download_file(self, file_id: str, destination: str) -> bool:
        """Download a Telegram file to ``destination``; True on success."""
        if not self._bot:
            return False

        try:
            tg_file = await self._bot.get_file(file_id)
            await tg_file.download_to_drive(destination)
        except TelegramError as e:
            logger.error("Failed to download file: %s", e)
            return False
        return True

    # --- UNIF-G2: RoomCapableAdapter implementation ---

    async def join_room(self, room_id: str,
                        role: str = 'participant') -> bool:
        """Validate the bot's presence in a Telegram chat.

        Telegram bots cannot self-join chats - bots are added by an
        admin or via invite link. ``join_room`` therefore only probes the
        chat with ``get_chat``: success means True; a ``TelegramError``
        (or any other error) means False, so the caller can prompt the
        user to add the bot first.

        Args:
            room_id: Chat to probe.
            role: Only included in the log line.

        Raises:
            UnsupportedRoomError: When the chat is a private chat (DM).
        """
        if not self._bot or not room_id:
            return False
        try:
            chat = await self._bot.get_chat(room_id)
        except TelegramError as e:
            logger.info(
                "Telegram.join_room: bot not in chat %s (%s)",
                room_id, e)
            return False
        except Exception as e:
            logger.error(
                "Telegram.join_room: unexpected error for %s: %s",
                room_id, e)
            return False

        # Reject DMs.
        chat_type = getattr(chat, 'type', None)
        if str(chat_type).lower() == 'private':
            raise UnsupportedRoomError(
                "Telegram private chats are not rooms — use "
                "send_message for 1:1.")

        logger.info(
            "Telegram.join_room: chat %s present (type=%s, role=%s)",
            room_id, chat_type, role)
        return True

    async def leave_room(self, room_id: str) -> bool:
        """Leave a Telegram group / supergroup / channel.

        Idempotent: a ``TelegramError`` from ``leave_chat`` (already
        absent, chat deleted, forbidden) is reported as True so detach
        flows are not blocked by a mismatch between bookkeeping and live
        state. Returns False when not connected, when ``room_id`` is empty,
        or on an unexpected error.
        """
        if not self._bot or not room_id:
            return False
        try:
            await self._bot.leave_chat(room_id)
        except TelegramError as e:
            logger.info(
                "Telegram.leave_room: %s already absent (%s)",
                room_id, e)
            return True
        except Exception as e:
            logger.error(
                "Telegram.leave_room: unexpected error for %s: %s",
                room_id, e)
            return False

        logger.info("Telegram.leave_room: chat %s left", room_id)
        return True

    async def list_room_members(
        self, room_id: str,
    ) -> List[Dict[str, Any]]:
        """List the members of a chat that the bot is able to enumerate.

        Only the result of ``get_chat_administrators`` is returned
        (administrators and creator); the bot itself is left out.

        Returns:
            List of dicts with ``id`` (str), ``display_name`` and
            ``is_bot``; empty when not connected, when ``room_id`` is empty,
            or on any error.
        """
        if not self._bot or not room_id:
            return []
        try:
            admins = await self._bot.get_chat_administrators(room_id)
        except TelegramError as e:
            logger.info(
                "Telegram.list_room_members: cannot list %s (%s)",
                room_id, e)
            return []
        except Exception as e:
            logger.error(
                "Telegram.list_room_members: unexpected for %s: %s",
                room_id, e)
            return []

        # getattr keeps this safe on instances created without __init__.
        own_id = getattr(self, '_bot_user_id', None)

        members: List[Dict[str, Any]] = []
        for member in admins or []:
            user = getattr(member, 'user', None)
            uid = getattr(user, 'id', None) if user is not None else None
            if uid is None:
                continue
            # Skip bot itself.
            if own_id is not None and str(uid) == str(own_id):
                continue
            display_name = (getattr(user, 'full_name', None)
                            or getattr(user, 'username', None) or str(uid))
            members.append({
                'id': str(uid),
                'display_name': display_name,
                'is_bot': bool(getattr(user, 'is_bot', False)),
            })
        return members

627 

628 

def create_telegram_adapter(token: Optional[str] = None, **kwargs) -> "TelegramAdapter":
    """
    Factory function to create Telegram adapter.

    Args:
        token: Bot token. When omitted or empty, the ``TELEGRAM_BOT_TOKEN``
            environment variable is used instead.
        **kwargs: Additional options forwarded to ``ChannelConfig``.

    Returns:
        Configured TelegramAdapter

    Raises:
        ValueError: If no token is given and the environment variable is
            unset or empty.
    """
    resolved_token = token or os.getenv("TELEGRAM_BOT_TOKEN")
    if not resolved_token:
        raise ValueError("Telegram bot token required")

    config = ChannelConfig(token=resolved_token, **kwargs)
    return TelegramAdapter(config)