Coverage for core / error_advice.py: 83.8%

68 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1"""core/error_advice.py — central error advice (AOP-style decorator). 

2 

3Single chokepoint for "something failed inside this function — what now?". 

4Spring's `@ControllerAdvice` / `@ExceptionHandler` analogue: wrap a function (or use the 

5context-manager form), catch any exception, dispatch to: 

6 

7 1. Structured logging (always) 

8 2. Sentry capture (if crash_reporter initialized — opt-in, 

9 Nunba side; HARTOS server may also call init) 

10 3. Agent remediation (opt-in via agent_remediation=True; creates 

11 an AgentGoal that an autogen agent picks up 

12 via the existing goal-seeding pipeline) 

13 

14Then re-raises by default so callers still see the failure — the 

15advice is *additive* observation + remediation, not a try/except 

16black hole. 

17 

18Why this lives in HARTOS / core: it composes the central agent goal 

19machinery (integrations.agent_engine.goal_manager.GoalManager) plus 

20the central crash reporter; both subsystems are HARTOS-owned, so 

21the decorator that fans out to them is too. Nunba imports the 

22decorator the same way it imports any other shared utility (HARTOS 

23is bundled into the Nunba freeze via _deps/HARTOS). 

24 

25SRP: this module does ONE thing — fan an exception out to the 

26central observability + remediation channels. No retry logic 

27(callers own that), no recovery heuristics (deterministic recovery 

28sits *inside* the wrapped function, e.g. tts.package_installer. 

29_self_heal_missing_transitives), no policy decisions (severity + 

30remediation flag are caller-supplied). 

31""" 

32 

33from __future__ import annotations 

34 

35import functools 

36import logging 

37import threading 

38import traceback 

39from contextlib import contextmanager 

40from typing import Any, Callable, Iterator, Optional 

41 

# Every record emitted by this module goes through this one named logger.
logger = logging.getLogger('hevolve_error_advice')

# Process-local throttle state.
#
# _THROTTLE maps (category, error-fingerprint) -> monotonic timestamp of the
# last time that pair was allowed through. A tight loop raising the same
# exception 10x per second therefore yields one agent goal / one Sentry
# event, not ten. The window is 5 minutes: the same exception seen again
# after that counts as a fresh signal.
#
# _THROTTLE_LOCK must be held for every read or write of _THROTTLE.
_THROTTLE_TTL_SEC = 300.0
_THROTTLE: dict[tuple[str, str], float] = {}
_THROTTLE_LOCK = threading.Lock()

51 

52 

def _fingerprint(exc: BaseException) -> str:
    """Return a stable throttling fingerprint for ``exc``.

    The fingerprint is ``"<ExceptionTypeName>:<first 120 chars of str(exc)>"``.
    The traceback is deliberately left out so the same logical error raised
    from different call sites still throttles as one.

    Never raises: if the exception's own ``__str__`` is broken, the message
    part falls back to ``'<unprintable>'``. This function runs inside the
    failure path, where a secondary exception would mask the original one.
    """
    try:
        message = str(exc)
    except Exception:
        # Custom exception classes occasionally ship a __str__ that itself
        # fails (bad format args, missing attributes). Degrade, don't raise.
        message = '<unprintable>'
    return f"{type(exc).__name__}:{message[:120]}"

58 

59 

def _should_emit(category: str, exc: BaseException) -> bool:
    """Decide whether this (category, error) pair may be emitted now.

    Returns ``False`` when the same pair was let through less than
    ``_THROTTLE_TTL_SEC`` seconds ago. Otherwise records the current
    monotonic time for the pair and returns ``True``.

    Side effect: on the ``True`` path, entries older than four TTLs are
    evicted so the table cannot grow without bound.
    """
    import time

    throttle_key = (category, _fingerprint(exc))
    stamp = time.monotonic()
    with _THROTTLE_LOCK:
        previous = _THROTTLE.get(throttle_key)
        if previous is not None and (stamp - previous) < _THROTTLE_TTL_SEC:
            return False

        _THROTTLE[throttle_key] = stamp

        # Collect stale keys first, then delete, so the dict is never
        # mutated while it is being iterated.
        expiry = stamp - (_THROTTLE_TTL_SEC * 4)
        stale_keys = [k for k, seen_at in _THROTTLE.items() if seen_at < expiry]
        for stale_key in stale_keys:
            del _THROTTLE[stale_key]
    return True

76 

77 

def _try_sentry_capture(exc: BaseException, category: str, context: dict) -> None:
    """Best-effort Sentry capture via the Nunba crash_reporter helper.

    Args:
        exc:      The exception to report.
        category: Failure domain; forwarded as the ``category`` keyword.
        context:  Extra key/values forwarded as keyword arguments. A
                  ``'category'`` key in here is dropped so the explicit
                  ``category`` argument wins.

    No-op when ``desktop.crash_reporter`` isn't importable (HARTOS
    server-only deploys, dev mode without Sentry SDK). Never raises.
    """
    try:
        from desktop.crash_reporter import capture_exception  # type: ignore
    except Exception:
        # Reporter not available in this deployment: expected, stay silent.
        return

    # Passing context['category'] alongside category= would raise
    # "got multiple values for keyword argument 'category'" and the event
    # would be lost; strip the duplicate instead.
    extras = {k: v for k, v in context.items() if k != 'category'}
    try:
        capture_exception(exc, category=category, **extras)
    except Exception as capture_err:
        # Never let observability break the actual failure path, but leave a
        # breadcrumb so a broken reporter can be diagnosed.
        logger.debug(
            f"[error_advice/{category}] sentry capture failed: "
            f"{type(capture_err).__name__}: {capture_err}"
        )

88 

89 

def _summarize_traceback(exc: BaseException) -> tuple[str, str, str]:
    """Condense ``exc.__traceback__`` for the self-heal goal payload.

    Returns:
        ``(sample_tb, source_file, source_function)``. ``sample_tb`` is the
        formatted text of the innermost 20 frames; the other two are the
        filename and function name of the deepest frame. All three are
        ``''`` when the exception carries no traceback.
    """
    tb_obj = getattr(exc, '__traceback__', None)
    if tb_obj is None:
        return '', '', ''
    # Walk the traceback once; format_list(extract_tb(tb)) is what
    # format_tb(tb) does internally.
    frames = traceback.extract_tb(tb_obj)
    if not frames:
        return '', '', ''
    sample_tb = ''.join(traceback.format_list(frames[-20:]))
    deepest = frames[-1]
    return sample_tb, deepest.filename or '', deepest.name or ''


def _try_agent_remediation(
    category: str, exc: BaseException, context: dict, severity: str,
) -> None:
    """Best-effort: create a ``self_heal`` AgentGoal describing ``exc`` so an
    agent can attempt remediation beyond whatever deterministic recovery the
    wrapped function already tried.

    Args:
        category: Failure domain (used in the goal title and config).
        exc:      The exception that escaped.
        context:  Caller-supplied structured context; stored on the goal
                  with each value stringified and cut to 200 chars.
        severity: Caller-supplied severity label; stored on the goal.

    This function does not throttle. ``handle_exception`` consults the
    (category, fingerprint) throttle before calling it, which is what keeps
    a looping failure from spawning one goal per occurrence.

    No-op when GoalManager / db_session can't be imported (Nunba dev mode
    without a HARTOS DB; HARTOS server before init_db). Never raises: any
    failure while building or persisting the goal is logged at WARNING.
    """
    try:
        from integrations.agent_engine.goal_manager import GoalManager  # type: ignore
        from integrations.social.models import db_session  # type: ignore
    except Exception:
        # Remediation stack not present in this deployment: silent no-op.
        return

    try:
        sample_tb, source_module, source_function = _summarize_traceback(exc)
        exc_type = type(exc).__name__

        # goal_type='self_heal' and the first five config keys are, per the
        # original author's note, what goal_manager's self-heal prompt
        # builder reads (not visible from this module). Renaming them would
        # leave the coding agent with blank prompt fields.
        with db_session() as db:
            GoalManager.create_goal(
                db,
                goal_type='self_heal',
                title=f"Self-heal: {category} ({exc_type})",
                description=(
                    f"A {severity}-severity {category} failure escaped the "
                    f"deterministic recovery loop. Investigate and remediate.\n\n"
                    f"Error: {type(exc).__name__}: {exc}\n\n"
                    f"Context: {context}\n\n"
                    f"Last 20 frames:\n{sample_tb}"
                ),
                config={
                    # Keys read by _build_self_heal_prompt — DO NOT rename
                    'exc_type': exc_type,
                    # NOTE: this is the deepest frame's *filename*, not a
                    # dotted module path.
                    'source_module': source_module,
                    'source_function': source_function,
                    # The upstream throttle dedupes repeats, so each goal
                    # represents a single occurrence.
                    'occurrence_count': 1,
                    'sample_traceback': sample_tb,
                    # Extra fields for downstream consumers / operator
                    # review; stored on the goal for debugging.
                    'category': category,
                    'severity': severity,
                    'error_message': str(exc)[:500],
                    'fingerprint': _fingerprint(exc),
                    'context': {k: str(v)[:200] for k, v in context.items()},
                },
                spark_budget=50,
                created_by='error_advice',
            )
    except Exception as e:
        # Never let the remediation path crash the failure path, but surface
        # the breakage at WARNING: a broken self-heal chain (db_session()
        # failing to open, GoalManager.create_goal raising on schema drift)
        # is exactly the operational signal we need. Import failures are NOT
        # covered here; they return silently above.
        logger.warning(
            f"[error_advice/{category}] agent remediation goal creation "
            f"failed: {type(e).__name__}: {e}"
        )

180 

181 

def _build_log_extra(category: str, severity: str, ctx: dict) -> dict:
    """Build a logging ``extra`` dict that cannot clash with LogRecord fields.

    ``Logger.makeRecord`` raises ``KeyError`` when an ``extra`` key matches
    an existing LogRecord attribute (``name``, ``module``, ``filename``,
    ``message``, ...). Context keys that would clash, or that would shadow
    the explicit ``category`` / ``severity``, are renamed to ``ctx_<key>``.
    """
    reserved = set(logging.makeLogRecord({}).__dict__) | {'message', 'asctime'}
    extra: dict = {'category': category, 'severity': severity}
    for key, value in ctx.items():
        if key in reserved or key in extra:
            key = f'ctx_{key}'
        extra[key] = value
    return extra


def handle_exception(
    exc: BaseException,
    *,
    category: str,
    severity: str = 'medium',
    agent_remediation: bool = False,
    context: Optional[dict] = None,
) -> None:
    """Central dispatch for a caught exception. Used by the
    @error_advice decorator and the with-block context manager; safe
    to call directly from any handler that already caught its own
    exception but wants the central fan-out (logging + Sentry +
    agent goal).

    Args:
        exc:               The caught exception.
        category:          Failure domain; part of the throttle key and
                           of every emitted record.
        severity:          'low' / 'medium' / 'high' / 'critical'. Attached
                           to the log record and forwarded to the agent
                           goal. It does not change dispatch behaviour here.
        agent_remediation: True ⇒ also request an agent remediation goal.
        context:           Optional structured context; copied, never
                           mutated.

    If the (category, error-fingerprint) pair was emitted within the
    throttle window, only a DEBUG line is written and nothing else happens.
    """
    ctx = dict(context or {})
    if not _should_emit(category, exc):
        # Throttled — log at debug only so we don't spam
        logger.debug(
            f"[error_advice/{category}] suppressed-by-throttle "
            f"{type(exc).__name__}: {exc}"
        )
        return

    logger.error(
        f"[error_advice/{category}] {type(exc).__name__}: {exc}",
        exc_info=exc,
        # Sanitized: a context key such as 'module' or 'name' would otherwise
        # make logging raise KeyError and replace the original exception.
        extra=_build_log_extra(category, severity, ctx),
    )
    _try_sentry_capture(exc, category, ctx)
    if agent_remediation:
        _try_agent_remediation(category, exc, ctx, severity)

216 

217 

def error_advice(
    category: str,
    *,
    severity: str = 'medium',
    agent_remediation: bool = False,
    reraise: bool = True,
    context_extractor: Optional[Callable[..., dict]] = None,
) -> Callable:
    """Decorator: wrap a function so any uncaught exception inside it
    fans out to the central error_advice dispatch.

    Args:
        category:           Short string identifying the failure
                            domain (e.g. 'tts.install', 'tts.synth',
                            'llm.onboard', 'social.feed'). Drives
                            throttling + agent-goal categorization.
        severity:           'low' | 'medium' | 'high' | 'critical'.
        agent_remediation:  True ⇒ create an AgentGoal for an autogen
                            agent to pick up beyond the deterministic
                            recovery the wrapped function tried.
        reraise:            True (default) ⇒ exception still bubbles
                            up after central dispatch (additive
                            observation). False ⇒ swallow + return
                            None (rare; only for true side-effect
                            functions where caller can't act on the
                            failure).
        context_extractor:  Optional `fn(*args, **kwargs) -> dict`
                            that pulls extra structured context
                            (e.g. backend name, user_id) out of the
                            wrapped call's args. A failing extractor
                            is logged at DEBUG and otherwise ignored.

    Only ``Exception`` subclasses are intercepted; ``KeyboardInterrupt`` /
    ``SystemExit`` pass straight through.

    NOTE: the wrapper calls ``fn`` synchronously and returns its result. For
    an ``async def`` that result is an un-awaited coroutine, so exceptions
    raised inside the coroutine body are not observed by this decorator.

    Usage:
        @error_advice('tts.install', agent_remediation=True)
        def install_backend_packages(backend, ...):
            ...

        # Captures backend, user_id from kwargs:
        @error_advice('tts.synth',
                      context_extractor=lambda *a, **kw:
                          {'backend': kw.get('backend'),
                           'user_id': kw.get('user_id')})
        def synthesize(...):
            ...
    """
    def _decorate(fn: Callable) -> Callable:
        # Not every callable has __qualname__ (functools.partial objects,
        # callable instances). Resolve a label up front so the except branch
        # below can't raise AttributeError and mask the real failure.
        fn_label = (
            getattr(fn, '__qualname__', None)
            or getattr(fn, '__name__', None)
            or repr(fn)
        )

        @functools.wraps(fn)
        def _wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except Exception as exc:  # noqa: BLE001 — the whole point
                ctx: dict[str, Any] = {'function': fn_label}
                if context_extractor is not None:
                    try:
                        ctx.update(context_extractor(*args, **kwargs))
                    except Exception as extractor_err:
                        # Context is a nice-to-have; dispatch proceeds with
                        # what we already have.
                        logger.debug(
                            f"[error_advice/{category}] context_extractor "
                            f"failed: {type(extractor_err).__name__}: "
                            f"{extractor_err}"
                        )
                handle_exception(
                    exc,
                    category=category,
                    severity=severity,
                    agent_remediation=agent_remediation,
                    context=ctx,
                )
                if reraise:
                    raise
                return None
        return _wrapper
    return _decorate

287 

288 

@contextmanager
def error_advice_block(
    category: str,
    *,
    severity: str = 'medium',
    agent_remediation: bool = False,
    reraise: bool = True,
    context: Optional[dict] = None,
) -> Iterator[None]:
    """Inline counterpart of the ``error_advice`` decorator.

    Wrap a ``with`` block instead of a whole function. An ``Exception``
    raised inside the block is handed to ``handle_exception`` with the
    given category / severity / agent_remediation / context, and then
    either propagates (``reraise=True``, the default) or is suppressed
    (``reraise=False``). Yields ``None``.
    """
    try:
        yield
    except Exception as failure:  # noqa: BLE001
        handle_exception(
            failure,
            category=category,
            severity=severity,
            agent_remediation=agent_remediation,
            context=context,
        )
        if not reraise:
            # Finishing the generator normally tells the context manager
            # machinery to suppress the exception.
            return
        raise