diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java index 1f4d2c267975c..1d5e0a3568fae 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java @@ -22,6 +22,7 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.metrics.ObservableLongCounter; import io.prometheus.client.Gauge; +import java.util.ArrayDeque; import java.util.Optional; import java.util.Queue; import java.util.concurrent.ScheduledExecutorService; @@ -31,7 +32,6 @@ import org.apache.pulsar.opentelemetry.Constants; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.InflightReadLimiterUtilization; import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric; -import org.jctools.queues.SpscArrayQueue; @Slf4j public class InflightReadsLimiter implements AutoCloseable { @@ -51,6 +51,7 @@ public class InflightReadsLimiter implements AutoCloseable { public static final String INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME = "pulsar.broker.managed_ledger.inflight.read.usage"; private final ObservableLongCounter inflightReadsUsageCounter; + private final int maxReadsInFlightAcquireQueueSize; @PulsarDeprecatedMetric(newMetricName = INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME) @Deprecated @@ -82,9 +83,10 @@ public InflightReadsLimiter(long maxReadsInFlightSize, int maxReadsInFlightAcqui this.remainingBytes = maxReadsInFlightSize; this.acquireTimeoutMillis = acquireTimeoutMillis; this.timeOutExecutor = timeOutExecutor; + this.maxReadsInFlightAcquireQueueSize = maxReadsInFlightAcquireQueueSize; if (maxReadsInFlightSize > 0) { enabled = true; - this.queuedHandles = new SpscArrayQueue<>(maxReadsInFlightAcquireQueueSize); + this.queuedHandles = new ArrayDeque<>(); } else { enabled = false; this.queuedHandles = null; @@ -174,13 +176,14 @@ private synchronized Optional internalAcquire(long permits, Consumer= maxReadsInFlightAcquireQueueSize) { log.warn("Failed to queue handle for acquiring permits: {}, creationTime: {}, remainingBytes:{}", permits, handle.creationTime, remainingBytes); return Optional.of(new Handle(0, handle.creationTime, false)); + } else { + queuedHandles.offer(new QueuedHandle(handle, callback)); + scheduleTimeOutCheck(acquireTimeoutMillis); + return Optional.empty(); } } }