
Commit

Obtain multiple pipelines concurrently. high performance improvement (#3331)

* Obtain multiple pipelines concurrently. high performance improvement

Signed-off-by: c00603587 <[email protected]>

* execute countDown regardless of whether the preceding code throws an exception, otherwise the await can never be released

Signed-off-by: c00603587 <[email protected]>

* Update src/main/java/redis/clients/jedis/MultiNodePipelineBase.java

Co-authored-by: M Sazzadul Hoque <[email protected]>

* set default sync workers to 3

Signed-off-by: c00603587 <[email protected]>

* remove unused import package

Signed-off-by: c00603587 <[email protected]>

* modify the multi node pipeline sync workers

---------

Signed-off-by: c00603587 <[email protected]>
Co-authored-by: c00603587 <[email protected]>
Co-authored-by: M Sazzadul Hoque <[email protected]>
3 people authored Mar 28, 2023
1 parent 4a8b9e7 commit e193365
1 changed file: src/main/java/redis/clients/jedis/MultiNodePipelineBase.java (35 additions, 11 deletions)
@@ -8,6 +8,10 @@
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 import org.json.JSONArray;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,13 +44,22 @@ public abstract class MultiNodePipelineBase implements PipelineCommands, Pipelin
 
   private final Logger log = LoggerFactory.getLogger(getClass());
 
+  /**
+   * Default number of worker threads used by sync(). If the client has enough cores, or the
+   * cluster has more than 3 nodes, this number may be increased. It is suggested to keep it no
+   * larger than the number of cluster nodes.
+   */
+  public static volatile int MULTI_NODE_PIPELINE_SYNC_WORKERS = 3;
+
   private final Map<HostAndPort, Queue<Response<?>>> pipelinedResponses;
   private final Map<HostAndPort, Connection> connections;
   private volatile boolean syncing = false;
 
   private final CommandObjects commandObjects;
   private GraphCommandObjects graphCommandObjects;
 
+  private final ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);
+
jslopezgithub commented on Jun 7, 2023:

This code is problematic for a couple of reasons:

  1. For every pipeline created, a new ExecutorService is generated.
  2. There is no way to shut down the ExecutorService, which can cause the process not to terminate.

This can easily be reproduced by doing the following:

        final HostAndPort hostAndPort = new HostAndPort("127.0.0.1", 6379);
        final DefaultJedisClientConfig.Builder builder = DefaultJedisClientConfig.builder()
                .timeoutMillis((int) TimeUnit.SECONDS.toMillis(10));
        final GenericObjectPoolConfig<Connection> poolConfig = new GenericObjectPoolConfig<>();

        try (JedisCluster jedis = new JedisCluster(hostAndPort, builder.build(), 5, poolConfig)) {
            try (final ClusterPipeline pipeline = jedis.pipelined()) {
                pipeline.set("c", "d");
                pipeline.sync();
            }
        }

Running the above code will leave the process running.

sazzad16 (author, collaborator) commented on Jun 11, 2023:

@jslopezgithub I tried your example but didn't run into any issue. I even tried placing different portions of your code in loops, but still no issues.

The one case I can think of is calling pipelined() concurrently more times than the maxTotal of poolConfig, which is 8 by default. If that is the case, the solution is to increase that number.
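For reference, a minimal sketch of that adjustment (32 is purely an illustrative value, not a recommendation) would be to replace the poolConfig line in the snippet above with:

    final GenericObjectPoolConfig<Connection> poolConfig = new GenericObjectPoolConfig<>();
    // The pool defaults to maxTotal = 8 connections per node; raise it if more
    // pipelines need to be created and synced concurrently.
    poolConfig.setMaxTotal(32);
    poolConfig.setMaxIdle(32);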

jslopezgithub commented on Jun 12, 2023:

To see the first problem, loop over the jedis.pipelined() call and print out the total number of threads associated with the executor service. Each call to pipelined() creates a new ExecutorService, and the sync method spawns new threads through that service. Each executor is used only once, so the number of threads in the system grows with every call. I suspect the intention was to make the executor service static, or perhaps the idea is to reuse the ClusterPipeline instance? When this is used in a long-running program, it will eventually run out of file descriptors.

To see the second problem, simply run the program from the command line and notice that it stays running. A typical program would exit, but in this case it keeps running because the ExecutorService's worker threads are non-daemon threads that are never shut down.

Note: for both issues you need a running cluster that you can interact with. The errors do not appear unless the sync() method completes without an exception.

Code for the first problem:

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.ClusterPipeline;
import redis.clients.jedis.Connection;
import redis.clients.jedis.DefaultJedisClientConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

public class Test {

    public static void main(String[] args) {
        final HostAndPort hostAndPort = new HostAndPort("127.0.0.1", 6370);
        final DefaultJedisClientConfig.Builder builder = DefaultJedisClientConfig.builder()
                .timeoutMillis((int) TimeUnit.SECONDS.toMillis(10));
        final GenericObjectPoolConfig<Connection> poolConfig = new GenericObjectPoolConfig<>();

        try (JedisCluster jedis = new JedisCluster(hostAndPort, builder.build(), 5, poolConfig)) {
            printThreads();
            for (int i = 0; i < 10; i++) {
                try (final ClusterPipeline pipeline = jedis.pipelined()) {
                    pipeline.set("c", "d");
                    pipeline.sync();
                }
                printThreads();
            }
        }
    }

    public static void printThreads() {
        // Get the root thread group
        final ThreadGroup rootGroup = Thread.currentThread().getThreadGroup().getParent();

        // Create a buffer to store the thread information
        final Thread[] threads = new Thread[rootGroup.activeCount()];

        // Enumerate all threads into the buffer
        rootGroup.enumerate(threads);

        // Count the executor pool threads ("pool-*") and print the total
        final int count = (int) Arrays.stream(threads)
                .filter(thread -> thread.getName()
                        .startsWith("pool-"))
                .count();
        System.out.println("count = " + count);
    }
}

Output

count = 0
count = 2
count = 4
count = 6
count = 8
count = 10
count = 12
count = 14
count = 16
count = 18
count = 20

Output for second problem

Output using jedis-4.4.2
> java -cp .:jedis-4.4.2.jar:commons-pool2-2.11.1.jar:slf4j-api-2.0.5.jar Test
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.

If you run the same code without using the JedisCluster pipeline, you will see the prompt '>' and the program exiting after completion. In the case above, the program remains running forever. Ideally, there would be a way to access the executor service so it can be shut down properly.
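To see why the JVM never exits, here is a minimal standalone sketch (plain Java, not Jedis code; the class name is made up): Executors.newFixedThreadPool creates non-daemon worker threads, and unless shutdown() is called they keep the process alive even after every submitted task has finished.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ExecutorShutdownDemo {
        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(3);
            pool.submit(() -> System.out.println("task done"));
            // Remove the next line and the program never terminates: the pool's
            // non-daemon worker thread stays parked, waiting for more tasks.
            pool.shutdown();
        }
    }

The same applies to the executorService field added in this commit: nothing ever shuts it down, so each ClusterPipeline leaves its worker threads behind.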

sazzad16 (author, collaborator) commented on Jun 13, 2023:

@jslopezgithub Hmm... weird stuff. If I write the test inside the Jedis codebase, there is no issue. But if I write the test in an outside app, the issue shows up.

Anyway, thank you for reporting this.

yangbodong22011 (collaborator) commented on Jun 14, 2023:

related PR: #3333


   public MultiNodePipelineBase(CommandObjects commandObjects) {
     pipelinedResponses = new LinkedHashMap<>();
     connections = new LinkedHashMap<>();
@@ -106,25 +119,36 @@ public final void sync() {
     }
     syncing = true;
 
+    CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
     Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
         = pipelinedResponses.entrySet().iterator();
     while (pipelinedResponsesIterator.hasNext()) {
       Map.Entry<HostAndPort, Queue<Response<?>>> entry = pipelinedResponsesIterator.next();
       HostAndPort nodeKey = entry.getKey();
       Queue<Response<?>> queue = entry.getValue();
       Connection connection = connections.get(nodeKey);
-      try {
-        List<Object> unformatted = connection.getMany(queue.size());
-        for (Object o : unformatted) {
-          queue.poll().set(o);
+      executorService.submit(() -> {
+        try {
+          List<Object> unformatted = connection.getMany(queue.size());
+          for (Object o : unformatted) {
+            queue.poll().set(o);
+          }
+        } catch (JedisConnectionException jce) {
+          log.error("Error with connection to " + nodeKey, jce);
+          // cleanup the connection
+          pipelinedResponsesIterator.remove();
+          connections.remove(nodeKey);
+          IOUtils.closeQuietly(connection);
+        } finally {
+          countDownLatch.countDown();
         }
-      } catch (JedisConnectionException jce) {
-        log.error("Error with connection to " + nodeKey, jce);
-        // cleanup the connection
-        pipelinedResponsesIterator.remove();
-        connections.remove(nodeKey);
-        IOUtils.closeQuietly(connection);
-      }
+      });
     }
 
+    try {
+      countDownLatch.await();
+    } catch (InterruptedException e) {
+      log.error("Thread is interrupted during sync.", e);
+    }
+
     syncing = false;
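For readers who want the pattern in isolation, here is a minimal self-contained sketch (plain Java, not Jedis code; all names are illustrative) of what the new sync() does: fan the per-node reads out to a fixed-size pool, count the latch down in a finally block so a failure can never leave await() blocked, and wait on the latch before clearing the syncing flag. The pool size plays the role of MULTI_NODE_PIPELINE_SYNC_WORKERS; the explicit shutdown() at the end addresses the lifecycle concern raised in the comments above.

    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class FanOutSyncSketch {

        static final int SYNC_WORKERS = 3; // analogous to MULTI_NODE_PIPELINE_SYNC_WORKERS

        public static void main(String[] args) throws InterruptedException {
            List<String> nodes = List.of("node-1", "node-2", "node-3", "node-4");
            ExecutorService pool = Executors.newFixedThreadPool(SYNC_WORKERS);
            CountDownLatch latch = new CountDownLatch(nodes.size());

            for (String node : nodes) {
                pool.submit(() -> {
                    try {
                        // stand-in for draining all pipelined replies from one node
                        System.out.println("synced " + node);
                    } finally {
                        // always count down, even if the read above throws,
                        // so await() below can never block forever
                        latch.countDown();
                    }
                });
            }

            latch.await();   // wait until every node has been drained
            pool.shutdown(); // release the worker threads when done
        }
    }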
