Coverage for integrations / channels / extensions / zalo_user_adapter.py: 33.1%
166 statements
« prev | ^ index | » next — coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Zalo User Account Adapter
4Implements Zalo personal account integration (not Official Account).
5Based on HevolveBot extension patterns.
7Features:
8- Personal account messaging
9- Group chats
10- File sharing
11- Reactions
12- Docker-compatible
13"""
15from __future__ import annotations
17import asyncio
18import logging
19import os
20import json
21try:
22 import aiohttp
23 HAS_AIOHTTP = True
24except ImportError:
25 HAS_AIOHTTP = False
26import hashlib
27import time
28from typing import Optional, List, Dict, Any, Callable
29from datetime import datetime
30from dataclasses import dataclass, field
32from ..base import (
33 ChannelAdapter,
34 ChannelConfig,
35 ChannelStatus,
36 Message,
37 MessageType,
38 SendResult,
39 ChannelConnectionError,
40 ChannelSendError,
41)
43logger = logging.getLogger(__name__)
@dataclass
class ZaloUserConfig(ChannelConfig):
    """Zalo user account configuration.

    Extends ``ChannelConfig`` with the credentials and endpoint settings read
    by the Zalo personal-account adapter.

    ``password`` and ``session_cookies`` are excluded from the generated
    ``repr`` so that logging or printing a config object does not expose
    secrets. They remain ordinary init parameters and attributes.
    """

    phone_number: str = ""
    password: str = field(default="", repr=False)  # secret: hidden from repr
    imei: str = ""  # Device IMEI for authentication
    # Serialized session; used as the access token when non-empty.
    session_cookies: str = field(default="", repr=False)  # secret: hidden from repr
    api_base: str = "https://zalo.me/api"
    poll_interval: float = 2.0  # seconds to sleep between polls

    @classmethod
    def from_env(cls) -> "ZaloUserConfig":
        """Create config from environment variables.

        Reads ``ZALO_PHONE``, ``ZALO_PASSWORD``, ``ZALO_IMEI`` and
        ``ZALO_SESSION``; each falls back to an empty string when unset.
        ``api_base`` and ``poll_interval`` keep their class defaults.

        Returns:
            A new ``ZaloUserConfig`` populated from the environment.
        """
        return cls(
            phone_number=os.getenv("ZALO_PHONE", ""),
            password=os.getenv("ZALO_PASSWORD", ""),
            imei=os.getenv("ZALO_IMEI", ""),
            session_cookies=os.getenv("ZALO_SESSION", ""),
        )
class ZaloUserAdapter(ChannelAdapter):
    """Zalo personal account adapter.

    Authenticates against the HTTP API at ``config.api_base``, polls the
    conversation list for new messages, and passes each new incoming message
    to the handlers registered through :meth:`on_message`.

    NOTE(review): only the ``last_message`` of each conversation is inspected
    per poll, so several messages arriving in one conversation between two
    polls yield a single dispatch. ``_last_msg_id`` also starts empty, so the
    first poll after connecting dispatches the latest message of every
    conversation -- confirm both are intended.
    """

    channel_type = "zalo_user"

    # Seconds to wait before polling again after an unexpected poll error.
    _ERROR_BACKOFF_SECONDS = 5

    @property
    def name(self) -> str:
        """Get adapter name."""
        return self.channel_type

    def __init__(self, config: ZaloUserConfig):
        """Store the configuration and initialise the disconnected state.

        Args:
            config: Credentials and endpoint settings for this adapter.
        """
        super().__init__(config)
        self.config: ZaloUserConfig = config
        self._session: Optional[aiohttp.ClientSession] = None
        self._connected = False
        self._poll_task: Optional[asyncio.Task] = None
        self._message_handlers: List[Callable] = []
        self._user_id: Optional[str] = None
        # Last seen message id per conversation id, used to detect new messages.
        self._last_msg_id: Dict[str, str] = {}
        self._access_token: Optional[str] = None
        # Strong references to in-flight handler tasks. The event loop only
        # keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._handler_tasks: set = set()

    def _auth_headers(self) -> Dict[str, str]:
        """Build the bearer-token header sent with every API request."""
        return {"Authorization": f"Bearer {self._access_token}"}

    async def _close_session(self) -> None:
        """Close and drop the HTTP session, if one is open."""
        session, self._session = self._session, None
        if session is not None:
            await session.close()

    async def connect(self) -> bool:
        """Connect to Zalo.

        Opens an HTTP session, restores the token from
        ``config.session_cookies`` or logs in with credentials, verifies the
        session, and starts the background poll task.

        Returns:
            ``True`` once the adapter is connected.

        Raises:
            ChannelConnectionError: If any step fails. The HTTP session
                opened for this attempt is closed before raising, and the
                original exception is attached as ``__cause__``.
        """
        try:
            if not HAS_AIOHTTP:
                raise ChannelConnectionError("aiohttp is not installed")

            self._session = aiohttp.ClientSession()

            # Authenticate
            if self.config.session_cookies:
                # Restore session
                self._access_token = self.config.session_cookies
            else:
                # Login with credentials
                await self._login()

            # Verify session
            if not await self._verify_session():
                raise ChannelConnectionError("Failed to verify Zalo session")

            # Mark connected before scheduling the poll loop, whose
            # ``while`` condition reads this flag.
            self._connected = True
            self._poll_task = asyncio.create_task(self._poll_loop())

            self._status = ChannelStatus.CONNECTED
            logger.info(f"Connected to Zalo as {self._user_id}")
            return True

        except Exception as e:
            logger.error(f"Failed to connect to Zalo: {e}")
            self._status = ChannelStatus.ERROR
            # Do not leak the session opened above when connecting fails.
            await self._close_session()
            raise ChannelConnectionError(str(e)) from e

    async def _login(self) -> None:
        """Login to Zalo with credentials.

        Posts phone, password, IMEI and a device signature to
        ``{api_base}/login`` and stores the returned access token and user id.

        Raises:
            ChannelConnectionError: On a non-200 response or a non-zero
                ``error_code`` in the response body.
        """
        # Generate device signature.
        # NOTE(review): MD5 is used as a fingerprint of IMEI + phone number;
        # presumably the format is dictated by the remote API -- confirm.
        device_sig = hashlib.md5(
            f"{self.config.imei}{self.config.phone_number}".encode()
        ).hexdigest()

        login_url = f"{self.config.api_base}/login"

        payload = {
            "phone": self.config.phone_number,
            "password": self.config.password,
            "imei": self.config.imei,
            "device_sig": device_sig,
        }

        async with self._session.post(login_url, json=payload) as resp:
            if resp.status != 200:
                raise ChannelConnectionError("Login failed")

            data = await resp.json()
            # A missing error_code is treated as failure (default -1).
            if data.get("error_code", -1) != 0:
                raise ChannelConnectionError(data.get("error_message", "Login failed"))

            # ``or {}`` tolerates an explicit JSON null under "data".
            account = data.get("data") or {}
            self._access_token = account.get("access_token")
            self._user_id = account.get("user_id")

    async def _verify_session(self) -> bool:
        """Verify current session is valid.

        Requests ``{api_base}/profile`` with the current token. On success the
        user id from the profile is stored; an id already obtained at login
        is kept when the profile response does not carry one.

        Returns:
            ``True`` on an HTTP 200 response, ``False`` when there is no
            token, on any other status, or when the request fails.
        """
        if not self._access_token:
            return False

        url = f"{self.config.api_base}/profile"

        try:
            async with self._session.get(url, headers=self._auth_headers()) as resp:
                if resp.status != 200:
                    return False
                data = await resp.json()
                profile = data.get("data") or {}
                # The own-message filter in _fetch_messages relies on
                # _user_id, so never replace a known id with None.
                self._user_id = profile.get("user_id") or self._user_id
                return True
        except Exception as e:
            # Still best-effort, but leave a trace instead of failing silently.
            logger.warning(f"Zalo session verification failed: {e}")
            return False

    async def disconnect(self) -> None:
        """Disconnect from Zalo.

        Stops the poll task, closes the HTTP session and marks the channel
        disconnected. Handler tasks that are already running are not
        cancelled.
        """
        self._connected = False

        if self._poll_task:
            self._poll_task.cancel()
            try:
                await self._poll_task
            except asyncio.CancelledError:
                pass
            self._poll_task = None

        await self._close_session()

        self._status = ChannelStatus.DISCONNECTED
        logger.info("Disconnected from Zalo")

    async def _poll_loop(self) -> None:
        """Poll for new messages until disconnected or cancelled."""
        while self._connected:
            try:
                await self._fetch_messages()
                await asyncio.sleep(self.config.poll_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Poll error: {e}")
                await asyncio.sleep(self._ERROR_BACKOFF_SECONDS)

    async def _fetch_messages(self) -> None:
        """Fetch new messages from conversations.

        Reads ``{api_base}/conversations`` and, for every conversation whose
        ``last_message`` id differs from the one last seen, dispatches that
        message to the registered handlers. Messages sent by this account are
        recorded as seen but not dispatched. Errors are logged, not raised.
        """
        url = f"{self.config.api_base}/conversations"

        try:
            async with self._session.get(url, headers=self._auth_headers()) as resp:
                if resp.status != 200:
                    return
                data = await resp.json()

            # ``or`` fallbacks tolerate explicit JSON nulls in the payload.
            conversations = (data.get("data") or {}).get("conversations") or []
            for conv in conversations:
                conv_id = conv.get("conversation_id", "")
                last_msg = conv.get("last_message") or {}
                msg_id = last_msg.get("message_id", "")

                # Check if new message
                if not msg_id or msg_id == self._last_msg_id.get(conv_id):
                    continue
                self._last_msg_id[conv_id] = msg_id

                # Skip own messages
                if last_msg.get("sender_id") == self._user_id:
                    continue

                message = self._parse_message(last_msg, conv_id)
                if message:
                    self._dispatch(message)

        except Exception as e:
            logger.error(f"Error fetching messages: {e}")

    def _dispatch(self, message: Message) -> None:
        """Pass ``message`` to every registered handler.

        Coroutine results are scheduled as tasks and tracked until done;
        plain callables have already run by the time they return. A failing
        handler is logged and does not prevent the others from running.
        """
        for handler in self._message_handlers:
            try:
                result = handler(message)
                if asyncio.iscoroutine(result):
                    task = asyncio.create_task(result)
                    self._handler_tasks.add(task)
                    task.add_done_callback(self._on_handler_done)
            except Exception as e:
                logger.error(f"Message handler failed: {e}")

    def _on_handler_done(self, task: asyncio.Task) -> None:
        """Forget a finished handler task and log its exception, if any."""
        self._handler_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error(f"Message handler failed: {exc}")

    def _parse_message(self, data: Dict[str, Any], conv_id: str) -> Optional[Message]:
        """Parse Zalo message to unified Message.

        Args:
            data: Raw message dict as found under ``last_message``.
            conv_id: Id of the conversation the message belongs to.

        Returns:
            The converted ``Message``, or ``None`` if conversion fails.
        """
        try:
            raw_type = data.get("msg_type")
            if raw_type == "image":
                msg_type = MessageType.IMAGE
            elif raw_type == "video":
                msg_type = MessageType.VIDEO
            elif raw_type == "file":
                msg_type = MessageType.FILE
            else:
                msg_type = MessageType.TEXT

            content = data.get("content", "")

            # "timestamp" is divided by 1000, i.e. handled as milliseconds.
            # When it is absent use the current time directly rather than
            # dividing a seconds value by 1000, which yields a 1970 date.
            raw_ts = data.get("timestamp")
            if raw_ts is None:
                timestamp = datetime.now()
            else:
                timestamp = datetime.fromtimestamp(raw_ts / 1000)

            return Message(
                id=data.get("message_id", ""),
                channel=self.channel_type,
                chat_id=conv_id,
                sender_id=data.get("sender_id", ""),
                sender_name=data.get("sender_name", ""),
                # Non-string content is serialised so ``text`` is always a str.
                text=content if isinstance(content, str) else json.dumps(content),
                timestamp=timestamp,
                message_type=msg_type,
            )
        except Exception as e:
            logger.error(f"Error parsing message: {e}")
            return None

    def on_message(self, handler: Callable) -> None:
        """Register message handler.

        Args:
            handler: Callable invoked with each new incoming ``Message``;
                may be a coroutine function or a plain function.
        """
        self._message_handlers.append(handler)

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        **kwargs
    ) -> SendResult:
        """Send a message.

        Args:
            chat_id: Conversation id to post to.
            text: Message text.
            reply_to: Optional id of the message being replied to.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            A successful ``SendResult`` carrying the new message id.

        Raises:
            ChannelSendError: On a status other than 200/201 or any request
                failure; the original exception is attached as ``__cause__``.
        """
        url = f"{self.config.api_base}/messages"

        payload = {
            "conversation_id": chat_id,
            "content": text,
            "msg_type": "text",
        }

        if reply_to:
            payload["reply_to"] = reply_to

        try:
            async with self._session.post(
                url, json=payload, headers=self._auth_headers()
            ) as resp:
                if resp.status not in (200, 201):
                    raise ChannelSendError("Failed to send message")

                data = await resp.json()
                return SendResult(
                    success=True,
                    message_id=(data.get("data") or {}).get("message_id", ""),
                    timestamp=datetime.now()
                )

        except Exception as e:
            logger.error(f"Failed to send message: {e}")
            raise ChannelSendError(str(e)) from e

    async def edit_message(self, chat_id: str, message_id: str, text: str, **kwargs) -> bool:
        """Zalo doesn't support message editing."""
        return False

    async def delete_message(self, chat_id: str, message_id: str, **kwargs) -> bool:
        """Delete/recall a message.

        Returns:
            ``True`` if the recall request returned HTTP 200, else ``False``
            (including when the request itself fails).
        """
        url = f"{self.config.api_base}/messages/{message_id}/recall"

        try:
            async with self._session.post(url, headers=self._auth_headers()) as resp:
                return resp.status == 200
        except Exception as e:
            logger.error(f"Failed to delete message: {e}")
            return False

    async def send_typing(self, chat_id: str, **kwargs) -> None:
        """Send typing indicator."""
        # Zalo may not have public typing API
        pass

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Get conversation information.

        Returns:
            A dict with ``id``, ``name``, ``type`` and ``member_count``, or
            ``None`` on a non-200 response or request failure.
        """
        url = f"{self.config.api_base}/conversations/{chat_id}"

        try:
            async with self._session.get(url, headers=self._auth_headers()) as resp:
                if resp.status != 200:
                    return None

                data = await resp.json()
                conv = data.get("data") or {}
                return {
                    "id": conv.get("conversation_id"),
                    "name": conv.get("name"),
                    "type": conv.get("type"),
                    "member_count": conv.get("member_count", 0),
                }
        except Exception as e:
            logger.error(f"Failed to get chat info: {e}")
            return None

    async def add_reaction(self, chat_id: str, message_id: str, emoji: str) -> bool:
        """Add reaction to a message.

        Returns:
            ``True`` if the request returned HTTP 200, else ``False``
            (including when the request itself fails).
        """
        url = f"{self.config.api_base}/messages/{message_id}/reactions"

        try:
            async with self._session.post(
                url,
                json={"reaction": emoji},
                headers=self._auth_headers()
            ) as resp:
                return resp.status == 200
        except Exception as e:
            logger.error(f"Failed to add reaction: {e}")
            return False

    def get_session_token(self) -> str:
        """Get session token for persistence.

        Returns:
            The current access token, or an empty string if there is none.
        """
        return self._access_token or ""
def create_zalo_user_adapter(
    phone_number: Optional[str] = None,
    password: Optional[str] = None,
    session_cookies: Optional[str] = None,
    **kwargs
) -> ZaloUserAdapter:
    """Factory function to create a Zalo user adapter.

    Each credential falls back to its environment variable when the argument
    is missing or empty: ``ZALO_PHONE``, ``ZALO_PASSWORD``, ``ZALO_SESSION``
    and ``ZALO_IMEI``.

    Args:
        phone_number: Account phone number.
        password: Account password.
        session_cookies: Serialized session to restore instead of logging in.
        **kwargs: Further ``ZaloUserConfig`` fields (for example ``imei``,
            ``api_base``, ``poll_interval``), passed through unchanged.

    Returns:
        A ``ZaloUserAdapter`` built from the resulting configuration.
    """
    # ZaloUserConfig.from_env() reads ZALO_IMEI; apply the same fallback here
    # so both construction paths produce the same login payload.
    imei = kwargs.pop("imei", None) or os.getenv("ZALO_IMEI", "")
    config = ZaloUserConfig(
        phone_number=phone_number or os.getenv("ZALO_PHONE", ""),
        password=password or os.getenv("ZALO_PASSWORD", ""),
        imei=imei,
        session_cookies=session_cookies or os.getenv("ZALO_SESSION", ""),
        **kwargs
    )
    return ZaloUserAdapter(config)