1. Overview of filters

The overall design of the filter conforms to the open and close principle in software design, that is, to achieve new functional requirements by extending rather than modifying the original design. It is accomplished by the design pattern of responsibility chain. The responsibility chain mode is shown in the figure below. When a request comes, a series of processing is required for this request. Using the responsibility chain mode, each processing can be componentized and coupling can be reduced. It can also be used when a request comes in and you need to find the right way to process it. When a processing method does not work for the request, it is passed to the next processing method, which then tries to process the request.

So it can be seen that based on the design of the chain of responsibility with low coupling, simplify the links between objects, the advantages of more flexible extension, but also can bring a certain performance loss, but also because of using the improper design may cause death cycle/logic errors, etc., in general, for a given priority to with the request/response model of RPC framework is not do more harm than good, More scalable. The Dubbo filter framework intercepts the entire call process, providing a way to insert custom logic before and after the call.

2. Overall filter structure

The Filter interface has the @SPI annotation, which is an extension point interface, as well as an internal definition of the Listener interface. The Filter is concerned with the invocation, and the Listener is concerned with the callback of the message after the response.

@SPI public interface Filter { Result invoke(Invoker<? > invoker, Invocation invocation) throws RpcException; interface Listener { void onResponse(Result appResponse, Invoker<? > invoker, Invocation invocation); void onError(Throwable t, Invoker<? > invoker, Invocation invocation); }}Copy the code

As you can see from the following figure, the Dubbo framework comes with a number of Filter implementation classes that implement a variety of functions. Many of the Filter implementation classes use the @activate annotation, which is activated by default in ExtensionLoader processing. Of course, the Filter implementation class also adds conditions such as service provider/consumer, conditional activation with specified parameters. The appropriate filter execution chain can be better constructed.

3. Filter implementation principle

Serviceconfigure #doExportUrlFor1Protocol Referenceconfigure services are exposed and referenced in the Protocol layer, where the ProtocolFilterWrapper class implements the assembly of the filter chain, Both service exposures and references are implemented using the ProtocolFilterWrapper#buildInvokeChain method.

public class ProtocolFilterWrapper implements Protocol { @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (UrlUtils.isRegistry(invoker.getUrl())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER)); } @Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (UrlUtils.isRegistry(url)) { return protocol.refer(type, url); } return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER); }}Copy the code

The core is that the current node holds the next node. When the user implements a custom filter class, the next node will be passed in as a construction parameter. Note that the invoker #invoker method of the current node will call the invoker #invoker method of the passed parameter object. Otherwise, the next execution node will be ignored and the remaining execution links will fail. Invoker is constructed by constructing anonymous classes from inside to outside, which requires reverse order traversal to make the outermost Invoker likely to be the last one to be executed. For example, the filter list is <A,B,C> and Invoker, and the filter link construction order is C->Invoker,B ->C->Invoker, A->B->C->Invoker, the final calls are in positive order, so it becomes A->B->C->Invoker.

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {// Save the real reference Invoker<T> last = Invoker; / / according to the group on the url parameter to obtain a List of default and all Filter object List < Filter > filters. = ExtensionLoader getExtensionLoader (Filter. The class) .getActivateExtension(invoker.getUrl(), key, group); if (! Filters. IsEmpty ()) {// Filter in reverse order for (int I = filters. Size () -1; i >= 0; i--) { final Filter filter = filters.get(i); Final Invoker<T> next = last; final Invoker<T> next = last; last = new Invoker<T>() { @Override public Result invoke(Invocation invocation) throws RpcException { Result asyncResult; AsyncResult = filter.invoke(next, Invocation); // Invocation asyncResult = Invoke (next, Invocation); } catch (Exception e) {// This will trigger the listener onResponse and onError methods, // the result of the response and the result of the Exception. } finally { ... } / / callback asynchronous results processing return asyncResult. WhenCompleteWithContext ((r, t) - > {/ / here will trigger the listener onResponse and onError method, // Handle the response results and abnormal results... }); }... }; } } return last; }Copy the code

4. Main filter implementation principle

4.1 Implementation Principle of AccessLogFilter

The AccessLogFilter functions on the service provider to log each request. A typical producer-consumer model is used here, where the log information is put into the buffer on each invocation and then the task thread is started to periodically fetch the log information from the buffer and write it to a file.

@Activate(group = PROVIDER, Value = ACCESS_LOG_KEY) public class AccessLogFilter implements Filter {public AccessLogFilter() { Every 5 seconds to write buffer of log information into the file LOG_SCHEDULED. ScheduleWithFixedDelay (this: : writeLogToFile, LOG_OUTPUT_INTERVAL, LOG_OUTPUT_INTERVAL, TimeUnit.MILLISECONDS); } @Override public Result invoke(Invoker<? > Invocation inv) throws RpcException {try {// Obtain the accessLog parameter on the URL, String accessLogKey = invoker.geturl ().getParameter(ACCESS_LOG_KEY); if (ConfigUtils.isNotEmpty(accessLogKey)) { AccessLogData logData = buildAccessLogData(invoker, inv); log(accessLogKey, logData); } } catch (Throwable t) { ... } return invoker.invoke(inv); } private void log(String accessLog, AccessLogData accessLogData) { Set<AccessLogData> logSet = LOG_ENTRIES.computeIfAbsent(accessLog, k -> new ConcurrentHashSet<>()); If (logset.size () < LOG_MAX_BUFFER) {logset.add (accessLogData); } else {// Write to writeLogSetToFile(accessLog, logSet) directly after exceeding buffer limit size; logSet.add(accessLogData); }}}Copy the code

4.2 Implementation Principles of TimeoutFilter

The Timeout filter works on the service provider and records the Invoker invocation time. If the Timeout value is exceeded, an alarm log is generated. In this case, the RpcContext class is mainly used to perform related functions. This class object information records the related time statistics of a call.

@Activate(group = CommonConstants.PROVIDER) public class TimeoutFilter implements Filter, Filter.Listener { .... @Override public void onResponse(Result appResponse, Invoker<? > invoker, Invocation invocation) { Object obj = RpcContext.getContext().get(TIME_COUNTDOWN_KEY); if (obj ! = null) { TimeoutCountDown countDown = (TimeoutCountDown) obj; if (countDown.isExpired()) { ((AppResponse) appResponse).clear(); . }}}}Copy the code

4.3 Implementation principle of TpsLimitFilter

TpsLimitFilter is mainly used to limit traffic of service providers, and it depends on the implementation of TpsLimiter. TpsLimiter’s flow limiting is implemented based on the idea of token bucket, that is, only N tokens are allocated within a period of time, and each request will consume one token. Until the bits are exhausted, subsequent requests will be rejected. The latitude of a traffic limiting object supports group, version, and interface level. The interface+ Group + Version is used as a unique identifier to determine whether the maximum value is exceeded. The specific logic is determined through DefaultTPSLimiter# isbehave method, then call StatItem# isbehave method, obtain the unique identification of services according to the rules, StatItem for each service interface. And use ConcurrentMap to store it.

public class DefaultTPSLimiter implements TPSLimiter { private final ConcurrentMap<String, StatItem> stats = new ConcurrentHashMap<String, StatItem>(); @Override public boolean isAllowable(URL url, Invocation invocation) { ... If (rate > 0) {// Obtain the StatItem object by using the unique identifier of the service StatItem = stats.get(serviceKey); If (statItem == null) {stats. PutIfAbsent (serviceKey, new statItem (serviceKey, rate, interval)); statItem = stats.get(serviceKey); } else {// Update the corresponding StateItem object in the container if (statitem.getrate ()! = rate || statItem.getInterval() ! = interval) { stats.put(serviceKey, new StatItem(serviceKey, rate, interval)); statItem = stats.get(serviceKey); Return statitem.isallowable (); } else {// Remove the StateItem object StatItem StatItem = stats.get(serviceKey) from the container if there is no rate argument; if (statItem ! = null) { stats.remove(serviceKey); } } return true; } } @Activate(group = CommonConstants.PROVIDER, value = TPS_LIMIT_RATE_KEY) public class TpsLimitFilter implements Filter { private final TPSLimiter tpsLimiter = new DefaultTPSLimiter(); @Override public Result invoke(Invoker<? > invoker, Invocation invocation) throws RpcException { if (! tpsLimiter.isAllowable(invoker.getUrl(), invocation)) { ... } return invoker.invoke(invocation); }}Copy the code

The token bucket algorithm is realized in StatItem#isAllowable method, if the current request timestamp is greater than the sum of the last update time variable and time interval, i.e., after the time window is exceeded, regenerate the token number and update the last update time. If the total number of tokens is less than 0, the token is consumed by other requests in the time window, then the request fails to be returned. If not, the token number decreases and the request succeeds.

class StatItem { public boolean isAllowable() { long now = System.currentTimeMillis(); If (now > lastResetTime + interval) {// Regenerate token token = buildLongAdder(rate); LastResetTime = now; If (token.sum() < 0) {return false; } // Token number minus 1 token.decrement(); return true; }}Copy the code

The token bucket algorithm is relatively easy to understand, as shown in the figure below. If THE TPS limit is 4, it can be divided into three different situations. There is only one request in the time window during initialization, then the request is successful. If the next request timestamp exceeds the time window, the last time is updated and the token timestamp is regenerated, but the number of requests within this time period is only 3 and the number of tokens is not exhausted, then all three requests are successful. In other cases, multiple requests arrive in the time window and immediately consume four tokens, and subsequent requests do not get tokens, and the request fails.

4.4 Implementation principle of ExcuteLimitFilter

The ExecuteLimitFilter works primarily on the service provider side, limiting concurrent processing of server threads through the RpcStatus class. The RpcStatus#beginCount method is first used to determine whether the maximum number of concurrent processing on the provider side is exceeded. Then the invoke call is made, and after receiving the request recovery, endCount is called to count the reply results.

@Activate(group = CommonConstants.PROVIDER, value = EXECUTES_KEY) public class ExecuteLimitFilter implements Filter, Filter.Listener { @Override public Result invoke(Invoker<? > invoker, Invocation invocation) { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0); // call beginCount for active method-level calls to determine if (! RpcStatus.beginCount(url, methodName, max)) { ... }... try { return invoker.invoke(invocation); } catch (Throwable t) { ... } } @Override public void onResponse(Result appResponse, Invoker<? > Invoker, Invocation) {// The rpcstatus.endCount (Invoker.geturl (), Invocation. getElapsed(invocation), true); }}Copy the code

In the RpcStatus class, there are many statistical variables defined, such as the number of methods being called/the number of successful historical calls/the number of historical calls failed/the number of historical calls abnormal statistical variables and so on. The thread-safe container is used. In the beginCount method, the spin method is used to increment the number of method calls. If the number of concurrent processes exceeds the maximum value, the return fails. If the number of concurrent processes exceeds the maximum value, the return succeeds after the increment. Finally, the status results returned after the method call are counted in endCound.

public class RpcStatus { public static boolean beginCount(URL url, String methodName, int max) { ... // for (int I; ;) { i = methodStatus.active.get(); If (I + 1 > Max) {return false; } / / cas since increased, after the success of the operation is out of the if (methodStatus.active.com pareAndSet (I, I + 1)) {break; }} / / appStatus. Active. IncrementAndGet (); return true; }} public static void elapsed (int elapsed) public static void endCount(int elapsed) boolean succeeded) { endCount(getStatus(url), elapsed, succeeded); endCount(getStatus(url, methodName), elapsed, succeeded); } private static void endCount(RpcStatus status, long elapsed, Boolean succeeded) {/ / is called statistical number since the reduction status. The active. DecrementAndGet (); / / total number of calls on the history status. The total. IncrementAndGet (); status.totalElapsed.addAndGet(elapsed); // If successful, Then update history success number if (succeeded) {if (status. SucceededMaxElapsed. Get () < elapsed) {status. SucceededMaxElapsed. Set (elapsed); }} else {/ / if it fails, then the update number of historical failure status. Failed. IncrementAndGet (); status.failedElapsed.addAndGet(elapsed); if (status.failedMaxElapsed.get() < elapsed) { status.failedMaxElapsed.set(elapsed); }}}Copy the code

5. To summarize

This chapter introduces the main structure and implementation principle of filters in Dubbo framework. It also explains the functions and attributes of some main filters. In the ProtocolFilterWrapper, the framework provides multiple layers of filters for each Invoker packet, forming a filter chain.

reference

www.jianshu.com/p/1828a3bf1…

Juejin. Cn/post / 684490…

Blog.csdn.net/hengyunabc/… Arthas looks at the Dubbo problem

Xilidou.com/2018/11/27/… LogAddr principle

Cloud.tencent.com/developer/a… Dubbo’s TPSLimiter implementation principle