Skip to content

Commit

Permalink
[ISSUE #8984] Fix the broker switch enableMixedMessageType doesn't work
Browse files Browse the repository at this point in the history
  • Loading branch information
redlsz authored Dec 2, 2024
1 parent 804847e commit fb3b87d
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UnlockCallback;
import org.apache.rocketmq.common.UtilAll;
Expand Down Expand Up @@ -534,11 +535,15 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext
String attributesModification = requestHeader.getAttributes();
topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification));

if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED
&& !brokerController.getBrokerConfig().isEnableMixedMessageType()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("MIXED message type is not supported.");
return response;
if (!brokerController.getBrokerConfig().isEnableMixedMessageType() && topicConfig.getAttributes() != null) {
// Get attribute by key with prefix sign
String msgTypeAttrKey = AttributeParser.ATTR_ADD_PLUS_SIGN + TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName();
String msgTypeAttrValue = topicConfig.getAttributes().get(msgTypeAttrKey);
if (msgTypeAttrValue != null && msgTypeAttrValue.equals(TopicMessageType.MIXED.getValue())) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("MIXED message type is not supported.");
return response;
}
}

if (topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic))) {
Expand Down Expand Up @@ -609,11 +614,15 @@ private synchronized RemotingCommand updateAndCreateTopicList(ChannelHandlerCont
return response;
}
}
if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED
&& !brokerController.getBrokerConfig().isEnableMixedMessageType()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("MIXED message type is not supported.");
return response;
if (!brokerController.getBrokerConfig().isEnableMixedMessageType() && topicConfig.getAttributes() != null) {
// Get attribute by key with prefix sign
String msgTypeAttrKey = AttributeParser.ATTR_ADD_PLUS_SIGN + TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName();
String msgTypeAttrValue = topicConfig.getAttributes().get(msgTypeAttrKey);
if (msgTypeAttrValue != null && msgTypeAttrValue.equals(TopicMessageType.MIXED.getValue())) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("MIXED message type is not supported.");
return response;
}
}
if (topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic))) {
LOGGER.info("Broker receive request to update or create topic={}, but topicConfig has no changes , so idempotent, caller address={}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.TopicQueueId;
import org.apache.rocketmq.common.action.Action;
import org.apache.rocketmq.common.attribute.AttributeParser;
import org.apache.rocketmq.common.constant.FIleReadaheadMode;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
Expand Down Expand Up @@ -330,6 +331,19 @@ public void testUpdateAndCreateTopic() throws Exception {
request = buildCreateTopicRequest(topic);
response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);

// test deny MIXED topic type
brokerController.getBrokerConfig().setEnableMixedMessageType(false);
topic = "TEST_MIXED_TYPE";
Map<String, String> attributes = new HashMap<>();
attributes.put("+message.type", "MIXED");
request = buildCreateTopicRequest(topic, attributes);
response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
// test allow MIXED topic type
brokerController.getBrokerConfig().setEnableMixedMessageType(true);
response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

@Test
Expand All @@ -355,6 +369,20 @@ public void testUpdateAndCreateTopicList() throws RemotingCommandException {
//test no changes
response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);

// test deny MIXED topic type
brokerController.getBrokerConfig().setEnableMixedMessageType(false);
topicList.add("TEST_MIXED_TYPE");
topicList.add("TEST_MIXED_TYPE1");
Map<String, String> attributes = new HashMap<>();
attributes.put("+message.type", "MIXED");
request = buildCreateTopicListRequest(topicList, attributes);
response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
// test allow MIXED topic type
brokerController.getBrokerConfig().setEnableMixedMessageType(true);
response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

@Test
Expand Down Expand Up @@ -1312,18 +1340,29 @@ private ResetOffsetRequestHeader createRequestHeader(String topic,String group,l
}

private RemotingCommand buildCreateTopicRequest(String topic) {
return buildCreateTopicRequest(topic, null);
}

private RemotingCommand buildCreateTopicRequest(String topic, Map<String, String> attributes) {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setTopicFilterType(TopicFilterType.SINGLE_TAG.name());
requestHeader.setReadQueueNums(8);
requestHeader.setWriteQueueNums(8);
requestHeader.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
if (attributes != null) {
requestHeader.setAttributes(AttributeParser.parseToString(attributes));
}
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
request.makeCustomHeaderToNet();
return request;
}

private RemotingCommand buildCreateTopicListRequest(List<String> topicList) {
return buildCreateTopicListRequest(topicList, null);
}

private RemotingCommand buildCreateTopicListRequest(List<String> topicList, Map<String, String> attributes) {
List<TopicConfig> topicConfigList = new ArrayList<>();
for (String topic:topicList) {
TopicConfig topicConfig = new TopicConfig(topic);
Expand All @@ -1333,6 +1372,9 @@ private RemotingCommand buildCreateTopicListRequest(List<String> topicList) {
topicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
topicConfig.setTopicSysFlag(0);
topicConfig.setOrder(false);
if (attributes != null) {
topicConfig.setAttributes(new HashMap<>(attributes));
}
topicConfigList.add(topicConfig);
}
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC_LIST, null);
Expand Down

0 comments on commit fb3b87d

Please sign in to comment.