Abstract: original source www.iocoder.cn/RocketMQ/me… (“Taro source”). Reprints are welcome; please keep this abstract. Thank you!

This article is based on the official RocketMQ 4.0.x release.

  • 1. Overview
  • 2. Timed messages
    • 2.1 Delay Level
    • 2.2 Producer sends timed messages
    • 2.3 Broker stores timed messages
    • 2.4 Broker sends timed messages
    • 2.5 Broker persists the timed message sending progress
  • 3. Message retry

1. Overview

Recommended pre-reading content:

😈 Why are timed messages and message retries covered in the same article? Take a guess. 👻

2. Timed messages

Timed messages are messages that cannot be consumed by consumers immediately after they are sent to the Broker, but must be consumed at a specific point in time or after a specific period of time.

The following figure shows the processing logic of timed messages:

2.1 Delay Level

RocketMQ currently supports only timed messages with fixed precision, that is, a fixed set of delay levels. The official explanation:

To support arbitrary time precision, message ordering must be done at the Broker level, which inevitably incurs significant performance costs if persistence is involved.

  • Delay levels (delay level, followed by delay time):
1 1s
2 5s
3 10s
4 30s
5 1m
6 2m
7 3m
8 4m
9 5m
10 6m
11 7m
12 8m
13 9m
14 10m
15 20m
16 30m
17 1h
18 2h
  • The core source code is as follows:

    // ⬇️⬇️⬇️【MessageStoreConfig.java】
    /**
     * Message delay level configuration string
     */
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // ⬇️⬇️⬇️【ScheduleMessageService.java】
    /**
     * Parses the delay levels
     *
     * @return whether parsing succeeded
     */
    public boolean parseDelayLevel() {
        HashMap<String, Long> timeUnitTable = new HashMap<>();
        timeUnitTable.put("s", 1000L);
        timeUnitTable.put("m", 1000L * 60);
        timeUnitTable.put("h", 1000L * 60 * 60);
        timeUnitTable.put("d", 1000L * 60 * 60 * 24);

        String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
        try {
            String[] levelArray = levelString.split(" ");
            for (int i = 0; i < levelArray.length; i++) {
                String value = levelArray[i];
                String ch = value.substring(value.length() - 1);
                Long tu = timeUnitTable.get(ch);

                int level = i + 1;
                if (level > this.maxDelayLevel) {
                    this.maxDelayLevel = level;
                }
                long num = Long.parseLong(value.substring(0, value.length() - 1));
                long delayTimeMillis = tu * num;
                this.delayLevelTable.put(level, delayTimeMillis);
            }
        } catch (Exception e) {
            log.error("parseDelayLevel exception", e);
            log.info("levelString String = {}", levelString);
            return false;
        }

        return true;
    }
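To make the parsing step concrete, here is a minimal standalone sketch of the same idea: split the configuration string on whitespace, read the trailing unit character, and store the delay in milliseconds under a 1-based level. The class `DelayLevelParser` and its `parse` method are hypothetical names for illustration and are not part of RocketMQ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone helper for illustration; not a RocketMQ class.
class DelayLevelParser {

    /** Parses a string such as "1s 5s 10s" into a map of level -> delay in milliseconds. */
    static Map<Integer, Long> parse(String levelString) {
        Map<String, Long> unitMillis = new HashMap<>();
        unitMillis.put("s", 1000L);
        unitMillis.put("m", 1000L * 60);
        unitMillis.put("h", 1000L * 60 * 60);
        unitMillis.put("d", 1000L * 60 * 60 * 24);

        Map<Integer, Long> table = new HashMap<>();
        String[] levels = levelString.trim().split("\\s+");
        for (int i = 0; i < levels.length; i++) {
            String value = levels[i];
            String unit = value.substring(value.length() - 1);
            Long factor = unitMillis.get(unit);
            if (factor == null) {
                throw new IllegalArgumentException("Unknown time unit in: " + value);
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            table.put(i + 1, factor * num); // levels are 1-based
        }
        return table;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table =
            parse("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h");
        System.out.println(table.get(3));  // 10000
        System.out.println(table.get(18)); // 7200000
    }
}
```

Unlike the broker code, this sketch throws on an unknown unit instead of logging and returning false; that is a simplification made for the example.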

2.2 Producer sends timed messages

  • 🦅 Set the delay level when sending a message.

    Message msg = new Message(...);
    msg.setDelayTimeLevel(level);
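Because only fixed levels are available, a producer usually has to translate "deliver after roughly N milliseconds" into a level. The sketch below shows one possible policy: pick the smallest level whose delay is at least the requested one. The class `DelayLevelChooser` and the method `levelFor` are hypothetical, and the table assumes the default configuration from section 2.1 is unchanged on the Broker.

```java
// Hypothetical helper for illustration; not part of the RocketMQ client API.
class DelayLevelChooser {

    // Delays in milliseconds for levels 1..18, assuming the default
    // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" configuration.
    static final long[] DEFAULT_DELAYS_MS = {
        1_000, 5_000, 10_000, 30_000,
        60_000, 120_000, 180_000, 240_000, 300_000,
        360_000, 420_000, 480_000, 540_000, 600_000,
        1_200_000, 1_800_000, 3_600_000, 7_200_000
    };

    /** Returns the smallest level whose delay is >= desiredDelayMs, or the maximum level. */
    static int levelFor(long desiredDelayMs) {
        for (int i = 0; i < DEFAULT_DELAYS_MS.length; i++) {
            if (DEFAULT_DELAYS_MS[i] >= desiredDelayMs) {
                return i + 1; // levels are 1-based
            }
        }
        return DEFAULT_DELAYS_MS.length;
    }

    public static void main(String[] args) {
        System.out.println(levelFor(45_000)); // 5, i.e. 1m
        // The result would then be passed to msg.setDelayTimeLevel(level).
    }
}
```

Rounding up means the message is never delivered earlier than requested, at the cost of a possibly longer wait.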

2.3 Broker stores timed messages

  • 🦅 When a delayed message is stored, it goes into the Topic SCHEDULE_TOPIC_XXXX instead of its real Topic.
  • 🦅 The delay level maps to a fixed message queue number: queueId = delayLevel - 1.

The core code is as follows:

    // ⬇️⬇️⬇️【CommitLog.java】
    /**
     * Adds a message and returns the result
     *
     * @param msg message
     * @return result
     */
    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // .... (code omitted)

        // Timed message processing
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                // When stored, a delayed message goes into the Topic SCHEDULE_TOPIC_XXXX.
                topic = ScheduleMessageService.SCHEDULE_TOPIC;

                // The delay level maps to a fixed message queue number
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        // .... (code omitted)
    }

    // ⬇️⬇️⬇️【ScheduleMessageService.java】
    /**
     * Computes the message queue number from the delay level
     * queueId = delayLevel - 1
     *
     * @param delayLevel delay level
     * @return message queue number
     */
    public static int delayLevel2QueueId(final int delayLevel) {
        return delayLevel - 1;
    }
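The rerouting can be pictured with a small self-contained model: back up the real topic and queue id in the message properties, swap in the schedule topic, and later restore the originals (the restore step corresponds to what `messageTimeup` does in section 2.4). The `Msg` class and the property key strings here are assumptions made for the sketch; the broker itself uses `MessageExtBrokerInner` and the `MessageConst` constants.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model for illustration; not the broker's real types.
class DelayedRouting {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    // Hypothetical property keys for this sketch.
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QUEUE_ID = "REAL_QID";

    static class Msg {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    /** Redirects a delayed message to the schedule topic, backing up its real destination. */
    static void routeToScheduleTopic(Msg msg, int maxDelayLevel) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message, leave it untouched
        }
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel;
        }
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QUEUE_ID, String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = msg.delayLevel - 1; // queueId = delayLevel - 1
    }

    /** Restores the real destination once the delay has elapsed. */
    static void restoreRealTopic(Msg msg) {
        msg.topic = msg.properties.get(REAL_TOPIC);
        msg.queueId = Integer.parseInt(msg.properties.get(REAL_QUEUE_ID));
        msg.delayLevel = 0; // cleared so the message is not delayed a second time
    }

    public static void main(String[] args) {
        Msg msg = new Msg("OrderTopic", 5, 3);
        routeToScheduleTopic(msg, 18);
        System.out.println(msg.topic + " / " + msg.queueId); // SCHEDULE_TOPIC_XXXX / 2
        restoreRealTopic(msg);
        System.out.println(msg.topic + " / " + msg.queueId); // OrderTopic / 5
    }
}
```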

  • 🦅 When the ConsumeQueue entry is generated, each message's tagsCode is set to the message's planned consumption time. This way, when ScheduleMessageService polls the ConsumeQueue, it can filter by tagsCode.

The core code is as follows:

    // ⬇️⬇️⬇️【CommitLog.java】
    /**
     * Checks the message and returns the message size
     *
     * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
     */
    public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {
        try {
            // .... (code omitted)

            // 17 properties
            short propertiesLength = byteBuffer.getShort();
            if (propertiesLength > 0) {
                // ....
                String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
                if (tags != null && tags.length() > 0) {
                    tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
                }

                // Timed message processing
                {
                    String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
                    if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
                        int delayLevel = Integer.parseInt(t);

                        if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                            delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
                        }

                        if (delayLevel > 0) {
                            tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                                storeTimestamp);
                        }
                    }
                }
            }

            // ....

            return new DispatchRequest(//
                topic, // 1
                queueId, // 2
                physicOffset, // 3
                totalSize, // 4
                tagsCode, // 5
                storeTimestamp, // 6
                queueOffset, // 7
                keys, // 8
                uniqKey, //9
                sysFlag, // 9
                preparedTransactionOffset// 10
            );
        } catch (Exception e) {
        }

        return new DispatchRequest(-1, false /* success */);
    }

    // ⬇️⬇️⬇️【ScheduleMessageService.java】
    /**
     * Computes the delivery time [planned consumption time]
     *
     * @param delayLevel delay level
     * @param storeTimestamp store time
     * @return delivery time [planned consumption time]
     */
    public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
        Long time = this.delayLevelTable.get(delayLevel);
        if (time != null) {
            return time + storeTimestamp;
        }

        return storeTimestamp + 1000;
    }
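As a worked example of the delivery time calculation: the value stored in tagsCode is simply the store timestamp plus the delay configured for the level, with a 1 second fallback when the level is unknown. The sketch below uses a hypothetical class `DeliverTime` with only the first three default levels filled in.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone illustration; the table holds only the first three default levels.
class DeliverTime {
    static final Map<Integer, Long> DELAY_LEVEL_TABLE = new HashMap<>();

    static {
        DELAY_LEVEL_TABLE.put(1, 1_000L);
        DELAY_LEVEL_TABLE.put(2, 5_000L);
        DELAY_LEVEL_TABLE.put(3, 10_000L);
    }

    /** Delivery time = store time + delay of the level (falls back to +1000 ms). */
    static long computeDeliverTimestamp(int delayLevel, long storeTimestamp) {
        Long delay = DELAY_LEVEL_TABLE.get(delayLevel);
        return delay != null ? storeTimestamp + delay : storeTimestamp + 1000;
    }

    public static void main(String[] args) {
        long storeTimestamp = 1_500_000_000_000L;
        System.out.println(computeDeliverTimestamp(3, storeTimestamp));  // 1500000010000
        System.out.println(computeDeliverTimestamp(99, storeTimestamp)); // 1500000001000
    }
}
```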

2.4 Broker sends timed messages

  • 🦅 Each consume queue of SCHEDULE_TOPIC_XXXX has its own scheduled task, which polls the queue and sends the messages that have reached their delivery time [planned consumption time].

The following figure shows the processing logic for sending timed messages:

The implementation code is as follows:

    /**
     * ⬇️⬇️⬇️ Timer task that sends (delivers) delayed messages
     */
    class DeliverDelayedMessageTimerTask extends TimerTask {
        /**
         * Delay level
         */
        private final int delayLevel;
        /**
         * Offset
         */
        private final long offset;

        public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
            this.delayLevel = delayLevel;
            this.offset = offset;
        }

        @Override
        public void run() {
            try {
                this.executeOnTimeup();
            } catch (Exception e) {
                // XXX: warn and notify me
                log.error("ScheduleMessageService, executeOnTimeup exception", e);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
            }
        }

        /**
         * Corrects the deliverable time.
         * The interval configured for a delay level can be changed. If the delivery time is later than
         * now plus the current interval, it is corrected to now so that later messages are not blocked.
         *
         * @param now current time
         * @param deliverTimestamp delivery time
         * @return corrected result
         */
        private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
            long result = deliverTimestamp;

            long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
            if (deliverTimestamp > maxTimestamp) {
                result = now;
            }

            return result;
        }

        public void executeOnTimeup() {
            ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;

            if (cq != null) {
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();

                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                            long countdown = deliverTimestamp - now;

                            if (countdown <= 0) { // The message has reached its delivery time
                                MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
                                if (msgExt != null) {
                                    try {
                                        // Send the message
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);
                                        if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // Sent successfully
                                            continue;
                                        } else { // Sending failed
                                            // XXX: warn and notify me
                                            log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId());

                                            // Schedule the next task
                                            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);

                                            // Update the progress
                                            ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                            return;
                                        }
                                    } catch (Exception e) {
                                        // XXX: warn and notify me
                                        log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e);
                                    }
                                }
                            } else {
                                // Schedule the next task
                                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);

                                // Update the progress
                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                return;
                            }
                        } // end of for

                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                        // Schedule the next task
                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);

                        // Update the progress
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    } finally {
                        bufferCQ.release();
                    }
                } // end of if (bufferCQ != null)
                else { // Part of the consume queue has been deleted; jump to the minimum offset
                    long cqMinOffset = cq.getMinOffsetInQueue();
                    if (offset < cqMinOffset) {
                        failScheduleOffset = cqMinOffset;
                        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                            + cqMinOffset + ", queueId=" + cq.getQueueId());
                    }
                }
            } // end of if (cq != null)

            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
        }

        /**
         * Builds the message to deliver
         *
         * @param msgExt message
         * @return message
         */
        private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
            msgInner.setBody(msgExt.getBody());
            msgInner.setFlag(msgExt.getFlag());
            MessageAccessor.setProperties(msgInner, msgExt.getProperties());

            TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
            long tagsCodeValue =
                MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
            msgInner.setTagsCode(tagsCodeValue);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

            msgInner.setSysFlag(msgExt.getSysFlag());
            msgInner.setBornTimestamp(msgExt.getBornTimestamp());
            msgInner.setBornHost(msgExt.getBornHost());
            msgInner.setStoreHost(msgExt.getStoreHost());
            msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());

            msgInner.setWaitStoreMsgOK(false);
            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

            msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));

            String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
            int queueId = Integer.parseInt(queueIdStr);
            msgInner.setQueueId(queueId);

            return msgInner;
        }
    }
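The control flow of `executeOnTimeup` can be reduced to a small model: scan the queue from the saved offset, deliver every entry whose (corrected) delivery time has passed, and stop at the first entry that is not yet due, returning how long to wait before the next scan. The sketch below is a simplification under stated assumptions: the queue is a plain array of delivery timestamps, delivery always succeeds, and the constant `DELAY_FOR_A_WHILE` is given an assumed value of 100 ms. The names `DelayQueueScan` and `ScanResult` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of one polling pass; not the broker's real data structures.
class DelayQueueScan {
    static final long DELAY_FOR_A_WHILE = 100L; // assumed value for this sketch

    /** Outcome of one pass: where to resume, how long to wait, and what was delivered. */
    static class ScanResult {
        final int nextOffset;
        final long waitMs;
        final List<Integer> deliveredIndexes;

        ScanResult(int nextOffset, long waitMs, List<Integer> deliveredIndexes) {
            this.nextOffset = nextOffset;
            this.waitMs = waitMs;
            this.deliveredIndexes = deliveredIndexes;
        }
    }

    /** If the stored time is further away than the current level delay allows, deliver now. */
    static long correctDeliverTimestamp(long now, long deliverTimestamp, long levelDelayMs) {
        return deliverTimestamp > now + levelDelayMs ? now : deliverTimestamp;
    }

    static ScanResult scan(long[] deliverTimestamps, int offset, long now, long levelDelayMs) {
        List<Integer> delivered = new ArrayList<>();
        int i = offset;
        for (; i < deliverTimestamps.length; i++) {
            long deliverAt = correctDeliverTimestamp(now, deliverTimestamps[i], levelDelayMs);
            long countdown = deliverAt - now;
            if (countdown > 0) {
                // Not due yet: resume from this entry after the countdown elapses.
                return new ScanResult(i, countdown, delivered);
            }
            delivered.add(i); // due: the broker would re-put the message to its real topic here
        }
        // Reached the end of the queue: check again shortly for new entries.
        return new ScanResult(i, DELAY_FOR_A_WHILE, delivered);
    }

    public static void main(String[] args) {
        ScanResult r = scan(new long[] {1000, 2000, 9000}, 0, 2500, 10_000);
        System.out.println(r.deliveredIndexes + " next=" + r.nextOffset + " wait=" + r.waitMs);
        // [0, 1] next=2 wait=6500
    }
}
```

Stopping at the first entry that is not due works because all entries in one queue share the same delay level, so their delivery times follow store order.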

2.5 Broker persists the timed message sending progress

  • 🦅 The timed message sending progress is stored in a file (../config/delayOffset.json).
  • 🦅 The sending progress is persisted every 10 seconds.

The core code is as follows:

    // ⬇️⬇️⬇️【ScheduleMessageService.java】
    public void start() {
        // Send timed messages
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        // Periodically persist the sending progress
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    ScheduleMessageService.this.persist();
                } catch (Exception e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
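The reason for persisting the progress is that each delay level must resume from its last offset after a restart; a level with no saved offset starts from 0, as in `start()` above. The sketch below models that round trip with a hypothetical `DelayOffsetStore` class. The `level=offset;level=offset` text format is invented for this example and is not the JSON layout the Broker writes to delayOffset.json.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative model of the per-level progress table; not thread-safe and not the broker's format.
class DelayOffsetStore {
    private final Map<Integer, Long> offsetTable = new TreeMap<>();

    void updateOffset(int delayLevel, long offset) {
        offsetTable.put(delayLevel, offset);
    }

    /** Returns the saved offset, or 0 when the level has no saved progress. */
    long offsetOf(int delayLevel) {
        return offsetTable.getOrDefault(delayLevel, 0L);
    }

    /** Serializes the table as "level=offset;level=offset" (hypothetical format). */
    String encode() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Integer, Long> e : offsetTable.entrySet()) {
            if (sb.length() > 0) {
                sb.append(';');
            }
            sb.append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.toString();
    }

    /** Rebuilds the table from the text produced by encode(). */
    static DelayOffsetStore decode(String text) {
        DelayOffsetStore store = new DelayOffsetStore();
        if (text == null || text.isEmpty()) {
            return store;
        }
        for (String pair : text.split(";")) {
            String[] kv = pair.split("=");
            store.updateOffset(Integer.parseInt(kv[0]), Long.parseLong(kv[1]));
        }
        return store;
    }

    public static void main(String[] args) {
        DelayOffsetStore store = new DelayOffsetStore();
        store.updateOffset(3, 7);
        store.updateOffset(1, 12);
        String saved = store.encode();
        System.out.println(saved); // 1=12;3=7
        System.out.println(DelayOffsetStore.decode(saved).offsetOf(3)); // 7
    }
}
```

Since progress is flushed only periodically, a crash between two flushes means a level resumes from a slightly older offset.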

3. Message retry

When a Consumer fails to consume a message, RocketMQ provides a retry mechanism so that the message is consumed again.

  • 🦅 The Consumer sends the message that failed consumption back to the Broker, where it enters the delayed message queue. That is, a message that failed consumption is not consumed again immediately.

The core code is as follows:

    // ⬇️⬇️⬇️【SendMessageProcessor.java】
    /**
     * Handles a message sent back by a consumer
     *
     * @param ctx ctx
     * @param request request
     * @return response
     * @throws RemotingCommandException when the remote call is abnormal
     */
    private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
        throws RemotingCommandException {
        // ....
        // Handle delayLevel (exclusive).
        int delayLevel = requestHeader.getDelayLevel();
        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
        }
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
            // .... (code omitted)
        } else {
            if (0 == delayLevel) {
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }
            msgExt.setDelayTimeLevel(delayLevel);
        }

        // ....
        return response;
    }
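The rule `delayLevel = 3 + reconsumeTimes` means retries back off along the delay level table: the first retry waits 10s, the second 30s, and so on. The sketch below works this out against the default level string from section 2.1. The class `RetryDelay` and its methods are hypothetical, and the clamp to the maximum level mirrors the check in `putMessage` from section 2.3.

```java
// Standalone illustration of the retry back-off; not a RocketMQ class.
class RetryDelay {
    // Assumes the default delay level configuration is unchanged.
    static final String[] DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    /** A requested level of 0 means "let the broker decide": 3 + number of previous retries. */
    static int retryDelayLevel(int requestedDelayLevel, int reconsumeTimes) {
        return requestedDelayLevel == 0 ? 3 + reconsumeTimes : requestedDelayLevel;
    }

    /** Returns the configured delay for a level, clamping to the maximum level. */
    static String delayFor(int level) {
        if (level < 1) {
            throw new IllegalArgumentException("level must be >= 1: " + level);
        }
        int clamped = Math.min(level, DEFAULT_LEVELS.length);
        return DEFAULT_LEVELS[clamped - 1];
    }

    public static void main(String[] args) {
        for (int reconsumeTimes = 0; reconsumeTimes < 4; reconsumeTimes++) {
            int level = retryDelayLevel(0, reconsumeTimes);
            System.out.println("retry #" + (reconsumeTimes + 1) + " -> level " + level
                + " (" + delayFor(level) + ")");
        }
        // retry #1 -> level 3 (10s)
        // retry #2 -> level 4 (30s)
        // retry #3 -> level 5 (1m)
        // retry #4 -> level 6 (2m)
    }
}
```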

666. Easter egg