From 3e63b989096c005e3164d7a74d2ebba5f5a63f57 Mon Sep 17 00:00:00 2001 From: hexiaofeng Date: Wed, 1 Jan 2025 10:22:50 +0800 Subject: [PATCH] Fix limiter synchronized --- .../invoke/ratelimit/AbstractRateLimiter.java | 35 +++++---- .../ratelimit/AbstractRateLimiterGroup.java | 15 ++-- .../leakybucket/LeakyBucketLimiter.java | 9 +-- .../tokenbucket/TokenBucketLimiter.java | 72 +++++++------------ .../ratelimit/Resilience4jRateLimiter.java | 8 +-- 5 files changed, 67 insertions(+), 72 deletions(-) diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiter.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiter.java index 03bb86a43..53b234f1f 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiter.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiter.java @@ -20,7 +20,6 @@ import com.jd.live.agent.governance.policy.service.limit.RateLimitPolicy; import lombok.Getter; -import java.time.Duration; import java.util.concurrent.TimeUnit; /** @@ -39,10 +38,15 @@ public abstract class AbstractRateLimiter implements RateLimiter { @Getter protected final RateLimitPolicy policy; + /** + * The default time unit. + */ + protected final TimeUnit timeUnit; + /** * The default timeout duration for permit acquisition. */ - protected final Duration timeout; + protected final long timeout; /** * The option that contains additional settings that may affect the behavior of the rate limiter. @@ -50,37 +54,43 @@ public abstract class AbstractRateLimiter implements RateLimiter { protected final Option option; @Getter - private long lastAcquireTime; + protected long lastAcquireTime; /** - * Constructs a new AbstractRateLimiter with the specified rate limit policy. - * - * @param policy The rate limit policy to be applied by this rate limiter. + * Constructs an instance of the AbstractRateLimiter class with the given rate limit policy and time unit. + * @param policy the rate limit policy to use + * @param timeUnit the time unit to use for rate limiting */ - public AbstractRateLimiter(RateLimitPolicy policy) { + public AbstractRateLimiter(RateLimitPolicy policy, TimeUnit timeUnit) { this.policy = policy; + this.timeUnit = timeUnit; + this.timeout = timeUnit.convert(policy.getMaxWaitMs() == null || policy.getMaxWaitMs() < 0 ? 0 : policy.getMaxWaitMs(), TimeUnit.MILLISECONDS); this.option = MapOption.of(policy.getParameters()); - this.timeout = Duration.ofMillis(policy.getMaxWaitMs() == null || policy.getMaxWaitMs() < 0 ? 0 : policy.getMaxWaitMs()); } @Override public boolean acquire() { this.lastAcquireTime = System.currentTimeMillis(); - return acquire(1, timeout.toNanos(), TimeUnit.NANOSECONDS); + return doAcquire(1, timeout, timeUnit); } @Override public boolean acquire(int permits) { + if (permits <= 0) { + return false; + } this.lastAcquireTime = System.currentTimeMillis(); - return doAcquire(permits, timeout.toNanos(), TimeUnit.NANOSECONDS); + return doAcquire(permits, timeout, timeUnit); } @Override public boolean acquire(int permits, long timeout, TimeUnit timeUnit) { + if (permits <= 0) { + return false; + } this.lastAcquireTime = System.currentTimeMillis(); return doAcquire(permits, timeout, timeUnit); } - /** * Try to get some permits within a duration and return the result * @@ -89,6 +99,7 @@ public boolean acquire(int permits, long timeout, TimeUnit timeUnit) { * @param timeUnit Time unit * @return result */ - public abstract boolean doAcquire(int permits, long timeout, TimeUnit timeUnit); + protected abstract boolean doAcquire(int permits, long timeout, TimeUnit timeUnit); + } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiterGroup.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiterGroup.java index dcfe2b8be..45adadc95 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiterGroup.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiterGroup.java @@ -42,7 +42,7 @@ public abstract class AbstractRateLimiterGroup extends AbstractRateLimiter { * @param policy The rate limit policy to be applied to the limiters in the group. */ public AbstractRateLimiterGroup(RateLimitPolicy policy) { - super(policy); + super(policy, TimeUnit.NANOSECONDS); int i = 0; for (SlidingWindow window : policy.getSlidingWindows()) { limiters.add(create(window, policy.getName() + "-" + i++)); @@ -61,14 +61,15 @@ public AbstractRateLimiterGroup(RateLimitPolicy policy) { protected abstract RateLimiter create(SlidingWindow window, String name); @Override - public boolean doAcquire(int permits, long timeout, TimeUnit timeUnit) { - if (permits <= 0) { - return false; - } + protected boolean doAcquire(int permits, long timeout, TimeUnit timeUnit) { + // Convert to nanoseconds to avoid losing precision. long startTime = System.nanoTime(); - timeout = Long.max(0, timeout); + timeout = timeout <= 0 ? 0 : timeUnit.toNanos(timeout); + long expire; for (RateLimiter limiter : limiters) { - if (!limiter.acquire(permits, timeout - (System.nanoTime() - startTime), timeUnit)) { + expire = timeout - (System.nanoTime() - startTime); + expire = Long.max(0, expire); + if (!limiter.acquire(permits, expire, TimeUnit.NANOSECONDS)) { return false; } } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/leakybucket/LeakyBucketLimiter.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/leakybucket/LeakyBucketLimiter.java index 790556056..850221328 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/leakybucket/LeakyBucketLimiter.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/leakybucket/LeakyBucketLimiter.java @@ -44,17 +44,18 @@ protected double getMaxPermits() { } @Override - protected boolean isTimeout(long nowMicros, long timeoutMicros) { - return super.isTimeout(nowMicros, timeoutMicros) || (capacity > 0 && requests.get() >= capacity); + protected boolean isFull() { + return capacity > 0 && requests.get() >= capacity; } @Override - protected void doAcquire(int permits, long nowMicros) { + protected boolean doAcquire(int permits, long nowMicros, long timeoutMicros) { requests.incrementAndGet(); try { - super.doAcquire(permits, nowMicros); + return super.doAcquire(permits, nowMicros, timeoutMicros); } finally { requests.decrementAndGet(); } + } } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/tokenbucket/TokenBucketLimiter.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/tokenbucket/TokenBucketLimiter.java index 274ea7592..812e750d5 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/tokenbucket/TokenBucketLimiter.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/tokenbucket/TokenBucketLimiter.java @@ -48,7 +48,7 @@ public abstract class TokenBucketLimiter extends AbstractRateLimiter { protected long nextFreeTicketMicros; - protected volatile Object mutex; + protected final Object mutex = new Object(); /** * The currently stored permits. @@ -56,7 +56,7 @@ public abstract class TokenBucketLimiter extends AbstractRateLimiter { private double storedPermits; public TokenBucketLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWindow) { - super(limitPolicy); + super(limitPolicy, TimeUnit.MILLISECONDS); this.stopwatch = SleepingStopwatch.createFromSystemTimer(); double secondPermits = slidingWindow.getSecondPermits(); this.stableIntervalMicros = secondPermits <= 0 ? DEFAULT_SECOND_PERMITS : TimeUnit.SECONDS.toMicros(1L) / secondPermits; @@ -65,32 +65,34 @@ public TokenBucketLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWind } @Override - public boolean acquire() { - return acquire(1, timeout.toMillis(), TimeUnit.MILLISECONDS); - } - - @Override - public boolean acquire(int permits) { - return acquire(permits, timeout.toMillis(), TimeUnit.MILLISECONDS); - } - - @Override - public boolean doAcquire(int permits, long timeout, TimeUnit timeUnit) { - if (permits <= 0) { - throw new IllegalArgumentException("Permits must be greater than 0"); - } - long timeoutMicros = timeUnit.toMicros(timeout < 0 ? 0 : timeout); + protected boolean doAcquire(int permits, long timeout, TimeUnit timeUnit) { + long timeoutMicros = timeout <= 0 ? 0 : timeUnit.toMicros(timeout); long nowMicros = stopwatch.readMicros(); - if (isTimeout(nowMicros, timeoutMicros)) { + if (isTimeout(nowMicros, timeoutMicros) && isFull()) { return false; } - synchronized (mutex()) { + return doAcquire(permits, nowMicros, timeoutMicros); + } + + /** + * Attempts to acquire the specified number of permits, + * waiting if necessary until the permits become available or the specified timeout expires. + * + * @param permits the number of permits to acquire + * @param nowMicros the current time in microseconds + * @param timeoutMicros the maximum time to wait in microseconds + * @return true if the permits were acquired, false if the timeout expired + */ + protected boolean doAcquire(int permits, long nowMicros, long timeoutMicros) { + long microsToWait; + synchronized (mutex) { // double check in lock if (isTimeout(nowMicros, timeoutMicros)) { return false; } - doAcquire(permits, nowMicros); + microsToWait = computeWaitFor(permits, nowMicros); } + stopwatch.sleepMicrosUninterruptibly(microsToWait); return true; } @@ -120,14 +122,12 @@ protected boolean isTimeout(long nowMicros, long timeoutMicros) { } /** - * Acquires the specified number of permits, blocking until they become available. + * Checks if the current state is full. * - * @param permits the number of permits to acquire - * @param nowMicros the current time in microseconds + * @return {@code true} if the state is full, {@code false} otherwise. */ - protected void doAcquire(int permits, long nowMicros) { - // always pay in advance - stopwatch.sleepMicrosUninterruptibly(waitFor(permits, nowMicros)); + protected boolean isFull() { + return false; } /** @@ -137,7 +137,7 @@ protected void doAcquire(int permits, long nowMicros) { * @param nowMicros the current time in microseconds * @return the time waited in microseconds, or 0 if no wait was necessary */ - protected long waitFor(long permits, long nowMicros) { + protected long computeWaitFor(long permits, long nowMicros) { long momentAvailable = waitForEarliestAvailable(permits, nowMicros); return max(momentAvailable - nowMicros, 0); } @@ -194,24 +194,6 @@ protected void update(long nowMicros) { } } - /** - * Returns the mutex object used for synchronization. - * - * @return the mutex object - */ - private Object mutex() { - Object mutex = this.mutex; - if (mutex == null) { - synchronized (this) { - mutex = this.mutex; - if (mutex == null) { - this.mutex = mutex = new Object(); - } - } - } - return mutex; - } - /** * Adds two long values, handling overflow by returning the maximum or minimum value. * diff --git a/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/resilience4j/ratelimit/Resilience4jRateLimiter.java b/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/resilience4j/ratelimit/Resilience4jRateLimiter.java index 2003d0116..20e9230d0 100644 --- a/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/resilience4j/ratelimit/Resilience4jRateLimiter.java +++ b/joylive-implement/joylive-flowcontrol/joylive-flowcontrol-resilience4j/src/main/java/com/jd/live/agent/implement/flowcontrol/resilience4j/ratelimit/Resilience4jRateLimiter.java @@ -37,17 +37,17 @@ public Resilience4jRateLimiter(RateLimitPolicy policy, SlidingWindow window) { } public Resilience4jRateLimiter(RateLimitPolicy policy, SlidingWindow window, String name) { - super(policy); + super(policy, TimeUnit.NANOSECONDS); // Create a RateLimiter this.limiter = new AtomicRateLimiter(name, RateLimiterConfig.custom() - .timeoutDuration(timeout) + .timeoutDuration(Duration.ofNanos(timeout)) // the timeout is nanoseconds .limitRefreshPeriod(Duration.ofMillis(window.getTimeWindowInMs())) .limitForPeriod(window.getThreshold()) .build()); } @Override - public boolean doAcquire(int permits, long timeout, TimeUnit timeUnit) { - return permits > 0 && limiter.acquirePermission(permits, timeout, timeUnit); + protected boolean doAcquire(int permits, long timeout, TimeUnit timeUnit) { + return limiter.acquirePermission(permits, timeout, timeUnit); } }