From 7496a770513481e7dc5c33f21f0ad84baefaf01d Mon Sep 17 00:00:00 2001 From: vishesh10 Date: Wed, 8 Nov 2023 01:06:31 +0530 Subject: [PATCH] Add provision of cron expressions for periodic task scheduling. (#307) --- README.rst | 13 +++++++++- requirements.txt | 1 + tasktiger/__init__.py | 3 ++- tasktiger/schedule.py | 59 ++++++++++++++++++++++++++++++++++++++++-- tests/test_periodic.py | 46 +++++++++++++++++++++++++++++++- 5 files changed, 117 insertions(+), 5 deletions(-) diff --git a/README.rst b/README.rst index 533016ce..34f4a02d 100644 --- a/README.rst +++ b/README.rst @@ -389,7 +389,7 @@ The following options can be only specified in the task decorator: the initial task execution date when a worker is initialized, and to determine the next execution date when the task is about to get executed. - For most common scenarios, the ``periodic`` built-in function can be passed: + For most common scenarios, the below mentioned built-in functions can be passed: - ``periodic(seconds=0, minutes=0, hours=0, days=0, weeks=0, start_date=None, end_date=None)`` @@ -401,6 +401,17 @@ The following options can be only specified in the task decorator: every Sunday at 4am UTC, you could use ``schedule=periodic(weeks=1, start_date=datetime.datetime(2000, 1, 2, 4))``. + - ``cron_expr(expr, start_date=None, end_date=None)`` + + ``start_date``, to specify the periodic task start date. It defaults to + ``2000-01-01T00:00Z``, a Saturday, if not given. + ``end_date``, to specify the periodic task end date. The task repeats + forever if ``end_date`` is not given. + For example, to run a task every hour indefinitely, + use ``schedule=cron_expr("0 * * * *")``. To run a task every Sunday at + 4am UTC, you could use ``schedule=cron_expr("0 4 * * 0")``. + + Custom retrying --------------- diff --git a/requirements.txt b/requirements.txt index 461d8b8c..5fbafa6c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ click==8.1.3 redis==4.5.2 structlog==22.3.0 +croniter diff --git a/tasktiger/__init__.py b/tasktiger/__init__.py index 5ff4d417..2702d291 100644 --- a/tasktiger/__init__.py +++ b/tasktiger/__init__.py @@ -9,7 +9,7 @@ TaskNotFound, ) from .retry import exponential, fixed, linear -from .schedule import periodic +from .schedule import cron_expr, periodic from .task import Task from .tasktiger import TaskTiger, run_worker from .worker import Worker @@ -31,6 +31,7 @@ "exponential", # Schedules "periodic", + "cron_expr", ] diff --git a/tasktiger/schedule.py b/tasktiger/schedule.py index 525aacad..80c2391a 100644 --- a/tasktiger/schedule.py +++ b/tasktiger/schedule.py @@ -1,7 +1,9 @@ import datetime from typing import Callable, Optional, Tuple -__all__ = ["periodic"] +__all__ = ["periodic", "cron_expr"] + +START_DATE = datetime.datetime(2000, 1, 1) def _periodic( @@ -54,5 +56,58 @@ def periodic( assert period > 0, "Must specify a positive period." if not start_date: # Saturday at midnight - start_date = datetime.datetime(2000, 1, 1) + start_date = START_DATE return (_periodic, (period, start_date, end_date)) + + +def _cron_expr( + dt: datetime.datetime, + expr: str, + start_date: datetime.datetime, + end_date: Optional[datetime.datetime] = None, +) -> Optional[datetime.datetime]: + import croniter # type: ignore + import pytz # type: ignore + + localize = pytz.utc.localize + + if end_date and dt >= end_date: + return None + + if dt < start_date: + return start_date + + assert croniter.croniter.is_valid(expr), "Cron expression is not valid." + + start_date = localize(start_date) + dt = localize(dt) + + next_utc = croniter.croniter(expr, dt).get_next(ret_type=datetime.datetime) + next_utc = next_utc.replace(tzinfo=None) + + # Make sure the time is still within bounds. + if end_date and next_utc > end_date: + return None + + return next_utc + + +def cron_expr( + expr: str, + start_date: Optional[datetime.datetime] = None, + end_date: Optional[datetime.datetime] = None, +) -> Tuple[Callable[..., Optional[datetime.datetime]], Tuple]: + """ + Periodic task schedule via cron expression: Use to schedule a task to run periodically, + starting from start_date (or None to be active immediately) until end_date + (or None to repeat forever). + + This function behaves similar to the cron jobs, which run with a minimum of 1 minute + granularity. So specifying "* * * * *" expression will the run the task every + minute. + + For more details, see README. + """ + if not start_date: + start_date = START_DATE + return (_cron_expr, (expr, start_date, end_date)) diff --git a/tests/test_periodic.py b/tests/test_periodic.py index bc2fab28..caede500 100644 --- a/tests/test_periodic.py +++ b/tests/test_periodic.py @@ -3,7 +3,7 @@ import datetime import time -from tasktiger import Task, Worker, periodic +from tasktiger import Task, Worker, cron_expr, periodic from tasktiger._internal import ( QUEUED, SCHEDULED, @@ -64,6 +64,50 @@ def test_periodic_schedule(self): f = periodic(minutes=1, end_date=dt) assert f[0](datetime.datetime(2010, 1, 1, 0, 1), *f[1]) is None + def test_cron_schedule(self): + """ + Test the cron_expr() schedule function. + """ + dt = datetime.datetime(2010, 1, 1) + + f = cron_expr("* * * * *") + assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 1, 0, 1) + + f = cron_expr("0 * * * *") + assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 1, 1) + + f = cron_expr("0 0 * * *") + assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 2) + + f = cron_expr("0 0 * * 6") + # 2010-01-02 is a Saturday + assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 2) + + f = cron_expr("0 0 * * 0", start_date=datetime.datetime(2000, 1, 2)) + # 2000-01-02 is a Sunday and 2010-01-02 is a Saturday + assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 3) + + f = cron_expr("2 3 * * *", start_date=dt) + assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 1, 3, 2) + # Make sure we return the start_date if the current date is earlier. + assert f[0](datetime.datetime(1990, 1, 1), *f[1]) == dt + + f = cron_expr("* * * * *", end_date=dt) + assert f[0]( + datetime.datetime(2009, 12, 31, 23, 58), *f[1] + ) == datetime.datetime(2009, 12, 31, 23, 59) + + f = cron_expr("* * * * *", end_date=dt) + assert f[0]( + datetime.datetime(2009, 12, 31, 23, 59), *f[1] + ) == datetime.datetime(2010, 1, 1, 0, 0) + + f = cron_expr("* * * * *", end_date=dt) + assert f[0](datetime.datetime(2010, 1, 1, 0, 0), *f[1]) is None + + f = cron_expr("* * * * *", end_date=dt) + assert f[0](datetime.datetime(2010, 1, 1, 0, 1), *f[1]) is None + def test_periodic_execution(self): """ Test periodic task execution.