Coverage for integrations / google_a2a / register_dynamic_agents.py: 32.1%
84 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Dynamic Agent Registration for Google A2A Protocol
4Replaces hardcoded agent registration with dynamic discovery.
5All agents are loaded from prompts/{prompt_id}_{flow_id}_{role_number}_recipe.json files.
7NO HARDCODED AGENTS!
8"""
10import logging
11from typing import Dict, Any
12from .google_a2a_integration import get_a2a_server
13from .dynamic_agent_registry import (
14 get_dynamic_discovery,
15 get_dynamic_executor,
16 TrainedAgent,
17 DynamicAgentDiscovery
18)
20logger = logging.getLogger(__name__)
23def create_dynamic_executor_function(agent: TrainedAgent):
24 """
25 Create an executor function for a dynamically discovered agent
27 Args:
28 agent: TrainedAgent instance
30 Returns:
31 Async executor function compatible with A2A protocol
32 """
33 async def executor(message: str, context_id: str) -> Dict[str, Any]:
34 """Execute task for dynamically discovered agent"""
35 try:
36 executor = get_dynamic_executor()
37 result = await executor.execute_agent_task(agent.agent_id, message, context_id)
38 return result
40 except Exception as e:
41 logger.error(f"Dynamic agent {agent.agent_id} execution error: {e}")
42 return {
43 "role": "model",
44 "parts": [{"text": f"Error executing {agent.agent_id}: {str(e)}"}]
45 }
47 # Set function name for debugging
48 executor.__name__ = f"{agent.agent_id}_executor"
49 return executor
52def register_all_dynamic_agents():
53 """
54 Discover and register all agents from prompts directory with A2A protocol
56 This function:
57 1. Scans prompts/ for *_*_*_recipe.json files
58 2. Extracts agent capabilities from each recipe
59 3. Creates A2A Agent Cards
60 4. Registers with Google A2A Protocol server
62 Returns:
63 Number of agents registered
64 """
65 try:
66 a2a_server = get_a2a_server()
68 if a2a_server is None:
69 logger.warning("A2A server not initialized, skipping dynamic agent registration")
70 return 0
72 logger.info("Starting dynamic agent registration...")
74 # Discover all trained agents
75 discovery = get_dynamic_discovery()
76 num_discovered = discovery.discover_all_agents()
78 if num_discovered == 0:
79 logger.warning("No trained agents found in prompts directory")
80 return 0
82 # Register each discovered agent
83 registered_count = 0
85 for agent in discovery.get_all_agents():
86 try:
87 # Get agent's skills
88 skills = discovery.get_agent_skills(agent)
90 # Get agent description
91 description = discovery.get_agent_description(agent)
93 # Create agent name
94 agent_name = f"{agent.persona.title()} Agent {agent.agent_id}"
96 # Create executor function for this agent
97 executor_func = create_dynamic_executor_function(agent)
99 # Determine capabilities
100 capabilities = {
101 "streaming": False,
102 "async": True,
103 "autonomous": agent.can_perform_without_user_input == "yes",
104 "has_fallback": bool(agent.fallback_action),
105 "recipe_steps": len(agent.recipe)
106 }
108 # Register with A2A
109 a2a_server.register_agent(
110 agent_id=agent.agent_id,
111 name=agent_name,
112 description=description,
113 skills=skills,
114 executor_func=executor_func,
115 capabilities=capabilities
116 )
118 logger.info(f"Registered dynamic agent: {agent.agent_id} ({agent.persona})")
119 registered_count += 1
121 except Exception as e:
122 logger.error(f"Failed to register agent {agent.agent_id}: {e}")
123 continue
125 logger.info(f"Successfully registered {registered_count}/{num_discovered} dynamic agents with A2A")
126 return registered_count
128 except Exception as e:
129 logger.error(f"Dynamic agent registration failed: {e}")
130 import traceback
131 traceback.print_exc()
132 return 0
135def get_registered_agent_info() -> Dict[str, Any]:
136 """
137 Get information about all registered dynamic agents
139 Returns:
140 Dict with agent statistics and details
141 """
142 discovery = get_dynamic_discovery()
143 agents = discovery.get_all_agents()
145 # Group by prompt_id
146 by_prompt = {}
147 for agent in agents:
148 if agent.prompt_id not in by_prompt:
149 by_prompt[agent.prompt_id] = []
150 by_prompt[agent.prompt_id].append(agent)
152 # Group by persona
153 by_persona = {}
154 for agent in agents:
155 if agent.persona not in by_persona:
156 by_persona[agent.persona] = []
157 by_persona[agent.persona].append(agent)
159 # Count by status
160 by_status = {}
161 for agent in agents:
162 status = agent.status
163 by_status[status] = by_status.get(status, 0) + 1
165 return {
166 "total_agents": len(agents),
167 "by_prompt": {
168 str(pid): [a.agent_id for a in agents_list]
169 for pid, agents_list in by_prompt.items()
170 },
171 "by_persona": {
172 persona: [a.agent_id for a in agents_list]
173 for persona, agents_list in by_persona.items()
174 },
175 "by_status": by_status,
176 "agent_ids": [a.agent_id for a in agents],
177 "autonomous_agents": [a.agent_id for a in agents if a.can_perform_without_user_input == "yes"],
178 "agents_with_fallback": [a.agent_id for a in agents if a.fallback_action]
179 }
182def list_available_agents():
183 """
184 Print formatted list of all available dynamic agents
186 Useful for debugging and status checking
187 """
188 discovery = get_dynamic_discovery()
189 agents = discovery.get_all_agents()
191 print("\n" + "="*80)
192 print("DYNAMICALLY REGISTERED AGENTS")
193 print("="*80)
195 if not agents:
196 print("No agents discovered. Check prompts/ directory for *_*_*_recipe.json files.")
197 return
199 # Group by prompt
200 by_prompt = {}
201 for agent in agents:
202 if agent.prompt_id not in by_prompt:
203 by_prompt[agent.prompt_id] = []
204 by_prompt[agent.prompt_id].append(agent)
206 for prompt_id in sorted(by_prompt.keys()):
207 prompt_def = discovery.prompt_definitions.get(prompt_id, {})
208 prompt_name = prompt_def.get("name", f"Prompt {prompt_id}")
210 print(f"\n[Prompt {prompt_id}] {prompt_name}")
211 print("-" * 80)
213 for agent in sorted(by_prompt[prompt_id], key=lambda a: (a.flow_id, a.role_number)):
214 status_icon = "✓" if agent.status == "done" else "○"
215 auto_icon = "⚡" if agent.can_perform_without_user_input == "yes" else "👤"
216 fallback_icon = "🔄" if agent.fallback_action else " "
218 print(f" {status_icon} {agent.agent_id:15} | "
219 f"Persona: {agent.persona:15} | "
220 f"{auto_icon} | {fallback_icon} | "
221 f"Steps: {len(agent.recipe)}")
223 print("\n" + "="*80)
224 print(f"Total: {len(agents)} agents")
225 print(f"Legend: ✓=Done ○=Pending ⚡=Autonomous 👤=User-Interactive 🔄=Has-Fallback")
226 print("="*80 + "\n")