Coverage for integrations/providers/efficiency_matrix.py: 71.3%

188 statements  

coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2EfficiencyMatrix — continuous benchmarking for optimal model selection. 

3 

4Tracks per-provider, per-model: 

5 - Tokens per second (throughput) 

6 - Time to first token (TTFT) latency 

7 - End-to-end latency 

8 - Quality score (from user feedback + automated eval) 

9 - Reliability (success rate) 

10 - Cost efficiency (quality per dollar) 

11 

12The matrix runs benchmarks during idle time (via ResourceGovernor) 

13and also records live usage stats from every gateway call. 

14 

15Persisted at ~/Documents/Nunba/data/efficiency_matrix.json. 

16Integrated with ResourceGovernor's proactive action stream. 

17 

18The key formula: 

19 efficiency_score = (quality × speed × reliability) / cost 

20 → Higher is better. Used by registry.find_best() for 'balanced' strategy. 

21""" 

import json
import logging
import threading
import time
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class ModelBenchmark:
    """Benchmark results for a specific model on a specific provider."""
    provider_id: str
    model_id: str
    model_type: str = 'llm'

    # Throughput
    avg_tok_per_s: float = 0.0
    p50_tok_per_s: float = 0.0
    p95_tok_per_s: float = 0.0

    # Latency
    avg_ttft_ms: float = 0.0   # Time to first token
    avg_e2e_ms: float = 0.0    # End-to-end
    p95_e2e_ms: float = 0.0

    # Quality (0-1)
    quality_score: float = 0.5
    coherence: float = 0.5
    instruction_following: float = 0.5

    # Reliability
    success_rate: float = 1.0
    total_requests: int = 0
    failed_requests: int = 0

    # Cost
    avg_cost_per_request: float = 0.0
    cost_per_1k_output_tokens: float = 0.0

    # Computed
    efficiency_score: float = 0.0  # quality × speed × reliability / cost

    # Metadata
    last_benchmark: float = 0.0
    last_live_update: float = 0.0
    sample_count: int = 0

    def compute_efficiency(self):
        """Recalculate efficiency score."""
        speed = min(1.0, self.avg_tok_per_s / 100.0) if self.avg_tok_per_s > 0 else 0.3
        cost_factor = max(0.01, self.cost_per_1k_output_tokens) if self.cost_per_1k_output_tokens > 0 else 1.0
        self.efficiency_score = (
            self.quality_score * speed * self.success_rate
        ) / cost_factor
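    # Two boundary cases of compute_efficiency(), worked with illustrative
    # numbers: a free/local model (cost 0) keeps cost_factor = 1.0, so a model
    # at quality 0.7, 120 tok/s (speed capped at 1.0), success rate 1.0 scores
    # 0.7 * 1.0 * 1.0 / 1.0 = 0.7. A model with no recorded speed falls back
    # to the neutral speed of 0.3 rather than scoring zero.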


@dataclass
class BenchmarkTask:
    """A benchmark task to evaluate model capability."""
    id: str
    prompt: str
    model_type: str = 'llm'
    expected_keywords: List[str] = field(default_factory=list)
    max_tokens: int = 256
    category: str = 'general'  # general, reasoning, coding, creative, instruction_following


# Built-in benchmark tasks (lightweight — one request each)
_BENCHMARK_TASKS = [
    BenchmarkTask(
        id='general_1',
        prompt='Explain quantum computing in 3 sentences.',
        expected_keywords=['qubit', 'superposition', 'quantum'],
        category='general',
    ),
    BenchmarkTask(
        id='reasoning_1',
        prompt='If all roses are flowers and some flowers fade quickly, can we conclude all roses fade quickly? Explain.',
        expected_keywords=['no', 'some', 'logic', 'conclude'],
        category='reasoning',
    ),
    BenchmarkTask(
        id='coding_1',
        prompt='Write a Python function that checks if a string is a palindrome. Return only the function.',
        expected_keywords=['def', 'return', 'reverse', '[::-1]'],
        category='coding',
        max_tokens=200,
    ),
    BenchmarkTask(
        id='creative_1',
        prompt='Write a haiku about artificial intelligence.',
        expected_keywords=[],
        category='creative',
        max_tokens=100,
    ),
    BenchmarkTask(
        id='instruction_1',
        prompt='List exactly 5 prime numbers between 10 and 50. Output only the numbers, comma-separated.',
        expected_keywords=['11', '13', '17', '19', '23', '29', '31', '37', '41', '43', '47'],
        category='instruction_following',
        max_tokens=50,
    ),
]


class EfficiencyMatrix:
    """Continuous benchmarking system for provider/model selection."""

    def __init__(self, matrix_path: Optional[str] = None):
        try:
            from core.platform_paths import get_db_dir
            data_dir = Path(get_db_dir())
        except ImportError:
            data_dir = Path.home() / 'Documents' / 'Nunba' / 'data'
        data_dir.mkdir(parents=True, exist_ok=True)

        self._path = Path(matrix_path) if matrix_path else data_dir / 'efficiency_matrix.json'
        self._benchmarks: Dict[str, ModelBenchmark] = {}  # key = "provider_id:model_id"
        self._lock = threading.Lock()
        self._load()

    def _key(self, provider_id: str, model_id: str) -> str:
        return f"{provider_id}:{model_id}"

    def _load(self):
        if self._path.exists():
            try:
                with open(self._path, 'r') as f:
                    data = json.load(f)
                # Tolerate schema drift: keep only fields the dataclass knows.
                known = {fn.name for fn in ModelBenchmark.__dataclass_fields__.values()}
                for k, v in data.items():
                    self._benchmarks[k] = ModelBenchmark(
                        **{fk: fv for fk, fv in v.items() if fk in known})
                logger.info("Efficiency matrix loaded: %d entries", len(self._benchmarks))
            except Exception as e:
                logger.warning("Failed to load efficiency matrix: %s", e)

    def save(self):
        # Snapshot under the lock, write outside it, so file I/O never blocks
        # callers and save() can be invoked from paths that also take the lock.
        with self._lock:
            data = {k: asdict(v) for k, v in self._benchmarks.items()}
        try:
            self._path.parent.mkdir(parents=True, exist_ok=True)
            with open(self._path, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            logger.error("Failed to save efficiency matrix: %s", e)
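    # Shape of one entry in efficiency_matrix.json, abbreviated and with
    # illustrative values (keys mirror the ModelBenchmark fields via asdict):
    #
    #   "openai:gpt-4o-mini": {
    #     "provider_id": "openai", "model_id": "gpt-4o-mini",
    #     "avg_tok_per_s": 85.2, "quality_score": 0.74,
    #     "success_rate": 0.99, "cost_per_1k_output_tokens": 0.6,
    #     "efficiency_score": 1.040, ...
    #   }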

    # ── Live updates (called by gateway after each request) ───────────

    def record_request(self, provider_id: str, model_id: str,
                       model_type: str = 'llm',
                       tok_per_s: float = 0, ttft_ms: float = 0,
                       e2e_ms: float = 0, cost_usd: float = 0,
                       output_tokens: int = 0, success: bool = True):
        """Record a live request result into the matrix."""
        key = self._key(provider_id, model_id)
        alpha = 0.1  # EMA smoothing

        with self._lock:
            bm = self._benchmarks.get(key)
            if not bm:
                bm = ModelBenchmark(
                    provider_id=provider_id, model_id=model_id,
                    model_type=model_type,
                )
                self._benchmarks[key] = bm

            bm.total_requests += 1
            if not success:
                bm.failed_requests += 1
            bm.success_rate = 1.0 - (bm.failed_requests / max(1, bm.total_requests))

            if tok_per_s > 0:
                bm.avg_tok_per_s = (bm.avg_tok_per_s * (1 - alpha) + tok_per_s * alpha
                                    if bm.avg_tok_per_s > 0 else tok_per_s)
            if ttft_ms > 0:
                bm.avg_ttft_ms = (bm.avg_ttft_ms * (1 - alpha) + ttft_ms * alpha
                                  if bm.avg_ttft_ms > 0 else ttft_ms)
            if e2e_ms > 0:
                bm.avg_e2e_ms = (bm.avg_e2e_ms * (1 - alpha) + e2e_ms * alpha
                                 if bm.avg_e2e_ms > 0 else e2e_ms)
            if cost_usd > 0:
                bm.avg_cost_per_request = (bm.avg_cost_per_request * (1 - alpha) + cost_usd * alpha
                                           if bm.avg_cost_per_request > 0 else cost_usd)
            if output_tokens > 0 and cost_usd > 0:
                cpt = (cost_usd / output_tokens) * 1000
                bm.cost_per_1k_output_tokens = (
                    bm.cost_per_1k_output_tokens * (1 - alpha) + cpt * alpha
                    if bm.cost_per_1k_output_tokens > 0 else cpt)

            bm.sample_count += 1
            bm.last_live_update = time.time()
            bm.compute_efficiency()

        # Periodic save (every 10 requests). Done outside the lock because
        # save() acquires the same non-reentrant lock.
        if bm.total_requests % 10 == 0:
            self.save()
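    # The EMA update above, worked with illustrative numbers: at alpha = 0.1,
    # a running avg_tok_per_s of 50.0 and a new sample of 80.0 give
    #   50.0 * 0.9 + 80.0 * 0.1 = 53.0
    # so a single outlier moves the average by a tenth of the gap, and recent
    # traffic dominates the estimate without storing per-request history.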

    # ── Benchmarking (run during idle via ResourceGovernor) ───────────

    def run_benchmark(self, provider_id: str = '', model_type: str = 'llm'):
        """Run a lightweight benchmark against one or all providers.

        Called by ResourceGovernor during idle time.
        """
        from integrations.providers.registry import get_registry

        registry = get_registry()
        providers = ([registry.get(provider_id)] if provider_id
                     else registry.list_api_providers())

        for provider in providers:
            if not provider or not provider.has_api_key():
                continue

            for pm in provider.models.values():
                if pm.model_type != model_type or not pm.enabled:
                    continue

                # Skip if recently benchmarked (within 1 hour)
                key = self._key(provider.id, pm.model_id)
                bm = self._benchmarks.get(key)
                if bm and (time.time() - bm.last_benchmark) < 3600:
                    continue

                logger.info("Benchmarking %s on %s...", pm.model_id, provider.id)
                self._benchmark_model(provider, pm)

    def _benchmark_model(self, provider, provider_model):
        """Run benchmark tasks against a specific model."""
        from integrations.providers.gateway import get_gateway

        gw = get_gateway()
        results = []

        for task in _BENCHMARK_TASKS:
            if task.model_type != provider_model.model_type:
                continue

            try:
                t0 = time.time()
                result = gw.generate(
                    task.prompt,
                    model_type=task.model_type,
                    provider_id=provider.id,
                    model_id=provider_model.model_id,
                    max_tokens=task.max_tokens,
                    temperature=0.3,  # Low temp for consistent benchmarks
                )
                elapsed_ms = (time.time() - t0) * 1000

                if result.success:
                    # Score quality by checking expected keywords
                    quality = self._score_quality(result.content, task)
                    results.append({
                        'task': task.id,
                        'tok_per_s': result.tok_per_s,
                        'e2e_ms': elapsed_ms,
                        'cost': result.cost_usd,
                        'quality': quality,
                        'output_tokens': result.usage.get('output_tokens', 0),
                        'success': True,
                    })
                else:
                    results.append({
                        'task': task.id, 'success': False,
                        'error': result.error,
                    })
            except Exception as e:
                logger.debug("Benchmark task %s failed: %s", task.id, e)
                results.append({'task': task.id, 'success': False, 'error': str(e)})

        # Aggregate results into benchmark entry
        if results:
            self._aggregate_benchmark(provider.id, provider_model.model_id,
                                      provider_model.model_type, results)

    def _score_quality(self, content: str, task: BenchmarkTask) -> float:
        """Score response quality (0-1) based on expected keywords and length."""
        if not content:
            return 0.0

        score = 0.3  # Base score for any non-empty response

        # Keyword matching
        if task.expected_keywords:
            content_lower = content.lower()
            matches = sum(1 for kw in task.expected_keywords
                          if kw.lower() in content_lower)
            keyword_score = matches / len(task.expected_keywords)
            score += keyword_score * 0.5

        # Length appropriateness (penalize very short or very long)
        words = len(content.split())
        if 10 <= words <= task.max_tokens:
            score += 0.2
        elif words >= 5:
            score += 0.1

        return min(1.0, score)
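    # Worked scoring example with an assumed response: for coding_1
    # (keywords ['def', 'return', 'reverse', '[::-1]']), a 30-word reply
    # containing 'def', 'return', and '[::-1]' matches 3 of 4 keywords and
    # earns the length bonus, so
    #   score = 0.3 + (3 / 4) * 0.5 + 0.2 = 0.875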

    def _aggregate_benchmark(self, provider_id, model_id, model_type, results):
        """Aggregate benchmark task results into a single ModelBenchmark."""
        key = self._key(provider_id, model_id)
        successes = [r for r in results if r.get('success')]

        with self._lock:
            bm = self._benchmarks.get(key)
            if not bm:
                bm = ModelBenchmark(
                    provider_id=provider_id, model_id=model_id,
                    model_type=model_type,
                )
                self._benchmarks[key] = bm

            if successes:
                bm.avg_tok_per_s = sum(r['tok_per_s'] for r in successes) / len(successes)
                bm.avg_e2e_ms = sum(r['e2e_ms'] for r in successes) / len(successes)
                bm.quality_score = sum(r['quality'] for r in successes) / len(successes)
                total_cost = sum(r['cost'] for r in successes)
                total_tokens = sum(r.get('output_tokens', 0) for r in successes)
                if total_tokens > 0 and total_cost > 0:
                    bm.cost_per_1k_output_tokens = (total_cost / total_tokens) * 1000
                bm.avg_cost_per_request = total_cost / len(successes)

            bm.success_rate = len(successes) / len(results) if results else 0
            bm.last_benchmark = time.time()
            bm.compute_efficiency()

        # Log and persist outside the lock (save() re-acquires it).
        logger.info("Benchmark complete: %s on %s — efficiency=%.3f, "
                    "tok/s=%.1f, quality=%.2f, success=%.0f%%",
                    model_id, provider_id, bm.efficiency_score,
                    bm.avg_tok_per_s, bm.quality_score,
                    bm.success_rate * 100)
        self.save()

    # ── Query API ─────────────────────────────────────────────────────

    def get_benchmark(self, provider_id: str, model_id: str) -> Optional[ModelBenchmark]:
        return self._benchmarks.get(self._key(provider_id, model_id))

    def get_leaderboard(self, model_type: str = 'llm',
                        sort_by: str = 'efficiency') -> List[ModelBenchmark]:
        """Return benchmarks sorted by efficiency, speed, quality, or cost."""
        entries = [bm for bm in self._benchmarks.values()
                   if bm.model_type == model_type and bm.total_requests > 0]

        key_map = {
            'efficiency': lambda b: b.efficiency_score,
            'speed': lambda b: b.avg_tok_per_s,
            'quality': lambda b: b.quality_score,
            'cost': lambda b: -b.cost_per_1k_output_tokens,  # Lower cost = better
            'reliability': lambda b: b.success_rate,
        }
        entries.sort(key=key_map.get(sort_by, key_map['efficiency']), reverse=True)
        return entries

    def get_matrix_summary(self) -> Dict[str, Any]:
        """Return a summary for dashboards."""
        return {
            'total_entries': len(self._benchmarks),
            'total_benchmark_requests': sum(
                bm.total_requests for bm in self._benchmarks.values()),
            'by_type': {
                mt: len([bm for bm in self._benchmarks.values()
                         if bm.model_type == mt])
                for mt in set(bm.model_type for bm in self._benchmarks.values())
            },
            'top_efficient': [
                {'provider': bm.provider_id, 'model': bm.model_id,
                 'efficiency': round(bm.efficiency_score, 3)}
                for bm in sorted(self._benchmarks.values(),
                                 key=lambda b: b.efficiency_score, reverse=True)[:5]
            ],
        }


# ═══════════════════════════════════════════════════════════════════════
# Singleton
# ═══════════════════════════════════════════════════════════════════════

_matrix: Optional[EfficiencyMatrix] = None
_matrix_lock = threading.Lock()


def get_matrix() -> EfficiencyMatrix:
    global _matrix
    if _matrix is None:
        with _matrix_lock:
            if _matrix is None:
                _matrix = EfficiencyMatrix()
    return _matrix
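
# A minimal usage sketch (illustrative provider/model names and numbers; in
# production the gateway feeds record_request() after every call):
#
#   matrix = get_matrix()
#   matrix.record_request('openai', 'gpt-4o-mini', tok_per_s=85.0,
#                         ttft_ms=320.0, e2e_ms=2400.0, cost_usd=0.0004,
#                         output_tokens=180, success=True)
#   for bm in matrix.get_leaderboard(sort_by='efficiency'):
#       print(bm.provider_id, bm.model_id, round(bm.efficiency_score, 3))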