From 7fb520220f43a8205f4a45a443c646a93655025b Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Tue, 21 Jan 2025 13:36:37 -0500 Subject: [PATCH] [5/n][dagster-dlt] Update dlt docs to use DagsterDltTranslator.get_asset_spec --- docs/content/integrations/dlt.mdx | 27 ++++++++++--------- .../dlt/dlt_dagster_translator.py | 23 ++++++++-------- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/docs/content/integrations/dlt.mdx b/docs/content/integrations/dlt.mdx index 6e8a864170402..a022b8e2803a9 100644 --- a/docs/content/integrations/dlt.mdx +++ b/docs/content/integrations/dlt.mdx @@ -246,16 +246,14 @@ And that's it! You should now have two assets that load data to corresponding Sn The object can be used to customize how dlt properties map to Dagster concepts. -For example, to change how the name of the asset is derived, you can override the method, or if you would like to change the key of the upstream source asset, you can override the method. +For example, to change how the name of the asset is derived, or if you would like to change the key of the upstream source asset, you can override the method. ```python file=/integrations/dlt/dlt_dagster_translator.py -from collections.abc import Iterable - import dlt from dagster_dlt import DagsterDltResource, DagsterDltTranslator, dlt_assets -from dlt.extract.resource import DltResource +from dagster_dlt.translator import DltResourceTranslatorData -from dagster import AssetExecutionContext, AssetKey +from dagster import AssetExecutionContext, AssetKey, AssetSpec @dlt.source @@ -266,13 +264,16 @@ def example_dlt_source(): class CustomDagsterDltTranslator(DagsterDltTranslator): - def get_asset_key(self, resource: DltResource) -> AssetKey: - """Overrides asset key to be the dlt resource name.""" - return AssetKey(f"{resource.name}") - - def get_deps_asset_keys(self, resource: DltResource) -> Iterable[AssetKey]: - """Overrides upstream asset key to be a single source asset.""" - return [AssetKey("common_upstream_dlt_dependency")] + def get_asset_spec(self, data: DltResourceTranslatorData) -> AssetSpec: + """Overrides asset spec to: + - Override asset key to be the dlt resource name, + - Override upstream asset key to be a single source asset. + """ + default_spec = super().get_asset_spec(data) + return default_spec.replace_attributes( + key=AssetKey(f"{data.resource.name}"), + deps=[AssetKey("common_upstream_dlt_dependency")], + ) @dlt_assets( @@ -296,7 +297,7 @@ In this example, we customized the translator to change how the dlt assets' name A common question is how to define metadata on the external assets upstream of the dlt assets. -This can be accomplished by defining a with a key that matches the one defined in the method. +This can be accomplished by defining a with a key that matches the one defined in the method. For example, let's say we have defined a set of dlt assets named `thinkific_assets`, we can iterate over those assets and derive a with attributes like `group_name`. diff --git a/examples/docs_snippets/docs_snippets/integrations/dlt/dlt_dagster_translator.py b/examples/docs_snippets/docs_snippets/integrations/dlt/dlt_dagster_translator.py index 96b6d0103d7b8..695d44daab922 100644 --- a/examples/docs_snippets/docs_snippets/integrations/dlt/dlt_dagster_translator.py +++ b/examples/docs_snippets/docs_snippets/integrations/dlt/dlt_dagster_translator.py @@ -1,10 +1,8 @@ -from collections.abc import Iterable - import dlt from dagster_dlt import DagsterDltResource, DagsterDltTranslator, dlt_assets -from dlt.extract.resource import DltResource +from dagster_dlt.translator import DltResourceTranslatorData -from dagster import AssetExecutionContext, AssetKey +from dagster import AssetExecutionContext, AssetKey, AssetSpec @dlt.source @@ -15,13 +13,16 @@ def example_resource(): ... class CustomDagsterDltTranslator(DagsterDltTranslator): - def get_asset_key(self, resource: DltResource) -> AssetKey: - """Overrides asset key to be the dlt resource name.""" - return AssetKey(f"{resource.name}") - - def get_deps_asset_keys(self, resource: DltResource) -> Iterable[AssetKey]: - """Overrides upstream asset key to be a single source asset.""" - return [AssetKey("common_upstream_dlt_dependency")] + def get_asset_spec(self, data: DltResourceTranslatorData) -> AssetSpec: + """Overrides asset spec to: + - Override asset key to be the dlt resource name, + - Override upstream asset key to be a single source asset. + """ + default_spec = super().get_asset_spec(data) + return default_spec.replace_attributes( + key=AssetKey(f"{data.resource.name}"), + deps=[AssetKey("common_upstream_dlt_dependency")], + ) @dlt_assets(