Coverage for integrations / channels / automation / cron.py: 30.2%
222 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Cron Manager for HevolveBot Integration.
4Provides enhanced scheduling capabilities with cron expressions.
5"""
7import re
8import secrets
9from dataclasses import dataclass, field
10from datetime import datetime, timedelta
11from enum import Enum
12from typing import Any, Callable, Dict, List, Optional, Union
13import threading
class JobStatus(Enum):
    """Lifecycle states of a scheduled job.

    The string values are the serialised form of each state.
    """

    # Waiting for its next run; the only state run_due_jobs() picks up.
    PENDING = "pending"
    # Callback is currently being executed.
    RUNNING = "running"
    # Temporarily suspended via pause(); resume() returns it to PENDING.
    PAUSED = "paused"
    # Reached its max_runs limit.
    COMPLETED = "completed"
    # The last callback invocation raised an exception.
    FAILED = "failed"
    # Explicitly cancelled via cancel().
    CANCELLED = "cancelled"
class IntervalUnit(Enum):
    """Time units accepted for fixed-interval scheduling.

    Each value is the lowercase plural name of the unit.
    """

    SECONDS = "seconds"
    MINUTES = "minutes"
    HOURS = "hours"
    DAYS = "days"
    WEEKS = "weeks"
@dataclass
class CronJob:
    """Record describing one scheduled job and its runtime state."""

    # --- identity and what to run -------------------------------------
    id: str
    name: str
    callback: Callable
    # One of 'at', 'every', 'cron'.
    schedule_type: str
    # Shape depends on schedule_type: a datetime for 'at', a dict with
    # "interval"/"unit" keys for 'every', the expression string for 'cron'.
    schedule_spec: Union[str, datetime, Dict[str, Any]]

    # --- runtime state ------------------------------------------------
    status: JobStatus = JobStatus.PENDING
    created_at: datetime = field(default_factory=datetime.now)
    last_run: Optional[datetime] = None
    next_run: Optional[datetime] = None
    run_count: int = 0
    # None means no limit on the number of runs.
    max_runs: Optional[int] = None

    # --- callback arguments and free-form data ------------------------
    args: tuple = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Message of the exception raised by the last failed run, if any.
    error: Optional[str] = None
@dataclass
class CronExpression:
    """Parsed five-field cron expression.

    Fields are, in order: minute (0-59), hour (0-23), day of month (1-31),
    month (1-12) and day of week (0-7, where both 0 and 7 mean Sunday).

    Each field is a comma-separated list of parts. A part is ``*``, a single
    number, a range ``a-b``, or any of those followed by ``/step``
    (``*/15``, ``10-30/10``, ``5/10``).

    A datetime matches only when all five fields match; day-of-month and
    day-of-week are combined with AND.
    """

    minute: str = "*"
    hour: str = "*"
    day_of_month: str = "*"
    month: str = "*"
    day_of_week: str = "*"

    # The attributes below are deliberately unannotated so that @dataclass
    # does not turn them into instance fields.
    _FIELD_LIMITS = (
        ("minute", 0, 59),
        ("hour", 0, 23),
        ("day_of_month", 1, 31),
        ("month", 1, 12),
        ("day_of_week", 0, 7),
    )
    # Groups: 1 = start (or single value), 2 = range end, 3 = step.
    _PART_PATTERN = re.compile(r"(?:\*|(\d+)(?:-(\d+))?)(?:/(\d+))?")
    # Long enough to reach the next 29 February even across a skipped
    # century leap year.
    _SEARCH_HORIZON_DAYS = 8 * 366

    @classmethod
    def parse(cls, expression: str) -> "CronExpression":
        """
        Parse and validate a cron expression string.

        Args:
            expression: Cron expression (e.g., "0 9 * * 1-5")

        Returns:
            Parsed CronExpression

        Raises:
            ValueError: If the expression does not have exactly five
                fields, or a field is malformed or out of range
        """
        parts = expression.strip().split()
        if len(parts) != 5:
            raise ValueError(
                f"Invalid cron expression: expected 5 fields, got {len(parts)}"
            )

        for (label, low, high), text in zip(cls._FIELD_LIMITS, parts):
            cls._validate_field(label, text, low, high)

        return cls(
            minute=parts[0],
            hour=parts[1],
            day_of_month=parts[2],
            month=parts[3],
            day_of_week=parts[4],
        )

    @classmethod
    def _validate_field(cls, label: str, text: str, low: int, high: int) -> None:
        """Raise ValueError unless every part of *text* is well-formed and in range."""
        for part in text.split(","):
            found = cls._PART_PATTERN.fullmatch(part)
            if found is None:
                raise ValueError(
                    f"Invalid cron expression: bad {label} field '{text}'"
                )

            start, end, step = found.groups()
            if step is not None and int(step) == 0:
                raise ValueError(
                    f"Invalid cron expression: zero step in {label} field '{text}'"
                )

            bounds = [int(b) for b in (start, end) if b is not None]
            if any(not low <= b <= high for b in bounds):
                raise ValueError(
                    f"Invalid cron expression: {label} field '{text}' "
                    f"outside {low}-{high}"
                )
            if len(bounds) == 2 and bounds[0] > bounds[1]:
                raise ValueError(
                    f"Invalid cron expression: reversed range in {label} "
                    f"field '{text}'"
                )

    def matches(self, dt: datetime) -> bool:
        """
        Check if a datetime matches this cron expression.

        Seconds and microseconds of ``dt`` are ignored.

        Args:
            dt: The datetime to check

        Returns:
            True if the datetime matches
        """
        return (
            self._matches_field(self.minute, dt.minute, 0, 59)
            and self._matches_field(self.hour, dt.hour, 0, 23)
            and self._date_matches(dt)
        )

    def _date_matches(self, dt: datetime) -> bool:
        """Check the day-of-month, month and day-of-week fields against *dt*."""
        if not self._matches_field(self.day_of_month, dt.day, 1, 31):
            return False
        if not self._matches_field(self.month, dt.month, 1, 12):
            return False

        # Cron numbers weekdays from Sunday = 0, whereas datetime.weekday()
        # starts at Monday = 0; isoweekday() % 7 gives the cron numbering.
        weekday = dt.isoweekday() % 7
        if self._matches_field(self.day_of_week, weekday, 0, 7):
            return True
        # Cron also accepts 7 as an alias for Sunday.
        return weekday == 0 and self._matches_field(self.day_of_week, 7, 0, 7)

    def _matches_field(
        self,
        field: str,
        value: int,
        min_val: int,
        max_val: int
    ) -> bool:
        """
        Check if a value matches a cron field.

        Args:
            field: Field text (comma-separated parts)
            value: The datetime component to test
            min_val: Smallest legal value; the start of a ``*`` span
            max_val: Largest legal value; the end of ``*`` and ``n/step`` spans

        Returns:
            True if any part of the field matches

        Raises:
            ValueError: If a part is not numeric or has a non-positive step
        """
        if field == "*":
            return True

        for part in field.split(","):
            # Split the step off first so that "10-30/10" is handled as a
            # stepped range rather than failing on int("30/10").
            span, slash, step_text = part.partition("/")
            step = 1
            if slash:
                step = int(step_text)
                if step <= 0:
                    raise ValueError(f"Invalid cron step in '{part}'")

            if span == "*":
                start, end = min_val, max_val
            elif "-" in span:
                start, end = map(int, span.split("-"))
            else:
                start = int(span)
                # "n/step" runs from n to the field maximum; a bare "n" is
                # a single value.
                end = max_val if slash else start

            if start <= value <= end and (value - start) % step == 0:
                return True

        return False

    def next_occurrence(self, after: datetime) -> datetime:
        """
        Find the next occurrence strictly after a given datetime.

        Args:
            after: The datetime to search after

        Returns:
            The next matching datetime, with seconds and microseconds zeroed

        Raises:
            ValueError: If nothing matches within the search horizon
                (e.g. "0 0 31 2 *")
        """
        # Start from the next whole minute.
        dt = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
        deadline = dt + timedelta(days=self._SEARCH_HORIZON_DAYS)

        while dt < deadline:
            if not self._date_matches(dt):
                # No minute of this day can match: jump to the next midnight.
                dt = dt.replace(hour=0, minute=0) + timedelta(days=1)
            elif not self._matches_field(self.hour, dt.hour, 0, 23):
                # No minute of this hour can match: jump to the next hour.
                dt = dt.replace(minute=0) + timedelta(hours=1)
            elif self._matches_field(self.minute, dt.minute, 0, 59):
                return dt
            else:
                dt += timedelta(minutes=1)

        raise ValueError("No matching time found within 8 years")
class CronManager:
    """
    Manages scheduled jobs with cron-like scheduling.

    Features:
    - Schedule jobs at specific times
    - Schedule recurring jobs with intervals
    - Schedule jobs with cron expressions
    - Pause and resume jobs
    - Track job execution history

    Nothing in this class starts a timer or thread; callers drive execution
    by invoking run_due_jobs() (or run_job() for a single job). The job
    registry is guarded by an internal lock. Callbacks run outside that
    lock, so they may call back into the manager.
    """

    def __init__(self):
        """Initialize the CronManager with no jobs and an empty history."""
        self._jobs: Dict[str, CronJob] = {}
        self._lock = threading.Lock()
        self._running = False
        # NOTE(review): grows without bound until clear_history() is called.
        self._execution_history: List[Dict[str, Any]] = []

    def _register(self, job: CronJob) -> CronJob:
        """Add *job* to the registry, rejecting duplicate IDs atomically."""
        # The existence check and the insert share one critical section so
        # two threads cannot both register the same ID.
        with self._lock:
            if job.id in self._jobs:
                raise ValueError(f"Job with ID '{job.id}' already exists")
            self._jobs[job.id] = job
        return job

    def schedule_at(
        self,
        callback: Callable,
        run_at: datetime,
        name: Optional[str] = None,
        job_id: Optional[str] = None,
        args: tuple = (),
        kwargs: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> CronJob:
        """
        Schedule a job to run once at a specific time.

        Args:
            callback: The function to execute
            run_at: When to run the job (compared against datetime.now())
            name: Optional job name
            job_id: Optional custom job ID (a random one is generated if omitted)
            args: Positional arguments for the callback
            kwargs: Keyword arguments for the callback
            metadata: Optional metadata

        Returns:
            The created CronJob

        Raises:
            ValueError: If run_at is in the past or the job ID already exists
        """
        if run_at < datetime.now():
            raise ValueError("Cannot schedule job in the past")

        job = CronJob(
            id=job_id or f"job_{secrets.token_hex(6)}",
            name=name or f"Job at {run_at.isoformat()}",
            callback=callback,
            schedule_type="at",
            schedule_spec=run_at,
            next_run=run_at,
            max_runs=1,  # one-shot: completes after its first successful run
            args=args,
            kwargs=kwargs or {},
            metadata=metadata or {}
        )
        return self._register(job)

    def schedule_every(
        self,
        callback: Callable,
        interval: int,
        unit: IntervalUnit = IntervalUnit.MINUTES,
        name: Optional[str] = None,
        job_id: Optional[str] = None,
        start_at: Optional[datetime] = None,
        max_runs: Optional[int] = None,
        args: tuple = (),
        kwargs: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> CronJob:
        """
        Schedule a recurring job with a fixed interval.

        Args:
            callback: The function to execute
            interval: The interval value
            unit: The interval unit (seconds, minutes, hours, days, weeks)
            name: Optional job name
            job_id: Optional custom job ID (a random one is generated if omitted)
            start_at: When to start (defaults to now)
            max_runs: Maximum number of executions (None for unlimited)
            args: Positional arguments for the callback
            kwargs: Keyword arguments for the callback
            metadata: Optional metadata

        Returns:
            The created CronJob

        Raises:
            ValueError: If the job ID already exists or the unit is unknown
        """
        start_at = start_at or datetime.now()
        delta = self._get_timedelta(interval, unit)
        # A start time that is now or already past means the first run is one
        # interval later; a future start time is itself the first run.
        next_run = start_at + delta if start_at <= datetime.now() else start_at

        job = CronJob(
            id=job_id or f"job_{secrets.token_hex(6)}",
            name=name or f"Every {interval} {unit.value}",
            callback=callback,
            schedule_type="every",
            schedule_spec={"interval": interval, "unit": unit.value},
            next_run=next_run,
            max_runs=max_runs,
            args=args,
            kwargs=kwargs or {},
            metadata=metadata or {}
        )
        return self._register(job)

    def schedule_cron(
        self,
        callback: Callable,
        expression: str,
        name: Optional[str] = None,
        job_id: Optional[str] = None,
        max_runs: Optional[int] = None,
        args: tuple = (),
        kwargs: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> CronJob:
        """
        Schedule a job with a cron expression.

        Args:
            callback: The function to execute
            expression: Cron expression (minute hour day month weekday)
            name: Optional job name
            job_id: Optional custom job ID (a random one is generated if omitted)
            max_runs: Maximum number of executions (None for unlimited)
            args: Positional arguments for the callback
            kwargs: Keyword arguments for the callback
            metadata: Optional metadata

        Returns:
            The created CronJob

        Raises:
            ValueError: If the cron expression is invalid or never matches,
                or the job ID already exists
        """
        cron = CronExpression.parse(expression)

        job = CronJob(
            id=job_id or f"job_{secrets.token_hex(6)}",
            name=name or f"Cron: {expression}",
            callback=callback,
            schedule_type="cron",
            schedule_spec=expression,
            next_run=cron.next_occurrence(datetime.now()),
            max_runs=max_runs,
            args=args,
            kwargs=kwargs or {},
            metadata=metadata or {}
        )
        return self._register(job)

    def pause(self, job_id: str) -> bool:
        """
        Pause a scheduled job.

        Args:
            job_id: The job ID to pause

        Returns:
            True if paused, False if the job is not found or is already
            completed or cancelled
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return False
            if job.status in (JobStatus.COMPLETED, JobStatus.CANCELLED):
                return False
            job.status = JobStatus.PAUSED
            return True

    def resume(self, job_id: str) -> bool:
        """
        Resume a paused job.

        If the job's next run time passed while it was paused, the next run
        is recalculated from the current time.

        Args:
            job_id: The job ID to resume

        Returns:
            True if resumed, False if not found or not paused
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None or job.status != JobStatus.PAUSED:
                return False

            job.status = JobStatus.PENDING
            # NOTE(review): for an 'at' job this clears next_run, so a
            # one-shot job that came due while paused never fires and stays
            # PENDING. Confirm whether that is intended.
            if job.next_run and job.next_run < datetime.now():
                self._update_next_run(job)
            return True

    def cancel(self, job_id: str) -> bool:
        """
        Cancel a scheduled job.

        The job stays in the registry with status CANCELLED; use remove()
        to delete it.

        Args:
            job_id: The job ID to cancel

        Returns:
            True if cancelled, False if not found
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return False
            job.status = JobStatus.CANCELLED
            return True

    def remove(self, job_id: str) -> bool:
        """
        Remove a job from the scheduler.

        Args:
            job_id: The job ID to remove

        Returns:
            True if removed, False if not found
        """
        with self._lock:
            return self._jobs.pop(job_id, None) is not None

    def list_jobs(
        self,
        status: Optional[JobStatus] = None,
        schedule_type: Optional[str] = None
    ) -> List[CronJob]:
        """
        List all scheduled jobs.

        Args:
            status: Optional filter by status
            schedule_type: Optional filter by schedule type ('at', 'every', 'cron')

        Returns:
            List of matching jobs (a new list holding the live job objects)
        """
        with self._lock:
            jobs = list(self._jobs.values())

        if status:
            jobs = [j for j in jobs if j.status == status]

        if schedule_type:
            jobs = [j for j in jobs if j.schedule_type == schedule_type]

        return jobs

    def get_job(self, job_id: str) -> Optional[CronJob]:
        """
        Get a specific job by ID.

        Args:
            job_id: The job ID

        Returns:
            The job or None if not found
        """
        with self._lock:
            return self._jobs.get(job_id)

    def run_due_jobs(self) -> List[Dict[str, Any]]:
        """
        Execute all PENDING jobs whose next run time has arrived.

        Returns:
            List of execution results, one per job that was run
        """
        now = datetime.now()

        # Snapshot the due jobs under the lock, then run the callbacks
        # outside it so a slow callback cannot block other manager calls.
        with self._lock:
            due_jobs = [
                j for j in self._jobs.values()
                if j.status == JobStatus.PENDING and j.next_run and j.next_run <= now
            ]

        return [self._execute_job(job) for job in due_jobs]

    def run_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        """
        Manually execute a job immediately, whatever its current status.

        Args:
            job_id: The job ID to execute

        Returns:
            Execution result or None if job not found
        """
        with self._lock:
            job = self._jobs.get(job_id)
        if job is None:
            return None
        return self._execute_job(job)

    def _execute_job(self, job: CronJob) -> Dict[str, Any]:
        """
        Execute a job and update its state.

        On success the run count is incremented and the job becomes
        COMPLETED (max_runs reached) or PENDING with a new next_run. On any
        exception the job becomes FAILED and the message is stored on both
        the job and the result; run_due_jobs() only picks up PENDING jobs,
        so a failed job is not retried automatically.

        Returns:
            Result dict with keys job_id, job_name, started_at, success,
            result, error and completed_at; also appended to the history
        """
        job.status = JobStatus.RUNNING
        job.last_run = datetime.now()

        result = {
            "job_id": job.id,
            "job_name": job.name,
            "started_at": job.last_run,
            "success": False,
            "result": None,
            "error": None
        }

        try:
            result["result"] = job.callback(*job.args, **job.kwargs)
            result["success"] = True
            job.run_count += 1
            job.error = None

            # Check if max runs reached
            if job.max_runs and job.run_count >= job.max_runs:
                job.status = JobStatus.COMPLETED
            else:
                job.status = JobStatus.PENDING
                self._update_next_run(job)

        except Exception as e:
            # Deliberately broad: a misbehaving callback must not take the
            # scheduler down; the failure is recorded instead.
            result["error"] = str(e)
            job.error = str(e)
            job.status = JobStatus.FAILED

        result["completed_at"] = datetime.now()
        self._execution_history.append(result)

        return result

    def _update_next_run(self, job: CronJob) -> None:
        """Update the next run time for a job, measured from the current time."""
        now = datetime.now()

        if job.schedule_type == "at":
            # One-time job, no next run
            job.next_run = None

        elif job.schedule_type == "every":
            spec = job.schedule_spec
            delta = self._get_timedelta(
                spec["interval"], IntervalUnit(spec["unit"])
            )
            job.next_run = now + delta

        elif job.schedule_type == "cron":
            cron = CronExpression.parse(job.schedule_spec)
            job.next_run = cron.next_occurrence(now)

    def _get_timedelta(self, interval: int, unit: IntervalUnit) -> timedelta:
        """
        Convert interval and unit to timedelta.

        Raises:
            ValueError: If unit is not an IntervalUnit member
        """
        if not isinstance(unit, IntervalUnit):
            raise ValueError(f"Unknown interval unit: {unit}")
        # Each IntervalUnit value is also the matching timedelta keyword.
        return timedelta(**{unit.value: interval})

    def get_execution_history(
        self,
        job_id: Optional[str] = None,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        Get job execution history.

        Args:
            job_id: Optional filter by job ID
            limit: Maximum number of records; zero or less yields no records

        Returns:
            List of the most recent execution records, oldest first
        """
        # history[-0:] is the whole list, so a non-positive limit has to be
        # handled before slicing.
        if limit <= 0:
            return []

        history = list(self._execution_history)

        if job_id:
            history = [h for h in history if h["job_id"] == job_id]

        return history[-limit:]

    def clear_history(self) -> None:
        """Clear execution history."""
        self._execution_history.clear()