Coverage for integrations / channels / automation / scheduled_messages.py: 28.9%
253 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Scheduled Message Manager for HevolveBot Integration.

Provides scheduling and management of delayed messages.
"""

import json
import logging
import os
import secrets
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional, Callable
import threading

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class MessageStatus(Enum):
    """Status of a scheduled message.

    The string values are what gets written to (and read back from) the
    JSON persistence file, so they must stay stable.
    """

    PENDING = "pending"
    SENT = "sent"
    FAILED = "failed"
    CANCELLED = "cancelled"
class RecurrenceType(Enum):
    """Types of message recurrence.

    The string values are what gets written to (and read back from) the
    JSON persistence file, so they must stay stable.
    """

    NONE = "none"
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"
    CUSTOM = "custom"
@dataclass
class ScheduledMessage:
    """A scheduled message.

    Only ``id``, ``channel_id``, ``content`` and ``scheduled_time`` are
    required; every other field has a default.
    """

    # Identity and payload.
    id: str
    channel_id: str
    content: str
    # When the message becomes due for delivery.
    scheduled_time: datetime
    status: MessageStatus = MessageStatus.PENDING
    sender_id: Optional[str] = None
    thread_id: Optional[str] = None
    # default_factory gives every instance its own list / dict.
    attachments: List[Dict[str, Any]] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Recurrence settings; recurrence_interval is a number of days and is
    # only consulted for RecurrenceType.CUSTOM.
    recurrence: RecurrenceType = RecurrenceType.NONE
    recurrence_interval: Optional[int] = None
    recurrence_end: Optional[datetime] = None
    # Bookkeeping.
    created_at: datetime = field(default_factory=datetime.now)
    sent_at: Optional[datetime] = None
    error: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 3
@dataclass
class MessageDeliveryResult:
    """Result of a message delivery attempt."""

    # ID of the message the attempt was made for.
    message_id: str
    # Whether the attempt succeeded.
    success: bool
    # Populated on success.
    delivered_at: Optional[datetime] = None
    # Populated on failure.
    error: Optional[str] = None
    # Optional free-form response payload.
    response: Optional[Dict[str, Any]] = None
69class ScheduledMessageManager:
70 """
71 Manages scheduled messages.
73 Features:
74 - Schedule messages for future delivery
75 - Support for recurring messages
76 - Cancel pending messages
77 - Track delivery status
78 - Retry failed deliveries
79 """
81 def __init__(
82 self,
83 delivery_handler: Optional[Callable[[ScheduledMessage], bool]] = None,
84 persist_path: Optional[str] = None,
85 ):
86 """
87 Initialize the ScheduledMessageManager.
89 Args:
90 delivery_handler: Optional function to actually deliver messages
91 persist_path: Optional JSON file path for durable storage. When
92 provided, the manager loads existing messages on startup
93 and writes to disk on every mutation so pending reminders
94 survive restarts. Defaults to ~/.nunba/data/scheduled_messages.json
95 when None (set to '' to disable).
96 """
97 self._messages: Dict[str, ScheduledMessage] = {}
98 self._lock = threading.Lock()
99 self._delivery_handler = delivery_handler
100 self._delivery_history: List[MessageDeliveryResult] = []
101 # File persistence — default to ~/.nunba/data/scheduled_messages.json
102 # so the enumerate tool / list_pending survive process restarts.
103 # Pass persist_path='' to opt out (used by unit tests that don't
104 # want to touch the filesystem).
105 if persist_path is None:
106 persist_path = os.path.join(
107 os.path.expanduser('~'), '.nunba', 'data',
108 'scheduled_messages.json',
109 )
110 self._persist_path = persist_path or None
111 if self._persist_path:
112 self._load_from_disk()
114 def _load_from_disk(self) -> None:
115 """Read ``self._persist_path`` and hydrate ``self._messages``.
116 Silently no-ops if the file doesn't exist or is unreadable —
117 the manager must always start up cleanly."""
118 if not self._persist_path or not os.path.isfile(self._persist_path):
119 return
120 try:
121 with open(self._persist_path, encoding='utf-8') as fp:
122 raw = json.load(fp)
123 if not isinstance(raw, list):
124 return
125 for row in raw:
126 try:
127 msg = self._deserialise_message(row)
128 if msg is not None:
129 self._messages[msg.id] = msg
130 except Exception as e:
131 logger.debug(f"skip corrupt scheduled message row: {e}")
132 logger.info(
133 f"ScheduledMessageManager: loaded {len(self._messages)} "
134 f"messages from {self._persist_path}"
135 )
136 except Exception as e:
137 logger.warning(f"Failed to load scheduled messages: {e}")
139 def _persist_to_disk(self) -> None:
140 """Write ``self._messages`` atomically to ``self._persist_path``.
141 Called from every mutation under ``self._lock``. Swallows
142 exceptions — disk failures must never break the caller."""
143 if not self._persist_path:
144 return
145 try:
146 os.makedirs(os.path.dirname(self._persist_path), exist_ok=True)
147 serialised = [
148 self._serialise_message(m) for m in self._messages.values()
149 ]
150 tmp = self._persist_path + '.tmp'
151 with open(tmp, 'w', encoding='utf-8') as fp:
152 json.dump(serialised, fp, indent=2)
153 os.replace(tmp, self._persist_path)
154 except Exception as e:
155 logger.debug(f"Failed to persist scheduled messages: {e}")
157 @staticmethod
158 def _serialise_message(m: ScheduledMessage) -> Dict[str, Any]:
159 """Dataclass → JSON-compatible dict. Enums → value strings,
160 datetimes → ISO strings. Safe for list comprehensions."""
161 def _iso(dt: Optional[datetime]) -> Optional[str]:
162 return dt.isoformat() if dt is not None else None
163 return {
164 'id': m.id, 'channel_id': m.channel_id, 'content': m.content,
165 'scheduled_time': _iso(m.scheduled_time),
166 'status': m.status.value,
167 'sender_id': m.sender_id, 'thread_id': m.thread_id,
168 'attachments': m.attachments, 'metadata': m.metadata,
169 'recurrence': m.recurrence.value,
170 'recurrence_interval': m.recurrence_interval,
171 'recurrence_end': _iso(m.recurrence_end),
172 'created_at': _iso(m.created_at),
173 'sent_at': _iso(m.sent_at),
174 'error': m.error, 'retry_count': m.retry_count,
175 'max_retries': m.max_retries,
176 }
178 @staticmethod
179 def _deserialise_message(row: Dict[str, Any]) -> Optional[ScheduledMessage]:
180 """JSON row → ScheduledMessage. Returns None on any field that
181 can't be rehydrated so a corrupt row can't poison the whole file."""
182 def _dt(v: Any) -> Optional[datetime]:
183 if not v:
184 return None
185 try:
186 return datetime.fromisoformat(v)
187 except Exception:
188 return None
189 try:
190 return ScheduledMessage(
191 id=row['id'],
192 channel_id=row['channel_id'],
193 content=row.get('content', ''),
194 scheduled_time=_dt(row.get('scheduled_time')) or datetime.now(),
195 status=MessageStatus(row.get('status', 'pending')),
196 sender_id=row.get('sender_id'),
197 thread_id=row.get('thread_id'),
198 attachments=row.get('attachments') or [],
199 metadata=row.get('metadata') or {},
200 recurrence=RecurrenceType(row.get('recurrence', 'none')),
201 recurrence_interval=row.get('recurrence_interval'),
202 recurrence_end=_dt(row.get('recurrence_end')),
203 created_at=_dt(row.get('created_at')) or datetime.now(),
204 sent_at=_dt(row.get('sent_at')),
205 error=row.get('error'),
206 retry_count=row.get('retry_count', 0),
207 max_retries=row.get('max_retries', 3),
208 )
209 except Exception:
210 return None
212 def schedule(
213 self,
214 channel_id: str,
215 content: str,
216 scheduled_time: datetime,
217 message_id: Optional[str] = None,
218 sender_id: Optional[str] = None,
219 thread_id: Optional[str] = None,
220 attachments: Optional[List[Dict[str, Any]]] = None,
221 metadata: Optional[Dict[str, Any]] = None,
222 recurrence: RecurrenceType = RecurrenceType.NONE,
223 recurrence_interval: Optional[int] = None,
224 recurrence_end: Optional[datetime] = None
225 ) -> ScheduledMessage:
226 """
227 Schedule a message for future delivery.
229 Args:
230 channel_id: The channel to send to
231 content: The message content
232 scheduled_time: When to send the message
233 message_id: Optional custom message ID
234 sender_id: Optional sender ID
235 thread_id: Optional thread ID for replies
236 attachments: Optional list of attachments
237 metadata: Optional metadata
238 recurrence: Recurrence type
239 recurrence_interval: Days between recurrences
240 recurrence_end: When to stop recurring
242 Returns:
243 The scheduled message
245 Raises:
246 ValueError: If scheduled_time is in the past
247 """
248 if scheduled_time < datetime.now():
249 raise ValueError("Cannot schedule message in the past")
251 message_id = message_id or f"msg_{secrets.token_hex(8)}"
253 if message_id in self._messages:
254 raise ValueError(f"Message with ID '{message_id}' already exists")
256 message = ScheduledMessage(
257 id=message_id,
258 channel_id=channel_id,
259 content=content,
260 scheduled_time=scheduled_time,
261 sender_id=sender_id,
262 thread_id=thread_id,
263 attachments=attachments or [],
264 metadata=metadata or {},
265 recurrence=recurrence,
266 recurrence_interval=recurrence_interval,
267 recurrence_end=recurrence_end
268 )
270 with self._lock:
271 self._messages[message_id] = message
272 self._persist_to_disk()
274 return message
276 def schedule_relative(
277 self,
278 channel_id: str,
279 content: str,
280 delay_seconds: int = 0,
281 delay_minutes: int = 0,
282 delay_hours: int = 0,
283 delay_days: int = 0,
284 **kwargs
285 ) -> ScheduledMessage:
286 """
287 Schedule a message with a relative delay.
289 Args:
290 channel_id: The channel to send to
291 content: The message content
292 delay_seconds: Seconds from now
293 delay_minutes: Minutes from now
294 delay_hours: Hours from now
295 delay_days: Days from now
296 **kwargs: Additional arguments for schedule()
298 Returns:
299 The scheduled message
300 """
301 delay = timedelta(
302 seconds=delay_seconds,
303 minutes=delay_minutes,
304 hours=delay_hours,
305 days=delay_days
306 )
308 scheduled_time = datetime.now() + delay
309 return self.schedule(channel_id, content, scheduled_time, **kwargs)
311 def cancel(self, message_id: str) -> bool:
312 """
313 Cancel a scheduled message.
315 Args:
316 message_id: The message ID to cancel
318 Returns:
319 True if cancelled, False if not found or already sent
320 """
321 with self._lock:
322 if message_id in self._messages:
323 message = self._messages[message_id]
324 if message.status == MessageStatus.PENDING:
325 message.status = MessageStatus.CANCELLED
326 self._persist_to_disk()
327 return True
328 return False
330 def reschedule(
331 self,
332 message_id: str,
333 new_time: datetime
334 ) -> Optional[ScheduledMessage]:
335 """
336 Reschedule a pending message.
338 Args:
339 message_id: The message ID
340 new_time: The new scheduled time
342 Returns:
343 The updated message or None if not found/not pending
344 """
345 if new_time < datetime.now():
346 raise ValueError("Cannot reschedule to the past")
348 with self._lock:
349 if message_id in self._messages:
350 message = self._messages[message_id]
351 if message.status == MessageStatus.PENDING:
352 message.scheduled_time = new_time
353 self._persist_to_disk()
354 return message
355 return None
357 def update_content(
358 self,
359 message_id: str,
360 content: str
361 ) -> Optional[ScheduledMessage]:
362 """
363 Update the content of a pending message.
365 Args:
366 message_id: The message ID
367 content: The new content
369 Returns:
370 The updated message or None if not found/not pending
371 """
372 with self._lock:
373 if message_id in self._messages:
374 message = self._messages[message_id]
375 if message.status == MessageStatus.PENDING:
376 message.content = content
377 self._persist_to_disk()
378 return message
379 return None
381 def get_message(self, message_id: str) -> Optional[ScheduledMessage]:
382 """
383 Get a scheduled message by ID.
385 Args:
386 message_id: The message ID
388 Returns:
389 The message or None
390 """
391 return self._messages.get(message_id)
393 def list_pending(
394 self,
395 channel_id: Optional[str] = None,
396 before: Optional[datetime] = None,
397 after: Optional[datetime] = None
398 ) -> List[ScheduledMessage]:
399 """
400 List pending scheduled messages.
402 Args:
403 channel_id: Optional filter by channel
404 before: Optional filter by scheduled time (before)
405 after: Optional filter by scheduled time (after)
407 Returns:
408 List of pending messages
409 """
410 with self._lock:
411 messages = [
412 m for m in self._messages.values()
413 if m.status == MessageStatus.PENDING
414 ]
416 if channel_id:
417 messages = [m for m in messages if m.channel_id == channel_id]
419 if before:
420 messages = [m for m in messages if m.scheduled_time < before]
422 if after:
423 messages = [m for m in messages if m.scheduled_time > after]
425 # Sort by scheduled time
426 messages.sort(key=lambda m: m.scheduled_time)
428 return messages
430 def list_all(
431 self,
432 status: Optional[MessageStatus] = None,
433 channel_id: Optional[str] = None,
434 limit: int = 100
435 ) -> List[ScheduledMessage]:
436 """
437 List all scheduled messages.
439 Args:
440 status: Optional filter by status
441 channel_id: Optional filter by channel
442 limit: Maximum number to return
444 Returns:
445 List of messages
446 """
447 with self._lock:
448 messages = list(self._messages.values())
450 if status:
451 messages = [m for m in messages if m.status == status]
453 if channel_id:
454 messages = [m for m in messages if m.channel_id == channel_id]
456 # Sort by scheduled time (most recent first)
457 messages.sort(key=lambda m: m.scheduled_time, reverse=True)
459 return messages[:limit]
461 def get_due_messages(self) -> List[ScheduledMessage]:
462 """
463 Get all messages that are due for delivery.
465 Returns:
466 List of due messages
467 """
468 now = datetime.now()
470 with self._lock:
471 due = [
472 m for m in self._messages.values()
473 if m.status == MessageStatus.PENDING and m.scheduled_time <= now
474 ]
476 # Sort by scheduled time (oldest first)
477 due.sort(key=lambda m: m.scheduled_time)
479 return due
481 def deliver_due_messages(self) -> List[MessageDeliveryResult]:
482 """
483 Deliver all due messages.
485 Returns:
486 List of delivery results
487 """
488 due_messages = self.get_due_messages()
489 results = []
491 for message in due_messages:
492 result = self._deliver_message(message)
493 results.append(result)
494 self._delivery_history.append(result)
496 return results
498 def _deliver_message(self, message: ScheduledMessage) -> MessageDeliveryResult:
499 """
500 Deliver a single message.
502 Args:
503 message: The message to deliver
505 Returns:
506 Delivery result
507 """
508 result = MessageDeliveryResult(
509 message_id=message.id,
510 success=False
511 )
513 try:
514 if self._delivery_handler:
515 success = self._delivery_handler(message)
516 else:
517 # Simulate successful delivery
518 success = True
520 if success:
521 message.status = MessageStatus.SENT
522 message.sent_at = datetime.now()
523 result.success = True
524 result.delivered_at = message.sent_at
526 # Handle recurrence
527 if message.recurrence != RecurrenceType.NONE:
528 self._schedule_next_recurrence(message)
529 else:
530 self._handle_delivery_failure(message, "Delivery failed")
531 result.error = "Delivery failed"
533 except Exception as e:
534 self._handle_delivery_failure(message, str(e))
535 result.error = str(e)
537 return result
539 def _handle_delivery_failure(self, message: ScheduledMessage, error: str) -> None:
540 """Handle a delivery failure."""
541 message.retry_count += 1
542 message.error = error
544 if message.retry_count >= message.max_retries:
545 message.status = MessageStatus.FAILED
546 else:
547 # Reschedule for retry (exponential backoff)
548 delay = timedelta(minutes=2 ** message.retry_count)
549 message.scheduled_time = datetime.now() + delay
551 def _schedule_next_recurrence(self, message: ScheduledMessage) -> None:
552 """Schedule the next occurrence of a recurring message."""
553 if message.recurrence == RecurrenceType.NONE:
554 return
556 # Calculate next time
557 if message.recurrence == RecurrenceType.DAILY:
558 next_time = message.scheduled_time + timedelta(days=1)
559 elif message.recurrence == RecurrenceType.WEEKLY:
560 next_time = message.scheduled_time + timedelta(weeks=1)
561 elif message.recurrence == RecurrenceType.MONTHLY:
562 # Approximate month as 30 days
563 next_time = message.scheduled_time + timedelta(days=30)
564 elif message.recurrence == RecurrenceType.CUSTOM and message.recurrence_interval:
565 next_time = message.scheduled_time + timedelta(days=message.recurrence_interval)
566 else:
567 return
569 # Check if we're past the end date
570 if message.recurrence_end and next_time > message.recurrence_end:
571 return
573 # Schedule the new message
574 new_id = f"{message.id}_next_{secrets.token_hex(4)}"
576 new_message = ScheduledMessage(
577 id=new_id,
578 channel_id=message.channel_id,
579 content=message.content,
580 scheduled_time=next_time,
581 sender_id=message.sender_id,
582 thread_id=message.thread_id,
583 attachments=message.attachments.copy(),
584 metadata=message.metadata.copy(),
585 recurrence=message.recurrence,
586 recurrence_interval=message.recurrence_interval,
587 recurrence_end=message.recurrence_end
588 )
590 with self._lock:
591 self._messages[new_id] = new_message
593 def retry_failed(self, message_id: str) -> Optional[ScheduledMessage]:
594 """
595 Retry a failed message.
597 Args:
598 message_id: The message ID
600 Returns:
601 The message if reset for retry, None otherwise
602 """
603 with self._lock:
604 if message_id in self._messages:
605 message = self._messages[message_id]
606 if message.status == MessageStatus.FAILED:
607 message.status = MessageStatus.PENDING
608 message.retry_count = 0
609 message.error = None
610 message.scheduled_time = datetime.now() + timedelta(seconds=10)
611 return message
612 return None
614 def delete(self, message_id: str) -> bool:
615 """
616 Delete a scheduled message.
618 Args:
619 message_id: The message ID
621 Returns:
622 True if deleted, False if not found
623 """
624 with self._lock:
625 if message_id in self._messages:
626 del self._messages[message_id]
627 return True
628 return False
630 def clear_sent(self) -> int:
631 """
632 Clear all sent messages from the manager.
634 Returns:
635 Number of messages cleared
636 """
637 with self._lock:
638 sent_ids = [
639 m.id for m in self._messages.values()
640 if m.status == MessageStatus.SENT
641 ]
642 for msg_id in sent_ids:
643 del self._messages[msg_id]
644 return len(sent_ids)
646 def get_delivery_history(
647 self,
648 message_id: Optional[str] = None,
649 limit: int = 100
650 ) -> List[MessageDeliveryResult]:
651 """
652 Get delivery history.
654 Args:
655 message_id: Optional filter by message ID
656 limit: Maximum number of records
658 Returns:
659 List of delivery results
660 """
661 history = self._delivery_history.copy()
663 if message_id:
664 history = [h for h in history if h.message_id == message_id]
666 return history[-limit:]
668 def get_stats(self) -> Dict[str, Any]:
669 """
670 Get statistics about scheduled messages.
672 Returns:
673 Dictionary with message statistics
674 """
675 with self._lock:
676 messages = list(self._messages.values())
678 pending = sum(1 for m in messages if m.status == MessageStatus.PENDING)
679 sent = sum(1 for m in messages if m.status == MessageStatus.SENT)
680 failed = sum(1 for m in messages if m.status == MessageStatus.FAILED)
681 cancelled = sum(1 for m in messages if m.status == MessageStatus.CANCELLED)
683 return {
684 "total": len(messages),
685 "pending": pending,
686 "sent": sent,
687 "failed": failed,
688 "cancelled": cancelled,
689 "deliveries": len(self._delivery_history)
690 }