>>>> 😜😜😜 Github: 👉 github.com/black-ant CASE Backup: 👉 gitee.com/antblack/ca…

1. Preface

The previous article covered how the Dubbo 3.0 server receives a request. This article looks at the Dubbo filter chain, which is an important part of the overall request-handling flow.

2. Starting point for processing

// Invoke logic
C- ChannelEventRunnable # run : the starting point of the call, where a message is received
C- ExchangeHandler # reply :
C- FilterChainBuilder # invoke : starts the Filter chain construction and invocation

2.1 Invoke Initiate process

// as you saw in the previous article, reply calls are made by building an ExchangeHandlerAdapter
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
            // Keep only the core logic
            Invocation inv = (Invocation) message;
            // FilterChainBuilder is actually called here
            Invoker<?> invoker = getInvoker(channel, inv);
            // ...
            Result result = invoker.invoke(inv);
        }
};

2.2 Filter chain construction process

C- FilterChainBuilder
// Build the Filter chain and invoke the operation
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult;
    try {
        // Filter chain processing
        asyncResult = filter.invoke(nextNode, invocation);
    } catch (Exception e) {
        // ... omitted: on exception, the Listener's onError is notified before rethrowing
        throw e;
    } finally {
    }
    return asyncResult.whenCompleteWithContext((r, t) -> {
        // Notify the listener after the processing is complete
        if (filter instanceof ListenableFilter) {
            ListenableFilter listenableFilter = ((ListenableFilter) filter);
            Filter.Listener listener = listenableFilter.listener(invocation);
            try {
                if (listener != null) {
                    if (t == null) {
                        listener.onResponse(r, originalInvoker, invocation);
                    } else {
                        listener.onError(t, originalInvoker, invocation);
                    }
                }
            } finally {
                listenableFilter.removeListener(invocation);
            }
        } else if (filter instanceof FILTER.Listener) {
            // Handle filters that implement the BaseFilter.Listener interface directly
            FILTER.Listener listener = (FILTER.Listener) filter;
            if (t == null) {
                listener.onResponse(r, originalInvoker, invocation);
            } else {
                listener.onError(t, originalInvoker, invocation);
            }
        }
    });
}

2.3 Call structure of Filter chain

// Take a look at FilterChainBuilder, whose core object is one of the inner classes
class FilterChainNode<T, TYPE extends Invoker<T>, FILTER extends BaseFilter> implements Invoker<T> {
    	// Source Invoke type
        TYPE originalInvoker;
    	// Next invoke node
        Invoker<T> nextNode;
    	// The current filter object
        FILTER filter;
}

// Next, look at how the Filter chain is constructed -> DefaultFilterChainBuilder
@Override
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
    Invoker<T> last = originalInvoker;
    // Load the Filter extensions activated for this URL, key and group
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(originalInvoker.getUrl(), key, group);

    if (!filters.isEmpty()) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            // FilterChainNodes are created in a loop, forming a chain of responsibility
            last = new FilterChainNode<>(originalInvoker, next, filter);
        }
    }
    return last;
}
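
The backward loop above can be reproduced outside Dubbo. The following is a minimal sketch, not Dubbo's code: `SimpleInvoker`, `SimpleFilter` and `ChainSketch` are hypothetical stand-ins for Invoker, Filter and the chain builder, and requests are plain strings. It illustrates why wrapping from the last filter to the first produces a chain that runs in list order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Dubbo's Invoker and Filter; requests are plain strings.
interface SimpleInvoker {
    String invoke(String request);
}

interface SimpleFilter {
    String invoke(SimpleInvoker next, String request);
}

final class ChainSketch {
    // Wraps the target invoker from the last filter to the first,
    // so the first filter in the list ends up outermost and runs first.
    static SimpleInvoker buildChain(SimpleInvoker target, List<SimpleFilter> filters) {
        SimpleInvoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final SimpleFilter filter = filters.get(i);
            final SimpleInvoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    // Builds a two-filter chain and records the order in which nodes are entered.
    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        SimpleFilter first = (next, req) -> { trace.add("first"); return next.invoke(req); };
        SimpleFilter second = (next, req) -> { trace.add("second"); return next.invoke(req); };
        SimpleInvoker target = req -> { trace.add("target"); return "echo:" + req; };
        String result = buildChain(target, List.of(first, second)).invoke("hi");
        trace.add(result);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [first, second, target, echo:hi]
    }
}
```

Each node only knows its own filter and the next node, which is the same shape as FilterChainNode holding `filter` and `nextNode`.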


// The filters are loaded through SPI; the corresponding SPI file is
dubbo-rpc-api/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
    
// It contains the following contents
echo=org.apache.dubbo.rpc.filter.EchoFilter
generic=org.apache.dubbo.rpc.filter.GenericFilter
genericimpl=org.apache.dubbo.rpc.filter.GenericImplFilter
token=org.apache.dubbo.rpc.filter.TokenFilter
accesslog=org.apache.dubbo.rpc.filter.AccessLogFilter
classloader=org.apache.dubbo.rpc.filter.ClassLoaderFilter
context=org.apache.dubbo.rpc.filter.ContextFilter
exception=org.apache.dubbo.rpc.filter.ExceptionFilter
executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter
compatible=org.apache.dubbo.rpc.filter.CompatibleFilter
timeout=org.apache.dubbo.rpc.filter.TimeoutFilter
tps=org.apache.dubbo.rpc.filter.TpsLimitFilter
    
    
// PS: Dubbo SPI loads in a similar way
    

For more on SPI, see juejin.cn/post/698945…

2.4 Differences between Dubbo 3.0 and Dubbo 2.0

The biggest difference between the two versions is the Filter mechanism: in 3.0, Filter extends a BaseFilter interface, which carries a nested Listener callback interface.

// Step 1: Filter
@SPI
public interface Filter extends BaseFilter {
}

// Step 2: BaseFilter structure
public interface BaseFilter {

    // Implementations should always call invoker.invoke(), passing the request on to the next filter node
    Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;

    interface Listener {
        // Called after the invocation completes: onResponse on a normal return, onError on failure
        void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);

        void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
    }
}

// Note the interface nested inside the interface.
// For implementers it is used the same way as a nested class:
public class ExceptionFilter implements Filter, Filter.Listener
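
To make the nested Listener idea concrete, here is a minimal sketch under stated assumptions: `MiniFilter`, `LoggingFilter` and `ListenerSketch` are hypothetical names, and the JDK's `CompletableFuture.whenComplete` stands in for Dubbo's `whenCompleteWithContext`. It shows one class acting as both filter and listener, with the callback chosen after the asynchronous result completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified counterpart of BaseFilter with its nested Listener.
interface MiniFilter {
    CompletableFuture<String> invoke(String request);

    interface Listener {
        void onResponse(String response);

        void onError(Throwable t);
    }
}

// Like ExceptionFilter, one class implements both the filter and its listener.
final class LoggingFilter implements MiniFilter, MiniFilter.Listener {
    final List<String> events = new ArrayList<>();

    @Override
    public CompletableFuture<String> invoke(String request) {
        if (request.isEmpty()) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("empty request"));
        }
        return CompletableFuture.completedFuture("ok:" + request);
    }

    @Override
    public void onResponse(String response) {
        events.add("response=" + response);
    }

    @Override
    public void onError(Throwable t) {
        events.add("error=" + t.getMessage());
    }
}

final class ListenerSketch {
    // After completion, call onResponse or onError if the filter is also a Listener.
    static CompletableFuture<String> call(MiniFilter filter, String request) {
        return filter.invoke(request).whenComplete((r, t) -> {
            if (filter instanceof MiniFilter.Listener) {
                MiniFilter.Listener listener = (MiniFilter.Listener) filter;
                if (t == null) {
                    listener.onResponse(r);
                } else {
                    listener.onError(t);
                }
            }
        });
    }

    static List<String> demo() {
        LoggingFilter filter = new LoggingFilter();
        call(filter, "ping");
        call(filter, "");
        return filter.events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [response=ok:ping, error=empty request]
    }
}
```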

3. The Filter chain

Filter architecture

The list of the main filters is as follows:

  • EchoFilter
  • ClassLoaderFilter
  • GenericFilter
  • ContextFilter
  • ExceptionFilter
  • MonitorFilter
  • TimeoutFilter
  • TraceFilter

3.1 EchoFilter

EchoFilter implements the echo test, which is used to check whether a service is available.

public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    // String $ECHO = "$echo";
    if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
        // An asynchronous Result return is built directly, and the echo service only checks if the service is available
        return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
    }
    return invoker.invoke(inv);
}
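
The short-circuit can be sketched without any Dubbo types. This is an illustration only: `EchoSketch` and its `Next` interface are hypothetical, with `Next` standing in for the rest of the filter chain.

```java
final class EchoSketch {
    static final String ECHO = "$echo";

    // Hypothetical stand-in for the rest of the chain (the real service call).
    interface Next {
        Object invoke(String methodName, Object[] args);
    }

    // Returns the single argument unchanged for "$echo"; otherwise delegates to the chain.
    static Object invoke(Next next, String methodName, Object[] args) {
        if (ECHO.equals(methodName) && args != null && args.length == 1) {
            return args[0];
        }
        return next.invoke(methodName, args);
    }

    public static void main(String[] args) {
        Next service = (method, arguments) -> "real:" + method;
        System.out.println(invoke(service, "$echo", new Object[] {"OK"}));   // OK
        System.out.println(invoke(service, "sayHello", new Object[] {"x"})); // real:sayHello
    }
}
```

Because the echo call never reaches the service implementation, a successful reply only shows that the provider is reachable and the chain up to this filter works.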

3.2 ClassLoaderFilter

This filter switches the thread's context ClassLoader for the duration of the call.

// Set the current thread's context ClassLoader to the ClassLoader of the service interface
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    ClassLoader ocl = Thread.currentThread().getContextClassLoader();
    // Switch the ClassLoader
    Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
    try {
        return invoker.invoke(invocation);
    } finally {
        // Restore the original ClassLoader
        Thread.currentThread().setContextClassLoader(ocl);
    }
}

// Why the switch is needed: a class loader cannot handle classes that were loaded from external JARs by a different loader.
// With multiple class loaders, the lookup through parent delegation can fail to find a class,
// so the context ClassLoader is switched to the one that loaded the service interface.
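
The swap-and-restore pattern is plain JDK code and can be tried in isolation. A minimal sketch follows; `ContextClassLoaderSketch` and `callWith` are hypothetical names, and an empty `URLClassLoader` stands in for the loader of the service interface.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

final class ContextClassLoaderSketch {
    // Runs the task with the given context ClassLoader, then restores the previous one.
    static <T> T callWith(ClassLoader loader, Supplier<T> task) {
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return task.get();
        } finally {
            // Restore even if the task throws.
            thread.setContextClassLoader(original);
        }
    }

    public static void main(String[] args) {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        // An empty URLClassLoader stands in for the loader of the service interface.
        ClassLoader serviceLoader = new URLClassLoader(new URL[0], before);
        ClassLoader seen = callWith(serviceLoader, () -> Thread.currentThread().getContextClassLoader());
        System.out.println(seen == serviceLoader);                                    // true
        System.out.println(Thread.currentThread().getContextClassLoader() == before); // true
    }
}
```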

3.3 GenericFilter

GenericFilter also implements two interfaces, Filter and Filter.Listener, which means it also takes part in response processing.

TODO: GenericFilter's main purpose is not yet clear to me, and the class is long, so a separate article will cover it.

GenericFilter implements Filter, Filter.Listener

@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    // String $INVOKE = "$invoke";
    // String $INVOKE_ASYNC = "$invokeAsync";
    if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
            && inv.getArguments() != null
            && inv.getArguments().length == 3
            && !GenericService.class.isAssignableFrom(invoker.getInterface())) {
        // ...
    }
    return invoker.invoke(inv);
}


@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation inv) {
    if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
            && inv.getArguments() != null
            && inv.getArguments().length == 3
            && !GenericService.class.isAssignableFrom(invoker.getInterface())) {
        // ...
    }
}

@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
}

3.4 ContextFilter

ContextFilter is used to process the context, which stores the environment information required during the current call

static {
    UNLOADING_KEYS = new HashSet<>(128);
    UNLOADING_KEYS.add(PATH_KEY);
    UNLOADING_KEYS.add(INTERFACE_KEY);
    UNLOADING_KEYS.add(GROUP_KEY);
    UNLOADING_KEYS.add(VERSION_KEY);
    UNLOADING_KEYS.add(DUBBO_VERSION_KEY);
    UNLOADING_KEYS.add(TOKEN_KEY);
    UNLOADING_KEYS.add(TIMEOUT_KEY);
    UNLOADING_KEYS.add(TIMEOUT_ATTACHMENT_KEY);

    // Remove the async property to avoid passing to the following call chain
    UNLOADING_KEYS.add(ASYNC_KEY);
    UNLOADING_KEYS.add(TAG_KEY);
    UNLOADING_KEYS.add(FORCE_USE_TAG);
}

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    Map<String, Object> attachments = invocation.getObjectAttachments();
    if (attachments != null) {
        Map<String, Object> newAttach = new HashMap<>(attachments.size());
        // Attachments are mainly the metadata of the request, for example:
        // {"input":"265","dubbo":"2.0.2","path":"org.apache.dubbo.metadata.MetadataService","version":"1.0.0","group":"dubbo-demo-annotation-provider"}
        for (Map.Entry<String, Object> entry : attachments.entrySet()) {
            String key = entry.getKey();
            // UNLOADING_KEYS: ["path","_TO","group","dubbo.tag","version","dubbo.force.tag","dubbo","interface","timeout","token","async"]
            // Keep only the attachments whose key is not in UNLOADING_KEYS
            if (!UNLOADING_KEYS.contains(key)) {
                newAttach.put(key, entry.getValue());
            }
        }
        attachments = newAttach;
    }

    // Set the invoker and the invocation on the ServiceContext
    RpcContext.getServiceContext().setInvoker(invoker)
            .setInvocation(invocation);

    RpcContext context = RpcContext.getServerAttachment();

    context.setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());

    String remoteApplication = (String) invocation.getAttachment(REMOTE_APPLICATION_KEY);
    if (StringUtils.isNotEmpty(remoteApplication)) {
        RpcContext.getServiceContext().setRemoteApplicationName(remoteApplication);
    } else {
        RpcContext.getServiceContext().setRemoteApplicationName((String) context.getAttachment(REMOTE_APPLICATION_KEY));
    }
	
    // Call timeout; -1 if it is not set
    long timeout = RpcUtils.getTimeout(invocation, -1);
    if (timeout != -1) {
        // pass to next hop
        RpcContext.getClientAttachment().setObjectAttachment(TIME_COUNTDOWN_KEY, TimeoutCountDown.newCountDown(timeout, TimeUnit.MILLISECONDS));
    }

    // Some attachments may have been added to the RpcContext before the filter
    if (attachments != null) {
        if (context.getObjectAttachments() != null) {
            // The additional properties described above are set here
            context.getObjectAttachments().putAll(attachments);
        } else {
            context.setObjectAttachments(attachments);
        }
    }

    if (invocation instanceof RpcInvocation) {
        ((RpcInvocation) invocation).setInvoker(invoker);
    }

    try {
        context.clearAfterEachInvoke(false);
        return invoker.invoke(invocation);
    } finally {
        context.clearAfterEachInvoke(true);
        RpcContext.removeServerAttachment();
        RpcContext.removeServiceContext();
        // For asynchronous scenarios, we must remove the context from the current thread, so we always create a new RpcContext for the next call from the same thread
        RpcContext.getClientAttachment().removeAttachment(TIME_COUNTDOWN_KEY);
        RpcContext.removeServerContext();
    }
}

@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
    // Pass the attachment to the result
    appResponse.addObjectAttachments(RpcContext.getServerContext().getObjectAttachments());
}

@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
}
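
The attachment-filtering step at the top of invoke can be illustrated on its own. This is a sketch under assumptions: `AttachmentSketch` is a hypothetical class, its key set is only a subset of the keys listed in the comment above, and `traceId` is an invented user attachment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class AttachmentSketch {
    // A few reserved keys taken from the list shown above; the real set is larger.
    static final Set<String> UNLOADING_KEYS = Set.of("path", "group", "version", "dubbo", "timeout");

    // Copies every attachment whose key is not reserved.
    static Map<String, Object> strip(Map<String, Object> attachments) {
        Map<String, Object> kept = new HashMap<>(attachments.size());
        for (Map.Entry<String, Object> entry : attachments.entrySet()) {
            if (!UNLOADING_KEYS.contains(entry.getKey())) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Object> in = new HashMap<>();
        in.put("path", "org.apache.dubbo.metadata.MetadataService");
        in.put("version", "1.0.0");
        in.put("traceId", "abc-123"); // hypothetical user attachment
        System.out.println(strip(in)); // {traceId=abc-123}
    }
}
```

The framework-level keys describe the request itself, so only the remaining entries are worth carrying into the server-side context.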

3.5 ExceptionFilter

ExceptionFilter mainly handles exceptions carried in the response.

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
	return invoker.invoke(invocation);
}


@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
    if (appResponse.hasException() && GenericService.class != invoker.getInterface()) {
        try {
            Throwable exception = appResponse.getException();

            // If it is a checked exception, pass it through unchanged
            if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
                return;
            }
            // If the exception is declared in the method signature, pass it through unchanged
            try {
                Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                Class<?>[] exceptionClasses = method.getExceptionTypes();
                for (Class<?> exceptionClass : exceptionClasses) {
                    if (exception.getClass().equals(exceptionClass)) {
                        return;
                    }
                }
            } catch (NoSuchMethodException e) {
                return;
            }

            // ... omitted: logs an ERROR message on the server for exceptions not declared in the method signature
          

            // If the exception class and interface class are in the same JAR file, they are thrown directly
            String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
            String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
            if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
                return;
            }
            // If it is a JDK exception, throw it directly
            String className = exception.getClass().getName();
            if (className.startsWith("java.") || className.startsWith("javax.")) {
                return;
            }
            // If it is a Dubbo exception, throw it directly
            if (exception instanceof RpcException) {
                return;
            }

            // Otherwise, wrap it with RuntimeException and return it to the client
            appResponse.setException(new RuntimeException(StringUtils.toString(exception)));
        } catch (Throwable e) {
          
        }
    }
}
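
The pass-through rules can be condensed into a small decision function. The sketch below is a simplification, not Dubbo's implementation: `ExceptionSketch`, `DemoService` and both exception classes are hypothetical, the same-JAR check and the RpcException check are left out, and `exception.toString()` replaces the stack-trace string Dubbo builds.

```java
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

final class ExceptionSketch {
    // Hypothetical service interface and application exceptions, for illustration only.
    interface DemoService {
        void declared() throws DeclaredException;

        void undeclared();
    }

    static class DeclaredException extends RuntimeException { }

    static class CustomException extends RuntimeException { }

    // Returns the exception to send back: the original when the client can be
    // expected to know its class, otherwise a RuntimeException carrying its text.
    static Throwable translate(Throwable exception, Method method) {
        // Checked exceptions are passed through.
        if (!(exception instanceof RuntimeException) && exception instanceof Exception) {
            return exception;
        }
        // Exceptions declared in the method signature are passed through.
        for (Class<?> declared : method.getExceptionTypes()) {
            if (exception.getClass().equals(declared)) {
                return exception;
            }
        }
        // JDK exceptions are passed through.
        String name = exception.getClass().getName();
        if (name.startsWith("java.") || name.startsWith("javax.")) {
            return exception;
        }
        // Anything else is wrapped so the client does not need the class on its classpath.
        return new RuntimeException(exception.toString());
    }

    // Applies the rules to four sample exceptions and returns the resulting class names.
    static List<String> demo() {
        try {
            Method declared = DemoService.class.getMethod("declared");
            Method undeclared = DemoService.class.getMethod("undeclared");
            List<String> names = new ArrayList<>();
            names.add(translate(new IOException("io"), undeclared).getClass().getSimpleName());
            names.add(translate(new DeclaredException(), declared).getClass().getSimpleName());
            names.add(translate(new IllegalStateException(), undeclared).getClass().getSimpleName());
            names.add(translate(new CustomException(), undeclared).getClass().getSimpleName());
            return names;
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // [IOException, DeclaredException, IllegalStateException, RuntimeException]
        System.out.println(demo());
    }
}
```

Only the last case is rewritten: an undeclared application exception whose class the consumer may not have.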

3.6 MonitorFilter

MonitorFilter intercepts each invocation, collects statistics about the call, and sends them to the monitoring center.

// The invoke process
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    if (invoker.getUrl().hasAttribute(MONITOR_KEY)) {
        // private static final String MONITOR_FILTER_START_TIME = "monitor_filter_start_time";
        // private static final String MONITOR_REMOTE_HOST_STORE = "monitor_remote_host_store";
        invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis());
        invocation.put(MONITOR_REMOTE_HOST_STORE, RpcContext.getServiceContext().getRemoteHost());
        getConcurrent(invoker, invocation).incrementAndGet(); // count up
    }
    return invoker.invoke(invocation); // proceed invocation chain
}

// The main Monitor processes are the following
@Override
public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
    if (invoker.getUrl().hasAttribute(MONITOR_KEY)) {
        collect(invoker, invocation, result,
            (String) invocation.get(MONITOR_REMOTE_HOST_STORE),
            (long) invocation.get(MONITOR_FILTER_START_TIME), false);

        getConcurrent(invoker, invocation).decrementAndGet(); // count down
    }
}

@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
    if (invoker.getUrl().hasAttribute(MONITOR_KEY)) {
        collect(invoker, invocation, null, (String) invocation.get(MONITOR_REMOTE_HOST_STORE), (long) invocation.get(MONITOR_FILTER_START_TIME), true);
        getConcurrent(invoker, invocation).decrementAndGet(); // count down
    }
}

private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
    try {
        Object monitorUrl;
        // Get the monitor URL stored under MONITOR_KEY
        monitorUrl = invoker.getUrl().getAttribute(MONITOR_KEY);
        if (monitorUrl instanceof URL) {
            Monitor monitor = monitorFactory.getMonitor((URL) monitorUrl);
            if (monitor == null) {
                return;
            }
            // Create the URL that carries the statistics
            URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
            // Collect monitoring data
            monitor.collect(statisticsURL.toSerializableURL());
        }
    } catch (Throwable t) {
        logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
    }
}

// PS: The collector here is a MonitorService object. We will not go into the details, only list its methods
C- MonitorService
    - void collect(URL statistics) : collect monitoring data
    - List<URL> lookup(URL query)
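
The count-up, collect, count-down sequence can be sketched with JDK concurrency classes. Everything here is an assumption for illustration: `MonitorSketch`, the per-key counter map and the string samples are hypothetical, and a list stands in for the monitoring center.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class MonitorSketch {
    // One in-flight counter per "interface.method" key.
    private static final ConcurrentMap<String, AtomicInteger> CONCURRENTS = new ConcurrentHashMap<>();

    // Hypothetical stand-in for the monitoring center: a list of text samples.
    static final List<String> COLLECTED = new CopyOnWriteArrayList<>();

    static AtomicInteger getConcurrent(String key) {
        return CONCURRENTS.computeIfAbsent(key, k -> new AtomicInteger());
    }

    // Wraps a call: record the start time, count up, run, collect, count down.
    static <T> T monitored(String key, Supplier<T> call) {
        long start = System.currentTimeMillis();
        getConcurrent(key).incrementAndGet(); // count up
        boolean error = true;
        try {
            T result = call.get();
            error = false;
            return result;
        } finally {
            // Collect first, then count down, so the sample includes the current call.
            collect(key, System.currentTimeMillis() - start, getConcurrent(key).get(), error);
            getConcurrent(key).decrementAndGet(); // count down
        }
    }

    static void collect(String key, long elapsedMillis, int concurrent, boolean error) {
        COLLECTED.add(key + " error=" + error + " concurrent=" + concurrent + " elapsedMs=" + elapsedMillis);
    }

    // Runs one monitored call and returns the sample it produced.
    static String demo() {
        monitored("DemoService.sayHello", () -> "hi");
        return COLLECTED.get(COLLECTED.size() - 1);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // starts with "DemoService.sayHello error=false concurrent=1"
        System.out.println(getConcurrent("DemoService.sayHello").get()); // 0
    }
}
```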

3.7 TimeoutFilter

TimeoutFilter handles calls that have exceeded their timeout.

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}

@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
    // String TIME_COUNTDOWN_KEY = "timeout-countdown";
    Object obj = RpcContext.getClientAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
    if (obj != null) {
        TimeoutCountDown countDown = (TimeoutCountDown) obj;
        if (countDown.isExpired()) {
            // Clear the response in case of timeout
            ((AppResponse) appResponse).clear();
            // Just print a log
            if (logger.isWarnEnabled()) {
                logger.warn("invoke timed out. method: " + invocation.getMethodName() + " arguments: " +
                        Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() +
                        ", invoke elapsed " + countDown.elapsedMillis() + " ms.");
            }
        }
    }
}

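The countdown check can be illustrated with a small deadline object. This is a hedged sketch, not Dubbo's TimeoutCountDown: `CountDownSketch` is a hypothetical class, and its clock is injectable only so the example behaves deterministically.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical, simplified deadline object with an injectable nanosecond clock.
final class CountDownSketch {
    private final long startNanos;
    private final long timeoutNanos;
    private final LongSupplier clock;

    CountDownSketch(long timeout, TimeUnit unit, LongSupplier nanoClock) {
        this.clock = nanoClock;
        this.startNanos = nanoClock.getAsLong();
        this.timeoutNanos = unit.toNanos(timeout);
    }

    boolean isExpired() {
        return clock.getAsLong() - startNanos > timeoutNanos;
    }

    long elapsedMillis() {
        return TimeUnit.NANOSECONDS.toMillis(clock.getAsLong() - startNanos);
    }

    // Same idea as the onResponse callback: drop the value once the deadline has passed.
    static String filterResponse(CountDownSketch countDown, String response) {
        return countDown.isExpired() ? null : response;
    }

    public static void main(String[] args) {
        long[] now = {0L}; // fake clock, in nanoseconds
        CountDownSketch countDown = new CountDownSketch(100, TimeUnit.MILLISECONDS, () -> now[0]);
        now[0] = TimeUnit.MILLISECONDS.toNanos(50);
        System.out.println(filterResponse(countDown, "ok")); // ok
        now[0] = TimeUnit.MILLISECONDS.toNanos(150);
        System.out.println(filterResponse(countDown, "ok")); // null
        System.out.println(countDown.elapsedMillis());       // 150
    }
}
```

In real code the clock would be `System::nanoTime`; clearing the response on the server saves sending a result the consumer has already given up on.
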
3.8 TraceFilter

This filter traces requests. It maintains a map from trace keys to sets of Channels; after an invoke call completes, it sends a log line to the Channels registered for that key.

In short, it lets a client watch an interface, or a single method on it, for the next n invocations.

The main collection involved is:
 private static final ConcurrentMap<String, Set<Channel>> TRACERS = new ConcurrentHashMap<>();


@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    
    // The common way to get before and after the call time
    long start = System.currentTimeMillis();
    Result result = invoker.invoke(invocation);
    long end = System.currentTimeMillis();
    
    // If trace exists
    if (TRACERS.size() > 0) {
        
        // Build the trace key as <interface name>.<method name>, falling back to <interface name>
        String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
        Set<Channel> channels = TRACERS.get(key);
        if (channels == null || channels.isEmpty()) {
            key = invoker.getInterface().getName();
            channels = TRACERS.get(key);
        }
        
        // Get the corresponding set of channels from the set
        if (CollectionUtils.isNotEmpty(channels)) {
            for (Channel channel : new ArrayList<>(channels)) {
                // A channel that is disconnected or throws an exception is removed from the set
                if (channel.isConnected()) {
                    try {
                        int max = 1;
                        Integer m = (Integer) channel.getAttribute(TRACE_MAX);
                        if (m != null) {
                            max = m;
                        }
                        int count = 0;
                        AtomicInteger c = (AtomicInteger) channel.getAttribute(TRACE_COUNT);
                        if (c == null) {
                            c = new AtomicInteger();
                            channel.setAttribute(TRACE_COUNT, c);
                        }
                        count = c.getAndIncrement();
                        // There are two important parameters involved here, which represent the maximum number of listens and the current number of listens respectively
                        // private static final String TRACE_MAX = "trace.max";
                        // private static final String TRACE_COUNT = "trace.count";
                        if (count < max) {
                            String prompt = channel.getUrl().getParameter(Constants.PROMPT_KEY, Constants.DEFAULT_PROMPT);
                            channel.send("\r\n" + RpcContext.getServiceContext().getRemoteAddress() + " -> "
                                    + invoker.getInterface().getName()
                                    + "." + invocation.getMethodName()
                                    + "(" + JSON.toJSONString(invocation.getArguments()) + ")" + " -> " + JSON.toJSONString(result.getValue())
                                    + "\r\nelapsed: " + (end - start) + " ms."
                                    + "\r\n\r\n" + prompt);
                        }
                        if (count >= max - 1) {
                            channels.remove(channel);
                        }
                    } catch (Throwable e) {
                        channels.remove(channel);
                    }
                } else {
                    channels.remove(channel);
                }
            }
        }
    }
    return result;
}

// PS: A note on how traces are added. TraceFilter provides two methods to manage Trace channels
public static void addTracer(Class<?> type, String method, Channel channel, int max)
    - String key = method != null && method.length() > 0 ? type.getName() + "." + method : type.getName();
    ?- Note the format of the tracer key

public static void removeTracer(Class<?> type, String method, Channel channel)

// Trace's main management and processing class is TraceTelnetHandler
public class TraceTelnetHandler implements TelnetHandler {
}
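
The "watch the next n calls" bookkeeping can be sketched without channels. The following is an illustration under assumptions: `TraceSketch` and its `Listener` class are hypothetical, with `Listener` standing in for a telnet Channel and its trace.max and trace.count attributes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class TraceSketch {
    // Hypothetical stand-in for a telnet Channel: stores messages and its own counters.
    static final class Listener {
        final int max;
        final AtomicInteger count = new AtomicInteger();
        final List<String> received = new ArrayList<>();

        Listener(int max) {
            this.max = max;
        }
    }

    static final Map<String, Set<Listener>> TRACERS = new ConcurrentHashMap<>();

    // Key is "type.method" when a method is given, otherwise just "type".
    static void addTracer(String type, String method, Listener listener) {
        String key = method != null && !method.isEmpty() ? type + "." + method : type;
        TRACERS.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet()).add(listener);
    }

    // Called after each invocation: method-level listeners first, then interface-level ones.
    static void afterInvoke(String type, String method, long elapsedMillis) {
        Set<Listener> listeners = TRACERS.get(type + "." + method);
        if (listeners == null || listeners.isEmpty()) {
            listeners = TRACERS.get(type);
        }
        if (listeners == null) {
            return;
        }
        for (Listener listener : new ArrayList<>(listeners)) {
            int count = listener.count.getAndIncrement();
            if (count < listener.max) {
                listener.received.add(type + "." + method + " elapsed: " + elapsedMillis + " ms");
            }
            if (count >= listener.max - 1) {
                listeners.remove(listener); // quota used up, stop tracing for this listener
            }
        }
    }

    // Registers a listener for two calls, invokes five times, returns how many it saw.
    static int demo() {
        Listener listener = new Listener(2);
        addTracer("DemoService", "sayHello", listener);
        for (int i = 0; i < 5; i++) {
            afterInvoke("DemoService", "sayHello", 3);
        }
        return listener.received.size();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 2
    }
}
```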

Conclusion

  • The Filter chain is loaded via SPI
  • Filter chains are built with FilterChainBuilder
  • Filters obtain the attachment properties through invocation.getObjectAttachments()

This article does not go deep, first because the author is still learning Dubbo at a surface level, and second because it is meant as an overview; later articles will dig deeper into Dubbo 3.0.