preface

Hello everyone, today we start to share with you — Dubbo special Dubbo event notice. In the previous chapter, we introduced Dubbo local call, learned what local call is, and learned the daily usage scenarios and implementation principles. At the same time, we know that local call is a pseudo-protocol, which does not open ports, does not initiate remote calls, and is only directly associated within the JVM, but will execute Dubbo’s Filter chain. Some of you may want to know if there is an event notification during a call to a service. So in this chapter we will look at how Dubbo implements event notification by introducing Dubbo event notification. The first thing we have to understand is what is an event? And what are the use scenarios and implementation principles. So let’s get started fast!

1. Introduction to event notification

As anyone who has programming experience knows, there are a lot of events and listeners in our Spring framework. So why do we use events and listeners? First of all, we know that events and listeners can actually be understood in our design mode as observer mode. What does that mean? For example, in our desktop programming, there is a button in our UI interface. When the user clicks the button, an event will be triggered, which will notify the listener concerned about the event. When the listener receives the event, the subsequent processing logic will be triggered, which is what we call event monitoring. The advantage of event-based notification is that listeners of events can be added and removed dynamically, and listeners can be notified in real time or asynchronously.

2. Usage

Three types of events are provided in Dubbo: before, after, and when an exception occurs. The corresponding callback methods are onInvoke, onReturn, and onThrow. We can use these three events to configure the behavior of a service before and after invocation or when an exception occurs. The configuration example is as follows:

<dubbo:reference id="demoService" interface="com.muke.dubbocourse.common.api.BookFacade">
      <dubbo:method name="get" async="true" onreturn = "demoCallback.onreturn" onthrow="demoCallback.onthrow" />
</dubbo:reference>
Copy the code

Async =true indicates whether the result is returned immediately, and onReturn indicates whether a callback is required.

There are several combinations of the two superimposed:

  1. Async =true onReturn =” XXX”

  2. Synchronous callback mode: async=false onReturn =” XXX”

  3. Asynchronous no callback: async=true

  4. Synchronous without callback: async=false

3. Application scenarios

From the previous discussion, we know that there are three kinds of events that can be triggered when we call a service, and we can handle the corresponding things through the behavior of these three events.

  1. Scenario 1: We can record the start time before invoking the service method, calculate the total call cost after the invocation, and generate alarms or print error logs when exceptions occur.

  2. Scenario 2: We may need to set and destroy the context before and after invoking a service, for example. Then we can call onInvoke and onReturn to do the context operation of the response.

  3. Scenario 3: Trigger some pre – and post-processing before and after invoking the service. For example, record request logs and response logs.

4. Example demonstration

Here we call the service trigger notification by getting the list of books as an example. Here is the project structure:

We won’t discuss the consumer and service provider code, but we will focus on the consumer side configuration file dubo-consumer-xml. XML:


      
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
      xmlns="http://www.springframework.org/schema/beans"
      xsi:schemaLocation="Http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
   <dubbo:application name="demo-consumer" logger="log4j"/>
   <! -- Use zooKeeper registry -->
   <dubbo:registry address=Zookeeper: / / "127.0.0.1:2181" />
  <! -- Event notification interface -->
   <bean id ="notify" class = "com.muke.dubbocourse.eventnotify.consumer.NotifyImpl" />
   <! The url directly specifies the machine and port of the service provider.
   <dubbo:reference id="bookFacade" interface="com.muke.dubbocourse.common.api.BookFacade">
       <! Call queryByName to trigger related events -->
       <dubbo:method name="queryByName" async="true" onreturn="notify.onreturn" onthrow="notify.onthrow"/>
   </dubbo:reference>
</beans>
Copy the code

In the above code we configured an event notification interface NotifyImpl and configured the onReturn and onThrow callbacks to be triggered when the queryByName method is called. Let’s look at the definition of the callback method:

public interface Notify {

    void onreturn(List<Book> books, RequestParameter requestParameter);

    void onthrow(Throwable ex, RequestParameter requestParameter);

}

public class NotifyImpl implements Notify{

    @Override
    public void onreturn(List<Book> books, RequestParameter requestParameter) {
        System.out.println("Callback method request parameter :" +requestParameter+", return result:"+books);
    }

    @Override
    public void onthrow(Throwable ex, RequestParameter requestParameter) {
        System.out.println("Calling method throws an exception:"+ex+", method parameter:+requestParameter); }}Copy the code

In the notification interface we abstract the onReturn and onThrow methods and correspond to the above XML configuration, thus completing the code configuration simply.

5. Principle analysis

Below we through the source code simple analysis of the implementation principle.

** Event callback method config loading process: ** We all know that consumers will wrap ReferenceConfig for remotely invoked services, When we call org. Apache. Dubbo. Config. ReferenceConfig# triggered when the get method org. Apache. Dubbo. Config. ReferenceConfig# init method, In addition to handling configuration parameters, parameter verification, local stub checking, etc., this method also resolves onInvoke, onReturn, and onthrow attributes in the
tag. The core code is as follows:

public synchronized void init(a) {
        
        / /...
  
        if (CollectionUtils.isNotEmpty(getMethods())) {
            attributes = new HashMap<>();
            for (MethodConfig methodConfig : getMethods()) {
                / /...
                // The callback methods' oninvoke ', 'onreturn', and 'onthrow' are encapsulated as AsyncMethodInfo
                ConsumerModel.AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
                if(asyncMethodInfo ! =null) {
// consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);attributes.put(methodConfig.getName(), asyncMethodInfo); }}}// Register consumer information
        repository.registerConsumer(
                serviceMetadata.getServiceKey(),
                attributes,
                serviceDescriptor,
                this.null,
                serviceMetadata);

        // Create a remote proxy object
        ref = createProxy(map);

        // Set the proxy object
        serviceMetadata.setTarget(ref);
        serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
        repository.lookupReferredService(serviceMetadata.getServiceKey()).setProxyObject(ref);

			 / /...
    }
Copy the code

The core method org. Apache. Dubbo. Config. AbstractConfig# convertMethodConfig2AsyncInfo code is as follows:

 /*** ** Event notification callback interface method parsing: Oninvoke, ONReturn, and onThrow events will be triggered when an exception occurs before or after a method is called on the server side. Notify which method of which class * <dubbo:method ></dubbo:method> related method parameter resolution * *@author liyong
     * @dateHast judged the 2020-03-02 *@param methodConfig
     * @exception
     * @return org.apache.dubbo.rpc.model.ConsumerModel.AsyncMethodInfo
     **/
    public static ConsumerModel.AsyncMethodInfo convertMethodConfig2AsyncInfo(MethodConfig methodConfig) {
        if (methodConfig == null || (methodConfig.getOninvoke() == null && methodConfig.getOnreturn() == null && methodConfig.getOnthrow() == null)) {
            return null;
        }

        //check config conflict
        if(Boolean.FALSE.equals(methodConfig.isReturn()) && (methodConfig.getOnreturn() ! =null|| methodConfig.getOnthrow() ! =null)) {
            throw new IllegalStateException("method config error : return attribute must be set true when onreturn or onthrow has been set.");
        }

        ConsumerModel.AsyncMethodInfo asyncMethodInfo = new ConsumerModel.AsyncMethodInfo();
				// Get the callback method in the configuration
        asyncMethodInfo.setOninvokeInstance(methodConfig.getOninvoke());
        asyncMethodInfo.setOnreturnInstance(methodConfig.getOnreturn());
        asyncMethodInfo.setOnthrowInstance(methodConfig.getOnthrow());

        try {
            // Set the invokeMethod method
            String oninvokeMethod = methodConfig.getOninvokeMethod();
            if (StringUtils.isNotEmpty(oninvokeMethod)) {
                asyncMethodInfo.setOninvokeMethod(getMethodByName(methodConfig.getOninvoke().getClass(), oninvokeMethod));
            }

            // Set the returnMethod method
            String onreturnMethod = methodConfig.getOnreturnMethod();
            if (StringUtils.isNotEmpty(onreturnMethod)) {
                asyncMethodInfo.setOnreturnMethod(getMethodByName(methodConfig.getOnreturn().getClass(), onreturnMethod));
            }

            // Set throwMethod
            String onthrowMethod = methodConfig.getOnthrowMethod();
            if(StringUtils.isNotEmpty(onthrowMethod)) { asyncMethodInfo.setOnthrowMethod(getMethodByName(methodConfig.getOnthrow().getClass(), onthrowMethod)); }}catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }

        return asyncMethodInfo;
    }
Copy the code

The code above is mainly based on the XML configuration callback method encapsulated as ConsumerModel. AsyncMethodInfo entities.

** The process of calling a remote service by a service consumer is encapsulated as an Invoker call object, including a Filter object. The following code is the core of the construction process:

/*** ** filter chain is converted to Invoker call chain **@author liyong
     * @dateIn summer 2020-03-04 *@param invoker
     * @param key
     * @param group
     * @exception
     * @return org.apache.dubbo.rpc.Invoker<T>
     **/
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        // Load the Filter chain through the SPI mechanism in Dubbo
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);/ / filter chain

        if(! filters.isEmpty()) {for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                // Build the Invoker object
                last = new Invoker<T>() {

                    / /...
                    // Actually call the method
                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        Result asyncResult;
                        try {
                            // Execute the filter invoke method
                            asyncResult = filter.invoke(next, invocation);
                        } catch (Exception e) {
                            / /...
                        } finally{}// Call completion triggers the response event
                        return asyncResult.whenCompleteWithContext((r, t) -> {
                            // Handle callback or listening events
                            if (filter instanceof ListenableFilter) {// Deprecated!
                                Filter.Listener listener = ((ListenableFilter) filter).listener();
                                if(listener ! =null) {
                                    if (t == null) {
                                        // Trigger a message callback
                                        listener.onMessage(r, invoker, invocation);
                                    } else {
                                        // An error occurs and the error callback method is executedlistener.onError(t, invoker, invocation); }}}else if (filter instanceof Filter.Listener) {
                                Filter.Listener listener = (Filter.Listener) filter;
                                if (t == null) {
                                     // Trigger a message callback
                                    listener.onMessage(r, invoker, invocation);
                                } else {
                                     // An error occurs and the error callback method is executedlistener.onError(t, invoker, invocation); }}else {// Deprecated!filter.onResponse(r, invoker, invocation); }}); }/ /...}; }}return last;
    }
Copy the code

Convert the Filter call to an Invoker call in the above code, Among them when the Filter perform call will trigger the response after the completion of events and org. Apache. Dubbo.. RPC protocol. The dubbo. Filter. FutureFilter# onMessage calls. The core code is as follows:

    @Override
    public void onMessage(Result result, Invoker
        invoker, Invocation invocation) {
        if (result.hasException()) {
            // An exception is triggered
            fireThrowCallback(invoker, invocation, result.getException());
        } else {
            // Execute the trigger normallyfireReturnCallback(invoker, invocation, result.getValue()); }}Copy the code

When the process is abnormal trigger org. Apache. Dubbo.. RPC protocol. The dubbo. Filter. FutureFilter# fireThrowCallback code is as follows:

/** ** Triggers abnormal callback **@author liyong 
     * @date 3:40 PM 2020/11/15 
     * @param invoker 
     * @param invocation 
     * @param exception 
     * @exception 
     * @return void 
     **/
    private void fireThrowCallback(finalInvoker<? > invoker,final Invocation invocation, final Throwable exception) {
        final ConsumerModel.AsyncMethodInfo asyncMethodInfo = getAsyncMethodInfo(invoker, invocation);
        if (asyncMethodInfo == null) {
            return;
        }

        // Get the configured throwMethod method
        final Method onthrowMethod = asyncMethodInfo.getOnthrowMethod();
        // Get the throwMethod instance
        final Object onthrowInst = asyncMethodInfo.getOnthrowInstance();

        / /...
      
        // Get the throwMethod method parameter typeClass<? >[] rParaTypes = onthrowMethod.getParameterTypes();//throwMethod sets the first parameter to Throwable and subclasses
        if (rParaTypes[0].isAssignableFrom(exception.getClass())) {
            try {
                // Get all parameters of the callback method, such as ex and RequestParameter in onthrow(Throwable ex, RequestParameter RequestParameter)
                Object[] args = invocation.getArguments();
                Object[] params;

                if (rParaTypes.length > 1) {
                    // The throwMethod method takes two arguments and the second argument is an array type
                    if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
                        params = new Object[2];
                        params[0] = exception;
                        // The second argument populates the array
                        params[1] = args;
                    } else {
                        params = new Object[args.length + 1];
                        params[0] = exception;
                        // Use array copy
                        System.arraycopy(args, 0, params, 1, args.length); }}else {
                    // No parameters-only exception exists
                    params = new Object[]{exception};
                }
                // Perform the configure throwMethod method call
                onthrowMethod.invoke(onthrowInst, params);
            } catch (Throwable e) {
                logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:"+ invoker.getUrl(), e); }}else {
            logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:"+ invoker.getUrl(), exception); }}Copy the code

The above method triggers an exception callback and populates the parameters of the method signature type. The following core code triggers the normal callback logic:

/** ** triggers a normal callback **@author liyong
     * @date 3:43 PM 2020/11/15
     * @param invoker
     * @param invocation
     * @param result
     * @exception
     * @return void
     **/
    private void fireReturnCallback(finalInvoker<? > invoker,final Invocation invocation, final Object result) {
        final ConsumerModel.AsyncMethodInfo asyncMethodInfo = getAsyncMethodInfo(invoker, invocation);
        if (asyncMethodInfo == null) {
            return;
        }

        // Get the configured throwMethod method
        final Method onReturnMethod = asyncMethodInfo.getOnreturnMethod();
        // Get the throwMethod instance
        final Object onReturnInst = asyncMethodInfo.getOnreturnInstance();

        / /...

        // Get all parameters of the callback methodObject[] args = invocation.getArguments(); Object[] params; Class<? >[] rParaTypes = onReturnMethod.getParameterTypes();if (rParaTypes.length > 1) {
            // Call the method with 2 arguments and the first one being an array type
            if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
                params = new Object[2];
                // The 0th element of the array is the value returned by the original method
                params[0] = result;
                // The first parameter is the original method parameter
                params[1] = args;
            } else {
                params = new Object[args.length + 1];
                params[0] = result;
                // Use array copy
                System.arraycopy(args, 0, params, 1, args.length); }}else {
            // Only one parameter type
            params = new Object[]{result};
        }
        try {
            // Trigger the onreturn callback method
            onReturnMethod.invoke(onReturnInst, params);
        } catch (InvocationTargetException e) {
            fireThrowCallback(invoker, invocation, e.getTargetException());
        } catch(Throwable e) { fireThrowCallback(invoker, invocation, e); }}Copy the code

The two core methods above, fireThrowCallback and fireReturnCallback, are callback logic that is triggered when an exception occurs and when a business executes normally, respectively. The onInvoke method also triggers the fireInvokeCallback method call.

6. Summary

In this section we have looked at how events in Dubbo inform them separately: before, after, when exceptions occur, and common usage scenarios and ways to use them. We also analyze Dubbo’s event notification principle through sample demonstrations and source code analysis. One difficulty is to understand that service invocations in Dubbo are wrapped in Invoker objects, and I will fully parse the Dubbo source code later.

The highlights of this lesson are as follows:

  1. Understand event notifications in Dubbo

  2. Understand how to use event notification

  3. This section describes the application scenarios of event notification

  4. Understand the event notification principle in Dubbo

The author

Personally engaged in the financial industry, I have worked in chongqing’s first-class technical team of Yiji Pay, Sijian Technology and an online car hailing platform, and now I am working in a bank responsible for the construction of unified payment system. I have a strong interest in the financial industry. It also practices big data, data storage, automated integration and deployment, distributed microservices, responsive programming, and artificial intelligence. At the same time, he is also keen on technology sharing, creating public accounts and blog sites to share knowledge system. Concern public number: young IT male get latest technical article push!

Blog: Youngitman.tech

Wechat Official Account: