From a9021fe8dec1b8b282cae112a3d3663c3914c05f Mon Sep 17 00:00:00 2001
From: David Scharf
Date: Tue, 11 Jun 2024 15:35:06 +0200
Subject: [PATCH] Fix streamlit bug on chess example (#1425)

* fix error on missing nullable hint

* remove unneeded function (and unrelated formatting :) )
---
 .../streamlit_app/blocks/table_hints.py      | 27 +++++++--------
 .../postgres_to_postgres.py                  | 33 ++++++++++---------
 2 files changed, 29 insertions(+), 31 deletions(-)

diff --git a/dlt/helpers/streamlit_app/blocks/table_hints.py b/dlt/helpers/streamlit_app/blocks/table_hints.py
index e2ebcde1c9..4b0328d1dc 100644
--- a/dlt/helpers/streamlit_app/blocks/table_hints.py
+++ b/dlt/helpers/streamlit_app/blocks/table_hints.py
@@ -3,7 +3,7 @@
 import dlt
 import streamlit as st
 
-from dlt.common.schema.typing import TTableSchema
+from dlt.common.schema.typing import TTableSchema, TColumnSchema
 from dlt.common.utils import flatten_list_or_items
 from dlt.helpers.streamlit_app.blocks.resource_state import resource_state_info
 from dlt.helpers.streamlit_app.blocks.show_data import show_data_button
@@ -62,19 +62,14 @@ def list_table_hints(pipeline: dlt.Pipeline, tables: List[TTableSchema]) -> None
                 table["resource"],
             )
 
-        # table schema contains various hints (like clustering or partition options)
-        # that we do not want to show in basic view
-        def essentials_f(c: Any) -> Dict[str, Any]:
-            essentials: Dict[str, Any] = {}
-            for k, v in c.items():
-                if k in ["name", "data_type", "nullable"]:
-                    essentials[k] = v
-
-            return {
-                "name": essentials["name"],
-                "data_type": essentials["data_type"],
-                "nullable": essentials["nullable"],
-            }
-
-        st.table(map(essentials_f, table["columns"].values()))
+        st.table(
+            map(
+                lambda c: {
+                    "name": c["name"],
+                    "data_type": c.get("data_type"),
+                    "nullable": c.get("nullable", True),
+                },
+                table["columns"].values(),
+            )
+        )
         show_data_button(pipeline, table["name"])
diff --git a/docs/examples/postgres_to_postgres/postgres_to_postgres.py b/docs/examples/postgres_to_postgres/postgres_to_postgres.py
index 85b8aed045..f5327ee236 100644
--- a/docs/examples/postgres_to_postgres/postgres_to_postgres.py
+++ b/docs/examples/postgres_to_postgres/postgres_to_postgres.py
@@ -91,16 +91,17 @@ def pg_resource_chunked(
     order_date: str,
     load_type: str = "merge",
     columns: str = "*",
-    credentials: ConnectionStringCredentials = dlt.secrets[
-        "sources.postgres.credentials"
-    ],
+    credentials: ConnectionStringCredentials = dlt.secrets["sources.postgres.credentials"],
 ):
     print(
         f"dlt.resource write_disposition: `{load_type}` -- ",
-        f"connection string: postgresql://{credentials.username}:*****@{credentials.host}:{credentials.host}/{credentials.database}",
+        "connection string:"
+        f" postgresql://{credentials.username}:*****@{credentials.host}:{credentials.host}/{credentials.database}",
     )
 
-    query = f"SELECT {columns} FROM {schema_name}.{table_name} ORDER BY {order_date}"  # Needed to have an idempotent query
+    query = (  # Needed to have an idempotent query
+        f"SELECT {columns} FROM {schema_name}.{table_name} ORDER BY {order_date}"
+    )
 
     source = dlt.resource(  # type: ignore
         name=table_name,
@@ -133,9 +134,7 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"):
 
 if __name__ == "__main__":
     # Input Handling
-    parser = argparse.ArgumentParser(
-        description="Run specific functions in the script."
-    )
+    parser = argparse.ArgumentParser(description="Run specific functions in the script.")
     parser.add_argument("--replace", action="store_true", help="Run initial load")
     parser.add_argument("--merge", action="store_true", help="Run delta load")
     args = parser.parse_args()
@@ -233,20 +232,26 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"):
         ).fetchone()[0]
         print(f"timestamped_schema: {timestamped_schema}")
 
-        target_credentials = ConnectionStringCredentials(dlt.secrets["destination.postgres.credentials"])
+        target_credentials = ConnectionStringCredentials(
+            dlt.secrets["destination.postgres.credentials"]
+        )
         # connect to destination (timestamped schema)
         conn.sql(
-            f"ATTACH 'dbname={target_credentials.database} user={target_credentials.username} password={target_credentials.password} host={target_credentials.host} port={target_credentials.port}' AS pg_db (TYPE postgres);"
+            "ATTACH"
+            f" 'dbname={target_credentials.database} user={target_credentials.username} password={target_credentials.password} host={target_credentials.host} port={target_credentials.port}'"
+            " AS pg_db (TYPE postgres);"
         )
         conn.sql(f"CREATE SCHEMA IF NOT EXISTS pg_db.{timestamped_schema};")
 
         for table in tables:
             print(
-                f"LOAD DuckDB -> Postgres: table: {timestamped_schema}.{table['table_name']} TO Postgres {timestamped_schema}.{table['table_name']}"
+                f"LOAD DuckDB -> Postgres: table: {timestamped_schema}.{table['table_name']} TO"
+                f" Postgres {timestamped_schema}.{table['table_name']}"
            )
             conn.sql(
-                f"CREATE OR REPLACE TABLE pg_db.{timestamped_schema}.{table['table_name']} AS SELECT * FROM {timestamped_schema}.{table['table_name']};"
+                f"CREATE OR REPLACE TABLE pg_db.{timestamped_schema}.{table['table_name']} AS"
+                f" SELECT * FROM {timestamped_schema}.{table['table_name']};"
            )
             conn.sql(
                 f"SELECT count(*) as count FROM pg_db.{timestamped_schema}.{table['table_name']};"
@@ -262,9 +267,7 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"):
             assert int(rows) == 9
 
     # 5. Cleanup and rename Schema
-    print(
-        "##################################### RENAME Schema and CLEANUP ########"
-    )
+    print("##################################### RENAME Schema and CLEANUP ########")
     try:
         con_hd = psycopg2.connect(
             dbname=target_credentials.database,
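
Illustration (a minimal sketch, not part of the patch above): the table_hints.py change replaces the removed essentials_f helper with a lambda that reads optional column hints via .get() with defaults, since a dlt column schema may omit "data_type" or "nullable". The snippet below reproduces that behavior with made-up column dicts; the column names and values are hypothetical.

# Sketch: optional hints fall back to defaults instead of raising KeyError.
columns = {
    "player_id": {"name": "player_id", "data_type": "bigint", "nullable": False},
    "title": {"name": "title"},  # hypothetical column with no data_type/nullable hints
}

rows = [
    {
        "name": c["name"],
        "data_type": c.get("data_type"),      # None when the hint is missing
        "nullable": c.get("nullable", True),  # a missing hint is shown as nullable
    }
    for c in columns.values()
]
print(rows)
# The removed essentials_f indexed c["data_type"] and c["nullable"] directly and
# would raise KeyError on the "title" column; that is the Streamlit error this patch fixes.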