Skip to content

Commit

Permalink
[improve][broker] Fix non-persistent system topic schema compatibility (
Browse files Browse the repository at this point in the history
#23286)

### Motivation

When upgrading broker version from `3.0.x` to `3.3.x` with `ExtensibleLoadManagerImpl` enabled, it will have an `Unable to read schema` exception. And the broker will fail to start. This issue is caused by #22055 .

### Modifications

Add a new class `NonPersistentSystemTopic`, and it will use for system non-persistent topic.

(cherry picked from commit 7dbd8a5)
  • Loading branch information
Demogorgon314 authored and lhotari committed Feb 20, 2025
1 parent 0c2649d commit 5daea49
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
import org.apache.pulsar.broker.service.TopicEventsListener.EventStage;
import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSystemTopic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
Expand Down Expand Up @@ -1325,7 +1326,11 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
NonPersistentTopic nonPersistentTopic;
try {
nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class);
if (isSystemTopic(topic)) {
nonPersistentTopic = new NonPersistentSystemTopic(topic, this);
} else {
nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class);
}
nonPersistentTopic.setCreateFuture(topicFuture);
} catch (Throwable e) {
log.warn("Failed to create topic {}", topic, e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.nonpersistent;

import org.apache.pulsar.broker.service.BrokerService;

public class NonPersistentSystemTopic extends NonPersistentTopic {
public NonPersistentSystemTopic(String topic, BrokerService brokerService) {
super(topic, brokerService);
}

@Override
public boolean isSystemTopic() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.reflect.FieldUtils;
Expand All @@ -101,6 +102,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
import org.apache.pulsar.broker.loadbalance.extensions.store.TableViewLoadDataStoreImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
Expand All @@ -110,6 +112,7 @@
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSystemTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
Expand All @@ -130,13 +133,15 @@
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
Expand Down Expand Up @@ -2183,6 +2188,43 @@ public void compactionScheduleTest() {
});
}

@Test
public void testSystemNonPersistentTopicSchemaCompatibility() throws Exception {
String topicName = ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC;
NonPersistentSystemTopic topic = new NonPersistentSystemTopic(topicName, pulsar.getBrokerService());
Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy());

var brokerLoadDataStore = LoadDataStoreFactory.create(pulsar, topicName, BrokerLoadDataV1.class);
brokerLoadDataStore.init();
brokerLoadDataStore.pushAsync("key", new BrokerLoadDataV1()).get();
Awaitility.await().until(() -> {
var data = brokerLoadDataStore.get("key");
return data.isPresent();
});
brokerLoadDataStore.pushAsync("key", null).get();
brokerLoadDataStore.close();
}

@Data
private static class BrokerLoadDataV1 {
private ResourceUsage cpu;
private ResourceUsage memory;
private ResourceUsage directMemory;
private ResourceUsage bandwidthIn;
private ResourceUsage bandwidthOut;
private double msgThroughputIn;
private double msgThroughputOut;
private double msgRateIn;
private double msgRateOut;
private int bundleCount;
private int topics;
private double maxResourceUsage;
private double weightedMaxEMA;
private double msgThroughputEMA;
private long updatedAt;
private long reportedAt;
}

@Test(timeOut = 30 * 1000)
public void testMonitorBrokerRegistry() throws MetadataStoreException {
primaryLoadManager.getBrokerRegistry().unregister();
Expand Down

0 comments on commit 5daea49

Please sign in to comment.