Coverage for integrations / coding_agent / benchmark_tracker.py: 97.7%

86 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Coding Agent Benchmark Tracker — SQLite-backed performance tracking. 

3 

4Records task completion time and success rate per tool, task type, and model. 

5Exports compact deltas for hive distributed learning via FederatedAggregator. 

6 

7DB location: agent_data/coding_benchmarks.db 

8""" 

import contextlib
import logging
import os
import sqlite3
import threading
import time
from typing import Dict, List, Optional, Tuple

15 

logger = logging.getLogger('hevolve.coding_agent')

# Default DB path: three directory levels above this file (the project root,
# per the module docstring), then agent_data/coding_benchmarks.db.
_DB_PATH = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
    'agent_data', 'coding_benchmarks.db'
)

# Minimum samples before a tool is considered "benchmarked" for a task type
MIN_SAMPLES = 5


class BenchmarkTracker:
    """SQLite-backed coding-agent benchmark tracker.

    Records per-task-type / per-tool completion times and success flags,
    answers "best tool" queries from local data, and exchanges compact
    deltas with the hive (export_learning_delta / import_hive_delta).

    Thread safety: every DB operation opens a short-lived connection while
    holding a single instance-wide lock. The shared module-level instance
    is obtained via get_benchmark_tracker().
    """

    def __init__(self, db_path: str = _DB_PATH):
        """Create the tracker and ensure the schema exists at *db_path*."""
        self._db_path = db_path
        self._lock = threading.Lock()
        self._init_db()

    def _init_db(self):
        """Create tables and indexes if they do not already exist (idempotent)."""
        with self._lock:
            # A fresh checkout may not have agent_data/ yet; create the
            # parent directory so sqlite3.connect() does not fail.
            parent = os.path.dirname(self._db_path)
            if parent:
                os.makedirs(parent, exist_ok=True)
            # closing() guarantees the connection is released even if a
            # CREATE statement raises.
            with contextlib.closing(sqlite3.connect(self._db_path)) as conn:
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS benchmarks (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        task_type TEXT NOT NULL,
                        tool_name TEXT NOT NULL,
                        model_name TEXT DEFAULT '',
                        user_id TEXT DEFAULT '',
                        completion_time_s REAL NOT NULL,
                        success INTEGER NOT NULL DEFAULT 0,
                        offloaded INTEGER NOT NULL DEFAULT 0,
                        timestamp REAL NOT NULL
                    )
                ''')
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS hive_routing (
                        task_type TEXT PRIMARY KEY,
                        best_tool TEXT NOT NULL,
                        success_rate REAL NOT NULL,
                        avg_time_s REAL NOT NULL,
                        sample_count INTEGER NOT NULL,
                        updated_at REAL NOT NULL
                    )
                ''')
                conn.execute('''
                    CREATE INDEX IF NOT EXISTS idx_benchmarks_task_tool
                    ON benchmarks(task_type, tool_name)
                ''')
                conn.commit()

    def record(self, task_type: str, tool_name: str, completion_time_s: float,
               success: bool, model_name: str = '', user_id: str = '',
               offloaded: bool = False):
        """Record one benchmark entry.

        Args:
            task_type: category of the task.
            tool_name: tool that executed the task.
            completion_time_s: wall-clock duration in seconds.
            success: whether the task succeeded (stored as 0/1).
            model_name: optional model identifier.
            user_id: optional user identifier.
            offloaded: whether the task was offloaded (stored as 0/1).
        """
        with self._lock:
            with contextlib.closing(sqlite3.connect(self._db_path)) as conn:
                conn.execute(
                    'INSERT INTO benchmarks '
                    '(task_type, tool_name, model_name, user_id, completion_time_s, '
                    ' success, offloaded, timestamp) '
                    'VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
                    (task_type, tool_name, model_name, user_id,
                     completion_time_s, int(success), int(offloaded), time.time())
                )
                conn.commit()

    def get_best_tool(self, task_type: str) -> Optional[Tuple[str, float, float]]:
        """Get the best tool for *task_type* based on local benchmarks.

        Returns (tool_name, success_rate, avg_time_s), or None when no tool
        has at least MIN_SAMPLES entries. Ties on success rate are broken
        by lower average completion time.
        """
        with self._lock:
            with contextlib.closing(sqlite3.connect(self._db_path)) as conn:
                row = conn.execute('''
                    SELECT tool_name,
                           AVG(success) as success_rate,
                           AVG(completion_time_s) as avg_time,
                           COUNT(*) as cnt
                    FROM benchmarks
                    WHERE task_type = ?
                    GROUP BY tool_name
                    HAVING cnt >= ?
                    ORDER BY success_rate DESC, avg_time ASC
                    LIMIT 1
                ''', (task_type, MIN_SAMPLES)).fetchone()
        return (row[0], row[1], row[2]) if row else None

    def get_hive_best_tool(self, task_type: str) -> Optional[Tuple[str, float, float]]:
        """Get the best tool for *task_type* from hive-aggregated intelligence.

        Returns (tool_name, success_rate, avg_time_s) or None — the same
        shape as get_best_tool() so callers can index consistently.
        """
        with self._lock:
            with contextlib.closing(sqlite3.connect(self._db_path)) as conn:
                row = conn.execute(
                    'SELECT best_tool, success_rate, avg_time_s FROM hive_routing WHERE task_type = ?',
                    (task_type,)
                ).fetchone()
        return (row[0], row[1], row[2]) if row else None

    def get_summary(self) -> Dict:
        """Dashboard summary: total count plus per-tool and per-task aggregates."""
        with self._lock:
            with contextlib.closing(sqlite3.connect(self._db_path)) as conn:
                total = conn.execute('SELECT COUNT(*) FROM benchmarks').fetchone()[0]
                by_tool = conn.execute('''
                    SELECT tool_name,
                           COUNT(*) as total,
                           AVG(success) as success_rate,
                           AVG(completion_time_s) as avg_time
                    FROM benchmarks
                    GROUP BY tool_name
                ''').fetchall()
                by_task = conn.execute('''
                    SELECT task_type,
                           tool_name,
                           COUNT(*) as total,
                           AVG(success) as success_rate,
                           AVG(completion_time_s) as avg_time
                    FROM benchmarks
                    GROUP BY task_type, tool_name
                    ORDER BY task_type, success_rate DESC
                ''').fetchall()

        return {
            'total_benchmarks': total,
            'by_tool': [
                {'tool': r[0], 'total': r[1],
                 'success_rate': round(r[2], 3), 'avg_time_s': round(r[3], 2)}
                for r in by_tool
            ],
            'by_task_type': [
                {'task_type': r[0], 'tool': r[1], 'total': r[2],
                 'success_rate': round(r[3], 3), 'avg_time_s': round(r[4], 2)}
                for r in by_task
            ],
        }

    # ─── Hive learning integration ───

    def export_learning_delta(self) -> Optional[Dict]:
        """Export benchmark stats as a compact delta for hive learning.

        Format: {task_type → {tool → {success_rate, avg_time_s, sample_count}}},
        wrapped as {'coding_benchmarks': ..., 'ts': ...}. Only (task_type,
        tool) pairs with at least MIN_SAMPLES entries are exported; returns
        None when nothing qualifies.
        """
        with self._lock:
            with contextlib.closing(sqlite3.connect(self._db_path)) as conn:
                rows = conn.execute('''
                    SELECT task_type, tool_name,
                           AVG(success) as sr, AVG(completion_time_s) as at,
                           COUNT(*) as cnt
                    FROM benchmarks
                    GROUP BY task_type, tool_name
                    HAVING cnt >= ?
                ''', (MIN_SAMPLES,)).fetchall()

        if not rows:
            return None

        delta = {}
        for task_type, tool, sr, avg_t, cnt in rows:
            delta.setdefault(task_type, {})[tool] = {
                'success_rate': round(sr, 3),
                'avg_time_s': round(avg_t, 2),
                'sample_count': cnt,
            }

        return {'coding_benchmarks': delta, 'ts': time.time()}

    def import_hive_delta(self, aggregated: Dict):
        """Apply hive-aggregated routing intelligence to the hive_routing table.

        For each task type, picks the peer tool with the highest success
        rate (ties broken by lower avg_time_s) and upserts it into
        hive_routing. Local benchmark rows are never modified, and
        get_best_tool() reads only local data, so local measurements keep
        priority over hive suggestions.
        """
        benchmarks = aggregated.get('coding_benchmarks', {})
        if not benchmarks:
            return

        with self._lock:
            with contextlib.closing(sqlite3.connect(self._db_path)) as conn:
                for task_type, tools in benchmarks.items():
                    if not tools:
                        continue
                    # Best tool across hive peers: highest success rate,
                    # then lowest average time.
                    tool_name, stats = max(
                        tools.items(),
                        key=lambda x: (x[1].get('success_rate', 0),
                                       -x[1].get('avg_time_s', 999)))
                    conn.execute('''
                        INSERT INTO hive_routing (task_type, best_tool, success_rate,
                                                  avg_time_s, sample_count, updated_at)
                        VALUES (?, ?, ?, ?, ?, ?)
                        ON CONFLICT(task_type) DO UPDATE SET
                            best_tool = excluded.best_tool,
                            success_rate = excluded.success_rate,
                            avg_time_s = excluded.avg_time_s,
                            sample_count = excluded.sample_count,
                            updated_at = excluded.updated_at
                    ''', (task_type, tool_name,
                          stats.get('success_rate', 0),
                          stats.get('avg_time_s', 0),
                          stats.get('sample_count', 0),
                          time.time()))
                conn.commit()
        # Lazy %-formatting keeps the f-string cost out of disabled log levels.
        logger.info("Imported hive routing delta for %d task types", len(benchmarks))

238 

239 

# ─── Module-level singleton ───
_tracker = None
_tracker_lock = threading.Lock()


def get_benchmark_tracker() -> BenchmarkTracker:
    """Return the process-wide BenchmarkTracker, creating it on first call.

    Uses double-checked locking: the common already-created path returns
    without taking the lock, and the lock serializes first-time creation.
    """
    global _tracker
    if _tracker is not None:
        return _tracker
    with _tracker_lock:
        # Re-check under the lock — another thread may have won the race
        # between our unlocked check and acquiring the lock.
        if _tracker is None:
            _tracker = BenchmarkTracker()
        return _tracker