Coverage for integrations / agent_engine / hive_consensus.py: 87.3%

110 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HiveConsensus — the 4-of-4 democratic gate on every upgrade. 

3 

4From the ml_intern brief §3.4: 

5 No single DemonstrationProbe can unilaterally declare its agent 

6 "best". The verdict is a consensus across: 

7 - the probe's own measurement (numerical), 

8 - peer probes on OTHER nodes (federated), 

9 - the constitutional filter (check_prompt / check_goal), 

10 - the hive circuit breaker (is_halted → veto). 

11 If any of the four votes "no", the upgrade does not land. 

12 

13This module is the ONLY path that may authorize a write to a seeded 

14agent's system prompt or weights. Anywhere in HARTOS that wants to 

15change a trained agent (prompt edit, weight swap, policy promotion) 

16MUST call HiveConsensus.upgrade_proposal() first; a True return is 

17the precondition for the write. 

18 

19Vote sources: 

20 local_probe — most recent ProbeResult.agent_wins for this goal_type 

21 peer_probe_quorum — federated_aggregator reports at least 3 peer 

22 probes for the same goal_type with agent_wins=True 

23 constitutional — ConstitutionalFilter.check_prompt on the proposed 

24 new content passes, AND check_code_change on any 

25 protected-file diff passes 

26 circuit_breaker — HiveCircuitBreaker.is_halted() is False 

27 

28All four must return (passed=True, reason=...) for the proposal to be 

29approved. The outcome plus the vote dict is written to 

30reasoning_trace.record_decision() so the audit log is complete. 

31""" 

32from __future__ import annotations 

33 

34import logging 

35from dataclasses import dataclass, field, asdict 

36from typing import Any, Dict, List, Optional, Tuple 

37 

38from . import reasoning_trace 

39 

40logger = logging.getLogger('hevolve_social') 

41 

42 

43# Minimum independent peer-probes that must agree our agent wins before 

44# the peer_probe_quorum vote can pass. Matches the brief's "peer- 

45# probe quorum ≥ 3". 

46PEER_PROBE_QUORUM_MIN = 3 

47 

48 

49@dataclass 

50class Vote: 

51 name: str 

52 passed: bool 

53 reason: str = '' 

54 

55 def to_dict(self) -> Dict[str, Any]: 

56 return asdict(self) 

57 

58 

59@dataclass 

60class ConsensusDecision: 

61 approved: bool 

62 votes: List[Vote] = field(default_factory=list) 

63 subject: Dict[str, Any] = field(default_factory=dict) 

64 

65 @property 

66 def reason(self) -> str: 

67 failed = [v for v in self.votes if not v.passed] 

68 if not failed: 

69 return 'all 4 votes passed' 

70 return '; '.join(f'{v.name}: {v.reason}' for v in failed) 

71 

72 def to_dict(self) -> Dict[str, Any]: 

73 return { 

74 'approved': self.approved, 

75 'votes': [v.to_dict() for v in self.votes], 

76 'subject': self.subject, 

77 'reason': self.reason, 

78 } 

79 

80 

81class HiveConsensus: 

82 """4-of-4 gate. Stateless — use the module-level helpers.""" 

83 

84 @classmethod 

85 def upgrade_proposal( 

86 cls, 

87 prompt_id: str, 

88 goal_type: str, 

89 new_content: str, 

90 probe_evidence: Optional[Dict[str, Any]] = None, 

91 target_files: Optional[List[str]] = None, 

92 user_id: Optional[str] = None, 

93 ) -> ConsensusDecision: 

94 """Evaluate an upgrade proposal against the 4 vote sources. 

95 

96 Args: 

97 prompt_id: canonical persona identity of the seeded agent 

98 whose prompt/weights are changing (LOCAL_AGENTS / 

99 SEED_BOOTSTRAP_GOALS identity). Per the ml_intern 

100 brief correlation-id contract, this is the carrier 

101 field — NOT an ad-hoc agent_id. When a per-user LoRA 

102 overlay is involved, pass user_id as well; the 

103 underlying prompt/weight upgrade itself is still 

104 shared across users of the same prompt_id. 

105 goal_type: goal_type for peer-probe quorum lookup 

106 new_content: the proposed new system prompt or description 

107 probe_evidence: optional dict containing local probe 

108 verdict + baseline deltas; when omitted we look it up 

109 from storage.get_latest_result(goal_type). 

110 target_files: list of files the upgrade intends to touch. 

111 Consulted by ConstitutionalFilter.check_code_change(). 

112 user_id: optional per-user scope for LoRA overlays. Does 

113 not affect voting; recorded in the reasoning trace 

114 for audit. 

115 

116 Returns: 

117 ConsensusDecision with votes list + final approved bool. 

118 NEVER raises. Any unexpected failure results in a 

119 rejection with reason="unexpected: ..." so the system 

120 fails closed, not open. 

121 """ 

122 subject = { 

123 'prompt_id': prompt_id, 

124 'goal_type': goal_type, 

125 'new_content_preview': (new_content or '')[:300], 

126 'target_files': list(target_files or []), 

127 } 

128 if user_id: 

129 subject['user_id'] = user_id 

130 

131 votes: List[Vote] = [] 

132 votes.append(cls._vote_circuit_breaker()) 

133 votes.append(cls._vote_constitutional(new_content, target_files or [])) 

134 votes.append(cls._vote_local_probe(goal_type, probe_evidence)) 

135 votes.append(cls._vote_peer_probe_quorum(goal_type)) 

136 

137 approved = all(v.passed for v in votes) 

138 decision = ConsensusDecision( 

139 approved=approved, votes=votes, subject=subject, 

140 ) 

141 reasoning_trace.record_decision( 

142 action='upgrade_proposal', 

143 approved=approved, 

144 votes={v.name: v.to_dict() for v in votes}, 

145 subject=subject, 

146 reason=decision.reason, 

147 ) 

148 return decision 

149 

150 # ─── Individual votes ─── 

151 

152 @classmethod 

153 def _vote_circuit_breaker(cls) -> Vote: 

154 try: 

155 from security.hive_guardrails import HiveCircuitBreaker 

156 if HiveCircuitBreaker.is_halted(): 

157 return Vote('circuit_breaker', False, 

158 f'hive halted: {HiveCircuitBreaker.get_status()}') 

159 return Vote('circuit_breaker', True, 'not halted') 

160 except ImportError as exc: 

161 return Vote('circuit_breaker', False, 

162 f'guardrails unavailable: {exc}') 

163 except Exception as exc: 

164 return Vote('circuit_breaker', False, f'unexpected: {exc}') 

165 

166 @classmethod 

167 def _vote_constitutional( 

168 cls, new_content: str, target_files: List[str], 

169 ) -> Vote: 

170 try: 

171 from security.hive_guardrails import ConstitutionalFilter 

172 passed_prompt, reason_prompt = ConstitutionalFilter.check_prompt( 

173 new_content or '' 

174 ) 

175 if not passed_prompt: 

176 return Vote('constitutional', False, reason_prompt) 

177 if target_files: 

178 # Structural immutability gate: any proposal that touches 

179 # a PROTECTED_FILES entry is rejected regardless of content. 

180 passed_code, reason_code = ConstitutionalFilter.check_code_change( 

181 diff='', target_files=target_files, 

182 ) 

183 if not passed_code: 

184 return Vote('constitutional', False, reason_code) 

185 return Vote('constitutional', True, 'ok') 

186 except ImportError as exc: 

187 return Vote('constitutional', False, 

188 f'guardrails unavailable: {exc}') 

189 except Exception as exc: 

190 return Vote('constitutional', False, f'unexpected: {exc}') 

191 

192 @classmethod 

193 def _vote_local_probe( 

194 cls, 

195 goal_type: str, 

196 probe_evidence: Optional[Dict[str, Any]], 

197 ) -> Vote: 

198 """Read the local benchmark leaderboard for this goal's benchmark. 

199 

200 We DO NOT maintain a separate probe store — `_Leaderboard` in 

201 hive_benchmark_prover already holds per-benchmark best scores + 

202 improvement_history. The caller can pass probe_evidence 

203 explicitly when they've run a one-off measurement; otherwise 

204 we read from the leaderboard using benchmark name convention 

205 `goal:{goal_type}` (set by the post-dispatch hook). 

206 """ 

207 evidence = probe_evidence 

208 if evidence is None: 

209 try: 

210 from .hive_benchmark_prover import get_benchmark_prover 

211 prover = get_benchmark_prover() 

212 best = prover._leaderboard.get_best_scores() or {} 

213 comparisons = prover._leaderboard.compare_to_baselines() or {} 

214 bench_key = f'goal:{goal_type}' 

215 best_entry = best.get(bench_key) 

216 if not best_entry: 

217 return Vote('local_probe', False, 

218 f'no leaderboard entry for {bench_key}') 

219 comp = comparisons.get(bench_key) or {} 

220 margin = comp.get('margin_vs_best') 

221 if margin is None: 

222 # No public baseline to compare against — accept 

223 # on score alone when it's above the neutral 0.5. 

224 if best_entry.get('score', 0.0) >= 0.5: 

225 return Vote('local_probe', True, 

226 f'score={best_entry["score"]:.4f} ' 

227 '(no public baseline)') 

228 return Vote('local_probe', False, 

229 f'score={best_entry["score"]:.4f} <0.5') 

230 if margin > 0: 

231 return Vote('local_probe', True, 

232 f'margin_vs_best={margin:.4f}') 

233 return Vote('local_probe', False, 

234 f'margin_vs_best={margin:.4f} <=0') 

235 except Exception as exc: 

236 return Vote('local_probe', False, 

237 f'leaderboard unavailable: {exc}') 

238 if not evidence: 

239 return Vote('local_probe', False, 

240 f'no probe result for goal_type={goal_type}') 

241 # Legacy shape: explicit probe_evidence dict carrying agent_wins / 

242 # margin / delta. Prefer margin_vs_best, fall back to agent_wins. 

243 if 'margin_vs_best' in evidence: 

244 m = float(evidence['margin_vs_best'] or 0.0) 

245 if m > 0: 

246 return Vote('local_probe', True, f'margin={m:.4f}') 

247 return Vote('local_probe', False, f'margin={m:.4f} <=0') 

248 if evidence.get('agent_wins'): 

249 return Vote('local_probe', True, 

250 f'agent_wins score={evidence.get("score", 0.0):.4f}') 

251 return Vote('local_probe', False, 

252 f'evidence does not show a win: {evidence}') 

253 

254 @classmethod 

255 def _vote_peer_probe_quorum(cls, goal_type: str) -> Vote: 

256 """Count peers that independently beat the baseline for this goal. 

257 

258 Source: federated_aggregator's `_peer_deltas` — each delta 

259 carries `benchmark_results` (see federated_aggregator line 

260 405). We count peers whose benchmark score for 

261 `goal:{goal_type}` beats their own baseline. When running 

262 single-node (no peers), we pass with a note — the other three 

263 votes still must pass. 

264 """ 

265 try: 

266 from .federated_aggregator import get_federated_aggregator 

267 agg = get_federated_aggregator() 

268 with agg._lock: 

269 peer_deltas = dict(agg._peer_deltas) 

270 if not peer_deltas: 

271 return Vote('peer_probe_quorum', True, 

272 'single-node / no peers connected') 

273 bench_key = f'goal:{goal_type}' 

274 agreeing = 0 

275 for _node_id, delta in peer_deltas.items(): 

276 bench = (delta or {}).get('benchmark_results') or {} 

277 entry = bench.get(bench_key) 

278 if not isinstance(entry, dict): 

279 continue 

280 score = float(entry.get('value') or entry.get('score') or 0.0) 

281 baseline = float(entry.get('baseline') or 0.0) 

282 if score > baseline and score > 0.5: 

283 agreeing += 1 

284 if agreeing >= PEER_PROBE_QUORUM_MIN: 

285 return Vote('peer_probe_quorum', True, 

286 f'{agreeing} peers agree') 

287 # Peers connected but few agree — still acceptable when 

288 # fewer than 3 peers exist at all (we can't reach quorum 

289 # structurally). Fail only when peers exist and explicitly 

290 # disagree. 

291 if len(peer_deltas) < PEER_PROBE_QUORUM_MIN: 

292 return Vote('peer_probe_quorum', True, 

293 f'{len(peer_deltas)} peers ' 

294 f'(<{PEER_PROBE_QUORUM_MIN}) — ' 

295 f'{agreeing} agree') 

296 return Vote('peer_probe_quorum', False, 

297 f'only {agreeing}/{len(peer_deltas)} peers agree ' 

298 f'(need >={PEER_PROBE_QUORUM_MIN})') 

299 except ImportError as exc: 

300 return Vote('peer_probe_quorum', True, 

301 f'federation unavailable: {exc}') 

302 except Exception as exc: 

303 return Vote('peer_probe_quorum', False, f'unexpected: {exc}') 

304 

305 

306def upgrade_proposal( 

307 prompt_id: str, 

308 goal_type: str, 

309 new_content: str, 

310 probe_evidence: Optional[Dict[str, Any]] = None, 

311 target_files: Optional[List[str]] = None, 

312 user_id: Optional[str] = None, 

313) -> ConsensusDecision: 

314 """Convenience wrapper — `HiveConsensus.upgrade_proposal(...)`.""" 

315 return HiveConsensus.upgrade_proposal( 

316 prompt_id=prompt_id, 

317 goal_type=goal_type, 

318 new_content=new_content, 

319 probe_evidence=probe_evidence, 

320 target_files=target_files, 

321 user_id=user_id, 

322 )