Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][pip] PIP-406: Introduce metrics related to dispatch_throttled_count #23945

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions pip/pip-406.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# PIP-406: Introduce pulsar_subscription_dispatch_throttled_msgs and bytes metrics

# Background knowledge

## Motivation

Currently, users can monitor subscription backlogs using the `pulsar_subscription_back_log_no_delayed` metric.
However, if [dispatch throttling](https://pulsar.apache.org/docs/next/concepts-throttling/) is configured at the broker/topic/subscription level,
this metric may not accurately reflect whether the backlog is due to insufficient consumer capacity, as it could be caused by dispatch throttling.

## Goals

Introduce metrics to indicate the number of `messages/bytes throttled` for a subscription. This allows users to write PromQL queries to identify subscriptions with high backlogs but low or no throttling, pinpointing backlogs caused by insufficient consumer capacity.

## In Scope
- Introduce the metric `pulsar_subscription_dispatch_throttled_msgs` to represent the total number of messages throttled for a subscription.
- Introduce the metric `pulsar_subscription_dispatch_throttled_bytes` to represent the total number of bytes throttled for a subscription.
- Add `dispatchThrottledMsgs` and `dispatchThrottledBytes` fields to topic subscription stats.

## Out of Scope
- These states are not persistent and will reset upon subscription reconnection.

# High Level Design
1. Maintain `dispatchThrottledMsgs` and `dispatchThrottledBytes` in `AbstractBaseDispatcher`. Increase these values whenever the number of messages/bytes is reduced during `calculateToRead`.
2. Output these fields when retrieving topic stats and metrics.


# Detailed Design

## Design & Implementation Details
1. Maintain `dispatchThrottledMsgs` and `dispatchThrottledBytes` in `AbstractBaseDispatcher`:
```java
private final LongAdder dispatchThrottledMsgs = new LongAdder();
private final AtomicLong dispatchThrottledBytes = new AtomicLong();
```

2. During each [calculateToRead](https://github.com/apache/pulsar/blob/411f6973e85b0a6213e992386e1704f93d0aae42/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L371-L377),
if the number of `messages/bytes` is reduced, increase these fields accordingly.

- dispatchThrottledBytes may overflow in extreme cases, so reset this value before overflow:
```diff
protected Pair<Integer, Long> updateMessagesToRead(DispatchRateLimiter dispatchRateLimiter,
int messagesToRead, long bytesToRead) {
// update messagesToRead according to available dispatch rate limit.
- return computeReadLimits(messagesToRead,
+ Pair<Integer, Long> result = computeReadLimits(messagesToRead,
(int) dispatchRateLimiter.getAvailableDispatchRateLimitOnMsg(),
bytesToRead, dispatchRateLimiter.getAvailableDispatchRateLimitOnByte());
+ if (result.getLeft() < messagesToRead) {
+ dispatchThrottledMsgs.add(messagesToRead - result.getLeft());
+ }
+ if (result.getRight() < bytesToRead) {
+ long increment = bytesToRead - result.getRight();
+ dispatchThrottledBytes.updateAndGet(current -> {
+ // Check if adding the increment would cause an overflow
+ if (Long.MAX_VALUE - current < increment) {
+ return increment;
+ }
+ return current + increment;
+ });
+ }
+ return result;
}
```

## Public-facing Changes
- None


### Configuration
- None

# Backward & Forward Compatibility
- Full Compatibility

Loading