Coverage for security / hsm_provider.py: 59.6%

245 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HSM Provider - Hardware Security Module abstraction for master key operations. 

3 

4The master private key NEVER leaves the HSM. All signing happens inside 

5the hardware. The application sends a payload, the HSM signs it internally, 

6and returns only the signature. No private key bytes ever appear in memory. 

7 

8Supported backends (in priority order): 

9 1. Google Cloud KMS - Ed25519, FIPS 140-2 Level 3 

10 2. Azure Key Vault HSM - Ed25519, FIPS 140-2 Level 3 

11 3. HashiCorp Vault Transit - Ed25519, self-hostable 

12 4. AWS CloudHSM - via PKCS#11 (Ed25519 support varies) 

13 5. Env var fallback - DEV MODE ONLY, loud warnings 

14 

15┌─────────────────────────────────────────────────────────────────┐ 

16│ AI EXCLUSION ZONE │ 

17│ AI tools MUST NOT call any signing function in this module. │ 

18│ See CLAUDE.md for the full exclusion policy. │ 

19└─────────────────────────────────────────────────────────────────┘ 

20 

21Trust path: 

22 Application → mTLS/IAM → HSM API → Hardware signs → signature returned 

23 

24 Certificate pinned to HSM endpoint (see hsm_trust.py) 

25 Audit log of every signing operation 

26 Rate limited (max N signs per hour) 

27""" 

28import os 

29import json 

30import time 

31import logging 

32import threading 

33import hashlib 

34from abc import ABC, abstractmethod 

35from typing import Optional, Dict 

36from datetime import datetime 

37 

38logger = logging.getLogger('hevolve_security') 

39 

40# Rate limit: max signing operations per hour (safety against runaway code) 

41_MAX_SIGNS_PER_HOUR = int(os.environ.get('HART_HSM_MAX_SIGNS_PER_HOUR', '50')) 

42 

43 

44class HSMSigningError(Exception): 

45 """Raised when HSM signing operation fails.""" 

46 pass 

47 

48 

49class HSMUnavailableError(Exception): 

50 """Raised when no HSM backend is available.""" 

51 pass 

52 

53 

54# ═══════════════════════════════════════════════════════════════ 

55# Abstract HSM Provider 

56# ═══════════════════════════════════════════════════════════════ 

57 

58class HSMProvider(ABC): 

59 """Abstract base for HSM backends. Key NEVER leaves the hardware.""" 

60 

61 def __init__(self): 

62 self._sign_count = 0 

63 self._sign_window_start = time.time() 

64 self._lock = threading.Lock() 

65 self._audit_log = [] 

66 

67 @abstractmethod 

68 def sign(self, payload_bytes: bytes) -> bytes: 

69 """Sign raw bytes inside HSM. Returns raw Ed25519 signature (64 bytes). 

70 The private key never leaves the hardware.""" 

71 pass 

72 

73 @abstractmethod 

74 def get_public_key_hex(self) -> str: 

75 """Return the public key hex from the HSM (for verification against trust anchor).""" 

76 pass 

77 

78 @abstractmethod 

79 def is_available(self) -> bool: 

80 """Check if this HSM backend is configured and reachable.""" 

81 pass 

82 

83 @abstractmethod 

84 def get_provider_name(self) -> str: 

85 """Return human-readable provider name for audit logs.""" 

86 pass 

87 

88 def sign_json_payload(self, payload: dict) -> str: 

89 """Sign a JSON payload canonically. Returns hex signature. 

90 Enforces rate limiting and audit logging.""" 

91 self._enforce_rate_limit() 

92 

93 canonical = json.dumps(payload, sort_keys=True, separators=(',', ':')) 

94 payload_bytes = canonical.encode('utf-8') 

95 

96 sig_bytes = self.sign(payload_bytes) 

97 sig_hex = sig_bytes.hex() 

98 

99 self._audit_sign(payload, sig_hex) 

100 return sig_hex 

101 

102 def _enforce_rate_limit(self): 

103 """Rate limit signing operations. Safety against runaway code.""" 

104 with self._lock: 

105 now = time.time() 

106 if now - self._sign_window_start > 3600: 

107 self._sign_count = 0 

108 self._sign_window_start = now 

109 self._sign_count += 1 

110 if self._sign_count > _MAX_SIGNS_PER_HOUR: 

111 raise HSMSigningError( 

112 f'HSM rate limit exceeded: {self._sign_count} signs in current hour ' 

113 f'(max {_MAX_SIGNS_PER_HOUR}). This may indicate runaway code.') 

114 

115 def _audit_sign(self, payload: dict, sig_hex: str): 

116 """Log every signing operation for audit trail.""" 

117 entry = { 

118 'timestamp': datetime.utcnow().isoformat(), 

119 'provider': self.get_provider_name(), 

120 'payload_hash': hashlib.sha256( 

121 json.dumps(payload, sort_keys=True).encode()).hexdigest()[:16], 

122 'signature_prefix': sig_hex[:16], 

123 'sign_count': self._sign_count, 

124 } 

125 with self._lock: 

126 self._audit_log.append(entry) 

127 # Keep last 1000 entries 

128 if len(self._audit_log) > 1000: 

129 self._audit_log = self._audit_log[-500:] 

130 logger.info(f"HSM sign [{entry['provider']}]: " 

131 f"payload_hash={entry['payload_hash']}... " 

132 f"count={entry['sign_count']}") 

133 

134 def get_audit_log(self) -> list: 

135 """Return recent signing audit entries.""" 

136 with self._lock: 

137 return list(self._audit_log) 

138 

139 

140# ═══════════════════════════════════════════════════════════════ 

141# Google Cloud KMS Provider 

142# ═══════════════════════════════════════════════════════════════ 

143 

144class GoogleCloudKMSProvider(HSMProvider): 

145 """Google Cloud KMS with Ed25519 key (FIPS 140-2 Level 3 HSM backend). 

146 

147 Required env vars: 

148 HART_GCP_KMS_KEY_PATH: projects/{project}/locations/{location}/keyRings/{ring}/cryptoKeys/{key}/cryptoKeyVersions/{version} 

149 GOOGLE_APPLICATION_CREDENTIALS: path to service account JSON (or use workload identity) 

150 """ 

151 

152 def __init__(self): 

153 super().__init__() 

154 self._key_path = os.environ.get('HART_GCP_KMS_KEY_PATH', '') 

155 self._client = None 

156 

157 def _get_client(self): 

158 if self._client is None: 

159 from google.cloud import kms # type: ignore 

160 self._client = kms.KeyManagementServiceClient() 

161 return self._client 

162 

163 def sign(self, payload_bytes: bytes) -> bytes: 

164 from google.cloud.kms import CryptoKeyVersion # type: ignore 

165 client = self._get_client() 

166 

167 # CRC32C integrity check 

168 import struct 

169 import binascii 

170 crc32c = binascii.crc32(payload_bytes) & 0xffffffff 

171 

172 response = client.asymmetric_sign( 

173 request={ 

174 'name': self._key_path, 

175 'data': payload_bytes, 

176 'data_crc32c': {'value': crc32c}, 

177 } 

178 ) 

179 

180 # Verify response integrity 

181 if not response.verified_data_crc32c: 

182 raise HSMSigningError('GCP KMS: data integrity check failed') 

183 

184 return response.signature 

185 

186 def get_public_key_hex(self) -> str: 

187 client = self._get_client() 

188 pub_key = client.get_public_key(request={'name': self._key_path}) 

189 # Parse PEM to get raw Ed25519 public key bytes 

190 from cryptography.hazmat.primitives.serialization import load_pem_public_key 

191 key = load_pem_public_key(pub_key.pem.encode()) 

192 from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat 

193 raw = key.public_bytes(Encoding.Raw, PublicFormat.Raw) 

194 return raw.hex() 

195 

196 def is_available(self) -> bool: 

197 if not self._key_path: 

198 return False 

199 try: 

200 self._get_client() 

201 return True 

202 except Exception: 

203 return False 

204 

205 def get_provider_name(self) -> str: 

206 return 'google_cloud_kms' 

207 

208 

209# ═══════════════════════════════════════════════════════════════ 

210# Azure Key Vault HSM Provider 

211# ═══════════════════════════════════════════════════════════════ 

212 

213class AzureKeyVaultProvider(HSMProvider): 

214 """Azure Key Vault with Managed HSM (FIPS 140-2 Level 3). 

215 

216 Required env vars: 

217 HART_AZURE_VAULT_URL: https://{vault-name}.vault.azure.net 

218 HART_AZURE_KEY_NAME: name of the Ed25519 key in the vault 

219 HART_AZURE_KEY_VERSION: (optional) specific version 

220 Authentication: DefaultAzureCredential (managed identity, CLI, env vars) 

221 """ 

222 

223 def __init__(self): 

224 super().__init__() 

225 self._vault_url = os.environ.get('HART_AZURE_VAULT_URL', '') 

226 self._key_name = os.environ.get('HART_AZURE_KEY_NAME', '') 

227 self._key_version = os.environ.get('HART_AZURE_KEY_VERSION', '') 

228 self._client = None 

229 self._crypto_client = None 

230 

231 def _get_clients(self): 

232 if self._client is None: 

233 from azure.identity import DefaultAzureCredential # type: ignore 

234 from azure.keyvault.keys import KeyClient # type: ignore 

235 from azure.keyvault.keys.crypto import CryptographyClient, SignatureAlgorithm # type: ignore 

236 credential = DefaultAzureCredential() 

237 self._client = KeyClient(vault_url=self._vault_url, credential=credential) 

238 key = self._client.get_key(self._key_name, self._key_version or None) 

239 self._crypto_client = CryptographyClient(key, credential=credential) 

240 return self._client, self._crypto_client 

241 

242 def sign(self, payload_bytes: bytes) -> bytes: 

243 from azure.keyvault.keys.crypto import SignatureAlgorithm # type: ignore 

244 _, crypto = self._get_clients() 

245 # Ed25519 signs raw bytes (no pre-hashing needed) 

246 result = crypto.sign(SignatureAlgorithm.eddsa, payload_bytes) 

247 return result.signature 

248 

249 def get_public_key_hex(self) -> str: 

250 client, _ = self._get_clients() 

251 key = client.get_key(self._key_name, self._key_version or None) 

252 # Azure returns JWK; extract raw Ed25519 public key 

253 import base64 

254 x = key.key.x # Raw public key bytes (base64url-encoded in JWK) 

255 if isinstance(x, str): 

256 x = base64.urlsafe_b64decode(x + '==') 

257 return x.hex() 

258 

259 def is_available(self) -> bool: 

260 if not self._vault_url or not self._key_name: 

261 return False 

262 try: 

263 self._get_clients() 

264 return True 

265 except Exception: 

266 return False 

267 

268 def get_provider_name(self) -> str: 

269 return 'azure_key_vault' 

270 

271 

272# ═══════════════════════════════════════════════════════════════ 

273# HashiCorp Vault Transit Provider 

274# ═══════════════════════════════════════════════════════════════ 

275 

276class VaultTransitProvider(HSMProvider): 

277 """HashiCorp Vault Transit secrets engine with Ed25519. 

278 

279 Self-hostable. Can back onto physical HSMs (PKCS#11 seal). 

280 

281 Required env vars: 

282 HART_VAULT_ADDR: https://vault.example.com:8200 

283 HART_VAULT_TOKEN: auth token (or use AppRole/K8s auth) 

284 HART_VAULT_KEY_NAME: name of the transit key (type: ed25519) 

285 HART_VAULT_MOUNT: transit mount path (default: transit) 

286 HART_VAULT_CA_CERT: (optional) CA cert for TLS verification 

287 """ 

288 

289 def __init__(self): 

290 super().__init__() 

291 self._addr = os.environ.get('HART_VAULT_ADDR', '') 

292 self._token = os.environ.get('HART_VAULT_TOKEN', '') 

293 self._key_name = os.environ.get('HART_VAULT_KEY_NAME', 'hart-master') 

294 self._mount = os.environ.get('HART_VAULT_MOUNT', 'transit') 

295 self._ca_cert = os.environ.get('HART_VAULT_CA_CERT', '') 

296 self._client = None 

297 

298 def _get_client(self): 

299 if self._client is None: 

300 import hvac # type: ignore 

301 kwargs = {'url': self._addr, 'token': self._token} 

302 if self._ca_cert: 

303 kwargs['verify'] = self._ca_cert 

304 self._client = hvac.Client(**kwargs) 

305 return self._client 

306 

307 def sign(self, payload_bytes: bytes) -> bytes: 

308 import base64 

309 client = self._get_client() 

310 

311 # Vault Transit expects base64-encoded input 

312 b64_input = base64.b64encode(payload_bytes).decode() 

313 

314 response = client.secrets.transit.sign_data( 

315 name=self._key_name, 

316 hash_input=b64_input, 

317 marshaling_algorithm='jws', 

318 mount_point=self._mount, 

319 ) 

320 

321 # Response signature is vault:v1:base64signature 

322 sig_str = response['data']['signature'] 

323 # Strip vault:vN: prefix 

324 sig_b64 = sig_str.split(':')[-1] 

325 return base64.b64decode(sig_b64) 

326 

327 def get_public_key_hex(self) -> str: 

328 import base64 

329 client = self._get_client() 

330 response = client.secrets.transit.read_key( 

331 name=self._key_name, 

332 mount_point=self._mount, 

333 ) 

334 # Get latest version's public key 

335 keys = response['data']['keys'] 

336 latest = str(max(int(k) for k in keys.keys())) 

337 pub_b64 = keys[latest].get('public_key', '') 

338 if pub_b64: 

339 # PEM or base64 raw depending on Vault version 

340 if 'BEGIN' in pub_b64: 

341 from cryptography.hazmat.primitives.serialization import load_pem_public_key, Encoding, PublicFormat 

342 key = load_pem_public_key(pub_b64.encode()) 

343 return key.public_bytes(Encoding.Raw, PublicFormat.Raw).hex() 

344 return base64.b64decode(pub_b64).hex() 

345 return '' 

346 

347 def is_available(self) -> bool: 

348 if not self._addr or not self._token: 

349 return False 

350 try: 

351 client = self._get_client() 

352 return client.is_authenticated() 

353 except Exception: 

354 return False 

355 

356 def get_provider_name(self) -> str: 

357 return 'hashicorp_vault' 

358 

359 

360# ═══════════════════════════════════════════════════════════════ 

361# Environment Variable Fallback (DEV ONLY) 

362# ═══════════════════════════════════════════════════════════════ 

363 

364class EnvVarFallbackProvider(HSMProvider): 

365 """DEVELOPMENT ONLY. Loads private key from environment variable. 

366 

367 ⚠️ This is NOT hardware-protected. The private key exists in process memory. 

368 ⚠️ Only for local development and testing. 

369 ⚠️ In production, use a real HSM provider. 

370 """ 

371 

372 def __init__(self): 

373 super().__init__() 

374 self._warned = False 

375 

376 def _warn_once(self): 

377 if not self._warned: 

378 self._warned = True 

379 import sys 

380 msg = ( 

381 "\n" + "=" * 70 + 

382 "\n WARNING: Master key loaded from environment variable." 

383 "\n This is NOT hardware-protected. Use an HSM in production." 

384 "\n Set HART_GCP_KMS_KEY_PATH, HART_AZURE_VAULT_URL, or" 

385 "\n HART_VAULT_ADDR to enable HSM protection." 

386 "\n" + "=" * 70 + "\n" 

387 ) 

388 print(msg, file=sys.stderr) 

389 logger.warning("Master key signing via env var fallback - NOT HSM-protected") 

390 

391 def sign(self, payload_bytes: bytes) -> bytes: 

392 self._warn_once() 

393 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey 

394 hex_key = os.environ.get('HEVOLVE_MASTER_PRIVATE_KEY', '') 

395 if not hex_key: 

396 raise HSMSigningError( 

397 'No HSM configured and HEVOLVE_MASTER_PRIVATE_KEY not set. ' 

398 'Cannot sign without master key.') 

399 try: 

400 priv = Ed25519PrivateKey.from_private_bytes(bytes.fromhex(hex_key)) 

401 return priv.sign(payload_bytes) 

402 except Exception as e: 

403 raise HSMSigningError(f'Env var signing failed: {e}') 

404 

405 def get_public_key_hex(self) -> str: 

406 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey 

407 from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat 

408 hex_key = os.environ.get('HEVOLVE_MASTER_PRIVATE_KEY', '') 

409 if not hex_key: 

410 return '' 

411 priv = Ed25519PrivateKey.from_private_bytes(bytes.fromhex(hex_key)) 

412 return priv.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw).hex() 

413 

414 def is_available(self) -> bool: 

415 return bool(os.environ.get('HEVOLVE_MASTER_PRIVATE_KEY', '')) 

416 

417 def get_provider_name(self) -> str: 

418 return 'env_var_fallback_DEV_ONLY' 

419 

420 

421# ═══════════════════════════════════════════════════════════════ 

422# Provider Registry & Singleton 

423# ═══════════════════════════════════════════════════════════════ 

424 

425# Priority order: real HSMs first, env var fallback last 

426_PROVIDER_CLASSES = [ 

427 GoogleCloudKMSProvider, 

428 AzureKeyVaultProvider, 

429 VaultTransitProvider, 

430 EnvVarFallbackProvider, 

431] 

432 

433_active_provider: Optional[HSMProvider] = None 

434_provider_lock = threading.Lock() 

435 

436 

437def get_hsm_provider() -> HSMProvider: 

438 """Get the active HSM provider. Auto-selects based on available config. 

439 Priority: GCP KMS > Azure Key Vault > HashiCorp Vault > env var fallback. 

440 Raises HSMUnavailableError if nothing is configured.""" 

441 global _active_provider 

442 if _active_provider is not None: 

443 return _active_provider 

444 

445 with _provider_lock: 

446 if _active_provider is not None: 

447 return _active_provider 

448 

449 for cls in _PROVIDER_CLASSES: 

450 try: 

451 provider = cls() 

452 if provider.is_available(): 

453 # Verify public key matches trust anchor 

454 from security.master_key import MASTER_PUBLIC_KEY_HEX 

455 hsm_pub = provider.get_public_key_hex() 

456 if hsm_pub and hsm_pub != MASTER_PUBLIC_KEY_HEX: 

457 logger.error( 

458 f"HSM provider {provider.get_provider_name()}: " 

459 f"public key mismatch! HSM={hsm_pub[:16]}... " 

460 f"expected={MASTER_PUBLIC_KEY_HEX[:16]}...") 

461 continue # Wrong key - try next provider 

462 

463 _active_provider = provider 

464 logger.info(f"HSM provider active: {provider.get_provider_name()}") 

465 return provider 

466 except Exception as e: 

467 logger.debug(f"HSM provider {cls.__name__} unavailable: {e}") 

468 

469 raise HSMUnavailableError( 

470 'No HSM backend available. Configure one of: ' 

471 'HART_GCP_KMS_KEY_PATH (Google), ' 

472 'HART_AZURE_VAULT_URL (Azure), ' 

473 'HART_VAULT_ADDR (HashiCorp Vault), ' 

474 'or HEVOLVE_MASTER_PRIVATE_KEY (dev fallback).') 

475 

476 

477def is_hsm_available() -> bool: 

478 """Check if any HSM provider is available (without raising).""" 

479 try: 

480 get_hsm_provider() 

481 return True 

482 except HSMUnavailableError: 

483 return False 

484 

485 

486def get_hsm_status() -> Dict: 

487 """Return HSM status for dashboards and health checks.""" 

488 try: 

489 provider = get_hsm_provider() 

490 return { 

491 'available': True, 

492 'provider': provider.get_provider_name(), 

493 'hardware_backed': not isinstance(provider, EnvVarFallbackProvider), 

494 'sign_count': provider._sign_count, 

495 'audit_entries': len(provider._audit_log), 

496 } 

497 except HSMUnavailableError: 

498 return { 

499 'available': False, 

500 'provider': None, 

501 'hardware_backed': False, 

502 'sign_count': 0, 

503 'audit_entries': 0, 

504 } 

505 

506 

507def hsm_sign_payload(payload: dict) -> str: 

508 """Sign a JSON payload via the active HSM. Returns hex signature. 

509 The private key NEVER leaves the HSM hardware.""" 

510 provider = get_hsm_provider() 

511 return provider.sign_json_payload(payload)