In the previous article we looked at RabbitMQ high availability from the sending side, namely how to make sure a message is sent successfully. By combining various configurations we can guarantee that the message is eventually sent, although in some extreme cases the same message may be sent more than once. Either way, the message does get sent. If you haven't read the previous article yet, I recommend reading it first and then coming back to this one:

  • Four strategies to ensure RabbitMQ message delivery reliability! Which one do you use?

Today we are going to talk about message consumption and how to ensure that message consumption is successful and idempotent.

1. Two ways of consumption

Message consumption for RabbitMQ generally has two different approaches:

  • Push: MQ actively pushes messages to the consumer. The consumer has to set up a buffer to cache messages, so it always has a batch of messages waiting in memory. This makes push the more efficient of the two, and it is the consumption method most applications use today.
  • Pull: the consumer actively pulls messages from MQ. This is less efficient, but it can be useful on occasion, for example when messages need to be pulled in batches.

Let me give you an example of both ways.

Let’s start with push:

A common way is to annotate the consumer method with @RabbitListener, as follows:

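import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerDemo {
    @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
    public void handle(String msg) {
        System.out.println("msg = " + msg);
    }
}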

This method is triggered whenever a message arrives in the queue being listened to.

Now pull:

@Test
public void test01() throws UnsupportedEncodingException {
    Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME);
    System.out.println("o = " + new String(((byte[]) o), "UTF-8"));
}

The receiveAndConvert method is called with the queue name as its parameter. When it runs, it pulls one message from MQ; if it returns null, there is no message on the queue. The method also has an overload that accepts a wait timeout, for example 3 seconds. In that case, if the queue is empty, receiveAndConvert blocks for up to 3 seconds: if a new message arrives within that time it is returned, and if there is still no message after 3 seconds the method returns null. If no timeout is given it defaults to 0, which means the method returns immediately.
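The timeout behaviour described above can be pictured with a plain-JDK analogy. This is only a sketch and not Spring AMQP code: a BlockingQueue stands in for the RabbitMQ queue, and the class name PullTimeoutSketch and the helper receive are made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogy, not Spring AMQP code: a BlockingQueue stands in for the RabbitMQ queue.
class PullTimeoutSketch {

    // Hypothetical helper: returns the next message, or null if none arrives within timeoutMillis.
    static String receive(BlockingQueue<String> queue, long timeoutMillis) {
        if (timeoutMillis == 0) {
            return queue.poll(); // timeout 0: do not wait at all
        }
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println(receive(queue, 0));    // empty queue, returns at once
        queue.add("hello");
        System.out.println(receive(queue, 3000)); // message is already there, no waiting
    }
}
```

In both variants the caller has to handle a null result, which is how an empty queue is reported.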

These are the two different message consumption patterns.

If you need to get messages from the queue continuously, use push; if you only need to consume a single message, use pull. Do not put pull calls into an endless loop, as that can seriously hurt RabbitMQ performance.

2. Two ideas to ensure the success of consumption

In the last article, we tried our best to ensure that messages can be sent successfully. In fact, there are official mechanisms for successful message consumption. Let’s take a look.

To ensure that messages reach message consumers reliably, RabbitMQ provides message consumption confirmation. When a consumer consumes a message, an autoAck parameter can be specified to indicate how the message consumption is acknowledged.

  • When autoAck is false, RabbitMQ will not remove the message immediately, even if the consumer has already received it, but will wait for an explicit acknowledgement from the consumer before marking the message for deletion and then deleting it.
  • When autoAck is true, RabbitMQ automatically treats a message as acknowledged as soon as it has been sent and then removes it (from memory or disk), regardless of whether it actually reached the consumer.

Let’s look at a picture:

On the RabbitMQ Web management page, as shown above:

  • Ready indicates the number of messages to be consumed.
  • Unacked indicates the number of messages that have been sent to a consumer but have not yet received an ACK from the consumer.

This lets us observe message consumption and acknowledgement at the UI level.

When autoAck is set to false, the messages in a RabbitMQ queue fall into two groups:

  • Messages waiting to be delivered to a consumer
  • Messages that have been delivered to a consumer but not yet acknowledged by it

In other words, when autoAck is set to false the consumer can take as long as it needs to process a message. Once the message has been processed properly, the consumer manually acks it, and only then does RabbitMQ consider the consumption successful. If RabbitMQ has not received an ack and the consumer that holds the message disconnects, RabbitMQ puts the message back into the queue so that it can be consumed again.
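The bookkeeping behind Ready and Unacked can be sketched with a toy in-memory model. This is not RabbitMQ code; the class AckModelSketch and all of its methods are hypothetical, and requeued messages are simply put back at the head of the queue as a simplification.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model (not RabbitMQ): Ready vs Unacked bookkeeping when autoAck is false.
class AckModelSketch {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextTag = 1;

    void publish(String msg) {
        ready.addLast(msg);
    }

    // Deliver one message: it moves from Ready to Unacked. Returns its delivery tag, or -1 if empty.
    long deliver() {
        String msg = ready.pollFirst();
        if (msg == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, msg);
        return tag;
    }

    // Explicit ack from the consumer: only now is the message really removed.
    void ack(long tag) {
        unacked.remove(tag);
    }

    // The consumer disconnected without acking: its messages go back to Ready, oldest first.
    void consumerDisconnected() {
        List<String> pending = new ArrayList<>(unacked.values());
        Collections.reverse(pending);
        for (String msg : pending) {
            ready.addFirst(msg);
        }
        unacked.clear();
    }

    int readyCount() { return ready.size(); }

    int unackedCount() { return unacked.size(); }

    public static void main(String[] args) {
        AckModelSketch queue = new AckModelSketch();
        queue.publish("a");
        queue.publish("b");
        long tag = queue.deliver();
        System.out.println("ready=" + queue.readyCount() + " unacked=" + queue.unackedCount());
        queue.ack(tag);
        queue.deliver();
        queue.consumerDisconnected();
        System.out.println("ready=" + queue.readyCount() + " unacked=" + queue.unackedCount());
    }
}
```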

To sum up, ensuring that messages are successfully consumed is nothing more than manual Ack or automatic Ack. Of course, either way, messages can end up being consumed repeatedly, so in general we also need to address idempotent issues when dealing with messages.

3. Message rejection

When a client receives a message, it can choose to consume the message or to reject it. Let's look at how to reject a message:

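import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class ConsumerDemo {
    @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
    public void handle(Channel channel, Message message) {
        // Get the message number
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // Reject the message and put it back into the queue
            channel.basicReject(deliveryTag, true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}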

After receiving a message, the consumer can refuse to consume it. Rejecting takes two steps:

  1. Get message number deliveryTag.
  2. Call the basicReject method to reject the message.

When the basicReject method is called, the second argument is requeue, that is, whether the message goes back into the queue. If it is true, the rejected message is re-queued and will be delivered for consumption again. If it is false, the rejected message is discarded and will not be delivered to any consumer.

Note that the basicReject method can only reject one message at a time.

4. Confirm the message

Message confirmation can be divided into automatic confirmation and manual confirmation. Let’s look at them separately.

4.1 Automatic Confirmation

Let’s start with automatic confirmation, which is the default for message consumption in Spring Boot.

Let’s look at the following message consumption method:

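import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerDemo {
    @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
    public void handle2(String msg) {
        System.out.println("msg = " + msg);
        // Deliberately throw an exception to simulate a failed consumption
        int i = 1 / 0;
    }
}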

The @Component annotation registers the class in the Spring container, and the @RabbitListener annotation marks a message consuming method. By default, the consuming method behaves as if it ran in its own transaction: if the method throws an exception during execution, the message is returned to the queue and will be consumed again; if the method completes without an exception, the message counts as consumed.
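A rough plain-JDK model of this default behaviour follows. It assumes the listener container acks when the method returns normally and requeues when it throws; dispatch is a hypothetical helper and not a Spring API.

```java
import java.util.function.Consumer;

// Toy model of automatic confirmation; not Spring AMQP code.
class AutoAckSketch {

    // Hypothetical stand-in for the listener container: decides the outcome from how the listener ends.
    static String dispatch(Consumer<String> listener, String msg) {
        try {
            listener.accept(msg);
            return "ack";      // normal return: the message counts as consumed
        } catch (RuntimeException e) {
            return "requeue";  // exception: the message goes back to the queue
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(m -> System.out.println("msg = " + m), "hello"));
        System.out.println(dispatch(m -> {
            int zero = 0;
            int i = 1 / zero; // throws ArithmeticException, like handle2
        }, "hello"));
    }
}
```

Note that a listener which always throws will see the same message again and again, so a failing handler should not rely on requeueing alone.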

4.2 Manual Confirmation

I divide manual confirmation into two kinds: manual confirmation in push mode and manual confirmation in pull mode.

4.2.1 Manual confirmation in Push Mode

To enable manual confirmation, we need to first turn off automatic confirmation, which is as follows:

spring.rabbitmq.listener.simple.acknowledge-mode=manual

This configuration changes the message confirmation mode to manual confirmation.

Let’s look at the code in the consumer:

@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
public void handle3(Message message, Channel channel) {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        // The message consumption code is written here
        String s = new String(message.getBody());
        System.out.println("s = " + s);
        // After the consumption is complete, manually ack
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        // nack manually
        try {
            channel.basicNack(deliveryTag, false, true);
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

Put the consumption logic inside a try..catch block.

If the message is successfully consumed normally, basicAck is executed to complete the confirmation.

If message consumption fails, the basicNack method is executed to tell RabbitMQ that message consumption failed.

There are two methods involved:

  • basicAck: this method manually confirms that the message has been successfully consumed. It takes two parameters: the first is the id of the message (the delivery tag); the second is multiple. If multiple is false, only the current message is confirmed; if it is true, all messages up to and including the current one that have not yet been confirmed by the current consumer are confirmed.
  • basicNack: this tells RabbitMQ that the current message has not been successfully consumed. It takes three parameters: the first is the id of the message; the second is multiple. If multiple is false, only the current message is rejected; if it is true, all messages up to and including the current one that have not yet been confirmed by the current consumer are rejected. The third parameter, requeue, means the same as described earlier: whether the rejected message is put back into the queue.
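The effect of the multiple flag can be illustrated with a small model of the delivery tags outstanding on one channel. This is a sketch, not the RabbitMQ client: MultipleFlagSketch, delivered and settle are hypothetical names, and settle covers what basicAck and basicNack have in common, namely which tags they apply to.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model of unconfirmed delivery tags on a single channel; not the RabbitMQ client.
class MultipleFlagSketch {
    private final TreeMap<Long, String> unacked = new TreeMap<>();

    void delivered(long deliveryTag, String body) {
        unacked.put(deliveryTag, body);
    }

    // multiple=false settles only this tag; multiple=true settles every outstanding tag <= this one.
    // Returns how many messages were settled.
    int settle(long deliveryTag, boolean multiple) {
        if (!multiple) {
            return unacked.remove(deliveryTag) != null ? 1 : 0;
        }
        NavigableMap<Long, String> upTo = unacked.headMap(deliveryTag, true);
        int settled = upTo.size();
        upTo.clear(); // clearing the view removes the entries from the backing map
        return settled;
    }

    int unackedCount() { return unacked.size(); }

    public static void main(String[] args) {
        MultipleFlagSketch channel = new MultipleFlagSketch();
        for (long tag = 1; tag <= 4; tag++) {
            channel.delivered(tag, "msg-" + tag);
        }
        System.out.println("settled " + channel.settle(2, false)); // only tag 2
        System.out.println("settled " + channel.settle(3, true));  // tags 1 and 3
        System.out.println("still unacked: " + channel.unackedCount());
    }
}
```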

When the last parameter of basicNack is set to false, dead letter queues also come into play, which Songo will discuss in a future article.

4.2.2 Manual confirmation in Pull Mode

Manual ack in pull mode is a bit more cumbersome. The pull methods wrapped by Spring’s RabbitTemplate do not offer a way to do it, so we need to use the native client methods, as follows:

public void receive2() {
    // Non-transactional channel, so that basicAck takes effect without a txCommit
    Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);
    long deliveryTag = 0L;
    try {
        // autoAck = false: the message stays unacked until we confirm it
        GetResponse getResponse = channel.basicGet(RabbitConfig.JAVABOY_QUEUE_NAME, false);
        if (getResponse == null) {
            // The queue is empty, nothing to confirm
            return;
        }
        deliveryTag = getResponse.getEnvelope().getDeliveryTag();
        System.out.println("o = " + new String(getResponse.getBody(), "UTF-8"));
        channel.basicAck(deliveryTag, false);
    } catch (IOException e) {
        try {
            channel.basicNack(deliveryTag, false, true);
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

The basicAck and basicNack methods involved here are the same as the previous ones, so I won’t repeat them.

5. Idempotence problem

Finally, the idempotence of messages.

Consider the following scenario:

After consuming a message, the consumer sends an ack to RabbitMQ. If the ack is lost because of a network disconnect or some other reason, RabbitMQ never receives it and therefore does not delete the message. When the connection is re-established, the consumer receives the message again, so the message is consumed twice. The same message may also be sent twice for similar reasons (see Four strategies to ensure RabbitMQ message delivery reliability! Which one do you use?). For all these reasons, it is important to deal with idempotence when consuming messages.

Idempotence is not hard to handle. It is mostly dealt with at the business level, so let me give a general idea.

Use Redis: before a consumer processes a message, it first writes the id of the message into Redis, with one of two values:

  • id-0 (the business logic is being executed)
  • id-1 (the business logic has been executed successfully)

If the ack fails and RabbitMQ hands the message to another consumer, that consumer first executes setnx. If the key already exists (which means the message has been picked up before), it reads the value of the key: if the value is 0, the current consumer does nothing; if it is 1, it simply acks the message.

Extreme case: the first consumer gets stuck (for example in a deadlock) while executing the business logic, so the key stays at 0. To cover this, set an expiry time on the key on top of setnx. On the producer side, specify a messageId when sending messages.
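The idea can be sketched in plain Java. This is only an illustration under stated assumptions: a ConcurrentMap stands in for Redis, putIfAbsent plays the role of setnx, key expiry is not modelled, and IdempotentConsumerSketch with its Outcome values are made-up names rather than anything from the VHR project.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch only: a ConcurrentMap stands in for Redis, putIfAbsent for SETNX. Key expiry is not modelled.
class IdempotentConsumerSketch {
    enum Outcome { PROCESSED, SKIPPED, ACK_ONLY }

    private final ConcurrentMap<String, String> store;
    private int businessRuns = 0;

    IdempotentConsumerSketch(ConcurrentMap<String, String> store) {
        this.store = store;
    }

    Outcome onMessage(String messageId) {
        // "0" = business logic in progress; only the first caller manages to set it
        String existing = store.putIfAbsent(messageId, "0");
        if (existing == null) {
            businessRuns++;            // placeholder for the real business logic
            store.put(messageId, "1"); // "1" = business logic finished
            return Outcome.PROCESSED;  // the caller acks after this
        }
        // Key already present: "1" means done (just ack), "0" means someone is still working on it
        return "1".equals(existing) ? Outcome.ACK_ONLY : Outcome.SKIPPED;
    }

    int businessRuns() { return businessRuns; }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch(new ConcurrentHashMap<>());
        System.out.println(consumer.onMessage("msg-1")); // first delivery
        System.out.println(consumer.onMessage("msg-1")); // redelivery of the same message
        System.out.println("business logic ran " + consumer.businessRuns() + " time(s)");
    }
}
```

With a real Redis the in-progress marker would also need an expiry, otherwise a consumer that dies while the value is 0 blocks the message forever.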

Of course, this is just a simple idea for your reference.

Songo also dealt with message idempotence in the VHR project. If you are interested, you can check the VHR source code (github.com/lenve/vhr). The code is in mailServer.

6. Summary

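That is all for today on message consumption in RabbitMQ: the two consumption modes, message rejection, automatic and manual confirmation, and how to handle idempotence.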

To download the sample for this article, copy the article title and send it as a reply to the official account.