Self-praise: Dubbo Implementation Principles and Source Code Parsing — The Best Collection
Praise yourself: D Database Entity Design Collection
Abstract: the original source www.iocoder.cn/RocketMQ/me… “Taro source” welcome to reprint, keep the summary, thank you!
This article is based on RocketMQ 4.0.x official release
- 1, an overview of the
- 2, Consumer
- 3. PushConsumer
- PushConsumer subscribe
- DefaultMQPushConsumerImpl# subscribe (…).
- FilterAPI. BuildSubscriptionData (…).
- DefaultMQPushConsumer# registerMessageListener (…).
- DefaultMQPushConsumerImpl# subscribe (…).
- PushConsumer message queue allocation
- RebalanceService
- MQClientInstance# doRebalance (…).
- DefaultMQPushConsumerImpl# doRebalance (…).
- RebalanceImpl# doRebalance (…).
- RebalanceImpl# rebalanceByTopic (…).
- RebalanceImpl# removeUnnecessaryMessageQueue (…).
- RebalancePushImpl# removeUnnecessaryMessageQueue (…).
- [PullConsumer] RebalancePullImpl# removeUnnecessaryMessageQueue (…).
- RebalancePushImpl# dispatchPullRequest (…).
- DefaultMQPushConsumerImpl# executePullRequestImmediately (…).
- AllocateMessageQueueStrategy
- AllocateMessageQueueAveragely
- AllocateMessageQueueByMachineRoom
- AllocateMessageQueueAveragelyByCircle
- AllocateMessageQueueByConfig
- PushConsumer Reads the consumption progress
- RebalancePushImpl# computePullFromWhere (…).
- [PullConsumer] RebalancePullImpl# computePullFromWhere (…).
- PushConsumer pulls the message
- PullMessageService
- DefaultMQPushConsumerImpl# pullMessage (…).
- PullAPIWrapper# pullKernelImpl (…).
- PullAPIWrapper# recalculatePullFromWhichNode (…).
- MQClientInstance# findBrokerAddressInSubscribe (…).
- PullAPIWrapper# processPullResult (…).
- ProcessQueue# putMessage (…).
- PullAPIWrapper# pullKernelImpl (…).
- conclusion
- PushConsumer message
- ConsumeMessageConcurrentlyService submitted a consumer request
- ConsumeMessageConcurrentlyService# submitConsumeRequest (…).
- ConsumeMessageConcurrentlyService#submitConsumeRequestLater
- ConsumeRequest
- ConsumeMessageConcurrentlyService# processConsumeResult (…).
- ProcessQueue# removeMessage (…).
- ConsumeMessageConcurrentlyService# cleanExpireMsg (…).
- ProcessQueue# cleanExpiredMsg (…).
- ConsumeMessageConcurrentlyService submitted a consumer request
- PushConsumer returns a failure message
- DefaultMQPushConsumerImpl# sendMessageBack (…).
- MQClientAPIImpl# consumerSendMessageBack (…).
- DefaultMQPushConsumerImpl# sendMessageBack (…).
- 8, Consumer progress
- OffsetStore
- OffsetStore# load (…).
- LocalFileOffsetStore# load (…).
- OffsetSerializeWrapper
- RemoteBrokerOffsetStore# load (…).
- LocalFileOffsetStore# load (…).
- OffsetStore# readOffset (…).
- LocalFileOffsetStore# readOffset (…).
- RemoteBrokerOffsetStore# readOffset (…).
- OffsetStore# updateOffset (…).
- OffsetStore# persistAll (…).
- LocalFileOffsetStore# persistAll (…).
- RemoteBrokerOffsetStore# persistAll (…).
- MQClientInstance# persistAllConsumerOffset (…).
- OffsetStore# load (…).
- OffsetStore
- 9, the end
🙂🙂🙂 follow wechat public number:
- RocketMQ/MyCAT/Sharding-JDBC all source code analysis article list
- RocketMQ/MyCAT/Sharding-JDBC 中文 解 决 source GitHub address
- Any questions you may have about the source code will be answered carefully. Even do not know how to read the source can also ask oh.
- New source code parsing articles are notified in real time. It’s updated about once a week.
- Serious source communication wechat group.
1, an overview of the
RocketMQ source code Analysis — Message pull and consumption (part 1)
The main analysis of Consumer in the consumption logic involved in the source code.
2, Consumer
MQ provides two types of consumers:
- PushConsumer:
- Used in most scenarios.
- Despite the name
Push
At the beginning of the actual implementation, usePull
Method is implemented. throughPull
On and on and onpollingBroker
Get the message. When there are no new messages,Broker
会Pending requestUntil a new message is generated, the suspension is cancelled and a new message is returned. In this case, basicallyBroker
Take the initiative toPush
doClose to theOf course, there is a corresponding loss of real time. Principle of similarLong polling (Long-Polling
).
- PullConsumer
The PullConsumer will skip the sequential consumption. The PullConsumer will skip the sequential consumption. The PullConsumer will skip the sequential consumption.
3. PushConsumer
Take a look at the components that PushConsumer contains and how they interact:
RebalanceService
: balance message queue service, responsible for allocating currentConsumer
Consumable message queues (MessageQueue
). When there is a newConsumer
Is added to or removed from the message queue.PullMessageService
: pull message service,On and on and onfromBroker
Pull the message and submit the consuming task toConsumeMessageService
.ConsumeMessageService
: Consuming messaging services,On and on and onConsume messages and process the consumption results.RemoteBrokerOffsetStore
:Consumer
Consumption schedule management, responsible for fromBroker
Get consumption progress, synchronous consumption progress toBroker
.ProcessQueue
: message processing queue.MQClientInstance
: encapsulation forNamesrv
.Broker
The API call provided toProducer
,Consumer
Use.
PushConsumer subscribe
DefaultMQPushConsumerImpl# subscribe (…).
1: public void subscribe(String topic, String subExpression) throws MQClientException {2: try {3: // Create the subscription data 4: SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // 5: topic, subExpression); 6: this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); 7: // Synchronize Consumer information to Broker via heartbeat 8: if (this.mqclientFactory! = null) { 9: this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); 10: } 11: } catch (Exception e) { 12: throw new MQClientException("subscription exception", e); 13:14:}}Copy the code
- Description: Subscribe
Topic
。 - Lines 3 through 6: Create subscription data. Parsing the see: FilterAPI. BuildSubscriptionData (…). .
- Lines 7 to 10: Sync via heartbeat
Consumer
Information to theBroker
.
FilterAPI. BuildSubscriptionData (…).
1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, 2: String subString) throws Exception { 3: SubscriptionData subscriptionData = new SubscriptionData(); 4: subscriptionData.setTopic(topic); 5: subscriptionData.setSubString(subString); 6: / / handle the subscribe expression 7: if (null = = the subString | | the subString. Equals (SubscriptionData. SUB_ALL) | | the subString. The length () = = 0) {8: subscriptionData.setSubString(SubscriptionData.SUB_ALL); 9: } else { 10: String[] tags = subString.split("\\|\\|"); 11: if (tags.length > 0) { 12: for (String tag : tags) { 13: if (tag.length() > 0) { 14: String trimString = tag.trim(); 15: if (trimString.length() > 0) { 16: subscriptionData.getTagsSet().add(trimString); 17: subscriptionData.getCodeSet().add(trimString.hashCode()); 18: } 19: } 20: } 21: } else { 22: throw new Exception("subString split error"); 23: } 24: } 25: 26: return subscriptionData; 27:}Copy the code
- Description: according to
Topic
And subscription expression to create subscription data - SubscriptionData. SubVersion = System. CurrentTimeMillis ().
DefaultMQPushConsumer# registerMessageListener (…).
1: public void registerMessageListener(MessageListenerConcurrently messageListener) { 2: this.messageListener = messageListener; 3: this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); 4:}Copy the code
- Note: Register message listeners.
PushConsumer message queue allocation
RebalanceService
1: public class RebalanceService extends ServiceThread {2: 3: /** 4: * Wait interval 5: */ 6: private static long waitInterval = 7: Long.parseLong(System.getProperty( 8: "rocketmq.client.rebalance.waitInterval", "20000")); 9: 10: private final Logger log = ClientLogger.getLog(); 11: /** 12: * MQClient object 13: */ 14: private final MQClientInstance mqClientFactory; 15: 16: public RebalanceService(MQClientInstance mqClientFactory) { 17: this.mqClientFactory = mqClientFactory; 18: } 19: 20: @Override 21: public void run() { 22: log.info(this.getServiceName() + " service started"); 23: 24: while (! this.isStopped()) { 25: this.waitForRunning(waitInterval); 26: this.mqClientFactory.doRebalance(); 27: } 28: 29: log.info(this.getServiceName() + " service end"); 30: } 31: 32: @Override 33: public String getServiceName() { 34: return RebalanceService.class.getSimpleName(); 35:36:}}Copy the code
- Balancing message queue service, responsible for allocating current
Consumer
Consumable message queues (MessageQueue
). -
Line 26: Call MQClientInstance#doRebalance(…) Allocate message queues. There are currently three cases of triggering:
- Such as
Line 25
Wait timeout is invoked every 20 seconds. PushConsumer
When started, callrebalanceService#wakeup(...)
The trigger.Broker
noticeConsumer
When you add or remove,Consumer
Call in response to notificationrebalanceService#wakeup(...)
The trigger.
MQClientInstance#doRebalance(…) .
- Such as
MQClientInstance# doRebalance (…).
1: public void doRebalance() { 2: for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { 3: MQConsumerInner impl = entry.getValue(); 4: if (impl ! = null) { 5: try { 6: impl.doRebalance(); 7: } catch (Throwable e) { 8: log.error("doRebalance exception", e); 9:} 10:} 11:} 12:}Copy the code
- Note: Iterate over the current
Client
Contains theconsumerTable
(Consumer
Collection) to perform message queue allocation. - doubt: Now the code is debugged,
consumerTable
Contains onlyConsumer
oneself 😈 have answer to this question greatly, please answer next. - Line 6: Call
MQConsumerInner#doRebalance(...)
Queue allocation.DefaultMQPushConsumerImpl
,DefaultMQPullConsumerImpl
The interface methods are implemented respectively.DefaultMQPushConsumerImpl#doRebalance(...)
See:DefaultMQPushConsumerImpl# doRebalance (…)..
DefaultMQPushConsumerImpl# doRebalance (…).
1: public void doRebalance() { 2: if (! this.pause) { 3: this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); 4:5:}}Copy the code
- Description: Performs message queue allocation.
- Line 3: Call
RebalanceImpl#doRebalance(...)
Queue allocation. See:RebalancePushImpl# doRebalance (…)..
RebalanceImpl# doRebalance (…).
5: */ 6: public void doRebalance(final Boolean isOrder) {6: public void doRebalance(final Boolean isOrder) {7: / / assign each topic message queue 8: Map < String, SubscriptionData > subTable = this. GetSubscriptionInner (); 9: if (subTable ! = null) { 10: for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { 11: final String topic = entry.getKey(); 12: try { 13: this.rebalanceByTopic(topic, isOrder); 14: } catch (Throwable e) { 15: if (! topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 16: log.warn("rebalanceByTopic Exception", e); 19:18:17:}}} 20:21:} / / remove not subscribe to the topic of the corresponding message queue 22: enclosing truncateMessageQueueNotMyTopic (); 23:} 24, 25, 26: / * * * remove not subscribe message queue 27: * / 28: private void truncateMessageQueueNotMyTopic () {29: Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); 30: for (MessageQueue mq : this.processQueueTable.keySet()) { 31: if (! subTable.containsKey(mq.getTopic())) { 32: 33: ProcessQueue pq = this.processQueueTable.remove(mq); 34: if (pq ! = null) { 35: pq.setDropped(true); 36: log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq); 37:} 38:} 39:} 40:}Copy the code
#doRebalance(...)
Description: Execute the allocation message queue.- Lines 7 to 20: Cycle to subscribe to the topic collection (
subscriptionInner
), assign each oneTopic
Message queue of. - Line 22: Remove unsubscribed
Topic
Message queue of.
- Lines 7 to 20: Cycle to subscribe to the topic collection (
#truncateMessageQueueNotMyTopic(...)
Description: Remove unsubscribed message queues. When callingDefaultMQPushConsumer#unsubscribe(topic)
Only the subscribed topic collection (subscriptionInner
), corresponding message queue removal in this method.
RebalanceImpl# rebalanceByTopic (…).
1: private void rebalanceByTopic(final String topic, final boolean isOrder) {
2: switch (messageModel) {
3: case BROADCASTING: {
4: Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
5: if (mqSet != null) {
6: boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
7: if (changed) {
8: this.messageQueueChanged(topic, mqSet, mqSet);
9: log.info("messageQueueChanged {} {} {} {}", //
10: consumerGroup, //
11: topic, //
12: mqSet, //
13: mqSet);
14: }
15: } else {
16: log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
17: }
18: break;
19: }
20: case CLUSTERING: {
21: // 获取 topic 对应的 队列 和 consumer信息
22: Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
23: List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
24: if (null == mqSet) {
25: if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
26: log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
27: }
28: }
29:
30: if (null == cidAll) {
31: log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
32: }
33:
34: if (mqSet != null && cidAll != null) {
35: // 排序 消息队列 和 消费者数组。因为是在Client进行分配队列,排序后,各Client的顺序才能保持一致。
36: List<MessageQueue> mqAll = new ArrayList<>();
37: mqAll.addAll(mqSet);
38:
39: Collections.sort(mqAll);
40: Collections.sort(cidAll);
41:
42: AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
43:
44: // 根据 队列分配策略 分配消息队列
45: List<MessageQueue> allocateResult;
46: try {
47: allocateResult = strategy.allocate(//
48: this.consumerGroup, //
49: this.mQClientFactory.getClientId(), //
50: mqAll, //
51: cidAll);
52: } catch (Throwable e) {
53: log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
54: e);
55: return;
56: }
57:
58: Set<MessageQueue> allocateResultSet = new HashSet<>();
59: if (allocateResult != null) {
60: allocateResultSet.addAll(allocateResult);
61: }
62:
63: // 更新消息队列
64: boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
65: if (changed) {
66: log.info(
67: "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
68: strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
69: allocateResultSet.size(), allocateResultSet);
70: this.messageQueueChanged(topic, mqSet, allocateResultSet);
71: }
72: }
73: break;
74: }
75: default:
76: break;
77: }
78: }
79:
80: /**
81: * 当负载均衡时,更新 消息处理队列
82: * - 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
83: * - 增加 不在processQueueTable && 存在于mqSet 里的消息队列
84: *
85: * @param topic Topic
86: * @param mqSet 负载均衡结果后的消息队列数组
87: * @param isOrder 是否顺序
88: * @return 是否变更
89: */
90: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
91: boolean changed = false;
92:
93: // 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
94: Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
95: while (it.hasNext()) { // TODO 待读:
96: Entry<MessageQueue, ProcessQueue> next = it.next();
97: MessageQueue mq = next.getKey();
98: ProcessQueue pq = next.getValue();
99:
100: if (mq.getTopic().equals(topic)) {
101: if (!mqSet.contains(mq)) { // 不包含的队列
102: pq.setDropped(true);
103: if (this.removeUnnecessaryMessageQueue(mq, pq)) {
104: it.remove();
105: changed = true;
106: log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
107: }
108: } else if (pq.isPullExpired()) { // 队列拉取超时,进行清理
109: switch (this.consumeType()) {
110: case CONSUME_ACTIVELY:
111: break;
112: case CONSUME_PASSIVELY:
113: pq.setDropped(true);
114: if (this.removeUnnecessaryMessageQueue(mq, pq)) {
115: it.remove();
116: changed = true;
117: log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
118: consumerGroup, mq);
119: }
120: break;
121: default:
122: break;
123: }
124: }
125: }
126: }
127:
128: // 增加 不在processQueueTable && 存在于mqSet 里的消息队列。
129: List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息请求数组
130: for (MessageQueue mq : mqSet) {
131: if (!this.processQueueTable.containsKey(mq)) {
132: if (isOrder && !this.lock(mq)) {
133: log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
134: continue;
135: }
136:
137: this.removeDirtyOffset(mq);
138: ProcessQueue pq = new ProcessQueue();
139: long nextOffset = this.computePullFromWhere(mq);
140: if (nextOffset >= 0) {
141: ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
142: if (pre != null) {
143: log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
144: } else {
145: log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
146: PullRequest pullRequest = new PullRequest();
147: pullRequest.setConsumerGroup(consumerGroup);
148: pullRequest.setNextOffset(nextOffset);
149: pullRequest.setMessageQueue(mq);
150: pullRequest.setProcessQueue(pq);
151: pullRequestList.add(pullRequest);
152: changed = true;
153: }
154: } else {
155: log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
156: }
157: }
158: }
159:
160: // 发起消息拉取请求
161: this.dispatchPullRequest(pullRequestList);
162:
163: return changed;
164: }
Copy the code
#rebalanceByTopic(...)
Description: DistributionTopic
Message queue of.- Lines 3 to 19: Broadcast mode (
BROADCASTING
), distributionTopic
The correspondingallMessage queues. - Lines 20 to 74: Cluster mode (
CLUSTERING
), distributionTopic
The correspondingPart of theMessage queues.- Lines 21 to 40: get
Topic
Corresponding message queues and consumers, and sort them. For all theConsumer
The message queue is allocated locally and sorted to ensure that all messages are sortedConsumer
The order is consistent. - Lines 42 to 61: According to the queue allocation policy (
AllocateMessageQueueStrategy
) allocates message queues. See:AllocateMessageQueueStrategy. - Lines 63 to 72: Updates
Topic
Corresponding message queue.
- Lines 21 to 40: get
- Lines 3 to 19: Broadcast mode (
#updateProcessQueueTableInRebalance(...)
Note: Updates when allocating queuesTopic
Corresponding message queue and returns whether there has been a change.- Lines 93 to 126: Remove message queues that do not exist in allocation (
mqSet
) message processing queue (processQueueTable
).- Line 103: Remove unwanted message queues. Parsing the see: RebalancePushImpl# removeUnnecessaryMessageQueue (…). .
- Lines 108 to 120: queue pull timeout, i.e
Current time - Last pull message time > 120s
(120S configurable), the decision occursBUGThe message queue is removed when the message is not pulled for a long time. After removal, below# add queue logic #You can rejoin the new message queue.
- Line 128 to 158: Add the allocated message queue (
mqSet
) the new message queue.- Lines 132 to 135:
Order consumption
Related skip, detailed analysis see:RocketMQ source Code Analysis — Message ordering and consumption. - Line 137: Removes the message queue consumption progress.
- Line 139: Get the queue consumption progress. RebalancePushImpl#computePullFromWhere(…) .
- Line 140 to 156: Add new consumer processing queue, add consumer pull message request.
- Lines 132 to 135:
- Line 161: Initiates the new message queue message pull request. RebalancePushImpl#dispatchPullRequest(…) .
- Lines 93 to 126: Remove message queues that do not exist in allocation (
RebalanceImpl# removeUnnecessaryMessageQueue (…).
RebalancePushImpl# removeUnnecessaryMessageQueue (…).
1: public Boolean removeUnnecessaryMessageQueue (MessageQueue mq, ProcessQueue pq) {2: / / synchronization queue consumption progress, and remove it. 3: this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq); 4: this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); 5:6: / / TODO order consumption if (this. DefaultMQPushConsumerImpl. IsConsumeOrderly (7) : && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { 8: try { 9: if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) { 10: try { 11: return this.unlockDelay(mq, pq); 12: } finally { 13: pq.getLockConsume().unlock(); 14: } 15: } else { 16: log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", // 17: mq, // 18: pq.getTryUnlockTimes()); 19: 20: pq.incTryUnlockTimes(); 21: } 22: } catch (Exception e) { 23: log.error("removeUnnecessaryMessageQueue Exception", e); 24: } 25: 26: return false; 27: } 28: return true; 29:}Copy the code
- Remove unwanted message queue-related information and return whether the removal was successful.
- Lines 2 to 4: synchronize the consumption progress of the queue and remove it.
- Lines 5 to 27:
Order consumption
Related skip, detailed analysis see:RocketMQ source Code Analysis — Message ordering and consumption.
[PullConsumer]
RebalancePullImpl# removeUnnecessaryMessageQueue (…).
1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) { 2: this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq); 3: this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq); 4: return true; 5:}Copy the code
- Note: Remove unneeded message queue-related information and return removal success. and
RebalancePushImpl#removeUnnecessaryMessageQueue(...)
Basically the same.
RebalancePushImpl# dispatchPullRequest (…).
1: public void dispatchPullRequest(List<PullRequest> pullRequestList) { 2: for (PullRequest pullRequest : pullRequestList) { 3: this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); 4: log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); 6:5:}}Copy the code
- Description: Initiates a message pull request. The call is
PushConsumer
Constantly pulling at the starting point of the message.
DefaultMQPushConsumerImpl# executePullRequestImmediately (…).
1: public void executePullRequestImmediately(final PullRequest pullRequest) { 2: this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest); 3:}Copy the code
- Note: Submit a pull request. After submission,
PullMessageService
Asynchronous execution.non-blocking. See:PullMessageService.
AllocateMessageQueueStrategy
AllocateMessageQueueAveragely
1: public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy { 2: private final Logger log = ClientLogger.getLog(); 3: 4: @Override 5: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 6: A List < String > cidAll) {7: / / calibration parameters correctly 8: if (currentCID = = null | | currentCID. The length () < 1) {9: throw new IllegalArgumentException("currentCID is empty"); 10: } 11: if (mqAll == null || mqAll.isEmpty()) { 12: throw new IllegalArgumentException("mqAll is null or mqAll empty"); 13: } 14: if (cidAll == null || cidAll.isEmpty()) { 15: throw new IllegalArgumentException("cidAll is null or cidAll empty"); 16: } 17: 18: List<MessageQueue> result = new ArrayList<>(); 19: if (! cidAll.contains(currentCID)) { 20: log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", 21: consumerGroup, 22: currentCID, 23: cidAll); 24: return result; 27: int index = cidall.indexof (currentCID); // The number of consumers. 28: int mod = mqAll.size() % cidAll.size(); // The remainder, which is how many message queues are not evenly allocated. 29: int averageSize = 30: mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() 31: + 1 : mqAll.size() / cidAll.size()); 32: int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // If there is a remainder, [0, mod) splits the remainder, i.e. each consumer is allocated one more node; Int range = math.min (averageSize, mqall.size () -startIndex); 34: for (int I = 0; I < range; I ++) {35: for (int I = 0; I < range; I ++) result.add(mqAll.get((startIndex + i) % mqAll.size())); 36: } 37: return result; 38: } 39: 40: @Override 41: public String getName() { 42: return "AVG"; 43: } 44: }Copy the code
- Note: Evenly allocate queue policies.
- Lines 7 to 25: parameter verification.
- Lines 26 to 36: Evenly allocate message queues.
- 27:
index
: the currentConsumer
In the consumer cluster is the number. So here’s why you need a pair of incoming, okaycidAll
The reason arguments must be sorted. If I don’t sort it,Consumer
It was calculated locallyindex
Inconsistency affects the calculation results. - Line 28:
mod
: Remainder, which is how many message queues are not evenly allocated. - Lines 29 to 31:
averageSize
: Code can be simplified to(mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size())
.[ 0, mod )
:mqAll.size() / cidAll.size() + 1
. In front of themod
个Consumer
Divide the remainder and get 1 more message queue.[ mod, cidAll.size() )
:mqAll.size() / cidAll.size()
.
- Line 32:
startIndex
:Consumer
Allocates the start position of the message queue. - Line 33:
range
: Allocates the number of queues. The reason forMath#min(...)
The reason: whenmqAll.size() <= cidAll.size()
When, last fewConsumer
Message queue could not be allocated. - Lines 34 to 36: Generate the results of the allocation message queue.
- 27:
- Here’s an example:
Fixed message queue length is 4.
Consumer 2 It’s divisible by * | Consumer 3 Not divisible * | Consumer 5 Can’t all allocate * | |
---|---|---|---|
Message queue [0] | Consumer[0] | Consumer[0] | Consumer[0] |
Message queue [1] | Consumer[0] | Consumer[0] | Consumer[1] |
Message queue [2] | Consumer[1] | Consumer[1] | Consumer[2] |
Message queue [3] | Consumer[1] | Consumer[2] | Consumer[3] |
AllocateMessageQueueByMachineRoom
1: public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy { 2: /** 3: * Consumer consumption brokerName Set 4: */ 5: Private Set<String> BrokerIdCs; 6: 7: @Override 8: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 9: List<String> cidAll) {10: List<MessageQueue> result = new ArrayList<MessageQueue>(); 12: int currentIndex = cidAll.indexOf(currentCID); 13: if (currentIndex < 0) { 14: return result; 17: List<MessageQueue> premqAll = new ArrayList<MessageQueue>(); 18: for (MessageQueue mq : mqAll) { 19: String[] temp = mq.getBrokerName().split("@"); 20: if (temp.length == 2 && consumeridcs.contains(temp[0])) { 21: premqAll.add(mq); 16: int mod = premqall.size ()/cidall.size (); 26: int rem = premqAll.size() % cidAll.size(); 27: int startIndex = mod * currentIndex; 28: int endIndex = startIndex + mod; 29: for (int i = startIndex; i < endIndex; i++) { 30: result.add(mqAll.get(i)); 31: } 32: if (rem > currentIndex) { 33: result.add(premqAll.get(currentIndex + mod * cidAll.size())); 34: } 35: return result; 36: } 37: 38: @Override 39: public String getName() { 40: return "MACHINE_ROOM"; 41: } 42: 43: public Set<String> getConsumeridcs() { 44: return consumeridcs; 45: } 46: 47: public void setConsumeridcs(Set<String> consumeridcs) { 48: this.consumeridcs = consumeridcs; 49:50:}}Copy the code
- Description:On average,distributionconsumable
Broker
Corresponding message queue. - Lines 7 to 15: parameter verification.
- Lines 16 to 23: calculationsconsumable
Broker
Corresponding message queue. - Lines 25 to 34: Evenly allocate message queues. theThe average distributionWay and
AllocateMessageQueueAveragely
This is slightly different, with the extra closing portion allocated to the frontrem
个Consumer
. - Question: When using this allocation policy,
Consumer
和Broker
How to configure allocation. Study 😈 etc.master-slaveConsider the source code carefully.
AllocateMessageQueueAveragelyByCircle
1: public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy { 2: private final Logger log = ClientLogger.getLog(); 3: 4: @Override 5: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 6: A List < String > cidAll) {7: / / calibration parameters correctly 8: if (currentCID = = null | | currentCID. The length () < 1) {9: throw new IllegalArgumentException("currentCID is empty"); 10: } 11: if (mqAll == null || mqAll.isEmpty()) { 12: throw new IllegalArgumentException("mqAll is null or mqAll empty"); 13: } 14: if (cidAll == null || cidAll.isEmpty()) { 15: throw new IllegalArgumentException("cidAll is null or cidAll empty"); 16: } 17: 18: List<MessageQueue> result = new ArrayList<MessageQueue>(); 19: if (! cidAll.contains(currentCID)) { 20: log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", 21: consumerGroup, 22: currentCID, 23: cidAll); 24: return result; Int index = cidall.indexof (currentCID); 29: for (int i = index; i < mqAll.size(); i++) { 30: if (i % cidAll.size() == index) { 31: result.add(mqAll.get(i)); 32: } 33: } 34: return result; 35: } 36: 37: @Override 38: public String getName() { 39: return "AVG_BY_CIRCLE"; 40:41:}}Copy the code
- Circular allocation message queue.
AllocateMessageQueueByConfig
1: public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy { 2: private List<MessageQueue> messageQueueList; 3: 4: @Override 5: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 6: List<String> cidAll) { 7: return this.messageQueueList; 8: } 9: 10: @Override 11: public String getName() { 12: return "CONFIG"; 13: } 14: 15: public List<MessageQueue> getMessageQueueList() { 16: return messageQueueList; 17: } 18: 19: public void setMessageQueueList(List<MessageQueue> messageQueueList) { 20: this.messageQueueList = messageQueueList; 21, 22:}}Copy the code
- Description: Allocates configured message queues.
- Question:Usage scenarios of the allocation policy.
PushConsumer Reads the consumption progress
RebalancePushImpl# computePullFromWhere (…).
1: public long computePullFromWhere(MessageQueue mq) { 2: long result = -1; 3: final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); 4: final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); 5: switch (consumeFromWhere) {6: case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: // Discard 7: Case CONSUME_FROM_MIN_OFFSET: // Discarded 8: case CONSUME_FROM_MAX_OFFSET: // discarded 9: case CONSUME_FROM_LAST_OFFSET: {10: case CONSUME_FROM_LAST_OFFSET: {10: long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); 11: if (lastOffset >= 0) { 12: result = lastOffset; 13: } 14: // First start,no offset 15: else if (-1 == lastOffset) { 16: if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 17: result = 0L; 18: } else { 19: try { 20: result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); 21: } catch (MQClientException e) { 22: result = -1; 23: } 24: } 25: } else { 26: result = -1; 27: } 28: break; 29: } 30: case CONSUME_FROM_FIRST_OFFSET: { 31: long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); 32: if (lastOffset >= 0) { 33: result = lastOffset; 34: } else if (-1 == lastOffset) { 35: result = 0L; 36: } else { 37: result = -1; 38: } 39: break; 40: } 41: case CONSUME_FROM_TIMESTAMP: { 42: long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); 43: if (lastOffset >= 0) { 44: result = lastOffset; 45: } else if (-1 == lastOffset) { 46: if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 47: try { 48: result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); 49: } catch (MQClientException e) { 50: result = -1; 51: } 52: } else { 53: try { 54: long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), 55: UtilAll.YYYY_MMDD_HHMMSS).getTime(); 56: result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); 57: } catch (MQClientException e) { 58: result = -1; 59: } 60: } 61: } else { 62: result = -1; 63: } 64: break; 65: } 66: 67: default: 68: break; 69: } 70: 71: return result; 72:}Copy the code
- Note: Calculates where message queue consumption begins.
PushConsumer
There are three options for reading the consumption progress:CONSUME_FROM_LAST_OFFSET
: Lines 6-29: A new consumer cluster starts for the first time fromThe last position of the queueStart spending.And then you start again and you start spending where you started last time.CONSUME_FROM_FIRST_OFFSET
: Lines 30 to 40: A new consumer cluster is first started from the queueThe most former positionStart spending.And then you start again and you start spending where you started last time.CONSUME_FROM_TIMESTAMP
: Lines 41 to 65: A new consumer cluster is first started fromSpecified point in timeStart spending.And then you start again and you start spending where you started last time.
[PullConsumer]
RebalancePullImpl# computePullFromWhere (…).
Skip it for now. 😈
PushConsumer pulls the message
PullMessageService
1: public class PullMessageService extends ServiceThread { 2: private final Logger log = ClientLogger.getLog(); 3: /** 4: * pull message request queue 5: */ 6: private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>(); 7: /** 8: * MQClient object 9: */ 10: private final MQClientInstance mQClientFactory; 11: /** 12: * timer. Delay submitting pull requests 13: */ 14: Private Final ScheduledExecutorService ScheduledExecutorService = Executors 15: .newSingleThreadScheduledExecutor(new ThreadFactory() { 16: @Override 17: public Thread newThread(Runnable r) { 18: return new Thread(r, "PullMessageServiceScheduledThread"); 19:20:}}); 21: 22: public PullMessageService(MQClientInstance mQClientFactory) { 23: this.mQClientFactory = mQClientFactory; 24:} 25: 26: /** 27: * Execute delay pull message request 28: * 29: * @param pullRequest pull message request 30: * @param timeDelay Duration 31: */ 32: public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { 33: this.scheduledExecutorService.schedule(new Runnable() { 34: 35: @Override 36: public void run() { 37: PullMessageService.this.executePullRequestImmediately(pullRequest); 38: } 39: }, timeDelay, TimeUnit.MILLISECONDS); } 41:42: /** 43: * execute immediate pullRequest 44: * 45: * @param pullRequest pullRequest 46: */ 47: public void executePullRequestImmediately(final PullRequest pullRequest) { 48: try { 49: this.pullRequestQueue.put(pullRequest); 50: } catch (InterruptedException e) { 51: log.error("executePullRequestImmediately pullRequestQueue.put", e); 52:} 53:} 54:55: /** 56: * Executing a delayed task 57: * 58: * @param r Task 59: * @param timeDelay Duration 60: */ 61: public void executeTaskLater(final Runnable r, final long timeDelay) { 62: this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS); 63: } 64: 65: public ScheduledExecutorService getScheduledExecutorService() { 66: return scheduledExecutorService; } 69:69: /** 70: * pullRequest 71: * 72: * @param pullRequest 73: */ 74: private void pullMessage(final PullRequest pullRequest) { 75: final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); 76: if (consumer ! = null) { 77: DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; 78: impl.pullMessage(pullRequest); 79: } else { 80: log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); 81: } 82: } 83: 84: @Override 85: public void run() { 86: log.info(this.getServiceName() + " service started"); 87: 88: while (! this.isStopped()) { 89: try { 90: PullRequest pullRequest = this.pullRequestQueue.take(); 91: if (pullRequest ! = null) { 92: this.pullMessage(pullRequest); 93: } 94: } catch (InterruptedException e) { 95: } catch (Exception e) { 96: log.error("Pull Message Service Run Method exception", e); 97: } 98: } 99: 100: log.info(this.getServiceName() + " service end"); 101: } 102: 103: @Override 104: public String getServiceName() { 105: return PullMessageService.class.getSimpleName(); 106:} 107: 108:}Copy the code
- Pull message service, constantly from constantly
Broker
Pull the message and submit the consuming task toConsumeMessageService
. #executePullRequestLater(...)
: Lines 26 to 40: SubmitdelayPull the message request.#executePullRequestImmediately(...)
: Lines 42 to 53: SubmitimmediatelyPull the message request.#executeTaskLater(...)
: Lines 55 to 63: SubmitDelayed tasks.#pullMessage(...)
: Lines 69 through 82 perform pull message logic. See:DefaultMQPushConsumerImpl# pullMessage (…)..#run(...)
: Lines 84 through 101: loop pull message request queue (pullRequestQueue
) for message pull.
DefaultMQPushConsumerImpl# pullMessage (…).
1: public void pullMessage(final PullRequest pullRequest) {
2: final ProcessQueue processQueue = pullRequest.getProcessQueue();
3: if (processQueue.isDropped()) {
4: log.info("the pull request[{}] is dropped.", pullRequest.toString());
5: return;
6: }
7:
8: // 设置队列最后拉取消息时间
9: pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
10:
11: // 判断consumer状态是否运行中。如果不是,则延迟拉取消息。
12: try {
13: this.makeSureStateOK();
14: } catch (MQClientException e) {
15: log.warn("pullMessage exception, consumer state not ok", e);
16: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
17: return;
18: }
19:
20: // 判断是否暂停中。
21: if (this.isPause()) {
22: log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
23: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
24: return;
25: }
26:
27: // 判断是否超过最大持有消息数量。默认最大值为1000。
28: long size = processQueue.getMsgCount().get();
29: if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
30: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); // 提交延迟消息拉取请求。50ms。
31: if ((flowControlTimes1++ % 1000) == 0) {
32: log.warn(
33: "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
34: processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
35: }
36: return;
37: }
38:
39: if (!this.consumeOrderly) { // 判断消息跨度是否过大。
40: if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
41: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); // 提交延迟消息拉取请求。50ms。
42: if ((flowControlTimes2++ % 1000) == 0) {
43: log.warn(
44: "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
45: processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
46: pullRequest, flowControlTimes2);
47: }
48: return;
49: }
50: } else { // TODO 顺序消费
51: if (processQueue.isLocked()) {
52: if (!pullRequest.isLockedFirst()) {
53: final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
54: boolean brokerBusy = offset < pullRequest.getNextOffset();
55: log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
56: pullRequest, offset, brokerBusy);
57: if (brokerBusy) {
58: log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
59: pullRequest, offset);
60: }
61:
62: pullRequest.setLockedFirst(true);
63: pullRequest.setNextOffset(offset);
64: }
65: } else {
66: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
67: log.info("pull message later because not locked in broker, {}", pullRequest);
68: return;
69: }
70: }
71:
72: // 获取Topic 对应的订阅信息。若不存在,则延迟拉取消息
73: final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
74: if (null == subscriptionData) {
75: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
76: log.warn("find the consumer's subscription failed, {}", pullRequest);
77: return;
78: }
79:
80: final long beginTimestamp = System.currentTimeMillis();
81:
82: PullCallback pullCallback = new PullCallback() {
83: @Override
84: public void onSuccess(PullResult pullResult) {
85: if (pullResult != null) {
86: pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
87: subscriptionData);
88:
89: switch (pullResult.getPullStatus()) {
90: case FOUND:
91: // 设置下次拉取消息队列位置
92: long prevRequestOffset = pullRequest.getNextOffset();
93: pullRequest.setNextOffset(pullResult.getNextBeginOffset());
94:
95: // 统计
96: long pullRT = System.currentTimeMillis() - beginTimestamp;
97: DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
98: pullRequest.getMessageQueue().getTopic(), pullRT);
99:
100: long firstMsgOffset = Long.MAX_VALUE;
101: if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
102: DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
103: } else {
104: firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
105:
106: // 统计
107: DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
108: pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
109:
110: // 提交拉取到的消息到消息处理队列
111: boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
112:
113: // 提交消费请求
114: DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
115: pullResult.getMsgFoundList(), //
116: processQueue, //
117: pullRequest.getMessageQueue(), //
118: dispathToConsume);
119:
120: // 提交下次拉取消息请求
121: if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
122: DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
123: DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
124: } else {
125: DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
126: }
127: }
128:
129: // 下次拉取消息队列位置小于上次拉取消息队列位置 或者 第一条消息的消息队列位置小于上次拉取消息队列位置,则判定为BUG,输出log
130: if (pullResult.getNextBeginOffset() < prevRequestOffset//
131: || firstMsgOffset < prevRequestOffset) {
132: log.warn(
133: "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
134: pullResult.getNextBeginOffset(), //
135: firstMsgOffset, //
136: prevRequestOffset);
137: }
138:
139: break;
140: case NO_NEW_MSG:
141: // 设置下次拉取消息队列位置
142: pullRequest.setNextOffset(pullResult.getNextBeginOffset());
143:
144: // 持久化消费进度
145: DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
146:
147: // 立即提交拉取消息请求
148: DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
149: break;
150: case NO_MATCHED_MSG:
151: // 设置下次拉取消息队列位置
152: pullRequest.setNextOffset(pullResult.getNextBeginOffset());
153:
154: // 持久化消费进度
155: DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
156:
157: // 提交立即拉取消息请求
158: DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
159: break;
160: case OFFSET_ILLEGAL:
161: log.warn("the pull request offset illegal, {} {}", //
162: pullRequest.toString(), pullResult.toString());
163: // 设置下次拉取消息队列位置
164: pullRequest.setNextOffset(pullResult.getNextBeginOffset());
165:
166: // 设置消息处理队列为dropped
167: pullRequest.getProcessQueue().setDropped(true);
168:
169: // 提交延迟任务,进行消费处理队列移除。不立即移除的原因:可能有地方正在使用,避免受到影响。
170: DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
171:
172: @Override
173: public void run() {
174: try {
175: // 更新消费进度,同步消费进度到Broker
176: DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
177: pullRequest.getNextOffset(), false);
178: DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
179:
180: // 移除消费处理队列
181: DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
182:
183: log.warn("fix the pull request offset, {}", pullRequest);
184: } catch (Throwable e) {
185: log.error("executeTaskLater Exception", e);
186: }
187: }
188: }, 10000);
189: break;
190: default:
191: break;
192: }
193: }
194: }
195:
196: @Override
197: public void onException(Throwable e) {
198: if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
199: log.warn("execute the pull request exception", e);
200: }
201:
202: // 提交延迟拉取消息请求
203: DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
204: }
205: };
206:
207: // 集群消息模型下,计算提交的消费进度。
208: boolean commitOffsetEnable = false;
209: long commitOffsetValue = 0L;
210: if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
211: commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
212: if (commitOffsetValue > 0) {
213: commitOffsetEnable = true;
214: }
215: }
216:
217: // 计算请求的 订阅表达式 和 是否进行filtersrv过滤消息
218: String subExpression = null;
219: boolean classFilter = false;
220: SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
221: if (sd != null) {
222: if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
223: subExpression = sd.getSubString();
224: }
225:
226: classFilter = sd.isClassFilterMode();
227: }
228:
229: // 计算拉取消息系统标识
230: int sysFlag = PullSysFlag.buildSysFlag(//
231: commitOffsetEnable, // commitOffset
232: true, // suspend
233: subExpression != null, // subscription
234: classFilter // class filter
235: );
236:
237: // 执行拉取。如果拉取请求发生异常时,提交延迟拉取消息请求。
238: try {
239: this.pullAPIWrapper.pullKernelImpl(//
240: pullRequest.getMessageQueue(), // 1
241: subExpression, // 2
242: subscriptionData.getSubVersion(), // 3
243: pullRequest.getNextOffset(), // 4
244: this.defaultMQPushConsumer.getPullBatchSize(), // 5
245: sysFlag, // 6
246: commitOffsetValue, // 7
247: BROKER_SUSPEND_MAX_TIME_MILLIS, // 8
248: CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9
249: CommunicationMode.ASYNC, // 10
250: pullCallback// 11
251: );
252: } catch (Exception e) {
253: log.error("pullKernelImpl exception", e);
254: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
255: }
256: }
257:
258: private void correctTagsOffset(final PullRequest pullRequest) {
259: if (0L == pullRequest.getProcessQueue().getMsgCount().get()) {
260: this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true);
261: }
262: }
Copy the code
#pullMessage(...)
Description: Pull message.- Lines 3 to 6: The message processing queue has terminated and no message pull is performed.
- Line 9: Sets the time when the message processing queue finally pulls the message.
- Lines 11 to 18:
Consumer
Not in the running state, no message pull, commitdelayPull the message request. - Lines 20 to 25:
Consumer
Pause, no message pull, commitdelayPull the message request. - Lines 27 to 37: The message processing queue holds more messages than the maximum allowed value (default: 1000), does not pull messages, and submits delayed pull message requests.
- Lines 39 to 49:
Consumer
为Concurrent consumptionAnd the message queue holds the message span is too large (message span = the difference between the position of the last message and the first message, default: 2000), does not carry out the message pull, submitdelayPull the message request. - Lines 50 to 70:
Order consumption
Related skip, detailed analysis see:RocketMQ source Code Analysis — Message ordering and consumption. - Lines 72 to 78:
Topic
The corresponding subscription information does not exist, no message pull, commitdelayPull the message request. - Lines 222 through 224: Determine whether the request is used
Consumer
localSubscription information (SubscriptionData
) instead of usingBroker
Subscription information in. See:PullMessageProcessor# the processRequest (…). Lines 64 through 110. - Line 226: Whether to enable filtering class filtering mode. See RocketMQ source Code Analysis – Filtersrv for details.
- Lines 229 through 235: Calculate the pull message request system identity. Parsing the see: PullMessageRequestHeader sysFlag.
- Lines 237 to 255:
- Perform the message pull asynchronous request. PullAPIWrapper#pullKernelImpl(…) .
- Commit when the originating request raises an exceptiondelayPull the message request. The corresponding
Broker
See the logic for handling pull messages:PullMessageProcessor# the processRequest (…)..
PullCallback
: Pull message callback:- Line 86: Process the pull result. PullAPIWrapper#processPullResult(…) .
- Lines 89 to 192: Processing pull state results:
- Lines 90 to 139: Pull message (
FOUND
) :- Line 91 through 93: Sets the next pull message queue location.
- Lines 95 to 97: statistics.
- Lines 101 through 102: The list of messages pulled to messages is empty, submit an immediate pull message request. Why is there a pull message, but the message result is not empty? See: PullAPIWrapper#processPullResult(…) .
- Lines 106 to 108: statistics.
- Line 111: Commits the pulled message to the message processing queue. ProcessQueue#putMessage(…) .
- Lines 113 through 118: Submit the consumption request to
ConsumeMessageService
. See:ConsumeMessageConcurrentlyService. - Lines 120 to 126: Depending on the pull frequency (
pullInterval
Submitted),Immediate or delayedPull the message request. Default pull frequency is 0ms, commitimmediatelyPull the message request. - Lines 129 to 137: the position of the next pull message queue is smaller than that of the last pull message queue or the position of the first message queue is smaller than that of the last pull message queueBUGTo output warning logs.
- Line 140 to 149: No new message (
NO_NEW_MSG
) :
- Line 140 to 149: No new message (
- Line 142: Sets the next pull message queue location.
- Line 145: Correct consumption schedule. See:
#correctTagsOffset(...)
. - Line 148: SubmitimmediatelyPull the message request.
- Lines 150-159: New message but no match (
NO_MATCHED_MSG
). Logic withNO_NEW_MSG
。 - Line 160 to 189: Message queue position of pull request is invalid (
OFFSET_ILLEGAL
).
- Lines 150-159: New message but no match (
- Line 164: Sets the position of the next pull message queue.
- Line 167: Sets the message processing queue to
dropped
. - Lines 169 to 188: Submit the deferred task for queue removal.
- Line 175 to 178: Update the consumption progress to synchronize the consumption progress to
Broker
. - Line 181: Removes the consumer processing queue.
- Question: Why not immediately remove??
- Lines 196 to 204: An exception has occurred, submitting a delayed pull message request.
- Question: Why not immediately remove??
- Line 175 to 178: Update the consumption progress to synchronize the consumption progress to
- Lines 90 to 139: Pull message (
#correctTagsOffset(...)
: Correct consumption progress.- Lines 258-261: When the number of messages held in the consumption processing queue is 0, the update consumption progress is the pull message queue position of the pull request.
PullAPIWrapper# pullKernelImpl (…).
1: /** 2: * pull message core method 3: * 4: * @param MQ message queue 5: * @param subExpression subscription expression 6: * @param subVersion subscription version 7: * @param offset Start position of the pull queue 8: * @param maxNums Number of pull messages 9: * @param sysFlag System ID of the pull request 10: * @param commitOffset Submission consumption progress 11: * @ param brokerSuspendMaxTimeMillis broker suspend request maximum time 12: * @ param timeoutMillis request broker timeout value 13: * @param communicationMode 14: * @param pullCallback 15: * @return pull message result. The result is returned only when the communication mode is synchronous. Otherwise, null is returned. 16: * @throws MQClientException When no broker is found or other client exceptions occur. 17: * @throws RemotingException When an exception occurs on the remote call 18: * @throws MQBrokerException When an exception occurs to the broker. This exception occurs only when the communication mode is synchronous. 20: */ 21: protected PullResult pullKernelImpl(22: final MessageQueue mq, 23: final String subExpression, 24: final long subVersion, 25: final long offset, 26: final int maxNums, 27: final int sysFlag, 28: final long commitOffset, 29: final long brokerSuspendMaxTimeMillis, 30: final long timeoutMillis, 31: final CommunicationMode communicationMode, 32: final PullCallback pullCallback 33: Throws MQClientException, RemotingException, MQBrokerException, InterruptedException {34: // Get Broker information 35: FindBrokerResult findBrokerResult = 36: this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), 37: this.recalculatePullFromWhichNode(mq), false); 38: if (null == findBrokerResult) { 39: this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); 40: findBrokerResult = 41: this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), 42: this.recalculatePullFromWhichNode(mq), false); 43:} 44:45: // Request pull message 46: If (findBrokerResult! = null) { 47: int sysFlagInner = sysFlag; 48: 49: if (findBrokerResult.isSlave()) { 50: sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner); 51: } 52: 53: PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); 54: requestHeader.setConsumerGroup(this.consumerGroup); 55: requestHeader.setTopic(mq.getTopic()); 56: requestHeader.setQueueId(mq.getQueueId()); 57: requestHeader.setQueueOffset(offset); 58: requestHeader.setMaxMsgNums(maxNums); 59: requestHeader.setSysFlag(sysFlagInner); 60: requestHeader.setCommitOffset(commitOffset); 61: requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); 62: requestHeader.setSubscription(subExpression); 63: requestHeader.setSubVersion(subVersion); 64: 65: String brokerAddr = findBrokerResult.getBrokerAddr(); 66: if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { // TODO filtersrv 67: brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); 68: } 69: 70: PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( 71: brokerAddr, 72: requestHeader, 73: timeoutMillis, 74: communicationMode, 75: pullCallback); 76: 77: return pullResult; } 79:80: // Broker info does not exist throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); 82:}Copy the code
- Pull message core method. This method has a lot of parameters, see the code comment for each parameter description 😈.
- Lines 34 to 43: get
Broker
Information (Broker
Address, whether it is a slave node).- # recalculatePullFromWhichNode (…).
- # MQClientInstance# findBrokerAddressInSubscribe (…).
- Lines 45 to 78: Request a pull message.
- Line 81: when
Broker
If the information does not exist, an exception is thrown.
PullAPIWrapper# recalculatePullFromWhichNode (…).
1: /** 2: * Mapping between message queues and pull brokers 3: * When a message is pulled, the mapping is used to obtain the Broker corresponding to the pull request 4: */ 5: private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable = 6: new ConcurrentHashMap<MessageQueue, AtomicLong>(32); 7: /** 8: * Whether to use the default Broker 9: */ 10: private Volatile Boolean connectBrokerByUser = false; 11: /** 12: * Default Broker number 13: */ 14: Private Volatile Long defaultBrokerId = Mixall.master_id; 15:16: /** 17: * Calculate message queue pull message Broker number 18: * 19: * @param MQ message queue 20: * @return Broker number 21: */ 22: Public long recalculatePullFromWhichNode (final MessageQueue mq) {if 23: / / open the default Broker switch, it returns the default Broker number 24: if (this.isConnectBrokerByUser()) { 25: return this.defaultBrokerId; 26:27} : 28: / / if the message queue mapping pull the Broker, the Broker return mapping number 29: AtomicLong suggest = this. PullFromWhichNodeTable. Get (mq); 30: if (suggest ! = null) { 31: return suggest.get(); 32:} 33: 34: // Return mixall.master_id; 36:}Copy the code
- Calculate message queue to pull the corresponding message
Broker
Serial number.
MQClientInstance# findBrokerAddressInSubscribe (…).
1: /** 2: * Broker name and address Map 3: */ 4: private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable = 5: new ConcurrentHashMap<>(); 6: 7: /** 8: * Obtain Broker information 9: * 10: * @Param brokerName Broker name 11: * @Param brokerId Broker Number 12: * @param onlyThisBroker must be the broker 13: * @return Broker info 14: */ 15: public FindBrokerResult findBrokerAddressInSubscribe(// 16: final String brokerName, // 17: final long brokerId, // 18: final boolean onlyThisBroker// 19: ) { 20: String brokerAddr = null; // Broker address 21: Boolean slave = false; 22: Boolean found = false; 23: 24: // Broker info 25: HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName); 26: if (map ! = null && ! map.isEmpty()) { 27: brokerAddr = map.get(brokerId); 28: slave = brokerId ! = MixAll.MASTER_ID; 29: found = brokerAddr ! = null; 30: 31: // Select a Broker 32: if (! found && ! onlyThisBroker) { 33: Entry<Long, String> entry = map.entrySet().iterator().next(); 34: brokerAddr = entry.getValue(); 35: slave = entry.getKey() ! = MixAll.MASTER_ID; 36: found = true; 37:} 38:} 39: 40: // If (found) {42: return new FindBrokerResult(brokerAddr, slave); 46: return null;} 44: 45: // Return null; 47:}Copy the code
- Description: Obtain
Broker
Information (Broker
Address, whether it is a slave node).
PullAPIWrapper# processPullResult (…).
1: /** 2: * Process pull result 3: * 1. Update message queue pull mapping of message Broker number 4: * 2. Parse the message and match the appropriate message according to the subscription message tagCode 5: * 6: * @param mq message queue 7: * @param pullResult pullResult 8: * @param subscriptionData subscription 9: * @return PullResult 10: */ 11: public PullResult processPullResult(final MessageQueue MQ, final PullResult PullResult, 12: final SubscriptionData subscriptionData) { 13: PullResultExt pullResultExt = (PullResultExt) pullResult; 14:15: / / update message queue pull the message Broker number mapping 16: enclosing updatePullFromWhichNode (mq, pullResultExt getSuggestWhichBrokerId ()); 17: 18: // Parse the message and match the appropriate message according to the subscription message tagCode 19: if (pullstatus.found == pullresult.getpullStatus ()) {20: // Parse the message 21: ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary()); 22: List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer); 25: List<MessageExt> msgListFilterAgain = msgList; 26: if (! subscriptionData.getTagsSet().isEmpty() && ! subscriptionData.isClassFilterMode()) { 27: msgListFilterAgain = new ArrayList<>(msgList.size()); 28: for (MessageExt msg : msgList) { 29: if (msg.getTags() ! = null) { 30: if (subscriptionData.getTagsSet().contains(msg.getTags())) { 31: msgListFilterAgain.add(msg); 32: } 33: } 34: } 35: } 36: 37: // Hook 38: if (this.hasHook()) { 39: FilterMessageContext filterMessageContext = new FilterMessageContext(); 40: filterMessageContext.setUnitMode(unitMode); 41: filterMessageContext.setMsgList(msgListFilterAgain); 42: this.executeHook(filterMessageContext); 46: for (MessageExt MSG: msgListFilterAgain) {47: for (MessageExt MSG: msgListFilterAgain) MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET, 48: Long.toString(pullResult.getMinOffset())); 49: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET, 50: Long.toString(pullResult.getMaxOffset())); 51:} 52:53: / / set the message list: 54 pullResultExt. SetMsgFoundList (msgListFilterAgain); 55:} 56:57:58: / / empty message binary array pullResultExt. SetMessageBinary (null); 59: 60: return pullResult; 61:}Copy the code
- Handling pull results.
- Update message queue pull messages
Broker
Mapping of numbers. - The message is parsed and the message is based on the subscription information
tagCode
Match the appropriate message.
- Update message queue pull messages
- Line 16: Update message queue pull message
Broker
Mapping of numbers. The next time a message is pulled, if the default pull is not setBroker
The updated number will be usedBroker
Serial number. - Lines 18 to 55: Parses the message and messages based on subscription information
tagCode
Match the appropriate message.- Lines 20 to 22: Parse the message. See RocketMQ source Code Analysis – Message Basics for details.
- Lines 24 to 35: Based on subscription information
tagCode
Match the message. - Lines 37 to 43:
Hook
. - Line 45 through 51: Sets the current minimum/maximum position of the message queue to the message extension field.
- Line 54: Sets up the message queue.
- Line 58: Empties the message binary array.
ProcessQueue# putMessage (…).
1: /** 2: * Message mapping read/write lock 3: */ 4: private Final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); 5: /** 6: * message mapping 7: * key: message queue location 8: */ 9: private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>(); 10: /** 11: * Message count 12: */ 13: private Final AtomicLong msgCount = new AtomicLong(); 16: */ 17: private volatile long queueOffsetMax = 0L; 14: /** 15: * Add the maximum queue position 16: */ 17: private volatile long queueOffsetMax = 0L; 18: /** 19: * Whether consuming 20: */ 21: private volatile Boolean Consuming = false; QueueMaxOffset - Added message array [n-1]. QueueOffset 25: * Acc = Accumulation 26: * CNT = (guess) Contrast 27: */ 28: private volatile long msgAccCnt = 0; 29: 30: /** 31: * Adds the message and returns whether to submit to the consumer 32: * returns true, when a new message has been added successfully, 33: * 34: * @param MSGS message 35: * @return Whether to submit to the consumer 36: */ 37: public boolean putMessage(final List<MessageExt> msgs) { 38: boolean dispatchToConsume = false; 39: try { 40: this.lockTreeMap.writeLock().lockInterruptibly(); 41: try {42: // add message 43: int validMsgCnt = 0; 44: for (MessageExt msg : msgs) { 45: MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg); 46: if (null == old) { 47: validMsgCnt++; 48: this.queueOffsetMax = msg.getQueueOffset(); 49: } 50: } 51: msgCount.addAndGet(validMsgCnt); 52:53: // Calculate if you are spending 54: if (! msgTreeMap.isEmpty() && ! this.consuming) { 55: dispatchToConsume = true; 56: this.consuming = true; // If (! msgs.isEmpty()) { 61: MessageExt messageExt = msgs.get(msgs.size() - 1); 62: String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET); 63: if (property ! = null) { 64: long accTotal = Long.parseLong(property) - messageExt.getQueueOffset(); 65: if (accTotal > 0) { 66: this.msgAccCnt = accTotal; 67: } 68: } 69: } 70: } finally { 71: this.lockTreeMap.writeLock().unlock(); 72: } 73: } catch (InterruptedException e) { 74: log.error("putMessage exception", e); 75: } 76: 77: return dispatchToConsume; 78:}Copy the code
conclusion
The simplest way to describe the PullConsumer pulling a message would be this:
While (true) {if (pull message not satisfied) {thread.sleep (interval); continue; } active pull message (); }Copy the code
PushConsumer message
ConsumeMessageConcurrentlyService submitted a consumer request
ConsumeMessageConcurrentlyService# submitConsumeRequest (…).
3: */ 4: Private final BlockingQueue<Runnable> consumeRequestQueue; 5: /** 6: * consumption thread pool 7: */ 8: private final ThreadPoolExecutor consumeExecutor; 9: 10: public void submitConsumeRequest(// 11: final List<MessageExt> msgs, // 12: final ProcessQueue processQueue, // 13: final MessageQueue messageQueue, // 14: final boolean dispatchToConsume) { 15: final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); If (msgs.size() <= consumeBatchSize) {// If (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); 18: try { 19: this.consumeExecutor.submit(consumeRequest); 20: } catch (RejectedExecutionException e) { 21: this.submitConsumeRequestLater(consumeRequest); 24: for (int total = 0; total < msgs.size(); ) {25: // Calculate the message contained in the current split request 26: List<MessageExt> msgThis = new ArrayList<>(consumeBatchSize); 27: for (int i = 0; i < consumeBatchSize; i++, total++) { 28: if (total < msgs.size()) { 29: msgThis.add(msgs.get(total)); 30: } else { 31: break; 32:} 33:} 34: 35: // Submit split consumption request ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); 37: try { 38: this.consumeExecutor.submit(consumeRequest); 39:} the catch (RejectedExecutionException e) {40: / / if rejected, then the current split message + remaining requests submitted to delay consumption. 41: for (; total < msgs.size(); total++) { 42: msgThis.add(msgs.get(total)); 43: } 44: this.submitConsumeRequestLater(consumeRequest); 45:} 46:} 47:} 48:}Copy the code
- Note: Submit an immediate consumption request.
- Lines 16 to 22: Submit the consumption request directly if the commit message is less than or equal to the batch consumption number.
- Lines 23 to 47: When the commit message is larger than the batch consumption, split into multiple requests.
- Lines 25 through 33: Calculates the messages contained in the current split request.
- Lines 35 through 38: Submit the split consumption request.
- Lines 39 to 44: If the commit request is rejected, the current split message + the remaining message is submitted to delay the consumption request, ending the split loop.
ConsumeMessageConcurrentlyService#submitConsumeRequestLater
1: /** 2: * Submit delayed consumption request 3: * 4: * @param MSGS message list 5: * @Param processQueue 6: * @Param messageQueue 7: */ 8: private void submitConsumeRequestLater(// 9: final List<MessageExt> msgs, // 10: final ProcessQueue processQueue, // 11: final MessageQueue messageQueue// 12: ) { 13: 14: this.scheduledExecutorService.schedule(new Runnable() { 15: 16: @Override 17: public void run() { 18: ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true); 19: } 20: }, 5000, TimeUnit.MILLISECONDS); 21:} 22: 23: /** 24: * Submit delay request 25: * @param consumeRequest 26: */ 27: private void submitConsumeRequestLater(final ConsumeRequest consumeRequest// 28: ) { 29: 30: this.scheduledExecutorService.schedule(new Runnable() { 31: 32: @Override 33: public void run() { 34: ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest); // TODO BUG ? 35: } 36: }, 5000, TimeUnit.MILLISECONDS); 37:}Copy the code
- Note: Submit a deferred consumption request.
- Line 34: Direct call
ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
. If the number of messages exceeds the bulk consumption limit, will it beBUG.
ConsumeRequest
1: class ConsumeRequest implements Runnable {2: 3: /** 4: * private final List<MessageExt> MSGS; 7: /** 8: * ProcessQueue 9: */ 10: private Final ProcessQueue ProcessQueue; 11: /** 12: * queue 13: */ 14: private final MessageQueue MessageQueue 15: 16: public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) { 17: this.msgs = msgs; 18: this.processQueue = processQueue; 19: this.messageQueue = messageQueue; 20:} 21, 22, 23 @ Override: public void the run () {24: / / queue to consume 25: don't waste the if (this. ProcessQueue. IsDropped ()) {26: log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); 27: return; 28: } 29: 30: MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; / / listener: 31 ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext (messageQueue); / / consumer Context: 32 ConsumeConcurrentlyStatus status = null; // Hook 35: ConsumeMessageContext ConsumeMessageContext = null; 36: if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { 37: consumeMessageContext = new ConsumeMessageContext(); 38: consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup()); 39: consumeMessageContext.setProps(new HashMap<String, String>()); 40: consumeMessageContext.setMq(messageQueue); 41: consumeMessageContext.setMsgList(msgs); 42: consumeMessageContext.setSuccess(false); 43: ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); 44: } 45: 46: long beginTimestamp = System.currentTimeMillis(); 47: boolean hasException = false; 48: ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; / / consumer return the result type 49: try {50: / / when messages to retry, set the Topic to the original Topic: 51 ConsumeMessageConcurrentlyService. Enclosing resetRetryTopic (MSGS); 52:53: // Set the start consumption time 54: if (MSGS! = null && ! msgs.isEmpty()) { 55: for (MessageExt msg : msgs) { 56: MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); 59 57:58:}} : 60: / / to spend 61: status = listener. ConsumeMessage (Collections. UnmodifiableList (MSGS), the context). 62: } catch (Throwable e) { 63: log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", 64: RemotingHelper.exceptionSimpleDesc(e), // 65: ConsumeMessageConcurrentlyService.this.consumerGroup, 66: msgs, 67: messageQueue); 68: hasException = true; 70: long consumeRT = system.currentTimemillis () -beginTimestamp; 73: if (null == status) { 74: if (hasException) { 75: returnType = ConsumeReturnType.EXCEPTION; 76: } else { 77: returnType = ConsumeReturnType.RETURNNULL; 78: } 79: } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { 80: returnType = ConsumeReturnType.TIME_OUT; 81: } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { 82: returnType = ConsumeReturnType.FAILED; 83: } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { 84: returnType = ConsumeReturnType.SUCCESS; 85: } 86: 87: // Hook 88: if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { 89: consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); 93: if (null == status) {94: log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", 95: ConsumeMessageConcurrentlyService.this.consumerGroup, 96: msgs, 97: messageQueue); 98: status = ConsumeConcurrentlyStatus.RECONSUME_LATER; 99: } 100: 101: // Hook 102: if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { 103: consumeMessageContext.setStatus(status.toString()); 104: consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status); 105: ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); : 107-108:106} / / statistics: 109 ConsumeMessageConcurrentlyService. Enclosing getConsumerStatsManager (110) : .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); 113: if (! processQueue.isDropped()) { 114: ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); 115: } else { 116: log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); 117:} 118:} 119: 120:}Copy the code
- Description: Consumption request. Submit request execution consumption.
- Lines 24 through 28: Discard processing queue without consumption.
- Lines 34 to 44: hooks.
- Line 51: When the message is a retry message, set
Topic
As the originalTopic
. For example: primitiveTopic
为TopicTest
Try again, whenTopic
为%RETRY%please_rename_unique_group_name_4
After this method,Topic
Set backTopicTest
. - Lines 53 through 58: Set the start consumption time.
- Line 61: Make a purchase.
- Lines 71 through 85: Parse the consumption return result type
- Lines 87 to 90:
Hook
. - Lines 92 through 99: When the consumption result state is not empty, set the consumption result state to consume later.
- Lines 101 to 106:
Hook
. - Lines 108 to 110: statistics.
- Lines 112-117: Processing consumption results. If the consumption processing queue is removed and the message is consumed, it may result in repeated consumption of the message, so message consumption is idempotent to the maximum extent possible. Parsing the see: ConsumeMessageConcurrentlyService# processConsumeResult (…). .
ConsumeMessageConcurrentlyService# processConsumeResult (…).
1: public void processConsumeResult(// 2: final ConsumeConcurrentlyStatus status, // 3: final ConsumeConcurrentlyContext context, // 4: final ConsumeRequest consumeRequest// 5: ) { 6: int ackIndex = context.getAckIndex(); If (consumerEquest.getMsgs ().isEmpty())) 10: return; ConsumeRequest. MSGS [0] to consumeRequest. MSGS [ackIndex] 13: switch (status) {14: case CONSUME_SUCCESS: 15: if (ackIndex >= consumeRequest.getMsgs().size()) { 16: ackIndex = consumeRequest.getMsgs().size() - 1; Int ok = ackIndex + 1; 20: int failed = consumeRequest.getMsgs().size() - ok; 21: this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); 22: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); 23: break; 24: case RECONSUME_LATER: 25: ackIndex = -1; 26: // Number of successes/failures 27: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), 28: consumeRequest.getMsgs().size()); 29: break; 30: default: 31: break; 32:} : 33 34: / / processing consumption failed messages 35: switch (this) defaultMQPushConsumer) getMessageModel ()) {36: case - : // Broadcast mode, regardless of whether the consumption failed, does not send a message to the Broker, just print Log 37: for (int I = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { 38: MessageExt msg = consumeRequest.getMsgs().get(i); 39: log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); 40: } 41: break; 42: Case CLUSTERING: 43: // Send a message back to the Broker. 44: List<MessageExt> msgBackFailed = new ArrayList<>(consumeRequest.getMsgs().size()); 45: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { 46: MessageExt msg = consumeRequest.getMsgs().get(i); 47: boolean result = this.sendMessageBack(msg, context); 48: if (! result) { 49: msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); 50: msgBackFailed.add(msg); // Send back a message to the Broker that failed. msgBackFailed.isEmpty()) { 56: consumeRequest.getMsgs().removeAll(msgBackFailed); 57:58: this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); 59: } 60: break; 61: default: 62: break; } 64:65: // Remove the consumption success message and update the latest consumption progress 66: long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); 67: if (offset >= 0 && ! consumeRequest.getProcessQueue().isDropped()) { 68: this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); 69:} 70:}Copy the code
- Description: Processing consumption results.
- Lines 8 to 10: Return directly if the consumption request message is not empty.
- Lines 12 to 32: calculations
ackIndex
Value.consumeRequest.msgs[0 - ackIndex]
For consumption to succeed, it needs to be doneack
Confirmation.- Lines 14 to 23:
CONSUME_SUCCESS
:ackIndex = context.getAckIndex()
. - Lines 24 to 29:
RECONSUME_LATER
:ackIndex = -1
.
- Lines 14 to 23:
- Lines 34 to 63: Process the message of a consumption failure.
- Lines 36 to 41:
BROADCASTING
: broadcast mode, regardless of consumption failure, does not send a message toBroker
, only logs are displayed. - Lines 42 to 60:
CLUSTERING
: In cluster mode, consumption failure message is sent backBroker
.- Lines 43 to 52: Send back a message that the consumption failed
Broker
. See:DefaultMQPushConsumerImpl# sendMessageBack (…).. - Lines 54 to 59: send back
Broker
Failed message submitted directly to delayed re-consumption. - If send back
Broker
Success, the result is caused by, for example, a network exceptionConsumer
Message consumption should be idempotent to the maximum extent possible, as repeated consumption of messages will result if a message fails to be sent back.
- Lines 43 to 52: Send back a message that the consumption failed
- Lines 36 to 41:
- Lines 65 to 69: Remove[Consumption success]And consumption failed but sent back
Broker
Success 】 and update the latest consumption progress.- Why is there a message that the consumption failed but the Broker was successful? Go to line 56.
- ProcessQueue# removeMessage (…).
ProcessQueue# removeMessage (…).
1: /** 2: * Removes the message and returns the first message queue position 3: * 4: * @param MSGS message 5: * @return Message queue position 6: */ 7: public long removeMessage(final List<MessageExt> msgs) { 8: long result = -1; 9: final long now = System.currentTimeMillis(); 10: try { 11: this.lockTreeMap.writeLock().lockInterruptibly(); 12: this.lastConsumeTimestamp = now; 13: try { 14: if (! msgTreeMap.isEmpty()) { 15: result = this.queueOffsetMax + 1; If msgTreeMap is empty, the next message is queueOffsetMax+1 16:17: // Remove message 18: int removedCnt = 0; 19: for (MessageExt msg : msgs) { 20: MessageExt prev = msgTreeMap.remove(msg.getQueueOffset()); 21: if (prev ! = null) { 22: removedCnt--; 23: } 24: } 25: msgCount.addAndGet(removedCnt); 26: 27: if (! msgTreeMap.isEmpty()) { 28: result = msgTreeMap.firstKey(); 29: } 30: } 31: } finally { 32: this.lockTreeMap.writeLock().unlock(); 33: } 34: } catch (Throwable t) { 35: log.error("removeMessage exception", t); 36: } 37: 38: return result; 39:}Copy the code
ConsumeMessageConcurrentlyService# cleanExpireMsg (…).
1: public void start() { 2: this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { 3: 4: @Override 5: public void run() { 6: cleanExpireMsg(); 7:} 8:9: }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); 14: */ 15: private void cleanExpireMsg() {16: Iterator<Map.Entry<MessageQueue, ProcessQueue>> it = 17: this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator(); 18: while (it.hasNext()) { 19: Map.Entry<MessageQueue, ProcessQueue> next = it.next(); 20: ProcessQueue pq = next.getValue(); 21: pq.cleanExpiredMsg(this.defaultMQPushConsumer); 23:22:}}Copy the code
- Note: Clear expired messages periodically. The default interval is 15 minutes.
ProcessQueue# cleanExpiredMsg (…).
1: public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {2: if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) { 4: return; Int loop = msgtreemap.size () < 16? msgTreeMap.size() : 16; For (int I = 0; i < loop; I++) {10: // gets the first message. If not, end the loop. 11: MessageExt MSG = null; 12: try { 13: this.lockTreeMap.readLock().lockInterruptibly(); 14: try { 15: if (! msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) { 16: msg = msgTreeMap.firstEntry().getValue(); 17: } else { 18: break; 19: } 20: } finally { 21: this.lockTreeMap.readLock().unlock(); 22: } 23: } catch (InterruptedException e) { 24: log.error("getExpiredMsg exception", e); 25:26} : 27: try {28: / / back to timeout messages: 29 pushConsumer. SendMessageBack (MSG, 3); 30: log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); 31, 32: / / whether the message is still the first, if so, is to remove 33: try {34: this. LockTreeMap. WriteLock () lockInterruptibly (); 35: try { 36: if (! msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) { 37: try { 38: msgTreeMap.remove(msgTreeMap.firstKey()); 39: } catch (Exception e) { 40: log.error("send expired msg exception", e); 41: } 42: } 43: } finally { 44: this.lockTreeMap.writeLock().unlock(); 45: } 46: } catch (InterruptedException e) { 47: log.error("getExpiredMsg exception", e); 48: } 49: } catch (Exception e) { 50: log.error("send expired msg exception", e); 51:} 52:} 53:}Copy the code
- Description: Remove expired messages.
- Lines 2 to 5: When consuming sequentially, return directly.
- Lines 7 through 9: Loop to remove the message. Default maximum number of cycles: 16.
- Lines 10 to 25: Get the first message. Check whether timeout occurs. If not, end the loop.
- Line 29: Send the timeout message back to
Broker
. - Lines 32 to 48: Determine if the message is still the first one at this point, and remove it if it is.
PushConsumer returns a failure message
DefaultMQPushConsumerImpl# sendMessageBack (…).
1: public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) 2: Throws RemotingException, MQBrokerException, InterruptedException, MQClientException {3: try {4: // Consumer sends back the message 5: String brokerAddr = (null ! = brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) 6: : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); 7: this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, 8: this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); } Catch (Exception e) {// Producer sends back messages using the Client's built-in Producer. log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e); 12:13: Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody()); 14: 15: String originMsgId = MessageAccessor.getOriginMessageId(msg); 16: MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId); 17: 18: newMsg.setFlag(msg.getFlag()); 19: MessageAccessor.setProperties(newMsg, msg.getProperties()); 20: MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic()); 21: MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1)); 22: MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes())); 23: newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); 24: 25: this.mQClientFactory.getDefaultMQProducer().send(newMsg); 26, 27:}}Copy the code
- Description: Send back a message.
- Lines 4 to 8:
Consumer
Send back the message. See:MQClientAPIImpl# consumerSendMessageBack (…).. - Lines 10 to 25: When an exception occurs,
Consumer
Built-in defaultProducer
Send a message.- 😈 Question: what kind of circumstances will occur exceptions?
MQClientAPIImpl# consumerSendMessageBack (…).
1: /** 2: * Consumer 3: * @param addr Broker 4: * @param MSG message 5: * @param consumerGroup 6: * @param delayLevel delayLevel 7: * @param timeoutMillis timeout 8: * @param maxConsumeRetryTimes maximum retry times for consuming 9: * @throws RemotingException when an exception occurs on a remote call 10: * @throws MQBrokerException When an exception occurs on a Broker 11: * @interruptedexception When a thread interrupts 12: */ 13: public void consumerSendMessageBack(14: Final String addr, 15: final MessageExt msg, 16: final String consumerGroup, 17: final int delayLevel, 18: final long timeoutMillis, 19: final int maxConsumeRetryTimes 20: ) throws RemotingException, MQBrokerException, InterruptedException { 21: ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader(); 22: RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader); 23: 24: requestHeader.setGroup(consumerGroup); 25: requestHeader.setOriginTopic(msg.getTopic()); 26: requestHeader.setOffset(msg.getCommitLogOffset()); 27: requestHeader.setDelayLevel(delayLevel); 28: requestHeader.setOriginMsgId(msg.getMsgId()); 29: requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes); 30:31: RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), 32: request, timeoutMillis); 33: assert response ! = null; 34: switch (response.getCode()) { 35: case ResponseCode.SUCCESS: { 36: return; 37: } 38: default: 39: break; 40: } 41: 42: throw new MQBrokerException(response.getCode(), response.getRemark()); 43:}Copy the code
8, Consumer progress
OffsetStore
RemoteBrokerOffsetStore
:Consumer
Cluster patternNext, use remoteBroker
Consumption schedule.LocalFileOffsetStore
:Consumer
Broadcasting modeUnder, use localfile
Consumption schedule.
OffsetStore# load (…).
LocalFileOffsetStore# load (…).
1: @override 2: public void load() throws MQClientException {3: // Reads the consumption progress from the local disk 4: OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset(); 5: if (offsetSerializeWrapper ! = null && offsetSerializeWrapper.getOffsetTable() ! = null) { 6: offsetTable.putAll(offsetSerializeWrapper.getOffsetTable()); 7:8: / / print each message queue consumer schedule 9: the for (MessageQueue mq: offsetSerializeWrapper getOffsetTable (). The keySet ()) {10: AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq); 11: log.info("load consumer's offset, {} {} {}", 12: this.groupName, 13: mq, 14: offset.get()); 15:} 16:} 17:}Copy the code
- Load consumption progress from a local file into memory.
OffsetSerializeWrapper
1: public class OffsetSerializeWrapper extends RemotingSerializable { 2: private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = 3: new ConcurrentHashMap<>(); 4: 5: public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() { 6: return offsetTable; 7: } 8: 9: public void setOffsetTable(ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable) { 10: this.offsetTable = offsetTable; 11:12:}}Copy the code
- Description: Local
Offset
Store serialization.
Yunai-MacdeMacBook-Pro-2:config yunai$ cat / Users/yunai /. Rocketmq_offsets / 192.168.17.0 @ DEFAULT/please_rename_unique_group_name_1 / offsets. The json {" offsetTable ": {{ "brokerName":"broker-a", "queueId":3, "topic":"TopicTest" }:1470,{ "brokerName":"broker-a", "queueId":2, "topic":"TopicTest" }:1471,{ "brokerName":"broker-a", "queueId":1, "topic":"TopicTest" }:1470,{ "brokerName":"broker-a", "queueId":0, "topic":"TopicTest" }:1470 } }Copy the code
RemoteBrokerOffsetStore# load (…).
1: @Override
2: public void load() {
3: }
Copy the code
- Do not load, actually read consumption progress from
Broker
To obtain.
OffsetStore# readOffset (…).
Read consumption progress type:
READ_FROM_MEMORY
: Reads data from memory.READ_FROM_STORE
: From storage (Broker
或file
Read).MEMORY_FIRST_THEN_STORE
: Preferentially reads data from the memory. If the data cannot be read, the data is read from the storage.
LocalFileOffsetStore# readOffset (…).
1: @Override 2: public long readOffset(final MessageQueue mq, final ReadOffsetType type) { 3: if (mq ! = null) { 4: switch (type) { 5: case MEMORY_FIRST_THEN_STORE: 6: case READ_FROM_MEMORY: { 7: AtomicLong offset = this.offsetTable.get(mq); 8: if (offset ! = null) { 9: return offset.get(); 10: } else if (ReadOffsetType.READ_FROM_MEMORY == type) { 11: return -1; 12: } 13: } 14: case READ_FROM_STORE: { 15: OffsetSerializeWrapper offsetSerializeWrapper; 16: try { 17: offsetSerializeWrapper = this.readLocalOffset(); 18: } catch (MQClientException e) { 19: return -1; 20: } 21: if (offsetSerializeWrapper ! = null && offsetSerializeWrapper.getOffsetTable() ! = null) { 22: AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq); 23: if (offset ! = null) { 24: this.updateOffset(mq, offset.get(), false); 25: return offset.get(); 26: } 27: } 28: } 29: default: 30: break; 31: } 32: } 33: 34: return -1; 35:}Copy the code
- Line 16: from
file
Read the consumption progress.
RemoteBrokerOffsetStore# readOffset (…).
1: @Override 2: public long readOffset(final MessageQueue mq, final ReadOffsetType type) { 3: if (mq ! = null) { 4: switch (type) { 5: case MEMORY_FIRST_THEN_STORE: 6: case READ_FROM_MEMORY: { 7: AtomicLong offset = this.offsetTable.get(mq); 8: if (offset ! = null) { 9: return offset.get(); 10: } else if (ReadOffsetType.READ_FROM_MEMORY == type) { 11: return -1; 12: } 13: } 14: case READ_FROM_STORE: { 15: try { 16: long brokerOffset = this.fetchConsumeOffsetFromBroker(mq); 17: AtomicLong offset = new AtomicLong(brokerOffset); 18: this.updateOffset(mq, offset.get(), false); 19: return brokerOffset; 20: } 21: // No offset in broker 22: catch (MQBrokerException e) { 23: return -1; 24: } 25: //Other exceptions 26: catch (Exception e) { 27: log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e); 28: return -2; 29: } 30: } 31: default: 32: break; 33: } 34: } 35: 36: return -1; 37:}Copy the code
- Line 16: from
Broker
Read the consumption progress.
OffsetStore# updateOffset (…).
The method RemoteBrokerOffsetStore is implemented the same as LocalFileOffsetStore.
1: @Override 2: public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) { 3: if (mq ! = null) { 4: AtomicLong offsetOld = this.offsetTable.get(mq); 5: if (null == offsetOld) { 6: offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset)); 7: } 8: 9: if (null ! = offsetOld) { 10: if (increaseOnly) { 11: MixAll.compareAndIncreaseOnly(offsetOld, offset); 12: } else { 13: offsetOld.set(offset); 14:} 15:} 16:} 17:}Copy the code
OffsetStore# persistAll (…).
LocalFileOffsetStore# persistAll (…).
1: @Override 2: public void persistAll(Set<MessageQueue> mqs) { 3: if (null == mqs || mqs.isEmpty()) 4: return; 5: 6: OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper(); 7: for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { 8: if (mqs.contains(entry.getKey())) { 9: AtomicLong offset = entry.getValue(); 10: offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset); 11: } 12: } 13: 14: String jsonString = offsetSerializeWrapper.toJson(true); 15: if (jsonString ! = null) { 16: try { 17: MixAll.string2File(jsonString, this.storePath); 18: } catch (IOException e) { 19: log.error("persistAll consumer offset Exception, " + this.storePath, e); 20:} 21:} 22:}Copy the code
- Persistent consumption schedule. Write the consumption progress to a file.
RemoteBrokerOffsetStore# persistAll (…).
1: @Override 2: public void persistAll(Set<MessageQueue> mqs) { 3: if (null == mqs || mqs.isEmpty()) 4: return; 5: 6: // persistent MessageQueue 7: final HashSet<MessageQueue> unusedMQ = new HashSet<>(); 8: if (! mqs.isEmpty()) { 9: for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { 10: MessageQueue mq = entry.getKey(); 11: AtomicLong offset = entry.getValue(); 12: if (offset ! = null) { 13: if (mqs.contains(mq)) { 14: try { 15: this.updateConsumeOffsetToBroker(mq, offset.get()); 16: log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", 17: this.groupName, 18: this.mQClientFactory.getClientId(), 19: mq, 20: offset.get()); 21: } catch (Exception e) { 22: log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); 23: } 24: } else { 25: unusedMQ.add(mq); 26:} 27:} 28:} 29:} 30: 31: // Remove invalid message queue 32: if (! unusedMQ.isEmpty()) { 33: for (MessageQueue mq : unusedMQ) { 34: this.offsetTable.remove(mq); 35: log.info("remove unused mq, {}, {}", mq, this.groupName); 36:} 37:} 38:}Copy the code
- Persistency specifies the consumption progress of the message queue array to
Broker
And remove the message queue unless specified.
MQClientInstance# persistAllConsumerOffset (…).
1: private void startScheduledTask() {2: // Periodically synchronizes the consumption progress 3: this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 4: 5: @Override 6: public void run() { 7: try { 8: MQClientInstance.this.cleanOfflineBroker(); 9: MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); 10: } catch (Exception e) { 11: log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); 12: } 13: } 14: }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); 15:}Copy the code
- Note: Periodic persistence, default period: 5000ms.
- Important Note:
- Consumption schedule persistence is more than just timed persistence. Operations such as pulling messages, allocating message queues, and so on, consume schedule persistence.
- Consumption schedule persistence is more than just timed persistence. Operations such as pulling messages, allocating message queues, and so on, consume schedule persistence.
- Consumption schedule persistence is more than just timed persistence. Operations such as pulling messages, allocating message queues, and so on, consume schedule persistence.
9, the end
😈 is probably the longest article in this series, please forgive me for any errors or inclarity. Thanks for reading, bookmarking, liking and sharing this series, and especially for turning to the end. 😜 really have diao diao long.