Coverage for core / circuit_breaker.py: 97.2%

107 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Reusable Circuit Breaker — extracted from WorldModelBridge. 

3 

4Three states: 

5- CLOSED: Requests flow normally. Failures increment counter. 

6- OPEN: Requests blocked. Timer ticks down cooldown period. 

7- HALF_OPEN: One probe request allowed. Success → CLOSED, failure → OPEN. 

8 

9Thread-safe via threading.Lock. 

10 

11Usage: 

12 cb = CircuitBreaker(name='hevolve_core', threshold=5, cooldown=60) 

13 

14 if cb.is_open(): 

15 return fallback_response 

16 

17 try: 

18 result = call_external_service() 

19 cb.record_success() 

20 return result 

21 except Exception: 

22 cb.record_failure() 

23 raise 

24 

25Or use the decorator: 

26 @with_circuit_breaker(cb) 

27 def call_service(): 

28 ... 

29""" 

30import logging 

31import threading 

32import time 

33from enum import Enum 

34from typing import Optional, Callable, Any 

35 

36logger = logging.getLogger('hevolve_social') 

37 

38 

39class CircuitState(Enum): 

40 CLOSED = 'closed' 

41 OPEN = 'open' 

42 HALF_OPEN = 'half_open' 

43 

44 

45class CircuitBreaker: 

46 """Thread-safe circuit breaker with configurable threshold and cooldown.""" 

47 

48 def __init__(self, name: str = 'default', threshold: int = 5, 

49 cooldown: float = 60.0): 

50 self.name = name 

51 self.threshold = threshold 

52 self.cooldown = cooldown 

53 self._failures = 0 

54 self._opened_at = 0.0 

55 self._lock = threading.Lock() 

56 self._half_open_in_flight = False 

57 

58 @property 

59 def state(self) -> CircuitState: 

60 with self._lock: 

61 return self._get_state() 

62 

63 def _get_state(self) -> CircuitState: 

64 """Internal state check (caller must hold lock).""" 

65 if self._failures < self.threshold: 

66 return CircuitState.CLOSED 

67 elapsed = time.time() - self._opened_at 

68 if elapsed > self.cooldown: 

69 return CircuitState.HALF_OPEN 

70 return CircuitState.OPEN 

71 

72 def is_open(self) -> bool: 

73 """Returns True if requests should be blocked.""" 

74 with self._lock: 

75 state = self._get_state() 

76 if state == CircuitState.CLOSED: 

77 return False 

78 if state == CircuitState.HALF_OPEN: 

79 if not self._half_open_in_flight: 

80 self._half_open_in_flight = True 

81 return False # Allow one probe 

82 return True # Block additional requests during probe 

83 return True # OPEN 

84 

85 def record_success(self): 

86 """Reset circuit breaker on successful call.""" 

87 with self._lock: 

88 self._failures = 0 

89 self._half_open_in_flight = False 

90 

91 def record_failure(self): 

92 """Record failure; open circuit at threshold.""" 

93 with self._lock: 

94 self._failures += 1 

95 self._half_open_in_flight = False 

96 if self._failures >= self.threshold: 

97 self._opened_at = time.time() 

98 logger.warning( 

99 f"[CircuitBreaker:{self.name}] OPEN after " 

100 f"{self._failures} failures. Cooldown {self.cooldown}s.") 

101 

102 def reset(self): 

103 """Manually reset the circuit breaker.""" 

104 with self._lock: 

105 self._failures = 0 

106 self._opened_at = 0.0 

107 self._half_open_in_flight = False 

108 

109 def get_stats(self) -> dict: 

110 with self._lock: 

111 return { 

112 'name': self.name, 

113 'state': self._get_state().value, 

114 'failures': self._failures, 

115 'threshold': self.threshold, 

116 'cooldown': self.cooldown, 

117 } 

118 

119 

120class PeerBackoff: 

121 """Exponential backoff tracker for unreachable peers/endpoints. 

122 

123 Used by GossipProtocol and FederatedAggregator to avoid hammering 

124 dead peers. Tracks per-key (next_retry_at, current_delay) tuples. 

125 

126 Usage: 

127 backoff = PeerBackoff(initial=10, maximum=300) 

128 if backoff.is_backed_off('http://peer:6777'): 

129 return # skip 

130 try: 

131 do_request() 

132 backoff.record_success('http://peer:6777') 

133 except ConnectionError: 

134 backoff.record_failure('http://peer:6777') 

135 """ 

136 

137 def __init__(self, initial: float = 10.0, maximum: float = 300.0): 

138 self.initial = initial 

139 self.maximum = maximum 

140 self._lock = threading.Lock() 

141 self._entries: dict = {} # key → (next_retry_at, current_delay) 

142 

143 def is_backed_off(self, key: str) -> bool: 

144 with self._lock: 

145 entry = self._entries.get(key) 

146 if not entry: 

147 return False 

148 return time.time() < entry[0] 

149 

150 def record_failure(self, key: str): 

151 with self._lock: 

152 entry = self._entries.get(key) 

153 if entry: 

154 new_delay = min(entry[1] * 2, self.maximum) 

155 else: 

156 new_delay = self.initial 

157 self._entries[key] = (time.time() + new_delay, new_delay) 

158 

159 def record_success(self, key: str): 

160 with self._lock: 

161 self._entries.pop(key, None) 

162 

163 def prune_expired(self): 

164 """Remove entries whose backoff period has elapsed.""" 

165 with self._lock: 

166 now = time.time() 

167 expired = [k for k, (retry_at, _) in self._entries.items() 

168 if now >= retry_at] 

169 for k in expired: 

170 del self._entries[k] 

171 

172 

173class CircuitBreakerOpenError(Exception): 

174 """Raised when a circuit breaker is open and blocking requests.""" 

175 def __init__(self, name: str): 

176 super().__init__(f"Circuit breaker '{name}' is open") 

177 self.breaker_name = name 

178 

179 

180def with_circuit_breaker(cb: CircuitBreaker, 

181 fallback: Optional[Callable] = None): 

182 """Decorator that wraps a function with circuit breaker protection. 

183 

184 Args: 

185 cb: CircuitBreaker instance 

186 fallback: Optional callable returning fallback value when circuit is open. 

187 If None, raises CircuitBreakerOpenError. 

188 """ 

189 def decorator(func): 

190 def wrapper(*args, **kwargs): 

191 if cb.is_open(): 

192 if fallback is not None: 

193 return fallback(*args, **kwargs) 

194 raise CircuitBreakerOpenError(cb.name) 

195 try: 

196 result = func(*args, **kwargs) 

197 cb.record_success() 

198 return result 

199 except Exception: 

200 cb.record_failure() 

201 raise 

202 wrapper.__name__ = func.__name__ 

203 wrapper.__doc__ = func.__doc__ 

204 return wrapper 

205 return decorator