By Threedayman Hang Seng LIGHT Cloud Community

Following on from the previous article, RabbitMQ in Messaging Middleware (1), this time we look at the problem of message loss in RabbitMQ and how to avoid it in core business scenarios.

A painful story: RabbitMQ was introduced into the goods delivery flow of a shopping system. One day, due to network jitter, a producer's message never reached RabbitMQ. Because reliable delivery of the message had not been guaranteed, the message was lost.

To prevent this from happening again, let’s look at how we can ensure that messages are not lost in RabbitMQ.

When does message loss occur

The message transmission process is roughly as follows

Message loss can occur when

  • The sending of messages from the Producer to RabbitMQ fails due to network exceptions or service exceptions.
  • The RabbitMQ server is abnormal or restarted, resulting in message loss.
  • After receiving the message, the Consumer failed to process the message and the message was lost.

In RabbitMQ, the producer sends messages to an Exchange, which routes them to a Queue according to routing rules. If the routing rules are set incorrectly, messages can also be lost.
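
To make the routing risk concrete, here is a minimal in-memory sketch in plain Java. It does not use the RabbitMQ client: `DirectExchangeModel` and its methods are hypothetical names invented for illustration, and the model only mimics how a direct exchange matches a routing key against bindings. It shows that a message whose routing key matches no binding ends up in no queue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a direct exchange; this is not the RabbitMQ client API.
class DirectExchangeModel {
    // Binding key -> names of the queues bound with that key.
    private final Map<String, List<String>> bindings = new HashMap<>();
    // Queue name -> message bodies currently stored in that queue.
    private final Map<String, List<String>> queues = new HashMap<>();

    void bind(String queueName, String bindingKey) {
        queues.putIfAbsent(queueName, new ArrayList<>());
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Routes the message and returns how many queues received it; 0 means it was unroutable.
    int publish(String routingKey, String body) {
        List<String> targets = bindings.getOrDefault(routingKey, new ArrayList<>());
        for (String queueName : targets) {
            queues.get(queueName).add(body);
        }
        return targets.size();
    }

    int depth(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>()).size();
    }

    public static void main(String[] args) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        exchange.bind("order.queue", "order.created");
        // Correct key: the message lands in one queue.
        System.out.println(exchange.publish("order.created", "order-1"));
        // Misspelled key: no binding matches, so the message is stored nowhere.
        System.out.println(exchange.publish("order.create", "order-2"));
    }
}
```

In this model the caller can at least inspect the return value of `publish`. With a real broker the producer gets no such hint unless it asks for one, which is why routing rules deserve the same attention as the other loss scenarios.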

Guaranteeing message reliability on the Producer side

To avoid send failures caused by network jitter or an abnormal RabbitMQ server, an ACK mechanism can be introduced for the messages the Producer sends: after receiving a message, the server returns a success or failure acknowledgement to the Producer.

RabbitMQ provides two solutions:

  • Transaction mechanism
  • Sender acknowledgement mechanism

In transaction mode, the main methods are as follows:

  • channel.txSelect() puts the current channel into transactional mode.
  • channel.txCommit() commits the transaction.
  • channel.txRollback() rolls back the transaction.

The following code is a simple example

try {
    // Put the channel into transactional mode.
    channel.txSelect();
    channel.basicPublish(exchange, routingKey,
            MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    // Blocks until the broker has answered the commit.
    channel.txCommit();
} catch (Exception e) {
    e.printStackTrace();
    channel.txRollback();
    // Handle the failed send here: resend, or persist the message and retry later.
}

The message flow is as follows

Image from RabbitMQ Field Guide

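If the transaction commits successfully, the message has definitely reached RabbitMQ. If an exception is thrown before the commit completes, we can catch it and roll the transaction back.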

Image from RabbitMQ Field Guide

The transaction mechanism solves the problem of message acknowledgement between the producer and RabbitMQ: the transaction commits successfully only if RabbitMQ has received the message. However, transactions block synchronously, which greatly reduces RabbitMQ throughput, so RabbitMQ provides an improved alternative, the sender confirmation mechanism.

Sender confirmation mechanism:

  • channel.confirmSelect() enables the confirmation mechanism on the channel.
  • channel.addConfirmListener() adds a ConfirmListener callback to the channel.
  • com.rabbitmq.client.ConfirmListener#handleAck is the callback for messages that RabbitMQ received normally.
  • com.rabbitmq.client.ConfirmListener#handleNack is the callback for messages that RabbitMQ did not receive normally.

SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
// Enable sender confirmations on this channel.
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        if (multiple) {
            // Everything up to and including deliveryTag is confirmed.
            confirmSet.headSet(deliveryTag + 1).clear();
        } else {
            confirmSet.remove(deliveryTag);
        }
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
        if (multiple) {
            confirmSet.headSet(deliveryTag + 1).clear();
        } else {
            confirmSet.remove(deliveryTag);
        }
        // Add handling for the failed send here: resend, or persist the message for later compensation.
    }
});
// Simulate sending messages continuously.
while (true) {
    long nextSeqNo = channel.getNextPublishSeqNo();
    // Record the sequence number before publishing so that a fast confirm cannot arrive first.
    confirmSet.add(nextSeqNo);
    channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey,
            MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
}

The example above demonstrates asynchronous confirms, which ensure that producer messages are received by RabbitMQ without the synchronous blocking that significantly reduces throughput.
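
The listener above only tracks sequence numbers, so by the time a nack arrives the message body is no longer at hand for a resend. One possible refinement, sketched here in plain Java without the RabbitMQ client, keeps the body next to each sequence number. `ConfirmTracker` and its method names are hypothetical; the assumption is that `track` would be called with the value of channel.getNextPublishSeqNo() before publishing, and `ack`/`nack` would be called from the ConfirmListener callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical helper: tracks published-but-unconfirmed messages by sequence number.
class ConfirmTracker {
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    // Remember the message before it is published.
    void track(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    // The broker confirmed the message(s): forget them.
    void ack(long seqNo, boolean multiple) {
        take(seqNo, multiple);
    }

    // The broker refused the message(s): return the bodies so the caller can resend or persist them.
    List<String> nack(long seqNo, boolean multiple) {
        return take(seqNo, multiple);
    }

    int outstandingCount() {
        return outstanding.size();
    }

    // Removes the single entry, or every entry up to and including seqNo when multiple is true.
    private List<String> take(long seqNo, boolean multiple) {
        List<String> removed = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> head = outstanding.headMap(seqNo, true);
            Map.Entry<Long, String> entry;
            while ((entry = head.pollFirstEntry()) != null) {
                removed.add(entry.getValue());
            }
        } else {
            String body = outstanding.remove(seqNo);
            if (body != null) {
                removed.add(body);
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        ConfirmTracker tracker = new ConfirmTracker();
        for (long seqNo = 1; seqNo <= 4; seqNo++) {
            tracker.track(seqNo, "m" + seqNo);
        }
        tracker.ack(2, true);                          // confirms 1 and 2 in one batch
        List<String> failed = tracker.nack(3, false);  // only message 3 was refused
        System.out.println(failed + " must be resent, outstanding = " + tracker.outstandingCount());
    }
}
```

Keeping the bodies in memory is only one option. For important business data the nacked messages could instead be written to durable storage and retried by a compensation job, as the comment in the original listener suggests.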

The RabbitMQ server side

To avoid message loss caused by RabbitMQ service exceptions or restarts, messages need to be persisted to disk. The key is to persist both the queues and the messages. If an exchange is not durable, its metadata is lost after a RabbitMQ restart; the stored messages are not lost, but new messages can no longer be sent to that exchange.

  • Set the durable parameter to true when declaring a queue. (Messages are stored in queues, so if a queue is not durable, its messages are lost after RabbitMQ restarts.)
  • Persist messages by setting deliveryMode to 2 (deliveryMode is a field of BasicProperties).
channel.queueDeclare(QUEUE_NAME, true, // durable
                     false, false, null);
channel.basicPublish("", QUEUE_NAME,
                     MessageProperties.PERSISTENT_TEXT_PLAIN, // see below for the specific properties
                     message.getBytes(StandardCharsets.UTF_8));

public static final BasicProperties PERSISTENT_TEXT_PLAIN =
    new BasicProperties("text/plain", null, null, 2, // deliveryMode
                        0, null, null, null, null, null, null, null, null, null);

The Consumer end

To ensure that the Consumer side does not lose messages when processing fails or the Consumer application restarts, we need to do the following:

  • Turn off automatic acknowledgement (autoAck) and use manual acknowledgement mode.

Manual acknowledgement: RabbitMQ waits for an acknowledgement from the consumer before deleting the message.

Automatic acknowledgement: RabbitMQ treats a message as acknowledged as soon as it has been sent out and deletes it, whether or not the consumer actually processed it.

In manual acknowledgement mode, the messages in a queue on the RabbitMQ server fall into two groups:

  • Ready: messages waiting to be delivered to a consumer.
  • Unacked: messages that have been delivered to a consumer but have not yet been acknowledged by it.

For an Unacked message, the following can happen:

  • RabbitMQ receives an ACK signal from the consumer holding the message, and the RabbitMQ server deletes the message.
  • The RabbitMQ server receives a nack/reject signal from the consumer holding the message, with requeue set to true, and RabbitMQ queues the message again.
  • The RabbitMQ server receives a nack/reject signal from the consumer holding the message, with requeue set to false. If a dead-letter queue is configured, the message is added to the queue. If not, the message is removed from the queue by RabbitMQ.
  • If the RabbitMQ server has not received an acknowledgement and the consumer holding the message is still connected, RabbitMQ keeps waiting. Older releases wait indefinitely; newer releases enforce a configurable delivery acknowledgement timeout.
  • If the RabbitMQ server has not received an acknowledgement and the consumer holding the message has disconnected, RabbitMQ requeues the message.
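
The outcomes listed above can be summarised as a small decision function. This is a plain-Java sketch with hypothetical names (`UnackedOutcome`, `Signal`, `Result`, `settle`); it only restates the rules from the list and does not talk to a broker.

```java
// Hypothetical model of how the broker settles an Unacked message; names are illustrative only.
class UnackedOutcome {
    // What the broker has received from the consumer holding the message.
    enum Signal { ACK, NACK_REQUEUE, NACK_NO_REQUEUE, NONE }

    // What happens to the message as a result.
    enum Result { DELETED, REQUEUED, DEAD_LETTERED, STILL_UNACKED }

    static Result settle(Signal signal, boolean consumerConnected, boolean deadLetterConfigured) {
        switch (signal) {
            case ACK:
                return Result.DELETED;
            case NACK_REQUEUE:
                return Result.REQUEUED;
            case NACK_NO_REQUEUE:
                // Without a dead-letter exchange the rejected message is simply removed.
                return deadLetterConfigured ? Result.DEAD_LETTERED : Result.DELETED;
            default:
                // No acknowledgement yet: keep waiting while connected, requeue after a disconnect.
                return consumerConnected ? Result.STILL_UNACKED : Result.REQUEUED;
        }
    }

    public static void main(String[] args) {
        System.out.println(settle(Signal.ACK, true, false));
        System.out.println(settle(Signal.NACK_NO_REQUEUE, true, true));
        System.out.println(settle(Signal.NONE, false, false));
    }
}
```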

Message rejection can be done using the basicReject or basicNack methods of the Channel class. Let’s look at the differences.

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

void basicReject(long deliveryTag, boolean requeue) throws IOException;

  • deliveryTag: a 64-bit long integer that serves as the number of the message.
  • requeue: whether to put the message back in the queue.
  • multiple: whether to handle in one batch the messages not yet acknowledged by the current consumer.

basicReject can reject only one message at a time.

With basicNack, when multiple is false, only the message with the given deliveryTag is rejected, just like basicReject. When multiple is true, all messages up to and including the given deliveryTag that the current consumer has not yet acknowledged are rejected.

Let’s look at a code example:

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body)
            throws IOException {
        long deliveryTag = envelope.getDeliveryTag();
        try {
            // Business logic for processing the message goes here.
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // Processing failed: reject the message without requeueing it.
            channel.basicReject(deliveryTag, false);
        }
    }
});

With the manual acknowledgement mode, RabbitMQ will only delete messages when it receives a reply from the Consumer holding the message, ensuring that the message is not lost due to an exception in the Consumer’s application.

Suppose RabbitMQ has delivered a message to a Consumer and the Consumer has processed it successfully, but the ACK never reaches RabbitMQ because of network jitter or another fault. Once RabbitMQ considers the Consumer disconnected, it requeues the message and delivers it again, so some messages may be delivered to the Consumer more than once.

RabbitMQ does have the potential to create duplicate messages in this scenario, which we will address in the next article.

This scheme therefore only guarantees that messages are delivered at least once.
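
The redelivery scenario can be replayed with a small simulation. This is a plain-Java sketch that does not use the RabbitMQ client; `AtLeastOnceDemo` and `run` are hypothetical names, and the lost ACK is modelled by a simple flag rather than a real network fault.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation of at-least-once delivery; it does not use the RabbitMQ client.
class AtLeastOnceDemo {
    // Delivers every message until its acknowledgement reaches the broker.
    // loseFirstAck simulates the ACK of the very first delivery being lost when the connection drops.
    static List<String> run(List<String> messages, boolean loseFirstAck) {
        Deque<String> ready = new ArrayDeque<>(messages);
        List<String> processed = new ArrayList<>();  // what the consumer's business logic saw
        boolean ackAlreadyLost = false;
        while (!ready.isEmpty()) {
            String message = ready.pollFirst();      // the message becomes Unacked
            processed.add(message);                  // the consumer handles it successfully
            if (loseFirstAck && !ackAlreadyLost) {
                ackAlreadyLost = true;
                ready.addFirst(message);             // consumer looks disconnected: broker requeues
            }
            // Otherwise the ACK arrives and the broker deletes the message.
        }
        return processed;
    }

    public static void main(String[] args) {
        // With a lost ACK, the first message is processed twice.
        System.out.println(run(Arrays.asList("pay-1", "pay-2"), true));
    }
}
```

No message is lost in the simulation, but the business logic runs twice for the first one, which is exactly the duplicate problem left for the next article.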

Dead-letter queue

DLX stands for Dead-Letter-Exchange. When a message becomes a dead letter, it can be republished to another exchange, the DLX. A queue bound to the DLX is called a dead-letter queue.

A message can become a dead letter in the following cases:

  • The message is rejected (basicNack/basicReject) with the requeue parameter set to false.
  • The message expires.
  • The queue exceeds its maximum length.
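
The three cases can be illustrated with a toy in-memory queue. This is a plain-Java sketch, not the RabbitMQ API: `DeadLetterModel` and its methods are hypothetical, time is passed in explicitly instead of being read from a clock, and the model assumes that an over-length queue gives up its oldest message first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory queue that dead-letters messages for the three reasons listed above.
class DeadLetterModel {
    static final class Message {
        final String body;
        final long expiresAtMillis;

        Message(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final int maxLength;
    private final long ttlMillis;
    private final Deque<Message> ready = new ArrayDeque<>();
    // Stands in for the dead-letter queue bound to the DLX.
    final List<String> deadLetters = new ArrayList<>();

    DeadLetterModel(int maxLength, long ttlMillis) {
        this.maxLength = maxLength;
        this.ttlMillis = ttlMillis;
    }

    // Enqueues a message; if the queue is over its maximum length, the oldest one is dead-lettered.
    void publish(String body, long nowMillis) {
        ready.addLast(new Message(body, nowMillis + ttlMillis));
        while (ready.size() > maxLength) {
            deadLetters.add(ready.pollFirst().body + " (maxlen)");
        }
    }

    // Dead-letters expired messages at the head, then returns the next body (null if none is left).
    String deliver(long nowMillis) {
        while (!ready.isEmpty() && ready.peekFirst().expiresAtMillis <= nowMillis) {
            deadLetters.add(ready.pollFirst().body + " (expired)");
        }
        Message next = ready.pollFirst();
        return next == null ? null : next.body;
    }

    // The consumer rejected a delivered message with requeue set to false.
    void rejectWithoutRequeue(String body) {
        deadLetters.add(body + " (rejected)");
    }

    public static void main(String[] args) {
        DeadLetterModel queue = new DeadLetterModel(2, 10_000);
        queue.publish("a", 0);
        queue.publish("b", 0);
        queue.publish("c", 0);                    // queue too long: "a" is dead-lettered
        String delivered = queue.deliver(1_000);  // "b" is still fresh and gets delivered
        queue.rejectWithoutRequeue(delivered);    // the consumer rejects "b"
        queue.deliver(20_000);                    // "c" has expired by now
        System.out.println(queue.deadLetters);
    }
}
```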

Here is a simplified code example to demonstrate the use of a dead letter queue. See the comments for details

// Declare the exchanges
channel.exchangeDeclare("exchange.dlx", "direct", true);
channel.exchangeDeclare("exchange.normal", "fanout", true);
Map<String, Object> args = new HashMap<String, Object>();
// Set the message TTL (in milliseconds)
args.put("x-message-ttl", 10000);
// Specify the DLX with the x-dead-letter-exchange argument
args.put("x-dead-letter-exchange", "exchange.dlx");
// Specify the routing key used when dead-lettering to the DLX
args.put("x-dead-letter-routing-key", "routingkey");
channel.queueDeclare("queue.normal", true, false, false, args);
channel.queueBind("queue.normal", "exchange.normal", "");
channel.queueDeclare("queue.dlx", true, false, false, null);
channel.queueBind("queue.dlx", "exchange.dlx", "routingkey");
channel.basicPublish("exchange.normal", "rk",
        MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());

The following figure shows the message flow

With RabbitMQ, analyzing the messages that end up in the dead-letter queue can help improve and optimize the system.

Summary: Message loss may occur at the production end, server end, and consumer end. For important business we can use the above method to ensure that the message is not lost. You can also leave a comment about the pitfalls you’ve encountered with RabbitMQ.

Reference documentation

  1. RabbitMQ Practice Guide
  2. www.rabbitmq.com/reliability…