Skip to content

Commit

Permalink
[eclipse-hono#8] Refactor mocked test cases to prevent stack overflow.
Browse files Browse the repository at this point in the history
Mocked handlers that are invoked immediately could cause a stack overflow
when polling too often. Changed the tests to prevent this.

Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
  • Loading branch information
b-abel committed Feb 9, 2021
1 parent 255352c commit 2a91617
Showing 1 changed file with 7 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class AbstractAtLeastOnceKafkaConsumerMockitoTest {
@Test
public void testThatStartSubscribesAndPolls(final VertxTestContext ctx) {

final KafkaConsumer<String, Buffer> mockKafkaConsumer = mockVertxKafkaConsumer(0, 1);
final KafkaConsumer<String, Buffer> mockKafkaConsumer = mockVertxKafkaConsumer(0);
final AbstractAtLeastOnceKafkaConsumer<JsonObject> underTest = new TestConsumer(mockKafkaConsumer, TOPIC);
underTest.start()
.onComplete(ctx.succeeding(v -> ctx.verify(() -> {
Expand All @@ -100,7 +100,7 @@ public void testThatCommitErrorClosesConsumer(final VertxTestContext ctx) {
final AtomicReference<TestConsumer> testConsumerRef = new AtomicReference<>();
final KafkaException commitError = new KafkaException("commit failed");

final KafkaConsumer<String, Buffer> mockKafkaConsumer = mockVertxKafkaConsumer(0, 0);
final KafkaConsumer<String, Buffer> mockKafkaConsumer = mockVertxKafkaConsumer(1);

// WHEN committing fails
doAnswer(invocation -> {
Expand Down Expand Up @@ -137,15 +137,14 @@ public void testThatCommitErrorClosesConsumer(final VertxTestContext ctx) {
@Test
public void testMessageProcessingError(final VertxTestContext ctx) {

final int startOffset = 50;
final int corruptMessageOffset = 100;
final int numberOfRecords = 50;
final int corruptMessageOffset = numberOfRecords - 1; // offsets start with 0

final IllegalStateException unexpectedError = new IllegalStateException("this should not have happened");

final AtomicLong committedOffset = new AtomicLong();

final KafkaConsumer<String, Buffer> mockKafkaConsumer = mockVertxKafkaConsumer(startOffset,
corruptMessageOffset);
final KafkaConsumer<String, Buffer> mockKafkaConsumer = mockVertxKafkaConsumer(numberOfRecords);

doAnswer(invocation -> {
final Map<io.vertx.kafka.client.common.TopicPartition, io.vertx.kafka.client.consumer.OffsetAndMetadata> offsets = invocation
Expand Down Expand Up @@ -183,7 +182,7 @@ public void testMessageProcessingError(final VertxTestContext ctx) {
}

@SuppressWarnings("unchecked")
private KafkaConsumer<String, Buffer> mockVertxKafkaConsumer(final int startOffset, final int lastOffset) {
private KafkaConsumer<String, Buffer> mockVertxKafkaConsumer(final int recordsPerPoll) {

final KafkaConsumer<String, Buffer> mockKafkaConsumer = mock(KafkaConsumer.class);

Expand Down Expand Up @@ -214,7 +213,7 @@ private KafkaConsumer<String, Buffer> mockVertxKafkaConsumer(final int startOffs
}).when(mockKafkaConsumer).close(VertxMockSupport.anyHandler());

final List<ConsumerRecord<String, Buffer>> recordList = new ArrayList<>();
for (int i = startOffset; i < lastOffset + 1; i++) {
for (int i = 0; i < recordsPerPoll; i++) {
recordList.add(new ConsumerRecord<>(TOPIC, PARTITION, i, null, Buffer.buffer()));
}
final TopicPartition topicPartition = new TopicPartition(TOPIC, PARTITION);
Expand Down

0 comments on commit 2a91617

Please sign in to comment.