Skip to content

Commit

Permalink
[5/n][dagster-dlt] Update dlt docs to use DagsterDltTranslator.get_as…
Browse files Browse the repository at this point in the history
…set_spec
  • Loading branch information
maximearmstrong committed Jan 21, 2025
1 parent 50f3952 commit 7fb5202
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 24 deletions.
27 changes: 14 additions & 13 deletions docs/content/integrations/dlt.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,14 @@ And that's it! You should now have two assets that load data to corresponding Sn
The <PyObject module="dagster_dlt" object="DagsterDltTranslator" /> object can be used to customize how dlt properties map to Dagster concepts.
For example, to change how the name of the asset is derived, you can override the <PyObject module="dagster_dlt" object="DagsterDltTranslator" method="get_asset_key" /> method, or if you would like to change the key of the upstream source asset, you can override the <PyObject module="dagster_dlt" object="DagsterDltTranslator" method="get_deps_assets_keys" /> method.
For example, to change how the name of the asset is derived, or if you would like to change the key of the upstream source asset, you can override the <PyObject module="dagster_dlt" object="DagsterDltTranslator" method="get_asset_spec" /> method.
```python file=/integrations/dlt/dlt_dagster_translator.py
from collections.abc import Iterable
import dlt
from dagster_dlt import DagsterDltResource, DagsterDltTranslator, dlt_assets
from dlt.extract.resource import DltResource
from dagster_dlt.translator import DltResourceTranslatorData
from dagster import AssetExecutionContext, AssetKey
from dagster import AssetExecutionContext, AssetKey, AssetSpec
@dlt.source
Expand All @@ -266,13 +264,16 @@ def example_dlt_source():
class CustomDagsterDltTranslator(DagsterDltTranslator):
def get_asset_key(self, resource: DltResource) -> AssetKey:
"""Overrides asset key to be the dlt resource name."""
return AssetKey(f"{resource.name}")
def get_deps_asset_keys(self, resource: DltResource) -> Iterable[AssetKey]:
"""Overrides upstream asset key to be a single source asset."""
return [AssetKey("common_upstream_dlt_dependency")]
def get_asset_spec(self, data: DltResourceTranslatorData) -> AssetSpec:
"""Overrides asset spec to:
- Override asset key to be the dlt resource name,
- Override upstream asset key to be a single source asset.
"""
default_spec = super().get_asset_spec(data)
return default_spec.replace_attributes(
key=AssetKey(f"{data.resource.name}"),
deps=[AssetKey("common_upstream_dlt_dependency")],
)
@dlt_assets(
Expand All @@ -296,7 +297,7 @@ In this example, we customized the translator to change how the dlt assets' name
A common question is how to define metadata on the external assets upstream of the dlt assets.
This can be accomplished by defining a <PyObject object="AssetSpec" /> with a key that matches the one defined in the <PyObject module="dagster_dlt" object="DagsterDltTranslator" method="get_deps_assets_keys" /> method.
This can be accomplished by defining a <PyObject object="AssetSpec" /> with a key that matches the one defined in the <PyObject module="dagster_dlt" object="DagsterDltTranslator" method="get_asset_spec" /> method.
For example, let's say we have defined a set of dlt assets named `thinkific_assets`, we can iterate over those assets and derive a <PyObject object="AssetSpec" /> with attributes like `group_name`.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from collections.abc import Iterable

import dlt
from dagster_dlt import DagsterDltResource, DagsterDltTranslator, dlt_assets
from dlt.extract.resource import DltResource
from dagster_dlt.translator import DltResourceTranslatorData

from dagster import AssetExecutionContext, AssetKey
from dagster import AssetExecutionContext, AssetKey, AssetSpec


@dlt.source
Expand All @@ -15,13 +13,16 @@ def example_resource(): ...


class CustomDagsterDltTranslator(DagsterDltTranslator):
def get_asset_key(self, resource: DltResource) -> AssetKey:
"""Overrides asset key to be the dlt resource name."""
return AssetKey(f"{resource.name}")

def get_deps_asset_keys(self, resource: DltResource) -> Iterable[AssetKey]:
"""Overrides upstream asset key to be a single source asset."""
return [AssetKey("common_upstream_dlt_dependency")]
def get_asset_spec(self, data: DltResourceTranslatorData) -> AssetSpec:
"""Overrides asset spec to:
- Override asset key to be the dlt resource name,
- Override upstream asset key to be a single source asset.
"""
default_spec = super().get_asset_spec(data)
return default_spec.replace_attributes(
key=AssetKey(f"{data.resource.name}"),
deps=[AssetKey("common_upstream_dlt_dependency")],
)


@dlt_assets(
Expand Down

0 comments on commit 7fb5202

Please sign in to comment.