Coverage for integrations / coding_agent / benchmark_tracker.py: 97.7%
86 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Coding Agent Benchmark Tracker — SQLite-backed performance tracking.
4Records task completion time and success rate per tool, task type, and model.
5Exports compact deltas for hive distributed learning via FederatedAggregator.
7DB location: agent_data/coding_benchmarks.db
8"""
import contextlib
import logging
import os
import sqlite3
import threading
import time
from typing import Dict, List, Optional, Tuple
logger = logging.getLogger('hevolve.coding_agent')

# Default DB location: <repo_root>/agent_data/coding_benchmarks.db
# (three dirname() hops climb from integrations/coding_agent/ to the repo root)
_DB_PATH = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
    'agent_data', 'coding_benchmarks.db'
)

# Minimum samples before a tool is considered "benchmarked" for a task type
MIN_SAMPLES = 5


class BenchmarkTracker:
    """SQLite benchmark tracker — thread-safe singleton.

    Records per-(task_type, tool, model) completion time and success in the
    ``benchmarks`` table, and stores hive-aggregated routing hints in a
    separate ``hive_routing`` table. All DB access is serialized through a
    single lock, so one instance can be shared safely across threads.
    """

    def __init__(self, db_path: str = _DB_PATH):
        """Create the tracker, its DB directory, and the schema.

        Args:
            db_path: Path to the SQLite database file. The parent directory
                is created if missing, because sqlite3.connect() cannot
                create intermediate directories itself.
        """
        self._db_path = db_path
        self._lock = threading.Lock()
        # sqlite3.connect fails with "unable to open database file" when the
        # parent directory does not exist — create it up front.
        parent = os.path.dirname(os.path.abspath(db_path))
        if parent:
            os.makedirs(parent, exist_ok=True)
        self._init_db()

    def _connect(self) -> sqlite3.Connection:
        """Open a fresh connection; callers wrap it in contextlib.closing()
        so it is released even when an execute() raises."""
        return sqlite3.connect(self._db_path)

    def _init_db(self):
        """Create tables and indexes if absent (idempotent)."""
        with self._lock, contextlib.closing(self._connect()) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS benchmarks (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    task_type TEXT NOT NULL,
                    tool_name TEXT NOT NULL,
                    model_name TEXT DEFAULT '',
                    user_id TEXT DEFAULT '',
                    completion_time_s REAL NOT NULL,
                    success INTEGER NOT NULL DEFAULT 0,
                    offloaded INTEGER NOT NULL DEFAULT 0,
                    timestamp REAL NOT NULL
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS hive_routing (
                    task_type TEXT PRIMARY KEY,
                    best_tool TEXT NOT NULL,
                    success_rate REAL NOT NULL,
                    avg_time_s REAL NOT NULL,
                    sample_count INTEGER NOT NULL,
                    updated_at REAL NOT NULL
                )
            ''')
            conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_benchmarks_task_tool
                ON benchmarks(task_type, tool_name)
            ''')
            conn.commit()

    def record(self, task_type: str, tool_name: str, completion_time_s: float,
               success: bool, model_name: str = '', user_id: str = '',
               offloaded: bool = False):
        """Record one benchmark entry.

        Args:
            task_type: Category of the coding task (routing key).
            tool_name: Tool that executed the task.
            completion_time_s: Wall-clock duration in seconds.
            success: Whether the task succeeded (stored as 0/1).
            model_name: Optional model identifier.
            user_id: Optional user identifier.
            offloaded: Whether the task was offloaded (stored as 0/1).
        """
        with self._lock, contextlib.closing(self._connect()) as conn:
            conn.execute(
                'INSERT INTO benchmarks '
                '(task_type, tool_name, model_name, user_id, completion_time_s, '
                ' success, offloaded, timestamp) '
                'VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
                (task_type, tool_name, model_name, user_id,
                 completion_time_s, int(success), int(offloaded), time.time())
            )
            conn.commit()

    def get_best_tool(self, task_type: str) -> Optional[Tuple[str, float, float]]:
        """Get the best tool for a task type based on local benchmarks.

        Ranks tools by success rate (desc), then average time (asc); only
        tools with at least MIN_SAMPLES recorded runs are considered.

        Returns:
            (tool_name, success_rate, avg_time) or None if insufficient data.
        """
        with self._lock, contextlib.closing(self._connect()) as conn:
            rows = conn.execute('''
                SELECT tool_name,
                       AVG(success) as success_rate,
                       AVG(completion_time_s) as avg_time,
                       COUNT(*) as cnt
                FROM benchmarks
                WHERE task_type = ?
                GROUP BY tool_name
                HAVING cnt >= ?
                ORDER BY success_rate DESC, avg_time ASC
                LIMIT 1
            ''', (task_type, MIN_SAMPLES)).fetchall()

        if rows:
            return (rows[0][0], rows[0][1], rows[0][2])
        return None

    def get_hive_best_tool(self, task_type: str) -> Optional[Tuple[str, float, float]]:
        """Get the best tool from hive-aggregated intelligence.

        Returns:
            (tool_name, success_rate, avg_time_s) or None — same shape as
            get_best_tool() so callers can index consistently.
        """
        with self._lock, contextlib.closing(self._connect()) as conn:
            row = conn.execute(
                'SELECT best_tool, success_rate, avg_time_s FROM hive_routing WHERE task_type = ?',
                (task_type,)
            ).fetchone()
        return (row[0], row[1], row[2]) if row else None

    def get_summary(self) -> Dict:
        """Dashboard summary: total count plus per-tool and per-task stats."""
        with self._lock, contextlib.closing(self._connect()) as conn:
            total = conn.execute('SELECT COUNT(*) FROM benchmarks').fetchone()[0]
            by_tool = conn.execute('''
                SELECT tool_name,
                       COUNT(*) as total,
                       AVG(success) as success_rate,
                       AVG(completion_time_s) as avg_time
                FROM benchmarks
                GROUP BY tool_name
            ''').fetchall()
            by_task = conn.execute('''
                SELECT task_type,
                       tool_name,
                       COUNT(*) as total,
                       AVG(success) as success_rate,
                       AVG(completion_time_s) as avg_time
                FROM benchmarks
                GROUP BY task_type, tool_name
                ORDER BY task_type, success_rate DESC
            ''').fetchall()

        return {
            'total_benchmarks': total,
            'by_tool': [
                {'tool': r[0], 'total': r[1],
                 'success_rate': round(r[2], 3), 'avg_time_s': round(r[3], 2)}
                for r in by_tool
            ],
            'by_task_type': [
                {'task_type': r[0], 'tool': r[1], 'total': r[2],
                 'success_rate': round(r[3], 3), 'avg_time_s': round(r[4], 2)}
                for r in by_task
            ],
        }

    # ─── Hive learning integration ───

    def export_learning_delta(self) -> Optional[Dict]:
        """Export benchmark stats as a compact delta for hive learning.

        Returns:
            {'coding_benchmarks': {task_type: {tool: {success_rate,
            avg_time_s, sample_count}}}, 'ts': <epoch>} — or None when no
            (task_type, tool) pair has at least MIN_SAMPLES entries.
        """
        with self._lock, contextlib.closing(self._connect()) as conn:
            rows = conn.execute('''
                SELECT task_type, tool_name,
                       AVG(success) as sr, AVG(completion_time_s) as avg_t,
                       COUNT(*) as cnt
                FROM benchmarks
                GROUP BY task_type, tool_name
                HAVING cnt >= ?
            ''', (MIN_SAMPLES,)).fetchall()

        if not rows:
            return None

        delta: Dict = {}
        for task_type, tool, sr, avg_t, cnt in rows:
            delta.setdefault(task_type, {})[tool] = {
                'success_rate': round(sr, 3),
                'avg_time_s': round(avg_t, 2),
                'sample_count': cnt,
            }

        return {'coding_benchmarks': delta, 'ts': time.time()}

    def import_hive_delta(self, aggregated: Dict):
        """Apply hive-aggregated routing intelligence to the hive_routing table.

        For each task type, the best peer tool (highest success rate, ties
        broken by lower average time) is upserted unconditionally. Note that
        this does NOT blend or decay peer stats against local data — callers
        that want local-first routing should consult get_best_tool() before
        falling back to get_hive_best_tool().

        Args:
            aggregated: Dict whose 'coding_benchmarks' key maps
                {task_type: {tool: {success_rate, avg_time_s, sample_count}}}.
        """
        benchmarks = aggregated.get('coding_benchmarks', {})
        if not benchmarks:
            return

        with self._lock, contextlib.closing(self._connect()) as conn:
            for task_type, tools in benchmarks.items():
                if not tools:
                    continue
                # Best across hive peers: maximize success rate, then minimize
                # average time (hence the negated time in the sort key).
                best_tool, stats = max(
                    tools.items(),
                    key=lambda item: (item[1].get('success_rate', 0),
                                      -item[1].get('avg_time_s', 999)))
                conn.execute('''
                    INSERT INTO hive_routing (task_type, best_tool, success_rate,
                        avg_time_s, sample_count, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?)
                    ON CONFLICT(task_type) DO UPDATE SET
                        best_tool = excluded.best_tool,
                        success_rate = excluded.success_rate,
                        avg_time_s = excluded.avg_time_s,
                        sample_count = excluded.sample_count,
                        updated_at = excluded.updated_at
                ''', (task_type, best_tool,
                      stats.get('success_rate', 0),
                      stats.get('avg_time_s', 0),
                      stats.get('sample_count', 0),
                      time.time()))
            conn.commit()
        logger.info("Imported hive routing delta for %d task types", len(benchmarks))
# ─── Module-level singleton ───
_tracker = None
_tracker_lock = threading.Lock()


def get_benchmark_tracker() -> BenchmarkTracker:
    """Return the process-wide BenchmarkTracker, creating it on first use.

    Double-checked locking: the unlocked None test keeps the hot path cheap,
    and the re-check under the lock guarantees only one instance is built
    even when several threads race through the first check.
    """
    global _tracker
    if _tracker is not None:
        return _tracker
    with _tracker_lock:
        if _tracker is None:
            _tracker = BenchmarkTracker()
    return _tracker