1 overview
This article mainly explains the Java thread pool interface and implementation class, as well as their basic use methods, including:
Executor
/Executors
ExecutorService
ThreadPoolExecutor
ScheduledThreadPoolExecutor
2 Two important interfaces:Executor
+ExecutorService
Executor is an interface that defines a simple task submission method:
//Executor
package java.util.concurrent;
public interface Executor {
void execute(Runnable var1);
}
Copy the code
ExecutorService is an interface that inherits from Executor and provides additional methods for task submission and management, such as stopping task execution:
//ExecutorService
package java.util.concurrent;
import java.util.Collection;
import java.util.List;
public interface ExecutorService extends Executor {
void shutdown(a);
List<Runnable> shutdownNow(a);
boolean isShutdown(a);
boolean isTerminated(a);
boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;
<T> Future<T> submit(Callable<T> var1);
<T> Future<T> submit(Runnable var1, T var2); Future<? > submit(Runnable var1); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1)throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
}
Copy the code
Two important ExecutorService implementations are detailed below:
ThreadPoolExecutor
ScheduledThreadPoolExecutor
3 ThreadPoolExecutor
This is commonly referred to as a thread pool class. Generally speaking, a thread pool has the following characteristics:
- A thread pool has a certain number of worker threads
- The number of threads and tasks is controlled and managed
- Tasks are executed asynchronously
- The thread pool takes care of the statistics for executing tasks
3.1 A simple example
Let’s start with a simple example:
public class Main {
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2.4.30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
// Perform a task with no return value
executor.execute(()-> System.out.println(" Execute the runnable task."));
// Perform tasks with return values using the Future generic class
Future<String> future = executor.submit(()->" Execute the callable task and this is the result.");
// Get the result of the task with get(), which blocks until the task is completed
System.out.println(future.get());
// Manually close the thread poolexecutor.shutdown(); }}Copy the code
As you can see from this simple example, thread pools can perform tasks with and without a return value, which is blocked using the get() method. In addition, you need to manually shut down the thread pool after the run, otherwise the JVM will not exit, because there is a specified number of active threads in the thread pool, and the JVM exits normally only if there are no running non-daemons in the JVM process.
3.2 Construction method
The constructor’s source code is as follows:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
Copy the code
Although four constructors are provided, it is essentially the last constructor that is called, which takes seven parameters:
corePoolSize
: number of core threads. The number of core threads does not decrease even when the core threads in the thread pool are not working. The minimum value of this parameter is 0 and must be less than or equal tomaximumPoolSize
maximumPoolSize
: Sets the maximum number of threads allowed in the thread poolkeepAliveTime
: When the number of threads in the thread pool exceeds the number of core threads and the thread pool is idle, the thread pool will reclaim some threads to release system resourcescorePoolSize
How long after the number of threads is recycled, with the last parameter representing the time unitunit
Together withunit
: For settingkeepAliveTime
Unit of timeworkQueure
: Holds tasks that have been submitted to the thread pool but have not been executedthreadFactory
: a factory for creating threads, which developers can customizeThreadFactory
To create a threadhandler
: Reject policy. When a task exceeds the limit of the blocking queue, the thread pool will reject the new task
3.3 Task Execution Process
After a thread pool is successfully created, the internal running threads are not created immediately, and ThreadPoolExecutor is created and run in a Lazy manner. The thread is created when the execute task method is first called, for example:
ThreadPoolExecutor executor = new ThreadPoolExecutor(2.4.30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
assert executor.getActiveCount() == 0;
assert executor.getMaximumPoolSize() == 4;
assert executor.getCorePoolSize() == 2;
executor.execute(()-> System.out.println(" Execute the runnable task."));
assert executor.getActiveCount() == 1;
assert executor.getMaximumPoolSize() == 4;
assert executor.getCorePoolSize() == 2;
Copy the code
(Please add -EA parameter when running)
Let’s take a look at the specific execution process of the task:
- If the number of running threads is less than the number of core threads, create a new thread and execute the task immediately
- If the number of running threads is greater than or equal to the number of core threads and the task queue is not full, the system puts the task in the task queue until the number of running threads completes their tasks, and then polls the task queue to obtain the task running
- If the task queue is full and the number of running threads is less than the maximum number of threads, the thread pool creates threads to execute the task and the number of created threads is less than the maximum number of threads
- If the task queue is full and the number of running threads has reached the maximum number of threads, and there are no free running threads at the moment, the task rejection policy is executed, depending on
RejectedEcecutionHandler
- If the thread in the thread pool is free and the idle time has reached
keepAliveTime
Specifies the time at which threads are recycled until reservedcorePoolSize
(Core threads can also be set to timeout, default is not enabled core thread timeout)
3.4 Thread Factory
ThreadFactory is an interface:
package java.util.concurrent;
public interface ThreadFactory {
Thread newThread(Runnable var1);
}
Copy the code
Using a thread factory, you can add custom configurations when creating a thread, such as specifying the name, priority, whether or not it is a daemon thread, etc. For example, here is a simple implementation of a thread factory:
public class TestThreadFactory implements ThreadFactory {
private final static String PREFIX = "Test thread[";
private final static String SUFFIX = "]";
private final static AtomicInteger THREAD_NUM = new AtomicInteger();
@Override
public Thread newThread(Runnable runnable) {
ThreadGroup group = new ThreadGroup("My pool");
Thread thread = new Thread(group,runnable,PREFIX+THREAD_NUM.getAndIncrement()+SUFFIX);
thread.setPriority(5);
returnthread; }}Copy the code
3.5 Rejection Policy
By default, ThreadPoolExecutor provides four rejection policies:
DiscardPolicy
: Discards the task directlyAbortPolicy
: Terminates the policy and throwsRejectedExecutionException
DiscardOldestPolicy
: The policy of discarding the oldest task in the queue (strictly speaking, the choice should be based on the task queue, because not all queues areFIFO
)CallerRunsPolicy
: The caller thread executes the policy and the task blocks execution in the current thread
The RejectedExecutionHandler interface can be used to customize the policy:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
}
Copy the code
3.6 Disabling a thread pool
If you don’t need a thread pool, you need to turn it off manually. Thread pools are provided in the following three ways:
- Orderly closure:
shutdown()
- Close immediately:
shutdownNow()
- Combination closure:
shutdown()+shutdownNow()
3.6.1 Orderly Shutdown
Shutdown () provides an orderly shutdown to close the thread pool. After calling this method, it will wait for all currently executing tasks to complete and then close, while new submitted tasks will be rejected. Note that this method is non-blocking and returns immediately. If you need to check the shutdown status, you can use:
isShutdown()
: returns whether or not it was calledshutdown()
The results of theisTerminating()
: Returns whether ending is in progressisTerminated()
: Returns whether it is finished
3.6.2 Close immediately
The shutdownNow() method first changes the thread pool state to shutdown, then suspends the unexecuted task, then attempts to interrupt the running thread, and finally returns the unexecuted task:
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2.4.30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new TestThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
IntStream.range(0.10).forEach(i-> executor.execute(()-> {
try{
TimeUnit.SECONDS.sleep(5);
}catch(Exception e){ e.printStackTrace(); }})); List<Runnable> runnables = executor.shutdownNow(); System.out.println(runnables.size()); }Copy the code
Output:
8
BUILD SUCCESSFUL in326ms 2 actionable tasks: 2 executed java.lang.InterruptedException: sleep interrupted at java.base/java.lang.Thread.sleep(Native Method) at java.base/java.lang.Thread.sleep(Thread.java:339) at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446) at com.company.Main.lambda$main$0(Main.java:29)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) java.lang.InterruptedException: sleep interrupted at java.base/java.lang.Thread.sleep(Native Method) at java.base/java.lang.Thread.sleep(Thread.java:339) at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446) at com.company.Main.lambda$main$0(Main.java:29)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
3:14:36 AM: Task execution finished 'Main.main()'.
Copy the code
3.6.3 Disabling combination
In order to safely shut down a thread pool, a combined approach is generally used to shut down the thread pool to ensure that the running tasks are executed normally and to improve the success rate of the thread pool shut down. Examples are as follows:
ThreadPoolExecutor executor = new ThreadPoolExecutor(2.4.30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new TestThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
IntStream.range(0.10).forEach(i-> executor.execute(()-> {
try{
TimeUnit.SECONDS.sleep(5);
}catch(Exception e){ e.printStackTrace(); }}));// First call shutdown() to try to shut it down
executor.shutdown();
try{
// If it is not closed after waiting some time
if(! executor.awaitTermination(10,TimeUnit.SECONDS)){
// Force shutdown
executor.shutdownNow();
// If the forced shutdown fails, for example, the running thread is abnormally time-consuming and cannot be interrupted
if(! executor.awaitTermination(10,TimeUnit.SECONDS)){
// Other processing, here is only output interrupt failure information
System.out.println("Terminate failed."); }}}catch (InterruptedException e){
// If the current thread is interrupted and an exception is caught, execute the immediate shutdown method
executor.shutdownNow();
// Rethrows the interrupt signal
Thread.currentThread().interrupt();
}
Copy the code
4 ScheduledThreadPoolExecutor
ScheduledExecutorService inherited the ExecutorService, and provides the characteristics of the task has been performed regularly, you can use ScheduledThreadPoolExecutor to achieve some special tasks. , of course, there are a lot of method to realize the fixed or framework, a native of shell, old Timer/TimerTask implementation, Quartz or special framework implementation, here is the realization of the JDK internal ScheduledThreadPoolExecutor.
Inherited ThreadPoolExecutor ScheduledThreadPoolExecutor, besides has all the methods of the ThreadPoolExecutor, also defines four methods related to the schedule:
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
A:one-shot
(execute only once), task (callable
) in unit time (delay
) is executed and returns immediatelyScheduledFuture
ScheduledFuture<? > schedule(Runnable command, long delay, TimeUnit unit)
: Also aone-shot
Method, the task will be executed after unit time, unlike the first method, which returnsScheduledFuture
Does not contain any execution results, but can be returnedScheduledFuture
Check whether the task is completeScheduledFuture<? > scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
: Tasks are performed at a fixed rateinitialDelay
After constantly being executedScheduledFuture<? > scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
: The task will execute the task with a fixed delay unit time
The differences between the latter two are as follows:
public static void main(String[] args) throws Exception {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
Runnable runnable = ()->{
long startTimestamp = System.currentTimeMillis();
System.out.println("current timestamp: "+startTimestamp);
try{
TimeUnit.MILLISECONDS.sleep(current().nextInt(100));
}catch (Exception e){
e.printStackTrace();
}
System.out.println("elapsed time: "+(System.currentTimeMillis() - startTimestamp));
};
executor.scheduleAtFixedRate(runnable,10.1000,TimeUnit.MILLISECONDS);
/ / executor. ScheduleWithFixedDelay (runnable, 10100, TimeUnit. MILLISECONDS);
}
Copy the code
Output:
current timestamp: 1619351675438
elapsed time: 97
current timestamp: 1619351676438
elapsed time: 85
current timestamp: 1619351677438
elapsed time: 1
current timestamp: 1619351678438
elapsed time: 1
current timestamp: 1619351679438
elapsed time: 68
current timestamp: 1619351680438
elapsed time: 99
Copy the code
You can see that the tasks are always running at a fixed rate, and each run is always 1000ms apart.
The output from FixedDelay is as follows:
current timestamp: 1619351754890
elapsed time: 53
current timestamp: 1619351755944
elapsed time: 30
current timestamp: 1619351756974
elapsed time: 13
current timestamp: 1619351757987
elapsed time: 80
current timestamp: 1619351759068
elapsed time: 94
current timestamp: 1619351760162
elapsed time: 29
Copy the code
The start time is the time since the last execution and the interval (1000ms).
5 Executors
Thread pool in
The Executors class provides six static methods for creating a thread pool:
FixedThreadPool
SingleThreadExecutor
CachedThreadPool
ScheduledThreadPool
SingleThreadScheduledExecutor
WorkStealingPool
Let’s take a look at each of them.
5.1 FixedThreadPool
The source code is as follows:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory);
}
Copy the code
ThreadPoolExecutor is called at the bottom of FixedThreadPool. By default, the number of core threads created is equal to the maximum number of threads, and the task queue is LinkedBlockingQueue without borders.
5.2 SingleThreadExecutor
The relevant source code is as follows:
public static ExecutorService newSingleThreadExecutor(a) {
return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1.1.0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1.1.0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory));
}
private static class FinalizableDelegatedExecutorService extends Executors.DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize(a) {
super.shutdown(); }}Copy the code
Can see SingleThreadPool FinalizableDelegatedExecutorService packaging, is actually the inner class 1, core thread and the maximum number of threads LinkedBlockingQueue task queue is no borders. When GC occurs, the shutdown() method is called.
5.3 CachedThreadPool
The source code is as follows:
public static ExecutorService newCachedThreadPool(a) {
return new ThreadPoolExecutor(0.2147483647.60L, TimeUnit.SECONDS, new SynchronousQueue());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0.2147483647.60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory);
}
Copy the code
CachedThreadPool creates new threads as needed, which are typically used to execute large, short asynchronous tasks. Unused threads that have been idle for more than 60 seconds are reclaimed.
5.4 ScheduledThreadPool
The source code is as follows:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
Copy the code
Create a specified number of core ScheduledThreadPoolExecutor.
5.5 SingleThreadScheduledExecutor
The source code is as follows:
public static ScheduledExecutorService newSingleThreadScheduledExecutor(a) {
return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));
}
private static class DelegatedScheduledExecutorService extends Executors.DelegatedExecutorService implements ScheduledExecutorService {
private final ScheduledExecutorService e;
DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
super(executor);
this.e = executor;
}
publicScheduledFuture<? > schedule(Runnable command,long delay, TimeUnit unit) {
return this.e.schedule(command, delay, unit);
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return this.e.schedule(callable, delay, unit);
}
publicScheduledFuture<? > scheduleAtFixedRate(Runnable command,long initialDelay, long period, TimeUnit unit) {
return this.e.scheduleAtFixedRate(command, initialDelay, period, unit);
}
publicScheduledFuture<? > scheduleWithFixedDelay(Runnable command,long initialDelay, long delay, TimeUnit unit) {
return this.e.scheduleWithFixedDelay(command, initialDelay, delay, unit); }}Copy the code
SingelThreadPool+ScheduledThreadPool.
5.6 WorkStealingPool
The source code is as follows:
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null.true);
}
public static ExecutorService newWorkStealingPool(a) {
return new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null.true);
}
Copy the code
WorkStealingPool is a thread pool introduced in JDK8 that returns a ForkJoinPool. In WorkStealingPool, if the tasks handled by each thread are time-consuming to execute, the tasks it is responsible for will be “stolen” by other threads, thus improving the efficiency of concurrent processing.