Chunkflow Integration and Project overhaul (#2)
* Reorganize chunkflow into a testable plugin
* Include a testing framework
* Add in-Docker testing with Docker Hub
wongwill86 authored Jul 28, 2017
1 parent 77c1650 commit 92353cf
Showing 24 changed files with 463 additions and 103 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -1,2 +1,5 @@
*.swp
*.pyc
.cache*
.rope*
.tox
7 changes: 7 additions & 0 deletions README.md
@@ -1,3 +1,10 @@
# air-tasks

DooD support and AWS ECR Credential Helper


NOTES:

Chunkflow: make sure `AWS_ACCESS_KEY_ID` and the other AWS credential
environment variables are set!

Build and run the test suite with:

    docker-compose -f docker/docker-compose.test.yml -p ci build
    docker-compose -f docker/docker-compose.test.yml -p ci run --rm sut ptw

42 changes: 42 additions & 0 deletions dags/chunkflow/noop_dag.py
@@ -0,0 +1,42 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.chunkflow_plugin import chunkflow_subdag_from_file
# logging.basicConfig(level=logging.INFO)

DAG_NAME = 'chunkflow_noop'

TASKS_FILENAME = "./dags/chunkflow/tasks/noop_tasks.txt"

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 5, 1),
    'catchup_by_default': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=2),
    'retry_exponential_backoff': True,
}

dag = DAG(
    dag_id=DAG_NAME,
    default_args=default_args,
    schedule_interval=None
)

start = DummyOperator(
    task_id='start',
    default_args=default_args,
    dag=dag
)

chunkflow_subdag = chunkflow_subdag_from_file(TASKS_FILENAME,
                                              task_id="noop_tasks",
                                              image_version='raw_json_task',
                                              default_args=default_args,
                                              dag=dag)

end = DummyOperator(task_id='end', default_args=default_args, dag=dag)

start.set_downstream(chunkflow_subdag)
chunkflow_subdag.set_downstream(end)
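
A minimal sketch of what `chunkflow_subdag_from_file` plausibly does, assuming it filters the tasks file down to its JSON lines and wraps each one in a `DockerOperator` inside a SubDAG. The function name and tasks-file format come from this commit; the image name, command format, and everything else below are assumptions about plugin code not expanded in this diff:

```python
# Hypothetical sketch -- the real implementation lives in the
# chunkflow plugin under plugins/, which is not expanded in this diff.
import json

from airflow import DAG
from airflow.operators.docker_operator import DockerOperator
from airflow.operators.subdag_operator import SubDagOperator


def chunkflow_subdag_from_file(filename, task_id, image_version,
                               default_args, dag):
    # SubDAG ids must be namespaced as <parent_dag_id>.<task_id>.
    subdag = DAG(dag_id='{}.{}'.format(dag.dag_id, task_id),
                 default_args=default_args,
                 schedule_interval=None)
    with open(filename) as f:
        # Keep only the printed task JSONs; the file also contains
        # chunkflow's startup log lines (argDict, region, etc.).
        tasks = [line.strip() for line in f if line.lstrip().startswith('{')]
    for i, task_json in enumerate(tasks):
        json.loads(task_json)  # fail fast on malformed task lines
        DockerOperator(
            task_id='{}_{}'.format(task_id, i),
            image='chunkflow:' + image_version,             # assumed image name
            command="julia run.jl '{}'".format(task_json),  # assumed CLI
            network_mode='bridge',
            default_args=default_args,
            dag=subdag)
    return SubDagOperator(subdag=subdag, task_id=task_id,
                          default_args=default_args, dag=dag)
```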
11 changes: 11 additions & 0 deletions dags/chunkflow/tasks/noop_tasks.txt
@@ -0,0 +1,11 @@
use default region: us-east-1
argDict = Dict{Symbol,Any}(Pair{Symbol,Any}(:shutdown,false),Pair{Symbol,Any}(:workernumber,1),Pair{Symbol,Any}(:workerwaittime,1),Pair{Symbol,Any}(:stride,[1024,1024,128,0]),Pair{Symbol,Any}(:deviceid,nothing),Pair{Symbol,Any}(:queuename,""),Pair{Symbol,Any}(:origin,[10240,10240,1,1]),Pair{Symbol,Any}(:gridsize,[2,2,2,1]),Pair{Symbol,Any}(:task,"test/edges/readnop.json"),Pair{Symbol,Any}(:continuefrom,Int64[]))
PRINT TASK JSONS (no queue has been set)
{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,10240,1,1],"chunkSize":[4,4,4],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://seunglab-test/test_dataset/image/4_4_40/"},"outputs":{"data":"img"}}}
{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[11264,10240,1,1],"chunkSize":[4,4,4],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://seunglab-test/test_dataset/image/4_4_40/"},"outputs":{"data":"img"}}}
{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,11264,1,1],"chunkSize":[4,4,4],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://seunglab-test/test_dataset/image/4_4_40/"},"outputs":{"data":"img"}}}
{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[11264,11264,1,1],"chunkSize":[4,4,4],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://seunglab-test/test_dataset/image/4_4_40/"},"outputs":{"data":"img"}}}
{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,10240,129,1],"chunkSize":[4,4,4],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://seunglab-test/test_dataset/image/4_4_40/"},"outputs":{"data":"img"}}}
{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[11264,10240,129,1],"chunkSize":[4,4,4],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://seunglab-test/test_dataset/image/4_4_40/"},"outputs":{"data":"img"}}}
{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[10240,11264,129,1],"chunkSize":[4,4,4],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://seunglab-test/test_dataset/image/4_4_40/"},"outputs":{"data":"img"}}}
{"input":{"kind":"cutoutchunk","params":{"bigArrayType":"s3","origin":[11264,11264,129,1],"chunkSize":[4,4,4],"voxelSize":[4,4,40],"nonzeroRatioThreshold":0.01},"inputs":{"path":"s3://seunglab-test/test_dataset/image/4_4_40/"},"outputs":{"data":"img"}}}
15 changes: 7 additions & 8 deletions dags/many.py
@@ -1,11 +1,8 @@
import logging
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator

from airflow.operators import PythonOperator


default_args = {
    'owner': 'airflow',
@@ -18,18 +15,21 @@
}
dag = DAG("many_ws", default_args=default_args, schedule_interval=None)


def create_print_date(dag, count_print_date):
    return BashOperator(
        task_id='print_date_' + str(count_print_date),
        bash_command='date',
        dag=dag)


def create_print_hello(dag, count_print_hello):
    return BashOperator(
        task_id='print_hello_' + str(count_print_hello),
        bash_command='echo "hello world!"',
        dag=dag)


def create_docker_print(dag, count_docker_print):
    return DockerOperator(
        task_id='watershed_print_' + str(count_docker_print),
@@ -38,15 +38,16 @@ def create_docker_print(dag, count_docker_print):
        network_mode='bridge',
        dag=dag)


begin_task = BashOperator(
    task_id='begin_task',
    bash_command='echo "Start here"',
    dag=dag)

width = 5
print_date_tasks = [ create_print_date(dag, i) for i in range(width)]
print_hello_tasks = [ create_print_hello(dag, i) for i in range(width)]
docker_print_tasks = [ create_docker_print(dag, i) for i in range(width)]
print_date_tasks = [create_print_date(dag, i) for i in range(width)]
print_hello_tasks = [create_print_hello(dag, i) for i in range(width)]
docker_print_tasks = [create_docker_print(dag, i) for i in range(width)]

done_task = BashOperator(
    task_id='end_task',
@@ -77,5 +78,3 @@ def create_docker_print(dag, count_docker_print):

for docker_print_task in docker_print_tasks:
    docker_print_task.set_downstream(done_task)


70 changes: 0 additions & 70 deletions dags/simple.py
@@ -1,52 +1,7 @@
import logging
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
# from airflow.operators.python_operator import PythonOperator
from airflow.operators.docker_operator import DockerOperator
from fileflow.operators import DivePythonOperator
from fileflow.task_runners import TaskRunner

from airflow.operators import PythonOperator
from fileflow.operators.dive_operator import DiveOperator

class TaskRunnerExample(TaskRunner):
    def run(self, *args, **kwargs):
        output_string = "This task -- called {} -- was run.".format(self.task_instance.task_id)
        self.write_file(output_string)
        logging.info("Wrote '{}' to '{}'".format(output_string, self.get_output_filename()))


class TaskWriter(TaskRunner):
    def run(self, *args, **kwargs):
        logging.info("I am calling task writer!, writing to {}".format(
            self.get_output_filename()))
        self.write_file("Text written from task writer")


class TaskReader(TaskRunner):
    def run(self, *args, **kwargs):
        logging.info("I am calling task reader! Read {} from {}".format(
            self.read_upstream_file("thingy"),
            self.get_input_filename("thingy")))


class EnDivePythonOperator(DiveOperator, PythonOperator):
    """
    Python operator that can send along data dependencies to its callable.
    Generates the callable by initializing its python object and calling its method.
    """

    def __init__(self, python_object, python_method="run", *args, **kwargs):
        self.python_object = python_object
        self.python_method = python_method
        kwargs['python_callable'] = lambda: None

        super(EnDivePythonOperator, self).__init__(*args, **kwargs)

    def pre_execute(self, context):
        context.update(self.op_kwargs)
        context.update({"data_dependencies": self.data_dependencies})
        instantiated_object = self.python_object(context)
        self.python_callable = getattr(instantiated_object, self.python_method)

default_args = {
    'owner': 'airflow',
@@ -87,33 +42,8 @@ def pre_execute(self, context):
    bash_command='echo "goodbye world!"',
    dag=dag)

t6 = EnDivePythonOperator(
    task_id="dive_python_writer",
    python_object=TaskWriter,
    provide_context=True,
    dag=dag)

t7 = EnDivePythonOperator(
    task_id="dive_python_reader",
    python_object=TaskReader,
    data_dependencies={"thingy": t6.task_id},
    provide_context=True,
    dag=dag)

tt = EnDivePythonOperator(
    task_id="write_a_file",
    python_method="run",
    python_object=TaskRunnerExample,
    provide_context=True,
    owner="airflow",
    dag=dag
)

t1.set_downstream(t2)
t2.set_downstream(t3)
t2.set_downstream(t4)
t3.set_downstream(t5)
t4.set_downstream(t5)
t5.set_downstream(t6)
t6.set_downstream(t7)
t7.set_downstream(tt)
38 changes: 28 additions & 10 deletions Dockerfile → docker/Dockerfile
@@ -19,11 +19,22 @@ ENV AIRFLOW_HOME ${AIRFLOW_HOME}
ENV AIRFLOW_USER airflow

USER root
RUN curl -fsSL https://get.docker.com/ | sh
RUN pip install docker-py
RUN apt-get install sudo

RUN apt-get clean \
RUN curl -fsSL https://get.docker.com/ | sh \
    && pip install docker-py \
    && apt-get install sudo \
    && buildDeps=' \
        python-dev \
        libkrb5-dev \
        libsasl2-dev \
        libssl-dev \
        libffi-dev \
        build-essential \
        libblas-dev \
        liblapack-dev \
        libpq-dev \
        git \
    ' && apt-get remove --purge -yqq $buildDeps \
    && apt-get clean \
    && rm -rf \
        /var/lib/apt/lists/* \
        /tmp/* \
@@ -37,19 +48,26 @@ RUN usermod -aG docker airflow
# Unfortunately this is required so the container's docker gid can be updated
# to match the host's gid; the permission is removed again in entrypoint-dood.sh.
RUN echo "airflow ALL=NOPASSWD: ALL" >> /etc/sudoers
WORKDIR ${AIRFLOW_HOME}/.docker

# this is to enable aws ecr credentials helpers to reauthorize docker
RUN echo '{\n "credsStore": "ecr-login"\n}' > config.json
RUN mkdir -p ${AIRFLOW_HOME}/.docker/ \
    && echo '{\n "credsStore": "ecr-login"\n}' > \
    ${AIRFLOW_HOME}/.docker/config.json
RUN chown airflow ${AIRFLOW_HOME}/.docker/config.json

# copy the built docker credentials module to this container
COPY --from=aws_ecr_credential_helper \
    /go/src/github.com/awslabs/amazon-ecr-credential-helper/bin/local/docker-credential-ecr-login \
    /usr/local/bin

COPY deploy/scripts/entrypoint-dood.sh /entrypoint-dood.sh
COPY deploy/config/airflow.cfg ${AIRFLOW_HOME}/airflow.cfg
COPY docker/scripts/add-user-docker.sh /add-user-docker.sh
COPY docker/scripts/entrypoint-dood.sh /entrypoint-dood.sh
COPY docker/config/airflow.cfg ${AIRFLOW_HOME}/airflow.cfg
RUN chown airflow ${AIRFLOW_HOME}/airflow.cfg
COPY plugins/ ${AIRFLOW_HOME}/plugins/
COPY dags/ ${AIRFLOW_HOME}/dags/
RUN chown airflow ${AIRFLOW_HOME}/dags

RUN chown -R airflow: ${AIRFLOW_HOME}

USER airflow
WORKDIR ${AIRFLOW_HOME}
8 changes: 8 additions & 0 deletions docker/Dockerfile.test
@@ -0,0 +1,8 @@
ARG image_name
FROM $image_name
ARG IMAGE_NAME
USER root
COPY docker/scripts/entrypoint-test.sh /entrypoint-test.sh
RUN pip install pytest pytest-watch pytest-env flake8
USER airflow
ENTRYPOINT ["/entrypoint-test.sh"]
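
The test image above installs pytest, pytest-watch, pytest-env, and flake8, and the README runs `ptw` inside the `sut` service. The commit's actual tests are not expanded in this diff; as a sketch of the kind of test this setup discovers, a DAG-integrity check might look like the following (the file path and assertions are assumptions, not files from this commit):

```python
# Hypothetical tests/test_dag_integrity.py -- not part of the files
# shown above, just the sort of test the pytest/ptw setup would run.
from airflow.models import DagBag


def test_dags_import_cleanly():
    # DagBag parses everything under dags/; import_errors maps each
    # file that failed to parse to its traceback.
    dagbag = DagBag(dag_folder='dags/', include_examples=False)
    assert not dagbag.import_errors


def test_chunkflow_noop_dag_is_wired():
    dagbag = DagBag(dag_folder='dags/', include_examples=False)
    dag = dagbag.get_dag('chunkflow_noop')
    assert dag is not None
    # start -> noop_tasks -> end, as wired in dags/chunkflow/noop_dag.py
    assert {t.task_id for t in dag.tasks} >= {'start', 'noop_tasks', 'end'}
```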
File renamed without changes.
