About the author: Hi, everyone. I am the author of the book RocketMQ Technology Insider, chief evangelist of the RocketMQ open-source community, and maintainer of the public account "Middleware Interest Circle", which focuses on analyzing mainstream Java middleware and has published 15 columns on Kafka, RocketMQ, Dubbo, Sentinel, Canal, ElasticJob, and other middleware.

RocketMQ provides a tag-based message filtering mechanism, but many readers have run into questions while using it. Browsing the official RocketMQ group, I remember several people asking questions like the following:

Today I'd like to share a few RocketMQ tag issues that deserve your attention. If you find them helpful, I'm looking forward to your likes.

  • Why does message loss occur when consumer group subscriptions are inconsistent?
  • If a tag matches only a small number of messages, will latency be high?

1. Messages are lost due to inconsistent subscription relationships among consumer groups

From a message consumption perspective, a consumer group is a basic physical isolation unit, with each consumer group having its own consumption point, consumption thread pool, and so on.

A common mistake RocketMQ beginners make is having different consumers within the same consumer group subscribe to different tags on the same topic, which causes messages to be lost (some messages are never consumed).

To recap the key points:

  1. For example, a topic has four queues (Q0 to Q3).

  2. The message sender sends four tagA messages consecutively, then four tagB messages consecutively. By default the sender uses round-robin load balancing, so both tagA and tagB messages end up in every queue of the topic.

  3. In consumer group DW_TAG_test, the consumer at IP 192.168.3.10 subscribes to tagA, while the consumer at 192.168.3.11 subscribes to tagB.

  4. Before consuming messages, the consumers in the group first rebalance the queues, which are allocated evenly by default. The allocation result is as follows:

    • 192.168.3.10 is allocated Q0 and Q1.

    • 192.168.3.11 is allocated Q2 and Q3.

  5. Each consumer then issues pull requests to the Broker. Since the consumer at 192.168.3.10 subscribes only to tagA, the tagB messages in Q0 and Q1 are filtered out on the Broker side, yet those filtered messages are never delivered to the other consumer (which subscribes to tagB), so they are lost.

  6. Similarly, since the consumer at 192.168.3.11 subscribes only to tagB, the tagA messages in Q2 and Q3 are filtered out and never delivered to the consumer that subscribes to tagA, so those messages are lost as well.
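The scenario above can be simulated in a few lines of plain Java (no RocketMQ dependency; the queue layout and tag names follow the example, everything else is illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class TagMismatchDemo {
    public static void main(String[] args) {
        // 4 queues; the sender round-robins 4 tagA then 4 tagB messages.
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < 4; i++) queues.add(new ArrayList<>());
        String[] tags = {"tagA", "tagA", "tagA", "tagA", "tagB", "tagB", "tagB", "tagB"};
        for (int i = 0; i < tags.length; i++) {
            queues.get(i % 4).add(tags[i]); // round-robin load balancing
        }
        // Queue allocation: the consumer at .10 gets Q0/Q1 and subscribes to
        // tagA; the consumer at .11 gets Q2/Q3 and subscribes to tagB.
        int consumed = countMatching(queues.subList(0, 2), "tagA")
                     + countMatching(queues.subList(2, 4), "tagB");
        // Only half the messages are consumed; the rest are filtered out on
        // the Broker and never delivered to the other consumer.
        System.out.println("sent=" + tags.length + ", consumed=" + consumed);
    }

    static int countMatching(List<List<String>> qs, String tag) {
        int n = 0;
        for (List<String> q : qs) for (String t : q) if (t.equals(tag)) n++;
        return n;
    }
}
```

Running it shows that only 4 of the 8 messages are consumed, which is exactly the "message loss" described above.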

2. If a tag matches only a small number of messages, will latency be high?

This worry was raised in the group early on; the scenario is as follows:

After the consumer consumes the tag1 message at offset=100, 10 million (1000W) consecutive non-tag1 messages follow. Will the backlog of this consumer group keep growing toward 10 million?

To understand this problem, we need to focus on at least the following parts of the source code:

  • The message pull process
  • The offset commit mechanism

This article does not aim to walk through the entire source code; if you are interested in it, you can check out the author's book RocketMQ Technology Insider.

Instead, this article is problem-oriented: we reason about the behavior ourselves, locate the key source code to verify it, and finally confirm it with a simple example.

Before digging into the problem, let's think about how we would design this feature ourselves.

Given that the consumer group has consumed the message at offset=100 and the next 10 million messages will all be filtered out, how should the offset be committed? I think at least the following points matter:

  • What does the server do when, during a pull, no suitable message can be found among a long run of consecutive messages?
  • How is the offset committed when the client does pull messages, and when it does not?

2.1 Key design in message pull process

When the client pulls messages from the server and 10 million consecutive messages fail to match, filtering through all of them in one request would be extremely time-consuming, and the client cannot wait that long. The server must therefore stop the search under some condition and return NO_MESSAGE to the client. So how long will the client wait for a pull?

Key point 1: When the client initiates a message pull request to the server, the timeout is set as follows:
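The original code screenshot is not reproduced here; the sketch below records the two constants as they appear (from memory) in the 4.x DefaultMQPushConsumerImpl, so treat the exact names and values as an approximation of that version:

```java
public class PullTimeouts {
    // Maximum time a pull request with no matching messages may stay
    // suspended (long-polled) on the Broker; not currently customizable.
    static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
    // Overall timeout for the pull request; deliberately larger than the
    // suspend time so the Broker can answer before the client gives up.
    static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
}
```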

The two timeout-related variables have the following meanings:

  • long brokerSuspendMaxTimeMillis: how long a pull request with no matching messages is allowed to stay suspended on the Broker; defaults to 15s and cannot currently be customized.
  • long timeoutMillis: the overall timeout for the pull request; defaults to 30s.

That is, the maximum timeout period for pulling a message is 30 seconds.

Key point 2: The Broker sets complete exit conditions when handling a message pull, in the getMessage method of DefaultMessageStore, as described below:

Key points:

  • First, the client passes in the expected number of messages to pull, corresponding to maxMsgNums in the code; once that many messages have been found (see the isTheBatchFull method), the loop exits normally.
  • Another key exit condition is the maximum number of index bytes the server may scan per pull, i.e. the number of ConsumeQueue bytes examined: the larger of 16000 and the expected pull count multiplied by 20, since each ConsumeQueue entry occupies 20 bytes.
  • The server also implements a long-polling mechanism: if the byte budget is exhausted without finding a message, the request is suspended on the Broker for a while; when a new message arrives that matches the filter, the request is woken up and the message is returned to the client.
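The scan-budget exit condition can be sketched as follows; the 16000-byte floor and the 20-byte ConsumeQueue entry size come from the description above, while the class and method names are illustrative:

```java
public class PullScanLimit {
    // Each ConsumeQueue entry is 20 bytes.
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Mirrors the bound described above: stop scanning after this many
    // ConsumeQueue bytes even if fewer than maxMsgNums matching messages
    // were found, so a single pull request always returns promptly.
    static int maxFilterMessageSize(int maxMsgNums) {
        return Math.max(16000, maxMsgNums * CQ_STORE_UNIT_SIZE);
    }

    public static void main(String[] args) {
        // With a pull batch of 32, the 16000-byte floor wins, i.e. at most
        // 800 ConsumeQueue entries are scanned per pull.
        System.out.println(maxFilterMessageSize(32) / CQ_STORE_UNIT_SIZE); // 800
    }
}
```

This bound is why 10 million non-matching messages cannot stall one pull request: each pull scans a fixed slice and returns.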

Back to the problem: even if the server holds 10 million consecutive non-tag1 messages, a single pull request will not try to filter through all of them at once; it returns within the scan budget, so the client will not time out.

This removes the first concern: the server will not hang indefinitely when no matching message is found. Whether a backlog builds up now depends on how the offset is committed.

2.2 Offset commit mechanism

2.2.1 Offset commit when the client pulls matching messages

After the pull thread fetches messages from the server, it hands them to the consumer group's thread pool; the logic is mainly defined in the PullTask class of DefaultMQPushConsumerImpl, as follows:

As is well known, RocketMQ commits the offset only after a message has been consumed successfully; the code is in ConsumeMessageConcurrentlyService, as shown below:

The key point here:

  • After a message is consumed successfully, the consumer commits the minimum in-flight offset, which guarantees that no message is lost.

  • The minimum-offset commit mechanism works by putting the pulled messages into a TreeMap; after a consuming thread successfully processes a message, the message is removed from the TreeMap and the offset to commit is computed:

    • If messages are still being processed in the TreeMap, return the offset of its first entry (the minimum offset).
    • If the TreeMap is empty, the returned offset is based on this.queueOffsetMax, the maximum offset pulled so far for the current consume queue, because all pulled messages have been consumed.
  • Finally, the updateOffset method updates the local offset cache (which is persisted periodically).
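The minimum-offset mechanism can be modeled in a few lines of plain Java (a sketch with illustrative names, not the verbatim RocketMQ source, where this logic lives in ProcessQueue):

```java
import java.util.TreeMap;

// Minimal model of the "minimum offset commit" mechanism: concurrent
// consumers may finish out of order, but the committed offset never
// moves past an unfinished message.
public class ProcessQueueModel {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private long queueOffsetMax = -1; // max offset pulled so far

    public void put(long offset, String body) {
        msgTreeMap.put(offset, body);
        queueOffsetMax = Math.max(queueOffsetMax, offset);
    }

    // Returns the offset that is safe to commit after consuming `offset`.
    public long remove(long offset) {
        msgTreeMap.remove(offset);
        // If messages are still in flight, commit the smallest pending
        // offset; otherwise everything pulled so far has been consumed,
        // so we can commit the offset right past the maximum pulled one.
        return msgTreeMap.isEmpty() ? queueOffsetMax + 1 : msgTreeMap.firstKey();
    }
}
```

For example, if offsets 100-102 are pulled and 101 finishes first, only 100 is committed; once 100 and 102 also finish, the commit jumps to 103.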

2.2.2 Offset commit when the client pulls no matching messages

If the client pulls no suitable messages, for example because all of them are filtered out by tag, the handling is defined in the PullTask class of DefaultMQPushConsumerImpl, as follows:

The key code is in correctTagsOffset, shown below:
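Since the original snippet is not reproduced here, the following is a minimal model of the idea (illustrative names, not the verbatim RocketMQ source):

```java
// Sketch of the idea behind correctTagsOffset: when every pulled message
// was filtered out and the processing queue is empty, the locally cached
// offset jumps straight to the next pull offset the Broker returned.
public class TagsOffsetCorrector {
    long cachedOffset;

    public TagsOffsetCorrector(long start) {
        this.cachedOffset = start;
    }

    public void correctTagsOffset(int inFlightMsgCount, long nextBeginOffset) {
        if (inFlightMsgCount == 0) {
            // Nothing is being consumed, so it is safe to advance the
            // offset even though no message was actually delivered.
            cachedOffset = nextBeginOffset;
        }
    }
}
```

This is the crucial step: the offset advances even when nothing matched the tag, which is what keeps the backlog from growing.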

If the number of messages in the processing queue is 0, the next pull offset is committed as the consume offset; the server drives this value forward while looking up messages, as seen in getMessage of DefaultMessageStore:

So even if all messages are filtered out, the offset still moves forward and no huge backlog builds up.

2.2.3 Offset commit piggybacked on message pulls

RocketMQ offset commits are first written to a local cache on the client, which then periodically reports the offsets to the Broker. There is also another, implicit commit mechanism:

When a pull request is issued and offset information exists in the local cache, a system flag, FLAG_COMMIT_OFFSET, is set on the request. This flag triggers an offset commit on the server side, as follows:
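A sketch of how this flag might be carried, modeled on the client's PullSysFlag bit flags (the bit values are what I recall from the 4.x source and should be treated as an assumption):

```java
// Model of the sysFlag bitmask attached to every pull request; the
// FLAG_COMMIT_OFFSET bit tells the Broker to persist the offset the
// client piggybacks on the pull.
public class PullSysFlagModel {
    static final int FLAG_COMMIT_OFFSET = 0x1;      // piggyback cached offset
    static final int FLAG_SUSPEND = 0x1 << 1;       // allow broker long polling
    static final int FLAG_SUBSCRIPTION = 0x1 << 2;  // carry subscription expression

    static int buildSysFlag(boolean commitOffset, boolean suspend, boolean subscription) {
        int flag = 0;
        if (commitOffset) flag |= FLAG_COMMIT_OFFSET;
        if (suspend) flag |= FLAG_SUSPEND;
        if (subscription) flag |= FLAG_SUBSCRIPTION;
        return flag;
    }
}
```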

2.2.4 Summary and verification

In summary, using tags does not cause a large backlog merely because few messages carry a given tag.

To test this idea, I ran a simple experiment: a producer keeps sending tagB messages to a given topic while the consumer subscribes only to tagA, and the consumer shows no backlog. The test code is shown in the following figure:
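The original code screenshot is missing; the sketch below reconstructs the idea with the standard 4.x client API. The topic, group names, and NameServer address are placeholders, and the program needs a running Broker:

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class TagBacklogTest {
    public static void main(String[] args) throws Exception {
        // Producer keeps sending tagB messages to the topic.
        DefaultMQProducer producer = new DefaultMQProducer("dw_test_producer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 1000; i++) {
            producer.send(new Message("dw_test_topic", "tagB", ("msg-" + i).getBytes()));
        }

        // Consumer subscribes only to tagA, so every message is filtered
        // out; the offset should still advance and no backlog should grow.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dw_test_consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("dw_test_topic", "tagA");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, ctx) ->
            ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
        consumer.start();
    }
}
```

After running it, checking the consumer group with the admin tooling should show the offset tracking the queue's max offset rather than a 1000-message backlog.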

Check the consumer group backlog as shown below:

Original article: www.codingw.net/Article?id=…

One last word (please give me a follow rather than just reading and leaving):

If this article is helpful or inspiring, please give it a thumbs up.

Mastering one or two mainstream Java middleware technologies is an essential skill for getting into BAT and other big tech companies. This gives you a learning route for Java middleware and helps you advance your career.

A Java advancement ladder: a growth path and learning materials to help you break into the middleware field.