Coverage for integrations/channels/automation/webhooks.py: 37.2%
148 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Webhook Manager for HevolveBot Integration.
4Provides webhook registration, triggering, and signature verification.
5"""
7import hashlib
8import hmac
9import secrets
10import time
11from dataclasses import dataclass, field
12from datetime import datetime
13from enum import Enum
14from typing import Any, Callable, Dict, List, Optional
15import json
class WebhookStatus(Enum):
    """Lifecycle states a registered webhook can be in.

    The member values are lowercase strings, so a member can be looked up
    from its stored form with ``WebhookStatus("active")``.
    """

    # Default state given to a newly created WebhookConfig.
    ACTIVE = "active"
    # Not selected by event-based triggering, which only picks ACTIVE hooks.
    INACTIVE = "inactive"
    # Assigned by the manager once the failure count reaches max_retries.
    FAILED = "failed"
    # Declared for callers; nothing in this module assigns it.
    PENDING = "pending"
@dataclass
class WebhookConfig:
    """Settings and mutable runtime state for one registered webhook.

    The first four fields are required; everything else has a default.
    Mutable defaults (``headers``, ``metadata``) use ``default_factory`` so
    each instance gets its own dict.
    """

    # Identifier; the manager uses it as the registry key.
    id: str
    # Destination the payload is addressed to.
    url: str
    # Event type names this webhook subscribes to.
    events: List[str]
    # Per-webhook key used to HMAC-sign outgoing payloads.
    secret: str
    # Current lifecycle state; new webhooks start ACTIVE.
    status: WebhookStatus = WebhookStatus.ACTIVE
    # Naive local timestamp captured when the instance is constructed.
    created_at: datetime = field(default_factory=datetime.now)
    # Set by the manager when it delivers to this webhook; None until then.
    last_triggered: Optional[datetime] = None
    # Consecutive failed deliveries; reset to 0 by a successful delivery.
    failure_count: int = 0
    # Failure-count threshold at which the manager marks the webhook FAILED.
    max_retries: int = 3
    # Request timeout in seconds; stored only, the simulated delivery in
    # this module does not read it.
    timeout: int = 30
    # Extra request headers; stored only, not read by the simulated delivery.
    headers: Dict[str, str] = field(default_factory=dict)
    # Free-form caller data associated with the webhook.
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class WebhookDelivery:
    """Outcome of a single attempt to deliver an event to a webhook.

    The five leading fields are always supplied; the response and error
    fields default to ``None`` and are filled in only when applicable.
    """

    # ID of the webhook the attempt was made for.
    webhook_id: str
    # Event type that was delivered.
    event: str
    # Payload recorded for this attempt.
    payload: Dict[str, Any]
    # When the record was created.
    timestamp: datetime
    # Whether the attempt counted as successful.
    success: bool
    # Status code reported by the receiver, if any.
    response_code: Optional[int] = None
    # Body reported by the receiver, if any.
    response_body: Optional[str] = None
    # Error text when the attempt raised; None otherwise.
    error: Optional[str] = None
    # Elapsed time of the attempt in milliseconds.
    duration_ms: Optional[float] = None
class WebhookManager:
    """
    Manages webhook registration, delivery, and verification.

    Features:
    - Register, update and unregister webhooks
    - Trigger webhooks with payloads (delivery is simulated, or routed to a
      per-webhook test handler; no HTTP request is made here)
    - Generate and verify HMAC-SHA256 payload signatures
    - Track a bounded delivery history
    - Mark a webhook FAILED once its consecutive failure count reaches
      its ``max_retries`` threshold (no automatic re-delivery is performed)
    """

    def __init__(self, signing_key: Optional[str] = None):
        """
        Initialize the WebhookManager.

        Args:
            signing_key: Optional master signing key. A random 64-hex-char
                key is generated when omitted or empty.
        """
        # Registry of webhooks keyed by webhook ID.
        self._webhooks: Dict[str, WebhookConfig] = {}
        # Delivery records, oldest first, capped by _max_deliveries_history.
        self._deliveries: List[WebhookDelivery] = []
        # Test handlers keyed by webhook ID (see register_handler).
        self._handlers: Dict[str, Callable] = {}
        # Stored only; per-webhook secrets are what sign payloads below.
        self._signing_key = signing_key or secrets.token_hex(32)
        self._max_deliveries_history = 1000

    def register(
        self,
        url: str,
        events: List[str],
        webhook_id: Optional[str] = None,
        secret: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        max_retries: int = 3,
        timeout: int = 30
    ) -> WebhookConfig:
        """
        Register a new webhook.

        Args:
            url: The URL to send webhook payloads to
            events: List of event types to subscribe to
            webhook_id: Optional custom ID (``wh_`` + 16 hex chars if omitted)
            secret: Optional signing secret (32 hex chars generated if omitted)
            headers: Optional custom headers to store with the webhook
            metadata: Optional metadata to associate with the webhook
            max_retries: Failure count at which the webhook is marked FAILED
            timeout: Request timeout in seconds

        Returns:
            The created WebhookConfig

        Raises:
            ValueError: If the URL is empty, the events list is empty, or
                the webhook ID is already registered
        """
        if not url:
            raise ValueError("URL is required")
        if not events:
            raise ValueError("At least one event type is required")

        webhook_id = webhook_id or f"wh_{secrets.token_hex(8)}"
        if webhook_id in self._webhooks:
            raise ValueError(f"Webhook with ID '{webhook_id}' already exists")

        config = WebhookConfig(
            id=webhook_id,
            url=url,
            events=events,
            secret=secret or secrets.token_hex(16),
            headers=headers or {},
            metadata=metadata or {},
            max_retries=max_retries,
            timeout=timeout,
        )
        self._webhooks[webhook_id] = config
        return config

    def unregister(self, webhook_id: str) -> bool:
        """
        Unregister a webhook.

        Only the registry entry is removed; a test handler registered for
        the same ID and past delivery records are left untouched.

        Args:
            webhook_id: The ID of the webhook to unregister

        Returns:
            True if successfully unregistered, False if not found
        """
        return self._webhooks.pop(webhook_id, None) is not None

    def list_webhooks(
        self,
        event: Optional[str] = None,
        status: Optional[WebhookStatus] = None
    ) -> List[WebhookConfig]:
        """
        List registered webhooks.

        Args:
            event: Optional filter by event type (ignored when empty)
            status: Optional filter by status

        Returns:
            List of matching webhook configurations, in registration order
        """
        return [
            hook
            for hook in self._webhooks.values()
            if (not event or event in hook.events)
            and (not status or hook.status == status)
        ]

    def get_webhook(self, webhook_id: str) -> Optional[WebhookConfig]:
        """
        Get a specific webhook by ID.

        Args:
            webhook_id: The webhook ID

        Returns:
            The webhook configuration or None if not found
        """
        return self._webhooks.get(webhook_id)

    def update_webhook(
        self,
        webhook_id: str,
        url: Optional[str] = None,
        events: Optional[List[str]] = None,
        status: Optional[WebhookStatus] = None,
        headers: Optional[Dict[str, str]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Optional[WebhookConfig]:
        """
        Update a webhook configuration in place.

        Each argument left as None keeps the current value. Supplied values
        replace the stored ones as given; they are not validated here.

        Args:
            webhook_id: The webhook ID to update
            url: New URL (optional)
            events: New events list (optional)
            status: New status (optional)
            headers: New headers (optional)
            metadata: New metadata (optional)

        Returns:
            Updated webhook configuration or None if not found
        """
        webhook = self._webhooks.get(webhook_id)
        if webhook is None:
            return None

        if url is not None:
            webhook.url = url
        if events is not None:
            webhook.events = events
        if status is not None:
            webhook.status = status
        if headers is not None:
            webhook.headers = headers
        if metadata is not None:
            webhook.metadata = metadata

        return webhook

    def trigger(
        self,
        event: str,
        payload: Dict[str, Any],
        webhook_id: Optional[str] = None
    ) -> List[WebhookDelivery]:
        """
        Trigger webhooks for an event.

        Without ``webhook_id``, every ACTIVE webhook subscribed to ``event``
        is delivered to. With ``webhook_id``, only that webhook is targeted,
        regardless of its status or subscriptions; an unknown ID yields an
        empty result.

        Args:
            event: The event type
            payload: The payload to send
            webhook_id: Optional specific webhook to trigger

        Returns:
            List of delivery records, one per targeted webhook
        """
        if webhook_id:
            target = self._webhooks.get(webhook_id)
            targets = [] if target is None else [target]
        else:
            targets = [
                hook
                for hook in self._webhooks.values()
                if event in hook.events and hook.status == WebhookStatus.ACTIVE
            ]

        deliveries = []
        for hook in targets:
            delivery = self._deliver(hook, event, payload)
            deliveries.append(delivery)
            self._record_delivery(delivery)
        return deliveries

    def _deliver(
        self,
        webhook: WebhookConfig,
        event: str,
        payload: Dict[str, Any]
    ) -> WebhookDelivery:
        """
        Deliver a webhook payload.

        In a real implementation, this would make an HTTP request. Here the
        delivery is simulated: if a test handler is registered for the
        webhook it is called, otherwise a 200/"OK" response is assumed.

        Every attempt updates ``webhook.last_triggered``. A success resets
        ``failure_count``; a failure (handler reported ``success`` falsy, or
        the handler raised) increments it and marks the webhook FAILED once
        the count reaches ``max_retries``.

        Args:
            webhook: The webhook configuration
            event: The event type
            payload: The payload to deliver

        Returns:
            Delivery record

        Raises:
            TypeError: If the payload cannot be serialized to JSON for
                signing (raised by ``json.dumps`` before any delivery
                bookkeeping happens)
        """
        start_time = time.time()

        # Envelope wrapping the caller's payload with delivery metadata.
        full_payload = {
            "event": event,
            "timestamp": datetime.now().isoformat(),
            "webhook_id": webhook.id,
            "data": payload
        }

        # The signature covers the default json.dumps() rendering of the
        # envelope, keyed with the webhook's own secret.
        signature = self.generate_signature(json.dumps(full_payload), webhook.secret)

        handler = self._handlers.get(webhook.id)

        error: Optional[str] = None
        try:
            if handler:
                result = handler(event, full_payload, signature)
                success = result.get("success", True)
                response_code = result.get("code", 200)
                response_body = result.get("body", "OK")
            else:
                # Simulate successful delivery
                success, response_code, response_body = True, 200, "OK"
        except Exception as exc:
            # The handler is arbitrary caller code; any failure in it (or a
            # malformed return value) is recorded as a failed delivery
            # instead of aborting the remaining deliveries in trigger().
            success = False
            response_code = None
            response_body = None
            error = str(exc)

        # Bookkeeping is identical for "handler said no" and "handler raised".
        webhook.last_triggered = datetime.now()
        if success:
            webhook.failure_count = 0
        else:
            webhook.failure_count += 1
            if webhook.failure_count >= webhook.max_retries:
                webhook.status = WebhookStatus.FAILED

        duration_ms = (time.time() - start_time) * 1000

        return WebhookDelivery(
            webhook_id=webhook.id,
            event=event,
            payload=full_payload,
            timestamp=datetime.now(),
            success=success,
            response_code=response_code,
            response_body=response_body,
            error=error,
            duration_ms=duration_ms
        )

    def _record_delivery(self, delivery: WebhookDelivery) -> None:
        """Append a delivery to history, dropping the oldest beyond the cap."""
        self._deliveries.append(delivery)

        # Delete by explicit overflow count: a "[-cap:]" slice keeps
        # everything when cap is 0, because -0 == 0.
        overflow = len(self._deliveries) - self._max_deliveries_history
        if overflow > 0:
            del self._deliveries[:overflow]

    def get_delivery_history(
        self,
        webhook_id: Optional[str] = None,
        event: Optional[str] = None,
        limit: int = 100
    ) -> List[WebhookDelivery]:
        """
        Get webhook delivery history.

        Args:
            webhook_id: Optional filter by webhook ID
            event: Optional filter by event type
            limit: Maximum number of records to return; zero or a negative
                value returns an empty list

        Returns:
            The most recent matching delivery records, oldest first
        """
        # Guard first: "[-0:]" would return the whole list, not nothing.
        if limit <= 0:
            return []

        matches = [
            record
            for record in self._deliveries
            if (not webhook_id or record.webhook_id == webhook_id)
            and (not event or record.event == event)
        ]
        return matches[-limit:]

    def generate_signature(self, payload: str, secret: str) -> str:
        """
        Generate a signature for a webhook payload.

        Args:
            payload: The payload string to sign
            secret: The secret key

        Returns:
            The HMAC-SHA256 signature as a lowercase hex string
        """
        return hmac.new(
            secret.encode(),
            payload.encode(),
            hashlib.sha256
        ).hexdigest()

    def verify_signature(
        self,
        payload: str,
        signature: str,
        secret: str
    ) -> bool:
        """
        Verify a webhook signature.

        The comparison is constant-time. A signature that is not a string,
        or that contains non-ASCII characters, can never equal a hex digest
        and is reported as invalid rather than raising.

        Args:
            payload: The payload string
            signature: The signature to verify
            secret: The secret key

        Returns:
            True if signature is valid, False otherwise
        """
        if not isinstance(signature, str):
            return False
        try:
            candidate = signature.encode("ascii")
        except UnicodeEncodeError:
            # compare_digest() rejects non-ASCII str with TypeError.
            return False

        expected = self.generate_signature(payload, secret).encode("ascii")
        return hmac.compare_digest(expected, candidate)

    def register_handler(
        self,
        webhook_id: str,
        handler: Callable[[str, Dict[str, Any], str], Dict[str, Any]]
    ) -> None:
        """
        Register a handler for testing webhook delivery.

        The handler replaces the simulated delivery for that webhook. Its
        returned dict may carry ``success``, ``code`` and ``body`` keys;
        missing keys default to True, 200 and "OK".

        Args:
            webhook_id: The webhook ID
            handler: Function that receives (event, payload, signature) and returns response dict
        """
        self._handlers[webhook_id] = handler

    def unregister_handler(self, webhook_id: str) -> None:
        """Remove a test handler; a no-op if none is registered."""
        self._handlers.pop(webhook_id, None)

    def reset_failure_count(self, webhook_id: str) -> bool:
        """
        Reset the failure count for a webhook and set it back to ACTIVE.

        Args:
            webhook_id: The webhook ID

        Returns:
            True if successful, False if webhook not found
        """
        webhook = self._webhooks.get(webhook_id)
        if webhook is None:
            return False

        webhook.failure_count = 0
        webhook.status = WebhookStatus.ACTIVE
        return True