Introduction to thread pools

With the rapid development of computer, multi-core CPU has become the mainstream. In order to give full play to computer performance, multithreading programming has become a necessary weapon for improving service performance. The thread pool provided by Java can help us manage threads and facilitate parallel execution of tasks. How to use thread pool reasonably and be familiar with its operation principle is the basic skill of every programmer. This article combines the thread pool source code, let’s get an in-depth understanding of her design ideas. Finally, regression practice combined with corresponding demo further deepen the impression.

What exactly is a thread pool?

In a common sense, a thread pool is a pool of threads that are created in advance. If there is a task that needs to be processed, the thread in the pool will handle the task. After processing, the thread will not be destroyed, but wait for the next task. Since creating and destroying threads consumes system resources, consider using thread pools when you want to create and destroy threads frequently to improve your system’s performance.

The idea of pooling resources, thread pool is a tool to manage threads based on pooling ideas. This idea can be used to solve the problems encountered in development, improve resource reuse and improve service performance.

This article describes the ThreadPoolExecutor class in the Javaj.u.c package.

What are the benefits of using thread pools?

  • Reduce resource consumption: Reduce the cost of frequent creation and destruction by reusing already created threads.
  • Improve manageability of threads: Threads are scarce resources, and unlimited creation will not only consume a large number of system resources, but also cause resource scheduling imbalance due to unreasonable distribution of threads, reducing system stability. Thread pools are easy to manage and monitor.
  • Speed up: Execute tasks immediately, without waiting for a thread to be created to execute them.

What problem does it exist to solve?

Thread pools exist primarily to solve resource management problems. In a concurrent environment, the system does not know at any given moment how many tasks need to be performed or how much resource commitment is required. All kinds of uncertainties will seriously affect the stability of the system.

Second, the core design and implementation of thread pool

The ThreadPoolExecutor class is the core of the thread pool class, and it is essential to understand Java’s thread pool thoroughly. Let’s take a look at the source code for the ThreadPoolExecutor class.

Four constructors are provided in the ThreadPoolExecutor class:

public class ThreadPoolExecutor extends AbstractExecutorService {...public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
    TimeUnit unit,BlockingQueue<Runnable> workQueue);
    	public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
        TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
        TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
        TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,
        RejectedExecutionHandler handler); . }Copy the code

As you can see from the code above, ThreadPoolExecutor inherits the AbstractExecutorService class and provides four constructors, all of which are initialized by calling the fourth constructor.

Here’s what the constructor parameters mean:

  • CorePoolSize: core thread pool size. By default, there are no threads in the thread pool. Only when the task comes does it go back and create a thread to execute the task, when the number of threads in the thread pool reachescorePoolSize“, the subsequent submitted tasks are cached in the queue.
  • MaximumPoolSize: specifies the maximum number of threads that can be created in a thread pool.
  • KeepAliveTime: The maximum length of time a thread can live without a task. By default, only if the number of threads in the thread pool is greater thancorePoolSizeWhen,keepAliveTimeBefore it works.
  • The unit:keepAliveTimeUnit of time. As follows:
TimeUnit.DAYS;               / / day
TimeUnit.HOURS;             / / hour
TimeUnit.MINUTES;           / / minute
TimeUnit.SECONDS;           / / SEC.
TimeUnit.MILLISECONDS;      / / ms
TimeUnit.MICROSECONDS;      / / subtle
TimeUnit.NANOSECONDS;       / / nanoseconds
Copy the code
  • WorkQueue: A blocking queue used to cache tasks waiting to be executed. Commonly used as follows:
ArrayBlockingQueue; 
LinkedBlockingQueue;
SynchronousQueue;
Copy the code
  • ThreadFactory: a threadFactory used to create threads.
  • Handler: There are four types of rejection policies:
ThreadPoolExecutor.AbortPolicy; / / discard task and throw RejectedExecutionException anomalies. ThreadPoolExecutor.DiscardPolicy; // Discard the task, but do not throw an exception. ThreadPoolExecutor.DiscardOldestPolicy; / / discard queue in front of the task, and then to try to perform a task (repeat) ThreadPoolExecutor. CallerRunsPolicy; // This task is handled by the calling threadCopy the code

Let’s take a look at how ThreadPoolExecutor works. First, take a look at the ThreadPoolExecutor class diagram to understand the inheritance relationship:

  1. Executors: Top-level interface, which is used to decouple task submission from task execution. Users only need to provide this interfaceRunnaleObject to submit the execution logic of the task toExecutorsbyExecutorsComplete assignment and execution of tasks.
  2. ExecutorService: provides a way to manage the life cycle, returnFutureObject, and can trace the return of one or more asynchronous task executionsFutureMethods. Add ways to manage thread pools, for exampleshutdownSmooth closingExecutorService.
  3. AbstractExecutorServiceAbstract class, implementedExecutorServiceInterface method, will perform the task flow implementation, the lower level only need to perform the task on the line.
  4. ThreadPoolExecutor: Core implementation class, mainly used to manage threads and perform tasks.

Since ThreadPoolExecutor is the core, how does ThreadPoolExecutor work? As shown below:

A production-consumer model:

Producers are used to submit tasks and go through different processes according to the corresponding rules. Consumer is mainly used for thread management, allocation of threads, task execution to continue to obtain new task execution, when the task is not finally obtained, the thread will be recycled according to the corresponding rules.

Source analysis

The UML diagram above shows the structure of ThreadPoolExecutor.

ThreadPoolExecutor’s top-level superclass is the Executor interface and contains only one method, execute, which executes the submitted task object.

public interface Executor {
   execute(Runnable command);
}
Copy the code

Executor#execute is implemented in the ThreadPoolExecutor class:

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); . }Copy the code

See CTL variable, also do not know what to use, the code to find her definition:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
Copy the code

This is a field that controls the running state of the thread pool and the effective number of threads in the thread pool.

  • WorkerCount: indicates the number of valid threads.
  • RunState: state of the current thread pool.

Thread pool running state description and runtime state transition process:

RUNNING: Accept new tasks and handle queuing tasks. SHUTDOWN: No new tasks are accepted, but queued tasks can continue to be processed. STOP: STOP accepting new tasks, STOP processing blocked tasks, and STOP tasks that are already in progress. TIDYING: All tasks are terminated, and the number of valid threads is 0. Run the terminated() hook method. TERMINATED: The state is TERMINATED ().Copy the code

Lifecycle state transitions described by JDK annotations:

* RUNNING -> SHUTDOWN
*    On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
*    On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
*    When both queue and pool are empty
* STOP -> TIDYING
*    When pool is empty
* TIDYING -> TERMINATED
*    When the terminated() hook method has completed
Copy the code

The thread pool lifecycle flowchart is as follows:

CTL is an AtomicInteger with 32 bits, the state of the thread pool needs 3 bits to represent, and the workerCount has 29 bits, so the code specifies that the number of valid threads in the thread pool is (2^29) -1.

private static final int COUNT_BITS = Integer.SIZE - 3; Private static final int CAPACITY = (1 << COUNT_BITS) - 1; private static final int CAPACITY = (1 << COUNT_BITS) - 1; Private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) {return c & CAPACITY; } / / calculate the number of threads private static int ctlOf (int the rs, int wc) {return rs | wc. }// State plus number of threads generates CTLCopy the code

Back to the ThreadPoolExecutor#execute method again:

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); If the number of threads currently running is smaller than the number of core threads, create a new thread to execute the task, otherwise put the task into the task queue. If (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)); c = ctl.get(); } //2. The current core thread is all running, put the thread into the task queue. If (isRunning(c) &&workqueue.offer (command)) {// whether the thread isRunning and whether the task is inserted successfully int recheck = ctl.get(); if (! IsRunning (recheck) && remove(command)) reject(command); Else if (workerCountOf(recheck) == 0) addWorker(null, false); } //3. If the queue fails to be inserted and the current number of threads is smaller than the maximum number of threads in the pool, create a new thread to execute the task. else if (! addWorker(command, false)) reject(command); // The task refuses to throw an exception}Copy the code

Execute Performs task scheduling and checks the running status, number of running threads, and running policies of the thread pool. Whether to apply directly to the thread for execution or queue the task or reject the task directly.

  1. First check if the thread pool is inRUNNINGStatus, if not outright rejection.
  2. If workerCount < corePoolSize and the number of valid threads is less than the number of core threads, a thread is created and started to execute the newly submitted task.
  3. If workerCount >= corePoolSize, the number of valid threads is greater than or equal to the number of core threads, and the task queue is not full, the task is placed in the task queue.
  4. If workerCount >= corePoolSize && workerCount < maximumPoolSize, the number of valid threads is greater than or equal to the number of core threads, the number of valid threads is less than the maximum number of threads, and the task queue is full, A thread is directly started to execute the newly submitted task.
  5. If workerCount >= maximumPoolSize, the number of valid threads is greater than or equal to the maximum number of threads, and the valid thread task queue is full, the newly submitted task is processed according to the reject policy.

The general flow chart is as follows:

The first step in the execute method is to determine whether there are any idle threads in the current core thread count. If there are any idle threads in the current core thread count, the addWorker method is used to create a thread to execute the task.

private boolean addWorker(Runnable firstTask, boolean core) { //1. Cyclic CAS operation, the number of threads in the thread pool +1; retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //core equals true to add threads to the core thread pool; False: Add to the maximum thread pool. If the number of threads exceeds the threshold, the system returns directly. if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // Modify the CAS CTL +1 to exit successfully. if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // re-read CTL // If the state of the thread pool changes, retry to the outer loop. if (runStateOf(c) ! = rs) continue retry; // else CAS failed due to workerCount change; Retry inner loop}} //2. Create a thread and add it to the thread pool. boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); // Wrap Thread as worker Thread final Thread t = w.htread; if (t ! = null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // Thread pool state // Thread pool state is running, Or thread pool closed and task thread is empty if (rs < SHUTDOWN | | (rs = = SHUTDOWN && firstTask = = null)) {if (t.i sAlive ()) / / thread is in a state to obtain throw new IllegalThreadStateException(); // Add the new thread to the thread pool worker.add (w); int s = workers.size(); If (s > largestPoolSize) largestPoolSize = s; workerAdded = true; }} finally {mainlock. unlock(); If (workerAdded) {t.start(); // This start actually executes the run method in the worker. workerStarted = true; }}} finally {// If the worker thread is not successfully created if (! workerStarted) addWorkerFailed(w); // After a failure, the worker thread needs to be removed and the CTL restored. } return workerStarted; }Copy the code

After the Worker thread is successfully added to the Worker thread collection, start execution begins, where start is the run method in the Worker thread. Inherit AQS, with lock function, implement Runnable, with thread function:

Private final class Worker extends AbstractQueuedSynchronizer implements Runnable {/ / real running threads in a thread pool. final Thread thread; // Thread wrapped tasks. Runnable firstTask; // Record the number of tasks completed by the current thread. volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); RunWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() {runWorker(this); // Run the worker thread}}Copy the code

As you can see, the run method of the Worker class actually calls the Runworker method of ThreadPoolExecutor. Let’s take a look at the runworker source for ThreadPoolExecutor:

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task ! = null || (task = getTask()) ! = null) { w.lock(); // If the pool is stopping, make sure the current thread is interrupted, otherwise make sure it is not interrupted. if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt(); Try {// Hook beforeExecute(wt, task) before starting a task; Throwable thrown = null; Try {// Execute task task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally {// Hook afterExecute(task, thrown); }} finally {completedTasks++, unlock task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; ProcessWorkerExit (w, completedAbruptly); }}Copy the code

When the current task is not NULL or the task retrieved from the queue is not NULL, the worker thread keeps executing the task. When the transaction fails, the loop ends and the thread is reclaimed. The most important of the runWorker methods is the getTask() method, which continually fetches tasks from the blocking queue and gives them to the thread to execute. Let’s see how this works.

private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // if the thread pool is SHUTDOWN and the queue is empty, or if the thread pool is STOP or terminate, the valid thread pool number is -1. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // If allowCoreThreadTimeOut is true or the current number of valid threads is greater than the number of core threads, timeout reclamation is required. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // If the number of valid threads is greater than the maximum number of threads or timeout and no timeout collection is allowed and the number of valid threads is greater than 1 or the queue is empty, the number of valid threads is -1, and null is returned to reclaim the thread. if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try {// Call the poll of the blocking queue if idle collection is allowed, otherwise take until there is a redeeming task in the queue. Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // Go to task, return result task if (r! = null) return r; TimedOut = true; timedOut = true; } catch (InterruptedException retry) { timedOut = false; }}}Copy the code

AllowCoreThreadTimeOut is false. Threads are not destroyed even if they are idle. If true, it is destroyed if it is idle during keepAliveTime. If the thread allows idle wait without being destroyed timed == false, the workqueue.take task. If the blocking queue is empty, the current thread is suspended. When a task is added to the queue, the thread is woken up and the take method returns the task and executes it. If the thread is not allowed to idle endlessly timed == true, workqueue.poll task. If there is no work on the blocked queue within keepAliveTime, null is returned.

To this has finished the implementation of the specific, see the meng force must be combined with their own code to see how to get a look at the specific play.

Thread pool practices

In this section, we will take a look at how thread pools are used:

public class ThreadPoolTest { @Test public void test() { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000)); for (int i = 0; i < 20; I++) {threadPoolExecutor. Execute (new Runnable () {@ Override public void the run () {System. Out. Println (" the number of threads in thread pool: "+ threadPoolExecutor. GetPoolSize () +", the queue waiting for the number of task execution: "+ threadPoolExecutor. GetQueue (). The size () +", has been playing with other tasks the number: " + threadPoolExecutor.getCompletedTaskCount()); }}); } threadPoolExecutor.shutdown(); }}Copy the code

A very simple demo that can be implemented quickly using the thread pool provided by the JDK. Of course, Executors also provides some static methods to use, as follows:

Executors.newFixedThreadPool(2); // Create a thread pool with a fixed length. The maximum number of concurrent threads can be controlled. Executors.newCachedThreadPool(); // Create a cacheable thread pool. If the length of the pool exceeds the processing requirement, you can recycle idle threads, or create new threads if none are available. Executors.newScheduledThreadPool(1); // Create a thread pool of fixed length to support scheduled and periodic task execution. Executors.newSingleThreadExecutor(); // Create a single threaded thread pool that uses only one worker thread to execute tasks, ensuring that all tasks are executed in the specified order (FIFO, LIFO, priority). Executors.newWorkStealingPool(); // Create a work-steal thread pool using all available processors as the target parallelism level. ForkJoinPool is used as the underlying implementation.Copy the code

Ok, finished can realize a try yourself!!

Ok, finished can realize a try yourself!!

Ok, finished can realize a try yourself!!

Write in the last

JAVA itself provides apis that allow us to quickly implement multi-threaded development based on thread pools, but we have to be responsible for the code we write, and the Settings of each parameter and the choice of policy are absolutely dependent on the application scenario. However, it is not easy to choose different parameters and different strategies. Actual application or should be combined with specific scenarios to use, today is here, I hope this article can give you help. Like to add attention, continue to update the follow-up!!