feat: Allow parquet column access by field_id #6156

Closed · wants to merge 14 commits
@@ -80,7 +80,7 @@ final class ColumnPageReaderImpl implements ColumnPageReader {
* @param materializerFactory The factory for creating {@link PageMaterializer}.
* @param path The path of the column.
* @param uri The uri of the parquet file.
* @param nonRequiredFields The types of the fields in the column.
* @param nonRequiredFields The types of the non-required fields in the column.
* @param dataOffset The offset for data following the page header in the file.
* @param pageHeader The page header, should not be {@code null}.
* @param numValues The number of values in the page.
@@ -12,13 +12,19 @@
import org.apache.parquet.schema.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import shaded.parquet.org.apache.thrift.protocol.TSimpleJSONProtocol;
import shaded.parquet.org.apache.thrift.transport.TIOStreamTransport;
import shaded.parquet.org.apache.thrift.transport.TTransport;
Comment on lines +15 to +17 (Member): Feels a little questionable to depend on someone else's shaded packages.

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;

import static io.deephaven.parquet.base.ParquetUtils.MAGIC;
@@ -482,4 +488,12 @@ public MessageType getSchema() {
public int rowGroupCount() {
return fileMetaData.getRow_groups().size();
}

// Useful debugging utility
void writeFileMetadata(Path path) throws Exception {
try (final TTransport out = new TIOStreamTransport(new BufferedOutputStream(Files.newOutputStream(path)))) {
fileMetaData.write(new TSimpleJSONProtocol(out));
out.flush();
}
}
}
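
A quick usage note for the debugging helper above: a minimal sketch, assuming a reader instance of the enclosing class is in scope (named "reader" here, hypothetical) and that the call happens from within the same package.

// Hypothetical debugging snippet: dump the thrift footer as JSON, then eyeball
// field_ids and path_in_schema for each column chunk.
final Path out = Path.of("/tmp/footer.json");
reader.writeFileMetadata(out);
System.out.println(Files.readString(out));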
@@ -0,0 +1,59 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.base;

import io.deephaven.util.annotations.InternalUseOnly;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.TypeVisitor;

/**
 * Computes the number of leaf (primitive) columns spanned by a parquet type, i.e. the number of column
 * chunks a field contributes to each row group.
 */
@InternalUseOnly
public final class ParquetTotalColumns {

public static int of(Type type) {
final TotalColumnsVisitor visitor = new TotalColumnsVisitor();
type.accept(visitor);
return visitor.out;
}

public static int of(@SuppressWarnings("unused") PrimitiveType primitiveType) {
return 1;
}

public static int of(GroupType groupType) {
return groupType.getFields().stream().mapToInt(ParquetTotalColumns::of).sum();
}

public static int of(MessageType messageType) {
final int numColumns = of((GroupType) messageType);
// same size as messageType.getColumns().size(), but this is cheaper.
final int numPaths = messageType.getPaths().size();
if (numColumns != numPaths) {
throw new IllegalStateException(
String.format("Inconsistent sizes, numColumns=%d, numPaths=%d", numColumns, numPaths));
}
return numColumns;
}

private static class TotalColumnsVisitor implements TypeVisitor {
private int out;

@Override
public void visit(GroupType groupType) {
out = of(groupType);
}

@Override
public void visit(MessageType messageType) {
out = of(messageType);
}

@Override
public void visit(PrimitiveType primitiveType) {
out = of(primitiveType);
}
}
}
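
To make the counting rule concrete, here is a minimal sketch using the standard parquet-mr Types builder; the schema below is invented for illustration.

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

// message schema { required int64 id; optional group location { required double lat; required double lon; } }
final MessageType schema = Types.buildMessage()
        .required(INT64).named("id")
        .optionalGroup()
        .required(DOUBLE).named("lat")
        .required(DOUBLE).named("lon")
        .named("location")
        .named("schema");

// "id" contributes one leaf column and the group "location" contributes two,
// so ParquetTotalColumns.of(schema) == 3, matching schema.getPaths().size().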
@@ -17,15 +17,16 @@
public interface RowGroupReader {
/**
* Returns the accessor to a given Column Chunk. If {@code fieldId} is present, it will be matched over
* {@code path}.
* {@code parquetColumnNamePath}.
*
* @param columnName the name of the column
* @param path the full column path
@param defaultPath the path to match when neither {@code parquetColumnNamePath} nor {@code fieldId} is provided
@param parquetColumnNamePath the full parquet column name path of the column
* @param fieldId the field_id to fetch
* @return the accessor to a given Column Chunk, or null if the column is not present in this Row Group
*/
@Nullable
ColumnChunkReader getColumnChunk(@NotNull String columnName, @NotNull List<String> path, @Nullable Integer fieldId);
ColumnChunkReader getColumnChunk(@NotNull String columnName, @NotNull List<String> defaultPath,
Comment (Member): Document defaultPath. Is it a parquet path?
@Nullable List<String> parquetColumnNamePath, @Nullable Integer fieldId);

long numRows();

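A hedged usage sketch of the new signature; the column name, path, and field id below are invented. Per the javadoc above, a present fieldId is matched in preference to parquetColumnNamePath, and defaultPath is consulted only when both hints are null.

// Hypothetical call site:
final ColumnChunkReader chunk = rowGroupReader.getColumnChunk(
        "Lat",                      // columnName, used for error reporting
        List.of("Lat"),             // defaultPath: only used if both hints below are null
        List.of("location", "lat"), // parquetColumnNamePath, may be null
        7);                         // fieldId, may be null
if (chunk == null) {
    // column not present in this row group
}
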
@@ -8,6 +8,7 @@
import org.apache.parquet.format.RowGroup;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Type.ID;
import org.apache.parquet.schema.Type.Repetition;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -16,20 +17,71 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

final class RowGroupReaderImpl implements RowGroupReader {

private class ColumnHolder {
private final int fieldIndex;
private final int columnIndex;

public ColumnHolder(int fieldIndex, int columnIndex) {
this.fieldIndex = fieldIndex;
this.columnIndex = columnIndex;
}

String pathKey() {
return pathInSchema().toString();
}

Integer fieldId() {
final ID id = fieldType().getId();
return id == null ? null : id.intValue();
}

ColumnChunkReaderImpl reader(String columnName) {
return new ColumnChunkReaderImpl(columnName, columnChunk(), channelsProvider, rootURI, schema,
nonRequiredFields(), numRows(), version);
}

private Type fieldType() {
return schema.getFields().get(fieldIndex);
}

private List<String> pathInSchema() {
return columnChunk().getMeta_data().path_in_schema;
}

private ColumnChunk columnChunk() {
return rowGroup.getColumns().get(columnIndex);
}

private List<Type> nonRequiredFields() {
final List<String> path_in_schema = pathInSchema();
final List<Type> nonRequiredFields = new ArrayList<>();
for (int indexInPath = 0; indexInPath < path_in_schema.size(); indexInPath++) {
Type fieldType = schema
.getType(path_in_schema.subList(0, indexInPath + 1).toArray(new String[0]));
if (fieldType.getRepetition() != Repetition.REQUIRED) {
nonRequiredFields.add(fieldType);
}
}
return nonRequiredFields;
}

private String debugString() {
return String.format("colIx=%d, pathKey=%s, fieldId=%d", columnIndex, pathKey(), fieldId());
}
}
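
To illustrate nonRequiredFields() above with a made-up schema: the method walks every prefix of path_in_schema and collects the types whose repetition is not REQUIRED.

// Hypothetical schema:
//   message schema {
//     optional group location {
//       required double lat;
//     }
//   }
// For path_in_schema = ["location", "lat"]:
//   prefix ["location"]        -> OPTIONAL group -> collected
//   prefix ["location", "lat"] -> REQUIRED leaf  -> skipped
// so nonRequiredFields() returns [location]: the levels that can introduce
// nulls or repetition when decoding this column.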

private final RowGroup rowGroup;
private final SeekableChannelsProvider channelsProvider;
private final MessageType schema;
private final Map<String, List<Type>> schemaMap;
private final Map<String, ColumnChunk> chunkMap;
private final Map<Integer, List<Type>> schemaMapByFieldId;
private final Map<Integer, ColumnChunk> chunkMapByFieldId;
private final Map<String, ColumnHolder> byPath;
private final Map<Integer, ColumnHolder> byFieldId;

/**
* If reading a single parquet file, root URI is the URI of the file, else the parent directory for a metadata file
@@ -44,96 +96,80 @@ final class RowGroupReaderImpl {
@NotNull final MessageType schema,
@Nullable final String version) {
final int fieldCount = schema.getFieldCount();
if (rowGroup.getColumnsSize() != fieldCount) {
throw new IllegalStateException(String.format(
"Expected schema columnsCount and row group columns size to be equal, schema.getFieldCount()=%d, rowGroup.getColumnsSize()=%d, rootURI=%s",
fieldCount, rowGroup.getColumnsSize(), rootURI));
}
this.channelsProvider = Objects.requireNonNull(channelsProvider);
this.rowGroup = Objects.requireNonNull(rowGroup);
this.rootURI = Objects.requireNonNull(rootURI);
this.schema = Objects.requireNonNull(schema);
schemaMap = new HashMap<>(fieldCount);
chunkMap = new HashMap<>(fieldCount);
schemaMapByFieldId = new HashMap<>(fieldCount);
chunkMapByFieldId = new HashMap<>(fieldCount);
byPath = new HashMap<>(fieldCount);
byFieldId = new HashMap<>(fieldCount);
// Note: there is no technical guarantee from parquet that column names, path_in_schema, or field_ids are
// unique; it's technically possible that they are duplicated. Ultimately, getColumnChunk is a bad abstraction -
// we shouldn't need to re-do matching for every single row group column chunk, the matching should be done
// _once_ per column to get the column index, and then for every row group we should just need to do
// rowGroup.getColumns().get(columnIndex).
// rowGroup.getColumns().get(columnIx).
//
// Also, this logic, divorced from our inference
// (io.deephaven.parquet.table.ParquetSchemaReader.readParquetSchema),
// makes it harder to keep the two in sync.
final Set<String> nonUniqueKeys = new HashSet<>();
final Set<String> nonUniquePaths = new HashSet<>();
final Set<Integer> nonUniqueFieldIds = new HashSet<>();
final Iterator<Type> fieldsIt = schema.getFields().iterator();
final Iterator<ColumnChunk> colsIt = rowGroup.getColumnsIterator();
while (fieldsIt.hasNext() && colsIt.hasNext()) {
final Type ft = fieldsIt.next();
final ColumnChunk column = colsIt.next();
final List<String> path_in_schema = column.getMeta_data().path_in_schema;
final String key = path_in_schema.toString();
final List<Type> nonRequiredFields = new ArrayList<>();
for (int indexInPath = 0; indexInPath < path_in_schema.size(); indexInPath++) {
Type fieldType = schema
.getType(path_in_schema.subList(0, indexInPath + 1).toArray(new String[0]));
if (fieldType.getRepetition() != Repetition.REQUIRED) {
nonRequiredFields.add(fieldType);
int fieldIx = 0;
int columnIx = 0;
for (final Type fieldType : schema.getFields()) {
final int totalColumns = ParquetTotalColumns.of(fieldType);
if (totalColumns == 1) {
final ColumnHolder holder = new ColumnHolder(fieldIx, columnIx);
final String key = holder.pathKey();
final Integer fieldId = holder.fieldId();
if (byPath.putIfAbsent(key, holder) != null) {
nonUniquePaths.add(key);
}
}
if (chunkMap.putIfAbsent(key, column) != null) {
nonUniqueKeys.add(key);
}
schemaMap.putIfAbsent(key, nonRequiredFields);
if (ft.getId() != null) {
final int fieldId = ft.getId().intValue();
if (chunkMapByFieldId.putIfAbsent(fieldId, column) != null) {
nonUniqueFieldIds.add(fieldId);
if (fieldId != null) {
if (byFieldId.putIfAbsent(fieldId, holder) != null) {
nonUniqueFieldIds.add(fieldId);
}
}
schemaMapByFieldId.putIfAbsent(fieldId, nonRequiredFields);
}
columnIx += totalColumns;
++fieldIx;
}
if (fieldsIt.hasNext() || colsIt.hasNext()) {
throw new IllegalStateException(String.format("Unexpected, iterators not exhausted, rootURI=%s", rootURI));
if (columnIx != schema.getPaths().size()) {
throw new IllegalStateException(
String.format("Inconsistent column count, columnIx=%d, schema.getPaths().size()=%d", columnIx,
schema.getPaths().size()));
}
for (String nonUniqueKey : nonUniqueKeys) {
chunkMap.remove(nonUniqueKey);
schemaMap.remove(nonUniqueKey);
for (String nonUniquePath : nonUniquePaths) {
byPath.remove(nonUniquePath);
}
for (Integer nonUniqueFieldId : nonUniqueFieldIds) {
chunkMapByFieldId.remove(nonUniqueFieldId);
schemaMapByFieldId.remove(nonUniqueFieldId);
byFieldId.remove(nonUniqueFieldId);
}
Comment (Member): Last wins or first wins is better than "pretend we had nothing, and just give nulls".

Comment (Member): I wonder what pyarrow does.

this.version = version;
}

@Override
@Nullable
public ColumnChunkReaderImpl getColumnChunk(@NotNull final String columnName, @NotNull final List<String> path,
public @Nullable ColumnChunkReader getColumnChunk(
@NotNull String columnName,
@NotNull List<String> defaultPath,
@Nullable List<String> parquetColumnNamePath,
@Nullable Integer fieldId) {
final ColumnChunk columnChunk;
final List<Type> nonRequiredFields;
PARTS: {
if (fieldId != null) {
final ColumnChunk cc = chunkMapByFieldId.get(fieldId);
if (cc != null) {
columnChunk = cc;
nonRequiredFields = schemaMapByFieldId.get(fieldId);
break PARTS;
final ColumnHolder holder;
if (fieldId == null && parquetColumnNamePath == null) {
holder = byPath.get(defaultPath.toString());
} else {
final ColumnHolder byFieldId = fieldId == null ? null : this.byFieldId.get(fieldId);
final ColumnHolder byPath =
parquetColumnNamePath == null ? null : this.byPath.get(parquetColumnNamePath.toString());
if (byFieldId != null && byPath != null) {
if (byFieldId != byPath) {
throw new IllegalArgumentException(String.format(
"For columnName=%s, providing an explicit parquet column name path (%s) and field id (%d) mapping, but they are resolving to different columns, byFieldId=[%s], byPath=[%s]",
Comment (Member), suggested change:
- "For columnName=%s, providing an explicit parquet column name path (%s) and field id (%d) mapping, but they are resolving to different columns, byFieldId=[%s], byPath=[%s]",
+ "For columnName=%s, instructions provided an explicit parquet column name path (%s) and field id (%d) mapping, but they are resolving to different columns, byFieldId=[%s], byPath=[%s]",

columnName, parquetColumnNamePath, fieldId, byFieldId.debugString(), byPath.debugString()));
}
}
final String key = path.toString();
final ColumnChunk cc = chunkMap.get(key);
if (cc == null) {
return null;
}
columnChunk = cc;
nonRequiredFields = schemaMap.get(key);
holder = byFieldId != null ? byFieldId : byPath;
Comment (Member):

If the user specified a field ID and we didn't find it, I'm not sure it's correct to fall back to name mapping.
https://iceberg.apache.org/spec/#schema-evolution specifies a set of rules, and we should be making sure our Parquet implementation will let our Iceberg implementation follow them.

For Iceberg support, it looks like we need:

  1. A list of name mappings, which we fall back to if and only if the field ID was not found.
  2. Some kind of handling for encountering multiple Parquet fields with names from the name mappings: first? last? exception?
  3. Some kind of handling for finding a fallback field by name mappings, and determining that it does not match the expected field ID. Exception?

}
return new ColumnChunkReaderImpl(columnName, columnChunk, channelsProvider, rootURI, schema, nonRequiredFields,
numRows(), version);
return holder == null ? null : holder.reader(columnName);
}
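
Relating to the Iceberg review comment above, here is a minimal sketch of the suggested resolution order. This is not what the PR implements (the PR only throws when both lookups succeed and disagree); findByFieldId and findByNameMapping are hypothetical helpers over the byFieldId/byPath maps.

// Hypothetical, Iceberg-style resolution: the field id wins outright, and name
// mappings are consulted only when the field id is absent or unresolved.
private ColumnHolder resolveIcebergStyle(Integer fieldId, List<List<String>> nameMappings) {
    if (fieldId != null) {
        final ColumnHolder byId = findByFieldId(fieldId); // assumed helper
        if (byId != null) {
            return byId;
        }
    }
    ColumnHolder match = null;
    for (final List<String> candidate : nameMappings) {
        final ColumnHolder byName = findByNameMapping(candidate); // assumed helper
        if (byName == null) {
            continue;
        }
        if (match != null && match != byName) {
            // Rule 2: multiple parquet fields match the name mappings.
            throw new IllegalStateException("Ambiguous name mapping");
        }
        match = byName;
    }
    if (match != null && fieldId != null && !Objects.equals(match.fieldId(), fieldId)) {
        // Rule 3: the name-mapped column carries a conflicting field id.
        throw new IllegalStateException("Name-mapped column has conflicting field id");
    }
    return match;
}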

@Override