Coverage for integrations / channels / queue / pipeline.py: 0.0%
400 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Unified Message Pipeline
4Combines all queue components into a single processing pipeline:
5debounce -> dedupe -> rate_limit -> concurrency -> queue
7Ported from HevolveBot's unified message handling approach.
9Features:
10- Configurable pipeline stages
11- Async and sync processing
12- Statistics tracking across all stages
13- Error handling with retry
14- Graceful shutdown
15"""
17from __future__ import annotations
19import asyncio
20import logging
21import threading
22import time
23from dataclasses import dataclass, field
24from datetime import datetime
25from enum import Enum
26from typing import (
27 Optional,
28 Dict,
29 List,
30 Any,
31 Callable,
32 TypeVar,
33 Generic,
34 Awaitable,
35 Union,
36)
38from .debounce import DebounceConfig, InboundDebouncer, SyncDebouncer
39from .dedupe import DedupeConfig, DedupeMode, MessageDeduplicator
40from .rate_limit import RateLimitConfig, RateLimiter, RateLimitResult
41from .concurrency import ConcurrencyLimits, ConcurrencyController
42from .message_queue import QueueConfig, QueuePolicy, MessageQueue, QueuedMessage
43from .retry import RetryConfig, RetryHandler
44from .batching import BatchConfig, MessageBatcher, BatchResult
46logger = logging.getLogger(__name__)
48T = TypeVar('T')
class PipelineStage(Enum):
    """Identifies the step of the pipeline a message is currently in.

    ``PipelineMessage.current_stage`` is set to one of these members as the
    message moves through the pipeline.
    """

    # Admission steps
    DEBOUNCE = "debounce"
    DEDUPE = "dedupe"
    RATE_LIMIT = "rate_limit"
    CONCURRENCY = "concurrency"

    # Buffering steps
    QUEUE = "queue"
    BATCH = "batch"

    # Handler invocation
    PROCESS = "process"
class PipelineResult(Enum):
    """Outcome reported for a message submitted to the pipeline."""

    # The handler ran to completion
    PROCESSED = "processed"

    # The message is being held by a buffering component
    DEBOUNCED = "debounced"  # Held for debouncing
    QUEUED = "queued"
    BATCHED = "batched"

    # The message was turned away by an admission stage
    DUPLICATE = "duplicate"
    RATE_LIMITED = "rate_limited"
    CONCURRENCY_LIMITED = "concurrency_limited"
    REJECTED = "rejected"

    # An exception was raised while handling the message
    ERROR = "error"
@dataclass
class PipelineConfig:
    """Settings for a message pipeline.

    Every stage has an on/off switch plus its own configuration object. A
    stage's configuration is only handed to its component when the matching
    ``enable_*`` switch is true.
    """

    # --- Switches: which stages are built and run ---
    enable_debounce: bool = True
    enable_dedupe: bool = True
    enable_rate_limit: bool = True
    enable_concurrency: bool = True
    enable_queue: bool = True
    enable_batch: bool = False  # batching is the only stage that is off by default
    enable_retry: bool = True

    # --- Per-stage settings; each instance gets its own default objects ---
    debounce: DebounceConfig = field(default_factory=DebounceConfig)
    dedupe: DedupeConfig = field(default_factory=DedupeConfig)
    rate_limit: RateLimitConfig = field(default_factory=RateLimitConfig)
    concurrency: ConcurrencyLimits = field(default_factory=ConcurrencyLimits)
    queue: QueueConfig = field(default_factory=QueueConfig)
    batch: BatchConfig = field(default_factory=BatchConfig)
    retry: RetryConfig = field(default_factory=RetryConfig)
@dataclass
class PipelineStats:
    """Counters describing what has happened to messages in a pipeline."""

    # Messages accepted by process()
    total_received: int = 0
    # Messages whose handler completed without raising
    total_processed: int = 0
    # Per-outcome counters, one per stage that can stop or hold a message
    total_debounced: int = 0
    total_deduplicated: int = 0
    total_rate_limited: int = 0
    total_concurrency_limited: int = 0
    total_queued: int = 0
    total_batched: int = 0
    total_rejected: int = 0
    total_errors: int = 0
    total_retries: int = 0
    # Time of the most recent successful handler call, if any
    last_processed_at: Optional[datetime] = None
    # Handler calls that have started but not yet finished
    current_in_flight: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Return the stats as a plain dict, one key per field.

        Keys appear in field declaration order. ``last_processed_at`` is
        rendered as an ISO-8601 string, or ``None`` when unset.
        """
        # The dataclass field table preserves declaration order.
        snapshot: Dict[str, Any] = {
            name: getattr(self, name) for name in self.__dataclass_fields__
        }
        stamp = self.last_processed_at
        snapshot["last_processed_at"] = stamp.isoformat() if stamp else None
        return snapshot
@dataclass
class PipelineMessage(Generic[T]):
    """Envelope that carries a payload plus the routing and tracking data the pipeline uses."""

    # --- Required: the wrapped object and its routing identifiers ---
    payload: T
    message_id: str
    channel: str
    chat_id: str
    user_id: str

    # --- Optional descriptive data ---
    content: str = ""
    priority: int = 0
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Stamped with the local clock when the envelope is created
    received_at: datetime = field(default_factory=datetime.now)

    # --- Tracking fields, updated by the pipeline as the message advances ---
    current_stage: PipelineStage = PipelineStage.DEBOUNCE
    result: Optional[PipelineResult] = None
    error: Optional[Exception] = None
    retry_count: int = 0
class MessagePipeline(Generic[T]):
    """
    Asynchronous message pipeline that chains the queue components.

    Stages, in the order a message passes through them:
        1. Debounce - hold the message while the debouncer collects it
        2. Dedupe - stop messages the deduplicator reports as duplicates
        3. Rate limit - stop messages the rate limiter does not allow
        4. Concurrency - acquire a processing slot; when none is available
           the message is queued (if the queue is enabled)
        5. Batch (optional) - hand the message to the batcher
        6. Process - call the handler with the message payload

    A stage only runs when its ``enable_*`` flag in ``PipelineConfig`` is set.

    Usage:
        config = PipelineConfig()
        pipeline = MessagePipeline(config)

        # Set message handler (it receives the message payload)
        async def handle_message(msg):
            print(f"Processing: {msg.content}")

        pipeline.set_handler(handle_message)

        # Process messages
        result = await pipeline.process(message)

        # Get stats
        stats = pipeline.get_stats()
    """

    def __init__(
        self,
        config: PipelineConfig,
        handler: Optional[Callable[[T], Awaitable[Any]]] = None,
    ):
        """
        Initialize the pipeline.

        Args:
            config: Pipeline configuration
            handler: Message handler function
        """
        self.config = config
        self._handler = handler
        self._lock = threading.Lock()
        self._stats = PipelineStats()
        self._shutdown = False

        # Strong references to batch-processing tasks. The event loop only
        # keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._background_tasks: set = set()

        # Components stay None when their stage is disabled
        self._debouncer: Optional[InboundDebouncer[PipelineMessage[T]]] = None
        self._deduper: Optional[MessageDeduplicator[PipelineMessage[T]]] = None
        self._rate_limiter: Optional[RateLimiter] = None
        self._concurrency: Optional[ConcurrencyController] = None
        self._queue: Optional[MessageQueue] = None
        self._batcher: Optional[MessageBatcher[PipelineMessage[T]]] = None
        self._retry_handler: Optional[RetryHandler] = None

        self._init_components()

    def _init_components(self) -> None:
        """Build the component for every stage enabled in the config."""
        cfg = self.config

        if cfg.enable_debounce:
            self._debouncer = InboundDebouncer(
                cfg.debounce,
                on_flush=self._on_debounce_flush,
            )

        if cfg.enable_dedupe:
            self._deduper = MessageDeduplicator(cfg.dedupe)

        if cfg.enable_rate_limit:
            self._rate_limiter = RateLimiter(cfg.rate_limit)

        if cfg.enable_concurrency:
            self._concurrency = ConcurrencyController(cfg.concurrency)

        if cfg.enable_queue:
            self._queue = MessageQueue(cfg.queue)

        if cfg.enable_batch:
            self._batcher = MessageBatcher(
                cfg.batch,
                on_flush=self._on_batch_flush,
            )

        if cfg.enable_retry:
            self._retry_handler = RetryHandler(cfg.retry)

    def set_handler(
        self,
        handler: Callable[[T], Awaitable[Any]],
    ) -> None:
        """
        Set the message handler.

        Args:
            handler: Async function to handle messages
        """
        self._handler = handler

    async def _on_debounce_flush(
        self,
        messages: List[PipelineMessage[T]],
    ) -> None:
        """Debouncer ``on_flush`` callback: run each released message through the remaining stages."""
        for msg in messages:
            await self._process_after_debounce(msg)

    def _on_batch_flush(
        self,
        batch: BatchResult[PipelineMessage[T]],
    ) -> None:
        """
        Batcher ``on_flush`` callback: schedule processing of the flushed batch.

        ``asyncio.create_task`` requires a running event loop, so this must be
        invoked from within one.
        """
        task = asyncio.create_task(self._process_batch(batch))
        # Hold a reference until the task completes (see __init__).
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def _process_batch(
        self,
        batch: BatchResult[PipelineMessage[T]],
    ) -> None:
        """Run the handler for every message in a flushed batch, in order."""
        for msg in batch.items:
            await self._execute_handler(msg)

    async def _drain_background_tasks(self) -> None:
        """Wait for batch-processing tasks that are still running."""
        # Never await the task we are running in (e.g. shutdown() called from
        # inside a handler that is itself part of a batch task).
        current = asyncio.current_task()
        pending = [task for task in self._background_tasks if task is not current]
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

    @staticmethod
    def _wrap_message(
        message: Union[T, PipelineMessage[T]],
        message_id: Optional[str],
        channel: Optional[str],
        chat_id: Optional[str],
        user_id: Optional[str],
        content: Optional[str],
    ) -> PipelineMessage[T]:
        """Return ``message`` unchanged if already wrapped, otherwise wrap it.

        Explicit arguments win; missing ones fall back to same-named
        attributes on the payload, then to fixed defaults.
        """
        if isinstance(message, PipelineMessage):
            return message

        # NOTE: id() values can be reused once an object is garbage-collected,
        # so callers that rely on the message id should pass message_id.
        return PipelineMessage(
            payload=message,
            message_id=message_id or str(id(message)),
            channel=channel or getattr(message, 'channel', 'default'),
            chat_id=chat_id or getattr(message, 'chat_id', 'default'),
            user_id=user_id or getattr(message, 'sender_id', getattr(message, 'user_id', 'default')),
            content=content or getattr(message, 'content', ''),
        )

    async def process(
        self,
        message: Union[T, PipelineMessage[T]],
        message_id: Optional[str] = None,
        channel: Optional[str] = None,
        chat_id: Optional[str] = None,
        user_id: Optional[str] = None,
        content: Optional[str] = None,
    ) -> PipelineResult:
        """
        Process a message through the pipeline.

        Args:
            message: Message to process (or PipelineMessage wrapper)
            message_id: Message ID (if not wrapped)
            channel: Channel name (if not wrapped)
            chat_id: Chat ID (if not wrapped)
            user_id: User ID (if not wrapped)
            content: Message content (if not wrapped)

        Returns:
            PipelineResult indicating outcome. REJECTED is returned without
            touching the stats once shutdown() has been called.
        """
        if self._shutdown:
            return PipelineResult.REJECTED

        msg = self._wrap_message(message, message_id, channel, chat_id, user_id, content)
        self._stats.total_received += 1

        try:
            # Stage 1: Debounce
            if self.config.enable_debounce and self._debouncer is not None:
                msg.current_stage = PipelineStage.DEBOUNCE
                held = await self._debouncer.debounce(
                    msg,
                    key=msg.chat_id,
                    channel=msg.channel,
                )
                if held is None:
                    # The debouncer kept the message; it comes back through
                    # _on_debounce_flush.
                    self._stats.total_debounced += 1
                    return PipelineResult.DEBOUNCED

            return await self._process_after_debounce(msg)

        except Exception as exc:
            logger.exception("Pipeline error: %s", exc)
            self._stats.total_errors += 1
            msg.error = exc
            msg.result = PipelineResult.ERROR
            return PipelineResult.ERROR

    def _admit(self, msg: PipelineMessage[T]) -> Optional[PipelineResult]:
        """Run the dedupe and rate-limit stages.

        Returns:
            The terminal result if a stage stopped the message, otherwise
            ``None`` to let it continue.
        """
        # Stage 2: Dedupe
        if self.config.enable_dedupe and self._deduper is not None:
            msg.current_stage = PipelineStage.DEDUPE
            is_duplicate = self._deduper.check_and_mark(
                msg,
                message_id=msg.message_id,
                content=msg.content,
            )
            if is_duplicate:
                self._stats.total_deduplicated += 1
                msg.result = PipelineResult.DUPLICATE
                return PipelineResult.DUPLICATE

        # Stage 3: Rate Limit
        if self.config.enable_rate_limit and self._rate_limiter is not None:
            msg.current_stage = PipelineStage.RATE_LIMIT
            rate_result = self._rate_limiter.check_and_consume(
                msg.channel,
                msg.chat_id,
            )
            if not rate_result.allowed:
                self._stats.total_rate_limited += 1
                msg.result = PipelineResult.RATE_LIMITED
                return PipelineResult.RATE_LIMITED

        return None

    async def _process_after_debounce(
        self,
        msg: PipelineMessage[T],
        *,
        requeued: bool = False,
    ) -> PipelineResult:
        """
        Run the stages that follow debouncing.

        Args:
            msg: Message to process
            requeued: True when the message was taken back out of the queue.
                Such a message already passed dedupe and rate limiting before
                it was queued, so those stages are skipped; running them again
                would presumably flag the message as a duplicate of itself
                and consume a second rate-limit token.

        Returns:
            PipelineResult indicating outcome; exceptions are reported as ERROR.
        """
        try:
            if not requeued:
                stopped = self._admit(msg)
                if stopped is not None:
                    return stopped

            # No concurrency control: go straight to batching/handling
            if not (self.config.enable_concurrency and self._concurrency is not None):
                return await self._process_with_slot(msg, None)

            # Stage 4: Concurrency
            msg.current_stage = PipelineStage.CONCURRENCY
            slot_id = await self._concurrency.acquire(
                msg.channel,
                msg.chat_id,
                msg.user_id,
                wait=self.config.concurrency.queue_when_limited,
            )
            if slot_id is None:
                self._stats.total_concurrency_limited += 1
                msg.result = PipelineResult.CONCURRENCY_LIMITED

                # Queue if enabled
                if self.config.enable_queue and self._queue is not None:
                    return await self._queue_message(msg)

                return PipelineResult.CONCURRENCY_LIMITED

            try:
                return await self._process_with_slot(msg, slot_id)
            finally:
                # Always give the slot back, even if processing raised
                self._concurrency.release(slot_id=slot_id)

        except Exception as exc:
            logger.exception("Pipeline processing error: %s", exc)
            self._stats.total_errors += 1
            msg.error = exc
            msg.result = PipelineResult.ERROR
            return PipelineResult.ERROR

    async def _queue_message(
        self,
        msg: PipelineMessage[T],
    ) -> PipelineResult:
        """Queue a message for later processing by process_queued().

        Returns:
            QUEUED if the queue accepted the message, otherwise REJECTED.
        """
        msg.current_stage = PipelineStage.QUEUE

        if self._queue is None:
            self._stats.total_rejected += 1
            msg.result = PipelineResult.REJECTED
            return PipelineResult.REJECTED

        # The pipeline message rides along in the metadata so it can be
        # recovered unchanged when the entry is dequeued.
        queued_msg = QueuedMessage(
            message_id=msg.message_id,
            channel=msg.channel,
            chat_id=msg.chat_id,
            sender_id=msg.user_id,
            content=msg.content,
            priority=msg.priority,
            metadata={"pipeline_message": msg},
        )

        if self._queue.enqueue(queued_msg):
            self._stats.total_queued += 1
            msg.result = PipelineResult.QUEUED
            return PipelineResult.QUEUED

        self._stats.total_rejected += 1
        msg.result = PipelineResult.REJECTED
        return PipelineResult.REJECTED

    async def _process_with_slot(
        self,
        msg: PipelineMessage[T],
        slot_id: Optional[str],
    ) -> PipelineResult:
        """
        Batch or handle a message once a concurrency slot is held.

        Args:
            msg: Message to process
            slot_id: Slot acquired by the caller (None without concurrency
                control); the caller is responsible for releasing it.
        """
        # Stage 5: Batch (optional)
        if self.config.enable_batch and self._batcher is not None:
            msg.current_stage = PipelineStage.BATCH
            # The return value only tells whether this add triggered a flush.
            # Either way the batcher owns the message now: a flushed batch is
            # handled through _on_batch_flush, which runs the handler for
            # every item in it. Running the handler here as well would handle
            # this message twice.
            await self._batcher.add(msg, key=msg.chat_id)
            self._stats.total_batched += 1
            msg.result = PipelineResult.BATCHED
            return PipelineResult.BATCHED

        # Stage 6: Process
        return await self._execute_handler(msg)

    async def _execute_handler(
        self,
        msg: PipelineMessage[T],
    ) -> PipelineResult:
        """Call the handler with the message payload and record the outcome.

        Returns:
            PROCESSED on success, ERROR if the handler raised, REJECTED when
            no handler is set.
        """
        msg.current_stage = PipelineStage.PROCESS

        if self._handler is None:
            logger.warning("No handler set for pipeline")
            self._stats.total_rejected += 1
            msg.result = PipelineResult.REJECTED
            return PipelineResult.REJECTED

        self._stats.current_in_flight += 1

        try:
            if self.config.enable_retry and self._retry_handler is not None:
                await self._retry_handler.with_retry_async(
                    self._handler,
                    msg.payload,
                    on_retry=lambda attempt: self._on_retry(msg, attempt),
                )
            else:
                # Tolerate a plain (non-async) handler as well
                outcome = self._handler(msg.payload)
                if asyncio.iscoroutine(outcome):
                    await outcome

        except Exception as exc:
            logger.exception("Handler error: %s", exc)
            self._stats.total_errors += 1
            msg.error = exc
            msg.result = PipelineResult.ERROR
            return PipelineResult.ERROR

        else:
            self._stats.total_processed += 1
            self._stats.last_processed_at = datetime.now()
            msg.result = PipelineResult.PROCESSED
            return PipelineResult.PROCESSED

        finally:
            self._stats.current_in_flight -= 1

    def _on_retry(self, msg: PipelineMessage[T], attempt: Any) -> None:
        """Retry callback: count the retry on the message and in the stats."""
        msg.retry_count += 1
        self._stats.total_retries += 1

    async def process_queued(self, max_items: int = 10) -> int:
        """
        Process queued messages.

        Args:
            max_items: Maximum items to process

        Returns:
            Number of dequeued messages that were run through the pipeline,
            whatever their individual outcome.
        """
        if self._queue is None:
            return 0

        processed = 0
        for _ in range(max_items):
            queued = self._queue.dequeue()
            if queued is None:
                break

            msg = queued.metadata.get("pipeline_message")
            if msg is not None:
                # Already admitted once; do not dedupe / rate-limit it again.
                await self._process_after_debounce(msg, requeued=True)
                processed += 1

        return processed

    def get_stats(self) -> PipelineStats:
        """Return a copy of the pipeline statistics."""
        # vars() on the dataclass instance yields exactly its fields
        return PipelineStats(**vars(self._stats))

    def get_component_stats(self) -> Dict[str, Any]:
        """Collect the statistics of every enabled component, keyed by stage name."""
        stats: Dict[str, Any] = {}

        if self._debouncer is not None:
            stats["debounce"] = self._debouncer.get_stats().__dict__

        if self._deduper is not None:
            stats["dedupe"] = self._deduper.get_stats().__dict__

        if self._rate_limiter is not None:
            stats["rate_limit"] = self._rate_limiter.get_stats().__dict__

        if self._concurrency is not None:
            usage = self._concurrency.get_usage()
            stats["concurrency"] = {
                "current_global": usage.current_global,
                "total_acquired": usage.total_acquired,
                "total_rejected": usage.total_rejected,
            }

        if self._queue is not None:
            stats["queue"] = self._queue.get_stats().to_dict()

        if self._batcher is not None:
            stats["batch"] = self._batcher.get_stats().to_dict()

        if self._retry_handler is not None:
            stats["retry"] = self._retry_handler.get_stats().__dict__

        return stats

    def get_queue_size(self) -> int:
        """Return the current queue size, or 0 when the queue is disabled."""
        if self._queue is None:
            return 0
        return self._queue.size

    def get_pending_count(self) -> int:
        """Return the total of pending messages (debounced + queued + batched)."""
        count = 0

        if self._debouncer is not None:
            count += self._debouncer.get_pending_count()

        if self._queue is not None:
            count += self._queue.size

        if self._batcher is not None:
            count += self._batcher.get_pending_count()

        return count

    async def flush_all(self) -> Dict[str, int]:
        """
        Flush all pending items held by the debouncer and the batcher.

        The queue is not touched; use process_queued() for that.

        Returns:
            Dict with counts of flushed items per stage
        """
        flushed: Dict[str, int] = {}

        if self._debouncer is not None:
            result = self._debouncer.flush_all()
            # NOTE(review): the debouncer's flush_all is not visible here;
            # accept either a plain or an awaitable return - confirm.
            if asyncio.iscoroutine(result):
                result = await result
            flushed["debounce"] = sum(len(items) for items in result.values())

        if self._batcher is not None:
            results = await self._batcher.flush_all()
            flushed["batch"] = sum(r.batch_size for r in results)

        return flushed

    def reset_stats(self) -> None:
        """Reset the pipeline statistics and those of the retry handler."""
        self._stats = PipelineStats()

        if self._retry_handler is not None:
            self._retry_handler.reset_stats()

    async def shutdown(self) -> None:
        """
        Gracefully shut down the pipeline.

        New messages are rejected from this point on. Pending work is flushed
        and drained before the components are cleared.
        """
        self._shutdown = True

        # Flush remaining items
        await self.flush_all()

        # Process queued messages
        if self._queue is not None:
            await self.process_queued(max_items=self._queue.size)

        # Draining the queue may have put messages into the batcher; flush
        # again so clear() below does not discard them.
        if self._batcher is not None:
            await self._batcher.flush_all()

        # Let batch-processing tasks scheduled by the flushes finish
        await self._drain_background_tasks()

        # Clear components
        for component in (
            self._debouncer,
            self._deduper,
            self._concurrency,
            self._queue,
            self._batcher,
        ):
            if component is not None:
                component.clear()
class SyncMessagePipeline(Generic[T]):
    """
    Synchronous version of MessagePipeline.

    For use in non-async contexts. Runs the debounce, dedupe, rate-limit,
    concurrency, queue and handler stages; there is no batch stage.
    """

    def __init__(
        self,
        config: PipelineConfig,
        handler: Optional[Callable[[T], Any]] = None,
    ):
        """
        Initialize the pipeline.

        Args:
            config: Pipeline configuration
            handler: Function called with the payload of each message
        """
        self.config = config
        self._handler = handler
        self._lock = threading.Lock()
        self._stats = PipelineStats()
        self._shutdown = False

        # Components stay None when their stage is disabled
        self._debouncer: Optional[SyncDebouncer[PipelineMessage[T]]] = None
        self._deduper: Optional[MessageDeduplicator[PipelineMessage[T]]] = None
        self._rate_limiter: Optional[RateLimiter] = None
        self._concurrency: Optional[ConcurrencyController] = None
        self._queue: Optional[MessageQueue] = None
        self._retry_handler: Optional[RetryHandler] = None

        self._init_components()

    def _init_components(self) -> None:
        """Build the component for every stage enabled in the config."""
        cfg = self.config

        if cfg.enable_debounce:
            self._debouncer = SyncDebouncer(
                cfg.debounce,
                on_flush=self._on_debounce_flush,
            )

        if cfg.enable_dedupe:
            self._deduper = MessageDeduplicator(cfg.dedupe)

        if cfg.enable_rate_limit:
            self._rate_limiter = RateLimiter(cfg.rate_limit)

        if cfg.enable_concurrency:
            self._concurrency = ConcurrencyController(cfg.concurrency)

        if cfg.enable_queue:
            self._queue = MessageQueue(cfg.queue)

        if cfg.enable_retry:
            self._retry_handler = RetryHandler(cfg.retry)

    def set_handler(self, handler: Callable[[T], Any]) -> None:
        """Set the message handler."""
        self._handler = handler

    def _on_debounce_flush(self, messages: List[PipelineMessage[T]]) -> None:
        """Debouncer ``on_flush`` callback: run each released message through the remaining stages."""
        for msg in messages:
            self._process_after_debounce(msg)

    @staticmethod
    def _wrap_message(
        message: Union[T, PipelineMessage[T]],
        message_id: Optional[str],
        channel: Optional[str],
        chat_id: Optional[str],
        user_id: Optional[str],
        content: Optional[str],
    ) -> PipelineMessage[T]:
        """Return ``message`` unchanged if already wrapped, otherwise wrap it.

        Explicit arguments win; missing ones fall back to same-named
        attributes on the payload, then to fixed defaults.
        """
        if isinstance(message, PipelineMessage):
            return message

        # NOTE: id() values can be reused once an object is garbage-collected,
        # so callers that rely on the message id should pass message_id.
        return PipelineMessage(
            payload=message,
            message_id=message_id or str(id(message)),
            channel=channel or getattr(message, 'channel', 'default'),
            chat_id=chat_id or getattr(message, 'chat_id', 'default'),
            user_id=user_id or getattr(message, 'sender_id', getattr(message, 'user_id', 'default')),
            content=content or getattr(message, 'content', ''),
        )

    def process(
        self,
        message: Union[T, PipelineMessage[T]],
        message_id: Optional[str] = None,
        channel: Optional[str] = None,
        chat_id: Optional[str] = None,
        user_id: Optional[str] = None,
        content: Optional[str] = None,
    ) -> PipelineResult:
        """
        Process a message through the pipeline.

        Args:
            message: Message to process (or PipelineMessage wrapper)
            message_id: Message ID (if not wrapped)
            channel: Channel name (if not wrapped)
            chat_id: Chat ID (if not wrapped)
            user_id: User ID (if not wrapped)
            content: Message content (if not wrapped)

        Returns:
            PipelineResult indicating outcome. REJECTED is returned without
            touching the stats once shutdown() has been called.
        """
        if self._shutdown:
            return PipelineResult.REJECTED

        msg = self._wrap_message(message, message_id, channel, chat_id, user_id, content)
        self._stats.total_received += 1

        try:
            # Stage 1: Debounce
            if self.config.enable_debounce and self._debouncer is not None:
                msg.current_stage = PipelineStage.DEBOUNCE
                held = self._debouncer.debounce(msg, key=msg.chat_id, channel=msg.channel)
                if held is None:
                    # The debouncer kept the message; it comes back through
                    # _on_debounce_flush.
                    self._stats.total_debounced += 1
                    return PipelineResult.DEBOUNCED

            return self._process_after_debounce(msg)

        except Exception as exc:
            logger.exception("Pipeline error: %s", exc)
            self._stats.total_errors += 1
            msg.error = exc
            msg.result = PipelineResult.ERROR
            return PipelineResult.ERROR

    def _admit(self, msg: PipelineMessage[T]) -> Optional[PipelineResult]:
        """Run the dedupe and rate-limit stages.

        Returns:
            The terminal result if a stage stopped the message, otherwise
            ``None`` to let it continue.
        """
        # Stage 2: Dedupe
        if self.config.enable_dedupe and self._deduper is not None:
            msg.current_stage = PipelineStage.DEDUPE
            if self._deduper.check_and_mark(msg, message_id=msg.message_id, content=msg.content):
                self._stats.total_deduplicated += 1
                msg.result = PipelineResult.DUPLICATE
                return PipelineResult.DUPLICATE

        # Stage 3: Rate Limit
        if self.config.enable_rate_limit and self._rate_limiter is not None:
            msg.current_stage = PipelineStage.RATE_LIMIT
            rate_result = self._rate_limiter.check_and_consume(msg.channel, msg.chat_id)
            if not rate_result.allowed:
                self._stats.total_rate_limited += 1
                msg.result = PipelineResult.RATE_LIMITED
                return PipelineResult.RATE_LIMITED

        return None

    def _process_after_debounce(
        self,
        msg: PipelineMessage[T],
        *,
        requeued: bool = False,
    ) -> PipelineResult:
        """
        Run the stages that follow debouncing.

        Args:
            msg: Message to process
            requeued: True when the message was taken back out of the queue.
                Such a message already passed dedupe and rate limiting before
                it was queued, so those stages are skipped; running them again
                would presumably flag the message as a duplicate of itself
                and consume a second rate-limit token.

        Returns:
            PipelineResult indicating outcome; exceptions are reported as
            ERROR so that callers such as the debouncer's flush callback do
            not have to handle them.
        """
        try:
            if not requeued:
                stopped = self._admit(msg)
                if stopped is not None:
                    return stopped

            # No concurrency control: call the handler directly
            if not (self.config.enable_concurrency and self._concurrency is not None):
                return self._execute_handler(msg)

            # Stage 4: Concurrency
            msg.current_stage = PipelineStage.CONCURRENCY
            slot_id = self._concurrency.acquire_sync(msg.channel, msg.chat_id, msg.user_id)
            if slot_id is None:
                self._stats.total_concurrency_limited += 1
                msg.result = PipelineResult.CONCURRENCY_LIMITED

                if self.config.enable_queue and self._queue is not None:
                    return self._queue_message(msg)

                return PipelineResult.CONCURRENCY_LIMITED

            try:
                return self._execute_handler(msg)
            finally:
                # Always give the slot back, even if processing raised
                self._concurrency.release(slot_id=slot_id)

        except Exception as exc:
            logger.exception("Pipeline processing error: %s", exc)
            self._stats.total_errors += 1
            msg.error = exc
            msg.result = PipelineResult.ERROR
            return PipelineResult.ERROR

    def _queue_message(self, msg: PipelineMessage[T]) -> PipelineResult:
        """Queue a message for later processing by process_queued().

        Returns:
            QUEUED if the queue accepted the message, otherwise REJECTED.
        """
        msg.current_stage = PipelineStage.QUEUE

        if self._queue is None:
            self._stats.total_rejected += 1
            msg.result = PipelineResult.REJECTED
            return PipelineResult.REJECTED

        # The pipeline message rides along in the metadata so it can be
        # recovered unchanged when the entry is dequeued.
        queued_msg = QueuedMessage(
            message_id=msg.message_id,
            channel=msg.channel,
            chat_id=msg.chat_id,
            sender_id=msg.user_id,
            content=msg.content,
            priority=msg.priority,
            metadata={"pipeline_message": msg},
        )

        if self._queue.enqueue(queued_msg):
            self._stats.total_queued += 1
            msg.result = PipelineResult.QUEUED
            return PipelineResult.QUEUED

        self._stats.total_rejected += 1
        msg.result = PipelineResult.REJECTED
        return PipelineResult.REJECTED

    def _execute_handler(self, msg: PipelineMessage[T]) -> PipelineResult:
        """Call the handler with the message payload and record the outcome.

        Returns:
            PROCESSED on success, ERROR if the handler raised, REJECTED when
            no handler is set.
        """
        msg.current_stage = PipelineStage.PROCESS

        if self._handler is None:
            self._stats.total_rejected += 1
            msg.result = PipelineResult.REJECTED
            return PipelineResult.REJECTED

        self._stats.current_in_flight += 1

        try:
            if self.config.enable_retry and self._retry_handler is not None:
                self._retry_handler.with_retry(
                    self._handler,
                    msg.payload,
                    on_retry=lambda attempt: self._on_retry(msg, attempt),
                )
            else:
                self._handler(msg.payload)

        except Exception as exc:
            logger.exception("Handler error: %s", exc)
            self._stats.total_errors += 1
            msg.error = exc
            msg.result = PipelineResult.ERROR
            return PipelineResult.ERROR

        else:
            self._stats.total_processed += 1
            self._stats.last_processed_at = datetime.now()
            msg.result = PipelineResult.PROCESSED
            return PipelineResult.PROCESSED

        finally:
            self._stats.current_in_flight -= 1

    def _on_retry(self, msg: PipelineMessage[T], attempt: Any) -> None:
        """Retry callback: count the retry on the message and in the stats."""
        msg.retry_count += 1
        self._stats.total_retries += 1

    def get_stats(self) -> PipelineStats:
        """Return a copy of the pipeline statistics."""
        # vars() on the dataclass instance yields exactly its fields
        return PipelineStats(**vars(self._stats))

    def process_queued(self, max_items: int = 10) -> int:
        """
        Process queued messages.

        Args:
            max_items: Maximum items to process

        Returns:
            Number of dequeued messages that were run through the pipeline,
            whatever their individual outcome.
        """
        if self._queue is None:
            return 0

        processed = 0
        for _ in range(max_items):
            queued = self._queue.dequeue()
            if queued is None:
                break

            msg = queued.metadata.get("pipeline_message")
            if msg is not None:
                # Already admitted once; do not dedupe / rate-limit it again.
                self._process_after_debounce(msg, requeued=True)
                processed += 1

        return processed

    def shutdown(self) -> None:
        """
        Shut down the pipeline.

        New messages are rejected from this point on. The debouncer is
        flushed, the queue is drained once, and the concurrency controller
        is cleared.
        """
        self._shutdown = True

        if self._debouncer is not None:
            self._debouncer.flush_all()

        if self._queue is not None:
            self.process_queued(max_items=self._queue.size)

        if self._concurrency is not None:
            self._concurrency.clear()