Skip to content

Commit

Permalink
[ISSUE #8460] Set default broker name when revive found ack without b…
Browse files Browse the repository at this point in the history
…roker name field
  • Loading branch information
lizhimins committed Nov 25, 2024
1 parent a8779c0 commit c788856
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
Expand Down Expand Up @@ -376,7 +377,9 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
}
AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId);
String mergeKey = ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName();
String brokerName = StringUtils.isNotBlank(ackMsg.getBrokerName()) ?
ackMsg.getBrokerName() : brokerController.getBrokerConfig().getBrokerName();
String mergeKey = ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + brokerName;
PopCheckPoint point = map.get(mergeKey);
if (point == null) {
if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
Expand All @@ -401,7 +404,9 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {

BatchAckMsg bAckMsg = JSON.parseObject(raw, BatchAckMsg.class);
PopMetricsManager.incPopReviveAckGetCount(bAckMsg, queueId);
String mergeKey = bAckMsg.getTopic() + bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + bAckMsg.getPopTime() + bAckMsg.getBrokerName();
String brokerName = StringUtils.isNotBlank(bAckMsg.getBrokerName()) ?
bAckMsg.getBrokerName() : brokerController.getBrokerConfig().getBrokerName();
String mergeKey = bAckMsg.getTopic() + bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + bAckMsg.getPopTime() + brokerName;
PopCheckPoint point = map.get(mergeKey);
if (point == null) {
if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
Expand Down

0 comments on commit c788856

Please sign in to comment.