Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel]Cache the crc info for a snapshot if its version's checksum file is read #4113

Merged
merged 16 commits into from
Feb 5, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
import io.delta.kernel.internal.replay.CRCInfo;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
Expand Down Expand Up @@ -153,6 +154,11 @@ public Map<String, DomainMetadata> getDomainMetadataMap() {
return logReplay.getDomainMetadataMap();
}

/** Returns the crc info for the current snapshot if the checksum file is read */
public Optional<CRCInfo> getCurrentCrcInfo() {
return logReplay.getCurrentCrcInfo();
}

public Metadata getMetadata() {
return metadata;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public static StructType getAddRemoveReadSchema(boolean shouldReadStats) {
private final LogSegment logSegment;
private final Tuple2<Protocol, Metadata> protocolAndMetadata;
private final Lazy<Map<String, DomainMetadata>> domainMetadataMap;
private final Optional<CRCInfo> currentCrcInfo;

public LogReplay(
Path logPath,
Expand All @@ -130,13 +131,19 @@ public LogReplay(
LogSegment logSegment,
Optional<SnapshotHint> snapshotHint,
SnapshotMetrics snapshotMetrics) {
assertLogFilesBelongToTable(logPath, logSegment.allLogFilesUnsorted());

assertLogFilesBelongToTable(logPath, logSegment.allLogFilesUnsorted());
Tuple2<Optional<SnapshotHint>, Optional<CRCInfo>> newerSnapshotHintAndCurrentCrcInfo =
maybeGetNewerSnapshotHintAndCurrentCrcInfo(
engine, logSegment, snapshotHint, snapshotVersion);
this.currentCrcInfo = newerSnapshotHintAndCurrentCrcInfo._2;
this.dataPath = dataPath;
this.logSegment = logSegment;
this.protocolAndMetadata =
snapshotMetrics.loadInitialDeltaActionsTimer.time(
() -> loadTableProtocolAndMetadata(engine, logSegment, snapshotHint, snapshotVersion));
() ->
loadTableProtocolAndMetadata(
engine, logSegment, newerSnapshotHintAndCurrentCrcInfo._1, snapshotVersion));
// Lazy loading of domain metadata only when needed
this.domainMetadataMap = new Lazy<>(() -> loadDomainMetadataMap(engine));
}
Expand Down Expand Up @@ -165,6 +172,11 @@ public long getVersion() {
return logSegment.getVersion();
}

/** Returns the crc info for the current snapshot if the checksum file is read */
public Optional<CRCInfo> getCurrentCrcInfo() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any reason that we only save it if it's the current version? in the future won't we also want to still save the info for an earlier one? (to pass along as much info as possible)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

current snapshot is sufficient for unblocking simple crc write, so I only expose the currently version. We could incrementally expose more if needed in the future.

return currentCrcInfo;
}

/**
* Returns an iterator of {@link FilteredColumnarBatch} representing all the active AddFiles in
* the table.
Expand Down Expand Up @@ -212,38 +224,6 @@ protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(
return new Tuple2<>(snapshotHint.get().getProtocol(), snapshotHint.get().getMetadata());
}

// Snapshot hit is not use-able in this case for determine the lower bound.
if (snapshotHint.isPresent() && snapshotHint.get().getVersion() > snapshotVersion) {
snapshotHint = Optional.empty();
}

long crcSearchLowerBound =
max(
asList(
// Prefer reading hint over CRC, so start listing from hint's version + 1.
snapshotHint.map(SnapshotHint::getVersion).orElse(0L) + 1,
logSegment.getCheckpointVersionOpt().orElse(0L),
// Only find the CRC within 100 versions.
snapshotVersion - 100,
0L));
Optional<CRCInfo> crcInfoOpt =
ChecksumReader.getCRCInfo(
engine, logSegment.getLogPath(), snapshotVersion, crcSearchLowerBound);
if (crcInfoOpt.isPresent()) {
CRCInfo crcInfo = crcInfoOpt.get();
if (crcInfo.getVersion() == snapshotVersion) {
// CRC is related to the desired snapshot version. Load protocol and metadata from CRC.
return new Tuple2<>(crcInfo.getProtocol(), crcInfo.getMetadata());
}
checkArgument(
crcInfo.getVersion() >= crcSearchLowerBound && crcInfo.getVersion() <= snapshotVersion);
// We found a CRCInfo of a version (a) older than the one we are looking for (snapshotVersion)
// but (b) newer than the current hint. Use this CRCInfo to create a new hint
snapshotHint =
Optional.of(
new SnapshotHint(crcInfo.getVersion(), crcInfo.getProtocol(), crcInfo.getMetadata()));
}

Protocol protocol = null;
Metadata metadata = null;

Expand Down Expand Up @@ -392,4 +372,52 @@ private Map<String, DomainMetadata> loadDomainMetadataMap(Engine engine) {
throw new UncheckedIOException("Could not close iterator", ex);
}
}

/**
* Calculates the latest snapshot hint before or at the current snapshot version, returns the
* CRCInfo if checksum file at the current version is read
*/
private Tuple2<Optional<SnapshotHint>, Optional<CRCInfo>>
maybeGetNewerSnapshotHintAndCurrentCrcInfo(
Engine engine,
LogSegment logSegment,
Optional<SnapshotHint> snapshotHint,
long snapshotVersion) {

// Snapshot hint's version is current.
if (snapshotHint.isPresent() && snapshotHint.get().getVersion() == snapshotVersion) {
return new Tuple2<>(snapshotHint, Optional.empty());
}

// Ignore the snapshot hint whose version is larger.
if (snapshotHint.isPresent() && snapshotHint.get().getVersion() > snapshotVersion) {
snapshotHint = Optional.empty();
}

long crcSearchLowerBound =
max(
asList(
// Prefer reading hint over CRC, so start listing from hint's version + 1,
// if hint is not present, list from version 0.
snapshotHint.map(SnapshotHint::getVersion).orElse(-1L) + 1,
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
logSegment.getCheckpointVersionOpt().orElse(0L),
// Only find the CRC within 100 versions.
snapshotVersion - 100,
0L));
Optional<CRCInfo> crcInfoOpt =
ChecksumReader.getCRCInfo(
engine, logSegment.getLogPath(), snapshotVersion, crcSearchLowerBound);
if (!crcInfoOpt.isPresent()) {
return new Tuple2<>(snapshotHint, Optional.empty());
}
CRCInfo crcInfo = crcInfoOpt.get();
checkArgument(
crcInfo.getVersion() >= crcSearchLowerBound && crcInfo.getVersion() <= snapshotVersion);
// We found a CRCInfo of a version (a) older than the one we are looking for (snapshotVersion)
// but (b) newer than the current hint. Use this CRCInfo to create a new hint, and return this
// crc info if it matches the current version.
return new Tuple2<>(
Optional.of(SnapshotHint.fromCrcInfo(crcInfo)),
crcInfo.getVersion() == snapshotVersion ? crcInfoOpt : Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@

import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.replay.CRCInfo;

/** Contains summary information of a {@link io.delta.kernel.Snapshot}. */
public class SnapshotHint {
private final long version;
private final Protocol protocol;
private final Metadata metadata;

/** Constructs a new SnapshotHint based on a CRCInfo */
public static SnapshotHint fromCrcInfo(CRCInfo crcInfo) {
return new SnapshotHint(crcInfo.getVersion(), crcInfo.getProtocol(), crcInfo.getMetadata());
}

public SnapshotHint(long version, Protocol protocol, Metadata metadata) {
this.version = version;
this.protocol = protocol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ class LogReplayEngineMetricsSuite extends AnyFunSuite with TestUtils {
// Tests for loading P & M through checksums files //
/////////////////////////////////////////////////////////////////////////////////////////////////

Seq(-1L, 3L, 4L).foreach { version => // -1 means latest version
Seq(-1L, 0L, 3L, 4L).foreach { version => // -1 means latest version
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test(s"checksum found at the read version: ${if (version == -1) "latest" else version}") {
withTempDirAndMetricsEngine { (path, engine) =>
// Produce a test table with 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ import io.delta.kernel.internal.data.ScanStateRow
import io.delta.kernel.defaults.engine.DefaultEngine
import io.delta.kernel.defaults.utils.{TestRow, TestUtils}
import io.delta.kernel.Table
import io.delta.kernel.internal.fs.Path
import io.delta.kernel.internal.util.FileNames
import org.apache.spark.sql.delta.sources.DeltaSQLConf

import java.nio.file.Files

class LogReplaySuite extends AnyFunSuite with TestUtils {

Expand Down Expand Up @@ -298,4 +303,60 @@ class LogReplaySuite extends AnyFunSuite with TestUtils {
assert(snapshotImpl.getLatestTransactionVersion(defaultEngine, "fakeAppId") === Optional.of(3L))
assert(!snapshotImpl.getLatestTransactionVersion(defaultEngine, "nonExistentAppId").isPresent)
}

test("current checksum read => snapshot provides crc info") {
withTempDir { tempFile =>
val tablePath = tempFile.getAbsolutePath
spark.sql(
s"CREATE TABLE delta.`$tablePath` USING DELTA AS " +
s"SELECT 0L as id"
)
spark.sql(
s"INSERT INTO delta.`$tablePath` SELECT 1L as id"
)
val table = Table.forPath(defaultEngine, tablePath)
val snapshot = table.getLatestSnapshot(defaultEngine).asInstanceOf[SnapshotImpl]
assert(snapshot.getCurrentCrcInfo.isPresent)
val crcInfo = snapshot.getCurrentCrcInfo.get()
assert(crcInfo.getVersion == 1)
assert(crcInfo.getProtocol == snapshot.getProtocol)
assert(crcInfo.getMetadata == snapshot.getMetadata)
}
}

test("stale checksum read => snapshot doesn't provides crc info") {
withTempDir { tempFile =>
val tablePath = tempFile.getAbsolutePath
spark.sql(
s"CREATE TABLE delta.`$tablePath` USING DELTA AS " +
s"SELECT 0L as id"
)
spark.sql(
s"INSERT INTO delta.`$tablePath` SELECT 1L as id"
)
deleteChecksumFileForTable(tablePath, versions = Seq(1))

val table = Table.forPath(defaultEngine, tablePath)
val snapshot = table.getLatestSnapshot(defaultEngine).asInstanceOf[SnapshotImpl]
assert(!snapshot.getCurrentCrcInfo.isPresent)
}
}

test("no checksum read => snapshot doesn't provides crc info") {
withTempDir { tempFile =>
val tablePath = tempFile.getAbsolutePath
spark.sql(
s"CREATE TABLE delta.`$tablePath` USING DELTA AS " +
s"SELECT 0L as id"
)
spark.sql(
s"INSERT INTO delta.`$tablePath` SELECT 1L as id"
)
deleteChecksumFileForTable(tablePath, versions = Seq(0, 1))

val table = Table.forPath(defaultEngine, tablePath)
val snapshot = table.getLatestSnapshot(defaultEngine).asInstanceOf[SnapshotImpl]
assert(!snapshot.getCurrentCrcInfo.isPresent)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ import io.delta.kernel.internal.util.Utils.singletonCloseableIterator
import io.delta.kernel.types._
import io.delta.kernel.utils.CloseableIterator
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.shaded.org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{types => sparktypes}
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.delta.util.FileNames
import org.scalatest.Assertions

trait TestUtils extends Assertions with SQLHelper {
Expand Down Expand Up @@ -717,4 +719,16 @@ trait TestUtils extends Assertions with SQLHelper {
}
resource.getFile
}

def deleteChecksumFileForTable(tablePath: String, versions: Seq[Int]): Unit = {
versions.foreach(
v => {
Files.deleteIfExists(
new File(
FileNames.checksumFile(new Path(s"$tablePath/_delta_log"), v).toString
).toPath
)
}
)
}
}
Loading