Airflow

Contents

  • Description
  • Airflow project structure
  • DAGs structure
  • Local setup and run
  • Troubleshooting

Description

Airflow project structure

dags/                              contains DAGs
dags/configs/transformation/       contains JSON files with data transformation configuration
                                   (renaming columns, dtypes for dataframes, etc.) used in DAGs
docker/                            contains Dockerfile and requirements.txt
variables_and_connections/         contains JSON files with variables and connections imported during the airflow-init step
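
For illustration, a transformation config of this kind might be applied to a dataframe roughly as in the sketch below; the keys and column names here are hypothetical and not taken from the actual config files:

import json

import pandas as pd

# Hypothetical config shaped like the JSONs in dags/configs/transformation/:
# column renames plus target dtypes for the resulting dataframe.
config = json.loads("""
{
    "rename_columns": {"primary_type": "crime_type", "community_area": "community_area_id"},
    "dtypes": {"crime_type": "string", "community_area_id": "Int64"}
}
""")

def apply_transformation(df: pd.DataFrame, cfg: dict) -> pd.DataFrame:
    """Rename columns and cast dtypes as described by a transformation config."""
    return df.rename(columns=cfg["rename_columns"]).astype(cfg["dtypes"])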

DAGs structure

Three DAGs are presented in this project:

  • crime_pipeline_dag

  • datasets_to_gcs_dag

  • gcs_to_bigquery_dag
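
Below is a minimal sketch of how one of these DAGs could be declared; the callables, schedule, and task wiring are illustrative assumptions, not the actual implementation:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical callables standing in for the real download/upload logic.
def download_dataset(**context):
    ...

def upload_to_gcs(**context):
    ...

with DAG(
    dag_id='datasets_to_gcs_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@weekly',   # example schedule only
    catchup=False,
) as dag:
    download = PythonOperator(task_id='download_dataset', python_callable=download_dataset)
    upload = PythonOperator(task_id='upload_to_gcs', python_callable=upload_to_gcs)

    download >> upload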

Local setup and run

Prerequisites

Warning

All requests should include an app token that identifies your application, and each application should have its own unique app token. A limited number of requests can be made without an app token, but they are subject to much lower throttling limits than requests that do include one. With an app token, your application is guaranteed access to its own pool of requests. If you don't have an app token yet, sign up for one as described below.

Create an App Token by following the instructions.
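
For reference, the SODA API accepts the token either as an X-App-Token header or an $$app_token query parameter. A minimal request sketch (the resource URL below is a placeholder, not necessarily the one used by the DAGs):

import requests

# Placeholder SODA resource URL; substitute the dataset actually used by the project.
DATASET_URL = 'https://data.cityofchicago.org/resource/ijzp-q8t2.json'

response = requests.get(
    DATASET_URL,
    headers={'X-App-Token': '<your_app_token>'},  # token created per the instructions above
    params={'$limit': 100000, '$offset': 0},      # SODA paging parameters
    timeout=60,
)
response.raise_for_status()
rows = response.json()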

Warning

Before triggering DAGs, you should create the GCP infrastructure using Terraform. Because the bucket name is generated randomly, set its value in the Airflow variables manually, following these instructions.

Warning

You have to put credentials.json into the credentials folder in the project directory.

Create a Project and a Service Account by following the instructions.
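
If you want to sanity-check the key locally before starting the containers, a quick sketch (assuming the google-cloud-storage package is installed; the path matches the credentials folder mentioned above):

from google.cloud import storage

# Will raise if the service account key is missing or invalid.
client = storage.Client.from_service_account_json('credentials/credentials.json')
print([bucket.name for bucket in client.list_buckets()])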

Note

Because tables should be partitioned and clustered in a way that makes sense for the upstream queries, the cluster and partition fields are set in gcs_to_bigquery.py:

cluster_fields=[
    'year',
    'community_area',
    'location',
    'iucr',
] if dataset_name == 'crime' else None,
time_partitioning={
    'field': 'date',
    'type': 'YEAR',
} if dataset_name == 'crime' else None

Clustering and partitioning make sense only for the crime table, as it has millions of rows. The order of the cluster_fields follows the specifics of the aggregated models (ordering and filtering data in reports built on dbt models).
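
In context, these arguments would be passed to the load operator roughly as in the sketch below, assuming the DAG uses GCSToBigQueryOperator; the bucket, object layout, and destination table are placeholders:

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

load_to_bq = GCSToBigQueryOperator(
    task_id=f'load_{dataset_name}_to_bigquery',
    bucket='<your_bucket_name>',                    # placeholder: the Terraform-created bucket
    source_objects=[f'{dataset_name}/*.parquet'],   # placeholder object layout
    destination_project_dataset_table=f'<project>.<dataset>.{dataset_name}',
    source_format='PARQUET',
    write_disposition='WRITE_TRUNCATE',
    cluster_fields=[
        'year',
        'community_area',
        'location',
        'iucr',
    ] if dataset_name == 'crime' else None,
    time_partitioning={
        'field': 'date',
        'type': 'YEAR',
    } if dataset_name == 'crime' else None,
)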

  1. Run the following in a terminal from the project root directory:

./create_airflow_env_file.sh && docker-compose -f airflow/docker-compose.yaml up --build

  2. Open a browser, go to localhost:8080, and log in with airflow as both the username and the password.

  3. Now you can see the DAGs.

  4. Unpause the DAGs.

  5. Because crime_pipeline_dag is scheduled @weekly, you can trigger it manually.

  6. Watch the DAGs running.

  7. After completion, see the results and logs.

Troubleshooting

Error when downloading a dataset from the source:

Sometimes an error occurs while the download_dataset task is running.

This can happen for several reasons:

  1. Wrong application token
  2. Request limit too high

In such cases, you can reduce the limit and set the correct application token by replacing the appropriate values in the "datasets_to_gcs" variable. If the task didn't crash immediately but only after downloading a certain amount of data, check the logs for the last offset value, set that value in the "datasets_to_gcs" variable for the desired dataset, and restart the task by clearing its state.

If you want to change the limit and offset values at the same time, you have to calculate the appropriate values yourself. ¯\_(ツ)_/¯

By default, the offset value is 0, and the limit is 100000. Usually, you just need to change the offset value and restart the task.
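
To make the limit/offset mechanics concrete, the download conceptually works like the hedged sketch below (the URL and variable layout are illustrative, not the actual code). Whatever limit you choose, the offset just has to point at the first row that has not been downloaded yet, which is why resuming from the last logged offset works:

import requests

DATASET_URL = 'https://data.cityofchicago.org/resource/ijzp-q8t2.json'  # placeholder resource
APP_TOKEN = '<your_app_token>'

limit = 100000  # default limit from the "datasets_to_gcs" variable
offset = 0      # set this to the last logged offset to resume a partially finished download

while True:
    response = requests.get(
        DATASET_URL,
        headers={'X-App-Token': APP_TOKEN},
        params={'$limit': limit, '$offset': offset},
        timeout=120,
    )
    response.raise_for_status()
    rows = response.json()
    if not rows:
        break               # no more pages left
    # ... transform and upload the chunk ...
    offset += limit         # the next page starts where this one ended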
