Skip to content

Commit

Permalink
Make test connectionLoss logic the same, add waitForLoss option
Browse files Browse the repository at this point in the history
  • Loading branch information
HoustonPutman committed Feb 27, 2025
1 parent 0a50eda commit 2b1f108
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ protected void testLeaderZkSessionLoss() throws Exception {
doc.addField("a_t", "hello" + 2);

// cause leader migration by expiring the current leader's zk session
chaosMonkey.expireSession(leaderJetty);
chaosMonkey.expireSession(leaderJetty, false);

String expectedNewLeaderCoreNodeName = notLeaders.get(0).getName();
long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void testSimpleSliceLeaderElection() throws Exception {
String leader = getLeader(collection);
JettySolrRunner jetty = getRunner(leader);
assertNotNull(jetty);
cluster.expireZkSession(jetty);
cluster.expireZkSession(jetty, false);

for (int i = 0; i < 60; i++) { // wait till leader is changed
if (jetty != getRunner(getLeader(collection))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void testPullReplicaDisconnectsFromZooKeeper() throws Exception {
}
addDocs(20);
JettySolrRunner jetty = getJettyForReplica(s.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0));
cluster.expireZkSession(jetty);
cluster.expireZkSession(jetty, true);
addDocs(30);
waitForState("Expecting node to be disconnected", collectionName, activeReplicaCount(1, 0, 0));
addDocs(40);
Expand All @@ -254,7 +254,7 @@ public void testCloseHooksDeletedOnReconnect() throws Exception {
SolrCore core = jetty.getCoreContainer().getCores().iterator().next();

for (int i = 0; i < (TEST_NIGHTLY ? 5 : 2); i++) {
cluster.expireZkSession(jetty);
cluster.expireZkSession(jetty, true);
waitForState(
"Expecting node to be disconnected", collectionName, activeReplicaCount(1, 0, 0));
waitForState("Expecting node to reconnect", collectionName, activeReplicaCount(1, 0, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testReloadedLeaderStateAfterZkSessionLoss() throws Exception {

final int initialStateVersion = getCollectionState(testCollectionName).getZNodeVersion();

cluster.expireZkSession(cluster.getReplicaJetty(leader));
cluster.expireZkSession(cluster.getReplicaJetty(leader), true);

waitForState(
"Timed out waiting for core to re-register as ACTIVE after session expiry",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private void testEmptyCollection() throws Exception {

private void testAllNodesDown() throws Exception {
try (CloudSolrStream stream = new CloudSolrStream(buildSearchExpression(), streamFactory)) {
cluster.expireZkSession(cluster.getReplicaJetty(getReplicas().get(0)));
cluster.expireZkSession(cluster.getReplicaJetty(getReplicas().get(0)), true);
expectThrows(IOException.class, () -> getTuples(stream));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void test() throws Exception {

// expire a session...
CloudJettyRunner cloudJetty = shardToJetty.get(SHARD1).get(0);
chaosMonkey.expireSession(cloudJetty.jetty);
chaosMonkey.expireSession(cloudJetty.jetty, true);
// Wait until the jetty is reconnected, otherwise the following index command could fail
cloudJetty
.jetty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,28 @@ public ChaosMonkey(
}

// TODO: expire all clients at once?
public void expireSession(final JettySolrRunner jetty) {
public void expireSession(final JettySolrRunner jetty, boolean waitForConnectionLoss) {
expireSession(jetty, zkServer, waitForConnectionLoss);
}

public static void expireSession(
final JettySolrRunner jetty, final ZkTestServer zkServer, boolean waitForConnectionLoss) {
CoreContainer cores = jetty.getCoreContainer();
if (cores != null) {
monkeyLog("expire session for " + jetty.getLocalPort() + " !");
long sessionId = cores.getZkController().getZkClient().getZkSessionId();
monkeyLog("expire session for node " + jetty.getBaseUrl() + " !");
SolrZkClient zkClient = cores.getZkController().getZkClient();
long sessionId = zkClient.getZkSessionId();
zkServer.expire(sessionId);
causeConnectionLoss(jetty);
if (waitForConnectionLoss) {
// Loop until either the Zookeeper Client is no longer connected, or the zkSessionID changes
// (which means the connection was lost in the client)
while (zkClient.getCuratorFramework().getZookeeperClient().isConnected()) {
if (zkClient.getZkSessionId() != sessionId) {
break;
}
}
}
}
}

Expand All @@ -160,7 +175,7 @@ public void expireRandomSession() throws KeeperException, InterruptedException {

CloudJettyRunner jetty = getRandomJetty(sliceName, aggressivelyKillLeaders);
if (jetty != null) {
expireSession(jetty.jetty);
expireSession(jetty.jetty, false);
expires.incrementAndGet();
}
}
Expand All @@ -179,7 +194,7 @@ public void randomConnectionLoss() throws KeeperException, InterruptedException
public static void causeConnectionLoss(JettySolrRunner jetty) {
CoreContainer cores = jetty.getCoreContainer();
if (cores != null) {
monkeyLog("Will cause connection loss on " + jetty.getLocalPort());
monkeyLog("Will cause connection loss on node " + jetty.getBaseUrl());
SolrZkClient zkClient = cores.getZkController().getZkClient();
try {
KillSession.kill(zkClient.getCuratorFramework().getZookeeperClient().getZooKeeper());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.servlet.Filter;
import org.apache.curator.test.KillSession;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.embedded.SSLConfig;
Expand Down Expand Up @@ -798,22 +797,8 @@ public JettySolrRunner getReplicaJetty(Replica replica) {
}

/** Make the zookeeper session on a particular jetty lose connection and expire */
public void expireZkSession(JettySolrRunner jetty) {
CoreContainer cores = jetty.getCoreContainer();
if (cores != null) {
ChaosMonkey.causeConnectionLoss(jetty);
SolrZkClient zkClient = cores.getZkController().getZkClient();
long sessionId = zkClient.getZkSessionId();
zkServer.expire(sessionId);
try {
KillSession.kill(zkClient.getCuratorFramework().getZookeeperClient().getZooKeeper());
} catch (Exception e) {
log.error("Exception killing session", e);
}
if (log.isInfoEnabled()) {
log.info("Expired zookeeper session from node {}", jetty.getBaseUrl());
}
}
public void expireZkSession(JettySolrRunner jetty, boolean waitForConnectionLoss) {
ChaosMonkey.expireSession(jetty, zkServer, waitForConnectionLoss);
}

// Currently not used ;-(
Expand Down

0 comments on commit 2b1f108

Please sign in to comment.