Skip to content

Commit

Permalink
Avoid message type validate in message sync scenario.
Browse files Browse the repository at this point in the history
  • Loading branch information
dingshuangxi888 committed Dec 25, 2024
1 parent b33485f commit 186f38f
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ public String getProperty(final String name) {
return this.properties.get(name);
}

public boolean hasProperty(final String name) {
if (null == this.properties) {
return false;
}
return this.properties.containsKey(name);
}

public String getTopic() {
return topic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public class MessageConst {
public static final String PROPERTY_TIMER_DELIVER_MS = "TIMER_DELIVER_MS";
public static final String PROPERTY_BORN_HOST = "__BORNHOST";
public static final String PROPERTY_BORN_TIMESTAMP = "BORN_TIMESTAMP";
public static final String PROPERTY_SYNC_FLAG = "SYNC_FLAG";

/**
* property which name starts with "__RMQ.TRANSIENT." is called transient one that will not stored in broker disks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, QueueSe
try {
Message message = messageList.get(0);
String topic = message.getTopic();

String syncFlag = message.getProperty(MessageConst.PROPERTY_SYNC_FLAG);
if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck() && !StringUtils.equals(syncFlag, "1")) {
if (isNeedCheckTopicMessageType(message)) {
if (topicMessageTypeValidator != null) {
// Do not check retry or dlq topic
if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
Expand Down Expand Up @@ -263,4 +261,8 @@ public CompletableFuture<RemotingCommand> forwardMessageToDeadLetterQueue(ProxyC
return FutureUtils.addExecutor(future, this.executor);
}

private boolean isNeedCheckTopicMessageType(Message message) {
return ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()
&& !message.hasProperty(MessageConst.PROPERTY_TRANSFER_FLAG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,19 @@
import io.netty.channel.ChannelHandlerContext;
import java.time.Duration;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.processor.validator.DefaultTopicMessageTypeValidator;
import org.apache.rocketmq.proxy.processor.validator.TopicMessageTypeValidator;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;

public class SendMessageActivity extends AbstractRemotingActivity {
TopicMessageTypeValidator topicMessageTypeValidator;
Expand Down Expand Up @@ -67,9 +66,8 @@ protected RemotingCommand sendMessage(ChannelHandlerContext ctx, RemotingCommand
SendMessageRequestHeader requestHeader = SendMessageRequestHeader.parseRequestHeader(request);
String topic = requestHeader.getTopic();
Map<String, String> property = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String syncFlag = property.get(MessageConst.PROPERTY_SYNC_FLAG);
TopicMessageType messageType = TopicMessageType.parseFromMessageProperty(property);
if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck() && !StringUtils.equals(syncFlag, "1")) {
if (isNeedCheckTopicMessageType(property)) {
if (topicMessageTypeValidator != null) {
// Do not check retry or dlq topic
if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
Expand All @@ -90,4 +88,9 @@ protected RemotingCommand consumerSendMessage(ChannelHandlerContext ctx, Remotin
ProxyContext context) throws Exception {
return request(ctx, request, context, Duration.ofSeconds(3).toMillis());
}

private boolean isNeedCheckTopicMessageType(Map<String, String> property) {
return ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()
&& !property.containsKey(MessageConst.PROPERTY_TRANSFER_FLAG);
}
}

0 comments on commit 186f38f

Please sign in to comment.