Move remaining integrations docs (#27278)
## Summary & Motivation

Some integrations guides and reference pages were missing from new
integrations docs -- this PR moves them over.

## How I Tested These Changes

## Changelog

> Insert changelog entry or delete this section.

---------

Signed-off-by: nikki everett <[email protected]>
neverett authored Jan 22, 2025
1 parent 669e708 commit 75e36b4
Showing 47 changed files with 5,346 additions and 24 deletions.
225 changes: 225 additions & 0 deletions docs/docs-beta/docs/integrations/libraries/deltalake/reference.md
@@ -0,0 +1,225 @@
---
title: "dagster-deltalake integration reference"
description: Store your Dagster assets in Delta Lake
sidebar_position: 200
---

This reference page provides information for working with [`dagster-deltalake`](/api/python-api/libraries/dagster-deltalake) features that are not covered as part of the [Using Delta Lake with Dagster tutorial](using-deltalake-with-dagster).

- [Selecting specific columns in a downstream asset](#selecting-specific-columns-in-a-downstream-asset)
- [Storing partitioned assets](#storing-partitioned-assets)
- [Storing tables in multiple schemas](#storing-tables-in-multiple-schemas)
- [Using the Delta Lake I/O manager with other I/O managers](#using-the-delta-lake-io-manager-with-other-io-managers)
- [Storing and loading PyArrow Tables or Polars DataFrames in Delta Lake](#storing-and-loading-pyarrow-tables-or-polars-dataframes-in-delta-lake)
- [Configuring storage backends](#configuring-storage-backends)

## Selecting specific columns in a downstream asset

Sometimes you may not want to fetch an entire table as the input to a downstream asset. With the Delta Lake I/O manager, you can select specific columns to load by supplying metadata on the downstream asset.

<CodeExample path="docs_snippets/docs_snippets/integrations/deltalake/downstream_columns.py" />

In this example, we only use the columns containing sepal data from the `iris_dataset` table created in [Step 2](using-deltalake-with-dagster#step-2-create-delta-lake-tables) of the [Using Dagster with Delta Lake tutorial](using-deltalake-with-dagster). To select specific columns, we can add metadata to the input asset. We do this in the `metadata` parameter of the `AssetIn` that loads the `iris_dataset` asset in the `ins` parameter. We supply the key `columns` with a list of names of the columns we want to fetch.

When Dagster materializes `sepal_data` and loads the `iris_dataset` asset using the Delta Lake I/O manager, it will only fetch the `sepal_length_cm` and `sepal_width_cm` columns of the `iris/iris_dataset` table and pass them to `sepal_data` as a Pandas DataFrame.
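
If you don't have the snippet handy, a minimal sketch of the pattern looks like this (the `sepal_area_cm2` calculation is illustrative, not part of the tutorial):

```py
import pandas as pd
from dagster import AssetIn, asset


@asset(
    ins={
        "iris_dataset": AssetIn(
            key="iris_dataset",
            # Only these columns are fetched from the iris/iris_dataset table
            metadata={"columns": ["sepal_length_cm", "sepal_width_cm"]},
        )
    }
)
def sepal_data(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    iris_dataset["sepal_area_cm2"] = (
        iris_dataset["sepal_length_cm"] * iris_dataset["sepal_width_cm"]
    )
    return iris_dataset
```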

## Storing partitioned assets

The Delta Lake I/O manager supports storing and loading partitioned data. To correctly store and load data from the Delta table, the Delta Lake I/O manager needs to know which column contains the data defining the partition bounds. The Delta Lake I/O manager uses this information to construct the correct queries to select or replace the data.

In the following sections, we describe how the I/O manager constructs these queries for different types of partitions.

:::note

For partitioning to work, the partition dimension needs to be one of the partition columns defined on the Delta table. Tables created via the I/O manager will be configured accordingly.

:::

<Tabs>
<TabItem value="Static partitioned assets">

**Storing static partitioned assets**

To store static partitioned assets in your Delta Lake, specify `partition_expr` metadata on the asset to tell the Delta Lake I/O manager which column contains the partition data:

<CodeExample path="docs_snippets/docs_snippets/integrations/deltalake/static_partition.py" startAfter="start_example" endBefore="end_example" />

Dagster uses the `partition_expr` metadata to generate appropriate function parameters when loading the partition in the downstream asset. When loading a static partition this roughly corresponds to the following SQL statement:

```sql
SELECT *
WHERE [partition_expr] in ([selected partitions])
```

A partition must be selected when materializing the above assets, as described in the [Materializing partitioned assets](/guides/build/partitions-and-backfills/partitioning-assets) documentation. In this example, the query used when materializing the `Iris-setosa` partition of the above assets would be:

```sql
SELECT *
WHERE species = 'Iris-setosa'
```
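
If you don't have the snippet handy, a minimal sketch of the pattern looks like the following (the DataFrame contents are illustrative):

```py
import pandas as pd
from dagster import AssetExecutionContext, StaticPartitionsDefinition, asset


@asset(
    partitions_def=StaticPartitionsDefinition(
        ["Iris-setosa", "Iris-virginica", "Iris-versicolor"]
    ),
    # Tell the I/O manager that the `species` column holds the partition value
    metadata={"partition_expr": "species"},
)
def iris_dataset_partitioned(context: AssetExecutionContext) -> pd.DataFrame:
    species = context.partition_key
    # Illustrative: return only rows belonging to the selected species
    return pd.DataFrame(
        {"sepal_length_cm": [5.1], "sepal_width_cm": [3.5], "species": [species]}
    )
```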

</TabItem>
<TabItem value="Time-partitioned assets">

**Storing time-partitioned assets**

Like static partitioned assets, you can specify `partition_expr` metadata on the asset to tell the Delta Lake I/O manager which column contains the partition data:

<CodeExample path="docs_snippets/docs_snippets/integrations/deltalake/time_partition.py" startAfter="start_example" endBefore="end_example" />

Dagster uses the `partition_expr` metadata to craft the `SELECT` statement when loading the correct partition in the downstream asset. When loading a time-window partition, the following statement is used:

```sql
SELECT *
WHERE [partition_expr] = [partition_start]
```

A partition must be selected when materializing the above assets, as described in the [Materializing partitioned assets](/guides/build/partitions-and-backfills/partitioning-assets) documentation. The `[partition_start]` and `[partition_end]` bounds are of the form `YYYY-MM-DD HH:MM:SS`. In this example, the query when materializing the `2023-01-02` partition of the above assets would be:

```sql
SELECT *
WHERE time = '2023-01-02 00:00:00'
```
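
A minimal sketch of a time-partitioned asset using this metadata (the DataFrame contents are illustrative):

```py
import pandas as pd
from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
    # The `time` column stores the partition timestamps
    metadata={"partition_expr": "time"},
)
def iris_data_per_day(context: AssetExecutionContext) -> pd.DataFrame:
    start = context.partition_time_window.start
    # Illustrative: rows stamped with the partition's start time
    return pd.DataFrame(
        {"time": [start], "sepal_length_cm": [5.1], "species": ["Iris-setosa"]}
    )
```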

</TabItem>
<TabItem value="Multi-partitioned assets">

**Storing multi-partitioned assets**

The Delta Lake I/O manager can also store data partitioned on multiple dimensions. To do this, specify the column for each partition as a dictionary of `partition_expr` metadata:

<CodeExample path="docs_snippets/docs_snippets/integrations/deltalake/multi_partition.py" startAfter="start_example" endBefore="end_example" />

Dagster uses the `partition_expr` metadata to craft the `SELECT` statement when loading the correct partition in a downstream asset. For multi-partitions, Dagster concatenates the `WHERE` statements described in the above sections to craft the correct `SELECT` statement.

A partition must be selected when materializing the above assets, as described in the [Materializing partitioned assets](/guides/build/partitions-and-backfills/partitioning-assets) documentation. For example, when materializing the `2023-01-02|Iris-setosa` partition of the above assets, the following query will be used:

```sql
SELECT *
WHERE species = 'Iris-setosa'
AND time = '2023-01-02 00:00:00'
```
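
A minimal sketch of the multi-dimensional case, mapping each dimension to its column (the DataFrame contents are illustrative):

```py
import pandas as pd
from dagster import (
    AssetExecutionContext,
    DailyPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)


@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "date": DailyPartitionsDefinition(start_date="2023-01-01"),
            "species": StaticPartitionsDefinition(
                ["Iris-setosa", "Iris-virginica", "Iris-versicolor"]
            ),
        }
    ),
    # Map each partition dimension to the column that stores its values
    metadata={"partition_expr": {"date": "time", "species": "species"}},
)
def iris_dataset_partitioned(context: AssetExecutionContext) -> pd.DataFrame:
    keys = context.partition_key.keys_by_dimension
    return pd.DataFrame(
        {"time": [keys["date"]], "species": [keys["species"]], "sepal_length_cm": [5.1]}
    )
```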

</TabItem>
<TabItem value="Dynamic-partitioned assets">
</TabItem>
</Tabs>

## Storing tables in multiple schemas

You may want to have different assets stored in different Delta Lake schemas. The Delta Lake I/O manager allows you to specify the schema in several ways.

If you want all of your assets to be stored in the same schema, you can specify the schema as configuration to the I/O manager, as we did in [Step 1](using-deltalake-with-dagster#step-1-configure-the-delta-lake-io-manager) of the [Using Dagster with Delta Lake tutorial](using-deltalake-with-dagster).

If you want to store assets in different schemas, you can specify the schema as part of the asset's key:

<CodeExample path="docs_snippets/docs_snippets/integrations/deltalake/schema.py" startAfter="start_asset_key" endBefore="end_asset_key" />

In this example, the `iris_dataset` asset will be stored in the `IRIS` schema, and the `daffodil_dataset` asset will be found in the `DAFFODIL` schema.
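
A minimal sketch of the asset-key approach, assuming the first component of the asset key (the `key_prefix`) is used as the schema name (the DataFrame contents are illustrative):

```py
import pandas as pd
from dagster import asset


@asset(key_prefix=["iris"])  # the first key component determines the schema
def iris_dataset() -> pd.DataFrame:
    return pd.DataFrame({"sepal_length_cm": [5.1], "species": ["Iris-setosa"]})


@asset(key_prefix=["daffodil"])  # stored in a different schema
def daffodil_dataset() -> pd.DataFrame:
    return pd.DataFrame({"petal_length_cm": [4.2], "species": ["Daffodil"]})
```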

:::note

The two options for specifying schema are mutually exclusive. If you provide `schema` configuration to the I/O manager, you cannot also provide it via the asset key and vice versa. If no `schema` is provided, either from configuration or asset keys, the default schema `public` will be used.

:::

## Using the Delta Lake I/O manager with other I/O managers

You may have assets that you don't want to store in Delta Lake. You can provide an I/O manager to each asset using the `io_manager_key` parameter in the <PyObject section="assets" module="dagster" object="asset" decorator /> decorator:

<CodeExample path="docs_snippets/docs_snippets/integrations/deltalake/multiple_io_managers.py" startAfter="start_example" endBefore="end_example" />

In this example:

- The `iris_dataset` asset uses the I/O manager bound to the key `warehouse_io_manager` and `iris_plots` uses the I/O manager bound to the key `blob_io_manager`
- In the <PyObject section="definitions" module="dagster" object="Definitions" /> object, we supply the I/O managers for those keys
- When the assets are materialized, the `iris_dataset` will be stored in Delta Lake, and `iris_plots` will be saved in Amazon S3
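
A sketch of what that wiring can look like (the S3 bucket name is hypothetical, and `dagster-aws` is assumed to be installed to provide the blob storage I/O manager):

```py
import pandas as pd
from dagster import Definitions, asset
from dagster_aws.s3 import S3PickleIOManager, S3Resource
from dagster_deltalake import LocalConfig
from dagster_deltalake_pandas import DeltaLakePandasIOManager


@asset(io_manager_key="warehouse_io_manager")
def iris_dataset() -> pd.DataFrame:
    return pd.DataFrame({"sepal_length_cm": [5.1], "species": ["Iris-setosa"]})


@asset(io_manager_key="blob_io_manager")
def iris_plots(iris_dataset: pd.DataFrame):
    # Illustrative: return something to be pickled to S3 (e.g. a rendered plot)
    return iris_dataset.describe()


defs = Definitions(
    assets=[iris_dataset, iris_plots],
    resources={
        "warehouse_io_manager": DeltaLakePandasIOManager(
            root_uri="path/to/deltalake",
            storage_options=LocalConfig(),
            schema="iris",
        ),
        "blob_io_manager": S3PickleIOManager(
            s3_resource=S3Resource(), s3_bucket="my-cool-bucket"  # hypothetical bucket
        ),
    },
)
```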

## Storing and loading PyArrow Tables or Polars DataFrames in Delta Lake

The Delta Lake I/O manager also supports storing and loading PyArrow Tables and Polars DataFrames.

<Tabs>
<TabItem value="PyArrow Tables">

**Storing and loading PyArrow Tables with Delta Lake**

The `deltalake` package relies heavily on Apache Arrow for efficient data transfer, so PyArrow is natively supported.

You can use the `DeltaLakePyArrowIOManager` in a <PyObject section="definitions" module="dagster" object="Definitions" /> object as in [Step 1](using-deltalake-with-dagster#step-1-configure-the-delta-lake-io-manager) of the [Using Dagster with Delta Lake tutorial](using-deltalake-with-dagster).

<CodeExample path="docs_snippets/docs_snippets/integrations/deltalake/pyarrow_configuration.py" startAfter="start_configuration" endBefore="end_configuration" />
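
A minimal sketch, assuming `DeltaLakePyArrowIOManager` is imported from `dagster_deltalake` and accepts the same `root_uri`/`storage_options`/`schema` arguments as the Pandas variant (paths and data are illustrative):

```py
import pyarrow as pa
from dagster import Definitions, asset
from dagster_deltalake import DeltaLakePyArrowIOManager, LocalConfig


@asset
def iris_dataset() -> pa.Table:
    # The pa.Table return annotation tells the I/O manager what type to store
    return pa.table(
        {"sepal_length_cm": [5.1, 4.9], "species": ["Iris-setosa", "Iris-setosa"]}
    )


defs = Definitions(
    assets=[iris_dataset],
    resources={
        "io_manager": DeltaLakePyArrowIOManager(
            root_uri="path/to/deltalake",
            storage_options=LocalConfig(),
            schema="iris",
        )
    },
)
```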

</TabItem>
</Tabs>

## Configuring storage backends

The `deltalake` library supports many storage backends out of the box. The backend to use is derived from the URL of the storage location.

### S3 compatible storages

The S3 API is implemented by a number of providers, and it is possible to interact with many of them. However, most S3 implementations do not support atomic operations, which are a requirement for safe multi-writer support. As such, some additional setup and configuration is required.

<Tabs>
<TabItem value="Unsafe rename">

If there will only ever be a single writer to a table (including no concurrent Dagster jobs writing to the same table), you can allow unsafe renames:

```py
from dagster_deltalake import S3Config

config = S3Config(allow_unsafe_rename=True)
```

</TabItem>

<TabItem value="Set-up a locking client">

To use DynamoDB, set the `AWS_S3_LOCKING_PROVIDER` variable to `dynamodb` and create a table named `delta_rs_lock_table` in DynamoDB. An example DynamoDB table creation snippet using the AWS CLI follows; customize it for your environment's needs (for example, read/write capacity modes):

```bash
aws dynamodb create-table --table-name delta_rs_lock_table \
--attribute-definitions \
AttributeName=key,AttributeType=S \
--key-schema \
AttributeName=key,KeyType=HASH \
--provisioned-throughput \
ReadCapacityUnits=10,WriteCapacityUnits=10
```

:::note

The delta-rs community is actively working on extending the available options for locking backends. This includes locking backends compatible with Databricks to allow concurrent writes from Databricks and external environments.

:::

</TabItem>

<TabItem value="Cloudflare R2 storage">

Cloudflare R2 storage has built-in support for atomic copy operations. This can be leveraged by sending additional headers with the copy requests.

```py
from dagster_deltalake import S3Config

config = S3Config(copy_if_not_exists="header: cf-copy-destination-if-none-match: *")
```

</TabItem>

</Tabs>

In cases where non-AWS S3 implementations are used, the endpoint URL of the S3 service needs to be provided:

```py
config = S3Config(endpoint="https://<my-s3-endpoint-url>")
```
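
Whichever backend config you build, it is passed to the I/O manager through its `storage_options` argument; a minimal sketch (the bucket path is hypothetical):

```py
from dagster_deltalake import S3Config
from dagster_deltalake_pandas import DeltaLakePandasIOManager

io_manager = DeltaLakePandasIOManager(
    root_uri="s3://my-bucket/deltalake",  # hypothetical bucket/prefix
    storage_options=S3Config(endpoint="https://<my-s3-endpoint-url>"),
)
```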

### Working with locally running storage (emulators)

A common pattern, for example in integration tests, is to run a storage emulator like Azurite or LocalStack. If the emulator is not configured to use TLS, the HTTP client needs to be configured to allow plain HTTP traffic:

```py
from dagster_deltalake import AzureConfig, ClientConfig

config = AzureConfig(use_emulator=True, client=ClientConfig(allow_http=True))
```
@@ -0,0 +1,95 @@
---
title: "Using Delta Lake with Dagster"
description: Store your Dagster assets in a Delta Lake
sidebar_position: 100
---

This tutorial focuses on how to store and load Dagster [asset definitions](/guides/build/assets/defining-assets) in a Delta Lake.

By the end of the tutorial, you will:

- Configure a Delta Lake I/O manager
- Create a table in Delta Lake using a Dagster asset
- Make a Delta Lake table available in Dagster
- Load Delta tables in downstream assets

While this guide focuses on storing and loading Pandas DataFrames in Delta Lakes, Dagster also supports using PyArrow Tables and Polars DataFrames. Learn more about setting up and using the Delta Lake I/O manager with PyArrow Tables and Polars DataFrames in the [Delta Lake reference](reference).

## Prerequisites

To complete this tutorial, you'll need to install the `dagster-deltalake` and `dagster-deltalake-pandas` libraries:

```shell
pip install dagster-deltalake dagster-deltalake-pandas
```

## Step 1: Configure the Delta Lake I/O manager

The Delta Lake I/O manager requires some configuration to set up your Delta Lake. You must provide a root path where your Delta tables will be created. Additionally, you can specify a `schema` where the Delta Lake I/O manager will create tables.

<CodeExample path="docs_snippets/docs_snippets/integrations/deltalake/configuration.py" startAfter="start_example" endBefore="end_example" />

With this configuration, if you materialized an asset called `iris_dataset`, the Delta Lake I/O manager would store the data within a folder `iris/iris_dataset` under the provided root directory `path/to/deltalake`.

Finally, in the <PyObject section="definitions" module="dagster" object="Definitions" /> object, we assign the <PyObject section="libraries" module="dagster_deltalake_pandas" object="DeltaLakePandasIOManager" /> to the `io_manager` key. `io_manager` is a reserved key to set the default I/O manager for your assets.
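
If you don't have the snippet handy, a minimal sketch of this configuration looks like the following (the stub `iris_dataset` asset stands in for the asset defined in Step 2):

```py
import pandas as pd
from dagster import Definitions, asset
from dagster_deltalake import LocalConfig
from dagster_deltalake_pandas import DeltaLakePandasIOManager


@asset
def iris_dataset() -> pd.DataFrame:  # defined properly in Step 2
    return pd.DataFrame()


defs = Definitions(
    assets=[iris_dataset],
    resources={
        # "io_manager" is the reserved key for the default I/O manager
        "io_manager": DeltaLakePandasIOManager(
            root_uri="path/to/deltalake",  # root directory where tables are created
            storage_options=LocalConfig(),
            schema="iris",  # optional; defaults to "public"
        )
    },
)
```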

## Step 2: Create Delta Lake tables

The Delta Lake I/O manager can create and update tables for your Dagster-defined assets, but you can also make existing Delta Lake tables available to Dagster.

<Tabs>

<TabItem value="Create Delta tables from Dagster assets">

**Store a Dagster asset as a table in Delta Lake**

To store data in Delta Lake using the Delta Lake I/O manager, the definitions of your assets don't need to change. You can tell Dagster to use the Delta Lake I/O manager, like in [Step 1](#step-1-configure-the-delta-lake-io-manager), and Dagster will handle storing and loading your assets in Delta Lake.

<CodeExample path="docs_snippets/docs_snippets/integrations/deltalake/basic_example.py" />

In this example, we first define an [asset](/guides/build/assets/defining-assets). Here, we fetch the Iris dataset as a Pandas DataFrame and rename the columns. The type signature of the function tells the I/O manager what data type it is working with, so it's important to include the return type `pd.DataFrame`.

When Dagster materializes the `iris_dataset` asset using the configuration from [Step 1](#step-1-configure-the-delta-lake-io-manager), the Delta Lake I/O manager will create the table `iris/iris_dataset` if it doesn't exist and replace the contents of the table with the value returned from the `iris_dataset` asset.
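
A sketch of such an asset (the CSV URL and column names follow the Iris dataset used throughout this tutorial and are illustrative):

```py
import pandas as pd
from dagster import asset


@asset
def iris_dataset() -> pd.DataFrame:
    # The pd.DataFrame return annotation tells the I/O manager what type to store
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",  # illustrative data source
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )
```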

</TabItem>

<TabItem value="Make existing tables available in Dagster">

**Make an existing table available in Dagster**

If you already have tables in your Delta Lake, you may want to make them available to other Dagster assets. You can accomplish this by defining [external assets](/guides/build/assets/external-assets) for these tables. By creating an external asset for the existing table, you tell Dagster how to find the table so it can be fetched for downstream assets.

<CodeExample path="docs_snippets/docs_snippets/integrations/deltalake/source_asset.py" />

In this example, we create a <PyObject section="assets" module="dagster" object="AssetSpec" /> for an existing table containing iris harvest data. To make the data available to other Dagster assets, we need to tell the Delta Lake I/O manager how to find the data.

Because we already supplied the database and schema in the I/O manager configuration in [Step 1](#step-1-configure-the-delta-lake-io-manager), we only need to provide the table name. We do this with the `key` parameter in `AssetSpec`. When the I/O manager needs to load the `iris_harvest_data` in a downstream asset, it will select the data in the `iris/iris_harvest_data` folder as a Pandas DataFrame and provide it to the downstream asset.
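
The external asset definition itself can be as small as the following sketch:

```py
from dagster import AssetSpec

iris_harvest_data = AssetSpec(key="iris_harvest_data")
```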

</TabItem>
</Tabs>

## Step 3: Load Delta Lake tables in downstream assets

Once you've created an asset that represents a table in your Delta Lake, you will likely want to create additional assets that work with the data. Dagster and the Delta Lake I/O manager allow you to load the data stored in Delta tables into downstream assets.

<CodeExample path="docs_snippets/docs_snippets/integrations/deltalake/load_downstream.py" startAfter="start_example" endBefore="end_example" />

In this example, we want to provide the `iris_dataset` asset to the `iris_cleaned` asset. Refer to the **Store a Dagster asset as a table in Delta Lake** example in [Step 2](#step-2-create-delta-lake-tables) for a look at the `iris_dataset` asset.

In `iris_cleaned`, the `iris_dataset` parameter tells Dagster that the value for the `iris_dataset` asset should be provided as input to `iris_cleaned`. If this feels too magical for you, refer to the docs for explicitly specifying dependencies.

When materializing these assets, Dagster will use the `DeltaLakePandasIOManager` to fetch the `iris/iris_dataset` as a Pandas DataFrame and pass the DataFrame as the `iris_dataset` parameter to `iris_cleaned`. When `iris_cleaned` returns a Pandas DataFrame, Dagster will use the `DeltaLakePandasIOManager` to store the DataFrame as the `iris/iris_cleaned` table in your Delta Lake.
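
A sketch of such a downstream asset (the cleaning logic is illustrative):

```py
import pandas as pd
from dagster import asset


@asset
def iris_cleaned(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    # `iris_dataset` is loaded from the iris/iris_dataset Delta table by the I/O manager
    return iris_dataset.dropna().drop_duplicates()
```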

## Completed code example

When finished, your code should look like the following:

<CodeExample path="docs_snippets/docs_snippets/integrations/deltalake/full_example.py" />

## Related

For more Delta Lake features, refer to the [Delta Lake reference](reference).

For more information on asset definitions, see the [Assets documentation](/guides/build/assets/defining-assets).

For more information on I/O managers, refer to the [I/O manager documentation](/guides/build/io-managers/).