Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow parquet column access by field_id #6156

Closed
wants to merge 14 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ public RowGroupReader getRowGroup(final int groupNumber, final String version) {
fileMetaData.getRow_groups().get(groupNumber),
channelsProvider,
rootURI,
type,
getSchema(),
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.parquet.base;

import io.deephaven.util.annotations.InternalUseOnly;
import org.apache.parquet.format.RowGroup;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand All @@ -12,16 +13,19 @@
/**
* Provides read access to a parquet Row Group
*/
@InternalUseOnly
public interface RowGroupReader {
/**
* Returns the accessor to a given Column Chunk
* Returns the accessor to a given Column Chunk. If {@code fieldId} is present, it will be matched over
* {@code path}.
*
* @param columnName the name of the column
* @param path the full column path
* @param fieldId the field_id to fetch
* @return the accessor to a given Column Chunk, or null if the column is not present in this Row Group
*/
@Nullable
ColumnChunkReader getColumnChunk(@NotNull String columnName, @NotNull List<String> path);
ColumnChunkReader getColumnChunk(@NotNull String columnName, @NotNull List<String> path, @Nullable Integer fieldId);

long numRows();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class RowGroupReaderImpl implements RowGroupReader {
private final RowGroup rowGroup;
private final SeekableChannelsProvider channelsProvider;
private final MessageType type;
private final Map<String, List<Type>> schemaMap = new HashMap<>();
private final Map<String, ColumnChunk> chunkMap = new HashMap<>();
private final MessageType schema;
private final Map<String, List<Type>> schemaMap;
private final Map<String, ColumnChunk> chunkMap;
private final Map<Integer, List<Type>> schemaMapByFieldId;
private final Map<Integer, ColumnChunk> chunkMapByFieldId;
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved

/**
* If reading a single parquet file, root URI is the URI of the file, else the parent directory for a metadata file
Expand All @@ -34,40 +37,74 @@ final class RowGroupReaderImpl implements RowGroupReader {
@NotNull final RowGroup rowGroup,
@NotNull final SeekableChannelsProvider channelsProvider,
@NotNull final URI rootURI,
@NotNull final MessageType type,
@NotNull final MessageType schema,
@Nullable final String version) {
final int fieldCount = schema.getFieldCount();
if (rowGroup.getColumnsSize() != fieldCount) {
throw new IllegalStateException(String.format(
"Expected schema fieldCount and row group columns size to be equal, schema.getFieldCount()=%d, rowGroup.getColumnsSize()=%d, rootURI=%s",
fieldCount, rowGroup.getColumnsSize(), rootURI));
}
this.channelsProvider = channelsProvider;
this.rowGroup = rowGroup;
this.rootURI = rootURI;
this.type = type;
for (ColumnChunk column : rowGroup.columns) {
List<String> path_in_schema = column.getMeta_data().path_in_schema;
String key = path_in_schema.toString();
chunkMap.put(key, column);
List<Type> nonRequiredFields = new ArrayList<>();
this.schema = schema;
schemaMap = new HashMap<>(fieldCount);
chunkMap = new HashMap<>(fieldCount);
schemaMapByFieldId = new HashMap<>(fieldCount);
chunkMapByFieldId = new HashMap<>(fieldCount);
final Iterator<Type> fieldsIt = schema.getFields().iterator();
final Iterator<ColumnChunk> colsIt = rowGroup.getColumnsIterator();
while (fieldsIt.hasNext() && colsIt.hasNext()) {
final Type ft = fieldsIt.next();
final ColumnChunk column = colsIt.next();
final List<String> path_in_schema = column.getMeta_data().path_in_schema;
final String key = path_in_schema.toString();
final List<Type> nonRequiredFields = new ArrayList<>();
for (int indexInPath = 0; indexInPath < path_in_schema.size(); indexInPath++) {
Type fieldType = schema
.getType(path_in_schema.subList(0, indexInPath + 1).toArray(new String[0]));
if (fieldType.getRepetition() != Type.Repetition.REQUIRED) {
nonRequiredFields.add(fieldType);
}
}
chunkMap.put(key, column);
schemaMap.put(key, nonRequiredFields);
if (ft.getId() != null) {
chunkMapByFieldId.put(ft.getId().intValue(), column);
schemaMapByFieldId.put(ft.getId().intValue(), nonRequiredFields);
}
}
if (fieldsIt.hasNext() || colsIt.hasNext()) {
throw new IllegalStateException(String.format("Unexpected, iterators not exhausted, rootURI=%s", rootURI));
}
this.version = version;
}

@Override
@Nullable
public ColumnChunkReaderImpl getColumnChunk(@NotNull final String columnName, @NotNull final List<String> path) {
final String key = path.toString();
final ColumnChunk columnChunk = chunkMap.get(key);
final List<Type> fieldTypes = schemaMap.get(key);
if (columnChunk == null) {
return null;
public ColumnChunkReaderImpl getColumnChunk(@NotNull final String columnName, @NotNull final List<String> path,
@Nullable Integer fieldId) {
final ColumnChunk columnChunk;
final List<Type> nonRequiredFields;
PARTS: {
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
if (fieldId != null) {
final ColumnChunk cc = chunkMapByFieldId.get(fieldId);
if (cc != null) {
columnChunk = cc;
nonRequiredFields = schemaMapByFieldId.get(fieldId);
break PARTS;
}
}
final String key = path.toString();
final ColumnChunk cc = chunkMap.get(key);
if (cc == null) {
return null;
}
columnChunk = cc;
nonRequiredFields = schemaMap.get(key);
}
return new ColumnChunkReaderImpl(columnName, columnChunk, channelsProvider, rootURI, type, fieldTypes,
return new ColumnChunkReaderImpl(columnName, columnChunk, channelsProvider, rootURI, schema, nonRequiredFields,
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
numRows(), version);
}

Expand Down
Loading
Loading