This is the fifth day of my participation in the August Text Challenge. For details, see: August Text Challenge
Events: one of the means to decouple code
Recently our project had a production fault caused by an event, so it is worth sorting out the relevant knowledge points.
Events are the opening topic; the thread pool knowledge follows.
Interestingly, interviewers have asked me about thread pools several times. That suggests the company must have had a thread pool accident, or everyone there is highly aware of thread pools; otherwise, the first problem they ran into was caused by one.
You can see how large the gap between interviews and practice is.
The company now uses the Spring stack, so the technical content here will be based on Spring Boot. I am a novice in this area too, so this is learning, summarizing, and advancing at the same time, connecting new knowledge with old.
This article covers several points
- Event Implementation Principle
- Several ways to use events
- Asynchronous events
- Beginning and end of the fault
Principle
The event mechanism is essentially the observer pattern, so there is not much to explain.
Implementation
The implementation here is based on Spring Boot. With convention over configuration, the configuration files in a Spring Boot project are drastically reduced; compared with the piles of configuration we used to write, honestly, I do not miss them at all.
To implement an event you need three things: the event, the publisher, and the listener.
In Spring Boot, a few annotations will do
The annotation approach
This is the most common and convenient approach.
The event
A simple class
@Data
public class DemoEvent {

    private String eventData;

    public DemoEvent(String data) {
        this.eventData = data;
    }
}
The listener
A simple bean with an @EventListener method whose argument is the event you want to listen for
@Component
@Slf4j
public class DemoEventListener {

    @EventListener
    public void demoEventListener(DemoEvent demoEvent) {
        log.info("demoevent listener,eventValue:{},order=2", demoEvent.getEventData());
    }
}
The publisher
Spring provides a ready-made ApplicationEventPublisher. The ApplicationContext itself implements it, so anything running in the Spring container has access to one.
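As a sketch of what the publisher can look like (the DemoPublisher class name is made up for illustration; this assumes a running Spring context and the DemoEvent class above):

```java
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

// Hypothetical publisher bean: Spring injects the ApplicationEventPublisher
// through the constructor (the ApplicationContext itself implements this interface).
@Component
public class DemoPublisher {

    private final ApplicationEventPublisher publisher;

    public DemoPublisher(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    public void publish(String data) {
        // Every listener whose parameter type is DemoEvent will be invoked
        publisher.publishEvent(new DemoEvent(data));
    }
}
```

Calling `demoPublisher.publish("hello")` from any bean then triggers the listener above.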
Implementing the listening interface
The publisher stays the same; the event and the listener change.
The listener
Implement the ApplicationListener interface
@Component
@Slf4j
@Order(3)
public class DemoEventApplicationListener implements ApplicationListener<AppEvent> {

    @Override
    // @Async here would make this listener asynchronous
    public void onApplicationEvent(AppEvent demoEvent) {
        log.info("listener value:{},order=3", demoEvent.getEventData());
    }
}
The event
Because ApplicationListener is used, the event needs to be a subclass of ApplicationEvent
@Data
public class AppEvent extends org.springframework.context.ApplicationEvent {

    private String eventData;

    public AppEvent(String source) {
        super(source);
        this.eventData = source;
    }
}
Ordered listeners
An event can have many listeners, and sometimes those listeners need to execute in a specific order.
In that case, have the listener implement SmartApplicationListener; this interface has a getOrder() method.
@Component
@Slf4j
public class DemoEventSmartApplicationListener implements SmartApplicationListener {

    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> aClass) {
        return aClass == AppEvent.class;
    }

    @Override
    public boolean supportsSourceType(Class<?> aClass) {
        return true;
    }

    @Override
    public int getOrder() {
        return 1;
    }

    @Override
    public void onApplicationEvent(ApplicationEvent applicationEvent) {
        AppEvent event = (AppEvent) applicationEvent;
        log.info("smartapplicationListener,value:{},order=1", event.getEventData());
    }
}
There is also the @Order annotation, which achieves the same thing without implementing this interface.
Asynchronous events
The implementations so far are all synchronous: they decouple the code, but they do not improve performance much.
Asynchronous events, on the other hand, are used when you do not want to drag down the main flow.
To make events asynchronous, two annotations are enough:
@EnableAsync on the application entry class and @Async on the listener method.
The asynchronous thread pool
To run asynchronously there must be a thread pool. We have not configured one at this point, so what is the default thread pool?
The core of Spring Boot's async support is the AsyncExecutionAspectSupport class, and in particular the method that obtains the Executor:
Get Executor
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    AsyncTaskExecutor executor = this.executors.get(method);
    if (executor == null) {
        Executor targetExecutor;
        String qualifier = getExecutorQualifier(method);
        if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
        }
        else {
            targetExecutor = this.defaultExecutor.get();
        }
        if (targetExecutor == null) {
            return null;
        }
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
        this.executors.put(method, executor);
    }
    return executor;
}
No qualifier is configured, so the default thread pool is used:
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    if (beanFactory != null) {
        try {
            // Search for TaskExecutor bean... not plain Executor since that would
            // match with ScheduledExecutorService as well, which is unusable for
            // our purposes here. TaskExecutor is more clearly designed for it.
            return beanFactory.getBean(TaskExecutor.class);
        }
        catch (NoUniqueBeanDefinitionException ex) {
            logger.debug("Could not find unique TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskExecutor bean found within the context, and none is named " +
                            "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
                            "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            logger.debug("Could not find default TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.info("No task executor bean found for async processing: " +
                        "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
            }
            // Giving up -> either using local default executor or none at all...
        }
    }
    return null;
}
The default thread pool is configured in TaskExecutionAutoConfiguration:
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
        AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
    return builder.build();
}
Spring Boot 1
Spring Boot 1 actually has no default thread pool; the default only arrived in Spring Boot 2, see the release notes:
Task Execution
Spring Boot now provides auto-configuration for ThreadPoolTaskExecutor. If you are using @EnableAsync, your custom TaskExecutor can be removed in favor of customizations available from the spring.task.execution namespace. Custom ThreadPoolTaskExecutor can be easily created using TaskExecutorBuilder.
So which thread pool does Spring Boot 1 use by default?
In AsyncExecutionInterceptor:
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
    return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
As you can see, if no thread pool is configured, a SimpleAsyncTaskExecutor is used.
The class comment makes it very clear:
{@link TaskExecutor} implementation that fires up a new Thread for each task,executing it asynchronously.
protected void doExecute(Runnable task) {
    Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
    thread.start();
}
Every task gets a brand-new thread.
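This no-pooling behavior can be reproduced with plain JDK code. A minimal sketch (class and method names are made up) of an executor that, like SimpleAsyncTaskExecutor, fires up a new Thread for each task:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

public class NewThreadPerTaskDemo {

    // Mimics SimpleAsyncTaskExecutor.doExecute(): a brand-new Thread per task, no reuse
    static final Executor NEW_THREAD_EXECUTOR = task -> new Thread(task).start();

    // Runs n trivial tasks and returns the names of the threads that executed them
    public static Set<String> runTasks(int n) throws InterruptedException {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            NEW_THREAD_EXECUTOR.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        done.await();
        return threadNames;
    }
}
```

Running three tasks yields three distinct thread names: no thread is ever reused, which is exactly why a burst of events means a burst of threads.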
The thread pool
The event itself does not have much more to say, but the thread pool deserves a refresher, and it is the thread pool that matters in the analysis that follows.
My first impression of a thread pool: it is essentially a producer-consumer pattern that combines threads and tasks.
But what if an interviewer asks how you understand thread pools? I would not answer like that either. It depends on how much knowledge the two of you share; it is safest to state the biggest points of consensus. The advantage is that you step on no mines; the disadvantage is that the answer is ordinary.
Starting with JDK 5, the JDK introduced the concurrency package, better known as JUC, of which ThreadPoolExecutor is the most commonly used class.
Common knowledge first: the important thread pool parameters
- Number of core threads: corePoolSize
- Maximum number of threads: maxPoolSize
- Idle thread keep-alive time: keepAliveTime
- Core thread timeout: allowCoreThreadTimeOut. Defaults to false; if set to true, idle core threads also time out, so an idle pool can end up with no threads at all
- Task queue: workQueue
- Reject handler: rejectedExecutionHandler
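These parameters map directly onto the java.util.concurrent.ThreadPoolExecutor constructor. A minimal sketch (the values are illustrative, not recommendations):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolConfigDemo {

    public static ThreadPoolExecutor buildPool() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4,                                  // corePoolSize
                8,                                  // maximumPoolSize
                60L, TimeUnit.SECONDS,              // keepAliveTime for non-core threads
                new LinkedBlockingQueue<>(100),     // workQueue with a bounded capacity
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()); // rejectedExecutionHandler
        // allowCoreThreadTimeOut defaults to false; true lets idle core threads die too
        pool.allowCoreThreadTimeOut(false);
        return pool;
    }
}
```

Always bound the queue: an unbounded queue silently disables both maximumPoolSize and the rejection handler.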
Built-in rejection policy
- AbortPolicy: the default policy; discards the task and throws RejectedExecutionException
- DiscardPolicy: discards the task without throwing an exception
- DiscardOldestPolicy: discards the oldest task at the head of the queue, then retries submitting the current task
- CallerRunsPolicy: the thread that submitted the task runs it itself
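A small sketch of the rejection behavior (pool and queue sized to 1 each, so a third task must be rejected; class and method names are made up):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {

    // Pool with 1 thread, no extra threads, queue of 1: the 3rd task must be rejected
    static ThreadPoolExecutor tinyPool(RejectedExecutionHandler handler) {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), handler);
    }

    // Returns true if submitting the third task threw RejectedExecutionException
    public static boolean thirdTaskRejected(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = tinyPool(handler);
        CountDownLatch block = new CountDownLatch(1);
        boolean rejected = false;
        try {
            pool.execute(() -> { try { block.await(); } catch (InterruptedException ignored) { } });
            pool.execute(() -> { });  // sits in the queue
            pool.execute(() -> { });  // no free thread, no queue space -> handler decides
        } catch (RejectedExecutionException e) {
            rejected = true;          // AbortPolicy throws; DiscardPolicy silently drops
        } finally {
            block.countDown();
            pool.shutdown();
        }
        return rejected;
    }
}
```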
There is plenty of material online about this. The general flow of submitting a task to the pool:
- When the number of threads in the pool is less than corePoolSize, a new thread is created to handle the task
- When the number of threads is greater than or equal to corePoolSize, the task is placed in the workQueue; as core threads finish their tasks, any idle core thread takes the next task from the workQueue and processes it
- When the workQueue is full and cannot accept new tasks, new non-core threads are created to handle them, until the number of threads reaches maximumPoolSize
- Once the thread count has reached maximumPoolSize and the queue is full, new tasks are handed to the rejectedExecutionHandler
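The flow above can be observed directly by submitting blocking tasks and inspecting the pool. A sketch (sizes chosen small so every stage is visible; names are made up):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SubmitFlowDemo {

    // Returns {threads after 2 submits, queued tasks after 4 submits, threads after 5 submits}
    public static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2));
        CountDownLatch block = new CountDownLatch(1);
        Runnable blocker = () -> { try { block.await(); } catch (InterruptedException ignored) { } };

        pool.execute(blocker);
        pool.execute(blocker);
        int threadsAfterCore = pool.getPoolSize();   // core threads created one per task

        pool.execute(blocker);
        pool.execute(blocker);
        int queuedTasks = pool.getQueue().size();    // core busy, so tasks queue up

        pool.execute(blocker);
        int threadsAfterBurst = pool.getPoolSize();  // queue full, a non-core thread is added

        block.countDown();
        pool.shutdown();
        return new int[] { threadsAfterCore, queuedTasks, threadsAfterBurst };
    }
}
```

With corePoolSize=2, queue capacity 2, and maximumPoolSize=3, the three measurements come out as 2, 2, and 3: exactly the core, queue, and expansion stages described above.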
Parameter setting
In daily use you need to pay attention to how these parameter values are set. So how should they be set?
1. Is more threads always better?
Obviously not. On a single-core CPU, for example, tens of thousands of threads make no sense; and too many threads cause context switching that actually reduces performance.
Which raises the question: is multithreading useful on a single core?
A process is the basic unit of resource allocation; a thread is the basic unit of CPU scheduling.
If one thread always occupies the CPU, multithreading is useless; but wherever there is waiting time, such as IO, multithreading helps.
If the number of threads is too small, on the other hand, the CPU is clearly not used to its full potential.
2. Is a larger queue capacity better?
Obviously not either. People usually care about the thread count, but few ask about the queue size. If the queue is too large, a considerable backlog of tasks builds up, which inevitably leads to long response times.
If the queue is too small, or there is none at all, tasks are not buffered and the thread count can balloon rapidly.
Thread count and queue size have to be tuned together. So what is a reasonable setting?
The most direct way is a stress test that simulates online traffic. As the request volume increases, QPS increases; past a certain threshold, QPS stops growing or grows only marginally while response time rises sharply. That threshold is what we consider the optimal thread count.
There are also plenty of theoretical answers to how many threads to use.
Task type
Sizing by task type is the usual rough estimate:
- IO intensive
- cpu-intensive
For IO-intensive tasks: CPU count x 2. IO-intensive work keeps CPU usage low, so the CPU can be put to full use on other tasks while waiting for IO.
For CPU-intensive tasks: CPU count + 1. CPU usage is already high; enabling more threads only increases context switches and adds overhead.
The formula
The formulas for calculating this have been around for a long time
Java Concurrency in Practice
In this book the author gives the estimate Nthreads = Ncpu * (1 + W/C), where W is wait time and C is compute time (taking the target CPU utilization as 100%).
For example, if each thread's average CPU compute time is 0.5s, its wait time (non-CPU time such as IO) is 1.5s, and there are 8 CPU cores, the formula gives: 8 * (1 + (1.5/0.5)) = 32
Programming Concurrency on the JVM
Number of threads = Ncpu / (1 - blocking factor), where the blocking factor is 0 for computation-intensive tasks and close to 1 for IO-intensive tasks.
For example, with 2 CPU cores and a blocking factor of 0.9, this formula gives 2 / (1 - 0.9) = 20; applying the first formula to the same example, 2 * (1 + (0.9/0.1)) = 20 as well.
At first glance the two formulas look different, but they are actually the same:
Ncpu / (1 - blocking factor) = Ncpu * (1 + W/C)
where blocking factor = W / (W + C), that is, blocking time / (blocking time + compute time).
Using the second formula on the first formula's example: 8 / (1 - (1.5 / (1.5 + 0.5))) = 32
So the two formulas agree.
The task-type rules of thumb fall out of the formula:
- IO type: W/C ≈ 1, and the formula gives 2 * Ncpu
- CPU type: W/C ≈ 0, and the formula gives 1 * Ncpu
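Both formulas can be sanity-checked with a few lines of code (a sketch; the numbers are the worked examples from the text, and the class and method names are made up):

```java
public class ThreadCountFormulas {

    // Java Concurrency in Practice: Nthreads = Ncpu * (1 + W/C)
    static int byWaitComputeRatio(int ncpu, double waitTime, double computeTime) {
        return (int) Math.round(ncpu * (1 + waitTime / computeTime));
    }

    // Programming Concurrency on the JVM: Nthreads = Ncpu / (1 - blockingFactor)
    static int byBlockingFactor(int ncpu, double blockingFactor) {
        return (int) Math.round(ncpu / (1 - blockingFactor));
    }

    public static void main(String[] args) {
        // 8 cores, 0.5s compute, 1.5s wait: 8 * (1 + 3) = 32
        System.out.println(byWaitComputeRatio(8, 1.5, 0.5));
        // Same example via blocking factor W/(W+C) = 1.5/2.0 = 0.75: 8 / 0.25 = 32
        System.out.println(byBlockingFactor(8, 0.75));
    }
}
```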
Now that we have the formulas, how do we determine the variables in them?
The most primitive way is to print logs: the time a thread waits off-CPU is almost always DB, IO, or network calls, so log before and after each call, compute the time difference, and take the average.
3. Other formulas
Thread count = target QPS / (1 / task processing time) = QPS * per-task processing time
(Size the core threads from the average QPS; size the maximum threads from the peak QPS.)
For example, assume the target QPS is 100 and the actual task processing time is 0.2s: 100 * 0.2 = 20 threads. Those 20 threads must correspond to 20 physical CPU cores, otherwise the estimated QPS target cannot be reached.
Queue size = thread count * (maximum response time / task processing time)
Assuming the target maximum response time is 0.4s, the blocking queue length is 20 * (0.4 / 0.2) = 40.
This formula is a little hard to grasp at first, but it is just a rearrangement:
(thread count / task processing time) * response time = queue size
In other words, rate times time equals length.
QPS is the number of tasks to process per second. If a task takes 1s, one thread handles 1 task per second, so you need QPS threads; if a task takes 0.2s, one thread handles 5 tasks per second, so you only need QPS/5 threads. Hence the formula QPS / (1 / task processing time).
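The QPS-based sizing above, as a sketch (class and method names are made up; the numbers are the worked example from the text):

```java
public class QpsSizing {

    // threads = targetQps * taskTime: each thread handles 1/taskTime tasks per second
    static int threadCount(int targetQps, double taskTimeSeconds) {
        return (int) Math.round(targetQps * taskTimeSeconds);
    }

    // queue = threads * (maxResponseTime / taskTime): the backlog that still
    // drains within the response-time budget
    static int queueSize(int threads, double maxResponseSeconds, double taskTimeSeconds) {
        return (int) Math.round(threads * (maxResponseSeconds / taskTimeSeconds));
    }

    public static void main(String[] args) {
        int threads = threadCount(100, 0.2);              // 100 * 0.2 = 20
        System.out.println(threads);
        System.out.println(queueSize(threads, 0.4, 0.2)); // 20 * 2 = 40
    }
}
```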
Fault description
With all of that in mind, let's review the entire failure.
The cause
The project uses Spring Boot 1. Given the default implementation above, a colleague felt that spawning a brand-new thread for every asynchronous event was unreasonable, which is fair, and that it should be replaced with a thread pool. So the optimization was to configure one.
The process
private Integer corePoolSize = 8;
private Integer maxPoolSize = 40;
private Integer queueCapacity = 200;
These were the thread pool parameters. The machine had 4 cores, hence CPU * 2 core threads; the request volume was judged to be small, so on paper the configuration did not look unreasonable.
The symptoms
After the release we found the problem: online business data was wrong, and it looked as though the requests had failed.
According to the logs, requests came in normally, but the asynchronous event logic never executed.
The analysis
Looking back over the whole incident, it exposed quite a few problems. Here are some, for the record:
1. Global configuration must not be changed lightly
This is a legacy system with complex business logic and intertwined code. Even a normal, robust system cannot be changed at will, let alone one that is unstable and whose root causes are not clearly understood.
Any change needs not only a theoretical basis but also practical verification; a stress test is indispensable.
You must also leave yourself a fallback, such as a switch; here, for instance, the new thread pool's rejection policy had not even been explicitly configured.
2. Restoring service is the first priority
Faults after a release are inevitable. Once one occurs, though, restore service immediately instead of analyzing first.
Everything runs in containers now, so keep one instance aside as the accident scene for later investigation,
rather than letting the failure spread while everyone keeps debating the cause.
Of course, a rollback can sometimes make the blast radius larger; a switch can avoid that, but the bigger problem this time was that the switch existed yet had not been fully tested and did not take effect.
3. Solid fundamentals
In this incident, nobody initially knew Spring Boot 1's default behavior; it was assumed to be a CachedThreadPool. After the switch failed, the problem was finally fixed by changing the thread pool configuration.
The thread count should not be too large, but that does not mean the maximum is 2 * CPU. Depending on the workload, 2x, 8x, or even 32x the core count can all be reasonable; there is no single fixed rule.