[GEN-997] update method to concatenate columns for dataframe #555

Merged: 6 commits, Apr 17, 2024

Changes from 1 commit
refactor _update_table
danlu1 committed Mar 22, 2024
commit cb8690249e3197298d684940bcb048ba7d9efce3
124 changes: 103 additions & 21 deletions genie/load.py
@@ -16,7 +16,7 @@
from . import __version__, extract, process_functions

logger = logging.getLogger(__name__)


# TODO Edit docstring
def store_file(
@@ -170,53 +170,135 @@ def _update_table(
    database_synid: str,
    primary_key_cols: List[str],
    to_delete: bool = False,
):
    """
    A helper function that compares the new dataset with the existing data
    and stores any changes that need to be made to the database.
    """
    changes = check_database_changes(database, new_dataset, primary_key_cols, to_delete)
    store_database(
        syn,
        database_synid,
        changes["col_order"],
        changes["allupdates"],
        changes["to_delete_rows"],
    )
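
For review context, a minimal sketch of how the refactored `_update_table` composes the two new helpers end to end. The login call, the table id `syn12345`, and the toy frames are made-up placeholders, and in practice `database` comes from a Synapse table query (so it carries ROW_ID/ROW_VERSION), which this sketch glosses over:

    import pandas as pd
    import synapseclient

    syn = synapseclient.login()  # assumes cached credentials
    database = pd.DataFrame({"SAMPLE_ID": ["S1"], "CENTER": ["A"]})
    new_dataset = pd.DataFrame({"SAMPLE_ID": ["S1", "S2"], "CENTER": ["A", "B"]})
    # compares the datasets, then writes any appends/updates to the table
    _update_table(syn, database, new_dataset, "syn12345", primary_key_cols=["SAMPLE_ID"])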

def get_col_order(orig_database_cols: pd.Index) -> List[str]:
    """
    Get the final column order for the table to be stored

    Args:
        orig_database_cols (pd.Index): The column names of the original database

    Returns:
        The list of re-ordered column names
    """
    col_order = ["ROW_ID", "ROW_VERSION"]
    col_order.extend(orig_database_cols.tolist())
    return col_order
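
A quick check of this helper's behavior (the column names are made up):

    import pandas as pd

    cols = pd.Index(["SAMPLE_ID", "CENTER"])
    # ROW_ID and ROW_VERSION are prepended ahead of the original columns
    get_col_order(cols)  # ['ROW_ID', 'ROW_VERSION', 'SAMPLE_ID', 'CENTER']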

def _reorder_new_dataset(orig_database_cols: pd.Index, new_dataset: pd.DataFrame) -> pd.DataFrame:
    """
    Reorder the new dataset's columns to match the original database

    Args:
        orig_database_cols (pd.Index): The column names of the original database
        new_dataset (pd.DataFrame): New Data

    Returns:
        The re-ordered new dataset
    """
    # Columns must be in the same order as the original data
    new_dataset = new_dataset[orig_database_cols]
    return new_dataset
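
And a one-line illustration with hypothetical columns:

    import pandas as pd

    new = pd.DataFrame({"CENTER": ["A"], "SAMPLE_ID": ["S1"]})
    # columns come back in the database's order: SAMPLE_ID, then CENTER
    _reorder_new_dataset(pd.Index(["SAMPLE_ID", "CENTER"]), new)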

def _generate_primary_key(
    database: pd.DataFrame,
    new_dataset: pd.DataFrame,
    primary_key_cols: List[str],
    primary_key: str,
) -> Dict[str, pd.DataFrame]:
    """
    Generate primary key column for both original database and re-ordered new dataset

    Args:
        database (pd.DataFrame): Original Data
        new_dataset (pd.DataFrame): The re-ordered new dataset
        primary_key_cols (List[str]): Column(s) that make up the primary key
        primary_key (str): The column name of the primary key

    Returns:
        A dictionary of the original and new datasets, each with the primary_key column added
    """
    # replace NAs with empty string
    database = database.fillna("")
    new_dataset = new_dataset.fillna("")
    # generate primary key column for original database
    database[primary_key_cols] = database[primary_key_cols].applymap(str)
    if database.empty:
        database[primary_key] = ""
    else:
        database[primary_key] = database[primary_key_cols].apply(
            lambda x: " ".join(x), axis=1
        )

    # generate primary key column for new dataset
    new_dataset[primary_key_cols] = new_dataset[primary_key_cols].applymap(str)
    if new_dataset.empty:
        new_dataset[primary_key] = ""
    else:
        new_dataset[primary_key] = new_dataset[primary_key_cols].apply(
            lambda x: " ".join(x), axis=1
        )
    datasets = {"original_data": database, "new_data": new_dataset}
    return datasets
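
A small worked example of the key construction (toy data, made-up column names):

    import pandas as pd

    db = pd.DataFrame({"SAMPLE_ID": ["S1"], "CENTER": ["A"]})
    new = pd.DataFrame({"SAMPLE_ID": ["S1", "S2"], "CENTER": ["A", "B"]})
    out = _generate_primary_key(db, new, ["SAMPLE_ID", "CENTER"], "UNIQUE_KEY")
    # primary key columns are stringified and space-joined per row
    out["new_data"]["UNIQUE_KEY"].tolist()  # ['S1 A', 'S2 B']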

def check_database_changes(
    database: pd.DataFrame,
    new_dataset: pd.DataFrame,
    primary_key_cols: List[str],
    to_delete: bool = False,
) -> Dict[str, Union[pd.DataFrame, List[str]]]:
    """
    Check changes that need to be made, i.e. append/update/delete rows to the database
    based on its comparison with new data

    Args:
        database (pd.DataFrame): Original Data
        new_dataset (pd.DataFrame): New Data
        primary_key_cols (List[str]): Column(s) that make up the primary key
        to_delete (bool, optional): Delete rows. Defaults to False

    Returns:
        A dictionary with the final column order, the rows to be appended
        and/or updated, and the rows to be deleted
    """
    # get a list of column names of the original database
    orig_database_cols = database.columns
    # get the final column order
    col_order = get_col_order(orig_database_cols)
    # reorder new_dataset
    new_dataset = _reorder_new_dataset(orig_database_cols, new_dataset)
    # set the primary_key name
    primary_key = "UNIQUE_KEY"
    # generate primary_key column for dataset comparison
    datasets = _generate_primary_key(database, new_dataset, primary_key_cols, primary_key)
    # output dictionary
    changes = {"col_order": col_order, "allupdates": None, "to_delete_rows": None}
    # get rows to be appended or updated
    allupdates = pd.DataFrame(columns=col_order)
    to_append_rows = process_functions._append_rows(
        datasets["new_data"], datasets["original_data"], primary_key
    )
    to_update_rows = process_functions._update_rows(
        datasets["new_data"], datasets["original_data"], primary_key
    )
    allupdates = pd.concat([allupdates, to_append_rows, to_update_rows], sort=False)
    changes["allupdates"] = allupdates
    # get rows to be deleted
    if to_delete:
        to_delete_rows = process_functions._delete_rows(
            datasets["new_data"], datasets["original_data"], primary_key
        )
    else:
        to_delete_rows = pd.DataFrame()
    changes["to_delete_rows"] = to_delete_rows
    return changes
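
Reusing `db` and `new` from the sketch above, the returned dictionary would look roughly like this. The exact contents of `allupdates` and `to_delete_rows` depend on `process_functions._append_rows` / `_update_rows` / `_delete_rows`, whose bodies are not in this diff, so the comments below are assumptions based on their names:

    changes = check_database_changes(db, new, primary_key_cols=["SAMPLE_ID"], to_delete=True)
    changes["col_order"]       # ['ROW_ID', 'ROW_VERSION', 'SAMPLE_ID', 'CENTER']
    changes["allupdates"]      # rows to append and/or update, in col_order
    changes["to_delete_rows"]  # presumably rows in db whose keys are absent from new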

def store_database(
    syn: synapseclient.Synapse,
    database_synid: str,
    col_order: List[str],
    allupdates: pd.DataFrame,
    to_delete_rows: pd.DataFrame,
) -> None:
    """
    Store changes to the database

    Args:
        syn (synapseclient.Synapse): Synapse object
        database_synid (str): Synapse Id of the Synapse table
        col_order (List[str]): The ordered column names to be saved
        allupdates (pd.DataFrame): rows to be appended and/or updated
        to_delete_rows (pd.DataFrame): rows to be deleted

    Returns:
        None
    """
    storedatabase = False
    update_all_file = tempfile.NamedTemporaryFile(
        dir=process_functions.SCRIPT_DIR, delete=False