Skip to content

Commit

Permalink
Additional metrics.
Browse files Browse the repository at this point in the history
Signed-off-by: Santiago Pericas-Geertsen <[email protected]>
  • Loading branch information
spericas committed Dec 17, 2024
1 parent f144e8b commit 02c9f15
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void invoke(Runnable runnable) throws Exception {

@Override
public Optional<Token> tryAcquire(boolean wait) {
return aimdLimitImpl.tryAcquire();
return aimdLimitImpl.tryAcquire(wait);
}

@SuppressWarnings("removal")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ class AimdLimitImpl {

private final Supplier<Long> clock;
private final AtomicInteger concurrentRequests;
private final AtomicInteger rejectedRequests;
private final AdjustableSemaphore semaphore;
private final LimitHandlers.LimiterHandler handler;

private final AtomicInteger limit;
private final Lock limitLock = new ReentrantLock();
private final int queueLength;

private Timer rttTimer;
private Timer queueWaitTimer;

AimdLimitImpl(AimdLimitConfig config) {
int initialLimit = config.initialLimit();
Expand All @@ -60,18 +62,15 @@ class AimdLimitImpl {
this.clock = config.clock().orElseGet(() -> System::nanoTime);

this.concurrentRequests = new AtomicInteger();
this.rejectedRequests = new AtomicInteger();
this.limit = new AtomicInteger(initialLimit);

this.queueLength = config.queueLength();
this.semaphore = new AdjustableSemaphore(initialLimit, config.fair());
if (config.queueLength() == 0) {
this.handler = new LimitHandlers.RealSemaphoreHandler(semaphore,
() -> new AimdToken(clock, concurrentRequests));
} else {
this.handler = new LimitHandlers.QueuedSemaphoreHandler(semaphore,
config.queueLength(),
config.queueTimeout(),
() -> new AimdToken(clock, concurrentRequests));
}
this.handler = new LimitHandlers.QueuedSemaphoreHandler(semaphore,
queueLength,
config.queueTimeout(),
() -> new AimdToken(clock, concurrentRequests));

if (!(backoffRatio < 1.0 && backoffRatio >= 0.5)) {
throw new ConfigException("Backoff ratio must be within [0.5, 1.0)");
Expand All @@ -95,8 +94,23 @@ int currentLimit() {
return limit.get();
}

Optional<Limit.Token> tryAcquire() {
return handler.tryAcquire();
Optional<LimitAlgorithm.Token> tryAcquire(boolean wait) {
Optional<LimitAlgorithm.Token> token = handler.tryAcquire(false);
if (token.isPresent()) {
return token;
}
if (wait && queueLength > 0) {
long startWait = clock.get();
token = handler.tryAcquire(true);
if (token.isPresent()) {
if (queueWaitTimer != null) {
queueWaitTimer.record(clock.get() - startWait, TimeUnit.NANOSECONDS);
}
return token;
}
}
rejectedRequests.getAndIncrement();
return token;
}

void invoke(Runnable runnable) throws Exception {
Expand All @@ -107,7 +121,7 @@ void invoke(Runnable runnable) throws Exception {
}

<T> T invoke(Callable<T> callable) throws Exception {
Optional<LimitAlgorithm.Token> optionalToken = handler.tryAcquire();
Optional<LimitAlgorithm.Token> optionalToken = tryAcquire(true);
if (optionalToken.isPresent()) {
LimitAlgorithm.Token token = optionalToken.get();
try {
Expand Down Expand Up @@ -189,14 +203,23 @@ void initMetrics(String socketName, AimdLimitConfig config) {
namePrefix + "_concurrent_requests", concurrentRequests::get).scope(VENDOR);
meterRegistry.getOrCreate(concurrentRequestsBuilder);

Gauge.Builder<Integer> rejectedRequestsBuilder = metricsFactory.gaugeBuilder(
namePrefix + "_rejected_requests", rejectedRequests::get).scope(VENDOR);
meterRegistry.getOrCreate(rejectedRequestsBuilder);

Gauge.Builder<Integer> queueLengthBuilder = metricsFactory.gaugeBuilder(
namePrefix + "_queue_length", semaphore::getQueueLength).scope(VENDOR);
meterRegistry.getOrCreate(queueLengthBuilder);

Timer.Builder timerBuilder = metricsFactory.timerBuilder(namePrefix + "_rtt")
Timer.Builder rttTimerBuilder = metricsFactory.timerBuilder(namePrefix + "_rtt")
.scope(VENDOR)
.baseUnit(Timer.BaseUnits.MILLISECONDS);
rttTimer = meterRegistry.getOrCreate(rttTimerBuilder);

Timer.Builder waitTimerBuilder = metricsFactory.timerBuilder(namePrefix + "_queue_wait_time")
.scope(VENDOR)
.baseUnit(Timer.BaseUnits.MILLISECONDS);
rttTimer = meterRegistry.getOrCreate(timerBuilder);
queueWaitTimer = meterRegistry.getOrCreate(waitTimerBuilder);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

import io.helidon.builder.api.RuntimeType;
import io.helidon.common.config.Config;
import io.helidon.metrics.api.Gauge;
import io.helidon.metrics.api.MeterRegistry;
import io.helidon.metrics.api.Metrics;
import io.helidon.metrics.api.MetricsFactory;
import io.helidon.metrics.api.Timer;

import static io.helidon.metrics.api.Meter.Scope.VENDOR;

Expand All @@ -39,6 +43,7 @@
@SuppressWarnings("removal")
@RuntimeType.PrototypedBy(FixedLimitConfig.class)
public class FixedLimit implements Limit, SemaphoreLimit, RuntimeType.Api<FixedLimitConfig> {

/**
* Default limit, meaning unlimited execution.
*/
Expand All @@ -58,23 +63,32 @@ public class FixedLimit implements Limit, SemaphoreLimit, RuntimeType.Api<FixedL
private final LimitHandlers.LimiterHandler handler;
private final int initialPermits;
private final Semaphore semaphore;
private final Supplier<Long> clock;
private final AtomicInteger concurrentRequests;
private final AtomicInteger rejectedRequests;
private final int queueLength;

private Timer rttTimer;
private Timer queueWaitTimer;

private FixedLimit(FixedLimitConfig config) {
this.config = config;
this.concurrentRequests = new AtomicInteger();
this.rejectedRequests = new AtomicInteger();
this.clock = config.clock().orElseGet(() -> System::nanoTime);

if (config.permits() == 0 && config.semaphore().isEmpty()) {
this.handler = new LimitHandlers.NoOpSemaphoreHandler();
this.semaphore = null;
this.initialPermits = 0;
semaphore = null;
this.queueLength = 0;
this.handler = new LimitHandlers.NoOpSemaphoreHandler();
} else {
semaphore = config.semaphore().orElseGet(() -> new Semaphore(config.permits(), config.fair()));
this.semaphore = config.semaphore().orElseGet(() -> new Semaphore(config.permits(), config.fair()));
this.initialPermits = semaphore.availablePermits();
if (config.queueLength() == 0) {
this.handler = new LimitHandlers.RealSemaphoreHandler(semaphore);
} else {
this.handler = new LimitHandlers.QueuedSemaphoreHandler(semaphore,
config.queueLength(),
config.queueTimeout());
}
this.queueLength = Math.max(0, config.queueLength());
this.handler = new LimitHandlers.QueuedSemaphoreHandler(semaphore,
queueLength,
config.queueTimeout());
}
}

Expand Down Expand Up @@ -144,18 +158,59 @@ public static FixedLimit create(Consumer<FixedLimitConfig.Builder> consumer) {
}

@Override
public <T> T invoke(Callable<T> callable) throws Exception {
return handler.invoke(callable);
public Optional<Token> tryAcquire(boolean wait) {
Optional<LimitAlgorithm.Token> token = handler.tryAcquire(false);
if (token.isPresent()) {
return token;
}
if (wait && queueLength > 0) {
long startWait = clock.get();
token = handler.tryAcquire(true);
if (token.isPresent()) {
if (queueWaitTimer != null) {
queueWaitTimer.record(clock.get() - startWait, TimeUnit.NANOSECONDS);
}
return token;
}
}
rejectedRequests.getAndIncrement();
return token;
}

@Override
public void invoke(Runnable runnable) throws Exception {
handler.invoke(runnable);
public <T> T invoke(Callable<T> callable) throws Exception {
Optional<LimitAlgorithm.Token> optionalToken = tryAcquire(true);
if (optionalToken.isPresent()) {
LimitAlgorithm.Token token = optionalToken.get();
try {
concurrentRequests.getAndIncrement();
long startTime = clock.get();
T response = callable.call();
if (rttTimer != null) {
rttTimer.record(clock.get() - startTime, TimeUnit.NANOSECONDS);
}
token.success();
return response;
} catch (IgnoreTaskException e) {
token.ignore();
return e.handle();
} catch (Throwable e) {
token.dropped();
throw e;
} finally {
concurrentRequests.getAndDecrement();
}
} else {
throw new LimitException("No more permits available for the semaphore");
}
}

@Override
public Optional<Token> tryAcquire(boolean wait) {
return handler.tryAcquire(wait);
public void invoke(Runnable runnable) throws Exception {
invoke(() -> {
runnable.run();
return null;
});
}

@SuppressWarnings("removal")
Expand Down Expand Up @@ -209,6 +264,16 @@ public void init(String socketName) {
Gauge.Builder<Integer> queueLengthBuilder = metricsFactory.gaugeBuilder(
namePrefix + "_queue_length", semaphore::getQueueLength).scope(VENDOR);
meterRegistry.getOrCreate(queueLengthBuilder);

Timer.Builder rttTimerBuilder = metricsFactory.timerBuilder(namePrefix + "_rtt")
.scope(VENDOR)
.baseUnit(Timer.BaseUnits.MILLISECONDS);
rttTimer = meterRegistry.getOrCreate(rttTimerBuilder);

Timer.Builder waitTimerBuilder = metricsFactory.timerBuilder(namePrefix + "_queue_wait_time")
.scope(VENDOR)
.baseUnit(Timer.BaseUnits.MILLISECONDS);
queueWaitTimer = meterRegistry.getOrCreate(waitTimerBuilder);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

import io.helidon.builder.api.Option;
import io.helidon.builder.api.Prototype;
Expand Down Expand Up @@ -96,6 +97,13 @@ interface FixedLimitConfigBlueprint extends Prototype.Factory<FixedLimit> {
*/
Optional<Semaphore> semaphore();

/**
* A clock that supplies nanosecond time.
*
* @return supplier of current nanoseconds, defaults to {@link java.lang.System#nanoTime()}
*/
Optional<Supplier<Long>> clock();

/**
* Whether to collect metrics for the AIMD implementation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,65 +72,6 @@ public Semaphore semaphore() {
}
}

@SuppressWarnings("removal")
static class RealSemaphoreHandler implements LimiterHandler {
private final Semaphore semaphore;
private final Supplier<Token> tokenSupplier;

RealSemaphoreHandler(Semaphore semaphore) {
this.semaphore = semaphore;
this.tokenSupplier = () -> new SemaphoreToken(semaphore);
}

RealSemaphoreHandler(Semaphore semaphore, Supplier<Token> tokenSupplier) {
this.semaphore = semaphore;
this.tokenSupplier = tokenSupplier;
}

@Override
public <T> T invoke(Callable<T> callable) throws Exception {
if (semaphore.tryAcquire()) {
try {
return callable.call();
} catch (IgnoreTaskException e) {
return e.handle();
} finally {
semaphore.release();
}
} else {
throw new LimitException("No more permits available for the semaphore");
}
}

@Override
public void invoke(Runnable runnable) throws Exception {
if (semaphore.tryAcquire()) {
try {
runnable.run();
} catch (IgnoreTaskException e) {
e.handle();
} finally {
semaphore.release();
}
} else {
throw new LimitException("No more permits available for the semaphore");
}
}

@Override
public Optional<Token> tryAcquire(boolean wait) {
if (!semaphore.tryAcquire()) {
return Optional.empty();
}
return Optional.of(tokenSupplier.get());
}

@Override
public Semaphore semaphore() {
return semaphore;
}
}

static class QueuedSemaphoreHandler implements LimiterHandler {
private final Semaphore semaphore;
private final int queueLength;
Expand All @@ -153,7 +94,7 @@ static class QueuedSemaphoreHandler implements LimiterHandler {

@Override
public Optional<Token> tryAcquire(boolean wait) {
if (semaphore.getQueueLength() >= this.queueLength) {
if (queueLength > 0 && semaphore.getQueueLength() >= queueLength) {
// this is an estimate - we do not promise to be precise here
return Optional.empty();
}
Expand All @@ -163,12 +104,9 @@ public Optional<Token> tryAcquire(boolean wait) {
if (!semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
return Optional.empty();
}
} else {
if (!semaphore.tryAcquire()) {
return Optional.empty();
}
} else if (!semaphore.tryAcquire()) {
return Optional.empty();
}

} catch (InterruptedException e) {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void testSemaphoreReleasedWithToken() {
}
}

@RepeatedTest(5)
@RepeatedTest(3)
public void testLimitWithQueue() throws Exception {
AimdLimit limiter = AimdLimit.builder()
.minLimit(1)
Expand Down

0 comments on commit 02c9f15

Please sign in to comment.