Coverage for integrations / agent_engine / agent_baseline_service.py: 75.5%
368 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Agent Baseline Service - Unified Performance Snapshots
4Captures a composite snapshot of agent performance at creation time and
5whenever recipe, prompt, or intelligence changes. Enables before/after
6comparison for agent evolution tracking and CI/CD gating.
8All methods are fire-and-forget -- never raise into the main execution flow.
9Snapshots stored at agent_data/baselines/{prompt_id}_{flow_id}/v{N}.json.
10"""
11import json
12import logging
13import os
14import re
15import subprocess
16import sys
17import threading
18import time
19from concurrent.futures import ThreadPoolExecutor
20from pathlib import Path
21from typing import Dict, List, Optional, Tuple
logger = logging.getLogger('hevolve_social')

try:
    from helper import PROMPTS_DIR
except ImportError:
    # Standalone fallback: resolve the prompts dir relative to this file
    # (two levels up, i.e. alongside the integrations/ package root).
    PROMPTS_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'prompts'))
30def _resolve_baseline_dir():
31 import sys as _sys
32 db_path = os.environ.get('HEVOLVE_DB_PATH', '')
33 if db_path and db_path != ':memory:' and os.path.isabs(db_path):
34 return os.path.join(os.path.dirname(db_path), 'agent_data', 'baselines')
35 if os.environ.get('NUNBA_BUNDLED') or getattr(_sys, 'frozen', False):
36 try:
37 from core.platform_paths import get_agent_data_dir
38 return os.path.join(get_agent_data_dir(), 'baselines')
39 except ImportError:
40 return os.path.join(os.path.expanduser('~'), 'Documents', 'Nunba', 'data', 'agent_data', 'baselines')
41 return os.path.join('agent_data', 'baselines')
BASELINE_DIR = _resolve_baseline_dir()

# Deduplication: skip recipe_change if creation was <60s ago for same agent
_recent_snapshots: Dict[str, float] = {}  # agent_key '{prompt_id}_{flow_id}' → creation timestamp
_recent_lock = threading.Lock()  # guards all access to _recent_snapshots
_DEDUP_WINDOW_S = 60  # seconds a 'creation' suppresses a follow-up 'recipe_change'
_MAX_DEDUP_ENTRIES = 1000  # Cap to prevent memory leak
51_SAFE_ID_RE = re.compile(r'^[a-zA-Z0-9_-]+$')
54def _sanitize_id(value: str) -> str:
55 """Sanitize prompt_id/flow_id to prevent path traversal."""
56 s = str(value).strip()
57 if not _SAFE_ID_RE.match(s):
58 raise ValueError(f'Invalid identifier: {s!r}')
59 return s
class AgentBaselineService:
    """Singleton service for unified agent performance snapshots.

    All public methods are static and fire-and-forget: they log failures
    at debug level and return empty/None rather than raising into the
    caller's execution flow.
    """

    def __init__(self):
        # Lock reserved for instance-level coordination; the directory is
        # created eagerly so later capture paths never race on first write.
        self._lock = threading.Lock()
        os.makedirs(BASELINE_DIR, exist_ok=True)

    # ── Core Snapshot ────────────────────────────────────────────

    @staticmethod
    def capture_snapshot(
        prompt_id: str,
        flow_id: int,
        trigger: str,
        user_id: str = '',
        user_prompt: str = '',
    ) -> Optional[Dict]:
        """Capture a unified baseline snapshot. Fire-and-forget.

        Args:
            prompt_id: Agent prompt identifier.
            flow_id: Flow index within the prompt.
            trigger: One of 'creation', 'recipe_change',
                'prompt_change', 'intelligence_change'.
            user_id: Owner user id (for trust/evolution lookup).
            user_prompt: Session key (for lightning lookup).
        Returns:
            The snapshot dict, or None on failure (including dedup skips).
        """
        try:
            prompt_id = _sanitize_id(prompt_id)
            flow_id = int(flow_id)
            agent_key = f'{prompt_id}_{flow_id}'

            # Dedup: skip recipe_change if creation was recent
            with _recent_lock:
                # Evict stale entries to prevent unbounded memory growth
                if len(_recent_snapshots) > _MAX_DEDUP_ENTRIES:
                    cutoff = time.time() - _DEDUP_WINDOW_S
                    stale = [k for k, v in _recent_snapshots.items() if v < cutoff]
                    for k in stale:
                        del _recent_snapshots[k]

                if trigger == 'recipe_change':
                    last = _recent_snapshots.get(agent_key, 0)
                    if time.time() - last < _DEDUP_WINDOW_S:
                        logger.debug(
                            f'Baseline dedup: skipping recipe_change for '
                            f'{agent_key} (creation <{_DEDUP_WINDOW_S}s ago)')
                        return None
                if trigger == 'creation':
                    _recent_snapshots[agent_key] = time.time()

            version = AgentBaselineService._next_version(prompt_id, flow_id)

            # Each collector is independently best-effort and returns {}
            # on any failure, so a broken subsystem never voids a snapshot.
            snapshot: Dict = {
                'version': version,
                'prompt_id': prompt_id,
                'flow_id': flow_id,
                'trigger': trigger,
                'timestamp': time.time(),
                'metadata': AgentBaselineService._build_metadata(trigger),
                'recipe_metrics': AgentBaselineService._collect_recipe_metrics(
                    prompt_id, flow_id),
                'lightning_metrics': AgentBaselineService._collect_lightning_metrics(
                    prompt_id, user_prompt),
                'benchmark_metrics': AgentBaselineService._collect_benchmark_metrics(),
                'world_model_metrics': AgentBaselineService._collect_world_model_metrics(),
                'trust_evolution_metrics': AgentBaselineService._collect_trust_evolution_metrics(
                    user_id),
            }

            # Persist atomically: write a temp file then os.replace, so a
            # concurrent reader never sees a partially-written snapshot.
            agent_dir = os.path.join(BASELINE_DIR, agent_key)
            os.makedirs(agent_dir, exist_ok=True)
            fpath = os.path.join(agent_dir, f'v{version}.json')
            tmp = fpath + '.tmp'
            with open(tmp, 'w') as f:
                json.dump(snapshot, f, indent=2)
            os.replace(tmp, fpath)
            logger.info(f'Baseline v{version} captured for {agent_key} '
                        f'(trigger={trigger})')
            return snapshot

        except Exception as e:
            logger.debug(f'Baseline capture failed: {e}')
            return None

    # ── Metric Collectors ────────────────────────────────────────

    @staticmethod
    def _collect_recipe_metrics(prompt_id: str, flow_id: int) -> Dict:
        """Read recipe JSON and extract experience metrics.

        Returns {} when the recipe file is missing or unreadable.
        """
        try:
            recipe_path = os.path.join(PROMPTS_DIR, f'{prompt_id}_{flow_id}_recipe.json')
            if not os.path.exists(recipe_path):
                return {}
            with open(recipe_path, 'r') as f:
                recipe = json.load(f)

            actions = recipe.get('actions', [])
            meta = recipe.get('experience_meta', {})
            per_action: Dict = {}
            total_dur = 0.0

            for action in actions:
                aid = str(action.get('action_id', 0))
                exp = action.get('experience', {})
                avg_dur = exp.get('avg_duration_seconds', 0)
                total_dur += avg_dur
                per_action[aid] = {
                    'avg_duration_seconds': avg_dur,
                    'success_rate': exp.get('success_rate', 1.0),
                    'run_count': exp.get('run_count', 0),
                    'tool_stats': exp.get('tool_stats', {}),
                    'dead_ends_count': len(exp.get('dead_ends', [])),
                    'effective_fallbacks_count': len(
                        exp.get('effective_fallbacks', [])),
                }

            return {
                'action_count': len(actions),
                'total_expected_duration_seconds': round(total_dur, 2),
                'total_runs': meta.get('total_runs', 0),
                'bottleneck_action_id': meta.get('bottleneck_action_id'),
                'per_action': per_action,
            }
        except Exception as e:
            logger.debug(f'Recipe metric collection failed: {e}')
            return {}

    @staticmethod
    def _collect_lightning_metrics(prompt_id: str, user_prompt: str) -> Dict:
        """Read Agent Lightning spans and compute aggregate metrics.

        Prefers success spans; falls back to all spans when none exist.
        Returns {} when Lightning is disabled or no spans are found.
        """
        try:
            from integrations.agent_lightning import is_enabled, LightningStore
            if not is_enabled():
                return {}

            agent_id = f'create_recipe_assistant_{user_prompt}' \
                if user_prompt else f'create_recipe_assistant_{prompt_id}'
            store = LightningStore(agent_id, backend='json')
            spans = store.list_spans(limit=100, status='success')
            if not spans:
                spans = store.list_spans(limit=100)
            if not spans:
                return {}

            rewards: List[float] = []
            error_count = 0
            durations: List[float] = []

            for span in spans:
                if span.get('status') == 'error':
                    error_count += 1
                dur = span.get('duration', 0)
                if dur:
                    durations.append(dur)
                for event in span.get('events', []):
                    if event.get('type') == 'reward':
                        r = event.get('data', {}).get('reward', 0)
                        rewards.append(r)

            execution_count = len(spans)
            avg_reward = sum(rewards) / len(rewards) if rewards else 0.0

            # Trend: compare first half vs second half (>=10 rewards needed
            # for a meaningful split; ±10% band counts as 'stable').
            trend = 'stable'
            if len(rewards) >= 10:
                mid = len(rewards) // 2
                first_half = sum(rewards[:mid]) / mid
                second_half = sum(rewards[mid:]) / (len(rewards) - mid)
                if second_half > first_half * 1.10:
                    trend = 'improving'
                elif second_half < first_half * 0.90:
                    trend = 'declining'

            return {
                'avg_reward': round(avg_reward, 4),
                'total_reward': round(sum(rewards), 4),
                'reward_trend': trend,
                'execution_count': execution_count,
                'error_rate': round(error_count / max(1, execution_count), 3),
                'avg_duration_ms': round(
                    (sum(durations) / len(durations) * 1000)
                    if durations else 0, 1),
            }
        except Exception as e:
            logger.debug(f'Lightning metric collection failed: {e}')
            return {}

    @staticmethod
    def _collect_benchmark_metrics() -> Dict:
        """Read latest benchmark results from registry.

        Returns {adapter_name: {metric: value}}, unwrapping any
        {'value': ...} metric dicts to bare values; {} on failure.
        """
        try:
            from integrations.agent_engine.benchmark_registry import (
                get_benchmark_registry)
            registry = get_benchmark_registry()
            results = registry.get_latest_results()
            # Flatten to {adapter_name: {metric: value}}
            flat: Dict = {}
            for name, result in results.items():
                metrics = result.get('metrics', {})
                flat[name] = {
                    k: v.get('value', 0) if isinstance(v, dict) else v
                    for k, v in metrics.items()
                }
            return flat
        except Exception as e:
            logger.debug(f'Benchmark metric collection failed: {e}')
            return {}

    @staticmethod
    def _collect_world_model_metrics() -> Dict:
        """Pull the latest provider stats from HevolveAI via the
        world_model_bridge. This is the closed-loop active-learning
        signal — prediction error, learning step count, runtime —
        emitted by the EmbodiedLearner provider as it consumes the
        experience queue (record_interaction /
        record_embodied_interaction / submit_output_feedback).

        Field names verified against the sibling repo (do not invent):
        hevolveai/embodied_ai/inference/embodied_learner.py::EmbodiedLearner.get_stats
        returns this dict shape:

            frames_processed: int
            learning_steps: int ← model maturity signal
            passive_steps: int
            active_steps: int
            static_frames_skipped: int
            source_switches: int
            total_actions: int
            random_actions: int
            model_driven_actions: int
            avg_prediction_error: float ← AL acquisition signal
                (lower = more confident,
                higher = system has more
                to learn from this domain)
            total_runtime_seconds: float
            mode, is_running, is_paused, active_source,
            last_learning_time, source_stats, self_state

        EpistemicMetadata.uncertainty / confidence (in
        epistemic_response.py) exist but are per-INFERENCE results;
        get_stats() doesn't carry them. If a per-call uncertainty is
        needed later, it has to come through a separate surface.

        Returns the raw stats dict in 'raw' plus two normalized
        promoted keys (so consumers don't all need to know the
        HevolveAI schema):

            'al_signal' — avg_prediction_error in [0, 1], or None
            'learning_steps' — int count, or None

        Empty dict on any failure (HARTOS-only deploy without
        HevolveAI, circuit breaker open, network down) — never
        raises, never blocks the snapshot.
        """
        try:
            from integrations.agent_engine.world_model_bridge import (
                get_world_model_bridge)
            bridge = get_world_model_bridge()
            if bridge is None:
                return {}
            feedback = bridge.get_learning_feedback()
            if not feedback or not isinstance(feedback, dict):
                return {}

            normalized: Dict = {'raw': feedback}
            err = feedback.get('avg_prediction_error')
            if isinstance(err, (int, float)):
                # Clamp into [0, 1] so downstream consumers can treat it
                # as a normalized acquisition score.
                normalized['al_signal'] = max(0.0, min(1.0, float(err)))
            steps = feedback.get('learning_steps')
            if isinstance(steps, int):
                normalized['learning_steps'] = steps
            return normalized
        except Exception as e:
            logger.debug(f'World-model metric collection failed: {e}')
            return {}

    @staticmethod
    def _collect_trust_evolution_metrics(user_id: str) -> Dict:
        """Query social DB for trust and evolution data.

        Each sub-lookup (trust, evolution) fails independently so one
        broken service doesn't lose the other's data. Returns {} when
        user_id is empty or the DB is unavailable.
        """
        if not user_id:
            return {}
        try:
            from integrations.social.models import get_db
            db = get_db()
            try:
                # Trust
                trust_data: Dict = {}
                try:
                    from integrations.social.rating_service import RatingService
                    ts = RatingService.get_trust_score(db, user_id)
                    if ts:
                        trust_data['composite_trust'] = ts.get(
                            'composite_trust', 0)
                except Exception:
                    pass

                # Evolution
                evo_data: Dict = {}
                try:
                    from integrations.social.agent_evolution_service import (
                        AgentEvolutionService)
                    evo = AgentEvolutionService.get_evolution(db, user_id)
                    if evo:
                        evo_data = {
                            'generation': evo.get('generation', 1),
                            'specialization_path': evo.get(
                                'specialization_path'),
                            'spec_tier': evo.get('spec_tier'),
                            'evolution_xp': evo.get('evolution_xp', 0),
                        }
                except Exception:
                    pass

                return {**trust_data, **evo_data}
            finally:
                db.close()
        except Exception as e:
            logger.debug(f'Trust/evolution metric collection failed: {e}')
            return {}

    @staticmethod
    def _build_metadata(trigger: str) -> Dict:
        """Compute snapshot metadata (git SHA, code hash); best-effort."""
        meta: Dict = {'trigger': trigger}
        try:
            _kw = dict(capture_output=True, text=True, timeout=5)
            # On Windows, suppress the console window a subprocess would spawn.
            if hasattr(subprocess, 'CREATE_NO_WINDOW'):
                _kw['creationflags'] = subprocess.CREATE_NO_WINDOW
            result = subprocess.run(
                ['git', 'rev-parse', '--short', 'HEAD'], **_kw)
            if result.returncode == 0:
                meta['git_sha'] = result.stdout.strip()
        except Exception:
            pass
        try:
            from security.node_integrity import compute_code_hash
            meta['code_hash'] = compute_code_hash()
        except Exception:
            pass
        return meta

    # ── Version Management ───────────────────────────────────────

    @staticmethod
    def _next_version(prompt_id: str, flow_id: int) -> int:
        """Scan baselines dir for existing versions, return N+1."""
        prompt_id = _sanitize_id(prompt_id)
        agent_dir = os.path.join(BASELINE_DIR, f'{prompt_id}_{flow_id}')
        if not os.path.isdir(agent_dir):
            return 1
        existing = []
        for fname in os.listdir(agent_dir):
            m = re.match(r'^v(\d+)\.json$', fname)
            if m:
                existing.append(int(m.group(1)))
        return max(existing) + 1 if existing else 1

    @staticmethod
    def get_latest_snapshot(
        prompt_id: str, flow_id: int
    ) -> Optional[Dict]:
        """Load the most recent baseline snapshot, or None if absent."""
        prompt_id = _sanitize_id(prompt_id)
        agent_dir = os.path.join(BASELINE_DIR, f'{prompt_id}_{flow_id}')
        if not os.path.isdir(agent_dir):
            return None
        versions = []
        for fname in os.listdir(agent_dir):
            m = re.match(r'^v(\d+)\.json$', fname)
            if m:
                versions.append(int(m.group(1)))
        if not versions:
            return None
        latest = max(versions)
        return AgentBaselineService.get_snapshot(
            prompt_id, flow_id, latest)

    @staticmethod
    def get_snapshot(
        prompt_id: str, flow_id: int, version: int
    ) -> Optional[Dict]:
        """Load a specific version; None when missing or unreadable."""
        prompt_id = _sanitize_id(prompt_id)
        fpath = os.path.join(
            BASELINE_DIR, f'{prompt_id}_{flow_id}', f'v{version}.json')
        if not os.path.isfile(fpath):
            return None
        try:
            with open(fpath, 'r') as f:
                return json.load(f)
        except Exception:
            return None

    @staticmethod
    def list_snapshots(prompt_id: str, flow_id: int) -> List[Dict]:
        """List all snapshot versions with summary metadata.

        Entries are returned in ascending *numeric* version order.
        BUGFIX: the previous lexical filename sort ordered v10.json
        before v2.json (and v9 after v10), so compute_trend's
        first/latest selection went wrong once an agent accumulated
        10+ snapshots.
        """
        prompt_id = _sanitize_id(prompt_id)
        agent_dir = os.path.join(BASELINE_DIR, f'{prompt_id}_{flow_id}')
        if not os.path.isdir(agent_dir):
            return []
        versioned: List[Tuple[int, str]] = []
        for fname in os.listdir(agent_dir):
            m = re.match(r'^v(\d+)\.json$', fname)
            if m:
                versioned.append((int(m.group(1)), fname))
        results = []
        for _version, fname in sorted(versioned):
            try:
                with open(os.path.join(agent_dir, fname), 'r') as f:
                    snap = json.load(f)
                results.append({
                    'version': snap.get('version'),
                    'trigger': snap.get('trigger'),
                    'timestamp': snap.get('timestamp'),
                })
            except Exception:
                continue
        return results

    # ── Comparison ───────────────────────────────────────────────

    @staticmethod
    def compare_snapshots(
        prompt_id: str, flow_id: int,
        old_version: int, new_version: int,
    ) -> Dict:
        """Compute deltas between two snapshots.

        Returns {'error': ...} if either snapshot cannot be loaded;
        otherwise per-domain delta dicts with 'improved' flags whose
        direction depends on the metric ('higher' or 'lower' is better).
        """
        old = AgentBaselineService.get_snapshot(
            prompt_id, flow_id, old_version)
        new = AgentBaselineService.get_snapshot(
            prompt_id, flow_id, new_version)
        if not old or not new:
            return {'error': 'snapshot not found'}

        def _delta(o, n, key, direction='higher'):
            # `or 0` coerces explicit None values to numeric zero.
            ov = o.get(key, 0) or 0
            nv = n.get(key, 0) or 0
            d = nv - ov
            improved = d > 0 if direction == 'higher' else d < 0
            return {'old': ov, 'new': nv, 'delta': round(d, 4),
                    'improved': improved}

        recipe_delta: Dict = {}
        or_ = old.get('recipe_metrics', {})
        nr = new.get('recipe_metrics', {})
        recipe_delta['action_count'] = _delta(or_, nr, 'action_count', 'higher')
        recipe_delta['total_duration'] = _delta(
            or_, nr, 'total_expected_duration_seconds', 'lower')
        recipe_delta['total_runs'] = _delta(or_, nr, 'total_runs', 'higher')

        lightning_delta: Dict = {}
        ol = old.get('lightning_metrics', {})
        nl = new.get('lightning_metrics', {})
        lightning_delta['avg_reward'] = _delta(ol, nl, 'avg_reward', 'higher')
        lightning_delta['error_rate'] = _delta(ol, nl, 'error_rate', 'lower')
        lightning_delta['execution_count'] = _delta(
            ol, nl, 'execution_count', 'higher')

        trust_delta: Dict = {}
        ot = old.get('trust_evolution_metrics', {})
        nt = new.get('trust_evolution_metrics', {})
        trust_delta['composite_trust'] = _delta(
            ot, nt, 'composite_trust', 'higher')
        trust_delta['generation'] = _delta(ot, nt, 'generation', 'higher')

        return {
            'old_version': old_version,
            'new_version': new_version,
            'recipe_delta': recipe_delta,
            'lightning_delta': lightning_delta,
            'trust_delta': trust_delta,
        }

    @staticmethod
    def compute_trend(prompt_id: str, flow_id: int) -> Dict:
        """Analyze all snapshots to determine improving/declining/stable.

        Compares the first snapshot vs the latest (list_snapshots returns
        numeric version order): reward ±10% band, duration ±10% band
        (only when a baseline duration exists).
        """
        snapshots = AgentBaselineService.list_snapshots(prompt_id, flow_id)
        if len(snapshots) < 2:
            return {'trend': 'insufficient_data', 'snapshot_count': len(snapshots)}

        # Load first and latest
        first = AgentBaselineService.get_snapshot(
            prompt_id, flow_id, snapshots[0]['version'])
        latest = AgentBaselineService.get_snapshot(
            prompt_id, flow_id, snapshots[-1]['version'])
        if not first or not latest:
            return {'trend': 'error'}

        fr = first.get('lightning_metrics', {}).get('avg_reward', 0) or 0
        lr = latest.get('lightning_metrics', {}).get('avg_reward', 0) or 0

        fd = first.get('recipe_metrics', {}).get(
            'total_expected_duration_seconds', 0) or 0
        ld = latest.get('recipe_metrics', {}).get(
            'total_expected_duration_seconds', 0) or 0

        reward_trend = 'stable'
        if lr > fr * 1.10:
            reward_trend = 'improving'
        elif lr < fr * 0.90:
            reward_trend = 'declining'

        duration_trend = 'stable'
        if fd > 0:
            if ld < fd * 0.90:
                duration_trend = 'improving'
            elif ld > fd * 1.10:
                duration_trend = 'declining'

        return {
            'trend': reward_trend,
            'reward_trend': reward_trend,
            'duration_trend': duration_trend,
            'snapshot_count': len(snapshots),
        }

    @staticmethod
    def validate_against_baseline(
        prompt_id: str, flow_id: int
    ) -> Dict:
        """Compare current metrics vs latest baseline.

        Used by CI/CD (PRReviewService) to gate PR merges. A metric
        regresses when it drops more than 5% below its baseline value.
        Returns {passed: bool, regressions: []}.
        """
        latest = AgentBaselineService.get_latest_snapshot(prompt_id, flow_id)
        if not latest:
            return {'passed': True, 'regressions': [],
                    'reason': 'no baseline to compare'}

        regressions: List[str] = []

        # Collect current metrics
        current_recipe = AgentBaselineService._collect_recipe_metrics(
            prompt_id, flow_id)
        current_bench = AgentBaselineService._collect_benchmark_metrics()

        # Check recipe success rates
        baseline_actions = latest.get('recipe_metrics', {}).get(
            'per_action', {})
        current_actions = current_recipe.get('per_action', {})
        for aid, ba in baseline_actions.items():
            ca = current_actions.get(aid, {})
            old_sr = ba.get('success_rate', 1.0)
            new_sr = ca.get('success_rate', 1.0)
            if old_sr > 0 and new_sr < old_sr * 0.95:
                regressions.append(
                    f'action_{aid}_success_rate: {old_sr:.3f} → {new_sr:.3f}')

        # Check benchmark regression pass rate
        baseline_bench = latest.get('benchmark_metrics', {})
        old_reg = baseline_bench.get('regression', {})
        new_reg = current_bench.get('regression', {})
        if isinstance(old_reg, dict) and isinstance(new_reg, dict):
            old_pr = old_reg.get('pass_rate', 1.0)
            new_pr = new_reg.get('pass_rate', 1.0)
            if old_pr > 0 and new_pr < old_pr * 0.95:
                regressions.append(
                    f'regression_pass_rate: {old_pr:.3f} → {new_pr:.3f}')

        return {
            'passed': len(regressions) == 0,
            'regressions': regressions,
            'baseline_version': latest.get('version'),
        }
631# ── AgentBaselineAdapter for BenchmarkRegistry ───────────────
try:
    from integrations.agent_engine.benchmark_registry import BenchmarkAdapter
except ImportError:
    # Degrade to a plain base class so this module still imports when the
    # benchmark registry is absent; the adapter then simply isn't registered.
    BenchmarkAdapter = object  # graceful if benchmark_registry not available
class AgentBaselineAdapter(BenchmarkAdapter):
    """Benchmark adapter that reads agent baseline snapshots.

    Reports, per agent directory, the deltas between the two most recent
    snapshot versions: reward delta, average success-rate delta, and
    percentage improvement in expected total duration.
    """

    name = 'agent_baselines'
    tier = 'fast'

    def run(self, api_url: str = '', **kwargs) -> Dict:
        """Scan BASELINE_DIR and emit {metric_name: {value, direction, unit}}.

        Args:
            api_url: Unused; kept for adapter-interface compatibility.
        Returns:
            {'metrics': {...}} — empty when the baseline dir is missing
            or no agent has at least two readable snapshots.
        """
        metrics: Dict = {}
        baseline_dir = Path(BASELINE_DIR)
        if not baseline_dir.exists():
            return {'metrics': metrics}

        for agent_dir in baseline_dir.iterdir():
            if not agent_dir.is_dir():
                continue
            # BUGFIX: sort by numeric version, not filename. A lexical
            # sort places v10.json before v2.json (and v9 after v10), so
            # [-2]/[-1] could pick the wrong pair once an agent reached
            # 10+ snapshots.
            versioned = []
            for path in agent_dir.glob('v*.json'):
                m = re.match(r'^v(\d+)\.json$', path.name)
                if m:
                    versioned.append((int(m.group(1)), path))
            versioned.sort()
            if len(versioned) < 2:
                continue
            try:
                old = json.loads(versioned[-2][1].read_text())
                new = json.loads(versioned[-1][1].read_text())
            except Exception:
                continue

            key = agent_dir.name

            # Reward delta
            old_r = old.get('lightning_metrics', {}).get('avg_reward', 0) or 0
            new_r = new.get('lightning_metrics', {}).get('avg_reward', 0) or 0
            metrics[f'{key}_reward_delta'] = {
                'value': round(new_r - old_r, 4),
                'direction': 'higher', 'unit': 'score'}

            # Success rate delta
            old_sr = _avg_success_rate(
                old.get('recipe_metrics', {}).get('per_action', {}))
            new_sr = _avg_success_rate(
                new.get('recipe_metrics', {}).get('per_action', {}))
            metrics[f'{key}_success_rate_delta'] = {
                'value': round(new_sr - old_sr, 4),
                'direction': 'higher', 'unit': 'ratio'}

            # Duration improvement %
            old_d = old.get('recipe_metrics', {}).get(
                'total_expected_duration_seconds', 0) or 0
            new_d = new.get('recipe_metrics', {}).get(
                'total_expected_duration_seconds', 0) or 0
            improvement = ((old_d - new_d) / old_d * 100) if old_d > 0 else 0
            metrics[f'{key}_duration_improvement_pct'] = {
                'value': round(improvement, 2),
                'direction': 'higher', 'unit': '%'}

        return {'metrics': metrics}
695def _avg_success_rate(per_action: Dict) -> float:
696 """Compute average success rate across actions."""
697 if not per_action:
698 return 1.0
699 rates = [
700 v.get('success_rate', 1.0)
701 for v in per_action.values()
702 if isinstance(v, dict)
703 ]
704 return sum(rates) / len(rates) if rates else 1.0
707# ── Singleton ────────────────────────────────────────────────
_service: Optional[AgentBaselineService] = None  # lazily-built singleton
_service_lock = threading.Lock()  # guards singleton construction


def get_baseline_service() -> AgentBaselineService:
    """Return the process-wide AgentBaselineService, creating it lazily.

    Double-checked locking: the unlocked fast path skips lock overhead
    once the singleton exists; the locked re-check prevents two racing
    threads from each constructing an instance.
    """
    global _service
    if _service is not None:
        return _service
    with _service_lock:
        if _service is None:
            _service = AgentBaselineService()
        return _service
722# ── Fire-and-forget async helper ─────────────────────────────
_snapshot_executor: Optional[ThreadPoolExecutor] = None  # lazy, single worker
_snapshot_executor_lock = threading.Lock()  # guards executor creation


def capture_baseline_async(
    prompt_id: str,
    flow_id: int,
    trigger: str,
    user_id: str = '',
    user_prompt: str = '',
):
    """Submit snapshot capture to a background thread. Never blocks caller.

    Args mirror AgentBaselineService.capture_snapshot. Any failure —
    including executor creation — is swallowed: baselines are strictly
    best-effort and must never disturb the main execution flow.
    """
    global _snapshot_executor
    try:
        # BUGFIX: the previous unsynchronized lazy init could let two
        # racing callers each build an executor, leaking a worker thread.
        # Double-checked creation keeps the common path lock-free.
        if _snapshot_executor is None:
            with _snapshot_executor_lock:
                if _snapshot_executor is None:
                    _snapshot_executor = ThreadPoolExecutor(
                        max_workers=1, thread_name_prefix='baseline_snap')
        _snapshot_executor.submit(
            AgentBaselineService.capture_snapshot,
            prompt_id, flow_id, trigger, user_id, user_prompt)
    except Exception:
        pass  # fire-and-forget