Dubbo cluster fault tolerance

In a distributed environment, services are usually deployed as clusters, so a caller has to decide which of the many providers to call and what to do when a call fails. Dubbo handles both for us through its cluster fault-tolerance mechanism, which takes over the failure handling after a consumer's call fails. Dubbo defines two abstractions for this, Cluster and Invoker. The main implementations of the Cluster interface are FailoverCluster, FailfastCluster, FailsafeCluster, FailbackCluster, ForkingCluster and BroadcastCluster, each paired with a corresponding XxxClusterInvoker class.

FailoverCluster

Failover is Dubbo's default cluster fault-tolerance policy. When an invocation fails, the consumer retries it on another provider; only if every attempt fails is an exception thrown. The default number of retries is 2, so up to three calls are made in total.

public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }
}

The only job of FailoverCluster is to combine multiple invokers into a single Invoker, namely a FailoverClusterInvoker; the actual fault-tolerance logic lives in that invoker. A service call enters through AbstractClusterInvoker#invoke, which eventually calls the template method doInvoke, and each subclass implements doInvoke with its own policy.
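To make the join/doInvoke split concrete, here is a minimal, self-contained sketch of the same template-method structure. It is not Dubbo's API: ProviderInvoker, ClusterInvokerSketch and FirstOnlyCluster are hypothetical names, and plain strings stand in for Invocation and Result.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a provider invoker (not Dubbo's Invoker)
interface ProviderInvoker {
    String invoke(String request);
}

// Template method: invoke() is fixed, doInvoke() varies by policy.
// The cluster invoker is itself an invoker, so callers cannot tell it apart from one provider.
abstract class ClusterInvokerSketch implements ProviderInvoker {
    private final List<ProviderInvoker> directory;

    ClusterInvokerSketch(List<ProviderInvoker> directory) {
        this.directory = directory;
    }

    @Override
    public final String invoke(String request) {
        // Snapshot the provider list, as a directory lookup would
        List<ProviderInvoker> invokers = new ArrayList<>(directory);
        if (invokers.isEmpty()) {
            throw new IllegalStateException("No provider available for " + request);
        }
        return doInvoke(request, invokers);
    }

    // Each fault-tolerance policy supplies its own implementation
    protected abstract String doInvoke(String request, List<ProviderInvoker> invokers);
}

// Plays the role of Cluster.join: wrap many invokers in one cluster invoker
class FirstOnlyCluster {
    ProviderInvoker join(List<ProviderInvoker> directory) {
        return new ClusterInvokerSketch(directory) {
            @Override
            protected String doInvoke(String request, List<ProviderInvoker> invokers) {
                // Trivial policy: always call the first provider
                return invokers.get(0).invoke(request);
            }
        };
    }
}
```

Because invoke() is final, a new policy only has to override doInvoke(), which mirrors the division of labour between AbstractClusterInvoker and its subclasses.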

FailoverClusterInvoker#doInvoke:

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    // invokers: the providers obtained from the service directory after routing
    List<Invoker<T>> copyinvokers = invokers;
    // Throw an exception if invokers is null or empty
    checkInvokers(copyinvokers, invocation);
    // Get the name of the called method
    String methodName = RpcUtils.getMethodName(invocation);
    // Total number of calls: retries defaults to 2 and does not count the first call, hence + 1
    int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    // If the configured value is invalid, still make exactly one call
    if (len <= 0) {
        len = 1;
    }
    // Holds the most recent exception, reported if every attempt fails
    RpcException le = null; 
    // Record the services that have been invoked
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); 
    Set<String> providers = new HashSet<String>(len);
    // Retry loop: this is where failover, Dubbo's default policy, retries after a failed call
    for (int i = 0; i < len; i++) {
        if (i > 0) {
            // On a retry, first check that this invoker has not been destroyed, then re-fetch
            // the provider list from the directory (re-routed), since it may have changed
            checkWhetherDestroyed();
            // Get the list of services again
            copyinvokers = list(invocation);
            // Check again that invokers is not empty
            checkInvokers(copyinvokers, invocation);
        }
        // Choose the provider to call through load balancing; passing 'invoked' lets select()
        // avoid providers that have already been tried, when other ones are available
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // Perform the call; if it fails, the next iteration re-lists and re-selects
            Result result = invoker.invoke(invocation);
            // Success after earlier failures: log a warning that lists the failed providers
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + invocation.getMethodName()
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + "(" + providers.size() + "/" + copyinvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            // Business exceptions are not retried
            if (e.isBiz()) {
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    // Every attempt failed: throw an exception that carries the last error
    throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
            + invocation.getMethodName() + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + "(" + providers.size() + "/" + copyinvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""),
            le != null && le.getCause() != null ? le.getCause() : le);
}
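The retry loop above can be reduced to the following sketch. It assumes a hypothetical ProviderInvoker interface and a hypothetical BizException that marks business errors (Dubbo uses RpcException#isBiz instead). Selection simply prefers the first provider not yet tried; real Dubbo delegates this to the LoadBalance passed to select().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a provider invoker (not Dubbo's Invoker)
interface ProviderInvoker {
    String invoke(String request);
}

// Hypothetical marker for business errors, which failover must not retry
class BizException extends RuntimeException {
    BizException(String message) {
        super(message);
    }
}

class FailoverSketch {
    private final List<ProviderInvoker> providers;
    private final int retries;

    FailoverSketch(List<ProviderInvoker> providers, int retries) {
        this.providers = providers;
        this.retries = retries;
    }

    String invoke(String request) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        // retries does not count the first call; always make at least one call
        int len = Math.max(retries + 1, 1);
        List<ProviderInvoker> invoked = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < len; i++) {
            ProviderInvoker invoker = pick(invoked);
            invoked.add(invoker);
            try {
                return invoker.invoke(request);
            } catch (BizException e) {
                throw e; // business errors go straight back to the caller
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw new IllegalStateException("Tried " + len + " times, last error: " + last.getMessage(), last);
    }

    // Stand-in for load balancing: the first provider not tried yet, else the first provider
    private ProviderInvoker pick(List<ProviderInvoker> invoked) {
        for (ProviderInvoker p : providers) {
            if (!invoked.contains(p)) {
                return p;
            }
        }
        return providers.get(0);
    }
}
```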

FailfastCluster

Fail fast: only one call is made, and if it fails the exception is thrown immediately.

As with failover, the call eventually reaches FailfastClusterInvoker#doInvoke. The logic leading up to it is the same as before, so we go straight to doInvoke.

FailfastClusterInvoker#doInvoke

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    // Throw an exception if invokers is null or empty
    checkInvokers(invokers, invocation);
    // Execute load balancing logic
    Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
    try {
        // Perform the call
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        // Business exceptions are rethrown unchanged
        if (e instanceof RpcException && ((RpcException) e).isBiz()) {
            throw (RpcException) e;
        }
        // Any other failure is wrapped and thrown at once, with no retry
        throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                        + " select from all providers " + invokers + " for service " + getInterface().getName()
                        + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                        + " use dubbo version " + Version.getVersion()
                        + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                e.getCause() != null ? e.getCause() : e);
    }
}

The logic is simple: if the single call fails, the exception goes straight back to the caller, with no retry.
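A minimal sketch of the same policy, using a hypothetical ProviderInvoker interface and taking the first provider in place of load balancing:

```java
import java.util.List;

// Hypothetical, simplified stand-in for a provider invoker (not Dubbo's Invoker)
interface ProviderInvoker {
    String invoke(String request);
}

class FailfastSketch {
    private final List<ProviderInvoker> providers;

    FailfastSketch(List<ProviderInvoker> providers) {
        this.providers = providers;
    }

    String invoke(String request) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        // Stand-in for load-balanced select(): take the first provider
        ProviderInvoker invoker = providers.get(0);
        try {
            return invoker.invoke(request);
        } catch (RuntimeException e) {
            // Exactly one attempt: wrap the failure and throw it at once
            throw new IllegalStateException("Failfast invoke failed: " + e.getMessage(), e);
        }
    }
}
```

Because there is no retry, failfast suits calls that must not be repeated, such as non-idempotent writes.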

FailsafeCluster

Fail safe: when a call fails, no exception is thrown; the failure is logged and an empty result is returned.

FailsafeClusterInvoker#doInvoke

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        // Throw an exception if invokers is null or empty (caught below and logged)
        checkInvokers(invokers, invocation);
        // Execute load balancing logic
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        // Perform the call
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        // Log the failure instead of throwing it
        logger.error("Failsafe ignore exception: " + e.getMessage(), e);
        // Return an empty result
        return new RpcResult();
    }
}
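In the sketch below (hypothetical names, not Dubbo's API), an empty Optional plays the role of the empty RpcResult, and java.util.logging stands in for Dubbo's logger.

```java
import java.util.List;
import java.util.Optional;
import java.util.logging.Logger;

// Hypothetical, simplified stand-in for a provider invoker (not Dubbo's Invoker)
interface ProviderInvoker {
    String invoke(String request);
}

class FailsafeSketch {
    private static final Logger LOG = Logger.getLogger(FailsafeSketch.class.getName());
    private final List<ProviderInvoker> providers;

    FailsafeSketch(List<ProviderInvoker> providers) {
        this.providers = providers;
    }

    // An empty Optional plays the role of Dubbo's empty RpcResult
    Optional<String> invoke(String request) {
        try {
            if (providers.isEmpty()) {
                throw new IllegalStateException("No provider available");
            }
            return Optional.ofNullable(providers.get(0).invoke(request));
        } catch (RuntimeException e) {
            // Log and swallow the failure instead of propagating it
            LOG.severe("Failsafe ignore exception: " + e.getMessage());
            return Optional.empty();
        }
    }
}
```

As in the original, the empty-provider check sits inside the try block, so even "no provider available" is only logged.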

FailbackCluster

Fail back: when a call fails, the failure is logged and an empty result is returned right away. The failed invocation is then recorded and retried by a scheduled task every 5 seconds.

FailbackClusterInvoker#doInvoke

protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        // Throw an exception if invokers is null or empty
        checkInvokers(invokers, invocation);
        // Execute load balancing logic
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        // Perform the call
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        // Log the invocation failure
        logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                + e.getMessage() + ",", e);
        // Record the failed call; a scheduled task retries it every 5 seconds
        addFailed(invocation, this);
        // Return an empty result
        return new RpcResult();
    }
}

FailbackClusterInvoker#addFailed

private volatile ScheduledFuture<?> retryFuture;
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
        new NamedInternalThreadFactory("failback-cluster-timer", true));
private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed =
        new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();

private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
    // Double-checked locking: the retry task is created only once
    if (retryFuture == null) {
        synchronized (this) {
            if (retryFuture == null) {
                // Create a scheduled task that runs every RETRY_FAILED_PERIOD (5 seconds)
                retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            retryFailed();
                        } catch (Throwable t) { // Defensive fault tolerance
                            logger.error("Unexpected error occur at collect statistic", t);
                        }
                    }
                }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
            }
        }
    }
    // Add invocation and invoker to failed
    failed.put(invocation, router);
}

FailbackClusterInvoker#retryFailed

void retryFailed() {
    if (failed.size() == 0) {
        return;
    }
    // Iterate over a snapshot copy of failed
    for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry
            : new HashMap<Invocation, AbstractClusterInvoker<?>>(failed).entrySet()) {
        Invocation invocation = entry.getKey();
        Invoker<?> invoker = entry.getValue();
        try {
            // Perform the call again
            invoker.invoke(invocation);
            // Remove from failed after a successful retry
            failed.remove(invocation);
        } catch (Throwable e) {
            logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
        }
    }
}
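The sketch below reproduces the addFailed/retryFailed pair with hypothetical names (ProviderInvoker, FailbackSketch). Two assumptions differ from the code above: the retry period is a constructor parameter instead of a fixed 5 seconds, and the map keeps the provider that failed rather than the cluster invoker itself. retryFailed() is package-visible so it can also be triggered by hand.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for a provider invoker (not Dubbo's Invoker)
interface ProviderInvoker {
    String invoke(String request);
}

class FailbackSketch {
    private final List<ProviderInvoker> providers;
    private final long retryPeriodMillis;
    // Failed requests waiting for a retry, with the provider that failed them
    private final ConcurrentMap<String, ProviderInvoker> failed = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, r -> {
        Thread t = new Thread(r, "failback-sketch-timer");
        t.setDaemon(true); // do not keep the JVM alive just for retries
        return t;
    });
    private volatile ScheduledFuture<?> retryFuture;

    FailbackSketch(List<ProviderInvoker> providers, long retryPeriodMillis) {
        this.providers = providers;
        this.retryPeriodMillis = retryPeriodMillis;
    }

    Optional<String> invoke(String request) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        ProviderInvoker invoker = providers.get(0); // stand-in for load balancing
        try {
            return Optional.ofNullable(invoker.invoke(request));
        } catch (RuntimeException e) {
            addFailed(request, invoker);
            return Optional.empty(); // empty result now, retry later
        }
    }

    private void addFailed(String request, ProviderInvoker invoker) {
        // Double-checked locking: start the periodic retry task only once
        if (retryFuture == null) {
            synchronized (this) {
                if (retryFuture == null) {
                    retryFuture = scheduler.scheduleWithFixedDelay(this::retryFailed,
                            retryPeriodMillis, retryPeriodMillis, TimeUnit.MILLISECONDS);
                }
            }
        }
        failed.put(request, invoker);
    }

    void retryFailed() {
        // Iterate over a snapshot so entries can be removed while looping
        for (Map.Entry<String, ProviderInvoker> entry : new HashMap<>(failed).entrySet()) {
            try {
                entry.getValue().invoke(entry.getKey());
                failed.remove(entry.getKey());
            } catch (RuntimeException e) {
                // Still failing: keep the entry for the next round
            }
        }
    }

    int pendingRetries() {
        return failed.size();
    }
}
```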

ForkingCluster

Forking: several providers are invoked in parallel, and the call returns as soon as any one of them succeeds.

ForkingClusterInvoker#doInvoke

// Thread pool used to run the forked calls
private final ExecutorService executor = Executors.newCachedThreadPool(
            new NamedInternalThreadFactory("forking-cluster-timer", true));

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        // Throw an exception if invokers is null or empty
        checkInvokers(invokers, invocation);
        final List<Invoker<T>> selected;
        // Get the value of the forks parameter, which defaults to 2
        final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
        // Get the timeout value, which defaults to 1000 ms
        final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // If forks is not positive or is at least the number of providers, call all of them
        if (forks <= 0 || forks >= invokers.size()) {
            selected = invokers;
        } else {
            selected = new ArrayList<Invoker<T>>();
            // Use load balancing to pick up to forks invokers
            for (int i = 0; i < forks; i++) {
                // Load balancing
                Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                // Skip duplicates
                if (!selected.contains(invoker)) {
                    selected.add(invoker);
                }
            }
        }
        RpcContext.getContext().setInvokers((List) selected);
        final AtomicInteger count = new AtomicInteger();
        // Create a blocking queue
        final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
        for (final Invoker<T> invoker : selected) {
            // Run each selected invoker on the thread pool
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Perform the call
                        Result result = invoker.invoke(invocation);
                        // Put the result in the blocking queue
                        ref.offer(result);
                    } catch (Throwable e) {
                        // Count failures; the exception is queued only once every selected
                        // invoker has failed. So if any call succeeds, its result is what
                        // poll() below receives, and a single success is enough.
                        int value = count.incrementAndGet();
                        if (value >= selected.size()) {
                            ref.offer(e);
                        }
                    }
                }
            });
        }
        try {
            // poll() blocks for up to timeout ms, with three possible outcomes:
            // 1. A call succeeded, and its result is taken from the queue
            // 2. Every call failed, and the queued element is the Throwable
            // 3. Nothing arrived in time: poll() returns null, which is returned as is
            Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
            if (ret instanceof Throwable) {
                Throwable e = (Throwable) ret;
                throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                        "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                        e.getCause() != null ? e.getCause() : e);
            }
            return (Result) ret;
        } catch (InterruptedException e) {
            throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
        }
    } finally {
        RpcContext.getContext().clearAttachments();
    }
}
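A trimmed-down sketch of the forking pattern, using a hypothetical ProviderInvoker interface and taking the first forks providers instead of load balancing. One deliberate difference: where the code above returns null when poll() times out, this sketch throws.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for a provider invoker (not Dubbo's Invoker)
interface ProviderInvoker {
    String invoke(String request);
}

class ForkingSketch {
    // Daemon threads, so the pool does not keep the JVM alive
    private final ExecutorService executor = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "forking-sketch");
        t.setDaemon(true);
        return t;
    });

    String invoke(String request, List<ProviderInvoker> providers, int forks, long timeoutMillis) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        // Stand-in for load balancing: the first forks providers, or all of them
        List<ProviderInvoker> selected = (forks <= 0 || forks >= providers.size())
                ? providers : providers.subList(0, forks);
        AtomicInteger failures = new AtomicInteger();
        BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
        for (ProviderInvoker invoker : selected) {
            executor.execute(() -> {
                try {
                    ref.offer(invoker.invoke(request));
                } catch (RuntimeException e) {
                    // Queue the error only once every selected call has failed
                    if (failures.incrementAndGet() >= selected.size()) {
                        ref.offer(e);
                    }
                }
            });
        }
        Object ret;
        try {
            ret = ref.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for forked calls", e);
        }
        if (ret == null) {
            throw new IllegalStateException("No forked call finished within " + timeoutMillis + " ms");
        }
        if (ret instanceof Throwable) {
            throw new IllegalStateException("All forked calls failed", (Throwable) ret);
        }
        return (String) ret;
    }
}
```

The shared BlockingQueue is what lets the caller wait for whichever call finishes first, while the failure counter keeps a single early error from masking a later success.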

BroadcastCluster

Broadcast, as the name suggests, calls every provider one by one and throws an exception if any of them fails, even when all the other machines succeed. From the caller's point of view the outcome is all-or-nothing, although calls that already succeeded are not rolled back. Note: in versions that support it (the code below does not read this setting), you can also configure a failure ratio, for example @Reference(cluster = "broadcast", parameters = {"broadcast.fail.percent", "20"}), which means that once 20% of the machines in the cluster have returned a failure, the remaining machines are not invoked.

BroadcastClusterInvoker#doInvoke

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    // Throw an exception if invokers is null or empty
    checkInvokers(invokers, invocation);
    RpcContext.getContext().setInvokers((List) invokers);
    // Used to save exception information
    RpcException exception = null;
    Result result = null;
    // Invoke each service
    for (Invoker<T> invoker : invokers) {
        try {
            result = invoker.invoke(invocation);
        } catch (RpcException e) {
            exception = e;
            logger.warn(e.getMessage(), e);
        } catch (Throwable e) {
            exception = new RpcException(e.getMessage(), e);
            logger.warn(e.getMessage(), e);
        }
    }
    // If any call failed, throw the last recorded exception
    if (exception != null) {
        throw exception;
    }
    return result;
}
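The sketch below calls every provider in turn and rethrows the last failure, as the code above does. It also adds the failure ratio described earlier, assuming it works as stated: stop calling once the number of failures reaches failPercent percent of the cluster size. ProviderInvoker, BroadcastSketch and the failPercent parameter are hypothetical; with failPercent = 100 the behaviour matches the code above.

```java
import java.util.List;

// Hypothetical, simplified stand-in for a provider invoker (not Dubbo's Invoker)
interface ProviderInvoker {
    String invoke(String request);
}

class BroadcastSketch {
    // failPercent: hypothetical knob, the share of the cluster allowed to fail before stopping
    String invoke(String request, List<ProviderInvoker> providers, int failPercent) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        int failThreshold = providers.size() * failPercent / 100;
        int failures = 0;
        RuntimeException exception = null;
        String result = null;
        for (ProviderInvoker invoker : providers) {
            try {
                result = invoker.invoke(request);
            } catch (RuntimeException e) {
                exception = e; // remember the most recent failure
                failures++;
                if (failures >= failThreshold) {
                    break; // enough machines failed: skip the remaining ones
                }
            }
        }
        // Any failure at all makes the whole broadcast fail
        if (exception != null) {
            throw exception;
        }
        return result;
    }
}
```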