Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[docs] Asset checks guide #24035

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion docs/docs-beta/docs/guides/quality-testing/asset-checks.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,64 @@
---
title: "Test assets with Asset Checks"
sidebar_position: 10
---
---

Asset checks in Dagster provide a way to define and execute different types of data quality checks on your data assets directly in Dagster.

This guide covers the most common use cases for testing assets with asset checks and taking action based on the result.

<details>
<summary>Prerequisites</summary>
- Familiarity with [Assets](/concepts/assets)
</details>

## Testing assets with a single asset check

The example below defines a single asset check on an asset that fails if the `order_id` column of the asset contains a null value.

In this example, the asset check will run after the asset has been materialized, to ensure the quality of its data.

<CodeExample filePath="guides/data-assets/quality-testing/asset-checks/single-asset-check.py" language="python" title="Asset with a single asset check" />

## Testing assets with multiple asset checks

In most cases, checking the data quality of an asset will require multiple checks.

The example below defines two asset checks using the `@multi_asset_check` decorator:

- One check that fails if the `order_id` column of the asset contains a null value
- Another check that fails if the `item_id` column of the asset contains a null value

In this example, both asset checks will run in a single operation after the asset has been materialized.

cmpadden marked this conversation as resolved.
Show resolved Hide resolved
<CodeExample filePath="guides/data-assets/quality-testing/asset-checks/multiple-asset-checks.py" language="python" title="Asset with multiple asset checks" />

## Programmatically generating asset checks

Defining multiple checks can also be done using a factory pattern. The example below defines the same two asset checks as in the previous example, but this time using a factory pattern and the `@multi_asset_check` decorator.

<CodeExample filePath="guides/data-assets/quality-testing/asset-checks/asset-checks-factory.py" language="python" title="Defining asset checks using a factory pattern" />

## Blocking downstream assets

By default, if a parent's asset check fails during a run, the run will continue and downstream assets will be materialized. To prevent this behavior, set the `blocking` argument to `True` in the `@asset_check` decorator. This will cause the run to fail and prevent further materializations.

In the example bellow, when the `orders_id_has_no_nulls` check fails, the `augmented_orders` asset won't be materialized.

<CodeExample filePath="guides/data-assets/quality-testing/asset-checks/block-downstream-with-asset-checks.py" language="python" title="Block downstream assets when asset check fails" />

## Scheduling and monitoring asset checks

In some cases, running asset checks separately from the job materializing the assets can be useful. For example, running all data quality checks once a day and sending an alert if they fail. This can be achieved using schedules and sensors.

In the example below, two jobs are defined: one for the asset and another for the asset check.

Using these jobs, schedules are defined to materialize the asset and execute the asset check independently. A sensor is defined to send an email alert when the asset check job fails.

<CodeExample filePath="guides/data-assets/quality-testing/asset-checks/asset-checks-with-schedule-and-sensor.py" language="python" title="Schedule and monitor asset checks separately from their asset" />

cmpadden marked this conversation as resolved.
Show resolved Hide resolved
## Next steps

- Learn more about assets in [Understanding Assets](/concepts/assets)
- Learn more about asset checks in [Understanding Asset Checks](/concepts/assets/asset-checks)
- Learn about how to use Great Expectations with Dagster in [our blog post](https://dagster.io/blog/ensuring-data-quality-with-dagster-and-great-expectations)
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from typing import Iterable, Mapping, Sequence

import pandas as pd

import dagster as dg


@dg.asset
def orders():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
orders_df.to_csv("orders.csv")


# highlight-start
def make_orders_checks(
check_blobs: Sequence[Mapping[str, str]],
) -> dg.AssetChecksDefinition:
@dg.multi_asset_check(
specs=[
dg.AssetCheckSpec(name=check_blob["name"], asset=check_blob["asset"])
for check_blob in check_blobs
]
)
def orders_check() -> Iterable[dg.AssetCheckResult]:
orders_df = pd.read_csv("orders.csv")

for check_blob in check_blobs:
num_null_order_ids = orders_df[check_blob["column"]].isna().sum()
yield dg.AssetCheckResult(
check_name=check_blob["name"],
passed=bool(num_null_order_ids == 0),
asset_key=check_blob["asset"],
)

return orders_check


check_blobs = [
{
"name": "orders_id_has_no_nulls",
"asset": "orders",
"column": "order_id",
},
{
"name": "items_id_has_no_nulls",
"asset": "orders",
"column": "item_id",
},
]
# highlight-end


defs = dg.Definitions(
assets=[orders],
asset_checks=[make_orders_checks(check_blobs)],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import os

import pandas as pd

import dagster as dg


@dg.asset
def orders():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
orders_df.to_csv("orders.csv")


@dg.asset_check(asset=orders)
def orders_id_has_no_nulls():
orders_df = pd.read_csv("orders.csv")
num_null_order_ids = orders_df["order_id"].isna().sum()
return dg.AssetCheckResult(
passed=bool(num_null_order_ids == 0),
)


# highlight-start
cmpadden marked this conversation as resolved.
Show resolved Hide resolved
# Only includes orders
asset_job = dg.define_asset_job(
"asset_job",
selection=dg.AssetSelection.assets(orders).without_checks(),
)

# Only includes orders_id_has_no_nulls
check_job = dg.define_asset_job(
"check_job", selection=dg.AssetSelection.checks_for_assets(orders)
)

# Job schedules
asset_schedule = dg.ScheduleDefinition(job=asset_job, cron_schedule="0 0 * * *")
check_schedule = dg.ScheduleDefinition(job=check_job, cron_schedule="0 6 * * *")

# Send email on failure
check_sensor = dg.make_email_on_run_failure_sensor(
email_from="[email protected]",
email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
email_to=["[email protected]"],
monitored_jobs=[check_job],
)
# highlight-end


defs = dg.Definitions(
assets=[orders],
asset_checks=[orders_id_has_no_nulls],
jobs=[asset_job, check_job],
schedules=[asset_schedule, check_schedule],
sensors=[check_sensor],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import pandas as pd

import dagster as dg


@dg.asset
def orders():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
orders_df.to_csv("orders.csv")


# highlight-next-line
@dg.asset_check(asset=orders, blocking=True)
def orders_id_has_no_nulls():
orders_df = pd.read_csv("orders.csv")
num_null_order_ids = orders_df["order_id"].isna().sum()
return dg.AssetCheckResult(
passed=bool(num_null_order_ids == 0),
)


@dg.asset(deps=[orders])
def augmented_orders():
orders_df = pd.read_csv("orders.csv")
augmented_orders_df = orders_df.assign(description=["item_432", "item_878"])
augmented_orders_df.to_csv("augmented_orders.csv")


defs = dg.Definitions(
assets=[orders, augmented_orders],
asset_checks=[orders_id_has_no_nulls],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from typing import Iterable

import pandas as pd

import dagster as dg


@dg.asset
def orders():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
orders_df.to_csv("orders.csv")


# highlight-start
@dg.multi_asset_check(
specs=[
dg.AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders"),
dg.AssetCheckSpec(name="items_id_has_no_nulls", asset="orders"),
]
)
def orders_check() -> Iterable[dg.AssetCheckResult]:
orders_df = pd.read_csv("orders.csv")

# asset check for null order_id column values
num_null_order_ids = orders_df["order_id"].isna().sum()
yield dg.AssetCheckResult(
check_name="orders_id_has_no_nulls",
passed=bool(num_null_order_ids == 0),
asset_key="orders",
)

# asset check for null item_id column values
num_null_item_ids = orders_df["item_id"].isna().sum()
yield dg.AssetCheckResult(
check_name="items_id_has_no_nulls",
passed=bool(num_null_item_ids == 0),
asset_key="orders",
)
# highlight-end


defs = dg.Definitions(
assets=[orders],
asset_checks=[orders_check],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import pandas as pd

import dagster as dg


@dg.asset
def orders():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
orders_df.to_csv("orders.csv")


# highlight-start
@dg.asset_check(asset=orders)
def orders_id_has_no_nulls():
orders_df = pd.read_csv("orders.csv")
num_null_order_ids = orders_df["order_id"].isna().sum()
return dg.AssetCheckResult(
passed=bool(num_null_order_ids == 0),
)
# highlight-end


defs = dg.Definitions(
assets=[orders],
asset_checks=[orders_id_has_no_nulls],
)
Loading