Threads
The JVM uses the kernel-level thread (KLT) model: Java threads map 1:1 onto OS threads, so every Java thread has a corresponding thread in the operating system. A Java thread passes through several lifecycle states, which are represented by the State enumeration nested in the Thread class (java.lang.Thread.State).
public enum State {
    /**
     * Thread state for a thread which has not yet started.
     */
    NEW,

    /**
     * Thread state for a runnable thread. A thread in the runnable
     * state is executing in the Java virtual machine but it may
     * be waiting for other resources from the operating system
     * such as processor.
     */
    RUNNABLE,

    /**
     * Thread state for a thread blocked waiting for a monitor lock.
     * A thread in the blocked state is waiting for a monitor lock
     * to enter a synchronized block/method or
     * reenter a synchronized block/method after calling
     * {@link Object#wait() Object.wait}.
     */
    BLOCKED,

    /**
     * Thread state for a waiting thread.
     * A thread is in the waiting state due to calling one of the
     * following methods:
     * <ul>
     *   <li>{@link Object#wait() Object.wait} with no timeout</li>
     *   <li>{@link #join() Thread.join} with no timeout</li>
     *   <li>{@link LockSupport#park() LockSupport.park}</li>
     * </ul>
     *
     * <p>A thread in the waiting state is waiting for another thread to
     * perform a particular action.
     *
     * For example, a thread that has called <tt>Object.wait()</tt>
     * on an object is waiting for another thread to call
     * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
     * that object. A thread that has called <tt>Thread.join()</tt>
     * is waiting for a specified thread to terminate.
     */
    WAITING,

    /**
     * Thread state for a waiting thread with a specified waiting time.
     * A thread is in the timed waiting state due to calling one of
     * the following methods with a specified positive waiting time:
     * <ul>
     *   <li>{@link #sleep Thread.sleep}</li>
     *   <li>{@link Object#wait(long) Object.wait} with timeout</li>
     *   <li>{@link #join(long) Thread.join} with timeout</li>
     *   <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
     *   <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
     * </ul>
     */
    TIMED_WAITING,

    /**
     * Thread state for a terminated thread.
     * The thread has completed execution.
     */
    TERMINATED;
}
The transitions between thread states are as follows:
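Some of these transitions can be observed directly with Thread.getState(). The following is a minimal sketch rather than anything from the JDK: the class ThreadStateDemo and its helpers awaitState and observe are names made up for this illustration. It blocks a thread on a CountDownLatch, which parks it through LockSupport.park and therefore shows up as WAITING.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not part of the JDK.
class ThreadStateDemo {

    /** Spins until t reaches the wanted state or about 5 s pass, then returns the state last seen. */
    static Thread.State awaitState(Thread t, Thread.State wanted) {
        long deadline = System.nanoTime() + 5_000_000_000L;
        while (t.getState() != wanted && deadline - System.nanoTime() > 0) {
            Thread.yield();
        }
        return t.getState();
    }

    /** Returns the states seen before start, while blocked, and after exit. */
    static Thread.State[] observe() {
        CountDownLatch release = new CountDownLatch(1);
        Thread t = new Thread(() -> {
            try {
                release.await();                  // parks the thread: WAITING
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread.State beforeStart = t.getState();  // not started yet: NEW
        t.start();
        Thread.State whileBlocked = awaitState(t, Thread.State.WAITING);
        release.countDown();                      // lets run() return
        Thread.State afterExit = awaitState(t, Thread.State.TERMINATED);
        return new Thread.State[] { beforeStart, whileBlocked, afterExit };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // [NEW, WAITING, TERMINATED]
    }
}
```

RUNNABLE and BLOCKED are harder to pin down from the outside because the thread may leave them at any moment, which is why the sketch only samples states the thread is guaranteed to stay in.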
The thread pool
A thread pool, as the name implies, is a cache of threads. Threads are a scarce resource: creating them without limit not only consumes system resources but also reduces system stability, so Java provides thread pools to allocate, tune, and monitor threads in a uniform way.
Executor framework
As the figure shows, Executor has an important sub-interface, ExecutorService, which defines the concrete behavior of a thread pool:
- execute(Runnable command): executes a task of type Runnable (inherited from Executor);
- submit(task): submits a Callable or Runnable task and returns a Future object representing that task;
- shutdown(): stops accepting new tasks and shuts down once the already submitted tasks have completed;
- shutdownNow(): attempts to stop all tasks that are executing, does not start the tasks that are still waiting, and shuts down;
- isTerminated(): tests whether all tasks have completed following shutdown;
- isShutdown(): tests whether the ExecutorService has been shut down.
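The methods above can be exercised in a few lines. This is a small sketch under assumptions: the class ExecutorServiceDemo and its lifecycle method are invented for the example, and Executors.newFixedThreadPool is used only as a convenient way to obtain an ExecutorService.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class ExecutorServiceDemo {

    /** Runs one Runnable and one Callable, shuts the pool down, and reports what was observed. */
    static String lifecycle() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            pool.execute(() -> { });                            // Runnable: no result
            Future<Integer> answer = pool.submit(() -> 6 * 7);  // Callable: result via Future
            int value = answer.get();                           // blocks until the task is done
            pool.shutdown();                                    // no new tasks from here on
            boolean shutdown = pool.isShutdown();
            boolean drained = pool.awaitTermination(5, TimeUnit.SECONDS);
            return value + " shutdown=" + shutdown
                    + " terminated=" + (drained && pool.isTerminated());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();                                 // harmless if already terminated
        }
    }

    public static void main(String[] args) {
        System.out.println(lifecycle()); // 42 shutdown=true terminated=true
    }
}
```

Note that isShutdown() becomes true immediately after shutdown(), while isTerminated() only becomes true once every submitted task has finished, which is why the sketch waits with awaitTermination first.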
Important attributes
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
ctl is the field that controls both the running state of the thread pool and the number of valid threads in it. It packs two pieces of information into a single int held in an AtomicInteger: the run state of the pool (runState) in the upper 3 bits and the number of valid threads (workerCount) in the lower 29 bits. COUNT_BITS is 29, and CAPACITY is 1 shifted left by 29 bits minus 1 (29 one-bits in binary). This constant is the upper limit of workerCount: 2^29 - 1 = 536,870,911, roughly 537 million.
// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; } // extract the run state (upper 3 bits)
private static int workerCountOf(int c)  { return c & CAPACITY; }  // extract the number of threads (lower 29 bits)
private static int ctlOf(int rs, int wc) { return rs | wc; }       // combine run state and thread count into one ctl value
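To make the bit layout concrete, the same arithmetic can be run outside the pool. The sketch below copies the constants and the three helper methods from the ThreadPoolExecutor source quoted in this article into a standalone class; the class name CtlPackingDemo is made up, and the real fields are private to ThreadPoolExecutor.

```java
// Hypothetical demo class that mirrors the private helpers of ThreadPoolExecutor.
class CtlPackingDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // 29 one-bits

    // Three of the run states, stored in the upper 3 bits
    static final int RUNNING  = -1 << COUNT_BITS;
    static final int SHUTDOWN =  0 << COUNT_BITS;
    static final int STOP     =  1 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                          // RUNNING pool with 5 workers
        System.out.println(COUNT_BITS);                     // 29
        System.out.println(CAPACITY);                       // 536870911
        System.out.println(Integer.toBinaryString(c));      // 111 followed by 26 zeros and 101
        System.out.println(workerCountOf(c));               // 5
        System.out.println(runStateOf(c) == RUNNING);       // true
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP); // true
    }
}
```

Because RUNNING is negative and the other states are non-negative and increasing, the pool can compare states numerically, which is what checks such as rs >= SHUTDOWN in the source rely on.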
Thread pools also have 5 states:
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
- RUNNING
Status description: When the thread pool is in the RUNNING state, it accepts new tasks and processes tasks that have already been added. State switching: RUNNING is the initial state of the thread pool. In other words, a thread pool is in the RUNNING state as soon as it is created, and the count recorded in ctl is 0.
- SHUTDOWN
Status description: When the thread pool is in the SHUTDOWN state, it does not accept new tasks but still processes tasks that have already been added. State switching: When shutdown() is called on the thread pool, it moves from RUNNING -> SHUTDOWN.
- STOP
Status description: When the thread pool is in the STOP state, it does not accept new tasks, does not process tasks that have already been added, and interrupts tasks that are being executed. State switching: When shutdownNow() is called on the thread pool, it moves from RUNNING or SHUTDOWN -> STOP.
- TIDYING
Status description: When all tasks have terminated and the workerCount recorded in ctl is 0, the thread pool changes to the TIDYING state. When the pool enters the TIDYING state, the hook method terminated() is executed. terminated() is empty in the ThreadPoolExecutor class; to do some processing when the pool becomes TIDYING, override terminated() in a subclass. State switching: When the pool is in the SHUTDOWN state, the blocking queue is empty, and no tasks are executing in the pool, it moves from SHUTDOWN -> TIDYING. When the pool is in the STOP state and no tasks are executing in the pool, it moves from STOP -> TIDYING.
- TERMINATED
Status description: The thread pool has terminated completely and is in the TERMINATED state. State switching: When the pool is in the TIDYING state and terminated() has finished executing, it moves from TIDYING -> TERMINATED. The conditions for entering TERMINATED are as follows:
- The thread pool is not RUNNING;
- The thread pool state is not already TIDYING or TERMINATED;
- If the thread pool state is SHUTDOWN, the workQueue is empty;
- workerCount is 0;
- Setting the TIDYING state succeeded.
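The TIDYING -> TERMINATED step can be observed by overriding the terminated() hook. The following is a sketch under assumptions: TerminatedHookDemo, HookedPool, and the seenInHook field are names invented for the example. It uses the public query methods isTerminating() and isTerminated() because the state constants themselves are private.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class TerminatedHookDemo {

    /** A pool that records what the public state queries report while terminated() runs. */
    static final class HookedPool extends ThreadPoolExecutor {
        volatile String seenInHook = "hook not run";

        HookedPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }

        @Override
        protected void terminated() {
            // Called while the pool is TIDYING, before it becomes TERMINATED
            seenInHook = "terminating=" + isTerminating() + " terminated=" + isTerminated();
            super.terminated();
        }
    }

    static String run() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> { });
        pool.shutdown();                                   // RUNNING -> SHUTDOWN
        try {
            boolean done = pool.awaitTermination(5, TimeUnit.SECONDS);
            return "in hook: " + pool.seenInHook
                    + "; after: terminated=" + (done && pool.isTerminated());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // in hook: terminating=true terminated=false; after: terminated=true
        System.out.println(run());
    }
}
```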
ThreadPoolExecutor
Use:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 5000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
public void execute(Runnable command)   // submits a task; no value is returned
public Future<?> submit(Runnable task)  // submits a task; the returned Future yields the result when the task completes
The constructor
- corePoolSize: the number of core threads in the thread pool. When a task is submitted, the pool creates a new thread to execute it until the current number of threads equals corePoolSize. Once the current number of threads equals corePoolSize, further submitted tasks are stored in the blocking queue to wait for execution. If the pool's prestartAllCoreThreads() method is called, the pool creates and starts all core threads ahead of time;
- maximumPoolSize: the maximum number of threads allowed in the thread pool. If the blocking queue is full and tasks keep being submitted, a new thread is created to execute the task, provided that the current number of threads is less than maximumPoolSize;
- keepAliveTime: how long the pool keeps an idle thread alive. When the number of threads in the pool is greater than corePoolSize and no new task is submitted, the threads beyond the core threads are not destroyed immediately; they are destroyed only after they have been idle for longer than keepAliveTime;
- unit: the time unit of keepAliveTime;
- workQueue: the blocking queue that holds tasks waiting to be executed. The tasks must implement the Runnable interface. The JDK provides the following blocking queues:
  - ArrayBlockingQueue: a bounded blocking queue based on an array that orders tasks FIFO;
  - LinkedBlockingQueue: a blocking queue based on a linked list that orders tasks FIFO and typically has higher throughput than ArrayBlockingQueue;
  - SynchronousQueue: a blocking queue that stores no elements. Each insert operation must wait until another thread calls a remove operation, otherwise the insert blocks; throughput is usually higher than LinkedBlockingQueue;
  - PriorityBlockingQueue: an unbounded blocking queue with priorities;
- threadFactory: a variable of type ThreadFactory used to create new threads. By default, threads are created with Executors.defaultThreadFactory(). Threads created by the default ThreadFactory all have the same priority (NORM_PRIORITY), are non-daemon, and are given a thread name;
- handler: the saturation policy of the thread pool. When the blocking queue is full and there are no idle worker threads, a policy is needed to handle any further submitted task. The thread pool provides four policies:
  - AbortPolicy: throws an exception directly; this is the default policy;
  - CallerRunsPolicy: executes the task on the caller's thread;
  - DiscardOldestPolicy: discards the oldest task, the one at the head of the blocking queue, and then retries executing the current task;
  - DiscardPolicy: silently discards the task.
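All seven parameters appear together in the longest ThreadPoolExecutor constructor. The sketch below is one possible configuration, not a recommendation: the class PoolConfigDemo, the thread name prefix "demo-worker-", and the helper methods are made up for the illustration. It saturates a small pool and then shows that CallerRunsPolicy runs the overflow task on the submitting thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class PoolConfigDemo {

    static ThreadPoolExecutor newPool() {
        ThreadFactory named = new ThreadFactory() {
            private final AtomicInteger seq = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "demo-worker-" + seq.getAndIncrement());
            }
        };
        return new ThreadPoolExecutor(
                1,                                          // corePoolSize
                2,                                          // maximumPoolSize
                30, TimeUnit.SECONDS,                       // keepAliveTime, unit
                new ArrayBlockingQueue<Runnable>(1),        // workQueue: bounded, FIFO
                named,                                      // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Saturates the pool (2 threads + 1 queued task) and returns the thread that ran a 4th task. */
    static String overflowThreadName() {
        ThreadPoolExecutor pool = newPool();
        CountDownLatch release = new CountDownLatch(1);
        String[] ranOn = new String[1];
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> awaitQuietly(release));  // occupy both threads and the queue slot
            }
            // The pool is saturated, so the handler runs this task synchronously in the caller
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
            return ranOn[0];
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(overflowThreadName()); // main
    }
}
```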
The execute method
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    // ctl records both runState and workerCount
    int c = ctl.get();
    /*
     * workerCountOf(c) extracts the number of live threads.
     * If it is smaller than corePoolSize, create a new thread
     * and hand it the task as its first task.
     */
    if (workerCountOf(c) < corePoolSize) {
        /*
         * The second argument of addWorker selects the limit on the
         * thread count: true means corePoolSize, false means
         * maximumPoolSize.
         */
        if (addWorker(command, true))
            return;
        // Adding the worker failed, so re-read ctl
        c = ctl.get();
    }
    // If the pool is RUNNING and the task was queued successfully
    if (isRunning(c) && workQueue.offer(command)) {
        // Re-read ctl and check the state again
        int recheck = ctl.get();
        // The command is already in workQueue at this point. If the pool
        // is no longer RUNNING, remove the command and reject it.
        if (! isRunning(recheck) && remove(command))
            reject(command);
        /*
         * If the number of live threads is 0, call addWorker:
         * 1. the first argument is null, so the new thread starts
         *    without an initial task and takes work from workQueue;
         * 2. the second argument is false, so the thread limit is
         *    maximumPoolSize.
         * If workerCount is greater than 0, nothing more is needed: the
         * queued command will be executed by an existing worker later.
         */
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    /*
     * Reaching this branch means one of two things:
     * 1. the pool is not in the RUNNING state;
     * 2. workerCount >= corePoolSize and workQueue is full.
     * addWorker is called again with false as the second argument,
     * which sets the thread limit to maximumPoolSize; if that fails,
     * the task is rejected.
     */
    else if (!addWorker(command, false))
        reject(command);
}
Simply put, if the thread pool is in the RUNNING state when execute() is called, the execution process is as follows:
- If workerCount < corePoolSize, a thread is created and started to execute the newly submitted task;
- If workerCount >= corePoolSize and the blocking queue in the thread pool is not full, the task is added to the blocking queue;
- If workerCount >= corePoolSize && workerCount < maximumPoolSize and the blocking queue in the thread pool is full, then a thread is created and started to execute the newly submitted task;
- If workerCount >= maximumPoolSize and the blocking queue in the thread pool is full, the task is processed according to the reject policy. The default is to throw an exception directly.
- Note the call addWorker(null, false): it creates a thread without passing in a task, because the task has already been added to the workQueue, so the worker fetches the task directly from the workQueue when it runs. addWorker(null, false) is executed when workerCountOf(recheck) == 0 to guarantee that a thread pool in the RUNNING state always has a thread to execute the task.
One case deserves attention: a task is submitted while the queue is empty and some core threads already exist, but fewer than corePoolSize. The task is not handed to one of the existing threads; a new thread is created to execute it, and this continues until corePoolSize threads exist.
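The four branches can be watched from the outside with getPoolSize() and getQueue().size(). This is a minimal sketch, assuming a pool with corePoolSize 1, maximumPoolSize 2, a queue of capacity 1, and the default AbortPolicy; the class ExecuteFlowDemo and its trace method are names made up for the example. Every task blocks on a latch so that no thread becomes free while the tasks are being submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class ExecuteFlowDemo {

    /** Submits four blocking tasks and records the pool size and queue length after each one. */
    static List<String> trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                 // keep the worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> steps = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    steps.add("threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    steps.add("rejected");       // AbortPolicy, the default handler
                }
            }
            return steps;
        } finally {
            release.countDown();                 // let the workers finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // [threads=1 queued=0, threads=1 queued=1, threads=2 queued=1, rejected]
        System.out.println(trace());
    }
}
```

The first task starts a core thread, the second is queued, the third finds the queue full and starts a non-core thread, and the fourth is rejected because both limits have been reached.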
The addWorker method
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // Get the run state
        int rs = runStateOf(c);

        /*
         * Check if queue empty only if necessary.
         * rs >= SHUTDOWN means no new tasks are accepted any more.
         * In that case return false unless all three of the
         * following hold:
         * 1. rs == SHUTDOWN, so tasks already in the blocking queue
         *    may still be processed;
         * 2. firstTask is null;
         * 3. the blocking queue is not empty.
         *
         * With rs == SHUTDOWN no newly submitted task is accepted,
         * so return false if firstTask is not null. If firstTask is
         * null but workQueue is empty, also return false, because
         * the queue has no tasks left and no thread needs to be added.
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // Get the number of threads
            int wc = workerCountOf(c);
            // Return false if wc has reached CAPACITY, the maximum of the
            // lower 29 bits of ctl (29 one-bits in binary).
            // core is the second argument of addWorker: true compares
            // against corePoolSize, false against maximumPoolSize.
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Try to increment workerCount; on success leave the outer loop
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // Incrementing workerCount failed, so re-read ctl
            c = ctl.get();  // Re-read ctl
            // If the run state changed, restart the outer loop
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create a Worker object from firstTask
        w = new Worker(firstTask);
        // Each Worker object creates one thread
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // rs < SHUTDOWN means the pool is RUNNING.
                // Add the worker if the pool is RUNNING, or if it is
                // SHUTDOWN and firstTask is null: no new tasks are accepted
                // in SHUTDOWN, but tasks in workQueue are still executed.
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers is a HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize records the largest number of
                    // threads the pool has ever had
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the thread
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
The Worker class
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        // Set the AQS state to -1. The default state in AQS is 0, but a
        // Worker that has just been created and has not yet run a task
        // must not be interrupted.
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        // The CAS only succeeds from state 0, so a worker whose state is
        // still -1 cannot be locked, and interruptIfStarted skips it as
        // well. For that reason runWorker first calls the worker's unlock
        // method, which sets the state to 0.
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
Each thread in the thread pool is wrapped in a Worker object, and the pool maintains a set of Worker objects. The Worker class extends AQS (AbstractQueuedSynchronizer) and implements the Runnable interface. Note the firstTask and thread fields: firstTask holds the task that was passed in, and thread is the thread created through the ThreadFactory when the constructor is called; it is the thread that processes tasks. The constructor creates that thread with getThreadFactory().newThread(this), passing this as the argument. Because Worker itself implements Runnable, starting the thread invokes the run method of the Worker class, which calls runWorker and passes in this, the current worker. Worker extends AQS in order to implement an exclusive lock. Why not use ReentrantLock? Look at the tryAcquire method: it does not allow reentrancy, whereas ReentrantLock does. The lock is used as follows:
- Once the lock method has acquired the exclusive lock, the current thread is executing a task;
- While a task is being executed, the thread should not be interrupted;
- If the thread does not hold the exclusive lock, it is idle and not processing a task, so it can be interrupted;
- When shutdown or tryTerminate is executed, the thread pool calls interruptIdleWorkers to interrupt idle threads. interruptIdleWorkers uses tryLock to determine whether a thread in the pool is idle;
- The lock is non-reentrant because a task must not be able to reacquire it when the task calls a pool control method such as setCorePoolSize. With a reentrant lock such as ReentrantLock, tryLock would succeed on the worker's own thread, so calling setCorePoolSize from inside a task would interrupt the thread that is running that task.
Therefore, Worker extends AQS so that the pool can tell whether a thread is idle and may be interrupted.
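The difference between the two kinds of lock is easy to see in isolation. The sketch below is an illustration under assumptions: SimpleLock is a made-up class that copies only the tryAcquire and tryRelease logic of Worker, and NonReentrantLockDemo is likewise hypothetical. It tries to take each lock twice from the same thread.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK.
class NonReentrantLockDemo {

    /** Non-reentrant exclusive lock using the same state transitions as Worker (0 = free, 1 = held). */
    static final class SimpleLock extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        @Override
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {      // fails if the lock is held, even by this thread
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean tryLock() { return tryAcquire(1); }
        void unlock()     { release(1); }
    }

    /** Returns whether a second tryLock from the same thread succeeds on SimpleLock. */
    static boolean secondTryLockOnSimpleLock() {
        SimpleLock lock = new SimpleLock();
        lock.tryLock();
        boolean again = lock.tryLock();
        lock.unlock();
        return again;
    }

    /** Returns whether a second tryLock from the same thread succeeds on ReentrantLock. */
    static boolean secondTryLockOnReentrantLock() {
        ReentrantLock lock = new ReentrantLock();
        lock.tryLock();
        boolean again = lock.tryLock();
        lock.unlock();
        lock.unlock();                           // one unlock per successful acquisition
        return again;
    }

    public static void main(String[] args) {
        System.out.println(secondTryLockOnSimpleLock());    // false
        System.out.println(secondTryLockOnReentrantLock()); // true
    }
}
```

A failed tryLock is exactly the signal interruptIdleWorkers needs: it means the worker is busy, regardless of which thread is asking.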
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // Get the first task
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    // Whether the loop was left because of an exception
    boolean completedAbruptly = true;
    try {
        // If task is null, fetch a task from the blocking queue via getTask()
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If the pool is stopping, make sure the current thread is
            // interrupted; if not, make sure it is not interrupted.
            // shutdownNow may run concurrently with this check. It sets
            // the state to STOP (no new tasks accepted, queued tasks not
            // processed, running tasks interrupted) and interrupts every
            // thread in the pool. Thread.interrupted() clears the
            // interrupt status, which keeps the thread uninterrupted
            // while the pool is RUNNING or SHUTDOWN; the state is then
            // rechecked in case shutdownNow ran in between.
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Execute the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
To summarize the execution of the runWorker method:
- The while loop keeps fetching tasks through the getTask() method;
- The getTask() method takes tasks from the blocking queue;
- If the thread pool is stopping, make sure the current thread is interrupted; otherwise make sure it is not interrupted;
- task.run() is called to execute the task;
- If getTask() returns null, the loop exits and processWorkerExit() is executed;
- When runWorker returns, the run method of the Worker returns as well, and the thread terminates;
- The beforeExecute and afterExecute methods are empty in the ThreadPoolExecutor class and are left for subclasses to implement;
- The completedAbruptly variable indicates whether an exception occurred while a task was executing; processWorkerExit checks its value.
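The two hooks called around task.run() can be overridden to trace what runWorker does. This is a small sketch under assumptions: HookDemo, TracingPool, and the events list are names invented for the example, and a real subclass would more likely record timings or log failures.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class HookDemo {

    /** A pool that records the order in which the hooks and the task run. */
    static final class TracingPool extends ThreadPoolExecutor {
        final List<String> events = Collections.synchronizedList(new ArrayList<String>());

        TracingPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            events.add("before");
        }

        @Override
        protected void afterExecute(Runnable r, Throwable thrown) {
            super.afterExecute(r, thrown);
            // thrown is null when the task completed normally
            events.add(thrown == null ? "after:ok" : "after:" + thrown.getClass().getSimpleName());
        }
    }

    static List<String> run() {
        TracingPool pool = new TracingPool();
        pool.execute(() -> pool.events.add("task"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);   // wait until the worker has exited
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(pool.events);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [before, task, after:ok]
    }
}
```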
The getTask method
private Runnable getTask() {
    // timedOut records whether the last fetch from the blocking queue timed out
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /*
         * Check if queue empty only if necessary.
         * If the pool state is SHUTDOWN or higher, check whether:
         * 1. rs >= STOP, that is, the pool is stopping; or
         * 2. the blocking queue is empty.
         * If either holds, decrement workerCount by 1 and return null,
         * because no more tasks may be added to the blocking queue once
         * the pool state is SHUTDOWN or higher.
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // timed decides whether the fetch needs a timeout.
        // allowCoreThreadTimeOut is false by default, so core threads do
        // not time out; wc > corePoolSize means the pool has more threads
        // than core threads, and those extra threads need a timeout.
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        /*
         * wc > maximumPoolSize can happen because setMaximumPoolSize may
         * have been called concurrently.
         * timed && timedOut being true means this fetch is subject to a
         * timeout and the previous fetch from the blocking queue timed out.
         * If one of these holds and the number of valid threads is greater
         * than 1 or the blocking queue is empty, try to decrement
         * workerCount by 1; if the decrement fails, retry.
         * If wc == 1, the current thread is the only thread in the pool.
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            /*
             * If timed is true, use the queue's poll method with a
             * timeout: null is returned if no task arrives within
             * keepAliveTime. Otherwise use take, which blocks while the
             * queue is empty until a task becomes available.
             */
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // r == null means the poll timed out
            timedOut = true;
        } catch (InterruptedException retry) {
            // The thread was interrupted while waiting: reset timedOut
            // to false and retry
            timedOut = false;
        }
    }
}
The important part here is the second if statement, which controls the number of valid threads in the thread pool. As analysed above, when execute is called and the number of threads in the pool is at least corePoolSize but smaller than maximumPoolSize while the workQueue is full, worker threads can be added. If such a thread then times out without obtaining a task, that is, timedOut is true, the workQueue is empty and the pool no longer needs that many threads, so the threads beyond corePoolSize can be destroyed, keeping the thread count at corePoolSize. When are they destroyed? When runWorker finishes, the run method of the Worker finishes too, and the JVM reclaims the thread automatically. When getTask returns null, runWorker leaves its while loop and then executes processWorkerExit.
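The culling of surplus threads can be seen by letting a pool grow and then leaving it idle. The following is a sketch under assumptions: KeepAliveDemo and peakAndSettled are made-up names, the 100 ms keepAliveTime and the 5 s polling limit are arbitrary, and a SynchronousQueue is used so that every task submitted while the workers are busy forces a new thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class KeepAliveDemo {

    /** Returns the pool size at its peak and after the surplus threads have timed out. */
    static int[] peakAndSettled() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 100, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> {
                    try {
                        release.await();          // keep all three workers busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            int peak = pool.getPoolSize();
            release.countDown();                  // workers become idle and start polling
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && deadline - System.nanoTime() > 0) {
                Thread.sleep(20);                 // wait for the timed poll() calls to expire
            }
            return new int[] { peak, pool.getPoolSize() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = peakAndSettled();
        System.out.println(sizes[0] + " -> " + sizes[1]); // 3 -> 1
    }
}
```

The last remaining worker no longer satisfies wc > corePoolSize, so it switches from the timed poll to take() and stays alive. Calling allowCoreThreadTimeOut(true) on the pool would let it time out as well.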
The processWorkerExit method
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // If completedAbruptly is true, an exception occurred while the
    // thread was executing, and workerCount must be decremented by 1.
    // If the thread finished without an exception, getTask() has
    // already decremented workerCount, so nothing is done here.
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Add up the number of completed tasks
        completedTaskCount += w.completedTasks;
        // Remove the worker from workers, that is, remove one worker
        // thread from the pool
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // Decide, based on the pool state, whether to terminate the pool
    tryTerminate();

    int c = ctl.get();
    /*
     * When the pool is RUNNING or SHUTDOWN:
     * if the worker ended abnormally, add a replacement with addWorker;
     * if allowCoreThreadTimeOut is true and the queue still has tasks,
     * keep at least one worker;
     * if allowCoreThreadTimeOut is false, keep workerCount at no less
     * than corePoolSize.
     */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}
At this point, once processWorkerExit has executed, the worker thread is destroyed. That is the whole life cycle of a worker thread: starting from the execute method, addWorker creates a Worker, whose thread is created through the ThreadFactory; runWorker fetches tasks through getTask and executes them; and when getTask returns null, processWorkerExit is entered and the thread ends, as shown in the figure.
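The replacement branch of processWorkerExit, addWorker(null, false) after an abrupt exit, can be made visible by naming the threads. This is a sketch under assumptions: WorkerReplacementDemo, the "worker-" name prefix, and the silent uncaught-exception handler are all made up for the illustration. The failing task is passed to execute on purpose, because submit wraps tasks in a FutureTask that captures the exception, so the worker would not die.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class WorkerReplacementDemo {

    /** Returns the names of the threads that ran a failing task and the task after it. */
    static String[] run() {
        AtomicInteger seq = new AtomicInteger(1);
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "worker-" + seq.getAndIncrement());
            t.setUncaughtExceptionHandler((thread, error) -> { }); // keep stderr quiet
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), factory);
        String[] names = new String[2];
        CountDownLatch firstRan = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                names[0] = Thread.currentThread().getName();
                firstRan.countDown();
                throw new IllegalStateException("boom");   // kills this worker abruptly
            });
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            Future<String> second = pool.submit(whoAmI);
            names[1] = second.get(5, TimeUnit.SECONDS);    // runs on the replacement worker
            firstRan.await();                              // makes names[0] safely visible
            return names;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = run();
        System.out.println(names[0] + " then " + names[1]); // worker-1 then worker-2
    }
}
```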