Cluster call

To improve performance and maintain high availability, an interface is usually served by multiple providers, which together form a cluster.

Let’s examine the flow of an Invoke in Dubbo.

First, here are a few key classes. They are easier to digest when read alongside the concrete call flow.

Relationship between nodes (refer to Dubbo’s official website)

  • Cluster disguises the multiple Invokers in a Directory as a single Invoker. The disguise contains the fault-tolerance logic: if a call fails, it retries on another Invoker.
  • Invoker is the abstraction of a callable Service on a Provider. It encapsulates the Provider address and the Service interface information.
  • Directory represents multiple Invokers. You can think of it as a List<Invoker>, but unlike a List, its contents may change dynamically, for example when the registry pushes changes.
  • Router selects a subset of the Invokers according to routing rules, such as read/write separation or application isolation.
  • LoadBalance selects one Invoker from the candidates for the current call. The selection uses a load-balancing algorithm, and if the call fails, a new selection is made.
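
To make the division of labour between these roles concrete, here is a minimal sketch. All types in it (SimpleInvoker, SimpleDirectory, SimpleRouter, RoundRobinBalance, SimpleCluster) are hypothetical simplifications written for this article, not Dubbo's real interfaces, and the fault-tolerance part of Cluster is deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified stand-ins for the roles above; not the real Dubbo API.
interface SimpleInvoker {
    String address();
    String invoke(String request);
}

// Directory: a view over the current invokers whose content may change at any time.
class SimpleDirectory {
    private final List<SimpleInvoker> invokers = new ArrayList<>();

    // Called when the provider list changes, e.g. after a registry push.
    synchronized void refresh(List<SimpleInvoker> latest) {
        invokers.clear();
        invokers.addAll(latest);
    }

    synchronized List<SimpleInvoker> list() {
        return new ArrayList<>(invokers);
    }
}

// Router: narrows the candidates to a subset according to a rule.
class SimpleRouter {
    private final Predicate<SimpleInvoker> rule;

    SimpleRouter(Predicate<SimpleInvoker> rule) {
        this.rule = rule;
    }

    List<SimpleInvoker> route(List<SimpleInvoker> candidates) {
        List<SimpleInvoker> subset = new ArrayList<>();
        for (SimpleInvoker invoker : candidates) {
            if (rule.test(invoker)) {
                subset.add(invoker);
            }
        }
        return subset;
    }
}

// LoadBalance: picks one invoker from the routed subset (round-robin keeps the demo deterministic).
class RoundRobinBalance {
    private int next = 0;

    SimpleInvoker select(List<SimpleInvoker> candidates) {
        return candidates.get(next++ % candidates.size());
    }
}

// Cluster: hides Directory, Router and LoadBalance behind a single invoke call.
class SimpleCluster {
    private final SimpleDirectory directory;
    private final SimpleRouter router;
    private final RoundRobinBalance balance = new RoundRobinBalance();

    SimpleCluster(SimpleDirectory directory, SimpleRouter router) {
        this.directory = directory;
        this.router = router;
    }

    String invoke(String request) {
        List<SimpleInvoker> routed = router.route(directory.list());
        if (routed.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        return balance.select(routed).invoke(request);
    }
}

class RolesDemo {
    // Builds an invoker that echoes its own address, so the chosen provider is visible.
    static SimpleInvoker provider(String address) {
        return new SimpleInvoker() {
            public String address() { return address; }
            public String invoke(String request) { return address + ":" + request; }
        };
    }

    public static void main(String[] args) {
        SimpleDirectory directory = new SimpleDirectory();
        directory.refresh(Arrays.asList(provider("read-1"), provider("read-2"), provider("write-1")));
        // Read/write separation: route only to the read replicas.
        SimpleCluster cluster = new SimpleCluster(directory, new SimpleRouter(i -> i.address().startsWith("read")));
        System.out.println(cluster.invoke("q")); // read-1:q
        System.out.println(cluster.invoke("q")); // read-2:q
    }
}
```

The caller only ever sees SimpleCluster.invoke, which is the sense in which a cluster "disguises" many invokers as one.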

Invocation process

The code uses the template method design pattern: the main flow lives in the parent class AbstractClusterInvoker, while the doXxx hooks (such as doList and doInvoke) are implemented by different subclasses, chosen through configuration or SPI.
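
As a rough illustration of the pattern itself, the sketch below uses hypothetical classes (AbstractClusterSketch, FirstProviderSketch, LastProviderSketch) with made-up provider addresses. It is not Dubbo source; it only shows a fixed skeleton in the parent class and a doInvoke hook that subclasses fill in.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified illustration of the template method pattern; not Dubbo source.
abstract class AbstractClusterSketch {
    // Template method: the fixed skeleton shared by every strategy.
    final String invoke(String request) {
        List<String> providers = doList(request);      // hook with a default implementation
        if (providers.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        return doInvoke(request, providers);           // hook every subclass must implement
    }

    // Default listing step; the addresses are made up for the example.
    protected List<String> doList(String request) {
        return Arrays.asList("10.0.0.1", "10.0.0.2");
    }

    protected abstract String doInvoke(String request, List<String> providers);
}

// One strategy: always call the first provider.
class FirstProviderSketch extends AbstractClusterSketch {
    @Override
    protected String doInvoke(String request, List<String> providers) {
        return providers.get(0) + " handled " + request;
    }
}

// Another strategy: always call the last provider.
class LastProviderSketch extends AbstractClusterSketch {
    @Override
    protected String doInvoke(String request, List<String> providers) {
        return providers.get(providers.size() - 1) + " handled " + request;
    }
}

class TemplateMethodDemo {
    public static void main(String[] args) {
        System.out.println(new FirstProviderSketch().invoke("ping")); // 10.0.0.1 handled ping
        System.out.println(new LastProviderSketch().invoke("ping"));  // 10.0.0.2 handled ping
    }
}
```

Because invoke is final, subclasses can change only the individual steps, never their order.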

AbstractClusterInvoker#invoke

public Result invoke(final Invocation invocation) throws RpcException {
    // Check whether the service has been destroyed
    checkWhetherDestroyed();

    // Attach the context attachments to the Invocation so they are sent to the remote server
    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
    }
    // List all available invokers from the Directory (DynamicDirectory, RegistryDirectory), filtered by the router rules, such as read/write separation
    List<Invoker<T>> invokers = list(invocation);
    // Initialize the load-balancing strategy (random by default); the provider actually called is selected later
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    // If the call is asynchronous, attach an invocation ID, used for idempotency
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // Invoke according to the cluster fault-tolerance policy; failover is used by default
    return doInvoke(invocation, invokers, loadbalance);
}

Several defaults are used here. How is a default implementation chosen? It is specified through the SPI mechanism:

@SPI(Cluster.DEFAULT)
public interface Cluster {
    String DEFAULT = FailoverCluster.NAME;
    
}

public class FailoverCluster extends AbstractCluster {
    public final static String NAME = "failover";
}
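
The idea of "an annotation on the interface names the default extension" can be sketched in a few lines. The annotation DefaultExtension, the interface ClusterSketch and the loader ExtensionLoaderSketch below are hypothetical stand-ins invented for this example. Dubbo's own extension loading is considerably more involved, so treat this only as an illustration of the fallback-to-default lookup.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an SPI annotation: value() names the default extension.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DefaultExtension {
    String value();
}

@DefaultExtension(ClusterSketch.DEFAULT)
interface ClusterSketch {
    String DEFAULT = "failover";

    String name();
}

class FailoverClusterSketch implements ClusterSketch {
    public String name() { return "failover"; }
}

class FailfastClusterSketch implements ClusterSketch {
    public String name() { return "failfast"; }
}

// Hypothetical loader: resolves an extension by name and falls back to the annotated default.
class ExtensionLoaderSketch {
    private final Map<String, ClusterSketch> extensions = new HashMap<>();

    void register(ClusterSketch extension) {
        extensions.put(extension.name(), extension);
    }

    ClusterSketch get(String configuredName) {
        String name = configuredName;
        if (name == null || name.isEmpty()) {
            // Nothing configured: read the default name from the annotation on the interface.
            name = ClusterSketch.class.getAnnotation(DefaultExtension.class).value();
        }
        ClusterSketch extension = extensions.get(name);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return extension;
    }
}

class SpiDefaultDemo {
    public static void main(String[] args) {
        ExtensionLoaderSketch loader = new ExtensionLoaderSketch();
        loader.register(new FailoverClusterSketch());
        loader.register(new FailfastClusterSketch());
        System.out.println(loader.get(null).name());       // failover
        System.out.println(loader.get("failfast").name()); // failfast
    }
}
```

An explicitly configured name wins; the annotation value is consulted only when nothing is configured.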

Cluster fault tolerance

Dubbo offers several fault-tolerance policies; Failover is the default.

Failover: when a call fails, log the initial error and retry on other invokers (retry n times, which means that at most n different invokers will be called). Note that retries cause delays. The default delay is 5s. This policy suits read operations that do not have strict real-time requirements.

FailoverClusterInvoker#doInvoke is the concrete implementation of doInvoke, the last step of the parent class's invoke.

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    // Check that the invoker list is not empty
    checkInvokers(copyInvokers, invocation);
    // Get the method name
    String methodName = RpcUtils.getMethodName(invocation);
    // Read the configured retry count from the URL and add 1 for the initial call: with retries set to 2, at most 3 calls are made.
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    // Defensive programming: even if the configuration leaves len at 0 or below, still make one call
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    // last exception.
    RpcException le = null;
    // Those that have already been called will be filtered in subsequent load balancing.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); 
    // Record the address of the called service
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            // Verify whether it is destroyed
            checkWhetherDestroyed();
            // List the providers
            copyInvokers = list(invocation);
            // Check again whether it is empty
            checkInvokers(copyInvokers, invocation);
        }
        // Select an invoker through load balancing, avoiding those that have already been called
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        // Record the selected invoker so that later selections and the RpcContext can see it
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + methodName
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + "(" + providers.size() + "/" + copyInvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            // Business exceptions are rethrown immediately, without retrying
            if (e.isBiz()) { 
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            // Any other failure (for example a network error) is recorded as the last exception, and the loop retries
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le.getCode(), "Failed to invoke the method "
            + methodName + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + "(" + providers.size() + "/" + copyInvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
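
Stripped of logging and Dubbo types, the retry loop above reduces to something like the following sketch. ProviderSketch and FailoverDemo are hypothetical names, and the simplifications are assumptions made for brevity: every RuntimeException triggers a retry (the real code rethrows business exceptions immediately), the provider list is not re-listed between attempts, and selection simply prefers the first provider that has not been tried yet instead of going through a LoadBalance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical provider stand-in: an address plus a handler that may throw.
class ProviderSketch {
    final String address;
    final Function<String, String> handler;

    ProviderSketch(String address, Function<String, String> handler) {
        this.address = address;
        this.handler = handler;
    }
}

class FailoverDemo {
    // Simplified failover: unlike the real doInvoke, every RuntimeException triggers a retry
    // and the provider list is not re-listed between attempts.
    static String invoke(List<ProviderSketch> providers, String request, int retries) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        int attempts = Math.max(retries + 1, 1);           // always make at least one call
        List<ProviderSketch> invoked = new ArrayList<>();  // providers already tried
        Set<String> failed = new LinkedHashSet<>();        // addresses that failed, for the error message
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            ProviderSketch chosen = select(providers, invoked);
            invoked.add(chosen);
            try {
                return chosen.handler.apply(request);
            } catch (RuntimeException e) {
                last = e;
                failed.add(chosen.address);
            }
        }
        throw new IllegalStateException(
                "Tried " + attempts + " times, failed providers " + failed, last);
    }

    // Prefer a provider that has not been tried yet; otherwise fall back to the first one.
    private static ProviderSketch select(List<ProviderSketch> providers, List<ProviderSketch> invoked) {
        for (ProviderSketch provider : providers) {
            if (!invoked.contains(provider)) {
                return provider;
            }
        }
        return providers.get(0);
    }

    public static void main(String[] args) {
        ProviderSketch down = new ProviderSketch("10.0.0.1", r -> { throw new RuntimeException("down"); });
        ProviderSketch up = new ProviderSketch("10.0.0.2", r -> "10.0.0.2:" + r);
        System.out.println(invoke(Arrays.asList(down, up), "ping", 2)); // 10.0.0.2:ping
    }
}
```

With retries set to 0 the same call fails after a single attempt, which shows why the retry count, not the number of providers, bounds how many calls are made.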

The whole process