The premise
Distributed transactions are a thorny problem in microservice practice. In the microservice architecture the author has implemented, the approach is to compromise on, or avoid, strong consistency. Following the local message table scheme that eBay proposed many years ago, a lightweight wrapper was built on top of RabbitMQ and MySQL (JDBC) to provide a low-intrusion transactional message module. This article analyzes the design and implementation of the whole scheme in detail. The environment dependencies are as follows:
- JDK 1.8+
- spring-boot-starter-web: 2.x.x, spring-boot-starter-jdbc: 2.x.x, spring-boot-starter-amqp: 2.x.x
- HikariCP: 3.x.x (bundled with spring-boot-starter-jdbc), mysql-connector-java: 5.1.48
- Redisson: 3.12.1
Scheme design idea
In principle, transactional messages are only suitable for weak consistency (or “eventual consistency”) scenarios. Common weak consistency scenarios include:
- After the user service completes registration, it pushes a marketing message to the SMS service.
- In a credit system, the order service saves an order and pushes a record of the order awaiting approval to the approval service.
- ...
Transactional messages should generally not be used for strong consistency scenarios.
Strong consistency generally requires strict synchronization: all operations must succeed or fail together, which adds synchronization cost. A well-designed transactional message module that provides compensation, querying, monitoring and similar capabilities lets systems interact asynchronously, so overall throughput is higher than with strict synchronization. In the business systems the author is responsible for, a basic principle was set for using transactional messages: “As long as the message content is correct, consumers are responsible for handling any exceptions on their own side.”
❝
To put it simply: the upstream service ensures that its own business logic is correct, and once it has successfully pushed the correct message to RabbitMQ, its responsibility ends.
❞
To keep the code minimally intrusive, transactional messages need to build on Spring’s “programmatic transactions” or “declarative transactions.” Programmatic transactions typically rely on TransactionTemplate, while declarative transactions rely on the AOP module and the @Transactional annotation.
Next, a custom transactional message module is needed, along with a transactional message record table (effectively the “local message table”) that holds a record of every message to be sent. The main functions of the module are:
- Save message records.
- Push messages to the RabbitMQ server.
- Query message records, perform compensation pushes, and so on.
The logical unit of transaction execution
Within the logical unit of transaction execution, the transactional message record to be pushed must be saved, that is, “the local (business) logic and the saving of the transactional message record are bound to the same transaction.”
Sending the message to the RabbitMQ server must be deferred until after the transaction commits, so that a message is only sent for a transaction that has actually committed. To merge “saving the transactional message” and “sending the message to RabbitMQ” into a single action from the user’s point of view, Spring’s transaction synchronizer, TransactionSynchronization, is used. The following analyzes the main callback methods of the transaction synchronizer, mainly with reference to AbstractPlatformTransactionManager#commit() and AbstractPlatformTransactionManager#processCommit():
The figure above only illustrates the scenario in which the transaction commits normally (without exceptions). It shows that the afterCommit() and afterCompletion(int status) methods of TransactionSynchronization are called back after the real commit point, AbstractPlatformTransactionManager#doCommit(), so either of them can be used to push the message to the RabbitMQ server. The pseudocode is as follows:
@Transactional
public Dto businessMethod() {
    // business transaction code block ...
    // Save the transaction message record
    saveTransactionMessageRecord();
    // Register the transaction synchronizer: push the message to RabbitMQ in afterCommit()
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() { sendMessageToRabbitMQ(); }
    });
    // business transaction code block ... (builds and returns the Dto)
}
In the pseudocode above, the “save transaction message” and “register transaction synchronizer” steps can be placed anywhere in the transactional method; their order of execution does not matter.
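The ordering guarantee can be illustrated without Spring. The following plain-Java sketch is a hypothetical imitation of the contract described above, not Spring’s API: MiniTransaction and Synchronization are invented names. Callbacks registered at any point inside the unit of work are buffered, afterCommit() runs only after the commit point, and the buffered callbacks are discarded on rollback, so no message would be pushed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a TransactionSynchronization-style callback.
interface Synchronization {
    void afterCommit();
}

// Hypothetical, single-threaded imitation of a transaction manager's commit flow.
final class MiniTransaction {
    private final List<Synchronization> synchronizations = new ArrayList<>();
    private final List<String> log = new ArrayList<>();

    void register(Synchronization s) {
        synchronizations.add(s);
    }

    List<String> log() {
        return log;
    }

    // Runs the business work; commits if it returns normally, rolls back if it throws.
    void execute(Consumer<MiniTransaction> work) {
        try {
            work.accept(this);
        } catch (RuntimeException e) {
            log.add("rollback");
            synchronizations.clear(); // callbacks are discarded: nothing is pushed
            return;
        }
        log.add("commit"); // the real commit point (doCommit() in Spring's terms)
        for (Synchronization s : synchronizations) {
            s.afterCommit(); // only now is it safe to push to the broker
        }
        synchronizations.clear();
    }

    public static void main(String[] args) {
        MiniTransaction ok = new MiniTransaction();
        ok.execute(tx -> {
            tx.register(() -> tx.log().add("send message")); // registered first
            tx.log().add("save order");
            tx.log().add("save message record");
        });
        System.out.println(ok.log()); // [save order, save message record, commit, send message]

        MiniTransaction failed = new MiniTransaction();
        failed.execute(tx -> {
            tx.log().add("save order");
            tx.register(() -> tx.log().add("send message"));
            throw new IllegalStateException("business error");
        });
        System.out.println(failed.log()); // [save order, rollback]
    }
}
```

Although the callback in the first run is registered before any business write, “send message” still appears only after “commit”, which is why the position of the two steps in the pseudocode does not matter.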
Compensation for transaction messages
Although the author recommends that downstream services handle exceptions in their own consumption, sometimes the upstream service is forced to push the corresponding message again; this is a special scenario. Another scenario to consider is that the transaction commits but the TransactionSynchronization#afterCommit() callback fails to push the message. This is low-probability, but it does happen in production. A typical cause is that “the service instance is restarted after the transaction has committed but before TransactionSynchronization#afterCommit() has had a chance to push the message”. As shown below:
To handle compensation pushes uniformly, a finite set of states is used to determine whether a message has been pushed successfully:
- In the transactional method, when the transaction message record is saved, its push status is marked as “pending”.
- In the afterCommit() method of the transaction synchronizer interface TransactionSynchronization, the corresponding message is pushed to RabbitMQ, and the status of the transaction message record is then changed to “push succeeded” (or to “push failed” if the push throws an exception, leaving the record for compensation).
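The two status rules, together with the retry-counter capping that markSuccess() and markFail() apply later in this article, amount to a small state machine. The sketch below is a hypothetical in-memory model (MessageState is not part of the module). The status codes 0, 1 and -1 mirror TxMessageStatus. The needsCompensation() rule (not successful and retries left) is an assumption, since the article does not show the compensation query.

```java
// Hypothetical, in-memory model of one transaction message record's lifecycle.
final class MessageState {
    static final int PENDING = 0; // saved inside the business transaction
    static final int SUCCESS = 1; // pushed to RabbitMQ
    static final int FAIL = -1;   // push attempted and failed

    private int status = PENDING;
    private int currentRetryTimes = 0;
    private final int maxRetryTimes;

    MessageState(int maxRetryTimes) {
        this.maxRetryTimes = maxRetryTimes;
    }

    // Same capping rule as the article: increment, but never beyond maxRetryTimes.
    private void bumpRetryCounter() {
        currentRetryTimes = currentRetryTimes >= maxRetryTimes ? maxRetryTimes : currentRetryTimes + 1;
    }

    void markSuccess() {
        bumpRetryCounter();
        status = SUCCESS;
    }

    void markFail() {
        bumpRetryCounter();
        status = FAIL;
    }

    // Assumed rule: a record still needs a (re)push while not SUCCESS and retries remain.
    boolean needsCompensation() {
        return status != SUCCESS && currentRetryTimes < maxRetryTimes;
    }

    int status() { return status; }

    int currentRetryTimes() { return currentRetryTimes; }

    public static void main(String[] args) {
        // Instance restarted before afterCommit() ran: the record stays PENDING.
        MessageState crashed = new MessageState(5);
        System.out.println(crashed.needsCompensation()); // true

        // Broker down: every push fails until the retry budget is used up.
        MessageState brokerDown = new MessageState(2);
        brokerDown.markFail();
        brokerDown.markFail();
        System.out.println(brokerDown.currentRetryTimes() + " " + brokerDown.needsCompensation()); // 2 false
    }
}
```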
A special case is when the RabbitMQ server itself fails and message pushes fail. Such pushes need to be retried (compensated). “Experience shows that retrying repeatedly within a short period is pointless”, because a failed service generally does not recover instantly, so an “exponential backoff algorithm” is used for retries, and the maximum number of retries must also be limited.
The initial backoff value, the backoff factor and the maximum number of retries must be set according to the actual situation; otherwise messages may be delayed too long or retried too frequently.
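As a worked example of the formula used later in calculateNextScheduleTime() (delay = initBackoff × backoffFactor^round seconds), the sketch below prints the schedule produced with the article’s defaults: 10 seconds, factor 2 and at most 5 retries. BackoffSchedule is a hypothetical helper. Like markFail(), it adds each round’s delay to the previous schedule time.

```java
import java.time.LocalDateTime;

// Hypothetical helper reproducing the article's exponential backoff arithmetic.
final class BackoffSchedule {
    // delay = initBackoff * backoffFactor^round, truncated to whole seconds
    static long delaySeconds(long initBackoff, long backoffFactor, long round) {
        return (long) (initBackoff * Math.pow(backoffFactor, round));
    }

    // Same shape as calculateNextScheduleTime(base, initBackoff, backoffFactor, round)
    static LocalDateTime next(LocalDateTime base, long initBackoff, long backoffFactor, long round) {
        return base.plusSeconds(delaySeconds(initBackoff, backoffFactor, round));
    }

    public static void main(String[] args) {
        long initBackoff = 10;  // DEFAULT_INIT_BACKOFF
        long factor = 2;        // DEFAULT_BACKOFF_FACTOR
        int maxRetryTimes = 5;  // DEFAULT_MAX_RETRY_TIMES
        LocalDateTime saved = LocalDateTime.of(2020, 2, 5, 21, 10, 0);
        // Round 0 is applied when the record is saved; rounds 1..max follow each failure,
        // each one added to the previous schedule time (as markFail() does).
        LocalDateTime schedule = next(saved, initBackoff, factor, 0);
        System.out.println("round 0 -> " + schedule);
        for (int round = 1; round <= maxRetryTimes; round++) {
            schedule = next(schedule, initBackoff, factor, round);
            System.out.println("round " + round + " -> " + schedule);
        }
    }
}
```

With these defaults the delays are 10, 20, 40, 80, 160 and 320 seconds, so in this sketch the last retry is scheduled 630 seconds (10.5 minutes) after the record is saved. Raising the factor or the retry limit stretches this tail quickly, which is the trade-off described above.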
Scheme implementation
Introduce the core dependencies:
<properties>
    <spring.boot.version>2.2.4.RELEASE</spring.boot.version>
    <redisson.version>3.12.1</redisson.version>
    <mysql.connector.version>5.1.48</mysql.connector.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.connector.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>${redisson.version}</version>
    </dependency>
</dependencies>
spring-boot-starter-jdbc, mysql-connector-java and spring-boot-starter-aop are the dependencies for MySQL transactions; spring-boot-starter-amqp wraps the RabbitMQ client. Redisson is used mainly for its distributed lock, which guards the scheduled compensation task (preventing multiple service nodes from running the compensation push concurrently).
Table design
The transactional message module mainly involves two tables. Taking MySQL as an example, the DDL of the tables is as follows:
CREATE TABLE `t_transactional_message`
(
    id                  BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    create_time         DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP,
    edit_time           DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    creator             VARCHAR(20)     NOT NULL DEFAULT 'admin',
    editor              VARCHAR(20)     NOT NULL DEFAULT 'admin',
    deleted             TINYINT         NOT NULL DEFAULT 0,
    current_retry_times TINYINT         NOT NULL DEFAULT 0 COMMENT 'Current retry times',
    max_retry_times     TINYINT         NOT NULL DEFAULT 5 COMMENT 'Maximum retry times',
    queue_name          VARCHAR(255)    NOT NULL COMMENT 'Queue name',
    exchange_name       VARCHAR(255)    NOT NULL COMMENT 'Exchange name',
    exchange_type       VARCHAR(8)      NOT NULL COMMENT 'Exchange type',
    routing_key         VARCHAR(255) COMMENT 'Routing key',
    business_module     VARCHAR(32)     NOT NULL COMMENT 'Business module',
    business_key        VARCHAR(255)    NOT NULL COMMENT 'Business key',
    next_schedule_time  DATETIME        NOT NULL COMMENT 'Next schedule time',
    message_status      TINYINT         NOT NULL DEFAULT 0 COMMENT 'Message status',
    init_backoff        BIGINT UNSIGNED NOT NULL DEFAULT 10 COMMENT 'Initial backoff value, in seconds',
    backoff_factor      TINYINT         NOT NULL DEFAULT 2 COMMENT 'Backoff factor',
    INDEX idx_queue_name (queue_name),
    INDEX idx_create_time (create_time),
    INDEX idx_next_schedule_time (next_schedule_time),
    INDEX idx_business_key (business_key)
) COMMENT 'Transaction Message Table';

CREATE TABLE `t_transactional_message_content`
(
    id         BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    message_id BIGINT UNSIGNED NOT NULL COMMENT 'Transaction message record ID',
    content    TEXT COMMENT 'Message content'
) COMMENT 'Transaction Message Content Table';
Because this module may later be extended with a back-office management module, the message status and management fields and the potentially large message content are stored in two separate tables. This prevents bulk queries of message records from driving up MySQL I/O utilization (a reasonable compromise reached after discussion with the company’s DBA team). Two business fields, business_module and business_key, are reserved to identify the business module and the business key (usually a unique identifier, such as an order number).
In general, if a service declares its queues and exchange bindings in advance, sending a RabbitMQ message needs only exchangeName and routingKey (headers exchanges are special and rarely used, so they are not considered here). To let services skip the declaration step, the module declares the queue, the exchange and their binding on first use and caches the result per queue (RabbitMQ does not throw an exception when a queue-exchange binding is declared again, as long as the binding parameters are identical each time).
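The declare-once-and-cache idea relies on ConcurrentHashMap.computeIfAbsent, which applies its mapping function atomically and at most once per key. The JDK-only sketch below counts how often a hypothetical declare() method (standing in for the three AmqpAdmin calls) runs when 1,000 tasks send to the same destination. As a design choice of this sketch, the cache key is the whole queue/exchange/routing-key triple rather than the queue name alone, so a second binding to an already-declared queue would still be declared.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a "declare once per binding" cache.
final class DeclarationCache {
    private final ConcurrentMap<String, Boolean> declared = new ConcurrentHashMap<>();
    final AtomicInteger declareCalls = new AtomicInteger();

    // Stand-in for amqpAdmin.declareQueue/declareExchange/declareBinding.
    private boolean declare(String queue, String exchange, String routingKey) {
        declareCalls.incrementAndGet();
        return true;
    }

    void ensureDeclared(String queue, String exchange, String routingKey) {
        String key = queue + "|" + exchange + "|" + routingKey;
        // Applied atomically, at most once per key; if declare() throws,
        // nothing is cached and the next send tries again.
        declared.computeIfAbsent(key, k -> declare(queue, exchange, routingKey));
    }

    public static void main(String[] args) throws InterruptedException {
        DeclarationCache cache = new DeclarationCache();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < 1000; i++) {
            pool.submit(() -> cache.ensureDeclared("tm.test.queue", "tm.test.exchange", "tm.test.key"));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("declare calls: " + cache.declareCalls.get()); // declare calls: 1
    }
}
```

If declare() throws, computeIfAbsent records no mapping, so a broker that was briefly unreachable simply gets another declaration attempt on the next send.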
Scheme code design
The API design of the transactional message management back office is omitted from the following design description; it can be added later.
Define the anemic-model entity classes TransactionalMessage and TransactionalMessageContent:
import lombok.Data;

import java.time.LocalDateTime;

@Data
public class TransactionalMessage {
    private Long id;
    private LocalDateTime createTime;
    private LocalDateTime editTime;
    private String creator;
    private String editor;
    private Integer deleted;
    private Integer currentRetryTimes;
    private Integer maxRetryTimes;
    private String queueName;
    private String exchangeName;
    private String exchangeType;
    private String routingKey;
    private String businessModule;
    private String businessKey;
    private LocalDateTime nextScheduleTime;
    private Integer messageStatus;
    private Long initBackoff;
    private Integer backoffFactor;
}

@Data
public class TransactionalMessageContent {
    private Long id;
    private Long messageId;
    private String content;
}
Then define the DAO interfaces (implementation details are not covered here; the storage is MySQL, and switching to another type of database only requires a different implementation):
import java.time.LocalDateTime;
import java.util.List;

public interface TransactionalMessageDao {

    void insertSelective(TransactionalMessage record);

    void updateStatusSelective(TransactionalMessage record);

    List<TransactionalMessage> queryPendingCompensationRecords(LocalDateTime minScheduleTime,
                                                               LocalDateTime maxScheduleTime,
                                                               int limit);
}

public interface TransactionalMessageContentDao {

    void insert(TransactionalMessageContent record);

    List<TransactionalMessageContent> queryByMessageIds(String messageIds);
}
Then define the transactional message service interface TransactionalMessageService:
import lombok.Builder;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

// The external service interface
public interface TransactionalMessageService {

    void sendTransactionalMessage(Destination destination, TxMessage message);
}

@Getter
@RequiredArgsConstructor
public enum ExchangeType {

    FANOUT("fanout"),
    DIRECT("direct"),
    TOPIC("topic"),
    DEFAULT(""),
    ;

    private final String type;
}

// The destination to which the message is sent
public interface Destination {

    ExchangeType exchangeType();

    String queueName();

    String exchangeName();

    String routingKey();
}

@Builder
public class DefaultDestination implements Destination {

    private ExchangeType exchangeType;
    private String queueName;
    private String exchangeName;
    private String routingKey;

    @Override
    public ExchangeType exchangeType() {
        return exchangeType;
    }

    @Override
    public String queueName() {
        return queueName;
    }

    @Override
    public String exchangeName() {
        return exchangeName;
    }

    @Override
    public String routingKey() {
        return routingKey;
    }
}

// Transaction message
public interface TxMessage {

    String businessModule();

    String businessKey();

    String content();
}

@Builder
public class DefaultTxMessage implements TxMessage {

    private String businessModule;
    private String businessKey;
    private String content;

    @Override
    public String businessModule() {
        return businessModule;
    }

    @Override
    public String businessKey() {
        return businessKey;
    }

    @Override
    public String content() {
        return content;
    }
}

// Message status (@Getter is required because getStatus() is called by the management service)
@Getter
@RequiredArgsConstructor
public enum TxMessageStatus {

    /** Successful */
    SUCCESS(1),

    /** Pending (to be processed) */
    PENDING(0),

    /** Processing failed */
    FAIL(-1),
    ;

    private final Integer status;
}
The TransactionalMessageService implementation class is the core of the transactional message feature. The code is as follows:
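import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Slf4j
@Service
@RequiredArgsConstructor
public class RabbitTransactionalMessageService implements TransactionalMessageService {

    private final AmqpAdmin amqpAdmin;
    private final TransactionalMessageManagementService managementService;

    private static final ConcurrentMap<String, Boolean> QUEUE_ALREADY_DECLARE = new ConcurrentHashMap<>();

    @Override
    public void sendTransactionalMessage(Destination destination, TxMessage message) {
        String queueName = destination.queueName();
        String exchangeName = destination.exchangeName();
        String routingKey = destination.routingKey();
        ExchangeType exchangeType = destination.exchangeType();
        // Atomic pre-declaration: runs at most once per queue name
        QUEUE_ALREADY_DECLARE.computeIfAbsent(queueName, k -> {
            Queue queue = new Queue(queueName);
            amqpAdmin.declareQueue(queue);
            Exchange exchange = new CustomExchange(exchangeName, exchangeType.getType());
            amqpAdmin.declareExchange(exchange);
            Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
            amqpAdmin.declareBinding(binding);
            return true;
        });
        TransactionalMessage record = new TransactionalMessage();
        record.setQueueName(queueName);
        record.setExchangeName(exchangeName);
        record.setExchangeType(exchangeType.getType());
        record.setRoutingKey(routingKey);
        record.setBusinessModule(message.businessModule());
        record.setBusinessKey(message.businessKey());
        String content = message.content();
        // Save the transaction message record (inside the caller's transaction)
        managementService.saveTransactionalMessageRecord(record, content);
        // Register the transaction synchronizer; this must run inside an active transaction,
        // otherwise registerSynchronization() throws IllegalStateException
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                // Spring's afterCommit() Javadoc recommends PROPAGATION_REQUIRES_NEW
                // for transactional work triggered from here
                managementService.sendMessageSync(record, content);
            }
        });
    }
}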
The persistence of message record status and content is managed centrally in TransactionalMessageManagementService:
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.Map;
import java.util.StringJoiner;
import java.util.stream.Collectors;

@Slf4j
@RequiredArgsConstructor
@Service
public class TransactionalMessageManagementService {

    private final TransactionalMessageDao messageDao;
    private final TransactionalMessageContentDao contentDao;
    private final RabbitTemplate rabbitTemplate;

    private static final LocalDateTime END = LocalDateTime.of(2999, 1, 1, 0, 0, 0);
    private static final long DEFAULT_INIT_BACKOFF = 10L;
    private static final int DEFAULT_BACKOFF_FACTOR = 2;
    private static final int DEFAULT_MAX_RETRY_TIMES = 5;
    private static final int LIMIT = 100;

    public void saveTransactionalMessageRecord(TransactionalMessage record, String content) {
        record.setMessageStatus(TxMessageStatus.PENDING.getStatus());
        record.setNextScheduleTime(calculateNextScheduleTime(LocalDateTime.now(), DEFAULT_INIT_BACKOFF,
                DEFAULT_BACKOFF_FACTOR, 0));
        record.setCurrentRetryTimes(0);
        record.setInitBackoff(DEFAULT_INIT_BACKOFF);
        record.setBackoffFactor(DEFAULT_BACKOFF_FACTOR);
        record.setMaxRetryTimes(DEFAULT_MAX_RETRY_TIMES);
        messageDao.insertSelective(record);
        TransactionalMessageContent messageContent = new TransactionalMessageContent();
        messageContent.setContent(content);
        messageContent.setMessageId(record.getId());
        contentDao.insert(messageContent);
    }

    public void sendMessageSync(TransactionalMessage record, String content) {
        try {
            rabbitTemplate.convertAndSend(record.getExchangeName(), record.getRoutingKey(), content);
            if (log.isDebugEnabled()) {
                log.debug("Message sent successfully, destination queue :{}, message content :{}", record.getQueueName(), content);
            }
            // Mark success
            markSuccess(record);
        } catch (Exception e) {
            // Mark failure
            markFail(record, e);
        }
    }

    private void markSuccess(TransactionalMessage record) {
        // Mark the next execution time as the maximum value
        record.setNextScheduleTime(END);
        record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?
                record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);
        record.setMessageStatus(TxMessageStatus.SUCCESS.getStatus());
        record.setEditTime(LocalDateTime.now());
        messageDao.updateStatusSelective(record);
    }

    private void markFail(TransactionalMessage record, Exception e) {
        log.error("Failed to send message, destination queue :{}", record.getQueueName(), e);
        record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?
                record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);
        // Calculate the next execution time
        LocalDateTime nextScheduleTime = calculateNextScheduleTime(
                record.getNextScheduleTime(),
                record.getInitBackoff(),
                record.getBackoffFactor(),
                record.getCurrentRetryTimes()
        );
        record.setNextScheduleTime(nextScheduleTime);
        record.setMessageStatus(TxMessageStatus.FAIL.getStatus());
        record.setEditTime(LocalDateTime.now());
        messageDao.updateStatusSelective(record);
    }

    /**
     * Calculate the next execution time
     *
     * @param base          base time
     * @param initBackoff   initial backoff value, in seconds
     * @param backoffFactor backoff factor
     * @param round         retry round
     * @return LocalDateTime
     */
    private LocalDateTime calculateNextScheduleTime(LocalDateTime base,
                                                    long initBackoff,
                                                    long backoffFactor,
                                                    long round) {
        double delta = initBackoff * Math.pow(backoffFactor, round);
        return base.plusSeconds((long) delta);
    }

    /**
     * Push compensation - the parameters should be customized according to the actual scenario
     */
    public void processPendingCompensationRecords() {
        // The right bound is the current time minus the initial backoff value, so that a message
        // that was just saved (and will be pushed by afterCommit()) is not pushed here as well
        LocalDateTime max = LocalDateTime.now().plusSeconds(-DEFAULT_INIT_BACKOFF);
        // The left bound is the right bound minus 1 hour
        LocalDateTime min = max.plusHours(-1);
        Map<Long, TransactionalMessage> collect = messageDao.queryPendingCompensationRecords(min, max, LIMIT)
                .stream()
                .collect(Collectors.toMap(TransactionalMessage::getId, x -> x));
        if (!collect.isEmpty()) {
            StringJoiner joiner = new StringJoiner(",", "(", ")");
            collect.keySet().forEach(x -> joiner.add(x.toString()));
            contentDao.queryByMessageIds(joiner.toString())
                    .forEach(item -> {
                        TransactionalMessage message = collect.get(item.getMessageId());
                        sendMessageSync(message, item.getContent());
                    });
        }
    }
}
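The DAO query used above is not shown in the article. The following is a hypothetical in-memory stand-in for queryPendingCompensationRecords(min, max, limit). It assumes the query selects records that are not yet successful, still have retries left, and whose next_schedule_time lies within [min, max]. main() derives the window the same way processPendingCompensationRecords() does.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical in-memory stand-in for TransactionalMessageDao#queryPendingCompensationRecords.
final class InMemoryCompensationQuery {
    static final int SUCCESS = 1;

    static final class Row {
        final long id;
        final int status;
        final int currentRetryTimes;
        final int maxRetryTimes;
        final LocalDateTime nextScheduleTime;

        Row(long id, int status, int currentRetryTimes, int maxRetryTimes, LocalDateTime nextScheduleTime) {
            this.id = id;
            this.status = status;
            this.currentRetryTimes = currentRetryTimes;
            this.maxRetryTimes = maxRetryTimes;
            this.nextScheduleTime = nextScheduleTime;
        }
    }

    private final List<Row> rows = new ArrayList<>();

    void add(Row row) {
        rows.add(row);
    }

    // Assumed filter: not yet successful, retries left, scheduled inside [min, max].
    List<Long> queryPendingIds(LocalDateTime min, LocalDateTime max, int limit) {
        return rows.stream()
                .filter(r -> r.status != SUCCESS)
                .filter(r -> r.currentRetryTimes < r.maxRetryTimes)
                .filter(r -> !r.nextScheduleTime.isBefore(min) && !r.nextScheduleTime.isAfter(max))
                .limit(limit)
                .map(r -> r.id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2020, 2, 5, 22, 0, 0);
        // Same window as processPendingCompensationRecords(): (now - 10s - 1h) .. (now - 10s)
        LocalDateTime max = now.minusSeconds(10);
        LocalDateTime min = max.minusHours(1);

        InMemoryCompensationQuery dao = new InMemoryCompensationQuery();
        dao.add(new Row(1, 0, 0, 5, now.minusMinutes(5)));  // PENDING: instance died before afterCommit()
        dao.add(new Row(2, -1, 2, 5, now.minusMinutes(1))); // FAIL with retries left
        dao.add(new Row(3, 1, 1, 5, now.minusMinutes(1)));  // SUCCESS: ignored
        dao.add(new Row(4, -1, 5, 5, now.minusMinutes(1))); // retries exhausted: ignored
        dao.add(new Row(5, 0, 0, 5, now.plusSeconds(5)));   // just saved: not due yet
        System.out.println(dao.queryPendingIds(min, max, 100)); // [1, 2]
    }
}
```

Record 5 shows why the right bound lags the current time: a record saved moments ago is still waiting for its own afterCommit() push and should not be picked up by compensation.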
One point that could be optimized here: the status updates of transactional message records could be turned into batch updates, which are more efficient when LIMIT is large.
Finally, the configuration class for the scheduled task:
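One possible shape for such a batch update, sketched under assumptions: successful records can share a single set-based UPDATE per chunk of ids, with the retry counter capped in SQL via MySQL’s LEAST(). The helper below only builds the statement text with ? placeholders, which also avoids splicing ids into the SQL the way queryByMessageIds(String) does. BatchStatusUpdate and the chunk size are hypothetical; the table and column names come from the DDL above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper that prepares chunked "mark success" updates.
final class BatchStatusUpdate {
    // Builds "?, ?, ?" with n placeholders.
    static String placeholders(int n) {
        return String.join(", ", Collections.nCopies(n, "?"));
    }

    // One UPDATE per chunk; edit_time is refreshed by the column's ON UPDATE CURRENT_TIMESTAMP.
    static String markSuccessSql(int idCount) {
        return "UPDATE t_transactional_message"
                + " SET message_status = 1,"
                + " next_schedule_time = '2999-01-01 00:00:00',"
                + " current_retry_times = LEAST(current_retry_times + 1, max_retry_times)"
                + " WHERE id IN (" + placeholders(idCount) + ")";
    }

    // Splits ids into consecutive chunks of at most chunkSize elements.
    static List<List<Long>> chunk(List<Long> ids, int chunkSize) {
        List<List<Long>> chunks = new ArrayList<>();
        for (int from = 0; from < ids.size(); from += chunkSize) {
            chunks.add(ids.subList(from, Math.min(from + chunkSize, ids.size())));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Long> succeeded = Arrays.asList(11L, 12L, 13L, 14L, 15L);
        for (List<Long> part : chunk(succeeded, 2)) {
            // With JDBC, bind each id of `part` to the placeholders in order.
            System.out.println(markSuccessSql(part.size()) + " -- " + part);
        }
    }
}
```

Failed records each get their own next_schedule_time, so they do not fit a single set-based statement; a parameterized statement executed through JDBC batching (PreparedStatement.addBatch()/executeBatch(), or JdbcTemplate.batchUpdate()) is the more natural fit there.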
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

import java.util.concurrent.TimeUnit;

@Slf4j
@RequiredArgsConstructor
@Configuration
@EnableScheduling
public class ScheduleJobAutoConfiguration {

    private final TransactionalMessageManagementService managementService;

    /**
     * Redisson.create() connects to a local Redis with the default configuration;
     * in a real deployment the client should be made configurable
     */
    private final RedissonClient redisson = Redisson.create();

    @Scheduled(fixedDelay = 10000)
    public void transactionalMessageCompensationTask() throws Exception {
        RLock lock = redisson.getLock("transactionalMessageCompensationTask");
        // Wait at most 5 seconds for the lock; the 300-second lease assumes the job finishes within 300 seconds.
        // Both values need to be tuned for the actual scenario
        boolean tryLock = lock.tryLock(5, 300, TimeUnit.SECONDS);
        if (tryLock) {
            try {
                long start = System.currentTimeMillis();
                log.info("Start transaction push compensation scheduled task...");
                managementService.processPendingCompensationRecords();
                long end = System.currentTimeMillis();
                long delta = end - start;
                // Hold the lock for at least 5 seconds so that it is not released prematurely
                if (delta < 5000) {
                    Thread.sleep(5000 - delta);
                }
                log.info("Transaction push compensation scheduled task completed, time :{} ms...", end - start);
            } finally {
                lock.unlock();
            }
        }
    }
}
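The lock-then-pad pattern in transactionalMessageCompensationTask() can be factored into a small helper. In this JDK-only sketch a local ReentrantLock stands in for Redisson’s RLock (which extends java.util.concurrent.locks.Lock, although the article uses its tryLock(waitTime, leaseTime, unit) overload with an explicit lease). runLocked() is a hypothetical name. Short runs are padded up to minHoldMillis before the lock is released, and the lock is always released in finally.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper: run a task under a lock and hold the lock for at least minHoldMillis.
final class MinimumHoldLock {
    static boolean runLocked(Lock lock, long waitMillis, long minHoldMillis, Runnable task)
            throws InterruptedException {
        if (!lock.tryLock(waitMillis, TimeUnit.MILLISECONDS)) {
            return false; // another holder is running the job; skip this round
        }
        try {
            long start = System.nanoTime();
            task.run();
            long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            if (elapsedMillis < minHoldMillis) {
                Thread.sleep(minHoldMillis - elapsedMillis); // pad short runs
            }
            return true;
        } finally {
            lock.unlock(); // released even if task.run() throws
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Lock lock = new ReentrantLock();
        long start = System.nanoTime();
        boolean ran = runLocked(lock, 100, 200, () -> System.out.println("compensating..."));
        long heldMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("ran=" + ran + ", elapsed ms=" + heldMillis);
    }
}
```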
After the basic code is written, the structure of the whole project is as follows:
Finally, add two test classes:
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@RequiredArgsConstructor
@Component
public class MockBusinessRunner implements CommandLineRunner {

    private final MockBusinessService mockBusinessService;

    @Override
    public void run(String... args) throws Exception {
        mockBusinessService.saveOrder();
    }
}

@Slf4j
@RequiredArgsConstructor
@Service
public class MockBusinessService {

    private final JdbcTemplate jdbcTemplate;
    private final TransactionalMessageService transactionalMessageService;
    private final ObjectMapper objectMapper;

    @Transactional(rollbackFor = Exception.class)
    public void saveOrder() throws Exception {
        String orderId = UUID.randomUUID().toString();
        BigDecimal amount = BigDecimal.valueOf(100L);
        Map<String, Object> message = new HashMap<>();
        message.put("orderId", orderId);
        message.put("amount", amount);
        jdbcTemplate.update("INSERT INTO t_order(order_id,amount) VALUES (?,?)", p -> {
            p.setString(1, orderId);
            p.setBigDecimal(2, amount);
        });
        String content = objectMapper.writeValueAsString(message);
        transactionalMessageService.sendTransactionalMessage(
                DefaultDestination.builder()
                        .exchangeName("tm.test.exchange")
                        .queueName("tm.test.queue")
                        .routingKey("tm.test.key")
                        .exchangeType(ExchangeType.DIRECT)
                        .build(),
                DefaultTxMessage.builder()
                        .businessKey(orderId)
                        .businessModule("SAVE_ORDER")
                        .content(content)
                        .build()
        );
        log.info("Save order :{} successful...", orderId);
    }
}
The result of a test run is as follows:
2020-02-05 21:10:13.287  INFO 49556 --- [main] club.throwable.cm.MockBusinessService : Save order :07a75323-460b-42cb-aa63-1a0a45ce19bf successful...
The simulated order data was saved successfully, and after the transaction committed, the message was pushed to the RabbitMQ server normally, as the data in the RabbitMQ console shows.
Summary
The transactional message module only serves to make asynchronous message pushing more complete. In practice, a well-designed asynchronous messaging system also provides a synchronous query interface, because asynchronous messages by nature may receive no callback or no response. Generally speaking, the throughput of a system is positively correlated with the proportion of asynchronous processing in it (see Amdahl’s Law on this point), so asynchronous interaction should be used as much as possible in real system architecture design to improve throughput and reduce unnecessary waiting caused by synchronous blocking. The transactional message module can be extended into a back-office management system and can even be combined with Micrometer, Prometheus and Grafana for real-time monitoring.
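For reference, Amdahl’s law gives the speedup of a workload whose parallelizable fraction p is spread across n workers as S(n) = 1 / ((1 - p) + p / n), which can never exceed 1 / (1 - p). Reading the “asynchronous share” of a system as p is the article’s analogy rather than part of the law itself. The short calculation below shows how strongly the remaining serial share caps the gain.

```java
// Amdahl's law: speedup for a parallelizable fraction p on n workers.
final class Amdahl {
    static double speedup(double p, int n) {
        if (p < 0 || p > 1 || n < 1) {
            throw new IllegalArgumentException("require 0 <= p <= 1 and n >= 1");
        }
        return 1.0 / ((1.0 - p) + p / n);
    }

    public static void main(String[] args) {
        double[] fractions = {0.5, 0.9, 0.95};
        for (double p : fractions) {
            // The "limit" column is the upper bound 1 / (1 - p) as n grows without bound.
            System.out.printf("p=%.2f  n=8: %.2fx  n=64: %.2fx  limit: %.1fx%n",
                    p, speedup(p, 8), speedup(p, 64), 1.0 / (1.0 - p));
        }
    }
}
```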
Demo project repository for this article: Rabbit-transactional Message
The demo can only be started when MySQL, Redis and RabbitMQ are installed locally, and a local database named local must be created.
(End of article. c-5-d e-a-20200202. The epidemic is serious and working from home will start soon; go out less and read more.)
The author’s technical official account, Throwable Digest (ID: throwable-doge), publishes the author’s original technical articles from time to time (never plagiarized or reprinted).