From a4f046308072bf0c1d1fec831ad72c4e3bcc3484 Mon Sep 17 00:00:00 2001
From: ggbocoder <119659920+ggbocoder@users.noreply.github.com>
Date: Sat, 24 Aug 2024 00:08:17 +0800
Subject: [PATCH 1/8] optimize: automatic deletion of namingserver vgroup
through Caffeine map (#6770)
---
changes/en-us/2.x.md | 2 +-
changes/zh-cn/2.x.md | 1 +
.../namingserver/NamingServerNode.java | 9 +++
dependencies/pom.xml | 6 ++
.../NamingserverRegistryServiceImpl.java | 12 +---
namingserver/pom.xml | 5 ++
.../namingserver/entity/bo/NamespaceBO.java | 8 +++
.../namingserver/manager/NamingManager.java | 58 ++++++++++++++-----
.../namingserver/NamingControllerTest.java | 37 ------------
.../java/org/apache/seata/server/Server.java | 27 ++++++---
.../db/store/VGroupMappingDataBaseDAO.java | 3 +-
11 files changed, 94 insertions(+), 74 deletions(-)
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 899fc783568..0baa90725d4 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -61,7 +61,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6763](https://github.com/apache/incubator-seata/pull/6763)] optimize NacosConfiguration singleton reload
- [[#6761](https://github.com/apache/incubator-seata/pull/6761)] optimize the namingserver code to improve readability
- [[#6768](https://github.com/apache/incubator-seata/pull/6768)] report the tcc fence transaction isolation level
-
+- [[#6770](https://github.com/apache/incubator-seata/pull/6770)] Automatic deletion of namingserver vgroup through Caffeine map
### refactor:
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 0371543adeb..f19513b8b45 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -61,6 +61,7 @@
- [[#6763](https://github.com/apache/incubator-seata/pull/6763)] 优化 NacosConfiguration 单例加载
- [[#6761](https://github.com/apache/incubator-seata/pull/6761)] 提升namingserver manager代码可读性
- [[#6768](https://github.com/apache/incubator-seata/pull/6768)] 上报tcc fence事务隔离级别
+- [[#6770](https://github.com/apache/incubator-seata/pull/6770)] 通过caffeine map支持namingserver事务分组的过期删除
### refactor:
diff --git a/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java b/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java
index 5645ac0e3ed..459dd66b944 100644
--- a/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java
+++ b/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java
@@ -27,6 +27,15 @@ public class NamingServerNode extends Node {
private double weight = 1.0;
private boolean healthy = true;
private long term;
+ private String unit;
+
+ public String getUnit() {
+ return unit;
+ }
+
+ public void setUnit(String unit) {
+ this.unit = unit;
+ }
public double getWeight() {
return weight;
diff --git a/dependencies/pom.xml b/dependencies/pom.xml
index e4887f5ca42..a94309cc3fc 100644
--- a/dependencies/pom.xml
+++ b/dependencies/pom.xml
@@ -111,6 +111,7 @@
1.2.20
2.9.0
3.4.3
+ 2.8.8
4.8
@@ -395,6 +396,11 @@
+
+ com.github.ben-manes.caffeine
+ caffeine
+ ${caffeine.version}
+
com.alibaba.spring
spring-context-support
diff --git a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java
index b8c988eb310..719336283d3 100644
--- a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java
+++ b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java
@@ -62,7 +62,6 @@
import org.slf4j.LoggerFactory;
-
public class NamingserverRegistryServiceImpl implements RegistryService {
private static final Logger LOGGER = LoggerFactory.getLogger(NamingserverRegistryServiceImpl.class);
@@ -163,15 +162,6 @@ public void register(InetSocketAddress address) throws Exception {
heartBeatScheduledFuture.cancel(false);
}
- heartBeatScheduledFuture = this.executorService.scheduleAtFixedRate(() -> {
- try {
- instance.setTimestamp(System.currentTimeMillis());
- doRegister(instance, getNamingAddrs());
- } catch (Exception e) {
- LOGGER.error("Naming server register Exception", e);
- }
- }, HEARTBEAT_PERIOD, HEARTBEAT_PERIOD, TimeUnit.MILLISECONDS);
-
}
public void doRegister(Instance instance, List urlList) {
@@ -232,7 +222,7 @@ public void unregister(InetSocketAddress address) {
String unit = instance.getUnit();
String jsonBody = instance.toJsonString();
String params = "unit=" + unit;
- params = params + "&cluster=" + instance.getClusterName();
+ params = params + "&clusterName=" + instance.getClusterName();
params = params + "&namespace=" + instance.getNamespace();
url += params;
Map header = new HashMap<>();
diff --git a/namingserver/pom.xml b/namingserver/pom.xml
index 61e5b0bd8a6..4ed5764fa82 100644
--- a/namingserver/pom.xml
+++ b/namingserver/pom.xml
@@ -82,6 +82,11 @@
spring-boot-starter
+
+ com.github.ben-manes.caffeine
+ caffeine
+
+
org.springframework.boot
spring-boot-starter-test
diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/entity/bo/NamespaceBO.java b/namingserver/src/main/java/org/apache/seata/namingserver/entity/bo/NamespaceBO.java
index 907e0b25580..74749d8aac2 100644
--- a/namingserver/src/main/java/org/apache/seata/namingserver/entity/bo/NamespaceBO.java
+++ b/namingserver/src/main/java/org/apache/seata/namingserver/entity/bo/NamespaceBO.java
@@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.seata.common.metadata.Cluster;
+import org.apache.seata.common.util.StringUtils;
import org.apache.seata.namingserver.entity.pojo.ClusterData;
public class NamespaceBO {
@@ -54,4 +55,11 @@ public ClusterBO getCluster(String clusterName) {
return clusterMap.computeIfAbsent(clusterName, k -> new ClusterBO());
}
+ public void removeOldCluster(String clusterName) {
+ clusterMap.keySet().forEach(currentClusterName -> {
+ if (!StringUtils.equals(currentClusterName, clusterName)) {
+ clusterMap.remove(currentClusterName);
+ }
+ });
+ }
}
diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java
index 79a27bcfe8e..5de8ea9c5ae 100644
--- a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java
+++ b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java
@@ -33,6 +33,10 @@
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.PostConstruct;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.ContentType;
import org.apache.http.protocol.HTTP;
@@ -49,6 +53,8 @@
import org.apache.seata.namingserver.listener.ClusterChangeEvent;
import org.apache.seata.namingserver.entity.pojo.ClusterData;
import org.apache.seata.namingserver.entity.vo.monitor.ClusterVO;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -65,7 +71,7 @@
public class NamingManager {
private static final Logger LOGGER = LoggerFactory.getLogger(NamingManager.class);
private final ConcurrentMap instanceLiveTable;
- private final ConcurrentMap> vGroupMap;
+ private volatile LoadingCache> vGroupMap;
private final ConcurrentMap> namespaceClusterDataMap;
@@ -83,12 +89,24 @@ public class NamingManager {
public NamingManager() {
this.instanceLiveTable = new ConcurrentHashMap<>();
- this.vGroupMap = new ConcurrentHashMap<>();
this.namespaceClusterDataMap = new ConcurrentHashMap<>();
}
@PostConstruct
public void init() {
+ this.vGroupMap = Caffeine.newBuilder()
+ .expireAfterAccess(heartbeatTimeThreshold, TimeUnit.MILLISECONDS) // expired time
+ .maximumSize(Integer.MAX_VALUE)
+ .removalListener(new RemovalListener