Skip to content

Commit

Permalink
Review with Devin
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Dec 10, 2024
1 parent 9ea33ec commit ec3897b
Showing 1 changed file with 21 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import java.net.URI;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
* Iceberg {@link TableLocationKeyFinder location finder} for tables with partitions that will discover data files from
Expand Down Expand Up @@ -53,24 +51,27 @@ public IcebergKeyValuePartitionedLayout(

// We can assume due to upstream validation that there are no duplicate names (after renaming) that are included
// in the output definition, so we can ignore duplicates.
final AtomicInteger icebergIndex = new AtomicInteger(0);
// TODO (DH-18160): Improve support for handling non-identity transforms
identityPartitioningColumns = partitionSpec.fields().stream()
.filter(partitionField -> partitionField.transform().isIdentity())
.map(PartitionField::name)
.map(icebergColName -> {
final String dhColName = instructions.columnRenames().getOrDefault(icebergColName, icebergColName);
final ColumnDefinition<?> columnDef = tableDef.getColumn(dhColName);
if (tableDef.getColumn(dhColName) == null) {
throw new TableDataException("Partitioning column " + dhColName + " not found in table " +
"definition but corresponding identity partitioning column " + icebergColName + " is " +
"present in the partition spec, table definition: " + tableDef + ", partition spec: " +
partitionSpec);
}
return new IdentityPartitioningColData(dhColName, TypeUtils.getBoxedType(columnDef.getDataType()),
icebergIndex.getAndIncrement());
})
.collect(Collectors.toList());
final List<PartitionField> partitionFields = partitionSpec.fields();
final int numPartitionFields = partitionFields.size();
identityPartitioningColumns = new ArrayList<>(numPartitionFields);
for (int fieldId = 0; fieldId < numPartitionFields; ++fieldId) {
final PartitionField partitionField = partitionFields.get(fieldId);
if (!partitionField.transform().isIdentity()) {
// TODO (DH-18160): Improve support for handling non-identity transforms
continue;
}
final String icebergColName = partitionField.name();
final String dhColName = instructions.columnRenames().getOrDefault(icebergColName, icebergColName);
final ColumnDefinition<?> columnDef = tableDef.getColumn(dhColName);
if (columnDef == null) {
throw new TableDataException("Partitioning column " + dhColName + " not found in table definition " +
"but corresponding identity partitioning column " + icebergColName + " is present in the " +
"partition spec, table definition: " + tableDef + ", partition spec: " + partitionSpec);
}
identityPartitioningColumns.add(new IdentityPartitioningColData(dhColName,
TypeUtils.getBoxedType(columnDef.getDataType()), fieldId));

}
}

@Override
Expand Down

0 comments on commit ec3897b

Please sign in to comment.