At first glance, our consumer perception of RocketMQ might be as simple as a client that consumes messages. All you need to do is specify the corresponding Topic and ConsumerGroup, and the rest is just:

  • Receives the message
  • Process the message

And we’re done.

Of course, this may be true in real business scenarios. But if we don’t know exactly what Consumer will do when it launches, the details of the underlying implementation can be as confusing as looking for a needle in a haystack in a complex business scenario.

On the other hand, if you understand the details, you will have more context when troubleshooting the problem, and you will be more likely to come up with solutions.

Some of the basic concepts of RocketMQ and some of the underlying implementation of RocketMQ have been written in the RocketMQ Basic Concepts & source code analysis article.

A simple example

The overall logical

Let’s start with a simple example of the basic use of the RocketMQ Consumer. Start by using it and learn the details.

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest"."*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List
       
         msgs, ConsumeConcurrentlyContext context)
        {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}); consumer.start(); System.out.printf("Consumer Started.%n"); }}Copy the code

The code must be a bit difficult to look at, but the following flow chart is logically equivalent to the above code and can be viewed together.

Point of consumption strategy

In addition to general topics like Topic and register message listeners, setConsumeFromWhere deserves more attention. It determines where the consumer will start spending, and there are three possible values:

There are actually three more values in the ConsumeFromWhere enumeration class source code, but they have been deprecated. However, this configuration is only valid for new ConsumerGroups. Existing ConsumerGroups will continue to consume the Offset from the last consumption.

And it makes a lot of sense, let’s say you have 1,000 messages, and your service is consuming 500, and then you launch something that restarts the service, and then you start consuming again, right? Isn’t that ridiculous?

Caches information about subscribed topics

Subscribe (“TopicTest”, “*”), but there’s a lot going on behind the scenes, so let me just draw the simple process.

The first argument to the SUBSCRIBE function is the Topic we need to consume, needless to say. The second argument is called the filter expression string, and the simpler argument is the Tag of the message you want to subscribe to.

Each message has its own Tag and if you don’t know, you might want to check out the above article

Here we pass *, which stands for subscribing to all categories of messages. Of course, we can also be introduced to tagA | | tagB | | tagC this, on behalf of our consumption only played these three Tag information.

RocketMQ constructs SubscriptionData from the two parameters we pass in, and maintains it in an in-memory ConcurrentHashMap. In short, it caches the subscribed Topic.

A critical operation occurs after the cache is complete, which is to start sending heartbeats to all brokers. The Consumer client will:

  • Name of consumer
  • The consumption type represents the pattern of consuming messages through Push or Pull
  • Consumption model refers to CLUSTERING or BROADCASTING consumption
  • Point of consumption strategyThat’s sort of likeCONSUME_FROM_LAST_OFFSETThis kind of
  • A consumer’s subscription data set can listen to multiple topics
  • Collection of producers The collection of production registered on the current instance

That’s right, after the Consumer instance starts, it also runs the Producer’s code. Furthermore, if a client is not configured with either producers or consumers, the heartbeat logic will not be executed because it makes no sense.

Start consumer instance

All of the core logic mentioned above is actually here and discussed in detail below, so this is the end of the simple example.

Enter the startup core logic

In the core entry class of startup, a total of 4 states are processed respectively, which are:

  • CREATE_JUST
  • RUNNING
  • START_FAILED
  • SHUTDOWN_ALREADY

But since we’ve just created it, we’ll go to the logic of CREATE_JUST, so we’ll focus on what the Consumer does when it starts.

Check the configuration

Basic operation, as we usually write business code is no different, check the configuration of various parameters are legitimate.

There are too many configuration items to mention, just know that RocketMQ starts up and verifies the parameters in the configuration.

Anyway, let’s make a list:

  • Is the name of the consumer group empty
  • The name of the consumer group cannot be a name reserved for use by RocketMQ, namely —DEFAULT_CONSUMER
  • Whether the consumption model (CLUSTERING, BROADCASTING) is configured
  • The consumption point policy (for example, CONSUME_FROM_LAST_OFFSET) is configured
  • To judge whether the consumption method is legal, it can only be sequential consumption or concurrent consumption
  • Whether the minimum consumption thread and the maximum consumption thread quantity of the consumer group are in the specified range, which refers to (1, 1000), open left and open right. And then there’s the judgment that the minimum can’t be greater than the maximum
  • . Etc., etc.

So you see, even a great open source framework can have this cumbersome, common business code.

Changing the instance name

InstanceName is obtained from rocketmq.client.name. If it is not configured, the instanceName is set to DEFAULT. If, for example, the consumption model is CLUSTERING (which is the DEFAULT), DEFAULT is changed to a string of ${PID}#${system.nanotime ()}, as shown here.

instanceName = "90762 # 75029316672643"
Copy the code

Why do I mention this in isolation? It was quite a unique identifier for each instance, the unique identification is very important, if a consumer group of the same instanceName, then might cause repeated consumption, or news accumulation problems, caused the this point accumulation is more interesting news, follow-up should be alone I have time to write an article to discuss.

But as sharp-eyed students might have noticed, instanceName is not composed of PID and System.nanotime. PID may be obtained from the PID of the Docker container host machine, which may be the same and understandable. The System. NanoTime? Can that be repeated?

In fact, according to RocketMQ’s Github commit record, this issue is likely to persist until at least March 16, 2021.

RocketMQ has fixed this issue in a submission dated March 16. Here’s what’s changed:

In the original version, instanceName consisted only of Pids, making it entirely possible for different consumer instances to have the same instanceName.

For those of you familiar with RocketMQ, isn’t the unique identifier for a Consumer on the Broker side a clientID? Yes, but clientID is made up of clientIP and instanceName.

As mentioned above, clientIP may be the same because of Docker, which will eventually lead to the same clientID.

OK, so that’s it for changing the name of an instance, I really didn’t expect that much.

Instantiate consumer

The key variable is named mQClientFactory

The consumer instance is then instantiated, and the clientID initialization mentioned above in changing the instance name is done in this step. Here I will not give you the source code, you need to know that this place will instantiate out of a consumer OK, do not too much entanglements in detail.

You then set properties to the implementation of Rebalance, such as the consumer group name, the message model, the policy that Rebalance takes, and the consumer instance that you just instantiated.

The default policy for this Rebalance is:

AllocateMessageQueueAveragely is a Messsage Queue allocation strategy to consumers on average, more details or see me on the above article.

In addition, PullAPIWrapper, the core implementation of the pull message, is initialized.

Initialize offsetStore

Different offsetStore implementations are instantiated based on different message models (i.e., BROADCASTING or CLUSTERING).

  • BROADCASTING is implemented as LocalFileOffsetStore
  • The implementation for CLUSTERING is RemoteBrokerOffsetStore

The difference is that LocalFileOffsetStore manages the Offset locally, while RemoteBrokerOffsetStore hands the Offset to the Broker

Start the ConsumeMessageService

Cache Consumer Group

The consumer group is then cached in the current client instance, in an in-memory concurrentHashMap called consumerTable.

RegisterConsumer:

But I think it makes more sense to “translate” to caching because it just caches the built consumer instance into the map, and that’s it. If it exists, it returns false, indicating that the registration was not actually successful.

So why return false? Would you be ok if you did not perform cache logic? Even throw an MQClientException from the outside based on this false?

Why? Assume that your colleague A has used the name consumer_group_name_for_A, and the line is running normal consumption messages. Well, you’ve added A feature that requires listening to MQ, which also uses consumer_group_name_for_A. Imagine if RocketMQ didn’t do validation, you’d be registered, but your coworker would be like, “What the hell? How did you start to repeat consumption?”

Start the mQClientFactory

This mQClientFactory is the consumer instance created in the instantiation consumer step, which ends with a call to mqClientFactory.start ().

That’s the core logic at the end.

Example Initialize the NameServer address

Initialize the Netty client used for communication

Start a bunch of scheduled tasks

This is not an exaggeration, it is a lot, for example:

  • If NameServer doesn’t get it, it will start a scheduled task to pull it every once in a while
  • For example, it also starts a scheduled task to pull routing data for a given Topic from NameServer once in a while. This routing data refers specifically to MessageQueue data, such as how many write queues, how many read queues, and brokernames, clusters, and IP addresses of the brokers distributed to the Topic, which is roughly called routing data
  • Another example is to start a scheduled task that sends a heartbeat and not start the heartbeat
  • For example, the Broker might fail, right? Does the client need to kill the offline Broker in time? So RocketMQ has a cleanOfflineBroker method that does just that
  • The next key is persistent offset, which, due to the CLUSTERING consumption, regularly reports current consumer consumption to the Broker

EOF

EOF

If you think this article is helpful to you, please give a like, close a note, share a share and leave a comment.