Coverage for integrations / mcp / mcp_integration.py: 17.8%
174 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2MCP (Model Context Protocol) Integration Module
4This module enables the agent system to connect to external user-provided MCP servers
5and use their tools within the Autogen framework.
7Features:
8- Connect to multiple MCP servers
9- Discover and register tools from MCP servers
10- Convert MCP tools to Autogen-compatible functions
11- Automatic error handling and retries
12"""
14import json
15import logging
16import requests
17from typing import List, Dict, Any, Optional, Callable
18from core.http_pool import pooled_get, pooled_post
19from datetime import datetime
20import os
22logger = logging.getLogger(__name__)
25class MCPServerConnector:
26 """Connects to an external MCP server and manages tool discovery"""
28 def __init__(self, server_name: str, server_url: str, api_key: Optional[str] = None):
29 """
30 Initialize MCP server connector
32 Args:
33 server_name: Human-readable name for the server
34 server_url: Base URL of the MCP server
35 api_key: Optional API key for authentication
36 """
37 self.server_name = server_name
38 self.server_url = server_url.rstrip('/')
39 self.api_key = api_key
40 self.tools = []
41 self.connected = False
43 def connect(self) -> bool:
44 """
45 Connect to the MCP server and verify it's accessible
47 Returns:
48 True if connection successful, False otherwise
49 """
50 try:
51 headers = {}
52 if self.api_key:
53 headers['Authorization'] = f'Bearer {self.api_key}'
55 # Try health endpoint first
56 response = pooled_get(
57 f"{self.server_url}/health",
58 headers=headers,
59 timeout=5
60 )
62 if response.status_code == 200:
63 self.connected = True
64 logger.info(f"Connected to MCP server: {self.server_name}")
65 return True
66 else:
67 logger.warning(f"MCP server {self.server_name} health check failed: {response.status_code}")
68 return False
70 except requests.exceptions.RequestException as e:
71 logger.error(f"Failed to connect to MCP server {self.server_name}: {e}")
72 return False
74 def discover_tools(self) -> List[Dict[str, Any]]:
75 """
76 Discover available tools from the MCP server
78 Returns:
79 List of tool definitions
80 """
81 if not self.connected:
82 logger.warning(f"Cannot discover tools - not connected to {self.server_name}")
83 return []
85 try:
86 headers = {}
87 if self.api_key:
88 headers['Authorization'] = f'Bearer {self.api_key}'
90 response = pooled_get(
91 f"{self.server_url}/tools/list",
92 headers=headers,
93 timeout=10
94 )
96 if response.status_code == 200:
97 data = response.json()
98 self.tools = data.get('tools', [])
99 logger.info(f"Discovered {len(self.tools)} tools from {self.server_name}")
100 return self.tools
101 else:
102 logger.error(f"Failed to discover tools from {self.server_name}: {response.status_code}")
103 return []
105 except requests.exceptions.RequestException as e:
106 logger.error(f"Error discovering tools from {self.server_name}: {e}")
107 return []
109 def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
110 """
111 Execute a tool on the MCP server
113 Args:
114 tool_name: Name of the tool to execute
115 arguments: Tool arguments
117 Returns:
118 Tool execution result
119 """
120 if not self.connected:
121 return {
122 'success': False,
123 'error': f'Not connected to MCP server {self.server_name}'
124 }
126 try:
127 # Security: MCP sandbox - validate tool call before execution
128 try:
129 from security.mcp_sandbox import MCPSandbox
130 sandbox = MCPSandbox()
131 if not sandbox.validate_server_url(self.server_url):
132 logger.warning(f"MCP sandbox rejected server URL: {self.server_url}")
133 return {'success': False, 'error': f'Server URL not allowed: {self.server_url}'}
134 is_valid, rejection = sandbox.validate_tool_call(tool_name, arguments)
135 if not is_valid:
136 logger.warning(f"MCP sandbox rejected tool call: {rejection}")
137 return {'success': False, 'error': f'Tool call rejected: {rejection}'}
138 except ImportError:
139 pass # Security module not available
141 headers = {'Content-Type': 'application/json'}
142 if self.api_key:
143 headers['Authorization'] = f'Bearer {self.api_key}'
145 payload = {
146 'tool': tool_name,
147 'arguments': arguments
148 }
150 response = pooled_post(
151 f"{self.server_url}/tools/execute",
152 headers=headers,
153 json=payload,
154 timeout=30
155 )
157 if response.status_code == 200:
158 result = response.json()
159 # Security: Validate response for data exfiltration
160 try:
161 from security.mcp_sandbox import MCPSandbox
162 sandbox = MCPSandbox()
163 resp_safe, resp_reason = sandbox.validate_response(result)
164 if not resp_safe:
165 logger.warning(f"MCP response validation failed: {resp_reason}")
166 return {'success': False, 'error': f'Response rejected: {resp_reason}'}
167 except ImportError:
168 pass
169 return result
170 else:
171 logger.error(f"Tool execution failed on {self.server_name}: {response.status_code}")
172 return {
173 'success': False,
174 'error': f'HTTP {response.status_code}: {response.text}'
175 }
177 except requests.exceptions.RequestException as e:
178 logger.error(f"Error executing tool {tool_name} on {self.server_name}: {e}")
179 return {
180 'success': False,
181 'error': str(e)
182 }
185class MCPToolRegistry:
186 """Registry for managing multiple MCP servers and their tools"""
188 def __init__(self):
189 """Initialize the MCP tool registry"""
190 self.servers: Dict[str, MCPServerConnector] = {}
191 self.tools: Dict[str, tuple] = {} # tool_name -> (server_name, tool_def)
193 def add_server(self, server_name: str, server_url: str, api_key: Optional[str] = None) -> bool:
194 """
195 Add an MCP server to the registry
197 Args:
198 server_name: Unique name for the server
199 server_url: Base URL of the server
200 api_key: Optional API key
202 Returns:
203 True if server added successfully
204 """
205 if server_name in self.servers:
206 logger.warning(f"MCP server {server_name} already exists in registry")
207 return False
209 connector = MCPServerConnector(server_name, server_url, api_key)
210 if connector.connect():
211 self.servers[server_name] = connector
212 logger.info(f"Added MCP server {server_name} to registry")
213 return True
214 else:
215 logger.error(f"Failed to add MCP server {server_name}")
216 return False
218 def discover_all_tools(self) -> int:
219 """
220 Discover tools from all registered servers
222 Returns:
223 Total number of tools discovered
224 """
225 total_tools = 0
226 self.tools.clear()
228 for server_name, connector in self.servers.items():
229 tools = connector.discover_tools()
230 for tool in tools:
231 tool_name = tool.get('name')
232 if tool_name:
233 # Prefix tool name with server name to avoid conflicts
234 prefixed_name = f"{server_name}_{tool_name}"
235 self.tools[prefixed_name] = (server_name, tool)
236 total_tools += 1
238 logger.info(f"Discovered {total_tools} total tools from {len(self.servers)} MCP servers")
239 return total_tools
241 def get_tool_definitions(self) -> List[Dict[str, Any]]:
242 """
243 Get all tool definitions in Autogen-compatible format
245 Returns:
246 List of tool definitions
247 """
248 tool_defs = []
250 for prefixed_name, (server_name, tool_def) in self.tools.items():
251 autogen_def = {
252 'name': prefixed_name,
253 'description': tool_def.get('description', f'Tool from {server_name}'),
254 'parameters': tool_def.get('parameters', {}),
255 'mcp_server': server_name,
256 'original_name': tool_def.get('name')
257 }
258 tool_defs.append(autogen_def)
260 return tool_defs
262 def create_tool_function(self, tool_name: str) -> Optional[Callable]:
263 """
264 Create an executable function for a tool
266 Args:
267 tool_name: Prefixed tool name (server_toolname)
269 Returns:
270 Callable function that executes the tool
271 """
272 if tool_name not in self.tools:
273 logger.error(f"Tool {tool_name} not found in registry")
274 return None
276 server_name, tool_def = self.tools[tool_name]
277 original_name = tool_def.get('name')
279 def tool_executor(**kwargs) -> str:
280 """Execute the MCP tool with given arguments"""
281 connector = self.servers.get(server_name)
282 if not connector:
283 return json.dumps({
284 'success': False,
285 'error': f'MCP server {server_name} not available'
286 })
288 result = connector.execute_tool(original_name, kwargs)
289 return json.dumps(result)
291 # Set function metadata
292 tool_executor.__name__ = tool_name
293 tool_executor.__doc__ = tool_def.get('description', 'MCP tool')
295 return tool_executor
297 def get_all_tool_functions(self) -> Dict[str, Callable]:
298 """
299 Get all tools as executable functions
301 Returns:
302 Dictionary mapping tool names to executable functions
303 """
304 functions = {}
306 for tool_name in self.tools.keys():
307 func = self.create_tool_function(tool_name)
308 if func:
309 functions[tool_name] = func
311 return functions
314# Global registry instance
315mcp_registry = MCPToolRegistry()
318def load_user_mcp_servers(config_file: str = 'mcp_servers.json') -> int:
319 """
320 Load user-configured MCP servers from a JSON file
322 Args:
323 config_file: Path to the MCP servers configuration file
325 Returns:
326 Number of servers successfully loaded
327 """
328 if not os.path.exists(config_file):
329 logger.info(f"No MCP server configuration found at {config_file}")
330 return 0
332 try:
333 with open(config_file, 'r') as f:
334 config = json.load(f)
336 servers = config.get('servers', [])
337 loaded = 0
339 for server in servers:
340 server_name = server.get('name')
341 server_url = server.get('url')
342 api_key = server.get('api_key')
343 enabled = server.get('enabled', True)
345 if not enabled:
346 logger.info(f"Skipping disabled MCP server: {server_name}")
347 continue
349 if not server_name or not server_url:
350 logger.warning(f"Invalid server configuration: {server}")
351 continue
353 if mcp_registry.add_server(server_name, server_url, api_key):
354 loaded += 1
356 # Discover all tools after loading servers
357 if loaded > 0:
358 total_tools = mcp_registry.discover_all_tools()
359 logger.info(f"Loaded {loaded} MCP servers with {total_tools} total tools")
361 return loaded
363 except json.JSONDecodeError as e:
364 logger.error(f"Failed to parse MCP server configuration: {e}")
365 return 0
366 except Exception as e:
367 logger.error(f"Error loading MCP servers: {e}")
368 return 0
371def get_mcp_tools_for_autogen() -> List[Callable]:
372 """
373 Get all MCP tools as Autogen-compatible functions
375 Returns:
376 List of executable tool functions
377 """
378 functions = mcp_registry.get_all_tool_functions()
379 return list(functions.values())
382def get_mcp_tool_descriptions() -> List[Dict[str, Any]]:
383 """
384 Get MCP tool descriptions for agent configuration
386 Returns:
387 List of tool descriptions
388 """
389 return mcp_registry.get_tool_definitions()