Coverage for security / safe_deserialize.py: 70.7%
58 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Safe Deserialization - Pickle Replacement
3Replaces pickle.loads() for numpy frame data with a safe binary format.
4Defends against CVE-style RCE via deserialization (OpenClaw attack vector).
6Format: [4-byte header length][JSON header][raw numpy bytes]
7Header: {"shape": [h, w, c], "dtype": "uint8"}
8"""
10import io
11import json
12import struct
13import pickle
14import logging
15from typing import Optional
# Dedicated security logger so deserialization warnings/errors can be routed
# separately from general application logs.
logger = logging.getLogger('hevolve_security')

# Sentinel bytes to identify the safe format
_MAGIC = b'HVSF' # HevolVe Safe Frame
# numpy is treated as optional at import time: the (de)serialization helpers
# check HAS_NUMPY and raise at call time instead, so importing this module
# never fails just because numpy is absent.
try:
    import numpy as np
    HAS_NUMPY = True
except ImportError:
    HAS_NUMPY = False
class RestrictedUnpickler(pickle.Unpickler):
    """
    Restricted unpickler that only resolves an explicit allowlist of
    numpy globals. Used as a fallback for legacy pickle data during
    migration to the safe binary format.

    SECURITY: find_class() is the single choke point pickle uses to look
    up globals; any module/name pair not on the allowlist raises
    UnpicklingError, which blocks the classic pickle RCE gadgets
    (os.system, subprocess.Popen, builtins.eval, ...).
    """

    # module path -> set of allowed names; '*' permits every name in that
    # module. Both numpy 1.x ('numpy.core.*') and numpy 2.x
    # ('numpy._core.*') paths are listed so legacy pickles produced under
    # either major version still deserialize.
    ALLOWED_MODULES = {
        'numpy': {'ndarray', 'dtype', 'core'},
        'numpy.core.multiarray': {'_reconstruct', 'scalar'},
        'numpy.core.numeric': {'*'},
        'numpy._core.multiarray': {'_reconstruct', 'scalar'},
        'numpy._core.numeric': {'*'},
        'numpy.ma.core': {'MaskedArray'},
    }

    def find_class(self, module, name):
        """Resolve a global during unpickling, enforcing the allowlist."""
        allowed = self.ALLOWED_MODULES.get(module)
        # BUGFIX: the original treated an *unlisted* numpy submodule
        # (allowed is None) as permitted, which bypassed the allowlist for
        # every numpy.* module not explicitly named - exactly the gadget
        # surface this class exists to close. Unknown modules are now
        # denied; only names explicitly listed (or '*') are resolved.
        if allowed is not None and (name in allowed or '*' in allowed):
            return super().find_class(module, name)
        raise pickle.UnpicklingError(
            f"Blocked unpickling of {module}.{name} - "
            f"only numpy types are allowed"
        )
def safe_dump_frame(frame) -> bytes:
    """
    Serialize a numpy array without pickle.

    Output layout: _MAGIC + 4-byte little-endian header length +
    UTF-8 JSON header ({"shape": [...], "dtype": "..."}) + raw array bytes.

    Raises:
        RuntimeError: if numpy is not installed.
    """
    if not HAS_NUMPY:
        raise RuntimeError("numpy required for frame serialization")

    meta = {
        'shape': list(frame.shape),
        'dtype': str(frame.dtype),
    }
    header_bytes = json.dumps(meta).encode('utf-8')
    length_prefix = struct.pack('<I', len(header_bytes))
    return b''.join([_MAGIC, length_prefix, header_bytes, frame.tobytes()])
def safe_load_frame(data: bytes):
    """
    Deserialize a numpy array safely.

    The safe binary format (identified by the _MAGIC prefix) is tried
    first; anything else is assumed to be legacy pickle data and routed
    through the restricted unpickler.

    Returns the numpy array, or None when the legacy path fails.

    Raises:
        RuntimeError: if numpy is not installed.
    """
    if not HAS_NUMPY:
        raise RuntimeError("numpy required for frame deserialization")

    if data[:4] != _MAGIC:
        # Not our format - treat as legacy pickle and decode with the
        # numpy-only restricted unpickler.
        logger.warning("Legacy pickle data detected - using restricted unpickler")
        return _load_restricted_pickle(data)

    return _load_safe_format(data)
89def _load_safe_format(data: bytes):
90 """Load from the safe binary format."""
91 header_size = struct.unpack('<I', data[4:8])[0]
92 header_json = data[8:8 + header_size]
93 header = json.loads(header_json.decode('utf-8'))
95 raw_bytes = data[8 + header_size:]
96 return np.frombuffer(
97 raw_bytes, dtype=np.dtype(header['dtype'])
98 ).reshape(header['shape']).copy()
def _load_restricted_pickle(data: bytes):
    """
    Load legacy pickle data with the restricted unpickler.

    Only allowlisted numpy types can be resolved, blocking arbitrary code
    execution. Best-effort by design: returns None (and logs the error) on
    any failure so callers can treat undecodable legacy blobs as missing.
    """
    try:
        return RestrictedUnpickler(io.BytesIO(data)).load()
    except Exception as e:
        # BUGFIX: originally `except (pickle.UnpicklingError, Exception)` -
        # the tuple was redundant since Exception already subsumes
        # UnpicklingError. Broad catch is intentional here (best-effort).
        logger.error(f"Restricted unpickle failed: {e}")
        return None
def migrate_redis_frame(redis_client, key: str) -> bool:
    """
    Migrate a single Redis key from legacy pickle to the safe format.

    Returns True only when the key existed, held legacy pickle data that
    decoded successfully, and was rewritten in place; False otherwise.
    """
    payload = redis_client.get(key)
    if payload is None:
        return False

    if payload[:4] == _MAGIC:
        # Nothing to do - already stored in the safe format.
        return False

    frame = _load_restricted_pickle(payload)
    if frame is None:
        # Undecodable legacy blob; leave the key untouched.
        return False

    redis_client.set(key, safe_dump_frame(frame))
    logger.info(f"Migrated Redis key {key} from pickle to safe format")
    return True