Note: This source code analysis series is based on RocketMq 4.8.0. Gitee repository link: gitee.com/funcy/rocke… .

RocketMq supports a special class of messages: transaction messages, and this article looks at how they are implemented from a source code perspective.

1. Prepare the demo

The transaction message sample is located in the org.apache.rocketmq.example.transaction package. Let's look at how it is used:

1.1 Preparing the transaction listener: TransactionListener

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

TransactionListener is the transaction listener interface. It has two methods:

  • executeLocalTransaction(...): executes the local transaction; the business logic of the transaction goes here
  • checkLocalTransaction(...): checks the execution status of the local transaction

1.2 The transaction message producer

Next comes the producer of the transaction message, which looks like this:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // The producer type here is TransactionMQProducer
        TransactionMQProducer producer 
            = new TransactionMQProducer("please_rename_unique_group_name");
        // Prepare a thread pool
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, 
            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
        producer.setExecutorService(executorService);
        // Set the listener
        TransactionListener transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);
        producer.start();

        // Send transaction messages
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

Unlike the producer of ordinary messages, the producer type of transaction messages is TransactionMQProducer and requires a transaction listener to be set.

With the sample demo, it’s time to look at the flow.

2. Startup: TransactionMQProducer#start

TransactionMQProducer is started through its start() method, which reads as follows:

@Override
public void start() throws MQClientException {
    // Initialize the environment
    this.defaultMQProducerImpl.initTransactionEnv();
    // Call the parent DefaultMQProducer method
    super.start();
}

This method first performs some initialization by calling DefaultMQProducerImpl#initTransactionEnv, and then calls the start() method of its parent class DefaultMQProducer. Compared with starting the producer of ordinary messages, starting the producer of transaction messages involves just one extra step: initializing the transaction environment.

Let’s go to DefaultMQProducerImpl#initTransactionEnv and see what it does:

public void initTransactionEnv() {
    TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
    // The entire method is the assignment to checkExecutor
    if (producer.getExecutorService() != null) {
        this.checkExecutor = producer.getExecutorService();
    } else {
        this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(
            producer.getCheckRequestHoldMax());
        this.checkExecutor = new ThreadPoolExecutor(
            producer.getCheckThreadPoolMinSize(),
            producer.getCheckThreadPoolMaxSize(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.checkRequestQueue);
    }
}

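As you can see from the code, the entire method simply assigns the member variable checkExecutor: if an executor was configured on the producer, it is used; otherwise a default ThreadPoolExecutor backed by a bounded LinkedBlockingQueue is created.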
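The fallback decision in initTransactionEnv can be reproduced with plain JDK classes. The following is only a minimal sketch, not RocketMq code: the class CheckExecutorSketch, the method chooseCheckExecutor and its parameters are hypothetical names, and the pool sizes are passed in by the caller instead of being read from the producer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of RocketMq): mirrors the decision
// "use the user-supplied executor, otherwise build a default pool".
final class CheckExecutorSketch {

    static ExecutorService chooseCheckExecutor(ExecutorService userSupplied,
                                               int minThreads,
                                               int maxThreads,
                                               int queueCapacity) {
        if (userSupplied != null) {
            // The application configured its own pool: use it unchanged.
            return userSupplied;
        }
        // Otherwise fall back to a bounded queue and a default pool.
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(queueCapacity);
        return new ThreadPoolExecutor(minThreads, maxThreads,
            60_000, TimeUnit.MILLISECONDS, queue);
    }
}
```

In the sample demo, the producer calls setExecutorService(...), so the first branch is the one that applies there.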

3. Sending a message: TransactionMQProducer#sendMessageInTransaction(...)

Transaction messages are sent with the TransactionMQProducer#sendMessageInTransaction(...) method. The code is as follows:

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final Object arg) throws MQClientException {
    if (null == this.transactionListener) {
        throw new MQClientException("TransactionListener is null", null);
    }

    msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}

To follow up, enter the DefaultMQProducerImpl#sendMessageInTransaction method:

public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
    // Get the TransactionListener
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // Clear the delay level: transaction messages do not support delayed delivery
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // Set the message properties, specifying the message type as transaction message
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, 
        this.defaultMQProducer.getProducerGroup());
    try {
        // Send the message in synchronous mode
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    // Process the return value
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                // A lot of checks are omitted here
                ...
                // The send succeeded, so execute the local transaction
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        // End the remote transaction; note the localTransactionState passed in
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn(...);
    }
    // Construct the return value
    TransactionSendResult transactionSendResult = new TransactionSendResult();
    ...
    return transactionSendResult;
}

This method sends the transaction message. The key points are:

  1. Obtain the TransactionListener. This is the listener we set in the sample demo by calling producer.setTransactionListener(...).
  2. If the delay level is not 0, it is cleared, which shows that transaction messages do not support delayed delivery.
  3. Set the message properties to mark the message as a transaction message. The broker handles transaction messages specially when it receives them.
  4. Send the message in the same way as an ordinary message. Note that it is sent in synchronous mode.
  5. Process the send result. If the send status is FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT or SLAVE_NOT_AVAILABLE, the transaction state is set to ROLLBACK_MESSAGE, meaning a rollback is required. If the status is SEND_OK, the local transaction is executed through transactionListener.executeLocalTransaction(...), which returns the transaction state.
  6. End the remote transaction: the transaction state obtained in step 5 is sent to the broker, and the rest is handled by the broker.
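The decision made in step 5 can be condensed into a single function. The sketch below uses only the JDK; SendFlowSketch and its nested enums are hypothetical stand-ins for RocketMq's SendStatus and LocalTransactionState, and treating a null result or a thrown exception as UNKNOW is an assumption of this sketch.

```java
import java.util.function.Supplier;

// Hypothetical model of the producer-side decision, not RocketMq code.
final class SendFlowSketch {
    enum Status { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    enum TxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Decides which state is reported to the broker once the message has been sent.
    static TxState decide(Status status, Supplier<TxState> localTransaction) {
        if (status != Status.SEND_OK) {
            // The message was not stored safely: ask the broker to roll back.
            return TxState.ROLLBACK_MESSAGE;
        }
        try {
            TxState state = localTransaction.get();
            // Assumption of this sketch: a null result counts as "unknown".
            return state != null ? state : TxState.UNKNOW;
        } catch (RuntimeException e) {
            // The local transaction failed: the state stays unknown and the
            // broker is expected to ask again through its check mechanism.
            return TxState.UNKNOW;
        }
    }
}
```

For example, decide(Status.SEND_OK, () -> TxState.COMMIT_MESSAGE) yields COMMIT_MESSAGE, while any non-SEND_OK status yields ROLLBACK_MESSAGE without running the local transaction at all.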

Let's take another look at the transactionListener.executeLocalTransaction(...) method:

public class TransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }
    ...
}

This code comes from the sample demo. The executeLocalTransaction(...) method returns the execution state of the local transaction. This matters because the state is then sent to the broker, which uses it to decide whether to commit or roll back the message.

Next, let's look at the DefaultMQProducerImpl#endTransaction method:

public void endTransaction(
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, 
        MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    // Find a broker address
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(
        sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    // Set the message header and set the commit/rollback identifier based on the message status
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + 
        localException.toString()) : null;
    // The request is sent in oneway mode
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, 
        remark, this.defaultMQProducer.getSendMsgTimeout());
}

This method sends the end-transaction request to the broker. There are two key points in the code:

  1. The commit/rollback flag of the transaction is set according to localTransactionState, whose value comes from either the send result of the transaction message or the execution result of the local transaction.
  2. The request is sent in oneway mode, which means RocketMq does not care about the response. Why not? Because transaction messages also have a reverse check mechanism on the broker side: the broker periodically sends a request to the producer to check the transaction status, as discussed later in this article.
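To make the commit/rollback flag more concrete, here is a small stand-alone sketch of how such a flag can be encoded in two bits of an integer. The class TxFlagSketch is hypothetical. The constant values are what MessageSysFlag is understood to define in RocketMq 4.8.0 and should be treated as an assumption to verify against the source, as should the two bit-manipulation helpers.

```java
// Sketch only: constant values are assumed to match MessageSysFlag in 4.8.0.
final class TxFlagSketch {
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Maps the local transaction state to the flag placed in the request header.
    static int flagFor(LocalState state) {
        switch (state) {
            case COMMIT_MESSAGE:
                return TRANSACTION_COMMIT_TYPE;
            case ROLLBACK_MESSAGE:
                return TRANSACTION_ROLLBACK_TYPE;
            default:
                return TRANSACTION_NOT_TYPE; // UNKNOW
        }
    }

    // Extracts the two transaction bits from a sysFlag.
    static int transactionValue(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    // Replaces the two transaction bits and leaves all other bits untouched.
    static int resetTransactionValue(int sysFlag, int type) {
        return (sysFlag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }
}
```

Because the rollback value has both transaction bits set, it doubles as the mask for reading or clearing the transaction type.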

At this point, the producer has completed the process of sending transaction messages. Now let’s look at how the broker handles transaction related messages.

4. How the broker processes transaction messages

In the TransactionMQProducer#sendMessageInTransaction(...) method analyzed in the previous section, two requests are sent to the broker: the transaction message itself and the end-transaction request. Here we examine how the broker handles each of them.

4.1 Processing the transaction message: SendMessageProcessor#asyncSendMessage

After the producer sends a transaction message to the broker, it is processed in largely the same way as an ordinary message, so this article only focuses on the differences. SendMessageProcessor#asyncSendMessage is where ordinary messages and transaction messages are told apart:

private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request, SendMessageContext mqtraceContext, SendMessageRequestHeader requestHeader) {
    // (Code omitted: transFlag is read from the message's PROPERTY_TRANSACTION_PREPARED property)
    // If it is a transaction message
    if (transFlag != null && Boolean.parseBoolean(transFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                            + "] sending transaction message is forbidden");
            return CompletableFuture.completedFuture(response);
        }
        // Process transaction messages
        putMessageResult = this.brokerController.getTransactionalMessageService()
            .asyncPrepareMessage(msgInner);
    } else {
        // Send a normal message
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, 
        responseHeader, mqtraceContext, ctx, queueIdInt);
}

Following asyncPrepareMessage(...), we arrive at TransactionalMessageBridge#asyncPutHalfMessage:

public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
    // parseHalfMessageInner(...) : Message conversion
    // asyncPutMessage(...) : Messages are stored in commitLog
    return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}

There are two operations in this method:

  • parseHalfMessageInner(...): message conversion, which redirects the transaction message to the dedicated queue for transaction messages, where it is held temporarily
  • asyncPutMessage(...): message storage, that is, saving the message to the commitLog, which is no different from ordinary messages

Since transaction messages are stored in the same way as ordinary messages, we focus on the conversion step and go into the TransactionalMessageBridge#parseHalfMessageInner method:

/**
 * Build the message content
 * @param msgInner
 * @return
 */
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // Save the original topic and queueId
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    // Specify the new topic and queue, which is temporarily stored in the transaction related queue
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), 
        MessageSysFlag.TRANSACTION_NOT_TYPE));
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder
        .messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

This step is straightforward: the original topic and queueId of the message are saved in its properties and replaced with the dedicated transaction topic and queueId, and the message is then stored in the commitLog. At this point, the broker has finished handling the transaction message sent by the producer.
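The topic swap can be illustrated with a small stand-alone model. This is a sketch under stated assumptions rather than the broker's code: HalfMessageSketch is a hypothetical class, and the property keys and the half topic string are illustrative stand-ins for the RocketMq constants used above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified message model (not a RocketMq class).
final class HalfMessageSketch {
    // Illustrative values standing in for the real constants.
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    static final String KEY_REAL_TOPIC = "REAL_TOPIC";
    static final String KEY_REAL_QUEUE_ID = "REAL_QID";

    String topic;
    int queueId;
    final Map<String, String> properties = new HashMap<>();

    HalfMessageSketch(String topic, int queueId) {
        this.topic = topic;
        this.queueId = queueId;
    }

    // Same idea as parseHalfMessageInner: remember the real destination,
    // then redirect the message to the transaction topic, queue 0.
    void toHalf() {
        properties.put(KEY_REAL_TOPIC, topic);
        properties.put(KEY_REAL_QUEUE_ID, String.valueOf(queueId));
        topic = HALF_TOPIC;
        queueId = 0;
    }

    // The reverse conversion: restore the real destination from the properties.
    void restore() {
        topic = properties.get(KEY_REAL_TOPIC);
        queueId = Integer.parseInt(properties.get(KEY_REAL_QUEUE_ID));
    }
}
```

Because consumers subscribe to the real topic, a message parked under the transaction topic stays invisible to them until it is converted back.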

4.2 Processing the end-transaction request: EndTransactionProcessor#processRequest

The request code of the end-transaction request is END_TRANSACTION, and it is handled by EndTransactionProcessor#processRequest:

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader)request
        .decodeCommandCustomHeader(EndTransactionRequestHeader.class);
    LOGGER.debug("Transaction request:{}", requestHeader);
    if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
        response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
        LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
        return response;
    }
    ...
    OperationResult result = new OperationResult();
    // Transaction commit operation
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
        // Retrieve messages from commitLog
        result = this.brokerController.getTransactionalMessageService()
            .commitMessage(requestHeader);
        // If success is returned
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // Convert the message back, i.e. restore the actual topic and queue it should be delivered to
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                // Some setXxx(...) calls are omitted
                ...
                // The actual delivery operation
                RemotingCommand sendResult = sendFinalMessage(msgInner);
                // After delivery, delete the original message: it is not physically removed from disk, only marked as deleted
                if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    this.brokerController.getTransactionalMessageService()
                        .deletePrepareMessage(result.getPrepareMessage());
                }
                return sendResult;
            }
            return res;
        }
    // Transaction rollback operation
    } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        // Get the message
        result = this.brokerController.getTransactionalMessageService()
            .rollbackMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // No delivery here: delete directly (not physically removed from disk, only marked as deleted)
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(
                    result.getPrepareMessage());
            }
            return res;
        }
    }
    // The UNKNOW state is not handled
    response.setCode(result.getResponseCode());
    response.setRemark(result.getResponseRemark());
    return response;
}

In this method, transaction commit and rollback are handled separately:

  • On commit, the message is actually delivered at this point. After delivery, the original transaction message is marked as deleted.
  • On rollback, the original transaction message is marked as deleted directly.
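The two branches, plus the unhandled UNKNOW case, can be modelled in memory. This sketch only illustrates the control flow: EndTransactionSketch and all of its members are hypothetical, and a Set stands in for the "marked as deleted" bookkeeping that the broker does on disk.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of the broker side, not RocketMq code.
final class EndTransactionSketch {
    enum Decision { COMMIT, ROLLBACK, UNKNOWN }

    final List<String> delivered = new ArrayList<>(); // visible to consumers
    final Set<String> pending = new HashSet<>();      // stored, awaiting a decision

    // A transaction message arrives and is parked.
    void prepare(String msgId) {
        pending.add(msgId);
    }

    // The end-transaction request arrives.
    void end(String msgId, Decision decision) {
        if (!pending.contains(msgId)) {
            return; // nothing to finish
        }
        switch (decision) {
            case COMMIT:
                delivered.add(msgId);  // deliver to the real topic
                pending.remove(msgId); // then mark the original as deleted
                break;
            case ROLLBACK:
                pending.remove(msgId); // never delivered, only marked as deleted
                break;
            default:
                // UNKNOWN: nothing happens here, the message stays pending
                break;
        }
    }
}
```

A message that ends with UNKNOWN remains in pending, which is exactly the situation the check mechanism has to resolve.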

To see how the transaction message is actually delivered, go to the EndTransactionProcessor#sendFinalMessage method:

private RemotingCommand sendFinalMessage(MessageExtBrokerInner msgInner) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // Deliver the message
    final PutMessageResult putMessageResult 
        = this.brokerController.getMessageStore().putMessage(msgInner);
    if (putMessageResult != null) {
        switch (putMessageResult.getPutMessageStatus()) {
            // Result handling is omitted
            ...
        }
    }
}

Here, brokerController.getMessageStore().putMessage(...) writes the message to the commitLog again, this time with the original topic and queueId, so that it can then be consumed by consumers.

5. Reverse check mechanism of transactions

We have examined how the broker handles the COMMIT_MESSAGE and ROLLBACK_MESSAGE states of a transaction message. Besides these two, a transaction message has a third state: UNKNOW. As the EndTransactionProcessor#processRequest method shows, the broker does not handle this state there!

So what does RocketMq do with a message in the unknown state? Since EndTransactionProcessor#processRequest does not handle it, such a transaction message is neither committed nor rolled back at that point. Instead, it is handled by a separate thread: the check thread for transaction messages.

5.1 Starting the check thread

During broker startup, BrokerController#start executes the following:

public void start() throws Exception {
    ...
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        // Start some processors
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        this.registerBrokerAll(true, false, true);
    }
    ...
}

/**
 * This is where the check thread for transaction messages is started
 */
private void startProcessorByHa(BrokerRole role) {
    if (BrokerRole.SLAVE != role) {
        if (this.transactionalMessageCheckService != null) {
            this.transactionalMessageCheckService.start();
        }
    }
}

This calls the start() method of TransactionalMessageCheckService. To see what it does, we arrive at the ServiceThread#start method:

public abstract class ServiceThread implements Runnable {
    ...
    public void start() {
        log.info(...);
        if (!started.compareAndSet(false, true)) {
            return;
        }
        stopped = false;
        this.thread = new Thread(this, getServiceName());
        this.thread.setDaemon(isDaemon);
        this.thread.start();
    }
    ...
}

As you can see, TransactionalMessageCheckService inherits its start() method from ServiceThread, and this method starts a new thread to do the work. Let's go into TransactionalMessageCheckService#run to see what this thread does:

@Override
public void run() {
    log.info("Start transaction check service thread!");
    long checkInterval = brokerController.getBrokerConfig()
        .getTransactionCheckInterval();
    while (!this.isStopped()) {
        // Run the operation
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}

Follow up the ServiceThread#waitForRunning method:

protected void waitForRunning(long interval) {
    if (hasNotified.compareAndSet(true, false)) {
        // Perform the operation
        this.onWaitEnd();
        return;
    }

    //entry to wait
    waitPoint.reset();

    try {
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        hasNotified.set(false);
        this.onWaitEnd();
    }
}

// TransactionalMessageCheckService overrides onWaitEnd():
@Override
protected void onWaitEnd() {
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    // Check the operation
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, 
        this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", 
        System.currentTimeMillis() - begin);
}

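The wait-then-act loop shown above can be approximated with JDK primitives. This is a minimal sketch and not the ServiceThread implementation: PeriodicCheckSketch and its members are hypothetical names, and a Semaphore is swapped in for the waitPoint and hasNotified pair. The runRounds helper runs the loop on the calling thread purely to keep the example deterministic.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the ServiceThread pattern, JDK classes only.
final class PeriodicCheckSketch implements Runnable {
    private final Semaphore wakeupSignal = new Semaphore(0);
    private final long intervalMillis;
    private final Runnable onWaitEnd;
    private volatile boolean stopped = false;

    PeriodicCheckSketch(long intervalMillis, Runnable onWaitEnd) {
        this.intervalMillis = intervalMillis;
        this.onWaitEnd = onWaitEnd;
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                // Wait until the interval elapses or wakeup() is called.
                wakeupSignal.tryAcquire(intervalMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            wakeupSignal.drainPermits();
            if (!stopped) {
                onWaitEnd.run(); // the periodic work, e.g. one check pass
            }
        }
    }

    // Ends the current wait early.
    void wakeup() {
        wakeupSignal.release();
    }

    void shutdown() {
        stopped = true;
        wakeupSignal.release();
    }

    // Demo helper: runs the loop until the callback has fired `rounds` times.
    static int runRounds(int rounds) {
        AtomicInteger fired = new AtomicInteger();
        PeriodicCheckSketch[] holder = new PeriodicCheckSketch[1];
        holder[0] = new PeriodicCheckSketch(5, () -> {
            if (fired.incrementAndGet() >= rounds) {
                holder[0].shutdown();
            }
        });
        holder[0].run(); // executed on the calling thread for simplicity
        return fired.get();
    }
}
```

In the broker, the interval comes from getTransactionCheckInterval() and the callback is onWaitEnd(), which triggers one pass of check(...).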

Execution then reaches the TransactionalMessageServiceImpl#check method, which performs the check of transaction messages:

public void check(long transactionTimeout, int transactionCheckMax,
    AbstractTransactionalMessageCheckListener listener) {
    try {
        // The topic that holds transaction messages
        String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
        // Get the message queue to check
        Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
        if (msgQueues == null || msgQueues.size() == 0) {
            log.warn("The queue of topic is empty :" + topic);
            return;
        }
        log.debug("Check topic={}, queues={}", topic, msgQueues);
        for (MessageQueue messageQueue : msgQueues) {
            // A lot is omitted here
            ...
            // Get transaction messages from the queue
            GetResult getResult = getHalfMsg(messageQueue, i);
            
            // A lot is omitted here
            ...
            // Check the transaction status
            listener.resolveHalfMsg(msgExt);

            // Still more is omitted here
            ...
        }
    } catch (Throwable e) {
        log.error("Check error", e);
    }
}

A lot of code is left out of this method. There are just two key actions:

  1. Get messages from the topic of transaction messages
  2. Check the transaction status of each message

5.2 The broker sends the check request

To see how the transaction status is checked, we go directly to the Broker2Client#checkProducerTransactionState method:

 public void checkProducerTransactionState(
    final String group,
    final Channel channel,
    final CheckTransactionStateRequestHeader requestHeader,
    final MessageExt messageExt) throws Exception {
    RemotingCommand request = RemotingCommand.createRequestCommand(
        RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
    request.setBody(MessageDecoder.encode(messageExt, false));
    try {
        // Send a check message to producer with the code CHECK_TRANSACTION_STATE
        this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
    } catch (Exception e) {
        log.error(...);
    }
}

As the code above shows, the check request carries the code CHECK_TRANSACTION_STATE and is sent in oneway mode, which means the broker does not wait for a response to it.

What does a producer do when receiving a check message from the broker? We’ll find out in the next section.

5.3 The producer processes the check request

As the previous section showed, the check request sent by the broker has the code CHECK_TRANSACTION_STATE. The producer handles this code in ClientRemotingProcessor#processRequest:

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    switch (request.getCode()) {
        // Check transaction status
        case RequestCode.CHECK_TRANSACTION_STATE:
            return this.checkTransactionState(ctx, request);
        // Other cases are omitted
        ...
        default:
            break;
    }
    return null;
}

We follow the ClientRemotingProcessor#checkTransactionState method:

public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final CheckTransactionStateRequestHeader requestHeader = (CheckTransactionStateRequestHeader) 
        request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
    final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
    final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
    if (messageExt != null) {
        if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
            messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), 
                this.mqClientFactory.getClientConfig().getNamespace()));
        }
        String transactionId = messageExt
            .getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
        if (null != transactionId && !"".equals(transactionId)) {
            messageExt.setTransactionId(transactionId);
        }
        final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        if (group != null) {
            // Get a producer
            MQProducerInner producer = this.mqClientFactory.selectProducer(group);
            if (producer != null) {
                final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                // Check the status
                producer.checkTransactionState(addr, messageExt, requestHeader);
            } else {
                log.debug("checkTransactionState, pick producer by group[{}] failed", group);
            }
        } else {
            log.warn("checkTransactionState, pick producer group failed");
        }
    } else {
        log.warn("checkTransactionState, decode message failed");
    }

    return null;
}

Although this method is a bit long, it has only two key operations:

  1. Get a producer: this.mqClientFactory.selectProducer(group)
  2. Check the transaction status: producer.checkTransactionState(...)

Next, let's go into the DefaultMQProducerImpl#checkTransactionState method:

public void checkTransactionState(final String addr, final MessageExt msg,
        final CheckTransactionStateRequestHeader header) {

    Runnable request = new Runnable() {
        // A lot of code is omitted here
        ...
    };
    this.checkExecutor.submit(request);
}

In the DefaultMQProducerImpl#checkTransactionState method, a Runnable object is created and submitted to the checkExecutor thread pool. As mentioned at the beginning of this article when analyzing the startup of TransactionMQProducer, checkExecutor is assigned in the DefaultMQProducerImpl#initTransactionEnv method; here we see it in use.
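The hand-off to a thread pool can be sketched with the JDK alone. Everything in this example is hypothetical (CheckTaskSketch, its State enum, the Function-based listener), and mapping a null result or an exception to UNKNOW is an assumption of the sketch. The checkNow helper exists only so the example can be run end to end.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch of "submit the check to a pool", not RocketMq code.
final class CheckTaskSketch {
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Submits the listener call to the pool and returns a handle to its result.
    static Future<State> submitCheck(ExecutorService checkExecutor,
                                     Function<String, State> listener,
                                     String transactionId) {
        return checkExecutor.submit(() -> {
            try {
                State state = listener.apply(transactionId);
                return state != null ? state : State.UNKNOW;
            } catch (RuntimeException e) {
                // A failing listener leaves the transaction state unknown.
                return State.UNKNOW;
            }
        });
    }

    // Demo helper: runs one check on a throwaway single-thread pool.
    static State checkNow(Function<String, State> listener, String transactionId) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return submitCheck(pool, listener, transactionId).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return State.UNKNOW;
        } catch (ExecutionException e) {
            return State.UNKNOW;
        } finally {
            pool.shutdown();
        }
    }
}
```

In the sample demo, the listener would correspond to TransactionListenerImpl#checkLocalTransaction, which looks up the stored status by transaction id.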

What the thread pool ends up executing is the Runnable's run() method, which looks like this:

Runnable request = new Runnable() {
    private final String brokerAddr = addr;
    private final MessageExt message = msg;
    private final CheckTransactionStateRequestHeader checkRequestHeader = header;
    private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();

    @Override
    public void run(a) {
        / / get checkListener
        TransactionCheckListener transactionCheckListener 
            = DefaultMQProducerImpl.this.checkListener();
        1. Obtain the listener
        TransactionListener transactionListener = getCheckListener();
        if(transactionCheckListener ! =null|| transactionListener ! =null) {
            LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
            Throwable exception = null;
            try {
                if(transactionCheckListener ! =null) {
                    localTransactionState = transactionCheckListener
                        .checkLocalTransactionState(message);
                } else if (transactionListener != null) {
                    log.debug("Used new check API in transaction message");
                    // 2. Check transaction status
                    localTransactionState = transactionListener.checkLocalTransaction(message);
                } else {
                    log.warn(...);
                }
            } catch (Throwable e) {
                log.error(...);
                exception = e;
            }
            // Handle transaction state
            this.processTransactionState(localTransactionState, group, exception);
        } else {
            log.warn(...);
        }
    }

    private void processTransactionState(
        final LocalTransactionState localTransactionState,
        final String producerGroup,
        final Throwable exception) {
        final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
        thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
        thisHeader.setProducerGroup(producerGroup);
        thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
        // Set the check flag
        thisHeader.setFromTransactionCheck(true);

        String uniqueKey = message.getProperties()
            .get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
        if (uniqueKey == null) {
            uniqueKey = message.getMsgId();
        }
        thisHeader.setMsgId(uniqueKey);
        thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
        // 3. Handle transaction commit/rollback status
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                // Set the commit status to commit
                thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                // Set the commit status to: rollback
                thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                log.warn(...);
                break;
            case UNKNOW:
                thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                log.warn(...);
                break;
            default:
                break;
        }

        String remark = null;
        if (exception != null) {
            remark = "checkLocalTransactionState Exception: " 
                + RemotingHelper.exceptionSimpleDesc(exception);
        }

        try {
            // 4. Send a message to end the transaction
            DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl()
                .endTransactionOneway(brokerAddr, thisHeader, remark, 3000);
        } catch (Exception e) {
            log.error("endTransactionOneway exception", e);
        }
    }
};

This code is fairly long, but there are just four key points:

  1. Obtain the listener, which is the TransactionListenerImpl we set in the demo at the beginning
  2. Check the transaction status by calling the method we implemented ourselves: TransactionListenerImpl#checkLocalTransaction
  3. Handle the transaction commit/rollback state: the commit/rollback flag is set according to the result returned by TransactionListenerImpl#checkLocalTransaction
  4. Send an end-transaction message to the broker, consistent with the operation in the DefaultMQProducerImpl#sendMessageInTransaction method seen earlier
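
Points 2 and 3 can be condensed into a small standalone sketch. Everything in it is a hypothetical stand-in: LocalState plays the role of LocalTransactionState, EndFlag plays the role of the MessageSysFlag constants, and safeCheck imitates the try/catch in run(). It is meant only to show one consequence of the code above: because the state starts as UNKNOW, a listener that throws ends up reported as "not decided" rather than as a rollback.

```java
import java.util.function.Supplier;

final class CheckResultMappingSketch {
    // Hypothetical stand-in for LocalTransactionState.
    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Hypothetical stand-in for the MessageSysFlag constants, kept symbolic on purpose.
    enum EndFlag { COMMIT, ROLLBACK, NOT_DECIDED }

    // Imitates the try/catch in run(): the state starts as UNKNOW and stays so if the listener throws.
    static LocalState safeCheck(Supplier<LocalState> listener) {
        LocalState state = LocalState.UNKNOW;
        try {
            state = listener.get();
        } catch (RuntimeException e) {
            // Swallowed here for brevity; the original logs the error and passes it on as a remark.
        }
        return state;
    }

    // Imitates the switch in processTransactionState.
    static EndFlag toEndFlag(LocalState state) {
        switch (state) {
            case COMMIT_MESSAGE:
                return EndFlag.COMMIT;
            case ROLLBACK_MESSAGE:
                return EndFlag.ROLLBACK;
            default:
                return EndFlag.NOT_DECIDED;
        }
    }

    public static void main(String[] args) {
        System.out.println(toEndFlag(safeCheck(() -> LocalState.COMMIT_MESSAGE))); // prints COMMIT
        System.out.println(toEndFlag(safeCheck(() -> {
            throw new IllegalStateException("database unavailable");
        }))); // prints NOT_DECIDED
    }
}
```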

6. Summary

Having walked through the transaction message code, let’s summarize the overall flow.

Here’s a picture from the official website. The process is as follows:

  1. The producer sends a “half message”; when the broker receives it, it returns “OK” and the producer proceeds to step 2
  2. The producer executes the local transaction and obtains the execution result. If the transaction succeeds, go to step 3; if it fails, go to step 4
  3. The local transaction succeeded: send a “COMMIT” message to the broker. At this point, the “half message” sent in step 1 is actually delivered
  4. The local transaction failed: send a “ROLLBACK” message to the broker. The “half message” sent in step 1 is cancelled and will never be delivered
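
The four steps above can be sketched with a toy in-memory broker. This is only a model under stated assumptions: ToyBroker, sendHalf, endTransaction and sendInTransaction are hypothetical names, a plain boolean stands in for the local transaction result, and nothing here touches the network or the real RocketMQ classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

final class HalfMessageFlowSketch {
    // Hypothetical toy broker: half messages are parked until the producer ends the transaction.
    static final class ToyBroker {
        private final Map<String, String> halfMessages = new HashMap<>();
        private final List<String> delivered = new ArrayList<>();

        // Step 1: store the half message; it is not yet visible to consumers.
        String sendHalf(String txId, String body) {
            halfMessages.put(txId, body);
            return "OK";
        }

        // Steps 3 and 4: a commit makes the message visible, a rollback discards it.
        void endTransaction(String txId, boolean commit) {
            String body = halfMessages.remove(txId);
            if (body != null && commit) {
                delivered.add(body);
            }
        }

        List<String> delivered() { return delivered; }

        int pendingHalfMessages() { return halfMessages.size(); }
    }

    // Runs the four steps once; localTx stands in for the business transaction (step 2).
    static void sendInTransaction(ToyBroker broker, String txId, String body, BooleanSupplier localTx) {
        if (!"OK".equals(broker.sendHalf(txId, body))) {
            return;
        }
        boolean success = localTx.getAsBoolean();
        broker.endTransaction(txId, success);
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        sendInTransaction(broker, "tx-1", "order created", () -> true);
        sendInTransaction(broker, "tx-2", "order failed", () -> false);
        System.out.println(broker.delivered());           // prints [order created]
        System.out.println(broker.pendingHalfMessages()); // prints 0
    }
}
```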

Normally, these four steps cover the whole transaction message flow. In practice, however, something can go wrong: the message in step 3 or step 4 may fail to be sent, so the broker never receives a commit or rollback notification for the half message. The check-back mechanism handles this case:

  1. If the broker has not received a rollback or commit notification for a long time, it sends a one-way message to the producer, telling the producer to check back on the local transaction’s execution result
  2. When the producer receives the broker’s check-back request, it checks the status of the local transaction
  3. The producer obtains the status of the local transaction and sends a one-way message telling the broker whether the earlier “half message” should be committed or rolled back
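
One round of this check-back can be sketched in the same toy style. The names CheckBackSketch, State and checkRound are hypothetical, and the producer is reduced to a function from transaction id to state. The sketch also assumes that a half message whose answer is still unknown simply stays pending and is asked about again in a later round; that broker-side behavior is not shown in the code analyzed above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class CheckBackSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // One check-back round: ask the producer about every pending half message,
    // deliver the committed ones, drop the rolled-back ones, keep the unknown ones.
    static List<String> checkRound(Map<String, String> pendingHalf,
                                   Function<String, State> producerCheck) {
        List<String> delivered = new ArrayList<>();
        Iterator<Map.Entry<String, String>> it = pendingHalf.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            State state = producerCheck.apply(entry.getKey());
            if (state == State.COMMIT) {
                delivered.add(entry.getValue());
                it.remove();
            } else if (state == State.ROLLBACK) {
                it.remove();
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        Map<String, String> pending = new LinkedHashMap<>();
        pending.put("tx-1", "a");
        pending.put("tx-2", "b");
        pending.put("tx-3", "c");

        // The producer's record of local transaction results; tx-3 is still undecided.
        Map<String, State> local = new HashMap<>();
        local.put("tx-1", State.COMMIT);
        local.put("tx-2", State.ROLLBACK);

        List<String> delivered = checkRound(pending, id -> local.getOrDefault(id, State.UNKNOWN));
        System.out.println(delivered);        // prints [a]
        System.out.println(pending.keySet()); // prints [tx-3]
    }
}
```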

Given the limits of the author’s knowledge, mistakes in this article are inevitable, and corrections are welcome! Original work takes effort: for commercial reprints, please contact the author for authorization; for non-commercial reprints, please credit the source.

This article was first published on the WeChat official account “Java technology exploration”. If you enjoyed it, you are welcome to follow the account, and let us explore the world of technology together!