>>>> 😜😜😜 Github: 👉 github.com/black-ant CASE Backup: 👉 gitee.com/antblack/ca…
A. The preface
Dubbo 3.0 Server reception, this article will take a look at the Dubbo filter chain. Filtration chain is also a very important part of the whole process.
2. Starting point for processing
// Invoke logicC-channeleventrunnable # run: the starting point of the call where a message is received C-exchangeHandler # reply: C- FilterChainBuilder # Invoke: Start the Invoke Filter chain construction processCopy the code
2.1 Invoke Initiate process
// as you saw in the previous article, reply calls are made by building an ExchangeHandlerAdapter
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
// Keep only the core logic
Invocation inv = (Invocation) message;
// FilterChainBuilder is actually called hereInvoker<? > invoker = getInvoker(channel, inv);/ /...Result result = invoker.invoke(inv); }}Copy the code
2.2 Filter chain construction process
C- FilterChainBuilder
// Build the Filter chain and invoke the operation
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
// Filter chain processing
asyncResult = filter.invoke(nextNode, invocation);
} catch (Exception e) {
// select * from Listerner; // select * from Listerner
throw e;
} finally{}return asyncResult.whenCompleteWithContext((r, t) -> {
// Notify the listener after the processing is complete
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
Filter.Listener listener = listenableFilter.listener(invocation);
try {
if(listener ! =null) {
if (t == null) {
listener.onResponse(r, originalInvoker, invocation);
} else{ listener.onError(t, originalInvoker, invocation); }}}finally{ listenableFilter.removeListener(invocation); }}else if (filter instanceof FILTER.Listener) {
// Process the basefilter.listener interface
FILTER.Listener listener = (FILTER.Listener) filter;
if (t == null) {
listener.onResponse(r, originalInvoker, invocation);
} else{ listener.onError(t, originalInvoker, invocation); }}}); }Copy the code
2.3 Call structure of Filter chain
// Take a look at FilterChainBuilder, whose core object is one of the inner classes
class FilterChainNode<T.TYPE extends Invoker<T>, FILTER extends BaseFilter> implements Invoker<T>{
// Source Invoke type
TYPE originalInvoker;
// Next invoke node
Invoker<T> nextNode;
// The current filter object
FILTER filter;
}
/ / the following place have a look at the construction of the Filter - > DefaultFilterChainBuilder
@Override
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
Invoker<T> last = originalInvoker;
// Load external Extension
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(originalInvoker.getUrl(), key, group);
if(! filters.isEmpty()) {for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
// You can see that the FilterChainNode loop is created, and the responsibility chain mode is created
last = newFilterChainNode<>(originalInvoker, next, filter); }}return last;
}
// Here is the loading mode of SPI, corresponding SPI class is
dubbo-rpc-api/META-INF.dubbo.internal -> org.apache.dubbo.rpc.Filter
// It contains the following contents
echo=org.apache.dubbo.rpc.filter.EchoFilter
generic=org.apache.dubbo.rpc.filter.GenericFilter
genericimpl=org.apache.dubbo.rpc.filter.GenericImplFilter
token=org.apache.dubbo.rpc.filter.TokenFilter
accesslog=org.apache.dubbo.rpc.filter.AccessLogFilter
classloader=org.apache.dubbo.rpc.filter.ClassLoaderFilter
context=org.apache.dubbo.rpc.filter.ContextFilter
exception=org.apache.dubbo.rpc.filter.ExceptionFilter
executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter
compatible=org.apache.dubbo.rpc.filter.CompatibleFilter
timeout=org.apache.dubbo.rpc.filter.TimeoutFilter
tps=org.apache.dubbo.rpc.filter.TpsLimitFilter
// PS: Dubbo SPI loads in a similar way
Copy the code
SPI can be seen at @juejin.cn/post/698945…
2.4 Differences between Dubbo 3.0 and Dubbo 2.0
The biggest difference between the two is the change of Filter mechanism.
// Step 1: Filter
@SPI
public interface Filter extends BaseFilter {}// Step 2: BaseFilter structure
public interface BaseFilter {
// Always invoke invoke.invoke() in the implementation, passing the request to the next filter node
Result invoke(Invoker
invoker, Invocation invocation) throws RpcException;
interface Listener {
// When the call is complete, choose whether to call it according to the situation
void onResponse(Result appResponse, Invoker
invoker, Invocation invocation);
void onError(Throwable t, Invoker
invoker, Invocation invocation); }}// There is an interface inside the interface. // There is an interface inside the interfaceAs far as implementation is concerned, it is used in the same way as the inner class:public class ExceptionFilter implements Filter.Filter.Listener
Copy the code
3. The Filter chain
Filter architecture
The list of the main filters is as follows:
- EchoFilter
- ClassLoaderFilter
- GenericFilter
- ContextFilter
- ExceptionFilter
- MonitorFilter
- TimeoutFilter
- TraceFilter
3.1 EchoFilter
ECHO indicates the ECHO, which is used to check whether the service is available
public Result invoke(Invoker
invoker, Invocation inv) throws RpcException {
// String $ECHO = "$echo";
if(inv.getMethodName().equals($ECHO) && inv.getArguments() ! =null && inv.getArguments().length == 1) {
// An asynchronous Result return is built directly, and the echo service only checks if the service is available
return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
}
return invoker.invoke(inv);
}
Copy the code
3.2 ClassLoaderFilter
The Filter is used for secondary processing of the ClassLoader
// Set the current thread class ClassLoader to the class ClassLoader of the service interface
public Result invoke(Invoker
invoker, Invocation invocation) throws RpcException {
ClassLoader ocl = Thread.currentThread().getContextClassLoader();
// Switch the ClassLoader
Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
try {
return invoker.invoke(invocation);
} finally {
// Re-place the original ClassLoad hereThread.currentThread().setContextClassLoader(ocl); }}// class loaders can't handle classes loaded by external Jars
// When there are multiple classloaders, the parent class fails to be loaded due to the parent delegate mechanism, and the final ClassLoader is loaded, the relevant switch needs to be made
Copy the code
3.3 GenericFilter
GenericFilter also implements two interfaces: Filter, filter.listener, which means that Response processing,
TODO: GenericFilter is a class whose main purpose is not clear, and this class is too long, we will write a separate chapter to clarify
GenericFilter implements Filter, Filter.Listener
@Override
public Result invoke(Invoker
invoker, Invocation inv) throws RpcException {
// String $INVOKE = "$invoke";
// String $INVOKE_ASYNC = "$invokeAsync";
if((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC)) && inv.getArguments() ! =null
&& inv.getArguments().length == 3
&& !GenericService.class.isAssignableFrom(invoker.getInterface())) {
/ /...
}
return invoker.invoke(inv);
}
@Override
public void onResponse(Result appResponse, Invoker
invoker, Invocation inv) {
if((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC)) && inv.getArguments() ! =null
&& inv.getArguments().length == 3
&& !GenericService.class.isAssignableFrom(invoker.getInterface())) {
/ /...}}@Override
public void onError(Throwable t, Invoker
invoker, Invocation invocation) {}Copy the code
3.4 ContextFilter
ContextFilter is used to process the context, which stores the environment information required during the current call
static {
UNLOADING_KEYS = new HashSet<>(128);
UNLOADING_KEYS.add(PATH_KEY);
UNLOADING_KEYS.add(INTERFACE_KEY);
UNLOADING_KEYS.add(GROUP_KEY);
UNLOADING_KEYS.add(VERSION_KEY);
UNLOADING_KEYS.add(DUBBO_VERSION_KEY);
UNLOADING_KEYS.add(TOKEN_KEY);
UNLOADING_KEYS.add(TIMEOUT_KEY);
UNLOADING_KEYS.add(TIMEOUT_ATTACHMENT_KEY);
// Remove the async property to avoid passing to the following call chain
UNLOADING_KEYS.add(ASYNC_KEY);
UNLOADING_KEYS.add(TAG_KEY);
UNLOADING_KEYS.add(FORCE_USE_TAG);
}
@Override
public Result invoke(Invoker
invoker, Invocation invocation) throws RpcException {
Map<String, Object> attachments = invocation.getObjectAttachments();
if(attachments ! =null) {
Map<String, Object> newAttach = new HashMap<>(attachments.size());
// Attachments are mainly metadata information of requests
// {" input ":" 265 ", "dubbo" : "2.0.2", "path" : "org. Apache. Dubbo. Metadata. MetadataService", "version" : "1.0.0", "group" : "dubbo - demo - annotation-provider"}
for (Map.Entry<String, Object> entry : attachments.entrySet()) {
String key = entry.getKey();
// ["path","_TO","group","dubbo.tag","version","dubbo.force.tag","dubbo","interface","timeout","token","async"]
if(! UNLOADING_KEYS.contains(key)) {// Collect attributes that are excluded
newAttach.put(key, entry.getValue());
}
}
attachments = newAttach;
}
// Invoke to ServiceContext5
RpcContext.getServiceContext().setInvoker(invoker)
.setInvocation(invocation);
RpcContext context = RpcContext.getServerAttachment();
context.setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());
String remoteApplication = (String) invocation.getAttachment(REMOTE_APPLICATION_KEY);
if (StringUtils.isNotEmpty(remoteApplication)) {
RpcContext.getServiceContext().setRemoteApplicationName(remoteApplication);
} else {
RpcContext.getServiceContext().setRemoteApplicationName((String) context.getAttachment(REMOTE_APPLICATION_KEY));
}
// Call timeout, not set to -1
long timeout = RpcUtils.getTimeout(invocation, -1);
if(timeout ! = -1) {
// pass to next hop
RpcContext.getClientAttachment().setObjectAttachment(TIME_COUNTDOWN_KEY, TimeoutCountDown.newCountDown(timeout, TimeUnit.MILLISECONDS));
}
// Some attachments may have been added to the RpcContext before the filter
if(attachments ! =null) {
if(context.getObjectAttachments() ! =null) {
// The additional properties described above are set here
context.getObjectAttachments().putAll(attachments);
} else{ context.setObjectAttachments(attachments); }}if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
context.clearAfterEachInvoke(false);
return invoker.invoke(invocation);
} finally {
context.clearAfterEachInvoke(true);
RpcContext.removeServerAttachment();
RpcContext.removeServiceContext();
// For asynchronous scenarios, we must remove the context from the current thread, so we always create a new RpcContext for the next call from the same threadRpcContext.getClientAttachment().removeAttachment(TIME_COUNTDOWN_KEY); RpcContext.removeServerContext(); }}@Override
public void onResponse(Result appResponse, Invoker
invoker, Invocation invocation) {
// Pass the attachment to the result
appResponse.addObjectAttachments(RpcContext.getServerContext().getObjectAttachments());
}
@Override
public void onError(Throwable t, Invoker
invoker, Invocation invocation) {}Copy the code
3.5 ExceptionFilter
ExceptionFilter mainly processes the exceptions in the Response response
@Override
public Result invoke(Invoker
invoker, Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
@Override
public void onResponse(Result appResponse, Invoker
invoker, Invocation invocation) {
if(appResponse.hasException() && GenericService.class ! = invoker.getInterface()) {try {
Throwable exception = appResponse.getException();
// If it is checked for an exception, it is thrown
if(! (exceptioninstanceof RuntimeException) && (exception instanceof Exception)) {
return;
}
// If an exception occurs in the signature, it is thrown directly
try{ Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes()); Class<? >[] exceptionClasses = method.getExceptionTypes();for(Class<? > exceptionClass : exceptionClasses) {if (exception.getClass().equals(exceptionClass)) {
return; }}}catch (NoSuchMethodException e) {
return;
}
/ /.. Ellipsis: Prints an ERROR message in the server log for exceptions not found in the method signature
// If the exception class and interface class are in the same JAR file, they are thrown directly
String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
return;
}
// If it is a JDK exception, throw it directly
String className = exception.getClass().getName();
if (className.startsWith("java.") || className.startsWith("javax.")) {
return;
}
// If it is a Dubbo exception, throw it directly
if (exception instanceof RpcException) {
return;
}
// Otherwise, wrap it with RuntimeException and return it to the client
appResponse.setException(new RuntimeException(StringUtils.toString(exception)));
} catch (Throwable e) {
}
}
}
Copy the code
3.6 MonitorFilter
MonitorFilter is used to monitor operations. It invokes the interceptor and collects call data about this call and sends it to the monitoring center
/ / Invoke the process
public Result invoke(Invoker
invoker, Invocation invocation) throws RpcException {
if (invoker.getUrl().hasAttribute(MONITOR_KEY)) {
// private static final String MONITOR_FILTER_START_TIME = "monitor_filter_start_time";
// private static final String MONITOR_REMOTE_HOST_STORE = "monitor_remote_host_store";
invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis());
invocation.put(MONITOR_REMOTE_HOST_STORE, RpcContext.getServiceContext().getRemoteHost());
getConcurrent(invoker, invocation).incrementAndGet(); // count up
}
return invoker.invoke(invocation); // proceed invocation chain
}
// The main Monitor processes are the following
@Override
public void onResponse(Result result, Invoker
invoker, Invocation invocation) {
if (invoker.getUrl().hasAttribute(MONITOR_KEY)) {
collect(invoker, invocation, result,
(String) invocation.get(MONITOR_REMOTE_HOST_STORE),
(long) invocation.get(MONITOR_FILTER_START_TIME), false);
getConcurrent(invoker, invocation).decrementAndGet(); // count down}}@Override
public void onError(Throwable t, Invoker
invoker, Invocation invocation) {
if (invoker.getUrl().hasAttribute(MONITOR_KEY)) {
collect(invoker, invocation, null, (String) invocation.get(MONITOR_REMOTE_HOST_STORE), (long) invocation.get(MONITOR_FILTER_START_TIME), true);
getConcurrent(invoker, invocation).decrementAndGet(); // count down}}private void collect(Invoker<? > invoker, Invocation invocation, Result result, String remoteHost,long start, boolean error) {
try {
Object monitorUrl;
// Get the MONITOR_KEY built above
monitorUrl = invoker.getUrl().getAttribute(MONITOR_KEY);
if(monitorUrl instanceof URL) {
//
Monitor monitor = monitorFactory.getMonitor((URL) monitorUrl);
if (monitor == null) {
return;
}
// Create the URL for the data
URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
// Collect monitoring datamonitor.collect(statisticsURL.toSerializableURL()); }}catch (Throwable t) {
logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: "+ t.getMessage(), t); }}// PS: This refers to MonitorService objects. We will not go into the details here, but walk through the process
C- MonitorService
- void collect(URL statistics): Collect monitoring data - List<URL>lookup(URL query)
Copy the code
3.7 TimeoutFilter
Handle timeout cases
@Override
public Result invoke(Invoker
invoker, Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
@Override
public void onResponse(Result appResponse, Invoker
invoker, Invocation invocation) {
// String TIME_COUNTDOWN_KEY = "timeout-countdown";
Object obj = RpcContext.getClientAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
if(obj ! =null) {
TimeoutCountDown countDown = (TimeoutCountDown) obj;
//
if (countDown.isExpired()) {
// // clears the response in case of timeout
((AppResponse) appResponse).clear();
// Just print a log
if (logger.isWarnEnabled()) {
logger.warn("invoke timed out. method: " + invocation.getMethodName() + " arguments: " +
Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() +
", invoke elapsed " + countDown.elapsedMillis() + " ms."); }}}}Copy the code
3.8 TraceFilter
The Filter is used to trace requests, and there is a collection that maintains a list of TRACES keys and channels, and after the invoke call is complete, the log is sent to the relevant Channel
Basically listen on any method or method on an interface n times
There is a major set involved here
private static final ConcurrentMap<String, Set<Channel>> TRACERS = new ConcurrentHashMap<>();
@Override
public Result invoke(Invoker
invoker, Invocation invocation) throws RpcException {
// The common way to get before and after the call time
long start = System.currentTimeMillis();
Result result = invoker.invoke(invocation);
long end = System.currentTimeMillis();
// If trace exists
if (TRACERS.size() > 0) {
// Build the trace key as the proxy interface. Proxy method
String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
Set<Channel> channels = TRACERS.get(key);
if (channels == null || channels.isEmpty()) {
key = invoker.getInterface().getName();
channels = TRACERS.get(key);
}
// Get the corresponding set of channels from the set
if (CollectionUtils.isNotEmpty(channels)) {
for (Channel channel : new ArrayList<>(channels)) {
// If a channel is connected or an exception occurs, it is removed from the collection
if (channel.isConnected()) {
try {
int max = 1;
Integer m = (Integer) channel.getAttribute(TRACE_MAX);
if(m ! =null) {
max = m;
}
int count = 0;
AtomicInteger c = (AtomicInteger) channel.getAttribute(TRACE_COUNT);
if (c == null) {
c = new AtomicInteger();
channel.setAttribute(TRACE_COUNT, c);
}
count = c.getAndIncrement();
// There are two important parameters involved here, which represent the maximum number of listens and the current number of listens respectively
// private static final String TRACE_MAX = "trace.max";
// private static final String TRACE_COUNT = "trace.count";
if (count < max) {
String prompt = channel.getUrl().getParameter(Constants.PROMPT_KEY, Constants.DEFAULT_PROMPT);
channel.send("\r\n" + RpcContext.getServiceContext().getRemoteAddress() + "- >"
+ invoker.getInterface().getName()
+ "." + invocation.getMethodName()
+ "(" + JSON.toJSONString(invocation.getArguments()) + ")" + "- >" + JSON.toJSONString(result.getValue())
+ "\r\nelapsed: " + (end - start) + " ms."
+ "\r\n\r\n" + prompt);
}
if (count >= max - 1) { channels.remove(channel); }}catch(Throwable e) { channels.remove(channel); }}else{ channels.remove(channel); }}}}return result;
}
// PS: Here is a supplement to the Trace adding process. TraceFilter provides two methods to manage Trace channels
public static void addTracer(Class<? > type, String method, Channel channel,int max)
- String key = method ! =null && method.length() > 0 ? type.getName() + "."+ method : type.getName(); ? - It may be noted thattracer key j
public static void removeTracer(Class
type, String method, Channel channel)
// Trace's main management and processing class is TraceTelnetHandler
public class TraceTelnetHandler implements TelnetHandler {}Copy the code
conclusion
- The Filter chain is loaded via SPI
- Filter chains are built with FilterChainBuilder
- Filter by invocation. GetObjectAttachments () to obtain the properties
This article is not deep, mainly because the author is still in the surface learning, second, because this article is a general article, the subsequent in-depth Dubbo 3.0