Coverage for integrations / channels / queue / message_queue.py: 90.4%
314 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Message Queue System
4Provides queue management for message processing with multiple policies.
5Ported from HevolveBot's src/auto-reply/reply/queue/.
7Features:
8- Multiple queue policies (DROP, LATEST, BACKLOG, PRIORITY, COLLECT)
9- Drop policies (OLD, NEW, SUMMARIZE)
10- Deduplication
11- Per-channel/user queue management
12- Message expiration
13- Statistics tracking
14"""
16from __future__ import annotations
18import hashlib
19import logging
20import threading
21import time
22from collections import OrderedDict
23from dataclasses import dataclass, field
24from datetime import datetime, timedelta
25from enum import Enum
26from typing import Optional, Dict, List, Any, Callable, Tuple
28logger = logging.getLogger(__name__)
class QueuePolicy(Enum):
    """How a queue decides which messages to keep and process."""

    # Reject incoming messages while busy or at capacity.
    DROP = "drop"
    # Retain only the newest message, discarding anything older.
    LATEST = "latest"
    # First-in, first-out processing of every message.
    BACKLOG = "backlog"
    # Order processing by message priority.
    PRIORITY = "priority"
    # Gather messages together and hand them off in batches.
    COLLECT = "collect"
class DropPolicy(Enum):
    """What to discard when a queue reaches capacity."""

    # Evict the oldest queued messages first.
    OLD = "old"
    # Refuse newly arriving messages.
    NEW = "new"
    # Evict old messages, but record a one-line summary of each.
    SUMMARIZE = "summarize"
class DedupeMode(Enum):
    """Strategy used to detect duplicate messages."""

    # Match on the platform-assigned message ID.
    MESSAGE_ID = "message-id"
    # Match on a hash of the message content.
    CONTENT = "content"
    # Match on either the message ID or the content hash.
    COMBINED = "combined"
    # Deduplication disabled.
    NONE = "none"
@dataclass
class QueuedMessage:
    """A single message awaiting processing in a queue.

    ``content_hash`` is a 16-hex-char SHA-256 prefix of ``content``,
    computed automatically when not supplied; it is used for
    content-based deduplication.
    """

    message_id: str
    channel: str
    chat_id: str
    sender_id: str
    content: str
    priority: int = 0
    enqueued_at: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)
    content_hash: str = field(default="")

    def __post_init__(self):
        # Derive the content fingerprint only when the caller did not
        # provide one explicitly.
        if not self.content_hash:
            digest = hashlib.sha256(self.content.encode()).hexdigest()
            self.content_hash = digest[:16]
@dataclass
class QueueConfig:
    """Configuration for a message queue.

    Defaults mirror the HevolveBot port: FIFO backlog processing,
    summarize-on-drop, and deduplication by platform message ID.
    """
    policy: QueuePolicy = QueuePolicy.BACKLOG        # queue processing mode
    drop_policy: DropPolicy = DropPolicy.SUMMARIZE   # behavior once max_size is reached
    dedupe_mode: DedupeMode = DedupeMode.MESSAGE_ID  # duplicate-detection strategy
    max_size: int = 20            # capacity before drop_policy kicks in
    max_age_seconds: int = 300    # queued messages older than this expire (<= 0 disables)
    debounce_ms: int = 1000       # quiet period after an enqueue (<= 0 disables)
    priority_boost_mentions: bool = True   # NOTE(review): not read by MessageQueue in this file — presumably consumed by callers; verify
    priority_boost_replies: bool = True    # NOTE(review): not read by MessageQueue in this file — presumably consumed by callers; verify
    collect_batch_size: int = 10  # default batch size for MessageQueue.collect()
@dataclass
class QueueStats:
    """Statistics counters for a queue."""
    total_enqueued: int = 0       # messages accepted into the queue
    total_dequeued: int = 0       # messages removed via dequeue()/collect()
    total_dropped: int = 0        # messages discarded by capacity/policy
    total_expired: int = 0        # messages removed due to age
    total_deduplicated: int = 0   # messages rejected as duplicates
    current_size: int = 0         # snapshot of queue length
    dropped_summaries: List[str] = field(default_factory=list)  # one-line summaries, newest last

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict snapshot of the counters.

        Only the 10 most recent dropped-message summaries are included.
        """
        return {
            "total_enqueued": self.total_enqueued,
            "total_dequeued": self.total_dequeued,
            "total_dropped": self.total_dropped,
            "total_expired": self.total_expired,
            "total_deduplicated": self.total_deduplicated,
            "current_size": self.current_size,
            # Fix: summaries are appended newest-last and trimmed from the
            # front, so "last 10" is the tail slice — the previous [:10]
            # returned the OLDEST 10, contradicting its own comment.
            "dropped_summaries": self.dropped_summaries[-10:],  # last 10
        }
class MessageQueue:
    """
    Message queue with configurable policies.

    Supports various queue modes ported from HevolveBot:
    - DROP: Reject new messages when at capacity
    - LATEST: Keep only the most recent message
    - BACKLOG: FIFO processing of all messages
    - PRIORITY: Process by priority order
    - COLLECT: Batch messages together

    All public methods take an internal lock, so one instance may be
    shared between threads.

    Usage:
        config = QueueConfig(policy=QueuePolicy.BACKLOG, max_size=100)
        queue = MessageQueue(config)

        # Enqueue
        success = queue.enqueue(message)

        # Dequeue
        msg = queue.dequeue()

        # Get stats
        stats = queue.get_stats()
    """

    def __init__(self, config: QueueConfig):
        self.config = config
        self._items: List[QueuedMessage] = []
        # Dedupe bookkeeping; insertion-ordered so the oldest entries are
        # evicted first when the bound (2 * max_size) is exceeded.
        self._seen_ids: OrderedDict[str, datetime] = OrderedDict()
        self._seen_hashes: OrderedDict[str, datetime] = OrderedDict()
        self._stats = QueueStats()
        self._lock = threading.Lock()
        self._last_enqueue_time: Optional[datetime] = None
        # NOTE(review): _draining is never read in this class — presumably
        # used by external callers; verify before removing.
        self._draining = False

    @property
    def size(self) -> int:
        """Current queue size (unlocked snapshot)."""
        return len(self._items)

    @property
    def is_empty(self) -> bool:
        """Check if queue is empty."""
        return len(self._items) == 0

    @property
    def is_full(self) -> bool:
        """Check if queue is at capacity."""
        return len(self._items) >= self.config.max_size

    def _is_duplicate(self, message: QueuedMessage) -> bool:
        """Check if message is a duplicate per the configured dedupe mode."""
        if self.config.dedupe_mode == DedupeMode.NONE:
            return False

        # Check by message ID
        if self.config.dedupe_mode in (DedupeMode.MESSAGE_ID, DedupeMode.COMBINED):
            if message.message_id and message.message_id in self._seen_ids:
                return True

        # Check by content hash
        if self.config.dedupe_mode in (DedupeMode.CONTENT, DedupeMode.COMBINED):
            if message.content_hash in self._seen_hashes:
                return True

        return False

    def _mark_seen(self, message: QueuedMessage) -> None:
        """Mark message as seen for deduplication, keeping both maps bounded."""
        now = datetime.now()

        if message.message_id:
            self._seen_ids[message.message_id] = now
            # Keep seen IDs bounded; popitem(last=False) evicts the oldest.
            while len(self._seen_ids) > self.config.max_size * 2:
                self._seen_ids.popitem(last=False)

        self._seen_hashes[message.content_hash] = now
        # Keep seen hashes bounded
        while len(self._seen_hashes) > self.config.max_size * 2:
            self._seen_hashes.popitem(last=False)

    def _create_summary(self, message: QueuedMessage) -> str:
        """Create a single-line summary (at most 140 chars) for a dropped message."""
        text = message.content.replace('\n', ' ').strip()
        if len(text) > 140:
            text = text[:139].rstrip() + '…'
        return text

    def _apply_drop_policy(self) -> bool:
        """
        Apply drop policy when at capacity. Caller must hold the lock.

        Returns:
            True if new message can be added, False otherwise
        """
        if not self.is_full:
            return True

        policy = self.config.drop_policy

        if policy == DropPolicy.NEW:
            # Reject the new message
            return False

        elif policy == DropPolicy.OLD:
            # Drop oldest message
            if self._items:
                self._items.pop(0)
                self._stats.total_dropped += 1
            return True

        elif policy == DropPolicy.SUMMARIZE:
            # Drop oldest but keep summary
            if self._items:
                dropped = self._items.pop(0)
                self._stats.total_dropped += 1
                summary = self._create_summary(dropped)
                self._stats.dropped_summaries.append(summary)
                # Keep summaries bounded (oldest removed first)
                while len(self._stats.dropped_summaries) > self.config.max_size:
                    self._stats.dropped_summaries.pop(0)
            return True

        return True

    def _clean_expired(self) -> int:
        """Remove expired messages. Caller must hold the lock.

        Returns:
            Number of messages expired
        """
        if self.config.max_age_seconds <= 0:
            return 0

        cutoff = datetime.now() - timedelta(seconds=self.config.max_age_seconds)
        original_size = len(self._items)
        self._items = [m for m in self._items if m.enqueued_at > cutoff]
        expired = original_size - len(self._items)
        self._stats.total_expired += expired
        return expired

    def enqueue(
        self,
        message: QueuedMessage,
        priority: Optional[int] = None,
    ) -> bool:
        """
        Add a message to the queue.

        Args:
            message: Message to enqueue
            priority: Optional priority override

        Returns:
            True if enqueued, False if rejected (duplicate or dropped)
        """
        with self._lock:
            # Clean expired first
            self._clean_expired()

            # Check for duplicates
            if self._is_duplicate(message):
                self._stats.total_deduplicated += 1
                logger.debug(f"Duplicate message rejected: {message.message_id}")
                return False

            # Apply priority override
            if priority is not None:
                message.priority = priority

            # Handle LATEST policy - replace all with just this message
            if self.config.policy == QueuePolicy.LATEST:
                dropped_count = len(self._items)
                self._items.clear()
                self._stats.total_dropped += dropped_count

            # Handle DROP policy - reject if at capacity
            elif self.config.policy == QueuePolicy.DROP:
                if self.is_full:
                    self._stats.total_dropped += 1
                    return False

            # Apply drop policy for other modes
            else:
                if not self._apply_drop_policy():
                    self._stats.total_dropped += 1
                    return False

            # Mark as seen
            self._mark_seen(message)

            # Add to queue
            self._items.append(message)
            self._last_enqueue_time = datetime.now()
            self._stats.total_enqueued += 1
            self._stats.current_size = len(self._items)

            # Sort by priority if needed (stable sort keeps FIFO order
            # among equal priorities)
            if self.config.policy == QueuePolicy.PRIORITY:
                self._items.sort(key=lambda m: -m.priority)

            return True

    def dequeue(self) -> Optional[QueuedMessage]:
        """
        Remove and return the next message from the queue.

        Returns:
            Next message or None if empty
        """
        with self._lock:
            # Clean expired first
            self._clean_expired()

            if not self._items:
                return None

            message = self._items.pop(0)
            self._stats.total_dequeued += 1
            self._stats.current_size = len(self._items)
            return message

    def peek(self) -> Optional[QueuedMessage]:
        """
        View the next message without removing it.

        Returns:
            Next message or None if empty
        """
        with self._lock:
            self._clean_expired()
            return self._items[0] if self._items else None

    def collect(self, max_items: Optional[int] = None) -> List[QueuedMessage]:
        """
        Collect and remove multiple messages (for COLLECT mode).

        Args:
            max_items: Maximum items to collect (defaults to config batch size)

        Returns:
            List of collected messages
        """
        with self._lock:
            self._clean_expired()

            # Fix: explicit None check. The previous `max_items or ...`
            # silently promoted max_items=0 to the default batch size;
            # 0 now means "collect nothing".
            limit = self.config.collect_batch_size if max_items is None else max_items
            collected = self._items[:limit]
            self._items = self._items[limit:]

            self._stats.total_dequeued += len(collected)
            self._stats.current_size = len(self._items)

            return collected

    def clear(self) -> int:
        """
        Clear all messages and dedupe state from the queue.

        Returns:
            Number of messages cleared
        """
        with self._lock:
            count = len(self._items)
            self._items.clear()
            self._seen_ids.clear()
            self._seen_hashes.clear()
            self._stats.current_size = 0
            self._stats.dropped_summaries.clear()
            return count

    def get_stats(self) -> QueueStats:
        """Get a snapshot copy of queue statistics."""
        with self._lock:
            self._stats.current_size = len(self._items)
            # Return a copy so callers cannot mutate internal counters.
            return QueueStats(
                total_enqueued=self._stats.total_enqueued,
                total_dequeued=self._stats.total_dequeued,
                total_dropped=self._stats.total_dropped,
                total_expired=self._stats.total_expired,
                total_deduplicated=self._stats.total_deduplicated,
                current_size=self._stats.current_size,
                dropped_summaries=list(self._stats.dropped_summaries),
            )

    def get_dropped_summary(self) -> Optional[str]:
        """
        Get and clear the dropped messages summary.

        Returns:
            Summary text or None if no dropped messages
        """
        with self._lock:
            if not self._stats.dropped_summaries:
                return None

            count = len(self._stats.dropped_summaries)
            title = f"[Queue overflow] Dropped {count} message{'s' if count != 1 else ''} due to cap."
            lines = [title, "Summary:"]
            for summary in self._stats.dropped_summaries:
                lines.append(f"- {summary}")

            self._stats.dropped_summaries.clear()
            return "\n".join(lines)

    def should_debounce(self) -> bool:
        """Check if we should wait for debounce period."""
        # Derived from the same locked read as the remaining-time query
        # so the two methods can never disagree.
        return self.time_until_debounce_complete() > 0

    def time_until_debounce_complete(self) -> float:
        """
        Get milliseconds until debounce period completes.

        Returns:
            Milliseconds to wait, or 0 if no wait needed
        """
        if self.config.debounce_ms <= 0:
            return 0
        # Fix: read the shared timestamp under the lock (every other
        # accessor locks) and only once, avoiding the previous unlocked
        # check-then-read race.
        with self._lock:
            last = self._last_enqueue_time
        if last is None:
            return 0
        elapsed = (datetime.now() - last).total_seconds() * 1000
        return max(0, self.config.debounce_ms - elapsed)
class QueueManager:
    """
    Manages multiple message queues keyed by (channel, chat_id).

    All access to the internal queue registry is lock-protected, so one
    manager may be shared between threads.

    Usage:
        manager = QueueManager()

        # Get or create queue for a chat
        queue = manager.get_queue("telegram", "chat123")

        # Enqueue message
        queue.enqueue(message)

        # Process all queues
        count = manager.process_all(processor_func)
    """

    def __init__(
        self,
        default_config: Optional[QueueConfig] = None,
        max_queues: int = 1000,
    ):
        self._default_config = default_config or QueueConfig()
        self._max_queues = max_queues
        self._queues: Dict[Tuple[str, str], MessageQueue] = {}
        self._channel_configs: Dict[str, QueueConfig] = {}
        self._lock = threading.Lock()

    def set_channel_config(self, channel: str, config: QueueConfig) -> None:
        """Set custom config for a specific channel.

        Only affects queues created after this call.
        """
        # Fix: take the lock — get_queue() reads this dict under the lock,
        # but this writer previously mutated it unguarded.
        with self._lock:
            self._channel_configs[channel] = config

    def get_queue(
        self,
        channel: str,
        chat_id: str,
        create: bool = True,
    ) -> Optional[MessageQueue]:
        """
        Get or create a queue for a channel/chat.

        Args:
            channel: Channel name
            chat_id: Chat identifier
            create: Whether to create if not exists

        Returns:
            MessageQueue or None if not found and create=False
        """
        key = (channel, chat_id)

        with self._lock:
            if key in self._queues:
                return self._queues[key]

            if not create:
                return None

            # Check capacity; evict empty queues to make room. If every
            # queue is non-empty the new one is still created (soft cap).
            if len(self._queues) >= self._max_queues:
                self._cleanup_empty_queues()

            # Get config for channel, falling back to the default
            config = self._channel_configs.get(channel, self._default_config)

            queue = MessageQueue(config)
            self._queues[key] = queue
            return queue

    def has_queue(self, channel: str, chat_id: str) -> bool:
        """Check if a queue exists."""
        # Fix: locked for consistency with every other _queues accessor.
        with self._lock:
            return (channel, chat_id) in self._queues

    def delete_queue(self, channel: str, chat_id: str) -> bool:
        """Delete a queue.

        Returns:
            True if a queue was removed, False if none existed
        """
        key = (channel, chat_id)
        with self._lock:
            if key in self._queues:
                del self._queues[key]
                return True
            return False

    def list_queues(self, channel: Optional[str] = None) -> List[Tuple[str, str]]:
        """List all queue keys, optionally filtered by channel."""
        with self._lock:
            if channel:
                return [k for k in self._queues.keys() if k[0] == channel]
            return list(self._queues.keys())

    def get_total_size(self) -> int:
        """Get total messages across all queues."""
        with self._lock:
            return sum(q.size for q in self._queues.values())

    def process_all(
        self,
        processor: Callable[[QueuedMessage], None],
        max_per_queue: int = 1,
    ) -> int:
        """
        Process messages from all queues.

        Processor errors are logged; the failing message is NOT re-queued.

        Args:
            processor: Function to process each message
            max_per_queue: Maximum messages to process per queue

        Returns:
            Total messages processed
        """
        total = 0

        # Snapshot under the lock, then process without holding it so the
        # (potentially slow) processor cannot block other manager calls.
        with self._lock:
            queues = list(self._queues.items())

        for (channel, chat_id), queue in queues:
            for _ in range(max_per_queue):
                msg = queue.dequeue()
                if msg is None:
                    break
                try:
                    processor(msg)
                    total += 1
                except Exception as e:
                    logger.error(f"Error processing message from {channel}/{chat_id}: {e}")

        return total

    def cleanup_stale(self, max_age_seconds: int = 3600) -> int:
        """
        Remove queues that have been empty for too long.

        Args:
            max_age_seconds: Maximum age of empty queues

        Returns:
            Number of queues removed
        """
        removed = 0
        cutoff = datetime.now() - timedelta(seconds=max_age_seconds)

        with self._lock:
            to_remove = []
            for key, queue in self._queues.items():
                # NOTE(review): reads MessageQueue's private timestamp
                # without that queue's own lock — a plain read, worst case
                # a slightly stale value.
                if queue.is_empty and queue._last_enqueue_time:
                    if queue._last_enqueue_time < cutoff:
                        to_remove.append(key)

            for key in to_remove:
                del self._queues[key]
                removed += 1

        return removed

    def _cleanup_empty_queues(self) -> int:
        """Remove empty queues to make room. Caller must hold the lock."""
        removed = 0
        to_remove = [k for k, q in self._queues.items() if q.is_empty]
        for key in to_remove:
            del self._queues[key]
            removed += 1
        return removed

    def get_stats(self) -> Dict[str, Any]:
        """Get aggregated statistics for all queues."""
        with self._lock:
            total_enqueued = 0
            total_dequeued = 0
            total_dropped = 0
            total_messages = 0

            for queue in self._queues.values():
                stats = queue.get_stats()
                total_enqueued += stats.total_enqueued
                total_dequeued += stats.total_dequeued
                total_dropped += stats.total_dropped
                total_messages += stats.current_size

            return {
                "queue_count": len(self._queues),
                "total_messages": total_messages,
                "total_enqueued": total_enqueued,
                "total_dequeued": total_dequeued,
                "total_dropped": total_dropped,
            }
# Process-wide singleton
_queue_manager: Optional[QueueManager] = None


def get_queue_manager() -> QueueManager:
    """Return the global queue manager, creating it lazily on first call."""
    global _queue_manager
    if _queue_manager is None:
        _queue_manager = QueueManager()
    return _queue_manager