Hello, I'm Saiya, a Java developer who loves poetry. Thank you for following ~ L (▔, ▔)┛. I've heard that "reading" works even better together with a "like".

Today's poem: "In youth I took peril for level ground; alone, leaning on my long sword, I towered over the clear autumn." (Gu Kuang, Tang dynasty, Three Songs on the Hard Road)

Today we are going to look at the RocketMQ retry mechanism. The content is fairly hardcore, so I recommend the one-click triple... oh no, wrong platform. I suggest a like + favorite.

All right, let's get going.

This article covers how Consumer retries are designed. Producer retries simply resend the message (with fail-over, of course).

Topics covered

  • Principle of ACK retry mechanism
  • Dead letter queue (DLQ queue)

A few questions

  • What does message retry mean?
  • Consumers consume messages in either clustering mode or broadcasting mode. Do message retries happen in both modes?
  • What is the message retry policy?
  • What are the delay rules for message retries?
  • What is a dead-letter queue? What are its characteristics?
  • Under what conditions does a message go to the dead-letter queue?

Background

As we know, the Consumer pulls messages and consumes them separately, using two classes:

  • Pulling messages: PullMessageService
  • Consuming messages: ConsumeMessageConcurrentlyService

Message consumption process

Only the key code is shown below

1. Suppose messages have been pulled and are ready to be handed to ConsumeMessageConcurrentlyService for consumption. The following method is called:

// ConsumeMessageConcurrentlyService
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    // Assume no paging: all pulled messages fit into one batch
    if (msgs.size() <= consumeBatchSize) {
        // Wrap the messages in a ConsumeRequest (a Runnable)
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            // Hand the request to the consumer thread pool
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            // The pool rejected the task: resubmit it later
            this.submitConsumeRequestLater(consumeRequest);
        }
    }
    // ... (the paging branch is omitted)
}
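The snippet above assumes no paging. When more messages are pulled than consumeMessageBatchMaxSize allows (the DefaultMQPushConsumer default is 1), the list has to be cut into several batches, one ConsumeRequest each. The following is a minimal sketch of that splitting idea only, not the RocketMQ source; the class BatchSplitter and the method splitIntoBatches are hypothetical names, and plain strings stand in for MessageExt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not RocketMQ code: splits pulled messages into
// batches of at most consumeBatchSize, one batch per ConsumeRequest.
class BatchSplitter {
    static <T> List<List<T>> splitIntoBatches(List<T> msgs, int consumeBatchSize) {
        if (consumeBatchSize <= 0) {
            throw new IllegalArgumentException("consumeBatchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < msgs.size(); start += consumeBatchSize) {
            int end = Math.min(start + consumeBatchSize, msgs.size());
            // Copy the sublist so each batch does not depend on the source list
            batches.add(new ArrayList<>(msgs.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> msgs = List.of("m1", "m2", "m3", "m4", "m5");
        // With a batch size of 2, five messages become three requests
        System.out.println(splitIntoBatches(msgs, 2)); // [[m1, m2], [m3, m4], [m5]]
    }
}
```

With the default batch size of 1, every message would get its own ConsumeRequest, so a failure in one message does not force the others to be retried.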

2. Inside ConsumeRequest.run()

@Override
public void run() {
    // 1. The callback (listener) registered on the Consumer
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        // 2. Invoke the listener callback written in the Consumer
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        // An exception leaves status as null
        hasException = true;
    }
    // 3. If status is null (exception or null return), treat it as RECONSUME_LATER
    if (null == status) {
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    // 4. Process the returned status
    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}
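The key point of run() is that a thrown exception and a null return both end up as RECONSUME_LATER. A minimal sketch of just that rule, outside RocketMQ: StatusResolver, resolve and the nested Status enum are hypothetical stand-ins (Status mimics ConsumeConcurrentlyStatus), and a Supplier plays the role of the listener.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the status-resolution rule in run(); not RocketMQ code.
class StatusResolver {
    // Stand-in for ConsumeConcurrentlyStatus
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    static Status resolve(Supplier<Status> listener) {
        Status status = null;
        try {
            status = listener.get();
        } catch (Throwable e) {
            // Like run(), the exception is only noted; status stays null
        }
        // A null status (exception or null return) means "retry later"
        if (status == null) {
            status = Status.RECONSUME_LATER;
        }
        return status;
    }

    public static void main(String[] args) {
        System.out.println(resolve(() -> Status.CONSUME_SUCCESS)); // CONSUME_SUCCESS
        System.out.println(resolve(() -> null));                   // RECONSUME_LATER
        System.out.println(resolve(() -> { throw new IllegalStateException("boom"); })); // RECONSUME_LATER
    }
}
```

So a listener that throws does not lose the message; it is simply treated like one that asked for a retry.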

What? You're asking what "the listener callback in the Consumer" means?

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_consumer_group");

// ... some code omitted

// Register the listener callback
consumer.setMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        try {
            // Decode the message body
            String result = new String(msgs.get(0).getBody(), "UTF-8");
            System.out.println(result);

            // CONSUME_SUCCESS: consumption succeeded, no retry
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            // RECONSUME_LATER (or returning null) means the message must be retried
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
});

The callback is simply the anonymous class you wrote above. I bet you already knew that and were just being modest.

3. Decide whether to retry based on the returned status

public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    int ackIndex = context.getAckIndex();
    switch (status) {
        // 1. Consumption succeeded
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            break;
        // 2. Consume later (retry)
        case RECONSUME_LATER:
            ackIndex = -1;
            break;
        default:
            break;
    }

    // 3. Handle each message model differently
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        // 4. Broadcasting: with ackIndex = -1 the loop runs, but it only logs a warning and does nothing else
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        // 5. Clustering
        case CLUSTERING:
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            // 6. RECONSUME_LATER (ackIndex = -1) runs the loop; CONSUME_SUCCESS does not
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                // 7. RECONSUME_LATER: send the ACK (send the message back to the Broker) for a retry
                boolean result = this.sendMessageBack(msg, context);
                // 8. The ACK may fail; record the messages whose ACK failed
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                // 9. Messages whose ACK failed go back to the thread pool and are consumed again after 5s
                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }

    // 10. Update the consumption offset: note that it is updated for both CONSUME_SUCCESS and RECONSUME_LATER
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}

According to the above source code, we can draw the following conclusions:

1. From step 4, we know that in broadcasting mode, even if consumption fails, there is no retry; only a warning log is printed.

2. Only messages that failed to be consumed (CONSUME_SUCCESS was not returned) are sent back with an ACK for retry.

3. If the ACK fails, we call it a retry failure.

When a retry fails, the message is resubmitted locally and consumed again after a 5s delay (the Consumer's callback is invoked again).

4. The Consumer's offset is updated whether consumption succeeds or fails.
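The index arithmetic behind conclusions 1 to 3 is easy to misread, so here is a small simulation of it. It is a sketch, not RocketMQ code: AckIndexDemo, indicesToSendBack and the nested Status enum are hypothetical, and it only computes which batch positions the CLUSTERING loop would send back. It assumes the context's default ackIndex is Integer.MAX_VALUE, as in the ConsumeConcurrentlyContext versions I have seen.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the ackIndex logic in processConsumeResult;
// it does not contact a Broker, it only lists the positions that would be retried.
class AckIndexDemo {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    static List<Integer> indicesToSendBack(Status status, int ackIndex, int batchSize) {
        switch (status) {
            case CONSUME_SUCCESS:
                // Clamp: ackIndex cannot point past the last message
                if (ackIndex >= batchSize) {
                    ackIndex = batchSize - 1;
                }
                break;
            case RECONSUME_LATER:
                // -1 means nothing was acknowledged: every message is retried
                ackIndex = -1;
                break;
        }
        List<Integer> result = new ArrayList<>();
        // Same loop bounds as the CLUSTERING branch
        for (int i = ackIndex + 1; i < batchSize; i++) {
            result.add(i);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(indicesToSendBack(Status.CONSUME_SUCCESS, Integer.MAX_VALUE, 3)); // []
        System.out.println(indicesToSendBack(Status.RECONSUME_LATER, Integer.MAX_VALUE, 3)); // [0, 1, 2]
        System.out.println(indicesToSendBack(Status.CONSUME_SUCCESS, 0, 3));                 // [1, 2]
    }
}
```

The last call shows a side effect of the same arithmetic: if a listener returns CONSUME_SUCCESS but lowers ackIndex, the messages after that index are still sent back for retry.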

4. ConsumeMessageConcurrentlyService.sendMessageBack: prepare the request to the Broker

public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
    // 1. Delay level for the next consumption: 0 by default, unless the listener changes it
    //    via context.setDelayLevelWhenNextConsume(). This is RocketMQ's delayed-message level
    int delayLevel = context.getDelayLevelWhenNextConsume();

    try {
        // 2. Send to the Broker
        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
        return true;
    } catch (Exception e) {
        log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
    }

    return false;
}

What? You don't know the delay levels of RocketMQ delayed messages? T_T

(See the delayed-message example on the RocketMQ official website.)

RocketMQ has 18 delay levels: delayLevel ranges from 1 to 18, and each number corresponds to a delay time.

The delay time is as follows:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 

For example, delayLevel = 1 means a 1s delay.

And delayLevel = 4? Ah, you've learned to jump in with the answer: 30s, exactly. You're so smart. (o゚▽゚)o
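The level-to-delay mapping is just a position lookup in the space-separated list above (on the Broker this list comes from the messageDelayLevel setting). A minimal sketch of such a parser follows; DelayLevels and parse are hypothetical names and this is not the Broker's own parsing code. It supports the s/m/h/d suffixes and numbers levels from 1.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser, not Broker code: maps delay level (1-based) to seconds.
class DelayLevels {
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String p = parts[i];
            char unit = p.charAt(p.length() - 1);
            long value = Long.parseLong(p.substring(0, p.length() - 1));
            long multiplier;
            switch (unit) {
                case 's': multiplier = 1; break;
                case 'm': multiplier = 60; break;
                case 'h': multiplier = 3600; break;
                case 'd': multiplier = 86400; break;
                default: throw new IllegalArgumentException("Unknown unit in " + p);
            }
            // Levels start at 1, not 0
            table.put(i + 1, value * multiplier);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = parse(DEFAULT_LEVELS);
        System.out.println(table.size());  // 18
        System.out.println(table.get(1));  // 1
        System.out.println(table.get(4));  // 30
        System.out.println(table.get(18)); // 7200
    }
}
```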

Broker-side handling of retries

The following code comes from the Broker's source; to read it in full, you will need to download the RocketMQ source code.

This (long) method handles the Consumer's retry request. It mainly does the following:

  1. Change the message's Topic to "%RETRY%" + group and compute the queueId (retry queue; the number of queues is 1)
  2. If the message has been retried >= 16 times (the default), or the requested delay level is negative, change its Topic again, to the dead-letter queue Topic "%DLQ%" + group, with a single consume queue (the dead-letter queue has only one consume queue)
  3. If it does not become a dead letter, compute the message's delay level
  4. Copy the original Msg into a new Msg, hand the new Msg to the BrokerController, and store it in the CommitLog (What? You don't know what the CommitLog is? Next time I'll write about RocketMQ's internal storage structure.)
    • The new Msg gets a new messageId
    • Not a dead letter: the message is saved to the CommitLog under the new Topic "%RETRY%" + group as a delayed message
    • Dead letter: the message is saved to the CommitLog under "%DLQ%" + group; messages in the dead-letter queue are not consumed by consumers

private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) {
    // ... (decoding requestHeader and loading subscriptionGroupConfig and the original msgExt omitted)

    // 1. New Topic name: "%RETRY%" + group
    String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
    // The number of retry queues is 1
    int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

    // 2. Delay level sent by the Consumer (0 by default)
    int delayLevel = requestHeader.getDelayLevel();

    // 3. Maximum number of retries
    int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();

    // 4. The retry limit is reached (default: 16) or a negative delay level was requested
    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
        || delayLevel < 0) {

        // 5. Change the Topic to "%DLQ%" + group
        newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
        // The default number of dead-letter queues is 1
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
    } else {
        // 6. delayLevel is 0 by default, so it becomes 3 + the number of retries so far
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        // 7. Mark the message as delayed
        msgExt.setDelayTimeLevel(delayLevel);
    }

    // 8. Create a new message
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(newTopic);
    msgInner.setQueueId(queueIdInt);
    // 8-1. Retry count + 1: if this new message fails again and comes back, the count is compared at step 4
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
    // ... (copying the body and properties, including the delay level, from msgExt omitted)

    // 9. Save it to the CommitLog as a new message
    PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    // ... (building the response omitted)
}
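The routing decision in consumerSendMsgBack can be isolated from the storage details. Below is a minimal sketch of only that decision, under the assumptions that the retry and DLQ prefixes are "%RETRY%" and "%DLQ%" as described above and that maxReconsumeTimes is the default 16. BrokerRetrySketch, Decision and decide are hypothetical names, not Broker classes; a delayLevel of 0 in the result means "no delay level applied" (the DLQ branch).

```java
// Hypothetical simulation of the retry/DLQ decision; it stores nothing.
class BrokerRetrySketch {
    static final int DEFAULT_MAX_RECONSUME_TIMES = 16;

    // Where the new message goes and how it is tagged
    static final class Decision {
        final String topic;
        final int delayLevel;     // 0 when no delay level is applied (DLQ branch)
        final int reconsumeTimes; // retry count stored on the new message

        Decision(String topic, int delayLevel, int reconsumeTimes) {
            this.topic = topic;
            this.delayLevel = delayLevel;
            this.reconsumeTimes = reconsumeTimes;
        }

        @Override
        public String toString() {
            return topic + " delayLevel=" + delayLevel + " reconsumeTimes=" + reconsumeTimes;
        }
    }

    static Decision decide(String group, int reconsumeTimes, int requestedDelayLevel, int maxReconsumeTimes) {
        // Default target: the group's retry topic
        String topic = "%RETRY%" + group;
        int delayLevel = requestedDelayLevel;
        if (reconsumeTimes >= maxReconsumeTimes || delayLevel < 0) {
            // Too many retries (or a negative level): the group's dead-letter topic
            topic = "%DLQ%" + group;
            delayLevel = 0;
        } else if (delayLevel == 0) {
            // Default: the delay grows with the number of retries so far
            delayLevel = 3 + reconsumeTimes;
        }
        // The new message always carries reconsumeTimes + 1
        return new Decision(topic, delayLevel, reconsumeTimes + 1);
    }

    public static void main(String[] args) {
        int max = DEFAULT_MAX_RECONSUME_TIMES;
        System.out.println(decide("my_group", 0, 0, max));  // %RETRY%my_group delayLevel=3 reconsumeTimes=1
        System.out.println(decide("my_group", 15, 0, max)); // %RETRY%my_group delayLevel=18 reconsumeTimes=16
        System.out.println(decide("my_group", 16, 0, max)); // %DLQ%my_group delayLevel=0 reconsumeTimes=17
    }
}
```

The second and third calls show the boundary: the 16th retry still goes to the retry topic with the highest level, and only the failure after it sends the message to the DLQ.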

What is a dead letter queue (DLQ queue)?

There are plenty of blog posts on this already, so I won't reinvent the wheel.

In summary:

  • A separate queue (DLQ) in the Broker that stores messages the Consumer still failed to consume after 16 retries
  • This queue has write permission only, no read permission, so the Consumer cannot consume it again; its messages must be redelivered manually (e.g., via rocketmq-console)
  • In the DLQ queue, the message's Topic is renamed to "%DLQ%" + groupName
  • The DLQ queue lives in the consumeQueue folder, under the Topic folder named "%DLQ%" + groupName

What? What on earth is the ConsumeQueue folder? Give me a moment... I'll write about the RocketMQ message storage structure soon.

Retry message delay mechanism

As mentioned, once a retry message is sent back to the Broker, it is stored in the CommitLog as a new delayed message, and the Consumer consumes it again when its delay expires.

Only after 16 retries is a message moved to the dead-letter queue, where it is never consumed again.

So how long is each of those 16 retries delayed?

As the source code above shows, a message's delay level depends on reconsumeTimes: the more retries, the longer the delay.

delayLevel = 3 + msgExt.getReconsumeTimes();

The specific delay for each retry is listed in a table from Aliyun.
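The per-retry delays can also be derived from the two rules in this article: the default level table and delayLevel = 3 + reconsumeTimes. The sketch below assumes the listener never sets its own delay level and that maxReconsumeTimes is the default 16; RetrySchedule and delaysForRetries are hypothetical names, and the clamp to the highest level is an assumption that only matters if maxReconsumeTimes is raised above 16.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: per-retry delay = level table[3 + reconsumeTimes].
class RetrySchedule {
    // Default delay per level in seconds, levels 1..18
    static final long[] LEVEL_SECONDS = {
        1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    static List<Long> delaysForRetries(int maxReconsumeTimes) {
        List<Long> delays = new ArrayList<>();
        for (int reconsumeTimes = 0; reconsumeTimes < maxReconsumeTimes; reconsumeTimes++) {
            // Assumption: levels beyond the table are clamped to the highest level
            int level = Math.min(3 + reconsumeTimes, LEVEL_SECONDS.length);
            delays.add(LEVEL_SECONDS[level - 1]);
        }
        return delays;
    }

    public static void main(String[] args) {
        List<Long> delays = delaysForRetries(16);
        long total = 0;
        for (int i = 0; i < delays.size(); i++) {
            total += delays.get(i);
            System.out.println("retry " + (i + 1) + ": " + delays.get(i) + "s");
        }
        System.out.println("total: " + total + "s"); // total: 17140s
    }
}
```

With the defaults, the first retry waits 10s and the 16th waits 2h; added together, a message spends about 4h 46min waiting before it lands in the dead-letter queue.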

Conclusion

Let’s go back to our first few questions:

  • What does message retry mean?
  • Consumers consume messages in either clustering mode or broadcasting mode. Do message retries happen in both modes?
  • What is the message retry policy?
  • What are the delay rules for message retries?
  • What is a dead-letter queue? What are its characteristics?
  • Under what conditions does a message go to the dead-letter queue?

What does message retry mean?

If the Consumer fails to consume a message (the callback does not return CONSUME_SUCCESS), RocketMQ has the Consumer consume it again, so that the message is eventually processed.

Consumers consume messages in either clustering mode or broadcasting mode. Do message retries happen in both modes?

Broadcasting mode only logs consumption failures as warnings and does not retry.

The retry mechanism applies only in clustering mode.

What is the message retry policy?

The Broker stores the failed message as a new delayed message, and the Consumer consumes it again once the delay expires.

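What are the delay rules for message retries?

Unless the listener sets a delay level itself, the Broker uses delayLevel = 3 + reconsumeTimes, so the first retry waits 10s, the second 30s, and so on, up to 2h for the 16th retry.

What is a dead-letter queue? What are its characteristics?

It is a separate queue in the Broker, with the Topic "%DLQ%" + groupName and a single consume queue, holding messages that still failed after 16 retries. It is write-only, so Consumers cannot consume it; its messages must be redelivered manually.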

Under what conditions does a message go to the dead-letter queue?

The message has not been successfully consumed after 16 retries.

Finally

If you spot any mistakes, please point them out in the comments. I'll keep the RocketMQ series updated, so feel free to leave a comment.

The latest articles are published first on WeChat; you're welcome to follow.

Series

RocketMQ storage principles