Coverage for security/immutable_audit_log.py: 92.6%
95 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Immutable Audit Log — Tamper-Evident Event Chain

Every security-relevant event (state changes, goal dispatches, tool calls,
auth events) is recorded with a hash chain. Each entry's hash depends on
the previous entry's hash, forming a tamper-evident chain.

If any entry is modified or deleted, verify_chain() detects the break.

Usage:
    from security.immutable_audit_log import get_audit_log
    audit = get_audit_log()
    entry_id, entry_hash = audit.log_event('state_change', actor_id='user_1', action='completed task 5')
    ok, reason = audit.verify_chain()
"""
import hashlib
import json
import logging
import threading
import time
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
25logger = logging.getLogger('hevolve_security')
27# Sensitive keys that should be redacted in detail_json
28_SENSITIVE_KEYS = frozenset({
29 'password', 'token', 'api_key', 'secret', 'credential',
30 'private_key', 'ssn', 'credit_card', 'card_number',
31})
34def _redact_sensitive(detail: Optional[Dict]) -> Optional[str]:
35 """Redact sensitive fields before storing in audit log."""
36 if detail is None:
37 return None
38 safe = {}
39 for k, v in detail.items():
40 if any(s in k.lower() for s in _SENSITIVE_KEYS):
41 safe[k] = '[REDACTED]'
42 else:
43 safe[k] = v
44 return json.dumps(safe, sort_keys=True, default=str)
47def _compute_hash(prev_hash: str, event_type: str, actor_id: str,
48 action: str, timestamp: str, detail_json: Optional[str]) -> str:
49 """Compute SHA-256 hash of entry fields chained to previous hash."""
50 payload = f"{prev_hash}|{event_type}|{actor_id}|{action}|{timestamp}|{detail_json or ''}"
51 return hashlib.sha256(payload.encode('utf-8')).hexdigest()
class ImmutableAuditLog:
    """
    Append-only audit log with hash-chain integrity.

    Each entry's hash covers the previous entry's hash, so modifying or
    deleting any stored entry breaks the chain and is caught by
    verify_chain().

    Storage: SQLAlchemy AuditLogEntry table (see integrations/social/models.py).
    Falls back to an in-memory list when the DB is unavailable (test/standalone
    mode).
    """

    def __init__(self):
        # Lock serializes appends so each entry's prev_hash really is the
        # hash of the immediately preceding entry.
        self._lock = threading.Lock()
        self._memory_log: List[Dict] = []  # Fallback store for no-DB mode
        self._use_db = self._check_db_available()

    def _check_db_available(self) -> bool:
        """Return True when the AuditLogEntry model is importable."""
        try:
            from integrations.social.models import AuditLogEntry  # noqa: F401
            return True
        except ImportError:
            return False

    def _get_last_hash(self) -> str:
        """Get the hash of the last entry in the chain ('genesis' if empty)."""
        if self._use_db:
            try:
                from integrations.social.models import get_db, AuditLogEntry
                db = get_db()
                try:
                    last = db.query(AuditLogEntry).order_by(
                        AuditLogEntry.id.desc()
                    ).first()
                    return last.entry_hash if last else 'genesis'
                finally:
                    db.close()
            except Exception:
                # Best-effort: DB read failed, fall through to memory chain.
                pass

        if self._memory_log:
            return self._memory_log[-1]['entry_hash']
        return 'genesis'

    def log_event(self, event_type: str, actor_id: str, action: str,
                  detail: Optional[Dict] = None,
                  target_id: Optional[str] = None) -> Tuple[int, str]:
        """
        Append an immutable event to the audit log.

        Args:
            event_type: Category (state_change, goal_dispatched, tool_call, auth, security)
            actor_id: Who triggered the event (user_id, agent_id, system)
            action: What happened (free text, e.g. 'completed action 5')
            detail: Optional structured data (sensitive keys auto-redacted)
            target_id: Optional target entity ID

        Returns:
            (entry_id, entry_hash)
        """
        with self._lock:
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
            # (and naive) since Python 3.12. The string is hashed verbatim and
            # old entries verify against their own stored strings, so the
            # format change does not break existing chains.
            timestamp = datetime.now(timezone.utc).isoformat()
            detail_json = _redact_sensitive(detail)
            prev_hash = self._get_last_hash()
            entry_hash = _compute_hash(
                prev_hash, event_type, actor_id, action, timestamp, detail_json)

            if self._use_db:
                try:
                    from integrations.social.models import get_db, AuditLogEntry
                    db = get_db()
                    try:
                        # NOTE(review): created_at is not passed here, so the
                        # DB default may differ from the `timestamp` that was
                        # hashed — verify_chain() re-hashes the stored
                        # created_at, so confirm the model persists the same
                        # value or DB-mode verification will report breaks.
                        entry = AuditLogEntry(
                            event_type=event_type,
                            actor_id=actor_id,
                            target_id=target_id,
                            action=action,
                            detail_json=detail_json,
                            prev_hash=prev_hash,
                            entry_hash=entry_hash,
                        )
                        db.add(entry)
                        db.commit()
                        entry_id = entry.id
                        logger.debug(f"Audit log: {event_type} by {actor_id}: {action}")
                        return entry_id, entry_hash
                    except Exception:
                        db.rollback()
                        raise
                    finally:
                        db.close()
                except Exception as e:
                    logger.warning(f"DB audit log failed, using memory: {e}")

            # Fallback: in-memory append-only list
            entry_id = len(self._memory_log) + 1
            self._memory_log.append({
                'id': entry_id,
                'event_type': event_type,
                'actor_id': actor_id,
                'target_id': target_id,
                'action': action,
                'detail_json': detail_json,
                'prev_hash': prev_hash,
                'entry_hash': entry_hash,
                'created_at': timestamp,
            })
            return entry_id, entry_hash

    def verify_chain(self, limit: int = 1000) -> Tuple[bool, str]:
        """
        Verify the integrity of the audit log hash chain.

        Recomputes every entry's hash from its stored fields chained to the
        previous entry's hash; any edit or deletion surfaces as a mismatch.

        Args:
            limit: Maximum number of entries (oldest first) to verify.

        Returns:
            (is_valid, reason)
        """
        entries = self._get_entries(limit=limit)
        if not entries:
            return True, 'Empty log'

        prev_hash = 'genesis'
        for entry in entries:
            expected = _compute_hash(
                prev_hash,
                entry['event_type'],
                entry['actor_id'],
                entry['action'],
                entry['created_at'],
                entry.get('detail_json'),
            )
            if entry['entry_hash'] != expected:
                return False, (
                    f"Chain broken at entry {entry['id']}: "
                    f"expected {expected[:16]}..., got {entry['entry_hash'][:16]}..."
                )
            prev_hash = entry['entry_hash']

        return True, f'Chain valid ({len(entries)} entries)'

    def get_trail(self, actor_id: Optional[str] = None,
                  event_type: Optional[str] = None,
                  limit: int = 100) -> List[Dict]:
        """Get audit trail, optionally filtered by actor or event type."""
        entries = self._get_entries(limit=limit * 5)  # Over-fetch for filtering
        if actor_id:
            entries = [e for e in entries if e['actor_id'] == actor_id]
        if event_type:
            entries = [e for e in entries if e['event_type'] == event_type]
        return entries[:limit]

    def _get_entries(self, limit: int = 1000) -> List[Dict]:
        """Get raw entries (oldest first) from DB or memory as plain dicts."""
        if self._use_db:
            try:
                from integrations.social.models import get_db, AuditLogEntry
                db = get_db()
                try:
                    rows = db.query(AuditLogEntry).order_by(
                        AuditLogEntry.id.asc()
                    ).limit(limit).all()
                    return [{
                        'id': r.id,
                        'event_type': r.event_type,
                        'actor_id': r.actor_id,
                        'target_id': r.target_id,
                        'action': r.action,
                        'detail_json': r.detail_json,
                        'prev_hash': r.prev_hash,
                        'entry_hash': r.entry_hash,
                        'created_at': r.created_at.isoformat() if hasattr(r.created_at, 'isoformat') else r.created_at,
                    } for r in rows]
                finally:
                    db.close()
            except Exception:
                # Best-effort: DB read failed, fall through to memory log.
                pass

        # Snapshot under the lock and return copies so callers cannot mutate
        # the log's own records (keeps the "immutable" contract honest).
        with self._lock:
            return [dict(e) for e in self._memory_log[:limit]]
# Module-level singleton instance, created lazily by get_audit_log().
_audit_log = None


def get_audit_log() -> ImmutableAuditLog:
    """Return the shared ImmutableAuditLog, constructing it on first call."""
    global _audit_log
    if _audit_log is None:
        _audit_log = ImmutableAuditLog()
    return _audit_log