Skip to content

Commit

Permalink
feature: add TCC three-phase hooks (apache#6731)
Browse files Browse the repository at this point in the history
  • Loading branch information
chengliefeng committed Sep 1, 2024
2 parents 2637ac3 + 2186ceb commit 2d920e2
Show file tree
Hide file tree
Showing 15 changed files with 127 additions and 63 deletions.
3 changes: 3 additions & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6778](https://github.com/apache/incubator-seata/pull/6778)] fix namingserver node term
- [[#6765](https://github.com/apache/incubator-seata/pull/6765)] fix MySQL driver loading by replacing custom classloader with system classloader for better compatibility and simplified process
- [[#6781](https://github.com/apache/incubator-seata/pull/6781)] the issue where the TC occasionally fails to go offline from the NamingServer
- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] fall back to any of available cluster address when query cluster address is empty


### optimize:
Expand Down Expand Up @@ -71,6 +72,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6787](https://github.com/apache/incubator-seata/pull/6787)] upgrade elliptic to 6.5.7
- [[#6783](https://github.com/apache/incubator-seata/pull/6783)] rename the server naming/v1 api to vgroup/v1
- [[#6793](https://github.com/apache/incubator-seata/pull/6793)] fix npmjs conflicts
- [[#6794](https://github.com/apache/incubator-seata/pull/6794)] optimize NacosMockTest UT case

### refactor:

Expand Down Expand Up @@ -113,6 +115,7 @@ Thanks to these contributors for their code commits. Please report an unintended
- [imashimaro](https://github.com/hmj776521114)
- [lyl2008dsg](https://github.com/lyl2008dsg)
- [l81893521](https://github.com/l81893521)
- [laywin](https://github.com/laywin)


Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
4 changes: 3 additions & 1 deletion changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
- [[#6778](https://github.com/apache/incubator-seata/pull/6778)] 修复namingserver的节点term为0问题
- [[#6765](https://github.com/apache/incubator-seata/pull/6765)] 改进MySQL驱动加载机制,将自定义类加载器替换为系统类加载器,更兼容简化流程
- [[#6781](https://github.com/apache/incubator-seata/pull/6781)] 修复tc下线时,由于定时任务没有先关闭,导致下线后还会被注册上,需要靠namingserver的健康检查来下线的bug
- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] 当查询的集群地址为空时,获取可用的任意集群地址


### optimize:
Expand Down Expand Up @@ -72,7 +73,7 @@
- [[#6787](https://github.com/apache/incubator-seata/pull/6787)] 升级 elliptic 至 6.5.7 版本
- [[#6783](https://github.com/apache/incubator-seata/pull/6783)] 将server事务分组修改接口改为/vgroup/v1
- [[#6793](https://github.com/apache/incubator-seata/pull/6793)] 修复 npmjs 依赖冲突问题

- [[#6794](https://github.com/apache/incubator-seata/pull/6794)] 优化 NacosMockTest 单测问题

### refactor:

Expand Down Expand Up @@ -117,6 +118,7 @@
- [imashimaro](https://github.com/hmj776521114)
- [lyl2008dsg](https://github.com/lyl2008dsg)
- [l81893521](https://github.com/l81893521)
- [laywin](https://github.com/laywin)



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ public void getConfig() {
Configuration configuration = ConfigurationFactory.getInstance();
String configStrValue = configuration.getConfig(SUB_NACOS_DATAID);
Assertions.assertNull(configStrValue);
configStrValue = configuration.getConfig(SUB_NACOS_DATAID, 5000);
configStrValue = configuration.getConfig(SUB_NACOS_DATAID, 1000);
Assertions.assertNull(configStrValue);
configStrValue = configuration.getConfig(SUB_NACOS_DATAID, "TEST", 5000);
configStrValue = configuration.getConfig(SUB_NACOS_DATAID, "TEST", 1000);
Assertions.assertEquals("TEST", configStrValue);
ConfigurationCache.clear();
System.setProperty(SUB_NACOS_DATAID, "SYS-TEST");
configStrValue = configuration.getConfig(SUB_NACOS_DATAID, "TEST", 5000);
configStrValue = configuration.getConfig(SUB_NACOS_DATAID, "TEST", 1000);
Assertions.assertEquals("SYS-TEST", configStrValue);
ConfigurationCache.clear();
System.clearProperty(SUB_NACOS_DATAID);
Expand All @@ -91,45 +91,45 @@ public void getConfig() {
Assertions.assertEquals(0, configIntValue);
configIntValue = configuration.getInt(SUB_NACOS_DATAID, 100);
Assertions.assertEquals(100, configIntValue);
configIntValue = configuration.getInt(SUB_NACOS_DATAID, 100, 5000);
configIntValue = configuration.getInt(SUB_NACOS_DATAID, 100, 1000);
Assertions.assertEquals(100, configIntValue);

ConfigurationCache.clear();
boolean configBoolValue = configuration.getBoolean(SUB_NACOS_DATAID);
Assertions.assertEquals(false, configBoolValue);
configBoolValue = configuration.getBoolean(SUB_NACOS_DATAID, true);
Assertions.assertEquals(true, configBoolValue);
configBoolValue = configuration.getBoolean(SUB_NACOS_DATAID, true, 5000);
configBoolValue = configuration.getBoolean(SUB_NACOS_DATAID, true, 1000);
Assertions.assertEquals(true, configBoolValue);

ConfigurationCache.clear();
short configShortValue = configuration.getShort(SUB_NACOS_DATAID);
Assertions.assertEquals(0, configShortValue);
configShortValue = configuration.getShort(SUB_NACOS_DATAID, (short)64);
Assertions.assertEquals(64, configShortValue);
configShortValue = configuration.getShort(SUB_NACOS_DATAID, (short)127, 5000);
configShortValue = configuration.getShort(SUB_NACOS_DATAID, (short)127, 1000);
Assertions.assertEquals(127, configShortValue);

ConfigurationCache.clear();
long configLongValue = configuration.getShort(SUB_NACOS_DATAID);
Assertions.assertEquals(0L, configLongValue);
configLongValue = configuration.getLong(SUB_NACOS_DATAID, 12345678L);
Assertions.assertEquals(12345678L, configLongValue);
configLongValue = configuration.getLong(SUB_NACOS_DATAID, 65535L, 5000);
configLongValue = configuration.getLong(SUB_NACOS_DATAID, 65535L, 1000);
Assertions.assertEquals(65535L, configLongValue);

ConfigurationCache.clear();
Duration configDurValue = configuration.getDuration(SUB_NACOS_DATAID);
Assertions.assertEquals(Duration.ZERO, configDurValue);
Duration defaultDuration = Duration.ofMillis(5000);
Duration defaultDuration = Duration.ofMillis(1000);
configDurValue = configuration.getDuration(SUB_NACOS_DATAID, defaultDuration);
Assertions.assertEquals(defaultDuration, configDurValue);
defaultDuration = Duration.ofMillis(1000);
configDurValue = configuration.getDuration(SUB_NACOS_DATAID, defaultDuration, 5000);
configDurValue = configuration.getDuration(SUB_NACOS_DATAID, defaultDuration, 1000);
Assertions.assertEquals(defaultDuration, configDurValue);

ConfigurationCache.clear();
configStrValue = configuration.getLatestConfig(SUB_NACOS_DATAID, "DEFAULT", 5000);
configStrValue = configuration.getLatestConfig(SUB_NACOS_DATAID, "DEFAULT", 1000);
Assertions.assertEquals("DEFAULT", configStrValue);

}
Expand All @@ -145,7 +145,7 @@ public void putConfigIfAbsent() {
@Test
public void removeConfig() {
Configuration configuration = ConfigurationFactory.getInstance();
boolean removed = configuration.removeConfig(SUB_NACOS_DATAID);
boolean removed = configuration.removeConfig(NACOS_DATAID);
Assertions.assertTrue(removed);
}

Expand Down Expand Up @@ -175,7 +175,7 @@ public void onChangeEvent(ConfigurationChangeEvent event) {
configuration.addConfigListener(SUB_NACOS_DATAID, listener);
Thread.sleep(1000);
configuration.putConfig(NACOS_DATAID, "KEY=VALUE");
latch.await(3000, TimeUnit.MILLISECONDS);
latch.await(1000, TimeUnit.MILLISECONDS);
Set<ConfigurationChangeListener> listeners = configuration.getConfigListeners(SUB_NACOS_DATAID);
//configcache listener + user listener
Assertions.assertEquals(2, listeners.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class ConsulRegistryServiceImpl implements RegistryService<ConsulListener
private static final int THREAD_POOL_NUM = 1;
private static final int MAP_INITIAL_CAPACITY = 8;

private String transactionServiceGroup;

/**
* default tcp check interval
*/
Expand Down Expand Up @@ -161,6 +163,7 @@ public void unsubscribe(String cluster, ConsulListener listener) throws Exceptio

@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
transactionServiceGroup = key;
final String cluster = getServiceGroup(key);
if (cluster == null) {
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
Expand Down Expand Up @@ -311,7 +314,7 @@ private void refreshCluster(String cluster, List<HealthService> services) {

clusterAddressMap.put(cluster, addresses);

removeOfflineAddressesIfNecessary(cluster, addresses);
removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, addresses);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
*/
package org.apache.seata.discovery.registry;

import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.config.ConfigurationFactory;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -27,8 +29,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.seata.config.ConfigurationFactory;

/**
* The interface Registry service.
*
Expand All @@ -54,7 +54,7 @@ public interface RegistryService<T> {
/**
* Service node health check
*/
Map<String,List<InetSocketAddress>> CURRENT_ADDRESS_MAP = new ConcurrentHashMap<>();
Map<String, Map<String, List<InetSocketAddress>>> CURRENT_ADDRESS_MAP = new ConcurrentHashMap<>();
/**
* Register.
*
Expand Down Expand Up @@ -119,12 +119,29 @@ default String getServiceGroup(String key) {
}

default List<InetSocketAddress> aliveLookup(String transactionServiceGroup) {
return CURRENT_ADDRESS_MAP.computeIfAbsent(getServiceGroup(transactionServiceGroup), k -> new ArrayList<>());
Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
k -> new ConcurrentHashMap<>());

String clusterName = getServiceGroup(transactionServiceGroup);
List<InetSocketAddress> inetSocketAddresses = clusterAddressMap.get(clusterName);
if (CollectionUtils.isNotEmpty(inetSocketAddresses)) {
return inetSocketAddresses;
}

// fall back to addresses of any cluster
return clusterAddressMap.values().stream().filter(CollectionUtils::isNotEmpty)
.findAny().orElse(Collections.emptyList());
}

default List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup,
List<InetSocketAddress> aliveAddress) {
return CURRENT_ADDRESS_MAP.put(getServiceGroup(transactionServiceGroup), aliveAddress);

Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
key -> new ConcurrentHashMap<>());

String clusterName = getServiceGroup(transactionServiceGroup);

return clusterAddressMap.put(clusterName, aliveAddress);
}


Expand All @@ -137,15 +154,21 @@ default List<InetSocketAddress> refreshAliveLookup(String transactionServiceGrou
* @param clusterName
* @param newAddressed
*/
default void removeOfflineAddressesIfNecessary(String clusterName, Collection<InetSocketAddress> newAddressed) {
default void removeOfflineAddressesIfNecessary(String transactionGroupService, String clusterName, Collection<InetSocketAddress> newAddressed) {

Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionGroupService,
key -> new ConcurrentHashMap<>());

List<InetSocketAddress> currentAddresses = CURRENT_ADDRESS_MAP.getOrDefault(clusterName, Collections.emptyList());
List<InetSocketAddress> currentAddresses = clusterAddressMap.getOrDefault(clusterName, Collections.emptyList());

List<InetSocketAddress> inetSocketAddresses = currentAddresses
.stream().filter(newAddressed::contains).collect(
Collectors.toList());

CURRENT_ADDRESS_MAP.put(clusterName, inetSocketAddresses);
// prevent empty update
if (CollectionUtils.isNotEmpty(inetSocketAddresses)) {
clusterAddressMap.put(clusterName, inetSocketAddresses);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener>
private static final int MAP_INITIAL_CAPACITY = 8;
private static final int THREAD_POOL_SIZE = 2;
private ExecutorService executorService;

private String transactionServiceGroup;
/**
* TTL for lease
*/
Expand Down Expand Up @@ -181,6 +183,7 @@ public void unsubscribe(String cluster, Watch.Listener listener) throws Exceptio

@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
transactionServiceGroup = key;
final String cluster = getServiceGroup(key);
if (cluster == null) {
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
Expand Down Expand Up @@ -252,7 +255,7 @@ private void refreshCluster(String cluster) throws Exception {
}).collect(Collectors.toList());
clusterAddressMap.put(cluster, new Pair<>(getResponse.getHeader().getRevision(), instanceList));

removeOfflineAddressesIfNecessary(cluster, instanceList);
removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, instanceList);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class EurekaRegistryServiceImpl implements RegistryService<EurekaEventLis
private static volatile EurekaRegistryServiceImpl instance;
private static volatile EurekaClient eurekaClient;

private String transactionServiceGroup;

private EurekaRegistryServiceImpl() {
}

Expand Down Expand Up @@ -130,6 +132,7 @@ public void unsubscribe(String cluster, EurekaEventListener listener) throws Exc

@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
transactionServiceGroup = key;
String clusterName = getServiceGroup(key);
if (clusterName == null) {
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
Expand Down Expand Up @@ -169,7 +172,7 @@ private void refreshCluster(String clusterName) {
.collect(Collectors.toList());
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);

removeOfflineAddressesIfNecessary(clusterName, newAddressList);
removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public class NacosRegistryServiceImpl implements RegistryService<EventListener>
private static final Pattern DEFAULT_SLB_REGISTRY_PATTERN = Pattern.compile("(?!.*internal)(?=.*seata).*mse.aliyuncs.com");
private static volatile Boolean useSLBWay;

private String transactionServiceGroup;

private NacosRegistryServiceImpl() {
String configForNacosSLB = FILE_CONFIG.getConfig(getNacosUrlPatternOfSLB());
Pattern patternOfNacosRegistryForSLB = StringUtils.isBlank(configForNacosSLB)
Expand Down Expand Up @@ -193,7 +195,7 @@ public List<InetSocketAddress> lookup(String key) throws Exception {
.collect(Collectors.toList());
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);

removeOfflineAddressesIfNecessary(clusterName, newAddressList);
removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList);
}
});
}
Expand Down
Loading

0 comments on commit 2d920e2

Please sign in to comment.