Coverage for integrations / channels / queue / dedupe.py: 94.5%

238 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Message Deduplication System 

3 

4Prevents duplicate messages from being processed multiple times. 

5Ported from HevolveBot's src/infra/dedupe.ts and src/auto-reply/reply/inbound-dedupe.ts. 

6 

7Features: 

8- Multiple deduplication modes (ID, content hash, combined) 

9- TTL-based expiration 

10- Thread-safe operation 

11- Statistics tracking 

12""" 

13 

14from __future__ import annotations 

15 

16import hashlib 

17import logging 

18import threading 

19import time 

20from collections import OrderedDict 

21from dataclasses import dataclass, field 

22from datetime import datetime, timedelta 

23from enum import Enum 

24from typing import Optional, Dict, Any, Callable, TypeVar, Generic 

25 

26logger = logging.getLogger(__name__) 

27 

28T = TypeVar('T') 

29 

30 

class DedupeMode(Enum):
    """Strategy used to decide whether two messages are the same."""

    # Match on a hash of the (whitespace-normalized) message content.
    CONTENT_HASH = "content"
    # Match on the message ID assigned by the platform.
    MESSAGE_ID = "id"
    # Match when either the message ID or the content hash was seen before.
    COMBINED = "combined"
    # Deduplication disabled: nothing is ever reported as a duplicate.
    NONE = "none"

37 

38 

@dataclass
class DedupeConfig:
    """Tunable settings consumed by ``MessageDeduplicator``."""

    # Which matching strategy to apply (see ``DedupeMode``).
    mode: DedupeMode = DedupeMode.COMBINED
    # Seconds an entry stays valid after it was last seen; a value <= 0
    # disables time-based expiry.
    ttl_seconds: int = 300
    # Upper bound on stored entries; the oldest ones are evicted beyond it.
    max_entries: int = 10000
    # "md5" selects MD5; any other value results in SHA-256 truncated to
    # 32 hex characters.
    hash_algorithm: str = "sha256"

46 

47 

@dataclass
class DedupeStats:
    """Counters describing the activity of a deduplicator."""

    # Number of duplicate checks performed.
    total_checked: int = 0
    # Checks that matched an already-seen message.
    total_duplicates: int = 0
    # Checks that did not match anything.
    total_unique: int = 0
    # Entries dropped because their TTL elapsed.
    total_expired: int = 0
    # Entries held in the cache when the snapshot was taken.
    current_entries: int = 0

    @property
    def duplicate_rate(self) -> float:
        """Return the share of checks that were duplicates, as a percentage.

        Yields ``0.0`` when nothing has been checked yet, which avoids a
        division by zero.
        """
        checked = self.total_checked
        return 0.0 if checked == 0 else (self.total_duplicates / checked) * 100

63 

64 

@dataclass
class DedupeEntry:
    """One remembered message inside the deduplication cache."""

    # Key under which the entry is stored (hash of the ID/content parts).
    hash_value: str
    # Platform message ID, if one was supplied when the message was marked.
    message_id: Optional[str]
    # Hash of the normalized content, if content was supplied.
    content_hash: Optional[str]
    # Time the entry was created.
    first_seen: datetime
    # Time of the most recent sighting; TTL expiry is measured from here.
    last_seen: datetime
    # How many times the message has been observed, the first time included.
    count: int = 1

74 

75 

class MessageDeduplicator(Generic[T]):
    """
    Deduplicates messages based on configurable criteria.

    Supports multiple modes:
    - CONTENT_HASH: Dedupe by hashing (whitespace-normalized) message content
    - MESSAGE_ID: Dedupe by platform message ID
    - COMBINED: Dedupe if either content or ID matches
    - NONE: No deduplication

    Entries expire ``config.ttl_seconds`` after they were last seen (a value
    <= 0 disables expiry), and the least recently seen entries are evicted
    once more than ``config.max_entries`` are stored. Every public method
    guards the shared state with an internal lock.

    Usage:
        config = DedupeConfig(mode=DedupeMode.COMBINED, ttl_seconds=300)
        deduper = MessageDeduplicator(config)

        # Check if duplicate
        if deduper.is_duplicate(message, id_func=lambda m: m.id, content_func=lambda m: m.text):
            # Skip duplicate
            return

        # Mark as seen
        deduper.mark_seen(message, id_func=lambda m: m.id, content_func=lambda m: m.text)

        # Or do both atomically (preferred when several threads feed the
        # same deduplicator):
        if deduper.check_and_mark(message, id_func=lambda m: m.id, content_func=lambda m: m.text):
            return
    """

    def __init__(self, config: DedupeConfig):
        self.config = config
        # Entries keyed by combined hash, ordered from least to most
        # recently seen so eviction can pop from the front.
        self._entries: OrderedDict[str, DedupeEntry] = OrderedDict()
        # Secondary indexes: message ID / content hash -> entry key.
        self._id_to_hash: Dict[str, str] = {}
        self._content_to_hash: Dict[str, str] = {}
        self._lock = threading.Lock()
        self._stats = DedupeStats()

    # ------------------------------------------------------------------
    # Hashing helpers
    # ------------------------------------------------------------------

    def _compute_hash(self, content: str) -> str:
        """Compute hash of content.

        ``"md5"`` yields a full MD5 hex digest; every other configured
        algorithm name falls back to SHA-256 truncated to 32 hex characters.
        """
        data = content.encode()
        if self.config.hash_algorithm == "md5":
            return hashlib.md5(data).hexdigest()
        return hashlib.sha256(data).hexdigest()[:32]

    def _compute_content_hash(self, content: str) -> str:
        """Compute normalized content hash."""
        # Collapse whitespace runs so formatting-only differences still match.
        normalized = ' '.join(content.split())
        return self._compute_hash(normalized)

    def _build_key(self, msg_id: Optional[str], content_hash: Optional[str]) -> str:
        """Return the entry key for an ID/content-hash pair, or "" if both are empty."""
        hash_parts = []
        if msg_id:
            hash_parts.append(f"id:{msg_id}")
        if content_hash:
            hash_parts.append(f"content:{content_hash}")
        if not hash_parts:
            return ""
        return self._compute_hash("|".join(hash_parts))

    @staticmethod
    def _resolve(item, message_id, content, id_func, content_func):
        """Pick the explicit ID/content, falling back to the extractor callables."""
        msg_id = message_id
        if msg_id is None and id_func is not None:
            msg_id = id_func(item)

        msg_content = content
        if msg_content is None and content_func is not None:
            msg_content = content_func(item)

        return msg_id, msg_content

    # ------------------------------------------------------------------
    # Cache maintenance (callers must hold ``self._lock``)
    # ------------------------------------------------------------------

    def _is_expired(self, entry: DedupeEntry) -> bool:
        """Check if an entry is expired."""
        if self.config.ttl_seconds <= 0:
            return False
        age = (datetime.now() - entry.last_seen).total_seconds()
        return age > self.config.ttl_seconds

    def _unlink_indexes(self, hash_val: str, entry: DedupeEntry) -> None:
        """Drop the index rows of a removed entry.

        A row is only deleted while it still points at ``hash_val``. A newer
        entry sharing the same ID or content may have taken the row over, and
        deleting it then would make that live entry impossible to find.
        """
        if entry.message_id and self._id_to_hash.get(entry.message_id) == hash_val:
            del self._id_to_hash[entry.message_id]
        if entry.content_hash and self._content_to_hash.get(entry.content_hash) == hash_val:
            del self._content_to_hash[entry.content_hash]

    def _cleanup_expired(self) -> int:
        """Remove expired entries and return how many were removed."""
        if self.config.ttl_seconds <= 0:
            return 0

        # Collect first: the mapping must not change size while iterating.
        expired_hashes = [
            hash_val
            for hash_val, entry in self._entries.items()
            if self._is_expired(entry)
        ]

        expired_count = 0
        for hash_val in expired_hashes:
            entry = self._entries.pop(hash_val, None)
            if entry is not None:
                expired_count += 1
                self._unlink_indexes(hash_val, entry)

        self._stats.total_expired += expired_count
        return expired_count

    def _enforce_max_entries(self) -> int:
        """Remove oldest entries if over limit and return how many were removed."""
        removed = 0
        while len(self._entries) > self.config.max_entries:
            # The front of the ordered mapping is the least recently seen entry.
            hash_val, entry = self._entries.popitem(last=False)
            self._unlink_indexes(hash_val, entry)
            removed += 1
        return removed

    def _touch(self, hash_val: Optional[str]) -> bool:
        """Refresh the live entry stored under ``hash_val``.

        Returns True (and counts a duplicate) when such an entry exists and
        has not expired; otherwise returns False and changes nothing.
        """
        if hash_val is None:
            return False
        entry = self._entries.get(hash_val)
        if entry is None or self._is_expired(entry):
            return False

        entry.last_seen = datetime.now()
        entry.count += 1
        # Move to end (most recently used)
        self._entries.move_to_end(hash_val)
        self._stats.total_duplicates += 1
        return True

    def _is_duplicate_locked(self, msg_id: Optional[str], content_hash: Optional[str]) -> bool:
        """Run the duplicate check; the active mode decides which keys are consulted."""
        self._stats.total_checked += 1
        self._cleanup_expired()

        mode = self.config.mode

        # Check by message ID
        if mode in (DedupeMode.MESSAGE_ID, DedupeMode.COMBINED) and msg_id:
            if self._touch(self._id_to_hash.get(msg_id)):
                return True

        # Check by content hash
        if mode in (DedupeMode.CONTENT_HASH, DedupeMode.COMBINED) and content_hash:
            if self._touch(self._content_to_hash.get(content_hash)):
                return True

        self._stats.total_unique += 1
        return False

    def _mark_seen_locked(
        self,
        combined_hash: str,
        msg_id: Optional[str],
        content_hash: Optional[str],
    ) -> None:
        """Create or refresh the entry for ``combined_hash`` and update the indexes."""
        now = datetime.now()

        entry = self._entries.get(combined_hash)
        if entry is None:
            self._entries[combined_hash] = DedupeEntry(
                hash_value=combined_hash,
                message_id=msg_id,
                content_hash=content_hash,
                first_seen=now,
                last_seen=now,
            )
        else:
            entry.last_seen = now
            entry.count += 1
            self._entries.move_to_end(combined_hash)

        # Update lookup indexes
        if msg_id:
            self._id_to_hash[msg_id] = combined_hash
        if content_hash:
            self._content_to_hash[content_hash] = combined_hash

        self._enforce_max_entries()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def is_duplicate(
        self,
        item: T,
        message_id: Optional[str] = None,
        content: Optional[str] = None,
        id_func: Optional[Callable[[T], Optional[str]]] = None,
        content_func: Optional[Callable[[T], Optional[str]]] = None,
    ) -> bool:
        """
        Check if a message is a duplicate.

        A hit refreshes the matched entry (timestamp, count, recency), so a
        message that keeps arriving does not expire.

        Args:
            item: The message item
            message_id: Optional direct message ID
            content: Optional direct content
            id_func: Function to extract message ID from item
            content_func: Function to extract content from item

        Returns:
            True if duplicate, False if new
        """
        if self.config.mode == DedupeMode.NONE:
            return False

        msg_id, msg_content = self._resolve(item, message_id, content, id_func, content_func)

        # Hash outside the lock, and only when the mode actually uses content.
        content_hash = None
        if msg_content and self.config.mode in (DedupeMode.CONTENT_HASH, DedupeMode.COMBINED):
            content_hash = self._compute_content_hash(msg_content)

        with self._lock:
            return self._is_duplicate_locked(msg_id, content_hash)

    def mark_seen(
        self,
        item: T,
        message_id: Optional[str] = None,
        content: Optional[str] = None,
        id_func: Optional[Callable[[T], Optional[str]]] = None,
        content_func: Optional[Callable[[T], Optional[str]]] = None,
    ) -> str:
        """
        Mark a message as seen.

        Args:
            item: The message item
            message_id: Optional direct message ID
            content: Optional direct content
            id_func: Function to extract message ID from item
            content_func: Function to extract content from item

        Returns:
            The hash key for this message, or "" when deduplication is
            disabled or the message has neither an ID nor content
        """
        if self.config.mode == DedupeMode.NONE:
            return ""

        msg_id, msg_content = self._resolve(item, message_id, content, id_func, content_func)

        content_hash = None
        if msg_content:
            content_hash = self._compute_content_hash(msg_content)

        combined_hash = self._build_key(msg_id, content_hash)
        if not combined_hash:
            return ""

        with self._lock:
            self._mark_seen_locked(combined_hash, msg_id, content_hash)

        return combined_hash

    def check_and_mark(
        self,
        item: T,
        message_id: Optional[str] = None,
        content: Optional[str] = None,
        id_func: Optional[Callable[[T], Optional[str]]] = None,
        content_func: Optional[Callable[[T], Optional[str]]] = None,
    ) -> bool:
        """
        Check if duplicate and mark as seen in one operation.

        The check and the mark run under a single lock acquisition, so two
        threads submitting the same message cannot both be told it is new.
        The extractor callables are invoked once each.

        Args:
            item: The message item
            message_id: Optional direct message ID
            content: Optional direct content
            id_func: Function to extract message ID from item
            content_func: Function to extract content from item

        Returns:
            True if duplicate (already seen), False if new (and now marked)
        """
        if self.config.mode == DedupeMode.NONE:
            return False

        msg_id, msg_content = self._resolve(item, message_id, content, id_func, content_func)

        content_hash = None
        if msg_content:
            content_hash = self._compute_content_hash(msg_content)
        combined_hash = self._build_key(msg_id, content_hash)

        with self._lock:
            if self._is_duplicate_locked(msg_id, content_hash):
                return True
            # Nothing to remember when the message has neither ID nor content.
            if combined_hash:
                self._mark_seen_locked(combined_hash, msg_id, content_hash)

        return False

    def cleanup_expired(self) -> int:
        """
        Manually trigger cleanup of expired entries.

        Returns:
            Number of entries removed
        """
        with self._lock:
            return self._cleanup_expired()

    def clear(self) -> int:
        """
        Clear all entries.

        The cumulative counters (checked/duplicates/unique/expired) are kept.

        Returns:
            Number of entries cleared
        """
        with self._lock:
            count = len(self._entries)
            self._entries.clear()
            self._id_to_hash.clear()
            self._content_to_hash.clear()
            self._stats.current_entries = 0
            return count

    def get_stats(self) -> DedupeStats:
        """Get deduplication statistics as an independent snapshot."""
        with self._lock:
            self._stats.current_entries = len(self._entries)
            # Return a copy so callers cannot mutate the live counters.
            return DedupeStats(
                total_checked=self._stats.total_checked,
                total_duplicates=self._stats.total_duplicates,
                total_unique=self._stats.total_unique,
                total_expired=self._stats.total_expired,
                current_entries=self._stats.current_entries,
            )

    def get_entry_count(self) -> int:
        """Get current number of entries."""
        with self._lock:
            return len(self._entries)

381 

382 

class SimpleDeduplicator:
    """
    Lightweight deduplicator for plain string keys.

    Remembers when each key was last seen. Keys older than ``ttl_seconds``
    are forgotten (a value <= 0 disables expiry), and the oldest keys are
    dropped once more than ``max_entries`` are stored.
    """

    def __init__(
        self,
        ttl_seconds: int = 300,
        max_entries: int = 10000,
    ):
        self.ttl_seconds = ttl_seconds
        self.max_entries = max_entries
        # key -> time last seen, ordered from oldest to newest sighting
        self._seen: OrderedDict[str, datetime] = OrderedDict()
        self._lock = threading.Lock()

    def _stamp(self, key: str) -> None:
        """Record ``key`` as seen right now and make it the newest entry."""
        self._seen[key] = datetime.now()
        self._seen.move_to_end(key)

    def is_duplicate(self, key: str) -> bool:
        """Report whether ``key`` was seen recently, refreshing it on a hit."""
        with self._lock:
            self._cleanup()
            already_known = key in self._seen
            if already_known:
                self._stamp(key)
            return already_known

    def mark_seen(self, key: str) -> None:
        """Remember ``key`` as seen, evicting old keys if the cache is full."""
        with self._lock:
            self._stamp(key)
            self._enforce_max()

    def check_and_mark(self, key: str) -> bool:
        """Atomically test ``key`` and remember it.

        Returns True when the key was already known, False when it is new
        (in which case it has now been recorded).
        """
        with self._lock:
            self._cleanup()
            already_known = key in self._seen
            self._stamp(key)
            if not already_known:
                # Only a newly added key can push the cache over its limit.
                self._enforce_max()
            return already_known

    def _cleanup(self) -> None:
        """Forget every key whose last sighting is older than the TTL."""
        if self.ttl_seconds <= 0:
            return

        oldest_allowed = datetime.now() - timedelta(seconds=self.ttl_seconds)
        # Iterate over a snapshot because keys are deleted along the way.
        for key, stamp in list(self._seen.items()):
            if stamp < oldest_allowed:
                del self._seen[key]

    def _enforce_max(self) -> None:
        """Evict keys from the old end until the size limit is respected."""
        overflow = len(self._seen) - self.max_entries
        for _ in range(overflow):
            self._seen.popitem(last=False)

    def clear(self) -> int:
        """Forget all keys and return how many were stored."""
        with self._lock:
            removed = len(self._seen)
            self._seen.clear()
        return removed

    def get_count(self) -> int:
        """Return the number of keys currently remembered."""
        with self._lock:
            return len(self._seen)