Coverage for security / hsm_trust.py: 0.0%

159 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HSM Trust Network - Certificate pinning and path protection for HSM connections. 

3 

4Ensures the entire path from application to HSM is protected: 

5 1. TLS certificate pinning to HSM endpoints 

6 2. mTLS client certificates for HSM authentication 

7 3. Request signing for HSM API calls 

8 4. Audit logging of all trust path events 

9 5. Health monitoring of HSM connectivity 

10 

11Trust chain: 

12 Application (this code) 

13 ↓ mTLS / IAM 

14 HSM Endpoint (pinned certificate) 

15 ↓ hardware boundary 

16 HSM Hardware (FIPS 140-2 Level 3) 

17 ↓ internal 

18 Private Key (NEVER extracted) 

19 

20┌─────────────────────────────────────────────────────────────────┐ 

21│ AI EXCLUSION ZONE │ 

22│ AI tools MUST NOT modify trust anchors or certificate pins. │ 

23└─────────────────────────────────────────────────────────────────┘ 

24""" 

25import os 

26import ssl 

27import json 

28import hashlib 

29import logging 

30import threading 

31import time 

32from typing import Optional, Dict, List 

33from datetime import datetime 

34 

35logger = logging.getLogger('hevolve_security') 

36 

37# ═══════════════════════════════════════════════════════════════ 

38# Certificate Pin Store 

39# ═══════════════════════════════════════════════════════════════ 

40 

41# SHA-256 pins of trusted HSM endpoint certificates. 

42# These are loaded from config or hardcoded for known cloud HSM services. 

43_KNOWN_HSM_PINS: Dict[str, List[str]] = { 

44 # Google Cloud KMS regional endpoints 

45 'cloudkms.googleapis.com': [ 

46 # Google Trust Services root CA pins (GTS Root R1-R4) 

47 # These are public, well-known pins for Google's PKI 

48 ], 

49 # Azure Key Vault 

50 'vault.azure.net': [], 

51 # HashiCorp Vault - user-configured, loaded from HART_VAULT_CA_CERT 

52} 

53 

54 

55class HSMTrustManager: 

56 """Manages TLS trust for HSM connections. 

57 

58 Responsibilities: 

59 - Certificate pinning for HSM endpoints 

60 - mTLS client certificate management 

61 - Connection health monitoring 

62 - Audit trail of trust path events 

63 """ 

64 

65 def __init__(self): 

66 self._pins: Dict[str, List[str]] = {} 

67 self._lock = threading.Lock() 

68 self._health_history: list = [] 

69 self._load_pins() 

70 

71 def _load_pins(self): 

72 """Load certificate pins from config file or environment.""" 

73 # User-configured pins 

74 pin_file = os.environ.get('HART_HSM_PIN_FILE', '') 

75 if pin_file and os.path.exists(pin_file): 

76 try: 

77 with open(pin_file, 'r') as f: 

78 user_pins = json.load(f) 

79 self._pins.update(user_pins) 

80 logger.info(f"Loaded HSM certificate pins from {pin_file}") 

81 except Exception as e: 

82 logger.warning(f"Failed to load HSM pin file: {e}") 

83 

84 # Vault CA cert → derive pin 

85 vault_ca = os.environ.get('HART_VAULT_CA_CERT', '') 

86 if vault_ca and os.path.exists(vault_ca): 

87 try: 

88 pin = self._compute_cert_pin(vault_ca) 

89 vault_addr = os.environ.get('HART_VAULT_ADDR', '') 

90 if vault_addr: 

91 from urllib.parse import urlparse 

92 host = urlparse(vault_addr).hostname 

93 if host: 

94 self._pins.setdefault(host, []).append(pin) 

95 logger.info(f"Pinned Vault CA cert for {host}: {pin[:16]}...") 

96 except Exception as e: 

97 logger.debug(f"Vault CA cert pin failed: {e}") 

98 

99 @staticmethod 

100 def _compute_cert_pin(cert_path: str) -> str: 

101 """Compute SHA-256 pin of a certificate's Subject Public Key Info (SPKI).""" 

102 try: 

103 from cryptography import x509 

104 from cryptography.hazmat.primitives.serialization import ( 

105 Encoding, PublicFormat, 

106 ) 

107 with open(cert_path, 'rb') as f: 

108 cert_data = f.read() 

109 if b'BEGIN CERTIFICATE' in cert_data: 

110 cert = x509.load_pem_x509_certificate(cert_data) 

111 else: 

112 cert = x509.load_der_x509_certificate(cert_data) 

113 

114 spki_bytes = cert.public_key().public_bytes( 

115 Encoding.DER, PublicFormat.SubjectPublicKeyInfo) 

116 return hashlib.sha256(spki_bytes).hexdigest() 

117 except Exception as e: 

118 logger.warning(f"Certificate pin computation failed: {e}") 

119 return '' 

120 

121 def create_ssl_context(self, hostname: str) -> ssl.SSLContext: 

122 """Create an SSL context with certificate pinning for an HSM endpoint. 

123 

124 If pins are configured for this hostname, the connection will be 

125 rejected if the server's certificate doesn't match any pin. 

126 """ 

127 ctx = ssl.create_default_context() 

128 

129 # Load custom CA cert if available 

130 ca_cert = os.environ.get('HART_HSM_CA_CERT', '') 

131 if ca_cert and os.path.exists(ca_cert): 

132 ctx.load_verify_locations(ca_cert) 

133 

134 # Load client cert for mTLS if available 

135 client_cert = os.environ.get('HART_HSM_CLIENT_CERT', '') 

136 client_key = os.environ.get('HART_HSM_CLIENT_KEY', '') 

137 if client_cert and client_key: 

138 try: 

139 ctx.load_cert_chain(client_cert, client_key) 

140 logger.debug(f"mTLS client cert loaded for {hostname}") 

141 except Exception as e: 

142 logger.warning(f"mTLS client cert load failed: {e}") 

143 

144 # Set minimum TLS version 

145 ctx.minimum_version = ssl.TLSVersion.TLSv1_2 

146 

147 return ctx 

148 

149 def verify_connection(self, hostname: str, port: int = 443) -> Dict: 

150 """Verify TLS connection to an HSM endpoint. 

151 Checks certificate chain and pin matching.""" 

152 import socket 

153 result = { 

154 'hostname': hostname, 

155 'port': port, 

156 'connected': False, 

157 'tls_version': None, 

158 'cert_pin': None, 

159 'pin_matched': None, 

160 'timestamp': datetime.utcnow().isoformat(), 

161 } 

162 

163 try: 

164 ctx = self.create_ssl_context(hostname) 

165 with socket.create_connection((hostname, port), timeout=10) as sock: 

166 with ctx.wrap_socket(sock, server_hostname=hostname) as ssock: 

167 result['connected'] = True 

168 result['tls_version'] = ssock.version() 

169 

170 # Get server cert and compute pin 

171 cert_der = ssock.getpeercert(binary_form=True) 

172 if cert_der: 

173 from cryptography import x509 

174 from cryptography.hazmat.primitives.serialization import ( 

175 Encoding, PublicFormat, 

176 ) 

177 cert = x509.load_der_x509_certificate(cert_der) 

178 spki = cert.public_key().public_bytes( 

179 Encoding.DER, PublicFormat.SubjectPublicKeyInfo) 

180 pin = hashlib.sha256(spki).hexdigest() 

181 result['cert_pin'] = pin 

182 

183 # Check against known pins 

184 known = self._pins.get(hostname, []) 

185 if known: 

186 result['pin_matched'] = pin in known 

187 else: 

188 result['pin_matched'] = None # No pins configured 

189 

190 except Exception as e: 

191 result['error'] = str(e) 

192 

193 with self._lock: 

194 self._health_history.append(result) 

195 if len(self._health_history) > 100: 

196 self._health_history = self._health_history[-50:] 

197 

198 return result 

199 

200 def get_trust_status(self) -> Dict: 

201 """Return trust network status for dashboards.""" 

202 return { 

203 'pins_configured': {k: len(v) for k, v in self._pins.items() if v}, 

204 'mtls_configured': bool( 

205 os.environ.get('HART_HSM_CLIENT_CERT') and 

206 os.environ.get('HART_HSM_CLIENT_KEY')), 

207 'custom_ca': bool(os.environ.get('HART_HSM_CA_CERT')), 

208 'vault_ca': bool(os.environ.get('HART_VAULT_CA_CERT')), 

209 'recent_checks': self._health_history[-5:] if self._health_history else [], 

210 } 

211 

212 

213# ═══════════════════════════════════════════════════════════════ 

214# Path Protection Monitor 

215# ═══════════════════════════════════════════════════════════════ 

216 

217class HSMPathMonitor: 

218 """Background monitor that continuously verifies HSM trust path integrity. 

219 

220 Checks: 

221 1. HSM endpoint reachability 

222 2. Certificate pin consistency (hasn't changed unexpectedly) 

223 3. Public key from HSM matches trust anchor 

224 4. Signing operations produce valid signatures 

225 """ 

226 

227 def __init__(self, check_interval: int = 300): 

228 self._interval = check_interval 

229 self._running = False 

230 self._thread = None 

231 self._trust_manager = HSMTrustManager() 

232 self._last_check: Optional[Dict] = None 

233 

234 def start(self): 

235 if self._running: 

236 return 

237 self._running = True 

238 self._thread = threading.Thread(target=self._monitor_loop, daemon=True) 

239 self._thread.start() 

240 logger.info(f"HSM path monitor started (interval={self._interval}s)") 

241 

242 def stop(self): 

243 self._running = False 

244 if self._thread: 

245 self._thread.join(timeout=10) 

246 

247 def _monitor_loop(self): 

248 while self._running: 

249 try: 

250 self._last_check = self._check_path() 

251 if not self._last_check.get('healthy'): 

252 logger.warning( 

253 f"HSM path unhealthy: {self._last_check.get('details')}") 

254 except Exception as e: 

255 logger.debug(f"HSM monitor error: {e}") 

256 time.sleep(self._interval) 

257 

258 def _check_path(self) -> Dict: 

259 """Run full trust path verification.""" 

260 from .hsm_provider import get_hsm_provider, is_hsm_available, EnvVarFallbackProvider 

261 

262 result = { 

263 'timestamp': datetime.utcnow().isoformat(), 

264 'healthy': True, 

265 'checks': {}, 

266 } 

267 

268 # Check 1: HSM availability 

269 hsm_ok = is_hsm_available() 

270 result['checks']['hsm_available'] = hsm_ok 

271 if not hsm_ok: 

272 result['healthy'] = False 

273 result['details'] = 'No HSM provider available' 

274 return result 

275 

276 provider = get_hsm_provider() 

277 result['checks']['provider'] = provider.get_provider_name() 

278 result['checks']['hardware_backed'] = not isinstance( 

279 provider, EnvVarFallbackProvider) 

280 

281 # Check 2: Public key matches trust anchor 

282 try: 

283 from security.master_key import MASTER_PUBLIC_KEY_HEX 

284 hsm_pub = provider.get_public_key_hex() 

285 pub_match = hsm_pub == MASTER_PUBLIC_KEY_HEX 

286 result['checks']['public_key_match'] = pub_match 

287 if not pub_match: 

288 result['healthy'] = False 

289 result['details'] = 'HSM public key does not match trust anchor' 

290 return result 

291 except Exception as e: 

292 result['checks']['public_key_match'] = False 

293 result['healthy'] = False 

294 result['details'] = f'Public key check failed: {e}' 

295 return result 

296 

297 # Check 3: Test sign + verify round-trip 

298 try: 

299 test_payload = {'_hsm_health_check': True, 

300 'timestamp': datetime.utcnow().isoformat()} 

301 sig_hex = provider.sign_json_payload(test_payload) 

302 from security.master_key import verify_master_signature 

303 valid = verify_master_signature(test_payload, sig_hex) 

304 result['checks']['sign_verify_roundtrip'] = valid 

305 if not valid: 

306 result['healthy'] = False 

307 result['details'] = 'HSM sign→verify round-trip failed' 

308 except Exception as e: 

309 result['checks']['sign_verify_roundtrip'] = False 

310 result['healthy'] = False 

311 result['details'] = f'Sign/verify test failed: {e}' 

312 

313 if result['healthy']: 

314 result['details'] = 'All HSM path checks passed' 

315 

316 return result 

317 

318 def get_last_check(self) -> Optional[Dict]: 

319 return self._last_check 

320 

321 def get_trust_status(self) -> Dict: 

322 return self._trust_manager.get_trust_status() 

323 

324 

325# Module-level singleton 

326_path_monitor: Optional[HSMPathMonitor] = None 

327 

328 

329def get_path_monitor() -> HSMPathMonitor: 

330 global _path_monitor 

331 if _path_monitor is None: 

332 _path_monitor = HSMPathMonitor() 

333 return _path_monitor 

334 

335 

336def start_hsm_monitor(): 

337 """Start the background HSM path monitor.""" 

338 monitor = get_path_monitor() 

339 monitor.start() 

340 return monitor