Coverage for core / circuit_breaker.py: 97.2%
107 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Reusable Circuit Breaker — extracted from WorldModelBridge.
4Three states:
5- CLOSED: Requests flow normally. Failures increment counter.
6- OPEN: Requests blocked. Timer ticks down cooldown period.
7- HALF_OPEN: One probe request allowed. Success → CLOSED, failure → OPEN.
9Thread-safe via threading.Lock.
11Usage:
12 cb = CircuitBreaker(name='hevolve_core', threshold=5, cooldown=60)
14 if cb.is_open():
15 return fallback_response
17 try:
18 result = call_external_service()
19 cb.record_success()
20 return result
21 except Exception:
22 cb.record_failure()
23 raise
25Or use the decorator:
26 @with_circuit_breaker(cb)
27 def call_service():
28 ...
29"""
30import logging
31import threading
32import time
33from enum import Enum
34from typing import Optional, Callable, Any
logger = logging.getLogger('hevolve_social')


class CircuitState(Enum):
    """States a circuit breaker can report."""

    CLOSED = 'closed'        # requests flow; failures are counted
    OPEN = 'open'            # requests are blocked while the cooldown runs
    HALF_OPEN = 'half_open'  # cooldown is over; one probe request is allowed


class CircuitBreaker:
    """Thread-safe circuit breaker with configurable threshold and cooldown.

    The state is not stored; it is derived on demand from the failure
    counter and the moment the breaker last tripped:

    - CLOSED while fewer than ``threshold`` failures have been recorded
      since the last success or reset.
    - OPEN once the threshold is reached, until more than ``cooldown``
      seconds have passed.
    - HALF_OPEN afterwards; ``is_open()`` then admits a single probe call.

    Elapsed time is measured with ``time.monotonic()`` so that wall-clock
    adjustments can neither shorten nor prolong the cooldown.

    Args:
        name: Label used in the log message and in ``get_stats()``.
        threshold: Failure count at which the breaker opens.
        cooldown: Seconds the breaker stays OPEN before allowing a probe.
    """

    def __init__(self, name: str = 'default', threshold: int = 5,
                 cooldown: float = 60.0):
        self.name = name
        self.threshold = threshold
        self.cooldown = cooldown
        self._failures = 0
        # time.monotonic() reading taken when the threshold was last hit.
        self._opened_at = 0.0
        self._lock = threading.Lock()
        # True while the single HALF_OPEN probe has been handed out and
        # its outcome has not been recorded yet.
        self._half_open_in_flight = False

    @property
    def state(self) -> CircuitState:
        """Current state; reading it has no side effects."""
        with self._lock:
            return self._get_state()

    def _get_state(self) -> CircuitState:
        """Internal state check (caller must hold lock)."""
        if self._failures < self.threshold:
            return CircuitState.CLOSED
        elapsed = time.monotonic() - self._opened_at
        if elapsed > self.cooldown:
            return CircuitState.HALF_OPEN
        return CircuitState.OPEN

    def is_open(self) -> bool:
        """Return True if the caller's request should be blocked.

        In HALF_OPEN state the first caller gets False and thereby claims
        the probe slot; every other caller gets True until the probe's
        outcome is reported via ``record_success()`` or
        ``record_failure()``.
        """
        with self._lock:
            state = self._get_state()
            if state == CircuitState.CLOSED:
                return False
            if state == CircuitState.HALF_OPEN:
                if not self._half_open_in_flight:
                    self._half_open_in_flight = True
                    return False  # Allow one probe
                return True  # Block additional requests during probe
            return True  # OPEN

    def record_success(self):
        """Reset the failure counter and release the probe slot."""
        with self._lock:
            self._failures = 0
            self._half_open_in_flight = False

    def record_failure(self):
        """Record a failure; open the circuit at the threshold.

        Every failure recorded at or above the threshold restarts the
        cooldown and logs a warning.
        """
        with self._lock:
            self._failures += 1
            self._half_open_in_flight = False
            if self._failures >= self.threshold:
                self._opened_at = time.monotonic()
                logger.warning(
                    f"[CircuitBreaker:{self.name}] OPEN after "
                    f"{self._failures} failures. Cooldown {self.cooldown}s.")

    def reset(self):
        """Manually return the breaker to its initial CLOSED condition."""
        with self._lock:
            self._failures = 0
            self._opened_at = 0.0
            self._half_open_in_flight = False

    def get_stats(self) -> dict:
        """Return a snapshot with name, state value, failures and settings."""
        with self._lock:
            return {
                'name': self.name,
                'state': self._get_state().value,
                'failures': self._failures,
                'threshold': self.threshold,
                'cooldown': self.cooldown,
            }
class PeerBackoff:
    """Exponential backoff tracker for unreachable peers/endpoints.

    Used by GossipProtocol and FederatedAggregator to avoid hammering
    dead peers. Tracks per-key (next_retry_at, current_delay) tuples.
    Deadlines are ``time.monotonic()`` readings, so wall-clock adjustments
    cannot stretch or cut short a backoff period.

    The first failure for a key waits ``initial`` seconds; each further
    failure doubles the previous delay, capped at ``maximum``. A success
    forgets the key entirely.

    Usage:
        backoff = PeerBackoff(initial=10, maximum=300)
        if backoff.is_backed_off('http://peer:6777'):
            return  # skip
        try:
            do_request()
            backoff.record_success('http://peer:6777')
        except ConnectionError:
            backoff.record_failure('http://peer:6777')

    Args:
        initial: Delay in seconds applied after the first failure.
        maximum: Upper bound in seconds for doubled delays.
    """

    def __init__(self, initial: float = 10.0, maximum: float = 300.0):
        self.initial = initial
        self.maximum = maximum
        self._lock = threading.Lock()
        # key -> (next_retry_at, current_delay); next_retry_at is a
        # time.monotonic() reading, not a wall-clock timestamp.
        self._entries: dict = {}

    def is_backed_off(self, key: str) -> bool:
        """Return True while ``key`` is still inside its backoff period."""
        with self._lock:
            entry = self._entries.get(key)
            if entry is None:
                return False
            return time.monotonic() < entry[0]

    def record_failure(self, key: str):
        """Start or extend the backoff for ``key``."""
        with self._lock:
            entry = self._entries.get(key)
            if entry is None:
                # The first delay is taken as-is; only doubled delays
                # are capped by ``maximum``.
                new_delay = self.initial
            else:
                new_delay = min(entry[1] * 2, self.maximum)
            self._entries[key] = (time.monotonic() + new_delay, new_delay)

    def record_success(self, key: str):
        """Drop any backoff state held for ``key``."""
        with self._lock:
            self._entries.pop(key, None)

    def prune_expired(self):
        """Remove entries whose backoff period has elapsed."""
        with self._lock:
            now = time.monotonic()
            expired = [k for k, (retry_at, _) in self._entries.items()
                       if now >= retry_at]
            for k in expired:
                del self._entries[k]
class CircuitBreakerOpenError(Exception):
    """Signals that a call was rejected because its circuit breaker is open.

    Attributes:
        breaker_name: The name passed in, identifying the breaker.
    """

    def __init__(self, name: str):
        self.breaker_name = name
        message = f"Circuit breaker '{name}' is open"
        super().__init__(message)
def with_circuit_breaker(cb: CircuitBreaker,
                         fallback: Optional[Callable] = None):
    """Decorator that wraps a function with circuit breaker protection.

    Before each call the breaker is consulted via ``cb.is_open()``. When
    the call is allowed, its outcome is reported back: a normal return
    calls ``cb.record_success()``; an ``Exception`` calls
    ``cb.record_failure()`` and is re-raised unchanged.

    Args:
        cb: CircuitBreaker instance
        fallback: Optional callable returning fallback value when circuit
            is open. It receives the same arguments as the wrapped
            function. If None, raises CircuitBreakerOpenError.

    Returns:
        A decorator. The wrapper it produces carries the wrapped
        callable's metadata (name, docstring, ``__wrapped__``, ...).

    Raises:
        CircuitBreakerOpenError: From the wrapper, when the circuit is
            open and no fallback was given.
    """
    # Local import keeps this function independent of the module's
    # top-level import list.
    from functools import wraps

    def decorator(func):
        # wraps() copies the metadata and skips attributes the callable
        # lacks, so objects without __name__ (e.g. functools.partial)
        # can be decorated too.
        @wraps(func)
        def wrapper(*args, **kwargs):
            if cb.is_open():
                if fallback is not None:
                    return fallback(*args, **kwargs)
                raise CircuitBreakerOpenError(cb.name)
            try:
                result = func(*args, **kwargs)
            except Exception:
                cb.record_failure()
                raise
            cb.record_success()
            return result
        return wrapper
    return decorator