This article is about Java thread pool source code analysis.
The source code analyzed in this article is based on the Java Development Kit (JDK) 13.
Overview
A thread is a kernel resource of the operating system and the smallest unit of CPU scheduling; all application code runs on threads, and threads are the basis for implementing concurrency and asynchrony. In the Java API, Thread is the basic class for working with threads: every time a Thread is started, the operating system kernel creates a native thread. In the source code of Thread, all of the key methods are native methods, and its internals consist largely of JNI calls, because the implementation of threads must be supported directly by the operating system. On Linux, each Java thread corresponds to one native thread in a one-to-one mapping; on Android, the pthread_create function of the Linux API is called when a thread is created.
Thread calls have the following problems:
- Threads are not lightweight resources. Creating a large number of threads will consume a lot of resources in the system. Traditional blocking calls will lead to a large number of threads that cannot run because of blocking, which is a waste of system resources.
- Switching between the running and blocked states of a thread can be quite expensive and has always been an optimization target; for example, the Java virtual machine optimizes locks at runtime with techniques such as spin locking, lock coarsening, and lock elimination.
Thread Pool is a tool to manage threads based on pooling idea. Using Thread Pool has the following benefits:
- Reduced resource consumption: Reuse of created threads through pooling techniques to reduce wastage associated with thread creation and destruction.
- Improved response time: Tasks can be executed immediately when they arrive without waiting for a thread to be created.
- Improve manageability of threads: Threads are scarce resources. If they are created without limit, they will not only consume system resources, but also cause resource scheduling imbalance due to unreasonable distribution of threads, which reduces system stability. Thread pools can be used for unified allocation, tuning and monitoring.
- More powerful features: Thread pools are extensible, allowing developers to add more functionality to them.
Structure
The UML class diagram for ThreadPoolExecutor looks like this:
The ThreadPoolExecutor class inherits the AbstractExecutorService abstract class, which implements the ExecutorService interface. The ExecutorService interface inherits from the Executor interface.
Executor
The Executor interface executes submitted Runnable tasks. The idea is to decouple task submission from the mechanics of how each task runs (thread usage, thread scheduling, and so on): callers do not care how threads are created or scheduled to run tasks, they only provide Runnable objects. The source code is as follows:
// Executor.java
package java.util.concurrent;
public interface Executor {
// Executes a given task at a later time, either in a new thread, in a thread pool, or in the calling thread, depending on the Executor implementation
void execute(Runnable command);
}
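A minimal usage sketch (the class name and task are illustrative, and any Executor implementation would do): the caller only hands a Runnable to execute() and never touches thread creation or scheduling.
// ExecutorDemo.java (illustrative)
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // The caller only provides the Runnable; how the thread is created and scheduled is up to the Executor
        Executor executor = pool;
        executor.execute(() -> System.out.println("task runs on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}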
ExecutorService
ExecutorService provides the following capabilities:
- Extending Executor capabilities: Provides a way to generate a Future for one or more asynchronous tasks.
- Provides the ability to manage threads: provides a way to terminate a thread pool.
The source code is as follows:
// ExecutorService.java
package java.util.concurrent;
import java.util.Collection;
import java.util.List;
public interface ExecutorService extends Executor {
// Initiates an orderly shutdown, during which previously committed tasks continue to be executed, but no new tasks are accepted. If the shutdown has already occurred, calling this method has no additional impact
void shutdown();
// Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns the list of tasks that were awaiting execution
List<Runnable> shutdownNow();
// Returns true if the Executor has been shut down, false otherwise
boolean isShutdown();
// Returns true if all tasks have completed after shutdown. Note that this method never returns true unless shutdown() or shutdownNow() is called first
boolean isTerminated();
// Blocks until all tasks have completed execution after a shutdown request, the timeout elapses, or the current thread is interrupted, whichever happens first
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// Submits a value-returning task for execution and returns a Future representing the pending result of the task. The Future's get() method returns the task's result upon successful completion
<T> Future<T> submit(Callable<T> task);
// Submits a Runnable task for execution and returns a Future representing the pending result of the task. The Future's get() method returns the given result upon successful completion
<T> Future<T> submit(Runnable task, T result);
// Submits a Runnable task for execution and returns a Future representing the pending result of the task. The Future's get() method returns null upon successful completion
Future<?> submit(Runnable task);
// Executes the given tasks and returns a list of Futures holding their status and results when all complete. isDone() returns true for each element of the returned list. Note that a completed task may have terminated either normally or by throwing an exception. The result of this method is undefined if the given collection is modified while the operation is in progress
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// Executes the given tasks and returns a list of Futures holding their status and results when all complete or the timeout expires, whichever happens first. isDone() returns true for each element of the returned list. On return, tasks that have not completed are cancelled. Note that a completed task may have terminated either normally or by throwing an exception. The result of this method is undefined if the given collection is modified while the operation is in progress
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// Executes the given tasks and returns the result of one that has completed successfully (that is, without throwing an exception), if any does. On normal or exceptional return, tasks that have not completed are cancelled. Note that the result of this method is undefined if the given collection is modified while the operation is in progress
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// Executes the given tasks and returns the result of one that has completed successfully (that is, without throwing an exception) before the given timeout elapses, if any does. On normal or exceptional return, tasks that have not completed are cancelled. The result of this method is undefined if the given collection is modified while the operation is in progress
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
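To make these methods concrete, here is a small hedged sketch (the class name and values are illustrative): it submits a Callable, reads its Future, and then shuts the pool down in an orderly way.
// ExecutorServiceDemo.java (illustrative)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // submit(Callable) returns a Future; get() blocks until the task completes
        Future<Integer> future = pool.submit(() -> 1 + 1);
        System.out.println("result = " + future.get());
        // Orderly shutdown: no new tasks are accepted, but queued tasks still run
        pool.shutdown();
        // Wait for termination, falling back to shutdownNow() after a timeout
        if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
            pool.shutdownNow();
        }
    }
}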
AbstractExecutorService
The abstract class AbstractExecutorService provides a default implementation of the ExecutorService execution methods. It uses the RunnableFuture returned by the newTaskFor method to implement the submit, invokeAny, and invokeAll methods.
The thread pool is divided into two parts: task management and thread management. It uses the producer-consumer model, with task management acting as producer and thread management acting as consumer.
Task management
When a task is submitted, the thread pool does the following:
- A thread is requested directly to execute the task.
- The task is placed in a buffered queue and waits for execution.
- The task is rejected.
Thread management
The thread pool allocates threads to tasks. When a thread finishes its current task, it fetches the next task from the queue and continues; if it cannot obtain a task, the thread is reclaimed.
Running state
The running state of the thread pool is maintained by the variable ctl, as shown below:
// ThreadPoolExecutor.java
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// The runState is stored in higher-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Package and unpack CTL
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
The value of COUNT_BITS is Integer.SIZE (32) minus 3, which is 29. The runState constants are shifted left by COUNT_BITS, which means the run state is stored in the upper 3 bits.
The ctl variable packs two values, runState and workerCount: the upper 3 bits hold the running state, and the lower 29 bits hold the number of worker threads. The benefit of this approach is that both values can be read and updated together in a single atomic operation, without taking a lock around the related logic. As in much of the JDK source, bit operations are used here because they are generally faster than ordinary arithmetic.
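The following standalone sketch (constants copied from the source above, values illustrative) replays the same bit arithmetic to show how a single int carries both the run state and the worker count:
// CtlDemo.java (illustrative)
public class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;          // 29
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1;     // low 29 bits
    static final int RUNNING = -1 << COUNT_BITS;             // high 3 bits = 111

    static int ctlOf(int runState, int workerCount) { return runState | workerCount; }
    static int runStateOf(int c) { return c & ~COUNT_MASK; }
    static int workerCountOf(int c) { return c & COUNT_MASK; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                            // pack RUNNING with 5 worker threads
        System.out.println(runStateOf(c) == RUNNING);         // true
        System.out.println(workerCountOf(c));                 // 5
    }
}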
There are five states of a thread pool:
- RUNNING: Accepts new tasks and processes tasks in the blocking queue.
- SHUTDOWN: does not accept new tasks, but processes tasks in the blocking queue.
- STOP: does not accept new tasks, does not process tasks in the blocking queue, and interrupts ongoing tasks.
- TIDYING: all tasks have terminated and the worker-thread count is zero. The thread that transitions the pool to the TIDYING state runs the terminated() hook method.
- TERMINATED: entered once the terminated() hook method has completed.
Starting from the RUNNING state, there are two paths:
- If the shutdown() method is called, the pool enters the SHUTDOWN state; once the blocking queue is empty and the worker-thread count reaches zero, it enters TIDYING and terminated() is called.
- If the shutdownNow() method is called, the pool enters the STOP state; the blocking queue need not be empty, and once the worker-thread count reaches zero it enters TIDYING and terminated() is called.
When terminated() completes, the pool enters the TERMINATED state.
Task scheduling
Task scheduling is the core mechanism of the thread pool. The related logic is in the **execute(Runnable command)** method; the source code is as follows:
// ThreadPoolExecutor.java
public void execute(Runnable command) {
if (command == null)
// If command is empty, NullPointerException is thrown
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// If workerCount is less than corePoolSize, a thread is created and started to execute the newly submitted task
if (addWorker(command, true))
// Returns if the task was successfully added
return;
// If the task fails to be added, get the running status and worker thread count again
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
// If the current thread pool is in the RUNNING state and the task is successfully added, execute the following logic
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
// If the current thread pool is no longer RUNNING and the task is removed successfully, call reject(Runnable command) to reject the task
reject(command);
else if (workerCountOf(recheck) == 0)
// If there are no worker threads, call addWorker(Runnable firstTask, boolean core). firstTask is passed as null, meaning the new thread has no initial task and will fetch tasks from the queue; core is passed as false, meaning the maximumPoolSize bound is used
addWorker(null, false);
}
else if (!addWorker(command, false))
// If adding a worker thread fails, call reject(Runnable command) to reject the task
reject(command);
}
Some important member variables, the source code is as follows:
// ThreadPoolExecutor.java
// The queue used to hold tasks and hand them off to worker threads. The isEmpty() method can be used to check whether the queue is empty, for example when deciding whether to transition from SHUTDOWN to TIDYING
private final BlockingQueue<Runnable> workQueue;
// The lock guarding access to the workers set and related bookkeeping. Although a concurrent collection could be used, ReentrantLock is preferable, in part because it serializes interruptIdleWorkers and avoids unnecessary interrupt storms, especially during shutdown; otherwise, exiting threads would concurrently interrupt threads that have not yet been interrupted. It also guards related statistics such as largestPoolSize. The lock is also held in shutdown() and shutdownNow() to keep the worker set stable while workers are checked and their interrupt state is set
private final ReentrantLock mainLock = new ReentrantLock();
// A collection of all worker threads in the thread pool (accessed only while holding the mainLock lock)
private final HashSet<Worker> workers = new HashSet<>();
// Wait conditions to support wait termination
private final Condition termination = mainLock.newCondition();
// The largest pool size ever reached (accessed only while holding mainLock)
private int largestPoolSize;
// Completed task counter, updated only when worker thread terminates (accessed only when mainLock is held)
private long completedTaskCount;
// Create a factory for the thread
private volatile ThreadFactory threadFactory;
// Handle the rejection policy
private volatile RejectedExecutionHandler handler;
// Timeout (in nanoseconds) for idle threads waiting for work. This timeout applies when the number of worker threads exceeds corePoolSize, or when allowCoreThreadTimeOut is true
private volatile long keepAliveTime;
// If false (the default), core threads stay alive even when idle; if true, core threads use keepAliveTime as the timeout while waiting for work
private volatile boolean allowCoreThreadTimeOut;
// Core pool size, which is the minimum number of worker threads that remain active (and do not allow timeouts, etc.)
private volatile int corePoolSize;
// Maximum pool size
private volatile int maximumPoolSize;
// The default rejection policy is AbortPolicy
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
// The permissions required to call the shutdown() and shutdownNow() methods
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
The execution process is as follows:
- First check the running state of the thread pool; tasks are accepted only in the RUNNING state, otherwise the task is rejected.
- If the workerCount is less than corePoolSize, a thread is created and started to execute the newly submitted task.
- If the workerCount is greater than or equal to corePoolSize, and if the blocking queue is not full, the newly submitted task is put on the blocking queue.
- If the workerCount is greater than or equal to corePoolSize, the blocking queue is full, and the workerCount is less than maximumPoolSize, a thread is created and started to execute the newly submitted task.
- If the workerCount is greater than or equal to corePoolSize, the blocking queue is full, and the workerCount is greater than or equal to maximumPoolSize, the task is handed to the rejection policy; the default policy throws a RejectedExecutionException.
Part of the process is in the **addWorker(Runnable firstTask, boolean core)** method, which is explained in more detail later.
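A hedged sketch of this dispatch order (all pool sizes, queue capacity, and task counts below are illustrative): with 2 core threads, a queue of capacity 2, and a maximum of 4 threads, the seventh task submitted in a quick burst is rejected.
// ExecuteOrderDemo.java (illustrative)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteOrderDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                     // corePoolSize
                4,                                     // maximumPoolSize
                60L, TimeUnit.SECONDS,                 // keepAliveTime for threads above the core size
                new ArrayBlockingQueue<>(2),           // bounded work queue
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()); // default rejection policy

        // Tasks 1-2 get core threads, 3-4 wait in the queue, 5-6 get non-core threads, 7 is rejected
        for (int i = 1; i <= 7; i++) {
            final int id = i;
            try {
                pool.execute(() -> {
                    try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
                    System.out.println("task " + id + " ran on " + Thread.currentThread().getName());
                });
            } catch (RejectedExecutionException e) {
                System.out.println("task " + id + " was rejected");
            }
        }
        pool.shutdown();
    }
}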
Task buffer
Task buffering is the core mechanism by which a thread pool manages tasks. It is implemented with a BlockingQueue: submitted tasks are cached in the queue, and worker threads fetch tasks from it. This follows the producer-consumer model, where producers are the threads adding elements and consumers are the threads taking elements.
The data structure of the BlockingQueue is a queue that supports two additional operations:
- When the blocking queue is empty, the thread fetching the element waits for the queue to become non-empty.
- When the blocking queue is full, the thread adding the element waits for the queue to become available.
The source code is as follows:
// BlockingQueue.java
public interface BlockingQueue<E> extends Queue<E> {
// Inserts the specified element into the queue, returning true on success and throwing IllegalStateException if the queue is full. With a capacity-restricted queue it is generally preferable to call offer(E e)
boolean add(E e);
// Inserts the specified element into the queue, returning true on success or false if the queue is full. With a capacity-restricted queue this method is generally preferable to add(E e)
boolean offer(E e);
// Adds the specified element to the queue, and waits if there is not enough space
void put(E e) throws InterruptedException;
// Adds the specified element to the queue, and waits until the specified wait time if there is not enough space
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// Retrieves and removes the head of the queue, waiting if necessary until an element becomes available
E take() throws InterruptedException;
// Retrieves and removes the head of the queue, waiting up to the specified wait time if necessary for an element to become available
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
// Returns the number of additional elements the queue can ideally (in the absence of memory or resource constraints) accept without blocking, or Integer.MAX_VALUE if there is no intrinsic limit
int remainingCapacity();
// If the queue contains one or more instances of the specified element (using equals to decide equality), removes a single instance and returns true; otherwise returns false
boolean remove(Object o);
// Returns true if the queue contains at least one element equal to the specified element (using equals to decide equality), false otherwise
boolean contains(Object o);
// Removes all available elements from the queue and adds them to the given collection c. Throws an IllegalArgumentException if the specified collection is the queue itself, or if some property of an element prevents it from being added to the collection. The result of this method is undefined if the given collection is modified while the operation is in progress
int drainTo(Collection<? super E> c);
// Removes at most maxElements available elements from the queue and adds them to the given collection c. Throws an IllegalArgumentException if the specified collection is the queue itself, or if some property of an element prevents it from being added to the collection. The result of this method is undefined if the given collection is modified while the operation is in progress
int drainTo(Collection<? super E> c, int maxElements);
}
If you look at the implementation class of the interface BlockingQueue, you can see that there are several types of blocking queues:
- ArrayBlockingQueue: A limited-capacity blocking queue implemented using arrays that sorts elements on a first-in-first-out (FIFO) basis, supporting both fair and unfair locks.
- DelayQueue: an unbounded blocking queue of delayed elements; an element can only be taken once its delay has expired.
- LinkedBlockingDeque: a blocking deque backed by a doubly linked list, in which elements can be added and removed at both the head and the tail, which can cut lock contention roughly in half in concurrent situations.
- LinkedBlockingQueue: an optionally bounded blocking queue backed by a linked list that orders elements FIFO (first-in-first-out); its default capacity is Integer.MAX_VALUE. Linked queues typically have higher throughput than array-based queues, but less predictable performance in most concurrent applications.
- LinkedTransferQueue: an unbounded blocking queue backed by a linked list that orders elements FIFO (first-in-first-out). Compared with other blocking queues, it additionally provides the **transfer(E e)** and **tryTransfer(E e, long timeout, TimeUnit unit)** methods.
- PriorityBlockingQueue: an unbounded blocking queue that supports priority ordering. By default, elements are ordered by their natural ordering; you can also implement compareTo() (or supply a Comparator) to specify the order, but the order of elements with the same priority is not guaranteed.
- SynchronousQueue: a blocking queue that does not store elements and supports both fair and unfair modes. Each insert operation must wait for a corresponding remove operation by another thread, and vice versa. **Executors.newCachedThreadPool()** uses this queue.
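A minimal producer-consumer sketch with one of the queues above (ArrayBlockingQueue; capacity and loop counts are illustrative): put() blocks when the queue is full and take() blocks when it is empty, which is exactly how the thread pool hands tasks to worker threads.
// BlockingQueueDemo.java (illustrative)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // Bounded queue of capacity 2: put() waits when full, take() waits when empty
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put("task-" + i);                         // blocks while the queue is full
                    System.out.println("produced task-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    System.out.println("consumed " + queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}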
Task acquisition
Worker threads acquire tasks through the **getTask()** method; the source code is as follows:
// ThreadPoolExecutor.java
private Runnable getTask() {
boolean timedOut = false;
// Execute the loop
for (;;) {
// Get the running status of the thread pool and the number of worker threads
int c = ctl.get();
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
// If the thread pool is at least in the SHUTDOWN state, and it is at least in the STOP state or the blocking queue is empty, execute the following logic
// Call the decrementWorkerCount() method to reduce the workerCount value of the member variable CTL
decrementWorkerCount();
// Return null
return null;
}
// Get the number of worker threads
int wc = workerCountOf(c);
// Determine whether the current worker thread is subject to timeout-based reclamation
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
// If the number of threads is greater than maximumPoolSize, that is, there are too many threads, null is returned
return null;
continue;
}
try {
// Call the poll(long timeout, TimeUnit unit) method if the thread can time out, otherwise call the take() method
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
The execution process is as follows:
- Gets the running status of the thread pool and the number of worker threads.
- If the thread pool has stopped, decrement workerCount and return null; otherwise, go to Step 3.
- If there are too many worker threads, decrement workerCount and return null; otherwise, go to Step 4.
- If the thread can time out, poll(long timeout, TimeUnit unit) is called to retrieve and remove the head of the queue, waiting up to keepAliveTime for a task to become available; otherwise, take() is called, which waits indefinitely until a task becomes available.
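A hedged sketch of this timeout-based reclamation from the caller's point of view (pool sizes and sleep times are illustrative): with allowCoreThreadTimeOut(true), even core threads time out in getTask() and the pool size drops back to zero.
// IdleReclaimDemo.java (illustrative)
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class IdleReclaimDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        // Let even core threads time out while waiting for work in getTask()
        pool.allowCoreThreadTimeOut(true);

        for (int i = 0; i < 2; i++) {
            pool.execute(() -> { });                   // two short tasks to spin up the core threads
        }
        System.out.println("pool size after submitting: " + pool.getPoolSize()); // typically 2

        Thread.sleep(3000);                            // longer than keepAliveTime, so poll() times out
        System.out.println("pool size after idling: " + pool.getPoolSize());     // typically 0
        pool.shutdown();
    }
}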
Task rejection
Task rejection, implemented through the RejectedExecutionHandler interface, is the thread pool's protection mechanism. The source code is as follows:
// RejectedExecutionHandler.java
public interface RejectedExecutionHandler {
// This method is called when the thread pool cannot accept a task
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
The RejectedExecutionHandler implementation class provides the following types of rejection policies:
- ThreadPoolExecutor.AbortPolicy: the default rejection policy of ThreadPoolExecutor and ScheduledThreadPoolExecutor. It discards the task and throws a RejectedExecutionException. This policy is recommended because the exception makes rejection easy to detect.
- ThreadPoolExecutor.CallerRunsPolicy: the rejected task is run by the thread that submitted it; this is used when every task needs to complete, and suits compute-heavy workloads.
- ThreadPoolExecutor.DiscardOldestPolicy: discards the oldest task at the head of the queue and then resubmits the rejected task; this suits workloads where old tasks can routinely be dropped.
- ThreadPoolExecutor.DiscardPolicy: discards the task without throwing an exception; with this policy, rejected tasks cannot be detected through exceptions.
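Besides the built-in policies, RejectedExecutionHandler can be implemented directly. A hedged sketch (pool sizes and the handler's behaviour are illustrative) of a handler that simply logs and drops rejected tasks:
// RejectionDemo.java (illustrative)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    public static void main(String[] args) {
        // A custom handler that logs the rejection and drops the task
        RejectedExecutionHandler logAndDrop = (r, executor) ->
                System.out.println("rejected: " + r + ", pool size = " + executor.getPoolSize());

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                logAndDrop);

        // Task 1 takes the single thread, task 2 fills the queue, task 3 is handed to the handler
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                try { Thread.sleep(500); } catch (InterruptedException ignored) { }
            });
        }
        pool.shutdown();
    }
}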
Worker thread management
The Worker class represents the worker threads of the thread pool. It is a final inner class of ThreadPoolExecutor that extends the AbstractQueuedSynchronizer class and implements the Runnable interface; the source code is as follows:
// ThreadPoolExecutor.java
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// Serialize the version number
private static final long serialVersionUID = 6138294804551838833L;
// The thread this worker runs in; null if the ThreadFactory fails to create it
final Thread thread;
// The initial task to run, possibly empty
Runnable firstTask;
// Thread task counter
volatile long completedTasks;
// Creates a worker with the given first task and obtains a thread from the ThreadFactory
Worker(Runnable firstTask) {
// Disable interrupts until the worker thread is running
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
// Call the runWorker(Worker w) method
runWorker(this);
}
// A state value of 0 represents the unlocked state, and a value of 1 represents the locked state
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
As you can see from the source code above, the thread pool uses a HashSet to store all worker threads, that is, all Worker objects, which makes it easy to control the threads in the pool by adding and removing elements.
By extending the AbstractQueuedSynchronizer class, the Worker class implements an exclusive lock, meaning that only one thread can hold the lock at a time. ReentrantLock is not used because it is reentrant: the thread pool must not allow a worker thread to reacquire the lock while it is running a task, so AbstractQueuedSynchronizer is used instead.
AbstractQueuedSynchronizer provides a framework for building locks and related synchronizers (semaphores, events, and so on) that rely on a first-in-first-out (FIFO) wait queue; it maintains a single int state value.
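To make the idea concrete, here is a minimal standalone sketch of a non-reentrant exclusive lock built on AbstractQueuedSynchronizer, in the same spirit as Worker (this is an illustration, not the Worker class itself):
// NonReentrantMutex.java (illustrative)
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// state 0 means unlocked, 1 means locked; unlike ReentrantLock, the owner cannot acquire it twice
public class NonReentrantMutex extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return getState() != 0; }
}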
Adding a worker thread
Worker threads are added through the **addWorker(Runnable firstTask, boolean core)** method; the source code is as follows:
// ThreadPoolExecutor.java
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Get the running status of the thread pool and the number of worker threads
if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null
|| workQueue.isEmpty()))
// Failed to add worker thread if thread pool has stopped
return false;
// Execute the loop
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
// If the number of worker threads is greater than or equal to the core pool size or the maximum pool size, adding a worker thread fails
return false;
// Add workerCount (atomic operation)
if (compareAndIncrementWorkerCount(c))
break retry;
// If the increment fails, get the CTL again
c = ctl.get();
if (runStateAtLeast(c, SHUTDOWN))
// If another thread changes the running state of the pool to SHUTDOWN, jump to the Retry tag and return to the outermost loop
continue retry;
}
}
// Whether the thread corresponding to the Worker object has been started
boolean workerStarted = false;
// Whether the Worker object was successfully added to the HashSet
boolean workerAdded = false;
Worker w = null;
try {
// Create the Worker object with firstTask
w = new Worker(firstTask);
// Create a thread from the Worker object
final Thread t = w.thread;
if (t != null) {
// If the thread is not null, reference the pool's mainLock
final ReentrantLock mainLock = this.mainLock;
// Add a lock to mainLock
mainLock.lock();
try {
// Recheck while holding the lock; back out on ThreadFactory failure or if the pool was shut down before the lock was acquired
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// If the thread pool is RUNNING, or it is in a state below STOP and firstTask is null, execute the following logic
if (t.getState() != Thread.State.NEW)
// If the thread is not in the NEW state, throw an IllegalThreadStateException
throw new IllegalThreadStateException();
// Add Worker objects to the HashSet
workers.add(w);
// The Worker object was successfully added to the HashSet
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
// If the current number of worker threads is greater than the recorded largest pool size, update largestPoolSize
largestPoolSize = s;
}
} finally {
// Unlock mainLock
mainLock.unlock();
}
if (workerAdded) {
// If the Worker object is successfully added to the HashSet, the thread is started
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
// If the thread is not started, the addWorkerFailed(Worker W) method is called to rollback the creation of the Worker thread, that is, to do some cleanup
addWorkerFailed(w);
}
// Returns whether the thread started successfully
return workerStarted;
}
The execution process is as follows:
- Check the running state of the thread pool: if it is at least SHUTDOWN, adding a worker fails unless the state is exactly SHUTDOWN, firstTask is null, and the blocking queue is not empty (that is, a thread is only needed to drain the remaining queued tasks). Otherwise, go to Step 2.
- Get the number of worker threads and compare it with the bound: the core pool size if the boolean parameter core is true, or the maximum pool size if core is false. If the bound has been reached, adding a worker fails; otherwise, go to Step 3.
- Atomically increment workerCount with a CAS. If the CAS fails, re-read ctl; if the running state has changed, go back to Step 1, otherwise retry Step 3. If the CAS succeeds, go to Step 4.
- Create the Worker object and, while holding mainLock, recheck the pool state, add the Worker object to the HashSet, and update largestPoolSize if necessary.
- If the Worker object was added successfully, start its thread; if the thread fails to start, call addWorkerFailed(Worker w) to roll back the creation.
Running worker threads
Tasks are executed in the **runWorker(Worker w)** method; the source code is as follows:
// ThreadPoolExecutor.java
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// Get the first task
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// Call getTask to get the task
while (task != null || (task = getTask()) != null) {
// Lock the Worker object
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
// If the thread pool is at least in the STOP state and the worker thread has not yet been interrupted, interrupt it
wt.interrupt();
try {
// Call the beforeExecute(Thread t, Runnable r) method, which subclasses can override
beforeExecute(wt, task);
try {
// Execute the task
task.run();
// Call the afterExecute(Runnable r, Throwable t) method, which subclasses can override
afterExecute(task, null);
} catch (Throwable ex) {
// If an exception is thrown, afterExecute(Runnable r, Throwable t) is called with the exception; subclasses can override this method
afterExecute(task, ex);
throw ex;
}
} finally {
// Task is set to null to prepare the next task
task = null;
// The number of completed tasks increases
w.completedTasks++;
// Unlock the Worker object
w.unlock();
}
}
completedAbruptly = false;
} finally {
// Call processWorkerExit(Worker w, boolean completedAbruptly) to reclaim the worker thread
processWorkerExit(w, completedAbruptly);
}
}
The execution process is as follows:
- If firstTask is not null, execute firstTask first, then repeatedly call getTask() to fetch and execute further tasks.
- If firstTask is null, go straight to calling getTask() to fetch and execute tasks.
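Since runWorker invokes the beforeExecute and afterExecute hooks around every task, a subclass can use them for cross-cutting concerns. A hedged sketch (class name and logging are illustrative) of a pool that logs each task and any exception it throws:
// LoggingThreadPool.java (illustrative)
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LoggingThreadPool extends ThreadPoolExecutor {
    public LoggingThreadPool(int core, int max, long keepAlive, TimeUnit unit,
                             BlockingQueue<Runnable> queue) {
        super(core, max, keepAlive, unit, queue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        System.out.println(t.getName() + " is about to run " + r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {
            // Exceptions thrown by task.run() arrive here before runWorker rethrows them
            System.out.println(r + " threw " + t);
        }
    }
}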
Reclaiming worker threads
Worker threads are reclaimed in the **processWorkerExit(Worker w, boolean completedAbruptly)** method; the source code is as follows:
// ThreadPoolExecutor.java
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly)
// If completedAbruptly is true, the worker exited because of a user exception and workerCount has not been decremented yet
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
// Add a lock to mainLock
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// Remove the corresponding Worker object in the HashSet
workers.remove(w);
} finally {
// Unlock mainLock
mainLock.unlock();
}
// Call the tryTerminate() method, which calls terminated() in terminated state
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
The execution process is as follows:
- If the worker exited abruptly because of an exception, decrement workerCount, since it was not decremented in getTask().
- While holding mainLock, add the worker's completed task count to completedTaskCount and remove the corresponding Worker object from the HashSet.
- Call tryTerminate() to try to advance the thread pool towards the TERMINATED state.
- If the pool state is below STOP and the number of worker threads has fallen below the required minimum (or the worker died abruptly), call addWorker(null, false) to add a replacement worker thread.
Thread pool types
Five thread pools are encapsulated in the Executors class:
newFixedThreadPool
The source code is as follows:
// Executors.java
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
This thread pool has a fixed number of threads: corePoolSize and maximumPoolSize are the same, keepAliveTime is 0, and the blocking queue is a LinkedBlockingQueue. LinkedBlockingQueue is a linked-list-based blocking queue that orders elements FIFO (first-in-first-out); its default capacity is Integer.MAX_VALUE, so the queue created here is effectively unbounded, which can be dangerous if tasks accumulate. Linked queues typically have higher throughput than array-based queues, but less predictable performance in most concurrent applications.
This method applies to scenarios where time-consuming tasks are performed under known concurrency pressures.
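A small hedged usage sketch (thread and task counts are illustrative): six tasks share three fixed threads, and the extra tasks wait in the unbounded LinkedBlockingQueue.
// FixedPoolDemo.java (illustrative)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int i = 1; i <= 6; i++) {
            final int id = i;
            pool.execute(() ->
                    System.out.println("task " + id + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}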
newWorkStealingPool
The source code is as follows:
// Executors.java
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
This thread pool is backed by a ForkJoinPool, which maintains multiple task queues to reduce contention; its default parallelism is the number of processors available to the Java virtual machine.
This method is applicable to scenarios where time-consuming tasks need to be executed concurrently.
newSingleThreadExecutor
The source code is as follows:
// Executors.java
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
This thread pool has only one thread: corePoolSize and maximumPoolSize are both 1, keepAliveTime is 0, and the blocking queue is a LinkedBlockingQueue. LinkedBlockingQueue is a linked-list-based blocking queue that orders elements FIFO (first-in-first-out); its default capacity is Integer.MAX_VALUE, so the queue is effectively unbounded, which can be dangerous if tasks accumulate. Linked queues typically have higher throughput than array-based queues, but less predictable performance in most concurrent applications.
This method is applicable to scenarios where task execution sequence is required.
newCachedThreadPool
The source code is as follows:
// Executors.java
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
This thread pool caches threads: it creates new threads as needed but reuses previously created threads when they are available. corePoolSize is 0, maximumPoolSize is Integer.MAX_VALUE, keepAliveTime is 60 seconds, and the blocking queue is a SynchronousQueue, which does not store elements and supports both fair and unfair modes; each insert operation must wait for a corresponding remove operation by another thread, and vice versa.
This method is suitable for scenarios with a large number of short-lived tasks.
newScheduledThreadPool
The source code is as follows:
// Executors.java
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
This thread pool can schedule tasks to run after a given delay or to execute periodically. It uses DelayedWorkQueue, a blocking queue that supports delayed retrieval of elements.
This mode is applicable to the scenario where periodic tasks are performed.
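A small hedged usage sketch (delays, period, and sleep time are illustrative) that runs one delayed task and one periodic task:
// ScheduledPoolDemo.java (illustrative)
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduledPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        // Run once after a 1-second delay
        scheduler.schedule(() -> System.out.println("delayed task"), 1, TimeUnit.SECONDS);
        // Run every 2 seconds after an initial 1-second delay
        ScheduledFuture<?> periodic = scheduler.scheduleAtFixedRate(
                () -> System.out.println("periodic task"), 1, 2, TimeUnit.SECONDS);

        Thread.sleep(6000);        // let a few periodic runs happen
        periodic.cancel(false);
        scheduler.shutdown();
    }
}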
My GitHub: TanJiaJunBeyond
Common Android Framework: Common Android framework
My Juejin: Tan Jiajun
My Jianshu: Tan Jiajun
My CSDN: Tan Jiajun