From 82d5e1cf1439b01609b180e2727e665986223416 Mon Sep 17 00:00:00 2001 From: feyman2016 Date: Tue, 26 Oct 2021 21:03:46 +0000 Subject: [PATCH] KAFKA-10800; Enhance the test for validation when the state machine creates a snapshot (#10593) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch adds additional test cases covering the validations done when snapshots are created by the state machine. Reviewers: José Armando García Sancio , Jason Gustafson --- .../raft/KafkaRaftClientSnapshotTest.java | 148 ++++++++++++++---- .../kafka/raft/RaftClientTestContext.java | 2 +- 2 files changed, 116 insertions(+), 34 deletions(-) diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java index f77103110603b..42db40c7eff0f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java @@ -42,11 +42,13 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.OptionalLong; import java.util.OptionalInt; import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertThrows; final public class KafkaRaftClientSnapshotTest { @Test @@ -237,10 +239,7 @@ public void testFetchRequestOffsetLessThanLogStart() throws Exception { ); // Advance the highWatermark - context.deliverRequest(context.fetchRequest(epoch, otherNodeId, localLogEndOffset, epoch, 0)); - context.pollUntilResponse(); - context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId)); - assertEquals(localLogEndOffset, context.client.highWatermark().getAsLong()); + context.advanceLocalLeaderHighWatermarkToLogEndOffset(); OffsetAndEpoch snapshotId = new OffsetAndEpoch(localLogEndOffset, epoch); try (SnapshotWriter snapshot = context.client.createSnapshot(snapshotId.offset - 1, snapshotId.epoch, 0).get()) { @@ -280,11 +279,7 @@ public void testFetchRequestWithLargerLastFetchedEpoch() throws Exception { assertEquals(oldestSnapshotId.epoch + 1, epoch); // Advance the highWatermark - long localLogEndOffset = context.log.endOffset().offset; - context.deliverRequest(context.fetchRequest(epoch, otherNodeId, localLogEndOffset, epoch, 0)); - context.pollUntilResponse(); - context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId)); - assertEquals(localLogEndOffset, context.client.highWatermark().getAsLong()); + context.advanceLocalLeaderHighWatermarkToLogEndOffset(); // Create a snapshot at the high watermark try (SnapshotWriter snapshot = context.client.createSnapshot(oldestSnapshotId.offset - 1, oldestSnapshotId.epoch, 0).get()) { @@ -323,12 +318,7 @@ public void testFetchRequestTruncateToLogStart() throws Exception { assertEquals(oldestSnapshotId.epoch + 2 + 1, epoch); // Advance the highWatermark - context.deliverRequest( - context.fetchRequest(epoch, syncNodeId, context.log.endOffset().offset, epoch, 0) - ); - context.pollUntilResponse(); - context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId)); - assertEquals(context.log.endOffset().offset, context.client.highWatermark().getAsLong()); + context.advanceLocalLeaderHighWatermarkToLogEndOffset(); // Create a snapshot at the high watermark try (SnapshotWriter snapshot = context.client.createSnapshot(oldestSnapshotId.offset - 1, oldestSnapshotId.epoch, 0).get()) { @@ -371,12 +361,7 @@ public void testFetchRequestAtLogStartOffsetWithValidEpoch() throws Exception { assertEquals(oldestSnapshotId.epoch + 2 + 1, epoch); // Advance the highWatermark - context.deliverRequest( - context.fetchRequest(epoch, syncNodeId, context.log.endOffset().offset, epoch, 0) - ); - context.pollUntilResponse(); - context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId)); - assertEquals(context.log.endOffset().offset, context.client.highWatermark().getAsLong()); + context.advanceLocalLeaderHighWatermarkToLogEndOffset(); // Create a snapshot at the high watermark try (SnapshotWriter snapshot = context.client.createSnapshot(oldestSnapshotId.offset - 1, oldestSnapshotId.epoch, 0).get()) { @@ -414,12 +399,7 @@ public void testFetchRequestAtLogStartOffsetWithInvalidEpoch() throws Exception assertEquals(oldestSnapshotId.epoch + 2 + 1, epoch); // Advance the highWatermark - context.deliverRequest( - context.fetchRequest(epoch, syncNodeId, context.log.endOffset().offset, epoch, 0) - ); - context.pollUntilResponse(); - context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId)); - assertEquals(context.log.endOffset().offset, context.client.highWatermark().getAsLong()); + context.advanceLocalLeaderHighWatermarkToLogEndOffset(); // Create a snapshot at the high watermark try (SnapshotWriter snapshot = context.client.createSnapshot(oldestSnapshotId.offset - 1, oldestSnapshotId.epoch, 0).get()) { @@ -463,12 +443,7 @@ public void testFetchRequestWithLastFetchedEpochLessThanOldestSnapshot() throws assertEquals(oldestSnapshotId.epoch + 2 + 1, epoch); // Advance the highWatermark - context.deliverRequest( - context.fetchRequest(epoch, syncNodeId, context.log.endOffset().offset, epoch, 0) - ); - context.pollUntilResponse(); - context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId)); - assertEquals(context.log.endOffset().offset, context.client.highWatermark().getAsLong()); + context.advanceLocalLeaderHighWatermarkToLogEndOffset(); // Create a snapshot at the high watermark try (SnapshotWriter snapshot = context.client.createSnapshot(oldestSnapshotId.offset - 1, oldestSnapshotId.epoch, 0).get()) { @@ -1535,6 +1510,113 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception { context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID); } + @Test + public void testCreateSnapshotAsLeaderWithInvalidSnapshotId() throws Exception { + int localId = 0; + int otherNodeId = localId + 1; + Set voters = Utils.mkSet(localId, otherNodeId); + int epoch = 2; + + List appendRecords = Arrays.asList("a", "b", "c"); + OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .appendToLog(epoch, appendRecords) + .withAppendLingerMs(1) + .build(); + + context.becomeLeader(); + int currentEpoch = context.currentEpoch(); + + // When leader creating snapshot: + // 1.1 high watermark cannot be empty + assertEquals(OptionalLong.empty(), context.client.highWatermark()); + assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId1.offset, invalidSnapshotId1.epoch, 0)); + + // 1.2 high watermark must larger than or equal to the snapshotId's endOffset + context.advanceLocalLeaderHighWatermarkToLogEndOffset(); + // append some more records to make the LEO > high watermark + List newRecords = Arrays.asList("d", "e", "f"); + context.client.scheduleAppend(currentEpoch, newRecords); + context.time.sleep(context.appendLingerMs()); + context.client.poll(); + assertEquals(context.log.endOffset().offset, context.client.highWatermark().getAsLong() + newRecords.size()); + + OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() + 1, currentEpoch); + assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId2.offset, invalidSnapshotId2.epoch, 0)); + + // 2 the quorum epoch must larger than or equal to the snapshotId's epoch + OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() - 2, currentEpoch + 1); + assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId3.offset, invalidSnapshotId3.epoch, 0)); + + // 3 the snapshotId should be validated against endOffsetForEpoch + OffsetAndEpoch endOffsetForEpoch = context.log.endOffsetForEpoch(epoch); + assertEquals(epoch, endOffsetForEpoch.epoch); + OffsetAndEpoch invalidSnapshotId4 = new OffsetAndEpoch(endOffsetForEpoch.offset + 1, epoch); + assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId4.offset, invalidSnapshotId4.epoch, 0)); + } + + @Test + public void testCreateSnapshotAsFollowerWithInvalidSnapshotId() throws Exception { + int localId = 0; + int leaderId = 1; + int otherFollowerId = 2; + int epoch = 5; + Set voters = Utils.mkSet(localId, leaderId, otherFollowerId); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withElectedLeader(epoch, leaderId) + .build(); + context.assertElectedLeader(epoch, leaderId); + + // When follower creating snapshot: + // 1) The high watermark cannot be empty + assertEquals(OptionalLong.empty(), context.client.highWatermark()); + OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(0, 0); + assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId1.offset, invalidSnapshotId1.epoch, 0)); + + // Poll for our first fetch request + context.pollUntilRequest(); + RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); + assertTrue(voters.contains(fetchRequest.destinationId())); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); + + // The response does not advance the high watermark + List records1 = Arrays.asList("a", "b", "c"); + MemoryRecords batch1 = context.buildBatch(0L, 3, records1); + context.deliverResponse(fetchRequest.correlationId, fetchRequest.destinationId(), + context.fetchResponse(epoch, leaderId, batch1, 0L, Errors.NONE)); + context.client.poll(); + + // 2) The high watermark must be larger than or equal to the snapshotId's endOffset + int currentEpoch = context.currentEpoch(); + OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() + 1, currentEpoch); + assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId2.offset, invalidSnapshotId2.epoch, 0)); + + // 3) The quorum epoch must be larger than or equal to the snapshotId's epoch + OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch + 1); + assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId3.offset, invalidSnapshotId3.epoch, 0)); + + // The high watermark advances to be larger than log.endOffsetForEpoch(3), to test the case 3 + context.pollUntilRequest(); + fetchRequest = context.assertSentFetchRequest(); + assertTrue(voters.contains(fetchRequest.destinationId())); + context.assertFetchRequestData(fetchRequest, epoch, 3L, 3); + + List records2 = Arrays.asList("d", "e", "f"); + MemoryRecords batch2 = context.buildBatch(3L, 4, records2); + context.deliverResponse(fetchRequest.correlationId, fetchRequest.destinationId(), + context.fetchResponse(epoch, leaderId, batch2, 6L, Errors.NONE)); + context.client.poll(); + assertEquals(6L, context.client.highWatermark().getAsLong()); + + // 4) The snapshotId should be validated against endOffsetForEpoch + OffsetAndEpoch endOffsetForEpoch = context.log.endOffsetForEpoch(3); + assertEquals(3, endOffsetForEpoch.epoch); + OffsetAndEpoch invalidSnapshotId4 = new OffsetAndEpoch(endOffsetForEpoch.offset + 1, epoch); + assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId4.offset, invalidSnapshotId4.epoch, 0)); + } + private static FetchSnapshotRequestData fetchSnapshotRequest( TopicPartition topicPartition, int epoch, diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index e8fcc06c3ef97..00c351cc26a26 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -427,7 +427,7 @@ void assertVotedCandidate(int epoch, int leaderId) throws IOException { assertEquals(ElectionState.withVotedCandidate(epoch, leaderId, voters), quorumStateStore.readElectionState()); } - void assertElectedLeader(int epoch, int leaderId) throws IOException { + public void assertElectedLeader(int epoch, int leaderId) throws IOException { assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), quorumStateStore.readElectionState()); }