The last article covered the first half of the thread pool. This article covers the second half of the thread pool, including source code analysis, design pattern analysis, and interview summaries.
For completeness, the first half is included here as well, so you can read the whole topic in one place from the beginning.
First, a quick thank-you: my MySQL technical summary unexpectedly took off, passing 5,000 readers, and my follower count jumped with it.

I honestly don't know whether the platform's recommendation system picked it up, but the numbers have kept climbing for days. The article has already been recommended to everyone, so skip this aside if you've read it: the MySQL wrap-up article (180,000 words for you all).
In this installment, I'm going to cover thread pools in detail. Thread pools are also a frequent topic in interviews at big tech companies.

And if your resume includes a thread pool case study and tuning experience, it is likely to catch a big company's eye.
Multithreading has always been a hard nut to crack, but it is a very important piece of content. It's hard because there is a lot to think about in multithreaded development.
For example: how do you keep data consistent under concurrent writes, so that one thread's update is not overwritten by another? Many people reach for locks (distributed locks such as Redis-based locks, single-machine locks, optimistic locking, CAS) to guarantee consistency.

At the same time, you must also consider QPS: a lock held too broadly can drastically reduce the system's overall throughput, and the problem may only surface when a later load test fails — leaving you scrambling.

So mastering multithreaded programming pays off enormously for the code you write, but there is also more to test than usual, and the occasional strange bug that is genuinely hard to track down. It is a real test of one's technical skill.
All right, enough preamble — let's get straight to the good stuff.
Introduction to thread pools
First, let’s talk about what a thread pool is.
Thread pooling is a form of multithreaded processing in which tasks are added to a queue and automatically started as threads become available. Thread pool threads are background threads. Each thread uses the default stack size, runs at the default priority, and is in a multithreaded apartment. If a thread is idle in managed code (for example, waiting for an event), the thread pool inserts another worker thread to keep all processors busy. If all pool threads stay busy while the queue still contains pending work, the pool will, after some time, create another worker thread — but the number of threads never exceeds the maximum. Tasks beyond the maximum remain queued and do not start until other threads finish.
In plain English, a thread pool is just what it sounds like: a pool holding many threads. When a task arrives, a thread is taken straight from the pool to execute it; when the task finishes, the thread is returned to the pool, ready for the next task.

The keyword is "reuse": compared with an ordinary thread, a pooled thread does not need to be created anew every time it is used.
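As a minimal, runnable sketch of this take-and-return behavior (the class and method names here are my own, not from any framework), five tasks share just two pooled threads:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolIntroDemo {
    public static int runTasks() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2); // a pool of 2 reusable threads
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 5; i++) {
            pool.execute(completed::incrementAndGet); // each task borrows a pooled thread
        }
        pool.shutdown();                               // stop accepting new tasks
        pool.awaitTermination(5, TimeUnit.SECONDS);    // wait for the queued tasks to finish
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks()); // prints 5
    }
}
```

All five tasks complete even though only two threads ever exist — the threads are reused rather than created and destroyed per task.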
The explanation is straightforward in terms of the functionality of thread pools, and we’ve been exposed to other similar pooling techniques. For example: database connection pool, HttpClient connection pool, memory pool, instance pool, and so on.
We can see that there are many commonalities in these so-called pooling ideas: pre-allocation, recycling and reuse.
For example, a connection pool pre-applies for database connections and reuses them; a memory pool pre-allocates memory to improve allocation efficiency and reduce fragmentation.
Finally, it is worth knowing the author of the thread pool: Doug Lea, a name familiar to anyone who has studied Java concurrency.
Why use thread pools
So why use thread pools? The answer is simple: thread pools have clear advantages over creating threads directly.
When we studied the JVM, we covered thread-private versus shared memory areas. Every thread that is created gets its own thread-private areas (the virtual machine stack, the program counter, the native method stack), all of which consume memory.

So the first advantage of a thread pool is resource control: it lets you allocate server resources sensibly, instead of letting a QPS spike exhaust them and bring the whole server down.
Another advantage is thread reuse: repeatedly creating and destroying threads hurts performance, so pooling reduces resource consumption.
Finally, there is the advantage that is our real goal: optimizing the system. In most cases, a thread pool executes our tasks asynchronously instead of serially, which reduces the system's response time.

Hence one of the most common thread pool scenarios: converting many serial query operations into asynchronous ones, aggregating the results in the service layer, and returning them to the front end to improve response time (provided the queries have no data dependencies on each other).
So with all this talk about the advantages of thread pools, what are the disadvantages? There are many disadvantages. For example, if the number of thread pools is not properly configured, system resources may be exhausted and an OOM exception may occur.
In addition, using a thread pool brings the usual multithreading problems with it — data consistency issues, added business complexity, and harder testing (typically you test thread pool usage under sustained load while watching how memory and CPU respond).
Therefore, when using a thread pool, we should exploit its strengths and avoid its pitfalls; mastering the underlying principles helps us use it more sensibly and effectively.
So what does it mean to be proficient in thread pools? Personally, I think I can master the following aspects:
- Technology points used (AQS, CAS, ReentrantLock)
- Common thread pool types
- Design patterns (producer-consumer pattern, strategy pattern)
- Design ideas (flexibility, scalability)
- Principle of execution
- Reading source code
- Thread pool tuning (thread count setting)
If any of these feel fuzzy to you, you may struggle to answer them all in an interview. The most frequent questions are how a thread pool works and how to tune it — which is also what most technical bloggers write about.
A thread pool can fill a whole morning's discussion — Doug Lea is the GOAT. Thread pools divide into custom thread pools and the default thread pools; let's start with the custom ones.
Custom thread pools
The custom thread pool in Java is implemented by the java.util.concurrent.ThreadPoolExecutor class, whose hierarchy is shown in the following diagram:
Executor: the top-level interface. It defines only a void execute(Runnable command) method, taking a Runnable parameter — the task to execute.
ExecutorService: an extension sub-interface of Executor that adds thread pool lifecycle methods such as isShutdown, isTerminated, shutdownNow, and shutdown. To make it easier to follow, here is a condensed version of the source:
```java
public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
```
Two things are worth noting here. First, the difference between shutdown and shutdownNow: shutdownNow is lossy to the business — it stops threads immediately, whether or not their tasks have finished. Very simple.

shutdown is the graceful way to stop the pool: if a thread is still running a task, it waits for that task to finish executing.
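A small runnable sketch of the difference (the class and helper names are mine): the pool's single thread is kept busy, one task waits in the queue, and shutdownNow interrupts the running task and hands back the queued one.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {
    public static int drainedTasks() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        pool.submit(() -> {               // occupies the single thread
            started.countDown();
            try { Thread.sleep(10_000); } // interrupted by shutdownNow
            catch (InterruptedException ignored) { }
        });
        pool.submit(() -> { });           // waits in the queue
        started.await();                  // make sure the first task is running
        List<Runnable> pending = pool.shutdownNow(); // interrupts the running task,
                                                     // returns the queued ones
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return pending.size();            // one queued task never ran
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drainedTasks()); // prints 1
    }
}
```

With shutdown instead of shutdownNow, the running task would be allowed to finish and the queued task would still execute — nothing would be lost.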
The Executor interface has a void execute(Runnable command) method for executing tasks, and ExecutorService adds the submit methods. The difference is that submit supports both Callable and Runnable, while execute supports only Runnable. If you have studied threads, you know both Callable and Runnable can define a task; the difference is that Callable, combined with a Future, lets you retrieve the task's return value.
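A short sketch contrasting the two (names are my own): execute fires a Runnable and returns nothing, while submit with a Callable yields a Future we can ask for the result.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitDemo {
    public static int callableResult() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        // execute only accepts a Runnable and returns nothing
        pool.execute(() -> System.out.println("fire and forget"));
        // submit accepts a Callable and returns a Future carrying the result
        Future<Integer> future = pool.submit(() -> 21 * 2);
        int result = future.get(); // blocks until the task finishes
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callableResult()); // prints 42
    }
}
```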
Its abstract implementation, AbstractExecutorService, shows how submit works; the code is very simple:
```java
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
```
As you can see from the code above, submit is ultimately built on top of the execute method.
ThreadPoolExecutor: This class is the key to implementing a custom thread pool, most importantly its constructor, as shown below:
Let’s examine the most complex constructor in detail:
```java
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // Check for illegal arguments
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    // workQueue, threadFactory and handler must not be null
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
```
corePoolSize: the number of core threads. Why "core"? Because there are also non-core threads, and core threads + non-core threads = the maximum number of threads (maximumPoolSize).
You can think of core and non-core threads as a company's permanent employees and outsourced contractors. Permanent employees stay on staff (the threads survive and are reused), while the contractors are temporary outside help.

Normally, when tasks are few, the permanent staff can handle everything and no contractors are needed. But when every permanent employee has their hands full, temporary contractors are brought in for the overflow. Once the workload drops again, a contractor who sits idle for a while is let go to save costs — elastic scaling of the thread count.

This maps onto a common internet-scale pattern: experience shows that traffic often has short-lived peaks where QPS explodes but only briefly. To adapt, the design distinguishes core from non-core threads, and a non-core thread whose idle time exceeds the threshold is stopped.
Note also that the core threads are not all created when the pool is constructed; they are created gradually as tasks arrive — unless prestartCoreThread/prestartAllCoreThreads is called, which starts core threads in advance.

We also said core threads are not destroyed — unless allowCoreThreadTimeOut(true) is called to allow idle core threads to time out and be terminated.

That said, neither of these behaviors is generally recommended; the methods exist mainly to extend flexibility.
maximumPoolSize: the maximum total number of threads (core + non-core) in the pool. Once the thread count reaches this maximum (and the queue is full), the rejection policy is triggered.

keepAliveTime: the maximum idle time for a thread. A thread idle longer than this threshold is reclaimed.

unit: the time unit for keepAliveTime, for example TimeUnit.SECONDS.

workQueue: the queue that holds pending tasks. There are many implementations; here are the most common ones:
- ArrayBlockingQueue: a bounded blocking queue backed by an array; its size must be specified at initialization.
- LinkedBlockingQueue: a blocking queue backed by a linked list; the default capacity is Integer.MAX_VALUE (effectively unbounded), but a size can be specified at initialization.
- DelayQueue: a delay queue; an element can only be taken from it once its delay has expired.
- SynchronousQueue: a blocking queue that stores no elements; an insert blocks until another thread removes the element, and vice versa.
- LinkedBlockingDeque: a double-ended blocking queue backed by a linked list.
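The bounded/unbounded distinction is easy to see with offer, which returns false instead of blocking when a queue is full (a quick sketch; names are mine):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueDemo {
    public static boolean[] offerResults() {
        BlockingQueue<String> bounded = new ArrayBlockingQueue<>(1);   // capacity fixed at 1
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<>(); // capacity Integer.MAX_VALUE
        boolean a = bounded.offer("t1");   // true: the queue has room
        boolean b = bounded.offer("t2");   // false: the bounded queue is full
        boolean c = unbounded.offer("t2"); // true: effectively never full
        return new boolean[] { a, b, c };
    }

    public static void main(String[] args) {
        for (boolean r : offerResults()) System.out.println(r); // true, false, true
    }
}
```

This is exactly why queue choice matters for a thread pool: with an unbounded queue the pool never sees "queue full", so non-core threads are never created and tasks can pile up without limit.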
Now let's walk through the complete execution process of a thread pool, starting with a flow chart:
(1) If the number of core threads is not yet full, a core thread is created to execute the task:

(2) If the core threads are all occupied, the task is placed in the task queue:

(3) If the task queue is also full, a non-core thread is created to execute the task, as long as core + non-core threads stay within maximumPoolSize:

(4) Finally, if the total number of threads has reached maximumPoolSize, the rejection policy is triggered.
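The four steps can be reproduced deterministically with a tiny pool whose tasks block on a latch (a sketch with my own names): core = 1, queue capacity = 1, max = 2, so the fourth task must be rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FlowDemo {
    public static boolean fourthTaskRejected() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        // core=1, max=2, queue capacity=1 → at most 3 tasks accepted at once
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        Runnable blocker = () -> {
            try { release.await(); } catch (InterruptedException ignored) { }
        };
        pool.execute(blocker); // step 1: runs on the core thread
        pool.execute(blocker); // step 2: goes into the queue
        pool.execute(blocker); // step 3: queue full → a non-core thread is created
        boolean rejected = false;
        try {
            pool.execute(blocker); // step 4: pool saturated → AbortPolicy throws
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        release.countDown();   // let the blocked tasks finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(fourthTaskRejected()); // prints true
    }
}
```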
threadFactory: the thread factory used to create the pool's threads. It is recommended to give the pool's threads a recognizable name, which makes problems much easier to trace and verify:
```java
private static final ThreadPoolExecutor pool;

static {
    // Give the pool's threads a recognizable name (ThreadFactoryBuilder is from Guava)
    ThreadFactory threadFactory =
            new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build();
    pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(512), threadFactory,
            new ThreadPoolExecutor.AbortPolicy());
    pool.allowCoreThreadTimeOut(true);
}
```
handler: the rejection policy. There are four built-in rejection policies:
- DiscardPolicy: Discards tasks without processing them or throwing exceptions, usually for irrelevant tasks.
- DiscardOldestPolicy: Discards the first, or oldest, task in the queue and attempts to execute a new task.
- CallerRunsPolicy: Processed by the caller thread.
- AbortPolicy: throws a RejectedExecutionException; this is the default policy.
The source can be seen in the ThreadPoolExecutor class:

AbortPolicy simply throws the exception directly — the implementation is trivial. For more detail, refer to the class above; the code is not difficult:
Default thread pool
So why does Alibaba's coding guideline say not to use the default thread pools, and to use a custom thread pool instead? Let's analyze it through the source code.
There are four common default thread pools:
- Executors.newCachedThreadPool();
- Executors.newFixedThreadPool(n);
- Executors.newScheduledThreadPool(n);
- Executors.newSingleThreadExecutor();
These four are the ones we discuss most and that interviews frequently ask about. The full source lives in java.util.concurrent.Executors alongside the other pools. To dig into the differences among these thread pools, I've pulled out the relevant source here:
```java
public class Executors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
}

// And from ScheduledThreadPoolExecutor:
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}
```
As you can see from these thread pools, the underlying implementation is to call the constructor of the custom thread pool ThreadPoolExecutor, but some of the parameters are already defined.
FixedThreadPool and SingleThreadExecutor allow the work queue length to reach Integer.MAX_VALUE, so requests can pile up in huge numbers.

CachedThreadPool and ScheduledThreadPool allow the maximum number of threads to be Integer.MAX_VALUE, meaning they can create an effectively unlimited number of threads. Either way, resources can be exhausted and an OOM can result.
This is why Ali doesn’t recommend using these default thread pools:
The reason is as simple as reading the source code, so I recommend reading the source code for ThreadPoolExecutor.
Thread pool source code
The thread pool source relies on ReentrantLock, AQS, AtomicInteger, and CAS. I have written dedicated articles on ReentrantLock, AQS, and CAS before; you can find them on this account:
Let’s start our source code tour. First, we analyze the initial execute method layer by layer:
```java
public void execute(Runnable command) {
    // If the task is null, throw a NullPointerException
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // Step 1: if the worker count is below corePoolSize, try to add a core worker
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Step 2: the pool is running, so try to enqueue the task.
    // workQueue is the BlockingQueue<Runnable> passed to new ThreadPoolExecutor()
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Re-check the pool state: it may have left RUNNING since the last check.
        // If so, remove the task from the queue and apply the rejection policy
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If there are no workers at all, create one with no first task; it will
        // pull the task we just queued. The second argument marks core vs
        // non-core (true = core); addWorker is examined in detail later
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Step 3: the queue is full, so try to create a non-core thread;
    // if that also fails, apply the rejection policy
    else if (!addWorker(command, false))
        reject(command);
}
```
The execute method mainly performs three steps of logic; I've marked steps one, two, and three in the comments. Read it together with the flow chart above and it is actually quite simple:
- Step 1: if the number of threads has not reached corePoolSize, a core thread is created to execute the task. Each thread corresponds to a Worker, and workerCountOf returns the number of workers.
- Step 2: the thread count is at or above the core size, so (after checking the pool is still running) the task is added to workQueue. If the pool turns out not to be running, the task is removed from the queue and rejected. If the pool is running but the worker count is 0, addWorker is called to create a thread — with null passed as the task, because the task has already been added to the queue; the new thread will fetch it from the queue and execute it.
- Step 3: the queue is full, so the pool tries to create a non-core thread to execute the task (the second argument distinguishes core from non-core; true means core).
**reject(command)**
The source is very simple: reject directly calls the handler.rejectedExecution() method, where handler is the rejection policy object we passed in to new ThreadPoolExecutor().
Let’s review the four specific rejection strategies, as shown in the figure below:
Here's a small tip that many of you probably know: in IDEA, Ctrl+Alt+left-click jumps to the concrete implementation class rather than the interface. Since an interface often has many implementation classes and it isn't obvious which one applies, this shortcut is how you find it:
- First, the AbortPolicy implementation:
We can see that the rejection policy implementation class is a static inner class of ThreadPoolExecutor.
Each implementation class implements the RejectedExecutionHandler interface and overrides its rejectedExecution method.
Therefore, if we want to implement our own rejection policy, we can write a policy class and implement the RejectedExecutionHandler interface, then override the rejectedExecution method.
The rejectedExecution method takes two parameters, Runnable r and ThreadPoolExecutor e — the task being rejected and the current thread pool object.

Finally, when initializing the thread pool, pass in your own policy object to put the custom rejection policy into effect.

Convenient, isn't it? This is the charm of design patterns — open for extension, closed for modification: implement the specified interface and you can plug in whatever behavior you need.
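As a sketch of such a custom policy (the CountingPolicy name and behavior are my own invention for illustration), here is a handler that counts rejections instead of throwing:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomRejectDemo {
    // A hypothetical policy that counts rejections instead of throwing
    static class CountingPolicy implements RejectedExecutionHandler {
        final AtomicInteger rejected = new AtomicInteger();
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // r is the rejected task, executor is the saturated pool
            rejected.incrementAndGet();
        }
    }

    public static int countRejections() throws InterruptedException {
        CountingPolicy policy = new CountingPolicy();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new SynchronousQueue<>(), policy);
        pool.execute(() -> {
            try { release.await(); } catch (InterruptedException ignored) { }
        });
        pool.execute(() -> { }); // pool saturated → routed to our policy
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return policy.rejected.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countRejections()); // prints 1
    }
}
```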
- Second, the CallerRunsPolicy implementation:

It checks whether the pool has been shut down; if not, it calls the task's run method directly, so the task is executed by the caller's own thread rather than handed to the pool.
- Third, the DiscardOldestPolicy implementation:

It also checks whether the pool is shut down; if not, it polls the head of the queue — the oldest task — discards it, and then retries executing the new task.
For the queue's common APIs, you can refer to the figure below; it makes the source easier to read:
- Last, the DiscardPolicy; its implementation:
Just an empty method that does nothing, plain and simple.
The most important method is addWorker, which creates a thread — core or non-core — to perform tasks. The first parameter is the task the new thread should run first, and the second indicates whether it is a core thread (true means core).
For the first parameter, firstTask, the authors interpret it as follows:
the task the new thread should run first (or null if none). Workers are created with an initial first task (in method execute()) to bypass queuing when there are fewer than corePoolSize threads (in which case we always start one), or when the queue is full (in which case we must bypass queue). Initially idle threads are usually created via prestartCoreThread or to replace other dying workers
As I understand it, each task is executed by some thread — core or non-core — and the current task becomes the new thread's first task to run.
Here’s the addWorker source code:
```java
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Refuse new workers if the pool is shutting down, except to drain
        // an already non-empty queue
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            // Depending on core: a core thread must stay within corePoolSize,
            // a non-core thread within maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Increment the worker count via CAS; on success, break out
            // and run the logic below
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            // If the pool state changed, retry from the top
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create the Worker; its constructor produces a new thread via threadFactory
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                // Only proceed while RUNNING, or SHUTDOWN with a null firstTask
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // The thread must not have been started yet
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // Add the new worker to the workers set
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the thread; it begins executing tasks
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // Roll back if the thread failed to start
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
```
The state of a thread pool is obtained through the runStateOf method. Let’s see what the state looks like in the actual source code:
The source shows the thread pool has five states in total, and the comments explain which method drives each transition from one state to another — the source's own comments spell this out clearly.
As can be seen from the above source code, the following things are mainly done:
- Use the core flag to decide the bound: corePoolSize for core threads, maximumPoolSize for non-core.
- Create the worker and create a new thread through threadFactory.
- Add a new thread to workers.
- Finally, the thread is started.
ReentrantLock is used to ensure thread safety, and Worker is used to wrap the created thread:
```java
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    private static final long serialVersionUID = 6138294804551838833L;

    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker runs
        this.firstTask = firstTask;
        // Create a new thread via getThreadFactory(), i.e. the threadFactory
        // passed in when new ThreadPoolExecutor() was called
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }

    // State 0 means the lock is free; non-zero means it is held
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
```
The Worker class extends AbstractQueuedSynchronizer (AQS) and implements Runnable, so it carries AQS's exclusive-lock capability. The AQS state indicates whether the worker's thread is currently busy: holding the lock exclusively (isHeldExclusively, which reads that state value) means the thread is executing a task; not holding it means the thread is idle.

The run method simply calls runWorker(this), so let's look at runWorker next:
```java
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Keep going while we hold a task, or getTask() can fetch one from the queue
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If the pool is stopping, make sure this thread is interrupted
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook invoked before the task runs
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Execute the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook invoked after the task runs
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Clean up (and possibly replace this worker) on exit
        processWorkerExit(w, completedAbruptly);
    }
}
```
As you can see from the source code above, runWorker does several things:
- Get tasks from worker or constantly loop getTask get tasks from queue.
- And finally, the mission.
Let’s look at getTask’s source code:
```java
private Runnable getTask() {
    boolean timedOut = false; // did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Exit if the pool is shutting down and there is nothing left to do
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // allowCoreThreadTimeOut lets idle core threads be reclaimed too;
        // by default only threads above corePoolSize are subject to timeout
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // Timed threads poll with keepAliveTime; the rest block on take()
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
```
Its core is the workQueue.take() / workQueue.poll(...) line: workers loop, fetching tasks from the workQueue and returning them to runWorker — this is the heart of thread reuse.

That wraps up the basic thread pool source walkthrough. I recommend going through the source yourself in detail once more.
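The keepAliveTime mechanism rests on exactly this poll-with-timeout behavior. Here is a standalone sketch (names are mine) of the timed branch: polling an empty queue returns null once the timeout elapses, which getTask treats as "this thread has been idle too long and should exit".

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class GetTaskDemo {
    // Mirrors the timed branch of getTask(): a non-core thread polls with a
    // timeout, and a null return signals that the thread sat idle too long
    public static Runnable pollLikeNonCoreThread(long keepAliveMillis)
            throws InterruptedException {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        // Empty queue → blocks for keepAliveMillis, then returns null
        return workQueue.poll(keepAliveMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(pollLikeNonCoreThread(50)); // prints null
    }
}
```

A non-timed (core) thread instead calls take(), which blocks indefinitely — which is why core threads survive even when the pool is idle.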
Configure the number of threads
Interviewer: do you know how to set the number of threads in a thread pool? I'm sure many developers have been asked this in interviews, and plenty of technical accounts write about it.

In real development, the reference formulas for a reasonable thread count mainly split into IO-intensive and CPU-intensive cases:
- CPU-intensive tasks: since the CPU is constantly busy, keep the thread count small. The reference formula is Ncpu + 1, where Ncpu — the number of available processors — can be obtained via Runtime.getRuntime().availableProcessors().
- IO-intensive tasks: since threads spend much of their time waiting rather than executing, a larger thread count is appropriate. The reference formula is 2 × Ncpu.
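The two formulas can be sketched as follows (the class name is mine; treat the results as starting points, not final answers):

```java
public class PoolSizingDemo {
    // Rule-of-thumb sizes from the formulas above
    public static int cpuBoundSize(int ncpu) { return ncpu + 1; } // CPU-bound: Ncpu + 1
    public static int ioBoundSize(int ncpu)  { return 2 * ncpu; } // IO-bound: 2 * Ncpu

    public static void main(String[] args) {
        int ncpu = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-bound pool size: " + cpuBoundSize(ncpu));
        System.out.println("IO-bound pool size:  " + ioBoundSize(ncpu));
    }
}
```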
In my opinion, though, these formulas are not the final word. In practice we usually set a generous thread count up front, run load tests, monitor how CPU and memory respond, and adjust the thread count from there.
Thread pool design pattern
The most obvious design pattern in thread pools is the strategy pattern (the rejection policies), along with the factory pattern (threadFactory) and the producer-consumer pattern.

Strategy pattern: every rejection policy implements the RejectedExecutionHandler interface and overrides the rejectedExecution method; each policy is a different strategy, chosen when the pool is constructed.
Factory pattern threadFactory: This is too simple to go into detail here.
Producer-consumer pattern: the producer side produces tasks — the execute method either creates a new worker thread or enqueues the new task:
```java
public void execute(Runnable command) {
    ...
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    ...
}
```
The consumers are the worker threads, which loop on getTask to pull tasks off the queue and consume them. The task queue is the intermediary between producers and consumers: producers push tasks into the queue, and consumers take them out:
```java
private Runnable getTask() {
    for (;;) {
        ...
        Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
        if (r != null)
            return r;
        ...
    }
}
```
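Stripped of the pool machinery, the same producer-consumer shape can be sketched with a bare BlockingQueue (names are mine): the producer offers Runnables into the shared queue, and a worker thread takes and runs them.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerConsumerDemo {
    public static int consumeAll(int taskCount) throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); // the shared "workQueue"
        AtomicInteger done = new AtomicInteger();
        // Producer side: like execute() calling workQueue.offer(command)
        for (int i = 0; i < taskCount; i++) {
            queue.offer(done::incrementAndGet);
        }
        // Consumer side: like a worker looping getTask() → workQueue.take()
        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < taskCount; i++) {
                    queue.take().run();
                }
            } catch (InterruptedException ignored) { }
        });
        worker.start();
        worker.join();
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(consumeAll(3)); // prints 3
    }
}
```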
Thread pool interview
Finally, let's talk about thread pool interview questions — most of which have already been covered above. They mainly fall into these categories:
- What do you know about thread pools?
- Have you ever used thread pools? What scenarios are used?
- Do you know how thread pools work?
- Are you familiar with thread pool rejection policies?
- How is the thread count set in the actual use of the thread pool?
- Any experience with actual thread pool tuning?
These categories are basically all explained above. Tuning experience boils down to making the pool run more sensibly: setting a reasonable thread count, choosing the right rejection policy, and splitting thread pools by business.

Some projects funnel everything into a single thread pool, which invites problems: fast and slow tasks pile up together, and the slow ones seriously drag down the pool's efficiency for everything else.

Therefore, it is generally recommended to give different businesses their own thread pools.
Ok, that's all for today's full thread pool walkthrough. I'm Ledu — see you next time. If this article helped you, a like, share, and follow would be much appreciated.