Coverage for security / release_hash_registry.py: 95.0%
60 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
Release Hash Registry — Multi-version code hash allowlist.

Maintains a set of known-good code hashes from GA releases so the
perimeter can accept peers running any valid version (not just the
current one). Critical during rolling upgrades where the network
has a mix of old and new nodes.

Populated by:
  1. _KNOWN_HASHES dict — hardcoded by CI/CD at release time
     (scripts/update_release_hashes.py writes this dict)
  2. Current release manifest — trusted once its verification passes
  3. Runtime discovery — hashes from verified peers (bounded, thread-safe)

Usage at perimeter:
    from security.release_hash_registry import ReleaseHashRegistry
    registry = ReleaseHashRegistry()
    if not registry.is_known_release_hash(peer_code_hash):
        reject_peer()
20"""
21import logging
22import os
23import threading
24from collections import OrderedDict
25from typing import Dict, Optional
logger = logging.getLogger('hevolve_security')

# ── CI/CD-populated GA release hashes ────────────────────────────
# Format: {'version_string': 'sha256_code_hash'}
# Updated automatically by scripts/update_release_hashes.py before
# each release signing. Do NOT edit manually.
_KNOWN_HASHES: Dict[str, str] = {
    # CI/CD will append entries here, e.g.:
    #   '1.0.0': 'abc123...',
    #   '1.1.0': 'def456...',
}

# Maximum runtime-discovered hashes to keep (prevents unbounded growth)
_MAX_RUNTIME_HASHES = 50


class ReleaseHashRegistry:
    """Thread-safe registry of known-good code hashes.

    Combines:
      - Hardcoded GA release hashes (_KNOWN_HASHES)
      - Current release manifest's code_hash
      - Runtime-discovered hashes from verified peers

    Runtime-discovered hashes are the only mutable state; every access to
    them goes through ``self._lock``.
    """

    def __init__(self):
        """Create an empty registry and try to load the manifest hash."""
        self._lock = threading.Lock()
        # Runtime hashes: bounded OrderedDict (FIFO eviction), version -> hash
        self._runtime_hashes: OrderedDict = OrderedDict()
        # Stays None unless a manifest is loaded and passes verification.
        self._manifest_hash: Optional[str] = None
        self._load_from_manifest()

    def _load_from_manifest(self) -> None:
        """Load the current release manifest's code_hash as always-trusted.

        Best-effort: any failure (import error, unreadable manifest, error
        raised during verification) leaves ``_manifest_hash`` unchanged.
        The failure is logged rather than silently discarded so operators
        can see why the current release is not being auto-trusted.
        """
        try:
            # Imported lazily so that a failure to import the module is
            # handled like any other manifest failure.
            from security.master_key import (
                load_release_manifest, verify_release_manifest,
            )
            manifest = load_release_manifest()
            # Only a manifest that passes verification contributes a hash.
            if manifest and verify_release_manifest(manifest):
                self._manifest_hash = manifest.get('code_hash', '')
        except Exception as exc:
            # Deliberately non-fatal: the registry still works with the
            # hardcoded and runtime-discovered hashes.
            logger.warning(
                'Release manifest hash unavailable (%s: %s); only hardcoded '
                'and runtime-discovered hashes will be trusted',
                type(exc).__name__, exc,
            )

    def is_known_release_hash(self, code_hash: str) -> bool:
        """Check if a code hash belongs to any known GA release.

        Returns True if the hash matches:
          1. Any hardcoded GA release hash
          2. The current release manifest's hash
          3. Any runtime-discovered hash from a verified peer

        An empty or None ``code_hash`` is never considered known.
        """
        if not code_hash:
            return False

        # 1. Hardcoded GA releases
        if code_hash in _KNOWN_HASHES.values():
            return True

        # 2. Current manifest (skipped when no manifest hash was loaded)
        if self._manifest_hash and code_hash == self._manifest_hash:
            return True

        # 3. Runtime-discovered (mutable, so read under the lock)
        with self._lock:
            return code_hash in self._runtime_hashes.values()

    def get_known_versions(self) -> Dict[str, str]:
        """Return all known version→hash mappings (for diagnostics).

        The result is a fresh dict. The manifest hash, when present, is
        reported under the key ``'_current_manifest'``. Runtime entries are
        merged last, so a runtime entry whose version key equals an earlier
        key replaces that entry in the returned dict.
        """
        result = dict(_KNOWN_HASHES)
        if self._manifest_hash:
            result['_current_manifest'] = self._manifest_hash
        with self._lock:
            result.update(self._runtime_hashes)
        return result

    def add_runtime_hash(self, version: str, code_hash: str) -> None:
        """Add a hash discovered from a verified peer at runtime.

        Thread-safe. Bounded to _MAX_RUNTIME_HASHES entries (FIFO eviction).
        Only call this for hashes from peers that passed full verification
        (signature + master_key_verified).

        Calls with an empty ``version`` or ``code_hash`` are ignored.
        Re-adding an existing version replaces its hash but keeps its
        original position in the eviction order.
        """
        if not version or not code_hash:
            return
        with self._lock:
            self._runtime_hashes[version] = code_hash
            # FIFO eviction if over limit: drop oldest-inserted entries first
            while len(self._runtime_hashes) > _MAX_RUNTIME_HASHES:
                self._runtime_hashes.popitem(last=False)

    def hash_count(self) -> int:
        """Total number of known hashes (for diagnostics).

        Counts entries per source (hardcoded, manifest, runtime); the same
        hash value present in more than one source is counted each time.
        """
        count = len(_KNOWN_HASHES)
        if self._manifest_hash:
            count += 1
        with self._lock:
            count += len(self._runtime_hashes)
        return count
# ── Module-level singleton ────────────────────────────────────────
# Shared instance (created on first request) and the lock guarding creation.
_registry = None
_registry_lock = threading.Lock()


def get_release_hash_registry() -> ReleaseHashRegistry:
    """Return the shared ReleaseHashRegistry, building it on first use.

    The instance is stored in the module-level ``_registry``. Creation
    happens under ``_registry_lock`` and the ``None`` check is repeated
    inside the lock, so concurrent first callers end up with one instance.
    """
    global _registry
    existing = _registry
    if existing is not None:
        # Already created: hand it back without taking the lock.
        return existing
    with _registry_lock:
        # Another caller may have created it while we waited for the lock.
        if _registry is None:
            _registry = ReleaseHashRegistry()
        return _registry