1 Why use thread pools
1.1 The more, the better?
1. A thread is an object in Java, but it is also an operating system resource. Creating and destroying threads takes time; if creation time + destruction time > task execution time, the thread is not cost-effective.
2. Java objects occupy heap memory, while operating system threads occupy system memory. On a typical 64-bit HotSpot JVM the default stack size for a thread is 1 MB, and this stack space is allocated from system memory. Too many threads consume too much memory.
3. The operating system has to switch thread contexts frequently (every thread wants to run), which hurts performance.
Threads let an application coordinate its use of system resources such as CPU, memory, network, and I/O. Creating a thread requires allocating its private memory areas: the virtual machine stack, the native method stack, and the program counter. These resources have to be reclaimed when the thread is destroyed. Creating and destroying threads frequently wastes a lot of system resources and increases the risks of concurrent programming.
How do you make new threads wait or do a friendly denial of service when the server is overloaded?
These are problems that threads cannot solve by themselves. Multiple threads therefore need to be coordinated through a thread pool, which also makes it possible to isolate primary and secondary workloads and to run tasks on a schedule or periodically.
The role of thread pools
● Manage and reuse threads, control the maximum number of concurrent requests, and so on. Since the point of a thread pool is reuse, make sure the pool itself is reused: creating a new thread pool for every request may be worse than using no pool at all. If you do not declare the thread pool yourself but obtain one from a library provided by someone else, be sure to review the source code to verify that the pool is instantiated and configured as expected
● Implement a task queue caching strategy and a rejection mechanism
● Implement time-related features such as scheduled execution and periodic execution
For example, suppose a transaction service and a search service run on the same server and each opens its own thread pool. Transaction threads clearly consume more resources, so a separate thread pool is configured to isolate the slower transaction service from the search service and keep their threads from affecting each other.
Using thread pools properly in development brings three benefits
- Reduce resource consumption: reusing already created threads reduces the cost of thread creation and destruction
- Improve response time: when a task arrives, it can run immediately without waiting for a thread to be created
- Improve manageability: threads are a scarce resource; creating them without limit not only consumes system resources but also reduces system stability, so a thread pool is used to allocate, tune, and monitor them uniformly
Four concepts
1. Thread pool manager: creates and manages the thread pool, including creating the pool, destroying the pool, and adding new tasks.
2. Worker thread: a thread in the pool; it waits when there is no task and executes tasks in a loop.
3. Task interface: the interface every task must implement so that worker threads can schedule it; it mainly defines the entry point of the task, the cleanup after the task finishes, and the task's execution state.
4. Task queue: stores tasks that have not been processed yet, providing a buffer mechanism.
- Schematic diagram of principle
Thread pool API
4.1 Interface definition and implementation classes
Inheritance diagram
ScheduledThreadPoolExecutor can be regarded as the most feature-rich implementation class.
4.2 Method Definition
4.2.1 ExecutorService
4.2.2 ScheduledExecutorService
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
Both methods create a one-shot task that runs once after the given delay.
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
Creates and executes a periodic task that first runs after the given initial delay and then at initialDelay + period, initialDelay + 2 * period, and so on. If any execution throws an exception, subsequent executions are suppressed.
If an execution takes longer than the period, the next execution starts immediately after the current one completes (executions never overlap). This is an important difference from scheduleWithFixedDelay.
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
Creates and executes a periodic task that first runs after the given initial delay; each subsequent execution starts the given delay after the previous one ends. If any execution throws an exception, subsequent executions are suppressed.
Because the delay is always measured from the end of the previous execution, a long-running task simply pushes every later execution back. This is how it differs from scheduleAtFixedRate in handling tasks that take longer than one cycle.
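The difference between the two periodic methods is easiest to see with a task that runs longer than its period. The following is a minimal sketch, not taken from the original article: the class name, the helper method, and the 100 ms / 50 ms timings are assumptions chosen for illustration. It measures the gap between the starts of the first two executions.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class FixedRateVsFixedDelay {

    // Hypothetical helper: schedules a 100 ms task with a 50 ms period (or delay)
    // and returns the time between the starts of the first two executions.
    static long gapBetweenStartsMillis(boolean fixedRate) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch twoRuns = new CountDownLatch(2);
        Runnable slowTask = () -> {
            starts.add(System.nanoTime());
            twoRuns.countDown();
            try {
                Thread.sleep(100); // deliberately longer than the 50 ms period
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            if (fixedRate) {
                scheduler.scheduleAtFixedRate(slowTask, 0, 50, TimeUnit.MILLISECONDS);
            } else {
                scheduler.scheduleWithFixedDelay(slowTask, 0, 50, TimeUnit.MILLISECONDS);
            }
            if (!twoRuns.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run twice in time");
            }
            return TimeUnit.NANOSECONDS.toMillis(starts.get(1) - starts.get(0));
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Fixed rate: the next run starts right after the slow run ends (about 100 ms).
        System.out.println("fixed rate gap  (ms): " + gapBetweenStartsMillis(true));
        // Fixed delay: the next run starts 50 ms after the slow run ends (about 150 ms).
        System.out.println("fixed delay gap (ms): " + gapBetweenStartsMillis(false));
    }
}
```

The exact numbers depend on the machine and scheduler jitter; only the ordering of the two gaps is the point of the sketch.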
Example
- Test cases
- Test implementation
- The results
We can see that every task beyond the core threads is waiting in the queue, so why does the pool never reach its maximum number of threads? What does that parameter actually mean? Read on!
4.2.3 The Executors utility class
You can instantiate a thread pool yourself or use Executors, a factory class for thread pools.
Commonly used methods
AbstractExecutorService, the abstract class that implements ExecutorService, provides implementations of submit, invokeAll, and other methods, but it does not implement the core Executor.execute() method. All tasks are ultimately run through this method, and different implementations have different execution strategies.
The static factory methods of Executors create wrapper objects for three kinds of thread pool
- ForkJoinPool
- ThreadPoolExecutor
- ScheduledThreadPoolExecutor
● Executors.newWorkStealingPool: introduced in JDK 8. It creates a thread pool that maintains enough threads to support the given level of parallelism and uses multiple queues to reduce contention. The no-argument factory method uses the number of CPUs as the default parallelism. It returns a ForkJoinPool object (introduced in JDK 7), which is also a subclass of AbstractExecutorService
● Executors.newCachedThreadPool: creates an unbounded, cached thread pool whose task queue is a SynchronousQueue. When a task is added to the pool:
- If there is an idle thread in the pool, the idle thread executes it
- If not, a new thread is created
Threads that stay idle for more than 60 seconds are destroyed, so the number of threads varies with the number of tasks. This pool suits asynchronous tasks that take little time. maximumPoolSize is Integer.MAX_VALUE, which makes this a highly elastic pool; long before that limit is reached, however, no server can keep working and the process fails with an OOM. keepAliveTime defaults to 60 seconds: an idle worker thread is reclaimed after that time, and if the number of tasks increases again, new threads are created to handle them.
● Executors.newScheduledThreadPool: creates a thread pool that can run tasks on a schedule. The number of core threads is given by the parameter, and the maximum number of threads is Integer.MAX_VALUE. The returned pool implements the ScheduledExecutorService interface and supports delayed and periodic task execution; compared with Timer, ScheduledExecutorService is safer and more powerful. Unlike newCachedThreadPool, its core worker threads are not reclaimed.
● Executors.newSingleThreadExecutor: creates a pool with a single thread, which executes all tasks serially in the order they were submitted. The single thread works off an unbounded task queue. If that thread terminates because of a task exception, a new thread is created to continue with the subsequent tasks. The difference from newFixedThreadPool(1) is that the pool size is hard-coded in newSingleThreadExecutor and the returned executor cannot be reconfigured to use more threads.
● Executors.newFixedThreadPool: creates a fixed-size thread pool with an unbounded task queue. The input parameter is the fixed number of threads, used as both the core and the maximum number of threads. No thread is ever surplus to the core size, so keepAliveTime is 0. It uses a LinkedBlockingQueue with no capacity limit, so too many tasks can pile up.
The no-argument LinkedBlockingQueue constructor sets the capacity to Integer.MAX_VALUE. With such an effectively unbounded queue, there is an OOM risk if a burst of requests is very large. Apart from newWorkStealingPool, the other four factory methods all carry a risk of resource exhaustion.
Creating a thread pool with any of these methods is not recommended, because they impose no limits and raise performance concerns.
The default thread factory and rejection policy used by Executors are simplistic and generally not developer-friendly. A thread factory should be prepared before the pool is created, and the threads created for the pool must be clearly identified, just like the production lot number of a drug, with meaningful names and serial numbers. The rejection policy should take the business scenario into account and return a prompt or a friendly redirect. The following is a simple ThreadFactory example
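A minimal sketch of such a naming thread factory might look like the following. The class name NamedThreadFactory, the "-worker-" name pattern, and the pool name "order-service" are assumptions made for illustration, not the article's original listing.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory: every thread gets "<poolName>-worker-<serial number>".
final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger nextId = new AtomicInteger(1);

    NamedThreadFactory(String poolName) {
        this.prefix = poolName + "-worker-";
    }

    @Override
    public Thread newThread(Runnable task) {
        // The name carries the business meaning and a serial number,
        // which makes thread dumps much easier to read.
        return new Thread(task, prefix + nextId.getAndIncrement());
    }

    public static void main(String[] args) {
        ThreadFactory factory = new NamedThreadFactory("order-service");
        Thread t = factory.newThread(
                () -> System.out.println("running in " + Thread.currentThread().getName()));
        t.start(); // prints "running in order-service-worker-1"
    }
}
```

Using an AtomicInteger for the serial number keeps the factory safe to call from several threads at once.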
The examples above define a thread factory and a task body. The newThread method creates task threads quickly and uniformly; the key point is that threads must have meaningful names so that problems can be traced back easily.
- Single thread pool: newSingleThreadExecutor() wraps new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()). Exactly one thread is kept in the pool, so tasks are executed sequentially and extra tasks are queued.
- Fixed thread pool: newFixedThreadPool(nThreads) creates new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()). nThreads threads are kept in the pool, at most nThreads run at once, and extra tasks are queued. The number of threads is fixed and the threads do not time out.
- Cached thread pool: newCachedThreadPool() creates new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()).
The pool does not maintain a fixed number of threads but creates up to Integer.MAX_VALUE threads as needed (which, by the way, is far more than any current operating system allows). A SynchronousQueue has no capacity: a submitted task is handed directly to an idle thread waiting on the queue, and if none is waiting a new thread is created (concurrent blocking queues are covered in detail in a future article).
Why do the single thread pool and the fixed thread pool use a LinkedBlockingQueue while the cached thread pool uses a SynchronousQueue? Because the number of threads in a single or fixed pool is limited, a submitted task has to wait in the LinkedBlockingQueue for a free thread. In the cached pool the number of threads is almost unlimited (up to Integer.MAX_VALUE), so a submitted task only needs to be handed over to a thread through the SynchronousQueue.
- Single-threaded scheduled pool: newSingleThreadScheduledExecutor() passes the five parameters (1, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()). One thread is kept in the pool, and extra tasks wait in the DelayedWorkQueue.
- Fixed scheduled pool: newScheduledThreadPool(n) passes the five parameters (n, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()). n threads are kept in the pool, and extra tasks wait in the DelayedWorkQueue.
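The parameters listed above can be checked at runtime. The sketch below is an illustration, not part of the original article; it assumes that the fixed, cached, and scheduled factory methods return ThreadPoolExecutor instances, which holds for the JDK implementations I am aware of but is an implementation detail rather than a documented guarantee. newSingleThreadExecutor is left out because it returns a wrapper that cannot be cast.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

final class ExecutorsInspection {

    // Hypothetical helper: summarizes core size, maximum size and queue type,
    // then shuts the pool down so no resources are left behind.
    static String describe(ExecutorService service) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) service; // assumption: see lead-in
        String summary = "core=" + pool.getCorePoolSize()
                + ", max=" + pool.getMaximumPoolSize()
                + ", queue=" + pool.getQueue().getClass().getSimpleName();
        pool.shutdown();
        return summary;
    }

    public static void main(String[] args) {
        System.out.println("fixed(3):     " + describe(Executors.newFixedThreadPool(3)));
        System.out.println("cached:       " + describe(Executors.newCachedThreadPool()));
        System.out.println("scheduled(2): " + describe(Executors.newScheduledThreadPool(2)));
    }
}
```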
One technique that can mitigate the impact of long tasks is to limit how long tasks wait for resources, rather than waiting indefinitely
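One common way to bound the wait is the timed variant of Future.get. The sketch below is only an illustration under assumed values (a task that sleeps for 2 seconds and a caller-chosen timeout); the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedWaitDemo {

    // Hypothetical helper: waits at most timeoutMillis for a slow task.
    static String awaitResult(long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> future = pool.submit(() -> {
            Thread.sleep(2_000); // simulates a long-running task
            return "done";
        });
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the task instead of waiting forever
            return "timed out";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitResult(100)); // gives up after about 100 ms
    }
}
```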
Let's start with the first example, which tests the single thread pool, the fixed thread pool, and the cached thread pool (comment and uncomment the pool declarations to switch between them):
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExam {
    public static void main(String[] args) {
        // first test: single thread pool
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // second test: fixed thread pool
        // ExecutorService pool = Executors.newFixedThreadPool(2);
        // third test: cached thread pool
        // ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            pool.execute(new TaskInPool(i));
        }
        // stop accepting new tasks; tasks already submitted still run
        pool.shutdown();
    }
}

class TaskInPool implements Runnable {
    private final int id;

    TaskInPool(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                System.out.println("TaskInPool-[" + id + "] is running phase-" + i);
                TimeUnit.SECONDS.sleep(1);
            }
            System.out.println("TaskInPool-[" + id + "] is over");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
The screenshot below shows a troubleshooting session for a call to the underlying shared cache. The threads in the green box were created by a custom thread factory, and their names clearly carry more information than those created by the default thread factory in the blue box: the source of the call and the business meaning of the thread. This helps to quickly locate deadlocks, StackOverflowError, and similar problems.
Quick declaration methods for thread pools provided by the Executors class are simple but hide the details of thread pool parameters. Therefore, when using a thread pool, you must configure a reasonable number of threads, task queues, rejection policies, and thread reclaim policies based on scenarios and requirements, and name threads explicitly to facilitate troubleshooting.
5 Create a thread pool
We start with the ThreadPoolExecutor constructor, learn how to customize ThreadFactory and RejectedExecutionHandler, and write a simple thread pool example. We then analyze the core execute and addWorker methods of ThreadPoolExecutor to see how tasks are added to a thread pool and run.
- ThreadPoolExecutor is constructed as follows
- Parameter 1: corePoolSize is the number of resident core threads
If it is 0, the threads in the pool are destroyed once the tasks are finished and no new request arrives; if it is greater than 0, the core threads are kept even after their tasks complete. Setting this value well is critical: too large wastes resources, too small causes threads to be created and destroyed too frequently.
- Parameter 2: maximumPoolSize is the maximum number of threads the pool may run concurrently
As point 1 in the constructor shows, it must be >= 1. Once corePoolSize threads are busy, further tasks are buffered in the queue given by parameter 5, and only when that queue is full are extra threads created, up to maximumPoolSize. If maximumPoolSize equals corePoolSize, the pool has a fixed size.
- Parameter 3: keepAliveTime is the idle time allowed for threads in the pool
When a thread has been idle for keepAliveTime it is destroyed, until only corePoolSize threads are left, which avoids wasting memory and handle resources. By default, keepAliveTime only takes effect while the number of threads is greater than corePoolSize. However, when allowCoreThreadTimeOut of ThreadPoolExecutor is set to true, core threads are also reclaimed after they time out.
- Parameter 4: unit is the TimeUnit of keepAliveTime
The unit of keepAliveTime is usually TimeUnit.SECONDS.
- Parameter 5: workQueue is the buffer queue
When all core threads are busy, newly submitted tasks enter this BlockingQueue. The LinkedBlockingQueue used in the later sample code is a singly linked list that uses locks to make enqueueing and dequeueing atomic; its two locks control adding and taking elements respectively, making it a producer-consumer queue.
- Parameter 6: threadFactory is the thread factory
It produces a group of threads that serve the same kind of task. Threads are named by giving the factory a group-name prefix, so that when a thread dump is analyzed you can tell which factory produced each thread.
- Parameter 7: handler is the object that executes the rejection policy
This policy handles requests once the capacity of the workQueue (parameter 5) is exceeded, which is a simple form of traffic limiting. Friendly rejection strategies include: (1) saving the request to a database to smooth out peaks and executing it later during idle time, (2) redirecting to a prompt page, (3) printing a log entry
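The seven parameters described above map onto the full ThreadPoolExecutor constructor as sketched below. The concrete values (2 core threads, 4 maximum threads, a queue of 100, the "search-pool-" name prefix) are assumptions chosen only for illustration and would need tuning for a real workload.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ExplicitPoolDemo {

    // Hypothetical factory method showing every constructor argument explicitly.
    static ThreadPoolExecutor newPool() {
        AtomicInteger seq = new AtomicInteger(1);
        ThreadFactory factory = r -> new Thread(r, "search-pool-" + seq.getAndIncrement());
        return new ThreadPoolExecutor(
                2,                                    // 1. corePoolSize
                4,                                    // 2. maximumPoolSize
                60L,                                  // 3. keepAliveTime
                TimeUnit.SECONDS,                     // 4. unit of keepAliveTime
                new ArrayBlockingQueue<>(100),        // 5. workQueue (bounded)
                factory,                              // 6. threadFactory
                new ThreadPoolExecutor.AbortPolicy()  // 7. handler
        );
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        pool.execute(() ->
                System.out.println("hello from " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

A bounded ArrayBlockingQueue is used here so that overload shows up as rejections rather than as unbounded memory growth.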
2.1.1 corePoolSize (number of core threads)
The number of threads to keep in the pool even when they are idle. When a task is submitted and the number of threads is below corePoolSize, the pool creates a new thread for the task and adds it to workers (a HashSet), even if other idle core threads could run the task. Once the number of threads has reached corePoolSize, no new thread is created and the pool tries to add the task to the workQueue instead.
prestartAllCoreThreads
Calling the pool's prestartAllCoreThreads() creates and starts all core threads ahead of time and leaves them idle, waiting for work. This overrides the default policy of starting a core thread only when a new task is executed.
allowCoreThreadTimeOut
If false (the default), core threads stay alive even when idle. If true, core threads use keepAliveTime to time out while waiting for work, so the pool reclaims idle core threads as well.
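Both switches can be observed through the pool's own accessors. The following sketch uses an assumed pool of 3 core threads with a 1 second keep-alive; the class and helper names are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CoreThreadDemo {

    // Returns {pool size before prestart, threads started, pool size after prestart}.
    static int[] prestart() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            int before = pool.getPoolSize();             // no thread exists until a task arrives
            int started = pool.prestartAllCoreThreads(); // returns the number of threads started
            int after = pool.getPoolSize();
            // Requires keepAliveTime > 0; idle core threads may now be reclaimed too.
            pool.allowCoreThreadTimeOut(true);
            return new int[] {before, started, after};
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] r = prestart();
        System.out.println("before=" + r[0] + ", started=" + r[1] + ", after=" + r[2]);
    }
}
```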
2.1.2 maximumPoolSize (maximum number of threads in the pool)
The maximum number of threads the pool is allowed to create. If the queue is full and the number of threads is below the maximum, the pool creates new threads to run tasks and adds them to workers. This parameter matters for CachedThreadPool, whereas for a fixed thread pool it has no effect. With an unbounded task queue this parameter is never used.
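The last point can be demonstrated with a pool whose maximum is larger than its core size but whose queue is unbounded. This is a sketch under assumed sizes (2 core threads, maximum 10, 20 blocked tasks); the names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class UnboundedQueueDemo {

    // Returns {pool size, queue size} after submitting 20 tasks that all block.
    static int[] submitBlockedTasks() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < 20; i++) {
                pool.execute(blocker);
            }
            // The queue never fills up, so no thread beyond corePoolSize is created.
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] r = submitBlockedTasks();
        System.out.println("poolSize=" + r[0] + ", queued=" + r[1]); // poolSize=2, queued=18
    }
}
```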
- workQueue
A blocking queue that holds the tasks waiting to be executed. The tasks must be Runnable objects (a Callable is wrapped in a Runnable inside submit). You can choose from the following blocking queues.
- LinkedBlockingQueue: a blocking queue based on a linked list that orders elements FIFO and typically has higher throughput than ArrayBlockingQueue. The static factory method Executors.newFixedThreadPool() uses this queue
- SynchronousQueue: a blocking queue that stores no elements. Each insert must wait until another thread performs a remove, otherwise the insert stays blocked. Its throughput is usually higher than that of LinkedBlockingQueue. The static factory method Executors.newCachedThreadPool() uses this queue
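The thread pool adds tasks with the non-blocking offer method, so the queue type decides whether a task is buffered or whether the pool has to create a thread (or reject). A minimal sketch of how the queues answer offer, with a capacity-1 ArrayBlockingQueue added as an assumed bounded example:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

final class QueueOfferDemo {

    static boolean[] offerResults() {
        BlockingQueue<Runnable> linked = new LinkedBlockingQueue<>();   // effectively unbounded
        BlockingQueue<Runnable> bounded = new ArrayBlockingQueue<>(1);  // capacity 1
        BlockingQueue<Runnable> handoff = new SynchronousQueue<>();     // no capacity at all
        Runnable task = () -> { };
        return new boolean[] {
                linked.offer(task),   // accepted: there is always room
                bounded.offer(task),  // accepted: fills the single slot
                bounded.offer(task),  // refused: the queue is full
                handoff.offer(task)   // refused: no consumer is waiting to take it
        };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(offerResults()));
        // prints [true, true, false, false]
    }
}
```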
- ThreadFactory: the factory used to create threads. A ThreadFactory lets you give every created thread a meaningful name. The ThreadFactoryBuilder from the open source Guava library offers a quick way to set meaningful names for the threads of a pool
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
- RejectedExecutionHandler (rejection policy)
When both the queue and the pool are full, the pool is saturated and a policy must decide what to do with newly submitted tasks. The default policy is AbortPolicy, which throws an exception to signal that the new task cannot be handled
- AbortPolicy: discards the task and throws RejectedExecutionException
- CallerRunsPolicy: runs the task in the caller's own thread, a simple feedback mechanism that slows down task submission
- DiscardOldestPolicy: if the pool has not been shut down, discards the oldest task in the queue and tries to resubmit the new task
- DiscardPolicy: silently discards the task without throwing an exception
You can also implement the RejectedExecutionHandler interface yourself to suit the application scenario, for example by logging or persisting the tasks that cannot be handled
/**
 * Invokes the rejected execution handler for the given command.
 * Package-protected for use by ScheduledThreadPoolExecutor.
 */
final void reject(Runnable command) {
    // delegate to the configured rejection policy
    handler.rejectedExecution(command, this);
}
RejectedExecutionHandler has four implementation classes in ThreadPoolExecutor that can be used directly. You can of course implement your own policy, but that is usually not necessary.
// As long as the pool has not been shut down, the task is run directly
// by the thread that submitted it
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

// Always throws RejectedExecutionException.
// This is the default policy, used when no handler is passed to the constructor
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

// Does nothing: the task is silently dropped
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

// If the pool has not been shut down, throws away the task at the head of
// the queue (i.e. the one that has waited the longest) and resubmits the new task
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
- keepAliveTime (thread keep-alive time)
How long a worker thread stays alive after it becomes idle. If there are many tasks and each one is short, you can increase this time to improve thread utilization
- TimeUnit: the unit of the third parameter. The options are DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS and NANOSECONDS.
As point 2 in the code shows, the queue, thread factory, and rejection handler must all be instance objects. In real programming, however, few programmers instantiate all three themselves; they rely on the default implementations supplied by Executors, the static factory for thread pools
Rejection policies
Let's write a simple RejectedExecutionHandler implementation whose rejectedExecution method prints the current state of the thread pool.
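A handler of this kind could be sketched as follows. The class name, the log format, and the pool sizes in the helper (1 core thread, 2 maximum threads, a queue of 2) are assumptions for illustration, not the article's original listing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ReportingRejectionHandler implements RejectedExecutionHandler {
    final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor pool) {
        rejected.incrementAndGet();
        // Print the pool state at the moment of rejection instead of throwing.
        System.err.println("Rejected " + task
                + ": poolSize=" + pool.getPoolSize()
                + ", maximumPoolSize=" + pool.getMaximumPoolSize()
                + ", queued=" + pool.getQueue().size()
                + ", completed=" + pool.getCompletedTaskCount());
    }

    // Hypothetical helper: submits blocking tasks and counts how many are rejected.
    static int countRejections(int submissions) {
        ReportingRejectionHandler handler = new ReportingRejectionHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), handler);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < submissions; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return handler.rejected.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 2 threads + 2 queue slots can hold 4 tasks; the remaining 2 are rejected.
        System.out.println("rejections: " + countRejections(6));
    }
}
```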
ThreadPoolExecutor provides four public static nested classes:
● AbortPolicy (the default): discards the task and throws RejectedExecutionException.
● DiscardPolicy (not recommended): discards the task without throwing an exception.
● DiscardOldestPolicy: discards the task that has waited longest in the queue, then adds the current task to the queue.
● CallerRunsPolicy: when the queue is full, the task runs on the thread that submitted it, that is, the thread that called execute. A task submitted to the pool can therefore not be assumed to run asynchronously: with CallerRunsPolicy an asynchronous task may end up running synchronously, which makes this policy a special case.
- The task's run() method is called directly, bypassing the thread pool.
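The synchronous fallback of CallerRunsPolicy can be made visible by recording which thread actually runs an overflow task. The sketch below assumes a deliberately tiny pool (one thread, one queue slot); the names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CallerRunsDemo {

    // Returns the name of the thread that executed the task the pool could not accept.
    static String threadThatRanOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the only queue slot
            // Pool and queue are full: execute() runs this task in the calling thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
            return ranOn.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller:          " + Thread.currentThread().getName());
        System.out.println("overflow ran on: " + threadThatRanOverflowTask());
    }
}
```

Because the third execute call only returns after the task has finished, the submitting thread is slowed down, which is the feedback effect mentioned above.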
- Based on the thread factory and rejection policy implemented earlier, the thread pool code is implemented as follows:
When a task is rejected, the rejection policy prints that the current pool size has reached maximumPoolSize=2, that the queue is full, and that the number of completed tasks is already 1 (last line).
If you find that the pool raises an exception because it cannot handle a task, and you do not want the pool to discard the task, you may sometimes choose this rejection policy.
Source code walkthrough
Bit operations are frequently used in ThreadPoolExecutor attribute definitions to represent thread pool state; Bit operations are an efficient means of changing the current value.
Let’s start with the property definition
An Integer has 32 bits.
The rightmost 29 bits hold the number of worker threads, and the leftmost 3 bits hold the state of the thread pool. Three bits, including the sign bit, can represent 8 different values, which is enough for the five pool states. The numeric values of the five states satisfy RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED, so the state of the pool can be determined by comparing values. For example, the isRunning check appears often in the code.
- Mask: 000-11111111111111111111111111111
Similar to a subnet mask and used in AND operations to split the value into the left 3 bits and the right 29 bits.
The left 3 bits encode the 5 pool states; a dash is added after the left 3 bits to make the values easier to read.
- RUNNING: 111-00000000000000000000000000000 (decimal: -536,870,912)
In this state the pool can accept new tasks.
- SHUTDOWN: 000-00000000000000000000000000000 (decimal: 0)
In this state the pool accepts no new tasks but continues to execute the tasks in the queue.
- STOP: 001-00000000000000000000000000000 (decimal: 536,870,912)
In this state the pool rejects everything and interrupts the tasks in progress.
- TIDYING: 010-00000000000000000000000000000 (decimal: 1,073,741,824)
In this state all tasks have terminated.
- TERMINATED: 011-00000000000000000000000000000 (decimal: 1,610,612,736)
In this state the pool has finished cleaning up.
Take an AND operation as an example: the value 001-00000000000000000000001000011 represents 67 worker threads. ANDing it with the inverted mask 111-00000000000000000000000000000 yields the left three bits 001, which shows that the pool is currently in the STOP state.
ANDing the same value with the mask 000-11111111111111111111111111111 yields the right 29 bits, that is, the number of worker threads.
ORing the left 3 bits with the right 29 bits combines them into a single value.
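The packing scheme can be reproduced in a few lines. The sketch below mirrors the constants and helper names used in the JDK 8 ThreadPoolExecutor source (COUNT_BITS, CAPACITY, runStateOf, workerCountOf, ctlOf), but it is a standalone illustration, not the JDK class itself; later JDK versions name some of these differently.

```java
final class CtlBitsDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29 bits for the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;    // mask: low 29 bits set

    // The run state lives in the high 3 bits.
    static final int RUNNING    = -1 << COUNT_BITS;       // 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;       // 000
    static final int STOP       =  1 << COUNT_BITS;       // 001
    static final int TIDYING    =  2 << COUNT_BITS;       // 010
    static final int TERMINATED =  3 << COUNT_BITS;       // 011

    static int runStateOf(int c)      { return c & ~CAPACITY; } // keep the high 3 bits
    static int workerCountOf(int c)   { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc)  { return rs | wc; }       // combine both parts
    static boolean isRunning(int c)   { return c < SHUTDOWN; }  // only RUNNING is negative

    public static void main(String[] args) {
        int c = ctlOf(STOP, 67);
        System.out.println("state is STOP: " + (runStateOf(c) == STOP)); // true
        System.out.println("workers: " + workerCountOf(c));              // 67
        System.out.println("running: " + isRunning(c));                  // false
        System.out.println("RUNNING = " + RUNNING);                      // -536870912
    }
}
```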
As we know, the Executor interface has exactly one method, execute(), which takes the task to run as its argument. Let's look at how ThreadPoolExecutor implements the execute() method
The thread pool performs tasks as follows
/**
 * Executes the given task sometime in the future. The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task. The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread. If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    // ctl packs the worker count and the pool state into one int
    int c = ctl.get();
    // fewer threads than corePoolSize: create a core thread for the task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // addWorker failed, so read the state again
        c = ctl.get();
    }
    // if the pool is RUNNING, put the task into the work queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // if the pool is no longer RUNNING, remove the task and reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // core threads and queue are full: try to create a non-core thread;
    // if addWorker returns false, creation failed and the rejection policy is invoked
    else if (!addWorker(command, false))
        reject(command);
}
The execute method tries addWorker at three different stages.
There are two reasons for a rejection: (1) the thread pool is not in the RUNNING state, and (2) the wait queue is full and the number of threads has already reached maximumPoolSize.
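The three steps of execute() can be traced from the outside by watching the pool size and queue size after each submission. The sketch below assumes a pool with 1 core thread, a maximum of 2 threads, and a queue of capacity 1; the class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExecuteFlowDemo {

    // Records "poolSize/queueSize" after each submission of a blocking task.
    static String traceExecute() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder trace = new StringBuilder();
        try {
            pool.execute(blocker); // step 1: below corePoolSize, a core thread is created
            trace.append(pool.getPoolSize()).append('/').append(pool.getQueue().size());
            pool.execute(blocker); // step 2: core thread busy, the task is queued
            trace.append(' ').append(pool.getPoolSize()).append('/').append(pool.getQueue().size());
            pool.execute(blocker); // step 3: queue full, a non-core thread is created
            trace.append(' ').append(pool.getPoolSize()).append('/').append(pool.getQueue().size());
            try {
                pool.execute(blocker); // queue full and maximumPoolSize reached
            } catch (RejectedExecutionException e) {
                trace.append(" rejected"); // the default AbortPolicy throws
            }
            return trace.toString();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(traceExecute()); // 1/0 1/1 2/1 rejected
    }
}
```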
Let’s continue our analysis of addWorker
addWorker source code analysis
Based on the current state of the thread pool, addWorker checks whether a new worker thread can be added and, if so, creates and starts it. It returns true if everything goes well. It returns false in the following cases
- The thread pool is not in the RUNNING state
- The worker count has already reached the bound selected by core
- The thread factory failed to create a new thread
Parameters
- firstTask
The first task the new thread should run, or null if there is none
- core
Selects the bound on the number of worker threads: true means a thread is added only while the pool is RUNNING and has fewer than corePoolSize threads; false means the bound is maximumPoolSize
This code is hard to follow and in places even violates coding conventions, but it is rich in coding knowledge
- At point 1, a label is used together with loop statements, with an effect similar to goto
When a label is used with break or continue on a loop, the label and its colon must immediately precede the loop statement; otherwise compilation fails. The purpose is to be able to exit quickly from any level of nested loops. The idea is tempting, but in large software projects the consequences of abusing labeled jumps can be disastrous. The sample code has two infinite loops under retry; once workerCount has been incremented successfully, both loops are exited at once.
- At point 2, expressions like this are hard to read and should be rewritten in a clearer form
- At point 3, echoing the label at point 1, the increment of the AtomicInteger is atomic; break retry jumps out of the loop that is labeled retry
- At point 4, continue retry jumps back to the label and continues the loop.
If the condition is false, the pool state is unchanged (it is still running), and execution continues within the inner for (;;) loop.
- At point 5, compareAndIncrementWorkerCount very rarely fails.
Even if it fails, the chance of success on the next attempt is extremely high, which is the principle behind spinning. The code increments the count first and decrements it again if creating the thread fails, which is a lightweight way of handling concurrent thread creation. The alternative, creating the thread first, then incrementing, and destroying the thread when the limit is exceeded, is significantly more expensive.
- 6,
Worker
Object is the core class implementation of the worker thread, part of the source code as follows
It implementsRunnable
Interface, and input this object as a parameter torun()
In therunWorker (this)
; So the internal property threadthread
instart
Is calledrunWorker
.
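The labeled-loop and CAS pattern described in points 1 to 5 can be sketched in isolation. This is only an illustration under simplifying assumptions, not the JDK code: SlotCounter, tryReserve, and release are hypothetical names, and a plain AtomicInteger plus a volatile flag stand in for the packed ctl field.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the workerCount bookkeeping done in addWorker.
final class SlotCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;
    private volatile boolean running = true;

    SlotCounter(int limit) {
        this.limit = limit;
    }

    void stop() {
        running = false;
    }

    int current() {
        return count.get();
    }

    // Reserve a slot first (cheap); the expensive resource is created afterwards.
    boolean tryReserve() {
        retry:
        for (;;) {
            boolean state = running;              // snapshot of the "pool state"
            if (!state)
                return false;
            for (;;) {
                int c = count.get();
                if (c >= limit)
                    return false;                 // bound reached
                if (count.compareAndSet(c, c + 1))
                    break retry;                  // success: leave both loops at once
                if (running != state)
                    continue retry;               // state changed: re-check the outer condition
                // otherwise only the count changed: spin in the inner loop
            }
        }
        return true;
    }

    // Roll back a reservation when creating the resource failed.
    void release() {
        count.decrementAndGet();
    }

    public static void main(String[] args) {
        SlotCounter slots = new SlotCounter(2);
        System.out.println(slots.tryReserve());
        System.out.println(slots.tryReserve());
        System.out.println(slots.tryReserve());
        slots.release();
        System.out.println(slots.current());
    }
}
```

Reserving the slot before building the thread keeps the common path to a single CAS; the rollback in release() plays the role that addWorkerFailed plays in the pool.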
Conclusion
The relevant thread pool source code is fairly compact; it also covers destroying the pool, fetching and consuming tasks, and so on. Like a thread, a thread pool has its own state transitions, which are not expanded on in this section. To sum up, pay attention to the following points when using a thread pool: (1) Set the parameters sensibly, and choose the number of worker threads according to the actual business scenario. (2) Threads must be obtained from a thread pool; creating threads explicitly in application code is not allowed. (3) When creating a thread or thread pool, give it a meaningful name so that problems can be traced back when errors occur.
Create thread pools with ThreadPoolExecutor rather than Executors. Doing so makes the pool's running rules explicit and avoids the risk of resource exhaustion. Under the hood, the Executors factory methods call the constructors of ThreadPoolExecutor and ScheduledThreadPoolExecutor, and ScheduledThreadPoolExecutor itself extends ThreadPoolExecutor.
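As an illustration of the advice above, here is a minimal sketch of building a pool directly through the ThreadPoolExecutor constructor with a bounded queue and named threads. The class name NamedPools, the method newOrderPool, the thread name prefix, and every numeric value are assumptions chosen for the example and would need tuning for a real workload.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedPools {
    // Hypothetical helper; all sizes below are illustrative only.
    static ThreadPoolExecutor newOrderPool() {
        AtomicInteger seq = new AtomicInteger(1);
        // A meaningful thread name makes thread dumps and logs easier to trace.
        ThreadFactory namedThreads =
                r -> new Thread(r, "order-worker-" + seq.getAndIncrement());
        return new ThreadPoolExecutor(
                4,                                      // corePoolSize
                8,                                      // maximumPoolSize
                60L, TimeUnit.SECONDS,                  // keepAliveTime for threads above the core size
                new ArrayBlockingQueue<>(1000),         // bounded work queue
                namedThreads,
                new ThreadPoolExecutor.AbortPolicy());  // throw when the pool is saturated
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newOrderPool();
        pool.execute(() ->
                System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

Spelling out all parameters at the call site makes the queue bound and the rejection behavior visible to whoever reads the code.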
0.2 ThreadPoolExecutor User-defined thread pool
They are all a kind of thread pool: a management framework that controls thread creation and release and tries to reuse threads to run tasks according to some policy, so ultimately all of these thread pool factory methods call the following constructor of ThreadPoolExecutor, introduced in Java 5
The thread pool that Java provides by default
Thread pools are the most widely used concurrency framework in Java, and can be used by almost any program that needs to perform tasks asynchronously or concurrently
We simply put the logic to be executed in the run method and pass an instance of the class implementing the Runnable interface to the thread pool's execute method, for example:
Executor e = Executors.newSingleThreadExecutor();
e.execute(new Runnable() {
    public void run() {
        // Task to be performed
    }
});
Thread Pool principle – Task Execute process
- The flow chart
- Schematic diagram
ThreadPoolExecutor handles execute() in four cases
- If fewer than corePoolSize threads are currently running, a new thread is created to execute the task (this step requires acquiring the global lock)
- If the number of running threads is greater than or equal to corePoolSize and the work queue is not full, the newly submitted task is stored in the work queue, that is, added to the BlockingQueue
- If the task cannot be added to the BlockingQueue and the pool has not reached its maximum number of threads, a new thread is created to process the task (this step requires acquiring the global lock)
- If creating a new thread would push the number of running threads beyond maximumPoolSize, the task is rejected and RejectedExecutionHandler.rejectedExecution() is called
The overall design aims to avoid acquiring the global lock as much as possible when executing execute(). Once ThreadPoolExecutor has warmed up (the number of running threads is greater than or equal to corePoolSize), almost all execute() calls take step 2, which does not require the global lock
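The four cases can be observed with a deliberately tiny pool. The following is a sketch under assumed settings (corePoolSize 1, maximumPoolSize 2, queue capacity 1, default AbortPolicy); the class ExecuteCases and its run method are hypothetical names used only for this demonstration.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExecuteCases {
    // Returns {pool size after task 1, queue size after task 2,
    //          pool size after task 3, 1 if task 4 was rejected}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until released, so the pool stays saturated.
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] seen = new int[4];
        try {
            pool.execute(blocked);              // case 1: below corePoolSize, new core thread
            seen[0] = pool.getPoolSize();
            pool.execute(blocked);              // case 2: core thread busy, task is queued
            seen[1] = pool.getQueue().size();
            pool.execute(blocked);              // case 3: queue full, new non-core thread
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocked);          // case 4: queue full and at maximumPoolSize
            } catch (RejectedExecutionException e) {
                seen[3] = 1;                    // the default AbortPolicy throws
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [1, 1, 2, 1]
    }
}
```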
The instance
- The results of
Source code analysis
/**
 * Checks whether a new worker thread can be added with respect to the current pool state
 * and the given bound (core or maximum). If so, the worker count is adjusted accordingly and,
 * if possible, a new worker is created and started, running firstTask as its first task.
 * This method returns false if the pool is stopped or eligible to shut down.
 * It also returns false if the thread factory fails to create a thread when asked.
 * If thread creation fails, either because the thread factory returned null or because of
 * an exception (typically an OutOfMemoryError in Thread.start()), we roll back cleanly.
 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * @return true if successful
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /*
         * Check if queue empty only if necessary.
         * If the pool has been shut down and any of the following holds, no new worker is created:
         * 1. The pool state is greater than SHUTDOWN, i.e. STOP, TIDYING, or TERMINATED
         * 2. firstTask != null
         * 3. workQueue.isEmpty()
         * In the SHUTDOWN state no new tasks may be submitted, but tasks already queued continue to run.
         */
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // If the CAS succeeds, we are ready to create the thread that will run the task.
            // If it fails, another thread is also trying to add a worker to the pool.
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get(); // Re-read ctl
            // If the run state changed (for example another thread shut the pool down),
            // go back to the outer for loop.
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    /*
     * At this point it is considered appropriate to start a thread to execute the task.
     */
    // Whether the worker has been started
    boolean workerStarted = false;
    // Whether the worker has been added to the workers set
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Pass firstTask to the Worker constructor;
        // the constructor calls the ThreadFactory to create a new thread.
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // Acquire the pool-wide lock
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // In SHUTDOWN no new task is accepted, but tasks in the queue are still executed.
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // The worker's thread must not have been started already
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Add the worker to the workers set
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // If the worker was added, start its thread
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If the thread was not started, clean up: undo the workerCount increment
        if (!workerStarted)
            addWorkerFailed(w);
    }
    // Report whether the worker thread was started
    return workerStarted;
}
Next, look at addWorkerFailed
After the worker's thread is started, its run method calls runWorker
Keep reading with runWorker
// Called after the worker thread starts; a while loop (spin) keeps fetching tasks from the queue and running them.
// A firstTask can be specified when the worker is initialized; it is the first task the thread runs.
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // The thread's first task (if any)
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Allow interrupts
    w.unlock();
    boolean completedAbruptly = true;
    try {
        // Loop, calling getTask to fetch the next task
        while (task != null || (task = getTask()) != null) {
            w.lock();
            /*
             * If the pool is stopping, make sure the thread is interrupted;
             * if not, make sure the thread is not interrupted.
             * The second case requires a recheck to deal with
             * a race with shutdownNow while clearing the interrupt.
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook method left for subclasses that need it
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Finally, run the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    // A Throwable cannot be rethrown here, so it is wrapped in an Error
                    thrown = x; throw new Error(x);
                } finally {
                    // Also a hook method; it receives the task and the exception, for subclasses that need them
                    afterExecute(task, thrown);
                }
            } finally {
                // Clear task so that getTask fetches the next one
                task = null;
                w.completedTasks++;
                // Release the worker's exclusive lock
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Reaching this point means the worker has to be shut down, for one of two reasons:
        // 1. getTask returned null, i.e. the worker's job is over
        // 2. An exception occurred while executing a task
        // In case 1, workerCount has already been decremented inside getTask.
        // In case 2, workerCount has not been adjusted, so processWorkerExit has to handle it.
        processWorkerExit(w, completedAbruptly);
    }
}
Take a look at getTask()
// This method has three possible outcomes
// 1. It blocks until a task is available and returns it. By default, threads within corePoolSize are not reclaimed; they keep waiting for tasks
// 2. It exits because of a timeout. This is where keepAliveTime applies: a thread that has had no work for that long should be closed
// 3. It returns null if any of the following conditions holds
//    - The pool has more than maximumPoolSize workers (because setMaximumPoolSize was called)
//    - The thread pool is in SHUTDOWN and the workQueue is empty
//    - The thread pool is in STOP: it neither accepts new tasks nor executes the tasks left in the workQueue
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // CAS operation to reduce the number of worker threads
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // True if core threads are allowed to time out, or if the current number of threads
        // exceeds corePoolSize; in both cases an idle timeout may close this thread
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // If wc > maximumPoolSize, or if the last poll timed out, return null.
        // Returning null means closing this thread.
        // wc > maximumPoolSize is possible because the developer may have called
        // setMaximumPoolSize to shrink the pool.
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            // The CAS failed: go around the loop again
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // If the worker is interrupted, the response is to retry.
            // To see why the interrupt occurs, look at the setMaximumPoolSize method:
            // if the developer sets maximumPoolSize below the current number of workers,
            // the surplus threads have to be closed. On re-entering the for loop,
            // some threads will then naturally return null.
            timedOut = false;
        }
    }
}
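The timeout path of getTask() can be observed from outside the pool. The sketch below assumes a single-thread pool with a 50 ms keepAliveTime and allowCoreThreadTimeOut(true), so that even the core thread takes the timed poll() branch; IdleTimeoutDemo and shrinksToZero are hypothetical names, and the polling interval is arbitrary.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class IdleTimeoutDemo {
    // Returns true if the pool shrank to zero threads within waitMillis.
    static boolean shrinksToZero(long waitMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // Core threads may now time out too, so `timed` is true inside getTask().
        pool.allowCoreThreadTimeOut(true);
        try {
            pool.execute(() -> { });    // starts the single worker
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis);
            while (System.nanoTime() < deadline) {
                // Once the idle worker's poll() times out, getTask() returns null
                // and the worker is removed from the pool.
                if (pool.getPoolSize() == 0)
                    return true;
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return pool.getPoolSize() == 0;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool shrank to zero: " + shrinksToZero(5000));
    }
}
```

Without allowCoreThreadTimeOut(true), the same worker would block in workQueue.take() and the pool size would stay at 1.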
Now that the basic flow has been covered, let's go back to the execute(Runnable command) method and look at its branches. Here is the code again:
/**
 * Executes the given task sometime in the future. The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 * {@code RejectedExecutionHandler}, if the task
 * cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task. The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread. If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    // If the current number of threads is below the core size, add a worker directly:
    // a new thread is created with this task as its firstTask.
    if (workerCountOf(c) < corePoolSize) {
        // The task was handed to the new worker, so we are done
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Reaching here means the number of threads is at least corePoolSize, or addWorker failed.
    // If the thread pool is RUNNING, add the task to the work queue.
    if (isRunning(c) && workQueue.offer(command)) {
        /*
         * The task is queued, but do we need to start a new thread?
         * With the thread count in [0, corePoolSize) a new thread is started unconditionally;
         * with the thread count at or above corePoolSize the task goes into the queue
         * and waits to be fetched.
         */
        int recheck = ctl.get();
        // If the pool is no longer RUNNING, remove the queued task and apply the rejection policy
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If the pool is still RUNNING but has no threads, start a new one
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // The queue is full: create a new worker with maximumPoolSize as the bound.
    // If this fails, the number of threads has reached maximumPoolSize, so apply the rejection policy.
    else if (!addWorker(command, false))
        reject(command);
}
Worker thread: when the thread pool creates a thread, it wraps the thread in a Worker. After the Worker finishes its task, it loops to fetch further tasks from the work queue and execute them. We can see this in the run() method of the Worker class
public void run() {
    try {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null;
        }
    } finally {
        workerDone(this);
    }
}
Threads in a thread pool perform tasks in two ways
- When you create a thread in the execute() method, you let the thread execute the current task
- After the thread completes task 1 in the figure above, it repeatedly fetches tasks from BlockingQueue to execute
Use of thread pools
Submit tasks to the thread pool
There are two ways to submit a task to a thread pool
execute()
Used to submit tasks that do not require a return value, so there is no way to tell whether the thread pool executed the task successfully. The following code shows that the task passed to the execute() method is an instance of a class that implements Runnable.
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
As you can see from the run results, the threads in the single-thread pool execute sequentially. In a fixed thread pool (with the parameter set to 2), at most two threads execute concurrently at any time. In the cached thread pool, all threads execute concurrently. The second example tests a single-thread scheduled thread pool and a fixed-size scheduled thread pool.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExam {
    public static void main(String[] args) {
        // first test for singleThreadScheduledPool
        ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();
        // second test for scheduledThreadPool
        // ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
        for (int i = 0; i < 5; i++) {
            scheduledPool.schedule(new TaskInScheduledPool(i), 0, TimeUnit.SECONDS);
        }
        // Already scheduled tasks still run after shutdown()
        scheduledPool.shutdown();
    }
}

class TaskInScheduledPool implements Runnable {
    private final int id;

    TaskInScheduledPool(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                System.out.println("TaskInScheduledPool-[" + id + "] is running phase-" + i);
                TimeUnit.SECONDS.sleep(1);
            }
            System.out.println("TaskInScheduledPool-[" + id + "] is over");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
As you can see from the run results, a single-thread scheduled pool behaves like a single-thread pool, and a fixed-size scheduled pool behaves like a fixed thread pool. Conclusion:
- If there are no specific requirements, a cached thread pool is generally suitable;
- If only one thread may run at a time, use a single-thread pool.
- If scheduled tasks need to run, use a scheduled thread pool or a single-thread scheduled thread pool as needed
- If you have other special requirements, you can use the constructor of the ThreadPoolExecutor class directly to create a thread pool and supply the parameters yourself.
submit()
Used to submit tasks that require a return value. The thread pool returns a Future object, which can be used to determine whether the task executed successfully and to obtain the return value through get(). get() blocks the current thread until the task completes, whereas get(long timeout, TimeUnit unit) blocks for at most the given time; if the task has not completed by then, it throws a TimeoutException instead of returning a result.
Future<Object> future = executor.submit(harReturnValuetask);
try {
    Object s = future.get();
} catch (InterruptedException e) {
    // Handle the interruption
} catch (ExecutionException e) {
    // Handle the exception thrown while the task was executing
} finally {
    // Shut down the thread pool
    executor.shutdown();
}
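For a complete, runnable variant of the fragment above, here is a minimal sketch that submits a Callable and waits with a timeout. SubmitDemo and sumTo are hypothetical names, the one-second timeout is arbitrary, and Executors.newSingleThreadExecutor() is used only to keep the example short.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SubmitDemo {
    // Computes 1 + 2 + ... + n on a pool thread and waits at most one second for the result.
    static int sumTo(int n) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> {
                int sum = 0;
                for (int i = 1; i <= n; i++)
                    sum += i;
                return sum;
            };
            Future<Integer> future = executor.submit(task);
            return future.get(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            // getCause() is the exception thrown inside the task itself
            throw new IllegalStateException("task failed", e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("task did not finish in time", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumTo(100)); // prints 5050
    }
}
```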
Closing the thread pool
The thread pool can be shut down by calling its shutdown or shutdownNow method. Both work by iterating over the worker threads in the pool and calling each thread's interrupt method, so tasks that cannot respond to interrupts may never terminate. But there are some differences
- shutdownNow first sets the state of the thread pool to STOP, then attempts to stop all threads that are executing or waiting for tasks, and returns the list of tasks still awaiting execution
- shutdown only sets the state of the thread pool to SHUTDOWN and then interrupts all threads that are not executing tasks.
The isShutdown method returns true as soon as either shutdown method has been called. The pool has closed successfully once all tasks have finished, at which point isTerminated returns true. Which method to call depends on the nature of the tasks submitted to the pool: shutdown is the usual choice, while shutdownNow can be used if tasks do not have to complete.
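The two methods are often combined: an orderly shutdown first, with a forced shutdown as a fallback. The following is a sketch of that pattern, not something the text above prescribes; ShutdownDemo and shutdownGracefully are hypothetical names and the timeouts are arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ShutdownDemo {
    // Returns true if the pool reached the terminated state.
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown();                        // stop accepting tasks, let queued ones finish
        try {
            if (pool.awaitTermination(timeout, unit))
                return true;
            pool.shutdownNow();                 // interrupt workers and drop queued tasks
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("task finished"));
        boolean terminated = shutdownGracefully(pool, 5, TimeUnit.SECONDS);
        System.out.println("isShutdown=" + pool.isShutdown() + " terminated=" + terminated);
    }
}
```

Tasks that ignore interrupts can still keep the pool alive after shutdownNow, which is why the helper reports the outcome instead of assuming success.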
Reasonable configuration
To configure a thread pool properly, you must first analyze the characteristics of its tasks, which can be examined from the following perspectives
- The nature of the task: CPU intensive task, IO intensive task, and hybrid task
- Task priority: high, medium and low
- Task execution time: long, medium and short
- Task dependencies: Whether they depend on other system resources, such as database connections.
Tasks of different natures can be handled separately by thread pools of different sizes
CPU intensive tasks (computational tasks)
For computing tasks with high throughput, the number of threads should be small, which can be N(number of CPU cores)+1 or N(number of CPU cores) * 2, because the threads must be scheduled to a certain CPU for execution. If the task itself is cpu-bound, too many threads will only increase the overhead of thread switching, but not the throughput. However, longer queues may be required for buffering.
I/O intensive tasks
For slower I/O tasks of smaller volume, consider more threads rather than large queues. More threads are needed than for computational tasks; how many depends on how long the I/O blocks
For example, the default maximum number of threads in Tomcat is 200.
You can also let the number of threads grow and shrink automatically between a minimum and a maximum as needed.
A common estimate is: number of threads = N(CPU) / (1 - blocking coefficient), where the blocking coefficient is usually 0.8 to 0.9. Generally, CPU usage of about 80% in a production environment indicates that the CPU is fully used.
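The estimate N(CPU) / (1 - blocking coefficient) is easy to turn into a helper. This is a rough sketch: PoolSizing and ioBoundThreads are hypothetical names, the result is rounded to the nearest integer, and the coefficient itself is something you would have to measure or guess for your workload.

```java
final class PoolSizing {
    // threads = cores / (1 - blockingCoefficient); the coefficient must be in [0, 1).
    static int ioBoundThreads(int cores, double blockingCoefficient) {
        if (cores <= 0 || blockingCoefficient < 0.0 || blockingCoefficient >= 1.0) {
            throw new IllegalArgumentException(
                    "cores must be positive and the coefficient must be in [0, 1)");
        }
        return (int) Math.round(cores / (1.0 - blockingCoefficient));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores: " + cores);
        System.out.println("threads at 0.8: " + ioBoundThreads(cores, 0.8));
        System.out.println("threads at 0.9: " + ioBoundThreads(cores, 0.9));
    }
}
```

For example, with 8 cores a coefficient of 0.8 gives 40 threads and 0.9 gives 80, which shows how sensitive the estimate is to the coefficient.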
Hybrid tasks
If it can be split into one CPU intensive task and one IO intensive task, the throughput of the split execution will be higher than that of the serial execution as long as the time difference between the two tasks is not too great. If the execution time of the two tasks is too different, there is no need to break them down.
You can use Runtime.getRuntime().availableProcessors() to get the number of CPUs available on the current device.
Tasks with different priorities can be handled using PriorityBlockingQueue. It allows high-priority tasks to be executed first.
Note that if higher priority tasks are consistently submitted to the queue, lower priority tasks may never be executed
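A minimal sketch of a priority-ordered pool follows. It assumes tasks are submitted with execute() and implement Comparable themselves (tasks passed to submit() are wrapped in a FutureTask, which is not Comparable); PriorityPoolDemo, PriorityTask, and the convention that a lower number runs first are all illustrative choices.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PriorityPoolDemo {
    // Hypothetical task type: a lower `priority` value means "run earlier".
    static final class PriorityTask implements Runnable, Comparable<PriorityTask> {
        final int priority;
        final String name;
        final List<String> log;

        PriorityTask(int priority, String name, List<String> log) {
            this.priority = priority;
            this.name = name;
            this.log = log;
        }

        @Override
        public void run() {
            log.add(name);
        }

        @Override
        public int compareTo(PriorityTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    // Returns the names of the queued tasks in the order they actually ran.
    static List<String> runOrder() {
        List<String> log = new CopyOnWriteArrayList<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // Occupy the only worker so that the following tasks wait in the queue.
            // This task is handed straight to the new worker and never enters the queue.
            pool.execute(() -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(new PriorityTask(3, "low", log));
            pool.execute(new PriorityTask(1, "high", log));
            pool.execute(new PriorityTask(2, "medium", log));
            gate.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runOrder()); // prints [high, medium, low]
    }
}
```

Because PriorityBlockingQueue is unbounded, such a pool never grows beyond corePoolSize and never rejects tasks while running, so the starvation risk mentioned above comes with a memory risk as well.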
Tasks with different execution times can be assigned to thread pools of different sizes, or priority queues can be used to allow shorter tasks to be executed first.
Tasks that depend on a database connection pool: after submitting SQL, a thread has to wait for the database to return the result. The longer it waits, the longer the CPU sits idle, so the number of threads should be set larger to make better use of the CPU.
It is recommended to use bounded queues. Bounded queues improve the stability and early-warning capability of the system, and can be made as large as necessary, for example several thousand entries. In one case, both the queue and the thread pool used for a system's background tasks filled up, and tasks kept being rejected with exceptions. Investigation showed that a database problem had made SQL execution very slow. Because every task in the background thread pool needed to query the database and insert data, all worker threads in the pool were blocked and tasks backed up in the pool. Had an unbounded queue been used, the queue would have kept growing, potentially filling up memory and making the entire system unusable, not just the background tasks.
Reusing a thread pool does not mean that your application will always use the same thread pool; you should choose different thread pools depending on the nature of the task. Pay special attention to the preference of thread pool attributes for IO bound tasks and CPU bound tasks. If you want to reduce interference between tasks, consider using isolated thread pools on demand.
2.5 Thread pool monitoring
If a large number of thread pools are used in the system, it is necessary to monitor the thread pool so that problems can be quickly located based on the usage of the thread pool. The thread pool can be monitored using parameters provided by the thread pool. The following properties can be used to monitor the thread pool:
- taskCount: the number of tasks that the thread pool needs to execute
- completedTaskCount: the number of tasks the thread pool has completed while running; less than or equal to taskCount.
- largestPoolSize: the largest number of threads that have ever been in the thread pool. This value tells you whether the thread pool has ever been full: if it equals the maximum size of the thread pool, the pool has been full at some point.
- getPoolSize: the number of threads in the thread pool. Threads in the thread pool are not automatically destroyed if the thread pool is not destroyed, so the size only increases.
- getActiveCount: the number of active threads.
Monitoring by extending the thread pool. You can customize a thread pool by subclassing it and overriding its beforeExecute, afterExecute, and terminated methods, which lets you run monitoring code before a task executes, after it executes, and when the pool terminates. For example, you can monitor the average, maximum, and minimum execution time of tasks. These methods are empty in the thread pool itself.
protected void beforeExecute(Thread t, Runnable r) {}
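One way such a subclass might look is sketched below. TimingThreadPool, runSample, and the statistics it keeps are hypothetical; only beforeExecute, afterExecute, terminated, getTaskCount, and getCompletedTaskCount are ThreadPoolExecutor methods.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical subclass that records how long each task runs.
final class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final AtomicLong totalNanos = new AtomicLong();
    private final AtomicLong maxNanos = new AtomicLong();
    private final AtomicLong finished = new AtomicLong();

    TimingThreadPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime());      // runs on the worker thread
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        try {
            long elapsed = System.nanoTime() - startNanos.get();
            totalNanos.addAndGet(elapsed);
            maxNanos.accumulateAndGet(elapsed, Math::max);
            finished.incrementAndGet();
        } finally {
            super.afterExecute(r, thrown);
        }
    }

    @Override
    protected void terminated() {
        try {
            System.out.printf("tasks=%d avg=%dns max=%dns%n",
                    finished.get(), averageNanos(), maxNanos.get());
        } finally {
            super.terminated();
        }
    }

    long finishedTasks() {
        return finished.get();
    }

    long averageNanos() {
        long n = finished.get();
        return n == 0 ? 0 : totalNanos.get() / n;
    }

    // Runs `tasks` empty tasks; returns {timed tasks, completedTaskCount, taskCount}.
    static long[] runSample(int tasks) {
        TimingThreadPool pool = new TimingThreadPool(2);
        for (int i = 0; i < tasks; i++)
            pool.execute(() -> { });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] {
            pool.finishedTasks(), pool.getCompletedTaskCount(), pool.getTaskCount() };
    }

    public static void main(String[] args) {
        long[] stats = runSample(3);
        System.out.println("timed=" + stats[0] + " completed=" + stats[1] + " total=" + stats[2]);
    }
}
```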
Thread pool, as the core component of the application program, is often not monitored until the program crashes.
2.6 Status of the thread pool
1. After the thread pool is created, its initial state is RUNNING.
2. After the shutdown method is invoked, the thread pool is in the SHUTDOWN state.
3. When the shutdownNow method is called, the pool enters the STOP state: it no longer accepts new tasks and attempts to terminate the tasks in progress.
4. In the SHUTDOWN or STOP state, once all worker threads have been destroyed and the task queue has been emptied, the thread pool is set to TERMINATED.
Conclusion
What are the key attributes of a thread pool?
- workQueue holds tasks. When a task is added and the current number of threads has reached corePoolSize, the task is inserted into the queue, and the threads in the pool take tasks from the queue.
- keepAliveTime sets the idle time. If the number of threads exceeds corePoolSize, threads that stay idle longer than this are shut down
- RejectedExecutionHandler handles the case in which the thread pool cannot accept a task. Four policies are provided: throw a RejectedExecutionException, silently discard the task, run the task on the thread that submitted it, or discard the task that has waited longest in the queue and then resubmit the new task. The default is to throw an exception.
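Of the four policies, "run the task on the submitting thread" corresponds to ThreadPoolExecutor.CallerRunsPolicy, and its effect is easy to observe. The following sketch assumes a pool with one thread and a queue of capacity 1; CallerRunsDemo and overflowThreadName are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CallerRunsDemo {
    // Returns the name of the thread that ran the task the saturated pool could not accept.
    static String overflowThreadName() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch gate = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        Runnable blocked = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocked);   // occupies the single worker
            pool.execute(blocked);   // fills the queue
            // Pool and queue are full: the policy runs this task on the calling thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            gate.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + overflowThreadName());
    }
}
```

Since the submitting thread is busy running the task, it cannot submit more work in the meantime, which acts as a simple form of back-pressure.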
When is a thread created in the thread pool?
- If the current number of threads is less than corePoolSize, a new thread is created when the task is submitted and this thread executes the task.
- If the current number of threads has reached corePoolSize, the submitted task is added to the queue and the thread in the thread pool is waiting to fetch the task from the queue.
- If the queue is full, a new thread is created to execute the task, ensuring that the number of threads in the pool does not exceed maximumPoolSize. If the number of threads in the pool exceeds maximumPoolSize, a reject policy is implemented.
What can I do if an exception occurs during task execution?
If a task execution fails, the thread executing the task is closed, rather than continuing to receive other tasks. A new thread is then started to replace it.
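This replacement of a failed worker can be made visible by naming the threads. The sketch below assumes tasks are submitted with execute(); with submit() the exception is captured in the returned Future and the worker keeps running. WorkerReplacementDemo and the worker- name prefix are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class WorkerReplacementDemo {
    // Returns {thread that ran the failing task, thread that ran the next task}.
    static String[] run() {
        AtomicInteger seq = new AtomicInteger(1);
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "worker-" + seq.getAndIncrement());
            // Swallow the simulated failure so the demo output stays quiet.
            t.setUncaughtExceptionHandler((thread, error) -> { });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
        String[] names = new String[2];
        CountDownLatch failed = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                names[0] = Thread.currentThread().getName();
                failed.countDown();
                throw new IllegalStateException("simulated task failure");
            });
            failed.await();
            // The first worker exits because of the exception; another thread runs this task.
            pool.execute(() -> {
                names[1] = Thread.currentThread().getName();
                done.countDown();
            });
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        String[] names = run();
        System.out.println("failing task ran on " + names[0] + ", next task ran on " + names[1]);
    }
}
```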
When will the rejection policy be implemented?
- If the number of workers has reached corePoolSize, the task is enqueued successfully, and the thread pool is shut down at the same moment without the shutdown having removed the task from the queue, then the rejection policy is applied. This is an edge case in which enqueuing and shutting down happen concurrently; readers can look closely at how the execute method reaches the first reject(command).
- If the number of workers is greater than or equal to corePoolSize and the queue is full, the task cannot be enqueued, so execute tries to start a new thread. If the number of threads has already reached maximumPoolSize, the rejection policy is applied.
- Creating too many thread pools can cause OOM, because too many threads stay alive and the thread pools are never reclaimed
- Parallel streams in the Java Stream API use a default ForkJoin thread pool, which should be used with caution
- When creating a thread pool, determine whether its tasks are I/O-bound or CPU-bound
- If tasks are I/O-bound or long-running and the rejection policy is CallerRunsPolicy, tasks are handed to the calling thread once the pool is full. If the web service runs in Tomcat, overall throughput will drop
reference
- Code Efficient
- The Art of Concurrent Programming in Java