Preface

Recently I have had to split and upgrade an old system at work, and I ran into problems such as distributed transactions, idempotency control, out-of-order asynchronous messages, and compensation schemes. This article records some of the solutions and ideas, based on practice and my personal views.

Distributed transactions

First of all, when doing system splitting, we will almost always encounter the problem of distributed transactions. A simulation case is as follows:

[Figure: j-t-s-i-a-1.png]

In the early stage of the project, because the user volume was small, the order module and the wallet module shared the same database and application (the era of the big WAR package), so calls between the modules could be simplified into local transaction operations; as long as the program itself was bug-free, data inconsistency could basically be avoided. Later, as user volume kept growing, the original application was split into an order service and a wallet service for reasons of fault tolerance, performance, and functionality. Data is now synchronized between the local transactions of the two services through remote calls (HTTP, message queues, and so on), and abnormal scenarios can easily leave the data inconsistent.

Strong consistency via direct RPC calls inside a transaction

Take the deduction call as an example: the order microservice above requests the wallet microservice to deduct money and then updates the order status. Assuming a synchronous HTTP call, if the project is developed by inexperienced developers, the following pseudocode may appear:

```
[Order microservice requests the wallet microservice to deduct money and update the order status]
processingMethod() {
    [Start transaction]
    1. Query the order
    2. HTTP call to the wallet microservice to deduct the money
    3. Update the order status to "deduction succeeded"
    [Commit transaction]
}
```

This looks like a harmless solution: the HTTP call is embedded directly inside the transaction block; if the HTTP call succeeds, the transaction commits and the operation is done. The approach may seem desirable, but it brings a great deal of trouble, and the root cause is the RPC call embedded inside the transaction. Consider a few relatively common situations:

  • 1. In step 2 of the method above, if the wallet microservice itself responds extremely slowly for whatever reason, the transaction in the method (more precisely, the database connection) hangs for a long time. The held connections cannot be released, the database connection pool is eventually exhausted, and other interfaces of the order microservice that depend on the database can no longer respond.
  • 2. If the wallet microservice is deployed as a single node (not every company's microservices are perfect) and the application is stopped during an upgrade, the call in step 2 fails outright, causing all transactions to roll back for a period of time; effectively, the order microservice's deduction entry point becomes unavailable.
  • 3. "The network is not reliable." If the network fails while the HTTP call is being sent or while the response is being received, the two services may be left unclear about each other's state. For example, the order microservice successfully invokes the wallet microservice, but the network fails when receiving the response: the deduction succeeds, yet the order status is never updated (the order microservice's transaction rolls back).
[Figure: j-t-s-i-a-2.png]

Although frameworks such as Hystrix can isolate calls with thread pools or fail fast with circuit breakers, they help little here. Therefore, in my opinion, "achieving strong consistency with direct RPC calls inside a transaction is completely undesirable". If "distributed transactions" are implemented this way, I suggest refactoring; otherwise you can only pray every day that nothing goes wrong in the downstream services or the network.

Asynchronous message push in a transaction

Message queues are another common way to invoke services, but their interaction is asynchronous by nature, and the producer has no way of knowing whether the downstream consumer processed the message properly. Using the example from the previous section, assuming an asynchronous message queue call, an inexperienced developer might write the following pseudocode:

```
[Order microservice requests the wallet microservice to deduct money and update the order status]
processingMethod() {
    [Start transaction]
    1. Query the order
    2. Push a deduction message for the wallet microservice
    3. Update the order status to "deduction succeeded"
    [Commit transaction]
}
```

The above method can be abstracted as follows:

```
method() {
    DataSource dataSource = xx;
    Connection con = dataSource.getConnection();
    con.setAutoCommit(false);
    try {
        // 1. SQL operation
        // 2. Push the message
        // 3. SQL operation
        con.commit();
    } catch (Exception e) {
        con.rollback();
    } finally {
        // release other resources
        release(con);
    }
}
```

With this approach, the transaction commits correctly under normal circumstances, that is, when the call to the message queue middleware to push the message succeeds. But there are two obvious problems:

  • 1. The message queue middleware is abnormal and cannot be invoked normally. The common case is that the network or the middleware itself is unavailable, so the push throws an exception and the transaction rolls back. This may look reasonable, but consider: why should a middleware call failure roll back a business transaction? And if the middleware does not recover, isn't the whole interface effectively unavailable?
  • 2. The middleware is healthy and the message is pushed normally, but the transaction rolls back because of, say, an SQL syntax error in step 3. The downstream microservice has then been invoked successfully while the local transaction rolled back, leaving the upstream and downstream data inconsistent.
[Figure: j-t-s-i-a-3.png]

In summary: “Asynchronous message push in transactions is an unreliable implementation”.

Current solutions provided by the industry

The mainstream distributed transaction solutions in the industry today are: multi-phase commit schemes (2PC, 3PC), compensating transactions (TCC), and message transactions (mainly RocketMQ; the basic idea is similar to multi-phase commit, with middleware-based polling and retries added, while other message queue middleware has not implemented distributed transactions). The principles of these schemes are not expanded here; there is already plenty of material about them online. Their characteristics can be summarized as follows:

  • Multi-phase commit schemes: commonly two-phase and three-phase commit, which need an extra resource manager to coordinate the transaction. Data consistency is strong, but the implementation is complex and the performance sacrifice is large (mainly because resources must be locked and stay locked until every participant commits), so they are not suitable for high-concurrency scenarios. A relatively well-known implementation is Alibaba's open-source Fescar (now Seata).
  • Compensating transactions: commonly called TCC, because each business operation must provide three operations: try (Try), confirm (Confirm), and compensate/cancel (Cancel). Data consistency is weaker than with multi-phase commit, but the implementation is simpler. The obvious defect is that every business transaction must implement all three operations, which can produce a great deal of compensation code; in addition, there are many scenarios where TCC is simply not appropriate.
  • Message transactions: here only RocketMQ. The execution flow of a transactional message is: send a half (prepared) message, execute the local transaction, then confirm that the message should be delivered. The middleware stores messages that the downstream fails to consume and keeps retrying the push, while the producer (upstream) must provide a check interface for querying the status of transactions whose half message was sent but whose final delivery state was never confirmed.

The final solution used in project practice

The company I work for does not have RocketMQ in its technology stack and mainly uses RabbitMQ, so message transactions had to be adapted to RabbitMQ. At present there are three scenarios for asynchronous message interaction in our business systems:

  • 1. Message push needs to be highly real-time, and loss is acceptable.
  • 2. Message push does not need to be real-time, but messages must not be lost.
  • 3. Message push needs to be highly real-time, and messages must not be lost.

The final solution using the “local message table” is simple:

[Figure: j-t-s-i-a-4.png]

The main ideas are:

  • 1. Saving the message to be pushed to the consumer and the business processing are bound in the same local transaction, which requires an additional local message table.
  • 2. After the local transaction commits, the local message table can be queried and the message pushed outside the transaction, or a scheduled job can poll the local message table and push the pending messages.
  • 3. After successfully consuming a message, the downstream service can call back an acknowledgement to the upstream service so that the corresponding record can be deleted from the upstream local message table.

The pseudocode is as follows:

```
[Message push is highly real-time, loss acceptable - no local message table needed - start]
processingMethod() {
    [Local transaction start]
    1. Process the business operation
    [Local transaction commit]
    2. Assemble and push the message
}
[Message push is highly real-time, loss acceptable - end]

[Message push is not real-time, must not be lost - start]
processingMethod() {
    [Local transaction start]
    1. Process the business operation
    2. Assemble the message and write it to the local message table
    [Local transaction commit]
}
messagePushSchedulingModule() {
    3. Query the local message table for pending messages and push them
}
[Message push is not real-time, must not be lost - end]

[Message push is highly real-time, must not be lost - start]
processingMethod() {
    [Local transaction start]
    1. Process the business operation
    2. Assemble the message and write it to the local message table
    [Local transaction commit]
    3. Push the message
}
messagePushSchedulingModule() {
    4. Query the local message table for pending messages and push them
}
[Message push is highly real-time, must not be lost - end]
```
  • For the case of "highly real-time push, loss acceptable", there is no need to rely on the local message table: the message can be assembled and pushed after the business transaction commits. Messages can be lost here when the message queue middleware is unavailable or the local application goes down (essentially because the data lives in memory and is not persisted), so reliability is not high, but in most scenarios this is not a problem. If you use spring-tx declarative transactions (@Transactional) or programmatic transactions (TransactionTemplate), you can use a transaction synchronizer to defer an RPC operation written inside the business transaction block until after the transaction commits; this way the RPC call can physically stay inside the transaction code block, for example:
```
@Transactional(rollbackFor = RuntimeException.class)
public void process() {
    // 1. Handle the business logic
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            // 2. Perform the notification push
        }
    });
}
```

For scenarios where local message tables are used, there are several issues to be aware of:

  • 1. The local message table should not accumulate data for long periods; records that have been pushed successfully should be deleted promptly.
  • 2. When querying and pushing records from the local message table, design an upper limit on retries; records that still fail at the limit should trigger an alert and human intervention.
  • 3. If message bodies are large, queries may consume a large amount of IO; split the message content into a separate table, and split frequently changing columns into another table as well.

For example, the design of the local message table is as follows:

```sql
CREATE TABLE `t_local_message`(
  id BIGINT PRIMARY KEY COMMENT 'primary key',
  module INT NOT NULL COMMENT 'message module',
  tag VARCHAR(20) NOT NULL COMMENT 'message tag',
  business_key VARCHAR(60) NOT NULL COMMENT 'business key',
  queue VARCHAR(60) NOT NULL COMMENT 'queue',
  exchange VARCHAR(60) NOT NULL COMMENT 'exchange',
  exchange_type VARCHAR(10) NOT NULL COMMENT 'exchange type',
  routing_key VARCHAR(60) NOT NULL COMMENT 'routing key',
  retry_times TINYINT NOT NULL DEFAULT 0 COMMENT 'retry times',
  create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'modification time',
  seq_no VARCHAR(60) NOT NULL COMMENT 'serial number',
  message_status TINYINT NOT NULL DEFAULT 0 COMMENT 'message status',
  INDEX idx_business_key(business_key),
  INDEX idx_create_time(create_time),
  UNIQUE uniq_seq_no(seq_no)
) COMMENT 'local message table';

CREATE TABLE `t_local_message_content`(
  id BIGINT PRIMARY KEY COMMENT 'primary key',
  message_id BIGINT NOT NULL COMMENT 'local message table primary key',
  message_content TEXT COMMENT 'message content',
  UNIQUE uniq_message_id(message_id)
) COMMENT 'local message content table';
```
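The scheduled push module with a retry cap can be sketched as follows. This is a minimal, hypothetical simulation in plain Java: an in-memory list stands in for the t_local_message table, a predicate stands in for the broker call, and all names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical in-memory stand-in for a t_local_message row.
class LocalMessage {
    final String seqNo;
    final String payload;
    int retryTimes = 0;          // maps to the retry_times column
    int status = 0;              // 0 = pending, 1 = pushed, 2 = dead (needs human intervention)

    LocalMessage(String seqNo, String payload) {
        this.seqNo = seqNo;
        this.payload = payload;
    }
}

// Sketch of the scheduled push module: poll pending records, push, cap retries.
class MessagePushScheduler {
    static final int MAX_RETRY = 3;
    final List<LocalMessage> table = new ArrayList<>();
    final List<String> alerts = new ArrayList<>();

    // One scheduling tick: try to push every pending record via the "broker".
    void tick(Predicate<LocalMessage> broker) {
        for (LocalMessage m : table) {
            if (m.status != 0) continue;
            if (broker.test(m)) {
                m.status = 1;    // pushed; real code would delete the row promptly
            } else if (++m.retryTimes >= MAX_RETRY) {
                m.status = 2;    // retry limit reached: alert and hand over to a human
                alerts.add(m.seqNo);
            }
        }
    }
}
```

A real implementation would run `tick` from a scheduler, select rows by `message_status` and `retry_times`, and push to RabbitMQ; the structure stays the same.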

Summary of distributed transactions

In my opinion, the best practices for resolving distributed transactions are:

  • The basic idea is to avoid strongly consistent distributed transaction implementations: abandon ACID in favor of BASE.
  • Use message queues to decouple systems. To guarantee that message push succeeds, the pushing side can add an independent message table, bind the message to be pushed and the business operation into the same transaction, and push the message asynchronously or on a schedule.
  • The pushing side (upstream) needs to guarantee that the message is delivered correctly to the message queue middleware; message consumption and compensation are resolved by the message consumer (downstream), as explained in a later section.

In fact, for the implementation of distributed transactions requiring relatively high consistency and real-time performance, message queue decoupling also has corresponding solutions.

Idempotent control

The term idempotence is derived from the HTTP/1.1 definition:

Methods can also have the property of “idempotence” in that (aside from error or expiration issues) the side-effects of N > 0 identical requests is the same as for a single request.

To put it simply: aside from errors or expiration (in other words, for successful requests), the result of multiple calls is the same as that of a single call. In layman's terms: once a call succeeds, the same request parameters should return success no matter how many times the request is repeated.

The downstream service must promise to implement idempotency on its service interfaces; this is extremely important in distributed systems.

  • For HTTP calls, promised idempotency prevents duplicate business data caused by repeated form submissions or repeated request operations.
  • For asynchronous message calls, promised idempotency means redelivered messages are deduplicated, avoiding repeated consumption of business data.

In current practice, idempotency is handled through the following three levels of control:

  • 1. Use a distributed lock in the interface call to achieve idempotency; the mainstream choice is Redisson. Keep the lock granularity and the lock wait and hold times within a reasonable range. (My industry requires accurate data, so almost all core interfaces are designed with pessimistic locking: "better slow than wrong". If collisions are rare, optimistic locking can be considered for performance.)
  • 2. Deduplicate in the business logic. For example, the order-creation interface first checks whether an order with the same order number already exists in the database table.
  • 3. In the table design, put a unique index on the logically unique business key; this is the final guarantee at the database level.

An example of pseudocode for idempotency control in message consumption:

```
[Process message consumption]
listen(request) {
    1. Build the distributed lock KEY from the business key
    2. Build the distributed lock with Redisson and acquire it
    3. Execute the business logic inside the locked section (deduplication checks,
       transactional and non-transactional operations, and so on)
    4. Release the distributed lock in the finally block
}
```
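A self-contained sketch of the listener above, for illustration only: a per-key ReentrantLock stands in for the Redisson distributed lock, and an in-memory set stands in for the unique-index/deduplication query, so this only simulates the pattern within a single JVM.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of an idempotent message listener. ReentrantLock and the Set are
// hypothetical single-JVM stand-ins for Redisson and a unique database index.
class IdempotentConsumer {
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    private final Set<String> processed = ConcurrentHashMap.newKeySet();
    final AtomicInteger effects = new AtomicInteger();   // counts real business executions

    void listen(String businessKey) {
        // 1. Build the lock from the business key
        ReentrantLock lock = locks.computeIfAbsent(businessKey, k -> new ReentrantLock());
        lock.lock();                       // 2. Acquire the lock
        try {
            // 3. Deduplication check inside the lock (real code: query by unique key)
            if (!processed.add(businessKey)) {
                return;                    // already consumed: treat as success, do nothing
            }
            effects.incrementAndGet();     // the actual business logic runs exactly once
        } finally {
            lock.unlock();                 // 4. Release the lock in finally
        }
    }
}
```

With Redisson the lock would come from `redisson.getLock(key)` with bounded wait and lease times, and the unique index remains the final safety net if the lock or dedup check fails.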

Compensation scheme

Compensation schemes mainly cover two cases: compensation for synchronous HTTP calls and compensation for failed asynchronous message consumption.

HTTP synchronous call compensation

In general, a synchronous HTTP call obtains a synchronous result from the downstream system. There are several common cases when processing that result:

  • 1. The result returns normally and contains the final state agreed with the downstream; the interaction is over.
  • 2. The result returns normally but contains a non-final state agreed with the downstream; the final state must be obtained through periodic compensation, or the record marked accordingly when the retry limit is reached.
  • 3. The result is an exception, most commonly because the downstream service is unavailable (HTTP status code 5xx).

Start with a simple observation: "HTTP retries within a short period are generally not effective." If the failure is momentary network jitter, a short synchronous retry is feasible. But in most cases the downstream service cannot respond, is restarting, or complex network conditions prevent recovery within a short time; in those cases synchronous HTTP retries achieve nothing.

If you are dealing with low-concurrency HTTP interaction between internal systems, consider retrying with an "exponential backoff" algorithm, for example:

```
1. The first call fails; retry immediately.
2. The second retry fails; sleep 2 seconds.
3. The third retry fails; sleep 4 seconds (2^2).
4. The fourth retry fails; sleep 8 seconds (2^3).
5. The fifth retry fails; throw an exception.
```

If each attempt in the example above is wrapped in a Hystrix command with a 1-second timeout, the whole retry process takes at most a little under 20 seconds, which is acceptable for low-concurrency interaction between internal systems.
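The retry schedule above can be captured in a small helper. This is a sketch, not a production client: the base delay is a parameter (pass 2000 ms to get the 2s/4s/8s schedule), and per-attempt timeout control such as Hystrix is left out.

```java
import java.util.concurrent.Callable;

// Exponential-backoff retry matching the schedule above: the first retry is
// immediate, then sleeps of base, 2*base and 4*base before giving up.
class BackoffRetry {
    static <T> T call(Callable<T> op, int maxAttempts, long baseDelayMillis) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                if (attempt >= maxAttempts) {
                    throw e;                         // final failure: propagate the exception
                }
                if (attempt >= 2) {                  // first retry is immediate
                    // with baseDelayMillis = 2000: sleeps 2s, 4s, 8s for attempts 2..4
                    Thread.sleep(baseDelayMillis << (attempt - 2));
                }
            }
        }
    }
}
```

Usage: `BackoffRetry.call(() -> httpClient.execute(request), 5, 2000)` would retry a flaky call up to five times with the schedule above (the `httpClient` name is illustrative).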

However, if the scenario involves high concurrency and user experience has high priority, this is obviously unreasonable. To be safe, you can take a relatively traditional but effective approach: persist the content of the HTTP call into a local retry table, bind that save operation to the business transaction, and retry the "failed call" records through scheduled jobs. This scheme is similar to the one above for guaranteeing that message push succeeds. A simulated example:

```
[Order interface requests the downstream wallet service to deduct money]
process() {
    [Transaction block - start]
    1. Handle the business logic and save the order, with the order status "deducting"
    2. Assemble the HTTP call to be made to the downstream wallet service and save it into the local table
    [Transaction block - end]
    3. Make the HTTP call outside the transaction (OkHttp or Apache HttpClient) and update
       the order status to "deduction succeeded"
}
scheduledJob() {
    4. Periodically query orders still in "deducting" status, make the HTTP call for them,
       and on success update the order status to "deduction succeeded"
}
```

Compensation for asynchronous message consumption failures

Asynchronous message consumption failures can only occur on the message consumer side, that is, the downstream service. To reduce the communication cost, compensation for failed message consumption should be borne by the party that processes the message (the consumer).

[Figure: j-t-s-i-a-5.png]

If compensated by upstream services, there are two obvious problems:

  • 1. Every upstream service would need to implement its own message compensation module, which is unreasonable.
  • 2. Once a production problem occurs in downstream consumption and upstream compensation is needed, the upstream service that pushed the message must first be located and the compensation routed through it, which increases the complexity of handling production problems.

In recent project practice we settled on "compensation is implemented uniformly by the message consumer" for asynchronous message interactions. The simplest way is similar to the local message table: store messages that fail to be consumed in the database and retry them; if a message still fails when the retry limit is reached, raise an alert and intervene manually. A simple flow chart:

[Figure: j-t-s-i-a-6.png]

Out-of-order asynchronous messages

Out-of-order messages are a problem that must be considered and solved in asynchronous interaction scenarios based on message queues. Here are some examples; they may not be entirely realistic, but they illustrate the point.

Scenario 1: an upstream service asynchronously modifies a user's gender through a message queue. Assume the message is simplified as follows:

```
Queue: user-service.modify.sex.qeue
Message:
{
    "userId": long,
    "sex": string, one of "MAN", "WOMAN", "UNKNOWN"
}
```

The user service uses a total of 10 consumer threads to listen to the user-service.modify.sex.qeue queue. Suppose the upstream service pushes the following two messages to that queue:

```
First message:
{
    "userId": 1,
    "sex": "MAN"
}

Second message:
{
    "userId": 1,
    "sex": "WOMAN"
}
```

With the push above and the downstream processing, the following outcome is fairly likely:

[Figure: j-t-s-i-a-7.png]

The user with ID 1 first changed gender to MAN (first request) and then to WOMAN (second request), yet the final stored gender may be MAN, which is obviously wrong. The point of this somewhat contrived example: with asynchronous message interaction, the downstream service may process messages in a different order than the upstream sent them, which can corrupt business state. Some feasible ideas for solving this:

  • Scheme 1: when concurrency requirements are low, make full use of the FIFO property of the message queue; with RabbitMQ, set the downstream service's consumer thread count to 1, so the downstream consumes messages in the same order the upstream pushed them.
  • Scheme 2: use HTTP calls instead, which requires cooperation from the front end or app client so that the requests are serialized.

Scenario 2: the processing of asynchronous messages has no ordering requirement, but the final presentation does. This may sound a little abstract, so an example: a user borrows 10,000 yuan from a lending service and repays it in several installments (say repayment plan 1: 2000, 3000, 5000; repayment plan 2: 1000, 1000, 1000, 7000; and so on). The amount returned each time differs, and the final bill must be displayed in the order of the user's repayment operations.

Suppose the lending upstream service interacts with the downstream via asynchronous messages. Analyzing carefully: this scenario does not actually need to perceive the order of repayments while processing them (the main goal is simply to collect the user's repayments); it only needs to know whether the user has paid everything off. But asynchronous interaction may leave the downstream unable to reconstruct the order of the user's repayment operations correctly.

The solution is simple: when pushing a message, attach a marker with an increasing (or decreasing) trend, for example a timestamp, or a trend-increasing long integer generated by the Snowflake algorithm as a serial number. The downstream saves this serial number and sorts by it to recover the operation order for display; the actual message processing then needs no awareness of message ordering.
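A minimal sketch of the idea, with an AtomicLong as a hypothetical stand-in for a Snowflake-style generator: the producer attaches a trend-increasing seqNo, and the consumer sorts by it only when rendering the bill.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// The upstream attaches a trend-increasing serial number to each repayment
// message; the downstream persists it and sorts by it only for display.
class RepaymentMessage {
    final long seqNo;      // attached by the producer, saved by the consumer
    final int amount;
    RepaymentMessage(long seqNo, int amount) { this.seqNo = seqNo; this.amount = amount; }
}

class Bill {
    // Producer side: stand-in for a Snowflake-style ID generator.
    private static final AtomicLong SEQ = new AtomicLong();

    static RepaymentMessage produce(int amount) {
        return new RepaymentMessage(SEQ.incrementAndGet(), amount);
    }

    // Consumer side: messages may have arrived in any order; sort by seqNo to display.
    static List<Integer> displayOrder(List<RepaymentMessage> received) {
        List<RepaymentMessage> copy = new ArrayList<>(received);
        copy.sort(Comparator.comparingLong(m -> m.seqNo));
        List<Integer> amounts = new ArrayList<>();
        for (RepaymentMessage m : copy) amounts.add(m.amount);
        return amounts;
    }
}
```

Note that the consumer applies repayments as they arrive; only the bill view depends on `seqNo`, which is why out-of-order delivery is harmless here.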

Asynchronous messaging combined with state driving

In my opinion, asynchronous messages combined with state driving can solve distributed transactions relatively well, and combined with pre-processing (such as pre-deduction or pre-increase) can achieve relatively high consistency and real-time behavior. Let's start from the transfer scenario that is often used when discussing strong consistency in distributed transactions.

[Figure: j-t-s-i-a-8.png]

If you solve this problem with synchronous calls (TCC, 2PC, and 3PC are essentially synchronous), strong consistency can be achieved at the cost of some performance. This section does not discuss the synchronous case; it focuses on how to "achieve relatively high consistency" from a BASE perspective when using message queues. Abstracting the example, suppose the account tables of both systems are designed like this:

```sql
CREATE TABLE `t_account`(
    id BIGINT PRIMARY KEY COMMENT 'primary key',
    user_id BIGINT NOT NULL COMMENT 'user ID',
    balance DECIMAL(10,2) NOT NULL DEFAULT 0 COMMENT 'account balance',
    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
    edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'modification time',
    version BIGINT NOT NULL DEFAULT 0 COMMENT 'version'
    -- indexes omitted
) COMMENT 'account table';
```

Both systems also maintain a balance-change flow table with a similar structure: the upstream system uses it to record pre-deductions and flow entries, the downstream system to record flow entries. The new interaction sequence can then be laid out as follows:

```
[System A local transaction - start]
1. System A subtracts 1000 from user X's balance in its t_account table
2. System A writes a pre-deduction record of 1000 for user X into its flow table,
   marks its state as "processing", and generates a globally unique serial number SEQ_NO
[System A local transaction - end]
3. System A pushes a "deduct 1000 from user X" message to the message queue middleware
   (the techniques mentioned above can be used to guarantee that the push succeeds)
[System B local transaction - start]
4. System B adds 1000 to user X's balance in its t_account table
5. System B writes a balance-change record (increase of 1000) for user X into its flow table
[System B local transaction - end]
6. System B pushes a "user X's balance processed successfully" message, which must carry
   SEQ_NO, to the message queue middleware (again, the techniques above apply)
[System A local transaction - start]
7. System A updates the flow record of user X with serial number SEQ_NO to "processed
   successfully" (this step must be idempotent; SEQ_NO can serve as the distributed lock KEY)
[System A local transaction - end]

Other:
[Records in "processing" state in system A's flow table need scheduled polling and retries]
1. Scheduled jobs retry the records in system A's flow table that are still in "processing" state
[Daily reconciliation module between systems A and B]
1. At the daily cut-over, reconcile all of system A's successfully processed T-1 flow
   records against all of system B's T-1 flow records
```
[Figure: j-t-s-i-a-9.png]

The steps above look like a lot, and you have to write the reconciliation and retry modules. But in fact, "as long as the upstream and downstream systems and the message queue middleware are operating normally, this interaction scheme can withstand far higher concurrency than a synchronous scheme". If a service or the middleware becomes unavailable, the flow tables retain the locally unfinished records, which can be retried after recovery, so reliability is high. Besides, retry and reconciliation modules are required for any transaction involving money, regardless of whether the interaction is synchronous or asynchronous.
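The A-to-B flow above can be condensed into an in-memory simulation. Everything here is illustrative: maps stand in for the account and flow tables, direct method calls stand in for the message queue, and B's own flow record (step 5) is omitted for brevity.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simulation of the state-driven transfer: A pre-deducts and records a
// "processing" flow entry, B credits, and B's confirmation (carrying SEQ_NO)
// drives A's flow record to its final state idempotently.
class TransferDemo {
    static final Map<String, Integer> balanceA = new HashMap<>();
    static final Map<String, Integer> balanceB = new HashMap<>();
    static final Map<String, String> flowA = new HashMap<>();   // seqNo -> state
    static final Set<String> confirmedByB = new HashSet<>();    // idempotency for step 7

    // Steps 1-3: A's local transaction, then the "message push" to B.
    static void transferOut(String seqNo, String user, int amount) {
        balanceA.merge(user, -amount, Integer::sum);    // 1. deduct
        flowA.put(seqNo, "PROCESSING");                 // 2. pre-deduction flow record
        creditIn(seqNo, user, amount);                  // 3. push the deduction message
    }

    // Steps 4 and 6: B's local transaction, then the confirmation back to A.
    static void creditIn(String seqNo, String user, int amount) {
        balanceB.merge(user, amount, Integer::sum);     // 4. credit (B's flow record omitted)
        confirm(seqNo);                                 // 6. push success message with SEQ_NO
    }

    // Step 7: A finalizes the flow record; SEQ_NO makes this idempotent.
    static void confirm(String seqNo) {
        if (!confirmedByB.add(seqNo)) return;           // duplicate confirmation: ignore
        flowA.put(seqNo, "SUCCESS");
    }
}
```

In a real system, steps 3 and 6 go through RabbitMQ with the local-message-table technique, and a scheduled job retries any `flowA` entry still in PROCESSING.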

Summary

Throughout the article you will find that many of the solutions use the pattern "write the pending work to a local table + trigger in real time + compensate via scheduled jobs". What I actually want to convey is that this pattern is fairly common among current distributed solutions; it can basically cover distributed transactions, synchronous and asynchronous compensation, real-time and non-real-time triggering, and other complex scenarios. The pattern also has some obvious problems (which you will run into if you practice it):

  • 1. A badly designed or badly handled local (message) table easily becomes a database bottleneck.
  • 2. The compensation and local-table handling code is prone to redundancy and rot.
  • 3. In extreme cases, abnormal-recovery scenarios can drag the service down.

In fact, most of the time the analysis must be combined with the existing system or scenario, and subsequent optimization driven by data monitoring and analysis. After all, "architecture is iterated, not designed." In addition, the "final solution" mentioned in this article has been analyzed in detail in another article: A Reusable Transaction Message Scheme Based on RabbitMQ.

(E-A-20190323 C-14-D 996. This article was written in late March 2019; I hope it is not out of date.)