Skip to content

Commit

Permalink
update code snippets
Browse files Browse the repository at this point in the history
  • Loading branch information
prha committed Aug 27, 2024
1 parent e2434ad commit 95cd32c
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ def sales_summary(clean_sales_data: pd.DataFrame) -> pd.DataFrame:


defs = dg.Definitions(
assets=[raw_sales_data, clean_sales_data, sales_summary],
resources={"io_manager": DuckDBPandasIOManager(database="sales.duckdb")},
assets=[clean_sales_data, sales_summary],
resources={
"io_manager": DuckDBPandasIOManager(database="sales.duckdb", schema="public")
},
)
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def sales_summary(clean_sales_data: pd.DataFrame) -> pd.DataFrame:


defs = dg.Definitions(
assets=[raw_sales_data, clean_sales_data, sales_summary],
assets=[clean_sales_data, sales_summary],
resources={
"io_manager": SnowflakePandasIOManager(
database=dg.EnvVar("SNOWFLAKE_DATABASE"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,26 @@ def clean_sales_data(duckdb: DuckDBResource) -> None:
with duckdb.get_connection() as conn:
df = conn.execute("SELECT * FROM raw_sales_data").fetch_df()
clean_df = df.fillna({"amount": 0.0})
conn.execute("INSERT into clean_sales_data select * from clean_df")
conn.execute(
"CREATE TABLE IF NOT EXISTS clean_sales_data AS SELECT * FROM clean_df"
)
if not conn.fetchall():
conn.execute("INSERT INTO clean_sales_data SELECT * FROM clean_df")


@dg.asset(deps=[clean_sales_data])
def sales_summary(duckdb: DuckDBResource) -> None:
with duckdb.get_connection() as conn:
df = conn.execute("SELECT * FROM clean_sales_data").fetch_df()
summary = df.groupby(["owner"])["amount"].sum().reset_index()
conn.execute("INSERT into sales_summary select * from summary")
conn.execute(
"CREATE TABLE IF NOT EXISTS sales_summary AS SELECT * from summary"
)
if not conn.fetchall():
conn.execute("INSERT INTO sales_summary SELECT * from summary")


defs = dg.Definitions(
assets=[raw_sales_data, clean_sales_data, sales_summary],
resources={"duckdb": DuckDBResource(database="sales.duckdb")},
assets=[clean_sales_data, sales_summary],
resources={"duckdb": DuckDBResource(database="sales.duckdb", schema="public")},
)

0 comments on commit 95cd32c

Please sign in to comment.