diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index f43b134eb122a..0bbd37bb21ffc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -21,14 +21,14 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; -import org.apache.pulsar.broker.qos.AsyncTokenBucketBuilder; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.util.RateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,8 +46,8 @@ public enum Type { private final Type type; private final BrokerService brokerService; - private volatile AsyncTokenBucket dispatchRateLimiterOnMessage; - private volatile AsyncTokenBucket dispatchRateLimiterOnByte; + private volatile RateLimiter dispatchRateLimiterOnMessage; + private volatile RateLimiter dispatchRateLimiterOnByte; public DispatchRateLimiter(PersistentTopic topic, Type type) { this(topic, null, type); @@ -77,9 +77,9 @@ public DispatchRateLimiter(BrokerService brokerService) { * @return */ public long getAvailableDispatchRateLimitOnMsg() { - AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage; + RateLimiter localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage; return localDispatchRateLimiterOnMessage == null ? -1 : - Math.max(localDispatchRateLimiterOnMessage.getTokens(), 0); + Math.max(localDispatchRateLimiterOnMessage.getAvailablePermits(), 0); } /** @@ -88,8 +88,9 @@ public long getAvailableDispatchRateLimitOnMsg() { * @return */ public long getAvailableDispatchRateLimitOnByte() { - AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte; - return localDispatchRateLimiterOnByte == null ? -1 : Math.max(localDispatchRateLimiterOnByte.getTokens(), 0); + RateLimiter localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte; + return localDispatchRateLimiterOnByte == null ? -1 : + Math.max(localDispatchRateLimiterOnByte.getAvailablePermits(), 0); } /** @@ -99,13 +100,13 @@ public long getAvailableDispatchRateLimitOnByte() { * @param byteSize */ public void consumeDispatchQuota(long numberOfMessages, long byteSize) { - AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage; + RateLimiter localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage; if (numberOfMessages > 0 && localDispatchRateLimiterOnMessage != null) { - localDispatchRateLimiterOnMessage.consumeTokens(numberOfMessages); + localDispatchRateLimiterOnMessage.tryAcquire(numberOfMessages); } - AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte; + RateLimiter localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte; if (byteSize > 0 && localDispatchRateLimiterOnByte != null) { - localDispatchRateLimiterOnByte.consumeTokens(byteSize); + localDispatchRateLimiterOnByte.tryAcquire(byteSize); } } @@ -221,50 +222,63 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) { long msgRate = dispatchRate.getDispatchThrottlingRateInMsg(); long byteRate = dispatchRate.getDispatchThrottlingRateInByte(); - long ratePeriodNanos = TimeUnit.SECONDS.toNanos(Math.max(dispatchRate.getRatePeriodInSecond(), 1)); + long ratePeriod = dispatchRate.getRatePeriodInSecond(); + Supplier permitUpdaterMsg = dispatchRate.isRelativeToPublishRate() + ? () -> getRelativeDispatchRateInMsg(dispatchRate) + : null; // update msg-rateLimiter if (msgRate > 0) { - if (dispatchRate.isRelativeToPublishRate()) { + if (this.dispatchRateLimiterOnMessage == null) { this.dispatchRateLimiterOnMessage = - configureAsyncTokenBucket(AsyncTokenBucket.builderForDynamicRate()) - .rateFunction(() -> getRelativeDispatchRateInMsg(dispatchRate)) - .ratePeriodNanosFunction(() -> ratePeriodNanos) + RateLimiter.builder() + .scheduledExecutorService(brokerService.pulsar().getExecutor()) + .permits(msgRate) + .rateTime(ratePeriod) + .timeUnit(TimeUnit.SECONDS) + .permitUpdater(permitUpdaterMsg) + .isDispatchOrPrecisePublishRateLimiter(true) .build(); } else { - this.dispatchRateLimiterOnMessage = - configureAsyncTokenBucket(AsyncTokenBucket.builder()) - .rate(msgRate).ratePeriodNanos(ratePeriodNanos) - .build(); + this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.getRatePeriodInSecond(), + TimeUnit.SECONDS, permitUpdaterMsg); } } else { - this.dispatchRateLimiterOnMessage = null; + // message-rate should be disable and close + if (this.dispatchRateLimiterOnMessage != null) { + this.dispatchRateLimiterOnMessage.close(); + this.dispatchRateLimiterOnMessage = null; + } } + Supplier permitUpdaterByte = dispatchRate.isRelativeToPublishRate() + ? () -> getRelativeDispatchRateInByte(dispatchRate) + : null; // update byte-rateLimiter if (byteRate > 0) { - if (dispatchRate.isRelativeToPublishRate()) { + if (this.dispatchRateLimiterOnByte == null) { this.dispatchRateLimiterOnByte = - configureAsyncTokenBucket(AsyncTokenBucket.builderForDynamicRate()) - .rateFunction(() -> getRelativeDispatchRateInByte(dispatchRate)) - .ratePeriodNanosFunction(() -> ratePeriodNanos) + RateLimiter.builder() + .scheduledExecutorService(brokerService.pulsar().getExecutor()) + .permits(byteRate) + .rateTime(ratePeriod) + .timeUnit(TimeUnit.SECONDS) + .permitUpdater(permitUpdaterByte) + .isDispatchOrPrecisePublishRateLimiter(true) .build(); } else { - this.dispatchRateLimiterOnByte = - configureAsyncTokenBucket(AsyncTokenBucket.builder()) - .rate(byteRate).ratePeriodNanos(ratePeriodNanos) - .build(); + this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.getRatePeriodInSecond(), + TimeUnit.SECONDS, permitUpdaterByte); } } else { - this.dispatchRateLimiterOnByte = null; + // message-rate should be disable and close + if (this.dispatchRateLimiterOnByte != null) { + this.dispatchRateLimiterOnByte.close(); + this.dispatchRateLimiterOnByte = null; + } } } - private > T configureAsyncTokenBucket(T builder) { - builder.clock(brokerService.getPulsar().getMonotonicSnapshotClock()); - return builder; - } - private long getRelativeDispatchRateInMsg(DispatchRate dispatchRate) { return (topic != null && dispatchRate != null) ? (long) topic.getLastUpdatedAvgPublishRateInMsg() + dispatchRate.getDispatchThrottlingRateInMsg() @@ -283,7 +297,7 @@ private long getRelativeDispatchRateInByte(DispatchRate dispatchRate) { * @return */ public long getDispatchRateOnMsg() { - AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage; + RateLimiter localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage; return localDispatchRateLimiterOnMessage != null ? localDispatchRateLimiterOnMessage.getRate() : -1; } @@ -293,7 +307,7 @@ public long getDispatchRateOnMsg() { * @return */ public long getDispatchRateOnByte() { - AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte; + RateLimiter localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte; return localDispatchRateLimiterOnByte != null ? localDispatchRateLimiterOnByte.getRate() : -1; } @@ -306,9 +320,11 @@ public static boolean isDispatchRateEnabled(DispatchRate dispatchRate) { public void close() { // close rate-limiter if (dispatchRateLimiterOnMessage != null) { + dispatchRateLimiterOnMessage.close(); dispatchRateLimiterOnMessage = null; } if (dispatchRateLimiterOnByte != null) { + dispatchRateLimiterOnByte.close(); dispatchRateLimiterOnByte = null; } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimitFunction.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimitFunction.java new file mode 100644 index 0000000000000..0a4b62e9500f9 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimitFunction.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +/** + * Function use when rate limiter renew permit. + * */ +public interface RateLimitFunction { + void apply(); +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java new file mode 100644 index 0000000000000..17199664f12d1 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; +import com.google.common.base.MoreObjects; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import lombok.Builder; +import lombok.extern.slf4j.Slf4j; + +/** + * A Rate Limiter that distributes permits at a configurable rate. Each {@link #acquire()} blocks if necessary until a + * permit is available, and then takes it. Each {@link #tryAcquire()} tries to acquire permits from available permits, + * it returns true if it succeed else returns false. Rate limiter release configured permits at every configured rate + * time, so, on next ticket new fresh permits will be available. + * + *

For example: if RateLimiter is configured to release 10 permits at every 1 second then RateLimiter will allow to + * acquire 10 permits at any time with in that 1 second. + * + *

Comparison with other RateLimiter such as {@link com.google.common.util.concurrent.RateLimiter} + *

    + *
  • Per second rate-limiting: Per second rate-limiting not satisfied by Guava-RateLimiter
  • + *
  • Guava RateLimiter: For X permits: it releases X/1000 permits every msec. therefore, + * for permits=2/sec => it release 1st permit on first 500msec and 2nd permit on next 500ms. therefore, + * if 2 request comes with in 500msec duration then 2nd request fails to acquire permit + * though we have configured 2 permits/second.
  • + *
  • RateLimiter: it releases X permits every second. so, in above usecase: + * if 2 requests comes at the same time then both will acquire the permit.
  • + *
  • Faster: RateLimiter is light-weight and faster than Guava-RateLimiter
  • + *
+ */ +@Slf4j +public class RateLimiter implements AutoCloseable{ + private final ScheduledExecutorService executorService; + private long rateTime; + private TimeUnit timeUnit; + private final boolean externalExecutor; + private ScheduledFuture renewTask; + private volatile long permits; + private volatile long acquiredPermits; + private boolean isClosed; + // permitUpdate helps to update permit-rate at runtime + private Supplier permitUpdater; + private RateLimitFunction rateLimitFunction; + private boolean isDispatchOrPrecisePublishRateLimiter; + + @Builder + RateLimiter(final ScheduledExecutorService scheduledExecutorService, final long permits, final long rateTime, + final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter, + RateLimitFunction rateLimitFunction) { + checkArgument(permits > 0, "rate must be > 0"); + checkArgument(rateTime > 0, "Renew permit time must be > 0"); + + this.rateTime = rateTime; + this.timeUnit = timeUnit; + this.permits = permits; + this.permitUpdater = permitUpdater; + this.isDispatchOrPrecisePublishRateLimiter = isDispatchOrPrecisePublishRateLimiter; + + if (scheduledExecutorService != null) { + this.executorService = scheduledExecutorService; + this.externalExecutor = true; + } else { + final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, + new DefaultThreadFactory("pulsar-rate-limiter")); + executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + this.executorService = executor; + this.externalExecutor = false; + } + + this.rateLimitFunction = rateLimitFunction; + + } + + // default values for Lombok generated builder class + public static class RateLimiterBuilder { + private long rateTime = 1; + private TimeUnit timeUnit = TimeUnit.SECONDS; + } + + @Override + public synchronized void close() { + if (!isClosed) { + if (!externalExecutor) { + executorService.shutdownNow(); + } + if (renewTask != null) { + renewTask.cancel(false); + } + isClosed = true; + // If there is a ratelimit function registered, invoke it to unblock. + if (rateLimitFunction != null) { + rateLimitFunction.apply(); + } + } + } + + public synchronized boolean isClosed() { + return isClosed; + } + + /** + * Acquires the given number of permits from this {@code RateLimiter}, blocking until the request be granted. + * + *

This method is equivalent to {@code acquire(1)}. + */ + public synchronized void acquire() throws InterruptedException { + acquire(1); + } + + /** + * Acquires the given number of permits from this {@code RateLimiter}, blocking until the request be granted. + * + * @param acquirePermit + * the number of permits to acquire + */ + public synchronized void acquire(long acquirePermit) throws InterruptedException { + checkArgument(!isClosed(), "Rate limiter is already shutdown"); + checkArgument(acquirePermit <= this.permits, + "acquiring permits must be less or equal than initialized rate =" + this.permits); + + // lazy init and start task only once application start using it + if (renewTask == null) { + renewTask = createTask(); + } + + boolean canAcquire = false; + do { + canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; + if (!canAcquire) { + wait(); + } else { + acquiredPermits += acquirePermit; + } + } while (!canAcquire); + } + + /** + * Acquires permits from this {@link RateLimiter} if it can be acquired immediately without delay. + * + *

This method is equivalent to {@code tryAcquire(1)}. + * + * @return {@code true} if the permits were acquired, {@code false} otherwise + */ + public synchronized boolean tryAcquire() { + return tryAcquire(1); + } + + /** + * Acquires permits from this {@link RateLimiter} if it can be acquired immediately without delay. + * + * @param acquirePermit + * the number of permits to acquire + * @return {@code true} if the permits were acquired, {@code false} otherwise + */ + public synchronized boolean tryAcquire(long acquirePermit) { + if (isClosed()) { + log.info("The current rate limiter is already shutdown, acquire permits directly."); + return true; + } + // lazy init and start task only once application start using it + if (renewTask == null) { + renewTask = createTask(); + } + + boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; + if (isDispatchOrPrecisePublishRateLimiter) { + // for dispatch rate limiter just add acquirePermit + acquiredPermits += acquirePermit; + + // we want to back-pressure from the current state of the rateLimiter therefore we should check if there + // are any available premits again + canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; + } else { + // acquired-permits can't be larger than the rate + if (acquirePermit + acquiredPermits > this.permits) { + return false; + } + + if (canAcquire) { + acquiredPermits += acquirePermit; + } + } + + return canAcquire; + } + + /** + * Return available permits for this {@link RateLimiter}. + * + * @return returns 0 if permits is not available + */ + public long getAvailablePermits() { + return Math.max(0, this.permits - this.acquiredPermits); + } + + /** + * Resets new rate by configuring new value for permits per configured rate-period. + * + * @param permits + */ + public synchronized void setRate(long permits) { + this.permits = permits; + } + + /** + * Resets new rate with new permits and rate-time. + * + * @param permits + * @param rateTime + * @param timeUnit + * @param permitUpdaterByte + */ + public synchronized void setRate(long permits, long rateTime, TimeUnit timeUnit, Supplier permitUpdaterByte) { + if (renewTask != null) { + renewTask.cancel(false); + } + this.permits = permits; + this.rateTime = rateTime; + this.timeUnit = timeUnit; + this.permitUpdater = permitUpdaterByte; + this.renewTask = createTask(); + } + + /** + * Returns configured permit rate per pre-configured rate-period. + * + * @return rate + */ + public synchronized long getRate() { + return this.permits; + } + + public synchronized long getRateTime() { + return this.rateTime; + } + + public synchronized TimeUnit getRateTimeUnit() { + return this.timeUnit; + } + + protected ScheduledFuture createTask() { + return executorService.scheduleAtFixedRate(catchingAndLoggingThrowables(this::renew), this.rateTime, + this.rateTime, this.timeUnit); + } + + synchronized void renew() { + acquiredPermits = isDispatchOrPrecisePublishRateLimiter ? Math.max(0, acquiredPermits - permits) : 0; + if (permitUpdater != null) { + long newPermitRate = permitUpdater.get(); + if (newPermitRate > 0) { + setRate(newPermitRate); + } + } + // release the back-pressure by applying the rateLimitFunction only when there are available permits + if (rateLimitFunction != null && this.getAvailablePermits() > 0) { + rateLimitFunction.apply(); + } + notifyAll(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("rateTime", rateTime).add("permits", permits) + .add("acquiredPermits", acquiredPermits).toString(); + } + +} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java new file mode 100644 index 0000000000000..5cbd024556593 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import org.testng.annotations.Test; + +public class RateLimiterTest { + + @Test + public void testInvalidRenewTime() { + try { + RateLimiter.builder().permits(0).rateTime(100).timeUnit(TimeUnit.SECONDS).build(); + fail("should have thrown exception: invalid rate, must be > 0"); + } catch (IllegalArgumentException ie) { + // Ok + } + + try { + RateLimiter.builder().permits(10).rateTime(0).timeUnit(TimeUnit.SECONDS).build(); + fail("should have thrown exception: invalid rateTime, must be > 0"); + } catch (IllegalArgumentException ie) { + // Ok + } + } + + @Test + public void testClose() throws Exception { + RateLimiter rate = RateLimiter.builder().permits(1).rateTime(1000).timeUnit(TimeUnit.MILLISECONDS).build(); + assertFalse(rate.isClosed()); + rate.close(); + assertTrue(rate.isClosed()); + try { + rate.acquire(); + fail("should have failed, executor is already closed"); + } catch (IllegalArgumentException e) { + // ok + } + } + + @Test + public void testAcquireBlock() throws Exception { + final long rateTimeMSec = 1000; + RateLimiter rate = RateLimiter.builder().permits(1).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .build(); + rate.acquire(); + assertEquals(rate.getAvailablePermits(), 0); + long start = System.currentTimeMillis(); + rate.acquire(); + long end = System.currentTimeMillis(); + // no permits are available: need to wait on acquire + assertTrue((end - start) > rateTimeMSec / 2); + rate.close(); + } + + @Test + public void testAcquire() throws Exception { + final long rateTimeMSec = 1000; + final int permits = 100; + RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .build(); + long start = System.currentTimeMillis(); + for (int i = 0; i < permits; i++) { + rate.acquire(); + } + long end = System.currentTimeMillis(); + assertTrue((end - start) < rateTimeMSec); + assertEquals(rate.getAvailablePermits(), 0); + rate.close(); + } + + @Test + public void testMultipleAcquire() throws Exception { + final long rateTimeMSec = 1000; + final int permits = 100; + final int acquirePermits = 50; + RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .build(); + long start = System.currentTimeMillis(); + for (int i = 0; i < permits / acquirePermits; i++) { + rate.acquire(acquirePermits); + } + long end = System.currentTimeMillis(); + assertTrue((end - start) < rateTimeMSec); + assertEquals(rate.getAvailablePermits(), 0); + rate.close(); + } + + @Test + public void testTryAcquireNoPermits() { + final long rateTimeMSec = 1000; + RateLimiter rate = RateLimiter.builder().permits(1).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .build(); + assertTrue(rate.tryAcquire()); + assertFalse(rate.tryAcquire()); + assertEquals(rate.getAvailablePermits(), 0); + rate.close(); + } + + @Test + public void testTryAcquire() { + final long rateTimeMSec = 1000; + final int permits = 100; + RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .build(); + for (int i = 0; i < permits; i++) { + rate.tryAcquire(); + } + assertEquals(rate.getAvailablePermits(), 0); + rate.close(); + } + + @Test + public void testTryAcquireMoreThanPermits() { + final long rateTimeMSec = 1000; + RateLimiter rate = RateLimiter.builder().permits(3).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .build(); + assertTrue(rate.tryAcquire(2)); + assertEquals(rate.getAvailablePermits(), 1); + + //try to acquire failed, not decrease availablePermits. + assertFalse(rate.tryAcquire(2)); + assertEquals(rate.getAvailablePermits(), 1); + + assertTrue(rate.tryAcquire(1)); + assertEquals(rate.getAvailablePermits(), 0); + + rate.close(); + } + + @Test + public void testMultipleTryAcquire() { + final long rateTimeMSec = 1000; + final int permits = 100; + final int acquirePermits = 50; + RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .build(); + for (int i = 0; i < permits / acquirePermits; i++) { + rate.tryAcquire(acquirePermits); + } + assertEquals(rate.getAvailablePermits(), 0); + rate.close(); + } + + @Test + public void testResetRate() throws Exception { + final long rateTimeMSec = 1000; + final int permits = 100; + RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .build(); + rate.tryAcquire(permits); + assertEquals(rate.getAvailablePermits(), 0); + // check after a rate-time: permits must be renewed + Thread.sleep(rateTimeMSec * 2); + assertEquals(rate.getAvailablePermits(), permits); + + // change rate-time from 1sec to 5sec + rate.setRate(permits, 5 * rateTimeMSec, TimeUnit.MILLISECONDS, null); + assertEquals(rate.getAvailablePermits(), 100); + assertTrue(rate.tryAcquire(permits)); + assertEquals(rate.getAvailablePermits(), 0); + // check after a rate-time: permits can't be renewed + Thread.sleep(rateTimeMSec); + assertEquals(rate.getAvailablePermits(), 0); + + rate.close(); + } + + @Test + public void testDispatchRate() throws Exception { + final long rateTimeMSec = 1000; + final int permits = 100; + RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS) + .isDispatchOrPrecisePublishRateLimiter(true) + .build(); + rate.tryAcquire(100); + rate.tryAcquire(100); + rate.tryAcquire(100); + assertEquals(rate.getAvailablePermits(), 0); + + Thread.sleep(rateTimeMSec * 2); + // check after two rate-time: acquiredPermits is 100 + assertEquals(rate.getAvailablePermits(), 0); + + Thread.sleep(rateTimeMSec); + // check after three rate-time: acquiredPermits is 0 + assertTrue(rate.getAvailablePermits() > 0); + + rate.close(); + } + + @Test + public void testRateLimiterWithPermitUpdater() throws Exception { + long permits = 10; + long rateTime = 1; + long newUpdatedRateLimit = 100L; + Supplier permitUpdater = () -> newUpdatedRateLimit; + RateLimiter limiter = RateLimiter.builder().permits(permits).rateTime(1).timeUnit(TimeUnit.SECONDS) + .permitUpdater(permitUpdater) + .build(); + limiter.acquire(); + Thread.sleep(rateTime * 3 * 1000); + assertEquals(limiter.getAvailablePermits(), newUpdatedRateLimit); + } + + @Test + public void testRateLimiterWithFunction() { + final AtomicInteger atomicInteger = new AtomicInteger(0); + long permits = 10; + long rateTime = 1; + int reNewTime = 3; + RateLimitFunction rateLimitFunction = atomicInteger::incrementAndGet; + RateLimiter rateLimiter = RateLimiter.builder().permits(permits).rateTime(rateTime).timeUnit(TimeUnit.SECONDS) + .rateLimitFunction(rateLimitFunction) + .build(); + for (int i = 0; i < reNewTime; i++) { + rateLimiter.renew(); + } + assertEquals(reNewTime, atomicInteger.get()); + } + +}