Skip to content

Commit

Permalink
KAFKA-10800; Enhance the test for validation when the state machine c…
Browse files Browse the repository at this point in the history
…reates a snapshot (apache#10593)

This patch adds additional test cases covering the validations done when snapshots are created by the state machine.

Reviewers: José Armando García Sancio <[email protected]>, Jason Gustafson <[email protected]>
  • Loading branch information
feyman2016 authored Oct 26, 2021
1 parent 37b3f8c commit 82d5e1c
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.OptionalInt;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;

final public class KafkaRaftClientSnapshotTest {
@Test
Expand Down Expand Up @@ -237,10 +239,7 @@ public void testFetchRequestOffsetLessThanLogStart() throws Exception {
);

// Advance the highWatermark
context.deliverRequest(context.fetchRequest(epoch, otherNodeId, localLogEndOffset, epoch, 0));
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
assertEquals(localLogEndOffset, context.client.highWatermark().getAsLong());
context.advanceLocalLeaderHighWatermarkToLogEndOffset();

OffsetAndEpoch snapshotId = new OffsetAndEpoch(localLogEndOffset, epoch);
try (SnapshotWriter<String> snapshot = context.client.createSnapshot(snapshotId.offset - 1, snapshotId.epoch, 0).get()) {
Expand Down Expand Up @@ -280,11 +279,7 @@ public void testFetchRequestWithLargerLastFetchedEpoch() throws Exception {
assertEquals(oldestSnapshotId.epoch + 1, epoch);

// Advance the highWatermark
long localLogEndOffset = context.log.endOffset().offset;
context.deliverRequest(context.fetchRequest(epoch, otherNodeId, localLogEndOffset, epoch, 0));
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
assertEquals(localLogEndOffset, context.client.highWatermark().getAsLong());
context.advanceLocalLeaderHighWatermarkToLogEndOffset();

// Create a snapshot at the high watermark
try (SnapshotWriter<String> snapshot = context.client.createSnapshot(oldestSnapshotId.offset - 1, oldestSnapshotId.epoch, 0).get()) {
Expand Down Expand Up @@ -323,12 +318,7 @@ public void testFetchRequestTruncateToLogStart() throws Exception {
assertEquals(oldestSnapshotId.epoch + 2 + 1, epoch);

// Advance the highWatermark
context.deliverRequest(
context.fetchRequest(epoch, syncNodeId, context.log.endOffset().offset, epoch, 0)
);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
assertEquals(context.log.endOffset().offset, context.client.highWatermark().getAsLong());
context.advanceLocalLeaderHighWatermarkToLogEndOffset();

// Create a snapshot at the high watermark
try (SnapshotWriter<String> snapshot = context.client.createSnapshot(oldestSnapshotId.offset - 1, oldestSnapshotId.epoch, 0).get()) {
Expand Down Expand Up @@ -371,12 +361,7 @@ public void testFetchRequestAtLogStartOffsetWithValidEpoch() throws Exception {
assertEquals(oldestSnapshotId.epoch + 2 + 1, epoch);

// Advance the highWatermark
context.deliverRequest(
context.fetchRequest(epoch, syncNodeId, context.log.endOffset().offset, epoch, 0)
);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
assertEquals(context.log.endOffset().offset, context.client.highWatermark().getAsLong());
context.advanceLocalLeaderHighWatermarkToLogEndOffset();

// Create a snapshot at the high watermark
try (SnapshotWriter<String> snapshot = context.client.createSnapshot(oldestSnapshotId.offset - 1, oldestSnapshotId.epoch, 0).get()) {
Expand Down Expand Up @@ -414,12 +399,7 @@ public void testFetchRequestAtLogStartOffsetWithInvalidEpoch() throws Exception
assertEquals(oldestSnapshotId.epoch + 2 + 1, epoch);

// Advance the highWatermark
context.deliverRequest(
context.fetchRequest(epoch, syncNodeId, context.log.endOffset().offset, epoch, 0)
);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
assertEquals(context.log.endOffset().offset, context.client.highWatermark().getAsLong());
context.advanceLocalLeaderHighWatermarkToLogEndOffset();

// Create a snapshot at the high watermark
try (SnapshotWriter<String> snapshot = context.client.createSnapshot(oldestSnapshotId.offset - 1, oldestSnapshotId.epoch, 0).get()) {
Expand Down Expand Up @@ -463,12 +443,7 @@ public void testFetchRequestWithLastFetchedEpochLessThanOldestSnapshot() throws
assertEquals(oldestSnapshotId.epoch + 2 + 1, epoch);

// Advance the highWatermark
context.deliverRequest(
context.fetchRequest(epoch, syncNodeId, context.log.endOffset().offset, epoch, 0)
);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
assertEquals(context.log.endOffset().offset, context.client.highWatermark().getAsLong());
context.advanceLocalLeaderHighWatermarkToLogEndOffset();

// Create a snapshot at the high watermark
try (SnapshotWriter<String> snapshot = context.client.createSnapshot(oldestSnapshotId.offset - 1, oldestSnapshotId.epoch, 0).get()) {
Expand Down Expand Up @@ -1535,6 +1510,113 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
}

@Test
public void testCreateSnapshotAsLeaderWithInvalidSnapshotId() throws Exception {
int localId = 0;
int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
int epoch = 2;

List<String> appendRecords = Arrays.asList("a", "b", "c");
OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(3, epoch);

RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.appendToLog(epoch, appendRecords)
.withAppendLingerMs(1)
.build();

context.becomeLeader();
int currentEpoch = context.currentEpoch();

// When leader creating snapshot:
// 1.1 high watermark cannot be empty
assertEquals(OptionalLong.empty(), context.client.highWatermark());
assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId1.offset, invalidSnapshotId1.epoch, 0));

// 1.2 high watermark must larger than or equal to the snapshotId's endOffset
context.advanceLocalLeaderHighWatermarkToLogEndOffset();
// append some more records to make the LEO > high watermark
List<String> newRecords = Arrays.asList("d", "e", "f");
context.client.scheduleAppend(currentEpoch, newRecords);
context.time.sleep(context.appendLingerMs());
context.client.poll();
assertEquals(context.log.endOffset().offset, context.client.highWatermark().getAsLong() + newRecords.size());

OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() + 1, currentEpoch);
assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId2.offset, invalidSnapshotId2.epoch, 0));

// 2 the quorum epoch must larger than or equal to the snapshotId's epoch
OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() - 2, currentEpoch + 1);
assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId3.offset, invalidSnapshotId3.epoch, 0));

// 3 the snapshotId should be validated against endOffsetForEpoch
OffsetAndEpoch endOffsetForEpoch = context.log.endOffsetForEpoch(epoch);
assertEquals(epoch, endOffsetForEpoch.epoch);
OffsetAndEpoch invalidSnapshotId4 = new OffsetAndEpoch(endOffsetForEpoch.offset + 1, epoch);
assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId4.offset, invalidSnapshotId4.epoch, 0));
}

@Test
public void testCreateSnapshotAsFollowerWithInvalidSnapshotId() throws Exception {
int localId = 0;
int leaderId = 1;
int otherFollowerId = 2;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, leaderId, otherFollowerId);

RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.withElectedLeader(epoch, leaderId)
.build();
context.assertElectedLeader(epoch, leaderId);

// When follower creating snapshot:
// 1) The high watermark cannot be empty
assertEquals(OptionalLong.empty(), context.client.highWatermark());
OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(0, 0);
assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId1.offset, invalidSnapshotId1.epoch, 0));

// Poll for our first fetch request
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
assertTrue(voters.contains(fetchRequest.destinationId()));
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);

// The response does not advance the high watermark
List<String> records1 = Arrays.asList("a", "b", "c");
MemoryRecords batch1 = context.buildBatch(0L, 3, records1);
context.deliverResponse(fetchRequest.correlationId, fetchRequest.destinationId(),
context.fetchResponse(epoch, leaderId, batch1, 0L, Errors.NONE));
context.client.poll();

// 2) The high watermark must be larger than or equal to the snapshotId's endOffset
int currentEpoch = context.currentEpoch();
OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() + 1, currentEpoch);
assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId2.offset, invalidSnapshotId2.epoch, 0));

// 3) The quorum epoch must be larger than or equal to the snapshotId's epoch
OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch + 1);
assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId3.offset, invalidSnapshotId3.epoch, 0));

// The high watermark advances to be larger than log.endOffsetForEpoch(3), to test the case 3
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
assertTrue(voters.contains(fetchRequest.destinationId()));
context.assertFetchRequestData(fetchRequest, epoch, 3L, 3);

List<String> records2 = Arrays.asList("d", "e", "f");
MemoryRecords batch2 = context.buildBatch(3L, 4, records2);
context.deliverResponse(fetchRequest.correlationId, fetchRequest.destinationId(),
context.fetchResponse(epoch, leaderId, batch2, 6L, Errors.NONE));
context.client.poll();
assertEquals(6L, context.client.highWatermark().getAsLong());

// 4) The snapshotId should be validated against endOffsetForEpoch
OffsetAndEpoch endOffsetForEpoch = context.log.endOffsetForEpoch(3);
assertEquals(3, endOffsetForEpoch.epoch);
OffsetAndEpoch invalidSnapshotId4 = new OffsetAndEpoch(endOffsetForEpoch.offset + 1, epoch);
assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId4.offset, invalidSnapshotId4.epoch, 0));
}

private static FetchSnapshotRequestData fetchSnapshotRequest(
TopicPartition topicPartition,
int epoch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ void assertVotedCandidate(int epoch, int leaderId) throws IOException {
assertEquals(ElectionState.withVotedCandidate(epoch, leaderId, voters), quorumStateStore.readElectionState());
}

void assertElectedLeader(int epoch, int leaderId) throws IOException {
public void assertElectedLeader(int epoch, int leaderId) throws IOException {
assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), quorumStateStore.readElectionState());
}

Expand Down

0 comments on commit 82d5e1c

Please sign in to comment.