feat: Add Parquet field_id writing #6381

Merged
ColumnWriterImpl.java
@@ -31,7 +31,7 @@
 import java.nio.IntBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.util.HashSet;
+import java.util.EnumSet;
 import java.util.Set;

 import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
@@ -57,7 +57,9 @@ final class ColumnWriterImpl implements ColumnWriter {
     private final RunLengthBitPackingHybridEncoder dlEncoder;
     private final RunLengthBitPackingHybridEncoder rlEncoder;
     private long dictionaryOffset = -1;
-    private final Set<Encoding> encodings = new HashSet<>();
+    // The downstream writing code (ParquetFileWriter) seems to respect the traversal order of this set. As such, to
+    // improve determinism, we are using an EnumSet.
+    private final Set<Encoding> encodings = EnumSet.noneOf(Encoding.class);
     private long firstDataPageOffset = -1;
     private long uncompressedLength;
     private long compressedLength;
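For context on the determinism claim: EnumSet iterates in the natural (declaration) order of its enum constants, regardless of insertion order, so traversal of the encodings set is identical across runs. A minimal standalone sketch, using a stand-in enum for org.apache.parquet.column.Encoding:

import java.util.EnumSet;
import java.util.Set;

public class EnumSetOrderDemo {
    // Stand-in for org.apache.parquet.column.Encoding (the real enum has more constants).
    enum Encoding { PLAIN, RLE, PLAIN_DICTIONARY }

    public static void main(String[] args) {
        final Set<Encoding> encodings = EnumSet.noneOf(Encoding.class);
        // Insertion order differs from declaration order...
        encodings.add(Encoding.PLAIN_DICTIONARY);
        encodings.add(Encoding.PLAIN);
        // ...but traversal always follows declaration order: [PLAIN, PLAIN_DICTIONARY]
        System.out.println(encodings);
    }
}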
MappedSchema.java
@@ -22,6 +22,9 @@
  */
 class MappedSchema {

+    @VisibleForTesting
+    static final String SCHEMA_NAME = "root";
+
     static MappedSchema create(
             final Map<String, Map<ParquetCacheTags, Object>> computedCache,
             final TableDefinition definition,
@@ -32,7 +35,7 @@ static MappedSchema create(
         for (final ColumnDefinition<?> columnDefinition : definition.getColumns()) {
             builder.addField(createType(computedCache, columnDefinition, rowSet, columnSourceMap, instructions));
         }
-        final MessageType schema = builder.named("root");
+        final MessageType schema = builder.named(SCHEMA_NAME);
         return new MappedSchema(definition, schema);
     }

ParquetInstructions.java
@@ -3,12 +3,14 @@
 //
 package io.deephaven.parquet.table;

+import io.deephaven.api.util.NameValidator;
 import io.deephaven.base.verify.Require;
 import io.deephaven.configuration.Configuration;
 import io.deephaven.engine.table.TableDefinition;
 import io.deephaven.engine.table.impl.ColumnToCodecMappings;
 import io.deephaven.hash.KeyedObjectHashMap;
 import io.deephaven.hash.KeyedObjectKey;
+import io.deephaven.hash.KeyedObjectKey.Basic;
 import io.deephaven.parquet.base.ParquetUtils;
 import io.deephaven.util.annotations.VisibleForTesting;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -21,6 +23,7 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -114,6 +117,14 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par
      */
     public abstract boolean useDictionary(String columnName);

+    /**
+     * The field ID for the given {@code columnName}.
+     *
+     * @param columnName the Deephaven column name
+     * @return the field id
+     */
+    public abstract OptionalInt getFieldId(final String columnName);
+
     public abstract Object getSpecialInstructions();

     public abstract String getCompressionCodecName();
@@ -200,6 +211,7 @@ public static boolean sameColumnNamesAndCodecMappings(final ParquetInstructions
     }

     public static final ParquetInstructions EMPTY = new ParquetInstructions() {
+
         @Override
         public String getParquetColumnNameFromColumnNameOrDefault(final String columnName) {
             return columnName;
@@ -228,6 +240,11 @@ public boolean useDictionary(final String columnName) {
             return false;
         }

+        @Override
+        public OptionalInt getFieldId(String columnName) {
+            return OptionalInt.empty();
+        }
+
         @Override
         @Nullable
         public Object getSpecialInstructions() {
@@ -319,14 +336,31 @@ ParquetInstructions withIndexColumns(final Collection<List<String>> indexColumns
     };

     private static class ColumnInstructions {
+
+        private static final KeyedObjectKey<String, ColumnInstructions> COLUMN_NAME_KEY = new Basic<>() {
+            @Override
+            public String getKey(@NotNull final ColumnInstructions columnInstructions) {
+                return columnInstructions.getColumnName();
+            }
+        };
+
+        private static final KeyedObjectKey<String, ColumnInstructions> PARQUET_COLUMN_NAME_KEY = new Basic<>() {
+            @Override
+            public String getKey(@NotNull final ColumnInstructions columnInstructions) {
+                return columnInstructions.getParquetColumnName();
+            }
+        };
+
         private final String columnName;
         private String parquetColumnName;
         private String codecName;
         private String codecArgs;
         private boolean useDictionary;
+        private Integer fieldId;

         public ColumnInstructions(final String columnName) {
-            this.columnName = columnName;
+            this.columnName = Objects.requireNonNull(columnName);
+            NameValidator.validateColumnName(columnName);
        }
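With this change, a bad column name fails as soon as it enters the instructions rather than at write time. A hedged sketch of the effect through the public Builder (the no-arg constructor is assumed, and NameValidator is assumed to reject names that are not legal identifiers):

// Any per-column builder method creates ColumnInstructions, which now validates eagerly:
new ParquetInstructions.Builder().useDictionary("Price", true);    // OK: legal column name
new ParquetInstructions.Builder().useDictionary("Bad Name", true); // assumed to throw: invalid column name
new ParquetInstructions.Builder().useDictionary(null, true);       // NullPointerException from requireNonNull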

public String getColumnName() {
Expand All @@ -338,6 +372,12 @@ public String getParquetColumnName() {
}

         public ColumnInstructions setParquetColumnName(final String parquetColumnName) {
+            if (this.parquetColumnName != null && !this.parquetColumnName.equals(parquetColumnName)) {
+                throw new IllegalArgumentException(
+                        "Cannot add a mapping from parquetColumnName=" + parquetColumnName
+                                + ": columnName=" + columnName + " already mapped to parquetColumnName="
+                                + this.parquetColumnName);
+            }
             this.parquetColumnName = parquetColumnName;
             return this;
         }
@@ -367,6 +407,19 @@ public boolean useDictionary() {
         public void useDictionary(final boolean useDictionary) {
             this.useDictionary = useDictionary;
         }
+
+        public OptionalInt fieldId() {
+            return fieldId == null ? OptionalInt.empty() : OptionalInt.of(fieldId);
+        }
+
+        public void setFieldId(final int fieldId) {
+            if (this.fieldId != null && this.fieldId != fieldId) {
+                throw new IllegalArgumentException(
+                        String.format("Inconsistent fieldId for columnName=%s, already set fieldId=%d", columnName,
+                                this.fieldId));
+            }
+            this.fieldId = fieldId;
+        }
     }

     private static final class ReadOnly extends ParquetInstructions {
@@ -424,8 +477,8 @@ private ReadOnly(
                     .collect(Collectors.toUnmodifiableList());
         }

-        private String getOrDefault(final String columnName, final String defaultValue,
-                final Function<ColumnInstructions, String> fun) {
+        private <T> T getOrDefault(final String columnName, final T defaultValue,
+                final Function<ColumnInstructions, T> fun) {
             if (columnNameToInstructions == null) {
                 return defaultValue;
             }
@@ -480,6 +533,11 @@ public boolean useDictionary(final String columnName) {
             return getOrDefault(columnName, false, ColumnInstructions::useDictionary);
         }

+        @Override
+        public OptionalInt getFieldId(String columnName) {
+            return getOrDefault(columnName, OptionalInt.empty(), ColumnInstructions::fieldId);
+        }
+
         @Override
         public String getCompressionCodecName() {
             return compressionCodecName;
@@ -656,75 +714,22 @@ public Builder(final ParquetInstructions parquetInstructions) {
             indexColumns = readOnlyParquetInstructions.getIndexColumns().orElse(null);
         }

-        private void newColumnNameToInstructionsMap() {
-            columnNameToInstructions = new KeyedObjectHashMap<>(new KeyedObjectKey.Basic<>() {
-                @Override
-                public String getKey(@NotNull final ColumnInstructions value) {
-                    return value.getColumnName();
-                }
-            });
-        }
-
-        private void newParquetColumnNameToInstructionsMap() {
-            parquetColumnNameToInstructions =
-                    new KeyedObjectHashMap<>(new KeyedObjectKey.Basic<>() {
-                        @Override
-                        public String getKey(@NotNull final ColumnInstructions value) {
-                            return value.getParquetColumnName();
-                        }
-                    });
-        }
-
         public Builder addColumnNameMapping(final String parquetColumnName, final String columnName) {
             if (parquetColumnName.equals(columnName)) {
                 return this;
             }
-            if (columnNameToInstructions == null) {
-                newColumnNameToInstructionsMap();
-                final ColumnInstructions ci = new ColumnInstructions(columnName);
-                ci.setParquetColumnName(parquetColumnName);
-                columnNameToInstructions.put(columnName, ci);
-                newParquetColumnNameToInstructionsMap();
-                parquetColumnNameToInstructions.put(parquetColumnName, ci);
-                return this;
-            }
-
-            ColumnInstructions ci = columnNameToInstructions.get(columnName);
-            if (ci != null) {
-                if (ci.parquetColumnName != null) {
-                    if (ci.parquetColumnName.equals(parquetColumnName)) {
-                        return this;
-                    }
-                    throw new IllegalArgumentException(
-                            "Cannot add a mapping from parquetColumnName=" + parquetColumnName
-                                    + ": columnName=" + columnName + " already mapped to parquetColumnName="
-                                    + ci.parquetColumnName);
-                }
-            } else {
-                ci = new ColumnInstructions(columnName);
-                columnNameToInstructions.put(columnName, ci);
-            }
-
+            final ColumnInstructions ci = getOrCreateColumnInstructions(columnName);
+            ci.setParquetColumnName(parquetColumnName);
             if (parquetColumnNameToInstructions == null) {
-                newParquetColumnNameToInstructionsMap();
-                parquetColumnNameToInstructions.put(parquetColumnName, ci);
-                return this;
+                parquetColumnNameToInstructions = new KeyedObjectHashMap<>(ColumnInstructions.PARQUET_COLUMN_NAME_KEY);
             }

-            final ColumnInstructions fromParquetColumnNameInstructions =
-                    parquetColumnNameToInstructions.get(parquetColumnName);
-            if (fromParquetColumnNameInstructions != null) {
-                if (fromParquetColumnNameInstructions == ci) {
-                    return this;
-                }
+            final ColumnInstructions existing = parquetColumnNameToInstructions.putIfAbsent(parquetColumnName, ci);
+            if (existing != null) {
+                // Note: this is a limitation that doesn't need to exist. Technically, we could allow a single physical
+                // parquet column to manifest as multiple Deephaven columns.
                 throw new IllegalArgumentException(
                         "Cannot add new mapping from parquetColumnName=" + parquetColumnName + " to columnName="
                                 + columnName
                                 + ": already mapped to columnName="
-                                + fromParquetColumnNameInstructions.getColumnName());
+                                + existing.getColumnName());
             }
-            ci.setParquetColumnName(parquetColumnName);
-            parquetColumnNameToInstructions.put(parquetColumnName, ci);
             return this;
         }
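After the refactor, the mapping rules are easier to state: each Deephaven column maps to at most one parquet column, and each parquet column to at most one Deephaven column. A hedged usage sketch (column names are illustrative; the no-arg Builder constructor is assumed):

ParquetInstructions.Builder builder = new ParquetInstructions.Builder();
builder.addColumnNameMapping("px", "Price");        // parquet column "px" -> Deephaven column "Price"

// A second Deephaven column cannot claim the same parquet column:
builder.addColumnNameMapping("px", "Quote");        // throws IllegalArgumentException: already mapped to columnName=Price

// Nor can one Deephaven column claim a second parquet column (guard in setParquetColumnName):
builder.addColumnNameMapping("price_usd", "Price"); // throws IllegalArgumentException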

@@ -737,7 +742,7 @@ public Builder addColumnCodec(final String columnName, final String codecName) {
         }

         public Builder addColumnCodec(final String columnName, final String codecName, final String codecArgs) {
-            final ColumnInstructions ci = getColumnInstructions(columnName);
+            final ColumnInstructions ci = getOrCreateColumnInstructions(columnName);
             ci.setCodecName(codecName);
             ci.setCodecArgs(codecArgs);
             return this;
@@ -751,21 +756,35 @@ public Builder addColumnCodec(final String columnName, final String codecName, f
          * @param useDictionary The hint value
          */
         public Builder useDictionary(final String columnName, final boolean useDictionary) {
-            final ColumnInstructions ci = getColumnInstructions(columnName);
+            final ColumnInstructions ci = getOrCreateColumnInstructions(columnName);
             ci.useDictionary(useDictionary);
             return this;
         }

-        private ColumnInstructions getColumnInstructions(final String columnName) {
-            final ColumnInstructions ci;
+        /**
+         * This is currently only used for writing, allowing the setting of {@code field_id} in the proper Parquet
+         * {@code SchemaElement}.
+         *
+         * <p>
+         * Setting multiple field ids for a single column name is not allowed.
+         *
+         * <p>
+         * Field ids are not typically configured by end users.
+         *
+         * @param columnName the Deephaven column name
+         * @param fieldId the field id
+         */
+        public Builder setFieldId(final String columnName, final int fieldId) {
+            final ColumnInstructions ci = getOrCreateColumnInstructions(columnName);
+            ci.setFieldId(fieldId);
+            return this;
+        }
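A hedged usage sketch (column names and ids are illustrative; field ids are typically dictated by an external schema such as Iceberg's, and the no-arg Builder constructor plus build() are assumed):

ParquetInstructions instructions = new ParquetInstructions.Builder()
        .setFieldId("Id", 1)
        .setFieldId("Name", 2)
        .setFieldId("Id", 1) // re-setting the same id for the same column is allowed
        .build();
instructions.getFieldId("Id");      // OptionalInt.of(1)
instructions.getFieldId("Missing"); // OptionalInt.empty()
// Building with .setFieldId("Id", 42) instead would throw:
// IllegalArgumentException: Inconsistent fieldId for columnName=Id, already set fieldId=1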

+        private ColumnInstructions getOrCreateColumnInstructions(final String columnName) {
             if (columnNameToInstructions == null) {
-                newColumnNameToInstructionsMap();
-                ci = new ColumnInstructions(columnName);
-                columnNameToInstructions.put(columnName, ci);
-            } else {
-                ci = columnNameToInstructions.putIfAbsent(columnName, ColumnInstructions::new);
+                columnNameToInstructions = new KeyedObjectHashMap<>(ColumnInstructions.COLUMN_NAME_KEY);
             }
-            return ci;
+            return columnNameToInstructions.putIfAbsent(columnName, ColumnInstructions::new);
         }

         public Builder setCompressionCodecName(final String compressionCodecName) {
TypeInfos.java
@@ -13,6 +13,7 @@
 import io.deephaven.util.codec.SerializableCodec;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
@@ -481,14 +482,17 @@ default Type createSchemaType(
             isRepeating = false;
         }
         if (!isRepeating) {
+            instructions.getFieldId(columnDefinition.getName()).ifPresent(builder::id);
             return builder.named(parquetColumnName);
         }
         // Note: the Parquet type builder would take care of the element name for us if we were constructing it
         // ahead of time via ListBuilder.optionalElement
         // (org.apache.parquet.schema.Types.BaseListBuilder.ElementBuilder.named) when we named the outer list; but
         // since we are constructing types recursively (without regard to the outer type), we are responsible for
         // setting the element name correctly at this point in time.
-        return Types.optionalList()
+        final Types.ListBuilder<GroupType> listBuilder = Types.optionalList();
+        instructions.getFieldId(columnDefinition.getName()).ifPresent(listBuilder::id);
+        return listBuilder
                 .element(builder.named(ELEMENT_NAME))
                 .named(parquetColumnName);
     }
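To make the effect concrete: the field id lands on the column's SchemaElement, and for repeating columns on the outer LIST group. A hedged sketch using parquet-mr's Types API directly (names and ids are illustrative, not from this PR):

MessageType schema = Types.buildMessage()
        // Simple column: the id is attached to the column itself.
        .addField(Types.optional(PrimitiveTypeName.INT32).id(1).named("Qty"))
        // Repeating column: the id is attached to the outer LIST group, as in createSchemaType above.
        .addField(Types.optionalList()
                .id(2)
                .element(Types.optional(PrimitiveTypeName.DOUBLE).named("element"))
                .named("Prices"))
        .named("root");
System.out.println(schema);
// message root {
//   optional int32 Qty = 1;
//   optional group Prices (LIST) = 2 {
//     repeated group list {
//       optional double element;
//     }
//   }
// }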