Coverage for integrations / google_a2a / google_a2a_integration.py: 35.8%

134 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Google A2A (Agent2Agent) Protocol Integration 

3 

4This module implements Google's official A2A protocol for cross-platform agent communication. 

5Uses JSON-RPC 2.0 over HTTP(S) with Agent Cards for discovery. 

6 

7Official Spec: https://a2a-protocol.org/latest/ 

8SDK: https://github.com/a2aproject/a2a-python 

9""" 

10 

11import json 

12import uuid 

13import logging 

14from typing import Dict, List, Any, Optional 

15from datetime import datetime 

16from flask import Flask, request, jsonify, Response 

17import asyncio 

18from enum import Enum 

19 

20logger = logging.getLogger(__name__) 

21 

22# A2A Protocol Version 

23A2A_PROTOCOL_VERSION = "0.2.6" 

24 

25 

26class TaskState(str, Enum): 

27 """A2A Task lifecycle states""" 

28 SUBMITTED = "submitted" 

29 WORKING = "working" 

30 INPUT_REQUIRED = "input_required" 

31 COMPLETED = "completed" 

32 FAILED = "failed" 

33 

34 

35class AgentCard: 

36 """ 

37 Agent Card for A2A discovery 

38 Published at /.well-known/agent.json 

39 """ 

40 

41 def __init__( 

42 self, 

43 name: str, 

44 description: str, 

45 url: str, 

46 version: str, 

47 skills: List[Dict[str, Any]], 

48 capabilities: Optional[Dict[str, Any]] = None, 

49 default_input_modes: Optional[List[str]] = None, 

50 default_output_modes: Optional[List[str]] = None 

51 ): 

52 self.name = name 

53 self.description = description 

54 self.url = url 

55 self.version = version 

56 self.skills = skills 

57 self.capabilities = capabilities or {"streaming": False} 

58 self.default_input_modes = default_input_modes or ["text", "text/plain"] 

59 self.default_output_modes = default_output_modes or ["text", "text/plain"] 

60 

61 def to_dict(self) -> Dict[str, Any]: 

62 """Convert Agent Card to JSON-compatible dict""" 

63 return { 

64 "name": self.name, 

65 "description": self.description, 

66 "url": self.url, 

67 "version": self.version, 

68 "protocolVersion": A2A_PROTOCOL_VERSION, 

69 "capabilities": self.capabilities, 

70 "defaultInputModes": self.default_input_modes, 

71 "defaultOutputModes": self.default_output_modes, 

72 "skills": self.skills 

73 } 

74 

75 

76class A2ATask: 

77 """Represents an A2A task with full lifecycle management""" 

78 

79 def __init__(self, task_id: str, message: Dict[str, Any], context_id: Optional[str] = None): 

80 self.task_id = task_id 

81 self.message = message 

82 self.context_id = context_id or str(uuid.uuid4()) 

83 self.state = TaskState.SUBMITTED 

84 self.created_at = datetime.now() 

85 self.updated_at = datetime.now() 

86 self.result = None 

87 self.error = None 

88 self.metadata = { 

89 "prompt_token_count": 0, 

90 "candidates_token_count": 0, 

91 "total_token_count": 0 

92 } 

93 

94 def update_state(self, new_state: TaskState, result: Optional[Any] = None, error: Optional[str] = None): 

95 """Update task state""" 

96 self.state = new_state 

97 self.updated_at = datetime.now() 

98 if result is not None: 

99 self.result = result 

100 if error is not None: 

101 self.error = error 

102 

103 def to_dict(self) -> Dict[str, Any]: 

104 """Convert task to JSON-compatible dict""" 

105 response = { 

106 "id": self.task_id, 

107 "contextId": self.context_id, 

108 "state": self.state.value, 

109 "timestamp": self.created_at.timestamp(), 

110 "usage_metadata": self.metadata 

111 } 

112 

113 if self.result is not None: 

114 response["content"] = self.result 

115 

116 if self.error is not None: 

117 response["error"] = self.error 

118 

119 return response 

120 

121 

122class A2AMessageHandler: 

123 """Handles A2A JSON-RPC messages and task execution""" 

124 

125 def __init__(self, agent_executor_func): 

126 """ 

127 Initialize message handler 

128 

129 Args: 

130 agent_executor_func: Function that executes agent tasks 

131 Should accept (message_content: str, context_id: str) 

132 Should return: {"role": "model", "parts": [{"text": "..."}]} 

133 """ 

134 self.agent_executor = agent_executor_func 

135 self.tasks: Dict[str, A2ATask] = {} 

136 

137 async def handle_message_send(self, params: Dict[str, Any]) -> Dict[str, Any]: 

138 """ 

139 Handle message/send JSON-RPC method 

140 

141 Args: 

142 params: JSON-RPC params containing message 

143 

144 Returns: 

145 Task response 

146 """ 

147 message = params.get("message", {}) 

148 message_id = message.get("messageId", str(uuid.uuid4())) 

149 context_id = message.get("contextId", str(uuid.uuid4())) 

150 

151 # Extract message content 

152 parts = message.get("parts", []) 

153 message_text = "" 

154 for part in parts: 

155 if part.get("type") == "text": 

156 message_text += part.get("text", "") 

157 

158 # Create task 

159 task = A2ATask(task_id=message_id, message=message, context_id=context_id) 

160 self.tasks[message_id] = task 

161 

162 try: 

163 # Update to working state 

164 task.update_state(TaskState.WORKING) 

165 

166 # Execute agent 

167 logger.info(f"Executing A2A task {message_id}: {message_text[:100]}") 

168 result = await self.agent_executor(message_text, context_id) 

169 

170 # Update to completed state 

171 task.update_state(TaskState.COMPLETED, result=result) 

172 

173 logger.info(f"A2A task {message_id} completed successfully") 

174 

175 except Exception as e: 

176 logger.error(f"A2A task {message_id} failed: {e}") 

177 task.update_state(TaskState.FAILED, error=str(e)) 

178 

179 return task.to_dict() 

180 

181 async def handle_message_get(self, params: Dict[str, Any]) -> Dict[str, Any]: 

182 """ 

183 Handle message/get JSON-RPC method 

184 

185 Args: 

186 params: JSON-RPC params containing task_id 

187 

188 Returns: 

189 Task status 

190 """ 

191 task_id = params.get("taskId") 

192 

193 if task_id not in self.tasks: 

194 return { 

195 "error": { 

196 "code": -32602, 

197 "message": f"Task {task_id} not found" 

198 } 

199 } 

200 

201 task = self.tasks[task_id] 

202 return task.to_dict() 

203 

204 async def handle_task_cancel(self, params: Dict[str, Any]) -> Dict[str, Any]: 

205 """ 

206 Handle task/cancel JSON-RPC method 

207 

208 Args: 

209 params: JSON-RPC params containing task_id 

210 

211 Returns: 

212 Cancellation confirmation 

213 """ 

214 task_id = params.get("taskId") 

215 

216 if task_id not in self.tasks: 

217 return { 

218 "error": { 

219 "code": -32602, 

220 "message": f"Task {task_id} not found" 

221 } 

222 } 

223 

224 task = self.tasks[task_id] 

225 

226 # Only cancel if not already completed/failed 

227 if task.state in [TaskState.SUBMITTED, TaskState.WORKING]: 

228 task.update_state(TaskState.FAILED, error="Task cancelled by client") 

229 return {"success": True, "taskId": task_id} 

230 else: 

231 return { 

232 "error": { 

233 "code": -32600, 

234 "message": f"Cannot cancel task in state {task.state}" 

235 } 

236 } 

237 

238 

239class A2AProtocolServer: 

240 """Google A2A Protocol Server""" 

241 

242 def __init__(self, app: Flask, base_url: str): 

243 """ 

244 Initialize A2A server 

245 

246 Args: 

247 app: Flask application 

248 base_url: Base URL where this agent is hosted 

249 """ 

250 self.app = app 

251 self.base_url = base_url.rstrip('/') 

252 self.agent_cards: Dict[str, AgentCard] = {} 

253 self.message_handlers: Dict[str, A2AMessageHandler] = {} 

254 

255 def register_agent( 

256 self, 

257 agent_id: str, 

258 name: str, 

259 description: str, 

260 skills: List[Dict[str, Any]], 

261 executor_func, 

262 capabilities: Optional[Dict[str, Any]] = None 

263 ): 

264 """ 

265 Register an agent with A2A protocol 

266 

267 Args: 

268 agent_id: Unique agent identifier 

269 name: Agent name 

270 description: Agent description 

271 skills: List of agent skills 

272 executor_func: Async function to execute agent tasks 

273 capabilities: Optional agent capabilities 

274 """ 

275 # Create Agent Card 

276 agent_card = AgentCard( 

277 name=name, 

278 description=description, 

279 url=f"{self.base_url}/a2a/{agent_id}", 

280 version="1.0.0", 

281 skills=skills, 

282 capabilities=capabilities 

283 ) 

284 

285 self.agent_cards[agent_id] = agent_card 

286 

287 # Create message handler 

288 self.message_handlers[agent_id] = A2AMessageHandler(executor_func) 

289 

290 logger.info(f"Registered A2A agent: {agent_id} ({name})") 

291 

292 def setup_routes(self): 

293 """Setup Flask routes for A2A protocol""" 

294 

295 @self.app.route('/a2a/<agent_id>/.well-known/agent.json', methods=['GET']) 

296 def get_agent_card(agent_id): 

297 """Agent Card discovery endpoint""" 

298 if agent_id not in self.agent_cards: 

299 return jsonify({"error": f"Agent {agent_id} not found"}), 404 

300 

301 agent_card = self.agent_cards[agent_id] 

302 return jsonify(agent_card.to_dict()) 

303 

304 @self.app.route('/a2a/<agent_id>/jsonrpc', methods=['POST']) 

305 async def handle_jsonrpc(agent_id): 

306 """JSON-RPC endpoint for A2A messages""" 

307 if agent_id not in self.message_handlers: 

308 return jsonify({ 

309 "jsonrpc": "2.0", 

310 "error": { 

311 "code": -32602, 

312 "message": f"Agent {agent_id} not found" 

313 }, 

314 "id": None 

315 }), 404 

316 

317 try: 

318 rpc_request = request.json 

319 method = rpc_request.get("method") 

320 params = rpc_request.get("params", {}) 

321 rpc_id = rpc_request.get("id") 

322 

323 handler = self.message_handlers[agent_id] 

324 

325 # Route to appropriate handler 

326 if method == "message/send": 

327 result = await handler.handle_message_send(params) 

328 elif method == "message/get": 

329 result = await handler.handle_message_get(params) 

330 elif method == "task/cancel": 

331 result = await handler.handle_task_cancel(params) 

332 else: 

333 return jsonify({ 

334 "jsonrpc": "2.0", 

335 "error": { 

336 "code": -32601, 

337 "message": f"Method {method} not found" 

338 }, 

339 "id": rpc_id 

340 }), 400 

341 

342 # Return JSON-RPC response 

343 return jsonify({ 

344 "jsonrpc": "2.0", 

345 "result": result, 

346 "id": rpc_id 

347 }) 

348 

349 except Exception as e: 

350 logger.error(f"A2A JSON-RPC error: {e}") 

351 return jsonify({ 

352 "jsonrpc": "2.0", 

353 "error": { 

354 "code": -32603, 

355 "message": f"Internal error: {str(e)}" 

356 }, 

357 "id": rpc_request.get("id") if rpc_request else None 

358 }), 500 

359 

360 logger.info("A2A protocol routes configured") 

361 

362 

363# Global A2A server instance 

364_a2a_server: Optional[A2AProtocolServer] = None 

365 

366 

367def initialize_a2a_server(app: Flask, base_url: str) -> A2AProtocolServer: 

368 """ 

369 Initialize Google A2A protocol server 

370 

371 Args: 

372 app: Flask application 

373 base_url: Base URL where agents are hosted 

374 

375 Returns: 

376 A2AProtocolServer instance 

377 """ 

378 global _a2a_server 

379 _a2a_server = A2AProtocolServer(app, base_url) 

380 _a2a_server.setup_routes() 

381 return _a2a_server 

382 

383 

384def get_a2a_server() -> Optional[A2AProtocolServer]: 

385 """Get the global A2A server instance""" 

386 return _a2a_server