
[fix][broker] PIP-322 Fix issue with rate limiters where rates can exceed limits initially and consumption pauses until token balance is positive #24012

Conversation

lhotari
Member

@lhotari lhotari commented Feb 21, 2025

Fixes #23920
Fixes #24001

Motivation

"PIP-322: Pulsar Rate Limiting Refactoring" changes caused a regression in dispatcher rate limiter and publish rate limiter behavior. When consumption starts, the rate can initially exceed limits, and consumption pauses until the token balance has caught up.

Due to the design of AsyncTokenBucket, the token balance is updated lazily as part of the usage calls, so no separate scheduler is required to update the token balance.

In the initial PIP-322 implementation, a consistent token balance calculation was avoided, on the assumption that calculating a consistent token balance every 16 milliseconds (configurable) would be sufficient. This assumption caused problems: within a 16-millisecond window, the rate limiter could let a lot of traffic through, which was then accounted for by throttling once the token balance was updated. The tests DispatchRateLimiterOverconsumingTest and PublishRateLimiterOverconsumingTest added in this PR reproduce this issue and will help prevent future regressions in this area.
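
To make the failure mode concrete, below is a minimal sketch of a lazily updated token bucket in the spirit of AsyncTokenBucket. It is illustrative only (not the actual Pulsar code), and the names merely echo ones used elsewhere in this PR. Returning the bare tokens field from getTokens() is the eventually consistent shortcut that lets up to 16 milliseconds of traffic through unaccounted; subtracting the pending consumption makes the externally visible balance consistent.

import java.util.concurrent.atomic.LongAdder;

// Illustrative sketch only; the real AsyncTokenBucket is lock-free and more elaborate,
// and fractional-time handling is omitted here for brevity.
class LazyTokenBucketSketch {
    private final long ratePerSecond;
    private final long capacity;
    private long tokens;                  // balance as of lastUpdateNanos
    private long lastUpdateNanos = System.nanoTime();
    // consumption recorded since the balance was last recomputed
    private final LongAdder pendingConsumedTokens = new LongAdder();

    LazyTokenBucketSketch(long ratePerSecond) {
        this.ratePerSecond = ratePerSecond;
        this.capacity = ratePerSecond;    // one rate period of capacity, for simplicity
        this.tokens = capacity;
    }

    // Called on every publish/dispatch: records usage cheaply without recomputing
    // the balance, which is why no separate scheduler is needed.
    void consumeTokens(long n) {
        pendingConsumedTokens.add(n);
    }

    // Externally visible balance; always consistent after the fix in this PR.
    synchronized long getTokens() {
        long now = System.nanoTime();
        long newTokens = (now - lastUpdateNanos) * ratePerSecond / 1_000_000_000L;
        if (newTokens > 0) {
            lastUpdateNanos = now;
            // fold elapsed time and pending consumption into the stored balance;
            // the balance may go negative, which models the pause until it recovers
            tokens = Math.min(tokens + newTokens, capacity) - pendingConsumedTokens.sumThenReset();
        }
        // subtract consumption that has not been folded in yet; returning the bare
        // `tokens` field here is the eventually consistent behavior that over-admitted
        return tokens - pendingConsumedTokens.sum();
    }
}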

Modifications

  • When externally returning the token balance of the AsyncTokenBucket, always return a consistent token balance. Remove the now-unnecessary options for configurable consistency.
    • Keep one configurable option, addTokensResolutionNanos, which controls the interval at which new tokens are added to the token balance.
    • With tokens added once per second (the rate period), as the previous implementation does, the PIP-322 dispatch rate limiter behaves the same way as the original implementation (see the sketch after this list).
  • Make the dispatch rate limiter implementation configurable with a feature flag.
    • Bring back the old dispatch rate limiter implementation and make it selectable.
  • For AsyncTokenBucket, use a clock implementation that delegates directly to System.nanoTime()
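
As referenced in the list above, the addTokensResolutionNanos option can be pictured as quantizing the refill: elapsed time is credited only in whole multiples of the resolution interval. The following is a hedged sketch of that idea (illustrative names, not the Pulsar source):

// Sketch of quantized token addition at a fixed rate; illustrative only.
final class QuantizedRefillSketch {
    private final long addTokensResolutionNanos; // e.g. 1_000_000_000L to mimic the classic limiter
    private final long ratePerSecond;
    private long lastAddNanos = System.nanoTime();

    QuantizedRefillSketch(long addTokensResolutionNanos, long ratePerSecond) {
        this.addTokensResolutionNanos = addTokensResolutionNanos;
        this.ratePerSecond = ratePerSecond;
    }

    // Returns how many new tokens to credit now. Time is consumed only in whole
    // resolution intervals, so a 1-second resolution refills once per rate period
    // while a 16-millisecond resolution refills almost continuously.
    synchronized long tokensToAdd(long nowNanos) {
        long intervals = (nowNanos - lastAddNanos) / addTokensResolutionNanos;
        if (intervals <= 0) {
            return 0;
        }
        lastAddNanos += intervals * addTokensResolutionNanos; // keep the fractional remainder
        // note: integer division drops fractional tokens; a real implementation
        // would account for the remainder
        return intervals * addTokensResolutionNanos * ratePerSecond / 1_000_000_000L;
    }
}

With addTokensResolutionNanos set to one second, this credits tokens once per rate period, matching the classic RateLimiterImpl behavior described above.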

Verifying this change

This PR adds a new test case, DispatchRateLimiterOverconsumingTest, which attempts to reproduce the issue described in #24001. The test runs against both the PIP-322 implementation and the old dispatch rate limiter implementation.

Both implementations currently show similar behavior: during the first second of message receiving, the rate is about 2x the configured rate. This issue doesn't reproduce in exactly the same way on branch-3.0 (experimental commit: lhotari@a602428); however, on branch-3.0 as well, the dispatch rate limiter's behavior is not always stable for the first 2 seconds. The test has been modified to allow this behavior, and the problem can be investigated later, since it is not a regression in the dispatch rate limiters.

This PR also reproduces issue #23920 in a new test case, PublishRateLimiterOverconsumingTest, and demonstrates that this PR addresses the issue.

JMH benchmark results

XPS 15 7590 (2019), i9-9980HK CPU @ 2.40GHz

Benchmark                                                    Mode  Cnt          Score           Error  Units
AsyncTokenBucketBenchmark.consumeTokensBenchmark001Threads  thrpt    3   49256534.199 ±   2696712.077  ops/s
AsyncTokenBucketBenchmark.consumeTokensBenchmark010Threads  thrpt    3  355694311.827 ± 193606239.269  ops/s
AsyncTokenBucketBenchmark.consumeTokensBenchmark100Threads  thrpt    3  439857131.356 ± 148199215.903  ops/s
DefaultMonotonicClockBenchmark.getTickNanos001Threads       thrpt    3   69711113.428 ±   9052629.012  ops/s
DefaultMonotonicClockBenchmark.getTickNanos010Threads       thrpt    3  475899645.775 ± 183135317.282  ops/s
DefaultMonotonicClockBenchmark.getTickNanos100Threads       thrpt    3  616883563.660 ± 269262678.166  ops/s

About 440 million ops/s for the AsyncTokenBucketBenchmark with 100 threads.

On a Mac M3 Max, the results are now poor, possibly due to a System.nanoTime() bottleneck.

Benchmark                                                    Mode  Cnt         Score          Error  Units
AsyncTokenBucketBenchmark.consumeTokensBenchmark001Threads  thrpt    3  84775402.806 ± 21346715.569  ops/s
AsyncTokenBucketBenchmark.consumeTokensBenchmark010Threads  thrpt    3  22746705.158 ±  2899591.678  ops/s
AsyncTokenBucketBenchmark.consumeTokensBenchmark100Threads  thrpt    3  17764575.534 ±  1805527.872  ops/s
DefaultMonotonicClockBenchmark.getTickNanos001Threads       thrpt    3  99204803.147 ±  5349820.538  ops/s
DefaultMonotonicClockBenchmark.getTickNanos010Threads       thrpt    3  22802728.081 ±  3517752.042  ops/s
DefaultMonotonicClockBenchmark.getTickNanos100Threads       thrpt    3  17440254.540 ±   711659.845  ops/s

The DefaultMonotonicClockBenchmark now measures System.nanoTime(). On macOS, performance degrades as more threads are added, which is why the AsyncTokenBucketBenchmark numbers are also poor on macOS.
Previously, the implementation avoided calling System.nanoTime(), which added complexity. Since Pulsar deployments mainly target Linux, there is no need to address this issue now.

The source of the macOS System.nanoTime() slowness can be found in the native code: https://github.com/openjdk/jdk21u/blob/b7d92cd0ae781de6c51ea965c16bd6e0c396e9f7/src/hotspot/os/bsd/os_bsd.cpp#L760-L782
It is explained in https://serce.me/posts/2019-05-16-the-matter-of-time#the-nanos-of-the-current-time
On Java 21, Linux uses the POSIX implementation and doesn't have such a scalability issue: https://github.com/openjdk/jdk21u/blob/b7d92cd0ae781de6c51ea965c16bd6e0c396e9f7/src/hotspot/os/posix/os_posix.cpp#L1414-L1438

This PR also adds instructions for running the JMH benchmarks with async-profiler. Initial profiling confirms the assumption that the macOS System.nanoTime() implementation is the bottleneck.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@rdhabalia
Contributor

rdhabalia commented Feb 21, 2025

Should we block this PR for the same reason you provided in another PR, #24002, where you said it's fixed as part of #23930, and are not prepared to take anyone's feedback and concerns?
First of all, we need to disable this rate limiting for the dispatcher until we have a stable one in place, as it's blocking and impacting many systems.

@lhotari lhotari marked this pull request as draft February 21, 2025 19:23
@lhotari
Member Author

lhotari commented Feb 24, 2025

Should we block this PR for the same reason you provided in another PR, #24002, where you said it's fixed as part of #23930, and are not prepared to take anyone's feedback and concerns?

@rdhabalia I apologize for the communication issues on my end. I'm definitely taking all feedback and concerns seriously, though it does take me some time to address everything properly. Last week I was focused on preparing the upcoming 3.0.10, 3.3.5, and 4.0.3 release candidates which are urgent since many users are waiting for a Pulsar release without the critical vulnerability, CVE-2025-24970.

After taking a deeper look into this issue, I've found that the pre-PIP-322 rate limiter implementation for dispatch rate limiting actually provides more consistent behavior due to several details in how dispatchers work and how AsyncTokenBucket works.

The test case in this PR demonstrates this from an end-to-end perspective. While the excessive negative tokens problem is solved when using AsyncTokenBucket, there's another challenging issue that would require major changes to how dispatchers acquire tokens.

First of all, we need to disable this rate limiting for the dispatcher until we have a stable one in place, as it's blocking and impacting many systems.

@rdhabalia I completely agree with you! I'll revert the changes in the PIP-322 rate limiter related to using AsyncTokenBucket in dispatch rate limiters and handle it in this PR. We'll postpone the 3.3.5 and 4.0.3 releases until this is resolved. I've already aborted the 3.3.5-candidate-1 and 4.0.3-candidate-1 release votes.

@lhotari lhotari force-pushed the lh-fix-dispatcher-rate-limiter-tokens-overuse-issue branch 2 times, most recently from b013512 to ecee9b6, on February 24, 2025 15:49
@lhotari
Member Author

lhotari commented Feb 24, 2025

The test case in this PR demonstrates this from an end-to-end perspective. While the excessive negative tokens problem is solved when using AsyncTokenBucket, there's another challenging issue that would require major changes to how dispatchers acquire tokens.

It seems that there's some other regression in both branch-3.3 and master that causes the first-second rate to be 2x the configured rate. Now that the pre-PIP-322 rate limiter is used in this PR, it is clear that this isn't related to the use of AsyncTokenBucket or to branch-4.0 changes, since the problem is also present in branch-3.3 (when I cherry-pick the changes).

@lhotari
Member Author

lhotari commented Feb 24, 2025

I pushed changes to add a dispatchRateLimiterFactoryClassName setting for toggling between the "classic" (pre-PIP-322) implementation and the PIP-322 implementation that uses AsyncTokenBucket. We can default to the "classic" implementation, which reverts the PIP-322 changes for the dispatch rate limiter until the default configuration value is changed.

@rdhabalia
Contributor

I apologize for the communication issues on my end. I'm definitely taking all feedback and concerns seriously, though it does take me some time to address everything properly.

If that's true, then please rethink this rate limiter. The token-bucket algorithm can be exploited by malicious or greedy clients, and your implementation to prevent that by introducing negative tokens has serious implications: it can delay traffic or make topics unavailable without any reason. I implemented the dispatch rate limiter 3-4 years ago, and we have been using it without any issue, with the highest accuracy and efficiency, including system stability. Yes, I felt that it was very hard for you to take feedback even while discussing this PIP and another PIP proposed by a user. That's why I am not currently proposing a solution, but let's revisit this implementation, because in my opinion it won't work: if negative tokens persist, they will impact the topic's availability, and even if we put a cap on them, the tokens refresh so slowly that users will not get a consistent rate; instead, publish/dispatch happens in chunks, which is not the expected behavior. So please step back, rethink, and let's revisit.

@lhotari
Member Author

lhotari commented Feb 24, 2025

If that's true, then please rethink this rate limiter. The token-bucket algorithm can be exploited by malicious or greedy clients, and your implementation to prevent that by introducing negative tokens has serious implications: it can delay traffic or make topics unavailable without any reason. I implemented the dispatch rate limiter 3-4 years ago, and we have been using it without any issue, with the highest accuracy and efficiency, including system stability. Yes, I felt that it was very hard for you to take feedback even while discussing this PIP and another PIP proposed by a user. That's why I am not currently proposing a solution, but let's revisit this implementation, because in my opinion it won't work: if negative tokens persist, they will impact the topic's availability, and even if we put a cap on them, the tokens refresh so slowly that users will not get a consistent rate; instead, publish/dispatch happens in chunks, which is not the expected behavior. So please step back, rethink, and let's revisit.

@rdhabalia Just to clarify: there's no issue in the AsyncTokenBucket core algorithm itself. The remaining problems after #23930 were caused by the eventual consistency of reading the token balance. It was a mistake to assume that an eventually consistent token balance would be useful; it isn't for most use cases. I'll address that usability issue in AsyncTokenBucket, since the token balance should always be consistent externally.

As mentioned in the earlier PR comments, there is some change since branch-3.0 that causes the current test to fail. The rate for the first second is about 2x the configured rate. I'll dig more into that.

@lhotari
Member Author

lhotari commented Feb 24, 2025

@rdhabalia One of the key behavioral differences is that, by default, AsyncTokenBucket adds tokens to the bucket very frequently (every 16 milliseconds) based on the elapsed time, while the "classic" RateLimiterImpl adds tokens once per rate period (1 second). In commit 9c0e4cf, I made changes so that the AsyncTokenBucket implementation behaves very similarly to RateLimiterImpl. The rate during the first second still results in a 2x rate. You can see the behavior for yourself with gh pr checkout 24012 and by running the org.apache.pulsar.broker.service.DispatchRateLimiterOverconsumingTest test, which now runs both implementations.

@rdhabalia
Contributor

I pushed changes to add a dispatchRateLimiterFactoryClassName setting for toggling between the "classic" (pre-PIP-322) implementation and the PIP-322 implementation

Seems like a use case for a pluggable rate-limiter interface. Please see the discussion below, where someone again refused to take the feedback.
https://lists.apache.org/thread/lz57m8smqolc34h6r5nymd8ppxgv26p7

We made an interface for obtaining the time from System.nanoTime() via a LongSupplier clockSource() interface, but refused to add an interface for the rate-limiter implementation, which impacts business-critical use cases.
Anyway, let's go ahead and revert the rate-limiting behavior ASAP so it can unblock the many mission-critical systems that depend on Pulsar.

@lhotari lhotari changed the title [WIP][fix][broker] Fix issue with dispatcher rate limiters where rates can exceed limits initially and consumption pauses until token balance is positive [fix][broker] PIP-322 Fix issue with rate limiters where rates can exceed limits initially and consumption pauses until token balance is positive Feb 24, 2025
@lhotari lhotari marked this pull request as ready for review February 24, 2025 22:11
@codecov-commenter

codecov-commenter commented Feb 24, 2025

Codecov Report

Attention: Patch coverage is 71.50685% with 104 lines in your changes missing coverage. Please review.

Project coverage is 74.20%. Comparing base (bbc6224) to head (5eca3cc).
Report is 935 commits behind head on master.

Files with missing lines Patch % Lines
...ice/persistent/DispatchRateLimiterClassicImpl.java 35.44% 40 Missing and 11 partials ⚠️
...ava/org/apache/pulsar/common/util/RateLimiter.java 75.90% 8 Missing and 12 partials ⚠️
.../pulsar/broker/service/AbstractBaseDispatcher.java 83.33% 4 Missing and 1 partial ⚠️
...rg/apache/pulsar/broker/service/BrokerService.java 50.00% 4 Missing and 1 partial ⚠️
...stent/DispatchRateLimiterAsyncTokenBucketImpl.java 94.02% 2 Missing and 2 partials ⚠️
.../PersistentDispatcherMultipleConsumersClassic.java 50.00% 3 Missing and 1 partial ⚠️
...org/apache/pulsar/broker/qos/AsyncTokenBucket.java 85.71% 1 Missing and 2 partials ⚠️
...r/broker/qos/FinalRateAsyncTokenBucketBuilder.java 25.00% 2 Missing and 1 partial ⚠️
.../persistent/DispatchRateLimiterFactoryClassic.java 40.00% 3 Missing ⚠️
...n/java/org/apache/pulsar/broker/PulsarService.java 66.66% 1 Missing and 1 partial ⚠️
... and 3 more
Additional details and impacted files


@@             Coverage Diff              @@
##             master   #24012      +/-   ##
============================================
+ Coverage     73.57%   74.20%   +0.62%     
+ Complexity    32624    32004     -620     
============================================
  Files          1877     1861      -16     
  Lines        139502   144152    +4650     
  Branches      15299    16420    +1121     
============================================
+ Hits         102638   106961    +4323     
+ Misses        28908    28738     -170     
- Partials       7956     8453     +497     
Flag Coverage Δ
inttests 26.82% <13.42%> (+2.23%) ⬆️
systests 23.18% <13.15%> (-1.15%) ⬇️
unittests 73.71% <71.50%> (+0.86%) ⬆️


Files with missing lines Coverage Δ
...org/apache/pulsar/broker/ServiceConfiguration.java 98.16% <100.00%> (-1.24%) ⬇️
...che/pulsar/broker/qos/AsyncTokenBucketBuilder.java 100.00% <100.00%> (+22.22%) ⬆️
...pache/pulsar/broker/qos/DefaultMonotonicClock.java 100.00% <100.00%> (ø)
...pulsar/broker/qos/DynamicRateAsyncTokenBucket.java 84.61% <100.00%> (+1.28%) ⬆️
...broker/qos/DynamicRateAsyncTokenBucketBuilder.java 62.50% <100.00%> (ø)
...che/pulsar/broker/resourcegroup/ResourceGroup.java 81.14% <100.00%> (-0.89%) ⬇️
...ker/resourcegroup/ResourceGroupPublishLimiter.java 86.95% <100.00%> (ø)
...rg/apache/pulsar/broker/service/AbstractTopic.java 87.91% <100.00%> (-0.08%) ⬇️
.../pulsar/broker/service/PublishRateLimiterImpl.java 88.40% <100.00%> (+2.25%) ⬆️
...tent/NonPersistentDispatcherMultipleConsumers.java 67.46% <ø> (-1.21%) ⬇️
... and 19 more

... and 1026 files with indirect coverage changes

@lhotari
Member Author

lhotari commented Feb 24, 2025

The test case in 088ca04 reproduces issue #23920 and demonstrates that it is now fixed by the changes in this PR.

…he problem is fixed

- behavior is similar for the pre-PIP-322 implementation
@lhotari
Member Author

lhotari commented Feb 25, 2025

Since the tests prove that the problems in AsyncTokenBucket are fixed after all, I'll make the AsyncTokenBucket implementation the default. Users who would like to switch to the previous implementation can set dispatchRateLimiterFactoryClassName=org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryClassic.
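
For illustration, the broker.conf toggle would look roughly like this (a sketch built from the class names mentioned in this thread, not official documentation):

# Default: PIP-322 implementation backed by AsyncTokenBucket
dispatchRateLimiterFactoryClassName=org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryAsyncTokenBucket
# Alternative: pre-PIP-322 "classic" implementation
# dispatchRateLimiterFactoryClassName=org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryClassic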

@rdhabalia
Contributor

Please don't mix two things: a bug fix and new configuration. Can you please create a separate PR for the AsyncTokenBucket algorithm fix and a new PR to add the previous implementation back? Also, you might have to create a PIP for the second PR, as it will add new configuration.

@lhotari
Member Author

lhotari commented Feb 25, 2025

Please don't mix two things: a bug fix and new configuration. Can you please create a separate PR for the AsyncTokenBucket algorithm fix and a new PR to add the previous implementation back? Also, you might have to create a PIP for the second PR, as it will add new configuration.

@rdhabalia I understand your concern about separating bug fixes from configuration changes, which is generally good practice. However, given our time constraints, with two urgent releases blocked by this PR, I'd really appreciate it if we could review it as is.

The AsyncTokenBucket class is relatively small (303 lines including the license header), and the actual fix for the consistency issue is quite targeted: just changing the return value to tokens - pendingConsumedTokens.sum() in the else branch of the consumeTokensAndMaybeUpdateTokensBalance method.

During #23930, I made some changes based on incorrect assumptions that I've now removed in this PR. While I understand that splitting this into multiple PRs might seem cleaner, it could actually create more confusion about review order and cause further delays.

If you feel strongly about separating parts like the configurability of the dispatch rate implementation or bringing back the pre-PIP-322 implementation, I'm open to discussing it - I just want to make sure we can move forward efficiently given our release timeline.

@rdhabalia
Contributor

rdhabalia commented Feb 25, 2025

@lhotari they are two different changes: 1. fixing a bug and 2. adding a new implementation.
We must have two separate PRs, without any argument.
Do you still want to discuss this simple thing?

Oh, I must admit, even getting simple things done with you is quite challenging.

it could actually create more confusion about review order and cause further delays.

And don't worry about delay, because we did rush to merge this rate limiting and made Pulsar unstable.

There is no argument: we can't mix two things. Also, you have to create a PIP since this has a configuration change, and we can't break that process.

@codelipenghui @eolivelli is it unreasonable to ask for two separate PRs for two different things?

Contributor

@merlimat merlimat left a comment


👍 Nice work, and thanks for adding the new unit tests to cover the existing behaviors

@merlimat
Contributor

to ask for two separate PRs for two different things?

@rdhabalia
Why are these 2 different things? You asked to have a config switch to retain the old buggy behavior. It was added. I don't understand what the problem is here.

@lhotari lhotari merged commit e547bea into apache:master Feb 25, 2025
56 checks passed
lhotari added a commit that referenced this pull request Feb 25, 2025
…ceed limits initially and consumption pauses until token balance is positive (#24012)

Co-authored-by: Matteo Merli <[email protected]>
(cherry picked from commit e547bea)
lhotari added a commit that referenced this pull request Feb 25, 2025
…ceed limits initially and consumption pauses until token balance is positive (#24012)

Co-authored-by: Matteo Merli <[email protected]>
(cherry picked from commit e547bea)
@rdhabalia
Contributor

rdhabalia commented Feb 25, 2025

@merlimat
It was again unexpected from you.

First, this PR had a configuration change that required the creation of a PIP. Second, it had two separate things: a) fixing a bug in the async-token implementation and b) reverting to the old implementation.

This PR has 52 file changes; it was super difficult to find out which file has the bug fix. I have been spending time to help in reporting and fixing the issue. You didn't even care to listen to and consider the feedback, because of which @lhotari started working on it again. I was raising many concerns, but everything was completely ignored and you just went ahead and merged the PR. It's completely unacceptable.

You block PRs from others without any reason, and here I tried to avoid that destructive practice and didn't block it, but you just merged it.

@BewareMyPower
Contributor

this PR had a configuration change that required the creation of a PIP.

# org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryAsyncTokenBucket (default, PIP-322 implementation)
# and org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryClassic (legacy implementation)

I see the documentation here, but I cannot see what the difference between them is. The difference should be documented in a PIP from a high-level perspective. Or, at least, if this is a missed part of PIP-322, we should update https://github.com/apache/pulsar/blob/master/pip/pip-322.md to explain why we decided to add such a config.

I didn't have a chance to go through all the conversations above, but it seems that the reason is that the implementation of PIP-322 was reported to have many bugs. Though they seem to be fixed by this PR, how can we know this fix doesn't introduce a new bug? The key point is that this feature might not be used in production for some time, so it should still be considered "unstable". We should therefore provide an option for users to switch back to the legacy rate limiter.

@thetumbled
Member

@merlimat It was again unexpected from you.

First, this PR had a configuration change that required the creation of a PIP. Second, it had two separate things: a) fixing a bug in the async-token implementation and b) reverting to the old implementation.

This PR has 52 file changes; it was super difficult to find out which file has the bug fix. I have been spending time to help in reporting and fixing the issue. You didn't even care to listen to and consider the feedback, because of which @lhotari started working on it again. I was raising many concerns, but everything was completely ignored and you just went ahead and merged the PR. It's completely unacceptable.

You block PRs from others without any reason, and here I tried to avoid that destructive practice and didn't block it, but you just merged it.

I can't go through all the conversations above either, but I agree that we should be cautious about critical and huge changes that modify tens of files and thousands of lines of code. This PR was created just a few days ago; more review is needed, especially when there is still unresolved controversy.
We don't need to rush, right? We can't rush to fix one bug and introduce other bugs.

nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Feb 27, 2025
…ceed limits initially and consumption pauses until token balance is positive (apache#24012)

Co-authored-by: Matteo Merli <[email protected]>
(cherry picked from commit e547bea)
(cherry picked from commit 56233ea)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Feb 28, 2025
…ceed limits initially and consumption pauses until token balance is positive (apache#24012)

Co-authored-by: Matteo Merli <[email protected]>
(cherry picked from commit e547bea)
(cherry picked from commit 56233ea)