Coverage for integrations/channels/queue/dedupe.py: 94.5%
238 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Message Deduplication System

Prevents duplicate messages from being processed multiple times.
Ported from HevolveBot's src/infra/dedupe.ts and src/auto-reply/reply/inbound-dedupe.ts.

Features:
- Multiple deduplication modes (ID, content hash, combined)
- TTL-based expiration
- Thread-safe operation
- Statistics tracking
"""
14from __future__ import annotations
16import hashlib
17import logging
18import threading
19import time
20from collections import OrderedDict
21from dataclasses import dataclass, field
22from datetime import datetime, timedelta
23from enum import Enum
24from typing import Optional, Dict, Any, Callable, TypeVar, Generic
# Logger for this module, named after the module itself.
logger = logging.getLogger(__name__)

# Type variable standing for the message items handed to MessageDeduplicator.
T = TypeVar("T")
class DedupeMode(Enum):
    """Strategy used to decide whether a message was already seen."""

    # Compare a hash of the whitespace-normalized message text.
    CONTENT_HASH = "content"
    # Compare the platform-assigned message identifier.
    MESSAGE_ID = "id"
    # Report a duplicate when either the ID or the content matches.
    COMBINED = "combined"
    # Deduplication disabled: nothing is ever reported as a duplicate.
    NONE = "none"
@dataclass
class DedupeConfig:
    """Tunable settings consumed by MessageDeduplicator.

    A ``ttl_seconds`` value <= 0 disables expiry. Once more than
    ``max_entries`` entries are cached, the least-recently-seen ones are
    evicted first. ``hash_algorithm`` selects "sha256" (also the fallback
    for unrecognized names) or "md5".
    """

    mode: DedupeMode = DedupeMode.COMBINED  # matching strategy to apply
    ttl_seconds: int = 300  # entry lifetime, measured from its last sighting
    max_entries: int = 10000  # cap on the number of cached entries
    hash_algorithm: str = "sha256"  # "sha256" or "md5"
@dataclass
class DedupeStats:
    """Counters describing how a deduplicator has been used."""

    total_checked: int = 0  # duplicate checks performed
    total_duplicates: int = 0  # checks that matched a live entry
    total_unique: int = 0  # checks that found no live match
    total_expired: int = 0  # entries dropped because their TTL elapsed
    current_entries: int = 0  # cache size when the snapshot was taken

    @property
    def duplicate_rate(self) -> float:
        """Share of checks that were duplicates, as a percentage (0-100).

        Returns 0.0 when no checks have been performed yet.
        """
        checked = self.total_checked
        if not checked:
            return 0.0
        fraction = self.total_duplicates / checked
        return fraction * 100
@dataclass
class DedupeEntry:
    """A cached record of one message that has been marked as seen."""

    hash_value: str  # combined key under which the entry is cached
    message_id: Optional[str]  # platform message ID, if one was supplied
    content_hash: Optional[str]  # normalized-content hash, if content was supplied
    first_seen: datetime  # time the entry was created
    last_seen: datetime  # most recent sighting; expiry is measured from here
    count: int = 1  # number of sightings, including the first
class MessageDeduplicator(Generic[T]):
    """
    Deduplicates messages based on configurable criteria.

    Supports multiple modes:
    - CONTENT_HASH: Dedupe by hashing message content (whitespace-normalized)
    - MESSAGE_ID: Dedupe by platform message ID
    - COMBINED: Dedupe if either content or ID matches
    - NONE: No deduplication

    All cache access happens under an internal lock. ``id_func`` and
    ``content_func`` are invoked before the lock is taken.

    Usage:
        config = DedupeConfig(mode=DedupeMode.COMBINED, ttl_seconds=300)
        deduper = MessageDeduplicator(config)

        # Check and record in a single atomic step
        if deduper.check_and_mark(message, id_func=lambda m: m.id, content_func=lambda m: m.text):
            # Skip duplicate
            return

    Calling ``is_duplicate`` and then ``mark_seen`` separately also works,
    but another thread may mark the same message in between the two calls;
    prefer ``check_and_mark`` when the instance is shared between threads.
    """

    def __init__(self, config: DedupeConfig):
        self.config = config
        # Entries keyed by combined hash, ordered least- to most-recently seen
        # (every hit calls move_to_end), so eviction pops from the front.
        self._entries: OrderedDict[str, DedupeEntry] = OrderedDict()
        # Lookup indexes: message ID / content hash -> combined hash of the
        # entry that most recently recorded that key.
        self._id_to_hash: Dict[str, str] = {}
        self._content_to_hash: Dict[str, str] = {}
        self._lock = threading.Lock()
        self._stats = DedupeStats()

    # ------------------------------------------------------------------
    # Hashing and input extraction (touch no shared state; run unlocked)
    # ------------------------------------------------------------------

    def _compute_hash(self, content: str) -> str:
        """Hash ``content`` with ``config.hash_algorithm``.

        "md5" yields the full 32-hex-character digest; "sha256" and any
        unrecognized name yield sha256 truncated to 32 hex characters.
        """
        if self.config.hash_algorithm == "md5":
            return hashlib.md5(content.encode()).hexdigest()
        return hashlib.sha256(content.encode()).hexdigest()[:32]

    def _compute_content_hash(self, content: str) -> str:
        """Hash ``content`` after collapsing every whitespace run to one space."""
        # Normalize whitespace for more robust matching
        normalized = ' '.join(content.split())
        return self._compute_hash(normalized)

    def _combined_key(self, msg_id: Optional[str], content_hash: Optional[str]) -> str:
        """Return the cache key for an (ID, content hash) pair, or "" if both are empty."""
        hash_parts = []
        if msg_id:
            hash_parts.append(f"id:{msg_id}")
        if content_hash:
            hash_parts.append(f"content:{content_hash}")
        if not hash_parts:
            return ""
        return self._compute_hash("|".join(hash_parts))

    @staticmethod
    def _extract(
        item: T,
        message_id: Optional[str],
        content: Optional[str],
        id_func: Optional[Callable[[T], Optional[str]]],
        content_func: Optional[Callable[[T], Optional[str]]],
    ) -> tuple[Optional[str], Optional[str]]:
        """Resolve (message ID, content), preferring explicit values over the extractor functions."""
        msg_id = message_id
        if msg_id is None and id_func is not None:
            msg_id = id_func(item)
        msg_content = content
        if msg_content is None and content_func is not None:
            msg_content = content_func(item)
        return msg_id, msg_content

    # ------------------------------------------------------------------
    # Cache maintenance (caller must hold self._lock)
    # ------------------------------------------------------------------

    def _is_expired(self, entry: DedupeEntry) -> bool:
        """Return True if ``entry`` was last seen more than ``ttl_seconds`` ago.

        A non-positive ``ttl_seconds`` disables expiry entirely.
        """
        if self.config.ttl_seconds <= 0:
            return False
        age = (datetime.now() - entry.last_seen).total_seconds()
        return age > self.config.ttl_seconds

    def _unindex(self, hash_val: str, entry: DedupeEntry) -> None:
        """Drop lookup-index keys that still point at the removed entry ``hash_val``.

        An index key may since have been re-pointed at a newer entry (same
        message ID with different content, or same content with a different
        ID). Deleting it unconditionally would make that still-live entry
        unfindable and let its duplicates through.
        """
        if entry.message_id and self._id_to_hash.get(entry.message_id) == hash_val:
            del self._id_to_hash[entry.message_id]
        if entry.content_hash and self._content_to_hash.get(entry.content_hash) == hash_val:
            del self._content_to_hash[entry.content_hash]

    def _cleanup_expired(self) -> int:
        """Remove expired entries and return how many were removed."""
        if self.config.ttl_seconds <= 0:
            return 0

        expired_hashes = [h for h, e in self._entries.items() if self._is_expired(e)]
        for hash_val in expired_hashes:
            self._unindex(hash_val, self._entries.pop(hash_val))

        self._stats.total_expired += len(expired_hashes)
        return len(expired_hashes)

    def _enforce_max_entries(self) -> int:
        """Evict least-recently-seen entries until at most ``max_entries`` remain.

        Returns the number of entries evicted.
        """
        removed = 0
        # The emptiness check stops a negative max_entries from calling
        # popitem() on an empty OrderedDict (which raises KeyError).
        while self._entries and len(self._entries) > self.config.max_entries:
            hash_val, entry = self._entries.popitem(last=False)
            self._unindex(hash_val, entry)
            removed += 1
        return removed

    def _touch_if_live(self, hash_val: Optional[str]) -> bool:
        """If ``hash_val`` names a cached, unexpired entry, record a sighting and return True."""
        if hash_val is None:
            return False
        entry = self._entries.get(hash_val)
        if entry is None or self._is_expired(entry):
            return False
        entry.last_seen = datetime.now()
        entry.count += 1
        # Move to end (most recently used)
        self._entries.move_to_end(hash_val)
        return True

    def _check_locked(self, msg_id: Optional[str], content_hash: Optional[str]) -> bool:
        """Duplicate check plus statistics update; caller holds the lock."""
        self._stats.total_checked += 1
        self._cleanup_expired()

        mode = self.config.mode
        if mode in (DedupeMode.MESSAGE_ID, DedupeMode.COMBINED) and msg_id:
            if self._touch_if_live(self._id_to_hash.get(msg_id)):
                self._stats.total_duplicates += 1
                return True

        if mode in (DedupeMode.CONTENT_HASH, DedupeMode.COMBINED) and content_hash:
            if self._touch_if_live(self._content_to_hash.get(content_hash)):
                self._stats.total_duplicates += 1
                return True

        self._stats.total_unique += 1
        return False

    def _mark_locked(
        self,
        combined_hash: str,
        msg_id: Optional[str],
        content_hash: Optional[str],
    ) -> None:
        """Create or refresh the entry for ``combined_hash``; caller holds the lock."""
        now = datetime.now()
        entry = self._entries.get(combined_hash)
        if entry is not None:
            entry.last_seen = now
            entry.count += 1
            self._entries.move_to_end(combined_hash)
        else:
            self._entries[combined_hash] = DedupeEntry(
                hash_value=combined_hash,
                message_id=msg_id,
                content_hash=content_hash,
                first_seen=now,
                last_seen=now,
            )

        # Point the lookup indexes at this (now most recent) entry.
        if msg_id:
            self._id_to_hash[msg_id] = combined_hash
        if content_hash:
            self._content_to_hash[content_hash] = combined_hash

        self._enforce_max_entries()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def is_duplicate(
        self,
        item: T,
        message_id: Optional[str] = None,
        content: Optional[str] = None,
        id_func: Optional[Callable[[T], Optional[str]]] = None,
        content_func: Optional[Callable[[T], Optional[str]]] = None,
    ) -> bool:
        """
        Check if a message is a duplicate (does not record it).

        Args:
            item: The message item
            message_id: Optional direct message ID (takes precedence over id_func)
            content: Optional direct content (takes precedence over content_func)
            id_func: Function to extract message ID from item
            content_func: Function to extract content from item

        Returns:
            True if an unexpired entry matches, False otherwise. A match
            refreshes that entry's last-seen time.
        """
        if self.config.mode == DedupeMode.NONE:
            return False

        msg_id, msg_content = self._extract(item, message_id, content, id_func, content_func)
        content_hash = None
        if msg_content and self.config.mode in (DedupeMode.CONTENT_HASH, DedupeMode.COMBINED):
            content_hash = self._compute_content_hash(msg_content)

        with self._lock:
            return self._check_locked(msg_id, content_hash)

    def mark_seen(
        self,
        item: T,
        message_id: Optional[str] = None,
        content: Optional[str] = None,
        id_func: Optional[Callable[[T], Optional[str]]] = None,
        content_func: Optional[Callable[[T], Optional[str]]] = None,
    ) -> str:
        """
        Mark a message as seen.

        Args:
            item: The message item
            message_id: Optional direct message ID (takes precedence over id_func)
            content: Optional direct content (takes precedence over content_func)
            id_func: Function to extract message ID from item
            content_func: Function to extract content from item

        Returns:
            The hash key for this message, or "" if the mode is NONE or the
            message has neither an ID nor content.
        """
        if self.config.mode == DedupeMode.NONE:
            return ""

        msg_id, msg_content = self._extract(item, message_id, content, id_func, content_func)
        content_hash = self._compute_content_hash(msg_content) if msg_content else None
        combined_hash = self._combined_key(msg_id, content_hash)
        if not combined_hash:
            return ""

        with self._lock:
            self._mark_locked(combined_hash, msg_id, content_hash)
        return combined_hash

    def check_and_mark(
        self,
        item: T,
        message_id: Optional[str] = None,
        content: Optional[str] = None,
        id_func: Optional[Callable[[T], Optional[str]]] = None,
        content_func: Optional[Callable[[T], Optional[str]]] = None,
    ) -> bool:
        """
        Check if duplicate and mark as seen in one atomic operation.

        The check and the mark happen under a single lock acquisition, so
        two threads racing on the same message cannot both treat it as new.
        The extractor functions are called at most once each.

        Returns:
            True if duplicate (already seen), False if new (and now marked)
        """
        if self.config.mode == DedupeMode.NONE:
            return False

        msg_id, msg_content = self._extract(item, message_id, content, id_func, content_func)
        content_hash = self._compute_content_hash(msg_content) if msg_content else None
        combined_hash = self._combined_key(msg_id, content_hash)

        with self._lock:
            if self._check_locked(msg_id, content_hash):
                return True
            # A message with neither ID nor content cannot be recorded.
            if combined_hash:
                self._mark_locked(combined_hash, msg_id, content_hash)
        return False

    def cleanup_expired(self) -> int:
        """
        Manually trigger cleanup of expired entries.

        Returns:
            Number of entries removed
        """
        with self._lock:
            return self._cleanup_expired()

    def clear(self) -> int:
        """
        Clear all entries and lookup indexes (statistics counters are kept).

        Returns:
            Number of entries cleared
        """
        with self._lock:
            count = len(self._entries)
            self._entries.clear()
            self._id_to_hash.clear()
            self._content_to_hash.clear()
            self._stats.current_entries = 0
            return count

    def get_stats(self) -> DedupeStats:
        """Return a snapshot copy of the deduplication statistics."""
        with self._lock:
            self._stats.current_entries = len(self._entries)
            return DedupeStats(
                total_checked=self._stats.total_checked,
                total_duplicates=self._stats.total_duplicates,
                total_unique=self._stats.total_unique,
                total_expired=self._stats.total_expired,
                current_entries=self._stats.current_entries,
            )

    def get_entry_count(self) -> int:
        """Get current number of entries."""
        with self._lock:
            return len(self._entries)
class SimpleDeduplicator:
    """
    Simple string-based deduplicator.

    For simpler use cases where you just have string keys. Keys are kept in
    least- to most-recently-seen order. A key expires ``ttl_seconds`` after it
    was last seen (``ttl_seconds`` <= 0 disables expiry). Once more than
    ``max_entries`` keys are stored, the least-recently-seen keys are evicted.
    All public methods hold an internal lock.
    """

    def __init__(
        self,
        ttl_seconds: int = 300,
        max_entries: int = 10000,
    ):
        self.ttl_seconds = ttl_seconds
        self.max_entries = max_entries
        # key -> time it was last seen, oldest first
        self._seen: OrderedDict[str, datetime] = OrderedDict()
        self._lock = threading.Lock()

    def is_duplicate(self, key: str) -> bool:
        """Return True if ``key`` was seen within the TTL window.

        A hit refreshes the key's timestamp. A miss does not record the key;
        use ``mark_seen`` or ``check_and_mark`` for that.
        """
        with self._lock:
            self._cleanup()
            return self._refresh_if_seen(key)

    def mark_seen(self, key: str) -> None:
        """Record ``key`` as seen now, evicting the oldest keys if over capacity."""
        with self._lock:
            self._seen[key] = datetime.now()
            self._seen.move_to_end(key)
            self._enforce_max()

    def check_and_mark(self, key: str) -> bool:
        """Check if duplicate and mark in one operation.

        Returns:
            True if ``key`` was already seen (its timestamp is refreshed),
            False if it is new (and is now recorded).
        """
        with self._lock:
            self._cleanup()
            if self._refresh_if_seen(key):
                return True
            # New key: insertion places it at the most-recent end.
            self._seen[key] = datetime.now()
            self._enforce_max()
            return False

    def _refresh_if_seen(self, key: str) -> bool:
        """If ``key`` is stored, stamp it with the current time, move it to the end and return True."""
        if key not in self._seen:
            return False
        self._seen[key] = datetime.now()
        self._seen.move_to_end(key)
        return True

    def _cleanup(self) -> None:
        """Remove keys last seen more than ``ttl_seconds`` ago (no-op if ttl <= 0)."""
        if self.ttl_seconds <= 0:
            return

        cutoff = datetime.now() - timedelta(seconds=self.ttl_seconds)
        expired = [k for k, v in self._seen.items() if v < cutoff]
        for k in expired:
            del self._seen[k]

    def _enforce_max(self) -> None:
        """Evict the least-recently-seen keys until at most ``max_entries`` remain."""
        # The emptiness check stops a negative max_entries from calling
        # popitem() on an empty OrderedDict (which raises KeyError).
        while self._seen and len(self._seen) > self.max_entries:
            self._seen.popitem(last=False)

    def clear(self) -> int:
        """Remove all keys and return how many were removed."""
        with self._lock:
            count = len(self._seen)
            self._seen.clear()
            return count

    def get_count(self) -> int:
        """Get current entry count."""
        with self._lock:
            return len(self._seen)