Coverage for integrations / channels / extensions / zalo_adapter.py: 28.9%
425 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Zalo Channel Adapter
4Implements Zalo Official Account (OA) API integration.
5Based on HevolveBot extension patterns for Vietnamese messaging platform.
7Features:
8- Official Account (OA) API integration
9- Text, image, file messaging
10- Quick reply buttons
11- List templates
12- Request user info
13- Broadcast messaging
14- Webhook signature validation
15- User interest tagging
16- Follower management
17- Reconnection with exponential backoff
18"""
20from __future__ import annotations
22import asyncio
23import logging
24import os
25import json
26import hmac
27import hashlib
28import time
29from typing import Optional, List, Dict, Any, Callable
30from datetime import datetime
31from dataclasses import dataclass, field
32from enum import Enum
34try:
35 import aiohttp
36 HAS_ZALO = True
37except ImportError:
38 HAS_ZALO = False
40from ..base import (
41 ChannelAdapter,
42 ChannelConfig,
43 ChannelStatus,
44 Message,
45 MessageType,
46 MediaAttachment,
47 SendResult,
48 ChannelConnectionError,
49 ChannelSendError,
50 ChannelRateLimitError,
51)
53logger = logging.getLogger(__name__)
55# Zalo API endpoints
56ZALO_OA_API_URL = "https://openapi.zalo.me/v2.0/oa"
57ZALO_GRAPH_URL = "https://graph.zalo.me/v2.0"
58ZALO_UPLOAD_URL = "https://openapi.zalo.me/v2.0/oa/upload"
61class ZaloMessageType(Enum):
62 """Zalo message types."""
63 TEXT = "text"
64 IMAGE = "image"
65 FILE = "file"
66 STICKER = "sticker"
67 GIF = "gif"
68 LIST = "list"
69 REQUEST_USER_INFO = "request_user_info"
72class ZaloEventType(Enum):
73 """Zalo webhook event types."""
74 USER_SEND_TEXT = "user_send_text"
75 USER_SEND_IMAGE = "user_send_image"
76 USER_SEND_FILE = "user_send_file"
77 USER_SEND_STICKER = "user_send_sticker"
78 USER_SEND_GIF = "user_send_gif"
79 USER_SEND_LOCATION = "user_send_location"
80 FOLLOW = "follow"
81 UNFOLLOW = "unfollow"
82 USER_CLICK_BUTTON = "user_click_button"
83 USER_SUBMIT_INFO = "user_submit_info"
86@dataclass
87class ZaloConfig(ChannelConfig):
88 """Zalo-specific configuration."""
89 app_id: str = ""
90 app_secret: str = ""
91 access_token: str = ""
92 refresh_token: str = ""
93 oa_id: str = "" # Official Account ID
94 webhook_secret: str = ""
95 enable_user_info_request: bool = True
96 enable_broadcast: bool = False
97 reconnect_attempts: int = 5
98 reconnect_delay: float = 1.0
101@dataclass
102class ZaloUser:
103 """Zalo user information."""
104 user_id: str
105 display_name: Optional[str] = None
106 avatar: Optional[str] = None
107 phone: Optional[str] = None
108 is_follower: bool = True
109 user_id_by_app: Optional[str] = None
110 shared_info: Dict[str, Any] = field(default_factory=dict)
113@dataclass
114class ZaloQuickReply:
115 """Quick reply button."""
116 title: str
117 payload: str
118 image_url: Optional[str] = None
121@dataclass
122class ZaloListElement:
123 """Element in a list template."""
124 title: str
125 subtitle: Optional[str] = None
126 image_url: Optional[str] = None
127 default_action: Optional[Dict[str, Any]] = None
130class ZaloAdapter(ChannelAdapter):
131 """
132 Zalo Official Account API adapter.
134 Usage:
135 config = ZaloConfig(
136 app_id="your-app-id",
137 app_secret="your-secret",
138 access_token="your-token",
139 oa_id="your-oa-id",
140 )
141 adapter = ZaloAdapter(config)
142 adapter.on_message(my_handler)
143 # Use with webhook endpoint
144 """
146 def __init__(self, config: ZaloConfig):
147 if not HAS_ZALO:
148 raise ImportError(
149 "aiohttp not installed. "
150 "Install with: pip install aiohttp"
151 )
153 super().__init__(config)
154 self.zalo_config: ZaloConfig = config
155 self._session: Optional[aiohttp.ClientSession] = None
156 self._user_cache: Dict[str, ZaloUser] = {}
157 self._follow_handlers: List[Callable] = []
158 self._unfollow_handlers: List[Callable] = []
159 self._button_handlers: Dict[str, Callable] = {}
160 self._user_info_handlers: List[Callable] = []
161 self._reconnect_count: int = 0
163 @property
164 def name(self) -> str:
165 return "zalo"
167 async def connect(self) -> bool:
168 """Initialize Zalo OA API client."""
169 if not self.zalo_config.access_token:
170 logger.error("Zalo access token required")
171 return False
173 if not self.zalo_config.oa_id:
174 logger.error("Zalo OA ID required")
175 return False
177 try:
178 # Create aiohttp session
179 self._session = aiohttp.ClientSession()
181 # Verify token by getting OA info
182 oa_info = await self._get_oa_info()
183 if oa_info:
184 logger.info(f"Zalo OA connected: {oa_info.get('name', 'Unknown')}")
185 self.status = ChannelStatus.CONNECTED
186 self._reconnect_count = 0
187 return True
188 else:
189 logger.error("Failed to verify Zalo OA token")
190 self.status = ChannelStatus.ERROR
191 return False
193 except Exception as e:
194 logger.error(f"Failed to connect to Zalo: {e}")
195 self.status = ChannelStatus.ERROR
196 return False
198 async def disconnect(self) -> None:
199 """Disconnect Zalo adapter."""
200 if self._session:
201 await self._session.close()
202 self._session = None
204 self._user_cache.clear()
205 self.status = ChannelStatus.DISCONNECTED
207 async def _get_oa_info(self) -> Optional[Dict[str, Any]]:
208 """Get OA information to verify connection."""
209 if not self._session:
210 return None
212 try:
213 headers = {
214 "access_token": self.zalo_config.access_token,
215 }
217 async with self._session.get(
218 f"{ZALO_OA_API_URL}/getoa",
219 headers=headers,
220 ) as resp:
221 if resp.status == 200:
222 data = await resp.json()
223 if data.get("error") == 0:
224 return data.get("data", {})
225 else:
226 logger.error(f"Zalo API error: {data.get('message')}")
228 except Exception as e:
229 logger.error(f"Failed to get OA info: {e}")
231 return None
233 def validate_signature(self, body: str, timestamp: str, signature: str) -> bool:
234 """Validate webhook signature."""
235 if not self.zalo_config.webhook_secret:
236 return True # No secret configured, skip validation
238 try:
239 # Zalo uses: sha256(app_id + body + timestamp + secret)
240 data = f"{self.zalo_config.app_id}{body}{timestamp}{self.zalo_config.webhook_secret}"
241 expected = hashlib.sha256(data.encode()).hexdigest()
242 return hmac.compare_digest(signature, expected)
243 except Exception:
244 return False
246 async def handle_webhook(self, body: Dict[str, Any]) -> None:
247 """
248 Handle incoming webhook request from Zalo.
249 Should be called from your webhook endpoint.
250 """
251 event_name = body.get("event_name")
252 sender = body.get("sender", {})
253 message = body.get("message", {})
254 timestamp = body.get("timestamp")
256 if not event_name:
257 logger.warning("No event_name in Zalo webhook")
258 return
260 try:
261 event_type = ZaloEventType(event_name)
262 except ValueError:
263 logger.warning(f"Unknown Zalo event: {event_name}")
264 return
266 # Handle different event types
267 if event_type == ZaloEventType.USER_SEND_TEXT:
268 await self._handle_text_message(sender, message, timestamp)
269 elif event_type == ZaloEventType.USER_SEND_IMAGE:
270 await self._handle_image_message(sender, message, timestamp)
271 elif event_type == ZaloEventType.USER_SEND_FILE:
272 await self._handle_file_message(sender, message, timestamp)
273 elif event_type == ZaloEventType.USER_SEND_STICKER:
274 await self._handle_sticker_message(sender, message, timestamp)
275 elif event_type == ZaloEventType.USER_SEND_GIF:
276 await self._handle_gif_message(sender, message, timestamp)
277 elif event_type == ZaloEventType.USER_SEND_LOCATION:
278 await self._handle_location_message(sender, message, timestamp)
279 elif event_type == ZaloEventType.FOLLOW:
280 await self._handle_follow(sender, timestamp)
281 elif event_type == ZaloEventType.UNFOLLOW:
282 await self._handle_unfollow(sender, timestamp)
283 elif event_type == ZaloEventType.USER_CLICK_BUTTON:
284 await self._handle_button_click(sender, message, timestamp)
285 elif event_type == ZaloEventType.USER_SUBMIT_INFO:
286 await self._handle_user_info_submit(sender, body.get("info", {}), timestamp)
288 async def _handle_text_message(
289 self,
290 sender: Dict[str, Any],
291 message: Dict[str, Any],
292 timestamp: int,
293 ) -> None:
294 """Handle text message."""
295 msg = self._convert_message(sender, message, timestamp)
296 await self._dispatch_message(msg)
298 async def _handle_image_message(
299 self,
300 sender: Dict[str, Any],
301 message: Dict[str, Any],
302 timestamp: int,
303 ) -> None:
304 """Handle image message."""
305 msg = self._convert_message(sender, message, timestamp)
306 msg.media.append(MediaAttachment(
307 type=MessageType.IMAGE,
308 url=message.get("url"),
309 file_id=message.get("msg_id"),
310 ))
311 await self._dispatch_message(msg)
313 async def _handle_file_message(
314 self,
315 sender: Dict[str, Any],
316 message: Dict[str, Any],
317 timestamp: int,
318 ) -> None:
319 """Handle file message."""
320 msg = self._convert_message(sender, message, timestamp)
321 msg.media.append(MediaAttachment(
322 type=MessageType.DOCUMENT,
323 url=message.get("url"),
324 file_name=message.get("name"),
325 file_size=message.get("size"),
326 ))
327 await self._dispatch_message(msg)
329 async def _handle_sticker_message(
330 self,
331 sender: Dict[str, Any],
332 message: Dict[str, Any],
333 timestamp: int,
334 ) -> None:
335 """Handle sticker message."""
336 msg = self._convert_message(sender, message, timestamp)
337 msg.text = f"[sticker:{message.get('sticker_id')}]"
338 await self._dispatch_message(msg)
340 async def _handle_gif_message(
341 self,
342 sender: Dict[str, Any],
343 message: Dict[str, Any],
344 timestamp: int,
345 ) -> None:
346 """Handle GIF message."""
347 msg = self._convert_message(sender, message, timestamp)
348 msg.media.append(MediaAttachment(
349 type=MessageType.IMAGE,
350 url=message.get("url"),
351 ))
352 await self._dispatch_message(msg)
354 async def _handle_location_message(
355 self,
356 sender: Dict[str, Any],
357 message: Dict[str, Any],
358 timestamp: int,
359 ) -> None:
360 """Handle location message."""
361 msg = self._convert_message(sender, message, timestamp)
362 lat = message.get("latitude")
363 lon = message.get("longitude")
364 msg.text = f"[location:{lat},{lon}]"
365 await self._dispatch_message(msg)
367 async def _handle_follow(self, sender: Dict[str, Any], timestamp: int) -> None:
368 """Handle follow event."""
369 user = ZaloUser(
370 user_id=sender.get("id", ""),
371 )
373 logger.info(f"User followed: {user.user_id}")
375 for handler in self._follow_handlers:
376 try:
377 result = handler(user)
378 if asyncio.iscoroutine(result):
379 await result
380 except Exception as e:
381 logger.error(f"Follow handler error: {e}")
383 async def _handle_unfollow(self, sender: Dict[str, Any], timestamp: int) -> None:
384 """Handle unfollow event."""
385 user_id = sender.get("id", "")
386 logger.info(f"User unfollowed: {user_id}")
388 for handler in self._unfollow_handlers:
389 try:
390 result = handler(user_id)
391 if asyncio.iscoroutine(result):
392 await result
393 except Exception as e:
394 logger.error(f"Unfollow handler error: {e}")
396 async def _handle_button_click(
397 self,
398 sender: Dict[str, Any],
399 message: Dict[str, Any],
400 timestamp: int,
401 ) -> None:
402 """Handle quick reply button click."""
403 payload = message.get("payload", "")
405 if payload in self._button_handlers:
406 handler = self._button_handlers[payload]
407 try:
408 result = handler(sender, message)
409 if asyncio.iscoroutine(result):
410 await result
411 except Exception as e:
412 logger.error(f"Button handler error: {e}")
413 else:
414 # Convert to regular message
415 msg = self._convert_message(sender, message, timestamp)
416 msg.text = payload
417 msg.raw["button_payload"] = payload
418 await self._dispatch_message(msg)
420 async def _handle_user_info_submit(
421 self,
422 sender: Dict[str, Any],
423 info: Dict[str, Any],
424 timestamp: int,
425 ) -> None:
426 """Handle user info submission."""
427 user = ZaloUser(
428 user_id=sender.get("id", ""),
429 display_name=info.get("name"),
430 phone=info.get("phone"),
431 avatar=info.get("avatar"),
432 shared_info=info,
433 )
435 self._user_cache[user.user_id] = user
437 for handler in self._user_info_handlers:
438 try:
439 result = handler(user)
440 if asyncio.iscoroutine(result):
441 await result
442 except Exception as e:
443 logger.error(f"User info handler error: {e}")
445 def _convert_message(
446 self,
447 sender: Dict[str, Any],
448 message: Dict[str, Any],
449 timestamp: int,
450 ) -> Message:
451 """Convert Zalo event to unified Message format."""
452 user_id = sender.get("id", "")
454 return Message(
455 id=message.get("msg_id", str(timestamp)),
456 channel=self.name,
457 sender_id=user_id,
458 sender_name=sender.get("name"),
459 chat_id=user_id, # 1:1 chat with user
460 text=message.get("text", ""),
461 timestamp=datetime.fromtimestamp(timestamp / 1000) if timestamp else datetime.now(),
462 is_group=False,
463 raw={
464 "sender": sender,
465 "message": message,
466 },
467 )
469 async def send_message(
470 self,
471 chat_id: str,
472 text: str,
473 reply_to: Optional[str] = None,
474 media: Optional[List[MediaAttachment]] = None,
475 buttons: Optional[List[Dict]] = None,
476 ) -> SendResult:
477 """Send a message to a Zalo user."""
478 if not self._session:
479 return SendResult(success=False, error="Not connected")
481 try:
482 # Handle quick reply buttons
483 if buttons:
484 return await self._send_with_buttons(chat_id, text, buttons)
486 # Handle media
487 if media and len(media) > 0:
488 for m in media:
489 if m.type == MessageType.IMAGE:
490 await self._send_image(chat_id, m.url, text)
491 elif m.type == MessageType.DOCUMENT:
492 await self._send_file(chat_id, m.url, m.file_name)
493 return SendResult(success=True)
495 # Send text message
496 return await self._send_text(chat_id, text)
498 except Exception as e:
499 logger.error(f"Failed to send Zalo message: {e}")
500 return SendResult(success=False, error=str(e))
502 async def _send_text(self, user_id: str, text: str) -> SendResult:
503 """Send text message."""
504 if not self._session:
505 return SendResult(success=False, error="Not connected")
507 try:
508 headers = {
509 "access_token": self.zalo_config.access_token,
510 "Content-Type": "application/json",
511 }
513 data = {
514 "recipient": {"user_id": user_id},
515 "message": {"text": text},
516 }
518 async with self._session.post(
519 f"{ZALO_OA_API_URL}/message",
520 headers=headers,
521 json=data,
522 ) as resp:
523 result = await resp.json()
524 if result.get("error") == 0:
525 return SendResult(
526 success=True,
527 message_id=result.get("data", {}).get("message_id"),
528 )
529 else:
530 error_msg = result.get("message", "Unknown error")
531 if result.get("error") == -201:
532 raise ChannelRateLimitError()
533 return SendResult(success=False, error=error_msg)
535 except ChannelRateLimitError:
536 raise
537 except Exception as e:
538 return SendResult(success=False, error=str(e))
540 async def _send_with_buttons(
541 self,
542 user_id: str,
543 text: str,
544 buttons: List[Dict],
545 ) -> SendResult:
546 """Send message with quick reply buttons."""
547 if not self._session:
548 return SendResult(success=False, error="Not connected")
550 try:
551 headers = {
552 "access_token": self.zalo_config.access_token,
553 "Content-Type": "application/json",
554 }
556 # Build quick replies
557 quick_replies = []
558 for btn in buttons:
559 qr = {
560 "content_type": "text",
561 "title": btn.get("text", ""),
562 "payload": btn.get("callback_data", btn.get("text", "")),
563 }
564 if btn.get("image_url"):
565 qr["image_url"] = btn["image_url"]
566 quick_replies.append(qr)
568 data = {
569 "recipient": {"user_id": user_id},
570 "message": {
571 "text": text,
572 "quick_replies": quick_replies,
573 },
574 }
576 async with self._session.post(
577 f"{ZALO_OA_API_URL}/message",
578 headers=headers,
579 json=data,
580 ) as resp:
581 result = await resp.json()
582 if result.get("error") == 0:
583 return SendResult(
584 success=True,
585 message_id=result.get("data", {}).get("message_id"),
586 )
587 else:
588 return SendResult(success=False, error=result.get("message"))
590 except Exception as e:
591 return SendResult(success=False, error=str(e))
593 async def _send_image(
594 self,
595 user_id: str,
596 image_url: str,
597 caption: str = "",
598 ) -> SendResult:
599 """Send image message."""
600 if not self._session:
601 return SendResult(success=False, error="Not connected")
603 try:
604 headers = {
605 "access_token": self.zalo_config.access_token,
606 "Content-Type": "application/json",
607 }
609 data = {
610 "recipient": {"user_id": user_id},
611 "message": {
612 "attachment": {
613 "type": "template",
614 "payload": {
615 "template_type": "media",
616 "elements": [{
617 "media_type": "image",
618 "url": image_url,
619 }],
620 },
621 },
622 },
623 }
625 async with self._session.post(
626 f"{ZALO_OA_API_URL}/message",
627 headers=headers,
628 json=data,
629 ) as resp:
630 result = await resp.json()
631 if result.get("error") == 0:
632 # Send caption as separate message if provided
633 if caption:
634 await self._send_text(user_id, caption)
635 return SendResult(
636 success=True,
637 message_id=result.get("data", {}).get("message_id"),
638 )
639 else:
640 return SendResult(success=False, error=result.get("message"))
642 except Exception as e:
643 return SendResult(success=False, error=str(e))
645 async def _send_file(
646 self,
647 user_id: str,
648 file_url: str,
649 file_name: str = "",
650 ) -> SendResult:
651 """Send file message."""
652 if not self._session:
653 return SendResult(success=False, error="Not connected")
655 try:
656 headers = {
657 "access_token": self.zalo_config.access_token,
658 "Content-Type": "application/json",
659 }
661 data = {
662 "recipient": {"user_id": user_id},
663 "message": {
664 "attachment": {
665 "type": "file",
666 "payload": {
667 "url": file_url,
668 },
669 },
670 },
671 }
673 async with self._session.post(
674 f"{ZALO_OA_API_URL}/message",
675 headers=headers,
676 json=data,
677 ) as resp:
678 result = await resp.json()
679 if result.get("error") == 0:
680 return SendResult(
681 success=True,
682 message_id=result.get("data", {}).get("message_id"),
683 )
684 else:
685 return SendResult(success=False, error=result.get("message"))
687 except Exception as e:
688 return SendResult(success=False, error=str(e))
690 async def edit_message(
691 self,
692 chat_id: str,
693 message_id: str,
694 text: str,
695 buttons: Optional[List[Dict]] = None,
696 ) -> SendResult:
697 """
698 Edit a Zalo message.
699 Note: Zalo doesn't support message editing.
700 """
701 logger.warning("Zalo doesn't support message editing")
702 return SendResult(success=False, error="Not supported")
704 async def delete_message(self, chat_id: str, message_id: str) -> bool:
705 """
706 Delete a Zalo message.
707 Note: Zalo doesn't support message deletion.
708 """
709 logger.warning("Zalo doesn't support message deletion")
710 return False
712 async def send_typing(self, chat_id: str) -> None:
713 """
714 Send typing indicator.
715 Note: Zalo doesn't support typing indicators.
716 """
717 pass
719 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
720 """Get user profile information."""
721 return await self.get_user_profile(chat_id)
723 # Zalo-specific methods
725 async def get_user_profile(self, user_id: str) -> Optional[Dict[str, Any]]:
726 """Get user profile from Zalo."""
727 if not self._session:
728 return None
730 # Check cache
731 if user_id in self._user_cache:
732 user = self._user_cache[user_id]
733 return {
734 "user_id": user.user_id,
735 "display_name": user.display_name,
736 "avatar": user.avatar,
737 "phone": user.phone,
738 }
740 try:
741 headers = {
742 "access_token": self.zalo_config.access_token,
743 }
745 async with self._session.get(
746 f"{ZALO_OA_API_URL}/getprofile",
747 headers=headers,
748 params={"user_id": user_id},
749 ) as resp:
750 result = await resp.json()
751 if result.get("error") == 0:
752 data = result.get("data", {})
753 return {
754 "user_id": user_id,
755 "display_name": data.get("display_name"),
756 "avatar": data.get("avatar"),
757 "user_id_by_app": data.get("user_id_by_app"),
758 }
760 except Exception as e:
761 logger.error(f"Failed to get user profile: {e}")
763 return None
765 def on_follow(self, handler: Callable[[ZaloUser], Any]) -> None:
766 """Register a follow event handler."""
767 self._follow_handlers.append(handler)
769 def on_unfollow(self, handler: Callable[[str], Any]) -> None:
770 """Register an unfollow event handler."""
771 self._unfollow_handlers.append(handler)
773 def register_button_handler(
774 self,
775 payload: str,
776 handler: Callable[[Dict, Dict], Any],
777 ) -> None:
778 """Register a button click handler."""
779 self._button_handlers[payload] = handler
781 def on_user_info(self, handler: Callable[[ZaloUser], Any]) -> None:
782 """Register a user info submission handler."""
783 self._user_info_handlers.append(handler)
785 async def request_user_info(
786 self,
787 user_id: str,
788 title: str = "Share your info",
789 subtitle: str = "",
790 ) -> SendResult:
791 """Request user info (name, phone, etc.)."""
792 if not self._session:
793 return SendResult(success=False, error="Not connected")
795 if not self.zalo_config.enable_user_info_request:
796 return SendResult(success=False, error="User info request disabled")
798 try:
799 headers = {
800 "access_token": self.zalo_config.access_token,
801 "Content-Type": "application/json",
802 }
804 data = {
805 "recipient": {"user_id": user_id},
806 "message": {
807 "attachment": {
808 "type": "template",
809 "payload": {
810 "template_type": "request_user_info",
811 "elements": [{
812 "title": title,
813 "subtitle": subtitle,
814 "image_url": "",
815 }],
816 },
817 },
818 },
819 }
821 async with self._session.post(
822 f"{ZALO_OA_API_URL}/message",
823 headers=headers,
824 json=data,
825 ) as resp:
826 result = await resp.json()
827 if result.get("error") == 0:
828 return SendResult(success=True)
829 else:
830 return SendResult(success=False, error=result.get("message"))
832 except Exception as e:
833 return SendResult(success=False, error=str(e))
835 async def send_list_template(
836 self,
837 user_id: str,
838 elements: List[ZaloListElement],
839 buttons: Optional[List[Dict]] = None,
840 ) -> SendResult:
841 """Send a list template message."""
842 if not self._session:
843 return SendResult(success=False, error="Not connected")
845 try:
846 headers = {
847 "access_token": self.zalo_config.access_token,
848 "Content-Type": "application/json",
849 }
851 element_list = []
852 for elem in elements:
853 el = {
854 "title": elem.title,
855 }
856 if elem.subtitle:
857 el["subtitle"] = elem.subtitle
858 if elem.image_url:
859 el["image_url"] = elem.image_url
860 if elem.default_action:
861 el["default_action"] = elem.default_action
862 element_list.append(el)
864 payload = {
865 "template_type": "list",
866 "elements": element_list,
867 }
869 if buttons:
870 payload["buttons"] = [
871 {
872 "title": btn.get("text", ""),
873 "type": "oa.open.url" if btn.get("url") else "oa.query.show",
874 "payload": btn.get("url") or btn.get("callback_data", ""),
875 }
876 for btn in buttons
877 ]
879 data = {
880 "recipient": {"user_id": user_id},
881 "message": {
882 "attachment": {
883 "type": "template",
884 "payload": payload,
885 },
886 },
887 }
889 async with self._session.post(
890 f"{ZALO_OA_API_URL}/message",
891 headers=headers,
892 json=data,
893 ) as resp:
894 result = await resp.json()
895 if result.get("error") == 0:
896 return SendResult(success=True)
897 else:
898 return SendResult(success=False, error=result.get("message"))
900 except Exception as e:
901 return SendResult(success=False, error=str(e))
903 async def broadcast_message(
904 self,
905 text: str,
906 target_followers: bool = True,
907 ) -> SendResult:
908 """Broadcast message to all followers (requires approval)."""
909 if not self.zalo_config.enable_broadcast:
910 return SendResult(success=False, error="Broadcast disabled")
912 if not self._session:
913 return SendResult(success=False, error="Not connected")
915 try:
916 headers = {
917 "access_token": self.zalo_config.access_token,
918 "Content-Type": "application/json",
919 }
921 data = {
922 "message": {"text": text},
923 "target": {"all_followers": target_followers},
924 }
926 async with self._session.post(
927 f"{ZALO_OA_API_URL}/message/broadcast",
928 headers=headers,
929 json=data,
930 ) as resp:
931 result = await resp.json()
932 if result.get("error") == 0:
933 return SendResult(success=True)
934 else:
935 return SendResult(success=False, error=result.get("message"))
937 except Exception as e:
938 return SendResult(success=False, error=str(e))
940 async def tag_user(self, user_id: str, tag_name: str) -> bool:
941 """Tag a user with an interest tag."""
942 if not self._session:
943 return False
945 try:
946 headers = {
947 "access_token": self.zalo_config.access_token,
948 "Content-Type": "application/json",
949 }
951 data = {
952 "user_id": user_id,
953 "tag_name": tag_name,
954 }
956 async with self._session.post(
957 f"{ZALO_OA_API_URL}/tag/tagfollower",
958 headers=headers,
959 json=data,
960 ) as resp:
961 result = await resp.json()
962 return result.get("error") == 0
964 except Exception as e:
965 logger.error(f"Failed to tag user: {e}")
966 return False
968 async def untag_user(self, user_id: str, tag_name: str) -> bool:
969 """Remove tag from a user."""
970 if not self._session:
971 return False
973 try:
974 headers = {
975 "access_token": self.zalo_config.access_token,
976 "Content-Type": "application/json",
977 }
979 data = {
980 "user_id": user_id,
981 "tag_name": tag_name,
982 }
984 async with self._session.post(
985 f"{ZALO_OA_API_URL}/tag/rmfollowerfromtag",
986 headers=headers,
987 json=data,
988 ) as resp:
989 result = await resp.json()
990 return result.get("error") == 0
992 except Exception as e:
993 logger.error(f"Failed to untag user: {e}")
994 return False
996 async def refresh_access_token(self) -> bool:
997 """Refresh access token using refresh token."""
998 if not self.zalo_config.refresh_token:
999 return False
1001 if not self._session:
1002 return False
1004 try:
1005 data = {
1006 "app_id": self.zalo_config.app_id,
1007 "app_secret": self.zalo_config.app_secret,
1008 "refresh_token": self.zalo_config.refresh_token,
1009 "grant_type": "refresh_token",
1010 }
1012 async with self._session.post(
1013 f"{ZALO_GRAPH_URL}/oa/access_token",
1014 data=data,
1015 ) as resp:
1016 result = await resp.json()
1017 if "access_token" in result:
1018 self.zalo_config.access_token = result["access_token"]
1019 if "refresh_token" in result:
1020 self.zalo_config.refresh_token = result["refresh_token"]
1021 logger.info("Zalo access token refreshed")
1022 return True
1023 else:
1024 logger.error(f"Failed to refresh token: {result}")
1025 return False
1027 except Exception as e:
1028 logger.error(f"Token refresh error: {e}")
1029 return False
1032def create_zalo_adapter(
1033 app_id: str = None,
1034 app_secret: str = None,
1035 access_token: str = None,
1036 oa_id: str = None,
1037 **kwargs
1038) -> ZaloAdapter:
1039 """
1040 Factory function to create Zalo adapter.
1042 Args:
1043 app_id: Zalo app ID (or set ZALO_APP_ID env var)
1044 app_secret: Zalo app secret (or set ZALO_APP_SECRET env var)
1045 access_token: OA access token (or set ZALO_ACCESS_TOKEN env var)
1046 oa_id: Official Account ID (or set ZALO_OA_ID env var)
1047 **kwargs: Additional config options
1049 Returns:
1050 Configured ZaloAdapter
1051 """
1052 app_id = app_id or os.getenv("ZALO_APP_ID")
1053 app_secret = app_secret or os.getenv("ZALO_APP_SECRET")
1054 access_token = access_token or os.getenv("ZALO_ACCESS_TOKEN")
1055 oa_id = oa_id or os.getenv("ZALO_OA_ID")
1057 if not access_token:
1058 raise ValueError("Zalo access token required")
1059 if not oa_id:
1060 raise ValueError("Zalo OA ID required")
1062 config = ZaloConfig(
1063 app_id=app_id or "",
1064 app_secret=app_secret or "",
1065 access_token=access_token,
1066 oa_id=oa_id,
1067 **kwargs
1068 )
1069 return ZaloAdapter(config)