1. The overall message processing flow

Here, message processing is divided into three stages for analysis:

The Producer sends the message.

The Broker stores the message.

The Consumer consumes the message.

The Producer sends the message

The sending phase involves network communication between the Producer and the broker, so there is always a chance that messages are lost. What measures does RocketMQ take to ensure that messages are not lost (or to reduce the chance of loss) during this phase?

Measure 1: Send messages synchronously (SYNC) and wait for the broker's processing result.

RocketMQ provides three ways to send messages:

Synchronous sending: the Producer sends a message to the broker and blocks the current thread until the broker responds.

Asynchronous sending: the Producer builds a send task, submits it to a thread pool, and when the task finishes, a user-defined callback is invoked to handle the result.

Oneway sending: the Producer only sends the request; it neither waits for nor processes the broker's response.

When we call producer.send without specifying a callback, the message is sent synchronously by default, which is the sending mode with the least chance of loss.
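As a rough sketch of the three modes (assuming the Apache RocketMQ Java client; the group name, topic, and name-server address are placeholders):

    import java.nio.charset.StandardCharsets;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;

    public class SendModes {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();

            Message msg = new Message("DemoTopic", "TagA", "hello".getBytes(StandardCharsets.UTF_8));

            // 1. Synchronous: block the current thread until the broker responds
            SendResult result = producer.send(msg);

            // 2. Asynchronous: hand the result to a callback instead of blocking
            producer.send(msg, new SendCallback() {
                @Override public void onSuccess(SendResult sendResult) { /* handle success */ }
                @Override public void onException(Throwable e) { /* handle failure */ }
            });

            // 3. Oneway: fire and forget, no response is waited for or processed
            producer.sendOneway(msg);

            producer.shutdown();
        }
    }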

Measure 2: If sending a message fails or times out, resend it.

The send-retry logic is essentially a for loop that retries when sending throws an exception. By default a synchronous send is attempted up to three times in total, and the retry count can be configured on the producer, as sketched below.
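The retry count is a real DefaultMQProducer setting; the loop below is a paraphrased sketch of the idea, not the actual RocketMQ source, reusing the producer and msg from the sketch above:

    // Configure the retry count (DefaultMQProducer setter; default is 2 retries,
    // i.e. up to 3 attempts in total for a synchronous send)
    producer.setRetryTimesWhenSendFailed(2);

    // Paraphrased sketch of the retry loop
    int timesTotal = 1 + producer.getRetryTimesWhenSendFailed();
    SendResult result = null;
    for (int times = 0; times < timesTotal; times++) {
        try {
            result = producer.send(msg);   // may throw on network error or timeout
            break;                         // success: stop retrying
        } catch (Exception e) {
            // log the exception and fall through to the next attempt
        }
    }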

Measure 3: Deploy brokers in multi-master mode, so that even if one broker goes down, messages can still be delivered to another healthy broker.

If there is only a single broker node and it goes down, even the producer's retry mechanism cannot help. Multi-master mode avoids this single point of failure, as sketched below.
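A minimal sketch of two independent masters in one cluster (real RocketMQ broker property keys; the cluster and broker names are placeholders; brokerId=0 marks a node as master):

    # broker-a.properties (master 1)
    brokerClusterName=DemoCluster
    brokerName=broker-a
    brokerId=0

    # broker-b.properties (master 2)
    brokerClusterName=DemoCluster
    brokerName=broker-b
    brokerId=0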

Conclusion

The producer has three sending modes, but to reduce the chance of message loss, synchronous sending should be used whenever possible. Synchronous sending + the retry mechanism + multiple master nodes together minimize the possibility of message loss.

The Broker stores the message

Measure 4: Provide a synchronous disk-flush policy

public enum FlushDiskType {
    SYNC_FLUSH,   // Synchronous flush
    ASYNC_FLUSH   // Asynchronous flush (default)
}

After a message reaches the broker, it is first written to the Page Cache and then flushed to disk according to the broker's flush policy. If the flush policy is asynchronous, the broker returns success to the producer without waiting for the message to hit the disk, which means that if the server hosting the broker suddenly goes down, the messages still sitting in the Page Cache are lost.
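The flush policy is chosen per broker in its configuration file; a sketch (flushDiskType is the real property key, SYNC_FLUSH/ASYNC_FLUSH its real values):

    # broker.properties
    flushDiskType=SYNC_FLUSH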

Measure 5: Provide a master/slave mode with support for synchronous dual-write

Even if the broker is configured for synchronous flushing, messages can still be lost if the master broker's disk is damaged. Therefore, you can attach a slave to the broker, set the master's role to SYNC_MASTER, and give the slave the synchronous flush policy as well.

In this mode, the broker considers a message successfully delivered only after both the master and the slave have written it to disk, ensuring that the message survives the failure of either node.
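A sketch of the corresponding broker configuration (real RocketMQ property keys; the names are placeholders):

    # master: broker-a.properties
    brokerName=broker-a
    brokerId=0
    brokerRole=SYNC_MASTER
    flushDiskType=SYNC_FLUSH

    # slave: broker-a-s.properties
    brokerName=broker-a
    brokerId=1
    brokerRole=SLAVE
    flushDiskType=SYNC_FLUSH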

Conclusion

On the broker side, the chance of message loss mainly comes from the flush policy and the master/slave synchronization mechanism. The RocketMQ broker flushes asynchronously by default, and if there is a slave, master/slave replication is also asynchronous by default. This improves the broker's throughput but carries a risk of loss. Therefore, the synchronous flush policy combined with a synchronously replicating master/slave deployment can be used to eliminate these sources of message loss.

The Consumer consumes the message

Measure 6: Consumers provide At least Once by default

Even if the path from the producer to the broker guarantees that messages are persisted, the message is still not truly reliable if the consumer pulls it but fails to actually consume it. By default, RocketMQ provides At least Once semantics to ensure that messages are consumed reliably.

What is At least Once?

The Consumer pulls the message to the local machine and returns an ACK to the server when consumption is complete.

Generally, the ACK mechanism for consuming messages follows one of two ideas:

1. Commit the offset first, then consume;

2. Consume first, then commit after consumption succeeds.

The first idea avoids repeated consumption but can lose messages, so RocketMQ implements the second idea by default, leaving it to each consumer's business logic to guarantee idempotence and thereby handle repeated consumption, as sketched below.
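A minimal sketch of this "consume first, ACK after success" pattern with the RocketMQ push consumer (group, topic, and name-server address are placeholders):

    import java.util.List;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;

    public class AckAfterConsume {
        public static void main(String[] args) throws Exception {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
            consumer.setNamesrvAddr("127.0.0.1:9876");
            consumer.subscribe("DemoTopic", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    try {
                        // 1. run the business logic first ...
                        // 2. only then acknowledge, so a crash mid-processing means redelivery
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } catch (Exception e) {
                        // ask the broker to deliver the message again later
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
            consumer.start();
        }
    }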

Measure 7: Consumption retry mechanism

If consuming a message fails, consumption is not fully reliable without a way to retry it, so RocketMQ itself provides the ability to re-consume messages.
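Returning RECONSUME_LATER in the listener above is what triggers this retry. The ceiling is tunable (a real DefaultMQPushConsumer setter; by default a concurrently consumed message is retried up to 16 times before being moved to the dead-letter queue):

    // Cap the number of redeliveries before the message goes to the dead-letter queue
    consumer.setMaxReconsumeTimes(16);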

Conclusion

At least Once plus the consumption retry mechanism together ensure the reliability of message consumption.

2. How to ensure that messages are not consumed repeatedly

To answer this question, you should not act as if you have never heard of repeated consumption and know nothing about it. Instead, first outline what kinds of repeated-consumption problems can occur.

First of all, RabbitMQ, RocketMQ, and Kafka can all run into the problem of repeated message consumption, because this is usually not guaranteed by the MQ itself but left to our own development. Let's take Kafka as an example and talk about repeated consumption.

Kafka assigns an offset to every message written to it. The consumer consumes the data and periodically commits the offsets of the messages it has already consumed, which amounts to saying: "I have consumed up to here; if I restart, let me continue from the last committed offset."

But accidents always happen. For example, something we often ran into in production: when restarting the system, if you are in a hurry, you may kill the process directly and then restart it. This leaves the Consumer with some messages processed but their offsets not yet committed, so after the restart a few messages will be consumed again.

Here's a scenario. Suppose three pieces of data are written to Kafka, and Kafka assigns each an offset representing its sequence number; assume the offsets are 152, 153, and 154. Consumers consume them in this order. If the consumer has consumed the data at offset=153 and is just about to commit the offset to ZooKeeper when the consumer process is restarted, then the offsets of the data already consumed are not committed, and Kafka does not know that you have consumed up to offset=153. After the reboot, the consumer asks Kafka to send data again starting from the last committed position. Since the later offsets were never successfully committed, part of the data will be sent again, and if the consumer does not deduplicate, it will be consumed repeatedly, as the sketch below illustrates.
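A sketch of this consume-then-commit loop with the Kafka Java client (the bootstrap server, group, and topic are placeholders); with auto-commit disabled, a crash between processing a record and commitSync() is exactly what causes redelivery:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ManualCommitLoop {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "127.0.0.1:9092");
            props.put("group.id", "demo-group");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // process the record (e.g. write it to the database) ...
                }
                consumer.commitSync(); // crash before this line => those records are redelivered
            }
        }
    }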

If all the consumer does is take each piece of data and insert a row into the database, part of the data ends up inserted twice, and the data is now wrong.

In fact, repeated consumption itself is not so scary; what is scary is failing to think about how to guarantee idempotence in the face of it.

Let me give you an example. Suppose a system inserts one row into the database for each message it consumes. If the same message is delivered twice, you insert two rows and the data is wrong. But if, on the second delivery, you first check whether the message has already been consumed and simply discard it if so, only one row is kept and the data stays correct.

Even when a piece of data is delivered twice, there is only one row in the database, which keeps the system idempotent.

Idempotence, in plain terms: if the same piece of data, or the same request, is given to you multiple times, you have to make sure the resulting data does not change; you cannot end up with mistakes.

So the second question is: how do you guarantee idempotence when consuming from a message queue?

In fact, this still has to be thought through in combination with the business. Here are some ideas:

For example, if you take a piece of data and want to write it to the database, first look it up by primary key; if it is already there, perform an update instead of an insert.

If you are writing to Redis, there is no problem: every write is a SET, which is naturally idempotent.

If neither of the above two scenarios applies, things get a bit more complicated. You need the producer to add a globally unique ID, such as an order ID, to every piece of data it sends. When you consume a message, first check that ID against Redis: has it been consumed before? If not, process it and then write the ID to Redis; if it has, skip it. That way the same message is never processed twice, as the sketch below shows.
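A minimal sketch with the Jedis client (the key prefix and TTL are arbitrary choices): SET with the NX flag performs the "check and mark" atomically in one step.

    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.params.SetParams;

    public class DedupCheck {
        // Returns true on first delivery, false if the ID was already marked as consumed
        public static boolean shouldProcess(Jedis jedis, String messageId) {
            // SET key value NX EX ttl: succeeds only if the key does not exist yet
            String ok = jedis.set("consumed:" + messageId, "1",
                    SetParams.setParams().nx().ex(86400));
            return "OK".equals(ok);
        }
    }

Note that marking before processing trades possible message loss for deduplication; marking after processing preserves At least Once but only narrows, rather than closes, the duplicate window.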

Another option is a database unique key, which ensures duplicate data is not inserted more than once. Because of the unique-key constraint, a repeated insert only raises an error and never leaves dirty data in the database. A sketch follows.
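A sketch in plain JDBC (the table and column names are made up; order_id is assumed to carry a UNIQUE constraint):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.sql.SQLIntegrityConstraintViolationException;

    public class IdempotentInsert {
        public static void insertOnce(Connection conn, String orderId, String payload) throws SQLException {
            String sql = "INSERT INTO orders (order_id, payload) VALUES (?, ?)";
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, orderId);
                ps.setString(2, payload);
                ps.executeUpdate();
            } catch (SQLIntegrityConstraintViolationException dup) {
                // duplicate delivery: the row already exists, so ignore it safely
            }
        }
    }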