Preface
The half message belongs to the first phase of a RocketMQ transaction, which consists of two parts:
- The producer sends a half message to the broker
- The broker processes the half message
These correspond to steps 1 and 2 in the figure.
It helps to read source code with questions in mind, so here are a few to start with:
- How does the producer send half messages?
- How does the broker distinguish between ordinary and transaction messages?
- How are half messages kept from being consumed by consumers?
You'll find the answers by the end of the article.
Producer sends half messages
First, the producer code is as follows. To use transaction messages, the producer must be declared as a TransactionMQProducer.
public static void main(String[] args) throws MQClientException {
    // Create the TransactionMQProducer instance and set the producer group name
    TransactionMQProducer producer = new TransactionMQProducer("transactionGroup");
    // Set the NameServer address
    producer.setNamesrvAddr("127.0.0.1:9876");
    // Add a transaction listener
    producer.setTransactionListener(new TransactionListener() {
        /** Executes the local transaction. */
        @Override
        public LocalTransactionState executeLocalTransaction(Message message, Object o) {
            // Perform the local transaction
            doXXX();
            // Return the execution result
            return LocalTransactionState.xxx;
        }

        /** Callback invoked when the broker checks back on the transaction status. */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            // Look up the local transaction status
            findxxx();
            // Return the status found by the check
            return LocalTransactionState.xxx;
        }
    });
    // Start the producer
    producer.start();
    // Send a message
    SendResult result = producer.sendMessageInTransaction(msg, null);
}
Unlike a normal message, a transaction message is sent with sendMessageInTransaction(), as in the last statement above, so all of the sending logic must sit under this method:
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
    // The local transaction logic must be defined
    if (null == this.transactionListener) {
        throw new MQClientException("TransactionListener is null", null);
    }
    // Prefix the topic with the producer's namespace
    msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
Stepping into defaultMQProducerImpl.sendMessageInTransaction(), we'll focus on the first half of the method:
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException {
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }
    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);
    SendResult sendResult;
    // Mark the message as a transaction message with the TRAN_MSG property
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    // Record the producer group so that the broker can check back on the transaction
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        // Send the half message
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }
    // ... (rest omitted)
}
You can see that the code adds two properties to the message:
- PROPERTY_TRANSACTION_PREPARED: marks the message as a transaction message. The broker decides whether a message is a transaction message by checking whether it carries this property.
- PROPERTY_PRODUCER_GROUP: the producer group. The broker needs it when it checks back on the transaction result.
Once the message has been marked as a transaction message, it is sent with send(), the same generic method that all messages go through.
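To make the marking step concrete, here is a minimal sketch in plain Java that does not depend on RocketMQ. SketchMessage and HalfMessageMarker are hypothetical names invented for illustration. The key strings "TRAN_MSG", "PGROUP" and "DELAY" are assumed to be the values behind the corresponding MessageConst constants; only TRAN_MSG is confirmed by the code comment above.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for a RocketMQ message: only a topic and a property map.
class SketchMessage {
    String topic;
    final Map<String, String> properties = new HashMap<>();

    SketchMessage(String topic) {
        this.topic = topic;
    }
}

class HalfMessageMarker {
    // Property keys; assumed to match the string values of the MessageConst constants.
    static final String TRANSACTION_PREPARED = "TRAN_MSG";
    static final String PRODUCER_GROUP = "PGROUP";
    static final String DELAY_TIME_LEVEL = "DELAY";

    // Mirrors the first half of sendMessageInTransaction():
    // drop any delay level, then tag the message with the two properties.
    static SketchMessage markAsHalf(SketchMessage msg, String producerGroup) {
        msg.properties.remove(DELAY_TIME_LEVEL);
        msg.properties.put(TRANSACTION_PREPARED, "true");
        msg.properties.put(PRODUCER_GROUP, producerGroup);
        return msg;
    }

    public static void main(String[] args) {
        SketchMessage msg = new SketchMessage("OrderTopic");
        msg.properties.put(DELAY_TIME_LEVEL, "3");
        markAsHalf(msg, "transactionGroup");
        System.out.println(msg.topic + " " + msg.properties);
    }
}
```

Note that the topic is untouched at this stage: on the producer side a half message differs from an ordinary one only in its properties.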
Broker processes half messages
In the broker module, the asyncSendMessage() method of broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java contains this piece of code:
...
// Get the transaction property
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// Check that it is not null and parses to true
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
        return CompletableFuture.completedFuture(response);
    }
    // Store the prepare message
    putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
    // Store ordinary messages
    putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
...
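The branch above can be reduced to a small, dependency-free sketch. TransFlagCheck and its route() helper are hypothetical and only name the path the broker would take; the key string "TRAN_MSG" is assumed to be the value of MessageConst.PROPERTY_TRANSACTION_PREPARED.

```java
import java.util.HashMap;
import java.util.Map;

class TransFlagCheck {
    // Assumed string value of MessageConst.PROPERTY_TRANSACTION_PREPARED.
    static final String TRANSACTION_PREPARED = "TRAN_MSG";

    // Same test as the broker snippet: the flag must be present and parse to true.
    static boolean isHalfMessage(Map<String, String> props) {
        String transFlag = props.get(TRANSACTION_PREPARED);
        return transFlag != null && Boolean.parseBoolean(transFlag);
    }

    // Hypothetical helper: returns the name of the path the broker would choose.
    static String route(Map<String, String> props, boolean rejectTransactionMessage) {
        if (isHalfMessage(props)) {
            return rejectTransactionMessage ? "NO_PERMISSION" : "asyncPrepareMessage";
        }
        return "asyncPutMessage";
    }

    public static void main(String[] args) {
        Map<String, String> half = new HashMap<>();
        half.put(TRANSACTION_PREPARED, "true");
        Map<String, String> ordinary = new HashMap<>();

        System.out.println(route(half, false));     // prints "asyncPrepareMessage"
        System.out.println(route(half, true));      // prints "NO_PERMISSION"
        System.out.println(route(ordinary, false)); // prints "asyncPutMessage"
    }
}
```

Because Boolean.parseBoolean() only returns true for the string "true" (ignoring case), a message whose flag is missing or holds any other value is stored as an ordinary message.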
Next, let's see how the prepare message is handled.
@Override
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
    // Delegates to asyncPutHalfMessage()
    return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
}
asyncPrepareMessage() delegates to the asyncPutHalfMessage() method:
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
    return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}
Store.asyncputmessage () is the generic method used to store normal messages, so the special handling of half messages is in parseHalfMessageInner()
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // Back up the message's original topic and queue ID
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId()));
    // Reset the transaction type in sysFlag to TRANSACTION_NOT_TYPE
    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    // The topic and queue ID of half messages are hard-coded
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}
There are two parts to the logic:
- Back up the original topic and queueId: the message's real topic is stored in the REAL_TOPIC property and its queueId in the REAL_QID property.
- Overwrite the old values with the transaction-specific topic and queueId: topic = RMQ_SYS_TRANS_HALF_TOPIC, queueId = 0. This means that all half messages end up in the same queue of the same topic.
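The two steps can be sketched without any RocketMQ classes. BrokerMessageSketch and HalfTopicRewrite are hypothetical names, and the sketch leaves out the sysFlag reset and the re-serialization of the properties string that the real method also performs.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for MessageExtBrokerInner: topic, queue ID and properties only.
class BrokerMessageSketch {
    String topic;
    int queueId;
    final Map<String, String> properties = new HashMap<>();

    BrokerMessageSketch(String topic, int queueId) {
        this.topic = topic;
        this.queueId = queueId;
    }
}

class HalfTopicRewrite {
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QID = "REAL_QID";
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";

    // Step 1: back up the original destination in the properties.
    // Step 2: overwrite the destination with the half topic and queue 0.
    static BrokerMessageSketch toHalfMessage(BrokerMessageSketch msg) {
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QID, String.valueOf(msg.queueId));
        msg.topic = HALF_TOPIC;
        msg.queueId = 0;
        return msg;
    }

    public static void main(String[] args) {
        BrokerMessageSketch msg = toHalfMessage(new BrokerMessageSketch("OrderTopic", 3));
        // prints "RMQ_SYS_TRANS_HALF_TOPIC/0"
        System.out.println(msg.topic + "/" + msg.queueId);
        // prints "OrderTopic/3"
        System.out.println(msg.properties.get(REAL_TOPIC) + "/" + msg.properties.get(REAL_QID));
    }
}
```

The order matters: the backup has to happen before the overwrite, otherwise the original topic and queue ID would be lost.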
Answers to the questions
Now we can answer the questions:
- How does the producer send half messages?
A: RocketMQ uses a dedicated producer class, TransactionMQProducer, for transaction messages. Its sendMessageInTransaction() method marks the message as a transaction message before sending it through the generic send() path.
- How does the broker distinguish between ordinary and transaction messages?
A: The producer adds the PROPERTY_TRANSACTION_PREPARED property to the message before sending it, and the broker treats any message carrying that property with the value true as a transaction message.
- How are half messages kept from being consumed by consumers?
A: The broker rewrites each half message so that it is stored in queue 0 of the dedicated topic RMQ_SYS_TRANS_HALF_TOPIC rather than in its original topic. Consumers subscribe to the original topic, so they never see these messages.
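The isolation described in the last answer can be illustrated with a toy in-memory store keyed by topic. ToyTopicStore is a hypothetical class written for this article; it is not how RocketMQ's storage layer is implemented and only shows why a consumer subscribed to the original topic never receives a half message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory store keyed by topic; a deliberately simplified model.
class ToyTopicStore {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";

    private final Map<String, List<String>> messagesByTopic = new HashMap<>();

    // Ordinary messages are stored under the topic the producer chose.
    void putOrdinary(String topic, String body) {
        messagesByTopic.computeIfAbsent(topic, k -> new ArrayList<>()).add(body);
    }

    // Half messages are stored under the half topic, whatever topic the producer chose.
    // The real topic is kept next to the body, standing in for the REAL_TOPIC property.
    void putHalf(String realTopic, String body) {
        messagesByTopic.computeIfAbsent(HALF_TOPIC, k -> new ArrayList<>())
                .add(realTopic + ":" + body);
    }

    // A consumer only reads the topic it subscribed to.
    List<String> pull(String subscribedTopic) {
        return messagesByTopic.getOrDefault(subscribedTopic, new ArrayList<>());
    }

    public static void main(String[] args) {
        ToyTopicStore store = new ToyTopicStore();
        store.putOrdinary("OrderTopic", "ordinary-1");
        store.putHalf("OrderTopic", "half-1");
        System.out.println(store.pull("OrderTopic")); // prints "[ordinary-1]"
        System.out.println(store.pull(HALF_TOPIC));   // prints "[OrderTopic:half-1]"
    }
}
```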