Transaction message

This article does not explain what a transaction message is. It mainly uses the source code to describe how transaction messages are implemented in RocketMQ: their design, their states, and the code behind them.

How to use

The RocketMQ source code provides an example of how to use TransactionMQProducer and TransactionListener.

public static void main(String[] args) throws MQClientException, InterruptedException {
    TransactionListener transactionListener = new TransactionListenerImpl();
    TransactionMQProducer producer = new TransactionMQProducer("TestTranscationProducerName");
    // ... part of the code omitted
    producer.setTransactionListener(transactionListener);
    producer.start();

    String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
    for (int i = 0; i < 10; i++) {
        try {
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            Thread.sleep(10);
        } catch (MQClientException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    // ...
}

TransactionListener contains two methods

LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

// Called when the Broker has not received a final state for a transaction message,
// for example when UNKNOW was returned: the Broker queries the producer for the transaction state
LocalTransactionState checkLocalTransaction(final MessageExt msg);

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();



    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }


    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {

        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

The code above is an example from the RocketMQ source that illustrates one way to use transaction messages.

The key pieces are the TransactionMQProducer, which sends the messages, and the TransactionListener. A transaction message can be in one of three states: COMMIT_MESSAGE, ROLLBACK_MESSAGE, and UNKNOW.

Next we focus on how TransactionMQProducer, TransactionListener, and these states work together.

Three states

COMMIT_MESSAGE: commit the message.

ROLLBACK_MESSAGE: roll back the message.

UNKNOW: the state is unknown and has to be checked. The Broker needs to ask the Producer whether the local transaction has completed.

These are the three states a transaction message can be in. Now let's look at the specific workflow.

Source code analysis workflow

TransactionMQProducer

We start the source-level analysis of the transaction message workflow at TransactionMQProducer#sendMessageInTransaction, the method that sends the message.

Because the method is long, only the main branch is kept here; the less important checks and validation code are cut.

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    // ...
    // Mark the message as a transaction message by setting PROPERTY_TRANSACTION_PREPARED to "true"
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    // Record the current producer group
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    // ...
    case SEND_OK:
        String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
        if (null != transactionId && !"".equals(transactionId)) {
            msg.setTransactionId(transactionId);
        }
        // The deprecated localTransactionExecuter compatibility logic is omitted; it is to be removed in 5.0
        // ...
        if (transactionListener != null) {
            // Execute the local executeLocalTransaction method
            localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
        }
        if (null == localTransactionState) {
            localTransactionState = LocalTransactionState.UNKNOW;
        }
    // ...
    this.endTransaction(msg, sendResult, localTransactionState, localException);

As you can see, sendMessageInTransaction sends the message, executes executeLocalTransaction, and passes the returned localTransactionState to endTransaction.

public void endTransaction(
    final Message msg,
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    // ... resolving the id and the broker connection is omitted
    // Create the EndTransactionRequestHeader
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    // Set the transaction ID
    requestHeader.setTransactionId(transactionId);
    // Set the CommitLog offset of the transaction message
    requestHeader.setCommitLogOffset(id.getOffset());
    // Map the state returned by executeLocalTransaction onto the request
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }
    // Run the end-transaction hooks
    doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
    // Set the producer group that sent the message
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    // Set the queue offset of the message
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    // Set the message ID
    requestHeader.setMsgId(sendResult.getMsgId());
    // Carry the exception thrown by executeLocalTransaction, if any
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // ...
    // Send the transaction state to the Broker
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}

The sending process of the main flow is as follows

1. Create the TransactionMQProducer that sends transaction messages and set the corresponding TransactionListener.
2. sendMessageInTransaction marks the message as a transaction message, sends it, and executes TransactionListener#executeLocalTransaction, which returns the state of the transaction message.
3. Wrap the current transaction state in an EndTransactionRequestHeader and send it to the Broker.

This gives a rough idea of when TransactionListener#executeLocalTransaction runs, but not of how the message states fit into the overall logic. For that we need to look at how the Broker processes transaction messages.
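The producer-side decision can be condensed into a small, self-contained sketch. It is not RocketMQ code: the class ProducerFlowSketch and its methods are hypothetical, the flag values are written as I understand them from MessageSysFlag, and treating an exception from the listener as UNKNOW is an assumption of this sketch.

```java
import java.util.function.Function;

/** Hypothetical, simplified model of the producer-side flow; not the RocketMQ API. */
class ProducerFlowSketch {
    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Assumed to mirror MessageSysFlag: the transaction type occupies two bits of the sysFlag.
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    /** Runs the local transaction; a null result or an exception both end up as UNKNOW. */
    static LocalState runLocalTransaction(Function<String, LocalState> listener, String msg) {
        LocalState state = null;
        try {
            state = listener.apply(msg);
        } catch (RuntimeException e) {
            // Assumption: the state stays undecided and the Broker will check back later.
        }
        return state == null ? LocalState.UNKNOW : state;
    }

    /** Maps the local state onto the commitOrRollback value of the end-transaction request. */
    static int toCommitOrRollback(LocalState state) {
        switch (state) {
            case COMMIT_MESSAGE:
                return TRANSACTION_COMMIT_TYPE;
            case ROLLBACK_MESSAGE:
                return TRANSACTION_ROLLBACK_TYPE;
            default:
                return TRANSACTION_NOT_TYPE;
        }
    }
}
```

The point of the sketch is that the producer never leaves the state empty: anything that is not an explicit commit or rollback is reported as "not decided yet", which is what later triggers the Broker's check.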

Broker

SendMessageProcessor

On the Broker side we again start where the transaction message is received, SendMessageProcessor#asyncSendMessage. The following is the transaction-message handling extracted from asyncSendMessage.

if (transFlag != null && Boolean.parseBoolean(transFlag)) {
    // Check whether this Broker rejects transaction messages
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
        return CompletableFuture.completedFuture(response);
    }
    // Transaction messages are handled separately by TransactionalMessageService
    putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
    // All other messages go to DefaultMessageStore
    putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);

As the code above shows, transaction messages are handled separately by TransactionalMessageService. Next we look at how TransactionalMessageService processes the message.

@Override
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
    return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
}


TransactionalMessageService in turn delegates to TransactionalMessageBridge#asyncPutHalfMessage. Note the name of this method: HalfMessage, the "half message", is the term we use from here on for a transaction message that has been stored but not yet committed or rolled back.

Keep reading

public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
    return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // Save the topic of the original message
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    // Save the queue ID of the original message
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    // Reset the transaction flag to TRANSACTION_NOT_TYPE
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    // Set the topic to RMQ_SYS_TRANS_HALF_TOPIC
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    // Set the queue ID to 0
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

The message is still stored through DefaultMessageStore, but with one extra layer of wrapping. Much like the handling of delayed messages, the original topic and queue are replaced with a special topic and queue.
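The topic swap can be illustrated with a minimal sketch. The Msg class and the property keys below are hypothetical stand-ins for MessageExtBrokerInner and the MessageConst constants; only the half topic name is taken from the article.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified message model; the real class is MessageExtBrokerInner. */
class HalfMessageSketch {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    // Illustrative keys standing in for PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID.
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QUEUE_ID = "REAL_QID";

    static class Msg {
        String topic;
        int queueId;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    /** Parks the message in the half topic, queue 0, remembering where it really belongs. */
    static Msg toHalf(Msg msg) {
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QUEUE_ID, String.valueOf(msg.queueId));
        msg.topic = HALF_TOPIC;
        msg.queueId = 0;
        return msg;
    }

    /** Restores the original topic and queue, as needed once the transaction is committed. */
    static Msg restore(Msg half) {
        half.topic = half.properties.remove(REAL_TOPIC);
        half.queueId = Integer.parseInt(half.properties.remove(REAL_QUEUE_ID));
        return half;
    }
}
```

Because consumers subscribe to the original topic, a message parked under the half topic stays invisible to them until it is restored.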

At this point the first steps of the message processing flow are known:

1. The message producer produces a transaction message and sends it to the Broker.
2. The Broker receives the message and stores it in the CommitLog as a half message (replacing its topic and queue).
3. The Broker returns the send result to the producer.
4. The message producer runs the local transaction in TransactionListener#executeLocalTransaction.
5. The result is sent to the Broker in an EndTransactionRequestHeader.

The analysis so far covers everything up to step 5. Next we analyze how the Broker handles the different states carried in the EndTransactionRequestHeader it receives.

EndTransactionProcessor

The processRequest method handles the EndTransactionRequestHeader message sent by the Producer.

// ... validation and logging omitted
OperationResult result = new OperationResult();
// The state is TRANSACTION_COMMIT_TYPE
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // Look up the stored HalfMessage, that is, the message under the converted topic
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // Convert the HalfMessage back into a MessageExtBrokerInner
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            // Replace the transaction flag with the type carried in the request
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            // Set the TranStateTable offset
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            // Set the offset of the half message
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            // Keep the store timestamp consistent with the half message
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            // Clear the transaction-prepared property
            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                // Mark the half message as deleted
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
    // Use the offset to retrieve the previously stored HalfMessage (the message under the converted topic)
    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // "Deleting" the half message means adding a message that marks it as deleted
            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
        }
        return res;
    }
}
// ...

As you can see above, two status values are processed, TRANSACTION_COMMIT_TYPE and TRANSACTION_ROLLBACK_TYPE:

1. Use the offset to find the previously stored half message, that is, the message stored under the converted topic and queue.
2. If it is a COMMIT, restore the half message to the original message and store it in the CommitLog.
3. Add a message to indicate that the half message has been deleted.
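The three steps above can be sketched with an in-memory stand-in for the Broker's stores. Everything here (the class EndTransactionSketch, its fields, and the boolean commit parameter) is hypothetical and only meant to show the shape of the logic, not the actual processor.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for the Broker's stores; not RocketMQ code. */
class EndTransactionSketch {
    // Half messages keyed by their queue offset in the half topic; the value is the original topic.
    final Map<Long, String> halfQueue = new HashMap<>();
    // Messages that became visible to consumers after a commit, listed by original topic.
    final List<String> delivered = new ArrayList<>();
    // OP messages: each entry is the queue offset of a half message that has been finalized.
    final List<Long> opQueue = new ArrayList<>();

    /** Returns false when no half message exists at the given offset. */
    boolean endTransaction(long halfOffset, boolean commit) {
        String realTopic = halfQueue.get(halfOffset);
        if (realTopic == null) {
            return false;
        }
        if (commit) {
            // Commit: store the message again under its original topic.
            delivered.add(realTopic);
        }
        // Commit and rollback both append an OP message; the half message itself is not erased.
        opQueue.add(halfOffset);
        return true;
    }
}
```

Note that the half message stays in its queue in both cases. The only difference between commit and rollback in this sketch is whether the original message is written out for consumers.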

One more thing worth mentioning here is deletePrepareMessage.

It calls addRemoveTagInTransactionOp. From the body of that method you can see that what the OP message stores is the queue offset of the half message.

In other words, the body of the OP message is messageExt.getQueueOffset().

private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
  
   Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
       String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
   writeOp(message, messageQueue);
   return true;
}
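To make the body format concrete, here is a small round-trip sketch. OpBodySketch is a hypothetical helper, and UTF-8 is assumed as the value of TransactionalMessageUtil.charset.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper showing the OP message body format; UTF-8 is an assumption. */
class OpBodySketch {
    /** The body is simply the half message's queue offset written as decimal text. */
    static byte[] encode(long halfQueueOffset) {
        return String.valueOf(halfQueueOffset).getBytes(StandardCharsets.UTF_8);
    }

    /** Reads the half message's queue offset back out of an OP message body. */
    static long decode(byte[] body) {
        return Long.parseLong(new String(body, StandardCharsets.UTF_8));
    }
}
```

Storing the offset as text keeps the OP message tiny, and reading it back is a single parse when the check logic later scans the OP queue.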

The following two topics need to be distinguished. This is very important.

RMQ_SYS_TRANS_OP_HALF_TOPIC
public static String buildOpTopic() {
    return TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC;
}

RMQ_SYS_TRANS_HALF_TOPIC
public static String buildHalfTopic() {
    return TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
}

RMQ_SYS_TRANS_HALF_TOPIC is the topic under which transaction messages are stored: when a transaction message is sent, its topic is replaced with the one returned by buildHalfTopic().

RMQ_SYS_TRANS_OP_HALF_TOPIC holds the messages that EndTransactionProcessor adds to mark a half message as deleted when a COMMIT or ROLLBACK is received. It can be understood as a record of the processing status of half messages, as described below in the check logic.

This completes the handling of the COMMIT and ROLLBACK states, but we have not yet seen the UNKNOW state. How does the Broker handle it?

To find out, we look at how TransactionalMessageService checks messages in the UNKNOW state.

TransactionalMessageService

As described above, TransactionalMessageService#prepareMessage is used to store messages as half messages.

If neither COMMIT nor ROLLBACK has been received, the Broker uses the check method to check back. The "check" state often mentioned online refers to the UNKNOW state or the prepared state. The main logic lives in the check method.

First we determine when the check method runs. It is invoked by TransactionalMessageCheckService, which extends ServiceThread. The code is as follows:

@Override
public void run() {
    log.info("Start transaction check service thread!");
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}

@Override
protected void onWaitEnd() {
    // Timeout
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    // Maximum number of checks
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    // Run the check method of TransactionalMessageService
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax,
        this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

The loop waits for transactionCheckInterval between iterations, so it works like a timer. Each time the wait ends, the service's check method is called with the timeout, the maximum number of checks, and the TransactionalMessageCheckListener.
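The wait-then-run pattern can be sketched without any RocketMQ classes. PeriodicCheckSketch below is a hypothetical stand-in for the ServiceThread behaviour; using a CountDownLatch as the stop signal is a choice made for this sketch, not how ServiceThread is implemented.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the ServiceThread pattern: wait, run a task, repeat until stopped. */
class PeriodicCheckSketch implements Runnable {
    private final long intervalMillis;
    private final Runnable onWaitEnd;
    private final CountDownLatch stopSignal = new CountDownLatch(1);

    PeriodicCheckSketch(long intervalMillis, Runnable onWaitEnd) {
        this.intervalMillis = intervalMillis;
        this.onWaitEnd = onWaitEnd;
    }

    @Override
    public void run() {
        try {
            // await returns false on timeout, meaning "not stopped yet", so one check is run.
            while (!stopSignal.await(intervalMillis, TimeUnit.MILLISECONDS)) {
                onWaitEnd.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Wakes the loop immediately and lets the thread finish. */
    void stop() {
        stopSignal.countDown();
    }
}
```

A caller would wrap the object in a Thread, start it, and call stop() on shutdown. Waiting on a signal rather than sleeping lets the loop exit promptly instead of finishing a full interval first.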

So what is this.brokerController.getTransactionalMessageCheckListener()? The corresponding implementation class is DefaultTransactionalMessageCheckListener. It contains several important methods, which I list here first.

resolveHalfMsg sends a check message to the Producer:

public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                // Send a check message to the Producer
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}

resolveDiscardMsg discards a message. If a transaction message times out or exceeds the maximum number of checks, it is discarded.

@Override
public void resolveDiscardMsg(MessageExt msgExt) {
    try {
        // Convert the message to TRANS_CHECK_MAX_TIME_TOPIC and store it in the CommitLog
        MessageExtBrokerInner brokerInner = toMessageExtBrokerInner(msgExt);
        PutMessageResult putMessageResult = this.getBrokerController().getMessageStore().putMessage(brokerInner);
        if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
            // ...
        }
    } catch (Exception e) {
        log.warn("Put checked-too-many-time message to TRANS_CHECK_MAXTIME_TOPIC error. {}", e);
    }
}
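How a message comes to exceed the maximum number of checks can be sketched as a counter kept in the message properties. This is an assumption about the mechanism, made for illustration: the class DiscardSketch and the property key are hypothetical, and the real needDiscard method may differ in detail.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a per-message check counter; not RocketMQ code. */
class DiscardSketch {
    // Illustrative property key, not the real MessageConst constant.
    static final String CHECK_TIMES = "CHECK_TIMES";

    /** Returns true once the message has been checked checkMax times; otherwise counts one more check. */
    static boolean needDiscard(Map<String, String> properties, int checkMax) {
        int checked = Integer.parseInt(properties.getOrDefault(CHECK_TIMES, "0"));
        if (checked >= checkMax) {
            return true;
        }
        properties.put(CHECK_TIMES, String.valueOf(checked + 1));
        return false;
    }

    /** Convenience factory for an empty property map. */
    static Map<String, String> newProperties() {
        return new HashMap<>();
    }
}
```

Keeping the counter on the message itself means the limit still applies after the half message has been written back to the queue for another round.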

Next, we analyze the implementation details of the check method. The method is fairly long, so I will pick out the key points of the source and analyze them one by one.

  1. Get the MessageQueue of the corresponding transaction through Topic
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
  1. Iterate over the queues and fetch the consume offset of each MessageQueue for the two topics, RMQ_SYS_TRANS_OP_HALF_TOPIC and RMQ_SYS_TRANS_HALF_TOPIC
for (MessageQueue messageQueue : msgQueues) {
    long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
    long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
    // ...
}
  1. Use the OP messages to record in removeMap which half messages have already been handled. The implementation is as follows
List<Long> doneOpOffset = new ArrayList<>();
HashMap<Long, Long> removeMap = new HashMap<>();
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);

private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap, MessageQueue opQueue,
    long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {
    PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
    // ... some checks omitted
    List<MessageExt> opMsg = pullResult.getMsgFoundList();
    for (MessageExt opMessageExt : opMsg) {
        // The body of the OP message is the queue offset of the half message
        Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
        // ...
        if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
            // The tag marks the corresponding half message as deleted
            if (queueOffset < miniOffset) {
                // Already handled: record the OP message in the done list
                doneOpOffset.add(opMessageExt.getQueueOffset());
            } else {
                // Add it to the map of half messages to remove
                removeMap.put(queueOffset, opMessageExt.getQueueOffset());
            }
        }
    }
    return pullResult;
}
  1. The following code is a loop that processes each half message. It contains the logic for checking back with the Producer and for discarding messages.
while (true) {
    // ...
    if (removeMap.containsKey(i)) {
        log.debug("Half offset {} has been committed/rolled back", i);
        Long removedOpOffset = removeMap.remove(i);
        // The half message has been handled; record its OP message as done
        doneOpOffset.add(removedOpOffset);
    } else {
        // Fetch the half message at offset i
        GetResult getResult = getHalfMsg(messageQueue, i);
        MessageExt msgExt = getResult.getMsg();
        if (msgExt == null) {
            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                break;
            }
            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                    messageQueue, getMessageNullCount, getResult.getPullResult());
                break;
            } else {
                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                    i, messageQueue, getMessageNullCount, getResult.getPullResult());
                i = getResult.getPullResult().getNextBeginOffset();
                newOffset = i;
                continue;
            }
        }
        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
            // Discard the message; resolveDiscardMsg was described above
            listener.resolveDiscardMsg(msgExt);
            newOffset = i + 1;
            i++;
            continue;
        }
        if (msgExt.getStoreTimestamp() >= startTime) {
            log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                new Date(msgExt.getStoreTimestamp()));
            break;
        }
        // ...
        List<MessageExt> opMsg = pullResult.getMsgFoundList();
        boolean isNeedCheck =
            // No OP messages were pulled and the age of the message exceeds the check immunity time
            (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
            // The last pulled OP message was born more than transactionTimeout after this run started
            || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
            // The current time is earlier than the born time of the message
            || (valueOfCurrentMinusBorn <= -1);
        if (isNeedCheck) {
            // Write the half message to the CommitLog again
            if (!putBackHalfMsgQueue(msgExt, i)) {
                continue;
            }
            // Send a transaction check message to the Producer
            listener.resolveHalfMsg(msgExt);
        } else {
            // Pull more OP messages into removeMap
            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
            log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                messageQueue, pullResult);
            continue;
        }
    }
    newOffset = i + 1;
    i++;
}
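Stripped of pulling, logging, and retries, one check round boils down to two passes: sort the OP messages, then walk the half queue. The sketch below models that with plain collections. All names are hypothetical, and stopping at the first message that needs no check yet is a simplification of this sketch (the excerpt above pulls more OP messages and retries instead).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of one check round; not RocketMQ code. */
class CheckRoundSketch {
    /** OP message: its own offset in the OP queue plus the half offset carried in its body. */
    static class OpMsg {
        final long opQueueOffset;
        final long halfQueueOffset;

        OpMsg(long opQueueOffset, long halfQueueOffset) {
            this.opQueueOffset = opQueueOffset;
            this.halfQueueOffset = halfQueueOffset;
        }
    }

    /** Half message: only the born timestamp matters here. */
    static class Half {
        final long bornTimestamp;

        Half(long bornTimestamp) {
            this.bornTimestamp = bornTimestamp;
        }
    }

    /** Sorts pulled OP messages into "already behind the half offset" and "still to be skipped". */
    static void fillRemoveMap(List<OpMsg> pulled, long halfConsumeOffset,
                              Map<Long, Long> removeMap, List<Long> doneOpOffset) {
        for (OpMsg op : pulled) {
            if (op.halfQueueOffset < halfConsumeOffset) {
                doneOpOffset.add(op.opQueueOffset);
            } else {
                removeMap.put(op.halfQueueOffset, op.opQueueOffset);
            }
        }
    }

    /** Returns the half offsets that need a check-back, starting at halfConsumeOffset. */
    static List<Long> offsetsToCheck(List<Half> halfQueue, long halfConsumeOffset,
                                     Map<Long, Long> removeMap, List<Long> doneOpOffset,
                                     long now, long immunityMillis) {
        List<Long> toCheck = new ArrayList<>();
        for (long i = halfConsumeOffset; i < halfQueue.size(); i++) {
            if (removeMap.containsKey(i)) {
                // Already committed or rolled back: its OP message is now done.
                doneOpOffset.add(removeMap.remove(i));
                continue;
            }
            long age = now - halfQueue.get((int) i).bornTimestamp;
            boolean needCheck = age > immunityMillis || age <= -1;
            if (!needCheck) {
                // Simplification: stop here and leave the rest for the next round.
                break;
            }
            toCheck.add(i);
        }
        return toCheck;
    }
}
```

The two lists that come out of a round are what allow both consume offsets to move forward: handled half messages are skipped, and their OP messages no longer need to be read again.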

Saving and calculating the consume progress is not the focus of this article; we will sort out RocketMQ offset handling as a whole later. Here is a brief look at the consumerOffset configuration file.

{
   "offsetTable":{
       ....
      "RMQ_SYS_TRANS_HALF_TOPIC@CID_RMQ_SYS_TRANS":{0:132
      },
      "RMQ_SYS_TRANS_OP_HALF_TOPIC@CID_RMQ_SYS_TRANS":{0:50
      }
   }
}

Conclusion

At this point we have walked through how the producer sends a transaction message, how the Broker processes it, how the transaction states relate to one another, and the source-level mechanics behind them. Below we sort out the flow of the whole transaction.

The whole process looks something like the following