diff --git a/CHANGELOG-dguan2-STAR-9183.adoc b/CHANGELOG-dguan2-STAR-9183.adoc
new file mode 100644
index 0000000..505f953
--- /dev/null
+++ b/CHANGELOG-dguan2-STAR-9183.adoc
@@ -0,0 +1 @@
+* ({uri-jira}/STAR-9183[STAR-9183]) - Add nullify column option to Dumpty
\ No newline at end of file
diff --git a/config.yaml.example b/config.yaml.example
index 89567f2..2f80ea3 100644
--- a/config.yaml.example
+++ b/config.yaml.example
@@ -76,7 +76,7 @@ log_file: {{EXTRACT_LOG_FILE}}
 
 # Source database schema name
 schema: dbo
-
+empty_columns: "patient.ssn,provider.dea"
 # Tables to extract from above schema
 tables:
 - REPORTS
@@ -85,4 +85,4 @@ views:
 - name: VIEW_MAT
   materialized: true
 - name: VIEW_SQL
-  file: vv_Example.sql
\ No newline at end of file
+  file: vv_Example.sql
diff --git a/src/dumpty/config.py b/src/dumpty/config.py
index 9fbfda5..3881ac0 100644
--- a/src/dumpty/config.py
+++ b/src/dumpty/config.py
@@ -60,6 +60,7 @@ class Config(YAMLWizard):
     create_views_workers: int = 8
     drop_dataset: bool = False
     normalize_schema: bool = True
+    empty_columns: str = None
     last_successful_run: str = None
     extract: str = None
     tables_query: str = None
diff --git a/src/dumpty/main.py b/src/dumpty/main.py
index e20ce72..51c9471 100644
--- a/src/dumpty/main.py
+++ b/src/dumpty/main.py
@@ -119,6 +119,13 @@ def config_from_args(argv) -> Config:
         help="Local path or gs:// URI to store extracted data",
     )
 
+    parser.add_argument(
+        "--empty_columns",
+        type=str,
+        default=None,
+        help="Empty the values of table columns (e.g. table1_name.field1_name,table2_name.field2_name)",
+    )
+
     parser.add_argument(
         "dataset",
         type=str,
@@ -241,6 +248,7 @@ def main(args=None):
     logger.info("config.target_dataset_location: %s", config.target_dataset_location)
     logger.info("config.drop_dataset: %s", config.drop_dataset)
     logger.info("config.normalize_schema: %s", config.normalize_schema)
+    logger.info("config.empty_columns: %s", config.empty_columns)
     logger.info("config.extract: %s", config.extract)
     logger.info("config.last_successful_run: %s", config.last_successful_run)
    logger.info("config.tables_query: %s", config.tables_query)
diff --git a/src/dumpty/pipeline.py b/src/dumpty/pipeline.py
index c2f5f47..053a54f 100644
--- a/src/dumpty/pipeline.py
+++ b/src/dumpty/pipeline.py
@@ -171,6 +171,20 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         # sleep(36000)
         self._spark_session.stop()
 
+    @staticmethod
+    def empty_cols(df: DataFrame, table_name: str, drop_cols: str) -> DataFrame:
+        drop_col_names: List[str] = []
+        for col_name in drop_cols.split(','):
+            if col_name.rsplit('.')[0].lower() == table_name.lower():
+                drop_col_names.append(col_name.rsplit('.')[-1])
+                # DataFrame.drop() doesn't work here: the dropped columns still exist in the saved data, e.g. df.drop(col_name.rsplit('.')[-1])
+                logger.info(
+                    f"Table {table_name} - Drop column: {col_name.rsplit('.')[-1]}")
+        if len(drop_col_names) > 0:
+            return df.select([c for c in df.columns if c not in drop_col_names])
+        else:
+            return df
+
     @staticmethod
     def normalize_df(df: DataFrame) -> DataFrame:
         return df.select([col(x).alias(normalize_str(x)) for x in df.columns])
@@ -694,6 +708,9 @@ def _extract(self, extract: Extract, uri: str) -> str:
         # Normalize column names?
         df = self.normalize_df(df)
 
+        if self.config.empty_columns is not None:
+            df = self.empty_cols(df, extract.name, self.config.empty_columns)
+
         session.sparkContext.setLocalProperty("callSite.short", n_table_name)
         df.write.save(f"{uri}/{n_table_name}", format=self.config.spark.format, mode="overwrite",
                       timestampFormat=self.config.spark.timestamp_format, compression=self.config.spark.compression)
@@ -773,7 +790,7 @@ def load(self, extract: Extract) -> Extract:
 
         # Load into BigQuery
         if self.config.target_dataset is not None:
-            if self.config.schemaonly == False and extract.rows > 0:
+            if self.config.schemaonly == False:
                 # Load from GCS into BQ
                 bq_rows: int = 0
                 bq_bytes: int = 0
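
For reviewers, a minimal sketch of how the new empty_columns spec is interpreted per table, assuming the comma-separated "table.column" format shown in config.yaml.example. The helper name columns_to_drop and the sample column list are hypothetical illustrations; only the matching logic mirrors Pipeline.empty_cols, which excludes the matched columns from the extract via df.select().

from typing import List

def columns_to_drop(empty_columns: str, table_name: str) -> List[str]:
    # Collect the column names listed for table_name in a
    # "table.column,table.column" spec such as "patient.ssn,provider.dea".
    drops: List[str] = []
    for spec in empty_columns.split(","):
        table, _, column = spec.strip().partition(".")
        if table.lower() == table_name.lower():
            drops.append(column)
    return drops

# Example: columns kept when extracting the "patient" table with the sample config value
all_columns = ["mrn", "ssn", "name"]
drops = columns_to_drop("patient.ssn,provider.dea", "patient")
print([c for c in all_columns if c not in drops])  # ['mrn', 'name']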