Coverage for security / safe_deserialize.py: 70.7%

58 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Safe Deserialization - Pickle Replacement 

3Replaces pickle.loads() for numpy frame data with a safe binary format. 

4Defends against CVE-style RCE via deserialization (OpenClaw attack vector). 

5 

6Format: [4-byte header length][JSON header][raw numpy bytes] 

7Header: {"shape": [h, w, c], "dtype": "uint8"} 

8""" 

9 

10import io 

11import json 

12import struct 

13import pickle 

14import logging 

15from typing import Optional 

16 

17logger = logging.getLogger('hevolve_security') 

18 

19# Sentinel bytes to identify the safe format 

20_MAGIC = b'HVSF' # HevolVe Safe Frame 

21 

22try: 

23 import numpy as np 

24 HAS_NUMPY = True 

25except ImportError: 

26 HAS_NUMPY = False 

27 

28 

29class RestrictedUnpickler(pickle.Unpickler): 

30 """ 

31 Restricted unpickler that only allows numpy types. 

32 Used as a fallback for legacy pickle data during migration. 

33 """ 

34 

35 ALLOWED_MODULES = { 

36 'numpy': {'ndarray', 'dtype', 'core'}, 

37 'numpy.core.multiarray': {'_reconstruct', 'scalar'}, 

38 'numpy.core.numeric': {'*'}, 

39 'numpy.ma.core': {'MaskedArray'}, 

40 } 

41 

42 def find_class(self, module, name): 

43 module_base = module.split('.')[0] 

44 if module_base == 'numpy': 

45 allowed = self.ALLOWED_MODULES.get(module) 

46 if allowed is None or name in allowed or '*' in allowed: 

47 return super().find_class(module, name) 

48 raise pickle.UnpicklingError( 

49 f"Blocked unpickling of {module}.{name} - " 

50 f"only numpy types are allowed" 

51 ) 

52 

53 

54def safe_dump_frame(frame) -> bytes: 

55 """ 

56 Serialize a numpy array without pickle. 

57 Returns: magic + header_size + json_header + raw_bytes 

58 """ 

59 if not HAS_NUMPY: 

60 raise RuntimeError("numpy required for frame serialization") 

61 

62 header = json.dumps({ 

63 'shape': list(frame.shape), 

64 'dtype': str(frame.dtype), 

65 }).encode('utf-8') 

66 

67 header_size = struct.pack('<I', len(header)) 

68 return _MAGIC + header_size + header + frame.tobytes() 

69 

70 

71def safe_load_frame(data: bytes): 

72 """ 

73 Deserialize a numpy array safely. 

74 Tries safe format first, falls back to RestrictedUnpickler for legacy data. 

75 Returns the numpy array, or None on failure. 

76 """ 

77 if not HAS_NUMPY: 

78 raise RuntimeError("numpy required for frame deserialization") 

79 

80 # Try safe format first 

81 if data[:4] == _MAGIC: 

82 return _load_safe_format(data) 

83 

84 # Fall back to restricted unpickler for legacy pickle data 

85 logger.warning("Legacy pickle data detected - using restricted unpickler") 

86 return _load_restricted_pickle(data) 

87 

88 

89def _load_safe_format(data: bytes): 

90 """Load from the safe binary format.""" 

91 header_size = struct.unpack('<I', data[4:8])[0] 

92 header_json = data[8:8 + header_size] 

93 header = json.loads(header_json.decode('utf-8')) 

94 

95 raw_bytes = data[8 + header_size:] 

96 return np.frombuffer( 

97 raw_bytes, dtype=np.dtype(header['dtype']) 

98 ).reshape(header['shape']).copy() 

99 

100 

101def _load_restricted_pickle(data: bytes): 

102 """ 

103 Load legacy pickle data with restricted unpickler. 

104 Only allows numpy types - blocks arbitrary code execution. 

105 """ 

106 try: 

107 return RestrictedUnpickler(io.BytesIO(data)).load() 

108 except (pickle.UnpicklingError, Exception) as e: 

109 logger.error(f"Restricted unpickle failed: {e}") 

110 return None 

111 

112 

113def migrate_redis_frame(redis_client, key: str) -> bool: 

114 """ 

115 Migrate a single Redis key from pickle to safe format. 

116 Returns True if migration occurred. 

117 """ 

118 data = redis_client.get(key) 

119 if data is None: 

120 return False 

121 

122 if data[:4] == _MAGIC: 

123 return False # Already in safe format 

124 

125 frame = _load_restricted_pickle(data) 

126 if frame is None: 

127 return False 

128 

129 safe_data = safe_dump_frame(frame) 

130 redis_client.set(key, safe_data) 

131 logger.info(f"Migrated Redis key {key} from pickle to safe format") 

132 return True