diff --git a/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumer.java b/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumer.java
index 9893e4d3bf..5565e4bc7f 100644
--- a/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumer.java
+++ b/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumer.java
@@ -1,5 +1,5 @@
/*******************************************************************************
- * Copyright (c) 2021 Contributors to the Eclipse Foundation
+ * Copyright (c) 2021, 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
@@ -63,9 +63,10 @@
* completed gets committed. This prevents committing the offsets of incompletely handled records and thereby enables at-least-once semantics.
*
* In terms of when offsets are committed, the behaviour is similar to the one used for a consumer with
- * enable.auto.commit. Commits are done periodically (using commitAsync) and when a rebalance
- * happens or the consumer is stopped (using commitSync). The periodic commit interval is defined via
- * the standard auto.commit.interval.ms configuration property.
+ * enable.auto.commit. Commits are done periodically (using commitAsync), and they are also done
+ * (using commitSync) when the consumer is stopped or a rebalance happens (with eager rebalancing
+ * or a non-empty revoked partitions list).
+ * The periodic commit interval is defined via the standard auto.commit.interval.ms configuration property.
*
* In order to not fall behind with the position of the committed offset vs. the last received offset, users of this
* class have to make sure that the record handling function, which provides the completion Future, is completed in time.
@@ -90,7 +91,7 @@
* still done) but record fetching from all assigned topic partitions is suspended until the throttling threshold is
* reached again.
* The overall limit, i.e. the maximum number of incomplete record handler result futures at a given point in time, is
- * calculated from the above mentioned throttling threshold plus the maximum number of records per poll operation.
+ * calculated from the above-mentioned throttling threshold plus the maximum number of records per poll operation.
*
* @param <V> The type of record payload this consumer can process.
*/
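To make the stated calculation concrete, a tiny sketch with made-up numbers (the actual threshold value used by Hono is not given here):

// Sketch: overall limit of incomplete record handler result futures.
class ThrottlingLimitSketch {

    public static void main(final String[] args) {
        final int throttlingThreshold = 1000; // hypothetical threshold value
        final int maxRecordsPerPoll = 500;    // Kafka's max.poll.records
        // the maximum number of incomplete result futures at a given point in time
        System.out.println(throttlingThreshold + maxRecordsPerPoll); // 1500
    }
}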
@@ -653,7 +654,7 @@ public OffsetsQueueEntry addOffset(final long offset) {
* the 'skipOffsetRecommitPeriodSeconds'. Note that for the actual commit, {@code 1} has to be added to the
* returned value.
*
- * Otherwise an empty Optional is returned.
+ * Otherwise, an empty Optional is returned.
*
* @return The offset wrapped in an Optional or an empty Optional if no offset commit is needed.
*/
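The commit behaviour summarized in the class javadoc can be pictured with a plain kafka-clients consumer; the following is a minimal sketch under that assumption, with illustrative names, not Hono's actual implementation:

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch only: commitAsync periodically, commitSync on revocation.
class PeriodicCommitSketch {

    private final Consumer<String, byte[]> kafkaConsumer;
    // offsets of records whose handling has been completed
    private final Map<TopicPartition, OffsetAndMetadata> completedOffsets = new HashMap<>();
    private final long commitIntervalMillis; // would be taken from auto.commit.interval.ms
    private long lastCommitTime = System.currentTimeMillis();

    PeriodicCommitSketch(final Consumer<String, byte[]> kafkaConsumer, final long commitIntervalMillis) {
        this.kafkaConsumer = kafkaConsumer;
        this.commitIntervalMillis = commitIntervalMillis;
    }

    ConsumerRebalanceListener rebalanceListener() {
        return new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
                // synchronous commit: offsets must be safely stored before the handover
                if (!partitions.isEmpty()) {
                    kafkaConsumer.commitSync(completedOffsets);
                }
            }

            @Override
            public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
                // nothing to commit on assignment
            }
        };
    }

    // to be invoked on the polling thread, e.g. once per poll loop iteration
    void maybeCommitPeriodically() {
        final long now = System.currentTimeMillis();
        if (now - lastCommitTime >= commitIntervalMillis) {
            kafkaConsumer.commitAsync(completedOffsets, null); // asynchronous, non-blocking
            lastCommitTime = now;
        }
    }
}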
diff --git a/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/HonoKafkaConsumer.java b/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/HonoKafkaConsumer.java
index 8bf59d0747..87ff9e50b7 100644
--- a/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/HonoKafkaConsumer.java
+++ b/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/HonoKafkaConsumer.java
@@ -1,5 +1,5 @@
/*******************************************************************************
- * Copyright (c) 2021 Contributors to the Eclipse Foundation
+ * Copyright (c) 2021, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
@@ -720,7 +720,8 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
        final Set<TopicPartition> partitionsSet = Helper.from(partitions);
if (LOG.isDebugEnabled()) {
- LOG.debug("partitions assigned: [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(partitions));
+ LOG.debug("partitions assigned: [{}] [client-id: {}]",
+ HonoKafkaConsumerHelper.getPartitionsDebugString(partitions), getClientId());
}
ensurePositionsHaveBeenSetIfNeeded(partitionsSet);
updateSubscribedTopicPatternTopicsAndRemoveMetrics();
@@ -744,7 +745,8 @@ public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
        final Set<TopicPartition> partitionsSet = Helper.from(partitions);
if (LOG.isDebugEnabled()) {
- LOG.debug("partitions revoked: [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(partitions));
+ LOG.debug("partitions revoked: [{}] [client-id: {}]",
+ HonoKafkaConsumerHelper.getPartitionsDebugString(partitions), getClientId());
}
onPartitionsRevokedBlocking(partitionsSet);
context.runOnContext(v -> HonoKafkaConsumer.this.onPartitionsRevoked(partitionsSet));
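The changed log statements keep the LOG.isDebugEnabled() guard so that the partitions debug string is only built when DEBUG is active; a small sketch of the pattern, with partitionsDebugString() as a hypothetical stand-in for HonoKafkaConsumerHelper.getPartitionsDebugString():

import java.util.Collection;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only; names are illustrative, not Hono's.
class RebalanceLoggingSketch {

    private static final Logger LOG = LoggerFactory.getLogger(RebalanceLoggingSketch.class);

    // hypothetical stand-in for HonoKafkaConsumerHelper.getPartitionsDebugString()
    static String partitionsDebugString(final Collection<TopicPartition> partitions) {
        return partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "));
    }

    void onPartitionsAssigned(final Collection<TopicPartition> partitions, final String clientId) {
        // the guard avoids building the joined partitions string when DEBUG is disabled
        if (LOG.isDebugEnabled()) {
            LOG.debug("partitions assigned: [{}] [client-id: {}]",
                    partitionsDebugString(partitions), clientId);
        }
    }
}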
diff --git a/clients/kafka-common/src/test/java/org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumerTest.java b/clients/kafka-common/src/test/java/org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumerTest.java
index cb1d6bcf71..4538c415ac 100644
--- a/clients/kafka-common/src/test/java/org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumerTest.java
+++ b/clients/kafka-common/src/test/java/org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumerTest.java
@@ -331,15 +331,18 @@ public void testConsumerCommitsOffsetsOnRebalance(final VertxTestContext ctx) th
consumer = new AsyncHandlingAutoCommitKafkaConsumer<>(vertx, Set.of(TOPIC), handler, consumerConfig);
consumer.setKafkaConsumerSupplier(() -> mockConsumer);
consumer.addOnKafkaConsumerReadyHandler(readyTracker);
- consumer.start()
- .compose(ok -> readyTracker.future())
- .onComplete(ctx.succeeding(v2 -> {
- mockConsumer.schedulePollTask(() -> {
- IntStream.range(0, numTestRecords).forEach(offset -> {
- mockConsumer.addRecord(new ConsumerRecord<>(TOPIC, PARTITION, offset, "key_" + offset, Buffer.buffer()));
- });
- });
- }));
+ final Context consumerVertxContext = vertx.getOrCreateContext();
+ consumerVertxContext.runOnContext(v -> {
+ consumer.start()
+ .compose(ok -> readyTracker.future())
+ .onComplete(ctx.succeeding(v2 -> {
+ mockConsumer.schedulePollTask(() -> {
+ IntStream.range(0, numTestRecords).forEach(offset -> {
+ mockConsumer.addRecord(new ConsumerRecord<>(TOPIC, PARTITION, offset, "key_" + offset, Buffer.buffer()));
+ });
+ });
+ }));
+ });
assertWithMessage("records received in 5s")
.that(receivedRecordsCtx.awaitCompletion(5, TimeUnit.SECONDS))
.isTrue();
@@ -356,66 +359,23 @@ public void testConsumerCommitsOffsetsOnRebalance(final VertxTestContext ctx) th
final AtomicInteger latestFullyHandledOffset = new AtomicInteger(1);
recordsHandlingPromiseMap.get(4L).complete();
- // define VertxTestContexts for 3 checks (3x rebalance/commit)
- final var checkIndex = new AtomicInteger(0);
- final var commitCheckContexts = IntStream.range(0, 3)
- .mapToObj(i -> new VertxTestContext()).toList();
- final var commitCheckpoints = commitCheckContexts.stream()
- .map(c -> c.laxCheckpoint(1)).toList();
- final InterruptableSupplier<Boolean> waitForCurrentCommitCheckResult = () -> {
- final var checkContext = commitCheckContexts.get(checkIndex.get());
- assertWithMessage("partition assigned in 5s for checking of commits")
- .that(checkContext.awaitCompletion(5, TimeUnit.SECONDS))
- .isTrue();
- if (checkContext.failed()) {
- ctx.failNow(checkContext.causeOfFailure());
- return false;
- }
- return true;
- };
+ final Checkpoint commitCheckDone = ctx.checkpoint(1);
consumer.setOnPartitionsAssignedHandler(partitions -> {
- LOG.debug("onPartitionsAssignedHandler invoked [check index: {}, newly assigned partitions: {}]",
- checkIndex.get(), partitions.stream().map(t -> t.toString()).collect(Collectors.joining(", ")));
- final var committedPartitions = mockConsumer.committed(Set.of(TOPIC_PARTITION));
- final var offsetAndMetadata = committedPartitions.get(TOPIC_PARTITION);
- LOG.debug("committed partition [name: {}, offset: {}, expected offset: {}]",
- TOPIC_PARTITION, offsetAndMetadata.offset(), latestFullyHandledOffset.get() + 1L);
- ctx.verify(() -> {
- assertThat(offsetAndMetadata).isNotNull();
-// assertThat(offsetAndMetadata.offset()).isEqualTo(latestFullyHandledOffset.get() + 1L);
- });
- if (offsetAndMetadata.offset() == latestFullyHandledOffset.get() + 1L) {
- commitCheckpoints.get(checkIndex.get()).flag();
+ if (!partitions.isEmpty()) {
+ final var committedPartitions = mockConsumer.committed(Set.of(TOPIC_PARTITION));
+ final var offsetAndMetadata = committedPartitions.get(TOPIC_PARTITION);
+ ctx.verify(() -> {
+ assertThat(offsetAndMetadata).isNotNull();
+ assertThat(offsetAndMetadata.offset()).isEqualTo(latestFullyHandledOffset.get() + 1L);
+ });
+ commitCheckDone.flag();
}
});
// now force a rebalance which should trigger the above onPartitionsAssignedHandler
- LOG.debug("force rebalance 1");
- mockConsumer.rebalance(List.of(TOPIC_PARTITION));
- if (!waitForCurrentCommitCheckResult.get()) {
- return;
- }
- checkIndex.incrementAndGet();
-
- // now another rebalance (ie. commit trigger) - no change in offsets
- LOG.debug("force rebalance 2");
- mockConsumer.rebalance(List.of(TOPIC_PARTITION));
- if (!waitForCurrentCommitCheckResult.get()) {
- return;
- }
- checkIndex.incrementAndGet();
-
- // now complete some more promises
- recordsHandlingPromiseMap.get(2L).complete();
- recordsHandlingPromiseMap.get(3L).complete();
- // offset 4 already complete
- latestFullyHandledOffset.set(4);
- // again rebalance/commit
- LOG.debug("force rebalance 3");
+ LOG.debug("force rebalance");
+ mockConsumer.rebalance(List.of());
mockConsumer.rebalance(List.of(TOPIC_PARTITION));
- if (waitForCurrentCommitCheckResult.get()) {
- ctx.completeNow();
- }
}
/**
@@ -448,6 +408,8 @@ public void testConsumerCommitsOffsetsOnRebalanceAfterWaitingForRecordCompletion
mockConsumer.updateBeginningOffsets(Map.of(TOPIC_PARTITION, 0L));
mockConsumer.updateEndOffsets(Map.of(TOPIC_PARTITION, 0L));
+ mockConsumer.updateBeginningOffsets(Map.of(TOPIC2_PARTITION, 0L));
+ mockConsumer.updateEndOffsets(Map.of(TOPIC2_PARTITION, 0L));
mockConsumer.updatePartitions(TOPIC_PARTITION, KafkaMockConsumer.DEFAULT_NODE);
mockConsumer.setRebalancePartitionAssignmentAfterSubscribe(List.of(TOPIC_PARTITION));
final AtomicReference<Handler<Void>> onNextPartitionsRevokedBlockingHandlerRef = new AtomicReference<>();
@@ -483,38 +445,59 @@ protected void onPartitionsRevokedBlocking(
ctx.failNow(receivedRecordsCtx.causeOfFailure());
return;
}
- // records received, complete the handling of all except the first 2 records
+ LOG.debug("all records received, complete the handling of all except the first 2 records");
LongStream.range(2, numTestRecords).forEach(offset -> recordsHandlingPromiseMap.get(offset).complete());
- ctx.verify(() -> assertThat(recordsHandlingPromiseMap.get(1L).future().isComplete()).isFalse());
+ ctx.verify(() -> {
+ LongStream.range(0, 2).forEach(offset -> {
+ assertThat(recordsHandlingPromiseMap.get(offset).future().isComplete()).isFalse();
+ });
+ });
+ final Checkpoint commitCheckDone = ctx.checkpoint(1);
- // partitions revoked handler shall get called after the blocking partitions-revoked handling has waited for the records to be marked as completed
consumer.setOnPartitionsRevokedHandler(s -> {
- ctx.verify(() -> assertThat(recordsHandlingPromiseMap.get(1L).future().isComplete()).isTrue());
+ // (3) this partitions revoked handler is called after the blocking partitions-revoked handling (2)
+ // has waited for the records to be marked as completed and after the offsets were committed
+ // (we can't check for committed offsets of the just revoked partition here because
+ // mockConsumer.committed() only returns offsets of assigned partitions)
+ ctx.verify(() -> {
+ LongStream.range(0, 2).forEach(offset -> {
+ assertThat(recordsHandlingPromiseMap.get(offset).future().isComplete()).isTrue();
+ });
+ });
});
- final Checkpoint commitCheckDone = ctx.laxCheckpoint(1);
consumer.setOnPartitionsAssignedHandler(partitions -> {
- final var committed = mockConsumer.committed(Set.of(TOPIC_PARTITION));
- final var offsetAndMetadata = committed.get(TOPIC_PARTITION);
+ if (partitions.isEmpty()) {
+ // (4) ignore if invoked when all partitions got revoked (1); only the subsequent invocation with assigned partitions is relevant
+ return;
+ }
+ // (5) ensure all offsets were committed
+ final var committedOffsets = mockConsumer.committed(Set.of(TOPIC_PARTITION));
+ LOG.debug("committed partition offsets: {}", committedOffsets);
+ final var offsetAndMetadata = committedOffsets.get(TOPIC_PARTITION);
ctx.verify(() -> {
assertThat(offsetAndMetadata).isNotNull();
+ assertThat(offsetAndMetadata.offset()).isEqualTo(numTestRecords);
});
- if (offsetAndMetadata.offset() == numTestRecords) {
- commitCheckDone.flag();
- }
+ commitCheckDone.flag();
});
- // trigger a rebalance where the currently assigned partition is revoked
- // (and then assigned again - otherwise its offset wouldn't be returned by mockConsumer.committed())
- // the remaining 2 records are to be marked as completed with some delay
+
onNextPartitionsRevokedBlockingHandlerRef.set(v -> {
+ // (2) handler to complete the remaining record handling promises (on the Kafka polling thread; invoked before the OnPartitionsRevokedHandler)
consumerVertxContext.runOnContext(v2 -> {
+ LOG.debug("complete remaining record handling promises");
recordsHandlingPromiseMap.get(0L).complete();
recordsHandlingPromiseMap.get(1L).complete();
});
+ // trigger another rebalance; this time the partition is assigned again;
+ // this means we can then (see (5)) check the committed offsets (only available from MockConsumer for currently assigned partitions)
+ mockConsumer.setNextPollRebalancePartitionAssignment(List.of(TOPIC_PARTITION, TOPIC2_PARTITION));
});
- mockConsumer.setRevokeAllOnRebalance(true);
- mockConsumer.updateBeginningOffsets(Map.of(TOPIC2_PARTITION, 0L));
- mockConsumer.updateEndOffsets(Map.of(TOPIC2_PARTITION, 0L));
- mockConsumer.setNextPollRebalancePartitionAssignment(List.of(TOPIC_PARTITION, TOPIC2_PARTITION));
+ // (1) Trigger a rebalance where the currently assigned partition is revoked (via mockConsumer.setNextPollRebalancePartitionAssignment(List.of())).
+ // Since the records at offsets 0 and 1 are not yet completely handled here, the revocation logic will
+ // wait some time (up to 300ms by default) for the handling to be marked as completed before committing the offsets of completed records.
+ // The wait time will not be long here, since the blocking partitions-revoked handler above (2) will complete
+ // the remaining 2 record-handling promises shortly after the partition revocation (via consumerVertxContext.runOnContext()).
+ mockConsumer.setNextPollRebalancePartitionAssignment(List.of());
}
/**
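The hunks above and below replace laxCheckpoint with the strict checkpoint variant; a sketch of the semantic difference, assuming vertx-junit5:

import io.vertx.junit5.Checkpoint;
import io.vertx.junit5.VertxTestContext;

// Sketch: strict vs. lax checkpoint semantics in vertx-junit5.
class CheckpointSketch {

    public static void main(final String[] args) {
        final VertxTestContext ctx = new VertxTestContext();
        final Checkpoint strict = ctx.checkpoint(1); // flagging more than once fails the context
        final Checkpoint lax = ctx.laxCheckpoint(1); // surplus flags are tolerated
        strict.flag(); // completes the strict checkpoint
        lax.flag();
        lax.flag();    // allowed for a lax checkpoint only
        System.out.println(ctx.completed());
    }
}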
@@ -674,7 +657,7 @@ public void testConsumerCommitsInitialOffset(final VertxTestContext ctx) throws
mockConsumer.setRebalancePartitionAssignmentAfterSubscribe(List.of(TOPIC_PARTITION));
final VertxTestContext consumerStartedCtx = new VertxTestContext();
- final Checkpoint consumerStartedCheckpoint = consumerStartedCtx.laxCheckpoint(2);
+ final Checkpoint consumerStartedCheckpoint = consumerStartedCtx.checkpoint(2);
consumer = new AsyncHandlingAutoCommitKafkaConsumer<>(vertx, Set.of(TOPIC), handler, consumerConfig);
consumer.setKafkaConsumerSupplier(() -> mockConsumer);
consumer.setOnRebalanceDoneHandler(s -> {
@@ -816,7 +799,7 @@ public void testScenarioWithPartitionRevokedWhileHandlingIncomplete(final VertxT
recordsHandlingPromiseMap.get(1L).complete();
recordsHandlingPromiseMap.get(2L).complete();
- final Checkpoint commitCheckDone = ctx.laxCheckpoint(1);
+ final Checkpoint commitCheckDone = ctx.checkpoint(1);
consumer.setOnPartitionsAssignedHandler(partitions -> {
LOG.info("rebalancing ...");
final Map<TopicPartition, OffsetAndMetadata> committed = mockConsumer.committed(Set.of(TOPIC_PARTITION));
@@ -895,13 +878,15 @@ protected void onRecordHandlerSkippedForExpiredRecord(final KafkaConsumerRecord<
final Checkpoint commitCheckpoint = commitCheckContext.checkpoint(1);
consumer.setOnPartitionsAssignedHandler(partitions -> {
- final Map<TopicPartition, OffsetAndMetadata> committed = mockConsumer.committed(Set.of(TOPIC_PARTITION));
- ctx.verify(() -> {
- final OffsetAndMetadata offsetAndMetadata = committed.get(TOPIC_PARTITION);
- assertThat(offsetAndMetadata).isNotNull();
- assertThat(offsetAndMetadata.offset()).isEqualTo(latestFullyHandledOffset + 1L);
- });
- commitCheckpoint.flag();
+ if (!partitions.isEmpty()) {
+ final Map<TopicPartition, OffsetAndMetadata> committed = mockConsumer.committed(Set.of(TOPIC_PARTITION));
+ ctx.verify(() -> {
+ final OffsetAndMetadata offsetAndMetadata = committed.get(TOPIC_PARTITION);
+ assertThat(offsetAndMetadata).isNotNull();
+ assertThat(offsetAndMetadata.offset()).isEqualTo(latestFullyHandledOffset + 1L);
+ });
+ commitCheckpoint.flag();
+ }
});
// now force a rebalance which should trigger the above onPartitionsAssignedHandler
// (rebalance is done as part of the poll() invocation; the vert.x consumer will schedule that invocation
@@ -910,6 +895,7 @@ protected void onRecordHandlerSkippedForExpiredRecord(final KafkaConsumerRecord<
final CountDownLatch latch = new CountDownLatch(1);
consumerVertxContext.runOnContext(v -> latch.countDown());
latch.await();
+ mockConsumer.rebalance(List.of());
mockConsumer.rebalance(List.of(TOPIC_PARTITION));
assertWithMessage("partition assigned in 5s for checking of commits")
.that(commitCheckContext.awaitCompletion(5, TimeUnit.SECONDS))
@@ -941,18 +927,4 @@ private ConsumerRecord<String, Buffer> createRecordWithElapsedTtl() {
new RecordHeaders(new Header[] { ttl, creationTime }),
Optional.empty());
}
-
- /**
- * Supplier whose get() method might throw an {@link InterruptedException}.
- * @param <T> The type of results supplied by this supplier.
- */
- @FunctionalInterface
- interface InterruptableSupplier<T> {
- /**
- * Gets a result.
- * @return The result.
- * @throws InterruptedException If getting the result was interrupted.
- */
- T get() throws InterruptedException;
- }
}
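For readers unfamiliar with Kafka's MockConsumer, the rewritten tests rely on the following basic mechanics; a self-contained sketch with made-up topic names, independent of the Hono test fixtures:

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

// Sketch only: driving a poll/rebalance cycle with Kafka's MockConsumer.
class MockConsumerDrivingSketch {

    public static void main(final String[] args) {
        final TopicPartition tp = new TopicPartition("test.topic", 0);
        final MockConsumer<String, String> mockConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);

        // offsets have to be registered before records can be added
        mockConsumer.updateBeginningOffsets(Map.of(tp, 0L));
        mockConsumer.updateEndOffsets(Map.of(tp, 0L));

        mockConsumer.subscribe(List.of("test.topic"));
        // simulate a rebalance that assigns the partition; an empty list revokes everything
        mockConsumer.rebalance(List.of(tp));

        // the task runs on the next poll() of the consumer under test; only then is the record delivered
        mockConsumer.schedulePollTask(() -> mockConsumer.addRecord(
                new ConsumerRecord<>("test.topic", 0, 0L, "key_0", "value_0")));
    }
}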
diff --git a/test-utils/kafka-test-utils/src/main/java/org/eclipse/hono/kafka/test/KafkaMockConsumer.java b/test-utils/kafka-test-utils/src/main/java/org/eclipse/hono/kafka/test/KafkaMockConsumer.java
index be450dc876..db50c32150 100644
--- a/test-utils/kafka-test-utils/src/main/java/org/eclipse/hono/kafka/test/KafkaMockConsumer.java
+++ b/test-utils/kafka-test-utils/src/main/java/org/eclipse/hono/kafka/test/KafkaMockConsumer.java
@@ -1,5 +1,5 @@
/*******************************************************************************
- * Copyright (c) 2021 Contributors to the Eclipse Foundation
+ * Copyright (c) 2021, 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
@@ -21,7 +21,6 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -51,7 +50,6 @@ public class KafkaMockConsumer<K, V> extends MockConsumer<K, V> {
private final AtomicBoolean skipSettingClosedFlagOnNextClose = new AtomicBoolean();
private final List<Handler<Map<TopicPartition, OffsetAndMetadata>>> commitListeners = new ArrayList<>();
- private boolean revokeAllOnRebalance = true;
private Collection<TopicPartition> nextPollRebalancePartitionAssignment;
private Collection<TopicPartition> onSubscribeRebalancePartitionAssignment;
private ConsumerRebalanceListener rebalanceListener;
@@ -81,18 +79,6 @@ private static PartitionInfo getPartitionInfo(final String topic, final int part
return new PartitionInfo(topic, partition, node, replicas, replicas);
}
- /**
- * Sets whether the onPartitionsRevoked method shall be invoked with all currently assigned partitions when
- * a rebalance is triggered.
- * If set to {@code false}, only the partitions will be revoked that are not in the list of newly assigned
- * partitions.
- *
- * @param revokeAllOnRebalance {@code true} if all assigned partitions shall be revoked on a rebalance.
- */
- public void setRevokeAllOnRebalance(final boolean revokeAllOnRebalance) {
- this.revokeAllOnRebalance = revokeAllOnRebalance;
- }
-
/**
* Marks the following subscribe() invocations to be followed by a rebalance with the given partition
* assignment, if the given assignment collection isn't {@code null}. The rebalance will be invoked
@@ -145,19 +131,6 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
return super.poll(timeout);
}
- @Override
- public synchronized void rebalance(final Collection<TopicPartition> newAssignment) {
- Optional.ofNullable(rebalanceListener)
- .ifPresent(listener -> {
- listener.onPartitionsRevoked(assignment().stream()
- .filter(tp -> revokeAllOnRebalance || !newAssignment.contains(tp))
- .collect(Collectors.toList()));
- });
- super.rebalance(newAssignment);
- Optional.ofNullable(rebalanceListener)
- .ifPresent(listener -> listener.onPartitionsAssigned(newAssignment));
- }
-
/**
* Skips setting the "closed" flag on the next close() invocation.
*