Skip to content

Commit

Permalink
Basic dbt integration guide (#24014)
Browse files Browse the repository at this point in the history
## Summary & Motivation

A basic dbt integration guide that walks users through:

- Loading a dbt project into Dagster
- Setting upstream asset dependencies
- Setting downstream asset dependencies

Note that the `manifest.json` file is being committed so that tests pass.

## How I Tested These Changes

## Changelog [New | Bug | Docs]

NOCHANGELOG
  • Loading branch information
nicklausroach authored Aug 30, 2024
1 parent b00abe6 commit 454f966
Show file tree
Hide file tree
Showing 18 changed files with 451 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,4 +1,86 @@
---
title: Transforming data with dbt
sidebar_position: 20
---
last_update:
date: 2024-08-26
author: Nick Roach
---

Dagster orchestrates dbt alongside other technologies, so you can schedule dbt with Spark, Python, etc. in a single data pipeline. Dagster's asset-oriented approach allows Dagster to understand dbt at the level of individual dbt models.

## What you'll learn

- How to import a basic dbt project into Dagster
- How to set upstream and downstream dependencies on non-dbt assets
- How to schedule your dbt assets

<details>
<summary>Prerequisites</summary>

To follow the steps in this guide, you'll need:

- A basic understanding of dbt, DuckDB, and Dagster concepts such as [assets](/todo) and [resources](/todo)
- To install the [dbt](https://docs.getdbt.com/docs/core/installation-overview) and [DuckDB CLIs](https://duckdb.org/docs/api/cli/overview.html)
- To install the following packages:

```shell
pip install dagster duckdb plotly dagster-dbt dbt-duckdb
```
</details>

## Setting up a basic dbt project

Start by downloading this basic dbt project, which includes a few models and a DuckDB backend:

```bash
git clone https://github.com/dagster-io/basic-dbt-project
```

The project structure should look like this:

```
├── README.md
├── dbt_project.yml
├── profiles.yml
├── models
│ └── example
│ ├── my_first_dbt_model.sql
│ ├── my_second_dbt_model.sql
│ └── schema.yml
```

First, you need to point Dagster at the dbt project and ensure Dagster has what it needs to build an asset graph. Create a `definitions.py` in the same directory as the dbt project:

<CodeExample filePath="guides/etl/transform-dbt/dbt_definitions.py" language="python" title="definitions.py" />

## Adding upstream dependencies

Oftentimes, you'll want Dagster to generate data that will be used by downstream dbt models. To do this, add an upstream asset that the dbt project will as a source:

<CodeExample filePath="guides/etl/transform-dbt/dbt_definitions_with_upstream.py" language="python" title="definitions.py" />

Next, you'll add a dbt model that will source the `raw_customers` asset and define the dependency for Dagster. Create the dbt model:

<CodeExample filePath="guides/etl/transform-dbt/basic-dbt-project/models/example/customers.sql" language="sql" title="customers.sql" />

Next, create a `_source.yml` file that points dbt to the upstream `raw_customers` asset:

<CodeExample filePath="guides/etl/transform-dbt/basic-dbt-project/models/example/_source.yml" language="yaml" title="_source.yml_" />

{/* TODO: Maybe screenshot to show the lineage? */}

## Adding downstream dependencies

You may also have assets that depend on the output of dbt models. Next, create an asset that depends on the result of the new `customers` model. This asset will create a histogram of the first names of the customers:

<CodeExample filePath="guides/etl/transform-dbt/dbt_definitions_with_downstream.py" language="python" title="definitions.py" />

## Scheduling dbt models

You can schedule your dbt models by using the `dagster-dbt`'s `build_schedule_from_dbt_selection` function:

<CodeExample filePath="guides/etl/transform-dbt/dbt_definitions_with_schedule.py" language="python" title="Scheduling our dbt models" />

## Next steps

[comment]: <> (TODO: Add link to dbt partitioning guide)
2 changes: 2 additions & 0 deletions docs/vale/styles/config/vocabularies/Dagster/accept.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ gRPC
REST API
[Ss]ubprocess
Serverless
CLI[s]
uncomment

Airbyte
AirFlow
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
jaffle_shop/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

target/
dbt_packages/
logs/
*.duckdb
order_count_chart.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@

# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'basic_dbt_project'
version: '1.0.0'

# This setting configures which "profile" dbt uses for this project.
profile: 'basic_dbt_project'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_packages"


# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build all models in the example/
# directory as views. These settings can be overridden in the individual model
# files using the `{{ config(...) }}` macro.
models:
basic_dbt_project:
# Config indicated by + and applies to all files under models/example/
example:
+materialized: view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: 2
sources:
- name: raw
tables:
- name: raw_customers
# highlight-start
meta: # This metadata:
dagster: # Tells dbt where this model's source data is, and
asset_key: ["raw_customers"] # Tells Dagster which asset represents the source data
# highlight-end
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
select
id as customer_id,
first_name,
last_name
from {{ source('raw', 'raw_customers') }} # Define the raw_customers asset as a source
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{{ config(materialized='table') }}

with source_data as (

select 1 as id
union all
select null as id

)

select *
from source_data
where id is not null
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
select *
from {{ ref('my_first_dbt_model') }}
where id = 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

version: 2

models:
- name: my_first_dbt_model
description: "A starter dbt model"
columns:
- name: id
description: "The primary key for this table"
data_tests:
- unique
- not_null

- name: my_second_dbt_model
description: "A starter dbt model"
columns:
- name: id
description: "The primary key for this table"
data_tests:
- unique
- not_null
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
basic_dbt_project:
target: dev
outputs:
dev:
type: duckdb
path: dev.duckdb
threads: 1

prod:
type: duckdb
path: prod.duckdb
threads: 4

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import dagster as dg
from dagster_dbt import dbt_assets, DbtCliResource, DbtProject
from pathlib import Path


# Points to the dbt project path
dbt_project_directory = Path(__file__).absolute().parent / "basic-dbt-project"
dbt_project = DbtProject(project_dir=dbt_project_directory)

# References the dbt project object
dbt_resource = DbtCliResource(project_dir=dbt_project)

# Compiles the dbt project & allow Dagster to build an asset graph
dbt_project.prepare_if_dev()


# Yields Dagster events streamed from the dbt CLI
@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()


# Dagster object that contains the dbt assets and resource
defs = dg.Definitions(assets=[dbt_models], resources={"dbt": dbt_resource})

if __name__ == "__main__":
dg.materialize(assets=[dbt_models], resources={"dbt": dbt_resource})
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import dagster as dg
from dagster_dbt import dbt_assets, DbtCliResource, DbtProject, get_asset_key_for_model
import plotly.express as px
from pathlib import Path
import pandas as pd
import duckdb
import os

duckdb_database_path = "basic-dbt-project/dev.duckdb"

@dg.asset(compute_kind="python")
def raw_customers(context: dg.AssetExecutionContext) -> None:
# Pull customer data from a CSV
data = pd.read_csv("https://docs.dagster.io/assets/customers.csv")
connection = duckdb.connect(os.fspath(duckdb_database_path))

# Create a schema named raw
connection.execute("create schema if not exists raw")

# Create/replace table named raw_customers
connection.execute(
"create or replace table raw.raw_customers as select * from data"
)

# Log some metadata about the new table. It will show up in the UI.
context.add_output_metadata({"num_rows": data.shape[0]})


dbt_project_directory = Path(__file__).absolute().parent / "basic-dbt-project"
dbt_project = DbtProject(project_dir=dbt_project_directory)
dbt_resource = DbtCliResource(project_dir=dbt_project)
dbt_project.prepare_if_dev()


@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()


# highlight-start
@dg.asset(
compute_kind="python",
# Defines the dependency on the customers model,
# which is represented as an asset in Dagster
deps=get_asset_key_for_model([dbt_models], "customers"),
)
# highlight-end
def customer_histogram(context: dg.AssetExecutionContext):
# Read the contents of the customers table into a Pandas DataFrame
connection = duckdb.connect(os.fspath(duckdb_database_path))
customers = connection.sql("select * from customers").df()

# Create a customer histogram and write it out to an HTML file
fig = px.histogram(customers, x="FIRST_NAME")
fig.update_layout(bargap=0.2)
fig.update_xaxes(categoryorder="total ascending")
save_chart_path = Path(duckdb_database_path).parent.joinpath(
"order_count_chart.html"
)
fig.write_html(save_chart_path, auto_open=True)

# Tell Dagster about the location of the HTML file,
# so it's easy to access from the Dagster UI
context.add_output_metadata(
{"plot_url": dg.MetadataValue.url("file://" + os.fspath(save_chart_path))}
)



# Add Dagster definitions to Definitions object
defs = dg.Definitions(
assets=[raw_customers, dbt_models, customer_histogram],
resources={"dbt": dbt_resource},
)

if __name__ == "__main__":
dg.materialize(
assets=[raw_customers, dbt_models, customer_histogram],
resources={"dbt": dbt_resource},
)
Loading

0 comments on commit 454f966

Please sign in to comment.