Coverage for integrations / channels / extensions / viber_adapter.py: 25.3%

450 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Viber Channel Adapter 

3 

4Implements Viber Bot API integration. 

5Based on HevolveBot extension patterns for Viber. 

6 

7Features: 

8- Viber Bot API integration 

9- Rich keyboards support 

10- Carousels 

11- Message types (text, picture, video, file, contact, location, sticker, URL) 

12- User information 

13- Broadcast messaging 

14- Online status 

15- Webhooks 

16""" 

17 

18from __future__ import annotations 

19 

20import asyncio 

21import logging 

22import os 

23import json 

24import hashlib 

25import hmac 

26from typing import Optional, List, Dict, Any, Callable 

27from datetime import datetime 

28from dataclasses import dataclass, field 

29try: 

30 import aiohttp 

31 HAS_AIOHTTP = True 

32except ImportError: 

33 HAS_AIOHTTP = False 

34 

35from ..base import ( 

36 ChannelAdapter, 

37 ChannelConfig, 

38 ChannelStatus, 

39 Message, 

40 MessageType, 

41 MediaAttachment, 

42 SendResult, 

43 ChannelConnectionError, 

44 ChannelSendError, 

45 ChannelRateLimitError, 

46) 

47 

48logger = logging.getLogger(__name__) 

49 

50 

51# Viber API endpoints 

52VIBER_API_BASE = "https://chatapi.viber.com/pa" 

53VIBER_API_SET_WEBHOOK = f"{VIBER_API_BASE}/set_webhook" 

54VIBER_API_SEND_MESSAGE = f"{VIBER_API_BASE}/send_message" 

55VIBER_API_BROADCAST = f"{VIBER_API_BASE}/broadcast_message" 

56VIBER_API_GET_ACCOUNT_INFO = f"{VIBER_API_BASE}/get_account_info" 

57VIBER_API_GET_USER_DETAILS = f"{VIBER_API_BASE}/get_user_details" 

58VIBER_API_GET_ONLINE = f"{VIBER_API_BASE}/get_online" 

59 

60 

61@dataclass 

62class ViberConfig(ChannelConfig): 

63 """Viber-specific configuration.""" 

64 auth_token: str = "" 

65 bot_name: str = "" 

66 bot_avatar: Optional[str] = None 

67 webhook_url: Optional[str] = None 

68 webhook_events: List[str] = field(default_factory=lambda: [ 

69 "delivered", "seen", "failed", "subscribed", 

70 "unsubscribed", "conversation_started" 

71 ]) 

72 enable_keyboard: bool = True 

73 default_keyboard_bg_color: str = "#FFFFFF" 

74 

75 

76@dataclass 

77class ViberUser: 

78 """Viber user information.""" 

79 id: str 

80 name: str 

81 avatar: Optional[str] = None 

82 country: Optional[str] = None 

83 language: Optional[str] = None 

84 api_version: int = 1 

85 primary_device_os: Optional[str] = None 

86 viber_version: Optional[str] = None 

87 device_type: Optional[str] = None 

88 

89 

90@dataclass 

91class KeyboardButton: 

92 """Viber keyboard button.""" 

93 text: str 

94 action_type: str = "reply" # reply, open-url, location-picker, share-phone, none 

95 action_body: str = "" 

96 bg_color: Optional[str] = None 

97 text_size: str = "regular" # small, regular, large 

98 columns: int = 6 # 1-6 for keyboards 

99 rows: int = 1 # 1-2 for keyboards 

100 image: Optional[str] = None 

101 silent: bool = False 

102 

103 def to_dict(self) -> Dict[str, Any]: 

104 """Convert to API format.""" 

105 btn = { 

106 "ActionType": self.action_type, 

107 "ActionBody": self.action_body or self.text, 

108 "Text": self.text, 

109 "TextSize": self.text_size, 

110 "Columns": self.columns, 

111 "Rows": self.rows, 

112 "Silent": self.silent, 

113 } 

114 if self.bg_color: 

115 btn["BgColor"] = self.bg_color 

116 if self.image: 

117 btn["Image"] = self.image 

118 return btn 

119 

120 

121@dataclass 

122class Keyboard: 

123 """Viber keyboard builder.""" 

124 buttons: List[KeyboardButton] = field(default_factory=list) 

125 bg_color: str = "#FFFFFF" 

126 default_height: bool = True 

127 input_field_state: str = "regular" # regular, minimized, hidden 

128 

129 def add_button( 

130 self, 

131 text: str, 

132 action_type: str = "reply", 

133 action_body: str = "", 

134 **kwargs 

135 ) -> 'Keyboard': 

136 """Add a button to the keyboard.""" 

137 self.buttons.append(KeyboardButton( 

138 text=text, 

139 action_type=action_type, 

140 action_body=action_body or text, 

141 **kwargs 

142 )) 

143 return self 

144 

145 def add_url_button(self, text: str, url: str, **kwargs) -> 'Keyboard': 

146 """Add a URL button.""" 

147 return self.add_button(text, "open-url", url, **kwargs) 

148 

149 def add_location_button(self, text: str = "Share Location", **kwargs) -> 'Keyboard': 

150 """Add a location picker button.""" 

151 return self.add_button(text, "location-picker", "location", **kwargs) 

152 

153 def add_phone_button(self, text: str = "Share Phone", **kwargs) -> 'Keyboard': 

154 """Add a share phone button.""" 

155 return self.add_button(text, "share-phone", "phone", **kwargs) 

156 

157 def to_dict(self) -> Dict[str, Any]: 

158 """Convert to API format.""" 

159 return { 

160 "Type": "keyboard", 

161 "BgColor": self.bg_color, 

162 "DefaultHeight": self.default_height, 

163 "InputFieldState": self.input_field_state, 

164 "Buttons": [btn.to_dict() for btn in self.buttons], 

165 } 

166 

167 

168@dataclass 

169class CarouselItem: 

170 """Carousel item for rich messages.""" 

171 title: str 

172 subtitle: Optional[str] = None 

173 image: Optional[str] = None 

174 buttons: List[KeyboardButton] = field(default_factory=list) 

175 

176 def add_button(self, text: str, action_type: str = "reply", action_body: str = "") -> 'CarouselItem': 

177 """Add a button to this carousel item.""" 

178 self.buttons.append(KeyboardButton( 

179 text=text, 

180 action_type=action_type, 

181 action_body=action_body or text, 

182 columns=6, 

183 rows=1, 

184 )) 

185 return self 

186 

187 def to_dict(self) -> Dict[str, Any]: 

188 """Convert to rich media element format.""" 

189 columns = 6 # Full width 

190 

191 elements = [] 

192 

193 # Add image if present 

194 if self.image: 

195 elements.append({ 

196 "Columns": columns, 

197 "Rows": 3, 

198 "ActionType": "none", 

199 "Image": self.image, 

200 }) 

201 

202 # Add title 

203 elements.append({ 

204 "Columns": columns, 

205 "Rows": 1, 

206 "ActionType": "none", 

207 "Text": f"<b>{self.title}</b>", 

208 "TextSize": "medium", 

209 "TextVAlign": "middle", 

210 "TextHAlign": "center", 

211 }) 

212 

213 # Add subtitle if present 

214 if self.subtitle: 

215 elements.append({ 

216 "Columns": columns, 

217 "Rows": 1, 

218 "ActionType": "none", 

219 "Text": self.subtitle, 

220 "TextSize": "small", 

221 "TextVAlign": "middle", 

222 "TextHAlign": "center", 

223 }) 

224 

225 # Add buttons 

226 for btn in self.buttons: 

227 btn_dict = btn.to_dict() 

228 btn_dict["Columns"] = columns 

229 elements.append(btn_dict) 

230 

231 return elements 

232 

233 

234class ViberAdapter(ChannelAdapter): 

235 """ 

236 Viber Bot API adapter. 

237 

238 Usage: 

239 config = ViberConfig( 

240 auth_token="your-bot-token", 

241 bot_name="MyBot", 

242 ) 

243 adapter = ViberAdapter(config) 

244 adapter.on_message(my_handler) 

245 await adapter.start() 

246 """ 

247 

248 def __init__(self, config: ViberConfig): 

249 super().__init__(config) 

250 self.viber_config: ViberConfig = config 

251 self._session: Optional[aiohttp.ClientSession] = None 

252 self._user_cache: Dict[str, ViberUser] = {} 

253 self._callback_handlers: Dict[str, Callable] = {} 

254 self._account_info: Optional[Dict[str, Any]] = None 

255 

256 @property 

257 def name(self) -> str: 

258 return "viber" 

259 

260 async def connect(self) -> bool: 

261 """Initialize Viber Bot connection.""" 

262 if not self.viber_config.auth_token: 

263 logger.error("Viber auth token required") 

264 return False 

265 

266 try: 

267 # Create HTTP session 

268 self._session = aiohttp.ClientSession( 

269 headers={"X-Viber-Auth-Token": self.viber_config.auth_token} 

270 ) 

271 

272 # Get account info to verify token 

273 self._account_info = await self._get_account_info() 

274 if not self._account_info: 

275 logger.error("Failed to verify Viber bot token") 

276 return False 

277 

278 # Set webhook if URL provided 

279 if self.viber_config.webhook_url: 

280 webhook_set = await self._set_webhook(self.viber_config.webhook_url) 

281 if not webhook_set: 

282 logger.warning("Failed to set webhook, manual setup required") 

283 

284 self.status = ChannelStatus.CONNECTED 

285 bot_name = self._account_info.get("name", "Unknown") 

286 logger.info(f"Viber adapter connected as: {bot_name}") 

287 return True 

288 

289 except Exception as e: 

290 logger.error(f"Failed to connect to Viber: {e}") 

291 self.status = ChannelStatus.ERROR 

292 return False 

293 

294 async def disconnect(self) -> None: 

295 """Disconnect Viber adapter.""" 

296 if self._session: 

297 await self._session.close() 

298 self._session = None 

299 

300 self._account_info = None 

301 self._user_cache.clear() 

302 self.status = ChannelStatus.DISCONNECTED 

303 

304 async def _get_account_info(self) -> Optional[Dict[str, Any]]: 

305 """Get bot account information.""" 

306 if not self._session: 

307 return None 

308 

309 try: 

310 async with self._session.post(VIBER_API_GET_ACCOUNT_INFO, json={}) as response: 

311 data = await response.json() 

312 

313 if data.get("status") == 0: 

314 return data 

315 else: 

316 logger.error(f"Failed to get account info: {data}") 

317 return None 

318 

319 except Exception as e: 

320 logger.error(f"Error getting account info: {e}") 

321 return None 

322 

323 async def _set_webhook(self, url: str) -> bool: 

324 """Set webhook URL.""" 

325 if not self._session: 

326 return False 

327 

328 try: 

329 payload = { 

330 "url": url, 

331 "event_types": self.viber_config.webhook_events, 

332 "send_name": True, 

333 "send_photo": True, 

334 } 

335 

336 async with self._session.post(VIBER_API_SET_WEBHOOK, json=payload) as response: 

337 data = await response.json() 

338 return data.get("status") == 0 

339 

340 except Exception as e: 

341 logger.error(f"Error setting webhook: {e}") 

342 return False 

343 

344 def verify_signature(self, body: bytes, signature: str) -> bool: 

345 """Verify webhook signature.""" 

346 computed = hmac.new( 

347 self.viber_config.auth_token.encode('utf-8'), 

348 body, 

349 hashlib.sha256 

350 ).hexdigest() 

351 return hmac.compare_digest(computed, signature) 

352 

353 async def handle_webhook(self, body: str, signature: Optional[str] = None) -> None: 

354 """ 

355 Handle incoming webhook request from Viber. 

356 Should be called from your webhook endpoint. 

357 """ 

358 try: 

359 # Verify signature if provided 

360 if signature: 

361 if not self.verify_signature(body.encode('utf-8'), signature): 

362 logger.error("Invalid webhook signature") 

363 return 

364 

365 data = json.loads(body) 

366 event_type = data.get("event") 

367 

368 # Handle different event types 

369 if event_type == "message": 

370 await self._handle_message_event(data) 

371 elif event_type == "conversation_started": 

372 await self._handle_conversation_started(data) 

373 elif event_type == "subscribed": 

374 await self._handle_subscribed(data) 

375 elif event_type == "unsubscribed": 

376 await self._handle_unsubscribed(data) 

377 elif event_type == "delivered": 

378 await self._handle_delivered(data) 

379 elif event_type == "seen": 

380 await self._handle_seen(data) 

381 elif event_type == "failed": 

382 await self._handle_failed(data) 

383 

384 except Exception as e: 

385 logger.error(f"Error handling webhook: {e}") 

386 

387 async def _handle_message_event(self, data: Dict[str, Any]) -> None: 

388 """Handle incoming message event.""" 

389 message = self._convert_message(data) 

390 await self._dispatch_message(message) 

391 

392 async def _handle_conversation_started(self, data: Dict[str, Any]) -> None: 

393 """Handle conversation started event (user opened chat).""" 

394 user_data = data.get("user", {}) 

395 user_id = user_data.get("id") 

396 logger.info(f"Conversation started with user: {user_id}") 

397 

398 # Cache user info 

399 if user_id: 

400 self._user_cache[user_id] = self._parse_user(user_data) 

401 

402 # Check for callback handler 

403 if "conversation_started" in self._callback_handlers: 

404 handler = self._callback_handlers["conversation_started"] 

405 await handler(data) 

406 

407 async def _handle_subscribed(self, data: Dict[str, Any]) -> None: 

408 """Handle user subscription event.""" 

409 user_data = data.get("user", {}) 

410 user_id = user_data.get("id") 

411 logger.info(f"User subscribed: {user_id}") 

412 

413 if "subscribed" in self._callback_handlers: 

414 handler = self._callback_handlers["subscribed"] 

415 await handler(data) 

416 

417 async def _handle_unsubscribed(self, data: Dict[str, Any]) -> None: 

418 """Handle user unsubscription event.""" 

419 user_id = data.get("user_id") 

420 logger.info(f"User unsubscribed: {user_id}") 

421 

422 # Remove from cache 

423 if user_id in self._user_cache: 

424 del self._user_cache[user_id] 

425 

426 if "unsubscribed" in self._callback_handlers: 

427 handler = self._callback_handlers["unsubscribed"] 

428 await handler(data) 

429 

430 async def _handle_delivered(self, data: Dict[str, Any]) -> None: 

431 """Handle message delivered event.""" 

432 if "delivered" in self._callback_handlers: 

433 handler = self._callback_handlers["delivered"] 

434 await handler(data) 

435 

436 async def _handle_seen(self, data: Dict[str, Any]) -> None: 

437 """Handle message seen event.""" 

438 if "seen" in self._callback_handlers: 

439 handler = self._callback_handlers["seen"] 

440 await handler(data) 

441 

442 async def _handle_failed(self, data: Dict[str, Any]) -> None: 

443 """Handle message failed event.""" 

444 logger.error(f"Message delivery failed: {data}") 

445 

446 if "failed" in self._callback_handlers: 

447 handler = self._callback_handlers["failed"] 

448 await handler(data) 

449 

450 def _parse_user(self, user_data: Dict[str, Any]) -> ViberUser: 

451 """Parse user data into ViberUser object.""" 

452 return ViberUser( 

453 id=user_data.get("id", ""), 

454 name=user_data.get("name", ""), 

455 avatar=user_data.get("avatar"), 

456 country=user_data.get("country"), 

457 language=user_data.get("language"), 

458 api_version=user_data.get("api_version", 1), 

459 primary_device_os=user_data.get("primary_device_os"), 

460 viber_version=user_data.get("viber_version"), 

461 device_type=user_data.get("device_type"), 

462 ) 

463 

464 def _convert_message(self, data: Dict[str, Any]) -> Message: 

465 """Convert Viber webhook data to unified Message format.""" 

466 sender_data = data.get("sender", {}) 

467 message_data = data.get("message", {}) 

468 

469 sender_id = sender_data.get("id", "") 

470 sender_name = sender_data.get("name", "") 

471 message_token = str(data.get("message_token", "")) 

472 timestamp = data.get("timestamp", int(datetime.now().timestamp() * 1000)) 

473 

474 # Cache user 

475 if sender_id: 

476 self._user_cache[sender_id] = self._parse_user(sender_data) 

477 

478 # Extract content 

479 text = "" 

480 media = [] 

481 msg_type = message_data.get("type", "text") 

482 

483 if msg_type == "text": 

484 text = message_data.get("text", "") 

485 elif msg_type == "picture": 

486 media.append(MediaAttachment( 

487 type=MessageType.IMAGE, 

488 url=message_data.get("media"), 

489 caption=message_data.get("text"), 

490 file_name=message_data.get("file_name"), 

491 file_size=message_data.get("size"), 

492 )) 

493 elif msg_type == "video": 

494 media.append(MediaAttachment( 

495 type=MessageType.VIDEO, 

496 url=message_data.get("media"), 

497 file_size=message_data.get("size"), 

498 )) 

499 elif msg_type == "file": 

500 media.append(MediaAttachment( 

501 type=MessageType.DOCUMENT, 

502 url=message_data.get("media"), 

503 file_name=message_data.get("file_name"), 

504 file_size=message_data.get("size"), 

505 )) 

506 elif msg_type == "contact": 

507 contact = message_data.get("contact", {}) 

508 text = f"[contact:{contact.get('name', '')} - {contact.get('phone_number', '')}]" 

509 elif msg_type == "location": 

510 location = message_data.get("location", {}) 

511 text = f"[location:{location.get('lat', '')},{location.get('lon', '')}]" 

512 elif msg_type == "sticker": 

513 sticker_id = message_data.get("sticker_id") 

514 text = f"[sticker:{sticker_id}]" 

515 elif msg_type == "url": 

516 text = message_data.get("media", "") 

517 

518 return Message( 

519 id=message_token, 

520 channel=self.name, 

521 sender_id=sender_id, 

522 sender_name=sender_name, 

523 chat_id=sender_id, # Viber uses user ID as chat ID 

524 text=text, 

525 media=media, 

526 timestamp=datetime.fromtimestamp(timestamp / 1000), 

527 is_group=False, # Viber bots are 1:1 

528 raw={ 

529 'message_type': msg_type, 

530 'sender': sender_data, 

531 }, 

532 ) 

533 

534 async def send_message( 

535 self, 

536 chat_id: str, 

537 text: str, 

538 reply_to: Optional[str] = None, 

539 media: Optional[List[MediaAttachment]] = None, 

540 buttons: Optional[List[Dict]] = None, 

541 ) -> SendResult: 

542 """Send a message to a Viber user.""" 

543 if not self._session: 

544 return SendResult(success=False, error="Not connected") 

545 

546 try: 

547 # Build message payload 

548 payload = { 

549 "receiver": chat_id, 

550 "min_api_version": 1, 

551 "sender": { 

552 "name": self.viber_config.bot_name or self._account_info.get("name", "Bot"), 

553 }, 

554 } 

555 

556 if self.viber_config.bot_avatar: 

557 payload["sender"]["avatar"] = self.viber_config.bot_avatar 

558 

559 # Handle media 

560 if media and len(media) > 0: 

561 return await self._send_media_message(chat_id, media[0], text, buttons) 

562 

563 # Text message 

564 payload["type"] = "text" 

565 payload["text"] = text 

566 

567 # Add keyboard if buttons provided 

568 if buttons and self.viber_config.enable_keyboard: 

569 keyboard = self._build_keyboard(buttons) 

570 payload["keyboard"] = keyboard.to_dict() 

571 

572 async with self._session.post(VIBER_API_SEND_MESSAGE, json=payload) as response: 

573 data = await response.json() 

574 

575 if data.get("status") == 0: 

576 return SendResult( 

577 success=True, 

578 message_id=str(data.get("message_token")), 

579 ) 

580 else: 

581 error_msg = data.get("status_message", "Unknown error") 

582 status = data.get("status") 

583 

584 if status == 3: # Rate limited 

585 raise ChannelRateLimitError(60) 

586 

587 return SendResult(success=False, error=error_msg) 

588 

589 except ChannelRateLimitError: 

590 raise 

591 except Exception as e: 

592 logger.error(f"Failed to send Viber message: {e}") 

593 return SendResult(success=False, error=str(e)) 

594 

595 async def _send_media_message( 

596 self, 

597 chat_id: str, 

598 media: MediaAttachment, 

599 caption: Optional[str] = None, 

600 buttons: Optional[List[Dict]] = None, 

601 ) -> SendResult: 

602 """Send a media message.""" 

603 if not self._session: 

604 return SendResult(success=False, error="Not connected") 

605 

606 try: 

607 payload = { 

608 "receiver": chat_id, 

609 "min_api_version": 1, 

610 "sender": { 

611 "name": self.viber_config.bot_name or self._account_info.get("name", "Bot"), 

612 }, 

613 } 

614 

615 if self.viber_config.bot_avatar: 

616 payload["sender"]["avatar"] = self.viber_config.bot_avatar 

617 

618 # Determine message type 

619 if media.type == MessageType.IMAGE: 

620 payload["type"] = "picture" 

621 payload["media"] = media.url 

622 if caption: 

623 payload["text"] = caption 

624 elif media.type == MessageType.VIDEO: 

625 payload["type"] = "video" 

626 payload["media"] = media.url 

627 payload["size"] = media.file_size or 0 

628 elif media.type == MessageType.DOCUMENT: 

629 payload["type"] = "file" 

630 payload["media"] = media.url 

631 payload["file_name"] = media.file_name or "file" 

632 payload["size"] = media.file_size or 0 

633 elif media.type == MessageType.AUDIO: 

634 payload["type"] = "file" 

635 payload["media"] = media.url 

636 payload["file_name"] = media.file_name or "audio" 

637 payload["size"] = media.file_size or 0 

638 else: 

639 # Fall back to file 

640 payload["type"] = "file" 

641 payload["media"] = media.url 

642 payload["file_name"] = media.file_name or "file" 

643 

644 # Add keyboard 

645 if buttons and self.viber_config.enable_keyboard: 

646 keyboard = self._build_keyboard(buttons) 

647 payload["keyboard"] = keyboard.to_dict() 

648 

649 async with self._session.post(VIBER_API_SEND_MESSAGE, json=payload) as response: 

650 data = await response.json() 

651 

652 if data.get("status") == 0: 

653 return SendResult( 

654 success=True, 

655 message_id=str(data.get("message_token")), 

656 ) 

657 else: 

658 return SendResult( 

659 success=False, 

660 error=data.get("status_message", "Unknown error"), 

661 ) 

662 

663 except Exception as e: 

664 logger.error(f"Failed to send Viber media: {e}") 

665 return SendResult(success=False, error=str(e)) 

666 

667 def _build_keyboard(self, buttons: List[Dict]) -> Keyboard: 

668 """Build a keyboard from button definitions.""" 

669 keyboard = Keyboard(bg_color=self.viber_config.default_keyboard_bg_color) 

670 

671 for btn in buttons: 

672 text = btn.get("text", "") 

673 

674 if btn.get("url"): 

675 keyboard.add_url_button(text, btn["url"]) 

676 elif btn.get("callback_data"): 

677 keyboard.add_button( 

678 text, 

679 action_type="reply", 

680 action_body=btn["callback_data"], 

681 ) 

682 else: 

683 keyboard.add_button(text) 

684 

685 return keyboard 

686 

687 async def edit_message( 

688 self, 

689 chat_id: str, 

690 message_id: str, 

691 text: str, 

692 buttons: Optional[List[Dict]] = None, 

693 ) -> SendResult: 

694 """ 

695 Edit a message. 

696 Note: Viber doesn't support message editing, sends new message. 

697 """ 

698 logger.warning("Viber doesn't support message editing, sending new message") 

699 return await self.send_message(chat_id, text, buttons=buttons) 

700 

701 async def delete_message(self, chat_id: str, message_id: str) -> bool: 

702 """ 

703 Delete a message. 

704 Note: Viber doesn't support message deletion. 

705 """ 

706 logger.warning("Viber doesn't support message deletion") 

707 return False 

708 

709 async def send_typing(self, chat_id: str) -> None: 

710 """ 

711 Send typing indicator. 

712 Note: Viber doesn't have a typing indicator API. 

713 """ 

714 pass 

715 

716 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]: 

717 """Get user information.""" 

718 user = await self.get_user_details(chat_id) 

719 if user: 

720 return { 

721 'id': user.id, 

722 'name': user.name, 

723 'avatar': user.avatar, 

724 'country': user.country, 

725 'language': user.language, 

726 'device_type': user.device_type, 

727 } 

728 return None 

729 

730 # Viber-specific methods 

731 

732 def register_event_handler( 

733 self, 

734 event_type: str, 

735 handler: Callable[[Dict[str, Any]], Any], 

736 ) -> None: 

737 """Register a handler for Viber events.""" 

738 self._callback_handlers[event_type] = handler 

739 

740 async def get_user_details(self, user_id: str) -> Optional[ViberUser]: 

741 """Get detailed user information.""" 

742 # Check cache first 

743 if user_id in self._user_cache: 

744 return self._user_cache[user_id] 

745 

746 if not self._session: 

747 return None 

748 

749 try: 

750 payload = {"id": user_id} 

751 

752 async with self._session.post(VIBER_API_GET_USER_DETAILS, json=payload) as response: 

753 data = await response.json() 

754 

755 if data.get("status") == 0: 

756 user_data = data.get("user", {}) 

757 user = self._parse_user(user_data) 

758 self._user_cache[user_id] = user 

759 return user 

760 

761 return None 

762 

763 except Exception as e: 

764 logger.error(f"Error getting user details: {e}") 

765 return None 

766 

767 async def check_online_status(self, user_ids: List[str]) -> Dict[str, int]: 

768 """ 

769 Check online status of users. 

770 

771 Returns dict mapping user_id to online_status: 

772 0 = offline, 1 = online, 2 = undisclosed 

773 """ 

774 if not self._session: 

775 return {} 

776 

777 try: 

778 payload = {"ids": user_ids} 

779 

780 async with self._session.post(VIBER_API_GET_ONLINE, json=payload) as response: 

781 data = await response.json() 

782 

783 if data.get("status") == 0: 

784 result = {} 

785 for user in data.get("users", []): 

786 result[user["id"]] = user.get("online_status", 2) 

787 return result 

788 

789 return {} 

790 

791 except Exception as e: 

792 logger.error(f"Error checking online status: {e}") 

793 return {} 

794 

795 async def broadcast_message( 

796 self, 

797 user_ids: List[str], 

798 text: str, 

799 media: Optional[MediaAttachment] = None, 

800 buttons: Optional[List[Dict]] = None, 

801 ) -> Dict[str, SendResult]: 

802 """ 

803 Send a broadcast message to multiple users. 

804 Note: Maximum 500 users per request. 

805 """ 

806 if not self._session: 

807 return {} 

808 

809 results = {} 

810 

811 # Process in batches of 500 

812 for i in range(0, len(user_ids), 500): 

813 batch = user_ids[i:i + 500] 

814 

815 try: 

816 payload = { 

817 "broadcast_list": batch, 

818 "min_api_version": 1, 

819 "sender": { 

820 "name": self.viber_config.bot_name or self._account_info.get("name", "Bot"), 

821 }, 

822 } 

823 

824 if self.viber_config.bot_avatar: 

825 payload["sender"]["avatar"] = self.viber_config.bot_avatar 

826 

827 if media: 

828 if media.type == MessageType.IMAGE: 

829 payload["type"] = "picture" 

830 payload["media"] = media.url 

831 if text: 

832 payload["text"] = text 

833 else: 

834 payload["type"] = "text" 

835 payload["text"] = text 

836 else: 

837 payload["type"] = "text" 

838 payload["text"] = text 

839 

840 if buttons and self.viber_config.enable_keyboard: 

841 keyboard = self._build_keyboard(buttons) 

842 payload["keyboard"] = keyboard.to_dict() 

843 

844 async with self._session.post(VIBER_API_BROADCAST, json=payload) as response: 

845 data = await response.json() 

846 

847 if data.get("status") == 0: 

848 for uid in batch: 

849 results[uid] = SendResult(success=True) 

850 else: 

851 error = data.get("status_message", "Unknown error") 

852 failed_list = data.get("failed_list", []) 

853 

854 for uid in batch: 

855 if uid in failed_list: 

856 results[uid] = SendResult(success=False, error=error) 

857 else: 

858 results[uid] = SendResult(success=True) 

859 

860 except Exception as e: 

861 logger.error(f"Broadcast batch failed: {e}") 

862 for uid in batch: 

863 results[uid] = SendResult(success=False, error=str(e)) 

864 

865 return results 

866 

867 async def send_keyboard( 

868 self, 

869 chat_id: str, 

870 text: str, 

871 keyboard: Keyboard, 

872 ) -> SendResult: 

873 """Send a message with custom keyboard.""" 

874 if not self._session: 

875 return SendResult(success=False, error="Not connected") 

876 

877 try: 

878 payload = { 

879 "receiver": chat_id, 

880 "min_api_version": 1, 

881 "type": "text", 

882 "text": text, 

883 "sender": { 

884 "name": self.viber_config.bot_name or self._account_info.get("name", "Bot"), 

885 }, 

886 "keyboard": keyboard.to_dict(), 

887 } 

888 

889 if self.viber_config.bot_avatar: 

890 payload["sender"]["avatar"] = self.viber_config.bot_avatar 

891 

892 async with self._session.post(VIBER_API_SEND_MESSAGE, json=payload) as response: 

893 data = await response.json() 

894 

895 if data.get("status") == 0: 

896 return SendResult( 

897 success=True, 

898 message_id=str(data.get("message_token")), 

899 ) 

900 else: 

901 return SendResult( 

902 success=False, 

903 error=data.get("status_message", "Unknown error"), 

904 ) 

905 

906 except Exception as e: 

907 logger.error(f"Failed to send keyboard: {e}") 

908 return SendResult(success=False, error=str(e)) 

909 

910 async def send_carousel( 

911 self, 

912 chat_id: str, 

913 items: List[CarouselItem], 

914 alt_text: str = "Carousel message", 

915 ) -> SendResult: 

916 """Send a carousel message (rich media).""" 

917 if not self._session: 

918 return SendResult(success=False, error="Not connected") 

919 

920 try: 

921 # Build rich media content 

922 elements = [] 

923 for item in items: 

924 elements.extend(item.to_dict()) 

925 

926 payload = { 

927 "receiver": chat_id, 

928 "min_api_version": 7, # Rich media requires API v7+ 

929 "type": "rich_media", 

930 "rich_media": { 

931 "Type": "rich_media", 

932 "ButtonsGroupColumns": 6, 

933 "ButtonsGroupRows": 6, 

934 "BgColor": "#FFFFFF", 

935 "Buttons": elements, 

936 }, 

937 "sender": { 

938 "name": self.viber_config.bot_name or self._account_info.get("name", "Bot"), 

939 }, 

940 "alt_text": alt_text, 

941 } 

942 

943 if self.viber_config.bot_avatar: 

944 payload["sender"]["avatar"] = self.viber_config.bot_avatar 

945 

946 async with self._session.post(VIBER_API_SEND_MESSAGE, json=payload) as response: 

947 data = await response.json() 

948 

949 if data.get("status") == 0: 

950 return SendResult( 

951 success=True, 

952 message_id=str(data.get("message_token")), 

953 ) 

954 else: 

955 return SendResult( 

956 success=False, 

957 error=data.get("status_message", "Unknown error"), 

958 ) 

959 

960 except Exception as e: 

961 logger.error(f"Failed to send carousel: {e}") 

962 return SendResult(success=False, error=str(e)) 

963 

964 async def send_location( 

965 self, 

966 chat_id: str, 

967 latitude: float, 

968 longitude: float, 

969 buttons: Optional[List[Dict]] = None, 

970 ) -> SendResult: 

971 """Send a location message.""" 

972 if not self._session: 

973 return SendResult(success=False, error="Not connected") 

974 

975 try: 

976 payload = { 

977 "receiver": chat_id, 

978 "min_api_version": 1, 

979 "type": "location", 

980 "location": { 

981 "lat": latitude, 

982 "lon": longitude, 

983 }, 

984 "sender": { 

985 "name": self.viber_config.bot_name or self._account_info.get("name", "Bot"), 

986 }, 

987 } 

988 

989 if self.viber_config.bot_avatar: 

990 payload["sender"]["avatar"] = self.viber_config.bot_avatar 

991 

992 if buttons and self.viber_config.enable_keyboard: 

993 keyboard = self._build_keyboard(buttons) 

994 payload["keyboard"] = keyboard.to_dict() 

995 

996 async with self._session.post(VIBER_API_SEND_MESSAGE, json=payload) as response: 

997 data = await response.json() 

998 

999 if data.get("status") == 0: 

1000 return SendResult( 

1001 success=True, 

1002 message_id=str(data.get("message_token")), 

1003 ) 

1004 else: 

1005 return SendResult( 

1006 success=False, 

1007 error=data.get("status_message", "Unknown error"), 

1008 ) 

1009 

1010 except Exception as e: 

1011 logger.error(f"Failed to send location: {e}") 

1012 return SendResult(success=False, error=str(e)) 

1013 

1014 

1015def create_viber_adapter( 

1016 auth_token: str = None, 

1017 bot_name: str = None, 

1018 **kwargs 

1019) -> ViberAdapter: 

1020 """ 

1021 Factory function to create Viber adapter. 

1022 

1023 Args: 

1024 auth_token: Viber bot auth token (or set VIBER_AUTH_TOKEN env var) 

1025 bot_name: Bot display name (or set VIBER_BOT_NAME env var) 

1026 **kwargs: Additional config options 

1027 

1028 Returns: 

1029 Configured ViberAdapter 

1030 """ 

1031 auth_token = auth_token or os.getenv("VIBER_AUTH_TOKEN") 

1032 bot_name = bot_name or os.getenv("VIBER_BOT_NAME", "Bot") 

1033 

1034 if not auth_token: 

1035 raise ValueError("Viber auth token required") 

1036 

1037 config = ViberConfig( 

1038 auth_token=auth_token, 

1039 bot_name=bot_name, 

1040 **kwargs 

1041 ) 

1042 return ViberAdapter(config)