Coverage for integrations / channels / queue / debounce.py: 54.9%
233 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Inbound Debouncing System
4Collects rapid-fire messages and batches them together before processing.
5Ported from HevolveBot's src/auto-reply/inbound-debounce.ts.
7Features:
8- Configurable debounce windows per channel
9- Automatic flush on timeout
10- Manual flush support
11- Max messages limit
12- Thread-safe operation
13"""
15from __future__ import annotations
17import asyncio
18import logging
19import threading
20from dataclasses import dataclass, field
21from datetime import datetime
22from typing import Optional, Dict, List, Any, Callable, TypeVar, Generic
24logger = logging.getLogger(__name__)
26T = TypeVar('T')
@dataclass
class DebounceConfig:
    """Debounce settings: window length, batch cap, and per-channel overrides."""

    # Length of the debounce window in milliseconds.
    window_ms: int = 1000
    # Flush immediately once a buffer reaches this many items.
    max_messages: int = 10
    # Per-channel window overrides (milliseconds), keyed by channel name.
    channel_overrides: Dict[str, int] = field(default_factory=dict)
@dataclass
class DebounceStats:
    """Running counters for a debouncer instance."""

    # Total items ever handed to debounce().
    total_received: int = 0
    # Total items delivered in flushed batches.
    total_flushed: int = 0
    # Number of batches emitted so far.
    total_batches: int = 0
    # Items currently sitting in open buffers.
    current_pending: int = 0
class DebounceBuffer(Generic[T]):
    """Holds the items collected while one debounce window is open."""

    def __init__(self):
        now = datetime.now()
        self.items: List[T] = []
        # Pending flush timer; None when no timer is scheduled.
        self.timer_task: Optional[asyncio.Task] = None
        self.created_at: datetime = now
        self.last_added: datetime = now

    def add(self, item: T) -> None:
        """Append *item* and refresh the last-added timestamp."""
        self.items.append(item)
        self.last_added = datetime.now()

    def clear(self) -> List[T]:
        """Detach and return the buffered items, leaving the buffer empty."""
        drained, self.items = self.items, []
        return drained

    def cancel_timer(self) -> None:
        """Stop the pending flush timer, if one is still running."""
        task = self.timer_task
        if task and not task.done():
            task.cancel()
            self.timer_task = None
class InboundDebouncer(Generic[T]):
    """
    Debounces inbound messages by collecting them into batches.

    Messages arriving within the debounce window are collected together
    and flushed as a batch when the window expires or max messages reached.

    Flush callbacks are always invoked *outside* the internal lock, so a
    callback may safely call back into this debouncer.

    Usage:
        config = DebounceConfig(window_ms=1000, max_messages=10)
        debouncer = InboundDebouncer(config)

        # Async usage
        async def handle_message(msg):
            result = await debouncer.debounce(msg, key_func=lambda m: m.chat_id)
            if result:
                # Process batch
                for m in result:
                    process(m)

        # Or with callback
        debouncer = InboundDebouncer(
            config,
            on_flush=lambda items: process_batch(items)
        )
        await debouncer.debounce(msg, key_func=lambda m: m.chat_id)
    """

    def __init__(
        self,
        config: DebounceConfig,
        on_flush: Optional[Callable[[List[T]], Any]] = None,
        on_error: Optional[Callable[[Exception, List[T]], None]] = None,
    ):
        self.config = config
        # Called with each flushed batch; may return a coroutine (awaited).
        self.on_flush = on_flush
        # Called with (exception, batch) when on_flush raises.
        self.on_error = on_error
        self._buffers: Dict[str, DebounceBuffer[T]] = {}
        # Non-reentrant lock guarding _buffers; held only for bookkeeping,
        # never across an await.
        self._lock = threading.Lock()
        self._stats = DebounceStats()

    def _get_debounce_ms(self, channel: Optional[str] = None) -> int:
        """Return the debounce window (ms) for *channel*, clamped to >= 0."""
        if channel and channel in self.config.channel_overrides:
            return max(0, self.config.channel_overrides[channel])
        return max(0, self.config.window_ms)

    async def _dispatch(self, items: List[T]) -> Optional[List[T]]:
        """
        Account for a flushed batch and deliver it.

        If on_flush is configured, the batch is handed to it (errors routed to
        on_error) and None is returned; otherwise the batch itself is returned
        for the caller to process. Must be called WITHOUT holding self._lock.
        """
        self._stats.total_flushed += len(items)
        self._stats.total_batches += 1
        if self.on_flush:
            try:
                await self._call_flush(items)
            except Exception as e:
                if self.on_error:
                    self.on_error(e, items)
            return None
        return items

    async def debounce(
        self,
        item: T,
        key: Optional[str] = None,
        key_func: Optional[Callable[[T], str]] = None,
        channel: Optional[str] = None,
        should_debounce: bool = True,
    ) -> Optional[List[T]]:
        """
        Add an item to the debounce buffer.

        Args:
            item: The item to debounce
            key: Buffer key (e.g., chat_id)
            key_func: Function to extract key from item
            channel: Channel name for per-channel debounce settings
            should_debounce: Whether to actually debounce (False = immediate)

        Returns:
            List of items if flushed immediately (and no on_flush callback is
            set), None if buffered or delivered via on_flush
        """
        # Determine the key.
        if key is None and key_func is not None:
            key = key_func(item)
        if key is None:
            key = "default"

        debounce_ms = self._get_debounce_ms(channel)
        self._stats.total_received += 1

        # If debouncing disabled or not requested, flush immediately:
        # deliver any pending items for this key together with the new one.
        if debounce_ms <= 0 or not should_debounce:
            pending = await self._flush_key(key)
            return await self._dispatch(pending + [item])

        flushed: Optional[List[T]] = None
        with self._lock:
            buffer = self._buffers.get(key)
            if buffer is None:
                buffer = self._buffers[key] = DebounceBuffer()
            buffer.add(item)
            self._stats.current_pending += 1

            if len(buffer.items) >= self.config.max_messages:
                # Batch is full: detach it now, deliver after releasing
                # the lock.
                flushed = buffer.clear()
                buffer.cancel_timer()
                del self._buffers[key]
                self._stats.current_pending -= len(flushed)
            else:
                # (Re)start the window timer for this key.
                buffer.cancel_timer()
                buffer.timer_task = asyncio.create_task(
                    self._timer_flush(key, debounce_ms)
                )

        if flushed is not None:
            # BUGFIX: the callback used to be awaited while holding the
            # (non-reentrant, threading) lock, which could block or deadlock
            # any other user of the debouncer for the duration of the await.
            return await self._dispatch(flushed)
        return None

    async def _timer_flush(self, key: str, delay_ms: int) -> None:
        """Sleep for the window, then flush *key* (cancelled on reschedule)."""
        try:
            await asyncio.sleep(delay_ms / 1000.0)
            await self._flush_key(key, from_timer=True)
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"Error in debounce timer flush: {e}")

    async def _flush_key(self, key: str, from_timer: bool = False) -> List[T]:
        """
        Flush a specific buffer key.

        Returns the flushed items, or [] when the buffer was empty/absent or
        the batch was consumed by the on_flush callback.
        """
        with self._lock:
            buffer = self._buffers.pop(key, None)
            if buffer is None:
                return []
            items = buffer.clear()
            if not from_timer:
                # Safe to cancel: we are not running inside the timer task.
                buffer.cancel_timer()
            self._stats.current_pending -= len(items)

        # BUGFIX: delivery happens outside the lock (see _dispatch).
        if not items:
            return []
        delivered = await self._dispatch(items)
        return [] if delivered is None else delivered

    async def _call_flush(self, items: List[T]) -> None:
        """Invoke on_flush, awaiting the result if it is a coroutine."""
        if self.on_flush:
            result = self.on_flush(items)
            if asyncio.iscoroutine(result):
                await result

    def flush(self, key: str) -> List[T]:
        """
        Synchronously flush a specific buffer.

        Args:
            key: Buffer key to flush

        Returns:
            List of flushed items (on_flush is NOT invoked here)
        """
        with self._lock:
            buffer = self._buffers.pop(key, None)
            if buffer is None:
                return []
            items = buffer.clear()
            buffer.cancel_timer()
            self._stats.current_pending -= len(items)
            self._stats.total_flushed += len(items)
            if items:
                self._stats.total_batches += 1
            return items

    def flush_all(self) -> Dict[str, List[T]]:
        """
        Flush all buffers synchronously.

        Returns:
            Dict mapping keys to their flushed items
        """
        # Snapshot keys under the lock, but flush outside it:
        # flush() re-acquires the non-reentrant lock and would deadlock.
        with self._lock:
            keys = list(self._buffers.keys())
        result: Dict[str, List[T]] = {}
        for key in keys:
            items = self.flush(key)
            if items:
                result[key] = items
        return result

    def get_pending_count(self) -> int:
        """Get total number of pending items across all buffers."""
        with self._lock:
            return sum(len(b.items) for b in self._buffers.values())

    def get_pending_keys(self) -> List[str]:
        """Get list of keys with pending items."""
        with self._lock:
            return list(self._buffers.keys())

    def get_stats(self) -> DebounceStats:
        """Return a snapshot copy of the current statistics."""
        with self._lock:
            self._stats.current_pending = sum(
                len(b.items) for b in self._buffers.values()
            )
            return DebounceStats(
                total_received=self._stats.total_received,
                total_flushed=self._stats.total_flushed,
                total_batches=self._stats.total_batches,
                current_pending=self._stats.current_pending,
            )

    def clear(self) -> int:
        """
        Clear all buffers without flushing.

        Returns:
            Number of items cleared
        """
        with self._lock:
            total = sum(len(b.items) for b in self._buffers.values())
            for buffer in self._buffers.values():
                buffer.cancel_timer()
            self._buffers.clear()
            self._stats.current_pending = 0
            return total
class SyncDebouncer(Generic[T]):
    """
    Synchronous debouncer for non-async contexts.

    Uses threading for timer-based flushing. The on_flush callback is always
    invoked *outside* the internal lock so it may safely call back in.
    """

    def __init__(
        self,
        config: DebounceConfig,
        on_flush: Optional[Callable[[List[T]], None]] = None,
    ):
        self.config = config
        # Called with each flushed batch when a timer fires or a batch fills.
        self.on_flush = on_flush
        self._buffers: Dict[str, List[T]] = {}
        self._timers: Dict[str, threading.Timer] = {}
        # Non-reentrant lock guarding _buffers/_timers; never hold it while
        # invoking on_flush.
        self._lock = threading.Lock()
        self._stats = DebounceStats()

    def _get_debounce_ms(self, channel: Optional[str] = None) -> int:
        """Return the debounce window (ms) for *channel*, clamped to >= 0."""
        if channel and channel in self.config.channel_overrides:
            return max(0, self.config.channel_overrides[channel])
        return max(0, self.config.window_ms)

    def debounce(
        self,
        item: T,
        key: str,
        channel: Optional[str] = None,
    ) -> Optional[List[T]]:
        """
        Add an item to the debounce buffer.

        Args:
            item: The item to debounce
            key: Buffer key
            channel: Channel for debounce settings

        Returns:
            List of items if immediately flushed (and no on_flush callback is
            set), None if buffered or delivered via on_flush
        """
        debounce_ms = self._get_debounce_ms(channel)
        self._stats.total_received += 1

        # If debouncing disabled: deliver pending + new item immediately.
        if debounce_ms <= 0:
            pending = self.flush(key)
            result = pending + [item]
            self._stats.total_flushed += len(result)
            self._stats.total_batches += 1
            if self.on_flush:
                self.on_flush(result)
                return None
            return result

        full_batch: Optional[List[T]] = None
        with self._lock:
            # Cancel any running window timer for this key.
            timer = self._timers.pop(key, None)
            if timer is not None:
                timer.cancel()

            # Add to buffer.
            self._buffers.setdefault(key, []).append(item)
            self._stats.current_pending += 1

            if len(self._buffers[key]) >= self.config.max_messages:
                # Batch is full: detach it now, deliver after releasing
                # the lock.
                full_batch = self._buffers.pop(key)
                self._stats.current_pending -= len(full_batch)
                self._stats.total_flushed += len(full_batch)
                self._stats.total_batches += 1
            else:
                # Schedule (or reschedule) the window timer.
                new_timer = threading.Timer(
                    debounce_ms / 1000.0,
                    self._timer_flush,
                    args=[key],
                )
                new_timer.daemon = True
                new_timer.start()
                self._timers[key] = new_timer

        if full_batch is not None:
            # BUGFIX: on_flush used to run while holding the non-reentrant
            # lock; a callback re-entering the debouncer would deadlock.
            if self.on_flush:
                self.on_flush(full_batch)
                return None
            return full_batch
        return None

    def _timer_flush(self, key: str) -> None:
        """Timer callback: flush *key* and hand the batch to on_flush."""
        items = self.flush(key)
        if items and self.on_flush:
            self.on_flush(items)

    def flush(self, key: str) -> List[T]:
        """Flush a specific buffer; returns the items (on_flush NOT invoked)."""
        with self._lock:
            timer = self._timers.pop(key, None)
            if timer is not None:
                timer.cancel()

            if key not in self._buffers:
                return []

            items = self._buffers.pop(key)
            self._stats.current_pending -= len(items)
            self._stats.total_flushed += len(items)
            if items:
                self._stats.total_batches += 1
            return items

    def flush_all(self) -> Dict[str, List[T]]:
        """Flush all buffers; returns {key: items} for non-empty flushes."""
        # Snapshot keys under the lock, but flush outside it:
        # flush() re-acquires the non-reentrant lock and would deadlock.
        with self._lock:
            keys = list(self._buffers.keys())
        result: Dict[str, List[T]] = {}
        for key in keys:
            items = self.flush(key)
            if items:
                result[key] = items
        return result

    def get_pending_count(self) -> int:
        """Get pending item count."""
        with self._lock:
            return sum(len(items) for items in self._buffers.values())

    def get_stats(self) -> DebounceStats:
        """Return a snapshot copy of the current statistics."""
        with self._lock:
            self._stats.current_pending = sum(
                len(items) for items in self._buffers.values()
            )
            return DebounceStats(
                total_received=self._stats.total_received,
                total_flushed=self._stats.total_flushed,
                total_batches=self._stats.total_batches,
                current_pending=self._stats.current_pending,
            )