Coverage for integrations / channels / queue / retry.py: 67.6%
111 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Retry Logic with Exponential Backoff

Provides retry functionality for transient failures.
Ported from HevolveBot's src/infra/retry.ts.

Features:
- Exponential backoff
- Jitter to prevent thundering herd
- Configurable retry conditions
- Async and sync support
"""
from __future__ import annotations

import asyncio
import functools
import logging
import random
import time
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional, Callable, Any, TypeVar, List, Type, Union
# Module logger; individual retry attempts are reported at DEBUG level.
logger = logging.getLogger(__name__)

# Generic return type shared by the synchronous helpers and the decorators.
T = TypeVar("T")
30class RetryResult(Enum):
31 """Result of a retry operation."""
32 SUCCESS = "success"
33 EXHAUSTED = "exhausted" # Max retries reached
34 NON_RETRYABLE = "non_retryable" # Error is not retryable
@dataclass
class RetryConfig:
    """Tunable parameters consumed by ``RetryHandler``.

    All delays are expressed in milliseconds.
    """

    # Retries permitted after the first attempt (total attempts = max_retries + 1).
    max_retries: int = 3
    # Delay before the first retry; multiplied by exponential_base per attempt.
    initial_delay_ms: int = 1000
    # Ceiling applied to the exponential delay before any jitter is added.
    max_delay_ms: int = 30000
    # Growth factor applied to the delay for each successive attempt.
    exponential_base: float = 2.0
    # When enabled, each delay is shifted by a uniform random offset within
    # +/- (delay * jitter_factor).
    jitter: bool = True
    jitter_factor: float = 0.1
    # Exception types considered transient; by default any Exception.
    retryable_exceptions: List[Type[Exception]] = field(
        default_factory=lambda: [Exception]
    )
    # Exception types that are never retried. They are checked before
    # retryable_exceptions, and neither list is consulted when a custom
    # retry check is supplied.
    non_retryable_exceptions: List[Type[Exception]] = field(default_factory=list)
@dataclass
class RetryStats:
    """Running counters kept by a ``RetryHandler``."""

    # Every invocation of the wrapped function, including retries.
    total_attempts: int = 0
    # Invocations that returned normally.
    total_successes: int = 0
    # Operations that ended by raising to the caller.
    total_failures: int = 0
    # Retries that were scheduled after a failed attempt.
    total_retries: int = 0
    # str() of the most recent exception, if any.
    last_error: Optional[str] = None
    # Naive local time (datetime.now()) of the most recent attempt.
    last_attempt_at: Optional[datetime] = None
@dataclass
class RetryAttempt:
    """Details handed to an ``on_retry`` callback before a retry."""

    # 1-based number of the retry that is about to be made.
    attempt: int
    # Milliseconds that will be waited before that retry.
    delay_ms: int
    # Exception that triggered the retry.
    error: Optional[Exception] = None
    # Naive local time at which this record was created.
    timestamp: datetime = field(default_factory=datetime.now)
class RetryHandler:
    """
    Handles retry logic with exponential backoff.

    A handler accumulates statistics across every call made through it;
    ``get_stats`` returns a snapshot and ``reset_stats`` clears them.

    Usage:
        config = RetryConfig(max_retries=3, initial_delay_ms=1000)
        handler = RetryHandler(config)

        # Async usage
        result = await handler.with_retry_async(some_async_function, arg1, arg2)

        # Sync usage
        result = handler.with_retry(some_function, arg1, arg2)

        # With custom should_retry
        result = await handler.with_retry_async(
            some_function,
            should_retry=lambda e, attempt: isinstance(e, TimeoutError)
        )
    """

    def __init__(self, config: RetryConfig):
        self.config = config
        self._stats = RetryStats()

    def calculate_delay(self, attempt: int) -> int:
        """
        Calculate delay for a retry attempt.

        The delay is ``initial_delay_ms * exponential_base ** attempt``,
        capped at ``max_delay_ms``. When jitter is enabled, a uniform random
        offset within ``+/- delay * jitter_factor`` is added after capping, so
        the result may slightly exceed ``max_delay_ms``. It is then clamped
        to be non-negative.

        Args:
            attempt: Attempt number (0-indexed)

        Returns:
            Delay in milliseconds
        """
        cfg = self.config

        try:
            delay = cfg.initial_delay_ms * (cfg.exponential_base ** attempt)
        except OverflowError:
            # float ** int raises instead of returning inf once the result
            # leaves the float range (e.g. 2.0 ** 1024). Such a delay would be
            # capped anyway, so jump straight to the cap. A non-positive base
            # delay can never grow, so it falls back to 0.
            delay = cfg.max_delay_ms if cfg.initial_delay_ms > 0 else 0

        # Cap at max delay
        delay = min(delay, cfg.max_delay_ms)

        # Add jitter if enabled
        if cfg.jitter:
            jitter_range = delay * cfg.jitter_factor
            delay = max(0, delay + random.uniform(-jitter_range, jitter_range))

        return int(delay)

    def should_retry(
        self,
        error: Exception,
        attempt: int,
        custom_check: Optional[Callable[[Exception, int], bool]] = None,
    ) -> bool:
        """
        Determine if an error should be retried.

        The attempt budget (``max_retries``) is always enforced first. A
        ``custom_check`` then fully replaces the exception-type lists.
        Otherwise ``non_retryable_exceptions`` take precedence over
        ``retryable_exceptions``.

        Args:
            error: The exception that occurred
            attempt: Current attempt number (0-indexed)
            custom_check: Optional custom retry check function

        Returns:
            True if should retry, False otherwise
        """
        if attempt >= self.config.max_retries:
            return False

        if custom_check is not None:
            return custom_check(error, attempt)

        if any(isinstance(error, exc_type) for exc_type in self.config.non_retryable_exceptions):
            return False

        return any(isinstance(error, exc_type) for exc_type in self.config.retryable_exceptions)

    def _attempt_count(self) -> int:
        """Return the total number of attempts allowed (first call plus retries)."""
        # Without the clamp, a negative max_retries yields range(0): the
        # function would never run and a bare RuntimeError would be raised.
        return max(self.config.max_retries, 0) + 1

    def _record_attempt(self) -> None:
        """Update the attempt counters just before the function is invoked."""
        self._stats.total_attempts += 1
        self._stats.last_attempt_at = datetime.now()

    def _plan_retry(
        self,
        error: Exception,
        attempt: int,
        should_retry: Optional[Callable[[Exception, int], bool]],
        on_retry: Optional[Callable[[RetryAttempt], None]],
    ) -> Optional[int]:
        """Record a failed attempt and decide whether to retry.

        Returns the delay in milliseconds to wait before the next attempt, or
        ``None`` when the error must propagate. Callers invoke this inside
        their ``except`` block and re-raise with a bare ``raise`` on ``None``.
        """
        self._stats.last_error = str(error)

        if not self.should_retry(error, attempt, should_retry):
            self._stats.total_failures += 1
            return None

        delay_ms = self.calculate_delay(attempt)

        if on_retry is not None:
            on_retry(RetryAttempt(attempt=attempt + 1, delay_ms=delay_ms, error=error))

        logger.debug(
            "Retry attempt %s/%s: waiting %sms after error: %s",
            attempt + 1,
            self.config.max_retries,
            delay_ms,
            error,
        )

        self._stats.total_retries += 1
        return delay_ms

    async def with_retry_async(
        self,
        func: Callable[..., Any],
        *args,
        should_retry: Optional[Callable[[Exception, int], bool]] = None,
        on_retry: Optional[Callable[[RetryAttempt], None]] = None,
        **kwargs,
    ) -> Any:
        """
        Execute a function with retry logic (async).

        ``func`` is called directly. If it returns a coroutine object, that
        coroutine is awaited. A plain function therefore runs on the event
        loop thread, and other awaitables (e.g. Futures) are returned as-is.

        Args:
            func: Function to execute (can be sync or async)
            *args: Function arguments
            should_retry: Optional custom retry check
            on_retry: Optional callback on each retry
            **kwargs: Function keyword arguments

        Returns:
            Function result

        Raises:
            The last exception raised by ``func`` once it is deemed
            non-retryable or retries are exhausted. Exceptions raised by
            ``should_retry`` or ``on_retry`` also propagate.
        """
        last_error: Optional[Exception] = None

        for attempt in range(self._attempt_count()):
            self._record_attempt()
            try:
                result = func(*args, **kwargs)
                if asyncio.iscoroutine(result):
                    result = await result
            except Exception as exc:
                last_error = exc
                delay_ms = self._plan_retry(exc, attempt, should_retry, on_retry)
                if delay_ms is None:
                    raise
            else:
                self._stats.total_successes += 1
                return result

            await asyncio.sleep(delay_ms / 1000.0)

        # Only reachable if should_retry() is overridden to allow a retry on
        # the final attempt; the built-in check refuses once the budget is spent.
        self._stats.total_failures += 1
        if last_error is not None:
            raise last_error
        raise RuntimeError("Retry exhausted without error")

    def with_retry(
        self,
        func: Callable[..., T],
        *args,
        should_retry: Optional[Callable[[Exception, int], bool]] = None,
        on_retry: Optional[Callable[[RetryAttempt], None]] = None,
        **kwargs,
    ) -> T:
        """
        Execute a function with retry logic (sync).

        Waits between attempts with ``time.sleep``, blocking the calling thread.

        Args:
            func: Function to execute
            *args: Function arguments
            should_retry: Optional custom retry check
            on_retry: Optional callback on each retry
            **kwargs: Function keyword arguments

        Returns:
            Function result

        Raises:
            The last exception raised by ``func`` once it is deemed
            non-retryable or retries are exhausted. Exceptions raised by
            ``should_retry`` or ``on_retry`` also propagate.
        """
        last_error: Optional[Exception] = None

        for attempt in range(self._attempt_count()):
            self._record_attempt()
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                last_error = exc
                delay_ms = self._plan_retry(exc, attempt, should_retry, on_retry)
                if delay_ms is None:
                    raise
            else:
                self._stats.total_successes += 1
                return result

            time.sleep(delay_ms / 1000.0)

        # Only reachable if should_retry() is overridden to allow a retry on
        # the final attempt; the built-in check refuses once the budget is spent.
        self._stats.total_failures += 1
        if last_error is not None:
            raise last_error
        raise RuntimeError("Retry exhausted without error")

    def get_stats(self) -> RetryStats:
        """Return a copy of the statistics so callers cannot mutate the handler's counters."""
        return RetryStats(
            total_attempts=self._stats.total_attempts,
            total_successes=self._stats.total_successes,
            total_failures=self._stats.total_failures,
            total_retries=self._stats.total_retries,
            last_error=self._stats.last_error,
            last_attempt_at=self._stats.last_attempt_at,
        )

    def reset_stats(self) -> None:
        """Reset statistics."""
        self._stats = RetryStats()
def retry_async(
    max_retries: int = 3,
    initial_delay_ms: int = 1000,
    exponential_base: float = 2.0,
    jitter: bool = True,
):
    """
    Decorator for async functions with retry.

    One RetryHandler is created per decorated function, so all calls to that
    function share the handler and its statistics. RetryConfig fields not
    exposed here (max_delay_ms, jitter_factor, exception lists) keep their
    defaults.

    Usage:
        @retry_async(max_retries=3)
        async def my_function():
            ...
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        config = RetryConfig(
            max_retries=max_retries,
            initial_delay_ms=initial_delay_ms,
            exponential_base=exponential_base,
            jitter=jitter,
        )
        handler = RetryHandler(config)

        # wraps() keeps the decorated function's __name__, __doc__, etc.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            # Bind the caller's arguments with partial(). Otherwise keywords
            # named func/should_retry/on_retry would be consumed by
            # with_retry_async instead of reaching the decorated function.
            call = functools.partial(func, *args, **kwargs)
            return await handler.with_retry_async(call)

        return wrapper
    return decorator
def retry_sync(
    max_retries: int = 3,
    initial_delay_ms: int = 1000,
    exponential_base: float = 2.0,
    jitter: bool = True,
):
    """
    Decorator for sync functions with retry.

    One RetryHandler is created per decorated function, so all calls to that
    function share the handler and its statistics. RetryConfig fields not
    exposed here (max_delay_ms, jitter_factor, exception lists) keep their
    defaults.

    Usage:
        @retry_sync(max_retries=3)
        def my_function():
            ...
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        config = RetryConfig(
            max_retries=max_retries,
            initial_delay_ms=initial_delay_ms,
            exponential_base=exponential_base,
            jitter=jitter,
        )
        handler = RetryHandler(config)

        # wraps() keeps the decorated function's __name__, __doc__, etc.
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> T:
            # Bind the caller's arguments with partial(). Otherwise keywords
            # named func/should_retry/on_retry would be consumed by
            # with_retry instead of reaching the decorated function.
            call = functools.partial(func, *args, **kwargs)
            return handler.with_retry(call)

        return wrapper
    return decorator