Hello! Today I'm going to walk you through the Java thread pool. Get your notebook ready and take notes.
What is a thread pool?
Creating and destroying threads over and over is expensive, so a thread pool keeps a set of threads alive and reuses them. Threads that have nothing to do sit idle in the pool. When a task arrives, a thread is taken from the pool to run it, and when the task finishes the thread is not destroyed but put back into the pool for the next task.
Thread pools solve two main problems:
First, a thread pool gives better performance when a large number of asynchronous tasks have to be executed.
Second, a thread pool provides a way to limit and manage resources, for example by capping the number of threads and adding new threads dynamically.
On the first point: as mentioned above, frequently creating and destroying threads is costly, whereas the threads in a pool are reused. The pool uses a blocking queue to hold the Runnable objects that are waiting to run.
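To make the reuse concrete, here is a minimal sketch. The class and method names (ReuseDemo, threadsUsed) are made up for illustration; only the java.util.concurrent calls are real. It runs many small tasks on a small fixed pool and records which threads executed them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ReuseDemo {
    /** Runs `tasks` tiny jobs on a pool of `threads` workers and returns the names of the threads used. */
    static Set<String> threadsUsed(int threads, int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // stop accepting tasks, let the submitted ones finish
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        // 20 tasks are submitted, but at most 2 distinct thread names can appear
        System.out.println(threadsUsed(2, 20));
    }
}
```

However many tasks are submitted, the set never holds more names than the pool has threads, which is the reuse described above.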
Principle analysis
The JDK provides a set of tools for creating and managing thread pools. The Executors factory methods below each return a differently configured pool:
- newFixedThreadPool: Returns a thread pool of fixed size; the number of threads in the pool stays constant.
- newCachedThreadPool: Returns a thread pool that adjusts the number of threads as needed; an idle thread is kept for 60 seconds before it is removed.
- newSingleThreadExecutor: Returns a thread pool with only one thread.
- newSingleThreadScheduledExecutor: Returns a ScheduledExecutorService backed by a pool of size 1. The ScheduledExecutorService interface extends ExecutorService and adds the ability to run a task at a given time.
- newScheduledThreadPool: Returns a ScheduledExecutorService for which you specify the number of threads in the pool.
Internally, these factory methods (newSingleThreadExecutor, for example) create a ThreadPoolExecutor, so the rest of this article focuses on that class.
The ThreadPoolExecutor constructor takes corePoolSize, maximumPoolSize, keepAliveTime, a time unit, a workQueue, a threadFactory, and a rejection handler.
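The relationship between the factory methods and ThreadPoolExecutor can be inspected at runtime. The sketch below (FactoryDemo and describe are hypothetical names) relies on an implementation detail of current OpenJDK builds, namely that newFixedThreadPool and newCachedThreadPool return a plain ThreadPoolExecutor; the Javadoc does not promise this, so treat it as an assumption.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class FactoryDemo {
    /** Describes a pool as "core/max/keepAliveSeconds", or "wrapped" if it is not a ThreadPoolExecutor. */
    static String describe(ExecutorService pool) {
        try {
            if (pool instanceof ThreadPoolExecutor) {
                ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
                return tpe.getCorePoolSize() + "/" + tpe.getMaximumPoolSize()
                        + "/" + tpe.getKeepAliveTime(TimeUnit.SECONDS);
            }
            return "wrapped"; // e.g. a delegating wrapper that hides the configuration methods
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("fixed(4):  " + describe(Executors.newFixedThreadPool(4)));
        System.out.println("cached:    " + describe(Executors.newCachedThreadPool()));
        System.out.println("single:    " + describe(Executors.newSingleThreadExecutor()));
        System.out.println("scheduled: " + describe(Executors.newScheduledThreadPool(2)));
    }
}
```

The fixed pool reports equal core and maximum sizes, while the cached pool reports a core size of 0 and a 60 second keep-alive, matching the descriptions in the list.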
workQueue is a BlockingQueue that stores the submitted Runnable objects. Several kinds of workQueue are commonly used:
- Direct handoff queue: SynchronousQueue is a queue with no capacity. When the thread pool calls offer on it, there is nowhere to store the task, so offer returns false unless a thread is already waiting to take one. The task is not saved but handed straight to a thread for execution: an idle thread if one is waiting, otherwise a newly created one. If no thread is free and maximumPoolSize has been reached, the rejection policy is executed.
- Bounded task queue: You can use ArrayBlockingQueue, which is backed by an array, so a capacity must be specified when the queue is created. With a bounded queue, a submitted task gets a new thread while the number of threads in the pool is below corePoolSize. Once the thread count has reached corePoolSize, submitted tasks are queued. When the queue is full and the thread count is still below maximumPoolSize, a new thread is created to run the task. If maximumPoolSize has also been reached, the rejection policy is executed.
- Unbounded task queue: You can use LinkedBlockingQueue, which is backed by a linked list. Its default capacity is Integer.MAX_VALUE. You can also specify a capacity, in which case put blocks when the queue is full, while offer returns false when the queue is full and true when the element was added. With a LinkedBlockingQueue, a submitted task gets a new thread while the thread count is below corePoolSize. Once the thread count has reached corePoolSize, submitted tasks go into the queue and are consumed as threads finish their current tasks. If new tasks keep arriving and no thread is idle, they keep piling up in the queue until resources are exhausted.
- Priority task queue: A priority task queue runs tasks according to their priority. You can use PriorityBlockingQueue, an unbounded queue that orders tasks by their own priority rather than by arrival order, so the execution order can be controlled while performance is preserved.
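The difference between these queues shows up directly in what a non-blocking offer returns. The following sketch (QueueDemo and its helpers are illustrative names) offers elements from a single thread with no consumer present.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

final class QueueDemo {
    /** Offers the integers 0..n-1 without blocking and returns how many the queue accepted. */
    static int accepted(BlockingQueue<Integer> queue, int n) {
        int count = 0;
        for (int i = 0; i < n; i++) {
            if (queue.offer(i)) {
                count++;
            }
        }
        return count;
    }

    /** Inserts 3, 1, 2 into a priority queue and returns the element that comes out first. */
    static int firstByPriority() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.offer(3);
        queue.offer(1);
        queue.offer(2);
        return queue.poll(); // smallest element first (natural ordering)
    }

    public static void main(String[] args) {
        System.out.println("SynchronousQueue accepted:      " + accepted(new SynchronousQueue<>(), 3));
        System.out.println("ArrayBlockingQueue(2) accepted: " + accepted(new ArrayBlockingQueue<>(2), 3));
        System.out.println("LinkedBlockingQueue accepted:   " + accepted(new LinkedBlockingQueue<>(), 3));
        System.out.println("Default linked capacity:        " + new LinkedBlockingQueue<Integer>().remainingCapacity());
        System.out.println("Priority queue head:            " + firstByPriority());
    }
}
```

With no thread waiting to take, the SynchronousQueue accepts nothing, the bounded queue stops at its capacity, and the unbounded queue accepts everything.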
Now let's create a thread pool directly with the ThreadPoolExecutor constructor.
In this example corePoolSize is 5, maximumPoolSize is 10, the work queue is an ArrayBlockingQueue, and the keep-alive time is 60 seconds. A ThreadFactory is passed in as well; it is used to give each thread a name when the thread is created.
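A configuration like the one just described might be written as follows. This is a sketch, not the article's original listing: the queue capacity of 5 is inferred from the later walkthrough, and the class name, the "demo-pool" prefix, and the 50 ms sleep are assumptions.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ExamplePool {
    /** Hypothetical factory that names threads "<prefix>-1", "<prefix>-2", ... */
    static ThreadFactory named(String prefix) {
        AtomicInteger seq = new AtomicInteger(1);
        return r -> new Thread(r, prefix + "-" + seq.getAndIncrement());
    }

    /** corePoolSize 5, maximumPoolSize 10, 60 s keep-alive, bounded queue of 5 (assumed capacity). */
    static ThreadPoolExecutor create() {
        return new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5), named("demo-pool"));
    }

    /** Submits 15 short sleeping tasks and returns the names of the threads that ran them. */
    static Set<String> run15() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ThreadPoolExecutor pool = create();
        for (int i = 0; i < 15; i++) {
            pool.execute(() -> {
                names.add(Thread.currentThread().getName());
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(run15());
    }
}
```

Fifteen tasks fit exactly into ten threads plus five queue slots, so none of them is rejected.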
With a thread factory such as DefaultThreadFactory, the pool name becomes part of each thread's name when newThread creates the thread. The main method submits 15 tasks by calling the execute method. Before analyzing the execute method, let's take a look at how the thread pool state is represented:
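The following standalone sketch mirrors the bit layout that ThreadPoolExecutor uses for its ctl field. The constant and method names follow the JDK 8 source, but CtlSketch itself is an illustration, not the JDK class.

```java
final class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 bits for the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;   // mask with the low 29 bits set

    // Run states live in the high 3 bits.
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;
    static final int TIDYING = 2 << COUNT_BITS;
    static final int TERMINATED = 3 << COUNT_BITS;

    /** Packs a run state and a worker count into one int. */
    static int ctlOf(int runState, int workerCount) {
        return runState | workerCount;
    }

    /** Extracts the run state (high 3 bits). */
    static int runStateOf(int ctl) {
        return ctl & ~CAPACITY;
    }

    /** Extracts the worker count (low 29 bits). */
    static int workerCountOf(int ctl) {
        return ctl & CAPACITY;
    }

    /** RUNNING is the only negative state, so a simple comparison is enough. */
    static boolean isRunning(int ctl) {
        return ctl < SHUTDOWN;
    }

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 7);
        System.out.println(Integer.toBinaryString(RUNNING));
        System.out.println(workerCountOf(ctl) + " workers, running=" + isRunning(ctl));
    }
}
```

Because the states are ordered numerically (RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED), checks such as "at least SHUTDOWN" reduce to integer comparisons.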
The ctl variable stores both the state of the thread pool and the number of threads in a single value. Its default state is RUNNING, which is 11100000000000000000000000000000 in binary. Assuming an Integer is 32 bits on this machine, the high 3 bits hold the thread pool state and the low 29 bits hold the number of threads. Each thread pool state constant is a small number shifted left past those low 29 bits. In addition to these variables, the class also provides methods to manipulate the thread pool state:
Let's look at the execute method of ThreadPoolExecutor:
The following points are summarized by analyzing the execute method:
- When the number of threads in the pool is less than corePoolSize, a new thread is added to the pool directly and the submitted task is executed as that thread's first task.
- Otherwise, if the thread pool state is RUNNING, the task is accepted and put into the blocking queue. The state is then checked a second time, because it may have changed while the task was being queued. If the pool is no longer RUNNING, the task is removed from the queue and the rejection policy is executed.
- If the blocking queue is full, or it is a SynchronousQueue with no thread waiting to take a task, a new thread is added to execute the task. If that would push the number of threads past maximumPoolSize, the rejection policy is executed.
- The offer method is used to join the queue. It does not block: it returns false if the queue is full (the timed variant also returns false on timeout) and true if the task was queued successfully.
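The decision order summarized in the list can be modelled as a small pure function. This is a simplified model with hypothetical names (ExecuteModel, Action, decide); it assumes the pool is RUNNING and leaves out the second state check that the real execute method performs after queuing.

```java
final class ExecuteModel {
    enum Action { NEW_CORE_THREAD, ENQUEUE, NEW_EXTRA_THREAD, REJECT }

    /**
     * Decides what happens to a newly submitted task.
     * queueAccepts stands for the result of workQueue.offer(task).
     */
    static Action decide(int workerCount, int corePoolSize, int maximumPoolSize, boolean queueAccepts) {
        if (workerCount < corePoolSize) {
            return Action.NEW_CORE_THREAD;   // task becomes the new thread's first task
        }
        if (queueAccepts) {
            return Action.ENQUEUE;           // an existing thread will pick it up later
        }
        if (workerCount < maximumPoolSize) {
            return Action.NEW_EXTRA_THREAD;  // queue full (or no waiting taker): grow the pool
        }
        return Action.REJECT;                // pool and queue are both saturated
    }

    public static void main(String[] args) {
        System.out.println(decide(3, 5, 10, true));
        System.out.println(decide(5, 5, 10, true));
        System.out.println(decide(5, 5, 10, false));
        System.out.println(decide(10, 5, 10, false));
    }
}
```

Note that the queue is tried before the pool grows beyond corePoolSize, which is why an unbounded queue never lets the pool reach maximumPoolSize.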
In the example, the ArrayBlockingQueue holds 5 tasks and the maximum number of threads (including the core threads) is 10. Assume that all tasks are submitted to the thread pool at the same time. The first 5 tasks are each handed to a new thread as its first task, and the next 5 tasks are added to the blocking queue. When the remaining 5 tasks are submitted, the blocking queue is found to be full, so the pool tries to add a thread directly. The current number of threads is 5, which is less than the maximum, so new threads can be created to execute those tasks.
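This 5 + 5 + 5 split can be observed deterministically if the tasks block on a latch instead of sleeping, so that none of them can finish while the snapshot is taken. The sketch below uses made-up names (SaturationDemo, snapshot) and assumes the queue capacity of 5 from the walkthrough.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SaturationDemo {
    /** Submits 15 blocked tasks to a 5/10 pool with a 5-slot queue; returns {poolSize, queuedTasks}. */
    static int[] snapshot() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5));
        try {
            for (int i = 0; i < 15; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the thread until the snapshot has been taken
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let every task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] s = snapshot();
        System.out.println("threads=" + s[0] + ", queued=" + s[1]);
    }
}
```

A sixteenth task submitted at that moment would find both the queue and the pool full and would be handed to the rejection policy.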
Here we are only assuming that all the tasks have been submitted before any of them finishes. Because each task calls Thread.sleep for a while, the tasks are unlikely to complete before the for loop ends, so it is reasonable to assume that all tasks are submitted and none has completed. If some tasks did complete early, the maximum number of threads might never be reached: a thread that finishes a task takes the next one from the queue, which frees a slot so that another submitted task can be queued. In the figure above you can see five core threads running tasks, five tasks in the task queue waiting for a free thread, and five non-core threads running tasks.
Core threads are the threads within corePoolSize, and non-core threads are the threads beyond corePoolSize but not beyond maximumPoolSize. Non-core threads do not stay alive forever; they are destroyed according to the keep-alive time configured for the pool. Here we specified that a thread is destroyed if no task arrives for 60 seconds. The pool does not actually mark which threads must be recycled and which must be retained. The decision is made when a thread fetches a task from the queue: if the thread finds that the number of threads in the pool is greater than corePoolSize and the blocking queue stays empty for 60 seconds, getTask returns null, the thread is released, and processWorkerExit is called to handle the thread's exit.
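The reclaiming of idle threads can be watched with a much shorter keep-alive time. The sketch below is an illustration under assumed values (a 1/3 pool, a 1-slot queue, 100 ms keep-alive, and a generous 10 s polling deadline); KeepAliveDemo and sizes are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class KeepAliveDemo {
    /** Returns {pool size while saturated, pool size after the idle threads timed out}. */
    static int[] sizes() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 3, 100, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1));
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(blocked);      // 1 core thread, 1 queued task, 2 extra threads
            }
            int grown = pool.getPoolSize();
            release.countDown();            // all tasks finish, threads become idle
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                Thread.sleep(20);           // wait for the keep-alive timeout to reclaim threads
            }
            return new int[] { grown, pool.getPoolSize() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new int[] { -1, -1 };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] s = sizes();
        System.out.println("saturated=" + s[0] + ", after idle=" + s[1]);
    }
}
```

Which two of the three threads exit is not fixed in advance; whichever threads time out while the count is above corePoolSize are the ones released.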
The addWorker method can be explained in two parts. The first part increases the number of threads in the thread pool through CAS. It contains an if statement.
The statement is easier to read if the ! is moved outside the parentheses, so that it looks like this:
The condition applies when the thread pool state is SHUTDOWN, STOP, TIDYING, or TERMINATED, and it breaks down into three cases:
- In the STOP, TIDYING, or TERMINATED state, no thread needs to be added or started. In the STOP state the threads in the pool are being destroyed, meaning shutdownNow has been invoked.
- If the thread pool state is SHUTDOWN and the first task is not null, no new task is accepted and false is returned.
- If the thread pool state is SHUTDOWN and the queue is empty, there is nothing left to run, so no thread is added and false is returned.
This first part consists of an outer loop and an inner loop. The outer loop checks the state of the thread pool to decide whether a worker thread needs to be added. The inner loop increases the number of threads through a CAS operation. When the core parameter is true, the number of threads in the pool has not yet reached corePoolSize. When it is false, the number of threads has reached corePoolSize and the queue is full, or a SynchronousQueue has no waiting taker, but maximumPoolSize has not necessarily been reached. The inner loop therefore uses the core parameter to decide which limit to check. If the limit has been reached, no thread can be added and the rejection policy is executed. If it has not, the number of threads is increased.
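The inner loop's "check the limit, then CAS, retry on contention" pattern can be sketched with an AtomicInteger. This is a reduced model under assumed names (BoundedCounter, tryIncrement, limitFor); the real code works on the packed ctl value and re-checks the pool state between attempts.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();

    /** Picks the limit the same way the core flag does: corePoolSize or maximumPoolSize. */
    static int limitFor(boolean core, int corePoolSize, int maximumPoolSize) {
        return core ? corePoolSize : maximumPoolSize;
    }

    /** Tries to reserve one slot; retries when another thread wins the CAS, fails at the limit. */
    boolean tryIncrement(int limit) {
        for (;;) {
            int current = count.get();
            if (current >= limit) {
                return false;               // limit reached: the caller must reject the task
            }
            if (count.compareAndSet(current, current + 1)) {
                return true;                // slot reserved
            }
            // the count changed under us; re-read and try again
        }
    }

    int get() {
        return count.get();
    }

    public static void main(String[] args) {
        BoundedCounter counter = new BoundedCounter();
        int limit = limitFor(true, 2, 4);
        System.out.println(counter.tryIncrement(limit));
        System.out.println(counter.tryIncrement(limit));
        System.out.println(counter.tryIncrement(limit));
    }
}
```

Reserving the slot before the thread is created keeps two concurrent submitters from both deciding that there is room for one more thread.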
The second part is mainly about adding the task to a Worker and starting its thread. Let's first look at the Worker class.
Worker is a lock built on AQS (AbstractQueuedSynchronizer), specifically a non-reentrant exclusive lock. It also implements the Runnable interface and its run method. In the constructor, the AQS state is set to -1 so that the worker cannot be interrupted before it starts running: when shutdown or shutdownNow is called, workers are interrupted, but a worker whose state is still -1 is not. The run method calls ThreadPoolExecutor's runWorker method. Recall that the addWorker method sets workerAdded to true after adding the worker to the HashSet, which means the worker was added successfully, and then the following code is called:
    if (workerAdded) {
        t.start();
        workerStarted = true;
    }
Here t is the thread created by the ThreadFactory in the Worker constructor, with the Worker itself passed to the thread as its Runnable. The created thread is the task thread. When the task thread is started, the Worker's run method is called. The run method delegates to the outer class's runWorker method and passes the Worker itself as the argument. The run method in Worker is as follows:
    public void run() {
        runWorker(this); // this is the Worker object itself
    }
Here's a simple diagram to illustrate the call chain.
The overall logic is as follows. A thread is created first, with the Worker set as its Runnable, and the thread is stored in the Worker. addWorker then takes the thread out of the Worker and starts it. Once started, the thread calls the Worker's run method, which calls ThreadPoolExecutor's runWorker, which in turn runs the Worker's firstTask. The firstTask is the actual task to execute, that is, the code logic implemented by the user.
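Returning to the Worker lock described earlier, the following is a minimal sketch of a non-reentrant exclusive lock on top of AbstractQueuedSynchronizer that starts in state -1. WorkerLockSketch and isStarted are made-up names, and unlike the JDK's Worker this version does not record the owning thread.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class WorkerLockSketch extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1L;

    WorkerLockSketch() {
        setState(-1); // -1: not started yet, so the lock cannot be taken and isStarted() is false
    }

    @Override
    protected boolean tryAcquire(int unused) {
        // No owner check: a thread that already holds the lock fails here, hence non-reentrant.
        return compareAndSetState(0, 1);
    }

    @Override
    protected boolean tryRelease(int unused) {
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    boolean tryLock() {
        return tryAcquire(1);
    }

    void unlock() {
        release(1);
    }

    /** Mirrors the "state >= 0" check used before interrupting a worker. */
    boolean isStarted() {
        return getState() >= 0;
    }

    public static void main(String[] args) {
        WorkerLockSketch lock = new WorkerLockSketch();
        System.out.println("before start, tryLock: " + lock.tryLock());
        lock.unlock(); // what a worker does first when it starts running
        System.out.println("first tryLock:  " + lock.tryLock());
        System.out.println("second tryLock: " + lock.tryLock());
    }
}
```

Non-reentrancy is what lets the pool tell an idle worker from a busy one: if tryLock succeeds, the worker is not running a task and can safely be interrupted.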
Let’s look at the details of the runWorker method:
We see that when the Worker starts for the first time, it takes the firstTask from the Worker and executes it, and after that it fetches tasks from the queue. The interesting part is that tasks are fetched in two different ways inside getTask. The first is poll, which returns null if no task appears in the queue within a certain time. The second is take, which blocks the current thread while the queue is empty and wakes the waiting thread up to consume a task as soon as one is added.
- The worker thread calls getTask to get a task from the queue.
- If allowCoreThreadTimeOut is set to true or the number of threads in the pool is greater than corePoolSize, the poll method of the blocking queue is used so that idle surplus threads can be cleared out. If no task is obtained within the specified time, null is returned.
- If the number of threads in the pool is not greater than corePoolSize and allowCoreThreadTimeOut is the default value false, the thread blocks on the queue until a task arrives and wakes it up.
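The choice between poll and take described in the list can be sketched as follows. GetTaskSketch and nextTask are hypothetical names, and the sketch is simplified: the real getTask also checks the pool state, decrements the worker count, and retries after an interrupt instead of returning.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class GetTaskSketch {
    /** Returns the next task, or null when a timed wait expired (the worker's cue to exit). */
    static Runnable nextTask(BlockingQueue<Runnable> queue, int workerCount, int corePoolSize,
                             boolean allowCoreThreadTimeOut, long keepAliveMillis) {
        // Surplus threads (or all threads, if core timeout is allowed) wait only for a limited time.
        boolean timed = allowCoreThreadTimeOut || workerCount > corePoolSize;
        try {
            return timed
                    ? queue.poll(keepAliveMillis, TimeUnit.MILLISECONDS) // null on timeout
                    : queue.take();                                      // blocks until a task arrives
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // simplification: give up instead of retrying
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        // 6 workers with corePoolSize 5: this worker is surplus, so it times out on an empty queue.
        System.out.println(nextTask(queue, 6, 5, false, 50));
    }
}
```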
If the number of threads exceeds corePoolSize, the surplus threads (those beyond corePoolSize and up to maximumPoolSize) are cleaned up even when allowCoreThreadTimeOut is not set. When the number of threads in the pool is greater than corePoolSize, the poll method is used to fetch a task from the queue. If no task appears within the keep-alive time, poll returns null and timedOut = true is set, so getTask returns null as well. Control then returns to the caller, the runWorker method, which has been looping in while (task != null || (task = getTask()) != null). When getTask returns null, the while loop exits and completedAbruptly = false is set, indicating that the worker finished normally rather than abruptly. After leaving the loop, the finally block executes processWorkerExit(w, completedAbruptly) to perform the cleanup. Let's look at the source code:
From the source code we can see that once the number of threads exceeds the number of core threads and a timed wait runs out, runWorker stops waiting for tasks in the queue and performs the cleanup. The cleanup code first adjusts the thread count, then adds the worker's completed tasks to the total for the whole pool, and finally tries to terminate the pool. The pool reaches the TERMINATED state via SHUTDOWN->TIDYING->TERMINATED or STOP->TIDYING->TERMINATED, under two conditions:
- The thread pool state is SHUTDOWN, the number of threads in the pool is 0, and the work queue is empty. The process is SHUTDOWN->TIDYING->TERMINATED
- The thread pool state is STOP and the number of threads in the pool is 0. The process is STOP->TIDYING->TERMINATED
Once the state is TERMINATED, termination's signalAll() method must also be called to wake up all threads that are blocked in awaitTermination. In other words, a thread that calls awaitTermination is only woken up when the thread pool state becomes TERMINATED (or when its timeout expires).
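From the caller's side, this wake-up is what makes awaitTermination return. A minimal sketch, with TerminationDemo and shutdownAndWait as illustrative names:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class TerminationDemo {
    /** Runs `tasks` trivial tasks, shuts the pool down, and reports whether it reached TERMINATED in time. */
    static boolean shutdownAndWait(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown(); // RUNNING -> SHUTDOWN; queued tasks still run
        try {
            // Blocks until the pool is TERMINATED or the timeout expires.
            return pool.awaitTermination(10, TimeUnit.SECONDS) && pool.isTerminated();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + shutdownAndWait(5));
    }
}
```

Without the call to shutdown, awaitTermination would simply wait out its timeout and return false, because a RUNNING pool never terminates on its own.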
Let’s examine the tryTerminate method and see if it satisfies the above statement:
The first if statement can be rewritten as follows, with the return moved into the else branch:
    if (!isRunning(c) &&
        !runStateAtLeast(c, TIDYING) && // only SHUTDOWN and STOP remain
        (runStateOf(c) != SHUTDOWN || workQueue.isEmpty())) {
        // execution logic here
    } else {
        return;
    }
The analysis is as follows:
- !isRunning(c) means the state is not RUNNING, so it is one of SHUTDOWN, STOP, TIDYING, and TERMINATED
- !runStateAtLeast(c, TIDYING) means the state has not reached TIDYING, so it could be RUNNING, SHUTDOWN, or STOP. The first condition already rules out RUNNING, so the two conditions together leave only SHUTDOWN and STOP
- runStateOf(c) != SHUTDOWN || workQueue.isEmpty(): if the current state is SHUTDOWN, the first operand is false and workQueue.isEmpty() is evaluated, so the combined meaning is "the state is SHUTDOWN and the work queue is empty". If the thread pool state is STOP, runStateOf(c) != SHUTDOWN is already true, so the condition holds whenever the state is STOP
Next comes another if statement, which can be rewritten as the following logic:
    if (workerCountOf(c) == 0) {
        // execute the following logic
    } else {
        interruptIdleWorkers(ONLY_ONE);
        return;
    }
So the thread pool state can be set to TERMINATED only when the number of threads in the pool is 0. Otherwise one idle worker is interrupted and the method returns.
The path to the TERMINATED state starts with a call to shutdown() or shutdownNow(). Let's look at the source of the shutdown method:
- First, check permissions: if a security manager is set, check whether the thread calling shutdown has permission to shut down the pool, and if so, whether it has permission to interrupt the worker threads. A SecurityException or NullPointerException is thrown if permission is not granted.
- Set the thread pool state to SHUTDOWN, or return directly if the state is already SHUTDOWN or later
- Set the interrupt flag on idle workers: a worker thread is interrupted if it has not been interrupted yet and its lock can be acquired, which means it is not running a task
- Attempt to change the thread pool state to TERMINATED
Let’s take a look at advanceRunState as follows:
- When the thread pool state is already >= SHUTDOWN, return directly
- If the thread pool state is RUNNING, try to set it to SHUTDOWN through CAS; return if the update succeeds, otherwise retry
The interruptIdleWorkers code is as follows:
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
We can see that calling the shutdown method only sets the interrupt flag on idle threads. Active threads that are executing a task are not interrupted, and they are cleaned up gradually after their tasks complete. Recall the code in the getTask method:
When the CAS operation that decrements the thread count succeeds, getTask returns null and processWorkerExit(w, false) is called. The worker is removed from workers, and an attempt is made to move the thread pool to the TERMINATED state. That covers the shutdown method; next comes the shutdownNow method.
The shutdownNow method returns the list of tasks that were never started: tasks = drainQueue(). The main differences between this method and the shutdown method are as follows:
- The shutdownNow method sets the thread pool state to STOP, while shutdown sets it to SHUTDOWN
- The shutdownNow method interrupts worker threads even while they are running a task. The shutdown method first tries to acquire the worker's lock: if the lock is acquired, the worker is idle and the interrupt flag is set; if not, the worker is busy and exits on its own after finishing its work.
- The shutdownNow method returns a list of unfinished tasks.
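These differences can be observed with one running task and two queued tasks. The sketch below uses made-up names (ShutdownNowDemo, run) and a single-thread pool so that the queued tasks are guaranteed not to have started.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class ShutdownNowDemo {
    /** Returns {number of never-started tasks handed back, 1 if the running task was interrupted else 0}. */
    static int[] run() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);      // long-running work
            } catch (InterruptedException e) {
                interrupted.set(true);     // shutdownNow interrupts even a busy worker
            }
            finished.countDown();
        });
        pool.execute(() -> { });           // stays in the queue
        pool.execute(() -> { });           // stays in the queue
        try {
            started.await();
            List<Runnable> pending = pool.shutdownNow(); // state -> STOP, queue drained
            finished.await(10, TimeUnit.SECONDS);
            return new int[] { pending.size(), interrupted.get() ? 1 : 0 };
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return new int[] { -1, -1 };
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("returned tasks=" + r[0] + ", running task interrupted=" + (r[1] == 1));
    }
}
```

With shutdown in place of shutdownNow, the sleeping task would not be interrupted and the two queued tasks would still be executed.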
The following code is shutdownNow's interruptWorkers method:
interruptWorkers calls each Worker's interruptIfStarted method, which interrupts the thread only if the AQS state is greater than or equal to 0, that is, only if the worker has started. In addition, runWorker itself checks the pool state before running a task: if the thread pool is stopped, it makes sure the worker thread is interrupted.
Rejection policies
The built-in rejection policy in the JDK is as follows:
- AbortPolicy: This policy throws an exception directly, which prevents the system from continuing to work normally
- CallerRunsPolicy: As long as the thread pool has not been shut down, this policy runs the rejected task directly in the thread that called execute
- DiscardOldestPolicy: This policy discards the oldest request, that is, the task that would be executed next, and then tries to submit the current task again
- DiscardPolicy: This policy silently discards the task that cannot be processed, without doing anything else.
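The four built-in policies can be compared on a deliberately tiny pool. In the sketch below, RejectionDemo, overflow, and the returned labels are made up for illustration; the pool has one thread and one queue slot, both occupied, when one more task is submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class RejectionDemo {
    /** Saturates a 1-thread, 1-slot pool and reports how `handler` treats one more task. */
    static String overflow(RejectedExecutionHandler handler) {
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), handler);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Runnable extra = () -> ranOn.set(Thread.currentThread());
        try {
            pool.execute(blocker);      // occupies the only thread
            pool.execute(() -> { });    // fills the only queue slot
            pool.execute(extra);        // overflow: handled by the rejection policy
            if (ranOn.get() == Thread.currentThread()) {
                return "ran in caller";
            }
            return pool.getQueue().contains(extra) ? "replaced oldest" : "dropped";
        } catch (RejectedExecutionException e) {
            return "rejected";
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(overflow(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println(overflow(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println(overflow(new ThreadPoolExecutor.DiscardOldestPolicy()));
        System.out.println(overflow(new ThreadPoolExecutor.DiscardPolicy()));
    }
}
```

CallerRunsPolicy also slows the submitter down, since the submitting thread is busy running the task instead of submitting more.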
Conclusion
To wrap up, one last picture summarizes the whole flow:
- The main thread submits a task to the thread pool, which runs the execute method
- The thread pool uses addWorker to create threads and add them to the pool. The picture shows threads being added to a core thread pool in the second step, but the pool does not really distinguish core threads from non-core threads. The only difference appears when a thread fetches a task: if the number of threads in the pool is greater than corePoolSize, or allowCoreThreadTimeOut is set to true, the thread waits only for the keep-alive time and then exits instead of waiting forever
- When the number of threads in the pool reaches corePoolSize, the thread pool adds new tasks to the queue first
- When the queue also reaches its maximum capacity, the pool creates new threads; at this point the number of threads has exceeded corePoolSize but not maximumPoolSize.
- When the number of threads in the pool reaches maximumPoolSize, the rejection policy is executed.
Well, that's the end of today's article. I hope it helps those of you who were confused in front of the screen.