Skip to content

Commit

Permalink
Remove known nodes and only use liveNodes
Browse files Browse the repository at this point in the history
  • Loading branch information
mlbiscoc committed Jan 10, 2025
1 parent 52ea46b commit 9d0e720
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -54,7 +58,6 @@ public abstract class BaseHttpClusterStateProvider implements ClusterStateProvid
private String urlScheme;
private Set<String> initialNodes;
volatile Set<String> liveNodes;
volatile Set<String> knownNodes;
long liveNodesTimestamp = 0;
volatile Map<String, List<String>> aliases;
volatile Map<String, Map<String, String>> aliasProperties;
Expand All @@ -64,12 +67,11 @@ public abstract class BaseHttpClusterStateProvider implements ClusterStateProvid
private int cacheTimeout = EnvUtils.getPropertyAsInteger("solr.solrj.cache.timeout.sec", 5);

public void init(List<String> solrUrls) throws Exception {
this.initialNodes = getNodeNamesFromSolrUrls(solrUrls);
for (String solrUrl : solrUrls) {
urlScheme = solrUrl.startsWith("https") ? "https" : "http";
try (SolrClient initialClient = getSolrClient(solrUrl)) {
this.liveNodes = fetchLiveNodes(initialClient);
this.initialNodes = Set.copyOf(liveNodes);
setKnownNodes();
liveNodesTimestamp = System.nanoTime();
break;
} catch (SolrServerException | IOException e) {
Expand Down Expand Up @@ -101,7 +103,7 @@ public DocCollection getCollection(String collection) {

@Override
public ClusterState.CollectionRef getState(String collection) {
for (String nodeName : knownNodes) {
for (String nodeName : liveNodes) {
String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
try (SolrClient client = getSolrClient(baseUrl)) {
DocCollection docCollection = fetchCollectionState(client, collection);
Expand All @@ -125,7 +127,7 @@ public ClusterState.CollectionRef getState(String collection) {
}
throw new RuntimeException(
"Tried fetching cluster state using the node names we knew of, i.e. "
+ knownNodes
+ liveNodes
+ ". However, "
+ "succeeded in obtaining the cluster state from none of them."
+ "If you think your Solr cluster is up and is accessible,"
Expand All @@ -141,8 +143,7 @@ private ClusterState fetchClusterState(SolrClient client)

List<String> liveNodesList = (List<String>) cluster.get("live_nodes");
if (liveNodesList != null) {
this.liveNodes = Set.copyOf(liveNodesList);
setKnownNodes();
setLiveNodes(Set.copyOf(liveNodesList));
liveNodesTimestamp = System.nanoTime();
}

Expand Down Expand Up @@ -222,7 +223,7 @@ private SimpleOrderedMap<?> submitClusterStateRequest(

@Override
public Set<String> getLiveNodes() {
if (knownNodes == null) {
if (liveNodes == null) {
throw new RuntimeException(
"We don't know of any live_nodes to fetch the"
+ " latest live_nodes information from. "
Expand All @@ -232,11 +233,10 @@ public Set<String> getLiveNodes() {
}
if (TimeUnit.SECONDS.convert((System.nanoTime() - liveNodesTimestamp), TimeUnit.NANOSECONDS)
> getCacheTimeout()) {
for (String nodeName : knownNodes) {
for (String nodeName : liveNodes) {
String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
try (SolrClient client = getSolrClient(baseUrl)) {
this.liveNodes = fetchLiveNodes(client);
setKnownNodes();
setLiveNodes(fetchLiveNodes(client));
liveNodesTimestamp = System.nanoTime();
return this.liveNodes;
} catch (Exception e) {
Expand All @@ -245,7 +245,7 @@ > getCacheTimeout()) {
}
throw new RuntimeException(
"Tried fetching live_nodes using all the node names we knew of, i.e. "
+ knownNodes
+ liveNodes
+ ". However, "
+ "succeeded in obtaining the cluster state from none of them."
+ "If you think your Solr cluster is up and is accessible,"
Expand Down Expand Up @@ -278,7 +278,7 @@ public String resolveSimpleAlias(String aliasName) throws IllegalArgumentExcepti
}

private Map<String, List<String>> getAliases(boolean forceFetch) {
if (this.knownNodes == null) {
if (this.liveNodes == null) {
throw new RuntimeException(
"We don't know of any live_nodes to fetch the"
+ " latest aliases information from. "
Expand All @@ -291,7 +291,7 @@ private Map<String, List<String>> getAliases(boolean forceFetch) {
|| this.aliases == null
|| TimeUnit.SECONDS.convert((System.nanoTime() - aliasesTimestamp), TimeUnit.NANOSECONDS)
> getCacheTimeout()) {
for (String nodeName : knownNodes) {
for (String nodeName : liveNodes) {
String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
try (SolrClient client = getSolrClient(baseUrl)) {

Expand Down Expand Up @@ -319,7 +319,7 @@ > getCacheTimeout()) {

throw new RuntimeException(
"Tried fetching aliases using all the node names we knew of, i.e. "
+ knownNodes
+ liveNodes
+ ". However, "
+ "succeeded in obtaining the cluster state from none of them."
+ "If you think your Solr cluster is up and is accessible,"
Expand All @@ -338,7 +338,7 @@ public Map<String, String> getAliasProperties(String alias) {

@Override
public ClusterState getClusterState() {
for (String nodeName : knownNodes) {
for (String nodeName : liveNodes) {
String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
try (SolrClient client = getSolrClient(baseUrl)) {
return fetchClusterState(client);
Expand All @@ -353,7 +353,7 @@ public ClusterState getClusterState() {
}
throw new RuntimeException(
"Tried fetching cluster state using the node names we knew of, i.e. "
+ knownNodes
+ liveNodes
+ ". However, "
+ "succeeded in obtaining the cluster state from none of them."
+ "If you think your Solr cluster is up and is accessible,"
Expand All @@ -364,7 +364,7 @@ public ClusterState getClusterState() {
@SuppressWarnings("unchecked")
@Override
public Map<String, Object> getClusterProperties() {
for (String nodeName : knownNodes) {
for (String nodeName : liveNodes) {
String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
try (SolrClient client = getSolrClient(baseUrl)) {
SimpleOrderedMap<?> cluster =
Expand All @@ -376,7 +376,7 @@ public Map<String, Object> getClusterProperties() {
}
throw new RuntimeException(
"Tried fetching cluster state using the node names we knew of, i.e. "
+ knownNodes
+ liveNodes
+ ". However, "
+ "succeeded in obtaining the cluster state from none of them."
+ "If you think your Solr cluster is up and is accessible,"
Expand Down Expand Up @@ -418,19 +418,28 @@ public String getQuorumHosts() {
return String.join(",", this.liveNodes);
}

public Set<String> getKnownNodes() {
getLiveNodes();
return this.knownNodes;
/**
 * Replaces the cached live-node set with the union of the supplied nodes and the initial nodes,
 * so the nodes this provider was initialised with are never dropped from the cache.
 *
 * @param nodes the most recently fetched live node names
 */
private void setLiveNodes(Set<String> nodes) {
  // Start from the initial nodes and add the fresh ones; the resulting union is the same
  // regardless of which side seeds the set.
  Set<String> merged = new HashSet<>(this.initialNodes);
  merged.addAll(nodes);
  // Publish an immutable snapshot through the field.
  this.liveNodes = Set.copyOf(merged);
}

/**
 * Converts each Solr URL into its cluster state node name via {@link
 * #getNodeNameFromSolrUrl(String)} and collects the results.
 *
 * @param urls the Solr URLs to convert
 * @return an unmodifiable set of the resulting node names (duplicates collapse)
 * @throws URISyntaxException propagated from {@link #getNodeNameFromSolrUrl(String)}
 * @throws MalformedURLException propagated from {@link #getNodeNameFromSolrUrl(String)}
 */
public Set<String> getNodeNamesFromSolrUrls(List<String> urls)
    throws URISyntaxException, MalformedURLException {
  final Set<String> nodeNames = new HashSet<>();
  for (final String solrUrl : urls) {
    nodeNames.add(getNodeNameFromSolrUrl(solrUrl));
  }
  return Collections.unmodifiableSet(nodeNames);
}

/**
* Known nodes should always have the latest set of live nodes but never remove initial set of
* live nodes
*/
private void setKnownNodes() {
Set<String> knownNodes = new HashSet<>(this.liveNodes);
knownNodes.addAll(this.initialNodes);
this.knownNodes = Set.copyOf(knownNodes);
/**
 * URL to cluster state node name (http://127.0.0.1:12345/solr to 127.0.0.1:12345_solr).
 *
 * <p>Trailing slashes on the path are ignored, so {@code http://127.0.0.1:12345/solr/} yields
 * the same node name as {@code http://127.0.0.1:12345/solr} rather than a name with a dangling
 * underscore.
 *
 * @param solrUrl an absolute Solr base URL
 * @return the URL authority followed by the path with every {@code '/'} replaced by {@code '_'}
 * @throws URISyntaxException if {@code solrUrl} cannot be parsed as a URI
 * @throws MalformedURLException if the parsed URI cannot be converted to a URL
 */
public String getNodeNameFromSolrUrl(String solrUrl)
    throws MalformedURLException, URISyntaxException {
  URL url = new URI(solrUrl).toURL();
  String path = url.getPath();
  // Drop trailing slashes first: otherwise "/solr/" would become "_solr_", which is not a
  // valid node name and would never match an entry reported in live_nodes.
  int end = path.length();
  while (end > 0 && path.charAt(end - 1) == '/') {
    end--;
  }
  return url.getAuthority() + path.substring(0, end).replace('/', '_');
}

private enum ClusterStateRequestType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,13 @@
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.embedded.JettySolrRunner;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;

public class ClusterStateProviderTest extends SolrCloudTestCase {

private static JettySolrRunner jettyNode1;
private static JettySolrRunner jettyNode2;

@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(2)
Expand All @@ -57,19 +53,15 @@ public static void setupCluster() throws Exception {
.resolve("streaming")
.resolve("conf"))
.configure();
jettyNode1 = cluster.getJettySolrRunner(0);
jettyNode2 = cluster.getJettySolrRunner(1);
cluster.waitForAllNodes(30);
System.setProperty("solr.solrj.cache.timeout.sec", "1");
}

@After
public void cleanup() throws Exception {
if (!jettyNode1.isRunning()) {
cluster.startJettySolrRunner(jettyNode1);
}
if (!jettyNode2.isRunning()) {
cluster.startJettySolrRunner(jettyNode2);
while (cluster.getJettySolrRunners().size() < 2) {
cluster.startJettySolrRunner();
}
waitForCSPCacheTimeout();
}

@ParametersFactory
Expand Down Expand Up @@ -206,97 +198,60 @@ public void testClusterStateProvider() throws SolrServerException, IOException {
}

@Test
public void testClusterStateProviderDownedLiveNodes() throws Exception {
try (var cspZk = zkClientClusterStateProvider();
var cspHttp = http2ClusterStateProvider()) {
Set<String> expectedLiveNodes = cspZk.getClusterState().getLiveNodes();
public void testClusterStateProviderDownedInitialLiveNodes() throws Exception {
try (var cspHttp = http2ClusterStateProvider()) {
var jettyNode1 = cluster.getJettySolrRunner(0);
var jettyNode2 = cluster.getJettySolrRunner(1);

Set<String> actualLiveNodes = cspHttp.getLiveNodes();
assertEquals(2, actualLiveNodes.size());
assertEquals(expectedLiveNodes, actualLiveNodes);

cluster.stopJettySolrRunner(jettyNode1);
waitForCSPCacheTimeout();

expectedLiveNodes = cspZk.getClusterState().getLiveNodes();
actualLiveNodes = cspHttp.getLiveNodes();
assertEquals(1, actualLiveNodes.size());
assertEquals(expectedLiveNodes, actualLiveNodes);
assertEquals(2, actualLiveNodes.size());

cluster.startJettySolrRunner(jettyNode1);
cluster.stopJettySolrRunner(jettyNode2);
waitForCSPCacheTimeout();

// Should still be reachable because known hosts doesn't remove initial nodes
expectedLiveNodes = cspZk.getClusterState().getLiveNodes();
// Should still be reachable because live nodes doesn't remove initial nodes
actualLiveNodes = cspHttp.getLiveNodes();
assertEquals(1, actualLiveNodes.size());
assertEquals(expectedLiveNodes, actualLiveNodes);
}
}

@Test
public void testClusterStateProviderDownedKnownHosts() throws Exception {

try (var cspHttp = http2ClusterStateProvider()) {

String jettyNode1Url = normalizeJettyUrl(jettyNode1.getBaseUrl().toString());
String jettyNode2Url = normalizeJettyUrl(jettyNode2.getBaseUrl().toString());
Set<String> expectedKnownNodes = Set.of(jettyNode1Url, jettyNode2Url);
Set<String> actualKnownNodes = cspHttp.getKnownNodes();

assertEquals(2, actualKnownNodes.size());
assertEquals(expectedKnownNodes, actualKnownNodes);

cluster.stopJettySolrRunner(jettyNode1);
waitForCSPCacheTimeout();

// Known hosts should never remove the initial set of live nodes
actualKnownNodes = cspHttp.getKnownNodes();
assertEquals(2, actualKnownNodes.size());
assertEquals(expectedKnownNodes, actualKnownNodes);
assertEquals(2, actualLiveNodes.size());
}
}

@Test
public void testClusterStateProviderKnownHostsWithNewHost() throws Exception {

public void testClusterStateProviderLiveNodesWithNewHost() throws Exception {
try (var cspHttp = http2ClusterStateProvider()) {

var jettyNode1 = cluster.getJettySolrRunner(0);
var jettyNode2 = cluster.getJettySolrRunner(1);
var jettyNode3 = cluster.startJettySolrRunner();
String jettyNode1Url = normalizeJettyUrl(jettyNode1.getBaseUrl().toString());
String jettyNode2Url = normalizeJettyUrl(jettyNode2.getBaseUrl().toString());
String jettyNode3Url = normalizeJettyUrl(jettyNode3.getBaseUrl().toString());
Set<String> expectedKnownNodes = Set.of(jettyNode1Url, jettyNode2Url, jettyNode3Url);
waitForCSPCacheTimeout();

Set<String> actualKnownNodes = cspHttp.getKnownNodes();
assertEquals(3, actualKnownNodes.size());
assertEquals(expectedKnownNodes, actualKnownNodes);

cluster.stopJettySolrRunner(jettyNode1);
String nodeName1 = cspHttp.getNodeNameFromSolrUrl(jettyNode1.getBaseUrl().toString());
String nodeName2 = cspHttp.getNodeNameFromSolrUrl(jettyNode2.getBaseUrl().toString());
String nodeName3 = cspHttp.getNodeNameFromSolrUrl(jettyNode3.getBaseUrl().toString());
Set<String> expectedKnownNodes = Set.of(nodeName1, nodeName2, nodeName3);
waitForCSPCacheTimeout();

actualKnownNodes = cspHttp.getKnownNodes();
Set<String> actualKnownNodes = cspHttp.getLiveNodes();
assertEquals(3, actualKnownNodes.size());
assertEquals(expectedKnownNodes, actualKnownNodes);

// Stop non initially passed node from the cluster
cluster.stopJettySolrRunner(jettyNode3);
expectedKnownNodes = Set.of(jettyNode1Url, jettyNode2Url);
expectedKnownNodes = Set.of(nodeName1, nodeName2);
waitForCSPCacheTimeout();

// New nodes are removable from known hosts
actualKnownNodes = cspHttp.getKnownNodes();
actualKnownNodes = cspHttp.getLiveNodes();
assertEquals(2, actualKnownNodes.size());
assertEquals(expectedKnownNodes, actualKnownNodes);
}
}

private void waitForCSPCacheTimeout() throws InterruptedException {
Thread.sleep(6000);
}

/** Jetty URL to Cluster State Node String http://127.0.0.1:12345/solr to 127.0.0.1:12345_solr */
private String normalizeJettyUrl(String jettyUrl) {
return jettyUrl.substring(jettyUrl.lastIndexOf("//") + 2).replace("/", "_");
Thread.sleep(2000);
}
}

0 comments on commit 9d0e720

Please sign in to comment.