1. Analysis of reliable message delivery

When a message is sent through RabbitMQ, it passes through four stages, and each one must be made reliable for delivery as a whole to be reliable:

  1. The Producer sends messages to the Broker.
  2. Messages are routed from an Exchange to a Queue.
  3. Messages are stored persistently in queues.
  4. Consumers subscribe to and consume messages.

We will analyze these four stages one by one.

1.1. Producers send messages to brokers

In the process of sending a message to the Broker, if the Broker fails to receive the message due to network or disk problems, how does the Producer know whether the message was sent to the Broker?

RabbitMQ provides acknowledgement mechanisms through which the Broker tells the Producer that a message has been received successfully. There are two kinds:

  • Transaction mode
  • Confirm mode

1.1.1 Transaction mode

A transaction here works on the same principle as the transactions we are already familiar with: if sending the message fails, the message is rolled back.

Capturing packets with Wireshark shows how the Producer and the Broker interact in transaction mode: the Producer sends Tx.Select and the Broker replies Tx.Select-Ok, the Producer publishes with Basic.Publish, and finally the Producer sends Tx.Commit and the Broker replies Tx.Commit-Ok.

The Java API is as follows:

Code sample

try {
    channel.txSelect(); // Start transaction mode
    channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
    channel.txCommit(); // Commit
    System.out.println("Message sent successfully");
} catch (Exception e) {
    channel.txRollback(); // Roll back
    System.out.println("Message has been rolled back.");
}

With Spring Boot it is as follows:

Configure it in TemplateConfig:

Code sample

rabbitTemplate.setChannelTransacted(true);

In transaction mode, the commit succeeds only when the Producer receives the Commit-Ok reply from the server, so the Producer can be sure the Broker has the message. The drawback is that transaction mode is blocking: the next message cannot be sent until the current one has been committed, which severely reduces the throughput of the RabbitMQ server. It is therefore not recommended for production environments.

1.1.2 Confirm mode

1.1.2.1 Normal confirmation mode

The Producer calls channel.confirmSelect() to turn on confirm mode. After the Broker receives a message it replies with a Basic.Ack, and the Producer waits for that reply by calling channel.waitForConfirms().

Code sample

// Enable the sender acknowledgement mode
channel.confirmSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
// Common Confirm, send one, Confirm one
if (channel.waitForConfirms()) {
    System.out.println("Message sent successfully");
} else {
    System.out.println("Message sending failed");
}

1.1.2.2 Batch confirmation mode

Batch mode also starts with channel.confirmSelect(). After publishing a batch of messages, the Producer calls channel.waitForConfirmsOrDie(), which blocks until the Broker has confirmed every outstanding message. As long as waitForConfirmsOrDie() does not throw an exception, the server received all of them. This approach is more efficient than confirming one message at a time. However, if any message in the batch fails, the Producer cannot tell which one it was, so the whole batch has to be treated as failed and sent again.

Code sample

try {
    channel.confirmSelect();
    for (int i = 0; i < 5; i++) {
        // Send the message
        // String exchange, String routingKey, BasicProperties props, byte[] body
        channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes());
    }
    // If multiple=true, the ACK confirms every message up to and including its delivery tag
    // For example, 5 messages may be confirmed by only 1 ACK, or by 2 ACKs.
    // Blocks until all published messages are confirmed; throws an IOException if any message is nacked by the Broker
    channel.waitForConfirmsOrDie();
    System.out.println("Message sent, batch confirmation successful.");
} catch (Exception e) {
    // An exception occurred; all messages in the batch may need to be resent
    e.printStackTrace();
}

1.1.2.3 Asynchronous confirmation mode

Asynchronous confirmation, as the name suggests, means that sending and confirming are not synchronous, so messages can be confirmed while others are still being sent. After calling channel.confirmSelect(), register a ConfirmListener with channel.addConfirmListener(). Its handleAck method is the callback for messages the server has acknowledged, and its handleNack method is the callback for messages the server has negatively acknowledged.

Code sample

// The deliveryTag used to maintain unacknowledged messages
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

// The Broker does not necessarily send one ACK per message: a single ACK may confirm one message or several
// Listen asynchronously for acknowledged and unacknowledged messages
// To rerun this example, stop the previous producer and purge the queue first
channel.addConfirmListener(new ConfirmListener() {
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Broker did not acknowledge message, tag: " + deliveryTag);
        if (multiple) {
            // headSet(n) is the view of elements less than n, so this removes every tag up to and including deliveryTag
            confirmSet.headSet(deliveryTag + 1L).clear();
        } else {
            confirmSet.remove(deliveryTag);
        }
        // Add the logic for resending the message here
    }
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        // multiple=true: every message up to and including deliveryTag is confirmed in one batch; false: a single confirmation
        System.out.println(String.format("Broker confirms messages, id: %d, multiple messages: %b", deliveryTag, multiple));
        if (multiple) {
            // headSet(n) is the view of elements less than n, so this removes every tag up to and including deliveryTag
            confirmSet.headSet(deliveryTag + 1L).clear();
        } else {
            // Remove only one element
            confirmSet.remove(deliveryTag);
        }
        System.out.println("Unconfirmed messages: " + confirmSet);
    }
});

// Enable the sender acknowledgement mode
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
    long nextSeqNo = channel.getNextPublishSeqNo();
    // Track the tag before publishing so that a fast ACK cannot arrive before the tag is recorded
    confirmSet.add(nextSeqNo);
    // Send the message
    // String exchange, String routingKey, BasicProperties props, byte[] body
    channel.basicPublish("", QUEUE_NAME, null, (msg + "-" + i).getBytes());
}
System.out.println("All messages: " + confirmSet);

// If the channel is closed right away, later ACKs may not be received
//channel.close();
//conn.close();
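
The bookkeeping done by the listener above can be tried out without a broker. The following is a minimal sketch that uses only the JDK; the class ConfirmTracker and its method names are hypothetical and are not part of the RabbitMQ client. It shows how a confirmation with multiple=true for tag N clears every outstanding tag up to and including N, while multiple=false clears only N.

```java
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical helper: tracks publish sequence numbers that are still unconfirmed.
class ConfirmTracker {
    private final ConcurrentSkipListSet<Long> pending = new ConcurrentSkipListSet<>();

    // Call before publishing, with the value of channel.getNextPublishSeqNo().
    void published(long seqNo) {
        pending.add(seqNo);
    }

    // Call from handleAck / handleNack.
    void confirmed(long deliveryTag, boolean multiple) {
        if (multiple) {
            // headSet(tag, true) is a live view of all tags <= deliveryTag.
            pending.headSet(deliveryTag, true).clear();
        } else {
            pending.remove(deliveryTag);
        }
    }

    // Snapshot of the tags still waiting for a confirmation.
    SortedSet<Long> outstanding() {
        return new TreeSet<>(pending);
    }

    public static void main(String[] args) {
        ConfirmTracker tracker = new ConfirmTracker();
        for (long seq = 1; seq <= 5; seq++) {
            tracker.published(seq);
        }
        tracker.confirmed(3, true);   // confirms 1, 2 and 3 at once
        tracker.confirmed(5, false);  // confirms only 5
        System.out.println(tracker.outstanding()); // prints [4]
    }
}
```

Whatever remains in outstanding() after a timeout is a candidate for resending.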

Spring Boot example:

Set it up in TemplateConfig (publisher confirms must also be enabled on the connection factory for the callback to fire).

Code sample

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            System.out.println("Sending message failed:" + cause);
            throw new RuntimeException("Sending exception:" + cause);
        }
    }
});

1.2 Messages are routed from the Exchange to the Queue

Routing a message from the Exchange to a Queue can fail in two ways: the routing key is wrong, or the queue does not exist.

RabbitMQ provides two ways to handle this:

  • The Broker returns the message to the Producer (through a callback registered by the Producer).
  • The Exchange routes the message to an alternate (backup) exchange.

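1.2.1 The Broker returns the message to the Producer

With the Java API, register a ReturnListener to receive the callback. The message must be published with the mandatory flag set to true; otherwise the Broker silently drops a message it cannot route.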

Code sample

channel.addReturnListener(new ReturnListener() {
    public void handleReturn(int replyCode,
                             String replyText,
                             String exchange,
                             String routingKey,
                             AMQP.BasicProperties properties,
                             byte[] body)
        throws IOException {
        System.out.println("========= listener received unroutable message returned ============");
        System.out.println("replyText:"+replyText);
        System.out.println("exchange:"+exchange);
        System.out.println("routingKey:"+routingKey);
        System.out.println("message:" + new String(body));
    }
});

In Spring Boot, set a ReturnCallback on the RabbitTemplate in TemplateConfig to implement the callback.

Code sample

 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
     public void returnedMessage(Message message,
                                 int replyCode,
                                 String replyText,
                                 String exchange,
                                 String routingKey){
         System.out.println("Returned message:");
         System.out.println("replyCode: "+replyCode);
         System.out.println("replyText: "+replyText);
         System.out.println("exchange: "+exchange);
         System.out.println("routingKey: "+routingKey);
     }
 });

1.2.2 The Exchange routes to an alternate exchange

The alternate exchange is specified by setting the alternate-exchange argument when the main exchange is declared.

Code sample

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .deliveryMode(2)
    .contentEncoding("UTF-8")
    .build();

// Alternate exchange
channel.exchangeDeclare("ALTERNATE_EXCHANGE", "topic", false, false, false, null);
channel.queueDeclare("ALTERNATE_QUEUE", false, false, false, null);
channel.queueBind("ALTERNATE_QUEUE", "ALTERNATE_EXCHANGE", "#");

// Specify the alternate exchange when declaring the main exchange
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.put("alternate-exchange", "ALTERNATE_EXCHANGE");
channel.exchangeDeclare("TEST_EXCHANGE", "topic", false, false, false, arguments);

1.3 Persistent storage of messages in the Queue

This section covers the settings that make message storage persistent.

1.3.1 Queue persistence

Declare the queue with durable=true.

// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

durable: whether the queue is persisted. A non-durable queue is kept in memory and disappears when the service is restarted.

autoDelete: the queue is deleted automatically once its last consumer disconnects.

The characteristics of an exclusive queue are:

  1. Visible only to the Connection that first declared it.
  2. Will be automatically deleted when the connection is disconnected.

1.3.2 Exchange persistence

Likewise, set the durable parameter to true to make the exchange persistent.

// Declare the exchange
// String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);

1.3.3 Message persistence

A message is persisted by setting deliveryMode=2 when building its BasicProperties.

// Per-message properties: persistent delivery plus an expiration time
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .deliveryMode(2) // Persist the message
    .contentEncoding("UTF-8")
    .expiration("10000") // TTL in milliseconds
    .build();

1.3.4 Cluster

Running RabbitMQ as a cluster with replicated queues improves availability, because a backup copy of the data survives the loss of a node.

1.4 Consumers consume messages

If the Consumer receives a message and an exception occurs during processing, the Consumer has failed to consume the message. What should happen in this case?

Don’t panic, RabbitMQ provides a Consumer acknowledgement mechanism. After the Consumer receives and processes a message, an ACK is sent to the server either manually or automatically.

If no ACK is received and the consumer disconnects, RabbitMQ redelivers the message to another consumer. If there are no other consumers, the same consumer receives the message again after it reconnects and repeats the business logic (ideally after the code has been fixed).

Consumers can acknowledge messages in two ways:

  • Automatic acknowledgement (ACK)
  • Manual confirmation (ACK)

1.4.1 Automatic ACK

Automatic ACK requires no acknowledgement code in the consumer. The message is treated as acknowledged as soon as it is delivered to the consumer, not when the processing method finishes, regardless of whether the message was processed successfully.

1.4.2 Manual ACK

With manual ACK, the consumer sends the acknowledgement itself, typically after its business logic has completed.

1. Java API usage

Code sample

// Set autoAck to false to turn off automatic acknowledgement
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, false, consumer);

// Next, in the Consumer's callback, call the following channel methods to acknowledge or reject the message
if (msg.contains("No.")) { // Reject the message
    // requeue: true puts the message back on the queue; false discards it
    // Note: with only one consumer, requeue=true causes the same message to be consumed repeatedly
    channel.basicReject(envelope.getDeliveryTag(), false);
} else if (msg.contains("Abnormal")) { // Batch reject
    // Arguments: deliveryTag, multiple, requeue
    // Note: with only one consumer, requeue=true causes the same message to be consumed repeatedly
    channel.basicNack(envelope.getDeliveryTag(), true, false);
} else {
    // Manually acknowledge
    // Without a reply, the message stays in the queue and is redelivered when the consumer reconnects
    channel.basicAck(envelope.getDeliveryTag(), true);
}

2. Spring Boot usage

First, in the application.properties file, do the following configuration

spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

Note that there are three options to choose from:

  • none: automatic ACK.
  • manual: manual ACK.
  • auto: sends an ACK if the method does not throw an exception. If the method throws an exception other than AmqpRejectAndDontRequeueException, a NACK is sent and the message is requeued. If it throws AmqpRejectAndDontRequeueException, a NACK is sent and the message is not requeued.

Then acknowledge the message in the consumer method:

@RabbitListener(queues = "${com.fanger.secondqueue}", containerFactory="rabbitListenerContainerFactory")
public class SecondConsumer {
    @RabbitHandler
    public void process(String msgContent,Channel channel, Message message) throws IOException {
        System.out.println("Second Queue received msg : " + msgContent );
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // Acknowledge
        // channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false); // Batch reject
        // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // Reject
    }
}

1.5. Consumer callback

Beyond the four stages above, reliability can be strengthened further by having the consumer send back a receipt once it has processed a message successfully. For this, the producer exposes a response API through which it receives the notification.

1.6 Message compensation mechanism

What if, for whatever reason, the consumer never calls back the producer's API, or the message is not processed successfully?

At this point, we need to compensate for the message by resending it.

One way to do this is to keep a message status table and have a scheduled task scan it and resend every message that has not been confirmed. Two things need to be decided:

  • The number of resends?
  • The time interval for resending the message?

Both depend on the system design, but as a rule do not retry more than 3 times.
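
The scan-and-resend idea can be sketched with an in-memory stand-in for the status table. This is only an illustration under assumptions: CompensationTable, Row, MAX_RETRIES, the retry interval and the resend callback are hypothetical names, and a real system would keep the table in a database and trigger scan() from a scheduled task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a message status table.
class CompensationTable {
    static final int MAX_RETRIES = 3;        // upper bound suggested in the text
    private final long retryIntervalMillis;  // minimum gap between two sends

    // One row per message that has not been confirmed yet.
    private static final class Row {
        final String body;
        int retries;
        long lastSentAt;
        Row(String body, long sentAt) { this.body = body; this.lastSentAt = sentAt; }
    }

    private final Map<String, Row> pending = new ConcurrentHashMap<>();

    CompensationTable(long retryIntervalMillis) {
        this.retryIntervalMillis = retryIntervalMillis;
    }

    // Record a message when it is first sent.
    void recordSent(String messageId, String body, long now) {
        pending.put(messageId, new Row(body, now));
    }

    // The consumer's receipt arrived: nothing left to compensate.
    void markConfirmed(String messageId) {
        pending.remove(messageId);
    }

    // One pass of the scheduled scan. Returns the ids that exhausted their retries.
    List<String> scan(long now, Consumer<String> resend) {
        List<String> givenUp = new ArrayList<>();
        for (Map.Entry<String, Row> e : pending.entrySet()) {
            Row row = e.getValue();
            if (now - row.lastSentAt < retryIntervalMillis) {
                continue; // not due yet
            }
            if (row.retries >= MAX_RETRIES) {
                givenUp.add(e.getKey()); // hand over to manual handling
                pending.remove(e.getKey());
            } else {
                row.retries++;
                row.lastSentAt = now;
                resend.accept(row.body);
            }
        }
        return givenUp;
    }

    public static void main(String[] args) {
        CompensationTable table = new CompensationTable(1_000);
        table.recordSent("msg-1", "hello", 0);
        for (long now = 1_000; now <= 4_000; now += 1_000) {
            List<String> givenUp = table.scan(now, body -> System.out.println("resend: " + body));
            if (!givenUp.isEmpty()) {
                System.out.println("gave up on: " + givenUp);
            }
        }
    }
}
```

Passing the current time in as a parameter keeps the sketch deterministic; the retry interval is the second design question from the list above and is left to the caller.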

1.7 Idempotence of messages

With the message compensation mechanism from 1.6 in place, a message may be delivered more than once, so message handling must be idempotent.

Possible causes of duplicate messages:

  1. A producer-side problem in stage 1: the message is sent more than once. For example, confirm mode is enabled but no confirmation is received, so the producer sends the message again.
  2. A problem in stage 4: the consumer did not send an ACK, or something else went wrong, so the message is consumed again.
  3. Producer code or network problem.

How do you avoid repeated consumption of messages?

Duplicates can be controlled by generating a unique business ID for each message and deduplicating on that ID when the message is written to the database.
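
A minimal sketch of deduplicating on the business ID follows. It is an assumption-laden illustration: IdempotentHandler is a hypothetical class, and the in-memory set stands in for a database table with a unique constraint on the business ID.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical consumer-side guard that runs the business logic once per business ID.
class IdempotentHandler {
    // Stand-in for a database table with a unique constraint on the business ID.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    // Returns true if the business logic ran, false if the message was a duplicate.
    boolean handle(String businessId, Runnable businessLogic) {
        // add() is atomic: only the first caller for a given ID gets true.
        if (!processedIds.add(businessId)) {
            return false;
        }
        try {
            businessLogic.run();
            return true;
        } catch (RuntimeException e) {
            // Processing failed: forget the ID so that a redelivery can retry.
            processedIds.remove(businessId);
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        Runnable chargeOrder = () -> System.out.println("charging order");
        System.out.println(handler.handle("order-1", chargeOrder)); // runs the logic, prints true
        System.out.println(handler.handle("order-1", chargeOrder)); // duplicate, prints false
    }
}
```

A duplicate should still be acknowledged to the Broker, since the work it represents has already been done.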

1.8 Message ordering

Ordering means that consumers consume messages in the same order in which producers produced them.

In RabbitMQ, when a queue has more than one consumer, ordering cannot be guaranteed, because different consumers process messages at different rates. Ordered consumption is guaranteed only when each queue has a single consumer (for example, by sending the messages of different businesses to different dedicated queues).
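
One way to get a dedicated queue per business is to derive the queue name from a business key, so that all messages with the same key land in the same queue and are handled by its single consumer. The sketch below assumes a fixed number of pre-declared queues named with a common prefix; OrderedQueueRouter, the prefix and the naming scheme are hypothetical.

```java
// Hypothetical router: maps a business key to one of N dedicated queues.
class OrderedQueueRouter {
    private final String prefix;
    private final int queueCount;

    OrderedQueueRouter(String prefix, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        this.prefix = prefix;
        this.queueCount = queueCount;
    }

    // The same key always yields the same queue, which preserves per-key ordering
    // as long as each queue has exactly one consumer.
    String queueFor(String businessKey) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        int index = Math.floorMod(businessKey.hashCode(), queueCount);
        return prefix + "." + index;
    }

    public static void main(String[] args) {
        OrderedQueueRouter router = new OrderedQueueRouter("orders", 4);
        String first = router.queueFor("order-42");
        String second = router.queueFor("order-42");
        System.out.println(first.equals(second)); // prints true
    }
}
```

The returned name would be used as the routing key when publishing. Ordering then holds within each key, not across keys.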