Coverage for security / hsm_trust.py: 0.0%
159 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HSM Trust Network - Certificate pinning and path protection for HSM connections.
4Ensures the entire path from application to HSM is protected:
5 1. TLS certificate pinning to HSM endpoints
6 2. mTLS client certificates for HSM authentication
7 3. Request signing for HSM API calls
8 4. Audit logging of all trust path events
9 5. Health monitoring of HSM connectivity
11Trust chain:
12 Application (this code)
13 ↓ mTLS / IAM
14 HSM Endpoint (pinned certificate)
15 ↓ hardware boundary
16 HSM Hardware (FIPS 140-2 Level 3)
17 ↓ internal
18 Private Key (NEVER extracted)
20┌─────────────────────────────────────────────────────────────────┐
21│ AI EXCLUSION ZONE │
22│ AI tools MUST NOT modify trust anchors or certificate pins. │
23└─────────────────────────────────────────────────────────────────┘
24"""
25import os
26import ssl
27import json
28import hashlib
29import logging
30import threading
31import time
32from typing import Optional, Dict, List
33from datetime import datetime
35logger = logging.getLogger('hevolve_security')
37# ═══════════════════════════════════════════════════════════════
38# Certificate Pin Store
39# ═══════════════════════════════════════════════════════════════
# SHA-256 pins of trusted HSM endpoint certificates, keyed by endpoint hostname.
# These are loaded from config or hardcoded for known cloud HSM services.
# NOTE(review): every list below is currently empty, and in the code visible
# in this file HSMTrustManager._load_pins() does not read this table - confirm
# whether it is consumed elsewhere before relying on it.
# Pin values are trust anchors: change them only through the human-reviewed
# process described in the module docstring.
_KNOWN_HSM_PINS: Dict[str, List[str]] = {
    # Google Cloud KMS regional endpoints
    'cloudkms.googleapis.com': [
        # Google Trust Services root CA pins (GTS Root R1-R4)
        # These are public, well-known pins for Google's PKI
    ],
    # Azure Key Vault
    'vault.azure.net': [],
    # HashiCorp Vault - user-configured, loaded from HART_VAULT_CA_CERT
}
class HSMTrustManager:
    """Manages TLS trust material for HSM connections.

    Responsibilities:
      - Hold SHA-256 SPKI pins per hostname (from HART_HSM_PIN_FILE and,
        for Vault, derived from HART_VAULT_CA_CERT / HART_VAULT_ADDR)
      - Build SSL contexts (optional custom CA, optional mTLS client cert,
        TLS >= 1.2)
      - Probe endpoints and report whether the presented certificate
        matches a configured pin
      - Keep a bounded in-memory history of probe results

    Pin matching is computed and *reported* by verify_connection(); the
    context returned by create_ssl_context() performs normal CA and hostname
    verification only and does not itself reject a pin mismatch.
    """

    def __init__(self):
        # hostname -> list of lowercase hex SHA-256 SPKI pins
        self._pins: Dict[str, List[str]] = {}
        # Guards _health_history (written by verify_connection)
        self._lock = threading.Lock()
        self._health_history: list = []
        self._load_pins()

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as the deprecated datetime.utcnow().isoformat()
        (no UTC offset suffix), so consumers of the timestamp are unaffected.
        """
        from datetime import timezone
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _normalize_pin_map(raw) -> Dict[str, List[str]]:
        """Validate and canonicalise a pin mapping loaded from JSON.

        Accepts ``{hostname: [pin, ...]}`` and, for convenience,
        ``{hostname: "pin"}``. Pins are stripped and lower-cased so they
        compare equal to ``hashlib.sha256(...).hexdigest()``; blank pins
        are dropped.

        Raises:
            ValueError: if the structure is not a mapping of hostname to a
                string or list of strings.
        """
        if not isinstance(raw, dict):
            raise ValueError(
                'HSM pin file must contain a JSON object of hostname -> pins')

        normalized: Dict[str, List[str]] = {}
        for host, pins in raw.items():
            # A bare string would otherwise turn `pin in known` into a
            # substring test, so wrap it into a one-element list.
            if isinstance(pins, str):
                pins = [pins]
            if not isinstance(pins, (list, tuple)):
                raise ValueError(
                    f'Pins for {host!r} must be a string or a list of strings')

            cleaned: List[str] = []
            for entry in pins:
                if not isinstance(entry, str):
                    raise ValueError(
                        f'Pin entries for {host!r} must be strings')
                value = entry.strip().lower()
                if value:
                    cleaned.append(value)
            normalized[host] = cleaned
        return normalized

    def _load_pins(self):
        """Load certificate pins from config file or environment."""
        # User-configured pins
        pin_file = os.environ.get('HART_HSM_PIN_FILE', '')
        if pin_file:
            if os.path.exists(pin_file):
                try:
                    with open(pin_file, 'r', encoding='utf-8') as f:
                        user_pins = self._normalize_pin_map(json.load(f))
                    self._pins.update(user_pins)
                    logger.info(f"Loaded HSM certificate pins from {pin_file}")
                except Exception as e:
                    logger.warning(f"Failed to load HSM pin file: {e}")
            else:
                # A configured-but-missing pin file silently disables pinning;
                # make that visible to operators.
                logger.warning(f"HSM pin file not found: {pin_file}")

        # Vault CA cert -> derive pin
        # NOTE(review): this pins the CA certificate's key, while
        # verify_connection() compares against the server (leaf) certificate
        # returned by getpeercert(); the two only match when the CA file is
        # the server's own certificate - confirm intended semantics.
        vault_ca = os.environ.get('HART_VAULT_CA_CERT', '')
        if vault_ca and os.path.exists(vault_ca):
            try:
                pin = self._compute_cert_pin(vault_ca)
                vault_addr = os.environ.get('HART_VAULT_ADDR', '')
                # _compute_cert_pin returns '' (and logs) on failure; never
                # register an empty string as a pin.
                if pin and vault_addr:
                    from urllib.parse import urlparse
                    host = urlparse(vault_addr).hostname
                    if host:
                        host_pins = self._pins.setdefault(host, [])
                        if pin not in host_pins:
                            host_pins.append(pin)
                        logger.info(f"Pinned Vault CA cert for {host}: {pin[:16]}...")
            except Exception as e:
                logger.warning(f"Vault CA cert pin failed: {e}")

    @staticmethod
    def _spki_pin(cert) -> str:
        """Return the hex SHA-256 of a certificate's DER-encoded SPKI."""
        from cryptography.hazmat.primitives.serialization import (
            Encoding, PublicFormat,
        )
        spki_bytes = cert.public_key().public_bytes(
            Encoding.DER, PublicFormat.SubjectPublicKeyInfo)
        return hashlib.sha256(spki_bytes).hexdigest()

    @staticmethod
    def _compute_cert_pin(cert_path: str) -> str:
        """Compute SHA-256 pin of a certificate's Subject Public Key Info (SPKI).

        Reads a PEM or DER certificate from ``cert_path``. Returns the pin as
        lowercase hex, or '' if the file cannot be read or parsed (the failure
        is logged as a warning).
        """
        try:
            from cryptography import x509
            with open(cert_path, 'rb') as f:
                cert_data = f.read()
            if b'BEGIN CERTIFICATE' in cert_data:
                cert = x509.load_pem_x509_certificate(cert_data)
            else:
                cert = x509.load_der_x509_certificate(cert_data)
            return HSMTrustManager._spki_pin(cert)
        except Exception as e:
            logger.warning(f"Certificate pin computation failed: {e}")
            return ''

    def create_ssl_context(self, hostname: str) -> ssl.SSLContext:
        """Create an SSL context for connecting to an HSM endpoint.

        The context starts from ssl.create_default_context(), optionally
        trusts the CA in HART_HSM_CA_CERT, optionally presents the mTLS client
        certificate from HART_HSM_CLIENT_CERT / HART_HSM_CLIENT_KEY, and
        requires TLS 1.2 or newer.

        ``hostname`` is used for log messages only. Certificate pins are NOT
        enforced by this context; use verify_connection() to check the
        presented certificate against configured pins.
        """
        ctx = ssl.create_default_context()

        # Load custom CA cert if available
        ca_cert = os.environ.get('HART_HSM_CA_CERT', '')
        if ca_cert:
            if os.path.exists(ca_cert):
                ctx.load_verify_locations(ca_cert)
            else:
                logger.warning(f"HSM CA cert not found: {ca_cert}")

        # Load client cert for mTLS if available
        client_cert = os.environ.get('HART_HSM_CLIENT_CERT', '')
        client_key = os.environ.get('HART_HSM_CLIENT_KEY', '')
        if client_cert and client_key:
            try:
                ctx.load_cert_chain(client_cert, client_key)
                logger.debug(f"mTLS client cert loaded for {hostname}")
            except Exception as e:
                # Best effort: the connection proceeds without a client cert.
                logger.warning(f"mTLS client cert load failed: {e}")

        # Set minimum TLS version
        ctx.minimum_version = ssl.TLSVersion.TLSv1_2

        return ctx

    def verify_connection(self, hostname: str, port: int = 443) -> Dict:
        """Verify TLS connection to an HSM endpoint.

        Opens a TLS connection, records the negotiated TLS version and the
        server certificate's SPKI pin, and compares that pin with the pins
        configured for ``hostname``.

        Returns a dict with keys ``hostname``, ``port``, ``connected``,
        ``tls_version``, ``cert_pin``, ``pin_matched`` (True/False, or None
        when no pins are configured for the host) and ``timestamp``; an
        ``error`` key is added if the probe raised. The result is also
        appended to the bounded health history.
        """
        import socket
        result = {
            'hostname': hostname,
            'port': port,
            'connected': False,
            'tls_version': None,
            'cert_pin': None,
            'pin_matched': None,
            'timestamp': self._utc_timestamp(),
        }

        try:
            ctx = self.create_ssl_context(hostname)
            with socket.create_connection((hostname, port), timeout=10) as sock:
                with ctx.wrap_socket(sock, server_hostname=hostname) as ssock:
                    result['connected'] = True
                    result['tls_version'] = ssock.version()

                    # Get server cert and compute pin
                    cert_der = ssock.getpeercert(binary_form=True)
                    if cert_der:
                        from cryptography import x509
                        cert = x509.load_der_x509_certificate(cert_der)
                        pin = self._spki_pin(cert)
                        result['cert_pin'] = pin

                        # Check against known pins; None means "no pins
                        # configured", which is distinct from a mismatch.
                        known = self._pins.get(hostname, [])
                        result['pin_matched'] = (pin in known) if known else None

        except Exception as e:
            result['error'] = str(e)

        with self._lock:
            self._health_history.append(result)
            # Trim in batches: once past 100 entries keep the newest 50.
            if len(self._health_history) > 100:
                self._health_history = self._health_history[-50:]

        return result

    def get_trust_status(self) -> Dict:
        """Return trust network status for dashboards.

        Reports the number of pins per host (hosts with no pins omitted),
        which trust-related environment variables are set, and the five most
        recent verify_connection() results.
        """
        # Read the history under the same lock that guards writes to it.
        with self._lock:
            recent_checks = self._health_history[-5:]

        return {
            'pins_configured': {k: len(v) for k, v in self._pins.items() if v},
            'mtls_configured': bool(
                os.environ.get('HART_HSM_CLIENT_CERT') and
                os.environ.get('HART_HSM_CLIENT_KEY')),
            'custom_ca': bool(os.environ.get('HART_HSM_CA_CERT')),
            'vault_ca': bool(os.environ.get('HART_VAULT_CA_CERT')),
            'recent_checks': recent_checks,
        }
213# ═══════════════════════════════════════════════════════════════
214# Path Protection Monitor
215# ═══════════════════════════════════════════════════════════════
class HSMPathMonitor:
    """Background monitor that periodically verifies HSM trust path integrity.

    Each cycle runs _check_path(), which checks:
      1. An HSM provider is available
      2. The provider's public key equals the master trust anchor
      3. A sign -> verify round-trip with the provider succeeds

    The latest result is available via get_last_check().
    """

    def __init__(self, check_interval: int = 300):
        """Create a stopped monitor.

        Args:
            check_interval: seconds to wait between checks.
        """
        self._interval = check_interval
        self._running = False
        self._thread = None
        # Signals the worker to stop; lets stop() interrupt the wait between
        # checks instead of waiting out the whole interval.
        self._stop_event = threading.Event()
        self._trust_manager = HSMTrustManager()
        self._last_check: Optional[Dict] = None

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Same format as the deprecated datetime.utcnow().isoformat().
        """
        from datetime import timezone
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def start(self):
        """Start the background worker thread; no-op if already running."""
        if self._running:
            return
        # Fresh event per worker: a previous worker that is still finishing a
        # check keeps its own (already set) event and exits on its own.
        self._stop_event = threading.Event()
        self._running = True
        self._thread = threading.Thread(
            target=self._monitor_loop, args=(self._stop_event,), daemon=True)
        self._thread.start()
        logger.info(f"HSM path monitor started (interval={self._interval}s)")

    def stop(self):
        """Ask the worker to stop and wait up to 10 seconds for it to exit."""
        self._running = False
        self._stop_event.set()
        worker = self._thread
        # Never join the current thread (stop() called from the worker itself).
        if worker and worker is not threading.current_thread():
            worker.join(timeout=10)
            if not worker.is_alive():
                self._thread = None

    def _monitor_loop(self, stop_event=None):
        """Worker body: run a check, then wait one interval, until stopped.

        Args:
            stop_event: event that ends this particular worker; defaults to
                the monitor's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event

        while self._running and not stop_event.is_set():
            try:
                self._last_check = self._check_path()
                if not self._last_check.get('healthy'):
                    logger.warning(
                        f"HSM path unhealthy: {self._last_check.get('details')}")
            except Exception as e:
                logger.warning(f"HSM monitor error: {e}")
                # Do not leave a stale (possibly healthy) result in place
                # when the check itself could not run.
                self._last_check = {
                    'timestamp': self._utc_timestamp(),
                    'healthy': False,
                    'checks': {},
                    'details': f'HSM monitor error: {e}',
                }
            # Interruptible sleep: returns immediately once stop() is called.
            stop_event.wait(self._interval)

    def _check_path(self) -> Dict:
        """Run full trust path verification.

        Returns a dict with ``timestamp``, ``healthy``, ``checks`` (per-check
        results) and ``details`` (human-readable summary). Verification stops
        at the first failing availability or public-key check.
        """
        from .hsm_provider import get_hsm_provider, is_hsm_available, EnvVarFallbackProvider

        result = {
            'timestamp': self._utc_timestamp(),
            'healthy': True,
            'checks': {},
        }

        # Check 1: HSM availability
        hsm_ok = is_hsm_available()
        result['checks']['hsm_available'] = hsm_ok
        if not hsm_ok:
            result['healthy'] = False
            result['details'] = 'No HSM provider available'
            return result

        provider = get_hsm_provider()
        result['checks']['provider'] = provider.get_provider_name()
        # Informational only: does not affect 'healthy'.
        result['checks']['hardware_backed'] = not isinstance(
            provider, EnvVarFallbackProvider)

        # Check 2: Public key matches trust anchor
        try:
            from security.master_key import MASTER_PUBLIC_KEY_HEX
            hsm_pub = provider.get_public_key_hex()
            pub_match = hsm_pub == MASTER_PUBLIC_KEY_HEX
            result['checks']['public_key_match'] = pub_match
            if not pub_match:
                result['healthy'] = False
                result['details'] = 'HSM public key does not match trust anchor'
                return result
        except Exception as e:
            result['checks']['public_key_match'] = False
            result['healthy'] = False
            result['details'] = f'Public key check failed: {e}'
            return result

        # Check 3: Test sign + verify round-trip
        try:
            test_payload = {'_hsm_health_check': True,
                            'timestamp': self._utc_timestamp()}
            sig_hex = provider.sign_json_payload(test_payload)
            from security.master_key import verify_master_signature
            valid = verify_master_signature(test_payload, sig_hex)
            result['checks']['sign_verify_roundtrip'] = valid
            if not valid:
                result['healthy'] = False
                result['details'] = 'HSM sign→verify round-trip failed'
        except Exception as e:
            result['checks']['sign_verify_roundtrip'] = False
            result['healthy'] = False
            result['details'] = f'Sign/verify test failed: {e}'

        if result['healthy']:
            result['details'] = 'All HSM path checks passed'

        return result

    def get_last_check(self) -> Optional[Dict]:
        """Return the most recent check result, or None before the first check."""
        return self._last_check

    def get_trust_status(self) -> Dict:
        """Return the trust manager's status summary."""
        return self._trust_manager.get_trust_status()
# Process-wide HSMPathMonitor instance, created on first request.
_path_monitor: Optional[HSMPathMonitor] = None


def get_path_monitor() -> HSMPathMonitor:
    """Return the shared HSMPathMonitor, constructing it on first use."""
    global _path_monitor
    existing = _path_monitor
    if existing is not None:
        return existing
    created = HSMPathMonitor()
    _path_monitor = created
    return created
def start_hsm_monitor():
    """Start the background HSM path monitor.

    Fetches the module-level monitor via get_path_monitor(), calls its
    start() method, and returns the monitor.
    """
    shared_monitor = get_path_monitor()
    shared_monitor.start()
    return shared_monitor