diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 2983639c76aa8..1e3e2f3303817 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -1308,7 +1308,7 @@ public boolean checkAndUnblockIfStuck() { return false; } // consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read - if (totalAvailablePermits > 0 && !havePendingReplayRead && !havePendingRead + if (isAtleastOneConsumerAvailable() && !havePendingReplayRead && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) { log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name); readMoreEntries(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index d236b5b1db062..e6a9b9c910e41 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -612,9 +612,9 @@ public boolean checkAndUnblockIfStuck() { if (consumer == null || cursor.checkAndUpdateReadPositionChanged()) { return false; } - int totalAvailablePermits = consumer.getAvailablePermits(); + boolean isConsumerAvailable = !consumer.isBlocked() && consumer.getAvailablePermits() > 0; // consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read - if (totalAvailablePermits > 0 && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) { + if (isConsumerAvailable && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) { log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name); readMoreEntries(consumer); return true; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 243a5ccadb369..6bbd598b454b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -18,7 +18,8 @@ */ package org.apache.pulsar.broker.service.persistent; -import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -35,8 +36,6 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; import java.io.ByteArrayOutputStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; @@ -74,6 +73,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -102,6 +102,7 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; + @Slf4j @Test(groups = "broker") public class PersistentTopicTest extends BrokerTestBase { @@ -216,6 +217,13 @@ public void testUnblockStuckSubscription() throws Exception { assertNotNull(msg); msg = consumer2.receive(5, TimeUnit.SECONDS); assertNotNull(msg); + + org.apache.pulsar.broker.service.Consumer sharedConsumer = sharedDispatcher.getConsumers().get(0); + Field blockField = org.apache.pulsar.broker.service.Consumer.class.getDeclaredField("blockedConsumerOnUnackedMsgs"); + blockField.setAccessible(true); + blockField.set(sharedConsumer, true); + producer.newMessage().value("test").eventTime(5).send(); + assertFalse(sharedSub.checkAndUnblockIfStuck()); } @Test