1. Previous posts

RabbitMQ installation and Configuration and Hello World example

RabbitMQ User Management, role management and permission Settings

Shortly after these two posts were published, an attentive reader commented that the queues that had been created and the messages that had been sent were lost if the RabbitMQ service was restarted before the consumer application had been started.

This leads to a very important question that often comes up in interviews: when using RabbitMQ, how do you ensure that messages are not lost and are consumed correctly?

2. Summary of this article

RabbitMQ provides the following mechanisms to solve this problem:

  1. Producer confirmation
  2. Persistence
  3. Manual ack

In this post, we explain the producer confirmation mechanism first; the remaining mechanisms will be covered in a separate post.

3. Producer confirmation

To ensure that messages are not lost, we must first ensure that the producer can successfully send messages to the RabbitMQ server.

But in the previous example, did the message actually reach the server after the producer sent it? Without special configuration, publishing a message returns nothing to the producer, so by default the producer does not know whether the message reached the server.

The void return type of the basicPublish method shows the same thing:

public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
    this.basicPublish(exchange, routingKey, false, props, body);
}

To make this easier to see, comment out the line channel.queueDeclare(QUEUE_NAME, false, false, false, null); in the Producer class from the previous post:

package com.zwwhnly.springbootaction.rabbitmq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create a connection
        ConnectionFactory factory = new ConnectionFactory();
        // Set the hostname of RabbitMQ
        factory.setHost("localhost");
        // Create a connection
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Specify a queue. If no queue exists, it will be created automatically
        //channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Send a message
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}

Now run the code. Because the queue does not exist, there is nowhere to store the message, yet the program does not report an error: the message is lost and we never find out.

RabbitMQ provides two solutions to this problem:

  1. The transaction mechanism
  2. The publisher confirm mechanism

4. Transaction mechanism

The RabbitMQ client has three methods related to the transaction mechanism:

  1. channel.txSelect: sets the current channel to transaction mode
  2. channel.txCommit: commits the transaction
  3. channel.txRollback: rolls back the transaction

Create a new class, TransactionProducer, with the following code:

package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TransactionProducer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create a connection
        ConnectionFactory factory = new ConnectionFactory();
        // Set the hostname of RabbitMQ
        factory.setHost("localhost");
        // Create a connection
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Specify a queue. If no queue exists, it will be created automatically
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.txSelect();

        // Send a message
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

        channel.txCommit();
        System.out.println(" [x] Sent '" + message + "'");

        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}

Run the code: the queue is created and the message is sent successfully.

Now modify the code slightly to see how the transaction is rolled back when an exception occurs:

try {
    channel.txSelect();

    // Send a message
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

    int result = 1 / 0;

    channel.txCommit();
    System.out.println(" [x] Sent '" + message + "'");
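} catch (Exception e) {
    // Catch Exception rather than IOException so that the ArithmeticException also triggers the rollback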
    e.printStackTrace();
    channel.txRollback();
}

Because int result = 1 / 0; throws a java.lang.ArithmeticException, the transaction is rolled back and the message is not sent.

To send multiple messages, put the channel.basicPublish and channel.txCommit calls inside a loop, as shown below:

channel.txSelect();
int loopTimes = 10;

for (int i = 0; i < loopTimes; i++) {
    try {
        // Send a message
        String message = "Hello World!" + i;
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

        channel.txCommit();
        System.out.println(" [x] Sent '" + message + "'");
    } catch (IOException e) {
        e.printStackTrace();
        channel.txRollback();
    }
}

Transactions do solve the problem of message acknowledgement between the sender and RabbitMQ: the transaction commits only if RabbitMQ has received the message successfully; otherwise the exception can be caught and the transaction rolled back. However, using transactions severely reduces RabbitMQ's throughput, so the sender confirmation mechanism described below is recommended instead.
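The commit and rollback behaviour described above can be sketched without a broker. The following is a toy in-memory model, not the RabbitMQ client API: the class ToyTxChannel, its methods, and the simulateFailure helper are hypothetical names introduced only for illustration, and the model assumes that published messages stay invisible until commit is called.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a transactional channel; NOT the RabbitMQ client API.
final class ToyTxChannel {
    private final List<String> pending = new ArrayList<>();   // published, not yet committed
    private final List<String> delivered = new ArrayList<>(); // visible after commit

    void publish(String message) {
        pending.add(message);
    }

    // Commit makes every pending message visible at once.
    void commit() {
        delivered.addAll(pending);
        pending.clear();
    }

    // Rollback discards every pending message.
    void rollback() {
        pending.clear();
    }

    List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    // Hypothetical stand-in for any failure between publish and commit.
    static void simulateFailure() {
        throw new IllegalStateException("simulated failure");
    }

    public static void main(String[] args) {
        ToyTxChannel channel = new ToyTxChannel();
        channel.publish("first");
        channel.commit();
        try {
            channel.publish("second");
            simulateFailure();
            channel.commit(); // never reached
        } catch (RuntimeException e) {
            channel.rollback();
        }
        System.out.println(channel.delivered()); // prints [first]
    }
}
```

Only the committed message survives; the second one is discarded by the rollback, which mirrors the behaviour of the try/catch example above.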

5. Sender confirmation mechanism

Sender confirmation means that the producer puts the channel into confirm mode. Once the channel is in confirm mode, every message published on it is assigned a unique ID (starting from 1). Once the message has been delivered to the RabbitMQ server, RabbitMQ sends the producer an acknowledgement (basic.ack) containing the unique ID of the message, which lets the producer know that the message has arrived correctly.

If RabbitMQ loses a message because of an internal error, it sends a nack (basic.nack) instead, which the producer application can also handle in a callback method.

If the message and queue are persistent, the acknowledgement message is sent after the message is written to disk.

Transactions block the sender after a message is sent, waiting for a response from RabbitMQ before sending the next message.

In contrast, the biggest benefit of sender confirmation is that it is asynchronous: once a message has been published, the producer application can continue to send the next message while waiting for the channel to return an acknowledgement. When the message is finally acknowledged, the producer application can process the acknowledgement through a callback method.

5.1 Ordinary confirm

Create a new NormalConfirmProducer class with the following code:

package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class NormalConfirmProducer {
    private final static String EXCHANGE_NAME = "normal-confirm-exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create a connection
        ConnectionFactory factory = new ConnectionFactory();
        // Set the hostname of RabbitMQ
        factory.setHost("localhost");
        // Create a connection
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Create an Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");


        try {
            channel.confirmSelect();
            // Send a message
            String message = "normal confirm test";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            if (channel.waitForConfirms()) {
                System.out.println("send message success");
            } else {
                System.out.println("send message failed");
                // do something else...
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}

channel.confirmSelect(); puts the channel into confirm mode.

channel.waitForConfirms(); waits for the broker's confirmation of the published messages and returns true if they were acknowledged, or false if any of them was nacked.

To send multiple messages, put the channel.basicPublish and channel.waitForConfirms calls inside a loop, as shown below:

channel.confirmSelect();
int loopTimes = 10;

for (int i = 0; i < loopTimes; i++) {
    try {
        // Send a message
        String message = "normal confirm test" + i;
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        if (channel.waitForConfirms()) {
            System.out.println("send message success");
        } else {
            System.out.println("send message failed");
            // do something else...
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Running results:

send message success
send message success
send message success
send message success
send message success
send message success
send message success
send message success
send message success
send message success

If confirm mode has not been enabled on the channel, calling channel.waitForConfirms() throws an IllegalStateException.

Notes:

1) Transactions and Publisher Confirm are mutually exclusive and cannot coexist.

RabbitMQ reports an error if you try to switch a channel to publisher confirm mode after transaction mode has been enabled on it:

channel.txSelect();
channel.confirmSelect();

RabbitMQ also reports an error if you try to switch a channel that already has publisher confirm enabled to transaction mode:

channel.confirmSelect();
channel.txSelect();

2) Transactions and publisher confirm only ensure that messages are sent correctly to RabbitMQ, where "sent to RabbitMQ" means that the message reached the target exchange. If the exchange has no matching queue, the message is still lost. So when using either mechanism, make sure that the exchange involved has a matching queue.

For example, a message sent by the NormalConfirmProducer class above is sent to the normal-confirm-exchange exchange, but that exchange is not bound to any queue, so from a business perspective the message is still lost.

3) In ordinary confirm mode, channel.waitForConfirms() is called after each message is sent, and the producer then waits for the server's confirmation. This is a serial, synchronous wait, so the performance gain over the transaction mechanism is small.
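Note 2) can be illustrated without a broker. The sketch below is a toy model of a direct exchange, not the RabbitMQ client API: ToyDirectExchange, bind, route, and the queue name normal-confirm-queue are hypothetical names chosen for illustration. It assumes the simplified rule that a message goes to every queue bound with exactly the same routing key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a direct exchange; NOT the RabbitMQ client API.
final class ToyDirectExchange {
    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queue);
    }

    // Returns the queues that receive the message; an empty list means it is dropped.
    List<String> route(String routingKey) {
        return new ArrayList<>(bindings.getOrDefault(routingKey, List.of()));
    }

    public static void main(String[] args) {
        ToyDirectExchange exchange = new ToyDirectExchange();
        // No binding yet: the exchange accepts the message but has nowhere to put it.
        System.out.println(exchange.route("")); // prints []
        exchange.bind("normal-confirm-queue", "");
        System.out.println(exchange.route("")); // prints [normal-confirm-queue]
    }
}
```

The first call corresponds to the NormalConfirmProducer situation: the exchange exists and accepts the message, but with no binding there is no queue to hold it.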

5.2 Batch confirm

Batch confirm mode calls channel.waitForConfirms() only after a batch of messages has been sent, and then waits for the server's confirmation. It therefore performs better than the ordinary confirm mode of 5.1.

The downside is that if a basic.nack is returned or a timeout occurs, the producer client has to resend the entire batch, which produces a significant number of duplicate messages. If messages are lost frequently, batch confirm performance gets worse rather than better.

Code example:

package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class BatchConfirmProducer {
    private final static String EXCHANGE_NAME = "batch-confirm-exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create a connection
        ConnectionFactory factory = new ConnectionFactory();
        // Set the hostname of RabbitMQ
        factory.setHost("localhost");
        // Create a connection
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Create an Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        int batchCount = 100;
        int msgCount = 0;
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(batchCount);
        try {
            channel.confirmSelect();
            // Use < rather than <= so that exactly batchCount messages are sent
            while (msgCount < batchCount) {
                String message = "batch confirm test";
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
                // Store outgoing messages in a cache, such as an ArrayList or BlockingQueue
                blockingQueue.add(message);
                if (++msgCount >= batchCount) {
                    try {
                        if (channel.waitForConfirms()) {
                            // Clear the cache of messages
                            blockingQueue.clear();
                        } else {
                            // Resend cached messages
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Resend cached messages
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}
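The cost of resending a whole batch, mentioned at the start of this section, can be made concrete with a broker-free sketch. Everything here is an assumption for illustration: BatchResendSketch and sendAll are hypothetical names, and the Predicate stands in for "publish every cached message, then call waitForConfirms()". The retry loop is unbounded to keep the sketch short; a real producer would presumably cap the number of retries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Broker-free sketch of batch confirm with resend; the Predicate is a hypothetical
// stand-in for "publish the cached batch, then waitForConfirms()".
final class BatchResendSketch {
    // Sends messages in batches and returns how many messages were transmitted in total.
    static int sendAll(List<String> messages, int batchSize,
                       Predicate<List<String>> publishAndConfirm) {
        int transmitted = 0;
        List<String> cache = new ArrayList<>();
        for (String message : messages) {
            cache.add(message);
            if (cache.size() == batchSize) {
                transmitted += flush(cache, publishAndConfirm);
            }
        }
        if (!cache.isEmpty()) {
            transmitted += flush(cache, publishAndConfirm); // final partial batch
        }
        return transmitted;
    }

    // Retries the same batch until it is confirmed; each retry resends every cached message.
    private static int flush(List<String> cache, Predicate<List<String>> publishAndConfirm) {
        int transmitted = 0;
        boolean confirmed;
        do {
            confirmed = publishAndConfirm.test(cache);
            transmitted += cache.size();
        } while (!confirmed);
        cache.clear();
        return transmitted;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("batch confirm test " + i);
        }
        int[] attempts = {0};
        // Simulated broker: rejects only the very first attempt.
        int transmitted = sendAll(messages, 5, batch -> ++attempts[0] > 1);
        System.out.println(transmitted); // prints 15
    }
}
```

Ten messages cost fifteen transmissions because a single failed confirmation forces all five messages of that batch to be sent again, even though some of them may already have arrived.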

5.3 Asynchronous confirm

In asynchronous confirm mode, the producer client registers a ConfirmListener callback on the channel and implements the interface's handleAck() and handleNack() methods to handle the basic.ack and basic.nack sent back by RabbitMQ, respectively.

Both methods take two parameters. The first, deliveryTag, is the unique sequence number of the message. The second, multiple, indicates whether the confirmation covers more than one message: true means that all messages up to and including deliveryTag are confirmed, and false means that only the message with that deliveryTag is confirmed.

Sample code:

package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

public class AsyncConfirmProducer {

    private final static String EXCHANGE_NAME = "async-confirm-exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the hostname of RabbitMQ
        factory.setHost("localhost");
        // Create a connection
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Create an Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        int batchCount = 100;
        long msgCount = 1;
        // The listener runs on a client thread, so the set must be thread-safe
        SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Ack,SeqNo: " + deliveryTag + ",multiple: " + multiple);
                if (multiple) {
                    // headSet is exclusive, so use deliveryTag + 1 to include deliveryTag itself
                    confirmSet.headSet(deliveryTag + 1).clear();
                } else {
                    confirmSet.remove(deliveryTag);
                }
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Nack,SeqNo: " + deliveryTag + ",multiple: " + multiple);
                if (multiple) {
                    confirmSet.headSet(deliveryTag + 1).clear();
                } else {
                    confirmSet.remove(deliveryTag);
                }
                // Note: a real application needs to handle message retransmission here
            }
        });

        // Demo sends 100 messages
        while (msgCount <= batchCount) {
            long nextSeqNo = channel.getNextPublishSeqNo();
            // Record the sequence number before publishing so that an early ack is not missed
            confirmSet.add(nextSeqNo);
            channel.basicPublish(EXCHANGE_NAME, "", null, "async confirm test".getBytes());
            msgCount++;
        }

        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}

Running results:

Ack,SeqNo: 1,multiple: false
Ack,SeqNo: 2,multiple: false
Ack,SeqNo: 3,multiple: false
Ack,SeqNo: 4,multiple: false
Ack,SeqNo: 5,multiple: false
Ack,SeqNo: 6,multiple: false
Ack,SeqNo: 7,multiple: false
Ack,SeqNo: 8,multiple: false
Ack,SeqNo: 9,multiple: false
Ack,SeqNo: 10,multiple: false

Note: RabbitMQ does not use a fixed batch size for the acks it sends back to the producer. A single ack may confirm one message or, with multiple set to true, several messages at once.
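Because one ack may cover a variable number of messages, the producer's bookkeeping has to handle both cases. The following broker-free sketch shows one possible way to do that with a sorted concurrent map; it is an illustration under stated assumptions, not the code from the sample project. OutstandingConfirms and its method names are hypothetical, and the calls to acked stand in for what a ConfirmListener would receive.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Tracks unconfirmed messages by publish sequence number; broker-free sketch.
final class OutstandingConfirms {
    // sequence number -> message body, kept sorted so ranges can be cleared at once
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    // Call this when a message is published with the given sequence number.
    void published(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    // multiple == true covers every sequence number up to and including deliveryTag.
    void acked(long deliveryTag, boolean multiple) {
        if (multiple) {
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    int pending() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        OutstandingConfirms confirms = new OutstandingConfirms();
        for (long seqNo = 1; seqNo <= 5; seqNo++) {
            confirms.published(seqNo, "async confirm test " + seqNo);
        }
        confirms.acked(3, true);  // confirms 1, 2 and 3 in one go
        System.out.println(confirms.pending()); // prints 2
        confirms.acked(5, false); // confirms only 5; 4 is still outstanding
        System.out.println(confirms.pending()); // prints 1
    }
}
```

Keeping the message body next to its sequence number means that whatever is still in the map after a nack or a timeout is exactly what would need to be resent.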

6. Performance comparison

So far we have seen four modes that implement producer confirmation: transaction, ordinary confirm, batch confirm, and asynchronous confirm. Let's compare their performance by simply changing the number of messages sent in the sample code above to, for example, 10000.

Sending 10000 messages with the transaction mechanism took: 2103

Sending 10000 messages with the ordinary confirm mechanism took: 1483

Sending 10000 messages with the batch confirm mechanism took: 281

Sending 10000 messages with the asynchronous confirm mechanism took: 214

As you can see, the transaction mechanism is the slowest and the ordinary confirm mechanism improves on it only modestly, while batch confirm and asynchronous confirm perform best. Choose the mechanism that suits your needs.
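One hedged way to read this ranking is to count how often the producer blocks on the broker in each of the sample loops above. The sketch below only counts blocking calls as they appear in those samples (one txCommit or waitForConfirms per message, one waitForConfirms per batch, none in the asynchronous version); it does not predict timings, and it does not explain why transactions are slower than ordinary confirm even though both block once per message. BlockingWaits and Mode are hypothetical names.

```java
// Counts the blocking broker round trips in each sample loop; a rough model, not a benchmark.
final class BlockingWaits {
    enum Mode { TRANSACTION, ORDINARY_CONFIRM, BATCH_CONFIRM, ASYNC_CONFIRM }

    static long count(Mode mode, long messages, long batchSize) {
        switch (mode) {
            case TRANSACTION:      // one txCommit per message
            case ORDINARY_CONFIRM: // one waitForConfirms per message
                return messages;
            case BATCH_CONFIRM:    // one waitForConfirms per batch (ceiling division)
                return (messages + batchSize - 1) / batchSize;
            case ASYNC_CONFIRM:    // confirmations arrive in a callback, no blocking call
                return 0;
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + ": " + count(mode, 10000, 100));
        }
        // prints:
        // TRANSACTION: 10000
        // ORDINARY_CONFIRM: 10000
        // BATCH_CONFIRM: 100
        // ASYNC_CONFIRM: 0
    }
}
```

With a batch size of 100, the batch version blocks 100 times instead of 10000, which is consistent with the large gap between the first two and the last two measurements.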

7. Source code and reference

Source code: github.com/zwwhnly/spr… Feel free to download it.

RabbitMQ Combat Guide by Zhu Zhonghua