Coverage for integrations / channels / google_chat_adapter.py: 40.6%

219 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Google Chat Channel Adapter 

3 

4Implements Google Chat messaging using webhooks and Chat API. 

5Designed for Docker-compatible deployments. 

6 

7Features: 

8- Webhook-based integration (incoming) 

9- Chat API for outgoing messages 

10- Card messages with rich formatting 

11- Slash commands 

12- Thread support 

13- Spaces (rooms) support 

14 

15Requirements: 

16- Google Cloud project with Chat API enabled 

17- Service account or OAuth credentials 

18- Webhook URL for incoming messages (optional) 

19 

20Two modes: 

211. Webhook-only: Receive messages via webhook, respond inline 

222. Full API: Use Chat API for two-way communication 

23""" 

24 

25from __future__ import annotations 

26 

27import asyncio 

28import logging 

29import os 

30import json 

31from typing import Optional, List, Dict, Any, Callable 

32from datetime import datetime 

33 

34try: 

35 import aiohttp 

36 HAS_AIOHTTP = True 

37except ImportError: 

38 HAS_AIOHTTP = False 

39 

40try: 

41 from google.oauth2 import service_account 

42 from googleapiclient.discovery import build 

43 HAS_GOOGLE = True 

44except ImportError: 

45 HAS_GOOGLE = False 

46 

47from .base import ( 

48 ChannelAdapter, 

49 ChannelConfig, 

50 ChannelStatus, 

51 Message, 

52 MessageType, 

53 MediaAttachment, 

54 SendResult, 

55 ChannelConnectionError, 

56 ChannelSendError, 

57 ChannelRateLimitError, 

58) 

59 

60logger = logging.getLogger(__name__) 

61 

62 

63class GoogleChatAdapter(ChannelAdapter): 

64 """ 

65 Google Chat adapter supporting webhooks and Chat API. 

66 

67 Usage (Webhook mode): 

68 config = ChannelConfig( 

69 webhook_url="https://chat.googleapis.com/v1/spaces/XXX/messages?key=YYY&token=ZZZ" 

70 ) 

71 adapter = GoogleChatAdapter(config) 

72 adapter.on_message(my_handler) 

73 await adapter.start() 

74 

75 Usage (Full API mode): 

76 config = ChannelConfig( 

77 extra={ 

78 "service_account_file": "/path/to/credentials.json", 

79 "scopes": ["https://www.googleapis.com/auth/chat.bot"], 

80 } 

81 ) 

82 adapter = GoogleChatAdapter(config) 

83 adapter.on_message(my_handler) 

84 await adapter.start() 

85 """ 

86 

87 def __init__(self, config: ChannelConfig): 

88 if not HAS_AIOHTTP: 

89 raise ImportError( 

90 "aiohttp not installed. " 

91 "Install with: pip install aiohttp" 

92 ) 

93 

94 super().__init__(config) 

95 self._webhook_url = config.webhook_url 

96 self._session: Optional[aiohttp.ClientSession] = None 

97 self._chat_service = None 

98 self._bot_id: Optional[str] = None 

99 self._slash_commands: Dict[str, Callable] = {} 

100 self._use_api = config.extra.get("service_account_file") is not None 

101 

102 @property 

103 def name(self) -> str: 

104 return "google_chat" 

105 

106 async def connect(self) -> bool: 

107 """Connect to Google Chat.""" 

108 try: 

109 self._session = aiohttp.ClientSession() 

110 

111 # Initialize Google Chat API if credentials provided 

112 if self._use_api: 

113 if not HAS_GOOGLE: 

114 raise ImportError( 

115 "Google API client not installed. " 

116 "Install with: pip install google-api-python-client google-auth" 

117 ) 

118 

119 sa_file = self.config.extra.get("service_account_file") 

120 scopes = self.config.extra.get("scopes", [ 

121 "https://www.googleapis.com/auth/chat.bot", 

122 "https://www.googleapis.com/auth/chat.messages", 

123 "https://www.googleapis.com/auth/chat.messages.create", 

124 ]) 

125 

126 credentials = service_account.Credentials.from_service_account_file( 

127 sa_file, scopes=scopes 

128 ) 

129 

130 self._chat_service = build("chat", "v1", credentials=credentials) 

131 logger.info("Connected to Google Chat API with service account") 

132 

133 elif self._webhook_url: 

134 # Verify webhook URL is valid 

135 logger.info("Using webhook-only mode for Google Chat") 

136 

137 else: 

138 logger.error("No webhook URL or service account provided") 

139 return False 

140 

141 self.status = ChannelStatus.CONNECTED 

142 return True 

143 

144 except Exception as e: 

145 logger.error(f"Failed to connect to Google Chat: {e}") 

146 self.status = ChannelStatus.ERROR 

147 return False 

148 

149 async def disconnect(self) -> None: 

150 """Disconnect from Google Chat.""" 

151 if self._session: 

152 await self._session.close() 

153 self._session = None 

154 

155 self._chat_service = None 

156 self.status = ChannelStatus.DISCONNECTED 

157 logger.info("Disconnected from Google Chat") 

158 

159 def register_slash_command( 

160 self, 

161 command: str, 

162 handler: Callable, 

163 description: str = "", 

164 ) -> None: 

165 """ 

166 Register a slash command handler. 

167 

168 Args: 

169 command: Command name (without /) 

170 handler: Async function to handle the command 

171 description: Command description 

172 """ 

173 self._slash_commands[command] = { 

174 "handler": handler, 

175 "description": description, 

176 } 

177 

178 async def handle_webhook(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: 

179 """ 

180 Handle incoming webhook from Google Chat. 

181 

182 This should be called from your webhook endpoint. 

183 

184 Args: 

185 data: Webhook payload from Google Chat 

186 

187 Returns: 

188 Response to send back (optional) 

189 """ 

190 event_type = data.get("type", "") 

191 message_data = data.get("message", {}) 

192 

193 if event_type == "ADDED_TO_SPACE": 

194 # Bot added to space 

195 space = data.get("space", {}) 

196 logger.info(f"Added to space: {space.get('displayName', space.get('name'))}") 

197 

198 # Return welcome message 

199 return { 

200 "text": "Thanks for adding me! I'm ready to help." 

201 } 

202 

203 elif event_type == "REMOVED_FROM_SPACE": 

204 # Bot removed from space 

205 space = data.get("space", {}) 

206 logger.info(f"Removed from space: {space.get('displayName', space.get('name'))}") 

207 return None 

208 

209 elif event_type == "MESSAGE": 

210 # Handle incoming message 

211 message = self._convert_message(data) 

212 if message: 

213 # Check for slash commands 

214 slash_command = message_data.get("slashCommand") 

215 if slash_command: 

216 command_id = slash_command.get("commandId") 

217 # Find command by ID (requires matching configuration) 

218 for cmd_name, cmd_info in self._slash_commands.items(): 

219 if str(command_id) == cmd_name or cmd_name in message.text: 

220 handler = cmd_info["handler"] 

221 result = handler(message) 

222 if asyncio.iscoroutine(result): 

223 result = await result 

224 if isinstance(result, dict): 

225 return result 

226 elif isinstance(result, str): 

227 return {"text": result} 

228 

229 # Dispatch to registered handlers 

230 await self._dispatch_message(message) 

231 

232 return None 

233 

234 elif event_type == "CARD_CLICKED": 

235 # Handle card button click 

236 action = data.get("action", {}) 

237 action_name = action.get("actionMethodName", "") 

238 parameters = { 

239 p.get("key"): p.get("value") 

240 for p in action.get("parameters", []) 

241 } 

242 

243 # Create message-like event 

244 message = Message( 

245 id=data.get("eventTime", str(datetime.now().timestamp())), 

246 channel=self.name, 

247 sender_id=data.get("user", {}).get("name", ""), 

248 sender_name=data.get("user", {}).get("displayName", ""), 

249 chat_id=data.get("space", {}).get("name", ""), 

250 text=f"[button:{action_name}]", 

251 raw={ 

252 "action": action_name, 

253 "parameters": parameters, 

254 "card_clicked": True, 

255 }, 

256 ) 

257 await self._dispatch_message(message) 

258 

259 return None 

260 

261 return None 

262 

263 def _convert_message(self, data: Dict[str, Any]) -> Optional[Message]: 

264 """Convert Google Chat message to unified Message format.""" 

265 message_data = data.get("message", {}) 

266 

267 if not message_data: 

268 return None 

269 

270 sender = message_data.get("sender", {}) 

271 space = data.get("space", {}) 

272 thread = message_data.get("thread", {}) 

273 

274 # Determine if group/space or DM 

275 space_type = space.get("type", "") 

276 is_group = space_type in ("ROOM", "SPACE") 

277 

278 # Process attachments 

279 media = [] 

280 for attachment in message_data.get("attachment", []): 

281 att_data = attachment.get("attachmentDataRef", {}) 

282 media.append(MediaAttachment( 

283 type=MessageType.DOCUMENT, 

284 file_id=att_data.get("resourceName"), 

285 file_name=attachment.get("contentName"), 

286 mime_type=attachment.get("contentType"), 

287 )) 

288 

289 # Check for mentions 

290 annotations = message_data.get("annotations", []) 

291 is_mentioned = any( 

292 ann.get("type") == "USER_MENTION" and 

293 ann.get("userMention", {}).get("type") == "BOT" 

294 for ann in annotations 

295 ) 

296 

297 return Message( 

298 id=message_data.get("name", ""), 

299 channel=self.name, 

300 sender_id=sender.get("name", ""), 

301 sender_name=sender.get("displayName", ""), 

302 chat_id=space.get("name", ""), 

303 text=message_data.get("text", "") or message_data.get("argumentText", ""), 

304 media=media, 

305 reply_to_id=thread.get("name") if thread else None, 

306 timestamp=datetime.now(), # Google Chat doesn't include timestamp in webhook 

307 is_group=is_group, 

308 is_bot_mentioned=is_mentioned, 

309 raw={ 

310 "space": space, 

311 "thread": thread, 

312 "sender": sender, 

313 }, 

314 ) 

315 

316 async def send_message( 

317 self, 

318 chat_id: str, 

319 text: str, 

320 reply_to: Optional[str] = None, 

321 media: Optional[List[MediaAttachment]] = None, 

322 buttons: Optional[List[Dict]] = None, 

323 ) -> SendResult: 

324 """Send a message to Google Chat.""" 

325 try: 

326 # Build message payload 

327 message = {"text": text} 

328 

329 # Add thread for reply 

330 if reply_to: 

331 message["thread"] = {"name": reply_to} 

332 

333 # Add cards for buttons 

334 if buttons: 

335 message["cards"] = [self._build_card(text, buttons)] 

336 # Text is in card, don't duplicate 

337 del message["text"] 

338 

339 # Use API if available 

340 if self._chat_service: 

341 return await self._send_via_api(chat_id, message, reply_to) 

342 elif self._webhook_url: 

343 return await self._send_via_webhook(message) 

344 else: 

345 return SendResult(success=False, error="No sending method configured") 

346 

347 except Exception as e: 

348 logger.error(f"Failed to send Google Chat message: {e}") 

349 return SendResult(success=False, error=str(e)) 

350 

351 async def _send_via_api( 

352 self, 

353 space_name: str, 

354 message: Dict[str, Any], 

355 thread_key: Optional[str] = None, 

356 ) -> SendResult: 

357 """Send message using Chat API.""" 

358 try: 

359 # Run in executor since googleapiclient is synchronous 

360 loop = asyncio.get_event_loop() 

361 

362 request = self._chat_service.spaces().messages().create( 

363 parent=space_name, 

364 body=message, 

365 messageReplyOption="REPLY_MESSAGE_FALLBACK_TO_NEW_THREAD" if thread_key else None, 

366 ) 

367 

368 result = await loop.run_in_executor(None, request.execute) 

369 

370 return SendResult( 

371 success=True, 

372 message_id=result.get("name", ""), 

373 raw=result, 

374 ) 

375 

376 except Exception as e: 

377 logger.error(f"Chat API error: {e}") 

378 return SendResult(success=False, error=str(e)) 

379 

380 async def _send_via_webhook(self, message: Dict[str, Any]) -> SendResult: 

381 """Send message via webhook.""" 

382 try: 

383 async with self._session.post( 

384 self._webhook_url, 

385 json=message, 

386 headers={"Content-Type": "application/json"}, 

387 ) as response: 

388 if response.status in (200, 201): 

389 data = await response.json() 

390 return SendResult( 

391 success=True, 

392 message_id=data.get("name", ""), 

393 raw=data, 

394 ) 

395 else: 

396 error_text = await response.text() 

397 logger.error(f"Webhook error: {error_text}") 

398 return SendResult(success=False, error=error_text) 

399 

400 except Exception as e: 

401 logger.error(f"Webhook send error: {e}") 

402 return SendResult(success=False, error=str(e)) 

403 

404 def _build_card( 

405 self, 

406 text: str, 

407 buttons: List[Dict], 

408 header: Optional[Dict] = None, 

409 ) -> Dict[str, Any]: 

410 """Build a Google Chat card message.""" 

411 card = { 

412 "sections": [ 

413 { 

414 "widgets": [ 

415 {"textParagraph": {"text": text}}, 

416 ] 

417 } 

418 ] 

419 } 

420 

421 # Add header if provided 

422 if header: 

423 card["header"] = { 

424 "title": header.get("title", ""), 

425 "subtitle": header.get("subtitle"), 

426 "imageUrl": header.get("image_url"), 

427 } 

428 

429 # Add buttons 

430 button_list = [] 

431 for btn in buttons: 

432 if btn.get("url"): 

433 button_list.append({ 

434 "textButton": { 

435 "text": btn["text"], 

436 "onClick": { 

437 "openLink": {"url": btn["url"]} 

438 } 

439 } 

440 }) 

441 else: 

442 button_list.append({ 

443 "textButton": { 

444 "text": btn["text"], 

445 "onClick": { 

446 "action": { 

447 "actionMethodName": btn.get("callback_data", btn["text"]), 

448 "parameters": btn.get("parameters", []), 

449 } 

450 } 

451 } 

452 }) 

453 

454 if button_list: 

455 card["sections"][0]["widgets"].append({ 

456 "buttons": button_list 

457 }) 

458 

459 return card 

460 

461 def build_card_v2( 

462 self, 

463 title: str, 

464 sections: List[Dict[str, Any]], 

465 ) -> Dict[str, Any]: 

466 """ 

467 Build a Cards v2 format message. 

468 

469 Args: 

470 title: Card title 

471 sections: List of section definitions 

472 

473 Returns: 

474 Card message payload 

475 

476 Example section: 

477 { 

478 "header": "Section Header", 

479 "widgets": [ 

480 {"text": "Some text"}, 

481 {"image": {"url": "https://..."}}, 

482 {"buttons": [{"text": "Click", "url": "https://..."}]}, 

483 ] 

484 } 

485 """ 

486 card_sections = [] 

487 

488 for section in sections: 

489 widgets = [] 

490 

491 for widget in section.get("widgets", []): 

492 if "text" in widget: 

493 widgets.append({ 

494 "decoratedText": { 

495 "text": widget["text"], 

496 "wrapText": True, 

497 } 

498 }) 

499 elif "image" in widget: 

500 widgets.append({ 

501 "image": { 

502 "imageUrl": widget["image"]["url"], 

503 "altText": widget["image"].get("alt", ""), 

504 } 

505 }) 

506 elif "buttons" in widget: 

507 button_list = [] 

508 for btn in widget["buttons"]: 

509 if btn.get("url"): 

510 button_list.append({ 

511 "text": btn["text"], 

512 "onClick": {"openLink": {"url": btn["url"]}}, 

513 }) 

514 else: 

515 button_list.append({ 

516 "text": btn["text"], 

517 "onClick": { 

518 "action": { 

519 "function": btn.get("action", btn["text"]), 

520 "parameters": btn.get("parameters", []), 

521 } 

522 }, 

523 }) 

524 widgets.append({"buttonList": {"buttons": button_list}}) 

525 

526 card_sections.append({ 

527 "header": section.get("header"), 

528 "widgets": widgets, 

529 }) 

530 

531 return { 

532 "cardsV2": [ 

533 { 

534 "cardId": "main", 

535 "card": { 

536 "header": {"title": title}, 

537 "sections": card_sections, 

538 } 

539 } 

540 ] 

541 } 

542 

543 async def edit_message( 

544 self, 

545 chat_id: str, 

546 message_id: str, 

547 text: str, 

548 buttons: Optional[List[Dict]] = None, 

549 ) -> SendResult: 

550 """Edit an existing message.""" 

551 if not self._chat_service: 

552 return SendResult(success=False, error="API mode required for editing") 

553 

554 try: 

555 message = {"text": text} 

556 

557 if buttons: 

558 message["cards"] = [self._build_card(text, buttons)] 

559 del message["text"] 

560 

561 loop = asyncio.get_event_loop() 

562 

563 request = self._chat_service.spaces().messages().update( 

564 name=message_id, 

565 updateMask="text,cards", 

566 body=message, 

567 ) 

568 

569 result = await loop.run_in_executor(None, request.execute) 

570 

571 return SendResult( 

572 success=True, 

573 message_id=result.get("name", ""), 

574 raw=result, 

575 ) 

576 

577 except Exception as e: 

578 logger.error(f"Failed to edit message: {e}") 

579 return SendResult(success=False, error=str(e)) 

580 

581 async def delete_message(self, chat_id: str, message_id: str) -> bool: 

582 """Delete a message.""" 

583 if not self._chat_service: 

584 return False 

585 

586 try: 

587 loop = asyncio.get_event_loop() 

588 

589 request = self._chat_service.spaces().messages().delete( 

590 name=message_id 

591 ) 

592 

593 await loop.run_in_executor(None, request.execute) 

594 return True 

595 

596 except Exception as e: 

597 logger.error(f"Failed to delete message: {e}") 

598 return False 

599 

600 async def send_typing(self, chat_id: str) -> None: 

601 """Send typing indicator (not supported in Google Chat).""" 

602 # Google Chat doesn't have typing indicators 

603 pass 

604 

605 async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]: 

606 """Get information about a space.""" 

607 if not self._chat_service: 

608 return None 

609 

610 try: 

611 loop = asyncio.get_event_loop() 

612 

613 request = self._chat_service.spaces().get(name=chat_id) 

614 result = await loop.run_in_executor(None, request.execute) 

615 

616 return { 

617 "id": result.get("name"), 

618 "type": result.get("type"), 

619 "display_name": result.get("displayName"), 

620 "single_user_bot_dm": result.get("singleUserBotDm"), 

621 } 

622 

623 except Exception as e: 

624 logger.error(f"Failed to get space info: {e}") 

625 return None 

626 

627 async def list_spaces(self) -> List[Dict[str, Any]]: 

628 """List spaces the bot is a member of.""" 

629 if not self._chat_service: 

630 return [] 

631 

632 try: 

633 loop = asyncio.get_event_loop() 

634 

635 request = self._chat_service.spaces().list() 

636 result = await loop.run_in_executor(None, request.execute) 

637 

638 return result.get("spaces", []) 

639 

640 except Exception as e: 

641 logger.error(f"Failed to list spaces: {e}") 

642 return [] 

643 

644 async def create_space( 

645 self, 

646 name: str, 

647 external_user_allowed: bool = False, 

648 ) -> Optional[str]: 

649 """Create a new space.""" 

650 if not self._chat_service: 

651 return None 

652 

653 try: 

654 loop = asyncio.get_event_loop() 

655 

656 request = self._chat_service.spaces().create( 

657 body={ 

658 "displayName": name, 

659 "spaceType": "SPACE", 

660 "externalUserAllowed": external_user_allowed, 

661 } 

662 ) 

663 

664 result = await loop.run_in_executor(None, request.execute) 

665 return result.get("name") 

666 

667 except Exception as e: 

668 logger.error(f"Failed to create space: {e}") 

669 return None 

670 

671 

672def create_google_chat_adapter( 

673 webhook_url: str = None, 

674 service_account_file: str = None, 

675 **kwargs 

676) -> GoogleChatAdapter: 

677 """ 

678 Factory function to create Google Chat adapter. 

679 

680 Args: 

681 webhook_url: Webhook URL for incoming messages (or set GOOGLE_CHAT_WEBHOOK env var) 

682 service_account_file: Path to service account JSON (or set GOOGLE_CHAT_SA_FILE env var) 

683 **kwargs: Additional config options 

684 

685 Returns: 

686 Configured GoogleChatAdapter 

687 """ 

688 webhook_url = webhook_url or os.getenv("GOOGLE_CHAT_WEBHOOK") 

689 service_account_file = service_account_file or os.getenv("GOOGLE_CHAT_SA_FILE") 

690 

691 if not webhook_url and not service_account_file: 

692 raise ValueError("Either webhook_url or service_account_file required") 

693 

694 config = ChannelConfig( 

695 webhook_url=webhook_url, 

696 extra={ 

697 "service_account_file": service_account_file, 

698 **kwargs.get("extra", {}), 

699 }, 

700 **{k: v for k, v in kwargs.items() if k != "extra"}, 

701 ) 

702 return GoogleChatAdapter(config)