Skip to content

Commit

Permalink
[eclipse-hono#8] Add "retry" to AbstractAtLeastOnceKafkaConsumer.
Browse files Browse the repository at this point in the history
The message handler passed into AbstractAtLeastOnceKafkaConsumer may throw a
ServerErrorException to indicate a transient error. In contrast to other
exceptions thrown by the message handler, the consumer will not be closed in
this case. Instead, the current offsets will be committed and the failed message
will be fetched again with the next poll operation.

Signed-off-by: Abel Buechner-Mihaljevic <abel.buechner-mihaljevic@bosch.io>
b-abel committed Feb 10, 2021
1 parent 2a91617 commit f441bd2
Showing 6 changed files with 94 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
import org.eclipse.hono.application.client.ApplicationClientFactory;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageConsumer;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerCommitException;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerPollException;

@@ -44,12 +45,14 @@ public interface KafkaApplicationClientFactory extends ApplicationClientFactory<
* <p>
* Errors can happen when polling, in message processing, and when committing the offset to Kafka. If a {@code poll}
* operation fails, the consumer will be closed and the close handler will be passed a
* {@link KafkaConsumerPollException} indicating the cause. If the provided message handler throws an exception, the
* {@link KafkaConsumerPollException} indicating the cause. The provided message handler may throw
* a{@link ServerErrorException} to indicate a transient error. If the message handler throws another exception, the
* consumer will be closed and the exception will be passed to the close handler. If the offset commit fails, the
* consumer will be closed and the close handler will be passed a {@link KafkaConsumerCommitException}.
*
* @param tenantId The tenant to consume data for.
* @param messageHandler The handler to invoke with every message received. The handler should not throw exceptions.
* @param messageHandler The handler to be invoked for each message created from a record. The handler may throw a
* {@link ServerErrorException} to indicate a transient error but should not throw any other exceptions.
* @param closeHandler The handler invoked when the consumer is closed due to an error.
* @return A future that will complete with the consumer once it is ready. The future will fail if the consumer
* cannot be started.
@@ -83,7 +86,8 @@ Future<MessageConsumer> createTelemetryConsumer(
* consumer will be closed and the close handler will be passed a {@link KafkaConsumerCommitException}.
*
* @param tenantId The tenant to consume data for.
* @param messageHandler The handler to invoke with every message received. The handler should not throw exceptions.
* @param messageHandler The handler to be invoked for each message created from a record. The handler may throw a
* {@link ServerErrorException} to indicate a transient error but should not throw any other exceptions.
* @param closeHandler The handler invoked when the consumer is closed due to an error.
* @return A future that will complete with the consumer once it is ready. The future will fail if the consumer
* cannot be started.
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageConsumer;
import org.eclipse.hono.application.client.kafka.KafkaMessageContext;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.consumer.AbstractAtLeastOnceKafkaConsumer;

import io.vertx.core.Future;
@@ -39,8 +40,8 @@ public abstract class DownstreamMessageConsumer
*
* @param kafkaConsumer The Kafka consumer to be exclusively used by this instance to consume records.
* @param topic The Kafka topic to consume records from.
* @param messageHandler The handler to be invoked for each message created from a record. The handler should not
* throw exceptions.
* @param messageHandler The handler to be invoked for each message created from a record. The handler may throw a
* {@link ServerErrorException} to indicate a transient error but should not throw any other exceptions.
* @param closeHandler The handler to be invoked when the Kafka consumer has been closed due to an error.
* @param pollTimeout The maximal number of milliseconds to wait for messages during a poll operation.
* @throws NullPointerException if any of the parameters is {@code null}.
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageConsumer;
import org.eclipse.hono.application.client.kafka.KafkaMessageContext;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerConfigProperties;

@@ -46,8 +47,8 @@ private EventConsumer(final KafkaConsumer<String, Buffer> kafkaConsumer, final S
* @param kafkaConsumer The Kafka consumer to be exclusively used by this instance to consume records.
* @param config The Kafka consumer configuration properties to use.
* @param tenantId The tenant to consume events for.
* @param messageHandler The handler to be invoked for each message created from a record. The handler should not
* throw exceptions.
* @param messageHandler The handler to be invoked for each message created from a record. The handler may throw a
* {@link ServerErrorException} to indicate a transient error but should not throw any other exceptions.
* @param closeHandler The handler to be invoked when the Kafka consumer has been closed due to an error.
* @return a future indicating the outcome. When {@link #start()} completes successfully, the future will be
* completed with the consumer. Otherwise the future will fail with the cause.
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageConsumer;
import org.eclipse.hono.application.client.kafka.KafkaMessageContext;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerConfigProperties;

@@ -46,8 +47,8 @@ private TelemetryConsumer(final KafkaConsumer<String, Buffer> kafkaConsumer, fin
* @param kafkaConsumer The Kafka consumer to be exclusively used by this instance to consume records.
* @param config The Kafka consumer configuration properties to use.
* @param tenantId The tenant to consume telemetry data for.
* @param messageHandler The handler to be invoked for each message created from a record. The handler should not
* throw exceptions.
* @param messageHandler The handler to be invoked for each message created from a record. The handler may throw a
* {@link ServerErrorException} to indicate a transient error but should not throw any other exceptions.
* @param closeHandler The handler to be invoked when the Kafka consumer has been closed due to an error.
* @return a future indicating the outcome. When {@link #start()} completes successfully, the future will be
* completed with the consumer. Otherwise the future will fail with the cause.
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import java.util.regex.Pattern;

import org.apache.kafka.common.errors.TimeoutException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.util.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,11 +42,13 @@
* This consumer continuously polls for batches of messages from Kafka. Each message is passed to a message handler for
* processing. When all messages of a batch are processed, this consumer commits the offset and polls for a new batch.
* <p>
* This consumer provides AT LEAST ONCE message processing. When the message handler completes, the message is
* considered to be processed and will be "acknowledged" with the following commit. If an error occurs, the underlying
* Kafka consumer will be closed. In this case, other consumer instances in the same consumer group will consume the
* messages from the last committed offset. Already processed messages will then be again in the next batch and be
* passed to the message handler. If de-duplication of messages is required, it must be handled by the message handler.
* This consumer provides AT LEAST ONCE message processing. When the message handler completes without throwing an
* exception, the message is considered to be processed and will be "acknowledged" with the following commit.
* <p>
* If a fatal error occurs, the underlying Kafka consumer will be closed. In this case, other consumer instances in the
* same consumer group will consume the messages from the last committed offset. Already processed messages might be
* polled again with the next batch and then passed to the message handler. If de-duplication of messages is required,
* it must be handled by the message handler.
* <p>
* The consumer starts consuming when {@link #start()} is invoked. It needs to be closed by invoking {@link #stop()} to
* release the resources. A stopped instance cannot be started again.
@@ -65,9 +68,10 @@
* <p>
* If the message processing fails because either {@link #createMessage(KafkaConsumerRecord)} or the message handler
* throws an unexpected exception, the Kafka consumer will be closed and the exception will be passed to the close
* handler. <b>The message handler is expected to handle processing errors internally and should not deliberately throw
* exceptions.</b> Any exception in the message processing will stop the consumption permanently, because a new consumer
* will try to consume the same message again and will then get the same exception.
* handler. <b>The message handler may throw a{@link ServerErrorException} to indicate a transient error.</b> In this
* case the consumer will not be closed, instead the current offsets are committed and the failed message will be polled
* again with the next batch of records. Any other exceptions in the message processing will stop the consumption
* permanently, because a new consumer will try to consume the same message again and will then get the same exception.
* <p>
* If {@link KafkaConsumer#commit(Handler)} times out, the commit will be retried once. If the retry fails or the commit
* fails with another exception, the Kafka consumer will be closed and the close handler will be passed a
@@ -94,8 +98,8 @@ public abstract class AbstractAtLeastOnceKafkaConsumer<T> implements Lifecycle {
*
* @param kafkaConsumer The Kafka consumer to be exclusively used by this instance to consume records.
* @param topic The Kafka topic to consume records from.
* @param messageHandler The handler to be invoked for each message created from a record. The handler should not
* throw exceptions.
* @param messageHandler The handler to be invoked for each message created from a record. The handler may throw a
* {@link ServerErrorException} to indicate a transient error but should not throw any other exceptions.
* @param closeHandler The handler to be invoked when the Kafka consumer has been closed due to an error.
* @param pollTimeout The maximal number of milliseconds to wait for messages during a poll operation.
* @throws NullPointerException if any of the parameters is {@code null}.
@@ -116,8 +120,8 @@ public AbstractAtLeastOnceKafkaConsumer(final KafkaConsumer<String, Buffer> kafk
*
* @param kafkaConsumer The Kafka consumer to be exclusively used by this instance to consume records.
* @param topics The Kafka topics to consume records from.
* @param messageHandler The handler to be invoked for each message created from a record. The handler should not
* throw exceptions.
* @param messageHandler The handler to be invoked for each message created from a record. The handler may throw a
* {@link ServerErrorException} to indicate a transient error but should not throw any other exceptions.
* @param closeHandler The handler to be invoked when the Kafka consumer has been closed due to an error.
* @param pollTimeout The maximal number of milliseconds to wait for messages during a poll operation.
* @throws NullPointerException if any of the parameters is {@code null}.
@@ -138,8 +142,8 @@ public AbstractAtLeastOnceKafkaConsumer(final KafkaConsumer<String, Buffer> kafk
*
* @param kafkaConsumer The Kafka consumer to be exclusively used by this instance to consume records.
* @param topicPattern The pattern of Kafka topic names to consume records from.
* @param messageHandler The handler to be invoked for each message created from a record. The handler should not
* throw exceptions.
* @param messageHandler The handler to be invoked for each message created from a record. The handler may throw a
* {@link ServerErrorException} to indicate a transient error but should not throw any other exceptions.
* @param closeHandler The handler to be invoked when the Kafka consumer has been closed due to an error.
* @param pollTimeout The maximal number of milliseconds to wait for messages during a poll operation.
* @throws NullPointerException if any of the parameters is {@code null}.
@@ -244,12 +248,17 @@ private void handleBatch(final KafkaConsumerRecords<String, Buffer> records) {

final KafkaConsumerRecord<String, Buffer> record = records.recordAt(i);
final T message = createMessage(record);
messageHandler.handle(message);
addToCurrentOffsets(record);
try {
messageHandler.handle(message);
addToCurrentOffsets(record);
} catch (final ServerErrorException serverErrorException) {
LOG.debug("Message handler failed", serverErrorException);
// will commit the offset of the failed record and then poll again
}
}
commit(true).compose(ok -> poll()).onSuccess(this::handleBatch); // this is "the loop"
commit(true).compose(ok -> poll()).onSuccess(this::handleBatch);
} catch (final Exception ex) {
LOG.error("Error handling record, closing the consumer: ", ex);
LOG.error("Consumer failed, closing", ex);
tryCommitAndClose().onComplete(v -> closeHandler.handle(ex));
}
}
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.net.HttpURLConnection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@@ -29,6 +30,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.eclipse.hono.client.ServerErrorException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -218,6 +220,55 @@ public void testThatConsumedMessagesAreProcessedInOrder(final VertxTestContext c
}));
}

/**
* Verifies that when the message handler throws the expected {@link org.eclipse.hono.client.ServerErrorException},
* the offsets are committed and the consumer polls again starting from the failed record without closing the
* consumer.
*
* @param ctx The vert.x test context.
*/
@Test
public void testExpectedServerErrorExceptionInMessageProcessing(final VertxTestContext ctx) {
final int recordsPerBatch = 20;
final int failingMessageOffset = recordsPerBatch - 1;

final AtomicInteger messageHandlerInvocations = new AtomicInteger();

final Handler<JsonObject> messageHandler = msg -> {

if (messageHandlerInvocations.getAndIncrement() == failingMessageOffset) {
// WHEN the message handler throws a ServerErrorException
throw new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE);
}

ctx.verify(() -> {
// THEN the offset of the failed message is committed...
if (messageHandlerInvocations.get() >= recordsPerBatch) {
assertThat(getCommittedOffset(topicPartition)).isEqualTo(failingMessageOffset);
}
});

// ...AND the consumer keeps consuming
if (messageHandlerInvocations.get() == (recordsPerBatch * 2) - 1) {
ctx.completeNow();
}
};

final Handler<Throwable> closeHandler = cause -> ctx
.failNow(new RuntimeException("Consumer closed unexpectedly", cause)); // must not happen

// GIVEN a started consumer that polls two batches
new TestConsumer(vertxKafkaConsumer, TOPIC, messageHandler, closeHandler).start()
.onComplete(ctx.succeeding(v -> {

mockConsumer.updateBeginningOffsets(Map.of(topicPartition, ((long) 0))); // define start offset
mockConsumer.rebalance(Collections.singletonList(topicPartition)); // trigger partition assignment
scheduleBatch(0, recordsPerBatch);
scheduleBatch(failingMessageOffset, recordsPerBatch); // start from failed message

}));
}

private long getCommittedOffset(final TopicPartition topicPartition) {
final long committedOffset;

0 comments on commit f441bd2

Please sign in to comment.