Coverage for integrations / channels / telegram_adapter.py: 36.9%
241 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Telegram Channel Adapter
4Implements Telegram messaging using python-telegram-bot library.
5Supports both polling and webhook modes.
7Features:
8- Text messages
9- Media (images, videos, documents, voice)
10- Inline keyboards
11- Reply threading
12- Group/DM detection
13- Bot mention detection
14"""
16from __future__ import annotations
18import asyncio
19import logging
20import os
21from typing import Optional, List, Dict, Any
22from datetime import datetime
24try:
25 from telegram import (
26 Update,
27 Bot,
28 InlineKeyboardButton,
29 InlineKeyboardMarkup,
30 InputMediaPhoto,
31 InputMediaVideo,
32 InputMediaDocument,
33 InputMediaAudio,
34 )
35 from telegram.ext import (
36 Application,
37 ApplicationBuilder,
38 CommandHandler,
39 MessageHandler,
40 CallbackQueryHandler,
41 filters,
42 ContextTypes,
43 )
44 from telegram.constants import ChatAction, ParseMode
45 from telegram.error import TelegramError, RetryAfter
46 HAS_TELEGRAM = True
47except ImportError:
48 HAS_TELEGRAM = False
50from .base import (
51 ChannelAdapter,
52 ChannelConfig,
53 ChannelStatus,
54 Message,
55 MessageType,
56 MediaAttachment,
57 SendResult,
58 ChannelConnectionError,
59 ChannelSendError,
60 ChannelRateLimitError,
61)
62from .room_capable import RoomCapableAdapter, UnsupportedRoomError
64logger = logging.getLogger(__name__)
67class TelegramAdapter(ChannelAdapter, RoomCapableAdapter):
68 """
69 Telegram messaging adapter.
71 Usage:
72 config = ChannelConfig(token="BOT_TOKEN")
73 adapter = TelegramAdapter(config)
74 adapter.on_message(my_handler)
75 await adapter.start()
76 """
78 def __init__(self, config: ChannelConfig):
79 if not HAS_TELEGRAM:
80 raise ImportError(
81 "python-telegram-bot not installed. "
82 "Install with: pip install python-telegram-bot"
83 )
85 super().__init__(config)
86 self._app: Optional[Application] = None
87 self._bot: Optional[Bot] = None
88 self._bot_username: Optional[str] = None
90 @property
91 def name(self) -> str:
92 return "telegram"
94 async def connect(self) -> bool:
95 """Connect to Telegram using bot token."""
96 if not self.config.token:
97 logger.error("Telegram bot token not provided")
98 return False
100 try:
101 # Build application
102 self._app = (
103 ApplicationBuilder()
104 .token(self.config.token)
105 .build()
106 )
107 self._bot = self._app.bot
109 # Get bot info
110 bot_info = await self._bot.get_me()
111 self._bot_username = bot_info.username
112 logger.info(f"Connected as @{self._bot_username}")
114 # Register handlers
115 self._register_handlers()
117 # Start polling in background
118 await self._app.initialize()
119 await self._app.start()
120 asyncio.create_task(self._app.updater.start_polling(drop_pending_updates=True))
122 self.status = ChannelStatus.CONNECTED
123 return True
125 except TelegramError as e:
126 logger.error(f"Failed to connect to Telegram: {e}")
127 self.status = ChannelStatus.ERROR
128 return False
130 async def disconnect(self) -> None:
131 """Disconnect from Telegram."""
132 if self._app:
133 try:
134 await self._app.updater.stop()
135 await self._app.stop()
136 await self._app.shutdown()
137 except Exception as e:
138 logger.error(f"Error disconnecting: {e}")
139 finally:
140 self._app = None
141 self._bot = None
142 self.status = ChannelStatus.DISCONNECTED
144 def _register_handlers(self) -> None:
145 """Register message handlers."""
146 # Command handlers
147 self._app.add_handler(CommandHandler("start", self._handle_start))
148 self._app.add_handler(CommandHandler("help", self._handle_help))
149 self._app.add_handler(CommandHandler("status", self._handle_status))
151 # Message handlers
152 self._app.add_handler(MessageHandler(
153 filters.TEXT & ~filters.COMMAND,
154 self._handle_message
155 ))
156 self._app.add_handler(MessageHandler(
157 filters.PHOTO | filters.VIDEO | filters.DOCUMENT | filters.AUDIO | filters.VOICE,
158 self._handle_media
159 ))
161 # Callback query handler (for inline keyboards)
162 self._app.add_handler(CallbackQueryHandler(self._handle_callback))
164 async def _handle_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
165 """Handle /start command."""
166 await update.message.reply_text(
167 "Hello! I'm your AI assistant. Send me a message to get started.\n\n"
168 "Commands:\n"
169 "/start - Start the bot\n"
170 "/help - Show help\n"
171 "/status - Check bot status"
172 )
174 async def _handle_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
175 """Handle /help command."""
176 await update.message.reply_text(
177 "I can help you with various tasks. Just send me a message!\n\n"
178 "I support:\n"
179 "- Text messages\n"
180 "- Images and photos\n"
181 "- Documents\n"
182 "- Voice messages"
183 )
185 async def _handle_status(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
186 """Handle /status command."""
187 await update.message.reply_text(
188 f"Bot Status: {self.status.value}\n"
189 f"Bot Username: @{self._bot_username}"
190 )
192 async def _handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
193 """Handle incoming text messages."""
194 message = self._convert_message(update.message)
195 await self._dispatch_message(message)
197 async def _handle_media(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
198 """Handle incoming media messages."""
199 message = self._convert_message(update.message)
200 await self._dispatch_message(message)
202 async def _handle_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
203 """Handle callback queries from inline keyboards."""
204 query = update.callback_query
205 await query.answer()
207 # Create a message-like event for the callback
208 message = Message(
209 id=str(query.id),
210 channel=self.name,
211 sender_id=str(query.from_user.id),
212 sender_name=query.from_user.full_name,
213 chat_id=str(query.message.chat.id),
214 text=f"[callback:{query.data}]",
215 is_group=query.message.chat.type != "private",
216 raw={"callback_query": query.to_dict()},
217 )
218 await self._dispatch_message(message)
220 def _convert_message(self, tg_message) -> Message:
221 """Convert Telegram message to unified Message format."""
222 chat = tg_message.chat
223 user = tg_message.from_user
225 # Determine if bot is mentioned
226 is_mentioned = False
227 text = tg_message.text or tg_message.caption or ""
229 if self._bot_username:
230 is_mentioned = f"@{self._bot_username}" in text
232 # Check for media
233 media = []
234 if tg_message.photo:
235 # Get largest photo
236 photo = max(tg_message.photo, key=lambda p: p.width * p.height)
237 media.append(MediaAttachment(
238 type=MessageType.IMAGE,
239 file_id=photo.file_id,
240 caption=tg_message.caption,
241 ))
242 if tg_message.video:
243 media.append(MediaAttachment(
244 type=MessageType.VIDEO,
245 file_id=tg_message.video.file_id,
246 file_name=tg_message.video.file_name,
247 mime_type=tg_message.video.mime_type,
248 caption=tg_message.caption,
249 ))
250 if tg_message.document:
251 media.append(MediaAttachment(
252 type=MessageType.DOCUMENT,
253 file_id=tg_message.document.file_id,
254 file_name=tg_message.document.file_name,
255 mime_type=tg_message.document.mime_type,
256 ))
257 if tg_message.audio:
258 media.append(MediaAttachment(
259 type=MessageType.AUDIO,
260 file_id=tg_message.audio.file_id,
261 file_name=tg_message.audio.file_name,
262 mime_type=tg_message.audio.mime_type,
263 ))
264 if tg_message.voice:
265 media.append(MediaAttachment(
266 type=MessageType.VOICE,
267 file_id=tg_message.voice.file_id,
268 mime_type=tg_message.voice.mime_type,
269 ))
271 return Message(
272 id=str(tg_message.message_id),
273 channel=self.name,
274 sender_id=str(user.id),
275 sender_name=user.full_name,
276 chat_id=str(chat.id),
277 text=text,
278 media=media,
279 reply_to_id=str(tg_message.reply_to_message.message_id) if tg_message.reply_to_message else None,
280 timestamp=tg_message.date,
281 is_group=chat.type in ("group", "supergroup"),
282 is_bot_mentioned=is_mentioned,
283 raw=tg_message.to_dict(),
284 )
286 async def send_message(
287 self,
288 chat_id: str,
289 text: str,
290 reply_to: Optional[str] = None,
291 media: Optional[List[MediaAttachment]] = None,
292 buttons: Optional[List[Dict]] = None,
293 ) -> SendResult:
294 """Send a message to a Telegram chat."""
295 if not self._bot:
296 return SendResult(success=False, error="Not connected")
298 try:
299 # Build keyboard if buttons provided
300 keyboard = None
301 if buttons:
302 keyboard = self._build_keyboard(buttons)
304 # Handle media
305 if media and len(media) > 0:
306 return await self._send_media(chat_id, text, media, reply_to, keyboard)
308 # Send text message
309 reply_to_id = int(reply_to) if reply_to else None
310 msg = await self._bot.send_message(
311 chat_id=int(chat_id),
312 text=text,
313 reply_to_message_id=reply_to_id,
314 reply_markup=keyboard,
315 parse_mode=ParseMode.MARKDOWN,
316 )
318 return SendResult(
319 success=True,
320 message_id=str(msg.message_id),
321 raw=msg.to_dict(),
322 )
324 except RetryAfter as e:
325 raise ChannelRateLimitError(retry_after=e.retry_after)
326 except TelegramError as e:
327 logger.error(f"Failed to send message: {e}")
328 return SendResult(success=False, error=str(e))
330 async def _send_media(
331 self,
332 chat_id: str,
333 caption: str,
334 media: List[MediaAttachment],
335 reply_to: Optional[str],
336 keyboard: Optional[InlineKeyboardMarkup],
337 ) -> SendResult:
338 """Send media message."""
339 reply_to_id = int(reply_to) if reply_to else None
340 first_media = media[0]
342 try:
343 if first_media.type == MessageType.IMAGE:
344 msg = await self._bot.send_photo(
345 chat_id=int(chat_id),
346 photo=first_media.file_id or first_media.file_path or first_media.url,
347 caption=caption,
348 reply_to_message_id=reply_to_id,
349 reply_markup=keyboard,
350 )
351 elif first_media.type == MessageType.VIDEO:
352 msg = await self._bot.send_video(
353 chat_id=int(chat_id),
354 video=first_media.file_id or first_media.file_path or first_media.url,
355 caption=caption,
356 reply_to_message_id=reply_to_id,
357 reply_markup=keyboard,
358 )
359 elif first_media.type == MessageType.DOCUMENT:
360 msg = await self._bot.send_document(
361 chat_id=int(chat_id),
362 document=first_media.file_id or first_media.file_path or first_media.url,
363 caption=caption,
364 reply_to_message_id=reply_to_id,
365 reply_markup=keyboard,
366 )
367 elif first_media.type == MessageType.AUDIO:
368 msg = await self._bot.send_audio(
369 chat_id=int(chat_id),
370 audio=first_media.file_id or first_media.file_path or first_media.url,
371 caption=caption,
372 reply_to_message_id=reply_to_id,
373 reply_markup=keyboard,
374 )
375 elif first_media.type == MessageType.VOICE:
376 msg = await self._bot.send_voice(
377 chat_id=int(chat_id),
378 voice=first_media.file_id or first_media.file_path or first_media.url,
379 caption=caption,
380 reply_to_message_id=reply_to_id,
381 reply_markup=keyboard,
382 )
383 else:
384 return SendResult(success=False, error=f"Unsupported media type: {first_media.type}")
386 return SendResult(
387 success=True,
388 message_id=str(msg.message_id),
389 raw=msg.to_dict(),
390 )
392 except TelegramError as e:
393 logger.error(f"Failed to send media: {e}")
394 return SendResult(success=False, error=str(e))
396 def _build_keyboard(self, buttons: List[Dict]) -> InlineKeyboardMarkup:
397 """Build inline keyboard from button definitions."""
398 keyboard = []
399 row = []
401 for btn in buttons:
402 if btn.get("url"):
403 row.append(InlineKeyboardButton(
404 text=btn["text"],
405 url=btn["url"],
406 ))
407 else:
408 row.append(InlineKeyboardButton(
409 text=btn["text"],
410 callback_data=btn.get("callback_data", btn["text"]),
411 ))
413 # New row after each button or when row_break is set
414 if btn.get("row_break", False) or len(row) >= 3:
415 keyboard.append(row)
416 row = []
418 if row:
419 keyboard.append(row)
421 return InlineKeyboardMarkup(keyboard)
423 async def edit_message(
424 self,
425 chat_id: str,
426 message_id: str,
427 text: str,
428 buttons: Optional[List[Dict]] = None,
429 ) -> SendResult:
430 """Edit an existing message."""
431 if not self._bot:
432 return SendResult(success=False, error="Not connected")
434 try:
435 keyboard = self._build_keyboard(buttons) if buttons else None
437 msg = await self._bot.edit_message_text(
438 chat_id=int(chat_id),
439 message_id=int(message_id),
440 text=text,
441 reply_markup=keyboard,
442 parse_mode=ParseMode.MARKDOWN,
443 )
445 return SendResult(
446 success=True,
447 message_id=str(msg.message_id),
448 raw=msg.to_dict() if hasattr(msg, 'to_dict') else None,
449 )
451 except TelegramError as e:
452 logger.error(f"Failed to edit message: {e}")
453 return SendResult(success=False, error=str(e))
455 async def delete_message(self, chat_id: str, message_id: str) -> bool:
456 """Delete a message."""
457 if not self._bot:
458 return False
460 try:
461 await self._bot.delete_message(
462 chat_id=int(chat_id),
463 message_id=int(message_id),
464 )
465 return True
466 except TelegramError as e:
467 logger.error(f"Failed to delete message: {e}")
468 return False
470 async def send_typing(self, chat_id: str) -> None:
471 """Send typing indicator."""
472 if self._bot:
473 try:
474 await self._bot.send_chat_action(
475 chat_id=int(chat_id),
476 action=ChatAction.TYPING,
477 )
478 except TelegramError:
479 pass
481 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
482 """Get information about a chat."""
483 if not self._bot:
484 return None
486 try:
487 chat = await self._bot.get_chat(int(chat_id))
488 return {
489 "id": chat.id,
490 "type": chat.type,
491 "title": chat.title,
492 "username": chat.username,
493 "first_name": chat.first_name,
494 "last_name": chat.last_name,
495 }
496 except TelegramError as e:
497 logger.error(f"Failed to get chat info: {e}")
498 return None
500 async def download_file(self, file_id: str, destination: str) -> bool:
501 """Download a file from Telegram."""
502 if not self._bot:
503 return False
505 try:
506 file = await self._bot.get_file(file_id)
507 await file.download_to_drive(destination)
508 return True
509 except TelegramError as e:
510 logger.error(f"Failed to download file: {e}")
511 return False
513 # ─── UNIF-G2: RoomCapableAdapter implementation ──────────────────
515 async def join_room(self, room_id: str,
516 role: str = 'participant') -> bool:
517 """Validate the bot's presence in a Telegram chat.
519 Telegram bots cannot self-join chats — bots are added by an
520 admin or via invite link. ``join_room`` here probes whether
521 the bot is ALREADY a member of the chat: ``get_chat`` succeeds
522 ⇒ in the chat ⇒ True; raises ``Forbidden`` / ``BadRequest``
523 ⇒ not yet added ⇒ False (caller can prompt the user to add the
524 bot first).
526 Private chats (DMs) → raise ``UnsupportedRoomError`` per Mixin
527 contract.
528 """
529 if not self._bot or not room_id:
530 return False
531 try:
532 chat = await self._bot.get_chat(room_id)
533 except TelegramError as e:
534 logger.info(
535 "Telegram.join_room: bot not in chat %s (%s)",
536 room_id, e)
537 return False
538 except Exception as e:
539 logger.error(
540 "Telegram.join_room: unexpected error for %s: %s",
541 room_id, e)
542 return False
543 # Reject DMs.
544 chat_type = getattr(chat, 'type', None)
545 if str(chat_type).lower() == 'private':
546 raise UnsupportedRoomError(
547 "Telegram private chats are not rooms — use "
548 "send_message for 1:1.")
549 logger.info(
550 "Telegram.join_room: chat %s present (type=%s, role=%s)",
551 room_id, chat_type, role)
552 return True
554 async def leave_room(self, room_id: str) -> bool:
555 """Leave a Telegram group / supergroup / channel.
557 Telegram bots can call ``leave_chat`` for groups, supergroups,
558 and channels. Private chats can't be "left" by the bot.
559 Idempotent on already-absent (returns True so the caller can
560 treat it as a no-op success).
561 """
562 if not self._bot or not room_id:
563 return False
564 try:
565 await self._bot.leave_chat(room_id)
566 logger.info("Telegram.leave_room: chat %s left", room_id)
567 return True
568 except TelegramError as e:
569 # Already absent / chat deleted / forbidden — treat as
570 # idempotent success so detach flows aren't blocked by a
571 # mismatch between bookkeeping and live state.
572 logger.info(
573 "Telegram.leave_room: %s already absent (%s)",
574 room_id, e)
575 return True
576 except Exception as e:
577 logger.error(
578 "Telegram.leave_room: unexpected error for %s: %s",
579 room_id, e)
580 return False
582 async def list_room_members(
583 self, room_id: str,
584 ) -> List[Dict[str, Any]]:
585 """List Telegram chat members.
587 Telegram restricts the bot API: regular bots cannot list ALL
588 members of a group / supergroup. We return administrators +
589 creator from ``get_chat_administrators`` — that's the
590 observable subset the bot is allowed to enumerate. Channels
591 and small groups behave the same way.
592 """
593 if not self._bot or not room_id:
594 return []
595 try:
596 admins = await self._bot.get_chat_administrators(room_id)
597 except TelegramError as e:
598 logger.info(
599 "Telegram.list_room_members: cannot list %s (%s)",
600 room_id, e)
601 return []
602 except Exception as e:
603 logger.error(
604 "Telegram.list_room_members: unexpected for %s: %s",
605 room_id, e)
606 return []
607 result: List[Dict[str, Any]] = []
608 for member in admins or []:
609 user = getattr(member, 'user', None)
610 if user is None:
611 continue
612 uid = getattr(user, 'id', None)
613 if uid is None:
614 continue
615 # Skip bot itself.
616 if (self._bot_user_id is not None and
617 str(uid) == str(self._bot_user_id)):
618 continue
619 full_name = (getattr(user, 'full_name', None)
620 or getattr(user, 'username', None) or str(uid))
621 result.append({
622 'id': str(uid),
623 'display_name': full_name,
624 'is_bot': bool(getattr(user, 'is_bot', False)),
625 })
626 return result
629def create_telegram_adapter(token: str = None, **kwargs) -> TelegramAdapter:
630 """
631 Factory function to create Telegram adapter.
633 Args:
634 token: Bot token (or set TELEGRAM_BOT_TOKEN env var)
635 **kwargs: Additional config options
637 Returns:
638 Configured TelegramAdapter
639 """
640 token = token or os.getenv("TELEGRAM_BOT_TOKEN")
641 if not token:
642 raise ValueError("Telegram bot token required")
644 config = ChannelConfig(token=token, **kwargs)
645 return TelegramAdapter(config)