Coverage for integrations / agent_engine / hive_consensus.py: 87.3%
110 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
HiveConsensus — the 4-of-4 democratic gate on every upgrade.

From the ml_intern brief §3.4:
    No single DemonstrationProbe can unilaterally declare its agent
    "best".  The verdict is a consensus across:
      - the probe's own measurement (numerical),
      - peer probes on OTHER nodes (federated),
      - the constitutional filter (check_prompt / check_goal),
      - the hive circuit breaker (is_halted → veto).
    If any of the four votes "no", the upgrade does not land.

This module is the ONLY path that may authorize a write to a seeded
agent's system prompt or weights.  Anywhere in HARTOS that wants to
change a trained agent (prompt edit, weight swap, policy promotion)
MUST call HiveConsensus.upgrade_proposal() first; a True return is
the precondition for the write.

Vote sources:
    local_probe       — most recent ProbeResult.agent_wins for this goal_type
    peer_probe_quorum — federated_aggregator reports at least 3 peer
                        probes for the same goal_type with agent_wins=True
                        (the vote is waived, i.e. passes with a note, when
                        running single-node or when fewer than 3 peers are
                        connected, since quorum is structurally unreachable)
    constitutional    — ConstitutionalFilter.check_prompt on the proposed
                        new content passes, AND check_code_change on any
                        protected-file diff passes
    circuit_breaker   — HiveCircuitBreaker.is_halted() is False

All four must return (passed=True, reason=...) for the proposal to be
approved.  The outcome plus the vote dict is written to
reasoning_trace.record_decision() so the audit log is complete.
"""
from __future__ import annotations

import logging
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional, Tuple

from . import reasoning_trace

# Module-level logger; the channel name is shared with the rest of the
# 'hevolve_social' logging hierarchy.
logger = logging.getLogger('hevolve_social')


# Minimum independent peer-probes that must agree our agent wins before
# the peer_probe_quorum vote can pass.  Matches the brief's "peer-
# probe quorum ≥ 3".
PEER_PROBE_QUORUM_MIN = 3


@dataclass
class Vote:
    """Outcome of a single vote source in one consensus round."""

    # Identifier of the vote source, e.g. 'circuit_breaker' or 'local_probe'.
    name: str
    # True when this source approves the proposal.
    passed: bool
    # Human-readable explanation of the outcome; empty when none was given.
    reason: str = ''

    def to_dict(self) -> Dict[str, Any]:
        """Return the vote as a plain ``dict`` of its three fields."""
        return asdict(self)


@dataclass
class ConsensusDecision:
    """Final verdict of a consensus round together with its evidence."""

    # True only when the proposal may proceed.
    approved: bool
    # Individual votes that produced the verdict, in evaluation order.
    votes: List['Vote'] = field(default_factory=list)
    # Description of what was voted on (ids, content preview, target files).
    subject: Dict[str, Any] = field(default_factory=dict)

    @property
    def reason(self) -> str:
        """Summarise the verdict as a single string.

        Returns:
            ``'no votes recorded'`` when the decision carries no votes,
            ``'all N votes passed'`` when every vote passed (N is the
            actual vote count, so the usual case reads
            ``'all 4 votes passed'``), otherwise the failing votes joined
            as ``'name: reason; name: reason'``.
        """
        if not self.votes:
            # An empty vote list must not be reported as a unanimous pass.
            return 'no votes recorded'
        failed = [v for v in self.votes if not v.passed]
        if not failed:
            return f'all {len(self.votes)} votes passed'
        return '; '.join(f'{v.name}: {v.reason}' for v in failed)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict form, including the derived ``reason``."""
        return {
            'approved': self.approved,
            'votes': [v.to_dict() for v in self.votes],
            'subject': self.subject,
            'reason': self.reason,
        }


class HiveConsensus:
    """4-of-4 gate. Stateless — use the module-level helpers.

    Every method is a classmethod and no instance state is read or
    written.  Each ``_vote_*`` helper returns a ``Vote`` and converts
    failures into a rejecting vote; the only pass-on-failure case is the
    federation-unavailable waiver in ``_vote_peer_probe_quorum``.
    """

    @classmethod
    def upgrade_proposal(
        cls,
        prompt_id: str,
        goal_type: str,
        new_content: str,
        probe_evidence: Optional[Dict[str, Any]] = None,
        target_files: Optional[List[str]] = None,
        user_id: Optional[str] = None,
    ) -> ConsensusDecision:
        """Evaluate an upgrade proposal against the 4 vote sources.

        Args:
            prompt_id: canonical persona identity of the seeded agent
                whose prompt/weights are changing (LOCAL_AGENTS /
                SEED_BOOTSTRAP_GOALS identity).  Per the ml_intern
                brief correlation-id contract, this is the carrier
                field — NOT an ad-hoc agent_id.  When a per-user LoRA
                overlay is involved, pass user_id as well; the
                underlying prompt/weight upgrade itself is still
                shared across users of the same prompt_id.
            goal_type: goal_type for peer-probe quorum lookup
            new_content: the proposed new system prompt or description
            probe_evidence: optional dict containing local probe
                verdict + baseline deltas; when omitted the local
                benchmark leaderboard entry ``goal:{goal_type}`` is
                consulted instead.
            target_files: list of files the upgrade intends to touch.
                Consulted by ConstitutionalFilter.check_code_change().
            user_id: optional per-user scope for LoRA overlays.  Does
                not affect voting; recorded in the reasoning trace
                for audit.

        Returns:
            ConsensusDecision with votes list + final approved bool.
            A failure inside any vote, or a failure to write the
            reasoning trace, results in a rejection with
            reason="unexpected: ..." so the system fails closed, not
            open.
        """
        subject = {
            'prompt_id': prompt_id,
            'goal_type': goal_type,
            'new_content_preview': (new_content or '')[:300],
            'target_files': list(target_files or []),
        }
        if user_id:
            subject['user_id'] = user_id

        votes: List[Vote] = [
            cls._vote_circuit_breaker(),
            cls._vote_constitutional(new_content, target_files or []),
            cls._vote_local_probe(goal_type, probe_evidence),
            cls._vote_peer_probe_quorum(goal_type),
        ]

        approved = all(v.passed for v in votes)
        decision = ConsensusDecision(
            approved=approved, votes=votes, subject=subject,
        )
        try:
            reasoning_trace.record_decision(
                action='upgrade_proposal',
                approved=approved,
                votes={v.name: v.to_dict() for v in votes},
                subject=subject,
                reason=decision.reason,
            )
        except Exception as exc:
            # An approval that never reached the audit log would break the
            # "audit log is complete" guarantee, so reject instead of
            # raising or silently approving.
            logger.exception(
                'upgrade_proposal: could not record decision for %s',
                prompt_id,
            )
            return ConsensusDecision(
                approved=False,
                votes=votes + [
                    Vote('audit_trace', False, f'unexpected: {exc}'),
                ],
                subject=subject,
            )
        return decision

    # ─── Individual votes ───

    @classmethod
    def _vote_circuit_breaker(cls) -> Vote:
        """Veto when the hive circuit breaker reports a halt.

        Passes only when ``HiveCircuitBreaker.is_halted()`` is falsy.
        A missing guardrails module or any other error is a rejection.
        """
        try:
            from security.hive_guardrails import HiveCircuitBreaker
            if HiveCircuitBreaker.is_halted():
                return Vote('circuit_breaker', False,
                            f'hive halted: {HiveCircuitBreaker.get_status()}')
            return Vote('circuit_breaker', True, 'not halted')
        except ImportError as exc:
            return Vote('circuit_breaker', False,
                        f'guardrails unavailable: {exc}')
        except Exception as exc:
            return Vote('circuit_breaker', False, f'unexpected: {exc}')

    @classmethod
    def _vote_constitutional(
        cls, new_content: str, target_files: List[str],
    ) -> Vote:
        """Run the proposal through ``ConstitutionalFilter``.

        ``check_prompt`` is applied to ``new_content`` (``None`` is
        treated as an empty string).  When ``target_files`` is non-empty,
        ``check_code_change`` is additionally called with an empty diff
        and the file list.  The first failing check decides the vote; a
        missing guardrails module or any other error is a rejection.
        """
        try:
            from security.hive_guardrails import ConstitutionalFilter
            passed_prompt, reason_prompt = ConstitutionalFilter.check_prompt(
                new_content or ''
            )
            if not passed_prompt:
                return Vote('constitutional', False, reason_prompt)
            if target_files:
                # Structural immutability gate: any proposal that touches
                # a PROTECTED_FILES entry is rejected regardless of content.
                passed_code, reason_code = ConstitutionalFilter.check_code_change(
                    diff='', target_files=target_files,
                )
                if not passed_code:
                    return Vote('constitutional', False, reason_code)
            return Vote('constitutional', True, 'ok')
        except ImportError as exc:
            return Vote('constitutional', False,
                        f'guardrails unavailable: {exc}')
        except Exception as exc:
            return Vote('constitutional', False, f'unexpected: {exc}')

    @classmethod
    def _vote_local_probe(
        cls,
        goal_type: str,
        probe_evidence: Optional[Dict[str, Any]],
    ) -> Vote:
        """Read the local benchmark leaderboard for this goal's benchmark.

        We DO NOT maintain a separate probe store — `_Leaderboard` in
        hive_benchmark_prover already holds per-benchmark best scores +
        improvement_history.  The caller can pass probe_evidence
        explicitly when they've run a one-off measurement; otherwise
        we read from the leaderboard using benchmark name convention
        `goal:{goal_type}` (set by the post-dispatch hook).

        Decision rules:
            * ``probe_evidence is None``: look up ``goal:{goal_type}`` on
              the leaderboard.  Pass when ``margin_vs_best > 0``, or,
              when no margin is available, when the score is >= 0.5.
            * ``probe_evidence`` given but empty: reject.
            * ``probe_evidence`` given: prefer ``margin_vs_best`` (> 0
              passes), then fall back to a truthy ``agent_wins``.

        Malformed values in either path produce a rejecting vote rather
        than an exception.
        """
        evidence = probe_evidence
        if evidence is None:
            try:
                from .hive_benchmark_prover import get_benchmark_prover
                prover = get_benchmark_prover()
                best = prover._leaderboard.get_best_scores() or {}
                comparisons = prover._leaderboard.compare_to_baselines() or {}
                bench_key = f'goal:{goal_type}'
                best_entry = best.get(bench_key)
                if not best_entry:
                    return Vote('local_probe', False,
                                f'no leaderboard entry for {bench_key}')
                comp = comparisons.get(bench_key) or {}
                margin = comp.get('margin_vs_best')
                if margin is None:
                    # No public baseline to compare against — accept
                    # on score alone when it's above the neutral 0.5.
                    # Read the score once so the comparison and the
                    # message agree, and a missing score cannot raise.
                    score = float(best_entry.get('score') or 0.0)
                    if score >= 0.5:
                        return Vote('local_probe', True,
                                    f'score={score:.4f} '
                                    '(no public baseline)')
                    return Vote('local_probe', False,
                                f'score={score:.4f} <0.5')
                if margin > 0:
                    return Vote('local_probe', True,
                                f'margin_vs_best={margin:.4f}')
                return Vote('local_probe', False,
                            f'margin_vs_best={margin:.4f} <=0')
            except Exception as exc:
                return Vote('local_probe', False,
                            f'leaderboard unavailable: {exc}')
        if not evidence:
            return Vote('local_probe', False,
                        f'no probe result for goal_type={goal_type}')
        # Legacy shape: explicit probe_evidence dict carrying agent_wins /
        # margin / delta.  Prefer margin_vs_best, fall back to agent_wins.
        # Caller-supplied values are untrusted, so conversion errors are
        # turned into a rejection instead of escaping upgrade_proposal().
        try:
            if 'margin_vs_best' in evidence:
                m = float(evidence['margin_vs_best'] or 0.0)
                if m > 0:
                    return Vote('local_probe', True, f'margin={m:.4f}')
                return Vote('local_probe', False, f'margin={m:.4f} <=0')
            if evidence.get('agent_wins'):
                score = float(evidence.get('score') or 0.0)
                return Vote('local_probe', True,
                            f'agent_wins score={score:.4f}')
            return Vote('local_probe', False,
                        f'evidence does not show a win: {evidence}')
        except Exception as exc:
            return Vote('local_probe', False, f'unexpected: {exc}')

    @classmethod
    def _vote_peer_probe_quorum(cls, goal_type: str) -> Vote:
        """Count peers that independently beat the baseline for this goal.

        Source: federated_aggregator's `_peer_deltas` — each delta
        carries `benchmark_results` (see federated_aggregator line
        405).  We count peers whose benchmark score for
        `goal:{goal_type}` beats their own baseline.  When running
        single-node (no peers), we pass with a note — the other three
        votes still must pass.

        A peer counts as agreeing when its score (``value``, else
        ``score``) is greater than both its ``baseline`` and 0.5.  The
        vote also passes when fewer than ``PEER_PROBE_QUORUM_MIN`` peers
        are connected, or when the federation module cannot be imported.
        Any other error is a rejection.
        """
        try:
            from .federated_aggregator import get_federated_aggregator
            agg = get_federated_aggregator()
            # Snapshot under the aggregator's lock, then evaluate the copy.
            with agg._lock:
                peer_deltas = dict(agg._peer_deltas)
            if not peer_deltas:
                return Vote('peer_probe_quorum', True,
                            'single-node / no peers connected')
            bench_key = f'goal:{goal_type}'
            agreeing = 0
            for _node_id, delta in peer_deltas.items():
                bench = (delta or {}).get('benchmark_results') or {}
                entry = bench.get(bench_key)
                if not isinstance(entry, dict):
                    continue
                score = float(entry.get('value') or entry.get('score') or 0.0)
                baseline = float(entry.get('baseline') or 0.0)
                if score > baseline and score > 0.5:
                    agreeing += 1
            if agreeing >= PEER_PROBE_QUORUM_MIN:
                return Vote('peer_probe_quorum', True,
                            f'{agreeing} peers agree')
            # Peers connected but few agree — still acceptable when
            # fewer than 3 peers exist at all (we can't reach quorum
            # structurally).  Fail only when peers exist and explicitly
            # disagree.
            if len(peer_deltas) < PEER_PROBE_QUORUM_MIN:
                return Vote('peer_probe_quorum', True,
                            f'{len(peer_deltas)} peers '
                            f'(<{PEER_PROBE_QUORUM_MIN}) — '
                            f'{agreeing} agree')
            return Vote('peer_probe_quorum', False,
                        f'only {agreeing}/{len(peer_deltas)} peers agree '
                        f'(need >={PEER_PROBE_QUORUM_MIN})')
        except ImportError as exc:
            # Deliberate waiver: a build without the federation module is
            # treated like single-node operation.
            return Vote('peer_probe_quorum', True,
                        f'federation unavailable: {exc}')
        except Exception as exc:
            return Vote('peer_probe_quorum', False, f'unexpected: {exc}')


def upgrade_proposal(
    prompt_id: str,
    goal_type: str,
    new_content: str,
    probe_evidence: Optional[Dict[str, Any]] = None,
    target_files: Optional[List[str]] = None,
    user_id: Optional[str] = None,
) -> ConsensusDecision:
    """Convenience wrapper — `HiveConsensus.upgrade_proposal(...)`.

    Forwards every argument unchanged, by keyword, and returns the
    resulting ConsensusDecision.
    """
    return HiveConsensus.upgrade_proposal(
        prompt_id=prompt_id,
        goal_type=goal_type,
        new_content=new_content,
        probe_evidence=probe_evidence,
        target_files=target_files,
        user_id=user_id,
    )