1. Introduction

Have you ever encountered a similar scenario in development where a message was consumed, but because of a code problem, the business did not execute correctly? Compensation bar, do not know what data to compensate; If you do not make amends, you will be scolded again. Find the production side to push it again, people are not happy, and even do not want to bird you. Such problems can be boiled down to final consistency. What is final consistency? That is, I don’t care what you use, what means, as long as you ensure the success of the business. If so, how would you ensure the ultimate consistency of message consumption? This article will walk you through the ultimate consistency scheme for RabbitMQ message queues.

2. rabbtimq

2.1 Default ACK mode

As you know, using RabbitMQ in the Springboot framework, the ack mode defaults to AUTO, From the code can also confirm this AbstractRabbitListenerContainerFactory setAcknowledgeMode () statement is as follows

/ * * *@param acknowledgeMode the acknowledge mode to set. Defaults to {@link AcknowledgeMode#AUTO}
  * @see AbstractMessageListenerContainer#setAcknowledgeMode(AcknowledgeMode)
  */
public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) {
    this.acknowledgeMode = acknowledgeMode;
}
Copy the code

This AUTO means different from the automatic ack you know from rabbitMQ documentation. It means that the rabbitMQ framework performs ack and nACK automatically for you, and you do not need to do it yourself

2.2 AUTO logic code

private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR

    Channel channel = consumer.getChannel();

    List<Message> messages = null;
    long deliveryTag = 0;

    for (int i = 0; i < this.batchSize; i++) {
        Message message = consumer.nextMessage(this.receiveTimeout);
        if (message == null) {
            break;
        }
        if (this.consumerBatchEnabled) {
            
        }
        else {
            messages = debatch(message);
            if(messages ! =null) {
                break;
            }
            try {
                executeListener(channel, message);
            }
            catch (ImmediateAcknowledgeAmqpException e) {
            }
            catch (Exception ex) {
                if (causeChainHasImmediateAcknowledgeAmqpException(ex)) {}
                else {
                    // 1. The message execution is abnormal and is rolled back
                    consumer.rollbackOnExceptionIfNecessary(ex);
                    throwex; }}}}if(messages ! =null) {
        executeWithList(channel, messages, deliveryTag, consumer);
    }
    // 2. Message execution is normal and commit
    return consumer.commitIfNecessary(isChannelLocallyTransacted());

}
Copy the code

2.3 Message Submission

public boolean commitIfNecessary(boolean localTx) throws IOException {
    try {
        // 1. Ack mode is auto
        boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();
        if (ackRequired && (!this.transactional || isLocallyTransacted)) {
            long deliveryTag = new ArrayList<Long>(this.deliveryTags).get(this.deliveryTags.size() - 1);
            // 2. Confirm the successful consumption message
            this.channel.basicAck(deliveryTag, true); }}finally {
        this.deliveryTags.clear();
    }

    return true;
}
Copy the code

2.4 Message Rollback

public void rollbackOnExceptionIfNecessary(Throwable ex) {

    // 1. Ack mode is auto
    boolean ackRequired = !this.acknowledgeMode.isAutoAck()
        && (!this.acknowledgeMode.isManual() || ContainerUtils.isRejectManual(ex));
    try {
        if (this.transactional) {}
        if (ackRequired) {
            OptionalLong deliveryTag = this.deliveryTags.stream().mapToLong(l -> l).max();
            if (deliveryTag.isPresent()) {
                // 2. Re-queue the message
                this.channel.basicNack(deliveryTag.getAsLong(), true,
                                       ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger)); }}}catch (Exception e) {
       
    }
    finally {
        this.deliveryTags.clear(); }}Copy the code

2.5 summary

As you already know from the above analysis, the framework wraps the consumer logic we wrote with a try catch, and if the consumer logic executes an exception, it puts the message back on the message queue; If the message executes successfully, an ACK operation is performed. With that in mind, let’s take a look at sample consumer code.

2.6 Message consumption example

@RabbitHandler
public void helloConsumer(String content) throws IOException {
    try {
        System.out.println(【receive message】 : + content);
        // Simulate a service exception
        if (Objects.equals(content, "hello rabbitMQ")) {
            throw new NullPointerException();
        }
        System.out.println("Execute operation database logic");
        / /... Subsequent business
    } catch (Exception e) {

    }
}
Copy the code

In the preceding example, if no exception is triggered, services are running normally. When an exception is raised, the business fails and the final consistency cannot be achieved. So what can be done to achieve the ultimate consistency?

2.7 Solutions

After some analysis, you can see that the reason why the above example failed to achieve the final consistency is the developer’s own fault. The developer used a try catch block so that the consumer logic would not throw an exception and the framework would ack it automatically if there were no exceptions. Therefore, to achieve final consistency, just remove the try catch

@Component
@RabbitListener(queues = RabbitmqConfig.QUEUE_NAME)
public class HelloConsumer {

    int number = 0;

    @RabbitHandler
    public void helloConsumer(String content) throws IOException {
        number++;
        System.out.println(【receive message】 : + content);
        // Simulate network jitter, interface invocation failure and service exception occasionally
        if (number < 5) {
            throw new NullPointerException();
        }
        System.out.println("Execute operation database logic");
        / /... Subsequent business}}Copy the code

2.8 Result

【 Receive message】 : Hello rabbitMQ2021-11-18 20:12:49.795  WARN 70173 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.boot.example.HelloConsumer.helloConsumer(java.lang.String) throws java.io.IOException'threw exception at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerA dapter.java:252) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(Messaging MessageListenerAdapter.java:194) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapt er.java:137) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContai ner.java:1654) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerCo ntainer.java:1573) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContaine r.java:1561) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerConta iner.java:1552) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContain er.java:1496) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContain er.java:968) [spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer .java:914) [spring-rabbit-2.39..jar:2.39.]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMe ssageListenerContainer.java:1289) [spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessage ListenerContainer.java:1195) [spring-rabbit-2.39..jar:2.39.]
	at java.lang.Thread.run(Thread.java:748) [na:1.8. 0 _161]
Caused by: java.lang.NullPointerException: null
	at com.boot.example.HelloConsumer.helloConsumer(HelloConsumer.java:27) ~[classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8. 0 _161]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8. 0 _161]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8. 0 _161]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8. 0 _161]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.38..jar:5.38.]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.38..jar:5.38.]
	at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:164) ~[spring-rabbit-2.39..jar:2.39.]
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:81) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerA dapter.java:244) ~[spring-rabbit-2.39..jar:2.39.]...13Common frames omitted [Receive message] : Hello rabbitMQ2021-11-18 20:12:49.804  WARN 70173 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.boot.example.HelloConsumer.helloConsumer(java.lang.String) throws java.io.IOException'threw exception at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerA dapter.java:252) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(Messaging MessageListenerAdapter.java:194) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapt er.java:137) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContai ner.java:1654) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerCo ntainer.java:1573) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContaine r.java:1561) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerConta iner.java:1552) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContain er.java:1496) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContain er.java:968) [spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer .java:914) [spring-rabbit-2.39..jar:2.39.]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMe ssageListenerContainer.java:1289) [spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessage ListenerContainer.java:1195) [spring-rabbit-2.39..jar:2.39.]
	at java.lang.Thread.run(Thread.java:748) [na:1.8. 0 _161]
Caused by: java.lang.NullPointerException: null
	at com.boot.example.HelloConsumer.helloConsumer(HelloConsumer.java:27) ~[classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8. 0 _161]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8. 0 _161]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8. 0 _161]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8. 0 _161]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.38..jar:5.38.]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.38..jar:5.38.]
	at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:164) ~[spring-rabbit-2.39..jar:2.39.]
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:81) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerA dapter.java:244) ~[spring-rabbit-2.39..jar:2.39.]...13Common frames omitted [Receive message] : Hello rabbitMQ2021-11-18 20:12:49.813  WARN 70173 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.boot.example.HelloConsumer.helloConsumer(java.lang.String) throws java.io.IOException'threw exception at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerA dapter.java:252) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(Messaging MessageListenerAdapter.java:194) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapt er.java:137) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContai ner.java:1654) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerCo ntainer.java:1573) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContaine r.java:1561) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerConta iner.java:1552) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContain er.java:1496) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContain er.java:968) [spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer .java:914) [spring-rabbit-2.39..jar:2.39.]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMe ssageListenerContainer.java:1289) [spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessage ListenerContainer.java:1195) [spring-rabbit-2.39..jar:2.39.]
	at java.lang.Thread.run(Thread.java:748) [na:1.8. 0 _161]
Caused by: java.lang.NullPointerException: null
	at com.boot.example.HelloConsumer.helloConsumer(HelloConsumer.java:27) ~[classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8. 0 _161]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8. 0 _161]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8. 0 _161]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8. 0 _161]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.38..jar:5.38.]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.38..jar:5.38.]
	at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:164) ~[spring-rabbit-2.39..jar:2.39.]
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:81) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerA dapter.java:244) ~[spring-rabbit-2.39..jar:2.39.]...13Common frames omitted [Receive message] : Hello rabbitMQ2021-11-18 20:12:49.818  WARN 70173 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.boot.example.HelloConsumer.helloConsumer(java.lang.String) throws java.io.IOException'threw exception at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerA dapter.java:252) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(Messaging MessageListenerAdapter.java:194) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapt er.java:137) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContai ner.java:1654) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerCo ntainer.java:1573) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContaine r.java:1561) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerConta iner.java:1552) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContain er.java:1496) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContain er.java:968) [spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer .java:914) [spring-rabbit-2.39..jar:2.39.]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMe ssageListenerContainer.java:1289) [spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessage ListenerContainer.java:1195) [spring-rabbit-2.39..jar:2.39.]
	at java.lang.Thread.run(Thread.java:748) [na:1.8. 0 _161]
Caused by: java.lang.NullPointerException: null
	at com.boot.example.HelloConsumer.helloConsumer(HelloConsumer.java:27) ~[classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8. 0 _161]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8. 0 _161]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8. 0 _161]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8. 0 _161]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.38..jar:5.38.]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.38..jar:5.38.]
	at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:164) ~[spring-rabbit-2.39..jar:2.39.]
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:81) ~[spring-rabbit-2.39..jar:2.39.] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerA dapter.java:244) ~[spring-rabbit-2.39..jar:2.39.]...13Common frames omitted [Receive message] : hello rabbitMQ Executes the database logicCopy the code

From the execution results, you can see that although there are occasional exceptions, the final consistency of business data is achieved through retry

The final consistency is achieved by removing the try catch block, so are there other problems with this scheme?

2.9 Problems Caused by Retry

If you just trust your producer and don’t validate the message body content, your business will perform fine in most scenarios. But you always have received the message body let you surprise, your business will throw an exception at this time, an exception is thrown, framework, automatically nack, the message is new in the queue, consumers continue to consume the message, and then put in the message queue and so on, not only consumes server resources, you may also affect the other services, such as log service (because you called the log, The log volume will increase).

Therefore, when using retry to achieve final consistency, you must be careful to use the message retry mechanism to consider that exception scenarios (such as network jitter, interface exception) can be fixed automatically by retry.

2.10 Database Solution

As long as you are doing business logic, there will always be exceptions that you don’t expect to happen, and too much to think about can become a burden. If you want to maximize ultimate consistency, there’s only one thing your consumers need to do: store messages in the database and consume them asynchronously. The probability of database failure is very small. Even if an exception occurs, the message can be written to the database by retry, which can basically meet the needs of your business.