diff --git a/conf/broker.conf b/conf/broker.conf index 82dd5640740c0..e4494be0666b5 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1822,6 +1822,10 @@ subscriptionKeySharedEnable=true # Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1 +# After enabling this feature, Pulsar will stop delivery messages to clients if the cursor metadata is too large to +# persist, it will help to reduce the duplicates caused by the ack state that can not be fully persistent. +dispatcherPauseOnAckStatePersistentEnabled=false + # If enabled, the maximum "acknowledgment holes" will not be limited and "acknowledgment holes" are stored in # multiple entries. persistentUnackedRangesWithMultipleEntriesEnabled=false diff --git a/conf/standalone.conf b/conf/standalone.conf index cf13f12c8fe6f..a916a2f477e8f 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -1237,6 +1237,10 @@ configurationStoreServers= # Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1 +# After enabling this feature, Pulsar will stop delivery messages to clients if the cursor metadata is too large to +# persist, it will help to reduce the duplicates caused by the ack state that can not be fully persistent. +dispatcherPauseOnAckStatePersistentEnabled=false + # Whether to enable the delayed delivery for messages. # If disabled, messages will be immediately delivered and there will # be no tracking overhead. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 4f2d56fc07ea7..cfb66a7df78f4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2096,6 +2096,13 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)" ) private boolean managedLedgerUnackedRangesOpenCacheSetEnabled = true; + @FieldContext( + dynamic = true, + category = CATEGORY_STORAGE_ML, + doc = "After enabling this feature, Pulsar will stop delivery messages to clients if the cursor metadata is" + + " too large to persist, it will help to reduce the duplicates caused by the ack state that can not be" + + " fully persistent. Default false.") + private boolean dispatcherPauseOnAckStatePersistentEnabled = false; @FieldContext( dynamic = true, category = CATEGORY_STORAGE_ML, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java index 5100826530cd1..b99f8d5338f60 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java @@ -18,11 +18,12 @@ */ package org.apache.pulsar.broker.service; +import org.apache.pulsar.PulsarStandaloneStarter; +import org.testng.annotations.Test; + import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertNull; -import org.apache.pulsar.PulsarStandaloneStarter; -import org.testng.annotations.Test; @Test(groups = "broker") public class StandaloneTest { @@ -54,12 +55,13 @@ public void testWithoutMetadataStoreUrlInConfFile() throws Exception { } @Test - public void testAdvertised() throws Exception { + public void testInitialize() throws Exception { String[] args = new String[]{"--config", "./src/test/resources/configurations/pulsar_broker_test_standalone.conf"}; PulsarStandaloneStarter standalone = new TestPulsarStandaloneStarter(args); assertNull(standalone.getConfig().getAdvertisedAddress()); assertEquals(standalone.getConfig().getAdvertisedListeners(), "internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651"); + assertEquals(standalone.getConfig().isDispatcherPauseOnAckStatePersistentEnabled(), true); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index 55971c15adf68..ebeaffc48e4b9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -73,6 +73,7 @@ public void testInit() throws Exception { assertEquals(config.getManagedLedgerDataReadPriority(), "bookkeeper-first"); assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05); assertEquals(config.getHttpMaxRequestHeaderSize(), 1234); + assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), true); OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties()); assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first"); } @@ -289,6 +290,7 @@ public void testTransactionBatchConfigurations() throws Exception{ assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 512); assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 1024 * 1024 * 4); assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(), 1); + assertEquals(configuration.isDispatcherPauseOnAckStatePersistentEnabled(), false); } // pulsar_broker_test.conf. try (InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(fileName)) { @@ -301,6 +303,7 @@ public void testTransactionBatchConfigurations() throws Exception{ assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 44); assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 55); assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(), 66); + assertEquals(configuration.isDispatcherPauseOnAckStatePersistentEnabled(), true); } // string input stream. StringBuilder stringBuilder = new StringBuilder(); @@ -312,6 +315,7 @@ public void testTransactionBatchConfigurations() throws Exception{ stringBuilder.append("transactionPendingAckBatchedWriteMaxRecords=521").append(System.lineSeparator()); stringBuilder.append("transactionPendingAckBatchedWriteMaxSize=1025").append(System.lineSeparator()); stringBuilder.append("transactionPendingAckBatchedWriteMaxDelayInMillis=20").append(System.lineSeparator()); + stringBuilder.append("dispatcherPauseOnAckStatePersistentEnabled=true").append(System.lineSeparator()); try(ByteArrayInputStream inputStream = new ByteArrayInputStream(stringBuilder.toString().getBytes(StandardCharsets.UTF_8))){ configuration = PulsarConfigurationLoader.create(inputStream, ServiceConfiguration.class); @@ -323,6 +327,7 @@ public void testTransactionBatchConfigurations() throws Exception{ assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 521); assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 1025); assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(), 20); + assertEquals(configuration.isDispatcherPauseOnAckStatePersistentEnabled(), true); } } diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index bfbbfb7487c42..f2316111f8017 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -93,6 +93,7 @@ brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up supportedNamespaceBundleSplitAlgorithms=[range_equally_divide] defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide maxMessagePublishBufferSizeInMB=-1 +dispatcherPauseOnAckStatePersistentEnabled=true ### --- Transaction config variables --- ### transactionLogBatchedWriteEnabled=true diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf index c733409fc0043..4a40d9f0c6565 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf @@ -94,3 +94,4 @@ brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up supportedNamespaceBundleSplitAlgorithms=[range_equally_divide] defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide maxMessagePublishBufferSizeInMB=-1 +dispatcherPauseOnAckStatePersistentEnabled=true