diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 362fc66e4d867..922d85df709cb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -128,6 +128,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException; import org.apache.pulsar.broker.service.TopicEventsListener.EventStage; import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSystemTopic; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; @@ -1325,7 +1326,11 @@ private CompletableFuture> createNonPersistentTopic(String topic final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); NonPersistentTopic nonPersistentTopic; try { - nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class); + if (isSystemTopic(topic)) { + nonPersistentTopic = new NonPersistentSystemTopic(topic, this); + } else { + nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class); + } nonPersistentTopic.setCreateFuture(topicFuture); } catch (Throwable e) { log.warn("Failed to create topic {}", topic, e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSystemTopic.java new file mode 100644 index 0000000000000..9b867c9a8b3b6 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSystemTopic.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.nonpersistent; + +import org.apache.pulsar.broker.service.BrokerService; + +public class NonPersistentSystemTopic extends NonPersistentTopic { + public NonPersistentSystemTopic(String topic, BrokerService brokerService) { + super(topic, brokerService); + } + + @Override + public boolean isSystemTopic() { + return true; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 0909944f349d8..0417b6fc14412 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -78,6 +78,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import lombok.Cleanup; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.reflect.FieldUtils; @@ -101,6 +102,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter; import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder; +import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory; import org.apache.pulsar.broker.loadbalance.extensions.store.TableViewLoadDataStoreImpl; import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; import org.apache.pulsar.broker.lookup.LookupResult; @@ -110,6 +112,7 @@ import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSystemTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -130,6 +133,7 @@ import org.apache.pulsar.common.policies.data.BrokerAssignment; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -137,6 +141,7 @@ import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.AssertJUnit; import org.testng.annotations.DataProvider; import org.testng.annotations.Factory; @@ -2183,6 +2188,43 @@ public void compactionScheduleTest() { }); } + @Test + public void testSystemNonPersistentTopicSchemaCompatibility() throws Exception { + String topicName = ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC; + NonPersistentSystemTopic topic = new NonPersistentSystemTopic(topicName, pulsar.getBrokerService()); + Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy()); + + var brokerLoadDataStore = LoadDataStoreFactory.create(pulsar, topicName, BrokerLoadDataV1.class); + brokerLoadDataStore.init(); + brokerLoadDataStore.pushAsync("key", new BrokerLoadDataV1()).get(); + Awaitility.await().until(() -> { + var data = brokerLoadDataStore.get("key"); + return data.isPresent(); + }); + brokerLoadDataStore.pushAsync("key", null).get(); + brokerLoadDataStore.close(); + } + + @Data + private static class BrokerLoadDataV1 { + private ResourceUsage cpu; + private ResourceUsage memory; + private ResourceUsage directMemory; + private ResourceUsage bandwidthIn; + private ResourceUsage bandwidthOut; + private double msgThroughputIn; + private double msgThroughputOut; + private double msgRateIn; + private double msgRateOut; + private int bundleCount; + private int topics; + private double maxResourceUsage; + private double weightedMaxEMA; + private double msgThroughputEMA; + private long updatedAt; + private long reportedAt; + } + @Test(timeOut = 30 * 1000) public void testMonitorBrokerRegistry() throws MetadataStoreException { primaryLoadManager.getBrokerRegistry().unregister();