Coverage for integrations/agent_engine/agent_baseline_service.py: 75.5%

368 statements  

coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Agent Baseline Service - Unified Performance Snapshots 

3 

4Captures a composite snapshot of agent performance at creation time and 

5whenever recipe, prompt, or intelligence changes. Enables before/after 

6comparison for agent evolution tracking and CI/CD gating. 

7 

8All methods are fire-and-forget -- never raise into the main execution flow. 

9Snapshots stored at agent_data/baselines/{prompt_id}_{flow_id}/v{N}.json. 

10""" 

11import json 

12import logging 

13import os 

14import re 

15import subprocess 

16import sys 

17import threading 

18import time 

19from concurrent.futures import ThreadPoolExecutor 

20from pathlib import Path 

21from typing import Dict, List, Optional, Tuple 

22 

23logger = logging.getLogger('hevolve_social') 

24 

25try: 

26 from helper import PROMPTS_DIR 

27except ImportError: 

28 PROMPTS_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'prompts')) 

29 

30def _resolve_baseline_dir(): 

31 import sys as _sys 

32 db_path = os.environ.get('HEVOLVE_DB_PATH', '') 

33 if db_path and db_path != ':memory:' and os.path.isabs(db_path): 

34 return os.path.join(os.path.dirname(db_path), 'agent_data', 'baselines') 

35 if os.environ.get('NUNBA_BUNDLED') or getattr(_sys, 'frozen', False): 

36 try: 

37 from core.platform_paths import get_agent_data_dir 

38 return os.path.join(get_agent_data_dir(), 'baselines') 

39 except ImportError: 

40 return os.path.join(os.path.expanduser('~'), 'Documents', 'Nunba', 'data', 'agent_data', 'baselines') 

41 return os.path.join('agent_data', 'baselines') 
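
# Illustrative resolution sketch (the paths below are hypothetical examples,
# not guaranteed on any given install):
#
#     os.environ['HEVOLVE_DB_PATH'] = '/srv/hevolve/app.db'
#     _resolve_baseline_dir()   # -> '/srv/hevolve/agent_data/baselines'
#
#     del os.environ['HEVOLVE_DB_PATH']   # and no bundle markers set
#     _resolve_baseline_dir()   # -> 'agent_data/baselines' (cwd-relative)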

BASELINE_DIR = _resolve_baseline_dir()

# Deduplication: skip recipe_change if creation was <60s ago for same agent
_recent_snapshots: Dict[str, float] = {}  # key → timestamp
_recent_lock = threading.Lock()
_DEDUP_WINDOW_S = 60
_MAX_DEDUP_ENTRIES = 1000  # Cap to prevent memory leak

_SAFE_ID_RE = re.compile(r'^[a-zA-Z0-9_-]+$')


def _sanitize_id(value: str) -> str:
    """Sanitize prompt_id/flow_id to prevent path traversal."""
    s = str(value).strip()
    if not _SAFE_ID_RE.match(s):
        raise ValueError(f'Invalid identifier: {s!r}')
    return s
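
# Illustrative behavior (hypothetical identifiers):
#
#     _sanitize_id('sales_bot-2')    # -> 'sales_bot-2'
#     _sanitize_id('../etc/passwd')  # raises ValueError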


class AgentBaselineService:
    """Singleton service for unified agent performance snapshots."""

    def __init__(self):
        self._lock = threading.Lock()
        os.makedirs(BASELINE_DIR, exist_ok=True)

    # ── Core Snapshot ────────────────────────────────────────────

    @staticmethod
    def capture_snapshot(
        prompt_id: str,
        flow_id: int,
        trigger: str,
        user_id: str = '',
        user_prompt: str = '',
    ) -> Optional[Dict]:
        """Capture a unified baseline snapshot. Fire-and-forget.

        Args:
            prompt_id: Agent prompt identifier.
            flow_id: Flow index within the prompt.
            trigger: One of 'creation', 'recipe_change',
                'prompt_change', 'intelligence_change'.
            user_id: Owner user id (for trust/evolution lookup).
            user_prompt: Session key (for lightning lookup).
        Returns:
            The snapshot dict, or None on failure.
        """
        try:
            prompt_id = _sanitize_id(prompt_id)
            flow_id = int(flow_id)
            agent_key = f'{prompt_id}_{flow_id}'

            # Dedup: skip recipe_change if creation was recent
            with _recent_lock:
                # Evict stale entries to prevent unbounded memory growth
                if len(_recent_snapshots) > _MAX_DEDUP_ENTRIES:
                    cutoff = time.time() - _DEDUP_WINDOW_S
                    stale = [k for k, v in _recent_snapshots.items() if v < cutoff]
                    for k in stale:
                        del _recent_snapshots[k]

                if trigger == 'recipe_change':
                    last = _recent_snapshots.get(agent_key, 0)
                    if time.time() - last < _DEDUP_WINDOW_S:
                        logger.debug(
                            f'Baseline dedup: skipping recipe_change for '
                            f'{agent_key} (creation <{_DEDUP_WINDOW_S}s ago)')
                        return None
                if trigger == 'creation':
                    _recent_snapshots[agent_key] = time.time()

            version = AgentBaselineService._next_version(prompt_id, flow_id)

            snapshot: Dict = {
                'version': version,
                'prompt_id': prompt_id,
                'flow_id': flow_id,
                'trigger': trigger,
                'timestamp': time.time(),
                'metadata': AgentBaselineService._build_metadata(trigger),
                'recipe_metrics': AgentBaselineService._collect_recipe_metrics(
                    prompt_id, flow_id),
                'lightning_metrics': AgentBaselineService._collect_lightning_metrics(
                    prompt_id, user_prompt),
                'benchmark_metrics': AgentBaselineService._collect_benchmark_metrics(),
                'world_model_metrics': AgentBaselineService._collect_world_model_metrics(),
                'trust_evolution_metrics': AgentBaselineService._collect_trust_evolution_metrics(
                    user_id),
            }

            # Persist
            agent_dir = os.path.join(BASELINE_DIR, agent_key)
            os.makedirs(agent_dir, exist_ok=True)
            fpath = os.path.join(agent_dir, f'v{version}.json')
            tmp = fpath + '.tmp'
            with open(tmp, 'w') as f:
                json.dump(snapshot, f, indent=2)
            os.replace(tmp, fpath)
            logger.info(f'Baseline v{version} captured for {agent_key} '
                        f'(trigger={trigger})')
            return snapshot

        except Exception as e:
            logger.debug(f'Baseline capture failed: {e}')
            return None
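
    # A minimal usage sketch (the agent id and user id below are hypothetical):
    #
    #     snap = AgentBaselineService.capture_snapshot(
    #         'sales_bot', 0, trigger='creation', user_id='u-123')
    #     if snap:
    #         print(snap['version'], snap['trigger'])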

    # ── Metric Collectors ────────────────────────────────────────

    @staticmethod
    def _collect_recipe_metrics(prompt_id: str, flow_id: int) -> Dict:
        """Read recipe JSON and extract experience metrics."""
        try:
            recipe_path = os.path.join(PROMPTS_DIR, f'{prompt_id}_{flow_id}_recipe.json')
            if not os.path.exists(recipe_path):
                return {}
            with open(recipe_path, 'r') as f:
                recipe = json.load(f)

            actions = recipe.get('actions', [])
            meta = recipe.get('experience_meta', {})
            per_action: Dict = {}
            total_dur = 0.0

            for action in actions:
                aid = str(action.get('action_id', 0))
                exp = action.get('experience', {})
                avg_dur = exp.get('avg_duration_seconds', 0)
                total_dur += avg_dur
                per_action[aid] = {
                    'avg_duration_seconds': avg_dur,
                    'success_rate': exp.get('success_rate', 1.0),
                    'run_count': exp.get('run_count', 0),
                    'tool_stats': exp.get('tool_stats', {}),
                    'dead_ends_count': len(exp.get('dead_ends', [])),
                    'effective_fallbacks_count': len(
                        exp.get('effective_fallbacks', [])),
                }

            return {
                'action_count': len(actions),
                'total_expected_duration_seconds': round(total_dur, 2),
                'total_runs': meta.get('total_runs', 0),
                'bottleneck_action_id': meta.get('bottleneck_action_id'),
                'per_action': per_action,
            }
        except Exception as e:
            logger.debug(f'Recipe metric collection failed: {e}')
            return {}
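
    # Recipe shape this collector reads, sketched from the fields accessed
    # above (keys are confirmed by the code; values are illustrative):
    #
    #     {"actions": [{"action_id": 1,
    #                   "experience": {"avg_duration_seconds": 2.5,
    #                                  "success_rate": 0.9, "run_count": 12,
    #                                  "dead_ends": [], "effective_fallbacks": []}}],
    #      "experience_meta": {"total_runs": 12, "bottleneck_action_id": 1}}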

    @staticmethod
    def _collect_lightning_metrics(prompt_id: str, user_prompt: str) -> Dict:
        """Read Agent Lightning spans and compute aggregate metrics."""
        try:
            from integrations.agent_lightning import is_enabled, LightningStore
            if not is_enabled():
                return {}

            agent_id = (f'create_recipe_assistant_{user_prompt}'
                        if user_prompt else f'create_recipe_assistant_{prompt_id}')
            store = LightningStore(agent_id, backend='json')
            spans = store.list_spans(limit=100, status='success')
            if not spans:
                spans = store.list_spans(limit=100)
            if not spans:
                return {}

            rewards: List[float] = []
            error_count = 0
            durations: List[float] = []

            # Note: when the success-filtered query above returned spans,
            # every span here has status 'success', so error_rate comes out
            # 0 by construction; it is only informative on the unfiltered
            # fallback query.
            for span in spans:
                if span.get('status') == 'error':
                    error_count += 1
                dur = span.get('duration', 0)
                if dur:
                    durations.append(dur)
                for event in span.get('events', []):
                    if event.get('type') == 'reward':
                        r = event.get('data', {}).get('reward', 0)
                        rewards.append(r)

            execution_count = len(spans)
            avg_reward = sum(rewards) / len(rewards) if rewards else 0.0

            # Trend: compare first half vs second half
            trend = 'stable'
            if len(rewards) >= 10:
                mid = len(rewards) // 2
                first_half = sum(rewards[:mid]) / mid
                second_half = sum(rewards[mid:]) / (len(rewards) - mid)
                if second_half > first_half * 1.10:
                    trend = 'improving'
                elif second_half < first_half * 0.90:
                    trend = 'declining'

            return {
                'avg_reward': round(avg_reward, 4),
                'total_reward': round(sum(rewards), 4),
                'reward_trend': trend,
                'execution_count': execution_count,
                'error_rate': round(error_count / max(1, execution_count), 3),
                'avg_duration_ms': round(
                    (sum(durations) / len(durations) * 1000)
                    if durations else 0, 1),
            }
        except Exception as e:
            logger.debug(f'Lightning metric collection failed: {e}')
            return {}
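
    # Span shape this collector assumes (fields as read above; values are
    # illustrative):
    #
    #     {"status": "success", "duration": 0.42,
    #      "events": [{"type": "reward", "data": {"reward": 0.8}}]}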

    @staticmethod
    def _collect_benchmark_metrics() -> Dict:
        """Read latest benchmark results from registry."""
        try:
            from integrations.agent_engine.benchmark_registry import (
                get_benchmark_registry)
            registry = get_benchmark_registry()
            results = registry.get_latest_results()
            # Flatten to {adapter_name: {metric: value}}
            flat: Dict = {}
            for name, result in results.items():
                metrics = result.get('metrics', {})
                flat[name] = {
                    k: v.get('value', 0) if isinstance(v, dict) else v
                    for k, v in metrics.items()
                }
            return flat
        except Exception as e:
            logger.debug(f'Benchmark metric collection failed: {e}')
            return {}
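
    # Flattened output sketch ('regression'/'pass_rate' are read elsewhere in
    # this module; the 'latency' adapter and its metric are illustrative):
    #
    #     {'regression': {'pass_rate': 0.97}, 'latency': {'p50_ms': 120.0}}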

    @staticmethod
    def _collect_world_model_metrics() -> Dict:
        """Pull the latest provider stats from HevolveAI via the
        world_model_bridge. This is the closed-loop active-learning
        signal -- prediction error, learning step count, runtime --
        emitted by the EmbodiedLearner provider as it consumes the
        experience queue (record_interaction /
        record_embodied_interaction / submit_output_feedback).

        Field names verified against the sibling repo (do not invent):
        hevolveai/embodied_ai/inference/embodied_learner.py::EmbodiedLearner.get_stats
        returns this dict shape:

            frames_processed: int
            learning_steps: int           ← model maturity signal
            passive_steps: int
            active_steps: int
            static_frames_skipped: int
            source_switches: int
            total_actions: int
            random_actions: int
            model_driven_actions: int
            avg_prediction_error: float   ← AL acquisition signal
                                            (lower = more confident,
                                            higher = the system has more
                                            to learn from this domain)
            total_runtime_seconds: float
            mode, is_running, is_paused, active_source,
            last_learning_time, source_stats, self_state

        EpistemicMetadata.uncertainty / confidence (in
        epistemic_response.py) exist but are per-inference results;
        get_stats() doesn't carry them. If a per-call uncertainty is
        needed later, it has to come through a separate surface.

        Returns the raw stats dict under 'raw' plus two normalized,
        promoted keys (so consumers don't all need to know the
        HevolveAI schema):

            'al_signal'      -- avg_prediction_error clamped to [0, 1], or absent
            'learning_steps' -- int count, or absent

        Returns an empty dict on any failure (HARTOS-only deploy without
        HevolveAI, circuit breaker open, network down) -- never raises,
        never blocks the snapshot.
        """
        try:
            from integrations.agent_engine.world_model_bridge import (
                get_world_model_bridge)
            bridge = get_world_model_bridge()
            if bridge is None:
                return {}
            feedback = bridge.get_learning_feedback()
            if not feedback or not isinstance(feedback, dict):
                return {}

            normalized: Dict = {'raw': feedback}
            err = feedback.get('avg_prediction_error')
            if isinstance(err, (int, float)):
                normalized['al_signal'] = max(0.0, min(1.0, float(err)))
            steps = feedback.get('learning_steps')
            if isinstance(steps, int):
                normalized['learning_steps'] = steps
            return normalized
        except Exception as e:
            logger.debug(f'World-model metric collection failed: {e}')
            return {}
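
    # Normalized output sketch (numbers illustrative; 'raw' carries the full
    # get_stats() dict unchanged):
    #
    #     {'raw': {'learning_steps': 1800, 'avg_prediction_error': 0.23},
    #      'al_signal': 0.23, 'learning_steps': 1800}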

    @staticmethod
    def _collect_trust_evolution_metrics(user_id: str) -> Dict:
        """Query social DB for trust and evolution data."""
        if not user_id:
            return {}
        try:
            from integrations.social.models import get_db
            db = get_db()
            try:
                # Trust
                trust_data: Dict = {}
                try:
                    from integrations.social.rating_service import RatingService
                    ts = RatingService.get_trust_score(db, user_id)
                    if ts:
                        trust_data['composite_trust'] = ts.get(
                            'composite_trust', 0)
                except Exception:
                    pass

                # Evolution
                evo_data: Dict = {}
                try:
                    from integrations.social.agent_evolution_service import (
                        AgentEvolutionService)
                    evo = AgentEvolutionService.get_evolution(db, user_id)
                    if evo:
                        evo_data = {
                            'generation': evo.get('generation', 1),
                            'specialization_path': evo.get(
                                'specialization_path'),
                            'spec_tier': evo.get('spec_tier'),
                            'evolution_xp': evo.get('evolution_xp', 0),
                        }
                except Exception:
                    pass

                return {**trust_data, **evo_data}
            finally:
                db.close()
        except Exception as e:
            logger.debug(f'Trust/evolution metric collection failed: {e}')
            return {}

    @staticmethod
    def _build_metadata(trigger: str) -> Dict:
        """Compute snapshot metadata (trigger, git SHA, code hash)."""
        meta: Dict = {'trigger': trigger}
        try:
            _kw = dict(capture_output=True, text=True, timeout=5)
            if hasattr(subprocess, 'CREATE_NO_WINDOW'):
                _kw['creationflags'] = subprocess.CREATE_NO_WINDOW
            result = subprocess.run(
                ['git', 'rev-parse', '--short', 'HEAD'], **_kw)
            if result.returncode == 0:
                meta['git_sha'] = result.stdout.strip()
        except Exception:
            pass
        try:
            from security.node_integrity import compute_code_hash
            meta['code_hash'] = compute_code_hash()
        except Exception:
            pass
        return meta
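
    # Metadata sketch (the SHA and hash values are illustrative; keys are
    # omitted when git or node_integrity is unavailable):
    #
    #     {'trigger': 'creation', 'git_sha': 'a1b2c3d', 'code_hash': '9f86d081'}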

    # ── Version Management ───────────────────────────────────────

    @staticmethod
    def _next_version(prompt_id: str, flow_id: int) -> int:
        """Scan baselines dir for existing versions, return N+1."""
        prompt_id = _sanitize_id(prompt_id)
        agent_dir = os.path.join(BASELINE_DIR, f'{prompt_id}_{flow_id}')
        if not os.path.isdir(agent_dir):
            return 1
        existing = []
        for fname in os.listdir(agent_dir):
            m = re.match(r'^v(\d+)\.json$', fname)
            if m:
                existing.append(int(m.group(1)))
        return max(existing) + 1 if existing else 1

    @staticmethod
    def get_latest_snapshot(
        prompt_id: str, flow_id: int
    ) -> Optional[Dict]:
        """Load the most recent baseline snapshot."""
        prompt_id = _sanitize_id(prompt_id)
        agent_dir = os.path.join(BASELINE_DIR, f'{prompt_id}_{flow_id}')
        if not os.path.isdir(agent_dir):
            return None
        versions = []
        for fname in os.listdir(agent_dir):
            m = re.match(r'^v(\d+)\.json$', fname)
            if m:
                versions.append(int(m.group(1)))
        if not versions:
            return None
        latest = max(versions)
        return AgentBaselineService.get_snapshot(
            prompt_id, flow_id, latest)

442 

443 @staticmethod 

444 def get_snapshot( 

445 prompt_id: str, flow_id: int, version: int 

446 ) -> Optional[Dict]: 

447 """Load a specific version.""" 

448 prompt_id = _sanitize_id(prompt_id) 

449 fpath = os.path.join( 

450 BASELINE_DIR, f'{prompt_id}_{flow_id}', f'v{version}.json') 

451 if not os.path.isfile(fpath): 

452 return None 

453 try: 

454 with open(fpath, 'r') as f: 

455 return json.load(f) 

456 except Exception: 

457 return None 

    @staticmethod
    def list_snapshots(prompt_id: str, flow_id: int) -> List[Dict]:
        """List all snapshot versions (ascending) with summary metadata."""
        prompt_id = _sanitize_id(prompt_id)
        agent_dir = os.path.join(BASELINE_DIR, f'{prompt_id}_{flow_id}')
        if not os.path.isdir(agent_dir):
            return []
        results = []
        for fname in os.listdir(agent_dir):
            m = re.match(r'^v(\d+)\.json$', fname)
            if not m:
                continue
            try:
                with open(os.path.join(agent_dir, fname), 'r') as f:
                    snap = json.load(f)
                results.append({
                    'version': snap.get('version'),
                    'trigger': snap.get('trigger'),
                    'timestamp': snap.get('timestamp'),
                })
            except Exception:
                continue
        # Sort numerically by version: a lexicographic filename sort would
        # order v10 before v2 and break "first vs latest" consumers.
        results.sort(key=lambda s: s.get('version') or 0)
        return results

    # ── Comparison ───────────────────────────────────────────────

    @staticmethod
    def compare_snapshots(
        prompt_id: str, flow_id: int,
        old_version: int, new_version: int,
    ) -> Dict:
        """Compute deltas between two snapshots."""
        old = AgentBaselineService.get_snapshot(
            prompt_id, flow_id, old_version)
        new = AgentBaselineService.get_snapshot(
            prompt_id, flow_id, new_version)
        if not old or not new:
            return {'error': 'snapshot not found'}

        def _delta(o, n, key, direction='higher'):
            ov = o.get(key, 0) or 0
            nv = n.get(key, 0) or 0
            d = nv - ov
            improved = d > 0 if direction == 'higher' else d < 0
            return {'old': ov, 'new': nv, 'delta': round(d, 4),
                    'improved': improved}

        recipe_delta: Dict = {}
        old_recipe = old.get('recipe_metrics', {})
        new_recipe = new.get('recipe_metrics', {})
        recipe_delta['action_count'] = _delta(
            old_recipe, new_recipe, 'action_count', 'higher')
        recipe_delta['total_duration'] = _delta(
            old_recipe, new_recipe, 'total_expected_duration_seconds', 'lower')
        recipe_delta['total_runs'] = _delta(
            old_recipe, new_recipe, 'total_runs', 'higher')

        lightning_delta: Dict = {}
        old_light = old.get('lightning_metrics', {})
        new_light = new.get('lightning_metrics', {})
        lightning_delta['avg_reward'] = _delta(
            old_light, new_light, 'avg_reward', 'higher')
        lightning_delta['error_rate'] = _delta(
            old_light, new_light, 'error_rate', 'lower')
        lightning_delta['execution_count'] = _delta(
            old_light, new_light, 'execution_count', 'higher')

        trust_delta: Dict = {}
        old_trust = old.get('trust_evolution_metrics', {})
        new_trust = new.get('trust_evolution_metrics', {})
        trust_delta['composite_trust'] = _delta(
            old_trust, new_trust, 'composite_trust', 'higher')
        trust_delta['generation'] = _delta(
            old_trust, new_trust, 'generation', 'higher')

        return {
            'old_version': old_version,
            'new_version': new_version,
            'recipe_delta': recipe_delta,
            'lightning_delta': lightning_delta,
            'trust_delta': trust_delta,
        }
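
    # Delta output sketch (agent id, versions, and numbers illustrative):
    #
    #     AgentBaselineService.compare_snapshots('sales_bot', 0, 1, 2)
    #     # -> {'old_version': 1, 'new_version': 2,
    #     #     'recipe_delta': {'total_duration': {'old': 12.0, 'new': 9.5,
    #     #                                         'delta': -2.5,
    #     #                                         'improved': True}, ...},
    #     #     'lightning_delta': {...}, 'trust_delta': {...}}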

    @staticmethod
    def compute_trend(prompt_id: str, flow_id: int) -> Dict:
        """Analyze all snapshots to determine improving/declining/stable."""
        snapshots = AgentBaselineService.list_snapshots(prompt_id, flow_id)
        if len(snapshots) < 2:
            return {'trend': 'insufficient_data', 'snapshot_count': len(snapshots)}

        # Load first and latest (list_snapshots returns version-ascending order)
        first = AgentBaselineService.get_snapshot(
            prompt_id, flow_id, snapshots[0]['version'])
        latest = AgentBaselineService.get_snapshot(
            prompt_id, flow_id, snapshots[-1]['version'])
        if not first or not latest:
            return {'trend': 'error'}

        fr = first.get('lightning_metrics', {}).get('avg_reward', 0) or 0
        lr = latest.get('lightning_metrics', {}).get('avg_reward', 0) or 0

        fd = first.get('recipe_metrics', {}).get(
            'total_expected_duration_seconds', 0) or 0
        ld = latest.get('recipe_metrics', {}).get(
            'total_expected_duration_seconds', 0) or 0

        reward_trend = 'stable'
        if lr > fr * 1.10:
            reward_trend = 'improving'
        elif lr < fr * 0.90:
            reward_trend = 'declining'

        duration_trend = 'stable'
        if fd > 0:
            if ld < fd * 0.90:
                duration_trend = 'improving'
            elif ld > fd * 1.10:
                duration_trend = 'declining'

        return {
            'trend': reward_trend,
            'reward_trend': reward_trend,
            'duration_trend': duration_trend,
            'snapshot_count': len(snapshots),
        }

    @staticmethod
    def validate_against_baseline(
        prompt_id: str, flow_id: int
    ) -> Dict:
        """Compare current metrics vs latest baseline.

        Used by CI/CD (PRReviewService) to gate PR merges.
        Returns {passed: bool, regressions: [...]}.
        """
        latest = AgentBaselineService.get_latest_snapshot(prompt_id, flow_id)
        if not latest:
            return {'passed': True, 'regressions': [],
                    'reason': 'no baseline to compare'}

        regressions: List[str] = []

        # Collect current metrics
        current_recipe = AgentBaselineService._collect_recipe_metrics(
            prompt_id, flow_id)
        current_bench = AgentBaselineService._collect_benchmark_metrics()

        # Flag any action whose success rate dropped by more than 5%
        baseline_actions = latest.get('recipe_metrics', {}).get(
            'per_action', {})
        current_actions = current_recipe.get('per_action', {})
        for aid, ba in baseline_actions.items():
            ca = current_actions.get(aid, {})
            old_sr = ba.get('success_rate', 1.0)
            new_sr = ca.get('success_rate', 1.0)
            if old_sr > 0 and new_sr < old_sr * 0.95:
                regressions.append(
                    f'action_{aid}_success_rate: {old_sr:.3f} → {new_sr:.3f}')

        # Flag a benchmark regression pass-rate drop of more than 5%
        baseline_bench = latest.get('benchmark_metrics', {})
        old_reg = baseline_bench.get('regression', {})
        new_reg = current_bench.get('regression', {})
        if isinstance(old_reg, dict) and isinstance(new_reg, dict):
            old_pr = old_reg.get('pass_rate', 1.0)
            new_pr = new_reg.get('pass_rate', 1.0)
            if old_pr > 0 and new_pr < old_pr * 0.95:
                regressions.append(
                    f'regression_pass_rate: {old_pr:.3f} → {new_pr:.3f}')

        return {
            'passed': len(regressions) == 0,
            'regressions': regressions,
            'baseline_version': latest.get('version'),
        }
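
    # CI-gate usage sketch (hypothetical agent id):
    #
    #     verdict = AgentBaselineService.validate_against_baseline('sales_bot', 0)
    #     if not verdict['passed']:
    #         for r in verdict['regressions']:
    #             print('regression:', r)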


# ── AgentBaselineAdapter for BenchmarkRegistry ───────────────

try:
    from integrations.agent_engine.benchmark_registry import BenchmarkAdapter
except ImportError:
    BenchmarkAdapter = object  # degrade gracefully if benchmark_registry is unavailable


class AgentBaselineAdapter(BenchmarkAdapter):
    """Benchmark adapter that reads agent baseline snapshots.

    Reports reward trends, success rate deltas, and duration improvements.
    """

    name = 'agent_baselines'
    tier = 'fast'

    def run(self, api_url: str = '', **kwargs) -> Dict:
        metrics: Dict = {}
        baseline_dir = Path(BASELINE_DIR)
        if not baseline_dir.exists():
            return {'metrics': metrics}

        for agent_dir in baseline_dir.iterdir():
            if not agent_dir.is_dir():
                continue
            # Sort numerically by version; a plain lexicographic sort would
            # order v10.json before v2.json and pick the wrong "latest" pair.
            snapshots = sorted(
                (p for p in agent_dir.glob('v*.json')
                 if re.match(r'^v(\d+)\.json$', p.name)),
                key=lambda p: int(p.stem[1:]))
            if len(snapshots) < 2:
                continue
            try:
                old = json.loads(snapshots[-2].read_text())
                new = json.loads(snapshots[-1].read_text())
            except Exception:
                continue

            key = agent_dir.name

            # Reward delta
            old_r = old.get('lightning_metrics', {}).get('avg_reward', 0) or 0
            new_r = new.get('lightning_metrics', {}).get('avg_reward', 0) or 0
            metrics[f'{key}_reward_delta'] = {
                'value': round(new_r - old_r, 4),
                'direction': 'higher', 'unit': 'score'}

            # Success rate delta
            old_sr = _avg_success_rate(
                old.get('recipe_metrics', {}).get('per_action', {}))
            new_sr = _avg_success_rate(
                new.get('recipe_metrics', {}).get('per_action', {}))
            metrics[f'{key}_success_rate_delta'] = {
                'value': round(new_sr - old_sr, 4),
                'direction': 'higher', 'unit': 'ratio'}

            # Duration improvement %
            old_d = old.get('recipe_metrics', {}).get(
                'total_expected_duration_seconds', 0) or 0
            new_d = new.get('recipe_metrics', {}).get(
                'total_expected_duration_seconds', 0) or 0
            improvement = ((old_d - new_d) / old_d * 100) if old_d > 0 else 0
            metrics[f'{key}_duration_improvement_pct'] = {
                'value': round(improvement, 2),
                'direction': 'higher', 'unit': '%'}

        return {'metrics': metrics}
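
    # Direct-run sketch (metric keys depend on which agents have at least two
    # snapshots on disk; the key below is illustrative):
    #
    #     result = AgentBaselineAdapter().run()
    #     result['metrics'].get('sales_bot_0_reward_delta')
    #     # -> {'value': 0.12, 'direction': 'higher', 'unit': 'score'}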


def _avg_success_rate(per_action: Dict) -> float:
    """Compute average success rate across actions."""
    if not per_action:
        return 1.0
    rates = [
        v.get('success_rate', 1.0)
        for v in per_action.values()
        if isinstance(v, dict)
    ]
    return sum(rates) / len(rates) if rates else 1.0
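
# Illustrative behavior:
#
#     _avg_success_rate({'1': {'success_rate': 0.8}, '2': {'success_rate': 1.0}})
#     # -> 0.9
#     _avg_success_rate({})  # -> 1.0 (no data is treated as no regression)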


# ── Singleton ────────────────────────────────────────────────

_service: Optional[AgentBaselineService] = None
_service_lock = threading.Lock()


def get_baseline_service() -> AgentBaselineService:
    """Return the process-wide AgentBaselineService, creating it lazily."""
    global _service
    if _service is None:
        with _service_lock:
            if _service is None:
                _service = AgentBaselineService()
    return _service


# ── Fire-and-forget async helper ─────────────────────────────

_snapshot_executor: Optional[ThreadPoolExecutor] = None
_executor_lock = threading.Lock()


def capture_baseline_async(
    prompt_id: str,
    flow_id: int,
    trigger: str,
    user_id: str = '',
    user_prompt: str = '',
):
    """Submit snapshot capture to a background thread. Never blocks caller."""
    global _snapshot_executor
    if _snapshot_executor is None:
        # Double-checked locking: without the lock, two concurrent callers
        # could each create (and leak) an executor.
        with _executor_lock:
            if _snapshot_executor is None:
                _snapshot_executor = ThreadPoolExecutor(
                    max_workers=1, thread_name_prefix='baseline_snap')
    try:
        _snapshot_executor.submit(
            AgentBaselineService.capture_snapshot,
            prompt_id, flow_id, trigger, user_id, user_prompt)
    except Exception:
        pass  # fire-and-forget
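
# Typical call-site sketch (hypothetical ids), e.g. right after an agent's
# recipe is saved:
#
#     capture_baseline_async('sales_bot', 0, trigger='recipe_change',
#                            user_id='u-123')
#     # returns immediately; the snapshot lands under
#     # agent_data/baselines/sales_bot_0/ once the worker thread runs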