No era is ever unkind to those who keep learning.

Hello, I’m Yes.

Following up on my last article, Write a Message Middleware, where I covered the key points of implementing message middleware, today I'm going to dive into the kernel of RocketMQ and see how a trillion-message-scale, low-latency message queue can actually be designed.

In this article, I will first introduce the overall architecture design, and then go into the detailed design of each core module and the analysis of the core process.

Usage considerations and best practices will also be mentioned.

An earlier article already introduced the usage scenarios, basic concepts, and common questions of message queues.

Without further ado, let's get going.

RocketMQ overall architecture design

The overall architecture design is mainly divided into four parts: Producer, Consumer, Broker and NameServer.

In order to be more realistic, I drew all cluster deployments, like brokers, and I also drew master-slave ones.

  • Producer: the message producer, which can be deployed as a cluster. It establishes a long-lived connection to one random NameServer in the cluster to learn which Broker Masters host the Topic it wants to send to, then establishes long-lived connections to those Masters and sends messages using one of several load-balancing strategies.

  • Consumer: the message consumer, which can also be clustered. It likewise connects to one random NameServer to learn which Broker Masters and Slaves host the Topic it subscribes to, then establishes long-lived connections to them. Both clustering consumption and broadcast consumption are supported.

  • Broker: stores, queries, and serves messages for consumption. A Master can have multiple Slaves; the Master supports both reads and writes, while a Slave only supports reads. Each Broker registers its routing information with every NameServer in the cluster.

  • NameServer: a simple Topic routing registry that supports dynamic registration and discovery of Brokers and maintains the mapping between Topics and Brokers. NameServers do not communicate with each other; each one holds the complete routing information, so they are stateless.

Let me summarize their interaction in one more paragraph:

Start the NameServer cluster first; NameServers exchange no data with one another. After a Broker starts, it sends heartbeat packets to every NameServer periodically (every 30 s), containing its IP, port, and Topic information. Each NameServer periodically scans its list of live Brokers; if a Broker has sent no heartbeat for 120 s, its information is removed and the Broker is considered offline.
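The 30 s heartbeat / 120 s expiry logic above can be sketched as follows. This is a simplified model: in RocketMQ the real logic lives in the NameServer's RouteInfoManager, and the function and field names here are illustrative, not the actual API.

```python
EXPIRE_AFTER = 120  # seconds of silence before a broker is considered dead

def scan_not_active_broker(broker_live_table, now):
    """Remove brokers whose last heartbeat is older than EXPIRE_AFTER seconds.

    broker_live_table: dict of broker address -> timestamp of last heartbeat
    Returns the list of broker addresses that were removed.
    """
    expired = [addr for addr, last_beat in broker_live_table.items()
               if now - last_beat > EXPIRE_AFTER]
    for addr in expired:
        del broker_live_table[addr]
    return expired
```

For example, with heartbeats recorded at t=1000 and t=1100, a scan at t=1125 evicts only the first broker (125 s of silence exceeds the 120 s limit).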

In this way, every NameServer knows about all Brokers in the cluster. When a Producer comes online, it can learn from a NameServer which Broker it should send a given Topic's messages to, and it establishes a long-lived connection with the corresponding Broker (Master role) to send them.

A Consumer likewise learns from a NameServer which Brokers host the Topic it wants, establishes connections with the corresponding Master and Slaves, and receives messages.

The simple workflow described above gives you an idea of the overall data flow, so let’s look at each part in detail.

NameServer

The NameServer is lightweight and stateless; its role is similar to that of ZooKeeper. Its two main functions are Broker management and routing-information management.

So it's pretty simple overall, but I'll list a few of its fields to give you some intuition for what it stores.

Producer

A Producer is simply a message producer. First it needs to know which Broker to send to: every 30 seconds it fetches the Topic-to-Broker mapping from a NameServer and caches it in local memory. If it discovers a new Broker, it establishes a long-lived connection to it and then sends that Broker a heartbeat every 30 seconds to keep the connection alive.

For synchronous sends, a failed message is retried twice by default (retryTimesWhenSendFailed = 2), and the Broker that just failed is avoided: the retry goes to a different Broker.

For asynchronous sends, failures are also retried, twice by default (retryTimesWhenSendAsyncFailed = 2), but the retries go to the same Broker.

Producer Startup process

Now let's look at the Producer's startup process and see what it does.

The general startup flow is clear from the chart, but a few details are not, such as rebalancing, TBW102, and the scheduled tasks.

One might ask why the producer is pulling services and rebalancing.

Because both producers and consumers need an MQClientInstance, and clients with the same clientId share one. The clientId is composed of the local IP plus the instanceName (by default), so multiple producers and consumers in one process actually share a single MQClientInstance.

As for the scheduled tasks, see the following figure:

Producer Sends messages

Let’s take a look at the process of sending a message, which is not very complicated. It is just to find the Topic to send the message on which Broker, and then send the message.

Now we can see what TBW102 is for. Brokers that allow automatic Topic creation register this default Topic with the NameServer at startup, so when a Producer sends a message for a brand-new Topic, it can still find a Broker, namely one that is allowed to auto-create the Topic, and send the message there.

When that Broker receives the message, it doesn't find the Topic locally, but since it accepts auto-creation, it creates the Topic and the corresponding routing information.

Drawbacks of automatic topic creation

With automatic creation, it is possible that all messages for a Topic end up on a single Broker, with no load balancing.

This is because the Broker only creates routing information for the new Topic once a message for it arrives, and heartbeats are sent every 30 seconds, so it can take up to 30 seconds for the NameServer to learn about the new Topic's routes.

If the sender keeps sending messages rapidly during that window, the NameServer still has no routing information for the Topic, so other Brokers that allow auto-creation also get a chance to create routes for it. Then multiple Brokers in the cluster receive the Topic's messages and load balancing happens, although some individual Brokers may still miss out.

But if the sender sends nothing more within those 30 seconds, the first Broker pushes the Topic's routing information to the NameServer via its heartbeat, and from then on Producers will learn from the NameServer that the Topic can only be sent to that one Broker. That is unbalanced: if the new Topic carries a lot of traffic, that Broker gets overloaded.

Therefore it is not advisable to enable autoCreateTopicEnable (automatic topic creation) in production.

Sending message failure delay mechanism

One relevant parameter is sendLatencyFaultEnable, disabled by default. It is used to back off from Brokers whose sends have been slow.

Each send records how long it took; if it exceeds a certain threshold, that Broker is excluded from sending for a period of time.

For example, if the sending time exceeds 15000ms, messages cannot be sent to the Broker within 600000 ms.

This mechanism matters: a high rate of send timeouts usually means the Broker is heavily loaded, so stepping aside and letting it recover is key to keeping message sending highly available.
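The latency-to-backoff mapping can be sketched with a tiered table. The tiers below are my reading of the defaults in MQFaultStrategy (latencyMax and notAvailableDuration); treat the exact values as illustrative rather than authoritative, though they match the 15000 ms to 600000 ms example above.

```python
# Latency tiers (ms) and how long (ms) to avoid the broker at each tier.
# Values mirror MQFaultStrategy's defaults as I understand them.
LATENCY_MAX      = [50, 100, 550, 1000, 2000, 3000, 15000]
NOT_AVAILABLE_MS = [0,  0,   30000, 60000, 120000, 180000, 600000]

def compute_not_available_duration(current_latency_ms):
    """Return how long (ms) a broker should be avoided for a given send latency."""
    # walk the tiers from slowest to fastest and pick the first one matched
    for i in range(len(LATENCY_MAX) - 1, -1, -1):
        if current_latency_ms >= LATENCY_MAX[i]:
            return NOT_AVAILABLE_MS[i]
    return 0
```

A 15-second send backs the broker off for 10 minutes, a 600 ms send for 30 seconds, and fast sends are not penalized at all.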

A quick recap

Every 30 seconds the Producer pulls routing information from the NameServer and updates its local routing table; when it discovers a new Broker, it establishes a long-lived connection and heartbeats that Broker every 30 seconds.

Do not enable autoCreateTopicEnable in production.

Producer uses retry and delay mechanisms to increase the high availability of message sending.

Broker

The Broker is more complex, but very important. It can be roughly divided into the following five modules. Let’s take a look at the chart on the official website.

  • The Remoting module handles client requests.
  • Client Manager: manages clients and maintains their Topic subscriptions.
  • Store Service: provides message storage and query services.
  • HA Service: high availability via master/slave synchronization.
  • Index Service: builds an index on a specified key for fast queries.

Several of these modules need no further analysis, so let's start with storage.

The Broker’s storage

RocketMQ storage uses a local file storage system, which is efficient and reliable.

There are three types of files involved: CommitLog, ConsumeQueue, and IndexFile.

CommitLog

All messages, for all RocketMQ Topics, are stored in the CommitLog. Each CommitLog file is named after its starting offset, zero-padded to 20 digits. The first file is therefore 00000000000000000000, and the second is 00000000001073741824, meaning its starting offset is 1073741824 (1 GB). Given a global offset, you can locate the file that contains it directly from these names.

All messages are written sequentially, and the next file is opened if the file size is exceeded.
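The offset-to-file mapping just described is simple arithmetic, sketched below (a toy model, assuming the default 1 GB file size; the helper name is mine, not RocketMQ's):

```python
COMMITLOG_FILE_SIZE = 1024 * 1024 * 1024  # 1 GB per CommitLog file by default

def commitlog_file_name(offset):
    """Name of the CommitLog file containing a global offset: the file's
    starting offset, left-padded with zeros to 20 digits."""
    start_offset = offset - (offset % COMMITLOG_FILE_SIZE)
    return str(start_offset).zfill(20)
```

Offset 0 maps to 00000000000000000000, and any offset inside the second gigabyte maps to 00000000001073741824, matching the file names above.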

ConsumeQueue

The ConsumeQueue is the message consuming queue. Since the CommitLog mixes the messages of all Topics together, the ConsumeQueue acts as an index into the CommitLog so that messages can be found efficiently.

ConsumeQueue entries have a fixed size, storing only an 8-byte CommitLog physical offset, a 4-byte message length, and an 8-byte Tag hashcode: 20 bytes in total.

On disk, one ConsumeQueue corresponds to one queue of one Topic. Each file holds 300,000 entries and is therefore about 5.72 MB.

The consumer gets the real physical address of the message from The ConsumeQueue and then goes to the CommitLog for the message.
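The fixed 20-byte layout is what makes lookup by consume index cheap: the entry's position is just index times 20. A minimal sketch of encoding and reading such entries (the layout follows the 8+4+8 description above; the big-endian choice and function names are my assumptions):

```python
import struct

# 8-byte CommitLog offset + 4-byte message size + 8-byte tag hashcode = 20 bytes
CQ_ENTRY = struct.Struct(">qiq")

def encode_entry(commitlog_offset, size, tag_hash):
    """Pack one ConsumeQueue entry into its fixed 20-byte form."""
    return CQ_ENTRY.pack(commitlog_offset, size, tag_hash)

def read_entry(cq_bytes, index):
    """Fixed-size entries turn index lookup into a multiplication."""
    return CQ_ENTRY.unpack_from(cq_bytes, index * CQ_ENTRY.size)
```

Reading entry 1 of a two-entry buffer returns the second message's CommitLog offset, length, and tag hash, which is exactly what the consumer needs to fetch the real message.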

IndexFile

The IndexFile is a hash index file: it provides an extra way of looking up messages without affecting the main write path.

It supports querying messages by key or by time range. Each file is named after its creation timestamp, is about 400 MB in size, and stores 20 million index entries.
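The roughly-400 MB figure follows from the file's fixed layout. The numbers below (40-byte header, 5 million 4-byte hash slots, 20 million 20-byte index entries) are my reading of the IndexFile structure; treat them as illustrative, but note the arithmetic does land on about 400 MB.

```python
HEADER_BYTES = 40            # fixed file header
HASH_SLOTS   = 5_000_000     # hash slots, 4 bytes each
INDEX_COUNT  = 20_000_000    # index entries, 20 bytes each

def index_file_size():
    """Total IndexFile size in bytes: header + slot area + index area."""
    return HEADER_BYTES + HASH_SLOTS * 4 + INDEX_COUNT * 20
```

That works out to 420,000,040 bytes, i.e. just over 400 MB, consistent with the size quoted above.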

Let’s take a look at how the contents of these three files are generated:

When a message arrives it is first stored in the CommitLog; a ReputMessageService thread then forwards it to the consume queue files and index files in near real time, that is, asynchronously.

Message flushing mechanism

RocketMQ offers two flush options: synchronous and asynchronous. Flushing to disk is comparatively slow; keeping data only in memory is fastest but least reliable. Consider the failure scenarios:

  1. The Broker is killed abruptly, e.g. kill -9
  2. The Broker process crashes
  3. The operating system crashes
  4. The machine loses power
  5. The machine suffers a hardware failure and won't boot
  6. The disk is damaged

For scenarios 1 to 4, synchronous flush loses nothing, while asynchronous flush may lose a few messages. Scenarios 5 and 6 can only be covered by replication: synchronous dual-write is reliable but slower, and asynchronous replication may again lose some messages.

So choose between synchronous and asynchronous flushing, and between the replication modes, according to your scenario.

Page caching and memory mapping

The CommitLog stores all Topics mixed together precisely so that every message write is a sequential append, and sequential file writes are nearly as fast as memory writes.

RocketMQ's files also use memory mapping (mmap), which maps the application's virtual pages directly onto the page cache, avoiding the copy from kernel space to user space. Here is the diagram I drew in a previous article.

The page cache is the operating system's cache of file contents, used to speed up reads and writes. Writes go to the page cache first, and the OS flushes them to disk at times it chooses (not controllable by the application). Reads are first loaded into the page cache, and adjacent blocks are read ahead based on the principle of locality.

RocketMQ uses a fixed-length structure for file storage, which allows you to map an entire file into memory at once.

File preallocation and file preheating

Memory mapping is only a mapping: data is actually loaded into memory only when a page is first touched and a page fault occurs. To avoid runtime performance jitter, RocketMQ adds two optimizations.

File preallocation

The default CommitLog file size is 1 GB; when a file reaches that limit, a new one must be ready. RocketMQ runs a background thread, AllocateMappedFileService, that continuously processes AllocateRequests. An AllocateRequest is a pre-allocation request that prepares the next file in advance, so that file allocation never causes jitter during message writes.

File preheating

The warmMappedFile method walks the newly mapped file page by page, writing one zero byte into each page, and then calls mlock and madvise(MADV_WILLNEED).

Mlock: Part or all of the address space used by a process can be locked in physical memory to prevent it from being swapped to swap space.

Madvise: Advise the operating system that this file should be accessed in the near future, so it might be a good idea to read a few pages in advance.

A quick recap

Commitlogs are stored as a hybrid, meaning that all topics are stored together, appended sequentially, and named with a starting offset.

Messages are written to the CommitLog and then dispatched to the ConsumeQueue and IndexFile by a background thread.

The consumer reads the ConsumeQueue to get the real message's physical address, then reads the message itself from the CommitLog.

Mmap mechanism is used to reduce one copy, and file preallocation and file preheating are used to improve performance.

Provides synchronous and asynchronous disk flushing. Select an appropriate mechanism based on the scenario.

The Broker’s HA

A slave Broker establishes a long-lived connection with the master, obtains the master's maximum CommitLog offset, and starts pulling messages from there. The master returns a batch of messages, and the cycle repeats, keeping master and slave in sync.

A consumer first asks the master Broker for messages. If the master is under pressure at the moment, its response includes a suggestion to pull from a slave next time, and the consumer then pulls messages from the slave.

Consumer

There are two modes of consumption, namely broadcast mode and cluster mode.

Broadcast mode: Each consumer in a group consumes a complete Topic message.

Cluster mode: consumers in a group divide up consumption Topic messages.

We usually use the cluster model.

As for how consumers fetch messages, there are push and pull modes. For details see my article on message queue push vs. pull, which analyzes RocketMQ's and Kafka's push and pull implementations at the source level, along with the pros and cons of each mode.

Load balancing on the Consumer side

Each consumer periodically fetches the number of queues under a Topic, plus the list of all consumers in the same consumer group subscribed to that Topic. The default allocation strategy works like paged, sorted allocation.

The queue is sorted, and then the consumers are sorted. For example, if there are nine queues and three consumers, then consumer-1 consumes messages in queues 0, 1, 2, and consumer-2 consumes messages in queues 3, 4, 5, and so on.
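The paged allocation can be sketched like this; it is a simplified version of RocketMQ's default AllocateMessageQueueAveragely strategy (the handling of the remainder when queues don't divide evenly follows my reading of that class):

```python
def allocate_averagely(queues, consumers, current):
    """Give each consumer a contiguous slice of the sorted queue list.

    queues:    list of queue ids
    consumers: list of consumer client ids in the group
    current:   the consumer asking for its share
    """
    queues, consumers = sorted(queues), sorted(consumers)
    idx, n, c = consumers.index(current), len(queues), len(consumers)
    mod = n % c                                  # leftover queues
    avg = n // c + (1 if idx < mod else 0)       # first `mod` consumers get one extra
    start = idx * avg if idx < mod else idx * (n // c) + mod
    return queues[start:start + avg]
```

With nine queues and three consumers, each consumer gets exactly three contiguous queues, matching the example above.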

So when the load grows, add queues and add consumers; the load-balancing mechanism detects the change, rebalances, and evens out the load.

Consumer Retry of message consumption

Consumption failures are inevitable, so failed consumption needs a retry mechanism. Typical failures are either malformed messages or states that temporarily cannot be processed, so retrying immediately is not appropriate.

RocketMQ assigns each consumer group a RETRY Topic, %RETRY%+consumerGroup, and uses retry levels to delay the retries.

It reuses RocketMQ's delayed-message feature: a retried message first goes into the delay Topic SCHEDULE_TOPIC_XXXX, with the original Topic stored in the message's extended fields.

After the delay elapses, the message is moved back to the retry Topic; the Consumer subscribes to its group's retry Topic and thus receives the failed message again.

If consumption still fails after a certain number of retries, the message is moved to the dead-letter Topic (%DLQ% + consumerGroup) and is then treated as handled.

We can then manually process these messages in the dead-letter queue.

Global and local order of messages

Global ordering means eliminating all concurrency: the Topic has a single queue, and both the Producer and the Consumer run with a concurrency of one.

Local order is just a queue order, and you can have multiple queues running in parallel.

With a MessageQueueSelector, the Producer can route all messages of one business key to the same queue, and the Consumer receives them through a MessageListenerOrderly, which consumes with locking.

The Broker maintains an mqLockTable: for ordered messages, the queue must be locked at the Broker when the pull task is created before it can be consumed.

Strict ordering is hard. Even if everything is fine now, if a Broker goes down and a rebalance changes which consumer instance owns a queue, messages can arrive out of order. Maintaining strict order through such failures would require making the whole cluster unavailable.

Some caveats

1. Subscription relationships are stored per ConsumerGroup, so every Consumer in a group must have identical subscriptions.

Subscriptions are uploaded via heartbeats, so if the Consumers in one group subscribe differently, they overwrite each other's subscription information.

For example, consumer A subscribes to Topic A and consumer B, in the same group, subscribes to Topic B. Consumer A asks the Broker for messages; then B's heartbeat packet arrives, the Broker updates the group's subscription to B's, and only then handles A's request, now against the wrong subscription.

2. RocketMQ separates primary and secondary reads and writes

The slave can only be read, never written, and reads go to the slave only when the gap between the offset the client is reading and the Broker's maximum offset exceeds the physical-memory threshold. So in normal operation the slave does not share read traffic.

3. Adding machines alone does not increase consumption speed; the number of queues has to keep up.

4. As mentioned earlier, do not allow automatic topic creation.

RocketMQ best practices

Some of these best practices can be found on the website.

The use of Tags

It is suggested that one application use a single Topic and mark its different businesses with Tags, because Tags are more flexible to set, and one Topic per application is clear and easy to identify at a glance.

The use of Keys

If the business has a unique message identifier, put it in the keys field to make later locating and troubleshooting easier.

Improve the consumption power of consumers

1. Increase consumption parallelism: add queues and consumers, and raise a single consumer's parallel consumption threads via the consumeThreadMax parameter.

2. Batch consumption: set the consumeMessageBatchMaxSize parameter so that the consumer receives multiple messages at once; for example, a database update that used to run ten times can then run once.

3. Skip non-core messages. When the load is very heavy, discard or skip non-core messages in order to keep up with the core ones.
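To make point 2 concrete, here is a toy sketch of how raising consumeMessageBatchMaxSize changes what the listener callback sees. This is pure illustration, not RocketMQ code: the listener normally gets one message per callback, and with a larger batch size it gets a chunk it can process in one aggregated operation.

```python
def batch(messages, batch_max_size):
    """Split a message list into chunks of at most batch_max_size,
    mimicking what a listener sees as consumeMessageBatchMaxSize grows."""
    return [messages[i:i + batch_max_size]
            for i in range(0, len(messages), batch_max_size)]
```

With the default size of 1, ten messages mean ten callbacks (and, say, ten UPDATE statements); with a batch size of 4 they arrive as three chunks, so three aggregated updates suffice.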

NameServer addressing

Use HTTP static server addressing (the default), so that NameServer address changes can be discovered dynamically.

JVM options

The following is copied from the official website:

If you don’t care about RocketMQ Broker startup time, “pre-touch” the Java heap to ensure that each page will be allocated during JVM initialization.

Those who don't mind the startup time can enable it with -XX:+AlwaysPreTouch. Disabling biased locking may reduce JVM pauses: -XX:-UseBiasedLocking. For garbage collection, the G1 collector with JDK 1.8 is recommended:

-XX:+UseG1GC -XX:G1HeapRegionSize=16m

-XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30

Also, don't set -XX:MaxGCPauseMillis too low, or the JVM will use a tiny young generation to meet the target, causing very frequent minor GCs. Rolling GC log files are also recommended:

-XX:+UseGCLogFileRotation

-XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m

Linux kernel Parameters

The following is copied from the official website:

  • vm.extra_free_kbytes tells the VM to keep extra free memory between the threshold at which background reclaim (kswapd) starts and the threshold at which direct reclaim (in the allocating process) starts. RocketMQ uses it to avoid long latencies in memory allocation. (Kernel-version dependent.)
  • vm.min_free_kbytes: if set below 1024 KB, it will subtly break the system, which then becomes prone to deadlock under high load.
  • vm.max_map_count limits the maximum number of memory-map areas a process may have. RocketMQ uses mmap to load the CommitLog and ConsumeQueue, so a large value is recommended for this parameter.
  • vm.swappiness defines how aggressively the kernel swaps memory pages: higher values swap more, lower values less. A value of 10 is recommended to avoid swap latency.
  • File descriptor limits: RocketMQ opens file descriptors for files (CommitLog and ConsumeQueue) and network connections. A limit of 655350 is recommended.
  • Disk scheduler: RocketMQ recommends the deadline I/O scheduler, which tries to provide a bounded latency for requests.
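As a config sketch, the recommendations above might look like the following on a typical Linux box. The exact numbers and the device name are illustrative; tune them for your own machines and kernel version, and note that vm.extra_free_kbytes only exists on some kernels.

```shell
# Illustrative sysctl/ulimit settings for a RocketMQ broker host (run as root).
sysctl -w vm.min_free_kbytes=1048576   # keep this well above 1024 KB
sysctl -w vm.max_map_count=655360      # room for many mmap'd CommitLog/ConsumeQueue files
sysctl -w vm.swappiness=10             # swap reluctantly
ulimit -n 655350                       # file descriptor limit for the broker process
# deadline I/O scheduler; "sda" is an example device name
echo deadline > /sys/block/sda/queue/scheduler
```

Put the sysctl lines in /etc/sysctl.conf (and the ulimit in the service definition or limits.conf) so they survive reboots.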

The last

In fact there is still a lot more to say, such as flow control, message filtering, the implementation of timed messages, and the underlying 1+N+M1+M2 Reactor multithreaded communication design.

There is simply too much of it, and it doesn't affect the main flow, so I stripped it out; what's written here is roughly the core of the implementation.

That includes metadata exchange, message sending, storage, consumption, and so on.

The part about transaction messages was also analyzed in my previous article, so I won’t post it again.

As you can see, there are many, many things to consider when implementing a production-level message queue, but that’s pretty much the architecture and modules involved.

As for the deeper details, you'll have to do your own digging; my role here is just to point the way.

Finally, personal ability is limited, if there is a mistake, please contact me!


Search "yes training level guide" on WeChat and reply 123 to receive a 200,000-word algorithm problem-solving guide.

I’m yes, from a little bit to a billion bits. See you next time.