From f875f213de301176412af1299af4e1e47678cdd0 Mon Sep 17 00:00:00 2001 From: KlochkovHF <122774728+KlochkovHF@users.noreply.github.com> Date: Mon, 17 Apr 2023 14:08:45 +0300 Subject: [PATCH] Added FIXED_INTERVAL retry policy (#5) * Added FIXED_INTERVAL retry policy * WorkerSettings.redis_client now abstract --- setup.cfg | 2 +- tests/test_task_queue.py | 63 ++++++++++++++++++++++++++++++++++ yatq/enums.py | 2 ++ yatq/queue.py | 4 ++- yatq/worker/worker_settings.py | 4 ++- 5 files changed, 72 insertions(+), 3 deletions(-) diff --git a/setup.cfg b/setup.cfg index e12e516..6242d63 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = yatq -version = 0.0.4 +version = 0.0.5 [options] packages = find: diff --git a/tests/test_task_queue.py b/tests/test_task_queue.py index ae541e3..b9abefd 100644 --- a/tests/test_task_queue.py +++ b/tests/test_task_queue.py @@ -416,6 +416,69 @@ async def test_task_retry_exponential_policy(task_queue, queue_checker, freezer) await queue_checker.assert_metric_requeued(3) +@pytest.mark.asyncio +async def test_task_retry_every_x_policy(task_queue, queue_checker, freezer): + task_1 = await task_queue.add_task( + {"key": "value"}, retry_policy=RetryPolicy.FIXED_INTERVAL, retry_delay=4 + ) + assert isinstance(task_1.id, str) + await queue_checker.assert_state(task_1.id, TaskState.QUEUED) + + await queue_checker.assert_pending_count(1) + await queue_checker.assert_processing_count(0) + + task = await task_queue.get_task() + await queue_checker.assert_pending_count(0) + await queue_checker.assert_processing_count(1) + await queue_checker.assert_state(task_1.id, TaskState.PROCESSING) + + await task_queue.auto_reschedule_task(task) + await queue_checker.assert_pending_count(1) + await queue_checker.assert_processing_count(0) + await queue_checker.assert_state(task_1.id, TaskState.REQUEUED) + + # Task is not immediately available + task = await task_queue.get_task() + assert task is None + + # Task is not available after 3 seconds + freezer.tick(3) + task = await task_queue.get_task() + assert task is None + + # Task is available after 5 seconds + freezer.tick(4) + task = await task_queue.get_task() + assert task is not None + await queue_checker.assert_pending_count(0) + await queue_checker.assert_processing_count(1) + await queue_checker.assert_state(task_1.id, TaskState.PROCESSING) + await task_queue.auto_reschedule_task(task) + + await queue_checker.assert_metric_added(1) + await queue_checker.assert_metric_taken(2) + await queue_checker.assert_metric_requeued(2) + + # Second retry - task is not available after 3 seconds + freezer.tick(3) + task = await task_queue.get_task() + assert task is None + + # Task is available after 4 seconds + freezer.tick(1) + task = await task_queue.get_task() + assert task is not None + await queue_checker.assert_pending_count(0) + await queue_checker.assert_processing_count(1) + await queue_checker.assert_state(task_1.id, TaskState.PROCESSING) + await task_queue.auto_reschedule_task(task) + + await queue_checker.assert_metric_added(1) + await queue_checker.assert_metric_taken(3) + await queue_checker.assert_metric_requeued(3) + + + @pytest.mark.asyncio async def test_task_retry_forced(task_queue, queue_checker, freezer): task_1 = await task_queue.add_task( diff --git a/yatq/enums.py b/yatq/enums.py index 0693ef9..31d0331 100644 --- a/yatq/enums.py +++ b/yatq/enums.py @@ -9,11 +9,13 @@ class RetryPolicy(str, Enum): NONE - no retry allowed LINEAR - delay between executions grows linearly EXPONENTIAL - delay between executions grows exponentially + FIXED_INTERVAL - delay between executions is constant independent of retries """ NONE = "NONE" LINEAR = "LINEAR" EXPONENTIAL = "EXPONENTIAL" + FIXED_INTERVAL = "FIXED_INTERVAL" class TaskState(str, Enum): diff --git a/yatq/queue.py b/yatq/queue.py index 1662e52..00f3cae 100644 --- a/yatq/queue.py +++ b/yatq/queue.py @@ -228,8 +228,10 @@ async def auto_reschedule_task( if task.policy == RetryPolicy.LINEAR: delay = task.delay * task.retry_counter - else: + elif task.policy == RetryPolicy.EXPONENTIAL: delay = task.delay**task.retry_counter + else: + delay = task.delay after_time = int(time.time()) + delay task.state = TaskState.REQUEUED diff --git a/yatq/worker/worker_settings.py b/yatq/worker/worker_settings.py index 4650994..5f2858d 100644 --- a/yatq/worker/worker_settings.py +++ b/yatq/worker/worker_settings.py @@ -1,5 +1,6 @@ from types import TracebackType from typing import Awaitable, Callable, Dict, Optional, Tuple, Type +from abc import ABC, abstractmethod import aioredis @@ -11,7 +12,7 @@ T_ExceptionHandler = Callable[[BaseJob, T_ExcInfo], Awaitable] -class WorkerSettings: +class WorkerSettings(ABC): """ WorkerSettings class is used to configure worker. @@ -36,6 +37,7 @@ async def on_shutdown() -> None: # pragma: no cover ... @staticmethod + @abstractmethod async def redis_client() -> aioredis.Redis: # pragma: no cover ...