Preface
Thread pool series:
The thread return value of a Java thread pool
The Java thread pool is one of the fundamentals you need to master for both interviews and day-to-day work. A thread pool makes it easier to plan an application's CPU usage and keeps the application running smoothly. This article explores how thread pools are used and how they work. By the end, you will understand:
1. Why we need a thread pool
2. How to implement a simple thread pool
3. How the thread pool works
4. Summary
1. Why do we need thread pools
Origin of the term
A pool, as the name suggests, is a place to hold a pile of things. Take things from the pool when you need them, and put them in the pool when you don’t need them. You can see that storage and access are very convenient.
Thread pool analogy
Consider getting vaccinated. Before giving a shot, the doctor has to put on protective clothing and gloves, prepare the vaccine, set up the computer to record information, and so on.
Recipient 1 arrives at the hospital; the doctor gets everything ready, gives recipient 1 the shot, and then takes the gear off. Just then recipient 2 arrives, and the doctor has to gear up all over again. The doctor's preparation is clearly time-consuming and laborious. If shots are given during a fixed time slot instead, the doctor does not need to change gear so often, which saves the doctor's time and also shortens the recipients' wait. See the diagram below:
Why do we need thread pools
As the example shows, because the doctor's preparation is so laborious, it is better to give shots continuously for a while before changing gear. The same is true for threads in a computer:
1. Creating and switching threads requires context switches, which involve system calls and consume system resources.
2. Creating a thread takes time, and a task can only run once its thread has been created successfully.
3. Threads are hard to manage effectively once they have been started.
For these reasons, thread pools need to be introduced.
2. Implement simple thread pools yourself
A simple Demo
private void createThread() {
    Thread thread = new Thread(() -> {
        System.out.println("thread running...");
    });
    thread.start();
}
The thread completes execution and terminates. Now we want it not to terminate:
private void createThread() {
    Thread thread = new Thread(() -> {
        while (true) {
            System.out.println("thread running...");
        }
    });
    thread.start();
}
While the thread is running, outside code needs a way to hand it a Runnable, which requires a shared variable. A queue is chosen as the shared variable:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ThreadPool {
    // Queue shared between the submitting thread and the worker thread
    BlockingQueue<Runnable> shareQueue = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool();
        threadPool.createThread();
        threadPool.startRunnable();
    }

    private void createThread() {
        Thread thread = new Thread(() -> {
            while (true) {
                try {
                    // Blocks until a task is available
                    Runnable targetRunnable = shareQueue.take();
                    targetRunnable.run();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread.start();
    }

    public void startRunnable() {
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println("run Runnable " + finalI);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            System.out.println("put in queue runnable " + i);
            shareQueue.offer(runnable);
        }
    }
}
The demo above starts only one thread, and that thread runs an infinite loop. The thread body takes a Runnable from the shared queue shareQueue: if the queue has an element, the thread runs it; if not, the thread blocks. An external caller puts a Runnable into the shared queue and waits for the thread to execute it. This way a single thread can carry out many different tasks, without starting a new thread for every task.
3. Thread pool principle
Basic composition of a thread pool
The thread pool above works, but it is a bare-bones version with obvious defects:
1. The thread keeps running and cannot be stopped.
2. Only one thread executes tasks, so all other tasks have to queue.
3. The queue can grow without bound, consuming memory.
4. Other shortcomings...
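To make the first three defects concrete, here is a minimal sketch of how the demo could be extended. It is not how the JDK does it; the class name SimplePool, its methods, and the choice of two workers are assumptions made purely for illustration. It uses several worker threads, a bounded queue, and interruption as the stop signal.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the JDK.
class SimplePool {
    private final BlockingQueue<Runnable> queue;
    private final Thread[] workers;

    SimplePool(int threads, int queueCapacity) {
        queue = new ArrayBlockingQueue<>(queueCapacity); // bounded queue (defect 3)
        workers = new Thread[threads];                   // several workers (defect 2)
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(this::workLoop, "simple-pool-" + i);
            workers[i].start();
        }
    }

    private void workLoop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                queue.take().run(); // blocks while the queue is empty
            }
        } catch (InterruptedException e) {
            // interrupted while waiting: leave the loop and let the thread end
        }
    }

    /** Returns false instead of blocking when the queue is full. */
    boolean submit(Runnable task) {
        return queue.offer(task);
    }

    /** Interrupts every worker so that its loop exits (defect 1). */
    void shutdown() {
        for (Thread t : workers) {
            t.interrupt();
        }
    }

    /** Runs n counting tasks on two workers and returns how many completed. */
    static int runDemo(int n) {
        SimplePool pool = new SimplePool(2, n);
        AtomicInteger done = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            pool.submit(() -> {
                done.incrementAndGet();
                latch.countDown();
            });
        }
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runDemo(5));
    }
}
```

Even this version leaves open what to do when submit returns false and how many threads to keep alive, which is where the real ThreadPoolExecutor comes in.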
The Java thread pool is a general-purpose library, so let's look at how it is implemented. The core of the thread pool revolves around a single atomic variable: ctl
#ThreadPoolExecutor.java
// Initial state: RUNNING, with 0 threads
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// The low 29 bits hold the number of threads
private static final int COUNT_BITS = Integer.SIZE - 3;
// Maximum number of threads: 2^29 - 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// The high 3 bits hold the thread pool state
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Extract the thread pool state
private static int runStateOf(int c) { return c & ~CAPACITY; }
// Extract the number of threads
private static int workerCountOf(int c) { return c & CAPACITY; }
// Store the state and the number of threads in a single int
private static int ctlOf(int rs, int wc) { return rs | wc; }
An int has 32 bits. There are five thread pool states, so three bits are needed to tell them apart. The high 3 bits of ctl hold the state, which leaves the remaining 29 bits for the number of threads.
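The packing can be tried out in isolation. The sketch below copies the constants and helper methods into a hypothetical class CtlDemo (the class name and the sample value of 5 threads are assumptions for illustration) and shows that the state and the thread count can be recovered from one int.

```java
// Hypothetical demo class that mirrors the ThreadPoolExecutor constants.
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits all set
    static final int RUNNING    = -1 << COUNT_BITS;       // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;       // high bits 000
    static final int STOP       =  1 << COUNT_BITS;       // high bits 001

    static int ctlOf(int rs, int wc) { return rs | wc; }
    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                        // RUNNING with 5 threads
        System.out.println(Integer.toBinaryString(c));    // 111, then 26 zeros, then 101
        System.out.println(runStateOf(c) == RUNNING);     // true
        System.out.println(workerCountOf(c));             // 5
    }
}
```

Because RUNNING is negative and the other states are zero or positive, the states are ordered numerically, which is why the source can compare them with expressions such as rs >= SHUTDOWN.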
Thread pools perform tasks
ThreadPoolExecutor is the core thread pool class. It implements the Executor interface and provides the execute(Runnable) method. When an external caller needs to run a task, it simply calls ThreadPoolExecutor.execute(Runnable), and a thread in the pool runs the Runnable, that is, performs the task.
#ThreadPoolExecutor.java
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // Read the current state and thread count
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // Fewer threads than corePoolSize: create a core thread to run the new task
        if (addWorker(command, true))
            return;
        // Creation failed, so re-read ctl
        c = ctl.get();
    }
    // The pool is running and the task was queued successfully
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Check again whether the thread pool is still running.
        // If it is not, remove the task from the queue,
        // and reject it if the removal succeeded
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // No thread is alive, so create a non-core thread.
        // The task is already in the queue, so the first argument is null
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Queuing failed: try to create a non-core thread to run the task
    else if (!addWorker(command, false))
        // That failed as well: reject the task
        reject(command);
}
As you can see, the value of ctl is read and checked several times. Multiple threads may modify ctl at the same time, and ctl is an AtomicInteger built on CAS. CAS is lock-free, so a value that was just read may already be stale, which is why the state has to be re-read and re-checked. Thread pools rely on CAS in many places, so understanding CAS goes a long way toward understanding thread pools. For CAS usage and principles, see: Java Unsafe, CAS, and LockSupport
The above code is shown in a flowchart as follows:
A few concepts of thread pools
The above process involves several concepts:
1. Core threads, number of core threads 2. Non-core threads 3
Many concepts in the computer world have counterparts in the real world. Take vaccination again:
1. A vaccination site has only 4 doctors who can give shots. These 4 doctors can be likened to 4 threads. Because they are permanently stationed at the site, they are the core doctors -> core threads.
2. 4 doctors can only vaccinate 4 people at a time. When more than 4 people arrive at once, the rest have to line up in order -> the waiting queue.
3. One day so many people come that the line cannot get any longer, so doctors from other sites are called in to help. These doctors are not stationed here and go back once the rush is over -> non-core threads.
4. Core threads + non-core threads == maximum number of threads (like the maximum number of doctors the site can hold).
5. Some sites are short of doctors: the line is full and no outside doctors are available, or even with outside help there are still more people than the site can handle. New arrivals then cannot be vaccinated and are turned away -> task rejection.
What is the difference between core threads and non-core threads?
1. While the number of threads is below the core pool size, a new core thread is created to run each newly arrived task.
2. Core threads stay resident in the thread pool, waiting for new tasks, unless core thread timeout (allowCoreThreadTimeOut) is enabled.
3. A non-core thread, whether or not core thread timeout is enabled, exits once it has finished its task and no new task arrives within the keep-alive time.
In fact, the thread pool plays down the identity of individual threads: it only cares that a task gets executed, not which thread executes it. The main difference between core and non-core threads is therefore whether they stay resident in the thread pool.
Core threads, wait queues and non-core threads execute tasks in the following order:
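The order can also be observed directly. The following sketch is only an illustration: the class name SubmitOrderDemo and the deliberately tiny sizes (1 core thread, at most 2 threads, a queue of capacity 1) are assumptions chosen so that four submissions hit each branch of execute() once. Every task blocks on a latch, so no thread becomes free during the experiment.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the pool sizes are chosen only for illustration.
class SubmitOrderDemo {
    /** Returns {pool size, queue size, rejected count} after four submissions. */
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        // Each task waits on the latch so that its thread stays busy
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                // 1st: core thread, 2nd: queue, 3rd: non-core thread, 4th: rejected
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        int[] result = { pool.getPoolSize(), pool.getQueue().size(), rejected };
        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("threads=" + r[0] + ", queued=" + r[1] + ", rejected=" + r[2]);
    }
}
```

The rejection surfaces as a RejectedExecutionException because the constructor used here falls back to the default AbortPolicy handler.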
Thread pools manage threads
One important method mentioned above is addWorker(xx).
#ThreadPoolExecutor.java
private boolean addWorker(Runnable firstTask, boolean core) {
    // Label used to break out of or continue the outer loop
    retry:
    //--------------------(1)
    for (;;) {
        int c = ctl.get();
        // Extract the thread pool state
        int rs = runStateOf(c);

        // Refuse to add a worker once the pool is shutting down, except for a
        // task-less worker needed to drain a non-empty queue in SHUTDOWN
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        for (;;) {
            // Extract the number of threads
            int wc = workerCountOf(c);
            // A core thread is requested but the core threads are already full
            // (other threads added theirs first), or a non-core thread is requested
            // but the total would exceed the maximum: the worker cannot be added
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS-increment the thread count in ctl.
            // If it fails (someone else changed ctl), keep looping
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // re-read ctl
            // If the state has changed, retry from the outer loop
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create the Worker, hand it the task; the Worker creates its Thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            //-----------------(2)
            // Lock
            mainLock.lock();
            try {
                // Re-check the state while holding the lock
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // Add the worker to the workers set
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // The worker was added
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the thread
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    // Return whether the worker thread was started
    return workerStarted;
}
This method takes two arguments: the first is the Runnable to execute, and the second indicates whether the thread being added is a core thread. Two points are marked in the code above: (1) Why is an infinite loop needed? Because ctl may be modified by several threads at once, and ctl is an AtomicInteger backed by CAS, the CAS update can fail. The loop re-reads ctl and retries until the update succeeds or the conditions no longer allow a worker to be added.
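The retry pattern from point (1) can be shown on its own. The sketch below is not the JDK code: the class BoundedCounter and its methods are hypothetical names, and it keeps only the idea of the inner loop, namely read the value, check the limit, attempt a CAS, and re-read if another thread won the race.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; it imitates only the CAS loop of addWorker.
class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) {
        this.limit = limit;
    }

    /** Increments the count unless the limit is reached; retries when the CAS loses a race. */
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;               // like "wc >= maximumPoolSize"
            if (count.compareAndSet(c, c + 1))
                return true;                // like "break retry"
            // another thread changed count in between: re-read and try again
        }
    }

    int get() {
        return count.get();
    }

    /** Lets several threads race on one counter and returns the final count. */
    static int race(int limit, int threads, int attemptsPerThread) {
        BoundedCounter counter = new BoundedCounter(limit);
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < attemptsPerThread; j++) {
                    counter.tryIncrement();
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(race(10, 4, 100)); // never exceeds the limit of 10
    }
}
```

Without the loop, a failed compareAndSet would silently drop an increment; without the limit check inside the loop, two racing threads could both pass a check made on a stale value.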
(2) Why do we need locks? Because workers is declared as:
private final HashSet<Worker> workers = new HashSet<>();
This collection can be accessed by multiple threads, and HashSet is not thread-safe, so add and remove must be done while holding mainLock. Separately, Worker extends AbstractQueuedSynchronizer (AQS) and uses it to implement its own simple lock (similar to ReentrantLock, but non-reentrant).
The main information is as follows:
Thread objects are stored in workers, and the thread pool controls the number of core/non-core threads by managing workers.
Thread pool management tasks
Now that the thread has been created, let's see how it executes tasks. Worker implements the Runnable interface, so it must implement run(). When the Worker constructs its Thread object, it passes itself in as the Thread's Runnable. Therefore, what actually runs after Thread.start() is Worker.run(), which calls runWorker(xx). Take a look at the source code:
#ThreadPoolExecutor.java
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //------------>(1)
        while (task != null || (task = getTask()) != null) {
            // Lock the worker
            //------------>(2)
            w.lock();
            // If the pool is stopping and the thread has not been interrupted yet,
            // interrupt the thread
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Callback (hook) before the task runs
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Actually run the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Callback (hook) after the task runs
                    afterExecute(task, thrown);
                }
            } finally {
                // Clear the task reference
                task = null;
                w.completedTasks++;
                // Release the lock
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // The thread is about to exit
        processWorkerExit(w, completedAbruptly);
    }
}
(1) The original thread pool demo showed how to keep a thread running with a loop. Here the loop condition checks two things:
1. Whether the worker has a first task associated with it. If so, that task is taken and run. This covers the case where a new core or non-core thread was created for a task that did not go through the queue: as soon as the thread starts, it runs its associated task.
2. If there is no first task, either because the thread has already run it or because the thread was created with a null first task while tasks were waiting in the queue, the thread takes a task from the queue via getTask() and runs it.
getTask() is implemented as follows:
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // The pool is shutting down and no task is waiting, or the pool has stopped
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // Decrease the thread count
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 1. allowCoreThreadTimeOut: whether core threads may be closed on timeout
        // 2. Whether the thread count exceeds the core pool size
        //    (that is, non-core threads have been started)
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // Four combinations are possible here;
        // timedOut means the previous poll on the queue timed out
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // On timeout, or when the queue is empty, try to decrease the thread count.
            // Return null if the count was changed successfully
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // Fetch with a timeout if timed, otherwise block until a task arrives
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // No element was obtained before the timeout
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
As can be seen:
1. When no timeout applies, a core thread blocks in BlockingQueue.take() until a new task is submitted to the thread pool. This is why core threads can stay resident in the thread pool.
2. When the number of threads exceeds the number of core threads, timed is true, so the thread fetches queue elements with a timeout (poll with keepAliveTime). If no element arrives before the timeout, getTask() returns null and the thread exits. This is why non-core threads do not stay resident in the thread pool.
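The difference between the two fetch modes is easy to reproduce with a plain BlockingQueue. In the sketch below, the class FetchDemo and the helper fetch are hypothetical names; only poll(timeout, unit) and take() are real BlockingQueue methods. A null result stands for "this worker would exit".

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class imitating the fetch step of getTask().
class FetchDemo {
    /**
     * timed == true:  wait at most keepAliveNanos, then give up and return null.
     * timed == false: block until a task arrives.
     */
    static Runnable fetch(BlockingQueue<Runnable> queue, boolean timed, long keepAliveNanos) {
        try {
            return timed
                    ? queue.poll(keepAliveNanos, TimeUnit.NANOSECONDS)
                    : queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

        // Empty queue, timed fetch: comes back with null after about 50 ms
        Runnable r = fetch(queue, true, TimeUnit.MILLISECONDS.toNanos(50));
        System.out.println(r == null ? "timed out, worker would exit" : "got a task");

        // Non-empty queue, untimed fetch: returns immediately with the task
        queue.offer(() -> System.out.println("task running"));
        fetch(queue, false, 0).run();
    }
}
```

An untimed fetch on an empty queue would simply never return until something is offered, which is exactly the behavior that keeps a core thread parked in the pool.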
(2) Why is a lock needed here? As mentioned earlier, the workers collection needs a lock, mainLock, so that multiple threads can safely add to and remove from it. Each Worker, however, is only run by one thread at a time, so locking it seems unnecessary. In fact, the Worker lock serves a more important purpose:
It is a neat way to tell whether a thread is busy. The worker acquires its lock before calling Runnable.run() and releases it when the task finishes. So if a Worker's lock is not held, the worker is waiting for an element from the queue, which means it is idle.
Whether a thread is idle in turn gives outside callers a basis for deciding which threads to interrupt when stopping the pool.
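The idea of using a lock as a busy flag can be sketched with a few lines of AQS. The class BusyFlag and its isIdle method below are hypothetical and much simpler than the real Worker; the sketch assumes only what is described above, namely a non-reentrant lock that is held while a task runs.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class, loosely modeled on the Worker lock described above.
class BusyFlag extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int unused) {
        // State 0 = free, 1 = held. Non-reentrant: fails if already held,
        // even when the caller is the thread that holds it.
        return compareAndSetState(0, 1);
    }

    @Override
    protected boolean tryRelease(int unused) {
        setState(0);
        return true;
    }

    void lock()       { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }

    /** The owner is idle if the lock can be taken; the probe releases it again at once. */
    boolean isIdle() {
        if (tryLock()) {
            unlock();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        BusyFlag flag = new BusyFlag();
        System.out.println("before task: idle=" + flag.isIdle());
        flag.lock();                       // what a worker does before task.run()
        System.out.println("during task: idle=" + flag.isIdle());
        flag.unlock();                     // what a worker does after the task
        System.out.println("after task:  idle=" + flag.isIdle());
    }
}
```

Non-reentrancy matters here: with a reentrant lock, a probe made from the worker's own thread while a task is running would succeed and wrongly report the worker as idle.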
4. Summary
At this point, we understand the core advantage of thread pools: threads are reused instead of being created over and over. Blocking on the queue is what keeps a thread resident, as shown in the figure below:
For space reasons, thread pool shutdown, some important runtime states of the thread pool, and some simple ways to create a thread pool will be analyzed in the next article. Stay tuned!
If the demo code helped, please give it a star on GitHub.