From 67c04c690a151ce6cd5dc3382ac0e07cc5663065 Mon Sep 17 00:00:00 2001 From: hexiaofeng Date: Thu, 2 Jan 2025 17:26:19 +0800 Subject: [PATCH] Fix leaky bucket issue. --- .../invoke/ratelimit/AbstractRateLimiter.java | 2 + .../leakybucket/LeakyBucketLimiter.java | 14 +- .../tokenbucket/SmoothBurstyLimiter.java | 7 +- .../tokenbucket/SmoothWarmupLimiter.java | 26 ++-- .../tokenbucket/TokenBucketLimiter.java | 123 ++++++++++-------- .../policy/service/limit/SlidingWindow.java | 37 ++++-- 6 files changed, 124 insertions(+), 85 deletions(-) diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiter.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiter.java index 53b234f1f..9e7bca7d0 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiter.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/AbstractRateLimiter.java @@ -32,6 +32,8 @@ */ public abstract class AbstractRateLimiter implements RateLimiter { + protected static final Long MICROSECOND_OF_ONE_SECOND = 1000 * 1000L; + /** * The rate limit policy that defines the limits for the rate limiter. */ diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/leakybucket/LeakyBucketLimiter.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/leakybucket/LeakyBucketLimiter.java index 850221328..e5893ab6b 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/leakybucket/LeakyBucketLimiter.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/leakybucket/LeakyBucketLimiter.java @@ -17,7 +17,6 @@ import com.jd.live.agent.governance.policy.service.limit.RateLimitPolicy; import com.jd.live.agent.governance.policy.service.limit.SlidingWindow; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** @@ -29,6 +28,8 @@ public class LeakyBucketLimiter extends TokenBucketLimiter { private static final String KEY_CAPACITY = "capacity"; + private static final String KEY_FLOW_WINDOW_SECONDS = "flowWindowSeconds"; + private final long capacity; private final AtomicLong requests = new AtomicLong(0); @@ -39,8 +40,14 @@ public LeakyBucketLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWind } @Override - protected double getMaxPermits() { - return TimeUnit.SECONDS.toMicros(1L) / stableIntervalMicros; + protected double getMaxStoredPermits() { + // Keep a constant rate and prevent excessive token accumulation. + return getPermits(option.getPositive(KEY_FLOW_WINDOW_SECONDS, 1)); + } + + @Override + protected long adjustRequiredPermitsWaitTime(long startTime, long timeoutMicros, long nowMicros, long waitTime) { + return nowMicros + waitTime - startTime > timeoutMicros ? TIMEOUT : waitTime; } @Override @@ -56,6 +63,5 @@ protected boolean doAcquire(int permits, long nowMicros, long timeoutMicros) { } finally { requests.decrementAndGet(); } - } } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/tokenbucket/SmoothBurstyLimiter.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/tokenbucket/SmoothBurstyLimiter.java index c2028d408..5eb04bdb5 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/tokenbucket/SmoothBurstyLimiter.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/tokenbucket/SmoothBurstyLimiter.java @@ -16,8 +16,6 @@ import com.jd.live.agent.governance.policy.service.limit.RateLimitPolicy; import com.jd.live.agent.governance.policy.service.limit.SlidingWindow; -import java.util.concurrent.TimeUnit; - /** * SmoothBurstyLimiter *

@@ -37,8 +35,7 @@ public SmoothBurstyLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWin } @Override - protected double getMaxPermits() { - long maxBurstSeconds = option.getPositive(KEY_MAX_BURST_SECONDS, DEFAULT_MAX_BURST_SECONDS); - return TimeUnit.SECONDS.toMicros(maxBurstSeconds) / stableIntervalMicros; + protected double getMaxStoredPermits() { + return getPermits(option.getPositive(KEY_MAX_BURST_SECONDS, DEFAULT_MAX_BURST_SECONDS)); } } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/tokenbucket/SmoothWarmupLimiter.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/tokenbucket/SmoothWarmupLimiter.java index 9ef2be28e..f66c9e65b 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/tokenbucket/SmoothWarmupLimiter.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/tokenbucket/SmoothWarmupLimiter.java @@ -16,8 +16,6 @@ import com.jd.live.agent.governance.policy.service.limit.RateLimitPolicy; import com.jd.live.agent.governance.policy.service.limit.SlidingWindow; -import java.util.concurrent.TimeUnit; - import static java.lang.Math.min; /** @@ -53,42 +51,42 @@ public SmoothWarmupLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWin @Override protected void initialize() { - this.warmupMicros = TimeUnit.SECONDS.toMicros(option.getPositive(KEY_WARMUP_SECONDS, DEFAULT_WARMUP_SECONDS)); - this.thresholdPermits = 0.5 * warmupMicros / stableIntervalMicros; - this.coldIntervalMicros = stableIntervalMicros * option.getPositive(KEY_COLD_FACTOR, DEFAULT_COLD_FACTOR); + this.warmupMicros = option.getPositive(KEY_WARMUP_SECONDS, DEFAULT_WARMUP_SECONDS) * MICROSECOND_OF_ONE_SECOND; + this.thresholdPermits = 0.5 * warmupMicros / permitIntervalMicros; + this.coldIntervalMicros = permitIntervalMicros * option.getPositive(KEY_COLD_FACTOR, DEFAULT_COLD_FACTOR); super.initialize(); - this.slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits); + this.slope = (coldIntervalMicros - permitIntervalMicros) / (maxStoredPermits - thresholdPermits); } @Override - protected double getMaxPermits() { - return thresholdPermits + 2.0 * warmupMicros / (stableIntervalMicros + coldIntervalMicros); + protected double getMaxStoredPermits() { + return thresholdPermits + 2.0 * warmupMicros / (permitIntervalMicros + coldIntervalMicros); } @Override - protected long waitForStoredPermits(double storedPermits, double takePermits) { + protected long estimateStorePermitsWaitTime(double storedPermits, double targetPermits) { double availablePermitsAboveThreshold = storedPermits - thresholdPermits; long micros = 0; // measuring the integral on the right part of the function (the climbing line) if (availablePermitsAboveThreshold > 0.0) { - double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, takePermits); + double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, targetPermits); double length = permitsToTime(availablePermitsAboveThreshold) + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake); micros = (long) (permitsAboveThresholdToTake * length / 2.0); - takePermits -= permitsAboveThresholdToTake; + targetPermits -= permitsAboveThresholdToTake; } // measuring the integral on the left part of the function (the horizontal line) - micros += (long) (stableIntervalMicros * takePermits); + micros += (long) (permitIntervalMicros * targetPermits); return micros; } @Override protected double coolDownIntervalMicros() { - return warmupMicros / maxPermits; + return warmupMicros / maxStoredPermits; } private double permitsToTime(double permits) { - return stableIntervalMicros + permits * slope; + return permitIntervalMicros + permits * slope; } } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/tokenbucket/TokenBucketLimiter.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/tokenbucket/TokenBucketLimiter.java index 812e750d5..8207cdd85 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/tokenbucket/TokenBucketLimiter.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/ratelimit/tokenbucket/TokenBucketLimiter.java @@ -32,36 +32,35 @@ */ public abstract class TokenBucketLimiter extends AbstractRateLimiter { - private static final int DEFAULT_SECOND_PERMITS = 1000; + protected static final int TIMEOUT = Integer.MIN_VALUE; protected final SleepingStopwatch stopwatch; /** * The maximum number of stored permits. */ - protected double maxPermits; + protected double maxStoredPermits; + /** - * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits - * per second has a stable interval of 200ms. + * The time interval (in microseconds) between each permit */ - protected double stableIntervalMicros; - - protected long nextFreeTicketMicros; - - protected final Object mutex = new Object(); + protected double permitIntervalMicros; /** * The currently stored permits. */ - private double storedPermits; + protected double storedPermits; + + protected long nextPermitMicros; + + protected final Object mutex = new Object(); public TokenBucketLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWindow) { super(limitPolicy, TimeUnit.MILLISECONDS); this.stopwatch = SleepingStopwatch.createFromSystemTimer(); - double secondPermits = slidingWindow.getSecondPermits(); - this.stableIntervalMicros = secondPermits <= 0 ? DEFAULT_SECOND_PERMITS : TimeUnit.SECONDS.toMicros(1L) / secondPermits; + this.permitIntervalMicros = slidingWindow.getPermitIntervalMicros(); initialize(); - update(stopwatch.readMicros()); + refresh(stopwatch.readMicros()); } @Override @@ -90,7 +89,10 @@ protected boolean doAcquire(int permits, long nowMicros, long timeoutMicros) { if (isTimeout(nowMicros, timeoutMicros)) { return false; } - microsToWait = computeWaitFor(permits, nowMicros); + microsToWait = estimateRequiredPermitsWaitTime(permits, nowMicros, timeoutMicros); + } + if (microsToWait == TIMEOUT) { + return false; } stopwatch.sleepMicrosUninterruptibly(microsToWait); return true; @@ -100,7 +102,7 @@ protected boolean doAcquire(int permits, long nowMicros, long timeoutMicros) { * Initializes the rate limiter by setting the maximum number of permits. */ protected void initialize() { - this.maxPermits = getMaxPermits(); + this.maxStoredPermits = getMaxStoredPermits(); } /** @@ -108,7 +110,7 @@ protected void initialize() { * * @return the maximum number of permits */ - protected abstract double getMaxPermits(); + protected abstract double getMaxStoredPermits(); /** * Checks if the current time is before the timeout time for acquiring a free ticket. @@ -118,7 +120,7 @@ protected void initialize() { * @return true if the current time is before the timeout time, false otherwise */ protected boolean isTimeout(long nowMicros, long timeoutMicros) { - return nextFreeTicketMicros > nowMicros + timeoutMicros; + return nextPermitMicros > nowMicros + timeoutMicros; } /** @@ -131,45 +133,54 @@ protected boolean isFull() { } /** - * Waits for the specified number of permits to become available. + * Estimates the wait time required to acquire the specified number of permits. * - * @param permits the number of permits to wait for - * @param nowMicros the current time in microseconds - * @return the time waited in microseconds, or 0 if no wait was necessary + * @param permits The number of permits to acquire. + * @param startTime The request start time in microseconds. + * @param timeoutMicros The timeout time in microseconds + * @return The estimated wait time in microseconds, or 0 if no wait is required. */ - protected long computeWaitFor(long permits, long nowMicros) { - long momentAvailable = waitForEarliestAvailable(permits, nowMicros); - return max(momentAvailable - nowMicros, 0); + protected long estimateRequiredPermitsWaitTime(long permits, long startTime, long timeoutMicros) { + // update stored permits according to the current time + long nowMicros = stopwatch.readMicros(); + refresh(nowMicros); + // compute wait time + double available = min(permits, storedPermits); + double lack = permits - available; + long waitTime = estimateStorePermitsWaitTime(storedPermits, available) + (long) (lack * permitIntervalMicros); + // adjust wait time to facilitate pre-fetching + long result = adjustRequiredPermitsWaitTime(startTime, timeoutMicros, nowMicros, waitTime); + if (result == TIMEOUT) { + // it's timeout. + return TIMEOUT; + } + // update next token time + nextPermitMicros = saturatedAdd(nextPermitMicros, waitTime); + storedPermits -= available; + return result; } /** - * Waits for the specified number of permits to become available, returning the time at which the earliest permit will be available. + * Adjusts the required wait time for acquiring permits based on the current time and the next token time. * - * @param permits the number of permits to wait for - * @param nowMicros the current time in microseconds - * @return the time at which the earliest permit will be available, in microseconds + * @param startTime The request start time in microseconds. + * @param waitTime The original wait time (in microseconds). This parameter is not used in the calculation. + * @param nowMicros The current time in microseconds + * @param timeoutMicros The timeout time in microseconds + * @return The adjusted wait time (in microseconds), which is guaranteed to be non-negative. */ - protected long waitForEarliestAvailable(long permits, long nowMicros) { - update(nowMicros); - long returnValue = nextFreeTicketMicros; - - double available = min(permits, storedPermits); - double lack = permits - available; - long waitMicros = waitForStoredPermits(storedPermits, available) + (long) (lack * stableIntervalMicros); - - nextFreeTicketMicros = saturatedAdd(nextFreeTicketMicros, waitMicros); - storedPermits -= available; - return returnValue; + protected long adjustRequiredPermitsWaitTime(long startTime, long timeoutMicros, long nowMicros, long waitTime) { + return max(nextPermitMicros - startTime, 0); } /** * Waits for the specified number of stored permits to become available. * * @param storedPermits the current number of stored permits - * @param permitsToTake the number of permits to wait for + * @param targetPermits the number of permits to wait for * @return the time waited in microseconds */ - protected long waitForStoredPermits(double storedPermits, double permitsToTake) { + protected long estimateStorePermitsWaitTime(double storedPermits, double targetPermits) { return 0L; } @@ -177,23 +188,33 @@ protected long waitForStoredPermits(double storedPermits, double permitsToTake) * Returns the number of microseconds during cool down that we have to wait to get a new permit. */ protected double coolDownIntervalMicros() { - return stableIntervalMicros; + return permitIntervalMicros; } /** - * Updates the internal state of the rate limiter based on the current time. - * - * @param nowMicros the current time in microseconds + * Refresh permits based on the current time. + * @param nowMicros the current time in microseconds */ - protected void update(long nowMicros) { - if (nowMicros > nextFreeTicketMicros) { - // if nextFreeTicket is in the past. - double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); - storedPermits = min(maxPermits, storedPermits + newPermits); - nextFreeTicketMicros = nowMicros; + protected void refresh(long nowMicros) { + if (nowMicros > nextPermitMicros) { + // if nextTokenMicros is in the past. + double permits = (nowMicros - nextPermitMicros) / coolDownIntervalMicros(); + permits = storedPermits + permits; + storedPermits = maxStoredPermits <= 0 ? permits : min(maxStoredPermits, permits); + nextPermitMicros = nowMicros; } } + /** + * Converts the given time duration (in seconds) into the equivalent number of permits. + * + * @param seconds The time duration (in seconds) for which the permits are to be calculated. Must be a positive value. + * @return The number of permits that can be acquired within the specified time duration. + */ + protected double getPermits(long seconds) { + return seconds * MICROSECOND_OF_ONE_SECOND / permitIntervalMicros; + } + /** * Adds two long values, handling overflow by returning the maximum or minimum value. * diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/limit/SlidingWindow.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/limit/SlidingWindow.java index 8b6bc0a8f..4816d1af8 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/limit/SlidingWindow.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/service/limit/SlidingWindow.java @@ -32,24 +32,33 @@ * * @since 1.0.0 */ -@Getter -@Setter public class SlidingWindow implements Serializable { + private static final int DEFAULT_PERMITS = 1000; + + private static final long DEFAULT_TIME_WINDOW = 1000L; + /** * The maximum number of allowed actions within the time window. */ + @Getter + @Setter private int threshold; /** * The duration of the time window in milliseconds. */ + @Getter + @Setter private long timeWindowInMs; + private volatile transient Double permitIntervalMicros; + /** * Default constructor for creating an instance without initializing fields. */ public SlidingWindow() { + this(DEFAULT_PERMITS, DEFAULT_TIME_WINDOW); } /** @@ -59,20 +68,26 @@ public SlidingWindow() { * @param timeWindowInMs the duration of the time window in milliseconds */ public SlidingWindow(int threshold, long timeWindowInMs) { - this.threshold = threshold; - this.timeWindowInMs = timeWindowInMs; + this.threshold = threshold <= 0 ? DEFAULT_PERMITS : threshold; + this.timeWindowInMs = timeWindowInMs <= 0 ? DEFAULT_TIME_WINDOW : timeWindowInMs; } /** - * Calculates and returns the allowed actions per second based on the current threshold - * and time window duration. This can be used to understand the rate of allowed actions - * in a more commonly used time unit. + * Returns the time interval (in microseconds) between each permit. * - * @return the number of allowed actions per second. If the time window duration is not - * positive, returns 0 to indicate an invalid configuration. + * @return The time interval (in microseconds) between each permit. */ - public double getSecondPermits() { - return timeWindowInMs <= 0 ? 0 : (double) threshold / ((double) timeWindowInMs / 1000); + public double getPermitIntervalMicros() { + if (permitIntervalMicros == null) { + synchronized (this) { + if (permitIntervalMicros == null) { + double timeWindowMicros = (timeWindowInMs <= 0 ? DEFAULT_TIME_WINDOW : timeWindowInMs) * 1000D; + long maxRequests = threshold <= 0 ? DEFAULT_PERMITS : threshold; + permitIntervalMicros = timeWindowMicros / maxRequests; + } + } + } + return permitIntervalMicros; } }