Skip to content

Commit

Permalink
refactor: merge celery app code
Browse files Browse the repository at this point in the history
  • Loading branch information
dlbrittain committed Apr 23, 2021
1 parent eac1895 commit 2befcd5
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 107 deletions.
44 changes: 0 additions & 44 deletions materializationengine/celery_app.py

This file was deleted.

24 changes: 0 additions & 24 deletions materializationengine/celery_status.py

This file was deleted.

120 changes: 91 additions & 29 deletions materializationengine/celery_worker.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,110 @@
from materializationengine.app import create_app
from materializationengine.celery_app import create_celery
from materializationengine.celery_init import celery
from celery.schedules import crontab
from materializationengine.workflows.periodic_database_removal import remove_expired_databases
from materializationengine.workflows.periodic_materialization import run_periodic_materialization
from materializationengine.schemas import CeleryBeatSchema
from materializationengine.errors import TaskNotFound
import logging
import sys

from celery.app import builtins
from celery.schedules import crontab
from celery.signals import after_setup_logger
from celery.utils.log import get_task_logger

app = create_app()
celery = create_celery(app, celery)
from materializationengine.celery_init import celery
from materializationengine.errors import TaskNotFound
from materializationengine.schemas import CeleryBeatSchema
from materializationengine.workflows.periodic_database_removal import \
remove_expired_databases
from materializationengine.workflows.periodic_materialization import \
run_periodic_materialization

celery_logger = get_task_logger(__name__)


def create_celery(app=None):

celery.conf.broker_url = app.config["CELERY_BROKER_URL"]
celery.conf.result_backend = app.config["CELERY_RESULT_BACKEND"]
if app.config.get("USE_SENTINEL", False):
celery.conf.broker_transport_options = {
"master_name": app.config["MASTER_NAME"]
}
celery.conf.result_backend_transport_options = {
"master_name": app.config["MASTER_NAME"]
}

@celery.on_after_configure.connect
celery.conf.update(
{
"task_routes": ("materializationengine.task_router.TaskRouter"),
"task_serializer": "pickle",
"result_serializer": "pickle",
"accept_content": ["pickle"],
"optimization": "fair",
"worker_prefetch_multiplier": 1,
"result_expires": 86400, # results expire in broker after 1 day
}
)

celery.conf.update(app.config)
TaskBase = celery.Task

class ContextTask(TaskBase):
abstract = True

def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)

celery.Task = ContextTask
return celery


@after_setup_logger.connect
def celery_loggers(logger, *args, **kwargs):
"""
Display the Celery banner appears in the log output.
https://www.distributedpython.com/2018/10/01/celery-docker-startup/
"""
logger.info(f"Customize Celery logger, default handler: {logger.handlers[0]}")
logger.addHandler(logging.StreamHandler(sys.stdout))


@celery.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):

periodic_tasks = {
'run_daily_periodic_materialization': run_periodic_materialization.s(days_to_expire=2),
'run_weekly_periodic_materialization': run_periodic_materialization.s(days_to_expire=7),
'run_lts_periodic_materialization': run_periodic_materialization.s(days_to_expire=30),
'remove_expired_databases': remove_expired_databases.s(delete_threshold=5),
"run_daily_periodic_materialization": run_periodic_materialization.s(
days_to_expire=2
),
"run_weekly_periodic_materialization": run_periodic_materialization.s(
days_to_expire=7
),
"run_lts_periodic_materialization": run_periodic_materialization.s(
days_to_expire=30
),
"remove_expired_databases": remove_expired_databases.s(delete_threshold=5),
}

# remove expired task results in redis broker
sender.add_periodic_task(crontab(hour=0, minute=0, day_of_week='*', day_of_month='*', month_of_year='*'),
builtins.add_backend_cleanup_task(celery), name="Clean up back end results")
sender.add_periodic_task(
crontab(hour=0, minute=0, day_of_week="*", day_of_month="*", month_of_year="*"),
builtins.add_backend_cleanup_task(celery),
name="Clean up back end results",
)

beat_schedules = app.config['BEAT_SCHEDULES']
beat_schedules = celery.conf["BEAT_SCHEDULES"]
celery_logger.info(beat_schedules)
schedules = CeleryBeatSchema(many=True).dump(beat_schedules)
for schedule in schedules:

if schedule['task'] in periodic_tasks:
task = periodic_tasks[schedule['task']]
sender.add_periodic_task(crontab(minute=schedule['minute'],
hour=schedule['hour'],
day_of_week=schedule['day_of_week'],
day_of_month=schedule['day_of_month'],
month_of_year=schedule['month_of_year']),
task,
name=schedule['name'])
else:
raise TaskNotFound(schedule['task'], periodic_tasks)
if schedule["task"] not in periodic_tasks:
raise TaskNotFound(schedule["task"], periodic_tasks)

task = periodic_tasks[schedule["task"]]
sender.add_periodic_task(
crontab(
minute=schedule["minute"],
hour=schedule["hour"],
day_of_week=schedule["day_of_week"],
day_of_month=schedule["day_of_month"],
month_of_year=schedule["month_of_year"],
),
task,
name=schedule["name"],
)
3 changes: 1 addition & 2 deletions run.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
from werkzeug.serving import WSGIRequestHandler

from materializationengine.app import create_app
from materializationengine.celery_init import celery
from materializationengine.celery_worker import create_celery

HOME = os.path.expanduser("~")

application = create_app()
celery = create_celery(application, celery)
celery = create_celery(application)

if __name__ == "__main__":

Expand Down
14 changes: 6 additions & 8 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
import datetime
import json
import logging
import pathlib
import time
import uuid
import sys
import datetime

import docker
import psycopg2
import pytest
from dynamicannotationdb.annotation_client import DynamicAnnotationClient
from dynamicannotationdb.materialization_client import \
DynamicMaterializationClient
from dynamicannotationdb.annotation_client import DynamicAnnotationClient
from materializationengine.app import create_app
from materializationengine.celery_app import create_celery
from materializationengine.celery_init import celery as celery_instance
from materializationengine.celery_worker import create_celery
from materializationengine.models import Base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
Expand Down Expand Up @@ -81,7 +79,7 @@ def test_app():
@pytest.fixture(scope='session')
def test_celery_app(test_app):
test_logger.info(f"Starting test celery worker...")
celery = create_celery(test_app, celery_instance)
celery = create_celery(test_app)
yield celery

# Setup docker image if '--docker=True' in pytest args
Expand All @@ -91,7 +89,7 @@ def setup_docker_image(docker_mode, mat_metadata):
postgis_docker_image = mat_metadata['postgis_docker_image']
aligned_volume = mat_metadata['aligned_volume']

db_enviroment = [
db_environment = [
f"POSTGRES_USER=postgres",
f"POSTGRES_PASSWORD=postgres",
f"POSTGRES_DB={aligned_volume}"
Expand All @@ -108,7 +106,7 @@ def setup_docker_image(docker_mode, mat_metadata):
hostname='test_postgres',
auto_remove=True,
name=container_name,
environment=db_enviroment,
environment=db_environment,
ports={"5432/tcp": 5432},
)
test_logger.info('STARTING POSTGIS DOCKER IMAGE')
Expand Down

0 comments on commit 2befcd5

Please sign in to comment.