Coverage for integrations / distributed_agent / verification_protocol.py: 15.8%

95 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

"""
Verification Protocol — multi-agent consensus for distributed task results.

Steps:
1. SUBMIT: An agent completes a task and computes a hash of its result.
2. VERIFY: Other agents independently verify it (hash match + optional LLM review).
3. CONSENSUS: Once at least MIN_VERIFIERS verdicts are recorded, the result is
   accepted if a strict majority (>50%) agree; otherwise the task is rolled
   back so it can be retried.
4. BASELINE: After acceptance, a new snapshot is created.
"""

10 

11import logging 

12from datetime import datetime 

13from typing import Any, Dict, List, Optional 

14 

15from agent_ledger.core import SmartLedger, TaskStatus 

16from agent_ledger.verification import TaskVerification, TaskBaseline 

17 

18logger = logging.getLogger(__name__) 

19 

20 

class VerificationProtocol:
    """Multi-step verification protocol for distributed task results.

    Typical flow: ``request_verification`` announces a completed task,
    peers record verdicts through ``submit_verification``, and
    ``check_consensus`` tallies them and applies the outcome (baseline
    snapshot + owner notification on acceptance, rollback on rejection).
    """

    # Minimum number of recorded verdicts before any decision is made.
    MIN_VERIFIERS = 2
    # Share of verdicts that must be strictly exceeded for acceptance.
    CONSENSUS_THRESHOLD = 0.5  # >50% must agree

    def __init__(
        self,
        ledger: SmartLedger,
        verifier: Optional[TaskVerification] = None,
        baseline: Optional[TaskBaseline] = None,
    ):
        """Bind the protocol to a ledger and its verification collaborators.

        Args:
            ledger: Ledger that owns the tasks being verified.
            verifier: Verdict store; a fresh ``TaskVerification`` is created
                when omitted.
            baseline: Snapshot manager; a ``TaskBaseline`` over
                ``ledger.backend`` is created when omitted.
        """
        self._ledger = ledger
        # Compare against None explicitly: a caller-supplied collaborator that
        # happens to be falsy (e.g. an empty container-like object) must not
        # be silently swapped for a default instance.
        self._verifier = verifier if verifier is not None else TaskVerification()
        self._baseline = baseline if baseline is not None else TaskBaseline(ledger.backend)

    def request_verification(self, task_id: str) -> Dict[str, Any]:
        """Create a verification request for a completed task.

        The result hash is taken from ``task.context["result_hash"]``; when it
        is missing and the task has a result, it is computed, stored in the
        task context and the ledger is saved.

        Args:
            task_id: Identifier of the task to verify.

        Returns:
            A dict with ``task_id``, ``result_hash``, ``requested_at`` and
            ``min_verifiers``; or ``{"error": ...}`` when the task does not
            exist or is not in the COMPLETED state.
        """
        task = self._ledger.get_task(task_id)
        if not task:
            return {"error": f"Task {task_id} not found"}

        if task.status != TaskStatus.COMPLETED:
            return {"error": f"Task {task_id} not completed (status={task.status})"}

        result_hash = task.context.get("result_hash")
        if not result_hash and task.result is not None:
            result_hash = TaskVerification.compute_result_hash(task.result)
            task.context["result_hash"] = result_hash
            self._ledger.save()

        request = {
            "task_id": task_id,
            "result_hash": result_hash,
            "requested_at": datetime.now().isoformat(),
            "min_verifiers": self.MIN_VERIFIERS,
        }

        # Publish via PubSub if the ledger has one attached.
        pubsub = getattr(self._ledger, "_pubsub", None)
        if pubsub:
            pubsub.publish_verification_request(task_id, result_hash or "")

        logger.info("Verification requested for task %s", task_id)
        return request

    def submit_verification(
        self,
        task_id: str,
        verifying_agent: str,
        verdict: bool,
        evidence: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Record one agent's verdict on a task result.

        Args:
            task_id: Identifier of the task being verified.
            verifying_agent: Identifier of the agent submitting the verdict.
            verdict: True if the agent agrees with the result.
            evidence: Optional supporting data; when given it is appended to
                ``task.context["verification_evidence"]`` and the ledger is
                saved.

        Unknown task ids are logged and ignored.
        """
        task = self._ledger.get_task(task_id)
        if not task:
            logger.warning("Cannot verify task %s: not found", task_id)
            return

        # Falls back to "" when no hash has been stored for the task yet.
        result_hash = task.context.get("result_hash", "")
        self._verifier.record_verification(task_id, result_hash, verifying_agent, verdict)

        if evidence:
            # Keep the evidence alongside the task so it travels with the ledger.
            task.context.setdefault("verification_evidence", []).append({
                "agent": verifying_agent,
                "verdict": verdict,
                "evidence": evidence,
                "timestamp": datetime.now().isoformat(),
            })
            self._ledger.save()

        logger.info(
            "Verification submitted: task=%s agent=%s verdict=%s",
            task_id, verifying_agent, verdict,
        )

    def check_consensus(self, task_id: str) -> Dict[str, Any]:
        """Tally recorded verdicts and apply the outcome once decided.

        A decision exists once at least ``MIN_VERIFIERS`` verdicts are
        recorded.  The result is accepted when the agreeing verdicts strictly
        exceed ``CONSENSUS_THRESHOLD`` of the total.

        Args:
            task_id: Identifier of the task to evaluate.

        Returns:
            A dict with ``task_id``, ``consensus_reached``, ``accepted``
            (``None`` until consensus is reached), ``agreed``, ``total``,
            ``min_verifiers`` and ``threshold``.  On rejection it also carries
            ``action_taken``; on acceptance, ``baseline_snapshot``.
        """
        status = self._verifier.get_verification_status(task_id)

        total = status.get("total", 0)
        agreed = status.get("agreed", 0)

        consensus_reached = total >= self.MIN_VERIFIERS
        accepted = total > 0 and agreed > total * self.CONSENSUS_THRESHOLD

        result = {
            "task_id": task_id,
            "consensus_reached": consensus_reached,
            "accepted": accepted if consensus_reached else None,
            "agreed": agreed,
            "total": total,
            "min_verifiers": self.MIN_VERIFIERS,
            "threshold": self.CONSENSUS_THRESHOLD,
        }

        if not consensus_reached:
            return result

        # NOTE(review): the outcome below is applied on every call that sees a
        # decided tally, so polling this method repeats the snapshot /
        # notification (or the rejection).  Confirm callers invoke it once per
        # decision, or that the verifier clears its tally afterwards.
        if accepted:
            # Accepted: freeze a new baseline and tell the task owner.
            snap_id = self._baseline.create_snapshot(self._ledger, label=f"verified_{task_id}")
            result["baseline_snapshot"] = snap_id
            self._notify_verification_accepted(task_id)
        else:
            # Rejected: undo the task so it can be attempted again.
            self.handle_rejection(task_id, "Consensus rejected the result")
            result["action_taken"] = "task_reset_to_pending"

        return result

    def _notify_verification_accepted(self, task_id: str) -> None:
        """Notify the agent owner that their contribution was verified by consensus.

        Best-effort: any failure is logged as a warning and never propagates
        to the caller.  Nothing is sent when the task is unknown or has no
        ``claimed_by`` entry in its context.
        """
        try:
            task = self._ledger.get_task(task_id)
            if not task:
                return
            agent_id = task.context.get("claimed_by")
            if not agent_id:
                return

            parent = self._ledger.get_task(task.parent_task_id) if task.parent_task_id else None
            objective = parent.context.get("objective", "a distributed goal") if parent else "a distributed goal"

            try:
                # Reuse the request-scoped session when one is available.
                from flask import g as flask_g
                db = flask_g.db
                owns_session = False
            except (ImportError, RuntimeError, AttributeError):
                # No Flask, no app context, or no session on ``g``: open our own.
                from integrations.social.models import get_db
                db = get_db()
                owns_session = True

            realtime_payload = None
            try:
                from integrations.social.services import NotificationService
                message = f'Your contribution to "{objective}" was verified by peer consensus!'
                notif = NotificationService.create(
                    db, agent_id, 'goal_verified',
                    source_user_id=None,
                    target_type='goal',
                    target_id=task.parent_task_id or task_id,
                    message=message,
                )

                if owns_session:
                    db.commit()

                try:
                    # Serialise while the session is still open; reading the
                    # object after close() may fail if it needs a reload.
                    realtime_payload = notif.to_dict()
                except Exception as exc:
                    logger.debug("Could not serialise notification for task %s: %s", task_id, exc)
            finally:
                # Always release a session we opened, even if creation failed.
                if owns_session:
                    db.close()

            if realtime_payload is not None:
                try:
                    from integrations.social.realtime import on_notification
                    on_notification(agent_id, realtime_payload)
                except Exception as exc:
                    # Realtime push is optional; the stored notification stands.
                    logger.debug("Realtime push failed for task %s: %s", task_id, exc)

            logger.info("Notified user %s: verification accepted for %s", agent_id, task_id)
        except Exception as e:
            logger.warning("Failed to notify verification consensus: %s", e)

    def handle_rejection(self, task_id: str, reason: str) -> None:
        """Handle a rejected result by rolling the task back for retry.

        A COMPLETED task is rolled back; afterwards, if the task is not in a
        terminal state, it is marked failed with the rejection reason.  The
        ledger is saved whenever the task was changed.  Unknown task ids are
        ignored.

        Args:
            task_id: Identifier of the rejected task.
            reason: Human-readable rejection reason passed to the task.
        """
        task = self._ledger.get_task(task_id)
        if not task:
            return

        changed = False

        # Rollback completed task, then retry from PENDING if rollback succeeds
        if task.status == TaskStatus.COMPLETED:
            task.rollback(reason=reason)
            changed = True
        # For non-terminal states, fail the task so it can be retried
        if not task.is_terminal():
            task.fail(error=reason, reason="Verification rejected")
            changed = True

        # Persist the state change, as the other mutating methods here do.
        # NOTE(review): assumes rollback()/fail() do not save on their own.
        if changed:
            self._ledger.save()

        logger.info("Task %s rejected and reset to PENDING: %s", task_id, reason)

197 logger.info(f"Task {task_id} rejected and reset to PENDING: {reason}")