Contents of this article

This article describes the design principles behind RocketMQ’s key mechanisms, including message storage, message communication, message filtering, load balancing, transaction messages, and message query.

This article draws on the official RocketMQ documentation.

Architecture design

RocketMQ is architecturally divided into four main parts, as shown in the figure above:

  • Producer: the role that publishes messages; it supports distributed cluster deployment. The Producer uses MQ’s load balancing module to select a queue in the appropriate Broker cluster for message delivery. Delivery supports fail-fast behavior and has low latency.
  • Consumer: the role that consumes messages; it supports distributed cluster deployment. Messages can be consumed in either push or pull mode, and both cluster consumption and broadcast consumption are supported. It provides a real-time message subscription mechanism that meets the needs of most users.
  • NameServer: a very simple Topic routing registry that plays a role similar to ZooKeeper in Dubbo and supports dynamic registration and discovery of Brokers. It has two main functions. The first is Broker management: NameServer accepts registration information from Broker clusters and stores it as the basic routing data, and it provides a heartbeat detection mechanism to check whether each Broker is still alive. The second is routing information management: each NameServer holds the complete routing information of the Broker cluster, together with the queue information used for client queries, so Producers and Consumers can learn the routing information of the entire Broker cluster from NameServer and then deliver and consume messages. NameServer is usually deployed as a cluster whose instances do not communicate with each other. Each Broker registers its routing information with every NameServer, so each NameServer instance stores a complete copy. When a NameServer goes offline for some reason, the Broker can still synchronize its routing information with the other NameServers, and Producers and Consumers can still dynamically discover the Broker’s routing information.
  • BrokerServer: The Broker is responsible for storing, delivering, and querying messages and for ensuring high availability of the service. To provide these functions, the Broker contains the following important sub-modules.
  1. Remoting Module: The entry point of the entire Broker; it handles requests from clients.
  2. Client Manager: Manages clients (Producer/Consumer) and maintains the Topic subscription information of consumers.
  3. Store Service: Provides convenient and simple APIs for storing messages on physical disk and querying them.
  4. HA Service: A high availability Service that provides data synchronization between Master and Slave brokers.
  5. Index Service: Indexes messages delivered to the Broker based on a specific Message key to provide quick lookup of messages.

1. Message storage

Message storage is the most complex and important part of RocketMQ. This section describes the overall message storage architecture of RocketMQ, the PageCache and Mmap memory mapping, and the two different disk flushing methods of RocketMQ.

1.1 CommitLog

The file that actually stores the message.

The CommitLog is the storage body for message bodies and metadata. It holds the message content written by Producers, and messages are not of fixed length. By default a single file is 1 GB. The file name is 20 digits long: it is the file’s start offset, left-padded with zeros. For example, 00000000000000000000 is the first file, with a start offset of 0 and a size of 1 GB = 1073741824 bytes. When the first file is full, the second file is 00000000001073741824, with a start offset of 1073741824, and so on. Messages are written to the log file sequentially; when a file is full, writing continues in the next file.
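The naming rule above is plain arithmetic, and a short Python sketch makes it concrete. The function names here are illustrative and are not RocketMQ APIs; only the 1 GB file size and the 20-digit padding come from the text.

```python
COMMITLOG_FILE_SIZE = 1024 ** 3  # 1 GB default file size, as stated above


def commitlog_file_name(start_offset: int) -> str:
    """Return the 20-digit, zero-padded file name for a given start offset."""
    return f"{start_offset:020d}"


def locate(physical_offset: int, file_size: int = COMMITLOG_FILE_SIZE):
    """Map a global physical offset to (file name, position inside that file)."""
    position = physical_offset % file_size
    return commitlog_file_name(physical_offset - position), position


print(commitlog_file_name(0))           # 00000000000000000000
print(commitlog_file_name(1073741824))  # 00000000001073741824
print(locate(1073741824 + 100))         # ('00000000001073741824', 100)
```

Because the file name is the start offset, finding the file that holds a given physical offset needs no lookup table, only a modulo.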

1.2 ConsumeQueue

An index over the CommitLog, with one message index per queue of each topic.

Message consumption queues are introduced primarily to improve consumption performance. RocketMQ uses a topic-based subscription model, so consumption is topic-specific, and traversing the CommitLog files to retrieve messages by topic would be very inefficient. Instead, the Consumer looks up the messages to consume through the ConsumeQueue. The ConsumeQueue serves as the index for consumption: for each message in a given queue of a topic, it stores the message’s starting physical offset in the CommitLog, the message size, and the hash code of the message Tag. ConsumeQueue files can be regarded as topic-based index files over the CommitLog, so the ConsumeQueue folder is organized in three levels, topic/queue/file, with the storage path $HOME/store/consumequeue/{topic}/{queueId}/{fileName}. ConsumeQueue files also use a fixed-length design: each entry is 20 bytes, consisting of an 8-byte CommitLog physical offset, a 4-byte message length, and an 8-byte tag hash code. A single file holds 300,000 entries, each of which can be accessed randomly like an array element, so each ConsumeQueue file is about 5.72 MB.
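As an illustration of the fixed-length entry described above, the following Python sketch packs and reads 20-byte entries with the standard struct module. The big-endian byte order and the helper names are assumptions made for this example; only the field sizes and the entry count come from the text.

```python
import struct

# Assumed layout: 8-byte CommitLog offset, 4-byte size, 8-byte tag hash code,
# big-endian with no padding (20 bytes in total).
ENTRY = struct.Struct(">qiq")
ENTRIES_PER_FILE = 300_000


def pack_entry(commitlog_offset: int, size: int, tag_hash: int) -> bytes:
    """Serialize one index entry."""
    return ENTRY.pack(commitlog_offset, size, tag_hash)


def read_entry(buf: bytes, index: int):
    """Random access: entry i starts at byte i * 20."""
    return ENTRY.unpack_from(buf, index * ENTRY.size)


buf = pack_entry(0, 100, 7) + pack_entry(100, 50, -3)
print(ENTRY.size)                                 # 20
print(read_entry(buf, 1))                         # (100, 50, -3)
print(ENTRIES_PER_FILE * ENTRY.size / 1024 ** 2)  # about 5.72
```

The last line reproduces the file size quoted above: 300,000 entries of 20 bytes are 6,000,000 bytes, or roughly 5.72 MB when 1 MB is taken as 1024 × 1024 bytes.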

1.3 IndexFile

Also an index over the CommitLog, keyed by message key or time.

IndexFile provides a way to query messages by key or by time interval. Index files are stored at $HOME/store/index/{fileName}, where fileName is the timestamp at which the file was created. A single IndexFile has a fixed size of about 400 MB and can hold 20 million index entries. The underlying storage of IndexFile implements a HashMap-like structure in the file system, so RocketMQ’s IndexFile is essentially a hash index.

1.4 Message flushing

RocketMQ provides both synchronous and asynchronous disk flushing.

Synchronous flush: the ACK is returned only after the message has been persisted to disk.

Asynchronous flush: the ACK is returned as soon as the message has been written to the PageCache.
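The difference between the two modes can be sketched in a few lines of Python. This is a toy model and not RocketMQ’s implementation: TinyLog and its fields are hypothetical, and in asynchronous mode the flush() call stands for what a background flusher would do later.

```python
import os
import tempfile


class TinyLog:
    def __init__(self, path: str, sync_flush: bool):
        self.fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_APPEND)
        self.sync_flush = sync_flush
        self.unflushed = 0  # bytes written but not yet forced to disk

    def append(self, payload: bytes) -> str:
        # os.write only hands the data to the OS (page cache).
        self.unflushed += os.write(self.fd, payload)
        if self.sync_flush:
            self.flush()    # synchronous mode: persist before acknowledging
        return "ACK"

    def flush(self) -> None:
        os.fsync(self.fd)   # force cached data for this file to disk
        self.unflushed = 0

    def close(self) -> None:
        os.close(self.fd)


with tempfile.TemporaryDirectory() as tmp:
    sync_log = TinyLog(os.path.join(tmp, "sync.log"), sync_flush=True)
    async_log = TinyLog(os.path.join(tmp, "async.log"), sync_flush=False)
    sync_log.append(b"hello")
    async_log.append(b"hello")
    print(sync_log.unflushed, async_log.unflushed)  # 0 5
    async_log.flush()  # in a real system a background thread would do this
    sync_log.close()
    async_log.close()
```

Synchronous flushing trades latency for durability: the ACK is delayed by one fsync. Asynchronous flushing acknowledges earlier but can lose the unflushed bytes if the machine crashes.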

1.5 Page cache

The page cache (PageCache) is the OS’s cache of file data, used to speed up file reads and writes. Generally speaking, a program’s sequential file reads and writes run at close to memory speed, mainly because the OS uses part of memory as PageCache to optimize read and write access. For writes, the OS first writes the data to the cache, and the pdflush kernel thread then asynchronously flushes it from the cache to the physical disk. For reads, if a read misses the PageCache, the OS reads the file from the physical disk and, at the same time, sequentially prefetches the data of adjacent blocks.

1.6 Memory Mapping

RocketMQ reads and writes files mainly through MappedByteBuffer. It uses the FileChannel model in NIO to map physical files on disk directly into user-space memory addresses. This mmap approach avoids the overhead that traditional I/O incurs by copying file data back and forth between buffers in the kernel address space and buffers in the user application’s address space. Operations on files become direct operations on memory addresses, which greatly improves file read and write efficiency. (Because of the memory-mapping mechanism, RocketMQ stores its files with a fixed-length structure, which allows an entire file to be mapped into memory at once.)
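The same idea can be tried with Python’s mmap module, which plays the role that MappedByteBuffer plays in Java. This is only a sketch of the technique: the file is preallocated at a fixed length, mapped once, and then written through memory. The 4096-byte size and the helper names are choices made for this example.

```python
import mmap
import os
import tempfile

FILE_SIZE = 4096  # fixed length chosen up front for this example


def create_mapped_file(path: str, size: int = FILE_SIZE):
    """Preallocate a fixed-length file and map all of it into memory."""
    with open(path, "wb") as f:
        f.truncate(size)
    f = open(path, "r+b")
    return f, mmap.mmap(f.fileno(), size)


def append(mm: mmap.mmap, write_pos: int, payload: bytes) -> int:
    """Write by plain memory assignment and return the new write position."""
    mm[write_pos:write_pos + len(payload)] = payload
    return write_pos + len(payload)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "mapped.log")
    f, mm = create_mapped_file(path)
    pos = append(mm, 0, b"hello")
    pos = append(mm, pos, b" world")
    mm.flush()   # force the dirty pages to disk
    mm.close()
    f.close()
    with open(path, "rb") as check:
        print(check.read(pos))  # b'hello world'
```

The mapping has a fixed size, which is why a fixed-length file design fits memory mapping so well: the writer only tracks a position inside an already mapped region.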

2. Message communication

A RocketMQ message queue cluster consists mainly of NameServer, Broker (Master/Slave), Producer, and Consumer. The basic communication flow is as follows:

(1) After a Broker starts, it registers itself with NameServer, and from then on it reports its Topic routing information to NameServer every 30 seconds.

(2) When a Producer, acting as a client, sends a message, it first looks up the routing information for the message’s Topic in its locally cached TopicPublishInfoTable. If there is no entry, it pulls the latest routing information from NameServer. The Producer also pulls routing information from NameServer every 30 seconds by default.

(3) Using the routing information obtained in (2), the Producer selects a queue (MessageQueue) and sends the message to it. The Broker, as the receiver, accepts the message and stores it.

(4) Using the routing information obtained in (2), the Consumer performs client-side load balancing and then selects one or more message queues to pull messages from and consume.

As steps (1) to (3) show, sending a message involves communication among the Producer, the Broker, and NameServer (only part of MQ communication is described here). A well-designed network communication module is therefore crucial: it determines the overall message throughput and the ultimate performance of the RocketMQ cluster.

The rocketmq-remoting module is responsible for network communication in RocketMQ. Almost every other module that needs network communication, such as rocketmq-client, rocketmq-broker, and rocketmq-namesrv, depends on it and references it. To make requests and responses between clients and servers efficient, RocketMQ defines a custom communication protocol and extends the communication module on top of Netty.

3. Message filtering

Unlike other MQ middleware, RocketMQ filters messages when the Consumer subscribes to them. It works this way because messages written by the Producer and the entries used by the Consumer are stored separately: the Consumer first obtains an index entry from the ConsumeQueue, the logical consumption queue, and then reads the actual message content from the CommitLog, so filtering cannot bypass this storage structure. Each ConsumeQueue entry contains an 8-byte hash of the message Tag, and Tag-based filtering relies on exactly this field.

Two filtering approaches are supported.

(1) Tag filtering: when subscribing, the Consumer can specify Tags in addition to the Topic; multiple Tags in one subscription are separated by ||. The Consumer builds the subscription request into a SubscriptionData object and sends a Pull request to the Broker. Before reading data from Store, RocketMQ’s file storage layer, the Broker builds a MessageFilter from this data and passes it to the Store. After the Store reads a record from the ConsumeQueue, it filters the record using the tag hash stored in it. Because the server can only compare hash codes, it cannot filter precisely on the original tag string. Therefore, after pulling the messages, the Consumer also compares the original tag strings and discards, without consuming them, any messages whose tags do not match.

(2) SQL92 filtering: the rocketmq-filter module is responsible for building and executing the actual SQL expression. Executing a SQL expression on every filtering operation would be inefficient, so RocketMQ uses a BloomFilter to avoid doing so. The SQL92 expression is evaluated against the message’s properties.
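To see why Tag filtering needs a second check on the Consumer side, consider the following Python sketch. It assumes that the tag hash is Java’s String.hashCode() (reimplemented here for ASCII tags) and models messages as simple (tag, body) pairs; the function names are illustrative.

```python
def java_string_hash(s: str) -> int:
    """Java's String.hashCode() as a signed 32-bit value (ASCII input assumed)."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h


def parse_subscription(expression: str) -> set:
    """Split a subscription such as 'TagA || TagB' into its tags."""
    return {tag.strip() for tag in expression.split("||")}


def broker_filter(messages, tags):
    """Broker side: only tag hash codes are compared."""
    wanted = {java_string_hash(tag) for tag in tags}
    return [m for m in messages if java_string_hash(m[0]) in wanted]


def consumer_filter(messages, tags):
    """Consumer side: compare the original tag strings exactly."""
    return [m for m in messages if m[0] in tags]


tags = parse_subscription("Aa || TagB")
stored = [("Aa", "m1"), ("BB", "m2"), ("TagB", "m3"), ("TagC", "m4")]
pulled = broker_filter(stored, tags)
print(pulled)                         # [('Aa', 'm1'), ('BB', 'm2'), ('TagB', 'm3')]
print(consumer_filter(pulled, tags))  # [('Aa', 'm1'), ('TagB', 'm3')]
```

The strings "Aa" and "BB" have the same hash code (2112), so the hash-only check lets the unwanted "BB" message through; the exact string comparison after the pull removes it.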

4. Load balancing

The load balancing in RocketMQ is done on the Client side. Specifically, it can be divided into the load balancing for sending messages from the Producer side and the load balancing for subscribing messages from the Consumer side.

4.1 Load balancing of Producer

When sending a message, the Producer first finds the TopicPublishInfo for the specified Topic. After obtaining the TopicPublishInfo routing information, the RocketMQ client by default calls selectOneMessageQueue(), which selects one queue (MessageQueue) from the messageQueueList in TopicPublishInfo to send the message to. The fault-tolerance policy is defined in the MQFaultStrategy class, which has a switch variable, sendLatencyFaultEnable. If the switch is on, then on top of the random-start, incrementing, modulo selection, Brokers that are currently marked unavailable are filtered out. “latencyFaultTolerance” means backing off for a fixed period after a previous failure: for example, if the latency of the last request exceeded 550 ms, back off for 3000 ms; if it exceeded 1000 ms, back off for 60000 ms. If the switch is off, a queue (MessageQueue) is selected purely by the random-start, incrementing, modulo method. The latencyFaultTolerance mechanism is the key to highly available message sending.
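A minimal Python sketch of this selection logic follows. It is not the MQFaultStrategy code: the class, its fields, and the explicit now_ms clock parameter are hypothetical, and the two thresholds are simply the ones quoted in the paragraph above.

```python
import random


class QueueSelector:
    def __init__(self, queues, latency_fault_enabled=True):
        self.queues = list(queues)             # (broker_name, queue_id) pairs
        self.enabled = latency_fault_enabled   # stands in for sendLatencyFaultEnable
        self.counter = random.randrange(1000)  # random starting index
        self.unavailable_until = {}            # broker_name -> time in ms

    def record_latency(self, broker, latency_ms, now_ms):
        # Thresholds as quoted in the text above.
        if latency_ms > 1000:
            backoff_ms = 60000
        elif latency_ms > 550:
            backoff_ms = 3000
        else:
            backoff_ms = 0
        self.unavailable_until[broker] = now_ms + backoff_ms

    def _next(self):
        queue = self.queues[self.counter % len(self.queues)]
        self.counter += 1
        return queue

    def select(self, now_ms):
        if self.enabled:
            for _ in range(len(self.queues)):
                queue = self._next()
                if self.unavailable_until.get(queue[0], 0) <= now_ms:
                    return queue
        return self._next()  # switch off, or every broker is backing off


queues = [("broker-a", 0), ("broker-a", 1), ("broker-b", 0), ("broker-b", 1)]
selector = QueueSelector(queues)
selector.record_latency("broker-a", latency_ms=600, now_ms=0)
print([selector.select(now_ms=100)[0] for _ in range(4)])  # 'broker-b' four times
```

While broker-a is backing off, every selection lands on broker-b; once the back-off period has passed, the round-robin covers all four queues again.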

4.2 Consumer load balancing

In RocketMQ, both Consumer modes (Push and Pull) retrieve messages by pulling. Push mode is simply a wrapper around Pull mode: a pull thread pulls a batch of messages from the server, submits them to the message consumption thread pool, and then immediately tries to pull from the server again without pausing. If no messages are returned, it waits briefly and then pulls again. In both modes the Consumer needs to know which message queues on the Broker to fetch messages from. Load balancing is therefore needed on the Consumer side, that is, deciding which of the MessageQueues on the Broker are allocated to which Consumers in the same ConsumerGroup.

1. The Consumer sends heartbeat packets

After a Consumer starts, a scheduled task continuously sends heartbeat packets to all Broker instances in the RocketMQ cluster. Each packet contains information such as the consumer group name, the subscription set, the message communication mode, and the client ID. When the Broker receives a Consumer heartbeat, it records it in the ConsumerManager’s local cache variable consumerTable and stores the encapsulated client network channel information in the local cache variable channelInfoTable. This provides the metadata needed for Consumer-side load balancing.

2. RebalanceImpl, the core class for load balancing on the Consumer side

When a Consumer instance starts, it starts an MQClientInstance, which in turn starts the load balancing service thread, RebalanceService (it runs every 20 seconds). Reading the source code shows that the run() method of the RebalanceService thread ends up calling the rebalanceByTopic() method of the RebalanceImpl class, which is the core of Consumer-side load balancing. rebalanceByTopic() follows different logic depending on whether the consumer communication type is broadcast mode or cluster mode. The main processing flow in cluster mode is as follows:

(1) Obtain the set of message consumption queues (mqSet) for the Topic from topicSubscribeInfoTable, a local cache variable of the RebalanceImpl instance.

(2) Call mQClientFactory.findConsumerIdList() with the topic and consumerGroup as parameters. This sends an RPC request to the Broker for the list of consumer IDs in the consumer group. The Broker builds the response from the consumerTable it maintains from the heartbeat data reported by Consumers (business request code: GET_CONSUMER_LIST_BY_GROUP).

(3) Sort the message consumption queues and the consumer IDs for the Topic, and then use the message queue allocation strategy (by default, the average allocation algorithm) to compute the message queues this Consumer should pull from. The average allocation algorithm resembles paging: the sorted MessageQueues are the records, the sorted Consumers are the page numbers, and the algorithm computes the average page size and the range of records on each page. It then traverses that range to determine which records (here, MessageQueues) belong to the current Consumer.

(4) Call updateProcessQueueTableInRebalance(), which first compares the allocated message queue set (mqSet) against processQueueTable.

  • The part of processQueueTable marked in red in the figure above consists of queues that are not in the allocated message queue set mqSet. For each of these queues, set the Dropped attribute to true and then check whether the queue can be removed from the processQueueTable cache variable. This is done by the removeUnnecessaryMessageQueue() method, which tries for up to 1 s to obtain the lock on the current consumption processing queue. It returns true if the lock is obtained and false if the lock is still unavailable after waiting 1 s. If it returns true, the corresponding Entry is removed from the processQueueTable cache variable.
  • The part of processQueueTable marked in green in the figure above is the intersection with the allocated message queue set mqSet. For each of these queues, check whether the ProcessQueue has expired. In Pull mode nothing needs to be done. In Push mode, set the Dropped attribute to true and call removeUnnecessaryMessageQueue() to try to remove the Entry, as above.

Finally, a ProcessQueue object is created for each MessageQueue in the filtered message queue set (mqSet) and stored in the processQueueTable of RebalanceImpl. In this step, the computePullFromWhere(MessageQueue mq) method of the RebalanceImpl instance is called to obtain the next consumption offset of the MessageQueue, which is then filled into the properties of the pullRequest object created next. Each pullRequest is added to pullRequestList, and the dispatchPullRequest() method is executed: the PullRequest objects are put into the pullRequestQueue of the PullMessageService thread, and once the thread takes a request off the queue, it sends a Pull request to the Broker. Note that dispatchPullRequest() differs between the two implementations, RebalancePushImpl and RebalancePullImpl: in RebalancePullImpl the method body is empty. This answers the question raised at the end of the previous article.
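The reconciliation in step (4) and the paragraph above boils down to a set comparison, sketched here in Python. The sketch leaves out locking and expiry checks entirely, models a ProcessQueue as a small dict, and takes the offset lookup as a hypothetical callback; none of these names are RocketMQ APIs.

```python
def update_process_queue_table(table, mq_set, next_offset):
    """Reconcile the local table with the newly allocated queues.

    table:       dict mapping queue -> process-queue state (mutated in place)
    mq_set:      set of queues allocated to this consumer
    next_offset: callback returning the next consumption offset for a queue
    """
    dropped = {}
    for mq in list(table):
        if mq not in mq_set:           # no longer allocated to this consumer
            state = table.pop(mq)
            state["dropped"] = True
            dropped[mq] = state

    pull_requests = []
    for mq in sorted(mq_set):
        if mq not in table:            # newly allocated queue
            table[mq] = {"dropped": False}
            pull_requests.append({"mq": mq, "next_offset": next_offset(mq)})
    return dropped, pull_requests


table = {0: {"dropped": False}, 1: {"dropped": False}}
dropped, requests = update_process_queue_table(table, {1, 2}, lambda mq: mq * 100)
print(sorted(dropped))  # [0]
print(requests)         # [{'mq': 2, 'next_offset': 200}]
print(sorted(table))    # [1, 2]
```

Queues that stay allocated are left untouched, so a rebalance only creates pull requests for queues the consumer has just gained.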

The core design principle of load balancing message queues among the consumers of one consumer group is that, at any given time, a message queue can be consumed by only one consumer in the group, while a single consumer can consume multiple message queues at the same time.
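The paging-style average allocation from step (3) can be sketched in Python as follows. The function name and signature are illustrative; the sketch follows the description in the text and is not a copy of RocketMQ’s allocation strategy class.

```python
def allocate_averagely(current_cid, mq_all, cid_all):
    """Return the queues assigned to current_cid.

    Queues are the 'records', consumers are the 'pages'; both are sorted
    first so that every consumer computes the same partition.
    """
    mqs, cids = sorted(mq_all), sorted(cid_all)
    index = cids.index(current_cid)
    mod = len(mqs) % len(cids)

    if len(mqs) <= len(cids):
        average = 1
    elif mod > 0 and index < mod:
        average = len(mqs) // len(cids) + 1  # the first 'mod' consumers get one extra
    else:
        average = len(mqs) // len(cids)

    if mod > 0 and index < mod:
        start = index * average
    else:
        start = index * average + mod
    count = min(average, len(mqs) - start)
    return mqs[start:start + count] if count > 0 else []


queues = list(range(8))
consumers = ["c1", "c2", "c3"]
for cid in consumers:
    print(cid, allocate_averagely(cid, queues, consumers))
# c1 [0, 1, 2]
# c2 [3, 4, 5]
# c3 [6, 7]
```

Each queue appears in exactly one consumer’s result, which is the property stated above. When there are more consumers than queues, the surplus consumers receive an empty list.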

5. Transaction messages

Apache RocketMQ has supported distributed transaction messages since version 4.3.0. It implements them with a 2PC-style commit and adds compensation logic to handle phase-two timeouts and failures, as shown in the figure below.

5.1 RocketMQ Transaction Message Flow Overview

The figure above shows the overall scheme for transaction messages, which consists of two flows: sending and committing a normal transaction message, and the transaction message compensation flow.

1. Transaction message sending and submission:

(1) Send half messages.

(2) The server responds to the message writing result.

(3) Execute a local transaction based on the sent result (if the write fails, the half message is not visible to the business and the local logic is not executed).

(4) Perform Commit or Rollback according to the local transaction status (Commit generates the message index, and the message is visible to consumers)

2. Compensation process:

(1) For transaction messages that have been neither committed nor rolled back (messages in the pending state), the server initiates a backcheck.

(2) After receiving the backcheck message, the Producer checks the status of the local transaction that corresponds to it.

(3) The Producer performs Commit or Rollback again based on the local transaction status.

The compensation phase is used to resolve the timeout or failure of message Commit or Rollback.

5.2 RocketMQ transaction message design

1. Transaction messages are invisible to users in phase one

In the main flow of RocketMQ transaction messages, the phase-one message is not visible to the user. This is the biggest difference between a transaction message and an ordinary message. So how can a message be written without being visible to the user? RocketMQ does it as follows: if the message is a half message, it backs up the original topic and consumption queue of the message and then changes the topic to RMQ_SYS_TRANS_HALF_TOPIC. Since no consumer group subscribes to that Topic, consumers cannot consume half messages. RocketMQ then starts a scheduled task that pulls messages from RMQ_SYS_TRANS_HALF_TOPIC, picks a producer from the message’s producer group, and sends it a backcheck request for the transaction status. The message is then committed or rolled back according to the transaction status.

In RocketMQ, messages are stored on the server as follows. Each message has a corresponding index. The Consumer reads the content of the message entity through the secondary index ConsumeQueue as follows:

RocketMQ’s concrete implementation strategy is this: when a transaction message is written, its Topic and Queue attributes are replaced, and the original Topic and Queue are saved in the message’s properties. Because the Topic has been replaced, the message is not dispatched to the consumption queue of the original Topic, so consumers are unaware of it and do not consume it. Changing the message topic is a common RocketMQ “trick”; recall that delayed messages are implemented in a similar way.

2. Commit and Rollback operations and the introduction of Op messages

After phase one has written a message that is invisible to the user, phase two must make the message visible if the operation is a Commit, or undo the phase-one message if it is a Rollback. Consider Rollback first. The phase-one message is already invisible to the user, so nothing actually needs to be undone (in fact, RocketMQ cannot truly delete a message, because its files are written sequentially). However, to distinguish a rolled-back message from one that is still pending, an operation is needed to record the message’s final state. RocketMQ’s transaction message scheme therefore introduces Op messages, which record the determined state (Commit or Rollback) of a transaction message. If a transaction message has no corresponding Op message, its state is not yet determined (phase two may have failed). With Op messages in place, both Commit and Rollback record an Op operation. The only difference is that Commit also builds the index for the Half message before writing the Op message.

3. Storage and correspondence of Op messages

RocketMQ writes Op messages to one specific global Topic, obtained in the source code through the method TransactionalMessageUtil.buildOpTopic(). This Topic is an internal Topic (like the Half message Topic) and is not consumed by users. The content of an Op message is the storage offset of the corresponding Half message, so that the Half message can be located from the Op message for subsequent backcheck operations.

4. Index building for Half messages

When a phase-two Commit is performed, an index must be built for the Half message. Because the phase-one Half message was written to a special Topic, phase two builds the index by reading the Half message, replacing its Topic and Queue with the real target Topic and Queue, and then producing a user-visible message through an ordinary message write. In other words, phase two of a RocketMQ transaction message takes the content stored in phase one, restores it into a complete ordinary message, and runs it through the normal message write path once more.
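The interplay of Half messages, Op messages, and the phase-two restore can be modelled with a small in-memory Python sketch. Everything here is a simplification under stated assumptions: TxBroker and its lists are hypothetical, the property keys REAL_TOPIC and REAL_QUEUE are illustrative names for the backed-up attributes, and placing half messages in queue 0 is a choice made for this example.

```python
HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC"
REAL_TOPIC, REAL_QUEUE = "REAL_TOPIC", "REAL_QUEUE"  # illustrative property keys


class TxBroker:
    def __init__(self):
        self.half_log = []  # phase-one messages, stored under HALF_TOPIC
        self.op_log = []    # Op messages: offsets of settled half messages
        self.visible = []   # messages that consumers can see

    def put_half(self, msg) -> int:
        """Phase one: swap the topic and back up the real one in the properties."""
        properties = dict(msg.get("properties", {}))
        properties[REAL_TOPIC] = msg["topic"]
        properties[REAL_QUEUE] = msg["queue_id"]
        self.half_log.append({"topic": HALF_TOPIC, "queue_id": 0,
                              "body": msg["body"], "properties": properties})
        return len(self.half_log) - 1  # offset of the half message

    def end_transaction(self, half_offset: int, commit: bool) -> None:
        """Phase two: Commit restores and rewrites the message; both write an Op."""
        if commit:
            half = self.half_log[half_offset]
            properties = dict(half["properties"])
            self.visible.append({"topic": properties.pop(REAL_TOPIC),
                                 "queue_id": properties.pop(REAL_QUEUE),
                                 "body": half["body"], "properties": properties})
        self.op_log.append(half_offset)

    def pending(self):
        """Half messages without an Op message are still undetermined."""
        settled = set(self.op_log)
        return [o for o in range(len(self.half_log)) if o not in settled]


broker = TxBroker()
a = broker.put_half({"topic": "orders", "queue_id": 2, "body": "a"})
b = broker.put_half({"topic": "orders", "queue_id": 3, "body": "b"})
c = broker.put_half({"topic": "orders", "queue_id": 1, "body": "c"})
broker.end_transaction(a, commit=True)
broker.end_transaction(b, commit=False)
print([m["body"] for m in broker.visible])  # ['a']
print(broker.pending())                     # [2]
```

Rollback never touches the half log; it only records an Op entry. The half message with no Op entry is the one a backcheck would later ask the producer about.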

5. How is a phase-two failure handled?

If phase two of a transaction message fails, for example because a network problem causes the Commit to fail, a strategy is needed to ensure the message is eventually committed. RocketMQ uses a compensation mechanism called “backcheck”: the Broker sends the message whose state is undetermined to a Producer in the same producer group, the Producer checks the state of its local transaction for that message, and then performs Commit or Rollback accordingly. The Broker decides which transaction messages to check by comparing Half messages with Op messages, and it advances a checkpoint that records which transaction messages already have a determined state.

Note that RocketMQ does not check the transaction status of a message indefinitely. By default it checks at most 15 times; if the transaction status is still unknown after 15 checks, RocketMQ rolls the message back by default.
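The bounded backcheck can be illustrated with a short Python sketch. The function, the state strings, and the ask_producer callback are hypothetical; only the limit of 15 checks and the default rollback come from the text.

```python
MAX_CHECK_TIMES = 15  # default limit quoted above


def backcheck_round(check_counts, ask_producer, max_checks=MAX_CHECK_TIMES):
    """Run one backcheck pass over the pending half messages.

    check_counts: dict mapping half-message offset -> checks performed so far
                  (mutated in place; settled messages are removed)
    ask_producer: callback returning "COMMIT", "ROLLBACK" or "UNKNOWN"
    Returns the decisions reached in this pass.
    """
    decisions = {}
    for offset in list(check_counts):
        if check_counts[offset] >= max_checks:
            decisions[offset] = "ROLLBACK"  # give up after the maximum number of checks
            del check_counts[offset]
            continue
        check_counts[offset] += 1
        state = ask_producer(offset)
        if state != "UNKNOWN":
            decisions[offset] = state
            del check_counts[offset]
    return decisions


pending = {0: 0, 1: 0}


def producer_state(offset):
    return "COMMIT" if offset == 0 else "UNKNOWN"


print(backcheck_round(pending, producer_state))  # {0: 'COMMIT'}
for _ in range(14):
    backcheck_round(pending, producer_state)
print(pending)                                   # {1: 15}
print(backcheck_round(pending, producer_state))  # {1: 'ROLLBACK'}
```

Message 1 never gets a definite answer, so after its fifteenth check the next pass settles it with a rollback instead of asking again.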

6. Message query

RocketMQ supports Message query by Message Id and Message Key.

6.1 Querying Messages Based on MessageId

The MessageId in RocketMQ is 16 bytes long and contains the address of the host that stores the message (IP address and port) and the message’s CommitLog offset. Querying by MessageId works as follows: the Client parses the Broker address (IP address and port) and the CommitLog offset out of the MessageId, wraps them in an RPC request, and sends it through the Remoting communication layer (business request code: VIEW_MESSAGE_BY_ID). On the Broker side, QueryMessageProcessor handles the request: it uses the CommitLog offset and size to find the actual record in the CommitLog, parses it into a complete message, and returns it.
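A 16-byte ID of this kind can be encoded and decoded as in the Python sketch below. The split into 4 bytes of IPv4 address, 4 bytes of port, and 8 bytes of offset, the big-endian byte order, and the uppercase hexadecimal text form are assumptions made for this example; the text above only states the total length and the fields.

```python
import socket
import struct


def encode_message_id(ip: str, port: int, commitlog_offset: int) -> str:
    """Pack address and offset into 16 bytes and render them as hex text."""
    raw = socket.inet_aton(ip) + struct.pack(">iq", port, commitlog_offset)
    return raw.hex().upper()


def decode_message_id(message_id: str):
    """Recover (ip, port, commitlog_offset) from the hex text."""
    raw = bytes.fromhex(message_id)
    port, commitlog_offset = struct.unpack(">iq", raw[4:])
    return socket.inet_ntoa(raw[:4]), port, commitlog_offset


message_id = encode_message_id("10.0.0.1", 10911, 1073741924)
print(message_id)                     # 0A00000100002A9F0000000040000064
print(decode_message_id(message_id))  # ('10.0.0.1', 10911, 1073741924)
```

Because the ID carries both the Broker address and the physical offset, the client can go straight to the right Broker and the Broker can go straight to the right position in the CommitLog, with no index lookup.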

6.2 Querying Messages by Message Key

“Query messages by Message Key” is implemented based on RocketMQ IndexFile. RocketMQ’s index file logical structure is similar to the implementation of HashMap in the JDK. The specific structure of the index file is as follows:

IndexFile supports querying the message index by Message Key. Index files are stored at $HOME/store/index/{fileName}, where fileName is the timestamp at which the file was created. The file size is fixed: 40 + 5,000,000 × 4 + 20,000,000 × 20 = 420,000,040 bytes. If the UNIQ_KEY property is set in the message properties, topic + “#” + UNIQ_KEY is written to the index as a key. If the KEYS property is set (multiple keys separated by spaces), topic + “#” + KEY is indexed for each key as well.

Each index entry contains four fields, Key Hash, CommitLog Offset, Timestamp, and NextIndex Offset, for a total of 20 bytes. NextIndex Offset is the slotValue read earlier; when hashes collide, this field chains all conflicting index entries into a linked list. Timestamp records a difference between message storeTimestamp values rather than an absolute time. The structure of the whole IndexFile is shown in the figure below. The 40-byte Header stores some general statistics. The Slot Table of 4 × 5,000,000 bytes does not store the actual index data; each slot stores the head of the singly linked list for that slot. The remaining 20 × 20,000,000 bytes are the actual index data, so one IndexFile can hold 20 million index entries.

On the Broker side, the QueryMessageProcessor business processor handles queries by Message Key. To read a message, it uses the topic and key to find the matching record in the IndexFile and then reads the message’s actual content from the CommitLog file at the recorded offset.
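The slot table and the chained entries can be modelled in memory with a few lines of Python. This is a sketch of the data structure only: TinyIndex is hypothetical, the tiny slot count is chosen for the example, and CRC32 is a stand-in hash, not the hash RocketMQ uses. The size constants are the ones given in the text.

```python
import zlib

HEADER_SIZE, SLOT_SIZE, INDEX_SIZE = 40, 4, 20
SLOT_NUM, INDEX_NUM = 5_000_000, 20_000_000


def index_file_size() -> int:
    return HEADER_SIZE + SLOT_NUM * SLOT_SIZE + INDEX_NUM * INDEX_SIZE


def key_hash(key: str) -> int:
    # Stand-in hash for this sketch only.
    return zlib.crc32(key.encode("utf-8"))


class TinyIndex:
    def __init__(self, slot_num: int = 8):
        self.slots = [0] * slot_num  # 0 = empty, else 1-based position of newest entry
        self.entries = []            # (key_hash, commitlog_offset, time_diff, next_index)

    def put(self, key: str, commitlog_offset: int, time_diff: int = 0) -> None:
        h = key_hash(key)
        slot = h % len(self.slots)
        # The new entry points at the previous head of the slot's chain.
        self.entries.append((h, commitlog_offset, time_diff, self.slots[slot]))
        self.slots[slot] = len(self.entries)

    def get(self, key: str):
        """Return candidate CommitLog offsets, newest first."""
        h = key_hash(key)
        position = self.slots[h % len(self.slots)]
        offsets = []
        while position != 0:
            entry_hash, offset, _, next_index = self.entries[position - 1]
            if entry_hash == h:
                offsets.append(offset)
            position = next_index
        return offsets


index = TinyIndex()
index.put("TopicA#order-1", 100)
index.put("TopicA#order-2", 200)
index.put("TopicA#order-1", 300)
print(index_file_size())            # 420000040
print(index.get("TopicA#order-1"))  # [300, 100]
print(index.get("TopicA#order-2"))  # [200]
```

A lookup only compares hashes, so the offsets it returns are candidates; the stored message still has to be read from the CommitLog and its actual key compared.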
