Coverage for security / dlp_engine.py: 98.2%

56 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2DLP Engine — Data Loss Prevention for Outbound Calls 

3 

4Scans text for PII patterns (email, phone, SSN, credit card) and 

5blocks or redacts before outbound API calls or tool invocations. 

6 

7Integrated with MCP sandbox (validate_tool_call) to gate outbound data. 

8 

9Usage: 

10 from security.dlp_engine import get_dlp_engine 

11 

12 dlp = get_dlp_engine() 

13 findings = dlp.scan("Contact john@example.com or 555-123-4567") 

14 clean_text = dlp.redact("SSN is 123-45-6789") 

15 allowed, reason = dlp.check_outbound(text) 

16""" 

17 

18import re 

19import logging 

20from typing import List, Tuple, Optional 

21 

22logger = logging.getLogger('hevolve_security') 

23 

# PII detection patterns, keyed by PII type name.
# Each value is a compiled regex; DLPEngine looks patterns up by these keys.
PII_PATTERNS = {
    # local-part @ domain . TLD (two or more letters).
    # The TLD class is [A-Za-z]: inside a character class '|' is a literal
    # pipe, not alternation, so it must not be listed there.
    'email': re.compile(
        r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    ),
    # 3-3-4 digit number with an optional "1"/"+1" prefix, optional
    # parentheses around the first group, and '-', '.' or whitespace
    # separators (each separator optional).
    'phone': re.compile(
        r'\b(?:\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b'
    ),
    # ddd-dd-dddd, hyphens required.
    'ssn': re.compile(
        r'\b\d{3}-\d{2}-\d{4}\b'
    ),
    # 16 digits as four groups of four, optionally separated by a
    # whitespace character or a hyphen. No checksum validation is performed.
    'credit_card': re.compile(
        r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'
    ),
    # Four dot-separated groups of 1-3 digits. Octet range (0-255) is not
    # validated, so values such as 999.1.1.1 also match.
    'ip_address': re.compile(
        r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
    ),
}

42 

43# Redaction replacements 

44_REDACT_MAP = { 

45 'email': '[EMAIL_REDACTED]', 

46 'phone': '[PHONE_REDACTED]', 

47 'ssn': '[SSN_REDACTED]', 

48 'credit_card': '[CC_REDACTED]', 

49 'ip_address': '[IP_REDACTED]', 

50} 

51 

52# Known safe patterns (don't flag these) 

53_SAFE_PATTERNS = { 

54 'ip_address': frozenset({ 

55 '127.0.0.1', '0.0.0.0', '255.255.255.255', 

56 '192.168.0.1', '10.0.0.1', '172.16.0.1', 

57 }), 

58} 

59 

60 

class DLPEngine:
    """
    Data Loss Prevention engine.

    Scans, redacts, and gates PII in text before outbound transmission.
    Detection uses the module-level ``PII_PATTERNS`` regexes. Values listed
    in ``_SAFE_PATTERNS`` are treated as non-PII by both ``scan`` and
    ``redact``.
    """

    def __init__(self, enabled: bool = True,
                 block_on_pii: bool = True,
                 scan_types: Optional[List[str]] = None):
        """
        Args:
            enabled: Master switch for DLP scanning. When False, ``scan``
                returns no findings and ``redact`` returns text unchanged.
            block_on_pii: If True, check_outbound blocks on PII. If False, only logs.
            scan_types: Which PII types to scan for (default: all keys of
                ``PII_PATTERNS``). An empty list also selects the default.
                Names without a matching pattern are ignored.
        """
        self.enabled = enabled
        self.block_on_pii = block_on_pii
        self.scan_types = scan_types or list(PII_PATTERNS.keys())

    @staticmethod
    def _keep_safe(replacement, safe_values):
        """Build a ``re.sub`` callback that keeps safe values and redacts the rest."""
        def _replace(match):
            value = match.group()
            return value if value in safe_values else replacement
        return _replace

    def scan(self, text: str) -> List[Tuple[str, str]]:
        """
        Scan text for PII patterns.

        Matches whose exact text appears in ``_SAFE_PATTERNS`` for that type
        are skipped. A warning is logged when anything is found.

        Returns:
            List of (pii_type, matched_text) tuples, grouped in
            ``scan_types`` order; empty when disabled or text is empty.
        """
        if not self.enabled or not text:
            return []

        findings: List[Tuple[str, str]] = []
        for pii_type in self.scan_types:
            pattern = PII_PATTERNS.get(pii_type)
            if pattern is None:
                # Unknown type name: nothing to scan with.
                continue
            # Looked up once per type rather than once per match.
            safe_values = _SAFE_PATTERNS.get(pii_type, frozenset())
            for match in pattern.finditer(text):
                value = match.group()
                if value in safe_values:
                    continue
                findings.append((pii_type, value))

        if findings:
            types_found = {found_type for found_type, _ in findings}
            logger.warning("DLP: found %d PII items (%s)",
                           len(findings), types_found)

        return findings

    def redact(self, text: str) -> str:
        """
        Redact all PII from text.

        Patterns are applied one after another in ``scan_types`` order.
        Known-safe values (``_SAFE_PATTERNS``) are left untouched so that
        redaction agrees with what ``scan`` reports.

        Returns:
            Text with PII replaced by type-specific placeholders
            (``'[REDACTED]'`` for a type with no entry in ``_REDACT_MAP``).
            The input is returned unchanged when disabled or empty.
        """
        if not self.enabled or not text:
            return text

        result = text
        for pii_type in self.scan_types:
            pattern = PII_PATTERNS.get(pii_type)
            if pattern is None:
                continue
            replacement = _REDACT_MAP.get(pii_type, '[REDACTED]')
            safe_values = _SAFE_PATTERNS.get(pii_type)
            if safe_values:
                # A callback is needed to decide per match whether to redact.
                result = pattern.sub(
                    self._keep_safe(replacement, safe_values), result)
            else:
                result = pattern.sub(replacement, result)
        return result

    def check_outbound(self, text: str) -> Tuple[bool, str]:
        """
        Gate function for outbound data.

        Returns:
            (allowed, reason)
            - (True, '') if no PII found
            - (False, reason) if PII found and block_on_pii is True
            - (True, warning) if PII found but block_on_pii is False (log-only mode)
        """
        findings = self.scan(text)
        if not findings:
            return True, ''

        # Sorted so the reason string is stable regardless of match order.
        types_found = sorted({found_type for found_type, _ in findings})
        reason = f"PII detected: {', '.join(types_found)} ({len(findings)} items)"

        if self.block_on_pii:
            logger.warning("DLP BLOCKED outbound: %s", reason)
            return False, reason

        logger.info("DLP WARNING (non-blocking): %s", reason)
        return True, reason

150 

151 

# Shared DLPEngine instance; stays None until get_dlp_engine() is first called.
_dlp_engine: Optional["DLPEngine"] = None


def get_dlp_engine() -> DLPEngine:
    """Return the shared DLPEngine, creating it with default settings on first use."""
    global _dlp_engine
    if _dlp_engine is not None:
        return _dlp_engine
    _dlp_engine = DLPEngine()
    return _dlp_engine

160 return _dlp_engine