diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java index 1350561789c8..a06994a6cdb8 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java @@ -22,7 +22,6 @@ import java.math.RoundingMode; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; @@ -31,7 +30,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; -import java.util.stream.StreamSupport; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataTableType; @@ -54,6 +52,7 @@ import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; @@ -127,6 +126,11 @@ public RewritePositionDeleteFiles.Result execute() { validateAndInitOptions(); + if (hasDeletionVectors()) { + LOG.info("v2 deletes in {} have already been rewritten to v3 DVs", table.name()); + return EMPTY_RESULT; + } + StructLikeMap>> fileGroupsByPartition = planFileGroups(); RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition); @@ -144,6 +148,33 @@ public RewritePositionDeleteFiles.Result execute() { } } + private boolean hasDeletionVectors() { + if (TableUtil.formatVersion(table) >= 3) { + PositionDeletesBatchScan scan = + (PositionDeletesBatchScan) + MetadataTableUtils.createMetadataTableInstance( + table, MetadataTableType.POSITION_DELETES) + .newBatchScan(); + try (CloseableIterator it = + CloseableIterable.filter( + CloseableIterable.transform( + scan.baseTableFilter(filter) + .caseSensitive(caseSensitive) + .select(PositionDeletesTable.DELETE_FILE_PATH) + .ignoreResiduals() + .planFiles(), + task -> (PositionDeletesScanTask) task), + t -> t.file().format() == FileFormat.PUFFIN) + .iterator()) { + return it.hasNext(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + return false; + } + private StructLikeMap>> planFileGroups() { Table deletesTable = MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES); @@ -407,32 +438,6 @@ private void validateAndInitOptions() { PARTIAL_PROGRESS_MAX_COMMITS, maxCommits, PARTIAL_PROGRESS_ENABLED); - - if (TableUtil.formatVersion(table) >= 3) { - PositionDeletesBatchScan scan = - (PositionDeletesBatchScan) - MetadataTableUtils.createMetadataTableInstance( - table, MetadataTableType.POSITION_DELETES) - .newBatchScan(); - Optional foundPuffinFiles = - StreamSupport.stream( - CloseableIterable.transform( - scan.baseTableFilter(filter) - .caseSensitive(caseSensitive) - .select(PositionDeletesTable.DELETE_FILE_PATH) - .ignoreResiduals() - .planFiles(), - task -> (PositionDeletesScanTask) task) - .spliterator(), - false) - .filter(t -> t.file().format() == FileFormat.PUFFIN) - .findAny(); - - if (foundPuffinFiles.isPresent()) { - throw new IllegalArgumentException( - "Cannot rewrite DVs: v2 deletes have already been rewritten to v3 DVs"); - } - } } private String jobDesc(RewritePositionDeletesGroup group, RewriteExecutionContext ctx) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index c9e93bb0a6a4..ca20e03cd189 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -82,6 +82,7 @@ public class SparkPositionDeletesRewrite implements Write { private final int specId; private final StructLike partition; private final Map writeProperties; + private final int formatVersion; /** * Constructs a {@link SparkPositionDeletesRewrite}. @@ -116,6 +117,18 @@ public class SparkPositionDeletesRewrite implements Write { this.specId = specId; this.partition = partition; this.writeProperties = writeConf.writeProperties(); + this.formatVersion = TableUtil.formatVersion(underlyingTable(table)); + } + + private Table underlyingTable(Table table) { + if (table instanceof SerializableTable.SerializableMetadataTable) { + return underlyingTable( + ((SerializableTable.SerializableMetadataTable) table).underlyingTable()); + } else if (table instanceof BaseMetadataTable) { + return ((BaseMetadataTable) table).table(); + } + + return table; } @Override @@ -141,7 +154,8 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { dsSchema, specId, partition, - writeProperties); + writeProperties, + formatVersion); } @Override @@ -193,6 +207,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { private final int specId; private final StructLike partition; private final Map writeProperties; + private final int formatVersion; PositionDeletesWriterFactory( Broadcast tableBroadcast, @@ -204,7 +219,8 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { StructType dsSchema, int specId, StructLike partition, - Map writeProperties) { + Map writeProperties, + int formatVersion) { this.tableBroadcast = tableBroadcast; this.queryId = queryId; this.format = format; @@ -215,24 +231,13 @@ static class PositionDeletesWriterFactory implements DataWriterFactory { this.specId = specId; this.partition = partition; this.writeProperties = writeProperties; - } - - private Table underlyingTable(Table table) { - if (table instanceof SerializableTable.SerializableMetadataTable) { - return underlyingTable( - ((SerializableTable.SerializableMetadataTable) table).underlyingTable()); - } else if (table instanceof BaseMetadataTable) { - return ((BaseMetadataTable) table).table(); - } - - return table; + this.formatVersion = formatVersion; } @Override public DataWriter createWriter(int partitionId, long taskId) { Table table = tableBroadcast.value(); - int formatVersion = TableUtil.formatVersion(underlyingTable(table)); OutputFileFactory deleteFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId) .format(formatVersion >= 3 ? FileFormat.PUFFIN : format) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java index 80aa2af1fa0e..83f699160610 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java @@ -758,9 +758,8 @@ public void testRewriteV2PositionDeletesToV3DVs() throws IOException { assertThat(dvRecords(table)).hasSize(2); // rewriting DVs via rewritePositionDeletes shouldn't be possible anymore - assertThatThrownBy(() -> SparkActions.get(spark).rewritePositionDeletes(table).execute()) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Cannot rewrite DVs: v2 deletes have already been rewritten to v3 DVs"); + assertThat(SparkActions.get(spark).rewritePositionDeletes(table).execute().rewriteResults()) + .isEmpty(); } private List dvRecords(Table table) {