I recently read a post by @Javaguide, “An interviewer asked me how to ensure Kafka doesn’t lose messages. I cried!” This article picks up that topic and discusses how to keep RocketMQ from losing messages.

0x00. Message sending process

A message goes through three stages from production to consumption:

  • In the production phase, the Producer creates a message and delivers it to the MQ Broker over the network
  • In the storage phase, the message is stored on the Broker's disk
  • In the consumption phase, the Consumer pulls the message from the Broker

Messages can be lost at any of these stages. If we find out why messages can be lost at each stage and take appropriate measures against it, the problem of message loss can be solved.

0x01. Production phase

The Producer sends a message to the Broker over the network. When the Broker receives the message, it returns an acknowledgement to the Producer. So as long as the Producer receives that acknowledgement, the message was not lost in the production phase.

The RocketMQ example code for sending a message is as follows:

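import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

DefaultMQProducer mqProducer = new DefaultMQProducer("test");
// Set the NameServer address
mqProducer.setNamesrvAddr("namesrvAddr");
try {
    mqProducer.start();
    Message msg = new Message("test_topic" /* Topic */,
            "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
    // Send the message to a Broker synchronously; blocks until the Broker responds
    SendResult sendResult = mqProducer.send(msg);
    System.out.println("Send status: " + sendResult.getSendStatus());
} catch (MQClientException | RemotingException | MQBrokerException
        | InterruptedException | UnsupportedEncodingException e) {
    e.printStackTrace();
}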

The send method is synchronous: if it returns without throwing an exception, the message has been sent successfully.

Here, "sent successfully" only means that the message reached the Broker. Depending on its configuration, the Broker may return one of several statuses:

  • SendStatus.SEND_OK
  • SendStatus.FLUSH_DISK_TIMEOUT
  • SendStatus.FLUSH_SLAVE_TIMEOUT
  • SendStatus.SLAVE_NOT_AVAILABLE

The official documentation describes what each status means. The Broker-side configurations behind these statuses are explained in detail below.

In addition, RocketMQ provides asynchronous sending, which suits business scenarios with long call chains that are sensitive to response time.

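import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

DefaultMQProducer mqProducer = new DefaultMQProducer("test");
// Set the NameServer address
mqProducer.setNamesrvAddr("127.0.0.1:9876");
// Retry times for failed asynchronous sends
mqProducer.setRetryTimesWhenSendAsyncFailed(5);
try {
    mqProducer.start();
    Message msg = new Message("test_topic" /* Topic */,
            "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
    // Send the message asynchronously: the calling thread does not block and returns immediately
    mqProducer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            // The message was sent; check sendResult.getSendStatus() as well
        }

        @Override
        public void onException(Throwable e) {
            // The message failed to send. Persist it here and compensate later
        }
    });
} catch (MQClientException | RemotingException | InterruptedException
        | UnsupportedEncodingException e) {
    e.printStackTrace();
}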

When sending messages asynchronously, you must implement the callback methods to check the send result.

In both synchronous and asynchronous mode, network problems can cause a send to fail. Setting a reasonable number of retries lets the producer retry automatically when this happens. The settings are as follows:

// Retry times for sending messages synchronously. The default value is 2
mqProducer.setRetryTimesWhenSendFailed(3);
// Retry times for sending asynchronous messages. The default value is 2
mqProducer.setRetryTimesWhenSendAsyncFailed(3);
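The following is a simplified, self-contained model in plain Java that makes the retry setting concrete. It is not RocketMQ's own code. It assumes the documented synchronous behaviour: a send is attempted once, plus up to `retryTimesWhenSendFailed` retries, before failure is reported. `RetryModel` and `sendWithRetries` are hypothetical names used only for illustration.

```java
import java.util.function.BooleanSupplier;

// Hypothetical model of a synchronous send with retries (not RocketMQ's code).
// Total attempts = 1 first try + retryTimesWhenSendFailed retries.
final class RetryModel {

    /** Returns the attempt number that succeeded, or -1 if every attempt failed. */
    static int sendWithRetries(BooleanSupplier attempt, int retryTimesWhenSendFailed) {
        int totalAttempts = 1 + retryTimesWhenSendFailed; // first try + retries
        for (int i = 1; i <= totalAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return i; // acknowledged by the (simulated) Broker
            }
        }
        return -1; // give up: the caller must persist the message and compensate
    }

    public static void main(String[] args) {
        // Simulate a network that fails twice and then recovers
        int[] calls = {0};
        int succeededOn = sendWithRetries(() -> ++calls[0] > 2, 3);
        System.out.println("succeeded on attempt " + succeededOn); // prints "succeeded on attempt 3"
    }
}
```

A retry can also resend a message the Broker had already stored, for example when only the response was lost. Retries can therefore produce duplicates, which is one reason the consumer side needs to be idempotent.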

0x02. Broker storage phase

By default, a message that arrives at the Broker is first stored in memory, and an acknowledgement is immediately returned to the producer. The Broker then periodically flushes batches of messages from memory to disk asynchronously.

This reduces the number of I/O operations and gives better performance. However, if the machine loses power or crashes, any messages not yet flushed to disk will be lost.

To ensure that the Broker does not lose messages, switch the storage mechanism to synchronous flush mode. In this mode the Broker returns a response only after the message has been written to disk.

Modify the Broker configuration as follows:

## The default is ASYNC_FLUSH
flushDiskType = SYNC_FLUSH

If the Broker does not finish flushing within the synchronous flush timeout (5 s by default), SendStatus.FLUSH_DISK_TIMEOUT is returned to the producer.
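The difference between the two flush modes can be shown at the file level. The sketch below assumes a plain append-only file and uses the JDK's `FileChannel.force` to stand in for a disk flush. It illustrates the idea only and is not RocketMQ's CommitLog code. `FlushDemo` is a hypothetical name.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustration of the two flush strategies on a simple append-only log file.
final class FlushDemo {

    /** ASYNC_FLUSH-style: write into the OS page cache and acknowledge at once. */
    static void appendAsync(FileChannel log, byte[] msg) throws IOException {
        log.write(ByteBuffer.wrap(msg)); // data may still live only in memory
        // a background thread would call log.force(false) later
    }

    /** SYNC_FLUSH-style: acknowledge only after the data is forced to disk. */
    static void appendSync(FileChannel log, byte[] msg) throws IOException {
        log.write(ByteBuffer.wrap(msg));
        log.force(false); // forces the file content to the storage device before returning
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("commitlog", ".log");
        try (FileChannel log = FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            appendSync(log, "Hello World".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println(new String(Files.readAllBytes(file), StandardCharsets.UTF_8)); // prints "Hello World"
        Files.delete(file);
    }
}
```

In this naive sketch, every synchronous append pays for its own forced disk write. That cost is the performance trade-off of synchronous flushing.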

Cluster deployment

To ensure availability, Brokers are usually deployed in master/slave mode. To ensure that messages are not lost, messages also need to be copied to the slave node.

By default, when a message is written to the master successfully, an acknowledgement response is returned to the producer, and the message is then copied asynchronously to the slave node.

Note: the master is configured with flushDiskType = SYNC_FLUSH.

If the master fails suddenly and cannot be recovered, messages that have not been copied to the slave will be lost.

To further improve message reliability, we can use synchronous replication: the master waits for replication to the slave to complete before returning an acknowledgement.

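The difference between the two modes is as follows. With asynchronous replication, the master acknowledges the producer as soon as the message is written locally and copies it to the slave afterwards. With synchronous replication, the master acknowledges only after replication to the slave has completed.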

Note: do not be misled into thinking both modes run at once. Only one replication mode can be configured for a broker master; the comparison above only explains the two concepts.

The Broker Master node synchronous replication configuration is as follows:

## The default is ASYNC_MASTER
brokerRole=SYNC_MASTER

If the slave node does not return a synchronization response within the timeout, the producer receives the status SendStatus.FLUSH_SLAVE_TIMEOUT.
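The replication-related statuses can be summarized as a small decision function. This is a hypothetical model based on the behaviour described in this section and in the official status descriptions. `ReplicaRole` and `ReplicaStatus` are local stand-ins for RocketMQ's broker role setting and `SendStatus`, not the real classes.

```java
// Hypothetical model of the status a producer sees from a master, depending on
// its replication role. Local enums only; not RocketMQ's implementation.
enum ReplicaRole { ASYNC_MASTER, SYNC_MASTER }

enum ReplicaStatus { SEND_OK, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

final class ReplicationModel {

    static ReplicaStatus statusFor(ReplicaRole role, boolean slaveAvailable, boolean slaveAckedInTime) {
        if (role == ReplicaRole.ASYNC_MASTER) {
            return ReplicaStatus.SEND_OK; // ack immediately; the copy to the slave happens afterwards
        }
        if (!slaveAvailable) {
            return ReplicaStatus.SLAVE_NOT_AVAILABLE; // SYNC_MASTER, but no slave to copy to
        }
        // SYNC_MASTER: the ack depends on the slave confirming within the timeout
        return slaveAckedInTime ? ReplicaStatus.SEND_OK : ReplicaStatus.FLUSH_SLAVE_TIMEOUT;
    }

    public static void main(String[] args) {
        System.out.println(statusFor(ReplicaRole.SYNC_MASTER, true, false)); // prints FLUSH_SLAVE_TIMEOUT
    }
}
```

With ASYNC_MASTER, the producer gets SEND_OK even if the master dies before copying the message. That is exactly the loss window described above.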

Summary

Combining the production phase with the storage phase, to ensure that messages are not lost, the broker needs the following configuration:

## master node
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH
## slave node
brokerRole=SLAVE
flushDiskType=SYNC_FLUSH

The producer must also cooperate by checking whether the returned status is SendStatus.SEND_OK. For any other status, consider a compensating retry.
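Here is a minimal sketch of that producer-side check. It assumes a hypothetical in-memory compensation list; a real system would persist this, for example in a database table. To keep the sketch runnable without the client library, `SendStatus` is a local enum that copies the four values listed earlier. With the real client you would pass `sendResult.getSendStatus()` instead. `SendChecker` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

// Local copy of the four SendStatus values, so the sketch runs without the client library
enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

final class SendChecker {
    // Message keys that must be resent later (hypothetical compensation store)
    final List<String> pendingCompensation = new ArrayList<>();

    /** Returns true only when the message can be treated as safely stored. */
    boolean handle(String msgKey, SendStatus status) {
        if (status == SendStatus.SEND_OK) {
            return true;
        }
        // Any other status: the message reached the Broker, but the flush or
        // replication guarantee was not confirmed, so schedule a compensating retry
        pendingCompensation.add(msgKey);
        return false;
    }
}
```

Resending after a timeout status may duplicate a message that was in fact stored, so this check goes hand in hand with an idempotent consumer.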

Although this configuration makes messages more reliable, it reduces performance, so weigh the trade-off carefully in production.

0x03. Consumption phase

The consumer pulls a message from the Broker and executes the corresponding business logic. On success, it returns the status ConsumeConcurrentlyStatus.CONSUME_SUCCESS to the Broker.

If the Broker does not receive a confirmation, or receives a status other than success, the message is delivered again and the consumer retries it. This prevents messages from being lost when consumption fails or when the response is lost on the network.

The code for message consumption is as follows:

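import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
// Instantiate the consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");
// Set the address of NameServer
consumer.setNamesrvAddr("namesrvAddr");
try {
    // Subscribe to one or more topics, using tags to filter the messages to consume
    consumer.subscribe("test_topic", "*");
    // Register the callback that handles messages pulled back from the Broker
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                // Execute business logic
            } catch (Exception e) {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER; // failed: redeliver later
            }
            // Mark the messages as successfully consumed
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // Start the consumer instance
    consumer.start();
} catch (MQClientException e) {
    e.printStackTrace();
}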

In the consumption code above, pay attention to the status you return. Return ConsumeConcurrentlyStatus.CONSUME_SUCCESS only when the business logic has truly succeeded. Otherwise, return ConsumeConcurrentlyStatus.RECONSUME_LATER so the message is retried later.
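The retry behaviour can be modelled as a loop that requeues failed messages. This is a simplified, hypothetical model. RocketMQ actually sends failed messages to a retry topic with increasing delays. For concurrent consumers, it moves a message to a dead-letter queue after 16 retries by default. Only the control flow is shown here. `RedeliveryModel` and `consumeAll` are illustrative names, and message IDs are assumed to be unique.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model of at-least-once consumption (not RocketMQ's implementation).
// handler returns true for CONSUME_SUCCESS and false for RECONSUME_LATER.
final class RedeliveryModel {

    /** Consumes every message and returns those that exhausted their retries. */
    static List<String> consumeAll(List<String> messages, Predicate<String> handler,
                                   int maxReconsumeTimes) {
        Deque<String> queue = new ArrayDeque<>(messages);
        Map<String, Integer> reconsumeTimes = new HashMap<>();
        List<String> deadLetters = new ArrayList<>();
        while (!queue.isEmpty()) {
            String msg = queue.poll();
            if (handler.test(msg)) {
                continue; // CONSUME_SUCCESS: the message is acknowledged and done
            }
            // RECONSUME_LATER: count the failure and decide whether to retry
            int times = reconsumeTimes.merge(msg, 1, Integer::sum);
            if (times > maxReconsumeTimes) {
                deadLetters.add(msg); // retries exhausted: park it for manual handling
            } else {
                queue.addLast(msg); // redeliver later
            }
        }
        return deadLetters;
    }
}
```

With `maxReconsumeTimes` set to 16, a message that always fails is delivered 17 times before it is parked: the first attempt plus 16 retries.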

0x04. Summary

Now that we have seen how RocketMQ avoids losing messages, look back at Kafka and you will find the approach is the same. Only the configuration parameters differ.

So the next time an interviewer asks how message queue XX avoids losing messages, don't cry, even if you have never used that queue. Smile, calmly analyze at which stages messages can be lost, and then outline how to address each one.

Finally, you can add one more point. These measures improve reliability, but they can cause messages to be sent more than once and consumed more than once. The consumer must therefore be idempotent.
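Here is a minimal sketch of an idempotent consumer. It assumes each message carries a unique business key, such as an order ID. The in-memory set is a hypothetical simplification. In production, processed keys would usually live in a database with a unique constraint, ideally updated in the same transaction as the business change. `IdempotentConsumer` is an illustrative name.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical idempotent consumer: duplicates (from producer retries or
// redelivery) are acknowledged without running the business logic again.
final class IdempotentConsumer {
    private final Set<String> processedKeys = new HashSet<>();

    /** Returns true if the business logic ran, false if the message was a duplicate. */
    synchronized boolean consume(String businessKey, Runnable businessLogic) {
        if (processedKeys.contains(businessKey)) {
            return false; // already processed: acknowledge without repeating the work
        }
        businessLogic.run(); // if this throws, the key is not recorded, so a retry can run it again
        processedKeys.add(businessKey);
        return true;
    }
}
```

The key is recorded only after the business logic succeeds, so a failed attempt does not block its own redelivery.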

Be prepared, though: the interviewer may then ask how to achieve idempotency. Think about it in advance.

What? You don’t know how to make consumption idempotent yet? Then follow **@program general affairs**; the next article will cover idempotency.

0x05. Reference

  • Geek Time – Message Queue Master Class
  • github.com/apache/rock…

One last word (please follow)

If you find any mistakes, please leave a comment pointing them out so that I can fix them.

Thank you again for reading. I am a tool ape downstairs, and I will see you in the next article.

You are welcome to follow my WeChat public account, procedures, for daily practical articles. If you are interested in my topics, you can also follow my blog: studyidea.cn