1. Overview

In business systems we often ask two questions: how do we send messages reliably, and how do we evaluate the sending capability of a messaging system? With these two questions in mind, we read the Producer module of RocketMQ. The content is divided into two parts: 1. how the producer starts and shuts down; 2. how the producer sends the various kinds of messages: normal messages, transaction messages and delayed messages. Normal messages are used to decouple services. Transaction messages add reliable delivery on top of normal messages. Delayed messages handle work that must happen later, such as automatically closing an unpaid order. RocketMQ serves as the example here for exploring the design of an MQ; given the opportunity, the same approach can be applied to other MQ components.

2. Producer startup and shutdown

We study the RocketMQ producer by reading how the MQProducer interface (org.apache.rocketmq.client.producer.MQProducer) is implemented. We mainly look at the start(), shutdown() and send() methods. The send() methods of MQProducer vary along three dimensions: the send mode (synchronous, asynchronous, one-way), sending a transaction message with sendMessageInTransaction(), and whether messages are batched. We also look at the message types (org.apache.rocketmq.common.message.MessageType), namely normal, transaction (half and commit) and delay, to understand RocketMQ's capabilities further.

Objectives:

  1. MQ itself is used for decoupling and peak shaving between systems.
  2. Transaction messages solve the problem of reliable message sending.
  3. Delayed messages are used to improve performance and to support more complex business scenarios, such as scheduled payment cancellation and closing orders on timeout.
  4. Ordered messages (TBD)

2.1 Producer startup process

To be filled in later. Startup creates several core instances:

  • MQClientManager
  • MQClientInstance
  • MQClientAPIImpl
  • Custom ServiceThread
  • Handling of the RequestFutureTable
  • The MQ tracer

2.2 Producer shutdown process

The shutdown process is relatively simple, so the code is shown directly. In theory, shutting down an application involves a few steps:

  • Notify the other parties that this instance is shutting down
  • Shut down each business thread pool or thread (so that the JVM can exit normally)
  • Set core objects to null (to help GC)

As the code below shows, this is essentially what it does:

public void shutdown(final boolean shutdownFactory) {
    switch (this.serviceState) {
        case CREATE_JUST:
            break;
        case RUNNING:
            this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());
            this.defaultAsyncSenderExecutor.shutdown();
            if (shutdownFactory) {
                this.mQClientFactory.shutdown();
            }
            this.timer.cancel();
            log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
            this.serviceState = ServiceState.SHUTDOWN_ALREADY;
            break;
        case SHUTDOWN_ALREADY:
            break;
        default:
            break;
    }
}
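
The lifecycle in this method can be illustrated with a minimal, self-contained sketch. The class LifecycleSketch and its members are hypothetical stand-ins, not RocketMQ classes; only the three state names mirror the ServiceState values in the excerpt above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a producer's lifecycle; not a RocketMQ class.
class LifecycleSketch {
    enum State { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY }

    private State state = State.CREATE_JUST;
    private ExecutorService asyncSender; // plays the role of the async sender pool

    synchronized void start() {
        if (state != State.CREATE_JUST) {
            throw new IllegalStateException("cannot start from state " + state);
        }
        asyncSender = Executors.newSingleThreadExecutor();
        state = State.RUNNING;
    }

    synchronized void shutdown() {
        switch (state) {
            case RUNNING:
                asyncSender.shutdown(); // stop accepting new tasks; queued tasks still finish
                state = State.SHUTDOWN_ALREADY;
                break;
            case CREATE_JUST:      // never started: nothing to release
            case SHUTDOWN_ALREADY: // a second call is a no-op
            default:
                break;
        }
    }

    synchronized State state() {
        return state;
    }

    synchronized boolean senderStopped() {
        return asyncSender != null && asyncSender.isShutdown();
    }
}
```

Switching on the current state makes shutdown() safe to call at any point: before start() it does nothing, and calling it twice has no further effect.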

3. Sending messages from the producer

We start reading RocketMQ's send path from the method org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl. Let's see what it does during a send. Before that, it helps to understand the structure of Message and TopicPublishInfo, because all message sending logic starts from these two classes. The core questions to answer are:

  1. What is the current state of the topic's queues?
  2. Which queue is selected, which broker can be selected, and by what policy?
  3. When a send fails, what should the policy be?

3.1 Common Message Sending Logic

3.1.1 Business process

3.1.1.1 Selecting a queue

selectOneMessageQueue is one of TopicPublishInfo's core methods; its default variant is shown below. As the code shows, the default strategy for selecting a queue is round-robin. More sophisticated strategies are implemented by the org.apache.rocketmq.client.latency.MQFaultStrategy class.

public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.incrementAndGet();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}
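
The guard on a negative position may look redundant after Math.abs. A minimal sketch shows the one case where it matters: Math.abs(Integer.MIN_VALUE) is still negative, which happens right after the counter overflows. RoundRobinSketch and the queue names are hypothetical; the selection logic copies the method above.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of round-robin queue selection; queue names are made up.
class RoundRobinSketch {
    private final List<String> queues;
    private final AtomicInteger counter;

    RoundRobinSketch(List<String> queues, int start) {
        this.queues = queues;
        this.counter = new AtomicInteger(start);
    }

    String selectOne() {
        int index = counter.incrementAndGet();
        int pos = Math.abs(index) % queues.size();
        // Math.abs(Integer.MIN_VALUE) is still negative, so pos can be negative
        // right after the counter overflows; the guard maps that case to queue 0.
        if (pos < 0) {
            pos = 0;
        }
        return queues.get(pos);
    }
}
```

Starting the counter at Integer.MAX_VALUE reproduces the overflow on the very first call, which is a convenient way to exercise the guard.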

Two queue selection scenarios deserve attention:

  • Reselecting a queue after a failed send
  • Latency fault tolerance (sendLatencyFaultEnable)
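
The first scenario can be sketched as follows, assuming the retry is told which broker just failed and prefers a queue on a different broker. QueueRef and FailoverSelectorSketch are hypothetical names, and Math.floorMod is used here in place of the abs-and-guard idiom; the latency-based strategy behind sendLatencyFaultEnable is not modeled.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: a queue is identified by its broker name and queue id.
class QueueRef {
    final String brokerName;
    final int queueId;

    QueueRef(String brokerName, int queueId) {
        this.brokerName = brokerName;
        this.queueId = queueId;
    }
}

class FailoverSelectorSketch {
    private final List<QueueRef> queues;
    private final AtomicInteger counter = new AtomicInteger(-1);

    FailoverSelectorSketch(List<QueueRef> queues) {
        this.queues = queues;
    }

    // lastFailedBroker is null on the first attempt.
    QueueRef select(String lastFailedBroker) {
        for (int i = 0; i < queues.size(); i++) {
            QueueRef candidate = next();
            if (lastFailedBroker == null || !candidate.brokerName.equals(lastFailedBroker)) {
                return candidate;
            }
        }
        // Every queue lives on the failed broker: fall back to plain round-robin.
        return next();
    }

    private QueueRef next() {
        int pos = Math.floorMod(counter.incrementAndGet(), queues.size());
        return queues.get(pos);
    }
}
```

The fallback matters for single-broker topics: skipping the failed broker would otherwise leave nothing to select.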

3.1.1.2 Send kernel implementation

The send kernel method mainly does three things:

  • Looks up the Broker address
  • Builds the SendMessageRequestHeader
  • Calls mQClientFactory.getMQClientAPIImpl().sendMessage

I single out these three steps because once you understand them, you can probably read most of the RocketMQ source code.

Broker Address

For a single send, RocketMQ communicates point to point with one Broker. Are there optimizations here, such as connection sharing or Broker address synchronization? These are worth studying in more depth. I won't go into detail here; the code is the best reference.

Building the SendMessageRequestHeader

This is the key point of this section, and perhaps the least technical one. The class lives under the package org.apache.rocketmq.common.protocol, which defines the entire RocketMQ communication protocol. Once you understand this package, the other interaction commands pose little difficulty.

mQClientFactory.getMQClientAPIImpl().sendMessage

This is an API layer that RocketMQ builds on top of the Remoting module. Remoting defines RemotingCommand and RemotingService and their implementations. This layer converts the definitions in the protocol package into RemotingCommand objects, so the protocol concerns are handled here.

The following code confirms the logic above: in MQClientAPIImpl, the request header is finally converted into a RemotingCommand.

if (sendSmartMsg || msg instanceof MessageBatch) {
    SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
    request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
    request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}

The key thing to note here is RequestCode.SEND_MESSAGE. It lets us trace how the Broker processes messages sent by producers.
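
To make the header-to-command conversion concrete, here is a minimal sketch, assuming the header's fields are flattened into a string map that travels alongside a numeric request code. HeaderSketch, CommandSketch and the code value used in the usage note are hypothetical; the real RemotingCommand layout and RequestCode values should be read from the source.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified stand-ins for a request header and a remoting command.
class HeaderSketch {
    String producerGroup;
    String topic;
    Integer queueId;
}

class CommandSketch {
    final int code;                      // tells the receiver which handler to run
    final Map<String, String> extFields; // header fields flattened to strings

    CommandSketch(int code, Map<String, String> extFields) {
        this.code = code;
        this.extFields = extFields;
    }

    // Copies every non-null instance field of the header into a string map via reflection.
    static CommandSketch fromHeader(int code, Object header) {
        Map<String, String> fields = new TreeMap<>();
        for (Field f : header.getClass().getDeclaredFields()) {
            if (f.isSynthetic() || Modifier.isStatic(f.getModifiers())) {
                continue; // skip compiler-generated and static fields
            }
            try {
                f.setAccessible(true);
                Object value = f.get(header);
                if (value != null) {
                    fields.put(f.getName(), value.toString());
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("cannot read field " + f.getName(), e);
            }
        }
        return new CommandSketch(code, fields);
    }
}
```

For example, CommandSketch.fromHeader(1, header) with only producerGroup and topic set yields a command whose extFields map holds exactly those two entries. The receiver only needs the code to pick a handler and the map to rebuild the header.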


3.1.1.3 SendResult processing

RocketMQ has three send modes: Async, OneWay and Sync. Only Sync handles the SendResult specially:

  • It returns the result
  • If the result indicates a failure, it can be configured to retry

switch (communicationMode) {
    case ASYNC:
        return null;
    case ONEWAY:
        return null;
    case SYNC:
        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
            if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                continue;
            }
        }
        return sendResult;
    default:
        break;
}
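
The continue in the SYNC branch only makes sense inside a retry loop. A minimal sketch of such a loop follows, assuming a bounded number of attempts and one broker per attempt. RetrySketch, its Broker interface and its Status enum are hypothetical and model only the retry decision.

```java
import java.util.List;

// Hypothetical model of a synchronous send with retries; not RocketMQ code.
class RetrySketch {
    enum Status { SEND_OK, FLUSH_DISK_TIMEOUT }

    interface Broker {
        Status store(String body);
    }

    // Tries the brokers in order, at most 1 + retryTimes attempts in total.
    // Returns the last status seen, or null if the broker list is empty.
    static Status sendSync(List<Broker> brokers, String body,
                           int retryTimes, boolean retryAnotherBrokerWhenNotStoreOK) {
        int attempts = Math.min(1 + retryTimes, brokers.size());
        Status last = null;
        for (int i = 0; i < attempts; i++) {
            last = brokers.get(i).store(body);
            if (last != Status.SEND_OK && retryAnotherBrokerWhenNotStoreOK) {
                continue; // not stored OK and retry enabled: move on to the next broker
            }
            return last;
        }
        return last;
    }
}
```

With the flag off, the caller receives the first non-OK status immediately and decides for itself what to do; with it on, the producer spends its remaining attempts on other brokers first.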

The SendResult itself is built by MQClientAPIImpl. For details, read org.apache.rocketmq.client.impl.MQClientAPIImpl#processSendResponse:

SendMessageResponseHeader responseHeader =
    (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);

This completes the normal message sending process. For more details, read the code.

3.2 Transaction Messages

We have walked through how normal messages are sent. Now let's look at how RocketMQ implements transactional messages. The MQProducer interface declares the method below. Looking at its implementations reveals something odd: DefaultMQProducer does not implement this functionality; only TransactionMQProducer does. I have always felt that "transactional message" is not a good name, because it is easily confused with the ACID properties of a database; “reliable message” seems a more reasonable term. Its core purpose is to keep message delivery consistent with business execution. (This scenario requires careful analysis and is set aside for now.)

/**
     * This method is used to send transactional messages.
     *
     * @param msg Transactional message to send.
     * @param arg Argument used along with local transaction executor.
     * @return Transaction result.
     * @throws MQClientException
     */
    @Override
    public TransactionSendResult sendMessageInTransaction(Message msg,
        Object arg) throws MQClientException {
        throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
    }

TransactionMQProducer

Before delving into sendMessageInTransaction, take a look at the structure of TransactionMQProducer:

public class TransactionMQProducer extends DefaultMQProducer {
    private int checkThreadPoolMinSize = 1;
    private int checkThreadPoolMaxSize = 1;
    private int checkRequestHoldMax = 2000;
    private ExecutorService executorService;
    private TransactionListener transactionListener;
}

Leaving aside the deprecated fields, it adds two key members: a thread executor and a TransactionListener. The TransactionListener is the callback used to execute the local transaction and to check its state later.

sendMessageInTransaction

The core logic of transactional sending differs from normal sending in the following ways:

  • Before sending, a transaction-specific property (PROPERTY_TRANSACTION_PREPARED) is set on the message
  • After sending, the result is wrapped in a transaction SendResult
  • The TransactionListener is called
  • The endTransaction method is called
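
The four steps above can be sketched with an in-memory model. TransactionFlowSketch, its LocalState enum and its two lists are hypothetical; they only illustrate the order of the calls, assuming a prepared message stays invisible to consumers until the outcome is reported.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the half-message flow; not RocketMQ code.
class TransactionFlowSketch {
    enum LocalState { COMMIT, ROLLBACK, UNKNOWN }

    interface LocalTransaction {
        LocalState execute(String body);
    }

    final List<String> halfMessages = new ArrayList<>(); // stored but not yet deliverable
    final List<String> visible = new ArrayList<>();      // deliverable to consumers

    LocalState sendInTransaction(String body, LocalTransaction local) {
        halfMessages.add(body);              // step 1: send the prepared (half) message
        LocalState state;
        try {
            state = local.execute(body);     // step 2: run the local transaction callback
        } catch (RuntimeException e) {
            state = LocalState.UNKNOWN;      // outcome unclear: leave it for a later check
        }
        endTransaction(body, state);         // step 3: report the outcome
        return state;
    }

    private void endTransaction(String body, LocalState state) {
        if (state == LocalState.COMMIT) {
            halfMessages.remove(body);
            visible.add(body);
        } else if (state == LocalState.ROLLBACK) {
            halfMessages.remove(body);
        }
        // UNKNOWN: the half message stays until its state is checked again.
    }
}
```

The UNKNOWN branch is why a check-back callback is needed at all: if the producer cannot report an outcome, something must ask again later.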

This raises two questions for me: 1. How does the RemotingCommand differ from the one used for normal messages? 2. How does the Broker handle transaction messages?

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
request.setRemark(remark);
this.remotingClient.invokeOneway(addr, request, timeoutMillis);

This code uses RequestCode.END_TRANSACTION, a request code worth remembering when tracing how the Broker handles it.

Classic question: how does two-phase commit solve the consistency problem?

3.3 Delayed Messages

There is no dedicated method for sending delayed messages; in RocketMQ the delay mechanism is implemented mainly in the Broker. On the sending side, the only option is the Message method setDelayTimeLevel(). A total of 18 configurable levels are available. For details, see the referenced article [1].

4. Summary

This article explains how MQProducer is implemented by following the MQProducer interface and analyzing its core message sending capabilities together with the Message object. Transactional messages achieve consistency through the classic two-phase commit approach, while a delayed message differs from a normal message only in a single property. The bottom layer relies on the Remoting module, on top of which MQClientAPIImpl is built. Reasonable layering and encapsulation reduce business complexity and are worth learning from. Asynchronous message handling is not covered here.
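
A level is just an index into a table of durations. The sketch below parses such a table, assuming the commonly documented default of 18 entries from 1s to 2h; that string is an assumption not stated in this article, and DelayLevelSketch is a hypothetical helper, not part of the client API.

```java
import java.util.concurrent.TimeUnit;

// Sketch: map a 1-based delay level to milliseconds.
class DelayLevelSketch {
    // Assumed default level table; verify against the Broker configuration in use.
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    static long[] parse(String levels) {
        String[] parts = levels.trim().split("\\s+");
        long[] millis = new long[parts.length];
        for (int i = 0; i < parts.length; i++) {
            String p = parts[i];
            long amount = Long.parseLong(p.substring(0, p.length() - 1));
            char unit = p.charAt(p.length() - 1);
            switch (unit) {
                case 's': millis[i] = TimeUnit.SECONDS.toMillis(amount); break;
                case 'm': millis[i] = TimeUnit.MINUTES.toMillis(amount); break;
                case 'h': millis[i] = TimeUnit.HOURS.toMillis(amount); break;
                case 'd': millis[i] = TimeUnit.DAYS.toMillis(amount); break;
                default: throw new IllegalArgumentException("unknown unit in " + p);
            }
        }
        return millis;
    }

    // Levels are 1-based in this sketch: level 1 is the first entry of the table.
    static long delayMillis(int level) {
        long[] table = parse(DEFAULT_LEVELS);
        if (level < 1 || level > table.length) {
            throw new IllegalArgumentException("level out of range: " + level);
        }
        return table[level - 1];
    }
}
```

Under this assumed table, a message sent with setDelayTimeLevel(3) would be delayed by about ten seconds. Arbitrary delays are not expressible; the sender can only pick one of the predefined levels.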

Reference:

  1. RocketMQ delay queue