From e19336598a74fd49b35cd2e095de957937996f3a Mon Sep 17 00:00:00 2001 From: chenshi5012 Date: Tue, 28 Mar 2023 16:30:25 +0800 Subject: [PATCH] Obtain multiple pipelines concurrently. high performance improvement (#3331) * Obtain multiple pipelines concurrently. high performance improvement Signed-off-by: c00603587 * execute countDown regardless of whether the above program is abnormal, otherwise the await cannot be released Signed-off-by: c00603587 * Update src/main/java/redis/clients/jedis/MultiNodePipelineBase.java Co-authored-by: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> * set default sync workers to 3 Signed-off-by: c00603587 * remove unused import package Signed-off-by: c00603587 * modify the muti node pipeline sync workers --------- Signed-off-by: c00603587 Co-authored-by: c00603587 Co-authored-by: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> --- .../clients/jedis/MultiNodePipelineBase.java | 46 ++++++++++++++----- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index 8f9f90fae9..6f5e4b9b0e 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -8,6 +8,10 @@ import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import org.json.JSONArray; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +44,13 @@ public abstract class MultiNodePipelineBase implements PipelineCommands, Pipelin private final Logger log = LoggerFactory.getLogger(getClass()); + /** + * default number of processes for sync, if you got enough cores for client + * or your cluster nodes more than 3 nodes, you may increase this workers number. + * suggest <= cluster nodes + */ + public static volatile int MULTI_NODE_PIPELINE_SYNC_WORKERS = 3; + private final Map>> pipelinedResponses; private final Map connections; private volatile boolean syncing = false; @@ -47,6 +58,8 @@ public abstract class MultiNodePipelineBase implements PipelineCommands, Pipelin private final CommandObjects commandObjects; private GraphCommandObjects graphCommandObjects; + private final ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); + public MultiNodePipelineBase(CommandObjects commandObjects) { pipelinedResponses = new LinkedHashMap<>(); connections = new LinkedHashMap<>(); @@ -106,6 +119,7 @@ public final void sync() { } syncing = true; + CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size()); Iterator>>> pipelinedResponsesIterator = pipelinedResponses.entrySet().iterator(); while (pipelinedResponsesIterator.hasNext()) { @@ -113,18 +127,28 @@ public final void sync() { HostAndPort nodeKey = entry.getKey(); Queue> queue = entry.getValue(); Connection connection = connections.get(nodeKey); - try { - List unformatted = connection.getMany(queue.size()); - for (Object o : unformatted) { - queue.poll().set(o); + executorService.submit(() -> { + try { + List unformatted = connection.getMany(queue.size()); + for (Object o : unformatted) { + queue.poll().set(o); + } + } catch (JedisConnectionException jce) { + log.error("Error with connection to " + nodeKey, jce); + // cleanup the connection + pipelinedResponsesIterator.remove(); + connections.remove(nodeKey); + IOUtils.closeQuietly(connection); + } finally { + countDownLatch.countDown(); } - } catch (JedisConnectionException jce) { - log.error("Error with connection to " + nodeKey, jce); - // cleanup the connection - pipelinedResponsesIterator.remove(); - connections.remove(nodeKey); - IOUtils.closeQuietly(connection); - } + }); + } + + try { + countDownLatch.await(); + } catch (InterruptedException e) { + log.error("Thread is interrupted during sync.", e); } syncing = false;