This post continues the source-level look at RocketMQ’s basic concepts and verifies two conclusions that came up while analyzing the Producer’s underlying source code:

  • When the Broker starts, it registers itself with all NameServers
  • After it starts, the Broker sends a heartbeat to the NameServer every 30 seconds

In previous articles, we covered some of RocketMQ’s core concepts, such as Broker, NameServer, Topic, and Tag, and analyzed at the source level the Producer’s whole process from startup to sending messages: how the Producer obtains Broker data when sending messages, and how it selects one of multiple MessageQueues to send a message to.

However, for reasons of space, the previous post did not verify the two conclusions listed at the beginning of this article. This time, we verify them at the source level.

The Broker master-slave architecture appears right at the start of the source code

As mentioned in the previous post, Brokers use a master-slave architecture for high availability. Even if the Master Broker fails unexpectedly, the Slave Broker holds a complete copy of the data, so the Broker can keep serving.

The DLedger referred to by isEnableDLegerCommitLog can be ignored here, since it returns false by default. So when the Broker first starts, it executes the logic inside the if block.
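To make the control flow concrete, here is a minimal Java sketch of that branch, assuming a hypothetical BrokerStartupSketch class that records each startup step as a string. It is a simplified model, not RocketMQ’s actual BrokerController; only the isEnableDLegerCommitLog name comes from the source discussed here, and placing the heartbeat step after the if block is an assumption based on the later sections of this post.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the Broker startup branch; not RocketMQ's BrokerController.
class BrokerStartupSketch {
    enum Role { MASTER, SLAVE }

    private final boolean enableDLegerCommitLog; // false by default, as described above
    private final Role role;
    final List<String> steps = new ArrayList<>(); // records what start() did, in order

    BrokerStartupSketch(boolean enableDLegerCommitLog, Role role) {
        this.enableDLegerCommitLog = enableDLegerCommitLog;
        this.role = role;
    }

    boolean isEnableDLegerCommitLog() {
        return enableDLegerCommitLog;
    }

    void start() {
        if (!isEnableDLegerCommitLog()) {
            // Without DLedger, the role is fixed by configuration.
            if (role == Role.SLAVE) {
                steps.add("schedule periodic sync from Master");
            }
            // First registration with every NameServer is forced.
            steps.add("registerBrokerAll(forceRegister=true)");
        }
        // Assumed: the periodic heartbeat is scheduled after the if block in both modes.
        steps.add("schedule heartbeat registration");
    }
}
```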

RocketMQ itself has a Master/Slave architecture, but on its own it lacks automatic failover: if the Master Broker fails, the Slave Broker has to be switched to Master manually.

This is similar to manually making one Redis instance the Slave of another: if the Redis Master goes down, the Slave has to be promoted by hand. To solve this, Redis introduced Sentinel, which fails over automatically when a failure occurs. RocketMQ’s DLedger, available since version 4.5, plays much the same role, and in addition supports multiple replicas.

How master and slave synchronize data when DLedger is not used

In short, without DLedger, RocketMQ’s master-slave synchronization works by active pull: the Slave pulls data from the Master.

If the Broker’s role is Slave, a ScheduledExecutorService starts a periodic task that synchronizes with the Master every 10 seconds. The synchronized data includes the Topic configuration, the Consumer consumption offsets, the offsets for delayed messages, and the subscription group configuration.
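A minimal sketch of this pull-based synchronization, assuming a hypothetical SlaveSyncSketch class. The four private sync methods are placeholders named after the data listed above and do nothing here, and the period is a parameter (10 seconds in the behavior described above) so the task can be exercised quickly.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a Slave periodically pulling metadata from its Master.
class SlaveSyncSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    final AtomicInteger rounds = new AtomicInteger(); // completed sync rounds

    void syncAll() {
        syncTopicConfig();
        syncConsumerOffset();
        syncDelayOffset();
        syncSubscriptionGroupConfig();
        rounds.incrementAndGet();
    }

    // Placeholders: a real Slave would fetch each piece of data from the Master here.
    private void syncTopicConfig() { }
    private void syncConsumerOffset() { }
    private void syncDelayOffset() { }
    private void syncSubscriptionGroupConfig() { }

    // The behavior described above uses a 10-second period; it is a parameter here.
    void start(long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                syncAll();
            } catch (RuntimeException e) {
                // Swallow so the schedule survives: an escaping exception cancels later runs.
            }
        }, period, period, unit);
    }

    // Polls until at least n rounds have completed or the timeout expires.
    boolean awaitRounds(int n, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (rounds.get() < n && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return rounds.get() >= n;
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Calling start(10, TimeUnit.SECONDS) would match the 10-second interval described above. Catching exceptions inside the task matters: if a periodic task throws, ScheduledExecutorService suppresses all of its later runs.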

The functions and principles of ScheduledExecutorService are described below.

Broker registration is forced at first startup

Since this is the first startup, the forceRegister parameter is set to true directly.

Use ScheduledExecutorService to start the scheduled task

Next, the Broker starts a scheduled task that periodically re-registers it. The ScheduledExecutorService used here is created by Executors.newSingleThreadScheduledExecutor, a thread pool with only one thread (its key parameter, corePoolSize, is 1), and it runs the task periodically at the specified frequency.
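The one-thread claim can be checked directly. Executors.newSingleThreadScheduledExecutor wraps a ScheduledThreadPoolExecutor whose core pool size is 1, but the wrapper does not expose getCorePoolSize, so this sketch (a hypothetical SingleThreadSchedulerDemo class) builds the equivalent pool itself and records which thread runs each scheduled task.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Shows that a single-thread scheduled pool runs every scheduled task on one worker thread.
class SingleThreadSchedulerDemo {
    // Schedules `tasks` one-shot tasks and returns the names of the threads that ran them.
    static Set<String> runOnSingleThreadPool(int tasks) {
        // The same kind of pool that backs Executors.newSingleThreadScheduledExecutor().
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.schedule(() -> {
                threadNames.add(Thread.currentThread().getName());
                done.countDown();
            }, i, TimeUnit.MILLISECONDS);
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return threadNames;
    }

    static int corePoolSizeOfSingleThreadPool() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            return pool.getCorePoolSize();
        } finally {
            pool.shutdown();
        }
    }
}
```

Every task reports the same worker thread name, which means tasks on such a pool never run concurrently with each other: a slow run simply delays the next one.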

ScheduledExecutorService has two main methods for periodic tasks:

  • scheduleAtFixedRate: runs a task at a fixed frequency
  • scheduleWithFixedDelay: after each run finishes, waits a set interval before starting the next one
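The difference between the two modes shows up when the task itself takes time. This sketch (a hypothetical ScheduleModesDemo class) runs a task that sleeps for workMillis and records the gap between consecutive start times under each mode.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Compares scheduleAtFixedRate and scheduleWithFixedDelay on a task that takes workMillis.
class ScheduleModesDemo {
    // Returns the gaps, in milliseconds, between consecutive start times of `runs` executions.
    static List<Long> startGaps(boolean fixedRate, long periodMillis, long workMillis, int runs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Long> starts = new ArrayList<>(); // written only by the single worker thread
        CountDownLatch done = new CountDownLatch(runs);
        Runnable task = () -> {
            if (done.getCount() == 0) {
                return; // enough samples collected; ignore further runs
            }
            starts.add(System.nanoTime());
            sleepQuietly(workMillis); // simulated work
            done.countDown();
        };
        if (fixedRate) {
            // Starts are spaced by the period, measured from the first start.
            scheduler.scheduleAtFixedRate(task, 0, periodMillis, TimeUnit.MILLISECONDS);
        } else {
            // Each start waits the delay after the previous run has finished.
            scheduler.scheduleWithFixedDelay(task, 0, periodMillis, TimeUnit.MILLISECONDS);
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        try {
            scheduler.awaitTermination(1, TimeUnit.SECONDS); // worker stops before we read `starts`
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < starts.size(); i++) {
            gaps.add(TimeUnit.NANOSECONDS.toMillis(starts.get(i) - starts.get(i - 1)));
        }
        return gaps;
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With periodMillis = 50 and workMillis = 20, the fixed-rate gaps should come out close to 50 ms, while the fixed-delay gaps are at least 70 ms (20 ms of work plus the 50 ms delay); exact numbers depend on the machine. For a heartbeat, fixed rate is arguably the natural fit, since the interval then does not drift with how long each registration takes.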

scheduleAtFixedRate is used to implement the heartbeat mechanism

Here we use scheduleAtFixedRate, as shown below.

The execution period is configurable, but only within a range of at most one minute: between 10 and 60 seconds. The default is once every 30 seconds, which verifies that the Broker sends a heartbeat to the NameServer every 30 seconds.
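The range check boils down to one clamp. A minimal sketch, assuming the configured period arrives in milliseconds; the HeartbeatPeriod class and its constant names are hypothetical (in RocketMQ 4.x the configured value comes from a Broker setting named registerNameServerPeriod, but check the version you are reading).

```java
// Clamps a configured heartbeat period into the 10 s to 60 s range described above.
class HeartbeatPeriod {
    static final long MIN_MILLIS = 10_000;
    static final long MAX_MILLIS = 60_000;
    static final long DEFAULT_MILLIS = 30_000;

    static long effectivePeriod(long configuredMillis) {
        // Inner min caps the value at 60 s; outer max raises anything below 10 s to 10 s.
        return Math.max(MIN_MILLIS, Math.min(configuredMillis, MAX_MILLIS));
    }
}
```

Pulling the bounds into named constants, as here, is one way to keep the expression compact while making the 10 to 60 second range readable at a glance.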

The expression that computes the execution period is interesting, even a bit neat, but it takes a moment to work out the configurable range. I would advise against writing it this way in real business code, because there readability and maintainability are my top priorities.

Note that the heartbeat task starts with an initial delay of 10 seconds, because, without DLedger, the Broker has already registered once in the earlier logic. Without the delay, two registrations would happen almost at the same time, which is clearly not intended. Also, the forceRegister argument changes from true to the value of isForceRegister.
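The ordering described above (one forced registration immediately, then a periodic task whose first run is delayed) can be sketched as follows. HeartbeatSketch and its registerBrokerAll stub are hypothetical: the stub only records the forceRegister value it was called with, and configuredForceRegister stands in for the isForceRegister setting.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of "register once now, then heartbeat later"; registerBrokerAll is a hypothetical stub.
class HeartbeatSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final boolean configuredForceRegister; // stands in for the isForceRegister setting
    final List<Boolean> registrations = new CopyOnWriteArrayList<>(); // forceRegister of each call

    HeartbeatSketch(boolean configuredForceRegister) {
        this.configuredForceRegister = configuredForceRegister;
    }

    void registerBrokerAll(boolean forceRegister) {
        registrations.add(forceRegister); // a real Broker would contact every NameServer here
    }

    void start(long initialDelayMillis, long periodMillis) {
        // First registration: immediate and forced.
        registerBrokerAll(true);
        // Heartbeat: the first run waits initialDelayMillis so it does not duplicate the call above.
        scheduler.scheduleAtFixedRate(() -> registerBrokerAll(configuredForceRegister),
                initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

With start(10_000, 30_000), only the forced registration has happened when start returns; the first heartbeat follows 10 seconds later and then repeats every 30 seconds.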

Call registerBrokerAll to register

Once the scheduled task is set up, each trigger executes the registerBrokerAll method to register. You may wonder why the method name ends in All when there is only one Broker. That’s because there are multiple NameServers, and at startup the Broker registers itself with all of them. Of course, that’s not proven yet, so let’s move on.

Registration actually takes place only if a condition is met. What is the condition? It depends on the variable forceRegister and the needRegister method: the Broker registers if forceRegister is true or needRegister returns true. forceRegister defaults to true, so the first time this logic runs, the Broker always registers.
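Assuming the condition is written as forceRegister || needRegister(...), which matches the description above, Java’s short-circuit || means needRegister is not even evaluated while forceRegister is true. A hypothetical RegisterDecision helper makes that visible by passing needRegister in as a BooleanSupplier:

```java
import java.util.function.BooleanSupplier;

// The registration condition as a standalone predicate (hypothetical helper).
class RegisterDecision {
    static boolean shouldRegister(boolean forceRegister, BooleanSupplier needRegister) {
        // Short-circuit: when forceRegister is true, the version check never runs.
        return forceRegister || needRegister.getAsBoolean();
    }
}
```

This matters because needRegister talks to every NameServer, as the following sections show, so skipping it on a forced registration saves a round of network calls.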

Compare data versions to decide whether the current Broker needs to register

If you’re interested, read on to find out what needRegister bases its decision on.

First, once the Broker registers with the NameServer, Producers keep writing data and Consumers keep consuming it. The Broker may also change key routing information, such as MessageQueues, because of failures. The data in the NameServer then becomes inconsistent with the Broker’s actual data, and if that routing data is not refreshed in time, Producers may pull the wrong route data.

Therefore, each time the scheduled task fires, the data in the NameServer and the Broker is compared. If the data versions differ, the Broker registers again and pushes its latest data to the NameServer. Put simply, it is a periodic data refresh. The code in the red box below is the core of the data comparison.

When the Broker has finished comparing data with all NameServer nodes, it combines the results: if any node has inconsistent data, the Broker re-registers and pushes its latest data to the NameServers. The core judgment logic is also highlighted in red.
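The aggregation rule (if any node is stale, re-register) can be sketched as below. The sketch assumes a version made of a timestamp and a change counter (the hypothetical DataVersion class here) and compares nodes sequentially for clarity, whereas RocketMQ queries the NameServers concurrently, as discussed next. A NameServer with no stored version counts as stale.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Simplified model of needRegister's version comparison.
class VersionCheckSketch {
    // Hypothetical version: a timestamp plus a counter bumped on every change.
    static final class DataVersion {
        final long timestamp;
        final long counter;

        DataVersion(long timestamp, long counter) {
            this.timestamp = timestamp;
            this.counter = counter;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof DataVersion)) {
                return false;
            }
            DataVersion other = (DataVersion) o;
            return timestamp == other.timestamp && counter == other.counter;
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, counter);
        }
    }

    // True if any NameServer in `nameServers` holds a different version, or none at all.
    static boolean needRegister(DataVersion brokerVersion,
                                Map<String, DataVersion> storedOnNameServers,
                                List<String> nameServers) {
        for (String addr : nameServers) {
            if (!brokerVersion.equals(storedOnNameServers.get(addr))) {
                return true; // one inconsistent node is enough
            }
        }
        return false;
    }
}
```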

At this point, we have verified that the Broker registers with all NameServers at startup. But since there are still highlights worth paying attention to, let’s keep reading the source code.

Use CountDownLatch to collect the results of all asynchronous registration tasks

Also worth noting in needRegister: RocketMQ talks to the multiple NameServers asynchronously through a thread pool, and uses CountDownLatch to wait for all requests to finish before returning the results to the main thread.

While we’re on the subject, a word about CountDownLatch. Suppose we have five computations that do not depend on each other. How do we get all the results quickly? Run the five tasks concurrently, each on its own thread. But then the results come back at different times, and the main thread needs a way to know when all of them are done.

CountDownLatch lets the main thread wait until all five computations are complete, and then wakes it up to continue. That is what CountDownLatch is for. If you only ever write CRUD code, you may never have come across it, which is exactly why interviewers ask about it: in the complex business scenarios of a large company, you need to know how to use CountDownLatch.
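A minimal sketch of the five-computation example, assuming a hypothetical LatchDemo class in which each computation just squares its index:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Five independent computations run concurrently; the caller waits on a latch for all of them.
class LatchDemo {
    static long sumOfSquaresInParallel() {
        final int tasks = 5;
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        CountDownLatch latch = new CountDownLatch(tasks);
        long[] results = new long[tasks]; // each task writes only its own slot
        for (int i = 0; i < tasks; i++) {
            final int n = i + 1;
            pool.execute(() -> {
                try {
                    results[n - 1] = (long) n * n; // the "computation"
                } finally {
                    latch.countDown(); // count down even if the computation fails
                }
            });
        }
        try {
            latch.await(); // the calling thread blocks here until the count reaches zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        } finally {
            pool.shutdown();
        }
        long sum = 0;
        for (long r : results) {
            sum += r;
        }
        return sum;
    }
}
```

The result is 1 + 4 + 9 + 16 + 25 = 55. Counting down in a finally block ensures a failing task cannot leave the caller waiting forever, and the plain long[] is safe to read afterwards because each countDown() happens-before the return from await().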

Once it has been determined that registration is required, the core registration logic runs, implemented in registerBrokerAll. The Broker registers itself with each NameServer node, again starting the requests asynchronously for efficiency, and again using CountDownLatch to collect all the results.
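Putting the pieces together, the fan-out pattern looks roughly like this. FanOutRegistration and its register function are hypothetical stand-ins for the per-NameServer request. The results go into a CopyOnWriteArrayList because several threads add to it at once, and in this sketch the latch wait is bounded by a timeout so that one slow node cannot block the caller indefinitely.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Registers with every NameServer concurrently and collects the results.
class FanOutRegistration {
    static List<String> registerAll(List<String> nameServers,
                                    Function<String, String> register,
                                    long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, nameServers.size()));
        List<String> results = new CopyOnWriteArrayList<>(); // written by several threads at once
        CountDownLatch latch = new CountDownLatch(nameServers.size());
        for (String addr : nameServers) {
            pool.execute(() -> {
                try {
                    String result = register.apply(addr);
                    if (result != null) {
                        results.add(result);
                    }
                } catch (RuntimeException e) {
                    // A failed node is skipped; the others still count.
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            // Bounded wait so one slow NameServer cannot block the caller forever.
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return results;
    }
}
```

Nodes that throw are simply left out of the result list, so the caller sees only the registrations that succeeded within the timeout.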

Use CopyOnWriteArrayList to store the results of the registration requests

In addition, the list that holds the registration results is a CopyOnWriteArrayList, which will be familiar to anyone who has been grilled about it in an interview. Multiple threads register with different NameServers, and each writes its registration result into the same list, so the list has to be thread-safe.

And we know that ArrayList is not thread-safe, which is why CopyOnWriteArrayList is used here to hold the registration results. Why is CopyOnWriteArrayList thread-safe?

This is due to Copy On Write (COW). Reads share the current underlying array without taking any lock. A write takes an exclusive lock, copies the array, applies the change to the copy, and then swaps the new array in. Rather than locking every operation, it locks only writes, so reads and writes do not block each other, which reduces locking overhead and improves overall concurrency.
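The copy-on-write behavior is easy to observe: an iterator works on the array that existed when it was created, so adding elements during iteration neither shows up in that iteration nor throws. A small sketch, using a hypothetical CowDemo class, that iterates a list while appending to it:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Observes copy-on-write: iteration sees a snapshot, writes go to a fresh copy of the array.
class CowDemo {
    static List<String> cowListOf(String... items) {
        return new CopyOnWriteArrayList<>(Arrays.asList(items));
    }

    // Appends one element per element visited and returns what the iteration actually saw.
    static List<String> iterateWhileWriting(List<String> list) {
        List<String> seen = new ArrayList<>();
        for (String s : list) {
            seen.add(s);
            list.add(s + "-new"); // on a COW list this copies the array; the iterator keeps the old one
        }
        return seen;
    }
}
```

Passing a plain ArrayList to iterateWhileWriting instead throws ConcurrentModificationException on the second step, which is the fail-fast behavior COW avoids at the cost of copying the array on every write.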

What exactly does the concurrent registration operation above do? Let’s look at the code.

The code above contains all the logic for a single registration; after building the request, it branches on oneway.

Here oneway is false, so the call is not one-way. In one-way mode, the Broker would not care about the NameServer’s response and no callback would be triggered. Instead, the Broker sends the NameServer all the data written into the request body, packaged uniformly in a wrapper named TopicConfigSerializeWrapper.

It consists of two parts:

  • The configuration of all Topics on the Broker node
  • Data version

With this data in hand, the Broker calls invokeSync to send the request to the NameServer synchronously, and processes the response once the call returns.
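The oneway branch can be modeled with a small interface. Everything here is hypothetical: Transport is not RocketMQ’s remoting API, and RegistrationBody only mirrors the two parts of TopicConfigSerializeWrapper listed above (the Topic configs and the data version). In this walkthrough oneway is false, so the synchronous branch is the one taken.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a single registration request; not RocketMQ's remoting API.
class SingleRegistrationSketch {
    // Mirrors the two parts of TopicConfigSerializeWrapper: topic configs plus a data version.
    static final class RegistrationBody {
        final Map<String, String> topicConfigTable;
        final long dataVersion;

        RegistrationBody(Map<String, String> topicConfigTable, long dataVersion) {
            this.topicConfigTable = Collections.unmodifiableMap(new HashMap<>(topicConfigTable));
            this.dataVersion = dataVersion;
        }
    }

    // Hypothetical transport: fire-and-forget vs. request/response.
    interface Transport {
        void sendOneway(String addr, RegistrationBody body);
        String sendSync(String addr, RegistrationBody body); // blocks until the NameServer replies
    }

    // Returns null in one-way mode (no reply is awaited); otherwise the NameServer's reply.
    static String register(Transport transport, String addr, RegistrationBody body, boolean oneway) {
        if (oneway) {
            transport.sendOneway(addr, body);
            return null;
        }
        // The caller blocks here and can inspect the reply once the call returns.
        return transport.sendSync(addr, body);
    }
}
```

In one-way mode register returns null immediately, since no reply is awaited; in synchronous mode the caller waits for the NameServer’s answer and can then act on it.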

EOF

At this point, we have verified the conclusions stated at the beginning. Along the way, we also saw RocketMQ’s master-slave architecture, how the Master and Slave synchronize data, how the heartbeat mechanism is implemented, and almost all of the Broker’s startup process in the source code. Reading source code written by experienced developers is always interesting; sometime soon, we’ll take a look at the NameServer side.

That’s all for this post. If you found it helpful, please like, comment, and share.

Search for [SH full stack Notes] on WeChat and follow it for more related articles.