Coverage for integrations / channels / queue / retry.py: 67.6%

111 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Retry Logic with Exponential Backoff 

3 

4Provides retry functionality for transient failures. 

5Ported from HevolveBot's src/infra/retry.ts. 

6 

7Features: 

8- Exponential backoff 

9- Jitter to prevent thundering herd 

10- Configurable retry conditions 

11- Async and sync support 

12""" 

13 

14from __future__ import annotations 

15 

16import asyncio 

17import logging 

18import random 

19import time 

20from dataclasses import dataclass, field 

21from datetime import datetime 

22from enum import Enum 

23from typing import Optional, Callable, Any, TypeVar, List, Type, Union 

24 

25logger = logging.getLogger(__name__) 

26 

27T = TypeVar('T') 

28 

29 

class RetryResult(Enum):
    """Outcome categories for a retried operation."""

    SUCCESS = "success"

    # Max retries reached.
    EXHAUSTED = "exhausted"

    # Error is not retryable.
    NON_RETRYABLE = "non_retryable"

35 

36 

@dataclass
class RetryConfig:
    """Tunable settings that control how failed calls are retried."""

    # Number of retries allowed after the first attempt.
    max_retries: int = 3

    # Backoff timing, in milliseconds: starting delay and the cap applied
    # to the exponentially grown delay.
    initial_delay_ms: int = 1000
    max_delay_ms: int = 30000

    # Multiplier raised to the attempt number to grow the delay.
    exponential_base: float = 2.0

    # Random jitter: on/off switch and the fraction of the delay that may
    # be added or subtracted.
    jitter: bool = True
    jitter_factor: float = 0.1

    # Exception classes that may be retried; each instance gets its own
    # list, defaulting to every ``Exception``.
    retryable_exceptions: List[Type[Exception]] = field(
        default_factory=lambda: [Exception]
    )

    # Exception classes that must never be retried (empty by default).
    non_retryable_exceptions: List[Type[Exception]] = field(
        default_factory=list
    )

48 

49 

@dataclass
class RetryStats:
    """Running counters describing retry activity."""

    # Counters; all start at zero.
    total_attempts: int = 0
    total_successes: int = 0
    total_failures: int = 0
    total_retries: int = 0

    # Text of the most recent error, or None if none was recorded.
    last_error: Optional[str] = None

    # When the most recent attempt started, or None if there was none.
    last_attempt_at: Optional[datetime] = None

59 

60 

@dataclass
class RetryAttempt:
    """Details of a single retry, as handed to ``on_retry`` callbacks."""

    # Attempt number.
    attempt: int

    # Delay for this retry, in milliseconds.
    delay_ms: int

    # The exception associated with this retry, if any.
    error: Optional[Exception] = None

    # Creation time of this record (local, naive ``datetime.now()``).
    timestamp: datetime = field(default_factory=datetime.now)

68 

69 

class RetryHandler:
    """
    Handles retry logic with exponential backoff.

    A handler holds one ``RetryConfig`` and accumulates statistics across
    every call made through it until ``reset_stats`` is called.

    Usage:
        config = RetryConfig(max_retries=3, initial_delay_ms=1000)
        handler = RetryHandler(config)

        # Async usage
        result = await handler.with_retry_async(some_async_function, arg1, arg2)

        # Sync usage
        result = handler.with_retry(some_function, arg1, arg2)

        # With custom should_retry
        result = await handler.with_retry_async(
            some_function,
            should_retry=lambda e, attempt: isinstance(e, TimeoutError)
        )
    """

    def __init__(self, config: RetryConfig):
        self.config = config
        self._stats = RetryStats()

    def calculate_delay(self, attempt: int) -> int:
        """
        Calculate delay for a retry attempt.

        The delay is ``initial_delay_ms * exponential_base ** attempt``,
        capped at ``max_delay_ms``. When jitter is enabled a random offset
        of up to ``jitter_factor`` times the capped delay is added or
        subtracted, so the result may exceed ``max_delay_ms`` by that
        fraction.

        Args:
            attempt: Attempt number (0-indexed)

        Returns:
            Delay in milliseconds (never negative)
        """
        cfg = self.config

        try:
            delay = cfg.initial_delay_ms * (cfg.exponential_base ** attempt)
        except OverflowError:
            # A float base raised to a large attempt overflows
            # (e.g. 2.0 ** 1024). The true value is far past any cap, so
            # fall back to the cap instead of crashing the retry loop.
            delay = cfg.max_delay_ms if cfg.initial_delay_ms > 0 else 0

        # Cap at max delay
        delay = min(delay, cfg.max_delay_ms)

        # Add jitter if enabled
        if cfg.jitter:
            jitter_range = delay * cfg.jitter_factor
            delay += random.uniform(-jitter_range, jitter_range)

        # Clamp so a negative value never reaches sleep().
        return int(max(0, delay))

    def should_retry(
        self,
        error: Exception,
        attempt: int,
        custom_check: Optional[Callable[[Exception, int], bool]] = None,
    ) -> bool:
        """
        Determine if an error should be retried.

        Order of checks: the retry budget, then ``custom_check`` (which,
        when given, fully replaces the exception-type lists), then the
        non-retryable list, then the retryable list.

        Args:
            error: The exception that occurred
            attempt: Current attempt number (0-indexed)
            custom_check: Optional custom retry check function

        Returns:
            True if should retry, False otherwise
        """
        # Retry budget used up.
        if attempt >= self.config.max_retries:
            return False

        # A custom check takes precedence over the configured lists.
        if custom_check is not None:
            return custom_check(error, attempt)

        # Non-retryable types win over retryable ones.
        if isinstance(error, tuple(self.config.non_retryable_exceptions)):
            return False

        return isinstance(error, tuple(self.config.retryable_exceptions))

    def _plan_retry(
        self,
        error: Exception,
        attempt: int,
        should_retry: Optional[Callable[[Exception, int], bool]],
        on_retry: Optional[Callable[[RetryAttempt], None]],
    ) -> Optional[int]:
        """
        Record a failed attempt and decide whether to retry it.

        Shared by the sync and async entry points so both update the
        statistics, invoke ``on_retry`` and log in the same order.

        Returns:
            Delay in milliseconds to wait before the next attempt, or
            None when the error must be re-raised by the caller.
        """
        self._stats.last_error = str(error)

        if not self.should_retry(error, attempt, should_retry):
            self._stats.total_failures += 1
            return None

        delay_ms = self.calculate_delay(attempt)

        if on_retry:
            # Callbacks receive a 1-based retry number.
            on_retry(
                RetryAttempt(attempt=attempt + 1, delay_ms=delay_ms, error=error)
            )

        logger.debug(
            "Retry attempt %s/%s: waiting %sms after error: %s",
            attempt + 1,
            self.config.max_retries,
            delay_ms,
            error,
        )

        self._stats.total_retries += 1
        return delay_ms

    async def with_retry_async(
        self,
        func: Callable[..., Any],
        *args,
        should_retry: Optional[Callable[[Exception, int], bool]] = None,
        on_retry: Optional[Callable[[RetryAttempt], None]] = None,
        **kwargs,
    ) -> Any:
        """
        Execute a function with retry logic (async).

        Args:
            func: Function to execute (can be sync or async)
            *args: Function arguments
            should_retry: Optional custom retry check
            on_retry: Optional callback on each retry
            **kwargs: Function keyword arguments

        Returns:
            Function result

        Raises:
            The exception from the final attempt, or from the first
            attempt that is not retryable. RuntimeError if no attempt was
            made at all (``max_retries`` is negative).
        """
        last_error: Optional[Exception] = None

        for attempt in range(self.config.max_retries + 1):
            self._stats.total_attempts += 1
            self._stats.last_attempt_at = datetime.now()

            try:
                result = func(*args, **kwargs)
                # A sync function returns its value directly; a coroutine
                # function returns a coroutine that still has to be awaited.
                if asyncio.iscoroutine(result):
                    result = await result
            except Exception as e:
                last_error = e
                delay_ms = self._plan_retry(e, attempt, should_retry, on_retry)
                if delay_ms is None:
                    raise

                # Wait before retry
                await asyncio.sleep(delay_ms / 1000.0)
            else:
                self._stats.total_successes += 1
                return result

        # Only reached when the loop never ran or should_retry was
        # overridden to keep retrying past the budget.
        self._stats.total_failures += 1
        if last_error:
            raise last_error
        raise RuntimeError("Retry exhausted without error")

    def with_retry(
        self,
        func: Callable[..., T],
        *args,
        should_retry: Optional[Callable[[Exception, int], bool]] = None,
        on_retry: Optional[Callable[[RetryAttempt], None]] = None,
        **kwargs,
    ) -> T:
        """
        Execute a function with retry logic (sync).

        Blocks the calling thread with ``time.sleep`` between attempts.

        Args:
            func: Function to execute
            *args: Function arguments
            should_retry: Optional custom retry check
            on_retry: Optional callback on each retry
            **kwargs: Function keyword arguments

        Returns:
            Function result

        Raises:
            The exception from the final attempt, or from the first
            attempt that is not retryable. RuntimeError if no attempt was
            made at all (``max_retries`` is negative).
        """
        last_error: Optional[Exception] = None

        for attempt in range(self.config.max_retries + 1):
            self._stats.total_attempts += 1
            self._stats.last_attempt_at = datetime.now()

            try:
                result = func(*args, **kwargs)
            except Exception as e:
                last_error = e
                delay_ms = self._plan_retry(e, attempt, should_retry, on_retry)
                if delay_ms is None:
                    raise

                time.sleep(delay_ms / 1000.0)
            else:
                self._stats.total_successes += 1
                return result

        # Only reached when the loop never ran or should_retry was
        # overridden to keep retrying past the budget.
        self._stats.total_failures += 1
        if last_error:
            raise last_error
        raise RuntimeError("Retry exhausted without error")

    def get_stats(self) -> RetryStats:
        """Get a snapshot copy of the retry statistics."""
        return RetryStats(
            total_attempts=self._stats.total_attempts,
            total_successes=self._stats.total_successes,
            total_failures=self._stats.total_failures,
            total_retries=self._stats.total_retries,
            last_error=self._stats.last_error,
            last_attempt_at=self._stats.last_attempt_at,
        )

    def reset_stats(self) -> None:
        """Reset statistics."""
        self._stats = RetryStats()

315 

316 

def retry_async(
    max_retries: int = 3,
    initial_delay_ms: int = 1000,
    exponential_base: float = 2.0,
    jitter: bool = True,
):
    """
    Decorator for async functions with retry.

    One ``RetryHandler`` is created per decorated function and shared by
    all of its calls. The wrapper forwards every argument to
    ``RetryHandler.with_retry_async``, so keyword arguments named
    ``should_retry`` or ``on_retry`` are consumed by the handler rather
    than passed to the wrapped function.

    Args:
        max_retries: Retries allowed after the first attempt
        initial_delay_ms: Delay before the first retry, in milliseconds
        exponential_base: Growth factor applied per attempt
        jitter: Whether to randomize each delay

    Returns:
        A decorator producing an async wrapper around the function

    Usage:
        @retry_async(max_retries=3)
        async def my_function():
            ...
    """
    # Imported locally so this function does not depend on a module-level
    # import that the file does not have.
    import functools

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        config = RetryConfig(
            max_retries=max_retries,
            initial_delay_ms=initial_delay_ms,
            exponential_base=exponential_base,
            jitter=jitter,
        )
        handler = RetryHandler(config)

        # Keep the wrapped function's __name__, __doc__ and __wrapped__.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            return await handler.with_retry_async(func, *args, **kwargs)

        return wrapper
    return decorator

345 

346 

def retry_sync(
    max_retries: int = 3,
    initial_delay_ms: int = 1000,
    exponential_base: float = 2.0,
    jitter: bool = True,
):
    """
    Decorator for sync functions with retry.

    One ``RetryHandler`` is created per decorated function and shared by
    all of its calls. The wrapper forwards every argument to
    ``RetryHandler.with_retry``, so keyword arguments named
    ``should_retry`` or ``on_retry`` are consumed by the handler rather
    than passed to the wrapped function.

    Args:
        max_retries: Retries allowed after the first attempt
        initial_delay_ms: Delay before the first retry, in milliseconds
        exponential_base: Growth factor applied per attempt
        jitter: Whether to randomize each delay

    Returns:
        A decorator producing a sync wrapper around the function

    Usage:
        @retry_sync(max_retries=3)
        def my_function():
            ...
    """
    # Imported locally so this function does not depend on a module-level
    # import that the file does not have.
    import functools

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        config = RetryConfig(
            max_retries=max_retries,
            initial_delay_ms=initial_delay_ms,
            exponential_base=exponential_base,
            jitter=jitter,
        )
        handler = RetryHandler(config)

        # Keep the wrapped function's __name__, __doc__ and __wrapped__.
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> T:
            return handler.with_retry(func, *args, **kwargs)

        return wrapper
    return decorator