Suppose the thread pool is configured with 2 core threads, a maximum of 2 threads, and a buffer queue capacity of 2. If five asynchronous tasks are submitted at the same time, what happens?
Reproducing the scenario
Step 1: Create the Spring Boot application and define the thread pool configuration.
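import java.util.concurrent.Executor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync
@SpringBootApplication
public class Chapter78Application {

    public static void main(String[] args) {
        SpringApplication.run(Chapter78Application.class, args);
    }

    @EnableAsync
    @Configuration
    class TaskPoolConfig {

        @Bean
        public Executor taskExecutor1() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(2);      // 2 core threads
            executor.setMaxPoolSize(2);       // at most 2 threads
            executor.setQueueCapacity(2);     // buffer queue holds 2 waiting tasks
            executor.setKeepAliveSeconds(60);
            executor.setThreadNamePrefix("executor-1-");
            return executor;
        }
    }
}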
Step 2: Implement an asynchronous task with the @Async annotation
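import java.util.Random;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class AsyncTasks {

    public static Random random = new Random();

    // Runs on the "taskExecutor1" pool and simulates work of up to 10 seconds
    @Async("taskExecutor1")
    public CompletableFuture<String> doTaskOne(String taskNo) throws Exception {
        log.info("Start task: {}", taskNo);
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("Complete task: {}, time: {} ms", taskNo, end - start);
        return CompletableFuture.completedFuture("Mission accomplished.");
    }
}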
Step 3: Write test cases
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@Slf4j
@SpringBootTest
public class Chapter78ApplicationTests {

    @Autowired
    private AsyncTasks asyncTasks;

    @Test
    public void test2() throws Exception {
        // Thread pool configuration: core = 2, max = 2, queue = 2; five tasks are submitted at once
        // org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@59901c4d[Running, pool size = 2,
        // active threads = 0, queued tasks = 2, completed tasks = 4]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@408e96d9
        long start = System.currentTimeMillis();

        // Thread pool 1
        CompletableFuture<String> task1 = asyncTasks.doTaskOne("1");
        CompletableFuture<String> task2 = asyncTasks.doTaskOne("2");
        CompletableFuture<String> task3 = asyncTasks.doTaskOne("3");
        CompletableFuture<String> task4 = asyncTasks.doTaskOne("4");
        CompletableFuture<String> task5 = asyncTasks.doTaskOne("5");

        // Wait until all tasks have finished
        CompletableFuture.allOf(task1, task2, task3, task4, task5).join();

        long end = System.currentTimeMillis();
        log.info("All tasks completed, total time: " + (end - start) + " ms");
    }
}
Execute this to generate log information like the following:
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@64968732
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324)
at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:274)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at com.didispace.chapter78.AsyncTasks$$EnhancerBySpringCGLIB$$c7e8d57b.doTaskOne(<generated>)
at com.didispace.chapter78.Chapter78ApplicationTests.test2(Chapter78ApplicationTests.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1255)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1255)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$[...]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
... 74 more
The exception message, org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task, makes the cause clear: both threads were busy (active threads = 2) and the queue was already full (queued tasks = 2), so the fifth task was rejected.
So the default rejection policy of the thread pool is: when all threads are busy and the queue is full, the new task is discarded and an exception is thrown.
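The same rejection can be observed without starting Spring. The following is a minimal sketch that assumes a plain JDK ThreadPoolExecutor with a LinkedBlockingQueue of capacity 2 behaves like taskExecutor1 above. The class name DefaultRejectionDemo, the helper awaitQuietly, and the latch that stands in for Thread.sleep are illustrative and not part of the original project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DefaultRejectionDemo {

    /** Submits tasks 1..5 and returns the numbers of the tasks the pool refused to accept. */
    static List<Integer> rejectedTasks() {
        // Same sizing as taskExecutor1: 2 core threads, 2 max threads, queue capacity 2.
        // No handler is passed, so the JDK default (AbortPolicy) applies.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2));
        CountDownLatch release = new CountDownLatch(1);
        List<CompletableFuture<String>> accepted = new ArrayList<>();
        List<Integer> rejected = new ArrayList<>();

        for (int taskNo = 1; taskNo <= 5; taskNo++) {
            try {
                accepted.add(CompletableFuture.supplyAsync(() -> {
                    awaitQuietly(release); // keep the task unfinished until all five are submitted
                    return "Mission accomplished.";
                }, pool));
            } catch (RejectedExecutionException e) {
                // supplyAsync hands the task to the pool synchronously, so the
                // rejection surfaces here, in the submitting thread.
                rejected.add(taskNo);
            }
        }

        release.countDown();
        CompletableFuture.allOf(accepted.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectedTasks()); // prints [5]
    }
}
```

Tasks 1 and 2 occupy the two threads, tasks 3 and 4 fill the queue, and the pool has nowhere to put task 5, which matches the state reported in the exception message.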
Configuring a Rejection Policy
The thread pool has a default rejection policy, but in real development outright rejection often does not fit the business scenario. Sometimes we may prefer to drop the oldest task that has not yet completed, or to drop the newest one, or to use some other strategy that is closer to business needs. Configuring a different built-in rejection policy, or a custom one, is therefore a common requirement. How is this done?
Let's look at how to configure and customize the rejection policy for a thread pool.
In the code below, the last line calls the setRejectedExecutionHandler method to set the rejection policy for the thread pool:
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Other thread pool configurations
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
ThreadPoolExecutor provides four built-in policies that developers can use directly. Just set them like this:
// AbortPolicy
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// DiscardPolicy
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// DiscardOldestPolicy
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// CallerRunsPolicy
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
The meanings of the four policies are as follows:
- AbortPolicy: the default policy. If the thread pool queue is full, the task is dropped and a RejectedExecutionException is thrown.
- DiscardPolicy: if the thread pool queue is full, the task is silently discarded and no exception is thrown.
- DiscardOldestPolicy: if the queue is full, the oldest task waiting in the queue is removed to make room, and the new task is submitted again.
- CallerRunsPolicy: if the task cannot be added to the thread pool, the thread that submitted it (for example, the main thread) runs the task itself instead of waiting for a pool thread.
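The four behaviors described above can be compared side by side with a small experiment. This is only a sketch on a plain JDK ThreadPoolExecutor sized 2/2/2 like the example pool. The class BuiltInPoliciesDemo, its run method, and the latch that keeps tasks 1 and 2 busy are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BuiltInPoliciesDemo {

    /**
     * Submits tasks 1..5 to a pool with 2 threads and a queue of 2, using the given policy.
     * Returns a sorted map from the number of each task that ran to the name of the thread that ran it.
     */
    static Map<Integer, String> run(RejectedExecutionHandler policy) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), policy);
        CountDownLatch release = new CountDownLatch(1);
        Map<Integer, String> ranOn = new ConcurrentSkipListMap<>();

        for (int i = 1; i <= 5; i++) {
            final int taskNo = i;
            try {
                pool.execute(() -> {
                    if (taskNo <= 2) {
                        awaitQuietly(release); // tasks 1 and 2 keep both pool threads busy
                    }
                    ranOn.put(taskNo, Thread.currentThread().getName());
                });
            } catch (RejectedExecutionException e) {
                // Of the four built-in policies, only AbortPolicy throws; the task is dropped.
            }
        }

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(new ThreadPoolExecutor.AbortPolicy()).keySet());         // [1, 2, 3, 4]
        System.out.println(run(new ThreadPoolExecutor.DiscardPolicy()).keySet());       // [1, 2, 3, 4]
        System.out.println(run(new ThreadPoolExecutor.DiscardOldestPolicy()).keySet()); // [1, 2, 4, 5]
        System.out.println(run(new ThreadPoolExecutor.CallerRunsPolicy()).keySet());    // [1, 2, 3, 4, 5]
    }
}
```

With DiscardOldestPolicy, task 3 is the oldest queued task, so it is the one that gets dropped in favor of task 5. With CallerRunsPolicy, the map entry for task 5 holds the name of the submitting thread rather than a pool thread.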
To define a custom rejection policy, implement the RejectedExecutionHandler interface:
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Custom rejection logic
    }
});
The same handler can also be written as a lambda expression:
executor.setRejectedExecutionHandler((r, executor1) -> {
    // Custom rejection logic
});
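As one possible illustration of what the custom logic could look like, the sketch below parks rejected tasks in a side queue instead of throwing or dropping them. ParkingPolicy and CustomRejectionDemo are hypothetical names, the pool is a plain JDK ThreadPoolExecutor sized 2/2/2, and running the parked tasks on the calling thread afterwards is just one choice among many.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomRejectionDemo {

    /** Hypothetical policy: park rejected tasks so the caller can decide what to do with them. */
    static class ParkingPolicy implements RejectedExecutionHandler {
        final Queue<Runnable> parked = new ConcurrentLinkedQueue<>();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            parked.add(r); // no exception is thrown, and the task is not lost
        }
    }

    /** Returns {number of tasks parked by the policy, number of tasks completed in the end}. */
    static int[] demo() {
        ParkingPolicy policy = new ParkingPolicy();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), policy);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 1; i <= 5; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // hold every task until all five are submitted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        int parkedCount = policy.parked.size();

        release.countDown();
        // Run the parked tasks on the calling thread now that the latch is open.
        Runnable task;
        while ((task = policy.parked.poll()) != null) {
            task.run();
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {parkedCount, completed.get()};
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("parked=" + result[0] + ", completed=" + result[1]); // parked=1, completed=5
    }
}
```

The same handler instance could be passed to setRejectedExecutionHandler on a ThreadPoolTaskExecutor, since that method accepts any RejectedExecutionHandler.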
From: https://blog.didispace.com/spring-boot-learning-2-7-8/