In addWorker method, the thread in Worker will be started after the book is added successfully
w = new Worker(firstTask); final Thread t = w.thread; . if (workerAdded) { t.start(); // Start the thread in worker workerStarted = true; }Copy the code
Now let’s see what the run method does
public void run() {
runWorker(this);
}
Copy the code
runWorker
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; Try {// catch getTask's exception // If the first task is not empty, run the first task // If the first task is empty, loop to get the task from the task queue while (task! = null || (task = getTask()) ! = null) { w.lock(); // If the thread pool stops, make sure the thread is interrupted. Thread is not interrupted by the if ((runStateAtLeast (CTL) get (), STOP) | | (Thread. Interrupted () && runStateAtLeast (CTL) get (), STOP))) &&! wt.isInterrupted()) wt.interrupt(); Try {// Catch business exceptions beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) {thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; ++ w.nlock (); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); //completedAbruptly If the exception is caused by user services, the value is true}}Copy the code
The way to start a thread is fairly simple.
processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); Final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); // Remove from workers} finally {mainlock. unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (! completedAbruptly) {// If there are still tasks in the queue, ensure that at least one thread is executing the task int min = allowCoreThreadTimeOut? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); // If user thread causes exception, add worker}}Copy the code
shutdown
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //CAS changes the thread pool state to SHUTDOWN advanceRunState(SHUTDOWN); // interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }Copy the code
InterruptIdleWorkers Interrupts idle threads
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // w.lock() can get the lock if it is an idle thread. And w.u nlock (); Wrap the code to run the task, the idle thread blocks in runWorker's task = getTask() this if (! t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); }}Copy the code
tryTerminate
final void tryTerminate() { for (;;) { int c = ctl.get(); / / the current state of the thread pool to judge the if (set (c) | | runStateAtLeast (c, TIDYING) | | (runStateOf (c) = = SHUTDOWN &&! workQueue.isEmpty())) return; If (workerCountOf(c)! = 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } // Perform the thread pool state transition final ReentrantLock mainLock = this.mainlock; mainLock.lock(); If (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {terminated hook function terminated(); } finally {// switch the thread pool state to TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }Copy the code
shutdownNow
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //CAS changes the thread pool state to STOP advanceRunState(STOP); // interruptWorkers(); Tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); // Return tasks; }Copy the code
ShutdownNow Process for closing a thread pool:
- Changing the state of the thread pool to STOP blocks all entries from the runWorker, as shown in the following statement:
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();Copy the code
If a thread is still in task.run() while closing the thread pool; When the thread is interrupted, it will throw an exception when it gets the task again. For example, the interrupted thread will run to queue.take() again. Throws an exception directly
public static void main(String[] args) { BlockingQueue queue = new ArrayBlockingQueue(1); Thread thread = new Thread(() -> { long count = 0; for (;;) { if (count < 160) { count ++; System.out.println(count); } break; } try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); }}); thread.start(); thread.interrupt(); System.out.println(thread.isInterrupted()); } return result: true 1 java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220) at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:400) at com.networkbench.tingyun.demo.thread.Main.lambda$main$0(Main.java:19) at com.networkbench.tingyun.demo.thread.Main$$Lambda$1/1936628443.run(Unknown Source) at java.lang.Thread.run(Thread.java:745)Copy the code
- Interrupt all sites that have been started
interruptWorkers();
Copy the code
- Gets all outstanding tasks
tasks = drainQueue();
Copy the code
- Change the thread pool state
tryTerminate()
Copy the code