Coverage for integrations / channels / whatsapp_adapter.py: 21.4%
220 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2WhatsApp Channel Adapter
4Implements WhatsApp messaging using whatsapp-web.js (via REST API).
5Supports QR pairing for authentication.
7Features:
8- Text messages
9- Media (images, videos, documents)
10- Group chats
11- Message reactions
12- Read receipts
13- QR code authentication
14"""
16from __future__ import annotations
18import asyncio
19import logging
20import os
21import base64
22from typing import Optional, List, Dict, Any, Callable
23from datetime import datetime
25try:
26 import aiohttp
27 HAS_AIOHTTP = True
28except ImportError:
29 HAS_AIOHTTP = False
31from .base import (
32 ChannelAdapter,
33 ChannelConfig,
34 ChannelStatus,
35 Message,
36 MessageType,
37 MediaAttachment,
38 SendResult,
39 ChannelConnectionError,
40 ChannelSendError,
41 ChannelRateLimitError,
42)
43from .room_capable import RoomCapableAdapter
45logger = logging.getLogger(__name__)
48class WhatsAppAdapter(ChannelAdapter, RoomCapableAdapter):
49 """
50 WhatsApp messaging adapter using whatsapp-web.js REST API.
52 Usage:
53 config = ChannelConfig(
54 webhook_url="http://localhost:3000",
55 extra={"phone_number": "+1234567890"}
56 )
57 adapter = WhatsAppAdapter(config)
58 adapter.on_message(my_handler)
59 await adapter.start()
60 """
62 def __init__(self, config: ChannelConfig):
63 if not HAS_AIOHTTP:
64 raise ImportError(
65 "aiohttp not installed. "
66 "Install with: pip install aiohttp"
67 )
69 super().__init__(config)
70 self._session: Optional[aiohttp.ClientSession] = None
71 self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
72 self._phone_number: Optional[str] = config.extra.get("phone_number")
73 self._account_id: str = config.extra.get("account_id", "default")
74 self._base_url: str = config.webhook_url or "http://localhost:3000"
75 self._qr_callback: Optional[Callable[[str], None]] = None
76 self._authenticated = False
78 @property
79 def name(self) -> str:
80 return "whatsapp"
82 def set_qr_callback(self, callback: Callable[[str], None]) -> None:
83 """Set callback for QR code display during authentication."""
84 self._qr_callback = callback
86 async def connect(self) -> bool:
87 """Connect to WhatsApp Web via REST API."""
88 try:
89 self._session = aiohttp.ClientSession()
91 # Check API health
92 async with self._session.get(f"{self._base_url}/api/health") as resp:
93 if resp.status != 200:
94 logger.error("WhatsApp API not available")
95 return False
97 # Initialize session
98 async with self._session.post(
99 f"{self._base_url}/api/sessions/{self._account_id}/start"
100 ) as resp:
101 if resp.status not in (200, 201):
102 logger.error(f"Failed to start WhatsApp session: {resp.status}")
103 return False
105 # Start WebSocket for events
106 asyncio.create_task(self._listen_events())
108 self.status = ChannelStatus.CONNECTING
109 logger.info(f"WhatsApp connecting for account: {self._account_id}")
111 # Wait for authentication or QR code
112 await self._wait_for_auth()
114 return self._authenticated
116 except aiohttp.ClientError as e:
117 logger.error(f"Failed to connect to WhatsApp API: {e}")
118 self.status = ChannelStatus.ERROR
119 return False
121 async def _wait_for_auth(self, timeout: int = 120) -> None:
122 """Wait for WhatsApp authentication."""
123 start_time = asyncio.get_event_loop().time()
125 while not self._authenticated:
126 if asyncio.get_event_loop().time() - start_time > timeout:
127 logger.error("WhatsApp authentication timeout")
128 self.status = ChannelStatus.ERROR
129 return
131 # Check session status
132 if self._session:
133 try:
134 async with self._session.get(
135 f"{self._base_url}/api/sessions/{self._account_id}/status"
136 ) as resp:
137 if resp.status == 200:
138 data = await resp.json()
139 if data.get("authenticated"):
140 self._authenticated = True
141 self.status = ChannelStatus.CONNECTED
142 logger.info("WhatsApp authenticated successfully")
143 return
144 except aiohttp.ClientError:
145 pass
147 await asyncio.sleep(2)
149 async def _listen_events(self) -> None:
150 """Listen for WhatsApp events via WebSocket."""
151 if not self._session:
152 return
154 try:
155 ws_url = self._base_url.replace("http", "ws")
156 self._ws = await self._session.ws_connect(
157 f"{ws_url}/ws/{self._account_id}"
158 )
160 async for msg in self._ws:
161 if msg.type == aiohttp.WSMsgType.TEXT:
162 await self._handle_event(msg.json())
163 elif msg.type == aiohttp.WSMsgType.ERROR:
164 logger.error(f"WhatsApp WebSocket error: {msg.data}")
165 break
167 except Exception as e:
168 logger.error(f"WhatsApp event listener error: {e}")
169 finally:
170 self._ws = None
172 async def _handle_event(self, event: Dict[str, Any]) -> None:
173 """Handle incoming WhatsApp events."""
174 event_type = event.get("type")
176 if event_type == "qr":
177 # QR code for authentication
178 qr_data = event.get("qr")
179 if qr_data and self._qr_callback:
180 self._qr_callback(qr_data)
181 logger.info("WhatsApp QR code received - scan to authenticate")
183 elif event_type == "authenticated":
184 self._authenticated = True
185 self.status = ChannelStatus.CONNECTED
186 logger.info("WhatsApp authenticated")
188 elif event_type == "message":
189 message = self._convert_message(event.get("data", {}))
190 await self._dispatch_message(message)
192 elif event_type == "disconnected":
193 self._authenticated = False
194 self.status = ChannelStatus.DISCONNECTED
195 logger.warning("WhatsApp disconnected")
197 def _convert_message(self, wa_message: Dict[str, Any]) -> Message:
198 """Convert WhatsApp message to unified Message format."""
199 chat = wa_message.get("chat", {})
200 sender = wa_message.get("sender", {})
202 # Check for media
203 media = []
204 if wa_message.get("hasMedia"):
205 media_type = wa_message.get("type", "document")
206 type_map = {
207 "image": MessageType.IMAGE,
208 "video": MessageType.VIDEO,
209 "audio": MessageType.AUDIO,
210 "ptt": MessageType.VOICE,
211 "document": MessageType.DOCUMENT,
212 "sticker": MessageType.STICKER,
213 }
214 media.append(MediaAttachment(
215 type=type_map.get(media_type, MessageType.DOCUMENT),
216 file_id=wa_message.get("mediaKey"),
217 mime_type=wa_message.get("mimetype"),
218 file_name=wa_message.get("filename"),
219 caption=wa_message.get("caption"),
220 ))
222 # Check for bot mention
223 is_mentioned = False
224 text = wa_message.get("body", "")
225 mentions = wa_message.get("mentionedIds", [])
226 if self._phone_number and self._phone_number in str(mentions):
227 is_mentioned = True
229 return Message(
230 id=wa_message.get("id", {}).get("_serialized", str(wa_message.get("id"))),
231 channel=self.name,
232 sender_id=sender.get("id", {}).get("_serialized", str(sender.get("id", ""))),
233 sender_name=sender.get("pushname") or sender.get("name"),
234 chat_id=chat.get("id", {}).get("_serialized", str(chat.get("id", ""))),
235 text=text,
236 media=media,
237 reply_to_id=wa_message.get("quotedMsgId"),
238 timestamp=datetime.fromtimestamp(wa_message.get("timestamp", 0)),
239 is_group=chat.get("isGroup", False),
240 is_bot_mentioned=is_mentioned,
241 raw=wa_message,
242 )
244 async def disconnect(self) -> None:
245 """Disconnect from WhatsApp."""
246 if self._ws:
247 await self._ws.close()
248 self._ws = None
250 if self._session:
251 # Stop WhatsApp session
252 try:
253 await self._session.post(
254 f"{self._base_url}/api/sessions/{self._account_id}/stop"
255 )
256 except Exception:
257 pass
258 await self._session.close()
259 self._session = None
261 self._authenticated = False
262 self.status = ChannelStatus.DISCONNECTED
263 logger.info("WhatsApp disconnected")
265 async def send_message(
266 self,
267 chat_id: str,
268 text: str,
269 reply_to: Optional[str] = None,
270 media: Optional[List[MediaAttachment]] = None,
271 buttons: Optional[List[Dict]] = None,
272 ) -> SendResult:
273 """Send a message to a WhatsApp chat."""
274 if not self._session or not self._authenticated:
275 return SendResult(success=False, error="Not connected")
277 try:
278 payload: Dict[str, Any] = {
279 "chatId": chat_id,
280 "text": text,
281 }
283 if reply_to:
284 payload["quotedMessageId"] = reply_to
286 # Handle media
287 if media and len(media) > 0:
288 first_media = media[0]
289 if first_media.file_path:
290 with open(first_media.file_path, "rb") as f:
291 payload["media"] = base64.b64encode(f.read()).decode()
292 payload["mimetype"] = first_media.mime_type
293 payload["filename"] = first_media.file_name
294 elif first_media.url:
295 payload["mediaUrl"] = first_media.url
297 # Handle buttons (as list message)
298 if buttons:
299 payload["buttons"] = [
300 {"body": btn["text"], "id": btn.get("callback_data", btn["text"])}
301 for btn in buttons
302 ]
304 async with self._session.post(
305 f"{self._base_url}/api/sessions/{self._account_id}/messages/send",
306 json=payload,
307 ) as resp:
308 if resp.status == 429:
309 data = await resp.json()
310 raise ChannelRateLimitError(retry_after=data.get("retryAfter", 60))
312 if resp.status not in (200, 201):
313 error_text = await resp.text()
314 return SendResult(success=False, error=error_text)
316 data = await resp.json()
317 return SendResult(
318 success=True,
319 message_id=data.get("messageId") or data.get("id"),
320 raw=data,
321 )
323 except ChannelRateLimitError:
324 raise
325 except Exception as e:
326 logger.error(f"Failed to send WhatsApp message: {e}")
327 return SendResult(success=False, error=str(e))
329 async def edit_message(
330 self,
331 chat_id: str,
332 message_id: str,
333 text: str,
334 buttons: Optional[List[Dict]] = None,
335 ) -> SendResult:
336 """Edit a WhatsApp message (limited support)."""
337 # WhatsApp has limited edit support
338 return SendResult(
339 success=False,
340 error="WhatsApp does not support message editing"
341 )
343 async def delete_message(self, chat_id: str, message_id: str) -> bool:
344 """Delete a WhatsApp message."""
345 if not self._session or not self._authenticated:
346 return False
348 try:
349 async with self._session.post(
350 f"{self._base_url}/api/sessions/{self._account_id}/messages/delete",
351 json={"chatId": chat_id, "messageId": message_id},
352 ) as resp:
353 return resp.status in (200, 204)
354 except Exception as e:
355 logger.error(f"Failed to delete WhatsApp message: {e}")
356 return False
358 async def send_typing(self, chat_id: str) -> None:
359 """Send typing indicator (composing)."""
360 if not self._session or not self._authenticated:
361 return
363 try:
364 await self._session.post(
365 f"{self._base_url}/api/sessions/{self._account_id}/chats/{chat_id}/composing"
366 )
367 except Exception:
368 pass
370 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
371 """Get information about a WhatsApp chat."""
372 if not self._session or not self._authenticated:
373 return None
375 try:
376 async with self._session.get(
377 f"{self._base_url}/api/sessions/{self._account_id}/chats/{chat_id}"
378 ) as resp:
379 if resp.status == 200:
380 return await resp.json()
381 except Exception as e:
382 logger.error(f"Failed to get WhatsApp chat info: {e}")
384 return None
386 async def send_reaction(
387 self,
388 chat_id: str,
389 message_id: str,
390 emoji: str,
391 ) -> bool:
392 """Send a reaction to a message."""
393 if not self._session or not self._authenticated:
394 return False
396 try:
397 async with self._session.post(
398 f"{self._base_url}/api/sessions/{self._account_id}/messages/react",
399 json={
400 "chatId": chat_id,
401 "messageId": message_id,
402 "emoji": emoji,
403 },
404 ) as resp:
405 return resp.status in (200, 201)
406 except Exception as e:
407 logger.error(f"Failed to send WhatsApp reaction: {e}")
408 return False
410 async def send_read_receipt(self, chat_id: str, message_id: str) -> bool:
411 """Send read receipt for a message."""
412 if not self._session or not self._authenticated:
413 return False
415 try:
416 async with self._session.post(
417 f"{self._base_url}/api/sessions/{self._account_id}/messages/read",
418 json={"chatId": chat_id, "messageId": message_id},
419 ) as resp:
420 return resp.status in (200, 204)
421 except Exception as e:
422 logger.error(f"Failed to send WhatsApp read receipt: {e}")
423 return False
425 async def get_qr_code(self) -> Optional[str]:
426 """Get current QR code for authentication."""
427 if not self._session:
428 return None
430 try:
431 async with self._session.get(
432 f"{self._base_url}/api/sessions/{self._account_id}/qr"
433 ) as resp:
434 if resp.status == 200:
435 data = await resp.json()
436 return data.get("qr")
437 except Exception as e:
438 logger.error(f"Failed to get WhatsApp QR code: {e}")
440 return None
442 async def download_media(self, message_id: str, destination: str) -> bool:
443 """Download media from a message."""
444 if not self._session or not self._authenticated:
445 return False
447 try:
448 async with self._session.get(
449 f"{self._base_url}/api/sessions/{self._account_id}/messages/{message_id}/media"
450 ) as resp:
451 if resp.status == 200:
452 data = await resp.read()
453 with open(destination, "wb") as f:
454 f.write(data)
455 return True
456 except Exception as e:
457 logger.error(f"Failed to download WhatsApp media: {e}")
459 return False
461 # ─── UNIF-G2: RoomCapableAdapter — pending OAuth wiring ──────────
462 #
463 # WhatsApp groups via WAHA expose group_id-as-chat_id, but
464 # programmatic join / leave / member listing requires the WAHA
465 # session to be paired with the user's WA account AND the bot
466 # number to be already a member of the target group. Marking the
467 # adapter ``RoomCapableAdapter`` keeps ``isinstance`` honest; real
468 # join_room implementation lands when the WAHA group-management
469 # endpoints are wired (separate task).
471 async def join_room(self, room_id: str,
472 role: str = 'participant') -> bool:
473 logger.info(
474 "WhatsApp.join_room: platform support pending — WAHA group-"
475 "management endpoints not yet wired (room_id=%s, role=%s)",
476 room_id, role)
477 return False
479 async def leave_room(self, room_id: str) -> bool:
480 logger.info(
481 "WhatsApp.leave_room: platform support pending (room_id=%s)",
482 room_id)
483 return False
485 async def list_room_members(
486 self, room_id: str,
487 ) -> List[Dict[str, Any]]:
488 return []
491def create_whatsapp_adapter(
492 api_url: str = None,
493 phone_number: str = None,
494 account_id: str = "default",
495 **kwargs
496) -> WhatsAppAdapter:
497 """
498 Factory function to create WhatsApp adapter.
500 Args:
501 api_url: WhatsApp Web API URL (or set WHATSAPP_API_URL env var)
502 phone_number: Phone number for the account
503 account_id: Account identifier for multi-account support
504 **kwargs: Additional config options
506 Returns:
507 Configured WhatsAppAdapter
508 """
509 api_url = api_url or os.getenv("WHATSAPP_API_URL", "http://localhost:3000")
511 config = ChannelConfig(
512 webhook_url=api_url,
513 extra={
514 "phone_number": phone_number,
515 "account_id": account_id,
516 **kwargs,
517 },
518 )
519 return WhatsAppAdapter(config)