RocketMQ – Consumer Rebalance process

@(RocketMQ source code interpretation)


The entry point of the consumer's rebalance process is RebalanceService, a thread that triggers a rebalance every 20 seconds by default.
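The periodic trigger can be pictured with a minimal sketch. The class below is a hypothetical stand-in, not the real RebalanceService: it waits for a configurable interval (20 seconds would match the default mentioned above), runs a supplied rebalance task, and repeats until it is shut down. The names PeriodicRebalanceSketch, runRounds, and completedRuns are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a periodic rebalance thread; not RocketMQ's own class.
class PeriodicRebalanceSketch implements Runnable {
    private final long waitIntervalMillis;
    private final Runnable rebalanceTask;
    private final AtomicInteger completedRuns = new AtomicInteger();
    private volatile boolean stopped;

    PeriodicRebalanceSketch(long waitIntervalMillis, Runnable rebalanceTask) {
        this.waitIntervalMillis = waitIntervalMillis;
        this.rebalanceTask = rebalanceTask;
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                // Wait one interval before each rebalance round
                Thread.sleep(waitIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (!stopped) {
                rebalanceTask.run();
                completedRuns.incrementAndGet();
            }
        }
    }

    void shutdown() {
        stopped = true;
    }

    int completedRuns() {
        return completedRuns.get();
    }

    // Demo helper: runs the loop on its own thread until at least `rounds`
    // rebalances have finished, then stops it and returns how often the task ran.
    static int runRounds(int rounds, long intervalMillis) {
        AtomicInteger calls = new AtomicInteger();
        PeriodicRebalanceSketch service =
            new PeriodicRebalanceSketch(intervalMillis, calls::incrementAndGet);
        Thread thread = new Thread(service, "rebalance-sketch");
        thread.start();
        try {
            while (service.completedRuns() < rounds) {
                Thread.sleep(1);
            }
            service.shutdown();
            thread.interrupt();
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return calls.get();
    }
}
```

Calling PeriodicRebalanceSketch.runRounds(3, 5) runs the task at least three times with a 5 ms interval before stopping the thread.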

Let's follow the call and look at RebalanceImpl's doRebalance method:

// isOrder: whether messages are consumed in order
public void doRebalance(final boolean isOrder) {
    // Subscription data, keyed by topic
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }

    this.truncateMessageQueueNotMyTopic();
}

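This rebalances every topic the consumer subscribes to. An exception on one topic is caught, so the remaining topics are still rebalanced. Finally, truncateMessageQueueNotMyTopic drops queues that belong to topics the consumer no longer subscribes to.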

Moving on to rebalanceByTopic: this method is longer and branches on the message model, broadcast or cluster. Let's look at the broadcast branch first:

// Excerpt: the broadcast case of the switch on the message model
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
    // In broadcast mode the consumer takes every queue of the topic
    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
    if (changed) {
        this.messageQueueChanged(topic, mqSet, mqSet);
        log.info("messageQueueChanged {} {} {} {}",
            consumerGroup,
            topic,
            mqSet,
            mqSet);
    }
} else {
    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;

topicSubscribeInfoTable holds the set of MessageQueues for each topic. It is updated by the updateTopicRouteInfoFromNameServer method, which we covered earlier when looking at message sending. Now let's look at updateProcessQueueTableInRebalance:

boolean changed = false;

// Remove message queues that are in processQueueTable but no longer in mqSet
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
    Entry<MessageQueue, ProcessQueue> next = it.next();
    MessageQueue mq = next.getKey();
    ProcessQueue pq = next.getValue();

    if (mq.getTopic().equals(topic)) {
        if (!mqSet.contains(mq)) { // The queue is no longer assigned to this consumer
            pq.setDropped(true);
            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                it.remove();
                changed = true;
                log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
            }
        } else if (pq.isPullExpired()) { // The pull has timed out, so clean up this queue as well
            switch (this.consumeType()) {
                case CONSUME_ACTIVELY:
                    break;
                case CONSUME_PASSIVELY:
                    // In PUSH mode, remove queues whose pull has timed out
                    pq.setDropped(true);
                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                        it.remove();
                        changed = true;
                        log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                            consumerGroup, mq);
                    }
                    break;
                default:
                    break;
            }
        }
    }
}

Next, the queues in mqSet that are not yet tracked locally are added to processQueueTable:

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
    // processQueueTable does not contain this queue yet
    if (!this.processQueueTable.containsKey(mq)) {
        // Ordered consumption must lock the queue before taking it over
        if (isOrder && !this.lock(mq)) {
            log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
            continue;
        }

        // Clear any stale offset for mq before adding it
        this.removeDirtyOffset(mq);
        ProcessQueue pq = new ProcessQueue();
        long nextOffset = this.computePullFromWhere(mq);
        if (nextOffset >= 0) {
            ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
            if (pre != null) {
                log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
            } else {
                log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(consumerGroup);
                pullRequest.setNextOffset(nextOffset);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(pq);
                pullRequestList.add(pullRequest);
                // Record that the queue table changed
                changed = true;
            }
        } else {
            log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
        }
    }
}

After that, the collected pull requests are dispatched to start pulling messages, a step we skip here.
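The two steps of updateProcessQueueTableInRebalance, removing queues that are no longer assigned and adding newly assigned ones, can be condensed into a small self-contained sketch. Everything here is a simplified assumption: queues are plain strings, LocalQueueState stands in for ProcessQueue, and locking, offset lookup, and pull requests are left out. It is meant to show the shape of the reconciliation, not RocketMQ's implementation.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, hypothetical model of reconciling a local queue table
// with the set of queues assigned by the latest rebalance.
class QueueTableSketch {
    // Stand-in for the per-queue local state
    static final class LocalQueueState {
        volatile boolean dropped;
    }

    private final ConcurrentHashMap<String, LocalQueueState> table = new ConcurrentHashMap<>();

    // Returns true when the table changed, mirroring the `changed` flag above.
    boolean reconcile(Set<String> assigned) {
        boolean changed = false;

        // Step 1: drop queues that are tracked locally but no longer assigned
        Iterator<Map.Entry<String, LocalQueueState>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, LocalQueueState> entry = it.next();
            if (!assigned.contains(entry.getKey())) {
                entry.getValue().dropped = true; // tell in-flight work to stop
                it.remove();
                changed = true;
            }
        }

        // Step 2: start tracking queues that are assigned but not yet in the table
        for (String queue : assigned) {
            if (table.putIfAbsent(queue, new LocalQueueState()) == null) {
                changed = true;
            }
        }
        return changed;
    }

    // Sorted snapshot of the queues currently tracked
    Set<String> queues() {
        return new TreeSet<>(table.keySet());
    }
}
```

Calling reconcile twice with the same set returns true the first time and false the second, which is why the real method only notifies listeners when changed is true.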

Cluster mode updates its queues through the same updateProcessQueueTableInRebalance method. Note that consumption progress is stored differently in the two modes: broadcast mode keeps it in local files, while cluster mode uses the progress stored on the Broker.

In broadcast mode, each consumer tracks its own progress locally because consumers are independent of each other. In cluster mode, a message that one consumer has consumed successfully is not delivered to the other consumers in the group, so the progress has to be kept centrally by the Broker.
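The difference in where progress lives can be illustrated with a toy model. The sketch below is an assumption-laden simplification: in-memory maps stand in for both the local file and the Broker, and the names OffsetPlacementSketch, MessageModelSketch, and storeFor are hypothetical. It only shows that broadcast consumers each get a private store while cluster consumers share one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of where consumption progress lives in each message model.
class OffsetPlacementSketch {
    enum MessageModelSketch { BROADCASTING, CLUSTERING }

    // Minimal offset store: queue name -> next offset to consume
    static final class OffsetStore {
        private final Map<String, Long> offsets = new HashMap<>();

        void update(String queue, long offset) {
            offsets.put(queue, offset);
        }

        long read(String queue) {
            return offsets.getOrDefault(queue, 0L);
        }
    }

    // Stands in for progress kept on the Broker, shared by the whole consumer group
    private final OffsetStore brokerStore = new OffsetStore();

    // Broadcast: every consumer gets its own private store (a local file in practice).
    // Cluster: every consumer in the group reads and writes the Broker-side store.
    OffsetStore storeFor(MessageModelSketch model) {
        return model == MessageModelSketch.BROADCASTING ? new OffsetStore() : brokerStore;
    }
}
```

With this model, an update made through one cluster consumer's store is visible to another cluster consumer, while two broadcast consumers never see each other's progress.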

We'll come back to the rest of Rebalance later.