
A review of the premises of RocketMQ

RocketMQ is a distributed messaging middleware based on the queue model, with the following features:

  1. Can guarantee strict message ordering
  2. Provides rich message pull modes
  3. Scales subscribers horizontally and efficiently
  4. Real-time message subscription mechanism
  5. Can accumulate hundreds of millions of messages

Why RocketMQ

  1. The cluster has no single point of failure, every node is highly available, and the cluster scales horizontally
  2. Massive message accumulation capacity, with low write latency even after messages accumulate
  3. Supports tens of thousands of queues
  4. Message failure retry mechanism
  5. Messages can be queried
  6. Active open source community
  7. Proven in production by Taobao's Double Eleven traffic

The evolution of RocketMQ

The open source version of RocketMQ uses files as its persistence layer. Alibaba's internal version performs better and uses OceanBase for persistence. RocketMQ 1.x and 2.x used ZooKeeper to manage the cluster; starting with 3.x, the more lightweight NameServer replaced ZooKeeper. In addition, RocketMQ clients can consume in two ways: DefaultMQPushConsumer and DefaultMQPullConsumer.

Maven configuration for DefaultMQPushConsumer

<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-client</artifactId>
   <version>4.3.0</version>
</dependency>

DefaultMQPushConsumer Example

  1. CONSUME_FROM_LAST_OFFSET: on the first start, consume from the end of the queue; on later starts, resume from the last consumption progress
  2. CONSUME_FROM_FIRST_OFFSET: on the first start, consume from the beginning of the queue; on later starts, resume from the last consumption progress
  3. CONSUME_FROM_TIMESTAMP: on the first start, consume from the specified point in time; on later starts, resume from the last consumption progress

"First start" means a consumer that has never consumed before. Once it has consumed, its consumption progress is recorded on the broker side. If the consumer crashes and starts again, it automatically resumes from the last recorded progress.
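To make the "first start versus later start" rule concrete, here is a minimal, self-contained sketch. It is a simplified model, not the client's actual code: the class StartOffsetSketch, its local Strategy enum, and the resolve method with its parameters are hypothetical names introduced only for illustration, and a stored offset of -1 is assumed to mean "never consumed".

```java
/** Simplified model of how the first-start strategy interacts with recorded progress. */
class StartOffsetSketch {
    // Local copy of the three strategy names from the text (hypothetical enum for this sketch).
    enum Strategy { CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMP }

    /**
     * @param storedOffset      recorded progress, or -1 if this consumer has never consumed
     * @param minOffset         offset of the oldest message still in the queue
     * @param maxOffset         offset at the end of the queue
     * @param offsetAtTimestamp offset that corresponds to the configured point in time
     */
    static long resolve(Strategy strategy, long storedOffset,
                        long minOffset, long maxOffset, long offsetAtTimestamp) {
        if (storedOffset >= 0) {
            return storedOffset; // not a first start: resume from the recorded progress
        }
        switch (strategy) {
            case CONSUME_FROM_FIRST_OFFSET:
                return minOffset;
            case CONSUME_FROM_TIMESTAMP:
                return offsetAtTimestamp;
            case CONSUME_FROM_LAST_OFFSET:
            default:
                return maxOffset;
        }
    }

    public static void main(String[] args) {
        // First start: the strategy decides where to begin.
        System.out.println(resolve(Strategy.CONSUME_FROM_FIRST_OFFSET, -1, 0, 500, 120));
        // Later start: the recorded progress wins regardless of the strategy.
        System.out.println(resolve(Strategy.CONSUME_FROM_FIRST_OFFSET, 42, 0, 500, 120));
    }
}
```

The point of the sketch is only that the strategy matters when no progress has been recorded yet.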

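import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class MQPushConsumer {
    public static void main(String[] args) throws MQClientException {
        String groupName = "rocketMqGroup1";
        // The group name ties multiple consumers together to improve concurrent processing
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        // Set the NameServer addresses; separate multiple addresses with ";"
        consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // Subscribe to a topic with an optional tag filter, e.g. ("TopicTest", "tag1 || tag2 || tag3");
        // "*" or null means all messages of the topic
        consumer.subscribe("order-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}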

  • CLUSTERING: the default mode. Each consumer in the same ConsumerGroup (same groupName) consumes only part of the subscribed messages, and the messages consumed by all consumers in the group add up to the whole subscribed topic, which achieves load balancing.
  • BROADCASTING: each consumer in the same ConsumerGroup consumes every message of the subscribed topic, i.e. one message is delivered multiple times and consumed by multiple consumers.
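
The difference between the two models can be pictured as a question of which queues each consumer reads. The sketch below is illustrative only: MessageModelSketch and both methods are hypothetical, and the simple modulo split stands in for whatever AllocateMessageQueueStrategy the client is configured with.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: which queue ids one consumer reads under each message model. */
class MessageModelSketch {
    /** CLUSTERING: the queues are split, so each queue belongs to exactly one consumer of the group. */
    static List<Integer> clusteringQueues(int queueCount, int consumerCount, int consumerIndex) {
        List<Integer> mine = new ArrayList<>();
        for (int queueId = 0; queueId < queueCount; queueId++) {
            if (queueId % consumerCount == consumerIndex) {
                mine.add(queueId);
            }
        }
        return mine;
    }

    /** BROADCASTING: every consumer reads every queue. */
    static List<Integer> broadcastingQueues(int queueCount) {
        List<Integer> all = new ArrayList<>();
        for (int queueId = 0; queueId < queueCount; queueId++) {
            all.add(queueId);
        }
        return all;
    }

    public static void main(String[] args) {
        // Two consumers in one group, topic with four queues.
        System.out.println("clustering, consumer 0: " + clusteringQueues(4, 2, 0));
        System.out.println("clustering, consumer 1: " + clusteringQueues(4, 2, 1));
        System.out.println("broadcasting, any consumer: " + broadcastingQueues(4));
    }
}
```

In the clustering case the two lists are disjoint and together cover the topic; in the broadcasting case every consumer gets the full list.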

When the listener returns ConsumeConcurrentlyStatus.RECONSUME_LATER, the broker schedules a retry according to the configured messageDelayLevel, up to 16 times by default.
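
As a rough illustration of how 16 retries map onto delay levels, the sketch below computes the wait before each retry. It rests on two assumptions that you should check against your own broker: that messageDelayLevel keeps its default value of "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h", and that the first retry uses level 3 with each further retry moving up one level. RetryDelaySketch and its methods are hypothetical helper names.

```java
/** Computes retry delays under the assumed default delay-level table. */
class RetryDelaySketch {
    // Assumed default broker setting; a customised messageDelayLevel changes the result.
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Converts a level such as "30s", "5m" or "1h" to seconds. */
    static long toSeconds(String level) {
        long value = Long.parseLong(level.substring(0, level.length() - 1));
        switch (level.charAt(level.length() - 1)) {
            case 's': return value;
            case 'm': return value * 60;
            case 'h': return value * 3600;
            default: throw new IllegalArgumentException("unknown unit in " + level);
        }
    }

    /** Delay before retry number retryNumber (1-based), assuming retries start at level 3. */
    static long delaySecondsForRetry(int retryNumber) {
        String[] levels = DEFAULT_LEVELS.split(" ");
        int delayLevel = 3 + (retryNumber - 1); // delay levels are 1-based
        return toSeconds(levels[delayLevel - 1]);
    }

    public static void main(String[] args) {
        for (int retry = 1; retry <= 16; retry++) {
            System.out.println("retry " + retry + " after " + delaySecondsForRetry(retry) + "s");
        }
    }
}
```

Under these assumptions the 16 retries exactly use up levels 3 through 18 of the table.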

The main objects inside DefaultMQPushConsumerImpl and their roles are as follows:

RebalancePushImpl: responsible for deciding which queues the current consumer should consume from;

  • 1) PullAPIWrapper: keeps a long-lived connection to the broker and pulls messages from it; the consumption logic is then run by ConsumeMessageService, which calls back the user's Listener;
  • 2) ConsumeMessageService: implements the so-called "push" (passive) consumption mechanism. Messages pulled from the broker are wrapped in a ConsumeRequest and submitted to ConsumeMessageService, which calls back the user's Listener to consume them;
  • 3) OffsetStore: maintains the current consumer's consumption progress (offset). There are two implementations, local and remote. The local one stores progress on local disk and suits the BROADCASTING consumption mode; the remote one stores progress on the broker and suits the CLUSTERING consumption mode;
  • 4) MQClientFactory: manages clients (consumers and producers) and provides the interfaces that the various services (Rebalance, PullMessage, etc.) call; most of the logic lives in this class;

How consumer.registerMessageListener is implemented:

/**
     * Register a callback to execute on message arrival for concurrent consuming.
     * @param messageListener message handling callback.
     */
    @Override
    public void registerMessageListener(MessageListenerConcurrently messageListener) {
        this.messageListener = messageListener;
        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
    }

The source shows that the real work is done in the DefaultMQPushConsumerImpl class. consumer.start() then calls the synchronized start method of DefaultMQPushConsumerImpl:

public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                this.copySubscription();
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                this.consumeMessageService.start();
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
    }

Following mQClientFactory.start(), we find that it calls:

public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

This method starts several services. We will look at pullMessageService.start(), which starts the PullMessageService thread. That thread takes PullRequest objects from its queue and passes each one to its pullMessage method:

private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }

So the pulling logic is actually handled by the pullMessage method of the DefaultMQPushConsumerImpl class.
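
The interplay between the service thread and its request queue can be sketched without any RocketMQ classes. The following is a minimal model under stated assumptions: PullLoopSketch is a hypothetical class, pull requests are reduced to plain strings, and "handling" a request just records it. The method names executePullRequestImmediately and executePullRequestLater are borrowed from the article's source listings, but the bodies here are my own simplification.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Toy model of a pull service: a blocking queue drained by a loop, plus delayed re-queueing. */
class PullLoopSketch {
    private final BlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final List<String> handled = new CopyOnWriteArrayList<>();

    /** Puts the request into the queue right away. */
    void executePullRequestImmediately(String request) {
        pullRequestQueue.offer(request);
    }

    /** Puts the request into the queue after the given delay (used for back-off and flow control). */
    void executePullRequestLater(String request, long delayMillis) {
        scheduler.schedule(() -> executePullRequestImmediately(request), delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Takes requests one by one, blocking while the queue is empty; stops after maxRequests. */
    List<String> runLoop(int maxRequests) {
        try {
            for (int i = 0; i < maxRequests; i++) {
                String request = pullRequestQueue.take();
                handled.add(request); // the real service would call pullMessage(request) here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        PullLoopSketch service = new PullLoopSketch();
        service.executePullRequestLater("queue-1", 50);   // delayed request
        service.executePullRequestImmediately("queue-0"); // immediate request
        System.out.println(service.runLoop(2));
        service.shutdown();
    }
}
```

Because every delayed or repeated pull simply re-enters the same queue, a single loop is enough to drive all queues of a consumer.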

PullRequest Pull mode

Here is an explanation of PullRequest. As mentioned, the push mode of RocketMQ is built on top of pull mode, and the PullRequest is implemented with long polling.

Long polling has the advantages of pull mode as well as the real-time behavior of push mode.

  • In push mode, the server actively pushes each message to the client as soon as it receives it, so delivery is highly real-time. The disadvantage is that the server carries a heavy workload, which affects performance. In addition, clients differ in processing capacity and their state is not controlled by the server; if a client cannot process messages in time, messages accumulate and normal service is affected.

  • In pull mode, the client pulls messages from the server in a loop, so the client controls the pace. The disadvantage is that the polling interval is hard to choose: if it is too short, CPU resources are wasted and the client's processing capacity drops; if it is too long, messages are sometimes not processed in time.
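
Long polling sits between these two. A minimal sketch of the idea, with no claim to mirror the broker's implementation: the "broker" side holds a pull request open until a message arrives or a maximum hold time passes. LongPollingSketch, handlePull and publish are hypothetical names, and messages are plain strings.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy long-polling endpoint: a pull waits for data instead of returning empty at once. */
class LongPollingSketch {
    private final BlockingQueue<String> messages = new LinkedBlockingQueue<>();

    /** Holds the request until a message is available or maxHoldMillis elapses (then returns null). */
    String handlePull(long maxHoldMillis) {
        try {
            return messages.poll(maxHoldMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** A producer delivers a message; a waiting pull, if any, returns immediately. */
    void publish(String message) {
        messages.offer(message);
    }

    public static void main(String[] args) {
        LongPollingSketch broker = new LongPollingSketch();
        new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            broker.publish("order-1");
        }).start();
        // The pull is answered as soon as the message arrives, well before the 2 s limit.
        System.out.println("pulled: " + broker.handlePull(2000));
        // With nothing to deliver, the pull gives up after the hold time.
        System.out.println("pulled: " + broker.handlePull(100));
    }
}
```

The client still initiates every request, which keeps it in control of its own load, while the held request gives delivery latency close to that of a push.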

Long polling combines the advantages of both. The DefaultMQPushConsumerImpl.pullMessage method works through the following steps:

  1. Check whether the dropped flag of the ProcessQueue object in the PullRequest is true. (When the RebalanceService thread creates a pull request for a MessageQueue of a topic, it maintains the corresponding ProcessQueue object; if the consumer no longer subscribes to the topic, the object's dropped flag is set to true.) If it is true, the request is considered cancelled and the method returns immediately.
  2. Set the timestamp of the ProcessQueue object in the PullRequest (ProcessQueue.lastPullTimestamp) to the current time;
  3. Check whether the consumer is running, i.e. whether DefaultMQPushConsumerImpl.serviceState is RUNNING. If it is not running, or it is paused (DefaultMQPushConsumerImpl.pause = true), call PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay) to pull again later, with timeDelay = 3000. This puts the PullRequest back into PullMessageService.pullRequestQueue after 3 seconds; then return;
  4. Perform flow control. If msgCount of the ProcessQueue object is greater than the flow-control threshold (DefaultMQPushConsumer.pullThresholdForQueue, default 1000), call PullMessageService.executePullRequestLater to put the PullRequest back into PullMessageService.pullRequestQueue after 50 milliseconds; then return;
  5. If consumption is not ordered (DefaultMQPushConsumerImpl.consumeOrderly is false), check the msgTreeMap: TreeMap<Long, MessageExt> variable of the ProcessQueue object, whose keys are queue offsets. If the difference between the first and last key is greater than the threshold (DefaultMQPushConsumer.consumeConcurrentlyMaxSpan, default 2000), call PullMessageService.executePullRequestLater to put the PullRequest back into PullMessageService.pullRequestQueue after 50 milliseconds; then return;
  6. Using the topic of PullRequest.messageQueue as the key, get the SubscriptionData object from RebalanceImpl.subscriptionInner: ConcurrentHashMap<String, SubscriptionData>. If it is null, call executePullRequestLater to retry later (a guard against concurrent changes); then return;
  7. If the message model is cluster mode (RebalanceImpl.messageModel equals CLUSTERING), call readOffset(MessageQueue mq, ReadOffsetType type) on DefaultMQPushConsumerImpl.offsetStore (initialized as a RemoteBrokerOffsetStore object), passing the MessageQueue of the PullRequest and type = READ_FROM_MEMORY, to read the consumption progress offset from local memory. If the offset is greater than 0, set the temporary variable commitOffsetEnable to true; otherwise set it to false. This offset is passed as the commitOffset parameter of pullKernelImpl; when the broker handles the pull, commitOffsetEnable decides whether it updates the consumption progress. The readOffset logic is: look up the MessageQueue in RemoteBrokerOffsetStore.offsetTable: ConcurrentHashMap<MessageQueue, AtomicLong>; if an offset is found, return it, otherwise return -1.
  8. If the subscription expression should be sent with every pull (DefaultMQPushConsumer.postSubscriptionWhenPull, default false) and classFilterMode of the SubscriptionData obtained from RebalanceImpl.subscriptionInner for the topic is false (the default), set the third bit of sysFlag to 1; otherwise set it to 0;
  9. Set the first bit of sysFlag to the value of commitOffsetEnable, the second bit (the suspend flag) to 1, and the fourth bit to the value of classFilterMode;
  10. Initialize the anonymous inner class PullCallback, which implements the onSuccess and onException methods; these are only called back for asynchronous requests;
  11. Call the underlying pull API:

PullAPIWrapper.pullKernelImpl

PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion, long offset, int maxNums, int sysFlag, long commitOffset, long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback) is called to pull messages.

The PullCallback object is passed into this method; when messages are pulled asynchronously, its methods are called back once the response is received.
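
Steps 8 and 9 pack four booleans into one integer. The sketch below shows one way such a flag can be built and read back. It is modeled on the PullSysFlag.buildSysFlag call that appears in the listing that follows, but the class PullSysFlagSketch, the constant names, and the helper hasFlag are assumptions made for illustration.

```java
/** Packs the four pull options into the low four bits of an int. */
class PullSysFlagSketch {
    static final int FLAG_COMMIT_OFFSET = 1;      // first bit
    static final int FLAG_SUSPEND = 1 << 1;       // second bit
    static final int FLAG_SUBSCRIPTION = 1 << 2;  // third bit
    static final int FLAG_CLASS_FILTER = 1 << 3;  // fourth bit

    static int buildSysFlag(boolean commitOffset, boolean suspend,
                            boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) {
            flag |= FLAG_COMMIT_OFFSET;
        }
        if (suspend) {
            flag |= FLAG_SUSPEND;
        }
        if (subscription) {
            flag |= FLAG_SUBSCRIPTION;
        }
        if (classFilter) {
            flag |= FLAG_CLASS_FILTER;
        }
        return flag;
    }

    /** Tests whether a given bit is set in the flag. */
    static boolean hasFlag(int sysFlag, int bit) {
        return (sysFlag & bit) == bit;
    }

    public static void main(String[] args) {
        // Typical cluster-mode pull: commit the offset and allow the broker to suspend the request.
        int sysFlag = buildSysFlag(true, true, false, false);
        System.out.println(Integer.toBinaryString(sysFlag));
        System.out.println(hasFlag(sysFlag, FLAG_SUSPEND));
    }
}
```

A single int keeps the request header compact, and new options can be added as further bits without changing its layout.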

public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }
        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
        try {
            this.makeSureStateOK();
        } catch (MQClientException e) {
            log.warn("pullMessage exception, consumer state not ok", e);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            return;
        }
        if (this.isPause()) {
            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
            return;
        }
        long cachedMessageCount = processQueue.getMsgCount().get();
        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
        if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        } else {
            if (processQueue.isLocked()) {
                if (!pullRequest.isLockedFirst()) {
                    final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                        pullRequest, offset, brokerBusy);
                    if (brokerBusy) {
                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                            pullRequest, offset);
                    }
                    pullRequest.setLockedFirst(true);
                    pullRequest.setNextOffset(offset);
                }
            } else {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                log.info("pull message later because not locked in broker, {}", pullRequest);
                return;
            }
        }

        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            log.warn("find the consumer's subscription failed, {}", pullRequest);
            return;
        }
        final long beginTimestamp = System.currentTimeMillis();
        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);
                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }

                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            }
                            break;
                        case NO_NEW_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}",
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);
                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }

            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }

                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }
        };

        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {
                commitOffsetEnable = true;
            }
        }
        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }

            classFilter = sd.isClassFilterMode();
        }

        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable, // commitOffset
            true, // suspend
            subExpression != null, // subscription
            classFilter // class filter
        );
        try {
            // How does a client pull a message
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                // The message communication mode is asynchronous
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
    }
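
The count and span checks at the top of pullMessage are easy to lose in the full listing, so here they are in isolation. This is a sketch under stated assumptions: FlowControlSketch is a hypothetical class, the two thresholds are the default values quoted in the text, message bodies are plain strings, and the size-based check is left out for brevity.

```java
import java.util.TreeMap;

/** Isolated model of the count and offset-span flow-control checks. */
class FlowControlSketch {
    static final int PULL_THRESHOLD_FOR_QUEUE = 1000;       // default quoted in the text
    static final int CONSUME_CONCURRENTLY_MAX_SPAN = 2000;  // default quoted in the text

    /** Distance between the smallest and largest cached queue offset; 0 for an empty cache. */
    static long maxSpan(TreeMap<Long, String> msgTreeMap) {
        return msgTreeMap.isEmpty() ? 0 : msgTreeMap.lastKey() - msgTreeMap.firstKey();
    }

    /** Returns true when the next pull should be postponed instead of sent now. */
    static boolean shouldDelayPull(TreeMap<Long, String> msgTreeMap, boolean consumeOrderly) {
        if (msgTreeMap.size() > PULL_THRESHOLD_FOR_QUEUE) {
            return true; // too many messages cached locally
        }
        // The span check only applies to concurrent (non-ordered) consumption.
        return !consumeOrderly && maxSpan(msgTreeMap) > CONSUME_CONCURRENTLY_MAX_SPAN;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> cache = new TreeMap<>();
        cache.put(100L, "slow message still being processed");
        cache.put(2500L, "newest pulled message");
        System.out.println(shouldDelayPull(cache, false)); // span 2400 exceeds 2000
        System.out.println(shouldDelayPull(cache, true));  // span is ignored for ordered consumption
    }
}
```

The span check matters because one slow message at a low offset holds back the committed progress; pausing pulls keeps the gap, and so the amount of re-delivery after a restart, bounded.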

Send a remote request pull message

In the MQClientAPIImpl.pullMessage method, the pull is either asynchronous or synchronous, depending on the value of the communicationMode parameter.

Whether the pull is asynchronous or synchronous, a ResponseFuture object is constructed before the pull request is sent and stored in NettyRemotingAbstract.responseTable: ConcurrentHashMap<Integer, ResponseFuture>, with the serial number of the request as the key. Entries in this table are handled in the following ways:

  1. If sending fails, the corresponding entry is removed from responseTable immediately;
  2. When a response arrives, the ResponseFuture object is looked up in responseTable by the serial number in the response (the server returns the serial number of the request) and its responseCommand variable is set. For a synchronous call, the thread waiting in ResponseFuture.waitResponse is woken up; for an asynchronous call, ResponseFuture.executeInvokeCallback() runs the callback logic;
  3. NettyRemotingClient.start() also sets up a scheduled task that scans responseTable every second, checks each ResponseFuture object for a response timeout and, for those that have timed out, calls ResponseFuture.executeInvokeCallback() and removes the object from responseTable;

public PullResult pullMessage(
        final String addr,
        final PullMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
        switch (communicationMode) {
            case ONEWAY:
                assert false;
                return null;
            case ASYNC:
                this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
                return null;
            case SYNC:
                return this.pullMessageSync(addr, request, timeoutMillis);
            default:
                assert false;
                break;
        }
        return null;
    }
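Returning to the three ways responseTable is handled, the bookkeeping can be sketched roughly as follows. This is a minimal illustration, not RocketMQ's code: PendingRequest and ResponseTableSketch are hypothetical names, a String stands in for the response command, and removing the entry when a response arrives is a choice made by this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for ResponseFuture (not the RocketMQ class).
final class PendingRequest {
    final int opaque;           // request serial number, used as the table key
    final long deadlineMillis;  // absolute time at which the wait expires
    volatile String response;   // stands in for responseCommand; null until set
    volatile boolean callbackRun;

    PendingRequest(int opaque, long deadlineMillis) {
        this.opaque = opaque;
        this.deadlineMillis = deadlineMillis;
    }
}

// Hypothetical table that mirrors the three cases described in the text.
final class ResponseTableSketch {
    private final Map<Integer, PendingRequest> responseTable = new ConcurrentHashMap<>();

    // Registered before the request is sent, keyed by the request serial number.
    PendingRequest register(int opaque, long nowMillis, long timeoutMillis) {
        PendingRequest pending = new PendingRequest(opaque, nowMillis + timeoutMillis);
        responseTable.put(opaque, pending);
        return pending;
    }

    // Case 1: sending failed, so the record is removed directly.
    void onSendFailed(int opaque) {
        responseTable.remove(opaque);
    }

    // Case 2: a response arrived; look the entry up by the echoed serial number.
    boolean onResponse(int opaque, String response) {
        PendingRequest pending = responseTable.remove(opaque); // removal is this sketch's choice
        if (pending == null) {
            return false; // already expired or never registered
        }
        pending.response = response;
        pending.callbackRun = true;
        return true;
    }

    // Case 3: periodic scan; expire every entry whose deadline has passed.
    int scanTimeouts(long nowMillis) {
        int expired = 0;
        for (Map.Entry<Integer, PendingRequest> e : responseTable.entrySet()) {
            if (e.getValue().deadlineMillis <= nowMillis
                    && responseTable.remove(e.getKey(), e.getValue())) {
                e.getValue().callbackRun = true; // callback runs with a null response
                expired++;
            }
        }
        return expired;
    }

    int size() {
        return responseTable.size();
    }
}
```

Passing the current time in as a parameter keeps the sketch deterministic; a real client would read the clock inside a scheduled task instead.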

Synchronous pull

For the synchronous mode, MQClientAPIImpl.pullMessageSync(String addr, RemotingCommand request, long timeoutMillis) is called. The general steps are as follows:

  1. Call RemotingClient.invokeSync(String addr, RemotingCommand request, long timeoutMillis):
    • Get the Channel for the broker address. The ChannelWrapper object is looked up by broker address in RemotingClient.channelTables: ConcurrentHashMap<String, ChannelWrapper>, and its Channel is returned. If there is no ChannelWrapper object, a new connection to the broker address is established and stored in the channelTables variable for future use.
    • If the NettyRemotingClient.rpcHook: RPCHook variable is not null (it is set when the application layer initializes the DefaultMQPushConsumer or DefaultMQPullConsumer object), call rpcHook.doBeforeRequest(String remoteAddr, RemotingCommand request);
    • Call NettyRemotingAbstract.invokeSyncImpl(Channel channel, RemotingCommand request, long timeoutMillis), whose logic is as follows:
      • A) Initialize the ResponseFuture object with the request serial number (opaque) and the timeout, and put it into the NettyRemotingAbstract.responseTable: ConcurrentHashMap<Integer, ResponseFuture> variable;
      • B) Call channel.writeAndFlush(Object msg) to send the RemotingCommand to the broker, then call addListener(GenericFutureListener<? extends Future<? super Void>> listener) to add an anonymous inner class that implements the operationComplete method of the ChannelFutureListener interface. Once the send completes, the listener's operationComplete method is called back. It calls ChannelFuture.isSuccess() to check whether the send succeeded. If so, it sets sendRequestOK of the ResponseFuture object to true and exits the callback to await the response. If not, it sets sendRequestOK to false, deletes the record for this request serial number (opaque) from NettyRemotingAbstract.responseTable, sets responseCommand of the ResponseFuture object to null, and wakes up the thread waiting in ResponseFuture.waitResponse(long timeoutMillis);
      • C) Call ResponseFuture.waitResponse(long timeoutMillis) to wait for the response. A send failure, a received response message (see section 5.10.3), or a timeout wakes the method, which returns the value of ResponseFuture.responseCommand;
      • D) If the responseCommand value returned by the previous step is null, throw an exception: RemotingTimeoutException if ResponseFuture.sendRequestOK is true, otherwise RemotingSendRequestException;
      • E) If the responseCommand value returned by the previous step is not null, return it;
    • If the NettyRemotingClient.rpcHook: RPCHook variable is not null, call rpcHook.doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response);
  2. Pass the RemotingCommand object returned by the previous step to MQClientAPIImpl.processPullResponse(RemotingCommand response), which parses it, wraps the result in a PullResultExt object, and returns it to the caller. The response code maps to the pull status as follows:
    • If the RemotingCommand object code is SUCCESS, PullResultExt.pullStatus = FOUND;
    • If the RemotingCommand object code is PULL_NOT_FOUND, PullResultExt.pullStatus = NO_NEW_MSG;
    • If the RemotingCommand object code is PULL_RETRY_IMMEDIATELY, PullResultExt.pullStatus = NO_MATCHED_MSG;
    • If the RemotingCommand object code is PULL_OFFSET_MOVED, PullResultExt.pullStatus = OFFSET_ILLEGAL;
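The mapping from response code to pull status in the last step can be pictured as a plain switch. The sketch below uses hypothetical enums (BrokerCode, PullStatusSketch) rather than RocketMQ's own ResponseCode constants and PullStatus type. The SUCCESS and PULL_RETRY_IMMEDIATELY cases reflect my reading of the client source and should be treated as assumptions, as should the exception thrown for any other code.

```java
// Hypothetical stand-ins for the broker response codes; the numeric values of the
// real constants are deliberately not reproduced here.
enum BrokerCode { SUCCESS, PULL_NOT_FOUND, PULL_RETRY_IMMEDIATELY, PULL_OFFSET_MOVED, OTHER }

// Hypothetical stand-in for the pull status carried by PullResultExt.
enum PullStatusSketch { FOUND, NO_NEW_MSG, NO_MATCHED_MSG, OFFSET_ILLEGAL }

final class PullResponseMapper {
    private PullResponseMapper() {
    }

    // Translates a broker response code into the status the consumer acts on.
    static PullStatusSketch toPullStatus(BrokerCode code) {
        switch (code) {
            case SUCCESS:
                return PullStatusSketch.FOUND;
            case PULL_NOT_FOUND:
                return PullStatusSketch.NO_NEW_MSG;
            case PULL_RETRY_IMMEDIATELY:
                return PullStatusSketch.NO_MATCHED_MSG;
            case PULL_OFFSET_MOVED:
                return PullStatusSketch.OFFSET_ILLEGAL;
            default:
                // Assumption: any other code is treated as an error in this sketch.
                throw new IllegalStateException("unexpected broker response code: " + code);
        }
    }
}
```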
@Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        long beginStartTime = System.currentTimeMillis();
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call timeout");
                }
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                if (this.rpcHook != null) {
                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                }
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
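The wait-and-wake logic that invokeSyncImpl relies on (steps C to E in the list above) can be sketched with a CountDownLatch. This is a simplified illustration under stated assumptions: SyncFutureSketch is a hypothetical stand-in for ResponseFuture, a String stands in for RemotingCommand, and the two unchecked exceptions are placeholders for RemotingTimeoutException and RemotingSendRequestException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Placeholder for RemotingTimeoutException (hypothetical, unchecked for brevity).
final class TimeoutSketchException extends RuntimeException {
    TimeoutSketchException(String message) {
        super(message);
    }
}

// Placeholder for RemotingSendRequestException (hypothetical, unchecked for brevity).
final class SendFailedSketchException extends RuntimeException {
    SendFailedSketchException(String message) {
        super(message);
    }
}

// Hypothetical, simplified stand-in for ResponseFuture in the synchronous path.
final class SyncFutureSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean sendRequestOK = true;
    private volatile String responseCommand; // null until a response arrives

    // Called by the send listener when writing to the channel failed.
    void failSend() {
        sendRequestOK = false;
        responseCommand = null;
        latch.countDown(); // wake the waiter early instead of letting it time out
    }

    // Called when the response for this request arrives.
    void complete(String response) {
        responseCommand = response;
        latch.countDown();
    }

    // Blocks until a response, a send failure, or the timeout, whichever comes first.
    String waitResponse(long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (responseCommand != null) {
            return responseCommand;
        }
        // Null result: distinguish "sent but no answer in time" from "never sent".
        if (sendRequestOK) {
            throw new TimeoutSketchException("wait response timeout");
        }
        throw new SendFailedSketchException("send request failed");
    }
}
```

Keeping sendRequestOK separate from the response is what lets the caller tell the two failure modes apart, which matters because invokeSync always closes the channel on a send failure but only closes it on a timeout when configured to.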

The request issued by getMQClientAPIImpl().pullMessage is ultimately written and flushed through the channel. After receiving a pull request, if there is no new message in the queue, the server does not return right away. Instead it loops, checking the state: each round it calls waitForRunning for a period of time (5 seconds by default) and then checks again. If the broker still has no new message once the request has been held longer than the suspend time carried in the request (BROKER_SUSPEND_MAX_TIME_MILLIS on the client side), it returns an empty result. If a new message is received during the wait, the notifyMessageArriving function is called to return the request result right away. The core of “long polling” is that the broker holds a request from a client for a short period of time; when a new message arrives within that period, it immediately returns the message to the consumer over the existing connection. With long polling the initiative stays with the consumer: even if the broker has a large backlog of messages, it does not actively push them to the consumer.
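The hold-and-check behaviour of long polling described above can be sketched as follows. This is a minimal single-queue illustration, not RocketMQ's broker code: LongPollSketch and its method names are hypothetical, and a BlockingQueue's timed poll stands in for the combination of waitForRunning and notifyMessageArriving.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical broker-side hold loop for a single queue.
final class LongPollSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // A newly stored message wakes any pull request currently being held.
    void messageArrived(String msg) {
        queue.offer(msg);
    }

    // Holds the pull for up to suspendMillis, re-checking every checkIntervalMillis.
    // Returns a message as soon as one is available, or null (an empty result)
    // once the hold time is used up.
    String pull(long suspendMillis, long checkIntervalMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(suspendMillis);
        long intervalNanos = TimeUnit.MILLISECONDS.toNanos(checkIntervalMillis);
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return queue.poll(); // final check, then give up with an empty result
                }
                // Wait at most one check interval; returns early if a message arrives.
                String msg = queue.poll(Math.min(remaining, intervalNanos), TimeUnit.NANOSECONDS);
                if (msg != null) {
                    return msg;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

In this sketch the consumer decides when to call pull and how long it is willing to be held, which matches the point that the initiative stays with the consumer even though messages arrive with little delay.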