Clusters, cluster

Objective: Introduce several modes of cluster fault tolerance in Dubbo, introduce the source code of support package under Dubbo – Cluster.

preface

Cluster fault tolerance is well understood and is what you do when a call fails. Let’s take a look at the patterns:

Each Cluster implementation class has an invoker, because this mode is enabled at the time of invocation. As I mentioned in the previous article, invoker runs through the entire service invocation. However, in addition to invoking failed patterns, there are several special patterns, which should be described as failed measures, just the way of invoking.

  1. Failsafe Cluster: indicates failure security. If an exception occurs, it is ignored. Failsafe means that when an exception occurs during invocation, FailsafeClusterInvoker only prints the exception but does not throw an exception. This operation applies to writing audit logs
  2. Failover Cluster: When a call fails, it will automatically switch to another server in the Cluster to obtain invoker retry. This is usually used for read operations, but the retry will have a longer delay. The number of retries is usually set.
  3. Failfast Cluster: invokes Failfast Cluster once and throws an exception immediately. Ideal for idempotent operations, such as new records.
  4. Failback Cluster: Automatic recovery after a failed invocation, returns an empty result to the service provider. And through the scheduled task of the failed call record and retransmission, suitable for the implementation of message notification and other operations.
  5. Forking Cluster: Runs multiple threads in the thread pool to call multiple servers and returns if one succeeds. It is usually used for read operations that require high real-time performance but waste more service resources. The maximum number of parallelism is usually set.
  6. Available Cluster: Calls the first Available server, only for multiple registries.
  7. Broadcast Cluster: Broadcast calls all providers, one by one, and reports an error if any one of them reports an error at the end of the loop. Typically used to notify all providers to update local resource information such as caches or logs
  8. Mergeable Cluster: This section is covered in group aggregation.
  9. MockClusterWrapper: This section is told in disguise locally.

Source code analysis

(a) AbstractClusterInvoker

This class implements the Invoker interface and is an abstract class for the cluster Invoker.

1. The attribute

private static final Logger logger = LoggerFactory
        .getLogger(AbstractClusterInvoker.class);
/** * directory containing multiple invoker */
protected final Directory<T> directory;

/** * Whether to check the availability of */
protected final boolean availablecheck;

/** * whether to destroy */
private AtomicBoolean destroyed = new AtomicBoolean(false);

/** * sticky connection Invoker */
private volatile Invoker<T> stickyInvoker = null;
Copy the code

2.select

protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    // If invokers is empty, return null
    if (invokers == null || invokers.isEmpty())
        return null;
    // Get the method name
    String methodName = invocation == null ? "" : invocation.getMethodName();

    // Whether a sticky connection is started
    boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
    {
        //ignore overloaded method
        // If the last call to a sticky connection was not in the optional provider list, set it to null
        if(stickyInvoker ! =null && !invokers.contains(stickyInvoker)) {
            stickyInvoker = null;
        }
        //ignore concurrency problem
        // stickyInvoker is not null and is not in the selected list. The last service provider stickyInvoker is returned, but the reacability is enforced before.
        // Since stickyInvoker cannot be included in the selected list, forking and failover policies do not use the sticky attribute
        if(sticky && stickyInvoker ! =null && (selected == null| |! selected.contains(stickyInvoker))) {if (availablecheck && stickyInvoker.isAvailable()) {
                returnstickyInvoker; }}}// Select a provider using load balancing
    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

    // If a sticky connection is started, the call is logged
    if (sticky) {
        stickyInvoker = invoker;
    }
    return invoker;
}
Copy the code

This method implements the selection of a caller using a load balancing policy. First, a caller is selected using loadBalance. Reselect if this caller is in a previously selected list, or if this caller is unavailable, otherwise return the first selected caller. Re-select the validation rule: choose > Available. This rule ensures that the selected caller has at least a chance of becoming one of the previously selected lists and that the caller is available.

3.doSelect

private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (invokers == null || invokers.isEmpty())
        return null;
    // If there is only one, return this directly
    if (invokers.size() == 1)
        return invokers.get(0);
    // If no load balancing policy is specified, the random load balancing policy is used by default
    if (loadbalance == null) {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    // Invoke load balancing selection
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    //If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect.
    // Re-select the selected provider if it is already in Selected or unavailable
    if((selected ! =null&& selected.contains(invoker)) || (! invoker.isAvailable() && getUrl() ! =null && availablecheck)) {
        try {
            // Re-select
            Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if(rinvoker ! =null) {
                invoker = rinvoker;
            } else {
                //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                // If the re-selection fails, look at the first selected position, if not last, select +1 position.
                int index = invokers.indexOf(invoker);
                try {
                    //Avoid collision
                    // Finally avoid selecting the same invoker
                    invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
                } catch (Exception e) {
                    logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e); }}}catch (Throwable t) {
            logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t); }}return invoker;
}
Copy the code

This method is the main logic for selecting an Invoker with load balancing.

4.reselect

private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck)
        throws RpcException {

    //Allocating one in advance, this list is certain to be used.
    // Assign a pre-selected list, which must be used.
    List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

    //First, try picking a invoker not in `selected`.
    // Select from non-SELECT
    // Place the providers not included in selected in the reselectInvokers list and let the load balancer select
    if (availablecheck) { // invoker.isAvailable() should be checked
        for (Invoker<T> invoker : invokers) {
            if (invoker.isAvailable()) {
                if (selected == null| |! selected.contains(invoker)) { reselectInvokers.add(invoker); }}}// Select the load balancer from the re-selection list
        if(! reselectInvokers.isEmpty()) {returnloadbalance.select(reselectInvokers, getUrl(), invocation); }}else { // do not check invoker.isAvailable()
        // Without checking whether the service is available, put the providers that are not included in the selected list reselectInvokers and let the load balancer select it
        for (Invoker<T> invoker : invokers) {
            if (selected == null || !selected.contains(invoker)) {
                reselectInvokers.add(invoker);
            }
        }
        if(! reselectInvokers.isEmpty()) {returnloadbalance.select(reselectInvokers, getUrl(), invocation); }}// Just pick an available invoker using loadbalance policy
    {
        // If not selected from the unselected list, select from the selected list
        if(selected ! =null) {
            for (Invoker<T> invoker : selected) {
                if ((invoker.isAvailable()) // available first&&! reselectInvokers.contains(invoker)) { reselectInvokers.add(invoker); }}}if(! reselectInvokers.isEmpty()) {returnloadbalance.select(reselectInvokers, getUrl(), invocation); }}return null;
}
Copy the code

The method is the logical implementation of the re – selection.

5.invoke

@Override
public Result invoke(final Invocation invocation) throws RpcException {
    // Check whether it has been destroyed
    checkWhetherDestroyed();
    LoadBalance loadbalance = null;

    // binding attachments into invocation.
    // Get the added value of context
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    // Put the added value into the session domain
    if(contextAttachments ! =null&& contextAttachments.size() ! =0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }

    // Generate a collection of service providers
    List<Invoker<T>> invokers = list(invocation);
    if(invokers ! =null && !invokers.isEmpty()) {
        // Get the load balancer
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}
Copy the code

This method is a mandatory method for the Invoker interface, calling the logic of the chain, but the main logic is in the doInvoke method, which is an abstract method of the class that lets subclasses focus only on the doInvoke method.

6.list

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
    // Add the session domain invoker to the collection
    List<Invoker<T>> invokers = directory.list(invocation);
    return invokers;
}
Copy the code

This method calls directory’s list method to retrieve all Invoker collections from the session domain. I will cover directory in a future article.

(2) AvailableCluster

public class AvailableCluster implements Cluster {

    public static final String NAME = "available";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {

        // Create an AbstractClusterInvoker
        return new AbstractClusterInvoker<T>(directory) {
            @Override
            public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
                // Loop through all involer, calling as long as one is available.
                for (Invoker<T> invoker : invokers) {
                    if (invoker.isAvailable()) {
                        returninvoker.invoke(invocation); }}throw new RpcException("No provider available in "+ invokers); }}; }}Copy the code

Available Cluster I mentioned above, as soon as I find one Available, I call it.

(3) BroadcastCluster

public class BroadcastCluster implements Cluster {

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        // Create a BroadcastClusterInvoker
        return newBroadcastClusterInvoker<T>(directory); }}Copy the code

The key implementation is BroadcastClusterInvoker.

(4) BroadcastClusterInvoker

public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);

    public BroadcastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked"."rawtypes"})
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        // Check whether invokers is empty
        checkInvokers(invokers, invocation);
        // Put invokers in context
        RpcContext.getContext().setInvokers((List) invokers);
        RpcException exception = null;
        Result result = null;
        // Iterate through the invokers one by one. At the end of the loop, an error will be reported whenever any of the invokers fail
        for (Invoker<T> invoker : invokers) {
            try {
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = newRpcException(e.getMessage(), e); logger.warn(e.getMessage(), e); }}if(exception ! =null) {
            throw exception;
        }
        returnresult; }}Copy the code

(5) ForkingCluster

public class ForkingCluster implements Cluster {

    public final static String NAME = "forking";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        / / create ForkingClusterInvoker
        return newForkingClusterInvoker<T>(directory); }}Copy the code

(6) ForkingClusterInvoker

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {

    /** * thread pool * Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread}
     * which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
     */
    private final ExecutorService executor = Executors.newCachedThreadPool(
            new NamedInternalThreadFactory("forking-cluster-timer".true));

    public ForkingClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked"."rawtypes"})
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            // Check whether invokers is empty
            checkInvokers(invokers, invocation);
            final List<Invoker<T>> selected;
            // Get the forks configuration
            final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
            // Get the timeout configuration
            final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // If the forks configuration is incorrect, assign Invokers directly to Selected
            if (forks <= 0 || forks >= invokers.size()) {
                selected = invokers;
            } else {
                selected = new ArrayList<Invoker<T>>();
                // Loop through the forks Invoker and add it to the selected
                for (int i = 0; i < forks; i++) {
                    // TODO. Add some comment here, refer chinese version for more details.
                    / / select Invoker
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    if(! selected.contains(invoker)) {//Avoid add the same invoker several times.
                        // Add to the selected collectionselected.add(invoker); }}}// Put it in context
            RpcContext.getContext().setInvokers((List) selected);
            final AtomicInteger count = new AtomicInteger();
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
            // Iterate over the Selected list
            for (final Invoker<T> invoker : selected) {
                Create a thread of execution for each Invoker
                executor.execute(new Runnable() {
                    @Override
                    public void run(a) {
                        try {
                            // Make a remote call
                            Result result = invoker.invoke(invocation);
                            // Store the result in a blocking queue
                            ref.offer(result);
                        } catch (Throwable e) {
                            // Exception objects are raised only if value is greater than or equal to selection.size ()
                            // To prevent anomalies from overwriting normal results
                            int value = count.incrementAndGet();
                            if (value >= selected.size()) {
                                // Store the exception object to the blocking queueref.offer(e); }}}}); }try {
                // Fetch the result of the remote call from the blocking queue
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                // If it is an exception, it is thrown
                if (ret instanceof Throwable) {
                    Throwable e = (Throwable) ret;
                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0."Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: "+ e.getMessage(), e.getCause() ! =null ? e.getCause() : e);
                }
                return (Result) ret;
            } catch (InterruptedException e) {
                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: "+ e.getMessage(), e); }}finally {
            // clear attachments which is binding to current thread.RpcContext.getContext().clearAttachments(); }}}Copy the code

(7) FailbackCluster

public class FailbackCluster implements Cluster {

    public final static String NAME = "failback";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        // Create a FailbackClusterInvoker
        return newFailbackClusterInvoker<T>(directory); }}Copy the code

FailbackClusterInvoker (eight)

1. The attribute

private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);

// Retry interval
private static final long RETRY_FAILED_PERIOD = 5 * 1000;

/** * timer * Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread}
 * which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
 */
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2.new NamedInternalThreadFactory("failback-cluster-timer".true));

/** * failed set */
private finalConcurrentMap<Invocation, AbstractClusterInvoker<? >> failed =newConcurrentHashMap<Invocation, AbstractClusterInvoker<? > > ();/** * future */
private volatileScheduledFuture<? > retryFuture;Copy the code

2.doInvoke

@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        // Check whether invokers is empty
        checkInvokers(invokers, invocation);
        // Select Invoker
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        / / call
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                + e.getMessage() + ",", e);
        // If it fails, it is added to the failure queue and waits for a retry
        addFailed(invocation, this);
        return new RpcResult(); // ignore}}Copy the code

This method is to select the invoker call logic, when the exception is thrown, do the mechanism of failure retry, mainly implemented in addFailed.

3.addFailed

private void addFailed(Invocation invocation, AbstractClusterInvoker
        router) {
    if (retryFuture == null) {
        / / lock
        synchronized (this) {
            if (retryFuture == null) {
                // Create a scheduled task and execute it every 5 seconds
                retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

                    @Override
                    public void run(a) {
                        // collect retry statistics
                        try {
                            // Retry the failed call
                            retryFailed();
                        } catch (Throwable t) { // Defensive fault tolerance
                            logger.error("Unexpected error occur at collect statistic", t); } } }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS); }}}// Add invocation and invoker to failed
    failed.put(invocation, router);
}
Copy the code

What this method does is create the timer, and then put the failed call into the collection.

4.retryFailed

void retryFailed(a) {
    // If the failure queue is 0, return
    if (failed.size() == 0) {
        return;
    }
    // Traverse the failure queue
    for(Map.Entry<Invocation, AbstractClusterInvoker<? >> entry :newHashMap<Invocation, AbstractClusterInvoker<? >>( failed).entrySet()) {// Get the session domain
        Invocation invocation = entry.getKey();
        / / get the invokerInvoker<? > invoker = entry.getValue();try {
            // call again
            invoker.invoke(invocation);
            // Remove from the failure queue
            failed.remove(invocation);
        } catch (Throwable e) {
            logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e); }}}Copy the code

This method is the mechanism by which invoker is re-invoked when the call fails.

FailfastCluster (9)

public class FailfastCluster implements Cluster {

    public final static String NAME = "failfast";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        / / create FailfastClusterInvoker
        return newFailfastClusterInvoker<T>(directory); }}Copy the code

FailfastClusterInvoker (10)

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public FailfastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        // Check whether invokers is empty
        checkInvokers(invokers, invocation);
        // Select an invoker
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            / / call
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                // Throw an exception
                throw (RpcException) e;
            }
            // Throw an exception
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0."Failfast invoke providers " + invoker.getUrl() + "" + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: "+ e.getMessage(), e.getCause() ! =null? e.getCause() : e); }}}Copy the code

The logic is simple, call throw exception directly throw.

(xi) FailoverCluster

public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        / / create FailoverClusterInvoker
        return newFailoverClusterInvoker<T>(directory); }}Copy the code

(12) FailoverClusterInvoker

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked"."rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        // Copy an invoker collection
        List<Invoker<T>> copyinvokers = invokers;
        // Check whether it is null
        checkInvokers(copyinvokers, invocation);
        // Get the number of retries
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        // Record the last exception
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        // loop, retry on failure
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            // Re-enumerate Invoker before retrying. The advantage of this is that if a service dies,
            // Call list to get the most recent Invoker list available
            if (i > 0) {
                checkWhetherDestroyed();
                copyinvokers = list(invocation);
                // check again
                // Check whether copyinvokers is empty
                checkInvokers(copyinvokers, invocation);
            }
            // Select Invoker by load balancing
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            // Add to the Invoker to the Invoked list
            invoked.add(invoker);
            // Set the field invoked to the RPC context
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // Invoke the target Invoker invoke method
                Result result = invoker.invoke(invocation);
                if(le ! =null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + "(" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally{ providers.add(invoker.getUrl().getAddress()); }}// If the retry fails, an exception is thrown
        throw newRpcException(le ! =null ? le.getCode() : 0."Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + "(" + providers.size() + "/" + copyinvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "+ (le ! =null ? le.getMessage() : ""), le ! =null&& le.getCause() ! =null? le.getCause() : le); }}Copy the code

This class implements a fail-retry tolerance strategy. When a call fails, an exception is logged and the next selected invoker is looped until the number of retries runs out and the last exception is thrown.

(13) FailsafeCluster

public class FailsafeCluster implements Cluster {

    public final static String NAME = "failsafe";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        / / create FailsafeClusterInvoker
        return newFailsafeClusterInvoker<T>(directory); }}Copy the code

(14) Failsafe Lusterinvoker

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);

    public FailsafeClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            // Check whether invokers is empty
            checkInvokers(invokers, invocation);
            // Select an invoker
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            / / call
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            // If failure prints an exception, a null result is returned
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult(); // ignore}}}Copy the code

The logic is simple, that is, no exception is thrown, just print exceptions.

Afterword.

The source code for this section is github.com/CrazyHZM/in…

This article explains the implementation of cluster in the cluster, describes several invocation methods and fault tolerance strategy. I’ll start with the configuration rules section of the cluster module.