feat: Allow parquet column access by field_id #6156

Closed · wants to merge 14 commits
ColumnChunkReaderImpl.java

@@ -29,6 +29,7 @@
 import java.nio.channels.SeekableByteChannel;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.function.Function;

 import static io.deephaven.parquet.base.ParquetUtils.resolve;
@@ -44,7 +45,7 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
     private final CompressorAdapter decompressor;
     private final ColumnDescriptor path;
     private final OffsetIndexReader offsetIndexReader;
-    private final List<Type> fieldTypes;
+    private final List<Type> nonRequiredFields;
     private final Function<SeekableChannelContext, Dictionary> dictionarySupplier;
     private final URI columnChunkURI;
     /**
@@ -62,12 +63,12 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
             final SeekableChannelsProvider channelsProvider,
             final URI rootURI,
             final MessageType type,
-            final List<Type> fieldTypes,
+            final List<Type> nonRequiredFields,
             final long numRows,
             final String version) {
-        this.columnName = columnName;
-        this.channelsProvider = channelsProvider;
-        this.columnChunk = columnChunk;
+        this.columnName = Objects.requireNonNull(columnName);
+        this.channelsProvider = Objects.requireNonNull(channelsProvider);
+        this.columnChunk = Objects.requireNonNull(columnChunk);
         this.path = type
                 .getColumnDescription(columnChunk.meta_data.getPath_in_schema().toArray(new String[0]));
         if (columnChunk.getMeta_data().isSetCodec()) {
@@ -76,7 +77,7 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
         } else {
             decompressor = CompressorAdapter.PASSTHRU;
         }
-        this.fieldTypes = fieldTypes;
+        this.nonRequiredFields = Objects.requireNonNull(nonRequiredFields);
         this.dictionarySupplier = new SoftCachingFunction<>(this::getDictionary);
         this.numRows = numRows;
         this.version = version;
@@ -280,7 +281,8 @@ public ColumnPageReader next(@NotNull final SeekableChannelContext channelContext) {
             final Function<SeekableChannelContext, Dictionary> pageDictionarySupplier =
                     getPageDictionarySupplier(pageHeader);
             return new ColumnPageReaderImpl(columnName, channelsProvider, decompressor, pageDictionarySupplier,
-                    pageMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader, numValuesInPage);
+                    pageMaterializerFactory, path, getURI(), nonRequiredFields, dataOffset, pageHeader,
+                    numValuesInPage);
         } catch (IOException e) {
             throw new UncheckedDeephavenException("Error reading page header at offset " + headerOffset + " for " +
                     "column: " + columnName + ", uri: " + getURI(), e);
@@ -358,7 +360,7 @@ public ColumnPageReader getPageReader(final int pageNum, final SeekableChannelContext channelContext) {
             final Function<SeekableChannelContext, Dictionary> pageDictionarySupplier =
                     getPageDictionarySupplier(pageHeader);
             return new ColumnPageReaderImpl(columnName, channelsProvider, decompressor, pageDictionarySupplier,
-                    pageMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader,
+                    pageMaterializerFactory, path, getURI(), nonRequiredFields, dataOffset, pageHeader,
                     getNumValues(pageHeader));
         } catch (final IOException e) {
             throw new UncheckedDeephavenException("Error reading page header for page number " + pageNum +
ColumnPageReaderImpl.java

@@ -58,7 +58,7 @@ final class ColumnPageReaderImpl implements ColumnPageReader {
     private final PageMaterializerFactory pageMaterializerFactory;
     private final ColumnDescriptor path;
     private final URI uri;
-    private final List<Type> fieldTypes;
+    private final List<Type> nonRequiredFields;

     /**
      * The offset for data following the page header in the file.
@@ -80,7 +80,7 @@ final class ColumnPageReaderImpl implements ColumnPageReader {
      * @param materializerFactory The factory for creating {@link PageMaterializer}.
      * @param path The path of the column.
      * @param uri The uri of the parquet file.
-     * @param fieldTypes The types of the fields in the column.
+     * @param nonRequiredFields The types of the non-required fields in the column.
      * @param dataOffset The offset for data following the page header in the file.
      * @param pageHeader The page header, should not be {@code null}.
      * @param numValues The number of values in the page.
@@ -93,7 +93,7 @@ final class ColumnPageReaderImpl implements ColumnPageReader {
             final PageMaterializerFactory materializerFactory,
             final ColumnDescriptor path,
             final URI uri,
-            final List<Type> fieldTypes,
+            final List<Type> nonRequiredFields,
             final long dataOffset,
             final PageHeader pageHeader,
             final int numValues) {
@@ -104,7 +104,7 @@ final class ColumnPageReaderImpl implements ColumnPageReader {
         this.pageMaterializerFactory = materializerFactory;
         this.path = path;
         this.uri = uri;
-        this.fieldTypes = fieldTypes;
+        this.nonRequiredFields = nonRequiredFields;
         this.dataOffset = dataOffset;
         this.pageHeader = Require.neqNull(pageHeader, "pageHeader");
         this.numValues = Require.geqZero(numValues, "numValues");
@@ -716,7 +716,7 @@ private Pair<Pair<Type.Repetition, IntBuffer>[], Integer> getOffsetsAndNulls(
         int currentRl = rlDecoder == null ? 0 : rlDecoder.currentValue();

         final LevelsController levelsController = new LevelsController(
-                fieldTypes.stream().map(Type::getRepetition).toArray(Type.Repetition[]::new));
+                nonRequiredFields.stream().map(Type::getRepetition).toArray(Type.Repetition[]::new));
         for (int valuesProcessed = 0; valuesProcessed < numValues;) {
             if (dlRangeSize == 0) {
                 dlDecoder.readNextRange();
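The rename from fieldTypes to nonRequiredFields matches what LevelsController actually consumes: only optional and repeated ancestors of a leaf contribute definition and repetition levels, so required fields never belong in this list. A small illustration of the array handed to the controller — the schema fragment and helper class are invented for this sketch, not part of the PR:

import java.util.List;
import org.apache.parquet.schema.Type;

final class RepetitionsSketch {
    // For a leaf at path name.list.element where `name` is OPTIONAL and
    // `list` is REPEATED, nonRequiredFields is [name, list], so this
    // returns [OPTIONAL, REPEATED] -- exactly the levels the controller tracks.
    static Type.Repetition[] repetitions(List<Type> nonRequiredFields) {
        return nonRequiredFields.stream()
                .map(Type::getRepetition)
                .toArray(Type.Repetition[]::new);
    }
}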
ParquetFileReader.java

@@ -12,13 +12,19 @@
 import org.apache.parquet.schema.*;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import shaded.parquet.org.apache.thrift.protocol.TSimpleJSONProtocol;
+import shaded.parquet.org.apache.thrift.transport.TIOStreamTransport;
+import shaded.parquet.org.apache.thrift.transport.TTransport;

+import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UncheckedIOException;
 import java.net.URI;
 import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.*;

 import static io.deephaven.parquet.base.ParquetUtils.MAGIC;

[Review comment on lines +15 to +17, the shaded thrift imports] Member: Feels a little questionable to depend on someone else's shaded packages.

@@ -39,7 +45,7 @@ public class ParquetFileReader {
      * If reading a single parquet file, root URI is the URI of the file, else the parent directory for a metadata file
      */
     private final URI rootURI;
-    private final MessageType type;
+    private final MessageType schema;

     /**
      * Make a {@link ParquetFileReader} for the supplied {@link File}. Wraps {@link IOException} as
@@ -102,7 +108,7 @@ private ParquetFileReader(
                 fileMetaData = Util.readFileMetaData(in);
             }
         }
-        type = fromParquetSchema(fileMetaData.schema, fileMetaData.column_orders);
+        schema = fromParquetSchema(fileMetaData.schema, fileMetaData.column_orders);
     }

     /**
@@ -239,7 +245,6 @@ public RowGroupReader getRowGroup(final int groupNumber, final String version) {
                 fileMetaData.getRow_groups().get(groupNumber),
                 channelsProvider,
                 rootURI,
-                type,
                 getSchema(),
                 version);
     }
@@ -477,10 +482,18 @@ private static boolean isAdjustedToUTC(final LogicalType logicalType) {
     }

     public MessageType getSchema() {
-        return type;
+        return schema;
     }

     public int rowGroupCount() {
         return fileMetaData.getRow_groups().size();
     }

+    // Useful debugging utility
+    void writeFileMetadata(Path path) throws Exception {
+        try (final TTransport out = new TIOStreamTransport(new BufferedOutputStream(Files.newOutputStream(path)))) {
+            fileMetaData.write(new TSimpleJSONProtocol(out));
+            out.flush();
+        }
+    }
 }
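The new writeFileMetadata hook serializes the raw Thrift footer as JSON via TSimpleJSONProtocol, which is handy when inspecting field_ids and schema structure by hand. A minimal sketch of how it might be driven — the helper class, the reader argument, and the output path are assumptions for illustration, and a real caller would have to live in io.deephaven.parquet.base since the method is package-private:

import java.nio.file.Path;

final class FooterDump {
    // Hypothetical debugging helper: writes the footer of an already-open
    // reader to /tmp/footer.json for manual inspection.
    static void dumpFooter(ParquetFileReader reader) throws Exception {
        reader.writeFileMetadata(Path.of("/tmp/footer.json"));
    }
}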
ParquetTotalColumns.java (new file)

@@ -0,0 +1,59 @@
+//
+// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.parquet.base;
+
+import io.deephaven.util.annotations.InternalUseOnly;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.TypeVisitor;
+
+@InternalUseOnly
+public final class ParquetTotalColumns {
+
+    public static int of(Type type) {
+        final TotalColumnsVisitor visitor = new TotalColumnsVisitor();
+        type.accept(visitor);
+        return visitor.out;
+    }
+
+    public static int of(@SuppressWarnings("unused") PrimitiveType primitiveType) {
+        return 1;
+    }
+
+    public static int of(GroupType groupType) {
+        return groupType.getFields().stream().mapToInt(ParquetTotalColumns::of).sum();
+    }
+
+    public static int of(MessageType messageType) {
+        final int numColumns = of((GroupType) messageType);
+        // same size as messageType.getColumns().size(), but this is cheaper.
+        final int numPaths = messageType.getPaths().size();
+        if (numColumns != numPaths) {
+            throw new IllegalStateException(
+                    String.format("Inconsistent sizes, numColumns=%d, numPaths=%d", numColumns, numPaths));
+        }
+        return numColumns;
+    }
+
+    private static class TotalColumnsVisitor implements TypeVisitor {
+        private int out;
+
+        @Override
+        public void visit(GroupType groupType) {
+            out = of(groupType);
+        }
+
+        @Override
+        public void visit(MessageType messageType) {
+            out = of(messageType);
+        }
+
+        @Override
+        public void visit(PrimitiveType primitiveType) {
+            out = of(primitiveType);
+        }
+    }
+}
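ParquetTotalColumns.of recurses into groups and counts primitive leaves, cross-checking the message-level count against MessageType#getPaths(). A quick usage sketch with the stock parquet-mr Types builder — the schema below is invented for illustration:

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

final class TotalColumnsExample {
    public static void main(String[] args) {
        final MessageType schema = Types.buildMessage()
                .required(PrimitiveTypeName.INT64).named("id")
                .optionalGroup()
                        .required(PrimitiveTypeName.BINARY).named("first")
                        .required(PrimitiveTypeName.BINARY).named("last")
                        .named("name")
                .named("example");
        // Leaves are id, name.first, and name.last, so this prints 3.
        System.out.println(ParquetTotalColumns.of(schema));
    }
}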
RowGroupReader.java

@@ -3,6 +3,7 @@
 //
 package io.deephaven.parquet.base;

+import io.deephaven.util.annotations.InternalUseOnly;
 import org.apache.parquet.format.RowGroup;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -12,16 +13,20 @@
 /**
  * Provides read access to a parquet Row Group
  */
+@InternalUseOnly
 public interface RowGroupReader {
     /**
-     * Returns the accessor to a given Column Chunk
+     * Returns the accessor to a given Column Chunk. If {@code fieldId} is present, it will be matched over
+     * {@code parquetColumnNamePath}.
      *
      * @param columnName the name of the column
-     * @param path the full column path
+     * @param parquetColumnNamePath the full column parquetColumnNamePath
+     * @param fieldId the field_id to fetch
      * @return the accessor to a given Column Chunk, or null if the column is not present in this Row Group
      */
     @Nullable
-    ColumnChunkReader getColumnChunk(@NotNull String columnName, @NotNull List<String> path);
+    ColumnChunkReader getColumnChunk(@NotNull String columnName, @NotNull List<String> defaultPath,
+            @Nullable List<String> parquetColumnNamePath, @Nullable Integer fieldId);

     long numRows();

[Review comment on the new getColumnChunk signature] Member: Document defaultPath. Is it a parquet path?
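For callers, a non-null fieldId now takes precedence over the name-based paths. A hypothetical sketch of resolving a column chunk by field_id — the reader, column names, and the field_id value 42 are invented for illustration; only the getColumnChunk signature comes from this diff:

import java.util.List;

final class FieldIdLookupExample {
    static ColumnChunkReader lookup(ParquetFileReader reader, String version) {
        final RowGroupReader rowGroup = reader.getRowGroup(0, version);
        // Returns null if the column is not present in this row group.
        return rowGroup.getColumnChunk(
                "Price",           // Deephaven column name
                List.of("price"),  // default path, used when nothing better matches
                null,              // no explicit parquet column name path
                42);               // field_id, matched over the paths when present
    }
}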