Coverage for integrations / channels / base.py: 83.3%

132 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Base Channel Adapter Interface 

3 

4Defines the contract for all messaging channel adapters. 

5Ported from HevolveBot's ChannelMessagingAdapter pattern. 

6""" 

7 

8from abc import ABC, abstractmethod 

9from dataclasses import dataclass, field 

10from datetime import datetime 

11from enum import Enum 

12from typing import Callable, Optional, List, Dict, Any, Union 

13import asyncio 

14import logging 

15 

16logger = logging.getLogger(__name__) 

17 

18 

19class MessageType(Enum): 

20 """Type of message content.""" 

21 TEXT = "text" 

22 IMAGE = "image" 

23 VIDEO = "video" 

24 AUDIO = "audio" 

25 DOCUMENT = "document" 

26 LOCATION = "location" 

27 CONTACT = "contact" 

28 STICKER = "sticker" 

29 VOICE = "voice" 

30 

31 

32class ChannelStatus(Enum): 

33 """Channel connection status.""" 

34 DISCONNECTED = "disconnected" 

35 CONNECTING = "connecting" 

36 CONNECTED = "connected" 

37 ERROR = "error" 

38 RATE_LIMITED = "rate_limited" 

39 

40 

41@dataclass 

42class MediaAttachment: 

43 """Media attachment in a message.""" 

44 type: MessageType 

45 url: Optional[str] = None 

46 file_path: Optional[str] = None 

47 file_id: Optional[str] = None # Platform-specific file ID 

48 mime_type: Optional[str] = None 

49 file_name: Optional[str] = None 

50 file_size: Optional[int] = None 

51 caption: Optional[str] = None 

52 

53 

54@dataclass 

55class Message: 

56 """Unified message format across all channels.""" 

57 id: str 

58 channel: str # telegram, discord, slack, etc. 

59 sender_id: str 

60 sender_name: Optional[str] = None 

61 chat_id: str = "" # Group/channel ID or same as sender for DMs 

62 text: Optional[str] = None 

63 media: List[MediaAttachment] = field(default_factory=list) 

64 reply_to_id: Optional[str] = None 

65 timestamp: datetime = field(default_factory=datetime.now) 

66 is_group: bool = False 

67 is_bot_mentioned: bool = False 

68 raw: Optional[Dict[str, Any]] = None # Original platform message 

69 

70 @property 

71 def has_media(self) -> bool: 

72 return len(self.media) > 0 

73 

74 @property 

75 def content(self) -> str: 

76 """Get text content or media caption.""" 

77 if self.text: 

78 return self.text 

79 for m in self.media: 

80 if m.caption: 

81 return m.caption 

82 return "" 

83 

84 

85@dataclass 

86class SendResult: 

87 """Result of sending a message.""" 

88 success: bool 

89 message_id: Optional[str] = None 

90 error: Optional[str] = None 

91 raw: Optional[Dict[str, Any]] = None 

92 

93 

94@dataclass 

95class ChannelConfig: 

96 """Configuration for a channel adapter.""" 

97 enabled: bool = True 

98 token: Optional[str] = None 

99 webhook_url: Optional[str] = None 

100 dm_policy: str = "pairing" # pairing, open, closed 

101 allow_from: List[str] = field(default_factory=list) 

102 require_mention_in_groups: bool = True 

103 extra: Dict[str, Any] = field(default_factory=dict) 

104 

105 

106class ChannelAdapter(ABC): 

107 """ 

108 Base class for all channel adapters. 

109 

110 Implements the adapter pattern for unified messaging across platforms. 

111 Each platform (Telegram, Discord, etc.) extends this class. 

112 """ 

113 

114 def __init__(self, config: ChannelConfig): 

115 self.config = config 

116 self.status = ChannelStatus.DISCONNECTED 

117 self._message_handlers: List[Callable] = [] 

118 self._running = False 

119 self._task: Optional[asyncio.Task] = None 

120 

121 @property 

122 @abstractmethod 

123 def name(self) -> str: 

124 """Channel name identifier.""" 

125 pass 

126 

127 @abstractmethod 

128 async def connect(self) -> bool: 

129 """ 

130 Connect to the messaging platform. 

131 Returns True if connection successful. 

132 """ 

133 pass 

134 

135 @abstractmethod 

136 async def disconnect(self) -> None: 

137 """Disconnect from the platform.""" 

138 pass 

139 

140 @abstractmethod 

141 async def send_message( 

142 self, 

143 chat_id: str, 

144 text: str, 

145 reply_to: Optional[str] = None, 

146 media: Optional[List[MediaAttachment]] = None, 

147 buttons: Optional[List[Dict]] = None, 

148 ) -> SendResult: 

149 """ 

150 Send a message to a chat. 

151 

152 Args: 

153 chat_id: Target chat/user ID 

154 text: Message text 

155 reply_to: Message ID to reply to 

156 media: Media attachments 

157 buttons: Interactive buttons/keyboard 

158 

159 Returns: 

160 SendResult with success status and message ID 

161 """ 

162 pass 

163 

164 @abstractmethod 

165 async def edit_message( 

166 self, 

167 chat_id: str, 

168 message_id: str, 

169 text: str, 

170 buttons: Optional[List[Dict]] = None, 

171 ) -> SendResult: 

172 """Edit an existing message.""" 

173 pass 

174 

175 @abstractmethod 

176 async def delete_message( 

177 self, 

178 chat_id: str, 

179 message_id: str, 

180 ) -> bool: 

181 """Delete a message.""" 

182 pass 

183 

184 @abstractmethod 

185 async def send_typing(self, chat_id: str) -> None: 

186 """Send typing indicator.""" 

187 pass 

188 

189 @abstractmethod 

190 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]: 

191 """Get information about a chat.""" 

192 pass 

193 

194 def on_message(self, handler: Callable[[Message], Any]) -> None: 

195 """ 

196 Register a message handler. 

197 

198 Handler will be called for each incoming message. 

199 """ 

200 self._message_handlers.append(handler) 

201 

202 async def _dispatch_message(self, message: Message) -> None: 

203 """Dispatch message to all registered handlers.""" 

204 for handler in self._message_handlers: 

205 try: 

206 result = handler(message) 

207 if asyncio.iscoroutine(result): 

208 await result 

209 except Exception as e: 

210 logger.error(f"Error in message handler: {e}") 

211 

212 def get_status(self) -> ChannelStatus: 

213 """Get current connection status.""" 

214 return self.status 

215 

216 async def start(self) -> None: 

217 """Start the channel adapter (begin receiving messages).""" 

218 if self._running: 

219 return 

220 

221 self._running = True 

222 connected = await self.connect() 

223 

224 if connected: 

225 self.status = ChannelStatus.CONNECTED 

226 logger.info(f"{self.name} channel connected") 

227 else: 

228 self.status = ChannelStatus.ERROR 

229 logger.error(f"{self.name} channel failed to connect") 

230 

231 async def stop(self) -> None: 

232 """Stop the channel adapter.""" 

233 self._running = False 

234 await self.disconnect() 

235 self.status = ChannelStatus.DISCONNECTED 

236 logger.info(f"{self.name} channel disconnected") 

237 

238 def is_running(self) -> bool: 

239 """Check if adapter is running.""" 

240 return self._running and self.status == ChannelStatus.CONNECTED 

241 

242 

243class ChannelError(Exception): 

244 """Base exception for channel errors.""" 

245 pass 

246 

247 

248class ChannelConnectionError(ChannelError): 

249 """Error connecting to channel.""" 

250 pass 

251 

252 

253class ChannelSendError(ChannelError): 

254 """Error sending message.""" 

255 pass 

256 

257 

258class ChannelRateLimitError(ChannelError): 

259 """Rate limit exceeded.""" 

260 def __init__(self, retry_after: Optional[int] = None): 

261 self.retry_after = retry_after 

262 super().__init__(f"Rate limited. Retry after {retry_after}s" if retry_after else "Rate limited")