Distributed transaction
During system development we rely on relational databases, files, and other forms of data storage, and sooner or later we run into transaction problems at runtime. A transaction has four basic properties: atomicity, consistency, isolation, and durability (ACID). Taking the relational database MySQL as an example, the InnoDB engine provides transaction support. In a single-machine, single-process environment, these properties can be guaranteed by MySQL transactions. Once the architecture is upgraded to a distributed system, however, single-machine transactions can no longer guarantee them for the system as a whole. For example, when the order service creates an order, it calls the inventory interface to deduct stock as part of placing the order. If that call fails because of a network timeout, a downstream service outage, or a downstream business error, and a rollback is needed, the order service cannot tell when to roll back, or never receives the rollback notification.
Several schemes for distributed transactions are common in the industry: XA strong consistency, two-phase commit, three-phase commit, TCC, BASE flexible transactions, the DTP model, reliable-message eventual consistency, and best-effort notification. This article takes reliable-message eventual consistency as its example and implements a demo with a simple e-commerce case: placing an order and deducting inventory.
Flow of a reliable-message eventual-consistency distributed transaction
A simple sequence diagram illustrates the flow of a reliable-message eventual-consistency distributed transaction:
- User places an order: the user sends an order request, and the order interface sends a Half message to RocketMQ. If sending the message fails, the interface returns an order failure; otherwise it returns success.
- The order service executes the local transaction: the order service listens for the Half order message, executes the local transaction (storing the order data and the transaction log), and sends `COMMIT` or `ROLLBACK` to RocketMQ according to the result of the local transaction.
- The order service provides a query for the local transaction result: this step is not shown in the sequence diagram. If RocketMQ receives neither `COMMIT` nor `ROLLBACK`, it calls a transaction-status method provided by the business side, which looks up the result of the local transaction and answers RocketMQ with `COMMIT` or `UNKNOWN`.
- Inventory deduction: once the order service has sent `COMMIT`, RocketMQ delivers the message to its consumer, in this case the inventory service, which receives the transaction message and executes its own local transaction: deducting stock and recording the transaction log.
- Transaction rollback: if the inventory service hits an exception during the deduction and cannot deduct the stock, it sends a rollback message to RocketMQ, and the order service rolls back the order when it receives this message. The rollback can again use transaction messages, or an RPC interface combined with scheduled-task retries, and so on.
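The steps above can be sketched without any RocketMQ dependency. The following is a minimal in-memory simulation, not RocketMQ code: the `Broker` class, the `sendInTransaction` method, and the `maxChecks` parameter are hypothetical names chosen for illustration. It only shows the rule that a half message becomes visible to the consumer after `COMMIT`, and that `UNKNOWN` triggers a check-back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** In-memory stand-in for the broker side of the half-message flow (illustrative only). */
final class HalfMessageFlowSketch {

    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    static final class Broker {
        /** Messages that have become visible to consumers. */
        final List<String> delivered = new ArrayList<>();

        /**
         * Holds the half message, runs the producer's local transaction and
         * resolves the message. While the answer is UNKNOWN, the broker asks
         * the check-back callback, at most maxChecks times.
         */
        TxState sendInTransaction(String payload,
                                  Supplier<TxState> localTransaction,
                                  Supplier<TxState> checkBack,
                                  int maxChecks) {
            TxState state = localTransaction.get();
            for (int i = 0; state == TxState.UNKNOWN && i < maxChecks; i++) {
                state = checkBack.get();
            }
            if (state == TxState.COMMIT) {
                delivered.add(payload); // only now can the inventory service see it
            }
            // ROLLBACK, or still UNKNOWN after all checks: never delivered
            return state;
        }
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        broker.sendInTransaction("order-1", () -> TxState.COMMIT, () -> TxState.UNKNOWN, 3);
        broker.sendInTransaction("order-2", () -> TxState.ROLLBACK, () -> TxState.UNKNOWN, 3);
        broker.sendInTransaction("order-3", () -> TxState.UNKNOWN, () -> TxState.COMMIT, 3);
        System.out.println(broker.delivered); // prints [order-1, order-3]
    }
}
```

Only `order-1` (committed directly) and `order-3` (committed through the check-back) reach the consumer; `order-2` is dropped.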
Note: every step above must be idempotent, because messages and callbacks may be delivered more than once.
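As a rough illustration of what idempotent means here, the sketch below applies a stock deduction at most once per transaction number. It is an in-memory stand-in under stated assumptions: the `processedTxNos` set plays the role of the `tx_log` table, and the class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

/** Applies each transaction number at most once (in-memory illustration). */
final class IdempotentConsumerSketch {
    // Stands in for the tx_log table: one entry per processed transaction number
    private final Set<String> processedTxNos = new HashSet<>();
    private int stock;

    IdempotentConsumerSketch(int initialStock) {
        this.stock = initialStock;
    }

    /** Returns true if the deduction was applied, false if txNo was seen before. */
    synchronized boolean deduct(String txNo, int count) {
        if (!processedTxNos.add(txNo)) {
            return false; // duplicate delivery: do nothing
        }
        stock -= count;
        return true;
    }

    synchronized int stock() {
        return stock;
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch(400);
        System.out.println(consumer.deduct("tx-1", 100)); // prints true
        System.out.println(consumer.deduct("tx-1", 100)); // prints false (redelivery)
        System.out.println(consumer.stock());             // prints 300
    }
}
```

In a database, a unique key on the transaction number gives the same guarantee: the second insert of the same number fails instead of being applied twice.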
Components required for reliable-message eventual consistency
- **MySQL 5.7:** provides database support
- **RocketMQ:** provides reliable messaging with support for transactional messages
Coding
Note: MySQL and RocketMQ environments are assumed to be available. The implementation is for demo purposes only and is intended to introduce the principle of reliable-message eventual-consistency distributed transactions.
Table creation statements
- Order table
CREATE TABLE `order` (
    id BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
    order_no VARCHAR(64) COMMENT 'Order Number',
    product_id BIGINT(20) COMMENT 'commodity id',
    pay_count int(11) COMMENT 'Purchase Quantity',
    create_time datetime COMMENT 'Creation time',
    PRIMARY KEY (`id`),
    UNIQUE KEY `order_no_index` (`order_no`) USING BTREE
);
- Stock table
CREATE TABLE `stock` (
    id BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
    product_id BIGINT(20) COMMENT 'commodity id',
    total_count int(11) COMMENT 'total',
    create_time datetime COMMENT 'Creation time',
    PRIMARY KEY (`id`),
    UNIQUE KEY `product_id_idx` (`product_id`) USING BTREE
);
- Transaction log table
CREATE TABLE `tx_log` (
    id BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
    tx_no VARCHAR(64) COMMENT 'Transaction Number',
    create_time datetime COMMENT 'Creation time',
    UNIQUE KEY `tx_no_index` (`tx_no`) USING BTREE,
    PRIMARY KEY (`id`)
);
Code implementation
Note: the demo's order service and stock service use different databases, the order database and the stock database. The order database contains the order table and the tx_log table; the stock database contains the stock table and the tx_log table.
Set up the project
- The project structure is only an example; organize it however you like
- POM
The POM dependencies of the two services can be exactly the same. Here’s an example of one of them:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.2.6.RELEASE</version>
<relativePath/>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>order-service</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<druid.version>1.1.10</druid.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-undertow</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.18</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.22</version>
</dependency>
</dependencies>
<build>
<finalName>order-service</finalName>
<plugins>
<plugin>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-maven-plugin</artifactId>
<version>1.3.2</version>
<configuration>
<verbose>true</verbose>
<overwrite>true</overwrite>
</configuration>
</plugin>
</plugins>
</build>
</project>
Order service
- application.yml
server:
  port: 8081
  servlet:
    context-path: /order
  tomcat:
    uri-encoding: UTF-8
spring:
  main:
    allow-bean-definition-overriding: true
  profiles:
    include: db # application-db.yml
    active: db
  output:
    ansi:
      enabled: detect
  application:
    name: order-service-svc
rocketmq:
  name-server: 192.168.58.45:9876 # replace with your own NameServer address
  producer:
    group: order-group
- application-db.yml
spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3316/tx-order?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&autoReconnect=true&failOverReadOnly=false&useSSL=false
    username: root
    password: admin123
    driver-class-name: com.mysql.jdbc.Driver
    platform: mysql
    type: com.alibaba.druid.pool.DruidDataSource
    # The following connection-pool settings apply to all of the data sources above
    # Initial size, minimum idle, maximum active
    initialSize: 10
    minIdle: 5
    maxActive: 20
    # Maximum time to wait for a connection, in milliseconds
    maxWait: 60000
    # Interval between checks for idle connections that should be closed, in milliseconds
    timeBetweenEvictionRunsMillis: 3600000
    # Minimum time a connection stays in the pool, in milliseconds
    minEvictableIdleTimeMillis: 3600000
    validationQuery: select 1 from dual
    testWhileIdle: true
    testOnBorrow: false
    testOnReturn: false
    # Enable PSCache and set its size per connection
    poolPreparedStatements: true
    maxPoolPreparedStatementPerConnectionSize: 20
    maxOpenPreparedStatements: 20
    # Filters for monitoring statistics; without them the monitoring page cannot count SQL. 'wall' is the SQL firewall
    filters: stat
    # Enable mergeSql and slow-SQL recording through connectionProperties
    # connectionProperties: druid.stat.mergeSql=true; druid.stat.slowSqlMillis=5000
mybatis-plus:
  global-config:
    db-config:
      id-type: auto
      field-strategy: not-empty
      table-underline: true
      db-type: mysql
      logic-delete-value: 1 # Logical deleted value (default: 1)
      logic-not-delete-value: 0 # Logical undeleted value (default: 0)
  configuration:
    jdbc-type-for-null: 'null'
  mapper-locations: classpath*:mapper/*.xml # must match the path of the mapper XML files
  type-aliases-package: io.transaction.order.entity.* # package of the entity classes
- MyBatis configuration
@Configuration
@EnableTransactionManagement(proxyTargetClass = true)
@MapperScan(value = {"org.transaction.order.mapper"})
public class MyBatisConfiguration {
    @Bean
    public PaginationInnerInterceptor paginationInterceptor() {
        return new PaginationInnerInterceptor();
    }
}
- Order interface
@RestController
@Slf4j
public class OrderController {
    private final OrderMapper orderMapper;
    private final OrderService orderService;

    @Autowired
    public OrderController(OrderMapper orderMapper, OrderService orderService) {
        this.orderMapper = orderMapper;
        this.orderService = orderService;
    }

    @PostMapping("/submitOrder")
    public String submitOrder(@RequestBody SubmitOrderDTO dto) {
        log.info("Start order submission");
        try {
            // Sends the Half message; an exception here means the order was not accepted
            orderService.submitOrder(dto.getProductId(), dto.getPayCount());
            return "SUCCESS";
        } catch (Exception e) {
            log.error("Order submission failed", e);
        }
        return "FAILED";
    }
}
- Request parameters
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SubmitOrderDTO implements Serializable {
    /** Product ID */
    private Long productId;
    /** Purchase quantity */
    private Integer payCount;
}
- Order processing interface and implementation
public interface OrderService {
    /**
     * Submit the order and save the transaction log.
     * @param txMessage transaction message
     */
    void submitOrderAndSaveTxNo(TxMessage txMessage);

    /**
     * Submit the order.
     * @param productId product ID
     * @param payCount quantity purchased
     */
    void submitOrder(Long productId, Integer payCount);
}
OrderServiceImpl.java
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
    private final RocketMQTemplate rocketMQTemplate;
    private final TxLogMapper txLogMapper;
    private final OrderMapper orderMapper;

    @Autowired
    public OrderServiceImpl(RocketMQTemplate rocketMQTemplate, TxLogMapper txLogMapper, OrderMapper orderMapper) {
        this.rocketMQTemplate = rocketMQTemplate;
        this.txLogMapper = txLogMapper;
        this.orderMapper = orderMapper;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void submitOrderAndSaveTxNo(TxMessage txMessage) {
        // Idempotency check: skip if this transaction number was already processed
        int count = txLogMapper.getCount(txMessage.getTxNo());
        if (0 != count) {
            log.info("Transaction exists: [{}]", txMessage.getTxNo());
            return;
        }
        Date date = new Date();
        // Generate the order
        Order order = new Order();
        order.setOrderNo(txMessage.getOrderNo());
        order.setCreateTime(date);
        order.setPayCount(txMessage.getPayCount());
        order.setProductId(txMessage.getProductId());
        log.info("Start storing order: [{}]", order.toString());
        int res = orderMapper.saveOrder(order);
        if (res > 0) {
            // Record the transaction log in the same local transaction as the order
            TxLog txLog = new TxLog();
            txLog.setTxNo(txMessage.getTxNo());
            txLog.setCreateTime(date);
            log.info("Start transaction logging: [{}]", txLog.toString());
            txLogMapper.saveTxLog(txLog);
        }
    }

    @Override
    public void submitOrder(Long productId, Integer payCount) {
        // Build the transaction message
        TxMessage txMessage = new TxMessage();
        // Global transaction number
        String txNo = UUID.randomUUID().toString();
        txMessage.setProductId(productId);
        txMessage.setPayCount(payCount);
        txMessage.setTxNo(txNo);
        txMessage.setOrderNo(UUID.randomUUID().toString());
        // JSONObject here is com.alibaba.fastjson.JSONObject
        String jsonString = JSONObject.toJSONString(txMessage);
        Message<String> msg = MessageBuilder.withPayload(jsonString).build();
        // Send the Half message; the listener bound to "tx_order_group" then runs the local transaction
        rocketMQTemplate.sendMessageInTransaction("tx_order_group", "topic_txmsg", msg, null);
    }
}
- Transaction message processing
Message structure
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TxMessage implements Serializable {
/** * product id */
private Long productId;
/** * Product purchase quantity */
private Integer payCount;
/** * global transaction number */
private String txNo;
/** * Order number */
private String orderNo;
}
Transaction message listener OrderTxMessageListener
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_order_group")
public class OrderTxMessageListener implements RocketMQLocalTransactionListener {
    private final OrderService orderService;
    private final TxLogMapper txLogMapper;

    @Autowired
    public OrderTxMessageListener(OrderService orderService, TxLogMapper txLogMapper) {
        this.orderService = orderService;
        this.txLogMapper = txLogMapper;
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            // Execute the local transaction: store the order and the transaction log
            log.info("Start performing local transactions");
            log.info("Submit order save transaction log");
            TxMessage txMessage = getTxMessage(message);
            orderService.submitOrderAndSaveTxNo(txMessage);
            // Local transaction succeeded: let RocketMQ deliver the message
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("Local transaction failed, performing rollback", e);
            // Local transaction failed: RocketMQ discards the Half message
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        // Check whether the local transaction was committed, based on the transaction number
        TxMessage txMessage = getTxMessage(message);
        int count = txLogMapper.getCount(txMessage.getTxNo());
        if (0 != count) {
            log.info("Transaction exists, transaction message can be committed: [{}]", txMessage.getTxNo());
            return RocketMQLocalTransactionState.COMMIT;
        }
        // No transaction log yet: answer UNKNOWN so that RocketMQ checks again later
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    private TxMessage getTxMessage(Message message) {
        String messageString = new String((byte[]) message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        return JSONObject.parseObject(messageString, TxMessage.class);
    }
}
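The check-back decision in the listener above can be isolated as a small pure function. The sketch below is an assumption-laden variant rather than the demo's behaviour: in addition to `COMMIT` and `UNKNOWN`, it answers `ROLLBACK` once a half message has been pending longer than a hypothetical `maxPendingMillis` limit, so that a message whose local transaction never completed does not stay unresolved. The `decide` method and its parameters are illustrative names.

```java
/** Pure decision function for the transaction check-back (illustrative variant). */
final class CheckBackSketch {

    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    /**
     * @param txLogExists           whether the transaction number was found in tx_log
     * @param halfMessageAgeMillis  how long the half message has been pending
     * @param maxPendingMillis      assumed upper bound for a local transaction to finish
     */
    static TxState decide(boolean txLogExists, long halfMessageAgeMillis, long maxPendingMillis) {
        if (txLogExists) {
            return TxState.COMMIT;   // local transaction committed
        }
        if (halfMessageAgeMillis > maxPendingMillis) {
            return TxState.ROLLBACK; // assumed: it will never commit
        }
        return TxState.UNKNOWN;      // may still be running, ask again later
    }

    public static void main(String[] args) {
        System.out.println(decide(true, 10, 60_000));      // prints COMMIT
        System.out.println(decide(false, 10, 60_000));     // prints UNKNOWN
        System.out.println(decide(false, 90_000, 60_000)); // prints ROLLBACK
    }
}
```

Whether a timeout-based `ROLLBACK` is safe depends on how long the local transaction can take; the demo itself always answers `UNKNOWN` when no log is found.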
- Persistence layer implementation
OrderMapper
@Repository
public interface OrderMapper {
Order selectByOrderNo(@Param("orderNo") String orderNo);
int saveOrder(@Param("order") Order order);
int deleteByOrderNo(@Param("orderNo") String orderNo);
}
OrderMapper.xml
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.transaction.order.mapper.OrderMapper" >
<select id="selectByOrderNo" resultType="org.transaction.order.entity.Order" >
select
id as id, order_no as orderNo, pay_count as payCount, create_time as createTime
from `order`
where order_no = #{orderNo};
</select>
<insert id="saveOrder" parameterType="org.transaction.order.entity.Order">
insert into `order` (order_no, product_id, pay_count, create_time) values (#{order.orderNo}, #{order.productId}, #{order.payCount}, #{order.createTime});
</insert>
<delete id="deleteByOrderNo">
delete from `order` where order_no = #{orderNo};
</delete>
</mapper>
TxLogMapper
@Repository
public interface TxLogMapper {
int getCount(@Param("txNo") String txNo);
int saveTxLog(@Param("txLog")TxLog txLog);
}
TxLogMapper.xml
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.transaction.order.mapper.TxLogMapper" >
<select id="getCount" resultType="java.lang.Integer">
select count(*) from tx_log where tx_no = #{txNo}
</select>
<insert id="saveTxLog" parameterType="org.transaction.order.entity.TxLog">
insert into `tx_log` (tx_no, create_time) values (#{txLog.txNo}, #{txLog.createTime});
</insert>
</mapper>
The inventory service
- application.yml
server:
  port: 8082
  servlet:
    context-path: /stock
  tomcat:
    uri-encoding: UTF-8
# Set encoding to UTF-8
mybatis-plus:
  mapper-locations: classpath*:mapper/*.xml # must match the path of the mapper XML files
  type-aliases-package: io.transaction.stock.entity.* # package of the entity classes
spring:
  main:
    allow-bean-definition-overriding: true
  profiles:
    include: db
    active: db
  output:
    ansi:
      enabled: detect
  application:
    name: tx-msg-stock
  http:
    encoding:
      charset: UTF-8
      enabled: true
      force: true
rocketmq:
  name-server: 192.168.58.45:9876 # replace with your own NameServer address
  producer:
    group: stock-group
The basic implementation of the inventory service is similar to, or the same as, that of the order service, so some code is omitted here, for example the transaction message structure, the MyBatis configuration, and the persistence layer.
- application-db.yml
spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3316/tx-stock?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&autoReconnect=true&failOverReadOnly=false&useSSL=false
    username: root
    password: admin123
    driver-class-name: com.mysql.jdbc.Driver
    platform: mysql
    type: com.alibaba.druid.pool.DruidDataSource
    # The following connection-pool settings apply to all of the data sources above
    # Initial size, minimum idle, maximum active
    initialSize: 10
    minIdle: 5
    maxActive: 20
    # Maximum time to wait for a connection, in milliseconds
    maxWait: 60000
    # Interval between checks for idle connections that should be closed, in milliseconds
    timeBetweenEvictionRunsMillis: 3600000
    # Minimum time a connection stays in the pool, in milliseconds
    minEvictableIdleTimeMillis: 3600000
    validationQuery: select 1 from dual
    testWhileIdle: true
    testOnBorrow: false
    testOnReturn: false
    # Enable PSCache and set its size per connection
    poolPreparedStatements: true
    maxPoolPreparedStatementPerConnectionSize: 20
    maxOpenPreparedStatements: 20
    # Filters for monitoring statistics; without them the monitoring page cannot count SQL. 'wall' is the SQL firewall
    filters: stat
    # Enable mergeSql and slow-SQL recording through connectionProperties
    # connectionProperties: druid.stat.mergeSql=true; druid.stat.slowSqlMillis=5000
mybatis-plus:
  global-config:
    db-config:
      id-type: auto
      field-strategy: not-empty
      table-underline: true
      db-type: mysql
      logic-delete-value: 1 # Logical deleted value (default: 1)
      logic-not-delete-value: 0 # Logical undeleted value (default: 0)
  configuration:
    jdbc-type-for-null: 'null'
  mapper-locations: classpath*:mapper/*.xml # must match the path of the mapper XML files
  type-aliases-package: io.transaction.stock.entity.* # package of the entity classes
- Inventory deduction interface and implementation
public interface StockService {
    /**
     * Deduct inventory.
     * @param txMessage transaction message
     * @throws Exception if the inventory cannot be deducted
     */
    void deleteStock(TxMessage txMessage) throws Exception;
}
Implementation
@Service
@Slf4j
public class StockServiceImpl implements StockService {
    private final TxLogMapper txLogMapper;
    private final StockMapper stockMapper;

    @Autowired
    public StockServiceImpl(TxLogMapper txLogMapper, StockMapper stockMapper) {
        this.txLogMapper = txLogMapper;
        this.stockMapper = stockMapper;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void deleteStock(TxMessage txMessage) throws Exception {
        // Idempotency check: skip if this transaction number was already processed
        int count = txLogMapper.getCount(txMessage.getTxNo());
        if (count != 0) {
            log.info("Transaction exists: [{}]", txMessage.getTxNo());
            return;
        }
        // Get inventory based on product ID
        Stock stock = stockMapper.getStockByProductId(txMessage.getProductId());
        // Check whether the stock is sufficient for the order
        if (txMessage.getPayCount() > stock.getTotalCount()) {
            log.info("Out of stock");
            // The caller catches this exception and tells order-service to roll back
            throw new Exception("Out of stock");
        }
        Date date = new Date();
        // Deduct inventory
        stockMapper.updateStock(txMessage.getProductId(), txMessage.getPayCount());
        log.info("Inventory reduction succeeded");
        // Record the transaction log in the same local transaction as the deduction
        TxLog txLog = new TxLog();
        txLog.setTxNo(txMessage.getTxNo());
        txLog.setCreateTime(date);
        txLogMapper.saveTxLog(txLog);
        log.info("Transaction logging succeeded");
    }
}
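The implementation above reads the stock, compares it with the requested quantity, and then updates it. With concurrent consumers, two orders could both pass the check before either update runs. A common remedy is to fold the check into the update itself, in the spirit of a statement such as `UPDATE stock SET total_count = total_count - ? WHERE product_id = ? AND total_count >= ?`. Since the demo's `StockMapper` is not shown, that SQL is an assumption. The sketch below models the same check-and-deduct step in memory with a compare-and-set loop; the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Check-and-deduct as one atomic step (in-memory model of a conditional UPDATE). */
final class AtomicDeductSketch {
    private final AtomicInteger totalCount;

    AtomicDeductSketch(int initialCount) {
        this.totalCount = new AtomicInteger(initialCount);
    }

    /** Deducts payCount only if enough stock remains; returns whether it succeeded. */
    boolean tryDeduct(int payCount) {
        while (true) {
            int current = totalCount.get();
            if (payCount > current) {
                return false; // out of stock: nothing is changed
            }
            // Succeeds only if no other thread changed the value in between
            if (totalCount.compareAndSet(current, current - payCount)) {
                return true;
            }
        }
    }

    int remaining() {
        return totalCount.get();
    }

    public static void main(String[] args) {
        AtomicDeductSketch stock = new AtomicDeductSketch(400);
        System.out.println(stock.tryDeduct(100)); // prints true
        System.out.println(stock.tryDeduct(400)); // prints false
        System.out.println(stock.remaining());    // prints 300
    }
}
```

With the conditional SQL form, the service would treat an affected-row count of 0 as "out of stock" and throw, which keeps the rest of the flow unchanged.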
- Order transaction message listener
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "tx_stock_group", topic = "topic_txmsg")
public class StockTxMessageConsumer implements RocketMQListener<String> {
    private final StockService stockService;
    private final RocketMQTemplate rocketMQTemplate;

    @Autowired
    public StockTxMessageConsumer(StockService stockService, RocketMQTemplate rocketMQTemplate) {
        this.stockService = stockService;
        this.rocketMQTemplate = rocketMQTemplate;
    }

    @SneakyThrows
    @Override
    public void onMessage(String s) {
        // Deduct inventory based on the message
        log.info("Start deducting inventory.");
        TxMessage txMessage = deserialization(s);
        try {
            stockService.deleteStock(txMessage);
        } catch (Exception e) {
            log.error("An exception occurred. Notify upstream rollback.", e);
            rollback(txMessage);
        }
    }

    /** Sends a rollback message so that the order service can undo the order. */
    private void rollback(TxMessage txMessage) {
        String jsonString = JSONObject.toJSONString(txMessage);
        Message<String> msg = MessageBuilder.withPayload(jsonString).build();
        rocketMQTemplate.asyncSend("topic_tx_rollback_msg", msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("Rollback message sending result: [{}]", sendResult.getSendStatus());
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("Rollback message sending exception", throwable);
            }
        });
    }

    private TxMessage deserialization(String msg) {
        return JSONObject.parseObject(msg, TxMessage.class);
    }
}
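The consumer above publishes to `topic_tx_rollback_msg`, but the order-side handler for that topic is not listed in this article. As a rough idea of what it has to do, the sketch below deletes the order by order number and tolerates duplicate rollback messages. It is an in-memory illustration: the map stands in for the order table, and `onRollbackMessage` is a hypothetical name, loosely corresponding to calling `deleteByOrderNo` from a listener.

```java
import java.util.HashMap;
import java.util.Map;

/** Order-side rollback handling, modelled in memory (illustrative only). */
final class OrderRollbackSketch {
    // Stands in for the order table: order number -> purchase quantity
    private final Map<String, Integer> ordersByNo = new HashMap<>();

    void saveOrder(String orderNo, int payCount) {
        ordersByNo.put(orderNo, payCount);
    }

    /** Removes the order if present; a repeated rollback message is a no-op. */
    boolean onRollbackMessage(String orderNo) {
        return ordersByNo.remove(orderNo) != null;
    }

    int orderCount() {
        return ordersByNo.size();
    }

    public static void main(String[] args) {
        OrderRollbackSketch orders = new OrderRollbackSketch();
        orders.saveOrder("order-100", 100);
        orders.saveOrder("order-400", 400);
        System.out.println(orders.onRollbackMessage("order-400")); // prints true
        System.out.println(orders.onRollbackMessage("order-400")); // prints false (duplicate)
        System.out.println(orders.orderCount());                   // prints 1
    }
}
```

Deleting by a unique order number is naturally idempotent, which matches the requirement that every step in the flow can be repeated safely.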
Test results
- The tx-order database
In tx-order, the order table and the tx_log table contain no data
mysql> truncate tx_log;
Query OK, 0 rows affected (0.01 sec)

mysql> select * from tx_log;
Empty set (0.00 sec)

mysql> truncate `order`;
Query OK, 0 rows affected (0.01 sec)
- The tx-stock database
In tx-stock, the stock table contains the product with product_id 1 and a total count of 400, and the tx_log table contains no data
mysql> select * from stock;
+----+------------+-------------+---------------------+
| id | product_id | total_count | create_time         |
+----+------------+-------------+---------------------+
|  1 |          1 |         400 | 2021-11-25 18:46:19 |
+----+------------+-------------+---------------------+
1 row in set (0.00 sec)

mysql> truncate tx_log;
Query OK, 0 rows affected (0.01 sec)
- Placing an order
Place an order with a purchase quantity of 100
In tx-order, the order table and the tx_log table now hold the order data and the transaction log
mysql> select * from `order`;
+----+--------------------------------------+------------+-----------+---------------------+
| id | order_no                             | product_id | pay_count | create_time         |
+----+--------------------------------------+------------+-----------+---------------------+
|  1 | 7268e58e-5ee1-4fba-a998-e99d45d5ff47 |          1 |       100 | 2021-11-29 13:26:46 |
+----+--------------------------------------+------------+-----------+---------------------+
1 row in set (0.01 sec)

mysql> select * from tx_log;
+----+--------------------------------------+---------------------+
| id | tx_no                                | create_time         |
+----+--------------------------------------+---------------------+
|  1 | 49afa0ec-09a6-4bb1-8363-3c2d695aa1a5 | 2021-11-29 13:26:46 |
+----+--------------------------------------+---------------------+
1 row in set (0.00 sec)
In tx-stock, the count for product_id 1 in the stock table decreases by 100, and tx_log stores the transaction log
mysql> select * from stock;
+----+------------+-------------+---------------------+
| id | product_id | total_count | create_time         |
+----+------------+-------------+---------------------+
|  1 |          1 |         300 | 2021-11-25 18:46:19 |
+----+------------+-------------+---------------------+
1 row in set (0.00 sec)

mysql> select * from tx_log;
+----+--------------------------------------+---------------------+
| id | tx_no                                | create_time         |
+----+--------------------------------------+---------------------+
|  1 | 49afa0ec-09a6-4bb1-8363-3c2d695aa1a5 | 2021-11-29 13:27:09 |
+----+--------------------------------------+---------------------+
1 row in set (0.00 sec)
RocketMQ transaction messages
- Order quantity exceeds stock, roll back
The inventory is now 300. An order for 400 will fail, and the inventory service will then notify the order service to roll back.
In tx-order, the order table data remains unchanged, while the tx_log table stores the transaction log of the rolled-back order
mysql> select * from `order`;
+----+--------------------------------------+------------+-----------+---------------------+
| id | order_no                             | product_id | pay_count | create_time         |
+----+--------------------------------------+------------+-----------+---------------------+
|  1 | 7268e58e-5ee1-4fba-a998-e99d45d5ff47 |          1 |       100 | 2021-11-29 13:26:46 |
+----+--------------------------------------+------------+-----------+---------------------+
1 row in set (0.01 sec)

mysql> select * from tx_log;
+----+--------------------------------------+---------------------+
| id | tx_no                                | create_time         |
+----+--------------------------------------+---------------------+
|  1 | 49afa0ec-09a6-4bb1-8363-3c2d695aa1a5 | 2021-11-29 13:26:46 |
|  2 | 1f71279d-1efa-4ee0-8416-9f13c661b58a | 2021-11-29 13:39:34 |
+----+--------------------------------------+---------------------+
2 rows in set (0.01 sec)
In tx-stock, the stock table remains unchanged, and tx_log stores no new transaction log
mysql> select * from stock;
+----+------------+-------------+---------------------+
| id | product_id | total_count | create_time         |
+----+------------+-------------+---------------------+
|  1 |          1 |         300 | 2021-11-25 18:46:19 |
+----+------------+-------------+---------------------+
1 row in set (0.00 sec)

mysql> select * from tx_log;
+----+--------------------------------------+---------------------+
| id | tx_no                                | create_time         |
+----+--------------------------------------+---------------------+
|  1 | 49afa0ec-09a6-4bb1-8363-3c2d695aa1a5 | 2021-11-29 13:27:09 |
+----+--------------------------------------+---------------------+
1 row in set (0.00 sec)
RocketMQ now holds the transaction rollback message
Conclusion
This completes the RocketMQ-based demo of a reliable-message eventual-consistency distributed transaction. The program is not fit for production use, either commercially or technically; it only demonstrates the processing flow of such a transaction.
The source code can be found at author github: github.com/Redick01/my…
- Reference: Understanding Distributed Transactions in Depth