Coverage for integrations / channels / queue / debounce.py: 54.9%

233 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Inbound Debouncing System 

3 

4Collects rapid-fire messages and batches them together before processing. 

5Ported from HevolveBot's src/auto-reply/inbound-debounce.ts. 

6 

7Features: 

8- Configurable debounce windows per channel 

9- Automatic flush on timeout 

10- Manual flush support 

11- Max messages limit 

12- Thread-safe operation 

13""" 

14 

15from __future__ import annotations 

16 

17import asyncio 

18import logging 

19import threading 

20from dataclasses import dataclass, field 

21from datetime import datetime 

22from typing import Optional, Dict, List, Any, Callable, TypeVar, Generic 

23 

24logger = logging.getLogger(__name__) 

25 

26T = TypeVar('T') 

27 

28 

@dataclass
class DebounceConfig:
    """Tunable settings for a debouncer."""

    # Default debounce window, in milliseconds.
    window_ms: int = 1000
    # Number of buffered items for one key that triggers an immediate flush.
    max_messages: int = 10
    # Per-channel debounce window in milliseconds, keyed by channel name.
    channel_overrides: Dict[str, int] = field(default_factory=dict)

35 

36 

@dataclass
class DebounceStats:
    """Counters describing a debouncer's activity."""

    # Items passed to debounce().
    total_received: int = 0
    # Items released from buffers (in batches or immediately).
    total_flushed: int = 0
    # Number of batches released.
    total_batches: int = 0
    # Items currently waiting in buffers.
    current_pending: int = 0

44 

45 

class DebounceBuffer(Generic[T]):
    """Holding area for the items of one key while its debounce window is open."""

    def __init__(self):
        self.items: List[T] = []
        # Pending flush timer for this buffer, if one has been scheduled.
        self.timer_task: Optional[asyncio.Task] = None
        self.created_at: datetime = datetime.now()
        self.last_added: datetime = datetime.now()

    def add(self, item: T) -> None:
        """Append ``item`` and record the time of the addition."""
        self.items.append(item)
        self.last_added = datetime.now()

    def clear(self) -> List[T]:
        """Empty the buffer and hand back everything it held."""
        drained, self.items = self.items, []
        return drained

    def cancel_timer(self) -> None:
        """Drop the flush timer, cancelling it first if it has not finished."""
        task, self.timer_task = self.timer_task, None
        if task and not task.done():
            task.cancel()

71 

72 

class InboundDebouncer(Generic[T]):
    """
    Debounces inbound messages by collecting them into batches.

    Messages arriving within the debounce window are collected together
    and flushed as a batch when the window expires or max messages reached.

    The internal lock guards only the buffers and statistics; it is always
    released before ``on_flush`` / ``on_error`` run, so callbacks may call
    back into the debouncer.

    Note: when no ``on_flush`` callback is configured, a batch released by
    the window timer has no receiver and is discarded (a warning is logged).

    Usage:
        config = DebounceConfig(window_ms=1000, max_messages=10)
        debouncer = InboundDebouncer(config)

        # Async usage
        async def handle_message(msg):
            result = await debouncer.debounce(msg, key_func=lambda m: m.chat_id)
            if result:
                # Process batch
                for m in result:
                    process(m)

        # Or with callback
        debouncer = InboundDebouncer(
            config,
            on_flush=lambda items: process_batch(items)
        )
        await debouncer.debounce(msg, key_func=lambda m: m.chat_id)
    """

    def __init__(
        self,
        config: DebounceConfig,
        on_flush: Optional[Callable[[List[T]], Any]] = None,
        on_error: Optional[Callable[[Exception, List[T]], None]] = None,
    ):
        """
        Args:
            config: Window, batch-size and per-channel settings
            on_flush: Sync or async callback receiving each released batch
            on_error: Callback receiving (exception, batch) when on_flush fails
        """
        self.config = config
        self.on_flush = on_flush
        self.on_error = on_error
        self._buffers: Dict[str, DebounceBuffer[T]] = {}
        self._lock = threading.Lock()
        self._stats = DebounceStats()

    def _get_debounce_ms(self, channel: Optional[str] = None) -> int:
        """Get debounce window for a channel (never negative)."""
        overrides = self.config.channel_overrides
        if channel and channel in overrides:
            return max(0, overrides[channel])
        return max(0, self.config.window_ms)

    async def debounce(
        self,
        item: T,
        key: Optional[str] = None,
        key_func: Optional[Callable[[T], str]] = None,
        channel: Optional[str] = None,
        should_debounce: bool = True,
    ) -> Optional[List[T]]:
        """
        Add an item to the debounce buffer.

        Args:
            item: The item to debounce
            key: Buffer key (e.g., chat_id)
            key_func: Function to extract key from item (used when key is None)
            channel: Channel name for per-channel debounce settings
            should_debounce: Whether to actually debounce (False = immediate)

        Returns:
            The released batch when one is released by this call and no
            on_flush callback is configured; None when the item was buffered
            or the batch was handed to on_flush.
        """
        if key is None and key_func is not None:
            key = key_func(item)
        if key is None:
            key = "default"

        debounce_ms = self._get_debounce_ms(channel)

        with self._lock:
            self._stats.total_received += 1

            if debounce_ms <= 0 or not should_debounce:
                # Immediate mode: anything already pending for this key goes
                # out together with the new item as ONE batch, so it is
                # delivered and counted exactly once.
                batch = self._pop_locked(key) + [item]
            else:
                buffer = self._buffers.get(key)
                if buffer is None:
                    buffer = self._buffers[key] = DebounceBuffer()
                buffer.add(item)
                self._stats.current_pending += 1

                if len(buffer.items) < self.config.max_messages:
                    # Still below the limit: restart the window for this key.
                    buffer.cancel_timer()
                    buffer.timer_task = asyncio.create_task(
                        self._timer_flush(key, debounce_ms)
                    )
                    return None

                batch = self._pop_locked(key)

            self._stats.total_flushed += len(batch)
            self._stats.total_batches += 1

        # The lock is released before awaiting user code: holding a
        # threading.Lock across an await would stall the event loop as soon
        # as another coroutine (or the callback itself) touched the debouncer.
        if self.on_flush:
            await self._deliver(batch)
            return None
        return batch

    def _pop_locked(self, key: str, cancel_timer: bool = True) -> List[T]:
        """Remove the buffer for ``key`` and return its items.

        The caller must hold ``self._lock``. Adjusts ``current_pending`` only;
        flush counters are the caller's responsibility. Returns [] when the
        key has no buffer.
        """
        buffer = self._buffers.pop(key, None)
        if buffer is None:
            return []
        if cancel_timer:
            buffer.cancel_timer()
        items = buffer.clear()
        self._stats.current_pending -= len(items)
        return items

    async def _deliver(self, items: List[T]) -> bool:
        """Hand ``items`` to on_flush; report failures. Returns True on success."""
        try:
            await self._call_flush(items)
        except Exception as exc:
            if self.on_error:
                self.on_error(exc, items)
            else:
                # No error handler configured: do not lose the failure silently.
                logger.exception(
                    "Debounce flush callback failed for %d item(s)", len(items)
                )
            return False
        return True

    async def _timer_flush(self, key: str, delay_ms: int) -> None:
        """Timer callback: wait out the window, then flush the key's buffer."""
        try:
            await asyncio.sleep(delay_ms / 1000.0)
            items = await self._flush_key(key, from_timer=True)
            if items and not self.on_flush:
                logger.warning(
                    "Debounce window for key %r expired with %d item(s) but no "
                    "on_flush callback is set; items were discarded",
                    key,
                    len(items),
                )
        except asyncio.CancelledError:
            # The window was restarted or the buffer was flushed elsewhere.
            pass
        except Exception as e:
            logger.exception("Error in debounce timer flush: %s", e)

    async def _flush_key(self, key: str, from_timer: bool = False) -> List[T]:
        """Flush a specific buffer key and deliver it to on_flush if set.

        Args:
            key: Buffer key to flush
            from_timer: True when called from the key's own timer task; the
                timer is then not cancelled, because cancelling the running
                task would interrupt the delivery below.

        Returns:
            The flushed items, or [] when there was no buffer or the
            on_flush callback failed.
        """
        with self._lock:
            if key not in self._buffers:
                return []
            items = self._pop_locked(key, cancel_timer=not from_timer)
            if items:
                self._stats.total_flushed += len(items)
                self._stats.total_batches += 1

        if items and self.on_flush and not await self._deliver(items):
            return []
        return items

    async def _call_flush(self, items: List[T]) -> None:
        """Call the flush callback, awaiting its result when it is awaitable."""
        if self.on_flush:
            result = self.on_flush(items)
            # Accept async callbacks as well as sync ones returning a
            # future/task.
            if asyncio.iscoroutine(result) or asyncio.isfuture(result):
                await result

    def flush(self, key: str) -> List[T]:
        """
        Synchronously flush a specific buffer.

        The on_flush callback is NOT invoked; the items are returned to the
        caller instead.

        Args:
            key: Buffer key to flush

        Returns:
            List of flushed items
        """
        with self._lock:
            if key not in self._buffers:
                return []
            items = self._pop_locked(key)
            self._stats.total_flushed += len(items)
            if items:
                self._stats.total_batches += 1
            return items

    def flush_all(self) -> Dict[str, List[T]]:
        """
        Flush all buffers synchronously.

        Returns:
            Dict mapping keys to their flushed items (empty buffers omitted)
        """
        with self._lock:
            keys = list(self._buffers)

        # flush() takes the (non-reentrant) lock itself, so call it unlocked.
        result: Dict[str, List[T]] = {}
        for key in keys:
            items = self.flush(key)
            if items:
                result[key] = items
        return result

    def get_pending_count(self) -> int:
        """Get total number of pending items across all buffers."""
        with self._lock:
            return sum(len(b.items) for b in self._buffers.values())

    def get_pending_keys(self) -> List[str]:
        """Get list of keys that currently have a buffer."""
        with self._lock:
            return list(self._buffers)

    def get_stats(self) -> DebounceStats:
        """Get a snapshot of the debouncer statistics."""
        with self._lock:
            # Recompute rather than trust the running counter.
            self._stats.current_pending = sum(
                len(b.items) for b in self._buffers.values()
            )
            return DebounceStats(
                total_received=self._stats.total_received,
                total_flushed=self._stats.total_flushed,
                total_batches=self._stats.total_batches,
                current_pending=self._stats.current_pending,
            )

    def clear(self) -> int:
        """
        Clear all buffers without flushing.

        Returns:
            Number of items cleared
        """
        with self._lock:
            total = sum(len(b.items) for b in self._buffers.values())
            for buffer in self._buffers.values():
                buffer.cancel_timer()
            self._buffers.clear()
            self._stats.current_pending = 0
            return total

335 

336 

class SyncDebouncer(Generic[T]):
    """
    Synchronous debouncer for non-async contexts.

    Uses threading for timer-based flushing: each key with pending items has
    one daemon ``threading.Timer`` that releases the batch when the window
    expires. The internal lock guards only the buffers, timers and
    statistics; it is released before ``on_flush`` runs, so the callback may
    call back into the debouncer.

    Note: when no ``on_flush`` callback is configured, a batch released by
    the window timer has no receiver and is discarded (a warning is logged).
    """

    def __init__(
        self,
        config: DebounceConfig,
        on_flush: Optional[Callable[[List[T]], None]] = None,
    ):
        """
        Args:
            config: Window, batch-size and per-channel settings
            on_flush: Callback receiving each released batch
        """
        self.config = config
        self.on_flush = on_flush
        self._buffers: Dict[str, List[T]] = {}
        self._timers: Dict[str, threading.Timer] = {}
        self._lock = threading.Lock()
        self._stats = DebounceStats()

    def _get_debounce_ms(self, channel: Optional[str] = None) -> int:
        """Get debounce window for a channel (never negative)."""
        overrides = self.config.channel_overrides
        if channel and channel in overrides:
            return max(0, overrides[channel])
        return max(0, self.config.window_ms)

    def debounce(
        self,
        item: T,
        key: str,
        channel: Optional[str] = None,
    ) -> Optional[List[T]]:
        """
        Add an item to the debounce buffer.

        Args:
            item: The item to debounce
            key: Buffer key
            channel: Channel for debounce settings

        Returns:
            The released batch when one is released by this call and no
            on_flush callback is configured; None when the item was buffered
            or the batch was handed to on_flush.
        """
        debounce_ms = self._get_debounce_ms(channel)

        with self._lock:
            self._stats.total_received += 1

            if debounce_ms <= 0:
                # Debouncing disabled: pending items leave together with the
                # new one as a single batch, counted exactly once.
                batch = self._pop_locked(key) + [item]
            else:
                # Restart the window for this key.
                stale = self._timers.pop(key, None)
                if stale is not None:
                    stale.cancel()

                bucket = self._buffers.setdefault(key, [])
                bucket.append(item)
                self._stats.current_pending += 1

                if len(bucket) < self.config.max_messages:
                    timer = threading.Timer(
                        debounce_ms / 1000.0,
                        self._timer_flush,
                        args=[key],
                    )
                    timer.daemon = True
                    self._timers[key] = timer
                    timer.start()
                    return None

                batch = self._pop_locked(key)

            self._stats.total_flushed += len(batch)
            self._stats.total_batches += 1

        # Run user code only after releasing the non-reentrant lock so a
        # callback that touches the debouncer cannot deadlock.
        if self.on_flush:
            self.on_flush(batch)
            return None
        return batch

    def _pop_locked(self, key: str) -> List[T]:
        """Cancel the timer and remove the buffer for ``key``.

        The caller must hold ``self._lock``. Adjusts ``current_pending`` only;
        flush counters are the caller's responsibility. Returns [] when the
        key has no buffer.
        """
        timer = self._timers.pop(key, None)
        if timer is not None:
            timer.cancel()
        items = self._buffers.pop(key, [])
        self._stats.current_pending -= len(items)
        return items

    def _timer_flush(self, key: str) -> None:
        """Timer callback: release the key's batch to on_flush."""
        items = self.flush(key)
        if not items:
            return
        if not self.on_flush:
            logger.warning(
                "Debounce window for key %r expired with %d item(s) but no "
                "on_flush callback is set; items were discarded",
                key,
                len(items),
            )
            return
        try:
            self.on_flush(items)
        except Exception:
            # This runs on a Timer thread with no caller to propagate to.
            logger.exception(
                "Debounce flush callback failed for %d item(s)", len(items)
            )

    def flush(self, key: str) -> List[T]:
        """Flush a specific buffer and return its items.

        The on_flush callback is NOT invoked here.
        """
        with self._lock:
            items = self._pop_locked(key)
            self._stats.total_flushed += len(items)
            if items:
                self._stats.total_batches += 1
            return items

    def flush_all(self) -> Dict[str, List[T]]:
        """Flush all buffers; returns a dict of key -> flushed items."""
        with self._lock:
            keys = list(self._buffers)

        # flush() takes the (non-reentrant) lock itself, so call it unlocked.
        result: Dict[str, List[T]] = {}
        for key in keys:
            items = self.flush(key)
            if items:
                result[key] = items
        return result

    def get_pending_count(self) -> int:
        """Get pending item count."""
        with self._lock:
            return sum(len(items) for items in self._buffers.values())

    def get_stats(self) -> DebounceStats:
        """Get a snapshot of the statistics."""
        with self._lock:
            # Recompute rather than trust the running counter.
            self._stats.current_pending = sum(
                len(items) for items in self._buffers.values()
            )
            return DebounceStats(
                total_received=self._stats.total_received,
                total_flushed=self._stats.total_flushed,
                total_batches=self._stats.total_batches,
                current_pending=self._stats.current_pending,
            )