Coverage for integrations / vlm / safety.py: 100.0%
99 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2integrations.vlm.safety — guards for the VLM action pipeline.
4Phase 6 of memory/vlm_best_of_all_worlds_plan.md §5. Three layers
5of protection between the VLM's decisions and the user's screen:
7 1. SessionGuard per-session action cap + per-second throttle
8 (avoid runaway loops spamming clicks)
9 2. WindowBlocklist refuse to click in sensitive apps
10 (lsass / password managers / banking-titled
11 windows) and an admin-overridable allowlist
12 3. AuditLogger JSONL trail at ~/.nunba/audit/vlm_actions_*.jsonl
13 with timestamp / window / coords / hash / exit
14 code so post-incident review can reconstruct
15 what the VLM did
17All three are OPT-IN via ``execute_action(..., safety=True)`` so
18existing call sites stay unchanged unless they explicitly opt in.
19The plan §5 calls these out as production-readiness, not always-
20on hard limits.
22Configuration via ``SafetyConfig`` dataclass; module-level singletons
23returned by ``get_session_guard()`` / ``get_audit_logger()``. The
24singletons are reset between distinct user sessions via
25``reset_session_guard()`` (called by /api/vlm/stop and by the loop
26when it terminates a goal).
27"""
29import collections
30import hashlib
31import json
32import logging
33import os
34import re
35import threading
36import time
37from dataclasses import dataclass, field
38from typing import List, Optional, Tuple
# Module-wide logger; every safety component reports through this one name.
logger = logging.getLogger("hevolve.vlm.safety")
43# ─── Defaults ─────────────────────────────────────────────────────────
#: Executable names that must never be the target of a VLM click.
#: Entries are stored lowercased. Runtime extension is done by passing
#: ``SafetyConfig(blocked_processes=...)``.
DEFAULT_BLOCKED_PROCESSES: Tuple[str, ...] = (
    # Windows credential broker, session manager, logon UI, UAC consent
    "lsass.exe",
    "winlogon.exe",
    "logonui.exe",
    "consent.exe",
    # Password managers
    "bitwarden.exe",
    "1password.exe",
    "keepass.exe",
    "keepassxc.exe",
    "lastpass.exe",
    "dashlane.exe",
    "enpass.exe",
)
#: Regular expressions (applied case-insensitively by the caller) that
#: flag a window title as likely showing sensitive content. Tuned to be
#: conservative: a false positive can be overridden per window, a false
#: negative cannot be undone.
DEFAULT_BLOCKED_TITLE_PATTERNS: Tuple[str, ...] = (
    # "bank", "banking", "online banking", "online-bank", ...
    r'\b(?:online[\s-]?)?bank(?:ing)?\b',
    # "credit card", "credit-card", "creditcard"
    r'\bcredit[\s-]?card\b',
    # "enter password", "change password", "reset-password"
    r'\b(?:enter|change|reset)[\s-]+password\b',
    # "UAC prompt", "elevation prompt", "administrator prompt"
    r'\b(?:UAC|elevation|administrator)\s*prompt\b',
    # "pin", "cvv", "security code"
    r'\b(?:pin|cvv|security[\s-]?code)\b',
)
69# ─── Configuration ────────────────────────────────────────────────────
def _env_number(name, cast, default):
    """Read env var *name* through *cast*; use *default* when unset or malformed."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        return cast(raw)
    except ValueError:
        # A typo in a host policy must not take the safety layer down;
        # fall back to the shipped default and say so.
        logging.getLogger('hevolve.vlm.safety').warning(
            'ignoring malformed %s=%r; using default %r', name, raw, default)
        return default


def _env_flag(name, default='1'):
    """Return False only when env var *name* is '0', 'false' or 'no' (any case)."""
    return os.environ.get(name, default).strip().lower() not in (
        '0', 'false', 'no')


@dataclass
class SafetyConfig:
    """Tuneable knobs. All env-overridable so per-host policies
    don't require code changes.

    Every environment variable is read when an instance is created
    (via ``default_factory``), not when the module is imported, so all
    fields observe the environment at the same moment and a value set
    after import is honoured. Explicit constructor arguments always win
    over the environment.
    """

    # Hard cap on actions per session (HEVOLVE_VLM_MAX_ACTIONS_PER_SESSION,
    # default 100).
    max_actions_per_session: int = field(
        default_factory=lambda: _env_number(
            'HEVOLVE_VLM_MAX_ACTIONS_PER_SESSION', int, 100))
    # Throttle ceiling in actions per second
    # (HEVOLVE_VLM_MAX_ACTIONS_PER_SECOND, default 5.0).
    max_actions_per_second: float = field(
        default_factory=lambda: _env_number(
            'HEVOLVE_VLM_MAX_ACTIONS_PER_SECOND', float, 5.0))
    # Process names that must never receive clicks.
    blocked_processes: Tuple[str, ...] = field(
        default_factory=lambda: DEFAULT_BLOCKED_PROCESSES)
    # Window-title regexes that mark a window as sensitive.
    blocked_title_patterns: Tuple[str, ...] = field(
        default_factory=lambda: DEFAULT_BLOCKED_TITLE_PATTERNS)
    # Audit trail switch (HEVOLVE_VLM_AUDIT_ENABLED); on unless the value
    # is '0', 'false' or 'no', compared case-insensitively.
    audit_enabled: bool = field(
        default_factory=lambda: _env_flag('HEVOLVE_VLM_AUDIT_ENABLED'))
    # Override with HEVOLVE_VLM_AUDIT_DIR; empty default means the audit
    # logger picks its own default directory.
    audit_dir: str = field(
        default_factory=lambda: os.environ.get('HEVOLVE_VLM_AUDIT_DIR', ''))
92# ─── Session guard (count + throttle) ─────────────────────────────────
class SessionGuard:
    """Per-session action budget plus a one-second rate throttle.

    :meth:`check` answers "may I act now?" — ``None`` means yes, any
    string is the refusal reason and the caller MUST NOT act.
    :meth:`record` is invoked after an action succeeded and bumps the
    counters. :meth:`reset` starts a fresh session.

    All three methods take the same lock, so counters are never updated
    concurrently. ``check`` and ``record`` are separate critical
    sections.
    """

    def __init__(self, config: Optional["SafetyConfig"] = None):
        self.config = config or SafetyConfig()
        self.action_count: int = 0
        # Keep only a bounded tail of timestamps so a long session does
        # not grow memory: four times the per-second limit, never fewer
        # than 64 slots.
        window_capacity = max(64, int(self.config.max_actions_per_second * 4))
        self.recent_action_times: collections.deque = collections.deque(
            maxlen=window_capacity)
        self._lock = threading.Lock()

    def check(self) -> Optional[str]:
        """Return None when OK; otherwise a reason string."""
        with self._lock:
            session_cap = self.config.max_actions_per_session
            if self.action_count >= session_cap:
                return f'session-cap reached ({session_cap} actions)'

            # Count the recorded actions that happened strictly less
            # than one second ago.
            now = time.time()
            in_last_second = len(
                [stamp for stamp in self.recent_action_times
                 if now - stamp < 1.0])
            rate_limit = self.config.max_actions_per_second
            if in_last_second >= rate_limit:
                return f'throttle exceeded (>{rate_limit}/s)'
            return None

    def record(self) -> None:
        """Count one performed action and remember when it happened."""
        with self._lock:
            self.recent_action_times.append(time.time())
            self.action_count += 1

    def reset(self) -> None:
        """Forget everything recorded so far."""
        with self._lock:
            self.recent_action_times.clear()
            self.action_count = 0
141# ─── Window blocklist ─────────────────────────────────────────────────
def is_window_blocked(window_meta: Optional[dict],
                      config: Optional["SafetyConfig"] = None
                      ) -> Optional[str]:
    """Decide whether a window is too sensitive for VLM actions.

    Returns a human-readable block reason, or ``None`` when nothing
    matched. An empty or missing ``window_meta`` yields ``None`` (there
    is nothing to judge).

    ``window_meta`` is read through ``.get('process_name')`` and
    ``.get('title')``; other keys are ignored. The process check runs
    first and matches either the bare name or a path ending in the name
    (``\\`` or ``/`` separator), case-insensitively. Title patterns are
    then searched case-insensitively in configuration order.
    """
    if not window_meta:
        return None
    policy = config or SafetyConfig()

    proc = (window_meta.get('process_name') or '').lower().strip()
    if proc:
        for entry in policy.blocked_processes:
            needle = entry.lower()
            # Require a path separator before the name so that
            # "notlsass.exe" does not match "lsass.exe".
            if proc == needle or proc.endswith(('\\' + needle, '/' + needle)):
                return f'process_blocked: {proc}'

    caption = window_meta.get('title') or ''
    for pattern in policy.blocked_title_patterns:
        if re.search(pattern, caption, re.IGNORECASE) is not None:
            # Only the first 60 characters of the title go into the reason.
            return f'title_pattern_blocked: "{caption[:60]}" matches /{pattern}/'
    return None
170# ─── Audit logger ─────────────────────────────────────────────────────
class AuditLogger:
    """Append-only JSONL audit trail of every VLM action.

    One file per local calendar day, ``vlm_actions_YYYYMMDD.jsonl``,
    inside the audit directory. Logging is best-effort: :meth:`log`
    never raises into the action pipeline; failures are reported through
    the module logger instead.
    """

    def __init__(self, config: Optional["SafetyConfig"] = None):
        self.config = config or SafetyConfig()
        # Directory the JSONL files go to; None means logging is disabled
        # because the directory could not be created.
        self.path: Optional[str] = None
        self._lock = threading.Lock()
        self._ensure_dir()

    def _ensure_dir(self) -> None:
        """Create the audit directory and remember it in ``self.path``."""
        target = self.config.audit_dir or self._default_dir()
        try:
            os.makedirs(target, exist_ok=True)
            self.path = target
        except Exception as e:
            logger.warning(f'audit dir create failed for {target}: {e}')
            self.path = None  # disables logging

    def _default_dir(self) -> str:
        """Audit log location used when no directory is configured.

        Plan §5 spec: ``~/.nunba/audit/vlm_actions_{date}.jsonl``, so the
        default is the plan-literal ``~/.nunba/audit`` rather than a
        platform data dir. Admins who want a platform-default path set
        ``HEVOLVE_VLM_AUDIT_DIR`` to it once at install time.

        An empty ``audit_dir`` does NOT disable auditing: it is falsy,
        so ``_ensure_dir`` falls back to this default. To turn auditing
        off, set ``audit_enabled`` to False.
        """
        return os.path.expanduser('~/.nunba/audit')

    def log(self, action: dict, result: dict, *,
            window_meta: Optional[dict] = None,
            screenshot_b64: Optional[str] = None,
            block_reason: Optional[str] = None) -> None:
        """Append one JSONL record. No-op when audit_enabled is False
        or the dir couldn't be created.

        Args:
            action: the action dict; ``action``, ``coordinate``, ``text``
                (first 80 chars), ``_translated_from`` and
                ``_translated_to`` are recorded. ``None`` is treated as
                an empty dict.
            result: the execution result; ``status``, ``error``,
                ``verify_diff`` and ``verify_retried`` are recorded.
                ``None`` is treated as an empty dict.
            window_meta: optional target-window info (``hwnd``, ``title``
                truncated to 80 chars, ``process_name``, ``pid``).
            screenshot_b64: optional screenshot payload (str or bytes);
                only the first 16 hex digits of its SHA-256 are stored.
            block_reason: why the action was refused, if it was.
        """
        if not self.config.audit_enabled or not self.path:
            return
        action = action or {}
        result = result or {}

        # Sample the clock once so 'ts', 'iso' and the file-name date
        # always describe the same instant, even across midnight.
        now = time.time()
        local_now = time.localtime(now)

        text = action.get('text')
        if screenshot_b64:
            # Accept bytes as well as str; UTF-8 gives the same digest as
            # ASCII for well-formed base64 and cannot fail on stray chars.
            if isinstance(screenshot_b64, (bytes, bytearray)):
                payload = bytes(screenshot_b64)
            else:
                payload = str(screenshot_b64).encode('utf-8')
            screenshot_digest = hashlib.sha256(payload).hexdigest()[:16]
        else:
            screenshot_digest = None

        record = {
            'ts': now,
            'iso': time.strftime('%Y-%m-%dT%H:%M:%S', local_now),
            'action': action.get('action'),
            'coordinate': action.get('coordinate'),
            # str() so a non-string payload cannot break the slice.
            'text': str(text)[:80] if text else '',
            'translated_from': action.get('_translated_from'),
            'translated_to': action.get('_translated_to'),
            'window': {
                'hwnd': window_meta.get('hwnd'),
                'title': (window_meta.get('title') or '')[:80],
                'process_name': window_meta.get('process_name'),
                'pid': window_meta.get('pid'),
            } if window_meta else None,
            'screenshot_sha256': screenshot_digest,
            'status': result.get('status'),
            'error': result.get('error'),
            'block_reason': block_reason,
            'verify_diff': result.get('verify_diff'),
            'verify_retried': result.get('verify_retried'),
        }
        date = time.strftime('%Y%m%d', local_now)
        log_path = os.path.join(self.path, f'vlm_actions_{date}.jsonl')
        try:
            # Serialisation sits inside the best-effort guard too: an
            # unserialisable value must not abort the caller's action.
            line = json.dumps(record, default=str)
            with self._lock:
                with open(log_path, 'a', encoding='utf-8') as f:
                    f.write(line + '\n')
        except Exception as e:
            logger.debug(f'audit write failed: {e}')
251# ─── Module-level singletons ──────────────────────────────────────────
# Serialises first-time construction of the two process-wide instances below.
_singleton_lock = threading.Lock()
# Both start empty and are filled in by their accessor function on first use.
_session_guard: Optional["SessionGuard"] = None
_audit_logger: Optional["AuditLogger"] = None
def get_session_guard() -> "SessionGuard":
    """Return the process-wide :class:`SessionGuard`, creating it on first use."""
    global _session_guard
    if _session_guard is not None:
        # Already built: skip the lock entirely.
        return _session_guard
    with _singleton_lock:
        # Re-test under the lock; another thread may have built it while
        # this one was waiting.
        if _session_guard is None:
            _session_guard = SessionGuard()
    return _session_guard
def get_audit_logger() -> "AuditLogger":
    """Return the process-wide :class:`AuditLogger`, creating it on first use."""
    global _audit_logger
    if _audit_logger is not None:
        # Already built: skip the lock entirely.
        return _audit_logger
    with _singleton_lock:
        # Re-test under the lock; another thread may have built it while
        # this one was waiting.
        if _audit_logger is None:
            _audit_logger = AuditLogger()
    return _audit_logger
def reset_session_guard() -> None:
    """Clear the shared session guard so the next VLM session starts fresh.

    Intended to run when a session ends: the loop terminated,
    /api/vlm/stop fired, or the user id changed. If no guard exists yet,
    one is created by the accessor and then reset.
    """
    get_session_guard().reset()