Skip to content

Commit

Permalink
KAFKA-9538; Fix flaky test testResetOffsetsExportImportPlan (apache…
Browse files Browse the repository at this point in the history
…#6561)

This patch adds logic to the test case to ensure that consumer groups are in a valid state prior to attempting offset reset.

Reviewers: Jason Gustafson <[email protected]>
  • Loading branch information
huxihx authored Feb 13, 2020
1 parent 97d2c72 commit 46e80db
Showing 1 changed file with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,11 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
val cgcArgs = buildArgsForGroups(Seq(group1, group2), "--all-topics", "--to-offset", "2", "--export")
val consumerGroupCommand = getConsumerGroupService(cgcArgs)

produceConsumeAndShutdown(topic = topic1, group = group1, totalMessages = 100, numConsumers = 2)
produceConsumeAndShutdown(topic = topic2, group = group2, totalMessages = 100, numConsumers = 5)
produceConsumeAndShutdown(topic = topic1, group = group1, totalMessages = 100)
produceConsumeAndShutdown(topic = topic2, group = group2, totalMessages = 100)

awaitConsumerGroupInactive(consumerGroupCommand, group1)
awaitConsumerGroupInactive(consumerGroupCommand, group2)

val file = File.createTempFile("reset", ".csv")
file.deleteOnExit()
Expand Down Expand Up @@ -487,6 +490,13 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {

}

private def awaitConsumerGroupInactive(consumerGroupService: ConsumerGroupService, group: String): Unit = {
TestUtils.waitUntilTrue(() => {
val state = consumerGroupService.collectGroupState(group).state
state == "Empty" || state == "Dead"
}, s"Expected that consumer group is inactive. Actual state: ${consumerGroupService.collectGroupState(group).state}")
}

private def resetAndAssertOffsets(args: Array[String],
expectedOffset: Long,
dryRun: Boolean = false,
Expand Down

0 comments on commit 46e80db

Please sign in to comment.