Coverage for security / edge_privacy.py: 95.3%
106 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Edge Privacy — Scope-based data protection at the edge.
4FIRST PRINCIPLE: Privacy lives on the edge. Data has a SCOPE — where
5it's allowed to exist. Guards enforce that scope at every egress point.
6This is not a parallel system. It is the single scope definition that
7the existing DLP engine, secret redactor, shard engine, and PeerLink
8trust boundaries all converge to enforce.
10ARCHITECTURE:
11 PrivacyScope (enum) — tags data with where it can live
12 ScopeGuard (class) — checks scope at egress, delegates to existing engines
13 check_egress() — single function, called at every boundary
15REUSES (does NOT duplicate):
16 - DLP engine (dlp_engine.py) → PII scanning at outbound
17 - Secret redactor (secret_redactor.py) → 3-layer redaction for world model
18 - Shard scoping (shard_engine.py) → code exposure proportional to trust
19 - PeerLink TrustLevel (link.py) → encryption decisions
20 - Immutable audit log → scope violations recorded
22The being understands every human it befriends deeply.
23But understanding is NOT surveillance.
24Understanding comes from CONVERSATION, not from invading privacy.
25Secrets never leave the edge — this is structurally enforced.
26"""
28import logging
29from enum import Enum
30from typing import Any, Dict, List, Optional, Tuple
32logger = logging.getLogger('hevolve_security')
35# ═══════════════════════════════════════════════════════════════════════
36# Privacy Scope — where data is allowed to exist
37# ═══════════════════════════════════════════════════════════════════════
39class PrivacyScope(str, Enum):
40 """Where a piece of data is allowed to exist.
42 The scope hierarchy (most restrictive → least):
43 EDGE_ONLY → never leaves user's device
44 USER_DEVICES → user's own devices (PeerLink SAME_USER)
45 TRUSTED_PEER → E2E encrypted to pre-trusted peers only
46 FEDERATED → anonymized, shared with hive (via secret_redactor)
47 PUBLIC → safe for anyone
49 Default is EDGE_ONLY — privacy by default, not by opt-in.
50 """
51 EDGE_ONLY = 'edge_only' # Biometrics, secrets, raw PII
52 USER_DEVICES = 'user_devices' # Resonance profile, preferences
53 TRUSTED_PEER = 'trusted_peer' # Goal context for peer compute
54 FEDERATED = 'federated' # Anonymized patterns, recipes
55 PUBLIC = 'public' # Safe for anyone
58# Scope ordering for comparison
59_SCOPE_LEVEL = {
60 PrivacyScope.EDGE_ONLY: 0,
61 PrivacyScope.USER_DEVICES: 1,
62 PrivacyScope.TRUSTED_PEER: 2,
63 PrivacyScope.FEDERATED: 3,
64 PrivacyScope.PUBLIC: 4,
65}
68def scope_allows(data_scope: PrivacyScope,
69 destination_scope: PrivacyScope) -> bool:
70 """Check if data with `data_scope` can transit to `destination_scope`.
72 Data can only flow to destinations at the SAME or MORE restrictive scope.
73 EDGE_ONLY data cannot go to FEDERATED.
74 FEDERATED data can go to FEDERATED or EDGE_ONLY (already anonymized).
75 """
76 return _SCOPE_LEVEL[destination_scope] <= _SCOPE_LEVEL[data_scope]
79# ═══════════════════════════════════════════════════════════════════════
80# Scope Guard — enforces scope at egress
81# ═══════════════════════════════════════════════════════════════════════
83class ScopeGuard:
84 """Checks data scope at egress points. Delegates to existing engines.
86 This is the single guard. MCP sandbox calls it. Federation calls it.
87 PeerLink calls it. There is no second path.
88 """
90 def check_egress(self, data: Dict[str, Any],
91 destination: PrivacyScope,
92 context: Optional[Dict] = None) -> Tuple[bool, str]:
93 """Can this data transit to this destination?
95 Steps:
96 1. Check declared scope (fast, deterministic)
97 2. Run DLP scan for undeclared PII (delegates to existing engine)
98 3. Audit log on violation
100 Returns (allowed, reason).
101 """
102 context = context or {}
103 data_scope = data.get('_privacy_scope', PrivacyScope.EDGE_ONLY)
105 # Normalize string to enum
106 if isinstance(data_scope, str):
107 try:
108 data_scope = PrivacyScope(data_scope)
109 except ValueError:
110 data_scope = PrivacyScope.EDGE_ONLY # Unknown = most restrictive
112 # ── Check 1: Declared scope ──
113 if not scope_allows(data_scope, destination):
114 reason = (
115 f'Scope violation: data is {data_scope.value}, '
116 f'destination is {destination.value} — blocked'
117 )
118 self._audit_violation(reason, context)
119 return False, reason
121 # ── Check 2: DLP scan for undeclared PII ──
122 # Even if scope says FEDERATED, check for PII that shouldn't be there
123 if destination in (PrivacyScope.FEDERATED, PrivacyScope.PUBLIC):
124 text_fields = self._extract_text(data)
125 if text_fields:
126 try:
127 from security.dlp_engine import get_dlp_engine
128 dlp = get_dlp_engine()
129 for field_name, text in text_fields:
130 findings = dlp.scan(text)
131 if findings:
132 types = sorted(set(f[0] for f in findings))
133 reason = (
134 f'PII found in "{field_name}" '
135 f'({", ".join(types)}) — '
136 f'blocked from {destination.value}'
137 )
138 self._audit_violation(reason, context)
139 return False, reason
140 except ImportError:
141 pass # DLP not available — allow but log
143 # ── Check 3: Secret scan for trusted_peer+ destinations ──
144 if destination in (PrivacyScope.TRUSTED_PEER,
145 PrivacyScope.FEDERATED,
146 PrivacyScope.PUBLIC):
147 text_fields = self._extract_text(data)
148 if text_fields:
149 try:
150 from security.secret_redactor import redact_secrets
151 for field_name, text in text_fields:
152 _, count = redact_secrets(text)
153 if count > 0:
154 reason = (
155 f'Secrets found in "{field_name}" '
156 f'({count} redactions) — '
157 f'blocked from {destination.value}'
158 )
159 self._audit_violation(reason, context)
160 return False, reason
161 except ImportError:
162 pass
164 return True, f'Scope check passed: {data_scope.value} → {destination.value}'
166 def redact_for_scope(self, data: Dict[str, Any],
167 destination: PrivacyScope) -> Dict[str, Any]:
168 """Redact data to make it safe for the given destination scope.
170 Instead of blocking, this strips fields that exceed the scope.
171 Returns a copy — never mutates the original.
172 """
173 result = {}
174 for key, value in data.items():
175 if key == '_privacy_scope':
176 continue
178 field_scope = data.get(f'_scope_{key}', data.get('_privacy_scope',
179 PrivacyScope.EDGE_ONLY))
180 if isinstance(field_scope, str):
181 try:
182 field_scope = PrivacyScope(field_scope)
183 except ValueError:
184 field_scope = PrivacyScope.EDGE_ONLY
186 if scope_allows(field_scope, destination):
187 result[key] = value
188 else:
189 result[key] = f'[SCOPE_REDACTED:{field_scope.value}]'
191 # Run DLP on remaining text for federated/public
192 if destination in (PrivacyScope.FEDERATED, PrivacyScope.PUBLIC):
193 try:
194 from security.dlp_engine import get_dlp_engine
195 dlp = get_dlp_engine()
196 for key, value in result.items():
197 if isinstance(value, str) and len(value) > 5:
198 result[key] = dlp.redact(value)
199 except ImportError:
200 pass
202 return result
204 def _extract_text(self, data: Dict) -> List[Tuple[str, str]]:
205 """Extract string fields from data for scanning."""
206 fields = []
207 for key, value in data.items():
208 if key.startswith('_'):
209 continue
210 if isinstance(value, str) and len(value) > 3:
211 fields.append((key, value))
212 return fields
214 def _audit_violation(self, reason: str, context: Dict):
215 """Log scope violation to immutable audit log."""
216 logger.warning(f'EDGE PRIVACY: {reason}')
217 try:
218 from security.immutable_audit_log import get_audit_log
219 get_audit_log().log_event(
220 'scope_violation',
221 actor_id=context.get('actor_id', 'unknown'),
222 action=reason,
223 )
224 except Exception:
225 pass
228# ═══════════════════════════════════════════════════════════════════════
229# Governance Integration — privacy as a constitutional scorer
230# ═══════════════════════════════════════════════════════════════════════
232def score_privacy(context: dict):
233 """Constitutional scorer for the governance pipeline.
235 Evaluates whether a decision respects privacy scopes.
236 Imported and registered by ai_governance.py.
237 """
238 from security.ai_governance import ConstitutionalSignal
240 data = context.get('data', {})
241 destination = context.get('destination_scope', '')
243 if not data or not destination:
244 return ConstitutionalSignal(
245 name='privacy', score=1.0, confidence=0.5,
246 weight=1.5, reasoning='No data/destination to evaluate',
247 )
249 if isinstance(destination, str):
250 try:
251 destination = PrivacyScope(destination)
252 except ValueError:
253 return ConstitutionalSignal(
254 name='privacy', score=0.5, confidence=0.3,
255 weight=1.5, reasoning=f'Unknown scope: {destination}',
256 )
258 guard = get_scope_guard()
259 allowed, reason = guard.check_egress(data, destination, context)
261 if allowed:
262 return ConstitutionalSignal(
263 name='privacy', score=1.0, confidence=0.95,
264 weight=1.5, reasoning=reason,
265 )
267 return ConstitutionalSignal(
268 name='privacy', score=0.02, confidence=1.0,
269 weight=2.0, # Privacy violations are high-weight
270 reasoning=reason,
271 )
274# ═══════════════════════════════════════════════════════════════════════
275# Singleton
276# ═══════════════════════════════════════════════════════════════════════
278_guard = None
281def get_scope_guard() -> ScopeGuard:
282 """Module-level singleton."""
283 global _guard
284 if _guard is None:
285 _guard = ScopeGuard()
286 return _guard