diff --git a/README.md b/README.md index d589707..fb01036 100644 --- a/README.md +++ b/README.md @@ -288,6 +288,11 @@ queue_depths = Job.get_queue_depths() print(queue_depths) # {"default": 1, "other_queue": 1} ``` +You can also exclude jobs which exist but are scheduled to be run in the future from the queue depths, where `run_after` is set to a future time from now. To do this set the `exclude_future_jobs` kwarg like so: +```python +queue_depths = Job.get_queue_depths(exclude_future_jobs=True) +``` + **Important:** When checking queue depths, do not assume that the key for your queue will always be available. Queue depths of zero won't be included in the dict returned by this method. @@ -312,6 +317,8 @@ manage.py worker [queue_name] [--rate_limit] If you'd like to check your queue depth from the command line, you can run `manage.py queue_depth [queue_name [queue_name ...]]` and any jobs in the "NEW" or "READY" states will be returned. +If you wish to exclude jobs which are scheduled to be run in the future you can add `--exclude_future_jobs` to the command. + **Important:** If you misspell or provide a queue name which does not have any jobs, a depth of 0 will always be returned. ### Gotcha: `bulk_create` diff --git a/django_dbq/management/commands/queue_depth.py b/django_dbq/management/commands/queue_depth.py index 3419601..cb8b6fd 100644 --- a/django_dbq/management/commands/queue_depth.py +++ b/django_dbq/management/commands/queue_depth.py @@ -8,10 +8,13 @@ class Command(BaseCommand): def add_arguments(self, parser): parser.add_argument("queue_name", nargs="*", default=["default"], type=str) + parser.add_argument("--exclude_future_jobs", default=False, type=bool) def handle(self, *args, **options): queue_names = options["queue_name"] - queue_depths = Job.get_queue_depths() + queue_depths = Job.get_queue_depths( + exclude_future_jobs=options["exclude_future_jobs"] + ) queue_depths_string = " ".join( [ diff --git a/django_dbq/models.py b/django_dbq/models.py index b58eef4..a90354f 100644 --- a/django_dbq/models.py +++ b/django_dbq/models.py @@ -8,7 +8,7 @@ get_failure_hook_name, get_creation_hook_name, ) -from django.db.models import JSONField, UUIDField, Count, TextChoices +from django.db.models import JSONField, UUIDField, Count, TextChoices, Q import datetime import logging import uuid @@ -173,10 +173,17 @@ def run_creation_hook(self): creation_hook_function(self) @staticmethod - def get_queue_depths(): + def get_queue_depths(*, exclude_future_jobs=False): + jobs_waiting_in_queue = Job.objects.filter( + state__in=(Job.STATES.READY, Job.STATES.NEW) + ) + if exclude_future_jobs: + jobs_waiting_in_queue = jobs_waiting_in_queue.filter( + Q(run_after__isnull=True) | Q(run_after__lte=timezone.now()) + ) + annotation_dicts = ( - Job.objects.filter(state__in=(Job.STATES.READY, Job.STATES.NEW)) - .values("queue_name") + jobs_waiting_in_queue.values("queue_name") .order_by("queue_name") .annotate(Count("queue_name")) ) diff --git a/django_dbq/tests.py b/django_dbq/tests.py index f755666..200623b 100644 --- a/django_dbq/tests.py +++ b/django_dbq/tests.py @@ -63,12 +63,17 @@ def test_worker_with_queue_name(self): self.assertTrue("test_queue" in output) +@freezegun.freeze_time("2025-01-01T12:00:00Z") @override_settings(JOBS={"testjob": {"tasks": ["a"]}}) class JobModelMethodTestCase(TestCase): def test_get_queue_depths(self): Job.objects.create(name="testjob", queue_name="default") Job.objects.create(name="testjob", queue_name="testworker") - Job.objects.create(name="testjob", queue_name="testworker") + Job.objects.create( + name="testjob", + queue_name="testworker", + run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)), + ) Job.objects.create( name="testjob", queue_name="testworker", state=Job.STATES.FAILED ) @@ -79,16 +84,38 @@ def test_get_queue_depths(self): queue_depths = Job.get_queue_depths() self.assertDictEqual(queue_depths, {"default": 1, "testworker": 2}) + def test_get_queue_depths_exclude_future_jobs(self): + Job.objects.create(name="testjob", queue_name="default") + Job.objects.create(name="testjob", queue_name="testworker") + Job.objects.create( + name="testjob", + queue_name="testworker", + run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)), + ) + Job.objects.create( + name="testjob", queue_name="testworker", state=Job.STATES.FAILED + ) + Job.objects.create( + name="testjob", queue_name="testworker", state=Job.STATES.COMPLETE + ) + + queue_depths = Job.get_queue_depths(exclude_future_jobs=True) + self.assertDictEqual(queue_depths, {"default": 1, "testworker": 1}) + +@freezegun.freeze_time("2025-01-01T12:00:00Z") @override_settings(JOBS={"testjob": {"tasks": ["a"]}}) class QueueDepthTestCase(TestCase): def test_queue_depth(self): - Job.objects.create(name="testjob", state=Job.STATES.FAILED) Job.objects.create(name="testjob", state=Job.STATES.NEW) Job.objects.create(name="testjob", state=Job.STATES.FAILED) Job.objects.create(name="testjob", state=Job.STATES.COMPLETE) - Job.objects.create(name="testjob", state=Job.STATES.READY) + Job.objects.create( + name="testjob", + state=Job.STATES.READY, + run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)), + ) Job.objects.create( name="testjob", queue_name="testqueue", state=Job.STATES.READY ) @@ -101,6 +128,28 @@ def test_queue_depth(self): output = stdout.getvalue() self.assertEqual(output.strip(), "event=queue_depths default=2") + def test_queue_depth_exclude_future_jobs(self): + Job.objects.create(name="testjob", state=Job.STATES.FAILED) + Job.objects.create(name="testjob", state=Job.STATES.NEW) + Job.objects.create(name="testjob", state=Job.STATES.FAILED) + Job.objects.create(name="testjob", state=Job.STATES.COMPLETE) + Job.objects.create( + name="testjob", + state=Job.STATES.READY, + run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)), + ) + Job.objects.create( + name="testjob", queue_name="testqueue", state=Job.STATES.READY + ) + Job.objects.create( + name="testjob", queue_name="testqueue", state=Job.STATES.READY + ) + + stdout = StringIO() + call_command("queue_depth", exclude_future_jobs=True, stdout=stdout) + output = stdout.getvalue() + self.assertEqual(output.strip(), "event=queue_depths default=1") + def test_queue_depth_multiple_queues(self): Job.objects.create(name="testjob", state=Job.STATES.FAILED)