From bf30a01a49a580a185cc99ac9ba41f9a05a5672f Mon Sep 17 00:00:00 2001 From: wongwill86 Date: Sat, 24 Feb 2018 14:44:04 -0500 Subject: [PATCH] Overhaul Docker Image (#16) - Overhauled build process - New in-repo docker base image. - Allows custom airflow configuration - currently 1.9 + custom performance patch - remove unnecessary custom celery loader - Update fernet key - Fix some sp - Update README - Upgrade to python 3 - Reorganize repo to reflect what's in the docker image - Remove chunkflow dag --- .dockerignore | 16 + README.md | 308 ++++++++++-------- airflow.cfg | 1 + cloud | 2 +- {docker/config => config}/airflow.cfg | 90 ++++- config/celeryconfig.py | 19 ++ dags/chunkflow/noop.py | 42 --- dags/chunkflow/tasks/noop_tasks.txt | 11 - dags/examples/docker_runtime_nvidia.py | 2 +- dags/examples/docker_with_variables.py | 2 +- dags/examples/interleaved.py | 2 +- dags/examples/multi_trigger.py | 8 +- dags/manager/scaler.py | 8 +- .../docker-compose-CeleryExecutor.yml | 91 +++--- docker/Dockerfile | 116 +++---- docker/Dockerfile.test | 9 - docker/base/Dockerfile.base-alpine | 85 +++++ docker/base/Dockerfile.base-slim | 75 +++++ docker/base/Dockerfile.base.test | 5 + docker/base/docker-compose.test.yml | 13 + docker/base/hooks/post_push | 21 ++ docker/config/celery_loader.py | 11 - docker/config/celeryconfig.py | 16 - docker/docker-compose-LocalExecutor.yml | 28 -- docker/docker-compose.test.yml | 10 +- docker/hooks/build | 30 ++ docker/hooks/post_checkout | 11 + docker/hooks/post_push | 21 ++ docker/hooks/pre_test | 12 + docker/scripts/entrypoint-dood.sh | 14 - docker/scripts/entrypoint-test.sh | 9 - plugins/chunkflow/__init__.py | 0 plugins/chunkflow/chunkflow.py | 96 ------ plugins/custom/custom.py | 20 +- plugins/custom/docker_custom.py | 83 ++--- pylama.ini | 10 + .../scripts => scripts}/add-user-docker.sh | 4 +- scripts/entrypoint-dood.sh | 13 + .../secrets_to_airflow_variables.py | 0 tests/plugins/chunkflow/__init__.py | 0 tests/plugins/chunkflow/test_chunkflow.py | 125 ------- .../chunkflow/test_chunkflow/empty.txt | 0 .../plugins/chunkflow/test_chunkflow/many.txt | 11 - .../chunkflow/test_chunkflow/no_tasks.txt | 0 .../chunkflow/test_chunkflow/single.txt | 4 - tests/plugins/custom/test_docker_custom.py | 10 +- .../plugins/custom/test_multi_trigger_dag.py | 33 +- 47 files changed, 779 insertions(+), 718 deletions(-) create mode 100644 .dockerignore create mode 120000 airflow.cfg rename {docker/config => config}/airflow.cfg (81%) create mode 100644 config/celeryconfig.py delete mode 100644 dags/chunkflow/noop.py delete mode 100644 dags/chunkflow/tasks/noop_tasks.txt rename {docker => deploy}/docker-compose-CeleryExecutor.yml (84%) delete mode 100644 docker/Dockerfile.test create mode 100644 docker/base/Dockerfile.base-alpine create mode 100644 docker/base/Dockerfile.base-slim create mode 100644 docker/base/Dockerfile.base.test create mode 100644 docker/base/docker-compose.test.yml create mode 100644 docker/base/hooks/post_push delete mode 100644 docker/config/celery_loader.py delete mode 100644 docker/config/celeryconfig.py delete mode 100644 docker/docker-compose-LocalExecutor.yml create mode 100644 docker/hooks/build create mode 100644 docker/hooks/post_checkout create mode 100644 docker/hooks/post_push create mode 100644 docker/hooks/pre_test delete mode 100755 docker/scripts/entrypoint-dood.sh delete mode 100755 docker/scripts/entrypoint-test.sh delete mode 100644 plugins/chunkflow/__init__.py delete mode 100644 plugins/chunkflow/chunkflow.py create mode 100644 pylama.ini 
rename {docker/scripts => scripts}/add-user-docker.sh (91%) create mode 100755 scripts/entrypoint-dood.sh rename {docker/scripts => scripts}/secrets_to_airflow_variables.py (100%) delete mode 100644 tests/plugins/chunkflow/__init__.py delete mode 100644 tests/plugins/chunkflow/test_chunkflow.py delete mode 100644 tests/plugins/chunkflow/test_chunkflow/empty.txt delete mode 100644 tests/plugins/chunkflow/test_chunkflow/many.txt delete mode 100644 tests/plugins/chunkflow/test_chunkflow/no_tasks.txt delete mode 100644 tests/plugins/chunkflow/test_chunkflow/single.txt diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..c154c273 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,16 @@ +# prevent from adding these files/folders into the docker image +.cache +.git +.ropeproject +cloud +secrets +tests +docker +.env +.* +# we need these 3 git folders to be included for version tagging +!.git/refs/heads/* +.git/refs/heads/*/* +!.git/refs/tags/* +.git/refs/tags/*/* +!.git/HEAD diff --git a/README.md b/README.md index caf0f1bb..1cb3a3d6 100644 --- a/README.md +++ b/README.md @@ -8,26 +8,155 @@ A curated set of tools for managing distributed task workflows. * [Docker Infrakit](https://github.com/docker/infrakit): Infrastructure Orchestration to deploy on the cloud -Note: This project builds off of the docker image provided by https://github.com/puckel/docker-airflow and infrakit examples https://github.com/infrakit/examples +Note: This project was inspired by https://github.com/puckel/docker-airflow and infrakit examples https://github.com/infrakit/examples ## Table of Contents -- [Core Concepts](#core-concepts) -- [Architectural Concepts](#architectural-concepts) -- [Setup](#setup) - [Where to Start](#where-to-start) -- [Hot to Test](#how-to-test) -- [How to Deploy](#how-to-deploy) -- [Debug Tools](#debug-tools) + - [Setup](#setup) + - [How to Run](#how-to-run) + - [How to Develop Custom Dags](#how-to-develop-custom-dags) + - [How to Package for Deployment](#how-to-package-for-deployment) + - [How to Deploy](#how-to-deploy) + - [Debug Tools](#debug-tools) + - [How to Test](#how-to-test) +- [Concepts](#concepts) + - [Basics](#basics) + - [Architectural](#architectural-concepts) - [Notes](#notes) - [Nvidia GPU Docker Support](#nvidia-gpu-docker-support) - [Mounting Secrets](#mounting-secrets) - [Multiple Instance Types](#multiple-instance-types) - [Developing Plugins](#developing-plugins) - [AWS ECR Access](#aws-ecr-access) + - [Base Images](#base-images) +- [Docker Cloud](#docker-cloud) -## Core Concepts: + +## Where to Start +### Setup +1. Install docker + * **NO** Nvidia GPU support needed + ``` + wget -qO- https://get.docker.com/ | sh + ``` + * **YES** Nvidia GPU support needed + 1. [Install Docker CE](https://docs.docker.com/engine/installation/linux/docker-ce/ubuntu/#install-using-the-repository) + 2. Install [nvidia-docker2](https://github.com/NVIDIA/nvidia-docker#xenial-x86_64) + +2. Install docker compose + ``` + pip install docker-compose + ``` + +### How to Run +1. [Install requirements](#setup) +2. Clone this repo +3. [Deploy Local](#local) +4. Go to [localhost](http://localhost) +5. Activate dag and trigger run + +### How to Develop Custom Dags +1. Uncomment **every** dag and plugin folder mounts in deploy/docker-compose-CeleryExecutor.yml + ``` + #- ../dags/:/usr/local/airflow/dags + #- ../plugins:/usr/local/airflow/plugins + ``` +2. Create or modify DAG inside [dags folder](https://github.com/wongwill86/air-tasks/tree/master/dags) +3. 
Check [webserver](http://localhost) to see if DAG is now updated + +See other [examples](https://github.com/wongwill86/air-tasks/tree/master/dags/examples) for inspiration + +### How to Package for Deployment +1. Build docker image + ``` + docker build -f docker/Dockerfile -t wongwill86/air-tasks: . + ``` +2. Before committing/pushing, comment **every** dag and plugin folder mounts in deploy/docker-compose-CeleryExecutor.yml + ``` + #- ../dags/:/usr/local/airflow/dags + #- ../plugins:/usr/local/airflow/plugins + ``` +3. Replace **every** air-tasks tag with your tag in deploy/docker-compose-CeleryExecutor.yml + ``` + : + image: wongwill86/air-tasks: + ``` +4. Try to [Deploy](#local) +5. Push to docker and/or wait for docker cloud to build + +### How to Deploy +#### Local +``` +docker-compose -f deploy/docker-compose-CeleryExecutor.yml up -d +``` + +#### Swarm +``` +echo '' | docker secret create basic_auth_username - +echo '' | docker secret create basic_auth_password - +echo '' | docker secret create ssl_certificate - +echo '' | docker secret create ssl_certificate_key - +docker stack deploy -c deploy/docker-compose-CeleryExecutor.yml +``` + +#### AWS +1. Initialize submodule + ``` + git submodule init + git submodule update --recursive --remote + ``` +2. Use [Cloudformation](https://console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks/new) to create a new stack. +3. Use this [cloud/latest/swarm/aws/vpc.cfn](https://raw.githubusercontent.com/wongwill86/examples/air-tasks/latest/swarm/aws/vpc.cfn) + +#### GCloud +1. Initialize submodule + ``` + git submodule init + git submodule update --recursive --remote + ``` +2. Install [gcloud](https://cloud.google.com/sdk/downloads) +3. *(Optional)* configure yaml [cloud/latest/swarm/google/cloud-deployment.yaml](https://github.com/wongwill86/examples/blob/air-tasks/latest/swarm/google/cloud-deployment.yaml) +4. Deploy using gcloud + ``` + gcloud deployment-manager deployments create --config cloud/latest/swarm/google/cloud-deployment.yaml + ``` + +## Debug Tools +[AirFlow](http://localhost) or (`/`) - Airflow Webserver + +[Celery Flower](http://localhost/flower) or (`/flower`)- Monitor Workers + +[Swarm Visualizer](http://localhost/visualizer) or (`/visualizer`)- Visualize Stack Deployment + +[RabbitMQ](http://localhost/rabbitmq) or (`/rabbitmq`)- RabbitMQ Management Plugin (Queue Info) -### Tasks +Note: if running with ssl, use https: instead of http + +### How to Test +1. [Install requirements](#setup) +2. Clone this repo +3. Build the test image + ``` + export PYTHONDONTWRITEBYTECODE=1 + export IMAGE_NAME=wongwill86/air-tasks: + docker-compose -f docker/docker-compose.test.yml -p ci build + ``` +4. Run test container + ``` + docker-compose -f docker/docker-compose.test.yml -p ci run --rm sut + ``` +5. *(Optional)* Watch / Test. + ``` + docker-compose -f docker/docker-compose.test.yml -p ci run --rm sut ptw -- --pylama + ``` + *Warning 1: if nothing runs, make sure all tests pass first* + + *Warning 2: you may need to restart if you rename/move files, especially possible if these are plugin modules* + +## Concepts: + +### Basics +#### Tasks Tasks are defined as independent and stateless units of work. A task is described by creating an [Airflow Operator](https://airflow.apache.org/concepts.html#operators) Airflow provides many operators function such as calling bash scripts, python scripts, and even calling Docker images. 
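For orientation, a task operator is only a few lines of Python. The sketch below follows the same pattern as the example DAGs shipped in `dags/examples/` (Airflow 1.9 API); the `dag_id` and `bash_command` are placeholders, not part of this repo:
```
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(seconds=2),
}

# A DAG with a single task; activate it in the webserver and trigger it manually.
dag = DAG(dag_id='example_single_task', default_args=default_args,
          schedule_interval=None)

# Each operator is an independent, stateless unit of work.
hello = BashOperator(task_id='say_hello',
                     bash_command='echo "hello from air-tasks"',
                     dag=dag)
```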
@@ -35,7 +164,7 @@ There is no guarantee that related tasks will be run on the same machine/environ When a task is being scheduled to run, a `task_instance` is created. -### DAG +#### DAG A Directed Acyclic Graph (DAG) is a static set of repeatable tasks operators that are invoked automatically or manually. DAGs are described in [Airflow DAGS]( https://airflow.apache.org/concepts.html#dags ). The nodes of this graph are the task operators and the edges describe the dependencies between them. Edges are created by setting `operator.set_upstream` or `operator.set_downstream` to and from each task operator. It should be assumed that the size of a dag is immutable ( actually its not but it gets really messy if you modify it ). DAGS themselves can also be invoked using parameters. @@ -52,15 +181,15 @@ $ airflow trigger_dag dag_id --conf '{"param_name":"param_value" }' See more examples from https://github.com/wongwill86/air-tasks/tree/master/dags/examples -#### Useful (maybe?) Patterns -##### Standard +##### Useful (maybe?) Patterns +###### Standard Create a one shot dag that is only run when manually triggered: See https://github.com/wongwill86/air-tasks/blob/master/dags/examples/interleaved.py This should be the most common use case. Should fit most needs. -##### Unbounded +###### Unbounded Two separate DAGS are created: 1. Listener DAG: Listens for command to be triggered with parameters 2. Trigger DAG: Dynamically create a list of parameters to trigger the Listener DAG @@ -69,19 +198,19 @@ See https://github.com/wongwill86/air-tasks/blob/master/dags/examples/multi_trig This should be avoided if possible since there is no good way to set fan-in dependencies for the listener DAG (possible but probably very hacky) -### Variables +#### Variables These are global variables that all task operators can have access to. See [Variables]( https://airflow.apache.org/concepts.html#variables ) -### Compose File +#### Compose File This file is a schedule of services necessary to start Air-tasks -See https://github.com/wongwill86/air-tasks/blob/master/docker/docker-compose-CeleryExecutor.yml +See https://github.com/wongwill86/air-tasks/blob/master/deploy/docker-compose-CeleryExecutor.yml This is a description of all the services for [docker-compose]( https://docs.docker.com/compose/compose-file/ ). -#### Core Components +##### Core Components * **Postgres:** Database for saving DAGs, DAG runs, Tasks, Task Instances, etc... * **RabbitMQ:** Internal queue service used to schedule tasks instances. Task instances are *only* scheduled when they are ready to run * **Webserver:** Parses DAG python files and inserts them into the database @@ -124,115 +253,6 @@ A special worker service ("worker-manager") is created in the [compose file](#co See https://github.com/wongwill86/air-tasks/blob/master/dags/manager/scaler.py for more information -## Setup -1. Install docker - * **NO** Nvidia GPU support needed - ``` - wget -qO- https://get.docker.com/ | sh - ``` - * **YES** Nvidia GPU support needed - 1. [Install Docker CE](https://docs.docker.com/engine/installation/linux/docker-ce/ubuntu/#install-using-the-repository) - 2. Install [nvidia-docker2](https://github.com/NVIDIA/nvidia-docker#xenial-x86_64) - -2. Install docker compose - ``` - pip install docker-compose - ``` -3. *(Optional: Only for development)* Build docker image - ``` - docker build -f docker/Dockerfile -t wongwill86/air-tasks: . - ``` - -## Where to Start -1. [Install requirements](#setup) -2. Clone this repo -3. 
Uncomment **every** dag and plugin folder mounts in docker/docker-compose-CeleryExecutor.yml - ``` - #- ../dags/:/usr/local/airflow/dags - #- ../plugins:/usr/local/airflow/plugins - ``` -3. *(Optional: Only for development)* Replace **every** air-tasks tag with your tag in docker/docker-compose-CeleryExecutor.yml - ``` - : - image: wongwill86/air-tasks: - ``` -4. *(Optional: Only for development)* Create your DAG inside [dags folder](https://github.com/wongwill86/air-tasks/tree/master/dags) -5. *(Optional: Only for development)* Start [Tests](#how-to-test) -6. [Deploy Local](#local) -7. Go to [localhost](http://localhost) -8. Activate dag and trigger run - -See other [examples](https://github.com/wongwill86/air-tasks/tree/master/dags/examples) for inspiration - -## How to Test -1. [Install requirements](#setup) -2. Clone this repo -2. Build the test image - ``` - export PYTHONDONTWRITEBYTECODE=1 - export IMAGE_NAME=wongwill86/air-tasks: - docker-compose -f docker/docker-compose.test.yml -p ci build - ``` -3. Run test container - ``` - docker-compose -f docker/docker-compose.test.yml -p ci run --rm sut - ``` -4. *(Optional)* Watch / Test. - ``` - docker-compose -f docker/docker-compose.test.yml -p ci run --rm sut ptw -- --pylama - ``` - *Warning 1: if nothing runs, make sure all tests pass first* - - *Warning 2: you may need to restart if you rename/move files, especially possible if these are plugin modules* - -## How to Deploy -### Local -``` -docker-compose -f docker/docker-compose-CeleryExecutor.yml up -d -``` - -### Swarm -``` -echo '' | docker secret create basic_auth_username - -echo '' | docker secret create basic_auth_password - -echo '' | docker secret create ssl_certificate - -echo '' | docker secret create ssl_certificate_key - -docker stack deploy -c docker/docker-compose-CeleryExecutor.yml -``` - -### AWS -1. *(Optional)* Initialize submodule - ``` - git submodule init - git submodule update --recursive --remote - ``` -2. Use [Cloudformation](https://console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks/new) to create a new stack. -3. Use this [cloud/latest/swarm/aws/vpc.cfn](https://raw.githubusercontent.com/wongwill86/examples/air-tasks/latest/swarm/aws/vpc.cfn) - -### GCloud -1. *(Optional)* Initialize submodule - ``` - git submodule init - git submodule update --recursive --remote - ``` -2. Install [gcloud](https://cloud.google.com/sdk/downloads) -3. *(Optional)* configure yaml [cloud/latest/swarm/google/cloud-deployment.yaml](https://github.com/wongwill86/examples/blob/air-tasks/latest/swarm/google/cloud-deployment.yaml) -4. Deploy using gcloud - ``` - gcloud deployment-manager deployments create --config cloud/latest/swarm/google/cloud-deployment.yaml - ``` - -## Debug Tools -[AirFlow](http://localhost) or (`/`) - Airflow Webserver - -[Celery Flower](http://localhost/flower) or (`/flower`)- Monitor Workers - -[Swarm Visualizer](http://localhost/visualizer) or (`/visualizer`)- Visualize Stack Deployment - -[RabbitMQ](http://localhost/rabbitmq) or (`/rabbitmq`)- RabbitMQ Management Plugin (Queue Info) - -Note: if running with ssl, use https: instead of http - ## Notes ### Nvidia GPU Docker Support #### Setup Notes @@ -295,7 +315,7 @@ If you need to run tasks on different machine instance types, this can be achiev queue='other-instance-type', dag=dag) ``` -2. In the *[docker compose file](https://github.com/wongwill86/air-tasks/blob/gpu/docker/docker-compose-CeleryExecutor.yml)*, create a new service copied and pasted from `worker-worker`. 
This will create workers that will listen to this new queue topic (`other-instance-type`) and only deploy on machines with the docker engine label: `[ engine.labels.infrakit-role == other-instance-type ]`. +2. In the *[docker compose file](https://github.com/wongwill86/air-tasks/blob/master/deploy/docker-compose-CeleryExecutor.yml)*, create a new service copied and pasted from `worker-worker`. This will create workers that will listen to this new queue topic (`other-instance-type`) and only deploy on machines with the docker engine label: `[ engine.labels.infrakit-role == other-instance-type ]`. ``` worker-other-instance-type: @@ -327,3 +347,37 @@ To access a private AWS container registry, remember to set aws environment vari Docker login to AWS ECR will automatically be set up. +### Base Images +The main [Dockerfile](https://github.com/wongwill86/air-tasks/blob/master/docker/Dockerfile) is built on top of one of two base images: +* [Alpine](https://github.com/wongwill86/air-tasks/blob/master/docker/base/Dockerfile.base-alpine) (default) +* [Slim](https://github.com/wongwill86/air-tasks/blob/master/docker/base/Dockerfile.base-slim) + +Additionally, this base image is used to build the test image which includes python test libraries. This is useful for testing derived images so that the test libraries do not need to be reinstalled. See [docker-compose.test.yml](https://github.com/wongwill86/air-tasks/blob/master/docker/docker-compose.test.yml) for more details. These base images should automatically be built in docker cloud. + +If for any reason you require building new base images: +1. Build the base image + ``` + docker build -f docker/base/Dockerfile.base-alpine -t wongwill86/air-tasks: . + ``` +2. Prepare base image to build test base image + ``` + export IMAGE_NAME=wongwill86/air-tasks: + ``` +3. Build test base image + ``` + docker-compose -f docker/docker-compose.test.yml -p ci_base build + ``` +4. Retag built test base image for testing + ``` + docker tag ci_base_sut wongwill86/air-tasks:-test + ``` + +## Docker Cloud + +| Source Type | Source | Docker Tag | Dockerfile location | Build Context +| - | - | - | - | - | +| Branch | master | latest | Dockerfile | / | +| Branch | /^(.{1,5}|.{7}|([^m]|m[^a]|ma[^s]|mas[^t]|mast[^e]|maste[^r])(.*))$/ | {source-ref} | docker/Dockerfile | / | +| Tag | /^base-([0-9.a-zA-Z-]+)$/ | base-alpine-{\1} | docker/base/Dockerfile.base-alpine | / | +| Tag | /^base-([0-9.a-zA-Z-]+)$/ | base-slim-{\1} | docker/base/Dockerfile.base-slim | / | +| Tag | /^v([0-9.a-zA-Z-]+)$/ | release-v{\1} | docker/Dockerfile | / | diff --git a/airflow.cfg b/airflow.cfg new file mode 120000 index 00000000..c7e1d73f --- /dev/null +++ b/airflow.cfg @@ -0,0 +1 @@ +./config/airflow.cfg \ No newline at end of file diff --git a/cloud b/cloud index 32bd89a6..45bf0bf3 160000 --- a/cloud +++ b/cloud @@ -1 +1 @@ -Subproject commit 32bd89a6b4c53039b56ca381255e59bbc626406b +Subproject commit 45bf0bf392feffe504731f73dc6d4b0350bd66cc diff --git a/docker/config/airflow.cfg b/config/airflow.cfg similarity index 81% rename from docker/config/airflow.cfg rename to config/airflow.cfg index 01fc2c0b..4efc68fc 100644 --- a/docker/config/airflow.cfg +++ b/config/airflow.cfg @@ -12,15 +12,27 @@ dags_folder = /usr/local/airflow/dags base_log_folder = /usr/local/airflow/logs # Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users -# must supply a remote location URL (starting with either 's3://...' 
or -# 'gs://...') and an Airflow connection id that provides access to the storage +# must supply an Airflow connection id that provides access to the storage # location. -remote_base_log_folder = remote_log_conn_id = -# Use server-side encryption for logs stored in S3 encrypt_s3_logs = False -# DEPRECATED option for remote log storage, use remote_base_log_folder instead! -s3_log_folder = + +# Logging level +logging_level = INFO + +# Logging class +# Specify the class that will specify the logging configuration +# This class has to be on the python classpath +# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG +logging_config_class = + +# Log format +log_format = [%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s +simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s + +# Default timezone in case supplied date times are naive +# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam) +default_timezone = utc # The executor class that airflow should use. Choices include # SequentialExecutor, LocalExecutor, CeleryExecutor @@ -40,6 +52,10 @@ sql_alchemy_pool_size = 300 # not apply to sqlite. sql_alchemy_pool_recycle = 3600 +# How many seconds to retry re-establishing a DB connection after +# disconnects. Setting this to 0 disables retries. +sql_alchemy_reconnect_timeout = 300 + # The amount of parallelism as a setting to the executor. This defines # the max number of task instances that should run simultaneously # on this airflow installation @@ -85,10 +101,26 @@ default_impersonation = # What security module to use (for example kerberos): security = +# If set to False enables some unsecure features like Charts and Ad Hoc Queries. +# In 2.0 will default to True. +secure_mode = False + # Turn unit test mode on (overwrites many configuration options with test # values at runtime) unit_test_mode = False +# Name of handler to read task instance logs. +# Default to use file task handler. +task_log_reader = file.task + +# Whether to enable pickling for xcom (note that this is insecure and allows for +# RCE exploits). This will be deprecated in Airflow 2.0 (be forced to False). +enable_xcom_pickling = True + +# When a task is killed forcefully, this is the amount of time in seconds that +# it has to cleanup after it is sent a SIGTERM, before it is SIGKILLED +killed_task_cleanup_time = 60 + [cli] # In what way should the cli access the API. The LocalClient will use the # database directly, while the json_client will use the api running on the @@ -109,6 +141,7 @@ default_ram = 512 default_disk = 512 default_gpus = 0 + [webserver] # The base url of your website as airflow cannot guess what domain or # cname you are using. This is used in automated emails that @@ -152,7 +185,7 @@ access_logfile = - error_logfile = - # Expose the configuration file in the web server -expose_config = True +expose_config = False # Set to true to turn on authentication: # http://pythonhosted.org/airflow/security.html#web-authentication @@ -168,6 +201,10 @@ filter_by_owner = False # in order to user the ldapgroup mode. owner_mode = user +# Default DAG view. Valid values are: +# tree, graph, duration, gantt, landing_times +dag_default_view = tree + # Default DAG orientation. 
Valid values are: # LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top) dag_orientation = LR @@ -184,9 +221,13 @@ log_fetch_timeout_sec = 5 # DAGs by default hide_paused_dags_by_default = False +# Consistent page size across all listing views in the UI +page_size = 100 + [email] email_backend = airflow.utils.email.send_email_smtp + [smtp] # If you want airflow to send emails on retries, failure, and you want to use # the airflow.utils.email.send_email_smtp function, you have to configure an @@ -224,7 +265,7 @@ worker_log_server_port = 8793 # a sqlalchemy database. Refer to the Celery documentation for more # information. #broker_url = redis://redis:6379/1 -broker_url = pyamqp://rabbitmq/ +broker_url = amqp://rabbitmq:5672 # Another key Celery setting celery_result_backend = db+postgresql://airflow:airflow@postgres/airflow @@ -239,6 +280,16 @@ flower_port = 5555 # Default queue that tasks get assigned to and that worker listen on. default_queue = worker +# Import path for celery configuration options +celery_config_options = celeryconfig.CELERY_CONFIG + +[dask] +# This section only applies if you are using the DaskExecutor in +# [core] section above + +# The IP address and port of the Dask cluster's scheduler. +cluster_address = 127.0.0.1:8786 + [scheduler] # Task instances listen for external kill signal (when you clear tasks # from the CLI or the UI), this defines the frequency at which they should @@ -276,6 +327,10 @@ scheduler_zombie_task_threshold = 300 # however it can be set on a per DAG basis in the # DAG definition (catchup) catchup_by_default = False +# This changes the batch size of queries in the scheduling main loop. +# This depends on query length limits and how long you are willing to hold locks. +# 0 for no limit +max_tis_per_query = 500 # Statsd (https://github.com/etsy/statsd) integration settings statsd_on = False @@ -286,10 +341,24 @@ statsd_prefix = airflow # The scheduler can run multiple threads in parallel to schedule dags. # This defines how many threads will run. However airflow will never # use more threads than the amount of cpu cores available. -max_threads = 2 +max_threads = 8 authenticate = False +[ldap] +# set this to ldaps://: +uri = +user_filter = objectClass=* +user_name_attr = uid +group_member_attr = memberOf +superuser_filter = +data_profiler_filter = +bind_user = cn=Manager,dc=example,dc=com +bind_password = insecure +basedn = dc=example,dc=com +cacert = /etc/ca/ldap_ca.crt +search_scope = LEVEL + [mesos] # Mesos master address which MesosExecutor will connect to. master = localhost:5050 @@ -327,6 +396,7 @@ authenticate = False # default_principal = admin # default_secret = admin + [kerberos] ccache = /tmp/airflow_krb5_ccache # gets augmented with fqdn @@ -335,9 +405,11 @@ reinit_frequency = 3600 kinit_path = kinit keytab = airflow.keytab + [github_enterprise] api_rev = v3 + [admin] # UI to hide sensitive variable fields when set to True hide_sensitive_variable_fields = True diff --git a/config/celeryconfig.py b/config/celeryconfig.py new file mode 100644 index 00000000..dcbc9916 --- /dev/null +++ b/config/celeryconfig.py @@ -0,0 +1,19 @@ +from airflow import configuration + +# Broker settings. 
+CELERY_CONFIG = { + 'accept_content': ['json', 'pickle'], + 'event_serializer': 'json', + 'result_serializer': 'pickle', + 'task_serializer': 'pickle', + 'worker_prefetch_multiplier': 1, + 'task_acks_late': True, + 'task_reject_on_worker_lost': True, + 'broker_url': configuration.get('celery', 'broker_url'), + 'result_backend': configuration.get('celery', 'CELERY_RESULT_BACKEND'), + 'worker_concurrency': + configuration.getint('celery', 'CELERYD_CONCURRENCY'), + 'task_default_queue': configuration.get('celery', 'DEFAULT_QUEUE'), + 'task_default_exchange': configuration.get('celery', 'DEFAULT_QUEUE'), + 'worker_send_task_events': True +} diff --git a/dags/chunkflow/noop.py b/dags/chunkflow/noop.py deleted file mode 100644 index 3975e773..00000000 --- a/dags/chunkflow/noop.py +++ /dev/null @@ -1,42 +0,0 @@ -from airflow import DAG -from datetime import datetime, timedelta -from airflow.operators.dummy_operator import DummyOperator -from airflow.operators.chunkflow_plugin import chunkflow_subdag_from_file -# logging.basicConfig(level=logging.INFO) - -DAG_NAME = 'chunkflow_noop' - -TASKS_FILENAME = "./dags/chunkflow/tasks/noop_tasks.txt" - -default_args = { - 'owner': 'airflow', - 'depends_on_past': False, - 'start_date': datetime(2017, 5, 1), - 'cactchup_by_default': False, - 'retries': 1, - 'retry_delay': timedelta(seconds=2), - 'retry_exponential_backoff': True, -} - -dag = DAG( - dag_id=DAG_NAME, - default_args=default_args, - schedule_interval=None -) - -start = DummyOperator( - task_id='start', - default_args=default_args, - dag=dag -) - -chunkflow_subdag = chunkflow_subdag_from_file(TASKS_FILENAME, - task_id="noop_tasks", - image_version='raw_json_task', - default_args=default_args, - dag=dag) - -end = DummyOperator(task_id='end', default_args=default_args, dag=dag) - -start.set_downstream(chunkflow_subdag) -chunkflow_subdag.set_downstream(end) diff --git a/dags/chunkflow/tasks/noop_tasks.txt b/dags/chunkflow/tasks/noop_tasks.txt deleted file mode 100644 index 1cf80c49..00000000 --- a/dags/chunkflow/tasks/noop_tasks.txt +++ /dev/null @@ -1,11 +0,0 @@ -use default region: us-east-1 -argDict = Dict{Symbol,Any}(Pair{Symbol,Any}(:shutdown,false),Pair{Symbol,Any}(:workernumber,1),Pair{Symbol,Any}(:workerwaittime,1),Pair{Symbol,Any}(:stride,[1024,1024,128,0]),Pair{Symbol,Any}(:deviceid,nothing),Pair{Symbol,Any}(:queuename,""),Pair{Symbol,Any}(:origin,[10240,10240,1,1]),Pair{Symbol,Any}(:gridsize,[2,2,2,1]),Pair{Symbol,Any}(:task,"test/edges/readnop.json"),Pair{Symbol,Any}(:continuefrom,Int64[])) -PRINT TASK JSONS (no queue has been set) -{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,10240,1,1],"chunkSize":[4,4,4],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://seunglab-test/test_dataset/image/4_4_40/"},"outputs":{"data":"img"}}} -{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[11264,10240,1,1],"chunkSize":[4,4,4],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://seunglab-test/test_dataset/image/4_4_40/"},"outputs":{"data":"img"}}} -{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,11264,1,1],"chunkSize":[4,4,4],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://seunglab-test/test_dataset/image/4_4_40/"},"outputs":{"data":"img"}}} 
-{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[11264,11264,1,1],"chunkSize":[4,4,4],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://seunglab-test/test_dataset/image/4_4_40/"},"outputs":{"data":"img"}}} -{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,10240,129,1],"chunkSize":[4,4,4],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://seunglab-test/test_dataset/image/4_4_40/"},"outputs":{"data":"img"}}} -{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[11264,10240,129,1],"chunkSize":[4,4,4],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://seunglab-test/test_dataset/image/4_4_40/"},"outputs":{"data":"img"}}} -{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,11264,129,1],"chunkSize":[4,4,4],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://seunglab-test/test_dataset/image/4_4_40/"},"outputs":{"data":"img"}}} -{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[11264,11264,129,1],"chunkSize":[4,4,4],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://seunglab-test/test_dataset/image/4_4_40/"},"outputs":{"data":"img"}}} diff --git a/dags/examples/docker_runtime_nvidia.py b/dags/examples/docker_runtime_nvidia.py index 96b01464..69894e49 100644 --- a/dags/examples/docker_runtime_nvidia.py +++ b/dags/examples/docker_runtime_nvidia.py @@ -13,7 +13,7 @@ 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2017, 5, 1), - 'cactchup_by_default': False, + 'catchup_by_default': False, 'retries': 1, 'retry_delay': timedelta(seconds=2), 'retry_exponential_backoff': True, diff --git a/dags/examples/docker_with_variables.py b/dags/examples/docker_with_variables.py index 990c7499..cdf5418b 100644 --- a/dags/examples/docker_with_variables.py +++ b/dags/examples/docker_with_variables.py @@ -10,7 +10,7 @@ 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2017, 5, 1), - 'cactchup_by_default': False, + 'catchup_by_default': False, 'retries': 1, 'retry_delay': timedelta(seconds=2), 'retry_exponential_backoff': True, diff --git a/dags/examples/interleaved.py b/dags/examples/interleaved.py index b68da043..45bcf06a 100644 --- a/dags/examples/interleaved.py +++ b/dags/examples/interleaved.py @@ -7,7 +7,7 @@ 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2017, 5, 1), - 'cactchup_by_default': False, + 'catchup_by_default': False, 'retries': 1, 'retry_delay': timedelta(seconds=2), 'retry_exponential_backoff': True, diff --git a/dags/examples/multi_trigger.py b/dags/examples/multi_trigger.py index d0997938..cc1d65e4 100644 --- a/dags/examples/multi_trigger.py +++ b/dags/examples/multi_trigger.py @@ -10,7 +10,7 @@ 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2017, 5, 1), - 'cactchup_by_default': False, + 'catchup_by_default': False, 'retries': 1, 'retry_delay': timedelta(seconds=2), 'retry_exponential_backoff': True, @@ -24,8 +24,8 @@ ) -def param_generator(): - iterable = xrange(0, 100) +def get_generator(): + iterable = range(0, 100) for i in iterable: yield i @@ -33,7 +33,7 @@ def param_generator(): operator = MultiTriggerDagRunOperator( task_id='trigger_%s' % TARGET_DAG_ID, trigger_dag_id=TARGET_DAG_ID, - params_list=param_generator(), + params_list=get_generator, default_args=default_args, dag=scheduler_dag) diff --git a/dags/manager/scaler.py b/dags/manager/scaler.py index 379ba475..ae11912c 100644 
--- a/dags/manager/scaler.py +++ b/dags/manager/scaler.py @@ -18,6 +18,8 @@ from airflow import models import requests import json +import logging +logger = logging.root.getChild(__name__) DAG_ID = 'z_manager_cluster_scaler' @@ -52,7 +54,7 @@ {% set queue_sizes = task_instance.xcom_pull(task_ids=params.task_id) %} {% set docker_compose_command='docker-compose -f ' + - conf.get('core', 'airflow_home') + '/docker/docker-compose-CeleryExecutor.yml' + + conf.get('core', 'airflow_home') + '/deploy/docker-compose-CeleryExecutor.yml' + ' up -d --no-recreate --no-deps --no-build --no-color' %} {% @@ -122,8 +124,8 @@ def get_queue_sizes(): stats = json.loads(response.text) size = stats['messages_ready'] + stats['messages_unacknowledged'] queue_sizes[queue_name] = size - except Exception as e: - print('No tasks found for %s because %s' % (queue_name, e.message)) + except Exception: + logger.exception('No tasks found for %s', queue_name) queue_sizes[queue_name] = 0 return queue_sizes diff --git a/docker/docker-compose-CeleryExecutor.yml b/deploy/docker-compose-CeleryExecutor.yml similarity index 84% rename from docker/docker-compose-CeleryExecutor.yml rename to deploy/docker-compose-CeleryExecutor.yml index c64c1d5d..b8441fc6 100644 --- a/docker/docker-compose-CeleryExecutor.yml +++ b/deploy/docker-compose-CeleryExecutor.yml @@ -21,33 +21,46 @@ services: constraints: [ engine.labels.infrakit-role == manager ] command: postgres -c max_connections=1000 -c shared_buffers=4096MB + # single shot initialize the database + init-db: + image: wongwill86/air-tasks:latest + restart: on-failure + depends_on: + - postgres + - rabbitmq + environment: + - FERNET_KEY=Z3jDcE-i0gRc7-A0ETMDUxd1MuL3Ye0tVcdDl5zmnec= + volumes: + # Warning mounting dags/plugins not working for autoscaler + #- ../dags/:/usr/local/airflow/dags + #- ../plugins:/usr/local/airflow/plugins + #- ../config/airflow.cfg:/usr/local/airflow/airflow.cfg + - /var/run/docker.sock:/var/run/docker.sock + #ports: + #- "8080:8080" + command: airflow initdb + deploy: + placement: + constraints: [ engine.labels.infrakit-role == manager ] + webserver: image: wongwill86/air-tasks:latest restart: always depends_on: - postgres - rabbitmq + - init-db environment: - - LOAD_EX=y - - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho= - - EXECUTOR=Celery - - AIRFLOW_HOME=/usr/local/airflow - - AWS_ACCESS_KEY_ID - - AWS_SECRET_ACCESS_KEY - - AWS_DEFAULT_REGION - - REDIS_HOST=rabbitmq - - REDIS_PORT=5672 - # - POSTGRES_USER=airflow - # - POSTGRES_PASSWORD=airflow - # - POSTGRES_DB=airflow + - FERNET_KEY=Z3jDcE-i0gRc7-A0ETMDUxd1MuL3Ye0tVcdDl5zmnec= volumes: # Warning mounting dags/plugins not working for autoscaler #- ../dags/:/usr/local/airflow/dags #- ../plugins:/usr/local/airflow/plugins + #- ../config/airflow.cfg:/usr/local/airflow/airflow.cfg - /var/run/docker.sock:/var/run/docker.sock #ports: #- "8080:8080" - command: webserver + command: airflow webserver deploy: placement: constraints: [ engine.labels.infrakit-role == manager ] @@ -61,13 +74,9 @@ services: restart: always depends_on: - rabbitmq - environment: - - EXECUTOR=Celery - - REDIS_HOST=rabbitmq - - REDIS_PORT=5672 #ports: #- "5555:5555" - command: flower + command: airflow flower deploy: placement: constraints: [ engine.labels.infrakit-role == manager ] @@ -81,21 +90,11 @@ services: # Warning mounting dags/plugins not working for autoscaler #- ../dags/:/usr/local/airflow/dags #- ../plugins:/usr/local/airflow/plugins + #- ../config/airflow.cfg:/usr/local/airflow/airflow.cfg - 
/var/run/docker.sock:/var/run/docker.sock environment: - - LOAD_EX=y - - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho= - - EXECUTOR=Celery - - AIRFLOW_HOME=/usr/local/airflow - - AWS_ACCESS_KEY_ID - - AWS_SECRET_ACCESS_KEY - - AWS_DEFAULT_REGION - - REDIS_HOST=rabbitmq - - REDIS_PORT=5672 - # - POSTGRES_USER=airflow - # - POSTGRES_PASSWORD=airflow - # - POSTGRES_DB=airflow - command: scheduler + - FERNET_KEY=Z3jDcE-i0gRc7-A0ETMDUxd1MuL3Ye0tVcdDl5zmnec= + command: airflow scheduler deploy: placement: constraints: [ engine.labels.infrakit-role == manager ] @@ -119,21 +118,15 @@ services: # Warning mounting dags/plugins not working for autoscaler #- ../dags/:/usr/local/airflow/dags #- ../plugins:/usr/local/airflow/plugins + #- ../config/airflow.cfg:/usr/local/airflow/airflow.cfg - /var/run/docker.sock:/var/run/docker.sock - /tmp:/tmp environment: - - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho= - - EXECUTOR=Celery - - AIRFLOW_HOME=/usr/local/airflow + - FERNET_KEY=Z3jDcE-i0gRc7-A0ETMDUxd1MuL3Ye0tVcdDl5zmnec= - AWS_ACCESS_KEY_ID - AWS_SECRET_ACCESS_KEY - AWS_DEFAULT_REGION - - REDIS_HOST=rabbitmq - - REDIS_PORT=5672 - # - POSTGRES_USER=airflow - # - POSTGRES_PASSWORD=airflow - # - POSTGRES_DB=airflow - command: worker -q worker + command: airflow worker -q worker deploy: mode: global placement: @@ -148,23 +141,17 @@ services: # Warning mounting dags/plugins not working for autoscaler #- ../dags/:/usr/local/airflow/dags #- ../plugins:/usr/local/airflow/plugins + #- ../config/airflow.cfg:/usr/local/airflow/airflow.cfg - /var/run/docker.sock:/var/run/docker.sock - /tmp:/tmp environment: - - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho= - - EXECUTOR=Celery - - AIRFLOW_HOME=/usr/local/airflow + - FERNET_KEY=Z3jDcE-i0gRc7-A0ETMDUxd1MuL3Ye0tVcdDl5zmnec= - AWS_ACCESS_KEY_ID - AWS_SECRET_ACCESS_KEY - AWS_DEFAULT_REGION - - REDIS_HOST=rabbitmq - - REDIS_PORT=5672 - # - POSTGRES_USER=airflow - # - POSTGRES_PASSWORD=airflow - # - POSTGRES_DB=airflow - INFRAKIT_IMAGE - INFRAKIT_GROUPS_URL - command: worker -q manager + command: airflow worker -q manager deploy: placement: constraints: [ engine.labels.infrakit-role == manager ] @@ -294,11 +281,7 @@ services: depends_on: - scheduler environment: - - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho= - - EXECUTOR=Celery - - AIRFLOW_HOME=/usr/local/airflow - - REDIS_HOST=rabbitmq - - REDIS_PORT=5672 + - FERNET_KEY=Z3jDcE-i0gRc7-A0ETMDUxd1MuL3Ye0tVcdDl5zmnec= entrypoint: python /secrets_to_airflow_variables.py deploy: restart_policy: diff --git a/docker/Dockerfile b/docker/Dockerfile index 9d748842..89dcb741 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,90 +1,52 @@ -# VERSION 0.0.1 +# VERSION 0.0.4 # AUTHOR: Will Wong # DESCRIPTION: Docker airflow with ECR registry and DooD (Docker outside of Dkr) # BUILD: docker build --rm -t wongwill86/air-tasks . 
# SOURCE: https://github.com/wongwill86/air-tasks -# Compile AWS credential helper -FROM golang:1.8.3 as aws_ecr_credential_helper -WORKDIR /go/src/github.com/awslabs/ -RUN git clone https://github.com/awslabs/amazon-ecr-credential-helper.git -WORKDIR /go/src/github.com/awslabs/amazon-ecr-credential-helper -RUN make +ARG BASE_DIST=alpine +ARG BASE_TAG=0.0.4-customAF +ARG BASE_TEST= -FROM puckel/docker-airflow:1.8.1 +FROM wongwill86/air-tasks:base-${BASE_DIST}-${BASE_TAG}${BASE_TEST} MAINTAINER wongwill86 +ARG BASE_DIST +ARG BASE_TAG ARG AIRFLOW_HOME=/usr/local/airflow ENV AIRFLOW_HOME ${AIRFLOW_HOME} -ENV AIRFLOW_USER airflow +ARG AIRFLOW_USER=airflow +ENV AIRFLOW_USER ${AIRFLOW_USER} -USER root -RUN curl -fsSL https://get.docker.com/ | sh \ - && pip install docker docker-compose \ - && pip install celery -U \ - && apt-get install sudo \ - && buildDeps=' \ - python-dev \ - libkrb5-dev \ - libsasl2-dev \ - libssl-dev \ - libffi-dev \ - build-essential \ - libblas-dev \ - liblapack-dev \ - libpq-dev \ - git \ - ' && apt-get remove --purge -yqq $buildDeps \ - && apt-get clean \ - && rm -rf \ - /var/lib/apt/lists/* \ - /tmp/* \ - /var/tmp/* \ - /usr/share/man \ - /usr/share/doc \ - /usr/share/doc-base - -# SUPER HACK PLEASE REMOVE AFTER AIRFLOW UPDATES (i.e. https://github.com/apache/incubator-airflow/pull/2417) -RUN sed -i -e 's/import Client/import APIClient as Client/' /usr/local/lib/python2.7/dist-packages/airflow/operators/docker_operator.py - -RUN usermod -aG docker airflow - -# unfortunately this is required to update the container docker gid to match the -# host's gid, we remove this permission from entrypoint-dood.sh script -RUN echo "airflow ALL=NOPASSWD: ALL" >> /etc/sudoers - -# this is to enable aws ecr credentials helpers to reauthorize docker -RUN mkdir -p ${AIRFLOW_HOME}/.docker/ \ - && echo '{\n "credsStore": "ecr-login"\n}' > \ - ${AIRFLOW_HOME}/.docker/config.json -RUN chown airflow ${AIRFLOW_HOME}/.docker/config.json - -# copy the built docker credentials module to this container -COPY --from=aws_ecr_credential_helper \ - /go/src/github.com/awslabs/amazon-ecr-credential-helper/bin/local/docker-credential-ecr-login \ - /usr/local/bin - -COPY docker/docker-compose-CeleryExecutor.yml ${AIRFLOW_HOME}/docker/docker-compose-CeleryExecutor.yml -RUN chown airflow ${AIRFLOW_HOME}/docker/docker-compose-CeleryExecutor.yml - -COPY docker/scripts/ / - -COPY docker/config/airflow.cfg ${AIRFLOW_HOME}/airflow.cfg -RUN chown airflow ${AIRFLOW_HOME}/airflow.cfg - -ENV CELERY_CONFIG_MODULE celeryconfig -COPY docker/config/celeryconfig.py ${AIRFLOW_HOME}/celeryconfig.py -RUN chown airflow ${AIRFLOW_HOME}/celeryconfig.py - -ENV CELERY_LOADER celery_loader:CeleryLoader -COPY docker/config/celery_loader.py ${AIRFLOW_HOME}/celery_loader.py -RUN chown airflow ${AIRFLOW_HOME}/celery_loader.py - -COPY plugins/ ${AIRFLOW_HOME}/plugins/ - -COPY dags/ ${AIRFLOW_HOME}/dags/ -RUN chown airflow ${AIRFLOW_HOME}/dags +WORKDIR ${AIRFLOW_HOME} -USER airflow +# Move base version information directly into airflow home directory +RUN mkdir -p ${AIRFLOW_HOME}/version +RUN cp -r /version ${AIRFLOW_HOME} + +# prepare airflow user +RUN groupadd -r ${AIRFLOW_USER} \ + && useradd -r -d ${AIRFLOW_HOME} -g ${AIRFLOW_USER} -s /bin/bash ${AIRFLOW_USER} \ + && chown -R ${AIRFLOW_USER}: ${AIRFLOW_HOME} \ + && usermod -aG docker ${AIRFLOW_USER} \ + # unfortunately this is required to update the container docker gid to match the + # host's gid, we remove this permission from entrypoint-dood.sh script + && echo "${AIRFLOW_USER} 
ALL=NOPASSWD: ALL" >> /etc/sudoers + +# Docker config (i.e. credentials helper from base image) +RUN cp /.docker/ .docker -r +# Copy repo contents into docker image ( see .dockerignore to see skipped folders ) +COPY . . + +# Get commit hash and tags these three files need to be excluded from .dockerignore: +# .git/refs/heads/ +# .git/refs/tags/ +# .git/HEAD +RUN cat .git/refs/$(cat .git/HEAD | sed -e's/ref: refs\///g') > version/COMMIT \ + && grep $(cat version/COMMIT) .git/refs/tags/* -l | xargs -n 1 -r basename > version/TAGS \ + && mv version/COMMIT version/air-tasks.COMMIT.${BASE_DIST}-${BASE_TAG} \ + && mv version/TAGS version/air-tasks.TAGS.${BASE_DIST}-${BASE_TAG} + +USER ${AIRFLOW_USER} WORKDIR ${AIRFLOW_HOME} -ENTRYPOINT ["/entrypoint-dood.sh"] +ENTRYPOINT ["scripts/entrypoint-dood.sh"] diff --git a/docker/Dockerfile.test b/docker/Dockerfile.test deleted file mode 100644 index 053bc6ac..00000000 --- a/docker/Dockerfile.test +++ /dev/null @@ -1,9 +0,0 @@ -ARG image_name -FROM $image_name -ARG IMAGE_NAME -USER root -COPY docker/scripts/entrypoint-test.sh /entrypoint-test.sh -RUN pip install pytest pytest-watch pytest-env pytest-cov-exclude pytest-xdist pylama mock -ENV PYTHONDONTWRITEBYTECODE 1 -USER airflow -ENTRYPOINT ["/entrypoint-test.sh"] diff --git a/docker/base/Dockerfile.base-alpine b/docker/base/Dockerfile.base-alpine new file mode 100644 index 00000000..dc52044b --- /dev/null +++ b/docker/base/Dockerfile.base-alpine @@ -0,0 +1,85 @@ +# VERSION 0.0.4-customAF +# AUTHOR: Will Wong +# DESCRIPTION: Alpine base image with dockerized airflow and ECR registry and DooD (Docker outside of Docker) +# BUILD: docker build --rm -t wongwill86/air-tasks:base-alpine -f /docker/base/Dockerfile.base-slim . +# SOURCE: https://github.com/wongwill86/air-tasks + +# Compile AWS credential helper +FROM golang:1.8.3 as aws_ecr_credential_helper +WORKDIR /go/src/github.com/awslabs/ +RUN git clone https://github.com/awslabs/amazon-ecr-credential-helper.git +WORKDIR /go/src/github.com/awslabs/amazon-ecr-credential-helper +RUN make + +FROM python:3.6-alpine3.7 +LABEL maintainer=wongwill86 + +ARG AIRFLOW_VERSION=1.9.0 +ARG DOCKER_VERSION=17.12.0-ce + +RUN set -ex \ + && apk add --no-cache bash wget shadow sudo \ + && apk add --no-cache --virtual .build-dependencies \ + build-base \ + python3-dev \ + postgresql-dev \ + libffi-dev \ + openblas-dev \ + libxslt-dev \ + libxml2-dev \ + linux-headers \ + git \ + && wget https://download.docker.com/linux/static/stable/x86_64/docker-${DOCKER_VERSION}.tgz \ + && tar -xvf docker-${DOCKER_VERSION}.tgz \ + && mv docker/* /usr/local/bin/ \ + && rm -rf docker docker-${DOCKER_VERSION}.tgz docker \ + # Temporarily use this performance branch of airflow instead of pip install apache-airflow[cyrpto,celery,postgres]==${AIRFLOW_VERSION} + && git clone https://github.com/wongwill86/incubator-airflow.git --depth 1 -b v1-9-stable-performance \ + # Deleting this symlink not handled correctly by shutil.copy + && rm -rf incubator-airflow/airflow/www/static/docs \ + && pip install incubator-airflow/[crypto,celery,postgres] \ + && rm -rf incubator-airflow \ + && pip install docker-compose docker \ + # this is really only needed for testing (pytest-cov-exclude), include here so we don't need gcc for test build + && pip install ujson \ + # SUPER HACK PLEASE REMOVE AFTER AIRFLOW UPDATES (i.e. 
https://github.com/apache/incubator-airflow/pull/2417) + && sed -i -e 's/import Client/import APIClient as Client/' /usr/local/lib/python3.6/site-packages/airflow/operators/docker_operator.py \ + && sed -i -e 's/import Client/import APIClient as Client/' /usr/local/lib/python3.6/site-packages/airflow/hooks/docker_hook.py \ + && find /usr/local \ + \( -type d -a -name test -o -name tests \) \ + -o \( -type f -a -name '*.pyc' -o -name '*.pyo' \) \ + -exec rm -rf '{}' + \ + && runDeps="$( \ + scanelf --needed --nobanner --recursive /usr/local \ + | awk '{ gsub(/,/, "\nso:", $2); print "so:" $2 }' \ + | sort -u \ + | xargs -r apk info --installed \ + | sort -u \ + )" \ + && apk add --virtual .run-dependencies $runDeps \ + && apk del .build-dependencies + +# Prep docker group +RUN delgroup ping +RUN addgroup -g 999 docker + +# copy the built docker credentials module to this container +COPY --from=aws_ecr_credential_helper \ + /go/src/github.com/awslabs/amazon-ecr-credential-helper/bin/local/docker-credential-ecr-login \ + /usr/local/bin + +# this is to enable aws ecr credentials helpers to reauthorize docker +RUN mkdir -p /.docker/ \ + && echo '{\n "credsStore": "ecr-login"\n}' > \ + /.docker/config.json + +# Get commit hash and tags these three files need to be excluded from .dockerignore: +# .git/refs/heads/ +# .git/refs/tags/ +# .git/HEAD +COPY .git .git +RUN mkdir version +RUN cat .git/refs/$(cat .git/HEAD | sed -e's/ref: refs\///g') > version/COMMIT \ + && grep $(cat version/COMMIT) .git/refs/tags/* -l | xargs -n 1 -r basename > version/TAGS \ + && mv version/COMMIT version/base.COMMIT.alpine \ + && mv version/TAGS version/base.TAGS.alpine diff --git a/docker/base/Dockerfile.base-slim b/docker/base/Dockerfile.base-slim new file mode 100644 index 00000000..242c01aa --- /dev/null +++ b/docker/base/Dockerfile.base-slim @@ -0,0 +1,75 @@ +# VERSION 0.0.4-customAF +# AUTHOR: Will Wong +# DESCRIPTION: Slim base image with dockerized airflow and ECR registry and DooD (Docker outside of Docker) +# BUILD: docker build --rm -t wongwill86/air-tasks:base-slim -f /docker/base/Dockerfile.base-slim . 
+# SOURCE: https://github.com/wongwill86/air-tasks + +# Compile AWS credential helper +FROM golang:1.8.3 as aws_ecr_credential_helper +WORKDIR /go/src/github.com/awslabs/ +RUN git clone https://github.com/awslabs/amazon-ecr-credential-helper.git +WORKDIR /go/src/github.com/awslabs/amazon-ecr-credential-helper +RUN make + +FROM python:3.6-slim +LABEL maintainer=wongwill86 + +ARG AIRFLOW_VERSION=1.9.0 + +RUN apt-get update \ + && buildDeps=' \ + libffi-dev \ + build-essential \ + libblas-dev \ + liblapack-dev \ + libpq-dev \ + software-properties-common \ + curl \ + ' \ + && apt-get install $buildDeps git vim sudo apt-transport-https -y \ + && curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add - \ + && apt-key fingerprint 0EBFCD88 \ + && add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/debian jessie stable" \ + && apt-get update \ + && apt-get install docker-ce -y \ + # Temporarily use this performance branch of airflow instead of pip install apache-airflow[cyrpto,celery,postgres]==${AIRFLOW_VERSION} + && git clone https://github.com/wongwill86/incubator-airflow.git --depth 1 -b v1-9-stable-performance \ + && pip install incubator-airflow/[crypto,celery,postgres] \ + && rm -rf incubator-airflow \ + && pip install docker-compose docker \ + # this is really only needed for testing (pytest-cov-exclude), include here so we don't need gcc for test build + && pip install ujson \ + # SUPER HACK PLEASE REMOVE AFTER AIRFLOW UPDATES (i.e. https://github.com/apache/incubator-airflow/pull/2417) + && sed -i -e 's/import Client/import APIClient as Client/' /usr/local/lib/python3.6/site-packages/airflow/operators/docker_operator.py \ + && sed -i -e 's/import Client/import APIClient as Client/' /usr/local/lib/python3.6/site-packages/airflow/hooks/docker_hook.py \ + && apt-get remove --purge -yqq $buildDeps \ + && apt-get clean -y \ + && apt-get autoremove -y \ + && rm -rf \ + /var/lib/apt/lists/* \ + /tmp/* \ + /var/tmp/* \ + /usr/share/man \ + /usr/share/doc \ + /usr/share/doc-base + +# copy the built docker credentials module to this container +COPY --from=aws_ecr_credential_helper \ + /go/src/github.com/awslabs/amazon-ecr-credential-helper/bin/local/docker-credential-ecr-login \ + /usr/local/bin + +# this is to enable aws ecr credentials helpers to reauthorize docker +RUN mkdir -p /.docker/ \ + && echo '{\n "credsStore": "ecr-login"\n}' > \ + /.docker/config.json + +# Get commit hash and tags these three files need to be excluded from .dockerignore: +# .git/refs/heads/ +# .git/refs/tags/ +# .git/HEAD +COPY .git .git +RUN mkdir version +RUN cat .git/refs/$(cat .git/HEAD | sed -e's/ref: refs\///g') > version/COMMIT \ + && grep $(cat version/COMMIT) .git/refs/tags/* -l | xargs -n 1 -r basename > version/TAGS \ + && mv version/COMMIT version/base.COMMIT.slim \ + && mv version/TAGS version/base.TAGS.slim diff --git a/docker/base/Dockerfile.base.test b/docker/base/Dockerfile.base.test new file mode 100644 index 00000000..9de9e705 --- /dev/null +++ b/docker/base/Dockerfile.base.test @@ -0,0 +1,5 @@ +ARG IMAGE_NAME +FROM ${IMAGE_NAME} +USER root +RUN pip install pytest pytest-watch pytest-env pytest-cov-exclude pytest-xdist pytest-datadir pylama mock +ENV PYTHONDONTWRITEBYTECODE 1 diff --git a/docker/base/docker-compose.test.yml b/docker/base/docker-compose.test.yml new file mode 100644 index 00000000..106b61e8 --- /dev/null +++ b/docker/base/docker-compose.test.yml @@ -0,0 +1,13 @@ +version: '3.3' +services: + sut: + build: + args: + IMAGE_NAME: $IMAGE_NAME + context: 
../../ + dockerfile: docker/base/Dockerfile.base.test + cache_from: + - ${DOCKER_REPO}:${CACHE_TAG} + command: + - airflow + - version diff --git a/docker/base/hooks/post_push b/docker/base/hooks/post_push new file mode 100644 index 00000000..8f4034ab --- /dev/null +++ b/docker/base/hooks/post_push @@ -0,0 +1,21 @@ +#!/bin/bash +# Also push out the built test image +# Pieced together undocumented docker cloud build behavior from: +# https://hub.docker.com/r/docker/highland_builder/ + +BUILT_TEST_IMAGE=${BUILD_CODE}_sut +CACHED_TEST_IMAGE=${IMAGE_NAME}-test + +echo 'Tagging to ' $CACHED_TEST_IMAGE +docker tag $BUILT_TEST_IMAGE $CACHED_TEST_IMAGE + +echo "Starting push of $CACHED_TEST_IMAGE" +for i in {1..5}; do + if docker push $CACHED_TEST_IMAGE; then + echo "Pushed $CACHED_TEST_IMAGE" + break + else + echo 'Push failed. Attempt $((i +1)) in 30 seconds.' + sleep 30 + fi +done; diff --git a/docker/config/celery_loader.py b/docker/config/celery_loader.py deleted file mode 100644 index 8f63b66f..00000000 --- a/docker/config/celery_loader.py +++ /dev/null @@ -1,11 +0,0 @@ -from celery import loaders -import celeryconfig - - -BaseLoader = loaders.get_loader_cls('app') - - -class CeleryLoader(BaseLoader): - def config_from_object(self, obj, silent=False): - print('calling custom config') - super(CeleryLoader, self).config_from_object(celeryconfig, silent) diff --git a/docker/config/celeryconfig.py b/docker/config/celeryconfig.py deleted file mode 100644 index e02c5447..00000000 --- a/docker/config/celeryconfig.py +++ /dev/null @@ -1,16 +0,0 @@ -from airflow import configuration - -# Broker settings. -accept_content = ['json', 'pickle'] -event_serializer = 'json' -result_serializer = 'pickle' -task_serializer = 'pickle' -worker_prefetch_multiplier = 1 -task_acks_late = True -task_reject_on_worker_lost = True -broker_url = configuration.get('celery', 'broker_url') -result_backend = configuration.get('celery', 'CELERY_RESULT_BACKEND') -worker_concurrency = configuration.getint('celery', 'CELERYD_CONCURRENCY') -task_default_queue = configuration.get('celery', 'DEFAULT_QUEUE') -task_default_exchange = configuration.get('celery', 'DEFAULT_QUEUE') -worker_send_task_events = True diff --git a/docker/docker-compose-LocalExecutor.yml b/docker/docker-compose-LocalExecutor.yml deleted file mode 100644 index e073a573..00000000 --- a/docker/docker-compose-LocalExecutor.yml +++ /dev/null @@ -1,28 +0,0 @@ -version: '3' -services: - postgres: - image: postgres:9.6 - environment: - - POSTGRES_USER=airflow - - POSTGRES_PASSWORD=airflow - - POSTGRES_DB=airflow - - webserver: - image: wongwill86/air-tasks:latest - restart: always - depends_on: - - postgres - environment: - - LOAD_EX=y - - EXECUTOR=Local - - AIRFLOW_HOME=/usr/local/airflow - - AWS_ACCESS_KEY_ID - - AWS_SECRET_ACCESS_KEY - - AWS_DEFAULT_REGION - volumes: - - ../dags:/usr/local/airflow/dags - - ../plugins:/usr/local/airflow/plugins - - /var/run/docker.sock:/var/run/docker.sock - ports: - - "8080:8080" - command: webserver diff --git a/docker/docker-compose.test.yml b/docker/docker-compose.test.yml index 7e72ece3..c9dcc938 100644 --- a/docker/docker-compose.test.yml +++ b/docker/docker-compose.test.yml @@ -1,18 +1,20 @@ -version: '3.2' +version: '3.3' services: sut: build: args: - image_name: $IMAGE_NAME + BASE_TEST: -test context: ../ - dockerfile: docker/Dockerfile.test + dockerfile: docker/Dockerfile cache_from: - - $CACHE_TAG - $IMAGE_NAME + - ci_sut + - ${IMAGE_NAME}-test volumes: - ../tests:/usr/local/airflow/tests - 
../plugins:/usr/local/airflow/plugins
       - ../dags:/usr/local/airflow/dags
+      - ../config:/usr/local/airflow/config
       - /var/run/docker.sock:/var/run/docker.sock
       - /tmp:/tmp
     environment:
diff --git a/docker/hooks/build b/docker/hooks/build
new file mode 100644
index 00000000..acdcf4cf
--- /dev/null
+++ b/docker/hooks/build
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Build with custom build args
+# Pieced together undocumented docker cloud build behavior from:
+# https://hub.docker.com/r/docker/highland_builder/
+
+cd /src/$BUILD_CODE
+
+if [[ $CACHE_TAG == *"slim"* ]]; then
+    BASE_DIST=slim
+elif [[ $CACHE_TAG == *"alpine"* ]]; then
+    BASE_DIST=alpine
+fi
+
+# equivalent to clean_path
+if [[ $BUILD_PATH == "/"* ]]; then
+    BUILD_PATH_CUSTOM=${BUILD_PATH:1}
+else
+    BUILD_PATH_CUSTOM=${BUILD_PATH}
+fi
+
+if [[ $BUILD_PATH_CUSTOM != "./"* ]]; then
+    BUILD_PATH_CUSTOM=./${BUILD_PATH_CUSTOM}
+fi
+
+echo "Building $BASE_DIST using cache $IMAGE_NAME..."
+if [ -n "$BASE_DIST" ]; then
+    docker build --build-arg BASE_DIST=$BASE_DIST -t $IMAGE_NAME -f $DOCKERFILE_PATH --cache-from=$IMAGE_NAME .
+else
+    docker build -t $IMAGE_NAME -f $DOCKERFILE_PATH --cache-from=$IMAGE_NAME .
+fi
diff --git a/docker/hooks/post_checkout b/docker/hooks/post_checkout
new file mode 100644
index 00000000..0f4328d8
--- /dev/null
+++ b/docker/hooks/post_checkout
@@ -0,0 +1,10 @@
+#!/bin/bash
+# This file exists because at some point docker cloud was setting the environment variable CACHE_TAG
+# incorrectly while IMAGE_NAME was somehow correct
+
+echo SOURCE_BRANCH: $SOURCE_BRANCH
+echo SOURCE_COMMIT: $SOURCE_COMMIT
+echo COMMIT_MSG: $COMMIT_MSG
+echo DOCKER_REPO: $DOCKER_REPO
+echo CACHE_TAG: $CACHE_TAG
+echo IMAGE_NAME: $IMAGE_NAME
diff --git a/docker/hooks/post_push b/docker/hooks/post_push
new file mode 100644
index 00000000..6c4fdd6e
--- /dev/null
+++ b/docker/hooks/post_push
@@ -0,0 +1,21 @@
+#!/bin/bash
+# Tag the built test image for reuse
+# Pieced together undocumented docker cloud build behavior from:
+# https://hub.docker.com/r/docker/highland_builder/
+
+BUILT_TEST_IMAGE=${BUILD_CODE}_sut
+CACHED_TEST_IMAGE=${IMAGE_NAME}-test
+
+echo "Tagging to $CACHED_TEST_IMAGE"
+docker tag $BUILT_TEST_IMAGE $CACHED_TEST_IMAGE
+
+echo "Starting push of $CACHED_TEST_IMAGE"
+for i in {1..5}; do
+    if docker push $CACHED_TEST_IMAGE; then
+        echo "Pushed $CACHED_TEST_IMAGE"
+        break
+    else
+        echo "Push failed. Attempt $((i + 1)) in 30 seconds."
+ sleep 30 + fi +done; diff --git a/docker/hooks/pre_test b/docker/hooks/pre_test new file mode 100644 index 00000000..afbe30ba --- /dev/null +++ b/docker/hooks/pre_test @@ -0,0 +1,12 @@ +#!/bin/bash +# Test to pull image manually for cache +# Pieced together undocumented docker cloud build behavior from: +# https://hub.docker.com/r/docker/highland_builder/ + + +CACHED_TEST_IMAGE=${IMAGE_NAME}-test +if docker pull $CACHED_TEST_IMAGE; then + echo 'Pulled $CACHED_TEST_IMAGE ' +else + echo 'Failed to pull $CACHED_TEST_IMAGE' +fi diff --git a/docker/scripts/entrypoint-dood.sh b/docker/scripts/entrypoint-dood.sh deleted file mode 100755 index 7ce349b0..00000000 --- a/docker/scripts/entrypoint-dood.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/bash -PUCKEL_ENTRYPOINT=/entrypoint.sh - -source /add-user-docker.sh - -# this doens't protect from docker but it's a little more secure -sudo sed -i "/$AIRFLOW_USER/d" /etc/sudoers -echo "start script with group $DOCKER_GROUP" -# DOCKER_GROUP from /add-user-docker.sh -if [ -z ${DOCKER_GROUP} ]; then - exec ${PUCKEL_ENTRYPOINT} $* -else - exec sg ${DOCKER_GROUP} "${PUCKEL_ENTRYPOINT} $*" -fi diff --git a/docker/scripts/entrypoint-test.sh b/docker/scripts/entrypoint-test.sh deleted file mode 100755 index ba438073..00000000 --- a/docker/scripts/entrypoint-test.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash -PUCKEL_ENTRYPOINT=/entrypoint.sh - -source /add-user-docker.sh - -# this doens't protect from docker but it's a little more secure -sudo sed -i "/$AIRFLOW_USER/d" /etc/sudoers -echo "start script" -exec sg $DOCKER_GROUP "$*" diff --git a/plugins/chunkflow/__init__.py b/plugins/chunkflow/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/plugins/chunkflow/chunkflow.py b/plugins/chunkflow/chunkflow.py deleted file mode 100644 index 1c6fac85..00000000 --- a/plugins/chunkflow/chunkflow.py +++ /dev/null @@ -1,96 +0,0 @@ -from airflow.plugins_manager import AirflowPlugin -from airflow.models import DAG -from airflow.operators.subdag_operator import SubDagOperator -import logging -import json -from custom.docker_custom import DockerConfigurableOperator - - -class ChunkFlowOperator(DockerConfigurableOperator): - DEFAULT_IMAGE_ID = '098703261575.dkr.ecr.us-east-1.amazonaws.com/chunkflow' - DEFAULT_VERSION = 'v1.7.8' - DEFAULT_COMMAND = 'julia /root/.julia/v0.5/ChunkFlow/scripts/main.jl ' + \ - '-t "%s"' - - def __init__(self, - image_id=DEFAULT_IMAGE_ID, - image_version=DEFAULT_VERSION, - command=DEFAULT_COMMAND, - task_json="{}", - *args, **kwargs - ): - print("using " + image_id + ':' + image_version) - super(ChunkFlowOperator, self).__init__( - image=image_id + ':' + image_version, - command=command % task_json.replace('"', '\\"'), - network_mode='bridge', - *args, **kwargs) - - -def create_chunkflow_subdag(parent_dag_name, child_dag_name, subdag_args, - tasks_filename, image_id, image_version): - subdag_name = '%s.%s' % (parent_dag_name, child_dag_name) - subdag = DAG(dag_id=subdag_name, default_args=subdag_args, - schedule_interval='@once') - print('Generating subdag: <%s>' % subdag_name) - - with open(tasks_filename, "r") as tasks_file: - line = tasks_file.readline() - # skip lines that are irrelevant - while line.strip(): - line = tasks_file.readline() - if tasks_file.readline().startswith("PRINT TASK JSONS"): - break - - task_json = tasks_file.readline() - - while task_json.strip(): - try: - task = json.loads(task_json) - task_origin = task['input']['params']['origin'] - except KeyError: - logger = logging.getLogger(__name__) - 
logger.error("Unable to find chunk input origin key " + - "(json.input.params.origin) from \n %s", - task_json) - raise - except ValueError: - logger = logging.getLogger(__name__) - logger.error("Unable to parse task as json: \n %s", task_json) - raise - - ChunkFlowOperator( - task_id='%s-task-%s' % (child_dag_name, - '_'.join(str(x) for x in task_origin)), - task_json=task_json, dag=subdag, - image_id=image_id, - image_version=image_version - ) - task_json = tasks_file.readline() - - return subdag - - -def chunkflow_subdag_from_file(tasks_filename, - image_id=ChunkFlowOperator.DEFAULT_IMAGE_ID, - image_version=ChunkFlowOperator.DEFAULT_VERSION, - *args, **kwargs): - subdag = create_chunkflow_subdag( - kwargs['dag'].dag_id, kwargs['task_id'], - {} if 'default_args' not in kwargs else kwargs['default_args'], - tasks_filename, - image_id, - image_version - ) - return SubDagOperator(subdag=subdag, *args, **kwargs) - - -class ChunkFlowPlugin(AirflowPlugin): - name = "chunkflow_plugin" - operators = [ChunkFlowOperator, chunkflow_subdag_from_file] - hooks = [] - executors = [] - macros = [] - admin_views = [] - flask_blueprints = [] - menu_links = [] diff --git a/plugins/custom/custom.py b/plugins/custom/custom.py index aa86c7fa..46e39507 100644 --- a/plugins/custom/custom.py +++ b/plugins/custom/custom.py @@ -20,9 +20,8 @@ class MultiTriggerDagRunOperator(BaseOperator): :param trigger_dag_id: the dag_id to trigger :type trigger_dag_id: str - :param params_list: list of dicts for DAG level parameters that are made - acesssible in templates - namespaced under params for each dag run. + :param params_list: list of dicts or a thunked generator for DAG level parameters that are made acesssible + in templates namespaced under params for each dag run. :type params: Iterable or types.GeneratorType """ @@ -32,14 +31,20 @@ def __init__( trigger_dag_id, params_list, *args, **kwargs): - super(MultiTriggerDagRunOperator, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) self.trigger_dag_id = trigger_dag_id self.params_list = params_list + if hasattr(self.params_list, '__len__'): assert len(self.params_list) > 0 else: - assert (isinstance(params_list, collections.Iterable) or - isinstance(params_list, types.GeneratorType)) + if callable(params_list): + generator = params_list() + else: + generator = params_list + assert (isinstance(generator, collections.Iterable) or isinstance(generator, types.GeneratorType)), \ + ('Params list returned %s, must return either an iterable, generator, or callable to returns an ' + + 'iterable or generator') % type(generator) def execute(self, context): session = settings.Session() @@ -49,8 +54,7 @@ def execute(self, context): assert trigger_dag is not None trigger_id = 0 - # for trigger_id in range(0, len(self.params_list)): - for params in self.params_list: + for params in self.params_list if not callable(self.params_list) else self.params_list(): dr = trigger_dag.create_dagrun(run_id='trig_%s_%d_%s' % (self.trigger_dag_id, trigger_id, datetime.now().isoformat()), diff --git a/plugins/custom/docker_custom.py b/plugins/custom/docker_custom.py index 7967dec6..f3c64907 100644 --- a/plugins/custom/docker_custom.py +++ b/plugins/custom/docker_custom.py @@ -1,7 +1,6 @@ import os import json -import logging -from docker import APIClient as Client, tls +from docker import APIClient as Client from airflow.exceptions import AirflowException from airflow.plugins_manager import AirflowPlugin from airflow.models import Variable @@ -15,28 +14,32 @@ class 
DockerConfigurableOperator(DockerOperator): with the exception that we are able to inject container and host arguments before the container is run. """ # noqa - def __init__(self, container_args={}, host_args={}, *args, **kwargs): - self.container_args = container_args - self.host_args = host_args - super(DockerConfigurableOperator, self).__init__(*args, **kwargs) + def __init__(self, container_args=None, host_args=None, *args, **kwargs): + if container_args is None: + self.container_args = {} + else: + self.container_args = container_args + + if host_args is None: + self.host_args = {} + else: + self.host_args = host_args + super().__init__(*args, **kwargs) # This needs to be updated whenever we update to a new version of airflow! def execute(self, context): - logging.info('Starting docker container from image ' + self.image) - - tls_config = None - if self.tls_ca_cert and self.tls_client_cert and self.tls_client_key: - tls_config = tls.TLSConfig( - ca_cert=self.tls_ca_cert, - client_cert=(self.tls_client_cert, self.tls_client_key), - verify=True, - ssl_version=self.tls_ssl_version, - assert_hostname=self.tls_hostname - ) - self.docker_url = self.docker_url.replace('tcp://', 'https://') + self.log.info('Starting docker container from image %s', self.image) + + tls_config = self._DockerOperator__get_tls_config() - self.cli = Client(base_url=self.docker_url, version=self.api_version, - tls=tls_config) + if self.docker_conn_id: + self.cli = self.get_hook().get_conn() + else: + self.cli = Client( + base_url=self.docker_url, + version=self.api_version, + tls=tls_config + ) if ':' not in self.image: image = self.image + ':latest' @@ -44,10 +47,10 @@ def execute(self, context): image = self.image if self.force_pull or len(self.cli.images(name=image)) == 0: - logging.info('Pulling docker image ' + image) + self.log.info('Pulling docker image %s', image) for l in self.cli.pull(image, stream=True): output = json.loads(l.decode('utf-8')) - logging.info("{}".format(output['status'])) + self.log.info("%s", output['status']) cpu_shares = int(round(self.cpus * 1024)) @@ -57,18 +60,19 @@ def execute(self, context): host_args = { 'binds': self.volumes, + 'cpu_shares': cpu_shares, + 'mem_limit': self.mem_limit, 'network_mode': self.network_mode } host_args.update(self.host_args) container_args = { 'command': self.get_command(), - 'cpu_shares': cpu_shares, 'environment': self.environment, + 'host_config': self.cli.create_host_config(**host_args), 'image': image, - 'mem_limit': self.mem_limit, 'user': self.user, - 'host_config': self.cli.create_host_config(**host_args) + 'working_dir': self.working_dir } container_args.update(self.container_args) @@ -78,18 +82,21 @@ def execute(self, context): self.cli.start(self.container['Id']) line = '' - for line in self.cli.logs(container=self.container['Id'], - stream=True): - logging.info("{}".format(line.strip())) - - exit_code = self.cli.wait(self.container['Id']) + for line in self.cli.logs( + container=self.container['Id'], stream=True): + line = line.strip() + if hasattr(line, 'decode'): + line = line.decode('utf-8') + self.log.info(line) + + exit_code = self.cli.wait(self.container['Id'])['StatusCode'] if exit_code != 0: raise AirflowException('docker container failed') - if self.xcom_push: + if self.xcom_push_flag: return self.cli.logs( container=self.container['Id']) if self.xcom_all else str( - line.strip()) + line) class DockerRemovableContainer(DockerConfigurableOperator): @@ -104,11 +111,11 @@ def __init__(self, remove=True, *args, **kwargs): self.remove = 
remove - super(DockerRemovableContainer, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) def execute(self, context): try: - return super(DockerRemovableContainer, self).execute(context) + return super().execute(context) finally: if self.cli and self.container and self.remove: self.cli.stop(self.container, timeout=1) @@ -124,17 +131,19 @@ def __init__(self, *args, **kwargs): self.variables = variables self.mount_point = mount_point - super(DockerWithVariablesOperator, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) def execute(self, context): with TemporaryDirectory(prefix='dockervariables') as tmp_var_dir: for key in self.variables: value = Variable.get(key) with open(os.path.join(tmp_var_dir, key), 'w') as value_file: - value_file.write(str(value)) + # import pdb + # pdb.set_trace() + value_file.write(value) self.volumes.append('{0}:{1}'.format(tmp_var_dir, self.mount_point)) - return super(DockerWithVariablesOperator, self).execute(context) + return super().execute(context) class CustomPlugin(AirflowPlugin): diff --git a/pylama.ini b/pylama.ini new file mode 100644 index 00000000..3c65dabb --- /dev/null +++ b/pylama.ini @@ -0,0 +1,10 @@ +[pylama] +max_line_length=120 +[pylama:pycodestyle] +max_line_length=120 +;[pylama:pylint] +;max_line_length=120 +;[pylama:flake8] +;max_line_length=120 +;[pylama:pep8] +;max_line_length=120 diff --git a/docker/scripts/add-user-docker.sh b/scripts/add-user-docker.sh similarity index 91% rename from docker/scripts/add-user-docker.sh rename to scripts/add-user-docker.sh index 53cc1071..a7e4cdc9 100755 --- a/docker/scripts/add-user-docker.sh +++ b/scripts/add-user-docker.sh @@ -17,8 +17,8 @@ else echo "Host Docker Group ID $DOCKER_GID not found on user" echo "Updating docker group to host docker group" sudo groupmod -g ${DOCKER_GID} ${DOCKER_GROUP} - # add this for boot2docker - sudo usermod -aG 100,50 $AIRFLOW_USER + # add this for boot2docker + sudo usermod -aG 100,50 $AIRFLOW_USER fi fi diff --git a/scripts/entrypoint-dood.sh b/scripts/entrypoint-dood.sh new file mode 100755 index 00000000..586da5be --- /dev/null +++ b/scripts/entrypoint-dood.sh @@ -0,0 +1,13 @@ +#!/bin/bash +source scripts/add-user-docker.sh + +# this doesn't protect from docker but it's a little more secure +sudo sed -i "/$AIRFLOW_USER/d" /etc/sudoers + +echo "start script with group $DOCKER_GROUP" +# DOCKER_GROUP from /add-user-docker.sh +if [ -z ${DOCKER_GROUP} ]; then + exec ${COMMAND} $* +else + exec sg ${DOCKER_GROUP} "$*" +fi diff --git a/docker/scripts/secrets_to_airflow_variables.py b/scripts/secrets_to_airflow_variables.py similarity index 100% rename from docker/scripts/secrets_to_airflow_variables.py rename to scripts/secrets_to_airflow_variables.py diff --git a/tests/plugins/chunkflow/__init__.py b/tests/plugins/chunkflow/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/plugins/chunkflow/test_chunkflow.py b/tests/plugins/chunkflow/test_chunkflow.py deleted file mode 100644 index 461f1713..00000000 --- a/tests/plugins/chunkflow/test_chunkflow.py +++ /dev/null @@ -1,125 +0,0 @@ -from __future__ import unicode_literals -from airflow.operators.chunkflow_plugin import ChunkFlowOperator -from airflow.operators.chunkflow_plugin import chunkflow_subdag_from_file - -from airflow.models import DAG -from datetime import datetime, timedelta -from pytest import fixture -from distutils import dir_util -import os - -DAG_ARGS = { - 'owner': 'airflow', - 'depends_on_past': False, - 'start_date': datetime(2017, 5, 1), - 
'cactchup_by_default': False, - 'retries': 1, - 'retry_delay': timedelta(seconds=2), - 'retry_exponential_backoff': True, -} -TASK_ID = 'test_run_task' -PARENT_DAG_ID = 'parent_dag' -IMAGE_ID = 'alpine' -IMAGE_VERSION = 'latest' -COMMAND = 'echo "json is: \n %s"' - - -@fixture -def datadir(tmpdir, request): - ''' - https://stackoverflow.com/a/29631801/1470224 - Fixture responsible for searching a folder with the same name of test - module and, if available, moving all contents to a temporary directory - so tests can use them freely. - ''' - filename = request.module.__file__ - test_dir, _ = os.path.splitext(filename) - - if os.path.isdir(test_dir): - dir_util.copy_tree(test_dir, bytes(tmpdir)) - - return tmpdir - - -class TestChunkFlowOperator(object): - def test_create(self): - operator = ChunkFlowOperator(task_id=TASK_ID, - default_args=DAG_ARGS) - assert operator - assert operator.task_id == TASK_ID - assert operator.image == "%s:%s" % (ChunkFlowOperator.DEFAULT_IMAGE_ID, - ChunkFlowOperator.DEFAULT_VERSION) - - def test_run_single(self, datadir): - operator = ChunkFlowOperator(task_id=TASK_ID, - default_args=DAG_ARGS, - command=COMMAND, - task_json='{"test":1}', - image_id=IMAGE_ID, - image_version=IMAGE_VERSION - ) - operator.execute(None) - - -class TestChunkFlowTasksFileOperator(object): - @staticmethod - def create_parent_dag(parent_dag_id): - return DAG(dag_id=parent_dag_id, - default_args=DAG_ARGS, - schedule_interval=None) - - @staticmethod - def create_task(filename): - parent_dag = TestChunkFlowTasksFileOperator.create_parent_dag( - PARENT_DAG_ID) - - operator = chunkflow_subdag_from_file(filename, - task_id=TASK_ID, - image_id=IMAGE_ID, - image_version=IMAGE_VERSION, - default_args=DAG_ARGS, - dag=parent_dag) - return operator - - def test_empty(self, datadir): - task_filename = str(datadir.join('empty.txt')) - operator = \ - TestChunkFlowTasksFileOperator.create_task(bytes(task_filename)) - - assert operator - assert operator.task_id == TASK_ID - assert len(operator.subdag.task_ids) == 0 - - def test_none(self, datadir): - task_filename = str(datadir.join('no_tasks.txt')) - operator = \ - TestChunkFlowTasksFileOperator.create_task(bytes(task_filename)) - - assert operator - assert operator.task_id == TASK_ID - assert len(operator.subdag.task_ids) == 0 - - def test_single(self, datadir): - task_filename = str(datadir.join('single.txt')) - operator = \ - TestChunkFlowTasksFileOperator.create_task(bytes(task_filename)) - - assert operator - assert operator.task_id == TASK_ID - assert len(operator.subdag.task_ids) == 1 - assert operator.subdag.tasks[0].image == "%s:%s" % ( - IMAGE_ID, - IMAGE_VERSION - ) - - def test_many(self, datadir): - task_filename = str(datadir.join('many.txt')) - operator = \ - TestChunkFlowTasksFileOperator.create_task(bytes(task_filename)) - - assert operator - assert operator.task_id == TASK_ID - assert len(operator.subdag.tasks) == 8 - for task in operator.subdag.tasks: - assert task.image == "%s:%s" % (IMAGE_ID, - IMAGE_VERSION) diff --git a/tests/plugins/chunkflow/test_chunkflow/empty.txt b/tests/plugins/chunkflow/test_chunkflow/empty.txt deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/plugins/chunkflow/test_chunkflow/many.txt b/tests/plugins/chunkflow/test_chunkflow/many.txt deleted file mode 100644 index 43257149..00000000 --- a/tests/plugins/chunkflow/test_chunkflow/many.txt +++ /dev/null @@ -1,11 +0,0 @@ -use default region: us-east-1 -argDict = 
Dict{Symbol,Any}(Pair{Symbol,Any}(:shutdown,false),Pair{Symbol,Any}(:workernumber,1),Pair{Symbol,Any}(:workerwaittime,1),Pair{Symbol,Any}(:stride,[1024,1024,128,0]),Pair{Symbol,Any}(:deviceid,nothing),Pair{Symbol,Any}(:queuename,""),Pair{Symbol,Any}(:origin,[10240,10240,1,1]),Pair{Symbol,Any}(:gridsize,[2,2,2,1]),Pair{Symbol,Any}(:task,"templates/test/pinkytest40.3.segment.json"),Pair{Symbol,Any}(:continuefrom,Int64[])) -PRINT TASK JSONS (no queue has been set) -{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,10240,1,1],"chunkSize":[1152,1152,144],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v10/image/4_4_40/"},"outputs":{"data":"img"}},"input_aff":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,10240,1,1],"chunkSize":[1152,1152,144,3],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v11/affinitymap-jnet/4_4_40/"},"outputs":{"data":"aff"}},"cropimg":{"kind":"crop","params":{"cropMarginSize":[64,64,8]},"inputs":{"img":"img"},"outputs":{"img":"img"}},"watershed":{"kind":"atomicseg","params":{"low":0.012,"high":0.999806,"thresholds":[{"size":600,"threshold":0.087818}],"dust":300,"isThresholdRelative":false},"inputs":{"aff":"aff"},"outputs":{"seg":"seg"}},"agglomeration":{"kind":"agglomeration","params":{"mode":"mean","cropSegMarginSize":[64,64,8],"isDeleteAff":true},"inputs":{"seg":"seg","aff":"aff"},"outputs":{"sgm":"sgm"}},"savesgm":{"kind":"savechunk","params":{},"inputs":{"chunk":"sgm"},"outputs":{"prefix":"gs://wwong/pinky40/segmentation/chunk_"}},"hypersquare":{"kind":"hypersquare","params":{"segment_id_type":"UInt16","affinity_type":"Float32"},"inputs":{"img":"img","sgm":"sgm"},"outputs":{"projectsDirectory":"gs://wwong/pinky40/hypersquare/"}}} -{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[11264,10240,1,1],"chunkSize":[1152,1152,144],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v10/image/4_4_40/"},"outputs":{"data":"img"}},"input_aff":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[11264,10240,1,1],"chunkSize":[1152,1152,144,3],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v11/affinitymap-jnet/4_4_40/"},"outputs":{"data":"aff"}},"cropimg":{"kind":"crop","params":{"cropMarginSize":[64,64,8]},"inputs":{"img":"img"},"outputs":{"img":"img"}},"watershed":{"kind":"atomicseg","params":{"low":0.012,"high":0.999806,"thresholds":[{"size":600,"threshold":0.087818}],"dust":300,"isThresholdRelative":false},"inputs":{"aff":"aff"},"outputs":{"seg":"seg"}},"agglomeration":{"kind":"agglomeration","params":{"mode":"mean","cropSegMarginSize":[64,64,8],"isDeleteAff":true},"inputs":{"seg":"seg","aff":"aff"},"outputs":{"sgm":"sgm"}},"savesgm":{"kind":"savechunk","params":{},"inputs":{"chunk":"sgm"},"outputs":{"prefix":"gs://wwong/pinky40/segmentation/chunk_"}},"hypersquare":{"kind":"hypersquare","params":{"segment_id_type":"UInt16","affinity_type":"Float32"},"inputs":{"img":"img","sgm":"sgm"},"outputs":{"projectsDirectory":"gs://wwong/pinky40/hypersquare/"}}} 
-{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,11264,1,1],"chunkSize":[1152,1152,144],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v10/image/4_4_40/"},"outputs":{"data":"img"}},"input_aff":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,11264,1,1],"chunkSize":[1152,1152,144,3],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v11/affinitymap-jnet/4_4_40/"},"outputs":{"data":"aff"}},"cropimg":{"kind":"crop","params":{"cropMarginSize":[64,64,8]},"inputs":{"img":"img"},"outputs":{"img":"img"}},"watershed":{"kind":"atomicseg","params":{"low":0.012,"high":0.999806,"thresholds":[{"size":600,"threshold":0.087818}],"dust":300,"isThresholdRelative":false},"inputs":{"aff":"aff"},"outputs":{"seg":"seg"}},"agglomeration":{"kind":"agglomeration","params":{"mode":"mean","cropSegMarginSize":[64,64,8],"isDeleteAff":true},"inputs":{"seg":"seg","aff":"aff"},"outputs":{"sgm":"sgm"}},"savesgm":{"kind":"savechunk","params":{},"inputs":{"chunk":"sgm"},"outputs":{"prefix":"gs://wwong/pinky40/segmentation/chunk_"}},"hypersquare":{"kind":"hypersquare","params":{"segment_id_type":"UInt16","affinity_type":"Float32"},"inputs":{"img":"img","sgm":"sgm"},"outputs":{"projectsDirectory":"gs://wwong/pinky40/hypersquare/"}}} -{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[11264,11264,1,1],"chunkSize":[1152,1152,144],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v10/image/4_4_40/"},"outputs":{"data":"img"}},"input_aff":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[11264,11264,1,1],"chunkSize":[1152,1152,144,3],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v11/affinitymap-jnet/4_4_40/"},"outputs":{"data":"aff"}},"cropimg":{"kind":"crop","params":{"cropMarginSize":[64,64,8]},"inputs":{"img":"img"},"outputs":{"img":"img"}},"watershed":{"kind":"atomicseg","params":{"low":0.012,"high":0.999806,"thresholds":[{"size":600,"threshold":0.087818}],"dust":300,"isThresholdRelative":false},"inputs":{"aff":"aff"},"outputs":{"seg":"seg"}},"agglomeration":{"kind":"agglomeration","params":{"mode":"mean","cropSegMarginSize":[64,64,8],"isDeleteAff":true},"inputs":{"seg":"seg","aff":"aff"},"outputs":{"sgm":"sgm"}},"savesgm":{"kind":"savechunk","params":{},"inputs":{"chunk":"sgm"},"outputs":{"prefix":"gs://wwong/pinky40/segmentation/chunk_"}},"hypersquare":{"kind":"hypersquare","params":{"segment_id_type":"UInt16","affinity_type":"Float32"},"inputs":{"img":"img","sgm":"sgm"},"outputs":{"projectsDirectory":"gs://wwong/pinky40/hypersquare/"}}} 
-{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,10240,129,1],"chunkSize":[1152,1152,144],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v10/image/4_4_40/"},"outputs":{"data":"img"}},"input_aff":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,10240,129,1],"chunkSize":[1152,1152,144,3],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v11/affinitymap-jnet/4_4_40/"},"outputs":{"data":"aff"}},"cropimg":{"kind":"crop","params":{"cropMarginSize":[64,64,8]},"inputs":{"img":"img"},"outputs":{"img":"img"}},"watershed":{"kind":"atomicseg","params":{"low":0.012,"high":0.999806,"thresholds":[{"size":600,"threshold":0.087818}],"dust":300,"isThresholdRelative":false},"inputs":{"aff":"aff"},"outputs":{"seg":"seg"}},"agglomeration":{"kind":"agglomeration","params":{"mode":"mean","cropSegMarginSize":[64,64,8],"isDeleteAff":true},"inputs":{"seg":"seg","aff":"aff"},"outputs":{"sgm":"sgm"}},"savesgm":{"kind":"savechunk","params":{},"inputs":{"chunk":"sgm"},"outputs":{"prefix":"gs://wwong/pinky40/segmentation/chunk_"}},"hypersquare":{"kind":"hypersquare","params":{"segment_id_type":"UInt16","affinity_type":"Float32"},"inputs":{"img":"img","sgm":"sgm"},"outputs":{"projectsDirectory":"gs://wwong/pinky40/hypersquare/"}}} -{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[11264,10240,129,1],"chunkSize":[1152,1152,144],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v10/image/4_4_40/"},"outputs":{"data":"img"}},"input_aff":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[11264,10240,129,1],"chunkSize":[1152,1152,144,3],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v11/affinitymap-jnet/4_4_40/"},"outputs":{"data":"aff"}},"cropimg":{"kind":"crop","params":{"cropMarginSize":[64,64,8]},"inputs":{"img":"img"},"outputs":{"img":"img"}},"watershed":{"kind":"atomicseg","params":{"low":0.012,"high":0.999806,"thresholds":[{"size":600,"threshold":0.087818}],"dust":300,"isThresholdRelative":false},"inputs":{"aff":"aff"},"outputs":{"seg":"seg"}},"agglomeration":{"kind":"agglomeration","params":{"mode":"mean","cropSegMarginSize":[64,64,8],"isDeleteAff":true},"inputs":{"seg":"seg","aff":"aff"},"outputs":{"sgm":"sgm"}},"savesgm":{"kind":"savechunk","params":{},"inputs":{"chunk":"sgm"},"outputs":{"prefix":"gs://wwong/pinky40/segmentation/chunk_"}},"hypersquare":{"kind":"hypersquare","params":{"segment_id_type":"UInt16","affinity_type":"Float32"},"inputs":{"img":"img","sgm":"sgm"},"outputs":{"projectsDirectory":"gs://wwong/pinky40/hypersquare/"}}} 
-{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,11264,129,1],"chunkSize":[1152,1152,144],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v10/image/4_4_40/"},"outputs":{"data":"img"}},"input_aff":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,11264,129,1],"chunkSize":[1152,1152,144,3],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v11/affinitymap-jnet/4_4_40/"},"outputs":{"data":"aff"}},"cropimg":{"kind":"crop","params":{"cropMarginSize":[64,64,8]},"inputs":{"img":"img"},"outputs":{"img":"img"}},"watershed":{"kind":"atomicseg","params":{"low":0.012,"high":0.999806,"thresholds":[{"size":600,"threshold":0.087818}],"dust":300,"isThresholdRelative":false},"inputs":{"aff":"aff"},"outputs":{"seg":"seg"}},"agglomeration":{"kind":"agglomeration","params":{"mode":"mean","cropSegMarginSize":[64,64,8],"isDeleteAff":true},"inputs":{"seg":"seg","aff":"aff"},"outputs":{"sgm":"sgm"}},"savesgm":{"kind":"savechunk","params":{},"inputs":{"chunk":"sgm"},"outputs":{"prefix":"gs://wwong/pinky40/segmentation/chunk_"}},"hypersquare":{"kind":"hypersquare","params":{"segment_id_type":"UInt16","affinity_type":"Float32"},"inputs":{"img":"img","sgm":"sgm"},"outputs":{"projectsDirectory":"gs://wwong/pinky40/hypersquare/"}}} -{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[11264,11264,129,1],"chunkSize":[1152,1152,144],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v10/image/4_4_40/"},"outputs":{"data":"img"}},"input_aff":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[11264,11264,129,1],"chunkSize":[1152,1152,144,3],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v11/affinitymap-jnet/4_4_40/"},"outputs":{"data":"aff"}},"cropimg":{"kind":"crop","params":{"cropMarginSize":[64,64,8]},"inputs":{"img":"img"},"outputs":{"img":"img"}},"watershed":{"kind":"atomicseg","params":{"low":0.012,"high":0.999806,"thresholds":[{"size":600,"threshold":0.087818}],"dust":300,"isThresholdRelative":false},"inputs":{"aff":"aff"},"outputs":{"seg":"seg"}},"agglomeration":{"kind":"agglomeration","params":{"mode":"mean","cropSegMarginSize":[64,64,8],"isDeleteAff":true},"inputs":{"seg":"seg","aff":"aff"},"outputs":{"sgm":"sgm"}},"savesgm":{"kind":"savechunk","params":{},"inputs":{"chunk":"sgm"},"outputs":{"prefix":"gs://wwong/pinky40/segmentation/chunk_"}},"hypersquare":{"kind":"hypersquare","params":{"segment_id_type":"UInt16","affinity_type":"Float32"},"inputs":{"img":"img","sgm":"sgm"},"outputs":{"projectsDirectory":"gs://wwong/pinky40/hypersquare/"}}} diff --git a/tests/plugins/chunkflow/test_chunkflow/no_tasks.txt b/tests/plugins/chunkflow/test_chunkflow/no_tasks.txt deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/plugins/chunkflow/test_chunkflow/single.txt b/tests/plugins/chunkflow/test_chunkflow/single.txt deleted file mode 100644 index 22eaf293..00000000 --- a/tests/plugins/chunkflow/test_chunkflow/single.txt +++ /dev/null @@ -1,4 +0,0 @@ -use default region: us-east-1 -argDict = 
Dict{Symbol,Any}(Pair{Symbol,Any}(:shutdown,false),Pair{Symbol,Any}(:workernumber,1),Pair{Symbol,Any}(:workerwaittime,1),Pair{Symbol,Any}(:stride,[1024,1024,128,0]),Pair{Symbol,Any}(:deviceid,nothing),Pair{Symbol,Any}(:queuename,""),Pair{Symbol,Any}(:origin,[10240,10240,1,1]),Pair{Symbol,Any}(:gridsize,[2,2,2,1]),Pair{Symbol,Any}(:task,"templates/test/pinkytest40.3.segment.json"),Pair{Symbol,Any}(:continuefrom,Int64[])) -PRINT TASK JSONS (no queue has been set) -{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,10240,1,1],"chunkSize":[1152,1152,144],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v10/image/4_4_40/"},"outputs":{"data":"img"}},"input_aff":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,10240,1,1],"chunkSize":[1152,1152,144,3],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://neuroglancer/pinky40_v11/affinitymap-jnet/4_4_40/"},"outputs":{"data":"aff"}},"cropimg":{"kind":"crop","params":{"cropMarginSize":[64,64,8]},"inputs":{"img":"img"},"outputs":{"img":"img"}},"watershed":{"kind":"atomicseg","params":{"low":0.012,"high":0.999806,"thresholds":[{"size":600,"threshold":0.087818}],"dust":300,"isThresholdRelative":false},"inputs":{"aff":"aff"},"outputs":{"seg":"seg"}},"agglomeration":{"kind":"agglomeration","params":{"mode":"mean","cropSegMarginSize":[64,64,8],"isDeleteAff":true},"inputs":{"seg":"seg","aff":"aff"},"outputs":{"sgm":"sgm"}},"savesgm":{"kind":"savechunk","params":{},"inputs":{"chunk":"sgm"},"outputs":{"prefix":"gs://wwong/pinky40/segmentation/chunk_"}},"hypersquare":{"kind":"hypersquare","params":{"segment_id_type":"UInt16","affinity_type":"Float32"},"inputs":{"img":"img","sgm":"sgm"},"outputs":{"projectsDirectory":"gs://wwong/pinky40/hypersquare/"}}} diff --git a/tests/plugins/custom/test_docker_custom.py b/tests/plugins/custom/test_docker_custom.py index c9bae24f..2a9c800d 100644 --- a/tests/plugins/custom/test_docker_custom.py +++ b/tests/plugins/custom/test_docker_custom.py @@ -1,4 +1,3 @@ -from __future__ import unicode_literals from airflow.operators.docker_plugin import DockerRemovableContainer from airflow.operators.docker_plugin import DockerWithVariablesOperator import unittest @@ -10,7 +9,7 @@ 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2017, 5, 1), - 'cactchup_by_default': False, + 'catchup_by_default': False, 'retries': 1, 'retry_delay': timedelta(seconds=2), 'retry_exponential_backoff': True, @@ -75,7 +74,7 @@ def test_should_override_command(self): xcom_push=True, xcom_all=True, ) - assert '%s\n' % NEW_TEXT == operator.execute(None) + assert '%s\n' % NEW_TEXT == operator.execute(None).decode('utf-8') class TestDockerRemovableContainer(unittest.TestCase): @@ -169,13 +168,14 @@ def test_should_find_variables(self, variable_class): assert operator assert operator.task_id == TASK_ID - assert show_items == variables_to_show_items(DEFAULT_VARIABLES) + assert str(show_items, 'utf-8') == variables_to_show_items( + DEFAULT_VARIABLES) @patch_plugin_file('plugins/custom/docker', 'Variable', autospec=True) def test_should_fail_when_variable_not_found(self, variable_class): variable_class.get.side_effect = DEFAULT_VARIABLES.__getitem__ - bad_keys = DEFAULT_VARIABLES.keys() + bad_keys = list(DEFAULT_VARIABLES.keys()) bad_keys.append('bad_key') operator = DockerWithVariablesOperator( diff --git a/tests/plugins/custom/test_multi_trigger_dag.py b/tests/plugins/custom/test_multi_trigger_dag.py index 561d735b..61acf17d 100644 --- 
a/tests/plugins/custom/test_multi_trigger_dag.py +++ b/tests/plugins/custom/test_multi_trigger_dag.py @@ -1,4 +1,3 @@ -from __future__ import unicode_literals import unittest from airflow.operators.custom_plugin import MultiTriggerDagRunOperator from airflow.utils.state import State @@ -18,7 +17,7 @@ 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2017, 5, 1), - 'cactchup_by_default': False, + 'catchup_by_default': False, 'retries': 1, 'retry_delay': timedelta(seconds=2), 'retry_exponential_backoff': True, @@ -180,10 +179,9 @@ def test_should_execute_params_list_of_nones(self, mock_session, TestMultiTriggerDag.verify_session(params_list) - def test_should_execute_generator_function(self, mock_session, - dag_bag_class): + def test_should_execute_generator(self, mock_session, dag_bag_class): def param_generator(): - iterable = xrange(1, 10) + iterable = range(1, 10) for i in iterable: yield i @@ -199,17 +197,36 @@ def param_generator(): TestMultiTriggerDag.verify_session(param_generator()) + def test_should_execute_thunked_generator(self, mock_session, dag_bag_class): + def param_generator(): + iterable = range(1, 10) + for i in iterable: + yield i + + dag_bag_class.return_value = TestMultiTriggerDag.create_mock_dag_bag() + + operator = MultiTriggerDagRunOperator( + task_id=TASK_ID, + trigger_dag_id=TRIGGER_DAG_ID, + params_list=param_generator, + default_args=DAG_ARGS) + + operator.execute(None) + + TestMultiTriggerDag.verify_session(param_generator()) + def test_should_execute_iterable(self, mock_session, dag_bag_class): - params_list = xrange(1, 10) + def param_generator(): + return range(1, 10) dag_bag_class.return_value = TestMultiTriggerDag.create_mock_dag_bag() operator = MultiTriggerDagRunOperator( task_id=TASK_ID, trigger_dag_id=TRIGGER_DAG_ID, - params_list=params_list, + params_list=param_generator, default_args=DAG_ARGS) operator.execute(None) - TestMultiTriggerDag.verify_session(params_list) + TestMultiTriggerDag.verify_session(param_generator())