This blog mainly refers to:
Shallow In, Shallow Out -RocketMQ Obin
Apache-rocketmq Gitee RocketMQ official document
RocketMQ Combat with advanced GitChat
It has been a long time since I last wrote a blog post. I could come up with countless excuses, but in the end it comes down to one word: lazy. Today I finally took my anti-laziness pill and decided to write one. About what? RocketMQ: it occurred to me that after more than 30 posts I still have not written a proper one about MQ. This post is fairly basic and does not involve source code analysis; it is just an introductory primer.
What is MQ for
Decoupling
In a sense, I think it is microservices that have driven the rapid growth of MQ. A system used to consist of N modules, all tightly coupled together. With microservices, each module is now a system of its own, and systems inevitably need to interact. Three techniques are commonly used for that interaction: RPC, HTTP, and MQ.
Asynchronous processing
A business operation used to be split into N steps that were processed one after another before the final result was returned to the user. With MQ, the most critical part is processed first, a message is sent to MQ, and the response goes straight back to the user; the remaining steps are processed at leisure in the background. It works like magic for the user experience.
Peak shaving
A sudden surge of requests to an interface inevitably puts heavy pressure on the application servers and database servers. With MQ, you no longer need to worry however many requests arrive: they are queued as messages and processed steadily in the background.
RocketMQ overview
RocketMQ is Alibaba’s open-source messaging middleware, written in Java, and it borrows many of Kafka’s strengths. Kafka is also a popular messaging middleware, but it is written in Scala, which makes it harder for Java programmers to read the source code and do custom development. RocketMQ is much easier in that respect. It is backed by Alibaba and has been tested by N rounds of Double 11 (Singles’ Day) traffic. It fits Chinese Internet companies well, so many companies in China use RocketMQ.
RocketMQ has four major components
Picture from Gitee.com/mirrors/roc…
As you can see, RocketMQ has four main components:
NameServer
- A stateless registry service. It can be deployed as a cluster, but NameServer nodes do not exchange any data with each other.
- Each Broker periodically reports its Topic routing information to all NameServers. Producers and Consumers pick a NameServer at random and periodically refresh their Topic routing information from it.
- Topic routing information is eventually consistent across the NameServer cluster.
- In CAP terms, NameServer guarantees AP.
Broker
- The RocketMQ server, which stores and distributes messages.
- The Broker periodically reports all of its own Topic routing information to NameServer.
- The Broker has two roles: Master and Follower. The Master handles both reads (consumption) and writes (production). If the Master is busy or unavailable, a Follower can take over reads. BrokerId=0 indicates a Master; BrokerId!=0 indicates a Follower. Two points need to be noted:
First, so far only the Follower with BrokerId=1 takes on read operations. Second, only newer versions of RocketMQ support automatically promoting a Follower to Master when the Master node fails.
Producer
The Producer periodically queries NameServer for the routing information of its Topics.
Consumer
The Consumer periodically queries NameServer for the routing information of its Topics.
Why not use ZooKeeper as the registry?
Early versions of RocketMQ did use ZooKeeper as the registry, but it was later replaced by NameServer. The likely reasons are:
- RocketMQ is itself a piece of middleware and does not want to depend on other middleware.
- ZooKeeper is heavyweight and has many features that RocketMQ does not use, so it is better to write a lightweight registry.
- ZooKeeper is a CP system: once a leader election is triggered, the registry becomes unavailable. RocketMQ’s registry does not require strong consistency; eventual consistency is enough.
RocketMQ message domain model
Message
- The message that is transmitted.
- Every message must have a Topic.
- A message can also carry a Tag and Keys, which can be regarded as additional attributes of the message.
Topic
- A collection of messages of one kind.
- Every message must have a Topic.
- The first-level classification of messages.
Tag
- Besides a Topic, a message can also have a Tag, which is used to distinguish different kinds of messages within the same Topic.
- A Tag is optional.
- The second-level classification of messages.
Group
Groups are divided into ProducerGroup and ConsumerGroup. We care more about ConsumerGroup, which contains multiple Consumers.
In cluster consumption mode, the Consumers in one ConsumerGroup jointly consume a Topic, and each Consumer is assigned N queues. However, a queue is consumed by only one Consumer in the group. Different ConsumerGroups can consume the same Topic: a message is consumed by every ConsumerGroup that subscribes to the Topic.
Queue
- A Topic contains four queues by default.
- In cluster consumption mode, a Consumer in a ConsumerGroup can consume messages from multiple queues, but a queue can be consumed by only one Consumer in that group.
- The messages in a Queue are ordered.
- There are read queues and write queues. The number of read queues should generally equal the number of write queues; otherwise problems arise easily.
Consumption patterns
There are two consumption modes: Clustering and Broadcasting.
Unlike some other MQs, which choose between cluster and broadcast consumption when the message is sent, RocketMQ makes that choice on the Consumer side.
Clustering (cluster consumption)
Cluster consumption is the default mode. In this mode, all Consumers in a ConsumerGroup jointly consume the messages of a Topic, and each Consumer is responsible for N queues (N can be 1, or even 0 if no queue is allocated to it). A queue, however, is consumed by only one Consumer in the group. If a Consumer fails, the other Consumers in the ConsumerGroup take over its queues.
In cluster consumption mode, the consumption progress is maintained on the Broker side and stored in ${ROCKET_HOME}/store/config/consumerOffset.json. As shown in the figure below, topicName@consumerGroupName is the key and the consumption progress is the value, in the form queueId:offset. This means that when there are multiple ConsumerGroups, each ConsumerGroup has its own consumption progress, which needs to be stored separately.
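To make the key/value layout concrete, here is a rough in-memory model of it. This is only a sketch: the class and method names are made up for illustration and are not the Broker’s actual classes.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the progress table; names are illustrative only.
class OffsetTableSketch {
    // key: "topicName@consumerGroupName", value: queueId -> offset
    private final Map<String, Map<Integer, Long>> table = new TreeMap<>();

    static String key(String topic, String group) {
        return topic + "@" + group;
    }

    // Records the progress of one group on one queue of a topic.
    void commit(String topic, String group, int queueId, long offset) {
        table.computeIfAbsent(key(topic, group), k -> new TreeMap<>()).put(queueId, offset);
    }

    // Returns -1 when the group has no recorded progress for that queue.
    long query(String topic, String group, int queueId) {
        Map<Integer, Long> queues =
                table.getOrDefault(key(topic, group), Collections.<Integer, Long>emptyMap());
        return queues.getOrDefault(queueId, -1L);
    }
}
```

Because the group name is part of the key, two ConsumerGroups reading the same Topic never overwrite each other’s progress.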
Broadcasting (broadcast consumption)
In broadcast consumption, each message is delivered to every Consumer in the ConsumerGroup.
In broadcast consumption mode, the consumption progress is maintained on the Consumer side.
Consumption queue load algorithm and rebalancing mechanism
Consumption queue load algorithm
We know that in cluster consumption mode, all Consumers in a ConsumerGroup jointly consume the messages of a Topic, and each Consumer is responsible for N queues. So how exactly are the queues allocated? This is where the consumption queue load algorithm comes in.
RocketMQ offers several consumption queue load algorithms. The two most common are AllocateMessageQueueAveragely and AllocateMessageQueueAveragelyByCircle. Let’s look at the difference between them.
Suppose a Topic has 16 queues, q0 to q15, and three Consumers, c0 to c2.
With AllocateMessageQueueAveragely, the queues are allocated as follows:
- c0: q0, q1, q2, q3, q4, q5
- c1: q6, q7, q8, q9, q10
- c2: q11, q12, q13, q14, q15
With AllocateMessageQueueAveragelyByCircle, the queues are allocated as follows:
- c0: q0, q3, q6, q9, q12, q15
- c1: q1, q4, q7, q10, q13
- c2: q2, q5, q8, q11, q14
Within a ConsumerGroup, all Consumers jointly consume the messages of a Topic. Each Consumer may be responsible for N queues, but a queue cannot be consumed by several Consumers at the same time. What does this imply?
Imagine a Topic with only four queues and five Consumers: one Consumer will not be allocated any queue. In RocketMQ, therefore, the number of queues under a Topic directly determines the maximum number of useful Consumers, which means you cannot keep raising the consumption rate just by adding more Consumers.
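The two allocation strategies, and the idle-Consumer case, can be sketched in a few lines. This is a simplified re-implementation for illustration, not the RocketMQ source: queues and Consumers are represented by plain indexes, and the class and method names are my own.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified re-implementation for illustration; not the RocketMQ source code.
class QueueAllocationSketch {
    // Contiguous blocks: the first (queueCount % consumerCount) consumers get one extra queue.
    static List<Integer> averagely(int queueCount, int consumerCount, int consumerIndex) {
        int base = queueCount / consumerCount;
        int extra = queueCount % consumerCount;
        int size = base + (consumerIndex < extra ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, extra);
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            result.add(start + i);
        }
        return result;
    }

    // Round-robin: consumer i takes queues i, i + consumerCount, i + 2 * consumerCount, ...
    static List<Integer> averagelyByCircle(int queueCount, int consumerCount, int consumerIndex) {
        List<Integer> result = new ArrayList<>();
        for (int q = consumerIndex; q < queueCount; q += consumerCount) {
            result.add(q);
        }
        return result;
    }
}
```

For 16 queues and 3 Consumers, averagely(16, 3, 1) yields queues 6 to 10 and averagelyByCircle(16, 3, 1) yields 1, 4, 7, 10, 13, matching the lists above. With 4 queues and 5 Consumers, averagely(4, 5, 4) returns an empty list: the fifth Consumer sits idle.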
Rebalancing
Although you are advised to think carefully about the number of queues when creating a Topic, reality often does not cooperate. Even if the number of queues never changes, the number of Consumers certainly will: Consumers go online and offline, a Consumer crashes, a new Consumer is added. Growing or shrinking the number of queues, or the number of Consumers, triggers a rebalance, that is, the consumption queues are redistributed among the Consumers.
In RocketMQ, each Consumer periodically queries the number of queues of its Topics and the number of Consumers; if either has changed, a rebalance is triggered.
Rebalancing is handled internally by RocketMQ, so programmers do not need to worry about it.
Pull or Push?
Generally, an MQ offers two ways of getting messages:
- Pull: the Consumer actively pulls messages. The advantage is that the Consumer controls how often and how many messages it pulls and knows its own consumption capacity, so messages are unlikely to pile up on the Consumer side. The drawbacks are poorer real-time behaviour and relatively low efficiency.
- Push: the Broker actively sends messages, which is real-time and efficient. However, the Broker cannot know the consumption capacity of the Consumers. If it sends too many messages, they pile up on the Consumer side; if it sends too few, the Consumer sits idle.
Whether it pulls or is pushed to, a Consumer always has to interact with the Broker, and there are generally three ways to do so: short connections, long connections, and polling.
RocketMQ appears to support both Pull and Push, but Push is actually implemented on top of Pull. So how does a Consumer interact with the Broker?
This is where RocketMQ’s design is clever: it uses neither short connections, nor long connections, nor plain polling, but long polling.
Long polling
When a Consumer sends a pull request, there are two cases:
- There are messages: the Consumer receives them and the connection is closed.
- There are no messages: the Broker holds the connection and checks every 5 seconds whether a message has arrived. Once one has, it is returned to the Consumer and the connection is closed.
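The hold-and-recheck idea behind long polling can be sketched with a toy in-memory queue standing in for the Broker. This is not RocketMQ code: the class name, the maximum hold time, and the check interval parameter are all assumptions made for illustration.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Toy stand-in for a Broker queue; not RocketMQ code.
class LongPollingSketch {
    private final Queue<String> messages = new ConcurrentLinkedQueue<>();

    void put(String message) {
        messages.add(message);
    }

    // Holds the request for at most holdMillis, re-checking every checkIntervalMillis.
    // Returns the message, or null if nothing arrived before the hold expired.
    String pull(long holdMillis, long checkIntervalMillis) {
        long deadline = System.currentTimeMillis() + holdMillis;
        while (true) {
            String message = messages.poll();
            if (message != null) {
                return message;          // message available: respond right away
            }
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return null;             // hold expired: respond empty, the consumer pulls again
            }
            try {
                Thread.sleep(Math.min(checkIntervalMillis, remaining));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }
}
```

Compared with plain polling, the Consumer sends far fewer empty requests, and compared with Push, the Consumer still decides when it is ready for more messages.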
Transaction messages
RocketMQ supports transaction messages. After the Producer sends a transaction message to the Broker, the Broker stores it in the system Topic RMQ_SYS_TRANS_HALF_TOPIC, so that Consumers cannot consume it yet.
The Broker runs a scheduled task that consumes the messages in RMQ_SYS_TRANS_HALF_TOPIC and issues a check-back to the Producer. The check-back can return one of three states: commit, rollback, or unknown.
- If the check-back returns commit or rollback, the message is committed or rolled back accordingly.
- If it returns unknown, the Broker waits for the next check-back. RocketMQ lets you configure the check-back interval and the maximum number of check-backs for a message. After that number is exceeded, the message is rolled back automatically.
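The check-back rules can be modelled as a small loop. This is only an illustrative sketch: the enum, the class, and the Supplier standing in for the Producer’s check are hypothetical names, not RocketMQ’s API, and the wait between checks is left out.

```java
import java.util.function.Supplier;

// Illustrative model of the check-back loop; names are hypothetical, not RocketMQ's API.
class TransactionCheckSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Asks the producer up to maxChecks times; a message that stays UNKNOWN is rolled back.
    static State resolve(Supplier<State> producerCheck, int maxChecks) {
        for (int i = 0; i < maxChecks; i++) {
            State state = producerCheck.get();
            if (state != State.UNKNOWN) {
                return state;            // commit or rollback: apply it and stop checking
            }
            // UNKNOWN: a real broker would wait for the configured interval before checking again
        }
        return State.ROLLBACK;
    }
}
```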
Delayed messages
A delayed message is a message that cannot be consumed by a Consumer immediately after it is sent to the Broker. RocketMQ only supports specific delay times: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
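In the client, a delay is chosen by its position in that list (a 1-based level, set on the message with setDelayTimeLevel) rather than by an arbitrary duration. The sketch below only illustrates that mapping; the class and method names are made up and this is not how the Broker is implemented.

```java
// Illustrative parser for the delay-level list quoted above; not RocketMQ code.
class DelayLevelSketch {
    static final String LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Returns the delay in milliseconds for a 1-based level (level 1 = 1s, level 18 = 2h).
    static long delayMillis(int level) {
        String[] parts = LEVELS.split(" ");
        if (level < 1 || level > parts.length) {
            throw new IllegalArgumentException("level must be between 1 and " + parts.length);
        }
        String part = parts[level - 1];
        long value = Long.parseLong(part.substring(0, part.length() - 1));
        switch (part.charAt(part.length() - 1)) {
            case 's': return value * 1_000L;
            case 'm': return value * 60_000L;
            case 'h': return value * 3_600_000L;
            default: throw new IllegalStateException("unknown unit in " + part);
        }
    }
}
```

So level 3 means a 10-second delay, and there is no level for, say, 45 seconds.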
Consumption forms
RocketMQ supports two forms of consumption: concurrent consumption and sequential (ordered) consumption. For sequential consumption, you need to make sure that the messages that must stay in order are sent to the same queue. The send method has several overloads, and one of them supports choosing the queue.
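A common way to keep related messages in one queue is to derive the queue index from a business key. The sketch below shows only that calculation; the order id and the class name are assumptions for illustration. In a real Producer this logic would live inside the selector passed to the send overload.

```java
// Illustrative only: maps a business key to a queue index so related messages share a queue.
class OrderedQueueSelectSketch {
    static int selectQueue(long orderId, int queueCount) {
        // floorMod keeps the index non-negative even for negative ids
        return (int) Math.floorMod(orderId, (long) queueCount);
    }
}
```

All messages for the same order id land in the same queue and therefore keep their relative order, while different orders still spread across queues.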
Synchronous and asynchronous disk flushing
The Producer sends messages to the Broker, which needs to persist them. RocketMQ supports two persistence strategies:
- Synchronous flush: the Broker persists the message to disk before returning an ACK to the Producer. Reliability is high, but efficiency is low.
- Asynchronous flush: the Broker writes the message to the PageCache and then returns an ACK to the Producer. This is extremely efficient, but messages can be lost if the server machine goes down. If only the RocketMQ process crashes, they are not lost.
Synchronous and asynchronous replication
To ensure the reliability and availability of MQ, Follower nodes are deployed in production environments. A Follower copies the Master’s data. RocketMQ supports two replication strategies:
- Synchronous replication: an ACK is returned to the Producer only after both the Master and the Follower have written the message successfully. Reliability is high, but efficiency is low.
- Asynchronous replication: an ACK is returned to the Producer as soon as the Master has written the message successfully. This is efficient, but messages may be lost.
Whether “written” here means written to the PageCache or written to disk depends on the Follower Broker’s configuration.
About the Producer
RocketMQ provides three ways to send messages:
- Oneway: fire and forget. The message is sent one way and the method returns no result.
- Synchronous: after sending a message, the caller waits for the Broker’s response.
- Asynchronous: the call returns immediately after the message is sent, and a callback method is executed once the Broker’s response arrives.
In practice, synchronous sending is the usual choice. To improve RocketMQ performance, the common approach is to tune Broker parameters, especially the flush and replication strategies.
Send message retry
If a message is sent with a MessageQueueSelector, the send retry mechanism does not take effect.
A send can return one of four statuses:
public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}
Every status except the first indicates a problem. To make sure messages are not lost, set the Producer parameter RetryAnotherBrokerWhenNotStoreOK to true.
Fault avoidance mechanism
If a send fails and the retry goes to the same Broker, it will most likely fail again. RocketMQ is smart about this: on retry it automatically avoids the failed Broker and chooses another one. Asynchronous sending is not that smart yet, though. It retries only on the same Broker, so synchronous sending is strongly recommended.
RocketMQ provides two fault avoidance mechanisms, controlled by SendLatencyFaultEnable:
- false (default): fault avoidance applies only during retries. For example, if sending to BrokerA fails, BrokerB is selected for the retry, but BrokerA may be selected again the next time a new message is sent.
- true: delayed backoff is turned on. Once a send to BrokerA fails, RocketMQ pessimistically assumes that BrokerA will be unavailable for a while and sends no messages to BrokerA during that period.
Delayed backoff sounds good, but a Broker that is busy, or a network glitch, usually recovers within a moment. With delayed backoff enabled, a Broker that is already available again keeps being avoided for a while, which makes the other Brokers even busier. The result may be worse.
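The idea of delayed backoff can be sketched as a table of “do not use before” timestamps. This is a toy model under my own assumptions (a fixed backoff per failure, made-up class and method names), not RocketMQ’s implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of delayed backoff; not RocketMQ's implementation.
class FaultAvoidanceSketch {
    private final Map<String, Long> unavailableUntil = new HashMap<>();

    // Records a failed send: avoid this broker until nowMillis + backoffMillis.
    void markFailed(String broker, long nowMillis, long backoffMillis) {
        unavailableUntil.put(broker, nowMillis + backoffMillis);
    }

    boolean isAvailable(String broker, long nowMillis) {
        return unavailableUntil.getOrDefault(broker, 0L) <= nowMillis;
    }

    // Picks the first broker not currently avoided; falls back to the first broker if all are avoided.
    String select(List<String> brokers, long nowMillis) {
        for (String broker : brokers) {
            if (isAvailable(broker, nowMillis)) {
                return broker;
            }
        }
        return brokers.get(0);
    }
}
```

The downside discussed above is visible here: after markFailed("BrokerA", now, 30000), BrokerA is skipped for the full 30 seconds even if it recovers after one second.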
About the Consumer
A note on Consumer threads
Seeing the two parameters ConsumeThreadMin and ConsumeThreadMax, you might assume that the Consumer starts with ConsumeThreadMin threads and that, if messages pile up on the Consumer side, new threads are automatically started until the number of consuming threads reaches ConsumeThreadMax. This is not the case. The Consumer holds a thread pool backed by an unbounded queue, which means the ConsumeThreadMax parameter has no effect. In real development, ConsumeThreadMin and ConsumeThreadMax are therefore often set to the same value.
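This thread pool behaviour can be reproduced with plain java.util.concurrent, without RocketMQ. A minimal sketch, with arbitrary pool sizes and a made-up class name: with an unbounded LinkedBlockingQueue, extra tasks are queued instead of triggering new threads, so the pool never grows beyond its minimum size.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Demonstrates plain java.util.concurrent behaviour; the pool sizes are arbitrary examples.
class UnboundedQueuePoolSketch {
    // Submits taskCount blocking tasks and returns the largest pool size reached.
    static int largestPoolSize(int min, int max, int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                min, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    release.await();     // keep the worker busy so a backlog builds up
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int largest = pool.getLargestPoolSize();
        release.countDown();             // let the queued tasks finish
        pool.shutdown();
        return largest;
    }
}
```

Calling largestPoolSize(2, 10, 50) reports 2, not 10: a ThreadPoolExecutor only creates threads beyond the core size when its queue rejects a task, and an unbounded queue never does.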
ConsumeFromWhere
When no consumption progress can be found, RocketMQ lets the Consumer choose where to start: from the latest message, from the earliest message, or from a specified timestamp.
Consuming message retry
RocketMQ sets up a retry queue for each ConsumerGroup, under the Topic name %RETRY%+consumerGroup, to hold the messages that the ConsumerGroup needs to retry. Because a retry should only happen after some delay, RocketMQ first saves the retry message to the delay queue of the Topic SCHEDULE_TOPIC_XXXX. A background scheduled task then moves it to the retry queue of %RETRY%+consumerGroup once the delay has elapsed.
What to do when messages pile up and consumption capacity is insufficient
- Increase the consumption speed. This is the best way.
- Add queues and add Consumers.
- Turn the original Consumer into a mover: it “moves” messages into several new Topics according to some rule, and several ConsumerGroups are then started to consume the different Topics.
- Create an additional ConsumerGroup, so that two ConsumerGroups consume the same Topic at the same time. You then need to split the work by offset, for example one ConsumerGroup handles messages with odd offsets and the other handles messages with even offsets.
I thought a primer would be easy to write, but I was wrong. Precisely because it is a primer, it is meant for people who have had little exposure to RocketMQ, and RocketMQ is not that simple: a single blog post cannot get such readers fully up to speed. I kept asking myself: is this point important and worth describing, or can it be skipped? As you can see, this article mostly introduces concepts and barely touches the API level, because covering the API would take another two weeks of writing.