Coverage for integrations / channels / whatsapp_adapter.py: 21.4%

220 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

WhatsApp Channel Adapter

Implements WhatsApp messaging using whatsapp-web.js (via REST API).
Supports QR pairing for authentication.

Features:
- Text messages
- Media (images, videos, documents)
- Group chats
- Message reactions
- Read receipts
- QR code authentication

14""" 

15 

16from __future__ import annotations 

17 

18import asyncio 

19import logging 

20import os 

21import base64 

22from typing import Optional, List, Dict, Any, Callable 

23from datetime import datetime 

24 

25try: 

26 import aiohttp 

27 HAS_AIOHTTP = True 

28except ImportError: 

29 HAS_AIOHTTP = False 

30 

31from .base import ( 

32 ChannelAdapter, 

33 ChannelConfig, 

34 ChannelStatus, 

35 Message, 

36 MessageType, 

37 MediaAttachment, 

38 SendResult, 

39 ChannelConnectionError, 

40 ChannelSendError, 

41 ChannelRateLimitError, 

42) 

43from .room_capable import RoomCapableAdapter 

44 

45logger = logging.getLogger(__name__) 

46 

47 

class WhatsAppAdapter(ChannelAdapter, RoomCapableAdapter):
    """
    WhatsApp messaging adapter using whatsapp-web.js REST API.

    Commands are sent as HTTP requests to ``config.webhook_url`` (falling
    back to ``http://localhost:3000``); events (QR codes, authentication,
    inbound messages, disconnects) are read from a WebSocket on the same
    host at ``/ws/<account_id>``.

    Usage:
        config = ChannelConfig(
            webhook_url="http://localhost:3000",
            extra={"phone_number": "+1234567890"}
        )
        adapter = WhatsAppAdapter(config)
        adapter.on_message(my_handler)
        await adapter.start()
    """

    # Maps the ``type`` field of a media-bearing WhatsApp message to the
    # unified attachment type; unknown types fall back to DOCUMENT.
    _MEDIA_TYPE_MAP = {
        "image": MessageType.IMAGE,
        "video": MessageType.VIDEO,
        "audio": MessageType.AUDIO,
        "ptt": MessageType.VOICE,
        "document": MessageType.DOCUMENT,
        "sticker": MessageType.STICKER,
    }

    def __init__(self, config: ChannelConfig):
        """
        Create an adapter from ``config``.

        Reads ``phone_number`` and ``account_id`` (default ``"default"``)
        from ``config.extra`` and the API base URL from
        ``config.webhook_url``.

        Raises:
            ImportError: if aiohttp is not installed.
        """
        if not HAS_AIOHTTP:
            raise ImportError(
                "aiohttp not installed. "
                "Install with: pip install aiohttp"
            )

        super().__init__(config)
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        # Strong reference to the event-listener task: the event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes, and it could never be cancelled on shutdown.
        self._listener_task: Optional[asyncio.Task] = None
        self._phone_number: Optional[str] = config.extra.get("phone_number")
        self._account_id: str = config.extra.get("account_id", "default")
        self._base_url: str = config.webhook_url or "http://localhost:3000"
        self._qr_callback: Optional[Callable[[str], None]] = None
        self._authenticated = False

    @property
    def name(self) -> str:
        """Channel identifier used in unified messages."""
        return "whatsapp"

    def set_qr_callback(self, callback: Callable[[str], None]) -> None:
        """Set callback for QR code display during authentication."""
        self._qr_callback = callback

    # ─── Internal helpers ────────────────────────────────────────────

    def _session_url(self, suffix: str) -> str:
        """Return the REST URL for ``suffix`` under this account's session."""
        return f"{self._base_url}/api/sessions/{self._account_id}/{suffix}"

    def _ws_base_url(self) -> str:
        """
        Return the base URL with its http(s) scheme swapped for ws(s).

        Only the leading scheme is rewritten, so an "http" substring in the
        host or path is left intact. A URL without an http/https scheme is
        returned unchanged.
        """
        base = self._base_url
        if base.startswith("https://"):
            return "wss://" + base[len("https://"):]
        if base.startswith("http://"):
            return "ws://" + base[len("http://"):]
        return base

    @staticmethod
    def _serialized_id(value: Any, default: str = "") -> str:
        """
        Extract a string id from an id field of an inbound payload.

        Accepts a dict (uses its ``_serialized`` entry, else the dict's
        string form), any other non-None value (stringified), or None
        (returns ``default``).
        """
        if isinstance(value, dict):
            return value.get("_serialized", str(value))
        if value is None:
            return default
        return str(value)

    def _is_bot_mentioned(self, mentions: Any) -> bool:
        """
        Return True when the configured phone number appears in ``mentions``.

        The number is matched both verbatim and reduced to its digits.
        NOTE(review): mention ids are presumably bare-digit ids such as
        ``1234567890@c.us``, which a "+"-prefixed number would never match
        verbatim - confirm against the backend payload.
        """
        if not self._phone_number:
            return False
        haystack = str(mentions)
        if self._phone_number in haystack:
            return True
        digits = "".join(ch for ch in self._phone_number if ch.isdigit())
        return bool(digits) and digits in haystack

    @staticmethod
    def _read_file(path: str) -> bytes:
        """Read a whole file as bytes (run in an executor, it blocks)."""
        with open(path, "rb") as f:
            return f.read()

    @staticmethod
    def _write_file(path: str, data: bytes) -> None:
        """Write ``data`` to ``path`` (run in an executor, it blocks)."""
        with open(path, "wb") as f:
            f.write(data)

    async def _close_ws(self) -> None:
        """Close the event WebSocket, if one is open, ignoring close errors."""
        ws, self._ws = self._ws, None
        if ws is None:
            return
        try:
            await ws.close()
        except Exception as e:
            logger.debug("WhatsApp WebSocket close failed: %s", e)

    def _cancel_listener(self) -> None:
        """Cancel the listener task unless it is finished or is the caller."""
        listener, self._listener_task = self._listener_task, None
        if (
            listener is not None
            and not listener.done()
            # A task must not cancel itself when disconnect() is invoked
            # from inside a message handler.
            and listener is not asyncio.current_task()
        ):
            listener.cancel()

    async def _shutdown_transport(self) -> None:
        """Release WebSocket, HTTP session and listener task (no API calls)."""
        await self._close_ws()
        session, self._session = self._session, None
        if session is not None:
            await session.close()
        self._cancel_listener()

    async def _post_ok(
        self,
        suffix: str,
        payload: Dict[str, Any],
        ok_statuses: tuple,
        action: str,
    ) -> bool:
        """
        POST ``payload`` to a session endpoint and report success.

        Returns False when not connected/authenticated, when the response
        status is not in ``ok_statuses``, or when the request raises (the
        error is logged as "Failed to <action>: ...").
        """
        if not self._session or not self._authenticated:
            return False

        try:
            async with self._session.post(
                self._session_url(suffix),
                json=payload,
            ) as resp:
                return resp.status in ok_statuses
        except Exception as e:
            logger.error(f"Failed to {action}: {e}")
            return False

    # ─── Connection lifecycle ────────────────────────────────────────

    async def connect(self) -> bool:
        """
        Connect to WhatsApp Web via REST API.

        Checks API health, starts the account session, spawns the event
        listener and then waits for authentication.

        Returns:
            True once authenticated; False if the API is unavailable, the
            session cannot be started, a client error occurs, or
            authentication times out. On the early-failure paths the HTTP
            session is closed again; after an authentication timeout the
            session and listener are left running so a late QR scan can
            still complete.
        """
        try:
            # A repeated connect() must not leak the previous transport.
            if self._session is not None:
                await self._shutdown_transport()
            self._session = aiohttp.ClientSession()

            # Check API health
            async with self._session.get(f"{self._base_url}/api/health") as resp:
                healthy = resp.status == 200
            if not healthy:
                logger.error("WhatsApp API not available")
                await self._shutdown_transport()
                return False

            # Initialize session
            async with self._session.post(self._session_url("start")) as resp:
                start_status = resp.status
            if start_status not in (200, 201):
                logger.error(f"Failed to start WhatsApp session: {start_status}")
                await self._shutdown_transport()
                return False

            # Start WebSocket for events (keep the task referenced)
            self._listener_task = asyncio.create_task(self._listen_events())

            self.status = ChannelStatus.CONNECTING
            logger.info(f"WhatsApp connecting for account: {self._account_id}")

            # Wait for authentication or QR code
            await self._wait_for_auth()

            return self._authenticated

        except aiohttp.ClientError as e:
            logger.error(f"Failed to connect to WhatsApp API: {e}")
            self.status = ChannelStatus.ERROR
            await self._shutdown_transport()
            return False

    async def _wait_for_auth(self, timeout: int = 120) -> None:
        """
        Wait for WhatsApp authentication.

        Polls the session status endpoint every 2 seconds until
        ``_authenticated`` becomes true (set here or by the event listener)
        or ``timeout`` seconds elapse, in which case the status is set to
        ERROR. Failed polls are ignored and retried.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout

        while not self._authenticated:
            if loop.time() > deadline:
                logger.error("WhatsApp authentication timeout")
                self.status = ChannelStatus.ERROR
                return

            # Check session status
            if self._session:
                try:
                    async with self._session.get(
                        self._session_url("status")
                    ) as resp:
                        if resp.status == 200:
                            data = await resp.json()
                            if data.get("authenticated"):
                                self._authenticated = True
                                self.status = ChannelStatus.CONNECTED
                                logger.info("WhatsApp authenticated successfully")
                                return
                except (aiohttp.ClientError, ValueError) as e:
                    # Best-effort poll: the next iteration retries.
                    logger.debug("WhatsApp status poll failed: %s", e)

            await asyncio.sleep(2)

    async def _listen_events(self) -> None:
        """
        Listen for WhatsApp events via WebSocket.

        Each text frame is decoded as JSON and passed to ``_handle_event``.
        A frame that fails to decode or whose handler raises is logged and
        skipped so one bad event does not stop the listener. An ERROR frame
        ends the loop. The socket is closed when the loop exits.
        """
        session = self._session
        if not session:
            return

        ws = None
        try:
            ws = await session.ws_connect(
                f"{self._ws_base_url()}/ws/{self._account_id}"
            )
            self._ws = ws

            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    try:
                        await self._handle_event(msg.json())
                    except asyncio.CancelledError:
                        raise
                    except Exception as e:
                        logger.error(f"WhatsApp event handling error: {e}")
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error(f"WhatsApp WebSocket error: {msg.data}")
                    break

        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.error(f"WhatsApp event listener error: {e}")
        finally:
            # Only clear the shared reference if it still points at our
            # socket; a reconnect may already have installed a new one.
            if self._ws is ws:
                self._ws = None
            if ws is not None:
                try:
                    await ws.close()
                except Exception as e:
                    logger.debug("WhatsApp WebSocket close failed: %s", e)

    async def _handle_event(self, event: Dict[str, Any]) -> None:
        """
        Handle incoming WhatsApp events.

        Recognised ``type`` values: ``qr`` (forwarded to the QR callback),
        ``authenticated``, ``message`` (converted and dispatched) and
        ``disconnected``. Other types are ignored.
        """
        event_type = event.get("type")

        if event_type == "qr":
            # QR code for authentication
            qr_data = event.get("qr")
            if qr_data and self._qr_callback:
                self._qr_callback(qr_data)
            logger.info("WhatsApp QR code received - scan to authenticate")

        elif event_type == "authenticated":
            self._authenticated = True
            self.status = ChannelStatus.CONNECTED
            logger.info("WhatsApp authenticated")

        elif event_type == "message":
            message = self._convert_message(event.get("data", {}))
            await self._dispatch_message(message)

        elif event_type == "disconnected":
            self._authenticated = False
            self.status = ChannelStatus.DISCONNECTED
            logger.warning("WhatsApp disconnected")

    def _convert_message(self, wa_message: Dict[str, Any]) -> Message:
        """
        Convert WhatsApp message to unified Message format.

        Id fields (message, sender, chat) may be dicts carrying a
        ``_serialized`` entry or plain values; both are accepted. The
        timestamp is converted with ``datetime.fromtimestamp`` (naive,
        local time), treating a missing or null value as 0.
        """
        chat = wa_message.get("chat") or {}
        sender = wa_message.get("sender") or {}

        # Check for media
        media = []
        if wa_message.get("hasMedia"):
            media_type = wa_message.get("type", "document")
            media.append(MediaAttachment(
                type=self._MEDIA_TYPE_MAP.get(media_type, MessageType.DOCUMENT),
                file_id=wa_message.get("mediaKey"),
                mime_type=wa_message.get("mimetype"),
                file_name=wa_message.get("filename"),
                caption=wa_message.get("caption"),
            ))

        # Check for bot mention
        is_mentioned = self._is_bot_mentioned(wa_message.get("mentionedIds", []))

        return Message(
            # A missing message id keeps the historical "None" string.
            id=self._serialized_id(wa_message.get("id"), default=str(None)),
            channel=self.name,
            sender_id=self._serialized_id(sender.get("id")),
            sender_name=sender.get("pushname") or sender.get("name"),
            chat_id=self._serialized_id(chat.get("id")),
            text=wa_message.get("body", ""),
            media=media,
            reply_to_id=wa_message.get("quotedMsgId"),
            timestamp=datetime.fromtimestamp(wa_message.get("timestamp") or 0),
            is_group=chat.get("isGroup", False),
            is_bot_mentioned=is_mentioned,
            raw=wa_message,
        )

    async def disconnect(self) -> None:
        """
        Disconnect from WhatsApp.

        Closes the event WebSocket, asks the API to stop the session
        (best effort), closes the HTTP session, cancels the listener task
        and marks the adapter as disconnected.
        """
        # Close the socket first so the stop request's own "disconnected"
        # event is not processed by the listener.
        await self._close_ws()

        if self._session:
            # Stop WhatsApp session
            try:
                async with self._session.post(self._session_url("stop")):
                    pass
            except Exception as e:
                logger.debug("WhatsApp session stop request failed: %s", e)

        await self._shutdown_transport()

        self._authenticated = False
        self.status = ChannelStatus.DISCONNECTED
        logger.info("WhatsApp disconnected")

    # ─── Messaging ───────────────────────────────────────────────────

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        media: Optional[List[MediaAttachment]] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """
        Send a message to a WhatsApp chat.

        Args:
            chat_id: Target chat id.
            text: Message text.
            reply_to: Id of the message to quote, if any.
            media: Attachments; only the first one is sent. A local
                ``file_path`` is uploaded base64-encoded, otherwise ``url``
                is passed through.
            buttons: Dicts with ``text`` and optional ``callback_data``.

        Returns:
            SendResult with ``success=False`` and an error string when not
            connected, on a non-200/201 response, or on any other failure.

        Raises:
            ChannelRateLimitError: on HTTP 429, carrying the server's
                ``retryAfter`` value (60 if absent or unreadable).
        """
        if not self._session or not self._authenticated:
            return SendResult(success=False, error="Not connected")

        try:
            payload: Dict[str, Any] = {
                "chatId": chat_id,
                "text": text,
            }

            if reply_to:
                payload["quotedMessageId"] = reply_to

            # Handle media
            if media:
                if len(media) > 1:
                    logger.warning(
                        "WhatsApp send_message: only the first of %d "
                        "attachments is sent",
                        len(media),
                    )
                first_media = media[0]
                if first_media.file_path:
                    # Read off the event loop; the file may be large.
                    loop = asyncio.get_running_loop()
                    raw = await loop.run_in_executor(
                        None, self._read_file, first_media.file_path
                    )
                    payload["media"] = base64.b64encode(raw).decode()
                    payload["mimetype"] = first_media.mime_type
                    payload["filename"] = first_media.file_name
                elif first_media.url:
                    payload["mediaUrl"] = first_media.url

            # Handle buttons (as list message)
            if buttons:
                payload["buttons"] = [
                    {"body": btn["text"], "id": btn.get("callback_data", btn["text"])}
                    for btn in buttons
                ]

            async with self._session.post(
                self._session_url("messages/send"),
                json=payload,
            ) as resp:
                if resp.status == 429:
                    retry_after = 60
                    try:
                        data = await resp.json()
                        retry_after = data.get("retryAfter", 60)
                    except (aiohttp.ClientError, ValueError, AttributeError) as e:
                        # A 429 without a usable JSON body is still a
                        # rate limit; fall back to the default delay.
                        logger.debug("WhatsApp 429 body unreadable: %s", e)
                    raise ChannelRateLimitError(retry_after=retry_after)

                if resp.status not in (200, 201):
                    error_text = await resp.text()
                    return SendResult(success=False, error=error_text)

                data = await resp.json()
                return SendResult(
                    success=True,
                    message_id=data.get("messageId") or data.get("id"),
                    raw=data,
                )

        except ChannelRateLimitError:
            raise
        except Exception as e:
            logger.error(f"Failed to send WhatsApp message: {e}")
            return SendResult(success=False, error=str(e))

    async def edit_message(
        self,
        chat_id: str,
        message_id: str,
        text: str,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """Edit a WhatsApp message (not supported; always fails)."""
        # WhatsApp has limited edit support
        return SendResult(
            success=False,
            error="WhatsApp does not support message editing"
        )

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """Delete a WhatsApp message; True on a 200/204 response."""
        return await self._post_ok(
            "messages/delete",
            {"chatId": chat_id, "messageId": message_id},
            (200, 204),
            "delete WhatsApp message",
        )

    async def send_typing(self, chat_id: str) -> None:
        """Send typing indicator (composing). Best effort; never raises."""
        if not self._session or not self._authenticated:
            return

        try:
            # The context manager releases the response/connection.
            async with self._session.post(
                self._session_url(f"chats/{chat_id}/composing")
            ):
                pass
        except Exception as e:
            logger.debug("WhatsApp typing indicator failed: %s", e)

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Get information about a WhatsApp chat, or None on any failure."""
        if not self._session or not self._authenticated:
            return None

        try:
            async with self._session.get(
                self._session_url(f"chats/{chat_id}")
            ) as resp:
                if resp.status == 200:
                    return await resp.json()
        except Exception as e:
            logger.error(f"Failed to get WhatsApp chat info: {e}")

        return None

    async def send_reaction(
        self,
        chat_id: str,
        message_id: str,
        emoji: str,
    ) -> bool:
        """Send a reaction to a message; True on a 200/201 response."""
        return await self._post_ok(
            "messages/react",
            {
                "chatId": chat_id,
                "messageId": message_id,
                "emoji": emoji,
            },
            (200, 201),
            "send WhatsApp reaction",
        )

    async def send_read_receipt(self, chat_id: str, message_id: str) -> bool:
        """Send read receipt for a message; True on a 200/204 response."""
        return await self._post_ok(
            "messages/read",
            {"chatId": chat_id, "messageId": message_id},
            (200, 204),
            "send WhatsApp read receipt",
        )

    async def get_qr_code(self) -> Optional[str]:
        """Get current QR code for authentication, or None if unavailable."""
        if not self._session:
            return None

        try:
            async with self._session.get(self._session_url("qr")) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    return data.get("qr")
        except Exception as e:
            logger.error(f"Failed to get WhatsApp QR code: {e}")

        return None

    async def download_media(self, message_id: str, destination: str) -> bool:
        """
        Download media from a message.

        Writes the response body to ``destination``. Returns True on
        success, False when not connected, on a non-200 response, or on
        any error (which is logged).
        """
        if not self._session or not self._authenticated:
            return False

        try:
            async with self._session.get(
                self._session_url(f"messages/{message_id}/media")
            ) as resp:
                if resp.status != 200:
                    return False
                data = await resp.read()

            # Write off the event loop; the payload may be large.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self._write_file, destination, data)
            return True
        except Exception as e:
            logger.error(f"Failed to download WhatsApp media: {e}")

        return False

    # ─── UNIF-G2: RoomCapableAdapter — pending OAuth wiring ──────────
    #
    # WhatsApp groups via WAHA expose group_id-as-chat_id, but
    # programmatic join / leave / member listing requires the WAHA
    # session to be paired with the user's WA account AND the bot
    # number to be already a member of the target group. Marking the
    # adapter ``RoomCapableAdapter`` keeps ``isinstance`` honest; real
    # join_room implementation lands when the WAHA group-management
    # endpoints are wired (separate task).

    async def join_room(self, room_id: str,
                        role: str = 'participant') -> bool:
        """Placeholder: logs that joining is unsupported and returns False."""
        logger.info(
            "WhatsApp.join_room: platform support pending — WAHA group-"
            "management endpoints not yet wired (room_id=%s, role=%s)",
            room_id, role)
        return False

    async def leave_room(self, room_id: str) -> bool:
        """Placeholder: logs that leaving is unsupported and returns False."""
        logger.info(
            "WhatsApp.leave_room: platform support pending (room_id=%s)",
            room_id)
        return False

    async def list_room_members(
        self, room_id: str,
    ) -> List[Dict[str, Any]]:
        """Placeholder: member listing is not wired; returns an empty list."""
        return []

489 

490 

def create_whatsapp_adapter(
    api_url: str = None,
    phone_number: str = None,
    account_id: str = "default",
    **kwargs
) -> WhatsAppAdapter:
    """
    Build a WhatsAppAdapter from plain keyword arguments.

    Args:
        api_url: WhatsApp Web API URL. When falsy, the WHATSAPP_API_URL
            environment variable is used, then "http://localhost:3000".
        phone_number: Phone number for the account.
        account_id: Account identifier for multi-account support.
        **kwargs: Extra entries merged into the config's ``extra`` dict.

    Returns:
        A WhatsAppAdapter constructed from the resulting ChannelConfig.
    """
    if not api_url:
        api_url = os.getenv("WHATSAPP_API_URL", "http://localhost:3000")

    # Named fields first, then caller-supplied extras on top.
    extra_options = {"phone_number": phone_number, "account_id": account_id}
    extra_options.update(kwargs)

    return WhatsAppAdapter(
        ChannelConfig(webhook_url=api_url, extra=extra_options)
    )

519 return WhatsAppAdapter(config)