This article analyzes the underlying source code of the ThreadPoolExecutor thread pool. How does the thread pool contribute to thread reuse and maintain our thread tasks? Let’s get straight to the point:

First, take a look at the source code for the ThreadPoolExecutor class

1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler) {/ / rejection policies 8 if (corePoolSize < 0 | | 9 maximumPoolSize < = 0 | | 10 maximumPoolSize < corePoolSize || 11 keepAliveTime < 0) 12 throw new IllegalArgumentException(); 13 if (workQueue == null || threadFactory == null || handler == null) 14 throw new NullPointerException(); 15 this.acc = System.getSecurityManager() == null ? 16 null : 17 AccessController.getContext(); This. corePoolSize = corePoolSize; This. MaximumPoolSize = maximumPoolSize; This. WorkQueue = workQueue; KeepAliveTime = unit.tonanos (keepAliveTime); 26 this.threadFactory = threadFactory; This. handler = handler; 29}Copy the code

This is our the parameters of the thread pool instantiation, actually the biggest practical, is the core and set the maximum number of threads, thread the completely rely on personal experience, does not have a real sense of the formula can be applied all the business scenario, bloggers here for everyone to find a article about set the number of threads:

Tech.meituan.com/2020/04/02/…

After our thread pool is initialized, we ourselves will call the excute method to make the pool run our thread tasks. Let’s look at the implementation of this method:

1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 /* 5 * Step 1: Whether the worker thread is smaller than the number of core threads. If it is added to work, worker is actually a thread, but its internal operation is our upload task. Step 2: If it is larger than the number of core threads, it will be added to the worker queue. The implementation method of offer in each different queue is also different. Today, we will mainly discuss this 7 * step 3: Int c = ctl.get(); int c = ctl.get(); 10 if (workerCountOf(c) < corePoolSize) { 11 if (addWorker(command, true)) 12 return; 13 c = ctl.get(); 14 } 15 if (isRunning(c) && workQueue.offer(command)) { 16 int recheck = ctl.get(); 17 if (! isRunning(recheck) && remove(command)) 18 reject(command); 19 else if (workerCountOf(recheck) == 0) 20 addWorker(null, false); 21 } 22 else if (! addWorker(command, false)) 23 reject(command); 24} 25Copy the code

We see that addWorker is executed when the task is called, so what is a worker? Let’s look at an example of its construction: when we look at the Worker class, worker is also a thread

1 private final class Worker 2 extends AbstractQueuedSynchronizer 3 implements Runnable 4 { 5 6 ...... 7 8 Worker(Runnable firstTask) { 9 setState(-1); // inhibit interrupts until runWorker 10 this.firstTask = firstTask; 11 this.thread = getThreadFactory().newThread(this); Public void run() {17 runWorker(this); 18} 19... 20 21}Copy the code

This time let’s look at how addworker works:

1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 int c = ctl.get(); 5 int rs = runStateOf(c); 6 7 // Check if queue empty only if necessary. 8 if (rs >= SHUTDOWN && 9 ! (rs == SHUTDOWN && 10 firstTask == null && 11 ! workQueue.isEmpty())) 12 return false; 13 14 for (;;) { 15 int wc = workerCountOf(c); 16 if (wc > = CAPACITY | | 17 / / do not allow you to create the mission of the core is greater than the maximum number of threads and wc > = (core? corePoolSize : maximumPoolSize)) 19 return false; 20 if (compareAndIncrementWorkerCount(c)) 21 break retry; 22 c = ctl.get(); // Re-read ctl 23 if (runStateOf(c) ! = rs) 24 continue retry; 25 // else CAS failed due to workerCount change; retry inner loop 26 } 27 } 28 29 boolean workerStarted = false; 30 boolean workerAdded = false; 31 Worker w = null; W = new worker (firstTask); w = new worker (firstTask); 35 final Thread t = w.thread; 36 if (t ! = null) { 37 final ReentrantLock mainLock = this.mainLock; 38 mainLock.lock(); 39 try { 40 // Recheck while holding lock. 41 // Back out on ThreadFactory failure or if 42 // shut down before lock acquired. 43 int rs = runStateOf(ctl.get()); 44 45 if (rs < SHUTDOWN || 46 (rs == SHUTDOWN && firstTask == null)) { 47 if (t.isAlive()) // precheck that t is startable 48 throw new IllegalThreadStateException(); 49 workers.add(w); 50 int s = workers.size(); 51 if (s > largestPoolSize) 52 largestPoolSize = s; 53 workerAdded = true; 54 } 55 } finally { 56 mainLock.unlock(); If (workerAdded) {if (workerAdded) {if (workerAdded) {if (workerAdded) {if (workerAdded) {if (workerAdded) {if (workerAdded) {if (workerAdded) { 61 t.start(); 62 workerStarted = true; 63 } 64 } 65 } finally { 66 if (! workerStarted) 67 addWorkerFailed(w); 68 } 69 return workerStarted; 70} 71Copy the code

Now that we have added here, let’s see how it executes our thread. Let’s take a look at the runworker method implementation:

1 final void runWorker(Worker w) { 2 Thread wt = Thread.currentThread(); 3 Runnable task = w.firstTask; 4 w.firstTask = null; 5 w.unlock(); // allow interrupts 6 boolean completedAbruptly = true; 7 While (task!) {8 // * * * * * * * * * * * * * * * * * * * * * * * * = null || (task = getTask()) ! = null) { 10 w.lock(); 11 // If pool is stopping, ensure thread is interrupted; 12 // if not, ensure thread is not interrupted. This 13 // requires a recheck in second case to deal with 14 // shutdownNow race while  clearing interrupt 15 if ((runStateAtLeast(ctl.get(), STOP) || 16 (Thread.interrupted() && 17 runStateAtLeast(ctl.get(), STOP))) && 18 ! wt.isInterrupted()) 19 wt.interrupt(); 20 try { 21 beforeExecute(wt, task); 22 Throwable thrown = null; 25 // Our thread is added to the thread pool. We don't want our thread to run, we just define a method body. 27 task.run(); 27 task.run(); 28 } catch (RuntimeException x) { 29 thrown = x; throw x; 30 } catch (Error x) { 31 thrown = x; throw x; 32 } catch (Throwable x) { 33 thrown = x; throw new Error(x); 34 } finally { 35 afterExecute(task, thrown); 36 } 37 } finally { 38 task = null; 39 w.completedTasks++; 40 w.unlock(); 41 } 42 } 43 completedAbruptly = false; 46 processWorkerExit(w, completedAbruptly); 46 processWorkerExit(w, completedAbruptly); 48 47}}Copy the code

At this time, you should solve a problem is how to reflect the thread reuse of the thread pool, as reflected in gettask, reuse is the worker thread. Well, at this time, not only has the worker been created, but also directly call the start method to start running and execute the added task. If the number of core threads exceeds the number of core threads, the queue should be added to the queue. What queue can we choose from?

So let’s look at BlockingQueue’s offer, poll (if timeout is set), and take methods. BlockingQueue has many implementation classes. ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue, DelayQueue

Let’s start with ArrayBlockingQueue, and let’s look at its constructor

Public ArrayBlockingQueue(int capacity) {// This (capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); If (count == items. Length) return false; if (count == items. Else {// Add task element after array, using loop array algorithm enqueue(e); return true; } } finally { lock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; Nanos = notempty.aWaitNanos (nanos); } return dequeue(); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); Try {while (count == 0) // wait to be waked, notempty.await (); return dequeue(); } finally { lock.unlock(); }}Copy the code

ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue There is no difference between the three methods for storing and fetching tasks and ArrayBlockingQueue, which we won’t go into too much detail about. The main thing is that a node blocking queue uses a different locking mechanism than an ArrayBlockingQueue, mainly because arrays are an object and nodes operate on the head and tail of the queue. Therefore, two taskLock and putLock locks are used, which do not conflict with queue fetching and queue adding

Public LinkedBlockingQueue() {this(integer.max_value); this(integer.max_value); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }Copy the code

PriorityBlockingQueue, in general, is a priority task queue. Since the task implements the Comparator interface, look at its constructor.

Public PriorityBlockingQueue() {public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null); } public PriorityBlockingQueue(int initialCapacity) {this(initialCapacity, null); } public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); This.parator = comparator; this.queue = new Object[initialCapacity]; } public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; While ((n = size) >= (cap = (array = queue).length)) try { Comparator<? super E> cmp = comparator; If (CMP == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; Try {// Priority comparison is performed each time a task is fetched, While ((result = deQueue ()) == null && nanos > 0) nanos = notempty.aWaitNanos (nanos); } finally { lock.unlock(); } return result; } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; While ((result = dequeue()) == null) notempty.await (); while (result = dequeue()) == null) notempty.await (); } finally { lock.unlock(); } return result; }Copy the code

SynchronousQueue is a special queue with a fixed length of 1 that executes tasks in real time. The task element is added to the queue only when a worker task has completed and another task is fetched. Otherwise, null is returned. This is ideal for scenarios where a request requires multiple services to be pulled at the same time;

    public SynchronousQueue() {
        this(false);
    }

 public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
Copy the code

In addition, the operations of the offer, take and poll methods mentioned above are all operated by a transfer method. Let’s focus on the differences between these two classes in this method. Take a look at the simple TransferQueue;

Transfer (e, true, 0); transfer(e, true, 0); Transfer (null, false, 0) E transfer(E E, Boolean timed, long nanos) { // If the current queue is empty, return false if it is empty, and the thread pool will create a new worker thread. Is not an empty queue will wake up to get the task worker thread and return the data / / 3, if it is to take data, to determine whether an empty queue, and blocks the current thread to wait for the newly created node wake, put the data is not empty queue (to put it another way already have a thread is waiting for the task), throw out node, continue to go down, QNode s = null; QNode s = null; // constructed/reused as needed boolean isData = (e ! = null); for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null) // saw uninitialized value continue; // spin if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; if (t ! = tail) // inconsistent read continue; if (tn ! = null) { // lagging tail advanceTail(t, tn); continue; } if (timed && nanos <= 0) // can't wait return null; if (s == null) s = new QNode(e, isData); if (! t.casNext(null, s)) // failed to link in continue; advanceTail(t, s); // swing tail and wait Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // wait was cancelled clean(t, s); return null; } if (! s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x ! = null) // and forget fields s.item = s; s.waiter = null; } return (x ! = null) ? (E)x : e; } else { // complementary-mode QNode m = h.next; // node to fulfill if (t ! = tail || m == null || h ! = head) continue; // inconsistent read Object x = m.item; if (isData == (x ! = null) || // m already fulfilled x == m || // m cancelled ! m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); return (x ! = null) ? (E)x : e; }}}Copy the code

Don’t know why you see here to see if there is doubt, when more than one worker thread started to get the task, had been waiting node will be thrown out, leaves only the latest to queue waiting for the node, other nodes would not be aroused, actually this is my question, I don’t know if you noticed, but hope experts can give an explanation. Let’s look at another TransferStack. This is the node type used in the default thread pool queue initialization. This is easier to understand and easier to understand. Let’s take a look at the source code:

E transfer(E E, Boolean timed, long nanos) {// Transfer (E E, Boolean timed, long nanos) {// Transfer (E E, Boolean timed, long nanos) {// Transfer (E E, Boolean timed, long nanos) {// Transfer (E E, Boolean timed, long nanos) { If data is saved, null is directly returned when the node is empty, so that the thread pool can create worker thread to run the task. If there are waiting nodes, the current task data will be saved, and the data nodes stored and waiting nodes will be removed. The waiting node will be assigned the value of the saved task and woken up. SNode s = null; SNode s = null; SNode s = null; SNode s = null; SNode s = null; // constructed/reused as needed int mode = (e == null) ? REQUEST : DATA; for (;;) { SNode h = head; if (h == null || h.mode == mode) { // empty or same-mode if (timed && nanos <= 0) { // can't wait if (h ! = null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // wait was cancelled clean(s); return null; } if ((h = head) ! = null && h.next == s) casHead(h, s.next); // help s's fulfiller return (E) ((mode == REQUEST) ? m.item : s.item); } } else if (! isFulfilling(h.mode)) { // try to fulfill if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match s.casNext(m, mn); // help unlink } } } else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } }Copy the code

Now let’s talk about DelayQueue, which is associated with PriorityQueue, what’s the connection? Let’s look at its source;

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private final transient ReentrantLock lock = new ReentrantLock(); Private final PriorityQueue<E> q = new PriorityQueue<E>(); } // The comparator is this, defining Comparable<Delayed>, Public interface Delayed extends Comparable<Delayed> {/** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }Copy the code

The offer, poll, and take elements will not be explained. Removing elements is an extra step to determine whether the specified time has been exceeded or reached, otherwise it will cause the current thread to wait for the rest of the time instead of the spin

Finally, a summary of thread pools and the principle of queues used:

Why thread pools are more efficient and convenient than creating our own threads? The first point is that thread pools have helped us encapsulate and manage threads. For example, the producer consumer mode enables our thread pools to efficiently use CPU to process tasks, and we can also determine which queue to use for our business scenarios. The second thing is that thread pools help us reuse threads, rather than discarding a thread after processing a task;

ArrayBlockingQueue and LinkedBlockingQueue handle node storage types differently, one is array, one is node type, and LinkedBlockingQueue uses store lock and fetch lock. The two operations do not conflict. ArrayBlockingQueue, on the other hand, uses an exclusive lock to fetch and store data.

PriorityBlockingQueue and DelayQueue, although they both use PriorityQueue internally, PriorityBlockingQueue doesn’t force you to use either comparator. DelayQueue must use a time comparator to be more limited and specify the type of task.

Finally, SynchronousQueue, which is a little bit more special than the other queues, is a SynchronousQueue, which means that the queue does not store the task data, but has to have a waiting node that is fetching it before it can be temporarily queued and immediately retrieved, or not queued at all. Instead, it replaces the task in the wait queue and wakes up the wait queue so that the arriving task is not stored in the queue. This means that no buffered task is put into a queue, more like a real-time task is pulled out and processed.

If you’re using thread pool closers, it’s best to customize the thread pool parameters yourself instead of using Executors’ default parameters to create a thread. Because write queue length is too large, will cause the program crash

Author: light rain hard links: www.cnblogs.com/guoxiaoyu/p…