From badd21588ed13de16a6d7907aa35033a0bad066b Mon Sep 17 00:00:00 2001
From: Devin Smith
Date: Fri, 15 Nov 2024 10:16:44 -0800
Subject: [PATCH 1/5] feat: Add Parquet field_id writing

This allows for the writing of Parquet column field_ids. This is an
extraction from #6156 (which will need to expose deeper hooks to allow
Iceberg to fully control field_id resolution during reading). This
ensures we can correctly write down Iceberg tables, which must include
field_ids; this is necessary for #5989.

In addition, it was noticed that Parquet ColumnMetaData encodings were
written down in a non-deterministic order due to the use of a HashSet;
this has been updated to an EnumSet to support more consistent Parquet
serialization. This was necessary to test field_id writing.
---
 .../parquet/base/ColumnWriterImpl.java        |   4 +-
 .../parquet/table/ParquetInstructions.java    | 206 +++---
 .../io/deephaven/parquet/table/TypeInfos.java |   8 +-
 .../table/ParquetInstructionsTest.java        | 132 ++++
 .../parquet/table/TestParquetTools.java       | 612 +++++++++++++++++-
 .../src/test/resources/NestedStruct1.parquet  |   3 +
 .../src/test/resources/NestedStruct2.parquet  |   3 +
 .../ReferenceListParquetFieldIds.parquet      |   3 +
 .../Partition=0/table.parquet                 |   3 +
 .../Partition=1/table.parquet                 |   3 +
 .../ReferenceSimpleParquetFieldIds.parquet    |   3 +
 11 files changed, 899 insertions(+), 81 deletions(-)
 create mode 100644 extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java
 create mode 100644 extensions/parquet/table/src/test/resources/NestedStruct1.parquet
 create mode 100644 extensions/parquet/table/src/test/resources/NestedStruct2.parquet
 create mode 100644 extensions/parquet/table/src/test/resources/ReferenceListParquetFieldIds.parquet
 create mode 100644 extensions/parquet/table/src/test/resources/ReferencePartitionedFieldIds/Partition=0/table.parquet
 create mode 100644 extensions/parquet/table/src/test/resources/ReferencePartitionedFieldIds/Partition=1/table.parquet
 create mode 100644 extensions/parquet/table/src/test/resources/ReferenceSimpleParquetFieldIds.parquet

diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java
index 8d3b523af01..32ddc126c54 100644
--- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java
@@ -31,7 +31,7 @@
 import java.nio.IntBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.util.HashSet;
+import java.util.EnumSet;
 import java.util.Set;
 
 import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
@@ -57,7 +57,7 @@ final class ColumnWriterImpl implements ColumnWriter {
     private final RunLengthBitPackingHybridEncoder dlEncoder;
     private final RunLengthBitPackingHybridEncoder rlEncoder;
     private long dictionaryOffset = -1;
-    private final Set encodings = new HashSet<>();
+    private final Set encodings = EnumSet.noneOf(Encoding.class);
     private long firstDataPageOffset = -1;
    private long uncompressedLength;
     private long compressedLength;
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java
index 3df5b7a6e4c..358cbf0c931 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java
+++
b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java @@ -3,16 +3,18 @@ // package io.deephaven.parquet.table; +import io.deephaven.api.util.NameValidator; import io.deephaven.base.verify.Require; import io.deephaven.configuration.Configuration; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.ColumnToCodecMappings; import io.deephaven.hash.KeyedObjectHashMap; import io.deephaven.hash.KeyedObjectKey; +import io.deephaven.hash.KeyedObjectKey.Basic; import io.deephaven.parquet.base.ParquetUtils; +import io.deephaven.util.annotations.InternalUseOnly; import io.deephaven.util.annotations.VisibleForTesting; import org.apache.parquet.hadoop.metadata.CompressionCodecName; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.util.ArrayList; @@ -21,6 +23,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.OptionalInt; import java.util.Set; import java.util.function.Function; import java.util.function.Predicate; @@ -114,6 +117,14 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par */ public abstract boolean useDictionary(String columnName); + /** + * The field ID for the given {@code columnName}. + * + * @param columnName the Deephaven column name + * @return the field id + */ + public abstract OptionalInt getFieldId(final String columnName); + public abstract Object getSpecialInstructions(); public abstract String getCompressionCodecName(); @@ -200,6 +211,7 @@ public static boolean sameColumnNamesAndCodecMappings(final ParquetInstructions } public static final ParquetInstructions EMPTY = new ParquetInstructions() { + @Override public String getParquetColumnNameFromColumnNameOrDefault(final String columnName) { return columnName; @@ -228,6 +240,11 @@ public boolean useDictionary(final String columnName) { return false; } + @Override + public OptionalInt getFieldId(String columnName) { + return OptionalInt.empty(); + } + @Override @Nullable public Object getSpecialInstructions() { @@ -319,14 +336,31 @@ ParquetInstructions withIndexColumns(final Collection> indexColumns }; private static class ColumnInstructions { + + private static final KeyedObjectKey COLUMN_NAME_KEY = new Basic<>() { + @Override + public String getKey(ColumnInstructions columnInstructions) { + return columnInstructions.getColumnName(); + } + }; + + private static final KeyedObjectKey PARQUET_COLUMN_NAME_KEY = new Basic<>() { + @Override + public String getKey(ColumnInstructions columnInstructions) { + return columnInstructions.getParquetColumnName(); + } + }; + private final String columnName; private String parquetColumnName; private String codecName; private String codecArgs; private boolean useDictionary; + private Integer fieldId; public ColumnInstructions(final String columnName) { - this.columnName = columnName; + this.columnName = Objects.requireNonNull(columnName); + NameValidator.validateColumnName(columnName); } public String getColumnName() { @@ -338,6 +372,12 @@ public String getParquetColumnName() { } public ColumnInstructions setParquetColumnName(final String parquetColumnName) { + if (this.parquetColumnName != null && !this.parquetColumnName.equals(parquetColumnName)) { + throw new IllegalArgumentException( + "Cannot add a mapping from parquetColumnName=" + parquetColumnName + + ": columnName=" + columnName + " already mapped to parquetColumnName=" + + this.parquetColumnName); + } this.parquetColumnName = 
parquetColumnName; return this; } @@ -367,6 +407,19 @@ public boolean useDictionary() { public void useDictionary(final boolean useDictionary) { this.useDictionary = useDictionary; } + + public OptionalInt fieldId() { + return fieldId == null ? OptionalInt.empty() : OptionalInt.of(fieldId); + } + + public void setFieldId(final int fieldId) { + if (this.fieldId != null && this.fieldId != fieldId) { + throw new IllegalArgumentException( + String.format("Inconsistent fieldId for columnName=%s, already set fieldId=%d", columnName, + this.fieldId)); + } + this.fieldId = fieldId; + } } private static final class ReadOnly extends ParquetInstructions { @@ -424,8 +477,8 @@ private ReadOnly( .collect(Collectors.toUnmodifiableList()); } - private String getOrDefault(final String columnName, final String defaultValue, - final Function fun) { + private T getOrDefault(final String columnName, final T defaultValue, + final Function fun) { if (columnNameToInstructions == null) { return defaultValue; } @@ -480,6 +533,11 @@ public boolean useDictionary(final String columnName) { return getOrDefault(columnName, false, ColumnInstructions::useDictionary); } + @Override + public OptionalInt getFieldId(String columnName) { + return getOrDefault(columnName, OptionalInt.empty(), ColumnInstructions::fieldId); + } + @Override public String getCompressionCodecName() { return compressionCodecName; @@ -656,75 +714,22 @@ public Builder(final ParquetInstructions parquetInstructions) { indexColumns = readOnlyParquetInstructions.getIndexColumns().orElse(null); } - private void newColumnNameToInstructionsMap() { - columnNameToInstructions = new KeyedObjectHashMap<>(new KeyedObjectKey.Basic<>() { - @Override - public String getKey(@NotNull final ColumnInstructions value) { - return value.getColumnName(); - } - }); - } - - private void newParquetColumnNameToInstructionsMap() { - parquetColumnNameToInstructions = - new KeyedObjectHashMap<>(new KeyedObjectKey.Basic<>() { - @Override - public String getKey(@NotNull final ColumnInstructions value) { - return value.getParquetColumnName(); - } - }); - } - public Builder addColumnNameMapping(final String parquetColumnName, final String columnName) { - if (parquetColumnName.equals(columnName)) { - return this; - } - if (columnNameToInstructions == null) { - newColumnNameToInstructionsMap(); - final ColumnInstructions ci = new ColumnInstructions(columnName); - ci.setParquetColumnName(parquetColumnName); - columnNameToInstructions.put(columnName, ci); - newParquetColumnNameToInstructionsMap(); - parquetColumnNameToInstructions.put(parquetColumnName, ci); - return this; - } - - ColumnInstructions ci = columnNameToInstructions.get(columnName); - if (ci != null) { - if (ci.parquetColumnName != null) { - if (ci.parquetColumnName.equals(parquetColumnName)) { - return this; - } - throw new IllegalArgumentException( - "Cannot add a mapping from parquetColumnName=" + parquetColumnName - + ": columnName=" + columnName + " already mapped to parquetColumnName=" - + ci.parquetColumnName); - } - } else { - ci = new ColumnInstructions(columnName); - columnNameToInstructions.put(columnName, ci); - } - + final ColumnInstructions ci = getOrCreateColumnInstructions(columnName); + ci.setParquetColumnName(parquetColumnName); if (parquetColumnNameToInstructions == null) { - newParquetColumnNameToInstructionsMap(); - parquetColumnNameToInstructions.put(parquetColumnName, ci); - return this; + parquetColumnNameToInstructions = new KeyedObjectHashMap<>(ColumnInstructions.PARQUET_COLUMN_NAME_KEY); } - - final 
ColumnInstructions fromParquetColumnNameInstructions =
-                    parquetColumnNameToInstructions.get(parquetColumnName);
-            if (fromParquetColumnNameInstructions != null) {
-                if (fromParquetColumnNameInstructions == ci) {
-                    return this;
-                }
+            final ColumnInstructions existing = parquetColumnNameToInstructions.putIfAbsent(parquetColumnName, ci);
+            if (existing != null) {
+                // Note: this is a limitation that doesn't need to exist. Technically, we could allow a single physical
+                // parquet column to manifest as multiple Deephaven columns.
                 throw new IllegalArgumentException(
                         "Cannot add new mapping from parquetColumnName=" + parquetColumnName
                                 + " to columnName=" + columnName + ": already mapped to columnName="
-                                + fromParquetColumnNameInstructions.getColumnName());
+                                + existing.getColumnName());
             }
-            ci.setParquetColumnName(parquetColumnName);
-            parquetColumnNameToInstructions.put(parquetColumnName, ci);
             return this;
         }
@@ -737,7 +742,7 @@ public Builder addColumnCodec(final String columnName, final String codecName) {
         }
 
         public Builder addColumnCodec(final String columnName, final String codecName, final String codecArgs) {
-            final ColumnInstructions ci = getColumnInstructions(columnName);
+            final ColumnInstructions ci = getOrCreateColumnInstructions(columnName);
             ci.setCodecName(codecName);
             ci.setCodecArgs(codecArgs);
             return this;
         }
@@ -751,21 +756,72 @@ public Builder addColumnCodec(final String columnName, final String codecName, f
          * @param useDictionary The hint value
          */
         public Builder useDictionary(final String columnName, final boolean useDictionary) {
-            final ColumnInstructions ci = getColumnInstructions(columnName);
+            final ColumnInstructions ci = getOrCreateColumnInstructions(columnName);
             ci.useDictionary(useDictionary);
             return this;
         }
 
-        private ColumnInstructions getColumnInstructions(final String columnName) {
-            final ColumnInstructions ci;
+        // /**
+        // * For reading, provides a mapping between a Deephaven column name and a parquet column by field id. This
+        // allows
+        // * resolving a parquet column where the physical "parquet column name" may not be known a priori by the caller.
+        // * In the case where both a field id mapping and a parquet column name mapping are provided, the field id will
+        // * take precedence over the parquet column name. This may happen in cases where the parquet file is managed by
+        // a
+        // * higher-level schema that has the concept of a "field id"; for example, Iceberg. As documented
+        // * in the parquet format:
+        // *
+        // * 
+        // * When the original schema supports field ids, this will save the original field id in the parquet schema
+        // * 
+ // * + // * In the case where a field id mapping is provided but no matching parquet column is found, the column will + // not + // * be inferred; and in the case where it's explicitly included as part of a + // * {@link #setTableDefinition(TableDefinition)}, the resulting column will contain the appropriate default + // * ({@code null}) values. In the case where there are multiple parquet columns with the same field_id, those + // * parquet columns will not be resolvable via a field id. + // * + // *

+ // * For writing, this will set the {@code field_id} in the proper Parquet {@code SchemaElement}. + // * + // *

+ // * Setting multiple field ids for a single column name is not allowed. + // * + // *

+ // * Field ids are not typically configured by end users. + // * + // * @param columnName the Deephaven column name + // * @param fieldId the field id + // */ + + /** + * This is currently only used for writing, allowing the setting of {@code field_id} in the proper Parquet + * {@code SchemaElement}. + * + *

+ * Setting multiple field ids for a single column name is not allowed. + * + *

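+         * For example, a minimal caller-side sketch (the column name and field id here
+         * are illustrative):
+         *
+         * 
+         * ParquetInstructions.builder()
+         *         .setFieldId("FirstName", 1)
+         *         .build();
+         * 
+ * + *
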
+ * Field ids are not typically configured by end users. + * + * @param columnName the Deephaven column name + * @param fieldId the field id + */ + @InternalUseOnly + public Builder setFieldId(final String columnName, final int fieldId) { + final ColumnInstructions ci = getOrCreateColumnInstructions(columnName); + ci.setFieldId(fieldId); + return this; + } + + private ColumnInstructions getOrCreateColumnInstructions(final String columnName) { if (columnNameToInstructions == null) { - newColumnNameToInstructionsMap(); - ci = new ColumnInstructions(columnName); - columnNameToInstructions.put(columnName, ci); - } else { - ci = columnNameToInstructions.putIfAbsent(columnName, ColumnInstructions::new); + columnNameToInstructions = new KeyedObjectHashMap<>(ColumnInstructions.COLUMN_NAME_KEY); } - return ci; + return columnNameToInstructions.putIfAbsent(columnName, ColumnInstructions::new); } public Builder setCompressionCodecName(final String compressionCodecName) { diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java index b95dfe98412..d380b953353 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java @@ -13,11 +13,14 @@ import io.deephaven.util.codec.SerializableCodec; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; +import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.Repetition; import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.Types.GroupBuilder; import org.apache.parquet.schema.Types.PrimitiveBuilder; import org.jetbrains.annotations.NotNull; @@ -475,9 +478,12 @@ default Type createSchemaType( isRepeating = false; } if (!isRepeating) { + instructions.getFieldId(columnDefinition.getName()).ifPresent(builder::id); return builder.named(parquetColumnName); } - return Types.buildGroup(Type.Repetition.OPTIONAL).addField( + final GroupBuilder groupBuilder = Types.buildGroup(Repetition.OPTIONAL); + instructions.getFieldId(columnDefinition.getName()).ifPresent(groupBuilder::id); + return groupBuilder.addField( Types.buildGroup(Type.Repetition.REPEATED).addField( builder.named("item")).named(parquetColumnName)) .as(LogicalTypeAnnotation.listType()).named(parquetColumnName); diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java new file mode 100644 index 00000000000..8fe5a94e19e --- /dev/null +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java @@ -0,0 +1,132 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.parquet.table; + +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; + +public class ParquetInstructionsTest { + + @Test + public void setFieldId() { + final ParquetInstructions instructions = ParquetInstructions.builder() + .setFieldId("Foo", 42) + .setFieldId("Bar", 99) + 
.setFieldId("Baz", 99) + .build(); + + assertThat(instructions.getFieldId("Foo")).hasValue(42); + assertThat(instructions.getFieldId("Bar")).hasValue(99); + assertThat(instructions.getFieldId("Baz")).hasValue(99); + assertThat(instructions.getFieldId("Zap")).isEmpty(); + + // assertThat(instructions.getColumnNamesFromParquetFieldId(42)).containsExactly("Foo"); + // assertThat(instructions.getColumnNamesFromParquetFieldId(99)).containsExactly("Bar", "Baz"); + // assertThat(instructions.getColumnNamesFromParquetFieldId(100)).isEmpty(); + } + + @Test + public void setFieldIdAlreadySet() { + + // Setting the same fieldId on a given column name is "ok" if it's the same value, this is to be more consistent + // with how addColumnNameMapping works. + { + final ParquetInstructions instructions = ParquetInstructions.builder() + .setFieldId("Foo", 42) + .setFieldId("Foo", 42) + .build(); + assertThat(instructions.getFieldId("Foo")).hasValue(42); + } + + try { + ParquetInstructions.builder() + .setFieldId("Foo", 42) + .setFieldId("Foo", 43) + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessage("Inconsistent fieldId for columnName=Foo, already set fieldId=42"); + } + } + + @Test + public void setFieldBadName() { + try { + ParquetInstructions.builder() + .setFieldId("Not a legal column name", 42) + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("Invalid column name"); + } + } + + @Test + public void addColumnNameMapping() { + final ParquetInstructions instructions = ParquetInstructions.builder() + .addColumnNameMapping("Foo", "Foo") + .addColumnNameMapping("PARQUET COLUMN 2!", "Bar") + .addColumnNameMapping("ParquetColumn3", "Baz") + .build(); + + assertThat(instructions.getColumnNameFromParquetColumnName("Foo")).isEqualTo("Foo"); + assertThat(instructions.getColumnNameFromParquetColumnName("PARQUET COLUMN 2!")).isEqualTo("Bar"); + assertThat(instructions.getColumnNameFromParquetColumnName("ParquetColumn3")).isEqualTo("Baz"); + assertThat(instructions.getColumnNameFromParquetColumnName("Does Not Exist")).isNull(); + + // assertThat(instructions.getParquetColumnName("Foo")).hasValue("Foo"); + // assertThat(instructions.getParquetColumnName("Bar")).hasValue("PARQUET COLUMN 2!"); + // assertThat(instructions.getParquetColumnName("Baz")).hasValue("ParquetColumn3"); + // assertThat(instructions.getParquetColumnName("Zap")).isEmpty(); + + assertThat(instructions.getParquetColumnNameFromColumnNameOrDefault("Foo")).isEqualTo("Foo"); + assertThat(instructions.getParquetColumnNameFromColumnNameOrDefault("Bar")).isEqualTo("PARQUET COLUMN 2!"); + assertThat(instructions.getParquetColumnNameFromColumnNameOrDefault("Baz")).isEqualTo("ParquetColumn3"); + assertThat(instructions.getParquetColumnNameFromColumnNameOrDefault("Zap")).isEqualTo("Zap"); + } + + @Test + public void addColumnNameMappingMultipleParquetColumnsToSameDeephavenColumn() { + try { + ParquetInstructions.builder() + .addColumnNameMapping("ParquetColumn1", "Foo") + .addColumnNameMapping("ParquetColumn2", "Foo") + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessage( + "Cannot add a mapping from parquetColumnName=ParquetColumn2: columnName=Foo already mapped to parquetColumnName=ParquetColumn1"); + } + } + + @Test + public void 
addColumnNameMappingSameParquetColumnToMultipleDeephavenColumns() { + // Note: this is a limitation that doesn't need to exist. Technically, we could allow a single physical + // parquet column to manifest as multiple Deephaven columns. + try { + ParquetInstructions.builder() + .addColumnNameMapping("ParquetColumn1", "Foo") + .addColumnNameMapping("ParquetColumn1", "Bar") + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessage( + "Cannot add new mapping from parquetColumnName=ParquetColumn1 to columnName=Bar: already mapped to columnName=Foo"); + } + } + + @Test + public void addColumnNameMappingBadName() { + try { + ParquetInstructions.builder() + .addColumnNameMapping("SomeParquetColumnName", "Not a legal column name") + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("Invalid column name"); + } + } +} diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java index d096669b192..bcaafdae7ce 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java @@ -3,6 +3,7 @@ // package io.deephaven.parquet.table; +import com.google.common.io.BaseEncoding; import io.deephaven.UncheckedDeephavenException; import io.deephaven.base.FileUtils; import io.deephaven.engine.context.ExecutionContext; @@ -13,22 +14,45 @@ import io.deephaven.engine.table.impl.UncoalescedTable; import io.deephaven.engine.table.impl.indexer.DataIndexer; import io.deephaven.engine.table.impl.locations.TableDataException; +import io.deephaven.engine.table.impl.util.ColumnHolder; import io.deephaven.engine.table.vectors.ColumnVectors; import io.deephaven.engine.testutil.junit4.EngineCleanup; import io.deephaven.engine.util.TableTools; import io.deephaven.parquet.base.InvalidParquetFileException; import io.deephaven.parquet.table.layout.ParquetKeyValuePartitionedLayout; +import io.deephaven.qst.type.Type; import io.deephaven.stringset.HashStringSet; import io.deephaven.stringset.StringSet; import io.deephaven.time.DateTimeUtils; -import io.deephaven.vector.*; +import io.deephaven.util.QueryConstants; +import io.deephaven.vector.DoubleVector; +import io.deephaven.vector.DoubleVectorDirect; +import io.deephaven.vector.FloatVector; +import io.deephaven.vector.FloatVectorDirect; +import io.deephaven.vector.IntVector; +import io.deephaven.vector.IntVectorDirect; +import io.deephaven.vector.LongVector; +import io.deephaven.vector.LongVectorDirect; +import io.deephaven.vector.ObjectVector; +import io.deephaven.vector.ObjectVectorDirect; import junit.framework.TestCase; -import org.junit.*; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; import java.io.File; import java.io.IOException; +import java.io.OutputStream; import java.lang.reflect.Proxy; import java.nio.file.Files; +import java.nio.file.Path; +import java.security.DigestOutputStream; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; @@ -40,7 +64,14 @@ import static 
io.deephaven.engine.testutil.TstUtils.assertTableEquals; import static io.deephaven.engine.testutil.TstUtils.tableRangesAreEqual; -import static io.deephaven.engine.util.TableTools.*; +import static io.deephaven.engine.util.TableTools.col; +import static io.deephaven.engine.util.TableTools.doubleCol; +import static io.deephaven.engine.util.TableTools.emptyTable; +import static io.deephaven.engine.util.TableTools.intCol; +import static io.deephaven.engine.util.TableTools.longCol; +import static io.deephaven.engine.util.TableTools.newTable; +import static io.deephaven.engine.util.TableTools.shortCol; +import static io.deephaven.engine.util.TableTools.stringCol; import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -580,4 +611,579 @@ public void testNoDictionaryOffset() { DateTimeUtils.epochNanos((Instant) withNullsAndMissingOffsets.getColumnSource("CREATE_DATE").get(0))); assertTableEquals(withNullsAndMissingOffsets, clean); } + + @Test + public void testWriteParquetFieldIds() throws NoSuchAlgorithmException, IOException { + final int BAZ_ID = 111; + final int ZAP_ID = 112; + final String BAZ = "Baz"; + final String ZAP = "Zap"; + final ColumnDefinition bazCol = ColumnDefinition.ofLong(BAZ); + final ColumnDefinition zapCol = ColumnDefinition.of(ZAP, Type.stringType().arrayType()); + final TableDefinition td = TableDefinition.of(bazCol, zapCol); + final Table expected = newTable(td, + longCol(BAZ, 99, 101), + new ColumnHolder<>(ZAP, String[].class, String.class, false, new String[] {"Foo", "Bar"}, + new String[] {"Hello"})); + final File file = new File(testRoot, "testWriteParquetFieldIds.parquet"); + { + // Writing down random parquet column names that we _don't_ keep a reference to. This way, the only way we + // can successfully resolve them is by field id. + final ParquetInstructions writeInstructions = ParquetInstructions.builder() + .setFieldId(BAZ, BAZ_ID) + .setFieldId(ZAP, ZAP_ID) + .addColumnNameMapping("Some Random Parquet Column Name", BAZ) + .addColumnNameMapping("ABCDEFG", ZAP) + .build(); + ParquetTools.writeTable(expected, file.getPath(), writeInstructions); + } + + // This is somewhat fragile, but has been manually verified to contain the field_ids that we expect. + // We may want to consider more explicit tests that verify our writing logic is consistent, as it would be good + // to know whenever our serialization changes in any way. + assertEquals("c21f162b2c186d0a95a8d2302c9ed3fab172747b45501b2ee5e5bb04b98e92e0", sha256sum(file.toPath())); + + // This test is a bit circular; but assuming we trust our reading code, we should have relative confidence that + // we are writing it down correctly if we can read it correctly. + // { + // final ParquetInstructions readInstructions = ParquetInstructions.builder() + // .setFieldId(BAZ, BAZ_ID) + // .setFieldId(ZAP, ZAP_ID) + // .build(); + // { + // final Table actual = ParquetTools.readTable(file.getPath(), readInstructions); + // assertEquals(td, actual.getDefinition()); + // assertTableEquals(expected, actual); + // } + // { + // final Table actual = ParquetTools.readTable(file.getPath(), readInstructions.withTableDefinition(td)); + // assertEquals(td, actual.getDefinition()); + // assertTableEquals(expected, actual); + // } + // } + } + + /** + * This data was generated via the script: + * + *

+     * import pyarrow as pa
+     * import pyarrow.parquet as pq
+     *
+     * field_id = b"PARQUET:field_id"
+     * fields = [
+     *     pa.field(
+     *         "e0cf7927-45dc-4dfc-b4ef-36bf4b6ae463", pa.int64(), metadata={field_id: b"0"}
+     *     ),
+     *     pa.field(
+     *         "53f0de5a-e06f-476e-b82a-a3f9294fcd05", pa.string(), metadata={field_id: b"1"}
+     *     ),
+     * ]
+     * table = pa.table([[99, 101], ["Foo", "Bar"]], schema=pa.schema(fields))
+     * pq.write_table(table, "ReferenceSimpleParquetFieldIds.parquet")
+     * 
+ * + * @see Arrow Parquet field_id + */ + @Ignore("We don't have reading field_id support yet") + @Test + public void testParquetFieldIds() { + final String file = TestParquetTools.class.getResource("/ReferenceSimpleParquetFieldIds.parquet").getFile(); + + // No instructions; will sanitize the names. Both columns get the "-" removed and the second column gets the + // "column_" prefix added because it starts with a digit. + { + final TableDefinition expectedInferredTD = TableDefinition.of( + ColumnDefinition.ofLong("e0cf792745dc4dfcb4ef36bf4b6ae463"), + ColumnDefinition.ofString("column_53f0de5ae06f476eb82aa3f9294fcd05")); + final Table table = ParquetTools.readTable(file); + assertEquals(expectedInferredTD, table.getDefinition()); + assertTableEquals(newTable(expectedInferredTD, + longCol("e0cf792745dc4dfcb4ef36bf4b6ae463", 99, 101), + stringCol("column_53f0de5ae06f476eb82aa3f9294fcd05", "Foo", "Bar")), table); + } + + final int BAZ_ID = 0; + final int ZAP_ID = 1; + final String BAZ = "Baz"; + final String ZAP = "Zap"; + final ColumnDefinition bazCol = ColumnDefinition.ofLong(BAZ); + final ColumnDefinition zapCol = ColumnDefinition.ofString(ZAP); + + final ParquetInstructions instructions = ParquetInstructions.builder() + .setFieldId(BAZ, BAZ_ID) + .setFieldId(ZAP, ZAP_ID) + .build(); + + final TableDefinition td = TableDefinition.of(bazCol, zapCol); + + // It's enough to just provide the mapping based on field_id + { + final Table table = ParquetTools.readTable(file, instructions); + assertEquals(td, table.getDefinition()); + assertTableEquals(newTable(td, + longCol(BAZ, 99, 101), + stringCol(ZAP, "Foo", "Bar")), table); + } + + // But, the user can still provide a TableDefinition + { + final Table table = ParquetTools.readTable(file, instructions.withTableDefinition(td)); + assertEquals(td, table.getDefinition()); + assertTableEquals(newTable(td, + longCol(BAZ, 99, 101), + stringCol(ZAP, "Foo", "Bar")), table); + } + + // The user can provide the full mapping, but still a more limited definition + { + final TableDefinition justIdTD = TableDefinition.of(bazCol); + final Table table = ParquetTools.readTable(file, instructions.withTableDefinition(justIdTD)); + assertEquals(justIdTD, table.getDefinition()); + assertTableEquals(newTable(justIdTD, + longCol(BAZ, 99, 101)), table); + } + + // If only a partial id mapping is provided, only that will be "properly" mapped + { + final TableDefinition partialTD = TableDefinition.of( + ColumnDefinition.ofLong(BAZ), + ColumnDefinition.ofString("column_53f0de5ae06f476eb82aa3f9294fcd05")); + final ParquetInstructions partialInstructions = ParquetInstructions.builder() + .setFieldId(BAZ, BAZ_ID) + .build(); + final Table table = ParquetTools.readTable(file, partialInstructions); + assertEquals(partialTD, table.getDefinition()); + } + + // There are no errors if a field ID is configured but not found; it won't be inferred. 
+ { + final Table table = ParquetTools.readTable(file, ParquetInstructions.builder() + .setFieldId(BAZ, BAZ_ID) + .setFieldId(ZAP, ZAP_ID) + .setFieldId("Fake", 99) + .build()); + assertEquals(td, table.getDefinition()); + assertTableEquals(newTable(td, + longCol(BAZ, 99, 101), + stringCol(ZAP, "Foo", "Bar")), table); + } + + // If it's explicitly asked for, like other columns, it will return an appropriate null value + { + final TableDefinition tdWithFake = + TableDefinition.of(bazCol, zapCol, ColumnDefinition.ofShort("Fake")); + final Table table = ParquetTools.readTable(file, ParquetInstructions.builder() + .setFieldId(BAZ, BAZ_ID) + .setFieldId(ZAP, ZAP_ID) + .setFieldId("Fake", 99) + .build() + .withTableDefinition(tdWithFake)); + assertEquals(tdWithFake, table.getDefinition()); + assertTableEquals(newTable(tdWithFake, + longCol(BAZ, 99, 101), + stringCol(ZAP, "Foo", "Bar"), + shortCol("Fake", QueryConstants.NULL_SHORT, QueryConstants.NULL_SHORT)), table); + } + + // You can even re-use IDs to get the same physical column out multiple times + { + final String BAZ_DUPE = "BazDupe"; + final TableDefinition dupeTd = + TableDefinition.of(bazCol, zapCol, ColumnDefinition.ofLong(BAZ_DUPE)); + final ParquetInstructions dupeInstructions = ParquetInstructions.builder() + .setFieldId(BAZ, BAZ_ID) + .setFieldId(ZAP, ZAP_ID) + .setFieldId(BAZ_DUPE, BAZ_ID) + .build(); + { + final Table table = ParquetTools.readTable(file, dupeInstructions.withTableDefinition(dupeTd)); + assertEquals(dupeTd, table.getDefinition()); + assertTableEquals(newTable(dupeTd, + longCol(BAZ, 99, 101), + stringCol(ZAP, "Foo", "Bar"), + longCol(BAZ_DUPE, 99, 101)), table); + } + + // In the case where we have dupe field IDs and don't provide an explicit definition, we are preferring to + // fail during the inference step + { + try { + ParquetTools.readTable(file, dupeInstructions); + Assertions.failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + Assertions.assertThat(e).hasMessageContaining("Non-unique Field ID mapping provided"); + } + } + } + + // If both a field id and parquet column name mapping is provided, they need to map to the same parquet column. 
+ { + final TableDefinition bazTd = TableDefinition.of(bazCol); + final ParquetInstructions inconsistent = ParquetInstructions.builder() + .setFieldId(BAZ, BAZ_ID) + .addColumnNameMapping("53f0de5a-e06f-476e-b82a-a3f9294fcd05", BAZ) + .build(); + // In the case where we are inferring the TableDefinition from parquet schema, the inconsistency will be + // noticed up front + try { + ParquetTools.readTable(file, inconsistent); + Assertions.failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + Assertions.assertThat(e) + .hasMessageContaining("Supplied ColumnDefinitions include duplicate names [Baz]"); + } + // In the case where we provide a TableDefinition, the inconsistency will be noticed when reading the + // data + try { + // Need to force read of data + ParquetTools.readTable(file, inconsistent.withTableDefinition(bazTd)).select(); + Assertions.failBecauseExceptionWasNotThrown(TableDataException.class); + } catch (TableDataException e) { + Assertions.assertThat(e).getRootCause().hasMessageContaining( + "For columnName=Baz, providing an explicit parquet column name path ([53f0de5a-e06f-476e-b82a-a3f9294fcd05]) and field id (0) mapping, but they are resolving to different columns, byFieldId=[colIx=0, pathKey=[e0cf7927-45dc-4dfc-b4ef-36bf4b6ae463], fieldId=0], byPath=[colIx=1, pathKey=[53f0de5a-e06f-476e-b82a-a3f9294fcd05], fieldId=1]"); + } + } + } + + /** + * This data was generated via the script: + * + *
+     * import uuid
+     * import pyarrow as pa
+     * import pyarrow.parquet as pq
+     *
+     *
+     * def write_to(path: str):
+     *     field_id = b"PARQUET:field_id"
+     *     fields = [
+     *         pa.field(str(uuid.uuid4()), pa.int64(), metadata={field_id: b"42"}),
+     *         pa.field(str(uuid.uuid4()), pa.string(), metadata={field_id: b"43"}),
+     *     ]
+     *     table = pa.table([[] for _ in fields], schema=pa.schema(fields))
+     *     pq.write_table(table, path)
+     *
+     *
+     * write_to("/ReferencePartitionedFieldIds/Partition=0/table.parquet")
+     * write_to("/ReferencePartitionedFieldIds/Partition=1/table.parquet")
+     * 
+ * + * It mimics the case of a higher-level schema management where the physical column names may be random. + * + * @see Arrow Parquet field_id + */ + @Ignore("We don't have reading field_id support yet") + @Test + public void testPartitionedParquetFieldIds() { + final String file = TestParquetTools.class.getResource("/ReferencePartitionedFieldIds").getFile(); + + final int BAZ_ID = 42; + final int ZAP_ID = 43; + final String BAZ = "Baz"; + final String ZAP = "Zap"; + final String PARTITION = "Partition"; + final ColumnDefinition partitionColumn = ColumnDefinition.ofInt(PARTITION).withPartitioning(); + final ColumnDefinition bazCol = ColumnDefinition.ofLong(BAZ); + final ColumnDefinition zapCol = ColumnDefinition.ofString(ZAP); + final ParquetInstructions instructions = ParquetInstructions.builder() + .setFieldId(BAZ, BAZ_ID) + .setFieldId(ZAP, ZAP_ID) + .build(); + + final TableDefinition expectedTd = TableDefinition.of(partitionColumn, bazCol, zapCol); + + final Table expected = newTable(expectedTd, + intCol(PARTITION, 0, 0, 1, 1), + longCol(BAZ, 99, 101, 99, 101), + stringCol(ZAP, "Foo", "Bar", "Foo", "Bar")); + + { + final Table actual = ParquetTools.readTable(file, instructions); + assertEquals(expectedTd, actual.getDefinition()); + assertTableEquals(expected, actual); + } + + { + final Table actual = ParquetTools.readTable(file, instructions.withTableDefinition(expectedTd)); + assertEquals(expectedTd, actual.getDefinition()); + assertTableEquals(expected, actual); + } + } + + /** + * This data was generated via the script: + * + *
+     * import pyarrow as pa
+     * import pyarrow.parquet as pq
+     *
+     * field_id = b"PARQUET:field_id"
+     * schema = pa.schema(
+     *     [pa.field("some random name", pa.list_(pa.int32()), metadata={field_id: b"999"})]
+     * )
+     * data = [pa.array([[1, 2, 3], None, [], [42]], type=pa.list_(pa.int32()))]
+     * table = pa.Table.from_arrays(data, schema=schema)
+     * pq.write_table(table, "ReferenceListParquetFieldIds.parquet")
+     * 
+ * + * @see Arrow Parquet field_id + */ + @Ignore("We don't have reading field_id support yet") + @Test + public void testParquetFieldIdsWithListType() { + final String file = TestParquetTools.class.getResource("/ReferenceListParquetFieldIds.parquet").getFile(); + final String FOO = "Foo"; + final TableDefinition td = TableDefinition.of(ColumnDefinition.of(FOO, Type.intType().arrayType())); + final ParquetInstructions instructions = ParquetInstructions.builder() + .setFieldId(FOO, 999) + .build(); + final Table expected = TableTools.newTable(td, new ColumnHolder<>(FOO, int[].class, int.class, false, + new int[] {1, 2, 3}, + null, + new int[0], + new int[] {42})); + { + final Table actual = ParquetTools.readTable(file, instructions); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } + { + final Table actual = ParquetTools.readTable(file, instructions.withTableDefinition(td)); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } + } + + /** + * This is meant to test a "common" renaming scenario. Originally, a schema might be written down with a column + * named "Name" where semantically, this was really a first name. Later, the schema might be "corrected" to label + * this column as "FirstName". Both standalone, and in combination with the newer file, we should be able to read it + * with the latest schema. + */ + @Ignore("We don't have reading field_id support yet") + @Test + public void testRenamingResolveViaFieldId() { + final File f1 = new File(testRoot, "testRenamingResolveViaFieldId.00.parquet"); + final File f2 = new File(testRoot, "testRenamingResolveViaFieldId.01.parquet"); + + final int FIRST_NAME_ID = 15; + { + final String NAME = "Name"; + final Table t1 = newTable(TableDefinition.of(ColumnDefinition.ofString(NAME)), + stringCol(NAME, "Shivam", "Ryan")); + ParquetTools.writeTable(t1, f1.getPath(), ParquetInstructions.builder() + .setFieldId(NAME, FIRST_NAME_ID) + .build()); + } + + final int LAST_NAME_ID = 16; + final String FIRST_NAME = "FirstName"; + final String LAST_NAME = "LastName"; + final TableDefinition td = TableDefinition.of( + ColumnDefinition.ofString(FIRST_NAME), + ColumnDefinition.ofString(LAST_NAME)); + final ParquetInstructions instructions = ParquetInstructions.builder() + .setFieldId(FIRST_NAME, FIRST_NAME_ID) + .setFieldId(LAST_NAME, LAST_NAME_ID) + .build(); + { + final Table t = newTable(td, + stringCol(FIRST_NAME, "Pete", "Colin"), + stringCol(LAST_NAME, "Goddard", "Alworth")); + ParquetTools.writeTable(t, f2.getPath(), instructions); + } + + // If we read first file without an explicit definition, we should only get the column from the file + { + final TableDefinition expectedTd = TableDefinition.of(ColumnDefinition.ofString(FIRST_NAME)); + final Table expected = newTable(expectedTd, stringCol(FIRST_NAME, "Shivam", "Ryan")); + final Table actual = ParquetTools.readTable(f1.getPath(), instructions); + assertEquals(expectedTd, actual.getDefinition()); + assertTableEquals(expected, actual); + } + + // If we read first file with an explicit definition, the new column should return nulls + { + final Table expected = newTable(td, + stringCol(FIRST_NAME, "Shivam", "Ryan"), + stringCol(LAST_NAME, null, null)); + final Table actual = ParquetTools.readTable(f1.getPath(), instructions.withTableDefinition(td)); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } + + // We should be able to read both (flat partitioning) with the latest schema + { + final Table 
expected = newTable(td, + stringCol(FIRST_NAME, "Shivam", "Ryan", "Pete", "Colin"), + stringCol(LAST_NAME, null, null, "Goddard", "Alworth")); + { + final Table actual = ParquetTools.readTable(testRoot, instructions); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } + { + final Table actual = ParquetTools.readTable(testRoot, instructions.withTableDefinition(td)); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } + } + } + + + @Test + public void parquetWithNonUniqueFieldIds() { + final File f = new File(testRoot, "parquetWithNonUniqueFieldIds.parquet"); + final String FOO = "Foo"; + final String BAR = "Bar"; + final int fieldId = 31337; + final ParquetInstructions instructions = ParquetInstructions.builder() + .setFieldId(FOO, fieldId) + .setFieldId(BAR, fieldId) + .build(); + final TableDefinition td = TableDefinition.of(ColumnDefinition.ofInt(FOO), ColumnDefinition.ofString(BAR)); + final Table expected = newTable(td, + intCol(FOO, 44, 45), + stringCol(BAR, "Zip", "Zap")); + { + ParquetTools.writeTable(expected, f.getPath(), instructions); + } + + { + final String BAZ = "Baz"; + final ParquetInstructions bazInstructions = ParquetInstructions.builder() + .setFieldId(BAZ, fieldId) + .build(); + + // fieldId _won't_ be used to actually create a Baz column since the underlying file has multiple. In this + // case, we just infer the physical parquet column names. + { + + final Table actual = ParquetTools.readTable(f.getPath(), bazInstructions); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } + + // If the user explicitly asks for a definition with a mapping to a non-unique field id, they will get back + // the column of default (null) values. + { + final TableDefinition bazTd = TableDefinition.of(ColumnDefinition.ofInt(BAZ)); + final Table bazTable = newTable(bazTd, intCol(BAZ, QueryConstants.NULL_INT, QueryConstants.NULL_INT)); + final Table actual = ParquetTools.readTable(f.getPath(), bazInstructions.withTableDefinition(bazTd)); + assertEquals(bazTd, actual.getDefinition()); + assertTableEquals(bazTable, actual); + } + } + } + + // // We are unable to generate this sort of file via DH atm. + // @Test + // public void parquetWithNonUniqueColumnNames() { + // + // } + + /** + *
+     * import pyarrow as pa
+     * import pyarrow.parquet as pq
+     *
+     * fields = [
+     *     pa.field("Foo", pa.int64()),
+     *     pa.field(
+     *         "MyStruct",
+     *         pa.struct(
+     *             [
+     *                 pa.field("Zip", pa.int16()),
+     *                 pa.field("Zap", pa.int32()),
+     *             ]
+     *         ),
+     *     ),
+     *     pa.field("Bar", pa.string()),
+     * ]
+     *
+     * table = pa.table([[] for _ in fields], schema=pa.schema(fields))
+     * pq.write_table(table, "NestedStruct1.parquet", compression="none")
+     * 
+ */ + @Test + public void nestedMessageEmpty() { + final String file = TestParquetTools.class.getResource("/NestedStruct1.parquet").getFile(); + final TableDefinition expectedTd = + TableDefinition.of(ColumnDefinition.ofLong("Foo"), ColumnDefinition.ofString("Bar")); + final Table table = TableTools.newTable(expectedTd, longCol("Foo"), stringCol("Bar")); + // If we use an explicit definition, we can skip over MyStruct and read Foo, Bar + { + final Table actual = + ParquetTools.readTable(file, ParquetInstructions.EMPTY.withTableDefinition(expectedTd)); + assertEquals(expectedTd, actual.getDefinition()); + assertTableEquals(table, actual); + } + + // If we try to infer, we currently throw an error. + // TODO(deephaven-core#871): Parquet: Support repetition level >1 and multi-column fields + try { + ParquetTools.readTable(file); + Assertions.failBecauseExceptionWasNotThrown(UnsupportedOperationException.class); + } catch (UnsupportedOperationException e) { + Assertions.assertThat(e) + .hasMessageContaining("Encountered unsupported multi-column field MyStruct"); + } + } + + /** + *
+     * import pyarrow as pa
+     * import pyarrow.parquet as pq
+     *
+     * fields = [
+     *     pa.field("Foo", pa.int64()),
+     *     pa.field(
+     *         "MyStruct",
+     *         pa.struct(
+     *             [
+     *                 pa.field("Zip", pa.int16()),
+     *                 pa.field("Zap", pa.int32()),
+     *             ]
+     *         ),
+     *     ),
+     *     pa.field("Bar", pa.string()),
+     * ]
+     *
+     * table = pa.table([[None] for _ in fields], schema=pa.schema(fields))
+     * pq.write_table(table, "NestedStruct2.parquet", compression="none")
+     * 
+ */ + @Test + public void nestedMessage1Row() { + final String file = TestParquetTools.class.getResource("/NestedStruct2.parquet").getFile(); + final TableDefinition expectedTd = + TableDefinition.of(ColumnDefinition.ofLong("Foo"), ColumnDefinition.ofString("Bar")); + final Table table = TableTools.newTable(expectedTd, longCol("Foo", QueryConstants.NULL_LONG), + stringCol("Bar", (String) null)); + // If we use an explicit definition, we can skip over MyStruct and read Foo, Bar + { + final Table actual = + ParquetTools.readTable(file, ParquetInstructions.EMPTY.withTableDefinition(expectedTd)); + assertEquals(expectedTd, actual.getDefinition()); + assertTableEquals(table, actual); + } + + // If we try to infer, we currently throw an error. + // TODO(deephaven-core#871): Parquet: Support repetition level >1 and multi-column fields + try { + ParquetTools.readTable(file); + Assertions.failBecauseExceptionWasNotThrown(UnsupportedOperationException.class); + } catch (UnsupportedOperationException e) { + Assertions.assertThat(e) + .hasMessageContaining("Encountered unsupported multi-column field MyStruct"); + } + } + + private static String sha256sum(Path path) throws NoSuchAlgorithmException, IOException { + final MessageDigest digest = MessageDigest.getInstance("SHA-256"); + final DigestOutputStream out = new DigestOutputStream(OutputStream.nullOutputStream(), digest); + Files.copy(path, out); + return BaseEncoding.base16().lowerCase().encode(digest.digest()); + } } diff --git a/extensions/parquet/table/src/test/resources/NestedStruct1.parquet b/extensions/parquet/table/src/test/resources/NestedStruct1.parquet new file mode 100644 index 00000000000..d592f378b92 --- /dev/null +++ b/extensions/parquet/table/src/test/resources/NestedStruct1.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:d3b0b28c657e8991f0ffa48aedb68f23d1a746da8ddeaa473dbee82a3f80c66c +size 1019 diff --git a/extensions/parquet/table/src/test/resources/NestedStruct2.parquet b/extensions/parquet/table/src/test/resources/NestedStruct2.parquet new file mode 100644 index 00000000000..f8cee910583 --- /dev/null +++ b/extensions/parquet/table/src/test/resources/NestedStruct2.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:a6ea5e3b9a2d5907adf6ecc3909a6491a62e78c0ae7ca9516f96ad9d4db3432d +size 1237 diff --git a/extensions/parquet/table/src/test/resources/ReferenceListParquetFieldIds.parquet b/extensions/parquet/table/src/test/resources/ReferenceListParquetFieldIds.parquet new file mode 100644 index 00000000000..72ebdb26baa --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferenceListParquetFieldIds.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:037307d886a9eade2dca4982917eec86516686fdb2d33b66ec9e8724ee59aeb0 +size 843 diff --git a/extensions/parquet/table/src/test/resources/ReferencePartitionedFieldIds/Partition=0/table.parquet b/extensions/parquet/table/src/test/resources/ReferencePartitionedFieldIds/Partition=0/table.parquet new file mode 100644 index 00000000000..5a20bee6eeb --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferencePartitionedFieldIds/Partition=0/table.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:6f7c9501d597752d7295cd7b5f098336df5c67a341cceb583f661f2d0ac6d265 +size 1323 diff --git a/extensions/parquet/table/src/test/resources/ReferencePartitionedFieldIds/Partition=1/table.parquet 
b/extensions/parquet/table/src/test/resources/ReferencePartitionedFieldIds/Partition=1/table.parquet new file mode 100644 index 00000000000..b31a2c3ee60 --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferencePartitionedFieldIds/Partition=1/table.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:61fa98e653bb6d913c4828a53d92977d3c443796e616996d3fcec76821d89b92 +size 1323 diff --git a/extensions/parquet/table/src/test/resources/ReferenceSimpleParquetFieldIds.parquet b/extensions/parquet/table/src/test/resources/ReferenceSimpleParquetFieldIds.parquet new file mode 100644 index 00000000000..4fa8b5ebdb2 --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferenceSimpleParquetFieldIds.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:439904cfa5209c40a7c0c75c38e0835d7ce7af79a850ab270216dfff833ec0a8 +size 1315 From d23179ce9a125d2dca83e4c8dad73e85052e4d30 Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Mon, 18 Nov 2024 10:52:31 -0800 Subject: [PATCH 2/5] review response --- .../parquet/base/ColumnWriterImpl.java | 2 + .../parquet/table/ParquetInstructions.java | 41 ++----------------- .../table/ParquetInstructionsTest.java | 10 ----- .../parquet/table/TestParquetTools.java | 15 +++---- 4 files changed, 10 insertions(+), 58 deletions(-) diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java index 32ddc126c54..53e8f47103c 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java @@ -57,6 +57,8 @@ final class ColumnWriterImpl implements ColumnWriter { private final RunLengthBitPackingHybridEncoder dlEncoder; private final RunLengthBitPackingHybridEncoder rlEncoder; private long dictionaryOffset = -1; + // The downstream writing code (ParquetFileWriter) seems to respect the traversal order of this set. As such, to + // improve determinism, we are using an EnumSet. 
 private final Set encodings = EnumSet.noneOf(Encoding.class); private long firstDataPageOffset = -1; private long uncompressedLength; diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java index 358cbf0c931..4c418a82e29 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java @@ -15,6 +15,7 @@ import io.deephaven.util.annotations.InternalUseOnly; import io.deephaven.util.annotations.VisibleForTesting; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.util.ArrayList; @@ -339,14 +340,14 @@ private static class ColumnInstructions { private static final KeyedObjectKey COLUMN_NAME_KEY = new Basic<>() { @Override - public String getKey(ColumnInstructions columnInstructions) { + public String getKey(@NotNull final ColumnInstructions columnInstructions) { return columnInstructions.getColumnName(); } }; private static final KeyedObjectKey PARQUET_COLUMN_NAME_KEY = new Basic<>() { @Override - public String getKey(ColumnInstructions columnInstructions) { + public String getKey(@NotNull final ColumnInstructions columnInstructions) { return columnInstructions.getParquetColumnName(); } }; @@ -761,42 +762,6 @@ public Builder useDictionary(final String columnName, final boolean useDictionar return this; } - // /** - // * For reading, provides a mapping between a Deephaven column name and a parquet column by field id. This - // allows - // * resolving a parquet column where the physical "parquet column name" may not be known a priori by the caller. - // * In the case where both a field id mapping and a parquet column name mapping are provided, the field id will - // * take precedence over the parquet column name. This may happen in cases where the parquet file is managed by - // a - // * higher-level schema that has the concept of a "field id"; for example, Iceberg. As documented - // * in the parquet format: - // * - // *
-        // * When the original schema supports field ids, this will save the original field id in the parquet schema
-        // * 
-        // *
-        // * In the case where a field id mapping is provided but no matching parquet column is found, the column will
-        // not
-        // * be inferred; and in the case where it's explicitly included as part of a
-        // * {@link #setTableDefinition(TableDefinition)}, the resulting column will contain the appropriate default
-        // * ({@code null}) values. In the case where there are multiple parquet columns with the same field_id, those
-        // * parquet columns will not be resolvable via a field id.
-        // *
-        // *
-        // * For writing, this will set the {@code field_id} in the proper Parquet {@code SchemaElement}.
-        // *
-        // *
-        // * Setting multiple field ids for a single column name is not allowed.
-        // *
-        // *
-        // * Field ids are not typically configured by end users.
-        // *
-        // * @param columnName the Deephaven column name
-        // * @param fieldId the field id
-        // */
-
         /**
          * This is currently only used for writing, allowing the setting of {@code field_id} in the proper Parquet
          * {@code SchemaElement}.

diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java
index 8fe5a94e19e..c1a6a4864b8 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java
@@ -22,15 +22,10 @@ public void setFieldId() {
         assertThat(instructions.getFieldId("Bar")).hasValue(99);
         assertThat(instructions.getFieldId("Baz")).hasValue(99);
         assertThat(instructions.getFieldId("Zap")).isEmpty();
-
-        // assertThat(instructions.getColumnNamesFromParquetFieldId(42)).containsExactly("Foo");
-        // assertThat(instructions.getColumnNamesFromParquetFieldId(99)).containsExactly("Bar", "Baz");
-        // assertThat(instructions.getColumnNamesFromParquetFieldId(100)).isEmpty();
     }
 
     @Test
     public void setFieldIdAlreadySet() {
-
         // Setting the same fieldId on a given column name is "ok" if it's the same value, this is to be more consistent
         // with how addColumnNameMapping works.
         {
@@ -77,11 +72,6 @@ public void addColumnNameMapping() {
         assertThat(instructions.getColumnNameFromParquetColumnName("ParquetColumn3")).isEqualTo("Baz");
         assertThat(instructions.getColumnNameFromParquetColumnName("Does Not Exist")).isNull();
 
-        // assertThat(instructions.getParquetColumnName("Foo")).hasValue("Foo");
-        // assertThat(instructions.getParquetColumnName("Bar")).hasValue("PARQUET COLUMN 2!");
-        // assertThat(instructions.getParquetColumnName("Baz")).hasValue("ParquetColumn3");
-        // assertThat(instructions.getParquetColumnName("Zap")).isEmpty();
-
         assertThat(instructions.getParquetColumnNameFromColumnNameOrDefault("Foo")).isEqualTo("Foo");
         assertThat(instructions.getParquetColumnNameFromColumnNameOrDefault("Bar")).isEqualTo("PARQUET COLUMN 2!");
         assertThat(instructions.getParquetColumnNameFromColumnNameOrDefault("Baz")).isEqualTo("ParquetColumn3");

diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java
index bcaafdae7ce..bce24632b06 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java
@@ -643,6 +643,7 @@ public void testWriteParquetFieldIds() throws NoSuchAlgorithmException, IOExcept
         // to know whenever our serialization changes in any way.
         assertEquals("c21f162b2c186d0a95a8d2302c9ed3fab172747b45501b2ee5e5bb04b98e92e0", sha256sum(file.toPath()));
 
+        // TODO(deephaven-core#6128): Allow Parquet column access by field_id
         // This test is a bit circular; but assuming we trust our reading code, we should have relative confidence that
         // we are writing it down correctly if we can read it correctly.
         // {
@@ -685,7 +686,7 @@ public void testWriteParquetFieldIds() throws NoSuchAlgorithmException, IOExcept
      *
      * @see Arrow Parquet field_id
      */
-    @Ignore("We don't have reading field_id support yet")
+    @Ignore("TODO(deephaven-core#6128): Allow Parquet column access by field_id")
     @Test
     public void testParquetFieldIds() {
         final String file = TestParquetTools.class.getResource("/ReferenceSimpleParquetFieldIds.parquet").getFile();
@@ -873,7 +874,7 @@ public void testParquetFieldIds() {
      *
      * @see Arrow Parquet field_id
      */
-    @Ignore("We don't have reading field_id support yet")
+    @Ignore("TODO(deephaven-core#6128): Allow Parquet column access by field_id")
     @Test
     public void testPartitionedParquetFieldIds() {
         final String file = TestParquetTools.class.getResource("/ReferencePartitionedFieldIds").getFile();
@@ -929,7 +930,7 @@ public void testPartitionedParquetFieldIds() {
      *
      * @see Arrow Parquet field_id
      */
-    @Ignore("We don't have reading field_id support yet")
+    @Ignore("TODO(deephaven-core#6128): Allow Parquet column access by field_id")
     @Test
     public void testParquetFieldIdsWithListType() {
         final String file = TestParquetTools.class.getResource("/ReferenceListParquetFieldIds.parquet").getFile();
@@ -961,7 +962,7 @@ public void testParquetFieldIdsWithListType() {
      * this column as "FirstName". Both standalone, and in combination with the newer file, we should be able to read it
      * with the latest schema.
      */
-    @Ignore("We don't have reading field_id support yet")
+    @Ignore("TODO(deephaven-core#6128): Allow Parquet column access by field_id")
     @Test
     public void testRenamingResolveViaFieldId() {
         final File f1 = new File(testRoot, "testRenamingResolveViaFieldId.00.parquet");
@@ -1077,12 +1078,6 @@ public void parquetWithNonUniqueFieldIds() {
         }
     }
 
-    // // We are unable to generate this sort of file via DH atm.
-    // @Test
-    // public void parquetWithNonUniqueColumnNames() {
-    //
-    // }
-
     /**
      *
      * import pyarrow as pa

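For context before the review-response patches, here is a minimal sketch of the write-side API these patches exercise. It is illustrative only, not part of the series; the table contents, column names, field ids, and output path are hypothetical. The builder calls themselves (setFieldId, addColumnNameMapping, writeTable) are the ones shown in the diffs above.

    import io.deephaven.engine.table.Table;
    import io.deephaven.engine.util.TableTools;
    import io.deephaven.parquet.table.ParquetInstructions;
    import io.deephaven.parquet.table.ParquetTools;

    public class FieldIdWriteSketch {
        public static void main(String[] args) {
            final Table t = TableTools.newTable(
                    TableTools.longCol("Baz", 1L, 2L),
                    TableTools.stringCol("Name", "a", "b"));
            final ParquetInstructions instructions = ParquetInstructions.builder()
                    .setFieldId("Baz", 100) // written to the column's SchemaElement.field_id
                    .setFieldId("Name", 101)
                    // Map the on-disk parquet column name to the Deephaven column name;
                    // on write, the parquet column is emitted under "baz_col".
                    .addColumnNameMapping("baz_col", "Baz")
                    .build();
            ParquetTools.writeTable(t, "/tmp/field-ids.parquet", instructions);
        }
    }
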
From ca85b334b0a2bfc1b8efb95ed2011e9d28ace0fc Mon Sep 17 00:00:00 2001
From: Devin Smith 
Date: Mon, 18 Nov 2024 13:23:13 -0800
Subject: [PATCH 3/5] Review responses

---
 .../parquet/table/ParquetInstructions.java    |  2 -
 .../io/deephaven/parquet/table/TypeInfos.java | 11 ++---
 .../parquet/table/TestParquetTools.java       | 48 ++++++++++++++++---
 3 files changed, 47 insertions(+), 14 deletions(-)

diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java
index 4c418a82e29..07bb2df5549 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java
@@ -12,7 +12,6 @@
 import io.deephaven.hash.KeyedObjectKey;
 import io.deephaven.hash.KeyedObjectKey.Basic;
 import io.deephaven.parquet.base.ParquetUtils;
-import io.deephaven.util.annotations.InternalUseOnly;
 import io.deephaven.util.annotations.VisibleForTesting;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.jetbrains.annotations.NotNull;
@@ -775,7 +774,6 @@ public Builder useDictionary(final String columnName, final boolean useDictionar
          * @param columnName the Deephaven column name
          * @param fieldId the field id
          */
-        @InternalUseOnly
         public Builder setFieldId(final String columnName, final int fieldId) {
             final ColumnInstructions ci = getOrCreateColumnInstructions(columnName);
             ci.setFieldId(fieldId);
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java
index d380b953353..3c260511071 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java
@@ -481,12 +481,11 @@ default Type createSchemaType(
                 instructions.getFieldId(columnDefinition.getName()).ifPresent(builder::id);
                 return builder.named(parquetColumnName);
             }
-            final GroupBuilder groupBuilder = Types.buildGroup(Repetition.OPTIONAL);
-            instructions.getFieldId(columnDefinition.getName()).ifPresent(groupBuilder::id);
-            return groupBuilder.addField(
-                    Types.buildGroup(Type.Repetition.REPEATED).addField(
-                            builder.named("item")).named(parquetColumnName))
-                    .as(LogicalTypeAnnotation.listType()).named(parquetColumnName);
+            final Types.ListBuilder listBuilder = Types.optionalList();
+            instructions.getFieldId(columnDefinition.getName()).ifPresent(listBuilder::id);
+            return listBuilder
+                    .element(builder.named("element"))
+                    .named(parquetColumnName);
         }
     }
 
diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java
index bce24632b06..6ff0bd6625f 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java
@@ -20,11 +20,14 @@
 import io.deephaven.engine.util.TableTools;
 import io.deephaven.parquet.base.InvalidParquetFileException;
 import io.deephaven.parquet.table.layout.ParquetKeyValuePartitionedLayout;
+import io.deephaven.parquet.table.location.ParquetTableLocationKey;
 import io.deephaven.qst.type.Type;
 import io.deephaven.stringset.HashStringSet;
 import io.deephaven.stringset.StringSet;
 import io.deephaven.time.DateTimeUtils;
 import io.deephaven.util.QueryConstants;
+import io.deephaven.util.channel.SeekableChannelsProvider;
+import io.deephaven.util.channel.SeekableChannelsProviderLoader;
 import io.deephaven.vector.DoubleVector;
 import io.deephaven.vector.DoubleVectorDirect;
 import io.deephaven.vector.FloatVector;
@@ -36,6 +39,10 @@
 import io.deephaven.vector.ObjectVector;
 import io.deephaven.vector.ObjectVectorDirect;
 import junit.framework.TestCase;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
@@ -48,6 +55,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.Proxy;
+import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.DigestOutputStream;
@@ -618,6 +626,8 @@ public void testWriteParquetFieldIds() throws NoSuchAlgorithmException, IOExcept
         final int ZAP_ID = 112;
         final String BAZ = "Baz";
         final String ZAP = "Zap";
+        final String BAZ_PARQUET_NAME = "Some Random Parquet Column Name";
+        final String ZAP_PARQUET_NAME = "ABCDEFG";
         final ColumnDefinition bazCol = ColumnDefinition.ofLong(BAZ);
         final ColumnDefinition zapCol = ColumnDefinition.of(ZAP, Type.stringType().arrayType());
         final TableDefinition td = TableDefinition.of(bazCol, zapCol);
@@ -627,21 +637,35 @@ public void testWriteParquetFieldIds() throws NoSuchAlgorithmException, IOExcept
                         new String[] {"Hello"}));
         final File file = new File(testRoot, "testWriteParquetFieldIds.parquet");
         {
-            // Writing down random parquet column names that we _don't_ keep a reference to. This way, the only way we
-            // can successfully resolve them is by field id.
             final ParquetInstructions writeInstructions = ParquetInstructions.builder()
                     .setFieldId(BAZ, BAZ_ID)
                     .setFieldId(ZAP, ZAP_ID)
-                    .addColumnNameMapping("Some Random Parquet Column Name", BAZ)
-                    .addColumnNameMapping("ABCDEFG", ZAP)
+                    .addColumnNameMapping(BAZ_PARQUET_NAME, BAZ)
+                    .addColumnNameMapping(ZAP_PARQUET_NAME, ZAP)
                     .build();
             ParquetTools.writeTable(expected, file.getPath(), writeInstructions);
         }
 
+        {
+            final MessageType expectedSchema = Types.buildMessage()
+                    .optional(PrimitiveTypeName.INT64)
+                    .id(BAZ_ID)
+                    .named(BAZ_PARQUET_NAME)
+                    .optionalList()
+                    .id(ZAP_ID)
+                    .optionalElement(PrimitiveTypeName.BINARY)
+                    .as(LogicalTypeAnnotation.stringType())
+                    .named(ZAP_PARQUET_NAME)
+                    .named("root");
+            final MessageType actualSchema = readSchema(file);
+            assertEquals(expectedSchema, actualSchema);
+        }
+
+        //
         // This is somewhat fragile, but has been manually verified to contain the field_ids that we expect.
         // We may want to consider more explicit tests that verify our writing logic is consistent, as it would be good
-        // to know whenever our serialization changes in any way.
-        assertEquals("c21f162b2c186d0a95a8d2302c9ed3fab172747b45501b2ee5e5bb04b98e92e0", sha256sum(file.toPath()));
+        // to know whenever serialization changes in any way.
+        assertEquals("2ea68b0ddaeb432e9c2721f15460b6c42449a479c1960e836f6ebe3b14f33dc1", sha256sum(file.toPath()));
 
         // TODO(deephaven-core#6128): Allow Parquet column access by field_id
         // This test is a bit circular; but assuming we trust our reading code, we should have relative confidence that
@@ -1181,4 +1205,16 @@ private static String sha256sum(Path path) throws NoSuchAlgorithmException, IOEx
         Files.copy(path, out);
         return BaseEncoding.base16().lowerCase().encode(digest.digest());
     }
+
+    private static MessageType readSchema(File file) {
+        final URI uri = FileUtils.convertToURI(file, false);
+        try (final SeekableChannelsProvider channelsProvider =
+                SeekableChannelsProviderLoader.getInstance().load(FileUtils.FILE_URI_SCHEME, null)) {
+            final ParquetTableLocationKey locationKey = new ParquetTableLocationKey(uri, 0, null, channelsProvider);
+            // TODO: which is more appropriate?
+            // locationKey.getFileReader().getSchema();
+            // locationKey.getMetadata().getFileMetaData().getSchema();
+            return locationKey.getFileReader().getSchema();
+        }
+    }
 }

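The TypeInfos change in this patch replaces hand-rolled group construction with parquet-mr's list builder. A minimal sketch of the shape it produces, assuming a hypothetical column "Tags" with field id 7 (the builder chain mirrors the one in the diff above):

    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.Types;

    public class ListSchemaSketch {
        public static void main(String[] args) {
            final Type tags = Types.optionalList()
                    .id(7) // the field_id is attached to the outer LIST group
                    .optionalElement(PrimitiveTypeName.BINARY)
                    .as(LogicalTypeAnnotation.stringType())
                    .named("Tags");
            // Prints (roughly) the spec-standard three-level LIST form:
            //   optional group Tags (LIST) = 7 {
            //     repeated group list { optional binary element (STRING); }
            //   }
            System.out.println(tags);
        }
    }

Note the builder also names the inner field "element" rather than the previous "item", which is consistent with the updated reference sha256 in the test change above.
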
From e015d7562c7b8f81ed80adcfcdf885f89250b341 Mon Sep 17 00:00:00 2001
From: Devin Smith 
Date: Tue, 19 Nov 2024 07:42:28 -0800
Subject: [PATCH 4/5] Add ref to root schema name

---
 .../main/java/io/deephaven/parquet/table/MappedSchema.java   | 5 ++++-
 .../java/io/deephaven/parquet/table/TestParquetTools.java    | 2 +-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/MappedSchema.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/MappedSchema.java
index 46780a933c8..f6ce19a4bb5 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/MappedSchema.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/MappedSchema.java
@@ -22,6 +22,9 @@
  */
 class MappedSchema {
 
+    @VisibleForTesting
+    static final String SCHEMA_NAME = "root";
+
     static MappedSchema create(
             final Map> computedCache,
             final TableDefinition definition,
@@ -32,7 +35,7 @@ static MappedSchema create(
         for (final ColumnDefinition columnDefinition : definition.getColumns()) {
             builder.addField(createType(computedCache, columnDefinition, rowSet, columnSourceMap, instructions));
         }
-        final MessageType schema = builder.named("root");
+        final MessageType schema = builder.named(SCHEMA_NAME);
         return new MappedSchema(definition, schema);
     }
 
diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java
index 6ff0bd6625f..b8703d54342 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java
@@ -656,7 +656,7 @@ public void testWriteParquetFieldIds() throws NoSuchAlgorithmException, IOExcept
                     .optionalElement(PrimitiveTypeName.BINARY)
                     .as(LogicalTypeAnnotation.stringType())
                     .named(ZAP_PARQUET_NAME)
-                    .named("root");
+                    .named(MappedSchema.SCHEMA_NAME);
             final MessageType actualSchema = readSchema(file);
             assertEquals(expectedSchema, actualSchema);
         }

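A minimal sketch of what extracting SCHEMA_NAME enables for same-package tests; readSchema is the helper added in PATCH 3/5, the method and file name here are hypothetical:

    // Inside TestParquetTools (same package as MappedSchema, so the
    // package-private SCHEMA_NAME constant is visible):
    private void assertRootSchemaName(File parquetFile) {
        final MessageType schema = readSchema(parquetFile);
        // No hard-coded "root": the writer and the assertion share one constant.
        assertEquals(MappedSchema.SCHEMA_NAME, schema.getName());
    }
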
From 94f2ef4dd53f2c50ce23d292fb5be1f56826db30 Mon Sep 17 00:00:00 2001
From: Devin Smith 
Date: Tue, 19 Nov 2024 07:45:54 -0800
Subject: [PATCH 5/5] Undo import changes

---
 .../parquet/table/TestParquetTools.java       | 33 ++++---------------
 1 file changed, 6 insertions(+), 27 deletions(-)

diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java
index b8703d54342..ddb1894d25a 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java
@@ -28,28 +28,14 @@
 import io.deephaven.util.QueryConstants;
 import io.deephaven.util.channel.SeekableChannelsProvider;
 import io.deephaven.util.channel.SeekableChannelsProviderLoader;
-import io.deephaven.vector.DoubleVector;
-import io.deephaven.vector.DoubleVectorDirect;
-import io.deephaven.vector.FloatVector;
-import io.deephaven.vector.FloatVectorDirect;
-import io.deephaven.vector.IntVector;
-import io.deephaven.vector.IntVectorDirect;
-import io.deephaven.vector.LongVector;
-import io.deephaven.vector.LongVectorDirect;
-import io.deephaven.vector.ObjectVector;
-import io.deephaven.vector.ObjectVectorDirect;
+import io.deephaven.vector.*;
 import junit.framework.TestCase;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Types;
 import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -72,14 +58,7 @@
 
 import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
 import static io.deephaven.engine.testutil.TstUtils.tableRangesAreEqual;
-import static io.deephaven.engine.util.TableTools.col;
-import static io.deephaven.engine.util.TableTools.doubleCol;
-import static io.deephaven.engine.util.TableTools.emptyTable;
-import static io.deephaven.engine.util.TableTools.intCol;
-import static io.deephaven.engine.util.TableTools.longCol;
-import static io.deephaven.engine.util.TableTools.newTable;
-import static io.deephaven.engine.util.TableTools.shortCol;
-import static io.deephaven.engine.util.TableTools.stringCol;
+import static io.deephaven.engine.util.TableTools.*;
 import static org.junit.Assert.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -648,12 +627,12 @@ public void testWriteParquetFieldIds() throws NoSuchAlgorithmException, IOExcept
 
         {
             final MessageType expectedSchema = Types.buildMessage()
-                    .optional(PrimitiveTypeName.INT64)
+                    .optional(PrimitiveType.PrimitiveTypeName.INT64)
                     .id(BAZ_ID)
                     .named(BAZ_PARQUET_NAME)
                     .optionalList()
                     .id(ZAP_ID)
-                    .optionalElement(PrimitiveTypeName.BINARY)
+                    .optionalElement(PrimitiveType.PrimitiveTypeName.BINARY)
                     .as(LogicalTypeAnnotation.stringType())
                     .named(ZAP_PARQUET_NAME)
                     .named(MappedSchema.SCHEMA_NAME);
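
A sketch (not part of the series) of how written field_ids could be verified directly via parquet-mr, independent of the whole-schema or sha256 checks; the helper name, column name, and id are hypothetical:

    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.Type;

    final class FieldIdCheck {
        // Type.getId() exposes the field_id recorded in the Parquet SchemaElement,
        // or null if none was written.
        static void checkFieldId(MessageType schema, String parquetColumn, int expectedId) {
            final Type field = schema.getType(parquetColumn);
            if (field.getId() == null || field.getId().intValue() != expectedId) {
                throw new AssertionError(
                        "Unexpected field_id for " + parquetColumn + ": " + field.getId());
            }
        }
    }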