Coverage for integrations / google_a2a / register_dynamic_agents.py: 32.1%

84 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Dynamic Agent Registration for Google A2A Protocol 

3 

4Replaces hardcoded agent registration with dynamic discovery. 

5All agents are loaded from prompts/{prompt_id}_{flow_id}_{role_number}_recipe.json files. 

6 

7NO HARDCODED AGENTS! 

8""" 

9 

10import logging 

11from typing import Dict, Any 

12from .google_a2a_integration import get_a2a_server 

13from .dynamic_agent_registry import ( 

14 get_dynamic_discovery, 

15 get_dynamic_executor, 

16 TrainedAgent, 

17 DynamicAgentDiscovery 

18) 

19 

20logger = logging.getLogger(__name__) 

21 

22 

def create_dynamic_executor_function(agent: TrainedAgent):
    """
    Create an executor function for a dynamically discovered agent

    The returned coroutine function closes over ``agent`` and forwards every
    call to the executor object returned by ``get_dynamic_executor()``.

    Args:
        agent: TrainedAgent instance

    Returns:
        Async executor function compatible with A2A protocol. It accepts
        ``(message, context_id)`` and returns the result of
        ``execute_agent_task``; if that call raises, it returns an error
        payload of the form ``{"role": "model", "parts": [{"text": ...}]}``
        instead of propagating the exception.
    """
    async def executor(message: str, context_id: str) -> Dict[str, Any]:
        """Execute task for dynamically discovered agent"""
        try:
            # Use a distinct local name so it does not shadow this coroutine.
            dynamic_executor = get_dynamic_executor()
            return await dynamic_executor.execute_agent_task(
                agent.agent_id, message, context_id
            )
        except Exception as e:
            # logger.exception logs at ERROR level and attaches the traceback,
            # which a plain logger.error call would discard.
            logger.exception(f"Dynamic agent {agent.agent_id} execution error: {e}")
            return {
                "role": "model",
                "parts": [{"text": f"Error executing {agent.agent_id}: {str(e)}"}]
            }

    # Set function name for debugging
    executor.__name__ = f"{agent.agent_id}_executor"
    return executor

50 

51 

def register_all_dynamic_agents():
    """
    Discover and register all agents from prompts directory with A2A protocol

    This function:
    1. Asks the dynamic discovery object to discover all trained agents
       (originally documented as scanning prompts/ for *_*_*_recipe.json files)
    2. Fetches each agent's skills and description from the discovery object
    3. Builds a capabilities dict and an executor function per agent
    4. Registers each agent with the A2A server via ``register_agent``

    A failure while registering one agent is logged and does not stop the
    remaining agents from being registered.

    Returns:
        Number of agents registered. Returns 0 when the A2A server is not
        initialized, when no agents are discovered, or when an unexpected
        error aborts the registration as a whole.
    """
    try:
        a2a_server = get_a2a_server()

        if a2a_server is None:
            logger.warning("A2A server not initialized, skipping dynamic agent registration")
            return 0

        logger.info("Starting dynamic agent registration...")

        # Discover all trained agents
        discovery = get_dynamic_discovery()
        num_discovered = discovery.discover_all_agents()

        if num_discovered == 0:
            logger.warning("No trained agents found in prompts directory")
            return 0

        # Register each discovered agent
        registered_count = 0

        for agent in discovery.get_all_agents():
            try:
                skills = discovery.get_agent_skills(agent)
                description = discovery.get_agent_description(agent)
                agent_name = f"{agent.persona.title()} Agent {agent.agent_id}"
                executor_func = create_dynamic_executor_function(agent)

                # Capability flags advertised alongside the agent
                capabilities = {
                    "streaming": False,
                    "async": True,
                    "autonomous": agent.can_perform_without_user_input == "yes",
                    "has_fallback": bool(agent.fallback_action),
                    "recipe_steps": len(agent.recipe)
                }

                # Register with A2A
                a2a_server.register_agent(
                    agent_id=agent.agent_id,
                    name=agent_name,
                    description=description,
                    skills=skills,
                    executor_func=executor_func,
                    capabilities=capabilities
                )

                logger.info(f"Registered dynamic agent: {agent.agent_id} ({agent.persona})")
                registered_count += 1

            except Exception as e:
                # Best effort per agent: log with traceback and move on to the
                # next one so a single bad agent does not block the rest.
                logger.exception(f"Failed to register agent {agent.agent_id}: {e}")

        logger.info(f"Successfully registered {registered_count}/{num_discovered} dynamic agents with A2A")
        return registered_count

    except Exception as e:
        # Send the traceback through logging rather than printing it straight
        # to stderr, so it follows the application's logging configuration.
        logger.exception(f"Dynamic agent registration failed: {e}")
        return 0

133 

134 

def get_registered_agent_info() -> Dict[str, Any]:
    """
    Summarize the agents currently known to the dynamic discovery object

    Returns:
        Dict with these keys:
        - "total_agents": number of agents
        - "by_prompt": agent ids grouped by ``str(prompt_id)``
        - "by_persona": agent ids grouped by persona
        - "by_status": count of agents per status value
        - "agent_ids": every agent id
        - "autonomous_agents": ids whose ``can_perform_without_user_input``
          equals "yes"
        - "agents_with_fallback": ids with a truthy ``fallback_action``
    """
    roster = get_dynamic_discovery().get_all_agents()

    # Single pass: bucket agent ids by prompt and persona, tally statuses.
    prompt_groups = {}
    persona_groups = {}
    status_tally = {}
    for member in roster:
        prompt_groups.setdefault(member.prompt_id, []).append(member.agent_id)
        persona_groups.setdefault(member.persona, []).append(member.agent_id)
        status_tally[member.status] = status_tally.get(member.status, 0) + 1

    autonomous_ids = [
        member.agent_id
        for member in roster
        if member.can_perform_without_user_input == "yes"
    ]
    fallback_ids = [member.agent_id for member in roster if member.fallback_action]

    summary = {"total_agents": len(roster)}
    # Prompt ids are stringified only at the end, after grouping on the raw id.
    summary["by_prompt"] = {str(key): ids for key, ids in prompt_groups.items()}
    summary["by_persona"] = dict(persona_groups)
    summary["by_status"] = status_tally
    summary["agent_ids"] = [member.agent_id for member in roster]
    summary["autonomous_agents"] = autonomous_ids
    summary["agents_with_fallback"] = fallback_ids
    return summary

180 

181 

def list_available_agents():
    """
    Print a formatted table of the agents known to the dynamic discovery object

    Agents are grouped under their prompt (prompts in sorted order) and listed
    by (flow_id, role_number) within each group. Intended for debugging and
    status checks; nothing is returned.
    """
    registry = get_dynamic_discovery()
    roster = registry.get_all_agents()
    banner = "=" * 80

    print("\n" + banner)
    print("DYNAMICALLY REGISTERED AGENTS")
    print(banner)

    # Nothing to list: point the reader at the recipe files and stop.
    if not roster:
        print("No agents discovered. Check prompts/ directory for *_*_*_recipe.json files.")
        return

    # Bucket agents under their prompt id
    grouped = {}
    for member in roster:
        grouped.setdefault(member.prompt_id, []).append(member)

    for pid in sorted(grouped):
        definition = registry.prompt_definitions.get(pid, {})
        title = definition.get("name", f"Prompt {pid}")

        print(f"\n[Prompt {pid}] {title}")
        print("-" * 80)

        ordered = sorted(grouped[pid], key=lambda a: (a.flow_id, a.role_number))
        for member in ordered:
            done_mark = "✓" if member.status == "done" else "○"
            mode_mark = "⚡" if member.can_perform_without_user_input == "yes" else "👤"
            fallback_mark = "🔄" if member.fallback_action else " "

            columns = [
                f" {done_mark} {member.agent_id:15}",
                f"Persona: {member.persona:15}",
                mode_mark,
                fallback_mark,
                f"Steps: {len(member.recipe)}",
            ]
            print(" | ".join(columns))

    print("\n" + banner)
    print(f"Total: {len(roster)} agents")
    print("Legend: ✓=Done ○=Pending ⚡=Autonomous 👤=User-Interactive 🔄=Has-Fallback")
    print(banner + "\n")