Dubbo cluster fault tolerance
In a distributed environment, services are usually deployed as clusters, which raises two questions: which of the many providers to call, and what to do when a call fails. Dubbo handles both. Its cluster fault tolerance mechanism takes care of the failure-handling logic on the consumer side after a call fails. Dubbo defines two interfaces for this, Cluster and Invoker. The Cluster interface has the following implementations: FailoverCluster, FailfastCluster, FailsafeCluster, FailbackCluster, ForkingCluster and BroadcastCluster, each paired with a corresponding XxxClusterInvoker class (for example, FailoverClusterInvoker).
FailoverCluster
Dubbo's default cluster fault tolerance policy. If an invocation fails, the consumer retries, preferring a provider it has not tried yet. If every attempt fails, an exception is thrown. The default number of retries is 2, so a call is attempted at most 3 times.
public class FailoverCluster implements Cluster {
    public final static String NAME = "failover";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }
}
The job of FailoverCluster is to merge multiple Invokers into a single Invoker, namely a FailoverClusterInvoker. The actual fault tolerance logic lives in that Invoker. A service call first goes through AbstractClusterInvoker#invoke, which ends by calling the template method doInvoke; each subclass implements its own invocation logic there.
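The split between a shared invoke entry point and a strategy-specific doInvoke is the template method pattern. The following is only a minimal standalone sketch of that shape: SimpleInvoker, AbstractClusterSketch and FirstProviderCluster are hypothetical, simplified types invented for illustration, not Dubbo's real interfaces.

```java
import java.util.List;

// Hypothetical, simplified stand-in for Dubbo's Invoker.
interface SimpleInvoker {
    String invoke(String request);
}

// Hypothetical stand-in for AbstractClusterInvoker: many providers behind one invoker.
abstract class AbstractClusterSketch implements SimpleInvoker {
    private final List<SimpleInvoker> providers;

    AbstractClusterSketch(List<SimpleInvoker> providers) {
        this.providers = providers;
    }

    // Template method: the shared check happens here, the strategy lives in doInvoke.
    @Override
    public final String invoke(String request) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        return doInvoke(request, providers);
    }

    protected abstract String doInvoke(String request, List<SimpleInvoker> providers);
}

// One concrete strategy: call the first provider and let any failure propagate.
class FirstProviderCluster extends AbstractClusterSketch {
    FirstProviderCluster(List<SimpleInvoker> providers) {
        super(providers);
    }

    @Override
    protected String doInvoke(String request, List<SimpleInvoker> providers) {
        return providers.get(0).invoke(request);
    }
}
```

Callers only ever see a SimpleInvoker, so swapping the fault tolerance strategy means swapping the subclass, not the calling code.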
FailoverClusterInvoker#doInvoke:
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    // invokers is the list obtained from the service directory after routing
    List<Invoker<T>> copyinvokers = invokers;
    // Throw an exception if the invoker list is null or empty
    checkInvokers(copyinvokers, invocation);
    // Get the name of the called method
    String methodName = RpcUtils.getMethodName(invocation);
    // Get the retry count (default 2). Retries exclude the first call, so add 1
    int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    // If the configured value is invalid, make exactly one call
    if (len <= 0) {
        len = 1;
    }
    // Holds the last exception, thrown once all attempts are used up
    RpcException le = null;
    // Invokers that have already been called
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
    // Addresses of the providers that have been tried
    Set<String> providers = new HashSet<String>(len);
    // The first iteration is the initial call; later iterations are retries
    for (int i = 0; i < len; i++) {
        if (i > 0) {
            // Before retrying, make sure this cluster invoker has not been destroyed
            checkWhetherDestroyed();
            // Re-list the invokers, since the provider list may have changed
            copyinvokers = list(invocation);
            // Re-check that the list is not empty
            checkInvokers(copyinvokers, invocation);
        }
        // Pick the invoker to call through load balancing
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // On success the result is returned. On failure the next iteration
            // re-lists the invokers and load-balances again; select() receives
            // the invoked list so that it can avoid the invokers already tried
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + invocation.getMethodName()
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + "(" + providers.size() + "/" + copyinvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            // Business exceptions are rethrown without a retry
            if (e.isBiz()) {
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    // All attempts failed: throw an exception
    throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
            + invocation.getMethodName() + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + "(" + providers.size() + "/" + copyinvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: "
            + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}
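Stripped of Dubbo's types, the failover loop comes down to "try up to retries + 1 times, preferring a provider that has not been tried yet, and keep the last error". The sketch below illustrates only that idea. FailoverSketch and its select helper are hypothetical names, providers are modeled as plain Supplier objects, and a simple first-untried pick stands in for Dubbo's load balancing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Standalone sketch of failover; not Dubbo code.
class FailoverSketch {
    static <T> T invoke(List<Supplier<T>> providers, int retries) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        int attempts = Math.max(retries + 1, 1);   // retries exclude the first call
        List<Supplier<T>> tried = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            Supplier<T> provider = select(providers, tried, i);
            tried.add(provider);
            try {
                return provider.get();             // success ends the loop
            } catch (RuntimeException e) {
                last = e;                          // remember the failure and retry
            }
        }
        throw new IllegalStateException("Failed after " + attempts + " attempts", last);
    }

    // Stand-in for load balancing: prefer a provider that has not been tried yet,
    // and fall back to cycling through the list once all of them have been tried.
    private static <T> Supplier<T> select(List<Supplier<T>> providers, List<Supplier<T>> tried, int attempt) {
        for (Supplier<T> p : providers) {
            if (!tried.contains(p)) {
                return p;
            }
        }
        return providers.get(attempt % providers.size());
    }
}
```

Because a retry repeats the call, this policy is best suited to idempotent operations such as reads.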
FailfastCluster
Fail fast. Only one call is made. When the call fails, an exception is thrown immediately.
As before, the call ends up in FailfastClusterInvoker's doInvoke method. The steps leading up to it are the same as for failover, so we go straight to doInvoke.
FailfastClusterInvoker#doInvoke
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    // Throw an exception if the invoker list is null or empty
    checkInvokers(invokers, invocation);
    // Pick an invoker through load balancing
    Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
    try {
        // Perform the call
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        // Business exceptions are rethrown as they are
        if (e instanceof RpcException && ((RpcException) e).isBiz()) {
            throw (RpcException) e;
        }
        throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                + " select from all providers " + invokers + " for service " + getInterface().getName()
                + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                + " use dubbo version " + Version.getVersion()
                + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                e.getCause() != null ? e.getCause() : e);
    }
}
As you can see, the logic is very simple: one invoker is selected and called once, and any failure is thrown straight back to the caller with no retry.
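The fail-fast behaviour can be sketched without Dubbo as "one attempt, pass business errors through, wrap everything else". In this sketch FailfastSketch and BusinessException are hypothetical names; BusinessException only plays the role that RpcException#isBiz plays in the listing above.

```java
import java.util.function.Supplier;

// Standalone sketch of fail-fast; not Dubbo code.
class FailfastSketch {
    // Hypothetical marker for errors raised by the provider's own business logic.
    static class BusinessException extends RuntimeException {
        BusinessException(String message) {
            super(message);
        }
    }

    static <T> T invoke(Supplier<T> provider) {
        try {
            return provider.get();                 // exactly one attempt
        } catch (BusinessException e) {
            throw e;                               // business errors pass through unchanged
        } catch (RuntimeException e) {
            // Everything else is wrapped with context and rethrown immediately.
            throw new IllegalStateException("Failfast invoke failed: " + e.getMessage(), e);
        }
    }
}
```

Since nothing is retried, this policy is a natural fit for non-idempotent operations such as creating a record.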
FailsafeCluster
Fail safe: when a call fails, no exception is thrown. The failure is logged and an empty result is returned.
FailsafeClusterInvoker#doInvoke
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        // Throw an exception if the invoker list is null or empty
        checkInvokers(invokers, invocation);
        // Pick an invoker through load balancing
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        // Perform the call
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        // Log the exception after the invocation fails
        logger.error("Failsafe ignore exception: " + e.getMessage(), e);
        // Return an empty result
        return new RpcResult();
    }
}
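A Dubbo-free sketch of the same idea: swallow the failure, log it, and hand back an empty value. FailsafeSketch is a hypothetical name, and Optional.empty() is used here as an assumed stand-in for the empty RpcResult.

```java
import java.util.Optional;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Standalone sketch of fail-safe; not Dubbo code.
class FailsafeSketch {
    private static final Logger LOG = Logger.getLogger(FailsafeSketch.class.getName());

    static <T> Optional<T> invoke(Supplier<T> provider) {
        try {
            return Optional.ofNullable(provider.get());
        } catch (RuntimeException e) {
            // The failure is logged and then ignored.
            LOG.log(Level.WARNING, "Failsafe ignore exception: " + e.getMessage(), e);
            return Optional.empty();
        }
    }
}
```

The caller cannot tell a failure apart from a genuinely empty answer, so this policy suits calls whose result does not matter much, such as writing audit logs.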
FailbackCluster
After an invocation fails, the failure is logged and an empty result is returned immediately. The failed invocation is then added to a scheduled task, which retries it every 5 seconds.
FailbackClusterInvoker#doInvoke
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        // Throw an exception if the invoker list is null or empty
        checkInvokers(invokers, invocation);
        // Pick an invoker through load balancing
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        // Perform the call
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        // Log the exception after the invocation fails
        logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                + e.getMessage() + ",", e);
        // Hand the failed call to the scheduled task, which retries every 5 seconds
        addFailed(invocation, this);
        // Return an empty result
        return new RpcResult();
    }
}
FailbackClusterInvoker#addFailed
private volatile ScheduledFuture<?> retryFuture;

private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
        new NamedInternalThreadFactory("failback-cluster-timer", true));

private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();

private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
    // Double-checked locking: the retry task is created only once
    if (retryFuture == null) {
        synchronized (this) {
            if (retryFuture == null) {
                // Create a scheduled task that runs every five seconds
                retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            retryFailed();
                        } catch (Throwable t) { // Defensive fault tolerance
                            logger.error("Unexpected error occur at collect statistic", t);
                        }
                    }
                }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
            }
        }
    }
    // Record the invocation and its invoker in the failed map
    failed.put(invocation, router);
}
FailbackClusterInvoker#retryFailed
void retryFailed() {
    if (failed.size() == 0) {
        return;
    }
    // Iterate over a copy of the failed map
    for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(failed).entrySet()) {
        Invocation invocation = entry.getKey();
        Invoker<?> invoker = entry.getValue();
        try {
            // Perform the call
            invoker.invoke(invocation);
            // Remove the entry from failed once the call succeeds
            failed.remove(invocation);
        } catch (Throwable e) {
            logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
        }
    }
}
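The failback cycle (fail, park the call in a map, retry from a timer, drop it on success) can be sketched in plain Java as follows. FailbackSketch, its String keys and the pending helper are hypothetical; unlike the listing above, this sketch starts its timer in the constructor instead of lazily on the first failure, and uses null as the "empty result".

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Standalone sketch of failback; not Dubbo code.
class FailbackSketch {
    private final Map<String, Supplier<String>> failed = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "failback-sketch-timer");
        t.setDaemon(true);
        return t;
    });

    FailbackSketch(long periodMillis) {
        // Retry everything in the failed map at a fixed delay.
        timer.scheduleWithFixedDelay(this::retryFailed, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Returns null (the "empty result") on failure and queues the call for a background retry.
    String invoke(String key, Supplier<String> call) {
        try {
            return call.get();
        } catch (RuntimeException e) {
            failed.put(key, call);
            return null;
        }
    }

    void retryFailed() {
        for (Map.Entry<String, Supplier<String>> entry : failed.entrySet()) {
            try {
                entry.getValue().get();
                failed.remove(entry.getKey());     // drop the entry once the retry succeeds
            } catch (RuntimeException e) {
                // Still failing: leave it in the map for the next round.
            }
        }
    }

    int pending() {
        return failed.size();
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

The result of a successful retry never reaches the original caller, so this policy fits fire-and-forget operations such as message notifications.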
ForkingCluster
Several providers are invoked in parallel, and the call returns as soon as one of them succeeds. An exception is thrown only if all of them fail.
ForkingClusterInvoker#doInvoke
// Create a thread pool
private final ExecutorService executor = Executors.newCachedThreadPool(
        new NamedInternalThreadFactory("forking-cluster-timer", true));

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        // Throw an exception if the invoker list is null or empty
        checkInvokers(invokers, invocation);
        final List<Invoker<T>> selected;
        // Get the forks parameter (number of parallel calls), which defaults to 2
        final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
        // Get the timeout in milliseconds, which defaults to 1000
        final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // If forks is not positive, or is at least the number of invokers, call all of them
        if (forks <= 0 || forks >= invokers.size()) {
            selected = invokers;
        } else {
            selected = new ArrayList<Invoker<T>>();
            // Pick forks invokers through load balancing
            for (int i = 0; i < forks; i++) {
                Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                // Skip invokers that were already picked
                if (!selected.contains(invoker)) {
                    selected.add(invoker);
                }
            }
        }
        RpcContext.getContext().setInvokers((List) selected);
        // Counts the failed calls
        final AtomicInteger count = new AtomicInteger();
        // Blocking queue that receives the results, or the final exception
        final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
        for (final Invoker<T> invoker : selected) {
            // Run each call on the thread pool
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Perform the call and queue the result
                        Result result = invoker.invoke(invocation);
                        ref.offer(result);
                    } catch (Throwable e) {
                        // Count the failure. The exception is queued only when the count
                        // reaches selected.size(), that is, when every call has failed.
                        // One success is enough for the caller, so earlier failures are ignored
                        int value = count.incrementAndGet();
                        if (value >= selected.size()) {
                            ref.offer(e);
                        }
                    }
                }
            });
        }
        try {
            // poll blocks until an element arrives or the timeout expires
            // (in which case it returns null). Otherwise there are two outcomes:
            // 1. A call succeeded and its result is taken from the queue
            // 2. All calls failed and the queued element is a Throwable
            Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
            if (ret instanceof Throwable) {
                Throwable e = (Throwable) ret;
                throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                        "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: "
                        + e.getMessage(), e.getCause() != null ? e.getCause() : e);
            }
            return (Result) ret;
        } catch (InterruptedException e) {
            throw new RpcException("Failed to forking invoke provider " + selected
                    + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
        }
    } finally {
        RpcContext.getContext().clearAttachments();
    }
}
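The core of the forking policy is the queue-plus-counter trick: every success is queued, but an exception is queued only by the last call to fail. A minimal Dubbo-free sketch of that mechanism follows. ForkingSketch is a hypothetical name, it creates a thread pool per call for simplicity, and it treats a timeout as an explicit error, which is a choice of this sketch.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Standalone sketch of forking; not Dubbo code.
class ForkingSketch {
    static String invoke(List<Supplier<String>> providers, long timeoutMillis) {
        ExecutorService pool = Executors.newCachedThreadPool();
        BlockingQueue<Object> results = new LinkedBlockingQueue<>();
        AtomicInteger failures = new AtomicInteger();
        try {
            for (Supplier<String> provider : providers) {
                pool.execute(() -> {
                    try {
                        results.offer(provider.get());     // any success is queued at once
                    } catch (RuntimeException e) {
                        // Only the last failure is queued, i.e. when every provider has failed.
                        if (failures.incrementAndGet() >= providers.size()) {
                            results.offer(e);
                        }
                    }
                });
            }
            Object first;
            try {
                first = results.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for forked calls", e);
            }
            if (first instanceof RuntimeException) {
                throw new IllegalStateException("All forked calls failed", (RuntimeException) first);
            }
            if (first == null) {
                throw new IllegalStateException("No forked call finished within " + timeoutMillis + " ms");
            }
            return (String) first;
        } finally {
            pool.shutdownNow();                            // abandon the calls that are still running
        }
    }
}
```

Forking trades extra load on the providers for lower latency, so it suits read operations with strict response-time requirements.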
BroadcastCluster
Broadcast, as the name suggests, calls every provider in turn and throws an exception if any one of them fails, even when all the others succeed. From the caller's point of view the call succeeds only if every provider does, much like a transaction. Note: the proportion of failed providers that stops the broadcast can be configured. For example, @Reference(cluster = "broadcast", parameters = {"broadcast.fail.percent", "20"}) means that once 20% of the providers in the cluster have returned a failure, the remaining providers are no longer invoked. The doInvoke listing below does not include this check.
BroadcastClusterInvoker#doInvoke
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    // Throw an exception if the invoker list is null or empty
    checkInvokers(invokers, invocation);
    RpcContext.getContext().setInvokers((List) invokers);
    // Holds the most recent exception, if any
    RpcException exception = null;
    Result result = null;
    // Invoke every provider, even after a failure
    for (Invoker<T> invoker : invokers) {
        try {
            result = invoker.invoke(invocation);
        } catch (RpcException e) {
            exception = e;
            logger.warn(e.getMessage(), e);
        } catch (Throwable e) {
            exception = new RpcException(e.getMessage(), e);
            logger.warn(e.getMessage(), e);
        }
    }
    // Throw an exception if any call failed
    if (exception != null) {
        throw exception;
    }
    return result;
}
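The broadcast loop in the listing above keeps going after a failure and only reports the error at the end. A small Dubbo-free sketch of that behaviour follows; BroadcastSketch is a hypothetical name, and the fail-percent cutoff is deliberately left out, as it is in the listing.

```java
import java.util.List;
import java.util.function.Supplier;

// Standalone sketch of broadcast; not Dubbo code.
class BroadcastSketch {
    static <T> T invoke(List<Supplier<T>> providers) {
        RuntimeException lastError = null;
        T lastResult = null;
        for (Supplier<T> provider : providers) {
            try {
                lastResult = provider.get();       // every provider is called, even after a failure
            } catch (RuntimeException e) {
                lastError = e;                     // remember the most recent failure
            }
        }
        if (lastError != null) {
            throw lastError;                       // a single failure fails the whole broadcast
        }
        return lastResult;                         // result of the last provider called
    }
}
```

Broadcast is typically used to push the same change to every provider, for example to refresh a local cache on each node.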