Coverage for security / mcp_sandbox.py: 25.0%
68 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
MCP Tool Sandboxing
Defends against CVE-2025-6514 (command injection via MCP) and
supply chain attacks through malicious MCP tools/skills.
"""
import logging
import os
import re
from typing import Any, Dict, Iterator, List, Optional, Set, Tuple
from urllib.parse import urlparse
# Shared logger for the security package's MCP sandbox messages.
logger = logging.getLogger('hevolve_security')

# Shell metacharacters that could enable command injection: command
# separators (; & |), backticks, `$`, braces, parentheses and line breaks.
_SHELL_METACHAR_PATTERN = re.compile(r'[;&|`${}()\n\r]')

# Path traversal patterns: ".." immediately followed by "/" or "\".
_PATH_TRAVERSAL_PATTERN = re.compile(r'\.\.[\\/]')

# Dangerous command patterns (all matched case-insensitively).
_DANGEROUS_CMD_PATTERNS = [
    # Destructive / permission-changing commands followed by whitespace.
    re.compile(r'\b(rm|del|format|mkfs|dd|chmod|chown)\s', re.I),
    # Dynamic code evaluation calls.
    re.compile(r'\beval\s*\(', re.I),
    re.compile(r'\bexec\s*\(', re.I),
    # Network tools followed, later on the same line, by a "-" (option flag).
    re.compile(r'\b(curl|wget|nc|ncat)\s.*[-]', re.I),
    # Python process-spawning APIs.
    re.compile(r'\bos\.(system|popen|exec)', re.I),
    re.compile(r'\bsubprocess\.(call|run|Popen)', re.I),
    re.compile(r'\b__import__\s*\(', re.I),
]

# Max response size from MCP tools (1MB)
MAX_RESPONSE_SIZE = 1 * 1024 * 1024

# Max execution timeout (seconds)
MAX_EXECUTION_TIMEOUT = 60
class MCPSandbox:
    """
    Sandbox for MCP tool execution.

    Validates server URLs, tool names, and arguments before execution, and
    screens tool responses for oversized payloads and credential-like strings.
    """

    # Credential-like token shapes screened for in tool responses
    # ('sk-…' secrets, JWT-like 'eyJ….eyJ', 'AIzaSy…' and 'AKIA…' keys).
    # Compiled once at class creation instead of on every validate_response().
    _CREDENTIAL_PATTERNS = (
        re.compile(r'sk-[a-zA-Z0-9]{20,}'),
        re.compile(r'eyJ[a-zA-Z0-9_-]+\.eyJ'),
        re.compile(r'AIzaSy[a-zA-Z0-9_-]{33}'),
        re.compile(r'AKIA[0-9A-Z]{16}'),
    )

    def __init__(self, allowed_servers: Optional[List[str]] = None,
                 allowed_tools: Optional[Set[str]] = None):
        """
        Build the server and tool allowlists.

        Args:
            allowed_servers: Hostnames that MCP servers may be reached on.
                Entries are stripped and lower-cased; empty entries are
                dropped.
            allowed_tools: Tool names that may be called. An empty/None value
                means no tool-name restriction is applied.

        Hostnames from the comma-separated ``MCP_ALLOWED_SERVERS`` environment
        variable are merged in, and ``localhost`` / ``127.0.0.1`` are always
        allowed.
        """
        self.allowed_servers: Set[str] = set()
        self._add_servers(allowed_servers or [])
        # Copy so later mutation of the caller's set cannot silently change
        # the allowlist (and vice versa).
        self.allowed_tools: Set[str] = set(allowed_tools or ())

        # Load from environment
        env_servers = os.environ.get('MCP_ALLOWED_SERVERS', '')
        if env_servers:
            self._add_servers(env_servers.split(','))

        # Always allow localhost
        self.allowed_servers.update(['localhost', '127.0.0.1'])

    def _add_servers(self, hosts) -> None:
        """Add hostnames to the allowlist, normalised to match urlparse output."""
        for host in hosts:
            # urlparse() always reports the hostname in lower case, so entries
            # must be lower-cased to ever match. Blank entries are skipped
            # because '' is what an unparseable URL yields as its hostname.
            cleaned = host.strip().lower()
            if cleaned:
                self.allowed_servers.add(cleaned)

    def validate_server_url(self, url: str) -> bool:
        """
        Validate MCP server URL against allowlist.

        Returns True if allowed, False if blocked. Never raises: a URL that
        cannot be parsed, or that has no hostname, is blocked.
        """
        try:
            hostname = urlparse(url).hostname or ''
        except Exception as e:  # fail closed on anything urlparse rejects
            logger.error("Failed to parse MCP server URL: %s", e)
            return False

        if not self.allowed_servers:
            # If no allowlist configured, only allow localhost
            return hostname in ('localhost', '127.0.0.1')

        # An empty hostname must never be treated as allowlisted.
        if hostname and hostname in self.allowed_servers:
            return True

        logger.warning("MCP server blocked: %s not in allowlist", hostname)
        return False

    @staticmethod
    def _iter_string_values(label: Any, value: Any) -> Iterator[Tuple[Any, str]]:
        """
        Yield ``(label, text)`` for every string reachable from ``value``.

        Walks dicts, lists, tuples and sets depth-first in their natural
        order. Nested values are labelled ``parent.key`` / ``parent[index]``;
        a top-level string keeps its bare argument name. Containers are
        visited once each, so self-referencing structures terminate.
        """
        pending = [(label, value)]
        visited = set()
        while pending:
            path, item = pending.pop()
            if isinstance(item, str):
                yield path, item
                continue
            if isinstance(item, dict):
                children = [(f"{path}.{k}", v) for k, v in item.items()]
            elif isinstance(item, (list, tuple, set, frozenset)):
                children = [(f"{path}[{i}]", v) for i, v in enumerate(item)]
            else:
                continue  # numbers, None, etc. carry no injectable text
            if id(item) in visited:
                continue
            visited.add(id(item))
            # Reverse so the stack pops children in their original order.
            pending.extend(reversed(children))

    def validate_tool_call(self, tool_name: str, arguments: Dict[str, Any]) -> Tuple[bool, str]:
        """
        Validate tool name and arguments before execution.

        Every string in ``arguments`` is checked, including strings nested
        inside lists/dicts, for shell metacharacters, path traversal and
        dangerous command patterns, then passed to the DLP engine when that
        module is importable. ``arguments=None`` is treated as no arguments.

        Returns (is_safe, reason); ``reason`` is "" when the call is safe.
        """
        # Check tool allowlist
        if self.allowed_tools and tool_name not in self.allowed_tools:
            return False, f"Tool '{tool_name}' not in allowlist"

        # Flatten once so both the injection scan and the DLP scan see the
        # same set of strings, nested ones included.
        candidates = [
            pair
            for key, value in (arguments or {}).items()
            for pair in self._iter_string_values(key, value)
        ]

        # Check arguments for injection patterns
        for label, text in candidates:
            # Shell metacharacters
            if _SHELL_METACHAR_PATTERN.search(text):
                logger.warning(
                    "Shell metacharacter blocked in MCP tool arg '%s': %s...",
                    label, text[:50],
                )
                return False, f"Argument '{label}' contains shell metacharacters"

            # Path traversal
            if _PATH_TRAVERSAL_PATTERN.search(text):
                logger.warning("Path traversal blocked in MCP tool arg '%s'", label)
                return False, f"Argument '{label}' contains path traversal"

            # Dangerous commands
            if any(pattern.search(text) for pattern in _DANGEROUS_CMD_PATTERNS):
                logger.warning(
                    "Dangerous command blocked in MCP tool arg '%s': %s...",
                    label, text[:50],
                )
                return False, f"Argument '{label}' contains dangerous command pattern"

        # DLP: check for PII in outbound tool arguments
        try:
            from security.dlp_engine import get_dlp_engine
            for label, text in candidates:
                allowed, reason = get_dlp_engine().check_outbound(text)
                if not allowed:
                    logger.warning("DLP blocked MCP tool arg '%s': %s", label, reason)
                    return False, f"DLP: {reason} in argument '{label}'"
        except ImportError:
            # DLP engine not available — allow through (deliberate best-effort)
            logger.debug("DLP engine not available; skipping outbound DLP check")

        return True, ""

    def validate_response(self, response: Any) -> Tuple[bool, str]:
        """
        Validate MCP tool response for data exfiltration patterns.

        String/bytes responses longer than MAX_RESPONSE_SIZE are rejected.
        The response text (``str(response)`` for non-text objects) is then
        scanned for credential-like tokens.

        Returns (is_safe, reason); ``reason`` is "" when the response is safe.
        """
        if isinstance(response, (str, bytes)):
            size = len(response)
            if size > MAX_RESPONSE_SIZE:
                return False, f"Response size {size} exceeds limit {MAX_RESPONSE_SIZE}"

        # Check for credential patterns in response
        if isinstance(response, bytes):
            # Decode instead of scanning the b'...' repr; undecodable bytes
            # become replacement characters, which no pattern matches.
            response_str = response.decode('utf-8', errors='replace')
        else:
            response_str = str(response)

        for pattern in self._CREDENTIAL_PATTERNS:
            if pattern.search(response_str):
                logger.warning("Credential pattern detected in MCP response")
                return False, "Response contains potential credentials"

        return True, ""

    def get_timeout(self) -> int:
        """Get max execution timeout for MCP tools."""
        return MAX_EXECUTION_TIMEOUT