[GEN-997] update method to concatenate columns for dataframe (#555)
* update method to concatenate columns

* refactor _update_table by separating it into several functions

* add test cases for new functions
danlu1 authored Apr 17, 2024
1 parent eb18202 commit e2c2321
Showing 2 changed files with 326 additions and 31 deletions.
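For orientation, a minimal sketch of how the new helper functions introduced below behave; the dataframes and column names here are illustrative and not part of this commit. In production, _update_table composes these helpers via check_database_changes and pushes the result to Synapse with store_database.

import pandas as pd
from genie import load

# Illustrative frames only; in practice the database comes from a Synapse
# table query and the new dataset is the freshly processed file.
database = pd.DataFrame({"id": ["A", "B"], "value": ["1", "2"]})
new_dataset = pd.DataFrame({"value": ["1", "3"], "id": ["A", "C"]})

# Final column order: ["ROW_ID", "ROW_VERSION", "id", "value"]
col_order = load._get_col_order(database.columns)

# Reorder the new dataset so its columns match the database: ["id", "value"]
reordered = load._reorder_new_dataset(database.columns, new_dataset)

# Add a UNIQUE_KEY column by space-joining the key columns, e.g. "A 1"
keyed = load._generate_primary_key(reordered, ["id", "value"], "UNIQUE_KEY")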
153 changes: 125 additions & 28 deletions genie/load.py
@@ -172,50 +172,147 @@ def _update_table(
to_delete: bool = False,
):
"""
Updates synapse tables by a row identifier with another
dataset that has the same number and order of columns
A helper function to compare new dataset with existing data,
and store any changes that need to be made to the database
"""
changes = check_database_changes(database, new_dataset, primary_key_cols, to_delete)
store_database(
syn,
database_synid,
changes["col_order"],
changes["allupdates"],
changes["to_delete_rows"],
)


def _get_col_order(orig_database_cols: pd.Index) -> List[str]:
"""
Get column order
Args:
orig_database_cols (pd.Index): A list of column names of the original database
Returns:
The list of re-ordered column names
"""
primary_key = "UNIQUE_KEY"
database = database.fillna("")
orig_database_cols = database.columns
col_order = ["ROW_ID", "ROW_VERSION"]
col_order.extend(orig_database_cols.tolist())
new_dataset = new_dataset.fillna("")
# Columns must be in the same order
return col_order
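# Illustrative example (not part of this commit):
# _get_col_order(pd.Index(["a", "b"])) returns ["ROW_ID", "ROW_VERSION", "a", "b"].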


def _reorder_new_dataset(
orig_database_cols: pd.Index, new_dataset: pd.DataFrame
) -> pd.DataFrame:
"""
Reorder new dataset based on the original database
Args:
orig_database_cols (pd.Index): A list of column names of the original database
new_dataset (pd.DataFrame): New Data
Returns:
The re-ordered new dataset
"""
# Columns must be in the same order as the original data
new_dataset = new_dataset[orig_database_cols]
return new_dataset
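# Illustrative example (not part of this commit): with orig_database_cols
# pd.Index(["a", "b", "c"]) and a new dataset whose columns are ["b", "c", "a"],
# the returned dataframe has its columns reordered to ["a", "b", "c"].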


def _generate_primary_key(
dataset: pd.DataFrame, primary_key_cols: List[str], primary_key: str
) -> pd.DataFrame:
"""
Generate a primary key column for a dataframe
Args:
dataset (pd.DataFrame): A dataframe
primary_key_cols (list): Column(s) that make up the primary key
primary_key: The column name of the primary key
Returns:
The dataframe with the primary_key column added
"""
# replace NAs with empty string
dataset = dataset.fillna("")
# generate primary key column for original database
dataset[primary_key_cols] = dataset[primary_key_cols].applymap(str)
if dataset.empty:
dataset[primary_key] = ""
else:
dataset[primary_key] = dataset[primary_key_cols].apply(
lambda x: " ".join(x), axis=1
)
return dataset
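# Illustrative example (not part of this commit): with primary_key_cols
# ["test", "foo", "baz"] and a row ("test1", 1.0, "b"), the generated
# UNIQUE_KEY value is "test1 1.0 b"; NAs become empty strings before the
# space-join, so missing values leave extra spaces in the key.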


def check_database_changes(
database: pd.DataFrame,
new_dataset: pd.DataFrame,
primary_key_cols: List[str],
to_delete: bool = False,
) -> Dict[pd.DataFrame, List[str]]:
"""
Check changes that need to be made, i.e. append/update/delete rows to the database
based on its comparison with new data
Args:
database (pd.DataFrame): Original Data
new_dataset (pd.DataFrame): New Data
primary_key_cols (list): Column(s) that make up the primary key
to_delete (bool, optional): Delete rows. Defaults to False
Returns:
A dictionary containing the column order, the rows to append/update, and the rows to delete
"""
# get a list of column names of the original database
orig_database_cols = database.columns
# get the final column order
col_order = _get_col_order(orig_database_cols)
# reorder new_dataset
new_dataset = _reorder_new_dataset(orig_database_cols, new_dataset)
# set the primary_key name
primary_key = "UNIQUE_KEY"
# generate primary_key column for dataset comparison
ori_data = _generate_primary_key(database, primary_key_cols, primary_key)
new_data = _generate_primary_key(new_dataset, primary_key_cols, primary_key)
# output dictionary
changes = {"col_order": col_order, "allupdates": None, "to_delete_rows": None}
# get rows to be appended or updated
allupdates = pd.DataFrame(columns=col_order)
to_append_rows = process_functions._append_rows(new_data, ori_data, primary_key)
to_update_rows = process_functions._update_rows(new_data, ori_data, primary_key)
allupdates = pd.concat([allupdates, to_append_rows, to_update_rows], sort=False)
changes["allupdates"] = allupdates
# get rows to be deleted
if to_delete:
to_delete_rows = process_functions._delete_rows(new_data, ori_data, primary_key)
else:
to_delete_rows = pd.DataFrame()
changes["to_delete_rows"] = to_delete_rows
return changes
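# Illustrative call pattern (not part of this commit); "SAMPLE_ID" and
# "syn1234" are placeholders:
#   changes = check_database_changes(database, new_dataset, ["SAMPLE_ID"], to_delete=True)
#   store_database(syn, "syn1234", changes["col_order"],
#                  changes["allupdates"], changes["to_delete_rows"])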


def store_database(
syn: synapseclient.Synapse,
database_synid: str,
col_order: List[str],
allupdates: pd.DataFrame,
to_delete_rows: pd.DataFrame,
):
"""
Store changes to the database
Args:
syn (synapseclient.Synapse): Synapse object
database_synid (str): Synapse Id of the Synapse table
col_order (List[str]): The ordered column names to be saved
allupdates (pd.DataFrame): rows to be appended and/or updated
to_delete_rows (pd.DataFrame): rows to be deleted
Returns:
None
"""
storedatabase = False
update_all_file = tempfile.NamedTemporaryFile(
dir=process_functions.SCRIPT_DIR, delete=False
)

with open(update_all_file.name, "w") as updatefile:
# Must write out the headers in case there are no appends or updates
updatefile.write(",".join(col_order) + "\n")
204 changes: 201 additions & 3 deletions tests/test_load.py
@@ -1,9 +1,12 @@
from unittest.mock import patch, mock_open
import synapseclient
from synapseclient.core.exceptions import SynapseTimeoutError

from genie import load, __version__, process_functions
import pandas as pd
import tempfile
import os
import pytest


def test_store_file(syn):
@@ -67,3 +70,198 @@ def test_store_table_error(syn):
with patch.object(syn, "store", side_effect=SynapseTimeoutError) as patch_store:
load.store_table(syn, "full/path", "syn1234")
patch_store.assert_called_once()


def test__get_col_order():
database = pd.DataFrame(columns=["a", "b", "c"])
orig_database_cols = database.columns
expected = ["ROW_ID", "ROW_VERSION", "a", "b", "c"]
results = load._get_col_order(orig_database_cols)
assert results == expected


@pytest.mark.parametrize(
"ori_dataset,new_dataset",
[
(
pd.DataFrame(columns=["a", "b", "c"]),
pd.DataFrame(columns=["b", "c", "a"]),
),
],
)
def test__reorder_new_dataset(ori_dataset, new_dataset):
"""Test if new dataset is re-ordered as the column order of original dataset.
No need to check different number of columns since we are using commmon columns as
specified in load.update_table()
"""
orig_database_cols = ori_dataset.columns
reorder_new_dataset = load._reorder_new_dataset(orig_database_cols, new_dataset)
pd.testing.assert_index_equal(reorder_new_dataset.columns, orig_database_cols)


@pytest.mark.parametrize(
"dataset,expected",
[
(
pd.DataFrame(
{
"test": ["test1", "test2", "test3", "test4"],
"foo": [1.0, 2.0, float("nan"), float("nan")],
"baz": ["b", float("nan"), "c", float("nan")],
}
),
pd.DataFrame(
{
"test": ["test1", "test2", "test3", "test4"],
"foo": ["1.0", "2.0", "", ""],
"baz": ["b", "", "c", ""],
"UNIQUE_KEY": ["test1 1.0 b", "test2 2.0 ", "test3 c", "test4 "],
}
),
),
(
pd.DataFrame(columns=["test", "foo", "baz"]),
pd.DataFrame(columns=["test", "foo", "baz", "UNIQUE_KEY"]),
),
],
ids=["non_empty dataframe", "empty dataframe"],
)
def test__generate_primary_key(dataset, expected):
results = load._generate_primary_key(
dataset=dataset,
primary_key_cols=["test", "foo", "baz"],
primary_key="UNIQUE_KEY",
)
pd.testing.assert_frame_equal(results, expected)


@pytest.mark.parametrize(
"to_delete,expected_to_delete_rows",
[
(True, pd.DataFrame({0: ["2", "3"], 1: ["3", "5"]})),
(False, pd.DataFrame()),
],
ids=["to_delted is True", "to_delted is False"],
)
def test_check_database_changes(to_delete, expected_to_delete_rows):
with patch.object(
process_functions,
"_append_rows",
return_value=pd.DataFrame({"test": ["test4"], "foo": [4], "baz": [3.2]}),
) as patch_append_rows, patch.object(
process_functions,
"_update_rows",
return_value=pd.DataFrame(
{
"test": ["test", "test2"],
"foo": [1, 3],
"baz": ["", 5],
"ROW_ID": ["1", "2"],
"ROW_VERSION": ["3", "3"],
}
),
) as patch_update, patch.object(
process_functions,
"_delete_rows",
return_value=pd.DataFrame({0: ["2", "3"], 1: ["3", "5"]}),
) as patch_delete, patch.object(
load, "_generate_primary_key", return_value="table"
), patch.object(
load,
"_get_col_order",
return_value=["ROW_ID", "ROW_VERSION", "test", "foo", "baz"],
) as col_order:
database = pd.DataFrame(columns=["test", "foo", "baz"])
new_dataset = pd.DataFrame(columns=["test", "foo", "baz"])
primary_key_cols = ["test", "foo", "baz"]
primary_key = "UNIQUE_KEY"
allupdates = pd.DataFrame(columns=col_order.return_value)
expected_allupdates = pd.concat(
[allupdates, patch_append_rows.return_value, patch_update.return_value],
sort=False,
)

# check the database changes for the given to_delete flag
results = load.check_database_changes(
database, new_dataset, primary_key_cols, to_delete
)
if to_delete:
patch_delete.assert_called_once_with("table", "table", primary_key)
else:
patch_delete.assert_not_called()
pd.testing.assert_frame_equal(
results["allupdates"].sort_index(axis=1).reset_index(drop=True),
expected_allupdates.sort_index(axis=1).reset_index(drop=True),
)
pd.testing.assert_frame_equal(
results["to_delete_rows"], expected_to_delete_rows
)


@pytest.mark.parametrize(
"allupdates,to_delete_rows,expected_results",
[
(
pd.DataFrame(
{
"test": ["test", "test2"],
"foo": [1, 3],
"baz": ["", 5],
"ROW_ID": ["1", "2"],
"ROW_VERSION": ["3", "3"],
}
),
pd.DataFrame({0: ["3"], 1: ["5"]}),
[
"ROW_ID,ROW_VERSION,test,foo,baz\n",
"1,3,test,1,\n2,3,test2,3,5\n",
"3,5\n",
],
),
(
pd.DataFrame(),
pd.DataFrame({0: ["3"], 1: ["5"]}),
["ROW_ID,ROW_VERSION,test,foo,baz\n", "3,5\n"],
),
(
pd.DataFrame(
{
"test": ["test", "test2"],
"foo": [1, 3],
"baz": ["", 5],
"ROW_ID": ["1", "2"],
"ROW_VERSION": ["3", "3"],
}
),
pd.DataFrame(),
["ROW_ID,ROW_VERSION,test,foo,baz\n", "1,3,test,1,\n2,3,test2,3,5\n"],
),
(pd.DataFrame(), pd.DataFrame(), ["ROW_ID,ROW_VERSION,test,foo,baz\n"]),
],
ids=[
"non_empty dataframes",
"empty allupdates dataframe",
"empty to_delete_rows dataframe",
"empty dataframes",
],
)
def test_store_database(syn, allupdates, to_delete_rows, expected_results):
"""Test _update_table function with both new_dataset and database as non_empty dataframe"""
database_synid = "syn123"
col_order = ["ROW_ID", "ROW_VERSION", "test", "foo", "baz"]
with patch("os.unlink") as mock_unlink, patch(
"tempfile.NamedTemporaryFile"
) as mock_tempfile:
with patch("builtins.open", mock_open()) as mock_file_open:
# set the tempfile name
mock_tempfile.return_value.name = "test.csv"
load.store_database(
syn, database_synid, col_order, allupdates, to_delete_rows
)
mock_file_open.assert_called_once_with("test.csv", "w")
mock_file_handle = mock_file_open()
write_calls = mock_file_handle.write.call_args_list
results = [call_args[0][0] for call_args in write_calls]
assert results == expected_results
mock_unlink.assert_called_once_with("test.csv")
