feat: DH-18399: Add ParquetColumnResolver #6558

Merged
@@ -9,10 +9,11 @@
import java.lang.annotation.Target;

/**
* Indicates that a particular method is for internal use only and should not be used by client code. It is subject to
* change/removal at any time.
* Indicates that a particular {@link ElementType#METHOD method}, {@link ElementType#CONSTRUCTOR constructor},
* {@link ElementType#TYPE type}, or {@link ElementType#PACKAGE package} is for internal use only and should not be used
* by client code. It is subject to change/removal at any time.
*/
@Target({ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.TYPE})
@Target({ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.TYPE, ElementType.PACKAGE})
@Inherited
@Documented
public @interface InternalUseOnly {
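
(Aside: the new ElementType.PACKAGE target means an entire package can now be marked internal through its package-info.java. A minimal sketch of such a file follows; the annotation's package and the annotated package name are illustrative assumptions, not taken from this PR.)

// package-info.java -- hypothetical example of applying the new PACKAGE target.
// Assumes the annotation lives in io.deephaven.util.annotations; adjust to the real package.
@InternalUseOnly
package io.deephaven.parquet.impl;

import io.deephaven.util.annotations.InternalUseOnly;
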
@@ -34,8 +34,9 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -53,6 +54,11 @@
import java.util.List;
import java.util.stream.Collectors;
import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
import static org.apache.parquet.schema.LogicalTypeAnnotation.intType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.Types.buildMessage;
import static org.apache.parquet.schema.Types.optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;

@@ -416,8 +422,12 @@ void testColumnRenameWhileWriting() throws URISyntaxException {
{
final List<String> parquetFiles = getAllParquetFilesFromDataFiles(tableIdentifier);
assertThat(parquetFiles).hasSize(1);
verifyFieldIdsFromParquetFile(parquetFiles.get(0), originalDefinition.getColumnNames(),
nameToFieldIdFromSchema);
final MessageType expectedSchema = buildMessage()
.addFields(
optional(INT32).id(1).as(intType(32, true)).named("intCol"),
optional(DOUBLE).id(2).named("doubleCol"))
.named("root");
verifySchema(parquetFiles.get(0), expectedSchema);
}

final Table moreData = TableTools.emptyTable(5)
@@ -442,10 +452,18 @@ void testColumnRenameWhileWriting() throws URISyntaxException {

final List<String> parquetFiles = getAllParquetFilesFromDataFiles(tableIdentifier);
assertThat(parquetFiles).hasSize(2);
verifyFieldIdsFromParquetFile(parquetFiles.get(0), moreData.getDefinition().getColumnNames(),
newNameToFieldId);
verifyFieldIdsFromParquetFile(parquetFiles.get(1), originalDefinition.getColumnNames(),
nameToFieldIdFromSchema);
final MessageType expectedSchema0 = buildMessage()
.addFields(
optional(INT32).id(1).as(intType(32, true)).named("newIntCol"),
optional(DOUBLE).id(2).named("newDoubleCol"))
.named("root");
final MessageType expectedSchema1 = buildMessage()
.addFields(
optional(INT32).id(1).as(intType(32, true)).named("intCol"),
optional(DOUBLE).id(2).named("doubleCol"))
.named("root");
verifySchema(parquetFiles.get(0), expectedSchema0);
verifySchema(parquetFiles.get(1), expectedSchema1);
}

// TODO: This is failing because we don't map columns based on the column ID when reading. Uncomment this
@@ -455,31 +473,13 @@ void testColumnRenameWhileWriting() throws URISyntaxException {
// moreData.renameColumns("intCol = newIntCol", "doubleCol = newDoubleCol")), fromIceberg);
}

/**
* Verify that the schema of the parquet file read from the provided path has the provided column and corresponding
* field IDs.
*/
private void verifyFieldIdsFromParquetFile(
final String path,
final List<String> columnNames,
final Map<String, Integer> nameToFieldId) throws URISyntaxException {
private void verifySchema(String path, MessageType expectedSchema) throws URISyntaxException {
final ParquetMetadata metadata =
new ParquetTableLocationKey(new URI(path), 0, null, ParquetInstructions.builder()
.setSpecialInstructions(dataInstructions())
.build())
.getMetadata();
final List<ColumnDescriptor> columnsMetadata = metadata.getFileMetaData().getSchema().getColumns();

final int numColumns = columnNames.size();
for (int colIdx = 0; colIdx < numColumns; colIdx++) {
final String columnName = columnNames.get(colIdx);
final String columnNameFromParquetFile = columnsMetadata.get(colIdx).getPath()[0];
assertThat(columnName).isEqualTo(columnNameFromParquetFile);

final int expectedFieldId = nameToFieldId.get(columnName);
final int fieldIdFromParquetFile = columnsMetadata.get(colIdx).getPrimitiveType().getId().intValue();
assertThat(fieldIdFromParquetFile).isEqualTo(expectedFieldId);
}
assertThat(metadata.getFileMetaData().getSchema()).isEqualTo(expectedSchema);
}

/**
13 changes: 12 additions & 1 deletion extensions/parquet/base/build.gradle
@@ -22,5 +22,16 @@ dependencies {
implementation libs.guava

compileOnly libs.jetbrains.annotations
testImplementation libs.junit4

testImplementation libs.assertj

testImplementation platform(libs.junit.bom)
testImplementation libs.junit.jupiter
testRuntimeOnly libs.junit.jupiter.engine
testRuntimeOnly libs.junit.platform.launcher
}

tasks.withType(Test).configureEach {
useJUnitPlatform {
}
}
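
(Aside: with these dependencies and useJUnitPlatform(), tests in this module run on the JUnit 5 platform. A minimal Jupiter-style test that this setup would support is sketched below; the class name and assertion are illustrative, not part of this PR.)

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

// Hypothetical test class, shown only to illustrate the JUnit Jupiter + AssertJ setup.
class ExampleJupiterTest {
    @Test
    void additionWorks() {
        assertThat(1 + 1).isEqualTo(2);
    }
}
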
@@ -4,6 +4,7 @@
package io.deephaven.parquet.base;

import io.deephaven.UncheckedDeephavenException;
import io.deephaven.parquet.impl.ParquetSchemaUtil;
import io.deephaven.util.channel.SeekableChannelContext;
import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.parquet.compress.CompressorAdapter;
@@ -68,8 +69,8 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
this.columnName = columnName;
this.channelsProvider = channelsProvider;
this.columnChunk = columnChunk;
this.path = type
.getColumnDescription(columnChunk.meta_data.getPath_in_schema().toArray(new String[0]));
this.path =
ParquetSchemaUtil.columnDescriptor(type, columnChunk.meta_data.getPath_in_schema()).orElseThrow();
if (columnChunk.getMeta_data().isSetCodec()) {
decompressor = DeephavenCompressorAdapterFactory.getInstance()
.getByName(columnChunk.getMeta_data().getCodec().name());
@@ -32,6 +32,7 @@
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;

import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
@@ -76,11 +77,11 @@ final class ColumnWriterImpl implements ColumnWriter {
final CompressorAdapter compressorAdapter,
final int targetPageSize,
final ByteBufferAllocator allocator) {
this.countingOutput = countingOutput;
this.column = column;
this.compressorAdapter = compressorAdapter;
this.countingOutput = Objects.requireNonNull(countingOutput);
this.column = Objects.requireNonNull(column);
this.compressorAdapter = Objects.requireNonNull(compressorAdapter);
this.targetPageSize = targetPageSize;
this.allocator = allocator;
this.allocator = Objects.requireNonNull(allocator);
dlEncoder = column.getMaxDefinitionLevel() == 0 ? null
: new RunLengthBitPackingHybridEncoder(
getWidthFromMaxInt(column.getMaxDefinitionLevel()), MIN_SLAB_SIZE, targetPageSize, allocator);
@@ -5,6 +5,7 @@

import com.google.common.io.CountingOutputStream;
import io.deephaven.parquet.compress.CompressorAdapter;
import io.deephaven.parquet.impl.ParquetSchemaUtil;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -16,10 +17,11 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

final class RowGroupWriterImpl implements RowGroupWriter {
private final CountingOutputStream countingOutput;
private final MessageType type;
private final MessageType schema;
private final int targetPageSize;
private final ByteBufferAllocator allocator;
private ColumnWriterImpl activeWriter;
@@ -28,33 +30,33 @@ final class RowGroupWriterImpl implements RowGroupWriter {
private final CompressorAdapter compressorAdapter;

RowGroupWriterImpl(CountingOutputStream countingOutput,
MessageType type,
MessageType schema,
int targetPageSize,
ByteBufferAllocator allocator,
CompressorAdapter compressorAdapter) {
this(countingOutput, type, targetPageSize, allocator, new BlockMetaData(), compressorAdapter);
this(countingOutput, schema, targetPageSize, allocator, new BlockMetaData(), compressorAdapter);
}


private RowGroupWriterImpl(CountingOutputStream countingOutput,
MessageType type,
MessageType schema,
int targetPageSize,
ByteBufferAllocator allocator,
BlockMetaData blockMetaData,
CompressorAdapter compressorAdapter) {
this.countingOutput = countingOutput;
this.type = type;
this.countingOutput = Objects.requireNonNull(countingOutput);
this.schema = Objects.requireNonNull(schema);
this.targetPageSize = targetPageSize;
this.allocator = allocator;
this.blockMetaData = blockMetaData;
this.compressorAdapter = compressorAdapter;
this.allocator = Objects.requireNonNull(allocator);
this.blockMetaData = Objects.requireNonNull(blockMetaData);
this.compressorAdapter = Objects.requireNonNull(compressorAdapter);
}

String[] getPrimitivePath(String columnName) {
String[] result = {columnName};

Type rollingType;
while (!(rollingType = type.getType(result)).isPrimitive()) {
while (!(rollingType = schema.getType(result)).isPrimitive()) {
GroupType groupType = rollingType.asGroupType();
if (groupType.getFieldCount() != 1) {
throw new UnsupportedOperationException("Encountered struct at:" + Arrays.toString(result));
@@ -74,7 +76,7 @@ public ColumnWriter addColumn(String columnName) {
}
activeWriter = new ColumnWriterImpl(this,
countingOutput,
type.getColumnDescription(getPrimitivePath(columnName)),
ParquetSchemaUtil.columnDescriptor(schema, getPrimitivePath(columnName)).orElseThrow(),
compressorAdapter,
targetPageSize,
allocator);
@@ -0,0 +1,24 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.impl;

import org.apache.parquet.column.ColumnDescriptor;
import org.jetbrains.annotations.NotNull;

import javax.annotation.Nullable;

public final class ColumnDescriptorUtil {
/**
* A more thorough check of {@link ColumnDescriptor} equality. In addition to
* {@link ColumnDescriptor#equals(Object)} which only checks the {@link ColumnDescriptor#getPath()}, this also
* checks for the equality of {@link ColumnDescriptor#getPrimitiveType()},
* {@link ColumnDescriptor#getMaxRepetitionLevel()}, and {@link ColumnDescriptor#getMaxDefinitionLevel()}.
*/
public static boolean equals(@NotNull ColumnDescriptor x, @Nullable ColumnDescriptor y) {
return x == y || (x.equals(y)
&& x.getPrimitiveType().equals(y.getPrimitiveType())
&& x.getMaxRepetitionLevel() == y.getMaxRepetitionLevel()
&& x.getMaxDefinitionLevel() == y.getMaxDefinitionLevel());
}
}
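
(Aside: the sketch below illustrates the distinction this utility draws, using two descriptors that share a path but differ in primitive type. The demo class, constructor arguments, and printed values are assumptions for illustration, not part of this PR.)

import io.deephaven.parquet.impl.ColumnDescriptorUtil;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

// Hypothetical demo: ColumnDescriptor#equals compares only the path, while
// ColumnDescriptorUtil.equals also compares primitive type and repetition/definition levels.
final class ColumnDescriptorUtilDemo {
    public static void main(String[] args) {
        final ColumnDescriptor a = new ColumnDescriptor(
                new String[] {"intCol"},
                Types.optional(PrimitiveTypeName.INT32).named("intCol"),
                0, 1);
        final ColumnDescriptor b = new ColumnDescriptor(
                new String[] {"intCol"},
                Types.optional(PrimitiveTypeName.INT64).named("intCol"),
                0, 1);
        System.out.println(a.equals(b)); // true: same path
        System.out.println(ColumnDescriptorUtil.equals(a, b)); // false: primitive types differ
    }
}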