# Coverage for security/dlp_engine.py: 98.2% (56 statements)
# Report generated by coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
DLP Engine — Data Loss Prevention for Outbound Calls

Scans text for PII patterns (email, phone, SSN, credit card, IP address) and
blocks or redacts them before outbound API calls or tool invocations.

Integrated with MCP sandbox (validate_tool_call) to gate outbound data.

Usage:
    from security.dlp_engine import get_dlp_engine

    dlp = get_dlp_engine()
    findings = dlp.scan("Contact john@example.com or 555-123-4567")
    clean_text = dlp.redact("SSN is 123-45-6789")
    allowed, reason = dlp.check_outbound(text)
"""

import logging
import re
from typing import List, Optional, Tuple

# Module-level logger, registered under the 'hevolve_security' name.
logger = logging.getLogger('hevolve_security')

# PII detection patterns, keyed by PII type name.
PII_PATTERNS = {
    'email': re.compile(
        # The TLD class is [A-Za-z]: inside a character class '|' is a
        # literal pipe, not alternation, so it must not be listed here
        # (otherwise "a@b.com|c@d.org" is swallowed as one address).
        r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    ),
    'phone': re.compile(
        # Starts with (?<!\w) rather than \b: a word boundary cannot sit in
        # front of '+' or '(' after whitespace, which made the match drop
        # the leading '+' / '(' of "+1 555-123-4567" and "(555) 123-4567".
        r'(?<!\w)(?:\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b'
    ),
    'ssn': re.compile(
        r'\b\d{3}-\d{2}-\d{4}\b'
    ),
    'credit_card': re.compile(
        # Four groups of four digits, optionally separated by space or '-'.
        r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'
    ),
    'ip_address': re.compile(
        # Dotted quad; octet values are not range-checked.
        r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
    ),
}

43# Redaction replacements
44_REDACT_MAP = {
45 'email': '[EMAIL_REDACTED]',
46 'phone': '[PHONE_REDACTED]',
47 'ssn': '[SSN_REDACTED]',
48 'credit_card': '[CC_REDACTED]',
49 'ip_address': '[IP_REDACTED]',
50}
52# Known safe patterns (don't flag these)
53_SAFE_PATTERNS = {
54 'ip_address': frozenset({
55 '127.0.0.1', '0.0.0.0', '255.255.255.255',
56 '192.168.0.1', '10.0.0.1', '172.16.0.1',
57 }),
58}
class DLPEngine:
    """
    Data Loss Prevention engine.

    Scans, redacts, and gates PII in text before outbound transmission.
    Detection uses the module-level PII_PATTERNS; values listed in
    _SAFE_PATTERNS are never treated as PII, neither by scan() nor by
    redact().
    """

    def __init__(self, enabled: bool = True,
                 block_on_pii: bool = True,
                 scan_types: Optional[List[str]] = None):
        """
        Args:
            enabled: Master switch for DLP scanning
            block_on_pii: If True, check_outbound blocks on PII. If False, only logs.
            scan_types: Which PII types to scan for (default: all). A falsy
                value (None or an empty list) selects every PII_PATTERNS key.
        """
        self.enabled = enabled
        self.block_on_pii = block_on_pii
        self.scan_types = scan_types or list(PII_PATTERNS.keys())

    @staticmethod
    def _make_replacer(replacement: str, safe_values):
        """Build a re.sub callback that keeps safe values and redacts the rest."""
        def _replace(match):
            value = match.group()
            return value if value in safe_values else replacement
        return _replace

    def scan(self, text: str) -> List[Tuple[str, str]]:
        """
        Scan text for PII patterns.

        Unknown entries in scan_types (no pattern registered) are skipped,
        as are matches whose text is a known-safe value for their type.

        Returns:
            List of (pii_type, matched_text) tuples; empty when the engine
            is disabled or the text is empty.
        """
        if not self.enabled or not text:
            return []

        findings = []
        for pii_type in self.scan_types:
            pattern = PII_PATTERNS.get(pii_type)
            if not pattern:
                continue
            # Looked up once per type rather than once per match.
            safe_values = _SAFE_PATTERNS.get(pii_type, frozenset())
            for match in pattern.finditer(text):
                value = match.group()
                # Skip known-safe values
                if value in safe_values:
                    continue
                findings.append((pii_type, value))

        if findings:
            types_found = set(f[0] for f in findings)
            logger.warning(f"DLP: found {len(findings)} PII items ({types_found})")

        return findings

    def redact(self, text: str) -> str:
        """
        Redact all PII from text.

        Patterns are applied one type at a time, in scan_types order.
        Known-safe values (the ones scan() does not report) are left
        untouched so that redact() and scan() agree on what counts as PII.

        Returns:
            Text with PII replaced by type-specific placeholders; the input
            is returned unchanged when the engine is disabled or the text
            is empty.
        """
        if not self.enabled or not text:
            return text

        result = text
        for pii_type in self.scan_types:
            pattern = PII_PATTERNS.get(pii_type)
            if not pattern:
                continue
            replacement = _REDACT_MAP.get(pii_type, '[REDACTED]')
            safe_values = _SAFE_PATTERNS.get(pii_type)
            if safe_values:
                # Per-match decision: keep safe values, redact everything else.
                result = pattern.sub(
                    self._make_replacer(replacement, safe_values), result
                )
            else:
                result = pattern.sub(replacement, result)
        return result

    def check_outbound(self, text: str) -> Tuple[bool, str]:
        """
        Gate function for outbound data.

        Returns:
            (allowed, reason)
            - (True, '') if no PII found
            - (False, reason) if PII found and block_on_pii is True
            - (True, warning) if PII found but block_on_pii is False (log-only mode)
        """
        findings = self.scan(text)
        if not findings:
            return True, ''

        # Sorted so the reason string is deterministic for a given input.
        types_found = sorted(set(f[0] for f in findings))
        reason = f"PII detected: {', '.join(types_found)} ({len(findings)} items)"

        if self.block_on_pii:
            logger.warning(f"DLP BLOCKED outbound: {reason}")
            return False, reason

        logger.info(f"DLP WARNING (non-blocking): {reason}")
        return True, reason


# Singleton: created lazily by get_dlp_engine() on first use.
_dlp_engine = None


def get_dlp_engine() -> DLPEngine:
    """
    Return the shared DLPEngine instance.

    The engine is built with default settings on the first call and the
    same object is returned on every later call.
    """
    global _dlp_engine
    if _dlp_engine is None:
        _dlp_engine = DLPEngine()
    return _dlp_engine