Skip to content

Commit

Permalink
[improve][broker] PIP-299-part-2: add config dispatcherPauseOnAckStat…
Browse files Browse the repository at this point in the history
…ePersistentEnabled (#21370)

The part 2 of PIP-299: add config dispatcherPauseOnAckStatePersistentEnabled
  • Loading branch information
poorbarcode authored Jan 2, 2024
1 parent 529e1ab commit c0b89eb
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 3 deletions.
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1822,6 +1822,10 @@ subscriptionKeySharedEnable=true
# Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore
managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1

# After enabling this feature, Pulsar will stop delivery messages to clients if the cursor metadata is too large to
# persist, it will help to reduce the duplicates caused by the ack state that can not be fully persistent.
dispatcherPauseOnAckStatePersistentEnabled=false

# If enabled, the maximum "acknowledgment holes" will not be limited and "acknowledgment holes" are stored in
# multiple entries.
persistentUnackedRangesWithMultipleEntriesEnabled=false
Expand Down
4 changes: 4 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1237,6 +1237,10 @@ configurationStoreServers=
# Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore
managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1

# After enabling this feature, Pulsar will stop delivery messages to clients if the cursor metadata is too large to
# persist, it will help to reduce the duplicates caused by the ack state that can not be fully persistent.
dispatcherPauseOnAckStatePersistentEnabled=false

# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2096,6 +2096,13 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)"
)
private boolean managedLedgerUnackedRangesOpenCacheSetEnabled = true;
@FieldContext(
dynamic = true,
category = CATEGORY_STORAGE_ML,
doc = "After enabling this feature, Pulsar will stop delivery messages to clients if the cursor metadata is"
+ " too large to persist, it will help to reduce the duplicates caused by the ack state that can not be"
+ " fully persistent. Default false.")
private boolean dispatcherPauseOnAckStatePersistentEnabled = false;
@FieldContext(
dynamic = true,
category = CATEGORY_STORAGE_ML,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
*/
package org.apache.pulsar.broker.service;

import org.apache.pulsar.PulsarStandaloneStarter;
import org.testng.annotations.Test;

import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNull;
import org.apache.pulsar.PulsarStandaloneStarter;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class StandaloneTest {
Expand Down Expand Up @@ -54,12 +55,13 @@ public void testWithoutMetadataStoreUrlInConfFile() throws Exception {
}

@Test
public void testAdvertised() throws Exception {
public void testInitialize() throws Exception {
String[] args = new String[]{"--config",
"./src/test/resources/configurations/pulsar_broker_test_standalone.conf"};
PulsarStandaloneStarter standalone = new TestPulsarStandaloneStarter(args);
assertNull(standalone.getConfig().getAdvertisedAddress());
assertEquals(standalone.getConfig().getAdvertisedListeners(),
"internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651");
assertEquals(standalone.getConfig().isDispatcherPauseOnAckStatePersistentEnabled(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public void testInit() throws Exception {
assertEquals(config.getManagedLedgerDataReadPriority(), "bookkeeper-first");
assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05);
assertEquals(config.getHttpMaxRequestHeaderSize(), 1234);
assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), true);
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties());
assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first");
}
Expand Down Expand Up @@ -289,6 +290,7 @@ public void testTransactionBatchConfigurations() throws Exception{
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 512);
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 1024 * 1024 * 4);
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(), 1);
assertEquals(configuration.isDispatcherPauseOnAckStatePersistentEnabled(), false);
}
// pulsar_broker_test.conf.
try (InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(fileName)) {
Expand All @@ -301,6 +303,7 @@ public void testTransactionBatchConfigurations() throws Exception{
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 44);
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 55);
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(), 66);
assertEquals(configuration.isDispatcherPauseOnAckStatePersistentEnabled(), true);
}
// string input stream.
StringBuilder stringBuilder = new StringBuilder();
Expand All @@ -312,6 +315,7 @@ public void testTransactionBatchConfigurations() throws Exception{
stringBuilder.append("transactionPendingAckBatchedWriteMaxRecords=521").append(System.lineSeparator());
stringBuilder.append("transactionPendingAckBatchedWriteMaxSize=1025").append(System.lineSeparator());
stringBuilder.append("transactionPendingAckBatchedWriteMaxDelayInMillis=20").append(System.lineSeparator());
stringBuilder.append("dispatcherPauseOnAckStatePersistentEnabled=true").append(System.lineSeparator());
try(ByteArrayInputStream inputStream =
new ByteArrayInputStream(stringBuilder.toString().getBytes(StandardCharsets.UTF_8))){
configuration = PulsarConfigurationLoader.create(inputStream, ServiceConfiguration.class);
Expand All @@ -323,6 +327,7 @@ public void testTransactionBatchConfigurations() throws Exception{
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 521);
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 1025);
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(), 20);
assertEquals(configuration.isDispatcherPauseOnAckStatePersistentEnabled(), true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up
supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
maxMessagePublishBufferSizeInMB=-1
dispatcherPauseOnAckStatePersistentEnabled=true

### --- Transaction config variables --- ###
transactionLogBatchedWriteEnabled=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,4 @@ brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up
supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
maxMessagePublishBufferSizeInMB=-1
dispatcherPauseOnAckStatePersistentEnabled=true

0 comments on commit c0b89eb

Please sign in to comment.