Java thread pool source code analysis
Preface
This article analyzes only ThreadPoolExecutor in JDK 1.8, focusing on the execute method, the life cycle of the threads in the pool, and the entire path of a Worker from task submission to the end of its life.
Building a thread pool
Start with the constructor. Let’s go through all of the thread pool’s constructor parameters in turn:
- corePoolSize: the number of core threads
- maximumPoolSize: the maximum number of threads
- keepAliveTime: how long an idle thread stays alive waiting for work
- unit: the time unit of keepAliveTime
- workQueue: the waiting queue
- threadFactory: the thread factory
- handler: the rejection policy
We’ll see what these parameters do in turn in the source code later.
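For reference, here is a minimal sketch of a call to the seven-argument constructor, with one argument per parameter listed above. The concrete values (2, 4, 30 seconds, a queue of 10) and the class name PoolConstruction are arbitrary choices for illustration, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolConstruction {
    // Builds a pool with the full seven-argument constructor.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L,                                    // keepAliveTime
                TimeUnit.SECONDS,                       // unit
                new ArrayBlockingQueue<Runnable>(10),   // workQueue
                Executors.defaultThreadFactory(),       // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println(pool.getCorePoolSize() + " core / "
                + pool.getMaximumPoolSize() + " max");
        pool.shutdown();
    }
}
```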
execute
Now take a look at the code of execute. First, look at ctl:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
This atomic int is very powerful, using the low 29 bits to indicate the number of threads in the thread pool and the high 3 bits to indicate the running state of the thread pool:
- RUNNING: -1 << COUNT_BITS, i.e. the high three bits are 111. A thread pool in this state accepts new tasks and processes tasks in the blocking queue.
- SHUTDOWN: 0 << COUNT_BITS, i.e. the high three bits are 000. A thread pool in this state does not accept new tasks, but still processes tasks in the blocking queue.
- STOP: 1 << COUNT_BITS, i.e. the high three bits are 001. A thread pool in this state does not accept new tasks, does not process tasks in the blocking queue, and interrupts running tasks.
- TIDYING: 2 << COUNT_BITS, i.e. the high three bits are 010. All tasks have terminated and the worker count is zero; the pool moves to TIDYING and then runs the terminated() hook method.
- TERMINATED: 3 << COUNT_BITS, i.e. the high three bits are 011. The terminated() method has completed.
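The packing described above can be sketched in a few lines. The class CtlDemo is a hypothetical stand-alone copy written for illustration; the constant and helper names follow the ones used in this article (COUNT_BITS, ctlOf and so on), but the real ones are private to ThreadPoolExecutor.

```java
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;     // 29 low bits hold the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;  // mask with the low 29 bits set

    // Run states live in the high 3 bits.
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;
    static final int TIDYING = 2 << COUNT_BITS;
    static final int TERMINATED = 3 << COUNT_BITS;

    static int runStateOf(int c) { return c & ~CAPACITY; }    // keep only the high 3 bits
    static int workerCountOf(int c) { return c & CAPACITY; }  // keep only the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }      // pack state and count together

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(c) == RUNNING);
        System.out.println(workerCountOf(c));
        System.out.println(Integer.toBinaryString(RUNNING >>> COUNT_BITS));
    }
}
```

Because RUNNING is negative and the other states grow from 0 upward, the states are numerically ordered, which is why checks such as "state greater than or equal to SHUTDOWN" can be plain integer comparisons.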
workerCountOf(c) extracts the number of workers from ctl. If it is smaller than the number of core threads, an addWorker call is attempted. command is the task we want to run; the addWorker method is analyzed in detail below. If the worker is added successfully, execute returns; if not, c is refreshed from ctl.
If the number of workers is greater than or equal to the number of core threads, check from c (the value of ctl) whether the thread pool is still running and, if so, try to add the task to the queue. After the task has been queued, the state is checked a second time (a double check): if the pool is no longer running, a rollback is required (the rollback removes the task from the queue and handles it with the rejection policy). If the double check passes but the number of workers is 0, a worker is added.
The last branch is reached when the pool is no longer running or workQueue.offer(command) returns false (the queue is full). In this case, a direct attempt is made to add a worker. If this fails, the rejection policy is executed to handle the task.
As can be seen from the execute method, the process of execute is:
- If the number of workers has not reached the number of core threads, a new worker is added to perform the task.
- If the number of workers has reached the number of core threads, the task is placed in the blocking queue to wait for execution.
- If the blocking queue is also full, it simply tries to add a new worker to execute the task. If the attempt fails, the rejection policy is executed.
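The three branches can be observed from the outside with a deliberately tiny pool. This is a sketch under assumed settings (1 core thread, 2 maximum threads, a queue of capacity 1, the default AbortPolicy); ExecuteFlowDemo and its observe method are hypothetical names. Every task blocks on a latch so that the workers stay busy while the next task is submitted.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {
    // Returns {pool size after task 1, queue size after task 2,
    //          pool size after task 3, 1 if task 4 was rejected else 0}.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                  // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] seen = new int[4];
        try {
            pool.execute(blocker);                // below corePoolSize: a new worker runs it
            seen[0] = pool.getPoolSize();
            pool.execute(blocker);                // core worker busy: task goes to the queue
            seen[1] = pool.getQueue().size();
            pool.execute(blocker);                // queue full: an extra worker is added
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);            // queue full and pool at maximum: rejected
            } catch (RejectedExecutionException e) {
                seen[3] = 1;
            }
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
    }
}
```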
Worker
The Worker mentioned all the way through the section above is an inner class of the thread pool.
This class extends AQS (AbstractQueuedSynchronizer), which is also the parent class of various locks and makes concurrency control convenient. It also implements the run method: run simply passes the worker itself to the runWorker method.
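To make that shape concrete, here is a simplified sketch of a Worker-like class. MiniWorker is a hypothetical stand-in, not the JDK class: it extends AbstractQueuedSynchronizer to get a simple non-reentrant lock, implements Runnable, and hands itself to the thread it owns. Its run method only executes the first task, where the real class would call runWorker(this).

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class MiniWorker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;   // the thread that represents this worker
    Runnable firstTask;    // the task handed over at creation time, may be null

    MiniWorker(Runnable firstTask) {
        this.firstTask = firstTask;
        this.thread = new Thread(this);  // the worker itself is the thread's Runnable
    }

    @Override
    public void run() {
        // Stand-in for runWorker(this): only the first task is executed here.
        Runnable task = firstTask;
        firstTask = null;
        if (task != null) {
            task.run();
        }
    }

    // Lock state: 0 = unlocked, 1 = locked. The lock is not reentrant.
    @Override
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    void lock() { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock() { release(1); }
}
```

Calling thread.start() on such a worker makes the new thread invoke the worker's own run method, which is the link the rest of this article relies on.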
Having glanced at Worker, now look at the addWorker method.
This is just the first half. When adding a worker, first get c (the value of ctl). If the state is greater than or equal to SHUTDOWN, the worker is not added and false is returned. (The one exception: in the SHUTDOWN state, a worker without a first task may still be added while the queue is not empty, so that the queued tasks can be drained.)
If core is true, check whether the worker count is greater than or equal to corePoolSize; otherwise, check whether it is greater than or equal to maximumPoolSize. Return false if it is. Otherwise, increase the worker count through CAS. If the CAS succeeds, break out of the loop and start creating the new thread. A failed CAS indicates contention: c is read again and the process above is repeated.
The thread pool maintains a set of threads called core threads. If enough core threads have been created, we can create more threads, but not more than maximumPoolSize. (If the values are the same, there will only be corePoolSize threads.)
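The CAS retry described above can be sketched in isolation. WorkerCountCas is a hypothetical class that tracks only a count (the real code packs the count into ctl together with the run state); tryReserve plays the role of the first half of addWorker.

```java
import java.util.concurrent.atomic.AtomicInteger;

class WorkerCountCas {
    private final AtomicInteger count = new AtomicInteger();
    private final int corePoolSize;
    private final int maximumPoolSize;

    WorkerCountCas(int corePoolSize, int maximumPoolSize) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
    }

    // Tries to reserve a slot for one more worker; 'core' selects which bound applies.
    boolean tryReserve(boolean core) {
        for (;;) {
            int c = count.get();
            if (c >= (core ? corePoolSize : maximumPoolSize)) {
                return false;                    // bound reached: do not add a worker
            }
            if (count.compareAndSet(c, c + 1)) {
                return true;                     // slot reserved: go on and create the thread
            }
            // CAS lost a race with another thread: re-read the count and try again.
        }
    }

    int count() {
        return count.get();
    }
}
```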
Look at the second half:
First a Worker is created and, while holding a reentrant lock, put into workers (workers is a HashSet). The firstTask passed to the Worker constructor is the task we submitted through execute; we will explain below why it is called firstTask. Once the worker has been added to workers, the thread inside the worker is started. That thread represents the worker, so what it runs is the worker's run method which, as mentioned above, calls runWorker(this). Now let's look at the runWorker method in detail.
This is the heart of the entire thread pool:
- The worker releases its lock.
- The worker takes its task, firstTask. Why is it called the first task? Because a worker starts with one task when it is created; after that task has finished, the worker calls the getTask method to retrieve further tasks from the queue.
- Depending on your scenario, you can customize what happens before and after each task through the beforeExecute and afterExecute methods.
- If there are no tasks in the queue, getTask waits.
- Once getTask returns a task, the task's run method is executed. Note: this is run, not start! Always remember that it is the worker that is executing. The worker was already bound to a thread when it was created, so the tasks we take out run on the worker's thread. How that thread is created can be seen in the Worker class.
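The beforeExecute and afterExecute hooks mentioned above are protected methods of ThreadPoolExecutor and can be overridden in a subclass. The following sketch records the order in which the hooks and the task run; HookDemo, runOnce and the event strings are names made up for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookDemo {
    // Runs a single task and returns the recorded sequence of events.
    static List<String> runOnce() {
        List<String> events = new CopyOnWriteArrayList<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                super.beforeExecute(t, r);
                events.add("before");   // runs on the worker thread, before task.run()
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                events.add("after");    // runs on the worker thread, after task.run()
            }
        };
        pool.execute(() -> events.add("task"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```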
To summarize, the entire process from addWorker to runWorker:
- Check the thread pool state and the number of threads, then create a Worker and put it into workers.
- Start the worker. The worker executes the task assigned at creation time, and then blocks on the queue, taking further tasks to execute.
The worker’s life cycle
Finally, let’s talk about the worker’s life cycle, which is very important but neglected in most online blogs. Let’s take a closer look.
The creation process has already been described, so without further elaboration we look only at the end of the worker’s life cycle. Recall from above that the worker first executes the firstTask assigned to it and then takes tasks from the queue to execute. Let’s look at the code for this loop:
while (task != null || (task = getTask()) != null) { //.... }
Whether the while loop ends normally or is interrupted by an exception, execution enters the finally block, which calls the processWorkerExit method.
The meaning of its completedAbruptly parameter: if it is true, the worker ended because the user’s task ended abnormally.
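The relationship between the loop and the completedAbruptly flag can be sketched as follows. WorkerLoopSketch is a simplified stand-in, not the JDK code: getTask is replaced by a timed poll on the queue, and the exception is swallowed so that the flag can be returned to the caller, whereas in the real pool it propagates after processWorkerExit has run.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class WorkerLoopSketch {
    // Runs firstTask, then tasks from the queue, and returns the completedAbruptly flag.
    static boolean runLoop(Runnable firstTask, BlockingQueue<Runnable> queue,
                           long keepAliveMillis) {
        Runnable task = firstTask;
        boolean completedAbruptly = true;   // stays true unless the loop ends normally
        try {
            while (task != null
                    || (task = queue.poll(keepAliveMillis, TimeUnit.MILLISECONDS)) != null) {
                try {
                    task.run();
                } finally {
                    task = null;            // forces the next iteration to ask the queue
                }
            }
            completedAbruptly = false;      // no task arrived within the timeout
        } catch (RuntimeException | InterruptedException e) {
            // Sketch only: swallowed so the flag can be returned.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
        // The real pool would call processWorkerExit(w, completedAbruptly) at this point.
        return completedAbruptly;
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        queue.add(() -> System.out.println("queued task"));
        boolean abrupt = runLoop(() -> System.out.println("first task"), queue, 10);
        System.out.println("completedAbruptly = " + abrupt);
    }
}
```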
Look at the code to see what an exiting worker does:
- If the worker ended because of a problem in the user’s task, decrease the worker count.
- Under the global lock, add the number of tasks this worker executed to the thread pool’s completed task count. This is the equivalent of a performance review.
- Remove this worker from workers.
- Attempt to terminate the thread pool.
- If the thread pool state is less than STOP: if the worker ended because of a user problem, replenish one worker directly (this is the addWorker(null, false) call). If the worker did not end because of a user problem, compare the number of core threads with the worker count and add another worker only if necessary. Note: the worker added here is not assigned a firstTask, which means this supplementary Worker fetches its tasks directly from the blocking queue.
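The replacement of a worker whose task threw an exception can be observed through thread names. This is a sketch with a single-thread pool and a custom ThreadFactory; WorkerReplacementDemo, the "demo-worker-" prefix and the silent uncaught-exception handler are choices made for this example only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class WorkerReplacementDemo {
    // Returns the names of the threads that ran the failing task and the follow-up task.
    static String[] run() {
        AtomicInteger created = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "demo-worker-" + created.incrementAndGet());
            t.setUncaughtExceptionHandler((thread, e) -> { });  // keep stderr quiet
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
        AtomicReference<String> first = new AtomicReference<>();
        AtomicReference<String> second = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                first.set(Thread.currentThread().getName());
                throw new IllegalStateException("simulated user failure");
            });
            pool.execute(() -> {
                second.set(Thread.currentThread().getName());
                done.countDown();
            });
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return new String[] { first.get(), second.get() };
    }

    public static void main(String[] args) {
        String[] names = run();
        System.out.println(names[0] + " -> " + names[1]);
    }
}
```

The two names differ because the thread that ran the failing task exits and a new worker thread takes over the remaining work.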
At this point, we can summarize the end of a worker’s life cycle:
- The worker continuously fetches tasks from the blocking queue in a while loop. If no task can be obtained within keepAliveTime (the timeout applies to threads beyond corePoolSize, or to all threads when allowCoreThreadTimeOut is set), or a task is obtained but fails during execution, the worker exits the while loop and enters processWorkerExit.
- If a replacement worker is needed after this Worker has ended, one is added. Otherwise, the worker’s life is over and it waits to be garbage collected (after it has been removed from workers, nothing references this worker any more).
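The idle timeout can be watched from outside through getPoolSize. This is a rough sketch with assumed settings (1 core thread, 2 maximum threads, a 50 ms keepAliveTime and a SynchronousQueue so that the second task forces a second worker); KeepAliveDemo is a hypothetical name, and the polling loop with a 5 second deadline is only a crude way of waiting for the extra worker to exit.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns {pool size while both tasks run, pool size after the idle timeout}.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 50L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] seen = new int[2];
        try {
            pool.execute(blocker);          // core worker
            pool.execute(blocker);          // nobody is polling the queue: second worker
            seen[0] = pool.getPoolSize();
            release.countDown();            // both workers become idle
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                Thread.sleep(10);           // wait for the extra worker to time out
            }
            seen[1] = pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
    }
}
```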
Conclusion
At this point, the source code analysis of the entire thread pool is complete. We can see that the most important thing in the thread pool is the whole life cycle of workers, which is closely related to the state flow of the thread pool and has a very rigorous structure. One last homage to Doug Lea.
Blog.shiwuliang.com/2017/11/03/… (for other code, follow me on GitHub)