Coverage for core / platform / extension_sandbox.py: 92.7%
109 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Extension Sandbox — AST-based static analysis for extension safety.
4Analyzes extension Python source BEFORE import to block dangerous patterns:
5subprocess, eval, exec, os.system, __import__, ctypes.
7Zero external dependencies — uses only stdlib ast + hashlib.
9Usage:
10 from core.platform.extension_sandbox import ExtensionSandbox
11 safe, violations = ExtensionSandbox.analyze_file('extensions/my_ext.py')
12 if not safe:
13 raise SecurityError(f"Blocked: {violations}")
14"""
16import ast
17import hashlib
18import logging
19from typing import List, Tuple
21logger = logging.getLogger('hevolve.platform.sandbox')
23# ── Blocked patterns ─────────────────────────────────────────────
24# Built-in functions that allow arbitrary code execution
25BLOCKED_CALLS = frozenset({
26 'eval', 'exec', 'compile', '__import__',
27})
29# Modules that provide process/memory escape hatches
30BLOCKED_MODULES = frozenset({
31 'subprocess', 'ctypes', 'multiprocessing',
32})
34# Specific attribute chains that perform dangerous operations
35BLOCKED_ATTRIBUTES = frozenset({
36 'os.system',
37 'os.popen',
38 'os.execl',
39 'os.execle',
40 'os.execv',
41 'os.execve',
42 'os.spawnl',
43 'os.spawnle',
44 'subprocess.run',
45 'subprocess.call',
46 'subprocess.Popen',
47 'subprocess.check_output',
48 'subprocess.check_call',
49 'shutil.rmtree',
50})
53class _BlockedNodeVisitor(ast.NodeVisitor):
54 """AST visitor that collects violations of the sandbox policy."""
56 def __init__(self):
57 self.violations: List[str] = []
59 # ── Blocked built-in calls ────────────────────────────────
60 def visit_Call(self, node: ast.Call):
61 if isinstance(node.func, ast.Name):
62 if node.func.id in BLOCKED_CALLS:
63 self.violations.append(
64 f"Blocked call: {node.func.id}() at line {node.lineno}")
65 elif isinstance(node.func, ast.Attribute):
66 dotted = _reconstruct_dotted(node.func)
67 if dotted and dotted in BLOCKED_ATTRIBUTES:
68 self.violations.append(
69 f"Blocked attribute call: {dotted}() at line {node.lineno}")
70 self.generic_visit(node)
72 # ── Blocked imports: import X ─────────────────────────────
73 def visit_Import(self, node: ast.Import):
74 for alias in node.names:
75 top = alias.name.split('.')[0]
76 if top in BLOCKED_MODULES:
77 self.violations.append(
78 f"Blocked import: {alias.name} at line {node.lineno}")
79 self.generic_visit(node)
81 # ── Blocked imports: from X import Y ──────────────────────
82 def visit_ImportFrom(self, node: ast.ImportFrom):
83 module = node.module or ''
84 top = module.split('.')[0]
85 if top in BLOCKED_MODULES:
86 self.violations.append(
87 f"Blocked import: from {module} at line {node.lineno}")
88 else:
89 # Check for specific attributes like 'from os import system'
90 for alias in (node.names or []):
91 full = f"{module}.{alias.name}" if module else alias.name
92 if full in BLOCKED_ATTRIBUTES:
93 self.violations.append(
94 f"Blocked attribute import: {full} at line {node.lineno}")
95 self.generic_visit(node)
97 # ── Blocked attribute access ──────────────────────────────
98 def visit_Attribute(self, node: ast.Attribute):
99 dotted = _reconstruct_dotted(node)
100 if dotted and dotted in BLOCKED_ATTRIBUTES:
101 self.violations.append(
102 f"Blocked attribute access: {dotted} at line {node.lineno}")
103 self.generic_visit(node)
106def _reconstruct_dotted(node: ast.Attribute) -> str:
107 """Reconstruct a dotted name from an ast.Attribute chain.
109 E.g., os.system -> 'os.system', subprocess.Popen -> 'subprocess.Popen'.
110 Returns empty string if the chain contains non-Name/non-Attribute nodes.
111 """
112 parts = [node.attr]
113 current = node.value
114 while isinstance(current, ast.Attribute):
115 parts.append(current.attr)
116 current = current.value
117 if isinstance(current, ast.Name):
118 parts.append(current.id)
119 return '.'.join(reversed(parts))
120 return ''
123class ExtensionSandbox:
124 """Static analysis sandbox for extension Python source code."""
126 @staticmethod
127 def analyze_source(source: str) -> Tuple[bool, List[str]]:
128 """Analyze Python source code for blocked patterns.
130 Args:
131 source: Python source code string.
133 Returns:
134 (is_safe, violations) — True if no violations, else list of
135 human-readable violation descriptions.
136 """
137 if not source or not source.strip():
138 return (True, [])
140 try:
141 tree = ast.parse(source)
142 except SyntaxError as e:
143 return (False, [f"SyntaxError: {e}"])
145 visitor = _BlockedNodeVisitor()
146 visitor.visit(tree)
147 return (len(visitor.violations) == 0, visitor.violations)
149 @staticmethod
150 def analyze_file(file_path: str) -> Tuple[bool, List[str]]:
151 """Analyze a Python file for blocked patterns.
153 Args:
154 file_path: Path to a .py file.
156 Returns:
157 (is_safe, violations).
158 """
159 try:
160 with open(file_path, 'r', encoding='utf-8') as f:
161 source = f.read()
162 except FileNotFoundError:
163 return (False, [f"FileNotFoundError: {file_path}"])
164 except (UnicodeDecodeError, OSError) as e:
165 return (False, [f"ReadError: {e}"])
167 return ExtensionSandbox.analyze_source(source)
169 @staticmethod
170 def verify_signature(file_path: str, signature_hex: str) -> bool:
171 """Verify an Ed25519 signature on a file using the master public key.
173 The signature is verified against the raw file bytes.
175 Args:
176 file_path: Path to the file.
177 signature_hex: Hex-encoded Ed25519 signature.
179 Returns:
180 True if the signature is valid.
181 """
182 try:
183 with open(file_path, 'rb') as f:
184 content = f.read()
185 content_hash = hashlib.sha256(content).digest()
186 from security.master_key import get_master_public_key
187 pub = get_master_public_key()
188 sig = bytes.fromhex(signature_hex)
189 pub.verify(sig, content_hash)
190 return True
191 except Exception:
192 return False
194 @staticmethod
195 def compute_source_hash(source: str) -> str:
196 """Compute SHA-256 hex digest of source code.
198 Args:
199 source: Python source code string.
201 Returns:
202 64-character hex digest.
203 """
204 return hashlib.sha256(source.encode('utf-8')).hexdigest()
206 @staticmethod
207 def check_permission_declarations(source: str) -> List[str]:
208 """Extract EXTENSION_PERMISSIONS declarations from source.
210 Looks for top-level assignments like:
211 EXTENSION_PERMISSIONS = ['events.theme.*', 'config.read']
213 Args:
214 source: Python source code string.
216 Returns:
217 List of permission strings, or empty list if not found.
218 """
219 if not source or not source.strip():
220 return []
222 try:
223 tree = ast.parse(source)
224 except SyntaxError:
225 return []
227 for node in ast.iter_child_nodes(tree):
228 if not isinstance(node, ast.Assign):
229 continue
230 for target in node.targets:
231 if isinstance(target, ast.Name) and target.id == 'EXTENSION_PERMISSIONS':
232 if isinstance(node.value, ast.List):
233 perms = []
234 for elt in node.value.elts:
235 if isinstance(elt, ast.Constant) and isinstance(elt.value, str):
236 perms.append(elt.value)
237 # Python 3.7 compat: ast.Str
238 elif isinstance(elt, ast.Str):
239 perms.append(elt.s)
240 return perms
241 return []