This is the 9th day of my participation in the More text Challenge. For details, see more text Challenge

The execute () principle

The procedure is as follows:

If the workerCount is smaller than the corePoolSize, the new thread executes the Task as a first Task. Add the task to the queue. If the task is added successfully, it still needs to check. Before entering the task,

If the thread pool is closed, the task is removed from the queue;

If there are no workers in the current thread pool and a task has just been added to the queue, ensure that at least one worker in the thread pool can process the task.

If the task cannot be queued, try to add a new thread to execute the task, but this is not always successful, either because the thread pool is shut down or the pool has reached maximumPoolSize. If this fails, the rejection policy is executed.

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // The high 3 bits indicate the status, and the low 29 bits indicate the number of tasks. // If the number of worker threads is smaller than the number of core threads, create a new thread. If (workerCountOf(c) < corePoolSize) {if (workerCountOf(c) < corePoolSize) {if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); If (isRunning(c) && workqueue.offer (command)) {int recheck = ctl.get(); // The second check prevents the thread pool from being closed after the first check. If the thread pool is closed, remove the task from the queue and reject the task if (! isRunning(recheck) && remove(command)) reject(command); // If the number of threads is equal to 0 (threads are dead), for example: Else if (workerCountOf(recheck) == 0) addWorker(null, false); if (workerCountOf(recheck) == 0) addWorker(null, false); } // The thread queue is full. Try to create a new thread to execute the task. The task is rejected after the creation failed. Thread pool closed; MaxPoolSize else if (! addWorker(command, false)) reject(command); }Copy the code

AddWorker method

The addWorker method has two main parts

1: Determine whether the worker can be created. Determine whether to continue creating or return false based on the spin, CAS, CTL, etc. The spin period is usually very short.

2: Synchronously create workder and start thread. AddWorker determines whether new worker threads can be added based on the current working state of the thread pool and given limits (corePoolSize and maximumPoolSize). Each Worker object is an AQS queue.

Private Boolean addWorker(Runnable firstTask, Boolean core) {retry: // Check the thread pool status for (;;) Int c = ctl.get(); int c = ctl.get(); int rs = runStateOf(c); // Thread pool status :RUNNING = -1, SHUTDOWN = 0, STOP = 1, TIDYING = // The state of the thread pool is at least shutdown if (rs >= shutdown && // except the thread pool is shutdown). // If there is a task in the queue, nothing else can be added! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; Worker +1 for (;;) worker+1 for (;;) { int wc = workerCountOf(c); / / worker / / worker is greater than the largest Integer number limit / / or reached boundary limit the if (wc > = CAPACITY | | wc > = (core? corePoolSize : maximumPoolSize)) return false; //CAS worker+1 if (compareAndIncrementWorkerCount(c)) break retry; C = ctl.get(); // Re-read ctl if (runStateOf(c) ! // If the thread pool status changes, retry the outer loop; // else CAS failed due to workerCount change; // workerCount is changed by another thread. // Retry the inner loop CAS for workerCount+1}} Boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); / / 1. The state set to 1, the Worker inherited AbstractQueuedSynchronizer. / / 2. //3. The Worker implements the Runable interface and creates the Thread with this as the incoming parameter. if (t ! = null) {//addWorker needs to lock mainlock.lock (); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c  = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new  IllegalThreadStateException(); workers.add(w); //workers is HashSet<Worker> // Set maximum thread pool size int s = worker.size (); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }Copy the code

Worker

Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

Copy the code

The Worker method implements Runnable, which assigns the constructed Thread to Thread. SetState (-1) in the first step, that is, the synchronization state in AQS is set to -1. The thread pool uses the synchronization state in AQS to determine whether the worker thread can be interrupted. -1: indicates the initialization value. At this time, the worker thread has not been started and there is no need to interrupt. 0: accepts interrupts, and the worker thread is idle. 1: indicates that the worker thread is executing a task. It implements a non-reentrant mutex, which is designed to prevent some of the thread pool’s control methods from acquiring a reentrant lock. Note that the Worker implements locking for a different purpose than traditional locking. This is primarily for controlling whether a thread is interruptible, as well as for other monitoring, such as whether the thread is active (executing a task).

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; // Each worker has its own internal Thread, and the ThreadFactory is null when it fails to create. // Initialize the task, possibly null Runnable firstTask; // the number of completedTasks for each worker is volatile long completedTasks; Worker(Runnable firstTask) {// inhibit interrupts until runWorker // set state to -1 This ensures that we can only interrupt threads setState(-1) from workerQueue getTask; // Stop the thread from being interrupted before it starts this. this.thread = getThreadFactory().newThread(this); } public void run() {// the core method is public void run() {// the core method is public void run(); } // state = 0; Protected Boolean isHeldExclusively() {return getState()! = 0; } // Call this method when setCorePoolSize/shutdown or other methods interrupt the worker thread. Boolean tryAcquire(int unused) {if (compareAndSetState(0, 0, 0); 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } // Call tryRelease to change state=0, locksupport.unpark (thread) the next thread waiting for the lock public Boolean isLocked() {return isHeldExclusively(); } void interruptIfStarted() {Thread t; If (getState() >= 0 && (t = thread)! = null && ! t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }Copy the code

The run() method of the Worker actually calls the runWorker(this), which starts the Worker thread (note that this is not a directly started task) and executes the task in the Worker thread. The worker thread loops around, repeatedly fetching tasks from the queue and executing them.

1. The Worker class is responsible for controlling the state of the running thread. 2. The Worker class is responsible for controlling the state of the running thread. 2.Worker inherits AQS to implement simple operations of lock acquisition and release. To avoid interrupting a thread waiting for a task to execute, interrupt a running thread (the thread has just started, but the task has not yet started). 3. The non-reentrant lock is implemented in order to avoid the implementation of thread pool control status control methods, such as setCorePoolSize, interrupt the starting thread. SetCorePoolSize may call interruptIdleWorkers(), where the worker’s tryLock() method is called to interrupt the thread. Implementing the lock yourself ensures that the worker thread is not interrupted before it starts

The main task of the runWorker is to loop through the tasks and get to getTask() without any tasks. The code is as follows:

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); Boolean completedAbruptly = true; Try {// loop through the task until task = null (thread pool closed, timeout, etc.) // Notice the getTask() method here, the blocking queue we configured will work here while (task! = null || (task = getTask()) ! = null) { w.lock(); // If the thread pool is stopped (in the STOP or TIDYING or TERMINATED state), If ((runStateAtLeast(ctl.get()), make sure the thread is in the RUNNING or SHUTDOWN state. If (runStateAtLeast(ctL.get ()), make sure the thread is in the STOP state. STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt(); Try {// call beforeExecute(wt, task) before the thread runs; Throwable thrown = null; Try {// Here the run method is executed task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally {// Invoke afterExecute(task, thrown) after the thread has run; } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); // Thread exits work}}Copy the code

getTask()

Getting a Task from a queue is roughly divided into the following steps.

Check the status of the thread pool and queue. If the status of the thread pool is above STOP, the thread pool does not process tasks in the queue. If the thread pool is SHUTDOWN but the queue is empty (SHUTDOWN no longer accepts new tasks), the workerCount is subtracted by 1 and null is returned. The worker thread does not actually terminate in processWorkerExit(w, completedAbruptly); In the. If the status check is passed, determine whether to recycle the thread, subtract the workerCount by 1, and return NULL on success. According to timed (timed indicates that a timed idle thread needs to be recollected), select a timed wait or a blocked method to get the task from the queue.

Method works according to the configured workQueue. The blocking principle and timeout principle are implemented based on the blocking queue.

private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? // loop for (;;) { int c = ctl.get(); int rs = runStateOf(c); / / thread thread pool status and whether the queue is empty the if (rs > = SHUTDOWN && (rs > = STOP | | workQueue. IsEmpty ())) {decrementWorkerCount (); return null; } // Count of threads int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // If the current number of threads is greater than the maximum number of threads or) // If the number of threads is greater than 1 or the task queue is empty // There is a problem here. Timed timedOut = false, If (timed && timedOut) has been a false, if ((wc > maximumPoolSize | | (timed && timedOut)) && (wc > 1 | | workQueue. IsEmpty ())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try {Runnable r = timed? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r ! = null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; }}}Copy the code

conclusion

ThreadPoolExecutor is the core of the Worker, the Worker has realized AbstractQueuedSynchronizer and inherited the Runnable. Aqs lock is very clever.