Overhaul Docker Image (#16)
- Overhaul build process
- New in-repo Docker base image
  - Allows custom Airflow configuration
  - Currently Airflow 1.9 + a custom performance patch
- Remove unnecessary custom Celery loader
- Update Fernet key
- Fix some spelling
- Update README
- Upgrade to Python 3
- Reorganize repo to reflect what's in the Docker image
- Remove chunkflow DAG
wongwill86 committed Feb 24, 2018
1 parent 66f42b2 commit bf30a01
Showing 47 changed files with 779 additions and 718 deletions.
16 changes: 16 additions & 0 deletions .dockerignore
@@ -0,0 +1,16 @@
# prevent from adding these files/folders into the docker image
.cache
.git
.ropeproject
cloud
secrets
tests
docker
.env
.*
# we need these 3 git folders to be included for version tagging
!.git/refs/heads/*
.git/refs/heads/*/*
!.git/refs/tags/*
.git/refs/tags/*/*
!.git/HEAD
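
The negation patterns above deliberately keep .git/HEAD and the branch/tag refs in the build context so the image can be version-tagged. As a rough sketch of how that retained metadata can be read inside the container (illustrative only, not this repo's actual tagging code; the checkout path is hypothetical):

    from pathlib import Path

    # Hypothetical checkout location inside the image.
    GIT_DIR = Path('/usr/local/airflow/.git')


    def describe_checkout(git_dir=GIT_DIR):
        """Return (branch, commit) using only HEAD and the retained refs."""
        head = (git_dir / 'HEAD').read_text().strip()
        if head.startswith('ref: '):
            ref = head[len('ref: '):]          # e.g. refs/heads/master
            commit = (git_dir / ref).read_text().strip()
            return ref.rsplit('/', 1)[-1], commit
        return 'detached', head                # HEAD holds a bare SHA


    if __name__ == '__main__':
        branch, commit = describe_checkout()
        print('%s@%s' % (branch, commit[:8]))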
308 changes: 181 additions & 127 deletions README.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions airflow.cfg
90 changes: 81 additions & 9 deletions docker/config/airflow.cfg → config/airflow.cfg
@@ -12,15 +12,27 @@ dags_folder = /usr/local/airflow/dags
base_log_folder = /usr/local/airflow/logs

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply a remote location URL (starting with either 's3://...' or
# 'gs://...') and an Airflow connection id that provides access to the storage
# must supply an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder =
remote_log_conn_id =
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
# DEPRECATED option for remote log storage, use remote_base_log_folder instead!
s3_log_folder =

# Logging level
logging_level = INFO

# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class =

# Log format
log_format = [%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s

# Default timezone in case supplied date times are naive
# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)
default_timezone = utc

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
@@ -40,6 +52,10 @@ sql_alchemy_pool_size = 300
# not apply to sqlite.
sql_alchemy_pool_recycle = 3600

# How many seconds to retry re-establishing a DB connection after
# disconnects. Setting this to 0 disables retries.
sql_alchemy_reconnect_timeout = 300

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
@@ -85,10 +101,26 @@ default_impersonation =
# What security module to use (for example kerberos):
security =

# If set to False enables some unsecure features like Charts and Ad Hoc Queries.
# In 2.0 will default to True.
secure_mode = False

# Turn unit test mode on (overwrites many configuration options with test
# values at runtime)
unit_test_mode = False

# Name of handler to read task instance logs.
# Default to use file task handler.
task_log_reader = file.task

# Whether to enable pickling for xcom (note that this is insecure and allows for
# RCE exploits). This will be deprecated in Airflow 2.0 (be forced to False).
enable_xcom_pickling = True

# When a task is killed forcefully, this is the amount of time in seconds that
# it has to cleanup after it is sent a SIGTERM, before it is SIGKILLED
killed_task_cleanup_time = 60

[cli]
# In what way should the cli access the API. The LocalClient will use the
# database directly, while the json_client will use the api running on the
@@ -109,6 +141,7 @@ default_ram = 512
default_disk = 512
default_gpus = 0


[webserver]
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is used in automated emails that
@@ -152,7 +185,7 @@ access_logfile = -
error_logfile = -

# Expose the configuration file in the web server
expose_config = True
expose_config = False

# Set to true to turn on authentication:
# http://pythonhosted.org/airflow/security.html#web-authentication
@@ -168,6 +201,10 @@ filter_by_owner = False
# in order to user the ldapgroup mode.
owner_mode = user

# Default DAG view. Valid values are:
# tree, graph, duration, gantt, landing_times
dag_default_view = tree

# Default DAG orientation. Valid values are:
# LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
dag_orientation = LR
@@ -184,9 +221,13 @@ log_fetch_timeout_sec = 5
# DAGs by default
hide_paused_dags_by_default = False

# Consistent page size across all listing views in the UI
page_size = 100

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
@@ -224,7 +265,7 @@ worker_log_server_port = 8793
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
#broker_url = redis://redis:6379/1
broker_url = pyamqp://rabbitmq/
broker_url = amqp://rabbitmq:5672

# Another key Celery setting
celery_result_backend = db+postgresql://airflow:airflow@postgres/airflow
@@ -239,6 +280,16 @@ flower_port = 5555
# Default queue that tasks get assigned to and that worker listen on.
default_queue = worker

# Import path for celery configuration options
celery_config_options = celeryconfig.CELERY_CONFIG

[dask]
# This section only applies if you are using the DaskExecutor in
# [core] section above

# The IP address and port of the Dask cluster's scheduler.
cluster_address = 127.0.0.1:8786

[scheduler]
# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI), this defines the frequency at which they should
@@ -276,6 +327,10 @@ scheduler_zombie_task_threshold = 300
# however it can be set on a per DAG basis in the
# DAG definition (catchup)
catchup_by_default = False
# This changes the batch size of queries in the scheduling main loop.
# This depends on query length limits and how long you are willing to hold locks.
# 0 for no limit
max_tis_per_query = 500

# Statsd (https://github.com/etsy/statsd) integration settings
statsd_on = False
@@ -286,10 +341,24 @@ statsd_prefix = airflow
# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run. However airflow will never
# use more threads than the amount of cpu cores available.
max_threads = 2
max_threads = 8

authenticate = False

[ldap]
# set this to ldaps://<your.ldap.server>:<port>
uri =
user_filter = objectClass=*
user_name_attr = uid
group_member_attr = memberOf
superuser_filter =
data_profiler_filter =
bind_user = cn=Manager,dc=example,dc=com
bind_password = insecure
basedn = dc=example,dc=com
cacert = /etc/ca/ldap_ca.crt
search_scope = LEVEL

[mesos]
# Mesos master address which MesosExecutor will connect to.
master = localhost:5050
@@ -327,6 +396,7 @@ authenticate = False
# default_principal = admin
# default_secret = admin


[kerberos]
ccache = /tmp/airflow_krb5_ccache
# gets augmented with fqdn
@@ -335,9 +405,11 @@ reinit_frequency = 3600
kinit_path = kinit
keytab = airflow.keytab


[github_enterprise]
api_rev = v3


[admin]
# UI to hide sensitive variable fields when set to True
hide_sensitive_variable_fields = True
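
One change that is not visible in the rendered hunks is the updated Fernet key mentioned in the commit message (the fernet_key option in [core]). If a fresh key is ever needed, it can be generated with the cryptography package that Airflow's crypto support relies on; a minimal sketch:

    from cryptography.fernet import Fernet

    # Prints a url-safe base64-encoded 32-byte key suitable for fernet_key
    # in airflow.cfg (or the AIRFLOW__CORE__FERNET_KEY environment variable).
    print(Fernet.generate_key().decode())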
19 changes: 19 additions & 0 deletions config/celeryconfig.py
@@ -0,0 +1,19 @@
from airflow import configuration

# Broker settings.
CELERY_CONFIG = {
    'accept_content': ['json', 'pickle'],
    'event_serializer': 'json',
    'result_serializer': 'pickle',
    'task_serializer': 'pickle',
    'worker_prefetch_multiplier': 1,
    'task_acks_late': True,
    'task_reject_on_worker_lost': True,
    'broker_url': configuration.get('celery', 'broker_url'),
    'result_backend': configuration.get('celery', 'CELERY_RESULT_BACKEND'),
    'worker_concurrency':
        configuration.getint('celery', 'CELERYD_CONCURRENCY'),
    'task_default_queue': configuration.get('celery', 'DEFAULT_QUEUE'),
    'task_default_exchange': configuration.get('celery', 'DEFAULT_QUEUE'),
    'worker_send_task_events': True
}
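
This module is the target of the new celery_config_options = celeryconfig.CELERY_CONFIG setting in airflow.cfg above. As a quick sanity check, the same mapping can be applied to a throwaway Celery app; this sketch assumes Airflow and Celery are importable (e.g. inside the built image) and that celeryconfig is on the Python path:

    from celery import Celery

    from celeryconfig import CELERY_CONFIG  # the module added in this commit

    # Apply the Airflow-derived settings and inspect a few of them.
    app = Celery('config_check')
    app.conf.update(CELERY_CONFIG)

    print(app.conf.broker_url)
    print(app.conf.task_default_queue)          # "worker", per airflow.cfg
    print(app.conf.worker_prefetch_multiplier)  # 1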
42 changes: 0 additions & 42 deletions dags/chunkflow/noop.py

This file was deleted.

11 changes: 0 additions & 11 deletions dags/chunkflow/tasks/noop_tasks.txt

This file was deleted.

2 changes: 1 addition & 1 deletion dags/examples/docker_runtime_nvidia.py
@@ -13,7 +13,7 @@
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 5, 1),
'cactchup_by_default': False,
'catchup_by_default': False,
'retries': 1,
'retry_delay': timedelta(seconds=2),
'retry_exponential_backoff': True,
2 changes: 1 addition & 1 deletion dags/examples/docker_with_variables.py
@@ -10,7 +10,7 @@
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 5, 1),
'cactchup_by_default': False,
'catchup_by_default': False,
'retries': 1,
'retry_delay': timedelta(seconds=2),
'retry_exponential_backoff': True,
2 changes: 1 addition & 1 deletion dags/examples/interleaved.py
@@ -7,7 +7,7 @@
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 5, 1),
'cactchup_by_default': False,
'catchup_by_default': False,
'retries': 1,
'retry_delay': timedelta(seconds=2),
'retry_exponential_backoff': True,
8 changes: 4 additions & 4 deletions dags/examples/multi_trigger.py
@@ -10,7 +10,7 @@
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 5, 1),
'cactchup_by_default': False,
'catchup_by_default': False,
'retries': 1,
'retry_delay': timedelta(seconds=2),
'retry_exponential_backoff': True,
@@ -24,16 +24,16 @@
)


def param_generator():
    iterable = xrange(0, 100)
def get_generator():
    iterable = range(0, 100)
    for i in iterable:
        yield i


operator = MultiTriggerDagRunOperator(
    task_id='trigger_%s' % TARGET_DAG_ID,
    trigger_dag_id=TARGET_DAG_ID,
    params_list=param_generator(),
    params_list=get_generator,
    default_args=default_args,
    dag=scheduler_dag)

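Besides replacing xrange (removed in Python 3) with range, this diff passes the generator function itself as params_list instead of a generator created by calling it, presumably so a fresh iterator can be obtained whenever the operator needs one. A standalone illustration of the difference (not the operator itself):

    def get_generator():
        # Same shape as the DAG's generator: yields 0..99.
        for i in range(0, 100):
            yield i


    gen = get_generator()              # a single generator object ...
    print(len(list(gen)))              # 100
    print(len(list(gen)))              # 0 -- exhausted after one pass

    print(len(list(get_generator())))  # 100 -- the callable returns a
    print(len(list(get_generator())))  # 100    fresh generator each call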
8 changes: 5 additions & 3 deletions dags/manager/scaler.py
@@ -18,6 +18,8 @@
from airflow import models
import requests
import json
import logging
logger = logging.root.getChild(__name__)

DAG_ID = 'z_manager_cluster_scaler'

@@ -52,7 +54,7 @@
{% set queue_sizes = task_instance.xcom_pull(task_ids=params.task_id) %}
{%
set docker_compose_command='docker-compose -f ' +
conf.get('core', 'airflow_home') + '/docker/docker-compose-CeleryExecutor.yml' +
conf.get('core', 'airflow_home') + '/deploy/docker-compose-CeleryExecutor.yml' +
' up -d --no-recreate --no-deps --no-build --no-color'
%}
{%
@@ -122,8 +124,8 @@ def get_queue_sizes():
            stats = json.loads(response.text)
            size = stats['messages_ready'] + stats['messages_unacknowledged']
            queue_sizes[queue_name] = size
        except Exception as e:
            print('No tasks found for %s because %s' % (queue_name, e.message))
        except Exception:
            logger.exception('No tasks found for %s', queue_name)
            queue_sizes[queue_name] = 0

    return queue_sizes
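The function shown here sums messages_ready and messages_unacknowledged from RabbitMQ's management API and now logs failures via logger.exception instead of print. A self-contained sketch of that pattern; the host, port, vhost, and credentials are illustrative defaults rather than values taken from this repo:

    import logging

    import requests

    logger = logging.root.getChild(__name__)

    # RabbitMQ's management plugin listens on port 15672 by default and
    # exposes per-queue stats under /api/queues/<vhost>/<queue> (%2F is the
    # URL-encoded default vhost "/").
    QUEUE_URL = 'http://rabbitmq:15672/api/queues/%2F/{queue}'


    def queue_size(queue_name, auth=('guest', 'guest')):
        """Return ready + unacknowledged message counts, or 0 on failure."""
        try:
            response = requests.get(QUEUE_URL.format(queue=queue_name),
                                    auth=auth, timeout=5)
            response.raise_for_status()
            stats = response.json()
            return stats['messages_ready'] + stats['messages_unacknowledged']
        except Exception:
            logger.exception('No tasks found for %s', queue_name)
            return 0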