Coverage for integrations/agent_engine/benchmark_registry.py: 63.1%

268 statements  

coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

"""
Unified Agent Goal Engine - Dynamic Benchmark Registry

Benchmarks are adapters that wrap measurement suites. Built-in adapters
reuse existing HevolveAI code. Dynamic adapters are installed by the
coding agent at regional compute-heavy nodes via the RuntimeToolManager pattern.

Snapshots are stored at agent_data/benchmarks/{version}.json.
"""
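# Illustrative usage (a sketch; the version strings and SHA below are made up):
#
#     registry = get_benchmark_registry()
#     registry.capture_snapshot(version='1.4.0', git_sha='abc1234', tier='fast')
#     ok, reason = registry.is_upgrade_safe(old_version='1.3.0', new_version='1.4.0')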

import json
import logging
import os
import subprocess
import sys
import threading

# Windows: suppress console windows for all subprocess calls
_SUBPROCESS_KW = {}
if sys.platform == 'win32':
    _SUBPROCESS_KW['creationflags'] = subprocess.CREATE_NO_WINDOW
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple

logger = logging.getLogger('hevolve_social')

def _resolve_benchmark_dir():
    import sys as _sys
    db_path = os.environ.get('HEVOLVE_DB_PATH', '')
    if db_path and db_path != ':memory:' and os.path.isabs(db_path):
        return os.path.join(os.path.dirname(db_path), 'agent_data', 'benchmarks')
    if os.environ.get('NUNBA_BUNDLED') or getattr(_sys, 'frozen', False):
        try:
            from core.platform_paths import get_agent_data_dir
            return os.path.join(get_agent_data_dir(), 'benchmarks')
        except ImportError:
            return os.path.join(os.path.expanduser('~'), 'Documents', 'Nunba',
                                'data', 'agent_data', 'benchmarks')
    return os.path.join('agent_data', 'benchmarks')

BENCHMARK_DIR = _resolve_benchmark_dir()


class BenchmarkAdapter:
    """Base class for benchmark adapters."""

    name: str = ''
    source: str = 'builtin'  # 'builtin' | 'git' | 'pip'
    repo_url: str = ''
    requires_gpu: bool = False
    min_vram_gb: float = 0.0
    tier: str = 'fast'  # 'fast' | 'heavy'

    def run(self, api_url: str = '', **kwargs) -> Dict:
        """Run benchmark. Return {metrics: {name: {value, direction, unit}}}."""
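        # Example return shape (illustrative values only):
        #   {'metrics': {'latency_ms': {'value': 12.0, 'direction': 'lower', 'unit': 'ms'}}}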

        raise NotImplementedError

    def is_available(self) -> bool:
        """Check if dependencies are installed."""
        return True

    def install(self) -> bool:
        """Install dependencies. Return True on success."""
        return True


class ModelRegistryAdapter(BenchmarkAdapter):
    """Benchmark via ModelRegistry: per-model latency, accuracy, cost."""
    name = 'model_registry'
    tier = 'fast'

    def run(self, api_url: str = '', **kwargs) -> Dict:
        try:
            from .model_registry import ModelRegistry
            registry = ModelRegistry.get_instance()
            models = registry.list_models()
            metrics = {}
            for m in models:
                d = m.to_dict() if hasattr(m, 'to_dict') else m
                mid = d.get('model_id', 'unknown')
                metrics[f'{mid}_latency_ms'] = {
                    'value': d.get('avg_latency_ms', 0),
                    'direction': 'lower', 'unit': 'ms'}
                metrics[f'{mid}_accuracy'] = {
                    'value': d.get('accuracy_score', 0),
                    'direction': 'higher', 'unit': 'score'}
            return {'metrics': metrics}
        except Exception as e:
            return {'metrics': {}, 'error': str(e)}


class WorldModelAdapter(BenchmarkAdapter):
    """Benchmark via WorldModelBridge stats."""
    name = 'world_model'
    tier = 'fast'

    def run(self, api_url: str = '', **kwargs) -> Dict:
        try:
            from .world_model_bridge import get_world_model_bridge
            bridge = get_world_model_bridge()
            stats = bridge.get_stats()
            return {'metrics': {
                'flush_rate': {
                    'value': stats.get('total_flushed', 0) / max(1, stats.get('total_recorded', 1)),
                    'direction': 'higher', 'unit': 'ratio'},
                'correction_density': {
                    'value': stats.get('total_corrections', 0),
                    'direction': 'higher', 'unit': 'count'},
                'hivemind_queries': {
                    'value': stats.get('total_hivemind_queries', 0),
                    'direction': 'higher', 'unit': 'count'},
            }}
        except Exception as e:
            return {'metrics': {}, 'error': str(e)}


class RegressionAdapter(BenchmarkAdapter):
    """Run pytest regression as a benchmark."""
    name = 'regression'
    tier = 'fast'

    def run(self, api_url: str = '', **kwargs) -> Dict:
        try:
            python = os.environ.get(
                'HEVOLVE_PYTHON',
                os.path.join('venv310', 'Scripts', 'python.exe')
                if sys.platform == 'win32' else
                os.path.join('venv310', 'bin', 'python'))
            result = subprocess.run(
                [python, '-m', 'pytest', 'tests/', '-s',
                 '--ignore=tests/runtime_tests', '-q',
                 '--tb=no', '-k', 'not nested_task'],
                capture_output=True, text=True, timeout=600,
                cwd=os.environ.get('HEVOLVE_PROJECT_ROOT',
                                   os.path.dirname(os.path.dirname(
                                       os.path.dirname(__file__)))),
                **_SUBPROCESS_KW
            )
            # Parse pytest output for pass/fail counts
            output = result.stdout + result.stderr
            passed = failed = 0
            for line in output.split('\n'):
                if 'passed' in line:
                    parts = line.split()
                    for i, p in enumerate(parts):
                        if p == 'passed' and i > 0:
                            try:
                                passed = int(parts[i - 1])
                            except ValueError:
                                pass
                        if p == 'failed' and i > 0:
                            try:
                                failed = int(parts[i - 1])
                            except ValueError:
                                pass
            total = passed + failed
            return {'metrics': {
                'pass_rate': {
                    'value': passed / max(1, total),
                    'direction': 'higher', 'unit': 'ratio'},
                'fail_count': {
                    'value': failed,
                    'direction': 'lower', 'unit': 'count'},
            }}
        except Exception as e:
            return {'metrics': {}, 'error': str(e)}


class GuardrailAdapter(BenchmarkAdapter):
    """Verify guardrail integrity."""
    name = 'guardrail'
    tier = 'fast'

    def run(self, api_url: str = '', **kwargs) -> Dict:
        try:
            from security.hive_guardrails import (
                compute_guardrail_hash, verify_guardrail_integrity)
            hash_val = compute_guardrail_hash()
            integrity = verify_guardrail_integrity()
            return {'metrics': {
                'hash_match': {
                    'value': 1 if integrity else 0,
                    'direction': 'higher', 'unit': 'bool'},
                'integrity_verified': {
                    'value': 1 if integrity else 0,
                    'direction': 'higher', 'unit': 'bool'},
            }}
        except Exception as e:
            return {'metrics': {}, 'error': str(e)}


class QuantiPhyAdapter(BenchmarkAdapter):
    """QuantiPhy physics reasoning benchmark from HevolveAI."""
    name = 'quantiphy'
    source = 'builtin'
    requires_gpu = True
    min_vram_gb = 4.0
    tier = 'heavy'

    def is_available(self) -> bool:
        try:
            # Check if HevolveAI quantiphy benchmark exists
            import importlib.util
            spec = importlib.util.find_spec('hevolveai')
            return spec is not None
        except Exception:
            return False

    def run(self, api_url: str = '', **kwargs) -> Dict:
        try:
            from hevolveai.tests.benchmarks.quantiphy_benchmark import QuantiPhyBenchmark
            bench = QuantiPhyBenchmark(api_url=api_url or 'http://localhost:8000')
            results = bench.run_benchmark(
                phase='baseline',
                max_instances=kwargs.get('max_instances', 20))
            mra = results.get('mra', {})
            return {'metrics': {
                'mra_mean': {
                    'value': mra.get('mean', 0),
                    'direction': 'higher', 'unit': 'score'},
                'latency_p95_ms': {
                    'value': results.get('latency', {}).get('p95', 0),
                    'direction': 'lower', 'unit': 'ms'},
            }}
        except Exception as e:
            return {'metrics': {}, 'error': str(e)}


class EmbodiedValidationAdapter(BenchmarkAdapter):
    """Embodied AI validation benchmark from HevolveAI."""
    name = 'embodied_validation'
    source = 'builtin'
    requires_gpu = True
    min_vram_gb = 2.0
    tier = 'heavy'

    def is_available(self) -> bool:
        try:
            import importlib.util
            spec = importlib.util.find_spec('hevolveai')
            return spec is not None
        except Exception:
            return False

    def run(self, api_url: str = '', **kwargs) -> Dict:
        try:
            from hevolveai.embodied_ai.validation.benchmark import (
                PerformanceBenchmark, ForgettingBenchmark, MemoryBenchmark)
            # Run lightweight validation checks
            metrics = {}
            try:
                perf = PerformanceBenchmark()
                perf_result = perf.run()
                metrics['mean_latency_ms'] = {
                    'value': perf_result.get('mean_latency_ms', 0),
                    'direction': 'lower', 'unit': 'ms'}
            except Exception:
                pass
            try:
                mem = MemoryBenchmark()
                mem_result = mem.run()
                metrics['ram_mb'] = {
                    'value': mem_result.get('ram_mb', 0),
                    'direction': 'lower', 'unit': 'MB'}
            except Exception:
                pass
            return {'metrics': metrics}
        except Exception as e:
            return {'metrics': {}, 'error': str(e)}


class QwenEncoderAdapter(BenchmarkAdapter):
    """Qwen encoder throughput benchmark from HevolveAI."""
    name = 'qwen_encoder'
    source = 'builtin'
    requires_gpu = True
    min_vram_gb = 2.0
    tier = 'fast'

    def is_available(self) -> bool:
        try:
            import importlib.util
            spec = importlib.util.find_spec('hevolveai')
            return spec is not None
        except Exception:
            return False

    def run(self, api_url: str = '', **kwargs) -> Dict:
        try:
            from hevolveai.embodied_ai.models.qwen_benchmark import (
                benchmark_llamacpp)
            from core.port_registry import get_local_llm_url
            _llm_url = api_url or get_local_llm_url().replace('/v1', '')
            result = benchmark_llamacpp(server_url=_llm_url)
            return {'metrics': {
                'tokens_per_second': {
                    'value': result.get('tokens_per_second', 0),
                    'direction': 'higher', 'unit': 'tok/s'},
            }}
        except Exception as e:
            return {'metrics': {}, 'error': str(e)}


class DynamicBenchmarkAdapter(BenchmarkAdapter):
    """Adapter for dynamically installed benchmarks (git repos)."""
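    # Lifecycle sketch (matches the methods below): install() shallow-clones repo_url
    # into ~/.hevolve/benchmarks/{name} and pip-installs requirements.txt if present;
    # run() executes run_command (or falls back to pytest --benchmark-json) and reads
    # metrics_file, defaulting to results.json.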

    def __init__(self, name: str, repo_url: str,
                 requires_gpu: bool = False, min_vram_gb: float = 0.0,
                 run_command: str = '', metrics_file: str = ''):
        self.name = name
        self.source = 'git'
        self.repo_url = repo_url
        self.requires_gpu = requires_gpu
        self.min_vram_gb = min_vram_gb
        self.tier = 'heavy'
        self._run_command = run_command
        self._metrics_file = metrics_file
        self._install_dir = os.path.join(
            os.path.expanduser('~'), '.hevolve', 'benchmarks', name)

    def is_available(self) -> bool:
        return os.path.isdir(self._install_dir)

    def install(self) -> bool:
        try:
            os.makedirs(os.path.dirname(self._install_dir), exist_ok=True)
            if not os.path.isdir(self._install_dir):
                subprocess.run(
                    ['git', 'clone', '--depth', '1', self.repo_url, self._install_dir],
                    check=True, timeout=120, **_SUBPROCESS_KW)
            # Install requirements if present
            req_file = os.path.join(self._install_dir, 'requirements.txt')
            if os.path.isfile(req_file):
                subprocess.run(
                    [sys.executable, '-m', 'pip', 'install', '-r', req_file, '-q'],
                    timeout=300, **_SUBPROCESS_KW)
            return True
        except Exception as e:
            logger.debug(f"Benchmark install failed for {self.name}: {e}")
            return False

    def run(self, api_url: str = '', **kwargs) -> Dict:
        if not self.is_available():
            return {'metrics': {}, 'error': 'not installed'}
        try:
            if self._run_command:
                # G7: use shlex.split to tokenize safely (no shell execution)
                import shlex
                cmd_list = shlex.split(self._run_command)
            else:
                cmd_list = [sys.executable, '-m', 'pytest', '--benchmark-json=results.json']
            result = subprocess.run(
                cmd_list, shell=False, capture_output=True, text=True,
                timeout=600, cwd=self._install_dir, **_SUBPROCESS_KW)
            # Try to parse metrics file
            mf = os.path.join(self._install_dir, self._metrics_file or 'results.json')
            if os.path.isfile(mf):
                with open(mf) as f:
                    return {'metrics': json.load(f)}
            return {'metrics': {'exit_code': {
                'value': result.returncode, 'direction': 'lower', 'unit': 'code'}}}
        except Exception as e:
            return {'metrics': {}, 'error': str(e)}


class BenchmarkRegistry:
    """Dynamic benchmark registry. Singleton."""

    def __init__(self):
        self._lock = threading.Lock()
        self._adapters: Dict[str, BenchmarkAdapter] = {}
        self._latest_results: Dict[str, dict] = {}
        self._register_builtins()
        os.makedirs(BENCHMARK_DIR, exist_ok=True)

    def _register_builtins(self):
        for adapter_cls in [
            ModelRegistryAdapter, WorldModelAdapter, RegressionAdapter,
            GuardrailAdapter, QuantiPhyAdapter, EmbodiedValidationAdapter,
            QwenEncoderAdapter,
        ]:
            adapter = adapter_cls()
            self._adapters[adapter.name] = adapter

    def register_benchmark(self, adapter: BenchmarkAdapter):
        """Register a new benchmark adapter. Idempotent."""
        with self._lock:
            self._adapters[adapter.name] = adapter

    def discover_and_install(self, repo_url: str, name: str,
                             requires_gpu: bool = False,
                             min_vram_gb: float = 0.0,
                             run_command: str = '',
                             metrics_file: str = '') -> bool:
        """Coding agent installs a dynamic benchmark from a git repo."""
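        # Illustrative call (the repo URL and name here are hypothetical):
        #   get_benchmark_registry().discover_and_install(
        #       repo_url='https://github.com/example/my-benchmark',
        #       name='my_benchmark', run_command='python run.py',
        #       metrics_file='results.json')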

        adapter = DynamicBenchmarkAdapter(
            name=name, repo_url=repo_url,
            requires_gpu=requires_gpu, min_vram_gb=min_vram_gb,
            run_command=run_command, metrics_file=metrics_file)
        if adapter.install():
            self.register_benchmark(adapter)
            return True
        return False

    def capture_snapshot(self, version: str, git_sha: str = '',
                         tier: str = 'fast') -> Dict:
        """Run benchmarks and store a snapshot. tier is 'fast', 'heavy', or 'all'."""
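        # Tier filtering (see loop below): 'fast' and 'heavy' run only adapters of that
        # tier; any other value (e.g. 'all') runs every registered adapter, subject to
        # the GPU and availability checks.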

        snapshot = {
            'version': version,
            'git_sha': git_sha,
            'timestamp': time.time(),
            'tier': tier,
            'benchmarks': {},
        }

        # Check node capability for GPU benchmarks
        node_tier = 'standard'
        try:
            from security.system_requirements import get_tier_name
            node_tier = get_tier_name()
        except Exception:
            pass

        with self._lock:
            adapters = dict(self._adapters)

        for name, adapter in adapters.items():
            # Filter by tier
            if tier == 'fast' and adapter.tier != 'fast':
                continue
            if tier == 'heavy' and adapter.tier != 'heavy':
                continue
            # Skip GPU benchmarks on lite nodes
            if adapter.requires_gpu and node_tier in ('lite', 'minimal'):
                snapshot['benchmarks'][name] = {
                    'skipped': True, 'reason': f'requires GPU, node tier={node_tier}'}
                continue
            if not adapter.is_available():
                snapshot['benchmarks'][name] = {
                    'skipped': True, 'reason': 'not available'}
                continue
            try:
                result = adapter.run()
                snapshot['benchmarks'][name] = result
                with self._lock:
                    self._latest_results[name] = result
            except Exception as e:
                snapshot['benchmarks'][name] = {
                    'error': str(e)}

        # Persist
        fname = os.path.join(BENCHMARK_DIR, f'{version}.json')
        try:
            with open(fname, 'w') as f:
                json.dump(snapshot, f, indent=2)
        except Exception as e:
            logger.debug(f"Benchmark snapshot save failed: {e}")

        return snapshot

    def is_upgrade_safe(self, old_version: str, new_version: str) -> Tuple[bool, str]:
        """No metric may regress by more than 5% versus the old version's snapshot."""
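        # Worked example (illustrative numbers): a 'higher' metric going 0.80 -> 0.75
        # fails (0.75 < 0.80 * 0.95); a 'lower' metric going 100 -> 104 passes
        # (104 <= 100 * 1.05).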

        old_file = os.path.join(BENCHMARK_DIR, f'{old_version}.json')
        new_file = os.path.join(BENCHMARK_DIR, f'{new_version}.json')

        if not os.path.isfile(old_file):
            return True, 'no baseline to compare'
        if not os.path.isfile(new_file):
            return False, 'new version snapshot missing'

        with open(old_file) as f:
            old = json.load(f)
        with open(new_file) as f:
            new = json.load(f)

        regressions = []
        for bench_name, old_result in old.get('benchmarks', {}).items():
            new_result = new.get('benchmarks', {}).get(bench_name, {})
            if old_result.get('skipped') or new_result.get('skipped'):
                continue
            old_metrics = old_result.get('metrics', {})
            new_metrics = new_result.get('metrics', {})
            for metric_name, old_m in old_metrics.items():
                new_m = new_metrics.get(metric_name)
                if not new_m or not isinstance(old_m, dict) or not isinstance(new_m, dict):
                    continue
                old_val = old_m.get('value', 0)
                new_val = new_m.get('value', 0)
                direction = old_m.get('direction', 'higher')
                if direction == 'higher' and new_val < old_val * 0.95:
                    regressions.append(
                        f"{bench_name}.{metric_name}: {old_val:.3f} → {new_val:.3f} (regression)")
                elif direction == 'lower' and new_val > old_val * 1.05:
                    regressions.append(
                        f"{bench_name}.{metric_name}: {old_val:.3f} → {new_val:.3f} (regression)")

        if regressions:
            return False, f"Regressions: {'; '.join(regressions)}"
        return True, 'all metrics pass'

    def get_latest_results(self) -> dict:
        """Get latest benchmark results (used by federation delta)."""
        with self._lock:
            return dict(self._latest_results)

    def list_benchmarks(self) -> List[Dict]:
        """List all registered benchmarks with status."""
        with self._lock:
            return [
                {
                    'name': name,
                    'source': adapter.source,
                    'tier': adapter.tier,
                    'requires_gpu': adapter.requires_gpu,
                    'available': adapter.is_available(),
                }
                for name, adapter in self._adapters.items()
            ]


# ─── Singleton ───
_registry = None
_registry_lock = threading.Lock()


def get_benchmark_registry() -> BenchmarkRegistry:
    global _registry
    if _registry is None:
        with _registry_lock:
            if _registry is None:
                _registry = BenchmarkRegistry()
    return _registry