diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index b1afbb58d16..a75a9b71caa 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -18,6 +18,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#7075](https://github.com/apache/incubator-seata/pull/7075)] fast fail when channel is null - [[#7089](https://github.com/apache/incubator-seata/pull/7089)] support instance registration to the registry center - [[#7093](https://github.com/apache/incubator-seata/pull/7093)] add a test workflow for JDK 21 +- [[#7095](https://github.com/apache/incubator-seata/pull/7095)] support fast fail when channel disconnect after message sent for RemotingClient ### security: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 0cf918bf87a..ebddf90da1b 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -18,6 +18,7 @@ - [[#7075](https://github.com/apache/incubator-seata/pull/7075)] 当channel为空时,快速失败,以便于减少不必要的等待 - [[#7089](https://github.com/apache/incubator-seata/pull/7089)] 新增instance注册到注册中心的接口 - [[#7093](https://github.com/apache/incubator-seata/pull/7093)] 增加jdk21的工作流测试 +- [[#7095](https://github.com/apache/incubator-seata/pull/7095)] 当消息发送后通道中断,快速失败,以便于减少不必要的等待 ### security: diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 8618c3030b4..db671cb4ce2 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -20,6 +20,7 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -91,6 +92,9 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting protected final Map childToParentMap = new ConcurrentHashMap<>(); + // Channel -> Set + protected final ConcurrentHashMap> channelToRequestIds = new ConcurrentHashMap<>(); + /** * When batch sending is enabled, the message will be stored to basketMap * Send via asynchronous thread {@link AbstractNettyRemotingClient.MergedSendRunnable} @@ -181,6 +185,7 @@ public Object sendSyncRequest(Object msg) throws TimeoutException { } } else { Channel channel = clientChannelManager.acquireChannel(serverAddress); + channelToRequestIds.computeIfAbsent(channel, ch -> ConcurrentHashMap.newKeySet()).add(rpcMessage.getId()); return super.sendSync(channel, rpcMessage, timeoutMillis); } @@ -193,6 +198,7 @@ public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutExcepti return null; } RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); + channelToRequestIds.computeIfAbsent(channel, ch -> ConcurrentHashMap.newKeySet()).add(rpcMessage.getId()); return super.sendSync(channel, rpcMessage, this.getRpcRequestTimeout()); } @@ -214,6 +220,7 @@ public void sendAsyncRequest(Channel channel, Object msg) { childToParentMap.put(msgId, parentId); } } + channelToRequestIds.computeIfAbsent(channel, ch -> ConcurrentHashMap.newKeySet()).add(rpcMessage.getId()); } super.sendAsync(channel, rpcMessage); } @@ -222,6 +229,7 @@ public void sendAsyncRequest(Channel channel, Object msg) { public void sendAsyncResponse(String serverAddress, RpcMessage rpcMessage, Object msg) { RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, ProtocolConstants.MSGTYPE_RESPONSE); Channel channel = clientChannelManager.acquireChannel(serverAddress); + channelToRequestIds.computeIfAbsent(channel, ch -> ConcurrentHashMap.newKeySet()).add(rpcMessage.getId()); super.sendAsync(channel, rpcMsg); } @@ -423,7 +431,9 @@ class ClientHandler extends ChannelDuplexHandler { @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof RpcMessage) { - processMessage(ctx, (RpcMessage)msg); + RpcMessage rpcMessage = (RpcMessage)msg; + processMessage(ctx, rpcMessage); + removeRequestIdFromChannel(ctx.channel(), rpcMessage.getId()); } else { LOGGER.error("rpcMessage type error"); } @@ -448,6 +458,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { LOGGER.info("channel inactive: {}", ctx.channel()); } clientChannelManager.releaseChannel(ctx.channel(), NetUtil.toStringAddress(ctx.channel().remoteAddress())); + failFuturesForChannel(ctx.channel(), new FrameworkException(FrameworkErrorCode.ChannelIsNotWritable)); super.channelInactive(ctx); } @@ -489,6 +500,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E if (LOGGER.isInfoEnabled()) { LOGGER.info("remove exception rm channel:{}", ctx.channel()); } + failFuturesForChannel(ctx.channel(), cause); super.exceptionCaught(ctx, cause); } @@ -501,4 +513,59 @@ public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Excep } } + /** + * Iterates over futures and marks all MessageFutures associated with the specified Channel as failed. + * + * @param channel The Channel that has been disconnected or encountered an exception. + * @param cause The reason for the failure. + */ + private void failFuturesForChannel(Channel channel, Throwable cause) { + Set requestIds = channelToRequestIds.remove(channel); + if (requestIds != null) { + for (Integer requestId : requestIds) { + MessageFuture future = futures.remove(requestId); + if (future != null) { + future.setResultMessage(cause); + } + } + } + } + + /** + * Removes the association between the specified Channel and requestId from the channelToRequestIds mapping. + * If the Channel no longer has any associated requestId, the Channel will be removed from the mapping. + * + * @param channel The Channel that has been disconnected or encountered an exception. + * @param requestId The requestId to be removed. + */ + private void removeRequestIdFromChannel(Channel channel, Integer requestId) { + if (channel == null) { + if (requestId != null) { + LOGGER.warn("Attempted to remove requestId {} from a null channel.", requestId); + } else { + LOGGER.warn("Attempted to remove a null requestId from a null channel."); + } + return; + } + + if (requestId == null) { + LOGGER.warn("Attempted to remove a null requestId from channel {}.", channel); + return; + } + + channelToRequestIds.computeIfPresent(channel, (ch, requestIds) -> { + boolean removed = requestIds.remove(requestId); + if (removed) { + LOGGER.debug("Removed requestId {} from channel {}.", requestId, ch); + } else { + LOGGER.warn("Attempted to remove non-existing requestId {} from channel {}.", requestId, ch); + } + + if (requestIds.isEmpty()) { + LOGGER.debug("No more requestIds associated with channel {}. Channel removed from mapping.", ch); + return null; + } + return requestIds; + }); + } }