fix: Added support to read iceberg tables partitioned by date #6430

Merged 2 commits on Nov 26, 2024
@@ -3,19 +3,14 @@
//
package io.deephaven.iceberg.base;

import io.deephaven.base.Pair;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.iceberg.util.IcebergReadInstructions;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -26,7 +21,6 @@
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.jetbrains.annotations.NotNull;
@@ -13,6 +13,7 @@
 import io.deephaven.util.type.TypeUtils;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.iceberg.*;
+import org.apache.iceberg.data.IdentityPartitionConverters;
 import org.jetbrains.annotations.NotNull;
 
 import java.net.URI;
@@ -54,6 +55,13 @@ public IcebergKeyValuePartitionedLayout(
         // in the output definition, so we can ignore duplicates.
         final MutableInt icebergIndex = new MutableInt(0);
         final Map<String, Integer> availablePartitioningColumns = partitionSpec.fields().stream()
+                .peek(partitionField -> {
+                    // TODO (deephaven-core#6438): Add support to handle non-identity transforms
+                    if (!partitionField.transform().isIdentity()) {
+                        throw new TableDataException("Partition field " + partitionField.name() + " has a " +
+                                "non-identity transform: " + partitionField.transform() + ", which is not supported");
+                    }
+                })
                 .map(PartitionField::name)
                 .map(name -> instructions.columnRenames().getOrDefault(name, name))
                 .collect(Collectors.toMap(
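Editorial sketch of the guard added in this hunk: every partition field is checked before the name-to-index map is built, and any field with a non-identity transform is rejected up front. The `Field` class and `indexIdentityFields` method below are hypothetical stand-ins written for illustration. They are not Iceberg's `PartitionField` or Deephaven code, and the sketch uses a plain loop and `IllegalArgumentException` instead of the stream `peek` and `TableDataException` used in the PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IdentityTransformGuardSketch {
    /** Hypothetical stand-in for a partition field; not Iceberg's PartitionField. */
    static final class Field {
        final String name;
        final String transform; // e.g. "identity", "day", "bucket[16]"

        Field(String name, String transform) {
            this.name = name;
            this.transform = transform;
        }

        boolean isIdentity() {
            return "identity".equals(transform);
        }
    }

    /** Maps each field name to its position, rejecting any non-identity transform. */
    static Map<String, Integer> indexIdentityFields(List<Field> fields) {
        final Map<String, Integer> indexByName = new LinkedHashMap<>();
        for (int i = 0; i < fields.size(); i++) {
            final Field field = fields.get(i);
            if (!field.isIdentity()) {
                throw new IllegalArgumentException("Partition field " + field.name
                        + " has a non-identity transform: " + field.transform
                        + ", which is not supported");
            }
            // Keep the first index seen for a name, so duplicates are ignored.
            indexByName.putIfAbsent(field.name, i);
        }
        return indexByName;
    }

    public static void main(String[] args) {
        System.out.println(indexIdentityFields(List.of(
                new Field("LocalDatePC", "identity"),
                new Field("IntPC", "identity"))));
        try {
            indexIdentityFields(List.of(new Field("ts_day", "day")));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing at construction time, as the PR does, means a table partitioned by something like `day(ts)` is rejected with a clear message instead of having its partition values converted as if they were identity values.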
@@ -89,11 +97,19 @@ IcebergTableLocationKey keyFromDataFile(
         final PartitionData partitionData = (PartitionData) dataFile.partition();
         for (final ColumnData colData : outputPartitioningColumns) {
             final String colName = colData.name;
-            final Object colValue = partitionData.get(colData.index);
-            if (colValue != null && !colData.type.isAssignableFrom(colValue.getClass())) {
-                throw new TableDataException("Partitioning column " + colName
-                        + " has type " + colValue.getClass().getName()
-                        + " but expected " + colData.type.getName());
+            final Object colValue;
+            final Object valueFromPartitionData = partitionData.get(colData.index);
+            if (valueFromPartitionData != null) {
+                // TODO (deephaven-core#6438): Assuming identity transform here
+                colValue = IdentityPartitionConverters.convertConstant(
+                        partitionData.getType(colData.index), valueFromPartitionData);
Comment on lines +104 to +105
Member
Is this okay to universally apply? Should we only do it when the partition spec specifies identity? https://iceberg.apache.org/spec/#partition-transforms

Contributor Author
@malhotrashivam, Nov 26, 2024

Added a check to ensure the transform is identity and created issue #6438.
We would need to test properly for non-identity transforms.

+                if (!colData.type.isAssignableFrom(colValue.getClass())) {
+                    throw new TableDataException("Partitioning column " + colName
+                            + " has type " + colValue.getClass().getName()
+                            + " but expected " + colData.type.getName());
+                }
+            } else {
+                colValue = null;
             }
             partitions.put(colName, (Comparable<?>) colValue);
         }
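Editorial sketch of why the conversion step matters for date partitions. Iceberg represents a date value as an `Integer` count of days since 1970-01-01, while the Deephaven column here is a `LocalDate`, so the raw partition value would presumably fail the `isAssignableFrom` check shown in this hunk. The `toReadType` and `matches` helpers below are hypothetical stand-ins built on `java.time` only. They imitate just the date case and do not call `IdentityPartitionConverters.convertConstant`.

```java
import java.time.LocalDate;

public class DatePartitionSketch {
    /**
     * Hypothetical stand-in for the date case of the conversion: turns a raw
     * days-since-epoch Integer into a LocalDate and passes other values through.
     */
    static Object toReadType(Object rawPartitionValue, boolean isDateColumn) {
        if (rawPartitionValue == null) {
            return null;
        }
        if (isDateColumn) {
            return LocalDate.ofEpochDay((Integer) rawPartitionValue);
        }
        return rawPartitionValue;
    }

    /** Same shape as the type check in the diff: null passes, otherwise the class must fit. */
    static boolean matches(Class<?> expected, Object value) {
        return value == null || expected.isAssignableFrom(value.getClass());
    }

    public static void main(String[] args) {
        // Raw partition value for 2023-10-01, stored as days since the epoch.
        Object raw = (int) LocalDate.parse("2023-10-01").toEpochDay();
        System.out.println(matches(LocalDate.class, raw)); // false: Integer is not a LocalDate

        Object converted = toReadType(raw, true);
        System.out.println(matches(LocalDate.class, converted)); // true
        System.out.println(converted);
    }
}
```

Running the type check after the conversion, as the new code does, validates the value the reader will actually see. This only holds for identity transforms, which is why the PR pairs the conversion with the guard on `partitionField.transform().isIdentity()`.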
@@ -754,14 +754,7 @@ void testPartitionedAppendWithAllPartitioningTypes() {
                 "DoublePC = (double) 4.0",
                 "LocalDatePC = LocalDate.parse(`2023-10-01`)")
                 .moveColumns(7, "data");
-
-        // TODO (deephaven-core#6419) Dropping the local data column since it is not supported on the read side.
-        // Remove this when the issue is fixed.
-        final TableDefinition tableDefinitionWithoutLocalDate = fromIceberg.dropColumns("LocalDatePC").getDefinition();
-        final Table fromIcebergWithoutLocalDate = tableAdapter.table(IcebergReadInstructions.builder()
-                .tableDefinition(tableDefinitionWithoutLocalDate)
-                .build());
-        assertTableEquals(expected.dropColumns("LocalDatePC"), fromIcebergWithoutLocalDate);
+        assertTableEquals(expected, fromIceberg);
     }

     @Test