Coverage for integrations / agent_engine / rsi_trigger.py: 90.6%

139 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1"""Realtime RSI trigger — usage-stat-driven autoresearch enqueue. 

2 

3Closes the cadence axis of the recursive self-improvement loop. The 

4baseline autoresearch pipeline only fires on a 6-hour cron + admin 

5button, which misses the realtime feedback that the AutoEconomy / 

6Hive principle demands: agents should iterate on the live signal 

7they just received, not on a nightly digest. 

8 

9Responsibilities — one per class (SRP): 

10 * UsageSignalTracker — rolling-window counter with cooldown; pure, 

11 thread-safe, no I/O. Independently testable. 

12 * RealtimeRSITrigger — glues the tracker to the EventBus and to a 

13 caller-injected enqueue callback. Fires enqueue on its own daemon 

14 thread so the EventBus dispatch is never blocked. 

15 

16Feature flag: ``HEVOLVE_RSI_REALTIME=1`` (off by default). The tracker 

17always runs because dashboards read its snapshot; the auto-enqueue leg 

18only wakes when the flag is set. The promoted candidate still passes 

19through RSI-1 (ConstitutionalFilter) + RSI-2 (AgentBaselineService 

20delta) inside ``autoevolve_code_tools.commit_improvement`` — this 

21module only opens the TRIGGER, not the SAFETY gate. 

22""" 

23import logging 

24import os 

25import threading 

26import time 

27from collections import defaultdict, deque 

28from typing import Callable, Deque, Dict, List, Optional, Tuple 

29 

logger = logging.getLogger('hevolve.rsi_trigger')


# ── Defaults (override via constructor kwargs rather than editing) ──
_DEFAULT_WINDOW_S = 5 * 60      # rolling window length: 5 minutes
_DEFAULT_THRESHOLD = 3          # signals inside the window needed to trigger
_DEFAULT_COOLDOWN_S = 30 * 60   # per-key quiet period after a trigger: 30 minutes
_MAX_KEYS = 500                 # cap on tracked (signal, key) pairs

# EventBus topics subscribed to when bind_to_bus() gets no explicit list.
_DEFAULT_TOPICS: Tuple[str, ...] = (
    'tool.error',
    'tool.failed',
    'goal.failed',
    'agent.failure',
    'benchmark.regression',
)

46 

47 

def _flag_enabled() -> bool:
    """Report whether the ``HEVOLVE_RSI_REALTIME`` env flag is switched on.

    Accepted spellings (case-insensitive): ``1``, ``true``, ``yes``, ``on``.
    An unset variable or any other value means the flag is off.
    """
    raw = os.environ.get('HEVOLVE_RSI_REALTIME', '')
    return raw.lower() in {'1', 'true', 'yes', 'on'}

52 

53 

54# ── Tracker (pure logic, no I/O) ─────────────────────────────── 

55 

class UsageSignalTracker:
    """Thread-safe rolling-window failure counter with cooldowns.

    One deque of timestamps per ``(signal_type, key)``, kept in ascending
    order. Timestamps older than ``window_s`` are pruned lazily on
    record/query. The cooldown suppresses re-triggering the same
    ``(signal, key)`` until ``cooldown_s`` has elapsed since
    ``mark_triggered``. A pair that has never been marked is never in
    cooldown, whatever the clock's origin.

    Constructor arguments are clamped: ``window_s``, ``threshold`` and
    ``max_keys`` to at least 1, ``cooldown_s`` to at least 0. ``now_fn``
    is the clock (seconds as a float) and is injectable for tests.
    """

    def __init__(self, window_s: int = _DEFAULT_WINDOW_S,
                 threshold: int = _DEFAULT_THRESHOLD,
                 cooldown_s: int = _DEFAULT_COOLDOWN_S,
                 max_keys: int = _MAX_KEYS,
                 now_fn: Callable[[], float] = time.time):
        self._window_s = max(1, int(window_s))
        self._threshold = max(1, int(threshold))
        self._cooldown_s = max(0, int(cooldown_s))
        self._max_keys = max(1, int(max_keys))
        self._now = now_fn
        self._lock = threading.Lock()
        # (signal_type, key) -> ascending timestamps still inside the window.
        self._events: Dict[Tuple[str, str], Deque[float]] = defaultdict(deque)
        # (signal_type, key) -> clock value at the last mark_triggered().
        self._last_trigger: Dict[Tuple[str, str], float] = {}

    def record(self, signal_type: str, key: str,
               ts: Optional[float] = None) -> None:
        """Record one occurrence of ``(signal_type, key)`` at ``ts``.

        Empty ``signal_type`` or ``key`` is ignored. ``ts`` defaults to the
        injected clock. A ``ts`` older than the newest stored timestamp is
        inserted in sorted position, so the left-end pruning stays correct.
        """
        if not signal_type or not key:
            return
        t = self._now() if ts is None else float(ts)
        k = (str(signal_type), str(key))
        with self._lock:
            dq = self._events[k]
            if dq and t < dq[-1]:
                import bisect  # only needed for out-of-order timestamps
                dq.insert(bisect.bisect_right(dq, t), t)
            else:
                dq.append(t)
            # Prune against the newest stored timestamp. On the in-order path
            # this equals t. For a backfilled ts it stops stale entries from
            # surviving just because the new ts is old.
            self._prune_locked(dq, dq[-1])
            self._enforce_cap_locked()

    def should_trigger(self, signal_type: str, key: str) -> bool:
        """True iff the window holds >= threshold signals AND the pair is not
        in cooldown (never marked, or marked at least ``cooldown_s`` ago)."""
        t = self._now()
        k = (str(signal_type), str(key))
        with self._lock:
            dq = self._events.get(k)  # .get: don't create keys on query
            if not dq:
                return False
            self._prune_locked(dq, t)
            if len(dq) < self._threshold:
                return False
            last = self._last_trigger.get(k)
            # A never-triggered pair has no cooldown. Substituting 0.0 for
            # "never" would suppress the first trigger whenever the clock
            # reads less than cooldown_s (e.g. monotonic or fake clocks).
            return last is None or (t - last) >= self._cooldown_s

    def mark_triggered(self, signal_type: str, key: str) -> None:
        """Start the cooldown for ``(signal_type, key)`` at the current clock."""
        with self._lock:
            self._last_trigger[(str(signal_type), str(key))] = self._now()

    def get_snapshot(self) -> Dict[str, Dict]:
        """Per-pair view for dashboards, keyed ``'signal_type:key'``.

        Each entry has ``count_in_window``, ``last_ts`` (newest timestamp or
        None) and ``last_triggered`` (clock value or None). Prunes as it goes.
        """
        t = self._now()
        with self._lock:
            out: Dict[str, Dict] = {}
            for k, dq in list(self._events.items()):
                self._prune_locked(dq, t)
                out[f'{k[0]}:{k[1]}'] = {
                    'count_in_window': len(dq),
                    'last_ts': dq[-1] if dq else None,
                    'last_triggered': self._last_trigger.get(k),
                }
            return out

    # ── internal (lock-held) ──
    def _prune_locked(self, dq: Deque[float], t: float) -> None:
        """Drop timestamps older than ``t - window_s`` (dq must be sorted)."""
        cutoff = t - self._window_s
        while dq and dq[0] < cutoff:
            dq.popleft()

    def _enforce_cap_locked(self) -> None:
        """Keep at most ``max_keys`` pairs: evict empty pairs first, then the
        pairs whose newest timestamp is oldest."""
        if len(self._events) <= self._max_keys:
            return
        empty_keys = [k for k, dq in self._events.items() if not dq]
        for k in empty_keys[: len(self._events) - self._max_keys]:
            self._events.pop(k, None)
            self._last_trigger.pop(k, None)
        if len(self._events) <= self._max_keys:
            return
        ranked = sorted(
            self._events.items(),
            key=lambda kv: (kv[1][-1] if kv[1] else 0.0),
        )
        for k, _ in ranked[: len(self._events) - self._max_keys]:
            self._events.pop(k, None)
            self._last_trigger.pop(k, None)

145 

146 

147# ── Glue class (I/O, threading, EventBus) ────────────────────── 

148 

class RealtimeRSITrigger:
    """Binds a UsageSignalTracker to the platform EventBus.

    ``enqueue_fn(signal_type, key)`` is dependency-injected so unit tests
    don't need the autoresearch infrastructure and so callers can route
    enqueues to a different backend (e.g. a goal queue, a distributed
    task). When omitted, ``_default_enqueue_autoresearch`` is used.
    """

    def __init__(self, tracker: Optional[UsageSignalTracker] = None,
                 enqueue_fn: Optional[Callable[[str, str], None]] = None):
        self._tracker = tracker if tracker is not None else UsageSignalTracker()
        self._enqueue = (enqueue_fn if enqueue_fn is not None
                         else _default_enqueue_autoresearch)
        self._bound = False
        # Makes "should_trigger then mark_triggered" one atomic step.
        self._trigger_lock = threading.Lock()

    @property
    def tracker(self) -> UsageSignalTracker:
        return self._tracker

    def on_signal(self, signal_type: str, data) -> None:
        """EventBus callback. ``data`` is a dict carrying a ``key`` / ``tool``
        / ``tool_name`` / ``goal_id`` / ``benchmark`` field, or a plain string
        key.

        Records the signal unconditionally (for dashboards). It fires the
        enqueue leg, on a daemon thread, only when the feature flag is set
        AND the threshold is crossed AND the cooldown has elapsed.
        """
        key = _extract_key(data)
        if not key:
            return
        self._tracker.record(signal_type, key)
        if not _flag_enabled():
            return
        # Without this lock, two concurrent callbacks for the same pair can
        # both pass should_trigger before either marks it. Both would then
        # enqueue, defeating the cooldown.
        with self._trigger_lock:
            if not self._tracker.should_trigger(signal_type, key):
                return
            self._tracker.mark_triggered(signal_type, key)
        try:
            threading.Thread(
                target=self._safe_enqueue,
                args=(signal_type, key),
                daemon=True,
                name=f'rsi-enqueue-{key[:20]}',
            ).start()
        except RuntimeError as e:
            # e.g. "can't start new thread". Never let this escape into
            # the EventBus dispatch loop.
            logger.warning(
                'rsi_trigger: could not start enqueue thread for %s:%s (%s)',
                signal_type, key, e,
            )

    def bind_to_bus(self, topics: Optional[List[str]] = None) -> bool:
        """Subscribe to EventBus topics. Idempotent once successful.

        Returns True iff the bus was available and subscriptions were added
        (an explicitly empty ``topics`` list counts as trivially bound).
        If every subscription fails, it returns False and stays unbound,
        so a later call can retry.
        """
        if self._bound:
            return True
        topics = list(topics) if topics is not None else list(_DEFAULT_TOPICS)
        bus = _get_event_bus()
        if bus is None:
            return False
        bound = 0
        for topic in topics:
            try:
                # _t=topic binds the current topic; a bare closure would
                # late-bind to the loop's last value.
                bus.on(topic, lambda data, _t=topic: self.on_signal(_t, data))
            except Exception as e:
                logger.debug('rsi_trigger: failed to bind %s (%s)', topic, e)
            else:
                bound += 1
        if topics and not bound:
            logger.warning('rsi_trigger: no EventBus topics could be bound')
            return False
        self._bound = True
        logger.info('rsi_trigger: bound to %d EventBus topics', bound)
        return True

    # ── internal ──
    def _safe_enqueue(self, signal_type: str, key: str) -> None:
        """Run the enqueue callback, logging (never raising) on failure."""
        try:
            self._enqueue(signal_type, key)
            logger.info('rsi_trigger: enqueued autoresearch for %s:%s',
                        signal_type, key)
        except Exception as e:
            logger.warning(
                'rsi_trigger: enqueue failed for %s:%s (%s: %s)',
                signal_type, key, type(e).__name__, e,
            )

218 

219 

220# ── Helpers ──────────────────────────────────────────────────── 

221 

def _extract_key(data) -> str:
    """Pull the tracking key out of an EventBus payload.

    For a dict payload, the key is the first truthy value among ``key``,
    ``tool``, ``tool_name``, ``goal_id`` and ``benchmark`` (checked in that
    order), stringified. A plain string payload is itself the key. Anything
    else, or a dict with none of those fields set, yields ``''``.
    """
    if isinstance(data, str):
        return data
    if not isinstance(data, dict):
        return ''
    candidates = (
        data.get(name)
        for name in ('key', 'tool', 'tool_name', 'goal_id', 'benchmark')
    )
    return next((str(value) for value in candidates if value), '')

231 

232 

def _get_event_bus():
    """Look up the platform EventBus in the service registry.

    Returns the object registered under ``'events'``. Returns ``None`` when
    nothing is registered under that name, or when importing/querying the
    registry raises; that failure is logged at debug level.
    """
    try:
        from core.platform.registry import get_registry
        registry = get_registry()
        return registry.get('events') if registry.has('events') else None
    except Exception as e:
        logger.debug('rsi_trigger: registry unavailable (%s)', e)
        return None

244 

245 

def _default_enqueue_autoresearch(signal_type: str, key: str) -> None:
    """Fallback enqueue that only logs the intent; nothing is dispatched.

    A real enqueue needs a ``(repo_path, target_file, run_command)``
    context that can't be derived from every signal payload. The
    orchestrator supplies that context by injecting its own ``enqueue_fn``
    when constructing the trigger. Without injection, the trigger still
    records and fires, but this leg stays dark on purpose. The trigger is
    opened first and the real enqueue is wired once context resolution
    exists, so a bug in enqueue composition can never silently bypass the
    safety gates.
    """
    template = (
        'rsi_trigger: threshold crossed for %s:%s — enqueue intent '
        '(inject enqueue_fn to dispatch)'
    )
    logger.info(template, signal_type, key)

263 

264 

265# ── Module singleton ─────────────────────────────────────────── 

266 

# Process-wide singleton; created lazily by get_rsi_trigger().
_trigger: Optional[RealtimeRSITrigger] = None
_trigger_lock = threading.Lock()


def get_rsi_trigger() -> RealtimeRSITrigger:
    """Return the process-wide RealtimeRSITrigger, creating it on first use.

    The fast path returns the cached instance without locking. Creation
    re-checks under ``_trigger_lock``, so concurrent first callers end up
    sharing a single instance.
    """
    global _trigger
    existing = _trigger
    if existing is not None:
        return existing
    with _trigger_lock:
        if _trigger is None:
            _trigger = RealtimeRSITrigger()
        return _trigger


def reset_for_tests() -> None:
    """Forget the cached singleton so the next get_rsi_trigger() builds a
    fresh one. Intended for unit tests only."""
    global _trigger
    with _trigger_lock:
        _trigger = None