Coverage for security / hsm_provider.py: 59.6%
245 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HSM Provider - Hardware Security Module abstraction for master key operations.
4The master private key NEVER leaves the HSM. All signing happens inside
5the hardware. The application sends a payload, the HSM signs it internally,
6and returns only the signature. No private key bytes ever appear in memory.
8Supported backends (in priority order):
9 1. Google Cloud KMS - Ed25519, FIPS 140-2 Level 3
10 2. Azure Key Vault HSM - Ed25519, FIPS 140-2 Level 3
11 3. HashiCorp Vault Transit - Ed25519, self-hostable
12 4. AWS CloudHSM - via PKCS#11 (Ed25519 support varies)
13 5. Env var fallback - DEV MODE ONLY, loud warnings
15┌─────────────────────────────────────────────────────────────────┐
16│ AI EXCLUSION ZONE │
17│ AI tools MUST NOT call any signing function in this module. │
18│ See CLAUDE.md for the full exclusion policy. │
19└─────────────────────────────────────────────────────────────────┘
21Trust path:
22 Application → mTLS/IAM → HSM API → Hardware signs → signature returned
23 ↑
24 Certificate pinned to HSM endpoint (see hsm_trust.py)
25 Audit log of every signing operation
26 Rate limited (max N signs per hour)
27"""
28import os
29import json
30import time
31import logging
32import threading
33import hashlib
34from abc import ABC, abstractmethod
35from typing import Optional, Dict
36from datetime import datetime
logger = logging.getLogger('hevolve_security')

# Fallback ceiling used when HART_HSM_MAX_SIGNS_PER_HOUR is unset or unusable.
_DEFAULT_MAX_SIGNS_PER_HOUR = 50


def _load_max_signs_per_hour(environ=None, default=_DEFAULT_MAX_SIGNS_PER_HOUR):
    """Read the hourly signing ceiling from HART_HSM_MAX_SIGNS_PER_HOUR.

    Args:
        environ: Mapping to read from; defaults to ``os.environ``.
        default: Value returned when the variable is unset, blank, or not
            an integer.

    Returns:
        The configured limit as an ``int``, or ``default``.

    A malformed value must not make this module un-importable, so it is
    logged and replaced by the default instead of raising ``ValueError``.
    """
    env = os.environ if environ is None else environ
    raw = env.get('HART_HSM_MAX_SIGNS_PER_HOUR', '')
    if not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        logger.warning(
            'Invalid HART_HSM_MAX_SIGNS_PER_HOUR=%r; using default %d',
            raw, default)
        return default


# Rate limit: max signing operations per hour (safety against runaway code)
_MAX_SIGNS_PER_HOUR = _load_max_signs_per_hour()
class HSMSigningError(Exception):
    """Signal that a signing operation could not be completed.

    Raised in this module when the hourly rate limit is exceeded, when a
    backend reports a failed data-integrity check, or when the dev-only
    env-var fallback cannot sign.
    """
class HSMUnavailableError(Exception):
    """Signal that no HSM backend could be selected.

    Raised by provider selection when none of the registered backends is
    both configured and accepted.
    """
54# ═══════════════════════════════════════════════════════════════
55# Abstract HSM Provider
56# ═══════════════════════════════════════════════════════════════
class HSMProvider(ABC):
    """Abstract base for HSM backends. Key NEVER leaves the hardware.

    Concrete backends implement ``sign``, ``get_public_key_hex``,
    ``is_available`` and ``get_provider_name``. This base class layers
    canonical-JSON signing on top, with an hourly rate limit and an
    in-memory audit trail. The counters and the audit log are guarded by
    ``self._lock``.
    """

    def __init__(self):
        self._sign_count = 0                   # sign attempts in the current window
        self._sign_window_start = time.time()  # epoch seconds when the window opened
        self._lock = threading.Lock()          # guards the counters and the audit log
        self._audit_log = []                   # list of audit-entry dicts, oldest first

    @abstractmethod
    def sign(self, payload_bytes: bytes) -> bytes:
        """Sign raw bytes inside HSM. Returns raw Ed25519 signature (64 bytes).
        The private key never leaves the hardware."""

    @abstractmethod
    def get_public_key_hex(self) -> str:
        """Return the public key hex from the HSM (for verification against trust anchor)."""

    @abstractmethod
    def is_available(self) -> bool:
        """Check if this HSM backend is configured and reachable."""

    @abstractmethod
    def get_provider_name(self) -> str:
        """Return human-readable provider name for audit logs."""

    def sign_json_payload(self, payload: dict) -> str:
        """Sign a JSON payload canonically and return the hex signature.

        The payload is serialised with sorted keys and compact separators,
        UTF-8 encoded, and handed to ``sign``. The rate limit is checked
        (and the attempt counted) before signing, so failed signing
        attempts still consume quota. A successful sign is appended to the
        audit log.

        Raises:
            HSMSigningError: If the hourly rate limit is exceeded.
            Exception: Whatever the backend's ``sign`` raises.
        """
        self._enforce_rate_limit()

        canonical = json.dumps(payload, sort_keys=True, separators=(',', ':'))
        payload_bytes = canonical.encode('utf-8')

        sig_bytes = self.sign(payload_bytes)
        sig_hex = sig_bytes.hex()

        self._audit_sign(payload, sig_hex)
        return sig_hex

    def _enforce_rate_limit(self):
        """Count one signing attempt and reject it if over the hourly cap.

        The window restarts once more than 3600 seconds have passed since
        it opened. Safety against runaway code.

        Raises:
            HSMSigningError: If the attempt exceeds ``_MAX_SIGNS_PER_HOUR``.
        """
        with self._lock:
            now = time.time()
            if now - self._sign_window_start > 3600:
                self._sign_count = 0
                self._sign_window_start = now
            self._sign_count += 1
            if self._sign_count > _MAX_SIGNS_PER_HOUR:
                raise HSMSigningError(
                    f'HSM rate limit exceeded: {self._sign_count} signs in current hour '
                    f'(max {_MAX_SIGNS_PER_HOUR}). This may indicate runaway code.')

    def _audit_sign(self, payload: dict, sig_hex: str):
        """Record one signing operation in the audit trail and the log.

        Only truncated digests are stored: the first 16 hex characters of
        the payload's SHA-256 and of the signature.
        """
        # Local import: the module itself only imports ``datetime``.
        from datetime import timezone

        # NOTE: this digest is taken over the default-separator sorted JSON,
        # not over the compact canonical bytes that were actually signed.
        payload_digest = hashlib.sha256(
            json.dumps(payload, sort_keys=True).encode()).hexdigest()
        provider_name = self.get_provider_name()
        # datetime.utcnow() is deprecated; dropping tzinfo keeps the same
        # naive-UTC ISO string the audit log has always contained.
        timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

        with self._lock:
            entry = {
                'timestamp': timestamp,
                'provider': provider_name,
                'payload_hash': payload_digest[:16],
                'signature_prefix': sig_hex[:16],
                # Read under the lock so the count is consistent with the
                # rate limiter's view.
                'sign_count': self._sign_count,
            }
            self._audit_log.append(entry)
            # Bound memory: once past 1000 entries, keep only the newest 500.
            if len(self._audit_log) > 1000:
                self._audit_log = self._audit_log[-500:]

        logger.info(f"HSM sign [{entry['provider']}]: "
                    f"payload_hash={entry['payload_hash']}... "
                    f"count={entry['sign_count']}")

    def get_audit_log(self) -> list:
        """Return a snapshot copy of the recent signing audit entries."""
        with self._lock:
            return list(self._audit_log)
140# ═══════════════════════════════════════════════════════════════
141# Google Cloud KMS Provider
142# ═══════════════════════════════════════════════════════════════
class GoogleCloudKMSProvider(HSMProvider):
    """Google Cloud KMS with Ed25519 key (FIPS 140-2 Level 3 HSM backend).

    Required env vars:
        HART_GCP_KMS_KEY_PATH: projects/{project}/locations/{location}/keyRings/{ring}/cryptoKeys/{key}/cryptoKeyVersions/{version}
        GOOGLE_APPLICATION_CREDENTIALS: path to service account JSON (or use workload identity)
    """

    def __init__(self):
        super().__init__()
        self._key_path = os.environ.get('HART_GCP_KMS_KEY_PATH', '')
        self._client = None  # created lazily by _get_client()

    @staticmethod
    def _crc32c(data: bytes) -> int:
        """Return the CRC32C (Castagnoli) checksum of ``data``.

        ``binascii.crc32`` implements the IEEE CRC-32 polynomial, which is
        a different checksum from the CRC32C that KMS expects in
        ``data_crc32c``. The standard library has no CRC32C, so it is
        computed bit by bit here; signed payloads are small and signing is
        rate limited, so speed is not a concern.
        """
        crc = 0xFFFFFFFF
        for byte in data:
            crc ^= byte
            for _ in range(8):
                # 0x82F63B78 is the bit-reflected Castagnoli polynomial.
                crc = (crc >> 1) ^ 0x82F63B78 if crc & 1 else crc >> 1
        return crc ^ 0xFFFFFFFF

    def _get_client(self):
        """Return the cached KMS client, creating it on first use."""
        if self._client is None:
            from google.cloud import kms  # type: ignore
            self._client = kms.KeyManagementServiceClient()
        return self._client

    def sign(self, payload_bytes: bytes) -> bytes:
        """Sign ``payload_bytes`` with the configured KMS key version.

        The request carries a CRC32C of the payload so the service can
        detect corruption in transit.

        Raises:
            HSMSigningError: If the response does not confirm that the
                payload checksum was verified.
        """
        client = self._get_client()

        response = client.asymmetric_sign(
            request={
                'name': self._key_path,
                'data': payload_bytes,
                'data_crc32c': {'value': self._crc32c(payload_bytes)},
            }
        )

        # Verify response integrity
        if not response.verified_data_crc32c:
            raise HSMSigningError('GCP KMS: data integrity check failed')

        return response.signature

    def get_public_key_hex(self) -> str:
        """Fetch the key's PEM public key and return its raw bytes as hex."""
        from cryptography.hazmat.primitives.serialization import (
            Encoding, PublicFormat, load_pem_public_key)

        client = self._get_client()
        pub_key = client.get_public_key(request={'name': self._key_path})
        # Parse PEM to get raw Ed25519 public key bytes
        key = load_pem_public_key(pub_key.pem.encode())
        return key.public_bytes(Encoding.Raw, PublicFormat.Raw).hex()

    def is_available(self) -> bool:
        """Return True when a key path is configured and a client can be built."""
        if not self._key_path:
            return False
        try:
            self._get_client()
            return True
        except Exception:
            return False

    def get_provider_name(self) -> str:
        """Return the provider label used in audit logs."""
        return 'google_cloud_kms'
209# ═══════════════════════════════════════════════════════════════
210# Azure Key Vault HSM Provider
211# ═══════════════════════════════════════════════════════════════
class AzureKeyVaultProvider(HSMProvider):
    """Azure Key Vault with Managed HSM (FIPS 140-2 Level 3).

    Required env vars:
        HART_AZURE_VAULT_URL: https://{vault-name}.vault.azure.net
        HART_AZURE_KEY_NAME: name of the Ed25519 key in the vault
        HART_AZURE_KEY_VERSION: (optional) specific version
    Authentication: DefaultAzureCredential (managed identity, CLI, env vars)
    """

    def __init__(self):
        super().__init__()
        self._vault_url = os.environ.get('HART_AZURE_VAULT_URL', '')
        self._key_name = os.environ.get('HART_AZURE_KEY_NAME', '')
        self._key_version = os.environ.get('HART_AZURE_KEY_VERSION', '')
        self._client = None         # KeyClient, created lazily
        self._crypto_client = None  # CryptographyClient, created lazily

    def _get_clients(self):
        """Return ``(key_client, crypto_client)``, creating both on first use.

        Both clients are built into locals and cached together. Caching
        the key client before ``get_key`` has succeeded would leave a
        half-initialised pair behind after a failure, and later calls
        would hand out ``None`` as the crypto client.
        """
        if self._client is None or self._crypto_client is None:
            from azure.identity import DefaultAzureCredential  # type: ignore
            from azure.keyvault.keys import KeyClient  # type: ignore
            from azure.keyvault.keys.crypto import CryptographyClient  # type: ignore
            credential = DefaultAzureCredential()
            key_client = KeyClient(vault_url=self._vault_url, credential=credential)
            key = key_client.get_key(self._key_name, self._key_version or None)
            crypto_client = CryptographyClient(key, credential=credential)
            self._client, self._crypto_client = key_client, crypto_client
        return self._client, self._crypto_client

    def sign(self, payload_bytes: bytes) -> bytes:
        """Sign ``payload_bytes`` through the vault's cryptography client."""
        from azure.keyvault.keys.crypto import SignatureAlgorithm  # type: ignore
        _, crypto = self._get_clients()
        # Ed25519 signs raw bytes (no pre-hashing needed)
        result = crypto.sign(SignatureAlgorithm.eddsa, payload_bytes)
        return result.signature

    def get_public_key_hex(self) -> str:
        """Return the key's JWK ``x`` component as hex."""
        import base64
        client, _ = self._get_clients()
        key = client.get_key(self._key_name, self._key_version or None)
        # Azure returns JWK; extract raw Ed25519 public key
        x = key.key.x
        if isinstance(x, str):
            # base64url text: surplus '=' padding is tolerated by the decoder.
            x = base64.urlsafe_b64decode(x + '==')
        return x.hex()

    def is_available(self) -> bool:
        """Return True when vault URL and key name are set and clients can be built."""
        if not self._vault_url or not self._key_name:
            return False
        try:
            self._get_clients()
            return True
        except Exception:
            return False

    def get_provider_name(self) -> str:
        """Return the provider label used in audit logs."""
        return 'azure_key_vault'
272# ═══════════════════════════════════════════════════════════════
273# HashiCorp Vault Transit Provider
274# ═══════════════════════════════════════════════════════════════
class VaultTransitProvider(HSMProvider):
    """HashiCorp Vault Transit secrets engine with Ed25519.

    Self-hostable. Can back onto physical HSMs (PKCS#11 seal).

    Required env vars:
        HART_VAULT_ADDR: https://vault.example.com:8200
        HART_VAULT_TOKEN: auth token (or use AppRole/K8s auth)
        HART_VAULT_KEY_NAME: name of the transit key (type: ed25519)
        HART_VAULT_MOUNT: transit mount path (default: transit)
        HART_VAULT_CA_CERT: (optional) CA cert for TLS verification
    """

    def __init__(self):
        super().__init__()
        self._addr = os.environ.get('HART_VAULT_ADDR', '')
        self._token = os.environ.get('HART_VAULT_TOKEN', '')
        self._key_name = os.environ.get('HART_VAULT_KEY_NAME', 'hart-master')
        self._mount = os.environ.get('HART_VAULT_MOUNT', 'transit')
        self._ca_cert = os.environ.get('HART_VAULT_CA_CERT', '')
        self._client = None  # hvac client, created lazily

    @staticmethod
    def _decode_signature_b64(text: str) -> bytes:
        """Decode a signature given in standard or URL-safe base64.

        The sign request asks for 'jws' marshaling, whose output uses the
        URL-safe alphabet without padding. A plain ``b64decode`` silently
        drops '-' and '_' and would return corrupted bytes, so the text is
        normalised to the standard alphabet and re-padded first. Standard,
        padded input passes through unchanged.
        """
        import base64
        normalized = text.strip().replace('-', '+').replace('_', '/')
        normalized += '=' * (-len(normalized) % 4)
        return base64.b64decode(normalized)

    def _get_client(self):
        """Return the cached hvac client, creating it on first use."""
        if self._client is None:
            import hvac  # type: ignore
            kwargs = {'url': self._addr, 'token': self._token}
            if self._ca_cert:
                kwargs['verify'] = self._ca_cert
            self._client = hvac.Client(**kwargs)
        return self._client

    def sign(self, payload_bytes: bytes) -> bytes:
        """Sign ``payload_bytes`` with the transit key and return raw signature bytes."""
        import base64
        client = self._get_client()

        # Vault Transit expects base64-encoded input
        b64_input = base64.b64encode(payload_bytes).decode()

        response = client.secrets.transit.sign_data(
            name=self._key_name,
            hash_input=b64_input,
            marshaling_algorithm='jws',
            mount_point=self._mount,
        )

        # Response signature is vault:v1:base64signature
        sig_str = response['data']['signature']
        # Strip vault:vN: prefix
        sig_b64 = sig_str.split(':')[-1]
        return self._decode_signature_b64(sig_b64)

    def get_public_key_hex(self) -> str:
        """Return the newest key version's public key as hex, or '' if none is listed."""
        import base64
        client = self._get_client()
        response = client.secrets.transit.read_key(
            name=self._key_name,
            mount_point=self._mount,
        )
        # Get latest version's public key (version ids are numeric strings)
        keys = response['data']['keys']
        latest = str(max(int(k) for k in keys))
        pub_b64 = keys[latest].get('public_key', '')
        if not pub_b64:
            return ''
        # PEM or base64 raw depending on Vault version
        if 'BEGIN' in pub_b64:
            from cryptography.hazmat.primitives.serialization import load_pem_public_key, Encoding, PublicFormat
            key = load_pem_public_key(pub_b64.encode())
            return key.public_bytes(Encoding.Raw, PublicFormat.Raw).hex()
        return base64.b64decode(pub_b64).hex()

    def is_available(self) -> bool:
        """Return True when address and token are set and the client reports it is authenticated."""
        if not self._addr or not self._token:
            return False
        try:
            client = self._get_client()
            return client.is_authenticated()
        except Exception:
            return False

    def get_provider_name(self) -> str:
        """Return the provider label used in audit logs."""
        return 'hashicorp_vault'
360# ═══════════════════════════════════════════════════════════════
361# Environment Variable Fallback (DEV ONLY)
362# ═══════════════════════════════════════════════════════════════
class EnvVarFallbackProvider(HSMProvider):
    """DEVELOPMENT ONLY. Loads private key from environment variable.

    ⚠️ This is NOT hardware-protected. The private key exists in process memory.
    ⚠️ Only for local development and testing.
    ⚠️ In production, use a real HSM provider.
    """

    def __init__(self):
        super().__init__()
        self._warned = False  # flips to True after the first warning is emitted

    def _warn_once(self):
        """Emit the not-hardware-protected warning on stderr and the logger, once per instance."""
        if self._warned:
            return
        self._warned = True
        import sys
        rule = "=" * 70
        msg = "\n".join([
            "",
            rule,
            " WARNING: Master key loaded from environment variable.",
            " This is NOT hardware-protected. Use an HSM in production.",
            " Set HART_GCP_KMS_KEY_PATH, HART_AZURE_VAULT_URL, or",
            " HART_VAULT_ADDR to enable HSM protection.",
            rule,
            "",
        ])
        print(msg, file=sys.stderr)
        logger.warning("Master key signing via env var fallback - NOT HSM-protected")

    def sign(self, payload_bytes: bytes) -> bytes:
        """Sign with the hex-encoded key read from HEVOLVE_MASTER_PRIVATE_KEY.

        Raises:
            HSMSigningError: If the variable is unset or empty, or if
                loading the key or signing fails.
        """
        self._warn_once()
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
        key_hex = os.environ.get('HEVOLVE_MASTER_PRIVATE_KEY', '')
        if not key_hex:
            raise HSMSigningError(
                'No HSM configured and HEVOLVE_MASTER_PRIVATE_KEY not set. '
                'Cannot sign without master key.')
        try:
            private_key = Ed25519PrivateKey.from_private_bytes(bytes.fromhex(key_hex))
            return private_key.sign(payload_bytes)
        except Exception as e:
            raise HSMSigningError(f'Env var signing failed: {e}')

    def get_public_key_hex(self) -> str:
        """Derive the public key from the env-var private key; '' when it is unset."""
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
        from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
        key_hex = os.environ.get('HEVOLVE_MASTER_PRIVATE_KEY', '')
        if not key_hex:
            return ''
        private_key = Ed25519PrivateKey.from_private_bytes(bytes.fromhex(key_hex))
        public_raw = private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
        return public_raw.hex()

    def is_available(self) -> bool:
        """Return True when HEVOLVE_MASTER_PRIVATE_KEY is set to a non-empty value."""
        return bool(os.environ.get('HEVOLVE_MASTER_PRIVATE_KEY', ''))

    def get_provider_name(self) -> str:
        """Return the provider label used in audit logs."""
        return 'env_var_fallback_DEV_ONLY'
421# ═══════════════════════════════════════════════════════════════
422# Provider Registry & Singleton
423# ═══════════════════════════════════════════════════════════════
# Priority order: real HSMs first, env var fallback last.
# get_hsm_provider() walks this list from the top and activates the first
# backend that reports itself available and is not rejected by the
# trust-anchor public-key check.
_PROVIDER_CLASSES = [
    GoogleCloudKMSProvider,
    AzureKeyVaultProvider,
    VaultTransitProvider,
    EnvVarFallbackProvider,
]

# Process-wide cache of the selected provider; None until the first
# successful selection.
_active_provider: Optional[HSMProvider] = None
# Held while a provider is being selected so selection runs in one thread
# at a time.
_provider_lock = threading.Lock()
def get_hsm_provider() -> HSMProvider:
    """Get the active HSM provider. Auto-selects based on available config.

    Priority: GCP KMS > Azure Key Vault > HashiCorp Vault > env var fallback.
    The first provider that is available and whose public key matches
    ``MASTER_PUBLIC_KEY_HEX`` is cached and returned on every later call.
    A provider that raises during probing is skipped.

    Raises:
        HSMUnavailableError: If no provider could be activated.
    """
    global _active_provider
    # Fast path: a provider was already selected.
    if _active_provider is not None:
        return _active_provider

    with _provider_lock:
        # Another thread may have finished selection while we waited.
        if _active_provider is not None:
            return _active_provider

        for cls in _PROVIDER_CLASSES:
            try:
                provider = cls()
                if not provider.is_available():
                    continue

                # Verify public key matches trust anchor
                from security.master_key import MASTER_PUBLIC_KEY_HEX
                hsm_pub = provider.get_public_key_hex()
                if not hsm_pub:
                    # NOTE(review): an empty public key skips the trust-anchor
                    # check (existing fail-open behaviour). It is now logged
                    # so it is visible; consider rejecting hardware backends
                    # that cannot report a key.
                    logger.warning(
                        f"HSM provider {provider.get_provider_name()}: "
                        f"no public key returned; trust anchor check skipped")
                elif hsm_pub.lower() != MASTER_PUBLIC_KEY_HEX.lower():
                    # Hex digits compare case-insensitively, so an upper-case
                    # anchor is not a mismatch.
                    logger.error(
                        f"HSM provider {provider.get_provider_name()}: "
                        f"public key mismatch! HSM={hsm_pub[:16]}... "
                        f"expected={MASTER_PUBLIC_KEY_HEX[:16]}...")
                    continue  # Wrong key - try next provider

                _active_provider = provider
                logger.info(f"HSM provider active: {provider.get_provider_name()}")
                return provider
            except Exception as e:
                logger.debug(f"HSM provider {cls.__name__} unavailable: {e}")

        raise HSMUnavailableError(
            'No HSM backend available. Configure one of: '
            'HART_GCP_KMS_KEY_PATH (Google), '
            'HART_AZURE_VAULT_URL (Azure), '
            'HART_VAULT_ADDR (HashiCorp Vault), '
            'or HEVOLVE_MASTER_PRIVATE_KEY (dev fallback).')
def is_hsm_available() -> bool:
    """Report whether a provider can be obtained, without raising.

    Returns:
        True if ``get_hsm_provider`` succeeds, False if it raises
        ``HSMUnavailableError``.
    """
    try:
        get_hsm_provider()
    except HSMUnavailableError:
        return False
    else:
        return True
def get_hsm_status() -> Dict:
    """Summarise the active provider for dashboards and health checks.

    Returns:
        A dict with keys ``available``, ``provider``, ``hardware_backed``,
        ``sign_count`` and ``audit_entries``. When no provider can be
        obtained, the dict reports unavailable with zeroed counters.
    """
    try:
        active = get_hsm_provider()
        is_dev_fallback = isinstance(active, EnvVarFallbackProvider)
        return dict(
            available=True,
            provider=active.get_provider_name(),
            # Every backend except the env-var fallback counts as hardware.
            hardware_backed=not is_dev_fallback,
            sign_count=active._sign_count,
            audit_entries=len(active._audit_log),
        )
    except HSMUnavailableError:
        return dict(
            available=False,
            provider=None,
            hardware_backed=False,
            sign_count=0,
            audit_entries=0,
        )
def hsm_sign_payload(payload: dict) -> str:
    """Sign a JSON payload with the active provider and return the hex signature.

    Delegates to the provider's ``sign_json_payload``. With a real HSM
    backend the private key NEVER leaves the hardware.
    """
    return get_hsm_provider().sign_json_payload(payload)