SOLR-17578: Remove ZkController internal core supplier.
psalagnac committed Nov 28, 2024
1 parent cebdb2d commit cce009b
Showing 7 changed files with 99 additions and 142 deletions.
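
In short, callers no longer build and hand ZkController a Supplier<List<CoreDescriptor>>; ZkController now asks its CoreContainer for the current core descriptors (cc.getCoreDescriptors()) whenever it needs them on disconnect, reconnect, and down-state registration. Below is a condensed before/after of the caller side, lifted from the ZkContainer.initZooKeeper hunk further down (fragments only, not a complete method; variable names are those used in the patch):

// Before this commit: ZkContainer assembled a supplier so ZkController could
// rediscover cores after a ZooKeeper session loss and reconnect.
Supplier<List<CoreDescriptor>> descriptorsSupplier =
    () ->
        cc.getCores().stream()
            .map(SolrCore::getCoreDescriptor)
            .collect(Collectors.toList());
ZkController zkController =
    new ZkController(cc, zookeeperHost, zkClientConnectTimeout, config, descriptorsSupplier);

// After this commit: the supplier argument is gone; ZkController calls
// cc.getCoreDescriptors() directly in onDisconnect(), onReconnect(), and
// registerAllCoresAsDown().
ZkController zkController =
    new ZkController(cc, zookeeperHost, zkClientConnectTimeout, config);

Reading descriptors straight from the CoreContainer avoids rebuilding the list from loaded SolrCore instances on every callback, which is presumably where the "slightly faster reconnection" noted in CHANGES.txt comes from.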
2 changes: 2 additions & 0 deletions solr/CHANGES.txt
@@ -205,6 +205,8 @@ Optimizations

* SOLR-17453: Leverage waitForState() instead of busy waiting in CREATE, MIGRATE, REINDEXCOLLECTION, MOVEREPLICA commands, and in some tests. (Pierre Salagnac)

* SOLR-17578: Remove ZkController internal core supplier, for slightly faster reconnection after Zookeeper session loss. (Pierre Salagnac)

Bug Fixes
---------------------
* SOLR-12429: Uploading a configset with a symbolic link produces a IOException. Now a error message to user generated instead. (Eric Pugh)
198 changes: 88 additions & 110 deletions solr/core/src/java/org/apache/solr/cloud/ZkController.java
Expand Up @@ -50,7 +50,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.solr.client.solrj.SolrClient;
@@ -298,16 +297,12 @@ public Object call() throws Exception {
* @param zkClientConnectTimeout timeout in ms
* @param cloudConfig configuration for this controller. TODO: possibly redundant with
* CoreContainer
* @param descriptorsSupplier a supplier of the current core descriptors. used to know which cores
* to re-register on reconnect
*/
@SuppressWarnings({"unchecked"})
public ZkController(
final CoreContainer cc,
String zkServerAddress,
int zkClientConnectTimeout,
CloudConfig cloudConfig,
final Supplier<List<CoreDescriptor>> descriptorsSupplier)
CloudConfig cloudConfig)
throws InterruptedException, TimeoutException, IOException {

if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null.");
@@ -366,9 +361,8 @@ public ZkController(
.withUrl(zkServerAddress)
.withTimeout(clientTimeout, TimeUnit.MILLISECONDS)
.withConnTimeOut(zkClientConnectTimeout, TimeUnit.MILLISECONDS)
.withReconnectListener(() -> onReconnect(descriptorsSupplier))
.withDisconnectListener(
(sessionExpired) -> onDisconnect(descriptorsSupplier, sessionExpired))
.withReconnectListener(this::onReconnect)
.withDisconnectListener((sessionExpired) -> onDisconnect(sessionExpired))
.withAclProvider(zkACLProvider)
.withClosedCheck(cc::isShutDown)
.withCompressor(compressor)
@@ -404,18 +398,27 @@ public ZkController(
assert ObjectReleaseTracker.track(this);
}

private void onDisconnect(
Supplier<List<CoreDescriptor>> descriptorsSupplier, boolean sessionExpired) {
private void onDisconnect(boolean sessionExpired) {
try {
overseer.close();
} catch (Exception e) {
log.error("Error trying to stop any Overseer threads", e);
}
closeOutstandingElections(descriptorsSupplier, sessionExpired);
markAllAsNotLeader(descriptorsSupplier);

// Close outstanding leader elections
List<CoreDescriptor> descriptors = cc.getCoreDescriptors();
for (CoreDescriptor descriptor : descriptors) {
closeExistingElectionContext(descriptor, sessionExpired);
}

// Mark all cores as not leader
for (CoreDescriptor descriptor : descriptors) {
descriptor.getCloudDescriptor().setLeader(false);
descriptor.getCloudDescriptor().setHasRegistered(false);
}
}

private void onReconnect(Supplier<List<CoreDescriptor>> descriptorsSupplier) {
private void onReconnect() {
// on reconnect, reload cloud info
log.info("ZooKeeper session re-connected ... refreshing core states after session expiration.");
clearZkCollectionTerms();
@@ -456,7 +459,7 @@ private void onReconnect(Supplier<List<CoreDescriptor>> descriptorsSupplier) {
cc.cancelCoreRecoveries();

try {
registerAllCoresAsDown(descriptorsSupplier, false);
registerAllCoresAsDown(false);
} catch (SessionExpiredException e) {
// zk has to reconnect and this will all be tried again
throw e;
@@ -469,26 +472,24 @@ private void onReconnect(Supplier<List<CoreDescriptor>> descriptorsSupplier) {
// we have to register as live first to pick up docs in the buffer
createEphemeralLiveNode();

List<CoreDescriptor> descriptors = descriptorsSupplier.get();
List<CoreDescriptor> descriptors = cc.getCoreDescriptors();
// re register all descriptors
ExecutorService executorService = (cc != null) ? cc.getCoreZkRegisterExecutorService() : null;
if (descriptors != null) {
for (CoreDescriptor descriptor : descriptors) {
// TODO: we need to think carefully about what happens when it was a leader
// that was expired - as well as what to do about leaders/overseers with
// connection loss
try {
// unload solr cores that have been 'failed over'
throwErrorIfReplicaReplaced(descriptor);
for (CoreDescriptor descriptor : descriptors) {
// TODO: we need to think carefully about what happens when it was a leader
// that was expired - as well as what to do about leaders/overseers with
// connection loss
try {
// unload solr cores that have been 'failed over'
throwErrorIfReplicaReplaced(descriptor);

if (executorService != null) {
executorService.submit(new RegisterCoreAsync(descriptor, true, true));
} else {
register(descriptor.getName(), descriptor, true, true, false);
}
} catch (Exception e) {
log.error("Error registering SolrCore", e);
if (executorService != null) {
executorService.submit(new RegisterCoreAsync(descriptor, true, true));
} else {
register(descriptor.getName(), descriptor, true, true, false);
}
} catch (Exception e) {
log.error("Error registering SolrCore", e);
}
}

@@ -588,75 +589,72 @@ public int getLeaderConflictResolveWait() {
return leaderConflictResolveWait;
}

private void registerAllCoresAsDown(
final Supplier<List<CoreDescriptor>> registerOnReconnect, boolean updateLastPublished)
throws SessionExpiredException {
List<CoreDescriptor> descriptors = registerOnReconnect.get();
private void registerAllCoresAsDown(boolean updateLastPublished) throws SessionExpiredException {
List<CoreDescriptor> descriptors = cc.getCoreDescriptors();
if (isClosed) return;
if (descriptors != null) {
// before registering as live, make sure everyone is in a
// down state
publishNodeAsDown(getNodeName());
for (CoreDescriptor descriptor : descriptors) {
// if it looks like we are going to be the leader, we don't
// want to wait for the following stuff
CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
String collection = cloudDesc.getCollectionName();
String slice = cloudDesc.getShardId();
try {

int children =
zkStateReader
.getZkClient()
.getChildren(
ZkStateReader.COLLECTIONS_ZKNODE
+ "/"
+ collection
+ "/leader_elect/"
+ slice
+ "/election",
null,
true)
.size();
if (children == 0) {
log.debug(
"looks like we are going to be the leader for collection {} shard {}",
collection,
slice);
continue;
}
// before registering as live, make sure everyone is in a
// down state
publishNodeAsDown(getNodeName());
for (CoreDescriptor descriptor : descriptors) {
// if it looks like we are going to be the leader, we don't
// want to wait for the following stuff
CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
String collection = cloudDesc.getCollectionName();
String slice = cloudDesc.getShardId();
try {

} catch (NoNodeException e) {
int children =
zkStateReader
.getZkClient()
.getChildren(
ZkStateReader.COLLECTIONS_ZKNODE
+ "/"
+ collection
+ "/leader_elect/"
+ slice
+ "/election",
null,
true)
.size();
if (children == 0) {
log.debug(
"looks like we are going to be the leader for collection {} shard {}",
collection,
slice);
continue;
} catch (InterruptedException e2) {
Thread.currentThread().interrupt();
} catch (SessionExpiredException e) {
// zk has to reconnect
throw e;
} catch (KeeperException e) {
log.warn("", e);
Thread.currentThread().interrupt();
}

final String coreZkNodeName = descriptor.getCloudDescriptor().getCoreNodeName();
try {
log.debug(
"calling waitForLeaderToSeeDownState for coreZkNodeName={} collection={} shard={}",
coreZkNodeName,
collection,
slice);
waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
} catch (Exception e) {
log.warn(
"There was a problem while making a best effort to ensure the leader has seen us as down, this is not unexpected as Zookeeper has just reconnected after a session expiration",
e);
if (isClosed) {
return;
}
} catch (NoNodeException e) {
log.debug(
"looks like we are going to be the leader for collection {} shard {}",
collection,
slice);
continue;
} catch (InterruptedException e2) {
Thread.currentThread().interrupt();
} catch (SessionExpiredException e) {
// zk has to reconnect
throw e;
} catch (KeeperException e) {
log.warn("", e);
Thread.currentThread().interrupt();
}

final String coreZkNodeName = descriptor.getCloudDescriptor().getCoreNodeName();
try {
log.debug(
"calling waitForLeaderToSeeDownState for coreZkNodeName={} collection={} shard={}",
coreZkNodeName,
collection,
slice);
waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
} catch (Exception e) {
log.warn(
"There was a problem while making a best effort to ensure the leader has seen us as down, this is not unexpected as Zookeeper has just reconnected after a session expiration",
e);
if (isClosed) {
return;
}
}
}
@@ -666,16 +664,6 @@ public NodesSysPropsCacher getSysPropsCacher() {
return sysPropsCacher;
}

private void closeOutstandingElections(
final Supplier<List<CoreDescriptor>> registerOnReconnect, boolean sessionExpired) {
List<CoreDescriptor> descriptors = registerOnReconnect.get();
if (descriptors != null) {
for (CoreDescriptor descriptor : descriptors) {
closeExistingElectionContext(descriptor, sessionExpired);
}
}
}

private ContextKey closeExistingElectionContext(CoreDescriptor cd, boolean sessionExpired) {
// look for old context - if we find it, cancel it
String collection = cd.getCloudDescriptor().getCollectionName();
@@ -696,16 +684,6 @@ private ContextKey closeExistingElectionContext(CoreDescriptor cd, boolean sessi
return contextKey;
}

private void markAllAsNotLeader(final Supplier<List<CoreDescriptor>> registerOnReconnect) {
List<CoreDescriptor> descriptors = registerOnReconnect.get();
if (descriptors != null) {
for (CoreDescriptor descriptor : descriptors) {
descriptor.getCloudDescriptor().setLeader(false);
descriptor.getCloudDescriptor().setHasRegistered(false);
}
}
}

public void preClose() {
this.isClosed = true;

12 changes: 1 addition & 11 deletions solr/core/src/java/org/apache/solr/core/ZkContainer.java
Expand Up @@ -23,12 +23,9 @@
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.impl.SolrZkClientTimeout;
import org.apache.solr.cloud.SolrZkServer;
import org.apache.solr.cloud.ZkController;
@@ -132,15 +129,8 @@ public void initZooKeeper(final CoreContainer cc, CloudConfig config) {
"A chroot was specified in ZkHost but the znode doesn't exist. " + zookeeperHost);
}

Supplier<List<CoreDescriptor>> descriptorsSupplier =
() ->
cc.getCores().stream()
.map(SolrCore::getCoreDescriptor)
.collect(Collectors.toList());

ZkController zkController =
new ZkController(
cc, zookeeperHost, zkClientConnectTimeout, config, descriptorsSupplier);
new ZkController(cc, zookeeperHost, zkClientConnectTimeout, config);

if (zkRun != null) {
if (StrUtils.isNotNullOrEmpty(System.getProperty(HTTPS_PORT_PROP))) {
@@ -73,10 +73,9 @@ public void testSimpleSliceLeaderElection() throws Exception {
"shard1",
jetty
.getCoreContainer()
.getCores()
.getCoreDescriptors()
.iterator()
.next()
.getCoreDescriptor()
.getCloudDescriptor()
.getShardId());
String jettyNodeName = jetty.getNodeName(); // must get before shutdown
@@ -17,23 +17,16 @@
package org.apache.solr.cloud;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;

public class MockSimpleZkController extends ZkController {

public MockSimpleZkController(
CoreContainer cc,
String zkServerAddress,
int zkClientConnectTimeout,
CloudConfig cloudConfig,
Supplier<List<CoreDescriptor>> descriptorsSupplier)
CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig)
throws InterruptedException, TimeoutException, IOException {
super(cc, zkServerAddress, zkClientConnectTimeout, cloudConfig, descriptorsSupplier);
super(cc, zkServerAddress, zkClientConnectTimeout, cloudConfig);
}

@Override
Expand Up @@ -18,7 +18,6 @@

import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.tests.util.LuceneTestCase.BadApple;
@@ -63,8 +62,7 @@ public void testLeaderElectionWithZkExpiry() throws Exception {
ExecutorUtil.newMDCAwareSingleThreadExecutor(
new SolrNamedThreadFactory(this.getTestName()));
try (ZkController zkController =
new ZkController(
cc, server.getZkAddress(), 15000, cloudConfig, Collections::emptyList)) {
new ZkController(cc, server.getZkAddress(), 15000, cloudConfig)) {
threadExecutor.execute(
() -> {
TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);