Update to Airflow 2.0.0 (#52)
* Use Airflow 1.10.14 as a bridge before switching to Airflow 2.0

* Update dependencies to fit Airflow 2.0.0

* Update cwl-airflow init to upgrade to Airflow 2.0.0

* Update logging template to correspond to Airflow 2.0.0

* Fix bug in triggering DAGs from the API, update readme

* Remove deprecated add_connections function

* Set a lower number of scheduler parsing processes in Travis tests

* Add DAG to resend all missed progress reports and results

* Print error category in cwl-airflow test

* Add docker pull limit error category

* No need for --conn-extra when adding a new connection
michael-kotliar authored Dec 31, 2020
1 parent a2bdc32 commit 945fc76
Show file tree
Hide file tree
Showing 21 changed files with 1,093 additions and 666 deletions.
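
The commit message above boils down to a handful of user-facing CLI changes for anyone moving an existing CWL-Airflow deployment to Airflow 2.0.0. The short sketch below is only an illustration assembled from commands that already appear in the diffs that follow (.travis.yml and cwl_airflow/components/init/config.py); the connection host and port are the values used in CI and stand in for a real reporting endpoint.

cwl-airflow init --upgrade    # re-creates airflow.cfg for Airflow 2.0.0 and moves deprecated DAGs aside; runs `airflow db init` under the hood
airflow connections add process_report --conn-type http --conn-host localhost --conn-port 3070    # the 1.10-style --conn_id/--conn-extra flags are no longer needed
airflow dags list    # replaces `airflow list_dags --report` as the check that all DAGs parse
airflow dags unpause resend_results    # new DAG that resends missed progress reports and results
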
21 changes: 14 additions & 7 deletions .travis.yml
@@ -80,7 +80,7 @@ jobs:
before_script:
- cwl-airflow init --upgrade
- rm -f ~/airflow/dags/bam-bedgraph-bigwig-single.cwl
script: airflow list_dags --report # to check if all DAGs are correct
script: airflow dags list # to check if all DAGs are correct
- name: Test packaging for Ubuntu 18.04, Python 3.6
install:
- ./packaging/portable/ubuntu/pack.sh 18.04 3.6 $TRAVIS_BRANCH
@@ -89,37 +89,44 @@ jobs:
before_script:
- ./python3/bin_portable/airflow --help # to generate airflow.cfg
- sed -i'.backup' -e 's/^executor.*/executor = LocalExecutor/g' ~/airflow/airflow.cfg
- sed -i'.backup' -e 's/^parsing_processes.*/parsing_processes = 1/g' ~/airflow/airflow.cfg
- sed -i'.backup' -e 's/^sql_alchemy_pool_enabled.*/sql_alchemy_pool_enabled = False/g' ~/airflow/airflow.cfg
- sed -i'.backup' -e 's/^dag_dir_list_interval =.*/dag_dir_list_interval = 60/g' ~/airflow/airflow.cfg
- sed -i'.backup' -e 's/^parallelism =.*/parallelism = 1/g' ~/airflow/airflow.cfg
- sed -i'.backup' -e 's/^sql_alchemy_conn.*/sql_alchemy_conn = mysql:\/\/airflow:[email protected]:6603\/airflow/g' ~/airflow/airflow.cfg
- ./python3/bin_portable/cwl-airflow init # to init database
- ./python3/bin_portable/airflow connections --add --conn_id process_report --conn_type http --conn_host localhost --conn_port 3070 --conn_extra "{\"endpoint\":\"/airflow/\"}" # to add process_report connection
- ./python3/bin_portable/cwl-airflow init
- ./python3/bin_portable/airflow connections add process_report --conn-type http --conn-host localhost --conn-port 3070 # to add process_report connection
- ./python3/bin_portable/airflow scheduler > /dev/null 2>&1 &
- ./python3/bin_portable/cwl-airflow api > /dev/null 2>&1 &
- sleep 5 # to let the scheduler parse all dags, otherwise we can't run the following command
- ./python3/bin_portable/airflow dags unpause resend_results
script: ./python3/bin_portable/cwl-airflow test --suite workflows/tests/conformance_tests.yaml --spin --range 1

before_install:
- git clone https://github.com/datirium/workflows.git --recursive
- docker pull mysql/mysql-server:5.7
- docker run -v ~/database:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=airflow -e MYSQL_DATABASE=airflow -e MYSQL_USER=airflow -e MYSQL_PASSWORD=airflow -p 6603:3306 -d mysql/mysql-server:5.7 --explicit-defaults-for-timestamp=1
install:
- pip install ".[mysql,crypto]" --constraint ./packaging/constraints/constraints-$TRAVIS_PYTHON_VERSION.txt
- pip install ".[mysql]" --constraint ./packaging/constraints/constraints-$TRAVIS_PYTHON_VERSION.txt
before_script:
- airflow --help # to generate airflow.cfg
- sed -i'.backup' -e 's/^executor.*/executor = LocalExecutor/g' ~/airflow/airflow.cfg
- sed -i'.backup' -e 's/^parsing_processes.*/parsing_processes = 1/g' ~/airflow/airflow.cfg
- sed -i'.backup' -e 's/^sql_alchemy_pool_enabled.*/sql_alchemy_pool_enabled = False/g' ~/airflow/airflow.cfg
- sed -i'.backup' -e 's/^dag_dir_list_interval =.*/dag_dir_list_interval = 60/g' ~/airflow/airflow.cfg
- sed -i'.backup' -e 's/^parallelism =.*/parallelism = 1/g' ~/airflow/airflow.cfg
- sed -i'.backup' -e 's/^sql_alchemy_conn.*/sql_alchemy_conn = mysql:\/\/airflow:[email protected]:6603\/airflow/g' ~/airflow/airflow.cfg
- cwl-airflow init # to init database
- airflow connections --add --conn_id process_report --conn_type http --conn_host localhost --conn_port 3070 --conn_extra "{\"endpoint\":\"/airflow/\"}" # to add process_report connection
- cwl-airflow init
- airflow connections add process_report --conn-type http --conn-host localhost --conn-port 3070 # to add process_report connection
- airflow scheduler > /dev/null 2>&1 &
- cwl-airflow api > /dev/null 2>&1 &
- sleep 5 # to let the scheduler parse all dags, otherwise we can't run the following command
- airflow dags unpause resend_results
script: cwl-airflow test --suite workflows/tests/conformance_tests.yaml --spin --range $NTEST

branches:
only:
- master
- /^*_devel$/
- /^([1-9]\d*!)?(0|[1-9]\d*)(\.(0|[1-9]\d*))*((a|b|rc)(0|[1-9]\d*))?(\.post(0|[1-9]\d*))?(\.dev(0|[1-9]\d*))?$/

notifications:
2 changes: 1 addition & 1 deletion README.md
@@ -8,7 +8,7 @@

# **CWL-Airflow**

Python package to extend **[Apache-Airflow 1.10.12](https://airflow.apache.org)**
Python package to extend **[Apache-Airflow 2.0.0](https://airflow.apache.org)**
functionality with **[CWL v1.1](https://www.commonwl.org/v1.1/)** support

## **Cite as**
3 changes: 2 additions & 1 deletion cwl_airflow/components/api/backend.py
@@ -21,6 +21,7 @@
from airflow.utils.state import State
from airflow.utils.timezone import parse as parsedate
from airflow.utils.db import provide_session
from airflow.utils.types import DagRunType

from cwl_airflow.utilities.helpers import (
get_version,
@@ -200,7 +201,7 @@ def create_dag_run(self, dag_id, run_id, conf, session):
raise ValueError(f"dag_run {run_id} for dag_id {dag_id} already exists")
else:
run_conf = conf if isinstance(conf, dict) else json.loads(conf)
dag_run = DagRun(dag_id=dag_id, run_id=run_id, conf=run_conf)
dag_run = DagRun(dag_id=dag_id, run_id=run_id, conf=run_conf, run_type=DagRunType.MANUAL)
session.add(dag_run)
session.commit()
return dag_run
159 changes: 98 additions & 61 deletions cwl_airflow/components/init/config.py
@@ -13,31 +13,54 @@
)

with CleanAirflowImport():
from airflow import models
from airflow.configuration import conf
from airflow.utils.db import merge_conn
from airflow.exceptions import AirflowConfigException
from airflow.utils.dag_processing import list_py_file_paths
from cwl_airflow.utilities.cwl import overwrite_deprecated_dag


def run_init_config(args):
"""
Runs sequence of steps required to configure CWL-Airflow
for the first time. Safe to run several times
for the first time. Safe to run several times. Upgrades
config to correspond to Airflow 2.0.0
"""

create_airflow_config(args) # will create default airflow.cfg if it wasn't present
patch_airflow_config(args)
init_airflow_db(args)
patch_airflow_config(args.config)
# add_connections(args)

if args.upgrade:
upgrade_dags(args.config)
copy_dags(args.home)
upgrade_dags(args)
copy_dags(args)


def create_airflow_config(args):
"""
Runs the airflow --help command with AIRFLOW_HOME and AIRFLOW_CONFIG
environment variables just to create the airflow.cfg file
"""

custom_env = os.environ.copy()
custom_env["AIRFLOW_HOME"] = args.home
custom_env["AIRFLOW_CONFIG"] = args.config
try:
run(
["airflow", "--help"],
env=custom_env,
check=True,
stdout=DEVNULL,
stderr=DEVNULL
)
except (FileNotFoundError, CalledProcessError) as err:
logging.error(f"""Failed to find or to run airflow executable'. Exiting.\n{err}""")
sys.exit(1)


def init_airflow_db(args):
"""
Sets AIRFLOW_HOME and AIRFLOW_CONFIG from args.
Call airflow initdb from subprocess to make sure
Call airflow db init from subprocess to make sure
that the only two things we should care about
are AIRFLOW_HOME and AIRFLOW_CONFIG
"""
@@ -47,38 +70,85 @@ def init_airflow_db(args):
custom_env["AIRFLOW_CONFIG"] = args.config
try:
run(
["airflow", "initdb"], # TODO: check what's the difference initdb from updatedb
["airflow", "db", "init"], # `db init` always runs `db upgrade` internally, so it's ok to run only `db init`
env=custom_env,
check=True,
stdout=DEVNULL,
stderr=DEVNULL
)
except (CalledProcessError, FileNotFoundError) as err:
logging.error(f"""Failed to run 'airflow initdb'. Exiting.\n{err}""")
except (FileNotFoundError) as err:
logging.error(f"""Failed to find airflow executable'. Exiting.\n{err}""")
sys.exit(1)
except (CalledProcessError) as err:
logging.error(f"""Failed to run 'airflow db init'. Delete airflow.db if SQLite was used. Exiting.\n{err}""")
sys.exit(1)


def patch_airflow_config(airflow_config):
def patch_airflow_config(args):
"""
Updates provided Airflow configuration file to include defaults for cwl-airflow.
If something went wrong, restores the original airflow.cfg from the backed up copy
Updates the current Airflow configuration file to include defaults for cwl-airflow.
If something goes wrong, restores the original airflow.cfg from the backed-up copy.
If an update to Airflow 2.0.0 is required, generates a new airflow.cfg with some of the
important parameters copied from the old airflow.cfg. The backed-up copy is not deleted
in this case.
"""

# TODO: add cwl section with the following parameters:
# - singularity
# - use_container

# CWL-Airflow specific settings
patches = [
["sed", "-i", "-e", "s/^dags_are_paused_at_creation.*/dags_are_paused_at_creation = False/g", airflow_config],
["sed", "-i", "-e", "s/^load_examples.*/load_examples = False/g", airflow_config],
["sed", "-i", "-e", "s/^logging_config_class.*/logging_config_class = cwl_airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG/g", airflow_config],
["sed", "-i", "-e", "s/^hide_paused_dags_by_default.*/hide_paused_dags_by_default = True/g", airflow_config]
["sed", "-i", "-e", "s#^dags_are_paused_at_creation.*#dags_are_paused_at_creation = False#g", args.config],
["sed", "-i", "-e", "s#^load_examples.*#load_examples = False#g", args.config],
["sed", "-i", "-e", "s#^load_default_connections.*#load_default_connections = False#g", args.config],
["sed", "-i", "-e", "s#^logging_config_class.*#logging_config_class = cwl_airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG#g", args.config],
["sed", "-i", "-e", "s#^hide_paused_dags_by_default.*#hide_paused_dags_by_default = True#g", args.config]
]

airflow_config_backup = airflow_config + "_backup_" + str(uuid.uuid4())
# Minimum set of settings that should be enough for starting
# SequentialExecutor, LocalExecutor or CeleryExecutor with
# the same dags and metadata database after updating to Airflow 2.0.0.
# All other user-specific settings should be manually updated from the
# backed-up airflow.cfg, as a lot of them have been refactored.
transferable_settings = [
("core", "dags_folder"),
("core", "default_timezone"),
("core", "executor"),
("core", "sql_alchemy_conn"),
("core", "sql_engine_encoding"), # just in case
("core", "fernet_key"), # to be able to read from the old database
("celery", "broker_url"),
("celery", "result_backend")
]

# create a temporary backup of airflow.cfg to restore from if we fail to apply patches
# this backup will be deleted after all patches are applied, unless it was created right
# before the Airflow version update to 2.0.0
airflow_config_backup = args.config + "_backup_" + str(uuid.uuid4())
try:
shutil.copyfile(airflow_config, airflow_config_backup)
# read airflow.cfg before applying any patches and creating a backup
conf.read(args.config)
shutil.copyfile(args.config, airflow_config_backup)

# check if we need to make airflow.cfg correspond to Airflow 2.0.0
# we search for the [logging] section as it's present only in Airflow >= 2.0.0
airflow_version_update = not conf.has_section("logging")
if airflow_version_update:
logging.info("Airflow config will be upgraded to correspond to Airflow 2.0.0")
for section, key in transferable_settings:
try:
patches.append(
["sed", "-i", "-e", f"s#^{key}.*#{key} = {conf.get(section, key)}#g", args.config]
)
except AirflowConfigException: # just skip a section/key missing from the config
pass
os.remove(args.config) # remove old config
create_airflow_config(args) # create new airflow.cfg with the default values

# Apply all patches
for patch in patches:
logging.debug(f"Applying patch {patch}")
run(
patch,
shell=False, # for proper handling of filenames with spaces
@@ -89,17 +159,17 @@ def patch_airflow_config(airflow_config):
except (CalledProcessError, FileNotFoundError) as err:
logging.error(f"""Failed to patch Airflow configuration file. Restoring from the backup and exiting.\n{err}""")
if os.path.isfile(airflow_config_backup):
shutil.copyfile(airflow_config_backup, airflow_config)
shutil.copyfile(airflow_config_backup, args.config)
sys.exit(1)
finally:
if os.path.isfile(airflow_config_backup):
if os.path.isfile(airflow_config_backup) and not airflow_version_update:
os.remove(airflow_config_backup)


def upgrade_dags(airflow_config):
def upgrade_dags(args):
"""
Corrects old style DAG python files into the new format.
Reads configuration from "airflow_config". Uses standard
Reads configuration from "args.config". Uses standard
"conf.get" instead of "conf_get", because the fields we
use are always set. Copies all deprecated dags into the
"deprecated_dags" folder, adds deprecated DAGs to the
@@ -109,7 +179,7 @@ def upgrade_dags(airflow_config):
files remain unchanged.
"""

conf.read(airflow_config)
conf.read(args.config) # this will read already patched airflow.cfg
dags_folder = conf.get("core", "dags_folder")
for dag_location in list_py_file_paths( # will skip all DAGs from ".airflowignore"
directory=dags_folder,
@@ -125,10 +195,10 @@
)


def copy_dags(airflow_home, source_folder=None):
def copy_dags(args, source_folder=None):
"""
Copies *.py files (dags) from source_folder (default ../../extensions/dags)
to dags_folder, which is always {airflow_home}/dags. Overwrites existent
to dags_folder, which is always {args.home}/dags. Overwrites existent
files
"""

@@ -142,42 +212,9 @@ def copy_dags(airflow_home, source_folder=None):
"extensions/dags",
)

target_folder = get_dir(os.path.join(airflow_home, "dags"))
target_folder = get_dir(os.path.join(args.home, "dags"))
for root, dirs, files in os.walk(source_folder):
for filename in files:
if re.match(".*\\.py$", filename) and filename != "__init__.py":
# if not os.path.isfile(os.path.join(target_folder, filename)):
shutil.copy(os.path.join(root, filename), target_folder)


# not used anymore
def add_connections(args):
"""
Sets AIRFLOW_HOME and AIRFLOW_CONFIG from args.
Call 'airflow connections --add' from subproces to make sure that
the only two things we should care about are AIRFLOW_HOME and
AIRFLOW_CONFIG. Adds "process_report" connections to the Airflow DB
that is used to report workflow execution progress and results.
"""

custom_env = os.environ.copy()
custom_env["AIRFLOW_HOME"] = args.home
custom_env["AIRFLOW_CONFIG"] = args.config
try:
run(
[
"airflow", "connections", "--add",
"--conn_id", "process_report",
"--conn_type", "http",
"--conn_host", "localhost",
"--conn_port", "3070",
"--conn_extra", "{\"endpoint\":\"/airflow/\"}"
],
env=custom_env,
check=True,
stdout=DEVNULL,
stderr=DEVNULL
)
except (CalledProcessError, FileNotFoundError) as err:
logging.error(f"""Failed to run 'airflow connections --add'. Exiting.\n{err}""")
sys.exit(1)
8 changes: 4 additions & 4 deletions cwl_airflow/components/test/conformance.py
@@ -76,11 +76,11 @@ def do_POST(self):
int(self.headers["Content-Length"])
).decode("UTF-8")
)["payload"]
if "results" in payload or payload.get("state", None) == "failed": # "results" can be {}, so we should check only if key is present, but not value
self.server.results_queue.put({
"run_id": payload["run_id"],
if "results" in payload or payload["error"] != "": # "results" can be {}, so we should check only if key is present, but not value
self.server.results_queue.put({ # we read "error" without get, because if we got to this line and "results" not in payload,
"run_id": payload["run_id"], # it will definately has "error"
"dag_id": payload["dag_id"],
"results": payload.get("results", None)
"results": payload.get("results", payload.get("error", None)) # here need to use get for "error", because it is calculated even if "results" is present
})


(Diffs for the remaining changed files are not shown.)
