Coverage for integrations / ap2 / ap2_protocol.py: 0.0%

224 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

"""
AP2 (Agent Protocol 2) - Agentic Commerce Module

This module enables payment workflows for agents, allowing them to request,
authorize, and complete payment transactions as part of their task execution.

Features:
- Payment request creation and management
- Multi-agent payment coordination
- Payment gateway integration (Stripe, PayPal, etc.)
- Transaction ledger with audit trail
- Secure payment handling with PCI compliance patterns
- Integration with task_ledger for workflow tracking
- Integration with A2A for multi-agent coordination
"""

16 

import hashlib
import json
import logging
import os
import threading
import uuid
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime
from enum import Enum
from decimal import Decimal

27 

# Module-level logger shared by every class and helper in this file.
logger = logging.getLogger(__name__)

29 

30 

class PaymentStatus(str, Enum):
    """Payment transaction status lifecycle.

    Members are also ``str`` instances, so they compare equal to their raw
    string values and serialize naturally to JSON via ``.value``.
    """

    # Initial states
    PENDING = "pending"  # Payment request created
    AUTHORIZED = "authorized"  # Payment authorized but not captured

    # Processing states
    PROCESSING = "processing"  # Payment being processed
    APPROVAL_REQUIRED = "approval_required"  # Requires user/admin approval

    # Terminal states
    COMPLETED = "completed"  # Payment successfully completed
    FAILED = "failed"  # Payment failed
    CANCELLED = "cancelled"  # Payment cancelled
    REFUNDED = "refunded"  # Payment refunded
    EXPIRED = "expired"  # Payment authorization expired

47 

48 

class PaymentMethod(str, Enum):
    """Supported payment methods.

    Members are also ``str`` instances; look one up from its wire value with
    ``PaymentMethod("paypal")``.
    """

    CREDIT_CARD = "credit_card"
    DEBIT_CARD = "debit_card"
    BANK_TRANSFER = "bank_transfer"
    PAYPAL = "paypal"
    STRIPE = "stripe"
    CRYPTO = "cryptocurrency"
    INTERNAL_CREDITS = "internal_credits"  # For testing or internal workflows

58 

59 

class PaymentGateway(str, Enum):
    """Supported payment gateways.

    Members are also ``str`` instances, so they compare equal to their raw
    string values.
    """

    STRIPE = "stripe"
    PAYPAL = "paypal"
    SQUARE = "square"
    BRAINTREE = "braintree"
    MOCK = "mock"  # For testing

67 

68 

class PaymentRequest:
    """Represents a payment request from an agent.

    Holds the immutable request details (amount, currency, requester) together
    with mutable lifecycle state (status, timestamps, gateway bookkeeping and
    the approval chain).
    """

    # Statuses listed as terminal in PaymentStatus; reaching one of them
    # stamps ``completed_at``.
    _TERMINAL_STATUSES = frozenset({
        PaymentStatus.COMPLETED,
        PaymentStatus.FAILED,
        PaymentStatus.CANCELLED,
        PaymentStatus.REFUNDED,
        PaymentStatus.EXPIRED,
    })

    def __init__(
        self,
        amount: Decimal,
        currency: str,
        description: str,
        requester_agent_id: str,
        payment_method: PaymentMethod = PaymentMethod.INTERNAL_CREDITS,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """
        Initialize a payment request

        Args:
            amount: Payment amount
            currency: Currency code (USD, EUR, etc.); stored upper-cased
            description: Human-readable description
            requester_agent_id: Agent requesting payment
            payment_method: Payment method to use
            metadata: Additional payment metadata
        """
        self.payment_id = str(uuid.uuid4())
        # Going through str() avoids importing binary float noise when a
        # float is passed instead of a Decimal.
        self.amount = Decimal(str(amount))
        self.currency = currency.upper()
        self.description = description
        self.requester_agent_id = requester_agent_id
        self.payment_method = payment_method
        self.metadata = metadata or {}

        # One clock reading so created_at and updated_at start out identical.
        now = datetime.now()
        self.status = PaymentStatus.PENDING
        self.created_at = now
        self.updated_at = now
        self.completed_at = None

        self.gateway = None
        self.gateway_transaction_id = None
        self.approval_chain = []  # Track who approved
        self.error_message = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert payment request to a JSON-serializable dictionary.

        Returns:
            Dict with the amount as a string, enums as their values and
            timestamps as ISO-8601 strings (or None when unset).
        """
        return {
            'payment_id': self.payment_id,
            'amount': str(self.amount),
            'currency': self.currency,
            'description': self.description,
            'requester_agent_id': self.requester_agent_id,
            'payment_method': self.payment_method.value,
            'status': self.status.value,
            'created_at': self.created_at.isoformat(),
            'updated_at': self.updated_at.isoformat(),
            'completed_at': self.completed_at.isoformat() if self.completed_at else None,
            'gateway': self.gateway.value if self.gateway else None,
            'gateway_transaction_id': self.gateway_transaction_id,
            'approval_chain': self.approval_chain,
            'error_message': self.error_message,
            'metadata': self.metadata
        }

    def update_status(self, new_status: PaymentStatus, message: Optional[str] = None):
        """Update payment status.

        Args:
            new_status: Status to move the payment to
            message: Optional message; when given it is stored in
                ``error_message``
        """
        old_status = self.status
        # Single clock reading so updated_at and completed_at agree exactly.
        now = datetime.now()
        self.status = new_status
        self.updated_at = now

        if new_status in self._TERMINAL_STATUSES:
            self.completed_at = now

        if message:
            self.error_message = message

        logger.info(f"Payment {self.payment_id} status: {old_status} -> {new_status}")

144 

145 

class PaymentGatewayConnector:
    """Base class for payment gateway connectors.

    Subclasses override ``connect``, ``create_payment``, ``capture_payment``
    and ``refund_payment``; the base implementations only raise
    ``NotImplementedError``.
    """

    def __init__(self, gateway: PaymentGateway, api_key: Optional[str] = None, config: Optional[Dict] = None):
        """
        Initialize payment gateway connector

        Args:
            gateway: Payment gateway type
            api_key: Gateway API key
            config: Additional gateway configuration
        """
        self.gateway = gateway
        self.api_key = api_key
        self.config = config or {}
        self.connected = False

    def connect(self) -> bool:
        """Connect to payment gateway.

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError("Subclasses must implement connect()")

    def create_payment(self, payment_request: PaymentRequest) -> Dict[str, Any]:
        """Create a payment transaction.

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError("Subclasses must implement create_payment()")

    def capture_payment(self, payment_id: str, gateway_transaction_id: str) -> Dict[str, Any]:
        """Capture an authorized payment.

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError("Subclasses must implement capture_payment()")

    def refund_payment(self, payment_id: str, gateway_transaction_id: str, amount: Optional[Decimal] = None) -> Dict[str, Any]:
        """Refund a payment.

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError("Subclasses must implement refund_payment()")

178 

179 

class MockPaymentGateway(PaymentGatewayConnector):
    """Mock payment gateway for testing.

    Keeps every transaction in the in-memory ``transactions`` dict, keyed by
    a generated ``mock_txn_...`` id; nothing leaves the process.
    """

    def __init__(self, **kwargs):
        """Create the mock connector; ``kwargs`` are forwarded to the base class."""
        super().__init__(PaymentGateway.MOCK, **kwargs)
        self.transactions = {}

    def connect(self) -> bool:
        """Mock connection always succeeds"""
        self.connected = True
        logger.info("Connected to mock payment gateway")
        return True

    def create_payment(self, payment_request: PaymentRequest) -> Dict[str, Any]:
        """Create a mock payment transaction.

        Args:
            payment_request: Request to record as an authorized transaction

        Returns:
            Result dict with ``success``, ``transaction_id``, ``status`` and
            ``message`` keys.
        """
        transaction_id = f"mock_txn_{uuid.uuid4().hex[:12]}"

        self.transactions[transaction_id] = {
            'payment_id': payment_request.payment_id,
            'amount': str(payment_request.amount),
            'currency': payment_request.currency,
            'status': 'authorized',
            'created_at': datetime.now().isoformat()
        }

        # Log the amount with its own currency code rather than a fixed "$".
        logger.info(
            f"Mock payment created: {transaction_id} for "
            f"{payment_request.amount} {payment_request.currency}"
        )

        return {
            'success': True,
            'transaction_id': transaction_id,
            'status': 'authorized',
            'message': 'Mock payment authorized successfully'
        }

    def capture_payment(self, payment_id: str, gateway_transaction_id: str) -> Dict[str, Any]:
        """Capture a mock payment.

        Args:
            payment_id: Ledger payment id (not used by the mock)
            gateway_transaction_id: Id returned by ``create_payment``

        Returns:
            Success dict, or ``{'success': False, 'error': ...}`` when the
            transaction id is unknown.
        """
        transaction = self.transactions.get(gateway_transaction_id)
        if transaction is None:
            return {
                'success': False,
                'error': 'Transaction not found'
            }

        transaction['status'] = 'captured'
        logger.info(f"Mock payment captured: {gateway_transaction_id}")
        return {
            'success': True,
            'status': 'captured',
            'message': 'Mock payment captured successfully'
        }

    def refund_payment(self, payment_id: str, gateway_transaction_id: str, amount: Optional[Decimal] = None) -> Dict[str, Any]:
        """Refund a mock payment.

        Args:
            payment_id: Ledger payment id (not used by the mock)
            gateway_transaction_id: Id returned by ``create_payment``
            amount: Accepted for interface compatibility; the mock always
                marks the whole transaction as refunded

        Returns:
            Success dict, or ``{'success': False, 'error': ...}`` when the
            transaction id is unknown.
        """
        transaction = self.transactions.get(gateway_transaction_id)
        if transaction is None:
            return {
                'success': False,
                'error': 'Transaction not found'
            }

        transaction['status'] = 'refunded'
        logger.info(f"Mock payment refunded: {gateway_transaction_id}")
        return {
            'success': True,
            'status': 'refunded',
            'message': 'Mock payment refunded successfully'
        }

245 

246 

class PaymentLedger:
    """
    Payment transaction ledger with audit trail

    Keeps all payment requests in memory, keyed by payment id, and rewrites
    the JSON file at ``ledger_path`` after every state change. Mutating
    methods serialize access through ``self.lock``.
    """

    def __init__(self, ledger_path: str = "agent_data/payment_ledger.json"):
        """
        Initialize payment ledger

        Registers a MockPaymentGateway and then loads any existing ledger
        file from disk.

        Args:
            ledger_path: Path to persist payment ledger
        """
        self.ledger_path = ledger_path
        self.payments: Dict[str, PaymentRequest] = {}
        # Plain (non-reentrant) lock: save_ledger() deliberately does not
        # acquire it so it can be called while the lock is already held.
        self.lock = threading.Lock()
        self.gateways: Dict[PaymentGateway, PaymentGatewayConnector] = {}

        # Default to mock gateway for testing
        self.add_gateway(MockPaymentGateway())

        self.load_ledger()

    def add_gateway(self, gateway: PaymentGatewayConnector):
        """Register a payment gateway connector and connect it.

        Args:
            gateway: Connector to register under its ``gateway`` type; replaces
                any connector already registered for that type
        """
        with self.lock:
            self.gateways[gateway.gateway] = gateway
            gateway.connect()
            logger.info(f"Added payment gateway: {gateway.gateway.value}")

    def create_payment_request(
        self,
        amount: Decimal,
        currency: str,
        description: str,
        requester_agent_id: str,
        payment_method: PaymentMethod = PaymentMethod.INTERNAL_CREDITS,
        gateway: PaymentGateway = PaymentGateway.MOCK,
        metadata: Optional[Dict[str, Any]] = None
    ) -> PaymentRequest:
        """
        Create a new payment request

        Args:
            amount: Payment amount
            currency: Currency code
            description: Payment description
            requester_agent_id: Agent requesting payment
            payment_method: Payment method
            gateway: Payment gateway to use
            metadata: Additional metadata

        Returns:
            PaymentRequest object
        """
        with self.lock:
            payment = PaymentRequest(
                amount=amount,
                currency=currency,
                description=description,
                requester_agent_id=requester_agent_id,
                payment_method=payment_method,
                metadata=metadata
            )

            payment.gateway = gateway
            self.payments[payment.payment_id] = payment

            logger.info(
                f"Created payment request: {payment.payment_id} - "
                f"{payment.amount} {payment.currency}"
            )

            # Persist while still holding the lock so no other thread can
            # mutate self.payments while it is being serialized.
            self.save_ledger()
            return payment

    def authorize_payment(self, payment_id: str, approver_id: str) -> bool:
        """
        Authorize a payment request

        Args:
            payment_id: Payment ID to authorize
            approver_id: ID of the approver (user or agent)

        Returns:
            True if authorization successful; False when the payment is
            unknown or not in the pending state
        """
        with self.lock:
            payment = self.payments.get(payment_id)
            if payment is None:
                logger.error(f"Payment not found: {payment_id}")
                return False

            if payment.status != PaymentStatus.PENDING:
                logger.warning(f"Payment {payment_id} not in pending state: {payment.status}")
                return False

            # Add to approval chain
            payment.approval_chain.append({
                'approver_id': approver_id,
                'approved_at': datetime.now().isoformat(),
                'action': 'authorized'
            })

            payment.update_status(PaymentStatus.AUTHORIZED)

            logger.info(f"Payment {payment_id} authorized by {approver_id}")

            # Persist under the lock, consistent with process_payment().
            self.save_ledger()
            return True

    def process_payment(self, payment_id: str) -> Dict[str, Any]:
        """
        Process an authorized payment through the gateway

        Creates the transaction in the gateway and captures it immediately.
        The ledger is saved after every status change.

        Args:
            payment_id: Payment ID to process

        Returns:
            Processing result: a dict with a ``success`` flag and, on
            failure, an ``error`` message
        """
        with self.lock:
            payment = self.payments.get(payment_id)
            if payment is None:
                return {'success': False, 'error': 'Payment not found'}

            if payment.status != PaymentStatus.AUTHORIZED:
                return {
                    'success': False,
                    'error': f'Payment not authorized: {payment.status.value}'
                }

            # Get appropriate gateway
            gateway = self.gateways.get(payment.gateway)
            if not gateway:
                payment.update_status(PaymentStatus.FAILED, "Gateway not available")
                self.save_ledger()
                return {'success': False, 'error': 'Gateway not available'}

            # Record PROCESSING on disk before talking to the gateway.
            payment.update_status(PaymentStatus.PROCESSING)
            self.save_ledger()

            try:
                result = gateway.create_payment(payment)

                if not result.get('success'):
                    payment.update_status(
                        PaymentStatus.FAILED,
                        result.get('error', 'Gateway returned failure')
                    )
                    self.save_ledger()
                    return result

                payment.gateway_transaction_id = result.get('transaction_id')

                # Capture payment immediately for now
                capture_result = gateway.capture_payment(
                    payment.payment_id,
                    payment.gateway_transaction_id
                )

                if capture_result.get('success'):
                    payment.update_status(PaymentStatus.COMPLETED)
                    logger.info(f"Payment {payment_id} completed successfully")
                else:
                    payment.update_status(
                        PaymentStatus.FAILED,
                        capture_result.get('error', 'Capture failed')
                    )

                self.save_ledger()
                return capture_result

            except Exception as e:
                # Boundary handler: any gateway error marks the payment as
                # failed instead of leaving it stuck in PROCESSING.
                logger.error(f"Error processing payment {payment_id}: {e}")
                payment.update_status(PaymentStatus.FAILED, str(e))
                self.save_ledger()
                return {'success': False, 'error': str(e)}

    def get_payment(self, payment_id: str) -> Optional[PaymentRequest]:
        """Get payment request by ID, or None when it is unknown."""
        with self.lock:
            return self.payments.get(payment_id)

    def list_payments(
        self,
        agent_id: Optional[str] = None,
        status: Optional[PaymentStatus] = None
    ) -> List[PaymentRequest]:
        """
        List payment requests with optional filters

        Args:
            agent_id: Filter by agent ID
            status: Filter by status

        Returns:
            List of matching payment requests, newest first
        """
        with self.lock:
            results = list(self.payments.values())

            if agent_id:
                results = [p for p in results if p.requester_agent_id == agent_id]

            if status:
                results = [p for p in results if p.status == status]

            # Sort by creation time (newest first)
            results.sort(key=lambda p: p.created_at, reverse=True)

            return results

    def save_ledger(self):
        """Save payment ledger to disk.

        Best effort: failures are logged, never raised. Does not acquire
        ``self.lock``; internal callers invoke it while holding the lock.
        """
        try:
            # A bare file name has an empty dirname, and os.makedirs("")
            # raises, which previously prevented the ledger from being saved.
            directory = os.path.dirname(self.ledger_path)
            if directory:
                os.makedirs(directory, exist_ok=True)

            data = {
                'payments': {
                    pid: payment.to_dict()
                    for pid, payment in self.payments.items()
                },
                'last_updated': datetime.now().isoformat()
            }

            # Write to a sibling temp file and swap it in, so a failed dump
            # (e.g. non-serializable metadata) cannot truncate the ledger.
            tmp_path = f"{self.ledger_path}.tmp"
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)
            os.replace(tmp_path, self.ledger_path)

            logger.debug(f"Payment ledger saved to {self.ledger_path}")
        except Exception as e:
            logger.error(f"Failed to save payment ledger: {e}")

    @staticmethod
    def _payment_from_dict(pid: str, payment_data: Dict[str, Any]) -> PaymentRequest:
        """Rebuild one PaymentRequest from its ``to_dict()`` representation."""
        payment = PaymentRequest(
            amount=Decimal(payment_data['amount']),
            currency=payment_data['currency'],
            description=payment_data['description'],
            requester_agent_id=payment_data['requester_agent_id'],
            payment_method=PaymentMethod(payment_data['payment_method']),
            metadata=payment_data.get('metadata', {})
        )

        # Restore state (the constructor generated a fresh id and timestamps)
        payment.payment_id = pid
        payment.status = PaymentStatus(payment_data['status'])
        payment.created_at = datetime.fromisoformat(payment_data['created_at'])
        payment.updated_at = datetime.fromisoformat(payment_data['updated_at'])
        if payment_data.get('completed_at'):
            payment.completed_at = datetime.fromisoformat(payment_data['completed_at'])
        payment.gateway = PaymentGateway(payment_data['gateway']) if payment_data.get('gateway') else None
        payment.gateway_transaction_id = payment_data.get('gateway_transaction_id')
        payment.approval_chain = payment_data.get('approval_chain', [])
        payment.error_message = payment_data.get('error_message')
        return payment

    def load_ledger(self):
        """Load payment ledger from disk.

        Best effort: a missing file is a no-op and an unreadable file is
        logged. A malformed record is skipped with a warning so the remaining
        records still load.
        """
        try:
            if not os.path.exists(self.ledger_path):
                return
            with open(self.ledger_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except Exception as e:
            logger.warning(f"Could not load payment ledger: {e}")
            return

        records = data.get('payments', {}) if isinstance(data, dict) else None
        if not isinstance(records, dict):
            logger.warning("Could not load payment ledger: unexpected file structure")
            return

        # Reconstruct payment objects one at a time so a single bad record
        # does not abort the whole load.
        for pid, payment_data in records.items():
            try:
                self.payments[pid] = self._payment_from_dict(pid, payment_data)
            except Exception as e:
                logger.warning(f"Skipping malformed payment record {pid}: {e}")

        logger.info(f"Loaded {len(self.payments)} payments from ledger")

516 

517 

# Global payment ledger instance.
# NOTE: constructed at import time, so importing this module registers the
# mock gateway and reads the default ledger file if it exists.
payment_ledger = PaymentLedger()

520 

521 

def create_payment_request_function(agent_name: str) -> Callable:
    """
    Create a payment request function for an agent

    Args:
        agent_name: Name of the agent; recorded as the requester of every
            payment created through the returned function

    Returns:
        Function that can be registered with autogen
    """
    def request_payment(
        amount: float,
        currency: str,
        description: str,
        payment_method: str = "internal_credits"
    ) -> str:
        """
        Request a payment as part of agent workflow

        Args:
            amount: Payment amount
            currency: Currency code (USD, EUR, etc.)
            description: Payment description
            payment_method: Payment method (credit_card, paypal, etc.);
                unrecognized values fall back to internal_credits

        Returns:
            JSON string with payment details
        """
        try:
            method = PaymentMethod(payment_method.lower())
        except ValueError:
            # Keep the existing fallback, but leave a trace instead of
            # silently switching the payment method.
            logger.warning(
                f"Unknown payment method '{payment_method}'; "
                f"falling back to {PaymentMethod.INTERNAL_CREDITS.value}"
            )
            method = PaymentMethod.INTERNAL_CREDITS

        payment = payment_ledger.create_payment_request(
            # str() first so the Decimal holds the float's printed value,
            # not its binary expansion.
            amount=Decimal(str(amount)),
            currency=currency,
            description=description,
            requester_agent_id=agent_name,
            payment_method=method,
            gateway=PaymentGateway.MOCK
        )

        return json.dumps({
            'payment_id': payment.payment_id,
            'amount': str(payment.amount),
            'currency': payment.currency,
            'status': payment.status.value,
            'message': 'Payment request created. Awaiting authorization.'
        }, indent=2)

    return request_payment

573 

574 

def create_payment_authorization_function() -> Callable:
    """
    Create a payment authorization function

    Returns:
        Function that can be used to authorize payments
    """
    def authorize_payment(payment_id: str, approver_id: str = "system") -> str:
        """
        Authorize a payment request

        Args:
            payment_id: Payment ID to authorize
            approver_id: ID of the approver

        Returns:
            JSON string with ``success``, ``payment_id`` and either a
            ``message`` (success) or an ``error`` (failure)
        """
        success = payment_ledger.authorize_payment(payment_id, approver_id)

        response = {'success': success, 'payment_id': payment_id}
        if success:
            response['message'] = 'Payment authorized successfully'
        else:
            response['error'] = 'Authorization failed'

        return json.dumps(response, indent=2)

    return authorize_payment

609 

610 

def create_payment_processing_function() -> Callable:
    """
    Create a payment processing function

    Returns:
        Function that can be used to process payments
    """
    def process_payment(payment_id: str) -> str:
        """
        Process an authorized payment

        Args:
            payment_id: Payment ID to process

        Returns:
            The ledger's processing result, serialized as a JSON string
        """
        result = payment_ledger.process_payment(payment_id)
        return json.dumps(result, indent=2)

    return process_payment

632 

633 

def get_ap2_tools_for_autogen(agent_name: str) -> List[Dict[str, Any]]:
    """
    Get AP2 payment tools for autogen agent registration

    Args:
        agent_name: Name of the agent; bound into the request_payment tool as
            the requester

    Returns:
        List of tool definitions for autogen, each a dict with ``function``,
        ``name`` and ``description`` keys
    """
    return [
        {
            'function': create_payment_request_function(agent_name),
            'name': 'request_payment',
            'description': 'Request a payment transaction for services or resources. Returns payment_id for tracking.'
        },
        {
            'function': create_payment_authorization_function(),
            'name': 'authorize_payment',
            'description': 'Authorize a pending payment request. Requires payment_id.'
        },
        {
            'function': create_payment_processing_function(),
            'name': 'process_payment',
            'description': 'Process an authorized payment through the gateway. Requires payment_id.'
        }
    ]

661 

662 

# Convenience exports: the public API exposed by ``from <module> import *``.
__all__ = [
    'PaymentStatus', 'PaymentMethod', 'PaymentGateway',
    'PaymentRequest', 'PaymentLedger', 'PaymentGatewayConnector',
    'MockPaymentGateway', 'payment_ledger',
    'create_payment_request_function', 'create_payment_authorization_function',
    'create_payment_processing_function', 'get_ap2_tools_for_autogen'
]