Coverage for integrations / channels / queue / message_queue.py: 90.4%

314 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Message Queue System 

3 

4Provides queue management for message processing with multiple policies. 

5Ported from HevolveBot's src/auto-reply/reply/queue/. 

6 

7Features: 

8- Multiple queue policies (DROP, LATEST, BACKLOG, PRIORITY, COLLECT) 

9- Drop policies (OLD, NEW, SUMMARIZE) 

10- Deduplication 

11- Per-channel/user queue management 

12- Message expiration 

13- Statistics tracking 

14""" 

15 

16from __future__ import annotations 

17 

18import hashlib 

19import logging 

20import threading 

21import time 

22from collections import OrderedDict 

23from dataclasses import dataclass, field 

24from datetime import datetime, timedelta 

25from enum import Enum 

26from typing import Optional, Dict, List, Any, Callable, Tuple 

27 

28logger = logging.getLogger(__name__) 

29 

30 

class QueuePolicy(Enum):
    """Queue processing policy.

    Selects how the owning MessageQueue admits and orders messages; see
    MessageQueue.enqueue for how each value is applied.
    """
    DROP = "drop"          # Drop new messages when busy/at capacity
    LATEST = "latest"      # Keep only latest message, drop older
    BACKLOG = "backlog"    # Process all in order (FIFO)
    PRIORITY = "priority"  # Priority-based ordering
    COLLECT = "collect"    # Collect messages into batches

38 

39 

class DropPolicy(Enum):
    """Policy for handling messages when the queue is at capacity.

    Consulted by MessageQueue._apply_drop_policy before admitting a new
    message to a full queue.
    """
    OLD = "old"              # Drop oldest messages
    NEW = "new"              # Drop new incoming messages
    SUMMARIZE = "summarize"  # Drop old but keep a one-line summary of each

45 

46 

class DedupeMode(Enum):
    """Deduplication mode for messages.

    Controls which keys MessageQueue._is_duplicate checks before enqueueing.
    """
    MESSAGE_ID = "message-id"  # Dedupe by platform message ID
    CONTENT = "content"        # Dedupe by content hash
    COMBINED = "combined"      # Both message ID and content
    NONE = "none"              # No deduplication

53 

54 

@dataclass
class QueuedMessage:
    """A single message held in a MessageQueue.

    ``content_hash`` is filled in automatically from ``content`` when not
    supplied by the caller, and is used for content-based deduplication.
    """
    message_id: str
    channel: str
    chat_id: str
    sender_id: str
    content: str
    priority: int = 0
    enqueued_at: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)
    content_hash: str = field(default="")

    def __post_init__(self):
        # Derive a short, stable fingerprint of the content unless one was
        # provided explicitly.
        if self.content_hash:
            return
        digest = hashlib.sha256(self.content.encode())
        self.content_hash = digest.hexdigest()[:16]

71 

72 

@dataclass
class QueueConfig:
    """Configuration for a message queue."""
    policy: QueuePolicy = QueuePolicy.BACKLOG        # How messages are ordered/admitted
    drop_policy: DropPolicy = DropPolicy.SUMMARIZE   # What to evict when at capacity
    dedupe_mode: DedupeMode = DedupeMode.MESSAGE_ID  # How duplicates are detected
    max_size: int = 20          # Max queued messages before the drop policy applies
    max_age_seconds: int = 300  # Messages older than this expire (<=0 disables)
    debounce_ms: int = 1000     # Quiet period after an enqueue (<=0 disables)
    priority_boost_mentions: bool = True  # NOTE(review): not read in this module — presumably consumed by callers; verify
    priority_boost_replies: bool = True   # NOTE(review): not read in this module — presumably consumed by callers; verify
    collect_batch_size: int = 10          # Default batch size for MessageQueue.collect()

85 

86 

@dataclass
class QueueStats:
    """Statistics counters for a single MessageQueue."""
    total_enqueued: int = 0       # Messages accepted into the queue
    total_dequeued: int = 0       # Messages removed via dequeue()/collect()
    total_dropped: int = 0        # Messages discarded by queue/drop policies
    total_expired: int = 0        # Messages removed for exceeding max age
    total_deduplicated: int = 0   # Messages rejected as duplicates
    current_size: int = 0         # Queue length at last update
    dropped_summaries: List[str] = field(default_factory=list)  # One line per dropped message

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable snapshot of the counters.

        Only the 10 most recent dropped-message summaries are included to
        keep the payload small.
        """
        return {
            "total_enqueued": self.total_enqueued,
            "total_dequeued": self.total_dequeued,
            "total_dropped": self.total_dropped,
            "total_expired": self.total_expired,
            "total_deduplicated": self.total_deduplicated,
            "current_size": self.current_size,
            # BUG FIX: was `[:10]`, which returns the *oldest* 10 entries;
            # summaries are appended at the end and trimmed from the front,
            # so the last 10 (as the original comment promised) are `[-10:]`.
            "dropped_summaries": self.dropped_summaries[-10:],  # Last 10
        }

108 

109 

class MessageQueue:
    """
    Message queue with configurable policies.

    Supports various queue modes ported from HevolveBot:
    - DROP: Reject new messages when at capacity
    - LATEST: Keep only the most recent message
    - BACKLOG: FIFO processing of all messages
    - PRIORITY: Process by priority order
    - COLLECT: Batch messages together

    All operations that mutate the internal message list hold ``self._lock``,
    so a queue instance may be shared between threads.

    Usage:
        config = QueueConfig(policy=QueuePolicy.BACKLOG, max_size=100)
        queue = MessageQueue(config)

        # Enqueue
        success = queue.enqueue(message)

        # Dequeue
        msg = queue.dequeue()

        # Get stats
        stats = queue.get_stats()
    """

    def __init__(self, config: QueueConfig):
        self.config = config
        self._items: List[QueuedMessage] = []
        # Dedupe bookkeeping: insertion-ordered so the oldest entries can be
        # evicted first once the maps exceed their bound (max_size * 2).
        self._seen_ids: OrderedDict[str, datetime] = OrderedDict()
        self._seen_hashes: OrderedDict[str, datetime] = OrderedDict()
        self._stats = QueueStats()
        self._lock = threading.Lock()
        self._last_enqueue_time: Optional[datetime] = None
        # NOTE(review): _draining is never read in this module — confirm it
        # is consumed by callers before removing.
        self._draining = False

    @property
    def size(self) -> int:
        """Current queue size (expired items count until the next cleanup)."""
        return len(self._items)

    @property
    def is_empty(self) -> bool:
        """Check if queue is empty."""
        return len(self._items) == 0

    @property
    def is_full(self) -> bool:
        """Check if queue is at capacity (config.max_size)."""
        return len(self._items) >= self.config.max_size

    def _is_duplicate(self, message: QueuedMessage) -> bool:
        """Return True if ``message`` was already seen per the dedupe mode."""
        if self.config.dedupe_mode == DedupeMode.NONE:
            return False

        # Check by message ID (empty IDs are never treated as duplicates).
        if self.config.dedupe_mode in (DedupeMode.MESSAGE_ID, DedupeMode.COMBINED):
            if message.message_id and message.message_id in self._seen_ids:
                return True

        # Check by content hash.
        if self.config.dedupe_mode in (DedupeMode.CONTENT, DedupeMode.COMBINED):
            if message.content_hash in self._seen_hashes:
                return True

        return False

    def _mark_seen(self, message: QueuedMessage) -> None:
        """Record message ID / content hash for dedup, keeping both maps bounded."""
        now = datetime.now()

        if message.message_id:
            self._seen_ids[message.message_id] = now
            # Keep seen IDs bounded; evict oldest first.
            while len(self._seen_ids) > self.config.max_size * 2:
                self._seen_ids.popitem(last=False)

        self._seen_hashes[message.content_hash] = now
        # Keep seen hashes bounded; evict oldest first.
        while len(self._seen_hashes) > self.config.max_size * 2:
            self._seen_hashes.popitem(last=False)

    def _create_summary(self, message: QueuedMessage) -> str:
        """Create a single-line summary (max 140 chars) for a dropped message."""
        text = message.content.replace('\n', ' ').strip()
        if len(text) > 140:
            text = text[:139].rstrip() + '…'
        return text

    def _apply_drop_policy(self) -> bool:
        """
        Apply the configured drop policy when the queue is at capacity.

        Returns:
            True if a new message can now be added, False if it must be
            rejected (DropPolicy.NEW).
        """
        if not self.is_full:
            return True

        policy = self.config.drop_policy

        if policy == DropPolicy.NEW:
            # Reject the newcomer; existing items are untouched.
            return False

        elif policy == DropPolicy.OLD:
            # Evict the oldest message to make room.
            if self._items:
                self._items.pop(0)
                self._stats.total_dropped += 1
            return True

        elif policy == DropPolicy.SUMMARIZE:
            # Evict the oldest message but keep a one-line summary of it.
            if self._items:
                dropped = self._items.pop(0)
                self._stats.total_dropped += 1
                summary = self._create_summary(dropped)
                self._stats.dropped_summaries.append(summary)
                # Keep summaries bounded.
                while len(self._stats.dropped_summaries) > self.config.max_size:
                    self._stats.dropped_summaries.pop(0)
            return True

        return True

    def _clean_expired(self) -> int:
        """Remove messages older than config.max_age_seconds; return count removed."""
        if self.config.max_age_seconds <= 0:
            return 0

        cutoff = datetime.now() - timedelta(seconds=self.config.max_age_seconds)
        original_size = len(self._items)
        self._items = [m for m in self._items if m.enqueued_at > cutoff]
        expired = original_size - len(self._items)
        self._stats.total_expired += expired
        return expired

    def enqueue(
        self,
        message: QueuedMessage,
        priority: Optional[int] = None,
    ) -> bool:
        """
        Add a message to the queue.

        Args:
            message: Message to enqueue
            priority: Optional priority override (higher is dequeued first
                under the PRIORITY policy)

        Returns:
            True if enqueued, False if rejected (duplicate or dropped)
        """
        with self._lock:
            # Clean expired first so capacity checks see live items only.
            self._clean_expired()

            # Duplicates are rejected before any policy handling.
            if self._is_duplicate(message):
                self._stats.total_deduplicated += 1
                logger.debug(f"Duplicate message rejected: {message.message_id}")
                return False

            # Apply priority override.
            if priority is not None:
                message.priority = priority

            # LATEST: replace everything with just this message.
            if self.config.policy == QueuePolicy.LATEST:
                dropped_count = len(self._items)
                self._items.clear()
                self._stats.total_dropped += dropped_count

            # DROP: reject the newcomer if at capacity.
            elif self.config.policy == QueuePolicy.DROP:
                if self.is_full:
                    self._stats.total_dropped += 1
                    return False

            # BACKLOG / PRIORITY / COLLECT: defer to the drop policy.
            else:
                if not self._apply_drop_policy():
                    self._stats.total_dropped += 1
                    return False

            # Mark as seen for deduplication.
            self._mark_seen(message)

            # Add to queue.
            self._items.append(message)
            self._last_enqueue_time = datetime.now()
            self._stats.total_enqueued += 1
            self._stats.current_size = len(self._items)

            # Highest priority first; list.sort is stable, so FIFO order is
            # preserved among equal priorities.
            if self.config.policy == QueuePolicy.PRIORITY:
                self._items.sort(key=lambda m: -m.priority)

            return True

    def dequeue(self) -> Optional[QueuedMessage]:
        """
        Remove and return the next message from the queue.

        Returns:
            Next message or None if empty
        """
        with self._lock:
            self._clean_expired()

            if not self._items:
                return None

            message = self._items.pop(0)
            self._stats.total_dequeued += 1
            self._stats.current_size = len(self._items)
            return message

    def peek(self) -> Optional[QueuedMessage]:
        """
        View the next message without removing it.

        Returns:
            Next message or None if empty
        """
        with self._lock:
            self._clean_expired()
            return self._items[0] if self._items else None

    def collect(self, max_items: Optional[int] = None) -> List[QueuedMessage]:
        """
        Collect and remove multiple messages (for COLLECT mode).

        Args:
            max_items: Maximum items to collect; None means the configured
                batch size. An explicit 0 collects nothing.

        Returns:
            List of collected messages
        """
        with self._lock:
            self._clean_expired()

            # BUG FIX: `max_items or batch_size` treated an explicit 0 as
            # "use the default". Only None falls back now; negative values
            # are clamped to 0 (a negative slice bound would silently keep
            # the wrong items).
            if max_items is None:
                limit = self.config.collect_batch_size
            else:
                limit = max(0, max_items)

            collected = self._items[:limit]
            self._items = self._items[limit:]

            self._stats.total_dequeued += len(collected)
            self._stats.current_size = len(self._items)

            return collected

    def clear(self) -> int:
        """
        Clear all messages, dedupe state, and dropped summaries.

        Returns:
            Number of messages cleared
        """
        with self._lock:
            count = len(self._items)
            self._items.clear()
            self._seen_ids.clear()
            self._seen_hashes.clear()
            self._stats.current_size = 0
            self._stats.dropped_summaries.clear()
            return count

    def get_stats(self) -> QueueStats:
        """Get a snapshot copy of queue statistics."""
        with self._lock:
            self._stats.current_size = len(self._items)
            # Return a copy so callers cannot mutate the live counters.
            return QueueStats(
                total_enqueued=self._stats.total_enqueued,
                total_dequeued=self._stats.total_dequeued,
                total_dropped=self._stats.total_dropped,
                total_expired=self._stats.total_expired,
                total_deduplicated=self._stats.total_deduplicated,
                current_size=self._stats.current_size,
                dropped_summaries=list(self._stats.dropped_summaries),
            )

    def get_dropped_summary(self) -> Optional[str]:
        """
        Get and clear the dropped messages summary.

        Returns:
            Summary text or None if no dropped messages
        """
        with self._lock:
            if not self._stats.dropped_summaries:
                return None

            count = len(self._stats.dropped_summaries)
            title = f"[Queue overflow] Dropped {count} message{'s' if count != 1 else ''} due to cap."
            lines = [title, "Summary:"]
            for summary in self._stats.dropped_summaries:
                lines.append(f"- {summary}")

            # Consuming the summary resets it.
            self._stats.dropped_summaries.clear()
            return "\n".join(lines)

    def should_debounce(self) -> bool:
        """Check if we are still inside the debounce window after the last enqueue.

        Lock-free by design: it reads a single attribute, which is atomic
        under CPython's GIL, and taking self._lock here would deadlock
        callers of time_until_debounce_complete if both locked.
        """
        if self.config.debounce_ms <= 0:
            return False
        if self._last_enqueue_time is None:
            return False
        elapsed = (datetime.now() - self._last_enqueue_time).total_seconds() * 1000
        return elapsed < self.config.debounce_ms

    def time_until_debounce_complete(self) -> float:
        """
        Get milliseconds until the debounce period completes.

        Returns:
            Milliseconds to wait, or 0 if no wait needed
        """
        if not self.should_debounce():
            return 0
        if self._last_enqueue_time is None:
            return 0
        elapsed = (datetime.now() - self._last_enqueue_time).total_seconds() * 1000
        remaining = self.config.debounce_ms - elapsed
        return max(0, remaining)

435 

436 

class QueueManager:
    """
    Manages multiple message queues keyed by (channel, chat_id).

    ``max_queues`` is a soft limit: when reached, empty queues are pruned,
    but a new queue is still created even if pruning freed nothing.

    Usage:
        manager = QueueManager()

        # Get or create queue for a chat
        queue = manager.get_queue("telegram", "chat123")

        # Enqueue message
        queue.enqueue(message)

        # Process all queues
        count = manager.process_all(processor_func)
    """

    def __init__(
        self,
        default_config: Optional[QueueConfig] = None,
        max_queues: int = 1000,
    ):
        self._default_config = default_config or QueueConfig()
        self._max_queues = max_queues
        self._queues: Dict[Tuple[str, str], MessageQueue] = {}
        self._channel_configs: Dict[str, QueueConfig] = {}
        self._lock = threading.Lock()

    def set_channel_config(self, channel: str, config: QueueConfig) -> None:
        """Set a custom config for a channel (applies only to queues created later)."""
        # FIX: this write was previously unsynchronized while every other
        # accessor of manager state holds the lock.
        with self._lock:
            self._channel_configs[channel] = config

    def get_queue(
        self,
        channel: str,
        chat_id: str,
        create: bool = True,
    ) -> Optional[MessageQueue]:
        """
        Get or create a queue for a channel/chat.

        Args:
            channel: Channel name
            chat_id: Chat identifier
            create: Whether to create if not exists

        Returns:
            MessageQueue or None if not found and create=False
        """
        key = (channel, chat_id)

        with self._lock:
            if key in self._queues:
                return self._queues[key]

            if not create:
                return None

            # Best-effort capacity enforcement: prune empty queues when the
            # soft limit is hit; the new queue is created regardless.
            if len(self._queues) >= self._max_queues:
                self._cleanup_empty_queues()

            # Channel-specific config wins over the default.
            config = self._channel_configs.get(channel, self._default_config)

            queue = MessageQueue(config)
            self._queues[key] = queue
            return queue

    def has_queue(self, channel: str, chat_id: str) -> bool:
        """Check if a queue exists."""
        # FIX: read under the same lock the writers use for a consistent view.
        with self._lock:
            return (channel, chat_id) in self._queues

    def delete_queue(self, channel: str, chat_id: str) -> bool:
        """Delete a queue. Returns True if it existed."""
        key = (channel, chat_id)
        with self._lock:
            if key in self._queues:
                del self._queues[key]
                return True
            return False

    def list_queues(self, channel: Optional[str] = None) -> List[Tuple[str, str]]:
        """List all queue keys, optionally filtered by channel."""
        with self._lock:
            if channel:
                return [k for k in self._queues.keys() if k[0] == channel]
            return list(self._queues.keys())

    def get_total_size(self) -> int:
        """Get total messages across all queues."""
        with self._lock:
            return sum(q.size for q in self._queues.values())

    def process_all(
        self,
        processor: Callable[[QueuedMessage], None],
        max_per_queue: int = 1,
    ) -> int:
        """
        Process messages from all queues.

        Args:
            processor: Function to process each message
            max_per_queue: Maximum messages to process per queue

        Returns:
            Total messages processed (messages whose processor raised are
            dequeued but not counted)
        """
        total = 0

        # Snapshot the queue map under the lock, then dequeue/process outside
        # it: each MessageQueue has its own lock, and holding the manager lock
        # across user callbacks would serialize (and could deadlock) them.
        with self._lock:
            queues = list(self._queues.items())

        for (channel, chat_id), queue in queues:
            for _ in range(max_per_queue):
                msg = queue.dequeue()
                if msg is None:
                    break
                try:
                    processor(msg)
                    total += 1
                except Exception as e:
                    logger.error(f"Error processing message from {channel}/{chat_id}: {e}")

        return total

    def cleanup_stale(self, max_age_seconds: int = 3600) -> int:
        """
        Remove queues that have been empty for too long.

        Args:
            max_age_seconds: Maximum age of empty queues

        Returns:
            Number of queues removed
        """
        removed = 0
        cutoff = datetime.now() - timedelta(seconds=max_age_seconds)

        with self._lock:
            # Reaches into MessageQueue._last_enqueue_time; queues that never
            # received a message (None) are deliberately kept.
            to_remove = []
            for key, queue in self._queues.items():
                if queue.is_empty and queue._last_enqueue_time:
                    if queue._last_enqueue_time < cutoff:
                        to_remove.append(key)

            for key in to_remove:
                del self._queues[key]
                removed += 1

        return removed

    def _cleanup_empty_queues(self) -> int:
        """Remove empty queues to make room. Caller must hold self._lock."""
        removed = 0
        to_remove = [k for k, q in self._queues.items() if q.is_empty]
        for key in to_remove:
            del self._queues[key]
            removed += 1
        return removed

    def get_stats(self) -> Dict[str, Any]:
        """Get aggregated statistics for all queues."""
        with self._lock:
            total_enqueued = 0
            total_dequeued = 0
            total_dropped = 0
            total_messages = 0

            # Safe to call queue.get_stats() here: MessageQueue never takes
            # the manager lock, so no lock-order inversion is possible.
            for queue in self._queues.values():
                stats = queue.get_stats()
                total_enqueued += stats.total_enqueued
                total_dequeued += stats.total_dequeued
                total_dropped += stats.total_dropped
                total_messages += stats.current_size

            return {
                "queue_count": len(self._queues),
                "total_messages": total_messages,
                "total_enqueued": total_enqueued,
                "total_dequeued": total_dequeued,
                "total_dropped": total_dropped,
            }

621 

622 

# Process-wide singleton, created lazily by get_queue_manager().
_queue_manager: Optional[QueueManager] = None


def get_queue_manager() -> QueueManager:
    """Return the global QueueManager, creating it on first use.

    NOTE(review): initialization is not guarded by a lock — presumably the
    first call happens before any worker threads start; confirm if that
    assumption changes.
    """
    global _queue_manager
    if _queue_manager is not None:
        return _queue_manager
    _queue_manager = QueueManager()
    return _queue_manager

632 return _queue_manager