Coverage for integrations / channels / signal_adapter.py: 30.8%

253 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Signal Channel Adapter 

3 

4Implements Signal messaging using signal-cli REST API. 

5Designed for Docker-compatible deployments. 

6 

7Features: 

8- signal-cli REST API integration 

9- Linked device support 

10- Group V2 support 

11- Attachments, reactions, typing indicators 

12 

13Requirements: 

14- signal-cli-rest-api running (https://github.com/bbernhard/signal-cli-rest-api) 

15- Linked Signal account 

16 

17Docker setup: 

18 docker run -d --name signal-api -p 8080:8080 \ 

19 -v /path/to/signal-cli-config:/home/.local/share/signal-cli \ 

20 -e MODE=native \ 

21 bbernhard/signal-cli-rest-api 

22""" 

23 

24from __future__ import annotations 

25 

26import asyncio 

27import logging 

28import os 

29import base64 

30import mimetypes 

31from typing import Optional, List, Dict, Any 

32from datetime import datetime 

33from pathlib import Path 

34 

35try: 

36 import aiohttp 

37 HAS_AIOHTTP = True 

38except ImportError: 

39 HAS_AIOHTTP = False 

40 

41from .base import ( 

42 ChannelAdapter, 

43 ChannelConfig, 

44 ChannelStatus, 

45 Message, 

46 MessageType, 

47 MediaAttachment, 

48 SendResult, 

49 ChannelConnectionError, 

50 ChannelSendError, 

51 ChannelRateLimitError, 

52) 

53 

54logger = logging.getLogger(__name__) 

55 

56 

class SignalAdapter(ChannelAdapter):
    """
    Signal messaging adapter using signal-cli REST API.

    Incoming messages are fetched by a background polling task started in
    ``connect()``; outgoing operations are plain HTTP calls made through a
    shared ``aiohttp.ClientSession``.

    Chat IDs starting with ``"group."`` are treated as group chats by the
    send/delete/typing/reaction/info methods; anything else is treated as a
    direct recipient.

    Usage:
        config = ChannelConfig(
            token="+1234567890",  # Your Signal phone number
            extra={
                "api_url": "http://localhost:8080",
            }
        )
        adapter = SignalAdapter(config)
        adapter.on_message(my_handler)
        await adapter.start()
    """

    # Prefix that marks a chat ID as a group chat.
    _GROUP_PREFIX = "group."

    def __init__(self, config: ChannelConfig):
        """
        Store connection settings; no network I/O happens here.

        Args:
            config: Channel configuration. ``config.token`` is the Signal
                phone number and ``config.extra["api_url"]`` the base URL of
                the signal-cli REST API (default ``http://localhost:8080``).

        Raises:
            ImportError: If aiohttp is not installed.
        """
        if not HAS_AIOHTTP:
            raise ImportError(
                "aiohttp not installed. "
                "Install with: pip install aiohttp"
            )

        super().__init__(config)
        self._phone_number = config.token  # Phone number as token
        extra = config.extra or {}
        # Strip trailing slashes so f"{api_url}/v1/..." never yields "//v1/...".
        self._api_url = (extra.get("api_url") or "http://localhost:8080").rstrip("/")
        self._session: Optional[aiohttp.ClientSession] = None
        self._poll_task: Optional[asyncio.Task] = None
        self._reconnect_delay = 5  # seconds
        self._max_reconnect_delay = 300  # 5 minutes max
        self._running = False

    @property
    def name(self) -> str:
        """Channel identifier used in ``Message.channel``."""
        return "signal"

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @classmethod
    def _is_group_chat(cls, chat_id: str) -> bool:
        """Return True when ``chat_id`` carries the group prefix."""
        return chat_id.startswith(cls._GROUP_PREFIX)

    @classmethod
    def _strip_group_prefix(cls, chat_id: str) -> str:
        """Return ``chat_id`` without its leading group prefix, if present."""
        if chat_id.startswith(cls._GROUP_PREFIX):
            return chat_id[len(cls._GROUP_PREFIX):]
        return chat_id

    @staticmethod
    def _account_numbers(accounts: Any) -> List[str]:
        """
        Extract phone numbers from a ``/v1/accounts`` response body.

        Entries are accepted either as plain strings or as dicts carrying a
        ``"number"`` key; anything else is ignored.
        """
        numbers: List[str] = []
        if not isinstance(accounts, list):
            return numbers
        for entry in accounts:
            if isinstance(entry, str):
                numbers.append(entry)
            elif isinstance(entry, dict) and entry.get("number"):
                numbers.append(entry["number"])
        return numbers

    async def _close_session(self) -> None:
        """Close and forget the current HTTP session, if any."""
        session, self._session = self._session, None
        if session is not None:
            await session.close()

    async def _stop_polling(self) -> None:
        """Cancel the background polling task, if any, and wait for it to end."""
        self._running = False
        task, self._poll_task = self._poll_task, None
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

    async def _abort_connect(self) -> None:
        """Release everything a failed ``connect()`` may have acquired."""
        await self._stop_polling()
        await self._close_session()
        self.status = ChannelStatus.ERROR

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------

    async def connect(self) -> bool:
        """
        Connect to signal-cli REST API and start polling for messages.

        Returns:
            True when the API answered ``/v1/about`` with 200 and polling was
            started; False otherwise. On failure the HTTP session is closed
            and the status is set to ERROR.
        """
        if not self._phone_number:
            logger.error("Signal phone number not provided")
            return False

        # A repeated connect() must not leak the previous task or session.
        await self._stop_polling()
        await self._close_session()

        try:
            self._session = aiohttp.ClientSession()

            # Verify API is available
            async with self._session.get(f"{self._api_url}/v1/about") as response:
                api_available = response.status == 200

            if not api_available:
                logger.error("Signal API not available")
                await self._abort_connect()
                return False

            # Verify phone number is registered (advisory only)
            async with self._session.get(f"{self._api_url}/v1/accounts") as response:
                if response.status == 200:
                    accounts = await response.json()
                    if self._phone_number not in self._account_numbers(accounts):
                        logger.warning(f"Phone number {self._phone_number} not found in registered accounts")
                        # Continue anyway - might be registered differently

            # Start polling for messages
            self._running = True
            self._poll_task = asyncio.create_task(self._poll_messages())

            self.status = ChannelStatus.CONNECTED
            logger.info(f"Connected to Signal as {self._phone_number}")
            return True

        except aiohttp.ClientError as e:
            logger.error(f"Failed to connect to Signal API: {e}")
        except Exception as e:
            logger.error(f"Signal connection error: {e}")

        await self._abort_connect()
        return False

    async def disconnect(self) -> None:
        """Stop polling, close the HTTP session and mark the channel disconnected."""
        await self._stop_polling()
        await self._close_session()

        self.status = ChannelStatus.DISCONNECTED
        logger.info("Disconnected from Signal")

    async def _handle_incoming(self, messages: Any) -> None:
        """
        Convert and dispatch one batch of received messages.

        Each message is processed independently so that one malformed entry
        or failing handler does not discard the rest of the batch.
        """
        for msg_data in messages or []:
            try:
                message = self._convert_message(msg_data)
                if message:
                    await self._dispatch_message(message)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                logger.error(f"Failed to process Signal message: {e}")

    async def _poll_messages(self) -> None:
        """
        Poll for new messages from Signal until stopped or cancelled.

        Connection errors and timeouts trigger a reconnect attempt with
        exponential backoff (capped at ``_max_reconnect_delay``).
        """
        reconnect_delay = self._reconnect_delay

        while self._running:
            try:
                async with self._session.get(
                    f"{self._api_url}/v1/receive/{self._phone_number}",
                    timeout=aiohttp.ClientTimeout(total=30),
                ) as response:
                    status = response.status
                    messages = await response.json() if status == 200 else None

                # The response is released before handlers run.
                if status == 200:
                    await self._handle_incoming(messages)
                elif status != 204:  # 204 means "no new messages"
                    logger.warning(f"Signal API returned {status}")

                if status in (200, 204):
                    # Reset reconnect delay on success
                    reconnect_delay = self._reconnect_delay

                # Small delay between polls
                await asyncio.sleep(1)

            except asyncio.CancelledError:
                break
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                logger.error(f"Signal polling error: {e}")
                self.status = ChannelStatus.ERROR

                # Exponential backoff for reconnection
                await asyncio.sleep(reconnect_delay)
                reconnect_delay = min(reconnect_delay * 2, self._max_reconnect_delay)

                # Try to reconnect
                await self._reconnect()
            except Exception as e:
                logger.error(f"Unexpected error in Signal polling: {e}")
                await asyncio.sleep(reconnect_delay)

    async def _reconnect(self) -> None:
        """Replace the HTTP session and probe ``/v1/about``; never raises on failure."""
        logger.info("Attempting to reconnect to Signal API...")

        # Install the new session before closing the old one so that
        # self._session never points at a closed session.
        old_session, self._session = self._session, aiohttp.ClientSession()
        if old_session:
            await old_session.close()

        try:
            async with self._session.get(f"{self._api_url}/v1/about") as response:
                if response.status == 200:
                    self.status = ChannelStatus.CONNECTED
                    logger.info("Reconnected to Signal API")
                else:
                    logger.warning(f"Signal API returned {response.status} during reconnect")
        except Exception as e:
            logger.error(f"Reconnection failed: {e}")

    # ------------------------------------------------------------------
    # Incoming message conversion
    # ------------------------------------------------------------------

    def _convert_message(self, msg_data: Dict[str, Any]) -> Optional[Message]:
        """
        Convert Signal message to unified Message format.

        Args:
            msg_data: One entry of the receive response; expected to hold an
                ``"envelope"`` dict.

        Returns:
            A ``Message``, or None when the envelope has no ``dataMessage``.
            Fields that are present but null are treated like missing fields.
        """
        envelope = msg_data.get("envelope") or {}

        # Skip non-data messages
        data_message = envelope.get("dataMessage")
        if not data_message:
            return None

        source = envelope.get("source") or ""
        source_name = envelope.get("sourceName") or source
        timestamp = envelope.get("timestamp") or 0

        # Determine chat ID (group or direct)
        # NOTE(review): the group chat ID produced here is the raw groupId,
        # while send_message() recognises groups by a "group." prefix -
        # confirm how callers are expected to reply to group messages.
        group_info = data_message.get("groupInfo") or {}
        is_group = bool(group_info)
        chat_id = group_info.get("groupId", "") if is_group else source

        # Process attachments
        media = [
            MediaAttachment(
                type=self._get_media_type(att.get("contentType")),
                file_id=att.get("id"),
                file_name=att.get("filename"),
                mime_type=att.get("contentType"),
                file_size=att.get("size"),
            )
            for att in data_message.get("attachments") or []
        ]

        # Check for mentions
        is_mentioned = any(
            m.get("number") == self._phone_number
            for m in data_message.get("mentions") or []
        )

        quote = data_message.get("quote")
        reply_to_id = str(quote.get("id")) if quote else None

        return Message(
            id=str(timestamp),
            channel=self.name,
            sender_id=source,
            sender_name=source_name,
            chat_id=chat_id,
            text=data_message.get("message") or "",
            media=media,
            reply_to_id=reply_to_id,
            # Timestamps are divided by 1000, i.e. treated as milliseconds.
            timestamp=datetime.fromtimestamp(timestamp / 1000) if timestamp else datetime.now(),
            is_group=is_group,
            is_bot_mentioned=is_mentioned,
            raw=msg_data,
        )

    def _get_media_type(self, content_type: Optional[str]) -> MessageType:
        """
        Get MessageType from MIME content type.

        ``image/*``, ``video/*`` and ``audio/*`` map to their own types;
        everything else (including a missing type) maps to DOCUMENT.
        """
        normalized = (content_type or "").lower()
        prefix_map = (
            ("image/", MessageType.IMAGE),
            ("video/", MessageType.VIDEO),
            ("audio/", MessageType.AUDIO),
        )
        for prefix, media_type in prefix_map:
            if normalized.startswith(prefix):
                return media_type
        return MessageType.DOCUMENT

    # ------------------------------------------------------------------
    # Outgoing operations
    # ------------------------------------------------------------------

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        media: Optional[List[MediaAttachment]] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """
        Send a message via Signal.

        Args:
            chat_id: Recipient identifier, or ``"group.<id>"`` for a group.
            text: Message body.
            reply_to: ID of the message to quote; must parse as an integer.
            media: Attachments to send; those that cannot be prepared are
                skipped.
            buttons: Accepted for interface compatibility; not used.

        Returns:
            A ``SendResult``; errors are reported through it, never raised.
        """
        if not self._session:
            return SendResult(success=False, error="Not connected")

        try:
            payload: Dict[str, Any] = {
                "message": text,
                "number": self._phone_number,
            }

            # Determine if group or direct message
            if self._is_group_chat(chat_id):
                payload["recipients"] = []
                # Group ID format: group.BASE64_ID
                payload["group_id"] = self._strip_group_prefix(chat_id)
            else:
                payload["recipients"] = [chat_id]

            # Handle quote/reply
            if reply_to:
                payload["quote_timestamp"] = int(reply_to)

            # Handle attachments
            if media:
                attachments = []
                for item in media:
                    att_data = await self._prepare_attachment(item)
                    if att_data:
                        attachments.append(att_data)
                if attachments:
                    payload["base64_attachments"] = attachments

            async with self._session.post(
                f"{self._api_url}/v2/send",
                json=payload,
                timeout=aiohttp.ClientTimeout(total=30),
            ) as response:
                if response.status in (200, 201):
                    result = await response.json()
                    return SendResult(
                        success=True,
                        message_id=str(result.get("timestamp", "")),
                        raw=result,
                    )

                error_text = await response.text()
                logger.error(f"Failed to send Signal message: {error_text}")
                return SendResult(success=False, error=error_text)

        except aiohttp.ClientError as e:
            logger.error(f"Signal send error: {e}")
            return SendResult(success=False, error=str(e))
        except Exception as e:
            logger.error(f"Unexpected error sending Signal message: {e}")
            return SendResult(success=False, error=str(e))

    async def _prepare_attachment(self, attachment: MediaAttachment) -> Optional[str]:
        """
        Prepare attachment for sending (base64 encode).

        Reads ``attachment.file_path`` if set, otherwise downloads
        ``attachment.url``.

        Returns:
            A ``data:<mime>;base64,<payload>`` string, or None when the
            content could not be obtained.
        """
        try:
            if attachment.file_path:
                path = Path(attachment.file_path)
                if path.exists():
                    # Read in a worker thread so a large file does not block
                    # the event loop.
                    loop = asyncio.get_running_loop()
                    content = await loop.run_in_executor(None, path.read_bytes)
                    mime_type = (
                        attachment.mime_type
                        or mimetypes.guess_type(str(path))[0]
                        or "application/octet-stream"
                    )
                    return f"data:{mime_type};base64,{base64.b64encode(content).decode()}"
                logger.warning(f"Attachment file not found: {path}")
            elif attachment.url:
                # Download from URL
                async with self._session.get(attachment.url) as response:
                    if response.status == 200:
                        content = await response.read()
                        mime_type = (
                            attachment.mime_type
                            or response.content_type
                            or "application/octet-stream"
                        )
                        return f"data:{mime_type};base64,{base64.b64encode(content).decode()}"
                    logger.warning(f"Attachment download returned {response.status}")
        except Exception as e:
            logger.error(f"Failed to prepare attachment: {e}")

        return None

    async def edit_message(
        self,
        chat_id: str,
        message_id: str,
        text: str,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """
        Emulate an edit by sending a new message prefixed with ``[Edit]``.

        ``message_id`` and ``buttons`` are accepted for interface
        compatibility and are not used.
        """
        return await self.send_message(
            chat_id=chat_id,
            text=f"[Edit] {text}",
        )

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """
        Delete a message (Signal remote delete).

        Returns:
            True when the API answered 200/201/204; False when not connected,
            on any other status, or on any error (including a ``message_id``
            that does not parse as an integer).
        """
        if not self._session:
            return False

        try:
            is_group = self._is_group_chat(chat_id)
            payload: Dict[str, Any] = {
                "number": self._phone_number,
                "recipients": [] if is_group else [chat_id],
                "target_timestamp": int(message_id),
            }

            if is_group:
                payload["group_id"] = self._strip_group_prefix(chat_id)

            async with self._session.post(
                f"{self._api_url}/v1/delete",
                json=payload,
            ) as response:
                return response.status in (200, 201, 204)

        except Exception as e:
            logger.error(f"Failed to delete Signal message: {e}")
            return False

    async def send_typing(self, chat_id: str) -> None:
        """Send typing indicator (best effort; failures are only logged at debug level)."""
        if not self._session:
            return

        try:
            payload: Dict[str, Any] = {
                "number": self._phone_number,
            }

            if self._is_group_chat(chat_id):
                payload["group_id"] = self._strip_group_prefix(chat_id)
            else:
                payload["recipient"] = chat_id

            # Use the response as a context manager so the connection is
            # released back to the pool.
            async with self._session.put(
                f"{self._api_url}/v1/typing-indicator/{self._phone_number}",
                json=payload,
            ) as response:
                if response.status >= 400:
                    logger.debug(f"Typing indicator rejected with status {response.status}")
        except Exception as e:
            logger.debug(f"Failed to send typing indicator: {e}")

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """
        Get information about a chat.

        Returns:
            For groups: ``{"id", "type": "group", "name", "members"}``.
            For direct chats: ``{"id", "type": "direct", "trust_level"}``.
            None when not connected, on a non-200 response, or on error.
        """
        if not self._session:
            return None

        try:
            if self._is_group_chat(chat_id):
                group_id = self._strip_group_prefix(chat_id)
                async with self._session.get(
                    f"{self._api_url}/v1/groups/{self._phone_number}/{group_id}"
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        return {
                            "id": chat_id,
                            "type": "group",
                            "name": data.get("name"),
                            "members": data.get("members", []),
                        }
            else:
                # Direct chat - return phone info
                async with self._session.get(
                    f"{self._api_url}/v1/identities/{self._phone_number}/{chat_id}"
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        return {
                            "id": chat_id,
                            "type": "direct",
                            "trust_level": data.get("trust_level"),
                        }

        except Exception as e:
            logger.error(f"Failed to get chat info: {e}")

        return None

    async def send_reaction(
        self,
        chat_id: str,
        message_id: str,
        emoji: str,
        remove: bool = False,
    ) -> bool:
        """
        Send a reaction to a message.

        Args:
            chat_id: Recipient identifier, or ``"group.<id>"`` for a group.
            message_id: ID of the target message; must parse as an integer.
            emoji: Reaction emoji.
            remove: When True, ask for the reaction to be removed.

        Returns:
            True when the API answered 200/201; False otherwise.
        """
        if not self._session:
            return False

        try:
            is_group = self._is_group_chat(chat_id)
            payload: Dict[str, Any] = {
                "number": self._phone_number,
                "reaction": {
                    "emoji": emoji,
                    # For group chats the author is not known from chat_id.
                    "target_author": "" if is_group else chat_id,
                    "target_timestamp": int(message_id),
                    "remove": remove,
                },
            }

            if is_group:
                payload["group_id"] = self._strip_group_prefix(chat_id)
            else:
                payload["recipients"] = [chat_id]

            async with self._session.post(
                f"{self._api_url}/v2/send",
                json=payload,
            ) as response:
                return response.status in (200, 201)

        except Exception as e:
            logger.error(f"Failed to send reaction: {e}")
            return False

    async def create_group(
        self,
        name: str,
        members: List[str],
        avatar_path: Optional[str] = None,
    ) -> Optional[str]:
        """
        Create a new Signal group (Group V2).

        Args:
            name: Group name.
            members: Identifiers of the initial members.
            avatar_path: Optional path to an avatar image; ignored when the
                file does not exist.

        Returns:
            The group chat ID in ``"group.<id>"`` form, or None when not
            connected, when the API did not answer 200/201, when no id was
            returned, or on error.
        """
        if not self._session:
            return None

        try:
            payload: Dict[str, Any] = {
                "name": name,
                "members": members,
            }

            if avatar_path and Path(avatar_path).exists():
                content = Path(avatar_path).read_bytes()
                payload["avatar"] = base64.b64encode(content).decode()

            async with self._session.post(
                f"{self._api_url}/v1/groups/{self._phone_number}",
                json=payload,
            ) as response:
                if response.status in (200, 201):
                    data = await response.json()
                    group_id = data.get("id") or ""
                    if not group_id:
                        logger.error("Failed to create group: no group id in response")
                        return None
                    # Avoid "group.group.<id>" when the id is already prefixed.
                    if self._is_group_chat(group_id):
                        return group_id
                    return f"{self._GROUP_PREFIX}{group_id}"

        except Exception as e:
            logger.error(f"Failed to create group: {e}")

        return None

    async def download_attachment(
        self,
        attachment_id: str,
        destination: str,
    ) -> bool:
        """
        Download an attachment from Signal.

        Args:
            attachment_id: ID of the attachment (``MediaAttachment.file_id``
                of a received message).
            destination: File path the content is written to.

        Returns:
            True when the file was written; False when not connected, on a
            non-200 response, or on error.
        """
        if not self._session:
            return False

        try:
            async with self._session.get(
                f"{self._api_url}/v1/attachments/{attachment_id}"
            ) as response:
                if response.status == 200:
                    content = await response.read()
                    Path(destination).write_bytes(content)
                    return True

        except Exception as e:
            logger.error(f"Failed to download attachment: {e}")

        return False

568 

569 

def create_signal_adapter(
    phone_number: Optional[str] = None,
    api_url: Optional[str] = None,
    **kwargs
) -> SignalAdapter:
    """
    Factory function to create Signal adapter.

    Args:
        phone_number: Signal phone number (or set SIGNAL_PHONE_NUMBER env var)
        api_url: signal-cli REST API URL (or set SIGNAL_API_URL env var;
            default: http://localhost:8080)
        **kwargs: Additional config options passed to ChannelConfig. An
            ``extra`` dict, if given, is merged into the config's ``extra``
            and its keys take precedence over ``api_url``.

    Returns:
        Configured SignalAdapter

    Raises:
        ValueError: If no phone number is given and SIGNAL_PHONE_NUMBER is
            not set.
    """
    phone_number = phone_number or os.getenv("SIGNAL_PHONE_NUMBER")
    if not phone_number:
        raise ValueError("Signal phone number required")

    api_url = api_url or os.getenv("SIGNAL_API_URL", "http://localhost:8080")

    # Work on a copy so "extra" can be pulled out without a second filter pass;
    # an explicit extra=None is treated like no extra at all.
    options = dict(kwargs)
    caller_extra = options.pop("extra", None) or {}

    config = ChannelConfig(
        token=phone_number,
        extra={"api_url": api_url, **caller_extra},
        **options,
    )
    return SignalAdapter(config)