From ff1e8cc356a5686e8eacf4d659e4bf9eab5f4185 Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Wed, 8 Aug 2018 13:41:07 -0400 Subject: [PATCH] replace celery task decorators with a kombu-based publisher this commit implements the bulk of `awx-manage run_dispatcher`, a new command that binds to RabbitMQ via kombu and balances messages across a pool of workers that are similar to celeryd workers in spirit. Specifically, this includes: - a new decorator, `awx.main.dispatch.task`, which can be used to decorate functions or classes so that they can be designated as "Tasks" - support for fanout/broadcast tasks (at this point in time, only `conf.Setting` memcached flushes use this functionality) - support for job reaping - support for success/failure hooks for job runs (i.e., `handle_work_success` and `handle_work_error`) - support for auto scaling worker pool that scale processes up and down on demand - minimal support for RPC, such as status checks and pool recycle/reload --- CONTRIBUTING.md | 4 +- INSTALL.md | 4 +- Makefile | 20 +- awx/__init__.py | 8 - awx/api/views/__init__.py | 2 +- awx/api/views/mixin.py | 4 +- awx/celery.py | 25 -- awx/main/dispatch/__init__.py | 5 + awx/main/dispatch/control.py | 60 +++ awx/main/dispatch/pool.py | 381 +++++++++++++++--- awx/main/dispatch/publish.py | 128 ++++++ awx/main/dispatch/reaper.py | 46 +++ awx/main/dispatch/worker/__init__.py | 3 + awx/main/dispatch/worker/base.py | 146 +++++++ .../{worker.py => worker/callback.py} | 79 +--- awx/main/dispatch/worker/task.py | 113 ++++++ awx/main/exceptions.py | 7 - .../management/commands/inventory_import.py | 4 +- .../management/commands/run_dispatcher.py | 124 ++++++ awx/main/management/commands/watch_celery.py | 66 --- awx/main/models/ha.py | 2 +- awx/main/models/unified_jobs.py | 74 +--- awx/main/models/workflow.py | 6 - awx/main/scheduler/task_manager.py | 215 +--------- awx/main/scheduler/tasks.py | 14 +- awx/main/tasks.py | 210 ++++------ awx/main/tests/functional/conftest.py | 12 - .../functional/models/test_unified_job.py | 44 -- .../task_management/test_scheduler.py | 148 +------ awx/main/tests/functional/test_dispatch.py | 321 ++++++++++++++- awx/main/tests/unit/test_task_manager.py | 69 ---- awx/main/tests/unit/test_tasks.py | 2 +- awx/main/tests/unit/utils/test_reload.py | 4 +- awx/main/utils/autoscale.py | 27 -- awx/main/utils/ha.py | 17 - awx/main/utils/reload.py | 19 +- awx/settings/defaults.py | 63 +-- awx/settings/development.py | 16 - awx/settings/local_settings.py.docker_compose | 7 +- awx/settings/local_settings.py.example | 7 +- awx/settings/production.py | 10 +- docs/resource_copy.md | 2 +- docs/task_manager_system.md | 12 +- docs/tasks.md | 155 +++++++ .../image_build/files/supervisor_task.conf | 18 +- .../roles/image_build/templates/Dockerfile.j2 | 1 + requirements/requirements.in | 1 - requirements/requirements.txt | 1 - requirements/requirements_dev.txt | 1 - tools/docker-compose-cluster.yml | 1 - tools/docker-compose.yml | 3 +- tools/docker-compose/Procfile | 5 +- tools/docker-compose/haproxy.cfg | 17 - tools/docker-compose/supervisor.conf | 14 +- 54 files changed, 1603 insertions(+), 1144 deletions(-) delete mode 100644 awx/celery.py create mode 100644 awx/main/dispatch/control.py create mode 100644 awx/main/dispatch/publish.py create mode 100644 awx/main/dispatch/reaper.py create mode 100644 awx/main/dispatch/worker/__init__.py create mode 100644 awx/main/dispatch/worker/base.py rename awx/main/dispatch/{worker.py => worker/callback.py} (72%) create mode 100644 
awx/main/dispatch/worker/task.py create mode 100644 awx/main/management/commands/run_dispatcher.py delete mode 100644 awx/main/management/commands/watch_celery.py delete mode 100644 awx/main/tests/unit/test_task_manager.py delete mode 100644 awx/main/utils/autoscale.py delete mode 100644 awx/main/utils/ha.py create mode 100644 docs/tasks.md diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d98ceb49866e..29d4e4d7c41b 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -145,7 +145,7 @@ Start the development containers by running the following: (host)$ make docker-compose ``` -The above utilizes the image built in the previous step, and will automatically start all required services and dependent containers. Once the containers launch, your session will be attached to the *awx* container, and you'll be able to watch log messages and events in real time. You will see messages from Django, celery, and the front end build process. +The above utilizes the image built in the previous step, and will automatically start all required services and dependent containers. Once the containers launch, your session will be attached to the *awx* container, and you'll be able to watch log messages and events in real time. You will see messages from Django and the front end build process. If you start a second terminal session, you can take a look at the running containers using the `docker ps` command. For example: @@ -174,7 +174,7 @@ The first time you start the environment, database migrations need to run in ord ```bash awx_1 | Operations to perform: awx_1 | Synchronize unmigrated apps: solo, api, staticfiles, debug_toolbar, messages, channels, django_extensions, ui, rest_framework, polymorphic -awx_1 | Apply all migrations: sso, taggit, sessions, djcelery, sites, kombu_transport_django, social_auth, contenttypes, auth, conf, main +awx_1 | Apply all migrations: sso, taggit, sessions, sites, kombu_transport_django, social_auth, contenttypes, auth, conf, main awx_1 | Synchronizing apps without migrations: awx_1 | Creating tables... awx_1 | Running deferred SQL... diff --git a/INSTALL.md b/INSTALL.md index e0dd020022ab..61b667368cdc 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -236,7 +236,7 @@ Using /etc/ansible/ansible.cfg as config file } Operations to perform: Synchronize unmigrated apps: solo, api, staticfiles, messages, channels, django_extensions, ui, rest_framework, polymorphic - Apply all migrations: sso, taggit, sessions, djcelery, sites, kombu_transport_django, social_auth, contenttypes, auth, conf, main + Apply all migrations: sso, taggit, sessions, sites, kombu_transport_django, social_auth, contenttypes, auth, conf, main Synchronizing apps without migrations: Creating tables... Running deferred SQL... @@ -548,7 +548,7 @@ Using /etc/ansible/ansible.cfg as config file } Operations to perform: Synchronize unmigrated apps: solo, api, staticfiles, messages, channels, django_extensions, ui, rest_framework, polymorphic - Apply all migrations: sso, taggit, sessions, djcelery, sites, kombu_transport_django, social_auth, contenttypes, auth, conf, main + Apply all migrations: sso, taggit, sessions, sites, kombu_transport_django, social_auth, contenttypes, auth, conf, main Synchronizing apps without migrations: Creating tables... Running deferred SQL... 
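To make the new publisher concrete, here is a minimal sketch of how a task is declared and dispatched with the `awx.main.dispatch.publish.task` decorator introduced by this patch (the `slow_add` function and the `slow-tasks` queue name are illustrative only and are not part of the patch):

    from awx.main.dispatch.publish import task

    @task(queue='slow-tasks')      # route messages for this task to a dedicated queue
    def slow_add(a, b):
        return a + b

    # The decorated function can still be called synchronously...
    assert slow_add(1, 1) == 2

    # ...or published as an AMQP message for a `run_dispatcher` worker to pick up.
    slow_add.apply_async([1, 1])
    slow_add.delay(1, 1)           # shorthand for apply_async([1, 1])

Once a dispatcher is running, `awx-manage run_dispatcher --status` prints the internal state of its worker pool, `--running` lists the UUIDs of the tasks it is currently managing, and `--reload` asks it to recycle its worker processes.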
diff --git a/Makefile b/Makefile index efa83ec657b0..1d7b49433f67 100644 --- a/Makefile +++ b/Makefile @@ -59,7 +59,7 @@ UI_RELEASE_FLAG_FILE = awx/ui/.release_built I18N_FLAG_FILE = .i18n_built .PHONY: awx-link clean clean-tmp clean-venv requirements requirements_dev \ - develop refresh adduser migrate dbchange dbshell runserver celeryd \ + develop refresh adduser migrate dbchange dbshell runserver \ receiver test test_unit test_ansible test_coverage coverage_html \ dev_build release_build release_clean sdist \ ui-docker-machine ui-docker ui-release ui-devel \ @@ -233,7 +233,7 @@ server_noattach: tmux new-session -d -s awx 'exec make uwsgi' tmux rename-window 'AWX' tmux select-window -t awx:0 - tmux split-window -v 'exec make celeryd' + tmux split-window -v 'exec make dispatcher' tmux new-window 'exec make daphne' tmux select-window -t awx:1 tmux rename-window 'WebSockets' @@ -265,12 +265,6 @@ honcho: fi; \ honcho start -f tools/docker-compose/Procfile -flower: - @if [ "$(VENV_BASE)" ]; then \ - . $(VENV_BASE)/awx/bin/activate; \ - fi; \ - celery flower --address=0.0.0.0 --port=5555 --broker=amqp://guest:guest@$(RABBITMQ_HOST):5672// - collectstatic: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/awx/bin/activate; \ @@ -281,7 +275,7 @@ uwsgi: collectstatic @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/awx/bin/activate; \ fi; \ - uwsgi -b 32768 --socket 127.0.0.1:8050 --module=awx.wsgi:application --home=/venv/awx --chdir=/awx_devel/ --vacuum --processes=5 --harakiri=120 --master --no-orphans --py-autoreload 1 --max-requests=1000 --stats /tmp/stats.socket --lazy-apps --logformat "%(addr) %(method) %(uri) - %(proto) %(status)" --hook-accepting1-once="exec:/bin/sh -c '[ -f /tmp/celery_pid ] && kill -1 `cat /tmp/celery_pid` || true'" + uwsgi -b 32768 --socket 127.0.0.1:8050 --module=awx.wsgi:application --home=/venv/awx --chdir=/awx_devel/ --vacuum --processes=5 --harakiri=120 --master --no-orphans --py-autoreload 1 --max-requests=1000 --stats /tmp/stats.socket --lazy-apps --logformat "%(addr) %(method) %(uri) - %(proto) %(status)" --hook-accepting1-once="exec:awx-manage run_dispatcher --reload" daphne: @if [ "$(VENV_BASE)" ]; then \ @@ -302,13 +296,13 @@ runserver: fi; \ $(PYTHON) manage.py runserver -# Run to start the background celery worker for development. -celeryd: - rm -f /tmp/celery_pid +# Run to start the background task dispatcher for development. +dispatcher: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/awx/bin/activate; \ fi; \ - celery worker -A awx -l DEBUG -B -Ofair --autoscale=100,4 --schedule=$(CELERY_SCHEDULE_FILE) -n celery@$(COMPOSE_HOST) --pidfile /tmp/celery_pid + $(PYTHON) manage.py run_dispatcher + # Run to start the zeromq callback receiver receiver: diff --git a/awx/__init__.py b/awx/__init__.py index 2241a165a008..8f35750c48ba 100644 --- a/awx/__init__.py +++ b/awx/__init__.py @@ -12,14 +12,6 @@ __all__ = ['__version__'] -# Isolated nodes do not have celery installed -try: - from .celery import app as celery_app # noqa - __all__.append('celery_app') -except ImportError: - pass - - # Check for the presence/absence of "devonly" module to determine if running # from a source code checkout or release packaage. try: diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 8d502eb5c934..ee38fbec314b 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -3318,7 +3318,7 @@ def post(self, request, *args, **kwargs): with transaction.atomic(): job = job_template.create_job(**kv) - # Send a signal to celery that the job should be started. 
+ # Send a signal to signify that the job should be started. result = job.signal_start(inventory_sources_already_updated=inventory_sources_already_updated) if not result: data = dict(msg=_('Error starting job!')) diff --git a/awx/api/views/mixin.py b/awx/api/views/mixin.py index 0830b0c41903..9a705f7b1401 100644 --- a/awx/api/views/mixin.py +++ b/awx/api/views/mixin.py @@ -101,7 +101,9 @@ def destroy(self, request, *args, **kwargs): class InstanceGroupMembershipMixin(object): ''' - Manages signaling celery to reload its queue configuration on Instance Group membership changes + This mixin overloads attach/detach so that it calls InstanceGroup.save(), + triggering a background recalculation of policy-based instance group + membership. ''' def attach(self, request, *args, **kwargs): response = super(InstanceGroupMembershipMixin, self).attach(request, *args, **kwargs) diff --git a/awx/celery.py b/awx/celery.py deleted file mode 100644 index 124a2a4fd4ca..000000000000 --- a/awx/celery.py +++ /dev/null @@ -1,25 +0,0 @@ - -# Copyright (c) 2017 Ansible, Inc. -# All Rights Reserved. - -from __future__ import absolute_import, unicode_literals - -import os -from celery import Celery -from django.conf import settings # noqa - - -try: - import awx.devonly # noqa - MODE = 'development' -except ImportError: # pragma: no cover - MODE = 'production' - -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'awx.settings.%s' % MODE) - -app = Celery('awx') -app.config_from_object('django.conf:settings') -app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) - -if __name__ == '__main__': - app.start() diff --git a/awx/main/dispatch/__init__.py b/awx/main/dispatch/__init__.py index e69de29bb2d1..ac4ef421df07 100644 --- a/awx/main/dispatch/__init__.py +++ b/awx/main/dispatch/__init__.py @@ -0,0 +1,5 @@ +from django.conf import settings + + +def get_local_queuename(): + return settings.CLUSTER_HOST_ID.encode('utf-8') diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py new file mode 100644 index 000000000000..9beb6b4da226 --- /dev/null +++ b/awx/main/dispatch/control.py @@ -0,0 +1,60 @@ +import logging +import socket + +from django.conf import settings + +from awx.main.dispatch import get_local_queuename +from kombu import Connection, Queue, Exchange, Producer, Consumer + +logger = logging.getLogger('awx.main.dispatch') + + +class Control(object): + + services = ('dispatcher', 'callback_receiver') + result = None + + def __init__(self, service): + if service not in self.services: + raise RuntimeError('{} must be in {}'.format(service, self.services)) + self.service = service + queuename = get_local_queuename() + self.queue = Queue(queuename, Exchange(queuename), routing_key=queuename) + + def publish(self, msg, conn, host, **kwargs): + producer = Producer( + exchange=self.queue.exchange, + channel=conn, + routing_key=get_local_queuename() + ) + producer.publish(msg, expiration=5, **kwargs) + + def status(self, *args, **kwargs): + return self.control_with_reply('status', *args, **kwargs) + + def running(self, *args, **kwargs): + return self.control_with_reply('running', *args, **kwargs) + + def control_with_reply(self, command, host=None, timeout=5): + host = host or settings.CLUSTER_HOST_ID + logger.warn('checking {} {} for {}'.format(self.service, command, host)) + reply_queue = Queue(name="amq.rabbitmq.reply-to") + self.result = None + with Connection(settings.BROKER_URL) as conn: + with Consumer(conn, reply_queue, callbacks=[self.process_message], no_ack=True): + self.publish({'control': 
command}, conn, host, reply_to='amq.rabbitmq.reply-to') + try: + conn.drain_events(timeout=timeout) + except socket.timeout: + logger.error('{} did not reply within {}s'.format(self.service, timeout)) + raise + return self.result + + def control(self, msg, host=None, **kwargs): + host = host or settings.CLUSTER_HOST_ID + with Connection(settings.BROKER_URL) as conn: + self.publish(msg, conn, host) + + def process_message(self, body, message): + self.result = body + message.ack() diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index f6830c607b2c..9c4f5b219f20 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -1,81 +1,260 @@ -import errno import logging import os -import signal +import random import traceback +from uuid import uuid4 +import collections from multiprocessing import Process from multiprocessing import Queue as MPQueue -from Queue import Full as QueueFull +from Queue import Full as QueueFull, Empty as QueueEmpty from django.conf import settings from django.db import connection as django_connection from django.core.cache import cache as django_cache +from jinja2 import Template +import psutil + +from awx.main.models import UnifiedJob +from awx.main.dispatch import reaper logger = logging.getLogger('awx.main.dispatch') -def signame(sig): - return dict( - (k, v) for v, k in signal.__dict__.items() - if v.startswith('SIG') and not v.startswith('SIG_') - )[sig] +class PoolWorker(object): + ''' + Used to track a worker child process and its pending and finished messages. + + This class makes use of two distinct multiprocessing.Queues to track state: + + - self.queue: this is a queue which represents pending messages that should + be handled by this worker process; as new AMQP messages come + in, a pool will put() them into this queue; the child + process that is forked will get() from this queue and handle + received messages in an endless loop + - self.finished: this is a queue which the worker process uses to signal + that it has finished processing a message + + When a message is put() onto this worker, it is tracked in + self.managed_tasks. + + Periodically, the worker will call .calculate_managed_tasks(), which will + cause messages in self.finished to be removed from self.managed_tasks. + + In this way, self.managed_tasks represents a view of the messages assigned + to a specific process. The message at [0] is the least-recently inserted + message, and it represents what the worker is running _right now_ + (self.current_task). + + A worker is "busy" when it has at least one message in self.managed_tasks. + It is "idle" when self.managed_tasks is empty. + ''' + + def __init__(self, queue_size, target, args): + self.messages_sent = 0 + self.messages_finished = 0 + self.managed_tasks = collections.OrderedDict() + self.finished = MPQueue(queue_size) + self.queue = MPQueue(queue_size) + self.process = Process(target=target, args=(self.queue, self.finished) + args) + self.process.daemon = True + + def start(self): + self.process.start() + + def put(self, body): + uuid = '?' + if isinstance(body, dict): + if not body.get('uuid'): + body['uuid'] = str(uuid4()) + uuid = body['uuid'] + logger.debug('delivered {} to worker[{}] qsize {}'.format( + uuid, self.pid, self.qsize + )) + self.managed_tasks[uuid] = body + self.queue.put(body, block=True, timeout=5) + self.messages_sent += 1 + self.calculate_managed_tasks() + + def quit(self): + ''' + Send a special control message to the worker that tells it to exit + gracefully. 
+ ''' + self.queue.put('QUIT') + + @property + def pid(self): + return self.process.pid + + @property + def qsize(self): + return self.queue.qsize() + + @property + def alive(self): + return self.process.is_alive() + + @property + def mb(self): + if self.alive: + return '{:0.3f}'.format( + psutil.Process(self.pid).memory_info().rss / 1024.0 / 1024.0 + ) + return '0' + + @property + def exitcode(self): + return str(self.process.exitcode) + + def calculate_managed_tasks(self): + # look to see if any tasks were finished + finished = [] + for _ in range(self.finished.qsize()): + try: + finished.append(self.finished.get(block=False)) + except QueueEmpty: + break # qsize is not always _totally_ up to date + + # if any tasks were finished, removed them from the managed tasks for + # this worker + for uuid in finished: + self.messages_finished += 1 + del self.managed_tasks[uuid] + + @property + def current_task(self): + self.calculate_managed_tasks() + # the task at [0] is the one that's running right now (or is about to + # be running) + if len(self.managed_tasks): + return self.managed_tasks[self.managed_tasks.keys()[0]] + + return None + + @property + def orphaned_tasks(self): + orphaned = [] + if not self.alive: + # if this process had a running task that never finished, + # requeue its error callbacks + current_task = self.current_task + if isinstance(current_task, dict): + orphaned.extend(current_task.get('errbacks', [])) + + # if this process has any pending messages requeue them + for _ in range(self.qsize): + try: + orphaned.append(self.queue.get(block=False)) + except QueueEmpty: + break # qsize is not always _totally_ up to date + if len(orphaned): + logger.error( + 'requeuing {} messages from gone worker pid:{}'.format( + len(orphaned), self.pid + ) + ) + return orphaned + + @property + def busy(self): + self.calculate_managed_tasks() + return len(self.managed_tasks) > 0 + + @property + def idle(self): + return not self.busy class WorkerPool(object): + ''' + Creates a pool of forked PoolWorkers. + + As WorkerPool.write(...) is called (generally, by a kombu consumer + implementation when it receives an AMQP message), messages are passed to + one of the multiprocessing Queues where some work can be done on them. + + class MessagePrinter(awx.main.dispatch.worker.BaseWorker): + + def perform_work(self, body): + print body + + pool = WorkerPool(min_workers=4) # spawn four worker processes + pool.init_workers(MessagePrint().work_loop) + pool.write( + 0, # preferred worker 0 + 'Hello, World!' 
+ ) + ''' + + debug_meta = '' def __init__(self, min_workers=None, queue_size=None): + self.name = settings.CLUSTER_HOST_ID + self.pid = os.getpid() self.min_workers = min_workers or settings.JOB_EVENT_WORKERS self.queue_size = queue_size or settings.JOB_EVENT_MAX_QUEUE_SIZE - - # self.workers tracks the state of worker running worker processes: - # [ - # (total_messages_consumed, multiprocessing.Queue, multiprocessing.Process), - # (total_messages_consumed, multiprocessing.Queue, multiprocessing.Process), - # (total_messages_consumed, multiprocessing.Queue, multiprocessing.Process), - # (total_messages_consumed, multiprocessing.Queue, multiprocessing.Process) - # ] self.workers = [] def __len__(self): return len(self.workers) def init_workers(self, target, *target_args): - def shutdown_handler(active_workers): - def _handler(signum, frame): - logger.debug('received shutdown {}'.format(signame(signum))) - try: - for active_worker in active_workers: - logger.debug('terminating worker') - signal.signal(signum, signal.SIG_DFL) - os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it - except Exception: - logger.exception('error in shutdown_handler') - return _handler + self.target = target + self.target_args = target_args + for idx in range(self.min_workers): + self.up() + def up(self): + idx = len(self.workers) + # It's important to close these because we're _about_ to fork, and we + # don't want the forked processes to inherit the open sockets + # for the DB and memcached connections (that way lies race conditions) django_connection.close() django_cache.close() - for idx in range(self.min_workers): - queue_actual = MPQueue(self.queue_size) - w = Process(target=target, args=(queue_actual, idx,) + target_args) - w.start() - logger.debug('started {}[{}]'.format(target.im_self.__class__.__name__, idx)) - self.workers.append([0, queue_actual, w]) + worker = PoolWorker(self.queue_size, self.target, (idx,) + self.target_args) + self.workers.append(worker) + try: + worker.start() + except Exception: + logger.exception('could not fork') + else: + logger.warn('scaling up worker pid:{}'.format(worker.pid)) + return idx, worker - signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in self.workers])) - signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in self.workers])) + def debug(self, *args, **kwargs): + self.cleanup() + tmpl = Template( + '{{ pool.name }}[pid:{{ pool.pid }}] workers total={{ workers|length }} {{ meta }} \n' + '{% for w in workers %}' + '. 
worker[pid:{{ w.pid }}]{% if not w.alive %} GONE exit={{ w.exitcode }}{% endif %}' + ' sent={{ w.messages_sent }}' + ' finished={{ w.messages_finished }}' + ' qsize={{ w.managed_tasks|length }}' + ' rss={{ w.mb }}MB' + '{% for task in w.managed_tasks.values() %}' + '\n - {% if loop.index0 == 0 %}running {% else %}queued {% endif %}' + '{{ task["uuid"] }} ' + '{% if "task" in task %}' + '{{ task["task"].rsplit(".", 1)[-1] }}' + # don't print kwargs, they often contain launch-time secrets + '(*{{ task.get("args", []) }})' + '{% endif %}' + '{% endfor %}' + '{% if not w.managed_tasks|length %}' + ' [IDLE]' + '{% endif %}' + '\n' + '{% endfor %}' + ) + return tmpl.render(pool=self, workers=self.workers, meta=self.debug_meta) def write(self, preferred_queue, body): - queue_order = sorted(range(self.min_workers), cmp=lambda x, y: -1 if x==preferred_queue else 0) + queue_order = sorted(range(len(self.workers)), cmp=lambda x, y: -1 if x==preferred_queue else 0) write_attempt_order = [] for queue_actual in queue_order: try: - worker_actual = self.workers[queue_actual] - worker_actual[1].put(body, block=True, timeout=5) - logger.debug('delivered to Worker[{}] qsize {}'.format( - queue_actual, worker_actual[1].qsize() - )) - worker_actual[0] += 1 + self.workers[queue_actual].put(body) return queue_actual except QueueFull: pass @@ -87,11 +266,113 @@ def write(self, preferred_queue, body): logger.warn("could not write payload to any queue, attempted order: {}".format(write_attempt_order)) return None - def stop(self): - for worker in self.workers: - messages, queue, process = worker - try: - os.kill(process.pid, signal.SIGTERM) - except OSError as e: - if e.errno != errno.ESRCH: - raise + def stop(self, signum): + try: + for worker in self.workers: + os.kill(worker.pid, signum) + except Exception: + logger.exception('could not kill {}'.format(worker.pid)) + + +class AutoscalePool(WorkerPool): + ''' + An extended pool implementation that automatically scales workers up and + down based on demand + ''' + + def __init__(self, *args, **kwargs): + self.max_workers = kwargs.pop('max_workers', None) + super(AutoscalePool, self).__init__(*args, **kwargs) + + if self.max_workers is None: + settings_absmem = getattr(settings, 'SYSTEM_TASK_ABS_MEM', None) + if settings_absmem is not None: + total_memory_gb = int(settings_absmem) + else: + total_memory_gb = (psutil.virtual_memory().total >> 30) + 1 # noqa: round up + # 5 workers per GB of total memory + self.max_workers = (total_memory_gb * 5) + + @property + def should_grow(self): + if len(self.workers) < self.min_workers: + # If we don't have at least min_workers, add more + return True + # If every worker is busy doing something, add more + return all([w.busy for w in self.workers]) + + @property + def full(self): + return len(self.workers) == self.max_workers + + @property + def debug_meta(self): + return 'min={} max={}'.format(self.min_workers, self.max_workers) + + def cleanup(self): + """ + Perform some internal account and cleanup. This is run on + every cluster node heartbeat: + + 1. Discover worker processes that exited, and recover messages they + were handling. + 2. Clean up unnecessary, idle workers. + """ + orphaned = [] + for w in self.workers[::]: + if not w.alive: + # the worker process has exited + # 1. take the task it was running and enqueue the error + # callbacks + # 2. 
take any pending tasks delivered to its queue and + # send them to another worker + logger.error('worker pid:{} is gone (exit={})'.format(w.pid, w.exitcode)) + if w.current_task: + try: + for j in UnifiedJob.objects.filter(celery_task_id=w.current_task['uuid']): + reaper.reap_job(j, 'failed') + except Exception: + logger.exception('failed to reap job UUID {}'.format(w.current_task['uuid'])) + orphaned.extend(w.orphaned_tasks) + self.workers.remove(w) + elif w.idle and len(self.workers) > self.min_workers: + # the process has an empty queue (it's idle) and we have + # more processes in the pool than we need (> min) + # send this process a message so it will exit gracefully + # at the next opportunity + logger.warn('scaling down worker pid:{}'.format(w.pid)) + w.quit() + self.workers.remove(w) + + for m in orphaned: + # if all the workers are dead, spawn at least one + if not len(self.workers): + self.up() + idx = random.choice(range(len(self.workers))) + self.write(idx, m) + + def up(self): + if self.full: + # if we can't spawn more workers, just toss this message into a + # random worker's backlog + idx = random.choice(range(len(self.workers))) + return idx, self.workers[idx] + else: + return super(AutoscalePool, self).up() + + def write(self, preferred_queue, body): + # when the cluster heartbeat occurs, clean up internally + if isinstance(body, dict) and 'cluster_node_heartbeat' in body['task']: + self.cleanup() + if self.should_grow: + self.up() + # we don't care about "preferred queue" round robin distribution, just + # find the first non-busy worker and claim it + workers = self.workers[:] + random.shuffle(workers) + for w in workers: + if not w.busy: + w.put(body) + break + else: + return super(AutoscalePool, self).write(preferred_queue, body) diff --git a/awx/main/dispatch/publish.py b/awx/main/dispatch/publish.py new file mode 100644 index 000000000000..739998ca961c --- /dev/null +++ b/awx/main/dispatch/publish.py @@ -0,0 +1,128 @@ +import inspect +import logging +import sys +from uuid import uuid4 + +from django.conf import settings +from kombu import Connection, Exchange, Producer + +logger = logging.getLogger('awx.main.dispatch') + + +def serialize_task(f): + return '.'.join([f.__module__, f.__name__]) + + +class task: + """ + Used to decorate a function or class so that it can be run asynchronously + via the task dispatcher. Tasks can be simple functions: + + @task() + def add(a, b): + return a + b + + ...or classes that define a `run` method: + + @task() + class Adder: + def run(self, a, b): + return a + b + + # Tasks can be run synchronously... + assert add(1, 1) == 2 + assert Adder().run(1, 1) == 2 + + # ...or published to a queue: + add.apply_async([1, 1]) + Adder.apply_async([1, 1]) + + # Tasks can also define a specific target queue or exchange type: + + @task(queue='slow-tasks') + def snooze(): + time.sleep(10) + + @task(queue='tower_broadcast', exchange_type='fanout') + def announce(): + print "Run this everywhere!" 
+ """ + + def __init__(self, queue=None, exchange_type=None): + self.queue = queue + self.exchange_type = exchange_type + + def __call__(self, fn=None): + queue = self.queue + exchange_type = self.exchange_type + + class PublisherMixin(object): + + queue = None + + @classmethod + def delay(cls, *args, **kwargs): + return cls.apply_async(args, kwargs) + + @classmethod + def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw): + task_id = uuid or str(uuid4()) + args = args or [] + kwargs = kwargs or {} + queue = ( + queue or + getattr(cls.queue, 'im_func', cls.queue) or + settings.CELERY_DEFAULT_QUEUE + ) + obj = { + 'uuid': task_id, + 'args': args, + 'kwargs': kwargs, + 'task': cls.name + } + obj.update(**kw) + if callable(queue): + queue = queue() + if not settings.IS_TESTING(sys.argv): + with Connection(settings.BROKER_URL) as conn: + exchange = Exchange(queue, type=exchange_type or 'direct') + producer = Producer(conn) + logger.debug('publish {}({}, queue={})'.format( + cls.name, + task_id, + queue + )) + producer.publish(obj, + serializer='json', + compression='bzip2', + exchange=exchange, + declare=[exchange], + delivery_mode="persistent", + routing_key=queue) + return (obj, queue) + + # If the object we're wrapping *is* a class (e.g., RunJob), return + # a *new* class that inherits from the wrapped class *and* BaseTask + # In this way, the new class returned by our decorator is the class + # being decorated *plus* PublisherMixin so cls.apply_async() and + # cls.delay() work + bases = [] + ns = {'name': serialize_task(fn), 'queue': queue} + if inspect.isclass(fn): + bases = list(fn.__bases__) + ns.update(fn.__dict__) + cls = type( + fn.__name__, + tuple(bases + [PublisherMixin]), + ns + ) + if inspect.isclass(fn): + return cls + + # if the object being decorated is *not* a class (it's a Python + # function), make fn.apply_async and fn.delay proxy through to the + # PublisherMixin we dynamically created above + setattr(fn, 'name', cls.name) + setattr(fn, 'apply_async', cls.apply_async) + setattr(fn, 'delay', cls.delay) + return fn diff --git a/awx/main/dispatch/reaper.py b/awx/main/dispatch/reaper.py new file mode 100644 index 000000000000..8e9a9d3d15cc --- /dev/null +++ b/awx/main/dispatch/reaper.py @@ -0,0 +1,46 @@ +from datetime import timedelta +import logging + +from django.db.models import Q +from django.utils.timezone import now as tz_now +from django.contrib.contenttypes.models import ContentType + +from awx.main.models import Instance, UnifiedJob, WorkflowJob + +logger = logging.getLogger('awx.main.dispatch') + + +def reap_job(j, status): + j.status = status + j.start_args = '' # blank field to remove encrypted passwords + j.job_explanation += ' '.join(( + 'Task was marked as running in Tower but was not present in', + 'the job queue, so it has been marked as failed.', + )) + j.save(update_fields=['status', 'start_args', 'job_explanation']) + if hasattr(j, 'send_notification_templates'): + j.send_notification_templates('failed') + j.websocket_emit_status(status) + logger.error( + '{} is no longer running; reaping'.format(j.log_format) + ) + + +def reap(instance=None, status='failed'): + ''' + Reap all jobs in waiting|running for this instance. 
+ ''' + me = instance or Instance.objects.me() + now = tz_now() + workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id + jobs = UnifiedJob.objects.filter( + ( + Q(status='running') | + Q(status='waiting', modified__lte=now - timedelta(seconds=60)) + ) & ( + Q(execution_node=me.hostname) | + Q(controller_node=me.hostname) + ) & ~Q(polymorphic_ctype_id=workflow_ctype_id) + ) + for j in jobs: + reap_job(j, status) diff --git a/awx/main/dispatch/worker/__init__.py b/awx/main/dispatch/worker/__init__.py new file mode 100644 index 000000000000..009386914ffc --- /dev/null +++ b/awx/main/dispatch/worker/__init__.py @@ -0,0 +1,3 @@ +from .base import AWXConsumer, BaseWorker # noqa +from .callback import CallbackBrokerWorker # noqa +from .task import TaskWorker # noqa diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py new file mode 100644 index 000000000000..11cb95e06ace --- /dev/null +++ b/awx/main/dispatch/worker/base.py @@ -0,0 +1,146 @@ +# Copyright (c) 2018 Ansible by Red Hat +# All Rights Reserved. + +import os +import logging +import signal +from uuid import UUID +from Queue import Empty as QueueEmpty + +from kombu import Producer +from kombu.mixins import ConsumerMixin + +from awx.main.dispatch.pool import WorkerPool + +logger = logging.getLogger('awx.main.dispatch') + + +def signame(sig): + return dict( + (k, v) for v, k in signal.__dict__.items() + if v.startswith('SIG') and not v.startswith('SIG_') + )[sig] + + +class WorkerSignalHandler: + + def __init__(self): + self.kill_now = False + signal.signal(signal.SIGINT, self.exit_gracefully) + + def exit_gracefully(self, *args, **kwargs): + self.kill_now = True + + +class AWXConsumer(ConsumerMixin): + + def __init__(self, name, connection, worker, queues=[], pool=None): + self.connection = connection + self.total_messages = 0 + self.queues = queues + self.worker = worker + self.pool = pool + if pool is None: + self.pool = WorkerPool() + self.pool.init_workers(self.worker.work_loop) + + def get_consumers(self, Consumer, channel): + logger.debug(self.listening_on) + return [Consumer(queues=self.queues, accept=['json'], + callbacks=[self.process_task])] + + @property + def listening_on(self): + return 'listening on {}'.format([ + '{} [{}]'.format(q.name, q.exchange.type) for q in self.queues + ]) + + def control(self, body, message): + logger.warn(body) + control = body.get('control') + if control in ('status', 'running'): + producer = Producer( + channel=self.connection, + routing_key=message.properties['reply_to'] + ) + if control == 'status': + msg = '\n'.join([self.listening_on, self.pool.debug()]) + elif control == 'running': + msg = [] + for worker in self.pool.workers: + worker.calculate_managed_tasks() + msg.extend(worker.managed_tasks.keys()) + producer.publish(msg) + elif control == 'reload': + for worker in self.pool.workers: + worker.quit() + else: + logger.error('unrecognized control message: {}'.format(control)) + message.ack() + + def process_task(self, body, message): + if 'control' in body: + return self.control(body, message) + if len(self.pool): + if "uuid" in body and body['uuid']: + try: + queue = UUID(body['uuid']).int % len(self.pool) + except Exception: + queue = self.total_messages % len(self.pool) + else: + queue = self.total_messages % len(self.pool) + else: + queue = 0 + self.pool.write(queue, body) + self.total_messages += 1 + message.ack() + + def run(self, *args, **kwargs): + signal.signal(signal.SIGINT, self.stop) + signal.signal(signal.SIGTERM, self.stop) + 
self.worker.on_start() + super(AWXConsumer, self).run(*args, **kwargs) + + def stop(self, signum, frame): + self.should_stop = True # this makes the kombu mixin stop consuming + logger.debug('received {}, stopping'.format(signame(signum))) + self.worker.on_stop() + raise SystemExit() + + +class BaseWorker(object): + + def work_loop(self, queue, finished, idx, *args): + ppid = os.getppid() + signal_handler = WorkerSignalHandler() + while not signal_handler.kill_now: + # if the parent PID changes, this process has been orphaned + # via e.g., segfault or sigkill, we should exit too + if os.getppid() != ppid: + break + try: + body = queue.get(block=True, timeout=1) + if body == 'QUIT': + break + except QueueEmpty: + continue + except Exception as e: + logger.error("Exception on worker, restarting: " + str(e)) + continue + try: + self.perform_work(body, *args) + finally: + if 'uuid' in body: + uuid = body['uuid'] + logger.debug('task {} is finished'.format(uuid)) + finished.put(uuid) + logger.warn('worker exiting gracefully pid:{}'.format(os.getpid())) + + def perform_work(self, body): + raise NotImplementedError() + + def on_start(self): + pass + + def on_stop(self): + pass diff --git a/awx/main/dispatch/worker.py b/awx/main/dispatch/worker/callback.py similarity index 72% rename from awx/main/dispatch/worker.py rename to awx/main/dispatch/worker/callback.py index 4423fb160b10..451737f990c3 100644 --- a/awx/main/dispatch/worker.py +++ b/awx/main/dispatch/worker/callback.py @@ -1,83 +1,30 @@ -# Copyright (c) 2018 Ansible by Red Hat -# All Rights Reserved. - import logging +import time import os import signal -import time import traceback -from uuid import UUID -from Queue import Empty as QueueEmpty -from kombu.mixins import ConsumerMixin from django.conf import settings from django.db import DatabaseError, OperationalError, connection as django_connection from django.db.utils import InterfaceError, InternalError +from awx.main.consumers import emit_channel_notification from awx.main.models import (JobEvent, AdHocCommandEvent, ProjectUpdateEvent, InventoryUpdateEvent, SystemJobEvent, UnifiedJob) -from awx.main.consumers import emit_channel_notification -from awx.main.dispatch.pool import WorkerPool - -logger = logging.getLogger('awx.main.dispatch') - - -class WorkerSignalHandler: - - def __init__(self): - self.kill_now = False - signal.signal(signal.SIGINT, self.exit_gracefully) - signal.signal(signal.SIGTERM, self.exit_gracefully) - - def exit_gracefully(self, *args, **kwargs): - self.kill_now = True +from .base import BaseWorker -class AWXConsumer(ConsumerMixin): - - def __init__(self, connection, worker, queues=[]): - self.connection = connection - self.total_messages = 0 - self.queues = queues - self.pool = WorkerPool() - self.pool.init_workers(worker.work_loop) - - def get_consumers(self, Consumer, channel): - return [Consumer(queues=self.queues, accept=['json'], - callbacks=[self.process_task])] - - def process_task(self, body, message): - if "uuid" in body and body['uuid']: - try: - queue = UUID(body['uuid']).int % len(self.pool) - except Exception: - queue = self.total_messages % len(self.pool) - else: - queue = self.total_messages % len(self.pool) - self.pool.write(queue, body) - self.total_messages += 1 - message.ack() - - -class BaseWorker(object): - - def work_loop(self, queue, idx, *args): - signal_handler = WorkerSignalHandler() - while not signal_handler.kill_now: - try: - body = queue.get(block=True, timeout=1) - except QueueEmpty: - continue - except Exception as e: - 
logger.error("Exception on worker, restarting: " + str(e)) - continue - self.perform_work(body, *args) - - def perform_work(self, body): - raise NotImplemented() +logger = logging.getLogger('awx.main.dispatch') class CallbackBrokerWorker(BaseWorker): + ''' + A worker implementation that deserializes callback event data and persists + it into the database. + + The code that *builds* these types of messages is found in the AWX display + callback (`awx.lib.awx_display_callback`). + ''' MAX_RETRIES = 2 @@ -151,7 +98,7 @@ def _save_event_data(): try: _save_event_data() break - except (OperationalError, InterfaceError, InternalError) as e: + except (OperationalError, InterfaceError, InternalError): if retries >= self.MAX_RETRIES: logger.exception('Worker could not re-establish database connectivity, shutting down gracefully: Job {}'.format(job_identifier)) os.kill(os.getppid(), signal.SIGINT) @@ -164,7 +111,7 @@ def _save_event_data(): django_connection.close() time.sleep(delay) retries += 1 - except DatabaseError as e: + except DatabaseError: logger.exception('Database Error Saving Job Event for Job {}'.format(job_identifier)) break except Exception as exc: diff --git a/awx/main/dispatch/worker/task.py b/awx/main/dispatch/worker/task.py new file mode 100644 index 000000000000..d1273749a1c4 --- /dev/null +++ b/awx/main/dispatch/worker/task.py @@ -0,0 +1,113 @@ +import inspect +import logging +import importlib +import sys +import traceback + +import six + +from awx.main.tasks import dispatch_startup, inform_cluster_of_shutdown + +from .base import BaseWorker + +logger = logging.getLogger('awx.main.dispatch') + + +class TaskWorker(BaseWorker): + ''' + A worker implementation that deserializes task messages and runs native + Python code. + + The code that *builds* these types of messages is found in + `awx.main.dispatch.publish`. + ''' + + @classmethod + def resolve_callable(cls, task): + ''' + Transform a dotted notation task into an imported, callable function, e.g., + + awx.main.tasks.delete_inventory + awx.main.tasks.RunProjectUpdate + ''' + module, target = task.rsplit('.', 1) + module = importlib.import_module(module) + _call = None + if hasattr(module, target): + _call = getattr(module, target, None) + return _call + + def run_callable(self, body): + ''' + Given some AMQP message, import the correct Python code and run it. 
+ ''' + task = body['task'] + uuid = body.get('uuid', '') + args = body.get('args', []) + kwargs = body.get('kwargs', {}) + _call = TaskWorker.resolve_callable(task) + if inspect.isclass(_call): + # the callable is a class, e.g., RunJob; instantiate and + # return its `run()` method + _call = _call().run + # don't print kwargs, they often contain launch-time secrets + logger.debug('task {} starting {}(*{})'.format(uuid, task, args)) + return _call(*args, **kwargs) + + def perform_work(self, body): + ''' + Import and run code for a task e.g., + + body = { + 'args': [8], + 'callbacks': [{ + 'args': [], + 'kwargs': {} + 'task': u'awx.main.tasks.handle_work_success' + }], + 'errbacks': [{ + 'args': [], + 'kwargs': {}, + 'task': 'awx.main.tasks.handle_work_error' + }], + 'kwargs': {}, + 'task': u'awx.main.tasks.RunProjectUpdate' + } + ''' + result = None + try: + result = self.run_callable(body) + except Exception as exc: + + try: + if getattr(exc, 'is_awx_task_error', False): + # Error caused by user / tracked in job output + logger.warning(six.text_type("{}").format(exc)) + else: + task = body['task'] + args = body.get('args', []) + kwargs = body.get('kwargs', {}) + logger.exception('Worker failed to run task {}(*{}, **{}'.format( + task, args, kwargs + )) + except Exception: + # It's fairly critical that this code _not_ raise exceptions on logging + # If you configure external logging in a way that _it_ fails, there's + # not a lot we can do here; sys.stderr.write is a final hail mary + _, _, tb = sys.exc_info() + traceback.print_tb(tb) + + for callback in body.get('errbacks', []) or []: + callback['uuid'] = body['uuid'] + self.perform_work(callback) + + for callback in body.get('callbacks', []) or []: + callback['uuid'] = body['uuid'] + self.perform_work(callback) + return result + + def on_start(self): + dispatch_startup() + + def on_stop(self): + inform_cluster_of_shutdown() diff --git a/awx/main/exceptions.py b/awx/main/exceptions.py index 551493484794..eeeae3491e95 100644 --- a/awx/main/exceptions.py +++ b/awx/main/exceptions.py @@ -4,11 +4,6 @@ import six -# Celery does not respect exception type when using a serializer different than pickle; -# and awx uses the json serializer -# https://github.com/celery/celery/issues/3586 - - class _AwxTaskError(): def build_exception(self, task, message=None): if message is None: @@ -36,5 +31,3 @@ def TaskError(self, task, rc): AwxTaskError = _AwxTaskError() - - diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index 692bb5644387..d8332e02099e 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -938,7 +938,7 @@ def handle(self, *args, **options): self.exclude_empty_groups = bool(options.get('exclude_empty_groups', False)) self.instance_id_var = options.get('instance_id_var', None) - self.celery_invoked = False if os.getenv('INVENTORY_SOURCE_ID', None) is None else True + self.invoked_from_dispatcher = False if os.getenv('INVENTORY_SOURCE_ID', None) is None else True # Load inventory and related objects from database. 
if self.inventory_name and self.inventory_id: @@ -1062,7 +1062,7 @@ def handle(self, *args, **options): exc = e transaction.rollback() - if self.celery_invoked is False: + if self.invoked_from_dispatcher is False: with ignore_inventory_computed_fields(): self.inventory_update = InventoryUpdate.objects.get(pk=self.inventory_update.pk) self.inventory_update.result_traceback = tb diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py new file mode 100644 index 000000000000..5891d7d0b697 --- /dev/null +++ b/awx/main/management/commands/run_dispatcher.py @@ -0,0 +1,124 @@ +# Copyright (c) 2015 Ansible, Inc. +# All Rights Reserved. +import os +import logging +from multiprocessing import Process + +from django.conf import settings +from django.core.cache import cache as django_cache +from django.core.management.base import BaseCommand +from django.db import connection as django_connection +from kombu import Connection, Exchange, Queue + +from awx.main.dispatch import get_local_queuename, reaper +from awx.main.dispatch.control import Control +from awx.main.dispatch.pool import AutoscalePool +from awx.main.dispatch.worker import AWXConsumer, TaskWorker + +logger = logging.getLogger('awx.main.dispatch') + + +def construct_bcast_queue_name(common_name): + return common_name.encode('utf8') + '_' + settings.CLUSTER_HOST_ID + + +class Command(BaseCommand): + help = 'Launch the task dispatcher' + + def add_arguments(self, parser): + parser.add_argument('--status', dest='status', action='store_true', + help='print the internal state of any running dispatchers') + parser.add_argument('--running', dest='running', action='store_true', + help='print the UUIDs of any tasked managed by this dispatcher') + parser.add_argument('--reload', dest='reload', action='store_true', + help=('cause the dispatcher to recycle all of its worker processes;' + 'running jobs will run to completion first')) + + def beat(self): + from celery import app + from celery.beat import PersistentScheduler + from celery.apps import beat + + class AWXScheduler(PersistentScheduler): + + def __init__(self, *args, **kwargs): + self.ppid = os.getppid() + super(AWXScheduler, self).__init__(*args, **kwargs) + + def setup_schedule(self): + super(AWXScheduler, self).setup_schedule() + self.update_from_dict(settings.CELERYBEAT_SCHEDULE) + + def tick(self, *args, **kwargs): + if os.getppid() != self.ppid: + # if the parent PID changes, this process has been orphaned + # via e.g., segfault or sigkill, we should exit too + raise SystemExit() + return super(AWXScheduler, self).tick(*args, **kwargs) + + def apply_async(self, entry, publisher, **kwargs): + task = TaskWorker.resolve_callable(entry.task) + result, queue = task.apply_async() + + class TaskResult(object): + id = result['uuid'] + + return TaskResult() + + app = app.App() + app.conf.BROKER_URL = settings.BROKER_URL + app.conf.CELERY_TASK_RESULT_EXPIRES = False + beat.Beat( + 30, + app, + schedule='/var/lib/awx/beat.db', scheduler_cls=AWXScheduler + ).run() + + def handle(self, *arg, **options): + if options.get('status'): + print Control('dispatcher').status() + return + if options.get('running'): + print Control('dispatcher').running() + return + if options.get('reload'): + return Control('dispatcher').control({'control': 'reload'}) + + # It's important to close these because we're _about_ to fork, and we + # don't want the forked processes to inherit the open sockets + # for the DB and memcached connections (that way lies race conditions) 
+ django_connection.close() + django_cache.close() + beat = Process(target=self.beat) + beat.daemon = True + beat.start() + + reaper.reap() + consumer = None + with Connection(settings.BROKER_URL) as conn: + try: + bcast = 'tower_broadcast_all' + queues = [ + Queue(q, Exchange(q), routing_key=q) + for q in (settings.AWX_CELERY_QUEUES_STATIC + [get_local_queuename()]) + ] + queues.append( + Queue( + construct_bcast_queue_name(bcast), + exchange=Exchange(bcast, type='fanout'), + routing_key=bcast, + reply=True + ) + ) + consumer = AWXConsumer( + 'dispatcher', + conn, + TaskWorker(), + queues, + AutoscalePool(min_workers=4) + ) + consumer.run() + except KeyboardInterrupt: + logger.debug('Terminating Task Dispatcher') + if consumer: + consumer.stop() diff --git a/awx/main/management/commands/watch_celery.py b/awx/main/management/commands/watch_celery.py deleted file mode 100644 index bd45f06803e9..000000000000 --- a/awx/main/management/commands/watch_celery.py +++ /dev/null @@ -1,66 +0,0 @@ -import datetime -import os -import signal -import subprocess -import sys -import time - -from celery import Celery -from django.core.management.base import BaseCommand -from django.conf import settings - - -class Command(BaseCommand): - """Watch local celery workers""" - help=("Sends a periodic ping to the local celery process over AMQP to ensure " - "it's responsive; this command is only intended to run in an environment " - "where celeryd is running") - - # - # Just because celery is _running_ doesn't mean it's _working_; it's - # imperative that celery workers are _actually_ handling AMQP messages on - # their appropriate queues for awx to function. Unfortunately, we've been - # plagued by a variety of bugs in celery that cause it to hang and become - # an unresponsive zombie, such as: - # - # https://github.com/celery/celery/issues/4185 - # https://github.com/celery/celery/issues/4457 - # - # The goal of this code is periodically send a broadcast AMQP message to - # the celery process on the local host via celery.app.control.ping; - # If that _fails_, we attempt to determine the pid of the celery process - # and send SIGHUP (which tends to resolve these sorts of issues for us). 
- # - - INTERVAL = 60 - - def _log(self, msg): - sys.stderr.write(datetime.datetime.utcnow().isoformat()) - sys.stderr.write(' ') - sys.stderr.write(msg) - sys.stderr.write('\n') - - def handle(self, **options): - app = Celery('awx') - app.config_from_object('django.conf:settings') - while True: - try: - pongs = app.control.ping(['celery@{}'.format(settings.CLUSTER_HOST_ID)], timeout=30) - except Exception: - pongs = [] - if not pongs: - self._log('celery is not responsive to ping over local AMQP') - pid = self.getpid() - if pid: - self._log('sending SIGHUP to {}'.format(pid)) - os.kill(pid, signal.SIGHUP) - time.sleep(self.INTERVAL) - - def getpid(self): - cmd = 'supervisorctl pid tower-processes:awx-celeryd' - if os.path.exists('/supervisor_task.conf'): - cmd = 'supervisorctl -c /supervisor_task.conf pid tower-processes:celery' - try: - return int(subprocess.check_output(cmd, shell=True)) - except Exception: - self._log('could not detect celery pid') diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index e8634bb22ad8..88289963f124 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -32,7 +32,7 @@ def validate_queuename(v): - # celery and kombu don't play nice with unicode in queue names + # kombu doesn't play nice with unicode in queue names if v: try: '{}'.format(v.decode('utf-8')) diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 864655f60a7a..082fa8951d54 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -27,9 +27,6 @@ # Django-Polymorphic from polymorphic.models import PolymorphicModel -# Django-Celery -from djcelery.models import TaskMeta - # AWX from awx.main.models.base import * # noqa from awx.main.models.mixins import ResourceMixin, TaskManagerUnifiedJobMixin @@ -1112,14 +1109,6 @@ def workflow_node_id(self): pass return None - @property - def celery_task(self): - try: - if self.celery_task_id: - return TaskMeta.objects.get(task_id=self.celery_task_id) - except TaskMeta.DoesNotExist: - pass - def get_passwords_needed_to_start(self): return [] @@ -1224,29 +1213,6 @@ def pre_start(self, **kwargs): return (True, opts) - def start_celery_task(self, opts, error_callback, success_callback, queue): - kwargs = { - 'link_error': error_callback, - 'link': success_callback, - 'queue': None, - 'task_id': None, - } - if not self.celery_task_id: - raise RuntimeError("Expected celery_task_id to be set on model.") - kwargs['task_id'] = self.celery_task_id - task_class = self._get_task_class() - kwargs['queue'] = queue - task_class().apply_async([self.pk], opts, **kwargs) - - def start(self, error_callback, success_callback, **kwargs): - ''' - Start the task running via Celery. - ''' - (res, opts) = self.pre_start(**kwargs) - if res: - self.start_celery_task(opts, error_callback, success_callback) - return res - def signal_start(self, **kwargs): """Notify the task runner system to begin work on this task.""" @@ -1286,42 +1252,6 @@ def signal_start(self, **kwargs): def can_cancel(self): return bool(self.status in CAN_CANCEL) - def _force_cancel(self): - # Update the status to 'canceled' if we can detect that the job - # really isn't running (i.e. celery has crashed or forcefully - # killed the worker). 
- task_statuses = ('STARTED', 'SUCCESS', 'FAILED', 'RETRY', 'REVOKED') - try: - taskmeta = self.celery_task - if not taskmeta or taskmeta.status not in task_statuses: - return - from celery import current_app - i = current_app.control.inspect() - for v in (i.active() or {}).values(): - if taskmeta.task_id in [x['id'] for x in v]: - return - for v in (i.reserved() or {}).values(): - if taskmeta.task_id in [x['id'] for x in v]: - return - for v in (i.revoked() or {}).values(): - if taskmeta.task_id in [x['id'] for x in v]: - return - for v in (i.scheduled() or {}).values(): - if taskmeta.task_id in [x['id'] for x in v]: - return - instance = self.__class__.objects.get(pk=self.pk) - if instance.can_cancel: - instance.status = 'canceled' - update_fields = ['status'] - if not instance.job_explanation: - instance.job_explanation = 'Forced cancel' - update_fields.append('job_explanation') - instance.save(update_fields=update_fields) - self.websocket_emit_status("canceled") - except Exception: # FIXME: Log this exception! - if settings.DEBUG: - raise - def _build_job_explanation(self): if not self.job_explanation: return 'Previous Task Canceled: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \ @@ -1345,8 +1275,6 @@ def cancel(self, job_explanation=None, is_chain=False): cancel_fields.append('job_explanation') self.save(update_fields=cancel_fields) self.websocket_emit_status("canceled") - if settings.BROKER_URL.startswith('amqp://'): - self._force_cancel() return self.cancel_flag @property @@ -1402,7 +1330,7 @@ def awx_meta_vars(self): r['{}_user_last_name'.format(name)] = created_by.last_name return r - def get_celery_queue_name(self): + def get_queue_name(self): return self.controller_node or self.execution_node or settings.CELERY_DEFAULT_QUEUE def is_isolated(self): diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 198595424f99..d8698abada9b 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -481,9 +481,3 @@ def get_notification_friendly_name(self): @property def preferred_instance_groups(self): return [] - - ''' - A WorkflowJob is a virtual job. It doesn't result in a celery task. 
- ''' - def start_celery_task(self, opts, error_callback, success_callback, queue): - return None diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 3607e53241f7..51c807047a4b 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -2,7 +2,7 @@ # All Rights Reserved # Python -from datetime import datetime, timedelta +from datetime import timedelta import logging import uuid import json @@ -11,18 +11,13 @@ from sets import Set # Django -from django.conf import settings -from django.core.cache import cache -from django.db import transaction, connection, DatabaseError +from django.db import transaction, connection from django.utils.translation import ugettext_lazy as _ -from django.utils.timezone import now as tz_now, utc -from django.db.models import Q -from django.contrib.contenttypes.models import ContentType +from django.utils.timezone import now as tz_now # AWX from awx.main.models import ( AdHocCommand, - Instance, InstanceGroup, InventorySource, InventoryUpdate, @@ -30,21 +25,15 @@ Project, ProjectUpdate, SystemJob, - UnifiedJob, WorkflowJob, ) from awx.main.scheduler.dag_workflow import WorkflowDAG from awx.main.utils.pglock import advisory_lock from awx.main.utils import get_type_for_model from awx.main.signals import disable_activity_stream - from awx.main.scheduler.dependency_graph import DependencyGraph from awx.main.utils import decrypt_field -# Celery -from celery import Celery -from celery.app.control import Inspect - logger = logging.getLogger('awx.main.scheduler') @@ -85,79 +74,6 @@ def get_tasks(self, status_list=('pending', 'waiting', 'running')): key=lambda task: task.created) return all_tasks - ''' - Tasks that are running and SHOULD have a celery task. - { - 'execution_node': [j1, j2,...], - 'execution_node': [j3], - ... - } - ''' - def get_running_tasks(self): - execution_nodes = {} - waiting_jobs = [] - now = tz_now() - workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id - jobs = UnifiedJob.objects.filter((Q(status='running') | - Q(status='waiting', modified__lte=now - timedelta(seconds=60))) & - ~Q(polymorphic_ctype_id=workflow_ctype_id)) - for j in jobs: - if j.execution_node: - execution_nodes.setdefault(j.execution_node, []).append(j) - else: - waiting_jobs.append(j) - return (execution_nodes, waiting_jobs) - - ''' - Tasks that are currently running in celery - - Transform: - { - "celery@ec2-54-204-222-62.compute-1.amazonaws.com": [], - "celery@ec2-54-163-144-168.compute-1.amazonaws.com": [{ - ... - "id": "5238466a-f8c7-43b3-9180-5b78e9da8304", - ... - }, { - ..., - }, ...] - } - - to: - { - "ec2-54-204-222-62.compute-1.amazonaws.com": [ - "5238466a-f8c7-43b3-9180-5b78e9da8304", - "5238466a-f8c7-43b3-9180-5b78e9da8306", - ... 
- ] - } - ''' - def get_active_tasks(self): - if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): - app = Celery('awx') - app.config_from_object('django.conf:settings') - inspector = Inspect(app=app) - active_task_queues = inspector.active() - else: - logger.warn("Ignoring celery task inspector") - active_task_queues = None - - queues = None - - if active_task_queues is not None: - queues = {} - for queue in active_task_queues: - active_tasks = set() - map(lambda at: active_tasks.add(at['id']), active_task_queues[queue]) - - # celery worker name is of the form celery@myhost.com - queue_name = queue.split('@') - queue_name = queue_name[1 if len(queue_name) > 1 else 0] - queues[queue_name] = active_tasks - else: - return (None, None) - - return (active_task_queues, queues) def get_latest_project_update_tasks(self, all_sorted_tasks): project_ids = Set() @@ -256,9 +172,6 @@ def start_task(self, task, rampart_group, dependent_tasks=None, instance=None): rampart_group.name, task.log_format)) return - error_handler = handle_work_error.s(subtasks=[task_actual] + dependencies) - success_handler = handle_work_success.s(task_actual=task_actual) - task.status = 'waiting' (start_status, opts) = task.pre_start() @@ -300,11 +213,23 @@ def start_task(self, task, rampart_group, dependent_tasks=None, instance=None): def post_commit(): task.websocket_emit_status(task.status) - if task.status != 'failed': - task.start_celery_task(opts, - error_callback=error_handler, - success_callback=success_handler, - queue=task.get_celery_queue_name()) + if task.status != 'failed' and type(task) is not WorkflowJob: + task_cls = task._get_task_class() + task_cls.apply_async( + [task.pk], + opts, + queue=task.get_queue_name(), + uuid=task.celery_task_id, + callbacks=[{ + 'task': handle_work_success.name, + 'kwargs': {'task_actual': task_actual} + }], + errbacks=[{ + 'task': handle_work_error.name, + 'args': [task.celery_task_id], + 'kwargs': {'subtasks': [task_actual] + dependencies} + }], + ) connection.on_commit(post_commit) @@ -529,105 +454,6 @@ def process_pending_tasks(self, pending_tasks): if not found_acceptable_queue: logger.debug(six.text_type("{} couldn't be scheduled on graph, waiting for next cycle").format(task.log_format)) - def fail_jobs_if_not_in_celery(self, node_jobs, active_tasks, celery_task_start_time, - isolated=False): - for task in node_jobs: - if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): - if isinstance(task, WorkflowJob): - continue - if task.modified > celery_task_start_time: - continue - new_status = 'failed' - if isolated: - new_status = 'error' - task.status = new_status - task.start_args = '' # blank field to remove encrypted passwords - if isolated: - # TODO: cancel and reap artifacts of lost jobs from heartbeat - task.job_explanation += ' '.join(( - 'Task was marked as running in Tower but its ', - 'controller management daemon was not present in', - 'the job queue, so it has been marked as failed.', - 'Task may still be running, but contactability is unknown.' - )) - else: - task.job_explanation += ' '.join(( - 'Task was marked as running in Tower but was not present in', - 'the job queue, so it has been marked as failed.', - )) - try: - task.save(update_fields=['status', 'start_args', 'job_explanation']) - except DatabaseError: - logger.error("Task {} DB error in marking failed. 
Job possibly deleted.".format(task.log_format)) - continue - if hasattr(task, 'send_notification_templates'): - task.send_notification_templates('failed') - task.websocket_emit_status(new_status) - logger.error("{}Task {} has no record in celery. Marking as failed".format( - 'Isolated ' if isolated else '', task.log_format)) - - def cleanup_inconsistent_celery_tasks(self): - ''' - Rectify tower db <-> celery inconsistent view of jobs state - ''' - last_cleanup = cache.get('last_celery_task_cleanup') or datetime.min.replace(tzinfo=utc) - if (tz_now() - last_cleanup).seconds < settings.AWX_INCONSISTENT_TASK_INTERVAL: - return - - logger.debug("Failing inconsistent running jobs.") - celery_task_start_time = tz_now() - active_task_queues, active_queues = self.get_active_tasks() - cache.set('last_celery_task_cleanup', tz_now()) - - if active_queues is None: - logger.error('Failed to retrieve active tasks from celery') - return None - - ''' - Only consider failing tasks on instances for which we obtained a task - list from celery for. - ''' - running_tasks, waiting_tasks = self.get_running_tasks() - all_celery_task_ids = [] - for node, node_jobs in active_queues.iteritems(): - all_celery_task_ids.extend(node_jobs) - - self.fail_jobs_if_not_in_celery(waiting_tasks, all_celery_task_ids, celery_task_start_time) - - for node, node_jobs in running_tasks.iteritems(): - isolated = False - if node in active_queues: - active_tasks = active_queues[node] - else: - ''' - Node task list not found in celery. We may branch into cases: - - instance is unknown to tower, system is improperly configured - - instance is reported as down, then fail all jobs on the node - - instance is an isolated node, then check running tasks - among all allowed controller nodes for management process - - valid healthy instance not included in celery task list - probably a netsplit case, leave it alone - ''' - instance = Instance.objects.filter(hostname=node).first() - - if instance is None: - logger.error("Execution node Instance {} not found in database. " - "The node is currently executing jobs {}".format( - node, [j.log_format for j in node_jobs])) - active_tasks = [] - elif instance.capacity == 0: - active_tasks = [] - elif instance.rampart_groups.filter(controller__isnull=False).exists(): - active_tasks = all_celery_task_ids - isolated = True - else: - continue - - self.fail_jobs_if_not_in_celery( - node_jobs, active_tasks, celery_task_start_time, - isolated=isolated - ) - def calculate_capacity_consumed(self, tasks): self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph) @@ -687,7 +513,6 @@ def schedule(self): return logger.debug("Starting Scheduler") - self.cleanup_inconsistent_celery_tasks() finished_wfjs = self._schedule() # Operations whose queries rely on modifications made during the atomic scheduling session diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index 194b188146e7..cef2d52c6083 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -2,30 +2,24 @@ # Python import logging -# Celery -from celery import shared_task - # AWX from awx.main.scheduler import TaskManager +from awx.main.dispatch.publish import task logger = logging.getLogger('awx.main.scheduler') -# TODO: move logic to UnifiedJob model and use bind=True feature of celery. -# Would we need the request loop then? I think so. Even if we get the in-memory -# updated model, the call to schedule() may get stale data. 
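For reference, here is a minimal sketch of the message that `start_task` now publishes in place of a celery chain. The field names are inferred from the `apply_async` call in `task_manager.py` above and from the publisher tests in `test_dispatch.py` further down, so treat the exact envelope as an assumption rather than the definitive wire format:

```python
# Hypothetical shape of the dispatcher message built by
# task_cls.apply_async([task.pk], opts, ...) in start_task above.
# Success/error hooks travel as plain dicts keyed by dotted task name,
# not as celery signature objects.
message = {
    'task': 'awx.main.tasks.RunJob',      # dotted path of the @task-decorated class
    'args': [task.pk],
    'kwargs': opts,
    'uuid': task.celery_task_id,
    'callbacks': [{'task': 'awx.main.tasks.handle_work_success',
                   'kwargs': {'task_actual': task_actual}}],
    'errbacks': [{'task': 'awx.main.tasks.handle_work_error',
                  'args': [task.celery_task_id],
                  'kwargs': {'subtasks': [task_actual] + dependencies}}],
}
# published to the AMQP queue returned by task.get_queue_name()
```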
- -@shared_task() +@task() def run_job_launch(job_id): TaskManager().schedule() -@shared_task() +@task() def run_job_complete(job_id): TaskManager().schedule() -@shared_task() +@task() def run_task_manager(): logger.debug("Running Tower task manager.") TaskManager().schedule() diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 52a3a6c4da79..640a120de704 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -13,12 +13,11 @@ import os import re import shutil +import six import stat -import sys import tempfile import time import traceback -import six import urlparse from distutils.version import LooseVersion as Version import yaml @@ -28,12 +27,6 @@ except Exception: psutil = None -# Celery -from kombu import Queue, Exchange -from kombu.common import Broadcast -from celery import Task, shared_task -from celery.signals import celeryd_init, worker_shutdown - # Django from django.conf import settings from django.db import transaction, DatabaseError, IntegrityError @@ -58,10 +51,12 @@ from awx.main.exceptions import AwxTaskError from awx.main.queue import CallbackQueueDispatcher from awx.main.expect import run, isolated_manager +from awx.main.dispatch.publish import task +from awx.main.dispatch import get_local_queuename, reaper from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, check_proot_installed, build_proot_temp_dir, get_licenser, wrap_args_with_proot, OutputEventFilter, OutputVerboseFilter, ignore_inventory_computed_fields, - ignore_inventory_group_removal, get_type_for_model, extract_ansible_vars) + ignore_inventory_group_removal, extract_ansible_vars) from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja from awx.main.utils.reload import stop_local_services from awx.main.utils.pglock import advisory_lock @@ -87,91 +82,56 @@ logger = logging.getLogger('awx.main.tasks') -def log_celery_failure(self, exc, task_id, args, kwargs, einfo): - try: - if getattr(exc, 'is_awx_task_error', False): - # Error caused by user / tracked in job output - logger.warning(six.text_type("{}").format(exc)) - elif isinstance(self, BaseTask): - logger.exception(six.text_type( - '{!s} {!s} execution encountered exception.') - .format(get_type_for_model(self.model), args[0])) - else: - logger.exception(six.text_type('Task {} encountered exception.').format(self.name), exc_info=exc) - except Exception: - # It's fairly critical that this code _not_ raise exceptions on logging - # If you configure external logging in a way that _it_ fails, there's - # not a lot we can do here; sys.stderr.write is a final hail mary - _, _, tb = sys.exc_info() - traceback.print_tb(tb) - +def dispatch_startup(): + startup_logger = logging.getLogger('awx.main.tasks') + startup_logger.info("Syncing Schedules") + for sch in Schedule.objects.all(): + try: + sch.update_computed_fields() + from awx.main.signals import disable_activity_stream + with disable_activity_stream(): + sch.save() + except Exception: + logger.exception(six.text_type("Failed to rebuild schedule {}.").format(sch)) -@celeryd_init.connect -def celery_startup(conf=None, **kwargs): # - # When celeryd starts, if the instance cannot be found in the database, + # When the dispatcher starts, if the instance cannot be found in the database, # automatically register it. 
This is mostly useful for openshift-based # deployments where: # # 2 Instances come online # Instance B encounters a network blip, Instance A notices, and # deprovisions it - # Instance B's connectivity is restored, celeryd starts, and it + # Instance B's connectivity is restored, the dispatcher starts, and it # re-registers itself # # In traditional container-less deployments, instances don't get # deprovisioned when they miss their heartbeat, so this code is mostly a # no-op. # - if kwargs['instance'].hostname != 'celery@{}'.format(settings.CLUSTER_HOST_ID): - error = six.text_type('celery -n {} does not match settings.CLUSTER_HOST_ID={}').format( - instance.hostname, settings.CLUSTER_HOST_ID - ) - logger.error(error) - raise RuntimeError(error) - (changed, tower_instance) = Instance.objects.get_or_register() - if changed: - logger.info(six.text_type("Registered tower node '{}'").format(tower_instance.hostname)) + apply_cluster_membership_policies() + cluster_node_heartbeat() + if Instance.objects.me().is_controller(): + awx_isolated_heartbeat() - startup_logger = logging.getLogger('awx.main.tasks') - startup_logger.info("Syncing Schedules") - for sch in Schedule.objects.all(): - try: - sch.update_computed_fields() - from awx.main.signals import disable_activity_stream - with disable_activity_stream(): - sch.save() - except Exception: - logger.exception(six.text_type("Failed to rebuild schedule {}.").format(sch)) - - # set the queues we want to bind to dynamically at startup - queues = [] - me = Instance.objects.me() - for q in [me.hostname] + settings.AWX_CELERY_QUEUES_STATIC: - q = q.encode('utf-8') - queues.append(Queue(q, Exchange(q), routing_key=q)) - for q in settings.AWX_CELERY_BCAST_QUEUES_STATIC: - queues.append(Broadcast(q.encode('utf-8'))) - conf.CELERY_QUEUES = list(set(queues)) - # Expedite the first hearbeat run so a node comes online quickly. 
- cluster_node_heartbeat.apply([]) - - -@worker_shutdown.connect -def inform_cluster_of_shutdown(*args, **kwargs): +def inform_cluster_of_shutdown(): try: this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID) this_inst.capacity = 0 # No thank you to new jobs while shut down this_inst.save(update_fields=['capacity', 'modified']) + try: + reaper.reap(this_inst) + except Exception: + logger.exception('failed to reap jobs for {}'.format(this_inst.hostname)) logger.warning(six.text_type('Normal shutdown signal for instance {}, ' 'removed self from capacity pool.').format(this_inst.hostname)) except Exception: logger.exception('Encountered problem with normal shutdown signal.') -@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE) -def apply_cluster_membership_policies(self): +@task() +def apply_cluster_membership_policies(): started_waiting = time.time() with advisory_lock('cluster_policy_lock', wait=True): lock_time = time.time() - started_waiting @@ -280,20 +240,18 @@ def apply_cluster_membership_policies(self): logger.info('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute)) -@shared_task(exchange='tower_broadcast_all', bind=True) -def handle_setting_changes(self, setting_keys): +@task(queue='tower_broadcast_all', exchange_type='fanout') +def handle_setting_changes(setting_keys): orig_len = len(setting_keys) for i in range(orig_len): for dependent_key in settings_registry.get_dependent_settings(setting_keys[i]): setting_keys.append(dependent_key) - logger.warn('Processing cache changes, task args: {0.args!r} kwargs: {0.kwargs!r}'.format( - self.request)) cache_keys = set(setting_keys) logger.debug('cache delete_many(%r)', cache_keys) cache.delete_many(cache_keys) -@shared_task(queue=settings.CELERY_DEFAULT_QUEUE) +@task() def send_notifications(notification_list, job_id=None): if not isinstance(notification_list, list): raise TypeError("notification_list should be of type list") @@ -322,8 +280,8 @@ def send_notifications(notification_list, job_id=None): logger.exception(six.text_type('Error saving notification {} result.').format(notification.id)) -@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE) -def run_administrative_checks(self): +@task() +def run_administrative_checks(): logger.warn("Running administrative checks.") if not settings.TOWER_ADMIN_ALERTS: return @@ -344,8 +302,8 @@ def run_administrative_checks(self): fail_silently=True) -@shared_task(bind=True) -def purge_old_stdout_files(self): +@task(queue=get_local_queuename) +def purge_old_stdout_files(): nowtime = time.time() for f in os.listdir(settings.JOBOUTPUT_ROOT): if os.path.getctime(os.path.join(settings.JOBOUTPUT_ROOT,f)) < nowtime - settings.LOCAL_STDOUT_EXPIRE_TIME: @@ -353,8 +311,8 @@ def purge_old_stdout_files(self): logger.info(six.text_type("Removing {}").format(os.path.join(settings.JOBOUTPUT_ROOT,f))) -@shared_task(bind=True) -def cluster_node_heartbeat(self): +@task(queue=get_local_queuename) +def cluster_node_heartbeat(): logger.debug("Cluster node heartbeat task.") nowtime = now() instance_list = list(Instance.objects.all_non_isolated()) @@ -397,9 +355,13 @@ def cluster_node_heartbeat(self): this_inst.version)) # Shutdown signal will set the capacity to zero to ensure no Jobs get added to this instance. # The heartbeat task will reset the capacity to the system capacity after upgrade. 
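As a quick illustration of the fanout behaviour used by `handle_setting_changes` above (a sketch only; the return value shape follows the `apply_async` tests in `test_dispatch.py`, and the broker mechanics are an assumption):

```python
# Publishing the broadcast task declared with
# @task(queue='tower_broadcast_all', exchange_type='fanout').
# Because the exchange is a fanout, every dispatcher node bound to
# 'tower_broadcast_all' receives its own copy of the message and
# flushes its local memcached entries for the changed settings.
message, queue = handle_setting_changes.apply_async([['AWX_TASK_ENV']])
assert queue == 'tower_broadcast_all'
assert message['args'] == [['AWX_TASK_ENV']]
```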
- stop_local_services(['uwsgi', 'celery', 'beat', 'callback'], communicate=False) + stop_local_services(communicate=False) raise RuntimeError("Shutting down.") for other_inst in lost_instances: + try: + reaper.reap(other_inst) + except Exception: + logger.exception('failed to reap jobs for {}'.format(other_inst.hostname)) try: # Capacity could already be 0 because: # * It's a new node and it never had a heartbeat @@ -424,8 +386,8 @@ def cluster_node_heartbeat(self): logger.exception(six.text_type('Error marking {} as lost').format(other_inst.hostname)) -@shared_task(bind=True) -def awx_isolated_heartbeat(self): +@task(queue=get_local_queuename) +def awx_isolated_heartbeat(): local_hostname = settings.CLUSTER_HOST_ID logger.debug("Controlling node checking for any isolated management tasks.") poll_interval = settings.AWX_ISOLATED_PERIODIC_CHECK @@ -452,8 +414,8 @@ def awx_isolated_heartbeat(self): isolated_manager.IsolatedManager.health_check(isolated_instance_qs, awx_application_version) -@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE) -def awx_periodic_scheduler(self): +@task() +def awx_periodic_scheduler(): run_now = now() state = TowerScheduleState.get_solo() last_run = state.schedule_last_run @@ -503,8 +465,8 @@ def awx_periodic_scheduler(self): state.save() -@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE) -def handle_work_success(self, result, task_actual): +@task() +def handle_work_success(task_actual): try: instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id']) except ObjectDoesNotExist: @@ -517,7 +479,7 @@ def handle_work_success(self, result, task_actual): run_job_complete.delay(instance.id) -@shared_task(queue=settings.CELERY_DEFAULT_QUEUE) +@task() def handle_work_error(task_id, *args, **kwargs): subtasks = kwargs.get('subtasks', None) logger.debug('Executing error task id %s, subtasks: %s' % (task_id, str(subtasks))) @@ -558,7 +520,7 @@ def handle_work_error(task_id, *args, **kwargs): pass -@shared_task(queue=settings.CELERY_DEFAULT_QUEUE) +@task() def update_inventory_computed_fields(inventory_id, should_update_hosts=True): ''' Signal handler and wrapper around inventory.update_computed_fields to @@ -578,7 +540,7 @@ def update_inventory_computed_fields(inventory_id, should_update_hosts=True): raise -@shared_task(queue=settings.CELERY_DEFAULT_QUEUE) +@task() def update_host_smart_inventory_memberships(): try: with transaction.atomic(): @@ -603,8 +565,8 @@ def update_host_smart_inventory_memberships(): smart_inventory.update_computed_fields(update_groups=False, update_hosts=False) -@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE, max_retries=5) -def delete_inventory(self, inventory_id, user_id): +@task() +def delete_inventory(inventory_id, user_id): # Delete inventory as user if user_id is None: user = None @@ -629,7 +591,7 @@ def delete_inventory(self, inventory_id, user_id): return except DatabaseError: logger.exception('Database error deleting inventory {}, but will retry.'.format(inventory_id)) - self.retry(countdown=10) + # TODO: self.retry(countdown=10) def with_path_cleanup(f): @@ -650,8 +612,7 @@ def _wrapped(self, *args, **kwargs): return _wrapped -class BaseTask(Task): - name = None +class BaseTask(object): model = None event_model = None abstract = True @@ -945,14 +906,11 @@ def run(self, pk, **kwargs): if instance.cancel_flag: instance = self.update_model(instance.pk, status='canceled') if instance.status != 'running': - if hasattr(settings, 'CELERY_UNIT_TEST'): - return - else: - # Stop the task chain 
and prevent starting the job if it has - # already been canceled. - instance = self.update_model(pk) - status = instance.status - raise RuntimeError('not starting %s task' % instance.status) + # Stop the task chain and prevent starting the job if it has + # already been canceled. + instance = self.update_model(pk) + status = instance.status + raise RuntimeError('not starting %s task' % instance.status) if not os.path.exists(settings.AWX_PROOT_BASE_PATH): raise RuntimeError('AWX_PROOT_BASE_PATH=%s does not exist' % settings.AWX_PROOT_BASE_PATH) @@ -1085,8 +1043,6 @@ def run(self, pk, **kwargs): logger.exception(six.text_type('{} Final run hook errored.').format(instance.log_format)) instance.websocket_emit_status(status) if status != 'successful': - # Raising an exception will mark the job as 'failed' in celery - # and will stop a task chain from continuing to execute if status == 'canceled': raise AwxTaskError.TaskCancel(instance, rc) else: @@ -1109,12 +1065,12 @@ def get_ssh_key_path(self, instance, **kwargs): return '' +@task() class RunJob(BaseTask): ''' - Celery task to run a job using ansible-playbook. + Run a job using ansible-playbook. ''' - name = 'awx.main.tasks.run_job' model = Job event_model = JobEvent event_data_key = 'job_id' @@ -1404,7 +1360,6 @@ def pre_run_hook(self, job, **kwargs): self.update_model(job.pk, status='failed', job_explanation=error) raise RuntimeError(error) if job.project and job.project.scm_type: - job_request_id = '' if self.request.id is None else self.request.id pu_ig = job.instance_group pu_en = job.execution_node if job.is_isolated() is True: @@ -1417,16 +1372,14 @@ def pre_run_hook(self, job, **kwargs): status='running', instance_group = pu_ig, execution_node=pu_en, - celery_task_id=job_request_id)) + celery_task_id=job.celery_task_id)) # save the associated job before calling run() so that a # cancel() call on the job can cancel the project update job = self.update_model(job.pk, project_update=local_project_sync) project_update_task = local_project_sync._get_task_class() try: - task_instance = project_update_task() - task_instance.request.id = job_request_id - task_instance.run(local_project_sync.id) + project_update_task().run(local_project_sync.id) job = self.update_model(job.pk, scm_revision=job.project.scm_revision) except Exception: local_project_sync.refresh_from_db() @@ -1436,7 +1389,6 @@ def pre_run_hook(self, job, **kwargs): ('project_update', local_project_sync.name, local_project_sync.id))) raise - def final_run_hook(self, job, status, **kwargs): super(RunJob, self).final_run_hook(job, status, **kwargs) if job.use_fact_cache: @@ -1467,9 +1419,9 @@ def final_run_hook(self, job, status, **kwargs): update_inventory_computed_fields.delay(inventory.id, True) +@task() class RunProjectUpdate(BaseTask): - name = 'awx.main.tasks.run_project_update' model = ProjectUpdate event_model = ProjectUpdateEvent event_data_key = 'project_update_id' @@ -1670,7 +1622,6 @@ def get_idle_timeout(self): return getattr(settings, 'PROJECT_UPDATE_IDLE_TIMEOUT', None) def _update_dependent_inventories(self, project_update, dependent_inventory_sources): - project_request_id = '' if self.request.id is None else self.request.id scm_revision = project_update.project.scm_revision inv_update_class = InventoryUpdate._get_task_class() for inv_src in dependent_inventory_sources: @@ -1693,13 +1644,10 @@ def _update_dependent_inventories(self, project_update, dependent_inventory_sour status='running', instance_group=project_update.instance_group, 
execution_node=project_update.execution_node, - celery_task_id=str(project_request_id), - source_project_update=project_update)) + source_project_update=project_update, + celery_task_id=project_update.celery_task_id)) try: - task_instance = inv_update_class() - # Runs in the same Celery task as project update - task_instance.request.id = project_request_id - task_instance.run(local_inv_update.id) + inv_update_class().run(local_inv_update.id) except Exception: logger.exception(six.text_type('{} Unhandled exception updating dependent SCM inventory sources.') .format(project_update.log_format)) @@ -1804,9 +1752,9 @@ def should_use_proot(self, instance, **kwargs): return getattr(settings, 'AWX_PROOT_ENABLED', False) +@task() class RunInventoryUpdate(BaseTask): - name = 'awx.main.tasks.run_inventory_update' model = InventoryUpdate event_model = InventoryUpdateEvent event_data_key = 'inventory_update_id' @@ -2024,8 +1972,7 @@ def build_passwords(self, inventory_update, **kwargs): This dictionary is used by `build_env`, below. """ # Run the superclass implementation. - super_ = super(RunInventoryUpdate, self).build_passwords - passwords = super_(inventory_update, **kwargs) + passwords = super(RunInventoryUpdate, self).build_passwords(inventory_update, **kwargs) # Take key fields from the credential in use and add them to the # passwords dictionary. @@ -2188,7 +2135,6 @@ def pre_run_hook(self, inventory_update, **kwargs): if inventory_update.inventory_source: source_project = inventory_update.inventory_source.source_project if (inventory_update.source=='scm' and inventory_update.launch_type!='scm' and source_project): - request_id = '' if self.request.id is None else self.request.id local_project_sync = source_project.create_project_update( _eager_fields=dict( launch_type="sync", @@ -2196,16 +2142,14 @@ def pre_run_hook(self, inventory_update, **kwargs): status='running', execution_node=inventory_update.execution_node, instance_group = inventory_update.instance_group, - celery_task_id=request_id)) + celery_task_id=inventory_update.celery_task_id)) # associate the inventory update before calling run() so that a # cancel() call on the inventory update can cancel the project update local_project_sync.scm_inventory_updates.add(inventory_update) project_update_task = local_project_sync._get_task_class() try: - task_instance = project_update_task() - task_instance.request.id = request_id - task_instance.run(local_project_sync.id) + project_update_task().run(local_project_sync.id) inventory_update.inventory_source.scm_last_revision = local_project_sync.project.scm_revision inventory_update.inventory_source.save(update_fields=['scm_last_revision']) except Exception: @@ -2216,12 +2160,12 @@ def pre_run_hook(self, inventory_update, **kwargs): raise +@task() class RunAdHocCommand(BaseTask): ''' - Celery task to run an ad hoc command using ansible. + Run an ad hoc command using ansible. 
''' - name = 'awx.main.tasks.run_ad_hoc_command' model = AdHocCommand event_model = AdHocCommandEvent event_data_key = 'ad_hoc_command_id' @@ -2382,9 +2326,9 @@ def should_use_proot(self, instance, **kwargs): return getattr(settings, 'AWX_PROOT_ENABLED', False) +@task() class RunSystemJob(BaseTask): - name = 'awx.main.tasks.run_system_job' model = SystemJob event_model = SystemJobEvent event_data_key = 'system_job_id' @@ -2439,9 +2383,9 @@ def _reconstruct_relationships(copy_mapping): new_obj.save() -@shared_task(bind=True, queue=settings.CELERY_DEFAULT_QUEUE) +@task() def deep_copy_model_obj( - self, model_module, model_name, obj_pk, new_obj_pk, + model_module, model_name, obj_pk, new_obj_pk, user_pk, sub_obj_list, permission_check_func=None ): logger.info(six.text_type('Deep copy {} from {} to {}.').format(model_name, obj_pk, new_obj_pk)) diff --git a/awx/main/tests/functional/conftest.py b/awx/main/tests/functional/conftest.py index de25bf1a7f58..30adb3cb6686 100644 --- a/awx/main/tests/functional/conftest.py +++ b/awx/main/tests/functional/conftest.py @@ -14,7 +14,6 @@ from django.utils.six.moves.urllib.parse import urlparse from django.utils import timezone from django.contrib.auth.models import User -from django.conf import settings from django.core.serializers.json import DjangoJSONEncoder from django.db.backends.sqlite3.base import SQLiteCursorWrapper from jsonbfield.fields import JSONField @@ -66,17 +65,6 @@ def swagger_autogen(requests=__SWAGGER_REQUESTS__): return requests -@pytest.fixture(scope="session", autouse=True) -def celery_memory_broker(): - ''' - FIXME: Not sure how "far" just setting the BROKER_URL will get us. - We may need to incluence CELERY's configuration like we do in the old unit tests (see base.py) - - Allows django signal code to execute without the need for redis - ''' - settings.BROKER_URL='memory://localhost/' - - @pytest.fixture def user(): def u(name, is_superuser=False): diff --git a/awx/main/tests/functional/models/test_unified_job.py b/awx/main/tests/functional/models/test_unified_job.py index f85cc4fe5d79..4242b96ee708 100644 --- a/awx/main/tests/functional/models/test_unified_job.py +++ b/awx/main/tests/functional/models/test_unified_job.py @@ -1,13 +1,11 @@ import itertools import pytest -import mock # Django from django.contrib.contenttypes.models import ContentType # AWX from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, Project, WorkflowJob, Schedule -from awx.main.models.ha import InstanceGroup @pytest.mark.django_db @@ -66,48 +64,6 @@ def test_job_relaunch_copy_vars(self, machine_credential, inventory, assert net_credential in second_job.credentials.all() -@pytest.mark.django_db -class TestIsolatedRuns: - - def test_low_capacity_isolated_instance_selected(self): - ig = InstanceGroup.objects.create(name='tower') - iso_ig = InstanceGroup.objects.create(name='thepentagon', controller=ig) - iso_ig.instances.create(hostname='iso1', capacity=50) - i2 = iso_ig.instances.create(hostname='iso2', capacity=200) - job = Job.objects.create( - instance_group=iso_ig, - celery_task_id='something', - ) - - mock_async = mock.MagicMock() - success_callback = mock.MagicMock() - error_callback = mock.MagicMock() - - class MockTaskClass: - apply_async = mock_async - - with mock.patch.object(job, '_get_task_class') as task_class: - task_class.return_value = MockTaskClass - job.start_celery_task([], error_callback, success_callback, 'thepentagon') - mock_async.assert_called_with([job.id], [], - link_error=error_callback, - 
link=success_callback, - queue='thepentagon', - task_id='something') - - i2.capacity = 20 - i2.save() - - with mock.patch.object(job, '_get_task_class') as task_class: - task_class.return_value = MockTaskClass - job.start_celery_task([], error_callback, success_callback, 'thepentagon') - mock_async.assert_called_with([job.id], [], - link_error=error_callback, - link=success_callback, - queue='thepentagon', - task_id='something') - - @pytest.mark.django_db class TestMetaVars: ''' diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py index 813adff7cfbe..55b64e3a38bf 100644 --- a/awx/main/tests/functional/task_management/test_scheduler.py +++ b/awx/main/tests/functional/task_management/test_scheduler.py @@ -1,19 +1,10 @@ import pytest import mock import json -from datetime import timedelta, datetime - -from django.core.cache import cache -from django.utils.timezone import now as tz_now +from datetime import timedelta from awx.main.scheduler import TaskManager from awx.main.utils import encrypt_field -from awx.main.models import ( - Job, - Instance, - WorkflowJob, -) -from awx.main.models.notifications import JobNotificationMixin @pytest.mark.django_db @@ -245,140 +236,3 @@ def test_shared_dependencies_launch(default_instance_group, job_template_factory iu = [x for x in ii.inventory_updates.all()] assert len(pu) == 1 assert len(iu) == 1 - - -@pytest.mark.django_db -def test_cleanup_interval(mock_cache): - with mock.patch.multiple('awx.main.scheduler.task_manager.cache', get=mock_cache.get, set=mock_cache.set): - assert mock_cache.get('last_celery_task_cleanup') is None - - TaskManager().cleanup_inconsistent_celery_tasks() - last_cleanup = mock_cache.get('last_celery_task_cleanup') - assert isinstance(last_cleanup, datetime) - - TaskManager().cleanup_inconsistent_celery_tasks() - assert cache.get('last_celery_task_cleanup') == last_cleanup - - -class TestReaper(): - @pytest.fixture - def all_jobs(self, mocker): - now = tz_now() - - Instance.objects.create(hostname='host1', capacity=100) - Instance.objects.create(hostname='host2', capacity=100) - Instance.objects.create(hostname='host3_split', capacity=100) - Instance.objects.create(hostname='host4_offline', capacity=0) - - j1 = Job.objects.create(status='pending', execution_node='host1') - j2 = Job.objects.create(status='waiting', celery_task_id='considered_j2') - j3 = Job.objects.create(status='waiting', celery_task_id='considered_j3') - j3.modified = now - timedelta(seconds=60) - j3.save(update_fields=['modified']) - j4 = Job.objects.create(status='running', celery_task_id='considered_j4', execution_node='host1') - j5 = Job.objects.create(status='waiting', celery_task_id='reapable_j5') - j5.modified = now - timedelta(seconds=60) - j5.save(update_fields=['modified']) - j6 = Job.objects.create(status='waiting', celery_task_id='considered_j6') - j6.modified = now - timedelta(seconds=60) - j6.save(update_fields=['modified']) - j7 = Job.objects.create(status='running', celery_task_id='considered_j7', execution_node='host2') - j8 = Job.objects.create(status='running', celery_task_id='reapable_j7', execution_node='host2') - j9 = Job.objects.create(status='waiting', celery_task_id='reapable_j8') - j9.modified = now - timedelta(seconds=60) - j9.save(update_fields=['modified']) - j10 = Job.objects.create(status='running', celery_task_id='host3_j10', execution_node='host3_split') - - j11 = Job.objects.create(status='running', celery_task_id='host4_j11', 
execution_node='host4_offline') - - j12 = WorkflowJob.objects.create(status='running', celery_task_id='workflow_job', execution_node='host1') - - js = [j1, j2, j3, j4, j5, j6, j7, j8, j9, j10, j11, j12] - for j in js: - j.save = mocker.Mock(wraps=j.save) - j.websocket_emit_status = mocker.Mock() - return js - - @pytest.fixture - def considered_jobs(self, all_jobs): - return all_jobs[2:7] + [all_jobs[10]] - - @pytest.fixture - def running_tasks(self, all_jobs): - return { - 'host1': [all_jobs[3]], - 'host2': [all_jobs[7], all_jobs[8]], - 'host3_split': [all_jobs[9]], - 'host4_offline': [all_jobs[10]], - } - - @pytest.fixture - def waiting_tasks(self, all_jobs): - return [all_jobs[2], all_jobs[4], all_jobs[5], all_jobs[8]] - - @pytest.fixture - def reapable_jobs(self, all_jobs): - return [all_jobs[4], all_jobs[7], all_jobs[10]] - - @pytest.fixture - def unconsidered_jobs(self, all_jobs): - return all_jobs[0:1] + all_jobs[5:7] - - @pytest.fixture - def active_tasks(self): - return ([], { - 'host1': ['considered_j2', 'considered_j3', 'considered_j4',], - 'host2': ['considered_j6', 'considered_j7'], - }) - - @pytest.mark.django_db - @mock.patch.object(JobNotificationMixin, 'send_notification_templates') - @mock.patch.object(TaskManager, 'get_active_tasks', lambda self: ([], [])) - def test_cleanup_inconsistent_task(self, notify, active_tasks, considered_jobs, reapable_jobs, running_tasks, waiting_tasks, mocker, settings): - settings.AWX_INCONSISTENT_TASK_INTERVAL = 0 - tm = TaskManager() - - tm.get_running_tasks = mocker.Mock(return_value=(running_tasks, waiting_tasks)) - tm.get_active_tasks = mocker.Mock(return_value=active_tasks) - - tm.cleanup_inconsistent_celery_tasks() - - for j in considered_jobs: - if j not in reapable_jobs: - j.save.assert_not_called() - - assert notify.call_count == 4 - notify.assert_has_calls([mock.call('failed') for j in reapable_jobs], any_order=True) - - for j in reapable_jobs: - j.websocket_emit_status.assert_called_once_with('failed') - assert j.status == 'failed' - assert j.job_explanation == ( - 'Task was marked as running in Tower but was not present in the job queue, so it has been marked as failed.' 
- ) - - @pytest.mark.django_db - def test_get_running_tasks(self, all_jobs): - tm = TaskManager() - - # Ensure the query grabs the expected jobs - execution_nodes_jobs, waiting_jobs = tm.get_running_tasks() - assert 'host1' in execution_nodes_jobs - assert 'host2' in execution_nodes_jobs - assert 'host3_split' in execution_nodes_jobs - - assert all_jobs[3] in execution_nodes_jobs['host1'] - - assert all_jobs[6] in execution_nodes_jobs['host2'] - assert all_jobs[7] in execution_nodes_jobs['host2'] - - assert all_jobs[9] in execution_nodes_jobs['host3_split'] - - assert all_jobs[10] in execution_nodes_jobs['host4_offline'] - - assert all_jobs[11] not in execution_nodes_jobs['host1'] - - assert all_jobs[2] in waiting_jobs - assert all_jobs[4] in waiting_jobs - assert all_jobs[5] in waiting_jobs - assert all_jobs[8] in waiting_jobs diff --git a/awx/main/tests/functional/test_dispatch.py b/awx/main/tests/functional/test_dispatch.py index 0f578a66e598..349d997e8f89 100644 --- a/awx/main/tests/functional/test_dispatch.py +++ b/awx/main/tests/functional/test_dispatch.py @@ -1,12 +1,39 @@ +import datetime import multiprocessing import random -import sys -from uuid import uuid4 +import signal +import time +from django.utils.timezone import now as tz_now import pytest -from awx.main.dispatch.worker import BaseWorker -from awx.main.dispatch.pool import WorkerPool +from awx.main.models import Job, WorkflowJob, Instance +from awx.main.dispatch import reaper +from awx.main.dispatch.pool import PoolWorker, WorkerPool, AutoscalePool +from awx.main.dispatch.publish import task +from awx.main.dispatch.worker import BaseWorker, TaskWorker + + +@task() +def add(a, b): + return a + b + + +class BaseTask(object): + + def add(self, a, b): + return add(a, b) + + +@task() +class Adder(BaseTask): + def run(self, a, b): + return super(Adder, self).add(a, b) + + +@task(queue='hard-math') +def multiply(a, b): + return a * b class SimpleWorker(BaseWorker): @@ -21,6 +48,61 @@ def perform_work(self, body, result_queue): result_queue.put(body + '!!!') +class SlowResultWriter(BaseWorker): + + def perform_work(self, body, result_queue): + time.sleep(3) + super(SlowResultWriter, self).perform_work(body, result_queue) + + +class TestPoolWorker: + + def setup_method(self, test_method): + self.worker = PoolWorker(1000, self.tick, tuple()) + + def tick(self): + self.worker.finished.put(self.worker.queue.get()['uuid']) + time.sleep(.5) + + def test_qsize(self): + assert self.worker.qsize == 0 + for i in range(3): + self.worker.put({'task': 'abc123'}) + assert self.worker.qsize == 3 + + def test_put(self): + assert len(self.worker.managed_tasks) == 0 + assert self.worker.messages_finished == 0 + self.worker.put({'task': 'abc123'}) + + assert len(self.worker.managed_tasks) == 1 + assert self.worker.messages_sent == 1 + + def test_managed_tasks(self): + self.worker.put({'task': 'abc123'}) + self.worker.calculate_managed_tasks() + assert len(self.worker.managed_tasks) == 1 + + self.tick() + self.worker.calculate_managed_tasks() + assert len(self.worker.managed_tasks) == 0 + + def test_current_task(self): + self.worker.put({'task': 'abc123'}) + assert self.worker.current_task['task'] == 'abc123' + + def test_quit(self): + self.worker.quit() + assert self.worker.queue.get() == 'QUIT' + + def test_idle_busy(self): + assert self.worker.idle is True + assert self.worker.busy is False + self.worker.put({'task': 'abc123'}) + assert self.worker.busy is True + assert self.worker.idle is False + + @pytest.mark.django_db class TestWorkerPool: @@ 
-28,37 +110,35 @@ def setup_method(self, test_method): self.pool = WorkerPool(min_workers=3) def teardown_method(self, test_method): - self.pool.stop() + self.pool.stop(signal.SIGTERM) def test_worker(self): self.pool.init_workers(SimpleWorker().work_loop) assert len(self.pool) == 3 for worker in self.pool.workers: - total, _, process = worker - assert total == 0 - assert process.is_alive() is True + assert worker.messages_sent == 0 + assert worker.alive is True def test_single_task(self): self.pool.init_workers(SimpleWorker().work_loop) self.pool.write(0, 'xyz') - assert self.pool.workers[0][0] == 1 # worker at index 0 handled one task - assert self.pool.workers[1][0] == 0 - assert self.pool.workers[2][0] == 0 + assert self.pool.workers[0].messages_sent == 1 # worker at index 0 handled one task + assert self.pool.workers[1].messages_sent == 0 + assert self.pool.workers[2].messages_sent == 0 def test_queue_preference(self): self.pool.init_workers(SimpleWorker().work_loop) self.pool.write(2, 'xyz') - assert self.pool.workers[0][0] == 0 - assert self.pool.workers[1][0] == 0 - assert self.pool.workers[2][0] == 1 # worker at index 2 handled one task + assert self.pool.workers[0].messages_sent == 0 + assert self.pool.workers[1].messages_sent == 0 + assert self.pool.workers[2].messages_sent == 1 # worker at index 2 handled one task def test_worker_processing(self): result_queue = multiprocessing.Queue() self.pool.init_workers(ResultWriter().work_loop, result_queue) - uuids = [] for i in range(10): self.pool.write( - random.choice(self.pool.workers)[0], + random.choice(range(len(self.pool))), 'Hello, Worker {}'.format(i) ) all_messages = [result_queue.get(timeout=1) for i in range(10)] @@ -68,5 +148,212 @@ def test_worker_processing(self): for i in range(10) ] - total_handled = sum([worker[0] for worker in self.pool.workers]) + total_handled = sum([worker.messages_sent for worker in self.pool.workers]) assert total_handled == 10 + + +@pytest.mark.django_db +class TestAutoScaling: + + def setup_method(self, test_method): + self.pool = AutoscalePool(min_workers=2, max_workers=10) + + def teardown_method(self, test_method): + self.pool.stop(signal.SIGTERM) + + def test_scale_up(self): + result_queue = multiprocessing.Queue() + self.pool.init_workers(SlowResultWriter().work_loop, result_queue) + + # start with two workers, write an event to each worker and make it busy + assert len(self.pool) == 2 + for i, w in enumerate(self.pool.workers): + w.put('Hello, Worker {}'.format(0)) + assert len(self.pool) == 2 + + # wait for the subprocesses to start working on their tasks and be marked busy + time.sleep(1) + assert self.pool.should_grow + + # write a third message, expect a new worker to spawn because all + # workers are busy + self.pool.write(0, 'Hello, Worker {}'.format(2)) + assert len(self.pool) == 3 + + def test_scale_down(self): + self.pool.init_workers(ResultWriter().work_loop, multiprocessing.Queue()) + + # start with two workers, and scale up to 10 workers + assert len(self.pool) == 2 + for i in range(8): + self.pool.up() + assert len(self.pool) == 10 + + # cleanup should scale down to 8 workers + self.pool.cleanup() + assert len(self.pool) == 2 + + def test_max_scale_up(self): + self.pool.init_workers(ResultWriter().work_loop, multiprocessing.Queue()) + + assert len(self.pool) == 2 + for i in range(25): + self.pool.up() + assert self.pool.max_workers == 10 + assert self.pool.full is True + assert len(self.pool) == 10 + + def test_equal_worker_distribution(self): + # if all workers are busy, 
spawn new workers *before* adding messages + # to an existing queue + self.pool.init_workers(SlowResultWriter().work_loop, multiprocessing.Queue) + + # start with two workers, write an event to each worker and make it busy + assert len(self.pool) == 2 + for i in range(10): + self.pool.write(0, 'Hello, World!') + assert len(self.pool) == 10 + for w in self.pool.workers: + assert w.busy + assert len(w.managed_tasks) == 1 + + # the queue is full at 10, the _next_ write should put the message into + # a worker's backlog + assert len(self.pool) == 10 + for w in self.pool.workers: + assert w.messages_sent == 1 + self.pool.write(0, 'Hello, World!') + assert len(self.pool) == 10 + assert self.pool.workers[0].messages_sent == 2 + + def test_lost_worker_autoscale(self): + # if a worker exits, it should be replaced automatically up to min_workers + self.pool.init_workers(ResultWriter().work_loop, multiprocessing.Queue()) + + # start with two workers, kill one of them + assert len(self.pool) == 2 + assert not self.pool.should_grow + alive_pid = self.pool.workers[1].pid + self.pool.workers[0].process.terminate() + time.sleep(1) # wait a moment for sigterm + + # clean up and the dead worker + self.pool.cleanup() + assert len(self.pool) == 1 + assert self.pool.workers[0].pid == alive_pid + + # the next queue write should replace the lost worker + self.pool.write(0, 'Hello, Worker') + assert len(self.pool) == 2 + + +class TestTaskDispatcher: + + @property + def tm(self): + return TaskWorker() + + def test_function_dispatch(self): + result = self.tm.perform_work({ + 'task': 'awx.main.tests.functional.test_dispatch.add', + 'args': [2, 2] + }) + assert result == 4 + + def test_method_dispatch(self): + result = self.tm.perform_work({ + 'task': 'awx.main.tests.functional.test_dispatch.Adder', + 'args': [2, 2] + }) + assert result == 4 + + +class TestTaskPublisher: + + def test_function_callable(self): + assert add(2, 2) == 4 + + def test_method_callable(self): + assert Adder().run(2, 2) == 4 + + def test_function_apply_async(self): + message, queue = add.apply_async([2, 2]) + assert message['args'] == [2, 2] + assert message['kwargs'] == {} + assert message['task'] == 'awx.main.tests.functional.test_dispatch.add' + assert queue == 'awx_private_queue' + + def test_method_apply_async(self): + message, queue = Adder.apply_async([2, 2]) + assert message['args'] == [2, 2] + assert message['kwargs'] == {} + assert message['task'] == 'awx.main.tests.functional.test_dispatch.Adder' + assert queue == 'awx_private_queue' + + def test_apply_with_queue(self): + message, queue = add.apply_async([2, 2], queue='abc123') + assert queue == 'abc123' + + def test_queue_defined_in_task_decorator(self): + message, queue = multiply.apply_async([2, 2]) + assert queue == 'hard-math' + + def test_queue_overridden_from_task_decorator(self): + message, queue = multiply.apply_async([2, 2], queue='not-so-hard') + assert queue == 'not-so-hard' + + def test_apply_with_callable_queuename(self): + message, queue = add.apply_async([2, 2], queue=lambda: 'called') + assert queue == 'called' + + +yesterday = tz_now() - datetime.timedelta(days=1) + + +@pytest.mark.django_db +class TestJobReaper(object): + + @pytest.mark.parametrize('status, execution_node, controller_node, modified, fail', [ + ('running', '', '', None, False), # running, not assigned to the instance + ('running', 'awx', '', None, True), # running, has the instance as its execution_node + ('running', '', 'awx', None, True), # running, has the instance as its controller_node + 
('waiting', '', '', None, False), # waiting, not assigned to the instance + ('waiting', 'awx', '', None, False), # waiting, was edited less than a minute ago + ('waiting', '', 'awx', None, False), # waiting, was edited less than a minute ago + ('waiting', 'awx', '', yesterday, True), # waiting, assigned to the execution_node, stale + ('waiting', '', 'awx', yesterday, True), # waiting, assigned to the controller_node, stale + ]) + def test_should_reap(self, status, fail, execution_node, controller_node, modified): + i = Instance(hostname='awx') + i.save() + j = Job( + status=status, + execution_node=execution_node, + controller_node=controller_node, + start_args='SENSITIVE', + ) + j.save() + if modified: + # we have to edit the modification time _without_ calling save() + # (because .save() overwrites it to _now_) + Job.objects.filter(id=j.id).update(modified=modified) + reaper.reap(i) + job = Job.objects.first() + if fail: + assert job.status == 'failed' + assert 'marked as failed' in job.job_explanation + assert job.start_args == '' + else: + assert job.status == status + + def test_workflow_does_not_reap(self): + i = Instance(hostname='awx') + i.save() + j = WorkflowJob( + status='running', + execution_node='awx' + ) + j.save() + reaper.reap(i) + + assert WorkflowJob.objects.first().status == 'running' diff --git a/awx/main/tests/unit/test_task_manager.py b/awx/main/tests/unit/test_task_manager.py deleted file mode 100644 index da3bddc5e42f..000000000000 --- a/awx/main/tests/unit/test_task_manager.py +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright (c) 2017 Ansible by Red Hat -# All Rights Reserved. - -import mock -import pytest - -from django.utils.timezone import now as tz_now -from django.db import DatabaseError - -from awx.main.scheduler import TaskManager -from awx.main.models import ( - Job, - Instance, - InstanceGroup, -) -from django.core.cache import cache - - -class TestCleanupInconsistentCeleryTasks(): - @mock.patch.object(cache, 'get', return_value=None) - @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {})) - @mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, [])) - @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[]) - @mock.patch.object(Instance.objects, 'filter', return_value=mock.MagicMock(first=lambda: None)) - @mock.patch('awx.main.scheduler.task_manager.logger') - def test_instance_does_not_exist(self, logger_mock, *args): - logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked")) - tm = TaskManager() - with pytest.raises(RuntimeError) as excinfo: - tm.cleanup_inconsistent_celery_tasks() - - assert "mocked" in str(excinfo.value) - logger_mock.error.assert_called_once_with("Execution node Instance host1 not found in database. 
" - "The node is currently executing jobs ['job 2 (new)', " - "'job 3 (new)']") - - @mock.patch.object(cache, 'get', return_value=None) - @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {'host1': []})) - @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[]) - @mock.patch.object(TaskManager, 'get_running_tasks') - @mock.patch('awx.main.scheduler.task_manager.logger') - def test_save_failed(self, logger_mock, get_running_tasks, *args): - logger_mock.error = mock.MagicMock() - job = Job(id=2, modified=tz_now(), status='running', celery_task_id='blah', execution_node='host1') - job.websocket_emit_status = mock.MagicMock() - get_running_tasks.return_value = ({'host1': [job]}, []) - tm = TaskManager() - - with mock.patch.object(job, 'save', side_effect=DatabaseError): - tm.cleanup_inconsistent_celery_tasks() - job.save.assert_called_once() - logger_mock.error.assert_called_once_with("Task job 2 (failed) DB error in marking failed. Job possibly deleted.") - - @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[]) - @mock.patch('awx.main.scheduler.task_manager.Inspect') - def test_multiple_active_instances_sanity_check(self, inspect_mock, *args): - class MockInspector: - pass - - mock_inspector = MockInspector() - mock_inspector.active = lambda: { - 'celery@host1': [], - 'celery@host2': [] - } - inspect_mock.return_value = mock_inspector - tm = TaskManager() - active_task_queues, queues = tm.get_active_tasks() - assert 'host1' in queues - assert 'host2' in queues diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 91e3c3505d05..6d86fec711fb 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -67,7 +67,7 @@ def test_work_success_callback_missing_job(): task_data = {'type': 'project_update', 'id': 9999} with mock.patch('django.db.models.query.QuerySet.get') as get_mock: get_mock.side_effect = ProjectUpdate.DoesNotExist() - assert tasks.handle_work_success(None, task_data) is None + assert tasks.handle_work_success(task_data) is None def test_send_notifications_list(mocker): diff --git a/awx/main/tests/unit/utils/test_reload.py b/awx/main/tests/unit/utils/test_reload.py index 1820f2724afb..87c2689da8b8 100644 --- a/awx/main/tests/unit/utils/test_reload.py +++ b/awx/main/tests/unit/utils/test_reload.py @@ -8,8 +8,8 @@ def test_produce_supervisor_command(mocker): mock_process.communicate = communicate_mock Popen_mock = mocker.MagicMock(return_value=mock_process) with mocker.patch.object(reload.subprocess, 'Popen', Popen_mock): - reload._supervisor_service_command(['beat', 'callback', 'fact'], "restart") + reload._supervisor_service_command("restart") reload.subprocess.Popen.assert_called_once_with( - ['supervisorctl', 'restart', 'tower-processes:receiver',], + ['supervisorctl', 'restart', 'tower-processes:*',], stderr=-1, stdin=-1, stdout=-1) diff --git a/awx/main/utils/autoscale.py b/awx/main/utils/autoscale.py deleted file mode 100644 index 49f07a251c6f..000000000000 --- a/awx/main/utils/autoscale.py +++ /dev/null @@ -1,27 +0,0 @@ -from celery.utils.log import get_logger -from celery.worker.autoscale import Autoscaler, AUTOSCALE_KEEPALIVE -from django.conf import settings -import psutil - -logger = get_logger('awx.main.tasks') - - -class DynamicAutoScaler(Autoscaler): - - def __init__(self, pool, max_concurrency, min_concurrency=0, worker=None, - keepalive=AUTOSCALE_KEEPALIVE, mutex=None): - super(DynamicAutoScaler, self).__init__(pool, max_concurrency, - 
min_concurrency, worker, - keepalive, mutex) - settings_absmem = getattr(settings, 'SYSTEM_TASK_ABS_MEM', None) - if settings_absmem is not None: - total_memory_gb = int(settings_absmem) - else: - total_memory_gb = (psutil.virtual_memory().total >> 30) + 1 # noqa: round up - - # 5 workers per GB of total memory - self.max_concurrency = min(max_concurrency, (total_memory_gb * 5)) - logger.warn('celery worker dynamic --autoscale={},{}'.format( - self.max_concurrency, - self.min_concurrency - )) diff --git a/awx/main/utils/ha.py b/awx/main/utils/ha.py deleted file mode 100644 index 65fe1190d717..000000000000 --- a/awx/main/utils/ha.py +++ /dev/null @@ -1,17 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (c) 2017 Ansible Tower by Red Hat -# All Rights Reserved. - -from django.conf import settings - - -class AWXCeleryRouter(object): - def route_for_task(self, task, args=None, kwargs=None): - tasks = [ - 'awx.main.tasks.cluster_node_heartbeat', - 'awx.main.tasks.purge_old_stdout_files', - 'awx.main.tasks.awx_isolated_heartbeat', - ] - if task in tasks: - return {'queue': settings.CLUSTER_HOST_ID, 'routing_key': settings.CLUSTER_HOST_ID} diff --git a/awx/main/utils/reload.py b/awx/main/utils/reload.py index 8da1fb0096b3..bdfcc0dcc9e4 100644 --- a/awx/main/utils/reload.py +++ b/awx/main/utils/reload.py @@ -11,11 +11,8 @@ logger = logging.getLogger('awx.main.utils.reload') -def _supervisor_service_command(service_internal_names, command, communicate=True): +def _supervisor_service_command(command, communicate=True): ''' - Service internal name options: - - beat - celery - callback - channels - uwsgi - daphne - - fact - nginx example use pattern of supervisorctl: # supervisorctl restart tower-processes:receiver tower-processes:factcacher ''' @@ -25,13 +22,7 @@ def _supervisor_service_command(service_internal_names, command, communicate=Tru args = ['supervisorctl'] if settings.DEBUG: args.extend(['-c', '/supervisor.conf']) - programs = [] - name_translation_dict = settings.SERVICE_NAME_DICT - for n in service_internal_names: - if n in name_translation_dict: - programs.append('{}:{}'.format(group_name, name_translation_dict[n])) - args.extend([command]) - args.extend(programs) + args.extend([command, '{}:*'.format(group_name)]) logger.debug('Issuing command to {} services, args={}'.format(command, args)) supervisor_process = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) @@ -48,6 +39,6 @@ def _supervisor_service_command(service_internal_names, command, communicate=Tru logger.info('Submitted supervisorctl {} command, not waiting for result'.format(command)) -def stop_local_services(service_internal_names, communicate=True): - logger.warn('Stopping services {} on this node in response to user action'.format(service_internal_names)) - _supervisor_service_command(service_internal_names, command='stop', communicate=communicate) +def stop_local_services(communicate=True): + logger.warn('Stopping services on this node in response to user action') + _supervisor_service_command(command='stop', communicate=communicate) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 633e5081a1cd..ac347c43c269 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -4,7 +4,6 @@ import os import re # noqa import sys -import djcelery import six from datetime import timedelta @@ -26,6 +25,8 @@ def is_testing(argv=None): import sys '''Return True if running django or py.test unit tests.''' + if 'PYTEST_CURRENT_TEST' in os.environ.keys(): + return 
True argv = sys.argv if argv is None else argv if len(argv) >= 1 and ('py.test' in argv[0] or 'py/test.py' in argv[0]): return True @@ -60,7 +61,7 @@ def IS_TESTING(argv=None): 'NAME': os.path.join(BASE_DIR, 'awx.sqlite3'), 'ATOMIC_REQUESTS': True, 'TEST': { - # Test database cannot be :memory: for celery/inventory tests. + # Test database cannot be :memory: for inventory tests. 'NAME': os.path.join(BASE_DIR, 'awx_test.sqlite3'), }, } @@ -280,7 +281,6 @@ def IS_TESTING(argv=None): 'oauth2_provider', 'rest_framework', 'django_extensions', - 'djcelery', 'channels', 'polymorphic', 'taggit', @@ -459,40 +459,9 @@ def IS_TESTING(argv=None): # Set default ports for live server tests. os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199') -djcelery.setup_loader() - BROKER_POOL_LIMIT = None BROKER_URL = 'amqp://guest:guest@localhost:5672//' -CELERY_EVENT_QUEUE_TTL = 5 CELERY_DEFAULT_QUEUE = 'awx_private_queue' -CELERY_DEFAULT_EXCHANGE = 'awx_private_queue' -CELERY_DEFAULT_ROUTING_KEY = 'awx_private_queue' -CELERY_DEFAULT_EXCHANGE_TYPE = 'direct' -CELERY_TASK_SERIALIZER = 'json' -CELERY_RESULT_SERIALIZER = 'json' -CELERY_ACCEPT_CONTENT = ['json'] -CELERY_TRACK_STARTED = True -CELERYD_TASK_TIME_LIMIT = None -CELERYD_TASK_SOFT_TIME_LIMIT = None -CELERYD_POOL_RESTARTS = True -CELERYD_AUTOSCALER = 'awx.main.utils.autoscale:DynamicAutoScaler' -CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' -CELERY_IMPORTS = ('awx.main.scheduler.tasks',) -CELERY_QUEUES = () -CELERY_ROUTES = ('awx.main.utils.ha.AWXCeleryRouter',) - - -def log_celery_failure(*args): - # Import annotations lazily to avoid polluting the `awx.settings` namespace - # and causing circular imports - from awx.main.tasks import log_celery_failure - return log_celery_failure(*args) - - -CELERY_ANNOTATIONS = {'*': {'on_failure': log_celery_failure}} - -CELERYBEAT_SCHEDULER = 'celery.beat.PersistentScheduler' -CELERYBEAT_MAX_LOOP_INTERVAL = 60 CELERYBEAT_SCHEDULE = { 'tower_scheduler': { 'task': 'awx.main.tasks.awx_periodic_scheduler', @@ -525,9 +494,6 @@ def log_celery_failure(*args): } AWX_INCONSISTENT_TASK_INTERVAL = 60 * 3 -# Celery queues that will always be listened to by celery workers -# Note: Broadcast queues have unique, auto-generated names, with the alias -# property value of the original queue name. AWX_CELERY_QUEUES_STATIC = [ six.text_type(CELERY_DEFAULT_QUEUE), ] @@ -626,8 +592,8 @@ def log_celery_failure(*args): SOCIAL_AUTH_SAML_ORGANIZATION_ATTR = {} SOCIAL_AUTH_SAML_TEAM_ATTR = {} -# Any ANSIBLE_* settings will be passed to the subprocess environment by the -# celery task. +# Any ANSIBLE_* settings will be passed to the task runner subprocess +# environment # Do not want AWX to ask interactive questions and want it to be friendly with # reprovisioning @@ -641,8 +607,7 @@ def log_celery_failure(*args): # output ANSIBLE_FORCE_COLOR = True -# Additional environment variables to be passed to the subprocess started by -# the celery task. +# Additional environment variables to be passed to the ansible subprocesses AWX_TASK_ENV = {} # Flag to enable/disable updating hosts M2M when saving job events. 
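A rough sketch of the queue bindings these settings imply for a single node (assumption: `run_dispatcher` binds an equivalent set to what the removed `celeryd_init` hook built from the instance hostname plus `AWX_CELERY_QUEUES_STATIC`):

```python
# Illustrative only: which queues one node ends up consuming.
# socket.gethostname() stands in for the instance's own hostname queue
# (CLUSTER_HOST_ID in the per-environment settings).
import socket

direct_queues = set([socket.gethostname()]) | set(AWX_CELERY_QUEUES_STATIC)
# e.g. {'awx-node-1', 'awx_private_queue'}
fanout_queues = {'tower_broadcast_all'}  # broadcast setting-change flushes
```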
@@ -1071,6 +1036,15 @@ def log_celery_failure(*args): 'backupCount': 5, 'formatter':'simple', }, + 'callback_receiver': { + 'level': 'WARNING', + 'class':'logging.handlers.RotatingFileHandler', + 'filters': ['require_debug_false'], + 'filename': os.path.join(LOG_ROOT, 'callback_receiver.log'), + 'maxBytes': 1024 * 1024 * 5, # 5 MB + 'backupCount': 5, + 'formatter':'simple', + }, 'dispatcher': { 'level': 'WARNING', 'class':'logging.handlers.RotatingFileHandler', @@ -1080,6 +1054,10 @@ def log_celery_failure(*args): 'backupCount': 5, 'formatter':'dispatcher', }, + 'celery.beat': { + 'class':'logging.StreamHandler', + 'level': 'ERROR' + }, # don't log every celerybeat wakeup 'inventory_import': { 'level': 'DEBUG', 'class':'logging.StreamHandler', @@ -1162,6 +1140,9 @@ def log_celery_failure(*args): 'awx.main': { 'handlers': ['null'] }, + 'awx.main.commands.run_callback_receiver': { + 'handlers': ['callback_receiver'], + }, 'awx.main.dispatch': { 'handlers': ['dispatcher'], }, diff --git a/awx/settings/development.py b/awx/settings/development.py index eadc6ae1f4a8..9627d647402c 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -68,13 +68,6 @@ 'django.template.loaders.app_directories.Loader', ) -# Disable capturing all SQL queries when running celeryd in development. -if 'celery' in sys.argv: - SQL_DEBUG = False - -CELERYD_HIJACK_ROOT_LOGGER = False -CELERYD_LOG_COLOR = True - CALLBACK_QUEUE = "callback_tasks" # Enable dynamically pulling roles from a requirement.yml file @@ -149,15 +142,6 @@ CLUSTER_HOST_ID = socket.gethostname() -# Supervisor service name dictionary used for programatic restart -SERVICE_NAME_DICT = { - "celery": "celery", - "callback": "receiver", - "runworker": "channels", - "uwsgi": "uwsgi", - "daphne": "daphne", - "nginx": "nginx"} - try: socket.gethostbyname('docker.for.mac.host.internal') os.environ['SDB_NOTIFY_HOST'] = 'docker.for.mac.host.internal' diff --git a/awx/settings/local_settings.py.docker_compose b/awx/settings/local_settings.py.docker_compose index a9cf664c9a5e..09dd03ffdc4c 100644 --- a/awx/settings/local_settings.py.docker_compose +++ b/awx/settings/local_settings.py.docker_compose @@ -73,13 +73,13 @@ if "pytest" in sys.modules: 'ENGINE': 'django.db.backends.sqlite3', 'NAME': os.path.join(BASE_DIR, 'awx.sqlite3'), 'TEST': { - # Test database cannot be :memory: for celery/inventory tests. + # Test database cannot be :memory: for inventory tests. 'NAME': os.path.join(BASE_DIR, 'awx_test.sqlite3'), }, } } -# Celery AMQP configuration. +# AMQP configuration. BROKER_URL = "amqp://{}:{}@{}/{}".format(os.environ.get("RABBITMQ_USER"), os.environ.get("RABBITMQ_PASS"), os.environ.get("RABBITMQ_HOST"), @@ -138,8 +138,7 @@ REMOTE_HOST_HEADERS = ['REMOTE_ADDR', 'REMOTE_HOST'] # REMOTE_HOST_HEADERS will be trusted unconditionally') PROXY_IP_WHITELIST = [] -# Define additional environment variables to be passed to subprocess started by -# the celery task. +# Define additional environment variables to be passed to ansible subprocesses #AWX_TASK_ENV['FOO'] = 'BAR' # If set, use -vvv for project updates instead of -v for more output. 
diff --git a/awx/settings/local_settings.py.example b/awx/settings/local_settings.py.example index ffafd7216f9f..915ab36809aa 100644 --- a/awx/settings/local_settings.py.example +++ b/awx/settings/local_settings.py.example @@ -39,13 +39,13 @@ if is_testing(sys.argv): 'ENGINE': 'django.db.backends.sqlite3', 'NAME': os.path.join(BASE_DIR, 'awx.sqlite3'), 'TEST': { - # Test database cannot be :memory: for celery/inventory tests. + # Test database cannot be :memory: for tests. 'NAME': os.path.join(BASE_DIR, 'awx_test.sqlite3'), }, } } -# Celery AMQP configuration. +# AMQP configuration. BROKER_URL = 'amqp://guest:guest@localhost:5672' # Set True to enable additional logging from the job_event_callback plugin @@ -94,8 +94,7 @@ REMOTE_HOST_HEADERS = ['REMOTE_ADDR', 'REMOTE_HOST'] # REMOTE_HOST_HEADERS will be trusted unconditionally') PROXY_IP_WHITELIST = [] -# Define additional environment variables to be passed to subprocess started by -# the celery task. +# Define additional environment variables to be passed to ansible subprocesses #AWX_TASK_ENV['FOO'] = 'BAR' # If set, use -vvv for project updates instead of -v for more output. diff --git a/awx/settings/production.py b/awx/settings/production.py index ff7d9661386c..77fe12156d98 100644 --- a/awx/settings/production.py +++ b/awx/settings/production.py @@ -54,21 +54,13 @@ LOGGING['handlers']['tower_warnings']['filename'] = '/var/log/tower/tower.log' LOGGING['handlers']['callback_receiver']['filename'] = '/var/log/tower/callback_receiver.log' +LOGGING['handlers']['dispatcher']['filename'] = '/var/log/tower/dispatcher.log' LOGGING['handlers']['task_system']['filename'] = '/var/log/tower/task_system.log' LOGGING['handlers']['fact_receiver']['filename'] = '/var/log/tower/fact_receiver.log' LOGGING['handlers']['management_playbooks']['filename'] = '/var/log/tower/management_playbooks.log' LOGGING['handlers']['system_tracking_migrations']['filename'] = '/var/log/tower/tower_system_tracking_migrations.log' LOGGING['handlers']['rbac_migrations']['filename'] = '/var/log/tower/tower_rbac_migrations.log' -# Supervisor service name dictionary used for programatic restart -SERVICE_NAME_DICT = { - "beat": "awx-celery-beat", - "celery": "awx-celery", - "callback": "awx-callback-receiver", - "channels": "awx-channels-worker", - "uwsgi": "awx-uwsgi", - "daphne": "awx-daphne"} - # Store a snapshot of default settings at this point before loading any # customizable config files. DEFAULTS_SNAPSHOT = {} diff --git a/docs/resource_copy.md b/docs/resource_copy.md index da85d5522526..77d6e6b82094 100644 --- a/docs/resource_copy.md +++ b/docs/resource_copy.md @@ -102,7 +102,7 @@ available fields. ``` `CopyAPIView` will automatically detect sub objects of an object, and do a deep copy of all sub objects -as a background celery task. There are sometimes permission issues with sub object copy. For example, +as a background task. There are sometimes permission issues with sub object copy. For example, when copying nodes of a workflow job template, there are cases where the user performing copy has no use permission of related credential and inventory of some nodes, and it is desired those fields will be `None`. 
In order to do that, developer should provide a static method `deep_copy_permission_check_func` diff --git a/docs/task_manager_system.md b/docs/task_manager_system.md index 4d1dd8daefe3..697474b02d18 100644 --- a/docs/task_manager_system.md +++ b/docs/task_manager_system.md @@ -1,15 +1,15 @@ # Task Manager Overview -The task manager is responsible for deciding when jobs should be introduced to celery for running. When choosing a task to run the considerations are: (1) creation time, (2) job dependency, (3) capacity. +The task manager is responsible for deciding when jobs should be scheduled to run. When choosing a task to run the considerations are: (1) creation time, (2) job dependency, (3) capacity. -Independent jobs are ran in order of creation time, earliest first. Jobs with dependencies are also ran in creation time order within the group of job dependencies. Capacity is the final consideration when deciding to release a job to be ran by celery. +Independent jobs are run in order of creation time, earliest first. Jobs with dependencies are also run in creation time order within the group of job dependencies. Capacity is the final consideration when deciding to release a job to be run by the task dispatcher. ## Task Manager Architecture The task manager has a single entry point, `Scheduler().schedule()`. The method may be called in parallel, at any time, as many times as the user wants. The `schedule()` function tries to aquire a single, global, lock using the Instance table first record in the database. If the lock cannot be aquired the method returns. The failure to aquire the lock indicates that there is another instance currently running `schedule()`. ### Hybrid Scheduler: Periodic + Event -The `schedule()` function is ran (a) periodically by a celery task and (b) on job creation or completion. The task manager system would behave correctly if ran, exclusively, via (a) or (b). We chose to trigger `schedule()` via both mechanisms because of the nice properties I will now mention. (b) reduces the time from launch to running, resulting a better user experience. (a) is a fail-safe in case we miss code-paths, in the present and future, that change the 3 scheduling considerations for which we should call `schedule()` (i.e. adding new nodes to tower changes the capacity, obscure job error handling that fails a job) +The `schedule()` function is run (a) periodically by a background task and (b) on job creation or completion. The task manager system would behave correctly if run, exclusively, via (a) or (b). We chose to trigger `schedule()` via both mechanisms because of the nice properties I will now mention. (b) reduces the time from launch to running, resulting in a better user experience. (a) is a fail-safe in case we miss code-paths, in the present and future, that change the 3 scheduling considerations for which we should call `schedule()` (i.e. adding new nodes to tower changes the capacity, obscure job error handling that fails a job) Emperically, the periodic task manager has served us well in the past and we will continue to rely on it with the added event-triggered `schedule()`. ### Scheduler Algorithm @@ -17,14 +17,14 @@ The `schedule()` function is ran (a) periodically by a celery task and (b) on jo * Detect finished workflow jobs * Spawn next workflow jobs if needed * For each pending jobs; start with oldest created job - * If job is not blocked, and there is capacity in the instance group queue, then mark the as `waiting` and submit the job to celery.
+ * If the job is not blocked, and there is capacity in the instance group queue, then mark the job as `waiting` and submit it to RabbitMQ. ### Job Lifecycle | Job Status | State | |:----------:|:------------------------------------------------------------------------------------------------------------------:| | pending | Job launched.
1. Hasn't yet been seen by the scheduler
2. Is blocked by another task
3. Not enough capacity | -| waiting | Job submitted to celery. | -| running | Job running in celery. | +| waiting | Job published to an AMQP queue. | +| running | Job running on a Tower node. | | successful | Job finished with ansible-playbook return code 0. | | failed | Job finished with ansible-playbook return code other than 0. | | error | System failure. | diff --git a/docs/tasks.md b/docs/tasks.md new file mode 100644 index 000000000000..97a205e307aa --- /dev/null +++ b/docs/tasks.md @@ -0,0 +1,155 @@ +Background Tasks in AWX +======================= + +AWX runs a lot of Python code asynchronously _in the background_ - meaning +_outside_ of the context of an HTTP request, such as: + +* Any time a Job is launched in AWX (a Job Template, an Adhoc Command, a Project + Update, an Inventory Update, a System Job), a background process retrieves + metadata _about_ that job from the database and forks some process (e.g., + `ansible-playbook`, `awx-manage inventory_import`) +* Certain expensive or time-consuming tasks run in the background + asynchronously (like deleting an inventory). +* AWX runs a variety of periodic background tasks on a schedule. Some examples + are: + - AWX's "Task Manager/Scheduler" wakes up periodically and looks for + `pending` jobs that have been launched and are ready to start running. + - AWX periodically runs code that looks for scheduled jobs and launches + them. + - AWX runs a variety of periodic tasks that clean up temporary files, and + perform various administrative checks. + - Every node in an AWX cluster runs a periodic task that serves as + a heartbeat and capacity check. + +Tasks, Queues and Workers +------------------------- + +To accomplish this, AWX makes use of a "Task Queue" abstraction. Task Queues +are used as a mechanism to distribute work across machines in an AWX +installation. A Task Queue's input is a unit of work called a Task. Dedicated +worker processes running on every AWX node constantly monitor these queues for +new work to perform. + +AWX communicates with these worker processes via AMQP - using RabbitMQ, +specifically - to mediate between clients and workers. To initiate a task, the +client (generally, Python code in the AWX API) publishes a message to a queue, +and RabbitMQ then delivers that message to one or more workers. + +Clustered AWX installations consist of multiple workers spread across every +node, allowing for high availability and horizontal scaling. + +Direct vs Fanout Messages +------------------------- + +AWX publishes tasks in two distinct ways. + +*Direct* messages are bound _directly_ to a specific named queue. When you launch +a Job Template in AWX, it looks at the available capacity of the various nodes +in your cluster and chooses an `Execution Node` where the playbook will run. +In this scenario, AWX publishes a message to a direct queue associated with +that AWX node. The dispatcher process running on that AWX node is specifically +bound to _listen_ for new events on its instance-specific queue. + +Certain direct queues in AWX are bound to by _every_ AWX node. For example, +when an inventory deletion task is published, any available node across the +entire AWX cluster may perform the work. Under _direct_ exchanges, every published +message is consumed and handled by *one* worker process. + +*Fanout* messages are sent out in a broadcast fashion. When you change +a setting value in the AWX API, a fanout message is broadcast to _every_ AWX +node in your cluster, and code runs on _every_ node.
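+
+To make the direct vs fanout distinction concrete, here is a rough sketch of
+what publishing each kind of message might look like using `kombu` directly.
+This is illustrative only - it is not AWX's actual publishing code, and the
+exchange and task names below are placeholders:
+
+```
+from kombu import Connection, Exchange, Producer
+
+# Placeholder names; AWX's real exchange and queue names come from settings.
+direct = Exchange('awx_private_queue', type='direct')
+broadcast = Exchange('tower_broadcast_all', type='fanout')
+
+with Connection('amqp://guest:guest@localhost:5672//') as conn:
+    producer = Producer(conn.channel(), serializer='json')
+
+    # Direct: routed by routing_key; exactly one worker consuming from the
+    # matching queue handles the message.
+    producer.publish(
+        {'task': 'awx.main.tasks.some_direct_task', 'args': [], 'kwargs': {}},
+        exchange=direct, routing_key='awx_private_queue', declare=[direct])
+
+    # Fanout: the routing key is ignored; every queue bound to the exchange
+    # (one per dispatcher node) receives its own copy of the message.
+    producer.publish(
+        {'task': 'awx.main.tasks.some_broadcast_task', 'args': [], 'kwargs': {}},
+        exchange=broadcast, declare=[broadcast])
+```
+
+On the consuming side, each node's dispatcher binds its own queue to these
+exchanges; with a fanout exchange every bound queue receives a copy of each
+message, which is what produces the broadcast behavior described above.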
+ +Defining and Running Tasks +-------------------------- +Tasks are defined in AWX's source code, and generally live in the +`awx.main.tasks` module. Tasks can be defined as simple functions: + + from awx.main.dispatch.publish import task + + @task() + def add(a, b): + return a + b + +...or classes that define a `run` method: + + @task() + class Adder: + def run(self, a, b): + return a + b + +To publish a task and run it in the background, use the `apply_async()` +function: + + add.apply_async([1, 1]) + Adder.apply_async([1, 1]) + +When you call `apply_async()`, a JSON message is composed and published to the +appropriate AMQP queue: + + { + "uuid": "", + "args": [1, 1], + "kwargs": {}, + "task": "awx.main.tasks.add" + } + +When a background worker receives the message, it deserializes it and runs the +associated Python code: + + awx.main.tasks.add(1, 1) + +Dispatcher Implementation +------------------------- +Every node in an AWX install runs `awx-manage run_dispatcher`, a Python process +that uses the `kombu` library to consume messages from the appropriate queues +for that node (the default shared queue, a queue specific to the node's +hostname, and the broadcast queue). The Dispatcher process manages a pool of +child processes that it distributes inbound messages to. These worker +processes perform the actual work of deserializing published tasks and running +the associated Python code. + +Heartbeats, Capacity, and Job Reaping +------------------------------------- +One of the most important tasks in a clustered AWX installation is the periodic +heartbeat task. This task runs periodically on _every_ node, and is used to +record a heartbeat and system capacity for that node (which is used by the +scheduler when determining where to place launched jobs). + +If a node in an AWX cluster discovers that one of its peers has not updated its +heartbeat within a certain grace period, it is assumed to be offline, and its +capacity is set to zero to avoid scheduling new tasks on that node. +Additionally, jobs allegedly running or scheduled to run on that node are +assumed to be lost, and "reaped", or marked as failed. + +Debugging +--------- +`awx-manage run_dispatcher` includes a few flags that allow interaction and +debugging: + +``` +[root@awx /]# awx-manage run_dispatcher --status +2018-09-14 18:39:22,223 WARNING awx.main.dispatch checking dispatcher status for awx +awx[pid:9610] workers total=4 min=4 max=60 +. worker[pid:9758] sent=12 finished=12 qsize=0 rss=106.730MB [IDLE] +. worker[pid:9769] sent=5 finished=5 qsize=0 rss=105.141MB [IDLE] +. worker[pid:9782] sent=5 finished=4 qsize=1 rss=110.430MB + - running 0c1deb4d-25ae-49a9-804f-a8afd05aff29 RunJob(*[9]) +. worker[pid:9787] sent=3 finished=3 qsize=0 rss=101.824MB [IDLE] +``` + +The `--running` flag outputs the UUIDs of running and queued tasks handled by a specific dispatcher +(these correspond to `main_unifiedjob.celery_task_id` in the database): + +``` +[root@awx /]# awx-manage run_dispatcher --running +2018-09-14 18:39:22,223 WARNING awx.main.dispatch checking dispatcher running for awx +['eb3b0a83-86da-413d-902a-16d7530a6b25', 'f447266a-23da-42b4-8025-fe379d2db96f'] +``` + +Additionally, you can tell the local running dispatcher to recycle all of the +workers in its pool. It will wait for any running jobs to finish and exit when +work has completed, spinning up replacement workers.
+ +``` +awx-manage run_dispatcher --reload +``` diff --git a/installer/roles/image_build/files/supervisor_task.conf b/installer/roles/image_build/files/supervisor_task.conf index 44e1ffcb8abc..e7e94196e6f3 100644 --- a/installer/roles/image_build/files/supervisor_task.conf +++ b/installer/roles/image_build/files/supervisor_task.conf @@ -2,8 +2,8 @@ nodaemon = True umask = 022 -[program:celery] -command = /var/lib/awx/venv/awx/bin/celery worker -A awx -B -l debug --autoscale=50,4 -Ofair -s /var/lib/awx/beat.db -n celery@%(ENV_HOSTNAME)s +[program:dispatcher] +command = awx-manage run_dispatcher directory = /var/lib/awx environment = LANGUAGE="en_US.UTF-8",LANG="en_US.UTF-8",LC_ALL="en_US.UTF-8",LC_CTYPE="en_US.UTF-8" #user = {{ aw_user }} @@ -15,18 +15,6 @@ stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 -[program:celery-watcher] -command = /usr/bin/awx-manage watch_celery -directory = /var/lib/awx -environment = LANGUAGE="en_US.UTF-8",LANG="en_US.UTF-8",LC_ALL="en_US.UTF-8",LC_CTYPE="en_US.UTF-8" -autostart = true -autorestart = true -stopwaitsecs = 5 -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 - [program:callback-receiver] command = awx-manage run_callback_receiver directory = /var/lib/awx @@ -50,7 +38,7 @@ stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 [group:tower-processes] -programs=celery,celery-watcher,callback-receiver,channels-worker +programs=dispatcher,callback-receiver,channels-worker priority=5 # TODO: Exit Handler diff --git a/installer/roles/image_build/templates/Dockerfile.j2 b/installer/roles/image_build/templates/Dockerfile.j2 index b93605874aca..0236194d53aa 100644 --- a/installer/roles/image_build/templates/Dockerfile.j2 +++ b/installer/roles/image_build/templates/Dockerfile.j2 @@ -41,6 +41,7 @@ RUN yum -y install epel-release && \ rm -rf /root/.cache RUN mkdir -p /var/log/tower +RUN chmod -R g+w /var/log/tower RUN mkdir -p /etc/tower COPY {{ awx_sdist_file }} /tmp/{{ awx_sdist_file }} RUN OFFICIAL=yes pip install /tmp/{{ awx_sdist_file }} diff --git a/requirements/requirements.in b/requirements/requirements.in index 477b1bd5de9a..4f1bfc5b37f6 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -9,7 +9,6 @@ daphne==1.3.0 # Last before backwards-incompatible channels 2 upgrade decorator==4.2.1 Django==1.11.11 django-auth-ldap==1.2.8 -django-celery==3.2.2 django-crum==0.7.2 django-extensions==2.0.0 django-jsonfield==1.0.1 diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 7dc132f48549..ac8ef9bc33d4 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -28,7 +28,6 @@ daphne==1.3.0 decorator==4.2.1 defusedxml==0.4.1 # via python-saml django-auth-ldap==1.2.8 -django-celery==3.2.2 django-crum==0.7.2 django-extensions==2.0.0 django-jsonfield==1.0.1 diff --git a/requirements/requirements_dev.txt b/requirements/requirements_dev.txt index 261ba40b54c9..07dd2b9d0444 100644 --- a/requirements/requirements_dev.txt +++ b/requirements/requirements_dev.txt @@ -15,7 +15,6 @@ pytest-mock pytest-timeout pytest-xdist logutils -flower jupyter matplotlib backports.tempfile # support in unit tests for py32+ tempfile.TemporaryDirectory diff --git a/tools/docker-compose-cluster.yml b/tools/docker-compose-cluster.yml index 934acbed1a29..ca0c6b73ef89 100644 --- a/tools/docker-compose-cluster.yml +++ b/tools/docker-compose-cluster.yml @@ -12,7 +12,6 @@ services: - "8013:8013" - "8043:8043" - "1936:1936" - - 
"5555:5555" - "15672:15672" awx_1: privileged: true diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml index 59adf5f0c612..5fbb610631f1 100644 --- a/tools/docker-compose.yml +++ b/tools/docker-compose.yml @@ -14,10 +14,9 @@ services: ports: - "8888:8888" - "8080:8080" - - "5555:5555" - "8013:8013" - "8043:8043" - - "6899-6999:6899-6999" # default port range for celery.contrib.rdb + - "6899-6999:6899-6999" # default port range for sdb-listen links: - postgres - memcached diff --git a/tools/docker-compose/Procfile b/tools/docker-compose/Procfile index bd0b222175e5..033a0e8eb041 100644 --- a/tools/docker-compose/Procfile +++ b/tools/docker-compose/Procfile @@ -1,8 +1,7 @@ nginx: make nginx runworker: make runworker daphne: make daphne -celeryd: make celeryd +dispatcher: make dispatcher receiver: make receiver -flower: make flower uwsgi: make uwsgi -jupyter: make jupyter \ No newline at end of file +jupyter: make jupyter diff --git a/tools/docker-compose/haproxy.cfg b/tools/docker-compose/haproxy.cfg index f692a5c2b171..9f4fcfa6e396 100644 --- a/tools/docker-compose/haproxy.cfg +++ b/tools/docker-compose/haproxy.cfg @@ -22,11 +22,6 @@ frontend localnodes_ssl mode tcp default_backend nodes_ssl -frontend flower - bind *:5555 - mode http - default_backend flower_nodes - frontend rabbitctl bind *:15672 mode http @@ -51,18 +46,6 @@ backend nodes_ssl server awx_2 awx_2:8043 server awx_3 awx_3:8043 -backend flower_nodes - mode http - balance roundrobin - option forwardfor - option http-pretend-keepalive - http-request set-header X-Forwarded-Port %[dst_port] - http-request add-header X-Forwarded-Proto https if { ssl_fc } - #option httpchk HEAD / HTTP/1.1\r\nHost:localhost - server awx_1 awx_1:5555 - server awx_2 awx_2:5555 - server awx_3 awx_3:5555 - backend rabbitctl_nodes mode http balance roundrobin diff --git a/tools/docker-compose/supervisor.conf b/tools/docker-compose/supervisor.conf index e5d35476444e..bd07fdffe7c1 100644 --- a/tools/docker-compose/supervisor.conf +++ b/tools/docker-compose/supervisor.conf @@ -3,8 +3,8 @@ umask = 022 minfds = 4096 nodaemon=true -[program:celeryd] -command = celery worker -A awx -l DEBUG -B -Ofair --autoscale=100,4 -s /var/lib/awx/beat.db --pidfile /tmp/celery_pid +[program:dispatcher] +command = awx-manage run_dispatcher autostart = true autorestart = true redirect_stderr=true @@ -51,14 +51,6 @@ redirect_stderr=true stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 -[program:flower] -command = make flower -autostart = true -autorestart = true -redirect_stderr=true -stdout_logfile=/dev/fd/1 -stdout_logfile_maxbytes=0 - [program:jupyter] command = make jupyter autostart = true @@ -68,7 +60,7 @@ stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 [group:awx-processes] -programs=celeryd,receiver,runworker,uwsgi,daphne,nginx,flower +programs=dispatcher,receiver,runworker,uwsgi,daphne,nginx priority=5 [unix_http_server]