Coverage for integrations / ap2 / ap2_protocol.py: 0.0%
224 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2AP2 (Agent Protocol 2) - Agentic Commerce Module
4This module enables payment workflows for agents, allowing them to request,
5authorize, and complete payment transactions as part of their task execution.
7Features:
8- Payment request creation and management
9- Multi-agent payment coordination
10- Payment gateway integration (Stripe, PayPal, etc.)
11- Transaction ledger with audit trail
12- Secure payment handling with PCI compliance patterns
13- Integration with task_ledger for workflow tracking
14- Integration with A2A for multi-agent coordination
15"""
17import json
18import logging
19import threading
20import uuid
21import os
22from typing import Dict, List, Any, Optional, Callable
23from datetime import datetime
24from enum import Enum
25from decimal import Decimal
26import hashlib
28logger = logging.getLogger(__name__)
31class PaymentStatus(str, Enum):
32 """Payment transaction status lifecycle"""
33 # Initial states
34 PENDING = "pending" # Payment request created
35 AUTHORIZED = "authorized" # Payment authorized but not captured
37 # Processing states
38 PROCESSING = "processing" # Payment being processed
39 APPROVAL_REQUIRED = "approval_required" # Requires user/admin approval
41 # Terminal states
42 COMPLETED = "completed" # Payment successfully completed
43 FAILED = "failed" # Payment failed
44 CANCELLED = "cancelled" # Payment cancelled
45 REFUNDED = "refunded" # Payment refunded
46 EXPIRED = "expired" # Payment authorization expired
49class PaymentMethod(str, Enum):
50 """Supported payment methods"""
51 CREDIT_CARD = "credit_card"
52 DEBIT_CARD = "debit_card"
53 BANK_TRANSFER = "bank_transfer"
54 PAYPAL = "paypal"
55 STRIPE = "stripe"
56 CRYPTO = "cryptocurrency"
57 INTERNAL_CREDITS = "internal_credits" # For testing or internal workflows
60class PaymentGateway(str, Enum):
61 """Supported payment gateways"""
62 STRIPE = "stripe"
63 PAYPAL = "paypal"
64 SQUARE = "square"
65 BRAINTREE = "braintree"
66 MOCK = "mock" # For testing
69class PaymentRequest:
70 """Represents a payment request from an agent"""
72 def __init__(
73 self,
74 amount: Decimal,
75 currency: str,
76 description: str,
77 requester_agent_id: str,
78 payment_method: PaymentMethod = PaymentMethod.INTERNAL_CREDITS,
79 metadata: Optional[Dict[str, Any]] = None
80 ):
81 """
82 Initialize a payment request
84 Args:
85 amount: Payment amount
86 currency: Currency code (USD, EUR, etc.)
87 description: Human-readable description
88 requester_agent_id: Agent requesting payment
89 payment_method: Payment method to use
90 metadata: Additional payment metadata
91 """
92 self.payment_id = str(uuid.uuid4())
93 self.amount = Decimal(str(amount))
94 self.currency = currency.upper()
95 self.description = description
96 self.requester_agent_id = requester_agent_id
97 self.payment_method = payment_method
98 self.metadata = metadata or {}
100 self.status = PaymentStatus.PENDING
101 self.created_at = datetime.now()
102 self.updated_at = datetime.now()
103 self.completed_at = None
105 self.gateway = None
106 self.gateway_transaction_id = None
107 self.approval_chain = [] # Track who approved
108 self.error_message = None
110 def to_dict(self) -> Dict[str, Any]:
111 """Convert payment request to dictionary"""
112 return {
113 'payment_id': self.payment_id,
114 'amount': str(self.amount),
115 'currency': self.currency,
116 'description': self.description,
117 'requester_agent_id': self.requester_agent_id,
118 'payment_method': self.payment_method.value,
119 'status': self.status.value,
120 'created_at': self.created_at.isoformat(),
121 'updated_at': self.updated_at.isoformat(),
122 'completed_at': self.completed_at.isoformat() if self.completed_at else None,
123 'gateway': self.gateway.value if self.gateway else None,
124 'gateway_transaction_id': self.gateway_transaction_id,
125 'approval_chain': self.approval_chain,
126 'error_message': self.error_message,
127 'metadata': self.metadata
128 }
130 def update_status(self, new_status: PaymentStatus, message: Optional[str] = None):
131 """Update payment status"""
132 old_status = self.status
133 self.status = new_status
134 self.updated_at = datetime.now()
136 if new_status in [PaymentStatus.COMPLETED, PaymentStatus.FAILED,
137 PaymentStatus.CANCELLED, PaymentStatus.REFUNDED]:
138 self.completed_at = datetime.now()
140 if message:
141 self.error_message = message
143 logger.info(f"Payment {self.payment_id} status: {old_status} -> {new_status}")
146class PaymentGatewayConnector:
147 """Base class for payment gateway connectors"""
149 def __init__(self, gateway: PaymentGateway, api_key: Optional[str] = None, config: Optional[Dict] = None):
150 """
151 Initialize payment gateway connector
153 Args:
154 gateway: Payment gateway type
155 api_key: Gateway API key
156 config: Additional gateway configuration
157 """
158 self.gateway = gateway
159 self.api_key = api_key
160 self.config = config or {}
161 self.connected = False
163 def connect(self) -> bool:
164 """Connect to payment gateway"""
165 raise NotImplementedError("Subclasses must implement connect()")
167 def create_payment(self, payment_request: PaymentRequest) -> Dict[str, Any]:
168 """Create a payment transaction"""
169 raise NotImplementedError("Subclasses must implement create_payment()")
171 def capture_payment(self, payment_id: str, gateway_transaction_id: str) -> Dict[str, Any]:
172 """Capture an authorized payment"""
173 raise NotImplementedError("Subclasses must implement capture_payment()")
175 def refund_payment(self, payment_id: str, gateway_transaction_id: str, amount: Optional[Decimal] = None) -> Dict[str, Any]:
176 """Refund a payment"""
177 raise NotImplementedError("Subclasses must implement refund_payment()")
180class MockPaymentGateway(PaymentGatewayConnector):
181 """Mock payment gateway for testing"""
183 def __init__(self, **kwargs):
184 super().__init__(PaymentGateway.MOCK, **kwargs)
185 self.transactions = {}
187 def connect(self) -> bool:
188 """Mock connection always succeeds"""
189 self.connected = True
190 logger.info("Connected to mock payment gateway")
191 return True
193 def create_payment(self, payment_request: PaymentRequest) -> Dict[str, Any]:
194 """Create a mock payment transaction"""
195 transaction_id = f"mock_txn_{uuid.uuid4().hex[:12]}"
197 self.transactions[transaction_id] = {
198 'payment_id': payment_request.payment_id,
199 'amount': str(payment_request.amount),
200 'currency': payment_request.currency,
201 'status': 'authorized',
202 'created_at': datetime.now().isoformat()
203 }
205 logger.info(f"Mock payment created: {transaction_id} for ${payment_request.amount}")
207 return {
208 'success': True,
209 'transaction_id': transaction_id,
210 'status': 'authorized',
211 'message': 'Mock payment authorized successfully'
212 }
214 def capture_payment(self, payment_id: str, gateway_transaction_id: str) -> Dict[str, Any]:
215 """Capture a mock payment"""
216 if gateway_transaction_id in self.transactions:
217 self.transactions[gateway_transaction_id]['status'] = 'captured'
218 logger.info(f"Mock payment captured: {gateway_transaction_id}")
219 return {
220 'success': True,
221 'status': 'captured',
222 'message': 'Mock payment captured successfully'
223 }
224 else:
225 return {
226 'success': False,
227 'error': 'Transaction not found'
228 }
230 def refund_payment(self, payment_id: str, gateway_transaction_id: str, amount: Optional[Decimal] = None) -> Dict[str, Any]:
231 """Refund a mock payment"""
232 if gateway_transaction_id in self.transactions:
233 self.transactions[gateway_transaction_id]['status'] = 'refunded'
234 logger.info(f"Mock payment refunded: {gateway_transaction_id}")
235 return {
236 'success': True,
237 'status': 'refunded',
238 'message': 'Mock payment refunded successfully'
239 }
240 else:
241 return {
242 'success': False,
243 'error': 'Transaction not found'
244 }
247class PaymentLedger:
248 """
249 Payment transaction ledger with audit trail
251 Integrates with task_ledger to track payment workflows
252 """
254 def __init__(self, ledger_path: str = "agent_data/payment_ledger.json"):
255 """
256 Initialize payment ledger
258 Args:
259 ledger_path: Path to persist payment ledger
260 """
261 self.ledger_path = ledger_path
262 self.payments: Dict[str, PaymentRequest] = {}
263 self.lock = threading.Lock()
264 self.gateways: Dict[PaymentGateway, PaymentGatewayConnector] = {}
266 # Default to mock gateway for testing
267 self.add_gateway(MockPaymentGateway())
269 self.load_ledger()
271 def add_gateway(self, gateway: PaymentGatewayConnector):
272 """Add a payment gateway connector"""
273 with self.lock:
274 self.gateways[gateway.gateway] = gateway
275 gateway.connect()
276 logger.info(f"Added payment gateway: {gateway.gateway.value}")
278 def create_payment_request(
279 self,
280 amount: Decimal,
281 currency: str,
282 description: str,
283 requester_agent_id: str,
284 payment_method: PaymentMethod = PaymentMethod.INTERNAL_CREDITS,
285 gateway: PaymentGateway = PaymentGateway.MOCK,
286 metadata: Optional[Dict[str, Any]] = None
287 ) -> PaymentRequest:
288 """
289 Create a new payment request
291 Args:
292 amount: Payment amount
293 currency: Currency code
294 description: Payment description
295 requester_agent_id: Agent requesting payment
296 payment_method: Payment method
297 gateway: Payment gateway to use
298 metadata: Additional metadata
300 Returns:
301 PaymentRequest object
302 """
303 with self.lock:
304 payment = PaymentRequest(
305 amount=amount,
306 currency=currency,
307 description=description,
308 requester_agent_id=requester_agent_id,
309 payment_method=payment_method,
310 metadata=metadata
311 )
313 payment.gateway = gateway
314 self.payments[payment.payment_id] = payment
316 logger.info(f"Created payment request: {payment.payment_id} - ${amount} {currency}")
318 self.save_ledger()
319 return payment
321 def authorize_payment(self, payment_id: str, approver_id: str) -> bool:
322 """
323 Authorize a payment request
325 Args:
326 payment_id: Payment ID to authorize
327 approver_id: ID of the approver (user or agent)
329 Returns:
330 True if authorization successful
331 """
332 with self.lock:
333 if payment_id not in self.payments:
334 logger.error(f"Payment not found: {payment_id}")
335 return False
337 payment = self.payments[payment_id]
339 if payment.status != PaymentStatus.PENDING:
340 logger.warning(f"Payment {payment_id} not in pending state: {payment.status}")
341 return False
343 # Add to approval chain
344 payment.approval_chain.append({
345 'approver_id': approver_id,
346 'approved_at': datetime.now().isoformat(),
347 'action': 'authorized'
348 })
350 payment.update_status(PaymentStatus.AUTHORIZED)
352 logger.info(f"Payment {payment_id} authorized by {approver_id}")
354 self.save_ledger()
355 return True
357 def process_payment(self, payment_id: str) -> Dict[str, Any]:
358 """
359 Process an authorized payment through the gateway
361 Args:
362 payment_id: Payment ID to process
364 Returns:
365 Processing result
366 """
367 with self.lock:
368 if payment_id not in self.payments:
369 return {'success': False, 'error': 'Payment not found'}
371 payment = self.payments[payment_id]
373 if payment.status != PaymentStatus.AUTHORIZED:
374 return {
375 'success': False,
376 'error': f'Payment not authorized: {payment.status.value}'
377 }
379 # Get appropriate gateway
380 gateway = self.gateways.get(payment.gateway)
381 if not gateway:
382 payment.update_status(PaymentStatus.FAILED, "Gateway not available")
383 self.save_ledger()
384 return {'success': False, 'error': 'Gateway not available'}
386 # Create payment in gateway
387 payment.update_status(PaymentStatus.PROCESSING)
388 self.save_ledger()
390 try:
391 result = gateway.create_payment(payment)
393 if result.get('success'):
394 payment.gateway_transaction_id = result.get('transaction_id')
396 # Capture payment immediately for now
397 capture_result = gateway.capture_payment(
398 payment.payment_id,
399 payment.gateway_transaction_id
400 )
402 if capture_result.get('success'):
403 payment.update_status(PaymentStatus.COMPLETED)
404 logger.info(f"Payment {payment_id} completed successfully")
405 else:
406 payment.update_status(
407 PaymentStatus.FAILED,
408 capture_result.get('error', 'Capture failed')
409 )
411 self.save_ledger()
412 return capture_result
413 else:
414 payment.update_status(
415 PaymentStatus.FAILED,
416 result.get('error', 'Gateway returned failure')
417 )
418 self.save_ledger()
419 return result
421 except Exception as e:
422 logger.error(f"Error processing payment {payment_id}: {e}")
423 payment.update_status(PaymentStatus.FAILED, str(e))
424 self.save_ledger()
425 return {'success': False, 'error': str(e)}
427 def get_payment(self, payment_id: str) -> Optional[PaymentRequest]:
428 """Get payment request by ID"""
429 with self.lock:
430 return self.payments.get(payment_id)
432 def list_payments(
433 self,
434 agent_id: Optional[str] = None,
435 status: Optional[PaymentStatus] = None
436 ) -> List[PaymentRequest]:
437 """
438 List payment requests with optional filters
440 Args:
441 agent_id: Filter by agent ID
442 status: Filter by status
444 Returns:
445 List of matching payment requests
446 """
447 with self.lock:
448 results = list(self.payments.values())
450 if agent_id:
451 results = [p for p in results if p.requester_agent_id == agent_id]
453 if status:
454 results = [p for p in results if p.status == status]
456 # Sort by creation time (newest first)
457 results.sort(key=lambda p: p.created_at, reverse=True)
459 return results
461 def save_ledger(self):
462 """Save payment ledger to disk"""
463 try:
464 os.makedirs(os.path.dirname(self.ledger_path), exist_ok=True)
466 data = {
467 'payments': {
468 pid: payment.to_dict()
469 for pid, payment in self.payments.items()
470 },
471 'last_updated': datetime.now().isoformat()
472 }
474 with open(self.ledger_path, 'w') as f:
475 json.dump(data, f, indent=2)
477 logger.debug(f"Payment ledger saved to {self.ledger_path}")
478 except Exception as e:
479 logger.error(f"Failed to save payment ledger: {e}")
481 def load_ledger(self):
482 """Load payment ledger from disk"""
483 try:
484 if os.path.exists(self.ledger_path):
485 with open(self.ledger_path, 'r') as f:
486 data = json.load(f)
488 # Reconstruct payment objects
489 for pid, payment_data in data.get('payments', {}).items():
490 payment = PaymentRequest(
491 amount=Decimal(payment_data['amount']),
492 currency=payment_data['currency'],
493 description=payment_data['description'],
494 requester_agent_id=payment_data['requester_agent_id'],
495 payment_method=PaymentMethod(payment_data['payment_method']),
496 metadata=payment_data.get('metadata', {})
497 )
499 # Restore state
500 payment.payment_id = pid
501 payment.status = PaymentStatus(payment_data['status'])
502 payment.created_at = datetime.fromisoformat(payment_data['created_at'])
503 payment.updated_at = datetime.fromisoformat(payment_data['updated_at'])
504 if payment_data.get('completed_at'):
505 payment.completed_at = datetime.fromisoformat(payment_data['completed_at'])
506 payment.gateway = PaymentGateway(payment_data['gateway']) if payment_data.get('gateway') else None
507 payment.gateway_transaction_id = payment_data.get('gateway_transaction_id')
508 payment.approval_chain = payment_data.get('approval_chain', [])
509 payment.error_message = payment_data.get('error_message')
511 self.payments[pid] = payment
513 logger.info(f"Loaded {len(self.payments)} payments from ledger")
514 except Exception as e:
515 logger.warning(f"Could not load payment ledger: {e}")
518# Global payment ledger instance
519payment_ledger = PaymentLedger()
522def create_payment_request_function(agent_name: str) -> Callable:
523 """
524 Create a payment request function for an agent
526 Args:
527 agent_name: Name of the agent
529 Returns:
530 Function that can be registered with autogen
531 """
532 def request_payment(
533 amount: float,
534 currency: str,
535 description: str,
536 payment_method: str = "internal_credits"
537 ) -> str:
538 """
539 Request a payment as part of agent workflow
541 Args:
542 amount: Payment amount
543 currency: Currency code (USD, EUR, etc.)
544 description: Payment description
545 payment_method: Payment method (credit_card, paypal, etc.)
547 Returns:
548 JSON string with payment details
549 """
550 try:
551 method = PaymentMethod(payment_method.lower())
552 except ValueError:
553 method = PaymentMethod.INTERNAL_CREDITS
555 payment = payment_ledger.create_payment_request(
556 amount=Decimal(str(amount)),
557 currency=currency,
558 description=description,
559 requester_agent_id=agent_name,
560 payment_method=method,
561 gateway=PaymentGateway.MOCK
562 )
564 return json.dumps({
565 'payment_id': payment.payment_id,
566 'amount': str(payment.amount),
567 'currency': payment.currency,
568 'status': payment.status.value,
569 'message': f'Payment request created. Awaiting authorization.'
570 }, indent=2)
572 return request_payment
575def create_payment_authorization_function() -> Callable:
576 """
577 Create a payment authorization function
579 Returns:
580 Function that can be used to authorize payments
581 """
582 def authorize_payment(payment_id: str, approver_id: str = "system") -> str:
583 """
584 Authorize a payment request
586 Args:
587 payment_id: Payment ID to authorize
588 approver_id: ID of the approver
590 Returns:
591 Authorization result
592 """
593 success = payment_ledger.authorize_payment(payment_id, approver_id)
595 if success:
596 return json.dumps({
597 'success': True,
598 'payment_id': payment_id,
599 'message': 'Payment authorized successfully'
600 }, indent=2)
601 else:
602 return json.dumps({
603 'success': False,
604 'payment_id': payment_id,
605 'error': 'Authorization failed'
606 }, indent=2)
608 return authorize_payment
611def create_payment_processing_function() -> Callable:
612 """
613 Create a payment processing function
615 Returns:
616 Function that can be used to process payments
617 """
618 def process_payment(payment_id: str) -> str:
619 """
620 Process an authorized payment
622 Args:
623 payment_id: Payment ID to process
625 Returns:
626 Processing result
627 """
628 result = payment_ledger.process_payment(payment_id)
629 return json.dumps(result, indent=2)
631 return process_payment
634def get_ap2_tools_for_autogen(agent_name: str) -> List[Dict[str, Any]]:
635 """
636 Get AP2 payment tools for autogen agent registration
638 Args:
639 agent_name: Name of the agent
641 Returns:
642 List of tool definitions for autogen
643 """
644 return [
645 {
646 'function': create_payment_request_function(agent_name),
647 'name': 'request_payment',
648 'description': 'Request a payment transaction for services or resources. Returns payment_id for tracking.'
649 },
650 {
651 'function': create_payment_authorization_function(),
652 'name': 'authorize_payment',
653 'description': 'Authorize a pending payment request. Requires payment_id.'
654 },
655 {
656 'function': create_payment_processing_function(),
657 'name': 'process_payment',
658 'description': 'Process an authorized payment through the gateway. Requires payment_id.'
659 }
660 ]
663# Convenience exports
664__all__ = [
665 'PaymentStatus', 'PaymentMethod', 'PaymentGateway',
666 'PaymentRequest', 'PaymentLedger', 'PaymentGatewayConnector',
667 'MockPaymentGateway', 'payment_ledger',
668 'create_payment_request_function', 'create_payment_authorization_function',
669 'create_payment_processing_function', 'get_ap2_tools_for_autogen'
670]