Coverage for core / error_advice.py: 83.8%
68 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""core/error_advice.py — central error advice (AOP-style decorator).

Single chokepoint for "something failed inside this function — what now?".
Spring's `@ExceptionAdvice` analogue: wrap a function (or use the
context-manager form), catch any exception, dispatch to:

  1. Structured logging  (always)
  2. Sentry capture      (if crash_reporter initialized — opt-in,
                          Nunba side; HARTOS server may also call init)
  3. Agent remediation   (opt-in via agent_remediation=True; creates
                          an AgentGoal that an autogen agent picks up
                          via the existing goal-seeding pipeline)

Then re-raises by default so callers still see the failure — the
advice is *additive* observation + remediation, not a try/except
black hole.

Why this lives in HARTOS / core: it composes the central agent goal
machinery (integrations.agent_engine.goal_manager.GoalManager) plus
the central crash reporter; both subsystems are HARTOS-owned, so
the decorator that fans out to them is too.  Nunba imports the
decorator the same way it imports any other shared utility (HARTOS
is bundled into the Nunba freeze via _deps/HARTOS).

SRP: this module does ONE thing — fan an exception out to the
central observability + remediation channels.  No retry logic
(callers own that), no recovery heuristics (deterministic recovery
sits *inside* the wrapped function, e.g. tts.package_installer.
_self_heal_missing_transitives), no policy decisions (severity +
remediation flag are caller-supplied).
"""
33from __future__ import annotations
35import functools
36import logging
37import threading
38import traceback
39from contextlib import contextmanager
40from typing import Any, Callable, Iterator, Optional
# Module-wide logger. The channel name is a fixed string (not __name__).
logger = logging.getLogger('hevolve_error_advice')

# Process-local throttle state, keyed by (category, error-fingerprint).
# It keeps a tight loop that raises the same exception many times per
# second from producing one agent goal + one Sentry event per iteration.
# The window is 300 seconds: the same exception seen again after the
# window has elapsed is treated as a fresh signal.
_THROTTLE_TTL_SEC = 300.0
_THROTTLE: dict[tuple[str, str], float] = {}
_THROTTLE_LOCK = threading.Lock()
def _fingerprint(exc: BaseException) -> str:
    """Return a throttling fingerprint for *exc*.

    The fingerprint is the exception class name, a colon, and the first
    120 characters of ``str(exc)``. The traceback is deliberately left
    out so the same logical error raised from different call sites
    collapses to a single throttle key.
    """
    kind = type(exc).__name__
    message_head = str(exc)[:120]
    return ':'.join((kind, message_head))
def _should_emit(category: str, exc: BaseException) -> bool:
    """Tell whether this (category, error) pair may be reported right now.

    Returns False when the same pair was recorded less than
    ``_THROTTLE_TTL_SEC`` seconds ago. Otherwise the current monotonic
    time is stored for the pair, entries older than four TTLs are
    pruned, and True is returned.
    """
    import time

    throttle_key = (category, _fingerprint(exc))
    moment = time.monotonic()
    with _THROTTLE_LOCK:
        previous = _THROTTLE.get(throttle_key)
        seen_recently = (
            previous is not None
            and (moment - previous) < _THROTTLE_TTL_SEC
        )
        if seen_recently:
            return False
        _THROTTLE[throttle_key] = moment
        # Prune long-expired keys so the table cannot grow without bound.
        expiry = moment - (_THROTTLE_TTL_SEC * 4)
        stale_keys = [k for k, stamp in _THROTTLE.items() if stamp < expiry]
        for stale in stale_keys:
            del _THROTTLE[stale]
    return True
def _try_sentry_capture(exc: BaseException, category: str, context: dict) -> None:
    """Best-effort Sentry capture via the Nunba crash_reporter helper.

    Args:
        exc: The exception to report.
        category: Failure-domain label, forwarded as the ``category``
            keyword argument.
        context: Extra key/value pairs, expanded into keyword arguments
            of ``capture_exception``.

    Returns:
        None. This function never raises:

        * when ``desktop.crash_reporter`` cannot be imported (HARTOS
          server-only deploys, dev mode without the Sentry SDK) it is a
          silent no-op;
        * when the capture call itself fails (for example a ``TypeError``
          because *context* contains a key that collides with
          ``category``) the failure is logged at DEBUG instead of being
          dropped without a trace.
    """
    try:
        from desktop.crash_reporter import capture_exception  # type: ignore
    except Exception:
        # Expected on installs without the crash reporter: nothing to do.
        return

    try:
        capture_exception(exc, category=category, **context)
    except Exception as capture_err:
        # Never let observability break the actual failure path, but
        # leave a breadcrumb so a broken capture call can be diagnosed.
        logger.debug(
            "[error_advice/%s] Sentry capture failed: %s: %s",
            category, type(capture_err).__name__, capture_err,
        )
def _try_agent_remediation(
    category: str, exc: BaseException, context: dict, severity: str,
) -> None:
    """Best-effort creation of a 'self_heal' AgentGoal for this failure.

    The goal lets an autogen agent pick the failure up and attempt
    remediation beyond whatever deterministic recovery the wrapped
    function already tried. It follows the GoalManager.create_goal
    pattern from
    integrations.agent_engine.goal_seeding.auto_remediate_loopholes.

    Callers are expected to have throttled per (category,
    error-fingerprint) already, so a looping failure yields one goal per
    failure shape per throttle window rather than one per occurrence.

    Silent no-op when GoalManager / db_session cannot be imported (Nunba
    dev mode without a HARTOS DB; HARTOS server before init_db). Any
    failure after the imports succeed is logged at WARNING and swallowed.
    """
    try:
        from integrations.agent_engine.goal_manager import GoalManager  # type: ignore
        from integrations.social.models import db_session  # type: ignore
    except Exception:
        return

    try:
        exc_name = type(exc).__name__

        # Traceback summary plus the deepest frame, which identifies the
        # failing site deterministically.
        tb = getattr(exc, '__traceback__', None)
        sample_tb = ''
        source_module = ''
        source_function = ''
        if tb:
            sample_tb = ''.join(traceback.format_tb(tb)[-20:])
            deepest = traceback.extract_tb(tb)[-1]
            source_module = deepest.filename or ''
            source_function = deepest.name or ''

        with db_session() as session:
            goal_title = f"Self-heal: {category} ({exc_name})"
            goal_description = (
                f"A {severity}-severity {category} failure escaped the "
                f"deterministic recovery loop. Investigate and remediate.\n\n"
                f"Error: {exc_name}: {exc}\n\n"
                f"Context: {context}\n\n"
                f"Last 20 frames:\n{sample_tb}"
            )
            goal_config = {
                # Per the original author's note, these five keys are
                # read by the 'self_heal' prompt builder
                # (_build_self_heal_prompt in goal_manager.py) and must
                # keep these exact names, otherwise the coding agent
                # gets blank fields in its prompt. DO NOT rename.
                'exc_type': exc_name,
                'source_module': source_module,
                'source_function': source_function,
                # The throttle dedupes repeats, so this is the count
                # for THIS goal.
                'occurrence_count': 1,
                'sample_traceback': sample_tb,
                # Extra fields kept on the goal for downstream consumers
                # and operator review; not part of the prompt contract.
                'category': category,
                'severity': severity,
                'error_message': str(exc)[:500],
                'fingerprint': _fingerprint(exc),
                'context': {k: str(v)[:200] for k, v in context.items()},
            }
            # goal_type='self_heal' is the registered builder that routes
            # to the local coding agent (see goal_manager.py).
            GoalManager.create_goal(
                session,
                goal_type='self_heal',
                title=goal_title,
                description=goal_description,
                config=goal_config,
                spark_budget=50,
                created_by='error_advice',
            )
    except Exception as e:
        # The remediation path must never crash the failure path, but a
        # broken self-heal chain (e.g. create_goal raising on schema
        # drift) is itself an operational signal, so it is reported at
        # WARNING rather than hidden at DEBUG.
        logger.warning(
            f"[error_advice/{category}] agent remediation goal creation "
            f"failed: {type(e).__name__}: {e}"
        )
# Keys the logging machinery refuses to accept in ``extra``: every
# attribute a LogRecord already carries, plus 'message' and 'asctime'.
# Derived from a live record so it tracks the running Python version.
_RESERVED_LOG_RECORD_KEYS = (
    frozenset(vars(logging.makeLogRecord({}))) | {'message', 'asctime'}
)


def _safe_log_extra(category: str, severity: str, ctx: dict) -> dict:
    """Build a logging ``extra`` mapping that cannot clash with LogRecord fields.

    Context keys that collide with a reserved LogRecord attribute (for
    example 'name', 'module' or 'message') are renamed with a ``ctx_``
    prefix; all other keys pass through unchanged.
    """
    extra = {'category': category, 'severity': severity}
    for key, value in ctx.items():
        safe_key = f"ctx_{key}" if key in _RESERVED_LOG_RECORD_KEYS else key
        extra[safe_key] = value
    return extra


def handle_exception(
    exc: BaseException,
    *,
    category: str,
    severity: str = 'medium',
    agent_remediation: bool = False,
    context: Optional[dict] = None,
) -> None:
    """Central dispatch for a caught exception.

    Used by the @error_advice decorator and the with-block context
    manager; also safe to call directly from any handler that already
    caught its own exception but wants the central fan-out (logging +
    Sentry + agent goal).

    Args:
        exc: The exception that was caught.
        category: Failure-domain label; part of the throttle key and of
            every emitted message.
        severity: One of 'low' / 'medium' / 'high' / 'critical'. It is
            attached to the log record and passed on to the agent
            remediation step.
        agent_remediation: When True, also request an agent goal for
            the failure.
        context: Optional extra key/value pairs. They are attached to
            the log record and forwarded to Sentry and the agent goal.
            Keys that collide with built-in LogRecord attributes are
            logged under a ``ctx_`` prefix, because the logging module
            raises KeyError on such collisions and that error would
            otherwise replace the exception being reported.

    Returns:
        None. A repeat of the same (category, error) inside the throttle
        window is logged at DEBUG only and skips the rest of the fan-out.
    """
    ctx = dict(context or {})
    if not _should_emit(category, exc):
        # Throttled — log at debug only so we don't spam.
        logger.debug(
            f"[error_advice/{category}] suppressed-by-throttle "
            f"{type(exc).__name__}: {exc}"
        )
        return

    logger.error(
        f"[error_advice/{category}] {type(exc).__name__}: {exc}",
        exc_info=exc,
        extra=_safe_log_extra(category, severity, ctx),
    )
    _try_sentry_capture(exc, category, ctx)
    if agent_remediation:
        _try_agent_remediation(category, exc, ctx, severity)
def error_advice(
    category: str,
    *,
    severity: str = 'medium',
    agent_remediation: bool = False,
    reraise: bool = True,
    context_extractor: Optional[Callable[..., dict]] = None,
) -> Callable:
    """Decorator factory: route exceptions escaping a function to handle_exception.

    Args:
        category:           Short label for the failure domain (e.g.
                            'tts.install', 'tts.synth', 'llm.onboard',
                            'social.feed'). Used for throttling and
                            agent-goal categorization.
        severity:           'low' | 'medium' | 'high' | 'critical'.
        agent_remediation:  True ⇒ also create an AgentGoal so an
                            autogen agent can go beyond the
                            deterministic recovery the wrapped
                            function tried.
        reraise:            True (default) ⇒ the exception still
                            propagates after the central dispatch
                            (additive observation). False ⇒ it is
                            swallowed and the call returns None
                            (rare; for pure side-effect functions
                            whose caller cannot act on the failure).
        context_extractor:  Optional `fn(*args, **kwargs) -> dict`
                            called with the wrapped call's arguments
                            to pull out structured context (e.g.
                            backend name, user_id). A failing
                            extractor is ignored. Default: none.

    The context always contains 'function', the wrapped function's
    ``__qualname__``.

    Usage:
        @error_advice('tts.install', agent_remediation=True)
        def install_backend_packages(backend, ...):
            ...

        # Captures backend, user_id from kwargs:
        @error_advice('tts.synth',
                      context_extractor=lambda *a, **kw:
                          {'backend': kw.get('backend'),
                           'user_id': kw.get('user_id')})
        def synthesize(...):
            ...
    """
    def _gather_context(fn: Callable, call_args: tuple, call_kwargs: dict) -> dict:
        # Base context plus whatever the optional extractor contributes.
        details: dict[str, Any] = {'function': fn.__qualname__}
        if context_extractor is None:
            return details
        try:
            details.update(context_extractor(*call_args, **call_kwargs))
        except Exception:
            # Context is best-effort; a broken extractor must not get in
            # the way of reporting the real failure.
            pass
        return details

    def _decorate(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def _wrapper(*args, **kwargs):
            try:
                outcome = fn(*args, **kwargs)
            except Exception as failure:  # noqa: BLE001 — the whole point
                handle_exception(
                    failure,
                    category=category,
                    severity=severity,
                    agent_remediation=agent_remediation,
                    context=_gather_context(fn, args, kwargs),
                )
                if not reraise:
                    return None
                raise
            return outcome
        return _wrapper
    return _decorate
@contextmanager
def error_advice_block(
    category: str,
    *,
    severity: str = 'medium',
    agent_remediation: bool = False,
    reraise: bool = True,
    context: Optional[dict] = None,
) -> Iterator[None]:
    """Context-manager counterpart of the error_advice decorator.

    Wraps an inline block instead of a whole function: an exception
    raised inside the ``with`` body is passed to handle_exception with
    the given category / severity / agent_remediation / context, then
    re-raised when *reraise* is True (default) or suppressed otherwise.
    Yields nothing.
    """
    try:
        yield
    except Exception as failure:  # noqa: BLE001
        handle_exception(
            failure,
            category=category,
            severity=severity,
            agent_remediation=agent_remediation,
            context=context,
        )
        if not reraise:
            return
        raise