[improve][pip] PIP-406: Introduce metrics related to dispatch_throttled_count #23945
Conversation
…led_msgs and bytes metrics
…a schema and transaction Discussion email: Implementation PR: [improve][pip] PIP-406: Introduce pulsar_subscription_dispatch_throttled_msgs and bytes metrics apache#23945
hi, @RobertIndie @tjiuming @poorbarcode The previous design made it difficult for users to identify where throttling was causing a high backlog. I have rewritten the PIP to separate the metrics at the broker, topic, and subscription levels. Please review it again. Thanks. |
pip/pip-406.md
Outdated
-    localDispatchRateLimiterOnMessage.consumeTokens(numberOfMessages);
+    if (!localDispatchRateLimiterOnMessage.consumeTokensAndCheckIfContainsTokens(numberOfMessages)) {
+        dispatchThrottleMsgCount.increment();
+    }
 }
 AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
 if (byteSize > 0 && localDispatchRateLimiterOnByte != null) {
-    localDispatchRateLimiterOnByte.consumeTokens(byteSize);
+    if (!localDispatchRateLimiterOnByte.consumeTokensAndCheckIfContainsTokens(byteSize)) {
+        dispatchThrottleBytesCount.increment();
+    }
This isn't the correct place to check what got throttled. The tokens get consumed after the messages have already been sent. Throttling happens here (example for PersistentDispatcherMultipleConsumers):
Lines 565 to 615 in 28f7845

if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
    if (topic.getBrokerDispatchRateLimiter().isPresent()) {
        DispatchRateLimiter brokerRateLimiter = topic.getBrokerDispatchRateLimiter().get();
        Pair<Integer, Long> calculateToRead =
                updateMessagesToRead(brokerRateLimiter, messagesToRead, bytesToRead);
        messagesToRead = calculateToRead.getLeft();
        bytesToRead = calculateToRead.getRight();
        if (messagesToRead == 0 || bytesToRead == 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] message-read exceeded broker message-rate {}/{}, schedule after a {}", name,
                        brokerRateLimiter.getDispatchRateOnMsg(), brokerRateLimiter.getDispatchRateOnByte(),
                        MESSAGE_RATE_BACKOFF_MS);
            }
            reScheduleRead();
            return Pair.of(-1, -1L);
        }
    }
    if (topic.getDispatchRateLimiter().isPresent()) {
        DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter().get();
        Pair<Integer, Long> calculateToRead =
                updateMessagesToRead(topicRateLimiter, messagesToRead, bytesToRead);
        messagesToRead = calculateToRead.getLeft();
        bytesToRead = calculateToRead.getRight();
        if (messagesToRead == 0 || bytesToRead == 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name,
                        topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(),
                        MESSAGE_RATE_BACKOFF_MS);
            }
            reScheduleRead();
            return Pair.of(-1, -1L);
        }
    }
    if (dispatchRateLimiter.isPresent()) {
        Pair<Integer, Long> calculateToRead =
                updateMessagesToRead(dispatchRateLimiter.get(), messagesToRead, bytesToRead);
        messagesToRead = calculateToRead.getLeft();
        bytesToRead = calculateToRead.getRight();
        if (messagesToRead == 0 || bytesToRead == 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] message-read exceeded subscription message-rate {}/{}, schedule after a {}",
                        name, dispatchRateLimiter.get().getDispatchRateOnMsg(),
                        dispatchRateLimiter.get().getDispatchRateOnByte(),
                        MESSAGE_RATE_BACKOFF_MS);
            }
            reScheduleRead();
            return Pair.of(-1, -1L);
        }
    }
If the counter is recorded here, it will produce a huge throttled count, because once rate limiting occurs there will be multiple repeated read requests (asyncRead) in a short period.
> If the counter is recorded here, it will produce a huge throttled count, because once rate limiting occurs there will be multiple repeated read requests (asyncRead) in a short period.

@shibd What I meant to say earlier is that if you handle the metric this way, it's a completely bogus metric. The reason is that when the tokens are consumed, the messages have already been sent. The metric would be tracking sent messages and bytes, not throttled messages and bytes.
I understand that when throttling occurs, the same messages might get throttled again. You would need to address that problem if you want this metric to count the same entry only once.
Duplication in the throttling code was recently eliminated, and there's now a single location in the code base where the dispatch rate limiter throttling happens; it's here:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
Lines 417 to 428 in efcf7c2
if (readLimits.getLeft() == 0 || readLimits.getRight() == 0) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] message-read exceeded {} message-rate {}/{}, schedule after {}ms", getName(),
                limiterType.name().toLowerCase(),
                rateLimiter.getDispatchRateOnMsg(), rateLimiter.getDispatchRateOnByte(),
                MESSAGE_RATE_BACKOFF_MS);
    }
    reScheduleRead();
    readLimits.setLeft(-1);
    readLimits.setRight(-1L);
    return false;
}
@shibd I voted -1 on the PIP due to this issue; more details are in the mailing list message: https://lists.apache.org/thread/6447tlcvbrkszqnb24q5q4xsc4yc08zb .
Yes, from the name of the metric, it does seem ambiguous.
Initially, I added it during reading, but then I thought about it and realized that for metrics like throttled_count, it can only represent the degree of throttling. So, I added it when consuming tokens rather than before reading messages. If a limit throttles, there will definitely be a shortage when consuming tokens, and we can then increment the metric. Whether it solves the frequent-reading issue wasn't my main focus at the time.
We can discuss:
Regarding the metric for message throttling counts, we can't obtain an exact value, right? Even with the new code, you've changed the logic: delaying the re-read when messages can't be read at all. However, if we count the times here, it can only represent a degree of throttling, because the metric's count is also related to the number of reads.
I don't have a strong opinion (I think adding it in either place would be fine). If you can explain more insights, I can learn from them.
For now, I'll follow your suggestion and add the metric during reading.
> Initially, I added it during reading, but then I thought about it and realized that for metrics like throttled_count, it can only represent the degree of throttling. So, I added it when consuming tokens rather than before reading messages. If a limit throttles, there will definitely be a shortage when consuming tokens, and we can then increment the metric.

Yes, that's completely understandable and logical.

> Regarding the metric for message throttling counts, we can't obtain an exact value, right?

Do you mean the exact value of how many messages and bytes get throttled? Yes, it doesn't seem to be possible, since the dispatch rate limiter will set limits on how many messages could be read. At the time the limit is set, we don't know whether there would be messages to be sent out.
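To make that concrete, here is a purely illustrative sketch of why only throttle events can be counted: the limiter clamps the read request before the read is issued, and at that point nothing knows how many messages would actually have been dispatched. The `Permits` record and `clampRead` helper are hypothetical placeholders, not the real DispatchRateLimiter API:

```java
// Illustrative only: the read request is clamped by whatever permits are left.
// At clamp time we do not know how many messages actually exist to dispatch,
// so an exact "throttled messages/bytes" value cannot be derived here.
public final class ReadLimitExample {

    // Hypothetical stand-in for a dispatch rate limiter's remaining permits.
    record Permits(long messages, long bytes) {}

    static long[] clampRead(long messagesToRead, long bytesToRead, Permits permits) {
        long msgs = Math.min(messagesToRead, permits.messages());
        long bytes = Math.min(bytesToRead, permits.bytes());
        return new long[] {msgs, bytes};
    }

    public static void main(String[] args) {
        // The dispatcher wants to read up to 100 messages / 5 MB, but the limiter
        // has no message permits left for this period.
        long[] limits = clampRead(100, 5 * 1024 * 1024, new Permits(0, 1024 * 1024));
        // limits[0] == 0, so the read is rescheduled; we can count one throttle event,
        // but not how many messages were held back, because no read was issued at all.
        System.out.println("messagesToRead=" + limits[0] + ", bytesToRead=" + limits[1]);
    }
}
```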
> Even with the new code, you've changed the logic: delaying the re-read when messages can't be read at all.

The new code in #24012 didn't really change the core logic in this area. It removes code duplication that was present in multiple dispatcher classes and consolidates the logic into a single method in AbstractBaseDispatcher.

> However, if we count the times here, it can only represent a degree of throttling, because the metric's count is also related to the number of reads.

Exactly.

> I don't have a strong opinion (I think adding it in either place would be fine).

Yes, it's definitely valuable to add the counter to be able to observe when throttling is occurring.

> If you can explain more insights, I can learn from them.

Not directly related, but I just created #24036 about making the fixed 1-second backoff period configurable and what would need to be taken into account if we were to change that.
Agreed with these, thanks.
When you have time, please take another look at the PIP. I think we can start the vote again.
@shibd You found a great solution! I like it. There's a remaining issue about naming in the PIP document, and another issue explained later in this message.
For example, where it says "total count of bytes throttled," it's currently misleading:

> Introduce the metric pulsar_broker_dispatch_throttled_bytes_count to represent the total count of bytes throttled for a broker.

Instead of "total count of bytes throttled," it's actually the "total number of times bytes were throttled."
Here is the PIP document as revised by Claude: https://gist.github.com/lhotari/9e971fb7ac89d73a97d4de8724bc4359 . I'm not exactly sure how accurate the changes are; I asked it to address the "total count of bytes throttled" issue.
I brought up one more detail in the mailing list message:

> For better observability, I'd suggest including a property in the metric that indicates the throttling reason (broker, topic, or subscription level rate limiter). This would eliminate the need for separate metrics while still providing all the necessary information in a single, clean metric.

After thinking about it again, the presented metrics themselves in the PIP document are fine. There's a problem in the implementation: you lose information when the subscription gets throttled due to rate limiting at the broker or topic level. The problem is that when you go to look at the subscription metrics, you wouldn't find out that the subscription was throttled if it got throttled at the broker or topic level. Do you agree that this is a problem?
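To illustrate the "throttling reason" idea from the quoted mailing list message (this is just a sketch, not a concrete proposal for the PIP): with the OpenTelemetry API, a single subscription-level counter could carry an attribute naming the limiter that caused the throttling, so broker- and topic-level throttling still shows up when looking at a subscription's metrics. The metric name, attribute key, and class below are made up for the example:

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;

// Sketch: one counter with a "reason" attribute instead of separate metrics.
// Names are illustrative, not the ones proposed in PIP-406.
public class SubscriptionThrottleMetricExample {

    private static final AttributeKey<String> REASON = AttributeKey.stringKey("throttle.reason");

    private final LongCounter throttleEvents;

    public SubscriptionThrottleMetricExample(Meter meter) {
        this.throttleEvents = meter
                .counterBuilder("pulsar.subscription.dispatch.throttle.events")
                .setDescription("Number of times dispatching on this subscription was throttled")
                .setUnit("{event}")
                .build();
    }

    // Called when a read for this subscription gets throttled; "reason" names
    // the limiter that triggered it: "broker", "topic" or "subscription".
    public void recordThrottleEvent(String reason) {
        throttleEvents.add(1, Attributes.of(REASON, reason));
    }

    public static void main(String[] args) {
        Meter meter = GlobalOpenTelemetry.getMeter("example");
        SubscriptionThrottleMetricExample metrics = new SubscriptionThrottleMetricExample(meter);
        // A read throttled by the topic-level rate limiter still shows up under the
        // subscription's metric, just with a different reason value.
        metrics.recordThrottleEvent("topic");
    }
}
```

With a reason attribute like this, the subscription-level view would still reveal throttling that originates from the broker- or topic-level limiter.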