Skip to content

Commit

Permalink
Added FIXED_INTERVAL retry policy (#5)
Browse files Browse the repository at this point in the history
* Added FIXED_INTERVAL retry policy

* WorkerSettings.redis_client now abstract
  • Loading branch information
KlochkovHF authored Apr 17, 2023
1 parent acd405e commit f875f21
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 3 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = yatq
version = 0.0.4
version = 0.0.5

[options]
packages = find:
Expand Down
63 changes: 63 additions & 0 deletions tests/test_task_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,69 @@ async def test_task_retry_exponential_policy(task_queue, queue_checker, freezer)
await queue_checker.assert_metric_requeued(3)


@pytest.mark.asyncio
async def test_task_retry_every_x_policy(task_queue, queue_checker, freezer):
task_1 = await task_queue.add_task(
{"key": "value"}, retry_policy=RetryPolicy.FIXED_INTERVAL, retry_delay=4
)
assert isinstance(task_1.id, str)
await queue_checker.assert_state(task_1.id, TaskState.QUEUED)

await queue_checker.assert_pending_count(1)
await queue_checker.assert_processing_count(0)

task = await task_queue.get_task()
await queue_checker.assert_pending_count(0)
await queue_checker.assert_processing_count(1)
await queue_checker.assert_state(task_1.id, TaskState.PROCESSING)

await task_queue.auto_reschedule_task(task)
await queue_checker.assert_pending_count(1)
await queue_checker.assert_processing_count(0)
await queue_checker.assert_state(task_1.id, TaskState.REQUEUED)

# Task is not immediately available
task = await task_queue.get_task()
assert task is None

# Task is not available after 3 seconds
freezer.tick(3)
task = await task_queue.get_task()
assert task is None

# Task is available after 5 seconds
freezer.tick(4)
task = await task_queue.get_task()
assert task is not None
await queue_checker.assert_pending_count(0)
await queue_checker.assert_processing_count(1)
await queue_checker.assert_state(task_1.id, TaskState.PROCESSING)
await task_queue.auto_reschedule_task(task)

await queue_checker.assert_metric_added(1)
await queue_checker.assert_metric_taken(2)
await queue_checker.assert_metric_requeued(2)

# Second retry - task is not available after 3 seconds
freezer.tick(3)
task = await task_queue.get_task()
assert task is None

# Task is available after 4 seconds
freezer.tick(1)
task = await task_queue.get_task()
assert task is not None
await queue_checker.assert_pending_count(0)
await queue_checker.assert_processing_count(1)
await queue_checker.assert_state(task_1.id, TaskState.PROCESSING)
await task_queue.auto_reschedule_task(task)

await queue_checker.assert_metric_added(1)
await queue_checker.assert_metric_taken(3)
await queue_checker.assert_metric_requeued(3)



@pytest.mark.asyncio
async def test_task_retry_forced(task_queue, queue_checker, freezer):
task_1 = await task_queue.add_task(
Expand Down
2 changes: 2 additions & 0 deletions yatq/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ class RetryPolicy(str, Enum):
NONE - no retry allowed
LINEAR - delay between executions grows linearly
EXPONENTIAL - delay between executions grows exponentially
FIXED_INTERVAL - delay between executions is constant independent of retries
"""

NONE = "NONE"
LINEAR = "LINEAR"
EXPONENTIAL = "EXPONENTIAL"
FIXED_INTERVAL = "FIXED_INTERVAL"


class TaskState(str, Enum):
Expand Down
4 changes: 3 additions & 1 deletion yatq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,10 @@ async def auto_reschedule_task(

if task.policy == RetryPolicy.LINEAR:
delay = task.delay * task.retry_counter
else:
elif task.policy == RetryPolicy.EXPONENTIAL:
delay = task.delay**task.retry_counter
else:
delay = task.delay

after_time = int(time.time()) + delay
task.state = TaskState.REQUEUED
Expand Down
4 changes: 3 additions & 1 deletion yatq/worker/worker_settings.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from types import TracebackType
from typing import Awaitable, Callable, Dict, Optional, Tuple, Type
from abc import ABC, abstractmethod

import aioredis

Expand All @@ -11,7 +12,7 @@
T_ExceptionHandler = Callable[[BaseJob, T_ExcInfo], Awaitable]


class WorkerSettings:
class WorkerSettings(ABC):

"""
WorkerSettings class is used to configure worker.
Expand All @@ -36,6 +37,7 @@ async def on_shutdown() -> None: # pragma: no cover
...

@staticmethod
@abstractmethod
async def redis_client() -> aioredis.Redis: # pragma: no cover
...

Expand Down

0 comments on commit f875f21

Please sign in to comment.