You have a dream, you got to protect it. People can’t do something by themselves; They wanna tell you you can not do it. You want something.

First think yes, then ask why!

In the last article, I gave a brief introduction to the Thread pool architecture of Excecutor. In this article, I will explain the principle of ThreadPoolExcecutor, which is often used in development, and why not just create threads using existing thread pools.

In this paper, we use a new diagram to tease out the relationship between the main thread and the task thread and the thread pool



Step: Call the executor () method of ThreadPoolExcecutor to commit a thread and determine the size of the thread in the core thread pool. If it is smaller than the core thread pool size, create a thread. If the used thread is equal to the core thread pool size, the thread is added to the blocking queue. Note whether the thread pool has a maximum number of threads, if so, consider using the maximum thread pool after the core thread pool trial, and determine the usage of the maximum thread pool. In all of the above discussions, thread execution is not freed. If it has been freed, thread execution is pulled directly from the blocking queue.

Constructor of ThreadPoolExcecutor





Constructor parameters:

Number of core threads corePoolSize

Maximum number of threads maximumPoolSize

KeepAliveTime for non-core threads

Unit of time

The buffer queue used by the thread pool workQueue

ThreadFactory

RejectedExecutionHandler Rejection policy

Anomaly analysis:

if(corePoolSize < 0 | | / / if the core is less than zero, throw an exception maximumPoolSize < = 0 | | / / if the thread is less than or equal to 0, the largest Throw an exception maximumPoolSize < corePoolSize | | / / if the maximum number of threads is less than the core number of threads, throw an exception keepAliveTime < 0) / / if the free time is less than zero, Throw new IllegalArgumentException();if(workQueue = = null | | threadFactory = = null | | handler = = null) / / if the blocking queue null, the thread factory is null, refused to strategy is null, Throw new NullPointerException();Copy the code

Thread execution method executor ()

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
        int c = ctl.get();
    if(workerCountOf(c) < corePoolSize) {// The current worker thread is smaller than the core thread pool size, a new worker threadif (addWorker(command.true))
            return;
        c = ctl.get();
    }    
   if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);    
     }    
    else if(! addWorker(command.false))
        reject(command); } 1. If there are fewer threads running than corePoolSize threads, try the following: * Start a new thread task with the given command as the first thread. Calls to addWorker automatically check runState and workerCount, thus preventing false alarms from increasing by returning when they should not be executedfalseThreading. 2. If the task can be queued successfully, then we still need * to carefully check whether we should add threads (because the existing one has died since the last check) or if the pool has been closed since entering this method. So we re-check the status and back out of the queue to stop if necessary, and if not, start a new thread. 3. If we can't queue the task, we try adding a new thread. If we fail, we know we are closed or saturated and therefore reject the task.Copy the code

Execute the method addWorker () method



    private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);        // Check if queue empty only if necessary.
        if(rs >= SHUTDOWN && // The current thread state is greater than SHUTDOWN! (rs == SHUTDOWN && // Current thread state = SHUTDOWN firstTask == null && // Added task is null! Workqueue.isempty ())) // The queue is not emptyreturn false; // If the current thread state is greater than SHUTDOWN, no new tasks can be added; If the current thread status is SHUTDOWN //, no task can be added to the queue. If the queue is empty, no task can be added to the queuefor (;;) {
            int wc = workerCountOf(c);
            if(wc > = CAPACITY | | / / worker CAPACITY greater than or equal to the wc > = (core? CorePoolSize: maximumPoolSize))// Worker is greater than or equal to the number of core pools or the maximum number of threadsreturn false;
            if(compareAndIncrementWorkerCount (c)) / / using the CAS algorithmbreak retry;
            c = ctl.get();  // Re-read ctl
            if(runStateOf(c) ! = rs)// Check the statuscontinue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }    
boolean workerStarted = false;
    boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); // Create a new worker final Thread t = w.tuhread; / / a new threadif(t ! = null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheckwhile holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if(t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); //hashSet
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }            
     } finally {
                mainLock.unlock();
            }
            if(workerAdded) { t.start(); // Start () workerStarted =true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
---------------------------------------------------------------------------------------
 Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public voidrun() {
            runWorker(this);
        }

---------------------------------------------------------------------------------------
 private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if(w ! = null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } } -------------------------------------------------------------------------------------- public synchronized voidstart() {
       
        if(threadStatus ! = 0) throw new IllegalThreadStateException(); group.add(this); boolean started =false;
        try {
            start0();
            started = true;
        } finally {
            try {
                if(! started) { group.threadStartFailed(this); } } catch (Throwable ignore) { } } } ---------------------------------------------------------------------------------- final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly =true;
        try {
            while(task ! = null || (task = getTask()) ! = null) { w.lock(); // If pool is stopping, ensure thread is interrupted; //if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly =false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
-------------------------------------------------------------------------------------
private Runnable getTask(){
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if(r ! = null)return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false; }}}Copy the code

Execute the shutdown() method



checkShutdownAccess(); / / check the thread state -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- private voidcheckShutdownAccess() {
        SecurityManager security = System.getSecurityManager();
        if(security ! = null) { security.checkPermission(shutdownPerm); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {for(Worker w : workers) security.checkAccess(w.thread); } finally { mainLock.unlock(); }}} -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- advanceRunState(SHUTDOWN); / / set the thread state to shutdown -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- private void  advanceRunState(int targetState) {for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }
----------------------------------------------------------------------------------------
interruptIdleWorkers();
----------------------------------------------------------------------------------------
private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
-----------------------------------------------------------------------------------------
private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
--------------------------------------------------------------------------------------
tryTerminate();
---------------------------------------------------------------------------------------
   final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if(workerCountOf(c) ! = 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE);return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return; } } finally { mainLock.unlock(); } / /else retry on failed CAS
        }
    }
Copy the code

Execute the shutdownNow() method



 checkShutdownAccess();
 advanceRunState(STOP);
 interruptWorkers();
 tryTeyminate();
tasks = drainQueue();
------------------------------------------------------------------------------------------
private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if(! q.isEmpty()) {for (Runnable r : q.toArray(new Runnable[0])) {
                if(q.remove(r)) taskList.add(r); }}return taskList;
    }

Copy the code



If there are any omissions in this article, I will give them in the comments section. Meanwhile, if readers find any inadequacies in this article, please point them out in the comments section for joint research.

Special attention: the copyright of this article belongs to the original author!!