The core mechanism and main process of message push have been analyzed above. The purpose of this article is to dispel superstitions by attempting to source correct some of the misinterpretations of the Rocket MQ push mode that most articles on the market have made.

Start with a few screenshots

I’ve looked up a few similar questions before -” Problems with Rocket MQ push mode “- and let’s see what people say.

I took a few:

Dispel rumors

There are many mistakes in their understanding above, which I refute respectively:

  • In the Push mode, MQ actively pushes messages to consumers. As analyzed in the author’s RocketMQ message “Push” analysis (part 1), Push does not actively Push messages, but passively receives the pull request and informs consumers of the message
  • In the process of Push, there are three conditions of flow control in the current source code before the consumer processing capacity is not considered. The purpose is to consider the consumption capacity of consumers
  • By default, each Queue subscribed by each Consumer cannot exceed 100 MB of memory. It can be controlled and not overflow easily

Take a look at the code

In this paper, we introduced the pull on the news is determined by the PullMessageService service thread, but the final foothold is DefaultMQPushConsumerImpl# pullMessage (). The author analyzes what efforts TA has made to protect the Consumer terminal:

  • Control the accumulated quantity of consumption
  1. Count the number of messages that processQueue has provisioned for the current PullRequest object
  2. PullThresholdForQueue is a queue-level traffic control threshold. By default, each message queue will cache up to 1000 messages at the Consumer end
  3. Wait for 50ms and process again. If the value is still greater than the threshold after 50ms, repeat step 1 and step 2
public void pullMessage(PullRequest pullRequest) {
    /* Total number of messages currently held by ProcessQueue */
    long cachedMessageCount = processQueue.getMsgCount().get();
    
    if (
        cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()
    ) {
        /* Give up the pullRequest task, wait for 50ms, and then join the pullRequest again */
        executePullRequestLater(pullRequest,
            PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL
        );
        return; }}Copy the code
PullThresholdForQueue this threshold can be adjusted, DefaultMQPushConsumerImpl exposes setPullThresholdForQueue method, allows users to customize according to oneself circumstance, the default is 1000Copy the code
  • Memory usage control
  1. Computes the total memory used by processQueue for the current PullRequest object
  2. PullThresholdSizeForQueue limit queue level cache message size, each message queue default cache a maximum of 100 m, to determine the size, if temporary message has apparently more than limit, does not deal with the pull request, but the request to the team
  3. Wait for 50ms and process again. If the value is still greater than the threshold after 50ms, repeat step 1 and step 2
public void pullMessage(PullRequest pullRequest) {
    /* The current ProcessQueue holds messages in memory (MB) */
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
    
    /* The current message stack exceeds 100M, triggering flow control */
    if (
        cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()
    ) {
        /* Give up the pullRequest task, wait for 50ms, and then join the pullRequest again */
        this.executePullRequestLater(pullRequest, 
            PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL
        );
        return; }}Copy the code
Same pullThresholdSizeForQueue also can adjust the threshold is, DefaultMQPushConsumerImpl exposes setPullThresholdSizeForQueue method, allows users to customize according to oneself circumstance, Default is 100Copy the code
  • Control of the difference of consumption offset

This is hard to understand, because when I first started Rocket MQ, I assumed that I would need to consume the temporary messages in the ProcessQueue before pulling again, which is not the case, as long as there is no traffic control on the Consumer side, The PullMessageService will always pull messages at its own pace.

The reporting of each message consumption progress is the minimum offset of the messages currently in ProcessQueue, which determines where the consumer needs to start pulling once it restarts. The next pull offset maintained in the PullRequest object, nextOffset, is calculated each time for the current Consumer pull behavior.

Under normal circumstances, it has been limited to 1000 temporary messages, and the pull order is front to back, how can not offset the difference is equal to 2000 ah.

But think for yourself what if something like this happened: If offset = 1 message thread deadlock in consumption, so the news would have been no ACK, and other messages are in normal consumption, will lead to normal pull news, always can restart it happened that when the consumer end, and you are spending a lot of messages, but the news offset is 1, this will bring the a lot of repeated consumption, This is unacceptable to us. This restriction ensures that even if there is a deadlock, there won’t be too many repeat purchases — 2,000 or so, despite all the flaws.

public void pullMessage(PullRequest pullRequest) {
    if (!this.consumeOrderly) {
        /* The distance between the maximum offset and the minimum offset in the current ProcessQueue must not exceed 2000 */
        if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            /* Give up the pullRequest task, wait for 50ms, and then join the pullRequest again */
            executePullRequestLater(pullRequest, 
                PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL
            );
            return; }}}Copy the code
public long getMaxSpan(a) {
    try {
        this.lockTreeMap.readLock().lockInterruptibly();
        try {
            /** * lastKey() The maximum offset of the current temporary message ** firstKey() the minimum offset of the current temporary message */
            if (!this.msgTreeMap.isEmpty()) {
                return this.msgTreeMap.lastKey() 
                    - this.msgTreeMap.firstKey(); }}finally {
            this.lockTreeMap.readLock().unlock(); }}catch (InterruptedException e) {
        log.error("getMaxSpan exception", e);
    }

    return 0;
}
Copy the code

To summarize

All in all, Rocket MQ’s push mode is safe to use without any of the awkwardness described at the beginning of this article and destabilization of the system.