definition
A RocketMQ transactional message ties a local transaction and the sending of a message into one global transaction, so that both either succeed or fail together. RocketMQ's transactional messages provide distributed transaction functionality similar to X/Open XA and can be used to achieve eventual consistency across distributed transactions.
Demo
The following example, again based on the Spring Cloud Stream programming model with the Spring Cloud Alibaba RocketMQ implementation, demonstrates the use of transactional messages.
process
The transaction message interaction flow is as follows:
The steps for sending a transactional message are as follows:
- The producer sends a half message (a transactional message that is not yet deliverable) to the RocketMQ server.
- After the RocketMQ server persists the message, it returns an Ack to the producer to confirm that the message was sent successfully. At this point the message is a half message.
- The producer starts executing the local transaction logic.
- The producer submits a second confirmation (Commit or Rollback) to the server based on the result of the local transaction. The server then handles the half message as follows:
  - Commit: the server marks the half message as deliverable and delivers it to the consumer.
  - Rollback: the server rolls back the half message and does not deliver it to the consumer.
The steps for checking back a transactional message are as follows:
- If the network is interrupted or the producer application restarts, the second confirmation submitted in step 4 may never reach the server. After a fixed period of time, the server initiates a check-back for the message with any producer instance in the producer group.
- After receiving the check-back request, the producer checks the final result of the local transaction for that message.
- The producer submits the second confirmation again based on the final status of the local transaction, and the server processes the half message as in step 4.
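The sending and check-back steps above can be sketched as a toy in-memory model. This is only an illustration of the protocol under simplifying assumptions, not RocketMQ's implementation or API: ToyBroker, TxState, and every method name here are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy in-memory model of the half-message flow; not the RocketMQ API. */
public class HalfMessageFlowSketch {

    /** Hypothetical mirror of the three outcomes a producer can report. */
    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    /** Hypothetical broker that holds half messages until they are resolved. */
    static class ToyBroker {
        private final Map<String, String> halfMessages = new LinkedHashMap<>();
        final List<String> delivered = new ArrayList<>();

        // Steps 1-2: persist the half message and acknowledge it.
        boolean sendHalf(String id, String body) {
            halfMessages.put(id, body);
            return true; // the Ack
        }

        // Step 4: apply the producer's second confirmation.
        void resolve(String id, TxState state) {
            if (state == TxState.UNKNOWN) {
                return; // still pending; a later check-back will ask again
            }
            String body = halfMessages.remove(id);
            if (body != null && state == TxState.COMMIT) {
                delivered.add(body); // now visible to consumers
            }
            // ROLLBACK: the half message is dropped and never delivered.
        }

        // Check-back: ask the producer for the final state of each pending message.
        void checkBack(Function<String, TxState> producerCheck) {
            for (String id : new ArrayList<>(halfMessages.keySet())) {
                resolve(id, producerCheck.apply(id));
            }
        }

        int pending() {
            return halfMessages.size();
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.sendHalf("tx-1", "order created");
        broker.resolve("tx-1", TxState.COMMIT);    // normal commit path
        broker.sendHalf("tx-2", "order cancelled");
        broker.resolve("tx-2", TxState.ROLLBACK);  // rolled back, never delivered
        broker.sendHalf("tx-3", "order paid");     // second confirmation is lost
        broker.checkBack(id -> TxState.COMMIT);    // check-back resolves it
        System.out.println(broker.delivered);      // [order created, order paid]
    }
}
```

The real broker also limits how often it checks back and persists half messages to disk; the sketch leaves both out to keep the state transitions visible.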
configuration
As before, the producer and consumer are configured together, starting with the configuration file:
spring:
  application:
    name: mq-example
  cloud:
    stream:
      bindings:
        input-transaction:
          content-type: application/json
          destination: TransactionTopic
          group: transaction-consumer-group
        output-transaction:
          content-type: application/json
          destination: TransactionTopic
      rocketmq:
        # RocketMQ Binder configuration items, corresponding to the RocketMQBinderConfigurationProperties class
        binder:
          # RocketMQ name server address
          name-server: 127.0.0.1:9876
          group: rocketmq-group
        bindings:
          output-transaction:
            # corresponds to the RocketMQProducerProperties class
            producer:
              producerType: Trans
              group: transaction-producer-group # producer group
              transactionListener: myTransactionListener
Note that the producer type is Trans, which means the producer sends transactional messages.
A producer group and a consumer group are also configured. As a reminder, these two concepts mean the following:
Producer group: a set of producers of the same kind, which send the same kind of messages with the same sending logic. If a producer sends a transactional message and then crashes, the broker contacts another producer instance in the same producer group to commit or roll back the transaction.
Consumer group: a set of consumers of the same kind, which usually consume the same kind of messages with the same consumption logic. Consumer groups make load balancing and fault tolerance in message consumption easy to achieve. Note that all consumer instances in a consumer group must subscribe to exactly the same topics. RocketMQ supports two consumption modes: clustering and broadcasting.
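The difference between the two consumption modes can be illustrated with a small stand-alone sketch. It does not use the RocketMQ API; the class and method names are hypothetical, and the round-robin assignment in clustering mode is an assumption chosen for simplicity, not RocketMQ's actual allocation strategy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy comparison of the two consumption modes; not the RocketMQ API. */
public class ConsumptionModeSketch {

    // Give every consumer in the group an empty inbox.
    private static Map<String, List<String>> emptyInboxes(List<String> consumers) {
        Map<String, List<String>> inboxes = new LinkedHashMap<>();
        for (String consumer : consumers) {
            inboxes.put(consumer, new ArrayList<>());
        }
        return inboxes;
    }

    /** Clustering: each message is handled by exactly one consumer in the group. */
    static Map<String, List<String>> clustering(List<String> consumers, List<String> messages) {
        Map<String, List<String>> inboxes = emptyInboxes(consumers);
        if (consumers.isEmpty()) {
            return inboxes;
        }
        for (int i = 0; i < messages.size(); i++) {
            // Round-robin is an illustrative assumption only.
            inboxes.get(consumers.get(i % consumers.size())).add(messages.get(i));
        }
        return inboxes;
    }

    /** Broadcasting: every consumer in the group receives every message. */
    static Map<String, List<String>> broadcasting(List<String> consumers, List<String> messages) {
        Map<String, List<String>> inboxes = emptyInboxes(consumers);
        for (List<String> inbox : inboxes.values()) {
            inbox.addAll(messages);
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("c1", "c2");
        List<String> messages = Arrays.asList("m1", "m2", "m3");
        System.out.println(clustering(consumers, messages));   // {c1=[m1, m3], c2=[m2]}
        System.out.println(broadcasting(consumers, messages)); // {c1=[m1, m2, m3], c2=[m1, m2, m3]}
    }
}
```

Clustering is what gives a consumer group its load balancing: adding a consumer spreads the same messages over more instances instead of duplicating them.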
implementation
myTransactionListener is our custom TransactionListener implementation. See the code below for details:
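import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

@Component("myTransactionListener")
public class TransactionListenerImpl implements TransactionListener {

    // Called after the half message is sent; runs the local transaction.
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // The "test" header set by the sender selects the outcome to simulate.
        Object num = msg.getProperty("test");
        if ("1".equals(num)) {
            System.out.println("executer: " + new String(msg.getBody()) + " unknown");
            return LocalTransactionState.UNKNOW;
        } else if ("2".equals(num)) {
            System.out.println("executer: " + new String(msg.getBody()) + " rollback");
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        System.out.println("executer: " + new String(msg.getBody()) + " commit");
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    // Called by the broker during check-back; this demo always commits.
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("check: " + new String(msg.getBody()));
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}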
The code above is adapted from the official demo. It returns a different transaction state depending on num:
- If num is 1, it returns UNKNOW, meaning the local transaction state is unknown. The broker then checks the transaction state periodically by invoking the checkLocalTransaction method.
- If num is 2, it returns ROLLBACK_MESSAGE, meaning the local transaction was rolled back. The broker rolls back the half message, so it is never delivered.
- If num is 3 (or any other value), it returns COMMIT_MESSAGE, meaning the local transaction was committed, and the broker delivers the message.
Sending a message is similar to the previous code:
@GetMapping("/send_transaction")
public void sendTransaction() {
    String msg = "This is a transaction message";
    Integer num = 2; // 2 makes the listener return ROLLBACK_MESSAGE
    MessageBuilder<String> builder = MessageBuilder.withPayload(msg)
            .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
    // The listener reads this header to decide which state to return.
    builder.setHeader("test", String.valueOf(num));
    builder.setHeader(RocketMQConst.USER_TRANSACTIONAL_ARGS, "binder");
    Message<String> message = builder.build();
    mySource.outputTransaction().send(message);
}
To guarantee eventual consistency for reliable messages, you need a database table that records the transaction state. The UNKNOW state is stored when the transaction starts. If the transaction fails, ROLLBACK_MESSAGE is returned and recorded in the table; if the transaction commits successfully, the state is changed to COMMIT_MESSAGE.
With this transaction message table, the checkLocalTransaction method can look up the transaction state in the table.
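A minimal sketch of this idea follows, with an in-memory map standing in for the database table. Everything in it is hypothetical: the State enum only mirrors the names of RocketMQ's LocalTransactionState so the example stays self-contained, and returning UNKNOW for a transaction id with no row is an assumption (it lets the broker ask again later).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy transaction log; the map stands in for a database table. */
public class TransactionLogSketch {

    /** Hypothetical stand-in for RocketMQ's LocalTransactionState. */
    enum State { UNKNOW, ROLLBACK_MESSAGE, COMMIT_MESSAGE }

    // Transaction id -> last recorded state.
    private final Map<String, State> table = new ConcurrentHashMap<>();

    /** Plays the role of executeLocalTransaction: record UNKNOW, then the outcome. */
    State execute(String txId, Runnable localTransaction) {
        table.put(txId, State.UNKNOW);
        try {
            localTransaction.run();
            table.put(txId, State.COMMIT_MESSAGE);
        } catch (RuntimeException e) {
            table.put(txId, State.ROLLBACK_MESSAGE);
        }
        return table.get(txId);
    }

    /** Plays the role of checkLocalTransaction: answer from the table. */
    State check(String txId) {
        // Assumption: no row yet means the outcome is still unknown.
        return table.getOrDefault(txId, State.UNKNOW);
    }

    public static void main(String[] args) {
        TransactionLogSketch log = new TransactionLogSketch();
        log.execute("tx-ok", () -> { });
        log.execute("tx-bad", () -> {
            throw new IllegalStateException("local transaction failed");
        });
        System.out.println(log.check("tx-ok"));   // COMMIT_MESSAGE
        System.out.println(log.check("tx-bad"));  // ROLLBACK_MESSAGE
        System.out.println(log.check("tx-none")); // UNKNOW
    }
}
```

With a real database, the state row would normally be written in the same database transaction as the business change, so that the two cannot disagree.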
Of course, if a complete distributed transaction spans two systems, A and B, as shown in the figure above, and the transaction in system B fails and rolls back, consider whether the transaction in system A also needs to be rolled back. If so, system A should provide a rollback interface for system B to call.
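One way such a rollback interface could look is sketched below. This is an assumption-laden illustration, not part of the demo: SystemA, SystemB, the balance example, and all method names are hypothetical, and the remote call from B to A is reduced to a plain method call.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy compensation flow between two systems; all names are hypothetical. */
public class CompensationSketch {

    /** System A: applies a local change and exposes a rollback interface. */
    static class SystemA {
        int balance = 100;
        // Amounts debited per transaction id, kept so they can be undone.
        private final Map<String, Integer> applied = new HashMap<>();

        void debit(String txId, int amount) {
            balance -= amount;
            applied.put(txId, amount);
        }

        /** Rollback interface for system B; safe to call more than once. */
        boolean rollback(String txId) {
            Integer amount = applied.remove(txId);
            if (amount == null) {
                return false; // unknown id or already rolled back
            }
            balance += amount;
            return true;
        }
    }

    /** System B: consumes the message and compensates A if its own work fails. */
    static class SystemB {
        boolean consume(String txId, boolean localWorkFails, SystemA a) {
            if (localWorkFails) {
                a.rollback(txId); // ask A to undo its side of the transaction
                return false;
            }
            return true;
        }
    }

    public static void main(String[] args) {
        SystemA a = new SystemA();
        SystemB b = new SystemB();
        a.debit("tx-1", 30);          // A commits its local transaction
        b.consume("tx-1", true, a);   // B fails and triggers the rollback
        System.out.println(a.balance); // 100
    }
}
```

Making rollback idempotent matters because the message may be redelivered to system B, which would then call the interface again for the same transaction.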
reference
- www.alibabacloud.com/help/zh/doc…
- www.iocoder.cn/Spring-Clou…