Coverage for integrations / distributed_agent / verification_protocol.py: 15.8%
95 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Verification Protocol — multi-agent consensus for distributed task results.

Steps:
1. SUBMIT: Agent completes task, computes result hash
2. VERIFY: Other agents independently verify (hash match + optional LLM review)
3. CONSENSUS: If a majority agrees (>50%), the result is accepted. Otherwise the task goes back to PENDING.
4. BASELINE: After acceptance, a new snapshot is created.
"""
11import logging
12from datetime import datetime
13from typing import Any, Dict, List, Optional
15from agent_ledger.core import SmartLedger, TaskStatus
16from agent_ledger.verification import TaskVerification, TaskBaseline
18logger = logging.getLogger(__name__)
class VerificationProtocol:
    """Multi-step verification protocol for distributed task results.

    Flow, as implemented by the methods below:

    1. ``request_verification`` makes sure a completed task has a result hash
       and announces the request over the ledger's pub/sub channel if present.
    2. ``submit_verification`` records one agent's verdict (plus optional
       evidence) for the task.
    3. ``check_consensus`` tallies verdicts; once ``MIN_VERIFIERS`` verdicts
       exist it either snapshots a baseline (accepted) or calls
       ``handle_rejection`` (rejected).
    """

    # Minimum number of recorded verdicts before consensus is evaluated.
    MIN_VERIFIERS = 2
    CONSENSUS_THRESHOLD = 0.5  # >50% must agree

    def __init__(
        self,
        ledger: SmartLedger,
        verifier: Optional[TaskVerification] = None,
        baseline: Optional[TaskBaseline] = None,
    ):
        """Wire the protocol to a ledger and its verification helpers.

        Args:
            ledger: Ledger holding the tasks under verification.
            verifier: Verdict store; a fresh ``TaskVerification`` is created
                when omitted.
            baseline: Snapshot helper; a ``TaskBaseline`` bound to
                ``ledger.backend`` is created when omitted.
        """
        self._ledger = ledger
        # Compare against None explicitly so a caller-supplied collaborator
        # that happens to be falsy is not silently swapped for a default.
        self._verifier = verifier if verifier is not None else TaskVerification()
        self._baseline = (
            baseline if baseline is not None else TaskBaseline(ledger.backend)
        )

    def request_verification(self, task_id: str) -> Dict[str, Any]:
        """Create a verification request for a completed task.

        Args:
            task_id: Identifier of the task whose result should be verified.

        Returns:
            ``{"error": ...}`` when the task is missing or not completed.
            Otherwise a dict with ``task_id``, ``result_hash``,
            ``requested_at`` (ISO timestamp) and ``min_verifiers``.
        """
        task = self._ledger.get_task(task_id)
        if not task:
            return {"error": f"Task {task_id} not found"}

        if task.status != TaskStatus.COMPLETED:
            return {"error": f"Task {task_id} not completed (status={task.status})"}

        result_hash = task.context.get("result_hash")
        if not result_hash and task.result is not None:
            # Compute the hash once and persist it so later verdicts are
            # recorded against the same value.
            result_hash = TaskVerification.compute_result_hash(task.result)
            task.context["result_hash"] = result_hash
            self._ledger.save()

        request = {
            "task_id": task_id,
            "result_hash": result_hash,
            "requested_at": datetime.now().isoformat(),
            "min_verifiers": self.MIN_VERIFIERS,
        }

        # Publish via PubSub if available
        pubsub = getattr(self._ledger, "_pubsub", None)
        if pubsub:
            pubsub.publish_verification_request(task_id, result_hash or "")

        logger.info("Verification requested for task %s", task_id)
        return request

    def submit_verification(
        self,
        task_id: str,
        verifying_agent: str,
        verdict: bool,
        evidence: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Submit a verification verdict.

        Args:
            task_id: Task being verified.
            verifying_agent: Identifier of the agent casting the verdict.
            verdict: ``True`` if the agent agrees with the result.
            evidence: Optional supporting data; when given it is appended to
                ``task.context["verification_evidence"]`` and the ledger is
                saved.

        A missing task is logged as a warning and otherwise ignored.
        """
        task = self._ledger.get_task(task_id)
        if not task:
            logger.warning("Cannot verify task %s: not found", task_id)
            return

        result_hash = task.context.get("result_hash", "")
        self._verifier.record_verification(task_id, result_hash, verifying_agent, verdict)

        if evidence:
            # Store evidence in task context
            task.context.setdefault("verification_evidence", []).append({
                "agent": verifying_agent,
                "verdict": verdict,
                "evidence": evidence,
                "timestamp": datetime.now().isoformat(),
            })
            self._ledger.save()

        logger.info(
            "Verification submitted: task=%s agent=%s verdict=%s",
            task_id, verifying_agent, verdict,
        )

    def check_consensus(self, task_id: str) -> Dict[str, Any]:
        """Check whether consensus has been reached and act on the outcome.

        Consensus is "reached" once at least ``MIN_VERIFIERS`` verdicts are
        recorded; the result is "accepted" when strictly more than
        ``CONSENSUS_THRESHOLD`` of them agree (an exact tie is a rejection).

        Args:
            task_id: Task whose verdicts should be tallied.

        Returns:
            Dict with ``task_id``, ``consensus_reached``, ``accepted``
            (``None`` until consensus is reached), ``agreed``, ``total``,
            ``min_verifiers`` and ``threshold``. On rejection it also has
            ``action_taken``; on acceptance it also has ``baseline_snapshot``.

        Note:
            This method is not a pure query. Every call that finds consensus
            repeats the follow-up action (rejection handling, or snapshot plus
            owner notification); there is no guard against repeated calls.
        """
        status = self._verifier.get_verification_status(task_id)

        total = status.get("total", 0)
        agreed = status.get("agreed", 0)

        consensus_reached = total >= self.MIN_VERIFIERS
        accepted = total > 0 and agreed > total * self.CONSENSUS_THRESHOLD

        result = {
            "task_id": task_id,
            "consensus_reached": consensus_reached,
            "accepted": accepted if consensus_reached else None,
            "agreed": agreed,
            "total": total,
            "min_verifiers": self.MIN_VERIFIERS,
            "threshold": self.CONSENSUS_THRESHOLD,
        }

        if not consensus_reached:
            return result

        if accepted:
            # Auto-baseline if accepted
            snap_id = self._baseline.create_snapshot(self._ledger, label=f"verified_{task_id}")
            result["baseline_snapshot"] = snap_id
            self._notify_verification_accepted(task_id)
        else:
            # Auto-handle rejection if consensus reached and rejected
            self.handle_rejection(task_id, "Consensus rejected the result")
            result["action_taken"] = "task_reset_to_pending"

        return result

    def _notify_verification_accepted(self, task_id: str) -> None:
        """Notify the agent owner that their contribution was verified by consensus.

        Best-effort: any failure is logged as a warning and never propagates.
        Nothing is sent when the task is missing or has no ``claimed_by``
        entry in its context.
        """
        try:
            task = self._ledger.get_task(task_id)
            if not task:
                return
            agent_id = task.context.get("claimed_by")
            if not agent_id:
                return

            parent = self._ledger.get_task(task.parent_task_id) if task.parent_task_id else None
            objective = parent.context.get("objective", "a distributed goal") if parent else "a distributed goal"

            # Reuse the request-scoped session when running inside Flask;
            # otherwise open a session that this method must commit and close.
            try:
                from flask import g as flask_g
                db = flask_g.db
                owns_session = False
            except (RuntimeError, AttributeError):
                from integrations.social.models import get_db
                db = get_db()
                owns_session = True

            payload = None
            try:
                from integrations.social.services import NotificationService
                message = f'Your contribution to "{objective}" was verified by peer consensus!'
                notif = NotificationService.create(
                    db, agent_id, 'goal_verified',
                    source_user_id=None,
                    target_type='goal',
                    target_id=task.parent_task_id or task_id,
                    message=message,
                )
                if owns_session:
                    db.commit()
                # Serialize while the session is still open so the realtime
                # payload does not depend on reading the object after close.
                try:
                    payload = notif.to_dict()
                except Exception as exc:
                    logger.debug(
                        "Could not serialize notification for task %s: %s", task_id, exc
                    )
            finally:
                # Always release a session opened here, even if creating or
                # committing the notification failed.
                if owns_session:
                    db.close()

            if payload is not None:
                # The realtime push is optional; the stored notification above
                # is the source of truth, so failures are only logged.
                try:
                    from integrations.social.realtime import on_notification
                    on_notification(agent_id, payload)
                except Exception as exc:
                    logger.debug(
                        "Realtime notification push failed for task %s: %s", task_id, exc
                    )

            logger.info("Notified user %s: verification accepted for %s", agent_id, task_id)
        except Exception as e:
            logger.warning("Failed to notify verification consensus: %s", e)

    def handle_rejection(self, task_id: str, reason: str) -> None:
        """Handle a rejected result by rolling the task back and failing it.

        Args:
            task_id: Task whose result was rejected.
            reason: Explanation passed to the task's rollback/fail calls.

        A missing task is logged as a warning and otherwise ignored.
        """
        task = self._ledger.get_task(task_id)
        if not task:
            logger.warning("Cannot reject task %s: not found", task_id)
            return

        # Rollback completed task, then retry from PENDING if rollback succeeds
        if task.status == TaskStatus.COMPLETED:
            task.rollback(reason=reason)
        # For non-terminal states, fail the task so it can be retried
        if not task.is_terminal():
            task.fail(error=reason, reason="Verification rejected")

        # Persist the status change, as the other methods do after mutating a task.
        self._ledger.save()

        # NOTE(review): the message says "reset to PENDING", but this method
        # only calls rollback()/fail(); confirm those leave the task retryable.
        logger.info("Task %s rejected and reset to PENDING: %s", task_id, reason)