Coverage for security / release_hash_registry.py: 95.0%

60 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

Release Hash Registry — Multi-version code hash allowlist.

Maintains a set of known-good code hashes from GA releases so the
perimeter can accept peers running any valid version (not just the
current one). Critical during rolling upgrades where the network
has a mix of old and new nodes.

Populated by:
    1. _KNOWN_HASHES dict — hardcoded by CI/CD at release time
       (scripts/update_release_hashes.py writes this dict)
    2. Current release manifest — always trusted
    3. Runtime discovery — hashes from verified peers (bounded, thread-safe)

Usage at perimeter:
    from security.release_hash_registry import ReleaseHashRegistry
    registry = ReleaseHashRegistry()
    if not registry.is_known_release_hash(peer_code_hash):
        reject_peer()

20""" 

21import logging 

22import os 

23import threading 

24from collections import OrderedDict 

25from typing import Dict, Optional 

26 

logger = logging.getLogger('hevolve_security')

# ── GA release hashes, populated by CI/CD ────────────────────────
# Mapping of 'version_string' -> 'sha256_code_hash'.
# scripts/update_release_hashes.py rewrites this dict automatically
# before each release signing. Do NOT edit manually.
_KNOWN_HASHES: Dict[str, str] = {
    # CI/CD will append entries here, e.g.:
    #   '1.0.0': 'abc123...',
    #   '1.1.0': 'def456...',
}

# Upper bound on runtime-discovered hashes kept in memory
# (prevents unbounded growth).
_MAX_RUNTIME_HASHES = 50

41 

42 

class ReleaseHashRegistry:
    """Thread-safe registry of known-good code hashes.

    A code hash is accepted when it comes from any of these sources:
      - Hardcoded GA release hashes (``_KNOWN_HASHES``)
      - The current release manifest's ``code_hash``
      - Runtime-discovered hashes from verified peers (bounded, FIFO)
    """

    def __init__(self):
        # Guards every read and write of _runtime_hashes.
        self._lock = threading.Lock()
        # Runtime hashes: version -> code_hash, bounded with FIFO eviction.
        self._runtime_hashes: OrderedDict = OrderedDict()
        # Stays None when no verified manifest could be loaded.
        self._manifest_hash: Optional[str] = None
        self._load_from_manifest()

    def _load_from_manifest(self) -> None:
        """Load the current release manifest's code_hash as always-trusted.

        Best-effort: any failure (import error, unreadable manifest,
        verification raising) leaves ``_manifest_hash`` unset and never
        propagates to the caller. The failure is logged rather than
        silently dropped, because without the manifest hash peers running
        the current release are only accepted if their hash is also in
        ``_KNOWN_HASHES`` or was added at runtime.
        """
        try:
            from security.master_key import (
                load_release_manifest,
                verify_release_manifest,
            )
            manifest = load_release_manifest()
            if manifest and verify_release_manifest(manifest):
                self._manifest_hash = manifest.get('code_hash', '')
        except Exception:
            logging.getLogger('hevolve_security').warning(
                'Could not load release manifest; the current release '
                'code hash will not be auto-trusted',
                exc_info=True,
            )

    def is_known_release_hash(self, code_hash: str) -> bool:
        """Check if a code hash belongs to any known GA release.

        Args:
            code_hash: Hash reported by a peer. Empty/None is never known.

        Returns:
            True if the hash matches, in this order of checks:
              1. Any hardcoded GA release hash
              2. The current release manifest's hash
              3. Any runtime-discovered hash from a verified peer
            False otherwise.
        """
        if not code_hash:
            return False

        # 1. Hardcoded GA releases
        if code_hash in _KNOWN_HASHES.values():
            return True

        # 2. Current manifest
        if self._manifest_hash and code_hash == self._manifest_hash:
            return True

        # 3. Runtime-discovered (shared mutable state, so take the lock)
        with self._lock:
            return code_hash in self._runtime_hashes.values()

    def get_known_versions(self) -> Dict[str, str]:
        """Return all known version→hash mappings (for diagnostics).

        The result is a fresh dict. The manifest hash, when present, is
        reported under the key ``'_current_manifest'``. Runtime entries
        are merged last, so a runtime version string equal to an existing
        key replaces that key's value in the returned dict.
        """
        result = dict(_KNOWN_HASHES)
        if self._manifest_hash:
            result['_current_manifest'] = self._manifest_hash
        with self._lock:
            result.update(self._runtime_hashes)
        return result

    def add_runtime_hash(self, version: str, code_hash: str) -> None:
        """Add a hash discovered from a verified peer at runtime.

        Thread-safe. Bounded to _MAX_RUNTIME_HASHES entries (FIFO eviction).
        Only call this for hashes from peers that passed full verification
        (signature + master_key_verified).

        Empty ``version`` or ``code_hash`` is ignored. Re-adding an
        existing version replaces its hash but keeps its original place
        in the eviction order.
        """
        if not version or not code_hash:
            return
        with self._lock:
            self._runtime_hashes[version] = code_hash
            # FIFO eviction: drop oldest insertions until back within bound.
            while len(self._runtime_hashes) > _MAX_RUNTIME_HASHES:
                self._runtime_hashes.popitem(last=False)

    def hash_count(self) -> int:
        """Total number of known hashes (for diagnostics).

        This is the sum of entries per source, so a hash that appears in
        more than one source is counted once for each.
        """
        count = len(_KNOWN_HASHES)
        if self._manifest_hash:
            count += 1
        with self._lock:
            count += len(self._runtime_hashes)
        return count

129 

130 

# ── Module-level singleton ────────────────────────────────────────
_registry = None  # built lazily by get_release_hash_registry()
_registry_lock = threading.Lock()  # held while the instance is created


def get_release_hash_registry() -> ReleaseHashRegistry:
    """Return the shared ReleaseHashRegistry, building it on first use."""
    global _registry
    # Fast path: instance already exists, no lock needed.
    if _registry is not None:
        return _registry
    with _registry_lock:
        # Check again under the lock so only one caller constructs it.
        if _registry is None:
            _registry = ReleaseHashRegistry()
        return _registry