Spark: support statistics files in RewriteTablePath #11929

Merged 2 commits on Feb 8, 2025
19 changes: 17 additions & 2 deletions core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
@@ -127,8 +128,8 @@ public static TableMetadata replacePaths(
metadata.snapshotLog(),
metadataLogEntries,
metadata.refs(),
-        // TODO: update statistic file paths
-        metadata.statisticsFiles(),
+        updatePathInStatisticsFiles(metadata.statisticsFiles(), sourcePrefix, targetPrefix),
+        // TODO: update partition statistics file paths
metadata.partitionStatisticsFiles(),
metadata.changes(),
metadata.rowLineageEnabled(),
@@ -160,6 +161,20 @@ private static void updatePathInProperty(
}
}

+  private static List<StatisticsFile> updatePathInStatisticsFiles(
+      List<StatisticsFile> statisticsFiles, String sourcePrefix, String targetPrefix) {
+    return statisticsFiles.stream()
+        .map(
+            existing ->
+                new GenericStatisticsFile(
+                    existing.snapshotId(),
+                    newPath(existing.path(), sourcePrefix, targetPrefix),
+                    existing.fileSizeInBytes(),
+                    existing.fileFooterSizeInBytes(),
+                    existing.blobMetadata()))
+        .collect(Collectors.toList());
+  }
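
For context, newPath swaps the leading source prefix for the target prefix, so every other field of the rebuilt GenericStatisticsFile (snapshot id, sizes, blob metadata) is preserved and only the location moves. A minimal sketch of the expected behavior; the bucket names and file name below are invented for illustration:

// Hypothetical values, for illustration only.
String source = "s3://old-bucket/wh/db/tbl/metadata/20250101-stats.stats";
String rewritten =
    RewriteTablePathUtil.newPath(source, "s3://old-bucket/wh", "s3://new-bucket/wh");
// rewritten == "s3://new-bucket/wh/db/tbl/metadata/20250101-stats.stats"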

private static List<TableMetadata.MetadataLogEntry> updatePathInMetadataLogs(
TableMetadata metadata, String sourcePrefix, String targetPrefix) {
List<TableMetadata.MetadataLogEntry> metadataLogEntries =
…/spark/actions/RewriteTablePathSparkAction.java
@@ -37,6 +37,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
@@ -272,8 +273,9 @@ private String rebuildMetadata() {
((HasTableOperations) newStaticTable(endVersionName, table.io())).operations().current();

Preconditions.checkArgument(
-        endMetadata.statisticsFiles() == null || endMetadata.statisticsFiles().isEmpty(),
-        "Statistic files are not supported yet.");
+        endMetadata.partitionStatisticsFiles() == null
+            || endMetadata.partitionStatisticsFiles().isEmpty(),
+        "Partition statistics files are not supported yet.");

// rebuild version files
RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata);
@@ -341,7 +343,7 @@ private Set<Snapshot> deltaSnapshots(TableMetadata startMetadata, Set<Snapshot>
private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata endMetadata) {
RewriteResult<Snapshot> result = new RewriteResult<>();
result.toRewrite().addAll(endMetadata.snapshots());
-    result.copyPlan().add(rewriteVersionFile(endMetadata, endVersionName));
+    result.copyPlan().addAll(rewriteVersionFile(endMetadata, endVersionName));

List<MetadataLogEntry> versions = endMetadata.previousFiles();
for (int i = versions.size() - 1; i >= 0; i--) {
@@ -357,19 +359,50 @@ private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata endMetadata) {
new StaticTableOperations(versionFilePath, table.io()).current();

result.toRewrite().addAll(tableMetadata.snapshots());
-      result.copyPlan().add(rewriteVersionFile(tableMetadata, versionFilePath));
+      result.copyPlan().addAll(rewriteVersionFile(tableMetadata, versionFilePath));
}

return result;
}

-  private Pair<String, String> rewriteVersionFile(TableMetadata metadata, String versionFilePath) {
+  private Set<Pair<String, String>> rewriteVersionFile(
+      TableMetadata metadata, String versionFilePath) {
+    Set<Pair<String, String>> result = Sets.newHashSet();
String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, stagingDir);
TableMetadata newTableMetadata =
RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, targetPrefix);
TableMetadataParser.overwrite(newTableMetadata, table.io().newOutputFile(stagingPath));
-    return Pair.of(
-        stagingPath, RewriteTablePathUtil.newPath(versionFilePath, sourcePrefix, targetPrefix));
+    result.add(
+        Pair.of(
+            stagingPath,
+            RewriteTablePathUtil.newPath(versionFilePath, sourcePrefix, targetPrefix)));
+
+    // include statistics files in copy plan
+    result.addAll(
+        statsFileCopyPlan(metadata.statisticsFiles(), newTableMetadata.statisticsFiles()));
+    return result;
}

+  private Set<Pair<String, String>> statsFileCopyPlan(
+      List<StatisticsFile> beforeStats, List<StatisticsFile> afterStats) {
+    Set<Pair<String, String>> result = Sets.newHashSet();
+    if (beforeStats.isEmpty()) {
+      return result;
+    }
+
+    Preconditions.checkArgument(
+        beforeStats.size() == afterStats.size(),
+        "Statistics file count should be the same before and after path rewrite");
+    for (int i = 0; i < beforeStats.size(); i++) {
+      StatisticsFile before = beforeStats.get(i);
+      StatisticsFile after = afterStats.get(i);
+      Preconditions.checkArgument(
+          before.fileSizeInBytes() == after.fileSizeInBytes(),
+          "Statistics file size should be the same before and after path rewrite");
+      result.add(
+          Pair.of(RewriteTablePathUtil.stagingPath(before.path(), stagingDir), after.path()));
+    }
+    return result;
+  }
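
Each pair in the copy plan reads as (location to copy from, destination under the target prefix). A hedged walk-through of one statistics entry, assuming stagingPath re-roots the file name under the staging directory in the same way version files are staged above; all paths are invented for illustration:

// before.path() : "s3://old/wh/db/tbl/metadata/stats-1.stats"
// after.path()  : "s3://new/wh/db/tbl/metadata/stats-1.stats"
// stagingDir    : "file:/tmp/staging/"
// resulting pair:
//   ("file:/tmp/staging/stats-1.stats",
//    "s3://new/wh/db/tbl/metadata/stats-1.stats")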

/**
…/spark/actions/TestRewriteTablePathsAction.java
@@ -37,8 +37,8 @@
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -59,7 +59,6 @@
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -868,25 +867,27 @@ public void testInvalidArgs() {
}

@Test
-  public void testStatisticFile() throws IOException {
+  public void testPartitionStatisticFile() throws IOException {
String sourceTableLocation = newTableLocation();
Map<String, String> properties = Maps.newHashMap();
properties.put("format-version", "2");
-    String tableName = "v2tblwithstats";
+    String tableName = "v2tblwithPartStats";
Table sourceTable =
createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0);

TableMetadata metadata = currentMetadata(sourceTable);
-    TableMetadata withStatistics =
+    TableMetadata withPartStatistics =
TableMetadata.buildFrom(metadata)
-            .setStatistics(
-                43,
-                new GenericStatisticsFile(
-                    43, "/some/path/to/stats/file", 128, 27, ImmutableList.of()))
+            .setPartitionStatistics(
+                ImmutableGenericPartitionStatisticsFile.builder()
+                    .snapshotId(11L)
+                    .path("/some/partition/stats/file.parquet")
+                    .fileSizeInBytes(42L)
+                    .build())
.build();

OutputFile file = sourceTable.io().newOutputFile(metadata.metadataFileLocation());
-    TableMetadataParser.overwrite(withStatistics, file);
+    TableMetadataParser.overwrite(withPartStatistics, file);

assertThatThrownBy(
() ->
@@ -895,7 +896,36 @@ public void testStatisticFile() throws IOException {
.rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
.execute())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Statistic files are not supported yet");
.hasMessageContaining("Partition statistics files are not supported yet");
}

+  @Test
+  public void testTableWithManyStatisticFiles() throws IOException {
+    String sourceTableLocation = newTableLocation();
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("format-version", "2");
+    String tableName = "v2tblwithmanystats";
+    Table sourceTable =
+        createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0);
+
+    int iterations = 10;
+    for (int i = 0; i < iterations; i++) {
+      sql("insert into hive.default.%s values (%s, 'AAAAAAAAAA', 'AAAA')", tableName, i);
+      sourceTable.refresh();
+      actions().computeTableStats(sourceTable).execute();
+    }
+
+    sourceTable.refresh();
+    assertThat(sourceTable.statisticsFiles().size()).isEqualTo(iterations);
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+            .execute();
+
+    checkFileNum(
+        iterations * 2 + 1, iterations, iterations, iterations, iterations * 6 + 1, result);
+  }
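
For reference, the expected counts in the final assertion break down as follows, assuming (as the loop above implies) that each insert commits one snapshot and each computeTableStats call commits one more metadata version but no new snapshot:

// version files:    1 (table creation) + 2 per iteration = 2 * 10 + 1 = 21
// manifest lists:   1 per insert snapshot                = 10
// manifest files:   1 per insert snapshot                = 10
// statistics files: 1 per computeTableStats call         = 10
// data files:       1 per insert                         = 10
// total:            21 + 10 + 10 + 10 + 10 = 61 = iterations * 6 + 1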

@Test
@@ -1063,6 +1093,16 @@ protected void checkFileNum(
int manifestFileCount,
int totalCount,
RewriteTablePath.Result result) {
+    checkFileNum(versionFileCount, manifestListCount, manifestFileCount, 0, totalCount, result);
+  }
+
+  protected void checkFileNum(
+      int versionFileCount,
+      int manifestListCount,
+      int manifestFileCount,
+      int statisticsFileCount,
+      int totalCount,
+      RewriteTablePath.Result result) {
List<String> filesToMove =
spark
.read()
@@ -1083,6 +1123,9 @@ protected void checkFileNum(
assertThat(filesToMove.stream().filter(isManifest).count())
.as("Wrong rebuilt Manifest file file count")
.isEqualTo(manifestFileCount);
assertThat(filesToMove.stream().filter(f -> f.endsWith(".stats")).count())
.withFailMessage("Wrong rebuilt Statistic file count")
.isEqualTo(statisticsFileCount);
assertThat(filesToMove.size()).as("Wrong total file count").isEqualTo(totalCount);
}
