Coverage for core / platform / extension_sandbox.py: 92.7%

109 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Extension Sandbox — AST-based static analysis for extension safety. 

3 

4Analyzes extension Python source BEFORE import to block dangerous patterns: 

5subprocess, eval, exec, os.system, __import__, ctypes. 

6 

7Zero external dependencies — uses only stdlib ast + hashlib. 

8 

9Usage: 

10 from core.platform.extension_sandbox import ExtensionSandbox 

11 safe, violations = ExtensionSandbox.analyze_file('extensions/my_ext.py') 

12 if not safe: 

13 raise SecurityError(f"Blocked: {violations}") 

14""" 

15 

16import ast 

17import hashlib 

18import logging 

19from typing import List, Tuple 

20 

21logger = logging.getLogger('hevolve.platform.sandbox') 

22 

23# ── Blocked patterns ───────────────────────────────────────────── 

24# Built-in functions that allow arbitrary code execution 

25BLOCKED_CALLS = frozenset({ 

26 'eval', 'exec', 'compile', '__import__', 

27}) 

28 

29# Modules that provide process/memory escape hatches 

30BLOCKED_MODULES = frozenset({ 

31 'subprocess', 'ctypes', 'multiprocessing', 

32}) 

33 

34# Specific attribute chains that perform dangerous operations 

35BLOCKED_ATTRIBUTES = frozenset({ 

36 'os.system', 

37 'os.popen', 

38 'os.execl', 

39 'os.execle', 

40 'os.execv', 

41 'os.execve', 

42 'os.spawnl', 

43 'os.spawnle', 

44 'subprocess.run', 

45 'subprocess.call', 

46 'subprocess.Popen', 

47 'subprocess.check_output', 

48 'subprocess.check_call', 

49 'shutil.rmtree', 

50}) 

51 

52 

53class _BlockedNodeVisitor(ast.NodeVisitor): 

54 """AST visitor that collects violations of the sandbox policy.""" 

55 

56 def __init__(self): 

57 self.violations: List[str] = [] 

58 

59 # ── Blocked built-in calls ──────────────────────────────── 

60 def visit_Call(self, node: ast.Call): 

61 if isinstance(node.func, ast.Name): 

62 if node.func.id in BLOCKED_CALLS: 

63 self.violations.append( 

64 f"Blocked call: {node.func.id}() at line {node.lineno}") 

65 elif isinstance(node.func, ast.Attribute): 

66 dotted = _reconstruct_dotted(node.func) 

67 if dotted and dotted in BLOCKED_ATTRIBUTES: 

68 self.violations.append( 

69 f"Blocked attribute call: {dotted}() at line {node.lineno}") 

70 self.generic_visit(node) 

71 

72 # ── Blocked imports: import X ───────────────────────────── 

73 def visit_Import(self, node: ast.Import): 

74 for alias in node.names: 

75 top = alias.name.split('.')[0] 

76 if top in BLOCKED_MODULES: 

77 self.violations.append( 

78 f"Blocked import: {alias.name} at line {node.lineno}") 

79 self.generic_visit(node) 

80 

81 # ── Blocked imports: from X import Y ────────────────────── 

82 def visit_ImportFrom(self, node: ast.ImportFrom): 

83 module = node.module or '' 

84 top = module.split('.')[0] 

85 if top in BLOCKED_MODULES: 

86 self.violations.append( 

87 f"Blocked import: from {module} at line {node.lineno}") 

88 else: 

89 # Check for specific attributes like 'from os import system' 

90 for alias in (node.names or []): 

91 full = f"{module}.{alias.name}" if module else alias.name 

92 if full in BLOCKED_ATTRIBUTES: 

93 self.violations.append( 

94 f"Blocked attribute import: {full} at line {node.lineno}") 

95 self.generic_visit(node) 

96 

97 # ── Blocked attribute access ────────────────────────────── 

98 def visit_Attribute(self, node: ast.Attribute): 

99 dotted = _reconstruct_dotted(node) 

100 if dotted and dotted in BLOCKED_ATTRIBUTES: 

101 self.violations.append( 

102 f"Blocked attribute access: {dotted} at line {node.lineno}") 

103 self.generic_visit(node) 

104 

105 

106def _reconstruct_dotted(node: ast.Attribute) -> str: 

107 """Reconstruct a dotted name from an ast.Attribute chain. 

108 

109 E.g., os.system -> 'os.system', subprocess.Popen -> 'subprocess.Popen'. 

110 Returns empty string if the chain contains non-Name/non-Attribute nodes. 

111 """ 

112 parts = [node.attr] 

113 current = node.value 

114 while isinstance(current, ast.Attribute): 

115 parts.append(current.attr) 

116 current = current.value 

117 if isinstance(current, ast.Name): 

118 parts.append(current.id) 

119 return '.'.join(reversed(parts)) 

120 return '' 

121 

122 

123class ExtensionSandbox: 

124 """Static analysis sandbox for extension Python source code.""" 

125 

126 @staticmethod 

127 def analyze_source(source: str) -> Tuple[bool, List[str]]: 

128 """Analyze Python source code for blocked patterns. 

129 

130 Args: 

131 source: Python source code string. 

132 

133 Returns: 

134 (is_safe, violations) — True if no violations, else list of 

135 human-readable violation descriptions. 

136 """ 

137 if not source or not source.strip(): 

138 return (True, []) 

139 

140 try: 

141 tree = ast.parse(source) 

142 except SyntaxError as e: 

143 return (False, [f"SyntaxError: {e}"]) 

144 

145 visitor = _BlockedNodeVisitor() 

146 visitor.visit(tree) 

147 return (len(visitor.violations) == 0, visitor.violations) 

148 

149 @staticmethod 

150 def analyze_file(file_path: str) -> Tuple[bool, List[str]]: 

151 """Analyze a Python file for blocked patterns. 

152 

153 Args: 

154 file_path: Path to a .py file. 

155 

156 Returns: 

157 (is_safe, violations). 

158 """ 

159 try: 

160 with open(file_path, 'r', encoding='utf-8') as f: 

161 source = f.read() 

162 except FileNotFoundError: 

163 return (False, [f"FileNotFoundError: {file_path}"]) 

164 except (UnicodeDecodeError, OSError) as e: 

165 return (False, [f"ReadError: {e}"]) 

166 

167 return ExtensionSandbox.analyze_source(source) 

168 

169 @staticmethod 

170 def verify_signature(file_path: str, signature_hex: str) -> bool: 

171 """Verify an Ed25519 signature on a file using the master public key. 

172 

173 The signature is verified against the raw file bytes. 

174 

175 Args: 

176 file_path: Path to the file. 

177 signature_hex: Hex-encoded Ed25519 signature. 

178 

179 Returns: 

180 True if the signature is valid. 

181 """ 

182 try: 

183 with open(file_path, 'rb') as f: 

184 content = f.read() 

185 content_hash = hashlib.sha256(content).digest() 

186 from security.master_key import get_master_public_key 

187 pub = get_master_public_key() 

188 sig = bytes.fromhex(signature_hex) 

189 pub.verify(sig, content_hash) 

190 return True 

191 except Exception: 

192 return False 

193 

194 @staticmethod 

195 def compute_source_hash(source: str) -> str: 

196 """Compute SHA-256 hex digest of source code. 

197 

198 Args: 

199 source: Python source code string. 

200 

201 Returns: 

202 64-character hex digest. 

203 """ 

204 return hashlib.sha256(source.encode('utf-8')).hexdigest() 

205 

206 @staticmethod 

207 def check_permission_declarations(source: str) -> List[str]: 

208 """Extract EXTENSION_PERMISSIONS declarations from source. 

209 

210 Looks for top-level assignments like: 

211 EXTENSION_PERMISSIONS = ['events.theme.*', 'config.read'] 

212 

213 Args: 

214 source: Python source code string. 

215 

216 Returns: 

217 List of permission strings, or empty list if not found. 

218 """ 

219 if not source or not source.strip(): 

220 return [] 

221 

222 try: 

223 tree = ast.parse(source) 

224 except SyntaxError: 

225 return [] 

226 

227 for node in ast.iter_child_nodes(tree): 

228 if not isinstance(node, ast.Assign): 

229 continue 

230 for target in node.targets: 

231 if isinstance(target, ast.Name) and target.id == 'EXTENSION_PERMISSIONS': 

232 if isinstance(node.value, ast.List): 

233 perms = [] 

234 for elt in node.value.elts: 

235 if isinstance(elt, ast.Constant) and isinstance(elt.value, str): 

236 perms.append(elt.value) 

237 # Python 3.7 compat: ast.Str 

238 elif isinstance(elt, ast.Str): 

239 perms.append(elt.s) 

240 return perms 

241 return []