Coverage for security / crypto.py: 95.2%

83 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Encryption for Data at Rest and Agent-to-Agent E2E Communication 

3Uses Fernet symmetric encryption (AES-128-CBC with HMAC-SHA256). 

4""" 

5 

6import os 

7import json 

8import logging 

9from typing import Optional, Union 

10 

11from cryptography.fernet import Fernet, InvalidToken 

12 

# Logger shared by every helper in this module.
logger = logging.getLogger('hevolve_security')

# Fernet-encrypted data always starts with 'gAAAAA'; decrypt_data() uses this
# prefix to tell encrypted input apart from plaintext.
_FERNET_PREFIX = b'gAAAAA'

17 

18 

def _get_data_key() -> Optional[Fernet]:
    """Build the Fernet cipher used for data-at-rest encryption.

    The key is read from the ``HEVOLVE_DATA_KEY`` environment variable. When
    that variable is unset or empty, ``security.secrets_manager.get_secret``
    is tried as a best-effort fallback.

    Returns:
        A ``Fernet`` instance, or ``None`` when no key is configured or the
        configured value is rejected by ``Fernet``.
    """
    key = os.environ.get('HEVOLVE_DATA_KEY')
    if not key:
        try:
            # Imported at call time; any failure here (including a missing
            # module) is tolerated and treated as "no key available".
            from security.secrets_manager import get_secret
            key = get_secret('HEVOLVE_DATA_KEY')
        except Exception:
            # Still best-effort, but leave a trace instead of swallowing the
            # error silently: without a key, callers fall back to plaintext.
            logger.debug(
                "HEVOLVE_DATA_KEY lookup via secrets manager failed",
                exc_info=True,
            )

    if not key:
        return None

    try:
        # Fernet is handed bytes; a str key is encoded first.
        return Fernet(key.encode() if isinstance(key, str) else key)
    except Exception:
        logger.error("Invalid HEVOLVE_DATA_KEY format. Must be a Fernet key.")
        return None

37 

38 

def generate_data_key() -> str:
    """Create a fresh Fernet key for data encryption and return it as text.

    The caller is responsible for storing the returned key securely.
    """
    raw_key = Fernet.generate_key()
    return raw_key.decode()

42 

43 

def encrypt_data(plaintext: Union[str, bytes]) -> bytes:
    """
    Encrypt data with the configured Fernet data key.

    A ``str`` input is encoded to bytes first. When no data key is
    configured, the (encoded) input is returned unencrypted.
    """
    cipher = _get_data_key()
    payload = plaintext.encode() if isinstance(plaintext, str) else plaintext

    # No key configured: hand the data back as-is.
    return payload if cipher is None else cipher.encrypt(payload)

59 

60 

def decrypt_data(ciphertext: Union[str, bytes]) -> bytes:
    """
    Decrypt Fernet-encrypted data, passing plaintext through untouched.

    Input that does not start with the Fernet prefix is treated as plaintext
    and returned as bytes. Input that looks encrypted is returned unchanged
    (after logging) when no data key is configured or decryption fails.
    """
    token = ciphertext.encode() if isinstance(ciphertext, str) else ciphertext

    # Only data carrying the Fernet prefix is worth attempting to decrypt.
    if token.startswith(_FERNET_PREFIX):
        cipher = _get_data_key()
        if cipher is not None:
            try:
                return cipher.decrypt(token)
            except InvalidToken:
                logger.error("Failed to decrypt data - wrong key or corrupted data")
        else:
            logger.warning("Encrypted data found but HEVOLVE_DATA_KEY not set")

    return token

84 

85 

def encrypt_json_file(filepath: str, data: dict):
    """
    Serialize ``data`` as indented JSON and write it to ``filepath``.

    The serialized bytes go through ``encrypt_data``, so the file ends up in
    plaintext when no encryption key is configured.
    """
    serialized = json.dumps(data, indent=2)
    blob = encrypt_data(serialized.encode())

    with open(filepath, 'wb') as out:
        out.write(blob)

96 

97 

def decrypt_json_file(filepath: str) -> Optional[dict]:
    """
    Read a JSON file, decrypting it first when it is Fernet-encrypted.

    The raw bytes are passed through ``decrypt_data`` (which returns
    non-encrypted input unchanged) and then parsed as JSON. If that parse
    fails, the file is re-read in text mode as a last attempt.

    Returns:
        The parsed JSON content, or ``None`` when the file does not exist or
        cannot be parsed.
    """
    if not os.path.exists(filepath):
        return None

    try:
        with open(filepath, 'rb') as f:
            raw = f.read()
    except FileNotFoundError:
        # The file vanished between the existence check and the open; treat
        # it the same as a file that was never there.
        return None

    decrypted = decrypt_data(raw)

    try:
        return json.loads(decrypted.decode())
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Try reading as regular text file (legacy plaintext JSON)
        try:
            with open(filepath, 'r') as f:
                return json.load(f)
        except Exception:
            logger.error("Failed to read JSON file: %s", filepath)
            return None

121 

122 

class A2ACrypto:
    """
    End-to-end encryption for agent-to-agent communication.

    Each instance holds one Fernet cipher and the key it was built from.
    """

    def __init__(self, session_key: Optional[bytes] = None):
        """Create a cipher from ``session_key`` or from a newly generated key.

        Args:
            session_key: Fernet key to reuse. When omitted (or otherwise
                falsy), a new key is generated for this instance.
        """
        if session_key:
            self._fernet = Fernet(session_key)
            self._key = session_key
        else:
            self._key = Fernet.generate_key()
            self._fernet = Fernet(self._key)

    @property
    def session_key(self) -> bytes:
        """The session key (share with peer agent for decryption)."""
        return self._key

    def encrypt_message(self, plaintext: str) -> str:
        """Encrypt a text message and return the token as text."""
        return self._fernet.encrypt(plaintext.encode()).decode()

    def decrypt_message(self, ciphertext: str) -> str:
        """Decrypt a received token back into the original text.

        Raises:
            ValueError: If the cipher rejects the token (``InvalidToken``).
        """
        try:
            return self._fernet.decrypt(ciphertext.encode()).decode()
        except InvalidToken as exc:
            logger.error("Failed to decrypt A2A message - invalid key or corrupted")
            # Chain explicitly so the original failure stays attached as the cause.
            raise ValueError("Decryption failed: invalid key or corrupted message") from exc

    def encrypt_payload(self, payload: dict) -> str:
        """Serialize ``payload`` to JSON and encrypt it."""
        plaintext = json.dumps(payload)
        return self.encrypt_message(plaintext)

    def decrypt_payload(self, ciphertext: str) -> dict:
        """Decrypt a token and parse the result as JSON."""
        plaintext = self.decrypt_message(ciphertext)
        return json.loads(plaintext)