Wechat search BGM7756, free access to the most powerful information from zero base to architecture!Copy the code

How are RocketMQ transactions implemented?

First let’s look at RocketMQ transactions. I have shown you the general flow of RocketMQ transactions in previous lectures, so let’s go over the flow again through the code.

+V: BGM7756 +V: BGM7756 +V: BGM7756

public class CreateOrderService {
    @Inject
    private OrderDao orderDao;
    // Inject the DAO of the order table
    @Inject
    private ExecutorService executorService;
    // Inject an ExecutorService
    private TransactionMQProducer producer;
    // Initialize the transactionListener and producer
    @Init
    public void init(a) throws MQClientException {
        TransactionListener transactionListener = createTransactionListener();
        producer = new TransactionMQProducer("myGroup");
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
    }
    // Create a request entry for the order service
    @PUT
    @RequestMapping(...).public Boolean createOrder(@RequestBody CreateOrderRequest request) {
        Create a message based on the create order request
        Message msg = createMessage(request);
        // Send transaction messages
        SendResult sendResult = producer.sendMessageInTransaction(msg, request);
        // Returns whether the transaction was successful
        return sendResult.getSendStatus() == SendStatus.SEND_OK;
    }
    private TransactionListener createTransactionListener(a) {
        return new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                CreateOrderRequest request = (CreateOrderRequest ) arg;
                try {
                    // Execute the local transaction create order
                    orderDao.createOrderInDB(request);
                    // If no exception is thrown, the transaction message is submitted
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
                catch (Throwable t) {
                    // The transaction message is rolled back on failure
                    returnLocalTransactionState.ROLLBACK_MESSAGE; }}// backcheck local transactions
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // Get the order ID from the message
                String orderId = msg.getUserProperty("orderId");
                // Check whether the order number exists in the database, if so, commit transaction;
                // If it does not exist, the local transaction either failed or is still executing, so return unknown //
                returnorderDao.isOrderIdExistsInDB(orderId)? LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW; }}; }//....
}+V: BGM7756 +V: BGM7756 +V: BGM7756Copy the code

In this process, we provide a create order service that inserts an order record into the database and sends a create order message, requiring both write to the database and send the message to be performed in a single transaction, with either success or failure. In this code, we first initialize the transactionListener and the variable Producer that produces the RocketMQ transaction message in the init() method. The real method to create the order service is createOrder(), where we create a message based on the parameters of the request and then call RocketMQ Producer to send the transaction message and return the result of the transaction execution.

After createTransactionListener () method is in the init () method calls, it constructs an anonymous class directly, to realize the RocketMQ TransactionListener interface, this interface need to implement two methods:

  • ExecuteLocalTransaction: Performs local transactions, where we directly insert the order data into the database and return the execution results of the local transaction.
  • If the transaction does not exist, then the transaction is committed. If the transaction does not exist, then the local transaction failed or is still executing.

Thus, a distributed transaction that creates an order is implemented using RocketMQ’s transaction messaging capabilities. Let’s take a look at the RocketMQ source code to see how its transaction messages are implemented.

Wechat search BGM7756, free access to the most powerful information from zero base to architecture!Copy the code



First, look at how producer sends transaction messages:

+V: BGM7756 +V: BGM7756 +V: BGM7756 public TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter locthrows MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; // Add attributes to the message, indicating that this is a transaction message, Also is half message MessageAccessor. PutProperty (MSG, MessageConst PROPERTY_TRANSACTION_PREPARED, "True" MessageAccessor. PutProperty (MSG, MessageConst PROPERTY_PRODUCER_GROUP, enclosing defaultM / / call the method that sends regular message, Try {sendResult = this.send(MSG); } catch (Exception e) { throw new MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { case SEND_OK: { try { if (sendResult.getTransactionId() ! = null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT if (null ! = transactionId && !" ".equals(transactionId)) { msg.setTransactionId(transactionId); } // Perform a local transaction if (null! = localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransac } else if (transactionListener ! = null) { log.debug("Used new transaction API"); localTransactionState = transactionListener.executeLocalTransaction } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState ! = LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransaction log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } // Based on the transaction message and the execution result of the local transaction, localTransactionState, decide to commit or roll back the transaction message // Here send an RPC request to the Broker to commit or roll back the transaction. try { this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broke } TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult; +V: BGM7756 +V: BGM7756 +V: BGM7756Copy the code

The implementation logic of this code is that it first adds a property PROPERTY_TRANSACTION_PREPARED to the message to indicate that it is a transaction message, that is, a half-message, and then sends the message to the Broker as if it were a normal message. If the delivery is successful, we can execute the local transaction by calling the executeLocalTransaction() method from the implementation class of our previously provided interface, TransactionListener, which in our case inserts an order record into the database.

Finally, the transaction is committed or rolled back based on the result of the semi-message sending and the result of the local transaction execution. In the implementation method endTransaction(), a producer sends a one-way RPC request to the Broker telling it to complete the transaction or roll back. Because of the transaction investigation mechanism, the failure or loss of this RPC request does not affect the final result of the transaction. Finally, the sending result of the transaction message is constructed and returned.

That’s RocketMQ being implemented in Producer transaction messages. Then we look at the Broker side and how it handles transaction messages and counterchecks.

The Broker, when processing a request from Producer to send a message, determines whether the message is a normal or semi-message based on the properties in the message:

+V: BGM7756 +V: BGM7756 +V: BGM7756
// ...
if(traFlag ! =null && Boolean.parseBoolean(traFlag)) {
    // ...
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMes
} else {
    putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
// ...Copy the code

This code in the org. Apache. Rocketmq. Broker. Processor. SendMessageProcessor# sendMessage method, and then we go in to see real deal with half the business logic of the news, The processing logic in the class org. Apache. Rocketmq. Broker., queue. TransactionalMessageBridge:

+V: BGM7756 +V: BGM7756 +V: BGM7756
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
    return store.putMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // Record the subject and queue of the message in the new property
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getMessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
    String.valueOf(msgInner.getQueueId()));
    msgInner.setSysFlag(
    MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANS// Replace the topic and queue of the message with: RMQ_SYS_TRANS_HALF_TOPIC, 0
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProreturn msgInner;
}+V: BGM7756 +V: BGM7756 +V: BGM7756Copy the code

As you can see in this code, RocketMQ does not store the half-message in the queue specified by the client in the message. Instead, after logging the original topic queue, RocketMQ stores the half-message in a special internal topic, RMQ_SYS_TRANS_HALF_TOPIC, using a fixed queue number of 0. The topic and queue are invisible to consumers, so messages inside are never consumed. This ensures that the half-message is not consumed by the consumer until the transaction commits successfully.

Then let’s look at how RocketMQ does transaction backchecking: In the Broker TransactionalMessageCheckService service launched a timer, regularly read from half a message queue to counter check the news, for each need to counter check the message, The Broker sends an RPC request to the respective Producer to perform a backcheck of the transaction state. This part of the logical method in org. Apache. Rocketmq. Broker. Transaction. AbstractTransactionalMessageCheckListener# sendCheckMessage, The result of the backcheck in the RPC response is used to determine whether the half-message needs to be committed, rolled back, or backchecked later.

Finally, commit or rollback transaction implementation logic is the same, the first half message is marked as processed, if it is to commit the transaction, then the half message copied to the news from half a message queue to the real subject and the queue, if you want to roll back the transaction, what all don’t need to do in this step, the final end of the transaction. This part of the implementation of logic in the org. Apache. Rocketmq. Broker. Processor. EndTransactionProcessor this class.

What problems can Kafka transactions and Exactly Once solve?

Let’s talk about Kafka transactions. Earlier when we talked about transactions, Kafka’s transactions solve a different problem than RocketMQ. Transactions in RocketMQ, which addresses the problem of ensuring that both local transactions and messages are performed successfully or fail. Furthermore, RocketMQ adds a transaction backcheck mechanism to maximize transaction execution success and data consistency.

Transactions in Kafka, on the other hand, solve the problem of ensuring that multiple messages sent in a transaction either succeed or fail. Note that multiple messages do not have to be in the same topic and partition, but can be messages destined for multiple topics and partitions. Of course, you can add a local transaction to a Kafka transaction to achieve similar effects to RocketMQ transactions, but Kafka does not have a transaction backcheck mechanism.

Kafka’s transaction mechanism is rarely used in isolation. More often than not, it is used in conjunction with Kafka’s idempotent mechanism to implement Kafka’s Exactly Once semantics. As I emphasized in previous lectures, ExactlyOnce is not Exactly the same as ExactlyOnce in what we normally understand as the service level of a message queue.

We usually understand Exactly Once in the service level of message queues, which means that a message is sent from the producer to the Broker, and then the consumer pulls the message from the Broker and consumes it. In this process, ensure that each message is transmitted exactly once and is not duplicated or lost. As we mentioned earlier, several common open source message queues, including Kafka, are At Least Once. Can’t Exactly Once.

So what Exactly Once in Kafka solves? It solves the problem of using Kafka as the data source in streaming computing and saving the results to a Kafka scenario where the data is consumed from a Kafka topic, computed in a compute cluster, and stored in other Kafka topics. In this process, ensure that each message is calculated exactly once to ensure that the calculation results are correct.

For example, let’s store all Order messages in one Kafka topic Order, run a calculation in the Flink cluster, count Order receipts per minute, and store the results in another Kafka topic Income. To ensure that the results are accurate, ensure that each message can only be counted once, not twice, if any node in either the Kafka or Flink cluster fails, otherwise the calculation will be wrong. One of the most important restrictions is that the data must come from Kafka and the results must be saved to Kafka to qualify for Kafka Excactly Once.

Kafka’s Exactly Once mechanism is designed to solve the problem of “read data, calculate data, save results” and not Exactly Once when a message is produced and consumed using a message queue.

How are Kafka transactions implemented?

How do Kafka transactions work? The implementation principle is similar to RocketMQ transactions, which are based on two-phase commit, but the implementation process is more complex.

First, a few roles, or modules, that participate in Kafka transactions. To address the distributed transaction problem, Kafka introduces the role of the transaction coordinator, responsible for coordinating the entire transaction on the server side. The coordinator is not an independent process, but part of the Broker process. The coordinator, like the partition, is elected to ensure its own availability.

Similar to RocketMQ, the Kafka cluster has a special topic for logging transactions. This topic is implemented in the same way as a normal topic, and logs data such as “open transaction” and “commit transaction”. Log topics also contain many partitions. In a Kafka cluster, there can be multiple coordinators, each responsible for managing and using several partitions in the transaction log. This is designed to allow multiple transactions to be executed in parallel to improve performance.

The implementation flow of Kafka transactions is described below.

First, when we start a transaction, the producer sends a request to the coordinator to start the transaction, and the coordinator records the transaction ID in the transaction log.

Then, before sending the message, the producer sends a request to the coordinator telling him which topic and partition the message to send belongs to, and this information is also recorded in the transaction log by the coordinator. The producer can then send the transaction message as if it were a normal message. Unlike RocketMQ, which stores the uncommitted transaction message in a special queue, Kafka processes the uncommitted transaction message as if it were a normal message and sends it directly to the Broker. Stored in the corresponding partition of these messages, Kafka temporarily filters uncommitted transaction messages among the consumers of the client.

After the message is sent, the producer sends a request to the coordinator to commit or roll back the transaction, and the coordinator begins the two-phase commit to complete the transaction. In the first phase, the coordinator sets the state of the transaction to “pre-committed” and writes to the transaction log. At this point, the transaction has actually succeeded, and no matter what happens next, the transaction will eventually commit.

Started after the second phase, the coordinator in affairs related to all partitions, will write a special message “end of the transaction”, while Kafka’s customer, client, special message read to the end of the transaction, it can temporarily to filter the uncommitted transactions before the news, release to the business code to consume. Finally, the coordinator logs the last transaction log, indicating that the transaction has ended.

I have drawn a simple sequence diagram of the entire transaction to make it easier for you to understand.

To summarize Kafka’s two-stage process, in the preparation phase, the producer sends a message to the coordinator to start a transaction, and the message is sent to each partition. In the commit phase, the producer sends a message to the coordinator to commit the transaction, and the coordinator sends a “end of transaction” message to each partition to complete the distributed transaction commit.

conclusion

This article explains how Kafka and RocketMQ implement transactions, respectively. As you can see, they have something in common in implementing transactions. They are two-phase commit based transactions, and both use queues and partitions in a particular topic for transaction logging.

The difference lies in the way messages are handled in a transaction. RocketMQ stores these messages in a special queue until the transaction commits and then moves them to the business queue. Kafka, on the other hand, puts messages directly into the corresponding business partition and works with client filtering to temporarily mask ongoing transaction messages.

RocketMQ deals with local transactions and data consistency for sending messages, whereas Kafka deals with Exactly Once. It is used in real-time computing scenarios.

Wechat search BGM7756, free access to the most powerful information from zero base to architecture!



Space is limited, part of the data below!