Hello, I am Saiya, a Java developer who loves poetry. Thank you for your attention ~ ┗(▔, ▔)┛. I hear "liking" goes even better with "reading."
Today's poem: "In youth I treat danger as level ground; alone, with my long sword, I rise above the clear autumn." From Gu Kuang (Tang dynasty), Three Difficult Songs on the Road.
Today we'll look at RocketMQ's retry mechanism. The content is fairly hardcore, so I recommend the one-click triple. Oh no, wrong platform: I suggest like + favorite.
All right, let's hop in and get going.
The following covers only Consumer-side retries. Producer-side retry is simply resending (plus fail-over, of course).
Topics covered
- Principle of ACK retry mechanism
- Dead letter queue (DLQ queue)
A few questions
- What does message retry mean?
- Consumption is divided into Cluster mode and Broadcast mode. Do message retries occur in both modes?
- What is the message retry policy?
- Delay time rules for message retries?
- What is a dead letter queue? What are the characteristics?
- What are the conditions for a message to enter the dead-letter queue?
Background
We know that the Consumer pulls messages and consumes them separately, in two different classes:
- Pull messages: PullMessageService
- Consume messages: ConsumeMessageConcurrentlyService
Message consumption process
Only the key code is shown below.
1. Suppose we have pulled some messages and are ready to submit them to ConsumeMessageConcurrentlyService for consumption. The following code is called:
// ConsumeMessageConcurrentlyService
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    // Assume no paging is needed (all messages fit in one batch)
    if (msgs.size() <= consumeBatchSize) {
        // Wrap the messages in a ConsumeRequest
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            // Hand the request to the consume thread pool
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            // The pool rejected the task: resubmit it later
            this.submitConsumeRequestLater(consumeRequest);
        }
    }
    // ... paging branch (msgs.size() > consumeBatchSize) omitted
}
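The excerpt above skips the paging branch that runs when more messages were pulled than consumeMessageBatchMaxSize allows. Here is a rough sketch of that split, assuming each batch becomes its own ConsumeRequest; `BatchSplitSketch` and `split` are hypothetical names, not RocketMQ API.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitSketch {
    // Splits pulled messages into batches of at most batchSize.
    // In this sketch, each returned batch stands for one ConsumeRequest.
    static <T> List<List<T>> split(List<T> msgs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < msgs.size(); from += batchSize) {
            int to = Math.min(from + batchSize, msgs.size());
            batches.add(new ArrayList<>(msgs.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> msgs = List.of("m1", "m2", "m3", "m4", "m5");
        System.out.println(split(msgs, 1).size()); // 5
        System.out.println(split(msgs, 2));        // [[m1, m2], [m3, m4], [m5]]
    }
}
```

consumeMessageBatchMaxSize defaults to 1 on DefaultMQPushConsumer, so by default every message ends up in its own request, which is why a listener usually receives a single-element msgs list.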
2. Inside ConsumeRequest.run()
@Override
public void run() {
    // 1. The listener callback registered on the Consumer
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        // 2. Call the listener callback method in the Consumer
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        // An exception leaves status as null
        hasException = true;
    }
    // 3. If status is null (the listener returned null or threw), treat it as RECONSUME_LATER
    if (null == status) {
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    // 4. Process the returned status
    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}
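Steps 2 and 3 mean that a listener that throws and a listener that returns null end up exactly where one returning RECONSUME_LATER does. The sketch below isolates that rule; the local `Status` enum stands in for ConsumeConcurrentlyStatus and a plain `Function` stands in for the listener (both are simplifications, not RocketMQ types).

```java
import java.util.List;
import java.util.function.Function;

public class StatusNormalizationSketch {
    // Local stand-in for RocketMQ's ConsumeConcurrentlyStatus
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Mirrors steps 2-3: an exception leaves status null, and null becomes RECONSUME_LATER
    static Status invoke(Function<List<String>, Status> listener, List<String> msgs) {
        Status status = null;
        try {
            status = listener.apply(msgs);
        } catch (Throwable e) {
            // The real code sets hasException = true and logs a warning here
        }
        if (status == null) {
            status = Status.RECONSUME_LATER;
        }
        return status;
    }

    public static void main(String[] args) {
        List<String> msgs = List.of("m1");
        System.out.println(invoke(m -> Status.CONSUME_SUCCESS, msgs)); // CONSUME_SUCCESS
        System.out.println(invoke(m -> null, msgs));                   // RECONSUME_LATER
        System.out.println(invoke(m -> { throw new RuntimeException("boom"); }, msgs)); // RECONSUME_LATER
    }
}
```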
What? You're asking what "the listener callback method in the Consumer" means?
// The constructor argument is the consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_consumer_group");
// ... some code omitted
// Register the listener callback
consumer.setMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt msg : msgs) {
                String result = new String(msg.getBody(), "UTF-8");
                System.out.println(result);
            }
            // 1. CONSUME_SUCCESS: consumption succeeded, no retry
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            // 2. RECONSUME_LATER (same as returning null): the messages will be retried
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
});
The callback method is just consumeMessage in the anonymous class above. I bet you knew that already; I'm just being modest.
3. Determine whether to retry based on the returned status
public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    // ackIndex defaults to Integer.MAX_VALUE unless the listener changes it
    int ackIndex = context.getAckIndex();
    switch (status) {
        // 1. Consumption succeeded: every message in the batch is acknowledged
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            break;
        // 2. Consume later: no message is acknowledged
        case RECONSUME_LATER:
            ackIndex = -1;
            break;
        default:
            break;
    }
    // 3. Handle the result differently for each message model
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        // 4. Broadcast mode: on failure (ackIndex == -1) the loop runs,
        //    but it only prints a warning log and does nothing else
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        // 5. Cluster mode
        case CLUSTERING:
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            // 6. RECONSUME_LATER sets ackIndex to -1, so the loop runs; CONSUME_SUCCESS skips it
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                // 7. Send an ACK (send the message back to the Broker) so it will be retried
                boolean result = this.sendMessageBack(msg, context);
                // 8. The ACK may fail; record the messages whose ACK failed
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                // Messages whose ACK failed go back to the local thread pool and are re-consumed after 5s
                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }
    // 9. Update the consumption offset: note this happens for both CONSUME_SUCCESS and RECONSUME_LATER
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}
From the source code above, we can draw the following conclusions:
1. From step 4: in broadcast mode, a failed consumption is never retried; only a warning log is printed.
2. Only messages whose consumption failed (CONSUME_SUCCESS was not returned) need an ACK sent for retry.
3. If sending the ACK fails, we call it a retry failure. In that case the message is handed back to the local thread pool and re-consumed after a 5s delay (the Consumer's callback method is invoked again).
4. The Consumer's offset is updated whether consumption succeeds or fails.
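The four conclusions can be checked with a small simulation of the ackIndex loop. This is a sketch, not RocketMQ code: `ConsumeResultSketch`, `Outcome`, and `process` are hypothetical, the enums are local stand-ins, and an `IntPredicate` replaces the real sendMessageBack call. The starting ackIndex of Integer.MAX_VALUE is assumed to match the context default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

public class ConsumeResultSketch {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }
    enum Mode { BROADCASTING, CLUSTERING }

    // Outcome buckets, holding message indices within the batch
    static final class Outcome {
        final List<Integer> dropped = new ArrayList<>();    // broadcast: warning log only
        final List<Integer> sentBack = new ArrayList<>();   // cluster: ACK sent to the Broker
        final List<Integer> localRetry = new ArrayList<>(); // cluster: ACK failed, local retry after 5s
    }

    // sendBackOk stands in for sendMessageBack(msg, context) succeeding for message i
    static Outcome process(Status status, int ackIndex, int size, Mode mode, IntPredicate sendBackOk) {
        if (status == Status.CONSUME_SUCCESS) {
            if (ackIndex >= size) {
                ackIndex = size - 1; // every message acknowledged
            }
        } else {
            ackIndex = -1; // RECONSUME_LATER: nothing acknowledged
        }
        Outcome out = new Outcome();
        for (int i = ackIndex + 1; i < size; i++) {
            if (mode == Mode.BROADCASTING) {
                out.dropped.add(i);
            } else if (sendBackOk.test(i)) {
                out.sentBack.add(i);
            } else {
                out.localRetry.add(i);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int defaultAck = Integer.MAX_VALUE;
        Outcome ok = process(Status.CONSUME_SUCCESS, defaultAck, 3, Mode.CLUSTERING, i -> true);
        System.out.println(ok.sentBack + " " + ok.localRetry); // [] []
        Outcome bc = process(Status.RECONSUME_LATER, defaultAck, 3, Mode.BROADCASTING, i -> true);
        System.out.println(bc.dropped); // [0, 1, 2]
        // Cluster mode where the ACK for message 1 fails
        Outcome cl = process(Status.RECONSUME_LATER, defaultAck, 3, Mode.CLUSTERING, i -> i != 1);
        System.out.println(cl.sentBack + " " + cl.localRetry); // [0, 2] [1]
    }
}
```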
4. ConsumeMessageConcurrentlyService.sendMessageBack: preparing the request to the Broker
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
    // 1. Defaults to 0 and stays 0 unless the listener calls context.setDelayLevelWhenNextConsume(...).
    //    It is a delay level of RocketMQ delayed messages (0 lets the Broker choose)
    int delayLevel = context.getDelayLevelWhenNextConsume();
    try {
        // 2. Send the message back to the Broker
        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
        return true;
    } catch (Exception e) {
        log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
    }
    return false;
}
What? You don't know the delay levels of RocketMQ delayed messages? T_T
See the delayed-message example on the RocketMQ official website.
RocketMQ has 18 delay levels: delayLevel ranges from 1 to 18, and each number corresponds to a delay time.
The delay time is as follows:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
For example, delayLevel = 1 means a delay of 1s.
What about delayLevel = 4? Ah, you've learned to jump in with the answer: 30s, exactly. You're so smart. (o゚▽゚)o
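To make the level-to-time mapping concrete, here is a small sketch that parses the level list shown above. It assumes that list is the Broker's default messageDelayLevel value (a Broker can be configured with a different one); `DelayLevelSketch` and `delayMillis` are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

public class DelayLevelSketch {
    // Default level list; level N is the N-th entry (1-based)
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    static long delayMillis(int level) {
        String[] items = DEFAULT_LEVELS.split(" ");
        if (level < 1 || level > items.length) {
            throw new IllegalArgumentException("delayLevel must be 1.." + items.length);
        }
        String item = items[level - 1];
        long value = Long.parseLong(item.substring(0, item.length() - 1));
        switch (item.charAt(item.length() - 1)) {
            case 's': return TimeUnit.SECONDS.toMillis(value);
            case 'm': return TimeUnit.MINUTES.toMillis(value);
            case 'h': return TimeUnit.HOURS.toMillis(value);
            default: throw new IllegalStateException("unknown unit in " + item);
        }
    }

    public static void main(String[] args) {
        System.out.println(delayMillis(1));  // 1000
        System.out.println(delayMillis(4));  // 30000
        System.out.println(delayMillis(18)); // 7200000
    }
}
```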
Broker-side handling of retries
The following code comes from the Broker's source code; you'll need to download the RocketMQ source to read it in full.
This is the (long) method that handles the Consumer's retry request. It mainly does the following:
- Change the message's Topic to "%RETRY%" + group and compute the queueId (retry queue; the number of queues is 1).
- If the message has already been retried >= 16 times (default), change its Topic instead to the dead-letter queue Topic "%DLQ%" + group, with one consume queue (the dead-letter queue has only one consume queue).
- If it does not become a dead letter, compute the message's delay level.
- Copy the original Msg into a new Msg, hand the new Msg to the BrokerController, and store it in the CommitLog (What? You don't know what the CommitLog is? Next time I'll write about RocketMQ's internal storage structure.)
  - The new Msg gets a new messageId.
  - Non-dead letter: the message is saved to the CommitLog as a delayed message under the new Topic name "%RETRY%" + group.
  - Dead letter: the message is saved to the CommitLog under the Topic "%DLQ%" + group; messages stored in the dead-letter queue are not consumed by consumers.
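private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ConsumerSendMsgBackRequestHeader requestHeader =
        (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
    SubscriptionGroupConfig subscriptionGroupConfig =
        this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
    // ... validity checks omitted
    // 1. New Topic name: "%RETRY%" + group
    String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
    // The number of retry queues defaults to 1
    int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
    // 2. Look up the original message by its CommitLog offset
    MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
    // 3. Delay level sent by the Consumer (0 by default)
    int delayLevel = requestHeader.getDelayLevel();
    // 4. Maximum number of retries
    int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    // 5. Retries exhausted (default: 16) or a negative delay level: send to the dead-letter queue
    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
        || delayLevel < 0) {
        // Change the Topic to "%DLQ%" + group
        newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
        // The dead-letter queue has only one queue by default
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
        // ... creating the DLQ topic omitted
    } else {
        // delayLevel is 0 by default, so the level becomes 3 + the number of retries so far
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);
    }
    // 6. Build a new message to store in the CommitLog
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(newTopic);
    msgInner.setBody(msgExt.getBody());
    msgInner.setQueueId(queueIdInt);
    // ... copying flags, properties (including the delay level set above), timestamps, etc. omitted
    // Retry count + 1: if this new message fails again, this is the value compared in step 5
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
    // 7. Store it in the CommitLog as a new message
    PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    // ... building the response from putMessageResult omitted
    return response;
}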
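The core decision in this method (retry topic or dead-letter topic, and which delay level) can be pulled out into a pure function. This is a simplified sketch, not Broker code: `SendBackDecisionSketch`, `Decision`, `decide`, and the group name `my_group` are hypothetical, and the limit of 16 assumes the default retryMaxTimes of a subscription group.

```java
public class SendBackDecisionSketch {
    // Hypothetical result holder; the real Broker fills in a MessageExtBrokerInner instead
    static final class Decision {
        final String topic;
        final int delayLevel;     // 0 here means "no delay level set"
        final int reconsumeTimes; // value stored on the new message

        Decision(String topic, int delayLevel, int reconsumeTimes) {
            this.topic = topic;
            this.delayLevel = delayLevel;
            this.reconsumeTimes = reconsumeTimes;
        }

        @Override
        public String toString() {
            return topic + " level=" + delayLevel + " reconsumeTimes=" + reconsumeTimes;
        }
    }

    // Dead letter when retries are used up or the requested level is negative;
    // otherwise the retry topic with a delay level
    static Decision decide(String group, int reconsumeTimes, int delayLevel, int maxReconsumeTimes) {
        if (reconsumeTimes >= maxReconsumeTimes || delayLevel < 0) {
            // In this sketch, dead letters carry no delay level
            return new Decision("%DLQ%" + group, 0, reconsumeTimes + 1);
        }
        if (delayLevel == 0) {
            // Client left the level at 0: derive it from the retry count
            delayLevel = 3 + reconsumeTimes;
        }
        return new Decision("%RETRY%" + group, delayLevel, reconsumeTimes + 1);
    }

    public static void main(String[] args) {
        int max = 16;
        System.out.println(decide("my_group", 0, 0, max));  // %RETRY%my_group level=3 reconsumeTimes=1
        System.out.println(decide("my_group", 15, 0, max)); // %RETRY%my_group level=18 reconsumeTimes=16
        System.out.println(decide("my_group", 16, 0, max)); // %DLQ%my_group level=0 reconsumeTimes=17
        System.out.println(decide("my_group", 2, -1, max)); // %DLQ%my_group level=0 reconsumeTimes=3
    }
}
```

Keeping the decision free of I/O like this makes the 16-retry boundary easy to test in isolation.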
What is a dead-letter queue (DLQ)?
There are already good blog posts on this, so I won't reinvent the wheel.
In summary:
- The DLQ is a separate queue in the Broker that stores messages the Consumer still failed to consume after 16 retries.
- This queue has write permission only, no read permission, so Consumers cannot consume from it again; its messages must be redelivered manually (e.g. with rocketmq-console).
- In the DLQ, the message's Topic is renamed to "%DLQ%" + groupName.
- The DLQ is the queue under the Topic folder named "%DLQ%" + groupName inside the consumeQueue folder.
What? What on earth is the ConsumeQueue folder? Wait for me... I'll write up RocketMQ's message storage structure soon.
Retry message delay mechanism
As mentioned above, after a retry message is sent back to the Broker, it is stored in the CommitLog as a new delayed message, and the Consumer consumes it again once its delay has elapsed.
A message is dropped into the dead-letter queue, never to be consumed again, only after it has been retried 16 times.
So how long is each retry delayed?
From the source code above, a message's delay level depends on reconsumeTimes: the more retries, the longer the delay.
delayLevel = 3 + msgExt.getReconsumeTimes();
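Combining delayLevel = 3 + reconsumeTimes with the level table from earlier gives the whole retry schedule. This sketch assumes the Broker's default level list and a Consumer that leaves delayLevelWhenNextConsume at 0; `RetryScheduleSketch` and `delayBeforeRetry` are hypothetical names.

```java
public class RetryScheduleSketch {
    // Default delay level list, index 0 = level 1
    static final String[] DEFAULT_LEVELS = {
        "1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m",
        "6m", "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h"
    };

    // Delay before the n-th retry (n = 1..16)
    static String delayBeforeRetry(int n) {
        if (n < 1 || n > 16) {
            throw new IllegalArgumentException("retry number must be 1..16");
        }
        int reconsumeTimes = n - 1;          // count carried by the message that just failed
        int delayLevel = 3 + reconsumeTimes; // Broker-side rule when the client sends 0
        return DEFAULT_LEVELS[delayLevel - 1];
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 16; n++) {
            System.out.println("retry " + n + ": " + delayBeforeRetry(n));
        }
    }
}
```

The first retry waits 10s and the sixteenth waits 2h; a failure after that sends the message to the dead-letter queue.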
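The specific retry delays are as follows (picture from Aliyun): the 1st retry waits 10s (level 3), the 2nd 30s (level 4), the 3rd 1m, and so on up to 2h (level 18) for the 16th retry.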
Conclusion
Let’s go back to our first few questions:
- What does message retry mean?
- Consumption is divided into Cluster mode and Broadcast mode. Do message retries occur in both modes?
- What is the message retry policy?
- Delay time rules for message retries?
- What is a dead letter queue? What are the characteristics?
- What are the conditions for a message to enter the dead-letter queue?
What does message retry mean?
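To ensure high availability, when the Consumer fails to consume a message (the callback does not return CONSUME_SUCCESS), RocketMQ redelivers it so the Consumer can consume it again.
Consumption is divided into Cluster mode and Broadcast mode. Do message retries occur in both modes?
Broadcast mode only logs consumption failures as warnings and does not retry.
The message retry mechanism only applies in Cluster mode.
What is the message retry policy?
The Broker stores the failed message as a new delayed message, and the Consumer consumes it again once the delay has elapsed.
Delay time rules for message retries?
When the Consumer leaves the delay level at 0, the Broker uses delayLevel = 3 + reconsumeTimes, so each retry waits longer than the last: 10s before the 1st retry, up to 2h before the 16th.
What is a dead letter queue? What are the characteristics?
It is a separate queue (Topic "%DLQ%" + group) with write permission only; Consumers cannot consume from it, and its messages must be redelivered manually.
What are the conditions for a message to enter the dead-letter queue?
The message still has not been consumed successfully after 16 retries (or the Consumer sent it back with a negative delay level).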
Finally
If you spot any mistakes, please point them out in the comments section. I'll keep updating the RocketMQ series, so feel free to leave a comment.
The latest articles are published on WeChat; you're welcome to follow me there. (゚▽゚)/
Series
RocketMQ storage principles