Article Outline

  1. The reason for this sharing
  2. How the distributed transaction problem is currently solved
  3. What solutions exist in the industry
  4. The pros and cons of each solution
  5. What other companies do
  6. How can we do that

The reason for this sharing

Payment refactoring

When thinking about payment refactoring, it is natural to think about how processing that once belonged to a single local transaction will now be handled across applications. Take the top-up order as an example. Suppose the order module and the account module used to live together, and we now split them into an order service and an account service. Previously, after receiving the recharge callback, we could change the order status and add gold coins within one MySQL transaction; after the split, we have to coordinate two services to complete that work.

That leads to today's topic: how to solve the data consistency problem in a distributed scenario. For now, let's call it the distributed transaction problem.

The same problem exists in other scenarios:

Gift sending:

  1. Call the payment service: deduct gold coins from the gifting user, then add the corresponding lychees to the anchor
  2. After the first step succeeds, play the special effects, send the gift comment to the chat room, and so on

Recharge success message:

  1. Complete the recharge order
  2. Send a Kafka order-completion message

Data consistency is especially important for paid interfaces such as payment transactions, because it is all money.

How are distributed transactions resolved today?

The problem is certainly not new, so solutions already exist. Let's take a look at how these problems are being solved today.

Take the order-completion message sent after a successful purchase of basic goods as an example:

Assume the payment order for basic goods has been received and the payment callback has been processed successfully, but at that moment the Kafka service fails and the message cannot be sent. The transaction that processed the order has already been committed; how do we ensure that the order-completion message still goes out?

The process is as follows (the green part of the diagram indicates the normal flow):

  1. Submit a job to the JobController (for fault recovery)
  2. Once the submission succeeds, start processing the order logic
  3. After the order logic completes, send the Kafka message
  4. After the message is sent successfully, delete the job submitted in step 1

The yellow part indicates an abnormal flow where data may become inconsistent; this is where process recovery comes in:

  1. The JobController periodically queries the Redis collection of delayed tasks (each task carries a timestamp and tasks are filtered and sorted by it)
  2. It restores each task (using the handler defined when the job was registered)
  3. If the task executes successfully, the flow is complete; otherwise it is retried in the next period
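To make the flow concrete, here is a minimal sketch of the job pattern in Go, assuming the delayed tasks live in a Redis sorted set scored by timestamp and using the go-redis client. The key name, intervals, and function names are illustrative, not our production code:

```go
// A minimal sketch of the submit/process/delete flow plus the
// JobController's periodic recovery scan described above.
package jobrecovery

import (
	"context"
	"strconv"
	"time"

	"github.com/redis/go-redis/v9"
)

const jobKey = "jobs:order-completed" // illustrative key name

// submitJob registers a recovery job before the business logic runs (step 1).
func submitJob(ctx context.Context, rdb *redis.Client, orderID string) error {
	return rdb.ZAdd(ctx, jobKey, redis.Z{
		Score:  float64(time.Now().Unix()),
		Member: orderID,
	}).Err()
}

// deleteJob removes the job once the Kafka message has been sent (step 4).
func deleteJob(ctx context.Context, rdb *redis.Client, orderID string) error {
	return rdb.ZRem(ctx, jobKey, orderID).Err()
}

// recoverLoop is the JobController's periodic scan (recovery steps 1-3):
// pick up tasks older than a grace period and retry their handlers.
func recoverLoop(ctx context.Context, rdb *redis.Client, retry func(orderID string) error) {
	for {
		cutoff := time.Now().Add(-30 * time.Second).Unix()
		ids, err := rdb.ZRangeByScore(ctx, jobKey, &redis.ZRangeBy{
			Min: "0",
			Max: strconv.FormatInt(cutoff, 10),
		}).Result()
		if err == nil {
			for _, id := range ids {
				// On success the job is deleted; on failure it stays in
				// the set and is retried on the next tick.
				if retry(id) == nil {
					deleteJob(ctx, rdb, id)
				}
			}
		}
		time.Sleep(10 * time.Second)
	}
}
```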

Problems with this approach:

  1. Recovery tasks are stored in Redis, so data may be lost
  2. There is no uniform distributed transaction specification in the architecture; could this layer of logic be extracted into standalone distributed transaction middleware?
  3. There is no management of the transaction execution policy, such as capping the maximum number of retries
  4. Transaction execution status is not recorded, so tracing a transaction means digging through logs

What are the solutions in the industry

Before we talk about concrete solutions, let's look at their theoretical basis; it will help us understand and apply them.

Theoretical basis (premises for discussion)

Local transactions, distributed transactions

Local transactions address the consistency of data operations within a single data source, whereas distributed transactions address the consistency of data operations across multiple data sources.

Strong consistency, weak consistency, eventual consistency

From the client's perspective, when multiple processes access data concurrently, the policy that governs how an update becomes visible to other processes determines the consistency level. Requiring updated data to be visible to all subsequent accesses is strong consistency; tolerating that some or all subsequent accesses miss the update is weak consistency; requiring only that the updated data become visible after some period of time is eventual consistency.

From the server's perspective, how quickly updates propagate through the whole system, i.e. how small the window before eventual consistency is, matters greatly for availability and user experience. For a distributed data system, define:

  • N – the number of data replicas
  • W – the number of nodes that must acknowledge a write for it to succeed
  • R – the number of nodes read for a query

If W+R > N, the write set and read set overlap, so consistency is strong. For example, N=2, W=2, R=1 describes a typical relational database with synchronous primary-standby replication: data read from either the primary or the standby is consistent.

If W+R <= N, consistency is weak. For example, N=2, W=1, R=1 describes asynchronous primary-standby replication: data already updated on the primary may not yet be readable from the standby, so consistency is weak.

The CAP theorem

In a distributed environment (where data is distributed), the CAP theorem says a system cannot guarantee consistency (C), availability (A), and partition tolerance (P) at the same time; at most two of the three can hold, so a compromise scheme such as eventual consistency is usually adopted.

To be clear, partition tolerance is a basic requirement for a distributed system. Since the components of a distributed system are necessarily deployed on different nodes (otherwise it would not be distributed), sub-networks are inevitable, and network failures are an unavoidable abnormal condition that the system must face and handle. Partition tolerance is therefore a given, and system architects usually need to find a balance between C (consistency) and A (availability) based on business characteristics.

BASE theory

BASE is an abbreviation for Basically Available, Soft state, and Eventually consistent. BASE theory is the result of trading consistency against availability in CAP; it summarizes the distributed practice of large-scale Internet systems and evolved gradually from the CAP theorem. Its core idea is that even when strong consistency cannot be achieved, each application can adopt a suitable approach to reach eventual consistency according to its own business characteristics.

BASE theory targets large, highly available, scalable distributed systems. It stands in contrast to the ACID properties of traditional transactions: rather than ACID's strong consistency model, it sacrifices strong consistency to gain availability, allowing data to be inconsistent for a period of time before eventually reaching a consistent state. In real distributed scenarios, different business units and components have different consistency requirements, so concrete architecture designs often combine ACID properties with BASE theory.

Flexible transactions

Unlike rigid ACID transactions, flexible transactions arise in distributed scenarios based on BASE theory. Achieving eventual consistency through flexible transactions relies on the two properties below. A specific scheme may not need both, since different schemes have different requirements, but if neither is satisfied, flexible transactions are impossible.

Visibility (externally queryable)

During the execution of a distributed transaction, if some step fails, we need to know the processing status of the other operations clearly. This requires the other services to provide query interfaces so the status of each operation can be determined.

To make operations queryable, every invocation of every service needs a globally unique identifier, which can be a business document number (such as an order number) or a system-assigned sequence number (such as a payment record sequence number). The time of each operation should also be fully recorded.

Idempotent operation

Idempotence is originally a mathematical concept. An idempotent function, or idempotent method, is one that can be executed repeatedly with the same arguments and always yields the same result:

f(f(x)) = f(x)

In programming, the hallmark of an idempotent operation is that executing it any number of times has the same effect as executing it once: calling the same method with the same parameters many times yields the same business result as calling it once. The requirement is understandable, because many of the solutions below rely on retries to ensure eventual consistency, and a method that is not idempotent cannot safely be retried. There are many ways to implement idempotence, for example caching every request and its result in the system and directly returning the previous result when a repeated request is detected.
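Below is a minimal in-memory sketch of that "cache requests and results" approach. IdempotentExecutor and requestID are illustrative names; a production version would typically use a database unique key or Redis SETNX rather than an in-process map:

```go
// A minimal sketch of idempotence via a cache of completed requests.
package idempotency

import "sync"

type IdempotentExecutor struct {
	mu   sync.Mutex
	done map[string]bool // requestIDs that have already succeeded
}

func NewIdempotentExecutor() *IdempotentExecutor {
	return &IdempotentExecutor{done: make(map[string]bool)}
}

// Execute runs fn at most once per requestID; a repeated call with the
// same ID reports success again without re-running fn, so retries have
// the same business effect as a single call.
func (e *IdempotentExecutor) Execute(requestID string, fn func() error) error {
	e.mu.Lock()
	defer e.mu.Unlock() // the lock also serializes fn; acceptable in a sketch
	if e.done[requestID] {
		return nil // replay of a completed request
	}
	if err := fn(); err != nil {
		return err // failures are not recorded, so a retry can re-run fn
	}
	e.done[requestID] = true
	return nil
}
```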

Industry solutions

Two-phase commit (2PC)

XA is a model defined in the X/Open CAE Specification (Distributed Transaction Processing); it consists of a Transaction Manager (TM) and Resource Managers (RM).

In the XA specification, the database plays the RM role and the application plays the TM role: it generates a global txId, calls the XAResource interface, and coordinates multiple local transactions into one globally unified distributed transaction.

Two-phase commit is the standard implementation of XA. It splits the commit of a distributed transaction into two phases: prepare and commit/rollback.

In the 2PC model, the coordinator must wait for feedback from all participating sub-transactions in the prepare phase, so database resources may stay locked for a long time. This makes 2PC unsuitable for scenarios with high concurrency or long-lived sub-transactions. In short, two-phase commit trades availability for consistency.
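For intuition, here is a minimal sketch of what a 2PC coordinator does, assuming each participant exposes prepare/commit/rollback; the interface and names are illustrative and ignore XA's crash-recovery details:

```go
// A minimal sketch of a two-phase commit coordinator.
package twopc

type Participant interface {
	Prepare(txID string) error
	Commit(txID string) error
	Rollback(txID string) error
}

// runTwoPhaseCommit drives the two phases: if any participant fails to
// prepare, every participant rolls back; otherwise everyone commits.
func runTwoPhaseCommit(txID string, ps []Participant) error {
	for _, p := range ps {
		if err := p.Prepare(txID); err != nil {
			for _, q := range ps {
				q.Rollback(txID) // best-effort rollback of all participants
			}
			return err
		}
	}
	for _, p := range ps {
		p.Commit(txID) // real XA also needs retry/recovery for commit failures
	}
	return nil
}
```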

Saga

Saga was first proposed as a solution to long-running processes. Long-running distributed transactions are business processes that must complete a transaction across applications or even across enterprises, sometimes with manual steps in the middle; such transactions may take minutes, hours, or even days. Designing them to meet full ACID requirements would greatly reduce system availability. Consider a transaction involving two servers: server A initiates the transaction and server B participates in it. B's part requires human involvement, so it may take a long time. To maintain isolation and consistency under ACID, the resources used by the transaction A initiated would be locked, and no other application could access its intermediate results until the whole transaction commits or rolls back. Resources would thus stay locked for a long time, and the system's availability would be unacceptable.

Saga, by contrast, is a compensation-based, message-driven solution to long-running processes. Its goal is to keep the system highly available while making data as consistent as possible. In the same example, a Saga implementation works like this: server A's transaction executes first, and if it goes well, it commits immediately. If the commit succeeds, transaction B executes; if B also succeeds, B commits and the whole transaction is complete. But if B fails, B itself rolls back, and because A has already committed, a compensation operation must be performed to reverse A's committed effects and restore the state from before A executed. This message-driven scheme is Saga. Saga sacrifices strong consistency, achieving only eventual consistency, but improves the system's overall availability.
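A minimal sketch of that A-then-B pattern, assuming each step carries its own compensation; failure triggers the compensations of already committed steps in reverse order. All names are illustrative:

```go
// A minimal saga sketch: run steps forward, compensate backward on failure.
package saga

type Step struct {
	Action     func() error // the step's local transaction, committed on success
	Compensate func() error // reverses a previously committed Action
}

func Run(steps []Step) error {
	for i, step := range steps {
		if err := step.Action(); err != nil {
			// Undo every already committed step, latest first. A real
			// implementation must retry or record compensation failures.
			for j := i - 1; j >= 0; j-- {
				steps[j].Compensate()
			}
			return err
		}
	}
	return nil
}
```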

Compensating transactions (TCC)

TCC also adopts a compensation mechanism. The core idea is that for every operation, a corresponding confirmation and a compensation (undo) operation must be registered. The TCC model leaves lock granularity entirely to the business. It is divided into three phases:

  1. Try phase: check the business systems and reserve resources
  2. Confirm phase: confirm and commit on the business systems. When Try has succeeded and Confirm begins, Confirm is assumed not to fail: as long as Try succeeds, Confirm must succeed
  3. Cancel phase: cancel the business operations that need to be rolled back and release the reserved resources

The following example analyzes in detail how a business is adapted to TCC mode, taking a remittance of 100 yuan from account A to account B:

The remittance service and the collection service each implement the Try-Confirm-Cancel interface and inject it into the TCC transaction manager during business initialization.

[Remittance service]
  • Try: check that account A is valid, i.e. not in the "transferring" or "frozen" state; check that account A's balance is sufficient; deduct 100 yuan from account A and set its status to "transferring"; reserve the deduction by writing the event "transfer 100 yuan from A to B" into a message or log.
  • Confirm: do nothing.
  • Cancel: add 100 yuan back to account A; release the reserved deduction from the log or message.

[Collection service]
  • Try: check that account B is valid.
  • Confirm: read the log or message and add 100 yuan to account B; release the reserved resource from the log or message.
  • Cancel: do nothing.
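A minimal sketch of the interface both services above would implement and register with the TCC transaction manager; the names are illustrative:

```go
// The Try-Confirm-Cancel contract a TCC participant implements.
package tcc

type Participant interface {
	// Try checks the business and reserves resources
	// (e.g. deduct 100 yuan and mark account A as "transferring").
	Try(txID string) error
	// Confirm commits the reserved resources; by the TCC contract
	// it must succeed once Try has succeeded.
	Confirm(txID string) error
	// Cancel releases the reserved resources (e.g. refund account A).
	Cancel(txID string) error
}
```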

From this it is clear that the TCC model is highly intrusive to the business and difficult to retrofit.

Local message table (asynchronous assurance)

The local message table is probably the most widely used implementation in the industry. The core idea is to split a distributed transaction into local transactions. The idea comes from eBay. Some details can be seen in the following flowchart:

The basic idea is:

The message producer needs an additional message table that records each message's sending status. The message table and the business data are committed in the same transaction, which means they must live in the same database. The message is then sent to the consumer through MQ; if sending fails, it is retried.

The message consumer processes the message and completes its own business logic. If its local transaction succeeds, processing has succeeded; if it fails, execution is retried. If the failure is caused by a business error, the consumer can send a compensation message back to the producer to notify it to roll back.

Both producer and consumer periodically scan the local message table and resend incomplete or failed messages. With sound automatic reconciliation and compensation logic, this scheme is very practical.
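A minimal sketch of the producer side, assuming MySQL-style placeholders; the table names, columns, and functions are illustrative. The key point is that the business row and the message row commit in one database transaction, and a separate loop relays unsent rows to MQ:

```go
// Producer side of the local message table pattern.
package localmsg

import (
	"context"
	"database/sql"
)

// createOrderWithMessage writes the business change and the message row
// atomically, in one local transaction on the same database.
func createOrderWithMessage(ctx context.Context, db *sql.DB, orderID string, payload []byte) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit has succeeded

	if _, err := tx.ExecContext(ctx,
		`UPDATE orders SET status = 'completed' WHERE id = ?`, orderID); err != nil {
		return err
	}
	if _, err := tx.ExecContext(ctx,
		`INSERT INTO local_message (order_id, payload, status) VALUES (?, ?, 'pending')`,
		orderID, payload); err != nil {
		return err
	}
	return tx.Commit()
}

// relayPending is the periodic scan: resend messages that never made it
// to MQ. The consumer is assumed to be idempotent, since a message may be
// delivered more than once.
func relayPending(ctx context.Context, db *sql.DB, sendToMQ func([]byte) error) error {
	rows, err := db.QueryContext(ctx,
		`SELECT id, payload FROM local_message WHERE status = 'pending'`)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var id int64
		var payload []byte
		if err := rows.Scan(&id, &payload); err != nil {
			return err
		}
		if sendToMQ(payload) == nil {
			db.ExecContext(ctx, `UPDATE local_message SET status = 'sent' WHERE id = ?`, id)
		}
	}
	return rows.Err()
}
```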

Transaction message

As an asynchronously assured transaction, the transaction message decouples the two transaction branches asynchronously through MQ. Its design also draws on two-phase commit theory. The overall interaction flow is shown in the figure below:

  1. The transaction initiator first sends a PREPARE message to MQ.
  2. After the PREPARE message is sent successfully, it executes the local transaction.
  3. It returns COMMIT or ROLLBACK based on the result of the local transaction.
  4. On ROLLBACK, MQ deletes the PREPARE message without delivering it; on COMMIT, MQ delivers the message to the consumer.
  5. If the producer crashes or times out while executing the local transaction, MQ keeps asking other producers in the same group for the transaction status.
  6. Successful consumption on the consumer side is guaranteed by MQ.

Some third-party MQs support transactional messages, such as RocketMQ, but several mainstream MQs, such as RabbitMQ and Kafka, do not.
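The sketch below shows the producer's side of steps 1-5 against a generic broker interface. It is deliberately not the API of any specific MQ (RocketMQ's real client differs); TxMQ and every name here are illustrative:

```go
// A minimal sketch of the transaction-message protocol from the producer.
package txmsg

type TxState int

const (
	TxCommit   TxState = iota // broker delivers the message to consumers
	TxRollback                // broker discards the PREPARE message
	TxUnknown                 // broker will check back later (step 5)
)

// TxMQ is an illustrative broker interface supporting half messages.
type TxMQ interface {
	SendPrepare(msg []byte) (msgID string, err error)
	EndTransaction(msgID string, state TxState) error
}

// sendTransactional implements steps 1-4: send the PREPARE (half) message,
// run the local transaction, then report COMMIT or ROLLBACK.
func sendTransactional(mq TxMQ, msg []byte, localTx func() error) error {
	msgID, err := mq.SendPrepare(msg) // step 1: invisible to consumers
	if err != nil {
		return err
	}
	if err := localTx(); err != nil { // step 2: local transaction
		mq.EndTransaction(msgID, TxRollback) // steps 3-4: discard
		return err
	}
	return mq.EndTransaction(msgID, TxCommit) // steps 3-4: deliver
}

// checkLocalTransaction is step 5: when the producer crashed after PREPARE,
// the broker calls back to ask for the transaction's final state.
func checkLocalTransaction(queryLocalTx func() (done, ok bool)) TxState {
	done, ok := queryLocalTx()
	switch {
	case !done:
		return TxUnknown
	case ok:
		return TxCommit
	default:
		return TxRollback
	}
}
```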

Best-effort notification

Like the reliable-message eventual consistency scheme, the best-effort notification scheme relies mainly on MQ for transaction control; MQ middleware really does play an important role in distributed architectures. Best-effort notification is a comparatively simple distributed transaction scheme: in essence it achieves data consistency through periodic reconciliation.

Implementation of the best-effort notification scheme

  1. After finishing its business processing, the active party of the business activity sends a message to the passive party; the message is allowed to be lost.
  2. The active party can define a time-ladder notification rule: after a failed notification, it re-notifies according to the ladder, up to N times (see the sketch after this list).
  3. The active party provides an interface through which the passive party can query for missed business messages.
  4. If the passive party receives the data normally, it returns a response and the transaction ends.
  5. If the passive party misses a message, it queries the active party according to a periodic policy and recovers the lost business message.
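A minimal sketch of the time-ladder rule in item 2; the intervals are illustrative (payment platforms commonly use a similar decaying schedule):

```go
// Retry a notification at growing intervals until it succeeds or the
// ladder is exhausted.
package notify

import "time"

var ladder = []time.Duration{
	15 * time.Second, 1 * time.Minute, 5 * time.Minute,
	30 * time.Minute, 2 * time.Hour, 12 * time.Hour,
}

// notifyWithLadder calls notify once per ladder step until it succeeds.
// If the whole ladder fails, the notification is given up and the passive
// party must fall back to the query interface in item 3.
func notifyWithLadder(notify func() error) bool {
	if notify() == nil {
		return true
	}
	for _, wait := range ladder {
		time.Sleep(wait)
		if notify() == nil {
			return true
		}
	}
	return false
}
```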

Characteristics of the best-effort notification scheme

  1. Service patterns used: query operations and idempotent operations.
  2. The passive party's processing result does not affect the active party's processing result.
  3. Suitable for systems with low time sensitivity to eventual business consistency.
  4. Suitable for cross-enterprise operations between systems, or for relatively independent systems within one enterprise, such as bank notifications and merchant notifications.

Scheme comparison

| Attribute | 2PC | TCC | Local message table | Transaction message | Best-effort notification |
| --- | --- | --- | --- | --- | --- |
| Transaction consistency | Strong | Weak | Weak | Weak | Weak |
| Complexity | Medium | High | Low | Low | Low |
| Business intrusion | Small | Big | Medium | Medium | Medium |
| Usage limitations | Big | Big | Small | Medium | Medium |
| Performance | Low | Medium | High | High | High |
| Maintenance cost | Low | High | Low | Medium | Medium |

What do other people do

Alipay’s distributed transaction service DTS

www.cloud.alipay.com/docs/2/4688…

Distributed Transaction Service (DTS) is a distributed transaction framework used to guarantee eventual transaction consistency in large-scale distributed environments. Architecturally, DTS is divided into XTS-Client and XTS-Server: the former is a JAR embedded in client applications, mainly responsible for writing and processing transaction data; the latter is an independent system mainly responsible for recovering abnormal transactions.

Core concepts

In DTS, the parties to a distributed transaction are divided into the initiator and the participants:

**Initiator:** the initiator of a distributed transaction is responsible for starting it and triggering the creation of the master transaction record. The initiator is the coordinator of the distributed transaction: it invokes the participants' services, records the corresponding transaction log, and tracks the state of the whole distributed transaction to decide whether it should COMMIT or ROLLBACK.

**Participants:** a participant is an atomic unit in a distributed transaction. Every participant must implement the three basic interfaces Prepare, Commit, and Rollback, ensure its business data operations are idempotent, and ensure that the data operations in Prepare can be either committed or rolled back. In terms of storage, DTS transaction state data falls into two kinds: the master transaction record (Activity) and branch transaction records (Action).

**Master transaction record (Activity):** the master transaction record is the body of the whole distributed transaction. Its core fields are the transaction number (TX_ID) and the transaction STATE. It is persisted to the database when the distributed transaction starts, and its state determines the state of the distributed transaction.

**Branch transaction record (Action):** a branch transaction record is a child of the master transaction record. It records a participant's information, including the participant's NAME, which DTS uses to uniquely locate the participant. With this branch information, the participant can be committed or rolled back.

This should fall under the TCC pattern we described above.

eBay's local message table

www.infoq.com/cn/articles…
weibo.com/ttarticle/p…

The idea of the local message table was first introduced at eBay and later popularized by companies such as Alipay. The basic design idea is to break a remote distributed transaction into a series of local transactions. If you do not care about performance and elegant design, it can even be done with plain tables in a relational database.

Take the classic inter-bank transfer example. Step one: deduct 10,000, using a local transaction to guarantee that a credential message is inserted into the message table together with the deduction. Step two: notify the other party to add 10,000 to the receiving account. The question is: how do we notify the other party?

Usually in two ways:

  1. Use MQ for high timeliness: the other party subscribes to the messages and an event is triggered automatically whenever one arrives
  2. Use periodic polling to scan the data in the message table

Qunar and Mogujie use a similar local message table + message notification approach.

Various third-party payment callbacks

Best-effort notification. For example, the payment callback interfaces of Alipay and WeChat keep calling back until they succeed, or until the retry schedule decays and the call is marked as failed.

How can we do that

2PC/3PC requires the resource managers (MySQL, Redis) to support the XA protocol, and transaction resources stay locked for the whole execution, which degrades performance. So it is excluded first.

TCC mode requires every transactional interface to provide try, confirm, and cancel variants, which increases programming complexity and depends on the business side to cooperate in providing such interfaces. Hard to implement, so excluded for now.

Best-effort notification suits heterogeneous systems or cross-platform services.

As the classic eBay pattern shows, distributed transactions can reach eventual consistency through local transactions plus reliable messages. A transaction message, moreover, can subsume the work of the local message table inside the message itself. So the next step is to map our application scenarios onto transaction messages and see whether they meet our requirements for a distributed transaction product.

Q&A

  1. Talk about DDD
  2. How Saga lands in practice

References

Sharding-Sphere: mp.weixin.qq.com/s/LpebzrHU3…

RocketMQ officially open-sources distributed transaction messages: mp.weixin.qq.com/s/Kxk2Ag-7d…

Best-effort notification: blog.csdn.net/zsh2050/art…

Talking about distributed transactions: www.cnblogs.com/savorboard/…

Saga: www.cnblogs.com/netfocus/p/…

[git] github.com/zhengpeitao…