Create MultiTriggerDag Operator (#6)
* Added patch module helper to allow patching of loaded modules (e.g. those loaded via imp.load_source); a hedged sketch follows the file summary below.
* Added multi-trigger DAG example.
* Reorganized example DAGs.
* Use pylama for testing.
wongwill86 authored Aug 24, 2017
1 parent c7e9af6 commit 160f014
Showing 18 changed files with 398 additions and 62 deletions.
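The patch-module helper named in the first commit bullet is not visible in the rendered diff below. What follows is a minimal sketch of what such a helper could look like, assuming it pairs imp.load_source with mock.patch.object; the function name patch_loaded_module and every detail here are illustrative, not the committed code.

# Hypothetical sketch only -- the helper added by this commit is not shown in
# the diff. Modules loaded with imp.load_source (as Airflow loads plugin and
# DAG files) have no importable dotted path, so mock.patch('pkg.mod.attr')
# cannot reach them; patching the already-loaded module object directly can.
import imp

import mock


def patch_loaded_module(module_path, attribute, replacement):
    """Load a source file as a module and patch one attribute on it."""
    module = imp.load_source('module_under_test', module_path)
    return mock.patch.object(module, attribute, replacement)

The returned patcher works as a context manager, so a test can temporarily swap out, say, a plugin's settings object and have it restored on exit.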
1 change: 1 addition & 0 deletions .env
@@ -0,0 +1 @@
IMAGE_NAME=wongwill86/air-tasks:latest
4 changes: 3 additions & 1 deletion README.md
@@ -5,9 +5,11 @@ DooD support and AWS ECR Credential Helper

NOTES:
Chunkflow: make sure AWS_ACCESS_KEY_ID, etc. are set as environment variables!
export PYTHONDONTWRITEBYTECODE=1
docker-compose -f docker/docker-compose.test.yml -p ci build
docker-compose -f docker/docker-compose.test.yml -p ci run --rm sut ptw
docker-compose -f docker/docker-compose.test.yml -p ci run --rm sut ptw -- --pylama

export

When deploying docker/docker-compose-CeleryExecutor.yml, remember to deploy secrets!
(or leave blank for no web auth)
File renamed without changes.
12 changes: 5 additions & 7 deletions dags/many.py → dags/examples/interleaved.py
@@ -1,7 +1,6 @@
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator


default_args = {
@@ -13,7 +12,8 @@
    'retry_delay': timedelta(seconds=2),
    'retry_exponential_backoff': True,
}
dag = DAG("many_ws", default_args=default_args, schedule_interval=None)
dag = DAG(
    "example_interleaved", default_args=default_args, schedule_interval=None)


def create_print_date(dag, count_print_date):
@@ -31,11 +31,9 @@ def create_print_hello(dag, count_print_hello):


def create_docker_print(dag, count_docker_print):
    return DockerOperator(
        task_id='watershed_print_' + str(count_docker_print),
        image='watershed',
        command='echo "watershed printing!"',
        network_mode='bridge',
    return BashOperator(
        task_id='bash_print_' + str(count_docker_print),
        bash_command='echo "watershed printing!"',
        dag=dag)


54 changes: 54 additions & 0 deletions dags/examples/multi_trigger.py
@@ -0,0 +1,54 @@
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.custom_plugin import MultiTriggerDagRunOperator
from airflow.operators.bash_operator import BashOperator

SCHEDULE_DAG_ID = 'example_multi_trigger_scheduler'
TARGET_DAG_ID = 'example_multi_trigger_target'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 5, 1),
    'catchup_by_default': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=2),
    'retry_exponential_backoff': True,
}

# ####################### SCHEDULER #################################
scheduler_dag = DAG(
    dag_id=SCHEDULE_DAG_ID,
    default_args=default_args,
    schedule_interval=None
)


def param_generator():
    iterable = xrange(0, 100)
    for i in iterable:
        yield i


operator = MultiTriggerDagRunOperator(
    task_id='trigger_%s' % TARGET_DAG_ID,
    trigger_dag_id=TARGET_DAG_ID,
    params_list=param_generator(),
    default_args=default_args,
    dag=scheduler_dag)

# ####################### TARGET DAG #################################

target_dag = DAG(
    dag_id=TARGET_DAG_ID,
    default_args=default_args,
    schedule_interval=None
)

start = BashOperator(
    task_id='bash_task',
    bash_command='sleep 1; echo "Hello from message #' +
                 '{{ dag_run.conf if dag_run else "NO MESSAGE" }}"',
    default_args=default_args,
    dag=target_dag
)
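A note on the example above: param_generator yields bare integers, which arrive in the target DAG as dag_run.conf. A hedged variant, if named fields are preferred (the key message_id is illustrative, not part of this commit):

# Illustrative variant: yield dicts so target-DAG templates can read named
# fields, e.g. {{ dag_run.conf['message_id'] }}.
def dict_param_generator():
    for i in xrange(0, 100):
        yield {'message_id': i}

Since both DAGs use schedule_interval=None, the scheduler DAG is started by hand, e.g. with airflow trigger_dag example_multi_trigger_scheduler.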
49 changes: 0 additions & 49 deletions dags/simple.py

This file was deleted.

2 changes: 1 addition & 1 deletion docker/Dockerfile.test
@@ -3,6 +3,6 @@ FROM $image_name
ARG IMAGE_NAME
USER root
COPY docker/scripts/entrypoint-test.sh /entrypoint-test.sh
RUN pip install pytest pytest-watch pytest-env flake8
RUN pip install pytest pytest-watch pytest-env pylama mock
USER airflow
ENTRYPOINT ["/entrypoint-test.sh"]
2 changes: 1 addition & 1 deletion docker/config/airflow.cfg
@@ -61,7 +61,7 @@ max_active_runs_per_dag = 16
# Whether to load the examples that ship with Airflow. It's good to
# get started, but you probably want to set this to False in a production
# environment
load_examples = True
load_examples = False

# Where your Airflow plugins are stored
plugins_folder = /usr/local/airflow/plugins
2 changes: 1 addition & 1 deletion docker/docker-compose.test.yml
@@ -18,4 +18,4 @@ services:
- AWS_SECRET_ACCESS_KEY
- AWS_DEFAULT_REGION
command:
- pytest && flake8 .
- pytest --pylama
77 changes: 77 additions & 0 deletions plugins/custom/multi_trigger_dag.py
@@ -0,0 +1,77 @@
from airflow.plugins_manager import AirflowPlugin
from datetime import datetime
import logging
import types
import collections

from airflow.models import BaseOperator
from airflow.models import DagBag
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow import settings


class MultiTriggerDagRunOperator(BaseOperator):
    """
    Triggers multiple DAG runs for a specified ``dag_id``.

    Draws inspiration from:
    airflow.operators.dagrun_operator.TriggerDagRunOperator

    :param trigger_dag_id: the dag_id to trigger
    :type trigger_dag_id: str
    :param params_list: list of dicts for DAG-level parameters that are made
        accessible in templates, namespaced under ``params`` for each dag run
    :type params_list: Iterable<dict> or types.GeneratorType
    """

    @apply_defaults
    def __init__(
            self,
            trigger_dag_id,
            params_list,
            *args, **kwargs):
        super(MultiTriggerDagRunOperator, self).__init__(*args, **kwargs)
        self.trigger_dag_id = trigger_dag_id
        self.params_list = params_list
        if hasattr(self.params_list, '__len__'):
            assert len(self.params_list) > 0
        else:
            assert (isinstance(params_list, collections.Iterable) or
                    isinstance(params_list, types.GeneratorType))

    def execute(self, context):
        session = settings.Session()
        dbag = DagBag(settings.DAGS_FOLDER)
        trigger_dag = dbag.get_dag(self.trigger_dag_id)

        assert trigger_dag is not None

        trigger_id = 0
        for params in self.params_list:
            dr = trigger_dag.create_dagrun(run_id='trig_%s_%d_%s' %
                                           (self.trigger_dag_id, trigger_id,
                                            datetime.now().isoformat()),
                                           state=State.RUNNING,
                                           conf=params,
                                           external_trigger=True)
            logging.info("Creating DagRun {}".format(dr))
            session.add(dr)
            trigger_id = trigger_id + 1
            # Commit in batches of 10 to avoid one huge transaction.
            if trigger_id % 10 == 0:
                session.commit()
        session.commit()
        session.close()


class CustomPlugin(AirflowPlugin):
    name = "custom_plugin"
    operators = [MultiTriggerDagRunOperator]
    hooks = []
    executors = []
    macros = []
    admin_views = []
    flask_blueprints = []
    menu_links = []
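Since the test image now installs pytest and mock (see docker/Dockerfile.test above), a minimal test sketch for the operator might look as follows; the import path plugins.custom.multi_trigger_dag and the test names are assumptions, not files from this commit.

# Hypothetical test sketch -- import path and names are illustrative.
import pytest

from plugins.custom.multi_trigger_dag import MultiTriggerDagRunOperator


def test_rejects_empty_params_list():
    # __init__ asserts that a sized params_list is non-empty.
    with pytest.raises(AssertionError):
        MultiTriggerDagRunOperator(
            task_id='trigger_nothing',
            trigger_dag_id='some_target_dag',
            params_list=[])


def test_accepts_generator_params_list():
    # Generators have no __len__, so only the iterable/generator check applies.
    op = MultiTriggerDagRunOperator(
        task_id='trigger_generated',
        trigger_dag_id='some_target_dag',
        params_list=(i for i in xrange(3)))
    assert op.trigger_dag_id == 'some_target_dag'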
3 changes: 1 addition & 2 deletions requirements.txt
@@ -1,2 +1 @@
apache-airflow==1.8.1
docker-py
docker
Empty file added tests/__init__.py
Empty file added tests/plugins/__init__.py