Skip to content

Commit

Permalink
Use schema paths to configure column keys
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Feb 6, 2025
1 parent 9b01a8c commit 02d2732
Show file tree
Hide file tree
Showing 12 changed files with 179 additions and 55 deletions.
14 changes: 7 additions & 7 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size,
ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size, values,
uncompressed_data_->mutable_data());
auto [page_stats, page_size_stats] = GetPageStatistics();
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path(), descr_->schema_path()));
page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
ResetPageStatistics();

Expand Down Expand Up @@ -1049,7 +1049,7 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size,
compressed_values, combined->mutable_data());

auto [page_stats, page_size_stats] = GetPageStatistics();
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path(), descr_->path()));
page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
ResetPageStatistics();

Expand Down Expand Up @@ -1095,7 +1095,7 @@ int64_t ColumnWriterImpl::Close() {

auto [chunk_statistics, chunk_size_statistics] = GetChunkStatistics();
chunk_statistics.ApplyStatSizeLimits(
properties_->max_statistics_size(descr_->path()));
properties_->max_statistics_size(descr_->path(), descr_->path()));
chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order());

// Write stats only if the column has at least one row written
Expand Down Expand Up @@ -1225,7 +1225,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
// Will be null if not using dictionary, but that's ok
current_dict_encoder_ = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());

if (properties->statistics_enabled(descr_->path()) &&
if (properties->statistics_enabled(descr_->path(), descr_->path()) &&
(SortOrder::UNKNOWN != descr_->sort_order())) {
page_statistics_ = MakeStatistics<DType>(descr_, allocator_);
chunk_statistics_ = MakeStatistics<DType>(descr_, allocator_);
Expand All @@ -1237,7 +1237,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
}
pages_change_on_record_boundaries_ =
properties->data_page_version() == ParquetDataPageVersion::V2 ||
properties->page_index_enabled(descr_->path());
properties->page_index_enabled(descr_->path(), descr_->path());
}

int64_t Close() override { return ColumnWriterImpl::Close(); }
Expand Down Expand Up @@ -2457,9 +2457,9 @@ std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* met
std::unique_ptr<PageWriter> pager,
const WriterProperties* properties) {
const ColumnDescriptor* descr = metadata->descr();
const bool use_dictionary = properties->dictionary_enabled(descr->path()) &&
const bool use_dictionary = properties->dictionary_enabled(descr->path(), descr->schema_path()) &&
descr->physical_type() != Type::BOOLEAN;
Encoding::type encoding = properties->encoding(descr->path());
Encoding::type encoding = properties->encoding(descr->path(), descr->schema_path());
if (encoding == Encoding::UNKNOWN) {
encoding = (descr->physical_type() == Type::BOOLEAN &&
properties->version() != ParquetVersion::PARQUET_1_0 &&
Expand Down
18 changes: 14 additions & 4 deletions cpp/src/parquet/encryption/encryption.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,13 +355,23 @@ FileEncryptionProperties::Builder* FileEncryptionProperties::Builder::footer_key
}

std::shared_ptr<ColumnEncryptionProperties>
FileEncryptionProperties::column_encryption_properties(const std::string& column_path) {
FileEncryptionProperties::column_encryption_properties(
const std::shared_ptr<schema::ColumnPath>& column_path) {
auto path = column_path->ToDotString();
if (encrypted_columns_.size() == 0) {
auto builder = std::make_shared<ColumnEncryptionProperties::Builder>(column_path);
auto builder = std::make_shared<ColumnEncryptionProperties::Builder>(path);
return builder->build();
}
if (encrypted_columns_.find(column_path) != encrypted_columns_.end()) {
return encrypted_columns_[column_path];
if (encrypted_columns_.find(path) != encrypted_columns_.end()) {
return encrypted_columns_[path];
}
auto schema_path_cursor = column_path;
while (schema_path_cursor != nullptr) {
path = schema_path_cursor->ToDotString();
if (encrypted_columns_.find(path) != encrypted_columns_.end()) {
return encrypted_columns_[path];
}
schema_path_cursor = schema_path_cursor->parent();
}

return nullptr;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/encryption/encryption.h
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ class PARQUET_EXPORT FileEncryptionProperties {
std::string file_aad() const { return file_aad_; }

std::shared_ptr<ColumnEncryptionProperties> column_encryption_properties(
const std::string& column_path);
const std::shared_ptr<schema::ColumnPath>& column_path);

bool is_utilized() const { return utilized_; }

Expand Down
30 changes: 19 additions & 11 deletions cpp/src/parquet/encryption/internal_file_encryptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,29 +94,37 @@ std::shared_ptr<Encryptor> InternalFileEncryptor::GetFooterSigningEncryptor() {
}

std::shared_ptr<Encryptor> InternalFileEncryptor::GetColumnMetaEncryptor(
const std::string& column_path) {
return GetColumnEncryptor(column_path, true);
const std::shared_ptr<schema::ColumnPath>& column_path,
const std::shared_ptr<schema::ColumnPath>& schema_path) {
return GetColumnEncryptor(column_path, schema_path, true);
}

std::shared_ptr<Encryptor> InternalFileEncryptor::GetColumnDataEncryptor(
const std::string& column_path) {
return GetColumnEncryptor(column_path, false);
const std::shared_ptr<schema::ColumnPath>& column_path,
const std::shared_ptr<schema::ColumnPath>& schema_path) {
return GetColumnEncryptor(column_path, schema_path, false);
}

std::shared_ptr<Encryptor>
InternalFileEncryptor::InternalFileEncryptor::GetColumnEncryptor(
const std::string& column_path, bool metadata) {
const std::shared_ptr<schema::ColumnPath>& column_path,
const std::shared_ptr<schema::ColumnPath>& schema_path,
bool metadata) {
// first look if we already got the encryptor from before
auto path = column_path->ToDotString();
if (metadata) {
if (column_metadata_map_.find(column_path) != column_metadata_map_.end()) {
return column_metadata_map_.at(column_path);
if (column_metadata_map_.find(path) != column_metadata_map_.end()) {
return column_metadata_map_.at(path);
}
} else {
if (column_data_map_.find(column_path) != column_data_map_.end()) {
return column_data_map_.at(column_path);
if (column_data_map_.find(path) != column_data_map_.end()) {
return column_data_map_.at(path);
}
}
auto column_prop = properties_->column_encryption_properties(column_path);
if (column_prop == nullptr) {
column_prop = properties_->column_encryption_properties(schema_path);
}
if (column_prop == nullptr) {
return nullptr;
}
Expand All @@ -136,9 +144,9 @@ InternalFileEncryptor::InternalFileEncryptor::GetColumnEncryptor(
std::shared_ptr<Encryptor> encryptor =
std::make_shared<Encryptor>(aes_encryptor, key, file_aad, "", pool_);
if (metadata)
column_metadata_map_[column_path] = encryptor;
column_metadata_map_[path] = encryptor;
else
column_data_map_[column_path] = encryptor;
column_data_map_[path] = encryptor;

return encryptor;
}
Expand Down
14 changes: 10 additions & 4 deletions cpp/src/parquet/encryption/internal_file_encryptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,12 @@ class InternalFileEncryptor {

std::shared_ptr<Encryptor> GetFooterEncryptor();
std::shared_ptr<Encryptor> GetFooterSigningEncryptor();
std::shared_ptr<Encryptor> GetColumnMetaEncryptor(const std::string& column_path);
std::shared_ptr<Encryptor> GetColumnDataEncryptor(const std::string& column_path);
std::shared_ptr<Encryptor> GetColumnMetaEncryptor(
const std::shared_ptr<schema::ColumnPath>& column_path,
const std::shared_ptr<schema::ColumnPath>& schema_path);
std::shared_ptr<Encryptor> GetColumnDataEncryptor(
const std::shared_ptr<schema::ColumnPath>& column_path,
const std::shared_ptr<schema::ColumnPath>& schema_path);
void WipeOutEncryptionKeys();

private:
Expand All @@ -95,8 +99,10 @@ class InternalFileEncryptor {

::arrow::MemoryPool* pool_;

std::shared_ptr<Encryptor> GetColumnEncryptor(const std::string& column_path,
bool metadata);
std::shared_ptr<Encryptor> GetColumnEncryptor(
const std::shared_ptr<schema::ColumnPath>& column_path,
const std::shared_ptr<schema::ColumnPath>& schema_path,
bool metadata);

encryption::AesEncryptor* GetMetaAesEncryptor(ParquetCipher::type algorithm,
size_t key_len);
Expand Down
9 changes: 5 additions & 4 deletions cpp/src/parquet/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,13 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
std::shared_ptr<ColumnWriter> CreateColumnWriterForColumn(
ColumnChunkMetaDataBuilder* col_meta, int32_t column_ordinal) const {
const auto& path = col_meta->descr()->path();
const ColumnProperties& column_properties = properties_->column_properties(path);
const auto& schema_path = col_meta->descr()->schema_path();
const ColumnProperties& column_properties = properties_->column_properties(path, schema_path);
auto meta_encryptor =
file_encryptor_ ? file_encryptor_->GetColumnMetaEncryptor(path->ToDotString())
file_encryptor_ ? file_encryptor_->GetColumnMetaEncryptor(path, schema_path)
: nullptr;
auto data_encryptor =
file_encryptor_ ? file_encryptor_->GetColumnDataEncryptor(path->ToDotString())
file_encryptor_ ? file_encryptor_->GetColumnDataEncryptor(path, schema_path)
: nullptr;
auto ci_builder = page_index_builder_ && column_properties.page_index_enabled()
? page_index_builder_->GetColumnIndexBuilder(column_ordinal)
Expand Down Expand Up @@ -497,7 +498,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
if (it == column_path_vec.end()) {
std::stringstream ss;
ss << "Encrypted column " + elem.first + " not in file schema";
throw ParquetException(ss.str());
// throw ParquetException(ss.str());
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1631,7 +1631,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
}

const auto& encrypt_md =
properties_->column_encryption_properties(column_->path()->ToDotString());
properties_->column_encryption_properties(column_->path(), column_->schema_path());
// column is encrypted
if (encrypt_md != nullptr && encrypt_md->is_encrypted()) {
column_chunk_->__isset.crypto_metadata = true;
Expand Down Expand Up @@ -1707,7 +1707,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
column_chunk_->meta_data.__set_type(ToThrift(column_->physical_type()));
column_chunk_->meta_data.__set_path_in_schema(column_->path()->ToDotVector());
column_chunk_->meta_data.__set_codec(
ToThrift(properties_->compression(column_->path())));
ToThrift(properties_->compression(column_->path(), column_->schema_path())));
}

format::ColumnChunk* column_chunk_;
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/parquet/page_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -818,8 +818,9 @@ class PageIndexBuilderImpl final : public PageIndexBuilder {
int8_t module_type) const {
std::shared_ptr<Encryptor> encryptor;
if (file_encryptor_ != nullptr) {
const auto column_path = schema_->Column(column_ordinal)->path()->ToDotString();
encryptor = file_encryptor_->GetColumnMetaEncryptor(column_path);
const auto column_path = schema_->Column(column_ordinal)->path();
const auto schema_path = schema_->Column(column_ordinal)->schema_path();
encryptor = file_encryptor_->GetColumnMetaEncryptor(column_path, schema_path);
if (encryptor != nullptr) {
encryptor->UpdateAad(encryption::CreateModuleAad(
encryptor->file_aad(), module_type, row_group_ordinal, column_ordinal,
Expand Down
72 changes: 52 additions & 20 deletions cpp/src/parquet/properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -775,45 +775,70 @@ class PARQUET_EXPORT WriterProperties {
}

const ColumnProperties& column_properties(
const std::shared_ptr<schema::ColumnPath>& path) const {
const std::shared_ptr<schema::ColumnPath>& path,
const std::shared_ptr<schema::ColumnPath>& schema_path) const {
// check the (parquet) path can be found
auto it = column_properties_.find(path->ToDotString());
if (it != column_properties_.end()) return it->second;
// check the schema path or any parent can be found
std::shared_ptr<schema::ColumnPath> a_schema_path = std::make_shared<schema::ColumnPath>(schema_path->ToDotVector());
while (a_schema_path != nullptr) {
auto it = column_properties_.find(a_schema_path->ToDotString());
if (it != column_properties_.end()) return it->second;
a_schema_path = a_schema_path->parent();
}
// noting found, use the default column properties
return default_column_properties_;
}

Encoding::type encoding(const std::shared_ptr<schema::ColumnPath>& path) const {
return column_properties(path).encoding();
Encoding::type encoding(
const std::shared_ptr<schema::ColumnPath>& path,
const std::shared_ptr<schema::ColumnPath>& schema_path) const {
return column_properties(path, schema_path).encoding();
}

Compression::type compression(const std::shared_ptr<schema::ColumnPath>& path) const {
return column_properties(path).compression();
Compression::type compression(
const std::shared_ptr<schema::ColumnPath>& path,
const std::shared_ptr<schema::ColumnPath>& schema_path) const {
return column_properties(path, schema_path).compression();
}

int compression_level(const std::shared_ptr<schema::ColumnPath>& path) const {
return column_properties(path).compression_level();
int compression_level(
const std::shared_ptr<schema::ColumnPath>& path,
const std::shared_ptr<schema::ColumnPath>& schema_path) const {
return column_properties(path, schema_path).compression_level();
}

const std::shared_ptr<CodecOptions> codec_options(
const std::shared_ptr<schema::ColumnPath>& path) const {
return column_properties(path).codec_options();
const std::shared_ptr<schema::ColumnPath>& path,
const std::shared_ptr<schema::ColumnPath>& schema_path) const {
return column_properties(path, schema_path).codec_options();
}

bool dictionary_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
return column_properties(path).dictionary_enabled();
bool dictionary_enabled(
const std::shared_ptr<schema::ColumnPath>& path,
const std::shared_ptr<schema::ColumnPath>& schema_path) const {
return column_properties(path, schema_path).dictionary_enabled();
}

const std::vector<SortingColumn>& sorting_columns() const { return sorting_columns_; }

bool statistics_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
return column_properties(path).statistics_enabled();
bool statistics_enabled(
const std::shared_ptr<schema::ColumnPath>& path,
const std::shared_ptr<schema::ColumnPath>& schema_path) const {
return column_properties(path, schema_path).statistics_enabled();
}

size_t max_statistics_size(const std::shared_ptr<schema::ColumnPath>& path) const {
return column_properties(path).max_statistics_size();
size_t max_statistics_size(
const std::shared_ptr<schema::ColumnPath>& path,
const std::shared_ptr<schema::ColumnPath>& schema_path) const {
return column_properties(path, schema_path).max_statistics_size();
}

bool page_index_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
return column_properties(path).page_index_enabled();
bool page_index_enabled(
const std::shared_ptr<schema::ColumnPath>& path,
const std::shared_ptr<schema::ColumnPath>& schema_path) const {
return column_properties(path, schema_path).page_index_enabled();
}

bool page_index_enabled() const {
Expand All @@ -832,10 +857,17 @@ class PARQUET_EXPORT WriterProperties {
return file_encryption_properties_.get();
}

std::shared_ptr<ColumnEncryptionProperties> column_encryption_properties(
const std::string& path) const {
std::shared_ptr<ColumnEncryptionProperties> column_encryption_properties(
const std::shared_ptr<schema::ColumnPath>& column_path,
const std::shared_ptr<schema::ColumnPath>& schema_path
) const {
if (file_encryption_properties_) {
return file_encryption_properties_->column_encryption_properties(path);
auto column_props =
file_encryption_properties_->column_encryption_properties(column_path);
if (column_props == nullptr) {
column_props = file_encryption_properties_->column_encryption_properties(schema_path);
}
return column_props;
} else {
return NULLPTR;
}
Expand Down
Loading

0 comments on commit 02d2732

Please sign in to comment.