Abstract: the original source www.iocoder.cn/RocketMQ/me… “Taro source” welcome to reprint, keep the summary, thank you!
This article is based on RocketMQ 4.0.x official release
- 1. An overview of the
- 2. Timing message
- 2.1 Delay Level
- 2.2 Producer Sends timing messages
- 2.3 Broker stores timed messages
- 2.4 Broker sends timed messages
- 2.5 Broker persistence Sends periodic progress
- 3. Retry the message
🙂🙂🙂 follow wechat public number:
- RocketMQ/MyCAT/Sharding-JDBC all source code analysis article list
- RocketMQ/MyCAT/Sharding-JDBC 中文 解 决 source GitHub address
- Any questions you may have about the source code will be answered carefully. Even do not know how to read the source can also ask oh.
- New source code parsing articles are notified in real time. It’s updated about once a week.
- Serious source communication wechat group.
1. An overview of the
Recommended pre-reading content:
- RocketMQ source Code Analysis — Message sending and receiving
- RocketMQ source Code Analysis — Message pull and Consumption (part 2)
😈 Why are timed messages and message retries put together? You guess. 👻 you guess I guess.
2. Timing message
Timed messages are messages that cannot be consumed by consumers immediately after they are sent to the Broker, but must be consumed at a specific point in time or after a specific period of time.
The following figure shows the processing logic of timed messages:
2.1 Delay Level
RocketMQ currently only supports fixed precision timed messages. Here’s the official line:
To support arbitrary time precision, message ordering must be done at the Broker level, which inevitably incurs significant performance costs if persistence is involved.
- Delay level:
The delay level | time |
---|---|
1 | 1s |
2 | 5s |
3 | 10s |
4 | 30s |
5 | 1m |
6 | 2m |
7 | 3m |
8 | 4m |
9 | 5m |
10 | 6m |
11 | 7m |
12 | 8m |
13 | 9m |
14 | 10m |
15 | 20m |
16 | 30m |
17 | 1h |
18 | 2h |
-
The core source code is as follows:
1: // ⬇️⬇️⬇️ [messagestoreconfig. Java] 2: /** 3: * Message delay level String Configuration 4: */ 5: private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; 6:7: / / ⬇ ️ ⬇ ️ ⬇ ️ ScheduleMessageService. Java 8 】 : 9: / * * * analytical delay level 10:11: * * @ return whether parsing 12 success: * / 13: public boolean parseDelayLevel() { 14: HashMap<String, Long> timeUnitTable = new HashMap<>(); 15: timeUnitTable.put("s", 1000L); 16: timeUnitTable.put("m", 1000L * 60); 17: timeUnitTable.put("h", 1000L * 60 * 60); 18: timeUnitTable.put("d", 1000L * 60 * 60 * 24); 19: 20: String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel(); 21: try { 22: String[] levelArray = levelString.split(" "); 23: for (int i = 0; i < levelArray.length; i++) { 24: String value = levelArray[i]; 25: String ch = value.substring(value.length() - 1); 26: Long tu = timeUnitTable.get(ch); 27: 28: int level = i + 1; 29: if (level > this.maxDelayLevel) { 30: this.maxDelayLevel = level; 31: } 32: long num = Long.parseLong(value.substring(0, value.length() - 1)); 33: long delayTimeMillis = tu * num; 34: this.delayLevelTable.put(level, delayTimeMillis); 35: } 36: } catch (Exception e) { 37: log.error("parseDelayLevel exception", e); 38: log.info("levelString String = {}", levelString); 39: return false; 40: } 41: 42: return true; 43:}Copy the code
2.2 Producer Sends timing messages
- 🦅 Set the delay level for sending messages.
Message msg = new Message(...) ; msg.setDelayTimeLevel(level);Copy the code
2.3 Broker stores timed messages
- 🦅 Delayed message entry while storing messages
Topic
为SCHEDULE_TOPIC_XXXX
. - 🦅 DelayLevel is mapped to message queue number: QueueId = delaylevel-1.
The core code is as follows:
1: // ⬇️⬇️⬇️ [commitlog. Java] 2: /** 3: * Adds a message and returns a message result 4: * 5: * @param MSG message 6: * @return result 7: */ 8: public PutMessageResult putMessage(final MessageExtBrokerInner msg) { 9: // .... Code (omitted) 10:11: / / timing message processing 12: final int tranType = MessageSysFlag. GetTransactionValue (MSG) getSysFlag ()); 13: if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE// 14: || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { 15: // Delay Delivery 16: if (msg.getDelayTimeLevel() > 0) { 17: if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { 18: msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); 19:} 20:21: // When storing messages, delay messages enter 'Topic' as' SCHEDULE_TOPIC_XXXX '. 22: topic = ScheduleMessageService.SCHEDULE_TOPIC; 23:24: / / delay level with a message queue number Do 25: fixed mapping queueId = ScheduleMessageService. DelayLevel2QueueId (MSG) getDelayTimeLevel ()); 26: 27: // Backup real topic, queueId 28: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); 29: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); 30: msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); 31: 32: msg.setTopic(topic); 33: msg.setQueueId(queueId); 34:} 35:} 36: 37: //.... Code (omitted) : 38} 39:40: / / ⬇ ️ ⬇ ️ ⬇ ️ ScheduleMessageService. Java 41 】 : / * * 42: * calculated according to the message queue delay level number 43: * QueueId = delaylevel-1 44: * 45: * @param DelayLevel DelayLevel 46: * @return message queue number 47: */ 48: public static int delayLevel2QueueId(final int delayLevel) { 49: return delayLevel - 1; 50:}Copy the code
- 🦅 generated
ConsumeQueue
When each messagetagsCode
Use message plan consumption time. In this way,ScheduleMessageService
In the pollConsumeQueue
, can be usedtagsCode
Filter.
The core code is as follows:
1: // ⬇️⬇️⬇️【 commitlog. Java 】 2: /** 3: * check the message and returns the message size 4: * 5: * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure 6: */ 7: public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) { 8: try { 9: // // .... (omitted code) 10: 11: // 17 properties 12: short propertiesLength = byteBuffer.getShort(); 13: if (propertiesLength > 0) { 14: // .... 15: String tags = propertiesMap.get(messageconst.property_tags); 16: if (tags ! = null && tags.length() > 0) { 17: tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags); 18: } 19: 20: // Timing message processing 21: { 22: String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL); 23: if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t ! = null) { 24: int delayLevel = Integer.parseInt(t); 25: 26: if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { 27: delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel(); 28: } 29: 30: if (delayLevel > 0) { 31: tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, 32: storeTimestamp); 33:} 34:} 35:} 36:} 37: 38: //.... 39:40: return DispatchRequest(// 41: topic, // 1 42: queueId, // 2 43: physicOffset, // 3 44: totalSize, // 4 45: tagsCode, // 5 46: storeTimestamp, // 6 47: queueOffset, // 7 48: keys, // 8 49: uniqKey, //9 50: sysFlag, // 9 51: preparedTransactionOffset// 10 52: ); 53: } catch (Exception e) { 54: } 55: 56: return new DispatchRequest(-1, false /* success */); 57:} 58:59: / / ⬇ ️ ⬇ ️ ⬇ ️ 60 ScheduleMessageService. Java 】 【 : / * * 61: * delivery time plan spending time 】 【 62: * 63: * @param delayLevel delayLevel 64: * @param storeTimestamp storage time 65: * @return delivery time [planned consumption time] 66: */ 67: public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) { 68: Long time = this.delayLevelTable.get(delayLevel); 69: if (time ! = null) { 70: return time + storeTimestamp; 71: } 72: 73: return storeTimestamp + 1000; 74:}Copy the code
2.4 Broker sends timed messages
- 🦅 to
SCHEDULE_TOPIC_XXXX
Each consumption queue corresponds toA singleScheduled task polling and sendingArrival delivery time [Planned consumption time]The news.
The following figure shows the processing logic for sending timed messages:
The implementation code is as follows:
1: /**
2: * ⬇️⬇️⬇️ 发送(投递)延迟消息定时任务
3: */
4: class DeliverDelayedMessageTimerTask extends TimerTask {
5: /**
6: * 延迟级别
7: */
8: private final int delayLevel;
9: /**
10: * 位置
11: */
12: private final long offset;
13:
14: public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
15: this.delayLevel = delayLevel;
16: this.offset = offset;
17: }
18:
19: @Override
20: public void run() {
21: try {
22: this.executeOnTimeup();
23: } catch (Exception e) {
24: // XXX: warn and notify me
25: log.error("ScheduleMessageService, executeOnTimeup exception", e);
26: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
27: this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
28: }
29: }
30:
31: /**
32: * 纠正可投递时间。
33: * 因为发送级别对应的发送间隔可以调整,如果超过当前间隔,则修正成当前配置,避免后面的消息无法发送。
34: *
35: * @param now 当前时间
36: * @param deliverTimestamp 投递时间
37: * @return 纠正结果
38: */
39: private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
40: long result = deliverTimestamp;
41:
42: long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
43: if (deliverTimestamp > maxTimestamp) {
44: result = now;
45: }
46:
47: return result;
48: }
49:
50: public void executeOnTimeup() {
51: ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
52:
53: long failScheduleOffset = offset;
54:
55: if (cq != null) {
56: SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
57: if (bufferCQ != null) {
58: try {
59: long nextOffset = offset;
60: int i = 0;
61: for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
62: long offsetPy = bufferCQ.getByteBuffer().getLong();
63: int sizePy = bufferCQ.getByteBuffer().getInt();
64: long tagsCode = bufferCQ.getByteBuffer().getLong();
65:
66: long now = System.currentTimeMillis();
67: long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
68:
69: nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
70:
71: long countdown = deliverTimestamp - now;
72:
73: if (countdown <= 0) { // 消息到达可发送时间
74: MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
75: if (msgExt != null) {
76: try {
77: // 发送消息
78: MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
79: PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);
80: if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // 发送成功
81: continue;
82: } else { // 发送失败
83: // XXX: warn and notify me
84: log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId());
85:
86: // 安排下一次任务
87: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);
88:
89: // 更新进度
90: ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
91: return;
92: }
93: } catch (Exception e) {
94: // XXX: warn and notify me
95: log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
96: + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e);
97: }
98: }
99: } else {
100: // 安排下一次任务
101: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
102:
103: // 更新进度
104: ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
105: return;
106: }
107: } // end of for
108:
109: nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
110:
111: // 安排下一次任务
112: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
113:
114: // 更新进度
115: ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
116: return;
117: } finally {
118: bufferCQ.release();
119: }
120: } // end of if (bufferCQ != null)
121: else { // 消费队列已经被删除部分,跳转到最小的消费进度
122: long cqMinOffset = cq.getMinOffsetInQueue();
123: if (offset < cqMinOffset) {
124: failScheduleOffset = cqMinOffset;
125: log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
126: + cqMinOffset + ", queueId=" + cq.getQueueId());
127: }
128: }
129: } // end of if (cq != null)
130:
131: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
132: }
133:
134: /**
135: * 设置消息内容
136: *
137: * @param msgExt 消息
138: * @return 消息
139: */
140: private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
141: MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
142: msgInner.setBody(msgExt.getBody());
143: msgInner.setFlag(msgExt.getFlag());
144: MessageAccessor.setProperties(msgInner, msgExt.getProperties());
145:
146: TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
147: long tagsCodeValue =
148: MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
149: msgInner.setTagsCode(tagsCodeValue);
150: msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
151:
152: msgInner.setSysFlag(msgExt.getSysFlag());
153: msgInner.setBornTimestamp(msgExt.getBornTimestamp());
154: msgInner.setBornHost(msgExt.getBornHost());
155: msgInner.setStoreHost(msgExt.getStoreHost());
156: msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
157:
158: msgInner.setWaitStoreMsgOK(false);
159: MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
160:
161: msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
162:
163: String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
164: int queueId = Integer.parseInt(queueIdStr);
165: msgInner.setQueueId(queueId);
166:
167: return msgInner;
168: }
169: }Copy the code
2.5 Broker persistence Sends periodic progress
- 🦅 The scheduled message sending progress is stored in a file (
../config/delayOffset.json
) - 🦅 Sends the persistent progress every 10 seconds.
The core code is as follows:
1: / / ⬇ ️ ⬇ ️ ⬇ ️ ScheduleMessageService. Java 2 】 : / * * 3: public void the start () {4: / / regularly send messages 5: for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { 6: Integer level = entry.getKey(); 7: Long timeDelay = entry.getValue(); 8: Long offset = this.offsetTable.get(level); 9: if (null == offset) { 10: offset = 0L; 11: } 12: 13: if (timeDelay ! = null) { 14: this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); 16:15:}} : 17 18: / / timing persistence 19: send schedule this. The timer. The scheduleAtFixedRate (new TimerTask () {20:21: @ Override 22: public void run() { 23: try { 24: ScheduleMessageService.this.persist(); 25: } catch (Exception e) { 26: log.error("scheduleAtFixedRate flush exception", e); 27: } 28: } 29: }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); 30:}Copy the code
3. Retry the message
When a Consumer fails to consume a message, a retry mechanism is provided to make the message consume again.
- 🦅
Consumer
Send back the message of a failed consumptionBroker
And into theDelayed message queue. That is, the news of consumption failure does not immediately consume.
The core code is as follows:
1: / / ⬇ ️ ⬇ ️ ⬇ ️ SendMessageProcessor. Java 2 】 : / 3: * * * consumer to send back message 4: * 5: * @ param CTX CTX 6: * @ param request request 7: * @return response 8: * @throws RemotingCommandException When a remote call is abnormal 9: */ 10: private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) 11: throws RemotingCommandException { 12: // .... 13: // Handle delayLevel (exclusive). 14: int delayLevel = requestHeader.getDelayLevel(); 15: int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); 16: if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { 17: maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); 18: } 19: if (msgExt.getReconsumeTimes() >= maxReconsumeTimes// 20: // .... Else {22: if (0 == delayLevel) {23: delayLevel = 3 + msgext.getreConsumeTimes (); 24: } 25: msgExt.setDelayTimeLevel(delayLevel); 27:} 27: 28: //.... 29: Return response; 30:}Copy the code