Once a project uses a message queue (MQ), message loss has to be considered. Where money is involved, losing a message can be fatal. So in which scenarios can RocketMQ lose messages? Let's start with the simplest consumption flow chart:

Here are a few scenarios:

  1. The producer generates a message and sends it to RocketMQ
  2. Once RocketMQ receives the message, it must persist it to disk so that the data survives a power failure or crash
  3. The consumer retrieves the message from RocketMQ and consumes it; once consumption succeeds, the process ends

All three scenarios can result in message loss, as shown in the following figure:

  1. In scenario 1, when the producer sends a message to RocketMQ, the message can be lost if network jitter or a communication exception occurs
  2. In scenario 2, when messages are persisted to disk, there are two ways to lose them. (1) To reduce disk I/O, RocketMQ first writes messages to the OS cache rather than directly to disk, so reading a message from the OS cache is similar to reading it from memory. After a period of time, an OS thread asynchronously flushes the message to disk, and only then is the message truly persistent. If the RocketMQ broker fails before the asynchronous flush has happened, the message is lost. (2) If the message has been flushed to disk but the data has not been backed up, the message is also lost if the disk is corrupted
  3. In scenario 3, the consumer successfully receives the message from RocketMQ, but before it has fully processed the message it tells RocketMQ that the message has been consumed, and then the consumer goes down. RocketMQ believes the consumer has successfully consumed the data, so the message is lost

So how can zero message loss be guaranteed?

  1. To avoid message loss in scenario 1, send messages with RocketMQ's transaction mechanism. The process is as follows: (1) The producer sends a half message to RocketMQ; consumers cannot consume half messages. (2) After the half message is sent successfully and RocketMQ returns a success response, the producer executes its core business logic. (3) If the core logic fails, the producer rolls it back and notifies RocketMQ to delete the half message. (4) If the core logic succeeds, the producer notifies RocketMQ to commit the half message so that consumers can consume it. (5) If RocketMQ has not received a commit or rollback response from the producer for a long time, it calls back an interface on the producer to check the state of the transaction. Sending messages through a RocketMQ transaction in this way ensures that the producer's message reaches RocketMQ and is not lost at this stage
  2. To prevent message loss in scenario 2, change the flush policy from asynchronous to synchronous. Modify the broker configuration file and set flushDiskType to SYNC_FLUSH; the default is ASYNC_FLUSH, which flushes asynchronously. Once a synchronous flush returns successfully, the message is guaranteed to have been persisted to disk. To ensure that data is not lost due to disk damage, deploy RocketMQ in a master-slave structure, with the leader's data backed up on multiple followers to avoid a single point of failure
  3. In scenario 3, once the message reaches the consumer, RocketMQ's acknowledgement mechanism ensures that the message is not lost, provided the consumer code is written correctly
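The half-message flow described for scenario 1 can be sketched as a small in-memory model. This is only an illustration of the state transitions, not the RocketMQ client API: `ToyBroker`, `sendInTransaction`, and `checkPending` are hypothetical names invented for this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;

// Toy in-memory model of the half-message flow (hypothetical, not RocketMQ code).
class HalfMessageSketch {
    enum State { HALF, COMMITTED, ROLLED_BACK }

    static class ToyBroker {
        private final Map<String, State> messages = new LinkedHashMap<>();

        // Store the message as a half message; consumers cannot see it yet.
        void sendHalf(String id) { messages.put(id, State.HALF); }
        void commit(String id) { messages.put(id, State.COMMITTED); }
        void rollback(String id) { messages.put(id, State.ROLLED_BACK); }

        // Only committed messages are visible to consumers.
        List<String> visibleToConsumers() {
            List<String> out = new ArrayList<>();
            for (Map.Entry<String, State> e : messages.entrySet()) {
                if (e.getValue() == State.COMMITTED) out.add(e.getKey());
            }
            return out;
        }

        // For messages still in HALF state, ask the producer how its local work ended.
        void checkPending(Predicate<String> localWorkSucceeded) {
            for (Map.Entry<String, State> e : messages.entrySet()) {
                if (e.getValue() == State.HALF) {
                    e.setValue(localWorkSucceeded.test(e.getKey())
                            ? State.COMMITTED : State.ROLLED_BACK);
                }
            }
        }
    }

    // Send the half message, run the core logic, then commit or roll back.
    static void sendInTransaction(ToyBroker broker, String id, BooleanSupplier coreLogic) {
        broker.sendHalf(id);
        boolean ok;
        try {
            ok = coreLogic.getAsBoolean();
        } catch (RuntimeException e) {
            ok = false; // a failed core link leads to a rollback
        }
        if (ok) broker.commit(id); else broker.rollback(id);
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        sendInTransaction(broker, "order-1", () -> true);   // core logic succeeds
        sendInTransaction(broker, "order-2", () -> false);  // core logic fails
        broker.sendHalf("order-3"); // models a commit that never reached the broker
        System.out.println(broker.visibleToConsumers());    // prints [order-1]
        broker.checkPending(id -> id.equals("order-3"));    // broker checks back with the producer
        System.out.println(broker.visibleToConsumers());    // prints [order-1, order-3]
    }
}
```

In the model, `order-3` stays invisible until the check-back resolves it, which mirrors step (5): a half message whose outcome is unknown is neither delivered nor silently dropped.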

 

For scenario 3, the consumer registers a message listener like this:

```java
// Register a message listener to handle incoming messages
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        // Process the messages here
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
```

In the code above, a listener is registered with the consumer. When the consumer receives messages, it calls back the listener, which processes the messages and then returns ConsumeConcurrentlyStatus.CONSUME_SUCCESS. Only after CONSUME_SUCCESS is returned does the consumer tell RocketMQ that it has finished consuming. If the consumer goes down before returning CONSUME_SUCCESS, RocketMQ assumes that the consumer node has failed and automatically fails the message over to other consumers in the same consumer group, so the message is not lost.

To ensure that messages are not lost, simply write the business logic for message consumption directly in the consumeMessage method. Do not do something risky such as the following code:

 

```java
// Register a message listener to handle incoming messages
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        // Anti-pattern: start a child thread to process the messages asynchronously
        new Thread() {
            @Override
            public void run() {
                // Process the messages here
            }
        }.start();
        // CONSUME_SUCCESS is returned before processing has finished
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
```

If a new child thread processes the message asynchronously, the consumer may tell RocketMQ that the message has been consumed before processing has actually finished. If the consumer then goes down, the message is lost.
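The difference between acknowledging before and after processing can be shown with a toy model. It assumes a simplified broker that keeps every message until it is acknowledged; `ToyQueue` and `consume` are hypothetical names for this sketch and are not part of RocketMQ.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of acknowledgement ordering (hypothetical, not RocketMQ code).
class AckOrderSketch {
    // Simplified broker: a message stays redeliverable until it is acknowledged.
    static class ToyQueue {
        private final List<String> unacked = new ArrayList<>();
        void publish(String msg) { unacked.add(msg); }
        void ack(String msg) { unacked.remove(msg); }
        List<String> pendingRedelivery() { return new ArrayList<>(unacked); }
    }

    // Returns true if the business logic ran to completion.
    // crashMidway models the consumer going down before the work is done.
    static boolean consume(ToyQueue queue, String msg,
                           boolean ackBeforeProcessing, boolean crashMidway) {
        if (ackBeforeProcessing) {
            queue.ack(msg);                 // like returning CONSUME_SUCCESS too early
            if (crashMidway) return false;  // work never happens, yet the broker forgot the message
            return true;
        }
        if (crashMidway) return false;      // no ack was sent, the broker still holds the message
        queue.ack(msg);                     // acknowledge only after the work is done
        return true;
    }

    public static void main(String[] args) {
        ToyQueue early = new ToyQueue();
        early.publish("pay-1");
        boolean doneEarly = consume(early, "pay-1", true, true);
        System.out.println("ack first: processed=" + doneEarly
                + ", redeliverable=" + early.pendingRedelivery());

        ToyQueue late = new ToyQueue();
        late.publish("pay-1");
        boolean doneLate = consume(late, "pay-1", false, true);
        System.out.println("ack last: processed=" + doneLate
                + ", redeliverable=" + late.pendingRedelivery());
    }
}
```

With the early acknowledgement the message is neither processed nor redeliverable after the crash, while with the late acknowledgement it is still held by the queue and can be delivered again.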

With the full set of measures above, you can guarantee zero message loss when using RocketMQ, but performance and throughput drop significantly:

  1. Sending messages through the transaction mechanism takes many more steps than a normal send, which costs performance
  2. A synchronous flush writes to disk while an asynchronous flush writes to memory, and the two are not in the same order of magnitude in speed
  3. In a master-slave architecture, the leader needs to synchronize data to the followers
  4. Consumption cannot be done asynchronously; RocketMQ can be notified that consumption has completed only after processing has actually finished
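The cost in item 2 comes from waiting for the storage device on every write. The following minimal sketch uses plain Java NIO to contrast a write that stays in the OS cache with one that is forced to disk. It is not RocketMQ's storage code; `FlushSketch`, `append`, and `roundTrip` are names assumed for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustration of cached versus forced writes (hypothetical, not RocketMQ code).
class FlushSketch {
    // Appends payload to file. With syncFlush=true the method returns only after
    // the OS has been asked to force the data to the storage device.
    static void append(Path file, String payload, boolean syncFlush) throws IOException {
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            ByteBuffer buf = ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8));
            while (buf.hasRemaining()) {
                ch.write(buf);      // data lands in the OS cache first
            }
            if (syncFlush) {
                ch.force(true);     // block until content and metadata are flushed
            }
        }
    }

    // Writes one cached and one forced record to a temp file and reads them back.
    static String roundTrip() {
        try {
            Path file = Files.createTempFile("commitlog-sketch", ".log");
            try {
                append(file, "async\n", false);
                append(file, "sync\n", true);
                return Files.readString(file);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(roundTrip());
    }
}
```

Both records can be read back immediately, because reads are served from the OS cache. The difference only shows after a power failure: the record written without `force` may be gone, which is the durability that a synchronous flush pays for with latency.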

Zero message loss is a double-edged sword. Whether to use it depends on the specific business scenario; the best solution is the one that fits that scenario.