Coverage for integrations / vlm / safety.py: 100.0%

99 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2integrations.vlm.safety — guards for the VLM action pipeline. 

3 

4Phase 6 of memory/vlm_best_of_all_worlds_plan.md §5. Three layers 

5of protection between the VLM's decisions and the user's screen: 

6 

7 1. SessionGuard per-session action cap + per-second throttle 

8 (avoid runaway loops spamming clicks) 

9 2. WindowBlocklist refuse to click in sensitive apps 

10 (lsass / password managers / banking-titled 

11 windows) and an admin-overridable allowlist 

12 3. AuditLogger JSONL trail at ~/.nunba/audit/vlm_actions_*.jsonl 

13 with timestamp / window / coords / hash / exit 

14 code so post-incident review can reconstruct 

15 what the VLM did 

16 

17All three are OPT-IN via ``execute_action(..., safety=True)`` so 

18existing call sites stay unchanged unless they explicitly opt in. 

19The plan §5 calls these out as production-readiness, not always- 

20on hard limits. 

21 

22Configuration via ``SafetyConfig`` dataclass; module-level singletons 

23returned by ``get_session_guard()`` / ``get_audit_logger()``. The 

24singletons are reset between distinct user sessions via 

25``reset_session_guard()`` (called by /api/vlm/stop and by the loop 

26when it terminates a goal). 

27""" 

28 

29import collections 

30import hashlib 

31import json 

32import logging 

33import os 

34import re 

35import threading 

36import time 

37from dataclasses import dataclass, field 

38from typing import List, Optional, Tuple 

39 

# Module logger shared by every guard in this file.
logger = logging.getLogger('hevolve.vlm.safety')


# ─── Defaults ─────────────────────────────────────────────────────────

#: Executable names (lowercase) whose windows must never receive VLM
#: clicks: the Windows credential broker (lsass), session manager
#: (winlogon), logon UI (LogonUI.exe) and known password managers.
#: Admins may supply a different tuple at runtime via
#: ``SafetyConfig(blocked_processes=...)``.
DEFAULT_BLOCKED_PROCESSES: Tuple[str, ...] = (
    # Windows system processes
    'lsass.exe',
    'winlogon.exe',
    'logonui.exe',
    'consent.exe',
    # Password managers
    'bitwarden.exe',
    '1password.exe',
    'keepass.exe',
    'keepassxc.exe',
    'lastpass.exe',
    'dashlane.exe',
    'enpass.exe',
)

#: Regexes (applied case-insensitively to the window title) that hint at
#: sensitive content. Deliberately conservative: a false positive can
#: be overridden per window by the user, a false negative cannot be
#: undone.
DEFAULT_BLOCKED_TITLE_PATTERNS: Tuple[str, ...] = (
    r'\b(?:online[\s-]?)?bank(?:ing)?\b',
    r'\bcredit[\s-]?card\b',
    r'\b(?:enter|change|reset)[\s-]+password\b',
    r'\b(?:UAC|elevation|administrator)\s*prompt\b',
    r'\b(?:pin|cvv|security[\s-]?code)\b',
)

67 

68 

69# ─── Configuration ──────────────────────────────────────────────────── 

70 

def _env_number(name, default, cast):
    """Parse env var *name* with *cast*; return *default* when it is
    unset, blank, or not parseable (a warning is logged in that case)."""
    raw = os.environ.get(name, '').strip()
    if not raw:
        return default
    try:
        return cast(raw)
    except ValueError:
        # A typo in a host policy must not make the module unimportable
        # or the config unconstructible; fall back to the safe default.
        logging.getLogger('hevolve.vlm.safety').warning(
            'ignoring invalid %s=%r; falling back to %r', name, raw, default)
        return default


def _env_flag(name, default):
    """Read a boolean env var: ``0`` / ``false`` / ``no`` (any case,
    surrounding whitespace ignored) mean False, any other non-blank
    value means True, unset or blank means *default*."""
    raw = os.environ.get(name, '').strip().lower()
    if not raw:
        return default
    return raw not in ('0', 'false', 'no')


@dataclass
class SafetyConfig:
    """Tuneable knobs. All env-overridable so per-host policies
    don't require code changes.

    Every environment variable is read when an instance is created
    (not at import time), so all fields behave like ``audit_dir``.
    Explicit constructor arguments always win over the environment.

    Environment variables:
        HEVOLVE_VLM_MAX_ACTIONS_PER_SESSION  int, default 100
        HEVOLVE_VLM_MAX_ACTIONS_PER_SECOND   float, default 5.0
        HEVOLVE_VLM_AUDIT_ENABLED            0/false/no disables, default on
        HEVOLVE_VLM_AUDIT_DIR                path, default '' (the audit
                                             logger then uses its own
                                             default directory)

    A malformed numeric value is ignored with a warning instead of
    raising ``ValueError``.
    """

    max_actions_per_session: int = field(
        default_factory=lambda: _env_number(
            'HEVOLVE_VLM_MAX_ACTIONS_PER_SESSION', 100, int))
    max_actions_per_second: float = field(
        default_factory=lambda: _env_number(
            'HEVOLVE_VLM_MAX_ACTIONS_PER_SECOND', 5.0, float))
    blocked_processes: Tuple[str, ...] = field(
        default_factory=lambda: DEFAULT_BLOCKED_PROCESSES)
    blocked_title_patterns: Tuple[str, ...] = field(
        default_factory=lambda: DEFAULT_BLOCKED_TITLE_PATTERNS)
    audit_enabled: bool = field(
        default_factory=lambda: _env_flag('HEVOLVE_VLM_AUDIT_ENABLED', True))
    # Override with HEVOLVE_VLM_AUDIT_DIR; empty default → ~/.nunba/audit
    # via AuditLogger._default_dir().
    audit_dir: str = field(
        default_factory=lambda: os.environ.get('HEVOLVE_VLM_AUDIT_DIR', ''))

90 

91 

92# ─── Session guard (count + throttle) ───────────────────────────────── 

93 

class SessionGuard:
    """Tracks per-session action count + per-second rate.

    :meth:`check` returns a non-None reason string when either the
    session cap or the per-second throttle has been reached; the
    caller MUST treat that as a refusal to act. :meth:`record` is
    called after a successful action to update the counters, and
    :meth:`reset` starts a fresh session.

    Thread-safety: one lock protects every read and update of the
    counters, so concurrent callers never corrupt or double-count
    them. ``check`` and ``record`` are two separate critical sections,
    though, so two threads may both pass ``check`` before either calls
    ``record``.

    Attributes:
        config: the limits in force (``max_actions_per_session``,
            ``max_actions_per_second``).
        action_count: actions recorded since construction / last reset.
        recent_action_times: ``time.monotonic()`` readings of the most
            recent recorded actions (not wall-clock epoch seconds).
    """

    def __init__(self, config: Optional["SafetyConfig"] = None):
        self.config = config or SafetyConfig()
        self.action_count: int = 0
        # Bounded deque so memory doesn't grow over a long session;
        # capacity is four times the per-second limit, never below 64.
        self.recent_action_times: collections.deque = collections.deque(
            maxlen=max(64, int(self.config.max_actions_per_second * 4)))
        self._lock = threading.Lock()

    def check(self) -> Optional[str]:
        """Return None when acting is allowed; otherwise a reason string."""
        with self._lock:
            if self.action_count >= self.config.max_actions_per_session:
                return (f'session-cap reached '
                        f'({self.config.max_actions_per_session} actions)')
            # Monotonic clock: the one-second window must not be
            # stretched or collapsed when the wall clock is adjusted.
            now = time.monotonic()
            recent = sum(
                1 for t in self.recent_action_times if now - t < 1.0)
            if recent >= self.config.max_actions_per_second:
                return (f'throttle exceeded '
                        f'(>{self.config.max_actions_per_second}/s)')
        return None

    def record(self) -> None:
        """Count one performed action and remember when it happened."""
        with self._lock:
            self.action_count += 1
            self.recent_action_times.append(time.monotonic())

    def reset(self) -> None:
        """Zero the session counter and forget recent action times."""
        with self._lock:
            self.action_count = 0
            self.recent_action_times.clear()

139 

140 

141# ─── Window blocklist ───────────────────────────────────────────────── 

142 

def is_window_blocked(window_meta: Optional[dict],
                      config: Optional[SafetyConfig] = None
                      ) -> Optional[str]:
    """Decide whether the VLM must stay out of a window.

    Args:
        window_meta: window description with (at least) the optional
            keys ``process_name`` and ``title`` — the dict shape that
            ``integrations.remote_desktop.window_capture.list_windows``
            returns. ``None`` or an empty dict means "nothing known".
        config: policy to apply; a default ``SafetyConfig()`` is built
            when omitted.

    Returns:
        ``None`` when the window is not blocked (or no metadata was
        given); otherwise a reason string, either
        ``process_blocked: ...`` or ``title_pattern_blocked: ...``.
        The process check runs before the title check.
    """
    if not window_meta:
        return None
    cfg = config or SafetyConfig()

    proc = (window_meta.get('process_name') or '').lower().strip()
    if proc:
        for entry in cfg.blocked_processes:
            name = entry.lower()
            # Match a bare executable name or a full path that ends in it
            # (either path separator).
            if proc == name or proc.endswith(('\\' + name, '/' + name)):
                return f'process_blocked: {proc}'

    window_title = window_meta.get('title') or ''
    for pattern in cfg.blocked_title_patterns:
        if re.search(pattern, window_title, re.IGNORECASE) is not None:
            return (f'title_pattern_blocked: "{window_title[:60]}" '
                    f'matches /{pattern}/')
    return None

168 

169 

170# ─── Audit logger ───────────────────────────────────────────────────── 

171 

class AuditLogger:
    """Append-only JSONL audit trail of every VLM action.

    One file per local calendar day,
    ``<audit dir>/vlm_actions_YYYYMMDD.jsonl``, one JSON object per
    line. Logging is strictly best-effort: :meth:`log` never raises,
    so a problem with the audit trail cannot abort the action being
    audited; failures are reported through the module logger instead.
    """

    def __init__(self, config: Optional["SafetyConfig"] = None):
        self.config = config or SafetyConfig()
        # Directory the JSONL files go to; None disables logging.
        self.path: Optional[str] = None
        self._lock = threading.Lock()
        self._ensure_dir()

    def _ensure_dir(self) -> None:
        """Create the audit directory; on failure disable logging."""
        target = self.config.audit_dir or self._default_dir()
        try:
            os.makedirs(target, exist_ok=True)
        except Exception as e:
            logger.warning(f'audit dir create failed for {target}: {e}')
            self.path = None  # disables logging
        else:
            self.path = target

    def _default_dir(self) -> str:
        """Audit log location used when ``config.audit_dir`` is empty.

        Plan §5 spec: ``~/.nunba/audit/vlm_actions_{date}.jsonl``, so
        the default is the plan-literal ``~/.nunba/audit``. Admins who
        prefer a platform-specific data directory set
        ``HEVOLVE_VLM_AUDIT_DIR`` (read in SafetyConfig) to that path.

        An empty ``audit_dir`` does NOT disable auditing — it selects
        this default. Use ``audit_enabled=False`` to turn auditing off.
        """
        return os.path.expanduser('~/.nunba/audit')

    def _build_record(self, action: dict, result: dict, ts: float,
                      local: time.struct_time,
                      window_meta: Optional[dict],
                      screenshot_b64: Optional[str],
                      block_reason: Optional[str]) -> dict:
        """Assemble the JSON-serialisable record for one action."""
        text = action.get('text')
        window = None
        if window_meta:
            window = {
                'hwnd': window_meta.get('hwnd'),
                'title': str(window_meta.get('title') or '')[:80],
                'process_name': window_meta.get('process_name'),
                'pid': window_meta.get('pid'),
            }
        digest = None
        if screenshot_b64:
            # UTF-8 yields the same bytes as ASCII for valid base64 and
            # cannot raise on an unexpected non-ASCII character.
            digest = hashlib.sha256(
                screenshot_b64.encode('utf-8')).hexdigest()[:16]
        return {
            'ts': ts,
            'iso': time.strftime('%Y-%m-%dT%H:%M:%S', local),
            'action': action.get('action'),
            'coordinate': action.get('coordinate'),
            # Truncated so typed secrets are not stored in full.
            'text': str(text)[:80] if text else '',
            'translated_from': action.get('_translated_from'),
            'translated_to': action.get('_translated_to'),
            'window': window,
            'screenshot_sha256': digest,
            'status': result.get('status'),
            'error': result.get('error'),
            'block_reason': block_reason,
            'verify_diff': result.get('verify_diff'),
            'verify_retried': result.get('verify_retried'),
        }

    def log(self, action: dict, result: dict, *,
            window_meta: Optional[dict] = None,
            screenshot_b64: Optional[str] = None,
            block_reason: Optional[str] = None) -> None:
        """Append one JSONL record.

        No-op when ``audit_enabled`` is False or the audit directory
        could not be created. Never raises: any failure while building
        or writing the record is logged as a warning and swallowed.

        Args:
            action: the action dict (``action``, ``coordinate``,
                ``text``, ``_translated_from``, ``_translated_to``);
                ``None`` is treated as empty.
            result: the execution result (``status``, ``error``,
                ``verify_diff``, ``verify_retried``); ``None`` is
                treated as empty.
            window_meta: optional target-window description.
            screenshot_b64: optional base64 screenshot; only a
                truncated SHA-256 of the string is stored.
            block_reason: why the action was refused, if it was.
        """
        if not self.config.audit_enabled or not self.path:
            return
        try:
            # One clock reading feeds 'ts', 'iso' and the file name so
            # they cannot disagree around midnight.
            ts = time.time()
            local = time.localtime(ts)
            record = self._build_record(
                action or {}, result or {}, ts, local,
                window_meta, screenshot_b64, block_reason)
            date = time.strftime('%Y%m%d', local)
            log_path = os.path.join(self.path, f'vlm_actions_{date}.jsonl')
            line = json.dumps(record, default=str)
            with self._lock:
                with open(log_path, 'a', encoding='utf-8') as f:
                    f.write(line + '\n')
        except Exception as e:
            # A lost audit record must be visible in normal logs.
            logger.warning(f'audit write failed: {e}')

249 

250 

251# ─── Module-level singletons ────────────────────────────────────────── 

252 

# Guards first-time construction of the two lazily-built instances below.
_singleton_lock = threading.Lock()
# Process-wide instances; stay None until the matching get_*() is called.
_session_guard: Optional[SessionGuard] = None
_audit_logger: Optional[AuditLogger] = None

256 

257 

def get_session_guard() -> SessionGuard:
    """Return the module-wide ``SessionGuard``, building it on first use.

    The instance is created with a default configuration. Creation
    happens under ``_singleton_lock`` and is re-checked inside the
    lock, so concurrent first calls still produce a single instance.
    """
    global _session_guard
    if _session_guard is not None:
        return _session_guard
    with _singleton_lock:
        # Another thread may have built it while we waited for the lock.
        if _session_guard is None:
            _session_guard = SessionGuard()
    return _session_guard

265 

266 

def get_audit_logger() -> AuditLogger:
    """Return the module-wide ``AuditLogger``, building it on first use.

    The instance is created with a default configuration. Creation
    happens under ``_singleton_lock`` and is re-checked inside the
    lock, so concurrent first calls still produce a single instance.
    """
    global _audit_logger
    if _audit_logger is not None:
        return _audit_logger
    with _singleton_lock:
        # Another thread may have built it while we waited for the lock.
        if _audit_logger is None:
            _audit_logger = AuditLogger()
    return _audit_logger

274 

275 

def reset_session_guard() -> None:
    """Clear the shared session guard's counters so the next VLM
    session starts fresh.

    Intended to be called when a session ends (loop terminated,
    /api/vlm/stop fired, user-id changes). If no guard exists yet, one
    is created by ``get_session_guard()`` and then reset.
    """
    get_session_guard().reset()