From 6dd94a3f4cdad5011fdfd801e59c44e3c9e0739e Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Mon, 19 Feb 2024 21:04:01 +0800 Subject: [PATCH] [branch-2.10][improve][broker] Do not retain the data in the system topic (#22031) ### Motivation In some use cases, users need to keep all messages even after they have been acknowledged by all subscriptions, so they set the retention policy of the namespace to infinite retention (setting both the time and size limits to `-1`). However, the data in system topics does not need infinite retention. ### Modifications For system topics, do not retain messages that have already been acknowledged. --- ...i-integration-backwards-compatibility.yaml | 14 +++++- .../ci-integration-pulsar-io-ora.yaml | 13 ++++- .../workflows/ci-integration-pulsar-io.yaml | 13 ++++- .../server/src/assemble/LICENSE.bin.txt | 3 +- .../apache/bookkeeper/test/ZooKeeperUtil.java | 4 ++ .../pulsar/broker/service/BrokerService.java | 16 +++++-- .../broker/service/OneWayReplicatorTest.java | 1 - .../pulsar/broker/service/ReplicatorTest.java | 1 + .../pendingack/PendingAckPersistentTest.java | 4 +- .../compaction/CompactionRetentionTest.java | 48 +++++++++++++++++++ .../common/events/EventsTopicNames.java | 7 +++ .../pulsar/common/naming/TopicName.java | 1 + pulsar-sql/presto-distribution/LICENSE | 2 +- 13 files changed, 112 insertions(+), 15 deletions(-) diff --git a/.github/workflows/ci-integration-backwards-compatibility.yaml b/.github/workflows/ci-integration-backwards-compatibility.yaml index 2a5eca9d9e1ad..525aed662509b 100644 --- a/.github/workflows/ci-integration-backwards-compatibility.yaml +++ b/.github/workflows/ci-integration-backwards-compatibility.yaml @@ -77,9 +77,19 @@ jobs: - name: clean disk if: ${{ steps.check_changes.outputs.docs_only != 'true' }} run: | - sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc - sudo apt clean + echo "::group::Available diskspace" + time df -BM 
/ /mnt + echo "::endgroup::" + sudo swapoff -a + sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc /opt/hostedtoolcache/CodeQL + echo "::group::Cleaning apt state" + time sudo bash -c "apt-get clean; apt-get autoclean; apt-get -y --purge autoremove" + time df -BM / /mnt + echo "::endgroup::" docker rmi $(docker images -q) -f + echo "::group::Available diskspace" + time df -BM / /mnt + echo "::endgroup::" df -h - name: run install by skip tests diff --git a/.github/workflows/ci-integration-pulsar-io-ora.yaml b/.github/workflows/ci-integration-pulsar-io-ora.yaml index 41fd95355c394..22d8ba320ee37 100644 --- a/.github/workflows/ci-integration-pulsar-io-ora.yaml +++ b/.github/workflows/ci-integration-pulsar-io-ora.yaml @@ -77,10 +77,19 @@ jobs: - name: clean disk if: ${{ steps.check_changes.outputs.docs_only != 'true' }} run: | + echo "::group::Available diskspace" + time df -BM / /mnt + echo "::endgroup::" sudo swapoff -a - sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc - sudo apt clean + sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc /opt/hostedtoolcache/CodeQL + echo "::group::Cleaning apt state" + time sudo bash -c "apt-get clean; apt-get autoclean; apt-get -y --purge autoremove" + time df -BM / /mnt + echo "::endgroup::" docker rmi $(docker images -q) -f + echo "::group::Available diskspace" + time df -BM / /mnt + echo "::endgroup::" df -h - name: run install by skip tests diff --git a/.github/workflows/ci-integration-pulsar-io.yaml b/.github/workflows/ci-integration-pulsar-io.yaml index e08cc261e7928..e7a590057c218 100644 --- a/.github/workflows/ci-integration-pulsar-io.yaml +++ b/.github/workflows/ci-integration-pulsar-io.yaml @@ -77,10 +77,19 @@ jobs: - name: clean disk if: ${{ steps.check_changes.outputs.docs_only != 'true' }} run: | + echo "::group::Available diskspace" + time df -BM / /mnt + echo "::endgroup::" sudo swapoff -a - sudo rm -rf /swapfile /usr/share/dotnet 
/usr/local/lib/android /opt/ghc - sudo apt clean + sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc /opt/hostedtoolcache/CodeQL + echo "::group::Cleaning apt state" + time sudo bash -c "apt-get clean; apt-get autoclean; apt-get -y --purge autoremove" + time df -BM / /mnt + echo "::endgroup::" docker rmi $(docker images -q) -f + echo "::group::Available diskspace" + time df -BM / /mnt + echo "::endgroup::" df -h - name: run install by skip tests diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 2fb5fc09ac2f9..6084942e8c039 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -367,7 +367,6 @@ The Apache Software License, Version 2.0 - io.netty-netty-transport-4.1.100.Final.jar - io.netty-netty-transport-classes-epoll-4.1.100.Final.jar - io.netty-netty-transport-native-epoll-4.1.100.Final-linux-x86_64.jar - - io.netty-netty-transport-native-epoll-4.1.100.Final.jar - io.netty-netty-transport-native-unix-common-4.1.100.Final.jar - io.netty-netty-transport-native-unix-common-4.1.100.Final-linux-x86_64.jar - io.netty-netty-tcnative-boringssl-static-2.0.61.Final.jar @@ -498,7 +497,7 @@ The Apache Software License, Version 2.0 - org.apache.curator-curator-framework-5.1.0.jar - org.apache.curator-curator-recipes-5.1.0.jar * Apache Yetus - - org.apache.yetus-audience-annotations-0.13.0.jar + - org.apache.yetus-audience-annotations-0.12.0.jar * Kubernetes Client - io.kubernetes-client-java-18.0.0.jar - io.kubernetes-client-java-api-18.0.0.jar diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java index 662d20333baca..602497e799037 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java @@ -25,6 +25,7 
@@ import java.io.File; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; @@ -54,6 +55,9 @@ public class ZooKeeperUtil { private String connectString; public ZooKeeperUtil() { + String loopbackIPAddr = InetAddress.getLoopbackAddress().getHostAddress(); + zkaddr = new InetSocketAddress(loopbackIPAddr, 0); + connectString = loopbackIPAddr + ":" + zooKeeperPort; } public ZooKeeper getZooKeeperClient() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index e604721f6b3ef..875162df1c146 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1706,10 +1706,18 @@ private CompletableFuture getManagedLedgerConfig(@Nonnull T } if (retentionPolicies == null) { - retentionPolicies = policies.map(p -> p.retention_policies).orElseGet( - () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(), - serviceConfig.getDefaultRetentionSizeInMB()) - ); + if (EventsTopicNames.checkTopicIsEventsNames(topicName) + || EventsTopicNames.isTransactionInternalName(topicName)) { + if (log.isDebugEnabled()) { + log.debug("{} Disable data retention policy for system topic.", topicName); + } + retentionPolicies = new RetentionPolicies(0, 0); + } else { + retentionPolicies = policies.map(p -> p.retention_policies).orElseGet( + () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(), + serviceConfig.getDefaultRetentionSizeInMB()) + ); + } } ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 
f4101dfdaa3a9..0e9262efadf8c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -23,7 +23,6 @@ import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 85c8eca8fa054..dbeb98aa59060 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -1682,6 +1682,7 @@ public void testReplicatorWithFailedAck() throws Exception { MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get(); Position lastPosition = PositionImpl.get(lastMessageId.getLedgerId(), lastMessageId.getEntryId()); + Awaitility.await().untilAsserted(() -> assertNotNull(topic.getReplicators().get("r2"))); ConcurrentOpenHashMap replicators = topic.getReplicators(); PersistentReplicator replicator = (PersistentReplicator) replicators.get("r2"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java index 2ef8b8f68eee3..1eb04ca1ccf0c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java @@ -460,7 +460,9 @@ public void 
testDeleteUselessLogDataWhenSubCursorMoved() throws Exception { Message message2 = consumer.receive(5, TimeUnit.SECONDS); consumer.acknowledgeAsync(message2.getMessageId(), transaction2).get(); - Assert.assertEquals(pendingAckLogIndex.size(), 0); + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(pendingAckLogIndex.size(), 0); + }); maxIndexLag = (long) field4.get(pendingAckStore); Assert.assertEquals(maxIndexLag, 5); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java index 8696e76150676..bd645d4e979d6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java @@ -37,6 +37,7 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -44,9 +45,13 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.events.EventsTopicNames; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -208,6 +213,49 @@ public void testCompactionRetentionOnTopicCreationWithTopicPolicies() throws Exc ); } + @Test + public void testRetentionPolicesForSystemTopic() 
throws Exception { + String namespace = "my-tenant/my-ns"; + String topicPrefix = "persistent://" + namespace + "/"; + admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, -1)); + // Check event topics and transaction internal topics. + for (String eventTopic : EventsTopicNames.EVENTS_TOPIC_NAMES) { + checkSystemTopicRetentionPolicy(topicPrefix + eventTopic); + } + checkSystemTopicRetentionPolicy(topicPrefix + TopicName.TRANSACTION_COORDINATOR_ASSIGN); + checkSystemTopicRetentionPolicy(topicPrefix + TopicName.TRANSACTION_COORDINATOR_LOG); + checkSystemTopicRetentionPolicy(topicPrefix + TopicName.PENDING_ACK_STORE_SUFFIX); + + // Check common topics. + checkCommonTopicRetentionPolicy(topicPrefix + "my-topic" + System.nanoTime()); + // Specify retention policies for system topic. + pulsar.getConfiguration().setTopicLevelPoliciesEnabled(true); + pulsar.getConfiguration().setSystemTopicEnabled(true); + admin.topics().createNonPartitionedTopic(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT); + admin.topicPolicies().setRetention(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT, + new RetentionPolicies(10, 10)); + Awaitility.await().untilAsserted(() -> { + checkTopicRetentionPolicy(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT, + new RetentionPolicies(10, 10)); + }); + } + + private void checkSystemTopicRetentionPolicy(String topicName) throws Exception { + checkTopicRetentionPolicy(topicName, new RetentionPolicies(0, 0)); + + } + + private void checkCommonTopicRetentionPolicy(String topicName) throws Exception { + checkTopicRetentionPolicy(topicName, new RetentionPolicies(-1, -1)); + } + + private void checkTopicRetentionPolicy(String topicName, RetentionPolicies retentionPolicies) throws Exception { + ManagedLedgerConfig config = pulsar.getBrokerService() + .getManagedLedgerConfig(TopicName.get(topicName)).get(); + Assert.assertEquals(config.getRetentionSizeInMB(), retentionPolicies.getRetentionSizeInMB()); + 
Assert.assertEquals(config.getRetentionTimeMillis(),retentionPolicies.getRetentionTimeInMinutes() * 60000L); + } + private void testCompactionCursorRetention(String topic) throws Exception { Set keys = Sets.newHashSet("a", "b", "c"); Set keysToExpire = Sets.newHashSet("x1", "x2"); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java index 79e950842478d..f6f2c17fd8629 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java @@ -53,6 +53,13 @@ public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName topicNa .startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); } + public static boolean isTransactionInternalName(TopicName topicName) { + String topic = topicName.toString(); + return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()) + || topic.startsWith(TopicName.TRANSACTION_COORDINATOR_LOG.toString()) + || topic.endsWith(TopicName.PENDING_ACK_STORE_SUFFIX); + } + public static boolean isTopicPoliciesSystemTopic(String topic) { if (topic == null) { return false; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 97f99b19f19cc..04d96fd54a110 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -66,6 +66,7 @@ public TopicName load(String name) throws Exception { public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(), NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_"); + public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack"; public static TopicName get(String 
domain, NamespaceName namespaceName, String topic) { String name = domain + "://" + namespaceName.toString() + '/' + topic; return TopicName.get(name); diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE index 0c8c111795942..e003a28c27c24 100644 --- a/pulsar-sql/presto-distribution/LICENSE +++ b/pulsar-sql/presto-distribution/LICENSE @@ -473,7 +473,7 @@ The Apache Software License, Version 2.0 - zookeeper-3.9.1.jar - zookeeper-jute-3.9.1.jar * Apache Yetus Audience Annotations - - audience-annotations-0.13.0.jar + - audience-annotations-0.12.0.jar * Swagger - swagger-annotations-1.6.10.jar * Perfmark