Coverage for integrations / channels / automation / scheduled_messages.py: 28.9%

253 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Scheduled Message Manager for HevolveBot Integration. 

3 

4Provides scheduling and management of delayed messages. 

5""" 

6 

7import json 

8import logging 

9import os 

10import secrets 

11from dataclasses import dataclass, field 

12from datetime import datetime, timedelta 

13from enum import Enum 

14from typing import Any, Dict, List, Optional, Callable 

15import threading 

16 

17logger = logging.getLogger(__name__) 

18 

19 

class MessageStatus(Enum):
    """Status of a scheduled message.

    Each member's value is the lowercase string that is written out when a
    message is serialised.
    """
    # Waiting for its scheduled time; the only state that can be cancelled,
    # rescheduled, edited or picked up for delivery.
    PENDING = "pending"
    # Delivered successfully.
    SENT = "sent"
    # Retry budget exhausted without a successful delivery.
    FAILED = "failed"
    # Withdrawn while still pending.
    CANCELLED = "cancelled"

26 

27 

class RecurrenceType(Enum):
    """Types of message recurrence.

    Each member's value is the lowercase string that is written out when a
    message is serialised.
    """
    # One-shot message; nothing is scheduled after delivery.
    NONE = "none"
    # Repeats one day after the previous scheduled time.
    DAILY = "daily"
    # Repeats one week after the previous scheduled time.
    WEEKLY = "weekly"
    # Repeats after a fixed 30-day step (an approximation of a month).
    MONTHLY = "monthly"
    # Repeats every ``recurrence_interval`` days.
    CUSTOM = "custom"

35 

36 

@dataclass
class ScheduledMessage:
    """A message queued for delivery to a channel at a given time.

    The first four fields are required; everything else has a default.
    Field order is part of the constructor signature and must not change.
    """
    # Unique identifier of the message within its manager.
    id: str
    # Destination channel.
    channel_id: str
    # Message body.
    content: str
    # When the message becomes due.
    scheduled_time: datetime
    # Lifecycle state; new messages start out pending.
    status: MessageStatus = field(default=MessageStatus.PENDING)
    # Optional author and reply-thread identifiers.
    sender_id: Optional[str] = field(default=None)
    thread_id: Optional[str] = field(default=None)
    # Per-instance containers (a fresh list/dict for every message).
    attachments: List[Dict[str, Any]] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Recurrence rule, interval in days for CUSTOM, and optional cut-off.
    recurrence: RecurrenceType = field(default=RecurrenceType.NONE)
    recurrence_interval: Optional[int] = field(default=None)
    recurrence_end: Optional[datetime] = field(default=None)
    # Bookkeeping timestamps.
    created_at: datetime = field(default_factory=datetime.now)
    sent_at: Optional[datetime] = field(default=None)
    # Last delivery error and retry accounting.
    error: Optional[str] = field(default=None)
    retry_count: int = field(default=0)
    max_retries: int = field(default=3)

57 

58 

@dataclass
class MessageDeliveryResult:
    """Outcome of one attempt to deliver a scheduled message."""
    # ID of the message the attempt was made for.
    message_id: str
    # True when the attempt succeeded.
    success: bool
    # Timestamp of a successful delivery, otherwise None.
    delivered_at: Optional[datetime] = field(default=None)
    # Error text of a failed attempt, otherwise None.
    error: Optional[str] = field(default=None)
    # Optional free-form response payload.
    response: Optional[Dict[str, Any]] = field(default=None)

67 

68 

69class ScheduledMessageManager: 

70 """ 

71 Manages scheduled messages. 

72 

73 Features: 

74 - Schedule messages for future delivery 

75 - Support for recurring messages 

76 - Cancel pending messages 

77 - Track delivery status 

78 - Retry failed deliveries 

79 """ 

80 

81 def __init__( 

82 self, 

83 delivery_handler: Optional[Callable[[ScheduledMessage], bool]] = None, 

84 persist_path: Optional[str] = None, 

85 ): 

86 """ 

87 Initialize the ScheduledMessageManager. 

88 

89 Args: 

90 delivery_handler: Optional function to actually deliver messages 

91 persist_path: Optional JSON file path for durable storage. When 

92 provided, the manager loads existing messages on startup 

93 and writes to disk on every mutation so pending reminders 

94 survive restarts. Defaults to ~/.nunba/data/scheduled_messages.json 

95 when None (set to '' to disable). 

96 """ 

97 self._messages: Dict[str, ScheduledMessage] = {} 

98 self._lock = threading.Lock() 

99 self._delivery_handler = delivery_handler 

100 self._delivery_history: List[MessageDeliveryResult] = [] 

101 # File persistence — default to ~/.nunba/data/scheduled_messages.json 

102 # so the enumerate tool / list_pending survive process restarts. 

103 # Pass persist_path='' to opt out (used by unit tests that don't 

104 # want to touch the filesystem). 

105 if persist_path is None: 

106 persist_path = os.path.join( 

107 os.path.expanduser('~'), '.nunba', 'data', 

108 'scheduled_messages.json', 

109 ) 

110 self._persist_path = persist_path or None 

111 if self._persist_path: 

112 self._load_from_disk() 

113 

114 def _load_from_disk(self) -> None: 

115 """Read ``self._persist_path`` and hydrate ``self._messages``. 

116 Silently no-ops if the file doesn't exist or is unreadable — 

117 the manager must always start up cleanly.""" 

118 if not self._persist_path or not os.path.isfile(self._persist_path): 

119 return 

120 try: 

121 with open(self._persist_path, encoding='utf-8') as fp: 

122 raw = json.load(fp) 

123 if not isinstance(raw, list): 

124 return 

125 for row in raw: 

126 try: 

127 msg = self._deserialise_message(row) 

128 if msg is not None: 

129 self._messages[msg.id] = msg 

130 except Exception as e: 

131 logger.debug(f"skip corrupt scheduled message row: {e}") 

132 logger.info( 

133 f"ScheduledMessageManager: loaded {len(self._messages)} " 

134 f"messages from {self._persist_path}" 

135 ) 

136 except Exception as e: 

137 logger.warning(f"Failed to load scheduled messages: {e}") 

138 

139 def _persist_to_disk(self) -> None: 

140 """Write ``self._messages`` atomically to ``self._persist_path``. 

141 Called from every mutation under ``self._lock``. Swallows 

142 exceptions — disk failures must never break the caller.""" 

143 if not self._persist_path: 

144 return 

145 try: 

146 os.makedirs(os.path.dirname(self._persist_path), exist_ok=True) 

147 serialised = [ 

148 self._serialise_message(m) for m in self._messages.values() 

149 ] 

150 tmp = self._persist_path + '.tmp' 

151 with open(tmp, 'w', encoding='utf-8') as fp: 

152 json.dump(serialised, fp, indent=2) 

153 os.replace(tmp, self._persist_path) 

154 except Exception as e: 

155 logger.debug(f"Failed to persist scheduled messages: {e}") 

156 

157 @staticmethod 

158 def _serialise_message(m: ScheduledMessage) -> Dict[str, Any]: 

159 """Dataclass → JSON-compatible dict. Enums → value strings, 

160 datetimes → ISO strings. Safe for list comprehensions.""" 

161 def _iso(dt: Optional[datetime]) -> Optional[str]: 

162 return dt.isoformat() if dt is not None else None 

163 return { 

164 'id': m.id, 'channel_id': m.channel_id, 'content': m.content, 

165 'scheduled_time': _iso(m.scheduled_time), 

166 'status': m.status.value, 

167 'sender_id': m.sender_id, 'thread_id': m.thread_id, 

168 'attachments': m.attachments, 'metadata': m.metadata, 

169 'recurrence': m.recurrence.value, 

170 'recurrence_interval': m.recurrence_interval, 

171 'recurrence_end': _iso(m.recurrence_end), 

172 'created_at': _iso(m.created_at), 

173 'sent_at': _iso(m.sent_at), 

174 'error': m.error, 'retry_count': m.retry_count, 

175 'max_retries': m.max_retries, 

176 } 

177 

178 @staticmethod 

179 def _deserialise_message(row: Dict[str, Any]) -> Optional[ScheduledMessage]: 

180 """JSON row → ScheduledMessage. Returns None on any field that 

181 can't be rehydrated so a corrupt row can't poison the whole file.""" 

182 def _dt(v: Any) -> Optional[datetime]: 

183 if not v: 

184 return None 

185 try: 

186 return datetime.fromisoformat(v) 

187 except Exception: 

188 return None 

189 try: 

190 return ScheduledMessage( 

191 id=row['id'], 

192 channel_id=row['channel_id'], 

193 content=row.get('content', ''), 

194 scheduled_time=_dt(row.get('scheduled_time')) or datetime.now(), 

195 status=MessageStatus(row.get('status', 'pending')), 

196 sender_id=row.get('sender_id'), 

197 thread_id=row.get('thread_id'), 

198 attachments=row.get('attachments') or [], 

199 metadata=row.get('metadata') or {}, 

200 recurrence=RecurrenceType(row.get('recurrence', 'none')), 

201 recurrence_interval=row.get('recurrence_interval'), 

202 recurrence_end=_dt(row.get('recurrence_end')), 

203 created_at=_dt(row.get('created_at')) or datetime.now(), 

204 sent_at=_dt(row.get('sent_at')), 

205 error=row.get('error'), 

206 retry_count=row.get('retry_count', 0), 

207 max_retries=row.get('max_retries', 3), 

208 ) 

209 except Exception: 

210 return None 

211 

212 def schedule( 

213 self, 

214 channel_id: str, 

215 content: str, 

216 scheduled_time: datetime, 

217 message_id: Optional[str] = None, 

218 sender_id: Optional[str] = None, 

219 thread_id: Optional[str] = None, 

220 attachments: Optional[List[Dict[str, Any]]] = None, 

221 metadata: Optional[Dict[str, Any]] = None, 

222 recurrence: RecurrenceType = RecurrenceType.NONE, 

223 recurrence_interval: Optional[int] = None, 

224 recurrence_end: Optional[datetime] = None 

225 ) -> ScheduledMessage: 

226 """ 

227 Schedule a message for future delivery. 

228 

229 Args: 

230 channel_id: The channel to send to 

231 content: The message content 

232 scheduled_time: When to send the message 

233 message_id: Optional custom message ID 

234 sender_id: Optional sender ID 

235 thread_id: Optional thread ID for replies 

236 attachments: Optional list of attachments 

237 metadata: Optional metadata 

238 recurrence: Recurrence type 

239 recurrence_interval: Days between recurrences 

240 recurrence_end: When to stop recurring 

241 

242 Returns: 

243 The scheduled message 

244 

245 Raises: 

246 ValueError: If scheduled_time is in the past 

247 """ 

248 if scheduled_time < datetime.now(): 

249 raise ValueError("Cannot schedule message in the past") 

250 

251 message_id = message_id or f"msg_{secrets.token_hex(8)}" 

252 

253 if message_id in self._messages: 

254 raise ValueError(f"Message with ID '{message_id}' already exists") 

255 

256 message = ScheduledMessage( 

257 id=message_id, 

258 channel_id=channel_id, 

259 content=content, 

260 scheduled_time=scheduled_time, 

261 sender_id=sender_id, 

262 thread_id=thread_id, 

263 attachments=attachments or [], 

264 metadata=metadata or {}, 

265 recurrence=recurrence, 

266 recurrence_interval=recurrence_interval, 

267 recurrence_end=recurrence_end 

268 ) 

269 

270 with self._lock: 

271 self._messages[message_id] = message 

272 self._persist_to_disk() 

273 

274 return message 

275 

276 def schedule_relative( 

277 self, 

278 channel_id: str, 

279 content: str, 

280 delay_seconds: int = 0, 

281 delay_minutes: int = 0, 

282 delay_hours: int = 0, 

283 delay_days: int = 0, 

284 **kwargs 

285 ) -> ScheduledMessage: 

286 """ 

287 Schedule a message with a relative delay. 

288 

289 Args: 

290 channel_id: The channel to send to 

291 content: The message content 

292 delay_seconds: Seconds from now 

293 delay_minutes: Minutes from now 

294 delay_hours: Hours from now 

295 delay_days: Days from now 

296 **kwargs: Additional arguments for schedule() 

297 

298 Returns: 

299 The scheduled message 

300 """ 

301 delay = timedelta( 

302 seconds=delay_seconds, 

303 minutes=delay_minutes, 

304 hours=delay_hours, 

305 days=delay_days 

306 ) 

307 

308 scheduled_time = datetime.now() + delay 

309 return self.schedule(channel_id, content, scheduled_time, **kwargs) 

310 

311 def cancel(self, message_id: str) -> bool: 

312 """ 

313 Cancel a scheduled message. 

314 

315 Args: 

316 message_id: The message ID to cancel 

317 

318 Returns: 

319 True if cancelled, False if not found or already sent 

320 """ 

321 with self._lock: 

322 if message_id in self._messages: 

323 message = self._messages[message_id] 

324 if message.status == MessageStatus.PENDING: 

325 message.status = MessageStatus.CANCELLED 

326 self._persist_to_disk() 

327 return True 

328 return False 

329 

330 def reschedule( 

331 self, 

332 message_id: str, 

333 new_time: datetime 

334 ) -> Optional[ScheduledMessage]: 

335 """ 

336 Reschedule a pending message. 

337 

338 Args: 

339 message_id: The message ID 

340 new_time: The new scheduled time 

341 

342 Returns: 

343 The updated message or None if not found/not pending 

344 """ 

345 if new_time < datetime.now(): 

346 raise ValueError("Cannot reschedule to the past") 

347 

348 with self._lock: 

349 if message_id in self._messages: 

350 message = self._messages[message_id] 

351 if message.status == MessageStatus.PENDING: 

352 message.scheduled_time = new_time 

353 self._persist_to_disk() 

354 return message 

355 return None 

356 

357 def update_content( 

358 self, 

359 message_id: str, 

360 content: str 

361 ) -> Optional[ScheduledMessage]: 

362 """ 

363 Update the content of a pending message. 

364 

365 Args: 

366 message_id: The message ID 

367 content: The new content 

368 

369 Returns: 

370 The updated message or None if not found/not pending 

371 """ 

372 with self._lock: 

373 if message_id in self._messages: 

374 message = self._messages[message_id] 

375 if message.status == MessageStatus.PENDING: 

376 message.content = content 

377 self._persist_to_disk() 

378 return message 

379 return None 

380 

381 def get_message(self, message_id: str) -> Optional[ScheduledMessage]: 

382 """ 

383 Get a scheduled message by ID. 

384 

385 Args: 

386 message_id: The message ID 

387 

388 Returns: 

389 The message or None 

390 """ 

391 return self._messages.get(message_id) 

392 

393 def list_pending( 

394 self, 

395 channel_id: Optional[str] = None, 

396 before: Optional[datetime] = None, 

397 after: Optional[datetime] = None 

398 ) -> List[ScheduledMessage]: 

399 """ 

400 List pending scheduled messages. 

401 

402 Args: 

403 channel_id: Optional filter by channel 

404 before: Optional filter by scheduled time (before) 

405 after: Optional filter by scheduled time (after) 

406 

407 Returns: 

408 List of pending messages 

409 """ 

410 with self._lock: 

411 messages = [ 

412 m for m in self._messages.values() 

413 if m.status == MessageStatus.PENDING 

414 ] 

415 

416 if channel_id: 

417 messages = [m for m in messages if m.channel_id == channel_id] 

418 

419 if before: 

420 messages = [m for m in messages if m.scheduled_time < before] 

421 

422 if after: 

423 messages = [m for m in messages if m.scheduled_time > after] 

424 

425 # Sort by scheduled time 

426 messages.sort(key=lambda m: m.scheduled_time) 

427 

428 return messages 

429 

430 def list_all( 

431 self, 

432 status: Optional[MessageStatus] = None, 

433 channel_id: Optional[str] = None, 

434 limit: int = 100 

435 ) -> List[ScheduledMessage]: 

436 """ 

437 List all scheduled messages. 

438 

439 Args: 

440 status: Optional filter by status 

441 channel_id: Optional filter by channel 

442 limit: Maximum number to return 

443 

444 Returns: 

445 List of messages 

446 """ 

447 with self._lock: 

448 messages = list(self._messages.values()) 

449 

450 if status: 

451 messages = [m for m in messages if m.status == status] 

452 

453 if channel_id: 

454 messages = [m for m in messages if m.channel_id == channel_id] 

455 

456 # Sort by scheduled time (most recent first) 

457 messages.sort(key=lambda m: m.scheduled_time, reverse=True) 

458 

459 return messages[:limit] 

460 

461 def get_due_messages(self) -> List[ScheduledMessage]: 

462 """ 

463 Get all messages that are due for delivery. 

464 

465 Returns: 

466 List of due messages 

467 """ 

468 now = datetime.now() 

469 

470 with self._lock: 

471 due = [ 

472 m for m in self._messages.values() 

473 if m.status == MessageStatus.PENDING and m.scheduled_time <= now 

474 ] 

475 

476 # Sort by scheduled time (oldest first) 

477 due.sort(key=lambda m: m.scheduled_time) 

478 

479 return due 

480 

481 def deliver_due_messages(self) -> List[MessageDeliveryResult]: 

482 """ 

483 Deliver all due messages. 

484 

485 Returns: 

486 List of delivery results 

487 """ 

488 due_messages = self.get_due_messages() 

489 results = [] 

490 

491 for message in due_messages: 

492 result = self._deliver_message(message) 

493 results.append(result) 

494 self._delivery_history.append(result) 

495 

496 return results 

497 

def _deliver_message(self, message: ScheduledMessage) -> MessageDeliveryResult:
    """
    Deliver a single message and update its state.

    The delivery handler (when configured) decides success; without a
    handler delivery is simulated as successful. A falsy return value or an
    exception from the handler counts as a failed attempt and goes through
    the retry logic. Once the message has been sent, a problem while
    scheduling its next recurrence is logged but does not turn the
    completed delivery into a failure.

    Args:
        message: The message to deliver

    Returns:
        Delivery result; this method does not raise
    """
    result = MessageDeliveryResult(
        message_id=message.id,
        success=False
    )

    # Only the handler call is guarded as a delivery attempt.
    try:
        if self._delivery_handler:
            success = self._delivery_handler(message)
        else:
            # Simulate successful delivery
            success = True
    except Exception as e:
        self._handle_delivery_failure(message, str(e))
        result.error = str(e)
        return result

    if not success:
        self._handle_delivery_failure(message, "Delivery failed")
        result.error = "Delivery failed"
        return result

    message.status = MessageStatus.SENT
    message.sent_at = datetime.now()
    result.success = True
    result.delivered_at = message.sent_at

    # Handle recurrence. The message is already out, so an error here must
    # not bump retry counters or mark the delivery as failed.
    if message.recurrence != RecurrenceType.NONE:
        try:
            self._schedule_next_recurrence(message)
        except Exception as e:
            logger.warning(
                f"Failed to schedule next recurrence for {message.id}: {e}"
            )

    return result

538 

539 def _handle_delivery_failure(self, message: ScheduledMessage, error: str) -> None: 

540 """Handle a delivery failure.""" 

541 message.retry_count += 1 

542 message.error = error 

543 

544 if message.retry_count >= message.max_retries: 

545 message.status = MessageStatus.FAILED 

546 else: 

547 # Reschedule for retry (exponential backoff) 

548 delay = timedelta(minutes=2 ** message.retry_count) 

549 message.scheduled_time = datetime.now() + delay 

550 

def _schedule_next_recurrence(self, message: ScheduledMessage) -> None:
    """Schedule the next occurrence of a recurring message.

    The next time is the message's current ``scheduled_time`` plus one day
    (DAILY), one week (WEEKLY), 30 days (MONTHLY, an approximation) or
    ``recurrence_interval`` days (CUSTOM). Nothing is scheduled for
    non-recurring messages, for CUSTOM without a positive interval, or when
    the next time would fall after ``recurrence_end``.

    The follow-up message copies the payload and recurrence settings, gets
    the ID ``<base>_next_<hex>`` and is persisted together with the rest of
    the store. Must be called without ``self._lock`` held.
    """
    import re  # local import: only this method needs it

    fixed_steps = {
        RecurrenceType.DAILY: timedelta(days=1),
        RecurrenceType.WEEKLY: timedelta(weeks=1),
        # Approximate month as 30 days
        RecurrenceType.MONTHLY: timedelta(days=30),
    }
    step = fixed_steps.get(message.recurrence)
    if step is None:
        interval = message.recurrence_interval
        # A zero or negative interval would schedule into the past and make
        # the message fire again on every poll, so it is treated as "none".
        if (
            message.recurrence != RecurrenceType.CUSTOM
            or not interval
            or interval <= 0
        ):
            return
        step = timedelta(days=interval)

    next_time = message.scheduled_time + step

    # Check if we're past the end date
    if message.recurrence_end and next_time > message.recurrence_end:
        return

    # Strip suffixes added by earlier recurrences so IDs keep a constant
    # length instead of growing with every occurrence.
    base_id = re.sub(r'(?:_next_[0-9a-f]{8})+$', '', message.id)

    with self._lock:
        new_id = f"{base_id}_next_{secrets.token_hex(4)}"
        while new_id in self._messages:
            # Vanishingly rare, but never overwrite an existing message.
            new_id = f"{base_id}_next_{secrets.token_hex(4)}"

        self._messages[new_id] = ScheduledMessage(
            id=new_id,
            channel_id=message.channel_id,
            content=message.content,
            scheduled_time=next_time,
            sender_id=message.sender_id,
            thread_id=message.thread_id,
            attachments=message.attachments.copy(),
            metadata=message.metadata.copy(),
            recurrence=message.recurrence,
            recurrence_interval=message.recurrence_interval,
            recurrence_end=message.recurrence_end
        )
        # Keep the on-disk store in step, like every other mutation.
        self._persist_to_disk()

592 

593 def retry_failed(self, message_id: str) -> Optional[ScheduledMessage]: 

594 """ 

595 Retry a failed message. 

596 

597 Args: 

598 message_id: The message ID 

599 

600 Returns: 

601 The message if reset for retry, None otherwise 

602 """ 

603 with self._lock: 

604 if message_id in self._messages: 

605 message = self._messages[message_id] 

606 if message.status == MessageStatus.FAILED: 

607 message.status = MessageStatus.PENDING 

608 message.retry_count = 0 

609 message.error = None 

610 message.scheduled_time = datetime.now() + timedelta(seconds=10) 

611 return message 

612 return None 

613 

614 def delete(self, message_id: str) -> bool: 

615 """ 

616 Delete a scheduled message. 

617 

618 Args: 

619 message_id: The message ID 

620 

621 Returns: 

622 True if deleted, False if not found 

623 """ 

624 with self._lock: 

625 if message_id in self._messages: 

626 del self._messages[message_id] 

627 return True 

628 return False 

629 

630 def clear_sent(self) -> int: 

631 """ 

632 Clear all sent messages from the manager. 

633 

634 Returns: 

635 Number of messages cleared 

636 """ 

637 with self._lock: 

638 sent_ids = [ 

639 m.id for m in self._messages.values() 

640 if m.status == MessageStatus.SENT 

641 ] 

642 for msg_id in sent_ids: 

643 del self._messages[msg_id] 

644 return len(sent_ids) 

645 

646 def get_delivery_history( 

647 self, 

648 message_id: Optional[str] = None, 

649 limit: int = 100 

650 ) -> List[MessageDeliveryResult]: 

651 """ 

652 Get delivery history. 

653 

654 Args: 

655 message_id: Optional filter by message ID 

656 limit: Maximum number of records 

657 

658 Returns: 

659 List of delivery results 

660 """ 

661 history = self._delivery_history.copy() 

662 

663 if message_id: 

664 history = [h for h in history if h.message_id == message_id] 

665 

666 return history[-limit:] 

667 

668 def get_stats(self) -> Dict[str, Any]: 

669 """ 

670 Get statistics about scheduled messages. 

671 

672 Returns: 

673 Dictionary with message statistics 

674 """ 

675 with self._lock: 

676 messages = list(self._messages.values()) 

677 

678 pending = sum(1 for m in messages if m.status == MessageStatus.PENDING) 

679 sent = sum(1 for m in messages if m.status == MessageStatus.SENT) 

680 failed = sum(1 for m in messages if m.status == MessageStatus.FAILED) 

681 cancelled = sum(1 for m in messages if m.status == MessageStatus.CANCELLED) 

682 

683 return { 

684 "total": len(messages), 

685 "pending": pending, 

686 "sent": sent, 

687 "failed": failed, 

688 "cancelled": cancelled, 

689 "deliveries": len(self._delivery_history) 

690 }