Coverage for integrations/providers/efficiency_matrix.py: 71.3% (188 statements)

1"""
2EfficiencyMatrix — continuous benchmarking for optimal model selection.
4Tracks per-provider, per-model:
5 - Tokens per second (throughput)
6 - Time to first token (TTFT) latency
7 - End-to-end latency
8 - Quality score (from user feedback + automated eval)
9 - Reliability (success rate)
10 - Cost efficiency (quality per dollar)
12The matrix runs benchmarks during idle time (via ResourceGovernor)
13and also records live usage stats from every gateway call.
15Persisted at ~/Documents/Nunba/data/efficiency_matrix.json.
16Integrated with ResourceGovernor's proactive action stream.
18The key formula:
19 efficiency_score = (quality × speed × reliability) / cost
20 → Higher is better. Used by registry.find_best() for 'balanced' strategy.
21"""

import json
import logging
import os
import threading
import time
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class ModelBenchmark:
    """Benchmark results for a specific model on a specific provider."""
    provider_id: str
    model_id: str
    model_type: str = 'llm'

    # Throughput
    avg_tok_per_s: float = 0.0
    p50_tok_per_s: float = 0.0
    p95_tok_per_s: float = 0.0

    # Latency
    avg_ttft_ms: float = 0.0  # Time to first token
    avg_e2e_ms: float = 0.0   # End-to-end
    p95_e2e_ms: float = 0.0

    # Quality (0-1)
    quality_score: float = 0.5
    coherence: float = 0.5
    instruction_following: float = 0.5

    # Reliability
    success_rate: float = 1.0
    total_requests: int = 0
    failed_requests: int = 0

    # Cost
    avg_cost_per_request: float = 0.0
    cost_per_1k_output_tokens: float = 0.0

    # Computed
    efficiency_score: float = 0.0  # quality × speed × reliability / cost

    # Metadata
    last_benchmark: float = 0.0
    last_live_update: float = 0.0
    sample_count: int = 0

    def compute_efficiency(self):
        """Recalculate efficiency score."""
        speed = min(1.0, self.avg_tok_per_s / 100.0) if self.avg_tok_per_s > 0 else 0.3
        cost_factor = max(0.01, self.cost_per_1k_output_tokens) if self.cost_per_1k_output_tokens > 0 else 1.0
        self.efficiency_score = (
            self.quality_score * speed * self.success_rate
        ) / cost_factor
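
    # Worked example (illustrative numbers, not from any real benchmark run):
    # with quality_score=0.8, avg_tok_per_s=50 (so speed=0.5), success_rate=0.95
    # and cost_per_1k_output_tokens=0.6 (so cost_factor=0.6), the formula gives
    # efficiency_score = (0.8 * 0.5 * 0.95) / 0.6 ≈ 0.633.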


@dataclass
class BenchmarkTask:
    """A benchmark task to evaluate model capability."""
    id: str
    prompt: str
    model_type: str = 'llm'
    expected_keywords: List[str] = field(default_factory=list)
    max_tokens: int = 256
    category: str = 'general'  # general, reasoning, coding, creative


# Built-in benchmark tasks (lightweight — one request each)
_BENCHMARK_TASKS = [
    BenchmarkTask(
        id='general_1',
        prompt='Explain quantum computing in 3 sentences.',
        expected_keywords=['qubit', 'superposition', 'quantum'],
        category='general',
    ),
    BenchmarkTask(
        id='reasoning_1',
        prompt='If all roses are flowers and some flowers fade quickly, can we conclude all roses fade quickly? Explain.',
        expected_keywords=['no', 'some', 'logic', 'conclude'],
        category='reasoning',
    ),
    BenchmarkTask(
        id='coding_1',
        prompt='Write a Python function that checks if a string is a palindrome. Return only the function.',
        expected_keywords=['def', 'return', 'reverse', '[::-1]'],
        category='coding',
        max_tokens=200,
    ),
    BenchmarkTask(
        id='creative_1',
        prompt='Write a haiku about artificial intelligence.',
        expected_keywords=[],
        category='creative',
        max_tokens=100,
    ),
    BenchmarkTask(
        id='instruction_1',
        prompt='List exactly 5 prime numbers between 10 and 50. Output only the numbers, comma-separated.',
        expected_keywords=['11', '13', '17', '19', '23', '29', '31', '37', '41', '43', '47'],
        category='instruction_following',
        max_tokens=50,
    ),
]
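
# A caller could extend the built-in suite with a domain-specific task before
# triggering a run. Illustrative only ('summarize_1' is not shipped here):
#   _BENCHMARK_TASKS.append(BenchmarkTask(
#       id='summarize_1',
#       prompt='Summarize the following paragraph in one sentence: ...',
#       category='general',
#       max_tokens=80,
#   ))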


class EfficiencyMatrix:
    """Continuous benchmarking system for provider/model selection."""

    def __init__(self, matrix_path: Optional[str] = None):
        try:
            from core.platform_paths import get_db_dir
            data_dir = Path(get_db_dir())
        except ImportError:
            data_dir = Path.home() / 'Documents' / 'Nunba' / 'data'
        data_dir.mkdir(parents=True, exist_ok=True)

        self._path = Path(matrix_path) if matrix_path else data_dir / 'efficiency_matrix.json'
        self._benchmarks: Dict[str, ModelBenchmark] = {}  # key = "provider_id:model_id"
        self._lock = threading.Lock()
        self._load()

    def _key(self, provider_id: str, model_id: str) -> str:
        return f"{provider_id}:{model_id}"

    def _load(self):
        if self._path.exists():
            try:
                with open(self._path, 'r') as f:
                    data = json.load(f)
                known = {fn.name for fn in ModelBenchmark.__dataclass_fields__.values()}
                for k, v in data.items():
                    self._benchmarks[k] = ModelBenchmark(
                        **{fk: fv for fk, fv in v.items() if fk in known})
                logger.info("Efficiency matrix loaded: %d entries", len(self._benchmarks))
            except Exception as e:
                logger.warning("Failed to load efficiency matrix: %s", e)

    def save(self):
        with self._lock:
            data = {k: asdict(v) for k, v in self._benchmarks.items()}
        try:
            self._path.parent.mkdir(parents=True, exist_ok=True)
            with open(self._path, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            logger.error("Failed to save efficiency matrix: %s", e)

    # ── Live updates (called by gateway after each request) ───────────

    def record_request(self, provider_id: str, model_id: str,
                       model_type: str = 'llm',
                       tok_per_s: float = 0, ttft_ms: float = 0,
                       e2e_ms: float = 0, cost_usd: float = 0,
                       output_tokens: int = 0, success: bool = True):
        """Record a live request result into the matrix."""
        key = self._key(provider_id, model_id)
        alpha = 0.1  # EMA smoothing

        with self._lock:
            bm = self._benchmarks.get(key)
            if not bm:
                bm = ModelBenchmark(
                    provider_id=provider_id, model_id=model_id,
                    model_type=model_type,
                )
                self._benchmarks[key] = bm

            bm.total_requests += 1
            if not success:
                bm.failed_requests += 1
            bm.success_rate = 1.0 - (bm.failed_requests / max(1, bm.total_requests))

            if tok_per_s > 0:
                bm.avg_tok_per_s = (bm.avg_tok_per_s * (1 - alpha) + tok_per_s * alpha
                                    if bm.avg_tok_per_s > 0 else tok_per_s)
            if ttft_ms > 0:
                bm.avg_ttft_ms = (bm.avg_ttft_ms * (1 - alpha) + ttft_ms * alpha
                                  if bm.avg_ttft_ms > 0 else ttft_ms)
            if e2e_ms > 0:
                bm.avg_e2e_ms = (bm.avg_e2e_ms * (1 - alpha) + e2e_ms * alpha
                                 if bm.avg_e2e_ms > 0 else e2e_ms)
            if cost_usd > 0:
                bm.avg_cost_per_request = (bm.avg_cost_per_request * (1 - alpha) + cost_usd * alpha
                                           if bm.avg_cost_per_request > 0 else cost_usd)
            if output_tokens > 0 and cost_usd > 0:
                cpt = (cost_usd / output_tokens) * 1000
                bm.cost_per_1k_output_tokens = (
                    bm.cost_per_1k_output_tokens * (1 - alpha) + cpt * alpha
                    if bm.cost_per_1k_output_tokens > 0 else cpt)

            bm.sample_count += 1
            bm.last_live_update = time.time()
            bm.compute_efficiency()

        # Periodic save (every 10 requests), done outside the lock because
        # save() acquires it again.
        if bm.total_requests % 10 == 0:
            self.save()
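
    # Illustrative gateway hook (provider/model ids here are hypothetical,
    # actual ids come from the registry):
    #   get_matrix().record_request(
    #       'openai', 'gpt-4o-mini',
    #       tok_per_s=85.0, ttft_ms=320, e2e_ms=2100,
    #       cost_usd=0.0011, output_tokens=190, success=True,
    #   )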

    # ── Benchmarking (run during idle via ResourceGovernor) ───────────

    def run_benchmark(self, provider_id: str = '', model_type: str = 'llm'):
        """Run a lightweight benchmark against one or all providers.

        Called by ResourceGovernor during idle time.
        """
        from integrations.providers.registry import get_registry

        registry = get_registry()
        providers = ([registry.get(provider_id)] if provider_id
                     else registry.list_api_providers())

        for provider in providers:
            if not provider or not provider.has_api_key():
                continue

            for pm in provider.models.values():
                if pm.model_type != model_type or not pm.enabled:
                    continue

                # Skip if recently benchmarked (within 1 hour)
                key = self._key(provider.id, pm.model_id)
                bm = self._benchmarks.get(key)
                if bm and (time.time() - bm.last_benchmark) < 3600:
                    continue

                logger.info("Benchmarking %s on %s...", pm.model_id, provider.id)
                self._benchmark_model(provider, pm)

    def _benchmark_model(self, provider, provider_model):
        """Run benchmark tasks against a specific model."""
        from integrations.providers.gateway import get_gateway

        gw = get_gateway()
        results = []

        for task in _BENCHMARK_TASKS:
            if task.model_type != provider_model.model_type:
                continue

            try:
                t0 = time.time()
                result = gw.generate(
                    task.prompt,
                    model_type=task.model_type,
                    provider_id=provider.id,
                    model_id=provider_model.model_id,
                    max_tokens=task.max_tokens,
                    temperature=0.3,  # Low temp for consistent benchmarks
                )
                elapsed_ms = (time.time() - t0) * 1000

                if result.success:
                    # Score quality by checking expected keywords
                    quality = self._score_quality(result.content, task)
                    results.append({
                        'task': task.id,
                        'tok_per_s': result.tok_per_s,
                        'e2e_ms': elapsed_ms,
                        'cost': result.cost_usd,
                        'quality': quality,
                        'output_tokens': result.usage.get('output_tokens', 0),
                        'success': True,
                    })
                else:
                    results.append({
                        'task': task.id, 'success': False,
                        'error': result.error,
                    })
            except Exception as e:
                logger.debug("Benchmark task %s failed: %s", task.id, e)
                results.append({'task': task.id, 'success': False, 'error': str(e)})

        # Aggregate results into benchmark entry
        if results:
            self._aggregate_benchmark(provider.id, provider_model.model_id,
                                      provider_model.model_type, results)

    def _score_quality(self, content: str, task: BenchmarkTask) -> float:
        """Score response quality (0-1) based on expected keywords and length."""
        if not content:
            return 0.0

        score = 0.3  # Base score for any non-empty response

        # Keyword matching
        if task.expected_keywords:
            content_lower = content.lower()
            matches = sum(1 for kw in task.expected_keywords
                          if kw.lower() in content_lower)
            keyword_score = matches / len(task.expected_keywords)
            score += keyword_score * 0.5

        # Length appropriateness (penalize very short or very long)
        words = len(content.split())
        if 10 <= words <= task.max_tokens:
            score += 0.2
        elif words >= 5:
            score += 0.1

        return min(1.0, score)
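
    # Worked example (illustrative): for the 'reasoning_1' task (4 expected
    # keywords), a 40-word response containing 2 of them scores
    # 0.3 (base) + 0.5 * (2/4) (keywords) + 0.2 (length in range) = 0.75.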

    def _aggregate_benchmark(self, provider_id, model_id, model_type, results):
        """Aggregate benchmark task results into a single ModelBenchmark."""
        key = self._key(provider_id, model_id)
        successes = [r for r in results if r.get('success')]

        with self._lock:
            bm = self._benchmarks.get(key)
            if not bm:
                bm = ModelBenchmark(
                    provider_id=provider_id, model_id=model_id,
                    model_type=model_type,
                )
                self._benchmarks[key] = bm

            if successes:
                bm.avg_tok_per_s = sum(r['tok_per_s'] for r in successes) / len(successes)
                bm.avg_e2e_ms = sum(r['e2e_ms'] for r in successes) / len(successes)
                bm.quality_score = sum(r['quality'] for r in successes) / len(successes)
                total_cost = sum(r['cost'] for r in successes)
                total_tokens = sum(r.get('output_tokens', 0) for r in successes)
                if total_tokens > 0 and total_cost > 0:
                    bm.cost_per_1k_output_tokens = (total_cost / total_tokens) * 1000
                bm.avg_cost_per_request = total_cost / len(successes)

            bm.success_rate = len(successes) / len(results) if results else 0
            bm.last_benchmark = time.time()
            bm.compute_efficiency()

        logger.info("Benchmark complete: %s on %s — efficiency=%.3f, "
                    "tok/s=%.1f, quality=%.2f, success=%.0f%%",
                    model_id, provider_id, bm.efficiency_score,
                    bm.avg_tok_per_s, bm.quality_score,
                    bm.success_rate * 100)
        self.save()

    # ── Query API ─────────────────────────────────────────────────────

    def get_benchmark(self, provider_id: str, model_id: str) -> Optional[ModelBenchmark]:
        return self._benchmarks.get(self._key(provider_id, model_id))

    def get_leaderboard(self, model_type: str = 'llm',
                        sort_by: str = 'efficiency') -> List[ModelBenchmark]:
        """Return benchmarks sorted by efficiency, speed, quality, cost, or reliability."""
        entries = [bm for bm in self._benchmarks.values()
                   if bm.model_type == model_type and bm.total_requests > 0]

        key_map = {
            'efficiency': lambda b: b.efficiency_score,
            'speed': lambda b: b.avg_tok_per_s,
            'quality': lambda b: b.quality_score,
            'cost': lambda b: -b.cost_per_1k_output_tokens,  # Lower cost = better
            'reliability': lambda b: b.success_rate,
        }
        entries.sort(key=key_map.get(sort_by, key_map['efficiency']), reverse=True)
        return entries
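
    # Illustrative queries (assume the matrix already holds live entries;
    # unknown sort_by values fall back to 'efficiency'):
    #   fastest = get_matrix().get_leaderboard('llm', sort_by='speed')
    #   cheapest = get_matrix().get_leaderboard('llm', sort_by='cost')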

    def get_matrix_summary(self) -> Dict[str, Any]:
        """Return a summary for dashboards."""
        return {
            'total_entries': len(self._benchmarks),
            'total_benchmark_requests': sum(
                bm.total_requests for bm in self._benchmarks.values()),
            'by_type': {
                mt: len([bm for bm in self._benchmarks.values()
                         if bm.model_type == mt])
                for mt in set(bm.model_type for bm in self._benchmarks.values())
            },
            'top_efficient': [
                {'provider': bm.provider_id, 'model': bm.model_id,
                 'efficiency': round(bm.efficiency_score, 3)}
                for bm in sorted(self._benchmarks.values(),
                                 key=lambda b: b.efficiency_score, reverse=True)[:5]
            ],
        }


# ═══════════════════════════════════════════════════════════════════════
# Singleton
# ═══════════════════════════════════════════════════════════════════════

_matrix: Optional[EfficiencyMatrix] = None
_matrix_lock = threading.Lock()


def get_matrix() -> EfficiencyMatrix:
    global _matrix
    if _matrix is None:
        with _matrix_lock:
            if _matrix is None:
                _matrix = EfficiencyMatrix()
    return _matrix
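
# Illustrative end-to-end use (a sketch only; which providers exist depends on
# the registry's configuration):
#   matrix = get_matrix()
#   matrix.run_benchmark()                    # benchmark all enabled API providers
#   best = matrix.get_leaderboard('llm')[:3]  # top 3 by efficiency_score
#   summary = matrix.get_matrix_summary()     # dashboard payload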