# Coverage for security/secrets_manager.py: 72.4% (105 statements)
# Report generated by coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Encrypted Secrets Manager
Replaces plaintext config.json with Fernet-encrypted vault.
Master key derived from HEVOLVE_MASTER_KEY env var via PBKDF2.

Usage:
    from security.secrets_manager import SecretsManager
    sm = SecretsManager.get_instance()
    api_key = sm.get_secret('OPENAI_API_KEY')

Migration:
    python -m security.secrets_manager migrate
"""
import base64
import json
import logging
import os
import threading
from typing import Optional

from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
# Module-wide logger shared by the security package's log output.
logger = logging.getLogger('hevolve_security')

# All three files live one directory above the directory containing this
# module. The paths are left relative here and made absolute at the point
# of use.
_VAULT_PATH = os.path.join(os.path.dirname(__file__), '..', 'secrets.enc')
_SALT_PATH = os.path.join(os.path.dirname(__file__), '..', 'secrets.salt')
_CONFIG_PATH = os.path.join(os.path.dirname(__file__), '..', 'config.json')

# Known secret keys that should be loaded from the vault
SECRET_KEYS = [
    'OPENAI_API_KEY',
    'GROQ_API_KEY',
    'LANGCHAIN_API_KEY',
    'GOOGLE_CSE_ID',
    'GOOGLE_API_KEY',
    'NEWS_API_KEY',
    'SERPAPI_API_KEY',
    'ZEP_API_KEY',
    'SOCIAL_SECRET_KEY',
    'HEVOLVE_API_KEY',
    'SOCIAL_DB_KEY',
    'REDIS_URL',
    'DATABASE_URL',
]
class SecretsManager:
    """Singleton giving access to secrets from the environment or the vault.

    Secrets are looked up in environment variables first and then in a
    Fernet-encrypted JSON vault (``secrets.enc``). The Fernet key is derived
    with PBKDF2 from the ``HEVOLVE_MASTER_KEY`` environment variable; when
    that variable is not set the vault is unavailable and only environment
    variables are consulted.

    Creation and reset of the shared instance are serialized by a lock.
    Reads and writes of the in-memory cache are not otherwise synchronized.
    """

    _instance = None
    # Serializes creation/reset of the singleton so that two threads racing
    # through get_instance() cannot each build (and load) their own manager.
    _instance_lock = threading.Lock()

    # config.json keys that migrate_from_config() always looks for; any
    # other string-valued key found in the file is migrated as well.
    _LEGACY_CONFIG_KEYS = (
        'OPENAI_API_KEY',
        'GROQ_API_KEY',
        'LANGCHAIN_API_KEY',
        'GOOGLE_CSE_ID',
        'GOOGLE_API_KEY',
        'NEWS_API_KEY',
        'SERPAPI_API_KEY',
    )

    def __init__(self):
        """Set up the cipher (if a master key is configured) and load the vault."""
        self._cache: dict = {}
        self._fernet: Optional[Fernet] = None
        self._init_encryption()
        self._load_vault()

    @classmethod
    def get_instance(cls) -> 'SecretsManager':
        """Return the shared instance, creating it on first use."""
        instance = cls._instance
        if instance is None:
            with cls._instance_lock:
                # Re-check under the lock: another thread may have created
                # the instance while this one was waiting.
                instance = cls._instance
                if instance is None:
                    instance = cls._instance = cls()
        return instance

    @classmethod
    def reset(cls):
        """Reset singleton (for testing)."""
        with cls._instance_lock:
            cls._instance = None

    def _derive_key(self, master_key: str, salt: bytes) -> bytes:
        """Derive Fernet key from master key using PBKDF2.

        Args:
            master_key: Passphrase taken from ``HEVOLVE_MASTER_KEY``.
            salt: Salt bytes read from (or written to) the salt file.

        Returns:
            The 32-byte derived key, URL-safe base64 encoded as ``Fernet``
            expects.
        """
        # The parameters below determine the derived key, so changing any of
        # them would make an existing vault undecryptable.
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=480000,
        )
        return base64.urlsafe_b64encode(kdf.derive(master_key.encode()))

    def _init_encryption(self):
        """Initialize Fernet cipher from master key.

        Without ``HEVOLVE_MASTER_KEY`` a warning is logged and the cipher is
        left unset. Otherwise the salt is read from the salt file, or a new
        16-byte random salt is generated and written there if the file does
        not exist yet.
        """
        master_key = os.environ.get('HEVOLVE_MASTER_KEY')
        if not master_key:
            logger.warning(
                "HEVOLVE_MASTER_KEY not set. Secrets vault unavailable. "
                "Falling back to environment variables only."
            )
            return

        salt_path = os.path.abspath(_SALT_PATH)
        # Open first and handle absence, rather than testing for existence
        # and then opening, so the file cannot vanish between the two steps.
        try:
            with open(salt_path, 'rb') as f:
                salt = f.read()
        except FileNotFoundError:
            salt = os.urandom(16)
            with open(salt_path, 'wb') as f:
                f.write(salt)

        key = self._derive_key(master_key, salt)
        self._fernet = Fernet(key)

    def _load_vault(self):
        """Load and decrypt the vault file.

        Does nothing when no cipher is available or the vault file does not
        exist. Failures are logged rather than raised, leaving the cache as
        it was, so that lookups can still fall back to the environment.
        """
        vault_path = os.path.abspath(_VAULT_PATH)
        if self._fernet is None or not os.path.exists(vault_path):
            return

        try:
            with open(vault_path, 'rb') as f:
                encrypted = f.read()
            decrypted = self._fernet.decrypt(encrypted)
            loaded = json.loads(decrypted.decode())
        except InvalidToken:
            logger.error(
                "Failed to decrypt secrets vault. "
                "Check HEVOLVE_MASTER_KEY is correct."
            )
            return
        except Exception as e:
            # Deliberately broad: an unreadable vault must not prevent the
            # manager from being constructed.
            logger.error("Failed to load secrets vault: %s", e)
            return

        # The cache is used as a mapping everywhere else; reject any other
        # JSON value instead of failing later inside get_secret().
        if not isinstance(loaded, dict):
            logger.error(
                "Failed to load secrets vault: expected a JSON object, got %s",
                type(loaded).__name__,
            )
            return

        self._cache = loaded
        logger.info("Secrets vault loaded successfully.")

    def get_secret(self, name: str, default: str = '') -> str:
        """
        Get a secret value. Priority:
        1. Environment variable
        2. Encrypted vault
        3. Default value

        An environment variable that is set to the empty string is treated
        as unset.
        """
        env_val = os.environ.get(name)
        if env_val:
            return env_val
        return self._cache.get(name, default)

    def set_secret(self, name: str, value: str):
        """Set a secret in the vault (persists to disk).

        Raises:
            RuntimeError: If no cipher is available because
                ``HEVOLVE_MASTER_KEY`` was not set. The value is still
                placed in the in-memory cache before the save is attempted.
        """
        self._cache[name] = value
        self._save_vault()

    def _save_vault(self):
        """Encrypt and write vault to disk.

        The ciphertext is written to a temporary file next to the vault and
        then moved into place, so an interrupted write cannot leave a
        truncated vault behind.

        Raises:
            RuntimeError: If no cipher is available because
                ``HEVOLVE_MASTER_KEY`` was not set.
        """
        if self._fernet is None:
            raise RuntimeError("Cannot save vault: HEVOLVE_MASTER_KEY not set")
        vault_path = os.path.abspath(_VAULT_PATH)
        plaintext = json.dumps(self._cache, indent=2).encode()
        encrypted = self._fernet.encrypt(plaintext)

        tmp_path = vault_path + '.tmp'
        # Create the temporary file readable/writable by the owner only.
        fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        try:
            with os.fdopen(fd, 'wb') as f:
                f.write(encrypted)
            os.replace(tmp_path, vault_path)
        except Exception:
            # Best-effort cleanup of the partial temporary file; the
            # original error is what the caller needs to see.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
        logger.info("Secrets vault saved.")

    def has_secret(self, name: str) -> bool:
        """Check if a secret exists (env or vault).

        A non-empty environment variable counts, as does any key present in
        the vault cache regardless of its value.
        """
        return bool(os.environ.get(name)) or name in self._cache

    @staticmethod
    def _secrets_from_config(config: dict) -> dict:
        """Select the entries of a parsed config.json that should be migrated.

        The legacy keys are taken first (when their value is truthy),
        followed by every other key whose value is a non-empty string.
        """
        legacy = SecretsManager._LEGACY_CONFIG_KEYS
        selected = {name: config[name] for name in legacy if config.get(name)}
        for key, value in config.items():
            if key not in legacy and isinstance(value, str) and value:
                selected[key] = value
        return selected

    @staticmethod
    def migrate_from_config():
        """
        One-time migration: read config.json, encrypt into secrets.enc.
        Run via: python -m security.secrets_manager migrate

        Prints its progress to stdout. Returns without changes when
        config.json is missing, is not a JSON object, or
        HEVOLVE_MASTER_KEY is not set. On success config.json is renamed to
        config.json.bak.
        """
        config_path = os.path.abspath(_CONFIG_PATH)
        if not os.path.exists(config_path):
            print(f"No config.json found at {config_path}")
            return

        master_key = os.environ.get('HEVOLVE_MASTER_KEY')
        if not master_key:
            print("ERROR: Set HEVOLVE_MASTER_KEY environment variable first.")
            print("  Example: export HEVOLVE_MASTER_KEY='your-strong-secret-key-here'")
            return

        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
        if not isinstance(config, dict):
            print(f"ERROR: {config_path} does not contain a JSON object.")
            return

        secrets = SecretsManager._secrets_from_config(config)

        sm = SecretsManager.get_instance()
        if secrets:
            # Update the cache in one go and encrypt/write the vault once,
            # rather than once per secret.
            sm._cache.update(secrets)
            sm._save_vault()

        # Backup original config. os.replace overwrites an existing backup
        # on every platform, so a re-run does not fail after the vault has
        # already been written.
        backup_path = config_path + '.bak'
        os.replace(config_path, backup_path)
        print("Migration complete:")
        print(f"  Encrypted {len(secrets)} secrets to secrets.enc")
        print(f"  Original config backed up to {backup_path}")
        print("  Add config.json and secrets.enc to .gitignore")
# Convenience function for backward compatibility
def get_secret(name: str, default: str = '') -> str:
    """Shorthand for SecretsManager.get_instance().get_secret().

    Args:
        name: Name of the secret to look up.
        default: Value returned when the secret is found neither in the
            environment nor in the vault.

    Returns:
        The value produced by the shared manager's ``get_secret``.
    """
    return SecretsManager.get_instance().get_secret(name, default)
if __name__ == '__main__':
    # Command-line entry point: `migrate` is the only sub-command handled;
    # anything else prints the usage line.
    import sys
    if len(sys.argv) > 1 and sys.argv[1] == 'migrate':
        SecretsManager.migrate_from_config()
    else:
        print("Usage: python -m security.secrets_manager migrate")