[GEN-997] update method to concatenate columns for dataframe #555

Merged 6 commits on Apr 17, 2024
153 changes: 125 additions & 28 deletions genie/load.py
@@ -172,50 +172,147 @@ def _update_table(
to_delete: bool = False,
):
"""
A helper function to compare the new dataset with the existing data
and store any changes that need to be made to the database
"""
changes = check_database_changes(database, new_dataset, primary_key_cols, to_delete)
store_database(
syn,
database_synid,
changes["col_order"],
changes["allupdates"],
changes["to_delete_rows"],
)


def _get_col_order(orig_database_cols: pd.Index) -> List[str]:
"""
Get the final column order

Args:
orig_database_cols (pd.Index): A list of column names of the original database

Returns:
The list of re-ordered column names
"""
col_order = ["ROW_ID", "ROW_VERSION"]
col_order.extend(orig_database_cols.tolist())
return col_order

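A quick illustration of the helper above; the column names here are made up for the example, not taken from this PR:

import pandas as pd
from genie import load

# hypothetical table columns
cols = pd.Index(["CENTER", "SAMPLE_ID", "AGE"])
print(load._get_col_order(cols))
# ['ROW_ID', 'ROW_VERSION', 'CENTER', 'SAMPLE_ID', 'AGE']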

def _reorder_new_dataset(
orig_database_cols: pd.Index, new_dataset: pd.DataFrame
) -> pd.DataFrame:
"""
Reorder new dataset based on the original database

Args:
orig_database_cols (pd.Index): A list of column names of the original database
new_dataset (pd.DataFrame): New Data

Returns:
The re-ordered new dataset
"""
# Columns must be in the same order as the original data
new_dataset = new_dataset[orig_database_cols]
return new_dataset

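And a small sketch of the reordering helper, again with hypothetical columns and values:

import pandas as pd
from genie import load

orig_cols = pd.Index(["a", "b", "c"])
new_dataset = pd.DataFrame({"b": [2], "c": [3], "a": [1]})
print(load._reorder_new_dataset(orig_cols, new_dataset).columns.tolist())
# ['a', 'b', 'c']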

def _generate_primary_key(
dataset: pd.DataFrame, primary_key_cols: List[str], primary_key: str
) -> pd.DataFrame:
"""
Generate the primary key column for a dataframe

Args:
dataset (pd.DataFrame): A dataframe
primary_key_cols (list): Column(s) that make up the primary key
primary_key (str): The column name of the primary key

Returns:
The dataframe with the primary_key column added
"""
# replace NAs with empty string
dataset = dataset.fillna("")
# generate primary key column for original database
dataset[primary_key_cols] = dataset[primary_key_cols].applymap(str)
if dataset.empty:
dataset[primary_key] = ""
else:
dataset[primary_key] = dataset[primary_key_cols].apply(
lambda x: " ".join(x), axis=1
)
return dataset

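A hedged example of the key concatenation above, with made-up column names and values; note that missing values become empty strings before joining, so the space-joined key can carry stray or doubled spaces:

import pandas as pd
from genie import load

dataset = pd.DataFrame({"site": ["s1", "s2"], "sample": ["p1", float("nan")]})
result = load._generate_primary_key(dataset, ["site", "sample"], "UNIQUE_KEY")
print(result["UNIQUE_KEY"].tolist())
# ['s1 p1', 's2 ']  (the NaN was replaced with "" before joining)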

def check_database_changes(
database: pd.DataFrame,
new_dataset: pd.DataFrame,
primary_key_cols: List[str],
to_delete: bool = False,
) -> Dict[pd.DataFrame, List[str]]:
"""
Check the changes that need to be made, i.e. the rows to append, update,
or delete in the database, based on a comparison with the new data

Args:
database (pd.DataFrame): Original Data
new_dataset (pd.DataFrame): New Data
primary_key_cols (list): Column(s) that make up the primary key
to_delete (bool, optional): Delete rows. Defaults to False

Returns:
A dictionary with the final column order, the rows to append/update,
and the rows to delete
"""
# get a list of column names of the original database
orig_database_cols = database.columns
# get the final column order
col_order = _get_col_order(orig_database_cols)
# reorder new_dataset
new_dataset = _reorder_new_dataset(orig_database_cols, new_dataset)
# set the primary_key name
primary_key = "UNIQUE_KEY"
# generate primary_key column for dataset comparison
ori_data = _generate_primary_key(database, primary_key_cols, primary_key)
new_data = _generate_primary_key(new_dataset, primary_key_cols, primary_key)
# output dictionary
changes = {"col_order": col_order, "allupdates": None, "to_delete_rows": None}
# get rows to be appended or updated
allupdates = pd.DataFrame(columns=col_order)
to_append_rows = process_functions._append_rows(new_data, ori_data, primary_key)
to_update_rows = process_functions._update_rows(new_data, ori_data, primary_key)
allupdates = pd.concat([allupdates, to_append_rows, to_update_rows], sort=False)
changes["allupdates"] = allupdates
# get rows to be deleted
if to_delete:
to_delete_rows = process_functions._delete_rows(new_data, ori_data, primary_key)
else:
to_delete_rows = pd.DataFrame()
changes["to_delete_rows"] = to_delete_rows
return changes

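For orientation, a minimal sketch of the contract this function returns; the dataframes and the stubbed helpers below are illustrative only, since the real row comparisons go through process_functions against data pulled from Synapse:

from unittest.mock import patch

import pandas as pd

from genie import load, process_functions

# Hypothetical frames; in production these come from the Synapse table and the new file.
database = pd.DataFrame({"id": ["a", "b"], "value": ["1", "2"]})
new_dataset = pd.DataFrame({"id": ["a", "c"], "value": ["1", "3"]})

# Stub the row-comparison helpers so the sketch runs without a Synapse table.
with patch.object(process_functions, "_append_rows", return_value=pd.DataFrame()), patch.object(
    process_functions, "_update_rows", return_value=pd.DataFrame()
):
    changes = load.check_database_changes(database, new_dataset, ["id"])

print(changes["col_order"])       # ['ROW_ID', 'ROW_VERSION', 'id', 'value']
print(changes["allupdates"])      # rows to append/update (empty here because of the stubs)
print(changes["to_delete_rows"])  # empty frame since to_delete defaults to False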

def store_database(
syn: synapseclient.Synapse,
database_synid: str,
col_order: List[str],
allupdates: pd.DataFrame,
to_delete_rows: pd.DataFrame,
):
"""
Store changes to the database

Args:
syn (synapseclient.Synapse): Synapse object
database_synid (str): Synapse Id of the Synapse table
col_order (List[str]): The ordered column names to be saved
allupdates (pd.DataFrame): rows to be appended and/or updated
to_delete_rows (pd.DataFrame): rows to be deleted

Returns:
None
"""
storedatabase = False
update_all_file = tempfile.NamedTemporaryFile(
dir=process_functions.SCRIPT_DIR, delete=False
)

with open(update_all_file.name, "w") as updatefile:
# Must write out the headers in case there are no appends or updates
updatefile.write(",".join(col_order) + "\n")
204 changes: 201 additions & 3 deletions tests/test_load.py
@@ -1,9 +1,12 @@
from unittest.mock import patch, mock_open

import synapseclient
from synapseclient.core.exceptions import SynapseTimeoutError

from genie import load, __version__, process_functions
import pandas as pd
import tempfile
import os
import pytest


def test_store_file(syn):
@@ -67,3 +70,198 @@ def test_store_table_error(syn):
with patch.object(syn, "store", side_effect=SynapseTimeoutError) as patch_store:
load.store_table(syn, "full/path", "syn1234")
patch_store.assert_called_once()


def test__get_col_order():
database = pd.DataFrame(columns=["a", "b", "c"])
orig_database_cols = database.columns
expected = ["ROW_ID", "ROW_VERSION", "a", "b", "c"]
results = load._get_col_order(orig_database_cols)
assert results == expected


@pytest.mark.parametrize(
"ori_dataset,new_dataset",
[
(
pd.DataFrame(columns=["a", "b", "c"]),
pd.DataFrame(columns=["b", "c", "a"]),
),
],
)
def test__reorder_new_dataset(ori_dataset, new_dataset):
"""Test if new dataset is re-ordered as the column order of original dataset.
No need to check different number of columns since we are using commmon columns as
specified in load.update_table()

"""
orig_database_cols = ori_dataset.columns
reorder_new_dataset = load._reorder_new_dataset(orig_database_cols, new_dataset)
pd.testing.assert_index_equal(reorder_new_dataset.columns, orig_database_cols)


@pytest.mark.parametrize(
"dataset,expected",
[
(
pd.DataFrame(
{
"test": ["test1", "test2", "test3", "test4"],
"foo": [1.0, 2.0, float("nan"), float("nan")],
"baz": ["b", float("nan"), "c", float("nan")],
}
),
pd.DataFrame(
{
"test": ["test1", "test2", "test3", "test4"],
"foo": ["1.0", "2.0", "", ""],
"baz": ["b", "", "c", ""],
"UNIQUE_KEY": ["test1 1.0 b", "test2 2.0 ", "test3 c", "test4 "],
}
),
),
(
pd.DataFrame(columns=["test", "foo", "baz"]),
pd.DataFrame(columns=["test", "foo", "baz", "UNIQUE_KEY"]),
),
],
ids=["non_empty dataframe", "empty dataframe"],
)
def test__generate_primary_key(dataset, expected):
results = load._generate_primary_key(
dataset=dataset,
primary_key_cols=["test", "foo", "baz"],
primary_key="UNIQUE_KEY",
)
pd.testing.assert_frame_equal(results, expected)


@pytest.mark.parametrize(
"to_delete,expected_to_delete_rows",
[
(True, pd.DataFrame({0: ["2", "3"], 1: ["3", "5"]})),
(False, pd.DataFrame()),
],
ids=["to_delted is True", "to_delted is False"],
)
def test_check_database_changes(to_delete, expected_to_delete_rows):
with patch.object(
process_functions,
"_append_rows",
return_value=pd.DataFrame({"test": ["test4"], "foo": [4], "baz": [3.2]}),
) as patch_append_rows, patch.object(
process_functions,
"_update_rows",
return_value=pd.DataFrame(
{
"test": ["test", "test2"],
"foo": [1, 3],
"baz": ["", 5],
"ROW_ID": ["1", "2"],
"ROW_VERSION": ["3", "3"],
}
),
) as patch_update, patch.object(
process_functions,
"_delete_rows",
return_value=pd.DataFrame({0: ["2", "3"], 1: ["3", "5"]}),
) as patch_delete, patch.object(
load, "_generate_primary_key", return_value="table"
), patch.object(
load,
"_get_col_order",
return_value=["ROW_ID", "ROW_VERSION", "test", "foo", "baz"],
) as col_order:
database = pd.DataFrame(columns=["test", "foo", "baz"])
new_dataset = pd.DataFrame(columns=["test", "foo", "baz"])
primary_key_cols = ["test", "foo", "baz"]
primary_key = "UNIQUE_KEY"
allupdates = pd.DataFrame(columns=col_order.return_value)
expected_allupdates = pd.concat(
[allupdates, patch_append_rows.return_value, patch_update.return_value],
sort=False,
)

# check the database changes returned for the given to_delete flag
results = load.check_database_changes(
database, new_dataset, primary_key_cols, to_delete
)
if to_delete:
patch_delete.assert_called_once_with("table", "table", primary_key)
else:
patch_delete.assert_not_called()
pd.testing.assert_frame_equal(
results["allupdates"].sort_index(axis=1).reset_index(drop=True),
expected_allupdates.sort_index(axis=1).reset_index(drop=True),
)
pd.testing.assert_frame_equal(
results["to_delete_rows"], expected_to_delete_rows
)


@pytest.mark.parametrize(
"allupdates,to_delete_rows,expected_results",
[
(
pd.DataFrame(
{
"test": ["test", "test2"],
"foo": [1, 3],
"baz": ["", 5],
"ROW_ID": ["1", "2"],
"ROW_VERSION": ["3", "3"],
}
),
pd.DataFrame({0: ["3"], 1: ["5"]}),
[
"ROW_ID,ROW_VERSION,test,foo,baz\n",
"1,3,test,1,\n2,3,test2,3,5\n",
"3,5\n",
],
),
(
pd.DataFrame(),
pd.DataFrame({0: ["3"], 1: ["5"]}),
["ROW_ID,ROW_VERSION,test,foo,baz\n", "3,5\n"],
),
(
pd.DataFrame(
{
"test": ["test", "test2"],
"foo": [1, 3],
"baz": ["", 5],
"ROW_ID": ["1", "2"],
"ROW_VERSION": ["3", "3"],
}
),
pd.DataFrame(),
["ROW_ID,ROW_VERSION,test,foo,baz\n", "1,3,test,1,\n2,3,test2,3,5\n"],
),
(pd.DataFrame(), pd.DataFrame(), ["ROW_ID,ROW_VERSION,test,foo,baz\n"]),
],
ids=[
"non_empty dataframes",
"empty allupdates dataframe",
"empty to_delete_rows dataframe",
"empty dataframes",
],
)
def test_store_database(syn, allupdates, to_delete_rows, expected_results):
"""Test _update_table function with both new_dataset and database as non_empty dataframe"""
database_synid = "syn123"
col_order = ["ROW_ID", "ROW_VERSION", "test", "foo", "baz"]
with patch("os.unlink") as mock_unlink, patch(
"tempfile.NamedTemporaryFile"
) as mock_tempfile:
with patch("builtins.open", mock_open()) as mock_file_open:
# set the tempfile name
mock_tempfile.return_value.name = "test.csv"
load.store_database(
syn, database_synid, col_order, allupdates, to_delete_rows
)
mock_file_open.assert_called_once_with("test.csv", "w")
mock_file_handle = mock_file_open()
write_calls = mock_file_handle.write.call_args_list
results = [call_args[0][0] for call_args in write_calls]
assert results == expected_results
mock_unlink.assert_called_once_with("test.csv")