Coverage for integrations/agent_engine/benchmark_registry.py: 63.1% (268 statements)
"""
Unified Agent Goal Engine - Dynamic Benchmark Registry

Benchmarks are adapters that wrap measurement suites. Built-in adapters
reuse existing HevolveAI code. Dynamic adapters are installed by the
coding agent at regional compute-heavy nodes via the RuntimeToolManager
pattern.

Snapshots are stored at agent_data/benchmarks/{version}.json.
"""
import json
import logging
import os
import subprocess
import sys
import threading

# Windows: suppress console windows for all subprocess calls
_SUBPROCESS_KW = {}
if sys.platform == 'win32':
    _SUBPROCESS_KW['creationflags'] = subprocess.CREATE_NO_WINDOW

import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple

logger = logging.getLogger('hevolve_social')


def _resolve_benchmark_dir():
    """Resolve the directory where benchmark snapshots are persisted."""
    import sys as _sys
    db_path = os.environ.get('HEVOLVE_DB_PATH', '')
    if db_path and db_path != ':memory:' and os.path.isabs(db_path):
        return os.path.join(os.path.dirname(db_path), 'agent_data', 'benchmarks')
    if os.environ.get('NUNBA_BUNDLED') or getattr(_sys, 'frozen', False):
        try:
            from core.platform_paths import get_agent_data_dir
            return os.path.join(get_agent_data_dir(), 'benchmarks')
        except ImportError:
            return os.path.join(os.path.expanduser('~'), 'Documents', 'Nunba',
                                'data', 'agent_data', 'benchmarks')
    return os.path.join('agent_data', 'benchmarks')


BENCHMARK_DIR = _resolve_benchmark_dir()


class BenchmarkAdapter:
    """Base class for benchmark adapters."""

    name: str = ''
    source: str = 'builtin'  # 'builtin' | 'git' | 'pip'
    repo_url: str = ''
    requires_gpu: bool = False
    min_vram_gb: float = 0.0
    tier: str = 'fast'  # 'fast' | 'heavy'

    def run(self, api_url: str = '', **kwargs) -> Dict:
        """Run benchmark. Return {metrics: {name: {value, direction, unit}}}."""
        raise NotImplementedError

    def is_available(self) -> bool:
        """Check if dependencies are installed."""
        return True

    def install(self) -> bool:
        """Install dependencies. Return True on success."""
        return True
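

# Illustrative sketch (not one of the built-in adapters): a minimal custom
# adapter only needs `name`, `tier`, and a `run()` returning the
# {metrics: {name: {value, direction, unit}}} shape documented above.
# The adapter class and metric below are hypothetical.
#
#     class DiskSpaceAdapter(BenchmarkAdapter):
#         name = 'disk_space'
#         tier = 'fast'
#
#         def run(self, api_url: str = '', **kwargs) -> Dict:
#             import shutil
#             usage = shutil.disk_usage(os.path.expanduser('~'))
#             return {'metrics': {
#                 'free_disk_gb': {
#                     'value': usage.free / 1e9,
#                     'direction': 'higher', 'unit': 'GB'},
#             }}
#
#     get_benchmark_registry().register_benchmark(DiskSpaceAdapter())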


class ModelRegistryAdapter(BenchmarkAdapter):
    """Benchmark via ModelRegistry: per-model latency, accuracy, cost."""
    name = 'model_registry'
    tier = 'fast'

    def run(self, api_url: str = '', **kwargs) -> Dict:
        try:
            from .model_registry import ModelRegistry
            registry = ModelRegistry.get_instance()
            models = registry.list_models()
            metrics = {}
            for m in models:
                d = m.to_dict() if hasattr(m, 'to_dict') else m
                mid = d.get('model_id', 'unknown')
                metrics[f'{mid}_latency_ms'] = {
                    'value': d.get('avg_latency_ms', 0),
                    'direction': 'lower', 'unit': 'ms'}
                metrics[f'{mid}_accuracy'] = {
                    'value': d.get('accuracy_score', 0),
                    'direction': 'higher', 'unit': 'score'}
            return {'metrics': metrics}
        except Exception as e:
            return {'metrics': {}, 'error': str(e)}


class WorldModelAdapter(BenchmarkAdapter):
    """Benchmark via WorldModelBridge stats."""
    name = 'world_model'
    tier = 'fast'

    def run(self, api_url: str = '', **kwargs) -> Dict:
        try:
            from .world_model_bridge import get_world_model_bridge
            bridge = get_world_model_bridge()
            stats = bridge.get_stats()
            return {'metrics': {
                'flush_rate': {
                    'value': stats.get('total_flushed', 0) / max(1, stats.get('total_recorded', 1)),
                    'direction': 'higher', 'unit': 'ratio'},
                'correction_density': {
                    'value': stats.get('total_corrections', 0),
                    'direction': 'higher', 'unit': 'count'},
                'hivemind_queries': {
                    'value': stats.get('total_hivemind_queries', 0),
                    'direction': 'higher', 'unit': 'count'},
            }}
        except Exception as e:
            return {'metrics': {}, 'error': str(e)}


class RegressionAdapter(BenchmarkAdapter):
    """Run the pytest regression suite as a benchmark."""
    name = 'regression'
    tier = 'fast'

    def run(self, api_url: str = '', **kwargs) -> Dict:
        try:
            python = os.environ.get(
                'HEVOLVE_PYTHON',
                os.path.join('venv310', 'Scripts', 'python.exe')
                if sys.platform == 'win32' else
                os.path.join('venv310', 'bin', 'python'))
            result = subprocess.run(
                [python, '-m', 'pytest', 'tests/', '-s',
                 '--ignore=tests/runtime_tests', '-q',
                 '--tb=no', '-k', 'not nested_task'],
                capture_output=True, text=True, timeout=600,
                cwd=os.environ.get('HEVOLVE_PROJECT_ROOT',
                                   os.path.dirname(os.path.dirname(
                                       os.path.dirname(__file__)))),
                **_SUBPROCESS_KW
            )
            # Parse the pytest summary for pass/fail counts. Summary tokens may
            # carry a trailing comma (e.g. "2 failed, 8 passed in 1.2s"), so
            # strip it before comparing.
            output = result.stdout + result.stderr
            passed = failed = 0
            for line in output.split('\n'):
                if 'passed' in line or 'failed' in line:
                    parts = line.split()
                    for i, p in enumerate(parts):
                        token = p.rstrip(',')
                        if token == 'passed' and i > 0:
                            try:
                                passed = int(parts[i - 1])
                            except ValueError:
                                pass
                        if token == 'failed' and i > 0:
                            try:
                                failed = int(parts[i - 1])
                            except ValueError:
                                pass
            total = passed + failed
            return {'metrics': {
                'pass_rate': {
                    'value': passed / max(1, total),
                    'direction': 'higher', 'unit': 'ratio'},
                'fail_count': {
                    'value': failed,
                    'direction': 'lower', 'unit': 'count'},
            }}
        except Exception as e:
            return {'metrics': {}, 'error': str(e)}


class GuardrailAdapter(BenchmarkAdapter):
    """Verify guardrail integrity."""
    name = 'guardrail'
    tier = 'fast'

    def run(self, api_url: str = '', **kwargs) -> Dict:
        try:
            from security.hive_guardrails import (
                compute_guardrail_hash, verify_guardrail_integrity)
            hash_val = compute_guardrail_hash()
            integrity = verify_guardrail_integrity()
            return {'metrics': {
                'hash_match': {
                    'value': 1 if integrity else 0,
                    'direction': 'higher', 'unit': 'bool'},
                'integrity_verified': {
                    'value': 1 if integrity else 0,
                    'direction': 'higher', 'unit': 'bool'},
            }}
        except Exception as e:
            return {'metrics': {}, 'error': str(e)}


class QuantiPhyAdapter(BenchmarkAdapter):
    """QuantiPhy physics reasoning benchmark from HevolveAI."""
    name = 'quantiphy'
    source = 'builtin'
    requires_gpu = True
    min_vram_gb = 4.0
    tier = 'heavy'

    def is_available(self) -> bool:
        try:
            # Check if the HevolveAI quantiphy benchmark exists
            import importlib.util
            spec = importlib.util.find_spec('hevolveai')
            return spec is not None
        except Exception:
            return False

    def run(self, api_url: str = '', **kwargs) -> Dict:
        try:
            from hevolveai.tests.benchmarks.quantiphy_benchmark import QuantiPhyBenchmark
            bench = QuantiPhyBenchmark(api_url=api_url or 'http://localhost:8000')
            results = bench.run_benchmark(
                phase='baseline',
                max_instances=kwargs.get('max_instances', 20))
            mra = results.get('mra', {})
            return {'metrics': {
                'mra_mean': {
                    'value': mra.get('mean', 0),
                    'direction': 'higher', 'unit': 'score'},
                'latency_p95_ms': {
                    'value': results.get('latency', {}).get('p95', 0),
                    'direction': 'lower', 'unit': 'ms'},
            }}
        except Exception as e:
            return {'metrics': {}, 'error': str(e)}


class EmbodiedValidationAdapter(BenchmarkAdapter):
    """Embodied AI validation benchmark from HevolveAI."""
    name = 'embodied_validation'
    source = 'builtin'
    requires_gpu = True
    min_vram_gb = 2.0
    tier = 'heavy'

    def is_available(self) -> bool:
        try:
            import importlib.util
            spec = importlib.util.find_spec('hevolveai')
            return spec is not None
        except Exception:
            return False

    def run(self, api_url: str = '', **kwargs) -> Dict:
        try:
            from hevolveai.embodied_ai.validation.benchmark import (
                PerformanceBenchmark, ForgettingBenchmark, MemoryBenchmark)
            # Run lightweight validation checks
            metrics = {}
            try:
                perf = PerformanceBenchmark()
                perf_result = perf.run()
                metrics['mean_latency_ms'] = {
                    'value': perf_result.get('mean_latency_ms', 0),
                    'direction': 'lower', 'unit': 'ms'}
            except Exception:
                pass
            try:
                mem = MemoryBenchmark()
                mem_result = mem.run()
                metrics['ram_mb'] = {
                    'value': mem_result.get('ram_mb', 0),
                    'direction': 'lower', 'unit': 'MB'}
            except Exception:
                pass
            return {'metrics': metrics}
        except Exception as e:
            return {'metrics': {}, 'error': str(e)}


class QwenEncoderAdapter(BenchmarkAdapter):
    """Qwen encoder throughput benchmark from HevolveAI."""
    name = 'qwen_encoder'
    source = 'builtin'
    requires_gpu = True
    min_vram_gb = 2.0
    tier = 'fast'

    def is_available(self) -> bool:
        try:
            import importlib.util
            spec = importlib.util.find_spec('hevolveai')
            return spec is not None
        except Exception:
            return False

    def run(self, api_url: str = '', **kwargs) -> Dict:
        try:
            from hevolveai.embodied_ai.models.qwen_benchmark import (
                benchmark_llamacpp)
            from core.port_registry import get_local_llm_url
            _llm_url = api_url or get_local_llm_url().replace('/v1', '')
            result = benchmark_llamacpp(server_url=_llm_url)
            return {'metrics': {
                'tokens_per_second': {
                    'value': result.get('tokens_per_second', 0),
                    'direction': 'higher', 'unit': 'tok/s'},
            }}
        except Exception as e:
            return {'metrics': {}, 'error': str(e)}


class DynamicBenchmarkAdapter(BenchmarkAdapter):
    """Adapter for dynamically installed benchmarks (git repos)."""

    def __init__(self, name: str, repo_url: str,
                 requires_gpu: bool = False, min_vram_gb: float = 0.0,
                 run_command: str = '', metrics_file: str = ''):
        self.name = name
        self.source = 'git'
        self.repo_url = repo_url
        self.requires_gpu = requires_gpu
        self.min_vram_gb = min_vram_gb
        self.tier = 'heavy'
        self._run_command = run_command
        self._metrics_file = metrics_file
        self._install_dir = os.path.join(
            os.path.expanduser('~'), '.hevolve', 'benchmarks', name)

    def is_available(self) -> bool:
        return os.path.isdir(self._install_dir)

    def install(self) -> bool:
        try:
            os.makedirs(os.path.dirname(self._install_dir), exist_ok=True)
            if not os.path.isdir(self._install_dir):
                subprocess.run(
                    ['git', 'clone', '--depth', '1', self.repo_url, self._install_dir],
                    check=True, timeout=120, **_SUBPROCESS_KW)
            # Install requirements if present
            req_file = os.path.join(self._install_dir, 'requirements.txt')
            if os.path.isfile(req_file):
                subprocess.run(
                    [sys.executable, '-m', 'pip', 'install', '-r', req_file, '-q'],
                    timeout=300, **_SUBPROCESS_KW)
            return True
        except Exception as e:
            logger.debug(f"Benchmark install failed for {self.name}: {e}")
            return False

    def run(self, api_url: str = '', **kwargs) -> Dict:
        if not self.is_available():
            return {'metrics': {}, 'error': 'not installed'}
        try:
            if self._run_command:
                # G7: use shlex.split to tokenize safely (no shell execution)
                import shlex
                cmd_list = shlex.split(self._run_command)
            else:
                cmd_list = [sys.executable, '-m', 'pytest', '--benchmark-json=results.json']
            result = subprocess.run(
                cmd_list, shell=False, capture_output=True, text=True,
                timeout=600, cwd=self._install_dir, **_SUBPROCESS_KW)
            # Try to parse the metrics file
            mf = os.path.join(self._install_dir, self._metrics_file or 'results.json')
            if os.path.isfile(mf):
                with open(mf) as f:
                    return {'metrics': json.load(f)}
            return {'metrics': {'exit_code': {
                'value': result.returncode, 'direction': 'lower', 'unit': 'code'}}}
        except Exception as e:
            return {'metrics': {}, 'error': str(e)}
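

# run() above returns the parsed JSON of the metrics file verbatim as the
# 'metrics' mapping, so a dynamic benchmark should write the same
# {name: {value, direction, unit}} shape the registry compares during upgrade
# checks. A hypothetical results.json:
#
#     {
#       "throughput_qps": {"value": 42.5, "direction": "higher", "unit": "qps"},
#       "p99_latency_ms": {"value": 180.0, "direction": "lower", "unit": "ms"}
#     }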


class BenchmarkRegistry:
    """Dynamic benchmark registry. Singleton."""

    def __init__(self):
        self._lock = threading.Lock()
        self._adapters: Dict[str, BenchmarkAdapter] = {}
        self._latest_results: Dict[str, dict] = {}
        self._register_builtins()
        os.makedirs(BENCHMARK_DIR, exist_ok=True)

    def _register_builtins(self):
        for adapter_cls in [
            ModelRegistryAdapter, WorldModelAdapter, RegressionAdapter,
            GuardrailAdapter, QuantiPhyAdapter, EmbodiedValidationAdapter,
            QwenEncoderAdapter,
        ]:
            adapter = adapter_cls()
            self._adapters[adapter.name] = adapter

    def register_benchmark(self, adapter: BenchmarkAdapter):
        """Register a new benchmark adapter. Idempotent."""
        with self._lock:
            self._adapters[adapter.name] = adapter

    def discover_and_install(self, repo_url: str, name: str,
                             requires_gpu: bool = False,
                             min_vram_gb: float = 0.0,
                             run_command: str = '',
                             metrics_file: str = '') -> bool:
        """Coding agent installs a dynamic benchmark from a git repo."""
        adapter = DynamicBenchmarkAdapter(
            name=name, repo_url=repo_url,
            requires_gpu=requires_gpu, min_vram_gb=min_vram_gb,
            run_command=run_command, metrics_file=metrics_file)
        if adapter.install():
            self.register_benchmark(adapter)
            return True
        return False
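
    # Illustrative call (the repo URL, name, and command are hypothetical): the
    # coding agent wires in an external suite with a single call, after which it
    # appears in list_benchmarks() and in heavy-tier snapshots.
    #
    #     registry = get_benchmark_registry()
    #     registry.discover_and_install(
    #         repo_url='https://github.com/example/latency-bench',
    #         name='latency_bench',
    #         run_command='python run.py --json results.json',
    #         metrics_file='results.json')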

    def capture_snapshot(self, version: str, git_sha: str = '',
                         tier: str = 'fast') -> Dict:
        """Run benchmarks and store a snapshot. tier is 'fast', 'heavy', or 'all'."""
        snapshot = {
            'version': version,
            'git_sha': git_sha,
            'timestamp': time.time(),
            'tier': tier,
            'benchmarks': {},
        }

        # Check node capability for GPU benchmarks
        node_tier = 'standard'
        try:
            from security.system_requirements import get_tier_name
            node_tier = get_tier_name()
        except Exception:
            pass

        with self._lock:
            adapters = dict(self._adapters)

        for name, adapter in adapters.items():
            # Filter by tier
            if tier == 'fast' and adapter.tier != 'fast':
                continue
            if tier == 'heavy' and adapter.tier != 'heavy':
                continue
            # Skip GPU benchmarks on lite nodes
            if adapter.requires_gpu and node_tier in ('lite', 'minimal'):
                snapshot['benchmarks'][name] = {
                    'skipped': True, 'reason': f'requires GPU, node tier={node_tier}'}
                continue
            if not adapter.is_available():
                snapshot['benchmarks'][name] = {
                    'skipped': True, 'reason': 'not available'}
                continue
            try:
                result = adapter.run()
                snapshot['benchmarks'][name] = result
                with self._lock:
                    self._latest_results[name] = result
            except Exception as e:
                snapshot['benchmarks'][name] = {'error': str(e)}

        # Persist
        fname = os.path.join(BENCHMARK_DIR, f'{version}.json')
        try:
            with open(fname, 'w') as f:
                json.dump(snapshot, f, indent=2)
        except Exception as e:
            logger.debug(f"Benchmark snapshot save failed: {e}")

        return snapshot
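
    # Shape of a persisted snapshot at BENCHMARK_DIR/{version}.json (values here
    # are hypothetical; 'skipped' entries appear for unavailable or GPU-gated
    # benchmarks):
    #
    #     {
    #       "version": "1.4.0",
    #       "git_sha": "abc123",
    #       "timestamp": 1715486400.0,
    #       "tier": "fast",
    #       "benchmarks": {
    #         "guardrail": {"metrics": {"hash_match": {"value": 1, "direction": "higher", "unit": "bool"}}},
    #         "quantiphy": {"skipped": true, "reason": "requires GPU, node tier=lite"}
    #       }
    #     }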

    def is_upgrade_safe(self, old_version: str, new_version: str) -> Tuple[bool, str]:
        """All metrics in the old snapshot must be no more than 5% worse in the new one."""
        old_file = os.path.join(BENCHMARK_DIR, f'{old_version}.json')
        new_file = os.path.join(BENCHMARK_DIR, f'{new_version}.json')

        if not os.path.isfile(old_file):
            return True, 'no baseline to compare'
        if not os.path.isfile(new_file):
            return False, 'new version snapshot missing'

        with open(old_file) as f:
            old = json.load(f)
        with open(new_file) as f:
            new = json.load(f)

        regressions = []
        for bench_name, old_result in old.get('benchmarks', {}).items():
            new_result = new.get('benchmarks', {}).get(bench_name, {})
            if old_result.get('skipped') or new_result.get('skipped'):
                continue
            old_metrics = old_result.get('metrics', {})
            new_metrics = new_result.get('metrics', {})
            for metric_name, old_m in old_metrics.items():
                new_m = new_metrics.get(metric_name)
                if not new_m or not isinstance(old_m, dict) or not isinstance(new_m, dict):
                    continue
                old_val = old_m.get('value', 0)
                new_val = new_m.get('value', 0)
                direction = old_m.get('direction', 'higher')
                # Allow a 5% tolerance in the regression direction
                if direction == 'higher' and new_val < old_val * 0.95:
                    regressions.append(
                        f"{bench_name}.{metric_name}: {old_val:.3f} → {new_val:.3f} (regression)")
                elif direction == 'lower' and new_val > old_val * 1.05:
                    regressions.append(
                        f"{bench_name}.{metric_name}: {old_val:.3f} → {new_val:.3f} (regression)")

        if regressions:
            return False, f"Regressions: {'; '.join(regressions)}"
        return True, 'all metrics pass'
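
    # Worked example of the tolerance rule above (numbers are hypothetical): a
    # 'higher' metric at 0.80 in the baseline may fall to 0.76 (0.80 * 0.95)
    # before it is flagged; a 'lower' metric at 100 ms may rise to 105 ms
    # (100 * 1.05). Typical gate usage:
    #
    #     reg = get_benchmark_registry()
    #     reg.capture_snapshot('1.4.0', git_sha='abc123', tier='fast')
    #     # ... install candidate build, then re-run ...
    #     reg.capture_snapshot('1.5.0', git_sha='def456', tier='fast')
    #     ok, reason = reg.is_upgrade_safe('1.4.0', '1.5.0')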

    def get_latest_results(self) -> dict:
        """Get latest benchmark results (used by federation delta)."""
        with self._lock:
            return dict(self._latest_results)

    def list_benchmarks(self) -> List[Dict]:
        """List all registered benchmarks with status."""
        with self._lock:
            return [
                {
                    'name': name,
                    'source': adapter.source,
                    'tier': adapter.tier,
                    'requires_gpu': adapter.requires_gpu,
                    'available': adapter.is_available(),
                }
                for name, adapter in self._adapters.items()
            ]


# ─── Singleton ───
_registry = None
_registry_lock = threading.Lock()


def get_benchmark_registry() -> BenchmarkRegistry:
    """Return the process-wide BenchmarkRegistry, creating it on first use."""
    global _registry
    if _registry is None:
        with _registry_lock:
            if _registry is None:
                _registry = BenchmarkRegistry()
    return _registry
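

# Minimal end-to-end sketch (the version label is arbitrary): capture a
# fast-tier snapshot and inspect what was recorded.
#
#     if __name__ == '__main__':
#         registry = get_benchmark_registry()
#         for info in registry.list_benchmarks():
#             print(info['name'], info['tier'],
#                   'available' if info['available'] else 'missing')
#         snap = registry.capture_snapshot('dev-local', tier='fast')
#         print(json.dumps(snap['benchmarks'], indent=2))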