Coverage for security / crypto.py: 95.2%
83 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Encryption for Data at Rest and Agent-to-Agent E2E Communication
3Uses Fernet symmetric encryption (AES-128-CBC with HMAC-SHA256).
4"""
6import os
7import json
8import logging
9from typing import Optional, Union
11from cryptography.fernet import Fernet, InvalidToken
13logger = logging.getLogger('hevolve_security')
15# Fernet-encrypted data always starts with 'gAAAAA'
16_FERNET_PREFIX = b'gAAAAA'
def _get_data_key() -> Optional[Fernet]:
    """Build the Fernet cipher used for data-at-rest encryption.

    The key is read from the ``HEVOLVE_DATA_KEY`` environment variable. When
    that is unset or empty, ``security.secrets_manager.get_secret`` is tried
    as a best-effort fallback.

    Returns:
        A ``Fernet`` instance, or ``None`` when no key is configured or the
        configured value is rejected by ``Fernet``.
    """
    key = os.environ.get('HEVOLVE_DATA_KEY')
    if not key:
        try:
            from security.secrets_manager import get_secret
            key = get_secret('HEVOLVE_DATA_KEY')
        except Exception as exc:
            # Best effort: a failing or absent secrets manager must not break
            # callers, but it should not vanish without a trace either. Only
            # the exception type is logged so no secret material can leak.
            logger.debug(
                "Secrets manager lookup for HEVOLVE_DATA_KEY failed: %s",
                type(exc).__name__,
            )

    if not key:
        return None

    try:
        # Fernet is given bytes here, so a str key is encoded first.
        return Fernet(key.encode() if isinstance(key, str) else key)
    except Exception:
        logger.error("Invalid HEVOLVE_DATA_KEY format. Must be a Fernet key.")
        return None
def generate_data_key() -> str:
    """Create a fresh Fernet key and return it as text.

    The caller is responsible for storing the returned key securely.
    """
    raw_key = Fernet.generate_key()
    return raw_key.decode()
def encrypt_data(plaintext: Union[str, bytes]) -> bytes:
    """
    Encrypt ``plaintext`` with the configured data key.

    A ``str`` input is encoded to bytes first. When no data key is
    configured, the (encoded) input is returned unencrypted.
    """
    cipher = _get_data_key()

    # Normalise text to bytes once; non-str input is passed through untouched.
    payload = plaintext.encode() if isinstance(plaintext, str) else plaintext

    if cipher is None:
        return payload
    return cipher.encrypt(payload)
def decrypt_data(ciphertext: Union[str, bytes]) -> bytes:
    """
    Decrypt data that may or may not be Fernet-encrypted.

    Input that does not start with the Fernet prefix is treated as plaintext
    and returned as bytes. Prefixed input is decrypted with the configured
    data key; if the key is missing or decryption fails, the problem is
    logged and the input bytes are returned unchanged.
    """
    token = ciphertext.encode() if isinstance(ciphertext, str) else ciphertext

    if token.startswith(_FERNET_PREFIX):
        cipher = _get_data_key()
        if cipher is not None:
            try:
                return cipher.decrypt(token)
            except InvalidToken:
                logger.error("Failed to decrypt data - wrong key or corrupted data")
        else:
            logger.warning("Encrypted data found but HEVOLVE_DATA_KEY not set")

    # Plaintext input, missing key, or failed decryption: hand back the bytes.
    return token
def encrypt_json_file(filepath: str, data: dict):
    """
    Serialize ``data`` as indented JSON and write it to ``filepath``.

    The bytes are passed through ``encrypt_data``, so the file is written
    as plaintext JSON when no encryption key is configured.
    """
    serialized = json.dumps(data, indent=2)
    blob = encrypt_data(serialized.encode())

    with open(filepath, 'wb') as out:
        out.write(blob)
def decrypt_json_file(filepath: str) -> Optional[dict]:
    """
    Read a JSON file, decrypting it first when it is Fernet-encrypted.

    The raw bytes go through ``decrypt_data``, which passes plaintext
    through unchanged, so encrypted and plaintext files are both accepted.

    Returns:
        The parsed JSON content, or ``None`` when the file does not exist or
        its content cannot be parsed as JSON.
    """
    if not os.path.exists(filepath):
        return None

    try:
        with open(filepath, 'rb') as f:
            raw = f.read()
    except FileNotFoundError:
        # The file disappeared between the existence check and the open;
        # treat that the same as a file that was never there.
        return None

    decrypted = decrypt_data(raw)

    try:
        return json.loads(decrypted.decode())
    except (json.JSONDecodeError, UnicodeDecodeError):
        pass

    # Legacy plaintext JSON: retry in text mode, which decodes with the
    # platform default encoding instead of UTF-8.
    try:
        with open(filepath, 'r') as f:
            return json.load(f)
    except Exception:
        logger.error(f"Failed to read JSON file: {filepath}")
        return None
class A2ACrypto:
    """
    End-to-end encryption for agent-to-agent communication.
    Each session gets a unique symmetric key.

    Pass an existing ``session_key`` to share a session with a peer, or omit
    it to have a new Fernet key generated.
    """

    def __init__(self, session_key: Optional[bytes] = None):
        """Create the session cipher.

        Args:
            session_key: Fernet key to use. When omitted (or empty), a new
                key is generated.
        """
        if not session_key:
            session_key = Fernet.generate_key()
        self._fernet = Fernet(session_key)
        self._key = session_key

    @property
    def session_key(self) -> bytes:
        """The session key (share with peer agent for decryption)."""
        return self._key

    def encrypt_message(self, plaintext: str) -> str:
        """Encrypt a text message and return the Fernet token as text."""
        token = self._fernet.encrypt(plaintext.encode())
        return token.decode()

    def decrypt_message(self, ciphertext: str) -> str:
        """Decrypt a received token back into the original text.

        Raises:
            ValueError: If the token cannot be decrypted with this session's
                key. The underlying ``InvalidToken`` is attached as the cause.
        """
        try:
            return self._fernet.decrypt(ciphertext.encode()).decode()
        except InvalidToken as exc:
            logger.error("Failed to decrypt A2A message - invalid key or corrupted")
            # Chain explicitly so the cryptography error stays available to
            # callers inspecting ``__cause__``.
            raise ValueError(
                "Decryption failed: invalid key or corrupted message"
            ) from exc

    def encrypt_payload(self, payload: dict) -> str:
        """Serialize ``payload`` as JSON and encrypt it."""
        return self.encrypt_message(json.dumps(payload))

    def decrypt_payload(self, ciphertext: str) -> dict:
        """Decrypt a token and parse the result as JSON.

        Raises:
            ValueError: If decryption fails (see ``decrypt_message``) or the
                decrypted text is not valid JSON (``json.JSONDecodeError`` is
                a ``ValueError`` subclass).
        """
        return json.loads(self.decrypt_message(ciphertext))