The WeChat official account “Backend Advanced” focuses on sharing back-end technology: Java, Golang, web frameworks, distributed middleware, service governance, and more.
Some time ago, a friend asked me a question. While building a RocketMQ cluster, he ran into a problem with consumer subscriptions, and he sent me the error log:
the consumer's subscription not exist
I first found the error in the source code:
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest:
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
    log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
    response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
    response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
    return response;
}
This code looks up the consumer group's subscription information for the topic. The lookup returns null, so the broker reports that the consumer's subscription does not exist.
My friend also told me that each consumer in his consumer cluster subscribed to its own topic: his consumer group contained two consumers, c1 and c2, where c1 subscribed to topicA and c2 subscribed to topicB.
At that point I knew the cause. Inside the broker, consumer subscription information is stored per consumer group. The data structure is as follows:
org.apache.rocketmq.broker.client.ConsumerManager:
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
    new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
This means that when the consumers in a group register their subscription information with the broker, they overwrite one another's entries. That is why every consumer in the same consumer group must have exactly the same subscriptions. In my friend's case, each consumer in the group had a different subscription, so the subscription information kept being overwritten.
He did not understand why RocketMQ does not allow this. So, in the spirit of a seasoned veteran, I will walk through consumer subscription registration, message pulling, and message queue load balancing and redistribution from the source code, to get RocketMQ's consumer subscription mechanism straight.
Consumer subscription information registration
Each consumer holds an MQClientInstance, which is started when the consumer starts. Its startup method launches a series of scheduled tasks, including one that sends heartbeats:
org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
This task sends the client's HeartbeatData to every broker in the cluster. HeartbeatData is the heartbeat payload of each client and contains the following data:
// Client ID
private String clientID;
// Producer information
private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();
// Consumer information
private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();
The consumer information includes the topics that the client subscribes to.
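The heartbeat mechanism described above can be sketched in plain Java. This is a minimal illustration, not RocketMQ's code: the `Heartbeat` class is a hypothetical, much smaller stand-in for HeartbeatData, the broker is modeled as a simple callback, and the method stops after a fixed number of heartbeats so the example terminates.

```java
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class HeartbeatSketch {
    // Hypothetical stand-in for HeartbeatData: one client, its group, and its topics.
    static final class Heartbeat {
        final String clientId;
        final String group;
        final Set<String> topics;

        Heartbeat(String clientId, String group, Set<String> topics) {
            this.clientId = clientId;
            this.group = group;
            this.topics = topics;
        }
    }

    // Sends `count` heartbeats to `broker` at a fixed rate, then shuts the scheduler down.
    static void sendHeartbeats(Heartbeat hb, Consumer<Heartbeat> broker, long periodMillis, int count) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(count);
        scheduler.scheduleAtFixedRate(() -> {
            if (remaining.getCount() == 0) {
                return; // enough heartbeats were sent
            }
            try {
                broker.accept(hb);
            } catch (RuntimeException e) {
                // Swallowed on purpose: an exception escaping the task would cancel later runs.
            } finally {
                remaining.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            remaining.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Heartbeat hb = new Heartbeat("c1", "G1", Set.of("topicA"));
        sendHeartbeats(hb, h -> System.out.println(h.clientId + " -> " + h.group + " " + h.topics), 100, 3);
    }
}
```

Every heartbeat carries the sender's full topic set, which is why the broker sees a fresh copy of each client's subscriptions on every tick.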
HeartbeatData is sent with the request type HEART_BEAT, which the broker handles here:
org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat:
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // Decode to get HeartbeatData
    HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
    ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
        ctx.channel(),
        heartbeatData.getClientID(),
        request.getLanguage(),
        request.getVersion()
    );
    // Loop over the consumer data and register each consumer's subscription information
    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
        // Get subscription configuration information by consumer group
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                data.getGroupName());
        boolean isNotifyConsumerIdsChangedEnable = true;
        if (null != subscriptionGroupConfig) {
            isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
            int topicSysFlag = 0;
            if (data.isUnitMode()) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            }
            String newTopic = MixAll.getRetryTopic(data.getGroupName());
            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                subscriptionGroupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        }
        // Register consumer subscription information
        boolean changed = this.brokerController.getConsumerManager().registerConsumer(
            data.getGroupName(),
            clientChannelInfo,
            data.getConsumeType(),
            data.getMessageModel(),
            data.getConsumeFromWhere(),
            data.getSubscriptionDataSet(),
            isNotifyConsumerIdsChangedEnable
        );
        // ...
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
On receiving a HEART_BEAT request, the broker decodes the request body into HeartbeatData and then registers the consumer subscription information it contains.
org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer:
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
    final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
    // Get the consumer information in the consumer group
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    // If the consumer information for the consumer group is empty, create a new one
    if (null == consumerGroupInfo) {
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }
    boolean r1 =
        consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
            consumeFromWhere);
    // Update the subscription information. The subscription information is stored by consumer group, so this step will cause the subscription information of each consumer client in the same consumer group to be overwritten
    boolean r2 = consumerGroupInfo.updateSubscription(subList);
    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }
    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
    return r1 || r2;
}
This method is the core of how the broker updates consumer subscription information. If no ConsumerGroupInfo exists for the consumer group, a new one is created; otherwise the existing one is updated. Because the subscription information is kept per group, updating it lets the consumer clients within the same consumer group overwrite one another's subscriptions.
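The get-or-create step at the top of registerConsumer is a common ConcurrentMap idiom and can be tried in isolation. The sketch below uses a hypothetical `GroupInfo` class in place of ConsumerGroupInfo and keeps only the group name; it shows that every registration for the same group ends up sharing one object, which is the precondition for the overwriting described above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class GroupTableSketch {
    // Hypothetical stand-in for ConsumerGroupInfo: only the group name is kept.
    static final class GroupInfo {
        final String group;

        GroupInfo(String group) {
            this.group = group;
        }
    }

    private final ConcurrentMap<String, GroupInfo> table = new ConcurrentHashMap<>();

    // Returns the single GroupInfo for a group, creating it on first use.
    GroupInfo getOrCreate(String group) {
        GroupInfo info = table.get(group);
        if (info == null) {
            GroupInfo tmp = new GroupInfo(group);
            // putIfAbsent returns the value already stored by another caller, or null if tmp was stored.
            GroupInfo prev = table.putIfAbsent(group, tmp);
            info = prev != null ? prev : tmp;
        }
        return info;
    }

    public static void main(String[] args) {
        GroupTableSketch table = new GroupTableSketch();
        GroupInfo fromC1 = table.getOrCreate("G1"); // registration triggered by c1
        GroupInfo fromC2 = table.getOrCreate("G1"); // registration triggered by c2
        System.out.println(fromC1 == fromC2); // prints true: both consumers share one entry
    }
}
```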
Message pull
When MQClientInstance is started, a thread is started to handle the message pull task:
org.apache.rocketmq.client.impl.factory.MQClientInstance#start:
// Start pull service
this.pullMessageService.start();
PullMessageService extends ServiceThread, which implements the Runnable interface. Its run method is implemented as follows:
org.apache.rocketmq.client.impl.consumer.PullMessageService#run:
@Override
public void run() {
    while (!this.isStopped()) {
        try {
            // Get the pull message request object from the pullRequestQueue
            PullRequest pullRequest = this.pullRequestQueue.take();
            // Perform message pull
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
}
pullRequestQueue is a blocking queue. If it is empty, the take() call blocks until a new PullRequest arrives. This raises a key question: when is a PullRequest created and put into pullRequestQueue? It is created in RebalanceImpl, which implements RocketMQ's message queue load balancing and redistribution mechanism.
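The blocking behavior of take() is easy to reproduce with the JDK alone. The following sketch is an illustration under simplifying assumptions: pull requests are plain strings, the main thread plays the role of the rebalance step that produces them, and the loop handles a fixed number of requests instead of running until the service is stopped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PullLoopSketch {
    // Runs a pull loop on its own thread, feeds it `requests`, and returns what the loop handled.
    static List<String> runLoop(List<String> requests) {
        BlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        Thread puller = new Thread(() -> {
            try {
                // Bounded loop so the example terminates; the real service loops until stopped.
                for (int i = 0; i < requests.size(); i++) {
                    // take() blocks whenever the queue is empty.
                    handled.add(pullRequestQueue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        puller.start();
        try {
            for (String request : requests) {
                pullRequestQueue.put(request); // each put releases one blocked or future take()
            }
            puller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        System.out.println(runLoop(Arrays.asList("topicA-q0", "topicA-q1")));
    }
}
```

Nothing is handled until something is put into the queue, which is why the pull thread depends entirely on whoever creates the requests.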
Message queue load and redistribution
As described above, PullMessageService blocks while the pullRequestQueue contains no PullRequest objects. When MQClientInstance starts, it also starts a thread that handles message queue load balancing and redistribution:
org.apache.rocketmq.client.impl.factory.MQClientInstance#start:
// Start rebalance service
this.rebalanceService.start();
RebalanceService also inherits ServiceThread, and its run method is as follows:
@Override
public void run() {
    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }
}
Keep following:
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance:
public void doRebalance(final boolean isOrder) {
    // Get all subscription information from the consumer
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                // Message queue load and redistribution
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
    this.truncateMessageQueueNotMyTopic();
}
subTable stores the consumer's subscription information, which is filled in when the consumer subscribes to a topic. Let's continue:
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
The rebalanceByTopic method is the core of consumer-side load balancing. Here we look at message queue load balancing and redistribution in cluster mode. First, the queue information of the subscribed topic is obtained from topicSubscribeInfoTable. Then the list of client IDs in the consumer group is fetched from a randomly chosen broker in the cluster. Any broker can serve this request because, as analyzed earlier, each consumer client starts a scheduled task that sends heartbeats to all brokers.
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:
// If the topic's queue information mqSet and the client ID list cidAll are both non-null, perform message queue load and redistribution
if (mqSet != null && cidAll != null) {
    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
    mqAll.addAll(mqSet);
    // Sort to ensure that only one consumer is allocated per message queue
    Collections.sort(mqAll);
    Collections.sort(cidAll);
    // Message queue allocation algorithm
    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
    // Execute the algorithm and get the result allocateResult after queue reallocation
    List<MessageQueue> allocateResult = null;
    try {
        allocateResult = strategy.allocate(
            this.consumerGroup,
            this.mQClientFactory.getClientId(),
            mqAll,
            cidAll);
    } catch (Throwable e) {
        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
            e);
        return;
    }
    // ...
}
The code above is the core of the message load balancing logic. RocketMQ itself provides five allocation algorithms; the default is AllocateMessageQueueAveragely, the average allocation algorithm. It behaves as follows:
Suppose there is a consumer group G1 with consumers c1 and c2, where c1 subscribes to topicA and c2 subscribes to topicB. The cluster has two brokers, broker_a and broker_b, and topicA has eight message queues: broker_a (q0/q1/q2/q3) and broker_b (q0/q1/q2/q3). As noted earlier, findConsumerIdList returns the client IDs of all consumers in the consumer group. After the average allocation algorithm runs, topicA's queues are assigned as follows:
c1: broker_a (q0/q1/q2/q3)
c2: broker_b (q0/q1/q2/q3)
The problem is that c2 does not subscribe to topicA at all, yet the allocation algorithm still counts c2. As a result, half of topicA's queues are assigned to c2, and messages in those queues are consumed only after a delay of ten seconds or more. The same applies to topicB.
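The split in this example can be reproduced with a small average-allocation routine. The code below is a simplified sketch written for this article, not the source of AllocateMessageQueueAveragely: queues and client IDs are plain strings, and the queue names such as `broker_a-q0` are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class AverageAllocationSketch {
    // Returns the contiguous slice of the sorted queue list that belongs to `currentCid`.
    static List<String> allocate(String currentCid, List<String> queues, List<String> cids) {
        List<String> mqAll = new ArrayList<>(queues);
        List<String> cidAll = new ArrayList<>(cids);
        // Every consumer sorts both lists the same way, so each one computes the same split on its own.
        Collections.sort(mqAll);
        Collections.sort(cidAll);
        int index = cidAll.indexOf(currentCid);
        if (index < 0) {
            return new ArrayList<>(); // this client is not part of the group
        }
        int base = mqAll.size() / cidAll.size();
        int extra = mqAll.size() % cidAll.size(); // the first `extra` consumers get one more queue
        int start = index * base + Math.min(index, extra);
        int size = base + (index < extra ? 1 : 0);
        return new ArrayList<>(mqAll.subList(start, start + size));
    }

    public static void main(String[] args) {
        List<String> topicAQueues = new ArrayList<>();
        for (String broker : new String[] {"broker_a", "broker_b"}) {
            for (int q = 0; q < 4; q++) {
                topicAQueues.add(broker + "-q" + q);
            }
        }
        // cidAll holds every client in G1, including c2, which never subscribed to topicA.
        List<String> cidAll = Arrays.asList("c1", "c2");
        System.out.println("c1: " + allocate("c1", topicAQueues, cidAll));
        System.out.println("c2: " + allocate("c2", topicAQueues, cidAll));
    }
}
```

The routine never looks at what each client subscribes to. It only divides the queue list by the number of client IDs, so c2 receives the broker_b half of topicA regardless of its own subscription.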
After topicA and topicB are rebalanced this way, the “the consumer's subscription not exist” error can appear. Continuing in the source:
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:
if (mqSet != null && cidAll != null) {
    // ...
    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
    if (allocateResult != null) {
        allocateResultSet.addAll(allocateResult);
    }
    // Use allocateResult to update processQueueTable, the message queue cache table for the current consumer, and generate the pullRequestList that is put into the pullRequestQueue blocking queue
    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
    if (changed) {
        log.info(
            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
            allocateResultSet.size(), allocateResultSet);
        this.messageQueueChanged(topic, mqSet, allocateResultSet);
    }
}
The code above takes mqSet and cidAll and performs message queue load balancing and redistribution. The result, allocateResult, is a list of MessageQueue objects. It is then used to update processQueueTable, the consumer's cache of assigned message queues, and to generate the pullRequestList that is placed into the pullRequestQueue blocking queue:
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
// Loop to wrap mqSet subscription data into a PullRequest object and add it to the pullRequestList
for (MessageQueue mq : mqSet) {
    // If the queue is not in the cache table, it was newly assigned by this reallocation
    if (!this.processQueueTable.containsKey(mq)) {
        if (isOrder && !this.lock(mq)) {
            log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
            continue;
        }
        this.removeDirtyOffset(mq);
        ProcessQueue pq = new ProcessQueue();
        long nextOffset = this.computePullFromWhere(mq);
        if (nextOffset >= 0) {
            ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
            if (pre != null) {
                log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
            } else {
                log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(consumerGroup);
                pullRequest.setNextOffset(nextOffset);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(pq);
                pullRequestList.add(pullRequest);
                changed = true;
            }
        } else {
            log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
        }
    }
}
// Add the pullRequestList to the pullRequestQueue blocking queue in the PullMessageService to wake up the PullMessageService thread to pull messages
this.dispatchPullRequest(pullRequestList);
Earlier we saw PullMessageService take a PullRequest from the pullRequestQueue blocking queue. This is where that PullRequest is created.
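The effect of applying a new allocation to the queue cache can be sketched with sets of strings. This is a simplified model, not the RocketMQ method: `table` stands in for processQueueTable, a queue name stands in for a PullRequest, and the removal of queues that are no longer assigned is included as an assumption about the part of the method not shown in the excerpt above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class ProcessQueueTableSketch {
    // Applies a new allocation to `table` and returns the queues that need a new pull request.
    static List<String> applyAllocation(Set<String> table, Set<String> allocated) {
        // Assumption: queues that are no longer assigned to this consumer are dropped.
        table.retainAll(allocated);
        List<String> newPullRequests = new ArrayList<>();
        // TreeSet only makes the iteration order deterministic for the example.
        for (String mq : new TreeSet<>(allocated)) {
            if (table.add(mq)) { // true only for queues this consumer did not hold before
                newPullRequests.add(mq);
            }
        }
        return newPullRequests;
    }

    public static void main(String[] args) {
        Set<String> table = new HashSet<>(Arrays.asList("q0", "q1"));
        Set<String> allocated = new HashSet<>(Arrays.asList("q1", "q2"));
        System.out.println(applyAllocation(table, allocated)); // prints [q2]
        System.out.println(new TreeSet<>(table));              // prints [q1, q2]
    }
}
```

Only newly assigned queues produce a pull request, so a queue that stays with the same consumer across rebalances keeps its existing pull cycle.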
The consumer's subscription not exist error
Suppose consumer group G1 contains c1 and c2, where c1 subscribes to topicA and c2 subscribes to topicB. c2 starts first and sets G1's subscription information to topicB. c1 starts later and overwrites G1's subscription information with topicA. When c1 rebalances, it adds a PullRequest for topicA to the pullRequestQueue. Meanwhile, c2's next heartbeat sets G1's subscription information back to topicB. c1's PullMessageService thread then takes the topicA PullRequest from the pullRequestQueue and pulls messages, but the broker cannot find a topicA subscription for consumer group G1 because c2's heartbeat overwrote it, so it reports that the consumer's subscription does not exist.
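This sequence of events can be replayed with a toy broker. The sketch below is a deliberately simplified model and not the broker's real code: subscriptions are a map from group name to a set of topic names, a heartbeat replaces the whole set, and the pull check returns a string named after the response code seen in the log.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class SubscriptionOverwriteSketch {
    // group -> topics; hypothetical stand-in for the broker's per-group subscription table.
    private final Map<String, Set<String>> subscriptionsByGroup = new HashMap<>();

    // A heartbeat replaces the group's topic set with the sender's topics.
    void heartbeat(String group, Set<String> topics) {
        subscriptionsByGroup.put(group, new HashSet<>(topics));
    }

    // Simplified pull check: succeeds only if the group currently subscribes to the topic.
    String pull(String group, String topic) {
        Set<String> topics = subscriptionsByGroup.get(group);
        return topics != null && topics.contains(topic) ? "SUCCESS" : "SUBSCRIPTION_NOT_EXIST";
    }

    public static void main(String[] args) {
        SubscriptionOverwriteSketch broker = new SubscriptionOverwriteSketch();
        broker.heartbeat("G1", Set.of("topicB"));          // c2 starts first
        broker.heartbeat("G1", Set.of("topicA"));          // c1 starts and overwrites G1's topics
        System.out.println(broker.pull("G1", "topicA"));   // prints SUCCESS
        broker.heartbeat("G1", Set.of("topicB"));          // c2's next heartbeat overwrites again
        System.out.println(broker.pull("G1", "topicA"));   // prints SUBSCRIPTION_NOT_EXIST
    }
}
```

If both consumers sent the same topic set, for example topicA and topicB together, every heartbeat would leave the table unchanged and the pull check would keep succeeding.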