RocketMQ – Consumer Rebalance process
@(RocketMQ source code interpretation)
The entry to the consumer’s Rebalance process is in the RebalanceService, a thread that makes Rebalance every 20 seconds by default.
Let’s catch up and look at RebalanceImpl’s doRebalance method:
//isOrder whether to order messages
public void doRebalance(final boolean isOrder) {
// Subscription data per topic
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable ! = null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (! topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
Copy the code
Rebalance all Consumer maintained topics.
Moving on to rebalanceByTopic, this method is longer and can be divided into broadcast and cluster patterns based on message patterns. Let’s look at broadcast patterns first:
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet ! = null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
Copy the code
Which topicSubscribeInfoTable update operations (update corresponding MessageQueue) topic information, we mentioned earlier, when sending a message (updateTopicRouteInfoFromNameServer method), We have a look at updateProcessQueueTableInRebalance:
// Remove message queues in processQueueTable && that do not exist in mqSet
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
if (! Mqset.contains (MQ)) {// Does not contain the queue
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.ispullexpired ()) {// pull queue timeout, also clean up
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
// Remove pull timeout in PUSH mode
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
Copy the code
Moving on, add the new remote queue to processQueueTable:
for (MessageQueue mq : mqSet) {
// If processQueueTable does not include this MQ
if (! this.processQueueTable.containsKey(mq)) {
if (isOrder && ! this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
// cancel the offset of mq before adding it
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre ! = null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
// Returns whether there has been a change
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
Copy the code
The next message pull request will be made, which we ignore.
Cluster pattern update queue way to use the same updateProcessQueueTableInRebalance. Note the consumption progress in different modes: Broadcast mode: Consumption progress using local files. Cluster mode: Use the Broker’s consumption progress.
In broadcast mode, local consumption progress is used because consumers are independent of each other. In cluster mode, a message is not sent to other consumers if it is successfully consumed by one consumer. The consumption progress needs to be controlled by the Broker.
Rebalance we’ll look at this later.