Java JVM multithreading MySQL Redis Kafka Docker RocketMQ Nginx MQ queue data structure concurrent programming concurrent pressure kill architecture and other technology knowledge PDF, if you need to click here to receive
What is a thread pool
In Java, creating a Thread can be done by inheriting Thread or implementing the Runnable interface, but if a new Thread is created for each request, creating and destroying the Thread takes a considerable amount of time and consumes system resources, perhaps even more than processing the actual user request.
To solve this problem, there is the concept of a thread pool. The core logic of a thread pool is to create several threads in advance and put them into a container. If a task needs to be processed, it is simply assigned to a thread in the thread pool to execute it. After the task is processed, the thread will not be destroyed and will wait for subsequent assignments. Managing threads repeatedly through a thread pool also avoids the overhead of creating a large number of threads.
Creating a thread pool
Create thread pools by following the closure-builder methods provided by the Java Executors class. For ease of use, create thread pools by following the following methods:
- NewFixedThreadPool: Creates a thread pool with a fixed number of threads
- NewSingleThreadExecutor: Create a thread pool with only 1 thread
- NewCachedThreadPool: Returns a thread pool that can adjust the number of threads according to the actual situation. The maximum number of threads is not limited. If idle threads are used, the task will be executed. And every idle thread is automatically reclaimed after 60 seconds.
- NewScheduledThreadPool: Creates a thread pool that can specify the number of threads, but also has the ability to delay and periodically execute tasks, similar to timers.
FixedThreadPool
Create a pool of a fixed number of N threads operating on a shared borderless queue. At any one time, up to N threads are active to process tasks. If a new task is submitted while all threads are active, the new submitted task is queued until a thread is available.
If any thread is terminated due to failure before shutdown, a new thread will be created when a new task needs to be executed, and the new thread will remain in the thread pool until it is explicitly shutdown.
The sample
package com.zwx.concurrent.threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestThreadPool {
public static void main(String[] args) {
//FixedThreadPool - Fixed number of threads
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i=0; i<10; i++){ fixedThreadPool.execute(()-> { System.out.println("Thread name:"+ Thread.currentThread().getName()); }); } fixedThreadPool.shutdown(); }}Copy the code
The output is:As you can see, there are at most three threads executing the task in a loop (The result of the run is uncertain, but there will only be 3 threads at most).
A FixedThreadPool calls the following methods to construct the thread pool:
SingleThreadExecutor
An executor with only one worker thread. If the thread is closed due to execution failure before normal closure, a new thread is created and added to the executor.
This actuator ensures that all tasks are executed in sequence and that only one task is active at any given time.
The sample
package com.zwx.concurrent.threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestThreadPool {
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i=0; i<9; i++){ singleThreadExecutor.execute(()-> { System.out.println("Thread name:" + Thread.currentThread().getName());
});
}
}
}
singleThreadExecutor.shutdown();
Copy the code
The result is only 1 thread:SingleThreadExecutor calls the following method to construct the thread pool:
CachedThreadPool
A thread pool that creates threads when a task needs to be processed, and that can be reused if a thread has finished processing a task and has not been reclaimed.
When we call the execute method, if the previously created thread is free and available, the previously created thread will be reused, otherwise a new thread will be created and added to the thread pool.
If a created thread is not used within 60 seconds, it is terminated and removed from the cache. Thus, this thread pool can remain idle for a long time without consuming any resources.
The sample
package com.zwx.concurrent.threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestThreadPool {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i=0; i<9; i++){ cachedThreadPool.execute(()-> { System.out.println("Thread name:" + Thread.currentThread().getName());
});
}
cachedThreadPool.shutdown();
}
Copy the code
As you can see from the output, nine different threads have been created:Let’s modify the above example to sleep for a while before executing:
package com.zwx.concurrent.threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestThreadPool {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i=0; i<9; i++){try {
Thread.sleep(i * 10L);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(()-> {
System.out.println("Thread name:" + Thread.currentThread().getName());
});
}
cachedThreadPool.shutdown();
}
Copy the code
The output is only 1 thread, because some threads can be reused: Note: The results of these two examples are not fixed, the first may not create 9 threads, and the second may create more than 1 thread, depending on how the thread executes.
CachedThreadPool calls the following methods to construct the thread pool
ScheduledThreadPool
Create a pool of threads that can be run after the given delay of a scheduling command or executed periodically. Compared to other thread pools, ScheduledExecutorService provides a custom subclass that inherits from ExecutorService.
The sample
package com.zwx.concurrent.threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
public class TestThreadPool {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
for (int i=0; i<9; i++){ scheduledThreadPool.execute(()->{ System.out.println("Thread name:"+ Thread.currentThread().getName()); }); } scheduledThreadPool.shutdown(); }}Copy the code
Output results (execution results are random, with a maximum of 3 threads) :ScheduledThreadPool eventually calls the following methods to construct the thread pool
Thread Pool Principle
As you can see from the above screenshots, the ThreadPoolExecutor constructor is used to construct thread pools.
Construct thread pool 7 big parameters
Here is the most complete constructor of the ThreadPoolExecutor class:This is the core method for constructing a thread pool, which takes 7 parameters:
- CorePoolSize: number of core threads. Core threads that remain in the pool are not reclaimed even if they are idle, unless the allowCoreThreadTimeOut property is set
- MaximumPoolSize: indicates the maximum number of threads. The maximum number of threads allowed in the thread pool is greater than or equal to the number of core threads
- KeepAliveTime: active time. When the maximum number of threads is greater than the number of core threads, other threads exceeding the number of core threads are reclaimed if the space time exceeds keepAliveTime
- TimeUnit: active TimeUnit
- BlockingQueue: Blocks a queue. Used to store tasks that are waiting to be executed.
- ThreadFactory: A factory class that creates threads
- RejectedExecutionHandler: reject the policy. A policy to execute when submitted tasks are blocked when thread boundaries and queue capacity are reached.
Thread pools execute processes
The main flow of the execute(Runnable) method is very clear:According to the above source code, the thread pool execution flow chart is as follows:
Source code analysis
ThreadPoolExecutor CTL is a 32-bit int with the highest three bits representing the number of threads and the lowest 29 bits representing the number of threads.The conversion relationship of various states is shown as follows:Where, the relationship between the size of the state is:RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED
AddWork method
private boolean addWorker(Runnable firstTask, boolean core) {
// Number of threads +1
retry:
for (;;) {
int c = ctl.get();// Get thread pool capacity
int rs = runStateOf(c);// Get the status
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&// TERMINATED: SHUTDOWN, STOP, TIDYING
! (rs == SHUTDOWN &&
firstTask == null&&! workQueue.isEmpty()))/ / : rs = = RUNNING, firstTask! =null,queue==null
return false;// If the task is closed, do not accept the task; False is also returned if it is running and queue is null
for (;;) {
int wc = workerCountOf(c);// Get the current number of worker threads
// If the number of worker threads is greater than or equal to the capacity or greater than or equal to the number of core threads (the maximum number of threads), then no more workers can be added
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//cas increases the number of threads, fails and spins again
break retry;
c = ctl.get(); // re-read CTL // Get the number of worker threads again
if(runStateOf(c) ! = rs)// Not equal indicates that the state of the thread pool has changed, continue the spin attempt
continueretry; }}// The second piece of logic: construct the thread as a Worker object and add it to the thread pool
boolean workerStarted = false;// Check whether the worker thread started successfully
boolean workerAdded = false;// Check whether the worker thread is added successfully
Worker w = null;
try {
w = new Worker(firstTask);// Build a worker
final Thread t = w.thread;// Remove the thread from the worker
if(t ! =null) {
final ReentrantLock mainLock = this.mainLock;// Get the reentrant lock
mainLock.lock();/ / lock
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());// After obtaining the lock, check the status again
// Can be added to workers only if the current thread pool is running, or if SHUTDOWN and firstTask is empty
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);// Add the newly created Worker to the workers collection
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;// Update the number of threads in the thread pool
workerAdded = true;// Succeeded in adding the thread (worker)}}finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();// The run() method in the Worker is executed
workerStarted = true;// The startup succeeded}}}finally {
if (! workerStarted)
addWorkerFailed(w);If the thread fails to start, it needs to be rolled back
}
return workerStarted;
}
Copy the code
This method basically does two things:
- 1, the number of threads +1
- Construct a thread as a Worker object, add it to the thread pool, and call the start() method to start the thread
Worker object
This method inherited AbstractQueuedSynchronizer, AQS is a synchronizer, so since there are threads of synchronizer, here why not used directly, instead of inheriting after rewriting?
This is because lock reentrant is supported in the AQS synchronizer, but the thread pool design is not intended to support reentrant, so an AQS is rewritten to avoid reentrant.
The state initialization state in Worker is set to -1 because when initializing the Worker object, the thread cannot be interrupted until it actually executes the runWorker() method. Threads are allowed to interrupt once they are constructed and start executing tasks, so the first thing they do after entering the runWorker() is to set state to 0(lock free), allowing them to interrupt.
Let’s look at the Worker’s constructor again:The addWork method executes to this:w = new Worker(firstTask); // Build a worker“, the constructor will be called to create a Worker object, state=-1, and the current task will be treated as firstTask, which will be executed first when running later.
The addWorker method above will call the worker.start method after the worker is successfully constructed. At this time, the run() method in the worker will be executed, which is also a way of delegation.
The run() method calls the runWorker(this) method, which actually performs the task:
RunWorker (this) method
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
/** * indicates that the current worker thread is allowed to interrupt, because the default state of new worker is -1, where the * worker class tryRelease() method is called, and state is set to 0. * interruptIfStarted() allows only state>=0 to call interrupt */
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while(task ! =null|| (task = getTask()) ! =null) {
/ * * * lock, lock is to prevent concurrent, not just here but to when the shutDown () method when the thread is not interrupted, * for shutDown () before the interrupt threads will call tryLock method attempts to acquire the lock, acquiring a lock will interrupt * / success
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
If state>=STOP, and the thread interrupt flag is false * 2. If state
if ((runStateAtLeast(ctl.get(), STOP) ||/ / state > = STOP(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();// Interrupt the thread
try {
beforeExecute(wt, task);// The null method, which we can override to do something before executing a task, is often used for monitoring and counting thread pool operations
Throwable thrown = null;
try {
task.run();// Formally call run() to execute the task
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);// Call it after a task is executed, which is also an empty method. We can override it to do something after a task is executed}}finally {
task = null;// Set the task to empty, and the next loop will fetch the task from the workerQueue via the getTask() method
w.completedTasks++;// Number of tasks completed +1
w.unlock();
}
}
completedAbruptly = false;
} finally {
// The core thread blocks in the getTask() method, waiting for the thread, unless the core thread is set to be destroyed,
// Otherwise only non-core threads will normally execute this
processWorkerExit(w, completedAbruptly);// Destroy the thread}}Copy the code
The main steps are as follows:
- 1. Release the lock first, since threads are allowed to interrupt after entering this method
- First check whether the firstTask passed is empty. If it is not empty, the firstTask is executed first
- 3. If firstTask is empty, try getting the task from getTask(), which is getting the task from queue L
- 4. If the task is obtained, the execution starts. During the execution, the lock must be re-locked because the task cannot be interrupted
- 5, there is an empty method before and after the task runs, we can rewrite these two methods when necessary, to realize the monitoring of the thread pool
- 6. If the task is not available, the thread is destroyed with the processWorkerExit method
GetTask () method
private Runnable getTask(a) {
The default value is false for the first spin, true for the first spin, and null for the second spin
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/** * 1. If the thread pool is in the shutdown state, we must wait until the workQueue is empty, because shutdown() is * 2 that needs to execute the remaining tasks in the queue. If the thread pool is in the stop state, you do not need to care if there are tasks */ in the workQueue
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();// The number of threads in the thread pool is -1
return null;// If null is returned, the runWorker method will break out of the loop and execute processWorkerExit in finally to destroy the thread
}
int wc = workerCountOf(c);
// Are workers subject to culling?
/ / 1. AllowCoreThreadTimeOut - the default false, said the core number of threads will not timeout
//2. If the total number of threads is greater than the number of core threads, then some threads need to be destroyed
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/** * 1. If the number of threads exceeds maximumPoolSize, the thread pool may have been resized by a call to setMaximumPoolSize() at runtime, otherwise it will not exceed maximumPoolSize if addWorker() succeeds. * 2. Timed && timedOut If the timed && timedOut value is true, the current operation needs to be timed out and the task obtained from the blocking queue * timed out last time. This is the lifetime of the idle thread */
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// Wait for the specified time to return
workQueue.take();// Can't get the task will always block (e.g. core thread)
if(r ! =null)
return r;// If you get a task, return it to the worker for processing
timedOut = true;// Set it to true, and the second spin will return null
} catch (InterruptedException retry) {
timedOut = false; }}}Copy the code
The main steps of this method are:
- If the status is correct, then return null and the number of threads is -1. If the status is null, then the previous method will break out of the while loop and execute the process of destroying the thread.
- 2. Determine if there is a timeout or the maximum number of threads exceeds the number of core threads
- 3. Decide whether to poll or take from the queue based on the above judgment. A poll method with a timeout returns null if the poll method has not been retrieved. The poll method is not busy, so the thread will be destroyed if null is returned. Case 2: If the take method is executed, according to the judgment in point 2, unless we set the core thread to be reclaimed artificially, the core thread will execute the take method. If it can’t get the task, it will block waiting until it gets the task.
ProcessWorkerExit method
This is the thread destruction method. The getTask() method above returns null, and the thread destruction method is executed. GetTask () already has the number of threads -1, so the thread destruction can be executed directly.The remove() method of the workers collection is called directly, followed by an attempt to abort and some compensation operations for abnormal conditions.
Rejection policies
The JDK provides the following rejection policies by default:
- AbortPolicy: Directly throws an exception. Default policy
- CallerRunsPolicy: Executes the task with the caller’s thread
- DiscardOldestPolicy: Discards the most advanced task in the blocking queue and executes the current task
- DiscardPolicy: Discards tasks directly
We can also customize our own rejection strategy by implementing the RejectedExecutionHandler interface and overriding the only rejectedExecution method.
Common interview questions
Thread pools are a very popular interview question. Here are some common questions:
Problem a
Q: Why not use Executors directly to build thread pools?
A: By using Executors, we don’t have to worry about thread pool parameters, which could cause problems like newFixdThreadPool or newSingleThreadPool. The allowed queue length is integer. MAX_VALUE, which, if used incorrectly, will result in a large number of requests being queued and may result in OOM risk. NewCachedThreadPool allows the number of threads to be created as integer. MAX_VALUE. It can also cause the creation of a large number of threads with high CPU usage or OOM problems. If we were to construct a thread pool using ThreadPoolExecutor, we would be more careful to understand what each parameter in the thread pool construction means.
Question 2
Q: How to properly configure the size of the thread pool?
A: To properly configure thread pools, you must first analyze task characteristics, which can be analyzed from the following perspectives:
- The nature of the task: CPU intensive task, IO intensive task, and hybrid task.
- Task priority: high, medium and low.
- Task execution time: long, medium and short.
- Task dependencies: Whether they depend on other system resources, such as database connections.
CPU intensive: The CPU intensive task is characterized by fast response time and continuous CPU running. The CPU utilization of this task is high. Therefore, the number of threads should be determined based on the number of CPU cores. Too many threads can lead to on-off and off-off switching, which reduces efficiency. The maximum number of threads in the pool can be configured to be the number of CPU cores +1.
I/O intensive: The CPU is idle when I/O operations take a long time. In this case, you can increase the size of the thread pool. It can be judged based on the waiting time of the thread. The higher the waiting time, the more threads there are. Generally, you can configure twice the number of CPU cores. One formula: The optimal number of threads set by the thread pool = ((thread wait time set by the thread pool + thread CPU time)/thread CPU time) * number of cpus
Use runtime.getruntime (). AvailableProcessors ().
Question 3
Q: When is the core thread in the thread pool initialized?
A: By default, after A thread pool is created, there are no threads in the pool. The thread is created only after the task is submitted. In practice, if you want to create a thread immediately after the thread pool is created, there are two methods:
- PrestartCoreThread () : Initializes a core thread.
- PrestartAllCoreThreads () : Initializes all core threads
Problem four
Q: What if tasks are still running when the thread pool is closed?
A: There are two ways to close A thread pool:
- Shutdown () does not immediately terminate the thread pool until all tasks in the task cache queue have been executed, but it does not accept new tasks
- ShutdownNow () immediately terminates the thread pool and attempts to interrupt tasks in progress, and empties the task cache queue to return tasks that have not yet been executed
Problem five
Q: Can thread pool capacity be dynamically adjusted?
A: There are two ways to dynamically resize A thread pool.
- SetCorePoolSize () : Sets the maximum number of core threads
- SetMaximumPoolSize () : Sets the maximum number of worker threads
conclusion
This article starts with a sample of four common uses of a thread pool, and finally finds that the same class is called to construct a thread pool (ThreadPoolExecutor). Therefore, we start with the ThreadPoolExecutor constructor to analyze the seven parameters to build a thread pool. And from the execute() method to gradually analyze the use of thread pool principle, of course, in fact, there is a thread pool method submit() can also be used as an entrance.