Coverage for integrations / channels / signal_adapter.py: 30.8%
253 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Signal Channel Adapter

Implements Signal messaging using signal-cli REST API.
Designed for Docker-compatible deployments.

Features:
- signal-cli REST API integration
- Linked device support
- Group V2 support
- Attachments, reactions, typing indicators

Requirements:
- signal-cli-rest-api running (https://github.com/bbernhard/signal-cli-rest-api)
- Linked Signal account

Docker setup:
    docker run -d --name signal-api -p 8080:8080 \
        -v /path/to/signal-cli-config:/home/.local/share/signal-cli \
        -e MODE=native \
        bbernhard/signal-cli-rest-api
"""
24from __future__ import annotations
26import asyncio
27import logging
28import os
29import base64
30import mimetypes
31from typing import Optional, List, Dict, Any
32from datetime import datetime
33from pathlib import Path
35try:
36 import aiohttp
37 HAS_AIOHTTP = True
38except ImportError:
39 HAS_AIOHTTP = False
41from .base import (
42 ChannelAdapter,
43 ChannelConfig,
44 ChannelStatus,
45 Message,
46 MessageType,
47 MediaAttachment,
48 SendResult,
49 ChannelConnectionError,
50 ChannelSendError,
51 ChannelRateLimitError,
52)
54logger = logging.getLogger(__name__)
class SignalAdapter(ChannelAdapter):
    """
    Signal messaging adapter using signal-cli REST API.

    Incoming messages are fetched by a background task that polls the
    ``/v1/receive/<number>`` endpoint; outgoing operations are plain HTTP
    calls against the configured API URL.

    Chat IDs that start with ``group.`` are treated as groups by every
    outgoing method; anything else is treated as a direct recipient.

    Usage:
        config = ChannelConfig(
            token="+1234567890",  # Your Signal phone number
            extra={
                "api_url": "http://localhost:8080",
            }
        )
        adapter = SignalAdapter(config)
        adapter.on_message(my_handler)
        await adapter.start()
    """

    # Prefix that marks a chat_id as a group for all outgoing calls.
    _GROUP_PREFIX = "group."

    def __init__(self, config: ChannelConfig):
        """
        Store connection settings; no network I/O happens here.

        Args:
            config: Channel configuration. ``config.token`` is the Signal
                phone number and ``config.extra["api_url"]`` the base URL of
                the REST API (defaults to ``http://localhost:8080``).

        Raises:
            ImportError: If aiohttp is not installed.
        """
        if not HAS_AIOHTTP:
            raise ImportError(
                "aiohttp not installed. "
                "Install with: pip install aiohttp"
            )

        super().__init__(config)
        self._phone_number = config.token  # Phone number as token
        # Tolerate extra=None / api_url=None, and drop trailing slashes so the
        # f"{api_url}/v1/..." URLs built below never contain "//".
        extra = config.extra or {}
        self._api_url = str(
            extra.get("api_url") or "http://localhost:8080"
        ).rstrip("/")
        self._session: Optional[aiohttp.ClientSession] = None
        self._poll_task: Optional[asyncio.Task] = None
        self._reconnect_delay = 5  # seconds
        self._max_reconnect_delay = 300  # 5 minutes max
        self._running = False

    @property
    def name(self) -> str:
        """Channel identifier used in converted messages."""
        return "signal"

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------

    @classmethod
    def _is_group_chat(cls, chat_id: str) -> bool:
        """Return True when chat_id carries the group prefix."""
        return chat_id.startswith(cls._GROUP_PREFIX)

    @classmethod
    def _bare_group_id(cls, chat_id: str) -> str:
        """Return chat_id with the leading group prefix removed."""
        return chat_id[len(cls._GROUP_PREFIX):]

    @staticmethod
    def _account_numbers(accounts: Any) -> List[str]:
        """Extract phone numbers from an accounts payload.

        Accepts both a list of plain number strings and a list of objects
        with a "number" key, so either payload shape can be checked.
        """
        if not isinstance(accounts, list):
            return []
        numbers = []
        for account in accounts:
            if isinstance(account, str):
                numbers.append(account)
            elif isinstance(account, dict) and account.get("number"):
                numbers.append(account["number"])
        return numbers

    async def _close_session(self) -> None:
        """Close the HTTP session (if any) and forget it."""
        session, self._session = self._session, None
        if session is not None:
            await session.close()

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------

    async def connect(self) -> bool:
        """
        Connect to signal-cli REST API and start polling for messages.

        Returns:
            True when the API answered ``/v1/about`` with HTTP 200 and the
            polling task was started; False otherwise. On failure the HTTP
            session opened by this call is closed again.
        """
        if not self._phone_number:
            logger.error("Signal phone number not provided")
            return False

        # A repeated connect() must not leak the previous session or leave a
        # second poller running against the same account.
        if self._session is not None or self._poll_task is not None:
            await self.disconnect()

        try:
            self._session = aiohttp.ClientSession()

            # Verify API is available
            async with self._session.get(
                f"{self._api_url}/v1/about"
            ) as response:
                api_available = response.status == 200

            if not api_available:
                logger.error("Signal API not available")
                await self._close_session()
                self.status = ChannelStatus.ERROR
                return False

            # Verify phone number is registered
            async with self._session.get(
                f"{self._api_url}/v1/accounts"
            ) as response:
                if response.status == 200:
                    accounts = await response.json()
                    if self._phone_number not in self._account_numbers(accounts):
                        logger.warning(f"Phone number {self._phone_number} not found in registered accounts")
                        # Continue anyway - might be registered differently

            # Start polling for messages
            self._running = True
            self._poll_task = asyncio.create_task(self._poll_messages())

            self.status = ChannelStatus.CONNECTED
            logger.info(f"Connected to Signal as {self._phone_number}")
            return True

        except aiohttp.ClientError as e:
            logger.error(f"Failed to connect to Signal API: {e}")
        except Exception as e:
            logger.error(f"Signal connection error: {e}")

        # Only reached after an exception: release the half-open session.
        await self._close_session()
        self.status = ChannelStatus.ERROR
        return False

    async def disconnect(self) -> None:
        """Stop the polling task, close the HTTP session and mark the channel disconnected."""
        self._running = False

        task, self._poll_task = self._poll_task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        await self._close_session()

        self.status = ChannelStatus.DISCONNECTED
        logger.info("Disconnected from Signal")

    async def _poll_messages(self) -> None:
        """
        Poll for new messages from Signal until ``_running`` is cleared.

        Each successful poll resets the backoff delay. Client errors trigger
        an exponentially growing sleep (capped at ``_max_reconnect_delay``)
        followed by a reconnect attempt.
        """
        reconnect_delay = self._reconnect_delay
        receive_url = f"{self._api_url}/v1/receive/{self._phone_number}"

        while self._running:
            try:
                async with self._session.get(
                    receive_url,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    status = response.status
                    messages = await response.json() if status == 200 else []

                if status in (200, 204):
                    # 204 means "no new messages"; both count as a healthy poll.
                    reconnect_delay = self._reconnect_delay
                    if self.status == ChannelStatus.ERROR:
                        # Polling works again even if the reconnect probe
                        # failed earlier, so clear the stale error state.
                        self.status = ChannelStatus.CONNECTED
                    for msg_data in messages or []:
                        await self._handle_incoming(msg_data)
                else:
                    logger.warning(f"Signal API returned {status}")

                # Small delay between polls
                await asyncio.sleep(1)

            except asyncio.CancelledError:
                break
            except aiohttp.ClientError as e:
                logger.error(f"Signal polling error: {e}")
                self.status = ChannelStatus.ERROR

                # Exponential backoff for reconnection
                await asyncio.sleep(reconnect_delay)
                reconnect_delay = min(reconnect_delay * 2, self._max_reconnect_delay)

                # Try to reconnect
                await self._reconnect()
            except Exception as e:
                logger.error(f"Unexpected error in Signal polling: {e}")
                await asyncio.sleep(reconnect_delay)

    async def _handle_incoming(self, msg_data: Dict[str, Any]) -> None:
        """Convert and dispatch one received item.

        Failures are logged and contained so that one malformed payload or
        one failing handler does not drop the remaining messages of a batch
        that has already been fetched.
        """
        try:
            message = self._convert_message(msg_data)
            if message:
                await self._dispatch_message(message)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.error(f"Failed to process Signal message: {e}", exc_info=True)

    async def _reconnect(self) -> None:
        """Replace the HTTP session and probe ``/v1/about``; never raises on failure."""
        logger.info("Attempting to reconnect to Signal API...")

        try:
            await self._close_session()
        except asyncio.CancelledError:
            raise
        except Exception as e:
            # A failing close must not prevent a fresh session from being
            # created, otherwise the poll loop would have no session at all.
            logger.debug(f"Error closing stale Signal session: {e}")

        self._session = aiohttp.ClientSession()

        try:
            async with self._session.get(f"{self._api_url}/v1/about") as response:
                if response.status == 200:
                    self.status = ChannelStatus.CONNECTED
                    logger.info("Reconnected to Signal API")
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.error(f"Reconnection failed: {e}")

    # ------------------------------------------------------------------
    # Inbound conversion
    # ------------------------------------------------------------------

    def _convert_message(self, msg_data: Dict[str, Any]) -> Optional[Message]:
        """
        Convert Signal message to unified Message format.

        Args:
            msg_data: One item of the receive payload; expected to hold an
                "envelope" object.

        Returns:
            A Message, or None when the envelope carries no "dataMessage".
        """
        envelope = msg_data.get("envelope") or {}

        # Skip non-data messages
        data_message = envelope.get("dataMessage")
        if not data_message:
            return None

        # "or" fallbacks also cover keys that are present but null.
        source = envelope.get("source") or ""
        source_name = envelope.get("sourceName") or source
        timestamp = envelope.get("timestamp") or 0

        # Determine chat ID (group or direct)
        group_info = data_message.get("groupInfo") or {}
        is_group = bool(group_info)
        # NOTE(review): the group id is passed through as received, without the
        # "group." prefix that send_message()/delete_message()/... test for, so
        # replying to chat_id of a group message is treated as a direct send.
        # Confirm the expected id format against the REST API before changing.
        chat_id = group_info.get("groupId", "") if is_group else source

        # Process attachments
        media = [
            MediaAttachment(
                type=self._get_media_type(att.get("contentType") or ""),
                file_id=att.get("id"),
                file_name=att.get("filename"),
                mime_type=att.get("contentType"),
                file_size=att.get("size"),
            )
            for att in data_message.get("attachments") or []
        ]

        # Check for mentions
        is_mentioned = any(
            m.get("number") == self._phone_number
            for m in data_message.get("mentions") or []
        )

        quote = data_message.get("quote")
        reply_to_id = str(quote.get("id")) if quote else None

        return Message(
            id=str(timestamp),
            channel=self.name,
            sender_id=source,
            sender_name=source_name,
            chat_id=chat_id,
            text=data_message.get("message") or "",
            media=media,
            reply_to_id=reply_to_id,
            # Envelope timestamps are divided by 1000 before conversion,
            # i.e. they are handled as milliseconds.
            timestamp=datetime.fromtimestamp(timestamp / 1000) if timestamp else datetime.now(),
            is_group=is_group,
            is_bot_mentioned=is_mentioned,
            raw=msg_data,
        )

    def _get_media_type(self, content_type: str) -> MessageType:
        """
        Get MessageType from MIME content type.

        image/*, video/* and audio/* map to their own types; everything else
        (including an empty or missing content type) maps to DOCUMENT.
        """
        normalized = (content_type or "").lower()
        if normalized.startswith("image/"):
            return MessageType.IMAGE
        if normalized.startswith("video/"):
            return MessageType.VIDEO
        if normalized.startswith("audio/"):
            return MessageType.AUDIO
        return MessageType.DOCUMENT

    # ------------------------------------------------------------------
    # Outbound operations
    # ------------------------------------------------------------------

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        media: Optional[List[MediaAttachment]] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """
        Send a message via Signal.

        Args:
            chat_id: Recipient, or a "group."-prefixed group ID.
            text: Message body.
            reply_to: ID of the message to quote; must be convertible to int.
            media: Attachments to send (base64-encoded into the request).
            buttons: Accepted for interface compatibility; not used here.

        Returns:
            SendResult with success=True and the returned "timestamp" as
            message_id on HTTP 200/201; otherwise success=False with the
            error text. This method does not raise.
        """
        if not self._session:
            return SendResult(success=False, error="Not connected")

        try:
            payload = {
                "message": text,
                "number": self._phone_number,
            }

            # Determine if group or direct message
            if self._is_group_chat(chat_id):
                # Group ID format: group.BASE64_ID
                payload["recipients"] = []
                payload["group_id"] = self._bare_group_id(chat_id)
            else:
                payload["recipients"] = [chat_id]

            # Handle quote/reply
            if reply_to:
                payload["quote_timestamp"] = int(reply_to)

            # Handle attachments; ones that cannot be prepared are skipped.
            if media:
                attachments = []
                for m in media:
                    att_data = await self._prepare_attachment(m)
                    if att_data:
                        attachments.append(att_data)
                if attachments:
                    payload["base64_attachments"] = attachments

            async with self._session.post(
                f"{self._api_url}/v2/send",
                json=payload,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                if response.status in (200, 201):
                    result = await response.json()
                    return SendResult(
                        success=True,
                        message_id=str(result.get("timestamp", "")),
                        raw=result,
                    )

                error_text = await response.text()
                logger.error(f"Failed to send Signal message: {error_text}")
                return SendResult(success=False, error=error_text)

        except aiohttp.ClientError as e:
            logger.error(f"Signal send error: {e}")
            return SendResult(success=False, error=str(e))
        except Exception as e:
            logger.error(f"Unexpected error sending Signal message: {e}")
            return SendResult(success=False, error=str(e))

    async def _prepare_attachment(self, attachment: MediaAttachment) -> Optional[str]:
        """
        Prepare attachment for sending (base64 encode).

        Reads ``attachment.file_path`` when set, otherwise downloads
        ``attachment.url``.

        Returns:
            A ``data:<mime>;base64,<payload>`` string, or None when the file
            is missing, the download did not return HTTP 200, or an error
            occurred (errors are logged, not raised).
        """
        try:
            if attachment.file_path:
                path = Path(attachment.file_path)
                if path.exists():
                    content = path.read_bytes()
                    mime_type = attachment.mime_type or mimetypes.guess_type(str(path))[0] or "application/octet-stream"
                    return f"data:{mime_type};base64,{base64.b64encode(content).decode()}"
            elif attachment.url:
                # Download from URL
                async with self._session.get(attachment.url) as response:
                    if response.status == 200:
                        content = await response.read()
                        mime_type = attachment.mime_type or response.content_type or "application/octet-stream"
                        return f"data:{mime_type};base64,{base64.b64encode(content).decode()}"
        except Exception as e:
            logger.error(f"Failed to prepare attachment: {e}")

        return None

    async def edit_message(
        self,
        chat_id: str,
        message_id: str,
        text: str,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """
        Edit an existing message (Signal doesn't support this natively).

        A new message prefixed with "[Edit] " is sent instead; message_id and
        buttons are not used.
        """
        return await self.send_message(
            chat_id=chat_id,
            text=f"[Edit] {text}",
        )

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """
        Delete a message (Signal remote delete).

        Args:
            chat_id: Recipient, or a "group."-prefixed group ID.
            message_id: ID of the target message; must be convertible to int.

        Returns:
            True on HTTP 200/201/204; False when not connected or on any error.
        """
        if not self._session:
            return False

        try:
            is_group = self._is_group_chat(chat_id)
            payload = {
                "number": self._phone_number,
                "recipients": [] if is_group else [chat_id],
                "target_timestamp": int(message_id),
            }

            if is_group:
                payload["group_id"] = self._bare_group_id(chat_id)

            async with self._session.post(
                f"{self._api_url}/v1/delete",
                json=payload,
            ) as response:
                return response.status in (200, 201, 204)

        except Exception as e:
            logger.error(f"Failed to delete Signal message: {e}")
            return False

    async def send_typing(self, chat_id: str) -> None:
        """Send typing indicator (best effort; failures are logged at debug level)."""
        if not self._session:
            return

        try:
            payload = {
                "number": self._phone_number,
            }

            if self._is_group_chat(chat_id):
                payload["group_id"] = self._bare_group_id(chat_id)
            else:
                payload["recipient"] = chat_id

            # Use the response as a context manager so its connection is
            # released back to the pool; a bare `await session.put(...)`
            # leaves the response unreleased.
            async with self._session.put(
                f"{self._api_url}/v1/typing-indicator/{self._phone_number}",
                json=payload,
            ):
                pass
        except Exception as e:
            logger.debug(f"Failed to send typing indicator: {e}")

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """
        Get information about a chat.

        Returns:
            For groups: ``{"id", "type": "group", "name", "members"}``.
            For direct chats: ``{"id", "type": "direct", "trust_level"}``.
            None when not connected, on a non-200 response, or on error.
        """
        if not self._session:
            return None

        try:
            if self._is_group_chat(chat_id):
                group_id = self._bare_group_id(chat_id)
                async with self._session.get(
                    f"{self._api_url}/v1/groups/{self._phone_number}/{group_id}"
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        return {
                            "id": chat_id,
                            "type": "group",
                            "name": data.get("name"),
                            "members": data.get("members", []),
                        }
            else:
                # Direct chat - return phone info
                async with self._session.get(
                    f"{self._api_url}/v1/identities/{self._phone_number}/{chat_id}"
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        return {
                            "id": chat_id,
                            "type": "direct",
                            "trust_level": data.get("trust_level"),
                        }

        except Exception as e:
            logger.error(f"Failed to get chat info: {e}")

        return None

    async def send_reaction(
        self,
        chat_id: str,
        message_id: str,
        emoji: str,
        remove: bool = False,
    ) -> bool:
        """
        Send a reaction to a message.

        Args:
            chat_id: Recipient, or a "group."-prefixed group ID. For direct
                chats it is also used as the reaction's target author; for
                groups the target author is sent as an empty string.
            message_id: ID of the target message; must be convertible to int.
            emoji: Reaction emoji.
            remove: Sent as the reaction's "remove" flag.

        Returns:
            True on HTTP 200/201; False when not connected or on any error.
        """
        if not self._session:
            return False

        try:
            is_group = self._is_group_chat(chat_id)
            payload = {
                "number": self._phone_number,
                "reaction": {
                    "emoji": emoji,
                    "target_author": "" if is_group else chat_id,
                    "target_timestamp": int(message_id),
                    "remove": remove,
                },
            }

            if is_group:
                payload["group_id"] = self._bare_group_id(chat_id)
            else:
                payload["recipients"] = [chat_id]

            async with self._session.post(
                f"{self._api_url}/v2/send",
                json=payload,
            ) as response:
                return response.status in (200, 201)

        except Exception as e:
            logger.error(f"Failed to send reaction: {e}")
            return False

    async def create_group(
        self,
        name: str,
        members: List[str],
        avatar_path: Optional[str] = None,
    ) -> Optional[str]:
        """
        Create a new Signal group (Group V2).

        Args:
            name: Group name.
            members: Members to add.
            avatar_path: Optional image file; sent base64-encoded when it exists.

        Returns:
            The new group's chat ID, always carrying exactly one "group."
            prefix, or None when not connected, when the API did not return
            an ID, or on error.
        """
        if not self._session:
            return None

        try:
            payload = {
                "name": name,
                "members": members,
            }

            if avatar_path and Path(avatar_path).exists():
                content = Path(avatar_path).read_bytes()
                payload["avatar"] = base64.b64encode(content).decode()

            async with self._session.post(
                f"{self._api_url}/v1/groups/{self._phone_number}",
                json=payload,
            ) as response:
                if response.status in (200, 201):
                    data = await response.json()
                    group_id = str(data.get("id") or "")
                    if not group_id:
                        # A bare "group." would be an unusable chat ID.
                        logger.error("Failed to create group: no group id in response")
                        return None
                    # Avoid "group.group.<id>" when the API already returns a
                    # prefixed ID.
                    if self._is_group_chat(group_id):
                        return group_id
                    return f"{self._GROUP_PREFIX}{group_id}"

        except Exception as e:
            logger.error(f"Failed to create group: {e}")

        return None

    async def download_attachment(
        self,
        attachment_id: str,
        destination: str,
    ) -> bool:
        """
        Download an attachment from Signal.

        Args:
            attachment_id: Attachment ID as reported on a received message.
            destination: File path the attachment bytes are written to.

        Returns:
            True when the attachment was fetched (HTTP 200) and written;
            False when not connected, on another status, or on error.
        """
        if not self._session:
            return False

        try:
            async with self._session.get(
                f"{self._api_url}/v1/attachments/{attachment_id}"
            ) as response:
                if response.status == 200:
                    content = await response.read()
                    Path(destination).write_bytes(content)
                    return True

        except Exception as e:
            logger.error(f"Failed to download attachment: {e}")

        return False
def create_signal_adapter(
    phone_number: Optional[str] = None,
    api_url: Optional[str] = None,
    **kwargs
) -> SignalAdapter:
    """
    Factory function to create Signal adapter.

    Args:
        phone_number: Signal phone number (or set SIGNAL_PHONE_NUMBER env var)
        api_url: signal-cli REST API URL (or set SIGNAL_API_URL env var;
            default: http://localhost:8080)
        **kwargs: Additional config options passed to ChannelConfig. An
            "extra" dict is merged into the config's extra settings and
            takes precedence over api_url for the "api_url" key.

    Returns:
        Configured SignalAdapter

    Raises:
        ValueError: If no phone number is given and SIGNAL_PHONE_NUMBER is
            unset or empty.
    """
    phone_number = phone_number or os.getenv("SIGNAL_PHONE_NUMBER")
    if not phone_number:
        raise ValueError("Signal phone number required")

    # Chained "or" so an empty SIGNAL_API_URL also falls back to the default.
    api_url = api_url or os.getenv("SIGNAL_API_URL") or "http://localhost:8080"

    # Work on a copy so the caller's kwargs are untouched; "or {}" tolerates
    # an explicit extra=None, which would otherwise fail on ** unpacking.
    options = dict(kwargs)
    extra = {"api_url": api_url, **(options.pop("extra", None) or {})}

    config = ChannelConfig(
        token=phone_number,
        extra=extra,
        **options,
    )
    return SignalAdapter(config)