Skip to content

Commit

Permalink
[dagster-sdf] Add Assets Decorator and Information Schema Reader (#23091)

## Summary & Motivation

This PR introduces the `sdf_assets` decorator and `SdfInformationSchema` Reader Class.

1. `sdf_assets` decorator

Wraps a `multi_asset` definition, which simply defines the workspace as a single op / graph. This can make sense in the context of an sdf workspace for several reasons (namely caching). The decorator takes as inputs:

  - target_dir: the target path to store sdf outputs in (i.e. materialized data from the sdf db, cache, etc.)
  - environment: which environment to run sdf commands against (default: dbg)

The decorator depends primarily on the `SdfInformationSchema` class.

2. `SdfInformationSchema` class

This is a wrapper class for interacting with sdf's compile-time generated information schema. The information schema is a store of various metadata, such as classifiers, table materializations (i.e. table, view, external, etc.), table- and column-level lineage, and table schema information. We load the information schema using polars, extracting the dagster DAG and the schema information from the tables and columns dataframes respectively. Both are located in the sdf target directory under `sdftarget/<environment>/data/system/information_schema::sdf/`.

## How I Tested These Changes

See tests in `dagster_sdf_tests/test_asset_decorator.py` and `dagster_sdf_tests/test_information_schema.py`

Co-authored-by: Rex Ledesma <[email protected]>
  • Loading branch information
akbog and rexledesma authored Jul 19, 2024
1 parent 1647609 commit 22ebbb0
Show file tree
Hide file tree
Showing 9 changed files with 263 additions and 1 deletion.
2 changes: 2 additions & 0 deletions python_modules/libraries/dagster-sdf/dagster_sdf/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from dagster._core.libraries import DagsterLibraryRegistry

from .asset_decorator import sdf_assets as sdf_assets
from .information_schema import SdfInformationSchema as SdfInformationSchema
from .resource import SdfCliResource as SdfCliResource
from .version import __version__ as __version__

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import Any, Callable, Mapping, Optional, Set

from dagster import (
AssetsDefinition,
BackfillPolicy,
PartitionsDefinition,
RetryPolicy,
TimeWindowPartitionsDefinition,
multi_asset,
)

from .information_schema import SdfInformationSchema


def sdf_assets(
    *,
    information_schema: SdfInformationSchema,
    name: Optional[str] = None,
    io_manager_key: Optional[str] = None,
    partitions_def: Optional[PartitionsDefinition] = None,
    backfill_policy: Optional[BackfillPolicy] = None,
    op_tags: Optional[Mapping[str, Any]] = None,
    required_resource_keys: Optional[Set[str]] = None,
    retry_policy: Optional[RetryPolicy] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
    """Build a ``multi_asset`` decorator from an sdf information schema.

    The outputs and the internal asset dependencies are taken from
    ``information_schema.build_sdf_multi_asset_args``. The resulting multi-asset
    is subsettable (``can_subset=True``) and has compute kind ``"sdf"``.

    Args:
        information_schema: Source of the asset outputs and their dependencies.
        name: Forwarded to ``multi_asset`` as ``name``.
        io_manager_key: Forwarded to ``build_sdf_multi_asset_args``.
        partitions_def: Forwarded to ``multi_asset`` as ``partitions_def``.
        backfill_policy: Forwarded to ``multi_asset``. When it is not provided
            and ``partitions_def`` is a ``TimeWindowPartitionsDefinition``,
            ``BackfillPolicy.single_run()`` is used instead.
        op_tags: Forwarded to ``multi_asset`` as ``op_tags``.
        required_resource_keys: Forwarded to ``multi_asset``.
        retry_policy: Forwarded to ``multi_asset`` as ``retry_policy``.

    Returns:
        The decorator produced by ``multi_asset``.
    """
    asset_outs, asset_deps = information_schema.build_sdf_multi_asset_args(
        io_manager_key=io_manager_key
    )

    # Time-window partitioned assets without an explicit policy are backfilled
    # in a single run.
    if partitions_def and isinstance(partitions_def, TimeWindowPartitionsDefinition):
        if not backfill_policy:
            backfill_policy = BackfillPolicy.single_run()

    multi_asset_kwargs = {
        "outs": asset_outs,
        "name": name,
        "internal_asset_deps": asset_deps,
        "required_resource_keys": required_resource_keys,
        "compute_kind": "sdf",
        "partitions_def": partitions_def,
        "can_subset": True,
        "op_tags": op_tags,
        "backfill_policy": backfill_policy,
        "retry_policy": retry_policy,
    }
    return multi_asset(**multi_asset_kwargs)
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Presumably the name of the sdf CLI executable; usage is not visible here -- TODO confirm.
SDF_EXECUTABLE = "sdf"
# Directory segment that SdfInformationSchema appends to the target dir when
# locating the information schema.
SDF_TARGET_DIR = "sdftarget"
# Presumably the directory for dagster-specific sdf output; usage is not visible here -- TODO confirm.
SDF_DAGSTER_OUTPUT_DIR = "sdf_dagster_out"
# Table names accepted by SdfInformationSchema.read_table.
SDF_INFORMATION_SCHEMA_TABLES = ["tables", "columns", "table_lineage", "column_lineage"]
# Default value of the `environment` argument of SdfInformationSchema.
DEFAULT_SDF_WORKSPACE_ENVIRONMENT = "dbg"
# Presumably the sdf workspace definition file name; usage is not visible here -- TODO confirm.
SDF_WORKSPACE_YML = "workspace.sdf.yml"
100 changes: 100 additions & 0 deletions python_modules/libraries/dagster-sdf/dagster_sdf/information_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from pathlib import Path
from typing import Dict, Literal, Mapping, Optional, Set, Tuple, Union

import dagster._check as check
import polars as pl
from dagster import AssetKey, AssetOut, Nothing
from dagster._record import IHaveNew, record_custom

from .asset_utils import dagster_name_fn, default_asset_key_fn
from .constants import (
DEFAULT_SDF_WORKSPACE_ENVIRONMENT,
SDF_INFORMATION_SCHEMA_TABLES,
SDF_TARGET_DIR,
)


@record_custom(checked=False)
class SdfInformationSchema(IHaveNew):
    """A class to represent the SDF information schema.

    The information schema is a set of tables that are generated by the sdf cli on compilation.
    It can be queried directly via the sdf cli, or by reading the parquet files that live in the
    `sdftarget` directory.

    This class specifically interfaces with the tables and columns tables, which contain metadata
    on their upstream and downstream dependencies, as well as their schemas, descriptions,
    classifiers, and other metadata.

    Read more about the information schema here:
    https://docs.sdf.com/reference/sdf-information-schema#sdf-information-schema

    Args:
        target_dir (Union[Path, str]): The path to the target directory.
        environment (str, optional): The environment to use. Defaults to "dbg".
    """

    # Directory holding the information schema tables for the chosen environment.
    information_schema_dir: Path
    # Per-instance cache of tables that have already been loaded, keyed by table name.
    information_schema: Dict[str, pl.DataFrame]

    def __new__(
        cls,
        target_dir: Union[Path, str],
        environment: str = DEFAULT_SDF_WORKSPACE_ENVIRONMENT,
    ):
        """Locate the information schema directory and start with an empty table cache.

        Raises a check error when the arguments have the wrong type or when the
        information schema directory does not exist.
        """
        check.inst_param(target_dir, "target_dir", (str, Path))
        check.str_param(environment, "environment")

        information_schema_dir = Path(
            target_dir, SDF_TARGET_DIR, environment, "data", "system", "information_schema::sdf"
        )
        check.invariant(
            information_schema_dir.exists(),
            f"Information schema directory {information_schema_dir} does not exist.",
        )

        return super().__new__(
            cls,
            information_schema_dir=information_schema_dir,
            information_schema={},
        )

    def read_table(
        self, table_name: Literal["tables", "columns", "table_lineage", "column_lineage"]
    ) -> pl.DataFrame:
        """Return the requested information schema table, loading it at most once.

        Args:
            table_name: One of the names in ``SDF_INFORMATION_SCHEMA_TABLES``.

        Returns:
            The table as a polars ``DataFrame``. Repeated calls with the same
            name return the cached frame.
        """
        check.invariant(
            table_name in SDF_INFORMATION_SCHEMA_TABLES,
            f"Table `{table_name}` is not valid information schema table."
            f" Select from one of {SDF_INFORMATION_SCHEMA_TABLES}.",
        )

        # `dict.setdefault(key, pl.read_parquet(...))` would evaluate the read on
        # every call, even on a cache hit, so the lookup is done explicitly.
        cached_table = self.information_schema.get(table_name)
        if cached_table is None:
            cached_table = pl.read_parquet(self.information_schema_dir.joinpath(table_name))
            self.information_schema[table_name] = cached_table
        return cached_table

    def build_sdf_multi_asset_args(
        self, io_manager_key: Optional[str]
    ) -> Tuple[Mapping[str, AssetOut], Dict[str, Set[AssetKey]]]:
        """Derive the ``outs`` and ``internal_asset_deps`` arguments from the ``tables`` table.

        Rows whose ``purpose`` is ``"system"`` or ``"external-system"``, or whose
        ``origin`` is ``"remote"``, are skipped.

        Args:
            io_manager_key: The io manager key set on every generated ``AssetOut``.

        Returns:
            A tuple of (output name -> ``AssetOut``, output name -> set of
            upstream ``AssetKey``).
        """
        outs: Dict[str, AssetOut] = {}
        internal_asset_deps: Dict[str, Set[AssetKey]] = {}

        for table_row in self.read_table("tables").rows(named=True):
            if table_row["purpose"] in ["system", "external-system"]:
                continue
            if table_row["origin"] == "remote":
                continue

            asset_key = default_asset_key_fn(table_row["table_id"])
            output_name = dagster_name_fn(table_row["table_id"])

            outs[output_name] = AssetOut(
                key=asset_key,
                dagster_type=Nothing,
                io_manager_key=io_manager_key,
                description=table_row["description"],
                is_required=False,
            )
            # A null `depends_on` cell is treated as "no upstream dependencies".
            internal_asset_deps[output_name] = {
                default_asset_key_fn(dep) for dep in (table_row["depends_on"] or ())
            }

        return outs, internal_asset_deps
17 changes: 17 additions & 0 deletions python_modules/libraries/dagster-sdf/dagster_sdf/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,21 @@ def __init__(
**kwargs,
)

@validator("workspace_dir", pre=True)
def convert_path_to_str(cls, v: Any) -> Any:
    """Convert a ``Path`` passed as ``workspace_dir`` into a resolved string path.

    Values that are not ``Path`` instances are returned unchanged.

    Raises:
        ValueError: If the absolute form of the path does not exist.
    """
    if not isinstance(v, Path):
        return v

    # The helper's return value was immediately overwritten in the previous
    # version; the call is kept only for any validation error it raises.
    # NOTE(review): the helper body is not visible here -- if it performs the
    # same resolution as below, one of the two can be dropped.
    cls._validate_absolute_path_exists(v)

    absolute_path = Path(v).absolute()
    try:
        resolved_path = absolute_path.resolve(strict=True)
    except FileNotFoundError as err:
        # Chain the original error so the traceback shows the failing lookup.
        raise ValueError(
            f"The absolute path of '{v}' ('{absolute_path}') does not exist"
        ) from err
    return os.fspath(resolved_path)

@validator("workspace_dir")
def validate_workspace_dir(cls, project_dir: str) -> str:
resolved_workspace_dir = cls._validate_absolute_path_exists(project_dir)
Expand Down Expand Up @@ -431,6 +446,8 @@ def cli(
Args:
args (Sequence[str]): The sdf CLI command to execute.
target_dir (Optional[Path]): The path to the target directory.
environment (str): The environment to use. Defaults to "dbg".
raise_on_error (bool): Whether to raise an exception if the sdf CLI command fails.
context (Optional[Union[OpExecutionContext, AssetExecutionContext]]): The execution context from within `@sdf_assets`.
If an AssetExecutionContext is passed, its underlying OpExecutionContext will be used.
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
SDF_VERSION_UPPER_BOUND = "0.3.10"
SDF_VERSION_UPPER_BOUND = "0.3.11"
30 changes: 30 additions & 0 deletions python_modules/libraries/dagster-sdf/dagster_sdf_tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import os
from pathlib import Path

import pytest
from dagster_sdf.constants import DEFAULT_SDF_WORKSPACE_ENVIRONMENT
from dagster_sdf.resource import SdfCliInvocation, SdfCliResource

from .sdf_workspaces import moms_flower_shop_path


def _create_sdf_invocation(
    workspace_dir: Path,
    run_workspace: bool = False,
    environment: str = DEFAULT_SDF_WORKSPACE_ENVIRONMENT,
) -> SdfCliInvocation:
    """Compile the sdf workspace and optionally run it.

    Args:
        workspace_dir: Workspace directory handed to ``SdfCliResource``.
        run_workspace: When true, ``sdf run`` is also executed after the
            compile, with ``raise_on_error=False``.
        environment: Environment passed to each sdf CLI call.

    Returns:
        The waited-on invocation of the ``compile`` command.
    """
    cli_resource = SdfCliResource(
        workspace_dir=os.fspath(workspace_dir),
        global_config_flags=["--log-form=nested"],
    )

    compile_invocation = cli_resource.cli(["compile"], environment=environment).wait()
    if not run_workspace:
        return compile_invocation

    cli_resource.cli(["run"], environment=environment, raise_on_error=False).wait()
    return compile_invocation


@pytest.fixture(name="moms_flower_shop_target_dir", scope="function")
def test_moms_flower_shop_target_dir_fixture() -> Path:
    """Compile the moms_flower_shop workspace and return the invocation's target dir."""
    compile_invocation = _create_sdf_invocation(moms_flower_shop_path)
    return compile_invocation.target_dir
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from pathlib import Path

from dagster import AssetExecutionContext, AssetKey, materialize
from dagster_sdf.asset_decorator import sdf_assets
from dagster_sdf.information_schema import SdfInformationSchema
from dagster_sdf.resource import SdfCliResource

from .sdf_workspaces import moms_flower_shop_path


def test_asset_deps(moms_flower_shop_target_dir: Path) -> None:
    """The asset dependency graph built from the information schema matches the workspace lineage."""

    def key(schema: str, table: str) -> AssetKey:
        # Every asset in this workspace shares the same catalog prefix.
        return AssetKey(["moms_flower_shop", schema, table])

    schema_reader = SdfInformationSchema(target_dir=moms_flower_shop_target_dir)

    @sdf_assets(information_schema=schema_reader)
    def my_flower_shop_assets(): ...

    expected_deps = {
        key("raw", "raw_addresses"): set(),
        key("raw", "raw_customers"): set(),
        key("raw", "raw_inapp_events"): set(),
        key("raw", "raw_marketing_campaign_events"): set(),
        key("staging", "app_installs"): {
            key("staging", "inapp_events"),
            key("raw", "raw_marketing_campaign_events"),
        },
        key("staging", "app_installs_v2"): {
            key("staging", "inapp_events"),
            key("raw", "raw_marketing_campaign_events"),
        },
        key("staging", "customers"): {
            key("raw", "raw_addresses"),
            key("staging", "app_installs_v2"),
            key("raw", "raw_customers"),
        },
        key("staging", "inapp_events"): {key("raw", "raw_inapp_events")},
        key("staging", "marketing_campaigns"): {
            key("raw", "raw_marketing_campaign_events"),
        },
        key("staging", "stg_installs_per_campaign"): {
            key("staging", "app_installs_v2"),
        },
        key("analytics", "agg_installs_and_campaigns"): {
            key("staging", "app_installs_v2"),
        },
        key("analytics", "dim_marketing_campaigns"): {
            key("staging", "marketing_campaigns"),
            key("staging", "stg_installs_per_campaign"),
        },
    }

    assert my_flower_shop_assets.asset_deps == expected_deps


def test_sdf_with_materialize(moms_flower_shop_target_dir: Path) -> None:
    """Materializing the sdf assets succeeds and emits asset materialization events."""
    schema_reader = SdfInformationSchema(target_dir=moms_flower_shop_target_dir)

    @sdf_assets(information_schema=schema_reader)
    def my_sdf_assets(context: AssetExecutionContext, sdf: SdfCliResource):
        yield from sdf.cli(["run"], context=context).stream()

    sdf_resource = SdfCliResource(workspace_dir=moms_flower_shop_path)
    result = materialize([my_sdf_assets], resources={"sdf": sdf_resource})

    assert result.success
    assert result.get_asset_materialization_events()
1 change: 1 addition & 0 deletions python_modules/libraries/dagster-sdf/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def get_version() -> Tuple[str, str]:
f"dagster{pin}",
f"sdf-cli>=0.3.9,<{SDF_VERSION_UPPER_BOUND}",
"orjson",
"polars",
],
zip_safe=False,
extras_require={"test": []},
Expand Down

0 comments on commit 22ebbb0

Please sign in to comment.