Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RRule support #319

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 41 additions & 9 deletions rq_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from redis import WatchError

from .utils import from_unix, to_unix, get_next_scheduled_time, rationalize_until
from .utils import from_unix, to_unix, get_next_scheduled_time, get_next_rrule_scheduled_time, rationalize_until

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -298,6 +298,37 @@ def cron(self, cron_string, func, args=None, kwargs=None, repeat=None,
{job.id: to_unix(scheduled_time)})
return job


def rrule(self, rrule_string, func, args=None, kwargs=None, repeat=None,
queue_name=None, result_ttl=-1, ttl=None, id=None, timeout=None, description=None, meta=None,
depends_on=None, on_success=None, on_failure=None, at_front: bool = False):
"""
Schedule a recurring job via RRule
"""
scheduled_time = get_next_rrule_scheduled_time(rrule_string)
if not scheduled_time:
return None

job = self._create_job(func, args=args, kwargs=kwargs, commit=False,
result_ttl=result_ttl, ttl=ttl, id=id, queue_name=queue_name,
description=description, timeout=timeout, meta=meta, depends_on=depends_on,
on_success=on_success, on_failure=on_failure)

job.meta['rrule_string'] = rrule_string

if repeat is not None:
job.meta['repeat'] = int(repeat)

if at_front:
job.enqueue_at_front = True

job.save()

self.connection.zadd(self.scheduled_jobs_key,
{job.id: to_unix(scheduled_time)})
return job


def cancel(self, job):
"""
Pulls a job from the scheduler queue. This function accepts either a
Expand Down Expand Up @@ -415,6 +446,7 @@ def enqueue_job(self, job):
interval = job.meta.get('interval', None)
repeat = job.meta.get('repeat', None)
cron_string = job.meta.get('cron_string', None)
rrule_string = job.meta.get('rrule_string', None)
use_local_timezone = job.meta.get('use_local_timezone', None)

# If job is a repeated job, decrement counter
Expand All @@ -425,21 +457,21 @@ def enqueue_job(self, job):
queue.enqueue_job(job, at_front=bool(job.enqueue_at_front))
self.connection.zrem(self.scheduled_jobs_key, job.id)

# If this is a repeat job and counter has reached 0, don't repeat
if repeat is not None:
if job.meta['repeat'] == 0:
return
if interval:
# If this is a repeat job and counter has reached 0, don't repeat
if repeat is not None:
if job.meta['repeat'] == 0:
return
self.connection.zadd(self.scheduled_jobs_key,
{job.id: to_unix(datetime.utcnow()) + int(interval)})
elif cron_string:
# If this is a repeat job and counter has reached 0, don't repeat
if repeat is not None:
if job.meta['repeat'] == 0:
return
next_scheduled_time = get_next_scheduled_time(cron_string, use_local_timezone=use_local_timezone)
self.connection.zadd(self.scheduled_jobs_key,
{job.id: to_unix(next_scheduled_time)})
elif rrule_string:
next_scheduled_time = get_next_rrule_scheduled_time(rrule_string)
self.connection.zadd(self.scheduled_jobs_key,
{job.id: to_unix(next_scheduled_time)})

def enqueue_jobs(self):
"""
Expand Down
23 changes: 23 additions & 0 deletions rq_scheduler/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import calendar
import crontab
import dateutil.tz
import dateutil.rrule
import re

from datetime import datetime, timedelta
import logging
Expand Down Expand Up @@ -30,6 +32,27 @@ def get_next_scheduled_time(cron_string, use_local_timezone=False):
return next_time.astimezone(tz)


def get_next_rrule_scheduled_time(rrule_string):
"""Calculate the next scheduled time by creating a rrule object
with a rrule string"""
timezone = dateutil.tz.UTC
ruleset = dateutil.rrule.rrulestr(rrule_string, forceset=True)
any_occurence = None
for occur in ruleset:
any_occurence = occur
break
if any_occurence is None:
return None
if any_occurence.tzinfo is None:
now = datetime.now()
else:
now = datetime.now(tz=timezone)
next_occurence = ruleset.after(now)
if next_occurence is None:
return None
return next_occurence.astimezone(timezone)


def setup_loghandlers(level='INFO'):
logger = logging.getLogger('rq_scheduler.scheduler')
if not logger.handlers:
Expand Down
Loading