Skip to content

Commit

Permalink
Fix leaky bucket issue.
Browse files Browse the repository at this point in the history
  • Loading branch information
hexiaofeng committed Jan 2, 2025
1 parent 69579e6 commit 67c04c6
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
*/
public abstract class AbstractRateLimiter implements RateLimiter {

protected static final Long MICROSECOND_OF_ONE_SECOND = 1000 * 1000L;

/**
* The rate limit policy that defines the limits for the rate limiter.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.jd.live.agent.governance.policy.service.limit.RateLimitPolicy;
import com.jd.live.agent.governance.policy.service.limit.SlidingWindow;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -29,6 +28,8 @@ public class LeakyBucketLimiter extends TokenBucketLimiter {

private static final String KEY_CAPACITY = "capacity";

private static final String KEY_FLOW_WINDOW_SECONDS = "flowWindowSeconds";

private final long capacity;

private final AtomicLong requests = new AtomicLong(0);
Expand All @@ -39,8 +40,14 @@ public LeakyBucketLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWind
}

@Override
protected double getMaxPermits() {
return TimeUnit.SECONDS.toMicros(1L) / stableIntervalMicros;
protected double getMaxStoredPermits() {
// Keep a constant rate and prevent excessive token accumulation.
return getPermits(option.getPositive(KEY_FLOW_WINDOW_SECONDS, 1));
}

@Override
protected long adjustRequiredPermitsWaitTime(long startTime, long timeoutMicros, long nowMicros, long waitTime) {
return nowMicros + waitTime - startTime > timeoutMicros ? TIMEOUT : waitTime;
}

@Override
Expand All @@ -56,6 +63,5 @@ protected boolean doAcquire(int permits, long nowMicros, long timeoutMicros) {
} finally {
requests.decrementAndGet();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import com.jd.live.agent.governance.policy.service.limit.RateLimitPolicy;
import com.jd.live.agent.governance.policy.service.limit.SlidingWindow;

import java.util.concurrent.TimeUnit;

/**
* SmoothBurstyLimiter
* <p>
Expand All @@ -37,8 +35,7 @@ public SmoothBurstyLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWin
}

@Override
protected double getMaxPermits() {
long maxBurstSeconds = option.getPositive(KEY_MAX_BURST_SECONDS, DEFAULT_MAX_BURST_SECONDS);
return TimeUnit.SECONDS.toMicros(maxBurstSeconds) / stableIntervalMicros;
protected double getMaxStoredPermits() {
return getPermits(option.getPositive(KEY_MAX_BURST_SECONDS, DEFAULT_MAX_BURST_SECONDS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import com.jd.live.agent.governance.policy.service.limit.RateLimitPolicy;
import com.jd.live.agent.governance.policy.service.limit.SlidingWindow;

import java.util.concurrent.TimeUnit;

import static java.lang.Math.min;

/**
Expand Down Expand Up @@ -53,42 +51,42 @@ public SmoothWarmupLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWin

@Override
protected void initialize() {
this.warmupMicros = TimeUnit.SECONDS.toMicros(option.getPositive(KEY_WARMUP_SECONDS, DEFAULT_WARMUP_SECONDS));
this.thresholdPermits = 0.5 * warmupMicros / stableIntervalMicros;
this.coldIntervalMicros = stableIntervalMicros * option.getPositive(KEY_COLD_FACTOR, DEFAULT_COLD_FACTOR);
this.warmupMicros = option.getPositive(KEY_WARMUP_SECONDS, DEFAULT_WARMUP_SECONDS) * MICROSECOND_OF_ONE_SECOND;
this.thresholdPermits = 0.5 * warmupMicros / permitIntervalMicros;
this.coldIntervalMicros = permitIntervalMicros * option.getPositive(KEY_COLD_FACTOR, DEFAULT_COLD_FACTOR);
super.initialize();
this.slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
this.slope = (coldIntervalMicros - permitIntervalMicros) / (maxStoredPermits - thresholdPermits);
}

@Override
protected double getMaxPermits() {
return thresholdPermits + 2.0 * warmupMicros / (stableIntervalMicros + coldIntervalMicros);
protected double getMaxStoredPermits() {
return thresholdPermits + 2.0 * warmupMicros / (permitIntervalMicros + coldIntervalMicros);
}

@Override
protected long waitForStoredPermits(double storedPermits, double takePermits) {
protected long estimateStorePermitsWaitTime(double storedPermits, double targetPermits) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// measuring the integral on the right part of the function (the climbing line)
if (availablePermitsAboveThreshold > 0.0) {
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, takePermits);
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, targetPermits);
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
takePermits -= permitsAboveThresholdToTake;
targetPermits -= permitsAboveThresholdToTake;
}
// measuring the integral on the left part of the function (the horizontal line)
micros += (long) (stableIntervalMicros * takePermits);
micros += (long) (permitIntervalMicros * targetPermits);
return micros;
}

@Override
protected double coolDownIntervalMicros() {
return warmupMicros / maxPermits;
return warmupMicros / maxStoredPermits;
}

private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
return permitIntervalMicros + permits * slope;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,36 +32,35 @@
*/
public abstract class TokenBucketLimiter extends AbstractRateLimiter {

private static final int DEFAULT_SECOND_PERMITS = 1000;
protected static final int TIMEOUT = Integer.MIN_VALUE;

protected final SleepingStopwatch stopwatch;

/**
* The maximum number of stored permits.
*/
protected double maxPermits;
protected double maxStoredPermits;

/**
* The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
* per second has a stable interval of 200ms.
* The time interval (in microseconds) between each permit
*/
protected double stableIntervalMicros;

protected long nextFreeTicketMicros;

protected final Object mutex = new Object();
protected double permitIntervalMicros;

/**
* The currently stored permits.
*/
private double storedPermits;
protected double storedPermits;

protected long nextPermitMicros;

protected final Object mutex = new Object();

public TokenBucketLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWindow) {
super(limitPolicy, TimeUnit.MILLISECONDS);
this.stopwatch = SleepingStopwatch.createFromSystemTimer();
double secondPermits = slidingWindow.getSecondPermits();
this.stableIntervalMicros = secondPermits <= 0 ? DEFAULT_SECOND_PERMITS : TimeUnit.SECONDS.toMicros(1L) / secondPermits;
this.permitIntervalMicros = slidingWindow.getPermitIntervalMicros();
initialize();
update(stopwatch.readMicros());
refresh(stopwatch.readMicros());
}

@Override
Expand Down Expand Up @@ -90,7 +89,10 @@ protected boolean doAcquire(int permits, long nowMicros, long timeoutMicros) {
if (isTimeout(nowMicros, timeoutMicros)) {
return false;
}
microsToWait = computeWaitFor(permits, nowMicros);
microsToWait = estimateRequiredPermitsWaitTime(permits, nowMicros, timeoutMicros);
}
if (microsToWait == TIMEOUT) {
return false;
}
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return true;
Expand All @@ -100,15 +102,15 @@ protected boolean doAcquire(int permits, long nowMicros, long timeoutMicros) {
* Initializes the rate limiter by setting the maximum number of permits.
*/
protected void initialize() {
this.maxPermits = getMaxPermits();
this.maxStoredPermits = getMaxStoredPermits();
}

/**
* Calculates and returns the maximum number of permits that can be accumulated.
*
* @return the maximum number of permits
*/
protected abstract double getMaxPermits();
protected abstract double getMaxStoredPermits();

/**
* Checks if the current time is before the timeout time for acquiring a free ticket.
Expand All @@ -118,7 +120,7 @@ protected void initialize() {
* @return true if the current time is before the timeout time, false otherwise
*/
protected boolean isTimeout(long nowMicros, long timeoutMicros) {
return nextFreeTicketMicros > nowMicros + timeoutMicros;
return nextPermitMicros > nowMicros + timeoutMicros;
}

/**
Expand All @@ -131,69 +133,88 @@ protected boolean isFull() {
}

/**
* Waits for the specified number of permits to become available.
* Estimates the wait time required to acquire the specified number of permits.
*
* @param permits the number of permits to wait for
* @param nowMicros the current time in microseconds
* @return the time waited in microseconds, or 0 if no wait was necessary
* @param permits The number of permits to acquire.
* @param startTime The request start time in microseconds.
* @param timeoutMicros The timeout time in microseconds
* @return The estimated wait time in microseconds, or 0 if no wait is required.
*/
protected long computeWaitFor(long permits, long nowMicros) {
long momentAvailable = waitForEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
protected long estimateRequiredPermitsWaitTime(long permits, long startTime, long timeoutMicros) {
// update stored permits according to the current time
long nowMicros = stopwatch.readMicros();
refresh(nowMicros);
// compute wait time
double available = min(permits, storedPermits);
double lack = permits - available;
long waitTime = estimateStorePermitsWaitTime(storedPermits, available) + (long) (lack * permitIntervalMicros);
// adjust wait time to facilitate pre-fetching
long result = adjustRequiredPermitsWaitTime(startTime, timeoutMicros, nowMicros, waitTime);
if (result == TIMEOUT) {
// it's timeout.
return TIMEOUT;
}
// update next token time
nextPermitMicros = saturatedAdd(nextPermitMicros, waitTime);
storedPermits -= available;
return result;
}

/**
* Waits for the specified number of permits to become available, returning the time at which the earliest permit will be available.
* Adjusts the required wait time for acquiring permits based on the current time and the next token time.
*
* @param permits the number of permits to wait for
* @param nowMicros the current time in microseconds
* @return the time at which the earliest permit will be available, in microseconds
* @param startTime The request start time in microseconds.
* @param waitTime The original wait time (in microseconds). This parameter is not used in the calculation.
* @param nowMicros The current time in microseconds
* @param timeoutMicros The timeout time in microseconds
* @return The adjusted wait time (in microseconds), which is guaranteed to be non-negative.
*/
protected long waitForEarliestAvailable(long permits, long nowMicros) {
update(nowMicros);
long returnValue = nextFreeTicketMicros;

double available = min(permits, storedPermits);
double lack = permits - available;
long waitMicros = waitForStoredPermits(storedPermits, available) + (long) (lack * stableIntervalMicros);

nextFreeTicketMicros = saturatedAdd(nextFreeTicketMicros, waitMicros);
storedPermits -= available;
return returnValue;
protected long adjustRequiredPermitsWaitTime(long startTime, long timeoutMicros, long nowMicros, long waitTime) {
return max(nextPermitMicros - startTime, 0);
}

/**
* Waits for the specified number of stored permits to become available.
*
* @param storedPermits the current number of stored permits
* @param permitsToTake the number of permits to wait for
* @param targetPermits the number of permits to wait for
* @return the time waited in microseconds
*/
protected long waitForStoredPermits(double storedPermits, double permitsToTake) {
protected long estimateStorePermitsWaitTime(double storedPermits, double targetPermits) {
return 0L;
}

/**
* Returns the number of microseconds during cool down that we have to wait to get a new permit.
*/
protected double coolDownIntervalMicros() {
return stableIntervalMicros;
return permitIntervalMicros;
}

/**
* Updates the internal state of the rate limiter based on the current time.
*
* @param nowMicros the current time in microseconds
* Refresh permits based on the current time.
* @param nowMicros the current time in microseconds
*/
protected void update(long nowMicros) {
if (nowMicros > nextFreeTicketMicros) {
// if nextFreeTicket is in the past.
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
protected void refresh(long nowMicros) {
if (nowMicros > nextPermitMicros) {
// if nextTokenMicros is in the past.
double permits = (nowMicros - nextPermitMicros) / coolDownIntervalMicros();
permits = storedPermits + permits;
storedPermits = maxStoredPermits <= 0 ? permits : min(maxStoredPermits, permits);
nextPermitMicros = nowMicros;
}
}

/**
* Converts the given time duration (in seconds) into the equivalent number of permits.
*
* @param seconds The time duration (in seconds) for which the permits are to be calculated. Must be a positive value.
* @return The number of permits that can be acquired within the specified time duration.
*/
protected double getPermits(long seconds) {
return seconds * MICROSECOND_OF_ONE_SECOND / permitIntervalMicros;
}

/**
* Adds two long values, handling overflow by returning the maximum or minimum value.
*
Expand Down
Loading

0 comments on commit 67c04c6

Please sign in to comment.