Original text: chenmingyu. Top/concurrent -…
The thread pool
Thread pools are used to process asynchronous or concurrent tasks.
Advantages:
- Reusing threads that already exist reduces the resource cost of creating and destroying threads
- A task can run on a thread that is already in the pool without waiting for thread creation, which improves response time
- Threads are managed centrally by the pool, which improves their manageability
ThreadPoolExecutor
Thread pools in Java are implemented using ThreadPoolExecutor
The constructor
ThreadPoolExecutor provides four constructors; the other three all end up calling the following one
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
The arguments:
- corePoolSize: the number of core threads in the thread pool
  The pool keeps this many core threads alive. A newly created pool contains no threads; threads are created to execute tasks as the tasks arrive. Once the number of worker threads equals corePoolSize, newly arrived tasks are placed in the work queue
- maximumPoolSize: the maximum number of threads the thread pool is allowed to create
  When the blocking queue is full and the pool holds fewer than maximumPoolSize threads, a new thread is created to execute the task
- keepAliveTime: how long an idle thread is kept alive
  keepAliveTime applies only while the pool holds more threads than corePoolSize. In that case a thread that has been idle for keepAliveTime is terminated, until the number of threads in the pool is back down to corePoolSize
- unit: the time unit of keepAliveTime
  One of TimeUnit.DAYS, TimeUnit.HOURS, TimeUnit.MINUTES, TimeUnit.SECONDS, TimeUnit.MILLISECONDS, TimeUnit.MICROSECONDS, or TimeUnit.NANOSECONDS
- workQueue: the task queue, a blocking queue that holds tasks waiting to be executed
  ArrayBlockingQueue: a bounded blocking queue based on an array
  LinkedBlockingQueue: a blocking queue based on a linked list; it is unbounded unless a capacity is given
  SynchronousQueue: a blocking queue that stores no elements. Each insert must wait until another thread calls the remove operation; until then the insert blocks
  PriorityBlockingQueue: an unbounded blocking queue with priority ordering
- threadFactory: the factory used to create threads
- handler: the saturation policy. When both the thread pool and the queue are full, a policy is needed to handle new tasks. The default policy is AbortPolicy
  AbortPolicy: throws an exception directly
  CallerRunsPolicy: runs the current task on the caller's thread
  DiscardOldestPolicy: discards the oldest task in the queue (the one at the head) and then tries to execute the current task
  DiscardPolicy: discards the task without processing it
  A custom policy can also be provided by implementing RejectedExecutionHandler
The thread pool behaves differently depending on these parameters, so understanding what each one means makes it easier to understand how the thread pool is implemented
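To make the parameter list concrete, here is a minimal sketch that passes all seven arguments explicitly. The class name PoolConfigDemo, the namedFactory helper, and every numeric value are assumptions chosen for illustration, not recommendations from the original text.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConfigDemo {

    // Hypothetical factory that gives worker threads readable names.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger(1);
        return r -> new Thread(r, prefix + "-" + seq.getAndIncrement());
    }

    // All seven constructor arguments spelled out; the values are illustrative only.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                    // corePoolSize
                4,                                    // maximumPoolSize
                30L,                                  // keepAliveTime
                TimeUnit.SECONDS,                     // unit
                new ArrayBlockingQueue<>(100),        // workQueue (bounded)
                namedFactory("demo-pool"),            // threadFactory
                new ThreadPoolExecutor.AbortPolicy()  // handler
        );
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newPool();
        pool.execute(() ->
                System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Naming the threads through the factory is optional, but it tends to make thread dumps and logs easier to read.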
Submit a task
The thread pool processes a submitted task as follows
Processing steps:
- If the number of worker threads is below corePoolSize, create a thread to execute the task; otherwise try to add the task to the blocking queue
- If the blocking queue is not full, the task is stored in the queue
- If the blocking queue is full, check whether the pool has reached maximumPoolSize. If not, create a thread to execute the task
- If the pool has reached maximumPoolSize, the task is handled according to the saturation policy
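The four steps above can be observed from the outside with a deliberately tiny pool. The sketch below assumes one core thread, a maximum of two threads, and a queue of capacity one; the class name SubmitFlowDemo is hypothetical. Every task blocks on a latch, so the pool cannot drain anything while tasks are being submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmitFlowDemo {

    // Submits four blocking tasks and records what the pool did with each one.
    static List<String> run() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await(); // keep the worker busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // core = 1, max = 2, queue capacity = 1, default AbortPolicy
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocked);
                    log.add("task" + i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.add("task" + i + ": rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

With these settings the first task starts the core thread, the second is queued, the third forces a second thread because the queue is full, and the fourth is rejected by the default AbortPolicy.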
ThreadPoolExecutor accepts tasks through execute(Runnable command) and submit(Runnable task). submit(Runnable task) calls execute(Runnable command) internally, so execute(Runnable command) is the method we need to understand.
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // Read ctl, which holds both the state of the thread pool and the number of worker threads
    int c = ctl.get();
    // If the number of worker threads is smaller than the number of core threads, create a new thread to execute the task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Otherwise, if the thread pool is running, try to add the task to the blocking queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Double check: read the pool state again. If the pool is no longer running, remove the task from the queue and apply the reject policy
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If there are no worker threads left, start one so the queued task gets executed
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // The queue is full: try to create a non-core thread to execute the task, and apply the reject policy if that fails
    else if (!addWorker(command, false))
        reject(command);
}
In execute(Runnable command), a new thread is created to execute the task by calling addWorker(command, true)
workQueue.offer(command) adds the task to the blocking queue
reject(command) handles the task according to the saturation policy that was passed in when the thread pool was created. For the default AbortPolicy, for example, the source shows that it simply throws a RejectedExecutionException. The source code of the other saturation policies is just as simple
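As an illustration of a custom saturation policy, here is a minimal sketch of a handler that counts rejected tasks instead of throwing. The names RejectDemo and CountingHandler are hypothetical, and the pool sizes are chosen only to force one rejection.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RejectDemo {

    // Hypothetical policy: count rejected tasks and drop them silently.
    static class CountingHandler implements RejectedExecutionHandler {
        final AtomicInteger rejected = new AtomicInteger();

        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            rejected.incrementAndGet();
        }
    }

    // Submits three blocking tasks to a pool that can hold only two.
    static int run() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        CountingHandler handler = new CountingHandler();
        // One thread, one queue slot: the third task has nowhere to go.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return handler.rejected.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected tasks: " + run());
    }
}
```

Silently dropping work is rarely what a real system wants; a production handler would more likely log, record a metric, or hand the task to a fallback.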
How are the thread pool state and the number of worker threads represented?
ThreadPoolExecutor represents both with a single AtomicInteger variable
/*
 * ctl holds two pieces of information: the state of the thread pool (high 3 bits)
 * and the number of threads currently in the pool (low 29 bits). This is the same
 * idea as the read-write lock state variable covered earlier, which records two
 * values in one 32-bit int: the high 16 bits track the read lock and the low
 * 16 bits track the write lock
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// The low 29 bits store the number of threads in the pool
private static final int COUNT_BITS = Integer.SIZE - 3;
// Maximum capacity of the thread pool
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// The run state is stored in the high three bits
// Run states
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
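The packing scheme is easier to follow when the bit operations can be run directly. The sketch below copies the constants into a standalone class, hypothetically named CtlBits. The three helper methods are modeled on the private helpers of the same names in the JDK 8 source of ThreadPoolExecutor; they are private there, so this is an illustration rather than a usable API.

```java
class CtlBits {
    static final int COUNT_BITS = Integer.SIZE - 3;     // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1;  // mask with the low 29 bits set

    static final int RUNNING = -1 << COUNT_BITS;        // high bits 111
    static final int SHUTDOWN = 0 << COUNT_BITS;        // high bits 000
    static final int STOP = 1 << COUNT_BITS;            // high bits 001

    // Keep only the high 3 bits: the run state.
    static int runStateOf(int c) {
        return c & ~CAPACITY;
    }

    // Keep only the low 29 bits: the worker count.
    static int workerCountOf(int c) {
        return c & CAPACITY;
    }

    // Combine a run state and a worker count into one int.
    static int ctlOf(int rs, int wc) {
        return rs | wc;
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println("packed bits:  " + Integer.toBinaryString(c));
        System.out.println("worker count: " + workerCountOf(c));
        System.out.println("is RUNNING:   " + (runStateOf(c) == RUNNING));
    }
}
```

Because RUNNING is negative and the other states are zero or positive, the states are numerically ordered, which is why checks such as rs >= SHUTDOWN work as state comparisons.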
addWorker(Runnable firstTask, boolean core) creates a worker thread to execute the task
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // Thread pool state
        int rs = runStateOf(c);
        // Check the thread pool state and whether the blocking queue is empty
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;
        for (;;) {
            // Get the number of worker threads
            int wc = workerCountOf(c);
            // Fail if the count has reached the maximum capacity, or has reached corePoolSize or maximumPoolSize, depending on the core flag
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Increase the number of worker threads
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get(); // Re-read ctl
            // If the thread pool state changed, retry the outer loop
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create the Worker, which creates a new thread internally
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // Check the thread pool state
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Add the new worker to the pool's set of workers
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the thread, which runs the Worker's firstTask
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If the worker thread failed to start
        if (!workerStarted)
            // remove the worker
            addWorkerFailed(w);
    }
    return workerStarted;
}
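addWorker is private, but its effect is visible through the public API. The sketch below, with the hypothetical class name WorkerCreationDemo, shows that a new pool starts with no threads and that prestartAllCoreThreads() starts idle core workers; in the JDK 8 source that method does so by calling addWorker(null, true) in a loop.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class WorkerCreationDemo {

    // Returns {pool size before, threads started, pool size after, largest pool size}.
    static int[] run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        try {
            int before = pool.getPoolSize();              // no workers exist yet
            int started = pool.prestartAllCoreThreads();  // starts idle core workers
            int after = pool.getPoolSize();
            return new int[] {before, started, after, pool.getLargestPoolSize()};
        } finally {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run();
        System.out.println("before=" + r[0] + " started=" + r[1]
                + " after=" + r[2] + " largest=" + r[3]);
    }
}
```

The largest pool size reported at the end corresponds to the largestPoolSize field that addWorker updates while holding mainLock.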
Closing the thread pool
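A ThreadPoolExecutor is shut down with shutdown() or shutdownNow(). Both work by iterating over the worker threads in the pool and interrupting them. shutdown() moves the pool to the SHUTDOWN state and interrupts only idle workers, so tasks that are already queued still run. shutdownNow() moves the pool to the STOP state, interrupts every started worker, and returns the tasks that were still waiting in the queue. The loop that interrupts idle workers looks like this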
for (Worker w : workers) {
    Thread t = w.thread;
    if (!t.isInterrupted() && w.tryLock()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        } finally {
            w.unlock();
        }
    }
    if (onlyOne)
        break;
}
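In application code the two methods are often combined: ask for an orderly shutdown first, and fall back to shutdownNow() only if the pool does not terminate in time. The following is a minimal sketch of that pattern; the class and method names (ShutdownDemo, shutdownGracefully) and the timeout are assumptions.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {

    // Returns true if the pool terminated within the allowed time.
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks, let queued tasks finish
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                // Still running: interrupt workers and collect tasks that never started.
                List<Runnable> dropped = pool.shutdownNow();
                System.out.println("dropped " + dropped.size() + " queued tasks");
                return pool.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("working"));
        System.out.println("terminated: " + shutdownGracefully(pool, 5, TimeUnit.SECONDS));
    }
}
```

Note that shutdownNow() only interrupts threads; a task that ignores interruption keeps running, so the second awaitTermination can still time out.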
Executor framework
The Executor framework separates the submission of a task from its execution
Executors provides a series of factory methods for creating thread pools; the pools they return implement the ExecutorService interface
Factory methods:
- newFixedThreadPool: creates a thread pool with a fixed number of threads
- newCachedThreadPool: creates a cacheable thread pool. A call to execute reuses a previously constructed thread if one is available; otherwise a new thread is created and added to the pool. Threads that have not been used for 60 seconds are terminated and removed from the cache
- newSingleThreadExecutor: creates a thread pool with only one thread
- newScheduledThreadPool: creates a thread pool that supports scheduled and periodic task execution
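The separation of submission from execution shows up in how an ExecutorService is used: the caller hands over a task and gets back a Future, without knowing which thread runs it. Below is a minimal sketch; the class name ExecutorsDemo and the sum-of-squares workload are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorsDemo {

    // Computes 1^2 + 2^2 + ... + n^2, one task per term.
    static int sumSquares(int n) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int x = i;
                Callable<Integer> task = () -> x * x;
                futures.add(pool.submit(task)); // submission returns immediately
            }
            int sum = 0;
            for (Future<Integer> f : futures) {
                sum += f.get(); // blocks until that task has been executed
            }
            return sum;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumSquares(4));
    }
}
```

The factory method is used here only for brevity.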
The Alibaba Java development manual prohibits creating thread pools with the factory methods provided by Executors
This is indeed a very serious problem. Our department once used a FixedThreadPool and it ended in an OOM (OutOfMemoryError). The worker threads were blocked or took a long time to finish their tasks, so new tasks kept piling up in the pool's unbounded blocking queue until memory ran out
So when using a thread pool, create it with the ThreadPoolExecutor constructor, choose the number of core threads and the maximum number of threads based on the type of task, and choose an appropriate blocking queue and an appropriate capacity for it
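The unbounded queue behind newFixedThreadPool can be checked directly, and a bounded replacement is short to write. In the sketch below, the cast to ThreadPoolExecutor relies on an implementation detail of the JDK, and the class name BoundedPoolDemo, the helper names, and the choice of CallerRunsPolicy are assumptions made for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolDemo {

    // Remaining capacity of the queue behind Executors.newFixedThreadPool.
    // The cast assumes the factory returns a plain ThreadPoolExecutor.
    static int fixedPoolQueueCapacity() {
        ThreadPoolExecutor fixed = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        try {
            return fixed.getQueue().remainingCapacity();
        } finally {
            fixed.shutdown();
        }
    }

    // Same fixed thread count, but with a bounded queue and an explicit policy.
    static ThreadPoolExecutor boundedFixedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy()); // assumption: slow the caller down
    }

    public static void main(String[] args) {
        System.out.println("fixed pool queue capacity: " + fixedPoolQueueCapacity());
        ThreadPoolExecutor pool = boundedFixedPool(2, 50);
        System.out.println("bounded queue capacity:    " + pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```

CallerRunsPolicy makes the submitting thread run the overflow task itself, which throttles producers; whether that is acceptable depends on the application.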
Configure thread pools properly
Properly configuring a thread pool (created with ThreadPoolExecutor) requires analyzing the nature of the tasks:
- CPU-intensive tasks should be configured with as few threads as possible, such as CPU count + 1
- IO-intensive tasks spend much of their time waiting rather than executing, so they should be configured with more threads, such as CPU count x 2
The number of CPUs is available via Runtime.getRuntime().availableProcessors()
- If a task makes time-consuming calls to an external interface, the CPU sits idle for longer. The pool can be given more threads so that the idle CPU time is used to execute other tasks
- A bounded queue is recommended. Its capacity can be made larger as required, but keeping it bounded prevents OOM
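The rules of thumb above translate into a few lines of code. This is only a sketch of the arithmetic: the class name PoolSizing, the helper names, and the queue capacity of 1000 are assumptions, and real sizing should be validated by measurement.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizing {

    // CPU-intensive work: CPU count + 1
    static int cpuBoundThreads(int cpus) {
        return cpus + 1;
    }

    // IO-intensive work: CPU count x 2
    static int ioBoundThreads(int cpus) {
        return cpus * 2;
    }

    // Fixed-size pool with a bounded queue, as recommended above.
    static ThreadPoolExecutor newPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("cpus=" + cpus
                + " cpuBound=" + cpuBoundThreads(cpus)
                + " ioBound=" + ioBoundThreads(cpus));
        ThreadPoolExecutor ioPool = newPool(ioBoundThreads(cpus), 1000);
        ioPool.shutdown();
    }
}
```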
Reference: The Art of Concurrent Programming in Java
Recommended reading:
Java concurrent programming | Threads explained
Java concurrent programming | Locks: AQS, Lock, ReentrantLock, ReentrantReadWriteLock