Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize : skip sending some request if client-version is v0 #6998

Open
wants to merge 16 commits into
base: 2.x
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
*/
public interface MessageType {

/**
* The constant VERSION_NOT_SUPPORT.
*/
short VERSION_NOT_SUPPORT = -1;
/**
* The constant TYPE_GLOBAL_BEGIN.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ public static boolean isAboveOrEqualVersion230(String version) {
return isAboveOrEqualVersion(version, VERSION_2_3_0);
}

public static boolean isV0(String version) {
return !isAboveOrEqualVersion(version, VERSION_0_7_1);
}

public static boolean isAboveOrEqualVersion(String clientVersion, String divideVersion) {
boolean isAboveOrEqualVersion = false;
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.protocol;

/**
* The type Version not support message.
*
*/
public class VersionNotSupportMessage extends AbstractMessage {
@Override
public short getTypeCode() {
return MessageType.VERSION_NOT_SUPPORT;
}
}
52 changes: 52 additions & 0 deletions core/src/main/java/org/apache/seata/core/rpc/MsgVersionHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc;

import io.netty.channel.Channel;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.protocol.MessageType;
import org.apache.seata.core.protocol.MessageTypeAware;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.protocol.Version;

import java.util.Arrays;
import java.util.List;

/**
* the type ServerSkipMsgHelper
**/
public class MsgVersionHelper {

private static final List<Short> SKIP_MSG_CODE_V0 = Arrays.asList(MessageType.TYPE_RM_DELETE_UNDOLOG);

public static boolean versionNotSupport(Channel channel, RpcMessage rpcMessage) {
if (rpcMessage == null || rpcMessage.getBody() == null || channel == null) {
return false;
}
Object msg = rpcMessage.getBody();
String version = Version.getChannelVersion(channel);
if (StringUtils.isBlank(version) || msg == null) {
return false;
}
boolean isV0 = Version.isV0(version);
if (!isV0 || !(msg instanceof MessageTypeAware)) {
return false;
}
short typeCode = ((MessageTypeAware) msg).getTypeCode();
return SKIP_MSG_CODE_V0.contains(typeCode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
import org.apache.seata.core.protocol.MessageTypeAware;
import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.protocol.VersionNotSupportMessage;
import org.apache.seata.core.rpc.Disposable;
import org.apache.seata.core.rpc.MsgVersionHelper;
import org.apache.seata.core.rpc.hook.RpcHook;
import org.apache.seata.core.rpc.processor.Pair;
import org.apache.seata.core.rpc.processor.RemotingProcessor;
Expand Down Expand Up @@ -173,6 +175,12 @@ protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMi
LOGGER.warn("sendSync nothing, caused by null channel.");
return null;
}
if (MsgVersionHelper.versionNotSupport(channel, rpcMessage)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Message sending will be skipped as the client version does not support it,{}", rpcMessage);
}
return new VersionNotSupportMessage();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

看起来没有使用到它的地方,后续将有何作用?
It seems that it is not being used anywhere. What will its role be in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

看起来没有使用到它的地方,后续将有何作用? It seems that it is not being used anywhere. What will its role be in the future?

比如以后的版本里如果要sync发送某个全新的request类,旧版本无法识别,在remote通信这层会返回VersionNotSupportMessage,发送者判断到返回类型是这个后做忽略处理或者抛出异常

For example, if you want to send a sync to a new request class in a future version that is not recognized by the old version, the versionNotSupportMessage will be returned at the remote communication level, and the sender will either ignore the return type or throw an exception.

}

MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
Expand Down Expand Up @@ -216,6 +224,12 @@ protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMi
* @param rpcMessage rpc message
*/
protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
if (MsgVersionHelper.versionNotSupport(channel, rpcMessage)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Message sending will be skipped as the client version does not support it,{}", rpcMessage);
}
return;
}
channelWritableCheck(channel, rpcMessage.getBody());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.protocol.Version;
import org.apache.seata.discovery.registry.FileRegistryServiceImpl;
import org.apache.seata.discovery.registry.RegistryFactory;
import org.apache.seata.discovery.registry.RegistryService;
Expand Down Expand Up @@ -270,12 +271,13 @@ void invalidateObject(final String serverAddress, final Channel channel) throws
nettyClientKeyPool.invalidateObject(poolKeyMap.get(serverAddress), channel);
}

void registerChannel(final String serverAddress, final Channel channel) {
void registerChannel(final String serverAddress, final Channel channel, String version) {
Channel channelToServer = channels.get(serverAddress);
if (channelToServer != null && channelToServer.isActive()) {
return;
}
channels.put(serverAddress, channel);
Version.putChannelVersion(channel, version);
}

private Channel doConnect(String serverAddress) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object r
if (LOGGER.isInfoEnabled()) {
LOGGER.info("register RM success. client version:{}, server version:{},channel:{}", registerRMRequest.getVersion(), registerRMResponse.getVersion(), channel);
}
getClientChannelManager().registerChannel(serverAddress, channel);
getClientChannelManager().registerChannel(serverAddress, channel, registerRMRequest.getVersion());
String dbKey = getMergedResourceKeys();
if (registerRMRequest.getResourceIds() != null) {
if (!registerRMRequest.getResourceIds().equals(dbKey)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object r
if (LOGGER.isInfoEnabled()) {
LOGGER.info("register TM success. client version:{}, server version:{},channel:{}", registerTMRequest.getVersion(), registerTMResponse.getVersion(), channel);
}
getClientChannelManager().registerChannel(serverAddress, channel);
getClientChannelManager().registerChannel(serverAddress, channel, registerTMRequest.getVersion());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ public static Channel getChannel(TmNettyRemotingClient client) {
private static NettyClientChannelManager getChannelManager(AbstractNettyRemotingClient remotingClient) {
return remotingClient.getClientChannelManager();
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc.netty;

import io.netty.channel.Channel;
import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.ConfigurationTestHelper;
import org.apache.seata.common.XID;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.UUIDGenerator;
import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.protocol.Version;
import org.apache.seata.core.protocol.VersionNotSupportMessage;
import org.apache.seata.core.protocol.transaction.UndoLogDeleteRequest;
import org.apache.seata.core.rpc.MsgVersionHelper;
import org.apache.seata.server.coordinator.DefaultCoordinator;
import org.apache.seata.server.session.SessionHolder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* MsgVersionHelper Test
**/
public class MsgVersionHelperTest {
private static final Logger LOGGER = LoggerFactory.getLogger(MsgVersionHelperTest.class);

@BeforeAll
public static void init(){
ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, "8091");
}
@AfterAll
public static void after() {
ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL);
}

public static ThreadPoolExecutor initMessageExecutor() {
return new ThreadPoolExecutor(5, 5, 500, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(20000), new ThreadPoolExecutor.CallerRunsPolicy());
}
@Test
public void testSendMsgWithResponse() throws Exception {
ThreadPoolExecutor workingThreads = initMessageExecutor();
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
new Thread(() -> {
SessionHolder.init(null);
nettyRemotingServer.setHandler(DefaultCoordinator.getInstance(nettyRemotingServer));
// set registry
XID.setIpAddress(NetUtil.getLocalIp());
XID.setPort(8091);
// init snowflake for transactionId, branchId
UUIDGenerator.init(1L);
nettyRemotingServer.init();
}).start();
Thread.sleep(3000);

String applicationId = "app 1";
String transactionServiceGroup = "default_tx_group";
TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
tmNettyRemotingClient.init();

String serverAddress = "0.0.0.0:8091";
Channel channel = TmNettyRemotingClient.getInstance().getClientChannelManager().acquireChannel(serverAddress);

RpcMessage rpcMessage = buildUndoLogDeleteMsg(ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
Assertions.assertFalse(MsgVersionHelper.versionNotSupport(channel, rpcMessage));
TmNettyRemotingClient.getInstance().sendAsync(channel,rpcMessage);


Version.putChannelVersion(channel,"0.7.0");
Assertions.assertTrue(MsgVersionHelper.versionNotSupport(channel, rpcMessage));
TmNettyRemotingClient.getInstance().sendAsync(channel,rpcMessage);
Object response = TmNettyRemotingClient.getInstance().sendSync(channel, rpcMessage, 100);
Assertions.assertTrue(response instanceof VersionNotSupportMessage);

nettyRemotingServer.destroy();
tmNettyRemotingClient.destroy();
}

private RpcMessage buildUndoLogDeleteMsg(byte messageType) {
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setId(100);
rpcMessage.setMessageType(messageType);
rpcMessage.setCodec(ProtocolConstants.CONFIGURED_CODEC);
rpcMessage.setCompressor(ProtocolConstants.CONFIGURED_COMPRESSOR);
rpcMessage.setBody(new UndoLogDeleteRequest());
return rpcMessage;
}
}
Loading