diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 4998e6f301a00..2b45f9f5ee620 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -990,7 +990,7 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size, ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size, values, uncompressed_data_->mutable_data()); auto [page_stats, page_size_stats] = GetPageStatistics(); - page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path())); + page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path(), descr_->schema_path())); page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order()); ResetPageStatistics(); @@ -1049,7 +1049,7 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size, compressed_values, combined->mutable_data()); auto [page_stats, page_size_stats] = GetPageStatistics(); - page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path())); + page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path(), descr_->path())); page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order()); ResetPageStatistics(); @@ -1095,7 +1095,7 @@ int64_t ColumnWriterImpl::Close() { auto [chunk_statistics, chunk_size_statistics] = GetChunkStatistics(); chunk_statistics.ApplyStatSizeLimits( - properties_->max_statistics_size(descr_->path())); + properties_->max_statistics_size(descr_->path(), descr_->path())); chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order()); // Write stats only if the column has at least one row written @@ -1225,7 +1225,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< // Will be null if not using dictionary, but that's ok current_dict_encoder_ = dynamic_cast*>(current_encoder_.get()); - if (properties->statistics_enabled(descr_->path()) && + if (properties->statistics_enabled(descr_->path(), descr_->path()) && (SortOrder::UNKNOWN != descr_->sort_order())) { page_statistics_ = MakeStatistics(descr_, allocator_); chunk_statistics_ = MakeStatistics(descr_, allocator_); @@ -1237,7 +1237,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< } pages_change_on_record_boundaries_ = properties->data_page_version() == ParquetDataPageVersion::V2 || - properties->page_index_enabled(descr_->path()); + properties->page_index_enabled(descr_->path(), descr_->path()); } int64_t Close() override { return ColumnWriterImpl::Close(); } @@ -2457,9 +2457,9 @@ std::shared_ptr ColumnWriter::Make(ColumnChunkMetaDataBuilder* met std::unique_ptr pager, const WriterProperties* properties) { const ColumnDescriptor* descr = metadata->descr(); - const bool use_dictionary = properties->dictionary_enabled(descr->path()) && + const bool use_dictionary = properties->dictionary_enabled(descr->path(), descr->schema_path()) && descr->physical_type() != Type::BOOLEAN; - Encoding::type encoding = properties->encoding(descr->path()); + Encoding::type encoding = properties->encoding(descr->path(), descr->schema_path()); if (encoding == Encoding::UNKNOWN) { encoding = (descr->physical_type() == Type::BOOLEAN && properties->version() != ParquetVersion::PARQUET_1_0 && diff --git a/cpp/src/parquet/encryption/encryption.cc b/cpp/src/parquet/encryption/encryption.cc index 731120d9a6396..b877325d7a0da 100644 --- a/cpp/src/parquet/encryption/encryption.cc +++ b/cpp/src/parquet/encryption/encryption.cc @@ -355,13 +355,23 @@ FileEncryptionProperties::Builder* FileEncryptionProperties::Builder::footer_key } std::shared_ptr -FileEncryptionProperties::column_encryption_properties(const std::string& column_path) { +FileEncryptionProperties::column_encryption_properties( + const std::shared_ptr& column_path) { + auto path = column_path->ToDotString(); if (encrypted_columns_.size() == 0) { - auto builder = std::make_shared(column_path); + auto builder = std::make_shared(path); return builder->build(); } - if (encrypted_columns_.find(column_path) != encrypted_columns_.end()) { - return encrypted_columns_[column_path]; + if (encrypted_columns_.find(path) != encrypted_columns_.end()) { + return encrypted_columns_[path]; + } + auto schema_path_cursor = column_path; + while (schema_path_cursor != nullptr) { + path = schema_path_cursor->ToDotString(); + if (encrypted_columns_.find(path) != encrypted_columns_.end()) { + return encrypted_columns_[path]; + } + schema_path_cursor = schema_path_cursor->parent(); } return nullptr; diff --git a/cpp/src/parquet/encryption/encryption.h b/cpp/src/parquet/encryption/encryption.h index 1ddef9e8236db..4df24750c53ee 100644 --- a/cpp/src/parquet/encryption/encryption.h +++ b/cpp/src/parquet/encryption/encryption.h @@ -474,7 +474,7 @@ class PARQUET_EXPORT FileEncryptionProperties { std::string file_aad() const { return file_aad_; } std::shared_ptr column_encryption_properties( - const std::string& column_path); + const std::shared_ptr& column_path); bool is_utilized() const { return utilized_; } diff --git a/cpp/src/parquet/encryption/internal_file_encryptor.cc b/cpp/src/parquet/encryption/internal_file_encryptor.cc index 94094e6aca228..5fd73fc3a1a25 100644 --- a/cpp/src/parquet/encryption/internal_file_encryptor.cc +++ b/cpp/src/parquet/encryption/internal_file_encryptor.cc @@ -94,29 +94,37 @@ std::shared_ptr InternalFileEncryptor::GetFooterSigningEncryptor() { } std::shared_ptr InternalFileEncryptor::GetColumnMetaEncryptor( - const std::string& column_path) { - return GetColumnEncryptor(column_path, true); + const std::shared_ptr& column_path, + const std::shared_ptr& schema_path) { + return GetColumnEncryptor(column_path, schema_path, true); } std::shared_ptr InternalFileEncryptor::GetColumnDataEncryptor( - const std::string& column_path) { - return GetColumnEncryptor(column_path, false); + const std::shared_ptr& column_path, + const std::shared_ptr& schema_path) { + return GetColumnEncryptor(column_path, schema_path, false); } std::shared_ptr InternalFileEncryptor::InternalFileEncryptor::GetColumnEncryptor( - const std::string& column_path, bool metadata) { + const std::shared_ptr& column_path, + const std::shared_ptr& schema_path, + bool metadata) { // first look if we already got the encryptor from before + auto path = column_path->ToDotString(); if (metadata) { - if (column_metadata_map_.find(column_path) != column_metadata_map_.end()) { - return column_metadata_map_.at(column_path); + if (column_metadata_map_.find(path) != column_metadata_map_.end()) { + return column_metadata_map_.at(path); } } else { - if (column_data_map_.find(column_path) != column_data_map_.end()) { - return column_data_map_.at(column_path); + if (column_data_map_.find(path) != column_data_map_.end()) { + return column_data_map_.at(path); } } auto column_prop = properties_->column_encryption_properties(column_path); + if (column_prop == nullptr) { + column_prop = properties_->column_encryption_properties(schema_path); + } if (column_prop == nullptr) { return nullptr; } @@ -136,9 +144,9 @@ InternalFileEncryptor::InternalFileEncryptor::GetColumnEncryptor( std::shared_ptr encryptor = std::make_shared(aes_encryptor, key, file_aad, "", pool_); if (metadata) - column_metadata_map_[column_path] = encryptor; + column_metadata_map_[path] = encryptor; else - column_data_map_[column_path] = encryptor; + column_data_map_[path] = encryptor; return encryptor; } diff --git a/cpp/src/parquet/encryption/internal_file_encryptor.h b/cpp/src/parquet/encryption/internal_file_encryptor.h index 5a3d743ce5365..1c27ca3df6caf 100644 --- a/cpp/src/parquet/encryption/internal_file_encryptor.h +++ b/cpp/src/parquet/encryption/internal_file_encryptor.h @@ -75,8 +75,12 @@ class InternalFileEncryptor { std::shared_ptr GetFooterEncryptor(); std::shared_ptr GetFooterSigningEncryptor(); - std::shared_ptr GetColumnMetaEncryptor(const std::string& column_path); - std::shared_ptr GetColumnDataEncryptor(const std::string& column_path); + std::shared_ptr GetColumnMetaEncryptor( + const std::shared_ptr& column_path, + const std::shared_ptr& schema_path); + std::shared_ptr GetColumnDataEncryptor( + const std::shared_ptr& column_path, + const std::shared_ptr& schema_path); void WipeOutEncryptionKeys(); private: @@ -95,8 +99,10 @@ class InternalFileEncryptor { ::arrow::MemoryPool* pool_; - std::shared_ptr GetColumnEncryptor(const std::string& column_path, - bool metadata); + std::shared_ptr GetColumnEncryptor( + const std::shared_ptr& column_path, + const std::shared_ptr& schema_path, + bool metadata); encryption::AesEncryptor* GetMetaAesEncryptor(ParquetCipher::type algorithm, size_t key_len); diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc index f80a095a13587..0f4d9de46c2d5 100644 --- a/cpp/src/parquet/file_writer.cc +++ b/cpp/src/parquet/file_writer.cc @@ -266,12 +266,13 @@ class RowGroupSerializer : public RowGroupWriter::Contents { std::shared_ptr CreateColumnWriterForColumn( ColumnChunkMetaDataBuilder* col_meta, int32_t column_ordinal) const { const auto& path = col_meta->descr()->path(); - const ColumnProperties& column_properties = properties_->column_properties(path); + const auto& schema_path = col_meta->descr()->schema_path(); + const ColumnProperties& column_properties = properties_->column_properties(path, schema_path); auto meta_encryptor = - file_encryptor_ ? file_encryptor_->GetColumnMetaEncryptor(path->ToDotString()) + file_encryptor_ ? file_encryptor_->GetColumnMetaEncryptor(path, schema_path) : nullptr; auto data_encryptor = - file_encryptor_ ? file_encryptor_->GetColumnDataEncryptor(path->ToDotString()) + file_encryptor_ ? file_encryptor_->GetColumnDataEncryptor(path, schema_path) : nullptr; auto ci_builder = page_index_builder_ && column_properties.page_index_enabled() ? page_index_builder_->GetColumnIndexBuilder(column_ordinal) @@ -497,7 +498,7 @@ class FileSerializer : public ParquetFileWriter::Contents { if (it == column_path_vec.end()) { std::stringstream ss; ss << "Encrypted column " + elem.first + " not in file schema"; - throw ParquetException(ss.str()); + // throw ParquetException(ss.str()); } } } diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc index 9b53da021f52e..088856b3f12d5 100644 --- a/cpp/src/parquet/metadata.cc +++ b/cpp/src/parquet/metadata.cc @@ -1631,7 +1631,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { } const auto& encrypt_md = - properties_->column_encryption_properties(column_->path()->ToDotString()); + properties_->column_encryption_properties(column_->path(), column_->schema_path()); // column is encrypted if (encrypt_md != nullptr && encrypt_md->is_encrypted()) { column_chunk_->__isset.crypto_metadata = true; @@ -1707,7 +1707,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { column_chunk_->meta_data.__set_type(ToThrift(column_->physical_type())); column_chunk_->meta_data.__set_path_in_schema(column_->path()->ToDotVector()); column_chunk_->meta_data.__set_codec( - ToThrift(properties_->compression(column_->path()))); + ToThrift(properties_->compression(column_->path(), column_->schema_path()))); } format::ColumnChunk* column_chunk_; diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index 8cc819f10cacd..110f7f767dccd 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -818,8 +818,9 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { int8_t module_type) const { std::shared_ptr encryptor; if (file_encryptor_ != nullptr) { - const auto column_path = schema_->Column(column_ordinal)->path()->ToDotString(); - encryptor = file_encryptor_->GetColumnMetaEncryptor(column_path); + const auto column_path = schema_->Column(column_ordinal)->path(); + const auto schema_path = schema_->Column(column_ordinal)->schema_path(); + encryptor = file_encryptor_->GetColumnMetaEncryptor(column_path, schema_path); if (encryptor != nullptr) { encryptor->UpdateAad(encryption::CreateModuleAad( encryptor->file_aad(), module_type, row_group_ordinal, column_ordinal, diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 8ae3660014f76..75e570f6cc961 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -775,45 +775,70 @@ class PARQUET_EXPORT WriterProperties { } const ColumnProperties& column_properties( - const std::shared_ptr& path) const { + const std::shared_ptr& path, + const std::shared_ptr& schema_path) const { + // check the (parquet) path can be found auto it = column_properties_.find(path->ToDotString()); if (it != column_properties_.end()) return it->second; + // check the schema path or any parent can be found + std::shared_ptr a_schema_path = std::make_shared(schema_path->ToDotVector()); + while (a_schema_path != nullptr) { + auto it = column_properties_.find(a_schema_path->ToDotString()); + if (it != column_properties_.end()) return it->second; + a_schema_path = a_schema_path->parent(); + } + // noting found, use the default column properties return default_column_properties_; } - Encoding::type encoding(const std::shared_ptr& path) const { - return column_properties(path).encoding(); + Encoding::type encoding( + const std::shared_ptr& path, + const std::shared_ptr& schema_path) const { + return column_properties(path, schema_path).encoding(); } - Compression::type compression(const std::shared_ptr& path) const { - return column_properties(path).compression(); + Compression::type compression( + const std::shared_ptr& path, + const std::shared_ptr& schema_path) const { + return column_properties(path, schema_path).compression(); } - int compression_level(const std::shared_ptr& path) const { - return column_properties(path).compression_level(); + int compression_level( + const std::shared_ptr& path, + const std::shared_ptr& schema_path) const { + return column_properties(path, schema_path).compression_level(); } const std::shared_ptr codec_options( - const std::shared_ptr& path) const { - return column_properties(path).codec_options(); + const std::shared_ptr& path, + const std::shared_ptr& schema_path) const { + return column_properties(path, schema_path).codec_options(); } - bool dictionary_enabled(const std::shared_ptr& path) const { - return column_properties(path).dictionary_enabled(); + bool dictionary_enabled( + const std::shared_ptr& path, + const std::shared_ptr& schema_path) const { + return column_properties(path, schema_path).dictionary_enabled(); } const std::vector& sorting_columns() const { return sorting_columns_; } - bool statistics_enabled(const std::shared_ptr& path) const { - return column_properties(path).statistics_enabled(); + bool statistics_enabled( + const std::shared_ptr& path, + const std::shared_ptr& schema_path) const { + return column_properties(path, schema_path).statistics_enabled(); } - size_t max_statistics_size(const std::shared_ptr& path) const { - return column_properties(path).max_statistics_size(); + size_t max_statistics_size( + const std::shared_ptr& path, + const std::shared_ptr& schema_path) const { + return column_properties(path, schema_path).max_statistics_size(); } - bool page_index_enabled(const std::shared_ptr& path) const { - return column_properties(path).page_index_enabled(); + bool page_index_enabled( + const std::shared_ptr& path, + const std::shared_ptr& schema_path) const { + return column_properties(path, schema_path).page_index_enabled(); } bool page_index_enabled() const { @@ -832,10 +857,17 @@ class PARQUET_EXPORT WriterProperties { return file_encryption_properties_.get(); } - std::shared_ptr column_encryption_properties( - const std::string& path) const { + std::shared_ptr column_encryption_properties( + const std::shared_ptr& column_path, + const std::shared_ptr& schema_path + ) const { if (file_encryption_properties_) { - return file_encryption_properties_->column_encryption_properties(path); + auto column_props = + file_encryption_properties_->column_encryption_properties(column_path); + if (column_props == nullptr) { + column_props = file_encryption_properties_->column_encryption_properties(schema_path); + } + return column_props; } else { return NULLPTR; } diff --git a/cpp/src/parquet/schema.cc b/cpp/src/parquet/schema.cc index 47fa72d829658..46d0ee2f075d5 100644 --- a/cpp/src/parquet/schema.cc +++ b/cpp/src/parquet/schema.cc @@ -67,11 +67,23 @@ std::shared_ptr ColumnPath::FromDotString(const std::string& dotstri } std::shared_ptr ColumnPath::FromNode(const Node& node) { + return FromNode(node, false); +} + +std::shared_ptr ColumnPath::FromNode(const Node& node, bool schema_path) { // Build the path in reverse order as we traverse the nodes to the top std::vector rpath_; const Node* cursor = &node; // The schema node is not part of the ColumnPath while (cursor->parent()) { + if (schema_path && ( + cursor->parent()->converted_type() == ConvertedType::MAP || + cursor->parent()->converted_type() == ConvertedType::LIST + ) + ) { + cursor = cursor->parent(); + continue; + } rpath_.push_back(cursor->name()); cursor = cursor->parent(); } @@ -91,6 +103,17 @@ std::shared_ptr ColumnPath::extend(const std::string& node_name) con return std::make_shared(std::move(path)); } +std::shared_ptr ColumnPath::parent() const { + if (path_.size() <= 1) { return nullptr; } + + std::vector path; + path.reserve(path_.size() - 1); + path.resize(path_.size() - 1); + std::copy(path_.cbegin(), path_.cend() - 1, path.begin()); + + return std::make_shared(std::move(path)); +} + std::string ColumnPath::ToDotString() const { std::stringstream ss; for (auto it = path_.cbegin(); it != path_.cend(); ++it) { @@ -113,6 +136,12 @@ const std::shared_ptr Node::path() const { return ColumnPath::FromNode(*this); } +const std::shared_ptr Node::schema_path() const { + // TODO(itaiin): Cache the result, or more precisely, cache ->ToDotString() + // since it is being used to access the leaf nodes + return ColumnPath::FromNode(*this, true); +} + bool Node::EqualsInternal(const Node* other) const { return type_ == other->type_ && name_ == other->name_ && repetition_ == other->repetition_ && converted_type_ == other->converted_type_ && @@ -960,4 +989,8 @@ const std::shared_ptr ColumnDescriptor::path() const { return primitive_node_->path(); } +const std::shared_ptr ColumnDescriptor::schema_path() const { + return primitive_node_->schema_path(); +} + } // namespace parquet diff --git a/cpp/src/parquet/schema.h b/cpp/src/parquet/schema.h index 1addc73bd367d..790a055eea348 100644 --- a/cpp/src/parquet/schema.h +++ b/cpp/src/parquet/schema.h @@ -84,8 +84,10 @@ class PARQUET_EXPORT ColumnPath { static std::shared_ptr FromDotString(const std::string& dotstring); static std::shared_ptr FromNode(const Node& node); + static std::shared_ptr FromNode(const Node& node, bool filter_converted_types); std::shared_ptr extend(const std::string& node_name) const; + std::shared_ptr parent() const; std::string ToDotString() const; const std::vector& ToDotVector() const; @@ -132,6 +134,8 @@ class PARQUET_EXPORT Node { const std::shared_ptr path() const; + const std::shared_ptr schema_path() const; + virtual void ToParquet(void* element) const = 0; // Node::Visitor abstract class for walking schemas with the visitor pattern @@ -386,6 +390,8 @@ class PARQUET_EXPORT ColumnDescriptor { const std::shared_ptr path() const; + const std::shared_ptr schema_path() const; + const schema::NodePtr& schema_node() const { return node_; } std::string ToString() const; diff --git a/cpp/src/parquet/schema_test.cc b/cpp/src/parquet/schema_test.cc index 2532a8656e69f..fa6167b7132c0 100644 --- a/cpp/src/parquet/schema_test.cc +++ b/cpp/src/parquet/schema_test.cc @@ -110,6 +110,33 @@ TEST(TestColumnPath, TestAttrs) { ASSERT_EQ(extended->ToDotString(), "toplevel.leaf.anotherlevel"); } +TEST(TestColumnPath, FromNode) { + auto key = PrimitiveNode::Make("key", Repetition::REQUIRED, Type::INT32, ConvertedType::INT_32); + auto key_value = GroupNode::Make("key_value", Repetition::REQUIRED, {key}, ConvertedType::NONE); + auto map = GroupNode::Make("a", Repetition::REQUIRED, {key_value}, ConvertedType::MAP); + + auto element = PrimitiveNode::Make("element", Repetition::REPEATED, Type::INT32, ConvertedType::INT_32); + auto inner_list = GroupNode::Make("list", Repetition::REQUIRED, {element}, ConvertedType::NONE); + auto list = GroupNode::Make("b", Repetition::REQUIRED, {inner_list}, ConvertedType::LIST); + + auto f1 = PrimitiveNode::Make("f1", Repetition::OPTIONAL, Type::INT32, ConvertedType::INT_32); + auto f2 = PrimitiveNode::Make("f2", Repetition::OPTIONAL, Type::BYTE_ARRAY, ConvertedType::UTF8); + auto struct_ = GroupNode::Make("c", Repetition::REQUIRED, {f1, f2}, ConvertedType::NONE); + + auto schema = GroupNode::Make("schema", Repetition::REQUIRED, {map, list, struct_}, ConvertedType::NONE); + + ASSERT_EQ(ColumnPath::FromNode(*key)->ToDotString(), "a.key_value.key"); + ASSERT_EQ(ColumnPath::FromNode(*key, true)->ToDotString(), "a.key"); + + ASSERT_EQ(ColumnPath::FromNode(*element)->ToDotString(), "b.list.element"); + ASSERT_EQ(ColumnPath::FromNode(*element, true)->ToDotString(), "b.element"); + + ASSERT_EQ(ColumnPath::FromNode(*f1)->ToDotString(), "c.f1"); + ASSERT_EQ(ColumnPath::FromNode(*f1, true)->ToDotString(), "c.f1"); + ASSERT_EQ(ColumnPath::FromNode(*f2)->ToDotString(), "c.f2"); + ASSERT_EQ(ColumnPath::FromNode(*f2, true)->ToDotString(), "c.f2"); +} + // ---------------------------------------------------------------------- // Primitive node