Coverage for integrations / channels / extensions / wechat_adapter.py: 26.8%
429 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2WeChat Channel Adapter
4Implements WeChat messaging via Official Account API and Mini-Programs.
5Based on HevolveBot extension patterns for WeChat.
7Features:
8- Official Account API (Service Account / Subscription Account)
9- Mini-Programs support
10- Template messages
11- Custom menus
12- QR code generation
13- Media upload/download
14- Customer service messages
15- Message encryption/decryption
16- Event handling (subscribe, unsubscribe, scan, location, click)
17"""
19from __future__ import annotations
21import asyncio
22import logging
23import os
24import json
25import time
26import hashlib
27import base64
28import struct
29import socket
30from typing import Optional, List, Dict, Any, Callable
31from datetime import datetime
32from dataclasses import dataclass, field
33from xml.etree import ElementTree
34try:
35 import aiohttp
36 HAS_AIOHTTP = True
37except ImportError:
38 HAS_AIOHTTP = False
40try:
41 from Crypto.Cipher import AES
42 HAS_CRYPTO = True
43except ImportError:
44 HAS_CRYPTO = False
46from ..base import (
47 ChannelAdapter,
48 ChannelConfig,
49 ChannelStatus,
50 Message,
51 MessageType,
52 MediaAttachment,
53 SendResult,
54 ChannelConnectionError,
55 ChannelSendError,
56 ChannelRateLimitError,
57)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# WeChat API endpoints
# Every Official Account endpoint below is derived from WECHAT_API_BASE.
WECHAT_API_BASE = "https://api.weixin.qq.com/cgi-bin"
WECHAT_API_SEND = f"{WECHAT_API_BASE}/message/custom/send"
WECHAT_API_TEMPLATE = f"{WECHAT_API_BASE}/message/template/send"
WECHAT_API_MEDIA_UPLOAD = f"{WECHAT_API_BASE}/media/upload"
WECHAT_API_MEDIA_GET = f"{WECHAT_API_BASE}/media/get"
WECHAT_API_TOKEN = f"{WECHAT_API_BASE}/token"
WECHAT_API_MENU = f"{WECHAT_API_BASE}/menu/create"
WECHAT_API_QR = f"{WECHAT_API_BASE}/qrcode/create"
WECHAT_API_USER_INFO = f"{WECHAT_API_BASE}/user/info"

# Mini-Program API endpoints
WECHAT_MP_API_BASE = "https://api.weixin.qq.com/wxa"
WECHAT_MP_CODE = f"{WECHAT_MP_API_BASE}/getwxacode"
# NOTE(review): despite the "MSG_SEND" name this path is "msg_sec_check", which
# looks like a content-check endpoint rather than a send endpoint — confirm.
# The constant is not referenced anywhere else in this module.
WECHAT_MP_MSG_SEND = f"{WECHAT_MP_API_BASE}/msg_sec_check"
@dataclass
class WeChatConfig(ChannelConfig):
    """WeChat-specific configuration.

    Extends the shared ChannelConfig with the credentials and feature flags
    read by WeChatAdapter. Every field has a default, so the config can be
    built incrementally; WeChatAdapter.connect() refuses to proceed when
    app_id or app_secret is empty.
    """
    # Credentials sent to the token endpoint to obtain an access token.
    app_id: str = ""
    app_secret: str = ""
    # Handed to WeChatMessageCrypto when enable_encryption is set; connect()
    # fails if encryption is enabled and this is missing.
    encoding_aes_key: Optional[str] = None  # For message encryption
    # Shared secret mixed into the SHA-1 webhook signature.
    token: str = ""  # Verification token
    # Not read anywhere in this module.
    account_type: str = "service"  # service, subscription, mini_program
    enable_encryption: bool = False
    # Gates WeChatAdapter.get_mini_program_qr_code().
    enable_mini_program: bool = False
    # The three fields below are not read anywhere in this module.
    mini_program_app_id: Optional[str] = None
    mini_program_secret: Optional[str] = None
    template_ids: Dict[str, str] = field(default_factory=dict)
@dataclass
class WeChatUser:
    """WeChat user information.

    Plain record populated by WeChatAdapter.get_user_info() from the JSON
    returned by the user-info endpoint; only openid is mandatory.
    """
    openid: str
    unionid: Optional[str] = None
    nickname: Optional[str] = None
    sex: int = 0  # 0: unknown, 1: male, 2: female
    city: Optional[str] = None
    province: Optional[str] = None
    country: Optional[str] = None
    headimgurl: Optional[str] = None
    # get_user_info() sets this to True only when the API's "subscribe" is 1.
    subscribe: bool = True
    # Stored exactly as returned by the API (no conversion is applied here).
    subscribe_time: Optional[int] = None
    language: str = "zh_CN"
@dataclass
class TemplateMessage:
    """Builder for the request body of a template message.

    Fill in the recipient and template, attach placeholder values with
    add_field(), then serialise with to_dict().
    """
    template_id: str
    touser: str
    url: Optional[str] = None
    miniprogram: Optional[Dict[str, str]] = None
    data: Dict[str, Dict[str, str]] = field(default_factory=dict)

    def add_field(self, key: str, value: str, color: str = "#173177") -> 'TemplateMessage':
        """Set (or overwrite) one placeholder and return self for chaining."""
        entry = {"value": value, "color": color}
        self.data[key] = entry
        return self

    def to_dict(self) -> Dict[str, Any]:
        """Return the API payload; optional keys appear only when truthy."""
        payload: Dict[str, Any] = {
            "touser": self.touser,
            "template_id": self.template_id,
            "data": self.data,
        }
        # Same insertion order as before: "url" first, then "miniprogram".
        for optional_key in ("url", "miniprogram"):
            optional_value = getattr(self, optional_key)
            if optional_value:
                payload[optional_key] = optional_value
        return payload
class WeChatMessageCrypto:
    """
    WeChat message encryption/decryption handler.

    Messages are encrypted with AES in CBC mode, using the first 16 bytes of
    the key as the IV. The plaintext layout is
    random(16) + msg_len(4, big-endian) + msg + app_id, padded PKCS#7-style
    to a multiple of 32 bytes.
    """

    # Padding block size used by _pad/_unpad (32, not the 16-byte AES block).
    BLOCK_SIZE = 32
    # Bytes preceding the message body: 16 random bytes + 4-byte length.
    _HEADER_LEN = 20

    def __init__(self, app_id: str, encoding_aes_key: str, token: str):
        """
        Args:
            app_id: App ID appended to every plaintext and checked on decrypt.
            encoding_aes_key: Base64 key without its trailing "=".
            token: Shared secret used by verify_signature().

        Raises:
            ImportError: If pycryptodome is not installed.
        """
        if not HAS_CRYPTO:
            raise ImportError("pycryptodome required for encryption. Install with: pip install pycryptodome")

        self.app_id = app_id
        self.token = token
        # Decode the encoding key (43 chars Base64 -> 32 bytes)
        self.aes_key = base64.b64decode(encoding_aes_key + "=")

    def _pad(self, data: bytes) -> bytes:
        """PKCS#7 padding to a multiple of BLOCK_SIZE (always adds >= 1 byte)."""
        padding_len = self.BLOCK_SIZE - (len(data) % self.BLOCK_SIZE)
        return data + bytes([padding_len] * padding_len)

    def _unpad(self, data: bytes) -> bytes:
        """Remove PKCS#7 padding.

        Raises:
            ValueError: If data is empty or its final byte is not a valid
                padding length (1..BLOCK_SIZE and no longer than data).
        """
        if not data:
            raise ValueError("Cannot unpad empty data")
        padding_len = data[-1]
        # A length of 0 would make data[:-0] silently return b"", and an
        # oversized length would strip real content; reject both.
        if not 1 <= padding_len <= self.BLOCK_SIZE or padding_len > len(data):
            raise ValueError("Invalid PKCS#7 padding")
        return data[:-padding_len]

    def encrypt(self, message: str) -> str:
        """Encrypt a message and return the Base64-encoded ciphertext."""
        # Generate random 16-byte string
        random_str = os.urandom(16)

        # Build plaintext: random(16) + msg_len(4) + msg + app_id
        msg_bytes = message.encode('utf-8')
        msg_len = struct.pack('>I', len(msg_bytes))
        app_id_bytes = self.app_id.encode('utf-8')
        plaintext = random_str + msg_len + msg_bytes + app_id_bytes

        # Encrypt
        iv = self.aes_key[:16]
        cipher = AES.new(self.aes_key, AES.MODE_CBC, iv)
        ciphertext = cipher.encrypt(self._pad(plaintext))

        return base64.b64encode(ciphertext).decode('utf-8')

    def decrypt(self, encrypted: str) -> str:
        """Decrypt a Base64-encoded message and return its text.

        Raises:
            ValueError: If the padding is invalid, the payload is shorter
                than its declared length, or the AppID that trails the
                message differs from the one this instance was created with.
        """
        ciphertext = base64.b64decode(encrypted)

        # Decrypt
        iv = self.aes_key[:16]
        cipher = AES.new(self.aes_key, AES.MODE_CBC, iv)
        plaintext = self._unpad(cipher.decrypt(ciphertext))

        # Skip random(16), read msg_len(4), extract msg
        if len(plaintext) < self._HEADER_LEN:
            raise ValueError("Decrypted payload too short")
        msg_len = struct.unpack('>I', plaintext[16:self._HEADER_LEN])[0]
        msg_end = self._HEADER_LEN + msg_len
        if msg_end > len(plaintext):
            raise ValueError("Decrypted payload shorter than declared length")

        # encrypt() appends app_id after the message; a mismatch means the
        # payload was not produced for this account.
        trailing_app_id = plaintext[msg_end:].decode('utf-8')
        if trailing_app_id != self.app_id:
            raise ValueError("AppID mismatch in decrypted message")

        return plaintext[self._HEADER_LEN:msg_end].decode('utf-8')

    def verify_signature(self, signature: str, timestamp: str, nonce: str, encrypt: str = "") -> bool:
        """Verify message signature.

        The expected value is the SHA-1 hex digest of token, timestamp, nonce
        and encrypt sorted lexicographically and concatenated.
        """
        import hmac  # local import: only this method needs it

        parts = sorted([self.token, timestamp, nonce, encrypt])
        sign_str = ''.join(parts)
        computed = hashlib.sha1(sign_str.encode('utf-8')).hexdigest()
        # Constant-time comparison; bytes are used so a non-ASCII signature
        # yields False instead of raising TypeError.
        return hmac.compare_digest(computed.encode('utf-8'), (signature or "").encode('utf-8'))
class WeChatAdapter(ChannelAdapter):
    """
    WeChat messaging adapter for Official Accounts and Mini-Programs.

    Outbound calls use an access token fetched in connect() and refreshed
    lazily by _ensure_token(). Inbound traffic arrives on a webhook owned by
    the caller, who forwards GET requests to verify_webhook() and POST
    requests to handle_webhook().

    Usage:
        config = WeChatConfig(
            app_id="your-app-id",
            app_secret="your-app-secret",
            token="your-verification-token",
        )
        adapter = WeChatAdapter(config)
        adapter.on_message(my_handler)
        # Use with webhook endpoint for receiving messages
    """

    def __init__(self, config: WeChatConfig):
        super().__init__(config)
        self.wechat_config: WeChatConfig = config
        self._access_token: Optional[str] = None
        # Unix time after which the cached token is treated as stale.
        self._token_expires_at: int = 0
        self._crypto: Optional[WeChatMessageCrypto] = None
        self._session: Optional[aiohttp.ClientSession] = None
        # Keyed by lower-cased event name (see register_event_handler).
        self._event_handlers: Dict[str, Callable] = {}
        # Never expires; entries live until the adapter is discarded.
        self._user_cache: Dict[str, WeChatUser] = {}

    @property
    def name(self) -> str:
        return "wechat"

    # Internal helpers

    @staticmethod
    async def _read_json(response: Any) -> Any:
        """Decode a JSON body without enforcing the response Content-Type."""
        # aiohttp's json() raises unless the body is labelled
        # application/json; content_type=None disables that check.
        # NOTE(review): presumably some WeChat endpoints reply as text/plain.
        return await response.json(content_type=None)

    def _auth_params(self, **extra: Any) -> Dict[str, Any]:
        """Query parameters carrying the access token plus any extras.

        Passing values through params= lets the HTTP client URL-encode them
        instead of interpolating caller-supplied strings into the URL.
        """
        params: Dict[str, Any] = {"access_token": self._access_token}
        params.update(extra)
        return params

    @staticmethod
    def _wechat_media_type(media_type: Any) -> str:
        """Map a MessageType to the type string used by the WeChat API."""
        if media_type == MessageType.IMAGE:
            return "image"
        if media_type == MessageType.VOICE:
            return "voice"
        if media_type == MessageType.VIDEO:
            return "video"
        return "file"

    @staticmethod
    def _read_file_bytes(path: str) -> bytes:
        """Read a whole file; run in an executor by _upload_media."""
        with open(path, 'rb') as f:
            return f.read()

    async def _close_session(self) -> None:
        """Close and forget the HTTP session, if any."""
        session, self._session = self._session, None
        if session is not None:
            await session.close()

    def _plain_signature_ok(self, signature: Optional[str], timestamp: str, nonce: str) -> bool:
        """Check the unencrypted-mode signature: sha1(sorted(token, timestamp, nonce))."""
        import hmac  # local import: not part of the module's import block

        parts = sorted([self.wechat_config.token, timestamp, nonce])
        computed = hashlib.sha1(''.join(parts).encode('utf-8')).hexdigest()
        # Constant-time comparison on bytes (non-ASCII input -> False).
        return hmac.compare_digest(computed.encode('utf-8'), (signature or "").encode('utf-8'))

    # Connection lifecycle

    async def connect(self) -> bool:
        """Initialize WeChat API connection.

        Opens the HTTP session, fetches the first access token and, when
        encryption is enabled, builds the crypto helper.

        Returns:
            True on success. On any failure the session opened here is closed
            again and False is returned.
        """
        if not self.wechat_config.app_id or not self.wechat_config.app_secret:
            logger.error("WeChat app ID and app secret required")
            return False

        try:
            # Drop a session left over from an earlier connect() so it is not
            # leaked when connect() is called twice.
            await self._close_session()

            # Create HTTP session
            self._session = aiohttp.ClientSession()

            # Get initial access token
            token_obtained = await self._refresh_access_token()
            if not token_obtained:
                logger.error("Failed to obtain WeChat access token")
                await self._close_session()
                return False

            # Setup encryption if enabled
            if self.wechat_config.enable_encryption:
                if not self.wechat_config.encoding_aes_key:
                    logger.error("Encoding AES key required for encryption")
                    await self._close_session()
                    return False

                self._crypto = WeChatMessageCrypto(
                    self.wechat_config.app_id,
                    self.wechat_config.encoding_aes_key,
                    self.wechat_config.token,
                )

            self.status = ChannelStatus.CONNECTED
            logger.info(f"WeChat adapter connected for app: {self.wechat_config.app_id}")
            return True

        except Exception as e:
            logger.error(f"Failed to connect to WeChat: {e}")
            try:
                await self._close_session()
            except Exception as close_error:
                logger.error(f"Failed to close WeChat session: {close_error}")
            self.status = ChannelStatus.ERROR
            return False

    async def disconnect(self) -> None:
        """Disconnect WeChat adapter and forget the token and crypto helper."""
        await self._close_session()

        self._access_token = None
        self._crypto = None
        self.status = ChannelStatus.DISCONNECTED

    async def _refresh_access_token(self) -> bool:
        """Refresh the access token from WeChat API.

        Returns:
            True when a token was stored; False when there is no session,
            the API reply lacks "access_token", or the request raised.
        """
        if not self._session:
            return False

        try:
            params = {
                "grant_type": "client_credential",
                "appid": self.wechat_config.app_id,
                "secret": self.wechat_config.app_secret,
            }

            async with self._session.get(WECHAT_API_TOKEN, params=params) as response:
                data = await self._read_json(response)

                if "access_token" in data:
                    self._access_token = data["access_token"]
                    expires_in = data.get("expires_in", 7200)
                    self._token_expires_at = int(time.time()) + expires_in - 300  # 5 min buffer
                    return True
                else:
                    logger.error(f"Failed to get access token: {data}")
                    return False

        except Exception as e:
            logger.error(f"Error refreshing access token: {e}")
            return False

    async def _ensure_token(self) -> bool:
        """Ensure we have a valid access token, refreshing it when stale."""
        if not self._access_token or time.time() >= self._token_expires_at:
            return await self._refresh_access_token()
        return True

    # Webhook handling

    def verify_webhook(
        self,
        signature: str,
        timestamp: str,
        nonce: str,
        echostr: Optional[str] = None,
    ) -> str:
        """
        Verify webhook request from WeChat.
        Should be called from your webhook endpoint for GET requests.

        Args:
            signature: Signature supplied by WeChat.
            timestamp: Timestamp supplied by WeChat.
            nonce: Nonce supplied by WeChat.
            echostr: The echostr query parameter. WeChat expects it to be
                echoed back as the response body, so callers should pass it.

        Returns:
            echostr if valid (or nonce when echostr was not passed, which is
            what this method returned before the parameter existed); empty
            string if invalid.
        """
        if self._crypto:
            valid = self._crypto.verify_signature(signature, timestamp, nonce)
        else:
            # Basic verification without encryption
            valid = self._plain_signature_ok(signature, timestamp, nonce)

        if not valid:
            return ""
        return nonce if echostr is None else echostr

    async def handle_webhook(
        self,
        body: str,
        signature: str,
        timestamp: str,
        nonce: str,
        msg_signature: Optional[str] = None,
    ) -> Optional[str]:
        """
        Handle incoming webhook POST request from WeChat.

        Encrypted requests (encryption enabled and msg_signature present) are
        verified with msg_signature and decrypted. Plaintext requests are
        verified with signature whenever a verification token is configured.

        Returns:
            "success" once the message was processed (unknown message types
            are acknowledged and ignored); None when verification fails or
            any error occurs.
        """
        try:
            # NOTE: xml.etree.ElementTree is not hardened against maliciously
            # constructed XML (see "XML vulnerabilities" in the Python docs);
            # the body should be size-limited before it reaches this method.
            xml_content = body
            if self.wechat_config.enable_encryption and self._crypto and msg_signature:
                # Parse encrypted XML
                envelope = ElementTree.fromstring(body)
                encrypted = envelope.findtext('Encrypt')
                if not encrypted:
                    logger.error("Encrypted webhook body has no Encrypt element")
                    return None

                # Verify signature
                if not self._crypto.verify_signature(msg_signature, timestamp, nonce, encrypted):
                    logger.error("Invalid message signature")
                    return None

                # Decrypt
                xml_content = self._crypto.decrypt(encrypted)
            elif self.wechat_config.token and not self._plain_signature_ok(signature, timestamp, nonce):
                # Without this check anyone who knows the URL could inject
                # messages; it is skipped only when no token is configured.
                logger.error("Invalid message signature")
                return None

            # Parse XML message
            root = ElementTree.fromstring(xml_content)
            msg_type = root.findtext('MsgType')

            # Handle different message types
            handlers = {
                'text': self._handle_text_message,
                'image': self._handle_image_message,
                'voice': self._handle_voice_message,
                'video': self._handle_video_message,
                # _convert_message already understands shortvideo.
                'shortvideo': self._handle_video_message,
                'location': self._handle_location_message,
                'event': self._handle_event,
            }
            handler = handlers.get(msg_type)
            if handler is not None:
                await handler(root)

            # Acknowledge receipt so the caller can reply with this body.
            return "success"

        except Exception as e:
            logger.error(f"Error handling webhook: {e}")
            return None

    async def _handle_text_message(self, root: ElementTree.Element) -> None:
        """Handle text message."""
        message = self._convert_message(root)
        await self._dispatch_message(message)

    async def _handle_image_message(self, root: ElementTree.Element) -> None:
        """Handle image message."""
        message = self._convert_message(root, MessageType.IMAGE)
        await self._dispatch_message(message)

    async def _handle_voice_message(self, root: ElementTree.Element) -> None:
        """Handle voice message."""
        message = self._convert_message(root, MessageType.VOICE)
        await self._dispatch_message(message)

    async def _handle_video_message(self, root: ElementTree.Element) -> None:
        """Handle video (and short video) message."""
        message = self._convert_message(root, MessageType.VIDEO)
        await self._dispatch_message(message)

    async def _handle_location_message(self, root: ElementTree.Element) -> None:
        """Handle location message."""
        message = self._convert_message(root, MessageType.LOCATION)
        await self._dispatch_message(message)

    async def _handle_event(self, root: ElementTree.Element) -> None:
        """Handle event message.

        Calls the handler registered for the lower-cased event name, if any,
        then logs the well-known event types. Handlers may be plain functions
        or coroutine functions.
        """
        event_type = (root.findtext('Event') or "").lower()
        openid = root.findtext('FromUserName')

        # Check for registered event handler
        handler = self._event_handlers.get(event_type)
        if handler is not None:
            outcome = handler(root)
            # Only await when the handler actually returned an awaitable.
            if asyncio.iscoroutine(outcome) or asyncio.isfuture(outcome):
                await outcome

        # Log events
        if event_type == 'subscribe':
            logger.info(f"User subscribed: {openid}")
        elif event_type == 'unsubscribe':
            logger.info(f"User unsubscribed: {openid}")
        elif event_type == 'scan':
            event_key = root.findtext('EventKey')
            logger.info(f"User scanned QR: {openid}, key: {event_key}")
        elif event_type == 'click':
            event_key = root.findtext('EventKey')
            logger.info(f"Menu click: {openid}, key: {event_key}")

    def _convert_message(
        self,
        root: ElementTree.Element,
        media_type: Optional[MessageType] = None,
    ) -> Message:
        """Convert WeChat XML message to unified Message format.

        Args:
            root: Parsed <xml> element of the incoming message.
            media_type: Accepted for the _handle_* callers but currently
                unused; the content type is read from the MsgType element.

        Returns:
            A Message whose sender_id and chat_id are both the sender's
            openid. text is always a string (empty when there is none).
        """
        openid = root.findtext('FromUserName')
        # Fall back to a millisecond timestamp when MsgId is absent or empty.
        msg_id = root.findtext('MsgId') or str(int(time.time() * 1000))
        raw_create_time = root.findtext('CreateTime')
        create_time = int(raw_create_time) if raw_create_time else int(time.time())

        text = ""
        media = []

        msg_type = root.findtext('MsgType')

        if msg_type == 'text':
            text = root.findtext('Content') or ""
        elif msg_type == 'image':
            media.append(MediaAttachment(
                type=MessageType.IMAGE,
                url=root.findtext('PicUrl'),
                file_id=root.findtext('MediaId'),
            ))
        elif msg_type == 'voice':
            # Recognition, when present, carries the text used as message text.
            text = root.findtext('Recognition') or ""
            media.append(MediaAttachment(
                type=MessageType.VOICE,
                file_id=root.findtext('MediaId'),
            ))
        elif msg_type == 'video' or msg_type == 'shortvideo':
            media.append(MediaAttachment(
                type=MessageType.VIDEO,
                file_id=root.findtext('MediaId'),
            ))
        elif msg_type == 'location':
            lat = root.findtext('Location_X')
            lon = root.findtext('Location_Y')
            label = root.findtext('Label') or ""
            text = f"[location:{lat},{lon}] {label}"

        return Message(
            id=msg_id,
            channel=self.name,
            sender_id=openid,
            chat_id=openid,  # WeChat uses openid for both
            text=text,
            media=media,
            timestamp=datetime.fromtimestamp(create_time),
            is_group=False,  # Official account messages are 1:1
            raw={
                'msg_type': msg_type,
                'openid': openid,
            },
        )

    # Sending

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        media: Optional[List[MediaAttachment]] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Send a customer service message to a user.

        When media is given only its first attachment is sent and text is
        passed along as a caption, which _send_media_message does not put in
        the payload. reply_to and buttons are accepted but not used.

        Raises:
            ChannelRateLimitError: When the API answers with errcode 45047.
        """
        if not await self._ensure_token():
            return SendResult(success=False, error="Failed to get access token")

        try:
            # Build message based on content type
            if media:
                return await self._send_media_message(chat_id, media[0], text)

            # Send text message
            payload = {
                "touser": chat_id,
                "msgtype": "text",
                "text": {
                    "content": text
                }
            }

            async with self._session.post(WECHAT_API_SEND, params=self._auth_params(), json=payload) as response:
                data = await self._read_json(response)

                if data.get("errcode", 0) == 0:
                    return SendResult(success=True)
                elif data.get("errcode") == 45015:
                    # User not interacting in 48 hours
                    return SendResult(success=False, error="User inactive for 48 hours")
                elif data.get("errcode") == 45047:
                    # Rate limited
                    raise ChannelRateLimitError(60)
                else:
                    return SendResult(success=False, error=data.get("errmsg", "Unknown error"))

        except ChannelRateLimitError:
            raise
        except Exception as e:
            logger.error(f"Failed to send WeChat message: {e}")
            return SendResult(success=False, error=str(e))

    async def _send_media_message(
        self,
        chat_id: str,
        media: MediaAttachment,
        caption: Optional[str] = None,
    ) -> SendResult:
        """Send a media message.

        Uses media.file_id as the media ID, uploading from file_path or url
        when it is missing. caption is not included in the payload.
        """
        if not self._session:
            return SendResult(success=False, error="Not connected")

        try:
            msgtype = self._wechat_media_type(media.type)

            # Need media_id - upload if we have URL/file
            media_id = media.file_id

            if not media_id and (media.url or media.file_path):
                media_id = await self._upload_media(media)

            if not media_id:
                return SendResult(success=False, error="No media ID")

            payload = {
                "touser": chat_id,
                "msgtype": msgtype,
                msgtype: {
                    "media_id": media_id
                }
            }

            async with self._session.post(WECHAT_API_SEND, params=self._auth_params(), json=payload) as response:
                data = await self._read_json(response)

                if data.get("errcode", 0) == 0:
                    return SendResult(success=True)
                else:
                    return SendResult(success=False, error=data.get("errmsg", "Unknown error"))

        except Exception as e:
            logger.error(f"Failed to send media message: {e}")
            return SendResult(success=False, error=str(e))

    async def _upload_media(self, media: MediaAttachment) -> Optional[str]:
        """Upload media to WeChat servers.

        Reads the bytes from media.file_path (preferred) or downloads them
        from media.url, then posts them as multipart form data.

        Returns:
            The media_id from the API reply, or None on any failure.
        """
        if not self._session or not await self._ensure_token():
            return None

        try:
            # Get file data
            if media.file_path:
                # Read off the event loop so a large file does not stall it.
                loop = asyncio.get_running_loop()
                file_data = await loop.run_in_executor(None, self._read_file_bytes, media.file_path)
                filename = os.path.basename(media.file_path)
            elif media.url:
                async with self._session.get(media.url) as resp:
                    # Do not upload an HTTP error page as if it were media.
                    resp.raise_for_status()
                    file_data = await resp.read()
                filename = media.file_name or "file"
            else:
                return None

            # Upload
            form = aiohttp.FormData()
            form.add_field(
                'media',
                file_data,
                filename=filename,
                content_type=media.mime_type or 'application/octet-stream'
            )

            params = self._auth_params(type=self._wechat_media_type(media.type))
            async with self._session.post(WECHAT_API_MEDIA_UPLOAD, params=params, data=form) as response:
                result = await self._read_json(response)

            media_id = result.get("media_id")
            if not media_id:
                logger.error(f"Failed to upload media: {result}")
            return media_id

        except Exception as e:
            logger.error(f"Failed to upload media: {e}")
            return None

    async def edit_message(
        self,
        chat_id: str,
        message_id: str,
        text: str,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """
        Edit a message.
        Note: WeChat doesn't support message editing, sends new message.
        """
        logger.warning("WeChat doesn't support message editing, sending new message")
        return await self.send_message(chat_id, text, buttons=buttons)

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """
        Delete a message.
        Note: WeChat doesn't support message deletion; always returns False.
        """
        logger.warning("WeChat doesn't support message deletion")
        return False

    async def send_typing(self, chat_id: str) -> None:
        """
        Send typing indicator.
        Note: WeChat doesn't have a typing indicator, so this is a no-op.
        """
        pass

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Get user information by OpenID as a plain dict, or None."""
        user = await self.get_user_info(chat_id)
        if not user:
            return None
        return {
            'openid': user.openid,
            'unionid': user.unionid,
            'nickname': user.nickname,
            'avatar': user.headimgurl,
            'sex': user.sex,
            'city': user.city,
            'province': user.province,
            'country': user.country,
            'subscribed': user.subscribe,
        }

    # WeChat-specific methods

    def register_event_handler(
        self,
        event_type: str,
        handler: Callable[[ElementTree.Element], Any],
    ) -> None:
        """Register a handler for WeChat events.

        event_type is matched case-insensitively; a later registration for
        the same event replaces the earlier one.
        """
        self._event_handlers[event_type.lower()] = handler

    async def get_user_info(self, openid: str) -> Optional[WeChatUser]:
        """Get user profile information.

        Results are cached per openid for the adapter's lifetime, and the
        cache is consulted before any network call is made.
        """
        # Check cache first so a hit never triggers a token refresh.
        cached = self._user_cache.get(openid)
        if cached is not None:
            return cached

        if not await self._ensure_token():
            return None

        try:
            params = self._auth_params(openid=openid, lang="zh_CN")

            async with self._session.get(WECHAT_API_USER_INFO, params=params) as response:
                data = await self._read_json(response)

                if data.get("errcode"):
                    logger.error(f"Failed to get user info: {data}")
                    return None

                user = WeChatUser(
                    openid=data.get("openid"),
                    unionid=data.get("unionid"),
                    nickname=data.get("nickname"),
                    sex=data.get("sex", 0),
                    city=data.get("city"),
                    province=data.get("province"),
                    country=data.get("country"),
                    headimgurl=data.get("headimgurl"),
                    subscribe=data.get("subscribe") == 1,
                    subscribe_time=data.get("subscribe_time"),
                    language=data.get("language", "zh_CN"),
                )

                self._user_cache[openid] = user
                return user

        except Exception as e:
            logger.error(f"Error getting user info: {e}")
            return None

    async def send_template_message(
        self,
        template: TemplateMessage,
    ) -> SendResult:
        """Send a template message.

        Returns:
            A SendResult; on success message_id is the API's msgid as a
            string, or None when the reply carries no msgid.
        """
        if not await self._ensure_token():
            return SendResult(success=False, error="Failed to get access token")

        try:
            async with self._session.post(
                WECHAT_API_TEMPLATE,
                params=self._auth_params(),
                json=template.to_dict(),
            ) as response:
                data = await self._read_json(response)

                if data.get("errcode", 0) == 0:
                    msgid = data.get("msgid")
                    return SendResult(
                        success=True,
                        # Avoid reporting the literal string "None" as an ID.
                        message_id=str(msgid) if msgid is not None else None,
                    )
                else:
                    return SendResult(
                        success=False,
                        error=data.get("errmsg", "Unknown error"),
                    )

        except Exception as e:
            logger.error(f"Failed to send template message: {e}")
            return SendResult(success=False, error=str(e))

    async def create_menu(self, menu: Dict[str, Any]) -> bool:
        """Create custom menu for Official Account; True when errcode is 0."""
        if not await self._ensure_token():
            return False

        try:
            async with self._session.post(WECHAT_API_MENU, params=self._auth_params(), json=menu) as response:
                data = await self._read_json(response)

            if data.get("errcode", 0) == 0:
                return True
            logger.error(f"Failed to create menu: {data}")
            return False

        except Exception as e:
            logger.error(f"Failed to create menu: {e}")
            return False

    async def create_qr_code(
        self,
        scene: str,
        permanent: bool = False,
        expire_seconds: int = 2592000,
    ) -> Optional[str]:
        """
        Create a QR code for a scene.

        Args:
            scene: Scene string embedded in the code.
            permanent: Request a permanent code instead of a temporary one.
            expire_seconds: Lifetime of a temporary code; ignored when
                permanent is True.

        Returns the URL to get the QR code image, or None on failure.
        """
        from urllib.parse import quote  # local import: only needed here

        if not await self._ensure_token():
            return None

        try:
            payload: Dict[str, Any] = {
                "action_name": "QR_LIMIT_STR_SCENE" if permanent else "QR_STR_SCENE",
                "action_info": {
                    "scene": {"scene_str": scene}
                }
            }
            if not permanent:
                payload["expire_seconds"] = expire_seconds

            async with self._session.post(WECHAT_API_QR, params=self._auth_params(), json=payload) as response:
                data = await self._read_json(response)

            if "ticket" in data:
                # Percent-encode the ticket so characters such as "+", "/"
                # and "=" survive as a query-string value.
                ticket = quote(data["ticket"], safe="")
                return f"https://mp.weixin.qq.com/cgi-bin/showqrcode?ticket={ticket}"

            logger.error(f"Failed to create QR code: {data}")
            return None

        except Exception as e:
            logger.error(f"Failed to create QR code: {e}")
            return None

    async def get_media_content(self, media_id: str) -> Optional[bytes]:
        """Download media content by media ID; None on error."""
        if not await self._ensure_token():
            return None

        try:
            params = self._auth_params(media_id=media_id)

            async with self._session.get(WECHAT_API_MEDIA_GET, params=params) as response:
                # A JSON (or, presumably, text/plain-labelled JSON) body means
                # the API answered with an error instead of the media bytes.
                if response.content_type.startswith(('application/json', 'text/plain')):
                    # Error response
                    data = await self._read_json(response)
                    logger.error(f"Failed to get media: {data}")
                    return None

                return await response.read()

        except Exception as e:
            logger.error(f"Error getting media content: {e}")
            return None

    # Mini-Program methods

    async def get_mini_program_qr_code(
        self,
        path: str,
        width: int = 430,
    ) -> Optional[bytes]:
        """Generate Mini-Program QR code.

        Returns the image bytes, or None when Mini-Program support is
        disabled in the config or the request fails.
        """
        if not self.wechat_config.enable_mini_program:
            return None

        # NOTE(review): this uses the Official Account access token; the
        # config's mini_program_app_id / mini_program_secret are never used to
        # obtain a Mini-Program token — confirm this is intended.
        if not await self._ensure_token():
            return None

        try:
            payload = {
                "path": path,
                "width": width,
            }

            async with self._session.post(WECHAT_MP_CODE, params=self._auth_params(), json=payload) as response:
                if response.content_type.startswith('image'):
                    return await response.read()

                data = await self._read_json(response)
                logger.error(f"Failed to get mini program QR: {data}")
                return None

        except Exception as e:
            logger.error(f"Error getting mini program QR: {e}")
            return None
def create_wechat_adapter(
    app_id: Optional[str] = None,
    app_secret: Optional[str] = None,
    token: Optional[str] = None,
    **kwargs: Any
) -> WeChatAdapter:
    """
    Factory function to create WeChat adapter.

    Explicit arguments win; any that are missing or empty fall back to the
    corresponding environment variable.

    Args:
        app_id: WeChat app ID (or set WECHAT_APP_ID env var)
        app_secret: WeChat app secret (or set WECHAT_APP_SECRET env var)
        token: Verification token (or set WECHAT_TOKEN env var); defaults to
            an empty string when neither is provided
        **kwargs: Additional config options forwarded to WeChatConfig

    Returns:
        Configured WeChatAdapter

    Raises:
        ValueError: If no app ID or no app secret can be resolved.
    """
    app_id = app_id or os.getenv("WECHAT_APP_ID")
    app_secret = app_secret or os.getenv("WECHAT_APP_SECRET")
    token = token or os.getenv("WECHAT_TOKEN")

    # Validate before touching the config so misconfiguration fails early.
    if not app_id:
        raise ValueError("WeChat app ID required")
    if not app_secret:
        raise ValueError("WeChat app secret required")

    config = WeChatConfig(
        app_id=app_id,
        app_secret=app_secret,
        token=token or "",
        **kwargs
    )
    return WeChatAdapter(config)