From a00fa05f366bf74d7222d58d65fe927e96dd18eb Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Wed, 13 Sep 2023 18:54:06 -0500 Subject: [PATCH] Removed ParquetHadoop module (#4457) Removed duplicate code copied from Apache parquet-hadoop project Co-authored-by: Colin Alworth --- NOTICE.md | 2 - ParquetHadoop/LICENSE | 202 --- ParquetHadoop/NOTICE | 116 -- ParquetHadoop/build.gradle | 62 - ParquetHadoop/gradle.properties | 1 - .../tempfix/ParquetMetadataConverter.java | 1538 ----------------- buildSrc/src/main/groovy/Classpaths.groovy | 28 + extensions/parquet/base/build.gradle | 2 +- .../parquet/base/ColumnWriterImpl.java | 2 +- .../parquet/base/ParquetFileWriter.java | 2 +- extensions/parquet/compression/build.gradle | 7 +- extensions/parquet/table/build.gradle | 2 + .../parquet/table/ParquetSchemaReader.java | 2 +- .../deephaven/parquet/table/ParquetTools.java | 2 +- .../layout/ParquetMetadataFileLayout.java | 2 +- .../table/location/ParquetColumnLocation.java | 2 +- .../location/ParquetTableLocationKey.java | 2 +- settings.gradle | 2 - 18 files changed, 43 insertions(+), 1933 deletions(-) delete mode 100644 ParquetHadoop/LICENSE delete mode 100644 ParquetHadoop/NOTICE delete mode 100644 ParquetHadoop/build.gradle delete mode 100644 ParquetHadoop/gradle.properties delete mode 100644 ParquetHadoop/src/main/java/io/deephaven/parquet/base/tempfix/ParquetMetadataConverter.java diff --git a/NOTICE.md b/NOTICE.md index 995681efa31..8e17bb71e23 100644 --- a/NOTICE.md +++ b/NOTICE.md @@ -17,6 +17,4 @@ Deephaven Community License. See the `LICENSE` and `NOTICE` files in these directories for more information. * [Container](Container): Apache-2.0 -* [py/jpy](py/jpy): Apache-2.0 * [style](style): Apache-2.0 -* [ParquetHadoop](ParquetHadoop): Apache-2.0 diff --git a/ParquetHadoop/LICENSE b/ParquetHadoop/LICENSE deleted file mode 100644 index d6456956733..00000000000 --- a/ParquetHadoop/LICENSE +++ /dev/null @@ -1,202 +0,0 @@ - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. 
- - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. 
You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. 
In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/ParquetHadoop/NOTICE b/ParquetHadoop/NOTICE deleted file mode 100644 index efdd44573d6..00000000000 --- a/ParquetHadoop/NOTICE +++ /dev/null @@ -1,116 +0,0 @@ -Deephaven ParquetHadoop -Copyright 2021 Deephaven Data Labs - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - --------------------------------------------------------------------------------- - -This product includes code from Apache Parquet Hadoop (https://github.com/apache/parquet-mr), -which is available under the Apache 2.0 License. 
The project includes the following notice: - - Apache Parquet MR (Incubating) - Copyright 2014 The Apache Software Foundation - - This product includes software developed at - The Apache Software Foundation (http://www.apache.org/). - - -------------------------------------------------------------------------------- - - This product includes parquet-tools, initially developed at ARRIS, Inc. with - the following copyright notice: - - Copyright 2013 ARRIS, Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - -------------------------------------------------------------------------------- - - This product includes parquet-protobuf, initially developed by Lukas Nalezenc - with the following copyright notice: - - Copyright 2013 Lukas Nalezenec. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - -------------------------------------------------------------------------------- - - This product includes code from Apache Avro, which includes the following in - its NOTICE file: - - Apache Avro - Copyright 2010-2015 The Apache Software Foundation - - This product includes software developed at - The Apache Software Foundation (http://www.apache.org/). - - -------------------------------------------------------------------------------- - - This project includes code from Kite, developed at Cloudera, Inc. with - the following copyright notice: - - | Copyright 2013 Cloudera Inc. - | - | Licensed under the Apache License, Version 2.0 (the "License"); - | you may not use this file except in compliance with the License. - | You may obtain a copy of the License at - | - | http://www.apache.org/licenses/LICENSE-2.0 - | - | Unless required by applicable law or agreed to in writing, software - | distributed under the License is distributed on an "AS IS" BASIS, - | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - | See the License for the specific language governing permissions and - | limitations under the License. - - -------------------------------------------------------------------------------- - - This project includes code from Netflix, Inc. with the following copyright - notice: - - | Copyright 2016 Netflix, Inc. - | - | Licensed under the Apache License, Version 2.0 (the "License"); - | you may not use this file except in compliance with the License. 
- | You may obtain a copy of the License at - | - | http://www.apache.org/licenses/LICENSE-2.0 - | - | Unless required by applicable law or agreed to in writing, software - | distributed under the License is distributed on an "AS IS" BASIS, - | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - | See the License for the specific language governing permissions and - | limitations under the License. --------------------------------------------------------------------------------- - - - diff --git a/ParquetHadoop/build.gradle b/ParquetHadoop/build.gradle deleted file mode 100644 index a9faf546632..00000000000 --- a/ParquetHadoop/build.gradle +++ /dev/null @@ -1,62 +0,0 @@ -import nl.javadude.gradle.plugins.license.License - -plugins { - id 'java-library' - id 'io.deephaven.project.register' -} - -sourceSets { - main { - java { - srcDir 'java' - } - } -} -tasks.withType(License) { - enabled = false -} - -dependencies { - // TODO(deephaven-core#3148): LZ4_RAW parquet support - api('org.apache.parquet:parquet-hadoop:1.13.0') - - // TODO(deephaven-core#806): Remove dependency on hadoop-common - api('org.apache.hadoop:hadoop-common:3.3.3') { - // do not take any dependencies of this project, - // we just want a few classes (Configuration, Path) for - // simplified prototyping work, and api compatibility. - transitive = false - // if we actually need any more of hadoop at runtime, - // we can add more jars w/ transitive=false, - // or replace transitive=false here w/ more exclusions; - // (we want to avoid pulling in netty, loggers, jetty-util, guice and asm). - } - - /* A dependency to woodstox is triggered from the - * initialization of ParquetReader the first time - * we try to open a parquet file. Without this below, - * we get: - * - * java.lang.NoClassDefFoundError: com/ctc/wstx/io/InputBootstrapper - * at io.deephaven.parquet.ParquetFileReader.lambda$new$0(ParquetFileReader.java:44) - * at java.lang.ThreadLocal$SuppliedThreadLocal.initialValue(ThreadLocal.java:284) - * at java.lang.ThreadLocal.setInitialValue(ThreadLocal.java:180) - * at java.lang.ThreadLocal.get(ThreadLocal.java:170) - * at io.deephaven.parquet.ColumnChunkReaderImpl.lambda$new$0(ColumnChunkReaderImpl.java:49) - * [...] - * - * Similarly for hadoop-shaded-guava. - * - * lz4-pure-java - note that we can't _easily_ use aircompressor here, as the service loader sees - * the copy in hadoop-common. 
TODO use config instead of service loader - */ - runtimeOnly('com.fasterxml.woodstox:woodstox-core:6.4.0') { - because 'hadoop-common required dependency for Configuration' - } - runtimeOnly('org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.1.1') { - because 'hadoop-common required dependency for Configuration' - } - runtimeOnly('commons-collections:commons-collections:3.2.2') { - because 'hadoop-common required dependency for Configuration' - } -} diff --git a/ParquetHadoop/gradle.properties b/ParquetHadoop/gradle.properties deleted file mode 100644 index 1c0cc01b600..00000000000 --- a/ParquetHadoop/gradle.properties +++ /dev/null @@ -1 +0,0 @@ -io.deephaven.project.ProjectType=JAVA_EXTERNAL diff --git a/ParquetHadoop/src/main/java/io/deephaven/parquet/base/tempfix/ParquetMetadataConverter.java b/ParquetHadoop/src/main/java/io/deephaven/parquet/base/tempfix/ParquetMetadataConverter.java deleted file mode 100644 index c800aebb1f7..00000000000 --- a/ParquetHadoop/src/main/java/io/deephaven/parquet/base/tempfix/ParquetMetadataConverter.java +++ /dev/null @@ -1,1538 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package io.deephaven.parquet.base.tempfix; - -/* -TODO(deephaven-core#901): Remove the hacked ParquetMetadataConverter.java and the need for the ParquetHadoop module. 
-NOTE this only exists for this line, inside addRowGroup -Without it the page offset is not being saved properly - if (columnMetaData.getDictionaryPageOffset() >= 0) { - columnChunk.meta_data.setDictionary_page_offset(columnMetaData.getDictionaryPageOffset()); - } - - */ - -import static java.util.Optional.empty; - -import static java.util.Optional.of; -import static org.apache.parquet.format.Util.readFileMetaData; -import static org.apache.parquet.format.Util.writePageHeader; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.parquet.CorruptStatistics; -import org.apache.parquet.ParquetReadOptions; -import org.apache.parquet.format.BsonType; -import org.apache.parquet.format.CompressionCodec; -import org.apache.parquet.format.DateType; -import org.apache.parquet.format.DecimalType; -import org.apache.parquet.format.EnumType; -import org.apache.parquet.format.IntType; -import org.apache.parquet.format.JsonType; -import org.apache.parquet.format.ListType; -import org.apache.parquet.format.LogicalType; -import org.apache.parquet.format.MapType; -import org.apache.parquet.format.MicroSeconds; -import org.apache.parquet.format.MilliSeconds; -import org.apache.parquet.format.NanoSeconds; -import org.apache.parquet.format.NullType; -import org.apache.parquet.format.PageEncodingStats; -import org.apache.parquet.format.StringType; -import org.apache.parquet.format.TimeType; -import org.apache.parquet.format.TimeUnit; -import org.apache.parquet.format.TimestampType; -import org.apache.parquet.hadoop.metadata.ColumnPath; -import org.apache.parquet.format.BoundaryOrder; -import org.apache.parquet.format.ColumnChunk; -import org.apache.parquet.format.ColumnIndex; -import org.apache.parquet.format.ColumnMetaData; -import org.apache.parquet.format.ColumnOrder; -import org.apache.parquet.format.ConvertedType; -import org.apache.parquet.format.DataPageHeader; -import org.apache.parquet.format.DataPageHeaderV2; -import org.apache.parquet.format.DictionaryPageHeader; -import org.apache.parquet.format.Encoding; -import org.apache.parquet.format.FieldRepetitionType; -import org.apache.parquet.format.FileMetaData; -import org.apache.parquet.format.KeyValue; -import org.apache.parquet.format.OffsetIndex; -import org.apache.parquet.format.PageHeader; -import org.apache.parquet.format.PageLocation; -import org.apache.parquet.format.PageType; -import org.apache.parquet.format.RowGroup; -import org.apache.parquet.format.SchemaElement; -import org.apache.parquet.format.Statistics; -import org.apache.parquet.format.Type; -import org.apache.parquet.format.TypeDefinedOrder; -import org.apache.parquet.hadoop.metadata.BlockMetaData; -import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; -import org.apache.parquet.column.EncodingStats; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; -import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder; -import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; -import org.apache.parquet.internal.hadoop.metadata.IndexReference; 
-import org.apache.parquet.io.ParquetDecodingException; -import org.apache.parquet.schema.ColumnOrder.ColumnOrderName; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; -import org.apache.parquet.schema.Type.Repetition; -import org.apache.parquet.schema.TypeVisitor; -import org.apache.parquet.schema.Types; -import org.apache.parquet.schema.LogicalTypeAnnotation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -// TODO: This file has become too long! -// TODO: Lets split it up: https://issues.apache.org/jira/browse/PARQUET-310 -public class ParquetMetadataConverter { - - private static final TypeDefinedOrder TYPE_DEFINED_ORDER = new TypeDefinedOrder(); - public static final MetadataFilter NO_FILTER = new NoFilter(); - public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter(); - public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k - - private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataConverter.class); - private static final LogicalTypeConverterVisitor LOGICAL_TYPE_ANNOTATION_VISITOR = new LogicalTypeConverterVisitor(); - private static final ConvertedTypeConverterVisitor CONVERTED_TYPE_CONVERTER_VISITOR = new ConvertedTypeConverterVisitor(); - - private final boolean useSignedStringMinMax; - - public ParquetMetadataConverter() { - this(false); - } - - /** - * @param conf a configuration - * @deprecated will be removed in 2.0.0; use {@code ParquetMetadataConverter(ParquetReadOptions)} - */ - @Deprecated - public ParquetMetadataConverter(Configuration conf) { - this(conf.getBoolean("parquet.strings.signed-min-max.enabled", false)); - } - - public ParquetMetadataConverter(ParquetReadOptions options) { - this(options.useSignedStringMinMax()); - } - - private ParquetMetadataConverter(boolean useSignedStringMinMax) { - this.useSignedStringMinMax = useSignedStringMinMax; - } - - // NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate - // sets of encodings. It is important that all collections inserted to this cache be - // immutable and have thread-safe read-only access. This can be achieved by wrapping - // an unsynchronized collection in Collections.unmodifiable*(), and making sure to not - // keep any references to the original collection. 
- private static final ConcurrentHashMap, Set> - cachedEncodingSets = new ConcurrentHashMap, Set>(); - - public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parquetMetadata) { - List blocks = parquetMetadata.getBlocks(); - List rowGroups = new ArrayList(); - long numRows = 0; - for (BlockMetaData block : blocks) { - numRows += block.getRowCount(); - addRowGroup(parquetMetadata, rowGroups, block); - } - FileMetaData fileMetaData = new FileMetaData( - currentVersion, - toParquetSchema(parquetMetadata.getFileMetaData().getSchema()), - numRows, - rowGroups); - - Set> keyValues = parquetMetadata.getFileMetaData().getKeyValueMetaData().entrySet(); - for (Entry keyValue : keyValues) { - addKeyValue(fileMetaData, keyValue.getKey(), keyValue.getValue()); - } - - fileMetaData.setCreated_by(parquetMetadata.getFileMetaData().getCreatedBy()); - - fileMetaData.setColumn_orders(getColumnOrders(parquetMetadata.getFileMetaData().getSchema())); - - return fileMetaData; - } - - private List getColumnOrders(MessageType schema) { - List columnOrders = new ArrayList<>(); - // Currently, only TypeDefinedOrder is supported, so we create a column order for each columns with - // TypeDefinedOrder even if some types (e.g. INT96) have undefined column orders. - for (int i = 0, n = schema.getPaths().size(); i < n; ++i) { - ColumnOrder columnOrder = new ColumnOrder(); - columnOrder.setTYPE_ORDER(TYPE_DEFINED_ORDER); - columnOrders.add(columnOrder); - } - return columnOrders; - } - - // Visible for testing - List toParquetSchema(MessageType schema) { - List result = new ArrayList(); - addToList(result, schema); - return result; - } - - private void addToList(final List result, org.apache.parquet.schema.Type field) { - field.accept(new TypeVisitor() { - @Override - public void visit(PrimitiveType primitiveType) { - SchemaElement element = new SchemaElement(primitiveType.getName()); - element.setRepetition_type(toParquetRepetition(primitiveType.getRepetition())); - element.setType(getType(primitiveType.getPrimitiveTypeName())); - if (primitiveType.getLogicalTypeAnnotation() != null) { - element.setConverted_type(convertToConvertedType(primitiveType.getLogicalTypeAnnotation())); - element.setLogicalType(convertToLogicalType(primitiveType.getLogicalTypeAnnotation())); - } - if (primitiveType.getDecimalMetadata() != null) { - element.setPrecision(primitiveType.getDecimalMetadata().getPrecision()); - element.setScale(primitiveType.getDecimalMetadata().getScale()); - } - if (primitiveType.getTypeLength() > 0) { - element.setType_length(primitiveType.getTypeLength()); - } - if (primitiveType.getId() != null) { - element.setField_id(primitiveType.getId().intValue()); - } - result.add(element); - } - - @Override - public void visit(MessageType messageType) { - SchemaElement element = new SchemaElement(messageType.getName()); - if (messageType.getId() != null) { - element.setField_id(messageType.getId().intValue()); - } - visitChildren(result, messageType.asGroupType(), element); - } - - @Override - public void visit(GroupType groupType) { - SchemaElement element = new SchemaElement(groupType.getName()); - element.setRepetition_type(toParquetRepetition(groupType.getRepetition())); - if (groupType.getLogicalTypeAnnotation() != null) { - element.setConverted_type(convertToConvertedType(groupType.getLogicalTypeAnnotation())); - element.setLogicalType(convertToLogicalType(groupType.getLogicalTypeAnnotation())); - } - if (groupType.getId() != null) { - element.setField_id(groupType.getId().intValue()); - } - 
visitChildren(result, groupType, element); - } - - private void visitChildren(final List result, - GroupType groupType, SchemaElement element) { - element.setNum_children(groupType.getFieldCount()); - result.add(element); - for (org.apache.parquet.schema.Type field : groupType.getFields()) { - addToList(result, field); - } - } - }); - } - - LogicalType convertToLogicalType(LogicalTypeAnnotation logicalTypeAnnotation) { - return logicalTypeAnnotation.accept(LOGICAL_TYPE_ANNOTATION_VISITOR).get(); - } - - ConvertedType convertToConvertedType(LogicalTypeAnnotation logicalTypeAnnotation) { - return logicalTypeAnnotation.accept(CONVERTED_TYPE_CONVERTER_VISITOR).orElse(null); - } - - static org.apache.parquet.format.TimeUnit convertUnit(LogicalTypeAnnotation.TimeUnit unit) { - switch (unit) { - case MICROS: - return org.apache.parquet.format.TimeUnit.MICROS(new MicroSeconds()); - case MILLIS: - return org.apache.parquet.format.TimeUnit.MILLIS(new MilliSeconds()); - case NANOS: - return TimeUnit.NANOS(new NanoSeconds()); - default: - throw new RuntimeException("Unknown time unit " + unit); - } - } - - private static class ConvertedTypeConverterVisitor implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor { - @Override - public Optional visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { - return of(ConvertedType.UTF8); - } - - @Override - public Optional visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) { - return of(ConvertedType.MAP); - } - - @Override - public Optional visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) { - return of(ConvertedType.LIST); - } - - @Override - public Optional visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) { - return of(ConvertedType.ENUM); - } - - @Override - public Optional visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) { - return of(ConvertedType.DECIMAL); - } - - @Override - public Optional visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) { - return of(ConvertedType.DATE); - } - - @Override - public Optional visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { - if (!timeLogicalType.isAdjustedToUTC()) { - return empty(); - } - switch (timeLogicalType.getUnit()) { - case MILLIS: - return of(ConvertedType.TIME_MILLIS); - case MICROS: - return of(ConvertedType.TIME_MICROS); - case NANOS: - return empty(); - default: - throw new RuntimeException("Unknown converted type for " + timeLogicalType.toOriginalType()); - } - } - - @Override - public Optional visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { - if (!timestampLogicalType.isAdjustedToUTC()) { - return empty(); - } - switch (timestampLogicalType.getUnit()) { - case MICROS: - return of(ConvertedType.TIMESTAMP_MICROS); - case MILLIS: - return of(ConvertedType.TIMESTAMP_MILLIS); - case NANOS: - return empty(); - default: - throw new RuntimeException("Unknown converted type for " + timestampLogicalType.toOriginalType()); - } - } - - @Override - public Optional visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { - boolean signed = intLogicalType.isSigned(); - switch (intLogicalType.getBitWidth()) { - case 8: - return of(signed ? ConvertedType.INT_8 : ConvertedType.UINT_8); - case 16: - return of(signed ? ConvertedType.INT_16 : ConvertedType.UINT_16); - case 32: - return of(signed ? ConvertedType.INT_32 : ConvertedType.UINT_32); - case 64: - return of(signed ? 
ConvertedType.INT_64 : ConvertedType.UINT_64); - default: - throw new RuntimeException("Unknown original type " + intLogicalType.toOriginalType()); - } - } - - @Override - public Optional visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { - return of(ConvertedType.JSON); - } - - @Override - public Optional visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { - return of(ConvertedType.BSON); - } - - @Override - public Optional visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) { - return of(ConvertedType.INTERVAL); - } - - @Override - public Optional visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation mapKeyValueLogicalType) { - return of(ConvertedType.MAP_KEY_VALUE); - } - } - - private static class LogicalTypeConverterVisitor implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor { - @Override - public Optional visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { - return of(LogicalType.STRING(new StringType())); - } - - @Override - public Optional visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) { - return of(LogicalType.MAP(new MapType())); - } - - @Override - public Optional visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) { - return of(LogicalType.LIST(new ListType())); - } - - @Override - public Optional visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) { - return of(LogicalType.ENUM(new EnumType())); - } - - @Override - public Optional visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) { - return of(LogicalType.DECIMAL(new DecimalType(decimalLogicalType.getScale(), decimalLogicalType.getPrecision()))); - } - - @Override - public Optional visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) { - return of(LogicalType.DATE(new DateType())); - } - - @Override - public Optional visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { - return of(LogicalType.TIME(new TimeType(timeLogicalType.isAdjustedToUTC(), convertUnit(timeLogicalType.getUnit())))); - } - - @Override - public Optional visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { - return of(LogicalType.TIMESTAMP(new TimestampType(timestampLogicalType.isAdjustedToUTC(), convertUnit(timestampLogicalType.getUnit())))); - } - - @Override - public Optional visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { - return of(LogicalType.INTEGER(new IntType((byte) intLogicalType.getBitWidth(), intLogicalType.isSigned()))); - } - - @Override - public Optional visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { - return of(LogicalType.JSON(new JsonType())); - } - - @Override - public Optional visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { - return of(LogicalType.BSON(new BsonType())); - } - - @Override - public Optional visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) { - return of(LogicalType.UNKNOWN(new NullType())); - } - - @Override - public Optional visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation mapKeyValueLogicalType) { - return of(LogicalType.UNKNOWN(new NullType())); - } - } - - private void addRowGroup(ParquetMetadata parquetMetadata, List rowGroups, BlockMetaData block) { - //rowGroup.total_byte_size = ; - List columns = block.getColumns(); - List parquetColumns = new ArrayList(); - for (ColumnChunkMetaData columnMetaData : columns) { - ColumnChunk columnChunk = new 
ColumnChunk(columnMetaData.getFirstDataPageOffset()); // verify this is the right offset - columnChunk.file_path = block.getPath(); // they are in the same file for now - columnChunk.meta_data = new ColumnMetaData( - getType(columnMetaData.getType()), - toFormatEncodings(columnMetaData.getEncodings()), - Arrays.asList(columnMetaData.getPath().toArray()), - toFormatCodec(columnMetaData.getCodec()), - columnMetaData.getValueCount(), - columnMetaData.getTotalUncompressedSize(), - columnMetaData.getTotalSize(), - columnMetaData.getFirstDataPageOffset()); - if (columnMetaData.getDictionaryPageOffset() >= 0) { - columnChunk.meta_data.setDictionary_page_offset(columnMetaData.getDictionaryPageOffset()); - } - if (!columnMetaData.getStatistics().isEmpty()) { - columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics())); - } - if (columnMetaData.getEncodingStats() != null) { - columnChunk.meta_data.setEncoding_stats(convertEncodingStats(columnMetaData.getEncodingStats())); - } -// columnChunk.meta_data.index_page_offset = ; -// columnChunk.meta_data.key_value_metadata = ; // nothing yet - - IndexReference columnIndexRef = columnMetaData.getColumnIndexReference(); - if (columnIndexRef != null) { - columnChunk.setColumn_index_offset(columnIndexRef.getOffset()); - columnChunk.setColumn_index_length(columnIndexRef.getLength()); - } - IndexReference offsetIndexRef = columnMetaData.getOffsetIndexReference(); - if (offsetIndexRef != null) { - columnChunk.setOffset_index_offset(offsetIndexRef.getOffset()); - columnChunk.setOffset_index_length(offsetIndexRef.getLength()); - } - - parquetColumns.add(columnChunk); - } - RowGroup rowGroup = new RowGroup(parquetColumns, block.getTotalByteSize(), block.getRowCount()); - rowGroups.add(rowGroup); - } - - private List toFormatEncodings(Set encodings) { - List converted = new ArrayList(encodings.size()); - for (org.apache.parquet.column.Encoding encoding : encodings) { - converted.add(getEncoding(encoding)); - } - return converted; - } - - // Visible for testing - Set fromFormatEncodings(List encodings) { - Set converted = new HashSet(); - - for (Encoding encoding : encodings) { - converted.add(getEncoding(encoding)); - } - - // make converted unmodifiable, drop reference to modifiable copy - converted = Collections.unmodifiableSet(converted); - - // atomically update the cache - Set cached = cachedEncodingSets.putIfAbsent(converted, converted); - - if (cached == null) { - // cached == null signifies that converted was *not* in the cache previously - // so we can return converted instead of throwing it away, it has now - // been cached - cached = converted; - } - - return cached; - } - - private CompressionCodecName fromFormatCodec(CompressionCodec codec) { - return CompressionCodecName.valueOf(codec.toString()); - } - - private CompressionCodec toFormatCodec(CompressionCodecName codec) { - return CompressionCodec.valueOf(codec.toString()); - } - - public org.apache.parquet.column.Encoding getEncoding(Encoding encoding) { - return org.apache.parquet.column.Encoding.valueOf(encoding.name()); - } - - public Encoding getEncoding(org.apache.parquet.column.Encoding encoding) { - return Encoding.valueOf(encoding.name()); - } - - public EncodingStats convertEncodingStats(List stats) { - if (stats == null) { - return null; - } - - EncodingStats.Builder builder = new EncodingStats.Builder(); - for (PageEncodingStats stat : stats) { - switch (stat.getPage_type()) { - case DATA_PAGE_V2: - builder.withV2Pages(); - // falls through - case DATA_PAGE: - 
builder.addDataEncoding( - getEncoding(stat.getEncoding()), stat.getCount()); - break; - case DICTIONARY_PAGE: - builder.addDictEncoding( - getEncoding(stat.getEncoding()), stat.getCount()); - break; - } - } - return builder.build(); - } - - public List convertEncodingStats(EncodingStats stats) { - if (stats == null) { - return null; - } - - List formatStats = new ArrayList(); - for (org.apache.parquet.column.Encoding encoding : stats.getDictionaryEncodings()) { - formatStats.add(new PageEncodingStats( - PageType.DICTIONARY_PAGE, getEncoding(encoding), - stats.getNumDictionaryPagesEncodedAs(encoding))); - } - PageType dataPageType = (stats.usesV2Pages() ? PageType.DATA_PAGE_V2 : PageType.DATA_PAGE); - for (org.apache.parquet.column.Encoding encoding : stats.getDataEncodings()) { - formatStats.add(new PageEncodingStats( - dataPageType, getEncoding(encoding), - stats.getNumDataPagesEncodedAs(encoding))); - } - return formatStats; - } - - public static Statistics toParquetStatistics( - org.apache.parquet.column.statistics.Statistics stats) { - Statistics formatStats = new Statistics(); - // Don't write stats larger than the max size rather than truncating. The - // rationale is that some engines may use the minimum value in the page as - // the true minimum for aggregations and there is no way to mark that a - // value has been truncated and is a lower bound and not in the page. - if (!stats.isEmpty() && stats.isSmallerThan(MAX_STATS_SIZE)) { - formatStats.setNull_count(stats.getNumNulls()); - if (stats.hasNonNullValue()) { - byte[] min = stats.getMinBytes(); - byte[] max = stats.getMaxBytes(); - - // Fill the former min-max statistics only if the comparison logic is - // signed so the logic of V1 and V2 stats are the same (which is - // trivially true for equal min-max values) - if (sortOrder(stats.type()) == SortOrder.SIGNED || Arrays.equals(min, max)) { - formatStats.setMin(min); - formatStats.setMax(max); - } - - if (isMinMaxStatsSupported(stats.type()) || Arrays.equals(min, max)) { - formatStats.setMin_value(min); - formatStats.setMax_value(max); - } - } - } - return formatStats; - } - - private static boolean isMinMaxStatsSupported(PrimitiveType type) { - return type.columnOrder().getColumnOrderName() == ColumnOrderName.TYPE_DEFINED_ORDER; - } - - /** - * @param statistics parquet format statistics - * @param type a primitive type name - * @return the statistics - * @deprecated will be removed in 2.0.0. - */ - @Deprecated - public static org.apache.parquet.column.statistics.Statistics fromParquetStatistics(Statistics statistics, PrimitiveTypeName type) { - return fromParquetStatistics(null, statistics, type); - } - - /** - * @param createdBy the created-by string from the file - * @param statistics parquet format statistics - * @param type a primitive type name - * @return the statistics - * @deprecated will be removed in 2.0.0. 
- */ - @Deprecated - public static org.apache.parquet.column.statistics.Statistics fromParquetStatistics - (String createdBy, Statistics statistics, PrimitiveTypeName type) { - return fromParquetStatisticsInternal(createdBy, statistics, - new PrimitiveType(Repetition.OPTIONAL, type, "fake_type"), defaultSortOrder(type)); - } - - // Visible for testing - static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInternal - (String createdBy, Statistics formatStats, PrimitiveType type, SortOrder typeSortOrder) { - // create stats object based on the column type - org.apache.parquet.column.statistics.Statistics.Builder statsBuilder = - org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type); - - if (formatStats != null) { - // Use the new V2 min-max statistics over the former one if it is filled - if (formatStats.isSetMin_value() && formatStats.isSetMax_value()) { - byte[] min = formatStats.min_value.array(); - byte[] max = formatStats.max_value.array(); - if (isMinMaxStatsSupported(type) || Arrays.equals(min, max)) { - statsBuilder.withMin(min); - statsBuilder.withMax(max); - } - } else { - boolean isSet = formatStats.isSetMax() && formatStats.isSetMin(); - boolean maxEqualsMin = isSet ? Arrays.equals(formatStats.getMin(), formatStats.getMax()) : false; - boolean sortOrdersMatch = SortOrder.SIGNED == typeSortOrder; - // NOTE: See docs in CorruptStatistics for explanation of why this check is needed - // The sort order is checked to avoid returning min/max stats that are not - // valid with the type's sort order. In previous releases, all stats were - // aggregated using a signed byte-wise ordering, which isn't valid for all the - // types (e.g. strings, decimals etc.). - if (!CorruptStatistics.shouldIgnoreStatistics(createdBy, type.getPrimitiveTypeName()) && - (sortOrdersMatch || maxEqualsMin)) { - if (isSet) { - statsBuilder.withMin(formatStats.min.array()); - statsBuilder.withMax(formatStats.max.array()); - } - } - } - - if (formatStats.isSetNull_count()) { - statsBuilder.withNumNulls(formatStats.null_count); - } - } - return statsBuilder.build(); - } - - public org.apache.parquet.column.statistics.Statistics fromParquetStatistics( - String createdBy, Statistics statistics, PrimitiveType type) { - SortOrder expectedOrder = overrideSortOrderToSigned(type) ? - SortOrder.SIGNED : sortOrder(type); - return fromParquetStatisticsInternal( - createdBy, statistics, type, expectedOrder); - } - - /** - * Sort order for page and column statistics. Types are associated with sort - * orders (e.g., UTF8 columns should use UNSIGNED) and column stats are - * aggregated using a sort order. As of parquet-format version 2.3.1, the - * order used to aggregate stats is always SIGNED and is not stored in the - * Parquet file. These stats are discarded for types that need unsigned. - * - * See PARQUET-686. - */ - enum SortOrder { - SIGNED, - UNSIGNED, - UNKNOWN - } - - private static final Set STRING_TYPES = Collections - .unmodifiableSet(new HashSet<>(Arrays.asList( - LogicalTypeAnnotation.StringLogicalTypeAnnotation.class, - LogicalTypeAnnotation.EnumLogicalTypeAnnotation.class, - LogicalTypeAnnotation.JsonLogicalTypeAnnotation.class - ))); - - /** - * Returns whether to use signed order min and max with a type. It is safe to - * use signed min and max when the type is a string type and contains only - * ASCII characters (where the sign bit was 0). 
This checks whether the type - * is a string type and uses {@code useSignedStringMinMax} to determine if - * only ASCII characters were written. - * - * @param type a primitive type with a logical type annotation - * @return true if signed order min/max can be used with this type - */ - private boolean overrideSortOrderToSigned(PrimitiveType type) { - // even if the override is set, only return stats for string-ish types - // a null type annotation is considered string-ish because some writers - // failed to use the UTF8 annotation. - LogicalTypeAnnotation annotation = type.getLogicalTypeAnnotation(); - return useSignedStringMinMax && - PrimitiveTypeName.BINARY == type.getPrimitiveTypeName() && - (annotation == null || STRING_TYPES.contains(annotation.getClass())); - } - - /** - * @param primitive a primitive physical type - * @return the default sort order used when the logical type is not known - */ - private static SortOrder defaultSortOrder(PrimitiveTypeName primitive) { - switch (primitive) { - case BOOLEAN: - case INT32: - case INT64: - case FLOAT: - case DOUBLE: - return SortOrder.SIGNED; - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - return SortOrder.UNSIGNED; - } - return SortOrder.UNKNOWN; - } - - /** - * @param primitive a primitive type with a logical type annotation - * @return the "correct" sort order of the type that applications assume - */ - private static SortOrder sortOrder(PrimitiveType primitive) { - LogicalTypeAnnotation annotation = primitive.getLogicalTypeAnnotation(); - if (annotation != null) { - return annotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor() { - @Override - public Optional visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { - return intLogicalType.isSigned() ? of(SortOrder.SIGNED) : of(SortOrder.UNSIGNED); - } - - @Override - public Optional visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) { - return of(SortOrder.UNKNOWN); - } - - @Override - public Optional visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) { - return of(SortOrder.SIGNED); - } - - @Override - public Optional visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) { - return of(SortOrder.UNSIGNED); - } - - @Override - public Optional visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { - return of(SortOrder.UNSIGNED); - } - - @Override - public Optional visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { - return of(SortOrder.UNSIGNED); - } - - @Override - public Optional visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { - return of(SortOrder.UNSIGNED); - } - - @Override - public Optional visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) { - return of(SortOrder.UNKNOWN); - } - - @Override - public Optional visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation mapKeyValueLogicalType) { - return of(SortOrder.UNKNOWN); - } - - @Override - public Optional visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) { - return of(SortOrder.UNKNOWN); - } - - @Override - public Optional visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) { - return of(SortOrder.UNKNOWN); - } - - @Override - public Optional visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { - return of(SortOrder.SIGNED); - } - - @Override - public Optional visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { - return of(SortOrder.SIGNED); - } - 
}).orElse(defaultSortOrder(primitive.getPrimitiveTypeName())); - } - - return defaultSortOrder(primitive.getPrimitiveTypeName()); - } - - public PrimitiveTypeName getPrimitive(Type type) { - switch (type) { - case BYTE_ARRAY: // TODO: rename BINARY and remove this switch - return PrimitiveTypeName.BINARY; - case INT64: - return PrimitiveTypeName.INT64; - case INT32: - return PrimitiveTypeName.INT32; - case BOOLEAN: - return PrimitiveTypeName.BOOLEAN; - case FLOAT: - return PrimitiveTypeName.FLOAT; - case DOUBLE: - return PrimitiveTypeName.DOUBLE; - case INT96: - return PrimitiveTypeName.INT96; - case FIXED_LEN_BYTE_ARRAY: - return PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; - default: - throw new RuntimeException("Unknown type " + type); - } - } - - // Visible for testing - Type getType(PrimitiveTypeName type) { - switch (type) { - case INT64: - return Type.INT64; - case INT32: - return Type.INT32; - case BOOLEAN: - return Type.BOOLEAN; - case BINARY: - return Type.BYTE_ARRAY; - case FLOAT: - return Type.FLOAT; - case DOUBLE: - return Type.DOUBLE; - case INT96: - return Type.INT96; - case FIXED_LEN_BYTE_ARRAY: - return Type.FIXED_LEN_BYTE_ARRAY; - default: - throw new RuntimeException("Unknown primitive type " + type); - } - } - - // Visible for testing - LogicalTypeAnnotation getLogicalTypeAnnotation(ConvertedType type, SchemaElement schemaElement) { - switch (type) { - case UTF8: - return LogicalTypeAnnotation.stringType(); - case MAP: - return LogicalTypeAnnotation.mapType(); - case MAP_KEY_VALUE: - return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance(); - case LIST: - return LogicalTypeAnnotation.listType(); - case ENUM: - return LogicalTypeAnnotation.enumType(); - case DECIMAL: - int scale = (schemaElement == null ? 0 : schemaElement.scale); - int precision = (schemaElement == null ? 
0 : schemaElement.precision); - return LogicalTypeAnnotation.decimalType(scale, precision); - case DATE: - return LogicalTypeAnnotation.dateType(); - case TIME_MILLIS: - return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS); - case TIME_MICROS: - return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS); - case TIMESTAMP_MILLIS: - return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS); - case TIMESTAMP_MICROS: - return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS); - case INTERVAL: - return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance(); - case INT_8: - return LogicalTypeAnnotation.intType(8, true); - case INT_16: - return LogicalTypeAnnotation.intType(16, true); - case INT_32: - return LogicalTypeAnnotation.intType(32, true); - case INT_64: - return LogicalTypeAnnotation.intType(64, true); - case UINT_8: - return LogicalTypeAnnotation.intType(8, false); - case UINT_16: - return LogicalTypeAnnotation.intType(16, false); - case UINT_32: - return LogicalTypeAnnotation.intType(32, false); - case UINT_64: - return LogicalTypeAnnotation.intType(64, false); - case JSON: - return LogicalTypeAnnotation.jsonType(); - case BSON: - return LogicalTypeAnnotation.bsonType(); - default: - throw new RuntimeException("Can't convert converted type to logical type, unknown converted type " + type); - } - } - - LogicalTypeAnnotation getLogicalTypeAnnotation(LogicalType type) { - switch (type.getSetField()) { - case MAP: - return LogicalTypeAnnotation.mapType(); - case BSON: - return LogicalTypeAnnotation.bsonType(); - case DATE: - return LogicalTypeAnnotation.dateType(); - case ENUM: - return LogicalTypeAnnotation.enumType(); - case JSON: - return LogicalTypeAnnotation.jsonType(); - case LIST: - return LogicalTypeAnnotation.listType(); - case TIME: - TimeType time = type.getTIME(); - return LogicalTypeAnnotation.timeType(time.isAdjustedToUTC, convertTimeUnit(time.unit)); - case STRING: - return LogicalTypeAnnotation.stringType(); - case DECIMAL: - DecimalType decimal = type.getDECIMAL(); - return LogicalTypeAnnotation.decimalType(decimal.scale, decimal.precision); - case INTEGER: - IntType integer = type.getINTEGER(); - return LogicalTypeAnnotation.intType(integer.bitWidth, integer.isSigned); - case UNKNOWN: - return null; - case TIMESTAMP: - TimestampType timestamp = type.getTIMESTAMP(); - return LogicalTypeAnnotation.timestampType(timestamp.isAdjustedToUTC, convertTimeUnit(timestamp.unit)); - default: - throw new RuntimeException("Unknown logical type " + type); - } - } - - private LogicalTypeAnnotation.TimeUnit convertTimeUnit(TimeUnit unit) { - switch (unit.getSetField()) { - case MICROS: - return LogicalTypeAnnotation.TimeUnit.MICROS; - case MILLIS: - return LogicalTypeAnnotation.TimeUnit.MILLIS; - case NANOS: - return LogicalTypeAnnotation.TimeUnit.NANOS; - default: - throw new RuntimeException("Unknown time unit " + unit); - } - } - - private static void addKeyValue(FileMetaData fileMetaData, String key, String value) { - KeyValue keyValue = new KeyValue(key); - keyValue.value = value; - fileMetaData.addToKey_value_metadata(keyValue); - } - - private static interface MetadataFilterVisitor { - T visit(NoFilter filter) throws E; - T visit(SkipMetadataFilter filter) throws E; - T visit(RangeMetadataFilter filter) throws E; - T visit(OffsetMetadataFilter filter) throws E; - } - - public abstract static class MetadataFilter { - private MetadataFilter() {} - abstract T 
accept(MetadataFilterVisitor visitor) throws E; - } - - /** - * [ startOffset, endOffset ) - * @param startOffset a start offset (inclusive) - * @param endOffset an end offset (exclusive) - * @return a range filter from the offsets - */ - public static MetadataFilter range(long startOffset, long endOffset) { - return new RangeMetadataFilter(startOffset, endOffset); - } - - public static MetadataFilter offsets(long... offsets) { - Set set = new HashSet(); - for (long offset : offsets) { - set.add(offset); - } - return new OffsetMetadataFilter(set); - } - - private static final class NoFilter extends MetadataFilter { - private NoFilter() {} - @Override - T accept(MetadataFilterVisitor visitor) throws E { - return visitor.visit(this); - } - @Override - public String toString() { - return "NO_FILTER"; - } - } - private static final class SkipMetadataFilter extends MetadataFilter { - private SkipMetadataFilter() {} - @Override - T accept(MetadataFilterVisitor visitor) throws E { - return visitor.visit(this); - } - @Override - public String toString() { - return "SKIP_ROW_GROUPS"; - } - } - - /** - * [ startOffset, endOffset ) - */ - // Visible for testing - static final class RangeMetadataFilter extends MetadataFilter { - final long startOffset; - final long endOffset; - - RangeMetadataFilter(long startOffset, long endOffset) { - super(); - this.startOffset = startOffset; - this.endOffset = endOffset; - } - - @Override - T accept(MetadataFilterVisitor visitor) throws E { - return visitor.visit(this); - } - - public boolean contains(long offset) { - return offset >= this.startOffset && offset < this.endOffset; - } - - @Override - public String toString() { - return "range(s:" + startOffset + ", e:" + endOffset + ")"; - } - } - - static final class OffsetMetadataFilter extends MetadataFilter { - private final Set offsets; - - public OffsetMetadataFilter(Set offsets) { - this.offsets = offsets; - } - - public boolean contains(long offset) { - return offsets.contains(offset); - } - - @Override - T accept(MetadataFilterVisitor visitor) throws E { - return visitor.visit(this); - } - } - - @Deprecated - public ParquetMetadata readParquetMetadata(InputStream from) throws IOException { - return readParquetMetadata(from, NO_FILTER); - } - - // Visible for testing - static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) { - List rowGroups = metaData.getRow_groups(); - List newRowGroups = new ArrayList(); - for (RowGroup rowGroup : rowGroups) { - long totalSize = 0; - long startIndex = getOffset(rowGroup.getColumns().get(0)); - for (ColumnChunk col : rowGroup.getColumns()) { - totalSize += col.getMeta_data().getTotal_compressed_size(); - } - long midPoint = startIndex + totalSize / 2; - if (filter.contains(midPoint)) { - newRowGroups.add(rowGroup); - } - } - metaData.setRow_groups(newRowGroups); - return metaData; - } - - // Visible for testing - static FileMetaData filterFileMetaDataByStart(FileMetaData metaData, OffsetMetadataFilter filter) { - List rowGroups = metaData.getRow_groups(); - List newRowGroups = new ArrayList(); - for (RowGroup rowGroup : rowGroups) { - long startIndex = getOffset(rowGroup.getColumns().get(0)); - if (filter.contains(startIndex)) { - newRowGroups.add(rowGroup); - } - } - metaData.setRow_groups(newRowGroups); - return metaData; - } - - static long getOffset(RowGroup rowGroup) { - return getOffset(rowGroup.getColumns().get(0)); - } - // Visible for testing - static long getOffset(ColumnChunk columnChunk) { - ColumnMetaData md = 
-    ColumnMetaData md = columnChunk.getMeta_data();
-    long offset = md.getData_page_offset();
-    if (md.isSetDictionary_page_offset() && offset > md.getDictionary_page_offset()) {
-      offset = md.getDictionary_page_offset();
-    }
-    return offset;
-  }
-
-  public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
-    FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
-      @Override
-      public FileMetaData visit(NoFilter filter) throws IOException {
-        return readFileMetaData(from);
-      }
-
-      @Override
-      public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
-        return readFileMetaData(from, true);
-      }
-
-      @Override
-      public FileMetaData visit(OffsetMetadataFilter filter) throws IOException {
-        return filterFileMetaDataByStart(readFileMetaData(from), filter);
-      }
-
-      @Override
-      public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
-        return filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
-      }
-    });
-    LOG.debug("{}", fileMetaData);
-    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
-    if (LOG.isDebugEnabled()) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
-    return parquetMetadata;
-  }
-
-  public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException {
-    MessageType messageType = fromParquetSchema(parquetMetadata.getSchema(), parquetMetadata.getColumn_orders());
-    List<BlockMetaData> blocks = new ArrayList<>();
-    List<RowGroup> row_groups = parquetMetadata.getRow_groups();
-    if (row_groups != null) {
-      for (RowGroup rowGroup : row_groups) {
-        BlockMetaData blockMetaData = new BlockMetaData();
-        blockMetaData.setRowCount(rowGroup.getNum_rows());
-        blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
-        List<ColumnChunk> columns = rowGroup.getColumns();
-        String filePath = columns.get(0).getFile_path();
-        for (ColumnChunk columnChunk : columns) {
-          if ((filePath == null && columnChunk.getFile_path() != null)
-              || (filePath != null && !filePath.equals(columnChunk.getFile_path()))) {
-            throw new ParquetDecodingException("all column chunks of the same row group must be in the same file for now");
-          }
-          ColumnMetaData metaData = columnChunk.meta_data;
-          ColumnPath path = getPath(metaData);
-          ColumnChunkMetaData column = ColumnChunkMetaData.get(
-              path,
-              messageType.getType(path.toArray()).asPrimitiveType(),
-              fromFormatCodec(metaData.codec),
-              convertEncodingStats(metaData.getEncoding_stats()),
-              fromFormatEncodings(metaData.encodings),
-              fromParquetStatistics(
-                  parquetMetadata.getCreated_by(),
-                  metaData.statistics,
-                  messageType.getType(path.toArray()).asPrimitiveType()),
-              metaData.data_page_offset,
-              metaData.dictionary_page_offset,
-              metaData.num_values,
-              metaData.total_compressed_size,
-              metaData.total_uncompressed_size);
-          column.setColumnIndexReference(toColumnIndexReference(columnChunk));
-          column.setOffsetIndexReference(toOffsetIndexReference(columnChunk));
-          // TODO
-          // index_page_offset
-          // key_value_metadata
-          blockMetaData.addColumn(column);
-        }
-        blockMetaData.setPath(filePath);
-        blocks.add(blockMetaData);
-      }
-    }
-    Map<String, String> keyValueMetaData = new HashMap<>();
-    List<KeyValue> key_value_metadata = parquetMetadata.getKey_value_metadata();
-    if (key_value_metadata != null) {
-      for (KeyValue keyValue : key_value_metadata) {
-        keyValueMetaData.put(keyValue.key, keyValue.value);
-      }
-    }
-    return new ParquetMetadata(
-        new org.apache.parquet.hadoop.metadata.FileMetaData(messageType, keyValueMetaData, parquetMetadata.getCreated_by()),
-        blocks);
-  }
-
-  private static IndexReference toColumnIndexReference(ColumnChunk columnChunk) {
-    if (columnChunk.isSetColumn_index_offset() && columnChunk.isSetColumn_index_length()) {
-      return new IndexReference(columnChunk.getColumn_index_offset(), columnChunk.getColumn_index_length());
-    }
-    return null;
-  }
-
-  private static IndexReference toOffsetIndexReference(ColumnChunk columnChunk) {
-    if (columnChunk.isSetOffset_index_offset() && columnChunk.isSetOffset_index_length()) {
-      return new IndexReference(columnChunk.getOffset_index_offset(), columnChunk.getOffset_index_length());
-    }
-    return null;
-  }
-
-  private static ColumnPath getPath(ColumnMetaData metaData) {
-    String[] path = metaData.path_in_schema.toArray(new String[metaData.path_in_schema.size()]);
-    return ColumnPath.get(path);
-  }
-
-  // Visible for testing
-  MessageType fromParquetSchema(List<SchemaElement> schema, List<ColumnOrder> columnOrders) {
-    Iterator<SchemaElement> iterator = schema.iterator();
-    SchemaElement root = iterator.next();
-    Types.MessageTypeBuilder builder = Types.buildMessage();
-    if (root.isSetField_id()) {
-      builder.id(root.field_id);
-    }
-    buildChildren(builder, iterator, root.getNum_children(), columnOrders, 0);
-    return builder.named(root.name);
-  }
-
-  private void buildChildren(Types.GroupBuilder builder,
-                             Iterator<SchemaElement> schema,
-                             int childrenCount,
-                             List<ColumnOrder> columnOrders,
-                             int columnCount) {
-    for (int i = 0; i < childrenCount; i++) {
-      SchemaElement schemaElement = schema.next();
-
-      // Create Parquet Type.
-      Types.Builder childBuilder;
-      if (schemaElement.type != null) {
-        Types.PrimitiveBuilder primitiveBuilder = builder.primitive(
-            getPrimitive(schemaElement.type),
-            fromParquetRepetition(schemaElement.repetition_type));
-        if (schemaElement.isSetType_length()) {
-          primitiveBuilder.length(schemaElement.type_length);
-        }
-        if (schemaElement.isSetPrecision()) {
-          primitiveBuilder.precision(schemaElement.precision);
-        }
-        if (schemaElement.isSetScale()) {
-          primitiveBuilder.scale(schemaElement.scale);
-        }
-        if (columnOrders != null) {
-          org.apache.parquet.schema.ColumnOrder columnOrder = fromParquetColumnOrder(columnOrders.get(columnCount));
-          // As per parquet format 2.4.0 no UNDEFINED order is supported. So, set undefined column order for the types
-          // where ordering is not supported.
-          if (columnOrder.getColumnOrderName() == ColumnOrderName.TYPE_DEFINED_ORDER
-              && (schemaElement.type == Type.INT96 || schemaElement.converted_type == ConvertedType.INTERVAL)) {
-            columnOrder = org.apache.parquet.schema.ColumnOrder.undefined();
-          }
-          primitiveBuilder.columnOrder(columnOrder);
-        }
-        childBuilder = primitiveBuilder;
-
-      } else {
-        childBuilder = builder.group(fromParquetRepetition(schemaElement.repetition_type));
-        buildChildren((Types.GroupBuilder) childBuilder, schema, schemaElement.num_children, columnOrders, columnCount);
-      }
-
-      if (schemaElement.isSetLogicalType()) {
-        childBuilder.as(getLogicalTypeAnnotation(schemaElement.logicalType));
-      }
-      if (schemaElement.isSetConverted_type()) {
-        OriginalType originalType = getLogicalTypeAnnotation(schemaElement.converted_type, schemaElement).toOriginalType();
-        OriginalType newOriginalType = (schemaElement.isSetLogicalType() && getLogicalTypeAnnotation(schemaElement.logicalType) != null) ?
-            getLogicalTypeAnnotation(schemaElement.logicalType).toOriginalType() : null;
-        if (!originalType.equals(newOriginalType)) {
-          if (newOriginalType != null) {
-            LOG.warn("Converted type and logical type metadata mismatch (convertedType: {}, logical type: {}). Using value in converted type.",
-                schemaElement.converted_type, schemaElement.logicalType);
-          }
-          childBuilder.as(originalType);
-        }
-      }
-      if (schemaElement.isSetField_id()) {
-        childBuilder.id(schemaElement.field_id);
-      }
-
-      childBuilder.named(schemaElement.name);
-      ++columnCount;
-    }
-  }
-
-  // Visible for testing
-  FieldRepetitionType toParquetRepetition(Repetition repetition) {
-    return FieldRepetitionType.valueOf(repetition.name());
-  }
-
-  // Visible for testing
-  Repetition fromParquetRepetition(FieldRepetitionType repetition) {
-    return Repetition.valueOf(repetition.name());
-  }
-
-  private static org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(ColumnOrder columnOrder) {
-    if (columnOrder.isSetTYPE_ORDER()) {
-      return org.apache.parquet.schema.ColumnOrder.typeDefined();
-    }
-    // The column order is not yet supported by this API
-    return org.apache.parquet.schema.ColumnOrder.undefined();
-  }
-
-  @Deprecated
-  public void writeDataPageHeader(
-      int uncompressedSize,
-      int compressedSize,
-      int valueCount,
-      org.apache.parquet.column.Encoding rlEncoding,
-      org.apache.parquet.column.Encoding dlEncoding,
-      org.apache.parquet.column.Encoding valuesEncoding,
-      OutputStream to) throws IOException {
-    writePageHeader(newDataPageHeader(uncompressedSize,
-        compressedSize,
-        valueCount,
-        rlEncoding,
-        dlEncoding,
-        valuesEncoding), to);
-  }
-
-  // Statistics are no longer saved in page headers
-  @Deprecated
-  public void writeDataPageHeader(
-      int uncompressedSize,
-      int compressedSize,
-      int valueCount,
-      org.apache.parquet.column.statistics.Statistics statistics,
-      org.apache.parquet.column.Encoding rlEncoding,
-      org.apache.parquet.column.Encoding dlEncoding,
-      org.apache.parquet.column.Encoding valuesEncoding,
-      OutputStream to) throws IOException {
-    writePageHeader(
-        newDataPageHeader(uncompressedSize, compressedSize, valueCount,
-            rlEncoding, dlEncoding, valuesEncoding),
-        to);
-  }
-
-  private PageHeader newDataPageHeader(
-      int uncompressedSize, int compressedSize,
-      int valueCount,
-      org.apache.parquet.column.Encoding rlEncoding,
-      org.apache.parquet.column.Encoding dlEncoding,
-      org.apache.parquet.column.Encoding valuesEncoding) {
-    PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize);
-    // TODO: pageHeader.crc = ...;
-    pageHeader.setData_page_header(new DataPageHeader(
-        valueCount,
-        getEncoding(valuesEncoding),
-        getEncoding(dlEncoding),
-        getEncoding(rlEncoding)));
-    return pageHeader;
-  }
-
-  // Statistics are no longer saved in page headers
-  @Deprecated
-  public void writeDataPageV2Header(
-      int uncompressedSize, int compressedSize,
-      int valueCount, int nullCount, int rowCount,
-      org.apache.parquet.column.statistics.Statistics statistics,
-      org.apache.parquet.column.Encoding dataEncoding,
-      int rlByteLength, int dlByteLength,
-      OutputStream to) throws IOException {
-    writePageHeader(
-        newDataPageV2Header(
-            uncompressedSize, compressedSize,
-            valueCount, nullCount, rowCount,
-            dataEncoding,
-            rlByteLength, dlByteLength), to);
-  }
-
-  public void writeDataPageV1Header(
-      int uncompressedSize,
-      int compressedSize,
-      int valueCount,
-      org.apache.parquet.column.Encoding rlEncoding,
-      org.apache.parquet.column.Encoding dlEncoding,
-      org.apache.parquet.column.Encoding valuesEncoding,
-      OutputStream to) throws IOException {
-    writePageHeader(newDataPageHeader(uncompressedSize,
-        compressedSize,
-        valueCount,
-        rlEncoding,
-        dlEncoding,
-        valuesEncoding), to);
-  }
-
-  public void writeDataPageV2Header(
-      int uncompressedSize,
-      int compressedSize,
-      int valueCount, int nullCount, int rowCount,
-      org.apache.parquet.column.Encoding dataEncoding,
-      int rlByteLength, int dlByteLength,
-      OutputStream to) throws IOException {
-    writePageHeader(
-        newDataPageV2Header(
-            uncompressedSize, compressedSize,
-            valueCount, nullCount, rowCount,
-            dataEncoding,
-            rlByteLength, dlByteLength), to);
-  }
-
-  private PageHeader newDataPageV2Header(
-      int uncompressedSize, int compressedSize,
-      int valueCount, int nullCount, int rowCount,
-      org.apache.parquet.column.Encoding dataEncoding,
-      int rlByteLength, int dlByteLength) {
-    // TODO: pageHeader.crc = ...;
-    DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
-        valueCount, nullCount, rowCount,
-        getEncoding(dataEncoding),
-        dlByteLength, rlByteLength);
-    PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
-    pageHeader.setData_page_header_v2(dataPageHeaderV2);
-    return pageHeader;
-  }
-
-  public void writeDictionaryPageHeader(
-      int uncompressedSize, int compressedSize, int valueCount,
-      org.apache.parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
-    PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
-    pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
-    writePageHeader(pageHeader, to);
-  }
-
-  private static BoundaryOrder toParquetBoundaryOrder(
-      org.apache.parquet.internal.column.columnindex.BoundaryOrder boundaryOrder) {
-    switch (boundaryOrder) {
-      case ASCENDING:
-        return BoundaryOrder.ASCENDING;
-      case DESCENDING:
-        return BoundaryOrder.DESCENDING;
-      case UNORDERED:
-        return BoundaryOrder.UNORDERED;
-      default:
-        throw new IllegalArgumentException("Unsupported boundary order: " + boundaryOrder);
-    }
-  }
-
-  private static org.apache.parquet.internal.column.columnindex.BoundaryOrder fromParquetBoundaryOrder(
-      BoundaryOrder boundaryOrder) {
-    switch (boundaryOrder) {
-      case ASCENDING:
-        return org.apache.parquet.internal.column.columnindex.BoundaryOrder.ASCENDING;
-      case DESCENDING:
-        return org.apache.parquet.internal.column.columnindex.BoundaryOrder.DESCENDING;
-      case UNORDERED:
-        return org.apache.parquet.internal.column.columnindex.BoundaryOrder.UNORDERED;
-      default:
-        throw new IllegalArgumentException("Unsupported boundary order: " + boundaryOrder);
-    }
-  }
-
-  public static ColumnIndex toParquetColumnIndex(PrimitiveType type,
-      org.apache.parquet.internal.column.columnindex.ColumnIndex columnIndex) {
-    if (!isMinMaxStatsSupported(type) || columnIndex == null) {
-      return null;
-    }
-    ColumnIndex parquetColumnIndex = new ColumnIndex(
-        columnIndex.getNullPages(),
-        columnIndex.getMinValues(),
-        columnIndex.getMaxValues(),
-        toParquetBoundaryOrder(columnIndex.getBoundaryOrder()));
-    parquetColumnIndex.setNull_counts(columnIndex.getNullCounts());
-    return parquetColumnIndex;
-  }
-
-  public static org.apache.parquet.internal.column.columnindex.ColumnIndex fromParquetColumnIndex(PrimitiveType type,
-      ColumnIndex parquetColumnIndex) {
-    if (!isMinMaxStatsSupported(type)) {
-      return null;
-    }
-    return ColumnIndexBuilder.build(type,
-        fromParquetBoundaryOrder(parquetColumnIndex.getBoundary_order()),
-        parquetColumnIndex.getNull_pages(),
-        parquetColumnIndex.getNull_counts(),
-        parquetColumnIndex.getMin_values(),
-        parquetColumnIndex.getMax_values());
-  }
-
-  public static OffsetIndex toParquetOffsetIndex(org.apache.parquet.internal.column.columnindex.OffsetIndex offsetIndex) {
-    List<PageLocation> pageLocations = new ArrayList<>(offsetIndex.getPageCount());
-    for (int i = 0, n = offsetIndex.getPageCount(); i < n; ++i) {
-      pageLocations.add(new PageLocation(
-          offsetIndex.getOffset(i),
-          offsetIndex.getCompressedPageSize(i),
-          offsetIndex.getFirstRowIndex(i)));
-    }
-    return new OffsetIndex(pageLocations);
-  }
-
-  public static org.apache.parquet.internal.column.columnindex.OffsetIndex fromParquetOffsetIndex(
-      OffsetIndex parquetOffsetIndex) {
-    OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder();
-    for (PageLocation pageLocation : parquetOffsetIndex.getPage_locations()) {
-      builder.add(pageLocation.getOffset(), pageLocation.getCompressed_page_size(), pageLocation.getFirst_row_index());
-    }
-    return builder.build();
-  }
-}
\ No newline at end of file
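With the tempfix copy deleted, footer reading goes through the converter that ships in parquet-hadoop 1.13.0. A minimal sketch of that usage, against the public parquet-hadoop API only; the ReadFooterExample class name is illustrative and not part of this patch:

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.parquet.format.converter.ParquetMetadataConverter;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;

    public final class ReadFooterExample {
        // Reads a Parquet footer from a stream already positioned at the footer bytes.
        public static ParquetMetadata readFooter(InputStream footer) throws IOException {
            ParquetMetadataConverter converter = new ParquetMetadataConverter();
            // NO_FILTER keeps every row group; range(start, end) would instead keep row groups
            // whose midpoint falls in [start, end), as filterFileMetaDataByMidpoint above shows.
            return converter.readParquetMetadata(footer, ParquetMetadataConverter.NO_FILTER);
        }
    }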
diff --git a/buildSrc/src/main/groovy/Classpaths.groovy b/buildSrc/src/main/groovy/Classpaths.groovy
index db7ce197d3d..f67907c5e40 100644
--- a/buildSrc/src/main/groovy/Classpaths.groovy
+++ b/buildSrc/src/main/groovy/Classpaths.groovy
@@ -287,4 +287,32 @@ class Classpaths {
         Configuration config = p.configurations.getByName(configName)
         addDependency(config, GUAVA_GROUP, GUAVA_NAME, GUAVA_VERSION)
     }
+
+    static void inheritParquetHadoop(Project p, String configName = JavaPlugin.IMPLEMENTATION_CONFIGURATION_NAME) {
+        Configuration config = p.configurations.getByName(configName)
+        addDependency(config, 'org.apache.parquet', 'parquet-hadoop', '1.13.0')
+    }
+
+    /** configName controls only the Configuration's classpath, all transitive dependencies are runtimeOnly */
+    static void inheritParquetHadoopConfiguration(Project p, String configName = JavaPlugin.IMPLEMENTATION_CONFIGURATION_NAME) {
+        Configuration config = p.configurations.getByName(configName)
+        addDependency(config, 'org.apache.hadoop', 'hadoop-common', '3.3.3') {
+            it.setTransitive(false)
+            // Do not take any extra dependencies of this project transitively. We just want a few classes for
+            // configuration and compression codecs. For any additional required dependencies, add them separately, as
+            // done for woodstox, shaded-guava, etc. below. Or we can replace setTransitive(false) here with more
+            // exclusions (we want to avoid pulling in netty, loggers, jetty-util, guice and asm).
+        }
+
+        Configuration runtimeOnly = p.configurations.getByName(JavaPlugin.RUNTIME_ONLY_CONFIGURATION_NAME)
+        addDependency(runtimeOnly, 'com.fasterxml.woodstox', 'woodstox-core', '6.4.0') {
+            it.because('hadoop-common required dependency for Configuration')
+        }
+        addDependency(runtimeOnly, 'org.apache.hadoop.thirdparty', 'hadoop-shaded-guava', '1.1.1') {
+            it.because('hadoop-common required dependency for Configuration')
+        }
+        addDependency(runtimeOnly, 'commons-collections', 'commons-collections', '3.2.2') {
+            it.because('hadoop-common required dependency for Configuration')
+        }
+    }
 }
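A module opts in from its build.gradle by calling the helpers above. A hypothetical consumer, shown only to illustrate the intended call sites (the real consumers appear in the hunks below):

    plugins {
        id 'java-library'
    }

    dependencies {
        // Puts org.apache.parquet:parquet-hadoop:1.13.0 on this module's implementation classpath.
        Classpaths.inheritParquetHadoop(project)
        // Adds hadoop-common 3.3.3 non-transitively, plus the three runtimeOnly companions
        // (woodstox-core, hadoop-shaded-guava, commons-collections) it needs for Configuration.
        Classpaths.inheritParquetHadoopConfiguration(project)
    }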
diff --git a/extensions/parquet/base/build.gradle b/extensions/parquet/base/build.gradle
index 82099cf0676..b6871736347 100644
--- a/extensions/parquet/base/build.gradle
+++ b/extensions/parquet/base/build.gradle
@@ -6,7 +6,7 @@ plugins {
 description 'Parquet Base: Libraries for working with Parquet files'
 
 dependencies {
-    api project(':ParquetHadoop')
+    Classpaths.inheritParquetHadoop(project)
 
     implementation project(':extensions-parquet-compression')
     implementation project(':Base')
diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java
index 41f84f3cffc..d06ffe1cf8d 100644
--- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java
@@ -3,7 +3,7 @@
  */
 package io.deephaven.parquet.base;
 
-import io.deephaven.parquet.base.tempfix.ParquetMetadataConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import io.deephaven.parquet.compress.CompressorAdapter;
 import io.deephaven.util.QueryConstants;
 import org.apache.parquet.bytes.ByteBufferAllocator;
diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java
index 5ff4f393fcb..b9f6a37b170 100644
--- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java
@@ -3,7 +3,7 @@
 */
 package io.deephaven.parquet.base;
 
-import io.deephaven.parquet.base.tempfix.ParquetMetadataConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import io.deephaven.parquet.base.util.SeekableChannelsProvider;
 import io.deephaven.parquet.compress.CompressorAdapter;
 import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory;
diff --git a/extensions/parquet/compression/build.gradle b/extensions/parquet/compression/build.gradle
index 416f9e95591..b71871e5516 100644
--- a/extensions/parquet/compression/build.gradle
+++ b/extensions/parquet/compression/build.gradle
@@ -4,8 +4,11 @@ plugins {
 }
 
 dependencies {
-    api project(':ParquetHadoop'),
-            project(':Util')
+    api project(':Util')
+
+    // TODO(deephaven-core#3148): LZ4_RAW parquet support
+    Classpaths.inheritParquetHadoop(project)
+    Classpaths.inheritParquetHadoopConfiguration(project)
 
     implementation project(':Configuration')
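The compression module needs hadoop-common because parquet-hadoop instantiates its codecs through a hadoop Configuration. A rough sketch of that pattern, assuming only public parquet-hadoop and hadoop-common API; the CodecExample class is illustrative, and Deephaven's actual adapter lives in DeephavenCompressorAdapterFactory:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.hadoop.CodecFactory;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public final class CodecExample {
        // Obtains a SNAPPY compressor via parquet-hadoop's codec factory.
        public static CodecFactory.BytesCompressor snappyCompressor(int pageSize) {
            Configuration conf = new Configuration(false); // no default XML resources required
            return new CodecFactory(conf, pageSize).getCompressor(CompressionCodecName.SNAPPY);
        }
    }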
diff --git a/extensions/parquet/table/build.gradle b/extensions/parquet/table/build.gradle
index 68cef5f870b..98a40a10546 100644
--- a/extensions/parquet/table/build.gradle
+++ b/extensions/parquet/table/build.gradle
@@ -19,6 +19,8 @@ dependencies {
     api project(':engine-stringset')
 
     implementation project(':extensions-parquet-base')
+    Classpaths.inheritParquetHadoop(project)
+
     implementation project(':engine-base')
     implementation project(':engine-table')
     implementation project(':extensions-csv')
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java
index bd6cafb4ec5..cf9ac9d15e7 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java
@@ -10,7 +10,7 @@
 import io.deephaven.parquet.table.metadata.ColumnTypeInfo;
 import io.deephaven.parquet.table.metadata.TableInfo;
 import io.deephaven.parquet.base.ParquetFileReader;
-import io.deephaven.parquet.base.tempfix.ParquetMetadataConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import io.deephaven.util.codec.SimpleByteArrayCodec;
 import io.deephaven.util.codec.UTF8StringAsByteArrayCodec;
 import org.apache.commons.lang3.mutable.MutableObject;
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
index 33b81f54fa5..6bfe5ed8673 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
@@ -36,7 +36,7 @@
 import io.deephaven.internal.log.LoggerFactory;
 import io.deephaven.io.logger.Logger;
 import io.deephaven.parquet.base.ParquetFileReader;
-import io.deephaven.parquet.base.tempfix.ParquetMetadataConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import io.deephaven.parquet.base.util.CachedChannelProvider;
 import io.deephaven.util.annotations.VisibleForTesting;
 import org.apache.commons.lang3.mutable.MutableObject;
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java
index 95be66585ae..b1720cdbdce 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java
@@ -15,7 +15,7 @@
 import io.deephaven.parquet.table.location.ParquetTableLocationKey;
 import io.deephaven.parquet.table.ParquetInstructions;
 import io.deephaven.parquet.base.ParquetFileReader;
-import io.deephaven.parquet.base.tempfix.ParquetMetadataConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import io.deephaven.util.type.TypeUtils;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.parquet.format.RowGroup;
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java
index 7b6aec79022..3ac34501aeb 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java
@@ -26,7 +26,7 @@
 import io.deephaven.parquet.base.ColumnChunkReader;
 import io.deephaven.parquet.base.ParquetFileReader;
 import io.deephaven.parquet.base.RowGroupReader;
-import io.deephaven.parquet.base.tempfix.ParquetMetadataConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import io.deephaven.parquet.table.*;
 import io.deephaven.parquet.table.metadata.CodecInfo;
 import io.deephaven.parquet.table.metadata.ColumnTypeInfo;
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java
index 8e1292be3d6..1925250eb8e 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java
@@ -9,7 +9,7 @@
 import io.deephaven.engine.table.impl.locations.local.FileTableLocationKey;
 import io.deephaven.parquet.table.ParquetTableWriter;
 import io.deephaven.parquet.base.ParquetFileReader;
-import io.deephaven.parquet.base.tempfix.ParquetMetadataConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.format.RowGroup;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.jetbrains.annotations.NotNull;
diff --git a/settings.gradle b/settings.gradle
index c1ad45d0946..3e16c0846e1 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -144,8 +144,6 @@ include(':Stats')
 
 include(':Container')
 
-include(':ParquetHadoop')
-
 include(':codegen')
 
 include(':cpp-client')