Coverage for integrations/channels/queue/pipeline.py: 0.0% (400 statements)
coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Unified Message Pipeline 

3 

4Combines all queue components into a single processing pipeline: 

5debounce -> dedupe -> rate_limit -> concurrency -> queue 

6 

7Ported from HevolveBot's unified message handling approach. 

8 

9Features: 

10- Configurable pipeline stages 

11- Async and sync processing 

12- Statistics tracking across all stages 

13- Error handling with retry 

14- Graceful shutdown 

15""" 

16 

17from __future__ import annotations 

18 

19import asyncio 

20import logging 

21import threading 

22import time 

23from dataclasses import dataclass, field 

24from datetime import datetime 

25from enum import Enum 

26from typing import ( 

27 Optional, 

28 Dict, 

29 List, 

30 Any, 

31 Callable, 

32 TypeVar, 

33 Generic, 

34 Awaitable, 

35 Union, 

36) 

37 

38from .debounce import DebounceConfig, InboundDebouncer, SyncDebouncer 

39from .dedupe import DedupeConfig, DedupeMode, MessageDeduplicator 

40from .rate_limit import RateLimitConfig, RateLimiter, RateLimitResult 

41from .concurrency import ConcurrencyLimits, ConcurrencyController 

42from .message_queue import QueueConfig, QueuePolicy, MessageQueue, QueuedMessage 

43from .retry import RetryConfig, RetryHandler 

44from .batching import BatchConfig, MessageBatcher, BatchResult 

45 

46logger = logging.getLogger(__name__) 

47 

48T = TypeVar('T') 

49 

50 

51class PipelineStage(Enum): 

52 """Stages in the message pipeline.""" 

53 DEBOUNCE = "debounce" 

54 DEDUPE = "dedupe" 

55 RATE_LIMIT = "rate_limit" 

56 CONCURRENCY = "concurrency" 

57 QUEUE = "queue" 

58 BATCH = "batch" 

59 PROCESS = "process" 

60 

61 

62class PipelineResult(Enum): 

63 """Result of pipeline processing.""" 

64 PROCESSED = "processed" 

65 DEBOUNCED = "debounced" # Held for debouncing 

66 DUPLICATE = "duplicate" 

67 RATE_LIMITED = "rate_limited" 

68 CONCURRENCY_LIMITED = "concurrency_limited" 

69 QUEUED = "queued" 

70 BATCHED = "batched" 

71 REJECTED = "rejected" 

72 ERROR = "error" 

73 

74 

75@dataclass 

76class PipelineConfig: 

77 """Configuration for the message pipeline.""" 

78 # Stage enablement 

79 enable_debounce: bool = True 

80 enable_dedupe: bool = True 

81 enable_rate_limit: bool = True 

82 enable_concurrency: bool = True 

83 enable_queue: bool = True 

84 enable_batch: bool = False 

85 enable_retry: bool = True 

86 

87 # Stage configs 

88 debounce: DebounceConfig = field(default_factory=DebounceConfig) 

89 dedupe: DedupeConfig = field(default_factory=DedupeConfig) 

90 rate_limit: RateLimitConfig = field(default_factory=RateLimitConfig) 

91 concurrency: ConcurrencyLimits = field(default_factory=ConcurrencyLimits) 

92 queue: QueueConfig = field(default_factory=QueueConfig) 

93 batch: BatchConfig = field(default_factory=BatchConfig) 

94 retry: RetryConfig = field(default_factory=RetryConfig) 

95 

96 
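# A minimal configuration sketch (assumption: defaults for every sub-config;
# those classes live in the sibling modules imported above). Stage toggles
# are independent booleans, so a deployment that only needs dedupe and rate
# limiting can switch everything else off:
#
#     config = PipelineConfig(
#         enable_debounce=False,
#         enable_concurrency=False,
#         enable_queue=False,
#         enable_retry=False,
#     )
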

@dataclass
class PipelineStats:
    """Statistics for the pipeline."""
    total_received: int = 0
    total_processed: int = 0
    total_debounced: int = 0
    total_deduplicated: int = 0
    total_rate_limited: int = 0
    total_concurrency_limited: int = 0
    total_queued: int = 0
    total_batched: int = 0
    total_rejected: int = 0
    total_errors: int = 0
    total_retries: int = 0
    last_processed_at: Optional[datetime] = None
    current_in_flight: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Convert stats to dictionary."""
        return {
            "total_received": self.total_received,
            "total_processed": self.total_processed,
            "total_debounced": self.total_debounced,
            "total_deduplicated": self.total_deduplicated,
            "total_rate_limited": self.total_rate_limited,
            "total_concurrency_limited": self.total_concurrency_limited,
            "total_queued": self.total_queued,
            "total_batched": self.total_batched,
            "total_rejected": self.total_rejected,
            "total_errors": self.total_errors,
            "total_retries": self.total_retries,
            "last_processed_at": self.last_processed_at.isoformat() if self.last_processed_at else None,
            "current_in_flight": self.current_in_flight,
        }


@dataclass
class PipelineMessage(Generic[T]):
    """A message wrapper with pipeline metadata."""
    payload: T
    message_id: str
    channel: str
    chat_id: str
    user_id: str
    content: str = ""
    priority: int = 0
    metadata: Dict[str, Any] = field(default_factory=dict)
    received_at: datetime = field(default_factory=datetime.now)

    # Pipeline state
    current_stage: PipelineStage = PipelineStage.DEBOUNCE
    result: Optional[PipelineResult] = None
    error: Optional[Exception] = None
    retry_count: int = 0

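# process() builds this wrapper automatically from a raw payload, but
# constructing a PipelineMessage by hand lets callers set priority and
# metadata up front. A sketch with illustrative field values:
#
#     msg = PipelineMessage(
#         payload=raw_event,
#         message_id="tg-12345",
#         channel="telegram",
#         chat_id="chat-42",
#         user_id="user-7",
#         content="hello",
#         priority=1,
#     )
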

class MessagePipeline(Generic[T]):
    """
    Unified message processing pipeline.

    Combines all queue components in sequence:
    1. Debounce - Collect rapid-fire messages
    2. Dedupe - Filter duplicate messages
    3. Rate Limit - Enforce rate limits
    4. Concurrency - Control concurrent processing
    5. Queue - Buffer messages when needed
    6. Batch (optional) - Batch messages together
    7. Process - Execute message handler

    Usage:
        config = PipelineConfig()
        pipeline = MessagePipeline(config)

        # Set message handler
        async def handle_message(msg):
            print(f"Processing: {msg.content}")

        pipeline.set_handler(handle_message)

        # Process messages
        result = await pipeline.process(message)

        # Get stats
        stats = pipeline.get_stats()
    """

    def __init__(
        self,
        config: PipelineConfig,
        handler: Optional[Callable[[T], Awaitable[Any]]] = None,
    ):
        """
        Initialize the pipeline.

        Args:
            config: Pipeline configuration
            handler: Message handler function
        """
        self.config = config
        self._handler = handler
        self._lock = threading.Lock()
        self._stats = PipelineStats()
        self._shutdown = False

        # Initialize components
        self._debouncer: Optional[InboundDebouncer[PipelineMessage[T]]] = None
        self._deduper: Optional[MessageDeduplicator[PipelineMessage[T]]] = None
        self._rate_limiter: Optional[RateLimiter] = None
        self._concurrency: Optional[ConcurrencyController] = None
        self._queue: Optional[MessageQueue] = None
        self._batcher: Optional[MessageBatcher[PipelineMessage[T]]] = None
        self._retry_handler: Optional[RetryHandler] = None

        self._init_components()


    def _init_components(self) -> None:
        """Initialize pipeline components."""
        if self.config.enable_debounce:
            self._debouncer = InboundDebouncer(
                self.config.debounce,
                on_flush=self._on_debounce_flush,
            )

        if self.config.enable_dedupe:
            self._deduper = MessageDeduplicator(self.config.dedupe)

        if self.config.enable_rate_limit:
            self._rate_limiter = RateLimiter(self.config.rate_limit)

        if self.config.enable_concurrency:
            self._concurrency = ConcurrencyController(self.config.concurrency)

        if self.config.enable_queue:
            self._queue = MessageQueue(self.config.queue)

        if self.config.enable_batch:
            self._batcher = MessageBatcher(
                self.config.batch,
                on_flush=self._on_batch_flush,
            )

        if self.config.enable_retry:
            self._retry_handler = RetryHandler(self.config.retry)

    def set_handler(
        self,
        handler: Callable[[T], Awaitable[Any]],
    ) -> None:
        """
        Set the message handler.

        Args:
            handler: Async function to handle messages
        """
        self._handler = handler

    async def _on_debounce_flush(
        self,
        messages: List[PipelineMessage[T]],
    ) -> None:
        """Handle debounce flush."""
        for msg in messages:
            await self._process_after_debounce(msg)

    def _on_batch_flush(
        self,
        batch: BatchResult[PipelineMessage[T]],
    ) -> None:
        """Handle batch flush - schedule processing.

        Note: must be called from a thread with a running event loop,
        since asyncio.create_task() requires one.
        """
        asyncio.create_task(self._process_batch(batch))

    async def _process_batch(
        self,
        batch: BatchResult[PipelineMessage[T]],
    ) -> None:
        """Process a batch of messages."""
        for msg in batch.items:
            await self._execute_handler(msg)


    async def process(
        self,
        message: Union[T, PipelineMessage[T]],
        message_id: Optional[str] = None,
        channel: Optional[str] = None,
        chat_id: Optional[str] = None,
        user_id: Optional[str] = None,
        content: Optional[str] = None,
    ) -> PipelineResult:
        """
        Process a message through the pipeline.

        Args:
            message: Message to process (or PipelineMessage wrapper)
            message_id: Message ID (if not wrapped)
            channel: Channel name (if not wrapped)
            chat_id: Chat ID (if not wrapped)
            user_id: User ID (if not wrapped)
            content: Message content (if not wrapped)

        Returns:
            PipelineResult indicating outcome
        """
        if self._shutdown:
            return PipelineResult.REJECTED

        # Wrap message if needed
        if isinstance(message, PipelineMessage):
            msg = message
        else:
            msg = PipelineMessage(
                payload=message,
                message_id=message_id or str(id(message)),
                channel=channel or getattr(message, 'channel', 'default'),
                chat_id=chat_id or getattr(message, 'chat_id', 'default'),
                user_id=user_id or getattr(message, 'sender_id', getattr(message, 'user_id', 'default')),
                content=content or getattr(message, 'content', ''),
            )

        self._stats.total_received += 1

        try:
            # Stage 1: Debounce
            if self.config.enable_debounce and self._debouncer:
                msg.current_stage = PipelineStage.DEBOUNCE
                result = await self._debouncer.debounce(
                    msg,
                    key=msg.chat_id,
                    channel=msg.channel,
                )
                if result is None:
                    # Message is being debounced
                    self._stats.total_debounced += 1
                    return PipelineResult.DEBOUNCED

            # Continue processing
            return await self._process_after_debounce(msg)

        except Exception as e:
            logger.error(f"Pipeline error: {e}")
            self._stats.total_errors += 1
            msg.error = e
            msg.result = PipelineResult.ERROR
            return PipelineResult.ERROR

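    # Calling sketch (identifiers are illustrative): pass a raw payload plus
    # explicit routing fields; omitted kwargs fall back to same-named
    # attributes on the payload, as shown in the wrapping logic above.
    #
    #     result = await pipeline.process(
    #         event,
    #         message_id="m-1",
    #         channel="slack",
    #         chat_id="C123",
    #         user_id="U456",
    #         content=event_text,
    #     )
    #     if result is PipelineResult.RATE_LIMITED:
    #         ...  # e.g. notify the sender or drop silently
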

    async def _process_after_debounce(
        self,
        msg: PipelineMessage[T],
    ) -> PipelineResult:
        """Process message after debounce stage."""
        try:
            # Stage 2: Dedupe
            if self.config.enable_dedupe and self._deduper:
                msg.current_stage = PipelineStage.DEDUPE
                if self._deduper.check_and_mark(
                    msg,
                    message_id=msg.message_id,
                    content=msg.content,
                ):
                    self._stats.total_deduplicated += 1
                    msg.result = PipelineResult.DUPLICATE
                    return PipelineResult.DUPLICATE

            # Stage 3: Rate Limit
            if self.config.enable_rate_limit and self._rate_limiter:
                msg.current_stage = PipelineStage.RATE_LIMIT
                rate_result = self._rate_limiter.check_and_consume(
                    msg.channel,
                    msg.chat_id,
                )
                if not rate_result.allowed:
                    self._stats.total_rate_limited += 1
                    msg.result = PipelineResult.RATE_LIMITED
                    return PipelineResult.RATE_LIMITED

            # Stage 4: Concurrency
            if self.config.enable_concurrency and self._concurrency:
                msg.current_stage = PipelineStage.CONCURRENCY
                slot_id = await self._concurrency.acquire(
                    msg.channel,
                    msg.chat_id,
                    msg.user_id,
                    wait=self.config.concurrency.queue_when_limited,
                )
                if slot_id is None:
                    self._stats.total_concurrency_limited += 1
                    msg.result = PipelineResult.CONCURRENCY_LIMITED

                    # Queue if enabled
                    if self.config.enable_queue and self._queue:
                        return await self._queue_message(msg)

                    return PipelineResult.CONCURRENCY_LIMITED

                try:
                    return await self._process_with_slot(msg, slot_id)
                finally:
                    self._concurrency.release(slot_id=slot_id)
            else:
                # No concurrency control
                return await self._process_with_slot(msg, None)

        except Exception as e:
            logger.error(f"Pipeline processing error: {e}")
            self._stats.total_errors += 1
            msg.error = e
            msg.result = PipelineResult.ERROR
            return PipelineResult.ERROR


    async def _queue_message(
        self,
        msg: PipelineMessage[T],
    ) -> PipelineResult:
        """Queue a message for later processing."""
        msg.current_stage = PipelineStage.QUEUE

        if not self._queue:
            self._stats.total_rejected += 1
            return PipelineResult.REJECTED

        queued_msg = QueuedMessage(
            message_id=msg.message_id,
            channel=msg.channel,
            chat_id=msg.chat_id,
            sender_id=msg.user_id,
            content=msg.content,
            priority=msg.priority,
            metadata={"pipeline_message": msg},
        )

        if self._queue.enqueue(queued_msg):
            self._stats.total_queued += 1
            msg.result = PipelineResult.QUEUED
            return PipelineResult.QUEUED
        else:
            self._stats.total_rejected += 1
            msg.result = PipelineResult.REJECTED
            return PipelineResult.REJECTED


    async def _process_with_slot(
        self,
        msg: PipelineMessage[T],
        slot_id: Optional[str],
    ) -> PipelineResult:
        """Process message with acquired concurrency slot."""
        # Stage 5: Batch (optional)
        if self.config.enable_batch and self._batcher:
            msg.current_stage = PipelineStage.BATCH
            batch_result = await self._batcher.add(msg, key=msg.chat_id)
            if batch_result is None:
                # Message is batched, will be processed later
                self._stats.total_batched += 1
                msg.result = PipelineResult.BATCHED
                return PipelineResult.BATCHED
            # Batch was flushed - already handled by callback

        # Stage 6: Process
        return await self._execute_handler(msg)


    async def _execute_handler(
        self,
        msg: PipelineMessage[T],
    ) -> PipelineResult:
        """Execute the message handler."""
        msg.current_stage = PipelineStage.PROCESS

        if not self._handler:
            logger.warning("No handler set for pipeline")
            self._stats.total_rejected += 1
            return PipelineResult.REJECTED

        self._stats.current_in_flight += 1

        try:
            if self.config.enable_retry and self._retry_handler:
                await self._retry_handler.with_retry_async(
                    self._handler,
                    msg.payload,
                    on_retry=lambda attempt: self._on_retry(msg, attempt),
                )
            else:
                result = self._handler(msg.payload)
                if asyncio.iscoroutine(result):
                    await result

            self._stats.total_processed += 1
            self._stats.last_processed_at = datetime.now()
            msg.result = PipelineResult.PROCESSED
            return PipelineResult.PROCESSED

        except Exception as e:
            logger.error(f"Handler error: {e}")
            self._stats.total_errors += 1
            msg.error = e
            msg.result = PipelineResult.ERROR
            return PipelineResult.ERROR

        finally:
            self._stats.current_in_flight -= 1

    def _on_retry(self, msg: PipelineMessage[T], attempt: Any) -> None:
        """Handle retry attempt."""
        msg.retry_count += 1
        self._stats.total_retries += 1


    async def process_queued(self, max_items: int = 10) -> int:
        """
        Process queued messages.

        Note: dequeued messages re-enter _process_after_debounce(), so
        the dedupe, rate-limit, and concurrency stages run for them again.

        Args:
            max_items: Maximum items to process

        Returns:
            Number of messages processed
        """
        if not self._queue:
            return 0

        processed = 0
        for _ in range(max_items):
            queued = self._queue.dequeue()
            if queued is None:
                break

            msg = queued.metadata.get("pipeline_message")
            if msg:
                await self._process_after_debounce(msg)
                processed += 1

        return processed

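    # Drain sketch: the queue has no background consumer, so apart from
    # shutdown(), callers must invoke process_queued() themselves when
    # capacity frees up. A simple periodic drainer (interval is
    # illustrative):
    #
    #     async def drain_loop(pipeline: MessagePipeline) -> None:
    #         while True:
    #             await pipeline.process_queued(max_items=10)
    #             await asyncio.sleep(0.5)
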

    def get_stats(self) -> PipelineStats:
        """Get a snapshot copy of pipeline statistics."""
        return PipelineStats(
            total_received=self._stats.total_received,
            total_processed=self._stats.total_processed,
            total_debounced=self._stats.total_debounced,
            total_deduplicated=self._stats.total_deduplicated,
            total_rate_limited=self._stats.total_rate_limited,
            total_concurrency_limited=self._stats.total_concurrency_limited,
            total_queued=self._stats.total_queued,
            total_batched=self._stats.total_batched,
            total_rejected=self._stats.total_rejected,
            total_errors=self._stats.total_errors,
            total_retries=self._stats.total_retries,
            last_processed_at=self._stats.last_processed_at,
            current_in_flight=self._stats.current_in_flight,
        )

    def get_component_stats(self) -> Dict[str, Any]:
        """Get statistics from all components."""
        stats = {}

        if self._debouncer:
            stats["debounce"] = self._debouncer.get_stats().__dict__

        if self._deduper:
            stats["dedupe"] = self._deduper.get_stats().__dict__

        if self._rate_limiter:
            stats["rate_limit"] = self._rate_limiter.get_stats().__dict__

        if self._concurrency:
            usage = self._concurrency.get_usage()
            stats["concurrency"] = {
                "current_global": usage.current_global,
                "total_acquired": usage.total_acquired,
                "total_rejected": usage.total_rejected,
            }

        if self._queue:
            stats["queue"] = self._queue.get_stats().to_dict()

        if self._batcher:
            stats["batch"] = self._batcher.get_stats().to_dict()

        if self._retry_handler:
            stats["retry"] = self._retry_handler.get_stats().__dict__

        return stats

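    # Observability sketch: get_stats().to_dict() and get_component_stats()
    # both return plain dicts, so they can be logged or exposed from a
    # health endpoint as-is (the logging call is just one option):
    #
    #     logger.info("pipeline: %s", pipeline.get_stats().to_dict())
    #     logger.info("components: %s", pipeline.get_component_stats())
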

    def get_queue_size(self) -> int:
        """Get current queue size."""
        if self._queue:
            return self._queue.size
        return 0

    def get_pending_count(self) -> int:
        """Get total pending messages (debounced + queued + batched)."""
        count = 0

        if self._debouncer:
            count += self._debouncer.get_pending_count()

        if self._queue:
            count += self._queue.size

        if self._batcher:
            count += self._batcher.get_pending_count()

        return count


    async def flush_all(self) -> Dict[str, int]:
        """
        Flush all pending items in the pipeline.

        Returns:
            Dict with counts of flushed items per stage
        """
        flushed = {}

        if self._debouncer:
            result = self._debouncer.flush_all()
            flushed["debounce"] = sum(len(items) for items in result.values())

        if self._batcher:
            results = await self._batcher.flush_all()
            flushed["batch"] = sum(r.batch_size for r in results)

        return flushed

    def reset_stats(self) -> None:
        """Reset all pipeline statistics."""
        self._stats = PipelineStats()

        if self._retry_handler:
            self._retry_handler.reset_stats()


    async def shutdown(self) -> None:
        """Gracefully shut down the pipeline."""
        self._shutdown = True

        # Flush remaining items
        await self.flush_all()

        # Process queued messages
        if self._queue:
            await self.process_queued(max_items=self._queue.size)

        # Clear components
        if self._debouncer:
            self._debouncer.clear()

        if self._deduper:
            self._deduper.clear()

        if self._concurrency:
            self._concurrency.clear()

        if self._queue:
            self._queue.clear()

        if self._batcher:
            self._batcher.clear()

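# Lifecycle sketch for the async pipeline (handler and message are
# illustrative): construct, attach a handler, feed messages, and shut down
# so pending debounced/batched items are flushed and the queue is drained:
#
#     pipeline = MessagePipeline(PipelineConfig())
#     pipeline.set_handler(handle_message)
#     await pipeline.process(message)
#     ...
#     await pipeline.shutdown()
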

class SyncMessagePipeline(Generic[T]):
    """
    Synchronous version of MessagePipeline.

    For use in non-async contexts.
    """

    def __init__(
        self,
        config: PipelineConfig,
        handler: Optional[Callable[[T], Any]] = None,
    ):
        self.config = config
        self._handler = handler
        self._lock = threading.Lock()
        self._stats = PipelineStats()
        self._shutdown = False

        # Initialize sync components
        self._debouncer: Optional[SyncDebouncer[PipelineMessage[T]]] = None
        self._deduper: Optional[MessageDeduplicator[PipelineMessage[T]]] = None
        self._rate_limiter: Optional[RateLimiter] = None
        self._concurrency: Optional[ConcurrencyController] = None
        self._queue: Optional[MessageQueue] = None
        self._retry_handler: Optional[RetryHandler] = None

        self._init_components()

    def _init_components(self) -> None:
        """Initialize components."""
        if self.config.enable_debounce:
            self._debouncer = SyncDebouncer(
                self.config.debounce,
                on_flush=self._on_debounce_flush,
            )

        if self.config.enable_dedupe:
            self._deduper = MessageDeduplicator(self.config.dedupe)

        if self.config.enable_rate_limit:
            self._rate_limiter = RateLimiter(self.config.rate_limit)

        if self.config.enable_concurrency:
            self._concurrency = ConcurrencyController(self.config.concurrency)

        if self.config.enable_queue:
            self._queue = MessageQueue(self.config.queue)

        if self.config.enable_retry:
            self._retry_handler = RetryHandler(self.config.retry)

    def set_handler(self, handler: Callable[[T], Any]) -> None:
        """Set message handler."""
        self._handler = handler

    def _on_debounce_flush(self, messages: List[PipelineMessage[T]]) -> None:
        """Handle debounce flush."""
        for msg in messages:
            self._process_after_debounce(msg)

    def process(
        self,
        message: Union[T, PipelineMessage[T]],
        message_id: Optional[str] = None,
        channel: Optional[str] = None,
        chat_id: Optional[str] = None,
        user_id: Optional[str] = None,
        content: Optional[str] = None,
    ) -> PipelineResult:
        """
        Process a message through the pipeline.

        Returns:
            PipelineResult indicating outcome
        """
        if self._shutdown:
            return PipelineResult.REJECTED

        # Wrap message
        if isinstance(message, PipelineMessage):
            msg = message
        else:
            msg = PipelineMessage(
                payload=message,
                message_id=message_id or str(id(message)),
                channel=channel or getattr(message, 'channel', 'default'),
                chat_id=chat_id or getattr(message, 'chat_id', 'default'),
                user_id=user_id or getattr(message, 'sender_id', getattr(message, 'user_id', 'default')),
                content=content or getattr(message, 'content', ''),
            )

        self._stats.total_received += 1

        try:
            # Stage 1: Debounce
            if self.config.enable_debounce and self._debouncer:
                result = self._debouncer.debounce(msg, key=msg.chat_id, channel=msg.channel)
                if result is None:
                    self._stats.total_debounced += 1
                    return PipelineResult.DEBOUNCED

            return self._process_after_debounce(msg)

        except Exception as e:
            logger.error(f"Pipeline error: {e}")
            self._stats.total_errors += 1
            return PipelineResult.ERROR

    def _process_after_debounce(self, msg: PipelineMessage[T]) -> PipelineResult:
        """Process after debounce."""
        # Stage 2: Dedupe
        if self.config.enable_dedupe and self._deduper:
            if self._deduper.check_and_mark(msg, message_id=msg.message_id, content=msg.content):
                self._stats.total_deduplicated += 1
                return PipelineResult.DUPLICATE

        # Stage 3: Rate Limit
        if self.config.enable_rate_limit and self._rate_limiter:
            rate_result = self._rate_limiter.check_and_consume(msg.channel, msg.chat_id)
            if not rate_result.allowed:
                self._stats.total_rate_limited += 1
                return PipelineResult.RATE_LIMITED

        # Stage 4: Concurrency
        if self.config.enable_concurrency and self._concurrency:
            slot_id = self._concurrency.acquire_sync(msg.channel, msg.chat_id, msg.user_id)
            if slot_id is None:
                self._stats.total_concurrency_limited += 1

                if self.config.enable_queue and self._queue:
                    return self._queue_message(msg)

                return PipelineResult.CONCURRENCY_LIMITED

            try:
                return self._execute_handler(msg)
            finally:
                self._concurrency.release(slot_id=slot_id)
        else:
            return self._execute_handler(msg)


    def _queue_message(self, msg: PipelineMessage[T]) -> PipelineResult:
        """Queue a message."""
        if not self._queue:
            self._stats.total_rejected += 1
            return PipelineResult.REJECTED

        queued_msg = QueuedMessage(
            message_id=msg.message_id,
            channel=msg.channel,
            chat_id=msg.chat_id,
            sender_id=msg.user_id,
            content=msg.content,
            priority=msg.priority,
            metadata={"pipeline_message": msg},
        )

        if self._queue.enqueue(queued_msg):
            self._stats.total_queued += 1
            return PipelineResult.QUEUED
        else:
            self._stats.total_rejected += 1
            return PipelineResult.REJECTED

    def _execute_handler(self, msg: PipelineMessage[T]) -> PipelineResult:
        """Execute handler."""
        if not self._handler:
            self._stats.total_rejected += 1
            return PipelineResult.REJECTED

        self._stats.current_in_flight += 1

        try:
            if self.config.enable_retry and self._retry_handler:
                self._retry_handler.with_retry(
                    self._handler,
                    msg.payload,
                    on_retry=lambda attempt: self._on_retry(msg, attempt),
                )
            else:
                self._handler(msg.payload)

            self._stats.total_processed += 1
            self._stats.last_processed_at = datetime.now()
            return PipelineResult.PROCESSED

        except Exception as e:
            logger.error(f"Handler error: {e}")
            self._stats.total_errors += 1
            return PipelineResult.ERROR

        finally:
            self._stats.current_in_flight -= 1

    def _on_retry(self, msg: PipelineMessage[T], attempt: Any) -> None:
        """Handle retry."""
        msg.retry_count += 1
        self._stats.total_retries += 1

    def get_stats(self) -> PipelineStats:
        """Get a snapshot copy of pipeline statistics."""
        return PipelineStats(
            total_received=self._stats.total_received,
            total_processed=self._stats.total_processed,
            total_debounced=self._stats.total_debounced,
            total_deduplicated=self._stats.total_deduplicated,
            total_rate_limited=self._stats.total_rate_limited,
            total_concurrency_limited=self._stats.total_concurrency_limited,
            total_queued=self._stats.total_queued,
            total_batched=self._stats.total_batched,
            total_rejected=self._stats.total_rejected,
            total_errors=self._stats.total_errors,
            total_retries=self._stats.total_retries,
            last_processed_at=self._stats.last_processed_at,
            current_in_flight=self._stats.current_in_flight,
        )

    def process_queued(self, max_items: int = 10) -> int:
        """Process queued messages."""
        if not self._queue:
            return 0

        processed = 0
        for _ in range(max_items):
            queued = self._queue.dequeue()
            if queued is None:
                break

            msg = queued.metadata.get("pipeline_message")
            if msg:
                self._process_after_debounce(msg)
                processed += 1

        return processed

    def shutdown(self) -> None:
        """Shut down the pipeline."""
        self._shutdown = True

        if self._debouncer:
            self._debouncer.flush_all()

        if self._queue:
            self.process_queued(max_items=self._queue.size)

        if self._concurrency:
            self._concurrency.clear()
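

# Minimal self-check sketch, not part of the pipeline API: wires a trivial
# async handler into a default pipeline and pushes one message through it.
# Field values are illustrative; run as a module (python -m ...) so the
# relative imports above resolve.
if __name__ == "__main__":  # pragma: no cover
    async def _demo() -> None:
        pipeline: MessagePipeline[str] = MessagePipeline(PipelineConfig())

        async def handle(payload: str) -> None:
            print(f"handled: {payload}")

        pipeline.set_handler(handle)
        result = await pipeline.process(
            "hello world",
            message_id="demo-1",
            channel="demo",
            chat_id="chat-1",
            user_id="user-1",
            content="hello world",
        )
        print(f"process() -> {result}")
        await pipeline.shutdown()

    asyncio.run(_demo())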