Skip to content

Commit

Permalink
Introduce thread pool for multi node pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
yangbodong22011 committed Jun 21, 2023
1 parent 0dba977 commit 44713b7
Show file tree
Hide file tree
Showing 4 changed files with 342 additions and 29 deletions.
140 changes: 140 additions & 0 deletions src/main/java/redis/clients/jedis/JedisThreadFactoryBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package redis.clients.jedis;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

/**
* JedisThreadFactoryBuilder is a class that builds a ThreadFactory for Jedis.
*/
public class JedisThreadFactoryBuilder {
private String namePrefix = null;
private boolean daemon = false;
private int priority = Thread.NORM_PRIORITY;
private ThreadFactory backingThreadFactory = null;
private UncaughtExceptionHandler uncaughtExceptionHandler = null;

/**
* Sets the name prefix for the threads created by the ThreadFactory.
*
* @param namePrefix the name prefix for the threads
* @return the JedisThreadFactoryBuilder instance
* @throws NullPointerException if namePrefix is null
*/
public JedisThreadFactoryBuilder setNamePrefix(String namePrefix) {
if (namePrefix == null) {
throw new NullPointerException();
}
this.namePrefix = namePrefix;
return this;
}

/**
* Sets whether the threads created by the ThreadFactory are daemon threads.
*
* @param daemon true if the threads are daemon threads, false otherwise
* @return the JedisThreadFactoryBuilder instance
*/
public JedisThreadFactoryBuilder setDaemon(boolean daemon) {
this.daemon = daemon;
return this;
}

/**
* Sets the priority for the threads created by the ThreadFactory.
*
* @param priority the priority for the threads
* @return the JedisThreadFactoryBuilder instance
* @throws IllegalArgumentException if priority is not in the range of Thread.MIN_PRIORITY to Thread.MAX_PRIORITY
*/
public JedisThreadFactoryBuilder setPriority(int priority) {
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(String.format(
"Thread priority (%s) must be in %d ~ %d", priority,
Thread.MIN_PRIORITY, Thread.MAX_PRIORITY));
}

this.priority = priority;
return this;
}

/**
* Sets the UncaughtExceptionHandler for the threads created by the ThreadFactory.
*
* @param uncaughtExceptionHandler the UncaughtExceptionHandler for the threads
* @return the JedisThreadFactoryBuilder instance
* @throws NullPointerException if uncaughtExceptionHandler is null
*/
public JedisThreadFactoryBuilder setUncaughtExceptionHandler(
UncaughtExceptionHandler uncaughtExceptionHandler) {
if (uncaughtExceptionHandler == null) {
throw new NullPointerException(
"UncaughtExceptionHandler cannot be null");
}
this.uncaughtExceptionHandler = uncaughtExceptionHandler;
return this;
}

/**
* Sets the backing ThreadFactory for the JedisThreadFactoryBuilder.
*
* @param backingThreadFactory the backing ThreadFactory
* @return the JedisThreadFactoryBuilder instance
* @throws NullPointerException if backingThreadFactory is null
*/
public JedisThreadFactoryBuilder setThreadFactory(
ThreadFactory backingThreadFactory) {
if (uncaughtExceptionHandler == null) {
throw new NullPointerException(
"BackingThreadFactory cannot be null");
}
this.backingThreadFactory = backingThreadFactory;
return this;
}

/**
* Builds a ThreadFactory using the JedisThreadFactoryBuilder instance.
*
* @return the ThreadFactory
*/
public ThreadFactory build() {
return build(this);
}

/**
* Builds a ThreadFactory by JedisThreadFactoryBuilder.
*
* @param builder JedisThreadFactoryBuilder
* @return ThreadFactory
*/
private static ThreadFactory build(JedisThreadFactoryBuilder builder) {
final String namePrefix = builder.namePrefix;
final Boolean daemon = builder.daemon;
final Integer priority = builder.priority;
final UncaughtExceptionHandler uncaughtExceptionHandler = builder.uncaughtExceptionHandler;
final ThreadFactory backingThreadFactory = (builder.backingThreadFactory != null) ? builder.backingThreadFactory
: Executors.defaultThreadFactory();
final AtomicLong count = new AtomicLong(0);

return new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = backingThreadFactory.newThread(runnable);
if (daemon) {
thread.setDaemon(daemon);
}
if (priority != Thread.NORM_PRIORITY) {
thread.setPriority(priority);
}
if (namePrefix != null) {
thread.setName(namePrefix + "-" + count.getAndIncrement());
}
if (uncaughtExceptionHandler != null) {
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
}
return thread;
}
};
}
}
120 changes: 120 additions & 0 deletions src/main/java/redis/clients/jedis/JedisThreadPoolBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package redis.clients.jedis;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is used to build a thread pool for Jedis.
*/
public class JedisThreadPoolBuilder {
private static final Logger log = LoggerFactory.getLogger(JedisThreadPoolBuilder.class);

private static final RejectedExecutionHandler defaultRejectHandler = new AbortPolicy();

public static PoolBuilder pool() {
return new PoolBuilder();
}

/**
* Custom thread factory or use default
* @param threadNamePrefix the thread name prefix
* @param daemon daemon
* @return ThreadFactory
*/
private static ThreadFactory createThreadFactory(String threadNamePrefix, boolean daemon) {
if (threadNamePrefix != null) {
return new JedisThreadFactoryBuilder().setNamePrefix(threadNamePrefix).setDaemon(daemon)
.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.error(String.format("Thread %s threw exception %s", t.getName(), e.getMessage()));
}
}).build();
}

return Executors.defaultThreadFactory();
}

/**
* This class is used to build a thread pool.
*/
public static class PoolBuilder {
private int coreSize = 0;
private int maxSize = Integer.MAX_VALUE;
private long keepAliveMillSecs = 10;
private ThreadFactory threadFactory;
private String threadNamePrefix;
private boolean daemon;
private RejectedExecutionHandler rejectHandler;
private BlockingQueue<Runnable> workQueue;

public PoolBuilder setCoreSize(int coreSize) {
this.coreSize = coreSize;
return this;
}

public PoolBuilder setMaxSize(int maxSize) {
this.maxSize = maxSize;
return this;
}

public PoolBuilder setKeepAliveMillSecs(long keepAliveMillSecs) {
this.keepAliveMillSecs = keepAliveMillSecs;
return this;
}

public PoolBuilder setThreadNamePrefix(String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix;
return this;
}

public PoolBuilder setDaemon(boolean daemon) {
this.daemon = daemon;
return this;
}

public PoolBuilder setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
return this;
}

public PoolBuilder setRejectHandler(RejectedExecutionHandler rejectHandler) {
this.rejectHandler = rejectHandler;
return this;
}

public PoolBuilder setWorkQueue(BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
return this;
}

public ExecutorService build() {
if (threadFactory == null) {
threadFactory = createThreadFactory(threadNamePrefix, daemon);
}

if (workQueue == null) {
throw new IllegalArgumentException("workQueue can't be null");
}

if (rejectHandler == null) {
rejectHandler = defaultRejectHandler;
}

ExecutorService executorService = new ThreadPoolExecutor(coreSize, maxSize, keepAliveMillSecs,
TimeUnit.MILLISECONDS, workQueue, threadFactory, rejectHandler);

return executorService;
}
}
}
79 changes: 50 additions & 29 deletions src/main/java/redis/clients/jedis/MultiNodePipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -25,25 +26,44 @@
public abstract class MultiNodePipelineBase extends PipelineBase
implements PipelineCommands, PipelineBinaryCommands, RedisModulePipelineCommands, Closeable {

private final Logger log = LoggerFactory.getLogger(getClass());

/**
* The number of processes for {@code sync()}. If you have enough cores for client (and you have
* more than 3 cluster nodes), you may increase this number of workers.
* Suggestion:&nbsp;&le;&nbsp;cluster&nbsp;nodes.
*/
public static volatile int MULTI_NODE_PIPELINE_SYNC_WORKERS = 3;
private static final Logger log = LoggerFactory.getLogger(MultiNodePipelineBase.class);

private final Map<HostAndPort, Queue<Response<?>>> pipelinedResponses;
private final Map<HostAndPort, Connection> connections;
private volatile boolean syncing = false;
/**
* The following are the default parameters for the multi node pipeline executor
* Since Redis query is usually a slower IO operation (requires more threads),
* so we set DEFAULT_CORE_POOL_SIZE to be the same as the core
*/
private static final long DEFAULT_KEEPALIVE_TIME_MS = 60000L;
private static final int DEFAULT_BLOCKING_QUEUE_SIZE = Protocol.CLUSTER_HASHSLOTS;
private static final int DEFAULT_CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors();
private static final int DEFAULT_MAXIMUM_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
private static ExecutorService executorService = JedisThreadPoolBuilder.pool()
.setCoreSize(DEFAULT_CORE_POOL_SIZE)
.setMaxSize(DEFAULT_MAXIMUM_POOL_SIZE)
.setKeepAliveMillSecs(DEFAULT_KEEPALIVE_TIME_MS)
.setThreadNamePrefix("jedis-multi-node-pipeline")
.setWorkQueue(new ArrayBlockingQueue<>(DEFAULT_BLOCKING_QUEUE_SIZE)).build();

public MultiNodePipelineBase(CommandObjects commandObjects) {
super(commandObjects);
pipelinedResponses = new LinkedHashMap<>();
connections = new LinkedHashMap<>();
}

/**
* Provide an interface for users to set executors themselves.
* @param executor the executor
*/
public static void setExecutorService(ExecutorService executor) {
if (executorService != executor && executorService != null) {
executorService.shutdown();
}
executorService = executor;
}

/**
* Sub-classes must call this method, if graph commands are going to be used.
* @param connectionProvider connection provider
Expand Down Expand Up @@ -102,8 +122,6 @@ public final void sync() {
}
syncing = true;

ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);

CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
= pipelinedResponses.entrySet().iterator();
Expand All @@ -112,32 +130,35 @@ public final void sync() {
HostAndPort nodeKey = entry.getKey();
Queue<Response<?>> queue = entry.getValue();
Connection connection = connections.get(nodeKey);
executorService.submit(() -> {
try {
List<Object> unformatted = connection.getMany(queue.size());
for (Object o : unformatted) {
queue.poll().set(o);
try {
executorService.submit(() -> {
try {
List<Object> unformatted = connection.getMany(queue.size());
for (Object o : unformatted) {
queue.poll().set(o);
}
} catch (JedisConnectionException jce) {
log.error("Error with connection to " + nodeKey, jce);
// cleanup the connection
pipelinedResponsesIterator.remove();
connections.remove(nodeKey);
IOUtils.closeQuietly(connection);
} finally {
countDownLatch.countDown();
}
} catch (JedisConnectionException jce) {
log.error("Error with connection to " + nodeKey, jce);
// cleanup the connection
pipelinedResponsesIterator.remove();
connections.remove(nodeKey);
IOUtils.closeQuietly(connection);
} finally {
countDownLatch.countDown();
}
});
});
} catch (RejectedExecutionException e) {
log.error("Get a reject exception when submitting, it is recommended that you use the "
+ "MultiNodePipelineBase#setExecutorService method to customize the executor", e);
throw e;
}
}

try {
countDownLatch.await();
} catch (InterruptedException e) {
log.error("Thread is interrupted during sync.", e);
}

executorService.shutdownNow();

syncing = false;
}

Expand Down
Loading

0 comments on commit 44713b7

Please sign in to comment.