Coverage for security / immutable_audit_log.py: 92.6%

95 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

"""
Immutable Audit Log — Tamper-Evident Event Chain

Every security-relevant event (state changes, goal dispatches, tool calls,
auth events) is recorded with a hash chain. Each entry's hash depends on
the previous entry's hash, forming a tamper-evident chain.

If any entry is modified or deleted, verify_chain() detects the break.

Usage:
    from security.immutable_audit_log import get_audit_log
    audit = get_audit_log()
    entry_id, entry_hash = audit.log_event('state_change', actor_id='user_1', action='completed task 5')
    ok, reason = audit.verify_chain()
"""

16 

17import hashlib 

18import json 

19import logging 

20import threading 

21import time 

22from datetime import datetime 

23from typing import Optional, List, Tuple, Dict, Any 

24 

# Module-level logger; every message emitted by this module goes out under
# the 'hevolve_security' logger name.
logger = logging.getLogger('hevolve_security')

26 

# Substrings that mark a detail key as sensitive. Matching is case-insensitive
# and by containment, so 'user_password' and 'API_KEY' are both redacted.
_SENSITIVE_KEYS = frozenset({
    'password', 'token', 'api_key', 'secret', 'credential',
    'private_key', 'ssn', 'credit_card', 'card_number',
})


def _is_sensitive_key(key: Any) -> bool:
    """Return True when ``key`` contains any sensitive substring."""
    # str() so that non-string keys (ints, enums, ...) cannot raise here.
    lowered = str(key).lower()
    return any(marker in lowered for marker in _SENSITIVE_KEYS)


def _redact_value(value: Any) -> Any:
    """Return a copy of ``value`` with sensitive dict keys redacted at any depth."""
    if isinstance(value, dict):
        return {
            key: '[REDACTED]' if _is_sensitive_key(key) else _redact_value(item)
            for key, item in value.items()
        }
    if isinstance(value, (list, tuple)):
        # json.dumps serialises tuples as arrays, so returning a list here
        # does not change the encoded output.
        return [_redact_value(item) for item in value]
    return value


def _redact_sensitive(detail: Optional[Dict]) -> Optional[str]:
    """Redact sensitive fields and serialise ``detail`` for the audit log.

    Any key whose name contains one of ``_SENSITIVE_KEYS`` has its whole value
    replaced by ``'[REDACTED]'``. The check is applied recursively, so secrets
    nested inside dicts or inside lists/tuples of dicts are redacted too.

    Args:
        detail: Structured event data, or None.

    Returns:
        None when ``detail`` is None; otherwise a JSON string with sorted keys.
        Values JSON cannot encode natively are stringified via ``default=str``.
    """
    if detail is None:
        return None
    return json.dumps(_redact_value(detail), sort_keys=True, default=str)

45 

46 

def _compute_hash(prev_hash: str, event_type: str, actor_id: str,
                  action: str, timestamp: str, detail_json: Optional[str]) -> str:
    """Return the hex SHA-256 digest that links an entry to its predecessor.

    The digest covers, in order: the previous entry's hash, the event type,
    the actor, the action, the timestamp and the serialised detail (an empty
    string when ``detail_json`` is None or empty), joined with ``'|'``.

    NOTE(review): fields are joined without escaping, so values that contain
    ``'|'`` can yield the same payload for different field splits, and
    ``target_id`` is not part of the hashed data. Changing either would
    invalidate every hash already stored, so the format is kept as is.
    """
    parts = (prev_hash, event_type, actor_id, action, timestamp, detail_json or '')
    # format(x) is what an f-string placeholder applies to each field.
    joined = '|'.join(format(part) for part in parts)
    hasher = hashlib.sha256()
    hasher.update(joined.encode('utf-8'))
    return hasher.hexdigest()

52 

53 

class ImmutableAuditLog:
    """
    Append-only audit log with hash-chain integrity.

    Storage: SQLAlchemy AuditLogEntry table (see integrations/social/models.py).
    Falls back to in-memory list when DB is unavailable (test/standalone mode).

    Every entry records the hash of the entry before it ('genesis' for the
    first one) together with its own hash, which lets verify_chain()
    recompute each link in order.
    """

    def __init__(self):
        self._lock = threading.Lock()  # Serialises appends in log_event
        self._memory_log: List[Dict] = []  # Fallback for no-DB mode
        self._use_db = self._check_db_available()

    def _check_db_available(self) -> bool:
        """Return True when the AuditLogEntry model can be imported."""
        try:
            from integrations.social.models import AuditLogEntry  # noqa: F401
        except ImportError:
            return False
        return True

    @staticmethod
    def _row_to_dict(row: Any) -> Dict:
        """Convert an AuditLogEntry row into the dict shape used in memory mode."""
        created_at = row.created_at
        if hasattr(created_at, 'isoformat'):
            created_at = created_at.isoformat()
        return {
            'id': row.id,
            'event_type': row.event_type,
            'actor_id': row.actor_id,
            'target_id': row.target_id,
            'action': row.action,
            'detail_json': row.detail_json,
            'prev_hash': row.prev_hash,
            'entry_hash': row.entry_hash,
            'created_at': created_at,
        }

    def _get_last_hash(self) -> str:
        """Get the hash of the last entry in the chain.

        Returns 'genesis' when the log is empty. If the DB lookup fails, the
        failure is logged and the in-memory log is consulted instead.
        """
        if self._use_db:
            try:
                from integrations.social.models import get_db, AuditLogEntry
                db = get_db()
                try:
                    last = db.query(AuditLogEntry).order_by(
                        AuditLogEntry.id.desc()
                    ).first()
                    return last.entry_hash if last else 'genesis'
                finally:
                    db.close()
            except Exception as e:
                # Best-effort fallback, but never silently: a failing DB
                # changes which chain the next entry is linked to.
                logger.warning(
                    "Audit log: reading last hash from DB failed, using memory: %s", e)

        # Fallback: in-memory
        if self._memory_log:
            return self._memory_log[-1]['entry_hash']
        return 'genesis'

    def log_event(self, event_type: str, actor_id: str, action: str,
                  detail: Optional[Dict] = None,
                  target_id: Optional[str] = None) -> Tuple[int, str]:
        """
        Append an immutable event to the audit log.

        Args:
            event_type: Category (state_change, goal_dispatched, tool_call, auth, security)
            actor_id: Who triggered the event (user_id, agent_id, system)
            action: What happened (free text, e.g. 'completed action 5')
            detail: Optional structured data (sensitive keys auto-redacted)
            target_id: Optional target entity ID

        Returns:
            (entry_id, entry_hash)
        """
        from datetime import timezone

        with self._lock:
            # Naive UTC, same value and isoformat() shape as the deprecated
            # datetime.utcnow().
            now = datetime.now(timezone.utc).replace(tzinfo=None)
            timestamp = now.isoformat()
            detail_json = _redact_sensitive(detail)
            prev_hash = self._get_last_hash()
            entry_hash = _compute_hash(
                prev_hash, event_type, actor_id, action, timestamp, detail_json)

            if self._use_db:
                try:
                    from integrations.social.models import get_db, AuditLogEntry
                    db = get_db()
                    try:
                        entry = AuditLogEntry(
                            event_type=event_type,
                            actor_id=actor_id,
                            target_id=target_id,
                            action=action,
                            detail_json=detail_json,
                            prev_hash=prev_hash,
                            entry_hash=entry_hash,
                            # Persist the exact timestamp that was hashed;
                            # verify_chain() re-hashes created_at, so a
                            # column default taken at insert time would not
                            # reproduce entry_hash.
                            # NOTE(review): assumes created_at is a DateTime
                            # column that accepts an explicit value — confirm
                            # against the AuditLogEntry model.
                            created_at=now,
                        )
                        db.add(entry)
                        db.commit()
                        entry_id = entry.id
                        logger.debug(
                            "Audit log: %s by %s: %s", event_type, actor_id, action)
                        return entry_id, entry_hash
                    except Exception:
                        db.rollback()
                        raise
                    finally:
                        db.close()
                except Exception as e:
                    logger.warning("DB audit log failed, using memory: %s", e)

            # Fallback: in-memory
            entry_id = len(self._memory_log) + 1
            self._memory_log.append({
                'id': entry_id,
                'event_type': event_type,
                'actor_id': actor_id,
                'target_id': target_id,
                'action': action,
                'detail_json': detail_json,
                'prev_hash': prev_hash,
                'entry_hash': entry_hash,
                'created_at': timestamp,
            })
            return entry_id, entry_hash

    def verify_chain(self, limit: int = 1000) -> Tuple[bool, str]:
        """
        Verify the integrity of the audit log hash chain.

        Only the first ``limit`` entries (oldest first) are checked, starting
        from the 'genesis' hash.

        Returns:
            (is_valid, reason)
        """
        entries = self._get_entries(limit=limit)
        if not entries:
            return True, 'Empty log'

        prev_hash = 'genesis'
        for entry in entries:
            expected = _compute_hash(
                prev_hash,
                entry['event_type'],
                entry['actor_id'],
                entry['action'],
                entry['created_at'],
                entry.get('detail_json'),
            )
            if entry['entry_hash'] != expected:
                return False, (
                    f"Chain broken at entry {entry['id']}: "
                    f"expected {expected[:16]}..., got {entry['entry_hash'][:16]}..."
                )
            prev_hash = entry['entry_hash']

        return True, f'Chain valid ({len(entries)} entries)'

    def get_trail(self, actor_id: Optional[str] = None,
                  event_type: Optional[str] = None,
                  limit: int = 100) -> List[Dict]:
        """Get audit trail, optionally filtered by actor or event type.

        Filters are applied before ``limit``, so up to ``limit`` matching
        entries (oldest first) are returned no matter how many non-matching
        entries precede them. Falsy filter values mean "no filter".
        """
        return self._get_entries(
            limit=limit, actor_id=actor_id, event_type=event_type)

    def _get_entries(self, limit: int = 1000,
                     actor_id: Optional[str] = None,
                     event_type: Optional[str] = None) -> List[Dict]:
        """Get raw entries from DB or memory, oldest first.

        Args:
            limit: Maximum number of entries to return.
            actor_id: When truthy, keep only entries by this actor.
            event_type: When truthy, keep only entries of this type.

        If the DB query fails, the failure is logged and the in-memory log is
        used instead.
        """
        if self._use_db:
            try:
                from integrations.social.models import get_db, AuditLogEntry
                db = get_db()
                try:
                    query = db.query(AuditLogEntry)
                    if actor_id:
                        query = query.filter(AuditLogEntry.actor_id == actor_id)
                    if event_type:
                        query = query.filter(AuditLogEntry.event_type == event_type)
                    rows = query.order_by(
                        AuditLogEntry.id.asc()
                    ).limit(limit).all()
                    return [self._row_to_dict(r) for r in rows]
                finally:
                    db.close()
            except Exception as e:
                logger.warning(
                    "Audit log: reading entries from DB failed, using memory: %s", e)

        entries = self._memory_log
        if actor_id:
            entries = [e for e in entries if e['actor_id'] == actor_id]
        if event_type:
            entries = [e for e in entries if e['event_type'] == event_type]
        return list(entries[:limit])

228 

229 

# Singleton
_audit_log = None
# Guards first-time creation so that concurrent callers cannot each build
# their own ImmutableAuditLog (each would have its own lock and memory log).
_audit_log_lock = threading.Lock()


def get_audit_log() -> ImmutableAuditLog:
    """Return the shared ImmutableAuditLog, creating it on first use."""
    global _audit_log
    if _audit_log is None:
        with _audit_log_lock:
            # Re-check under the lock: another thread may have created the
            # instance while this one was waiting to acquire it.
            if _audit_log is None:
                _audit_log = ImmutableAuditLog()
    return _audit_log

238 return _audit_log