Coverage for integrations / channels / automation / webhooks.py: 37.2%

148 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Webhook Manager for HevolveBot Integration. 

3 

4Provides webhook registration, triggering, and signature verification. 

5""" 

6 

7import hashlib 

8import hmac 

9import secrets 

10import time 

11from dataclasses import dataclass, field 

12from datetime import datetime 

13from enum import Enum 

14from typing import Any, Callable, Dict, List, Optional 

15import json 

16 

17 

class WebhookStatus(Enum):
    """Lifecycle state of a registered webhook.

    The member values are the lowercase string forms of the member names.
    """

    # Default state of a newly created WebhookConfig.
    ACTIVE = "active"

    # Declared for callers; nothing in this module sets it automatically.
    INACTIVE = "inactive"

    # Assigned once a webhook's failure count reaches its max_retries.
    FAILED = "failed"

    # Declared for callers; nothing in this module sets it automatically.
    PENDING = "pending"

24 

25 

@dataclass
class WebhookConfig:
    """Settings and mutable runtime state for one registered webhook.

    Attributes:
        id: Identifier the webhook is stored under.
        url: Destination URL for payloads.
        events: Event type names this webhook is subscribed to.
        secret: Key used to sign payloads sent to this webhook.
        status: Current lifecycle state; starts as ``ACTIVE``.
        created_at: Creation time, taken from ``datetime.now()`` (naive,
            local time) when not supplied.
        last_triggered: Time of the most recent delivery attempt that
            completed without raising, or ``None`` if there has been none.
        failure_count: Failures recorded since the last success or reset.
        max_retries: Failure count at which the webhook is marked
            ``FAILED``.
        timeout: Request timeout in seconds.
        headers: Extra headers associated with the webhook.
        metadata: Free-form data associated with the webhook.
    """

    # --- required identity / routing fields ---
    id: str
    url: str
    events: List[str]
    secret: str

    # --- runtime state ---
    status: WebhookStatus = WebhookStatus.ACTIVE
    created_at: datetime = field(default_factory=datetime.now)
    last_triggered: Optional[datetime] = None
    failure_count: int = 0

    # --- delivery tuning ---
    max_retries: int = 3
    timeout: int = 30

    # --- caller-supplied extras (a fresh dict per instance) ---
    headers: Dict[str, str] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

41 

42 

@dataclass
class WebhookDelivery:
    """Outcome of a single attempt to deliver an event to a webhook.

    Attributes:
        webhook_id: ID of the webhook the attempt was made for.
        event: Event type that was delivered.
        payload: The payload that was sent.
        timestamp: When the record was created.
        success: Whether the attempt succeeded.
        response_code: Status code reported by the receiver, if any.
        response_body: Body reported by the receiver, if any.
        error: Error text when the attempt raised, otherwise ``None``.
        duration_ms: Elapsed time of the attempt in milliseconds, if
            measured.
    """

    # --- always supplied by the creator of the record ---
    webhook_id: str
    event: str
    payload: Dict[str, Any]
    timestamp: datetime
    success: bool

    # --- optional outcome details; None when not applicable ---
    response_code: Optional[int] = None
    response_body: Optional[str] = None
    error: Optional[str] = None
    duration_ms: Optional[float] = None

55 

56 

class WebhookManager:
    """
    Manages webhook registration, delivery, and verification.

    Features:
    - Register, update and unregister webhooks
    - Trigger webhooks with payloads
    - Generate and verify HMAC-SHA256 payload signatures
    - Track a bounded delivery history
    - Failure tracking: a webhook is marked FAILED once its failure count
      reaches its ``max_retries`` (deliveries are not re-sent automatically)

    Delivery is simulated: no HTTP request is made. If a handler has been
    registered for a webhook via ``register_handler`` it is called instead;
    otherwise the delivery is recorded as a successful ``200 OK``.
    """

    def __init__(self, signing_key: Optional[str] = None):
        """
        Initialize the WebhookManager.

        Args:
            signing_key: Optional master signing key. A random 64-character
                hex key is generated when omitted or empty. The key is only
                stored on the instance here; deliveries are signed with each
                webhook's own secret.
        """
        self._webhooks: Dict[str, WebhookConfig] = {}
        self._deliveries: List[WebhookDelivery] = []
        self._handlers: Dict[str, Callable] = {}
        self._signing_key = signing_key or secrets.token_hex(32)
        # Upper bound on the number of delivery records kept in memory.
        self._max_deliveries_history = 1000

    def register(
        self,
        url: str,
        events: List[str],
        webhook_id: Optional[str] = None,
        secret: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        max_retries: int = 3,
        timeout: int = 30
    ) -> WebhookConfig:
        """
        Register a new webhook.

        Args:
            url: The URL to send webhook payloads to
            events: List of event types to subscribe to
            webhook_id: Optional custom ID (``wh_`` + 16 hex characters is
                generated if not provided)
            secret: Optional secret for signature verification (32 hex
                characters are generated if not provided)
            headers: Optional custom headers to include in requests
            metadata: Optional metadata to associate with the webhook
            max_retries: Failure count at which the webhook is marked FAILED
            timeout: Request timeout in seconds

        Returns:
            The created WebhookConfig

        Raises:
            ValueError: If the URL or events list is empty, or if a webhook
                with the same ID is already registered
        """
        if not url:
            raise ValueError("URL is required")
        if not events:
            raise ValueError("At least one event type is required")

        webhook_id = webhook_id or f"wh_{secrets.token_hex(8)}"

        if webhook_id in self._webhooks:
            raise ValueError(f"Webhook with ID '{webhook_id}' already exists")

        secret = secret or secrets.token_hex(16)

        config = WebhookConfig(
            id=webhook_id,
            url=url,
            events=events,
            secret=secret,
            headers=headers or {},
            metadata=metadata or {},
            max_retries=max_retries,
            timeout=timeout
        )

        self._webhooks[webhook_id] = config
        return config

    def unregister(self, webhook_id: str) -> bool:
        """
        Unregister a webhook.

        Only the webhook configuration is removed; a handler registered
        under the same ID and existing delivery records are left in place.

        Args:
            webhook_id: The ID of the webhook to unregister

        Returns:
            True if successfully unregistered, False if not found
        """
        if webhook_id in self._webhooks:
            del self._webhooks[webhook_id]
            return True
        return False

    def list_webhooks(
        self,
        event: Optional[str] = None,
        status: Optional[WebhookStatus] = None
    ) -> List[WebhookConfig]:
        """
        List registered webhooks.

        Args:
            event: Optional filter by event type
            status: Optional filter by status

        Returns:
            List of matching webhook configurations, in registration order
        """
        webhooks = list(self._webhooks.values())

        if event:
            webhooks = [w for w in webhooks if event in w.events]

        if status:
            webhooks = [w for w in webhooks if w.status == status]

        return webhooks

    def get_webhook(self, webhook_id: str) -> Optional[WebhookConfig]:
        """
        Get a specific webhook by ID.

        Args:
            webhook_id: The webhook ID

        Returns:
            The webhook configuration or None if not found
        """
        return self._webhooks.get(webhook_id)

    def update_webhook(
        self,
        webhook_id: str,
        url: Optional[str] = None,
        events: Optional[List[str]] = None,
        status: Optional[WebhookStatus] = None,
        headers: Optional[Dict[str, str]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Optional[WebhookConfig]:
        """
        Update a webhook configuration in place.

        Only arguments that are not None are applied; each one replaces the
        stored value wholesale (headers and metadata are not merged).

        Args:
            webhook_id: The webhook ID to update
            url: New URL (optional)
            events: New events list (optional)
            status: New status (optional)
            headers: New headers (optional)
            metadata: New metadata (optional)

        Returns:
            Updated webhook configuration or None if not found
        """
        webhook = self._webhooks.get(webhook_id)
        if webhook is None:
            return None

        if url is not None:
            webhook.url = url
        if events is not None:
            webhook.events = events
        if status is not None:
            webhook.status = status
        if headers is not None:
            webhook.headers = headers
        if metadata is not None:
            webhook.metadata = metadata

        return webhook

    def trigger(
        self,
        event: str,
        payload: Dict[str, Any],
        webhook_id: Optional[str] = None
    ) -> List[WebhookDelivery]:
        """
        Trigger webhooks for an event.

        Without ``webhook_id``, every ACTIVE webhook subscribed to ``event``
        is delivered to. With ``webhook_id``, only that webhook is targeted,
        and it is delivered to regardless of its status or subscriptions.

        Args:
            event: The event type
            payload: The payload to send
            webhook_id: Optional specific webhook to trigger

        Returns:
            List of delivery records (empty if nothing matched); each one is
            also appended to the delivery history
        """
        if webhook_id:
            target = self._webhooks.get(webhook_id)
            webhooks = [] if target is None else [target]
        else:
            webhooks = [
                w for w in self._webhooks.values()
                if event in w.events and w.status == WebhookStatus.ACTIVE
            ]

        deliveries = []
        for webhook in webhooks:
            delivery = self._deliver(webhook, event, payload)
            deliveries.append(delivery)
            self._record_delivery(delivery)

        return deliveries

    def _note_failure(self, webhook: WebhookConfig) -> None:
        """Count one failed delivery and mark the webhook FAILED at the limit."""
        webhook.failure_count += 1
        if webhook.failure_count >= webhook.max_retries:
            webhook.status = WebhookStatus.FAILED

    def _deliver(
        self,
        webhook: WebhookConfig,
        event: str,
        payload: Dict[str, Any]
    ) -> WebhookDelivery:
        """
        Deliver a webhook payload.

        In a real implementation, this would make an HTTP request.
        For testing, the delivery is simulated: a handler registered for the
        webhook is called with ``(event, full_payload, signature)``, and when
        there is none the delivery is treated as a successful ``200 OK``.

        Args:
            webhook: The webhook configuration (its ``last_triggered``,
                ``failure_count`` and ``status`` may be updated)
            event: The event type
            payload: The payload to deliver

        Returns:
            Delivery record. An exception raised by the handler (or by
            reading its result) is captured in the record's ``error`` field
            instead of propagating.

        Raises:
            TypeError: If ``payload`` cannot be serialised by ``json.dumps``;
                signing happens before the guarded section.
        """
        start_time = time.time()

        # Wrap the caller's payload in a standard envelope.
        full_payload = {
            "event": event,
            "timestamp": datetime.now().isoformat(),
            "webhook_id": webhook.id,
            "data": payload
        }

        # The signature covers the JSON text of the envelope.
        signature = self.generate_signature(json.dumps(full_payload), webhook.secret)

        # In a real implementation, this would make an HTTP POST request.
        # For now, a registered handler stands in for the remote endpoint.
        handler = self._handlers.get(webhook.id)

        try:
            if handler:
                result = handler(event, full_payload, signature)
                success = result.get("success", True)
                response_code = result.get("code", 200)
                response_body = result.get("body", "OK")
            else:
                # Simulate successful delivery
                success = True
                response_code = 200
                response_body = "OK"

            webhook.last_triggered = datetime.now()
            if success:
                # Any success clears the running failure count.
                webhook.failure_count = 0
            else:
                self._note_failure(webhook)

            duration_ms = (time.time() - start_time) * 1000

            return WebhookDelivery(
                webhook_id=webhook.id,
                event=event,
                payload=full_payload,
                timestamp=datetime.now(),
                success=success,
                response_code=response_code,
                response_body=response_body,
                duration_ms=duration_ms
            )

        except Exception as e:
            # Best-effort by design: a misbehaving handler becomes a failed
            # delivery record rather than aborting the caller.
            self._note_failure(webhook)

            duration_ms = (time.time() - start_time) * 1000

            return WebhookDelivery(
                webhook_id=webhook.id,
                event=event,
                payload=full_payload,
                timestamp=datetime.now(),
                success=False,
                error=str(e),
                duration_ms=duration_ms
            )

    def _record_delivery(self, delivery: WebhookDelivery) -> None:
        """Append a delivery to the history, dropping the oldest on overflow."""
        self._deliveries.append(delivery)

        # Trim from the front so only the newest records are kept. An
        # explicit overflow count avoids the `[-0:]` slice pitfall.
        overflow = len(self._deliveries) - self._max_deliveries_history
        if overflow > 0:
            del self._deliveries[:overflow]

    def get_delivery_history(
        self,
        webhook_id: Optional[str] = None,
        event: Optional[str] = None,
        limit: int = 100
    ) -> List[WebhookDelivery]:
        """
        Get webhook delivery history.

        Args:
            webhook_id: Optional filter by webhook ID
            event: Optional filter by event type
            limit: Maximum number of records to return; zero or a negative
                value yields an empty list

        Returns:
            New list holding the most recent matching delivery records,
            oldest first
        """
        # `seq[-0:]` is the whole sequence, so a non-positive limit must be
        # handled before slicing.
        if limit <= 0:
            return []

        deliveries = self._deliveries.copy()

        if webhook_id:
            deliveries = [d for d in deliveries if d.webhook_id == webhook_id]

        if event:
            deliveries = [d for d in deliveries if d.event == event]

        return deliveries[-limit:]

    def generate_signature(self, payload: str, secret: str) -> str:
        """
        Generate a signature for a webhook payload.

        Args:
            payload: The payload string to sign
            secret: The secret key

        Returns:
            The HMAC-SHA256 signature as a lowercase hex string
        """
        return hmac.new(
            secret.encode(),
            payload.encode(),
            hashlib.sha256
        ).hexdigest()

    def verify_signature(
        self,
        payload: str,
        signature: str,
        secret: str
    ) -> bool:
        """
        Verify a webhook signature.

        Args:
            payload: The payload string
            signature: The signature to verify
            secret: The secret key

        Returns:
            True if signature is valid, False otherwise (including when the
            signature is not an ASCII string)
        """
        # hmac.compare_digest raises TypeError for non-str values and for
        # str values containing non-ASCII characters. A hex digest is always
        # ASCII, so such input can never match; reject it instead of raising.
        if not isinstance(signature, str) or not signature.isascii():
            return False

        expected = self.generate_signature(payload, secret)
        # Constant-time comparison to avoid leaking the digest via timing.
        return hmac.compare_digest(expected, signature)

    def register_handler(
        self,
        webhook_id: str,
        handler: Callable[[str, Dict[str, Any], str], Dict[str, Any]]
    ) -> None:
        """
        Register a handler for testing webhook delivery.

        Any handler already registered for the ID is replaced. The webhook
        ID is not checked against the registered webhooks.

        Args:
            webhook_id: The webhook ID
            handler: Function that receives (event, payload, signature) and
                returns a response dict; the keys ``success``, ``code`` and
                ``body`` are read, defaulting to True, 200 and "OK"
        """
        self._handlers[webhook_id] = handler

    def unregister_handler(self, webhook_id: str) -> None:
        """Remove a test handler; does nothing if none is registered."""
        self._handlers.pop(webhook_id, None)

    def reset_failure_count(self, webhook_id: str) -> bool:
        """
        Reset the failure count for a webhook and set it back to ACTIVE.

        Args:
            webhook_id: The webhook ID

        Returns:
            True if successful, False if webhook not found
        """
        webhook = self._webhooks.get(webhook_id)
        if webhook is None:
            return False

        webhook.failure_count = 0
        webhook.status = WebhookStatus.ACTIVE
        return True