
0 Article Overview

In RPC scenarios, duplicate data caused by retries, or by the absence of an idempotency mechanism, is a problem that deserves attention. For example, a single purchase may create multiple orders, or a notification message may be sent several times. These are problems every engineer must face and solve.

One might ask: why would duplicate data appear when the program does not explicitly retry failed calls? The reason is that even without explicit retries, the RPC framework may automatically retry as part of its cluster fault tolerance mechanism, and this deserves attention.

Taking the DUBBO framework as an example, this article analyzes three questions: why retries happen, how retries work, and how to achieve idempotency.



1 Why Do I Retry?

An RPC interaction can end in one of three ways: a successful response, a failed response, or no response at all.



The consumer handles the first two cases easily: the response is explicit, so it simply continues with the success or failure logic. The no-response case is much harder to deal with, because "no response" may mean any of the following:

(1) The producer never received the request.
(2) The producer received the request but had not finished processing it before the consumer gave up waiting.
(3) The producer received the request and processed it successfully, but the consumer did not receive the response.

If you were the designer of an RPC framework, should you retry or abandon the call? The right choice ultimately depends on the business: some operations are idempotent by nature and can be retried safely, while others would produce duplicate data if retried.

So who knows the business best? The consumer: as the caller it is the most familiar with its own business, so the RPC framework simply provides a set of policies for the consumer to choose from.


2 How Do I Retry

2.1 Cluster fault tolerance Policy

DUBBO, as an excellent RPC framework, provides the following cluster fault tolerance policies for consumers to choose from:

Failover: failover with retry (default)
Failfast: fail fast
Failsafe: fail safe
Failback: asynchronous retry
Forking: parallel invocation
Broadcast: broadcast invocation

(1) Failover

Failover policy. This is the default policy: when consumption fails with an exception, another producer node is selected through the load balancing policy and the call is retried, until the configured number of retries is reached

(2) Failfast

Fail-fast policy. The consumer invokes the service only once; if an exception occurs it is thrown immediately without retry

(3) Failsafe

Fail-safe policy. The consumer invokes the service only once; if the call fails, an empty result is returned and no exception is thrown

(4) Failback

Asynchronous retry policy. When consumption fails, an empty result is returned immediately and the failed request is retried asynchronously. If the maximum number of retries is exceeded without success, the retry is abandoned and no exception is thrown

(5) Forking

Parallel invocation policy. The consumer invokes multiple producers concurrently through a thread pool; the call succeeds as long as one of them succeeds

(6) Broadcast

Broadcast invocation policy. The consumer traverses and invokes all producer nodes; if any of them fails, an exception is thrown after the traversal completes


2.2 Source Code Analysis

2.2.1 Failover

Failover is the default policy. When consumption fails with an exception, another producer node is selected through the load balancing policy until the configured number of retries is reached. This means that even if the business code does not explicitly retry, the consumption logic may be executed multiple times and produce duplicate data:

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

        // All producer Invokers
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);

        // Get the number of retries
        int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        RpcException le = null;

        // The producer that has already been called
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
        Set<String> providers = new HashSet<String>(len);

        // Retry until the maximum number of times is reached
        for (int i = 0; i < len; i++) {
            if (i > 0) {

                // An exception is thrown if the current instance is destroyed
                checkWhetherDestroyed();

                // Select the available producers Invokers according to the routing policy
                copyInvokers = list(invocation);

                // Check again
                checkInvokers(copyInvokers, invocation);
            }

            // Load balancing selects a producer Invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // Service consumption initiates a remote invocation
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + "(" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le);
                }
                // Return if there is a result
                return result;
            } catch (RpcException e) {
                // The service exception is thrown directly
                if (e.isBiz()) {
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                // RpcException is not thrown and retry
                le = new RpcException(e.getMessage(), e);
            } finally {
                // Save producers that have already been visited
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + "(" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }
}

When an RpcException (such as a timeout) occurs while the consumer is invoking producer node A, the consumer selects another producer node through the load balancing policy and retries, until the maximum number of retries is reached. Now imagine that node A actually processed the request successfully but did not return the result to the consumer in time: the retry may then create duplicate data.


2.2.2 Failfast

Fail-fast policy. The consumer invokes the service only once; when an exception occurs it is thrown immediately without retry:

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public FailfastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

        // Check whether the producer Invokers are valid
        checkInvokers(invokers, invocation);

        // Load balancing selects a producer Invoker
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            // Service consumption initiates a remote invocation
            return invoker.invoke(invocation);
        } catch (Throwable e) {

            // Service consumption fails and an exception is thrown without a retry
            if (e instanceof RpcException && ((RpcException) e).isBiz()) {
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                                   "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                                   + " select from all providers " + invokers + " for service " + getInterface().getName()
                                   + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                                   + " use dubbo version " + Version.getVersion()
                                   + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
        }
    }
}


2.2.3 Failsafe

Fail-safe policy. The consumer invokes the service only once; if the call fails, an empty result is returned, no exception is thrown, and there is no retry:

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);

    public FailsafeClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {

            // Check whether the producer Invokers are valid
            checkInvokers(invokers, invocation);

            // Load balancing selects a producer Invoker
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);

            // Service consumption initiates a remote invocation
            return invoker.invoke(invocation);

        } catch (Throwable e) {
            // Consume failure wrapped as an empty result object
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult();
        }
    }
}


2.2.4 Failback

Asynchronous retry policy. When consumption fails, an empty result is returned immediately and the failed request is retried asynchronously. If the maximum number of retries is exceeded without success, the retry is abandoned and no exception is thrown:

public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);

    private static final long RETRY_FAILED_PERIOD = 5;

    private final int retries;

    private final int failbackTasks;

    private volatile Timer failTimer;

    public FailbackClusterInvoker(Directory<T> directory) {
        super(directory);

        int retriesConfig = getUrl().getParameter(Constants.RETRIES_KEY, Constants.DEFAULT_FAILBACK_TIMES);
        if (retriesConfig <= 0) {
            retriesConfig = Constants.DEFAULT_FAILBACK_TIMES;
        }
        int failbackTasksConfig = getUrl().getParameter(Constants.FAIL_BACK_TASKS_KEY, Constants.DEFAULT_FAILBACK_TASKS);
        if (failbackTasksConfig <= 0) {
            failbackTasksConfig = Constants.DEFAULT_FAILBACK_TASKS;
        }
        retries = retriesConfig;
        failbackTasks = failbackTasksConfig;
    }

    private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
        if (failTimer == null) {
            synchronized (this) {
                if (failTimer == null) {
                    // Create a timer
                    failTimer = new HashedWheelTimer(new NamedThreadFactory("failback-cluster-timer", true), 1, TimeUnit.SECONDS, 32, failbackTasks);
                }
            }
        }
        // Construct a scheduled task
        RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
        try {
            // The scheduled task is put into a timer to wait for execution
            failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
        } catch (Throwable e) {
            logger.error("Failback background works error,invocation->" + invocation + ", exception: "+ e.getMessage()); }}@Override
    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        Invoker<T> invoker = null;
        try {

            // Check whether the producer Invokers are valid
            checkInvokers(invokers, invocation);

            // Be responsible for balancing the selection of a producer Invoker
            invoker = select(loadbalance, invocation, invokers, null);

            // The consuming service initiates a remote invocation
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ",", e);

            // If the service consumption fails, the failed request is logged
            addFailed(loadbalance, invocation, invokers, invoker);

            // Returns an empty result
            return new RpcResult();
        }
    }

    @Override
    public void destroy() {
        super.destroy();
        if (failTimer != null) {
            failTimer.stop();
        }
    }

    /**
     * RetryTimerTask
     */
    private class RetryTimerTask implements TimerTask {
        private final Invocation invocation;
        private final LoadBalance loadbalance;
        private final List<Invoker<T>> invokers;
        private final int retries;
        private final long tick;
        private Invoker<T> lastInvoker;
        private int retryTimes = 0;

        RetryTimerTask(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, int retries, long tick) {
            this.loadbalance = loadbalance;
            this.invocation = invocation;
            this.invokers = invokers;
            this.retries = retries;
            this.tick = tick;
            this.lastInvoker = lastInvoker;
        }

        @Override
        public void run(Timeout timeout) {
            try {
                // Load balancing selects a producer Invoker
                Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
                lastInvoker = retryInvoker;

                // Service consumption initiates a remote invocation
                retryInvoker.invoke(invocation);
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);

                // No exception is thrown when the maximum number of retries is exceeded
                if ((++retryTimes) >= retries) {
                    logger.error("Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation);
                } else {
                    // Put the task back into the timer when the maximum number of retries has not been exceeded
                    rePut(timeout);
                }
            }
        }

        private void rePut(Timeout timeout) {
            if (timeout == null) {
                return;
            }

            Timer timer = timeout.timer();
            if (timer.isStop() || timeout.isCancelled()) {
                return;
            }
            timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
        }
    }
}


2.2.5 Forking

Parallel invocation policy. The consumer invokes multiple producers concurrently through a thread pool; the call succeeds as long as one of them succeeds:

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private final ExecutorService executor = Executors.newCachedThreadPool(new NamedInternalThreadFactory("forking-cluster-timer", true));

    public ForkingClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            final List<Invoker<T>> selected;

            // Get configuration parameters
            final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
            final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            // Get the list of invokers executed in parallel
            if (forks <= 0 || forks >= invokers.size()) {
                selected = invokers;
            } else {
                selected = new ArrayList<>();
                for (int i = 0; i < forks; i++) {
                    // Select the producer
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    // Prevent repeated Invoker additions
                    if (!selected.contains(invoker)) {
                        selected.add(invoker);
                    }
                }
            }
            RpcContext.getContext().setInvokers((List) selected);
            final AtomicInteger count = new AtomicInteger();
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
            for (final Invoker<T> invoker : selected) {

                // Execute concurrently in the thread pool
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Execute the consumption logic
                            Result result = invoker.invoke(invocation);
                            // Store consumption results
                            ref.offer(result);
                        } catch (Throwable e) {
                            // If the number of exceptions is greater than or equal to the forks parameter value, all calls fail. The exception is queued
                            int value = count.incrementAndGet();
                            if (value >= selected.size()) {
                                ref.offer(e);
                            }
                        }
                    }
                });
            }
            try {
                // Get the result from the queue
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                // Throw an exception if the exception type means that all calls failed
                if (ret instanceof Throwable) {
                    Throwable e = (Throwable) ret;
                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                }
                return (Result) ret;
            } catch (InterruptedException e) {
                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: "+ e.getMessage(), e); }}finally{ RpcContext.getContext().clearAttachments(); }}}Copy the code


2.2.6 Broadcast

Broadcast invocation policy. The consumer traverses and invokes all producer nodes; if any of them throws an exception, it is rethrown after the traversal completes:

public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);

    public BroadcastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        RpcContext.getContext().setInvokers((List) invokers);
        RpcException exception = null;
        Result result = null;

        // Call all producer nodes through the loop
        for (Invoker<T> invoker : invokers) {
            try {
                // Execute the consumption logic
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = new RpcException(e.getMessage(), e);
                logger.warn(e.getMessage(), e);
            }
        }

        // If any exception occurred, throw it
        if (exception != null) {
            throw exception;
        }
        return result;
    }
}


3 How Do I Make It Idempotent

From the analysis above we know that the retry mechanism of an RPC framework may cause duplicate data, so idempotency must be considered. Idempotency means that performing an operation once and performing it many times produce the same result, with no inconsistency introduced by the repeated executions. Common idempotency schemes include cancelling retries, idempotent tables, database locks, and state machines.


3.1 Canceling Retry

There are two ways to cancel retries: set the retry count to zero, or choose a cluster fault tolerance policy that does not retry.

<!-- Set the retry count to zero -->
<dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" retries="0" />

<!-- Select a cluster fault tolerance policy that does not retry -->
<dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" cluster="failfast" />
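For consumers assembled through Dubbo's API configuration instead of XML, the same two options can be set on ReferenceConfig. The following is only a minimal sketch, assuming Dubbo 2.6.x package names (use org.apache.dubbo.config for 2.7+) and the HelloService interface from the XML above; application and registry configuration are omitted for brevity:

import com.alibaba.dubbo.config.ReferenceConfig;

import com.java.front.dubbo.demo.provider.HelloService;

public class NoRetryConsumer {

    public static void main(String[] args) {
        ReferenceConfig<HelloService> reference = new ReferenceConfig<>();
        reference.setInterface(HelloService.class);

        // Option 1: keep the default Failover policy but set the retry count to zero
        reference.setRetries(0);

        // Option 2: choose a cluster fault tolerance policy that never retries
        reference.setCluster("failfast");

        // Application and registry configuration must be set in real code before calling get()
        HelloService helloService = reference.get();
    }
}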


3.2 Idempotent Table

Assume that after the payment is successful, the payment system sends the payment success message to the message queue. The logistics system subscribes to this message and prepares to create a logistics order for this order.

However, the message queue may deliver the message more than once, so the logistics system may receive it multiple times. What we want is this: no matter how many duplicate messages arrive, only one logistics order is created.

The solution is the idempotent table scheme. Create a new table used solely for idempotency, with no other business purpose. It contains a field, say key, with a unique index; this field serves as the idempotency criterion.

After the logistics system receives the message, it first tries to insert a row into the idempotent table using the order number as the key. If the order number already exists, the unique index rejects the insert, which means the business has already been processed and the message can be discarded.
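A minimal sketch of this check-by-insert logic with plain JDBC. The table name idempotent, its unique biz_key column, and the DataSource wiring are assumptions made for illustration, not details from the original article:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import javax.sql.DataSource;

public class IdempotentTableGuard {

    private final DataSource dataSource;

    public IdempotentTableGuard(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * Tries to record the order number in the idempotent table.
     * Returns true if this is the first time it is seen (business logic should run),
     * false if the unique index rejected the insert (duplicate message, discard it).
     */
    public boolean tryAcquire(String orderNo) {
        String sql = "INSERT INTO idempotent (biz_key) VALUES (?)";
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, orderNo);
            ps.executeUpdate();
            return true;
        } catch (SQLIntegrityConstraintViolationException duplicate) {
            // Unique index violated: this message was already processed
            return false;
        } catch (SQLException e) {
            throw new IllegalStateException("Idempotency check failed", e);
        }
    }
}

The logistics consumer would call tryAcquire(orderNo) before creating the logistics order and simply discard the message when it returns false.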

This table can grow large, so the data can be archived by a scheduled task, for example keeping only the last 7 days and moving older rows into an archive table.

A generalized form of the idempotent table replaces the database with Redis: before creating the logistics order, check whether the order number already exists in Redis, and give such keys a 7-day expiration time.
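A sketch of the Redis variant using the Jedis client. The key prefix is an assumption, and the two-step SETNX plus EXPIRE shown here is not atomic; in production a single SET with NX and EX options (or a Lua script) is preferable:

import redis.clients.jedis.Jedis;

public class RedisIdempotentGuard {

    private static final int SEVEN_DAYS_SECONDS = 7 * 24 * 3600;

    private final Jedis jedis;

    public RedisIdempotentGuard(Jedis jedis) {
        this.jedis = jedis;
    }

    /** Returns true if the order number has not been seen before. */
    public boolean tryAcquire(String orderNo) {
        String key = "logistics:idempotent:" + orderNo;
        // SETNX returns 1 only for the first writer of the key
        long inserted = jedis.setnx(key, "1");
        if (inserted == 1L) {
            // Keep the marker for 7 days, matching the archive window of the database scheme
            jedis.expire(key, SEVEN_DAYS_SECONDS);
            return true;
        }
        return false;
    }
}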


3.3 State Machine

After the logistics order is created successfully, a message is sent, and the order system updates the order status to completed after consuming it. Assume the change is to update the order status from 0 to 1. The order system may also receive this message multiple times, including after the status has already been updated to 1.

The solution is the state machine scheme. First draw the state machine diagram and analyze how states are allowed to flow. For example, the analysis shows that state 1 is a final state, so once the order is in state 1, any further "logistics order created" messages are not processed and are simply discarded.
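The state flow rule can be enforced directly in the update statement: transition to state 1 only when the row is still in state 0, and treat zero affected rows as "already handled". A minimal JDBC sketch; the t_order table and column names are assumed for illustration:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

public class OrderStateMachine {

    private static final int STATUS_CREATED = 0;
    private static final int STATUS_COMPLETED = 1;

    private final DataSource dataSource;

    public OrderStateMachine(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /** Returns true if this call performed the 0 -> 1 transition, false if it was already done. */
    public boolean markCompleted(String orderNo) throws SQLException {
        String sql = "UPDATE t_order SET status = ? WHERE order_no = ? AND status = ?";
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setInt(1, STATUS_COMPLETED);
            ps.setString(2, orderNo);
            ps.setInt(3, STATUS_CREATED);
            // Zero affected rows means the order is already in the final state; discard the message
            return ps.executeUpdate() == 1;
        }
    }
}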


3.4 Database Lock

Database locks come in two flavors: pessimistic locks and optimistic locks. A pessimistic lock acquires the lock when reading the data:

select * from table where col='xxx' for update
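Note that for update only holds the row lock inside a transaction. A rough JDBC sketch of the pessimistic flow, again with an assumed t_order table: the row is locked when it is read, the business check and update run while the lock is held, and the lock is released on commit.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;

public class PessimisticLockHandler {

    private final DataSource dataSource;

    public PessimisticLockHandler(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public void handle(String orderNo) throws SQLException {
        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);
            try {
                // Lock the row; concurrent handlers of the same order block here until commit
                PreparedStatement select = conn.prepareStatement(
                        "select status from t_order where order_no = ? for update");
                select.setString(1, orderNo);
                ResultSet rs = select.executeQuery();
                if (rs.next() && rs.getInt("status") == 0) {
                    // Still unprocessed: perform the business update while holding the lock
                    PreparedStatement update = conn.prepareStatement(
                            "update t_order set status = 1 where order_no = ?");
                    update.setString(1, orderNo);
                    update.executeUpdate();
                }
                conn.commit();
            } catch (SQLException e) {
                conn.rollback();
                throw e;
            }
        }
    }
}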

Optimistic locking defers the check to update time. First read the record, which contains a version field; then update it on the condition that the version has not changed. If the record has been modified in the meantime, the version no longer matches and the update does not succeed:

update table
set xxx = #{xxx},
    version = #{version} + 1
where id = #{id}
and version = #{version}
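On the application side the key point is the affected-row count: one row means the update won, zero rows means another operation changed the version first, so the caller should give up or re-read and retry. A short sketch against the same assumed table:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

public class OptimisticLockExample {

    private final DataSource dataSource;

    public OptimisticLockExample(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /** Returns true if the update won the race, false if the version had already moved on. */
    public boolean updateWithVersion(long id, int newStatus, int expectedVersion) throws SQLException {
        String sql = "UPDATE t_order SET status = ?, version = version + 1 WHERE id = ? AND version = ?";
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setInt(1, newStatus);
            ps.setLong(2, id);
            ps.setInt(3, expectedVersion);
            return ps.executeUpdate() == 1;
        }
    }
}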


4 Article Summary

This article began by analyzing why retries matter in RPC interactions that end with no response. It then analyzed the six cluster fault tolerance policies provided by DUBBO; Failover, the default policy, provides a retry mechanism, so multiple invocations may be issued even when the business code does not explicitly retry, which deserves attention. Finally, it analyzed several common idempotency schemes. I hope this article is helpful to you.
