-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introduce thread pool for multi node pipeline
- Loading branch information
1 parent
e193365
commit e15af9d
Showing
4 changed files
with
368 additions
and
25 deletions.
There are no files selected for viewing
140 changes: 140 additions & 0 deletions
140
src/main/java/redis/clients/jedis/JedisThreadFactoryBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
package redis.clients.jedis; | ||
|
||
import java.lang.Thread.UncaughtExceptionHandler; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ThreadFactory; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
/** | ||
* JedisThreadFactoryBuilder is a class that builds a ThreadFactory for Jedis. | ||
*/ | ||
public class JedisThreadFactoryBuilder { | ||
private String namePrefix = null; | ||
private boolean daemon = false; | ||
private int priority = Thread.NORM_PRIORITY; | ||
private ThreadFactory backingThreadFactory = null; | ||
private UncaughtExceptionHandler uncaughtExceptionHandler = null; | ||
|
||
/** | ||
* Sets the name prefix for the threads created by the ThreadFactory. | ||
* | ||
* @param namePrefix the name prefix for the threads | ||
* @return the JedisThreadFactoryBuilder instance | ||
* @throws NullPointerException if namePrefix is null | ||
*/ | ||
public JedisThreadFactoryBuilder setNamePrefix(String namePrefix) { | ||
if (namePrefix == null) { | ||
throw new NullPointerException(); | ||
} | ||
this.namePrefix = namePrefix; | ||
return this; | ||
} | ||
|
||
/** | ||
* Sets whether the threads created by the ThreadFactory are daemon threads. | ||
* | ||
* @param daemon true if the threads are daemon threads, false otherwise | ||
* @return the JedisThreadFactoryBuilder instance | ||
*/ | ||
public JedisThreadFactoryBuilder setDaemon(boolean daemon) { | ||
this.daemon = daemon; | ||
return this; | ||
} | ||
|
||
/** | ||
* Sets the priority for the threads created by the ThreadFactory. | ||
* | ||
* @param priority the priority for the threads | ||
* @return the JedisThreadFactoryBuilder instance | ||
* @throws IllegalArgumentException if priority is not in the range of Thread.MIN_PRIORITY to Thread.MAX_PRIORITY | ||
*/ | ||
public JedisThreadFactoryBuilder setPriority(int priority) { | ||
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { | ||
throw new IllegalArgumentException(String.format( | ||
"Thread priority (%s) must be in %d ~ %d", priority, | ||
Thread.MIN_PRIORITY, Thread.MAX_PRIORITY)); | ||
} | ||
|
||
this.priority = priority; | ||
return this; | ||
} | ||
|
||
/** | ||
* Sets the UncaughtExceptionHandler for the threads created by the ThreadFactory. | ||
* | ||
* @param uncaughtExceptionHandler the UncaughtExceptionHandler for the threads | ||
* @return the JedisThreadFactoryBuilder instance | ||
* @throws NullPointerException if uncaughtExceptionHandler is null | ||
*/ | ||
public JedisThreadFactoryBuilder setUncaughtExceptionHandler( | ||
UncaughtExceptionHandler uncaughtExceptionHandler) { | ||
if (uncaughtExceptionHandler == null) { | ||
throw new NullPointerException( | ||
"UncaughtExceptionHandler cannot be null"); | ||
} | ||
this.uncaughtExceptionHandler = uncaughtExceptionHandler; | ||
return this; | ||
} | ||
|
||
/** | ||
* Sets the backing ThreadFactory for the JedisThreadFactoryBuilder. | ||
* | ||
* @param backingThreadFactory the backing ThreadFactory | ||
* @return the JedisThreadFactoryBuilder instance | ||
* @throws NullPointerException if backingThreadFactory is null | ||
*/ | ||
public JedisThreadFactoryBuilder setThreadFactory( | ||
ThreadFactory backingThreadFactory) { | ||
if (uncaughtExceptionHandler == null) { | ||
throw new NullPointerException( | ||
"BackingThreadFactory cannot be null"); | ||
} | ||
this.backingThreadFactory = backingThreadFactory; | ||
return this; | ||
} | ||
|
||
/** | ||
* Builds a ThreadFactory using the JedisThreadFactoryBuilder instance. | ||
* | ||
* @return the ThreadFactory | ||
*/ | ||
public ThreadFactory build() { | ||
return build(this); | ||
} | ||
|
||
/** | ||
* Builds a ThreadFactory by JedisThreadFactoryBuilder. | ||
* | ||
* @param builder JedisThreadFactoryBuilder | ||
* @return ThreadFactory | ||
*/ | ||
private static ThreadFactory build(JedisThreadFactoryBuilder builder) { | ||
final String namePrefix = builder.namePrefix; | ||
final Boolean daemon = builder.daemon; | ||
final Integer priority = builder.priority; | ||
final UncaughtExceptionHandler uncaughtExceptionHandler = builder.uncaughtExceptionHandler; | ||
final ThreadFactory backingThreadFactory = (builder.backingThreadFactory != null) ? builder.backingThreadFactory | ||
: Executors.defaultThreadFactory(); | ||
final AtomicLong count = new AtomicLong(0); | ||
|
||
return new ThreadFactory() { | ||
@Override | ||
public Thread newThread(Runnable runnable) { | ||
Thread thread = backingThreadFactory.newThread(runnable); | ||
if (daemon) { | ||
thread.setDaemon(daemon); | ||
} | ||
if (priority != Thread.NORM_PRIORITY) { | ||
thread.setPriority(priority); | ||
} | ||
if (namePrefix != null) { | ||
thread.setName(namePrefix + "-" + count.getAndIncrement()); | ||
} | ||
if (uncaughtExceptionHandler != null) { | ||
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler); | ||
} | ||
return thread; | ||
} | ||
}; | ||
} | ||
} |
120 changes: 120 additions & 0 deletions
120
src/main/java/redis/clients/jedis/JedisThreadPoolBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
package redis.clients.jedis; | ||
|
||
import java.lang.Thread.UncaughtExceptionHandler; | ||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.RejectedExecutionHandler; | ||
import java.util.concurrent.ThreadFactory; | ||
import java.util.concurrent.ThreadPoolExecutor; | ||
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* This class is used to build a thread pool for Jedis. | ||
*/ | ||
public class JedisThreadPoolBuilder { | ||
private static final Logger log = LoggerFactory.getLogger(JedisThreadPoolBuilder.class); | ||
|
||
private static final RejectedExecutionHandler defaultRejectHandler = new AbortPolicy(); | ||
|
||
public static PoolBuilder pool() { | ||
return new PoolBuilder(); | ||
} | ||
|
||
/** | ||
* Custom thread factory or use default | ||
* @param threadNamePrefix the thread name prefix | ||
* @param daemon daemon | ||
* @return ThreadFactory | ||
*/ | ||
private static ThreadFactory createThreadFactory(String threadNamePrefix, boolean daemon) { | ||
if (threadNamePrefix != null) { | ||
return new JedisThreadFactoryBuilder().setNamePrefix(threadNamePrefix).setDaemon(daemon) | ||
.setUncaughtExceptionHandler(new UncaughtExceptionHandler() { | ||
@Override | ||
public void uncaughtException(Thread t, Throwable e) { | ||
log.error(String.format("Thread %s threw exception %s", t.getName(), e.getMessage())); | ||
} | ||
}).build(); | ||
} | ||
|
||
return Executors.defaultThreadFactory(); | ||
} | ||
|
||
/** | ||
* This class is used to build a thread pool. | ||
*/ | ||
public static class PoolBuilder { | ||
private int coreSize = 0; | ||
private int maxSize = Integer.MAX_VALUE; | ||
private long keepAliveMillSecs = 10; | ||
private ThreadFactory threadFactory; | ||
private String threadNamePrefix; | ||
private boolean daemon; | ||
private RejectedExecutionHandler rejectHandler; | ||
private BlockingQueue<Runnable> workQueue; | ||
|
||
public PoolBuilder setCoreSize(int coreSize) { | ||
this.coreSize = coreSize; | ||
return this; | ||
} | ||
|
||
public PoolBuilder setMaxSize(int maxSize) { | ||
this.maxSize = maxSize; | ||
return this; | ||
} | ||
|
||
public PoolBuilder setKeepAliveMillSecs(long keepAliveMillSecs) { | ||
this.keepAliveMillSecs = keepAliveMillSecs; | ||
return this; | ||
} | ||
|
||
public PoolBuilder setThreadNamePrefix(String threadNamePrefix) { | ||
this.threadNamePrefix = threadNamePrefix; | ||
return this; | ||
} | ||
|
||
public PoolBuilder setDaemon(boolean daemon) { | ||
this.daemon = daemon; | ||
return this; | ||
} | ||
|
||
public PoolBuilder setThreadFactory(ThreadFactory threadFactory) { | ||
this.threadFactory = threadFactory; | ||
return this; | ||
} | ||
|
||
public PoolBuilder setRejectHandler(RejectedExecutionHandler rejectHandler) { | ||
this.rejectHandler = rejectHandler; | ||
return this; | ||
} | ||
|
||
public PoolBuilder setWorkQueue(BlockingQueue<Runnable> workQueue) { | ||
this.workQueue = workQueue; | ||
return this; | ||
} | ||
|
||
public ExecutorService build() { | ||
if (threadFactory == null) { | ||
threadFactory = createThreadFactory(threadNamePrefix, daemon); | ||
} | ||
|
||
if (workQueue == null) { | ||
throw new IllegalArgumentException("workQueue can't be null"); | ||
} | ||
|
||
if (rejectHandler == null) { | ||
rejectHandler = defaultRejectHandler; | ||
} | ||
|
||
ExecutorService executorService = new ThreadPoolExecutor(coreSize, maxSize, keepAliveMillSecs, | ||
TimeUnit.MILLISECONDS, workQueue, threadFactory, rejectHandler); | ||
|
||
return executorService; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.