Skip to content

Commit

Permalink
[fix][broker] fix broker identifying incorrect stuck topic (#24006)
Browse files Browse the repository at this point in the history
(cherry picked from commit 28f7845)
  • Loading branch information
rdhabalia authored and lhotari committed Feb 21, 2025
1 parent f0c8faf commit fb21183
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1308,7 +1308,7 @@ public boolean checkAndUnblockIfStuck() {
return false;
}
// consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read
if (totalAvailablePermits > 0 && !havePendingReplayRead && !havePendingRead
if (isAtleastOneConsumerAvailable() && !havePendingReplayRead && !havePendingRead
&& cursor.getNumberOfEntriesInBacklog(false) > 0) {
log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name);
readMoreEntries();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,9 +612,9 @@ public boolean checkAndUnblockIfStuck() {
if (consumer == null || cursor.checkAndUpdateReadPositionChanged()) {
return false;
}
int totalAvailablePermits = consumer.getAvailablePermits();
boolean isConsumerAvailable = !consumer.isBlocked() && consumer.getAvailablePermits() > 0;
// consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read
if (totalAvailablePermits > 0 && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) {
if (isConsumerAvailable && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) {
log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name);
readMoreEntries(consumer);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
Expand All @@ -35,8 +36,6 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -74,6 +73,7 @@
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -102,6 +102,7 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;


@Slf4j
@Test(groups = "broker")
public class PersistentTopicTest extends BrokerTestBase {
Expand Down Expand Up @@ -216,6 +217,13 @@ public void testUnblockStuckSubscription() throws Exception {
assertNotNull(msg);
msg = consumer2.receive(5, TimeUnit.SECONDS);
assertNotNull(msg);

org.apache.pulsar.broker.service.Consumer sharedConsumer = sharedDispatcher.getConsumers().get(0);
Field blockField = org.apache.pulsar.broker.service.Consumer.class.getDeclaredField("blockedConsumerOnUnackedMsgs");
blockField.setAccessible(true);
blockField.set(sharedConsumer, true);
producer.newMessage().value("test").eventTime(5).send();
assertFalse(sharedSub.checkAndUnblockIfStuck());
}

@Test
Expand Down

0 comments on commit fb21183

Please sign in to comment.