Skip to content

Commit

Permalink
[improve][ml] Use lock-free queue in InflightReadsLimiter since there…
Browse files Browse the repository at this point in the history
…'s no concurrent access (apache#23962)

(cherry picked from commit 38a41e0)
(cherry picked from commit c237cb5)
  • Loading branch information
guan46 authored and nikhil-ctds committed Feb 27, 2025
1 parent 1705e6b commit 96dd136
Showing 1 changed file with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Gauge;
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.jctools.queues.SpscArrayQueue;

@Slf4j
public class InflightReadsLimiter {
Expand All @@ -37,6 +37,7 @@ public class InflightReadsLimiter {
.help("Estimated number of bytes retained by data read from storage or cache")
.register();

private final int maxReadsInFlightAcquireQueueSize;
private static final Gauge PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE = Gauge
.build()
.name("pulsar_ml_reads_available_inflight_bytes")
Expand Down Expand Up @@ -64,9 +65,10 @@ public InflightReadsLimiter(long maxReadsInFlightSize, int maxReadsInFlightAcqui
this.remainingBytes = maxReadsInFlightSize;
this.acquireTimeoutMillis = acquireTimeoutMillis;
this.timeOutExecutor = timeOutExecutor;
this.maxReadsInFlightAcquireQueueSize = maxReadsInFlightAcquireQueueSize;
if (maxReadsInFlightSize > 0) {
enabled = true;
this.queuedHandles = new SpscArrayQueue<>(maxReadsInFlightAcquireQueueSize);
this.queuedHandles = new ArrayDeque<>();
} else {
enabled = false;
this.queuedHandles = null;
Expand Down Expand Up @@ -129,13 +131,14 @@ private synchronized Optional<Handle> internalAcquire(long permits, Consumer<Han
updateMetrics();
return Optional.of(new Handle(maxReadsInFlightSize, handle.creationTime, true));
} else {
if (queuedHandles.offer(new QueuedHandle(handle, callback))) {
scheduleTimeOutCheck(acquireTimeoutMillis);
return Optional.empty();
} else {
if (queuedHandles.size() >= maxReadsInFlightAcquireQueueSize) {
log.warn("Failed to queue handle for acquiring permits: {}, creationTime: {}, remainingBytes:{}",
permits, handle.creationTime, remainingBytes);
return Optional.of(new Handle(0, handle.creationTime, false));
} else {
queuedHandles.offer(new QueuedHandle(handle, callback));
scheduleTimeOutCheck(acquireTimeoutMillis);
return Optional.empty();
}
}
}
Expand Down

0 comments on commit 96dd136

Please sign in to comment.