From 906e41cadd1c5fa292017290094788e925c026aa Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 13 May 2022 10:40:49 +0200 Subject: [PATCH] Clean up errors sent to the client during Topic Unload (#1282) ### Motivation While investigating #1281 I found a few possible enhancements: - we should add more logging to troubleshoot unknown errors during topic unload - we can improve some logging - in case of CursorAlreadyClosedException we return UNKNOWN_SERVER_ERROR instead of NOT_LEADER_FOR_PARTITION, and this generates a "received an unknown error" log line on the client, which is pretty scary ### Modifications - improve the toString representation of the callback passed to asyncFindPosition - return NOT_LEADER_FOR_PARTITION in case of CursorAlreadyClosedException and ManagedLedgerFencedException - reduce log level of some parts of the code (cherry picked from commit cc49e7cb3123184093e1746e95d421314108e9b7) --- .../kop/KafkaTopicConsumerManager.java | 8 ++++--- .../handlers/kop/MessageFetchContext.java | 15 ++++++++---- .../kop/format/PulsarEntryFormatter.java | 4 ++-- .../kop/utils/MessageMetadataUtils.java | 23 +++++++++++++++++-- .../kop/KafkaTopicConsumerManagerTest.java | 4 +++- 5 files changed, 42 insertions(+), 12 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java index a08c1c3534..954328c59d 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java @@ -229,10 +229,12 @@ private CompletableFuture> asyncGetCursorByOffset(long final ManagedLedger ledger = topic.getManagedLedger(); if (((ManagedLedgerImpl) ledger).getState() == ManagedLedgerImpl.State.Closed) { - log.error("[{}] Async get cursor for offset {} failed, because current managedLedger has been 
closed", - requestHandler.ctx.channel(), offset); + log.error("[{}] Async get cursor for offset {} for topic {} failed, " + + "because current managedLedger has been closed", + requestHandler.ctx.channel(), offset, topic.getName()); CompletableFuture> future = new CompletableFuture<>(); - future.completeExceptionally(new Exception("Current managedLedger has been closed.")); + future.completeExceptionally(new Exception("Current managedLedger for " + + topic.getName() + " has been closed.")); return future; } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java index c36db85757..4b8123ba50 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java @@ -351,8 +351,6 @@ private void handlePartitionData(final TopicPartition topicPartition, } else { cursorFuture.whenComplete((cursorLongPair, ex) -> { if (ex != null) { - log.error("KafkaTopicConsumerManager.asyncGetCursorByOffset({}) failed for topic {}.", - offset, topicPartition, ex.getCause()); registerPrepareMetadataFailedEvent(startPrepareMetadataNanos); requestHandler.getKafkaTopicManagerSharedState() .getKafkaTopicConsumerManagerCache().removeAndCloseByTopic(fullTopicName); @@ -375,7 +373,16 @@ private void handlePartitionData(final TopicPartition topicPartition, if (throwable != null) { tcm.deleteOneCursorAsync(cursorLongPair.getLeft(), "cursor.readEntry fail. 
deleteCursor"); - addErrorPartitionResponse(topicPartition, Errors.forException(throwable)); + if (throwable instanceof ManagedLedgerException.CursorAlreadyClosedException + || throwable + instanceof ManagedLedgerException.ManagedLedgerFencedException) { + addErrorPartitionResponse(topicPartition, + Errors.NOT_LEADER_FOR_PARTITION); + } else { + log.error("Read entry error on {}", partitionData, throwable); + addErrorPartitionResponse(topicPartition, + Errors.forException(throwable)); + } } else if (entries == null) { addErrorPartitionResponse(topicPartition, Errors.forException(new ApiException("Cursor is null"))); @@ -605,7 +612,7 @@ public void markDeleteComplete(Object ctx) { // this is OK, since this is kind of cumulative ack, following commit will come. @Override public void markDeleteFailed(ManagedLedgerException e, Object ctx) { - log.warn("Mark delete success for position: {} with error:", + log.warn("Mark delete failed for position: {} with error:", currentPosition, e); } }, null); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java index 31adff73a0..c3a25beae5 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java @@ -91,8 +91,8 @@ public EncodeResult encode(final EncodeRequest encodeRequest) { sequenceId = Commands.initBatchMessageMetadata(msgMetadata, message.getMessageBuilder()); } currentBatchSizeBytes += message.getDataBuffer().readableBytes(); - if (log.isDebugEnabled()) { - log.debug("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ", + if (log.isTraceEnabled()) { + log.trace("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ", sequenceId, numMessagesInBatch, 
currentBatchSizeBytes); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java index 2ba9ace3ec..bfb2bce0b5 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java @@ -13,10 +13,12 @@ */ package io.streamnative.pulsar.handlers.kop.utils; +import com.google.common.base.Predicate; import io.netty.buffer.ByteBuf; import io.streamnative.pulsar.handlers.kop.exceptions.MetadataCorruptedException; import java.util.concurrent.CompletableFuture; import javax.annotation.Nullable; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; @@ -171,7 +173,18 @@ public static long getMockOffset(long ledgerId, long entryId) { public static CompletableFuture asyncFindPosition(final ManagedLedger managedLedger, final long offset, final boolean skipMessagesWithoutIndex) { - return managedLedger.asyncFindPosition(entry -> { + return managedLedger.asyncFindPosition(new FindEntryByOffset(managedLedger, + offset, skipMessagesWithoutIndex)); + } + + @AllArgsConstructor + private static class FindEntryByOffset implements Predicate { + private final ManagedLedger managedLedger; + private final long offset; + private final boolean skipMessagesWithoutIndex; + + @Override + public boolean apply(Entry entry) { if (entry == null) { // `entry` should not be null, add the null check here to fix the spotbugs check return false; @@ -191,6 +204,12 @@ public static CompletableFuture asyncFindPosition(final ManagedLedger } finally { entry.release(); } - }); + } + + @Override + public String toString() { + return "FindEntryByOffset{ " + offset + "}"; + } } + } diff --git 
a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java index 5dbb8f118d..c2cbaff47d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java @@ -514,7 +514,9 @@ public void testUnloadTopic() throws Exception { topicConsumerManager.removeCursorFuture(totalMessages - 1).get(); fail("should have failed"); } catch (ExecutionException ex) { - assertTrue(ex.getCause().getMessage().contains("Current managedLedger has been closed.")); + log.info("error", ex); + assertTrue(ex.getCause().getMessage().contains("Current managedLedger for " + + fullTopicName + " has been closed.")); } }