diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index cc70e69a467..fc3b6182731 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -76,6 +76,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PlainAccessConfig; +import org.apache.rocketmq.common.TopicAttributes; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UnlockCallback; import org.apache.rocketmq.common.UtilAll; @@ -534,11 +535,15 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext String attributesModification = requestHeader.getAttributes(); topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification)); - if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED - && !brokerController.getBrokerConfig().isEnableMixedMessageType()) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("MIXED message type is not supported."); - return response; + if (!brokerController.getBrokerConfig().isEnableMixedMessageType() && topicConfig.getAttributes() != null) { + // Get attribute by key with prefix sign + String msgTypeAttrKey = AttributeParser.ATTR_ADD_PLUS_SIGN + TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName(); + String msgTypeAttrValue = topicConfig.getAttributes().get(msgTypeAttrKey); + if (msgTypeAttrValue != null && msgTypeAttrValue.equals(TopicMessageType.MIXED.getValue())) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("MIXED message type is not supported."); + return response; + } } if (topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic))) { @@ -609,11 +614,15 @@ private synchronized RemotingCommand updateAndCreateTopicList(ChannelHandlerCont return response; } } - if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED - && !brokerController.getBrokerConfig().isEnableMixedMessageType()) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("MIXED message type is not supported."); - return response; + if (!brokerController.getBrokerConfig().isEnableMixedMessageType() && topicConfig.getAttributes() != null) { + // Get attribute by key with prefix sign + String msgTypeAttrKey = AttributeParser.ATTR_ADD_PLUS_SIGN + TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName(); + String msgTypeAttrValue = topicConfig.getAttributes().get(msgTypeAttrKey); + if (msgTypeAttrValue != null && msgTypeAttrValue.equals(TopicMessageType.MIXED.getValue())) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("MIXED message type is not supported."); + return response; + } } if (topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic))) { LOGGER.info("Broker receive request to update or create topic={}, but topicConfig has no changes , so idempotent, caller address={}", diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index d87f5133552..48ddb891728 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -48,6 +48,7 @@ import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.TopicQueueId; import org.apache.rocketmq.common.action.Action; +import org.apache.rocketmq.common.attribute.AttributeParser; import org.apache.rocketmq.common.constant.FIleReadaheadMode; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -330,6 +331,19 @@ public void testUpdateAndCreateTopic() throws Exception { request = buildCreateTopicRequest(topic); response = adminBrokerProcessor.processRequest(handlerContext, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + // test deny MIXED topic type + brokerController.getBrokerConfig().setEnableMixedMessageType(false); + topic = "TEST_MIXED_TYPE"; + Map attributes = new HashMap<>(); + attributes.put("+message.type", "MIXED"); + request = buildCreateTopicRequest(topic, attributes); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + // test allow MIXED topic type + brokerController.getBrokerConfig().setEnableMixedMessageType(true); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } @Test @@ -355,6 +369,20 @@ public void testUpdateAndCreateTopicList() throws RemotingCommandException { //test no changes response = adminBrokerProcessor.processRequest(handlerContext, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + // test deny MIXED topic type + brokerController.getBrokerConfig().setEnableMixedMessageType(false); + topicList.add("TEST_MIXED_TYPE"); + topicList.add("TEST_MIXED_TYPE1"); + Map attributes = new HashMap<>(); + attributes.put("+message.type", "MIXED"); + request = buildCreateTopicListRequest(topicList, attributes); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + // test allow MIXED topic type + brokerController.getBrokerConfig().setEnableMixedMessageType(true); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } @Test @@ -1312,18 +1340,29 @@ private ResetOffsetRequestHeader createRequestHeader(String topic,String group,l } private RemotingCommand buildCreateTopicRequest(String topic) { + return buildCreateTopicRequest(topic, null); + } + + private RemotingCommand buildCreateTopicRequest(String topic, Map attributes) { CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); requestHeader.setTopic(topic); requestHeader.setTopicFilterType(TopicFilterType.SINGLE_TAG.name()); requestHeader.setReadQueueNums(8); requestHeader.setWriteQueueNums(8); requestHeader.setPerm(PermName.PERM_READ | PermName.PERM_WRITE); + if (attributes != null) { + requestHeader.setAttributes(AttributeParser.parseToString(attributes)); + } RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); request.makeCustomHeaderToNet(); return request; } private RemotingCommand buildCreateTopicListRequest(List topicList) { + return buildCreateTopicListRequest(topicList, null); + } + + private RemotingCommand buildCreateTopicListRequest(List topicList, Map attributes) { List topicConfigList = new ArrayList<>(); for (String topic:topicList) { TopicConfig topicConfig = new TopicConfig(topic); @@ -1333,6 +1372,9 @@ private RemotingCommand buildCreateTopicListRequest(List topicList) { topicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE); topicConfig.setTopicSysFlag(0); topicConfig.setOrder(false); + if (attributes != null) { + topicConfig.setAttributes(new HashMap<>(attributes)); + } topicConfigList.add(topicConfig); } RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC_LIST, null);