Preface

In the previous chapter, we raised a question: what do we do with the current request when a service invocation fails? Throw an exception, or retry?

To solve this problem, Dubbo defines the Cluster interface and the Cluster Invoker. The purpose of a Cluster is to merge multiple service providers into a single Cluster Invoker and expose that Invoker to the service consumer. The consumer then only needs to make remote calls through this Invoker; the cluster module decides which service provider to call and what to do if the call fails.

One, merging

During the service reference process, one or more service provider Invokers are wrapped in a service directory (Directory) object, which is then merged and converted into a single Cluster Invoker object:

Invoker invoker = cluster.join(directory);

In Dubbo, the default is Failover, so the above code will call:

public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }
}

The code above is simple: the final Invoker object is a FailoverClusterInvoker instance. It is itself an Invoker, and it extends AbstractClusterInvoker.

Let’s look at the invoke method in the AbstractClusterInvoker class.

public abstract class AbstractClusterInvoker<T> implements Invoker<T> {

    public Result invoke(final Invocation invocation) throws RpcException {
        LoadBalance loadbalance = null;
        // Ask the service directory for the list of provider Invokers
        List<Invoker<T>> invokers = directory.list(invocation);
        if (invokers != null && !invokers.isEmpty()) {
            // Load the load balancing component
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class)
                    .getExtension(invokers.get(0).getUrl()
                            .getMethodParameter(invocation.getMethodName(), "loadbalance", "random"));
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // Call the subclass implementation; each cluster fault tolerance mechanism differs
        return doInvoke(invocation, invokers, loadbalance);
    }
}

The code above is also simple. We can read it in three steps:

  • Call the service directory to get the list of all service providers
  • Load the load balancing component
  • Call the subclass implementation and forward the request

We will cover load balancing in detail later. For now, it is enough to know that it is responsible for picking one Invoker out of several.
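The three steps follow the template method pattern: the base class fixes the sequence and each strategy only fills in the last step. Below is a minimal sketch of that shape, not Dubbo's actual code. The class ClusterTemplateSketch, the nested CallOnce strategy, and the modelling of a provider as a Function<String, String> are all assumptions made for illustration; random selection stands in for the load balancer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Function;

// Hypothetical sketch: a provider is modelled as Function<String, String>.
abstract class ClusterTemplateSketch {
    private final List<Function<String, String>> directory; // stands in for the service directory
    private final Random random = new Random();             // stands in for the load balancer

    ClusterTemplateSketch(List<Function<String, String>> directory) {
        this.directory = directory;
    }

    /** Fixed sequence: list providers, then delegate to the strategy. */
    final String invoke(String request) {
        // Step 1: take a snapshot of the current provider list
        List<Function<String, String>> providers = new ArrayList<>(directory);
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        // Steps 2 and 3: the strategy uses select() and decides how to handle failures
        return doInvoke(request, providers);
    }

    /** Random selection, a stand-in for a pluggable load balancing component. */
    protected Function<String, String> select(List<Function<String, String>> providers) {
        return providers.get(random.nextInt(providers.size()));
    }

    /** Each fault tolerance strategy implements this differently. */
    protected abstract String doInvoke(String request, List<Function<String, String>> providers);

    /** Simplest possible strategy: one call, no fault tolerance. */
    static final class CallOnce extends ClusterTemplateSketch {
        CallOnce(List<Function<String, String>> directory) {
            super(directory);
        }

        @Override
        protected String doInvoke(String request, List<Function<String, String>> providers) {
            return select(providers).apply(request);
        }
    }
}
```

Because invoke is final, a new strategy cannot skip the directory lookup; it can only change what happens once the provider list is known.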

Two, cluster fault tolerance strategies

Dubbo provides several cluster fault tolerance mechanisms. The main ones are:

  • Failover Cluster – Automatic switchover on failure

If an invocation fails, FailoverClusterInvoker automatically switches to another Invoker and retries. Dubbo uses this class as the default Cluster Invoker.

  • Failfast Cluster – Fast failure

FailfastClusterInvoker makes only one call and throws an exception immediately if it fails.

  • Failsafe Cluster – Failsafe

When an exception occurs during invocation, FailsafeClusterInvoker logs the exception but does not throw it.

  • Failback Cluster – Automatic recovery after failure

After a failed invocation, FailbackClusterInvoker returns an empty result to the service consumer and retries the failed invocation through a scheduled task. It is suitable for operations such as message notification.

  • Forking Cluster – Invokes multiple service providers in parallel

ForkingClusterInvoker uses a thread pool at run time to invoke multiple service providers concurrently. The doInvoke method returns as soon as one provider successfully returns a result. It is intended for read operations with demanding latency requirements (reads only; parallel writes may be unsafe), at the cost of consuming more resources.

  • Broadcast Cluster – Broadcast

BroadcastClusterInvoker calls each service provider one by one. If one of them fails, the BroadcastClusterInvoker throws an exception when the loop ends. This class is typically used to notify all providers to update local resource information such as caches or logs.

Three, automatic switchover

If an invocation fails, FailoverClusterInvoker automatically switches to another Invoker and retries. Let’s focus on its doInvoke method.

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
	List<Invoker<T>> copyinvokers = invokers;
	// Check whether invokers is empty
	checkInvokers(copyinvokers, invocation);
	// Number of attempts: the retries parameter (default 2) plus the first call
	int len = getUrl().getMethodParameter(invocation.getMethodName(), "retries", 2) + 1;
	if (len <= 0) {
		len = 1;
	}
	RpcException le = null; // last exception.
	List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
	Set<String> providers = new HashSet<String>(len);
	// Loop: retry on failure, up to len attempts
	for (int i = 0; i < len; i++) {
		if (i > 0) {
			checkWhetherDestroyed();
			// Re-get the service provider list
			copyinvokers = list(invocation);
			// Check again
			checkInvokers(copyinvokers, invocation);
		}
		// Select an Invoker
		Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
		invoked.add(invoker);
		RpcContext.getContext().setInvokers((List) invoked);
		try {
			// Make the call
			Result result = invoker.invoke(invocation);
			if (le != null && logger.isWarnEnabled()) {
				logger.warn("");
			}
			return result;
		} catch (RpcException e) {
			if (e.isBiz()) {
				throw e;
			}
			le = e;
		} catch (Throwable e) {
			le = new RpcException(e.getMessage(), e);
		} finally {
			providers.add(invoker.getUrl().getAddress());
		}
	}
	// All attempts failed
	throw new RpcException("");
}

As we can see, the key is that the Invoker is called inside a loop. Until a call returns normally, the loop keeps trying, for at most len attempts. Let’s summarize the process:

  • Check whether invokers is empty
  • Get the number of attempts: the retries parameter (default 2) plus one, so 3 by default
  • Enter the loop
  • On a retry, get the list of service providers again and re-check it
  • Select an Invoker and call it
  • If there is no exception, return the result and end the loop
  • If a business exception is caught, rethrow it immediately; for any other exception, continue the loop until the maximum number of attempts is reached
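The loop above can be reduced to a small, self-contained sketch. This is an illustration under stated assumptions, not Dubbo's implementation: FailoverSketch and its pick helper are hypothetical names, a provider is modelled as a Function<String, String>, and "prefer a provider that has not been tried yet" stands in for the load balancer's handling of the invoked list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of a failover loop; names are not part of Dubbo.
final class FailoverSketch {

    /** Makes up to retries + 1 attempts, switching provider after each failure. */
    static String invoke(List<Function<String, String>> providers, String request, int retries) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        int attempts = Math.max(retries + 1, 1);
        List<Function<String, String>> tried = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            Function<String, String> provider = pick(providers, tried);
            tried.add(provider);
            try {
                return provider.apply(request); // success ends the loop
            } catch (RuntimeException e) {
                last = e;                       // remember the failure and try again
            }
        }
        throw new IllegalStateException("failed after " + attempts + " attempts", last);
    }

    /** Returns the first provider not tried yet, or cycles once all have been tried. */
    private static Function<String, String> pick(List<Function<String, String>> providers,
                                                 List<Function<String, String>> tried) {
        for (Function<String, String> p : providers) {
            if (!tried.contains(p)) {
                return p;
            }
        }
        return providers.get(tried.size() % providers.size());
    }
}
```

With retries set to 2, a call against one failing and one healthy provider succeeds on the second attempt; against failing providers only, it gives up after three attempts and reports the last exception as the cause.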

Four, fail fast

FailfastClusterInvoker is simple, making only one call and throwing an exception immediately after failure.

public Result doInvoke(Invocation invocation, 
		List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
		
	checkInvokers(invokers, invocation);
	Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
	try {
		return invoker.invoke(invocation);
	} catch (Throwable e) {
		if (e instanceof RpcException && ((RpcException) e).isBiz()) {
			throw (RpcException) e;
		}
		throw new RpcException("...");
	}
}

Five, failsafe

FailsafeClusterInvoker differs little from FailfastClusterInvoker. The difference is that it does not throw an exception when the call fails; instead, it logs the exception and returns an empty result object.

public Result doInvoke(Invocation invocation, 
	List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
	
	try {
		checkInvokers(invokers, invocation);
		Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
		return invoker.invoke(invocation);
	} catch (Throwable e) {
		logger.error("Failsafe ignore exception: " + e.getMessage(), e);
		return new RpcResult();
	}
}
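To make the contrast between the two strategies concrete, here is a minimal side-by-side sketch. It is illustrative only: FailfastFailsafeSketch is a hypothetical class, a provider is modelled as a Function<String, String>, and an empty Optional stands in for the empty RpcResult.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch contrasting fail-fast and fail-safe handling of one call.
final class FailfastFailsafeSketch {

    /** One call; any failure is wrapped and thrown to the caller immediately. */
    static String failfast(Function<String, String> provider, String request) {
        try {
            return provider.apply(request);
        } catch (RuntimeException e) {
            throw new IllegalStateException("failfast invoke failed: " + e.getMessage(), e);
        }
    }

    /** One call; any failure is logged and swallowed, and an empty result is returned. */
    static Optional<String> failsafe(Function<String, String> provider, String request) {
        try {
            return Optional.ofNullable(provider.apply(request));
        } catch (RuntimeException e) {
            System.err.println("Failsafe ignore exception: " + e.getMessage());
            return Optional.empty();
        }
    }
}
```

The only difference is in the catch block, which is why the choice between them is about the caller: fail fast when the caller must know the call failed, failsafe when the call is best-effort.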

Six, automatic recovery

After a failed invocation, FailbackClusterInvoker also logs the exception and returns an empty result object. Before returning, however, it records the failed invocation and starts a scheduled task that invokes it again.

protected Result doInvoke(Invocation invocation, 
		List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
	
	try {
		checkInvokers(invokers, invocation);
		Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
		return invoker.invoke(invocation);
	} catch (Throwable e) {
		logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
				+ e.getMessage() + ",", e);
		// Record the failed invocation so the scheduled task can retry it
		addFailed(invocation, this);
		return new RpcResult();
	}
}

As you can see, besides logging the exception and returning an empty result object, a failed call also invokes addFailed, which is where the scheduled task is started.

1. Enable scheduled tasks

First, define a thread pool object that contains two threads.

Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true));

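Then, after an initial five-second delay, the retryFailed method runs repeatedly, with five seconds between the end of one run and the start of the next. Each run retries whatever invocations are still recorded as failed.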

private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
	if (retryFuture == null) {
		synchronized (this) {
			if (retryFuture == null) {
				retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
					public void run() {
						try {
							// Retry the failed invocations
							retryFailed();
						} catch (Throwable t) {
							logger.error("Unexpected error occur at collect statistic", t);
						}
					}
				}, 5000, 5000, TimeUnit.MILLISECONDS);
			}
		}
	}
	// failed is a ConcurrentHashMap
	failed.put(invocation, router);
}

Finally, note the call failed.put(invocation, router): it adds the currently failed invocation to failed, which is a ConcurrentHashMap object.

2. Retry

The retry logic is not complicated either: it iterates over the failed map and invokes each recorded invocation again.

void retryFailed() {
	// If empty, there are no failed tasks
	if (failed.size() == 0) {
		return;
	}
	Set<Entry<Invocation, AbstractClusterInvoker<?>>> failedSet = failed.entrySet();
	for (Entry<Invocation, AbstractClusterInvoker<?>> entry : failedSet) {
		Invocation invocation = entry.getKey();
		Invoker<?> invoker = entry.getValue();
		try {
			// Invoke again
			invoker.invoke(invocation);
			// On success, remove the invocation from failed
			failed.remove(invocation);
		} catch (Throwable e) {
			logger.error("...", e);
		}
	}
}

The key point in the code above is that an Invocation is removed from failed once its retry succeeds. When the method runs again and failed is empty, the initial check returns immediately and nothing is invoked.
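The record-then-retry mechanism can be sketched with the JDK alone. The following is a simplified illustration, not Dubbo's code: FailbackSketch, pending, and the thread name are hypothetical; a provider is modelled as a Function<String, String>; the failed request string is used as the map key; and, unlike the lazy double-checked start shown above, the timer is scheduled eagerly in the constructor to keep the sketch short.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch of failback: record failures, retry them from a timer.
final class FailbackSketch implements AutoCloseable {
    // Failed request -> provider that should be retried
    private final Map<String, Function<String, String>> failed = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newScheduledThreadPool(1, r -> {
        Thread t = new Thread(r, "failback-sketch-timer");
        t.setDaemon(true); // do not keep the JVM alive just for retries
        return t;
    });

    FailbackSketch(long periodMillis) {
        // Fixed delay: the pause is measured from the end of one run to the start of the next
        timer.scheduleWithFixedDelay(this::retryFailed, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Returns the result, or null after recording the failure for a background retry. */
    String invoke(Function<String, String> provider, String request) {
        try {
            return provider.apply(request);
        } catch (RuntimeException e) {
            failed.put(request, provider);
            return null;
        }
    }

    /** Retries every recorded failure; an entry is removed only when its retry succeeds. */
    void retryFailed() {
        for (Map.Entry<String, Function<String, String>> entry : failed.entrySet()) {
            try {
                entry.getValue().apply(entry.getKey());
                failed.remove(entry.getKey());
            } catch (RuntimeException e) {
                // Still failing: keep the entry for the next run
            }
        }
    }

    int pending() {
        return failed.size();
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

Catching the exception inside retryFailed matters: a scheduled task that throws is not run again by ScheduledExecutorService, so an uncaught failure would silently stop all future retries.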

Seven, parallel invocation

ForkingClusterInvoker creates multiple threads from a thread pool at run time to concurrently invoke multiple service providers. The doInvoke method terminates as soon as one of the service providers successfully returns a result.

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
	final List<Invoker<T>> selected;
	// Maximum number of parallel calls, default 2
	final int forks = getUrl().getParameter("forks", 2);
	// Timeout in milliseconds, default 1000
	final int timeout = getUrl().getParameter("timeout", 1000);
	if (forks <= 0 || forks >= invokers.size()) {
		selected = invokers;
	} else {
		selected = new ArrayList<Invoker<T>>();
		// Select Invokers and add them to selected
		for (int i = 0; i < forks; i++) {
			Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
			if (!selected.contains(invoker)) { // Avoid add the same invoker several times.
				selected.add(invoker);
			}
		}
	}
	RpcContext.getContext().setInvokers((List) selected);
	final AtomicInteger count = new AtomicInteger();
	// Blocking queue that receives the first result (or the final exception)
	final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
	for (final Invoker<T> invoker : selected) {
		executor.execute(new Runnable() {
			@Override
			public void run() {
				try {
					// Call the service and put the result into the queue
					Result result = invoker.invoke(invocation);
					ref.offer(result);
				} catch (Throwable e) {
					int value = count.incrementAndGet();
					if (value >= selected.size()) {
						ref.offer(e);
					}
				}
			}
		});
	}
	try {
		Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
		if (ret instanceof Throwable) {
			Throwable e = (Throwable) ret;
			throw new RpcException("...");
		}
		return (Result) ret;
	} catch (InterruptedException e) {
		throw new RpcException(e.getMessage(), e);
	}
}

The key to the code above is the blocking LinkedBlockingQueue: as soon as a result is put into it, the poll method returns and the call completes. Let’s summarize the overall process:

  • Get the maximum number of parallel calls (forks, default 2) and the timeout
  • Select Invokers and add them to selected
  • Submit one task per selected Invoker to a thread pool created with newCachedThreadPool
  • When a call returns normally, offer the result to the queue. The invocation then ends and the result is returned.
  • When a call throws, check whether the number of exceptions has reached the number of selected Invokers. If it has, every call has failed, so the exception is offered to the queue. The invocation then ends and the exception is thrown.
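The "first result wins, last failure loses" behaviour of the queue can be sketched with plain JDK concurrency classes. This is a minimal illustration, not Dubbo's implementation: ForkingSketch is a hypothetical class, a provider is modelled as a Function<String, String>, every provider in the list is called (the forks selection step is omitted), and a timeout is reported as an exception.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch of forking: call all providers in parallel, return the first success.
final class ForkingSketch {

    static String invoke(List<Function<String, String>> providers, String request, long timeoutMillis) {
        ExecutorService executor = Executors.newCachedThreadPool();
        BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
        AtomicInteger failures = new AtomicInteger();
        try {
            for (Function<String, String> provider : providers) {
                executor.execute(() -> {
                    try {
                        ref.offer(provider.apply(request)); // a success is published at once
                    } catch (RuntimeException e) {
                        // Publish a failure only when every provider has failed
                        if (failures.incrementAndGet() >= providers.size()) {
                            ref.offer(e);
                        }
                    }
                });
            }
            // Block until the first item arrives or the timeout expires
            Object ret = ref.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (ret instanceof Throwable) {
                throw new IllegalStateException("all providers failed", (Throwable) ret);
            }
            if (ret == null) {
                throw new IllegalStateException("timed out after " + timeoutMillis + " ms");
            }
            return (String) ret;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            executor.shutdownNow(); // the sketch owns its pool; slower calls are abandoned
        }
    }
}
```

Counting failures before offering the exception is what lets one healthy provider mask any number of failing ones: an exception reaches the queue only if no success can still arrive.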

Eight, broadcast

BroadcastClusterInvoker calls each service provider one by one. If one of them fails, the BroadcastClusterInvoker throws an exception when the loop ends.

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
	checkInvokers(invokers, invocation);
	RpcContext.getContext().setInvokers((List) invokers);
	RpcException exception = null;
	Result result = null;
	// Loop over the providers and invoke each one
	for (Invoker<T> invoker : invokers) {
		try {
			result = invoker.invoke(invocation);
		} catch (RpcException e) {
			exception = e;
			logger.warn(e.getMessage(), e);
		} catch (Throwable e) {
			exception = new RpcException(e.getMessage(), e);
			logger.warn(e.getMessage(), e);
		}
	}
	// If any call failed, throw the last exception
	if (exception != null) {
		throw exception;
	}
	return result;
}
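A detail worth seeing in isolation is that a failure does not stop the loop: every provider is still called, and only afterwards is the last exception thrown. Here is a minimal sketch of that behaviour, assuming a provider is modelled as a Function<String, String>; BroadcastSketch is a hypothetical name, not part of Dubbo.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of broadcast: call every provider, report failure at the end.
final class BroadcastSketch {

    static String invoke(List<Function<String, String>> providers, String request) {
        RuntimeException exception = null;
        String result = null;
        for (Function<String, String> provider : providers) {
            try {
                result = provider.apply(request); // the last successful result wins
            } catch (RuntimeException e) {
                exception = e;                    // remember the failure, keep going
            }
        }
        if (exception != null) {
            throw exception;                      // at least one provider failed
        }
        return result;
    }
}
```

This suits cache or configuration refresh notifications: healthy providers are updated even when one is down, while the caller still learns that the broadcast was incomplete.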