Skip to content

Commit

Permalink
Merge pull request #457 from Koed00/resource_limits
Browse files Browse the repository at this point in the history
Resource limits: max rss for workers
  • Loading branch information
Koed00 authored Jul 8, 2020
2 parents e0beb50 + 0095a93 commit 0de1cbc
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 38 deletions.
8 changes: 1 addition & 7 deletions django_q/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,4 @@
default_app_config = "django_q.apps.DjangoQConfig"


__all__ = ["conf", "cluster", "models", "tasks", "croniter"]

# Optional Imports
try:
from croniter import croniter
except ImportError:
croniter = None
__all__ = ["conf", "cluster", "models", "tasks"]
3 changes: 1 addition & 2 deletions django_q/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
from django.contrib import admin
from django.utils.translation import gettext_lazy as _

from django_q.conf import Conf
from django_q.conf import Conf, croniter
from django_q.models import Success, Failure, Schedule, OrmQ
from django_q.tasks import async_task
from django_q import croniter


class TaskAdmin(admin.ModelAdmin):
Expand Down
52 changes: 34 additions & 18 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

# External
import arrow

# Django
from django import db
from django.conf import settings
Expand All @@ -19,14 +20,21 @@
# Local
import django_q.tasks
from django_q.brokers import get_broker, Broker
from django_q.conf import Conf, logger, psutil, get_ppid, error_reporter
from django_q.conf import (
Conf,
logger,
psutil,
get_ppid,
error_reporter,
croniter,
resource,
)
from django_q.humanhash import humanize
from django_q.models import Task, Success, Schedule
from django_q.queues import Queue
from django_q.signals import pre_execute
from django_q.signing import SignedPackage, BadSignature
from django_q.status import Stat, Status
from django_q import croniter


class Cluster:
Expand Down Expand Up @@ -102,10 +110,10 @@ def is_running(self) -> bool:
@property
def is_stopping(self) -> bool:
return (
self.stop_event
and self.start_event
and self.start_event.is_set()
and self.stop_event.is_set()
self.stop_event
and self.start_event
and self.start_event.is_set()
and self.stop_event.is_set()
)

@property
Expand All @@ -115,13 +123,13 @@ def has_stopped(self) -> bool:

class Sentinel:
def __init__(
self,
stop_event,
start_event,
cluster_id,
broker=None,
timeout=Conf.TIMEOUT,
start=True,
self,
stop_event,
start_event,
cluster_id,
broker=None,
timeout=Conf.TIMEOUT,
start=True,
):
# Make sure we catch signals for the pool
signal.signal(signal.SIGINT, signal.SIG_IGN)
Expand Down Expand Up @@ -376,7 +384,7 @@ def monitor(result_queue: Queue, broker: Broker = None):


def worker(
task_queue: Queue, result_queue: Queue, timer: Value, timeout: int = Conf.TIMEOUT
task_queue: Queue, result_queue: Queue, timer: Value, timeout: int = Conf.TIMEOUT
):
"""
Takes a task from the task queue, tries to execute it and puts the result back in the result queue
Expand Down Expand Up @@ -433,7 +441,7 @@ def worker(
result_queue.put(task)
timer.value = -1 # Idle
# Recycle
if task_count == Conf.RECYCLE:
if task_count == Conf.RECYCLE or rss_check():
timer.value = -2 # Recycled
break
logger.info(_(f"{name} stopped doing work"))
Expand Down Expand Up @@ -551,9 +559,9 @@ def scheduler(broker: Broker = None):
try:
with db.transaction.atomic(using=Schedule.objects.db):
for s in (
Schedule.objects.select_for_update()
.exclude(repeats=0)
.filter(next_run__lt=timezone.now())
Schedule.objects.select_for_update()
.exclude(repeats=0)
.filter(next_run__lt=timezone.now())
):
args = ()
kwargs = {}
Expand Down Expand Up @@ -692,3 +700,11 @@ def set_cpu_affinity(n: int, process_ids: list, actual: bool = not Conf.TESTING)
if actual:
p.cpu_affinity(affinity)
logger.info(_(f"{pid} will use cpu {affinity}"))


def rss_check():
    """Return True when this worker's resident set size exceeds Conf.MAX_RSS.

    Called by the worker loop to decide whether to recycle itself and
    release memory. Prefers the stdlib ``resource`` module and falls back
    to ``psutil`` when ``resource`` is unavailable. Returns False when no
    limit is configured or neither backend is importable.
    """
    limit = Conf.MAX_RSS
    if not limit:
        return False
    if resource:
        # ru_maxrss is kilobytes on Linux.
        # NOTE(review): on macOS ru_maxrss is reported in bytes — confirm
        # the intended units are consistent across platforms.
        return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >= limit
    if psutil:
        # psutil reports bytes, so scale the kilobyte limit up.
        return psutil.Process().memory_info().rss >= limit * 1024
    return False
16 changes: 15 additions & 1 deletion django_q/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@
except ImportError:
psutil = None

try:
from croniter import croniter
except ImportError:
croniter = None

try:
import resource
except ModuleNotFoundError:
resource = None


class Conf:
"""
Expand Down Expand Up @@ -104,6 +114,10 @@ class Conf:
# Number of tasks each worker can handle before it gets recycled. Useful for releasing memory
RECYCLE = conf.get("recycle", 500)

# The maximum resident set size in kilobytes before a worker will recycle. Useful for limiting memory usage
# Not available on all platforms
MAX_RSS = conf.get("max_rss", None)

# Number of seconds to wait for a worker to finish.
TIMEOUT = conf.get("timeout", None)

Expand Down Expand Up @@ -211,7 +225,7 @@ def report(self):
# and instantiate an ErrorReporter using the provided config
for name, conf in error_conf.items():
for entry in pkg_resources.iter_entry_points(
"djangoq.errorreporters", name
"djangoq.errorreporters", name
):
Reporter = entry.load()
reporters.append(Reporter(**conf))
Expand Down
2 changes: 1 addition & 1 deletion django_q/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
from picklefield.fields import dbsafe_decode

# Local
from django_q.conf import croniter
from django_q.signing import SignedPackage
from django_q import croniter


class Task(models.Model):
Expand Down
34 changes: 34 additions & 0 deletions django_q/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,40 @@ def test_recycle(broker, monkeypatch):
assert Success.objects.count() == Conf.SAVE_LIMIT
broker.delete_queue()

@pytest.mark.django_db
def test_max_rss(broker, monkeypatch):
    """Verify a worker recycles itself once its RSS exceeds Conf.MAX_RSS.

    Runs a Sentinel with a single worker and a very low MAX_RSS so the
    worker trips the rss_check recycle path, then runs pusher/worker/monitor
    by hand to confirm a recycled worker still completes and saves its task.
    """
    # set up the Sentinel
    broker.list_key = 'test_max_rss_test:q'
    async_task('django_q.tests.tasks.multiply', 2, 2, broker=broker)
    start_event = Event()
    stop_event = Event()
    cluster_id = uuidlib.uuid4()
    # override settings: 40000 KB is low enough that any test process
    # exceeds it immediately, forcing a recycle on the first task
    monkeypatch.setattr(Conf, 'MAX_RSS', 40000)
    monkeypatch.setattr(Conf, 'WORKERS', 1)
    # set a timer to stop the Sentinel
    threading.Timer(3, stop_event.set).start()
    s = Sentinel(stop_event, start_event, cluster_id=cluster_id, broker=broker)
    assert start_event.is_set()
    assert s.status() == Conf.STOPPED
    # the single worker should have been reincarnated exactly once
    # (recycled after exceeding MAX_RSS) during the Sentinel run
    assert s.reincarnations == 1
    async_task('django_q.tests.tasks.multiply', 2, 2, broker=broker)
    task_queue = Queue()
    result_queue = Queue()
    # push the task
    pusher(task_queue, stop_event, broker=broker)
    # worker should exit on recycle
    worker(task_queue, result_queue, Value('f', -1))
    # check if the work has been done
    assert result_queue.qsize() == 1
    # save_limit test
    monkeypatch.setattr(Conf, 'SAVE_LIMIT', 1)
    result_queue.put('STOP')
    # run monitor
    monitor(result_queue)
    assert Success.objects.count() == Conf.SAVE_LIMIT
    broker.delete_queue()


@pytest.mark.django_db
def test_bad_secret(broker, monkeypatch):
Expand Down
7 changes: 7 additions & 0 deletions docs/configure.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ recycle

The number of tasks a worker will process before recycling. Useful to release memory resources on a regular basis. Defaults to ``500``.

max_rss
~~~~~~~

The maximum resident set size in kilobytes before a worker will recycle and release resources. Useful for limiting memory usage.
Only supported on platforms that provide the Python ``resource`` module, or that have the :ref:`psutil<psutil_package>` package installed.
Defaults to ``None``.

.. _timeout:

timeout
Expand Down
14 changes: 7 additions & 7 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
arrow==0.15.7 # via -r requirements.in
asgiref==3.2.10 # via django
blessed==1.17.8 # via -r requirements.in
boto3==1.14.16 # via -r requirements.in
botocore==1.17.16 # via boto3, s3transfer
boto3==1.14.19 # via -r requirements.in
botocore==1.17.19 # via boto3, s3transfer
certifi==2020.6.20 # via requests
chardet==3.0.4 # via requests
croniter==0.3.34 # via -r requirements.in
Expand Down

0 comments on commit 0de1cbc

Please sign in to comment.