Coverage for security / secrets_manager.py: 72.4%

105 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

"""
Encrypted Secrets Manager
Replaces plaintext config.json with a Fernet-encrypted vault.
The master key is derived from the HEVOLVE_MASTER_KEY env var via PBKDF2.

Usage:
    from security.secrets_manager import SecretsManager
    sm = SecretsManager.get_instance()
    api_key = sm.get_secret('OPENAI_API_KEY')

Migration:
    python -m security.secrets_manager migrate
"""

14 

import base64
import json
import logging
import os
import threading
from typing import Optional

from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

24 

logger = logging.getLogger('hevolve_security')

# All vault artefacts live one directory above this module.
_PROJECT_ROOT = os.path.join(os.path.dirname(__file__), '..')

_VAULT_PATH = os.path.join(_PROJECT_ROOT, 'secrets.enc')    # encrypted vault payload
_SALT_PATH = os.path.join(_PROJECT_ROOT, 'secrets.salt')    # PBKDF2 salt
_CONFIG_PATH = os.path.join(_PROJECT_ROOT, 'config.json')   # legacy plaintext config

# Names of the secrets that are expected to be served from the vault.
SECRET_KEYS = [
    # LLM / tracing providers
    'OPENAI_API_KEY',
    'GROQ_API_KEY',
    'LANGCHAIN_API_KEY',
    # Search and news providers
    'GOOGLE_CSE_ID',
    'GOOGLE_API_KEY',
    'NEWS_API_KEY',
    'SERPAPI_API_KEY',
    # Memory, application and storage credentials
    'ZEP_API_KEY',
    'SOCIAL_SECRET_KEY',
    'HEVOLVE_API_KEY',
    'SOCIAL_DB_KEY',
    'REDIS_URL',
    'DATABASE_URL',
]

47 

48 

class SecretsManager:
    """Process-wide singleton giving access to the Fernet-encrypted vault.

    Lookups consult the process environment first and the decrypted vault
    second. Singleton creation, reset and vault writes are serialised with a
    class-level re-entrant lock; reads are plain dictionary lookups.
    """

    _instance: Optional['SecretsManager'] = None
    # Guards singleton creation/reset and vault writes.
    _lock = threading.RLock()

    def __init__(self):
        """Build the cipher (if a master key is set) and load the vault."""
        self._cache: dict = {}
        self._fernet: Optional[Fernet] = None
        self._init_encryption()
        self._load_vault()

    @classmethod
    def get_instance(cls) -> 'SecretsManager':
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock so concurrent first callers do not
                # each construct (and derive a key for) their own instance.
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    @classmethod
    def reset(cls):
        """Reset singleton (for testing)."""
        with cls._lock:
            cls._instance = None

    def _derive_key(self, master_key: str, salt: bytes) -> bytes:
        """Derive a urlsafe-base64 Fernet key from master_key using PBKDF2."""
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=480000,
        )
        return base64.urlsafe_b64encode(kdf.derive(master_key.encode()))

    def _init_encryption(self):
        """Initialize the Fernet cipher from HEVOLVE_MASTER_KEY.

        Leaves ``self._fernet`` as ``None`` (vault disabled) when the
        environment variable is missing or empty. The salt is read from the
        salt file, which is created with 16 random bytes on first use.
        """
        master_key = os.environ.get('HEVOLVE_MASTER_KEY')
        if not master_key:
            logger.warning(
                "HEVOLVE_MASTER_KEY not set. Secrets vault unavailable. "
                "Falling back to environment variables only."
            )
            return

        salt_path = os.path.abspath(_SALT_PATH)
        try:
            with open(salt_path, 'rb') as f:
                salt = f.read()
        except FileNotFoundError:
            salt = os.urandom(16)
            with open(salt_path, 'wb') as f:
                f.write(salt)

        key = self._derive_key(master_key, salt)
        self._fernet = Fernet(key)

    def _load_vault(self):
        """Load and decrypt the vault file into the in-memory cache.

        Best effort: any failure is logged and the cache is left unchanged.
        """
        if self._fernet is None:
            return
        vault_path = os.path.abspath(_VAULT_PATH)
        if not os.path.exists(vault_path):
            return

        try:
            with open(vault_path, 'rb') as f:
                encrypted = f.read()
            decrypted = self._fernet.decrypt(encrypted)
            loaded = json.loads(decrypted.decode())
            # get_secret/has_secret rely on dict semantics; reject any other
            # JSON payload here instead of failing later on every lookup.
            if not isinstance(loaded, dict):
                raise ValueError("vault payload is not a JSON object")
            self._cache = loaded
            logger.info("Secrets vault loaded successfully.")
        except InvalidToken:
            logger.error(
                "Failed to decrypt secrets vault. "
                "Check HEVOLVE_MASTER_KEY is correct."
            )
        except Exception as e:
            logger.error(f"Failed to load secrets vault: {e}")

    def get_secret(self, name: str, default: str = '') -> str:
        """
        Get a secret value. Priority:
        1. Environment variable (non-empty)
        2. Encrypted vault
        3. Default value
        """
        env_val = os.environ.get(name)
        if env_val:
            return env_val
        return self._cache.get(name, default)

    def set_secret(self, name: str, value: str):
        """Set a secret in the vault and persist it to disk.

        Raises:
            RuntimeError: if the vault is unavailable (no master key).
            OSError: if the vault file cannot be written.

        If persisting fails the in-memory cache is restored, so a secret is
        never reported as present without having been written to the vault.
        """
        with self._lock:
            had_previous = name in self._cache
            previous = self._cache.get(name)
            self._cache[name] = value
            try:
                self._save_vault()
            except Exception:
                if had_previous:
                    self._cache[name] = previous
                else:
                    self._cache.pop(name, None)
                raise

    def _save_vault(self):
        """Encrypt the cache and atomically write it to the vault file.

        Raises:
            RuntimeError: if no cipher is available (master key not set).
            OSError: if the temporary file cannot be written or swapped in.
        """
        if self._fernet is None:
            raise RuntimeError("Cannot save vault: HEVOLVE_MASTER_KEY not set")
        vault_path = os.path.abspath(_VAULT_PATH)
        plaintext = json.dumps(self._cache, indent=2).encode()
        encrypted = self._fernet.encrypt(plaintext)

        # Write to a sibling temp file and swap it in with os.replace so an
        # interrupted write cannot leave a truncated vault behind. The file is
        # created owner-read/write only (ignored on platforms without modes).
        tmp_path = vault_path + '.tmp'
        flags = (os.O_WRONLY | os.O_CREAT | os.O_TRUNC
                 | getattr(os, 'O_BINARY', 0))
        try:
            fd = os.open(tmp_path, flags, 0o600)
            with os.fdopen(fd, 'wb') as f:
                f.write(encrypted)
            os.replace(tmp_path, vault_path)
        except OSError:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
        logger.info("Secrets vault saved.")

    def has_secret(self, name: str) -> bool:
        """Check if a secret exists (non-empty env var, or vault entry)."""
        return bool(os.environ.get(name)) or name in self._cache

    @staticmethod
    def migrate_from_config():
        """
        One-time migration: read config.json, encrypt into secrets.enc.
        Run via: python -m security.secrets_manager migrate

        Non-empty values of the known provider keys, plus every other
        string-valued top-level entry, are stored in the vault. The original
        config is then moved to ``config.json.bak``.
        """
        config_path = os.path.abspath(_CONFIG_PATH)
        if not os.path.exists(config_path):
            print(f"No config.json found at {config_path}")
            return

        master_key = os.environ.get('HEVOLVE_MASTER_KEY')
        if not master_key:
            print("ERROR: Set HEVOLVE_MASTER_KEY environment variable first.")
            print(" Example: export HEVOLVE_MASTER_KEY='your-strong-secret-key-here'")
            return

        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)

        if not isinstance(config, dict):
            print(f"ERROR: {config_path} does not contain a JSON object.")
            return

        # Map config.json keys to standard secret names
        legacy_names = (
            'OPENAI_API_KEY',
            'GROQ_API_KEY',
            'LANGCHAIN_API_KEY',
            'GOOGLE_CSE_ID',
            'GOOGLE_API_KEY',
            'NEWS_API_KEY',
            'SERPAPI_API_KEY',
        )
        key_mapping = {name: config.get(name, '') for name in legacy_names}

        # Also capture any other string-valued keys not in the mapping
        for k, v in config.items():
            if isinstance(v, str) and k not in key_mapping:
                key_mapping[k] = v

        sm = SecretsManager.get_instance()
        stored_count = 0
        for name, value in key_mapping.items():
            if value:
                sm.set_secret(name, value)
                stored_count += 1

        # Backup original config; os.replace overwrites an existing backup on
        # every platform, where os.rename would fail on Windows.
        backup_path = config_path + '.bak'
        os.replace(config_path, backup_path)
        print("Migration complete:")
        # Report only the secrets actually written (empty values are skipped).
        print(f" Encrypted {stored_count} secrets to secrets.enc")
        print(f" Original config backed up to {backup_path}")
        print(" Add config.json and secrets.enc to .gitignore")

203 

204 

# Module-level shortcut kept so existing callers can keep using a plain function.
def get_secret(name: str, default: str = '') -> str:
    """Look up ``name`` through the shared SecretsManager instance.

    Equivalent to ``SecretsManager.get_instance().get_secret(name, default)``.
    """
    manager = SecretsManager.get_instance()
    return manager.get_secret(name, default)

209 

210 

if __name__ == '__main__':
    import sys

    # Only one sub-command is supported: `migrate`.
    cli_args = sys.argv[1:]
    if cli_args and cli_args[0] == 'migrate':
        SecretsManager.migrate_from_config()
    else:
        print("Usage: python -m security.secrets_manager migrate")