diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 2c5b4c3879a..9f15a82ac7b 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -29,6 +29,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6778](https://github.com/apache/incubator-seata/pull/6778)] fix namingserver node term - [[#6765](https://github.com/apache/incubator-seata/pull/6765)] fix MySQL driver loading by replacing custom classloader with system classloader for better compatibility and simplified process - [[#6781](https://github.com/apache/incubator-seata/pull/6781)] the issue where the TC occasionally fails to go offline from the NamingServer +- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] fall back to any of available cluster address when query cluster address is empty ### optimize: @@ -71,6 +72,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6787](https://github.com/apache/incubator-seata/pull/6787)] upgrade elliptic to 6.5.7 - [[#6783](https://github.com/apache/incubator-seata/pull/6783)] rename the server naming/v1 api to vgroup/v1 - [[#6793](https://github.com/apache/incubator-seata/pull/6793)] fix npmjs conflicts +- [[#6794](https://github.com/apache/incubator-seata/pull/6794)] optimize NacosMockTest UT case ### refactor: @@ -113,6 +115,7 @@ Thanks to these contributors for their code commits. Please report an unintended - [imashimaro](https://github.com/hmj776521114) - [lyl2008dsg](https://github.com/lyl2008dsg) - [l81893521](https://github.com/l81893521) +- [laywin](https://github.com/laywin) Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 9a26e964fcb..86c2619b3fe 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -30,6 +30,7 @@ - [[#6778](https://github.com/apache/incubator-seata/pull/6778)] 修复namingserver的节点term为0问题 - [[#6765](https://github.com/apache/incubator-seata/pull/6765)] 改进MySQL驱动加载机制,将自定义类加载器替换为系统类加载器,更兼容简化流程 - [[#6781](https://github.com/apache/incubator-seata/pull/6781)] 修复tc下线时,由于定时任务没有先关闭,导致下线后还会被注册上,需要靠namingserver的健康检查来下线的bug +- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] 当查询的集群地址为空时,获取可用的任意集群地址 ### optimize: @@ -72,7 +73,7 @@ - [[#6787](https://github.com/apache/incubator-seata/pull/6787)] 升级 elliptic 至 6.5.7 版本 - [[#6783](https://github.com/apache/incubator-seata/pull/6783)] 将server事务分组修改接口改为/vgroup/v1 - [[#6793](https://github.com/apache/incubator-seata/pull/6793)] 修复 npmjs 依赖冲突问题 - +- [[#6794](https://github.com/apache/incubator-seata/pull/6794)] 优化 NacosMockTest 单测问题 ### refactor: @@ -117,6 +118,7 @@ - [imashimaro](https://github.com/hmj776521114) - [lyl2008dsg](https://github.com/lyl2008dsg) - [l81893521](https://github.com/l81893521) +- [laywin](https://github.com/laywin) diff --git a/config/seata-config-nacos/src/test/java/org/apache/seata/config/nacos/NacosMockTest.java b/config/seata-config-nacos/src/test/java/org/apache/seata/config/nacos/NacosMockTest.java index 7a30740a89c..dbf2995e9f7 100644 --- a/config/seata-config-nacos/src/test/java/org/apache/seata/config/nacos/NacosMockTest.java +++ b/config/seata-config-nacos/src/test/java/org/apache/seata/config/nacos/NacosMockTest.java @@ -75,13 +75,13 @@ public void getConfig() { Configuration configuration = ConfigurationFactory.getInstance(); String configStrValue = configuration.getConfig(SUB_NACOS_DATAID); Assertions.assertNull(configStrValue); - configStrValue = configuration.getConfig(SUB_NACOS_DATAID, 5000); + configStrValue = configuration.getConfig(SUB_NACOS_DATAID, 1000); Assertions.assertNull(configStrValue); - configStrValue = configuration.getConfig(SUB_NACOS_DATAID, "TEST", 5000); + configStrValue = configuration.getConfig(SUB_NACOS_DATAID, "TEST", 1000); Assertions.assertEquals("TEST", configStrValue); ConfigurationCache.clear(); System.setProperty(SUB_NACOS_DATAID, "SYS-TEST"); - configStrValue = configuration.getConfig(SUB_NACOS_DATAID, "TEST", 5000); + configStrValue = configuration.getConfig(SUB_NACOS_DATAID, "TEST", 1000); Assertions.assertEquals("SYS-TEST", configStrValue); ConfigurationCache.clear(); System.clearProperty(SUB_NACOS_DATAID); @@ -91,7 +91,7 @@ public void getConfig() { Assertions.assertEquals(0, configIntValue); configIntValue = configuration.getInt(SUB_NACOS_DATAID, 100); Assertions.assertEquals(100, configIntValue); - configIntValue = configuration.getInt(SUB_NACOS_DATAID, 100, 5000); + configIntValue = configuration.getInt(SUB_NACOS_DATAID, 100, 1000); Assertions.assertEquals(100, configIntValue); ConfigurationCache.clear(); @@ -99,7 +99,7 @@ public void getConfig() { Assertions.assertEquals(false, configBoolValue); configBoolValue = configuration.getBoolean(SUB_NACOS_DATAID, true); Assertions.assertEquals(true, configBoolValue); - configBoolValue = configuration.getBoolean(SUB_NACOS_DATAID, true, 5000); + configBoolValue = configuration.getBoolean(SUB_NACOS_DATAID, true, 1000); Assertions.assertEquals(true, configBoolValue); ConfigurationCache.clear(); @@ -107,7 +107,7 @@ public void getConfig() { Assertions.assertEquals(0, configShortValue); configShortValue = configuration.getShort(SUB_NACOS_DATAID, (short)64); Assertions.assertEquals(64, configShortValue); - configShortValue = configuration.getShort(SUB_NACOS_DATAID, (short)127, 5000); + configShortValue = configuration.getShort(SUB_NACOS_DATAID, (short)127, 1000); Assertions.assertEquals(127, configShortValue); ConfigurationCache.clear(); @@ -115,21 +115,21 @@ public void getConfig() { Assertions.assertEquals(0L, configLongValue); configLongValue = configuration.getLong(SUB_NACOS_DATAID, 12345678L); Assertions.assertEquals(12345678L, configLongValue); - configLongValue = configuration.getLong(SUB_NACOS_DATAID, 65535L, 5000); + configLongValue = configuration.getLong(SUB_NACOS_DATAID, 65535L, 1000); Assertions.assertEquals(65535L, configLongValue); ConfigurationCache.clear(); Duration configDurValue = configuration.getDuration(SUB_NACOS_DATAID); Assertions.assertEquals(Duration.ZERO, configDurValue); - Duration defaultDuration = Duration.ofMillis(5000); + Duration defaultDuration = Duration.ofMillis(1000); configDurValue = configuration.getDuration(SUB_NACOS_DATAID, defaultDuration); Assertions.assertEquals(defaultDuration, configDurValue); defaultDuration = Duration.ofMillis(1000); - configDurValue = configuration.getDuration(SUB_NACOS_DATAID, defaultDuration, 5000); + configDurValue = configuration.getDuration(SUB_NACOS_DATAID, defaultDuration, 1000); Assertions.assertEquals(defaultDuration, configDurValue); ConfigurationCache.clear(); - configStrValue = configuration.getLatestConfig(SUB_NACOS_DATAID, "DEFAULT", 5000); + configStrValue = configuration.getLatestConfig(SUB_NACOS_DATAID, "DEFAULT", 1000); Assertions.assertEquals("DEFAULT", configStrValue); } @@ -145,7 +145,7 @@ public void putConfigIfAbsent() { @Test public void removeConfig() { Configuration configuration = ConfigurationFactory.getInstance(); - boolean removed = configuration.removeConfig(SUB_NACOS_DATAID); + boolean removed = configuration.removeConfig(NACOS_DATAID); Assertions.assertTrue(removed); } @@ -175,7 +175,7 @@ public void onChangeEvent(ConfigurationChangeEvent event) { configuration.addConfigListener(SUB_NACOS_DATAID, listener); Thread.sleep(1000); configuration.putConfig(NACOS_DATAID, "KEY=VALUE"); - latch.await(3000, TimeUnit.MILLISECONDS); + latch.await(1000, TimeUnit.MILLISECONDS); Set listeners = configuration.getConfigListeners(SUB_NACOS_DATAID); //configcache listener + user listener Assertions.assertEquals(2, listeners.size()); diff --git a/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java b/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java index e44ac33b3aa..72fa626b1f4 100644 --- a/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java +++ b/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java @@ -75,6 +75,8 @@ public class ConsulRegistryServiceImpl implements RegistryService lookup(String key) throws Exception { + transactionServiceGroup = key; final String cluster = getServiceGroup(key); if (cluster == null) { String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key; @@ -311,7 +314,7 @@ private void refreshCluster(String cluster, List services) { clusterAddressMap.put(cluster, addresses); - removeOfflineAddressesIfNecessary(cluster, addresses); + removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, addresses); } /** diff --git a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java index fd9561434f6..efc997b20bf 100644 --- a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java +++ b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java @@ -16,8 +16,10 @@ */ package org.apache.seata.discovery.registry; +import org.apache.seata.common.util.CollectionUtils; +import org.apache.seata.config.ConfigurationFactory; + import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -27,8 +29,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import org.apache.seata.config.ConfigurationFactory; - /** * The interface Registry service. * @@ -54,7 +54,7 @@ public interface RegistryService { /** * Service node health check */ - Map> CURRENT_ADDRESS_MAP = new ConcurrentHashMap<>(); + Map>> CURRENT_ADDRESS_MAP = new ConcurrentHashMap<>(); /** * Register. * @@ -119,12 +119,29 @@ default String getServiceGroup(String key) { } default List aliveLookup(String transactionServiceGroup) { - return CURRENT_ADDRESS_MAP.computeIfAbsent(getServiceGroup(transactionServiceGroup), k -> new ArrayList<>()); + Map> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, + k -> new ConcurrentHashMap<>()); + + String clusterName = getServiceGroup(transactionServiceGroup); + List inetSocketAddresses = clusterAddressMap.get(clusterName); + if (CollectionUtils.isNotEmpty(inetSocketAddresses)) { + return inetSocketAddresses; + } + + // fall back to addresses of any cluster + return clusterAddressMap.values().stream().filter(CollectionUtils::isNotEmpty) + .findAny().orElse(Collections.emptyList()); } default List refreshAliveLookup(String transactionServiceGroup, List aliveAddress) { - return CURRENT_ADDRESS_MAP.put(getServiceGroup(transactionServiceGroup), aliveAddress); + + Map> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, + key -> new ConcurrentHashMap<>()); + + String clusterName = getServiceGroup(transactionServiceGroup); + + return clusterAddressMap.put(clusterName, aliveAddress); } @@ -137,15 +154,21 @@ default List refreshAliveLookup(String transactionServiceGrou * @param clusterName * @param newAddressed */ - default void removeOfflineAddressesIfNecessary(String clusterName, Collection newAddressed) { + default void removeOfflineAddressesIfNecessary(String transactionGroupService, String clusterName, Collection newAddressed) { + + Map> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionGroupService, + key -> new ConcurrentHashMap<>()); - List currentAddresses = CURRENT_ADDRESS_MAP.getOrDefault(clusterName, Collections.emptyList()); + List currentAddresses = clusterAddressMap.getOrDefault(clusterName, Collections.emptyList()); List inetSocketAddresses = currentAddresses .stream().filter(newAddressed::contains).collect( Collectors.toList()); - CURRENT_ADDRESS_MAP.put(clusterName, inetSocketAddresses); + // prevent empty update + if (CollectionUtils.isNotEmpty(inetSocketAddresses)) { + clusterAddressMap.put(clusterName, inetSocketAddresses); + } } } diff --git a/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java b/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java index 70e3dfe8520..96d3cf05110 100644 --- a/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java +++ b/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java @@ -75,6 +75,8 @@ public class EtcdRegistryServiceImpl implements RegistryService private static final int MAP_INITIAL_CAPACITY = 8; private static final int THREAD_POOL_SIZE = 2; private ExecutorService executorService; + + private String transactionServiceGroup; /** * TTL for lease */ @@ -181,6 +183,7 @@ public void unsubscribe(String cluster, Watch.Listener listener) throws Exceptio @Override public List lookup(String key) throws Exception { + transactionServiceGroup = key; final String cluster = getServiceGroup(key); if (cluster == null) { String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key; @@ -252,7 +255,7 @@ private void refreshCluster(String cluster) throws Exception { }).collect(Collectors.toList()); clusterAddressMap.put(cluster, new Pair<>(getResponse.getHeader().getRevision(), instanceList)); - removeOfflineAddressesIfNecessary(cluster, instanceList); + removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, instanceList); } /** diff --git a/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java b/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java index 80d640c57c9..5ab5191234d 100644 --- a/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java +++ b/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java @@ -75,6 +75,8 @@ public class EurekaRegistryServiceImpl implements RegistryService lookup(String key) throws Exception { + transactionServiceGroup = key; String clusterName = getServiceGroup(key); if (clusterName == null) { String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key; @@ -169,7 +172,7 @@ private void refreshCluster(String clusterName) { .collect(Collectors.toList()); CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); - removeOfflineAddressesIfNecessary(clusterName, newAddressList); + removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList); } } diff --git a/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java b/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java index 7ddda0d0e7e..1f6abccba4a 100644 --- a/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java +++ b/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java @@ -84,6 +84,8 @@ public class NacosRegistryServiceImpl implements RegistryService private static final Pattern DEFAULT_SLB_REGISTRY_PATTERN = Pattern.compile("(?!.*internal)(?=.*seata).*mse.aliyuncs.com"); private static volatile Boolean useSLBWay; + private String transactionServiceGroup; + private NacosRegistryServiceImpl() { String configForNacosSLB = FILE_CONFIG.getConfig(getNacosUrlPatternOfSLB()); Pattern patternOfNacosRegistryForSLB = StringUtils.isBlank(configForNacosSLB) @@ -193,7 +195,7 @@ public List lookup(String key) throws Exception { .collect(Collectors.toList()); CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); - removeOfflineAddressesIfNecessary(clusterName, newAddressList); + removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList); } }); } diff --git a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java index 2781f4687bc..a479fe69d15 100644 --- a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java +++ b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java @@ -16,16 +16,16 @@ */ package org.apache.seata.discovery.registry.namingserver; - import java.io.IOException; import java.net.InetSocketAddress; import java.rmi.RemoteException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; +import java.util.HashMap; +import java.util.Objects; import java.util.Map; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Objects; +import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; @@ -41,22 +41,23 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.HttpStatus; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.ContentType; import org.apache.http.protocol.HTTP; +import org.apache.http.util.EntityUtils; import org.apache.seata.common.metadata.Node; import org.apache.seata.common.metadata.namingserver.Instance; import org.apache.seata.common.metadata.namingserver.MetaResponse; import org.apache.seata.common.thread.NamedThreadFactory; +import org.apache.seata.common.util.CollectionUtils; +import org.apache.seata.common.util.HttpClientUtil; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; import org.apache.seata.config.Configuration; import org.apache.seata.config.ConfigurationFactory; -import org.apache.seata.common.util.HttpClientUtil; import org.apache.seata.discovery.registry.RegistryService; -import org.apache.http.HttpStatus; -import org.apache.http.StatusLine; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -322,17 +323,6 @@ public void unsubscribe(String vGroup) throws Exception { isSubscribed = false; } - @Override - public List aliveLookup(String transactionServiceGroup) { - return CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, k -> new ArrayList<>()); - } - - @Override - public List refreshAliveLookup(String transactionServiceGroup, - List aliveAddress) { - return CURRENT_ADDRESS_MAP.put(transactionServiceGroup, aliveAddress); - } - /** * @param key vGroup name * @return List available instance list @@ -413,6 +403,31 @@ public String getNamespace() { return namespace; } + @Override + public List aliveLookup(String transactionServiceGroup) { + Map> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, + k -> new ConcurrentHashMap<>()); + + List inetSocketAddresses = clusterAddressMap.get(transactionServiceGroup); + if (CollectionUtils.isNotEmpty(inetSocketAddresses)) { + return inetSocketAddresses; + } + + // fall back to addresses of any cluster + return clusterAddressMap.values().stream().filter(CollectionUtils::isNotEmpty) + .findAny().orElse(Collections.emptyList()); + } + + @Override + public List refreshAliveLookup(String transactionServiceGroup, + List aliveAddress) { + Map> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, + key -> new ConcurrentHashMap<>()); + + + return clusterAddressMap.put(transactionServiceGroup, aliveAddress); + } + /** * get one namingserver url diff --git a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java index 769799f6930..6c18a57e72f 100644 --- a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java +++ b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java @@ -74,6 +74,8 @@ public class RedisRegistryServiceImpl implements RegistryService private static final long KEY_TTL = 5L; private static final long KEY_REFRESH_PERIOD = 2000L; + private String transactionServiceGroup; + private ScheduledExecutorService threadPoolExecutorForSubscribe = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("RedisRegistryService-subscribe", 1)); private ScheduledExecutorService threadPoolExecutorForUpdateMap = new ScheduledThreadPoolExecutor(1, @@ -219,6 +221,7 @@ public void unsubscribe(String cluster, RedisListener listener) { @Override public List lookup(String key) { + transactionServiceGroup = key; String clusterName = getServiceGroup(key); if (clusterName == null) { String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key; @@ -280,7 +283,7 @@ private void removeServerAddressByPushEmptyProtection(String notifyCluserName, S } socketAddresses.remove(inetSocketAddress); - removeOfflineAddressesIfNecessary(notifyCluserName, socketAddresses); + removeOfflineAddressesIfNecessary(transactionServiceGroup, notifyCluserName, socketAddresses); } @Override diff --git a/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java b/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java index a7e4e087fa4..fa0b428dda1 100644 --- a/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java +++ b/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java @@ -81,6 +81,8 @@ public class SofaRegistryServiceImpl implements RegistryService lookup(String key) throws Exception { + transactionServiceGroup = key; String clusterName = getServiceGroup(key); if (clusterName == null) { String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key; @@ -174,7 +177,7 @@ public List lookup(String key) throws Exception { List newAddressList = flatData(instances); CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); - removeOfflineAddressesIfNecessary(clusterName, newAddressList); + removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList); } respondRegistries.countDown(); }); diff --git a/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java b/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java index 9b112b1efb5..84e70abe041 100644 --- a/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java +++ b/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java @@ -78,6 +78,8 @@ public class ZookeeperRegisterServiceImpl implements RegistryService REGISTERED_PATH_SET = Collections.synchronizedSet(new HashSet<>(REGISTERED_PATH_SET_SIZE)); + private String transactionServiceGroup; + private ZookeeperRegisterServiceImpl() { } @@ -175,6 +177,7 @@ public void unsubscribe(String cluster, IZkChildListener listener) throws Except */ @Override public List lookup(String key) throws Exception { + transactionServiceGroup = key; String clusterName = getServiceGroup(key); if (clusterName == null) { @@ -309,7 +312,7 @@ private void refreshClusterAddressMap(String clusterName, List instances } CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); - removeOfflineAddressesIfNecessary(clusterName, newAddressList); + removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList); } private String getClusterName() { diff --git a/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java b/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java index cd7afa792f7..dffd1a279a3 100644 --- a/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java +++ b/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java @@ -18,9 +18,7 @@ import java.lang.reflect.Field; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -128,18 +126,22 @@ public void testLookUp() throws Exception { @Test public void testRemoveOfflineAddressesIfNecessaryNoRemoveCase() { - service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); - service.removeOfflineAddressesIfNecessary("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + Map> addresses = service.CURRENT_ADDRESS_MAP.computeIfAbsent("default_tx_group", k -> new HashMap<>()); + addresses.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + service.removeOfflineAddressesIfNecessary("default_tx_group","cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); - Assertions.assertEquals(1, service.CURRENT_ADDRESS_MAP.get("cluster").size()); + Assertions.assertEquals(1, service.CURRENT_ADDRESS_MAP.get("default_tx_group").get("cluster").size()); } @Test - public void testRemoveOfflineAddressesIfNecessaryRemoveCase() { - service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); - service.removeOfflineAddressesIfNecessary("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); + public void testRemovePreventEmptyPushCase() { + Map> addresses = service.CURRENT_ADDRESS_MAP.computeIfAbsent("default_tx_group", k -> new HashMap<>()); - Assertions.assertEquals(0, service.CURRENT_ADDRESS_MAP.get("cluster").size()); + addresses.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + + service.removeOfflineAddressesIfNecessary("default_tx_group", "cluster", Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); + + Assertions.assertEquals(1, service.CURRENT_ADDRESS_MAP.get("default_tx_group").get("cluster").size()); } @Test @@ -148,7 +150,8 @@ public void testAliveLookup() { System.setProperty("txServiceGroup", "default_tx_group"); System.setProperty("service.vgroupMapping.default_tx_group", "cluster"); - service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + Map> addresses = service.CURRENT_ADDRESS_MAP.computeIfAbsent("default_tx_group", k -> new HashMap<>()); + addresses.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); List result = service.aliveLookup("default_tx_group"); Assertions.assertEquals(result, Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); @@ -161,12 +164,11 @@ public void tesRefreshAliveLookup() { System.setProperty("txServiceGroup", "default_tx_group"); System.setProperty("service.vgroupMapping.default_tx_group", "cluster"); - service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); service.refreshAliveLookup("default_tx_group", Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); - Assertions.assertEquals(service.CURRENT_ADDRESS_MAP.get("cluster"), + Assertions.assertEquals(service.CURRENT_ADDRESS_MAP.get("default_tx_group").get("cluster"), Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); } } diff --git a/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/fence/hook/TccHookTest.java b/integration-tx-api/src/test/java/org/apache/seata/integration/tx/api/fence/hook/TccHookTest.java new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tcc/src/test/java/org/apache/seata/rm/tcc/TccHookTest.java b/tcc/src/test/java/org/apache/seata/rm/tcc/TccHookTest.java index 32dab76dc8e..c109f084953 100644 --- a/tcc/src/test/java/org/apache/seata/rm/tcc/TccHookTest.java +++ b/tcc/src/test/java/org/apache/seata/rm/tcc/TccHookTest.java @@ -73,7 +73,6 @@ public void setUp() throws NoSuchFieldException, IllegalAccessException, NoSuchM Map tccResourceCache = new ConcurrentHashMap<>(); tccResourceCache.put(actionName, tccResource); setPrivateField(tccResourceManagerObject, "tccResourceCache", tccResourceCache); - tccResourceManagerObject.registerResource(); tccResourceManager = Mockito.spy(tccResourceManagerObject); }