Coverage for integrations / channels / extensions / wechat_adapter.py: 26.8%

429 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2WeChat Channel Adapter 

3 

4Implements WeChat messaging via Official Account API and Mini-Programs. 

5Based on HevolveBot extension patterns for WeChat. 

6 

7Features: 

8- Official Account API (Service Account / Subscription Account) 

9- Mini-Programs support 

10- Template messages 

11- Custom menus 

12- QR code generation 

13- Media upload/download 

14- Customer service messages 

15- Message encryption/decryption 

16- Event handling (subscribe, unsubscribe, scan, location, click) 

17""" 

18 

19from __future__ import annotations 

20 

21import asyncio 

22import logging 

23import os 

24import json 

25import time 

26import hashlib 

27import base64 

28import struct 

29import socket 

30from typing import Optional, List, Dict, Any, Callable 

31from datetime import datetime 

32from dataclasses import dataclass, field 

33from xml.etree import ElementTree 

34try: 

35 import aiohttp 

36 HAS_AIOHTTP = True 

37except ImportError: 

38 HAS_AIOHTTP = False 

39 

40try: 

41 from Crypto.Cipher import AES 

42 HAS_CRYPTO = True 

43except ImportError: 

44 HAS_CRYPTO = False 

45 

46from ..base import ( 

47 ChannelAdapter, 

48 ChannelConfig, 

49 ChannelStatus, 

50 Message, 

51 MessageType, 

52 MediaAttachment, 

53 SendResult, 

54 ChannelConnectionError, 

55 ChannelSendError, 

56 ChannelRateLimitError, 

57) 

58 

59logger = logging.getLogger(__name__) 

60 

61 

# Official Account REST endpoints. Every path hangs off the shared cgi-bin
# root, so only the root needs editing if the API host ever changes.
WECHAT_API_BASE = "https://api.weixin.qq.com/cgi-bin"
WECHAT_API_SEND = WECHAT_API_BASE + "/message/custom/send"
WECHAT_API_TEMPLATE = WECHAT_API_BASE + "/message/template/send"
WECHAT_API_MEDIA_UPLOAD = WECHAT_API_BASE + "/media/upload"
WECHAT_API_MEDIA_GET = WECHAT_API_BASE + "/media/get"
WECHAT_API_TOKEN = WECHAT_API_BASE + "/token"
WECHAT_API_MENU = WECHAT_API_BASE + "/menu/create"
WECHAT_API_QR = WECHAT_API_BASE + "/qrcode/create"
WECHAT_API_USER_INFO = WECHAT_API_BASE + "/user/info"

# Mini-Program REST endpoints (separate "wxa" root).
WECHAT_MP_API_BASE = "https://api.weixin.qq.com/wxa"
WECHAT_MP_CODE = WECHAT_MP_API_BASE + "/getwxacode"
# NOTE(review): the constant is named *_MSG_SEND but resolves to the
# msg_sec_check path -- confirm which endpoint is actually intended.
WECHAT_MP_MSG_SEND = WECHAT_MP_API_BASE + "/msg_sec_check"

77 

78 

@dataclass
class WeChatConfig(ChannelConfig):
    """WeChat-specific configuration.

    Extends the shared ``ChannelConfig`` with the credentials and feature
    switches read by ``WeChatAdapter``.

    Credential fields (``app_secret``, ``encoding_aes_key``, ``token`` and
    ``mini_program_secret``) are declared with ``repr=False`` so that
    printing or logging a config object does not expose them. They remain
    ordinary attributes and constructor arguments.
    """
    # App credentials used to request API access tokens.
    app_id: str = ""
    app_secret: str = field(default="", repr=False)
    # Key handed to WeChatMessageCrypto when enable_encryption is set.
    encoding_aes_key: Optional[str] = field(default=None, repr=False)  # For message encryption
    # Shared secret mixed into webhook signatures.
    token: str = field(default="", repr=False)  # Verification token
    account_type: str = "service"  # service, subscription, mini_program
    enable_encryption: bool = False
    enable_mini_program: bool = False
    mini_program_app_id: Optional[str] = None
    mini_program_secret: Optional[str] = field(default=None, repr=False)
    template_ids: Dict[str, str] = field(default_factory=dict)

92 

93 

@dataclass
class WeChatUser:
    """Profile record for a single WeChat user.

    Only ``openid`` is mandatory; every other attribute has a default so a
    record can be built from a partial API response.
    """
    # Identifier the adapter uses as both sender id and chat id.
    openid: str
    unionid: Optional[str] = None
    nickname: Optional[str] = None
    # Gender code: 0 = unknown, 1 = male, 2 = female.
    sex: int = 0
    # Free-form location strings.
    city: Optional[str] = None
    province: Optional[str] = None
    country: Optional[str] = None
    # Avatar image URL.
    headimgurl: Optional[str] = None
    # Subscription flag and the raw subscribe timestamp from the API.
    subscribe: bool = True
    subscribe_time: Optional[int] = None
    language: str = "zh_CN"

108 

109 

@dataclass
class TemplateMessage:
    """Fluent builder for a template-message request payload.

    ``template_id`` and ``touser`` are required. ``url`` and ``miniprogram``
    are optional jump targets that are only serialised when truthy. ``data``
    maps each template placeholder to a ``{"value", "color"}`` entry.
    """
    template_id: str
    touser: str
    url: Optional[str] = None
    miniprogram: Optional[Dict[str, str]] = None
    data: Dict[str, Dict[str, str]] = field(default_factory=dict)

    def add_field(self, key: str, value: str, color: str = "#173177") -> 'TemplateMessage':
        """Set (or overwrite) one placeholder and return ``self`` for chaining."""
        entry = {"value": value, "color": color}
        self.data[key] = entry
        return self

    def to_dict(self) -> Dict[str, Any]:
        """Serialise to the dict that is posted to the template-send endpoint.

        The returned dict references ``self.data`` directly (no copy).
        """
        payload: Dict[str, Any] = dict(
            touser=self.touser,
            template_id=self.template_id,
            data=self.data,
        )
        # Optional keys are emitted only when set to a truthy value.
        extras = (("url", self.url), ("miniprogram", self.miniprogram))
        payload.update({name: value for name, value in extras if value})
        return payload

136 

137 

class WeChatMessageCrypto:
    """
    WeChat message encryption/decryption handler.

    Implements AES-256-CBC encryption as specified by WeChat.

    Plaintext layout: ``random(16) + msg_len(4, big-endian) + msg + app_id``,
    PKCS#7-padded to a 32-byte boundary. The IV is the first 16 bytes of the
    AES key.
    """

    # Padding boundary used by _pad/_unpad (wider than the 16-byte AES block).
    _PAD_BLOCK_SIZE = 32
    # Bytes that precede the message body: random(16) + length(4).
    _HEADER_LEN = 20

    def __init__(self, app_id: str, encoding_aes_key: str, token: str):
        """Store credentials and decode the AES key.

        Args:
            app_id: App id appended to every plaintext and checked on decrypt.
            encoding_aes_key: Base64 key without its trailing ``=``.
            token: Shared secret used for signature verification.

        Raises:
            ImportError: If pycryptodome is not installed.
        """
        if not HAS_CRYPTO:
            raise ImportError("pycryptodome required for encryption. Install with: pip install pycryptodome")

        self.app_id = app_id
        self.token = token
        # Decode the encoding key (43 chars Base64 -> 32 bytes)
        self.aes_key = base64.b64decode(encoding_aes_key + "=")

    def _pad(self, data: bytes) -> bytes:
        """Apply PKCS#7 padding up to the next 32-byte boundary.

        A full block of padding is added when ``data`` is already aligned,
        so the padding can always be removed unambiguously.
        """
        padding_len = self._PAD_BLOCK_SIZE - (len(data) % self._PAD_BLOCK_SIZE)
        return data + bytes([padding_len] * padding_len)

    def _unpad(self, data: bytes) -> bytes:
        """Remove PKCS#7 padding.

        Raises:
            ValueError: If ``data`` is empty or its final byte is not a
                valid pad length (1..32, and no longer than ``data``).
        """
        if not data:
            raise ValueError("Cannot unpad empty data")
        padding_len = data[-1]
        # A pad byte of 0 would make data[:-0] silently return b"", and a
        # value above the block size can never come from _pad.
        if not 1 <= padding_len <= self._PAD_BLOCK_SIZE or padding_len > len(data):
            raise ValueError("Invalid PKCS#7 padding")
        return data[:-padding_len]

    def encrypt(self, message: str) -> str:
        """Encrypt ``message`` and return the Base64-encoded ciphertext."""
        # Generate random 16-byte string
        random_str = os.urandom(16)

        # Build plaintext: random(16) + msg_len(4) + msg + app_id
        msg_bytes = message.encode('utf-8')
        msg_len = struct.pack('>I', len(msg_bytes))
        app_id_bytes = self.app_id.encode('utf-8')
        plaintext = random_str + msg_len + msg_bytes + app_id_bytes

        # Encrypt
        iv = self.aes_key[:16]
        cipher = AES.new(self.aes_key, AES.MODE_CBC, iv)
        ciphertext = cipher.encrypt(self._pad(plaintext))

        return base64.b64encode(ciphertext).decode('utf-8')

    def decrypt(self, encrypted: str) -> str:
        """Decrypt a Base64-encoded ciphertext and return the message text.

        Raises:
            ValueError: If the padding is invalid, the payload is shorter
                than its declared length, or the trailing app id does not
                match ``self.app_id``.
        """
        ciphertext = base64.b64decode(encrypted)

        # Decrypt
        iv = self.aes_key[:16]
        cipher = AES.new(self.aes_key, AES.MODE_CBC, iv)
        plaintext = self._unpad(cipher.decrypt(ciphertext))

        # Skip random(16), read msg_len(4), extract msg
        if len(plaintext) < self._HEADER_LEN:
            raise ValueError("Decrypted payload too short")
        msg_len = struct.unpack('>I', plaintext[16:self._HEADER_LEN])[0]
        msg_end = self._HEADER_LEN + msg_len
        if msg_end > len(plaintext):
            raise ValueError("Declared message length exceeds payload")

        # Everything after the message is the sender's app id; reject
        # payloads that were produced for a different app.
        if plaintext[msg_end:] != self.app_id.encode('utf-8'):
            raise ValueError("AppId mismatch in decrypted message")

        return plaintext[self._HEADER_LEN:msg_end].decode('utf-8')

    def verify_signature(self, signature: str, timestamp: str, nonce: str, encrypt: str = "") -> bool:
        """Verify message signature.

        The expected value is the SHA-1 hex digest of the sorted, joined
        ``token``, ``timestamp``, ``nonce`` and ``encrypt`` strings.

        Returns:
            True when ``signature`` matches; False otherwise, including when
            ``signature`` is not a string.
        """
        import hmac  # local import keeps this class self-contained

        parts = sorted([self.token, timestamp, nonce, encrypt])
        sign_str = ''.join(parts)
        computed = hashlib.sha1(sign_str.encode('utf-8')).hexdigest()
        if not isinstance(signature, str):
            return False
        # Constant-time comparison; bytes are used so that a non-ASCII
        # signature cannot make compare_digest raise.
        return hmac.compare_digest(
            computed.encode('utf-8'),
            signature.encode('utf-8', 'replace'),
        )

205 

206 

class WeChatAdapter(ChannelAdapter):
    """
    WeChat messaging adapter for Official Accounts and Mini-Programs.

    Outbound calls use an aiohttp session together with a cached access
    token. Inbound traffic is supplied by the host application's webhook
    endpoint through ``verify_webhook`` (GET) and ``handle_webhook`` (POST).

    Usage:
        config = WeChatConfig(
            app_id="your-app-id",
            app_secret="your-app-secret",
            token="your-verification-token",
        )
        adapter = WeChatAdapter(config)
        adapter.on_message(my_handler)
        # Use with webhook endpoint for receiving messages
    """

    def __init__(self, config: WeChatConfig):
        super().__init__(config)
        self.wechat_config: WeChatConfig = config
        self._access_token: Optional[str] = None
        # Epoch seconds after which the cached token is treated as stale.
        self._token_expires_at: int = 0
        self._crypto: Optional[WeChatMessageCrypto] = None
        self._session: Optional[aiohttp.ClientSession] = None
        # Lower-cased event name -> handler registered by the application.
        self._event_handlers: Dict[str, Callable] = {}
        # openid -> profile; entries are never evicted.
        self._user_cache: Dict[str, WeChatUser] = {}

    @property
    def name(self) -> str:
        return "wechat"

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _signatures_match(expected: str, provided: Any) -> bool:
        """Compare two signature strings in constant time."""
        import hmac  # local import: only this helper needs it

        if not isinstance(provided, str):
            return False
        return hmac.compare_digest(
            expected.encode('utf-8'),
            provided.encode('utf-8', 'replace'),
        )

    @staticmethod
    def _media_kind(media: MediaAttachment) -> str:
        """Map an attachment's MessageType to the type name sent to the API."""
        if media.type == MessageType.IMAGE:
            return "image"
        if media.type == MessageType.VOICE:
            return "voice"
        if media.type == MessageType.VIDEO:
            return "video"
        return "file"

    @staticmethod
    def _result_from_response(data: Dict[str, Any]) -> SendResult:
        """Translate a customer-service send response into a SendResult.

        Raises:
            ChannelRateLimitError: When the API reports errcode 45047.
        """
        errcode = data.get("errcode", 0)
        if errcode == 0:
            return SendResult(success=True)
        if errcode == 45015:
            # User not interacting in 48 hours
            return SendResult(success=False, error="User inactive for 48 hours")
        if errcode == 45047:
            # Rate limited
            raise ChannelRateLimitError(60)
        return SendResult(success=False, error=data.get("errmsg", "Unknown error"))

    async def _close_session(self) -> None:
        """Close and forget the HTTP session, never raising."""
        session, self._session = self._session, None
        if session is None:
            return
        try:
            await session.close()
        except Exception as e:
            logger.warning(f"Error closing WeChat HTTP session: {e}")

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------

    async def connect(self) -> bool:
        """Initialize WeChat API connection.

        Creates the HTTP session, fetches an access token and, when
        encryption is enabled, builds the message crypto helper.

        Returns:
            True on success. On any failure the session opened by this call
            is closed again and False is returned.
        """
        if not self.wechat_config.app_id or not self.wechat_config.app_secret:
            logger.error("WeChat app ID and app secret required")
            return False

        try:
            # Create HTTP session
            self._session = aiohttp.ClientSession()

            # Get initial access token
            token_obtained = await self._refresh_access_token()
            if not token_obtained:
                logger.error("Failed to obtain WeChat access token")
                await self._close_session()
                return False

            # Setup encryption if enabled
            if self.wechat_config.enable_encryption:
                if not self.wechat_config.encoding_aes_key:
                    logger.error("Encoding AES key required for encryption")
                    await self._close_session()
                    return False

                self._crypto = WeChatMessageCrypto(
                    self.wechat_config.app_id,
                    self.wechat_config.encoding_aes_key,
                    self.wechat_config.token,
                )

            self.status = ChannelStatus.CONNECTED
            logger.info(f"WeChat adapter connected for app: {self.wechat_config.app_id}")
            return True

        except Exception as e:
            logger.error(f"Failed to connect to WeChat: {e}")
            # Do not leave a half-initialised session behind.
            await self._close_session()
            self.status = ChannelStatus.ERROR
            return False

    async def disconnect(self) -> None:
        """Disconnect WeChat adapter and drop cached credentials."""
        if self._session:
            await self._session.close()
            self._session = None

        self._access_token = None
        self._crypto = None
        self.status = ChannelStatus.DISCONNECTED

    async def _refresh_access_token(self) -> bool:
        """Refresh the access token from WeChat API.

        Returns:
            True when a token was stored; False when there is no session,
            the response carries no token, or the request raised.
        """
        if not self._session:
            return False

        try:
            params = {
                "grant_type": "client_credential",
                "appid": self.wechat_config.app_id,
                "secret": self.wechat_config.app_secret,
            }

            async with self._session.get(WECHAT_API_TOKEN, params=params) as response:
                data = await response.json()

                if "access_token" in data:
                    self._access_token = data["access_token"]
                    expires_in = data.get("expires_in", 7200)
                    self._token_expires_at = int(time.time()) + expires_in - 300  # 5 min buffer
                    return True
                else:
                    logger.error(f"Failed to get access token: {data}")
                    return False

        except Exception as e:
            logger.error(f"Error refreshing access token: {e}")
            return False

    async def _ensure_token(self) -> bool:
        """Ensure we have a valid access token, refreshing it if stale."""
        if not self._access_token or time.time() >= self._token_expires_at:
            return await self._refresh_access_token()
        return True

    # ------------------------------------------------------------------
    # Webhook handling
    # ------------------------------------------------------------------

    def verify_webhook(
        self,
        signature: str,
        timestamp: str,
        nonce: str,
        echostr: Optional[str] = None,
    ) -> str:
        """
        Verify webhook request from WeChat.
        Should be called from your webhook endpoint for GET requests.

        Args:
            signature: Signature supplied with the request.
            timestamp: Timestamp supplied with the request.
            nonce: Nonce supplied with the request.
            echostr: Challenge string from the request. When given, it is
                the value returned on success; when omitted, ``nonce`` is
                returned (the behaviour before this parameter existed).

        Returns:
            ``echostr`` (or ``nonce`` if no ``echostr`` was passed) when the
            signature is valid, empty string if invalid.
        """
        if self._crypto:
            valid = self._crypto.verify_signature(signature, timestamp, nonce)
        else:
            # Basic verification without encryption
            parts = sorted([self.wechat_config.token, timestamp, nonce])
            computed = hashlib.sha1(''.join(parts).encode('utf-8')).hexdigest()
            valid = self._signatures_match(computed, signature)

        if not valid:
            return ""
        return nonce if echostr is None else echostr

    async def handle_webhook(
        self,
        body: str,
        signature: str,
        timestamp: str,
        nonce: str,
        msg_signature: Optional[str] = None,
    ) -> Optional[str]:
        """
        Handle incoming webhook POST request from WeChat.

        Returns:
            ``"success"`` once the message has been processed (including
            message types that have no handler), or None when signature
            verification, decryption, parsing or a handler fails.

        NOTE(review): ``signature`` is not checked here. Only the encrypted
        path verifies ``msg_signature``; plaintext bodies are trusted as-is,
        so the caller should run ``verify_webhook`` first.
        NOTE(review): the body is parsed with ``xml.etree``, which the
        standard library documents as not hardened against maliciously
        constructed XML.
        """
        try:
            # Decrypt if needed
            xml_content = body
            if self.wechat_config.enable_encryption and self._crypto and msg_signature:
                # Parse encrypted XML
                envelope = ElementTree.fromstring(body)
                encrypted = envelope.findtext('Encrypt')
                if not encrypted:
                    logger.error("Encrypted webhook body has no Encrypt payload")
                    return None

                # Verify signature
                if not self._crypto.verify_signature(msg_signature, timestamp, nonce, encrypted):
                    logger.error("Invalid message signature")
                    return None

                # Decrypt
                xml_content = self._crypto.decrypt(encrypted)

            # Parse XML message
            root = ElementTree.fromstring(xml_content)
            msg_type = root.findtext('MsgType')
            if msg_type is None:
                logger.error("Webhook message has no MsgType")
                return None

            # Built per call so handlers replaced on the instance are honoured.
            dispatch = {
                'text': self._handle_text_message,
                'image': self._handle_image_message,
                'voice': self._handle_voice_message,
                'video': self._handle_video_message,
                # _convert_message treats shortvideo like video.
                'shortvideo': self._handle_video_message,
                'location': self._handle_location_message,
                'event': self._handle_event,
            }
            handler = dispatch.get(msg_type)
            if handler is not None:
                await handler(root)

            # Acknowledge receipt with the literal "success" body.
            return "success"

        except Exception as e:
            logger.error(f"Error handling webhook: {e}")
            return None

    async def _handle_text_message(self, root: ElementTree.Element) -> None:
        """Handle text message."""
        message = self._convert_message(root)
        await self._dispatch_message(message)

    async def _handle_image_message(self, root: ElementTree.Element) -> None:
        """Handle image message."""
        message = self._convert_message(root, MessageType.IMAGE)
        await self._dispatch_message(message)

    async def _handle_voice_message(self, root: ElementTree.Element) -> None:
        """Handle voice message."""
        message = self._convert_message(root, MessageType.VOICE)
        await self._dispatch_message(message)

    async def _handle_video_message(self, root: ElementTree.Element) -> None:
        """Handle video message."""
        message = self._convert_message(root, MessageType.VIDEO)
        await self._dispatch_message(message)

    async def _handle_location_message(self, root: ElementTree.Element) -> None:
        """Handle location message."""
        message = self._convert_message(root, MessageType.LOCATION)
        await self._dispatch_message(message)

    async def _handle_event(self, root: ElementTree.Element) -> None:
        """Handle event message.

        Runs the handler registered for the event type, if any, then logs
        subscribe / unsubscribe / scan / click events. Handlers may be plain
        functions or coroutine functions.
        """
        import inspect  # local import: only needed for handler dispatch

        event_type = root.find('Event').text.lower()
        openid = root.find('FromUserName').text

        # Check for registered event handler
        handler = self._event_handlers.get(event_type)
        if handler is not None:
            outcome = handler(root)
            # Only await when the handler actually returned an awaitable.
            if inspect.isawaitable(outcome):
                await outcome

        # Log events
        if event_type == 'subscribe':
            logger.info(f"User subscribed: {openid}")
        elif event_type == 'unsubscribe':
            logger.info(f"User unsubscribed: {openid}")
        elif event_type == 'scan':
            event_key = root.findtext('EventKey') or None
            logger.info(f"User scanned QR: {openid}, key: {event_key}")
        elif event_type == 'click':
            event_key = root.findtext('EventKey') or None
            logger.info(f"Menu click: {openid}, key: {event_key}")

    def _convert_message(
        self,
        root: ElementTree.Element,
        media_type: Optional[MessageType] = None,
    ) -> Message:
        """Convert WeChat XML message to unified Message format.

        Args:
            root: Parsed message element.
            media_type: Accepted for call-site symmetry; the branch taken is
                decided by the ``MsgType`` element, not by this argument.

        Raises:
            AttributeError: If a required element (``FromUserName``,
                ``CreateTime``, ``MsgType`` or the media/location fields of
                the matching branch) is absent.
        """
        openid = root.find('FromUserName').text
        msg_id_node = root.find('MsgId')
        if msg_id_node is not None:
            msg_id = msg_id_node.text
        else:
            # No id in the payload: fall back to the current time in ms.
            msg_id = str(int(time.time() * 1000))
        create_time = int(root.find('CreateTime').text)

        text = ""
        media = []

        msg_type = root.find('MsgType').text

        if msg_type == 'text':
            # An empty <Content/> yields None; normalise to "".
            text = root.find('Content').text or ""
        elif msg_type == 'image':
            pic_url = root.find('PicUrl').text
            media_id = root.find('MediaId').text
            media.append(MediaAttachment(
                type=MessageType.IMAGE,
                url=pic_url,
                file_id=media_id,
            ))
        elif msg_type == 'voice':
            media_id = root.find('MediaId').text
            # Recognition carries a transcript when the payload includes one.
            text = root.findtext('Recognition') or ""
            media.append(MediaAttachment(
                type=MessageType.VOICE,
                file_id=media_id,
            ))
        elif msg_type == 'video' or msg_type == 'shortvideo':
            media_id = root.find('MediaId').text
            media.append(MediaAttachment(
                type=MessageType.VIDEO,
                file_id=media_id,
            ))
        elif msg_type == 'location':
            lat = root.find('Location_X').text
            lon = root.find('Location_Y').text
            label = root.findtext('Label') or ""
            text = f"[location:{lat},{lon}] {label}"

        return Message(
            id=msg_id,
            channel=self.name,
            sender_id=openid,
            chat_id=openid,  # WeChat uses openid for both
            text=text,
            media=media,
            timestamp=datetime.fromtimestamp(create_time),
            is_group=False,  # Official account messages are 1:1
            raw={
                'msg_type': msg_type,
                'openid': openid,
            },
        )

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        media: Optional[List[MediaAttachment]] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Send a customer service message to a user.

        When ``media`` is non-empty only its first attachment is sent and
        ``text`` is forwarded as the caption. ``reply_to`` and ``buttons``
        are accepted but not used.

        Raises:
            ChannelRateLimitError: When the API reports errcode 45047.
        """
        if not await self._ensure_token():
            return SendResult(success=False, error="Failed to get access token")

        try:
            # Build message based on content type
            if media:
                return await self._send_media_message(chat_id, media[0], text)

            # Send text message
            payload = {
                "touser": chat_id,
                "msgtype": "text",
                "text": {
                    "content": text
                }
            }

            url = f"{WECHAT_API_SEND}?access_token={self._access_token}"

            async with self._session.post(url, json=payload) as response:
                data = await response.json()
                return self._result_from_response(data)

        except ChannelRateLimitError:
            raise
        except Exception as e:
            logger.error(f"Failed to send WeChat message: {e}")
            return SendResult(success=False, error=str(e))

    async def _send_media_message(
        self,
        chat_id: str,
        media: MediaAttachment,
        caption: Optional[str] = None,
    ) -> SendResult:
        """Send a media message.

        Uses ``media.file_id`` as the media id when present, otherwise
        uploads the attachment first. ``caption`` is accepted but is not
        included in the request payload.

        Raises:
            ChannelRateLimitError: When the API reports errcode 45047.
        """
        if not self._session:
            return SendResult(success=False, error="Not connected")

        try:
            msgtype = self._media_kind(media)

            # Need media_id - upload if we have URL/file
            media_id = media.file_id

            if not media_id and (media.url or media.file_path):
                media_id = await self._upload_media(media)

            if not media_id:
                return SendResult(success=False, error="No media ID")

            payload = {
                "touser": chat_id,
                "msgtype": msgtype,
                msgtype: {
                    "media_id": media_id
                }
            }

            url = f"{WECHAT_API_SEND}?access_token={self._access_token}"

            async with self._session.post(url, json=payload) as response:
                data = await response.json()
                # Same error mapping as the text path.
                return self._result_from_response(data)

        except ChannelRateLimitError:
            raise
        except Exception as e:
            logger.error(f"Failed to send media message: {e}")
            return SendResult(success=False, error=str(e))

    async def _upload_media(self, media: MediaAttachment) -> Optional[str]:
        """Upload media to WeChat servers.

        The bytes come from ``media.file_path`` when set, otherwise they are
        downloaded from ``media.url``.

        Returns:
            The ``media_id`` from the upload response, or None when there is
            nothing to upload, no session/token, or any step fails.
        """
        if not self._session or not await self._ensure_token():
            return None

        try:
            media_type = self._media_kind(media)

            url = f"{WECHAT_API_MEDIA_UPLOAD}?access_token={self._access_token}&type={media_type}"

            # Get file data
            if media.file_path:
                with open(media.file_path, 'rb') as f:
                    file_data = f.read()
                filename = os.path.basename(media.file_path)
            elif media.url:
                async with self._session.get(media.url) as resp:
                    # Do not upload an HTTP error page as if it were media.
                    resp.raise_for_status()
                    file_data = await resp.read()
                filename = media.file_name or "file"
            else:
                return None

            # Upload
            data = aiohttp.FormData()
            data.add_field(
                'media',
                file_data,
                filename=filename,
                content_type=media.mime_type or 'application/octet-stream'
            )

            async with self._session.post(url, data=data) as response:
                result = await response.json()
                media_id = result.get("media_id")
                if not media_id:
                    logger.error(f"Media upload returned no media_id: {result}")
                return media_id

        except Exception as e:
            logger.error(f"Failed to upload media: {e}")
            return None

    async def edit_message(
        self,
        chat_id: str,
        message_id: str,
        text: str,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """
        Edit a message.
        Note: WeChat doesn't support message editing, sends new message.
        """
        logger.warning("WeChat doesn't support message editing, sending new message")
        return await self.send_message(chat_id, text, buttons=buttons)

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """
        Delete a message.
        Note: WeChat doesn't support message deletion; always returns False.
        """
        logger.warning("WeChat doesn't support message deletion")
        return False

    async def send_typing(self, chat_id: str) -> None:
        """
        Send typing indicator.
        Note: WeChat doesn't have a typing indicator; this is a no-op.
        """
        pass

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Get user information by OpenID as a plain dict, or None."""
        user = await self.get_user_info(chat_id)
        if user:
            return {
                'openid': user.openid,
                'unionid': user.unionid,
                'nickname': user.nickname,
                'avatar': user.headimgurl,
                'sex': user.sex,
                'city': user.city,
                'province': user.province,
                'country': user.country,
                'subscribed': user.subscribe,
            }
        return None

    # WeChat-specific methods

    def register_event_handler(
        self,
        event_type: str,
        handler: Callable[[ElementTree.Element], Any],
    ) -> None:
        """Register a handler for WeChat events.

        ``event_type`` is matched case-insensitively; registering again for
        the same type replaces the previous handler.
        """
        self._event_handlers[event_type.lower()] = handler

    async def get_user_info(self, openid: str) -> Optional[WeChatUser]:
        """Get user profile information.

        Cached profiles are returned without any network call.

        Returns:
            The user's profile, or None if the token could not be obtained,
            the API reported an error, or the request raised.
        """
        # Check cache first so a hit never needs a token round-trip.
        cached = self._user_cache.get(openid)
        if cached is not None:
            return cached

        if not await self._ensure_token():
            return None

        try:
            # Passed as params so the openid is URL-encoded by the client.
            params = {
                "access_token": self._access_token,
                "openid": openid,
                "lang": "zh_CN",
            }

            async with self._session.get(WECHAT_API_USER_INFO, params=params) as response:
                data = await response.json()

                if data.get("errcode"):
                    logger.error(f"Failed to get user info: {data}")
                    return None

                user = WeChatUser(
                    openid=data.get("openid") or openid,
                    unionid=data.get("unionid"),
                    nickname=data.get("nickname"),
                    sex=data.get("sex", 0),
                    city=data.get("city"),
                    province=data.get("province"),
                    country=data.get("country"),
                    headimgurl=data.get("headimgurl"),
                    subscribe=data.get("subscribe") == 1,
                    subscribe_time=data.get("subscribe_time"),
                    language=data.get("language", "zh_CN"),
                )

                self._user_cache[openid] = user
                return user

        except Exception as e:
            logger.error(f"Error getting user info: {e}")
            return None

    async def send_template_message(
        self,
        template: TemplateMessage,
    ) -> SendResult:
        """Send a template message.

        Returns:
            A successful SendResult carrying the response's ``msgid`` (when
            one is present), or a failed SendResult with the error text.
        """
        if not await self._ensure_token():
            return SendResult(success=False, error="Failed to get access token")

        try:
            url = f"{WECHAT_API_TEMPLATE}?access_token={self._access_token}"

            async with self._session.post(url, json=template.to_dict()) as response:
                data = await response.json()

                if data.get("errcode", 0) != 0:
                    return SendResult(
                        success=False,
                        error=data.get("errmsg", "Unknown error"),
                    )

                msgid = data.get("msgid")
                if msgid is None:
                    # Avoid reporting the literal string "None" as an id.
                    return SendResult(success=True)
                return SendResult(success=True, message_id=str(msgid))

        except Exception as e:
            logger.error(f"Failed to send template message: {e}")
            return SendResult(success=False, error=str(e))

    async def create_menu(self, menu: Dict[str, Any]) -> bool:
        """Create custom menu for Official Account.

        Returns:
            True when the API reports errcode 0 (or none), False otherwise.
        """
        if not await self._ensure_token():
            return False

        try:
            url = f"{WECHAT_API_MENU}?access_token={self._access_token}"

            async with self._session.post(url, json=menu) as response:
                data = await response.json()
                created = data.get("errcode", 0) == 0
                if not created:
                    logger.error(f"Failed to create menu: {data}")
                return created

        except Exception as e:
            logger.error(f"Failed to create menu: {e}")
            return False

    async def create_qr_code(
        self,
        scene: str,
        permanent: bool = False,
        expire_seconds: int = 2592000,
    ) -> Optional[str]:
        """
        Create a QR code for a scene.

        Args:
            scene: Scene string embedded in the code.
            permanent: Request a permanent code instead of a temporary one.
            expire_seconds: Lifetime of a temporary code; ignored when
                ``permanent`` is True.

        Returns the URL to get the QR code image, or None on failure.
        """
        from urllib.parse import quote  # local import: only needed here

        if not await self._ensure_token():
            return None

        try:
            url = f"{WECHAT_API_QR}?access_token={self._access_token}"

            payload: Dict[str, Any] = {
                "action_name": "QR_LIMIT_STR_SCENE" if permanent else "QR_STR_SCENE",
                "action_info": {
                    "scene": {"scene_str": scene}
                },
            }
            if not permanent:
                payload["expire_seconds"] = expire_seconds

            async with self._session.post(url, json=payload) as response:
                data = await response.json()

                if "ticket" in data:
                    # The ticket can contain characters such as '+', '/'
                    # and '=', so it must be percent-encoded in the URL.
                    ticket = quote(str(data["ticket"]), safe="")
                    return f"https://mp.weixin.qq.com/cgi-bin/showqrcode?ticket={ticket}"

                return None

        except Exception as e:
            logger.error(f"Failed to create QR code: {e}")
            return None

    async def get_media_content(self, media_id: str) -> Optional[bytes]:
        """Download media content by media ID.

        Returns:
            The raw bytes, or None when the token is unavailable, the API
            answers with a JSON error body, or the request raises.
        """
        if not await self._ensure_token():
            return None

        try:
            # Passed as params so the media id is URL-encoded by the client.
            params = {
                "access_token": self._access_token,
                "media_id": media_id,
            }

            async with self._session.get(WECHAT_API_MEDIA_GET, params=params) as response:
                if response.content_type.startswith('application/json'):
                    # Error response
                    data = await response.json()
                    logger.error(f"Failed to get media: {data}")
                    return None

                return await response.read()

        except Exception as e:
            logger.error(f"Error getting media content: {e}")
            return None

    # Mini-Program methods

    async def get_mini_program_qr_code(
        self,
        path: str,
        width: int = 430,
    ) -> Optional[bytes]:
        """Generate Mini-Program QR code.

        Returns:
            Image bytes, or None when mini-program support is disabled, the
            token is unavailable, or the API does not answer with an image.

        NOTE(review): this uses the same access token as the Official
        Account calls; ``mini_program_app_id`` / ``mini_program_secret`` are
        not read here -- confirm that is intended.
        """
        if not self.wechat_config.enable_mini_program:
            return None

        if not await self._ensure_token():
            return None

        try:
            url = f"{WECHAT_MP_CODE}?access_token={self._access_token}"

            payload = {
                "path": path,
                "width": width,
            }

            async with self._session.post(url, json=payload) as response:
                if response.content_type.startswith('image'):
                    return await response.read()

                data = await response.json()
                logger.error(f"Failed to get mini program QR: {data}")
                return None

        except Exception as e:
            logger.error(f"Error getting mini program QR: {e}")
            return None

888 

889 

def create_wechat_adapter(
    app_id: Optional[str] = None,
    app_secret: Optional[str] = None,
    token: Optional[str] = None,
    **kwargs: Any
) -> "WeChatAdapter":
    """
    Factory function to create WeChat adapter.

    Explicit arguments win; any that are missing or empty fall back to the
    matching environment variable.

    Args:
        app_id: WeChat app ID (or set WECHAT_APP_ID env var)
        app_secret: WeChat app secret (or set WECHAT_APP_SECRET env var)
        token: Verification token (or set WECHAT_TOKEN env var); an empty
            string is used when neither is provided
        **kwargs: Additional config options forwarded to ``WeChatConfig``

    Returns:
        Configured WeChatAdapter

    Raises:
        ValueError: If no app ID or no app secret can be resolved.
    """
    app_id = app_id or os.getenv("WECHAT_APP_ID")
    app_secret = app_secret or os.getenv("WECHAT_APP_SECRET")
    token = token or os.getenv("WECHAT_TOKEN")

    # Fail fast before building any config object.
    if not app_id:
        raise ValueError("WeChat app ID required")
    if not app_secret:
        raise ValueError("WeChat app secret required")

    config = WeChatConfig(
        app_id=app_id,
        app_secret=app_secret,
        token=token or "",
        **kwargs
    )
    return WeChatAdapter(config)