From ca14acfd25ff58892a2eac763b89ab5aa1a8a8f3 Mon Sep 17 00:00:00 2001
From: Joe Mesterhazy
Date: Wed, 18 Sep 2024 11:38:02 -0700
Subject: [PATCH 1/5] Materialize view data to table option for Dumpty

---
 CHANGELOG-dguan2-STAR-8825.adoc |  1 +
 config.yaml.example             | 13 +++++++++++++
 src/dumpty/config.py            |  2 +-
 src/dumpty/main.py              | 26 ++++++++++++++++++--------
 src/dumpty/pipeline.py          | 17 ++++++++++++++++-
 src/dumpty/sql/vv_Example.sql   |  1 +
 6 files changed, 50 insertions(+), 10 deletions(-)
 create mode 100644 CHANGELOG-dguan2-STAR-8825.adoc
 create mode 100644 src/dumpty/sql/vv_Example.sql

diff --git a/CHANGELOG-dguan2-STAR-8825.adoc b/CHANGELOG-dguan2-STAR-8825.adoc
new file mode 100644
index 0000000..c7a6719
--- /dev/null
+++ b/CHANGELOG-dguan2-STAR-8825.adoc
@@ -0,0 +1 @@
+* ({uri-jira}/STAR-8825[STAR-8825]) - Materialize view data to table option for Dumpty
\ No newline at end of file
diff --git a/config.yaml.example b/config.yaml.example
index 79f34f8..89567f2 100644
--- a/config.yaml.example
+++ b/config.yaml.example
@@ -73,3 +73,16 @@ extract_workers: 64
 load_workers: {{BQ_LOAD_THREADS}}
 
 log_file: {{EXTRACT_LOG_FILE}}
+
+# Source database schema name
+schema: dbo
+
+# Tables to extract from above schema
+tables:
+  - REPORTS
+  - LOGS
+views:
+  - name: VIEW_MAT
+    materialized: true
+  - name: VIEW_SQL
+    file: vv_Example.sql
\ No newline at end of file
diff --git a/src/dumpty/config.py b/src/dumpty/config.py
index 76039e7..9fbfda5 100644
--- a/src/dumpty/config.py
+++ b/src/dumpty/config.py
@@ -34,7 +34,7 @@ class Config(YAMLWizard):
 
     schema: str
     tables: List[str]
-    views: List[dict]
+    views: List[dict] = None
 
     credentials: str = None
 
diff --git a/src/dumpty/main.py b/src/dumpty/main.py
index 3ef72ca..e20ce72 100644
--- a/src/dumpty/main.py
+++ b/src/dumpty/main.py
@@ -204,7 +204,7 @@ def config_from_args(argv) -> Config:
     return config
 
 
-# Create views
+# Create views (non-materialized) with definition SQL file
 def create_view(config: Config):
     """ Create views.
 
@@ -217,11 +217,12 @@ def create_view(config: Config):
     gcp = GCP()
     logger.info("Total number of views in YAML: %d", len(view_list))
     for view in view_list:
-        template: Template = env.get_template(view["file"])
-        sql = template.render(vars | view)
-        logger.info("Creating view in BigQuery {0} from file {1} {2}".format(view["name"], view["file"], sql))
-        gcp.bigquery_create_view(
-            "{0}.{1}".format(config.target_dataset, view["name"]), sql)
+        if "file" in view:
+            template: Template = env.get_template(view["file"])
+            sql = template.render(vars | view)
+            logger.info("Creating view in BigQuery {0} from file {1} {2}".format(view["name"], view["file"], sql))
+            gcp.bigquery_create_view(
+                "{0}.{1}".format(config.target_dataset, view["name"]), sql)
 
 
 def main(args=None):
@@ -311,7 +312,10 @@ def main(args=None):
     if config.reconcile:
         # Check if tables being requested actually exist in SQL database before doing anything else
         # This can be very slow for databases with thousands of tables so it is off by default
-        pipeline.reconcile(config.tables)
+        if config.tables is not None:
+            pipeline.reconcile(config.tables)
+        if config.views is not None:
+            pipeline.reconcile_view(config.views)
 
     """
     STEPS:
@@ -325,6 +329,11 @@ def main(args=None):
     table_list = config.tables
     logger.info("Total number of tables in YAML: %d", len(table_list))
 
+    # Append materialized views to table list (views may be absent, and
+    # file-backed entries carry no "materialized" key, hence the guards)
+    for view in config.views or []:
+        if view.get("materialized"):
+            table_list.append(view["name"])
+
     if config.extract.strip() == "incremental":
         logger.info("Running INCREMENTAL EXTRACTION ETL...")
 
@@ -456,7 +465,8 @@ def main(args=None):
     logger.info("DATA EXTRACTION IS DONE!!!")
 
     # Create views
-    create_view(config)
+    if config.views is not None:
+        create_view(config)
 
     # Summarize
     summary["end_date"] = datetime.now()
diff --git a/src/dumpty/pipeline.py b/src/dumpty/pipeline.py
index e4722a9..208286a 100644
--- a/src/dumpty/pipeline.py
+++ b/src/dumpty/pipeline.py
@@ -350,7 +350,7 @@ def introspect(self, extract: Extract) -> Extract:
             extract.rows = res.count
         else:
             logger.debug(f"Getting count(*) of {extract.name}")
-            if self.config.fastcount:
+            if self.config.fastcount and not extract.name.startswith("vv_"):
                 result = session.execute(
 
                     f"EXEC sp_spaceused N'{self.config.schema}.{extract.name}';").fetchall()
@@ -808,3 +808,18 @@ def reconcile(self, table_names: List[str]):
         if len(not_found) > 0:
             raise ValidationException(
                 f"Could not find these tables in {self.config.schema}: {','.join(not_found)}")
+
+    def reconcile_view(self, view_names: List[dict]):
+        """Checks if a list of view names exists in the SQL database
+        :param view_names: List of view definitions (dicts with a "name" key) to validate against database
+        :raises: :class:`.ValidationException` when a view is not found. Use this
+            to fail early if you are not sure if the views are actually in the SQL database.
+        """
+        logger.info(
+            f"Reconciling list of views against schema {self.config.schema}")
+        sql_views = self._inspector.get_view_names(schema=self.config.schema)
+        not_found = [t["name"] for t in view_names if t["name"].lower() not in (
+            view.lower() for view in sql_views)]
+        if len(not_found) > 0:
+            raise ValidationException(
+                f"Could not find these views in {self.config.schema}: {','.join(not_found)}")
diff --git a/src/dumpty/sql/vv_Example.sql b/src/dumpty/sql/vv_Example.sql
new file mode 100644
index 0000000..db2832b
--- /dev/null
+++ b/src/dumpty/sql/vv_Example.sql
@@ -0,0 +1 @@
+select * from `{{ target_dataset }}`.doctor
\ No newline at end of file
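PATCH 1/5 routes the two kinds of `views:` entries differently: entries with `materialized: true` are appended to the table list and extracted like ordinary tables, while entries with a `file:` key are created as BigQuery views from a rendered SQL template. A minimal standalone sketch of that routing follows; the dict shapes come from config.yaml.example above, but the inline data and printed results are illustrative, not part of the patch.

    from typing import List

    # Example view entries, shaped like config.yaml.example (values illustrative)
    views = [
        {"name": "VIEW_MAT", "materialized": True},      # extracted like a table
        {"name": "VIEW_SQL", "file": "vv_Example.sql"},  # created as a BigQuery view
    ]
    table_list: List[str] = ["REPORTS", "LOGS"]

    # Materialized views join the extraction list (mirrors the main.py hunk above)
    for view in views or []:
        if view.get("materialized"):
            table_list.append(view["name"])

    # File-backed views are the only entries create_view() renders and creates
    file_views = [v for v in views or [] if "file" in v]

    print(table_list)  # ['REPORTS', 'LOGS', 'VIEW_MAT']
    print(file_views)  # [{'name': 'VIEW_SQL', 'file': 'vv_Example.sql'}]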
From 9f8ace23b995e6d7c87d5d7769a7ecfbe63b58d1 Mon Sep 17 00:00:00 2001
From: Joe Mesterhazy
Date: Thu, 19 Sep 2024 08:50:24 -0700
Subject: [PATCH 2/5] skip rowcount for views

---
 src/dumpty/pipeline.py | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/src/dumpty/pipeline.py b/src/dumpty/pipeline.py
index 208286a..c2f5f47 100644
--- a/src/dumpty/pipeline.py
+++ b/src/dumpty/pipeline.py
@@ -350,18 +350,18 @@ def introspect(self, extract: Extract) -> Extract:
             extract.rows = res.count
         else:
             logger.debug(f"Getting count(*) of {extract.name}")
-            if self.config.fastcount and not extract.name.startswith("vv_"):
-                result = session.execute(
-
-                    f"EXEC sp_spaceused N'{self.config.schema}.{extract.name}';").fetchall()
-                logger.debug(
-                    f"fast counting result of {result[0][1].rstrip()}")
-                extract.rows = int(result[0][1].rstrip())
-            else:
-                qry = session.query(
-                    count_fn(literal_column("*")).label("count")
-                ).select_from(table)
-                extract.rows = qry.scalar()
+            if not extract.name.startswith("vv_"):
+                if self.config.fastcount:
+                    result = session.execute(
+                        f"EXEC sp_spaceused N'{self.config.schema}.{extract.name}';").fetchall()
+                    logger.debug(
+                        f"fast counting result of {result[0][1].rstrip()}")
+                    extract.rows = int(result[0][1].rstrip())
+                else:
+                    qry = session.query(
+                        count_fn(literal_column("*")).label("count")
+                    ).select_from(table)
+                    extract.rows = qry.scalar()
 
         if not full_introspect:
             # Stop here if this table was already introspected recently
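PATCH 2/5 hoists the `vv_` prefix check into the outer condition, so views now skip both counting paths: `sp_spaceused` reports storage and so has nothing useful to say about a view, and `count(*)` against a view can be arbitrarily expensive. A self-contained sketch of the resulting decision logic, with a faked `sp_spaceused` result set standing in for the database call:

    from typing import Optional

    def resolve_rowcount(name: str, fastcount: bool) -> Optional[int]:
        # Views (vv_ prefix) are skipped entirely; their row count stays unset
        if name.startswith("vv_"):
            return None
        if fastcount:
            # sp_spaceused returns the row count as a right-padded string,
            # e.g. ('REPORTS', '1234      ', '80 KB') -- faked here
            result = [("REPORTS", "1234      ", "80 KB")]
            return int(result[0][1].rstrip())
        return 1234  # stand-in for the SELECT COUNT(*) fallback

    print(resolve_rowcount("vv_Example", fastcount=True))  # None
    print(resolve_rowcount("REPORTS", fastcount=True))     # 1234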
From f50f206bc2398efc281da791c5d3f056bb4620ce Mon Sep 17 00:00:00 2001
From: Darren Guan
Date: Tue, 15 Oct 2024 09:22:17 -0700
Subject: [PATCH 3/5] Add nullify column option to Dumpty

---
 CHANGELOG-dguan2-STAR-9183.adoc |  1 +
 src/dumpty/config.py            |  1 +
 src/dumpty/main.py              |  8 ++++++++
 src/dumpty/pipeline.py          | 19 ++++++++++++++++++-
 4 files changed, 28 insertions(+), 1 deletion(-)
 create mode 100644 CHANGELOG-dguan2-STAR-9183.adoc

diff --git a/CHANGELOG-dguan2-STAR-9183.adoc b/CHANGELOG-dguan2-STAR-9183.adoc
new file mode 100644
index 0000000..505f953
--- /dev/null
+++ b/CHANGELOG-dguan2-STAR-9183.adoc
@@ -0,0 +1 @@
+* ({uri-jira}/STAR-9183[STAR-9183]) - Add nullify column option to Dumpty
\ No newline at end of file
diff --git a/src/dumpty/config.py b/src/dumpty/config.py
index 9fbfda5..3881ac0 100644
--- a/src/dumpty/config.py
+++ b/src/dumpty/config.py
@@ -60,6 +60,7 @@ class Config(YAMLWizard):
     create_views_workers: int = 8
     drop_dataset: bool = False
     normalize_schema: bool = True
+    empty_columns: str = None
     last_successful_run: str = None
     extract: str = None
     tables_query: str = None
diff --git a/src/dumpty/main.py b/src/dumpty/main.py
index e20ce72..51c9471 100644
--- a/src/dumpty/main.py
+++ b/src/dumpty/main.py
@@ -119,6 +119,13 @@ def config_from_args(argv) -> Config:
         help="Local path or gs:// URI to store extracted data",
     )
 
+    parser.add_argument(
+        "--empty_columns",
+        type=str,
+        default=None,
+        help="Empty the values of table columns (e.g. table1_name.field1_name,table2_name.field2_name)",
+    )
+
    parser.add_argument(
        "dataset",
        type=str,
@@ -241,6 +248,7 @@ def main(args=None):
     logger.info("config.target_dataset_location: %s", config.target_dataset_location)
     logger.info("config.drop_dataset: %s", config.drop_dataset)
     logger.info("config.normalize_schema: %s", config.normalize_schema)
+    logger.info("config.empty_columns: %s", config.empty_columns)
     logger.info("config.extract: %s", config.extract)
     logger.info("config.last_successful_run: %s", config.last_successful_run)
     logger.info("config.tables_query: %s", config.tables_query)
diff --git a/src/dumpty/pipeline.py b/src/dumpty/pipeline.py
index c2f5f47..053a54f 100644
--- a/src/dumpty/pipeline.py
+++ b/src/dumpty/pipeline.py
@@ -171,6 +171,20 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         # sleep(36000)
         self._spark_session.stop()
 
+    @staticmethod
+    def empty_cols(df: DataFrame, table_name: str, drop_cols: str) -> DataFrame:
+        drop_col_names: List[str] = []
+        for col_name in drop_cols.split(','):
+            if col_name.rsplit('.')[0].lower() == table_name.lower():
+                drop_col_names.append(col_name.rsplit('.')[-1])
+                # Dataframe drop doesn't work. When it saves the data, the columns still exist; e.g. df.drop(col_name.rsplit('.')[-1])
+                logger.info(
+                    f"Table {table_name} - Drop column: {col_name.rsplit('.')[-1]}")
+        if len(drop_col_names) > 0:
+            return df.select([c for c in df.columns if c not in drop_col_names])
+        else:
+            return df
+
     @staticmethod
     def normalize_df(df: DataFrame) -> DataFrame:
         return df.select([col(x).alias(normalize_str(x)) for x in df.columns])
@@ -694,6 +708,9 @@ def _extract(self, extract: Extract, uri: str) -> str:
             # Normalize column names?
             df = self.normalize_df(df)
 
+        if self.config.empty_columns is not None:
+            df = self.empty_cols(df, extract.name, self.config.empty_columns)
+
         session.sparkContext.setLocalProperty("callSite.short", n_table_name)
         df.write.save(f"{uri}/{n_table_name}", format=self.config.spark.format,
                       mode="overwrite", timestampFormat=self.config.spark.timestamp_format, compression=self.config.spark.compression)
@@ -773,7 +790,7 @@ def load(self, extract: Extract) -> Extract:
 
         # Load into BigQuery
         if self.config.target_dataset is not None:
-            if self.config.schemaonly == False and extract.rows > 0:
+            if self.config.schemaonly == False:
                 # Load from GCS into BQ
                 bq_rows: int = 0
                 bq_bytes: int = 0
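The new `empty_cols` in PATCH 3/5 matches `table.column` pairs case-insensitively against the current table, then rebuilds the DataFrame with `df.select(...)` because, per the in-code comment, `df.drop` did not take effect at save time. The spec parsing can be sketched without a Spark session; this helper is a hypothetical extraction for illustration, not a function in the patch:

    def columns_to_drop(table_name: str, spec: str) -> list:
        # spec is the --empty_columns value, e.g. "patient.ssn,provider.dea"
        drops = []
        for entry in spec.split(','):
            parts = entry.rsplit('.')  # same split as in empty_cols: [0]=table, [-1]=column
            if parts[0].lower() == table_name.lower():
                drops.append(parts[-1])
        return drops

    print(columns_to_drop("PATIENT", "patient.ssn,provider.dea"))   # ['ssn']
    print(columns_to_drop("provider", "patient.ssn,provider.dea"))  # ['dea']

Note the net effect: despite the changelog's "nullify" wording, matched columns are omitted from the written output entirely rather than written as NULLs.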
"--empty_columns", + type=str, + default=None, + help="Empty the values of table columns (e.g. table1_name.field1_name,table2_name.field2_name)", + ) + parser.add_argument( "dataset", type=str, @@ -241,6 +248,7 @@ def main(args=None): logger.info("config.target_dataset_location: %s", config.target_dataset_location) logger.info("config.drop_dataset: %s", config.drop_dataset) logger.info("config.normalize_schema: %s", config.normalize_schema) + logger.info("config.empty_columns: %s", config.empty_columns) logger.info("config.extract: %s", config.extract) logger.info("config.last_successful_run: %s", config.last_successful_run) logger.info("config.tables_query: %s", config.tables_query) diff --git a/src/dumpty/pipeline.py b/src/dumpty/pipeline.py index c2f5f47..053a54f 100644 --- a/src/dumpty/pipeline.py +++ b/src/dumpty/pipeline.py @@ -171,6 +171,20 @@ def __exit__(self, exc_type, exc_val, exc_tb): # sleep(36000) self._spark_session.stop() + @staticmethod + def empty_cols(df: DataFrame, table_name: str, drop_cols: str) -> DataFrame: + drop_col_names: List[str] = [] + for col_name in drop_cols.split(','): + if col_name.rsplit('.')[0].lower() == table_name.lower(): + drop_col_names.append(col_name.rsplit('.')[-1]) + # Dataframe drop doesn't work. When it saves the data, the columns still exists. e.g. df.drop(col_name.rsplit('.')[-1]) + logger.info( + f"Table {table_name} - Drop column: {col_name.rsplit('.')[-1]}") + if len(drop_col_names) > 0: + return df.select([c for c in df.columns if c not in drop_col_names]) + else: + return df + @staticmethod def normalize_df(df: DataFrame) -> DataFrame: return df.select([col(x).alias(normalize_str(x)) for x in df.columns]) @@ -694,6 +708,9 @@ def _extract(self, extract: Extract, uri: str) -> str: # Normalize column names? 
From b6d953676e1c2c82b115c5db1f1c682877b325bd Mon Sep 17 00:00:00 2001
From: Darren Guan <55291390+delguan@users.noreply.github.com>
Date: Mon, 21 Oct 2024 09:54:25 -0700
Subject: [PATCH 5/5] Update config.yaml.example

---
 config.yaml.example | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/config.yaml.example b/config.yaml.example
index eee2c30..2f80ea3 100644
--- a/config.yaml.example
+++ b/config.yaml.example
@@ -76,7 +76,7 @@ log_file: {{EXTRACT_LOG_FILE}}
 
 # Source database schema name
 schema: dbo
-empty_columns: "patient.ssn"
+empty_columns: "patient.ssn,provider.dea"
 # Tables to extract from above schema
 tables:
   - REPORTS
@@ -85,4 +85,4 @@ views:
   - name: VIEW_MAT
     materialized: true
   - name: VIEW_SQL
-    file: vv_Example.sql
\ No newline at end of file
+    file: vv_Example.sql