This is the 23rd day of my participation in the November Writing Challenge. For event details, see: The Last Writing Challenge of 2021

Background

In RocketMQ, there are generally two ways of retrieving messages: pull and push. Push was covered in the previous chapter; this chapter describes the pull consumption mechanism.

DefaultMQPullConsumer

The biggest difference between DefaultMQPullConsumer and DefaultMQPushConsumer is that with the pull consumer the program itself controls which queue to consume from, which offset to start consuming at, and when to commit the offset. Here is an inside look at DefaultMQPullConsumer.

Overall process execution

DefaultMQPullConsumer example

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

public class MQPullConsumer {
    // Next offset to pull for each queue, kept in local memory only
    private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");
        consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
        consumer.start();
        // Fetch all message queues of the specified topic
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
        for (MessageQueue mq : mqs) {
            try {
                // Get the consumption offset of the queue from the store
                // (not used below; this example tracks offsets in OFFSET_TABLE)
                long offset = consumer.fetchConsumeOffset(mq, true);
                while (true) {
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                            for (MessageExt m : messageExtList) {
                                System.out.println(new String(m.getBody()));
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break;
                        case OFFSET_ILLEGAL:
                            break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        consumer.shutdown();
    }

    // Save the offset at which the next pull should start
    private static void putMessageQueueOffset(MessageQueue mq, long nextBeginOffset) {
        OFFSET_TABLE.put(mq, nextBeginOffset);
    }

    // Get the offset at which the next pull should start (0 if nothing has been consumed yet)
    private static Long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSET_TABLE.get(mq);
        if (offset != null) {
            return offset;
        }
        return 0L;
    }
}

  • Start the consumer: consumer.start();
  • Get all message queues under a topic; the queue information is fetched from the NameServer by topic:
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicTest");
// Iterate over the queues
for (MessageQueue mq : mqs) {
    // Get the current consumption offset of the queue. The second parameter indicates whether the offset
    // is fetched from local memory or from the broker; true means it is fetched from the broker
    long offset = consumer.fetchConsumeOffset(mq, true);
    while (true) {
        // The second parameter indicates which tags to consume (null means no tag filter)
        // The third parameter is the offset from which messages are consumed
        // The fourth parameter is the maximum number of messages to pull at a time
        PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
    }
}
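The example above leaves the NO_MATCHED_MSG, NO_NEW_MSG and OFFSET_ILLEGAL branches empty. As a rough, library-free sketch of how a caller might react to each status, the following simulates one queue in memory. PullLoopSketch, Status and Result are hypothetical stand-ins, not RocketMQ classes, and the rule that an illegal offset is corrected to 0 is an assumption made only for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory simulation of one queue; none of these types are RocketMQ classes.
final class PullLoopSketch {
    enum Status { FOUND, NO_NEW_MSG, OFFSET_ILLEGAL }

    // Result of one simulated pull: status, messages, and the offset to use for the next pull.
    static final class Result {
        final Status status;
        final List<String> messages;
        final long nextBeginOffset;

        Result(Status status, List<String> messages, long nextBeginOffset) {
            this.status = status;
            this.messages = messages;
            this.nextBeginOffset = nextBeginOffset;
        }
    }

    // Simulated pull: returns at most maxNums messages starting at offset.
    static Result pull(List<String> queue, long offset, int maxNums) {
        if (offset < 0 || offset > queue.size()) {
            // Assumption for this sketch: an out-of-range offset is corrected to the queue start.
            return new Result(Status.OFFSET_ILLEGAL, Collections.<String>emptyList(), 0);
        }
        if (offset == queue.size()) {
            return new Result(Status.NO_NEW_MSG, Collections.<String>emptyList(), offset);
        }
        int end = (int) Math.min(queue.size(), offset + maxNums);
        return new Result(Status.FOUND, new ArrayList<>(queue.subList((int) offset, end)), end);
    }

    // Consumes from startOffset until the simulated queue has no new messages.
    static List<String> drain(List<String> queue, long startOffset, int maxNums) {
        List<String> consumed = new ArrayList<>();
        long offset = startOffset;
        while (true) {
            Result r = pull(queue, offset, maxNums);
            switch (r.status) {
                case FOUND:
                    consumed.addAll(r.messages);
                    offset = r.nextBeginOffset; // advance only after the batch was handled
                    break;
                case OFFSET_ILLEGAL:
                    offset = r.nextBeginOffset; // retry from the corrected offset
                    break;
                case NO_NEW_MSG:
                    return consumed;            // a long-running consumer would wait and retry instead
            }
        }
    }

    public static void main(String[] args) {
        List<String> queue = Arrays.asList("m0", "m1", "m2", "m3", "m4");
        System.out.println(drain(queue, 2, 2));
    }
}
```

The point of the sketch is the ordering: the stored offset moves forward only after a FOUND batch has been processed, so a failure in the middle of a batch does not skip messages.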

DefaultMQPullConsumer’s overall process

DefaultMQPullConsumer is started by calling the start() method.

DefaultMQPullConsumer pull source code analysis

Analyze the process by which DefaultMQPullConsumer pulls messages

consumer.fetchSubscribeMessageQueues("order-topic")

Pull all message queues from the specified topic

Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");

Core source code analysis

fetchSubscribeMessageQueues()
  • Calling fetchSubscribeMessageQueues() obtains the read queue information of the specified topic. The consumer sends a GetRouteInfoRequest request with the code GET_ROUTEINTO_BY_TOPIC to the NameServer, and the NameServer returns the number of read queues under the topic. The consumer then creates the same number of MessageQueue objects using the code below.

  • Each MessageQueue object contains the topic, the broker name, and the read queue number. Finally, fetchSubscribeMessageQueues() returns the set of MessageQueue objects to the caller.

  • The request to the NameServer returns the broker information and the topic configuration for the topic parameter, that is, a TopicRouteData object.

    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
        try {
            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
            if (topicRouteData != null) {
                // Convert the TopicRouteData into a set of MessageQueue objects
                Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                if (!mqList.isEmpty()) {
                    return mqList;
                } else {
                    throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
                }
            }
        } catch (Exception e) {
            throw new MQClientException(
                "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
                e);
        }
        throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
    }

Traversing TopicRouteData

The method iterates over the QueueData list of the TopicRouteData object. For each QueueData entry that is readable, it takes the readQueueNums value, creates readQueueNums MessageQueue objects, and adds them to a MessageQueue set. Finally, it returns the MessageQueue set.

    public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
        Set<MessageQueue> mqList = new HashSet<MessageQueue>();
        List<QueueData> qds = route.getQueueDatas();
        for (QueueData qd : qds) {
            if (PermName.isReadable(qd.getPerm())) {
                for (int i = 0; i < qd.getReadQueueNums(); i++) {
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    mqList.add(mq);
                }
            }
        }
        return mqList;
    }

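To make the expansion concrete, here is a small stand-alone sketch that mirrors the loop with simplified types. RouteExpansionSketch and QueueInfo are hypothetical, and the PERM_READ bit value is an assumption chosen for the sketch; the real check is whatever PermName.isReadable does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified mirror of the route-to-queue conversion; not RocketMQ types.
final class RouteExpansionSketch {
    // Assumed permission bits for this sketch only.
    static final int PERM_WRITE = 1 << 1;
    static final int PERM_READ = 1 << 2;

    // Stand-in for one QueueData entry: a broker, its read queue count, and its permission bits.
    static final class QueueInfo {
        final String brokerName;
        final int readQueueNums;
        final int perm;

        QueueInfo(String brokerName, int readQueueNums, int perm) {
            this.brokerName = brokerName;
            this.readQueueNums = readQueueNums;
            this.perm = perm;
        }
    }

    // Produces one "topic@broker#queueId" entry per read queue of every readable broker entry.
    static List<String> expand(String topic, List<QueueInfo> infos) {
        List<String> queues = new ArrayList<>();
        for (QueueInfo info : infos) {
            if ((info.perm & PERM_READ) == 0) {
                continue; // entries without read permission contribute no queues
            }
            for (int i = 0; i < info.readQueueNums; i++) {
                queues.add(topic + "@" + info.brokerName + "#" + i);
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        List<QueueInfo> infos = Arrays.asList(
            new QueueInfo("broker-a", 2, PERM_READ | PERM_WRITE),
            new QueueInfo("broker-b", 3, PERM_WRITE));
        System.out.println(expand("order-topic", infos));
    }
}
```

In this sketch broker-b is write-only, so only the two read queues of broker-a end up in the result.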
consumer.fetchConsumeOffset

This method obtains the consumption offset of a MessageQueue. That offset is the position from which messages are subsequently pulled, with maxNums=32 as the maximum number of messages fetched per pull.

DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)

fetchConsumeOffset() takes two arguments. The first is the queue. The second indicates whether the queue's consumption offset is fetched from the broker: true fetches it from the broker, while false reads the local record first and falls back to the broker if no local record exists. "Local" here means the RemoteBrokerOffsetStore.offsetTable attribute, which records the consumption offset of each queue. offsetTable is updated whenever an offset is retrieved from the broker.
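The two read modes can be sketched without the client library. In the following, OffsetReadSketch is a hypothetical class, queues are identified by plain strings, and the broker is replaced by a caller-supplied lookup function; it only illustrates the "memory first, then store" behaviour described above and is not the RemoteBrokerOffsetStore implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical sketch of the two offset read modes; the broker is simulated by a function.
final class OffsetReadSketch {
    // Local record of the consumption offset per queue (plays the role of offsetTable).
    private final Map<String, Long> offsetTable = new HashMap<>();
    // Stand-in for the remote call that asks the broker for a queue's offset.
    private final ToLongFunction<String> brokerLookup;

    OffsetReadSketch(ToLongFunction<String> brokerLookup) {
        this.brokerLookup = brokerLookup;
    }

    long fetchConsumeOffset(String queue, boolean fromStore) {
        if (!fromStore) {
            Long cached = offsetTable.get(queue);
            if (cached != null) {
                return cached; // local record exists, no broker round trip needed
            }
        }
        long remote = brokerLookup.applyAsLong(queue);
        offsetTable.put(queue, remote); // refresh the local record after every broker read
        return remote;
    }

    public static void main(String[] args) {
        OffsetReadSketch store = new OffsetReadSketch(queue -> 42L);
        System.out.println(store.fetchConsumeOffset("broker-a#0", false));
        System.out.println(store.fetchConsumeOffset("broker-a#0", true));
    }
}
```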

pullBlockIfNotFound: pulling messages

RocketMQ provides several pull methods; you can use either pullBlockIfNotFound() or pull(). By default, pullBlockIfNotFound waits up to 30 seconds before returning an empty result, and pull waits up to 10 seconds.

The pull method has an overload that accepts a timeout argument, whereas the timeout of pullBlockIfNotFound can only be changed through a DefaultMQPullConsumer parameter (consumerTimeoutMillisWhenSuspend, which is the value pullSyncImpl reads when block is true). The underlying logic of the two methods is the same: both call DefaultMQPullConsumerImpl.pullSyncImpl() to fetch messages. Let's look at the pullSyncImpl() method.

    public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
    }
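How the block flag decides the client-side timeout can be shown in isolation. This is a minimal sketch: PullTimeoutSketch and its constants are hypothetical, with the 30-second and 10-second values taken from the defaults quoted in the text.

```java
// Hypothetical sketch of the timeout selection driven by the block flag.
final class PullTimeoutSketch {
    // Defaults quoted in the text: 30 s when blocking, 10 s for a plain pull.
    static final long TIMEOUT_WHEN_SUSPEND_MILLIS = 30_000L;
    static final long PULL_TIMEOUT_MILLIS = 10_000L;

    // When block is true the caller-supplied timeout is ignored in favour of the suspend timeout.
    static long effectiveTimeout(boolean block, long callerTimeoutMillis) {
        return block ? TIMEOUT_WHEN_SUSPEND_MILLIS : callerTimeoutMillis;
    }

    public static void main(String[] args) {
        System.out.println(effectiveTimeout(true, PULL_TIMEOUT_MILLIS));
        System.out.println(effectiveTimeout(false, 5_000L));
    }
}
```

This is why passing a different timeout into a blocking pull has no effect in this model: only the non-blocking path honours the caller's value.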

The offset parameter is set from the consumption progress of the MessageQueue. The method ultimately calls pullSyncImpl to obtain the result data.

  • Parameter 1: the message queue (calling consumer.fetchSubscribeMessageQueues(topic) returns the message queues of the corresponding topic);

  • Parameter 2: the filter expression;

  • Parameter 3: the offset, that is, the consumption progress of the queue;

  • Parameter 4: the maximum number of messages to fetch at a time.

DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)

DefaultMQPullConsumerImpl.pullSyncImpl implementation process

      private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
        long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.isRunning();
        // Validate the input arguments
        if (null == mq) {
            throw new MQClientException("mq is null", null);
        }

        if (offset < 0) {
            throw new MQClientException("offset < 0", null);
        }

        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", null);
        }
        // Update the subscription data of the rebalance service; the rebalance service is not active for a pull consumer, so this update has no effect
        this.subscriptionAutomatically(mq.getTopic());

        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
        // Choose the timeout. The block argument is true when pullBlockIfNotFound is the caller, and false otherwise
        long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;

        boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
        // Call PullAPIWrapper to pull messages from the broker;
        // the PullMessageRequest request object is built inside the pullKernelImpl method
        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
            mq, // the queue
            subscriptionData.getSubString(), // message filtering rule
            subscriptionData.getExpressionType(),
            isTagType ? 0L : subscriptionData.getSubVersion(),
            offset, // offset to pull from
            maxNums, // suggested maximum number of messages for the broker to return at one time; the default is 32
            sysFlag,
            0, // commit offset; it is always 0 here, so the broker cannot record a valid offset and the program must record and commit the offset itself
            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
            timeoutMillis, // timeout
            CommunicationMode.SYNC,
            null // the callback is null for a synchronous pull
        );
        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
        //If namespace is not null , reset Topic without namespace.
        this.resetTopic(pullResult.getMsgFoundList());
        if (!this.consumeMessageHookList.isEmpty()) {
            ConsumeMessageContext consumeMessageContext = null;
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace());
            consumeMessageContext.setConsumerGroup(this.groupName());
            consumeMessageContext.setMq(mq);
            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
            consumeMessageContext.setSuccess(false);
            this.executeHookBefore(consumeMessageContext);
            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
            consumeMessageContext.setSuccess(true);
            this.executeHookAfter(consumeMessageContext);
        }
        return pullResult;
    }



subscriptionAutomatically checks whether the topic of the MessageQueue object is already present in the RebalanceImpl.subscriptionInner: ConcurrentHashMap<String, SubscriptionData> variable. If it is not, the method calls FilterAPI.buildSubscriptionData(String consumerGroup, String topic, String subExpression) with the consumerGroup, the topic, and subExpression = "*" to construct a SubscriptionData object, and saves it in RebalanceImpl.subscriptionInner. The main steps of the method body, annotated:

        this.subscriptionAutomatically(mq.getTopic());
        // Build the flag bits; the flags are combined with bitwise OR (|=)
        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

        SubscriptionData subscriptionData;
        try {
            // Build a SubscriptionData object from the consumerGroup, the topic, and the subExpression request parameter
            subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), mq.getTopic(), subExpression);
        } catch (Exception e) {
            throw new MQClientException("parse subscription error", e);
        }
        long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
        // Pull messages from the broker
        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
            mq,
            subscriptionData.getSubString(),
            0L,
            offset,
            maxNums,
            sysFlag,
            0,
            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
            timeoutMillis,
            CommunicationMode.SYNC,
            null
        );
        // Decode and filter the pulled messages, and put the parsed message list into msgFoundList
        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
        if (!this.consumeMessageHookList.isEmpty()) {
            ConsumeMessageContext consumeMessageContext = null;
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setConsumerGroup(this.groupName());
            consumeMessageContext.setMq(mq);
            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
            consumeMessageContext.setSuccess(false);
            this.executeHookBefore(consumeMessageContext);
            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
            consumeMessageContext.setSuccess(true);
            this.executeHookAfter(consumeMessageContext);
        }
        return pullResult;
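The flag construction mentioned above ("logical or operation |=") can be sketched as follows. SysFlagSketch is a hypothetical class; the parameter names and the bit positions are assumptions made for illustration and are not taken from the article, so treat the numeric values as examples only.

```java
// Hypothetical sketch of packing four booleans into one int with bitwise OR.
final class SysFlagSketch {
    // Assumed bit positions for this sketch.
    static final int FLAG_COMMIT_OFFSET = 1;
    static final int FLAG_SUSPEND = 1 << 1;
    static final int FLAG_SUBSCRIPTION = 1 << 2;
    static final int FLAG_CLASS_FILTER = 1 << 3;

    // Sets one bit per true argument, mirroring the call shape buildSysFlag(false, block, true, false).
    static int buildSysFlag(boolean commitOffset, boolean suspend, boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) {
            flag |= FLAG_COMMIT_OFFSET;
        }
        if (suspend) {
            flag |= FLAG_SUSPEND;
        }
        if (subscription) {
            flag |= FLAG_SUBSCRIPTION;
        }
        if (classFilter) {
            flag |= FLAG_CLASS_FILTER;
        }
        return flag;
    }

    // Tests whether a given bit is set in the packed flag.
    static boolean hasFlag(int sysFlag, int bit) {
        return (sysFlag & bit) == bit;
    }

    public static void main(String[] args) {
        int blocking = buildSysFlag(false, true, true, false);
        System.out.println(Integer.toBinaryString(blocking));
        System.out.println(hasFlag(blocking, FLAG_COMMIT_OFFSET));
    }
}
```

Under these assumed bit positions, a blocking pull sets the suspend and subscription bits while the commit-offset bit stays clear, which matches the first argument being false in the call.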

Compare Push and Pull operations

  • Push – Advantages: good timeliness; processing is handled uniformly on the server side, which is convenient to implement
  • Push – Disadvantages: prone to message accumulation; load is not controllable
  • Pull – Advantages: message status is easy to obtain; load is controllable
  • Pull – Disadvantages: poor timeliness

When DefaultMQPullConsumer pulls messages, the commit offset sent to the broker is always zero, so the broker cannot record a valid offset, and the program must record and commit the offset itself.
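Because the program owns the offset, it usually needs somewhere durable to keep it, otherwise a restart begins again from 0 as in the in-memory example at the top. The following is a minimal sketch of one possible approach, a local checkpoint file written with java.util.Properties. OffsetCheckpointSketch, the string queue keys, and the file layout are all hypothetical choices for this sketch, not something the RocketMQ client prescribes.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch: persisting per-queue offsets to a local properties file.
final class OffsetCheckpointSketch {
    // Writes one "queueKey=offset" entry per queue.
    static void save(Path file, Map<String, Long> offsets) throws IOException {
        Properties props = new Properties();
        for (Map.Entry<String, Long> entry : offsets.entrySet()) {
            props.setProperty(entry.getKey(), Long.toString(entry.getValue()));
        }
        try (Writer writer = Files.newBufferedWriter(file)) {
            props.store(writer, "consume offsets");
        }
    }

    // Reads the checkpoint back; a missing file yields an empty map (start from scratch).
    static Map<String, Long> load(Path file) throws IOException {
        Map<String, Long> offsets = new HashMap<>();
        if (!Files.exists(file)) {
            return offsets;
        }
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(file)) {
            props.load(reader);
        }
        for (String key : props.stringPropertyNames()) {
            offsets.put(key, Long.parseLong(props.getProperty(key)));
        }
        return offsets;
    }

    // Saves to a temporary file and loads it again, to show that the offsets survive a round trip.
    static Map<String, Long> roundTrip(Map<String, Long> offsets) {
        try {
            Path file = Files.createTempFile("offsets", ".properties");
            try {
                save(file, offsets);
                return load(file);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("broker-a-0", 128L);
        offsets.put("broker-a-1", 64L);
        System.out.println(roundTrip(offsets).equals(offsets));
    }
}
```

A local file only suits a single consumer instance; when several instances share a group, the offsets would need to live somewhere all of them can reach.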
