
Commit

review feedback
nastra committed Feb 14, 2025
1 parent 40cc3b5 commit 37eb0d2
Showing 3 changed files with 54 additions and 45 deletions.
File 1 of 3

@@ -22,7 +22,6 @@
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
@@ -31,7 +30,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataTableType;
@@ -54,6 +52,7 @@
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -127,6 +126,11 @@ public RewritePositionDeleteFiles.Result execute() {
 
     validateAndInitOptions();
 
+    if (hasDeletionVectors()) {
+      LOG.info("v2 deletes in {} have already been rewritten to v3 DVs", table.name());
+      return EMPTY_RESULT;
+    }
+
     StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition = planFileGroups();
     RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
 
@@ -144,6 +148,33 @@ public RewritePositionDeleteFiles.Result execute() {
     }
   }
 
+  private boolean hasDeletionVectors() {
+    if (TableUtil.formatVersion(table) >= 3) {
+      PositionDeletesBatchScan scan =
+          (PositionDeletesBatchScan)
+              MetadataTableUtils.createMetadataTableInstance(
+                      table, MetadataTableType.POSITION_DELETES)
+                  .newBatchScan();
+      try (CloseableIterator<PositionDeletesScanTask> it =
+          CloseableIterable.filter(
+                  CloseableIterable.transform(
+                      scan.baseTableFilter(filter)
+                          .caseSensitive(caseSensitive)
+                          .select(PositionDeletesTable.DELETE_FILE_PATH)
+                          .ignoreResiduals()
+                          .planFiles(),
+                      task -> (PositionDeletesScanTask) task),
+                  t -> t.file().format() == FileFormat.PUFFIN)
+              .iterator()) {
+        return it.hasNext();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    return false;
+  }
+
   private StructLikeMap<List<List<PositionDeletesScanTask>>> planFileGroups() {
     Table deletesTable =
         MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
@@ -407,32 +438,6 @@ private void validateAndInitOptions() {
         PARTIAL_PROGRESS_MAX_COMMITS,
         maxCommits,
         PARTIAL_PROGRESS_ENABLED);
-
-    if (TableUtil.formatVersion(table) >= 3) {
-      PositionDeletesBatchScan scan =
-          (PositionDeletesBatchScan)
-              MetadataTableUtils.createMetadataTableInstance(
-                      table, MetadataTableType.POSITION_DELETES)
-                  .newBatchScan();
-      Optional<PositionDeletesScanTask> foundPuffinFiles =
-          StreamSupport.stream(
-                  CloseableIterable.transform(
-                          scan.baseTableFilter(filter)
-                              .caseSensitive(caseSensitive)
-                              .select(PositionDeletesTable.DELETE_FILE_PATH)
-                              .ignoreResiduals()
-                              .planFiles(),
-                          task -> (PositionDeletesScanTask) task)
-                      .spliterator(),
-                  false)
-              .filter(t -> t.file().format() == FileFormat.PUFFIN)
-              .findAny();
-
-      if (foundPuffinFiles.isPresent()) {
-        throw new IllegalArgumentException(
-            "Cannot rewrite DVs: v2 deletes have already been rewritten to v3 DVs");
-      }
-    }
   }
 
   private String jobDesc(RewritePositionDeletesGroup group, RewriteExecutionContext ctx) {
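
An illustrative aside, not part of this commit: the new hasDeletionVectors() uses a handy existence-check idiom, compose the lazy CloseableIterable, open its iterator in try-with-resources so the scan's resources are released, and return on the first match; the removed StreamSupport variant streamed over the same iterable but never closed it. A minimal sketch of that idiom, with a hypothetical helper name:

import java.util.function.Predicate;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;

class ScanChecks {
  // Filters lazily and stops at the first match; closing the iterator in
  // try-with-resources releases whatever the underlying scan holds open.
  static <T> boolean anyMatch(CloseableIterable<T> items, Predicate<T> predicate) {
    try (CloseableIterator<T> it = CloseableIterable.filter(items, predicate).iterator()) {
      return it.hasNext();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}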
File 2 of 3

@@ -82,6 +82,7 @@ public class SparkPositionDeletesRewrite implements Write {
   private final int specId;
   private final StructLike partition;
   private final Map<String, String> writeProperties;
+  private final int formatVersion;
 
   /**
    * Constructs a {@link SparkPositionDeletesRewrite}.
@@ -116,6 +117,18 @@ public class SparkPositionDeletesRewrite implements Write {
     this.specId = specId;
     this.partition = partition;
     this.writeProperties = writeConf.writeProperties();
+    this.formatVersion = TableUtil.formatVersion(underlyingTable(table));
   }
 
+  private Table underlyingTable(Table table) {
+    if (table instanceof SerializableTable.SerializableMetadataTable) {
+      return underlyingTable(
+          ((SerializableTable.SerializableMetadataTable) table).underlyingTable());
+    } else if (table instanceof BaseMetadataTable) {
+      return ((BaseMetadataTable) table).table();
+    }
+
+    return table;
+  }
+
   @Override
@@ -141,7 +154,8 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
         dsSchema,
         specId,
         partition,
-        writeProperties);
+        writeProperties,
+        formatVersion);
   }
 
   @Override
@@ -193,6 +207,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
     private final int specId;
     private final StructLike partition;
     private final Map<String, String> writeProperties;
+    private final int formatVersion;
 
     PositionDeletesWriterFactory(
         Broadcast<Table> tableBroadcast,
@@ -204,7 +219,8 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
         StructType dsSchema,
         int specId,
         StructLike partition,
-        Map<String, String> writeProperties) {
+        Map<String, String> writeProperties,
+        int formatVersion) {
       this.tableBroadcast = tableBroadcast;
       this.queryId = queryId;
       this.format = format;
@@ -215,24 +231,13 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
       this.specId = specId;
       this.partition = partition;
       this.writeProperties = writeProperties;
-    }
-
-    private Table underlyingTable(Table table) {
-      if (table instanceof SerializableTable.SerializableMetadataTable) {
-        return underlyingTable(
-            ((SerializableTable.SerializableMetadataTable) table).underlyingTable());
-      } else if (table instanceof BaseMetadataTable) {
-        return ((BaseMetadataTable) table).table();
-      }
-
-      return table;
+      this.formatVersion = formatVersion;
     }
 
     @Override
     public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
       Table table = tableBroadcast.value();
 
-      int formatVersion = TableUtil.formatVersion(underlyingTable(table));
       OutputFileFactory deleteFileFactory =
           OutputFileFactory.builderFor(table, partitionId, taskId)
               .format(formatVersion >= 3 ? FileFormat.PUFFIN : format)
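
For illustration only: pushing formatVersion into the Write's constructor moves the TableUtil.formatVersion lookup, including unwrapping the serialized metadata table, to the driver; executors now receive a plain int captured in the serialized writer factory instead of re-deriving it in every createWriter call. A minimal, hypothetical sketch of this driver-side capture pattern (all names invented):

import java.io.Serializable;

// The factory is built on the driver and shipped to executors, so a field
// assigned in the constructor is computed exactly once for the whole write.
class ExampleWriterFactory implements Serializable {
  private final int formatVersion; // resolved on the driver from table metadata

  ExampleWriterFactory(int formatVersion) {
    this.formatVersion = formatVersion;
  }

  // Called per task on executors; it only reads the captured value.
  String fileFormatFor(int partitionId, long taskId) {
    return formatVersion >= 3 ? "puffin" : "parquet";
  }
}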
File 3 of 3

@@ -758,9 +758,8 @@ public void testRewriteV2PositionDeletesToV3DVs() throws IOException {
     assertThat(dvRecords(table)).hasSize(2);
 
     // rewriting DVs via rewritePositionDeletes shouldn't be possible anymore
-    assertThatThrownBy(() -> SparkActions.get(spark).rewritePositionDeletes(table).execute())
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot rewrite DVs: v2 deletes have already been rewritten to v3 DVs");
+    assertThat(SparkActions.get(spark).rewritePositionDeletes(table).execute().rewriteResults())
+        .isEmpty();
   }
 
   private List<Row> dvRecords(Table table) {
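
A caller-side aside: the updated test pins down the new contract, on a v3 table whose v2 position deletes were already rewritten to DVs, the action degrades to a no-op instead of failing. A short usage sketch, assuming spark and table are in scope as in the test:

// Rewriting position deletes is now safe to call repeatedly: once deletes
// are DVs, execute() returns an empty result instead of throwing.
RewritePositionDeleteFiles.Result result =
    SparkActions.get(spark).rewritePositionDeletes(table).execute();
if (result.rewriteResults().isEmpty()) {
  // nothing to rewrite: the table's deletes are already deletion vectors
}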
