Coverage for security / mcp_sandbox.py: 25.0%

68 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2MCP Tool Sandboxing 

3Defends against CVE-2025-6514 (command injection via MCP) and 

4supply chain attacks through malicious MCP tools/skills. 

5""" 

6 

7import os 

8import re 

9import logging 

10from typing import Dict, Any, List, Optional, Set 

11from urllib.parse import urlparse 

12 

13logger = logging.getLogger('hevolve_security') 

14 

15# Shell metacharacters that could enable command injection 

16_SHELL_METACHAR_PATTERN = re.compile(r'[;&|`${}()\n\r]') 

17 

18# Path traversal patterns 

19_PATH_TRAVERSAL_PATTERN = re.compile(r'\.\.[\\/]') 

20 

21# Dangerous command patterns 

22_DANGEROUS_CMD_PATTERNS = [ 

23 re.compile(r'\b(rm|del|format|mkfs|dd|chmod|chown)\s', re.I), 

24 re.compile(r'\beval\s*\(', re.I), 

25 re.compile(r'\bexec\s*\(', re.I), 

26 re.compile(r'\b(curl|wget|nc|ncat)\s.*[-]', re.I), 

27 re.compile(r'\bos\.(system|popen|exec)', re.I), 

28 re.compile(r'\bsubprocess\.(call|run|Popen)', re.I), 

29 re.compile(r'\b__import__\s*\(', re.I), 

30] 

31 

32# Max response size from MCP tools (1MB) 

33MAX_RESPONSE_SIZE = 1 * 1024 * 1024 

34 

35# Max execution timeout (seconds) 

36MAX_EXECUTION_TIMEOUT = 60 

37 

38 

39class MCPSandbox: 

40 """ 

41 Sandbox for MCP tool execution. 

42 Validates server URLs, tool names, and arguments before execution. 

43 """ 

44 

45 def __init__(self, allowed_servers: Optional[List[str]] = None, 

46 allowed_tools: Optional[Set[str]] = None): 

47 self.allowed_servers: Set[str] = set(allowed_servers or []) 

48 self.allowed_tools: Set[str] = allowed_tools or set() 

49 

50 # Load from environment 

51 env_servers = os.environ.get('MCP_ALLOWED_SERVERS', '') 

52 if env_servers: 

53 self.allowed_servers.update( 

54 s.strip() for s in env_servers.split(',') if s.strip() 

55 ) 

56 

57 # Always allow localhost 

58 self.allowed_servers.update(['localhost', '127.0.0.1']) 

59 

60 def validate_server_url(self, url: str) -> bool: 

61 """ 

62 Validate MCP server URL against allowlist. 

63 Returns True if allowed, False if blocked. 

64 """ 

65 try: 

66 parsed = urlparse(url) 

67 hostname = parsed.hostname or '' 

68 

69 if not self.allowed_servers: 

70 # If no allowlist configured, only allow localhost 

71 return hostname in ('localhost', '127.0.0.1') 

72 

73 if hostname in self.allowed_servers: 

74 return True 

75 

76 logger.warning(f"MCP server blocked: {hostname} not in allowlist") 

77 return False 

78 

79 except Exception as e: 

80 logger.error(f"Failed to parse MCP server URL: {e}") 

81 return False 

82 

83 def validate_tool_call(self, tool_name: str, arguments: Dict[str, Any]) -> tuple: 

84 """ 

85 Validate tool name and arguments before execution. 

86 Returns (is_safe, reason). 

87 """ 

88 # Check tool allowlist 

89 if self.allowed_tools and tool_name not in self.allowed_tools: 

90 return False, f"Tool '{tool_name}' not in allowlist" 

91 

92 # Check arguments for injection patterns 

93 for key, value in arguments.items(): 

94 if not isinstance(value, str): 

95 continue 

96 

97 # Shell metacharacters 

98 if _SHELL_METACHAR_PATTERN.search(value): 

99 logger.warning( 

100 f"Shell metacharacter blocked in MCP tool arg '{key}': " 

101 f"{value[:50]}..." 

102 ) 

103 return False, f"Argument '{key}' contains shell metacharacters" 

104 

105 # Path traversal 

106 if _PATH_TRAVERSAL_PATTERN.search(value): 

107 logger.warning(f"Path traversal blocked in MCP tool arg '{key}'") 

108 return False, f"Argument '{key}' contains path traversal" 

109 

110 # Dangerous commands 

111 for pattern in _DANGEROUS_CMD_PATTERNS: 

112 if pattern.search(value): 

113 logger.warning( 

114 f"Dangerous command blocked in MCP tool arg '{key}': " 

115 f"{value[:50]}..." 

116 ) 

117 return False, f"Argument '{key}' contains dangerous command pattern" 

118 

119 # DLP: check for PII in outbound tool arguments 

120 try: 

121 from security.dlp_engine import get_dlp_engine 

122 for key, value in arguments.items(): 

123 if isinstance(value, str): 

124 allowed, reason = get_dlp_engine().check_outbound(value) 

125 if not allowed: 

126 logger.warning(f"DLP blocked MCP tool arg '{key}': {reason}") 

127 return False, f"DLP: {reason} in argument '{key}'" 

128 except ImportError: 

129 pass # DLP engine not available — allow through 

130 

131 return True, "" 

132 

133 def validate_response(self, response: Any) -> tuple: 

134 """ 

135 Validate MCP tool response for data exfiltration patterns. 

136 Returns (is_safe, reason). 

137 """ 

138 if isinstance(response, (str, bytes)): 

139 size = len(response) 

140 if size > MAX_RESPONSE_SIZE: 

141 return False, f"Response size {size} exceeds limit {MAX_RESPONSE_SIZE}" 

142 

143 # Check for credential patterns in response 

144 response_str = str(response) 

145 credential_patterns = [ 

146 re.compile(r'sk-[a-zA-Z0-9]{20,}'), 

147 re.compile(r'eyJ[a-zA-Z0-9_-]+\.eyJ'), 

148 re.compile(r'AIzaSy[a-zA-Z0-9_-]{33}'), 

149 re.compile(r'AKIA[0-9A-Z]{16}'), 

150 ] 

151 for pattern in credential_patterns: 

152 if pattern.search(response_str): 

153 logger.warning("Credential pattern detected in MCP response") 

154 return False, "Response contains potential credentials" 

155 

156 return True, "" 

157 

158 def get_timeout(self) -> int: 

159 """Get max execution timeout for MCP tools.""" 

160 return MAX_EXECUTION_TIMEOUT