In our development, with the complex and call link business growing, we may be slowly introduced more and more middleware to better service to our system, but each sample technology is a double-edged sword, in the view of improving the performance of our system at the same time, we also want to looking for ways to reduce its influence on the stability of the system, Today we are going to show you how to make RabbitMQ reliable.

To understand how to ensure RabbitMQ’s reliability, start with its execution process.

Execute the process

  • A producer sends a message or a consumer consumes a messageA long connectionThe message is carried by Channal in the long link.
    • Advantage of long connection: IF a customer is down or offline, MQ will sense it and store the message again when it is no longer able to distribute it, preventing massive message loss
  • A message consists of a header, a message body, and a routing key.
  • The message is sent to a virtual host specified by the MQ server. The Exchange switch in the virtual host receives the message and determines which queue it is sent to based on the routing key and binding relationship of the message.
  • The consumer gets the message by listening on the specified queue.

What if the execution process can see that messages are at risk of being lost, either during producer delivery to the MQ server or during consumption?

Message confirmation mechanism – Reliable arrival

The transaction

RabbitMQ also provides transaction messages, but the official documentation states that transaction messages can degrade MQ performance by up to 250 times, so in today’s performance-demanding environments, transaction messages are not a good idea.

There are only a few other ways to ensure reliability.

This is a diagram of a message sent to a consumption. To ensure reliability, consider three aspects.

Publisher → Broker, confirmCallback mechanism

  • Start confirmCallback by setting the publisherConfirm (True) option when creating the connectionFactory.
  • ConfirmCallback is executed whenever a message is received by the broker; in cluster mode, all brokers need to receive it before confirmCallback is called.
  • A message received by the broker only indicates that the message has reached the server. It does not guarantee that the message will be delivered to the destination queue. So the next returnCallback is needed.
# Configure the YML file
spring:
	rabbitmq:
    	# Enable send order confirmation
    	publisher-confirms: true
Copy the code
	/ / custom abbitTemplate
    @PostConstruct  // After MyRabbitConfig is initialized, execute this method
    public void initRabbitTemplate(a){

        // The server receives a message acknowledging the callback
        /* correlationData Unique ID of the message Whether the ACK message is successfully received. Cause Failure cause */
        rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> {
            log.info("confirm---->correlationData{},-------->ack{},-------->cause{}",correlationData,ack,cause);
        }));
	}
Copy the code

Exchange → Queue, returnCallback

  • Confirm mode only ensures that messages reach the broker, not the target queue. In some business scenarios, the return mode is used to ensure that messages are delivered to the destination queue.
  • ReturnCallback will be called if a post to the target queue is not delivered, which can record the detailed post data that will be needed for regular patrols or auto-correction.
# Configure the YML file
spring:
	rabbitmq:
    	 # Enable sending queue confirmation
    	 publisher-returns: true
    	 Callback this returnConfirm asynchronously as soon as it arrives in the queue
   		 template:
      		mandatory: true
Copy the code
	/ / custom abbitTemplate
    @PostConstruct  // After MyRabbitConfig is initialized, execute this method
    public void initRabbitTemplate(a){

        // Set the acknowledgement callback for messages arriving at the queue (this failure callback is triggered only if messages are not delivered to the specified queue)
        Exchange to which switch the message was sent routingKey the routingKey used for the message */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("return---->message{},-->replyCode{},-->replyText{},-->exchange{},-->routingKey{}",
                    message,replyCode,replyText,exchange,routingKey);
        });
	}
Copy the code

Queue → Consumer, ACK message confirmation mechanism

  • By default, messages are automatically ack and removed from the broker queue when received by consumers
    • Problem: Received many messages, automatic reply server ACK, processing only one message, server down. All messages will be lost, so turn off the default automatic ACK mechanism.
  • Consumers get the message,An ACK can be sent back to the broker
    • Basic. ack is used for affirmative confirmation; The broker will remove this message
    • Basic. nack is used for negative confirmation; You can indicate whether the broker dropped the message, in batches
    • Basic. reject is used to deny or confirm; Same as above, but not in bulk
  • Queue without consumers, messages are still stored until consumers consume them
  • When a consumer receives a message, it will automatically ack by default. But if there is no confirmation that the message was processed, or that it was processed successfully. We can turn on manual ACK mode
    • Message processing is successful, ack() accepts the next message, and the message broker is removed
    • Message processing failure, nack()/reject(), resend to someone else for processing, or fault tolerant post-ack
    • The message never called ack/ nACK. The broker believes that the message is being processed and will not be delivered to anyone else. The client is disconnected. Messages are not removed by the broker but delivered to others
    • In manual ACK, a message remains unacked as long as there is no ack to tell MQ that it has been consumed. Even if the consumer is down, the message will not be lost and will become ready. The next time a new consumer is connected, the message will be sent to him
# Configure the YML file
spring:
	rabbitmq:
    	# Switch to manual ACK
        listener:
          direct:
            acknowledge-mode: manual
Copy the code

After the above three configurations, our code for consuming messages looks like this

/ * * *@author lp
 * @date 2020/8/9 15:05
 */
@Service
@Slf4j
@RabbitListener(queues = "demo.queue")
public class DemoListener {

    @Autowired
    private DemoService service;

    @RabbitHandler
    public void listener(DemoEntity entity, Channel channel, Message message) throws IOException {
        log.info("----------- Start consuming message ----------");
        try {
        	// Specific business
            service.doSomething(entity);
            // The acl was successfully restored
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
        	// Failed processing returns to queue
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); }}}Copy the code

Of course, this three-step configuration alone is not enough, because the above steps do not do message persistence, after the persistence plan, our message will be invincible. (exaggerated figure of speech)

Summary:

  • The message was sent but did not reach the server due to a network problem
    • Make a good try-catch method. The network may fail to send messages. After the failure, a retry mechanism should be established, which can be recorded in the database and periodically scanned and resend messages.
    • Do a good log (save the details of each message to the database), each message status is received by the server, should be recorded.
    • Periodically resend the message. If the message fails to be sent, periodically scan the database for the failed message to resend the message.
  • The message arrives at the Broker, which writes the message to disk (persistence) for success. The Broker has not been persisted and is down
    • Publisher must also incorporate an acknowledgement callback mechanism to acknowledge success messages and modify the database message state.
  • Automatic ACK. The consumer receives the message, but fails to process it
    • Manual ACK must be enabled, the message is successfully consumed before removal, failure or no time to process noAck and re-join the queue.

Once these are done, the possibility of message loss is very small, but there will be new problems, such as repeated consumption, too many messages will cause consumers to break down, etc.

Preventing message duplication

  • Message consumption successful, transaction committed, machine down when ACK. If no ACK succeeds, the message from the Broker changes from unACK to ready and is sent to other consumers
  • When the consumption fails, the message is automatically sent again due to the retry mechanism
  • The ack fails, the message changes from unack to Ready, and the Broker resends
    • The consumer’s business consumption interface should be designed to be idempotent. For example, there is a status sign of the work order
    • Using anti-duplicate table (Redis /mysql), each message has a unique identification of the service. Once processed, no processing is required
    • RabbitMq has a reDELIVERED field for every message that was redelivered, not for the first time

Keep an eye on me for details on how to ensure idempotence, and I’ll talk about it in a separate lecture.

Message backlog

  • Causes:
    • Customer outage
    • Lack of consumer capacity
    • The sender sends too much traffic. Procedure
  • How to solve:
    • Online more consumers, normal consumption
    • Online special message queue service, take out the messages in batches first, record them to the database, and slowly process them offline