Overview

The Consumer pulls messages on the PullMessageService thread. PullMessageService takes a PullRequest from the pullRequestQueue and uses it to pull messages from the Broker.

After the Consumer pulls the messages, it puts them into the local ProcessQueue. The consumer threads then consume the messages from the ProcessQueue.
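The loop described above can be modeled in a few lines. This is only a minimal sketch: `PullLoopSketch`, `Request`, and `Broker` are hypothetical stand-ins invented for illustration, not RocketMQ's real classes, and the sketch polls the queue where a real service thread would block on it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified model of the pull loop. These types are illustrative stand-ins,
// not RocketMQ's real PullMessageService, PullRequest, or ProcessQueue classes.
final class PullLoopSketch {

    /** Stand-in for a PullRequest: which queue to pull and from which offset. */
    static final class Request {
        final String queue;
        long nextOffset;

        Request(String queue, long nextOffset) {
            this.queue = queue;
            this.nextOffset = nextOffset;
        }
    }

    /** Stand-in for the Broker: returns at most batchSize messages starting at offset. */
    interface Broker {
        List<String> pull(String queue, long offset, int batchSize);
    }

    final BlockingQueue<Request> pullRequestQueue = new LinkedBlockingQueue<>();
    final List<String> processQueue = new ArrayList<>();

    /**
     * One iteration of the loop. Returns false when no request is waiting.
     * (A real service thread would block on the queue instead of polling.)
     */
    boolean pullOnce(Broker broker, int batchSize) {
        Request request = pullRequestQueue.poll();
        if (request == null) {
            return false;
        }
        List<String> messages = broker.pull(request.queue, request.nextOffset, batchSize);
        processQueue.addAll(messages);          // buffer locally for the consuming threads
        request.nextOffset += messages.size();  // advance the pull offset
        pullRequestQueue.offer(request);        // re-enqueue so the next pull can run
        return true;
    }

    public static void main(String[] args) {
        PullLoopSketch sketch = new PullLoopSketch();
        sketch.pullRequestQueue.offer(new Request("TopicA-0", 0));
        // Fake broker that always has a full batch available.
        Broker broker = (queue, offset, batchSize) -> {
            List<String> batch = new ArrayList<>();
            for (long i = offset; i < offset + batchSize; i++) {
                batch.add(queue + "@" + i);
            }
            return batch;
        };
        sketch.pullOnce(broker, 32);
        System.out.println(sketch.processQueue.size() + " messages buffered");
    }
}
```

The point of the design is that pulling and consuming are decoupled: the pull thread only fills the local buffer, and the same request object is reused for the next pull with an updated offset.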

Message pull process

The process diagram

Message pull

Code location: DefaultMQPushConsumerImpl#pullMessage()

  1. After the Rebalance completes, a PullRequest is added to the pullRequestQueue of the PullMessageService.

  2. PullMessageService takes one PullRequest from the queue and pulls up to 32 messages from the Broker by default. The Consumer can change this batch size with the pullBatchSize parameter.

  3. If the Consumer's ProcessQueue holds more than 1000 messages or more than 100 MB of message data, flow control is triggered and the next pull is delayed by 50 ms. Otherwise, the next pull request is issued as soon as the current pull completes.

The Consumer can change the thresholds that trigger flow control by calling setPullThresholdForQueue() and setPullThresholdSizeForQueue().

Note that the ProcessQueue is maintained per consumer group.
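The flow control decision can be expressed as a small function. The sketch below mirrors the thresholds quoted in the text (1000 messages, 100 MB, 50 ms). The class, constant, and method names are hypothetical, and treating the size threshold as MiB is an assumption of this sketch, not something taken from the client source.

```java
// Illustrative flow control check; all names here are made up for the example.
final class FlowControlSketch {

    static final int DEFAULT_COUNT_THRESHOLD = 1000;   // messages cached per queue
    static final int DEFAULT_SIZE_THRESHOLD_MIB = 100; // cached message data per queue (MiB assumed)
    static final long FLOW_CONTROL_DELAY_MS = 50;      // delay applied when throttled

    /**
     * Returns how long to wait before the next pull:
     * 0 means "pull again immediately", otherwise the flow control delay.
     */
    static long delayBeforeNextPull(long cachedCount, long cachedBytes,
                                    int countThreshold, int sizeThresholdMiB) {
        long cachedMiB = cachedBytes / (1024 * 1024);
        if (cachedCount > countThreshold || cachedMiB > sizeThresholdMiB) {
            return FLOW_CONTROL_DELAY_MS;
        }
        return 0;
    }

    public static void main(String[] args) {
        // 1500 messages are already buffered, so the next pull is postponed.
        long delay = delayBeforeNextPull(1500, 10L * 1024 * 1024,
                DEFAULT_COUNT_THRESHOLD, DEFAULT_SIZE_THRESHOLD_MIB);
        System.out.println("delay before next pull (ms): " + delay);
    }
}
```

Passing the thresholds as parameters reflects that they are configurable per consumer, which is what setPullThresholdForQueue() and setPullThresholdSizeForQueue() adjust.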

The broker side handles message pulling

Code location: PullMessageProcessor#processRequest()

  1. The Broker first reads entries from the ConsumeQueue, starting at the pull offset.

  2. If the Consumer filters messages by tag, the Broker filters quickly by comparing the tag hash code stored in each ConsumeQueue entry.

  3. The Broker reads the messages from the CommitLog according to the offset stored in each ConsumeQueue entry.

  4. If the Consumer is consuming too slowly, the Broker suggests that it pull from the slave next time. Consumption is judged to be slow when (maximum offset of the current CommitLog - maximum offset of the pulled messages) / physical memory of the machine > 40%.

  5. The Broker returns the messages to the Consumer.
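Two of the Broker-side decisions above lend themselves to a short sketch: the coarse tag filter on hash codes and the slow-consumer check. The class and method names are hypothetical, and using String.hashCode() as the tag hash is an assumption made for the example.

```java
import java.util.Collections;
import java.util.Set;

// Illustrative Broker-side checks; names are invented for this example.
final class BrokerPullSketch {

    /**
     * Coarse filter: compares only hash codes, so two different tags with the
     * same hash code both pass. An empty subscription means "no tag filter".
     */
    static boolean matchesByTagHash(long entryTagHash, Set<String> subscribedTags) {
        if (subscribedTags.isEmpty()) {
            return true;
        }
        for (String tag : subscribedTags) {
            if (tag.hashCode() == entryTagHash) {
                return true;
            }
        }
        return false;
    }

    /**
     * Slow-consumer check: the unread backlog (in bytes) is compared with a
     * percentage of physical memory, 40% in the text above.
     */
    static boolean suggestPullFromSlave(long maxCommitLogOffset, long maxPulledOffset,
                                        long physicalMemoryBytes, int maxRatioPercent) {
        long backlogBytes = maxCommitLogOffset - maxPulledOffset;
        long memoryBudget = (long) (physicalMemoryBytes * (maxRatioPercent / 100.0));
        return backlogBytes > memoryBudget;
    }

    public static void main(String[] args) {
        Set<String> subscription = Collections.singleton("Aa");
        // "Aa" and "BB" have the same String hash code, so "BB" slips through.
        System.out.println(matchesByTagHash("BB".hashCode(), subscription));

        long gib = 1024L * 1024 * 1024;
        // 5 GiB of backlog against 40% of 10 GiB of memory.
        System.out.println(suggestPullFromSlave(5 * gib, 0, 10 * gib, 40));
    }
}
```

The hash collision in the example is the reason a second, exact filter is still needed on the Consumer side. The backlog check is a heuristic: a backlog larger than the memory budget suggests the requested data is probably no longer in the page cache.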

The Consumer processes pulled messages

  1. The Consumer filters the pulled messages again locally.

  2. With tag filtering, the actual tag values are compared, because the hash code comparison on the Broker can let through messages whose tags merely collide.

  3. The messages that pass the filter are put into the local ProcessQueue.

  4. The PullRequest is put back into the pullRequestQueue so that the next pull can be issued.
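The Consumer-side filter can be sketched as an exact match on the tag string. `ConsumerFilterSketch` and its `Message` type are hypothetical and exist only to illustrate the step; they are not the client's real classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Illustrative Consumer-side filter; names are invented for this example.
final class ConsumerFilterSketch {

    /** Minimal stand-in for a pulled message. */
    static final class Message {
        final String tag;
        final String body;

        Message(String tag, String body) {
            this.tag = tag;
            this.body = body;
        }
    }

    /**
     * Exact filter: keeps only messages whose tag string is in the subscription.
     * An empty subscription means "no tag filter", so everything is kept.
     */
    static List<Message> filterByTagValue(List<Message> pulled, Set<String> subscribedTags) {
        if (subscribedTags.isEmpty()) {
            return new ArrayList<>(pulled);
        }
        List<Message> kept = new ArrayList<>();
        for (Message message : pulled) {
            if (message.tag != null && subscribedTags.contains(message.tag)) {
                kept.add(message);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // "Aa" and "BB" share a String hash code, so a hash-only filter would pass both.
        List<Message> pulled = Arrays.asList(
                new Message("Aa", "order created"),
                new Message("BB", "unrelated event"));
        List<Message> kept = filterByTagValue(pulled, Collections.singleton("Aa"));
        System.out.println(kept.size() + " message(s) go into the local buffer");
    }
}
```

Only the messages returned by this filter would be placed in the ProcessQueue; messages that passed the Broker's hash comparison by collision are dropped here.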

Conclusion

  1. Messages are consumed in Push mode by default, but Push is actually implemented on top of Pull, using long polling.

  2. Pulling is subject to flow control on the Consumer side. By default, when a ProcessQueue holds more than 1000 messages or more than 100 MB of message data, the next pull is delayed by 50 ms.

  3. If the Consumer subscribes by tag, the Broker performs coarse-grained filtering based on the tag hash code, and the Consumer then filters precisely on the tag value.

  4. Pulled messages are not consumed directly; they are put into the local ProcessQueue for the consumer threads to consume later.