1 Overview

This article explains the Java thread pool interfaces and implementation classes, along with their basic usage, covering:

  • Executor/Executors
  • ExecutorService
  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor

2 Two important interfaces: Executor and ExecutorService

Executor is an interface that defines a simple task submission method:

//Executor
package java.util.concurrent;

public interface Executor {
    void execute(Runnable var1);
}
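
Because the interface has a single method, an Executor can be as simple as an object that runs each task in the calling thread. The following is a minimal sketch of that idea; the class names ExecutorDemo and DirectExecutor are made up for illustration and are not part of the JDK:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

class ExecutorDemo {
    // Hypothetical implementation: runs each task synchronously in the caller's thread.
    static class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            command.run();
        }
    }

    // Submits two tasks and returns the order in which they ran.
    // A plain ArrayList is safe here only because DirectExecutor never uses another thread.
    static List<String> runTasks() {
        Executor executor = new DirectExecutor();
        List<String> log = new ArrayList<>();
        executor.execute(() -> log.add("first"));
        executor.execute(() -> log.add("second"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runTasks()); // prints [first, second]
    }
}
```

The caller only sees execute(); whether the task runs in the same thread, a new thread, or a pooled thread is decided by the implementation.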

ExecutorService is an interface that inherits from Executor and provides more methods for task submission and management, such as stopping the execution of a task:

//ExecutorService
package java.util.concurrent;

import java.util.Collection;
import java.util.List;

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;
    <T> Future<T> submit(Callable<T> var1);
    <T> Future<T> submit(Runnable var1, T var2);
    Future<?> submit(Runnable var1);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
}
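
As a rough illustration of the batch submission methods, the sketch below uses invokeAll to run several Callable tasks and collect their results. The class and method names (ExecutorServiceDemo, sumWithInvokeAll) and the pool size of 2 are assumptions chosen for the example:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorServiceDemo {
    // Runs three Callables with invokeAll and sums their results.
    static int sumWithInvokeAll() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = Arrays.asList(() -> 1, () -> 2, () -> 3);
            int sum = 0;
            // invokeAll blocks until every task has completed
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumWithInvokeAll()); // prints 6
    }
}
```

invokeAny has the same shape but returns the result of one task that completed successfully instead of a list of futures.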

Two important implementations of ExecutorService are described in detail below:

  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor

3 ThreadPoolExecutor

This is commonly referred to as a thread pool class. In general, a thread pool has the following characteristics:

  • A thread pool has a certain number of worker threads
  • The number of threads and the number of tasks will be controlled and managed
  • The execution of a task occurs asynchronously
  • The thread pool keeps statistics about the tasks it executes

3.1 A simple example

Let’s start with a simple example:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        // Execute a task without a return value
        executor.execute(() -> System.out.println(" Execute the Runnable Task."));
        // Execute a task with a return value; get() blocks until the result is ready
        Future<String> future = executor.submit(() -> " Execute the callable task and this is the result.");
        System.out.println(future.get());
        // Manually shut down the thread pool
        executor.shutdown();
    }
}

As this simple example shows, a thread pool can run tasks with or without a return value; to retrieve a return value, call get() on the returned Future, which blocks until the result is available. In addition, the thread pool must be shut down manually after use. Otherwise the JVM will not exit, because the pool keeps its worker threads alive, and the JVM exits normally only when no non-daemon threads are still running.

3.2 Construction method

The source code for the constructor is as follows:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue) 

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) 

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) 

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

Although four constructors are provided, the first three simply delegate to the last one, which takes seven parameters:

  • corePoolSize: The number of core threads. Even when the core threads are idle, their number does not decrease. The minimum value is 0, and the value must be less than or equal to maximumPoolSize
  • maximumPoolSize: The maximum number of threads allowed in the thread pool
  • keepAliveTime: When the pool has more threads than corePoolSize and some of them are idle, the pool reclaims the excess threads to free system resources. This parameter sets how long an excess thread may stay idle before it is reclaimed; it is used together with the next parameter, unit
  • unit: The time unit of keepAliveTime
  • workQueue: Holds tasks that have been submitted to the thread pool but have not yet been executed
  • threadFactory: The factory used to create threads; developers can supply a custom ThreadFactory
  • handler: The rejection policy. When the blocking queue is full and no more threads can be created, the thread pool rejects new tasks; this parameter determines how rejected tasks are handled

3.3 Task execution process

After the pool is created, its worker threads are not started immediately; ThreadPoolExecutor creates them lazily. A thread is first created when a task is submitted for execution, for example:

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy());
// No worker thread exists before the first task is submitted
assert executor.getPoolSize() == 0;
assert executor.getMaximumPoolSize() == 4;
assert executor.getCorePoolSize() == 2;
executor.execute(()-> System.out.println(" Execute the runnable task."));
// getPoolSize() is checked here because getActiveCount() is approximate and timing-dependent
assert executor.getPoolSize() == 1;
assert executor.getMaximumPoolSize() == 4;
assert executor.getCorePoolSize() == 2;

(Run with the -ea JVM option so that assertions are enabled)

Let’s take a look at the specific execution process of the task:

  • If fewer threads are running than corePoolSize, a new thread is created and the task is executed immediately
  • If the number of running threads is greater than or equal to corePoolSize and the task queue is not full, the task is placed in the queue; when a running thread finishes its current task, it takes the next task from the queue
  • If the task queue is full and the number of running threads is less than maximumPoolSize, the pool creates a new thread to execute the task; the total number of threads never exceeds maximumPoolSize
  • If the task queue is full and the number of running threads has reached maximumPoolSize, the task is rejected and handled according to the configured RejectedExecutionHandler
  • If a thread in the pool has been idle for keepAliveTime, it is reclaimed, until only corePoolSize threads remain (core threads can also be allowed to time out; by default, core thread timeout is not enabled)
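
The first four steps above can be observed with a small experiment. This is only a sketch under assumed settings (one core thread, at most two threads, a queue of capacity one); the class name PoolGrowthDemo and the latch-based blocking task are illustrative choices:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {
    // Submits four blocking tasks and records "poolSize/queueSize" after each submission.
    static List<String> observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> states = new ArrayList<>();
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocking);
                    states.add(pool.getPoolSize() + "/" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    // The default AbortPolicy throws when the pool is saturated
                    states.add("rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return states;
    }

    public static void main(String[] args) {
        System.out.println(observe()); // prints [1/0, 1/1, 2/1, rejected]
    }
}
```

The first task starts a core thread, the second waits in the queue, the third forces a second thread because the queue is full, and the fourth is rejected.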

3.4 Thread factory

ThreadFactory is an interface:

package java.util.concurrent;

public interface ThreadFactory {
    Thread newThread(Runnable var1);
}

With a thread factory, you can customize threads as they are created, for example their name, priority, or daemon status. Here is a simple thread factory implementation:

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class TestThreadFactory implements ThreadFactory {
    private final static String PREFIX = "Test thread[";
    private final static String SUFFIX = "]";
    private final static AtomicInteger THREAD_NUM = new AtomicInteger();
    // One shared group for all threads, instead of a new group per thread
    private final static ThreadGroup GROUP = new ThreadGroup("My pool");

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(GROUP, runnable, PREFIX + THREAD_NUM.getAndIncrement() + SUFFIX);
        thread.setPriority(Thread.NORM_PRIORITY);
        return thread;
    }
}
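
To see a factory take effect, a task can report the name of the thread that runs it. The sketch below uses a lambda-based factory rather than a named class; the "worker-" naming scheme and the class name NamedFactoryDemo are hypothetical:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedFactoryDemo {
    // Returns the name of the worker thread that ran the task.
    static String workerName() {
        AtomicInteger counter = new AtomicInteger();
        // Hypothetical naming scheme, for illustration only
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "worker-" + counter.getAndIncrement());
            thread.setDaemon(true); // daemon workers do not keep the JVM alive
            return thread;
        };
        ExecutorService pool = Executors.newFixedThreadPool(1, factory);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName()); // prints worker-0
    }
}
```

Meaningful thread names are mostly useful in thread dumps and logs, where they make it clear which pool a thread belongs to.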

3.5 Rejection Policy

ThreadPoolExecutor provides four built-in rejection policies:

  • DiscardPolicy: Silently discards the rejected task
  • AbortPolicy: Throws a RejectedExecutionException (this is the default policy)
  • DiscardOldestPolicy: Discards the task at the head of the queue and then retries the submission (the head is the oldest task only if the queue is FIFO, which not all queues are)
  • CallerRunsPolicy: Runs the rejected task in the thread that submitted it, so that thread is blocked until the task finishes

If none of these fits, you can implement the RejectedExecutionHandler interface to define a custom policy:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
}
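
As one possible custom policy, the sketch below counts rejected tasks instead of throwing. The counting behavior, the class name CountingRejectionDemo, and the deliberately tiny pool (one thread, queue capacity one) are assumptions made for the example:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CountingRejectionDemo {
    // Submits five blocking tasks to a saturated pool and returns how many were rejected.
    static int countRejections() {
        AtomicInteger rejected = new AtomicInteger();
        // Hypothetical policy: count the rejected task instead of throwing
        RejectedExecutionHandler handler = (task, executor) -> rejected.incrementAndGet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the single worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            // One task runs, one waits in the queue, the remaining three are rejected
            for (int i = 0; i < 5; i++) {
                pool.execute(blocking);
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println(countRejections()); // prints 3
    }
}
```

A real handler would more likely log the task, record a metric, or hand the task to a fallback queue; the handler runs in the thread that called execute().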

3.6 Close the thread pool

When a thread pool is no longer needed, it must be shut down manually. There are three ways to do this:

  • Orderly shutdown: shutdown()
  • Immediate shutdown: shutdownNow()
  • Combined shutdown: shutdown() + shutdownNow()

3.6.1 Orderly shutdown

shutdown() closes the thread pool in an orderly way: after it is called, tasks that have already been submitted are allowed to complete, while newly submitted tasks are rejected. Note that this method is non-blocking and returns immediately. To check the shutdown status, you can use:

  • isShutdown(): Returns whether the pool has been shut down, that is, whether shutdown() has been called
  • isTerminating(): Returns whether the pool is in the process of terminating
  • isTerminated(): Returns whether the pool has terminated
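
The three status methods can be sampled while a task is still running and again after the pool has drained. The following is a sketch only; the class name ShutdownStateDemo and the latch used to hold the task open are illustrative:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ShutdownStateDemo {
    // Returns [isShutdown, isTerminating, isTerminated] sampled while a task is still
    // running, followed by isTerminated sampled after the pool has finished.
    static List<Boolean> sample() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            try {
                release.await(); // keep the task running until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.shutdown(); // returns immediately; the submitted task keeps running
        boolean shutdown = pool.isShutdown();
        boolean terminating = pool.isTerminating();
        boolean terminatedEarly = pool.isTerminated();
        release.countDown(); // let the task finish
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean terminatedLate = pool.isTerminated();
        return Arrays.asList(shutdown, terminating, terminatedEarly, terminatedLate);
    }

    public static void main(String[] args) {
        System.out.println(sample()); // prints [true, true, false, true]
    }
}
```

Note that isTerminating() is declared on ThreadPoolExecutor, not on the ExecutorService interface.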

3.6.2 Close immediately

The shutdownNow() method first changes the thread pool state to STOP, then attempts to interrupt the running threads, then removes the tasks that are still waiting in the queue, and finally returns those unexecuted tasks:

public static void main(String[] args) throws Exception {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10), new TestThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());
    // Submit 10 tasks: 2 start running on the core threads, 8 wait in the queue
    IntStream.range(0, 10).forEach(i -> executor.execute(() -> {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }));
    // Interrupts the running tasks and returns the queued ones
    List<Runnable> runnables = executor.shutdownNow();
    System.out.println(runnables.size());
}

Output:

8
java.lang.InterruptedException: sleep interrupted
    at java.base/java.lang.Thread.sleep(Native Method)
    at java.base/java.lang.Thread.sleep(Thread.java:339)
    at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446)
    at com.company.Main.lambda$main$0(Main.java:29)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
java.lang.InterruptedException: sleep interrupted
    at java.base/java.lang.Thread.sleep(Native Method)
    at java.base/java.lang.Thread.sleep(Thread.java:339)
    at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446)
    at com.company.Main.lambda$main$0(Main.java:29)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

3.6.3 Combination closing

To shut a thread pool down safely, the two methods are generally combined. This lets running tasks finish normally while making it more likely that the shutdown actually succeeds. For example:

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(10), new TestThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy());
IntStream.range(0, 10).forEach(i -> executor.execute(() -> {
    try {
        TimeUnit.SECONDS.sleep(5);
    } catch (Exception e) {
        e.printStackTrace();
    }
}));
// First call shutdown() to attempt an orderly shutdown
executor.shutdown();
try {
    // Wait up to 10 seconds for the submitted tasks to finish
    if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
        // Timed out: force the shutdown
        executor.shutdownNow();
        // The forced shutdown can also fail, for example when a running task
        // takes unusually long and does not respond to interruption
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            // Other handling could go here; this example only reports the failure
            System.out.println("Terminate failed.");
        }
    }
} catch (InterruptedException e) {
    // The current thread was interrupted while waiting: force the shutdown
    executor.shutdownNow();
    // Restore the interrupt status
    Thread.currentThread().interrupt();
}

4 ScheduledThreadPoolExecutor

ScheduledExecutorService extends ExecutorService and adds the ability to run tasks after a delay or periodically; ScheduledThreadPoolExecutor can be used to implement such scheduled tasks. There are of course many other ways to schedule tasks, such as native shell facilities, the older Timer/TimerTask, or dedicated frameworks like Quartz. This section covers the implementation built into the JDK, ScheduledThreadPoolExecutor.

ScheduledThreadPoolExecutor extends ThreadPoolExecutor. In addition to all the methods of ThreadPoolExecutor, it defines four scheduling methods:

  • <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit): A one-shot method (the task executes only once). The task (callable) is executed after delay, measured in unit, and the method immediately returns a ScheduledFuture
  • ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit): Also a one-shot method; the task is executed after delay, measured in unit. Unlike the first method, the returned ScheduledFuture does not carry an execution result, but it can be used to determine whether the task has finished
  • ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit): After initialDelay, the task is executed repeatedly at a fixed rate, with period between the starts of successive runs
  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit): After initialDelay, the task is executed repeatedly, with a fixed delay between the end of one run and the start of the next

The following example shows the difference between the latter two:
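
For the one-shot variants, a minimal sketch of scheduling a Callable and reading its result might look like this. The 50 ms delay, the "done" result, and the class name OneShotScheduleDemo are arbitrary choices for illustration:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class OneShotScheduleDemo {
    // Schedules a Callable to run once after 50 ms and returns its result.
    static String runOnce() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            // schedule() returns immediately; the task runs after the delay
            ScheduledFuture<String> future =
                    scheduler.schedule(() -> "done", 50, TimeUnit.MILLISECONDS);
            // get() blocks until the delay has elapsed and the task has run
            return future.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // prints done
    }
}
```

With the Runnable overload, get() on the returned future yields null once the task has completed.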

public static void main(String[] args) throws Exception {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
    Runnable runnable = () -> {
        long startTimestamp = System.currentTimeMillis();
        System.out.println("current timestamp: " + startTimestamp);
        try {
            // Simulate work that takes up to 100 ms
            TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100));
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("elapsed time: " + (System.currentTimeMillis() - startTimestamp));
    };
    // Initial delay 10 ms; the 1000 ms period matches the interval seen in the output
    executor.scheduleAtFixedRate(runnable, 10, 1000, TimeUnit.MILLISECONDS);
    // executor.scheduleWithFixedDelay(runnable, 10, 1000, TimeUnit.MILLISECONDS);
}

Output:

current timestamp: 1619351675438
elapsed time: 97
current timestamp: 1619351676438
elapsed time: 85
current timestamp: 1619351677438
elapsed time: 1
current timestamp: 1619351678438
elapsed time: 1
current timestamp: 1619351679438
elapsed time: 68
current timestamp: 1619351680438
elapsed time: 99

You can see that the task runs at a fixed rate: the start times of successive runs are always 1000 ms apart, regardless of how long each run takes.

The output when using scheduleWithFixedDelay is as follows:

current timestamp: 1619351754890
elapsed time: 53
current timestamp: 1619351755944
elapsed time: 30
current timestamp: 1619351756974
elapsed time: 13
current timestamp: 1619351757987
elapsed time: 80
current timestamp: 1619351759068
elapsed time: 94
current timestamp: 1619351760162
elapsed time: 29

Here each start time is the completion time of the previous run plus the delay (1000 ms).
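
A periodic task keeps running until it is cancelled or the executor is shut down. The sketch below cancels a fixed-delay task through its ScheduledFuture after three runs; the 10 ms delay, the latch, and the class name PeriodicCancelDemo are assumptions made for the example:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class PeriodicCancelDemo {
    // Lets a periodic task run at least three times, cancels it,
    // and returns whether the future reports the cancellation.
    static boolean runThreeTimesThenCancel() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeRuns = new CountDownLatch(3);
        try {
            ScheduledFuture<?> handle = scheduler.scheduleWithFixedDelay(
                    threeRuns::countDown, 0, 10, TimeUnit.MILLISECONDS);
            threeRuns.await();    // wait until the task has run three times
            handle.cancel(false); // stop future runs without interrupting a running one
            return handle.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runThreeTimesThenCancel()); // prints true
    }
}
```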

5 Thread pools in Executors

The Executors class provides static factory methods for six kinds of thread pool:

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool
  • ScheduledThreadPool
  • SingleThreadScheduledExecutor
  • WorkStealingPool

Let’s look at each of them.

5.1 FixedThreadPool

The source is as follows:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory);
}

Under the hood, FixedThreadPool is a ThreadPoolExecutor whose core thread count equals its maximum thread count, and whose task queue is an unbounded LinkedBlockingQueue.
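
One consequence of the unbounded queue is that extra tasks pile up in the queue instead of creating more threads or being rejected. The sketch below illustrates this; it casts the returned pool to ThreadPoolExecutor, which relies on the implementation shown in the source above rather than on the declared ExecutorService return type, and the class name FixedPoolDemo is illustrative:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class FixedPoolDemo {
    // Submits five blocking tasks to a pool of two threads and
    // returns "poolSize/queueSize" while all tasks are still pending.
    static String observe() {
        // Assumption: the cast works because newFixedThreadPool returns a ThreadPoolExecutor
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < 5; i++) {
                pool.execute(blocking);
            }
            // Two tasks occupy the two threads; the other three wait in the queue
            return pool.getPoolSize() + "/" + pool.getQueue().size();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(observe()); // prints 2/3
    }
}
```

Because nothing bounds the queue, a producer that submits faster than the pool can process will keep consuming memory.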

5.2 SingleThreadExecutor

The relevant source code is as follows:

public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory));
}

private static class FinalizableDelegatedExecutorService extends Executors.DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
        super(executor);
    }

    protected void finalize() {
        super.shutdown();
    }
}

As you can see, SingleThreadExecutor is a ThreadPoolExecutor wrapped in the inner class FinalizableDelegatedExecutorService. Both the core thread count and the maximum thread count are 1, and the task queue is an unbounded LinkedBlockingQueue. When the wrapper is garbage collected, its finalize() method calls shutdown().
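
With a single worker thread and a FIFO queue, tasks run one at a time in submission order. A small sketch of this behavior (the class name SingleThreadOrderDemo and the five numbered tasks are illustrative):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadOrderDemo {
    // Submits five numbered tasks and returns the order in which they ran.
    static List<Integer> runInOrder() {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < 5; i++) {
            final int id = i;
            single.execute(() -> order.add(id));
        }
        single.shutdown(); // tasks that were already submitted still run
        try {
            single.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(runInOrder()); // prints [0, 1, 2, 3, 4]
    }
}
```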

5.3 CachedThreadPool

The source is as follows:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory);
}

The CachedThreadPool creates new threads as needed and is typically used to run a large number of short-lived asynchronous tasks. Threads that stay idle for more than 60 seconds are reclaimed.
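
Because the SynchronousQueue holds no tasks, every task that arrives while all existing threads are busy gets a new thread. The sketch below shows this with five tasks that block at the same time; the cast to ThreadPoolExecutor relies on the implementation shown in the source above, and the class name CachedPoolDemo is illustrative:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class CachedPoolDemo {
    // Submits five tasks that all block at once and returns the resulting pool size.
    static int poolSizeUnderLoad() {
        // Assumption: the cast works because newCachedThreadPool returns a ThreadPoolExecutor
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep every worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < 5; i++) {
                pool.execute(blocking);
            }
            // No thread was idle, so each task was given its own thread
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeUnderLoad()); // prints 5
    }
}
```

This is also why long-running or blocking tasks are a poor fit for this pool: the thread count is effectively unbounded.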

5.4 ScheduledThreadPool

The source is as follows:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

This creates a ScheduledThreadPoolExecutor with the specified number of core threads.

5.5 SingleThreadScheduledExecutor

The source is as follows:

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));
}

private static class DelegatedScheduledExecutorService extends Executors.DelegatedExecutorService implements ScheduledExecutorService {
    private final ScheduledExecutorService e;

    DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
        super(executor);
        this.e = executor;
    }

    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return this.e.schedule(command, delay, unit);
    }

    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return this.e.schedule(callable, delay, unit);
    }

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return this.e.scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return this.e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }
}

So it is effectively a SingleThreadExecutor combined with a ScheduledThreadPool.

5.6 WorkStealingPool

The source is as follows:

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null, true);
}

public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null, true);
}

WorkStealingPool is a thread pool introduced in JDK 8; the factory method returns a ForkJoinPool. In a WorkStealingPool, each worker thread has its own task queue. If one thread is held up by time-consuming tasks, the tasks waiting in its queue can be “stolen” by other idle threads, which improves the efficiency of concurrent processing.
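
From the caller's side, a WorkStealingPool is used like any other ExecutorService. The sketch below submits a batch of small tasks and sums their results; the sum-of-squares workload and the class name WorkStealingDemo are arbitrary choices for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class WorkStealingDemo {
    // Computes 1^2 + 2^2 + ... + n^2 with one task per term.
    static long sumOfSquares(int n) {
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final long value = i;
                tasks.add(() -> value * value);
            }
            long sum = 0;
            // invokeAll blocks until every task has completed
            for (Future<Long> future : pool.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10)); // prints 385
    }
}
```

Unlike the pools built on ThreadPoolExecutor with the default thread factory, the worker threads of a ForkJoinPool are daemon threads, so pending tasks do not keep the JVM alive on their own.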