Coverage for integrations / channels / automation / cron.py: 30.2%

222 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Cron Manager for HevolveBot Integration. 

3 

4Provides enhanced scheduling capabilities with cron expressions. 

5""" 

6 

7import re 

8import secrets 

9from dataclasses import dataclass, field 

10from datetime import datetime, timedelta 

11from enum import Enum 

12from typing import Any, Callable, Dict, List, Optional, Union 

13import threading 

14 

15 

class JobStatus(Enum):
    """Status of a scheduled job."""

    PENDING = "pending"      # waiting for its next run
    RUNNING = "running"      # callback currently executing
    PAUSED = "paused"        # temporarily excluded from due-job selection
    COMPLETED = "completed"  # reached its maximum number of runs
    FAILED = "failed"        # last execution raised an exception
    CANCELLED = "cancelled"  # cancelled by the caller

24 

25 

class IntervalUnit(Enum):
    """Units for interval scheduling."""

    SECONDS = "seconds"
    MINUTES = "minutes"
    HOURS = "hours"
    DAYS = "days"
    WEEKS = "weeks"

32 WEEKS = "weeks" 

33 

34 

@dataclass
class CronJob:
    """A scheduled cron job."""

    # Unique identifier of the job.
    id: str
    # Human-readable label.
    name: str
    # Function invoked as ``callback(*args, **kwargs)`` when the job runs.
    callback: Callable
    # One of 'at', 'every', 'cron'.
    schedule_type: str
    # Schedule definition; its shape depends on ``schedule_type``
    # (a datetime, an interval/unit dict, or a cron expression string).
    schedule_spec: Union[str, datetime, Dict[str, Any]]
    status: JobStatus = JobStatus.PENDING
    # Captured when the job object is created.
    created_at: datetime = field(default_factory=datetime.now)
    # Start time of the most recent execution, if any.
    last_run: Optional[datetime] = None
    # When the job is next due; None when nothing is scheduled.
    next_run: Optional[datetime] = None
    # Number of successful executions so far.
    run_count: int = 0
    # Upper bound on successful executions; None means unlimited.
    max_runs: Optional[int] = None
    args: tuple = field(default_factory=tuple)
    kwargs: Dict[str, Any] = field(default_factory=dict)
    # Free-form caller data stored alongside the job.
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Message of the last exception raised by the callback, if any.
    error: Optional[str] = None

53 

54 

@dataclass
class CronExpression:
    """Parsed five-field cron expression.

    Every attribute stores the raw text of one field. A field may be ``*``,
    a single number, a range (``a-b``), a step (``*/n``, ``a/n`` or
    ``a-b/n``), or a comma-separated list of those forms.

    Day-of-week uses the standard cron numbering: 0 (or 7) is Sunday and
    1-6 are Monday to Saturday. All five fields must match for a datetime
    to match (day-of-month and day-of-week are combined with AND).
    """

    minute: str = "*"
    hour: str = "*"
    day_of_month: str = "*"
    month: str = "*"
    day_of_week: str = "*"

    @classmethod
    def parse(cls, expression: str) -> "CronExpression":
        """
        Parse a cron expression string.

        Only the number of fields is checked here; malformed field
        contents surface as ValueError when the expression is matched.

        Args:
            expression: Cron expression (e.g., "0 9 * * 1-5")

        Returns:
            Parsed CronExpression

        Raises:
            ValueError: If the expression does not have exactly 5 fields
        """
        parts = expression.strip().split()
        if len(parts) != 5:
            raise ValueError(
                f"Invalid cron expression: expected 5 fields, got {len(parts)}"
            )

        minute, hour, day_of_month, month, day_of_week = parts
        return cls(
            minute=minute,
            hour=hour,
            day_of_month=day_of_month,
            month=month,
            day_of_week=day_of_week,
        )

    def matches(self, dt: datetime) -> bool:
        """
        Check if a datetime matches this cron expression.

        Args:
            dt: The datetime to check

        Returns:
            True if the datetime matches

        Raises:
            ValueError: If a field that gets evaluated is malformed
        """
        return (
            self._matches_field(self.minute, dt.minute, 0, 59)
            and self._matches_field(self.hour, dt.hour, 0, 23)
            and self._matches_date(dt)
        )

    def _matches_date(self, dt: datetime) -> bool:
        """Check the day-of-month, month and day-of-week fields only."""
        return (
            self._matches_field(self.day_of_month, dt.day, 1, 31)
            and self._matches_field(self.month, dt.month, 1, 12)
            and self._matches_day_of_week(dt)
        )

    def _matches_day_of_week(self, dt: datetime) -> bool:
        """Check the day-of-week field using cron numbering (Sunday = 0 or 7)."""
        # isoweekday() is Monday=1 .. Sunday=7; modulo 7 maps Sunday to 0.
        weekday = dt.isoweekday() % 7
        if self._matches_field(self.day_of_week, weekday, 0, 6):
            return True
        # Cron also accepts 7 as Sunday, so retry Sundays with that value.
        return weekday == 0 and self._matches_field(self.day_of_week, 7, 0, 7)

    def _matches_field(
        self,
        field: str,
        value: int,
        min_val: int,
        max_val: int
    ) -> bool:
        """
        Check if a value matches a cron field.

        Args:
            field: Raw field text
            value: The datetime component to test
            min_val: Smallest legal value of the field (origin for ``*/n``)
            max_val: Largest legal value of the field (end for ``a/n``)

        Returns:
            True if any comma-separated part of the field accepts the value

        Raises:
            ValueError: If a part is not numeric or its step is not positive
        """
        if field == "*":
            return True

        for part in field.split(","):
            # Split the step off first so that "a-b/n" is handled correctly.
            base, has_step, step_text = part.partition("/")
            step = int(step_text) if has_step else 1
            if step <= 0:
                raise ValueError(
                    f"Invalid cron field '{field}': step must be positive"
                )

            if base == "*":
                start, end = min_val, max_val
            elif "-" in base:
                start_text, end_text = base.split("-")
                start, end = int(start_text), int(end_text)
            elif has_step:
                # "a/n": every n-th value starting at a.
                start, end = int(base), max_val
            else:
                start = end = int(base)

            if start <= value <= end and (value - start) % step == 0:
                return True

        return False

    def next_occurrence(self, after: datetime) -> datetime:
        """
        Find the next occurrence after a given datetime.

        Args:
            after: The datetime to search after

        Returns:
            The next matching datetime (seconds and microseconds zeroed)

        Raises:
            ValueError: If nothing matches within 525600 minutes (one year)
                of the minute following ``after``, or a field is malformed
        """
        # Start from the next whole minute.
        candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
        horizon = candidate + timedelta(minutes=525600)  # minutes in a year

        while candidate < horizon:
            if not self._matches_date(candidate):
                # No minute of this day can match: jump to the next midnight.
                candidate = (candidate + timedelta(days=1)).replace(
                    hour=0, minute=0
                )
            elif not self._matches_field(self.hour, candidate.hour, 0, 23):
                # No minute of this hour can match: jump to the next hour.
                candidate = (candidate + timedelta(hours=1)).replace(minute=0)
            elif self._matches_field(self.minute, candidate.minute, 0, 59):
                return candidate
            else:
                candidate += timedelta(minutes=1)

        raise ValueError("No matching time found within a year")

166 

167 

class CronManager:
    """
    Manages scheduled jobs with cron-like scheduling.

    Features:
    - Schedule jobs at specific times
    - Schedule recurring jobs with intervals
    - Schedule jobs with cron expressions
    - Pause and resume jobs
    - Track job execution history

    The manager does not run a background loop itself: callers drive it by
    invoking ``run_due_jobs()`` (or ``run_job()``) whenever they want due
    jobs executed.
    """

    def __init__(self, max_history: Optional[int] = None):
        """
        Initialize the CronManager.

        Args:
            max_history: Maximum number of execution records to keep; the
                oldest records are dropped first. None (default) keeps
                every record.
        """
        self._jobs: Dict[str, CronJob] = {}
        # Guards the job registry and status transitions made through the
        # public API.
        self._lock = threading.Lock()
        # Not read by any method in this class.
        self._running = False
        self._execution_history: List[Dict[str, Any]] = []
        self._max_history = max_history

    def _register(self, job: CronJob) -> CronJob:
        """Add a job to the registry, rejecting duplicate IDs atomically."""
        with self._lock:
            # Check and insert under the same lock so that two concurrent
            # callers cannot both register the same ID.
            if job.id in self._jobs:
                raise ValueError(f"Job with ID '{job.id}' already exists")
            self._jobs[job.id] = job
        return job

    def schedule_at(
        self,
        callback: Callable,
        run_at: datetime,
        name: Optional[str] = None,
        job_id: Optional[str] = None,
        args: tuple = (),
        kwargs: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> CronJob:
        """
        Schedule a job to run at a specific time.

        Args:
            callback: The function to execute
            run_at: When to run the job
            name: Optional job name
            job_id: Optional custom job ID
            args: Positional arguments for the callback
            kwargs: Keyword arguments for the callback
            metadata: Optional metadata

        Returns:
            The created CronJob

        Raises:
            ValueError: If run_at is in the past or the job ID already exists
        """
        if run_at < datetime.now():
            raise ValueError("Cannot schedule job in the past")

        job = CronJob(
            id=job_id or f"job_{secrets.token_hex(6)}",
            name=name or f"Job at {run_at.isoformat()}",
            callback=callback,
            schedule_type="at",
            schedule_spec=run_at,
            next_run=run_at,
            max_runs=1,  # one-shot: completes after the first successful run
            args=args,
            kwargs=kwargs or {},
            metadata=metadata or {},
        )
        return self._register(job)

    def schedule_every(
        self,
        callback: Callable,
        interval: int,
        unit: IntervalUnit = IntervalUnit.MINUTES,
        name: Optional[str] = None,
        job_id: Optional[str] = None,
        start_at: Optional[datetime] = None,
        max_runs: Optional[int] = None,
        args: tuple = (),
        kwargs: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> CronJob:
        """
        Schedule a recurring job with a fixed interval.

        Args:
            callback: The function to execute
            interval: The interval value (must be positive)
            unit: The interval unit (seconds, minutes, hours, days, weeks)
            name: Optional job name
            job_id: Optional custom job ID
            start_at: When to start (defaults to now)
            max_runs: Maximum number of executions (None for unlimited)
            args: Positional arguments for the callback
            kwargs: Keyword arguments for the callback
            metadata: Optional metadata

        Returns:
            The created CronJob

        Raises:
            ValueError: If the interval is not positive, the unit is
                unknown, or the job ID already exists
        """
        # A zero or negative interval would make the job due on every check.
        if interval <= 0:
            raise ValueError(f"Interval must be positive, got {interval}")

        delta = self._get_timedelta(interval, unit)
        now = datetime.now()
        start_at = start_at or now

        # A future start time is the first run itself; otherwise the first
        # run happens one interval after the start time.
        next_run = start_at + delta if start_at <= now else start_at

        job = CronJob(
            id=job_id or f"job_{secrets.token_hex(6)}",
            name=name or f"Every {interval} {unit.value}",
            callback=callback,
            schedule_type="every",
            schedule_spec={"interval": interval, "unit": unit.value},
            next_run=next_run,
            max_runs=max_runs,
            args=args,
            kwargs=kwargs or {},
            metadata=metadata or {},
        )
        return self._register(job)

    def schedule_cron(
        self,
        callback: Callable,
        expression: str,
        name: Optional[str] = None,
        job_id: Optional[str] = None,
        max_runs: Optional[int] = None,
        args: tuple = (),
        kwargs: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> CronJob:
        """
        Schedule a job with a cron expression.

        Args:
            callback: The function to execute
            expression: Cron expression (minute hour day month weekday)
            name: Optional job name
            job_id: Optional custom job ID
            max_runs: Maximum number of executions (None for unlimited)
            args: Positional arguments for the callback
            kwargs: Keyword arguments for the callback
            metadata: Optional metadata

        Returns:
            The created CronJob

        Raises:
            ValueError: If cron expression is invalid or the job ID
                already exists
        """
        # Parsing validates the field count; computing the first occurrence
        # surfaces malformed fields.
        cron = CronExpression.parse(expression)
        next_run = cron.next_occurrence(datetime.now())

        job = CronJob(
            id=job_id or f"job_{secrets.token_hex(6)}",
            name=name or f"Cron: {expression}",
            callback=callback,
            schedule_type="cron",
            schedule_spec=expression,
            next_run=next_run,
            max_runs=max_runs,
            args=args,
            kwargs=kwargs or {},
            metadata=metadata or {},
        )
        return self._register(job)

    def pause(self, job_id: str) -> bool:
        """
        Pause a scheduled job.

        Args:
            job_id: The job ID to pause

        Returns:
            True if paused, False if the job is unknown, completed or
            cancelled
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return False
            if job.status in (JobStatus.COMPLETED, JobStatus.CANCELLED):
                return False
            job.status = JobStatus.PAUSED
            return True

    def resume(self, job_id: str) -> bool:
        """
        Resume a paused job.

        Args:
            job_id: The job ID to resume

        Returns:
            True if resumed, False if not found or not paused
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None or job.status != JobStatus.PAUSED:
                return False

            job.status = JobStatus.PENDING
            # Recalculate the next run if it elapsed while paused.
            # NOTE(review): for an overdue 'at' job this clears next_run, so
            # the job stays PENDING but is never selected by run_due_jobs --
            # confirm this is intended.
            if job.next_run and job.next_run < datetime.now():
                self._update_next_run(job)
            return True

    def cancel(self, job_id: str) -> bool:
        """
        Cancel a scheduled job.

        The job stays in the registry with status CANCELLED; use
        ``remove()`` to delete it.

        Args:
            job_id: The job ID to cancel

        Returns:
            True if cancelled, False if not found
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return False
            job.status = JobStatus.CANCELLED
            return True

    def remove(self, job_id: str) -> bool:
        """
        Remove a job from the scheduler.

        Args:
            job_id: The job ID to remove

        Returns:
            True if removed, False if not found
        """
        with self._lock:
            return self._jobs.pop(job_id, None) is not None

    def list_jobs(
        self,
        status: Optional[JobStatus] = None,
        schedule_type: Optional[str] = None
    ) -> List[CronJob]:
        """
        List all scheduled jobs.

        Args:
            status: Optional filter by status
            schedule_type: Optional filter by schedule type ('at', 'every', 'cron')

        Returns:
            List of matching jobs
        """
        # Snapshot under the lock, filter outside it.
        with self._lock:
            jobs = list(self._jobs.values())

        if status is not None:
            jobs = [j for j in jobs if j.status == status]

        if schedule_type:
            jobs = [j for j in jobs if j.schedule_type == schedule_type]

        return jobs

    def get_job(self, job_id: str) -> Optional[CronJob]:
        """
        Get a specific job by ID.

        Args:
            job_id: The job ID

        Returns:
            The job or None if not found
        """
        return self._jobs.get(job_id)

    def run_due_jobs(self) -> List[Dict[str, Any]]:
        """
        Execute all jobs that are due.

        A job is due when it is PENDING and its next_run is not in the
        future. Callbacks run outside the lock, one after another.

        Returns:
            List of execution results
        """
        now = datetime.now()

        with self._lock:
            due_jobs = [
                j for j in self._jobs.values()
                if j.status == JobStatus.PENDING and j.next_run and j.next_run <= now
            ]

        return [self._execute_job(job) for job in due_jobs]

    def run_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        """
        Manually execute a job immediately.

        The job is executed regardless of its current status or next_run.

        Args:
            job_id: The job ID to execute

        Returns:
            Execution result or None if job not found
        """
        job = self._jobs.get(job_id)
        if job is None:
            return None
        return self._execute_job(job)

    def _execute_job(self, job: CronJob) -> Dict[str, Any]:
        """
        Execute a job and update its state.

        Exceptions raised by the callback are captured in the result and
        mark the job FAILED; they are not propagated. The run counter is
        only incremented for successful runs.

        Returns:
            Execution record with job_id, job_name, started_at, success,
            result, error and completed_at keys
        """
        job.status = JobStatus.RUNNING
        job.last_run = datetime.now()

        result = {
            "job_id": job.id,
            "job_name": job.name,
            "started_at": job.last_run,
            "success": False,
            "result": None,
            "error": None
        }

        try:
            result["result"] = job.callback(*job.args, **job.kwargs)
            result["success"] = True
            job.run_count += 1
            job.error = None

            # Check if max runs reached
            if job.max_runs and job.run_count >= job.max_runs:
                job.status = JobStatus.COMPLETED
            elif job.status == JobStatus.RUNNING:
                job.status = JobStatus.PENDING
                self._update_next_run(job)
            # Otherwise the job was paused or cancelled while it ran (for
            # example by its own callback); keep that status rather than
            # silently re-arming the job.

        except Exception as e:
            # Scheduler boundary: record the failure instead of propagating.
            result["error"] = str(e)
            job.error = str(e)
            job.status = JobStatus.FAILED

        result["completed_at"] = datetime.now()
        self._execution_history.append(result)

        # Trim the oldest records when a history cap is configured.
        if self._max_history is not None:
            overflow = len(self._execution_history) - self._max_history
            if overflow > 0:
                del self._execution_history[:overflow]

        return result

    def _update_next_run(self, job: CronJob) -> None:
        """Update the next run time for a job, measured from the current time."""
        now = datetime.now()

        if job.schedule_type == "at":
            # One-time job, no next run
            job.next_run = None

        elif job.schedule_type == "every":
            spec = job.schedule_spec
            unit = IntervalUnit(spec["unit"])
            job.next_run = now + self._get_timedelta(spec["interval"], unit)

        elif job.schedule_type == "cron":
            cron = CronExpression.parse(job.schedule_spec)
            job.next_run = cron.next_occurrence(now)

    def _get_timedelta(self, interval: int, unit: IntervalUnit) -> timedelta:
        """
        Convert interval and unit to timedelta.

        Raises:
            ValueError: If unit is not an IntervalUnit member
        """
        if not isinstance(unit, IntervalUnit):
            raise ValueError(f"Unknown interval unit: {unit}")
        # Every IntervalUnit value is also a timedelta keyword argument.
        return timedelta(**{unit.value: interval})

    def get_execution_history(
        self,
        job_id: Optional[str] = None,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        Get job execution history.

        Args:
            job_id: Optional filter by job ID
            limit: Maximum number of records; the most recent ones are
                returned. Zero or a negative value yields an empty list.

        Returns:
            List of execution records, oldest first
        """
        # history[-0:] would be the whole list, so handle this explicitly.
        if limit <= 0:
            return []

        history = self._execution_history.copy()

        if job_id:
            history = [h for h in history if h["job_id"] == job_id]

        return history[-limit:]

    def clear_history(self) -> None:
        """Clear execution history."""
        self._execution_history.clear()