preface

RocketMQ Transaction Message Part 1: Introduction to transaction messages

Use of transaction messages in the RocketMQ Transaction Message Part ii

This article follows the previous two articles on transaction message source analysis.

Transaction message processing basic flow

When introducing transaction messages, I drew a simple flow chart to illustrate the overall processing flow of transaction messages:

Serial numbers below P.S. (1, 2, 3…) Represents the order in which (1, 2, 3…) Has nothing to do.

  1. The transaction producer invokes the transaction message sending interface to send the message
  2. At the start of the pre-commit phase, the client sends a pre-message and marks it as a transaction message in the request header. The body of the message is the content of the message we actually want to send
  3. The broker receives the message, realizes that it is a transaction message, and backs up the current message. The “backup” means that all the data of the current message is written to an internal transaction topic instead of the actual topic to be sent. The transaction topic is not visible to the consumer because the consumer is not subscribed, and then responds to the client’s sending request
  4. The client confirms that the transaction was successfully sent, then executes the local transaction and marks the transaction execution status. If the send fails, the local transaction does not need to be executed, and the transaction is simply marked as failed and needs to be rolled back.
  5. Send a request to end the transaction to the broker that sent the message based on the execution state of the transaction (request header contains commit, rollback, or unknown state).
  6. The broker receives a request for the end of a transaction and logs it if the status is unknown. If it is a commit transaction, the backup transaction message is recovered and written to the original topic, which is now visible to the consumer, and a message is written to the OP queue (another internal topic) in the body of the queue offset of the current transaction message. If the transaction is rolled back, only a message is written to the OP queue, and the transaction message is not restored, so it is not visible to the consumer. The op queue will be explained in detail in the source code section below.
  7. Talk about transaction backcheck. A transaction backcheck is when the broker scans for messages that have not been committed or rolled back, finds the client, and sends a request to the client to submit the end of the transaction status again.

Source analysis

The overall process involves more code, the next part of the source code for analysis.

Client processing, transaction execution process

The client process is as follows:

The main entry to the source code implementation is in this method:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException;

The code is as follows, and I have added relevant comments:

    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        // Get the transaction message listener we created (local transaction execution and transaction callback)
        TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null".null);
        }
 
        Transaction messages do not support delayed messages
        // ignore DelayTimeLevel parameter
        if(msg.getDelayTimeLevel() ! =0) {
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        }
 
        Validators.checkMessage(msg, this.defaultMQProducer);
 
        SendResult sendResult = null;
        // Set the half-message property (TRAN_MSG) to indicate that this is a transaction half-message
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        // Set the production group attribute (PGROUP)
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            // Message send, which is the default process of sending ordinary messages synchronously, in the client side, there is no other extra processing for transaction messages, mainly will judge if it is half a message, mark a transaction message
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }
 
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    // transactionId
                    if(sendResult.getTransactionId() ! =null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    // UNIQ_KEY, which is a unique ID generated when the client sends it, is the msgId of sendResult
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null! = transactionId && !"".equals(transactionId)) {
                        // In this case, the transactionId is actually the msgId client of Message
                        msg.setTransactionId(transactionId);
                    }
                    // In general, I do not use the localTransactionExecuter method to call the transaction message interface, so this is usually empty
                    if (null! = localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); }else if(transactionListener ! =null) {
                        log.debug("Used new transaction API");
                        // Execute our local transaction
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }
 
                    if(localTransactionState ! = LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); }}catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; }}break;
            // Failed to send message, rollback message
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
 
        try {
            // The end of the transaction, whether to commit, roll back or do something else
            this.endTransaction(msg, sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }
 
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        // Return the result of sending the transaction message, where the local transaction execution status is returned
        return transactionSendResult;
    }
Copy the code

For the above method of calling the end of transaction request, the code and comments are as follows:

    public void endTransaction(
        final Message msg,
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        final MessageId id;
        // getOffsetMsgId. This is the server msgId, which contains a lot of meta information about the message
        if(sendResult.getOffsetMsgId() ! =null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        String transactionId = sendResult.getTransactionId();
        // The half message is sent to the broker and the commit is sent to the same broker
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        // Set transaction message commit offset (commit to internal transaction topic)
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }
 
        doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        // There is a local transaction exceptionString remark = localException ! =null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        // Phase 2 execution message
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }
Copy the code

By looking at the code of the above two methods, the basic transaction message sending part of the customer is already more clear (transaction callback processing part is later).

Broker side process, receive transaction half message (pre-commit)

The basic process of receiving a transaction message is as follows:

In simple terms, the transaction message also like normal messages sent to the broker, the broker like receiving ordinary receive, after receive will determine whether a transaction, so, just to write the all information of the message to an internal things in the topic, to ensure that the temporary end on consumption is not visible, key source (in an asynchronous write for example) as follows:

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request, SendMessageContext mqtraceContext, SendMessageRequestHeader requestHeader) {
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        finalSendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); .// This is a transaction message
        // If this property is present, it is a transaction message sent
        if(transFlag ! =null && Boolean.parseBoolean(transFlag)) {
            // Broker checks whether transaction messages are enabled
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            // The transaction message is a separate process, unlike other messages
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
    }
Copy the code

When this is found to be a transaction message, the code for backing up the transaction message is as follows:

    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        // The original topic and queue information of the message will be stored in the property, since the message will be written to the queue of the transaction topic, which is the backup of the original message
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        // Remove the transaction flag
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        // Set the current stored message topic to RMQ_SYS_TRANS_HALF_TOPIC, transaction semi-message topic
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        The other topic used is RMQ_SYS_TRANS_OP_HALF_TOPIC, which also has only one queue per broker
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }
Copy the code

The transaction semi-message is then written to the internal transaction topic as if it were a normal message

    public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
        // As the name implies, transaction semi-message processing
        // Transaction half-message storage is complete, basic half-message sending (phase 1) is complete, and there is almost no additional processing for this transaction topic at commitlog time, just like normal messages
        return store.asyncPutMessage(parseHalfMessageInner(messageInner));
    }
Copy the code

The broker end processes the transaction

As mentioned earlier, the client will send an end-of-transaction request to the broker based on the transaction execution status, telling the broker whether to commit or rollback. The basic process is as follows:

Transaction processing or do a lot of action, take a look at its key source code implementation:

    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException {
        // Transaction message end (two-phase) processing
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final EndTransactionRequestHeader requestHeader =
            (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
        LOGGER.debug("Transaction request:{}", requestHeader);
        // Slave nodes are not allowed to process transaction messages
        if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
            return response;
        }
 
        // Check whether it is a transaction check
        if (requestHeader.getFromTransactionCheck()) {
            switch (requestHeader.getCommitOrRollback()) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }
 
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
 
                    break;
                }
 
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null; }}else {
            // Just to keep a log
            switch (requestHeader.getCommitOrRollback()) {
                // The execution status of the local transaction is not known
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("The producer[{}] end transaction in sending message, and it's pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }
 
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    break;
                }
 
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        }
        OperationResult result = new OperationResult();
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
            // Start committing transaction messages
            // This is the message that was submitted according to the half-message offset of the internal transaction topic
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                // result.getPrepareMessage() is the half-message submitted to the internal transaction topic
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    // All is ready to commit the transaction, here is the original message information, intact recovery
                    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                    // Clears transaction message properties
                    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    // The original message is written to the corresponding topic, which is then visible to the consumer side and can be consumed normally
                    RemotingCommand sendResult = sendFinalMessage(msgInner);
                    if (sendResult.getCode() == ResponseCode.SUCCESS) {
                        The deletion action is to write the message to the OP queue (RMQ_SYS_TRANS_OP_HALF_TOPIC) with the tag d and the message body as the message offset in the transaction topic
                        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    }
                    return sendResult;
                }
                returnres; }}else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            // Transaction rollback
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                returnres; }}/** ** rollback: * commit: write the original message to the original topic and delete the half-message. Unknown: the original message is not written, and the op queue is not */
        // The client is called one way. // The client is called one way
        response.setCode(result.getResponseCode());
        response.setRemark(result.getResponseRemark());
        return response;
    }
Copy the code

Transaction to check

The previous process is normal transaction processing, but if the client sends the transaction request due to downtime, restart, network reasons, etc., and the end of the transaction request is not correctly sent to the broker, then the transaction backcheck mechanism is needed.

When the broker is started, it starts a timed task (1 minute by default) to pull messages from the queue of the previously mentioned transaction topic and check whether the pulled messages have been processed (such as committed or rolled back). If not, depending on whether a transaction rollback is required, Ask the client to recheck the execution status of the local transaction and tell the broker or discard it.

In fact, there are a few key issues to understand:

  1. Transaction semi-messages written to transaction Topic are deleted after the transaction ends, but RocketMQ is appending, so deletion here is not actually deleting a message from the message queue.
  2. How do you know whether a transaction half message broker already commit or rollback, as said before, here introduce a op queue, namely inside another topic, if a message has been committed or rolled back, is to write a message in the op queue the message body is in a transaction the topic the offset value in the queue, if the op is not in the queue, This indicates that the status of the transaction message has not been committed and is unknown and may need to be checked back by the transaction.
  3. We know half written to the topic of affairs news also like normal messages, read in order is written order, if have write 1, 2, 3, 4, 5, 6, a total of 6 the transaction message, 1, 2, 5 state of affairs has commit or rollback, 3, 4, still unknown, however, the total consumption can’t go back again. No, if the broker finds that the message is unknown, it writes the message back to the transaction topic queue at the time of processing and asks the client to check back. Continue with the next message, wait until it reaches the transaction that was appended, then check from the OP queue to see if the transaction has been processed, if not, and did not reach the maximum number of transaction callback, then write back, and continue. If the maximum number of times is reached, it is discarded (actually written to another internal topic, i.e. the transaction message uses 3 internal topics to store data).

About the transaction back to check here mainly using text description, no longer draw a flow chart, the key source is as follows:

The broker checks every minute by default, pulls messages from the internal transaction Topic and OP queues, and compares them to see if the current transaction half-message has been processed and needs to be checked back:

    // Check transaction messages periodically (once a minute)
    @Override
    public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
        try {
            String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.debug("Check topic={}, queues={}", topic, msgQueues);
            for (MessageQueue messageQueue : msgQueues) {
                long startTime = System.currentTimeMillis();
                // One pre-message queue corresponds to one op queue.
                MessageQueue opQueue = getOpQueue(messageQueue);
                // Get the consumption offset for transaction topic and op topic
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                        halfOffset, opOffset);
                    continue;
                }
 
                List<Long> doneOpOffset = new ArrayList<>();
                HashMap<Long, Long> removeMap = new HashMap<>();
                PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                if (null == pullResult) {
                    log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                        messageQueue, halfOffset, opOffset);
                    continue;
                }
                // single thread
                int getMessageNullCount = 1;
                long newOffset = halfOffset;
                long i = halfOffset;
                while (true) {
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    // What has already been handled, there is no need to handle it again
                    if (removeMap.containsKey(i)) {
                        log.debug("Half offset {} has been committed/rolled back", i);
                        Long removedOpOffset = removeMap.remove(i);
                        // the op queue is offset
                        doneOpOffset.add(removedOpOffset);
                    } else {
                        // Gets the half message currently being processed
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        MessageExt msgExt = getResult.getMsg();
                        if (msgExt == null) {
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                break;
                            }
                            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                                log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                    messageQueue, getMessageNullCount, getResult.getPullResult());
                                break;
                            } else {
                                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                    i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                i = getResult.getPullResult().getNextBeginOffset();
                                newOffset = i;
                                continue; }}// The message is discarded more than 15 times, or the message is out of date (beyond the set file retention time)
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                            // The default implementation is to move to TRANS_CHECK_MAXTIME_TOPIC
                            listener.resolveDiscardMsg(msgExt);
                            newOffset = i + 1;
                            i++;
                            continue;
                        }
                        if (msgExt.getStoreTimestamp() >= startTime) {
                            log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                                new Date(msgExt.getStoreTimestamp()));
                            break;
                        }
 
                        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                        long checkImmunityTime = transactionTimeout;
                        // No place to write this attribute (except test)
                        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                        if (null! = checkImmunityTimeStr) { checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);if (valueOfCurrentMinusBorn < checkImmunityTime) {
                                If this check time is exceeded, write back to the half-message queue
                                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                    newOffset = i + 1;
                                    i++;
                                    continue; }}}else {
                            // The transaction may not be completed, and processing is meaningless
                            if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                                log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                    checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                break;
                            }
                        }
                        List<MessageExt> opMsg = pullResult.getMsgFoundList();
                        // checkImmunityTime The default is 6 seconds, the first time that can be checked
                        // Normally, every commit/rollback is a message that has already been processed. There is a message in the OP queue. If there is no (first callback), or if there is already one, it will have to be checked back
                        boolean isNeedCheck = (opMsg == null&& valueOfCurrentMinusBorn > checkImmunityTime) || (opMsg ! =null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                            || (valueOfCurrentMinusBorn <= -1);
 
                        if (isNeedCheck) {
                            // Write the message back to the half queue
                            if(! putBackHalfMsgQueue(msgExt, i)) {continue;
                            }
                            // Check the status of the transaction, the next time to process the above written back half message
                            listener.resolveHalfMsg(msgExt);
                        } else {
                            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                            log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                                messageQueue, pullResult);
                            continue;
                        }
                    }
                    newOffset = i + 1;
                    i++;
                }
                if(newOffset ! = halfOffset) { transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset); }If 2,3,4,6,7, the offset is at most 4.
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                if(newOpOffset ! = opOffset) { transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset); }}}catch (Throwable e) {
            log.error("Check error", e);
        }
 
        // If you are in the op queue, you are already committed or rollback. If you are in the OP queue, you need to check whether you need to rollback. If you are in the OP queue, you need to write back to the half queue
Copy the code

How to confirm that the transaction half message has been processed from the OP queue is mainly based on the message body of the op queue (which holds the offset value of the transaction half message) to determine whether the current offset transaction message has been processed:

    /**
     * Read op message, parse op message, and fill removeMap
     *
     * @param removeMap Half message to be remove, key:halfOffset, value: opOffset.
     * @param opQueue Op message queue.
     * @param pullOffsetOfOp The begin offset of op message queue.
     * @param miniOffset The current minimum offset of half message queue.
     * @param doneOpOffset Stored op messages that have been processed.
     * @return Op message result.
     */
    private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap,
        MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {
        // Use CID_RMQ_SYS_TRANS to pull messages from the OP queue
        PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
        if (null == pullResult) {
            return null;
        }
        if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL
            || pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {
            log.warn("The miss op offset={} in queue={} is illegal, pullResult={}", pullOffsetOfOp, opQueue,
                pullResult);
            transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());
            return pullResult;
        } else if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG) {
            log.warn("The miss op offset={} in queue={} is NO_NEW_MSG, pullResult={}", pullOffsetOfOp, opQueue,
                pullResult);
            return pullResult;
        }
        List<MessageExt> opMsg = pullResult.getMsgFoundList();
        if (opMsg == null) {
            log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
            return pullResult;
        }
        // Filter the pull messages to see if the op messages correspond to half messages
        for (MessageExt opMessageExt : opMsg) {
            // Record the offset value of the op message in the transaction queue
            Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
            log.debug("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(),
                opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);
            if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
                // All half-messages that need to be deleted are found
                // miniOffset is halfOffset, the minimum offset to be consumed. This is the op message that has been processed to be deleted
                if (queueOffset < miniOffset) {
                    doneOpOffset.add(opMessageExt.getQueueOffset());
                } else {
                    // the op message holds an offset of half. This value is actually greater than the offset of the current half message. This is already processed and no longer needs to be processedremoveMap.put(queueOffset, opMessageExt.getQueueOffset()); }}else {
                log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);
            }
        }
        log.debug("Remove map: {}", removeMap);
        log.debug("Done op list: {}", doneOpOffset);
        return pullResult;
    }
Copy the code

The broker finds a producer client under the production group and sends a callback request:

    public void sendCheckMessage(MessageExt msgExt) throws Exception {
        CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = newCheckTransactionStateRequestHeader(); checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset()); checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId()); checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)) ; checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId()); checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset()); msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)); msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); msgExt.setStoreSize(0);
        String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        // Find the corresponding producer instance based on the production group and send a backcheck request
        Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
        if(channel ! =null) {
            brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
        } else {
            LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId); }}Copy the code

After receiving the request, the client performs the transaction rollback logic and sends the transaction state back to the broker:

    // check the transaction
    public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final CheckTransactionStateRequestHeader requestHeader =
            (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
        final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
        final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
        if(messageExt ! =null) {
            if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
                messageExt.setTopic(NamespaceUtil
                    .withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));
            }
            String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null! = transactionId && !"".equals(transactionId)) {
                // Is the MSG Id of the client, if the user does not customize this value
                messageExt.setTransactionId(transactionId);
            }
            final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
            if(group ! =null) {
                MQProducerInner producer = this.mqClientFactory.selectProducer(group);
                if(producer ! =null) {
                    final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    producer.checkTransactionState(addr, messageExt, requestHeader);
                } else {
                    log.debug("checkTransactionState, pick producer by group[{}] failed", group); }}else {
                log.warn("checkTransactionState, pick producer group failed"); }}else {
            log.warn("checkTransactionState, decode message failed");
        }
 
        return null;
    }
Copy the code

conclusion

This concludes the RocketMQ transaction message.