Skip to content

Commit

Permalink
Merge pull request #889 from Wshoway/master
Browse files Browse the repository at this point in the history
limit client total connections
  • Loading branch information
sunnights authored Mar 26, 2020
2 parents f36fce9 + a27512f commit 1b818ae
Show file tree
Hide file tree
Showing 14 changed files with 542 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.weibo.api.motan.cluster.support;

import com.weibo.api.motan.closable.ShutDownHook;
import com.weibo.api.motan.cluster.Cluster;
import com.weibo.api.motan.cluster.HaStrategy;
import com.weibo.api.motan.cluster.LoadBalance;
Expand All @@ -33,13 +34,17 @@
import com.weibo.api.motan.rpc.URL;
import com.weibo.api.motan.util.CollectionUtil;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.MotanSwitcherUtil;
import com.weibo.api.motan.util.StringTools;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
* Notify cluster the referers have changed.
Expand All @@ -50,21 +55,42 @@

public class ClusterSupport<T> implements NotifyListener {

private static ConcurrentHashMap<String, Protocol> protocols = new ConcurrentHashMap<String, Protocol>();
private static ConcurrentHashMap<String, Protocol> protocols = new ConcurrentHashMap<>();
private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
private static Set<ClusterSupport> refreshSet = new HashSet<>();

static {
executorService.scheduleAtFixedRate(() -> {
for (ClusterSupport clusterSupport : refreshSet) {
clusterSupport.refreshReferers();
}
}, MotanConstants.REFRESH_PERIOD, MotanConstants.REFRESH_PERIOD, TimeUnit.SECONDS);

ShutDownHook.registerShutdownHook(() -> {
if (!executorService.isShutdown()) {
executorService.shutdown();
}
});
}

private Cluster<T> cluster;
private List<URL> registryUrls;
private URL url;
private Class<T> interfaceClass;
private Protocol protocol;
private ConcurrentHashMap<URL, List<Referer<T>>> registryReferers = new ConcurrentHashMap<URL, List<Referer<T>>>();

private ConcurrentHashMap<URL, List<Referer<T>>> registryReferers = new ConcurrentHashMap<>();
private int selectNodeCount;
private ConcurrentHashMap<URL, Map<String, GroupUrlsSelector>> registryGroupUrlsSelectorMap = new ConcurrentHashMap<>();

public ClusterSupport(Class<T> interfaceClass, List<URL> registryUrls) {
this.registryUrls = registryUrls;
this.interfaceClass = interfaceClass;
String urlStr = StringTools.urlDecode(registryUrls.get(0).getParameter(URLParamType.embed.getName()));
this.url = URL.valueOf(urlStr);
protocol = getDecorateProtocol(url.getProtocol());
int maxConnectionCount = this.url.getIntParameter(URLParamType.maxConnectionPerGroup.getName(), URLParamType.maxConnectionPerGroup.getIntValue());
int maxClientConnection = this.url.getIntParameter(URLParamType.maxClientConnection.getName(), URLParamType.maxClientConnection.getIntValue());
selectNodeCount = (int)Math.ceil(1.0 * maxConnectionCount / maxClientConnection);
}

public void init() {
Expand Down Expand Up @@ -152,7 +178,6 @@ public synchronized void notify(URL registryUrl, List<URL> urls) {
onRegistryEmpty(registryUrl);
LoggerUtil.warn("ClusterSupport config change notify, urls is empty: registry={} service={} urls=[]", registryUrl.getUri(),
url.getIdentity());

return;
}

Expand All @@ -165,8 +190,19 @@ public synchronized void notify(URL registryUrl, List<URL> urls) {
// 判断urls中是否包含权重信息,并通知loadbalance。
processWeights(urls);

List<Referer<T>> newReferers = new ArrayList<Referer<T>>();
for (URL u : urls) {
List<URL> serviceUrls = urls;
if (selectNodeCount > 0 && MotanSwitcherUtil.switcherIsOpenWithDefault("feature.motan.partial.server", true)) {
refreshSet.add(this);
serviceUrls = selectUrls(registryUrl, urls);
} else {
refreshSet.remove(this);
}
doRefreshReferersByUrls(registryUrl, serviceUrls);
}

private void doRefreshReferersByUrls(URL registryUrl, List<URL> serviceUrls) {
List<Referer<T>> newReferers = new ArrayList<>();
for (URL u : serviceUrls) {
if (!u.canServe(url)) {
continue;
}
Expand All @@ -192,6 +228,103 @@ public synchronized void notify(URL registryUrl, List<URL> urls) {
refreshCluster();
}

protected List<URL> selectUrls(URL registryUrl, List<URL> urls) {
Map<String, List<URL>> groupUrlsMap = new HashMap<>();
for (URL u : urls) {
String group = u.getGroup();
if (!groupUrlsMap.containsKey(group)) {
groupUrlsMap.put(group, new ArrayList<URL>());
}
if (u.canServe(url)) {
groupUrlsMap.get(group).add(u);
}
}
Map<String, GroupUrlsSelector> selectorMap = registryGroupUrlsSelectorMap.computeIfAbsent(registryUrl, k -> new HashMap<>());

for (Map.Entry<String, List<URL>> entry : groupUrlsMap.entrySet()) {
GroupUrlsSelector groupUrlsSelector = selectorMap.computeIfAbsent(entry.getKey(), k -> new GroupUrlsSelector());
if (entry.getValue().size() <= selectNodeCount) {
LoggerUtil.info("ClusterSupport config change notify: registry={} service={} group={} size={} non increased",
registryUrl.getUri(), url.getIdentity(), entry.getKey(), entry.getValue().size());
}
groupUrlsSelector.updateBaseUrls(entry.getValue());
}
//去掉多余的group
Set<String> removeGroups = new HashSet<>(selectorMap.keySet());
removeGroups.removeAll(groupUrlsMap.keySet());
if (!CollectionUtil.isEmpty(removeGroups)) {
for (String removeGroup : removeGroups) {
selectorMap.remove(removeGroup);
}
}

return doSelectUrls(registryUrl);
}

private List<URL> doSelectUrls(URL registryUrl) {
List<URL> result = new ArrayList<>();
Map<String, GroupUrlsSelector> selectors = registryGroupUrlsSelectorMap.getOrDefault(registryUrl, Collections.emptyMap());
for (Map.Entry<String, GroupUrlsSelector> entry : selectors.entrySet()) {
List<URL> urls = entry.getValue().selectUrls();
result.addAll(urls);

LoggerUtil.info("ClusterSupport select group urls: registry={} service={} group={} expectSize={} size={} urls={}",
registryUrl.getUri(), url.getIdentity(), entry.getKey(), entry.getValue().getSelectSize(), urls.size(), getIdentities(urls));
}

return result;
}

protected void refreshReferers() {
for (Map.Entry<URL, List<Referer<T>>> entry : registryReferers.entrySet()) {
URL registryUrl = entry.getKey();
LoggerUtil.info("ClusterSupport refreshReferers: registry={} service={}", registryUrl.getUri(), url.getIdentity());
Map<String, GroupUrlsSelector> groupSelectorMap = registryGroupUrlsSelectorMap.get(registryUrl);
if (groupSelectorMap == null || groupSelectorMap.size() == 0) {
LoggerUtil.warn("ClusterSupport refreshReferers, groupSelectorMap is empty: registry={} service={}", registryUrl.getUri(), url.getIdentity());
continue;
}
Map<String, Integer> groupAvailableCounter = new HashMap<>(groupSelectorMap.size());
for (Referer<T> referer : entry.getValue()) {
String group = referer.getServiceUrl().getGroup();
if (referer.isAvailable()) {
groupAvailableCounter.put(group, groupAvailableCounter.getOrDefault(group, 0) + 1);
}
}

boolean needRefresh = false;
for (Map.Entry<String, Integer> counter : groupAvailableCounter.entrySet()) {
String group = counter.getKey();
int available = counter.getValue();

GroupUrlsSelector selector = groupSelectorMap.get(group);
if (selector == null) {
LoggerUtil.warn("ClusterSupport refreshReferers ,urls selector is null: registry={} service={} group={}", registryUrl.getUri(), url.getIdentity(), group);
continue;
}
int selectSize = selector.getSelectSize();

int newSize = selectSize;
//将有效referer的数量保持在一个范围内, 如果小于selectNodeCount的2/3或大于selectNodeCount的4/3
// 则试图将可用数量恢复成selectNodeCount个
if (available <= 1.0 * selectNodeCount * 2 / 3 && selector.getBaseUrlsSize() > selectSize) {
newSize = Math.min(selectSize + (selectNodeCount - available), selector.getBaseUrlsSize());
} else if (available >= 1.0 * selectNodeCount * 4 / 3) {
newSize = selectSize - (available - selectNodeCount);
}
if (newSize != selectSize) {
needRefresh = true;
selector.setSelectSize(newSize);
LoggerUtil.info("ClusterSupport refreshReferers selectSize changed: registry={} service={} group={} newSize={} oldSize={}", registryUrl.getUri(), url.getIdentity(), group, newSize, selectSize);
}
}
if (needRefresh) {
List<URL> urls = doSelectUrls(registryUrl);
doRefreshReferersByUrls(registryUrl, urls);
}
}
}

/**
* 检查urls中的第一个url是否为权重信息。 如果是权重信息则把权重信息传递给loadbalance,并移除权重url。
*
Expand Down Expand Up @@ -259,7 +392,7 @@ private void mergeClientConfigs(URL refererURL) {
}

private void refreshCluster() {
List<Referer<T>> referers = new ArrayList<Referer<T>>();
List<Referer<T>> referers = new ArrayList<>();
for (List<Referer<T>> refs : registryReferers.values()) {
referers.addAll(refs);
}
Expand Down Expand Up @@ -307,7 +440,7 @@ private void prepareCluster() {

private List<URL> parseDirectUrls(String directUrlStr) {
String[] durlArr = MotanConstants.COMMA_SPLIT_PATTERN.split(directUrlStr);
List<URL> directUrls = new ArrayList<URL>();
List<URL> directUrls = new ArrayList<>();
for (String dus : durlArr) {
URL du = URL.valueOf(StringTools.urlDecode(dus));
if (du != null) {
Expand All @@ -316,4 +449,48 @@ private List<URL> parseDirectUrls(String directUrlStr) {
}
return directUrls;
}

private class GroupUrlsSelector {
private List<URL> baseUrls;
private int selectSize;

GroupUrlsSelector(){
baseUrls = new ArrayList<>();
selectSize = selectNodeCount;
}

void updateBaseUrls(List<URL> newBaseUrls){
baseUrls.retainAll(newBaseUrls);

Set<URL> addedUrls = new HashSet<>(newBaseUrls);
addedUrls.removeAll(baseUrls);

for (URL addedUrl : addedUrls) {
int addPosition = ThreadLocalRandom.current().nextInt(baseUrls.size() + 1);
baseUrls.add(addPosition, addedUrl);
}
}

List<URL> selectUrls() {
List<URL> result = new ArrayList<>(selectSize);
if (baseUrls.size() >= selectSize) {
result.addAll(baseUrls.subList(0, selectSize));
} else {
result.addAll(baseUrls);
}
return result;
}

int getSelectSize() {
return selectSize;
}

void setSelectSize(int selectSize) {
this.selectSize = selectSize;
}

int getBaseUrlsSize(){
return baseUrls.size();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class MotanConstants {
public static final String DEFAULT_CHARACTER = "utf-8";
public static final int SLOW_COST = 50; // 50ms
public static final int STATISTIC_PEROID = 30; // 30 seconds
public static final int REFRESH_PERIOD = 60;
public static final String ASYNC_SUFFIX = "Async";// suffix for async call.
public static final String APPLICATION_STATISTIC = "statisitic";
public static final String REQUEST_REMOTE_ADDR = "requestRemoteAddress";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,37 +30,65 @@ public enum URLParamType {
requestTimeout("requestTimeout", 200),
/** request id from http interface **/
requestIdFromClient("requestIdFromClient", 0),
/** connect timeout **/
/**
* connect timeout
**/
connectTimeout("connectTimeout", 1000),
/** service min worker threads **/
/**
* service min worker threads
**/
minWorkerThread("minWorkerThread", 20),
/** service max worker threads **/
/**
* service max worker threads
**/
maxWorkerThread("maxWorkerThread", 200),
/** pool min conn number **/
/**
* pool min conn number
**/
minClientConnection("minClientConnection", 2),
/** pool max conn number **/
/**
* pool max conn number
**/
maxClientConnection("maxClientConnection", 10),
/** pool max conn number **/
maxConnectionPerGroup("maxConnectionPerGroup", 0),
/**
* pool max conn number
**/
maxContentLength("maxContentLength", 10 * 1024 * 1024),
/** max server conn (all clients conn) **/
/**
* max server conn (all clients conn)
**/
maxServerConnection("maxServerConnection", 100000),
/** pool conn manger stragy **/
/**
* pool conn manger stragy
**/
poolLifo("poolLifo", true),

lazyInit("lazyInit", false),
/** multi referer share the same channel **/
/**
* multi referer share the same channel
**/
shareChannel("shareChannel", false),
asyncInitConnection("asyncInitConnection", false),
fusingThreshold("fusingThreshold", 10),

/************************** SPI start ******************************/

/** serialize **/
/**
* serialize
**/
serialize("serialization", "hessian2"),
/** codec **/
/**
* codec
**/
codec("codec", "motan"),
/** endpointFactory **/
/**
* endpointFactory
**/
endpointFactory("endpointFactory", "motan"),
/** heartbeatFactory **/
/**
* heartbeatFactory
**/
heartbeatFactory("heartbeatFactory", "motan"),
/** switcherService **/
switcherService("switcherService", "localSwitcherService"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

package com.weibo.api.motan.config;

import java.util.Map;

import com.weibo.api.motan.config.annotation.ConfigDesc;

import java.util.Map;

/**
*
* protocol
Expand Down Expand Up @@ -48,6 +48,7 @@ public class ProtocolConfig extends AbstractConfig {
protected Integer minClientConnection;
// client最大连接数
protected Integer maxClientConnection;
protected Integer maxConnectionPerGroup;
// 最小工作pool线程数
protected Integer minWorkerThread;
// 最大工作pool线程数
Expand Down Expand Up @@ -164,6 +165,14 @@ public void setMaxClientConnection(Integer maxClientConnection) {
this.maxClientConnection = maxClientConnection;
}

public Integer getMaxConnectionPerGroup() {
return maxConnectionPerGroup;
}

public void setMaxConnectionPerGroup(Integer maxConnectionPerGroup) {
this.maxConnectionPerGroup = maxConnectionPerGroup;
}

public Integer getMinWorkerThread() {
return minWorkerThread;
}
Expand Down
Loading

0 comments on commit 1b818ae

Please sign in to comment.