replace celery task decorators with a kombu-based publisher
this commit implements the bulk of `awx-manage run_dispatcher`, a new
command that binds to RabbitMQ via kombu and balances messages across
a pool of workers that are similar to celeryd workers in spirit.
Specifically, this includes:

- a new decorator, `awx.main.dispatch.task`, which can be used to
  decorate functions or classes so that they can be designated as
  "Tasks"
- support for fanout/broadcast tasks (at this point in time, only
  `conf.Setting` memcached flushes use this functionality)
- support for job reaping
- support for success/failure hooks for job runs (i.e.,
  `handle_work_success` and `handle_work_error`)
- support for an auto-scaling worker pool that scales processes up and
  down on demand
- minimal support for RPC, such as status checks and pool recycle/reload
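The `task` decorator described above can be sketched as a registry that records functions or classes by dotted name, so the dispatcher can resolve an incoming message body to a callable. This is a hypothetical, heavily simplified illustration, not the actual `awx.main.dispatch` implementation:

```python
# Hypothetical sketch of a dispatcher task registry; the real
# awx.main.dispatch decorator is more involved.
registry = {}


def task(queue=None):
    """Register a function or class as a dispatchable "Task"."""
    def decorator(obj):
        name = '{}.{}'.format(obj.__module__, obj.__name__)
        registry[name] = {'obj': obj, 'queue': queue}
        return obj
    return decorator


def run(message):
    """Resolve a message body like {'task': ..., 'args': [...]} to a call."""
    entry = registry[message['task']]
    obj = entry['obj']
    # Classes are instantiated and their run() method invoked;
    # plain functions are called directly.
    fn = obj().run if isinstance(obj, type) else obj
    return fn(*message.get('args', []), **message.get('kwargs', {}))
```

A decorated function can then be invoked by publishing a small dict naming it, which is the shape of contract the dispatcher's worker pool consumes.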
ryanpetrello committed Oct 11, 2018
1 parent da74f1d commit ff1e8cc
Showing 54 changed files with 1,603 additions and 1,144 deletions.
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
@@ -145,7 +145,7 @@ Start the development containers by running the following:
(host)$ make docker-compose
```

-The above utilizes the image built in the previous step, and will automatically start all required services and dependent containers. Once the containers launch, your session will be attached to the *awx* container, and you'll be able to watch log messages and events in real time. You will see messages from Django, celery, and the front end build process.
+The above utilizes the image built in the previous step, and will automatically start all required services and dependent containers. Once the containers launch, your session will be attached to the *awx* container, and you'll be able to watch log messages and events in real time. You will see messages from Django and the front end build process.

If you start a second terminal session, you can take a look at the running containers using the `docker ps` command. For example:

@@ -174,7 +174,7 @@ The first time you start the environment, database migrations need to run in ord
```bash
awx_1 | Operations to perform:
awx_1 | Synchronize unmigrated apps: solo, api, staticfiles, debug_toolbar, messages, channels, django_extensions, ui, rest_framework, polymorphic
-awx_1        | 	Apply all migrations: sso, taggit, sessions, djcelery, sites, kombu_transport_django, social_auth, contenttypes, auth, conf, main
+awx_1        | 	Apply all migrations: sso, taggit, sessions, sites, kombu_transport_django, social_auth, contenttypes, auth, conf, main
awx_1 | Synchronizing apps without migrations:
awx_1 | Creating tables...
awx_1 | Running deferred SQL...
4 changes: 2 additions & 2 deletions INSTALL.md
@@ -236,7 +236,7 @@ Using /etc/ansible/ansible.cfg as config file
}
Operations to perform:
Synchronize unmigrated apps: solo, api, staticfiles, messages, channels, django_extensions, ui, rest_framework, polymorphic
-	Apply all migrations: sso, taggit, sessions, djcelery, sites, kombu_transport_django, social_auth, contenttypes, auth, conf, main
+	Apply all migrations: sso, taggit, sessions, sites, kombu_transport_django, social_auth, contenttypes, auth, conf, main
Synchronizing apps without migrations:
Creating tables...
Running deferred SQL...
@@ -548,7 +548,7 @@ Using /etc/ansible/ansible.cfg as config file
}
Operations to perform:
Synchronize unmigrated apps: solo, api, staticfiles, messages, channels, django_extensions, ui, rest_framework, polymorphic
-	Apply all migrations: sso, taggit, sessions, djcelery, sites, kombu_transport_django, social_auth, contenttypes, auth, conf, main
+	Apply all migrations: sso, taggit, sessions, sites, kombu_transport_django, social_auth, contenttypes, auth, conf, main
Synchronizing apps without migrations:
Creating tables...
Running deferred SQL...
20 changes: 7 additions & 13 deletions Makefile
@@ -59,7 +59,7 @@ UI_RELEASE_FLAG_FILE = awx/ui/.release_built
I18N_FLAG_FILE = .i18n_built

.PHONY: awx-link clean clean-tmp clean-venv requirements requirements_dev \
-	develop refresh adduser migrate dbchange dbshell runserver celeryd \
+	develop refresh adduser migrate dbchange dbshell runserver \
receiver test test_unit test_ansible test_coverage coverage_html \
dev_build release_build release_clean sdist \
ui-docker-machine ui-docker ui-release ui-devel \
@@ -233,7 +233,7 @@ server_noattach:
tmux new-session -d -s awx 'exec make uwsgi'
tmux rename-window 'AWX'
tmux select-window -t awx:0
-	tmux split-window -v 'exec make celeryd'
+	tmux split-window -v 'exec make dispatcher'
tmux new-window 'exec make daphne'
tmux select-window -t awx:1
tmux rename-window 'WebSockets'
@@ -265,12 +265,6 @@ honcho:
fi; \
honcho start -f tools/docker-compose/Procfile

-flower:
-	@if [ "$(VENV_BASE)" ]; then \
-		. $(VENV_BASE)/awx/bin/activate; \
-	fi; \
-	celery flower --address=0.0.0.0 --port=5555 --broker=amqp://guest:guest@$(RABBITMQ_HOST):5672//

collectstatic:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
@@ -281,7 +275,7 @@ uwsgi: collectstatic
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
-	uwsgi -b 32768 --socket 127.0.0.1:8050 --module=awx.wsgi:application --home=/venv/awx --chdir=/awx_devel/ --vacuum --processes=5 --harakiri=120 --master --no-orphans --py-autoreload 1 --max-requests=1000 --stats /tmp/stats.socket --lazy-apps --logformat "%(addr) %(method) %(uri) - %(proto) %(status)" --hook-accepting1-once="exec:/bin/sh -c '[ -f /tmp/celery_pid ] && kill -1 `cat /tmp/celery_pid` || true'"
+	uwsgi -b 32768 --socket 127.0.0.1:8050 --module=awx.wsgi:application --home=/venv/awx --chdir=/awx_devel/ --vacuum --processes=5 --harakiri=120 --master --no-orphans --py-autoreload 1 --max-requests=1000 --stats /tmp/stats.socket --lazy-apps --logformat "%(addr) %(method) %(uri) - %(proto) %(status)" --hook-accepting1-once="exec:awx-manage run_dispatcher --reload"

daphne:
@if [ "$(VENV_BASE)" ]; then \
@@ -302,13 +296,13 @@ runserver:
fi; \
$(PYTHON) manage.py runserver

-# Run to start the background celery worker for development.
-celeryd:
-	rm -f /tmp/celery_pid
+# Run to start the background task dispatcher for development.
+dispatcher:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
-	celery worker -A awx -l DEBUG -B -Ofair --autoscale=100,4 --schedule=$(CELERY_SCHEDULE_FILE) -n celery@$(COMPOSE_HOST) --pidfile /tmp/celery_pid
+	$(PYTHON) manage.py run_dispatcher


# Run to start the zeromq callback receiver
receiver:
Expand Down
8 changes: 0 additions & 8 deletions awx/__init__.py
@@ -12,14 +12,6 @@
__all__ = ['__version__']


-# Isolated nodes do not have celery installed
-try:
-    from .celery import app as celery_app  # noqa
-    __all__.append('celery_app')
-except ImportError:
-    pass


# Check for the presence/absence of "devonly" module to determine if running
# from a source code checkout or release package.
try:
2 changes: 1 addition & 1 deletion awx/api/views/__init__.py
@@ -3318,7 +3318,7 @@ def post(self, request, *args, **kwargs):
with transaction.atomic():
job = job_template.create_job(**kv)

-# Send a signal to celery that the job should be started.
+# Send a signal to signify that the job should be started.
result = job.signal_start(inventory_sources_already_updated=inventory_sources_already_updated)
if not result:
data = dict(msg=_('Error starting job!'))
4 changes: 3 additions & 1 deletion awx/api/views/mixin.py
@@ -101,7 +101,9 @@ def destroy(self, request, *args, **kwargs):

class InstanceGroupMembershipMixin(object):
'''
-    Manages signaling celery to reload its queue configuration on Instance Group membership changes
+    This mixin overloads attach/detach so that it calls InstanceGroup.save(),
+    triggering a background recalculation of policy-based instance group
+    membership.
'''
def attach(self, request, *args, **kwargs):
response = super(InstanceGroupMembershipMixin, self).attach(request, *args, **kwargs)
25 changes: 0 additions & 25 deletions awx/celery.py

This file was deleted.

5 changes: 5 additions & 0 deletions awx/main/dispatch/__init__.py
@@ -0,0 +1,5 @@
from django.conf import settings


def get_local_queuename():
    return settings.CLUSTER_HOST_ID.encode('utf-8')
60 changes: 60 additions & 0 deletions awx/main/dispatch/control.py
@@ -0,0 +1,60 @@
import logging
import socket

from django.conf import settings

from awx.main.dispatch import get_local_queuename
from kombu import Connection, Queue, Exchange, Producer, Consumer

logger = logging.getLogger('awx.main.dispatch')


class Control(object):

    services = ('dispatcher', 'callback_receiver')
    result = None

    def __init__(self, service):
        if service not in self.services:
            raise RuntimeError('{} must be in {}'.format(service, self.services))
        self.service = service
        queuename = get_local_queuename()
        self.queue = Queue(queuename, Exchange(queuename), routing_key=queuename)

    def publish(self, msg, conn, host, **kwargs):
        producer = Producer(
            exchange=self.queue.exchange,
            channel=conn,
            routing_key=get_local_queuename()
        )
        producer.publish(msg, expiration=5, **kwargs)

    def status(self, *args, **kwargs):
        return self.control_with_reply('status', *args, **kwargs)

    def running(self, *args, **kwargs):
        return self.control_with_reply('running', *args, **kwargs)

    def control_with_reply(self, command, host=None, timeout=5):
        host = host or settings.CLUSTER_HOST_ID
        logger.warning('checking {} {} for {}'.format(self.service, command, host))
        reply_queue = Queue(name="amq.rabbitmq.reply-to")
        self.result = None
        with Connection(settings.BROKER_URL) as conn:
            with Consumer(conn, reply_queue, callbacks=[self.process_message], no_ack=True):
                self.publish({'control': command}, conn, host, reply_to='amq.rabbitmq.reply-to')
                try:
                    conn.drain_events(timeout=timeout)
                except socket.timeout:
                    logger.error('{} did not reply within {}s'.format(self.service, timeout))
                    raise
        return self.result

    def control(self, msg, host=None, **kwargs):
        host = host or settings.CLUSTER_HOST_ID
        with Connection(settings.BROKER_URL) as conn:
            self.publish(msg, conn, host)

    def process_message(self, body, message):
        self.result = body
        message.ack()
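The `control_with_reply` flow above is a classic RPC-over-messaging pattern: publish a command with `reply_to` pointing at RabbitMQ's direct reply-to pseudo-queue, then block draining events until the peer answers or the timeout fires. A broker-free sketch of that request/reply shape, using in-process queues in place of kombu and RabbitMQ purely for illustration:

```python
import queue

requests = queue.Queue()   # stands in for the dispatcher's AMQP queue
replies = queue.Queue()    # stands in for the amq.rabbitmq.reply-to queue


def dispatcher_step():
    # The dispatcher side: consume one control message, publish a reply.
    msg = requests.get(timeout=5)
    if msg['control'] == 'status':
        replies.put({'dispatcher': 'ok'})


def control_with_reply(command, timeout=5):
    # The caller side: publish the command, then block on the reply
    # queue (the moral equivalent of conn.drain_events(timeout=...)).
    requests.put({'control': command})
    dispatcher_step()  # in AWX this runs in a separate process
    return replies.get(timeout=timeout)


result = control_with_reply('status')
```

The `{'dispatcher': 'ok'}` payload here is invented for the sketch; the real dispatcher replies with its pool state, and a missed timeout surfaces as an exception just as `socket.timeout` does above.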
