Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Don't fail when reading non-identity partitioning field #6477

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,33 @@
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
import io.deephaven.util.type.TypeUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.iceberg.*;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.jetbrains.annotations.NotNull;

import java.net.URI;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
* Iceberg {@link TableLocationKeyFinder location finder} for partitioned tables; it discovers data files from
* a {@link Snapshot}.
*/
public final class IcebergKeyValuePartitionedLayout extends IcebergBaseLayout {
private static class ColumnData {
private static class IdentityPartitioningColData {
final String name;
final Class<?> type;
final int index;
final int index; // position in the partition spec

public ColumnData(String name, Class<?> type, int index) {
private IdentityPartitioningColData(String name, Class<?> type, int index) {
this.name = name;
this.type = type;
this.index = index;
}
}

private final List<ColumnData> outputPartitioningColumns;
private final List<IdentityPartitioningColData> identityPartitioningColumns;

/**
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
Expand All @@ -53,32 +53,23 @@ public IcebergKeyValuePartitionedLayout(

// We can assume, due to upstream validation, that no duplicate names (after renaming) are included in the
// output definition, so duplicates can safely be ignored here.
final MutableInt icebergIndex = new MutableInt(0);
final Map<String, Integer> availablePartitioningColumns = partitionSpec.fields().stream()
.peek(partitionField -> {
// TODO (deephaven-core#6438): Add support to handle non-identity transforms
if (!partitionField.transform().isIdentity()) {
throw new TableDataException("Partition field " + partitionField.name() + " has a " +
"non-identity transform: " + partitionField.transform() + ", which is not supported");
}
})
final AtomicInteger icebergIndex = new AtomicInteger(0);
// TODO (deephaven-core#6438): Add better support for handling non-identity transforms
identityPartitioningColumns = partitionSpec.fields().stream()
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
.filter(partitionField -> partitionField.transform().isIdentity())
.map(PartitionField::name)
.map(name -> instructions.columnRenames().getOrDefault(name, name))
.collect(Collectors.toMap(
name -> name,
name -> icebergIndex.getAndIncrement(),
(v1, v2) -> v1,
LinkedHashMap::new));

outputPartitioningColumns = tableDef.getColumnStream()
.map((final ColumnDefinition<?> columnDef) -> {
final Integer index = availablePartitioningColumns.get(columnDef.getName());
if (index == null) {
return null;
.map(icebergColName -> {
final String dhColName = instructions.columnRenames().getOrDefault(icebergColName, icebergColName);
final ColumnDefinition<?> columnDef = tableDef.getColumn(dhColName);
if (tableDef.getColumn(dhColName) == null) {
throw new TableDataException("Partitioning column " + dhColName + " not found in table " +
"definition but corresponding identity partitioning column " + icebergColName + " is " +
"present in the partition spec, table definition: " + tableDef + ", partition spec: " +
partitionSpec);
}
return new ColumnData(columnDef.getName(), TypeUtils.getBoxedType(columnDef.getDataType()), index);
return new IdentityPartitioningColData(dhColName, TypeUtils.getBoxedType(columnDef.getDataType()),
icebergIndex.getAndIncrement());
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
}

Expand All @@ -95,12 +86,11 @@ IcebergTableLocationKey keyFromDataFile(
final Map<String, Comparable<?>> partitions = new LinkedHashMap<>();

final PartitionData partitionData = (PartitionData) dataFile.partition();
for (final ColumnData colData : outputPartitioningColumns) {
for (final IdentityPartitioningColData colData : identityPartitioningColumns) {
final String colName = colData.name;
final Object colValue;
final Object valueFromPartitionData = partitionData.get(colData.index);
if (valueFromPartitionData != null) {
// TODO (deephaven-core#6438): Assuming identity transform here
colValue = IdentityPartitionConverters.convertConstant(
partitionData.getType(colData.index), valueFromPartitionData);
if (!colData.type.isAssignableFrom(colValue.getClass())) {
Expand Down
Loading