Note: This series of source code analysis is based on RocketMq 4.8.0, gitee Repository link: gitee.com/funcy/rocke… .

Previously, the process of sending messages from producer was analyzed. In this paper, we will analyze the process of consumer consumption messages.

Consumer consumption news demo for org. Apache. Rocketmq. Example. Simple. PushConsumer, code is as follows:

public class PushConsumer {

    public static void main(String[] args) 
            throws InterruptedException, MQClientException {
        String nameServer = "localhost:9876";
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.setNamesrvAddr(nameServer);
        consumer.subscribe("TopicTest"."*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //wrong time format 2017_0422_221800
        consumer.setConsumeTimestamp("20181109221800");
        // Register a listener to listen for messages
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List
       
         msgs, ConsumeConcurrentlyContext context)
        {
                // There is a message here
                System.out.printf("%s Receive New Messages: %s %n", 
                    Thread.currentThread().getName(), msgs);
                returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; }});/ / start
        consumer.start();
        System.out.printf("Consumer Started.%n"); }}Copy the code

Consumer is fairly simple to use. You create a DefaultMQPushConsumer object, configure some properties, and, crucially, register a message listener (in which to fetch messages), then call the start() method to start consumer.

Now let’s analyze the consumption process of this area.

1. Construction method:DefaultMQPushConsumer

DefaultMQPushConsumer = DefaultMQPushConsumer

public DefaultMQPushConsumer(final String consumerGroup) {
    // The queue allocation policy is specified
    this(null, consumerGroup, null.new AllocateMessageQueueAveragely());
}

public DefaultMQPushConsumer(final String namespace, final String consumerGroup, 
        RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
    this.consumerGroup = consumerGroup;
    this.namespace = namespace;
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
Copy the code

In the constructor, we just do some assignment of member variables. The key is the strategy for allocating message queues: AllocateMessageQueueStrategy, if specified, the default is to use AllocateMessageQueueAveragely, namely average get messages from the queue.

2. Startconsumer:DefaultMQPushConsumer#start

DefaultMQPushConsumer#start:

public void start(a) throws MQClientException {
    setConsumerGroup(
        NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    / / start
    this.defaultMQPushConsumerImpl.start();
    // We do not care about the contents of the message trajectory
    if (null != traceDispatcher) {
        ...
    }
}
Copy the code

Continue to enter DefaultMQPushConsumerImpl# start:

public synchronized void start(a) throws MQClientException {
    switch (this.serviceState) {
        caseCREATE_JUST: log.info(...) ;this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            this.copySubscription();

            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }
            / / the client
            this.mQClientFactory = MQClientManager.getInstance()
                .getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
            // Set load balancing properties
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(
                this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            if (this.defaultMQPushConsumer.getOffsetStore() ! =null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                // Message mode: Broadcast mode is local, cluster mode is remote
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, 
                            this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, 
                            this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            // Load the offset of the consumption information
            this.offsetStore.load();
            // Instantiate different consumeMessageService according to the client: sequential message and concurrent message
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService = new ConsumeMessageOrderlyService(this, 
                    (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, 
                    (MessageListenerConcurrently) this.getMessageListenerInner());
            }

            this.consumeMessageService.start();
            // Register a consumer group
            boolean registerOK = mQClientFactory.registerConsumer(
                this.defaultMQPushConsumer.getConsumerGroup(), this);
            if(! registerOK) {this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown(
                    defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                throw newMQClientException(...) ; }/ / startmQClientFactory.start(); log.info(...) ;this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw newMQClientException(...) ;default:
            break;
    }

    // Update topic information to get data from nameServer
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    // Send heartbeat to all brokers
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // Load balancing
    this.mQClientFactory.rebalanceImmediately();
}
Copy the code

This method is quite long, the whole process of consumer startup is here, let’s pick out the key points, to summarize what this method does.

  1. Obtaining a ClientmQClientFactoryThat type oforg.apache.rocketmq.client.impl.factory.MQClientInstance, if theproducerIf we remember anything, we’ll see,producerIn themQClientFactoryIs also of type
  2. Distinguish between broadcast mode and cluster modeoffsetStoreThe so-calledoffsetStore, is a memory used to store the offset of current consumer consumption information. In broadcast mode, the offset is stored in a local file, and in cluster mode, it is stored remotelybroker, broadcast mode and cluster mode, we will analyze in detail later
  3. Depending on the client instantiationconsumeMessageServiceThis is used to distinguish sequential messages from concurrent messages
  4. Start themQClientFactoryThat is, start the client
  5. updatetopicMessage, send heartbeat message tobrokerAnd handle the load balancing function

Above is DefaultMQPushConsumerImpl# start method of the main work. In fact, points 1, 2, and 3 above are configurations that are started in the mqClientFactory.start () method.

3. StartmQClientFactory:MQClientInstance#start

Let’s take a look at the startup process for mQClientFactory. Enter MQClientInstance#start:

public void start(a) throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // Get the address of nameServer
                if (null= =this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start the remote service of the client. This method configures the Netty client
                this.mQClientAPIImpl.start();
                // Start a scheduled task
                this.startScheduledTask();
                // Pull service, only for consumers
                this.pullMessageService.start();
                // Start the load balancing service for consumers only
                this.rebalanceService.start();
                // Enable internal producer
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK".this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw newMQClientException(...) ;default:
                break; }}}Copy the code

This method is also called during the initiation of the producer. We have already analyzed this method in the previous wave, but this time we will analyze this method from the perspective of the consumer.

The method does the following:

  1. To obtainnameServerThe address of the
  2. Start the remote service on the client. This method is configurednettyThe client
  3. Starting a Scheduled Task
  4. Start the pull message service
  5. Start the load balancing service

Starting a scheduled task MQClientInstance#startScheduledTask

private void startScheduledTask(a) {...// Persist the consumer's consumption offset every 5 seconds
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run(a) {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e); }}},1000 * 10.this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    // Omit other scheduled tasks. }Copy the code

There are other scheduled tasks that are started in this method. Here we focus on the scheduled task that executes the MQClientInstance#persistAllConsumerOffset() method, which persists the offset of the current consumer consumption message. We’ll give an idea of the scheduled task in this section. I’ll look at the persistence process in more detail in the section analyzing offset persistence.

Back to the MQClientInstance#start process, steps 4 and 5 mainly start two services: pullMessageService and rebalanceService. The information for this class is as follows:

/** * PullMessageService */
public class PullMessageService extends ServiceThread {... }/** * RebalanceService */
public class RebalanceService extends ServiceThread {... }Copy the code

Both classes are subclasses of ServiceThread, and both classes’ start() methods are derived from ServiceThread:

public abstract class ServiceThread implements Runnable {

    // omit other code./** * start() method */
    public void start(a) { log.info(...) ;if(! started.compareAndSet(false.true)) {
            return;
        }
        stopped = false;
        this.thread = new Thread(this, getServiceName());
        this.thread.setDaemon(isDaemon);
        this.thread.start(); }}Copy the code

In code, ServiceThread implements the Runnable interface. In its start() method, it starts a thread whose execution logic comes from its subclass’s run() method. So to see the start() method of pullMessageService and rebalanceService execute logic, just look at the run() method of the corresponding class.

At this point, the consumer startup is complete, the services are started, and the consumer pull messages are handled by the cooperation of these services. Let’s analyze what these services do.

Limited to space, this article will stop here first, continue next.


Limited to the author’s personal level, there are inevitable mistakes in the article, welcome to correct! Original is not easy, commercial reprint please contact the author to obtain authorization, non-commercial reprint please indicate the source.

This article was first published in the wechat public number Java technology exploration, if you like this article, welcome to pay attention to the public number, let us explore together in the world of technology!