Fix the metadata reading (Couldn't deserialize thrift) (#47)
marcin-krystianc authored Apr 12, 2024
1 parent 1cb376f · commit f97e431
Showing 7 changed files with 67 additions and 16 deletions.
Binary file added: my.parquet.index (binary file not shown)
35 changes: 22 additions & 13 deletions python/palletjack/palletjack.cc
@@ -303,6 +303,12 @@ std::vector<char> GenerateMetadataIndex(const char *parquet_path)
         throw std::logic_error(msg);
     }

+    // column_orders is optional
+    if (metadata.column_orders_offsets.size() == 0)
+    {
+        metadata.column_orders_offsets.resize(data_header.get_column_orders_offsets_size());
+    }
+
     if (data_header.get_column_orders_offsets_size() != metadata.column_orders_offsets.size())
     {
         auto msg = std::string("Column orders offsets information is invalid, columns=") + std::to_string(data_header.columns) + ", column_orders_offsets=" + std::to_string(metadata.column_orders_offsets.size()) + " !";
@@ -588,26 +594,29 @@ std::shared_ptr<parquet::FileMetaData> ReadMetadata(const DataHeader &dataHeader
         }
     }

-    index_src = row_groups_offsets[dataHeader.get_row_groups_offsets_size()];
+    index_src = row_groups_offsets[1 + dataHeader.row_groups];

     if (columns.size() > 0)
     {
         //> 7: optional list<ColumnOrder> column_orders;
-        auto column_orders_list = &column_orders_offsets[0];
-        toCopy = column_orders_list[0] - index_src;
-        thriftCopier.CopyFrom(index_src, toCopy);
-        index_src += toCopy;
+        if (column_orders_offsets[0] != 0)
+        {
+            auto column_orders_list = &column_orders_offsets[0];
+            toCopy = column_orders_list[0] - index_src;
+            thriftCopier.CopyFrom(index_src, toCopy);
+            index_src += toCopy;

-        thriftCopier.WriteListBegin(::apache::thrift::protocol::T_STRUCT, columns.size()); // one extra element for root
-        index_src = column_orders_list[1];
+            thriftCopier.WriteListBegin(::apache::thrift::protocol::T_STRUCT, columns.size()); // one extra element for root
+            index_src = column_orders_list[1];

-        auto column_orders = &column_orders_offsets[1];
-        for (auto column : columns)
-        {
-            toCopy = column_orders[column + 1] - column_orders[column];
-            thriftCopier.CopyFrom(column_orders[column], toCopy);
+            auto column_orders = &column_orders_offsets[1];
+            for (auto column : columns)
+            {
+                toCopy = column_orders[column + 1] - column_orders[column];
+                thriftCopier.CopyFrom(column_orders[column], toCopy);
+            }
+            index_src = column_orders[dataHeader.columns];
         }
-        index_src = column_orders[dataHeader.columns];
     }

     // Copy leftovers
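For context on the two hunks above: `column_orders` is an optional field (field 7) of the thrift FileMetaData struct, so footers produced by some non-pyarrow writers omit it, and palletjack previously failed on such files with "Couldn't deserialize thrift". The resize() in GenerateMetadataIndex leaves the missing offsets zero-filled, and ReadMetadata now skips the section when the first offset is zero. A minimal sketch of exercising the fixed path through the Python API used in the tests below; the file path is a placeholder for any file written without `column_orders`:

    import palletjack as pj

    # Placeholder path: any Parquet file whose footer omits the
    # optional column_orders list, e.g. from a non-pyarrow writer.
    path = 'data/no_column_orders.parquet'

    # Before this fix these calls raised "Couldn't deserialize thrift";
    # now the missing offsets stay zero-filled and the section is
    # skipped on read. Assumes the file has at least one row group.
    index_data = pj.generate_metadata_index(path)
    metadata = pj.read_metadata(index_data=index_data, row_groups=[0], column_indices=[0])
    print(metadata.num_columns)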
2 changes: 1 addition & 1 deletion python/pyproject.toml
@@ -10,7 +10,7 @@ build-backend = "setuptools.build_meta"

 [project]
 name = "palletjack"
-version = "2.1.1"
+version = "2.1.2"
 authors = [
   { name="Marcin Krystianc", email="[email protected]" },
 ]
File renamed without changes: python/test/data/sample.parquet → python/test/data/golden_master.parquet
File renamed without changes: python/test/data/sample.parquet.index → python/test/data/golden_master.parquet.index
Binary file added: python/test/data/no_column_orders.parquet (binary file not shown)
46 changes: 44 additions & 2 deletions python/test/test_palletjack.py
@@ -73,6 +73,48 @@ def validate_reading(parquet_path, index_path, row_groups, column_indices):
             for cp in it.permutations(all_columns, c):
                 validate_reading(path, index_path, row_groups = rp, column_indices = cp)

+    def test_metadata_roundtrip(self):
+        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdirname:
+            path = os.path.join(tmpdirname, "my.parquet")
+            table = get_table()
+
+            pq.write_table(table, path, row_group_size=chunk_size)
+
+            index_path = path + '.index'
+            pj.generate_metadata_index(path, index_path)
+
+            pr = pq.ParquetReader()
+            pr.open(path)
+
+            row_groups_columns = [
+                ([], []),
+                ([], range(n_columns)),
+                (range(n_row_groups), []),
+                (range(n_row_groups), range(n_columns)),
+            ]
+
+            for (row_groups, columns) in row_groups_columns:
+                pj_metadata = pj.read_metadata(index_path, row_groups=row_groups, column_indices=columns)
+                self.assertEqual(pr.metadata, pj_metadata)
+
+    def test_reading_non_pyarrow_files(self):
+
+        path = os.path.join(current_dir, 'data/no_column_orders.parquet')
+        pr = pq.ParquetReader()
+        pr.open(path)
+
+        row_groups_columns = [
+            ([], []),
+            ([], range(pr.metadata.num_columns)),
+            (range(pr.metadata.num_row_groups), []),
+            (range(pr.metadata.num_row_groups), range(pr.metadata.num_columns)),
+        ]
+
+        index_data = pj.generate_metadata_index(path)
+        for (row_groups, columns) in row_groups_columns:
+            pj_metadata = pj.read_metadata(index_data = index_data, row_groups=row_groups, column_indices=columns)
+            self.assertEqual(pr.metadata, pj_metadata)
+
     def test_reading_invalid_row_group(self):
         with tempfile.TemporaryDirectory() as tmpdirname:
             path = os.path.join(tmpdirname, "my.parquet")
@@ -131,8 +173,8 @@ def test_reading_missing_index_file(self):
     def test_index_file_golden_master(self):
         with tempfile.TemporaryDirectory() as tmpdirname:
             index_path = os.path.join(tmpdirname, 'my.parquet.index')
-            path = os.path.join(current_dir, 'data/sample.parquet')
-            expected_index_path = os.path.join(current_dir, 'data/sample.parquet.index')
+            path = os.path.join(current_dir, 'data/golden_master.parquet')
+            expected_index_path = os.path.join(current_dir, 'data/golden_master.parquet.index')
             pj.generate_metadata_index(path, index_path)

             # Read the expected output
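As a usage sketch of the round trip these tests pin down, assuming `palletjack` and `pyarrow` are installed; the file name is a placeholder for an existing Parquet file:

    import pyarrow.parquet as pq
    import palletjack as pj

    path = 'my.parquet'              # placeholder input file
    index_path = path + '.index'

    # Write the metadata index next to the file, then read the metadata
    # back through the index and compare with what pyarrow parses directly.
    pj.generate_metadata_index(path, index_path)

    pr = pq.ParquetReader()
    pr.open(path)

    pj_metadata = pj.read_metadata(index_path,
                                   row_groups=range(pr.metadata.num_row_groups),
                                   column_indices=range(pr.metadata.num_columns))
    assert pj_metadata == pr.metadata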
