Skip to content

Commit

Permalink
Merge pull request ansible#3947 from ryanpetrello/transient-queues
Browse files Browse the repository at this point in the history
RFC: add the ability to disable RabbitMQ queue durability

Reviewed-by: https://github.com/softwarefactory-project-zuul[bot]
  • Loading branch information
softwarefactory-project-zuul[bot] authored May 28, 2019
2 parents 8c56d1d + 40b1e89 commit 41f2b83
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 5 deletions.
10 changes: 10 additions & 0 deletions awx/main/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,16 @@ def _load_default_license_from_file():
)


register(
'BROKER_DURABILITY',
field_class=fields.BooleanField,
label=_('Message Durability'),
help_text=_('When set (the default), underlying queues will be persisted to disk. Disable this to enable higher message bus throughput.'),
category=_('System'),
category_slug='system',
)


def logging_validate(serializer, attrs):
if not serializer.instance or \
not hasattr(serializer.instance, 'LOG_AGGREGATOR_HOST') or \
Expand Down
3 changes: 2 additions & 1 deletion awx/main/dispatch/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from django.conf import settings

from awx.main.dispatch import get_local_queuename
from kombu import Connection, Queue, Exchange, Producer, Consumer
from awx.main.dispatch.kombu import Connection
from kombu import Queue, Exchange, Producer, Consumer

logger = logging.getLogger('awx.main.dispatch')

Expand Down
42 changes: 42 additions & 0 deletions awx/main/dispatch/kombu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from amqp.exceptions import PreconditionFailed
from django.conf import settings
from kombu.connection import Connection as KombuConnection
from kombu.transport import pyamqp

import logging

logger = logging.getLogger('awx.main.dispatch')


__all__ = ['Connection']


class Connection(KombuConnection):

def __init__(self, *args, **kwargs):
super(Connection, self).__init__(*args, **kwargs)
class _Channel(pyamqp.Channel):

def queue_declare(self, queue, *args, **kwargs):
kwargs['durable'] = settings.BROKER_DURABILITY
try:
return super(_Channel, self).queue_declare(queue, *args, **kwargs)
except PreconditionFailed as e:
if "inequivalent arg 'durable'" in getattr(e, 'reply_text', None):
logger.error(
'queue {} durability is not {}, deleting and recreating'.format(

queue,
kwargs['durable']
)
)
self.queue_delete(queue)
return super(_Channel, self).queue_declare(queue, *args, **kwargs)

class _Connection(pyamqp.Connection):
Channel = _Channel

class _Transport(pyamqp.Transport):
Connection = _Connection

self.transport_cls = _Transport
4 changes: 3 additions & 1 deletion awx/main/dispatch/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
from uuid import uuid4

from django.conf import settings
from kombu import Connection, Exchange, Producer
from kombu import Exchange, Producer

from awx.main.dispatch.kombu import Connection

logger = logging.getLogger('awx.main.dispatch')

Expand Down
3 changes: 2 additions & 1 deletion awx/main/management/commands/run_callback_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

from django.conf import settings
from django.core.management.base import BaseCommand
from kombu import Connection, Exchange, Queue
from kombu import Exchange, Queue

from awx.main.dispatch.kombu import Connection
from awx.main.dispatch.worker import AWXConsumer, CallbackBrokerWorker


Expand Down
3 changes: 2 additions & 1 deletion awx/main/management/commands/run_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
from django.core.cache import cache as django_cache
from django.core.management.base import BaseCommand
from django.db import connection as django_connection, connections
from kombu import Connection, Exchange, Queue
from kombu import Exchange, Queue

from awx.main.dispatch import get_local_queuename, reaper
from awx.main.dispatch.control import Control
from awx.main.dispatch.kombu import Connection
from awx.main.dispatch.pool import AutoscalePool
from awx.main.dispatch.worker import AWXConsumer, TaskWorker

Expand Down
3 changes: 2 additions & 1 deletion awx/main/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from django.conf import settings

# Kombu
from kombu import Connection, Exchange, Producer
from awx.main.dispatch.kombu import Connection
from kombu import Exchange, Producer
from kombu.serialization import registry

__all__ = ['CallbackQueueDispatcher']
Expand Down
1 change: 1 addition & 0 deletions awx/settings/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ def IS_TESTING(argv=None):
# Set default ports for live server tests.
os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199')

BROKER_DURABILITY = True
BROKER_POOL_LIMIT = None
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_DEFAULT_QUEUE = 'awx_private_queue'
Expand Down
2 changes: 2 additions & 0 deletions docs/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ To accomplish this, AWX makes use of a "Task Queue" abstraction. Task Queues ar

AWX communicates with these worker processes to mediate between clients and workers. This is done via distributed RabbitMQ queues and the already-acknowledged local queue that the Dispatcher is working through. Simply put: to initiate a task, the client (generally, Python code in the AWX API) publishes a message to a queue, and RabbitMQ then delivers that message to one or more workers.

By default, when AWX creates queues in RabbitMQ, it creates them as *durable* queues in RabbitMQ (which allows for message persistence at the cost of lower performance). For increased message throughput (at the risk of message loss on server restarts), set BROKER_DURABILITY=False, and AWX will create _transient_ queues instead.

Clustered AWX installations consist of multiple workers spread across every
node, giving way to high availability and horizontal scaling.

Expand Down

0 comments on commit 41f2b83

Please sign in to comment.