One, foreword


The previous article in this series, Intensive Reading of the RocketMQ Source (1): NameServer, left one question open:

Up to 120 seconds can pass between a broker going down and the NameServer removing its route. During that window a producer may send a message to the dead broker. How does RocketMQ handle this?

This article answers that question. It also analyzes two further questions from the perspective of the source code:

  1. What is the start-up process of Producer?
  2. How does the Producer send messages to the broker?

Note that this article does not elaborate on the concepts related to the Producer and messages. Readers who are not familiar with them can find the official Chinese documentation through RocketMQ Source Series (0): Opening Words.

Two, start the process

The core producer class in RocketMQ is DefaultMQProducer, and the source entry for starting the process is DefaultMQProducer#start(). The flowchart is as follows:

You can read the source code alongside the flowchart. In summary, the startup steps are as follows:

  1. Check whether the producer group is valid
  2. Get the MQClientInstance
  3. Register the current producer with the MQClientInstance (registering simply means storing the producer in the MQClientInstance)
  4. Start the MQClientInstance client

Let's look at some of the details.

2.1 MQClientManager

Literally, the MQ client manager.

There is only one MQClientManager instance in the entire JVM. Why?

public class MQClientManager {
    private final static InternalLogger log = ClientLogger.getLog();
    // the MQClientManager instance is exposed
    private static MQClientManager instance = new MQClientManager();
    private AtomicInteger factoryIndexGenerator = new AtomicInteger();
    // MQClientInstance cache table
    private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
        new ConcurrentHashMap<String, MQClientInstance>();

    private MQClientManager() {
    }

    public static MQClientManager getInstance() {
        return instance;
    }
    ...
}

The exposed instance is a static variable that is initialized only once, when the class is first loaded, so it is unique within the JVM.

It maintains a cache table of MQClientInstance objects, ConcurrentMap<String, MQClientInstance> factoryTable, and only one MQClientInstance is created for a given clientId. In summary, only one MQClientManager exists within a single JVM, but if multiple clients are running there will be multiple MQClientInstances, one per clientId.

We can see how clientId is generated:

    public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClientIP());

        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }

        return sb.toString();
    }

The clientId is a combination of the IP address, the instance name, and an optional unitName. This raises a problem: two clients started on the same machine share the same IP address and, by default, the same instance name, so they would end up with the same clientId.

To avoid this, the instance name is changed to the process ID during startup, as you can see here:

    if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
        this.defaultMQProducer.changeInstanceNameToPID();
    }
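To make the effect of the PID substitution concrete, here is a minimal sketch of the clientId format. The class name ClientIdSketch, the IP address and the PIDs are all hypothetical; only the IP@instanceName[@unitName] layout comes from the buildMQClientId source shown above.

```java
final class ClientIdSketch {

    /** Joins IP, instance name and an optional unit name with '@'. */
    static String buildClientId(String clientIp, String instanceName, String unitName) {
        StringBuilder sb = new StringBuilder();
        sb.append(clientIp).append('@').append(instanceName);
        if (unitName != null && !unitName.trim().isEmpty()) {
            sb.append('@').append(unitName);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Two producer processes on the same host (hypothetical PIDs 4321 and 8765).
        System.out.println(buildClientId("192.168.0.10", "4321", null));
        System.out.println(buildClientId("192.168.0.10", "8765", null));
    }
}
```

With the same instance name on both processes the two ids would be identical; using the PID as the instance name keeps them apart.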

2.2 MQClientInstance

MQClientInstance encapsulates the RocketMQ network processing API and is the network channel for consumers and producers to communicate with NameServer and Broker.

    if (consumerEmpty) {
        if (id != MixAll.MASTER_ID)
            continue;
    }

The code fragment above, located in the sendHeartbeatToAllBroker() method of MQClientInstance, shows that a client with no consumers (that is, a pure producer) sends heartbeats only to master brokers.
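The filter can be illustrated in isolation. The sketch below is not the RocketMQ implementation: HeartbeatTargetSketch and heartbeatTargets are hypothetical names, the table layout (broker name to broker id to address) is an assumption, and the master is assumed to be registered under broker id 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class HeartbeatTargetSketch {
    // Assumption: the master broker is registered under id 0.
    static final long MASTER_ID = 0L;

    /**
     * Returns the addresses that should receive a heartbeat.
     * brokerAddrTable maps brokerName -> (brokerId -> address).
     */
    static List<String> heartbeatTargets(Map<String, Map<Long, String>> brokerAddrTable,
                                         boolean consumerEmpty) {
        List<String> targets = new ArrayList<>();
        for (Map<Long, String> addrsById : brokerAddrTable.values()) {
            for (Map.Entry<Long, String> entry : addrsById.entrySet()) {
                // A client without consumers skips every non-master broker.
                if (consumerEmpty && entry.getKey() != MASTER_ID) {
                    continue;
                }
                targets.add(entry.getValue());
            }
        }
        return targets;
    }
}
```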

Create MQClientInstance source code:

    public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }

ConcurrentMap keeps concurrent access safe, and the get followed by putIfAbsent acts as a second check: even when several threads race to create the instance, every caller gets back the same instance for a given clientId.
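The get-then-putIfAbsent pattern can be tried out on its own. This is a minimal sketch with hypothetical names (ClientRegistrySketch, Client); it only mirrors the shape of getOrCreateMQClientInstance, not its details.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ClientRegistrySketch {

    /** Stand-in for MQClientInstance. */
    static final class Client {
        final String clientId;

        Client(String clientId) {
            this.clientId = clientId;
        }
    }

    private final ConcurrentMap<String, Client> table = new ConcurrentHashMap<>();

    Client getOrCreate(String clientId) {
        Client client = table.get(clientId);
        if (client == null) {
            Client created = new Client(clientId);
            // putIfAbsent returns the existing value if another thread won the race.
            Client prev = table.putIfAbsent(clientId, created);
            client = (prev != null) ? prev : created;
        }
        return client;
    }
}
```

A losing thread may construct an object that is then discarded, which is acceptable when construction is cheap. ConcurrentHashMap.computeIfAbsent is an alternative that avoids the throwaway object.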

2.3 Heartbeat Mechanism

After MQClientInstance is started, there is another line of code that is important:

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

Note that "all brokers" here does not mean every broker in the cluster, only the brokers associated with the current client.

Three, send a message

Once startup is complete, the Producer can start sending messages. DefaultMQProducer offers many send methods, which can be classified as follows:

By message type:

  • Ordinary messages: plain messages with no special handling.
  • Delayed messages: a delay level must be set when the message is sent, and the message becomes available to consumers only after the corresponding interval. On the broker side, ScheduleMessageService sets up a timer for each delay level and pulls the consume queue of that level every second. RocketMQ currently does not support arbitrary delay intervals, only fixed levels: "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".
  • Ordered messages: for a given topic, the Producer sends messages to one queue in order, and only one thread should consume that queue.
  • Transactional messages: delivery to the broker is guaranteed through a two-phase commit combined with periodic status checks. See the following figure for the specific process.
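To make the fixed delay levels above concrete, here is a minimal sketch that parses the level string into a table of level number to delay in milliseconds. The class DelayLevelSketch is hypothetical and is not the broker's parser; it assumes that levels are numbered from 1 in the order they appear in the string.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class DelayLevelSketch {
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Parses e.g. "1s 5s 1m" into {1=1000, 2=5000, 3=60000}. */
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long value = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                default: throw new IllegalArgumentException("Unknown unit in: " + part);
            }
            // Assumption: level numbers start at 1.
            table.put(i + 1, value * unitMillis);
        }
        return table;
    }
}
```

Under that numbering assumption, level 3 corresponds to a 10 second delay and level 18 to 2 hours.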

By sending method:

  • Reliable synchronous sending: the sender sends a message and sends the next one only after receiving the response from the receiver.
  • Reliable asynchronous sending: the sender sends the next message without waiting for the response to the previous one. Asynchronous sending requires the user to implement the SendCallback interface. The send call returns immediately, and the sender receives and handles the server response through the callback.
  • One-way sending: the sender only sends the request. It neither waits for a response from the server nor triggers a callback. Sending this way takes very little time, usually on the order of microseconds.
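The difference between the three modes can be modelled without a broker. The sketch below is a toy model, not the RocketMQ client API: SendModesSketch and its methods are hypothetical, deliver() stands in for the network round trip, and a plain Consumer replaces the SendCallback interface.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class SendModesSketch {
    // Single daemon thread standing in for the client's network layer.
    private final ExecutorService io = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "send-io");
        t.setDaemon(true);
        return t;
    });

    /** Stand-in for the round trip to a broker. */
    private String deliver(String msg) {
        return "SEND_OK:" + msg;
    }

    /** Synchronous: the caller blocks until the response arrives. */
    String sendSync(String msg) {
        return CompletableFuture.supplyAsync(() -> deliver(msg), io).join();
    }

    /** Asynchronous: returns at once, the response is handed to the callback. */
    void sendAsync(String msg, Consumer<String> onSuccess) {
        CompletableFuture.supplyAsync(() -> deliver(msg), io).thenAccept(onSuccess);
    }

    /** One-way: fire and forget, the response is never observed. */
    void sendOneway(String msg) {
        io.execute(() -> deliver(msg));
    }

    void shutdown() {
        io.shutdown();
    }
}
```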

This article analyzes synchronous sending. The flowchart for sending a message is as follows:

The main steps are:

  1. Validate the message: check the topic and the message body
  2. Look up the topic's routing information: note that this information is stored at MessageQueue granularity
  3. Select a MessageQueue: step 2 returns the MessageQueues on all brokers for the message's topic, and this step selects one of them to send to
  4. Perform the actual send

3.1 Selecting MessageQueue: the default scheme

MQFaultStrategy#selectOneMessageQueue

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            ...... // Business logic
            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

Depending on the sendLatencyFaultEnable parameter there are two schemes: the default scheme, and the scheme used when fault delay is enabled. Notice that the fault delay scheme also ends up calling the default scheme, so let's first look at what the default scheme does.

TopicPublishInfo#selectOneMessageQueue

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        // lastBrokerName is the broker that the previous send went to
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                // sendWhichQueue keeps a ThreadLocal-based index that is incremented on every call
                int index = this.sendWhichQueue.getAndIncrement();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }

As you can see, selecting a MessageQueue is quite simple:

  • An incrementing value, sendWhichQueue, is taken modulo the total number of MessageQueues each time to obtain the index of the next MessageQueue;

  • If the selected MessageQueue belongs to the broker used last time, another one is selected.

    This provides load balancing, so that two consecutive sends do not go to the same broker. It also minimizes the number of messages sent to a broker that is down.
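The two rules above fit in a few lines. The following is a minimal sketch with hypothetical names (QueueSelectorSketch, QueueRef). Unlike the ThreadLocal index used by TopicPublishInfo, it uses a shared AtomicInteger, and it uses Math.floorMod so that the position stays non-negative even after the counter overflows.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class QueueSelectorSketch {

    /** Stand-in for MessageQueue: a queue id on a named broker. */
    static final class QueueRef {
        final String brokerName;
        final int queueId;

        QueueRef(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<QueueRef> queues;
    private final AtomicInteger index = new AtomicInteger();

    QueueSelectorSketch(List<QueueRef> queues) {
        this.queues = queues;
    }

    /** Round-robin selection that skips queues on the broker used last time. */
    QueueRef select(String lastBrokerName) {
        for (int i = 0; i < queues.size(); i++) {
            QueueRef candidate = next();
            if (lastBrokerName == null || !candidate.brokerName.equals(lastBrokerName)) {
                return candidate;
            }
        }
        // Every queue lives on lastBrokerName: fall back to plain round-robin.
        return next();
    }

    private QueueRef next() {
        return queues.get(Math.floorMod(index.getAndIncrement(), queues.size()));
    }
}
```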

3.2 Selecting MessageQueue: the fault delay scheme

When sendLatencyFaultEnable is enabled, we execute the following logical branch:

    if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

The overall logic is as follows:

  1. Pick a MessageQueue by round-robin, in the same way as the default scheme
  2. Check whether the broker of that MessageQueue is available. If so, return the MessageQueue directly
  3. If no queue sits on an available broker, pick a relatively good broker from among the brokers being avoided. If that broker has write queues, return one of its queues
  4. If it has no write queues, remove it from the avoidance table and fall back to the default scheme to select a queue

The core of the fault delay mechanism is an in-memory table of FaultItem entries:

    ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);

    class FaultItem implements Comparable<FaultItem> {
        private final String name;             // broker name
        private volatile long currentLatency;
        private volatile long startTimestamp;  // while the current time has not passed this timestamp, the broker must be avoided
        ...
    }

Each time a queue is selected, its broker is looked up in the in-memory faultItemTable. If the broker is not in the table, it is considered available and the queue is returned directly. If it is in the table, it may be unavailable, so a further check is needed:

    public boolean isAvailable() {
        return (System.currentTimeMillis() - startTimestamp) >= 0;
    }

This table is updated every time a message is sent.
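How such a table might be updated and queried is sketched below. This is an illustration under stated assumptions, not the MQFaultStrategy source: LatencyFaultSketch is a hypothetical class, the clock is injected so the behavior can be tested, and the latency thresholds, avoidance durations and the 30 second isolation latency are values recalled from the 4.x client that should be treated as illustrative.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

final class LatencyFaultSketch {
    // Illustrative values (assumption): latency >= LATENCY_MAX[i] ms
    // means the broker is avoided for NOT_AVAILABLE[i] ms.
    private static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private static final long[] NOT_AVAILABLE = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // broker name -> timestamp from which the broker may be used again
    private final ConcurrentHashMap<String, Long> availableFrom = new ConcurrentHashMap<>();
    private final LongSupplier clock;

    LatencyFaultSketch(LongSupplier clock) {
        this.clock = clock;
    }

    static long notAvailableDuration(long latencyMillis) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (latencyMillis >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE[i];
            }
        }
        return 0L;
    }

    /** Called after every send; isolation marks a failed send. */
    void update(String brokerName, long latencyMillis, boolean isolation) {
        // Assumption: a failed send is treated like a 30 s latency.
        long duration = notAvailableDuration(isolation ? 30000L : latencyMillis);
        availableFrom.put(brokerName, clock.getAsLong() + duration);
    }

    boolean isAvailable(String brokerName) {
        Long from = availableFrom.get(brokerName);
        return from == null || clock.getAsLong() >= from;
    }
}
```

In this sketch a send that took 600 ms makes the broker unavailable for 30 seconds, while a fast send records an avoidance window of zero, which makes the broker available again immediately.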

Then the core sending method, sendKernelImpl, is invoked to assemble and send the message. Interested readers can follow the source code alongside the flowchart.

Four, several questions mentioned in the preface

Sections 2 and 3 described the startup process and message sending. Now let's look at how producers cope with broker downtime, the question left over from the previous article.

4.1 How do producers cope with broker downtime

Let's look at sendDefaultImpl, the method that sends the message. It contains a for loop:

    for (; times < timesTotal; times++) {
        ...
        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
        endTimestamp = System.currentTimeMillis();
        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
        switch (communicationMode) {
            case ASYNC:
                return null;
            case ONEWAY:
                return null;
            case SYNC:
                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                    // If the retry switch is enabled, retry the message
                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                        continue;
                    }
                }
                return sendResult;
            default:
                break;
        }
    }

timesTotal is the number of retries plus one. In other words, if the retry switch is enabled, the producer resends a message whose send did not succeed.

Combined with the MessageQueue selection described above, both the default scheme and the fault delay scheme steer away from the previously used broker when selecting again. The broker that caused the send to fail is therefore not selected when the message is retried.

In summary, RocketMQ makes message sending highly available through message retry plus broker avoidance.
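The combination of retry and avoidance can be sketched as a small loop. The names below (RetrySendSketch, sendWithRetry, brokerIsUp) are hypothetical, and the model works at broker granularity rather than queue granularity to keep it short.

```java
import java.util.List;
import java.util.function.Predicate;

final class RetrySendSketch {

    /**
     * Tries up to 1 + retryTimes brokers, never picking the broker that just
     * failed when another one exists. Returns the broker that accepted the
     * message, or null if every attempt failed.
     */
    static String sendWithRetry(List<String> brokers, Predicate<String> brokerIsUp, int retryTimes) {
        int timesTotal = 1 + retryTimes;
        String lastBroker = null;
        int index = 0;
        for (int times = 0; times < timesTotal; times++) {
            String chosen = null;
            for (int i = 0; i < brokers.size(); i++) {
                String candidate = brokers.get(Math.floorMod(index++, brokers.size()));
                if (!candidate.equals(lastBroker)) {
                    chosen = candidate;
                    break;
                }
            }
            if (chosen == null) {
                // Only the failed broker exists: try it again.
                chosen = brokers.get(Math.floorMod(index++, brokers.size()));
            }
            if (brokerIsUp.test(chosen)) {
                return chosen;
            }
            lastBroker = chosen;
        }
        return null;
    }
}
```

With brokers broker-a and broker-b, where broker-a is down, the first attempt fails on broker-a and the retry lands on broker-b.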

4.2 Why should topics not be created automatically in a production environment?

We are often told not to set autoCreateTopicEnable to true in a production environment, because an automatically created topic can end up existing on only one broker. All subsequent requests for that topic are then limited to that single broker, which becomes a single point of pressure.

But why is that? Let's analyze it.

1. When the broker starts, the locally created topic is loaded

 public BrokerController(
        final BrokerConfig brokerConfig,
        final NettyServerConfig nettyServerConfig,
        final NettyClientConfig nettyClientConfig,
        final MessageStoreConfig messageStoreConfig
    ) {
        this.brokerConfig = brokerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.messageStoreConfig = messageStoreConfig;
        this.consumerOffsetManager = new ConsumerOffsetManager(this);
        // Load the topic configuration
        this.topicConfigManager = new TopicConfigManager(this);
        this.pullMessageProcessor = new PullMessageProcessor(this);
        ...
    }

In the TopicConfigManager constructor, autoCreateTopicEnable is checked and, if it is enabled, the default topic is loaded:

    if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                String topic = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
                TopicConfig topicConfig = new TopicConfig(topic);
                TopicValidator.addSystemTopic(topic);
                topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                    .getDefaultTopicQueueNums());
                topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                    .getDefaultTopicQueueNums());
                int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
                topicConfig.setPerm(perm);
                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            }

You can see that a topic named by AUTO_CREATE_TOPIC_KEY_TOPIC is created, with both the read and write queue counts set to 8.

This information is then synchronized to the NameServer.

Note that every broker with autoCreateTopicEnable enabled loads the default topic at startup and reports it to the NameServer. The NameServer therefore holds routing information from multiple brokers for the default topic.

2. The producer queries topic information when sending a message

When the producer sends a message, it calls tryToFindTopicPublishInfo to look up the topic information:

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            // A newly created topic takes this branch
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

At this point the message's topic exists only on the producer side, so the first lookup is bound to find nothing. Execution therefore falls through to the last branch, which calls updateTopicRouteInfoFromNameServer and performs the following logic:

TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
    // Get the default topic information created by the broker
    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
        1000 * 3);
    if (topicRouteData != null) {
        // Change the number of read/write queues for the default topic information to 4
        for (QueueData data : topicRouteData.getQueueDatas()) {
            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
            data.setReadQueueNums(queueNums);
            data.setWriteQueueNums(queueNums);
        }
    }
}
...
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);

The producer then selects a MessageQueue and wraps the message for sending. Note that the topic set on the request is not the default topic but the message's own topic:

// In sendKernelImpl
requestHeader.setTopic(msg.getTopic());

After the broker receives the message, it first checks it in AbstractSendMessageProcessor#msgCheck:

// Query whether the topic of the header exists
        TopicConfig topicConfig =
            this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        if (null == topicConfig) {
            int topicSysFlag = 0;
            if (requestHeader.isUnitMode()) {
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                } else {
                    topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
                }
            }

            log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
            // If it does not exist, the topic is created based on the relevant information in the message
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
                requestHeader.getTopic(),
                requestHeader.getDefaultTopic(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
            ...
        }

At this point, the information for the new topic is available on the broker.

3. What happens next

Here is what happens next:

  • The broker reports its topic information, including the newly created topic, to the NameServer through the heartbeat mechanism
  • The NameServer receives the topic information from the broker and updates its routing information
  • When the producer next sends a message to the new topic, it finds that the topic now has a route on the NameServer, fetches that route, and sends according to it

Do you see the problem? Although routing information for the new topic now exists on the NameServer, it lists only one broker, and nothing will add the others. Every later message for the topic follows that route to the same broker.
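The sequence can be replayed with a toy model. Everything in the sketch below is hypothetical (AutoCreateTopicSketch, its methods and the "DEFAULT" topic name); it only models which brokers the NameServer knows for a topic after heartbeats.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class AutoCreateTopicSketch {
    // Hypothetical stand-in for the default topic loaded at broker startup.
    static final String DEFAULT_TOPIC = "DEFAULT";

    // NameServer view: topic -> brokers that reported it
    private final Map<String, Set<String>> route = new HashMap<>();
    // Broker-local view: broker -> topics it has configured
    private final Map<String, Set<String>> brokerTopics = new HashMap<>();

    /** A starting broker loads the default topic and reports it. */
    void startBroker(String broker) {
        brokerTopics.computeIfAbsent(broker, k -> new HashSet<>()).add(DEFAULT_TOPIC);
        heartbeat(broker);
    }

    /** A heartbeat reports every locally known topic to the NameServer. */
    void heartbeat(String broker) {
        for (String topic : brokerTopics.get(broker)) {
            route.computeIfAbsent(topic, k -> new TreeSet<>()).add(broker);
        }
    }

    /** The broker receiving a message for an unknown topic creates it locally. */
    void receive(String broker, String topic) {
        brokerTopics.get(broker).add(topic);
    }

    Set<String> brokersFor(String topic) {
        return route.getOrDefault(topic, new TreeSet<>());
    }
}
```

After startBroker on two brokers, a single receive("broker-a", "orders") followed by heartbeats from both brokers leaves the route for "orders" with broker-a only, while the default topic is still routed to both.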

Is there a way to solve this problem? Yes.

Method 1:

Set autoCreateTopicEnable to false and create topics manually with the command-line tool in production. A topic can be created in cluster mode (so that every broker in the cluster has the same number of queues) or on a single broker (so that the number of queues can differ from broker to broker).

Method 2:

Send at least nine messages in rapid succession (the default number of write queues on a single broker is 8).

The key point is that after the first message is sent, the receiving broker creates the topic locally and then synchronizes it to the NameServer through the heartbeat mechanism within at most 30 seconds. If at least nine messages are sent before that happens, they are received by multiple brokers, and the routing information on the NameServer eventually covers multiple brokers.

This method is too hard to control, however, so method 1 is still the one to use in production.

References:

  • RocketMQ 4.8.0 source
  • Github.com/apache/rock…
  • Github.com/DillonDong/…
  • Inside The RocketMQ Technology

Finally

  • If you find mistakes in the article, please point them out in the comments. Reprinting is welcome, but please credit the source;
  • Github: github.com/CleverZiv/r… (Annotated in Chinese)