Coverage for integrations / google_a2a / dynamic_agent_registry.py: 38.7%
142 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
Dynamic Agent Registry for Google A2A Protocol

Automatically discovers and registers agents from prompt JSON files.
Each trained agent (recipe JSON) becomes an A2A-compatible specialist.

Architecture:
- Scans the prompts/ directory for {prompt_id}_{flow_id}_recipe.json files
- Extracts agent capabilities from recipe JSONs
- Automatically creates A2A Agent Cards
- Registers with Google A2A Protocol server
- No hardcoded agents - fully dynamic!
13"""
15import os
16import json
17import logging
18import glob
19from typing import Dict, List, Any, Optional
20from dataclasses import dataclass
21from pathlib import Path
# Module-level logger, named after this module so output can be filtered per module.
logger = logging.getLogger(__name__)
@dataclass
class TrainedAgent:
    """
    One trained agent, as loaded from a recipe JSON file.

    Recipe files are named {prompt_id}_{flow_id}_recipe.json.
    Note: the persona/role belongs to the FLOW - a role does not own
    several flows.
    """
    # --- Identity (derived from the recipe file name) ---
    agent_id: str  # "{prompt_id}_{flow_id}", e.g. "71_0" for prompt 71, flow 0
    prompt_id: int
    flow_id: int

    # --- Values read from the recipe JSON ---
    persona: str
    action: str
    recipe: List[Dict[str, Any]]  # one dict per recipe step
    status: str
    can_perform_without_user_input: str  # compared against the string "yes" by consumers
    fallback_action: str
    metadata: Dict[str, Any]

    # --- Provenance ---
    recipe_file: str  # path of the recipe file this agent was loaded from

    # --- Flow details copied from the prompt definition; empty when unavailable ---
    flow_name: str = ""
    sub_goal: str = ""
class DynamicAgentDiscovery:
    """
    Discovers trained agents from the prompts directory.

    Two kinds of files are read from ``prompts_dir``:
    - {prompt_id}.json                    main prompt definitions (e.g. 71.json)
    - {prompt_id}_{flow_id}_recipe.json   one trained agent per flow (e.g. 71_0_recipe.json)
    """

    def __init__(self, prompts_dir: str = "prompts"):
        """
        Create an empty registry; nothing is read from disk until
        discover_all_agents() is called.

        Args:
            prompts_dir: Directory that holds prompt and recipe JSON files
        """
        self.prompts_dir = prompts_dir
        # Agents keyed by agent_id ("{prompt_id}_{flow_id}").
        self.discovered_agents: Dict[str, "TrainedAgent"] = {}
        # Parsed prompt definition objects keyed by numeric prompt ID.
        self.prompt_definitions: Dict[int, Dict[str, Any]] = {}

    def discover_all_agents(self) -> int:
        """
        Discover all trained agents from recipe JSON files

        A recipe file that cannot be loaded is logged and skipped; it never
        aborts the scan.

        Returns:
            Number of agents discovered
        """
        logger.info(f"Scanning {self.prompts_dir} for trained agents...")

        # Prompt definitions come first: recipes look up their flow info in them.
        self._load_prompt_definitions()

        recipe_pattern = os.path.join(self.prompts_dir, "*_*_recipe.json")
        # Sorted so the registration order does not depend on filesystem order.
        for recipe_file in sorted(glob.glob(recipe_pattern)):
            try:
                agent = self._load_agent_from_recipe(recipe_file)
            except Exception as e:
                # Deliberately broad: discovery is best-effort per file.
                logger.warning(f"Failed to load agent from {recipe_file}: {e}")
                continue

            if agent is not None:
                self.discovered_agents[agent.agent_id] = agent
                logger.info(f"Discovered agent: {agent.agent_id} (persona: {agent.persona})")

        logger.info(f"Discovered {len(self.discovered_agents)} trained agents")
        return len(self.discovered_agents)

    def _load_prompt_definitions(self) -> None:
        """
        Load main prompt definition files (e.g., 71.json, 8888.json)

        Files whose name is not a plain integer are skipped. Files that cannot
        be read, are not valid JSON, or do not contain a JSON object are
        logged and skipped, so later lookups can rely on dict values.
        """
        prompt_files = glob.glob(os.path.join(self.prompts_dir, "*.json"))

        for prompt_file in sorted(prompt_files):
            filename = os.path.basename(prompt_file)

            # Skip recipe files (they have underscores)
            if "_" in filename:
                continue

            try:
                prompt_id = int(os.path.splitext(filename)[0])
            except ValueError:
                logger.debug(f"Skipping non-prompt file: {filename}")
                continue

            try:
                with open(prompt_file, 'r', encoding='utf-8') as f:
                    prompt_def = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
                logger.warning(f"Failed to load prompt definition from {prompt_file}: {e}")
                continue

            if not isinstance(prompt_def, dict):
                logger.warning(f"Ignoring prompt definition {filename}: expected a JSON object")
                continue

            self.prompt_definitions[prompt_id] = prompt_def
            logger.debug(f"Loaded prompt definition: {prompt_id}")

    def _get_flow_info(self, prompt_id: int, flow_id: int) -> Dict[str, Any]:
        """Return the flow entry at index flow_id of a prompt definition, or {} if there is none."""
        flows = self.prompt_definitions.get(prompt_id, {}).get("flows", [])
        # A negative flow_id must not wrap around to the end of the list.
        if isinstance(flows, list) and 0 <= flow_id < len(flows):
            flow_info = flows[flow_id]
            if isinstance(flow_info, dict):
                return flow_info
        return {}

    @staticmethod
    def _get_persona_description(prompt_def: Dict[str, Any], persona: str, default: str = "") -> str:
        """Return the description of the first persona entry named ``persona``, or ``default``."""
        for entry in prompt_def.get("personas") or []:
            if isinstance(entry, dict) and "name" in entry and entry["name"] == persona:
                # A matching entry without a description falls back to the default.
                return entry.get("description", default)
        return default

    def _load_agent_from_recipe(self, recipe_file: str) -> Optional["TrainedAgent"]:
        """
        Load a trained agent from recipe JSON file

        Pattern: {prompt_id}_{flow_id}_recipe.json
        Example: 71_0_recipe.json = prompt 71, flow 0

        Returns:
            The agent, or None when the file name does not match the pattern

        Raises:
            OSError: The file cannot be read
            ValueError: The file is not valid JSON or does not hold a JSON object
        """
        filename = os.path.basename(recipe_file)

        # Parse filename: {prompt_id}_{flow_id}_recipe.json
        parts = filename.replace("_recipe.json", "").split("_")
        if len(parts) != 2:
            logger.debug(f"Skipping {filename} - doesn't match pattern")
            return None

        try:
            prompt_id = int(parts[0])
            flow_id = int(parts[1])
        except ValueError:
            logger.debug(f"Skipping {filename} - doesn't match pattern")
            return None

        # Load recipe JSON
        with open(recipe_file, 'r', encoding='utf-8') as f:
            recipe_data = json.load(f)

        if not isinstance(recipe_data, dict):
            raise ValueError(
                f"{filename} must contain a JSON object, got {type(recipe_data).__name__}"
            )

        # Flow details come from the prompt definition, when one was loaded.
        flow_info = self._get_flow_info(prompt_id, flow_id)

        return TrainedAgent(
            agent_id=f"{prompt_id}_{flow_id}",
            prompt_id=prompt_id,
            flow_id=flow_id,
            persona=recipe_data.get("persona", "unknown"),
            action=recipe_data.get("action", ""),
            # "or" also replaces an explicit JSON null, which would break len()/.get() later.
            recipe=recipe_data.get("recipe") or [],
            status=recipe_data.get("status", "unknown"),
            can_perform_without_user_input=recipe_data.get("can_perform_without_user_input", "no"),
            fallback_action=recipe_data.get("fallback_action", ""),
            metadata=recipe_data.get("metadata") or {},
            recipe_file=recipe_file,
            flow_name=flow_info.get("flow_name", ""),
            sub_goal=flow_info.get("sub_goal", ""),
        )

    def get_agent_skills(self, agent: "TrainedAgent") -> List[Dict[str, Any]]:
        """
        Extract skills from trained agent's recipe

        The first entry is the primary skill built from the agent's action;
        it is followed by one sub-skill per recipe step.

        Returns A2A-compatible skills list
        """
        # Get flow information; fall back to a generic name when the flow is unknown
        flow = self._get_flow_info(agent.prompt_id, agent.flow_id)
        flow_name = flow.get("flow_name", f"Flow {agent.flow_id}")
        sub_goal = flow.get("sub_goal", "")

        # Create primary skill based on agent's trained action
        primary_skill = {
            "name": f"{agent.persona}_{flow_name.replace(' ', '_')}".lower(),
            "description": agent.action,
            "examples": [
                agent.action,
                sub_goal if sub_goal else agent.action
            ],
            "input_modes": ["text", "text/plain"],
            "output_modes": ["text", "text/plain", "application/json"],
            "metadata": {
                "prompt_id": agent.prompt_id,
                "flow_id": agent.flow_id,
                "flow_name": agent.flow_name,
                "persona": agent.persona,
                "autonomous": agent.can_perform_without_user_input == "yes",
                "has_fallback": bool(agent.fallback_action),
                "recipe_steps": len(agent.recipe)
            }
        }
        skills = [primary_skill]

        # Add individual recipe steps as sub-skills
        for step_number, step in enumerate(agent.recipe, start=1):
            skills.append({
                "name": f"step_{step_number}_{step.get('tool_name', 'action')}".lower().replace(' ', '_'),
                "description": step.get("steps", ""),
                "examples": [step.get("steps", "")],
                "input_modes": ["text", "text/plain"],
                "output_modes": ["text", "text/plain"],
                "metadata": {
                    "step_number": step_number,
                    "tool_name": step.get("tool_name", "None"),
                    "agent_performer": step.get("agent_to_perform_this_action", "")
                }
            })

        return skills

    def get_agent_description(self, agent: "TrainedAgent") -> str:
        """Generate comprehensive agent description"""
        prompt_def = self.prompt_definitions.get(agent.prompt_id, {})
        prompt_name = prompt_def.get("name", f"Prompt {agent.prompt_id}")
        persona_desc = self._get_persona_description(prompt_def, agent.persona)

        description = f"Trained specialist for '{prompt_name}' - {persona_desc}. "
        description += f"Specialized in: {agent.action}. "
        description += f"Recipe contains {len(agent.recipe)} steps. "

        if agent.can_perform_without_user_input == "yes":
            description += "Can operate autonomously. "

        if agent.fallback_action:
            description += f"Has fallback strategy: {agent.fallback_action}"

        return description

    def get_all_agents(self) -> List["TrainedAgent"]:
        """Get list of all discovered agents"""
        return list(self.discovered_agents.values())

    def get_agent_by_id(self, agent_id: str) -> Optional["TrainedAgent"]:
        """Get specific agent by ID (returns None when the ID is unknown)"""
        return self.discovered_agents.get(agent_id)
class DynamicAgentExecutor:
    """Executes tasks for dynamically discovered agents"""

    # Statuses meaning the agent already has a trained recipe that can be reused.
    _TRAINED_STATUSES = ("done", "completed")

    def __init__(self):
        """Create a discovery instance with default settings and scan for agents immediately."""
        self.discovery = DynamicAgentDiscovery()
        self.discovery.discover_all_agents()

    @classmethod
    def _execution_mode(cls, status: str) -> str:
        """Map an agent status to the execution mode: "reuse" when trained, otherwise "create"."""
        return "reuse" if status in cls._TRAINED_STATUSES else "create"

    async def execute_agent_task(self, agent_id: str, message: str, context_id: str) -> Dict[str, Any]:
        """
        Execute a task for a dynamically discovered agent

        Never raises for execution problems: an unknown agent or any failure
        during execution is reported as an error text in the response.

        Args:
            agent_id: Agent identifier (e.g., "71_0")
            message: Task message
            context_id: A2A context ID (accepted for callers; not used by this method)

        Returns:
            A2A response format: {"role": "model", "parts": [...]}
        """
        agent = self.discovery.get_agent_by_id(agent_id)

        if agent is None:
            return {
                "role": "model",
                "parts": [{
                    "text": f"Error: Agent {agent_id} not found. Agent may not be trained yet."
                }]
            }

        try:
            # Import execution functions lazily; an import failure is reported
            # through the error response below instead of breaking module import.
            from create_recipe import recipe
            from reuse_recipe import chat_agent

            logger.info(f"Executing task for agent {agent_id} (persona: {agent.persona})")

            from core.constants import DEFAULT_USER_ID

            # One decision drives both the call and the reported mode, so the
            # metadata can no longer disagree with what was actually executed.
            execution_mode = self._execution_mode(agent.status)
            user_id = agent.metadata.get("user_id", DEFAULT_USER_ID)

            # NOTE(review): both functions are called without await; if they
            # block, the event loop is blocked for the duration - confirm.
            if execution_mode == "reuse":
                # Use reuse mode (agent has trained recipe)
                result = chat_agent(
                    message,
                    user_id=user_id,
                    prompt_id=agent.prompt_id
                )
            else:
                # Use create mode (agent still learning)
                result = recipe(
                    user_id=user_id,
                    message=message,
                    prompt_id=agent.prompt_id
                )

            return {
                "role": "model",
                "parts": [{
                    "text": str(result),
                    "metadata": {
                        "agent_id": agent_id,
                        "persona": agent.persona,
                        "execution_mode": execution_mode
                    }
                }]
            }

        except Exception as e:
            # Boundary handler: log with traceback, answer with an error message.
            logger.exception(f"Agent {agent_id} execution failed: {e}")
            return {
                "role": "model",
                "parts": [{
                    "text": f"Error executing agent {agent_id}: {str(e)}"
                }]
            }
# Global instances
# Both start as None and are created on first use by get_dynamic_discovery()
# and get_dynamic_executor() respectively.
_dynamic_discovery: Optional["DynamicAgentDiscovery"] = None
_dynamic_executor: Optional["DynamicAgentExecutor"] = None
def get_dynamic_discovery() -> DynamicAgentDiscovery:
    """
    Return the shared DynamicAgentDiscovery, creating it on the first call.

    The instance is built with default arguments; this function does not
    start a scan itself.
    """
    global _dynamic_discovery
    if _dynamic_discovery is not None:
        return _dynamic_discovery

    # First call: build the instance and keep it for later callers.
    _dynamic_discovery = DynamicAgentDiscovery()
    return _dynamic_discovery
def get_dynamic_executor() -> DynamicAgentExecutor:
    """
    Return the shared DynamicAgentExecutor, creating it on the first call.

    Constructing the executor runs agent discovery, so the first call does
    the directory scan; later calls return the cached instance.
    """
    global _dynamic_executor
    if _dynamic_executor is not None:
        return _dynamic_executor

    # First call: build the instance and keep it for later callers.
    _dynamic_executor = DynamicAgentExecutor()
    return _dynamic_executor