Skip to content

Commit

Permalink
[improve][broker] Remove enableReplicatedSubscriptions config
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Feb 20, 2025
1 parent 5a59ab7 commit e4b9369
Show file tree
Hide file tree
Showing 14 changed files with 5 additions and 140 deletions.
3 changes: 0 additions & 3 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -661,9 +661,6 @@ delayedDeliveryMaxDelayInMillis=0
# Whether to enable acknowledge of batch local index.
acknowledgmentAtBatchIndexLevelEnabled=false

# Enable tracking of replicated subscriptions state across clusters.
enableReplicatedSubscriptions=true

# Frequency of snapshots for replicated subscriptions tracking.
replicatedSubscriptionsSnapshotFrequencyMillis=1000

Expand Down
3 changes: 0 additions & 3 deletions deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,6 @@ delayedDeliveryTickTimeMillis=1000
# Whether to enable acknowledge of batch local index.
acknowledgmentAtBatchIndexLevelEnabled=false

# Enable tracking of replicated subscriptions state across clusters.
enableReplicatedSubscriptions=true

# Frequency of snapshots for replicated subscriptions tracking.
replicatedSubscriptionsSnapshotFrequencyMillis=1000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1426,11 +1426,6 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
maxValue = Integer.MAX_VALUE - Commands.MESSAGE_SIZE_FRAME_PADDING)
private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable tracking of replicated subscriptions state across clusters.")
private boolean enableReplicatedSubscriptions = true;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Frequency of snapshots for replicated subscriptions tracking.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ protected CompletableFuture<Void> prepareCreateProducer() {
@Override
protected boolean replicateEntries(List<Entry> entries) {
boolean atLeastOneMessageSentForReplication = false;
boolean isEnableReplicatedSubscriptions =
brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();

boolean isEnableReplicatedSubscriptions = topic.getReplicatedSubscriptionController().isPresent();
try {
// This flag is set to true when we skip at least one local message,
// in order to skip remaining local messages.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public boolean isReplicated() {
public boolean setReplicated(boolean replicated) {
replicatedControlled = replicated;

if (!replicated || !config.isEnableReplicatedSubscriptions()) {
if (!replicated) {
this.replicatedSubscriptionSnapshotCache = null;
} else if (this.replicatedSubscriptionSnapshotCache == null) {
this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName,
Expand All @@ -215,12 +215,7 @@ public boolean setReplicated(boolean replicated) {

if (this.cursor != null) {
if (replicated) {
if (!config.isEnableReplicatedSubscriptions()) {
log.warn("[{}][{}] Failed set replicated subscription status to {}, please enable the "
+ "configuration enableReplicatedSubscriptions", topicName, subName, replicated);
} else {
return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
}
return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
} else {
return this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -925,12 +925,6 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St

return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
Boolean replicatedSubscriptionState = replicatedSubscriptionStateArg;
if (replicatedSubscriptionState != null && replicatedSubscriptionState
&& !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) {
log.warn("[{}] Replicated Subscription is disabled by broker.", getName());
replicatedSubscriptionState = false;
}

if (subType == SubType.Key_Shared
&& !brokerService.pulsar().getConfiguration().isSubscriptionKeySharedEnable()) {
return FutureUtil.failedFuture(
Expand Down Expand Up @@ -4066,16 +4060,13 @@ public synchronized void checkReplicatedSubscriptionControllerState() {

private synchronized void checkReplicatedSubscriptionControllerState(boolean shouldBeEnabled) {
boolean isCurrentlyEnabled = replicatedSubscriptionsController.isPresent();
boolean isEnableReplicatedSubscriptions =
brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
boolean replicationEnabled = this.topicPolicies.getReplicationClusters().get().size() > 1;

if (shouldBeEnabled && !isCurrentlyEnabled && isEnableReplicatedSubscriptions && replicationEnabled) {
if (shouldBeEnabled && !isCurrentlyEnabled && replicationEnabled) {
log.info("[{}] Enabling replicated subscriptions controller", topic);
replicatedSubscriptionsController = Optional.of(new ReplicatedSubscriptionsController(this,
brokerService.pulsar().getConfiguration().getClusterName()));
} else if (isCurrentlyEnabled && (!shouldBeEnabled || !isEnableReplicatedSubscriptions
|| !replicationEnabled)) {
} else if (isCurrentlyEnabled && (!shouldBeEnabled || !replicationEnabled)) {
log.info("[{}] Disabled replicated subscriptions controller", topic);
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
replicatedSubscriptionsController = Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ private void setConfigDefaults(ServiceConfiguration config, String clusterName,
config.setBacklogQuotaCheckIntervalInSeconds(5);
config.setDefaultNumberOfNamespaceBundles(1);
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
config.setEnableReplicatedSubscriptions(true);
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
config.setTlsTrustCertsFilePath(caCertPath);
config.setTlsCertificateFilePath(brokerCertPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
config.setBacklogQuotaCheckIntervalInSeconds(5);
config.setDefaultNumberOfNamespaceBundles(1);
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
config.setEnableReplicatedSubscriptions(true);
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
config.setLoadBalancerSheddingEnabled(false);
config.setTlsTrustCertsFilePath(caCertPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
config.setBacklogQuotaCheckIntervalInSeconds(5);
config.setDefaultNumberOfNamespaceBundles(1);
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
config.setEnableReplicatedSubscriptions(true);
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
config.setLoadBalancerSheddingEnabled(false);
config.setForceDeleteNamespaceAllowed(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
config.setBacklogQuotaCheckIntervalInSeconds(5);
config.setDefaultNumberOfNamespaceBundles(1);
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
config.setEnableReplicatedSubscriptions(true);
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
config.setLoadBalancerSheddingEnabled(false);
config.setForceDeleteNamespaceAllowed(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ public void setConfig2DefaultValue() {

public void setConfig4DefaultValue() {
setConfigDefaults(config4, cluster4, bkEnsemble4);
config4.setEnableReplicatedSubscriptions(false);
}

private void setConfigDefaults(ServiceConfiguration config, String clusterName,
Expand Down Expand Up @@ -373,7 +372,6 @@ private void setConfigDefaults(ServiceConfiguration config, String clusterName,
config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config.setDefaultNumberOfNamespaceBundles(1);
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
config.setEnableReplicatedSubscriptions(true);
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
config.setLoadManagerClassName(getLoadManagerClassName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder p

@Test
public void createReplicatedSubscription() throws Exception {
this.conf.setEnableReplicatedSubscriptions(true);
String topic = BrokerTestUtil.newUniqueName("createReplicatedSubscription");

@Cleanup
Expand All @@ -77,7 +76,6 @@ public void createReplicatedSubscription() throws Exception {

@Test
public void upgradeToReplicatedSubscription() throws Exception {
this.conf.setEnableReplicatedSubscriptions(true);
String topic = BrokerTestUtil.newUniqueName("upgradeToReplicatedSubscription");

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
Expand All @@ -103,7 +101,6 @@ public void upgradeToReplicatedSubscription() throws Exception {

@Test
public void upgradeToReplicatedSubscriptionAfterRestart() throws Exception {
this.conf.setEnableReplicatedSubscriptions(true);
String topic = BrokerTestUtil.newUniqueName("upgradeToReplicatedSubscriptionAfterRestart");

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
Expand All @@ -128,19 +125,4 @@ public void upgradeToReplicatedSubscriptionAfterRestart() throws Exception {
assertTrue(stats.getSubscriptions().get("sub").isReplicated());
consumer.close();
}

@Test
public void testDisableReplicatedSubscriptions() throws Exception {
this.conf.setEnableReplicatedSubscriptions(false);
String topic = BrokerTestUtil.newUniqueName("disableReplicatedSubscriptions");
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub")
.replicateSubscriptionState(true)
.subscribe();

TopicStats stats = admin.topics().getStats(topic);
assertFalse(stats.getSubscriptions().get("sub").isReplicated());
consumer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1582,7 +1582,6 @@ public void testTBRecoverChangeStateError() throws InterruptedException, Timeout
ScheduledExecutorService executorServiceRecover = mock(ScheduledExecutorService.class);
// Mock serviceConfiguration.
ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
when(serviceConfiguration.isEnableReplicatedSubscriptions()).thenReturn(false);
when(serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn(true);
// Mock executorProvider.
ExecutorProvider executorProvider = mock(ExecutorProvider.class);
Expand Down

This file was deleted.

0 comments on commit e4b9369

Please sign in to comment.