Coverage for integrations / agent_engine / rsi_trigger.py: 90.6%
139 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""Realtime RSI trigger — usage-stat-driven autoresearch enqueue.
3Closes the cadence axis of the recursive self-improvement loop. The
4baseline autoresearch pipeline only fires on a 6-hour cron + admin
5button, which misses the realtime feedback that the AutoEconomy /
6Hive principle demands: agents should iterate on the live signal
7they just received, not on a nightly digest.
9Responsibilities — one per class (SRP):
10 * UsageSignalTracker — rolling-window counter with cooldown; pure,
11 thread-safe, no I/O. Independently testable.
12 * RealtimeRSITrigger — glues the tracker to the EventBus and to a
13 caller-injected enqueue callback. Fires enqueue on its own daemon
14 thread so the EventBus dispatch is never blocked.
16Feature flag: ``HEVOLVE_RSI_REALTIME=1`` (off by default). The tracker
17always runs because dashboards read its snapshot; the auto-enqueue leg
18only wakes when the flag is set. The promoted candidate still passes
19through RSI-1 (ConstitutionalFilter) + RSI-2 (AgentBaselineService
20delta) inside ``autoevolve_code_tools.commit_improvement`` — this
21module only opens the TRIGGER, not the SAFETY gate.
22"""
23import logging
24import os
25import threading
26import time
27from collections import defaultdict, deque
28from typing import Callable, Deque, Dict, List, Optional, Tuple
30logger = logging.getLogger('hevolve.rsi_trigger')
33# ── Defaults (tune via kwargs, not by editing) ─────────────────
34_DEFAULT_WINDOW_S = 300 # 5 min rolling window
35_DEFAULT_THRESHOLD = 3 # N signals in window → trigger
36_DEFAULT_COOLDOWN_S = 1800 # 30 min per-key cooldown post-trigger
37_MAX_KEYS = 500 # unbounded-dict cap
39_DEFAULT_TOPICS: Tuple[str, ...] = (
40 'tool.error',
41 'tool.failed',
42 'goal.failed',
43 'agent.failure',
44 'benchmark.regression',
45)
48def _flag_enabled() -> bool:
49 return os.environ.get('HEVOLVE_RSI_REALTIME', '').lower() in (
50 '1', 'true', 'yes', 'on'
51 )
54# ── Tracker (pure logic, no I/O) ───────────────────────────────
class UsageSignalTracker:
    """Thread-safe rolling-window failure counter with cooldowns.

    One deque of timestamps per ``(signal_type, key)``, kept in ascending
    order. Timestamps older than ``window_s`` are pruned lazily on
    record/query. The cooldown suppresses re-triggering the same
    ``(signal, key)`` until ``cooldown_s`` has elapsed since
    ``mark_triggered``; a pair that has never been marked is not subject
    to any cooldown.

    Args:
        window_s: Rolling-window length in seconds (clamped to >= 1).
        threshold: Signals inside the window needed to trigger
            (clamped to >= 1).
        cooldown_s: Seconds after ``mark_triggered`` during which the
            same pair cannot trigger again (clamped to >= 0).
        max_keys: Maximum number of tracked pairs (clamped to >= 1).
            Beyond it, empty pairs are evicted first, then the pairs
            whose newest timestamp is oldest.
        now_fn: Clock returning seconds as a float; injectable for tests.
    """

    def __init__(self, window_s: int = _DEFAULT_WINDOW_S,
                 threshold: int = _DEFAULT_THRESHOLD,
                 cooldown_s: int = _DEFAULT_COOLDOWN_S,
                 max_keys: int = _MAX_KEYS,
                 now_fn: Callable[[], float] = time.time):
        self._window_s = max(1, int(window_s))
        self._threshold = max(1, int(threshold))
        self._cooldown_s = max(0, int(cooldown_s))
        self._max_keys = max(1, int(max_keys))
        self._now = now_fn
        self._lock = threading.Lock()
        # (signal_type, key) -> ascending timestamps still inside the window.
        self._events: Dict[Tuple[str, str], Deque[float]] = defaultdict(deque)
        # (signal_type, key) -> clock value at the last mark_triggered().
        self._last_trigger: Dict[Tuple[str, str], float] = {}

    def record(self, signal_type: str, key: str,
               ts: Optional[float] = None) -> None:
        """Record one occurrence of ``signal_type`` for ``key``.

        A falsy ``signal_type`` or ``key`` is ignored. ``ts`` defaults to
        ``now_fn()``; a backdated ``ts`` is inserted in sorted position
        rather than appended.
        """
        if not signal_type or not key:
            return
        t = self._now() if ts is None else float(ts)
        k = (str(signal_type), str(key))
        with self._lock:
            dq = self._events[k]
            if dq and t < dq[-1]:
                # _prune_locked pops from the left and stops at the first
                # in-window entry, so the deque must stay sorted; appending
                # an older timestamp would let stale entries be counted.
                import bisect
                bisect.insort(dq, t)
            else:
                dq.append(t)
            self._prune_locked(dq, t)
            self._enforce_cap_locked()

    def should_trigger(self, signal_type: str, key: str) -> bool:
        """True iff threshold crossed AND cooldown has elapsed.

        A pair that has never been passed to ``mark_triggered`` has no
        cooldown, whatever the origin of ``now_fn``'s clock.
        """
        t = self._now()
        k = (str(signal_type), str(key))
        with self._lock:
            dq = self._events.get(k)  # .get: don't create a key on query
            if not dq:
                return False
            self._prune_locked(dq, t)
            if len(dq) < self._threshold:
                return False
            last = self._last_trigger.get(k)
            # Absent entry means "never triggered". A 0.0 sentinel would
            # wrongly suppress the first trigger for clocks that start
            # near zero (fake clocks, monotonic time).
            return last is None or (t - last) >= self._cooldown_s

    def mark_triggered(self, signal_type: str, key: str) -> None:
        """Start the cooldown for ``(signal_type, key)`` at ``now_fn()``."""
        with self._lock:
            self._last_trigger[(str(signal_type), str(key))] = self._now()

    def get_snapshot(self) -> Dict[str, Dict]:
        """Return a dashboard view keyed by ``'signal_type:key'``.

        Each value holds ``count_in_window``, ``last_ts`` (newest retained
        timestamp or None) and ``last_triggered`` (None if never marked).
        Expired timestamps are pruned as a side effect.
        """
        t = self._now()
        with self._lock:
            out: Dict[str, Dict] = {}
            for k, dq in list(self._events.items()):
                self._prune_locked(dq, t)
                out[f'{k[0]}:{k[1]}'] = {
                    'count_in_window': len(dq),
                    'last_ts': dq[-1] if dq else None,
                    'last_triggered': self._last_trigger.get(k),
                }
            return out

    # ── internal (lock-held) ──
    def _prune_locked(self, dq: Deque[float], t: float) -> None:
        """Drop timestamps older than ``t - window_s`` (dq must be sorted)."""
        cutoff = t - self._window_s
        while dq and dq[0] < cutoff:
            dq.popleft()

    def _enforce_cap_locked(self) -> None:
        """Evict pairs until at most ``max_keys`` remain.

        Empty pairs go first, then those whose newest timestamp is oldest.
        An evicted pair also loses its cooldown state.
        """
        if len(self._events) <= self._max_keys:
            return
        empty_keys = [k for k, dq in self._events.items() if not dq]
        for k in empty_keys[: len(self._events) - self._max_keys]:
            self._events.pop(k, None)
            self._last_trigger.pop(k, None)
        if len(self._events) <= self._max_keys:
            return
        ranked = sorted(
            self._events.items(),
            key=lambda kv: (kv[1][-1] if kv[1] else 0.0),
        )
        for k, _ in ranked[: len(self._events) - self._max_keys]:
            self._events.pop(k, None)
            self._last_trigger.pop(k, None)
147# ── Glue class (I/O, threading, EventBus) ──────────────────────
class RealtimeRSITrigger:
    """Binds a UsageSignalTracker to the platform EventBus.

    ``enqueue_fn`` is dependency-injected so unit tests don't need the
    autoresearch infrastructure and so callers can route enqueues to a
    different backend (e.g. a goal queue, a distributed task).

    Args:
        tracker: Tracker to record signals into; a fresh
            ``UsageSignalTracker`` is created when omitted.
        enqueue_fn: ``(signal_type, key) -> None`` callback, run on a
            daemon thread when a trigger fires; defaults to
            ``_default_enqueue_autoresearch``.
    """

    def __init__(self, tracker: Optional[UsageSignalTracker] = None,
                 enqueue_fn: Optional[Callable[[str, str], None]] = None):
        # `is not None` so a falsy-but-valid injected object is not replaced.
        self._tracker = (tracker if tracker is not None
                         else UsageSignalTracker())
        self._enqueue = (enqueue_fn if enqueue_fn is not None
                         else _default_enqueue_autoresearch)
        self._bound = False
        # Makes should_trigger + mark_triggered atomic in on_signal, so two
        # concurrent signals cannot both pass the cooldown check.
        self._trigger_lock = threading.Lock()

    @property
    def tracker(self) -> UsageSignalTracker:
        """The tracker this trigger records signals into."""
        return self._tracker

    def on_signal(self, signal_type: str, data) -> None:
        """EventBus callback — expects ``data`` dict with ``key`` / ``tool``
        / ``goal_id`` field. Records the signal unconditionally (for
        dashboards) and only fires the enqueue leg when the feature
        flag is set AND the threshold is crossed AND cooldown elapsed.
        At most one enqueue fires per cooldown even under concurrent
        callbacks on the same instance.
        """
        key = _extract_key(data)
        if not key:
            return
        self._tracker.record(signal_type, key)
        if not _flag_enabled():
            return
        with self._trigger_lock:
            if not self._tracker.should_trigger(signal_type, key):
                return
            self._tracker.mark_triggered(signal_type, key)
        # Enqueue off-thread so EventBus dispatch is never blocked.
        threading.Thread(
            target=self._safe_enqueue,
            args=(signal_type, key),
            daemon=True,
            name=f'rsi-enqueue-{key[:20]}',
        ).start()

    def bind_to_bus(self, topics: Optional[List[str]] = None) -> bool:
        """Subscribe to EventBus topics. Idempotent once bound.

        Returns True iff the bus was available and subscriptions were
        added (an explicitly empty ``topics`` list also returns True).
        If every requested subscription fails, returns False and stays
        unbound so a later call can retry.
        """
        if self._bound:
            return True
        topics = list(topics) if topics is not None else list(_DEFAULT_TOPICS)
        bus = _get_event_bus()
        if bus is None:
            return False
        bound_count = 0
        for topic in topics:
            try:
                # `_t=topic` pins the topic per iteration (late binding).
                bus.on(topic, lambda data, _t=topic: self.on_signal(_t, data))
            except Exception as e:
                logger.debug('rsi_trigger: failed to bind %s (%s)', topic, e)
            else:
                bound_count += 1
        if topics and not bound_count:
            logger.warning('rsi_trigger: no EventBus topics could be bound')
            return False
        self._bound = True
        logger.info('rsi_trigger: bound to %d EventBus topics', bound_count)
        return True

    # ── internal ──
    def _safe_enqueue(self, signal_type: str, key: str) -> None:
        """Run the enqueue callback, logging (not raising) any failure."""
        try:
            self._enqueue(signal_type, key)
            logger.info('rsi_trigger: enqueued autoresearch for %s:%s',
                        signal_type, key)
        except Exception as e:
            logger.warning(
                'rsi_trigger: enqueue failed for %s:%s (%s: %s)',
                signal_type, key, type(e).__name__, e,
            )
220# ── Helpers ────────────────────────────────────────────────────
222def _extract_key(data) -> str:
223 if isinstance(data, dict):
224 for field in ('key', 'tool', 'tool_name', 'goal_id', 'benchmark'):
225 v = data.get(field)
226 if v:
227 return str(v)
228 elif isinstance(data, str):
229 return data
230 return ''
def _get_event_bus():
    """Look up the platform EventBus under the ``'events'`` registry name.

    Returns None when the registry cannot be imported or queried, or when
    nothing is registered as ``'events'``; failures are logged at debug.
    """
    try:
        from core.platform.registry import get_registry
        registry = get_registry()
        return registry.get('events') if registry.has('events') else None
    except Exception as exc:
        logger.debug('rsi_trigger: registry unavailable (%s)', exc)
        return None
def _default_enqueue_autoresearch(signal_type: str, key: str) -> None:
    """Fallback enqueue used when no ``enqueue_fn`` is injected: log only.

    Dispatching real autoresearch needs a ``(repo_path, target_file,
    run_command)`` context that cannot be derived from every signal
    payload, so the orchestrator is expected to inject its own
    ``enqueue_fn`` at construction. Until then the trigger still records
    and fires, but nothing is dispatched. This is deliberate: the trigger
    opens first, and the real enqueue is wired once context resolution
    lands, so a bug in enqueue composition never silently bypasses the
    safety gates.
    """
    message_template = (
        'rsi_trigger: threshold crossed for %s:%s — enqueue intent '
        '(inject enqueue_fn to dispatch)'
    )
    logger.info(message_template, signal_type, key)
265# ── Module singleton ───────────────────────────────────────────
# Guards lazy construction and reset of the process-wide trigger.
_trigger_lock = threading.Lock()
# Process-wide trigger instance, built lazily by get_rsi_trigger().
_trigger: Optional[RealtimeRSITrigger] = None
def get_rsi_trigger() -> RealtimeRSITrigger:
    """Return the process-wide RealtimeRSITrigger, creating it on first use.

    The unlocked read is a fast path; construction happens under
    ``_trigger_lock`` with a re-check so only one instance is built.
    """
    global _trigger
    existing = _trigger
    if existing is not None:
        return existing
    with _trigger_lock:
        if _trigger is None:
            _trigger = RealtimeRSITrigger()
        return _trigger
def reset_for_tests() -> None:
    """Forget the process-wide trigger (unit tests only).

    The next ``get_rsi_trigger()`` call builds a fresh instance. This only
    drops the module reference; it does not unbind anything from the bus.
    """
    global _trigger
    with _trigger_lock:  # serialize with the lazy constructor
        _trigger = None