Distributed transaction

System development commonly relies on relational databases, files, and other data stores, and transaction problems inevitably come up while the system is running. A transaction has four basic properties: atomicity, consistency, isolation, and durability (ACID). Take the relational database MySQL as an example: its InnoDB engine provides transaction support. In a single-machine, single-process environment, these properties can be guaranteed with MySQL transactions. Once the architecture is upgraded to a distributed one, however, single-machine transactions can no longer guarantee the transactional properties of the whole system. For example, when the order service creates an order, it calls the inventory interface to deduct stock as part of placing the order. If a rollback is needed because of a network timeout, a downstream service failure, a downstream business error, and so on, the order service cannot tell when to roll back, or it may never receive the rollback notification.

Several common approaches to distributed transactions are used in the industry, such as XA strong consistency, two-phase commit, three-phase commit, TCC distributed transactions, BASE flexible transactions, the DTP model, reliable-message eventual consistency, and best-effort notification. This article takes reliable-message eventual consistency as its example and implements a demo with a simple e-commerce case: placing an order and deducting inventory.

Flow of a reliable-message eventual-consistency distributed transaction

A simple sequence diagram explains the flow of a reliable-message eventual-consistency distributed transaction:

  • User places an order: The user initiates an order request, and the interface sends a half message to RocketMQ. If sending the message fails, the order fails; otherwise success is returned.
  • The order service executes the local transaction: The order service listens for the half message and executes the local transaction, storing the order data and the transaction log, and then sends COMMIT or ROLLBACK to RocketMQ depending on the result of the local transaction.
  • The order service provides a query for the local transaction result: This step is not shown in the sequence diagram. If RocketMQ does not receive COMMIT or ROLLBACK, it calls a method provided by the business side that checks the result of the local transaction and returns COMMIT or UNKNOWN to RocketMQ.
  • Inventory deduction for the order: Once the order service has sent COMMIT to RocketMQ, RocketMQ delivers the message to its consumer, in this case the inventory service, which receives the transaction message and executes its own local transaction: deducting inventory and recording the transaction log.
  • Transaction rollback: If the inventory service hits an error and cannot deduct the inventory, it sends a rollback message to RocketMQ, and the order service rolls back the order when it receives this message. The rollback can also use transaction messages, or an RPC interface combined with scheduled-task retries, and so on.
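The steps above can be sketched with a small in-memory simulation. This is only an illustration under stated assumptions: HalfMessageBrokerSketch, TxState, and all method names are hypothetical and are not the RocketMQ API, and the txLog set stands in for the tx_log table. It shows that a half message stays invisible to the consumer until the producer reports COMMIT, that ROLLBACK discards it, and that the check-back resolves a message whose result never reached the broker.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical in-memory stand-in for a broker with half messages; not the RocketMQ API. */
class HalfMessageBrokerSketch {

    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    private final List<String> pending = new ArrayList<>();   // half messages, hidden from consumers
    private final List<String> delivered = new ArrayList<>(); // messages visible to consumers

    /** Stores a half message, runs the producer's local transaction, then applies the reported state. */
    TxState sendInTransaction(String txNo, Function<String, TxState> localTransaction) {
        pending.add(txNo);
        TxState state;
        try {
            state = localTransaction.apply(txNo);
        } catch (RuntimeException e) {
            state = TxState.ROLLBACK; // a failed local transaction discards the half message
        }
        resolve(txNo, state);
        return state;
    }

    /** Check-back: asks the producer about every half message that is still unresolved. */
    void checkPending(Function<String, TxState> checker) {
        for (String txNo : new ArrayList<>(pending)) {
            resolve(txNo, checker.apply(txNo));
        }
    }

    private void resolve(String txNo, TxState state) {
        if (state == TxState.COMMIT) {
            pending.remove(txNo);
            delivered.add(txNo);
        } else if (state == TxState.ROLLBACK) {
            pending.remove(txNo);
        }
        // UNKNOWN: keep the half message so that it can be checked again later
    }

    List<String> delivered() { return delivered; }

    List<String> pending() { return pending; }
}

class HalfMessageFlowDemo {
    public static void main(String[] args) {
        HalfMessageBrokerSketch broker = new HalfMessageBrokerSketch();
        Set<String> txLog = new HashSet<>(); // stands in for the tx_log table

        // Local transaction succeeds and the result reaches the broker
        broker.sendInTransaction("tx-1", txNo -> {
            txLog.add(txNo);
            return HalfMessageBrokerSketch.TxState.COMMIT;
        });
        // Local transaction succeeds, but the result is treated as lost (UNKNOWN)
        broker.sendInTransaction("tx-2", txNo -> {
            txLog.add(txNo);
            return HalfMessageBrokerSketch.TxState.UNKNOWN;
        });
        // Local transaction fails, so the half message is rolled back
        broker.sendInTransaction("tx-3", txNo -> {
            throw new IllegalStateException("local transaction failed");
        });
        System.out.println("delivered=" + broker.delivered() + ", pending=" + broker.pending());

        // Check-back: COMMIT if the transaction log exists, otherwise UNKNOWN
        broker.checkPending(txNo -> txLog.contains(txNo)
                ? HalfMessageBrokerSketch.TxState.COMMIT
                : HalfMessageBrokerSketch.TxState.UNKNOWN);
        System.out.println("delivered=" + broker.delivered() + ", pending=" + broker.pending());
    }
}
```

The check-back rule used here (COMMIT when the transaction log exists, UNKNOWN otherwise) is the same decision the order service makes when RocketMQ asks about an unresolved half message.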

Note: All of the steps above must be idempotent.
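Idempotency here means that handling the same transaction message twice has the same effect as handling it once. The following is a minimal sketch, assuming an in-memory set in place of the tx_log table and its unique tx_no key; IdempotentStockConsumerSketch and its methods are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical consumer that applies each transaction number at most once. */
class IdempotentStockConsumerSketch {

    // Stands in for the tx_log table: one entry per processed transaction number
    private final Set<String> processedTxNos = ConcurrentHashMap.newKeySet();
    // Stands in for the stock table: product id -> remaining count
    private final Map<Long, Integer> stock = new ConcurrentHashMap<>();

    IdempotentStockConsumerSketch(long productId, int totalCount) {
        stock.put(productId, totalCount);
    }

    /** Returns true if the deduction was applied, false if the message was a duplicate. */
    boolean onMessage(String txNo, long productId, int payCount) {
        // add() returns false when the transaction number was already recorded
        if (!processedTxNos.add(txNo)) {
            return false;
        }
        // In a real service the log insert and the stock update belong to one database transaction
        stock.merge(productId, -payCount, Integer::sum);
        return true;
    }

    int stockOf(long productId) {
        return stock.getOrDefault(productId, 0);
    }
}
```

Delivering the same transaction number a second time leaves the stock untouched, which is why message redelivery is safe for a consumer written this way.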

Components required for the reliable-message eventual-consistency distributed transaction

  • **MySQL 5.7:** Provides database support
  • **RocketMQ:** Provides reliable messaging and supports transactional messages

Implementation

Note: This assumes that MySQL and RocketMQ environments are already available. The implementation is a demo project only, intended to illustrate the principle of reliable-message eventual-consistency distributed transactions.

Table creation statements

-- orders table
CREATE TABLE `order` (
                         id BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
                         order_no VARCHAR(64) COMMENT 'Order Number',
                         product_id BIGINT(20) COMMENT 'commodity id',
                         pay_count int(11) COMMENT 'Purchase Quantity',
                         create_time datetime  COMMENT 'Creation time',
                         PRIMARY KEY (`id`),
                         UNIQUE KEY `order_no_index` (`order_no`) USING BTREE
);

-- inventory table
CREATE TABLE `stock` (
                         id BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
                         product_id BIGINT(20) COMMENT 'commodity id',
                         total_count int(11) COMMENT 'total',
                         create_time datetime  COMMENT 'Creation time',
                         PRIMARY KEY (`id`),
                         UNIQUE KEY `product_id_idx` (`product_id`) USING BTREE
);

-- Transaction log
CREATE TABLE `tx_log` (
                          id BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
                          tx_no VARCHAR(64) COMMENT 'Transaction Number',
                          create_time datetime  COMMENT 'Creation time',
                          UNIQUE KEY `tx_no_index` (`tx_no`) USING BTREE,
                          PRIMARY KEY (`id`)
);

Code implementation

Note: In the demo, the order service and the inventory service use different databases: the order database and the stock database. The order database contains the order table and the tx_log table; the stock database contains the stock table and the tx_log table.

Set up the project

  • The project structure is as follows. This is only an example; you can organize it however you like.

  • POM

The POM dependencies of the two services can be exactly the same. Here’s an example of one of them:


      
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.2.6.RELEASE</version>
        <relativePath/>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>order-service</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <druid.version>1.1.10</druid.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-undertow</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>${druid.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.18</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.22</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>order-service</finalName>
        <plugins>
            <plugin>
                <groupId>org.mybatis.generator</groupId>
                <artifactId>mybatis-generator-maven-plugin</artifactId>
                <version>1.3.2</version>
                <configuration>
                    <verbose>true</verbose>
                    <overwrite>true</overwrite>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Order service

  • application.yml
server:
  port: 8081
  servlet:
    context-path: /order
  tomcat:
    uri-encoding: UTF-8

spring:
  main:
    allow-bean-definition-overriding: true
  profiles:
    include: db # application-db.yml
    active: db
  output:
    ansi:
      enabled: detect
  application:
    name: order-service-svc
rocketmq:
  name-server: 192.168.58.45:9876
  producer:
    group: order-group
  • application-db.yml
spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3316/tx-order?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&autoReconnect=true&failOverReadOnly=false&useSSL=false
    username: root
    password: admin123
    driver-class-name: com.mysql.jdbc.Driver
    platform: mysql
    type: com.alibaba.druid.pool.DruidDataSource
    # The following connection pool settings apply to all of the data sources above
    # Initial size, minimum idle, maximum active
    initialSize: 10
    minIdle: 5
    maxActive: 20
    # Connection wait timeout, in milliseconds
    maxWait: 60000
    # How often to check for idle connections that should be closed, in milliseconds
    timeBetweenEvictionRunsMillis: 3600000
    # Minimum time a connection stays in the pool, in milliseconds
    minEvictableIdleTimeMillis: 3600000
    validationQuery: select 1 from dual
    testWhileIdle: true
    testOnBorrow: false
    testOnReturn: false
    # Turn on PSCache and specify its size for each connection
    poolPreparedStatements: true
    maxPoolPreparedStatementPerConnectionSize: 20
    maxOpenPreparedStatements: 20
    # Filters for monitoring statistics. Without them, the monitoring page cannot collect SQL statistics. 'wall' is the firewall filter
    filters: stat
    # Enable mergeSql and slow SQL logging via connectionProperties
    # connectionProperties: druid.stat.mergeSql=true; druid.stat.slowSqlMillis=5000

mybatis-plus:
  global-config:
    db-config:
      id-type: auto
      field-strategy: not-empty
      table-underline: true
      db-type: mysql
      logic-delete-value: 1 # Logical deleted value (default: 1)
      logic-not-delete-value: 0 # Logical undeleted value (default: 0)
  configuration:
    jdbc-type-for-null: 'null'
  mapper-locations: classpath*:mapper/*.xml  # Note: must match the path of the mapper XML files
  type-aliases-package: io.transaction.order.entity.*   # Note: the package of the entity classes
  • MyBatis configuration
@Configuration
@EnableTransactionManagement(proxyTargetClass = true)
@MapperScan(value = {"org.transaction.order.mapper"})
public class MyBatisConfiguration {

    @Bean
    public PaginationInnerInterceptor paginationInterceptor() {
        return new PaginationInnerInterceptor();
    }
}
  • Order interface
@RestController
@Slf4j
public class OrderController {

    private final OrderMapper orderMapper;
    private final OrderService orderService;

    @Autowired
    public OrderController(OrderMapper orderMapper, OrderService orderService) {
        this.orderMapper = orderMapper;
        this.orderService = orderService;
    }

    @PostMapping("/submitOrder")
    public String submitOrder(@RequestBody SubmitOrderDTO dto) {
        log.info("Start order submission");
        try {
            orderService.submitOrder(dto.getProductId(), dto.getPayCount());
            return "SUCCESS";
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "FAILED";
    }
}
  • Request parameters
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SubmitOrderDTO implements Serializable {

    /** Product ID */
    private Long productId;
    /** * Purchase quantity */
    private Integer payCount;
}
  • Order processing interface and implementation
public interface OrderService {

    /**
     * Submit the order and save the transaction log.
     *
     * @param txMessage transaction message
     */
    void submitOrderAndSaveTxNo(TxMessage txMessage);

    /**
     * Submit the order.
     *
     * @param productId product ID
     * @param payCount  purchase quantity
     */
    void submitOrder(Long productId, Integer payCount);
}

OrderServiceImpl.java

@Service
@Slf4j
public class OrderServiceImpl implements OrderService {

    private final RocketMQTemplate rocketMQTemplate;
    private final TxLogMapper txLogMapper;
    private final OrderMapper orderMapper;

    @Autowired
    public OrderServiceImpl(RocketMQTemplate rocketMQTemplate, TxLogMapper txLogMapper, OrderMapper orderMapper) {
        this.rocketMQTemplate = rocketMQTemplate;
        this.txLogMapper = txLogMapper;
        this.orderMapper = orderMapper;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void submitOrderAndSaveTxNo(TxMessage txMessage) {
        // Check whether the order exists based on the transaction number
        int count = txLogMapper.getCount(txMessage.getTxNo());
        if (0 != count) {
            log.info("Transaction exists: [{}]", txMessage.getTxNo());
            return;
        }
        Date date = new Date();
        // Generate the order
        Order order = new Order();
        order.setOrderNo(txMessage.getOrderNo());
        order.setCreateTime(date);
        order.setPayCount(txMessage.getPayCount());
        order.setProductId(txMessage.getProductId());
        log.info("Start storing order: [{}]", order.toString());
        int res = orderMapper.saveOrder(order);
        if (res > 0) {
            // Log transaction logs
            TxLog txLog = new TxLog();
            txLog.setTxNo(txMessage.getTxNo());
            txLog.setCreateTime(date);
            log.info("Start transaction logging: [{}]", txLog.toString());
            txLogMapper.saveTxLog(txLog);
        }
    }

    @Override
    public void submitOrder(Long productId, Integer payCount) {
        // Send transaction messages
        TxMessage txMessage = new TxMessage();
        // Global transaction number
        String txNo = UUID.randomUUID().toString();
        txMessage.setProductId(productId);
        txMessage.setPayCount(payCount);
        txMessage.setTxNo(txNo);
        txMessage.setOrderNo(UUID.randomUUID().toString());
        String jsonString = JSONObject.toJSONString(txMessage);
        Message<String> msg = MessageBuilder.withPayload(jsonString).build();
        rocketMQTemplate.sendMessageInTransaction("tx_order_group", "topic_txmsg", msg, null);
    }
}
  • Transaction message processing

Message structure

@Data
@AllArgsConstructor
@NoArgsConstructor
public class TxMessage implements Serializable {

    /** * product id */
    private Long productId;

    /** * Product purchase quantity */
    private Integer payCount;

    /** * global transaction number */
    private String txNo;

    /** * Order number */
    private String orderNo;
}

Transaction message listener OrderTxMessageListener

@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_order_group")
public class OrderTxMessageListener implements RocketMQLocalTransactionListener {

    private final OrderService orderService;

    private final TxLogMapper txLogMapper;

    @Autowired
    public OrderTxMessageListener(OrderService orderService, TxLogMapper txLogMapper) {
        this.orderService = orderService;
        this.txLogMapper = txLogMapper;
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            // Perform a local transaction
            log.info("Start performing local transactions");
            // Submit the order and save the transaction message
            log.info("Submit order save transaction log");
            TxMessage txMessage = getTxMessage(message);
            orderService.submitOrderAndSaveTxNo(txMessage);
            // Return COMMIT
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            // rollback
            log.info("Perform rollback");
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        // Check whether a local transaction exists based on the transaction number
        TxMessage txMessage = getTxMessage(message);
        int count = txLogMapper.getCount(txMessage.getTxNo());
        if (0 != count) {
            log.info("Transaction exists, transaction message can be committed: [{}]", txMessage.getTxNo());
            return RocketMQLocalTransactionState.COMMIT;
        }
        // No transaction log found yet: return UNKNOWN so that RocketMQ checks again later
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    private TxMessage getTxMessage(Message message) {
        String messageString = new String((byte[]) message.getPayload());
        return JSONObject.parseObject(messageString, TxMessage.class);
    }
}
  • Persistence layer implementation

OrderMapper

@Repository
public interface OrderMapper {

    Order selectByOrderNo(@Param("orderNo") String orderNo);

    int saveOrder(@Param("order") Order order);

    int deleteByOrderNo(@Param("orderNo") String orderNo);
}

OrderMapper.xml


      
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.transaction.order.mapper.OrderMapper" >

  <select id="selectByOrderNo" resultType="org.transaction.order.entity.Order" >
    select
    id as id, order_no as orderNo, pay_count as payCount, create_time as createTime
    from `order`
    where order_no = #{orderNo};
  </select>

  <insert id="saveOrder" parameterType="org.transaction.order.entity.Order">
    insert into `order` (order_no, product_id, pay_count, create_time) values (#{order.orderNo}, #{order.productId}, #{order.payCount}, #{order.createTime});
  </insert>

  <delete id="deleteByOrderNo">
    delete from `order` where order_no = #{orderNo};
  </delete>
</mapper>

TxLogMapper

@Repository
public interface TxLogMapper {

    int getCount(@Param("txNo") String txNo);

    int saveTxLog(@Param("txLog")TxLog txLog);
}

TxLogMapper.xml


      
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.transaction.order.mapper.TxLogMapper" >
    <select id="getCount" resultType="java.lang.Integer">
        select count(*) from tx_log where tx_no = #{txNo}
    </select>

    <insert id="saveTxLog" parameterType="org.transaction.order.entity.TxLog">
        insert into `tx_log` (tx_no, create_time) values (#{txLog.txNo}, #{txLog.createTime});
    </insert>
</mapper>

Inventory service

  • application.yml
server:
  port: 8082
  servlet:
    context-path: /stock
  tomcat:
    uri-encoding: UTF-8
  # Set encoding to UTF-8


mybatis-plus:
  mapper-locations: classpath*:mapper/*.xml  # Note: must match the path of the mapper XML files
  type-aliases-package: io.transaction.stock.entity.*   # Note: the package of the entity classes


spring:
  main:
    allow-bean-definition-overriding: true
  profiles:
    include: db
    active: db
  output:
    ansi:
      enabled: detect
  application:
    name: tx-msg-stock

  http:
    encoding:
      charset: UTF-8
      enabled: true
      force: true

rocketmq:
  name-server: 192.168.58.45:9876
  producer:
      group: stock-group
  • application-db.yml

The inventory service is implemented in much the same way as the order service, so some code is omitted here, for example the transaction message structure, the MyBatis configuration, and the persistence layer.

spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3316/tx-stock?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&autoReconnect=true&failOverReadOnly=false&useSSL=false
    username: root
    password: admin123
    driver-class-name: com.mysql.jdbc.Driver
    platform: mysql
    type: com.alibaba.druid.pool.DruidDataSource
    # The following connection pool settings apply to all of the data sources above
    # Initial size, minimum idle, maximum active
    initialSize: 10
    minIdle: 5
    maxActive: 20
    # Connection wait timeout, in milliseconds
    maxWait: 60000
    # How often to check for idle connections that should be closed, in milliseconds
    timeBetweenEvictionRunsMillis: 3600000
    # Minimum time a connection stays in the pool, in milliseconds
    minEvictableIdleTimeMillis: 3600000
    validationQuery: select 1 from dual
    testWhileIdle: true
    testOnBorrow: false
    testOnReturn: false
    # Turn on PSCache and specify its size for each connection
    poolPreparedStatements: true
    maxPoolPreparedStatementPerConnectionSize: 20
    maxOpenPreparedStatements: 20
    # Filters for monitoring statistics. Without them, the monitoring page cannot collect SQL statistics. 'wall' is the firewall filter
    filters: stat
    # Enable mergeSql and slow SQL logging via connectionProperties
    # connectionProperties: druid.stat.mergeSql=true; druid.stat.slowSqlMillis=5000

mybatis-plus:
  global-config:
    db-config:
      id-type: auto
      field-strategy: not-empty
      table-underline: true
      db-type: mysql
      logic-delete-value: 1 # Logical deleted value (default: 1)
      logic-not-delete-value: 0 # Logical undeleted value (default: 0)
  configuration:
    jdbc-type-for-null: 'null'
  mapper-locations: classpath*:mapper/*.xml  # Note: must match the path of the mapper XML files
  type-aliases-package: io.transaction.stock.entity.*   # Note: the package of the entity classes
  • Inventory deduction interface and implementation
public interface StockService {

    /**
     * Deduct inventory.
     *
     * @param txMessage transaction message
     * @throws Exception if the deduction fails
     */
    void deleteStock(TxMessage txMessage) throws Exception;
}

Implementation

@Service
@Slf4j
public class StockServiceImpl implements StockService {

    private final TxLogMapper txLogMapper;
    private final StockMapper stockMapper;

    @Autowired
    public StockServiceImpl(TxLogMapper txLogMapper, StockMapper stockMapper) {
        this.txLogMapper = txLogMapper;
        this.stockMapper = stockMapper;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void deleteStock(TxMessage txMessage) throws Exception {
        // Check local transactions
        int count = txLogMapper.getCount(txMessage.getTxNo());
        if (count != 0) {
            log.info("Transaction exists: [{}]", txMessage.getTxNo());
            return;
        }
        // Get inventory based on product ID
        Stock stock = stockMapper.getStockByProductId(txMessage.getProductId());
        // Check the inventory to see if you can place an order
        if (txMessage.getPayCount() > stock.getTotalCount()) {
            log.info("Out of stock");
            // TODO: notify order-service to roll back
            throw new Exception("Out of stock");
        }
        Date date = new Date();
        // Deduct inventory
        stockMapper.updateStock(txMessage.getProductId(), txMessage.getPayCount());
        log.info("Inventory reduction succeeded");
        // Log transaction logs
        TxLog txLog = new TxLog();
        txLog.setTxNo(txMessage.getTxNo());
        txLog.setCreateTime(date);
        txLogMapper.saveTxLog(txLog);
        log.info("Transaction logging succeeded");
    }
}
  • Order transaction message listener
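One thing worth noting about the check-then-update sequence above: two messages for the same product that are handled concurrently could both pass the stock check before either update runs. A common way to close that gap is to make the check and the deduction a single atomic step (in SQL, a conditional UPDATE whose WHERE clause requires enough stock). The sketch below shows the same idea in memory with compare-and-set; ConditionalDeductSketch is a hypothetical class and is not part of the demo project.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical in-memory analogue of a conditional stock update. */
class ConditionalDeductSketch {

    private final AtomicInteger totalCount;

    ConditionalDeductSketch(int initialCount) {
        this.totalCount = new AtomicInteger(initialCount);
    }

    /** Deducts payCount only if enough stock remains; check and update form one atomic step. */
    boolean tryDeduct(int payCount) {
        while (true) {
            int current = totalCount.get();
            if (payCount > current) {
                return false; // out of stock, nothing is changed
            }
            // Succeeds only if no other thread changed the count in the meantime
            if (totalCount.compareAndSet(current, current - payCount)) {
                return true;
            }
        }
    }

    int remaining() {
        return totalCount.get();
    }
}
```

With an initial count of 400, tryDeduct(100) succeeds and leaves 300, and a following tryDeduct(400) is refused without changing the count, which matches the two scenarios exercised in the test results.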
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "tx_stock_group", topic = "topic_txmsg")
public class StockTxMessageConsumer implements RocketMQListener<String> {

    private final StockService stockService;
    private final RocketMQTemplate rocketMQTemplate;


    @Autowired
    public StockTxMessageConsumer(StockService stockService, RocketMQTemplate rocketMQTemplate) {
        this.stockService = stockService;
        this.rocketMQTemplate = rocketMQTemplate;
    }

    @SneakyThrows
    @Override
    public void onMessage(String s) {
        // Deduct inventory based on the message
        log.info("Start deducting inventory.");
        TxMessage txMessage = deserialization(s);
        try {
            stockService.deleteStock(txMessage);
        } catch (Exception e) {
            e.printStackTrace();
            log.info("An exception occurs. Notify upstream rollback.");
            rollback(txMessage);
        }
    }

    private void rollback(TxMessage txMessage) {
        String jsonString = JSONObject.toJSONString(txMessage);
        Message<String> msg = MessageBuilder.withPayload(jsonString).build();
        rocketMQTemplate.asyncSend("topic_tx_rollback_msg", msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("Rollback message sending result: [{}]", sendResult.getSendStatus());
            }

            @Override
            public void onException(Throwable throwable) {
                throwable.printStackTrace();
                log.error("Rollback message sending exception");
            }
        });
    }

    private TxMessage deserialization(String msg) {
        return JSONObject.parseObject(msg, TxMessage.class);
    }
}
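The order-side handling of the rollback message is not shown in the listings above. A minimal sketch of what it might look like follows, assuming the rollback simply deletes the order by its order number (mirroring OrderMapper.deleteByOrderNo) and has to tolerate the same rollback message being delivered more than once. The in-memory map and all names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical order-side rollback handler; the map stands in for the order table. */
class OrderRollbackHandlerSketch {

    // orderNo -> purchase quantity
    private final Map<String, Integer> ordersByOrderNo = new ConcurrentHashMap<>();

    void saveOrder(String orderNo, int payCount) {
        ordersByOrderNo.put(orderNo, payCount);
    }

    /** Deletes the order if it exists. A repeated rollback message is a harmless no-op. */
    boolean onRollbackMessage(String orderNo) {
        return ordersByOrderNo.remove(orderNo) != null;
    }

    boolean exists(String orderNo) {
        return ordersByOrderNo.containsKey(orderNo);
    }
}
```

Because deleting an order that is already gone changes nothing, the handler stays idempotent even if the rollback message is retried.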

Test results

  • tx-order database

In tx-order, the order table and the tx_log table contain no data

mysql> truncate tx_log;
Query OK, 0 rows affected (0.01 sec)

mysql> select * from tx_log;
Empty set (0.00 sec)

mysql> truncate `order`;
Query OK, 0 rows affected (0.01 sec)
  • tx-stock database

In tx-stock, the stock table contains the product with product_id 1 and a total count of 400, and the tx_log table contains no data

mysql> select * from stock;
+----+------------+-------------+---------------------+
| id | product_id | total_count | create_time         |
+----+------------+-------------+---------------------+
|  1 |          1 |         400 | 2021-11-25 18:46:19 |
+----+------------+-------------+---------------------+
1 row in set (0.00 sec)

mysql> truncate tx_log;
Query OK, 0 rows affected (0.01 sec)
  • Placing an order

Place an order with a purchase quantity of 100

In tx-order, the order table and the tx_log table now hold the order data and the transaction log

mysql> select * from `order`;
+----+--------------------------------------+------------+-----------+---------------------+
| id | order_no                             | product_id | pay_count | create_time         |
+----+--------------------------------------+------------+-----------+---------------------+
|  1 | 7268e58e-5ee1-4fba-a998-e99d45d5ff47 |          1 |       100 | 2021-11-29 13:26:46 |
+----+--------------------------------------+------------+-----------+---------------------+
1 row in set (0.01 sec)

mysql> select * from tx_log;
+----+--------------------------------------+---------------------+
| id | tx_no                                | create_time         |
+----+--------------------------------------+---------------------+
|  1 | 49afa0ec-09a6-4bb1-8363-3c2d695aa1a5 | 2021-11-29 13:26:46 |
+----+--------------------------------------+---------------------+
1 row in set (0.00 sec)

In tx-stock, the stock of the product with product_id 1 decreases by 100, and tx_log stores the transaction log

mysql> select * from stock;
+----+------------+-------------+---------------------+
| id | product_id | total_count | create_time         |
+----+------------+-------------+---------------------+
|  1 |          1 |         300 | 2021-11-25 18:46:19 |
+----+------------+-------------+---------------------+
1 row in set (0.00 sec)

mysql> select * from tx_log;
+----+--------------------------------------+---------------------+
| id | tx_no                                | create_time         |
+----+--------------------------------------+---------------------+
|  1 | 49afa0ec-09a6-4bb1-8363-3c2d695aa1a5 | 2021-11-29 13:27:09 |
+----+--------------------------------------+---------------------+
1 row in set (0.00 sec)

RocketMQ transaction messages

  • Order quantity exceeds stock: rollback

The inventory is now 300. An order for 400 will fail, and the inventory service will notify the order service to roll back.

In tx-order, the order table data remains unchanged and the tx_log table stores the transaction log

mysql> select * from `order`;
+----+--------------------------------------+------------+-----------+---------------------+
| id | order_no                             | product_id | pay_count | create_time         |
+----+--------------------------------------+------------+-----------+---------------------+
|  1 | 7268e58e-5ee1-4fba-a998-e99d45d5ff47 |          1 |       100 | 2021-11-29 13:26:46 |
+----+--------------------------------------+------------+-----------+---------------------+
1 row in set (0.01 sec)

mysql> select * from tx_log;
+----+--------------------------------------+---------------------+
| id | tx_no                                | create_time         |
+----+--------------------------------------+---------------------+
|  1 | 49afa0ec-09a6-4bb1-8363-3c2d695aa1a5 | 2021-11-29 13:26:46 |
|  2 | 1f71279d-1efa-4ee0-8416-9f13c661b58a | 2021-11-29 13:39:34 |
+----+--------------------------------------+---------------------+
2 rows in set (0.01 sec)

In tx-stock, the stock table data remains unchanged, and tx_log does not store a new transaction log

mysql> select * from stock;
+----+------------+-------------+---------------------+
| id | product_id | total_count | create_time         |
+----+------------+-------------+---------------------+
|  1 |          1 |         300 | 2021-11-25 18:46:19 |
+----+------------+-------------+---------------------+
1 row in set (0.00 sec)

mysql> select * from tx_log;
+----+--------------------------------------+---------------------+
| id | tx_no                                | create_time         |
+----+--------------------------------------+---------------------+
|  1 | 49afa0ec-09a6-4bb1-8363-3c2d695aa1a5 | 2021-11-29 13:27:09 |
+----+--------------------------------------+---------------------+
1 row in set (0.00 sec)

RocketMQ contains the transaction rollback message

Conclusion

This completes the RocketMQ-based demo of a reliable-message eventual-consistency distributed transaction. The code is not suitable for production use, from either a business or a technical standpoint; it only demonstrates the processing flow of a reliable-message eventual-consistency distributed transaction.

The source code is available on the author's GitHub: github.com/Redick01/my…

  • Reference: Understanding Distributed Transactions in Depth