Coverage for integrations / channels / google_chat_adapter.py: 40.6%
219 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Google Chat Channel Adapter
4Implements Google Chat messaging using webhooks and Chat API.
5Designed for Docker-compatible deployments.
7Features:
8- Webhook-based integration (incoming)
9- Chat API for outgoing messages
10- Card messages with rich formatting
11- Slash commands
12- Thread support
13- Spaces (rooms) support
15Requirements:
16- Google Cloud project with Chat API enabled
17- Service account or OAuth credentials
18- Webhook URL for incoming messages (optional)
20Two modes:
211. Webhook-only: Receive messages via webhook, respond inline
222. Full API: Use Chat API for two-way communication
23"""
25from __future__ import annotations
27import asyncio
28import logging
29import os
30import json
31from typing import Optional, List, Dict, Any, Callable
32from datetime import datetime
34try:
35 import aiohttp
36 HAS_AIOHTTP = True
37except ImportError:
38 HAS_AIOHTTP = False
40try:
41 from google.oauth2 import service_account
42 from googleapiclient.discovery import build
43 HAS_GOOGLE = True
44except ImportError:
45 HAS_GOOGLE = False
47from .base import (
48 ChannelAdapter,
49 ChannelConfig,
50 ChannelStatus,
51 Message,
52 MessageType,
53 MediaAttachment,
54 SendResult,
55 ChannelConnectionError,
56 ChannelSendError,
57 ChannelRateLimitError,
58)
60logger = logging.getLogger(__name__)
class GoogleChatAdapter(ChannelAdapter):
    """
    Google Chat adapter supporting webhooks and Chat API.

    The mode is chosen once, in ``__init__``:

    - API mode when ``config.extra["service_account_file"]`` is a non-empty
      value; outgoing calls go through the ``googleapiclient`` Chat service.
    - Webhook mode otherwise; outgoing messages are POSTed to
      ``config.webhook_url``.

    Usage (Webhook mode):
        config = ChannelConfig(
            webhook_url="https://chat.googleapis.com/v1/spaces/XXX/messages?key=YYY&token=ZZZ"
        )
        adapter = GoogleChatAdapter(config)
        adapter.on_message(my_handler)
        await adapter.start()

    Usage (Full API mode):
        config = ChannelConfig(
            extra={
                "service_account_file": "/path/to/credentials.json",
                "scopes": ["https://www.googleapis.com/auth/chat.bot"],
            }
        )
        adapter = GoogleChatAdapter(config)
        adapter.on_message(my_handler)
        await adapter.start()
    """

    def __init__(self, config: ChannelConfig):
        """
        Store configuration; no network activity happens here.

        Args:
            config: Channel configuration. ``config.webhook_url`` and
                ``config.extra["service_account_file"]`` select the mode.

        Raises:
            ImportError: If aiohttp is not installed.
        """
        if not HAS_AIOHTTP:
            raise ImportError(
                "aiohttp not installed. "
                "Install with: pip install aiohttp"
            )

        super().__init__(config)
        self._webhook_url = config.webhook_url
        self._session: Optional[aiohttp.ClientSession] = None
        self._chat_service = None
        self._bot_id: Optional[str] = None
        # Maps command name -> {"handler": callable, "description": str}.
        self._slash_commands: Dict[str, Dict[str, Any]] = {}
        # Truthiness (not ``is not None``) so an empty path does not select
        # API mode only to fail when the credentials file is loaded.
        self._use_api = bool(config.extra.get("service_account_file"))

    @property
    def name(self) -> str:
        """Channel identifier used in ``Message.channel``."""
        return "google_chat"

    async def _close_session(self) -> None:
        """Close and forget the aiohttp session, if one is open."""
        session, self._session = self._session, None
        if session is not None:
            await session.close()

    async def _execute(self, request: Any) -> Any:
        """Run a blocking googleapiclient request in the default executor."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, request.execute)

    async def connect(self) -> bool:
        """
        Connect to Google Chat.

        In API mode this loads the service-account credentials and builds the
        Chat v1 service; in webhook mode it only opens the HTTP session.

        Returns:
            True on success. False when neither a webhook URL nor a service
            account is configured (status is left unchanged), or when setup
            raises (status becomes ``ChannelStatus.ERROR``).
        """
        # Validate before allocating anything so the failure path has
        # nothing to clean up.
        if not self._use_api and not self._webhook_url:
            logger.error("No webhook URL or service account provided")
            return False

        try:
            # Reuse a live session on repeated connect() calls instead of
            # orphaning it.
            if self._session is None or self._session.closed:
                self._session = aiohttp.ClientSession()

            if self._use_api:
                if not HAS_GOOGLE:
                    raise ImportError(
                        "Google API client not installed. "
                        "Install with: pip install google-api-python-client google-auth"
                    )

                sa_file = self.config.extra.get("service_account_file")
                scopes = self.config.extra.get("scopes", [
                    "https://www.googleapis.com/auth/chat.bot",
                    "https://www.googleapis.com/auth/chat.messages",
                    "https://www.googleapis.com/auth/chat.messages.create",
                ])

                credentials = service_account.Credentials.from_service_account_file(
                    sa_file, scopes=scopes
                )

                self._chat_service = build("chat", "v1", credentials=credentials)
                logger.info("Connected to Google Chat API with service account")
            else:
                logger.info("Using webhook-only mode for Google Chat")

            self.status = ChannelStatus.CONNECTED
            return True

        except Exception as e:
            logger.error(f"Failed to connect to Google Chat: {e}")
            # Do not leak the HTTP session when setup fails part-way.
            await self._close_session()
            self.status = ChannelStatus.ERROR
            return False

    async def disconnect(self) -> None:
        """Close the HTTP session, drop the API service and mark disconnected."""
        await self._close_session()
        self._chat_service = None
        self.status = ChannelStatus.DISCONNECTED
        logger.info("Disconnected from Google Chat")

    def register_slash_command(
        self,
        command: str,
        handler: Callable,
        description: str = "",
    ) -> None:
        """
        Register a slash command handler.

        Registering the same command again replaces the earlier entry.

        Args:
            command: Command name (without /)
            handler: Function (sync or async) called with the converted
                Message when the command matches
            description: Command description
        """
        self._slash_commands[command] = {
            "handler": handler,
            "description": description,
        }

    async def handle_webhook(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Handle incoming webhook from Google Chat.

        This should be called from your webhook endpoint.

        Args:
            data: Webhook payload from Google Chat

        Returns:
            Response to send back (optional). A welcome text for
            ``ADDED_TO_SPACE``, a slash-command handler's reply for
            ``MESSAGE``, otherwise None.
        """
        event_type = data.get("type", "")

        if event_type == "ADDED_TO_SPACE":
            space = data.get("space") or {}
            logger.info(f"Added to space: {space.get('displayName', space.get('name'))}")
            return {
                "text": "Thanks for adding me! I'm ready to help."
            }

        if event_type == "REMOVED_FROM_SPACE":
            space = data.get("space") or {}
            logger.info(f"Removed from space: {space.get('displayName', space.get('name'))}")
            return None

        if event_type == "MESSAGE":
            return await self._handle_message_event(data)

        if event_type == "CARD_CLICKED":
            await self._handle_card_click(data)
            return None

        # Unknown event types are ignored.
        return None

    async def _handle_message_event(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Run slash-command handlers for a MESSAGE event, then dispatch it."""
        message = self._convert_message(data)
        if not message:
            return None

        slash_command = (data.get("message") or {}).get("slashCommand")
        if slash_command:
            command_id = slash_command.get("commandId")
            # A command matches when its registered name equals the command
            # ID or occurs anywhere in the message text.
            for cmd_name, cmd_info in self._slash_commands.items():
                if str(command_id) == cmd_name or cmd_name in message.text:
                    result = cmd_info["handler"](message)
                    if asyncio.iscoroutine(result):
                        result = await result
                    if isinstance(result, dict):
                        return result
                    if isinstance(result, str):
                        return {"text": result}
                    # Any other result: keep scanning the remaining commands.

        # No slash command produced a reply; hand off to registered handlers.
        await self._dispatch_message(message)
        return None

    async def _handle_card_click(self, data: Dict[str, Any]) -> None:
        """Wrap a CARD_CLICKED event in a Message and dispatch it."""
        action = data.get("action") or {}
        action_name = action.get("actionMethodName", "")
        parameters = {
            p.get("key"): p.get("value")
            for p in (action.get("parameters") or [])
        }
        user = data.get("user") or {}
        space = data.get("space") or {}

        message = Message(
            # The event time doubles as the ID; fall back to the local clock.
            id=data.get("eventTime", str(datetime.now().timestamp())),
            channel=self.name,
            sender_id=user.get("name", ""),
            sender_name=user.get("displayName", ""),
            chat_id=space.get("name", ""),
            text=f"[button:{action_name}]",
            raw={
                "action": action_name,
                "parameters": parameters,
                "card_clicked": True,
            },
        )
        await self._dispatch_message(message)

    def _convert_message(self, data: Dict[str, Any]) -> Optional[Message]:
        """
        Convert Google Chat message to unified Message format.

        Args:
            data: Full webhook payload; reads its ``message`` and ``space``
                entries.

        Returns:
            The converted Message, or None when the payload has no message.
        """
        message_data = data.get("message") or {}
        if not message_data:
            return None

        sender = message_data.get("sender") or {}
        space = data.get("space") or {}
        thread = message_data.get("thread") or {}

        # Determine if group/space or DM
        is_group = space.get("type", "") in ("ROOM", "SPACE")

        # Every attachment is surfaced as a DOCUMENT, whatever its MIME type.
        media = []
        for attachment in message_data.get("attachment") or []:
            att_data = attachment.get("attachmentDataRef") or {}
            media.append(MediaAttachment(
                type=MessageType.DOCUMENT,
                file_id=att_data.get("resourceName"),
                file_name=attachment.get("contentName"),
                mime_type=attachment.get("contentType"),
            ))

        # Mentioned when any annotation is a USER_MENTION of type BOT.
        is_mentioned = any(
            ann.get("type") == "USER_MENTION"
            and (ann.get("userMention") or {}).get("type") == "BOT"
            for ann in (message_data.get("annotations") or [])
        )

        return Message(
            id=message_data.get("name", ""),
            channel=self.name,
            sender_id=sender.get("name", ""),
            sender_name=sender.get("displayName", ""),
            chat_id=space.get("name", ""),
            text=message_data.get("text", "") or message_data.get("argumentText", ""),
            media=media,
            reply_to_id=thread.get("name") if thread else None,
            # Local receipt time; time fields in the payload are not parsed.
            timestamp=datetime.now(),
            is_group=is_group,
            is_bot_mentioned=is_mentioned,
            raw={
                "space": space,
                "thread": thread,
                "sender": sender,
            },
        )

    async def send_message(
        self,
        chat_id: str,
        text: str,
        reply_to: Optional[str] = None,
        media: Optional[List[MediaAttachment]] = None,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """
        Send a message to Google Chat.

        Args:
            chat_id: Space resource name; used only in API mode.
            text: Message text. When ``buttons`` is given the text is placed
                inside the card rather than sent as plain text.
            reply_to: Thread resource name to reply in.
            media: Accepted for interface compatibility; not used here.
            buttons: Button definitions passed to ``_build_card``.

        Returns:
            SendResult describing success or the error; never raises.
        """
        try:
            message: Dict[str, Any] = {"text": text}

            if reply_to:
                message["thread"] = {"name": reply_to}

            if buttons:
                message["cards"] = [self._build_card(text, buttons)]
                # Text is in card, don't duplicate
                del message["text"]

            # The API service takes precedence over the webhook.
            if self._chat_service:
                return await self._send_via_api(chat_id, message, reply_to)
            if self._webhook_url:
                return await self._send_via_webhook(message)
            return SendResult(success=False, error="No sending method configured")

        except Exception as e:
            logger.error(f"Failed to send Google Chat message: {e}")
            return SendResult(success=False, error=str(e))

    async def _send_via_api(
        self,
        space_name: str,
        message: Dict[str, Any],
        thread_key: Optional[str] = None,
    ) -> SendResult:
        """
        Send message using Chat API.

        Args:
            space_name: Parent space resource name.
            message: Message body.
            thread_key: When truthy, the request asks the API to reply in the
                thread and fall back to a new thread.

        Returns:
            SendResult with the created message name, or the error text.
        """
        try:
            request = self._chat_service.spaces().messages().create(
                parent=space_name,
                body=message,
                messageReplyOption="REPLY_MESSAGE_FALLBACK_TO_NEW_THREAD" if thread_key else None,
            )

            result = await self._execute(request)

            return SendResult(
                success=True,
                message_id=result.get("name", ""),
                raw=result,
            )

        except Exception as e:
            logger.error(f"Chat API error: {e}")
            return SendResult(success=False, error=str(e))

    async def _send_via_webhook(self, message: Dict[str, Any]) -> SendResult:
        """
        Send message via webhook.

        Returns:
            SendResult; success requires HTTP 200 or 201 with a JSON body.
        """
        if self._session is None:
            # Report the real cause instead of an AttributeError on None.
            error = "Not connected: call connect() before sending"
            logger.error(f"Webhook send error: {error}")
            return SendResult(success=False, error=error)

        try:
            async with self._session.post(
                self._webhook_url,
                json=message,
                headers={"Content-Type": "application/json"},
            ) as response:
                if response.status in (200, 201):
                    data = await response.json()
                    return SendResult(
                        success=True,
                        message_id=data.get("name", ""),
                        raw=data,
                    )

                error_text = await response.text()
                logger.error(f"Webhook error: {error_text}")
                return SendResult(success=False, error=error_text)

        except Exception as e:
            logger.error(f"Webhook send error: {e}")
            return SendResult(success=False, error=str(e))

    def _build_card(
        self,
        text: str,
        buttons: List[Dict],
        header: Optional[Dict] = None,
    ) -> Dict[str, Any]:
        """
        Build a Google Chat card message.

        Args:
            text: Text for the card's leading paragraph widget.
            buttons: Button dicts. Each needs ``text``; one with a truthy
                ``url`` opens a link, any other triggers an action named by
                ``callback_data`` (default: the button text).
            header: Optional dict with ``title``, ``subtitle``, ``image_url``.

        Returns:
            Card dict with a single section.
        """
        widgets: List[Dict[str, Any]] = [{"textParagraph": {"text": text}}]
        card: Dict[str, Any] = {"sections": [{"widgets": widgets}]}

        if header:
            card["header"] = {
                "title": header.get("title", ""),
                "subtitle": header.get("subtitle"),
                "imageUrl": header.get("image_url"),
            }

        button_list = []
        for btn in buttons:
            if btn.get("url"):
                on_click: Dict[str, Any] = {"openLink": {"url": btn["url"]}}
            else:
                on_click = {
                    "action": {
                        "actionMethodName": btn.get("callback_data", btn["text"]),
                        "parameters": btn.get("parameters", []),
                    }
                }
            button_list.append({
                "textButton": {
                    "text": btn["text"],
                    "onClick": on_click,
                }
            })

        if button_list:
            widgets.append({"buttons": button_list})

        return card

    def build_card_v2(
        self,
        title: str,
        sections: List[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """
        Build a Cards v2 format message.

        Widgets that contain none of ``text``, ``image`` or ``buttons`` are
        skipped; when several are present the first in that order wins.

        Args:
            title: Card title
            sections: List of section definitions

        Returns:
            Card message payload

        Example section:
            {
                "header": "Section Header",
                "widgets": [
                    {"text": "Some text"},
                    {"image": {"url": "https://..."}},
                    {"buttons": [{"text": "Click", "url": "https://..."}]},
                ]
            }
        """
        card_sections = []

        for section in sections:
            widgets = []

            for widget in section.get("widgets", []):
                if "text" in widget:
                    widgets.append({
                        "decoratedText": {
                            "text": widget["text"],
                            "wrapText": True,
                        }
                    })
                elif "image" in widget:
                    widgets.append({
                        "image": {
                            "imageUrl": widget["image"]["url"],
                            "altText": widget["image"].get("alt", ""),
                        }
                    })
                elif "buttons" in widget:
                    button_list = []
                    for btn in widget["buttons"]:
                        if btn.get("url"):
                            on_click: Dict[str, Any] = {
                                "openLink": {"url": btn["url"]}
                            }
                        else:
                            on_click = {
                                "action": {
                                    "function": btn.get("action", btn["text"]),
                                    "parameters": btn.get("parameters", []),
                                }
                            }
                        button_list.append({
                            "text": btn["text"],
                            "onClick": on_click,
                        })
                    widgets.append({"buttonList": {"buttons": button_list}})

            card_sections.append({
                "header": section.get("header"),
                "widgets": widgets,
            })

        return {
            "cardsV2": [
                {
                    "cardId": "main",
                    "card": {
                        "header": {"title": title},
                        "sections": card_sections,
                    }
                }
            ]
        }

    async def edit_message(
        self,
        chat_id: str,
        message_id: str,
        text: str,
        buttons: Optional[List[Dict]] = None,
    ) -> SendResult:
        """
        Edit an existing message.

        Args:
            chat_id: Not used; the message is addressed by ``message_id``.
            message_id: Message resource name.
            text: New text, or the card text when ``buttons`` is given.
            buttons: Optional button definitions; replaces text with a card.

        Returns:
            SendResult; fails immediately when the API service is not set.
        """
        if not self._chat_service:
            return SendResult(success=False, error="API mode required for editing")

        try:
            message: Dict[str, Any] = {"text": text}

            if buttons:
                message["cards"] = [self._build_card(text, buttons)]
                del message["text"]

            # The mask always names both fields, whichever the body carries.
            request = self._chat_service.spaces().messages().update(
                name=message_id,
                updateMask="text,cards",
                body=message,
            )

            result = await self._execute(request)

            return SendResult(
                success=True,
                message_id=result.get("name", ""),
                raw=result,
            )

        except Exception as e:
            logger.error(f"Failed to edit message: {e}")
            return SendResult(success=False, error=str(e))

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """
        Delete a message.

        Args:
            chat_id: Not used; the message is addressed by ``message_id``.
            message_id: Message resource name.

        Returns:
            True when the delete request succeeded; False without the API
            service or on any error.
        """
        if not self._chat_service:
            return False

        try:
            request = self._chat_service.spaces().messages().delete(
                name=message_id
            )
            await self._execute(request)
            return True

        except Exception as e:
            logger.error(f"Failed to delete message: {e}")
            return False

    async def send_typing(self, chat_id: str) -> None:
        """Send typing indicator (not supported in Google Chat); a no-op."""
        return None

    async def get_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """
        Get information about a space.

        Args:
            chat_id: Space resource name.

        Returns:
            Dict with ``id``, ``type``, ``display_name`` and
            ``single_user_bot_dm``; None without the API service or on error.
        """
        if not self._chat_service:
            return None

        try:
            request = self._chat_service.spaces().get(name=chat_id)
            result = await self._execute(request)

            return {
                "id": result.get("name"),
                "type": result.get("type"),
                "display_name": result.get("displayName"),
                "single_user_bot_dm": result.get("singleUserBotDm"),
            }

        except Exception as e:
            logger.error(f"Failed to get space info: {e}")
            return None

    async def list_spaces(self) -> List[Dict[str, Any]]:
        """
        List spaces the bot is a member of.

        A single list request is issued; any further pages the API may offer
        are not fetched.

        Returns:
            The ``spaces`` entries of the response; an empty list without the
            API service or on error.
        """
        if not self._chat_service:
            return []

        try:
            request = self._chat_service.spaces().list()
            result = await self._execute(request)
            return result.get("spaces", [])

        except Exception as e:
            logger.error(f"Failed to list spaces: {e}")
            return []

    async def create_space(
        self,
        name: str,
        external_user_allowed: bool = False,
    ) -> Optional[str]:
        """
        Create a new space.

        Args:
            name: Display name of the space.
            external_user_allowed: Sent as ``externalUserAllowed``.

        Returns:
            Resource name of the created space; None without the API service
            or on error.
        """
        if not self._chat_service:
            return None

        try:
            request = self._chat_service.spaces().create(
                body={
                    "displayName": name,
                    "spaceType": "SPACE",
                    "externalUserAllowed": external_user_allowed,
                }
            )

            result = await self._execute(request)
            return result.get("name")

        except Exception as e:
            logger.error(f"Failed to create space: {e}")
            return None
def create_google_chat_adapter(
    webhook_url: Optional[str] = None,
    service_account_file: Optional[str] = None,
    **kwargs
) -> GoogleChatAdapter:
    """
    Factory function to create Google Chat adapter.

    Args:
        webhook_url: Webhook URL for incoming messages (or set GOOGLE_CHAT_WEBHOOK env var)
        service_account_file: Path to service account JSON (or set GOOGLE_CHAT_SA_FILE env var)
        **kwargs: Additional config options forwarded to ``ChannelConfig``.
            An ``extra`` dict is merged over the ``service_account_file``
            entry, so its keys take precedence.

    Returns:
        Configured GoogleChatAdapter

    Raises:
        ValueError: If neither a webhook URL nor a service account file is
            given, either directly or through the environment.
    """
    # Treat blank strings (argument or environment) as "not provided" so an
    # empty value is never stored as a credentials path.
    webhook_url = webhook_url or os.getenv("GOOGLE_CHAT_WEBHOOK") or None
    service_account_file = (
        service_account_file or os.getenv("GOOGLE_CHAT_SA_FILE") or None
    )

    if not webhook_url and not service_account_file:
        raise ValueError("Either webhook_url or service_account_file required")

    # ``extra=None`` is tolerated; unpacking None would raise TypeError.
    extra_overrides = kwargs.pop("extra", None) or {}

    config = ChannelConfig(
        webhook_url=webhook_url,
        extra={
            "service_account_file": service_account_file,
            **extra_overrides,
        },
        **kwargs,
    )
    return GoogleChatAdapter(config)