WIP: Darren star 9183 - Add nullify column option to Dumpty #29

Open · wants to merge 6 commits into dev
1 change: 1 addition & 0 deletions CHANGELOG-dguan2-STAR-9183.adoc
@@ -0,0 +1 @@
* ({uri-jira}/STAR-9183[STAR-9183]) - Add nullify column option to Dumpty
4 changes: 2 additions & 2 deletions config.yaml.example
@@ -76,7 +76,7 @@ log_file: {{EXTRACT_LOG_FILE}}

# Source database schema name
schema: dbo

empty_columns: "patient.ssn,provider.dea"
# Tables to extract from above schema
tables:
- REPORTS
@@ -85,4 +85,4 @@ views:
- name: VIEW_MAT
materialized: true
- name: VIEW_SQL
file: vv_Example.sql
file: vv_Example.sql
1 change: 1 addition & 0 deletions src/dumpty/config.py
@@ -60,6 +60,7 @@ class Config(YAMLWizard):
create_views_workers: int = 8
drop_dataset: bool = False
normalize_schema: bool = True
empty_columns: str = None
last_successful_run: str = None
extract: str = None
tables_query: str = None
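
For context, a minimal sketch of how the new `empty_columns` key would flow from YAML into this Config via dataclass-wizard's YAMLWizard, which Config already subclasses. The `MiniConfig` class and the inline YAML below are illustrative only, not part of this PR:

```python
# Minimal sketch, assuming dataclass-wizard's YAMLWizard (as used by Config).
# MiniConfig and the inline YAML string are illustrative only.
from dataclasses import dataclass
from dataclass_wizard import YAMLWizard


@dataclass
class MiniConfig(YAMLWizard):
    schema: str = "dbo"
    empty_columns: str = None


cfg = MiniConfig.from_yaml('schema: dbo\nempty_columns: "patient.ssn,provider.dea"')
print(cfg.empty_columns)  # patient.ssn,provider.dea
```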
8 changes: 8 additions & 0 deletions src/dumpty/main.py
@@ -119,6 +119,13 @@ def config_from_args(argv) -> Config:
help="Local path or gs:// URI to store extracted data",
)

parser.add_argument(
    "--empty_columns",
    type=str,
    default=None,
    help="Empty the values of table columns (e.g. table1_name.field1_name,table2_name.field2_name)",
)

parser.add_argument(
"dataset",
type=str,
@@ -241,6 +248,7 @@ def main(args=None):
logger.info("config.target_dataset_location: %s", config.target_dataset_location)
logger.info("config.drop_dataset: %s", config.drop_dataset)
logger.info("config.normalize_schema: %s", config.normalize_schema)
logger.info("config.empty_columns: %s", config.empty_columns)
logger.info("config.extract: %s", config.extract)
logger.info("config.last_successful_run: %s", config.last_successful_run)
logger.info("config.tables_query: %s", config.tables_query)
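
As a quick check of how the flag surfaces on the command line, here is a standalone argparse sketch mirroring the new option; the toy parser below is illustrative, not the real config_from_args():

```python
# Standalone sketch of the new --empty_columns flag; illustrative only.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--empty_columns",
    type=str,
    default=None,
    help="Empty the values of table columns (e.g. table1_name.field1_name,table2_name.field2_name)",
)

args = parser.parse_args(["--empty_columns", "patient.ssn,provider.dea"])
print(args.empty_columns)  # patient.ssn,provider.dea
```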
19 changes: 18 additions & 1 deletion src/dumpty/pipeline.py
@@ -171,6 +171,20 @@ def __exit__(self, exc_type, exc_val, exc_tb):
# sleep(36000)
self._spark_session.stop()

@staticmethod
def empty_cols(df: DataFrame, table_name: str, drop_cols: str) -> DataFrame:
    drop_col_names: List[str] = []
    for col_name in drop_cols.split(','):
        if col_name.rsplit('.')[0].lower() == table_name.lower():
            drop_col_names.append(col_name.rsplit('.')[-1])
            # DataFrame.drop() doesn't work here: when the data is saved, the columns still exist, e.g. df.drop(col_name.rsplit('.')[-1])
            logger.info(
                f"Table {table_name} - Drop column: {col_name.rsplit('.')[-1]}")
    if len(drop_col_names) > 0:
        return df.select([c for c in df.columns if c not in drop_col_names])
    else:
        return df

@staticmethod
def normalize_df(df: DataFrame) -> DataFrame:
    return df.select([col(x).alias(normalize_str(x)) for x in df.columns])
@@ -694,6 +708,9 @@ def _extract(self, extract: Extract, uri: str) -> str:
# Normalize column names?
df = self.normalize_df(df)

if self.config.empty_columns is not None:
    df = self.empty_cols(df, extract.name, self.config.empty_columns)

session.sparkContext.setLocalProperty("callSite.short", n_table_name)
df.write.save(f"{uri}/{n_table_name}", format=self.config.spark.format, mode="overwrite",
timestampFormat=self.config.spark.timestamp_format, compression=self.config.spark.compression)
@@ -773,7 +790,7 @@ def load(self, extract: Extract) -> Extract:

# Load into BigQuery
if self.config.target_dataset is not None:
if self.config.schemaonly == False and extract.rows > 0:
if self.config.schemaonly == False:
# Load from GCS into BQ
bq_rows: int = 0
bq_bytes: int = 0
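
To make the empty_cols behaviour concrete, here is a runnable sketch against a local Spark session; the sample table and column names are illustrative. The second half shows one possible way to nullify the listed columns instead of dropping them, which may be closer to the "nullify" wording in the PR title; that alternative is not part of this diff.

```python
# Runnable sketch of the column handling; sample data and names are illustrative.
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.master("local[1]").appName("empty-cols-sketch").getOrCreate()
df = spark.createDataFrame([(1, "123-45-6789"), (2, "987-65-4321")], ["id", "ssn"])

drop_cols = "patient.ssn,provider.dea"
table_name = "patient"

# Same selection approach as Pipeline.empty_cols: keep only the columns
# that are not listed for this table (select, rather than drop).
targets = [c.rsplit('.')[-1] for c in drop_cols.split(',')
           if c.rsplit('.')[0].lower() == table_name.lower()]
df.select([c for c in df.columns if c not in targets]).show()  # only "id" remains

# Possible alternative (not in this PR): keep the columns but overwrite their
# values with typed NULLs, so the output schema stays intact.
nullified = df
for c in targets:
    nullified = nullified.withColumn(c, lit(None).cast(dict(df.dtypes)[c]))
nullified.show()  # "ssn" column present, every value NULL

spark.stop()
```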