Coverage for integrations / mcp / mcp_integration.py: 17.8%

174 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2MCP (Model Context Protocol) Integration Module 

3 

4This module enables the agent system to connect to external user-provided MCP servers 

5and use their tools within the Autogen framework. 

6 

7Features: 

8- Connect to multiple MCP servers 

9- Discover and register tools from MCP servers 

10- Convert MCP tools to Autogen-compatible functions 

11- Automatic error handling and retries 

12""" 

13 

14import json 

15import logging 

16import requests 

17from typing import List, Dict, Any, Optional, Callable 

18from core.http_pool import pooled_get, pooled_post 

19from datetime import datetime 

20import os 

21 

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

23 

24 

class MCPServerConnector:
    """Connect to one external MCP server over HTTP and call its tools.

    The connector uses three endpoints relative to ``server_url``:
    ``GET /health`` (connectivity check), ``GET /tools/list`` (tool
    discovery) and ``POST /tools/execute`` (tool invocation). When an API
    key is configured it is sent as a ``Bearer`` token on every request.
    """

    def __init__(self, server_name: str, server_url: str, api_key: Optional[str] = None):
        """
        Initialize MCP server connector

        Args:
            server_name: Human-readable name for the server
            server_url: Base URL of the MCP server (trailing slashes are stripped)
            api_key: Optional API key for authentication
        """
        self.server_name = server_name
        self.server_url = server_url.rstrip('/')
        self.api_key = api_key
        self.tools = []          # tool definitions from the last successful discovery
        self.connected = False   # set to True only by a successful connect()

    def _build_headers(self, content_type: Optional[str] = None) -> Dict[str, str]:
        """Return request headers: optional Content-Type plus the Bearer token, if any."""
        headers = {}
        if content_type:
            headers['Content-Type'] = content_type
        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'
        return headers

    @staticmethod
    def _new_sandbox():
        """Return a fresh ``MCPSandbox``, or ``None`` when the security module is absent.

        Only the import itself is guarded, so an ImportError raised while the
        sandbox is being used is no longer mistaken for "module not installed".
        """
        try:
            from security.mcp_sandbox import MCPSandbox
        except ImportError:
            # Deliberate best-effort: without the module the checks are skipped.
            logger.debug("security.mcp_sandbox not available; skipping MCP sandbox checks")
            return None
        return MCPSandbox()

    def connect(self) -> bool:
        """
        Connect to the MCP server and verify it's accessible

        Issues ``GET {server_url}/health`` with a 5 second timeout and marks
        the connector as connected only on an HTTP 200 response.

        Returns:
            True if connection successful, False otherwise
        """
        try:
            # Try health endpoint first
            response = pooled_get(
                f"{self.server_url}/health",
                headers=self._build_headers(),
                timeout=5
            )
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to connect to MCP server {self.server_name}: {e}")
            return False

        if response.status_code == 200:
            self.connected = True
            logger.info(f"Connected to MCP server: {self.server_name}")
            return True

        logger.warning(f"MCP server {self.server_name} health check failed: {response.status_code}")
        return False

    def discover_tools(self) -> List[Dict[str, Any]]:
        """
        Discover available tools from the MCP server

        Issues ``GET {server_url}/tools/list`` with a 10 second timeout and
        stores the ``tools`` list of the JSON response on ``self.tools``.
        ``self.tools`` is left untouched when discovery fails.

        Returns:
            List of tool definitions; an empty list when not connected, on
            any HTTP/transport error, or when the response is not valid JSON
            of the expected shape
        """
        if not self.connected:
            logger.warning(f"Cannot discover tools - not connected to {self.server_name}")
            return []

        try:
            response = pooled_get(
                f"{self.server_url}/tools/list",
                headers=self._build_headers(),
                timeout=10
            )
            if response.status_code != 200:
                logger.error(f"Failed to discover tools from {self.server_name}: {response.status_code}")
                return []
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decode failures that are not reported as
            # a RequestException subclass.
            logger.error(f"Error discovering tools from {self.server_name}: {e}")
            return []

        # The payload comes from an external server: a non-object body or a
        # non-list "tools" value (e.g. null) must not crash the caller.
        tools = data.get('tools', []) if isinstance(data, dict) else None
        if not isinstance(tools, list):
            logger.error(f"Error discovering tools from {self.server_name}: malformed tool list")
            return []

        self.tools = tools
        logger.info(f"Discovered {len(self.tools)} tools from {self.server_name}")
        return self.tools

    def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute a tool on the MCP server

        When ``security.mcp_sandbox`` is importable, the server URL and the
        tool call are validated before the request is sent and the decoded
        response is validated before it is returned. The request is a
        ``POST {server_url}/tools/execute`` with a 30 second timeout.

        Args:
            tool_name: Name of the tool to execute
            arguments: Tool arguments

        Returns:
            The decoded JSON response on HTTP 200; otherwise a dict of the
            form ``{'success': False, 'error': <message>}``
        """
        if not self.connected:
            return {
                'success': False,
                'error': f'Not connected to MCP server {self.server_name}'
            }

        # Security: MCP sandbox - validate tool call before execution
        sandbox = self._new_sandbox()
        if sandbox is not None:
            if not sandbox.validate_server_url(self.server_url):
                logger.warning(f"MCP sandbox rejected server URL: {self.server_url}")
                return {'success': False, 'error': f'Server URL not allowed: {self.server_url}'}
            is_valid, rejection = sandbox.validate_tool_call(tool_name, arguments)
            if not is_valid:
                logger.warning(f"MCP sandbox rejected tool call: {rejection}")
                return {'success': False, 'error': f'Tool call rejected: {rejection}'}

        payload = {
            'tool': tool_name,
            'arguments': arguments
        }

        try:
            response = pooled_post(
                f"{self.server_url}/tools/execute",
                headers=self._build_headers('application/json'),
                json=payload,
                timeout=30
            )
            if response.status_code != 200:
                logger.error(f"Tool execution failed on {self.server_name}: {response.status_code}")
                return {
                    'success': False,
                    'error': f'HTTP {response.status_code}: {response.text}'
                }
            result = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError: a 200 response whose body is not valid JSON.
            logger.error(f"Error executing tool {tool_name} on {self.server_name}: {e}")
            return {
                'success': False,
                'error': str(e)
            }

        # Security: Validate response for data exfiltration.
        # A fresh sandbox instance is used, as before; whether MCPSandbox
        # carries state between checks is not visible from this module.
        sandbox = self._new_sandbox()
        if sandbox is not None:
            resp_safe, resp_reason = sandbox.validate_response(result)
            if not resp_safe:
                logger.warning(f"MCP response validation failed: {resp_reason}")
                return {'success': False, 'error': f'Response rejected: {resp_reason}'}
        return result

183 

184 

class MCPToolRegistry:
    """Registry for managing multiple MCP servers and their tools.

    Servers are stored by name; discovered tools are stored under the key
    ``"<server_name>_<tool_name>"`` so that tools with the same name on
    different servers do not overwrite each other.
    """

    def __init__(self):
        """Initialize the MCP tool registry"""
        self.servers: Dict[str, MCPServerConnector] = {}
        self.tools: Dict[str, tuple] = {}  # prefixed tool name -> (server_name, tool_def)

    def add_server(self, server_name: str, server_url: str, api_key: Optional[str] = None) -> bool:
        """
        Add an MCP server to the registry

        The server is registered only if ``MCPServerConnector.connect()``
        succeeds. Tools are not discovered here; call
        ``discover_all_tools()`` afterwards.

        Args:
            server_name: Unique name for the server
            server_url: Base URL of the server
            api_key: Optional API key

        Returns:
            True if server added successfully; False if the name is already
            registered or the connection attempt fails
        """
        if server_name in self.servers:
            logger.warning(f"MCP server {server_name} already exists in registry")
            return False

        connector = MCPServerConnector(server_name, server_url, api_key)
        if not connector.connect():
            logger.error(f"Failed to add MCP server {server_name}")
            return False

        self.servers[server_name] = connector
        logger.info(f"Added MCP server {server_name} to registry")
        return True

    def discover_all_tools(self) -> int:
        """
        Discover tools from all registered servers

        The tool table is cleared and rebuilt from scratch. Entries that are
        not dicts, or that have no ``name``, are skipped.

        Returns:
            Total number of tools discovered
        """
        total_tools = 0
        self.tools.clear()

        for server_name, connector in self.servers.items():
            for tool in connector.discover_tools():
                # Tool definitions come from an external server; a malformed
                # entry must not abort discovery for every other tool.
                if not isinstance(tool, dict):
                    logger.warning(f"Ignoring malformed tool definition from {server_name}: {tool!r}")
                    continue
                tool_name = tool.get('name')
                if not tool_name:
                    continue
                # Prefix tool name with server name to avoid conflicts
                self.tools[f"{server_name}_{tool_name}"] = (server_name, tool)
                total_tools += 1

        logger.info(f"Discovered {total_tools} total tools from {len(self.servers)} MCP servers")
        return total_tools

    def get_tool_definitions(self) -> List[Dict[str, Any]]:
        """
        Get all tool definitions in Autogen-compatible format

        Returns:
            List of dicts with the keys ``name`` (prefixed name),
            ``description``, ``parameters``, ``mcp_server`` and
            ``original_name``
        """
        return [
            {
                'name': prefixed_name,
                'description': tool_def.get('description', f'Tool from {server_name}'),
                'parameters': tool_def.get('parameters', {}),
                'mcp_server': server_name,
                'original_name': tool_def.get('name')
            }
            for prefixed_name, (server_name, tool_def) in self.tools.items()
        ]

    def create_tool_function(self, tool_name: str) -> Optional[Callable]:
        """
        Create an executable function for a tool

        The returned function accepts keyword arguments, forwards them to
        the owning server under the tool's original (unprefixed) name and
        returns the result serialized as a JSON string.

        Args:
            tool_name: Prefixed tool name (server_toolname)

        Returns:
            Callable function that executes the tool, or None if the tool
            is not in the registry
        """
        if tool_name not in self.tools:
            logger.error(f"Tool {tool_name} not found in registry")
            return None

        server_name, tool_def = self.tools[tool_name]
        original_name = tool_def.get('name')

        def tool_executor(**kwargs) -> str:
            """Execute the MCP tool with given arguments"""
            # Looked up at call time, so a server removed after the function
            # was created yields an error result instead of a stale connector.
            connector = self.servers.get(server_name)
            if not connector:
                return json.dumps({
                    'success': False,
                    'error': f'MCP server {server_name} not available'
                })

            return json.dumps(connector.execute_tool(original_name, kwargs))

        # Set function metadata
        tool_executor.__name__ = tool_name
        tool_executor.__doc__ = tool_def.get('description', 'MCP tool')

        return tool_executor

    def get_all_tool_functions(self) -> Dict[str, Callable]:
        """
        Get all tools as executable functions

        Returns:
            Dictionary mapping prefixed tool names to executable functions
        """
        functions = {}

        for tool_name in self.tools:
            func = self.create_tool_function(tool_name)
            if func:
                functions[tool_name] = func

        return functions

312 

313 

# Global registry instance shared by the module-level helper functions
# (load_user_mcp_servers, get_mcp_tools_for_autogen, get_mcp_tool_descriptions).
mcp_registry = MCPToolRegistry()

316 

317 

def load_user_mcp_servers(config_file: str = 'mcp_servers.json') -> int:
    """
    Load user-configured MCP servers from a JSON file

    The file is expected to hold an object with a ``servers`` list. Each
    entry is an object with ``name`` and ``url`` and optionally ``api_key``
    and ``enabled`` (defaults to True). Every enabled, well-formed entry is
    added to the global ``mcp_registry``; if at least one server was added,
    tools are discovered for all registered servers.

    Args:
        config_file: Path to the MCP servers configuration file

    Returns:
        Number of servers successfully loaded; 0 when the file is missing,
        cannot be parsed, or an unexpected error occurs
    """
    if not os.path.exists(config_file):
        logger.info(f"No MCP server configuration found at {config_file}")
        return 0

    try:
        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        with open(config_file, 'r', encoding='utf-8') as f:
            config = json.load(f)

        servers = config.get('servers', [])
        loaded = 0

        for server in servers:
            # A malformed entry must only skip itself. Previously it raised
            # into the catch-all below, which returned 0 even though earlier
            # servers had been registered, and skipped tool discovery.
            if not isinstance(server, dict):
                logger.warning(f"Invalid server configuration: {server}")
                continue

            server_name = server.get('name')
            server_url = server.get('url')
            api_key = server.get('api_key')
            enabled = server.get('enabled', True)

            if not enabled:
                logger.info(f"Skipping disabled MCP server: {server_name}")
                continue

            if not server_name or not server_url:
                logger.warning(f"Invalid server configuration: {server}")
                continue

            if mcp_registry.add_server(server_name, server_url, api_key):
                loaded += 1

        # Discover all tools after loading servers
        if loaded > 0:
            total_tools = mcp_registry.discover_all_tools()
            logger.info(f"Loaded {loaded} MCP servers with {total_tools} total tools")

        return loaded

    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse MCP server configuration: {e}")
        return 0
    except Exception as e:
        # Top-level boundary: loading optional user servers must never take
        # down the caller, so anything unexpected is logged and reported as 0.
        logger.error(f"Error loading MCP servers: {e}")
        return 0

369 

370 

def get_mcp_tools_for_autogen() -> List[Callable]:
    """
    Return every tool in the global MCP registry as a callable.

    Returns:
        List of executable tool functions, in the registry's tool order
    """
    return list(mcp_registry.get_all_tool_functions().values())

380 

381 

def get_mcp_tool_descriptions() -> List[Dict[str, Any]]:
    """
    Return the tool definitions held by the global MCP registry.

    Returns:
        List of tool descriptions for agent configuration
    """
    descriptions = mcp_registry.get_tool_definitions()
    return descriptions