diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index b907489bbfb..99e5b85d2e4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -191,7 +191,7 @@ public class BrokerController { private final NettyClientConfig nettyClientConfig; protected final MessageStoreConfig messageStoreConfig; private final AuthConfig authConfig; - protected final ConsumerOffsetManager consumerOffsetManager; + protected ConsumerOffsetManager consumerOffsetManager; protected final BroadcastOffsetManager broadcastOffsetManager; protected final ConsumerManager consumerManager; protected final ConsumerFilterManager consumerFilterManager; @@ -1313,6 +1313,11 @@ public ConsumerOffsetManager getConsumerOffsetManager() { return consumerOffsetManager; } + public void setConsumerOffsetManager(ConsumerOffsetManager consumerOffsetManager) { + this.consumerOffsetManager = consumerOffsetManager; + } + + public BroadcastOffsetManager getBroadcastOffsetManager() { return broadcastOffsetManager; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index ac882e94ab0..cc70e69a467 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -490,6 +490,7 @@ private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, R response.setBody(JSON.toJSONBytes(result)); return response; } + @Override public boolean rejectRequest() { return false; @@ -559,18 +560,17 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(e.getMessage()); return response; - } - finally { + } finally { executionTime = System.currentTimeMillis() - startTime; InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ? - InvocationStatus.SUCCESS : InvocationStatus.FAILURE; + InvocationStatus.SUCCESS : InvocationStatus.FAILURE; Attributes attributes = BrokerMetricsManager.newAttributesBuilder() - .put(LABEL_INVOCATION_STATUS, status.getName()) - .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic)) - .build(); + .put(LABEL_INVOCATION_STATUS, status.getName()) + .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic)) + .build(); BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes); } - LOGGER.info("executionTime of create topic:{} is {} ms" , topic, executionTime); + LOGGER.info("executionTime of create topic:{} is {} ms", topic, executionTime); return response; } @@ -637,8 +637,7 @@ private synchronized RemotingCommand updateAndCreateTopicList(ChannelHandlerCont response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(e.getMessage()); return response; - } - finally { + } finally { executionTime = System.currentTimeMillis() - startTime; InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ? InvocationStatus.SUCCESS : InvocationStatus.FAILURE; @@ -648,7 +647,7 @@ private synchronized RemotingCommand updateAndCreateTopicList(ChannelHandlerCont .build(); BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes); } - LOGGER.info("executionTime of all topics:{} is {} ms" , topicNames, executionTime); + LOGGER.info("executionTime of all topics:{} is {} ms", topicNames, executionTime); return response; } @@ -725,21 +724,28 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, } } - final Set groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic); - // delete pop retry topics first - try { + List topicsToClean = new ArrayList<>(); + topicsToClean.add(topic); + + if (brokerController.getBrokerConfig().isClearRetryTopicWhenDeleteTopic()) { + final Set groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic); for (String group : groups) { final String popRetryTopicV2 = KeyBuilder.buildPopRetryTopic(topic, group, true); if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV2) != null) { - deleteTopicInBroker(popRetryTopicV2); + topicsToClean.add(popRetryTopicV2); } final String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group); if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV1) != null) { - deleteTopicInBroker(popRetryTopicV1); + topicsToClean.add(popRetryTopicV1); } } - // delete topic - deleteTopicInBroker(topic); + } + + try { + for (String topicToClean : topicsToClean) { + // delete topic + deleteTopicInBroker(topicToClean); + } } catch (Throwable t) { return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); } @@ -982,10 +988,10 @@ private synchronized RemotingCommand updateColdDataFlowCtrGroupConfig(ChannelHan String consumerGroup = String.valueOf(key); Long threshold = Long.valueOf(String.valueOf(value)); this.brokerController.getColdDataCgCtrService() - .addOrUpdateGroupConfig(consumerGroup, threshold); + .addOrUpdateGroupConfig(consumerGroup, threshold); } catch (Exception e) { LOGGER.error("updateColdDataFlowCtrGroupConfig properties on entry error, key: {}, val: {}", - key, value, e); + key, value, e); } }); } else { @@ -1598,12 +1604,12 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c response.setCode(ResponseCode.SUCCESS); response.setRemark(null); long executionTime = System.currentTimeMillis() - startTime; - LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms" ,config.getGroupName() ,executionTime); + LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms", config.getGroupName(), executionTime); InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ? - InvocationStatus.SUCCESS : InvocationStatus.FAILURE; + InvocationStatus.SUCCESS : InvocationStatus.FAILURE; Attributes attributes = BrokerMetricsManager.newAttributesBuilder() - .put(LABEL_INVOCATION_STATUS, status.getName()) - .build(); + .put(LABEL_INVOCATION_STATUS, status.getName()) + .build(); BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, attributes); return response; } @@ -2083,13 +2089,13 @@ private Long searchOffsetByTimestamp(String topic, int queueId, long timestamp) /** * Reset consumer offset. * - * @param topic Required, not null. - * @param group Required, not null. - * @param queueId if target queue ID is negative, all message queues will be reset; otherwise, only the target queue - * would get reset. + * @param topic Required, not null. + * @param group Required, not null. + * @param queueId if target queue ID is negative, all message queues will be reset; otherwise, only the target queue + * would get reset. * @param timestamp if timestamp is negative, offset would be reset to broker offset at the time being; otherwise, - * binary search is performed to locate target offset. - * @param offset Target offset to reset to if target queue ID is properly provided. + * binary search is performed to locate target offset. + * @param offset Target offset to reset to if target queue ID is properly provided. * @return Affected queues and their new offset */ private RemotingCommand resetOffsetInner(String topic, String group, int queueId, long timestamp, Long offset) { @@ -3371,7 +3377,8 @@ private boolean validateBlackListConfigExist(Properties properties) { return false; } - private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { CheckRocksdbCqWriteProgressRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class); String requestTopic = requestHeader.getTopic(); MessageStore messageStore = brokerController.getMessageStore(); @@ -3428,7 +3435,9 @@ private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerCo return result; } - private boolean processConsumeQueuesForTopic(ConcurrentMap queueMap, String topic, RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean printDetail, long checkpointByStoreTime) { + private boolean processConsumeQueuesForTopic(ConcurrentMap queueMap, String topic, + RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean printDetail, + long checkpointByStoreTime) { boolean processResult = true; for (Map.Entry queueEntry : queueMap.entrySet()) { Integer queueId = queueEntry.getKey(); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 9d8d9135217..c0b557dfa11 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -435,6 +435,7 @@ public class BrokerConfig extends BrokerIdentity { private boolean appendCkAsync = false; + private boolean clearRetryTopicWhenDeleteTopic = true; private boolean enableLmqStats = false; @@ -1908,6 +1909,14 @@ public void setAppendCkAsync(boolean appendCkAsync) { this.appendCkAsync = appendCkAsync; } + public boolean isClearRetryTopicWhenDeleteTopic() { + return clearRetryTopicWhenDeleteTopic; + } + + public void setClearRetryTopicWhenDeleteTopic(boolean clearRetryTopicWhenDeleteTopic) { + this.clearRetryTopicWhenDeleteTopic = clearRetryTopicWhenDeleteTopic; + } + public boolean isEnableLmqStats() { return enableLmqStats; }