Message-oriented middleware is a common component of distributed systems. It is widely used for asynchronous processing, decoupling and peak shaving. We usually think of message-oriented middleware as a reliable component: as long as a message is successfully delivered to the middleware, it will not be lost, that is, it is guaranteed to be successfully consumed by a consumer at least once. This is one of the most basic guarantees of message-oriented middleware: a message will be successfully consumed AT LEAST ONCE.
For example, message M is sent to the middleware and delivered to consumer process A. A receives the message and starts consuming it, but the application restarts halfway through. Since the message has not been marked as successfully consumed, the middleware keeps delivering it to consumers, and only stops once it has been consumed successfully.
However, precisely because of this reliability, a message may be delivered more than once. For example, program A receives message M and completes the consumption logic, but restarts just before it can tell the middleware “I have consumed successfully”. From the middleware's point of view the message has not been consumed, so it delivers it again. For application A, the message was already consumed successfully, yet the middleware delivers it once more.
In RocketMQ, this shows up as messages with the same messageId being delivered repeatedly.
Reliable delivery (no message loss) has the higher priority, so the job of deduplicating messages is left to the application, which is why the RocketMQ documentation stresses that consumers must implement idempotent consumption logic themselves. The reasoning is that “no loss” and “no duplication” conflict with each other in distributed scenarios; duplicate messages can be handled by the application, whereas lost messages are much harder to deal with.
Simple message deduplication solution
For example, suppose our business's message consumption logic inserts a row into an order table and then updates the inventory:
insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';
To make the consumption idempotent, we might do something like this:
select * from t_order where order_no = 'order123'

if (order != null) {
    return; // Message is a duplicate, return directly
}
This works well in many cases, but can be problematic in concurrent scenarios.
Concurrent duplicate messages
Suppose the consumer's whole logic takes up to 1 second, and a duplicate of the message arrives within that second (say, 100 milliseconds later), for example because the producer retransmits quickly or the Broker restarts. Quite likely, the deduplication code above will still find no order, because the first message is still being consumed and has not written the order yet.
As a result, the duplicate penetrates the check and enters non-idempotent business code, causing duplicate consumption problems (e.g. a primary key conflict throws an exception, or inventory is deducted twice and never released).
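The race can be reproduced deterministically in a small in-memory model. This is a hypothetical sketch, not code from the article: a concurrent set stands in for t_order, and a CyclicBarrier forces both deliveries of the same message to finish the "select" check before either one "inserts", which is exactly the window described above.

```java
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory model of the naive "select, then insert" deduplication check.
final class CheckThenActRace {

    /** Runs two consumers on the same message and returns how many times the business logic ran. */
    static int runTwoDuplicateConsumers(String orderNo) {
        Set<String> orders = ConcurrentHashMap.newKeySet(); // stands in for t_order
        AtomicInteger businessRuns = new AtomicInteger();    // counts non-idempotent executions
        CyclicBarrier bothChecked = new CyclicBarrier(2);    // both checks complete before any insert

        Runnable consumer = () -> {
            boolean alreadyExists = orders.contains(orderNo); // "select * from t_order where order_no = ?"
            try {
                bothChecked.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new IllegalStateException(e);
            }
            if (alreadyExists) {
                return; // duplicate detected, return directly
            }
            businessRuns.incrementAndGet(); // insert order + deduct inventory
            orders.add(orderNo);
        };

        Thread first = new Thread(consumer);
        Thread duplicate = new Thread(consumer);
        first.start();
        duplicate.start();
        try {
            first.join();
            duplicate.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return businessRuns.get();
    }

    public static void main(String[] args) {
        // Both deliveries saw an empty table, so the business logic ran twice.
        System.out.println(runTwoDuplicateConsumers("order123")); // prints 2
    }
}
```

In real systems the window is not forced by a barrier, which is why the bug tends to appear only intermittently under load.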
One solution to concurrent deduplication
To solve the message idempotency problem in the concurrent scenario above, one option is to open a transaction and change the select to select for update, which locks the record.
select * from t_order where order_no = 'THIS_ORDER_NO' for update

if (order != null) {
    return; // Message is a duplicate, return directly
}
However, wrapping the consumption logic in a transaction may lengthen the overall consumption time and reduce concurrency.
Of course, there are more sophisticated solutions, such as updating the order status with optimistic locking and letting the message be re-consumed if the update fails. However, these require more complex and detailed code and table design tailored to the specific business scenario, which is beyond the scope of this article.
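For reference, the optimistic-locking idea can be sketched in memory. This is a hypothetical illustration assuming an order row that starts in an INIT status: ConcurrentHashMap.replace(key, expected, updated) plays the role of a conditional "update t_order set status = 'SUCCESS' where order_no = ? and status = 'INIT'", where one affected row means this delivery won and zero rows means another delivery got there first.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory sketch of optimistic locking on the order status column.
final class OptimisticStatusUpdate {
    private final Map<String, String> orderStatus = new ConcurrentHashMap<>(); // order_no -> status

    void createOrder(String orderNo) {
        orderStatus.putIfAbsent(orderNo, "INIT");
    }

    /**
     * Models: update t_order set status = 'SUCCESS' where order_no = ? and status = 'INIT'.
     * Returns true only for the caller whose conditional update actually changed the row.
     */
    boolean markSuccess(String orderNo) {
        return orderStatus.replace(orderNo, "INIT", "SUCCESS");
    }

    String status(String orderNo) {
        return orderStatus.get(orderNo);
    }

    public static void main(String[] args) {
        OptimisticStatusUpdate table = new OptimisticStatusUpdate();
        table.createOrder("order123");
        System.out.println(table.markSuccess("order123")); // prints true: first delivery wins
        System.out.println(table.markSuccess("order123")); // prints false: the duplicate updates 0 rows
    }
}
```

A false result only says that someone else changed the row first; what to do next (re-read the order, or let the message be re-consumed) depends on the business.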
However, both select for update and optimistic locking are built on the business table itself, which undoubtedly adds complexity to business development. A large share of the request processing in a business system depends on MQ, so it is tedious if every consumer has to implement deduplication/idempotence based on its own business logic. This article explores a general approach to message idempotence that can be abstracted into a reusable utility for different business scenarios.
Exactly Once
Messaging middleware has the concept of delivery semantics, one of which is called “Exactly Once”: a message is guaranteed to be consumed successfully, and only once. Here is how Aliyun explains Exactly Once:
Exactly-Once means that a message sent to the messaging system is processed by the consumer only once. Even if the producer delivers the message again, it is consumed only once on the consumer side.
In our domain of idempotent business message processing, if the business code for a message is guaranteed to be executed, and executed only once, we can regard it as Exactly Once.
But it is almost impossible to find a general solution in a distributed scenario. However, if the consumption logic is based on database transactions, it is actually achievable.
Inserting into a message table within a relational database transaction
Suppose our business's message consumption logic is to update the status of an order in a MySQL database:
update t_order set status = 'SUCCESS' where order_no= 'order123';
To achieve Exactly Once, i.e. the message is consumed exactly once (and is guaranteed to be consumed), we can add a message consumption record table to the database, insert the message into that table, and commit this insert in the same transaction as the original order update, so that the message is consumed only once:
1. Start the transaction
2. Insert into the message table (handling primary key conflicts)
3. Update the order table (original consumption logic)
4. Commit the transaction
Description:
1. If the message is consumed successfully and the transaction is committed, the message table row has been inserted. Even if RocketMQ has not yet received the updated consumption offset, a redelivered copy will fail to insert into the message table (primary key conflict) and will be treated as already consumed. This ensures that our consumption code is executed only once.
2. If the service crashes (e.g. restarts) before the transaction is committed, the local transaction does not take effect, so the order is not updated and the message table row is not inserted. On the RocketMQ server side the consumption offset has not been updated, so the message will be delivered again; on redelivery it is inserted into the message table successfully and consumed normally. This ensures that messages are not lost.
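The four steps and both cases can be modeled in memory. In this hypothetical sketch there is no real database: writes are staged and only become visible in the commit step, the message table's primary key is the dedup key, and a simulated failure before commit leaves no trace, so the broker redelivers. A real implementation would use a JDBC transaction and catch the duplicate-key error on the insert instead.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of "message table + local transaction".
final class MessageTableTx {
    private final Set<String> messageTable = new HashSet<>();   // primary key = dedup key
    private final Map<String, String> orders = new HashMap<>(); // order_no -> status
    private int updateCount = 0;                                 // how many times the order update took effect

    /**
     * Returns true if the message can be acknowledged to the broker.
     * failBeforeCommit simulates the process dying between step 3 and step 4.
     * synchronized stands in for the database serializing conflicting inserts.
     */
    synchronized boolean consume(String dedupKey, String orderNo, boolean failBeforeCommit) {
        // 1. start transaction: staged writes live only in local variables until commit
        // 2. insert into the message table; a primary key conflict means "already consumed"
        if (messageTable.contains(dedupKey)) {
            return true;
        }
        // 3. update the order table (original consumption logic), staged
        String stagedStatus = "SUCCESS";
        if (failBeforeCommit) {
            return false; // nothing committed, so the broker will redeliver
        }
        // 4. commit: the message row and the order update become visible together
        messageTable.add(dedupKey);
        orders.put(orderNo, stagedStatus);
        updateCount++;
        return true;
    }

    synchronized String status(String orderNo) {
        return orders.get(orderNo);
    }

    synchronized int committedUpdates() {
        return updateCount;
    }

    public static void main(String[] args) {
        MessageTableTx db = new MessageTableTx();
        System.out.println(db.consume("order123", "order123", true));  // prints false: crash, redelivered
        System.out.println(db.consume("order123", "order123", false)); // prints true: consumed once
        System.out.println(db.consume("order123", "order123", false)); // prints true: duplicate, key conflict
        System.out.println(db.committedUpdates());                     // prints 1
    }
}
```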
In fact, Aliyun ONS implements EXACTLY-ONCE semantics in a way similar to this scheme, relying on the transactional properties of the database. For more information: help.aliyun.com/document\_d…
This approach can indeed be extended to different business scenarios, because its implementation is independent of the business itself: it relies only on a message table.
But it has limitations:
1. The message consumption logic must rely on relational database transactions. If the consumption also modifies other data, such as Redis, which does not support transactions, that data cannot be rolled back.
2. All the data must live in the same database; cross-database scenarios cannot be handled.
Note: In business use, the message table should not be keyed by the message ID but by the business's own primary key, in order to cope with producer retransmissions. Aliyun's message deduplication relies only on the RocketMQ messageId, so it has no deduplication/idempotence effect when the producer manually resends for some reason (e.g. upstream repeats a request for a transaction), because the resent messages carry different message IDs.
More complex business scenarios
As mentioned above, implementing Exactly Once semantics this way has many limitations, which make the scheme of little value for wide use. And because it relies on transactions, it may cause performance problems such as holding table locks for a long time.
For example, consuming a typical order message may involve the following steps (collectively referred to as Step X):
1. Check the inventory (RPC)
2. Lock the inventory (RPC)
3. Start a transaction and insert into the order table (MySQL)
4. Call some other downstream services (RPC)
5. Update the order status
6. Commit the transaction (MySQL)
In this case, if we adopt the message table + local transaction approach, many sub-steps of the consumption process do not support rollback, so even with a transaction, the operations inside it are not atomic. For example, the service might restart right after step 2 has locked the inventory; the inventory is then actually locked in another service and cannot be rolled back. Of course, the message will be delivered again to guarantee at-least-once consumption, which means the RPC interface that locks the inventory must itself still be idempotent.
Furthermore, wrapping such a long, time-consuming chain in a transaction greatly reduces the system's concurrency. Therefore, in this scenario message deduplication is usually handled by the business's own logic, such as adding a select for update up front, or using optimistic locking.
Is there a way to extract a general solution that is scalable, versatile, and high-performance?
Splitting the message execution process
One idea is to break the above steps into several different sub-messages, for example:
1. The inventory system consumes message A: check and lock the inventory, then send message B to the order service.
2. The order system consumes message B: insert into the order table (MySQL), then send message C to its downstream system.
3. The downstream system consumes message C: process its part of the logic, then send message D to the order system.
4. The order system consumes message D: update the order status.
Note: each step above needs the local transaction and the message sending to be transactional (or at least eventually consistent). This involves distributed transactional messaging, which is not covered in this article.
As you can see, this approach makes each step atomic; atomic steps mean small transactions, and small transactions make the message table + transaction scheme feasible.
However, this is far too complicated! It splits what was one continuous piece of code into message exchanges across multiple systems. It would be simpler to implement locking at the business code level.
A more general solution
The message table + local transaction scheme has functional and concurrency limitations because it relies on relational database transactions, and the transaction must wrap the entire message consumption process.
If we can deduplicate messages without relying on transactions, the scheme can be extended to more complex scenarios such as RPC calls and cross-database operations.
For example, what if we keep the message table but drop the dependency on transactions, and instead add a consumption state to the message table? Would that solve the problem?
A non-transactional scheme based on a message idempotent table
[Figure: flow of the non-transactional message idempotent scheme]
The figure above shows the message idempotent flow once transactions are removed. The scheme has no transaction; instead, the message table itself distinguishes two states: consuming and consumed. Only a message whose record is already in the consumed state is treated as a duplicate and acknowledged directly. If a duplicate arrives while the record is still in the consuming state, it triggers delayed consumption (in RocketMQ, the message is sent to a retry topic). Delayed consumption is used so that, in the concurrent scenario, the second message is not lost while the first one is still unfinished. If the duplicate were simply acknowledged as consumed, then, since both copies carry the same message ID, the broker would consider the message successfully consumed; if the first consumption then failed, the message would not be redelivered and would be lost.
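The flow can be sketched as a two-state machine. This is a hypothetical illustration, not code from the RocketMQDedupListener project: a ConcurrentHashMap stands in for the shared idempotent table (MySQL or Redis in practice), putIfAbsent plays the role of an insert that fails on a primary key conflict, and the hypothetical ACK and RETRY_LATER results correspond roughly to RocketMQ's CONSUME_SUCCESS and RECONSUME_LATER consume statuses.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical sketch of the non-transactional dedup flow with CONSUMING / CONSUMED states.
final class DedupStateMachine {
    enum State { CONSUMING, CONSUMED }

    // Roughly CONSUME_SUCCESS / RECONSUME_LATER in RocketMQ terms.
    enum Result { ACK, RETRY_LATER }

    private final Map<String, State> table = new ConcurrentHashMap<>(); // dedup key -> state

    Result onMessage(String dedupKey, Predicate<String> business) {
        State existing = table.putIfAbsent(dedupKey, State.CONSUMING); // "insert", fails if the key exists
        if (existing == State.CONSUMED) {
            return Result.ACK;         // already consumed: acknowledge the duplicate directly
        }
        if (existing == State.CONSUMING) {
            return Result.RETRY_LATER; // another delivery is still running: delay instead of dropping
        }
        boolean ok;
        try {
            ok = business.test(dedupKey);
        } catch (RuntimeException e) {
            ok = false;                // an exception in business logic counts as a failed consumption
        }
        if (ok) {
            table.put(dedupKey, State.CONSUMED);
            return Result.ACK;
        }
        table.remove(dedupKey);        // release the key so the retry can consume again
        return Result.RETRY_LATER;
    }

    public static void main(String[] args) {
        DedupStateMachine dedup = new DedupStateMachine();
        Result first = dedup.onMessage("order123", key -> {
            // A duplicate that arrives while the first delivery is still consuming is delayed.
            System.out.println(dedup.onMessage(key, k -> true)); // prints RETRY_LATER
            return true;
        });
        System.out.println(first);                                  // prints ACK
        System.out.println(dedup.onMessage("order123", k -> true)); // prints ACK (already consumed)
    }
}
```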
The source code is available on GitHub for reference. Now let's go back and check whether the problems we set out to solve are actually solved:
1. If a message has already been consumed successfully, a duplicate is directly treated as idempotent (acknowledged as consumed).
2. In concurrent scenarios, duplicate messages must still be prevented from penetrating the idempotence barrier.
3. Business-level duplicates caused by upstream producers retransmitting must also be handled idempotently.
The first problem is obviously solved, so it is not discussed further here.
How is the second problem solved? If we use MySQL to store the message table (with the message's unique ID as the primary key), only one insert can succeed; subsequent inserts fail on the primary key conflict and take the delayed-consumption branch. When the delayed message is redelivered later, the situation reduces to problem 1 above.
For the third problem, we simply design the message key to be a business primary key (such as an order number or request serial number) rather than just the messageId. So this is not a problem either.
Is there a risk of message loss in this scenario?
Careful readers may notice a logical loophole in the second of the three problems above (the concurrent scenario): we rely on the message's state for concurrency control, so a repeated second message keeps being delayed (retried). But what if the first message's consumption fails because of some exception (the machine restarts, or an external error makes consumption fail)? Then every delayed retry still sees the record as consuming, and eventually the message is treated as a failed consumption and sent to the dead letter Topic (by default RocketMQ retries up to 16 times).
This concern is justified! Our solution is that a record inserted into the message table must have a maximum consuming duration, such as 10 minutes: if a message has been in the consuming state for more than 10 minutes, its record must be removed from the message table (this has to be implemented). So the final flow of the message looks like this:
[Figure: final message flow, with expiry of the consuming state]
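The expiry rule can be added by storing a start time with each consuming record. In this hypothetical sketch, maxConsumingMillis would be the 10 minutes from the example above, the clock is injected so the behavior can be tested, and replace(key, expected, updated) ensures that only one retry can take over an expired record. With Redis, the same effect would come from giving the key an expiry instead of comparing timestamps by hand.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical sketch: CONSUMING records expire after maxConsumingMillis and can be taken over.
final class ExpiringDedupTable {
    private static final class Entry {
        final boolean consumed;
        final long startedAt;

        Entry(boolean consumed, long startedAt) {
            this.consumed = consumed;
            this.startedAt = startedAt;
        }
    }

    private final Map<String, Entry> table = new ConcurrentHashMap<>();
    private final long maxConsumingMillis;
    private final LongSupplier clock; // injected so tests can control time

    ExpiringDedupTable(long maxConsumingMillis, LongSupplier clock) {
        this.maxConsumingMillis = maxConsumingMillis;
        this.clock = clock;
    }

    /** Returns true if the caller now owns the key and should run the business logic. */
    boolean tryStartConsuming(String key) {
        long now = clock.getAsLong();
        Entry fresh = new Entry(false, now);
        Entry existing = table.putIfAbsent(key, fresh);
        if (existing == null) {
            return true;  // first delivery
        }
        if (existing.consumed) {
            return false; // already consumed: the caller should just acknowledge
        }
        boolean expired = now - existing.startedAt > maxConsumingMillis;
        // Entry uses identity equality, so only the exact stale record we inspected is replaced.
        return expired && table.replace(key, existing, fresh);
    }

    void markConsumed(String key) {
        table.put(key, new Entry(true, clock.getAsLong()));
    }

    boolean isConsumed(String key) {
        Entry entry = table.get(key);
        return entry != null && entry.consumed;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        ExpiringDedupTable dedup = new ExpiringDedupTable(10 * 60 * 1000L, () -> now[0]);
        System.out.println(dedup.tryStartConsuming("order123")); // prints true
        System.out.println(dedup.tryStartConsuming("order123")); // prints false: still consuming
        now[0] = 10 * 60 * 1000L + 1;                            // first consumer stalled for over 10 minutes
        System.out.println(dedup.tryStartConsuming("order123")); // prints true: stale record taken over
    }
}
```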
More flexible message table storage medium
In fact, this scheme uses no transactions and only needs a central storage medium, so we can naturally choose a more flexible one, such as Redis. Using Redis has two benefits:
1. Lower performance overhead than a relational database.
2. The timeout mentioned above can be implemented directly with Redis's own TTL.
Of course, Redis is weaker than MySQL in data reliability and consistency, so users need to make this trade-off themselves.
Source: RocketMQDedupListener
The Java implementation for RocketMQ is open source on GitHub. For details, see github.com/Jaskey/Rock… .
Here is just one example from the README, using Redis for deduplication, to show how easy it is to add message deduplication to a business with this tool:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-app1");
consumer.subscribe("TEST-TOPIC", "*");

String appName = consumer.getConsumerGroup();
StringRedisTemplate stringRedisTemplate = null; // obtaining the StringRedisTemplate is omitted here
DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, stringRedisTemplate);
DedupConcurrentListener messageListener = new SampleListener(dedupConfig);

consumer.registerMessageListener(messageListener);
consumer.start();
Most of the code above is what plain RocketMQ already requires; the only change is to create a DedupConcurrentListener instance, in which you specify your consumption logic and the business key used for deduplication (messageId by default).
See the GitHub README for more details.
Is this implementation a once-and-for-all solution?
At this point, the scheme seems quite complete: any message consumer can quickly plug in deduplication, fully decoupled from the specific business implementation. Does this settle the problem once and for all?
Unfortunately, no. The reason is simple: because a message is only guaranteed to be consumed successfully at least once, consumption may fail halfway and trigger a message retry. Take the order process X above again:
1. Check the inventory (RPC)
2. Lock the inventory (RPC)
3. Start a transaction and insert into the order table (MySQL)
4. Call some other downstream services (RPC)
5. Update the order status
6. Commit the transaction (MySQL)
Suppose that when consumption reaches step 3, a MySQL exception makes it fail and triggers a message retry. Because the idempotent table record is deleted before the retry, the retried message enters the consumption code again, and steps 1 and 2 are executed again. If step 2 itself is not idempotent, the business message consumption is still not fully idempotent.
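The effect is easy to see in a toy simulation. In this hypothetical sketch, lockInventory stands in for step 2 and is deliberately not idempotent, insertOrder stands in for step 3 and fails on the first attempt only, and each call to consume represents one delivery of the same message after the dedup record has been released.

```java
// Hypothetical simulation: a failure at step 3 makes the retry run the non-idempotent step 2 again.
final class RetryReentersSteps {
    private int availableStock = 10; // stands in for the inventory service's state
    private int attempts = 0;

    private void lockInventory() {   // step 2: NOT idempotent, deducts on every call
        availableStock--;
    }

    private void insertOrder() {     // step 3: simulated MySQL failure on the first attempt
        if (attempts == 1) {
            throw new IllegalStateException("simulated MySQL exception");
        }
    }

    /** One delivery of the message; returns true if consumption succeeded. */
    boolean consume() {
        attempts++;
        try {
            lockInventory();
            insertOrder();
            return true;
        } catch (IllegalStateException e) {
            return false; // dedup record released, so the broker's retry re-enters this method
        }
    }

    int availableStock() {
        return availableStock;
    }

    public static void main(String[] args) {
        RetryReentersSteps consumer = new RetryReentersSteps();
        System.out.println(consumer.consume());        // prints false: step 3 failed
        System.out.println(consumer.consume());        // prints true: retry succeeded
        System.out.println(consumer.availableStock()); // prints 8: one order, two units locked
    }
}
```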
What is the value of this implementation?
So if this does not fully achieve message idempotence, what is its value? Quite a lot! Although it is not a silver bullet for message idempotence (in fact, there are few silver bullets in software engineering), it conveniently solves:
1. Duplicate deliveries caused by the Broker, load balancing, etc.
2. Business-level message duplication caused by various upstream producers.
3. The concurrency window for duplicate messages: even if a message is duplicated, the copies cannot enter the consumption logic at the same time.
Some other suggestions on message deduplication
That is, this approach ensures that under normal consumption (no exceptions, no abnormal exits), all idempotence problems of a message are resolved, whether the duplication comes from the business or from RocketMQ's own delivery behavior.
In fact, this solves 99% of message duplication problems, since abnormal scenarios are rare. If you also want to handle idempotence in abnormal scenarios, you can do the following to reduce the problem rate:
1. Roll back on consumption failure. If a failed consumption comes with its own rollback mechanism, message retries have no side effects.
2. Make consumers exit gracefully. This avoids, as far as possible, message retries caused by the program exiting in the middle of consuming a message.
3. For operations that cannot be made idempotent, at least abort consumption and raise an alarm. Take inventory locking: if the same business serial number has already locked inventory successfully and a second lock is triggered, and the operation cannot be handled idempotently, at least make it cause a consumption exception (e.g. a primary key conflict that makes consumption fail).
4. With #3 in place, monitor message consumption well. When a message is found to keep failing on retry, perform the rollback from #1 manually so that the next retry succeeds.
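Suggestion 3 can be sketched on the inventory side. In this hypothetical example the inventory service remembers which business serial numbers have already locked stock: lock() treats a repeated serial number as a no-op (the idempotent option), while lockOrFail() throws instead (the fallback that makes consumption fail so monitoring can raise an alarm). In a real system the serial numbers would more likely live in a table with a unique key than in an in-memory set.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical inventory service keyed by business serial number.
final class InventoryLockService {
    private final Set<String> lockedSerials = new HashSet<>(); // serial numbers that already locked stock
    private int available;

    InventoryLockService(int available) {
        this.available = available;
    }

    /** Idempotent lock: repeating the same serial number does not deduct again. */
    synchronized void lock(String serialNo) {
        if (lockedSerials.contains(serialNo)) {
            return;
        }
        if (available <= 0) {
            throw new IllegalStateException("out of stock");
        }
        available--;
        lockedSerials.add(serialNo);
    }

    /** Fallback: a repeated serial number fails loudly so consumption stops and an alarm can fire. */
    synchronized void lockOrFail(String serialNo) {
        if (lockedSerials.contains(serialNo)) {
            throw new IllegalStateException("inventory already locked for " + serialNo);
        }
        lock(serialNo);
    }

    synchronized int available() {
        return available;
    }

    public static void main(String[] args) {
        InventoryLockService inventory = new InventoryLockService(10);
        inventory.lock("serial-001");
        inventory.lock("serial-001");              // retried message: no second deduction
        System.out.println(inventory.available()); // prints 9
    }
}
```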