Coverage for integrations / google_a2a / google_a2a_integration.py: 35.8%
134 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Google A2A (Agent2Agent) Protocol Integration
4This module implements Google's official A2A protocol for cross-platform agent communication.
5Uses JSON-RPC 2.0 over HTTP(S) with Agent Cards for discovery.
7Official Spec: https://a2a-protocol.org/latest/
8SDK: https://github.com/a2aproject/a2a-python
9"""
11import json
12import uuid
13import logging
14from typing import Dict, List, Any, Optional
15from datetime import datetime
16from flask import Flask, request, jsonify, Response
17import asyncio
18from enum import Enum
20logger = logging.getLogger(__name__)
22# A2A Protocol Version
23A2A_PROTOCOL_VERSION = "0.2.6"
26class TaskState(str, Enum):
27 """A2A Task lifecycle states"""
28 SUBMITTED = "submitted"
29 WORKING = "working"
30 INPUT_REQUIRED = "input_required"
31 COMPLETED = "completed"
32 FAILED = "failed"
35class AgentCard:
36 """
37 Agent Card for A2A discovery
38 Published at /.well-known/agent.json
39 """
41 def __init__(
42 self,
43 name: str,
44 description: str,
45 url: str,
46 version: str,
47 skills: List[Dict[str, Any]],
48 capabilities: Optional[Dict[str, Any]] = None,
49 default_input_modes: Optional[List[str]] = None,
50 default_output_modes: Optional[List[str]] = None
51 ):
52 self.name = name
53 self.description = description
54 self.url = url
55 self.version = version
56 self.skills = skills
57 self.capabilities = capabilities or {"streaming": False}
58 self.default_input_modes = default_input_modes or ["text", "text/plain"]
59 self.default_output_modes = default_output_modes or ["text", "text/plain"]
61 def to_dict(self) -> Dict[str, Any]:
62 """Convert Agent Card to JSON-compatible dict"""
63 return {
64 "name": self.name,
65 "description": self.description,
66 "url": self.url,
67 "version": self.version,
68 "protocolVersion": A2A_PROTOCOL_VERSION,
69 "capabilities": self.capabilities,
70 "defaultInputModes": self.default_input_modes,
71 "defaultOutputModes": self.default_output_modes,
72 "skills": self.skills
73 }
76class A2ATask:
77 """Represents an A2A task with full lifecycle management"""
79 def __init__(self, task_id: str, message: Dict[str, Any], context_id: Optional[str] = None):
80 self.task_id = task_id
81 self.message = message
82 self.context_id = context_id or str(uuid.uuid4())
83 self.state = TaskState.SUBMITTED
84 self.created_at = datetime.now()
85 self.updated_at = datetime.now()
86 self.result = None
87 self.error = None
88 self.metadata = {
89 "prompt_token_count": 0,
90 "candidates_token_count": 0,
91 "total_token_count": 0
92 }
94 def update_state(self, new_state: TaskState, result: Optional[Any] = None, error: Optional[str] = None):
95 """Update task state"""
96 self.state = new_state
97 self.updated_at = datetime.now()
98 if result is not None:
99 self.result = result
100 if error is not None:
101 self.error = error
103 def to_dict(self) -> Dict[str, Any]:
104 """Convert task to JSON-compatible dict"""
105 response = {
106 "id": self.task_id,
107 "contextId": self.context_id,
108 "state": self.state.value,
109 "timestamp": self.created_at.timestamp(),
110 "usage_metadata": self.metadata
111 }
113 if self.result is not None:
114 response["content"] = self.result
116 if self.error is not None:
117 response["error"] = self.error
119 return response
122class A2AMessageHandler:
123 """Handles A2A JSON-RPC messages and task execution"""
125 def __init__(self, agent_executor_func):
126 """
127 Initialize message handler
129 Args:
130 agent_executor_func: Function that executes agent tasks
131 Should accept (message_content: str, context_id: str)
132 Should return: {"role": "model", "parts": [{"text": "..."}]}
133 """
134 self.agent_executor = agent_executor_func
135 self.tasks: Dict[str, A2ATask] = {}
137 async def handle_message_send(self, params: Dict[str, Any]) -> Dict[str, Any]:
138 """
139 Handle message/send JSON-RPC method
141 Args:
142 params: JSON-RPC params containing message
144 Returns:
145 Task response
146 """
147 message = params.get("message", {})
148 message_id = message.get("messageId", str(uuid.uuid4()))
149 context_id = message.get("contextId", str(uuid.uuid4()))
151 # Extract message content
152 parts = message.get("parts", [])
153 message_text = ""
154 for part in parts:
155 if part.get("type") == "text":
156 message_text += part.get("text", "")
158 # Create task
159 task = A2ATask(task_id=message_id, message=message, context_id=context_id)
160 self.tasks[message_id] = task
162 try:
163 # Update to working state
164 task.update_state(TaskState.WORKING)
166 # Execute agent
167 logger.info(f"Executing A2A task {message_id}: {message_text[:100]}")
168 result = await self.agent_executor(message_text, context_id)
170 # Update to completed state
171 task.update_state(TaskState.COMPLETED, result=result)
173 logger.info(f"A2A task {message_id} completed successfully")
175 except Exception as e:
176 logger.error(f"A2A task {message_id} failed: {e}")
177 task.update_state(TaskState.FAILED, error=str(e))
179 return task.to_dict()
181 async def handle_message_get(self, params: Dict[str, Any]) -> Dict[str, Any]:
182 """
183 Handle message/get JSON-RPC method
185 Args:
186 params: JSON-RPC params containing task_id
188 Returns:
189 Task status
190 """
191 task_id = params.get("taskId")
193 if task_id not in self.tasks:
194 return {
195 "error": {
196 "code": -32602,
197 "message": f"Task {task_id} not found"
198 }
199 }
201 task = self.tasks[task_id]
202 return task.to_dict()
204 async def handle_task_cancel(self, params: Dict[str, Any]) -> Dict[str, Any]:
205 """
206 Handle task/cancel JSON-RPC method
208 Args:
209 params: JSON-RPC params containing task_id
211 Returns:
212 Cancellation confirmation
213 """
214 task_id = params.get("taskId")
216 if task_id not in self.tasks:
217 return {
218 "error": {
219 "code": -32602,
220 "message": f"Task {task_id} not found"
221 }
222 }
224 task = self.tasks[task_id]
226 # Only cancel if not already completed/failed
227 if task.state in [TaskState.SUBMITTED, TaskState.WORKING]:
228 task.update_state(TaskState.FAILED, error="Task cancelled by client")
229 return {"success": True, "taskId": task_id}
230 else:
231 return {
232 "error": {
233 "code": -32600,
234 "message": f"Cannot cancel task in state {task.state}"
235 }
236 }
239class A2AProtocolServer:
240 """Google A2A Protocol Server"""
242 def __init__(self, app: Flask, base_url: str):
243 """
244 Initialize A2A server
246 Args:
247 app: Flask application
248 base_url: Base URL where this agent is hosted
249 """
250 self.app = app
251 self.base_url = base_url.rstrip('/')
252 self.agent_cards: Dict[str, AgentCard] = {}
253 self.message_handlers: Dict[str, A2AMessageHandler] = {}
255 def register_agent(
256 self,
257 agent_id: str,
258 name: str,
259 description: str,
260 skills: List[Dict[str, Any]],
261 executor_func,
262 capabilities: Optional[Dict[str, Any]] = None
263 ):
264 """
265 Register an agent with A2A protocol
267 Args:
268 agent_id: Unique agent identifier
269 name: Agent name
270 description: Agent description
271 skills: List of agent skills
272 executor_func: Async function to execute agent tasks
273 capabilities: Optional agent capabilities
274 """
275 # Create Agent Card
276 agent_card = AgentCard(
277 name=name,
278 description=description,
279 url=f"{self.base_url}/a2a/{agent_id}",
280 version="1.0.0",
281 skills=skills,
282 capabilities=capabilities
283 )
285 self.agent_cards[agent_id] = agent_card
287 # Create message handler
288 self.message_handlers[agent_id] = A2AMessageHandler(executor_func)
290 logger.info(f"Registered A2A agent: {agent_id} ({name})")
292 def setup_routes(self):
293 """Setup Flask routes for A2A protocol"""
295 @self.app.route('/a2a/<agent_id>/.well-known/agent.json', methods=['GET'])
296 def get_agent_card(agent_id):
297 """Agent Card discovery endpoint"""
298 if agent_id not in self.agent_cards:
299 return jsonify({"error": f"Agent {agent_id} not found"}), 404
301 agent_card = self.agent_cards[agent_id]
302 return jsonify(agent_card.to_dict())
304 @self.app.route('/a2a/<agent_id>/jsonrpc', methods=['POST'])
305 async def handle_jsonrpc(agent_id):
306 """JSON-RPC endpoint for A2A messages"""
307 if agent_id not in self.message_handlers:
308 return jsonify({
309 "jsonrpc": "2.0",
310 "error": {
311 "code": -32602,
312 "message": f"Agent {agent_id} not found"
313 },
314 "id": None
315 }), 404
317 try:
318 rpc_request = request.json
319 method = rpc_request.get("method")
320 params = rpc_request.get("params", {})
321 rpc_id = rpc_request.get("id")
323 handler = self.message_handlers[agent_id]
325 # Route to appropriate handler
326 if method == "message/send":
327 result = await handler.handle_message_send(params)
328 elif method == "message/get":
329 result = await handler.handle_message_get(params)
330 elif method == "task/cancel":
331 result = await handler.handle_task_cancel(params)
332 else:
333 return jsonify({
334 "jsonrpc": "2.0",
335 "error": {
336 "code": -32601,
337 "message": f"Method {method} not found"
338 },
339 "id": rpc_id
340 }), 400
342 # Return JSON-RPC response
343 return jsonify({
344 "jsonrpc": "2.0",
345 "result": result,
346 "id": rpc_id
347 })
349 except Exception as e:
350 logger.error(f"A2A JSON-RPC error: {e}")
351 return jsonify({
352 "jsonrpc": "2.0",
353 "error": {
354 "code": -32603,
355 "message": f"Internal error: {str(e)}"
356 },
357 "id": rpc_request.get("id") if rpc_request else None
358 }), 500
360 logger.info("A2A protocol routes configured")
363# Global A2A server instance
364_a2a_server: Optional[A2AProtocolServer] = None
367def initialize_a2a_server(app: Flask, base_url: str) -> A2AProtocolServer:
368 """
369 Initialize Google A2A protocol server
371 Args:
372 app: Flask application
373 base_url: Base URL where agents are hosted
375 Returns:
376 A2AProtocolServer instance
377 """
378 global _a2a_server
379 _a2a_server = A2AProtocolServer(app, base_url)
380 _a2a_server.setup_routes()
381 return _a2a_server
384def get_a2a_server() -> Optional[A2AProtocolServer]:
385 """Get the global A2A server instance"""
386 return _a2a_server