Coverage for integrations / channels / extensions / zalo_user_adapter.py: 33.1%

166 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Zalo User Account Adapter 

3 

4Implements Zalo personal account integration (not Official Account). 

5Based on HevolveBot extension patterns. 

6 

7Features: 

8- Personal account messaging 

9- Group chats 

10- File sharing 

11- Reactions 

12- Docker-compatible 

13""" 

14 

15from __future__ import annotations 

16 

17import asyncio 

18import logging 

19import os 

20import json 

21try: 

22 import aiohttp 

23 HAS_AIOHTTP = True 

24except ImportError: 

25 HAS_AIOHTTP = False 

26import hashlib 

27import time 

28from typing import Optional, List, Dict, Any, Callable 

29from datetime import datetime 

30from dataclasses import dataclass, field 

31 

32from ..base import ( 

33 ChannelAdapter, 

34 ChannelConfig, 

35 ChannelStatus, 

36 Message, 

37 MessageType, 

38 SendResult, 

39 ChannelConnectionError, 

40 ChannelSendError, 

41) 

42 

43logger = logging.getLogger(__name__) 

44 

45 

46@dataclass 

47class ZaloUserConfig(ChannelConfig): 

48 """Zalo user account configuration.""" 

49 phone_number: str = "" 

50 password: str = "" 

51 imei: str = "" # Device IMEI for authentication 

52 session_cookies: str = "" # Serialized session 

53 api_base: str = "https://zalo.me/api" 

54 poll_interval: float = 2.0 

55 

56 @classmethod 

57 def from_env(cls) -> "ZaloUserConfig": 

58 """Create config from environment variables.""" 

59 return cls( 

60 phone_number=os.getenv("ZALO_PHONE", ""), 

61 password=os.getenv("ZALO_PASSWORD", ""), 

62 imei=os.getenv("ZALO_IMEI", ""), 

63 session_cookies=os.getenv("ZALO_SESSION", ""), 

64 ) 

65 

66 

67class ZaloUserAdapter(ChannelAdapter): 

68 """Zalo personal account adapter.""" 

69 

70 channel_type = "zalo_user" 

71 

72 @property 

73 def name(self) -> str: 

74 """Get adapter name.""" 

75 return self.channel_type 

76 

77 def __init__(self, config: ZaloUserConfig): 

78 super().__init__(config) 

79 self.config: ZaloUserConfig = config 

80 self._session: Optional[aiohttp.ClientSession] = None 

81 self._connected = False 

82 self._poll_task: Optional[asyncio.Task] = None 

83 self._message_handlers: List[Callable] = [] 

84 self._user_id: Optional[str] = None 

85 self._last_msg_id: Dict[str, str] = {} 

86 self._access_token: Optional[str] = None 

87 

88 async def connect(self) -> bool: 

89 """Connect to Zalo.""" 

90 try: 

91 self._session = aiohttp.ClientSession() 

92 

93 # Authenticate 

94 if self.config.session_cookies: 

95 # Restore session 

96 self._access_token = self.config.session_cookies 

97 else: 

98 # Login with credentials 

99 await self._login() 

100 

101 # Verify session 

102 if not await self._verify_session(): 

103 raise ChannelConnectionError("Failed to verify Zalo session") 

104 

105 # Start polling 

106 self._poll_task = asyncio.create_task(self._poll_loop()) 

107 

108 self._connected = True 

109 self._status = ChannelStatus.CONNECTED 

110 logger.info(f"Connected to Zalo as {self._user_id}") 

111 return True 

112 

113 except Exception as e: 

114 logger.error(f"Failed to connect to Zalo: {e}") 

115 self._status = ChannelStatus.ERROR 

116 raise ChannelConnectionError(str(e)) 

117 

118 async def _login(self) -> None: 

119 """Login to Zalo with credentials.""" 

120 # Generate device signature 

121 device_sig = hashlib.md5( 

122 f"{self.config.imei}{self.config.phone_number}".encode() 

123 ).hexdigest() 

124 

125 login_url = f"{self.config.api_base}/login" 

126 

127 payload = { 

128 "phone": self.config.phone_number, 

129 "password": self.config.password, 

130 "imei": self.config.imei, 

131 "device_sig": device_sig, 

132 } 

133 

134 async with self._session.post(login_url, json=payload) as resp: 

135 if resp.status != 200: 

136 raise ChannelConnectionError("Login failed") 

137 

138 data = await resp.json() 

139 if data.get("error_code", -1) != 0: 

140 raise ChannelConnectionError(data.get("error_message", "Login failed")) 

141 

142 self._access_token = data.get("data", {}).get("access_token") 

143 self._user_id = data.get("data", {}).get("user_id") 

144 

145 async def _verify_session(self) -> bool: 

146 """Verify current session is valid.""" 

147 if not self._access_token: 

148 return False 

149 

150 url = f"{self.config.api_base}/profile" 

151 headers = {"Authorization": f"Bearer {self._access_token}"} 

152 

153 try: 

154 async with self._session.get(url, headers=headers) as resp: 

155 if resp.status == 200: 

156 data = await resp.json() 

157 self._user_id = data.get("data", {}).get("user_id") 

158 return True 

159 return False 

160 except Exception: 

161 return False 

162 

163 async def disconnect(self) -> None: 

164 """Disconnect from Zalo.""" 

165 self._connected = False 

166 

167 if self._poll_task: 

168 self._poll_task.cancel() 

169 try: 

170 await self._poll_task 

171 except asyncio.CancelledError: 

172 pass 

173 

174 if self._session: 

175 await self._session.close() 

176 self._session = None 

177 

178 self._status = ChannelStatus.DISCONNECTED 

179 logger.info("Disconnected from Zalo") 

180 

181 async def _poll_loop(self) -> None: 

182 """Poll for new messages.""" 

183 while self._connected: 

184 try: 

185 await self._fetch_messages() 

186 await asyncio.sleep(self.config.poll_interval) 

187 except asyncio.CancelledError: 

188 break 

189 except Exception as e: 

190 logger.error(f"Poll error: {e}") 

191 await asyncio.sleep(5) 

192 

193 async def _fetch_messages(self) -> None: 

194 """Fetch new messages from conversations.""" 

195 url = f"{self.config.api_base}/conversations" 

196 headers = {"Authorization": f"Bearer {self._access_token}"} 

197 

198 try: 

199 async with self._session.get(url, headers=headers) as resp: 

200 if resp.status != 200: 

201 return 

202 

203 data = await resp.json() 

204 

205 for conv in data.get("data", {}).get("conversations", []): 

206 conv_id = conv.get("conversation_id", "") 

207 last_msg = conv.get("last_message", {}) 

208 msg_id = last_msg.get("message_id", "") 

209 

210 # Check if new message 

211 if msg_id and msg_id != self._last_msg_id.get(conv_id): 

212 self._last_msg_id[conv_id] = msg_id 

213 

214 # Skip own messages 

215 if last_msg.get("sender_id") == self._user_id: 

216 continue 

217 

218 message = self._parse_message(last_msg, conv_id) 

219 if message: 

220 for handler in self._message_handlers: 

221 asyncio.create_task(handler(message)) 

222 

223 except Exception as e: 

224 logger.error(f"Error fetching messages: {e}") 

225 

226 def _parse_message(self, data: Dict[str, Any], conv_id: str) -> Optional[Message]: 

227 """Parse Zalo message to unified Message.""" 

228 try: 

229 msg_type = MessageType.TEXT 

230 content = data.get("content", "") 

231 

232 if data.get("msg_type") == "image": 

233 msg_type = MessageType.IMAGE 

234 elif data.get("msg_type") == "video": 

235 msg_type = MessageType.VIDEO 

236 elif data.get("msg_type") == "file": 

237 msg_type = MessageType.FILE 

238 

239 return Message( 

240 id=data.get("message_id", ""), 

241 channel=self.channel_type, 

242 chat_id=conv_id, 

243 sender_id=data.get("sender_id", ""), 

244 sender_name=data.get("sender_name", ""), 

245 text=content if isinstance(content, str) else json.dumps(content), 

246 timestamp=datetime.fromtimestamp(data.get("timestamp", time.time()) / 1000), 

247 message_type=msg_type, 

248 ) 

249 except Exception as e: 

250 logger.error(f"Error parsing message: {e}") 

251 return None 

252 

253 def on_message(self, handler: Callable) -> None: 

254 """Register message handler.""" 

255 self._message_handlers.append(handler) 

256 

257 async def send_message( 

258 self, 

259 chat_id: str, 

260 text: str, 

261 reply_to: Optional[str] = None, 

262 **kwargs 

263 ) -> SendResult: 

264 """Send a message.""" 

265 url = f"{self.config.api_base}/messages" 

266 headers = {"Authorization": f"Bearer {self._access_token}"} 

267 

268 payload = { 

269 "conversation_id": chat_id, 

270 "content": text, 

271 "msg_type": "text", 

272 } 

273 

274 if reply_to: 

275 payload["reply_to"] = reply_to 

276 

277 try: 

278 async with self._session.post(url, json=payload, headers=headers) as resp: 

279 if resp.status not in (200, 201): 

280 raise ChannelSendError("Failed to send message") 

281 

282 data = await resp.json() 

283 return SendResult( 

284 success=True, 

285 message_id=data.get("data", {}).get("message_id", ""), 

286 timestamp=datetime.now() 

287 ) 

288 

289 except Exception as e: 

290 logger.error(f"Failed to send message: {e}") 

291 raise ChannelSendError(str(e)) 

292 

293 async def edit_message(self, chat_id: str, message_id: str, text: str, **kwargs) -> bool: 

294 """Zalo doesn't support message editing.""" 

295 return False 

296 

297 async def delete_message(self, chat_id: str, message_id: str, **kwargs) -> bool: 

298 """Delete/recall a message.""" 

299 url = f"{self.config.api_base}/messages/{message_id}/recall" 

300 headers = {"Authorization": f"Bearer {self._access_token}"} 

301 

302 try: 

303 async with self._session.post(url, headers=headers) as resp: 

304 return resp.status == 200 

305 except Exception as e: 

306 logger.error(f"Failed to delete message: {e}") 

307 return False 

308 

309 async def send_typing(self, chat_id: str, **kwargs) -> None: 

310 """Send typing indicator.""" 

311 # Zalo may not have public typing API 

312 pass 

313 

314 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]: 

315 """Get conversation information.""" 

316 url = f"{self.config.api_base}/conversations/{chat_id}" 

317 headers = {"Authorization": f"Bearer {self._access_token}"} 

318 

319 try: 

320 async with self._session.get(url, headers=headers) as resp: 

321 if resp.status != 200: 

322 return None 

323 

324 data = await resp.json() 

325 conv = data.get("data", {}) 

326 return { 

327 "id": conv.get("conversation_id"), 

328 "name": conv.get("name"), 

329 "type": conv.get("type"), 

330 "member_count": conv.get("member_count", 0), 

331 } 

332 except Exception as e: 

333 logger.error(f"Failed to get chat info: {e}") 

334 return None 

335 

336 async def add_reaction(self, chat_id: str, message_id: str, emoji: str) -> bool: 

337 """Add reaction to a message.""" 

338 url = f"{self.config.api_base}/messages/{message_id}/reactions" 

339 headers = {"Authorization": f"Bearer {self._access_token}"} 

340 

341 try: 

342 async with self._session.post( 

343 url, 

344 json={"reaction": emoji}, 

345 headers=headers 

346 ) as resp: 

347 return resp.status == 200 

348 except Exception as e: 

349 logger.error(f"Failed to add reaction: {e}") 

350 return False 

351 

352 def get_session_token(self) -> str: 

353 """Get session token for persistence.""" 

354 return self._access_token or "" 

355 

356 

357def create_zalo_user_adapter( 

358 phone_number: Optional[str] = None, 

359 password: Optional[str] = None, 

360 session_cookies: Optional[str] = None, 

361 **kwargs 

362) -> ZaloUserAdapter: 

363 """Factory function to create a Zalo user adapter.""" 

364 config = ZaloUserConfig( 

365 phone_number=phone_number or os.getenv("ZALO_PHONE", ""), 

366 password=password or os.getenv("ZALO_PASSWORD", ""), 

367 session_cookies=session_cookies or os.getenv("ZALO_SESSION", ""), 

368 **kwargs 

369 ) 

370 return ZaloUserAdapter(config)