Coverage for integrations / google_a2a / dynamic_agent_registry.py: 38.7%

142 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Dynamic Agent Registry for Google A2A Protocol 

3 

4Automatically discovers and registers agents from prompt JSON files. 

5Each trained agent (recipe JSON) becomes an A2A-compatible specialist. 

6 

7Architecture: 

8- Scans prompts/ directory for {prompt_id}_{flow_id}_{role_number}.json files 

9- Extracts agent capabilities from recipe JSONs 

10- Automatically creates A2A Agent Cards 

11- Registers with Google A2A Protocol server 

12- No hardcoded agents - fully dynamic! 

13""" 

14 

15import os 

16import json 

17import logging 

18import glob 

19from typing import Dict, List, Any, Optional 

20from dataclasses import dataclass 

21from pathlib import Path 

22 

23logger = logging.getLogger(__name__) 

24 

25 

26@dataclass 

27class TrainedAgent: 

28 """ 

29 Represents a trained agent from recipe JSON 

30 

31 File naming pattern: {prompt_id}_{flow_id}_recipe.json 

32 Note: Each FLOW has a persona/role, not each role having multiple flows 

33 """ 

34 agent_id: str # e.g., "71_0" for prompt 71, flow 0 

35 prompt_id: int 

36 flow_id: int 

37 persona: str 

38 action: str 

39 recipe: List[Dict[str, Any]] 

40 status: str 

41 can_perform_without_user_input: str 

42 fallback_action: str 

43 metadata: Dict[str, Any] 

44 recipe_file: str 

45 flow_name: str = "" 

46 sub_goal: str = "" 

47 

48 

49class DynamicAgentDiscovery: 

50 """Discovers trained agents from prompts directory""" 

51 

52 def __init__(self, prompts_dir: str = "prompts"): 

53 self.prompts_dir = prompts_dir 

54 self.discovered_agents: Dict[str, TrainedAgent] = {} 

55 self.prompt_definitions: Dict[int, Dict[str, Any]] = {} 

56 

57 def discover_all_agents(self) -> int: 

58 """ 

59 Discover all trained agents from recipe JSON files 

60 

61 Returns: 

62 Number of agents discovered 

63 """ 

64 logger.info(f"Scanning {self.prompts_dir} for trained agents...") 

65 

66 # First, load all main prompt definitions (e.g., 71.json, 8888.json) 

67 self._load_prompt_definitions() 

68 

69 # Then discover all recipe JSONs (e.g., 71_0_recipe.json) 

70 recipe_pattern = os.path.join(self.prompts_dir, "*_*_recipe.json") 

71 recipe_files = glob.glob(recipe_pattern) 

72 

73 for recipe_file in recipe_files: 

74 try: 

75 agent = self._load_agent_from_recipe(recipe_file) 

76 if agent: 

77 self.discovered_agents[agent.agent_id] = agent 

78 logger.info(f"Discovered agent: {agent.agent_id} (persona: {agent.persona})") 

79 except Exception as e: 

80 logger.warning(f"Failed to load agent from {recipe_file}: {e}") 

81 

82 logger.info(f"Discovered {len(self.discovered_agents)} trained agents") 

83 return len(self.discovered_agents) 

84 

85 def _load_prompt_definitions(self): 

86 """Load main prompt definition files (e.g., 71.json, 8888.json)""" 

87 prompt_files = glob.glob(os.path.join(self.prompts_dir, "*.json")) 

88 

89 for prompt_file in prompt_files: 

90 filename = os.path.basename(prompt_file) 

91 

92 # Skip recipe files (they have underscores) 

93 if "_" in filename: 

94 continue 

95 

96 try: 

97 prompt_id = int(filename.replace(".json", "")) 

98 

99 with open(prompt_file, 'r', encoding='utf-8') as f: 

100 prompt_def = json.load(f) 

101 

102 self.prompt_definitions[prompt_id] = prompt_def 

103 logger.debug(f"Loaded prompt definition: {prompt_id}") 

104 

105 except (ValueError, json.JSONDecodeError) as e: 

106 logger.debug(f"Skipping non-prompt file: {filename}") 

107 

108 def _load_agent_from_recipe(self, recipe_file: str) -> Optional[TrainedAgent]: 

109 """ 

110 Load a trained agent from recipe JSON file 

111 

112 Pattern: {prompt_id}_{flow_id}_recipe.json 

113 Example: 71_0_recipe.json = prompt 71, flow 0 

114 """ 

115 filename = os.path.basename(recipe_file) 

116 

117 # Parse filename: {prompt_id}_{flow_id}_recipe.json 

118 parts = filename.replace("_recipe.json", "").split("_") 

119 if len(parts) != 2: 

120 logger.debug(f"Skipping {filename} - doesn't match pattern") 

121 return None 

122 

123 try: 

124 prompt_id = int(parts[0]) 

125 flow_id = int(parts[1]) 

126 except ValueError: 

127 return None 

128 

129 # Load recipe JSON 

130 with open(recipe_file, 'r', encoding='utf-8') as f: 

131 recipe_data = json.load(f) 

132 

133 # Create agent ID 

134 agent_id = f"{prompt_id}_{flow_id}" 

135 

136 # Get flow information from prompt definition 

137 prompt_def = self.prompt_definitions.get(prompt_id, {}) 

138 flows = prompt_def.get("flows", []) 

139 

140 flow_name = "" 

141 sub_goal = "" 

142 if flow_id < len(flows): 

143 flow_info = flows[flow_id] 

144 flow_name = flow_info.get("flow_name", "") 

145 sub_goal = flow_info.get("sub_goal", "") 

146 

147 # Extract agent information 

148 agent = TrainedAgent( 

149 agent_id=agent_id, 

150 prompt_id=prompt_id, 

151 flow_id=flow_id, 

152 persona=recipe_data.get("persona", "unknown"), 

153 action=recipe_data.get("action", ""), 

154 recipe=recipe_data.get("recipe", []), 

155 status=recipe_data.get("status", "unknown"), 

156 can_perform_without_user_input=recipe_data.get("can_perform_without_user_input", "no"), 

157 fallback_action=recipe_data.get("fallback_action", ""), 

158 metadata=recipe_data.get("metadata", {}), 

159 recipe_file=recipe_file, 

160 flow_name=flow_name, 

161 sub_goal=sub_goal 

162 ) 

163 

164 return agent 

165 

166 def get_agent_skills(self, agent: TrainedAgent) -> List[Dict[str, Any]]: 

167 """ 

168 Extract skills from trained agent's recipe 

169 

170 Returns A2A-compatible skills list 

171 """ 

172 skills = [] 

173 

174 # Get prompt definition for context 

175 prompt_def = self.prompt_definitions.get(agent.prompt_id, {}) 

176 prompt_name = prompt_def.get("name", f"Prompt {agent.prompt_id}") 

177 

178 # Get persona description 

179 personas = prompt_def.get("personas", []) 

180 persona_desc = next( 

181 (p["description"] for p in personas if p["name"] == agent.persona), 

182 f"Specialist for {agent.persona}" 

183 ) 

184 

185 # Get flow information 

186 flows = prompt_def.get("flows", []) 

187 if agent.flow_id < len(flows): 

188 flow = flows[agent.flow_id] 

189 flow_name = flow.get("flow_name", f"Flow {agent.flow_id}") 

190 sub_goal = flow.get("sub_goal", "") 

191 else: 

192 flow_name = f"Flow {agent.flow_id}" 

193 sub_goal = "" 

194 

195 # Create primary skill based on agent's trained action 

196 primary_skill = { 

197 "name": f"{agent.persona}_{flow_name.replace(' ', '_')}".lower(), 

198 "description": agent.action, 

199 "examples": [ 

200 agent.action, 

201 sub_goal if sub_goal else agent.action 

202 ], 

203 "input_modes": ["text", "text/plain"], 

204 "output_modes": ["text", "text/plain", "application/json"], 

205 "metadata": { 

206 "prompt_id": agent.prompt_id, 

207 "flow_id": agent.flow_id, 

208 "flow_name": agent.flow_name, 

209 "persona": agent.persona, 

210 "autonomous": agent.can_perform_without_user_input == "yes", 

211 "has_fallback": bool(agent.fallback_action), 

212 "recipe_steps": len(agent.recipe) 

213 } 

214 } 

215 

216 skills.append(primary_skill) 

217 

218 # Add individual recipe steps as sub-skills 

219 for idx, step in enumerate(agent.recipe): 

220 step_skill = { 

221 "name": f"step_{idx+1}_{step.get('tool_name', 'action')}".lower().replace(' ', '_'), 

222 "description": step.get("steps", ""), 

223 "examples": [step.get("steps", "")], 

224 "input_modes": ["text", "text/plain"], 

225 "output_modes": ["text", "text/plain"], 

226 "metadata": { 

227 "step_number": idx + 1, 

228 "tool_name": step.get("tool_name", "None"), 

229 "agent_performer": step.get("agent_to_perform_this_action", "") 

230 } 

231 } 

232 skills.append(step_skill) 

233 

234 return skills 

235 

236 def get_agent_description(self, agent: TrainedAgent) -> str: 

237 """Generate comprehensive agent description""" 

238 prompt_def = self.prompt_definitions.get(agent.prompt_id, {}) 

239 prompt_name = prompt_def.get("name", f"Prompt {agent.prompt_id}") 

240 

241 personas = prompt_def.get("personas", []) 

242 persona_desc = next( 

243 (p["description"] for p in personas if p["name"] == agent.persona), 

244 "" 

245 ) 

246 

247 description = f"Trained specialist for '{prompt_name}' - {persona_desc}. " 

248 description += f"Specialized in: {agent.action}. " 

249 description += f"Recipe contains {len(agent.recipe)} steps. " 

250 

251 if agent.can_perform_without_user_input == "yes": 

252 description += "Can operate autonomously. " 

253 

254 if agent.fallback_action: 

255 description += f"Has fallback strategy: {agent.fallback_action}" 

256 

257 return description 

258 

259 def get_all_agents(self) -> List[TrainedAgent]: 

260 """Get list of all discovered agents""" 

261 return list(self.discovered_agents.values()) 

262 

263 def get_agent_by_id(self, agent_id: str) -> Optional[TrainedAgent]: 

264 """Get specific agent by ID""" 

265 return self.discovered_agents.get(agent_id) 

266 

267 

268class DynamicAgentExecutor: 

269 """Executes tasks for dynamically discovered agents""" 

270 

271 def __init__(self): 

272 self.discovery = DynamicAgentDiscovery() 

273 self.discovery.discover_all_agents() 

274 

275 async def execute_agent_task(self, agent_id: str, message: str, context_id: str) -> Dict[str, Any]: 

276 """ 

277 Execute a task for a dynamically discovered agent 

278 

279 Args: 

280 agent_id: Agent identifier (e.g., "71_0_1") 

281 message: Task message 

282 context_id: A2A context ID 

283 

284 Returns: 

285 A2A response format 

286 """ 

287 agent = self.discovery.get_agent_by_id(agent_id) 

288 

289 if not agent: 

290 return { 

291 "role": "model", 

292 "parts": [{ 

293 "text": f"Error: Agent {agent_id} not found. Agent may not be trained yet." 

294 }] 

295 } 

296 

297 try: 

298 # Import execution functions 

299 from create_recipe import recipe 

300 from reuse_recipe import chat_agent 

301 

302 logger.info(f"Executing task for agent {agent_id} (persona: {agent.persona})") 

303 

304 # Determine execution mode based on agent status 

305 from core.constants import DEFAULT_USER_ID 

306 if agent.status == "done" or agent.status == "completed": 

307 # Use reuse mode (agent has trained recipe) 

308 result = chat_agent( 

309 message, 

310 user_id=agent.metadata.get("user_id", DEFAULT_USER_ID), 

311 prompt_id=agent.prompt_id 

312 ) 

313 else: 

314 # Use create mode (agent still learning) 

315 result = recipe( 

316 user_id=agent.metadata.get("user_id", DEFAULT_USER_ID), 

317 message=message, 

318 prompt_id=agent.prompt_id 

319 ) 

320 

321 return { 

322 "role": "model", 

323 "parts": [{ 

324 "text": str(result), 

325 "metadata": { 

326 "agent_id": agent_id, 

327 "persona": agent.persona, 

328 "execution_mode": "reuse" if agent.status == "done" else "create" 

329 } 

330 }] 

331 } 

332 

333 except Exception as e: 

334 logger.error(f"Agent {agent_id} execution failed: {e}") 

335 return { 

336 "role": "model", 

337 "parts": [{ 

338 "text": f"Error executing agent {agent_id}: {str(e)}" 

339 }] 

340 } 

341 

342 

343# Global instances 

344_dynamic_discovery = None 

345_dynamic_executor = None 

346 

347 

348def get_dynamic_discovery() -> DynamicAgentDiscovery: 

349 """Get global dynamic discovery instance""" 

350 global _dynamic_discovery 

351 if _dynamic_discovery is None: 

352 _dynamic_discovery = DynamicAgentDiscovery() 

353 return _dynamic_discovery 

354 

355 

356def get_dynamic_executor() -> DynamicAgentExecutor: 

357 """Get global dynamic executor instance""" 

358 global _dynamic_executor 

359 if _dynamic_executor is None: 

360 _dynamic_executor = DynamicAgentExecutor() 

361 return _dynamic_executor