No era shortchanges those who know how to learn.

Hello, I’m Yes.

Today we are going to talk about transaction messages in message queues. When it comes to transactions, everyone is familiar with ACID.

ACID is a strict definition of what a transaction must guarantee. Yet even single-node systems often do not implement transactions in strict accordance with ACID, let alone distributed systems.

Distributed systems usually have to compromise and settle for eventual consistency. The main reason is that they simply cannot afford anything stronger, because availability is king.

A full implementation of transactions is also very expensive. Think about it: across all the data these systems maintain, no intermediate state may be read and every operation must be indivisible, which means a transaction blocks while it executes and resources stay locked for a long time.

Under high concurrency, tying up resources for long periods can be fatal. To take a somewhat smelly example, think of the restroom at rush hour and you will get the idea.

ACID stands for Atomicity, Consistency, Isolation, and Durability.

Distributed transactions

When it comes to distributed transactions, the common ones are 2PC, TCC, and transaction messages. This article will focus on transaction messages, but 2PC and TCC will be mentioned briefly.

2PC

2PC stands for two-phase commit. It has two roles, the coordinator and the participants, and two phases, the prepare phase and the commit phase.

In the prepare phase, the coordinator sends a prepare command to every participant, and each participant does everything except actually commit the transaction. In the commit phase, the coordinator checks whether every participant reported OK in the prepare phase: if all of them did, it sends a commit command to all participants; if any of them did not, it sends a rollback command.
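The two phases can be sketched as a tiny in-memory simulation. This is only a sketch under my own assumptions: the `Participant` class, its `can_prepare` flag, and the state names are made up for illustration and do not come from any real 2PC library.

```python
from enum import Enum


class Vote(Enum):
    OK = "ok"
    NOT_OK = "not_ok"


class Participant:
    """Hypothetical participant: stages its work in prepare, applies it in commit."""

    def __init__(self, name, can_prepare=True):
        self.name = name
        self.can_prepare = can_prepare  # illustrative switch to force a failed vote
        self.state = "init"

    def prepare(self):
        # Do everything except the final commit, then vote.
        if self.can_prepare:
            self.state = "prepared"
            return Vote.OK
        self.state = "failed"
        return Vote.NOT_OK

    def commit(self):
        self.state = "committed"

    def rollback(self):
        self.state = "rolled_back"


def two_phase_commit(participants):
    """Run both phases and return the coordinator's global decision."""
    # Phase 1: collect a vote from every participant.
    votes = [p.prepare() for p in participants]
    # Phase 2: commit only if every vote is OK, otherwise roll everyone back.
    if all(v is Vote.OK for v in votes):
        for p in participants:
            p.commit()
        return "commit"
    for p in participants:
        p.rollback()
    return "rollback"
```

A single NOT_OK vote is enough to roll back every participant, including the ones that prepared successfully.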

The point here is that 2PC only applies to database-level transactions. What does that mean? If you want to write a record to the database and upload a picture at the same time, 2PC cannot guarantee that these two operations together satisfy transactional constraints.

2PC is also a strongly consistent distributed transaction, and it blocks synchronously: until the commit or rollback command arrives, all participants wait on one another. In particular, resources are locked once the prepare phase has run, so if one participant stalls for a long time, all the others have to wait for it while holding their locks.

Overall efficiency is low, and the coordinator is a single point of failure. There is also a risk of data inconsistency in extreme cases: for example, a participant crashes before it receives the commit command and rolls its data back after recovery, while the other participants have already committed the transaction.

TCC

TCC enables business-level transactions, which means it can cover operations such as uploading images, not just database operations.

TCC is divided into three stages: Try, Confirm, and Cancel. Simply put, each service has to provide these three methods. Try is executed first. For example, to add 10 points, you first add the 10 points to a pre-added field; at this point the points in the user's account have not actually increased.

Then, if all the Try calls succeed, Confirm is executed and every service performs the real business operation. If any Try fails, Cancel is executed to undo the changes.

As you can see, TCC is tightly coupled to the business, because the business has to be reworked to provide these three methods; this is TCC's drawback. In addition, Confirm and Cancel must be idempotent: once either step starts there is no turning back and it has to complete, so a retry mechanism is needed, and retries are only safe when the operations are idempotent.
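Here is a minimal sketch of the points example with idempotent Confirm and Cancel. The `PointsAccount` class, the `tx_id` parameter, and the `pending` / `finished` bookkeeping are my own assumptions for illustration, not the API of any TCC framework.

```python
class PointsAccount:
    """Hypothetical points service exposing try / confirm / cancel."""

    def __init__(self, points=0):
        self.points = points   # the balance the user actually sees
        self.pending = {}      # tx_id -> points reserved by try (the pre-added field)
        self.finished = set()  # tx_ids that were already confirmed or cancelled

    def try_add(self, tx_id, amount):
        # Reserve only: the visible balance does not change yet.
        if tx_id not in self.finished:
            self.pending[tx_id] = amount

    def confirm(self, tx_id):
        # Idempotent: a retried confirm finds nothing pending and does nothing.
        amount = self.pending.pop(tx_id, None)
        if amount is not None:
            self.points += amount
        self.finished.add(tx_id)

    def cancel(self, tx_id):
        # Idempotent: dropping a reservation twice is harmless.
        self.pending.pop(tx_id, None)
        self.finished.add(tx_id)
```

Because Confirm removes the reservation before applying it, a retry after a timeout cannot add the points a second time.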

Transaction messages

Transaction messages are the protagonist of today's article. They are mainly suited to asynchronous update scenarios where the real-time requirements on the data are not high.

Their purpose is to solve the problem of data consistency between message producers and message consumers.

For example, when we order takeout, we first add fried chicken to the shopping cart, then a bottle of Coke, and then place the order. Once we have paid, the process is over.

The data in the shopping cart is well suited to asynchronous deletion via a message notification. Generally speaking, we do not go back to the shop's menu after placing an order, and even if we do and the items are still in the cart, it does not matter much.

What we want is for the cart to be cleared eventually after the order is placed, so the key point is that placing the order and sending the message must either both succeed or both fail.

RocketMQ transaction messages

Let’s take a look at how RocketMQ implements transactional messages.

RocketMQ's transaction messages can also be thought of as a two-phase commit. Put simply, a half-message is sent to the Broker when the transaction starts.

A half-message is a message that is not visible to the Consumer: it is not placed in the real queue it is destined for, but in a special queue.

The local transaction is executed after the half-message is sent, and the result of the local transaction determines whether to send a commit message or a rollback message to the Broker.

Someone may ask: what if sending the commit or rollback message fails at this step?

The Broker periodically checks back with the Producer to ask whether the transaction succeeded. The Producer has to expose an interface through which the Broker can find out whether the transaction was executed successfully. The Broker may query it more than once.

If the transaction succeeded, the half-message is restored to the normal queue it was destined for, so that the consumer can consume it.

Let’s take a quick look at how to use it. I’ve simplified it according to the sample code on the official website.

As you can see, it is simple and intuitive to use: just implement a TransactionListener that contains the local transaction logic plus a method for checking back the result of the transaction.

The general flow of a RocketMQ transaction message is now clear, so let's draw an overall flow chart to walk through it. In fact, by step 4 the message either becomes a normal message or is discarded, and at that point the transaction message's life cycle ends.
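To make the flow concrete, here is a toy in-memory simulation of it. It is a sketch under my own assumptions and not RocketMQ's client API: `ToyBroker`, `send_in_transaction`, and the `notify_ok` flag (which simulates a lost commit or rollback request) are all hypothetical names.

```python
COMMIT, ROLLBACK, UNKNOWN = "commit", "rollback", "unknown"


class ToyBroker:
    """Hypothetical in-memory broker: half-messages stay hidden until committed."""

    def __init__(self):
        self.half = {}     # msg_id -> body, invisible to consumers
        self.visible = []  # bodies that consumers can read

    def send_half(self, msg_id, body):
        self.half[msg_id] = body

    def end_transaction(self, msg_id, state):
        if state == UNKNOWN or msg_id not in self.half:
            return  # leave the half-message for a later check-back
        body = self.half.pop(msg_id)
        if state == COMMIT:
            self.visible.append(body)  # now deliverable to consumers
        # On ROLLBACK the half-message is simply dropped.

    def check_back(self, check_fn):
        # Ask the producer about every half-message that is still unresolved.
        for msg_id in list(self.half):
            self.end_transaction(msg_id, check_fn(msg_id))


def send_in_transaction(broker, msg_id, body, local_tx, notify_ok=True):
    """Send the half-message, run the local transaction, then report the result."""
    broker.send_half(msg_id, body)
    try:
        state = COMMIT if local_tx() else ROLLBACK
    except Exception:
        state = ROLLBACK
    if notify_ok:  # notify_ok=False simulates a lost commit/rollback request
        broker.end_transaction(msg_id, state)
    return state
```

When the final notification is lost, the half-message just stays hidden until `check_back` asks the producer for the outcome, which is the role the check-back interface plays in the flow above.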

RocketMQ transaction message source code analysis

Then we’ll look at the source code to see how this works. First we’ll take a look at the sendMessageInTransaction method. The method is a bit long, but the structure is clear.

The process matches what we analyzed above: it sets a property on the message to mark it as a half-message, sends it to the Broker, executes the local transaction, and then sends the execution status of the local transaction to the Broker. Let's now look at how the Broker handles the message.

This half-message request is handled in the Broker's SendMessageProcessor#sendMessage. Since today's analysis is mainly about transaction messages, I will skip the other flows and only outline the principle.

Put simply, sendMessage checks the MessageConst.PROPERTY_TRANSACTION_PREPARED property of the incoming message; if it is true, the message is a transaction message. After checking whether the message has exceeded the maximum number of consumption attempts, whether it is delayed, and whether the Broker accepts transaction messages, it stores the real topic and queue of the message in the message properties, resets the topic to RMQ_SYS_TRANS_HALF_TOPIC, and sets the queue to 0, making it impossible for consumers to read the message.

That is the whole flow for handling a half-message. Let's take a look at the source code.

It is a classic bait and switch, the old trick of swapping a civet cat for the prince. Delayed messages are in fact implemented the same way: the message is given a different skin and then written to disk.
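The skin swap can be sketched with plain dictionaries. This is an illustration only: the message layout and the two property keys are assumptions of mine, while the half topic name and queue 0 come from the description above.

```python
HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC"
# Illustrative property keys; RocketMQ defines its own constants for these.
REAL_TOPIC_KEY = "REAL_TOPIC"
REAL_QUEUE_KEY = "REAL_QID"


def to_half_message(msg):
    """Return a copy of msg disguised as a half-message."""
    props = dict(msg["properties"])
    props[REAL_TOPIC_KEY] = msg["topic"]          # remember the real topic
    props[REAL_QUEUE_KEY] = str(msg["queue_id"])  # remember the real queue
    return {**msg, "topic": HALF_TOPIC, "queue_id": 0, "properties": props}


def restore_real_message(half):
    """Undo the disguise, as the Broker does when the transaction commits."""
    props = dict(half["properties"])
    topic = props.pop(REAL_TOPIC_KEY)
    queue_id = int(props.pop(REAL_QUEUE_KEY))
    return {**half, "topic": topic, "queue_id": queue_id, "properties": props}
```

Both functions return new dictionaries instead of editing their input, which mirrors the append-only spirit: the disguised copy is what gets written, and the original is recoverable from its properties.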

The Broker handles the commit or rollback message in EndTransactionProcessor#processRequest. Let's take a look at what it does.

As you can see, if the transaction is committed, the skin is swapped back and the message is written to the queue of its real topic for consumers. If the transaction is rolled back, the half-message is recorded under a half_op topic, so that when the background service later scans the half-messages it can tell that this message has already been processed.

The background service is TransactionalMessageCheckService. It periodically scans the half-message queue and calls the check-back interface to ask whether the transaction succeeded. The actual work is done in the TransactionalMessageServiceImpl#check method.

Let me briefly describe the process. This step involves a lot of code, so I will not paste it here; interested readers can study it on their own. But I believe I can make it clear in words.

First fetch all the queues under RMQ_SYS_TRANS_HALF_TOPIC. If you remember, the queue to which the half-message was written is the one whose ID is 0. Then fetch the queue corresponding to the half_op topic. That is, the queue under the topic RMQ_SYS_TRANS_OP_HALF_TOPIC.

The main purpose of half_op is to record that a transaction message has been processed. In other words, any transaction message whose outcome is known, whether commit or rollback, is recorded in half_op.

The fillOpRemoveMap method is then called to pull a batch of already-processed records from half_op. For half-messages that are not recorded in half_op, putBackHalfMsgQueue is called to write them to the commitlog again, and a transaction check-back request is sent. This request is also oneway, meaning it does not wait for a response. Of course, the consumption offset of the half-message queue advances as well.

The request is then handled by the producer's ClientRemotingProcessor#processRequest, which submits a task to the TransactionMQProducer's thread pool. The task calls the checkLocalTransactionState method we defined when sending the message, and then sends the transaction state back to the Broker, again as a oneway request.

Here you may have some questions, such as why there is a half_op and why half-messages are written to the commitlog.

First of all, RocketMQ is designed around sequential appends, so messages that are already on disk are never modified. Yet a transaction message needs its check-back count updated, and once the count exceeds a certain limit the transaction is judged to have failed and is rolled back.

Therefore, each check-back writes the earlier half-message to disk again and advances the consumption progress. half_op in turn records the outcome of each check-back, whether commit or rollback, so the next time the loop reaches that half-message, it learns from half_op that the transaction has ended and filters the message out.

If the result of the check-back is unknown, nothing is recorded in half_op, so the message can be checked back again later and its check count updated.
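The scan loop described above can be sketched roughly as follows. This is a simplification with assumptions of my own: the real check-back is a oneway request whose answer arrives later, whereas this sketch asks the producer synchronously, and `MAX_CHECKS`, the entry layout, and `check_round` are hypothetical names.

```python
MAX_CHECKS = 3  # illustrative limit, not RocketMQ's configured value


def check_round(half_queue, offset, op_done, ask_producer):
    """Scan half_queue once starting at offset; return the new consume offset.

    half_queue: append-only list of {"id": ..., "checks": ...} entries
    op_done:    set of message ids already recorded in half_op
    """
    end = len(half_queue)  # entries re-appended in this round wait for the next one
    for i in range(offset, end):
        entry = half_queue[i]
        if entry["id"] in op_done:
            continue  # outcome already known, so filter it out
        if entry["checks"] >= MAX_CHECKS:
            op_done.add(entry["id"])  # give up: treated as rolled back
            continue
        result = ask_producer(entry["id"])
        if result in ("commit", "rollback"):
            op_done.add(entry["id"])  # record the outcome in half_op
        else:
            # Unknown: never edit the old entry, append a fresh copy instead.
            half_queue.append({"id": entry["id"], "checks": entry["checks"] + 1})
    return end
```

The old entry is left untouched and the updated check count lives only in the re-appended copy, which is why the offset can safely move past everything scanned in this round.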

Now that the process is clear, let me draw a diagram summarizing the Broker’s transaction flow.

Kafka transaction messages

Kafka's transaction messages are different from RocketMQ's. RocketMQ solves the problem of keeping local transaction execution and message sending within one transactional constraint.

Kafka transaction messages are used when multiple messages need to be sent in a single transaction, ensuring transactional constraints across those messages: they either all succeed or all fail, as shown in the code below.

Kafka's transactions basically exist to implement Exactly Once semantics together with its idempotence mechanism, so Kafka's transaction messages are not the transaction messages we have in mind; RocketMQ's are.

Speaking of which, what exactly is Exactly Once?

We know that there are three levels of message delivery reliability: at most once, exactly once, and at least once. As I mentioned in my previous article on message queues, we basically use at least once and combine it with idempotence on the consumer side to achieve exactly once.

Having every message consumed exactly once is of course what we all want, but as I analyzed from every angle in the previous article, it is basically unattainable.

Yet Kafka says it can implement Exactly Once? Is it really that impressive? You cannot really say it is wrong, but if you say it is right, the Exactly Once it achieves is not the one you have in mind.

Its exactly once only holds in one scenario: Kafka is the message source, some processing is done, and the result is written back to Kafka.

So how does it achieve exactly once? Through idempotence, just as we do in business code: use a unique ID and record it, and if the ID has already been recorded, skip the write. That ensures the write takes effect only once.
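The unique-ID idea can be sketched in a few lines. Kafka's idempotent producer identifies a write by a producer ID plus a sequence number; everything else here, including the `DedupLog` class and its single shared counter per producer, is a simplified assumption of mine rather than the broker's real bookkeeping.

```python
class DedupLog:
    """Hypothetical partition log that drops retried writes."""

    def __init__(self):
        self.records = []
        self.last_seq = {}  # producer_id -> highest sequence number accepted

    def append(self, producer_id, seq, value):
        # A retry reuses the same (producer_id, seq), so it is detected here.
        if seq <= self.last_seq.get(producer_id, -1):
            return False  # already written, skip it
        self.last_seq[producer_id] = seq
        self.records.append(value)
        return True
```

A producer that times out and resends the same record gets `False` back and the log still contains that record only once.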

So Kafka implements exactly once only in a particular scenario. It is not what we imagine, namely that any message sent through Kafka will be consumed exactly once.

This is just like Redis saying it implements transactions: they are not the transactions we have in mind either.

So when open source software says it implements some feature, we should not believe it blindly. The feature is often a cut-down version or only holds in special scenarios. Do not be misled by the surface description; read the documentation or the source code in detail.

From another point of view, though, this is understandable. Open source software naturally wants more people to use it, and it can fairly say: I am not lying, my documentation states it clearly, and the title is not deceiving anyone, is it?

Indeed, when you click on an article with the headline “Shocking! XXXX”, the author is not deceiving you: he really was shocked himself.

Back to Kafka's transaction messages. Since they are not the transaction messages we are after, they are not really today's topic, so I will keep it brief.

Kafka’s transactions have the role of the transaction coordinator, which is part of the Broker.

At the beginning of a transaction, the producer sends a request to the transaction coordinator to indicate that the transaction has started. The transaction coordinator records this in a special log, the transaction log. Then the producer sends the messages it really wants to send. Kafka handles these transaction messages just like normal messages, and it is the consumer side that filters them.

After sending, the producer sends a commit or rollback request to the transaction coordinator, which then carries out a two-phase commit. For a commit, it first performs a pre-commit: it sets the transaction state to pre-commit and writes that to the transaction log. It then writes a message similar to an end-of-transaction marker to every partition involved in the transaction. In this way, when a consumer reaches that marker, it knows the transaction is finished and can release the messages.

Finally, the coordinator writes one more transaction termination message to the transaction log, and the Kafka transaction is complete. I summarize the process using the graph on Confluent.
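The coordinator steps and the consumer-side filtering can be sketched as a toy model. Everything here is a simplified assumption of mine: `ToyPartition`, `read_committed`, `end_transaction`, and the `prepare_` / `complete_` state names are hypothetical, and a real consumer works with offsets rather than rescanning the whole log.

```python
class ToyPartition:
    """Hypothetical log holding data records and transaction control markers."""

    def __init__(self):
        self.log = []  # ("data", tx_id, value) or ("marker", tx_id, outcome)

    def append(self, tx_id, value):
        # Transactional data is written like any normal message.
        self.log.append(("data", tx_id, value))

    def write_marker(self, tx_id, outcome):
        self.log.append(("marker", tx_id, outcome))


def read_committed(partition):
    """Consumer-side filter: return values whose transaction has a commit marker."""
    committed = {tx for kind, tx, out in partition.log
                 if kind == "marker" and out == "commit"}
    return [val for kind, tx, val in partition.log
            if kind == "data" and tx in committed]


def end_transaction(tx_log, partitions, tx_id, outcome):
    """Coordinator side: log the pre-state, mark every partition, log completion."""
    tx_log.append((tx_id, "prepare_" + outcome))
    for p in partitions:
        p.write_marker(tx_id, outcome)
    tx_log.append((tx_id, "complete_" + outcome))
```

Data from an open or aborted transaction sits in the log like any other record, but `read_committed` never hands it to the application.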

Finally

Now that we know the entire flow of RocketMQ and Kafka transaction messages, we can see that RocketMQ's transaction messages are the ones we want. Of course, if you are doing stream processing, Kafka's transaction messages are what you want.

Articles that need pasted code are really hard to write: paste too much and it reads badly, paste too little and I worry it is not clear. It is truly difficult. If you think the article is good, remember to give it a like.


I’m yes, from a little bit to a billion bits. See you next time.