Coverage for integrations / google_a2a / a2a_agent_registry.py: 27.1%

48 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Google A2A Agent Registration and Executor Functions 

3 

4This module registers our agents (Assistant, Helper, Executor, Verify) with the 

5Google A2A Protocol server and provides executor functions for A2A message handling. 

6""" 

7 

8import logging 

9from typing import Dict, Any 

10from .google_a2a_integration import get_a2a_server 

11 

12logger = logging.getLogger(__name__) 

13 

14 

15# ============================================================================ 

16# Agent Executor Functions 

17# ============================================================================ 

18 

async def assistant_executor(message: str, context_id: str) -> Dict[str, Any]:
    """
    Execute Assistant agent tasks for A2A protocol

    Calls ``chat_agent`` from ``reuse_recipe`` with the message text and wraps
    the stringified result in an A2A response. Any exception raised along the
    way (including a failed import of ``reuse_recipe``) is logged with its
    traceback and reported back as an error-text response; nothing propagates
    to the caller.

    Args:
        message: User message text
        context_id: A2A context ID for conversation tracking (only logged here)

    Returns:
        A2A response in format: {"role": "model", "parts": [{"text": "..."}]}
    """
    try:
        # Import here to avoid circular dependencies
        from reuse_recipe import chat_agent

        # Only the first 100 characters of the message are logged.
        logger.info(
            "Assistant A2A executing: %s... (context: %s)",
            message[:100],
            context_id,
        )

        # NOTE(review): chat_agent is called synchronously inside this
        # coroutine; if it blocks for long it will stall the event loop --
        # confirm whether it should be offloaded to a worker thread.
        result = chat_agent(message)

        # Format as A2A response
        return {
            "role": "model",
            "parts": [{"text": str(result)}],
        }

    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which a plain logger.error(str) call would discard.
        logger.exception("Assistant A2A executor error: %s", e)
        return {
            "role": "model",
            "parts": [{"text": f"Error executing task: {e}"}],
        }

51 

52 

async def helper_executor(message: str, context_id: str) -> Dict[str, Any]:
    """
    Execute Helper agent tasks for A2A protocol

    Calls ``recipe`` from ``create_recipe`` with ``agent_name="helper"`` and
    wraps the stringified result in an A2A response. Any exception raised
    along the way (including a failed import of ``create_recipe``) is logged
    with its traceback and reported back as an error-text response; nothing
    propagates to the caller.

    Args:
        message: User message text
        context_id: A2A context ID for conversation tracking (only logged here)

    Returns:
        A2A response in format: {"role": "model", "parts": [{"text": "..."}]}
    """
    try:
        # Import here to avoid circular dependencies
        from create_recipe import recipe

        # Only the first 100 characters of the message are logged.
        logger.info(
            "Helper A2A executing: %s... (context: %s)",
            message[:100],
            context_id,
        )

        # NOTE(review): recipe is called synchronously inside this coroutine;
        # if it blocks for long it will stall the event loop -- confirm
        # whether it should be offloaded to a worker thread.
        result = recipe(message, agent_name="helper")

        # Format as A2A response
        return {
            "role": "model",
            "parts": [{"text": str(result)}],
        }

    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which a plain logger.error(str) call would discard.
        logger.exception("Helper A2A executor error: %s", e)
        return {
            "role": "model",
            "parts": [{"text": f"Error executing task: {e}"}],
        }

85 

86 

async def executor_executor(message: str, context_id: str) -> Dict[str, Any]:
    """
    Execute Executor agent tasks for A2A protocol

    Calls ``recipe`` from ``create_recipe`` with ``agent_name="executor"`` and
    wraps the stringified result in an A2A response. Any exception raised
    along the way (including a failed import of ``create_recipe``) is logged
    with its traceback and reported back as an error-text response; nothing
    propagates to the caller.

    Args:
        message: User message text
        context_id: A2A context ID for conversation tracking (only logged here)

    Returns:
        A2A response in format: {"role": "model", "parts": [{"text": "..."}]}
    """
    try:
        # Import here to avoid circular dependencies
        from create_recipe import recipe

        # Only the first 100 characters of the message are logged.
        logger.info(
            "Executor A2A executing: %s... (context: %s)",
            message[:100],
            context_id,
        )

        # NOTE(review): recipe is called synchronously inside this coroutine;
        # if it blocks for long it will stall the event loop -- confirm
        # whether it should be offloaded to a worker thread.
        result = recipe(message, agent_name="executor")

        # Format as A2A response
        return {
            "role": "model",
            "parts": [{"text": str(result)}],
        }

    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which a plain logger.error(str) call would discard.
        logger.exception("Executor A2A executor error: %s", e)
        return {
            "role": "model",
            "parts": [{"text": f"Error executing task: {e}"}],
        }

119 

120 

async def verify_executor(message: str, context_id: str) -> Dict[str, Any]:
    """
    Execute Verify agent tasks for A2A protocol

    Calls ``recipe`` from ``create_recipe`` with ``agent_name="verify"`` and
    wraps the stringified result in an A2A response. Any exception raised
    along the way (including a failed import of ``create_recipe``) is logged
    with its traceback and reported back as an error-text response; nothing
    propagates to the caller.

    Args:
        message: User message text
        context_id: A2A context ID for conversation tracking (only logged here)

    Returns:
        A2A response in format: {"role": "model", "parts": [{"text": "..."}]}
    """
    try:
        # Import here to avoid circular dependencies
        from create_recipe import recipe

        # Only the first 100 characters of the message are logged.
        logger.info(
            "Verify A2A executing: %s... (context: %s)",
            message[:100],
            context_id,
        )

        # NOTE(review): recipe is called synchronously inside this coroutine;
        # if it blocks for long it will stall the event loop -- confirm
        # whether it should be offloaded to a worker thread.
        result = recipe(message, agent_name="verify")

        # Format as A2A response
        return {
            "role": "model",
            "parts": [{"text": str(result)}],
        }

    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which a plain logger.error(str) call would discard.
        logger.exception("Verify A2A executor error: %s", e)
        return {
            "role": "model",
            "parts": [{"text": f"Error executing task: {e}"}],
        }

153 

154 

155# ============================================================================ 

156# Agent Skills Definitions (A2A Format) 

157# ============================================================================ 

158 

# Skill descriptors for the Assistant agent. Every entry carries the keys
# name / description / examples / input_modes / output_modes, and all three
# skills accept and emit the same two text modes. Each dict gets its own
# freshly built lists, so mutating one entry never affects another.
ASSISTANT_SKILLS = [
    {
        "name": skill_name,
        "description": summary,
        "examples": list(samples),
        "input_modes": ["text", "text/plain"],
        "output_modes": ["text", "text/plain"],
    }
    for skill_name, summary, samples in (
        (
            "task_coordination",
            "Coordinate complex multi-step tasks across multiple agents",
            (
                "Plan and execute a research project",
                "Orchestrate data analysis workflow",
                "Manage multi-agent collaboration",
            ),
        ),
        (
            "decision_making",
            "Make strategic decisions based on context and requirements",
            (
                "Choose the best approach for a task",
                "Prioritize competing requirements",
                "Select optimal tools and methods",
            ),
        ),
        (
            "context_management",
            "Manage conversation context and maintain coherent dialogue",
            (
                "Track conversation history",
                "Maintain task context across turns",
                "Summarize complex discussions",
            ),
        ),
    )
]

194 

# Skill descriptors for the Helper agent. Every skill handles the two text
# modes; the per-row "extra" tuples list any additional MIME types appended to
# the input or output modes of that particular skill. Each dict gets its own
# freshly built lists.
HELPER_SKILLS = [
    {
        "name": skill_name,
        "description": summary,
        "examples": list(samples),
        "input_modes": ["text", "text/plain", *extra_inputs],
        "output_modes": ["text", "text/plain", *extra_outputs],
    }
    for skill_name, summary, samples, extra_inputs, extra_outputs in (
        (
            "tool_execution",
            "Execute various tools and functions for data manipulation",
            (
                "Run API calls to external services",
                "Execute file operations",
                "Use web search and retrieval tools",
            ),
            (),
            ("application/json",),
        ),
        (
            "data_processing",
            "Process and transform data in various formats",
            (
                "Parse JSON and XML data",
                "Transform data structures",
                "Filter and aggregate data",
            ),
            ("application/json",),
            ("application/json",),
        ),
        (
            "external_api",
            "Interact with external APIs and web services",
            (
                "Call REST APIs",
                "Fetch web resources",
                "Submit data to external services",
            ),
            (),
            ("application/json",),
        ),
    )
]

230 

# Skill descriptors for the Executor agent. Every skill handles the two text
# modes; the per-row "extra" tuples list any additional MIME types appended to
# the input or output modes of that particular skill. Each dict gets its own
# freshly built lists.
EXECUTOR_SKILLS = [
    {
        "name": skill_name,
        "description": summary,
        "examples": list(samples),
        "input_modes": ["text", "text/plain", *extra_inputs],
        "output_modes": ["text", "text/plain", *extra_outputs],
    }
    for skill_name, summary, samples, extra_inputs, extra_outputs in (
        (
            "code_execution",
            "Execute code safely in various languages",
            (
                "Run Python scripts",
                "Execute shell commands",
                "Evaluate mathematical expressions",
            ),
            ("application/x-python-code",),
            (),
        ),
        (
            "computation",
            "Perform complex computations and calculations",
            (
                "Solve mathematical problems",
                "Run statistical analysis",
                "Perform numerical simulations",
            ),
            (),
            ("application/json",),
        ),
        (
            "data_analysis",
            "Analyze data and generate insights",
            (
                "Generate statistical summaries",
                "Identify patterns and trends",
                "Create data visualizations",
            ),
            ("application/json",),
            ("application/json",),
        ),
    )
]

266 

# Skill descriptors for the Verify agent. All three skills emit the two text
# modes; the per-row "extra_inputs" tuple lists any additional MIME types
# appended to that skill's input modes. Each dict gets its own freshly built
# lists.
VERIFY_SKILLS = [
    {
        "name": skill_name,
        "description": summary,
        "examples": list(samples),
        "input_modes": ["text", "text/plain", *extra_inputs],
        "output_modes": ["text", "text/plain"],
    }
    for skill_name, summary, samples, extra_inputs in (
        (
            "status_verification",
            "Verify task completion status and results",
            (
                "Check if task completed successfully",
                "Verify output correctness",
                "Monitor task progress",
            ),
            (),
        ),
        (
            "quality_assurance",
            "Ensure output quality meets requirements",
            (
                "Validate output format",
                "Check for errors and inconsistencies",
                "Verify data integrity",
            ),
            ("application/json",),
        ),
        (
            "validation",
            "Validate results and outputs against specifications",
            (
                "Verify API response format",
                "Check data against schema",
                "Validate calculations",
            ),
            ("application/json",),
        ),
    )
]

302 

303 

304# ============================================================================ 

305# Agent Registration Function 

306# ============================================================================ 

307 

def register_all_agents() -> None:
    """
    Register all agents (Assistant, Helper, Executor, Verify) with the A2A server

    The server is obtained from ``get_a2a_server()``. When that returns
    ``None`` a warning is logged and nothing is registered. Otherwise the
    agents are registered in the order Assistant, Helper, Executor, Verify,
    each with capabilities ``{"streaming": False, "async": True}``.

    Registration is best-effort: any exception is logged together with its
    traceback and then swallowed, so a failure part-way through leaves the
    remaining agents unregistered and never propagates to the caller.
    """
    try:
        a2a_server = get_a2a_server()

        if a2a_server is None:
            logger.warning("A2A server not initialized, skipping agent registration")
            return

        logger.info("Registering agents with Google A2A Protocol...")

        # One row per agent, in registration order:
        # (agent_id, display label, description, skills, executor coroutine)
        agents = (
            (
                "assistant",
                "Assistant",
                "Coordinate tasks, make decisions, and manage conversation context",
                ASSISTANT_SKILLS,
                assistant_executor,
            ),
            (
                "helper",
                "Helper",
                "Execute tools, process data, and interact with external APIs",
                HELPER_SKILLS,
                helper_executor,
            ),
            (
                "executor",
                "Executor",
                "Execute code, perform computations, and analyze data",
                EXECUTOR_SKILLS,
                executor_executor,
            ),
            (
                "verify",
                "Verify",
                "Verify task status, ensure quality, and validate results",
                VERIFY_SKILLS,
                verify_executor,
            ),
        )

        for agent_id, label, description, skills, executor_func in agents:
            a2a_server.register_agent(
                agent_id=agent_id,
                name=f"{label} Agent",
                description=description,
                skills=skills,
                executor_func=executor_func,
                # Fresh dict per call so the server never shares one
                # capabilities object between agents.
                capabilities={"streaming": False, "async": True},
            )
            logger.info("Registered %s agent with A2A", label)

        logger.info("All agents registered with Google A2A Protocol successfully")

    except Exception as e:
        # logger.exception records the traceback through the logging system,
        # so it reaches the configured handlers instead of raw stderr.
        logger.exception("Error registering agents with A2A: %s", e)