1 overview

This article mainly explains the Java thread pool interface and implementation class, as well as their basic use methods, including:

  • Executor/Executors
  • ExecutorService
  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor

2 Two important interfaces:Executor+ExecutorService

Executor is an interface that defines a simple task submission method:

//Executor
package java.util.concurrent;

public interface Executor {
    void execute(Runnable var1);
}
Copy the code

ExecutorService is an interface that inherits from Executor and provides additional methods for task submission and management, such as stopping task execution:

//ExecutorService
package java.util.concurrent;

import java.util.Collection;
import java.util.List;

public interface ExecutorService extends Executor {
    void shutdown(a);

    List<Runnable> shutdownNow(a);

    boolean isShutdown(a);

    boolean isTerminated(a);

    boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;

    <T> Future<T> submit(Callable<T> var1);

    <T> Future<T> submit(Runnable var1, T var2); Future<? > submit(Runnable var1); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1)throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
}
Copy the code

Two important ExecutorService implementations are detailed below:

  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor

3 ThreadPoolExecutor

This is commonly referred to as a thread pool class. Generally speaking, a thread pool has the following characteristics:

  • A thread pool has a certain number of worker threads
  • The number of threads and tasks is controlled and managed
  • Tasks are executed asynchronously
  • The thread pool takes care of the statistics for executing tasks

3.1 A simple example

Let’s start with a simple example:

public class Main {
    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2.4.30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        // Perform a task with no return value
        executor.execute(()-> System.out.println(" Execute the runnable task."));
        // Perform tasks with return values using the Future generic class
        Future<String> future = executor.submit(()->" Execute the callable task and this is the result.");
        // Get the result of the task with get(), which blocks until the task is completed
        System.out.println(future.get());
   		// Manually close the thread poolexecutor.shutdown(); }}Copy the code

As you can see from this simple example, thread pools can perform tasks with and without a return value, which is blocked using the get() method. In addition, you need to manually shut down the thread pool after the run, otherwise the JVM will not exit, because there is a specified number of active threads in the thread pool, and the JVM exits normally only if there are no running non-daemons in the JVM process.

3.2 Construction method

The constructor’s source code is as follows:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue) 

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) 

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) 

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
Copy the code

Although four constructors are provided, it is essentially the last constructor that is called, which takes seven parameters:

  • corePoolSize: number of core threads. The number of core threads does not decrease even when the core threads in the thread pool are not working. The minimum value of this parameter is 0 and must be less than or equal tomaximumPoolSize
  • maximumPoolSize: Sets the maximum number of threads allowed in the thread pool
  • keepAliveTime: When the number of threads in the thread pool exceeds the number of core threads and the thread pool is idle, the thread pool will reclaim some threads to release system resourcescorePoolSizeHow long after the number of threads is recycled, with the last parameter representing the time unitunitTogether with
  • unit: For settingkeepAliveTimeUnit of time
  • workQueure: Holds tasks that have been submitted to the thread pool but have not been executed
  • threadFactory: a factory for creating threads, which developers can customizeThreadFactoryTo create a thread
  • handler: Reject policy. When a task exceeds the limit of the blocking queue, the thread pool will reject the new task

3.3 Task Execution Process

After a thread pool is successfully created, the internal running threads are not created immediately, and ThreadPoolExecutor is created and run in a Lazy manner. The thread is created when the execute task method is first called, for example:

ThreadPoolExecutor executor = new ThreadPoolExecutor(2.4.30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
assert executor.getActiveCount() == 0;
assert executor.getMaximumPoolSize() == 4;
assert executor.getCorePoolSize() == 2;
executor.execute(()-> System.out.println(" Execute the runnable task."));
assert executor.getActiveCount() == 1;
assert executor.getMaximumPoolSize() == 4;
assert executor.getCorePoolSize() == 2;
Copy the code

(Please add -EA parameter when running)

Let’s take a look at the specific execution process of the task:

  • If the number of running threads is less than the number of core threads, create a new thread and execute the task immediately
  • If the number of running threads is greater than or equal to the number of core threads and the task queue is not full, the system puts the task in the task queue until the number of running threads completes their tasks, and then polls the task queue to obtain the task running
  • If the task queue is full and the number of running threads is less than the maximum number of threads, the thread pool creates threads to execute the task and the number of created threads is less than the maximum number of threads
  • If the task queue is full and the number of running threads has reached the maximum number of threads, and there are no free running threads at the moment, the task rejection policy is executed, depending onRejectedEcecutionHandler
  • If the thread in the thread pool is free and the idle time has reachedkeepAliveTimeSpecifies the time at which threads are recycled until reservedcorePoolSize(Core threads can also be set to timeout, default is not enabled core thread timeout)

3.4 Thread Factory

ThreadFactory is an interface:

package java.util.concurrent;

public interface ThreadFactory {
    Thread newThread(Runnable var1);
}
Copy the code

Using a thread factory, you can add custom configurations when creating a thread, such as specifying the name, priority, whether or not it is a daemon thread, etc. For example, here is a simple implementation of a thread factory:

public class TestThreadFactory implements ThreadFactory {
    private final static String PREFIX = "Test thread[";
    private final static String SUFFIX = "]";
    private final static AtomicInteger THREAD_NUM = new AtomicInteger();
    @Override
    public Thread newThread(Runnable runnable) {
        ThreadGroup group = new ThreadGroup("My pool");
        Thread thread = new Thread(group,runnable,PREFIX+THREAD_NUM.getAndIncrement()+SUFFIX);
        thread.setPriority(5);
        returnthread; }}Copy the code

3.5 Rejection Policy

By default, ThreadPoolExecutor provides four rejection policies:

  • DiscardPolicy: Discards the task directly
  • AbortPolicy: Terminates the policy and throwsRejectedExecutionException
  • DiscardOldestPolicy: The policy of discarding the oldest task in the queue (strictly speaking, the choice should be based on the task queue, because not all queues areFIFO)
  • CallerRunsPolicy: The caller thread executes the policy and the task blocks execution in the current thread

The RejectedExecutionHandler interface can be used to customize the policy:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
}
Copy the code

3.6 Disabling a thread pool

If you don’t need a thread pool, you need to turn it off manually. Thread pools are provided in the following three ways:

  • Orderly closure:shutdown()
  • Close immediately:shutdownNow()
  • Combination closure:shutdown()+shutdownNow()

3.6.1 Orderly Shutdown

Shutdown () provides an orderly shutdown to close the thread pool. After calling this method, it will wait for all currently executing tasks to complete and then close, while new submitted tasks will be rejected. Note that this method is non-blocking and returns immediately. If you need to check the shutdown status, you can use:

  • isShutdown(): returns whether or not it was calledshutdown()The results of the
  • isTerminating(): Returns whether ending is in progress
  • isTerminated(): Returns whether it is finished

3.6.2 Close immediately

The shutdownNow() method first changes the thread pool state to shutdown, then suspends the unexecuted task, then attempts to interrupt the running thread, and finally returns the unexecuted task:

public static void main(String[] args) throws Exception {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(2.4.30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new TestThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
    IntStream.range(0.10).forEach(i-> executor.execute(()-> {
        try{
            TimeUnit.SECONDS.sleep(5);
        }catch(Exception e){ e.printStackTrace(); }})); List<Runnable> runnables = executor.shutdownNow(); System.out.println(runnables.size()); }Copy the code

Output:

8

BUILD SUCCESSFUL in326ms 2 actionable tasks: 2 executed java.lang.InterruptedException: sleep interrupted at java.base/java.lang.Thread.sleep(Native Method) at java.base/java.lang.Thread.sleep(Thread.java:339) at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446) at  com.company.Main.lambda$main$0(Main.java:29)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) java.lang.InterruptedException: sleep interrupted at java.base/java.lang.Thread.sleep(Native Method) at java.base/java.lang.Thread.sleep(Thread.java:339) at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446) at  com.company.Main.lambda$main$0(Main.java:29)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
3:14:36 AM: Task execution finished 'Main.main()'.
Copy the code

3.6.3 Disabling combination

In order to safely shut down a thread pool, a combined approach is generally used to shut down the thread pool to ensure that the running tasks are executed normally and to improve the success rate of the thread pool shut down. Examples are as follows:

 ThreadPoolExecutor executor = new ThreadPoolExecutor(2.4.30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new TestThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
IntStream.range(0.10).forEach(i-> executor.execute(()-> {
    try{
        TimeUnit.SECONDS.sleep(5);
    }catch(Exception e){ e.printStackTrace(); }}));// First call shutdown() to try to shut it down
executor.shutdown();
try{
	// If it is not closed after waiting some time
    if(! executor.awaitTermination(10,TimeUnit.SECONDS)){
    	// Force shutdown
        executor.shutdownNow();
        // If the forced shutdown fails, for example, the running thread is abnormally time-consuming and cannot be interrupted
        if(! executor.awaitTermination(10,TimeUnit.SECONDS)){
        	// Other processing, here is only output interrupt failure information
            System.out.println("Terminate failed."); }}}catch (InterruptedException e){
	// If the current thread is interrupted and an exception is caught, execute the immediate shutdown method
    executor.shutdownNow();
    // Rethrows the interrupt signal
    Thread.currentThread().interrupt();
}
Copy the code

4 ScheduledThreadPoolExecutor

ScheduledExecutorService inherited the ExecutorService, and provides the characteristics of the task has been performed regularly, you can use ScheduledThreadPoolExecutor to achieve some special tasks. , of course, there are a lot of method to realize the fixed or framework, a native of shell, old Timer/TimerTask implementation, Quartz or special framework implementation, here is the realization of the JDK internal ScheduledThreadPoolExecutor.

Inherited ThreadPoolExecutor ScheduledThreadPoolExecutor, besides has all the methods of the ThreadPoolExecutor, also defines four methods related to the schedule:

  • <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)A:one-shot(execute only once), task (callable) in unit time (delay) is executed and returns immediatelyScheduledFuture
  • ScheduledFuture<? > schedule(Runnable command, long delay, TimeUnit unit): Also aone-shotMethod, the task will be executed after unit time, unlike the first method, which returnsScheduledFutureDoes not contain any execution results, but can be returnedScheduledFutureCheck whether the task is complete
  • ScheduledFuture<? > scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit): Tasks are performed at a fixed rateinitialDelayAfter constantly being executed
  • ScheduledFuture<? > scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit): The task will execute the task with a fixed delay unit time

The differences between the latter two are as follows:

public static void main(String[] args) throws Exception {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
    Runnable runnable = ()->{
        long startTimestamp = System.currentTimeMillis();
        System.out.println("current timestamp: "+startTimestamp);
        try{
            TimeUnit.MILLISECONDS.sleep(current().nextInt(100));
        }catch (Exception e){
            e.printStackTrace();
        }
        System.out.println("elapsed time: "+(System.currentTimeMillis() - startTimestamp));
    };

    executor.scheduleAtFixedRate(runnable,10.1000,TimeUnit.MILLISECONDS);
/ / executor. ScheduleWithFixedDelay (runnable, 10100, TimeUnit. MILLISECONDS);
}
Copy the code

Output:

current timestamp: 1619351675438
elapsed time: 97
current timestamp: 1619351676438
elapsed time: 85
current timestamp: 1619351677438
elapsed time: 1
current timestamp: 1619351678438
elapsed time: 1
current timestamp: 1619351679438
elapsed time: 68
current timestamp: 1619351680438
elapsed time: 99
Copy the code

You can see that the tasks are always running at a fixed rate, and each run is always 1000ms apart.

The output from FixedDelay is as follows:

current timestamp: 1619351754890
elapsed time: 53
current timestamp: 1619351755944
elapsed time: 30
current timestamp: 1619351756974
elapsed time: 13
current timestamp: 1619351757987
elapsed time: 80
current timestamp: 1619351759068
elapsed time: 94
current timestamp: 1619351760162
elapsed time: 29
Copy the code

The start time is the time since the last execution and the interval (1000ms).

5 ExecutorsThread pool in

The Executors class provides six static methods for creating a thread pool:

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool
  • ScheduledThreadPool
  • SingleThreadScheduledExecutor
  • WorkStealingPool

Let’s take a look at each of them.

5.1 FixedThreadPool

The source code is as follows:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory);
}
Copy the code

ThreadPoolExecutor is called at the bottom of FixedThreadPool. By default, the number of core threads created is equal to the maximum number of threads, and the task queue is LinkedBlockingQueue without borders.

5.2 SingleThreadExecutor

The relevant source code is as follows:

public static ExecutorService newSingleThreadExecutor(a) {
    return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1.1.0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1.1.0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory));
}

private static class FinalizableDelegatedExecutorService extends Executors.DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
        super(executor);
    }

    protected void finalize(a) {
        super.shutdown(); }}Copy the code

Can see SingleThreadPool FinalizableDelegatedExecutorService packaging, is actually the inner class 1, core thread and the maximum number of threads LinkedBlockingQueue task queue is no borders. When GC occurs, the shutdown() method is called.

5.3 CachedThreadPool

The source code is as follows:

public static ExecutorService newCachedThreadPool(a) {
    return new ThreadPoolExecutor(0.2147483647.60L, TimeUnit.SECONDS, new SynchronousQueue());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0.2147483647.60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory);
}
Copy the code

CachedThreadPool creates new threads as needed, which are typically used to execute large, short asynchronous tasks. Unused threads that have been idle for more than 60 seconds are reclaimed.

5.4 ScheduledThreadPool

The source code is as follows:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
Copy the code

Create a specified number of core ScheduledThreadPoolExecutor.

5.5 SingleThreadScheduledExecutor

The source code is as follows:

public static ScheduledExecutorService newSingleThreadScheduledExecutor(a) {
    return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));
}

private static class DelegatedScheduledExecutorService extends Executors.DelegatedExecutorService implements ScheduledExecutorService {
    private final ScheduledExecutorService e;

    DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
        super(executor);
        this.e = executor;
    }

    publicScheduledFuture<? > schedule(Runnable command,long delay, TimeUnit unit) {
        return this.e.schedule(command, delay, unit);
    }

    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return this.e.schedule(callable, delay, unit);
    }

    publicScheduledFuture<? > scheduleAtFixedRate(Runnable command,long initialDelay, long period, TimeUnit unit) {
        return this.e.scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    publicScheduledFuture<? > scheduleWithFixedDelay(Runnable command,long initialDelay, long delay, TimeUnit unit) {
        return this.e.scheduleWithFixedDelay(command, initialDelay, delay, unit); }}Copy the code

SingelThreadPool+ScheduledThreadPool.

5.6 WorkStealingPool

The source code is as follows:

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null.true);
}

public static ExecutorService newWorkStealingPool(a) {
    return new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null.true);
}
Copy the code

WorkStealingPool is a thread pool introduced in JDK8 that returns a ForkJoinPool. In WorkStealingPool, if the tasks handled by each thread are time-consuming to execute, the tasks it is responsible for will be “stolen” by other threads, thus improving the efficiency of concurrent processing.