Skip to content

Commit

Permalink
Merge pull request #190 from jd-opensource/189-fix-the-issue-of-overl…
Browse files Browse the repository at this point in the history
…y-long-synchronized-block-in-tokenlimiter

Fix limiter synchronized
  • Loading branch information
hexiaofeng authored Jan 2, 2025
2 parents 050b7c4 + 3e63b98 commit 9b2dcee
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.jd.live.agent.governance.policy.service.limit.RateLimitPolicy;
import lombok.Getter;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -39,48 +38,59 @@ public abstract class AbstractRateLimiter implements RateLimiter {
@Getter
protected final RateLimitPolicy policy;

/**
* The default time unit.
*/
protected final TimeUnit timeUnit;

/**
* The default timeout duration for permit acquisition.
*/
protected final Duration timeout;
protected final long timeout;

/**
* The option that contains additional settings that may affect the behavior of the rate limiter.
*/
protected final Option option;

@Getter
private long lastAcquireTime;
protected long lastAcquireTime;

/**
* Constructs a new AbstractRateLimiter with the specified rate limit policy.
*
* @param policy The rate limit policy to be applied by this rate limiter.
* Constructs an instance of the AbstractRateLimiter class with the given rate limit policy and time unit.
* @param policy the rate limit policy to use
* @param timeUnit the time unit to use for rate limiting
*/
public AbstractRateLimiter(RateLimitPolicy policy) {
public AbstractRateLimiter(RateLimitPolicy policy, TimeUnit timeUnit) {
this.policy = policy;
this.timeUnit = timeUnit;
this.timeout = timeUnit.convert(policy.getMaxWaitMs() == null || policy.getMaxWaitMs() < 0 ? 0 : policy.getMaxWaitMs(), TimeUnit.MILLISECONDS);
this.option = MapOption.of(policy.getParameters());
this.timeout = Duration.ofMillis(policy.getMaxWaitMs() == null || policy.getMaxWaitMs() < 0 ? 0 : policy.getMaxWaitMs());
}

@Override
public boolean acquire() {
this.lastAcquireTime = System.currentTimeMillis();
return acquire(1, timeout.toNanos(), TimeUnit.NANOSECONDS);
return doAcquire(1, timeout, timeUnit);
}

@Override
public boolean acquire(int permits) {
if (permits <= 0) {
return false;
}
this.lastAcquireTime = System.currentTimeMillis();
return doAcquire(permits, timeout.toNanos(), TimeUnit.NANOSECONDS);
return doAcquire(permits, timeout, timeUnit);
}

@Override
public boolean acquire(int permits, long timeout, TimeUnit timeUnit) {
if (permits <= 0) {
return false;
}
this.lastAcquireTime = System.currentTimeMillis();
return doAcquire(permits, timeout, timeUnit);
}

/**
* Try to get some permits within a duration and return the result
*
Expand All @@ -89,6 +99,7 @@ public boolean acquire(int permits, long timeout, TimeUnit timeUnit) {
* @param timeUnit Time unit
* @return result
*/
public abstract boolean doAcquire(int permits, long timeout, TimeUnit timeUnit);
protected abstract boolean doAcquire(int permits, long timeout, TimeUnit timeUnit);

}

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public abstract class AbstractRateLimiterGroup extends AbstractRateLimiter {
* @param policy The rate limit policy to be applied to the limiters in the group.
*/
public AbstractRateLimiterGroup(RateLimitPolicy policy) {
super(policy);
super(policy, TimeUnit.NANOSECONDS);
int i = 0;
for (SlidingWindow window : policy.getSlidingWindows()) {
limiters.add(create(window, policy.getName() + "-" + i++));
Expand All @@ -61,14 +61,15 @@ public AbstractRateLimiterGroup(RateLimitPolicy policy) {
protected abstract RateLimiter create(SlidingWindow window, String name);

@Override
public boolean doAcquire(int permits, long timeout, TimeUnit timeUnit) {
if (permits <= 0) {
return false;
}
protected boolean doAcquire(int permits, long timeout, TimeUnit timeUnit) {
// Convert to nanoseconds to avoid losing precision.
long startTime = System.nanoTime();
timeout = Long.max(0, timeout);
timeout = timeout <= 0 ? 0 : timeUnit.toNanos(timeout);
long expire;
for (RateLimiter limiter : limiters) {
if (!limiter.acquire(permits, timeout - (System.nanoTime() - startTime), timeUnit)) {
expire = timeout - (System.nanoTime() - startTime);
expire = Long.max(0, expire);
if (!limiter.acquire(permits, expire, TimeUnit.NANOSECONDS)) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,18 @@ protected double getMaxPermits() {
}

@Override
protected boolean isTimeout(long nowMicros, long timeoutMicros) {
return super.isTimeout(nowMicros, timeoutMicros) || (capacity > 0 && requests.get() >= capacity);
protected boolean isFull() {
return capacity > 0 && requests.get() >= capacity;
}

@Override
protected void doAcquire(int permits, long nowMicros) {
protected boolean doAcquire(int permits, long nowMicros, long timeoutMicros) {
requests.incrementAndGet();
try {
super.doAcquire(permits, nowMicros);
return super.doAcquire(permits, nowMicros, timeoutMicros);
} finally {
requests.decrementAndGet();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ public abstract class TokenBucketLimiter extends AbstractRateLimiter {

protected long nextFreeTicketMicros;

protected volatile Object mutex;
protected final Object mutex = new Object();

/**
* The currently stored permits.
*/
private double storedPermits;

public TokenBucketLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWindow) {
super(limitPolicy);
super(limitPolicy, TimeUnit.MILLISECONDS);
this.stopwatch = SleepingStopwatch.createFromSystemTimer();
double secondPermits = slidingWindow.getSecondPermits();
this.stableIntervalMicros = secondPermits <= 0 ? DEFAULT_SECOND_PERMITS : TimeUnit.SECONDS.toMicros(1L) / secondPermits;
Expand All @@ -65,32 +65,34 @@ public TokenBucketLimiter(RateLimitPolicy limitPolicy, SlidingWindow slidingWind
}

@Override
public boolean acquire() {
return acquire(1, timeout.toMillis(), TimeUnit.MILLISECONDS);
}

@Override
public boolean acquire(int permits) {
return acquire(permits, timeout.toMillis(), TimeUnit.MILLISECONDS);
}

@Override
public boolean doAcquire(int permits, long timeout, TimeUnit timeUnit) {
if (permits <= 0) {
throw new IllegalArgumentException("Permits must be greater than 0");
}
long timeoutMicros = timeUnit.toMicros(timeout < 0 ? 0 : timeout);
protected boolean doAcquire(int permits, long timeout, TimeUnit timeUnit) {
long timeoutMicros = timeout <= 0 ? 0 : timeUnit.toMicros(timeout);
long nowMicros = stopwatch.readMicros();
if (isTimeout(nowMicros, timeoutMicros)) {
if (isTimeout(nowMicros, timeoutMicros) && isFull()) {
return false;
}
synchronized (mutex()) {
return doAcquire(permits, nowMicros, timeoutMicros);
}

/**
* Attempts to acquire the specified number of permits,
* waiting if necessary until the permits become available or the specified timeout expires.
*
* @param permits the number of permits to acquire
* @param nowMicros the current time in microseconds
* @param timeoutMicros the maximum time to wait in microseconds
* @return true if the permits were acquired, false if the timeout expired
*/
protected boolean doAcquire(int permits, long nowMicros, long timeoutMicros) {
long microsToWait;
synchronized (mutex) {
// double check in lock
if (isTimeout(nowMicros, timeoutMicros)) {
return false;
}
doAcquire(permits, nowMicros);
microsToWait = computeWaitFor(permits, nowMicros);
}
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return true;
}

Expand Down Expand Up @@ -120,14 +122,12 @@ protected boolean isTimeout(long nowMicros, long timeoutMicros) {
}

/**
* Acquires the specified number of permits, blocking until they become available.
* Checks if the current state is full.
*
* @param permits the number of permits to acquire
* @param nowMicros the current time in microseconds
* @return {@code true} if the state is full, {@code false} otherwise.
*/
protected void doAcquire(int permits, long nowMicros) {
// always pay in advance
stopwatch.sleepMicrosUninterruptibly(waitFor(permits, nowMicros));
protected boolean isFull() {
return false;
}

/**
Expand All @@ -137,7 +137,7 @@ protected void doAcquire(int permits, long nowMicros) {
* @param nowMicros the current time in microseconds
* @return the time waited in microseconds, or 0 if no wait was necessary
*/
protected long waitFor(long permits, long nowMicros) {
protected long computeWaitFor(long permits, long nowMicros) {
long momentAvailable = waitForEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
Expand Down Expand Up @@ -194,24 +194,6 @@ protected void update(long nowMicros) {
}
}

/**
* Returns the mutex object used for synchronization.
*
* @return the mutex object
*/
private Object mutex() {
Object mutex = this.mutex;
if (mutex == null) {
synchronized (this) {
mutex = this.mutex;
if (mutex == null) {
this.mutex = mutex = new Object();
}
}
}
return mutex;
}

/**
* Adds two long values, handling overflow by returning the maximum or minimum value.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ public Resilience4jRateLimiter(RateLimitPolicy policy, SlidingWindow window) {
}

public Resilience4jRateLimiter(RateLimitPolicy policy, SlidingWindow window, String name) {
super(policy);
super(policy, TimeUnit.NANOSECONDS);
// Create a RateLimiter
this.limiter = new AtomicRateLimiter(name, RateLimiterConfig.custom()
.timeoutDuration(timeout)
.timeoutDuration(Duration.ofNanos(timeout)) // the timeout is nanoseconds
.limitRefreshPeriod(Duration.ofMillis(window.getTimeWindowInMs()))
.limitForPeriod(window.getThreshold())
.build());
}

@Override
public boolean doAcquire(int permits, long timeout, TimeUnit timeUnit) {
return permits > 0 && limiter.acquirePermission(permits, timeout, timeUnit);
protected boolean doAcquire(int permits, long timeout, TimeUnit timeUnit) {
return limiter.acquirePermission(permits, timeout, timeUnit);
}
}

0 comments on commit 9b2dcee

Please sign in to comment.