Self-praise: Dubbo Implementation Principles and Source Code Parsing — The Best Collection



Praise yourself: D Database Entity Design Collection

Abstract: the original source www.iocoder.cn/RocketMQ/me… “Taro source” welcome to reprint, keep the summary, thank you!

This article is based on RocketMQ 4.0.x official release

  • 1, an overview of the
  • 2, Consumer
  • 3. PushConsumer
  • PushConsumer subscribe
    • DefaultMQPushConsumerImpl# subscribe (…).
      • FilterAPI. BuildSubscriptionData (…).
    • DefaultMQPushConsumer# registerMessageListener (…).
  • PushConsumer message queue allocation
    • RebalanceService
    • MQClientInstance# doRebalance (…).
    • DefaultMQPushConsumerImpl# doRebalance (…).
    • RebalanceImpl# doRebalance (…).
      • RebalanceImpl# rebalanceByTopic (…).
      • RebalanceImpl# removeUnnecessaryMessageQueue (…).
        • RebalancePushImpl# removeUnnecessaryMessageQueue (…).
        • [PullConsumer] RebalancePullImpl# removeUnnecessaryMessageQueue (…).
      • RebalancePushImpl# dispatchPullRequest (…).
        • DefaultMQPushConsumerImpl# executePullRequestImmediately (…).
      • AllocateMessageQueueStrategy
        • AllocateMessageQueueAveragely
        • AllocateMessageQueueByMachineRoom
        • AllocateMessageQueueAveragelyByCircle
        • AllocateMessageQueueByConfig
  • PushConsumer Reads the consumption progress
    • RebalancePushImpl# computePullFromWhere (…).
    • [PullConsumer] RebalancePullImpl# computePullFromWhere (…).
  • PushConsumer pulls the message
    • PullMessageService
    • DefaultMQPushConsumerImpl# pullMessage (…).
      • PullAPIWrapper# pullKernelImpl (…).
        • PullAPIWrapper# recalculatePullFromWhichNode (…).
        • MQClientInstance# findBrokerAddressInSubscribe (…).
      • PullAPIWrapper# processPullResult (…).
      • ProcessQueue# putMessage (…).
    • conclusion
  • PushConsumer message
    • ConsumeMessageConcurrentlyService submitted a consumer request
      • ConsumeMessageConcurrentlyService# submitConsumeRequest (…).
      • ConsumeMessageConcurrentlyService#submitConsumeRequestLater
    • ConsumeRequest
    • ConsumeMessageConcurrentlyService# processConsumeResult (…).
      • ProcessQueue# removeMessage (…).
    • ConsumeMessageConcurrentlyService# cleanExpireMsg (…).
      • ProcessQueue# cleanExpiredMsg (…).
  • PushConsumer returns a failure message
    • DefaultMQPushConsumerImpl# sendMessageBack (…).
      • MQClientAPIImpl# consumerSendMessageBack (…).
  • 8, Consumer progress
    • OffsetStore
      • OffsetStore# load (…).
        • LocalFileOffsetStore# load (…).
          • OffsetSerializeWrapper
        • RemoteBrokerOffsetStore# load (…).
      • OffsetStore# readOffset (…).
        • LocalFileOffsetStore# readOffset (…).
        • RemoteBrokerOffsetStore# readOffset (…).
      • OffsetStore# updateOffset (…).
      • OffsetStore# persistAll (…).
        • LocalFileOffsetStore# persistAll (…).
        • RemoteBrokerOffsetStore# persistAll (…).
        • MQClientInstance# persistAllConsumerOffset (…).
  • 9, the end

🙂🙂🙂 follow wechat public number:

  1. RocketMQ/MyCAT/Sharding-JDBC all source code analysis article list
  2. RocketMQ/MyCAT/Sharding-JDBC 中文 解 决 source GitHub address
  3. Any questions you may have about the source code will be answered carefully. Even do not know how to read the source can also ask oh.
  4. New source code parsing articles are notified in real time. It’s updated about once a week.
  5. Serious source communication wechat group.

1, an overview of the

RocketMQ source code Analysis — Message pull and consumption (part 1)

The main analysis of Consumer in the consumption logic involved in the source code.

2, Consumer

MQ provides two types of consumers:

  • PushConsumer:
    • Used in most scenarios.
    • Despite the namePushAt the beginning of the actual implementation, usePullMethod is implemented. throughPull On and on and onpollingBrokerGet the message. When there are no new messages,BrokerPending requestUntil a new message is generated, the suspension is cancelled and a new message is returned. In this case, basicallyBrokerTake the initiative toPushdoClose to theOf course, there is a corresponding loss of real time. Principle of similarLong polling (Long-Polling ).
  • PullConsumer

The PullConsumer will skip the sequential consumption. The PullConsumer will skip the sequential consumption. The PullConsumer will skip the sequential consumption.

3. PushConsumer

Take a look at the components that PushConsumer contains and how they interact:

  • RebalanceService: balance message queue service, responsible for allocating currentConsumerConsumable message queues (MessageQueue). When there is a newConsumerIs added to or removed from the message queue.
  • PullMessageService: pull message service,On and on and onfromBrokerPull the message and submit the consuming task toConsumeMessageService.
  • ConsumeMessageService: Consuming messaging services,On and on and onConsume messages and process the consumption results.
  • RemoteBrokerOffsetStore:ConsumerConsumption schedule management, responsible for fromBrokerGet consumption progress, synchronous consumption progress toBroker.
  • ProcessQueue: message processing queue.
  • MQClientInstance: encapsulation forNamesrv.BrokerThe API call provided toProducer,ConsumerUse.

PushConsumer subscribe

DefaultMQPushConsumerImpl# subscribe (…).

1: public void subscribe(String topic, String subExpression) throws MQClientException {2: try {3: // Create the subscription data 4: SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // 5: topic, subExpression); 6: this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); 7: // Synchronize Consumer information to Broker via heartbeat 8: if (this.mqclientFactory! = null) { 9: this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); 10: } 11: } catch (Exception e) { 12: throw new MQClientException("subscription exception", e); 13:14:}}Copy the code
  • Description: SubscribeTopic
  • Lines 3 through 6: Create subscription data. Parsing the see: FilterAPI. BuildSubscriptionData (…). .
  • Lines 7 to 10: Sync via heartbeatConsumerInformation to theBroker.

FilterAPI. BuildSubscriptionData (…).

1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, 2: String subString) throws Exception { 3: SubscriptionData subscriptionData = new SubscriptionData(); 4: subscriptionData.setTopic(topic); 5: subscriptionData.setSubString(subString); 6: / / handle the subscribe expression 7: if (null = = the subString | | the subString. Equals (SubscriptionData. SUB_ALL) | | the subString. The length () = = 0) {8: subscriptionData.setSubString(SubscriptionData.SUB_ALL); 9: } else { 10: String[] tags = subString.split("\\|\\|"); 11: if (tags.length > 0) { 12: for (String tag : tags) { 13: if (tag.length() > 0) { 14: String trimString = tag.trim(); 15: if (trimString.length() > 0) { 16: subscriptionData.getTagsSet().add(trimString); 17: subscriptionData.getCodeSet().add(trimString.hashCode()); 18: } 19: } 20: } 21: } else { 22: throw new Exception("subString split error"); 23: } 24: } 25: 26: return subscriptionData; 27:}Copy the code
  • Description: according toTopicAnd subscription expression to create subscription data
  • SubscriptionData. SubVersion = System. CurrentTimeMillis ().

DefaultMQPushConsumer# registerMessageListener (…).

1: public void registerMessageListener(MessageListenerConcurrently messageListener) { 2: this.messageListener = messageListener; 3: this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); 4:}Copy the code
  • Note: Register message listeners.

PushConsumer message queue allocation

RebalanceService

1: public class RebalanceService extends ServiceThread {2: 3: /** 4: * Wait interval 5: */ 6: private static long waitInterval = 7: Long.parseLong(System.getProperty( 8: "rocketmq.client.rebalance.waitInterval", "20000")); 9: 10: private final Logger log = ClientLogger.getLog(); 11: /** 12: * MQClient object 13: */ 14: private final MQClientInstance mqClientFactory; 15: 16: public RebalanceService(MQClientInstance mqClientFactory) { 17: this.mqClientFactory = mqClientFactory; 18: } 19: 20: @Override 21: public void run() { 22: log.info(this.getServiceName() + " service started"); 23: 24: while (! this.isStopped()) { 25: this.waitForRunning(waitInterval); 26: this.mqClientFactory.doRebalance(); 27: } 28: 29: log.info(this.getServiceName() + " service end"); 30: } 31: 32: @Override 33: public String getServiceName() { 34: return RebalanceService.class.getSimpleName(); 35:36:}}Copy the code
  • Balancing message queue service, responsible for allocating currentConsumerConsumable message queues (MessageQueue).
  • Line 26: Call MQClientInstance#doRebalance(…) Allocate message queues. There are currently three cases of triggering:

    • Such asLine 25Wait timeout is invoked every 20 seconds.
    • PushConsumerWhen started, callrebalanceService#wakeup(...)The trigger.
    • BrokernoticeConsumerWhen you add or remove,ConsumerCall in response to notificationrebalanceService#wakeup(...)The trigger.

    MQClientInstance#doRebalance(…) .

MQClientInstance# doRebalance (…).

1: public void doRebalance() { 2: for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { 3: MQConsumerInner impl = entry.getValue(); 4: if (impl ! = null) { 5: try { 6: impl.doRebalance(); 7: } catch (Throwable e) { 8: log.error("doRebalance exception", e); 9:} 10:} 11:} 12:}Copy the code
  • Note: Iterate over the currentClientContains theconsumerTable( ConsumerCollection) to perform message queue allocation.
  • doubt: Now the code is debugged,consumerTableContains onlyConsumeroneself 😈 have answer to this question greatly, please answer next.
  • Line 6: CallMQConsumerInner#doRebalance(...)Queue allocation.DefaultMQPushConsumerImpl,DefaultMQPullConsumerImplThe interface methods are implemented respectively.DefaultMQPushConsumerImpl#doRebalance(...)See:DefaultMQPushConsumerImpl# doRebalance (…)..

DefaultMQPushConsumerImpl# doRebalance (…).

1: public void doRebalance() { 2: if (! this.pause) { 3: this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); 4:5:}}Copy the code

RebalanceImpl# doRebalance (…).

5: */ 6: public void doRebalance(final Boolean isOrder) {6: public void doRebalance(final Boolean isOrder) {7: / / assign each topic message queue 8: Map < String, SubscriptionData > subTable = this. GetSubscriptionInner (); 9: if (subTable ! = null) { 10: for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { 11: final String topic = entry.getKey(); 12: try { 13: this.rebalanceByTopic(topic, isOrder); 14: } catch (Throwable e) { 15: if (! topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 16: log.warn("rebalanceByTopic Exception", e); 19:18:17:}}} 20:21:} / / remove not subscribe to the topic of the corresponding message queue 22: enclosing truncateMessageQueueNotMyTopic (); 23:} 24, 25, 26: / * * * remove not subscribe message queue 27: * / 28: private void truncateMessageQueueNotMyTopic () {29: Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); 30: for (MessageQueue mq : this.processQueueTable.keySet()) { 31: if (! subTable.containsKey(mq.getTopic())) { 32: 33: ProcessQueue pq = this.processQueueTable.remove(mq); 34: if (pq ! = null) { 35: pq.setDropped(true); 36: log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq); 37:} 38:} 39:} 40:}Copy the code
  • #doRebalance(...)Description: Execute the allocation message queue.
    • Lines 7 to 20: Cycle to subscribe to the topic collection (subscriptionInner), assign each oneTopicMessage queue of.
    • Line 22: Remove unsubscribedTopicMessage queue of.
  • #truncateMessageQueueNotMyTopic(...)Description: Remove unsubscribed message queues. When callingDefaultMQPushConsumer#unsubscribe(topic)Only the subscribed topic collection (subscriptionInner), corresponding message queue removal in this method.

RebalanceImpl# rebalanceByTopic (…).

  1: private void rebalanceByTopic(final String topic, final boolean isOrder) {
  2:     switch (messageModel) {
  3:         case BROADCASTING: {
  4:             Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
  5:             if (mqSet != null) {
  6:                 boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
  7:                 if (changed) {
  8:                     this.messageQueueChanged(topic, mqSet, mqSet);
  9:                     log.info("messageQueueChanged {} {} {} {}", //
 10:                         consumerGroup, //
 11:                         topic, //
 12:                         mqSet, //
 13:                         mqSet);
 14:                 }
 15:             } else {
 16:                 log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
 17:             }
 18:             break;
 19:         }
 20:         case CLUSTERING: {
 21:             // 获取 topic 对应的 队列 和 consumer信息
 22:             Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
 23:             List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
 24:             if (null == mqSet) {
 25:                 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 26:                     log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
 27:                 }
 28:             }
 29: 
 30:             if (null == cidAll) {
 31:                 log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
 32:             }
 33: 
 34:             if (mqSet != null && cidAll != null) {
 35:                 // 排序 消息队列 和 消费者数组。因为是在Client进行分配队列,排序后,各Client的顺序才能保持一致。
 36:                 List<MessageQueue> mqAll = new ArrayList<>();
 37:                 mqAll.addAll(mqSet);
 38: 
 39:                 Collections.sort(mqAll);
 40:                 Collections.sort(cidAll);
 41: 
 42:                 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
 43: 
 44:                 // 根据 队列分配策略 分配消息队列
 45:                 List<MessageQueue> allocateResult;
 46:                 try {
 47:                     allocateResult = strategy.allocate(//
 48:                         this.consumerGroup, //
 49:                         this.mQClientFactory.getClientId(), //
 50:                         mqAll, //
 51:                         cidAll);
 52:                 } catch (Throwable e) {
 53:                     log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
 54:                         e);
 55:                     return;
 56:                 }
 57: 
 58:                 Set<MessageQueue> allocateResultSet = new HashSet<>();
 59:                 if (allocateResult != null) {
 60:                     allocateResultSet.addAll(allocateResult);
 61:                 }
 62: 
 63:                 // 更新消息队列
 64:                 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
 65:                 if (changed) {
 66:                     log.info(
 67:                         "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
 68:                         strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
 69:                         allocateResultSet.size(), allocateResultSet);
 70:                     this.messageQueueChanged(topic, mqSet, allocateResultSet);
 71:                 }
 72:             }
 73:             break;
 74:         }
 75:         default:
 76:             break;
 77:     }
 78: }
 79: 
 80: /**
 81:  * 当负载均衡时,更新 消息处理队列
 82:  * - 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
 83:  * - 增加 不在processQueueTable && 存在于mqSet 里的消息队列
 84:  *
 85:  * @param topic Topic
 86:  * @param mqSet 负载均衡结果后的消息队列数组
 87:  * @param isOrder 是否顺序
 88:  * @return 是否变更
 89:  */
 90: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
 91:     boolean changed = false;
 92: 
 93:     // 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
 94:     Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
 95:     while (it.hasNext()) { // TODO 待读:
 96:         Entry<MessageQueue, ProcessQueue> next = it.next();
 97:         MessageQueue mq = next.getKey();
 98:         ProcessQueue pq = next.getValue();
 99: 
100:         if (mq.getTopic().equals(topic)) {
101:             if (!mqSet.contains(mq)) { // 不包含的队列
102:                 pq.setDropped(true);
103:                 if (this.removeUnnecessaryMessageQueue(mq, pq)) {
104:                     it.remove();
105:                     changed = true;
106:                     log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
107:                 }
108:             } else if (pq.isPullExpired()) { // 队列拉取超时,进行清理
109:                 switch (this.consumeType()) {
110:                     case CONSUME_ACTIVELY:
111:                         break;
112:                     case CONSUME_PASSIVELY:
113:                         pq.setDropped(true);
114:                         if (this.removeUnnecessaryMessageQueue(mq, pq)) {
115:                             it.remove();
116:                             changed = true;
117:                             log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
118:                                 consumerGroup, mq);
119:                         }
120:                         break;
121:                     default:
122:                         break;
123:                 }
124:             }
125:         }
126:     }
127: 
128:     // 增加 不在processQueueTable && 存在于mqSet 里的消息队列。
129:     List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息请求数组
130:     for (MessageQueue mq : mqSet) {
131:         if (!this.processQueueTable.containsKey(mq)) {
132:             if (isOrder && !this.lock(mq)) {
133:                 log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
134:                 continue;
135:             }
136: 
137:             this.removeDirtyOffset(mq);
138:             ProcessQueue pq = new ProcessQueue();
139:             long nextOffset = this.computePullFromWhere(mq);
140:             if (nextOffset >= 0) {
141:                 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
142:                 if (pre != null) {
143:                     log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
144:                 } else {
145:                     log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
146:                     PullRequest pullRequest = new PullRequest();
147:                     pullRequest.setConsumerGroup(consumerGroup);
148:                     pullRequest.setNextOffset(nextOffset);
149:                     pullRequest.setMessageQueue(mq);
150:                     pullRequest.setProcessQueue(pq);
151:                     pullRequestList.add(pullRequest);
152:                     changed = true;
153:                 }
154:             } else {
155:                 log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
156:             }
157:         }
158:     }
159: 
160:     // 发起消息拉取请求
161:     this.dispatchPullRequest(pullRequestList);
162: 
163:     return changed;
164: }
Copy the code
  • #rebalanceByTopic(...)Description: DistributionTopicMessage queue of.
    • Lines 3 to 19: Broadcast mode (BROADCASTING), distributionTopicThe correspondingallMessage queues.
    • Lines 20 to 74: Cluster mode (CLUSTERING), distributionTopicThe correspondingPart of theMessage queues.
      • Lines 21 to 40: getTopicCorresponding message queues and consumers, and sort them. For all theConsumerThe message queue is allocated locally and sorted to ensure that all messages are sortedConsumerThe order is consistent.
      • Lines 42 to 61: According to the queue allocation policy (AllocateMessageQueueStrategy) allocates message queues. See:AllocateMessageQueueStrategy.
      • Lines 63 to 72: UpdatesTopicCorresponding message queue.
  • #updateProcessQueueTableInRebalance(...)Note: Updates when allocating queuesTopicCorresponding message queue and returns whether there has been a change.
    • Lines 93 to 126: Remove message queues that do not exist in allocation (mqSet) message processing queue (processQueueTable).
      • Line 103: Remove unwanted message queues. Parsing the see: RebalancePushImpl# removeUnnecessaryMessageQueue (…). .
      • Lines 108 to 120: queue pull timeout, i.eCurrent time - Last pull message time > 120s(120S configurable), the decision occursBUGThe message queue is removed when the message is not pulled for a long time. After removal, below# add queue logic #You can rejoin the new message queue.
    • Line 128 to 158: Add the allocated message queue (mqSet) the new message queue.
      • Lines 132 to 135:Order consumptionRelated skip, detailed analysis see:RocketMQ source Code Analysis — Message ordering and consumption.
      • Line 137: Removes the message queue consumption progress.
      • Line 139: Get the queue consumption progress. RebalancePushImpl#computePullFromWhere(…) .
      • Line 140 to 156: Add new consumer processing queue, add consumer pull message request.
    • Line 161: Initiates the new message queue message pull request. RebalancePushImpl#dispatchPullRequest(…) .

RebalanceImpl# removeUnnecessaryMessageQueue (…).

RebalancePushImpl# removeUnnecessaryMessageQueue (…).

1: public Boolean removeUnnecessaryMessageQueue (MessageQueue mq, ProcessQueue pq) {2: / / synchronization queue consumption progress, and remove it. 3: this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq); 4: this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); 5:6: / / TODO order consumption if (this. DefaultMQPushConsumerImpl. IsConsumeOrderly (7) : && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { 8: try { 9: if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) { 10: try { 11: return this.unlockDelay(mq, pq); 12: } finally { 13: pq.getLockConsume().unlock(); 14: } 15: } else { 16: log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", // 17: mq, // 18: pq.getTryUnlockTimes()); 19: 20: pq.incTryUnlockTimes(); 21: } 22: } catch (Exception e) { 23: log.error("removeUnnecessaryMessageQueue Exception", e); 24: } 25: 26: return false; 27: } 28: return true; 29:}Copy the code
  • Remove unwanted message queue-related information and return whether the removal was successful.
  • Lines 2 to 4: synchronize the consumption progress of the queue and remove it.
  • Lines 5 to 27:Order consumptionRelated skip, detailed analysis see:RocketMQ source Code Analysis — Message ordering and consumption.

[PullConsumer]RebalancePullImpl# removeUnnecessaryMessageQueue (…).

1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) { 2: this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq); 3: this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq); 4: return true; 5:}Copy the code
  • Note: Remove unneeded message queue-related information and return removal success. andRebalancePushImpl#removeUnnecessaryMessageQueue(...)Basically the same.

RebalancePushImpl# dispatchPullRequest (…).

1: public void dispatchPullRequest(List<PullRequest> pullRequestList) { 2: for (PullRequest pullRequest : pullRequestList) { 3: this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); 4: log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); 6:5:}}Copy the code
  • Description: Initiates a message pull request. The call isPushConsumerConstantly pulling at the starting point of the message.

DefaultMQPushConsumerImpl# executePullRequestImmediately (…).

1: public void executePullRequestImmediately(final PullRequest pullRequest) { 2: this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest); 3:}Copy the code
  • Note: Submit a pull request. After submission,PullMessageService Asynchronous execution.non-blocking. See:PullMessageService.

AllocateMessageQueueStrategy

AllocateMessageQueueAveragely

1: public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy { 2: private final Logger log = ClientLogger.getLog(); 3: 4: @Override 5: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 6: A List < String > cidAll) {7: / / calibration parameters correctly 8: if (currentCID = = null | | currentCID. The length () < 1) {9: throw new IllegalArgumentException("currentCID is empty"); 10: } 11: if (mqAll == null || mqAll.isEmpty()) { 12: throw new IllegalArgumentException("mqAll is null or mqAll empty"); 13: } 14: if (cidAll == null || cidAll.isEmpty()) { 15: throw new IllegalArgumentException("cidAll is null or cidAll empty"); 16: } 17: 18: List<MessageQueue> result = new ArrayList<>(); 19: if (! cidAll.contains(currentCID)) { 20: log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", 21: consumerGroup, 22: currentCID, 23: cidAll); 24: return result; 27: int index = cidall.indexof (currentCID); // The number of consumers. 28: int mod = mqAll.size() % cidAll.size(); // The remainder, which is how many message queues are not evenly allocated. 29: int averageSize = 30: mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() 31: + 1 : mqAll.size() / cidAll.size()); 32: int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // If there is a remainder, [0, mod) splits the remainder, i.e. each consumer is allocated one more node; Int range = math.min (averageSize, mqall.size () -startIndex); 34: for (int I = 0; I < range; I ++) {35: for (int I = 0; I < range; I ++) result.add(mqAll.get((startIndex + i) % mqAll.size())); 36: } 37: return result; 38: } 39: 40: @Override 41: public String getName() { 42: return "AVG"; 43: } 44: }Copy the code
  • Note: Evenly allocate queue policies.
  • Lines 7 to 25: parameter verification.
  • Lines 26 to 36: Evenly allocate message queues.
    • 27:index: the currentConsumerIn the consumer cluster is the number. So here’s why you need a pair of incoming, okaycidAllThe reason arguments must be sorted. If I don’t sort it,ConsumerIt was calculated locallyindexInconsistency affects the calculation results.
    • Line 28:mod: Remainder, which is how many message queues are not evenly allocated.
    • Lines 29 to 31:averageSize: Code can be simplified to(mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()).
      • [ 0, mod )mqAll.size() / cidAll.size() + 1. In front of themodConsumerDivide the remainder and get 1 more message queue.
      • [ mod, cidAll.size() )mqAll.size() / cidAll.size().
    • Line 32:startIndexConsumerAllocates the start position of the message queue.
    • Line 33:range: Allocates the number of queues. The reason forMath#min(...)The reason: whenmqAll.size() <= cidAll.size()When, last fewConsumerMessage queue could not be allocated.
    • Lines 34 to 36: Generate the results of the allocation message queue.
  • Here’s an example:

Fixed message queue length is 4.

Consumer 2 It’s divisible by * Consumer 3 Not divisible * Consumer 5 Can’t all allocate *
Message queue [0] Consumer[0] Consumer[0] Consumer[0]
Message queue [1] Consumer[0] Consumer[0] Consumer[1]
Message queue [2] Consumer[1] Consumer[1] Consumer[2]
Message queue [3] Consumer[1] Consumer[2] Consumer[3]

AllocateMessageQueueByMachineRoom

1: public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy { 2: /** 3: * Consumer consumption brokerName Set 4: */ 5: Private Set<String> BrokerIdCs; 6: 7: @Override 8: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 9: List<String> cidAll) {10: List<MessageQueue> result = new ArrayList<MessageQueue>(); 12: int currentIndex = cidAll.indexOf(currentCID); 13: if (currentIndex < 0) { 14: return result; 17: List<MessageQueue> premqAll = new ArrayList<MessageQueue>(); 18: for (MessageQueue mq : mqAll) { 19: String[] temp = mq.getBrokerName().split("@"); 20: if (temp.length == 2 && consumeridcs.contains(temp[0])) { 21: premqAll.add(mq); 16: int mod = premqall.size ()/cidall.size (); 26: int rem = premqAll.size() % cidAll.size(); 27: int startIndex = mod * currentIndex; 28: int endIndex = startIndex + mod; 29: for (int i = startIndex; i < endIndex; i++) { 30: result.add(mqAll.get(i)); 31: } 32: if (rem > currentIndex) { 33: result.add(premqAll.get(currentIndex + mod * cidAll.size())); 34: } 35: return result; 36: } 37: 38: @Override 39: public String getName() { 40: return "MACHINE_ROOM"; 41: } 42: 43: public Set<String> getConsumeridcs() { 44: return consumeridcs; 45: } 46: 47: public void setConsumeridcs(Set<String> consumeridcs) { 48: this.consumeridcs = consumeridcs; 49:50:}}Copy the code
  • Description:On average,distributionconsumable BrokerCorresponding message queue.
  • Lines 7 to 15: parameter verification.
  • Lines 16 to 23: calculationsconsumable BrokerCorresponding message queue.
  • Lines 25 to 34: Evenly allocate message queues. theThe average distributionWay andAllocateMessageQueueAveragelyThis is slightly different, with the extra closing portion allocated to the frontremConsumer.
  • Question: When using this allocation policy,ConsumerBrokerHow to configure allocation. Study 😈 etc.master-slaveConsider the source code carefully.

AllocateMessageQueueAveragelyByCircle

1: public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy { 2: private final Logger log = ClientLogger.getLog(); 3: 4: @Override 5: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 6: A List < String > cidAll) {7: / / calibration parameters correctly 8: if (currentCID = = null | | currentCID. The length () < 1) {9: throw new IllegalArgumentException("currentCID is empty"); 10: } 11: if (mqAll == null || mqAll.isEmpty()) { 12: throw new IllegalArgumentException("mqAll is null or mqAll empty"); 13: } 14: if (cidAll == null || cidAll.isEmpty()) { 15: throw new IllegalArgumentException("cidAll is null or cidAll empty"); 16: } 17: 18: List<MessageQueue> result = new ArrayList<MessageQueue>(); 19: if (! cidAll.contains(currentCID)) { 20: log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", 21: consumerGroup, 22: currentCID, 23: cidAll); 24: return result; Int index = cidall.indexof (currentCID); 29: for (int i = index; i < mqAll.size(); i++) { 30: if (i % cidAll.size() == index) { 31: result.add(mqAll.get(i)); 32: } 33: } 34: return result; 35: } 36: 37: @Override 38: public String getName() { 39: return "AVG_BY_CIRCLE"; 40:41:}}Copy the code
  • Circular allocation message queue.

AllocateMessageQueueByConfig

1: public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy { 2: private List<MessageQueue> messageQueueList; 3: 4: @Override 5: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 6: List<String> cidAll) { 7: return this.messageQueueList; 8: } 9: 10: @Override 11: public String getName() { 12: return "CONFIG"; 13: } 14: 15: public List<MessageQueue> getMessageQueueList() { 16: return messageQueueList; 17: } 18: 19: public void setMessageQueueList(List<MessageQueue> messageQueueList) { 20: this.messageQueueList = messageQueueList; 21, 22:}}Copy the code
  • Description: Allocates configured message queues.
  • Question:Usage scenarios of the allocation policy.

PushConsumer Reads the consumption progress

RebalancePushImpl# computePullFromWhere (…).

1: public long computePullFromWhere(MessageQueue mq) { 2: long result = -1; 3: final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); 4: final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); 5: switch (consumeFromWhere) {6: case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: // Discard 7: Case CONSUME_FROM_MIN_OFFSET: // Discarded 8: case CONSUME_FROM_MAX_OFFSET: // discarded 9: case CONSUME_FROM_LAST_OFFSET: {10: case CONSUME_FROM_LAST_OFFSET: {10: long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); 11: if (lastOffset >= 0) { 12: result = lastOffset; 13: } 14: // First start,no offset 15: else if (-1 == lastOffset) { 16: if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 17: result = 0L; 18: } else { 19: try { 20: result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); 21: } catch (MQClientException e) { 22: result = -1; 23: } 24: } 25: } else { 26: result = -1; 27: } 28: break; 29: } 30: case CONSUME_FROM_FIRST_OFFSET: { 31: long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); 32: if (lastOffset >= 0) { 33: result = lastOffset; 34: } else if (-1 == lastOffset) { 35: result = 0L; 36: } else { 37: result = -1; 38: } 39: break; 40: } 41: case CONSUME_FROM_TIMESTAMP: { 42: long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); 43: if (lastOffset >= 0) { 44: result = lastOffset; 45: } else if (-1 == lastOffset) { 46: if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 47: try { 48: result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); 49: } catch (MQClientException e) { 50: result = -1; 51: } 52: } else { 53: try { 54: long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), 55: UtilAll.YYYY_MMDD_HHMMSS).getTime(); 56: result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); 57: } catch (MQClientException e) { 58: result = -1; 59: } 60: } 61: } else { 62: result = -1; 63: } 64: break; 65: } 66: 67: default: 68: break; 69: } 70: 71: return result; 72:}Copy the code
  • Note: Calculates where message queue consumption begins.
  • PushConsumerThere are three options for reading the consumption progress:
    • CONSUME_FROM_LAST_OFFSET: Lines 6-29: A new consumer cluster starts for the first time fromThe last position of the queueStart spending.And then you start again and you start spending where you started last time.
    • CONSUME_FROM_FIRST_OFFSET: Lines 30 to 40: A new consumer cluster is first started from the queueThe most former positionStart spending.And then you start again and you start spending where you started last time.
    • CONSUME_FROM_TIMESTAMP: Lines 41 to 65: A new consumer cluster is first started fromSpecified point in timeStart spending.And then you start again and you start spending where you started last time.

[PullConsumer]RebalancePullImpl# computePullFromWhere (…).

Skip it for now. 😈

PushConsumer pulls the message

PullMessageService

1: public class PullMessageService extends ServiceThread { 2: private final Logger log = ClientLogger.getLog(); 3: /** 4: * pull message request queue 5: */ 6: private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>(); 7: /** 8: * MQClient object 9: */ 10: private final MQClientInstance mQClientFactory; 11: /** 12: * timer. Delay submitting pull requests 13: */ 14: Private Final ScheduledExecutorService ScheduledExecutorService = Executors 15: .newSingleThreadScheduledExecutor(new ThreadFactory() { 16: @Override 17: public Thread newThread(Runnable r) { 18: return new Thread(r, "PullMessageServiceScheduledThread"); 19:20:}}); 21: 22: public PullMessageService(MQClientInstance mQClientFactory) { 23: this.mQClientFactory = mQClientFactory; 24:} 25: 26: /** 27: * Execute delay pull message request 28: * 29: * @param pullRequest pull message request 30: * @param timeDelay Duration 31: */ 32: public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { 33: this.scheduledExecutorService.schedule(new Runnable() { 34: 35: @Override 36: public void run() { 37: PullMessageService.this.executePullRequestImmediately(pullRequest); 38: } 39: }, timeDelay, TimeUnit.MILLISECONDS); } 41:42: /** 43: * execute immediate pullRequest 44: * 45: * @param pullRequest pullRequest 46: */ 47: public void executePullRequestImmediately(final PullRequest pullRequest) { 48: try { 49: this.pullRequestQueue.put(pullRequest); 50: } catch (InterruptedException e) { 51: log.error("executePullRequestImmediately pullRequestQueue.put", e); 52:} 53:} 54:55: /** 56: * Executing a delayed task 57: * 58: * @param r Task 59: * @param timeDelay Duration 60: */ 61: public void executeTaskLater(final Runnable r, final long timeDelay) { 62: this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS); 63: } 64: 65: public ScheduledExecutorService getScheduledExecutorService() { 66: return scheduledExecutorService; } 69:69: /** 70: * pullRequest 71: * 72: * @param pullRequest 73: */ 74: private void pullMessage(final PullRequest pullRequest) { 75: final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); 76: if (consumer ! = null) { 77: DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; 78: impl.pullMessage(pullRequest); 79: } else { 80: log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); 81: } 82: } 83: 84: @Override 85: public void run() { 86: log.info(this.getServiceName() + " service started"); 87: 88: while (! this.isStopped()) { 89: try { 90: PullRequest pullRequest = this.pullRequestQueue.take(); 91: if (pullRequest ! = null) { 92: this.pullMessage(pullRequest); 93: } 94: } catch (InterruptedException e) { 95: } catch (Exception e) { 96: log.error("Pull Message Service Run Method exception", e); 97: } 98: } 99: 100: log.info(this.getServiceName() + " service end"); 101: } 102: 103: @Override 104: public String getServiceName() { 105: return PullMessageService.class.getSimpleName(); 106:} 107: 108:}Copy the code
  • Pull message service, constantly from constantlyBrokerPull the message and submit the consuming task toConsumeMessageService.
  • #executePullRequestLater(...): Lines 26 to 40: SubmitdelayPull the message request.
  • #executePullRequestImmediately(...): Lines 42 to 53: SubmitimmediatelyPull the message request.
  • #executeTaskLater(...): Lines 55 to 63: SubmitDelayed tasks.
  • #pullMessage(...): Lines 69 through 82 perform pull message logic. See:DefaultMQPushConsumerImpl# pullMessage (…)..
  • #run(...): Lines 84 through 101: loop pull message request queue (pullRequestQueue) for message pull.

DefaultMQPushConsumerImpl# pullMessage (…).

  1: public void pullMessage(final PullRequest pullRequest) {
  2:     final ProcessQueue processQueue = pullRequest.getProcessQueue();
  3:     if (processQueue.isDropped()) {
  4:         log.info("the pull request[{}] is dropped.", pullRequest.toString());
  5:         return;
  6:     }
  7: 
  8:     // 设置队列最后拉取消息时间
  9:     pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
 10: 
 11:     // 判断consumer状态是否运行中。如果不是,则延迟拉取消息。
 12:     try {
 13:         this.makeSureStateOK();
 14:     } catch (MQClientException e) {
 15:         log.warn("pullMessage exception, consumer state not ok", e);
 16:         this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
 17:         return;
 18:     }
 19: 
 20:     // 判断是否暂停中。
 21:     if (this.isPause()) {
 22:         log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
 23:         this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
 24:         return;
 25:     }
 26: 
 27:     // 判断是否超过最大持有消息数量。默认最大值为1000。
 28:     long size = processQueue.getMsgCount().get();
 29:     if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
 30:         this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); // 提交延迟消息拉取请求。50ms。
 31:         if ((flowControlTimes1++ % 1000) == 0) {
 32:             log.warn(
 33:                 "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
 34:                 processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
 35:         }
 36:         return;
 37:     }
 38: 
 39:     if (!this.consumeOrderly) { // 判断消息跨度是否过大。
 40:         if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
 41:             this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); // 提交延迟消息拉取请求。50ms。
 42:             if ((flowControlTimes2++ % 1000) == 0) {
 43:                 log.warn(
 44:                     "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
 45:                     processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
 46:                     pullRequest, flowControlTimes2);
 47:             }
 48:             return;
 49:         }
 50:     } else { // TODO 顺序消费
 51:         if (processQueue.isLocked()) {
 52:             if (!pullRequest.isLockedFirst()) {
 53:                 final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
 54:                 boolean brokerBusy = offset < pullRequest.getNextOffset();
 55:                 log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
 56:                     pullRequest, offset, brokerBusy);
 57:                 if (brokerBusy) {
 58:                     log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
 59:                         pullRequest, offset);
 60:                 }
 61: 
 62:                 pullRequest.setLockedFirst(true);
 63:                 pullRequest.setNextOffset(offset);
 64:             }
 65:         } else {
 66:             this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
 67:             log.info("pull message later because not locked in broker, {}", pullRequest);
 68:             return;
 69:         }
 70:     }
 71: 
 72:     // 获取Topic 对应的订阅信息。若不存在,则延迟拉取消息
 73:     final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
 74:     if (null == subscriptionData) {
 75:         this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
 76:         log.warn("find the consumer's subscription failed, {}", pullRequest);
 77:         return;
 78:     }
 79: 
 80:     final long beginTimestamp = System.currentTimeMillis();
 81: 
 82:     PullCallback pullCallback = new PullCallback() {
 83:         @Override
 84:         public void onSuccess(PullResult pullResult) {
 85:             if (pullResult != null) {
 86:                 pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
 87:                     subscriptionData);
 88: 
 89:                 switch (pullResult.getPullStatus()) {
 90:                     case FOUND:
 91:                         // 设置下次拉取消息队列位置
 92:                         long prevRequestOffset = pullRequest.getNextOffset();
 93:                         pullRequest.setNextOffset(pullResult.getNextBeginOffset());
 94: 
 95:                         // 统计
 96:                         long pullRT = System.currentTimeMillis() - beginTimestamp;
 97:                         DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
 98:                             pullRequest.getMessageQueue().getTopic(), pullRT);
 99: 
100:                         long firstMsgOffset = Long.MAX_VALUE;
101:                         if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
102:                             DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
103:                         } else {
104:                             firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
105: 
106:                             // 统计
107:                             DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
108:                                 pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
109: 
110:                             // 提交拉取到的消息到消息处理队列
111:                             boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
112: 
113:                             // 提交消费请求
114:                             DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
115:                                 pullResult.getMsgFoundList(), //
116:                                 processQueue, //
117:                                 pullRequest.getMessageQueue(), //
118:                                 dispathToConsume);
119: 
120:                             // 提交下次拉取消息请求
121:                             if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
122:                                 DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
123:                                     DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
124:                             } else {
125:                                 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
126:                             }
127:                         }
128: 
129:                         // 下次拉取消息队列位置小于上次拉取消息队列位置 或者 第一条消息的消息队列位置小于上次拉取消息队列位置,则判定为BUG,输出log
130:                         if (pullResult.getNextBeginOffset() < prevRequestOffset//
131:                             || firstMsgOffset < prevRequestOffset) {
132:                             log.warn(
133:                                 "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
134:                                 pullResult.getNextBeginOffset(), //
135:                                 firstMsgOffset, //
136:                                 prevRequestOffset);
137:                         }
138: 
139:                         break;
140:                     case NO_NEW_MSG:
141:                         // 设置下次拉取消息队列位置
142:                         pullRequest.setNextOffset(pullResult.getNextBeginOffset());
143: 
144:                         // 持久化消费进度
145:                         DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
146: 
147:                         // 立即提交拉取消息请求
148:                         DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
149:                         break;
150:                     case NO_MATCHED_MSG:
151:                         // 设置下次拉取消息队列位置
152:                         pullRequest.setNextOffset(pullResult.getNextBeginOffset());
153: 
154:                         // 持久化消费进度
155:                         DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
156: 
157:                         // 提交立即拉取消息请求
158:                         DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
159:                         break;
160:                     case OFFSET_ILLEGAL:
161:                         log.warn("the pull request offset illegal, {} {}", //
162:                             pullRequest.toString(), pullResult.toString());
163:                         // 设置下次拉取消息队列位置
164:                         pullRequest.setNextOffset(pullResult.getNextBeginOffset());
165: 
166:                         // 设置消息处理队列为dropped
167:                         pullRequest.getProcessQueue().setDropped(true);
168: 
169:                         // 提交延迟任务,进行消费处理队列移除。不立即移除的原因:可能有地方正在使用,避免受到影响。
170:                         DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
171: 
172:                             @Override
173:                             public void run() {
174:                                 try {
175:                                     // 更新消费进度,同步消费进度到Broker
176:                                     DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
177:                                         pullRequest.getNextOffset(), false);
178:                                     DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
179: 
180:                                     // 移除消费处理队列
181:                                     DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
182: 
183:                                     log.warn("fix the pull request offset, {}", pullRequest);
184:                                 } catch (Throwable e) {
185:                                     log.error("executeTaskLater Exception", e);
186:                                 }
187:                             }
188:                         }, 10000);
189:                         break;
190:                     default:
191:                         break;
192:                 }
193:             }
194:         }
195: 
196:         @Override
197:         public void onException(Throwable e) {
198:             if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
199:                 log.warn("execute the pull request exception", e);
200:             }
201: 
202:             // 提交延迟拉取消息请求
203:             DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
204:         }
205:     };
206: 
207:     // 集群消息模型下,计算提交的消费进度。
208:     boolean commitOffsetEnable = false;
209:     long commitOffsetValue = 0L;
210:     if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
211:         commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
212:         if (commitOffsetValue > 0) {
213:             commitOffsetEnable = true;
214:         }
215:     }
216: 
217:     // 计算请求的 订阅表达式 和 是否进行filtersrv过滤消息
218:     String subExpression = null;
219:     boolean classFilter = false;
220:     SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
221:     if (sd != null) {
222:         if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
223:             subExpression = sd.getSubString();
224:         }
225: 
226:         classFilter = sd.isClassFilterMode();
227:     }
228: 
229:     // 计算拉取消息系统标识
230:     int sysFlag = PullSysFlag.buildSysFlag(//
231:         commitOffsetEnable, // commitOffset
232:         true, // suspend
233:         subExpression != null, // subscription
234:         classFilter // class filter
235:     );
236: 
237:     // 执行拉取。如果拉取请求发生异常时,提交延迟拉取消息请求。
238:     try {
239:         this.pullAPIWrapper.pullKernelImpl(//
240:             pullRequest.getMessageQueue(), // 1
241:             subExpression, // 2
242:             subscriptionData.getSubVersion(), // 3
243:             pullRequest.getNextOffset(), // 4
244:             this.defaultMQPushConsumer.getPullBatchSize(), // 5
245:             sysFlag, // 6
246:             commitOffsetValue, // 7
247:             BROKER_SUSPEND_MAX_TIME_MILLIS, // 8
248:             CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9
249:             CommunicationMode.ASYNC, // 10
250:             pullCallback// 11
251:         );
252:     } catch (Exception e) {
253:         log.error("pullKernelImpl exception", e);
254:         this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
255:     }
256: }
257: 
258: private void correctTagsOffset(final PullRequest pullRequest) {
259:     if (0L == pullRequest.getProcessQueue().getMsgCount().get()) {
260:         this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true);
261:     }
262: }
Copy the code

PullAPIWrapper# pullKernelImpl (…).

1: /** 2: * pull message core method 3: * 4: * @param MQ message queue 5: * @param subExpression subscription expression 6: * @param subVersion subscription version 7: * @param offset Start position of the pull queue 8: * @param maxNums Number of pull messages 9: * @param sysFlag System ID of the pull request 10: * @param commitOffset Submission consumption progress 11: * @ param brokerSuspendMaxTimeMillis broker suspend request maximum time 12: * @ param timeoutMillis request broker timeout value 13: * @param communicationMode 14: * @param pullCallback 15: * @return pull message result. The result is returned only when the communication mode is synchronous. Otherwise, null is returned. 16: * @throws MQClientException When no broker is found or other client exceptions occur. 17: * @throws RemotingException When an exception occurs on the remote call 18: * @throws MQBrokerException When an exception occurs to the broker. This exception occurs only when the communication mode is synchronous. 20: */ 21: protected PullResult pullKernelImpl(22: final MessageQueue mq, 23: final String subExpression, 24: final long subVersion, 25: final long offset, 26: final int maxNums, 27: final int sysFlag, 28: final long commitOffset, 29: final long brokerSuspendMaxTimeMillis, 30: final long timeoutMillis, 31: final CommunicationMode communicationMode, 32: final PullCallback pullCallback 33: Throws MQClientException, RemotingException, MQBrokerException, InterruptedException {34: // Get Broker information 35: FindBrokerResult findBrokerResult = 36: this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), 37: this.recalculatePullFromWhichNode(mq), false); 38: if (null == findBrokerResult) { 39: this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); 40: findBrokerResult = 41: this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), 42: this.recalculatePullFromWhichNode(mq), false); 43:} 44:45: // Request pull message 46: If (findBrokerResult! = null) { 47: int sysFlagInner = sysFlag; 48: 49: if (findBrokerResult.isSlave()) { 50: sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner); 51: } 52: 53: PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); 54: requestHeader.setConsumerGroup(this.consumerGroup); 55: requestHeader.setTopic(mq.getTopic()); 56: requestHeader.setQueueId(mq.getQueueId()); 57: requestHeader.setQueueOffset(offset); 58: requestHeader.setMaxMsgNums(maxNums); 59: requestHeader.setSysFlag(sysFlagInner); 60: requestHeader.setCommitOffset(commitOffset); 61: requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); 62: requestHeader.setSubscription(subExpression); 63: requestHeader.setSubVersion(subVersion); 64: 65: String brokerAddr = findBrokerResult.getBrokerAddr(); 66: if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { // TODO filtersrv 67: brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); 68: } 69: 70: PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( 71: brokerAddr, 72: requestHeader, 73: timeoutMillis, 74: communicationMode, 75: pullCallback); 76: 77: return pullResult; } 79:80: // Broker info does not exist throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); 82:}Copy the code

PullAPIWrapper# recalculatePullFromWhichNode (…).

1: /** 2: * Mapping between message queues and pull brokers 3: * When a message is pulled, the mapping is used to obtain the Broker corresponding to the pull request 4: */ 5: private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable = 6: new ConcurrentHashMap<MessageQueue, AtomicLong>(32); 7: /** 8: * Whether to use the default Broker 9: */ 10: private Volatile Boolean connectBrokerByUser = false; 11: /** 12: * Default Broker number 13: */ 14: Private Volatile Long defaultBrokerId = Mixall.master_id; 15:16: /** 17: * Calculate message queue pull message Broker number 18: * 19: * @param MQ message queue 20: * @return Broker number 21: */ 22: Public long recalculatePullFromWhichNode (final MessageQueue mq) {if 23: / / open the default Broker switch, it returns the default Broker number 24: if (this.isConnectBrokerByUser()) { 25: return this.defaultBrokerId; 26:27} : 28: / / if the message queue mapping pull the Broker, the Broker return mapping number 29: AtomicLong suggest = this. PullFromWhichNodeTable. Get (mq); 30: if (suggest ! = null) { 31: return suggest.get(); 32:} 33: 34: // Return mixall.master_id; 36:}Copy the code
  • Calculate message queue to pull the corresponding messageBrokerSerial number.

MQClientInstance# findBrokerAddressInSubscribe (…).

1: /** 2: * Broker name and address Map 3: */ 4: private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable = 5: new ConcurrentHashMap<>(); 6: 7: /** 8: * Obtain Broker information 9: * 10: * @Param brokerName Broker name 11: * @Param brokerId Broker Number 12: * @param onlyThisBroker must be the broker 13: * @return Broker info 14: */ 15: public FindBrokerResult findBrokerAddressInSubscribe(// 16: final String brokerName, // 17: final long brokerId, // 18: final boolean onlyThisBroker// 19: ) { 20: String brokerAddr = null; // Broker address 21: Boolean slave = false; 22: Boolean found = false; 23: 24: // Broker info 25: HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName); 26: if (map ! = null && ! map.isEmpty()) { 27: brokerAddr = map.get(brokerId); 28: slave = brokerId ! = MixAll.MASTER_ID; 29: found = brokerAddr ! = null; 30: 31: // Select a Broker 32: if (! found && ! onlyThisBroker) { 33: Entry<Long, String> entry = map.entrySet().iterator().next(); 34: brokerAddr = entry.getValue(); 35: slave = entry.getKey() ! = MixAll.MASTER_ID; 36: found = true; 37:} 38:} 39: 40: // If (found) {42: return new FindBrokerResult(brokerAddr, slave); 46: return null;} 44: 45: // Return null; 47:}Copy the code
  • Description: ObtainBrokerInformation (BrokerAddress, whether it is a slave node).

PullAPIWrapper# processPullResult (…).

1: /** 2: * Process pull result 3: * 1. Update message queue pull mapping of message Broker number 4: * 2. Parse the message and match the appropriate message according to the subscription message tagCode 5: * 6: * @param mq message queue 7: * @param pullResult pullResult 8: * @param subscriptionData subscription 9: * @return PullResult 10: */ 11: public PullResult processPullResult(final MessageQueue MQ, final PullResult PullResult, 12: final SubscriptionData subscriptionData) { 13: PullResultExt pullResultExt = (PullResultExt) pullResult; 14:15: / / update message queue pull the message Broker number mapping 16: enclosing updatePullFromWhichNode (mq, pullResultExt getSuggestWhichBrokerId ()); 17: 18: // Parse the message and match the appropriate message according to the subscription message tagCode 19: if (pullstatus.found == pullresult.getpullStatus ()) {20: // Parse the message 21: ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary()); 22: List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer); 25: List<MessageExt> msgListFilterAgain = msgList; 26: if (! subscriptionData.getTagsSet().isEmpty() && ! subscriptionData.isClassFilterMode()) { 27: msgListFilterAgain = new ArrayList<>(msgList.size()); 28: for (MessageExt msg : msgList) { 29: if (msg.getTags() ! = null) { 30: if (subscriptionData.getTagsSet().contains(msg.getTags())) { 31: msgListFilterAgain.add(msg); 32: } 33: } 34: } 35: } 36: 37: // Hook 38: if (this.hasHook()) { 39: FilterMessageContext filterMessageContext = new FilterMessageContext(); 40: filterMessageContext.setUnitMode(unitMode); 41: filterMessageContext.setMsgList(msgListFilterAgain); 42: this.executeHook(filterMessageContext); 46: for (MessageExt MSG: msgListFilterAgain) {47: for (MessageExt MSG: msgListFilterAgain) MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET, 48: Long.toString(pullResult.getMinOffset())); 49: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET, 50: Long.toString(pullResult.getMaxOffset())); 51:} 52:53: / / set the message list: 54 pullResultExt. SetMsgFoundList (msgListFilterAgain); 55:} 56:57:58: / / empty message binary array pullResultExt. SetMessageBinary (null); 59: 60: return pullResult; 61:}Copy the code
  • Handling pull results.
    • Update message queue pull messagesBrokerMapping of numbers.
    • The message is parsed and the message is based on the subscription informationtagCodeMatch the appropriate message.
  • Line 16: Update message queue pull messageBrokerMapping of numbers. The next time a message is pulled, if the default pull is not setBrokerThe updated number will be usedBrokerSerial number.
  • Lines 18 to 55: Parses the message and messages based on subscription informationtagCodeMatch the appropriate message.
    • Lines 20 to 22: Parse the message. See RocketMQ source Code Analysis – Message Basics for details.
    • Lines 24 to 35: Based on subscription informationtagCodeMatch the message.
    • Lines 37 to 43:Hook.
    • Line 45 through 51: Sets the current minimum/maximum position of the message queue to the message extension field.
    • Line 54: Sets up the message queue.
  • Line 58: Empties the message binary array.

ProcessQueue# putMessage (…).

1: /** 2: * Message mapping read/write lock 3: */ 4: private Final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); 5: /** 6: * message mapping 7: * key: message queue location 8: */ 9: private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>(); 10: /** 11: * Message count 12: */ 13: private Final AtomicLong msgCount = new AtomicLong(); 16: */ 17: private volatile long queueOffsetMax = 0L; 14: /** 15: * Add the maximum queue position 16: */ 17: private volatile long queueOffsetMax = 0L; 18: /** 19: * Whether consuming 20: */ 21: private volatile Boolean Consuming = false; QueueMaxOffset - Added message array [n-1]. QueueOffset 25: * Acc = Accumulation 26: * CNT = (guess) Contrast 27: */ 28: private volatile long msgAccCnt = 0; 29: 30: /** 31: * Adds the message and returns whether to submit to the consumer 32: * returns true, when a new message has been added successfully, 33: * 34: * @param MSGS message 35: * @return Whether to submit to the consumer 36: */ 37: public boolean putMessage(final List<MessageExt> msgs) { 38: boolean dispatchToConsume = false; 39: try { 40: this.lockTreeMap.writeLock().lockInterruptibly(); 41: try {42: // add message 43: int validMsgCnt = 0; 44: for (MessageExt msg : msgs) { 45: MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg); 46: if (null == old) { 47: validMsgCnt++; 48: this.queueOffsetMax = msg.getQueueOffset(); 49: } 50: } 51: msgCount.addAndGet(validMsgCnt); 52:53: // Calculate if you are spending 54: if (! msgTreeMap.isEmpty() && ! this.consuming) { 55: dispatchToConsume = true; 56: this.consuming = true; // If (! msgs.isEmpty()) { 61: MessageExt messageExt = msgs.get(msgs.size() - 1); 62: String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET); 63: if (property ! = null) { 64: long accTotal = Long.parseLong(property) - messageExt.getQueueOffset(); 65: if (accTotal > 0) { 66: this.msgAccCnt = accTotal; 67: } 68: } 69: } 70: } finally { 71: this.lockTreeMap.writeLock().unlock(); 72: } 73: } catch (InterruptedException e) { 74: log.error("putMessage exception", e); 75: } 76: 77: return dispatchToConsume; 78:}Copy the code

conclusion

The simplest way to describe the PullConsumer pulling a message would be this:

While (true) {if (pull message not satisfied) {thread.sleep (interval); continue; } active pull message (); }Copy the code

PushConsumer message

ConsumeMessageConcurrentlyService submitted a consumer request

ConsumeMessageConcurrentlyService# submitConsumeRequest (…).

3: */ 4: Private final BlockingQueue<Runnable> consumeRequestQueue; 5: /** 6: * consumption thread pool 7: */ 8: private final ThreadPoolExecutor consumeExecutor; 9: 10: public void submitConsumeRequest(// 11: final List<MessageExt> msgs, // 12: final ProcessQueue processQueue, // 13: final MessageQueue messageQueue, // 14: final boolean dispatchToConsume) { 15: final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); If (msgs.size() <= consumeBatchSize) {// If (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); 18: try { 19: this.consumeExecutor.submit(consumeRequest); 20: } catch (RejectedExecutionException e) { 21: this.submitConsumeRequestLater(consumeRequest); 24: for (int total = 0; total < msgs.size(); ) {25: // Calculate the message contained in the current split request 26: List<MessageExt> msgThis = new ArrayList<>(consumeBatchSize); 27: for (int i = 0; i < consumeBatchSize; i++, total++) { 28: if (total < msgs.size()) { 29: msgThis.add(msgs.get(total)); 30: } else { 31: break; 32:} 33:} 34: 35: // Submit split consumption request ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); 37: try { 38: this.consumeExecutor.submit(consumeRequest); 39:} the catch (RejectedExecutionException e) {40: / / if rejected, then the current split message + remaining requests submitted to delay consumption. 41: for (; total < msgs.size(); total++) { 42: msgThis.add(msgs.get(total)); 43: } 44: this.submitConsumeRequestLater(consumeRequest); 45:} 46:} 47:} 48:}Copy the code
  • Note: Submit an immediate consumption request.
  • Lines 16 to 22: Submit the consumption request directly if the commit message is less than or equal to the batch consumption number.
  • Lines 23 to 47: When the commit message is larger than the batch consumption, split into multiple requests.
    • Lines 25 through 33: Calculates the messages contained in the current split request.
    • Lines 35 through 38: Submit the split consumption request.
    • Lines 39 to 44: If the commit request is rejected, the current split message + the remaining message is submitted to delay the consumption request, ending the split loop.

ConsumeMessageConcurrentlyService#submitConsumeRequestLater

1: /** 2: * Submit delayed consumption request 3: * 4: * @param MSGS message list 5: * @Param processQueue 6: * @Param messageQueue 7: */ 8: private void submitConsumeRequestLater(// 9: final List<MessageExt> msgs, // 10: final ProcessQueue processQueue, // 11: final MessageQueue messageQueue// 12: ) { 13: 14: this.scheduledExecutorService.schedule(new Runnable() { 15: 16: @Override 17: public void run() { 18: ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true); 19: } 20: }, 5000, TimeUnit.MILLISECONDS); 21:} 22: 23: /** 24: * Submit delay request 25: * @param consumeRequest 26: */ 27: private void submitConsumeRequestLater(final ConsumeRequest consumeRequest// 28: ) { 29: 30: this.scheduledExecutorService.schedule(new Runnable() { 31: 32: @Override 33: public void run() { 34: ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest); // TODO BUG ? 35: } 36: }, 5000, TimeUnit.MILLISECONDS); 37:}Copy the code
  • Note: Submit a deferred consumption request.
  • Line 34: Direct callConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);. If the number of messages exceeds the bulk consumption limit, will it beBUG.

ConsumeRequest

1: class ConsumeRequest implements Runnable {2: 3: /** 4: * private final List<MessageExt> MSGS; 7: /** 8: * ProcessQueue 9: */ 10: private Final ProcessQueue ProcessQueue; 11: /** 12: * queue 13: */ 14: private final MessageQueue MessageQueue 15: 16: public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) { 17: this.msgs = msgs; 18: this.processQueue = processQueue; 19: this.messageQueue = messageQueue; 20:} 21, 22, 23 @ Override: public void the run () {24: / / queue to consume 25: don't waste the if (this. ProcessQueue. IsDropped ()) {26: log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); 27: return; 28: } 29: 30: MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; / / listener: 31 ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext (messageQueue); / / consumer Context: 32 ConsumeConcurrentlyStatus status = null; // Hook 35: ConsumeMessageContext ConsumeMessageContext = null; 36: if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { 37: consumeMessageContext = new ConsumeMessageContext(); 38: consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup()); 39: consumeMessageContext.setProps(new HashMap<String, String>()); 40: consumeMessageContext.setMq(messageQueue); 41: consumeMessageContext.setMsgList(msgs); 42: consumeMessageContext.setSuccess(false); 43: ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); 44: } 45: 46: long beginTimestamp = System.currentTimeMillis(); 47: boolean hasException = false; 48: ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; / / consumer return the result type 49: try {50: / / when messages to retry, set the Topic to the original Topic: 51 ConsumeMessageConcurrentlyService. Enclosing resetRetryTopic (MSGS); 52:53: // Set the start consumption time 54: if (MSGS! = null && ! msgs.isEmpty()) { 55: for (MessageExt msg : msgs) { 56: MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); 59 57:58:}} : 60: / / to spend 61: status = listener. ConsumeMessage (Collections. UnmodifiableList (MSGS), the context). 62: } catch (Throwable e) { 63: log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", 64: RemotingHelper.exceptionSimpleDesc(e), // 65: ConsumeMessageConcurrentlyService.this.consumerGroup, 66: msgs, 67: messageQueue); 68: hasException = true; 70: long consumeRT = system.currentTimemillis () -beginTimestamp; 73: if (null == status) { 74: if (hasException) { 75: returnType = ConsumeReturnType.EXCEPTION; 76: } else { 77: returnType = ConsumeReturnType.RETURNNULL; 78: } 79: } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { 80: returnType = ConsumeReturnType.TIME_OUT; 81: } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { 82: returnType = ConsumeReturnType.FAILED; 83: } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { 84: returnType = ConsumeReturnType.SUCCESS; 85: } 86: 87: // Hook 88: if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { 89: consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); 93: if (null == status) {94: log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", 95: ConsumeMessageConcurrentlyService.this.consumerGroup, 96: msgs, 97: messageQueue); 98: status = ConsumeConcurrentlyStatus.RECONSUME_LATER; 99: } 100: 101: // Hook 102: if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { 103: consumeMessageContext.setStatus(status.toString()); 104: consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status); 105: ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); : 107-108:106} / / statistics: 109 ConsumeMessageConcurrentlyService. Enclosing getConsumerStatsManager (110) : .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); 113: if (! processQueue.isDropped()) { 114: ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); 115: } else { 116: log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); 117:} 118:} 119: 120:}Copy the code
  • Description: Consumption request. Submit request execution consumption.
  • Lines 24 through 28: Discard processing queue without consumption.
  • Lines 34 to 44: hooks.
  • Line 51: When the message is a retry message, setTopicAs the originalTopic. For example: primitiveTopicTopicTestTry again, whenTopic%RETRY%please_rename_unique_group_name_4After this method,TopicSet backTopicTest.
  • Lines 53 through 58: Set the start consumption time.
  • Line 61: Make a purchase.
  • Lines 71 through 85: Parse the consumption return result type
  • Lines 87 to 90:Hook.
  • Lines 92 through 99: When the consumption result state is not empty, set the consumption result state to consume later.
  • Lines 101 to 106:Hook.
  • Lines 108 to 110: statistics.
  • Lines 112-117: Processing consumption results. If the consumption processing queue is removed and the message is consumed, it may result in repeated consumption of the message, so message consumption is idempotent to the maximum extent possible. Parsing the see: ConsumeMessageConcurrentlyService# processConsumeResult (…). .

ConsumeMessageConcurrentlyService# processConsumeResult (…).

1: public void processConsumeResult(// 2: final ConsumeConcurrentlyStatus status, // 3: final ConsumeConcurrentlyContext context, // 4: final ConsumeRequest consumeRequest// 5: ) { 6: int ackIndex = context.getAckIndex(); If (consumerEquest.getMsgs ().isEmpty())) 10: return; ConsumeRequest. MSGS [0] to consumeRequest. MSGS [ackIndex] 13: switch (status) {14: case CONSUME_SUCCESS: 15: if (ackIndex >= consumeRequest.getMsgs().size()) { 16: ackIndex = consumeRequest.getMsgs().size() - 1; Int ok = ackIndex + 1; 20: int failed = consumeRequest.getMsgs().size() - ok; 21: this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); 22: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); 23: break; 24: case RECONSUME_LATER: 25: ackIndex = -1; 26: // Number of successes/failures 27: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), 28: consumeRequest.getMsgs().size()); 29: break; 30: default: 31: break; 32:} : 33 34: / / processing consumption failed messages 35: switch (this) defaultMQPushConsumer) getMessageModel ()) {36: case - : // Broadcast mode, regardless of whether the consumption failed, does not send a message to the Broker, just print Log 37: for (int I = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { 38: MessageExt msg = consumeRequest.getMsgs().get(i); 39: log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); 40: } 41: break; 42: Case CLUSTERING: 43: // Send a message back to the Broker. 44: List<MessageExt> msgBackFailed = new ArrayList<>(consumeRequest.getMsgs().size()); 45: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { 46: MessageExt msg = consumeRequest.getMsgs().get(i); 47: boolean result = this.sendMessageBack(msg, context); 48: if (! result) { 49: msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); 50: msgBackFailed.add(msg); // Send back a message to the Broker that failed. msgBackFailed.isEmpty()) { 56: consumeRequest.getMsgs().removeAll(msgBackFailed); 57:58: this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); 59: } 60: break; 61: default: 62: break; } 64:65: // Remove the consumption success message and update the latest consumption progress 66: long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); 67: if (offset >= 0 && ! consumeRequest.getProcessQueue().isDropped()) { 68: this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); 69:} 70:}Copy the code

ProcessQueue# removeMessage (…).

1: /** 2: * Removes the message and returns the first message queue position 3: * 4: * @param MSGS message 5: * @return Message queue position 6: */ 7: public long removeMessage(final List<MessageExt> msgs) { 8: long result = -1; 9: final long now = System.currentTimeMillis(); 10: try { 11: this.lockTreeMap.writeLock().lockInterruptibly(); 12: this.lastConsumeTimestamp = now; 13: try { 14: if (! msgTreeMap.isEmpty()) { 15: result = this.queueOffsetMax + 1; If msgTreeMap is empty, the next message is queueOffsetMax+1 16:17: // Remove message 18: int removedCnt = 0; 19: for (MessageExt msg : msgs) { 20: MessageExt prev = msgTreeMap.remove(msg.getQueueOffset()); 21: if (prev ! = null) { 22: removedCnt--; 23: } 24: } 25: msgCount.addAndGet(removedCnt); 26: 27: if (! msgTreeMap.isEmpty()) { 28: result = msgTreeMap.firstKey(); 29: } 30: } 31: } finally { 32: this.lockTreeMap.writeLock().unlock(); 33: } 34: } catch (Throwable t) { 35: log.error("removeMessage exception", t); 36: } 37: 38: return result; 39:}Copy the code

ConsumeMessageConcurrentlyService# cleanExpireMsg (…).

1: public void start() { 2: this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { 3: 4: @Override 5: public void run() { 6: cleanExpireMsg(); 7:} 8:9: }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); 14: */ 15: private void cleanExpireMsg() {16: Iterator<Map.Entry<MessageQueue, ProcessQueue>> it = 17: this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator(); 18: while (it.hasNext()) { 19: Map.Entry<MessageQueue, ProcessQueue> next = it.next(); 20: ProcessQueue pq = next.getValue(); 21: pq.cleanExpiredMsg(this.defaultMQPushConsumer); 23:22:}}Copy the code
  • Note: Clear expired messages periodically. The default interval is 15 minutes.

ProcessQueue# cleanExpiredMsg (…).

1: public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {2: if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) { 4: return; Int loop = msgtreemap.size () < 16? msgTreeMap.size() : 16; For (int I = 0; i < loop; I++) {10: // gets the first message. If not, end the loop. 11: MessageExt MSG = null; 12: try { 13: this.lockTreeMap.readLock().lockInterruptibly(); 14: try { 15: if (! msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) { 16: msg = msgTreeMap.firstEntry().getValue(); 17: } else { 18: break; 19: } 20: } finally { 21: this.lockTreeMap.readLock().unlock(); 22: } 23: } catch (InterruptedException e) { 24: log.error("getExpiredMsg exception", e); 25:26} : 27: try {28: / / back to timeout messages: 29 pushConsumer. SendMessageBack (MSG, 3); 30: log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); 31, 32: / / whether the message is still the first, if so, is to remove 33: try {34: this. LockTreeMap. WriteLock () lockInterruptibly (); 35: try { 36: if (! msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) { 37: try { 38: msgTreeMap.remove(msgTreeMap.firstKey()); 39: } catch (Exception e) { 40: log.error("send expired msg exception", e); 41: } 42: } 43: } finally { 44: this.lockTreeMap.writeLock().unlock(); 45: } 46: } catch (InterruptedException e) { 47: log.error("getExpiredMsg exception", e); 48: } 49: } catch (Exception e) { 50: log.error("send expired msg exception", e); 51:} 52:} 53:}Copy the code
  • Description: Remove expired messages.
  • Lines 2 to 5: When consuming sequentially, return directly.
  • Lines 7 through 9: Loop to remove the message. Default maximum number of cycles: 16.
  • Lines 10 to 25: Get the first message. Check whether timeout occurs. If not, end the loop.
  • Line 29: Send the timeout message back toBroker.
  • Lines 32 to 48: Determine if the message is still the first one at this point, and remove it if it is.

PushConsumer returns a failure message

DefaultMQPushConsumerImpl# sendMessageBack (…).

1: public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) 2: Throws RemotingException, MQBrokerException, InterruptedException, MQClientException {3: try {4: // Consumer sends back the message 5: String brokerAddr = (null ! = brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) 6: : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); 7: this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, 8: this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); } Catch (Exception e) {// Producer sends back messages using the Client's built-in Producer. log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e); 12:13: Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody()); 14: 15: String originMsgId = MessageAccessor.getOriginMessageId(msg); 16: MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId); 17: 18: newMsg.setFlag(msg.getFlag()); 19: MessageAccessor.setProperties(newMsg, msg.getProperties()); 20: MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic()); 21: MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1)); 22: MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes())); 23: newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); 24: 25: this.mQClientFactory.getDefaultMQProducer().send(newMsg); 26, 27:}}Copy the code

MQClientAPIImpl# consumerSendMessageBack (…).

1: /** 2: * Consumer 3: * @param addr Broker 4: * @param MSG message 5: * @param consumerGroup 6: * @param delayLevel delayLevel 7: * @param timeoutMillis timeout 8: * @param maxConsumeRetryTimes maximum retry times for consuming 9: * @throws RemotingException when an exception occurs on a remote call 10: * @throws MQBrokerException When an exception occurs on a Broker 11: * @interruptedexception When a thread interrupts 12: */ 13: public void consumerSendMessageBack(14: Final String addr, 15: final MessageExt msg, 16: final String consumerGroup, 17: final int delayLevel, 18: final long timeoutMillis, 19: final int maxConsumeRetryTimes 20: ) throws RemotingException, MQBrokerException, InterruptedException { 21: ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader(); 22: RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader); 23: 24: requestHeader.setGroup(consumerGroup); 25: requestHeader.setOriginTopic(msg.getTopic()); 26: requestHeader.setOffset(msg.getCommitLogOffset()); 27: requestHeader.setDelayLevel(delayLevel); 28: requestHeader.setOriginMsgId(msg.getMsgId()); 29: requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes); 30:31: RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), 32: request, timeoutMillis); 33: assert response ! = null; 34: switch (response.getCode()) { 35: case ResponseCode.SUCCESS: { 36: return; 37: } 38: default: 39: break; 40: } 41: 42: throw new MQBrokerException(response.getCode(), response.getRemark()); 43:}Copy the code

8, Consumer progress

OffsetStore

  • RemoteBrokerOffsetStoreConsumer Cluster patternNext, use remoteBrokerConsumption schedule.
  • LocalFileOffsetStoreConsumer Broadcasting modeUnder, use localfileConsumption schedule.

OffsetStore# load (…).

LocalFileOffsetStore# load (…).

1: @override 2: public void load() throws MQClientException {3: // Reads the consumption progress from the local disk 4: OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset(); 5: if (offsetSerializeWrapper ! = null && offsetSerializeWrapper.getOffsetTable() ! = null) { 6: offsetTable.putAll(offsetSerializeWrapper.getOffsetTable()); 7:8: / / print each message queue consumer schedule 9: the for (MessageQueue mq: offsetSerializeWrapper getOffsetTable (). The keySet ()) {10: AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq); 11: log.info("load consumer's offset, {} {} {}", 12: this.groupName, 13: mq, 14: offset.get()); 15:} 16:} 17:}Copy the code
  • Load consumption progress from a local file into memory.
OffsetSerializeWrapper
1: public class OffsetSerializeWrapper extends RemotingSerializable { 2: private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = 3: new ConcurrentHashMap<>(); 4: 5: public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() { 6: return offsetTable; 7: } 8: 9: public void setOffsetTable(ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable) { 10: this.offsetTable = offsetTable; 11:12:}}Copy the code
  • Description: LocalOffsetStore serialization.
Yunai-MacdeMacBook-Pro-2:config yunai$ cat / Users/yunai /. Rocketmq_offsets / 192.168.17.0 @ DEFAULT/please_rename_unique_group_name_1 / offsets. The json {" offsetTable ": {{ "brokerName":"broker-a", "queueId":3, "topic":"TopicTest" }:1470,{ "brokerName":"broker-a", "queueId":2, "topic":"TopicTest" }:1471,{ "brokerName":"broker-a", "queueId":1, "topic":"TopicTest" }:1470,{ "brokerName":"broker-a", "queueId":0, "topic":"TopicTest" }:1470 } }Copy the code

RemoteBrokerOffsetStore# load (…).

1: @Override
2: public void load() {
3: }
Copy the code
  • Do not load, actually read consumption progress fromBrokerTo obtain.

OffsetStore# readOffset (…).

Read consumption progress type:

  • READ_FROM_MEMORY: Reads data from memory.
  • READ_FROM_STORE: From storage (BrokerfileRead).
  • MEMORY_FIRST_THEN_STORE: Preferentially reads data from the memory. If the data cannot be read, the data is read from the storage.

LocalFileOffsetStore# readOffset (…).

1: @Override 2: public long readOffset(final MessageQueue mq, final ReadOffsetType type) { 3: if (mq ! = null) { 4: switch (type) { 5: case MEMORY_FIRST_THEN_STORE: 6: case READ_FROM_MEMORY: { 7: AtomicLong offset = this.offsetTable.get(mq); 8: if (offset ! = null) { 9: return offset.get(); 10: } else if (ReadOffsetType.READ_FROM_MEMORY == type) { 11: return -1; 12: } 13: } 14: case READ_FROM_STORE: { 15: OffsetSerializeWrapper offsetSerializeWrapper; 16: try { 17: offsetSerializeWrapper = this.readLocalOffset(); 18: } catch (MQClientException e) { 19: return -1; 20: } 21: if (offsetSerializeWrapper ! = null && offsetSerializeWrapper.getOffsetTable() ! = null) { 22: AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq); 23: if (offset ! = null) { 24: this.updateOffset(mq, offset.get(), false); 25: return offset.get(); 26: } 27: } 28: } 29: default: 30: break; 31: } 32: } 33: 34: return -1; 35:}Copy the code
  • Line 16: fromfileRead the consumption progress.

RemoteBrokerOffsetStore# readOffset (…).

1: @Override 2: public long readOffset(final MessageQueue mq, final ReadOffsetType type) { 3: if (mq ! = null) { 4: switch (type) { 5: case MEMORY_FIRST_THEN_STORE: 6: case READ_FROM_MEMORY: { 7: AtomicLong offset = this.offsetTable.get(mq); 8: if (offset ! = null) { 9: return offset.get(); 10: } else if (ReadOffsetType.READ_FROM_MEMORY == type) { 11: return -1; 12: } 13: } 14: case READ_FROM_STORE: { 15: try { 16: long brokerOffset = this.fetchConsumeOffsetFromBroker(mq); 17: AtomicLong offset = new AtomicLong(brokerOffset); 18: this.updateOffset(mq, offset.get(), false); 19: return brokerOffset; 20: } 21: // No offset in broker 22: catch (MQBrokerException e) { 23: return -1; 24: } 25: //Other exceptions 26: catch (Exception e) { 27: log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e); 28: return -2; 29: } 30: } 31: default: 32: break; 33: } 34: } 35: 36: return -1; 37:}Copy the code
  • Line 16: fromBrokerRead the consumption progress.

OffsetStore# updateOffset (…).

The method RemoteBrokerOffsetStore is implemented the same as LocalFileOffsetStore.

1: @Override 2: public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) { 3: if (mq ! = null) { 4: AtomicLong offsetOld = this.offsetTable.get(mq); 5: if (null == offsetOld) { 6: offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset)); 7: } 8: 9: if (null ! = offsetOld) { 10: if (increaseOnly) { 11: MixAll.compareAndIncreaseOnly(offsetOld, offset); 12: } else { 13: offsetOld.set(offset); 14:} 15:} 16:} 17:}Copy the code

OffsetStore# persistAll (…).

LocalFileOffsetStore# persistAll (…).

1: @Override 2: public void persistAll(Set<MessageQueue> mqs) { 3: if (null == mqs || mqs.isEmpty()) 4: return; 5: 6: OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper(); 7: for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { 8: if (mqs.contains(entry.getKey())) { 9: AtomicLong offset = entry.getValue(); 10: offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset); 11: } 12: } 13: 14: String jsonString = offsetSerializeWrapper.toJson(true); 15: if (jsonString ! = null) { 16: try { 17: MixAll.string2File(jsonString, this.storePath); 18: } catch (IOException e) { 19: log.error("persistAll consumer offset Exception, " + this.storePath, e); 20:} 21:} 22:}Copy the code
  • Persistent consumption schedule. Write the consumption progress to a file.

RemoteBrokerOffsetStore# persistAll (…).

1: @Override 2: public void persistAll(Set<MessageQueue> mqs) { 3: if (null == mqs || mqs.isEmpty()) 4: return; 5: 6: // persistent MessageQueue 7: final HashSet<MessageQueue> unusedMQ = new HashSet<>(); 8: if (! mqs.isEmpty()) { 9: for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { 10: MessageQueue mq = entry.getKey(); 11: AtomicLong offset = entry.getValue(); 12: if (offset ! = null) { 13: if (mqs.contains(mq)) { 14: try { 15: this.updateConsumeOffsetToBroker(mq, offset.get()); 16: log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", 17: this.groupName, 18: this.mQClientFactory.getClientId(), 19: mq, 20: offset.get()); 21: } catch (Exception e) { 22: log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); 23: } 24: } else { 25: unusedMQ.add(mq); 26:} 27:} 28:} 29:} 30: 31: // Remove invalid message queue 32: if (! unusedMQ.isEmpty()) { 33: for (MessageQueue mq : unusedMQ) { 34: this.offsetTable.remove(mq); 35: log.info("remove unused mq, {}, {}", mq, this.groupName); 36:} 37:} 38:}Copy the code
  • Persistency specifies the consumption progress of the message queue array toBrokerAnd remove the message queue unless specified.

MQClientInstance# persistAllConsumerOffset (…).

1: private void startScheduledTask() {2: // Periodically synchronizes the consumption progress 3: this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 4: 5: @Override 6: public void run() { 7: try { 8: MQClientInstance.this.cleanOfflineBroker(); 9: MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); 10: } catch (Exception e) { 11: log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); 12: } 13: } 14: }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); 15:}Copy the code
  • Note: Periodic persistence, default period: 5000ms.
  • Important Note:
    • Consumption schedule persistence is more than just timed persistence. Operations such as pulling messages, allocating message queues, and so on, consume schedule persistence.
    • Consumption schedule persistence is more than just timed persistence. Operations such as pulling messages, allocating message queues, and so on, consume schedule persistence.
    • Consumption schedule persistence is more than just timed persistence. Operations such as pulling messages, allocating message queues, and so on, consume schedule persistence.

9, the end

😈 is probably the longest article in this series, please forgive me for any errors or inclarity. Thanks for reading, bookmarking, liking and sharing this series, and especially for turning to the end. 😜 really have diao diao long.