In Java development we often need to create a thread to perform some task, and doing so is very convenient. But when the number of concurrent threads is large and each thread only runs a short task before finishing, frequently creating threads drags down the whole system, because creating and destroying threads takes time. This is the natural place to use a thread pool.

Benefits of using thread pools:

Reduce resource consumption. Every pooling technique in Java reduces system resource consumption by reusing the objects in the pool. Imagine we have n subtasks to execute. If we created a thread for each subtask, the cost of creating threads would weigh on the system and eventually slow it down. With a thread pool, threads are reused: there may be many tasks, but the threads that execute them come from the pool, which reduces the overhead of thread creation and improves the utilization of system resources.

Reduce the difficulty of managing threads. Thread management is one of the trickiest parts of a multi-threaded environment, and a thread pool lets the framework manage threads for us. We no longer have to worry about when to destroy threads or how to minimize resource contention between them; the thread pool does all of that.

Improve task processing speed. The thread pool keeps a certain number of live threads around for a long time. When a task needs to be executed, we do not have to create a thread first; the pool hands the task to one of its existing live threads.

Obviously, the notable feature of a thread pool is that it “hosts a certain number of live threads for a long time”, avoiding the overhead of frequently creating and destroying threads. How does this work? We know that as soon as a thread finishes executing the code in its run() method, the thread is done and waits to be destroyed. Since the pool keeps “live threads”, they clearly are not destroyed right away. To find out how these “live threads” work, let’s trace the source code and see if we can solve this puzzle.

You can create a variety of thread pools through the Executors factory class:

public static ExecutorService newFixedThreadPool(int var0) {
    return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}

public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService(
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newScheduledThreadPool(int var0) {
    return new ScheduledThreadPoolExecutor(var0);
}

Every type of thread pool is ultimately implemented, directly or indirectly, through the ThreadPoolExecutor class. ThreadPoolExecutor has multiple constructors, and all of them end up calling the constructor with seven arguments.

/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

(1) corePoolSize

As the name implies, this is the number of core threads. When a task is submitted to the thread pool, a new core thread is created to execute it even if other core threads are sitting idle, and this continues until the number of core threads reaches corePoolSize. Once the pool holds the maximum allowed number of core threads, new tasks no longer cause new core threads to be created.

If you want to create and start all core threads ahead of time, call the prestartAllCoreThreads() method of the thread pool.
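For example, a minimal sketch (the pool parameters here are arbitrary, chosen only for illustration):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PrestartDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // Start all core threads now instead of lazily on the first submissions
        int started = pool.prestartAllCoreThreads(); // returns how many threads it started
        System.out.println("core threads started up front: " + started); // expected 4
        pool.shutdown();
    }
}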

(2) maximumPoolSize

As the name implies, this is the maximum number of threads the pool is allowed to create. If the queue is full and the number of threads already created is less than the maximum, the pool creates a new thread to execute the task. This parameter therefore only matters once the queue can fill up; with an unbounded task queue it has no effect.

(3) keepAliveTime

As the name implies, this is the keep-alive time: how long a worker thread in the pool stays alive after it becomes idle. If there are many tasks and each one executes quickly, it is worth raising this value to improve thread utilization: a thread that has just finished one task can pick up the next instead of being terminated. Otherwise a thread would be terminated right after finishing a task, a new thread would have to be created for the next task, and that thread would in turn be terminated shortly after its own task completes, which wastes resources.

Note: this applies to threads beyond the core pool. You can also call allowCoreThreadTimeOut(true) to give the core threads the same limited keep-alive time.
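For instance, a small sketch (the sizes and timeout are arbitrary examples), which lets a fully idle pool shrink all the way to zero threads:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        4, 8, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
pool.allowCoreThreadTimeOut(true); // keepAliveTime now applies to core threads as well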

(4) TimeUnit

As the name suggests, this is the unit of the keep-alive time: the options are DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, and NANOSECONDS.

(5) workQueue

As the name suggests, it refers to a task queue: a blocking queue used to hold tasks waiting to be executed.
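The choice of queue strongly affects how the other parameters behave; a few common options (a sketch, not an exhaustive list):

import java.util.concurrent.*;

BlockingQueue<Runnable> bounded   = new ArrayBlockingQueue<>(100); // bounded: can fill up, so maximumPoolSize and the rejection policy matter
BlockingQueue<Runnable> unbounded = new LinkedBlockingQueue<>();   // effectively unbounded: never "full", so non-core threads are never started
BlockingQueue<Runnable> handoff   = new SynchronousQueue<>();      // no capacity: hands each task straight to a thread (used by newCachedThreadPool)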

(6) threadFactory

As the name implies, it refers to the factory where the threads are created: you can give each created thread a more meaningful name using the thread factory.
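A minimal sketch of such a factory (the name prefix is made up for illustration):

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

ThreadFactory namedFactory = new ThreadFactory() {
    private final AtomicInteger counter = new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable r) {
        // "order-worker-" is an arbitrary example prefix; numbering makes thread dumps readable
        Thread t = new Thread(r, "order-worker-" + counter.getAndIncrement());
        t.setDaemon(false);
        return t;
    }
};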

(7) handler (RejectedExecutionHandler)

As the name implies, this is the rejected-execution handler, which can be understood as a saturation policy: when both the queue and the thread pool are full, the pool is saturated and must adopt some policy for handling newly submitted tasks. The default policy is AbortPolicy, which throws an exception when a new task cannot be processed. The Java thread pool framework has provided the following four policies since JDK 1.5.

AbortPolicy: throws a RejectedExecutionException directly.

CallerRunsPolicy: the caller’s thread runs the task itself, that is, the thread that called execute executes the task.

DiscardOldestPolicy: discards the oldest task at the head of the queue and then tries to execute the current task.

DiscardPolicy: discards the task silently, that is, drops it without processing it and without throwing an exception.
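Putting the seven parameters together, here is a sketch of constructing a pool directly (all values are arbitrary examples):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        5,                                          // corePoolSize
        10,                                         // maximumPoolSize
        20L, TimeUnit.SECONDS,                      // keepAliveTime + unit
        new ArrayBlockingQueue<>(100),              // workQueue (bounded, so saturation is possible)
        Executors.defaultThreadFactory(),           // threadFactory
        new ThreadPoolExecutor.CallerRunsPolicy()); // handler: run overflow tasks in the caller's thread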

These seven parameters together determine the thread pool’s strategy for executing a task:

When a task is added to the thread pool:

  1. If the number of threads does not reach corePoolSize, a new thread (core thread) is created to execute the task

  2. When the number of threads has reached corePoolSize, the task is put into the queue to wait

  3. When the queue is full, a new thread (a non-core thread) is created to execute the task

  4. If the queue is full and the total number of threads has reached maximumPoolSize, the task is handed to the RejectedExecutionHandler (the seventh constructor argument), which by default throws an exception

In other words, core threads are used first; once the core threads are all busy, new tasks join the queue; only when the queue is full do non-core threads start executing tasks.
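A small runnable sketch of this order (the pool sizes and sleep time are arbitrary):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SubmissionOrderDemo {
    public static void main(String[] args) {
        // core = 2, queue capacity = 2, max = 4: tasks 1-2 use core threads,
        // 3-4 wait in the queue, 5-6 start non-core threads, 7 is rejected.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        for (int i = 1; i <= 7; i++) {
            int id = i;
            try {
                pool.execute(() -> {
                    try { Thread.sleep(2000); } catch (InterruptedException ignored) { }
                });
                System.out.println("task " + id + " accepted: poolSize=" + pool.getPoolSize()
                        + ", queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                System.out.println("task " + id + " rejected (default AbortPolicy)");
            }
        }
        pool.shutdown();
    }
}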

The above strategy will become apparent as you read the code, which also reveals how idle threads are reused.

Let’s look at the entry point of the thread pool task.

A thread pool accepts tasks of type Runnable and Callable, through the execute and submit methods respectively. For now we only analyze the execute path.
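A quick sketch of the two submission styles before we dive into the source:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitVsExecuteDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Runnable via execute: fire-and-forget, no return value
        pool.execute(() -> System.out.println("runnable task"));

        // Callable via submit: returns a Future carrying the result
        Future<Integer> future = pool.submit(() -> 1 + 1);
        System.out.println("callable result = " + future.get()); // blocks until the result is ready

        pool.shutdown();
    }
}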

On to the source code:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // Fewer threads than corePoolSize: start a new core thread for this task
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        // Core threads are busy: enqueue the task, then recheck the pool state
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            // Start a non-core thread with a null first task; it will pull tasks from the queue
            addWorker(null, false);
    }
    else if (! addWorker(command, false))
        // The queue is full, so try to start a non-core thread; if that also fails,
        // the maximum pool size has been reached and the task is rejected (the 7th constructor argument)
        reject(command);
}

The method is short and consists of three steps. Two static helper methods appear repeatedly; they determine the thread pool’s state and the number of active worker threads:

private static int runStateOf(int c)    { return c & ~CAPACITY; }

private static int workerCountOf(int c) { return c & CAPACITY; }
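These two methods work on the pool’s ctl field, a single int that packs the run state into the high 3 bits and the worker count into the low 29 bits. The constants below mirror the ones defined inside ThreadPoolExecutor and are reproduced here only to show what CAPACITY masks:

static final int COUNT_BITS = Integer.SIZE - 3;       // 29
static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits hold the worker count
static final int RUNNING    = -1 << COUNT_BITS;       // run states live in the high 3 bits

static int ctlOf(int rs, int wc) { return rs | wc; }  // pack a run state and a worker count into one int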

To summarize, the execution logic of execute is:

  • If the number of active threads is less than the specified number of core threads, a thread is created and started to execute the newly submitted task (the newly created thread is equivalent to the core thread);

  • If the current number of active threads >= the specified number of core threads and the cache queue is not full, the task is added to the cache queue;

  • If the current number of active threads >= the specified number of core threads and the cache queue is full, a thread is created and started to execute the newly submitted task (the newly created thread is equivalent to a non-core thread);

As you can see from the code, when the number of active threads is less than the specified core count, a brand-new thread is started to perform the task; nothing is reused at that point. In fact, we do not see any thread reuse inside the execute method at all, so let’s move on to the addWorker method.

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create a new Worker that wraps the task to execute and creates a new Thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the thread held by the new Worker object; it will run the worker's run() method
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Although the method is a bit long, only two points matter here. First, a Worker object is created; then, once the thread pool’s state checks pass, the thread held by that Worker object is started. In other words, this method starts a thread associated with the worker, but how does that thread execute the Runnable task we passed in? Let’s look at what the Worker object does.

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker. */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && ! t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

The most important part is the constructor:

Worker(Runnable firstTask) { // Worker implements Runnable
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // Create the thread and hand it this worker as its Runnable: once the thread's
    // start() method is called, it is the worker's run() method that gets executed
    this.thread = getThreadFactory().newThread(this);
}

public void run() {
    runWorker(this);
}

The run method in turn calls ThreadPoolExecutor's runWorker method, passing in the current worker object:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask; // Take the worker's initial task
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // The task may be null. When is it null?
        // Either w.firstTask was null (remember execute() passes null to addWorker in its second step),
        // or a previous iteration of the while loop finished and set task = null in the finally block below.
        // In both cases the second condition, getTask(), is evaluated, and if it returns a task the loop continues.
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                ! wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

The method is quite long, but it boils down to three steps:

1. Fetch runnable from worker (this object may be null, as explained in the comment);

2. Enter the while loop and check whether the runnable in the current worker, or a runnable obtained from getTask(), is non-null; if so, run it.

3. In the finally block, set the task back to null so that the next iteration fetches a new one.

If we disregard the second condition of the while loop, then once our thread starts it simply runs through the runWorker method, and the current worker’s run() completes after executing its single task.

Now that run() is done, the thread is useless and just waits for the virtual machine to destroy it. So let’s revisit our goal: how are threads in a Java thread pool reused? So far it does not look like reuse at all: create a new thread, perform a task, done, destroyed. Nothing special. Is there something we are missing or overlooking?

Look again at the second condition of the while loop in this method: (task = getTask()) != null

The trick is in the getTask method.

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // The timed variable decides whether this worker is subject to timeout control:
        // allowCoreThreadTimeOut is false by default, meaning core threads never time out;
        // wc > corePoolSize means the pool currently holds more threads than the core size.
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // If this worker is subject to timeout control and the previous poll from the cache queue
        // timed out (timedOut starts false and is set to true at the bottom of the loop),
        // or the current thread count exceeds maximumPoolSize, try to decrement the worker count by 1.
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // If the decrement succeeds, return null: the while loop in runWorker() exits
            // and the corresponding thread is destroyed, so the pool has one thread fewer.
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // With timeout control, poll() waits at most keepAliveTime for a task;
            // without it, take() blocks until a task can be taken from the queue
            // (that is, until a new task is added).
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

As the comments make clear, getTask does the following in the current thread:

1. If the number of threads in the pool is greater than the core size, or timeout control has been enabled for core threads (in which case every thread is subject to the timeout), the thread fetches a task from the task queue with a timeout (the queue’s poll method). If a task is obtained, it is executed: that is, the run method inside the while loop of runWorker runs it, and when it finishes the thread goes back to getTask for the next task. If no task is obtained within the timeout, execution reaches the third-to-last line of getTask and sets the timedOut flag to true, and the for loop in getTask comes around again. Because the timeout has expired, the method now tries to decrement the worker count by one and returns null. Back in runWorker, the while condition sees that getTask returned null and exits the loop, so the current thread finishes executing, is about to be destroyed, and is naturally reclaimed by the collector: the pool now has one thread fewer. So whenever the pool holds more threads than the core size (or core threads are also allowed to time out), the surplus idle threads are destroyed one by one.

2. If the current number of threads is less than or equal to the core size (and core threads are not allowed to time out), the thread also fetches tasks from the cache queue, but when the queue has no tasks it simply blocks (the queue’s take method) until a task can be taken, that is, until a new task is added to the queue. The thread is therefore blocked rather than destroyed when the queue runs dry. This is what keeps N threads in the pool alive and ready to handle tasks, and it is exactly the reuse we were looking for.
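A stripped-down sketch of the two idle-wait modes getTask relies on (the helper name idleWait is made up for illustration; the real logic lives inside getTask itself):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

static Runnable idleWait(BlockingQueue<Runnable> workQueue,
                         boolean timed, long keepAliveNanos) throws InterruptedException {
    return timed
            ? workQueue.poll(keepAliveNanos, TimeUnit.NANOSECONDS) // null on timeout -> the worker exits and dies
            : workQueue.take();                                    // blocks until a task arrives -> the worker is reused
}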

In summary, threads can be reused because the runWorker method executed by each thread contains a while loop. The first condition of the loop runs the task carried by the Worker object associated with the current thread; after one round completes, the second condition, getTask(), is evaluated: it fetches a task from the task queue, either blocking indefinitely or blocking for at most the keep-alive time, and if that times out the thread reaches the end of its life.

However, in our analysis of execute, all three branches of that method call addWorker, and addWorker creates a new thread to execute the task. Does that mean every call to execute creates a thread? No. The reuse mechanism is closely tied to the pool’s blocking queue: as we saw, when the core threads are all busy but the queue is not full, execute simply adds the task to the queue, and a previously blocked thread is woken up to carry out the new task, so no new thread is created.

Let’s use an example:

Suppose we have a ThreadPoolExecutor with a core size of 5 (core threads not allowed to time out), a maximum of 10 threads, a keep-alive time of 20 seconds, and a LinkedBlockingDeque as the work queue (effectively an unbounded queue).

As we add tasks to this pool one after another, the first step of the execute method we analyzed earlier runs, and 5 threads are created one by one as core threads to execute them. After each thread finishes its associated task, the second condition of its while loop, getTask, tries to fetch a task from the queue. Assuming no new tasks arrive, i.e. execute is not called again, all five threads stay blocked at workQueue.take(). Now suppose we call execute to add a sixth task. This enters the second branch of execute and the task is queued; once it is queued, one of the five previously blocked threads wakes up, takes the new task out, and executes it. (The second branch of execute also performs the recheck via addWorker.)

In our example, since the queue is unbounded, we never reach the third branch of execute, which is where non-core threads are started. With a bounded queue we certainly could.
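A runnable sketch of this scenario (the printed messages and sleep time are arbitrary):

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ReuseDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 10, 20L, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); // unbounded queue

        // The first five tasks each create a core thread (step 1 of execute)
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " running"));
        }

        Thread.sleep(1000); // let the five short tasks finish
        // The five threads are now parked in workQueue.take() inside getTask(), not destroyed
        System.out.println("alive threads: " + pool.getPoolSize()); // expected: 5

        // A sixth task only gets enqueued (step 2 of execute); a parked thread wakes up and runs it
        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " reused"));

        pool.shutdown();
    }
}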

Summary

The analysis above should give a clear answer to the question of how the core threads in a thread pool are reused, and a deeper understanding of how the thread pool is implemented:

When a new task arrives, the pool first checks whether the current number of threads has reached the core size. If not, it simply creates a thread to execute the new task. If it has, the pool checks whether the cache queue is full: if the queue is not full, the new task is put into the queue; if it is full, a new thread is created to execute the task, unless the pool has already reached the specified maximum number of threads, in which case the task is rejected according to the configured policy.

When all tasks in the cache queue have been completed and the number of threads in the pool is greater than the core size, the surplus threads are destroyed until only the core number of threads remains. Those remaining threads are not destroyed; they stay blocked, waiting for new tasks to arrive.

Note: “core thread” and “non-core thread” are virtual concepts, invented only to make the description easier. In the code no thread is marked as a “core thread” or a “non-core thread”; all threads are the same. It is simply that when the pool holds more threads than the specified core size, the extra threads are destroyed, leaving only the specified number in the pool. Which threads get destroyed is arbitrary: it may be the first thread created, the last one, or any other. At first I assumed some threads would be tagged as “core threads” and others as “non-core threads”, and that only the “non-core threads” would be destroyed while the “core threads” were left intact. That understanding is wrong.

The original link: blog.csdn.net/anhenzhufen…

Copyright notice: this article was originally published by CSDN blogger “Running _ Running” under the CC 4.0 BY-SA license. Please include the original source link and this statement when reproducing it.
