Manna: Kei & Yun Tian Review proofread: White Japan Edit & edit: Wen Yan
Introduction: With the release of RocketMQ 5.0 Preview, the major features of 5.0 are gradually introduced. POP Consumer as a feature of 5.0, the POP consumption model represents a whole new consumption model. It has the characteristics of lightweight, stateless, no queue exclusivity, for message backlog scenarios, Streaming consumption scenarios are very friendly. Before introducing POP Consumer, let’s review the current Push Consumer.
Push Consumer
Those of you familiar with RocketMQ will be familiar with Push Consumer, which is a common and easy mode for client consumption. We simply set up and write the business logic in the callback method ConsumeMessage, and start the client application to consume the message as normal.
public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("test_topic", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}); consumer.start(); System.out.printf("Consumer Started.%n"); }}Copy the code
So how does Push Consumer consume messages?
Of course, the Consumer can only receive messages if the Producer sends them to the Topic first. The Producer uses polling to send messages to each Queue. Generally, there is more than one Consumer. When the client starts, load balancing occurs in Topic and Consumer group, and queues that need to be processed are allocated to each client. During load balancing, each client retrieves all the Consumerids and all the queues and sorts them. Each client uses the same responsible balancing algorithm, such as the equal allocation algorithm, so that each client calculates which queues it needs to consume. Each increase or decrease in consumers triggers load balancing, so we can use RocketMQ load balancing to dynamically scale up and improve the client’s ability to send and receive messages.
Here’s a quick question: can you keep increasing the number of clients to increase spending power? Of course not, because the number of queues is limited, and once the number of clients reaches the number of queues, adding new nodes will not increase the consumption capacity, because some nodes will not be allocated to queues and will not be able to consume.
The client is responsible for balancing. After queues are allocated to clients, clients constantly pull messages from the Broker and consume them on the client. Isn’t the Push client? Why is the client pulling messages to the Broker? Shouldn’t the Broker be pushing messages to the client? This is an interesting point because RocketMQ, whether it’s Push Consumer, Pull Consumer, or POP Consumer, consumes messages as pulled by the client. The Push Consumer is just wrapped at the client API level to make it feel like the Broker is pushing it.
After client load balancing and pulling messages, the client can normally consume messages.
The complete Push Consumer processing logic can be seen in the figure above, we can see the complete process of Push Consumer.
Rebalance the client to determine which Consumer clients process which queues. Then pull messages by PullMessageService. Pull to the message after ConsumeMessageConcurrentlyService submit request to the message consumption thread pool, then calls the callback method ConsumeMessage, here you can get the message processing business, Finally the consumer successfully updates the local offset and reports the offset to the Broker. If a consumption fails (exception throw, timeout, etc.), the client sends sendBack to tell the Broker which message consumption failed. The Broker then sends the failed message to a delayed queue, which is then placed in a Retry Topic, where the client consumes the retry Topic to complete the redelivery. This has the advantage of not affecting the consumption of normal messages because of partial consumption failures. If you want to see more details, you can download the source code from Github to see the actual code flow.
Through the introduction of Push Consumer, we have a certain understanding of the principle of Push Consumer. We can see that the RocketMQ client does a lot of things, load balancing, pull messages, consumption point management, sendBack after a consumption failure, and so on. This is definitely not friendly to multilingual support. Those of you who have been involved in a lot of language development will know that porting so much logic to different languages is definitely not an easy task. At the same time, client upgrade o&M becomes more difficult.
So we thought, can we slim down the client side and move some of the logic from the client side to the Broker? Of course it can. When we introduced the Push Consumer client to do the balancing, we saw that the information required for load balancing, all the consumerids, was originally all the Queue information that the client got from the Broker. The Broker can also be accessed by nameServer. There is no difference between calling the balancing algorithm on the client and the Broker. So porting Rebalance to the Broker is a good idea. Broker load balancing can achieve almost the same effect as client load balancing. Client logic is reduced, multi-language implementation is easier, and subsequent upgrade operation and maintenance are more controllable. In addition, because brokers have global information relative to clients, they can do more interesting things. For example, load balancing is carried out according to the backlog of queues, and queues on some clients with high pressure are allocated to other clients for processing.
POP Consumer
Through the introduction of Push Consumer, we know some characteristics of Push Consumer.
-
Queue exclusivity: Each queue on the Broker can be assigned to only one Push Consumer machine of the same Consumer group.
-
Update the offset after consumption: Each Pull request pulls batch messages to the local queue cache. Commit the offset only after the local consumption succeeds.
The above features may cause some problems, such as client exception machine hang, which causes the allocation queue messages to accumulate and cannot be consumed.
RocketMQ’s Push Consumer is not very friendly when the machine is hung. If the client machine is hanging, the Rebalance will still queue up purchases to the hang machine. If the hang machine is slow or unable to consume, this will cause a build-up of purchases. There are also unavoidable issues such as consumption delays caused by multiple Rebalance on the client side when the server Broker is published. As shown below:
When the Push Consumer 2 machine hung, Q2 on the Broker to which it was assigned was severely built up. When we deal with this kind of problem, we usually find the machine and restart it, or go offline. Ensure that the business is not affected by abnormal machines, but if the queue is squeezed to a certain extent, the machine may not be able to quickly catch up with the consumption progress, which is also limited by the ability of Push consumers.
Let’s summarize some pain points of Push Consumer:
-
Rich client, client logic is heavy, multi-language support is not friendly;
-
Rebalance can lead to a consumption squeeze on the client or Broker.
-
A single Queue is bound to a single Consumer, and the consumption capacity of a single Queue cannot be extended horizontally.
-
The machine hang will lead to extrusion.
Based on these issues, RocketMQ 5.0 implements a new Consumer model -POP Consumer.
POP Consumer addresses both the stability and the scaling ability to de-queue placeholders.
Let’s take a quick look at how POP Consumers consume messages:
The POP Client issues a POP request message from the Broker’s queue, and the Broker returns a message. Among the system properties of messages, there is an important property called POP_CK. POP_CK is a handler for a message, through which a message can be located. When the message has been successfully consumed, the POP Client sends an ackMessage and passes a handler to confirm the success of the message consumption to the broker.
For message retries, when a message is popped, it goes into an invisible time period during which it will not be popped again. If the message consumption is not confirmed by ackMessage during this invisible period, the message will be visible again after the invisible period.
In addition, for message retries, our retry policy is a gradient of delay time, with a progressively increasing interval between retries. So, there is also a changeInvisibleTime to change the invisible time of a message.
As can be seen from the figure, the message would have been visible again at this intermediate point in time, but we extended the invisible time by using changeInvisibleTime in advance before visibility, which delayed the visible time of the message. When the user business code returns reconsumeLater or throws an exception, we can use changeInvisibleTime to change the next visible time by the number of retries. In addition, if the RT is consumed for more than 30 seconds (the default, which can be changed), the Broker will also put the message to the retry queue.
In addition, POP consumption points are saved and controlled by the Broker, and POP consumption can be consumed by multiple clients in the same queue, as shown in the following figure:
All three clients do not need to Rebalance queues. Instead, they all use POP to request messages from all brokers for consumption. Even if POP Consumer 2 were to hang, the inside information would cause POP Consumer1 and POP Consumer3 to make purchases. This solves the problem that the hang machine might cause.
As you can see from the overall flow, POP consumption can avoid consumption delays caused by Rebalance, and the client can consume all the queues of the Broker without the problem of machine hang.
Make this Rebalance to a single client. POP Consumers can Rebalance all queues in a Topic, compared to Push consumers. The number of Push Consuner clients can be at most equal to the number of queues. POP consumers can break this limit by allowing multiple POP consumers to consume the same Queue.
The Broker implementation
How is POP Consumer implemented on the Broker side?
After a POP Consumer pulls messages, it locks the Queue dimension so that only one client can pull messages to the same Queue at a time. The checkPoint information is saved at the Broker. The checkPoint information includes the message’s Topic, ConsumerGroup, QueueId, offset, POPTime, msgCout, ReviveQueueId and other information. The checkPoint information is stored in the buffer to wait for the ACK message. After receiving the ACK message from the client within a period of time, the checkPoint information is removed from the buffer and the consumption progress is updated to indicate that the message is successfully consumed.
When the checkPoint message waits for an ACK message, the checkPoint message clears the buffer and sends ck MSG to the store. Ck MSG is first sent to the delay queue SCHEDULE_Topic_XXXX. After the delay, it will enter REVIVE_LOG Topic. REVIVE_LOG Topic is the CK MSG and ACK MSG topics saved in store to be processed. POPReceiveService takes messages from REVIVE_LOG Topic and puts them into a map. If ck has a corresponding ACK, it will update the consumption site of REVIVE_LOG to indicate that message consumption is complete. If ck MSG is not confirmed due to timeout, it will query the real message corresponding to CK MSG and put the message into retry Topic to wait for client consumption. POP Consumers probabilistically consume messages in retry topics when normally consumed. We can see from this design that RocketMQ is often designed to implement business logic through internal topics, transactional messages, and timed messages.
Let’s briefly conclude the benefits of POP Consumer:
-
Stateless, offset information Broker maintenance, client and Queue are not bound.
-
Lightweight, the client only needs to send and receive messages and confirm messages.
-
No Queue placeholder, Queue is no longer bound to the client.
-
Multilingual friendly, convenient multilingual transplant.
-
The upgrade is more controllable. The logic converges to the Broker, making the upgrade more convenient and controllable.
POP&Push fusion
Since POP has so many advantages, can we use POP to solve some of the problems of Push? As mentioned earlier, when a queue has a large number of problems due to Consumer problems, limited by the consumption capacity of a single Consumer, it cannot quickly catch up with the consumption progress, resulting in a high delay. The core problem is the limitation of single queue and single Consumer, which leads to the inability of horizontal expansion of consumption power.
With POPAPI, we wanted to switch to POP mode when a queue became too large, giving multiple consumers the opportunity to consume the queue and catch up, and we implemented this in 5.0.
POP/Push mode switching mode
You can switch in two ways.
1. Command line
mqadmin setConsumeMode -c cluster -t topic -g group -m POP -n 8
Copy the code
2. Code switching
public static final String CONSUMER_GROUP = "CID_JODIE_1"; public static final String TOPIC = "TopicTest"; // Or use AdminTools directly: mqadmin setConsumeMode -c cluster -t topic -g group -m POP -n 8 private static void switchPop() throws Exception { DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(); mqAdminExt.start(); ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); Set<String> brokerAddrs = clusterInfo.getBrokerAddrTable().values().stream().map(BrokerData::selectBrokerAddr).collect(Collectors.toSet()); for (String brokerAddr : brokerAddrs) { mqAdminExt.setMessageRequestMode(brokerAddr, TOPIC, CONSUMER_GROUP, MessageRequestMode.POP, 8, 3_000); }}Copy the code
Through the following POP Consumer Demo, we can see that POP Consumer and Push API are basically the same and easier to use. Compared with Push API, POP Consumer only has one more step to switch consumption mode.
Push & POP Retry queue differences
To use the POP consumption mode, we just need to switch modes based on the Push API, and there is some processing required for the Broker. The main area of concern is the Retry queue.
Push and POP modes handle retry queues differently
- Retry handling of Push
1) The server has a %RETRY%ConsumerGroup queue. 2) The client has a pull task pulling messages from the queue.
- Retry handling of POP
1) The server has a RETRY queue named %RETRY%ConsumerGroup_Topic for each Topic. 2) The client has no pull task specific to the RETRY queue, and each normal POP request has a probability of consuming a RETRY queue
After the mode is switched, messages in the old retry mode need to be processed; otherwise, messages are lost.
Push & POP toggle
Push to switch to POP
- The normal queue switches to POP mode
- POP requests in the normal queue are processed in the corresponding POP Retry queue
- For the Push Retry queue, we retain the pull tasks of the original Push Retry queue and work in Push mode.
POP switches to Push
- Normal queues switch to Push mode
- Push Retry queues naturally have corresponding pull tasks
- In the previous POP retry queue, we automatically created pull tasks on the client side and pulled them in Push mode. Note that this
To summarize, for retry queues, we specifically do not participate in mode switching.
conclusion
Finally, POP Consumer. POP, as a brand new consumption mode, solves some pain points of Push mode, making the client stateless and more lightweight, and the consumption logic is basically convergent to Broker, which is very friendly to support multiple languages. At the API level, it also completes the integration with Push, inheriting the simplicity of Push API and realizing the free switch between Push and POP.