An overview of the
This is the end of the series of articles on high availability, mainly covering current limiting, circuit breakers, peak shaving and Sentinel actual combat:
- High Availability Series – what do you mean by stream limiting?
- High Availability Series — Sentinel, the current limiting device, don’t you know about it?
- High Availability Series – Ali’s P7 guru takes you through Sentinel
- High Availability Series — CIRCUIT breaker downgrade I learned!
- High Availability Series – Talk about peak filling!
Let’s move on to our favorite high-performance series, which includes topics such as message queues, caching, and distributed deployment architectures. Hold your hand! “, explains the blogger’s tragic experience, so as we learn about the technology, we will apply it to our actual project. After the high Performance section, we will move on to architecture Diagram 2.0
What is asynchrony
Synchronous call: The caller continues to wait for the return result during the call. Asynchronous call: During the call, the caller does not directly wait for the return result, but performs other tasks, usually in the form of a callback function.Copy the code
It is easier for us to understand the principle of synchronization and asynchrony without IO. Synchronization and asynchrony are communication mechanisms that express whether we actively ask or wait for feedback in the process of communication. This is reflected in synchronous (active) and asynchronous (passive).
Asynchronous in-process calls
1, the Thread
Processes and Threads: Processes are the smallest unit of resource allocation and threads are the smallest unit of CPU scheduling
The simplest method of asynchronous invocation within a Java process is to start a new Thread to execute the task (CPU scheduling) with new Thread().start().
public static void main(String[] args) {
System.out.println("Boil water");
// Create a new thread
Thread thread1= new Thread(()->{
try {
Thread.sleep(5000);
System.out.println("The water is boiling."+Thread.currentThread().getName());
}catch(Exception e){ e.printStackTrace(); }}); thread1.start(); System.out.println("Movement");
}
Copy the code
1.1. Start () and run()
In the example code above, we implement the new thread as we implement the Runnable interface, but instead of using the run() method, we use the start() method.
Run () : The run () method call is executed using the current thread, understandably synchronously
Start ()
When a method is called, in the code logic, a local method is calledstart0
.
After downloading the JDK source code, you can see that the Thread class has a registerNatives local method. The primary purpose of this method is to register some local methods for Thread to use, such as start0(), stop0(), etc. All local methods that operate on the local thread are registered by it.
You can see that a Java thread calling the start->start0 method actually calls the JVM_StartThread method, creating the thread by calling new JavaThread(&thread_entry,sz).
In jVM.cpp, we have the following code snippet:
After the thread is created, the run() method is called through thread_entry
1.2, the Future
The calling mode of Future is synchronous and non-blocking. The main reason is that when obtaining the processing result of asynchronous thread, the main thread needs to take the initiative to obtain it. Asynchronous thread does not update or callback the data structure through active notification.
public static void main(String[] args) throws Exception {
System.out.println("Boil water");
FutureTask<String> futureTask = new FutureTask(()->{
try {
Thread.sleep(5000);
System.out.println("The water is boiling."+Thread.currentThread().getName());
}catch (Exception e){
e.printStackTrace();
}
return "water";
});
// Create a new thread
Thread thread1= new Thread(futureTask);
thread1.start();
System.out.println("Movement");
Thread.sleep(3000);
// Block waiting for data
String result= futureTask.get(5, TimeUnit.SECONDS);
System.out.println("Water," + result);
}
Copy the code
How Future is implemented
The class inheritance diagram is as follows. You can see that FutureTask implements the Runnable interface, so in the overwritten run () method, you can see that after the call () method gets the result, it is updated to the member variable through CAS.
Task call result update:
1.3, ThreadPoolExecutor
public static void main(String[] args) throws Exception {
ExecutorService executors = Executors.newFixedThreadPool(10);
System.out.println("Boil water");
Future<String> future = executors.submit(() -> {
try {
Thread.sleep(5000);
System.out.println("The water is boiling." + Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
return "water";
});
System.out.println("Movement");
String result = future.get(5, TimeUnit.SECONDS);
System.out.println("Water," + result);
}
Copy the code
Submit () uses the Callable
implementation class to wrap the FutureTask in the submit() method, and then makes the actual call:
1.4,
The core difference is between start() and run(). Start () starts a new thread while completing the run() method, which is an asynchronous operation. If the run() method is executed directly, it is executed directly in the current thread, a synchronous blocking operation.
The calling mode of Future is a synchronous non-blocking processing. After the task is submitted, the execution of the main thread is not blocked. After a certain time, the main thread can obtain the processing results of asynchronous tasks through the get () method.
ThreadPoolExecutor maintains a pool of reusable threads, which solves the problem of resource overcommitment and performance time. The default Java thread size is 1MB, and the creation and destruction of the thread take up memory and GC time. However, the unrestricted creation of threads will lead to high CPU load and little time slice allocated to each thread, resulting in low processing efficiency.
2, EventBus
public class JiulingTest {
public static void main(String[] args) throws Exception {
System.out.println("Start");
// Use the asynchronous event bus
EventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(10));
// Register a listener with the above EventBus object
eventBus.register(new EventListener());
// Publish an event using EventBus, which will be notified to all registered listeners
eventBus.post(new Event("Boil water"));
System.out.println("Movement"); }}// Event, the wrapper object of the event that the listener listens to
class Event {
// Event action
public String action;
Event(String action) {
this.action = action; }}/ / listener
class EventListener {
// The listening method must be declared with annotations and can only have one argument. The actual firing of an event will trigger the method based on the argument type
@Subscribe
public void listen(Event event) {
try {
System.out.println("Event listener receive message: " + event.action + " threadName:" + Thread.currentThread().getName());
Thread.sleep(5000);
System.out.println("The water is boiling!);
}catch(Exception e){ e.printStackTrace(); }}}Copy the code
2.1 Observer mode
In EventBus, @subscribe defines the behavior of the abstract observer. In the example above, listen(Event Event) will only observe events of this class.
/** * Returns all subscribers for the given listener grouped by the type of event they subscribe to. */
privateMultimap<Class<? >, Subscriber> findAllSubscribers(Object listener) { Multimap<Class<? >, Subscriber> methodsInListener = HashMultimap.create(); Class<? > clazz = listener.getClass();// Iterates over the @subscribe method
for(Method method : getAnnotatedMethods(clazz)) { Class<? >[] parameterTypes = method.getParameterTypes(); Class<? > eventType = parameterTypes[0];
// Then categorize by parameter type, that is, event type
methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
}
return methodsInListener;
}
Copy the code
Then, when the event is published, we iterate through all the listener methods by calling eventbus.post () :
Public void post(Object event) { Find all the observer methods Iterator < the Subscriber > eventSubscribers = subscribers. GetSubscribers (event); If (eventintimy.hasnext ()) {dispatcher.dispatch(event, eventSubscribers); } else if (! (event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); }}Copy the code
2.2, AsyncEventBus
In the example code, we use the new AsyncEventBus (Executors. NewFixedThreadPool (10)) build asynchronous event bus.
Let’s look at how Listern executes the event handling method. It’s a little easier to understand here. It’s done by calling the @subscribe annotation method through the thread pool.
So where did the Executor come from?
this.executor = bus.executor(); // From the event bus
Going back to EventBus, we can see that the constructor does not provide an entry point to initialize the thread pool, so the creation of the default thread pool can be traced
Instead of creating a new thread to execute the Runnable method, the pool’s execute method uses the current thread (see 1.1 for details). Therefore, EventBus does not support asynchronous event processing!
In the dispatchEvent method, it is straightforward to see that the overall design supports asynchronous events. All we need to do is modify the Executor to be a reasonable thread pool, and AsyncEventBus provides this capability.
/**
* Creates a new AsyncEventBus that will use {@code executor} to dispatch events.
*
* @param executor Executor to use to dispatch events. It is the caller's responsibility to shut
* down the executor after the last event has been posted to this event bus.
*/
public AsyncEventBus(Executor executor) {
super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
Copy the code
3, Spring Event
By default, Spring Events and Event Bus execute synchronously. You can configure the Executors to change the events to asynchronous.
Core components:
- Event class: Defines events, inherits
ApplicationEvent
Becomes an event class. - Publisher: Publish the event. Passed
ApplicationEventPublisher
Publish events. - Listener: Listens for and handles events, implements
ApplicationListener
Interface or use@EventListener
Annotation.
Because the code is too much, you can download it directly on Github and read it. Here are just some of the key code:
In the publish event method: AbstractApplicationContext# publishEvent would go below the SimpleApplicationEventMulticaster# multicastEvent perform specific tasks scheduling. The design and the EventBus above, when executed, through the actual scheduling to distinguish the thread pool, to determine synchronous | asynchronous!
3.1, asynchronous ApplicationEventMulticaster
Modify ApplicationEventMulticaster set the initial thread pool, and the EventBus solution:
@Order(0)
@Bean
public ApplicationEventMulticaster applicationEventMulticaster(a) {
SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
eventMulticaster.setTaskExecutor(Executors.newFixedThreadPool(10));
return eventMulticaster;
}
Copy the code
When the Spring context is initialized, this bean is loaded into the context,
The problem: because of replaces the whole context ApplicationEventMulticaster so on event handling process, all the events will be in an asynchronous manner, so the risk control, it is hard to do it. Not recommended, but useful (tested, after all)
3.2. @async for asynchronous
Implement the AsyncConfigurer interface, customize the thread pool, and execute the reflection proxy on the aspect method
org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke
Core principles
Asynchronous inter-process calls
Dubbo asynchronous invocation
In THE RPC framework, we generally use synchronous invocation pattern, but in the underlying implementation of Dubbo, it is implemented as asynchronous invocation. Let’s take a quick look at the call link:
Core code in
com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
Asynchronous decoupling of message queues
When introducing EventBus, I looked at a number of articles that described the Design pattern of EventBus as a public-subscribe model. First of all, this description is wrong, and then let’s compare the differences:
On the face of it:
- In Observer mode, there are only two roles — observer + observed
- In the publish-subscribe model, there is not only the role of publisher and subscriber, but also the role of Broker, which is often overlooked
To go deeper:
- The observer and the observed are loosely coupled
- Publisher and subscriber, there is no coupling at all
Publish-subscribe pattern:
Message queues enable decoupling through messaging middleware such as RocketMQ, Kafka and rabbitMQ. Complete the receiving and pushing of messages, so as to achieve the effect of asynchronous processing.
Pay attention, don’t get lost
Image address: draw. IO original
Well folks, that’s all for this post, and I’ll be updating it weekly with a few high-quality articles about big factory interviews and common technology stacks. Thank everyone can see here, if this article is well written, please three!! Thanks for your support and recognition. See you in the next article!
I am Nine ling, there is a need to communicate children’s shoes can pay attention to the public number: Java tutorial! If there are any mistakes in this blog, please comment and comment. Thank you very much!