diff --git a/clients/kafka-common/src/test/java/org/eclipse/hono/client/kafka/consumer/AbstractAtLeastOnceKafkaConsumerMockitoTest.java b/clients/kafka-common/src/test/java/org/eclipse/hono/client/kafka/consumer/AbstractAtLeastOnceKafkaConsumerMockitoTest.java index 90de976eb5..306e33dfef 100644 --- a/clients/kafka-common/src/test/java/org/eclipse/hono/client/kafka/consumer/AbstractAtLeastOnceKafkaConsumerMockitoTest.java +++ b/clients/kafka-common/src/test/java/org/eclipse/hono/client/kafka/consumer/AbstractAtLeastOnceKafkaConsumerMockitoTest.java @@ -76,7 +76,7 @@ public class AbstractAtLeastOnceKafkaConsumerMockitoTest { @Test public void testThatStartSubscribesAndPolls(final VertxTestContext ctx) { - final KafkaConsumer mockKafkaConsumer = mockVertxKafkaConsumer(0, 1); + final KafkaConsumer mockKafkaConsumer = mockVertxKafkaConsumer(0); final AbstractAtLeastOnceKafkaConsumer underTest = new TestConsumer(mockKafkaConsumer, TOPIC); underTest.start() .onComplete(ctx.succeeding(v -> ctx.verify(() -> { @@ -100,7 +100,7 @@ public void testThatCommitErrorClosesConsumer(final VertxTestContext ctx) { final AtomicReference testConsumerRef = new AtomicReference<>(); final KafkaException commitError = new KafkaException("commit failed"); - final KafkaConsumer mockKafkaConsumer = mockVertxKafkaConsumer(0, 0); + final KafkaConsumer mockKafkaConsumer = mockVertxKafkaConsumer(1); // WHEN committing fails doAnswer(invocation -> { @@ -137,15 +137,14 @@ public void testThatCommitErrorClosesConsumer(final VertxTestContext ctx) { @Test public void testMessageProcessingError(final VertxTestContext ctx) { - final int startOffset = 50; - final int corruptMessageOffset = 100; + final int numberOfRecords = 50; + final int corruptMessageOffset = numberOfRecords - 1; // offsets start with 0 final IllegalStateException unexpectedError = new IllegalStateException("this should not have happened"); final AtomicLong committedOffset = new AtomicLong(); - final KafkaConsumer mockKafkaConsumer = mockVertxKafkaConsumer(startOffset, - corruptMessageOffset); + final KafkaConsumer mockKafkaConsumer = mockVertxKafkaConsumer(numberOfRecords); doAnswer(invocation -> { final Map offsets = invocation @@ -183,7 +182,7 @@ public void testMessageProcessingError(final VertxTestContext ctx) { } @SuppressWarnings("unchecked") - private KafkaConsumer mockVertxKafkaConsumer(final int startOffset, final int lastOffset) { + private KafkaConsumer mockVertxKafkaConsumer(final int recordsPerPoll) { final KafkaConsumer mockKafkaConsumer = mock(KafkaConsumer.class); @@ -214,7 +213,7 @@ private KafkaConsumer mockVertxKafkaConsumer(final int startOffs }).when(mockKafkaConsumer).close(VertxMockSupport.anyHandler()); final List> recordList = new ArrayList<>(); - for (int i = startOffset; i < lastOffset + 1; i++) { + for (int i = 0; i < recordsPerPoll; i++) { recordList.add(new ConsumerRecord<>(TOPIC, PARTITION, i, null, Buffer.buffer())); } final TopicPartition topicPartition = new TopicPartition(TOPIC, PARTITION);