Architecture overview
RocketMQ is architecturally divided into four main parts, as shown in the figure above:
- Producer: the role that publishes messages; it supports distributed cluster deployment. The Producer uses MQ's load balancing module to select a queue in the corresponding Broker cluster for message delivery, with fast failure and low latency.
- Consumer: the role that consumes messages; it also supports distributed cluster deployment. Messages can be consumed in push or pull mode, and in either cluster mode or broadcast mode. It provides a real-time message subscription mechanism that meets the needs of most users.
- NameServer: a very simple Topic routing registry, playing a role similar to ZooKeeper in Dubbo, that supports dynamic registration and discovery of Brokers. It has two main functions. Broker management: NameServer accepts the registration information of Broker clusters and stores it as the basic data of the routing information, then uses a heartbeat detection mechanism to check whether each Broker is still alive. Routing information management: each NameServer holds the entire routing information of the Broker cluster plus the queue information that clients query, so Producers and Consumers can learn the routing information of the whole Broker cluster from NameServer and then deliver and consume messages. NameServer is usually deployed as a cluster too, and its instances do not communicate with each other. A Broker registers its routing information with every NameServer, so each NameServer instance holds a complete copy. When a NameServer goes offline for some reason, the Broker can still synchronize its routing information with the other NameServers, and Producers and Consumers can still dynamically perceive the Broker's routing information.
- BrokerServer: the Broker is primarily responsible for storing, delivering, and querying messages, and for ensuring high availability of the service.
Topic, Broker, and Queue
The relationship among the three is shown below:
In the figure, there are two brokers and two topics, each with four queues. When a producer sends a message, it sends it to a specific queue, and when a consumer gets the message, it gets it from the queue.
RocketMQ topics can be created manually on the console or automatically (when autoCreateTopicEnable=true is set). Disabling automatic creation is recommended in production environments.
Message processing flow
RocketMQ processes a message in three stages:
- Message reception: the broker receives a message sent by the producer. The class that handles this is SendMessageProcessor; once the message has been written to the commitLog file, reception is complete.
- Message distribution: the class that handles distribution on the broker is ReputMessageService. It starts a thread that continuously dispatches messages from the commitLog to the corresponding ConsumeQueue. This step writes two files, the ConsumeQueue and the IndexFile; once they are written, distribution is complete.
- Message delivery: delivery is the process of sending a message to the consumer. The consumer initiates a request for messages; on receiving it, the broker calls the PullMessageProcessor class, which reads the messages via the ConsumeQueue file and returns them to the consumer. After that, delivery is complete.
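The three stages can be pictured with a toy in-memory model. This is only a sketch under simplifying assumptions: `MiniBroker` and all of its members are hypothetical names, plain lists stand in for the commitLog and ConsumeQueue files, and `dispatch()` is called explicitly instead of running on a background thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the receive -> dispatch -> deliver flow (hypothetical names, not RocketMQ classes).
class MiniBroker {
    // Stand-in for the commitLog: every message of every topic, in arrival order.
    private final List<String[]> commitLog = new ArrayList<>();
    // Stand-in for the ConsumeQueue files: per topic, the commitLog positions of its messages.
    private final Map<String, List<Integer>> consumeQueues = new HashMap<>();
    // How far dispatch has progressed through the commitLog.
    private int dispatchedUpTo = 0;

    // Stage 1, reception: append to the commitLog and return.
    void receive(String topic, String body) {
        commitLog.add(new String[] {topic, body});
    }

    // Stage 2, distribution: index commitLog entries that have not been dispatched yet.
    void dispatch() {
        while (dispatchedUpTo < commitLog.size()) {
            String topic = commitLog.get(dispatchedUpTo)[0];
            consumeQueues.computeIfAbsent(topic, t -> new ArrayList<>()).add(dispatchedUpTo);
            dispatchedUpTo++;
        }
    }

    // Stage 3, delivery: resolve index entries from `offset` (>= 0) onward back to message bodies.
    List<String> pull(String topic, int offset) {
        List<Integer> index = consumeQueues.getOrDefault(topic, new ArrayList<>());
        List<String> result = new ArrayList<>();
        for (int i = offset; i < index.size(); i++) {
            result.add(commitLog.get(index.get(i))[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        MiniBroker broker = new MiniBroker();
        broker.receive("orders", "created");
        broker.receive("orders", "paid");
        broker.dispatch();
        System.out.println(broker.pull("orders", 0)); // prints [created, paid]
    }
}
```

Note that a message is not visible to `pull` until `dispatch` has indexed it, which mirrors the separation between writing the commitLog and building the consume index.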
For a detailed analysis, see:
- RocketMQ source code analysis: the Broker message receiving process
- RocketMQ source code analysis: the Broker message distribution process
- RocketMQ source code analysis: the Broker message delivery process
The three "high" guarantees
High concurrency
- Netty high-performance transmission: Producer, Broker, and Consumer communicate with each other over Netty. Business processing runs in custom worker thread pools: in NettyServerHandler, the final processing is handed off to a worker thread pool.
- Spin locks reduce context switching: RocketMQ's CommitLog uses a PutMessageLock to prevent concurrent writes. PutMessageLock has two implementations: PutMessageReentrantLock and PutMessageSpinLock. PutMessageReentrantLock is based on Java's synchronous wait-and-wake mechanism, so a thread that cannot get the lock is suspended and woken up later. PutMessageSpinLock uses a CAS (compare-and-set) operation and spins until it acquires the lock, without suspending the thread. RocketMQ uses PutMessageSpinLock by default to make locking and unlocking cheaper during high-concurrency writes and to reduce thread context switches.
- Sequential file writing: the commitLog is written sequentially, which performs much better than random writes. Data is not written directly to the disk: it is written to the PageCache first, and the operating system later flushes the PageCache data to the disk asynchronously.
- MappedFile preheating and zero-copy mechanism: Linux does not write data directly to the disk. It writes to the PageCache page that corresponds to the disk block and marks the page as dirty. Dirty pages are flushed to the disk once enough of them have accumulated or after a certain period of time. If the system loses power in the meantime, the dirty-page data may be lost.
- Multi-broker multi-queue mode: multiple brokers with multiple queues each are used to improve the parallel processing capability for messages.
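The spin-lock idea from the list above can be sketched with an `AtomicBoolean` and compare-and-set. `SpinLockSketch` and `countWithLock` are hypothetical names chosen for this illustration; it shows the general technique, not the actual PutMessageSpinLock source.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal CAS-based spin lock (illustrative sketch with hypothetical names).
class SpinLockSketch {
    // true = lock is free, false = lock is held
    private final AtomicBoolean free = new AtomicBoolean(true);

    void lock() {
        // Busy-wait until this thread flips free -> held; the thread is never parked.
        while (!free.compareAndSet(true, false)) {
            Thread.onSpinWait();
        }
    }

    void unlock() {
        free.compareAndSet(false, true);
    }

    // Demo helper: several threads increment a plain counter under the lock.
    static int countWithLock(int threads, int perThread) {
        SpinLockSketch lock = new SpinLockSketch();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithLock(4, 10_000)); // prints 40000
    }
}
```

Spinning pays off only when the critical section is very short, as with an append to a memory-mapped file; for long critical sections a blocking lock wastes less CPU.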
High availability
RocketMQ's high availability is provided by DLedger. The high-availability architecture of the whole broker layer is as follows:
DLedger is a multi-node cluster that internally elects a leader node using the Raft algorithm, and this is what drives failover of broker nodes.
- Multiple NameServers avoid a NameServer single point of failure.
- Multiple broker clusters: when one broker cluster fails, the other broker clusters keep working.
- Each broker cluster has one master node and multiple slave nodes. When the master node fails and DLedger detects the fault, one of the slave nodes is switched to master so that the cluster keeps running normally.
High scalability
Brokers are not coupled to producers or consumers. To add a broker cluster (one master, multiple slaves), you only need to configure the nameServer address on the new brokers and start them. In theory, brokers can be scaled out without limit.
Once a broker cluster is added, it registers itself with the nameServer, and producers and consumers discover it from there.
Message reliability
RocketMQ's message reliability is considered in three phases:
- Reliability during message sending
- Message storage phase reliability
- Reliability in the message consumption phase
Below we describe how reliability is achieved in these three stages.
Reliability during message sending
The reliability of the message sending phase is handled by the producer. RocketMQ supports three main sending modes:
- Synchronous: after a message is sent, the thread blocks until a result is returned.
- Asynchronous: when sending a message you can register a listener for the send result. The thread does not block after sending, and the listener is invoked once the result is available.
- One-way: after the message is sent, the thread does not block, no result is returned, and no listener can be registered. The sender simply sends the message without caring about the result or whether the send succeeded.
In terms of message reliability:
- Synchronous sending: when a send fails, it is retried internally (by default one send plus two retries). The caller also gets the send result back, so it can handle a failure itself.
- Asynchronous sending: when a send fails, it is also retried internally (3 attempts in total). Because a listener can be registered when sending, a failed message can be handled in the listener code.
- One-way sending: when a send fails, there is no retry (only a WARN log is written), no result is returned, and no listener is invoked.
For details, see:
- RocketMQ source code analysis: the Producer message sending process
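To make the difference between the three modes concrete, here is a small simulation that does not use the RocketMQ client at all. `SendModesSketch`, its methods, and the `Predicate<String>` standing in for the network call are all assumptions made for illustration; the attempt count of 3 follows the "one send plus two retries" default described above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

// Simulation of sync / async / one-way sending (hypothetical names, no RocketMQ dependency).
class SendModesSketch {
    // One send plus two retries.
    static final int TOTAL_ATTEMPTS = 3;

    // Synchronous: the caller blocks until success or until the attempts are used up.
    static boolean sendSync(String message, Predicate<String> transport) {
        for (int attempt = 1; attempt <= TOTAL_ATTEMPTS; attempt++) {
            if (transport.test(message)) {
                return true;
            }
        }
        return false;
    }

    // Asynchronous: the caller does not block; the result arrives through the future.
    static CompletableFuture<Boolean> sendAsync(String message, Predicate<String> transport) {
        return CompletableFuture.supplyAsync(() -> sendSync(message, transport));
    }

    // One-way: a single attempt, no retry, and the outcome is discarded.
    static void sendOneway(String message, Predicate<String> transport) {
        transport.test(message);
    }

    public static void main(String[] args) {
        Predicate<String> alwaysFails = message -> false;
        System.out.println(sendSync("hello", alwaysFails)); // prints false
        sendAsync("hello", message -> true)
            .thenAccept(ok -> System.out.println("async result: " + ok))
            .join();
        sendOneway("hello", alwaysFails); // nothing to observe
    }
}
```

The practical consequence is that only the first two modes give the application a hook for handling a failed send; one-way sending suits data where occasional loss is acceptable.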
Message storage phase reliability
The message storage phase is guaranteed by the broker.
In a single-master broker, messages are first written to the in-memory PageCache and then flushed to disk in one of two ways:
- SYNC_FLUSH (synchronous flush): after a message is written to the PageCache, the flush thread is notified immediately and the writer waits for the flush to complete. When the flush finishes, the flush thread wakes the waiting thread, which then returns a write-success status. This gives the strongest data safety, but throughput is lower.
- ASYNC_FLUSH (asynchronous flush, the default): once a message is written to the PageCache, success is returned to the client immediately. The data is written to disk later, either when the messages accumulated in the PageCache reach a certain amount or on a timer. This mode has high throughput and performance, but data still in the PageCache can be lost, so data safety is not guaranteed.
Summary: synchronous flushing does not lose data but costs performance; asynchronous flushing performs well, but a message is lost if power fails before it has been flushed.
In a one-master, multi-slave broker architecture, the master node can take one of two roles:
- SYNC_MASTER (synchronous master): on receiving a message, the master immediately replicates it to the slave nodes and returns success only after the slaves have synchronized it, which ensures high reliability.
- ASYNC_MASTER (asynchronous master): on receiving a message, the master does not replicate it to the slave nodes immediately; a background thread performs the synchronization. If a master/slave switchover happens before a message has been synchronized, that message may be lost.
Summary: a synchronous master is highly reliable and loses no data on a master/slave switchover, but it returns only after the slaves have synchronized, which costs performance. An asynchronous master performs well, but if a switchover happens before synchronization, data on the master may never reach the slaves and messages are lost.
Summary: to ensure message reliability, use SYNC_FLUSH on a single-master broker. In a one-master, multi-slave architecture, the master can combine ASYNC_FLUSH with the SYNC_MASTER role. In practice, choose the mode that fits the specific scenario.
Reliability in the message consumption phase
The reliability of the message consumption phase is guaranteed by the Consumer. When consuming a message, the consumer can return one of two results:
- CONSUME_SUCCESS: consumption succeeded
- RECONSUME_LATER: consumption failed; consume the message again later
When a Consumer fails to consume a message, RocketMQ's retry mechanism delivers it again. Consumption failures generally fall into two cases:
- The message itself is the cause, for example deserialization fails or the message data cannot be processed (such as a phone top-up where the phone number in the message has been cancelled, so the top-up cannot go through). This kind of error usually means skipping the message and going on to consume other messages. The failed message would fail again 99% of the time even if retried immediately, so it is best to provide a timed retry mechanism, for example retrying after 10 seconds.
- A downstream service that the application depends on is unavailable, for example the DB connection is down or an external system is unreachable. With this kind of error, skipping the failed message does not help, because consuming other messages fails too. In this case it is recommended that the application sleep for 30 seconds before consuming the next message, which reduces the pressure that message retries put on the Broker.
RocketMQ sets up a retry queue for each consumerGroup, under the Topic named %RETRY%+consumerGroup (note that this retry queue belongs to the consumer group, not to each Topic). It temporarily holds messages that the Consumer could not consume because of some exception. Since exceptions take time to recover from, the retry queue has multiple retry levels, each with its own redelivery delay; the more retries, the longer the delay. RocketMQ first saves a retry message to the delay queue of the Topic SCHEDULE_TOPIC_XXXX; after the delay has elapsed, a background scheduled task saves it to the retry queue %RETRY%+consumerGroup.
Load balancing
Producer load balancing
When sending a message, the Producer first looks up the TopicPublishInfo for the given Topic. With that routing information, the RocketMQ client by default calls selectOneMessageQueue(), which picks one queue (MessageQueue) from the messageQueueList in TopicPublishInfo to send to. The fault-tolerance policy is defined in the MQFaultStrategy class.
MQFaultStrategy has a sendLatencyFaultEnable switch. If it is on, queues are still picked by random incrementing modulo, but Brokers that are currently marked unavailable are filtered out. "latencyFaultTolerance" means backing off for a fixed period after a previous failure: for example, if the latency of the last request exceeded 550 ms, back off for 30000 ms; if it exceeded 1000 ms, back off for 60000 ms. If the switch is off, a queue (MessageQueue) is picked by random incrementing modulo alone. latencyFaultTolerance is the key to high availability of message sending.
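The combination of incrementing-modulo selection and a back-off window can be sketched as follows. `FaultAwareSelector` and its methods are hypothetical names; queues are identified only by their Broker name, the counter starts at 0 instead of a random value, and the back-off duration is passed in by the caller rather than looked up from a latency table.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of "incrementing modulo + skip brokers that are backing off" (hypothetical names).
class FaultAwareSelector {
    // Ever-increasing index; this sketch starts it at 0 for predictability.
    private final AtomicInteger counter = new AtomicInteger(0);
    // Broker name -> timestamp (ms) until which the broker should be avoided.
    private final Map<String, Long> unavailableUntil = new HashMap<>();

    // Record that a broker was slow or failed: avoid it for backoffMillis from now.
    void markUnavailable(String broker, long nowMillis, long backoffMillis) {
        unavailableUntil.put(broker, nowMillis + backoffMillis);
    }

    boolean isAvailable(String broker, long nowMillis) {
        return nowMillis >= unavailableUntil.getOrDefault(broker, 0L);
    }

    // Pick the next queue's broker, skipping brokers inside their back-off window.
    // Assumes queueBrokers is non-empty.
    String select(List<String> queueBrokers, long nowMillis) {
        for (int i = 0; i < queueBrokers.size(); i++) {
            int pos = Math.floorMod(counter.getAndIncrement(), queueBrokers.size());
            String broker = queueBrokers.get(pos);
            if (isAvailable(broker, nowMillis)) {
                return broker;
            }
        }
        // Every broker is backing off: fall back to plain incrementing modulo.
        return queueBrokers.get(Math.floorMod(counter.getAndIncrement(), queueBrokers.size()));
    }

    public static void main(String[] args) {
        FaultAwareSelector selector = new FaultAwareSelector();
        List<String> brokers = List.of("broker-a", "broker-b");
        selector.markUnavailable("broker-a", 0L, 60_000L);
        System.out.println(selector.select(brokers, 1_000L));  // prints broker-b
        System.out.println(selector.select(brokers, 60_000L)); // prints broker-a
    }
}
```

Falling back to plain selection when every Broker is backing off keeps sending possible instead of failing outright, which is one plausible design choice for this kind of strategy.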
Consumer load balancing
In RocketMQ, both Consumer consumption modes (Push and Pull) retrieve messages by pulling; Push mode is a wrapper around Pull mode. In essence, a pull thread fetches a batch of messages from the server, submits them to the message consuming thread pool, and then immediately tries to pull from the server again, "non-stop". If no message is pulled, the next pull is delayed for a while and then continues. In both modes the Consumer needs to know which message queue on the Broker to fetch messages from. Load balancing is therefore needed on the Consumer side, that is, deciding which Consumers in the same ConsumerGroup consume which of the MessageQueues on the Broker.
- The Consumer sends heartbeat packets
After a Consumer starts, a scheduled task keeps sending heartbeat packets (containing the consumer group name, the subscription set, the message communication mode, the client ID, and similar information) to all Broker instances in the RocketMQ cluster. On receiving a heartbeat, the Broker stores it in the local cache variable consumerTable of ConsumerManager, and stores the encapsulated client network channel information in the local cache variable channelInfoTable. This provides the metadata for Consumer-side load balancing.
- The core class of Consumer-side load balancing: RebalanceImpl
Starting the MQClientInstance during Consumer startup also starts the load balancing service thread, RebalanceService (which runs every 20 seconds). The source code shows that the run() method of the RebalanceService thread ends up calling rebalanceByTopic() of the RebalanceImpl class, which is the core of Consumer-side load balancing. rebalanceByTopic() takes a different path depending on whether the consumer's message model is broadcast mode or cluster mode. The main processing flow in cluster mode is:
(1) Get the set of message consumption queues (mqSet) under this Topic from topicSubscribeInfoTable, a local cache variable of the rebalanceImpl instance;
(2) Call mQClientFactory.findConsumerIdList() with the topic and consumerGroup as parameters. It sends an RPC request to the Broker to obtain the list of consumer IDs in the consumer group (the Broker builds the response from the consumerTable populated by the heartbeat data that Consumers reported earlier; request code: GET_CONSUMER_LIST_BY_GROUP);
(3) Sort the message consumption queues and the consumer IDs under the Topic, then use the message queue allocation strategy (default: the average allocation algorithm) to compute the message queues that this Consumer should pull. The average allocation algorithm resembles paging: all MessageQueues are sorted like records, all Consumers are sorted like page numbers, and the algorithm computes the average size of each page and the range of records on each page. Finally, it walks that range to work out which records (here, MessageQueues) belong to the current Consumer.
(4) Then call updateProcessQueueTableInRebalance(), which first compares the allocated message queue set (mqSet) against processQueueTable and filters accordingly.
- The part of processQueueTable marked in red in the figure above is not contained in the allocated message queue set mqSet. Set the Dropped attribute of these queues to true, then check whether each queue can be removed from the processQueueTable cache variable by calling removeUnnecessaryMessageQueue(): it tries for up to 1 s to obtain the lock on the current consumption-processing queue and returns true if it gets the lock; if the lock is still unavailable after waiting 1 s, it returns false. If true is returned, the corresponding Entry is removed from processQueueTable;
- The green part of processQueueTable in the figure above is the intersection with the allocated message queue set mqSet. Check whether each of these ProcessQueues has expired. Nothing needs to be done in Pull mode; in Push mode, set the Dropped attribute to true and call removeUnnecessaryMessageQueue() to try to remove the Entry as above;
Finally, create a ProcessQueue object for each MessageQueue in the filtered set (mqSet) and store it in the processQueueTable of RebalanceImpl. Here computePullFromWhere(MessageQueue mq) of the RebalanceImpl instance is called to get the next consumption offset of the MessageQueue, which is filled into the pullRequest object created next. Add the pullRequest objects to pullRequestList and call dispatchPullRequest(), which puts the PullRequest objects into the pullRequestQueue of the PullMessageService thread; when the thread takes one out, it sends a Pull request to the Broker. Note that dispatchPullRequest() differs between the two implementations RebalancePushImpl and RebalancePullImpl: in RebalancePullImpl the method is empty.
Load balancing of message consumption queues among the consumers in one consumer group rests on one core idea: a message consumption queue can be consumed by only one consumer in the same consumer group at a time, while one consumer can consume multiple message queues at the same time.
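The paging-style average allocation in step (3) can be sketched in a few lines. This is a paraphrase of the idea, not the source of RocketMQ's allocation strategy class; `AverageAllocationSketch` and `allocate` are hypothetical names, and queues and consumers are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Paging-style average allocation (illustrative paraphrase with hypothetical names).
class AverageAllocationSketch {
    // mqAll and cidAll must be sorted the same way on every consumer,
    // so that all consumers compute a consistent, non-overlapping split.
    static List<String> allocate(String currentCid, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result;
        }
        int base = mqAll.size() / cidAll.size();
        int mod = mqAll.size() % cidAll.size();
        // The first `mod` consumers each take one extra queue ("page size").
        int size = index < mod ? base + 1 : base;
        // Start of this consumer's "page" within the sorted queue list.
        int start = index < mod ? index * size : index * size + mod;
        for (int i = 0; i < size; i++) {
            result.add(mqAll.get(start + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = List.of("q0", "q1", "q2");
        List<String> consumers = List.of("c1", "c2");
        System.out.println(allocate("c1", queues, consumers)); // prints [q0, q1]
        System.out.println(allocate("c2", queues, consumers)); // prints [q2]
    }
}
```

With more consumers than queues, the consumers at the end of the sorted list receive an empty allocation, which is why adding consumers beyond the queue count does not increase parallelism.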
Broadcast mode and cluster mode
Broadcast mode
In broadcast mode, the same message is consumed by every consumer in the same consumerGroup.
As shown in the figure, there are three MessageQueues and one consumerGroup under the same topic, and there are two consumers in the group. In broadcast mode, Consumer1 and Consumer2 both consume the messages of MessageQueue1, MessageQueue2, and MessageQueue3.
Cluster mode
In cluster mode, the same message is consumed by only one consumer in the same consumerGroup.
As shown in the figure, there are three MessageQueues under the same topic, and two consumerGroups consume the messages on them. In consumerGroup1, Consumer1 consumes messages from MessageQueue1 and MessageQueue2, and Consumer2 consumes messages from MessageQueue3. In consumerGroup2, Consumer1 consumes messages from MessageQueue1 and MessageQueue2, and Consumer2 consumes messages from MessageQueue3.
Ordered messages
Message ordering means that a class of messages can be consumed in the order in which they were sent. For example, an order produces three messages: order creation, order payment, and order completion. Consuming them only makes sense in this order, but different orders can still be consumed in parallel.
RocketMQ can ensure strict message ordering.
Ordered messages are divided into globally ordered messages and partition-ordered messages. Global ordering means that all messages under a Topic must be in order. Partition ordering only needs to ensure that each group of messages is consumed in order.
- Global ordering: for a given Topic, all messages are published and consumed in strict first-in, first-out (FIFO) order. Application scenario: performance requirements are low and all messages must be published and consumed strictly FIFO.
- Partition ordering: for a given Topic, all messages are partitioned by a sharding key. Messages within the same partition are published and consumed in strict FIFO order. The sharding key is the field that distinguishes partitions in ordered messages and is an entirely different concept from the key of an ordinary message. Application scenario: performance requirements are high, the sharding key serves as the partitioning field, and messages within the same partition are published and consumed strictly FIFO.
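Partition ordering comes down to always sending messages with the same sharding key to the same queue. The sketch below assumes a simple hash-modulo rule; `OrderedRoutingSketch`, `selectQueue`, and `route` are hypothetical names, and messages are modeled as "shardingKey:event" strings.

```java
import java.util.ArrayList;
import java.util.List;

// Sharding-key routing for partition-ordered messages (hypothetical names, assumed hash-modulo rule).
class OrderedRoutingSketch {
    // The same sharding key always maps to the same queue index.
    static int selectQueue(String shardingKey, int queueCount) {
        return Math.floorMod(shardingKey.hashCode(), queueCount);
    }

    // Routes messages of the form "shardingKey:event" into queueCount FIFO lists.
    // Assumes every message contains a ':' separator.
    static List<List<String>> route(List<String> messages, int queueCount) {
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
        for (String message : messages) {
            String key = message.substring(0, message.indexOf(':'));
            queues.get(selectQueue(key, queueCount)).add(message);
        }
        return queues;
    }

    public static void main(String[] args) {
        List<String> messages = List.of(
            "A:created", "B:created", "A:paid", "B:paid", "A:completed");
        List<List<String>> queues = route(messages, 4);
        for (int i = 0; i < queues.size(); i++) {
            System.out.println("queue " + i + ": " + queues.get(i));
        }
    }
}
```

Each order keeps its created, paid, completed sequence inside one queue, while different orders land in different queues and can be consumed in parallel.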
Transaction message
The figure above illustrates the general scheme of transaction messages, which consists of two flows: normal transaction message sending and commit, and transaction message compensation.
- Transaction message sending and commit:
  - The producer sends a half message.
  - The server responds with the result of writing the message.
  - The local transaction is executed according to the send result (if the write failed, the half message is not visible to the business and the local logic is not executed).
  - Commit or Rollback is performed according to the local transaction state (Commit generates the message index and makes the message visible to consumers).
- Compensation flow:
  - For transaction messages that have received neither Commit nor Rollback (messages in pending status), the server initiates a "back check".
  - On receiving the back-check message, the Producer checks the status of the local transaction that corresponds to the message.
  - Commit or Rollback is performed again according to the local transaction status.
The compensation phase handles the cases where the Commit or Rollback of a message timed out or failed.
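The two flows can be read as a small state machine over the half message. The sketch below is a simulation only: `TransactionFlowSketch`, its enums, and the `maxChecks` limit are hypothetical and do not correspond to RocketMQ's client API.

```java
import java.util.function.Supplier;

// State-machine view of a transaction message (hypothetical names, simulation only).
class TransactionFlowSketch {
    enum LocalState { COMMIT, ROLLBACK, UNKNOWN }

    enum HalfMessageStatus { PENDING, VISIBLE, DISCARDED }

    // Maps a reported local transaction state to the fate of the half message.
    static HalfMessageStatus resolve(LocalState state) {
        switch (state) {
            case COMMIT:
                return HalfMessageStatus.VISIBLE;   // index is built, consumers can see it
            case ROLLBACK:
                return HalfMessageStatus.DISCARDED; // never becomes visible
            default:
                return HalfMessageStatus.PENDING;   // server will check back later
        }
    }

    // Normal flow first, then compensation: check back while the message is still pending.
    static HalfMessageStatus run(LocalState afterLocalTx, Supplier<LocalState> checkBack, int maxChecks) {
        HalfMessageStatus status = resolve(afterLocalTx);
        for (int i = 0; i < maxChecks && status == HalfMessageStatus.PENDING; i++) {
            status = resolve(checkBack.get());
        }
        return status;
    }

    public static void main(String[] args) {
        // The commit was lost, but the back check finds the local transaction committed.
        System.out.println(run(LocalState.UNKNOWN, () -> LocalState.COMMIT, 3)); // prints VISIBLE
    }
}
```

The key property is that a half message never becomes visible to consumers on its own; it needs an explicit commit, either from the normal flow or from a back check.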
Delay message
RocketMQ implements delayed messages with 18 delay levels by default, which correspond to the following delays:
Level | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Delay | 1s | 5s | 10s | 30s | 1m | 2m | 3m | 4m | 5m | 6m | 7m | 8m | 9m | 10m | 20m | 30m | 1h | 2h |
To send a delayed message, set the delay level on the message: msg.setDelayTimeLevel(level). There are three cases for level:
- level == 0: the message is not delayed
- 1 <= level <= maxLevel: the message is delayed for the corresponding time; for example, level == 1 means a delay of 1s
- level > maxLevel: level is treated as maxLevel; for example, level == 20 means a delay of 2h
Scheduled messages are temporarily stored in a topic named SCHEDULE_TOPIC_XXXX, in a queue chosen by delayTimeLevel: queueId = delayTimeLevel - 1. A queue therefore only stores messages with the same delay, which ensures that messages with the same delay are consumed in order. The broker consumes SCHEDULE_TOPIC_XXXX on a schedule and writes each message to its real topic.
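The level rules and the queueId mapping above can be checked with a short sketch. `DelayLevelSketch` and its methods are hypothetical names; the level string simply restates the table of default delays given earlier.

```java
// Delay level -> delay time and queueId, following the rules above (hypothetical names).
class DelayLevelSketch {
    // The 18 default delays from the table, in level order.
    private static final String LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    private static final long[] DELAY_MILLIS = parse(LEVELS);

    // Converts entries such as "30s", "5m", "2h" to milliseconds.
    private static long[] parse(String spec) {
        String[] parts = spec.split(" ");
        long[] out = new long[parts.length];
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long value = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            long unitMillis = unit == 's' ? 1_000L : unit == 'm' ? 60_000L : 3_600_000L;
            out[i] = value * unitMillis;
        }
        return out;
    }

    static int maxLevel() {
        return DELAY_MILLIS.length;
    }

    // Level 0 (or below) means no delay; levels above maxLevel are treated as maxLevel.
    static long delayMillis(int level) {
        if (level <= 0) {
            return 0L;
        }
        return DELAY_MILLIS[Math.min(level, maxLevel()) - 1];
    }

    // queueId = delayTimeLevel - 1, for level >= 1.
    static int queueId(int level) {
        return Math.min(level, maxLevel()) - 1;
    }

    public static void main(String[] args) {
        System.out.println(delayMillis(1));  // prints 1000
        System.out.println(delayMillis(20)); // prints 7200000
        System.out.println(queueId(3));      // prints 2
    }
}
```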
Message filtering
Unlike other MQ middleware, RocketMQ filters messages on the Consumer's subscription side. It does so because message writes by the Producer and message subscriptions by the Consumer use separate storage structures: to consume, the Consumer first gets an index entry from the ConsumeQueue, the logical consume queue, and then reads the actual message body from the CommitLog, so filtering cannot get around this storage structure. Each ConsumeQueue entry contains an 8-byte hash of the message Tag, and Tag-based filtering is based precisely on this field.
The following two filtering modes are supported:
- Tag filtering: when subscribing, the Consumer can specify Tags in addition to the Topic; to subscribe to several Tags, separate them with ||. The Consumer wraps the subscription in a SubscriptionData object and sends a Pull request to the Broker. Before the Broker reads data from Store, RocketMQ's file storage layer, it builds a MessageFilter from that data and passes it to Store. After Store reads a record from the ConsumeQueue, it filters the record using the tag hash stored in it. Because the server can only compare hash codes, it cannot filter precisely on the original tag string. So after pulling the messages, the consumer also compares the original tag strings and discards any message whose tag differs, without consuming it.
- SQL92 filtering: the construction and execution of the SQL expression are handled by the rocketmq-filter module. Executing the SQL expression for every message would be inefficient, so RocketMQ uses a BloomFilter to avoid doing so each time. The expression context of SQL92 is the properties of the message.
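The two-step Tag filtering (hash comparison on the Broker, string comparison on the Consumer) can be sketched as follows. `TagFilterSketch` and its methods are hypothetical names, messages are reduced to their tag strings, and Java's `String.hashCode()` is assumed as the hash function; "Aa" and "BB" are used because they have the same `hashCode()`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Two-step tag filtering: coarse hash match, then exact string match (hypothetical names).
class TagFilterSketch {
    // Broker-side pass: only the tag hash is available, so hash collisions slip through.
    static List<String> brokerFilter(List<String> messageTags, Set<String> subscribedTags) {
        Set<Integer> subscribedHashes = new HashSet<>();
        for (String tag : subscribedTags) {
            subscribedHashes.add(tag.hashCode());
        }
        List<String> passed = new ArrayList<>();
        for (String tag : messageTags) {
            if (subscribedHashes.contains(tag.hashCode())) {
                passed.add(tag);
            }
        }
        return passed;
    }

    // Consumer-side pass: compare the original tag strings and drop the collisions.
    static List<String> consumerFilter(List<String> pulledTags, Set<String> subscribedTags) {
        List<String> accepted = new ArrayList<>();
        for (String tag : pulledTags) {
            if (subscribedTags.contains(tag)) {
                accepted.add(tag);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        Set<String> subscription = Set.of("Aa");
        List<String> pulled = brokerFilter(List.of("Aa", "BB", "CC"), subscription);
        System.out.println(pulled);                               // prints [Aa, BB]
        System.out.println(consumerFilter(pulled, subscription)); // prints [Aa]
    }
}
```

The Broker-side pass is cheap because it only needs the fixed-size hash stored in the index entry; the exact comparison is deferred to the Consumer, which already holds the full message.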
Given the limits of the author's own knowledge, this article inevitably contains mistakes; corrections are welcome. For commercial reprints, please contact the author for authorization; for non-commercial reprints, please credit the source.
This article was first published on the WeChat official account "Java technology exploration".