Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8968] Introduce the clearRetryTopicWhenDeleteTopic option to enable precise external deletion of topics #8969

Merged
merged 3 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public class BrokerController {
private final NettyClientConfig nettyClientConfig;
protected final MessageStoreConfig messageStoreConfig;
private final AuthConfig authConfig;
protected final ConsumerOffsetManager consumerOffsetManager;
protected ConsumerOffsetManager consumerOffsetManager;
protected final BroadcastOffsetManager broadcastOffsetManager;
protected final ConsumerManager consumerManager;
protected final ConsumerFilterManager consumerFilterManager;
Expand Down Expand Up @@ -1313,6 +1313,11 @@ public ConsumerOffsetManager getConsumerOffsetManager() {
return consumerOffsetManager;
}

public void setConsumerOffsetManager(ConsumerOffsetManager consumerOffsetManager) {
this.consumerOffsetManager = consumerOffsetManager;
}


public BroadcastOffsetManager getBroadcastOffsetManager() {
return broadcastOffsetManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, R
response.setBody(JSON.toJSONBytes(result));
return response;
}

@Override
public boolean rejectRequest() {
return false;
Expand Down Expand Up @@ -559,18 +560,17 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(e.getMessage());
return response;
}
finally {
} finally {
executionTime = System.currentTimeMillis() - startTime;
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_INVOCATION_STATUS, status.getName())
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
.build();
.put(LABEL_INVOCATION_STATUS, status.getName())
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
.build();
BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes);
}
LOGGER.info("executionTime of create topic:{} is {} ms" , topic, executionTime);
LOGGER.info("executionTime of create topic:{} is {} ms", topic, executionTime);
return response;
}

Expand Down Expand Up @@ -637,8 +637,7 @@ private synchronized RemotingCommand updateAndCreateTopicList(ChannelHandlerCont
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(e.getMessage());
return response;
}
finally {
} finally {
executionTime = System.currentTimeMillis() - startTime;
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
Expand All @@ -648,7 +647,7 @@ private synchronized RemotingCommand updateAndCreateTopicList(ChannelHandlerCont
.build();
BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes);
}
LOGGER.info("executionTime of all topics:{} is {} ms" , topicNames, executionTime);
LOGGER.info("executionTime of all topics:{} is {} ms", topicNames, executionTime);
return response;
}

Expand Down Expand Up @@ -725,21 +724,28 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
}
}

final Set<String> groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
// delete pop retry topics first
try {
List<String> topicsToClean = new ArrayList<>();
topicsToClean.add(topic);

if (brokerController.getBrokerConfig().isClearRetryTopicWhenDeleteTopic()) {
final Set<String> groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
for (String group : groups) {
final String popRetryTopicV2 = KeyBuilder.buildPopRetryTopic(topic, group, true);
if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV2) != null) {
deleteTopicInBroker(popRetryTopicV2);
topicsToClean.add(popRetryTopicV2);
}
final String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV1) != null) {
deleteTopicInBroker(popRetryTopicV1);
topicsToClean.add(popRetryTopicV1);
}
}
// delete topic
deleteTopicInBroker(topic);
}

try {
for (String topicToClean : topicsToClean) {
// delete topic
deleteTopicInBroker(topicToClean);
}
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
RongtongJin marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Down Expand Up @@ -982,10 +988,10 @@ private synchronized RemotingCommand updateColdDataFlowCtrGroupConfig(ChannelHan
String consumerGroup = String.valueOf(key);
Long threshold = Long.valueOf(String.valueOf(value));
this.brokerController.getColdDataCgCtrService()
.addOrUpdateGroupConfig(consumerGroup, threshold);
.addOrUpdateGroupConfig(consumerGroup, threshold);
} catch (Exception e) {
LOGGER.error("updateColdDataFlowCtrGroupConfig properties on entry error, key: {}, val: {}",
key, value, e);
key, value, e);
}
});
} else {
Expand Down Expand Up @@ -1598,12 +1604,12 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
long executionTime = System.currentTimeMillis() - startTime;
LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms" ,config.getGroupName() ,executionTime);
LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms", config.getGroupName(), executionTime);
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_INVOCATION_STATUS, status.getName())
.build();
.put(LABEL_INVOCATION_STATUS, status.getName())
.build();
BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, attributes);
return response;
}
Expand Down Expand Up @@ -2083,13 +2089,13 @@ private Long searchOffsetByTimestamp(String topic, int queueId, long timestamp)
/**
* Reset consumer offset.
*
* @param topic Required, not null.
* @param group Required, not null.
* @param queueId if target queue ID is negative, all message queues will be reset; otherwise, only the target queue
* would get reset.
* @param topic Required, not null.
* @param group Required, not null.
* @param queueId if target queue ID is negative, all message queues will be reset; otherwise, only the target queue
* would get reset.
* @param timestamp if timestamp is negative, offset would be reset to broker offset at the time being; otherwise,
* binary search is performed to locate target offset.
* @param offset Target offset to reset to if target queue ID is properly provided.
* binary search is performed to locate target offset.
* @param offset Target offset to reset to if target queue ID is properly provided.
* @return Affected queues and their new offset
*/
private RemotingCommand resetOffsetInner(String topic, String group, int queueId, long timestamp, Long offset) {
Expand Down Expand Up @@ -3371,7 +3377,8 @@ private boolean validateBlackListConfigExist(Properties properties) {
return false;
}

private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
CheckRocksdbCqWriteProgressRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
String requestTopic = requestHeader.getTopic();
MessageStore messageStore = brokerController.getMessageStore();
Expand Down Expand Up @@ -3428,7 +3435,9 @@ private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerCo
return result;
}

private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean printDetail, long checkpointByStoreTime) {
private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInterface> queueMap, String topic,
RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean printDetail,
long checkpointByStoreTime) {
boolean processResult = true;
for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : queueMap.entrySet()) {
Integer queueId = queueEntry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ public class BrokerConfig extends BrokerIdentity {

private boolean appendCkAsync = false;

private boolean clearRetryTopicWhenDeleteTopic = true;

private boolean enableLmqStats = false;

Expand Down Expand Up @@ -1908,6 +1909,14 @@ public void setAppendCkAsync(boolean appendCkAsync) {
this.appendCkAsync = appendCkAsync;
}

public boolean isClearRetryTopicWhenDeleteTopic() {
return clearRetryTopicWhenDeleteTopic;
}

public void setClearRetryTopicWhenDeleteTopic(boolean clearRetryTopicWhenDeleteTopic) {
this.clearRetryTopicWhenDeleteTopic = clearRetryTopicWhenDeleteTopic;
}

public boolean isEnableLmqStats() {
return enableLmqStats;
}
Expand Down
Loading