Skip to content

Commit

Permalink
implement PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
scottsand-db committed Jan 28, 2025
1 parent 509b361 commit 1f489b3
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,41 +215,19 @@ public void checkpoint(Engine engine, Clock clock, long version)
////////////////////

/**
* Given a list of delta versions, verifies that they are (1) contiguous, (2) start with
* expectedStartVersion (if provided), and (3) end with expectedEndVersionOpt (if provided).
* Throws an exception if any of these are not true.
* Verify that a list of delta versions is contiguous.
*
* @param versions List of versions in sorted increasing order according
* @throws InvalidTableException if the versions are not contiguous
*/
@VisibleForTesting
public static void verifyDeltaVersions(
List<Long> versions,
Optional<Long> expectedStartVersion,
Optional<Long> expectedEndVersion,
Path tablePath) {
public static void verifyDeltaVersionsContiguous(List<Long> versions, Path tablePath) {
for (int i = 1; i < versions.size(); i++) {
if (versions.get(i) != versions.get(i - 1) + 1) {
throw new InvalidTableException(
tablePath.toString(),
String.format("Missing delta files: versions are not contiguous: (%s)", versions));
}
}
expectedStartVersion.ifPresent(
v -> {
if (versions.isEmpty() || !Objects.equals(versions.get(0), v)) {
throw new InvalidTableException(
tablePath.toString(),
String.format("Cannot compute snapshot. Missing delta file version %d.", v));
}
});
expectedEndVersion.ifPresent(
v -> {
if (versions.isEmpty() || !Objects.equals(ListUtils.getLast(versions), v)) {
throw new InvalidTableException(
tablePath.toString(),
String.format("Cannot compute snapshot. Missing delta file version %d.", v));
}
});
}

/**
Expand Down Expand Up @@ -482,13 +460,13 @@ public LogSegment getLogSegmentForVersion(Engine engine, Optional<Long> versionT
// Step 9: Perform some basic validations. //
/////////////////////////////////////////////

// Check that we have found at least one checkpoint or delta file
if (!latestCompleteCheckpointOpt.isPresent() && deltasAfterCheckpoint.isEmpty()) {
throw new InvalidTableException(
tablePath.toString(), "No complete checkpoint found and no delta files found");
}

// If there's a checkpoint at version N then we also require that there's a delta file at that
// version.
// Check that, for a checkpoint at version N, there's a delta file at N, too.
if (latestCompleteCheckpointOpt.isPresent()
&& listedDeltaFileStatuses.stream()
.map(x -> FileNames.deltaVersion(new Path(x.getPath())))
Expand All @@ -498,6 +476,7 @@ public LogSegment getLogSegmentForVersion(Engine engine, Optional<Long> versionT
String.format("Missing delta file for version %s", latestCompleteCheckpointVersion));
}

// Check that the $newVersion we actually loaded is the desired $versionToLoad
versionToLoadOpt.ifPresent(
versionToLoad -> {
if (newVersion < versionToLoad) {
Expand All @@ -512,11 +491,22 @@ public LogSegment getLogSegmentForVersion(Engine engine, Optional<Long> versionT
});

if (!deltasAfterCheckpoint.isEmpty()) {
verifyDeltaVersions(
deltaVersionsAfterCheckpoint,
Optional.of(latestCompleteCheckpointVersion + 1) /* expected first version */,
versionToLoadOpt /* expected end version */,
tablePath);
// Check that the delta versions are contiguous
verifyDeltaVersionsContiguous(deltaVersionsAfterCheckpoint, tablePath);

// Check that the delta versions start with $latestCompleteCheckpointVersion + 1. If they
// don't, then we have a gap in between the checkpoint and the first delta file.
if (!deltaVersionsAfterCheckpoint.get(0).equals(latestCompleteCheckpointVersion + 1)) {
throw new InvalidTableException(
tablePath.toString(),
String.format(
"Cannot compute snapshot. Missing delta file version %d.",
latestCompleteCheckpointVersion + 1));
}

// Note: We have already asserted above that $versionToLoad equals $newVersion.
// Note: We already know that the last element of deltasAfterCheckpoint is $newVersion IF
// $deltasAfterCheckpoint is not empty.

logger.info(
"Verified delta files are contiguous from version {} to {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,87 +34,25 @@ import org.scalatest.funsuite.AnyFunSuite

class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {

test("verifyDeltaVersions") {
test("verifyDeltaVersionsContiguous") {
val path = new Path("/path/to/table")
// empty array
SnapshotManager.verifyDeltaVersions(
Collections.emptyList(),
Optional.empty(),
Optional.empty(),
new Path("/path/to/table"))
SnapshotManager.verifyDeltaVersionsContiguous(Collections.emptyList(), path)
// array of size 1
SnapshotManager.verifyDeltaVersionsContiguous(Collections.singletonList(1), path)
// contiguous versions
SnapshotManager.verifyDeltaVersions(
Arrays.asList(1, 2, 3),
Optional.empty(),
Optional.empty(),
new Path("/path/to/table"))
// contiguous versions with correct `expectedStartVersion` and `expectedStartVersion`
SnapshotManager.verifyDeltaVersions(
Arrays.asList(1, 2, 3),
Optional.empty(),
Optional.of(3),
new Path("/path/to/table"))
SnapshotManager.verifyDeltaVersions(
Arrays.asList(1, 2, 3),
Optional.of(1),
Optional.empty(),
new Path("/path/to/table"))
SnapshotManager.verifyDeltaVersions(
Arrays.asList(1, 2, 3),
Optional.of(1),
Optional.of(3),
new Path("/path/to/table"))
// `expectedStartVersion` or `expectedEndVersion` doesn't match
SnapshotManager.verifyDeltaVersionsContiguous(Arrays.asList(1, 2, 3), path)
// non-contiguous versions
intercept[InvalidTableException] {
SnapshotManager.verifyDeltaVersions(
Arrays.asList(1, 2),
Optional.of(0),
Optional.empty(),
new Path("/path/to/table"))
}
intercept[InvalidTableException] {
SnapshotManager.verifyDeltaVersions(
Arrays.asList(1, 2),
Optional.empty(),
Optional.of(3),
new Path("/path/to/table"))
}
intercept[InvalidTableException] {
SnapshotManager.verifyDeltaVersions(
Collections.emptyList(),
Optional.of(0),
Optional.empty(),
new Path("/path/to/table"))
}
intercept[InvalidTableException] {
SnapshotManager.verifyDeltaVersions(
Collections.emptyList(),
Optional.empty(),
Optional.of(3),
new Path("/path/to/table"))
}
// non contiguous versions
intercept[InvalidTableException] {
SnapshotManager.verifyDeltaVersions(
Arrays.asList(1, 3),
Optional.empty(),
Optional.empty(),
new Path("/path/to/table"))
SnapshotManager.verifyDeltaVersionsContiguous(Arrays.asList(1, 3), path)
}
// duplicates in versions
intercept[InvalidTableException] {
SnapshotManager.verifyDeltaVersions(
Arrays.asList(1, 2, 2, 3),
Optional.empty(),
Optional.empty(),
new Path("/path/to/table"))
SnapshotManager.verifyDeltaVersionsContiguous(Arrays.asList(1, 2, 2, 3), path)
}
// unsorted versions
intercept[InvalidTableException] {
SnapshotManager.verifyDeltaVersions(
Arrays.asList(3, 2, 1),
Optional.empty(),
Optional.empty(),
new Path("/path/to/table"))
SnapshotManager.verifyDeltaVersionsContiguous(Arrays.asList(3, 2, 1), path)
}
}

Expand Down Expand Up @@ -805,11 +743,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
test("getLogSegmentForVersion: corrupt _last_checkpoint with empty delta log") {
val exMsg = intercept[InvalidTableException] {
snapshotManager.getLogSegmentForVersion(
mockEngine(
jsonHandler = new MockReadLastCheckpointFileJsonHandler(
s"$logPath/_last_checkpoint", 1),
fileSystemClient = new MockListFromFileSystemClient(_ => Seq.empty)
),
createMockFSAndJsonEngineForLastCheckpoint(Seq.empty, Optional.of(1)),
Optional.empty()
)
}.getMessage
Expand Down

0 comments on commit 1f489b3

Please sign in to comment.