RocketMQ source code interpretation – Broker message storage


Let’s look at the broker’s message stored procedure in this article.

When the broker is started, a NettyRemotingServer is started. Rocketmq passes messages via Netty. When the NettyRemotingServer is initialized, it adds NettyServerHandler to the Pipeline. This method eventually executes the processMessageReceived method, which sorts and processes messages according to their type.

Let’s focus on requests. Look at the processRequestCommand method:

  1. So let’s get onePair (key-value like object encapsulated by RocketMQ)The first value is zeroprocessorThe second is a thread pool for processing requests.
  2. Initialize the task.
  3. Submit tasks.

There is more logic in initializing the task. Let’s look at what the task to be initialized does:

RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); if (rpcHook ! = null) {/ / if you have the RPC hooks, perform first rpcHook. DoBeforeRequest (RemotingHelper. ParseChannelRemoteAddr (CTX) channel ()), CMD); Final RemotingCommand response = pain.getobject1 ().processRequest(CTX, CMD); if (rpcHook ! . = null) {/ / executed after rpcHook doAfterResponse (RemotingHelper. ParseChannelRemoteAddr (CTX) channel ()), CMD, response); } if (! cmd.isOnewayRPC()) { if (response ! = null) { response.setOpaque(opaque); response.markResponseType(); Try {// Write ctx.writeAndFlush(response); } catch (Throwable e) { log.error("process request over, but response failed", e); log.error(cmd.toString()); log.error(response.toString()); }} else {// If response is null, ignore}}Copy the code

To summarize the task, it is to process the request and write the request to the remote, if it is a one-time request, do not write back the response.

Let’s see what this processor does. This processor here is SendMessageProcessor:

SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { return null; } mqtraceContext = buildMsgContext(ctx, requestHeader); this.executeSendMessageHookBefore(ctx, request, mqtraceContext); RemotingCommand response; If (requesTheader.isBatch ()) {response = this.sendbatchMessage (CTX, request, mqtraceContext, requestHeader); } else { response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); } this.executeSendMessageHookAfter(response, mqtraceContext); return response;Copy the code

Hooks here again and again, and you can see various hooks in rocketmq flew over, this note on the hook and a method of different, this hook is sending a message of hooks, hooks is a RPC, and remote call, are not quite same, sending the message must be RPC calls, but in turn, pay attention to distinguish.

Let’s look at the sendMessage section:

  1. Assemble the response header. (Talking to a colleague earlierrocketmqAnd colleagues saidrocketmqOne of the defects is that there is no message tracing, but we can see that version 4.2.0 already has a switch to enable TRACE.
  2. Verify the message body is valid.
  3. selectqueueIdDecide which queue to write to.
  4. Handles retry and dead-letter queues.
  5. createMessageExtBrokerInner.
  6. To obtainMessageStoreAnd stores messages.
  7. Process the results of stored information.

Let’s take a look at step 4, if it is a RETRY message:

String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); if (null == subscriptionGroupConfig) { response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setRemark( "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); return false; } / / calculate the maximum consumption times int maxReconsumeTimes = subscriptionGroupConfig. GetRetryMaxTimes (); if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); } / / obtain the consumption times int reconsumeTimes = requestHeader. GetReconsumeTimes () = = null? 0 : requestHeader.getReconsumeTimes(); if (reconsumeTimes >= maxReconsumeTimes) { newTopic = MixAll.getDLQTopic(groupName); int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; // Each consumer group has only one queue topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0 ); // setTopic to the new dead-letter queue topic (%RETRY% + consumerGroup) msg.settopic (newTopic); msg.setQueueId(queueIdInt); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); return false; }}Copy the code

We can see that if a message continues to fail to consume, it is added to the dead-letter queue.

The part about actually storing messages will be covered in the next article, because it is very important when it comes to the rocketMQ file format.

It is important to note that the type of message is RETRY. This part of the code will be reviewed by our consumers. Recall from the previous article: When we start the consumer, we actually start a timed service that keeps inserting pullRequests into a queue, and then we have an endless loop that keeps getting requests and pulling messages, We see ConsumeMessageConcurrentlyService submitConsumeRequest method, submit task is actually a call ConsumeRequest inside the class, the class actually processing the message, We found really use in this listener to handle the message statement: listener. ConsumeMessage (Collections. UnmodifiableList (MSGS), the context). . Does that ring a bell? We track this method returns the status, you can see there are so 1: ConsumeMessageConcurrentlyService. Enclosing processConsumeResult (status, the context, this); And processing the consumption results of the part, catch up with a look, it found that if the consumer is cluster mode:

MsgBackFailed = new ArrayList<MessageExt>(ConsumerEquest.getmsgs ().size()));  For (int I = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); Boolean result = this.sendMessageBack(MSG, context); if (! result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); }} // Return a message to the Broker that failed, directly committing delayed re-consumption if (! msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break;Copy the code

In cluster mode, if a consumption fails, the message is sent back for other machines to consume. SendMessageBack = sendMessageBack = sendMessageBack = sendMessageBack

Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

this.mQClientFactory.getDefaultMQProducer().send(newMsg);
Copy the code

We see that a new message is recreated, but TOPIC is modified to %RETRY% + consumerGroup. Then it goes through the send operation again.

Back to the handlePutMessageResult method, which handles the final stored result and corresponding information:

  1. Set the corresponding according to the result of cold storagecode.
  2. If the cold storage succeeds, collect related statistics. Then put theresponseWrite back to the remote, if there is a hook set send failure tocontext
  3. If the execution fails, set sending to fail if there are hookscontext.

At this point, the message store section ends.