In the previous article, we learned about the message retry mechanism in RocketMQ and how retry messages are processed at both Producer and Consumer.

RocketMQ pushes messages to the consumer for message retries according to certain rules during message consumption. Here comes the concept of message idempotent.

First let’s look at what idempotent is and what message idempotent is.

What is idempotent

Baidu explains “idempotent” as follows

Let f be a unary operation that maps from X to X, then f is idempotent, if f(f(X)) = f(X) for all X in X. In particular, the identity function must be idempotent, and any constant function is idempotent.

The key here is that f(f(x)) = f(x).

An operation is idempotent if it is performed multiple times and has the same effect as a single execution.

About message idempotent

Based on the above concepts, combined with the message consumption scenario, we can easily summarize the concept of message idempotent:

That is:

The process is said to be message idempotent if the message is retried multiple times and the consumer consumes the repeated message multiple times as well as once, and multiple consumption has no side effects on the system.

Such as:

In the payment scenario, the message of consumer consumption deduction is used for the deduction operation of an order, which requires deduction of 10 yuan.

The effect of this deduction operation repeated for many times is the same as that of one time, only one real deduction is made, and there is only one deduction flow corresponding to this order in the user’s deduction record. No more. Then we say that the debit operation is in order and that the consumption process is message idempotent.

A scenario where message idempotent is required

First, let’s review the scenario where message idempotent is required, which is the message repetition scenario mentioned in the previous article.

  1. Repeat when sending:

When the producer sends a message, the message is successfully delivered to the Broker. However, the network is intermittently disconnected or the producer is down, causing the Broker to fail to send ACK. Since the producer did not receive a message sending response, it considers the message to have failed and tries to resend the message to the broker. When the Message is successfully sent, there are two messages with the same content in the broker, and the consumer eventually pulls two messages with the same content and the same Message ID. This results in repeated messages.

  1. Repeat when consuming:

Repeated consumption also occurs when consuming messages. When a consumer returns the consumptionstatus to the broker while processing a transaction, the CONSUME_SUCCESS status of the consumptioncompletion cannot be returned to the broker due to anomalies such as network outages. In order to ensure that the Message is consumed at least once, the broker will re-deliver the processed Message after the network environment recovers. As a result, consumers will receive messages with the same content and same Message ID many times, resulting in Message duplication.

As you can see, both sending and consuming repetitions end up with the consumer receiving repeated messages while consuming, so we know that message idempotent can be achieved only by uniformly idempotent processing on the consumer side.

Implement message idempotent

So how do you implement message idempotent?

First we define two elements of the message idempotent:

  1. Idempotent token
  2. Handle uniqueness assurance

To be considered successful, we must ensure the uniqueness of business processing results in the presence of idempotent tokens.

The two elements are explained separately

Idempotent token

Idempotent tokens are established agreements between producers and consumers. In a business, they are usually strings with unique business identifiers, such as order numbers, serial numbers, etc. It is generally generated by the producer end and passed to the consumer end.

Handle uniqueness assurance

That is, the server must adopt certain policies to ensure that the same service logic cannot be successfully executed multiple times. For example: use Alipay to pay, buy a product to pay multiple times will only be successful.

The more common approach is to use cache de-duplication and idempotent by adding a unique index of the database to the business identity.

The specific idea is as follows: In a payment scenario, the payment originator generates a payment serial number. After the server processes the payment request successfully, the data persistence succeeds. As a unique index is added to the payment flow in the table, duplicate entry will be reported due to the existence of the unique index in duplicate payment. The business logic of the server catches the exception and returns the “duplicate payment” message on the calling side. That way you don’t have to double deduct.

Based on the above scenario, we can also introduce cache components such as Redis to implement de-duplication: When the payment request is sent to the server, the cache is first used for judgment, and the value stored by GET is determined according to key= “payment serial number”. If the return is null, It is the first time that the payment operation is performed and the current payment serial number is used as key and value. It can be any string and stored in Redis through set(Key,value,expireTime).

When a duplicate payment request arrives, a get(payment serial number) operation is attempted. This operation hits the cache, so we can consider the request to be a duplicate payment request, and the server business returns the business hint of duplicate payment to the requester.

Since we typically set expiration times during cache use, the cache can fail and cause requests to penetrate persistent storage (e.g., MySQL). Therefore, the introduction of caching should not eliminate the use of unique indexes, and a combination of the two is a good solution.

How to handle message idempotent in the RocketMQ scenario

With two elements and a typical case, let’s return to the message consumption scenario.

As a high-performance messaging middleware, RocketMQ ensures that messages are not lost but not duplicated. It is possible to implement message de-duplication in RocketMQ, but considering the high availability and high performance requirements, if message de-duplication is implemented on the server side, RocketMQ will need to perform additional rehash and sorting operations on messages, which will cost a lot of time, space and other resources, and the benefits are not obvious. RocketMQ takes into account the low probability of duplicate messages under normal circumstances, so RocketMQ leaves message idempotent operations to the business side.

In fact, the essence of the above problem lies in the fact that the network call itself is uncertain, which is neither successful nor failed in the third state, the so-called processing state, so there will be repeated situations. This problem is also encountered by many other MQ products, and the common approach is to require the consumer to de-iterate when consuming the message, which is what we refer to as consumption idempotency in this article.

Readers with some experience with RocketMQ may have noticed that each message has a MessageID, so can we use that ID as a basis for de-duplication, namely the idempotent token mentioned above?

The answer is no, because MessageID may conflict, so it is not recommended to use MessageID as the processing basis, but should use unique business identifiers such as order number and serial number as the key basis for idempotent processing.

Also mentioned above, the power basis shall be generated by the message producer, such as when sending a message, we can through the message key set the id, the corresponding API for org.apache.rocketmq.com mon. Message. SetKeys (String keys) code is as follows:

    Message sendMessage = new Message(
                    MessageProtocolConst.WALLET_PAY_TOPIC.getTopic(),
                    message.getBytes());Copy the code
    sendMessage.setKeys("OD0000000001");Copy the code

When news consumers receive the message, according to the key do idempotent processing the message, mon API for org.apache.rocketmq.com. The message. The getKeys () code is as follows:

(MSGS, context) -> {try {for (MessageExt MSG: MSGS) {String key = msg.getKeys(); return walletCharge(msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) {logger. error(" wallet debit Exception,e={}", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; }}Copy the code

With getKeys(), the consumer can read the idempotent basis set by the producer (such as the order number, etc.) and then the business logic can idempotent around that ID.

If you feel like every time you have to setkey on the producer side, getkey on the consumer side, that’s a little tedious. The idempotent basis can also be set in the message protocol, and the consumer can parse the ID to perform idempotent operations after receiving the message. All it takes is for the producer and consumer of the message to agree on how to resolve the ID.

The exact idempotent logic depends on the scenario used, and I’m trying to draw some conclusions from my experience here.

Idempotent operations common on the consumer side

  1. Perform status query before performing service operations

When the consumer starts to perform business operations, it first queries the business status by idempotent ID, for example, modifying the order status. When the order status is successful/failed, no further processing is required. So we only need in consumption logic order status by the order number query before execution, once get submitted to determine the order status of the message, inform the broker message status to: ConsumeConcurrentlyStatus. CONSUME_SUCCESS.

  1. Data retrieval before service operations

The logic is similar to the first point, that is, data is retrieved before consumption. If the corresponding data can be queried through the unique business ID, there is no need to carry out the subsequent business logic. For example, in the ordering process, consumers can first check whether the order exists through the order number before placing an asynchronous order, which can be checked in the database or cache. If there is, the consumption success is returned directly, otherwise, the order operation is carried out.

  1. The uniqueness constraint guarantees the last line of defense
The second operation does not guarantee must not duplicate data, such as: concurrent insert scenarios, if not optimistic locking, distributed lock as guarantee under the premise of data is very likely to be repeated injection operation, so we must add the uniqueness of idempotent id index, so it can guarantee in concurrent scenarios also can guarantee the uniqueness of the data.Copy the code
  1. Introducing locking
In the first point above, in the case of concurrent update, if the update is carried out without pessimistic lock, optimistic lock, distributed lock and other mechanisms, it is likely that multiple updates will lead to inaccurate status. For example, to update the order status, the service requires that the order can only be updated from the initial -> Processing, processing -> Success, processing -> failure, not across the status update. If there is no locking mechanism, it is likely that initialized orders will be updated as successful, successful orders will be updated as failed, and so on. In the case of high concurrency, it is recommended to define service state changes through state machines. Optimistic locking and distributed locking mechanisms are used to ensure that the results of multiple updates are determined. Pessimistic locking is not recommended in the case of concurrency, which is not conducive to the improvement of service throughput.Copy the code
  1. Message log sheet
This scheme is similar to the idempotent operation done by the business layer. Since our message ID is unique, we can use this ID to carry out message deduplication operation and indirectly realize consumption idempotent operation. First prepare a message form, at the same time in the consumption of successful insert a has been successfully processed the message id of the record to the table, the notice must be * * * *, and business operations in the same things when new messages arrive, according to the new message id query whether the id already exists in the table, if there is suggests that message has been consumption, Then, you can discard the message and no further business operations can be performed. .Copy the code

There are certainly more scenarios that I haven’t covered, but the operations described here are all related to each other, and working together can help keep the consumer business idempotent.

Either way, keep one rule in mind: caching is unreliable, and queries are unreliable.

In high concurrency scenarios, make sure to persist unique indexes and introduce locking mechanisms as a last line of defense for data accuracy and integrity!

conclusion

This paper mainly explains what is idempotent and how to pass unique idempotent ID in message consumption scenario, further analyzes how to guarantee message idempotent and summarizes common message idempotent processing methods.

Routines are changeable, the key is to grasp the ideas and methods, our principle is that no matter how many times to perform, show business behavior is unified, under this premise, we introduced the operation before check before operation libraries, cache, optimistic locking/distributed lock mechanism, to join the only index such as multiple prevent replay strategy, through a combination of such strategies, Finally, the message idempotent is achieved.

Finally, there is a word to share. There is no art to be found. There is no art without art. I believe that you will be smart in the technical road combined with the actual scene will be all kinds of technical means through, so as to go further and further.