1. Introduction to transactional messages

We often transfer money from Alipay to Yu'ebao; it is an everyday operation. But consider what happens if the system crashes right after the amount has been deducted from Alipay. The Yu'ebao account is never credited, and the data become inconsistent.

The same pattern shows up in many kinds of systems. In an e-commerce system, when a user places an order, we must insert a record into the order table and also decrement the stock of the ordered item by 1. How do we guarantee both happen? In a search advertising system, when a user clicks an ad, we must add a record to the click event table and also find the merchant in the merchant account table and deduct the advertising fee. How do we guarantee that? You have probably run into similar situations yourself.

In essence, the problem can be abstracted as follows: when data in one table is updated, how do we guarantee that the corresponding update to another table also succeeds?

In a stand-alone system (where the database instance runs on the same system), a local transaction solves this easily:

Take a transfer of 10,000 yuan from Alipay to Yu'ebao as an example, and suppose we have:

Alipay account table: A (id, userId, amount)
Yu'ebao account table: B (id, userId, amount)
The user's userId = 1

Transferring 10,000 yuan from Alipay to Yu'ebao involves two steps:

1) Deduct 10,000 from the Alipay table: update A set amount = amount - 10000 where userId = 1;
2) Add 10,000 to the Yu'ebao table: update B set amount = amount + 10000 where userId = 1;

How do we keep the Alipay and Yu'ebao balances consistent?

Some will say this is simple: just use a transaction.

begin transaction;
  update A set amount = amount - 10000 where userId = 1;
  update B set amount = amount + 10000 where userId = 1;
commit;

This does work, and if you use Spring, a single annotation gives you the same transaction:

@Transactional(rollbackFor = Exception.class)
public void update() {
    updateATable(); // update table A
    updateBTable(); // update table B
}

If the system is small and all tables live in one database instance, a local transaction works well. But once the system grows, the Alipay account table and the Yu'ebao balance table are clearly no longer in the same database instance. They are often spread across different physical nodes, and a local transaction can no longer help.

The author's original Spring Boot Advanced and MyBatis Advanced columns have been compiled into a book. Follow the public account [Code Monkey Technology Column] and reply with the keyword "Spring Boot advanced" or "Mybatis advanced" to get it.

Let's look at two of the more mainstream approaches:

2. Distributed transactions: the two-phase commit protocol

The two-phase commit protocol (2PC) is commonly used to implement distributed transactions. It involves two roles: a coordinator (TC) and several transaction executors (Si). Here, the transaction executors are the individual databases. The coordinator can run on the same machine as an executor.

The main flow, as shown in the diagram, is as follows:

1) Our application (the client) sends a request to the TC (transaction coordinator) to start a transaction;

2) The TC first writes a PREPARE record to its local log and then sends the PREPARE message to every Si. In the Alipay-to-Yu'ebao transfer, the PREPARE message sent to A tells the Alipay database to deduct 10,000 yuan from the account, and the PREPARE message sent to B tells the Yu'ebao database to add 10,000 yuan. The log is written before the task is performed so that the system can recover from failures. A local log works like a receipt in real life: without it, there is no record to recover from after a crash.

3) After receiving the PREPARE message, each Si executes its local transaction but does not commit it. It replies yes if it succeeds and no if it fails. Before replying, it likewise writes the reply to its log as a credential.

4) The TC collects the replies from all executors. If every executor replied yes, the TC sends COMMIT to all of them, and each executor commits its local transaction on receipt. If any executor replied no, the TC sends ABORT to all of them, and each executor aborts (rolls back) its local transaction.

Note: the TC and each Si first write every message they send or receive to the log, for fault recovery. For example, when an Si recovers from a failure, it checks its local log. If it finds COMMIT, it commits. If it finds ABORT, it rolls back. If it finds only its own yes reply, it asks the TC again to decide the next step. If there is no record at all, the Si most likely crashed during the PREPARE phase, so it must roll back.
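The following minimal in-memory Java sketch shows the coordinator side of 2PC. It is not Atomikos or any real transaction manager. `Participant`, `AccountParticipant`, and `Coordinator` are hypothetical names, the "local log" is just a list, and network calls are plain method calls. The sketch collects a PREPARE vote from every Si and logs the decision. It then sends COMMIT only if every vote was yes, and ABORT otherwise.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical participant (Si); in reality a database reached over the network.
interface Participant {
    boolean prepare(String txId); // run the local transaction without committing; vote yes/no
    void commit(String txId);
    void abort(String txId);
}

// Hypothetical account participant that applies one balance change.
class AccountParticipant implements Participant {
    long balance;
    private final long delta;
    private Long pending; // prepared but not yet committed change

    AccountParticipant(long balance, long delta) {
        this.balance = balance;
        this.delta = delta;
    }

    public boolean prepare(String txId) {
        if (balance + delta < 0) {
            return false; // vote "no": insufficient funds
        }
        pending = delta;
        return true; // vote "yes"
    }

    public void commit(String txId) {
        if (pending != null) {
            balance += pending;
            pending = null;
        }
    }

    public void abort(String txId) {
        pending = null; // discard the prepared change
    }
}

// Hypothetical coordinator (TC); it logs before each phase so it could recover after a crash.
class Coordinator {
    final List<String> log = new ArrayList<>();

    boolean run(String txId, List<? extends Participant> participants) {
        log.add("PREPARE " + txId);
        boolean allYes = true;
        for (Participant p : participants) { // phase 1: collect every vote
            if (!p.prepare(txId)) {
                allYes = false;
            }
        }
        log.add((allYes ? "COMMIT " : "ABORT ") + txId);
        for (Participant p : participants) { // phase 2: broadcast the decision
            if (allYes) {
                p.commit(txId);
            } else {
                p.abort(txId);
            }
        }
        return allYes;
    }
}
```

Every participant holds its prepared change until phase 2 arrives. In a real database it also holds its locks until then. This waiting is the source of the performance cost discussed next.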

Today, implementing distributed transactions based on two-phase commit is not hard. In Java, you can do it quickly with the open-source Atomikos (www.atomikos.com/).

However, anyone who has used two-phase commit will find that its performance is too poor for high-concurrency systems. Why?

1) Two-phase commit requires several rounds of network communication between nodes, which takes a long time.
2) Resources stay locked for the whole, longer transaction, so other transactions wait much longer for them.

Because of these serious performance problems, most high-concurrency services avoid distributed transactions and solve data consistency in other ways.

3. Using message queues to avoid distributed transactions

If you look closely, everyday life already offers plenty of hints.

For example, at the famous Yao Ji Fried Liver in Beijing, you order and pay for fried liver, but they do not hand it to you right away. Instead, they give you a receipt, and you take it to the pickup area and queue for your food. Why separate paying from picking up? There are many reasons, and one of them is to let the shop serve more customers.

Back to our problem: as long as you hold the receipt, you will eventually get your fried liver. Money transfers can work the same way. After 10,000 yuan is deducted from the Alipay account, we generate a credential (a message) that reads "add 10,000 to the Yu'ebao account". As long as this credential (message) is stored reliably, we can eventually use it to add 10,000 to the Yu'ebao account. In other words, we can rely on this credential (message) to achieve eventual consistency.

There are two ways to store the credential (message) reliably:

1) Coupling the business data and the message

When Alipay completes the deduction, it also records the message data. The message is stored in the same database instance as the business data, in a message table named message:

begin transaction;
  update A set amount = amount - 10000 where userId = 1;
  insert into message(userId, amount, status) values (1, 10000, 1);
commit;

This transaction guarantees that whenever money is deducted from the Alipay account, the message is saved as well.

Once this transaction commits, we notify Yu'ebao of the message through a real-time messaging service. After Yu'ebao processes the message successfully, it replies with a success message, and Alipay deletes the message record when it receives the reply.
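This coupled approach is widely known as the transactional outbox pattern. The minimal in-memory Java sketch below assumes that a single `synchronized` method stands in for the database transaction covering both table A and the `message` table. The names `AlipayDb`, `Receiver`, `deductAndRecord`, and `relay` are hypothetical. The relay deletes a message only after the receiver acknowledges it, so if Alipay crashes before the delete, the same message may be delivered again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical receiver (Yu'ebao); returns true to acknowledge successful processing.
interface Receiver {
    boolean deliver(long msgId, long userId, long amount);
}

// In-memory stand-in for the Alipay database instance holding table A and table message.
class AlipayDb {
    final Map<Long, Long> accountA = new HashMap<>();
    final Map<Long, long[]> message = new LinkedHashMap<>(); // msgId -> {userId, amount}
    private long nextMsgId = 1;

    // The debit and the message insert succeed or fail together (one local transaction).
    synchronized long deductAndRecord(long userId, long amount) {
        long balance = accountA.getOrDefault(userId, 0L);
        if (balance < amount) {
            throw new IllegalStateException("insufficient balance"); // nothing is written
        }
        long msgId = nextMsgId++;
        accountA.put(userId, balance - amount);
        message.put(msgId, new long[] {userId, amount});
        return msgId;
    }

    // Deliver every stored message; delete a message only after it has been acknowledged.
    synchronized void relay(Receiver receiver) {
        List<Long> ids = new ArrayList<>(message.keySet());
        for (long id : ids) {
            long[] m = message.get(id);
            if (receiver.deliver(id, m[0], m[1])) {
                message.remove(id);
            }
        }
    }
}
```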

2) Decoupling the business data and the message

Storing the message data in the same database as the business data tightly couples the two. This is not architecturally elegant and can cause other problems. To decouple them, proceed as follows:

A) Before committing the deduction transaction, Alipay asks the real-time messaging service to record a message. The service only stores the message data and does not actually send it yet. Alipay commits the transaction only after the message has been recorded successfully;

B) After Alipay's deduction transaction commits successfully, Alipay sends a confirmation to the real-time messaging service. The service actually sends the message only after it receives this confirmation;

C) If Alipay's deduction transaction fails to commit and is rolled back, Alipay tells the real-time messaging service to cancel. Once cancelled, the message is never sent;

D) For messages that have not been confirmed or cancelled, a message status confirmation system periodically queries Alipay for each message's status and updates it. Why is this step needed? Suppose that after Alipay's deduction transaction commits successfully in step B, the system crashes before the message status is updated to "confirmed". The message would then never be sent.

Advantages: the message data is stored independently, which reduces coupling between the business system and the messaging system. Disadvantages: each message requires two requests, and the business service must implement a message status back-check interface.
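Steps A to D amount to a small state machine inside the messaging service. The sketch below is a hypothetical in-memory illustration, not a real product API. The names `MessageService`, `MsgState`, `StatusChecker`, `prepare`, `confirm`, `cancel`, and `checkPending` are all assumptions. A prepared message is stored but not delivered. `confirm` delivers it and `cancel` discards it. `checkPending` plays the role of the status confirmation system from step D, and `StatusChecker` is the back-check interface the business service must implement. For simplicity, the checker answers only committed or rolled back.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

enum MsgState { PREPARED, CONFIRMED, CANCELLED }

// Hypothetical back-check interface implemented by the business system (Alipay).
interface StatusChecker {
    boolean isCommitted(String msgId); // true: local transaction committed; false: rolled back
}

class MessageService {
    final Map<String, MsgState> states = new LinkedHashMap<>();
    final Map<String, String> bodies = new LinkedHashMap<>();
    final List<String> sent = new ArrayList<>(); // bodies actually delivered downstream

    // Step A: record the message but do not send it.
    synchronized void prepare(String msgId, String body) {
        states.put(msgId, MsgState.PREPARED);
        bodies.put(msgId, body);
    }

    // Step B: the local transaction committed, so send the message now.
    synchronized void confirm(String msgId) {
        if (states.get(msgId) == MsgState.PREPARED) {
            states.put(msgId, MsgState.CONFIRMED);
            sent.add(bodies.get(msgId));
        }
    }

    // Step C: the local transaction rolled back, so the message is never sent.
    synchronized void cancel(String msgId) {
        if (states.get(msgId) == MsgState.PREPARED) {
            states.put(msgId, MsgState.CANCELLED);
        }
    }

    // Step D: periodically resolve messages still PREPARED by asking the business system.
    synchronized void checkPending(StatusChecker checker) {
        for (String msgId : new ArrayList<>(states.keySet())) {
            if (states.get(msgId) == MsgState.PREPARED) {
                if (checker.isCommitted(msgId)) {
                    confirm(msgId);
                } else {
                    cancel(msgId);
                }
            }
        }
    }
}
```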


4. How do we handle duplicate message delivery?

Another serious problem is repeated message delivery. In our Alipay-to-Yu'ebao transfer, if the same message is delivered twice, the Yu'ebao account increases by 20,000 yuan instead of 10,000.

Why would the same message be delivered twice? Suppose Yu'ebao processes message MSG and replies to Alipay with a success message. Normally, Alipay would then delete MSG. But if Alipay crashes at exactly that moment, it finds MSG still stored after restarting and sends it again.

The solution is simple. On the Yu'ebao side, add a message application status table (message_apply), which is essentially a ledger of consumed messages. For each incoming message, query message_apply before actually processing it. If the message is found, it is a duplicate and can be discarded. If it is not found, process the message and insert a record into message_apply in the same transaction.

for each msg in queue
  begin transaction;
    select count(*) as cnt from message_apply where msg_id = msg.msg_id;
    if cnt == 0 then
      update B set amount = amount + 10000 where userId = 1;
      insert into message_apply(msg_id) values (msg.msg_id);
    end if;
  commit;
end for
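Below is a minimal in-memory Java version of the loop above. It assumes a `HashSet` stands in for `message_apply` and a `synchronized` method stands in for the database transaction. `YuebaoConsumer` and `consume` are hypothetical names. The `contains` check plays the role of `cnt == 0`. In a real database, a unique key on `msg_id` gives the same protection even when two consumers race.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class YuebaoConsumer {
    final Map<Long, Long> accountB = new HashMap<>();
    private final Set<String> messageApply = new HashSet<>(); // stands in for table message_apply

    // Returns true if the message was applied, false if it was a duplicate and discarded.
    synchronized boolean consume(String msgId, long userId, long amount) {
        if (messageApply.contains(msgId)) {
            return false; // already applied: duplicate delivery, discard it
        }
        accountB.merge(userId, amount, Long::sum); // credit the account...
        messageApply.add(msgId);                   // ...and record the message in the same "transaction"
        return true;
    }
}
```

Delivering the same message twice credits the account only once, with 10,000 rather than 20,000.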

To make this clearer, here is another example, a bank transfer (similar to the previous one):

For example, Bob transfers $100 to Smith.

In a single-machine environment, a transaction might look something like this:

When the number of users grows to the point where Bob's and Smith's account and balance information no longer live on the same server, the process above becomes:

The same transfer now takes several times longer in the clustered environment, which is clearly unacceptable. So how do we get around this?

5. Large transaction = small transactions + asynchrony

Break a large transaction into smaller transactions that execute asynchronously. This brings the efficiency of cross-machine transactions roughly in line with that of single-machine transactions. The transfer can be split into the following two smaller transactions:

In the diagram, the local transaction (debiting Bob's account) and sending the asynchronous message must succeed or fail together. If the debit succeeds, the message must be sent, and if the debit fails, no message may be sent. So the question is: do we debit first, or send the message first?

First, consider sending the message first. The rough diagram is as follows:

The problem: if the message is sent successfully but the debit fails, the consumer still consumes the message and adds money to Smith's account.

Since sending the message first does not work, consider debiting first. The rough diagram is as follows:

The problem is similar: if the debit succeeds but sending the message fails, Bob has been charged but Smith has not been credited.

There are several ways to solve this problem. For example, put the message send directly inside Bob's debit transaction: if sending fails, throw an exception and roll the transaction back. This approach is also in keeping with the "just enough" principle.
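Here is a hypothetical in-memory Java sketch of that approach. The debit happens inside a simulated local transaction, the message is sent before the commit, and a send failure restores the balance. `NaiveTransfer` and `MessageSender` are illustrative names only, not a real client API.

```java
class NaiveTransfer {
    // Hypothetical message client; throws if the broker cannot be reached.
    interface MessageSender {
        void send(String body) throws Exception;
    }

    long bobBalance;

    NaiveTransfer(long bobBalance) {
        this.bobBalance = bobBalance;
    }

    // Debit Bob and send the credit message inside one simulated local transaction.
    synchronized void debitAndSend(long amount, MessageSender sender) {
        long snapshot = bobBalance; // "begin transaction"
        bobBalance -= amount;       // debit Bob
        try {
            sender.send("credit Smith " + amount);
        } catch (Exception e) {
            bobBalance = snapshot;  // "rollback": the send failed, so undo the debit
            throw new IllegalStateException("message not sent, debit rolled back", e);
        }
        // "commit" would happen here
    }
}
```

This version still leaves one gap. If the message is sent and the commit itself then fails, Smith is credited even though Bob was never debited. Closing that gap is the purpose of a transactional message.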

RocketMQ supports transactional messages. How does it implement them?

In phase 1, RocketMQ sends a Prepared message and obtains the message's address. In phase 2, the local transaction is executed. In phase 3, RocketMQ uses the address obtained in phase 1 to access the message and change its state.

An attentive reader may spot another problem: what if sending the confirmation message fails? RocketMQ periodically scans the transactional messages in the cluster. When it finds a message still in the Prepared state, it asks the message sender (producer) whether Bob's money has been deducted. If it has, should the message be rolled back, or should the confirmation still be sent?

RocketMQ decides whether to roll back or to continue sending the confirmation message based on the policy set by the sender. This ensures that sending the message and the local transaction succeed or fail together.
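The hypothetical Java sketch below shows one possible "policy set by the sender". It assumes the producer writes a debit record keyed by transaction ID in the same local transaction as the debit. When the broker asks about a message that is still Prepared, the producer answers as follows. It commits if the debit record exists. It answers "unknown" if the transaction may still be running. Otherwise it rolls back. `CheckDecision`, `DebitLedger`, and the timeout are illustrative assumptions, not RocketMQ types. RocketMQ's own check callback returns `LocalTransactionState`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

enum CheckDecision { COMMIT, ROLLBACK, UNKNOWN }

class DebitLedger {
    final Map<String, Long> started = new HashMap<>(); // txId -> start time in ms (hypothetical bookkeeping)
    final Set<String> debited = new HashSet<>();       // txIds whose debit has committed
    private final long timeoutMs;

    DebitLedger(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Called when the broker asks about a message that is still in the Prepared state.
    CheckDecision check(String txId, long nowMs) {
        if (debited.contains(txId)) {
            return CheckDecision.COMMIT; // Bob's money was deducted: deliver the message
        }
        Long start = started.get(txId);
        if (start != null && nowMs - start < timeoutMs) {
            return CheckDecision.UNKNOWN; // may still be running: ask again later
        }
        return CheckDecision.ROLLBACK; // no debit recorded: discard the message
    }
}
```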

6. RocketMQ transactional message flow analysis

Let's look at the RocketMQ source code to see how it handles transactional messages.

Here is the client code that sends a transactional message. For the complete code, see com.alibaba.rocketmq.example.transaction.TransactionProducer in the example module of the rocketmq project.

// ===== Preparation for sending a transactional message =====
// Handler for pending transactions: the MQ server checks back with the client.
// As mentioned above, when RocketMQ finds a Prepared message, it decides the
// transaction's outcome using the strategy implemented by this listener.
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// Construct the transactional message producer
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// Set the transaction decision handler
producer.setTransactionCheckListener(transactionCheckListener);
// Local transaction logic, e.g. checking Bob's account and deducting the money in our example
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start();
// Construct the message (constructor arguments omitted)
Message msg = new Message(/* ...... */);
// Send the message
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();

Next, look at the source code of sendMessageInTransaction. It has three phases: sending the Prepared message, executing the local transaction, and sending the confirmation message.

// ===== Transactional message sending process =====
public TransactionSendResult sendMessageInTransaction(...) {
    // Logical outline, not the actual source code
    // 1. Send the Prepared message
    sendResult = this.send(msg);
    // Continue only if sendResult.getSendStatus() == SEND_OK
    // 2. The message was sent successfully, so execute the local transaction associated with it
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
    // 3. End the transaction (commit or roll back the message)
    this.endTransaction(sendResult, localTransactionState, localException);
}

The endTransaction method sends a request to the broker (MQ server) to update the final state of the transactional message:

  • Find the Prepared message via sendResult, which contains the transactional message's ID
  • Update the message's final state according to localTransactionState

If endTransaction fails and the request never reaches the broker, the transactional message's status is not updated. The broker has a check thread that scans the table files storing transaction state at a fixed interval (1 minute by default). Messages that have already been committed or rolled back are skipped. For a message still in the prepared state, the broker sends a CheckTransaction request to the producer. The producer handles this timed callback in DefaultMQProducerImpl.checkTransactionState(). checkTransactionState calls the decision method we configured to choose between rolling back and continuing, and then calls endTransactionOneway to have the broker update the message's final state.

Back to the transfer example. Suppose Bob's balance has been reduced, the message has been sent successfully, and Smith's side starts consuming it. Two problems can now arise: consumption failure and consumption timeout. The fix for timeouts is to keep retrying until the message is consumed successfully. Retries can cause duplicate messages, which we handle with the deduplication approach described earlier.

Retrying essentially solves the consumer-side timeout problem, but what if consumption fails? Alibaba's answer is manual intervention. Consider the transaction flow: if adding money to Smith's account fails for some reason, the whole process must be rolled back. If the messaging system implemented this rollback itself, it would become far more complex and bug-prone. The probability of such bugs would likely be much higher than the probability of a consumption failure. That is why RocketMQ does not yet solve this problem. When designing and implementing a messaging system, we must weigh whether solving such a low-probability problem is worth the cost. This kind of trade-off deserves careful thought whenever we tackle hard problems.

Note that the transactional message implementation was removed in version 3.2.6, so that version does not support transactional messages. In other words, unresolved messages are not checked back.

7. Transactional message example

Let’s look at a simple example:

Message producer:

import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");
        // Thread pool used to process transactional messages
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), r -> {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                });
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setExecutorService(executorService);
        // Set the transaction listener (interface org.apache.rocketmq.client.producer.TransactionListener).
        // It runs the local transaction logic, and the MQ broker uses it to check unconfirmed transactions.
        producer.setTransactionListener(transactionListener);
        producer.start();

        // Use different tags to simulate different processing scenarios
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                // Build a message
                Message msg = new Message("TopicTransaction", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Send the message transactionally: once it is pre-written to RocketMQ, the user-defined
                // local transaction runs, and if that succeeds the message is committed
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                System.out.printf("%s%n", sendResult.getTransactionId());
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

Business implementation class TransactionListenerImpl:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /** Executed after the message has been successfully pre-written to RocketMQ. */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        System.out.println("Start processing business logic...");
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        switch (status) {
            case 0:
                // LocalTransactionState.UNKNOW means the outcome is unknown, so RocketMQ must later confirm
                // the business result with the service by calling checkLocalTransaction below.
                // Note: RocketMQ's ability to confirm message status with the business has been removed
                // (it was available in the earlier 3.0.8 release), so a message that returns UNKNOW
                // will not be committed.
                return LocalTransactionState.UNKNOW;
            case 1:
                return LocalTransactionState.COMMIT_MESSAGE;
            case 2:
                return LocalTransactionState.ROLLBACK_MESSAGE;
            default:
                return LocalTransactionState.COMMIT_MESSAGE;
        }
    }

    /**
     * Lets RocketMQ confirm with the business the status of messages whose transactions are uncommitted.
     * The implementation behind this method has been removed from RocketMQ, so it has no effect here.
     * However, it does work if you use the RocketMQ service on Alibaba Cloud.
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        int mod = msg.getTransactionId().hashCode() % 2;
        if (status != null) {
            switch (mod) {
                case 0:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

Message consumer:

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_producer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // On its first run, start consuming from the head (or tail) of the queue;
        // on later runs, continue from where the consumer last stopped
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTransaction", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            private Random random = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true); // enable automatic commit
                for (MessageExt msg : msgs) {
                    System.out.println("Received message, start consuming: " + msg + " , content : " + new String(msg.getBody()));
                }
                try {
                    TimeUnit.SECONDS.sleep(random.nextInt(5)); // simulate business processing
                } catch (Exception e) {
                    e.printStackTrace();
                    // Report failure so the message can be consumed again
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                // Report success so the message will not be redelivered
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer start ! ");
    }
}

Let's start the consumer first and then the producer.

Before running anything, let's check the messages in the web console:

There are no unconsumed messages under Transaction_Producer, so let's run the code.

Producer:

The log shows that four messages were sent and the transaction logic was invoked four times, but only one of them returned COMMIT_MESSAGE.

Look at the console again:

You can see that Transaction_producer has only one message to consume. This matches the producer side, where only one message was committed.

Consumer:

After the consumer starts, the console shows that only one message is consumed.

The producer produced a total of four messages for the following reasons:

This is why four messages were produced but only one was consumed. It also confirms that checkLocalTransaction in the business implementation class TransactionListenerImpl was never called.

8. How to ensure eventual consistency between the debit and the credit

The transfer logic above still has two problems:

1) Without the enterprise edition of RocketMQ, the following can happen. The debit transaction commits successfully, but the producer fails before confirming to RocketMQ that the debit message can be committed. The message is then never delivered to the consumer. As a result, Bob's money is deducted but Smith's balance does not increase.

2) The producer completes all of its logic: the debit transaction has been committed in the database, and the debit message has been confirmed in RocketMQ. But while consuming the message, the consumer itself fails or hits a logic error in processing it. As a result, Smith's balance is not credited correctly.

Both problems are unlikely, but anything with a nonzero probability will eventually happen; it is only a question of when. Financial systems run a daily reconciliation on their logs. It checks that total payments and receipts balance and that the debits and credits of every single transaction match. To guard against these two problems, my approach is to borrow this practice from financial systems and add reconciliation to the business logic.

The business processing logic is as follows:

After the transactional message is sent, a transaction reconciliation message is sent to a reconciliation topic. This message is non-transactional, so a successful send means it is safely stored in RocketMQ. It is not consumed by the business consumer. Instead, a separate transaction reconciliation system consumes the reconciliation messages from that queue and checks each transaction with the producer and the consumer, using the following logic:

The reconciliation system first checks with the transaction's consumer. If the consumer has consumed the message successfully, the result is eventually consistent. The producer sends the confirmation only after its own transaction has been processed. So if the consumer received the confirmed message, the producer must have completed the debit normally.

Only if the consumer failed to process the message, or never consumed it, does the system need to check with the producer. If the producer performed the debit, the debit must be rolled back. If it did not, neither side of the transaction has acted, and nothing needs to be done.
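These checking rules reduce to a small decision function. The hypothetical Java sketch below assumes the reconciliation system can ask the consumer whether a transaction's message was consumed successfully. It also assumes it can ask the producer whether the debit was committed. `Reconciler`, `Action`, and both predicates are illustrative names.

```java
import java.util.function.Predicate;

enum Action { NONE, REVERSE_DEBIT }

class Reconciler {
    private final Predicate<String> consumerApplied; // did the consumer credit Smith for txId?
    private final Predicate<String> producerDebited; // did the producer debit Bob for txId?

    Reconciler(Predicate<String> consumerApplied, Predicate<String> producerDebited) {
        this.consumerApplied = consumerApplied;
        this.producerDebited = producerDebited;
    }

    Action reconcile(String txId) {
        // 1) Check the consumer first: a consumed message implies the debit was committed.
        if (consumerApplied.test(txId)) {
            return Action.NONE;
        }
        // 2) Not consumed (or consumption failed): ask the producer whether the debit happened.
        if (producerDebited.test(txId)) {
            return Action.REVERSE_DEBIT; // roll back the debit
        }
        return Action.NONE; // neither side acted, so there is nothing to do
    }
}
```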