preface
Java’s thread pool is best known as the ExecutorService interface, an API added to the java.util.concurrent package in jdk1.5 that greatly simplifies multithreaded code development.
Whether you use FixedThreadPool or CachedThreadPool the implementation behind it is ThreadPoolExecutor.
ThreadPoolExecutor is the product of a typical cache pooling design, because pools have sizes, and rejection policies are involved when the pool is not large enough to hold.
There are already four preset thread pool rejection policies in the JDK. Here’s a look at how these policies are used and what other rejection policies can be extended.
Pooling design idea
Pool design should not be a new term. We commonly such as Java thread pool, JDBC connection pool, Redis connection pool is the representative implementation of this kind of design.
The problem is to offset the cost of each resource acquisition, such as the cost of creating a thread, the cost of acquiring a remote connection, etc.
Just like you go to the canteen to make a meal, the old woman who makes a meal will first fill several portions of the meal and put them there, and you will directly take the lunch box to add food, without temporarily filling rice and making dishes, and the efficiency is high.
In addition to initializing resources, pooling design includes characteristics such as the initial value of the pool, the active value of the pool, the maximum value of the pool, and so on, which can be mapped directly to the Java thread pool and database connection pool member properties.
When the thread pool triggers the reject policy
Unlike data source connection pools, thread pools have an additional blocking queue to buffer in addition to their initial size and pool maximum.
When the number of connection requests from the data source connection pool exceeds the maximum value of the connection pool, the denial policy is triggered. The denial policy usually blocks waiting for the set time or throws exceptions directly.
The trigger time of the thread pool is shown below:
As shown in the figure, to understand when the thread pool triggers the reject rough, it is necessary to clarify the specific meaning of the above three parameters, which is the result of the overall coordination of these three parameters, rather than simply triggering the reject rough when the maximum number of threads exceeds.
When the number of submitted tasks is greater than corePoolSize, the queue buffer will be prioritized. Only after the buffer is filled, the queue will determine whether the current running tasks are larger than maxPoolSize. If the number is smaller than maxPoolSize, a new thread will process the tasks. Greater than that triggers the rejection policy.
If the current number of submitted tasks is greater than (maxPoolSize + queueCapacity), the thread pool rejection policy is triggered.
There are four thread pool rejection policies built into the JDK
Reject the policy interface definition
The JDK defines a thread pool denial policy interface, as follows:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
Copy the code
When the reject policy is triggered, the thread pool will invoke the specified policy you set, passing the current submitted task and the thread pool instance itself to you for processing.
CallerRunsPolicy (CallerRun Policy)
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if(! e.isShutdown()) { r.run(); }}}Copy the code
Function: When the reject policy is triggered, it is handled by the current thread submitting the task as long as the thread pool is not closed.
Usage scenarios: generally not allowed in failure, the performance requirement is not high, concurrent scenarios using small amount, usually because the thread pool will not shut down, which is submitted by the task will be run, but because it is the caller thread its execution, when repeatedly submitting, can block the subsequent task execution, performance and efficiency naturally slow.
AbortPolicy (AbortPolicy)
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from "+ e.toString()); }}Copy the code
function
When the reject policy is triggered, the exception that refuses to execute is thrown directly. The abort policy means that the current execution process is interrupted
Usage scenarios
There is no special case for this, but it is important to handle the exception thrown correctly. The default policy for ThreadPoolExecutor is AbortPolicy. The ExecutorService interface family of ThreadPoolExecutors does not display a rejection policy, so it does by default.
Note, however, that the thread pool instance queues in ExecutorService are unbounded, meaning that overflowing memory will not trigger a rejection policy. When customizing your own thread pool instance, use this policy to handle exceptions that are thrown when the policy is triggered, because it interrupts the current execution process.
DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
Copy the code
function
Drop the quest silently without triggering any action
Usage scenarios
If the task you submit is unimportant, you can use it. Because it’s an empty realization, a task that will silently devour you. So that strategy is basically gone
DiscardOldestPolicy discarding old policy
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if(! e.isShutdown()) { e.getQueue().poll(); e.execute(r); }}}Copy the code
function
If the thread pool is not closed, pop the element in the queue header and try to execute
Usage scenarios
This strategy still drops the task, again silently, but typically drops the old unexecuted task, and the high priority task to be executed.
Based on this feature, the scenario I can think of is publishing a message, modifying a message, and then when the message is published and not yet executed, the updated message comes back, and the version of the message that is not executed is lower than the version of the message that is now committed and can be discarded.
Because there may be messages of lower versions queued, it is important to compare the versions of messages before actually processing them
Rejection policies implemented by third parties
Thread rejection policy in Dubbo
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
private final String threadName;
private final URL url;
private static volatile long lastPrintTime = 0;
private static Semaphore guard = new Semaphore(1);
public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
dumpJStack();
throw new RejectedExecutionException(msg);
}
private void dumpJStack() {
// Omit the implementation}}Copy the code
As you can see, there are three main things that dubbo’s worker does when it triggers a thread-rejection policy. The principle is to make it as clear to the user as possible why the thread-rejection policy is triggered
- A warning level log is output with details of the thread pool setting parameters, the current state of the thread pool, and some details of the current rejected task. It is safe to say that this log, which is seen more or less by anyone with production operations experience using Dubbo, is a good example of log printing. Other examples of log printing are spring. Thanks to such detailed logs, it is easy to locate problems
- If you can’t locate a problem with the current thread stack, the dump thread context can be a lifeline. See dubbo thread pool exhaustion event -“CyclicBarrier”
- The task fails by continuing to throw a reject exception, which inherits the JDK’s default reject policy feature
Thread pool rejection policy in Netty
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
NewThreadRunsPolicy() {
super();
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
final Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e); }}}Copy the code
The implementation in Netty is much like the CallerRunsPolicy in the JDK, reluctant to drop tasks. The difference is that the CallerRunsPolicy is a task that is executed directly in the caller thread.
Netty creates a new thread to handle it. As a result, the implementation of Netty can be extended to support high performance scenarios with respect to the use of caller execution policies.
Note, however, that the implementation of Netty does not make any judgment constraints on the creation of threads, that is, as long as the system has resources to create a new thread to handle, until the new thread is not new, will throw thread creation failure exception
Thread pool rejection policy in activeMq
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
try {
executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
}
throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); }});Copy the code
The policy in activeMq belongs to the maximum effort execution type. When the rejection policy is triggered, the second effort will last for one minute. Re-queue the task and throw an exception if the one-minute timeout is not successful
Pinpoint thread pool rejection policy
public class RejectedExecutionHandlerChain implements RejectedExecutionHandler {
private final RejectedExecutionHandler[] handlerChain;
public static RejectedExecutionHandler build(List<RejectedExecutionHandler> chain) {
Objects.requireNonNull(chain, "handlerChain must not be null");
RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]);
return new RejectedExecutionHandlerChain(handlerChain);
}
private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) {
this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null");
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
for(RejectedExecutionHandler rejectedExecutionHandler : handlerChain) { rejectedExecutionHandler.rejectedExecution(r, executor); }}}Copy the code
The e rejection strategy has a lot of characteristics, and other implementation are different. He defines a rejection policy chain and wraps a list of rejection policies. When a rejection policy is triggered, rejectedExecution in the policy chain will be executed one by one
conclusion
This paper introduces the definition of the Java thread pool rejection policy interface from the thread pool design idea and the time when the thread pool triggers the rejection policy.
In addition, four kinds of denial policy definitions in JDK and four third-party open source software are introduced to describe various ideas and application scenarios of thread pool rejection policy implementation.
Hopefully this article will give you a better understanding of the Java thread pool rejection policy and enable you to apply it flexibly according to different usage scenarios.
From: my.oschina.net/keking/blog…
Give a [look], is the biggest support for IT elder brotherCopy the code