From 3c62bdc2d31986f472793dd00a87549c8b09852b Mon Sep 17 00:00:00 2001 From: account guanyue <690010051@qq.com> Date: Wed, 12 Feb 2025 00:05:33 +0800 Subject: [PATCH 1/5] Use lock-free queues in InflightReadsLimiter.java to improve performance --- .../bookkeeper/mledger/impl/cache/InflightReadsLimiter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java index 1f4d2c267975c..035d3f9831659 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java @@ -22,6 +22,7 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.metrics.ObservableLongCounter; import io.prometheus.client.Gauge; +import java.util.ArrayDeque; import java.util.Optional; import java.util.Queue; import java.util.concurrent.ScheduledExecutorService; @@ -31,7 +32,6 @@ import org.apache.pulsar.opentelemetry.Constants; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.InflightReadLimiterUtilization; import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric; -import org.jctools.queues.SpscArrayQueue; @Slf4j public class InflightReadsLimiter implements AutoCloseable { @@ -84,7 +84,7 @@ public InflightReadsLimiter(long maxReadsInFlightSize, int maxReadsInFlightAcqui this.timeOutExecutor = timeOutExecutor; if (maxReadsInFlightSize > 0) { enabled = true; - this.queuedHandles = new SpscArrayQueue<>(maxReadsInFlightAcquireQueueSize); + this.queuedHandles = new ArrayDeque<>(maxReadsInFlightAcquireQueueSize); } else { enabled = false; this.queuedHandles = null; From 39e0bd28ae43d46628735ec75a936019efaec892 Mon Sep 17 00:00:00 2001 From: account guanyue <690010051@qq.com> Date: Sat, 15 Feb 2025 21:36:34 +0800 Subject: [PATCH 2/5] Use lock-free queues in InflightReadsLimiter.java to improve performance --- .../mledger/impl/cache/InflightReadsLimiter.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java index 035d3f9831659..af75cb2b23aef 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java @@ -32,6 +32,7 @@ import org.apache.pulsar.opentelemetry.Constants; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.InflightReadLimiterUtilization; import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric; +import org.jctools.queues.SpscArrayQueue; @Slf4j public class InflightReadsLimiter implements AutoCloseable { @@ -51,6 +52,7 @@ public class InflightReadsLimiter implements AutoCloseable { public static final String INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME = "pulsar.broker.managed_ledger.inflight.read.usage"; private final ObservableLongCounter inflightReadsUsageCounter; + private final int maxReadsInFlightAcquireQueueSize; @PulsarDeprecatedMetric(newMetricName = INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME) @Deprecated @@ -82,6 +84,7 @@ public InflightReadsLimiter(long maxReadsInFlightSize, int maxReadsInFlightAcqui this.remainingBytes = maxReadsInFlightSize; this.acquireTimeoutMillis = acquireTimeoutMillis; this.timeOutExecutor = timeOutExecutor; + this.maxReadsInFlightAcquireQueueSize = maxReadsInFlightAcquireQueueSize; if (maxReadsInFlightSize > 0) { enabled = true; this.queuedHandles = new ArrayDeque<>(maxReadsInFlightAcquireQueueSize); @@ -174,13 +177,14 @@ private synchronized Optional internalAcquire(long permits, Consumer= maxReadsInFlightAcquireQueueSize) { log.warn("Failed to queue handle for acquiring permits: {}, creationTime: {}, remainingBytes:{}", permits, handle.creationTime, remainingBytes); return Optional.of(new Handle(0, handle.creationTime, false)); + }else { + queuedHandles.offer(new QueuedHandle(handle, callback)); + scheduleTimeOutCheck(acquireTimeoutMillis); + return Optional.empty(); } } } From 19c433155e7d40bf6001b464e5d4b2ceb7a5092c Mon Sep 17 00:00:00 2001 From: account guanyue <690010051@qq.com> Date: Sat, 15 Feb 2025 21:37:20 +0800 Subject: [PATCH 3/5] Use lock-free queues in InflightReadsLimiter.java to improve performance --- .../bookkeeper/mledger/impl/cache/InflightReadsLimiter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java index af75cb2b23aef..0c0850d978e6a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java @@ -32,7 +32,6 @@ import org.apache.pulsar.opentelemetry.Constants; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.InflightReadLimiterUtilization; import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric; -import org.jctools.queues.SpscArrayQueue; @Slf4j public class InflightReadsLimiter implements AutoCloseable { From 3aa80462ef749d887a21b79b3407eb2aaaffa46b Mon Sep 17 00:00:00 2001 From: account guanyue <690010051@qq.com> Date: Sat, 15 Feb 2025 21:43:34 +0800 Subject: [PATCH 4/5] Use lock-free queues in InflightReadsLimiter.java to improve performance --- .../bookkeeper/mledger/impl/cache/InflightReadsLimiter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java index 0c0850d978e6a..6b43bb4b64b46 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java @@ -180,7 +180,7 @@ private synchronized Optional internalAcquire(long permits, Consumer Date: Thu, 20 Feb 2025 12:24:17 +0800 Subject: [PATCH 5/5] Use lock-free queues in InflightReadsLimiter.java to improve performance --- .../bookkeeper/mledger/impl/cache/InflightReadsLimiter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java index 6b43bb4b64b46..1d5e0a3568fae 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java @@ -86,7 +86,7 @@ public InflightReadsLimiter(long maxReadsInFlightSize, int maxReadsInFlightAcqui this.maxReadsInFlightAcquireQueueSize = maxReadsInFlightAcquireQueueSize; if (maxReadsInFlightSize > 0) { enabled = true; - this.queuedHandles = new ArrayDeque<>(maxReadsInFlightAcquireQueueSize); + this.queuedHandles = new ArrayDeque<>(); } else { enabled = false; this.queuedHandles = null;