Overview

This concludes the high availability series, which covered rate limiting, circuit breaking, peak shaving, and Sentinel in practice:

  • High Availability Series: what do you mean by rate limiting?
  • High Availability Series: Sentinel, the rate limiter you should know about
  • High Availability Series: an Alibaba P7 expert takes you through Sentinel
  • High Availability Series: circuit breaking and degradation, learned!
  • High Availability Series: let's talk about peak shaving and valley filling!

Next comes our favorite, the high-performance series, which covers topics such as message queues, caching, and distributed deployment architectures. An earlier post ("Hold your hand!") recounted the blogger's painful experience, so as we learn each technology we will apply it to a real project. After the high-performance section, we will move on to architecture diagram 2.0.

What is asynchrony

Synchronous call: the caller blocks during the call and waits for the result to be returned.
Asynchronous call: the caller does not wait for the result directly but carries on with other tasks; the result is usually delivered through a callback function.

Synchronous and asynchronous calls are easier to understand if we set IO aside. Both are communication mechanisms, and they describe how the caller learns of the result: by actively asking and waiting for it (synchronous, active), or by being notified once it is ready (asynchronous, passive).
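To make the two definitions concrete, here is a minimal sketch in plain Java. The class name SyncVsAsync and the boilWater() helper are hypothetical and exist only for illustration; CompletableFuture is just one of several ways to deliver a callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class SyncVsAsync {
    // Hypothetical slow operation, used only for illustration.
    static String boilWater() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "hot water";
    }

    // Synchronous: the caller is blocked until the result comes back.
    static String callSync() {
        return boilWater();
    }

    // Asynchronous: the caller hands over a callback and returns at once;
    // the callback is invoked later, when the result is ready.
    static CompletableFuture<Void> callAsync(Consumer<String> callback) {
        return CompletableFuture.supplyAsync(SyncVsAsync::boilWater).thenAccept(callback);
    }

    public static void main(String[] args) {
        System.out.println("sync result: " + callSync());
        CompletableFuture<Void> done = callAsync(r -> System.out.println("callback got: " + r));
        System.out.println("caller is free to do other work");
        done.join(); // only so the demo does not exit before the callback runs
    }
}
```

In the asynchronous case the caller never asks for the result; it is pushed to the callback.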

Asynchronous in-process calls

1, Thread

Processes and threads: a process is the smallest unit of resource allocation, and a thread is the smallest unit of CPU scheduling.

The simplest way to make an asynchronous call inside a Java process is to start a new thread to run the task, with new Thread().start().

public static void main(String[] args) {
    System.out.println("Boil water");
    // Create a new thread
    Thread thread1 = new Thread(() -> {
        try {
            Thread.sleep(5000);
            System.out.println("The water is boiling." + Thread.currentThread().getName());
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
    thread1.start();
    System.out.println("Movement");
}

1.1, start() and run()

In the example above, we define the new thread's work by supplying a Runnable implementation (the lambda), but we launch it with start() instead of calling run().

run(): calling run() directly executes the task on the current thread, so it can be understood as a synchronous call.

start(): when start() is called, the code in turn calls a native method, start0().

After downloading the JDK source code, you can see that the Thread class has a native method named registerNatives. Its main purpose is to register the native methods that Thread uses, such as start0() and stop0(). All native methods that operate on the underlying thread are registered by it.

A Java thread calling start() and then start0() actually ends up in the JVM_StartThread function in jvm.cpp, which creates the thread by calling new JavaThread(&thread_entry, sz).

After the thread is created, the run() method is called through thread_entry.
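The difference between the two calls is easy to observe by recording which thread executes the task. This is a small illustrative sketch; the class name RunVsStart and the thread name "worker-1" are made up for the example.

```java
class RunVsStart {
    // Returns the name of the thread that actually executed the task.
    static String executingThread(boolean useStart) {
        String[] seen = new String[1];
        Thread worker = new Thread(() -> seen[0] = Thread.currentThread().getName(), "worker-1");
        if (useStart) {
            worker.start(); // a new thread is created and run() executes on it
            try {
                worker.join(); // wait so that the recorded name is visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else {
            worker.run(); // an ordinary method call on the caller's thread
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("run()   executed on: " + executingThread(false));
        System.out.println("start() executed on: " + executingThread(true));
    }
}
```

With run(), the recorded name is that of the calling thread; with start(), it is the name given to the new thread.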

1.2, Future

Calling through Future is synchronous but non-blocking. Submitting the task does not block the main thread, but to obtain the asynchronous thread's result the main thread has to fetch it actively. The asynchronous thread does not notify the caller or invoke a callback when it finishes.

import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public static void main(String[] args) throws Exception {
    System.out.println("Boil water");
    FutureTask<String> futureTask = new FutureTask<>(() -> {
        try {
            Thread.sleep(5000);
            System.out.println("The water is boiling." + Thread.currentThread().getName());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "water";
    });
    // Create a new thread
    Thread thread1 = new Thread(futureTask);
    thread1.start();
    System.out.println("Movement");
    Thread.sleep(3000);
    // Block for up to 5 seconds waiting for the result
    String result = futureTask.get(5, TimeUnit.SECONDS);
    System.out.println("Water," + result);
}

How Future is implemented

As the class inheritance diagram shows, FutureTask implements the Runnable interface (through RunnableFuture). In its overridden run() method, once call() has returned the result, a CAS on the task state guards the write of that result into a member variable.

Task call result update:
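The idea can be sketched with a heavily simplified stand-in. MiniFutureTask below is a hypothetical class written for this article, not the JDK source: it has no cancellation or timeout, and it uses an AtomicInteger and a CountDownLatch where the real FutureTask has its own state field and wait queue.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, heavily simplified stand-in for FutureTask (no cancellation, no timeout).
class MiniFutureTask<V> implements Runnable {
    private static final int NEW = 0, COMPLETING = 1, NORMAL = 2, EXCEPTIONAL = 3;

    private final AtomicInteger state = new AtomicInteger(NEW);
    private final CountDownLatch done = new CountDownLatch(1);
    private final Callable<V> callable;
    private V outcome;
    private Throwable failure;

    MiniFutureTask(Callable<V> callable) {
        this.callable = callable;
    }

    @Override
    public void run() {
        try {
            V result = callable.call();
            // The CAS guards the transition, so the result is published at most once.
            if (state.compareAndSet(NEW, COMPLETING)) {
                outcome = result;
                state.set(NORMAL);
            }
        } catch (Exception e) {
            if (state.compareAndSet(NEW, COMPLETING)) {
                failure = e;
                state.set(EXCEPTIONAL);
            }
        } finally {
            done.countDown(); // wake up any thread blocked in get()
        }
    }

    // The caller actively fetches the result and blocks until it is available.
    V get() {
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (failure != null) {
            throw new IllegalStateException(failure);
        }
        return outcome;
    }

    public static void main(String[] args) {
        MiniFutureTask<String> task = new MiniFutureTask<>(() -> "water");
        new Thread(task).start();
        System.out.println("Water," + task.get());
    }
}
```

Note that the worker thread only stores the result; nothing is pushed to the caller, which matches the "synchronous non-blocking" description above.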

1.3, ThreadPoolExecutor

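import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public static void main(String[] args) throws Exception {
    ExecutorService executors = Executors.newFixedThreadPool(10);
    System.out.println("Boil water");
    Future<String> future = executors.submit(() -> {
        try {
            Thread.sleep(5000);
            System.out.println("The water is boiling." + Thread.currentThread().getName());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "water";
    });
    System.out.println("Movement");

    // The task sleeps for 5 seconds, so wait longer than that to avoid a TimeoutException
    String result = future.get(10, TimeUnit.SECONDS);
    System.out.println("Water," + result);
    // Shut the pool down so that its non-daemon threads do not keep the JVM alive
    executors.shutdown();
}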

Inside submit(), the Callable implementation is wrapped in a FutureTask, which is then handed to execute() for the actual call:
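In rough terms, submit() is equivalent to the helper below. This is a sketch of the behavior rather than the JDK source; submitLike and SubmitSketch are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

class SubmitSketch {
    // Wrap the Callable in a FutureTask, run it via execute(), hand the wrapper back.
    static <T> Future<T> submitLike(Executor executor, Callable<T> task) {
        FutureTask<T> futureTask = new FutureTask<>(task);
        executor.execute(futureTask); // FutureTask is a Runnable, so any Executor can run it
        return futureTask;            // and a Future, so the caller can fetch the result
    }

    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = submitLike(pool, () -> "water");
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Water," + demo());
    }
}
```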

1.4, Summary

The core difference is between start() and run(). start() starts a new thread and run() executes on it, which is an asynchronous operation. Calling run() directly executes it on the current thread, which is a synchronous, blocking operation.

Calling through Future is synchronous but non-blocking. Submitting the task does not block the main thread, and at some later point the main thread can obtain the asynchronous task's result through the get() method.

ThreadPoolExecutor maintains a pool of reusable threads, which addresses both excessive resource consumption and the time cost of creating threads. Each Java thread reserves a stack that typically defaults to 1MB on 64-bit JVMs, and creating and destroying threads costs memory and GC time. Creating threads without limit also drives up CPU load and leaves each thread only a small time slice, which lowers processing efficiency.
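Thread reuse can be checked by counting how many distinct threads serve a batch of tasks. The sketch below builds the pool explicitly with a bounded queue; the sizes (4 threads, queue of 100) and the class name BoundedPoolSketch are arbitrary choices for illustration, not recommendations.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolSketch {
    // Runs `tasks` small jobs on a pool of `poolSize` threads and reports
    // how many distinct threads were actually used.
    static int distinctThreadsUsed(int poolSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize,                          // fixed number of worker threads
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(100),               // bounded queue caps pending work
                new ThreadPoolExecutor.CallerRunsPolicy());  // when full, the submitter runs the task
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch latch = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    names.add(Thread.currentThread().getName());
                    latch.countDown();
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println("20 tasks ran on " + distinctThreadsUsed(4, 20) + " thread(s)");
    }
}
```

As long as the queue does not overflow, the number of threads used never exceeds the pool size, however many tasks are submitted.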

2, EventBus

import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import java.util.concurrent.Executors;

public class JiulingTest {
    public static void main(String[] args) throws Exception {
        System.out.println("Start");
        // Use the asynchronous event bus
        EventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(10));
        // Register a listener with the above EventBus object
        eventBus.register(new EventListener());
        // Publish an event using EventBus, which will be notified to all registered listeners
        eventBus.post(new Event("Boil water"));
        System.out.println("Movement");
    }
}

// Event, the wrapper object of the event that the listener listens to
class Event {
    // Event action
    public String action;

    Event(String action) {
        this.action = action;
    }
}

// Listener
class EventListener {
    // The listening method must be annotated with @Subscribe and take exactly one argument.
    // When an event is posted, the method to trigger is chosen by the argument type
    @Subscribe
    public void listen(Event event) {
        try {
            System.out.println("Event listener receive message: " + event.action + " threadName:" + Thread.currentThread().getName());
            Thread.sleep(5000);
            System.out.println("The water is boiling!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2.1, Observer pattern

In EventBus, @Subscribe defines the behavior of the abstract observer. In the example above, listen(Event event) only observes events of the Event class.

/** Returns all subscribers for the given listener grouped by the type of event they subscribe to. */
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
  Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
  Class<?> clazz = listener.getClass();
  // Iterate over the @Subscribe methods
  for (Method method : getAnnotatedMethods(clazz)) {
    Class<?>[] parameterTypes = method.getParameterTypes();
    Class<?> eventType = parameterTypes[0];
    // Then categorize by parameter type, that is, event type
    methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
  }
  return methodsInListener;
}

Then, when the event is published by calling EventBus.post(), all the matching listener methods are iterated over:

public void post(Object event) {
  // Find all the observer methods
  Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
  if (eventSubscribers.hasNext()) {
    dispatcher.dispatch(event, eventSubscribers);
  } else if (!(event instanceof DeadEvent)) {
    // the event had no subscribers and was not itself a DeadEvent
    post(new DeadEvent(this, event));
  }
}

2.2, AsyncEventBus

In the example code, we use new AsyncEventBus(Executors.newFixedThreadPool(10)) to build an asynchronous event bus.

Let's look at how the listener executes the event handling method. This part is fairly easy to understand: the @Subscribe annotated method is invoked through the thread pool.

So where did the Executor come from?

this.executor = bus.executor(); // From the event bus

Going back to EventBus, we can see that its constructors provide no entry point for passing in a thread pool, so we trace how the default executor is created.

The default is MoreExecutors.directExecutor(). Instead of creating a new thread to run the Runnable, its execute method runs it on the current thread (see 1.1 for details). Therefore, a plain EventBus does not process events asynchronously!
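The effect of the executor choice can be reproduced without Guava. MiniBus below is a hypothetical toy bus, not Guava's implementation: its default executor is Runnable::run, which behaves like a direct executor, and passing a thread pool instead makes delivery asynchronous.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class MiniBus {
    private final Executor executor;
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    MiniBus() {
        this(Runnable::run); // "direct" executor: execute(r) simply calls r.run() on the caller's thread
    }

    MiniBus(Executor executor) {
        this.executor = executor;
    }

    void register(Consumer<String> listener) {
        listeners.add(listener);
    }

    void post(String event) {
        for (Consumer<String> listener : listeners) {
            executor.execute(() -> listener.accept(event));
        }
    }

    // Reports which thread the listener ran on, for the default bus or a pool-backed one.
    static String listenerThread(boolean async) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "bus-worker"));
        try {
            MiniBus bus = async ? new MiniBus(pool) : new MiniBus();
            CompletableFuture<String> seen = new CompletableFuture<>();
            bus.register(e -> seen.complete(Thread.currentThread().getName()));
            bus.post("Boil water");
            return seen.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("default bus listener thread: " + listenerThread(false));
        System.out.println("pooled bus listener thread:  " + listenerThread(true));
    }
}
```

The dispatch code is identical in both cases; only the Executor decides whether post() is synchronous or asynchronous.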

From the dispatchEvent method it is clear that the overall design already allows for asynchronous events. All we need to do is replace the Executor with a suitable thread pool, and AsyncEventBus provides exactly this capability.

/**
 * Creates a new AsyncEventBus that will use {@code executor} to dispatch events.
 *
 * @param executor Executor to use to dispatch events. It is the caller's responsibility to shut
 *     down the executor after the last event has been posted to this event bus.
 */
public AsyncEventBus(Executor executor) {
  super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}

3, Spring Event

Like EventBus, Spring events are dispatched synchronously by default. Configuring an Executor makes the dispatch asynchronous.

Core components:

  • Event class: defines the event. A class becomes an event class by inheriting from ApplicationEvent.
  • Publisher: publishes the event, through ApplicationEventPublisher.
  • Listener: listens for and handles events, by implementing the ApplicationListener interface or using the @EventListener annotation.

There is too much code to show here, so you can download the source from GitHub and read it. Only the key points are covered below:

When an event is published, AbstractApplicationContext#publishEvent hands off to SimpleApplicationEventMulticaster#multicastEvent, which performs the actual dispatch. The design is the same as EventBus above: whether a thread pool is configured at dispatch time determines whether listeners run synchronously or asynchronously!

3.1, Asynchronous ApplicationEventMulticaster

Give the ApplicationEventMulticaster a thread pool when it is initialized, the same solution as with EventBus:

@Order(0)
@Bean
public ApplicationEventMulticaster applicationEventMulticaster() {
    SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
    eventMulticaster.setTaskExecutor(Executors.newFixedThreadPool(10));
    return eventMulticaster;
}

When the Spring context is initialized, this bean is loaded and used as the context's event multicaster.

The problem: because this replaces the ApplicationEventMulticaster for the whole context, every event is then handled asynchronously, which makes the risk hard to control. Not recommended, although it does work (I have tested it, after all).

3.2, @Async for asynchronous execution

Implement the AsyncConfigurer interface to supply a custom thread pool. The annotated method is then invoked by reflection through an AOP proxy, in:

org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke

Core principles
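The proxy idea can be imitated in plain Java with a JDK dynamic proxy. This is only an analogy under stated assumptions, not Spring's implementation: AsyncProxySketch, asyncProxy, and the Kettle interface are hypothetical, and the sketch only handles void methods.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncProxySketch {
    // Hypothetical service interface used only for this example.
    interface Kettle {
        void boil();
    }

    // Wraps target so that every call is handed to the executor instead of running inline.
    static <T> T asyncProxy(Class<T> type, T target, Executor executor) {
        Object proxy = Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class<?>[] {type},
                (self, method, args) -> {
                    executor.execute(() -> {
                        try {
                            method.invoke(target, args); // reflective call on the real object
                        } catch (ReflectiveOperationException e) {
                            throw new IllegalStateException(e);
                        }
                    });
                    return null; // the caller gets control back immediately (void methods only)
                });
        return type.cast(proxy);
    }

    // Reports which thread the real boil() ran on when called through the proxy.
    static String boilingThread() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "async-worker"));
        try {
            CompletableFuture<String> seen = new CompletableFuture<>();
            Kettle real = () -> seen.complete(Thread.currentThread().getName());
            Kettle proxied = asyncProxy(Kettle.class, real, pool);
            proxied.boil();
            return seen.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("boil() ran on: " + boilingThread());
    }
}
```

The caller keeps calling an ordinary method; the interception layer decides that the body runs on a pool thread.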

Asynchronous inter-process calls

Dubbo asynchronous invocation

With an RPC framework we generally use a synchronous invocation pattern, but in Dubbo's underlying implementation the call is actually asynchronous. Let's take a quick look at the call chain:

The core code is in:

com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
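The general "synchronous facade over an asynchronous transport" technique can be sketched as follows. This is not Dubbo's code: SyncOverAsyncSketch, the fakeNetwork executor, and the "echo:" reply are all assumptions standing in for a real client, IO thread, and remote service.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class SyncOverAsyncSketch {
    private final AtomicLong ids = new AtomicLong();
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    // Stand-in for the remote server plus the client's IO thread.
    private final ExecutorService fakeNetwork = Executors.newSingleThreadExecutor();

    // Asynchronous send: register a future under the request id and return immediately.
    CompletableFuture<String> request(String payload) {
        long id = ids.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future);
        fakeNetwork.execute(() -> onResponse(id, "echo:" + payload));
        return future;
    }

    // Called when a response arrives; the request id matches it to the waiting future.
    void onResponse(long id, String body) {
        CompletableFuture<String> future = pending.remove(id);
        if (future != null) {
            future.complete(body);
        }
    }

    // Synchronous facade: send asynchronously, then block on the future with a timeout.
    String invokeSync(String payload, long timeoutMillis) {
        try {
            return request(payload).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    void close() {
        fakeNetwork.shutdown();
    }

    public static void main(String[] args) {
        SyncOverAsyncSketch client = new SyncOverAsyncSketch();
        System.out.println(client.invokeSync("hello", 1000));
        client.close();
    }
}
```

The business code sees a blocking call, while the transport underneath never waits on any single request.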

Asynchronous decoupling of message queues

While preparing the EventBus section, I came across a number of articles that describe the design pattern of EventBus as the publish-subscribe pattern. That description is wrong: EventBus follows the observer pattern. Let's compare the differences:

On the face of it:

  • In the observer pattern, there are only two roles: the observer and the observed
  • In the publish-subscribe pattern, there are not only the publisher and subscriber roles but also the Broker role, which is often overlooked

To go deeper:

  • The observer and the observed are loosely coupled
  • The publisher and the subscriber are not coupled at all

Publish-subscribe pattern:

Message queues achieve this decoupling through messaging middleware such as RocketMQ, Kafka, and RabbitMQ. The middleware receives and pushes the messages, which gives the effect of asynchronous processing.
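The role of the Broker can be illustrated with an in-memory toy. MiniBroker is a hypothetical class for this article and is not how RocketMQ, Kafka, or RabbitMQ work internally; it only shows that the publisher and the subscriber each know the broker and a topic name, never each other.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class MiniBroker {
    private final Map<String, List<Consumer<String>>> topics = new ConcurrentHashMap<>();
    private final ExecutorService delivery = Executors.newSingleThreadExecutor();

    // Subscribers register interest in a topic name, not in a particular publisher.
    void subscribe(String topic, Consumer<String> handler) {
        topics.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(handler);
    }

    // Publishers hand the message to the broker and return; delivery happens on the broker's thread.
    void publish(String topic, String message) {
        List<Consumer<String>> handlers =
                topics.getOrDefault(topic, Collections.<Consumer<String>>emptyList());
        for (Consumer<String> handler : handlers) {
            delivery.execute(() -> handler.accept(message));
        }
    }

    void close() {
        delivery.shutdown();
    }

    public static void main(String[] args) {
        MiniBroker broker = new MiniBroker();
        CompletableFuture<String> received = new CompletableFuture<>();
        broker.subscribe("kettle", received::complete);
        broker.publish("kettle", "The water is boiling.");
        System.out.println("subscriber got: " + received.join());
        broker.close();
    }
}
```

Compare this with the EventBus example, where the listener object is registered directly on the bus that the poster holds: there is no independent middle party.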


Well folks, that is all for this post. I will keep updating weekly with a few high-quality articles on big-company interviews and common technology stacks. Thank you for reading this far. If you found this article well written, please like, bookmark, and share! Thanks for your support and recognition. See you in the next article!

I am Nine ling. If you would like to get in touch, follow the official account: Java tutorial! If there are any mistakes in this post, please point them out in the comments. Thank you very much!