Introduction to THE JDK thread pool
The use of thread pools is almost a part of Java development, and even if you’ve never done multithreaded development, your Web containers (e.g., Tomcat, Jetty), RPC services (Dubbo) all use thread pools heavily to support concurrent execution. With the development of CPU hardware technology, multithreading has become one of the most critical language features and optimization points.
The most common postures for thread pools in Java look like this. (Java. Util. Concurrent. ThreadPoolExecutor)
public class Foo {
// Fixed size thread pool, queue when more than 20 tasks are submitted (queue size is unlimited, OOM warning ⚠️)
private static ExecutorService fixThreadPool = Executors.newFixThreadPool(20);
// Infinite size thread pool, it also receives 100 million tasks, OOM warning ⚠️
private static ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// Custom thread pool, thread number, queue and queue capacity controllable, ali Java specification manual required usage
private static ExecutorService customThreadPool = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
}
Copy the code
As the comments in the code examples show, different types of thread pools have different capabilities, with the following points to note:
- You can see that thread pools are initialized to static static variables, which means that thread pools areThe singleton. Some novice readers may be tempted to use thread pools in a demo that uses a noncanonical usage found on the web,Declare the creation of a thread pool in a local variable of the method, which doesn’t fit the purpose of thread pools (saving hard-won threads for reuse later), and can existThread overflowThe thread pool will not be collected by GC after the reference to the thread pool ends. I used this pit when I was a beginner.
public class foo {
// This method creates 2 threads each time it is called and does not recycle at the end of the method
public void theMethod(a) {
ExecutorService fixThreadPool = Executors.newFixThreadPool(2);
fixThreadPool.execute(() -> {});
fixThreadPool.execute(() -> {});
fixThreadPool.shutdown(); // If you must use a thread pool ina local variable, you must end it with shutdown, preferably try-finally}}Copy the code
- You can see whether
newFixThreadPool
ornewCachedThreadPool
There are OOM risks. Many thread pools used in our current projects do not have problems because of the low concurrency, so alibaba Java Development Manual [1] requires the use of a third custom thread poolThreadPoolExecutor
, requires the user to determine the parameters by evaluating the future concurrency scale before using the thread pool.
The serial number | The name of the | type | meaning |
---|---|---|---|
1 | corePoolSize | int | Core thread pool size |
2 | maximumPoolSize | int | Maximum thread pool size |
3 | keepAliveTime | long | Maximum idle time of a thread |
4 | unit | TimeUnit | Unit of time |
5 | workQueue | BlockingQueue | Thread wait queue |
6 | threadFactory | ThreadFactory | Thread creation factory |
7 | handler | RejectedExecutionHandler | Rejection policies |
More on how JDK thread pools are used and how they work with different parameters (corePoolSize, maximumPoolSize, etc.) have been covered in many blogs and won’t be covered in this article. This paper focuses on the implementation principle of these thread pool functions from the perspective of source code.
ThreadPoolExecutor source
Both Executors newFixThreadPool, or Executors newCachedThreadPool, are actually created through different initialization parameter calls ThreadPoolExecutor, To analyze the source code of ThreadPoolExecutor, you first need to understand the data structure of ThreadPoolExecutor. The core member variables of ThreadPoolExecutor are shown below.
CTL — runState and wokerCount in one
ThreadPoolExecutor has a core field called CTL, which is the JDK’s AtomicInteger for atomic operations (based on the CAS optimistic lock that guarantees atomicity for operations such as i++).
Looking at the comments and source code, we can see that the 32-bit AtomicInteger is actually a combination of runState (thread pool state) and workerCount (thread pool number), each occupying different bits of a 32-bit number, as shown below.
runState
Thread pool status runState Each state is described as follows. You can see that the value of runState increases with the lifetime of the thread pool.
state | The values | English description | English description |
---|---|---|---|
RUNNING | Minus one, binary 101 | Accept new tasks and process queued tasks | In the operation of the |
SHUTDOWN | 0, binary 000 | Don’t accept new tasks, but process queued tasks | Stop receiving new tasks and process tasks in the queue |
STOP | 1, binary 001 | Don’t accept new tasks, don’t process queued tasks, and interrupt in-progress tasks | Stop receiving new tasks, stop processing tasks in the queue, and interrupt ongoing tasks |
TIDYING | Two, binary 010 | All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method | Clean up. All tasks are terminated, workerCount==0, and the thread turned to TIDYING will execute the terminated() hook method |
TERMINATED | 3, binary 011 | terminated() has completed | Terminated () method callback is complete |
The conversion relationship between each state is shown in the following figure.
The final transformation steps of the whole state machine can be summarized as follows: RUNNABLE is the normal running state of the thread pool, which is moved to a different state by gentle shutdown or violent shutdownNow, and then to TIDYING state after resource release. TIDYING state terminated by calling the reserved terminated() method terminated.
workerCount
There is no more authoritative explanation of workerCount than Doug Lea’s comments in the source code.
The workerCount is the number of workers that have been permitted to start and not permitted to stop. The value may be transiently different from the actual number of live threads, for example when a ThreadFactory fails to create a thread when asked, and when exiting threads are still performing bookkeeping before terminating. The user-visible pool size is reported as the current size of the workers set.
WorkerCount counts the number of threads in workers that are allowed to start and not allowed to stop (that is, workers in the thread pool that can be used to assign new tasks). This value can be temporarily inconsistent with the actual number of threads alive, such as when ThreadFactory fails to create a thread and the exiting thread is registered with the thread. The thread pool size visible to the user is reported as the current size of the worker collection (that is, the getPoolSize() method returns worker.size ()).
Why two in one?
In cases where both runState and workerCount need to be determined, the performance penalty of locking is avoided.
The reason why we need to determine runState and workerCount at the same time is because we need to atomically check runState and workerCount in addWorker. Prevent in should not add the thread pool thread state add threads (details in below ThreadPoolExecutor. Worker relevant section will in detail), not the kui is Doug Lea patriarch.
ThreadPoolExecutor# the execute () method
The execute method itself is not complicated, so this article posts the source code directly with annotations
public void execute(Runnable command) {
if (command == null) // 0. Null pointer exception
throw new NullPointerException();
int c = ctl.get();
if (addWorker(command, true)) It is possible that core is full at this point, or that runState is not RUNNABLE
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 2. The current thread exceeds coreSize and is enqueued
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) For concurrency, if the thread pool stops, remove it from the queue
reject(command);
else if (workerCountOf(recheck) == 0) // Because existing ones died since last checking
addWorker(null.false);
}
else if(! addWorker(command,false)) // 3. If the queue fails, create a thread again. If the queue fails, shutdown or the thread pool is full
reject(command);
}
Copy the code
The entire execution flow of ThreadpoolOr #execute() can be summarized as follows
You can see that when the number of threads is less than coreSize, all thread pools behave as creating new threads for execution. When the number of threads exceeds coreSize, the thread pool is enqueued, which has different behavior for different thread pools. For example, synchronousQueue. offer will fail and addWorker will be tried again. LinkedBlockingQueue. Will offer into the queue, queue capacity if it is infinite, under normal circumstances would never go to try addWorder that step, the reader can understand).
ThreadPoolExecutor.Worker
We can generalize addWorder to create a new thread for the thread pool. Here we analyze the execution process of addWorker(). The core simplified version of the pseudocode for the addWorker() method can be summarized as follows
def addWorker(firstTask, core) :
while True:
if runState > SHUTDOWN # thread pool stopped, cannot add worker
return False
if runState == SHUTDOWN and workQueue.isEmpty()): During the SHUTDOWN phase, the messages in the queue are consumed and the worker cannot be added
return False
if not cas_increment_worker_count(core) WorkerCount++ via cas+ loop
return False
worker = new Worker(firstTask)
workers.add(worker) # add worker
worker.start() # start the worker thread
def cas_increment_worker_count(core) :
while True:
if workCounter_too_mach(core): Determine the different workerCount upper limit depending on whether it is core expansion mode
return False;
if cas_increment(workerCount) # CAS way workerCount++
return True;
Select * from workerCount (workerCount, runState); select * from workerCount (workerCount, runState)
Copy the code
Based on the above understanding, we know the significance of CTL field 2-in-1, which is to ensure that a CAS operation judges runStatue and workerCount unchanged at the same time when worker is expanding, otherwise the only way to keep the two fields unchanged is to lock them. In that case, the execute method of the thread pool will execute much slower, especially in many services that rely heavily on the thread pool (web containers such as Tomcat, Dubbo).
In addition, we can see that after the workerCount is added through CAS, the workerCount will be started by executing the start() method as a thread. The structure and workflow of Worker are shown below.
From the figure above, we can see that Worker has three attributes
- Thread, worker itself implements the Runnable interface. Thread records the thread carrying the worker when it executes
- FirstTask, the thread pool is a thread pool because the threads in it are reusable. The task executed by the worker when it is created for the first time will be collected from the queue after it is finished
- CompletedTasks, statistical, actually unused, records how many things the Worker has done
The wonderful metaphor of dragon and Dragon:
Every Worker is just like a Worker. On the first day on the job, he or she will bring similar thingsNew reportThe firstTask is arranged with its own id and thread, and then starts to get jobs. The process starts again and again, until the company downsizes and goes bankrupt and leaves.
Friends who understandTears eyeType it on the public screen
def runWorker(worker) :
# thread refers to the current thread
task = worker.firstTask
whiletask ! = nullor(task = getTask()) ! = null:if runState >= STOP and thread.isNotInterrupted(): If the thread pool has been terminated and the current thread has not been interrupted, interrupt it.
thread.interrupt() # interrupt the current thread
beforeExecute(thread, task) Empty drop, reserved drop ~
task.run() # Execute task.run() on the current thread, which is the runnable we send to the thread pool
afterExecute(task, exception) Empty drop, reserved drop ~
worker.completedTasks++ Employee performance +1
# Because of exceptions or layoffs, the worker's endless cycle has ended and it's time to quit
processWorkerExit()
def processWorkerExit() :
workers.remove(worker)
if STOP > runState and need_revival:
addWorker() # If the thread needs to be expanded after it finishes, then you can resurrect it!
The thread pool has stopped
Copy the code
The judgment of need_revival is complicated and not important, which is ignored by the author here. In short, it is to determine whether the current exit is abnormal and whether the size of the thread pool still allows the restart of a new thread after the exit. (After the sudden death of an employee, the enterprise decides whether to recruit a new employee to fill the hole according to whether the current performance still needs manpower, tears)
Callable, FutureTask, and executor.submit()
executor.submit()
ThreadPoolExecutor has a execute(runnable) method and a Submit (Callable) method to retrieve the result of a task. Essentially, it wraps the callable object that we pass in. Waiters waiters hang all get() waiters while run() runs, and wake up all pending threads when the work is complete. ThreadPoolExecutor. Submit (callable) code is very simple, here Posting source code directly
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task); // callable ==> RunnableFuture (the actual implementation is FutureTask)
execute(ftask); // Execute after encapsulation
return ftask; // Submit returns a Future that is actually a FutureTask object
}
Copy the code
Callable and FutureTask
The essence of callable is to invoke execute and return a FutureTask to the user, who waits for the result of the response by executing its get() method. So the secret of Callable is that at the encapsulated newTaskFor layer, the implementation class returned by newTaskFor is actually FutureTask, whose inheritance is shown below.
Submit method returns a Future interface to the user, FutureTask as its implementation class, and inherits and implements Runnable and Future interfaces. The encapsulation of Callable is as follows.
The relationship between FutureTask execution and consumer GET () is shown below
The entire life cycle state machine of FutureTask during execution is shown
Locks in the JDK thread pool
ThreadPoolExecutor#addWorker
ThreadPoolExecutor has a HashSet
workers container to maintain workers in the thread pool. HashSet operations need to be thread-safe through ReentrantLock.
The Worker and AbstractQueuedSynchronizer
AbstractQueuedSynchronizer, hereinafter referred to as “AQS, is each sync related functions in the JDK source code already, Semaphore and CountDownLatch depend on the synchronizer. AQS is a Synchronizer that encapsulates the locking and unlocking logic of the JDK into a Synchronizer. For example, ReentranceLock lock = number of times the thread re-enters the lock +1, Semaphore lock = number of remaining permissions -1. At the same time, AQS assigns the actual execution logic of the actual tryAcquire and tryRelease methods to subclasses.
Worker inherits AQS and implements the methods left to subclasses. Back to the implementation of runWorker method above, we know that this is a process in which Worker workers continuously receive and process work.
This part of logic is the part that needs to maintain atomicity. After receiving a job, workers can only deal with this part of the job with undivided attention and cannot be disturbed by other interference in the execution process. Therefore, the actual Worker’s execution code is locked by inheriting AQS. The locking method is CAS optimistic locking.
As you can imagine, task.run() is the thread we submit via executor.execute()/submit(). If a Worker executes two task.run() tasks simultaneously without locking them, Then their ThreadLocal would be fully reused, what a terrible problem it would be, how much money the programmer would lose…
Thread pools and ThreadLocal
By analyzing the runWorker method, we can also see that for a thread pool with multiple threads, a Worker, i.e. a thread, will continue to process the tasks we submit via executor.execute(). Therefore, from the point of view of the thread pool user, the submitted thread ends, but the Worker does not. If we assume that the thread has finished and finished, and we do not reset the ThreadLocal before the thread completes, then the next thread will inherit the previous thread’s ThreadLocal. There may be unimaginable bugs caused by data errors (programmer warning ⚠️!!)
For more information on ThreadLocal, please visit our blog [3] for a graphic analysis of the principles and application scenarios of ThreadLocal
Summary and review
Having delved into the ThreadPoolExecutor source code for so long, let’s go back to the beginning and remember your awkward face
From the user’s point of view, We created a ThreadPoolExecutor (Executors newFixThreadPool () and Executors. NewCachedThreadPool () nature ThreadPoolExecutor), Executor.execute is then executed in the code (the submit method itself also calls execute)
The executor.execute() method first determines whether to call addWorker or queue based on the number of core threads and the state of the queue. Among them:
- AddWorker essentially starts the Worker thread
- Queued tasks will be fished out and processed in Worker execution
Now that we’ve thoroughly dissected the logic behind JDK thread pools, readers, if you were asked to implement a thread pool empty-handed, would you be able to do it?
refercences
- ^ Alibaba Java Development Manual
- ^ openjdk/jdk: JDK main-line development
- ^ Graphic analysis of ThreadLocal principles and application scenarios
- 14. ^ “Java Concurrent Programming in action” – Brian Goetz/Tim Peierls/Joshua Bloch/Joseph Bowbeer/David Holmes/Doug Lea