Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: Caching the ConsistentHashSelector to avoid remapping when select every time. #7072

Open
wants to merge 4 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.*;
hein-hp marked this conversation as resolved.
Show resolved Hide resolved

import org.apache.seata.common.loader.LoadLevel;
import org.apache.seata.config.ConfigurationFactory;
Expand All @@ -38,16 +36,67 @@ public class ConsistentHashLoadBalance implements LoadBalance {
* The constant LOAD_BALANCE_CONSISTENT_HASH_VIRTUAL_NODES.
*/
public static final String LOAD_BALANCE_CONSISTENT_HASH_VIRTUAL_NODES = LoadBalanceFactory.LOAD_BALANCE_PREFIX
+ "virtualNodes";
+ "virtualNodes";
/**
* The constant VIRTUAL_NODES_NUM.
*/
private static final int VIRTUAL_NODES_NUM = ConfigurationFactory.getInstance().getInt(
LOAD_BALANCE_CONSISTENT_HASH_VIRTUAL_NODES, VIRTUAL_NODES_DEFAULT);
LOAD_BALANCE_CONSISTENT_HASH_VIRTUAL_NODES, VIRTUAL_NODES_DEFAULT);

/**
* The ConsistentHashSelectorWrapper that caches a {@link ConsistentHashSelector}.
*/
private volatile ConsistentHashSelectorWrapper selectorWrapper;

@SuppressWarnings("unchecked")
@Override
public <T> T select(List<T> invokers, String xid) {
return new ConsistentHashSelector<>(invokers, VIRTUAL_NODES_NUM).select(xid);
if (selectorWrapper == null) {
synchronized (this) {
if (selectorWrapper == null) {
selectorWrapper = new ConsistentHashSelectorWrapper(
new ConsistentHashSelector<>(invokers, VIRTUAL_NODES_NUM), invokers);
}
}
}
return (T) selectorWrapper.getSelector(invokers).select(xid);
}

@SuppressWarnings({"rawtypes", "unchecked"})
private static final class ConsistentHashSelectorWrapper {

private volatile ConsistentHashSelector selector;
// only shared with read
private volatile Set invokers;

public ConsistentHashSelectorWrapper(ConsistentHashSelector selector, List invokers) {
this.selector = selector;
this.invokers = new HashSet<>(invokers);
}

public ConsistentHashSelector getSelector(List invokers) {
if (!equals(invokers)) {
synchronized (this) {
if (!equals(invokers)) {
selector = new ConsistentHashSelector(invokers, VIRTUAL_NODES_NUM);
this.invokers = new HashSet<>(invokers);
}
}
}
return selector;
}

private boolean equals(List invokers) {
if (invokers.size() != this.invokers.size()) {
return false;
}
for (Object invoker : invokers) {
if (!this.invokers.contains(invoker)) {
return false;
}
}
return true;
}
}

private static final class ConsistentHashSelector<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

Expand Down Expand Up @@ -85,7 +88,7 @@ public void testXIDLoadBalance_select(List<InetSocketAddress> addresses) throws
Assertions.assertNotNull(inetSocketAddress);
// test not found tc channel
inetSocketAddress = loadBalance.select(addresses, "127.0.0.1:8199:123456");
Assertions.assertNotEquals(inetSocketAddress.getPort(),8199);
Assertions.assertNotEquals(inetSocketAddress.getPort(), 8199);
}

/**
Expand All @@ -108,6 +111,31 @@ public void testConsistentHashLoadBalance_select(List<InetSocketAddress> address
Assertions.assertEquals(1, selected, "selected must be equal to 1");
}

/**
* Test cached consistent hash load balance select.
*
* @param addresses the addresses
*/
@ParameterizedTest
@MethodSource("addressProvider")
public void testCachedConsistentHashLoadBalance_select(List<InetSocketAddress> addresses) throws Exception {
ConsistentHashLoadBalance loadBalance = new ConsistentHashLoadBalance();

List<InetSocketAddress> addresses1 = new ArrayList<>(addresses);
loadBalance.select(addresses1, XID);
Object o1 = getConsistentHashSelectorByReflect(loadBalance);
List<InetSocketAddress> addresses2 = new ArrayList<>(addresses);
loadBalance.select(addresses2, XID);
Object o2 = getConsistentHashSelectorByReflect(loadBalance);
Assertions.assertEquals(o1, o2);

List<InetSocketAddress> addresses3 = new ArrayList<>(addresses);
addresses3.remove(ThreadLocalRandom.current().nextInt(addresses.size()));
loadBalance.select(addresses3, XID);
Object o3 = getConsistentHashSelectorByReflect(loadBalance);
Assertions.assertNotEquals(o1, o3);
}

/**
* Test least active load balance select.
*
Expand Down Expand Up @@ -166,6 +194,22 @@ public Map<InetSocketAddress, AtomicLong> getSelectedCounter(int runs, List<Inet
return counter;
}

/**
* Gets ConsistentHashSelector Instance By Reflect
*
* @param loadBalance the loadBalance
* @return the ConsistentHashSelector
*/
public Object getConsistentHashSelectorByReflect(ConsistentHashLoadBalance loadBalance) throws Exception {
Field selectorWrapperField = ConsistentHashLoadBalance.class.getDeclaredField("selectorWrapper");
selectorWrapperField.setAccessible(true);
Object selectWrapper = selectorWrapperField.get(loadBalance);
Assertions.assertNotNull(selectWrapper);
Field selectorField = selectWrapper.getClass().getDeclaredField("selector");
selectorField.setAccessible(true);
return selectorField.get(selectWrapper);
}

/**
* Address provider object [ ] [ ].
*
Expand Down