Overview
The JDK's ScheduledThreadPoolExecutor supports delayed tasks, but its internal queue, DelayedWorkQueue, is backed by a heap. Inserting into or removing from a heap costs O(log n), which becomes expensive with a large backlog of delayed messages, and most MQs will not tolerate that cost.
As a result, MQs implement delayed delivery themselves.
RocketMQ takes an even simpler route for delayed messages. Instead of the timing wheel algorithm that most MQs use, RocketMQ simply uses java.util.Timer.
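To make the Timer approach concrete, here is a minimal sketch of a one-shot delayed task on java.util.Timer. The class name TimerDelayDemo, the thread name, and the delay values are illustrative assumptions; this is not RocketMQ code.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TimerDelayDemo {
    /** Schedules one task after delayMillis and reports whether it ran within timeoutMillis. */
    public static boolean runAfter(long delayMillis, long timeoutMillis) {
        // Daemon timer thread, so it never keeps the JVM alive on its own.
        Timer timer = new Timer("DemoTimerThread", true);
        CountDownLatch fired = new CountDownLatch(1);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                // Stands in for "deliver the delayed message".
                fired.countDown();
            }
        }, delayMillis);
        try {
            return fired.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.cancel();
        }
    }

    public static void main(String[] args) {
        System.out.println("task fired: " + runAfter(50, 2000));
    }
}
```

A Timer runs all of its tasks on a single thread, so a slow task delays every task queued behind it.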
Delayed message redirection
When a producer sends a delayed message, the consumer must not be able to consume it until the delay has expired.
That is, the broker has to do extra processing on delayed messages so that consumers do not see them before they are due.
The topic and queueId are rewritten
Before the message is written to the CommitLog, the broker resets its topic to SCHEDULE_TOPIC_XXXX and its queueId to delayLevel - 1. It then writes the original topic and queueId into the message's properties so that both can be restored when the message expires.
Code location: CommitLog#putMessage(final MessageExtBrokerInner msg)
public class CommitLog {
    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // Excerpt: topic and queueId are local variables set up earlier in the method.
        if (msg.getDelayTimeLevel() > 0) {
            // Clamp the delay level to the highest configured level
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // Delayed messages are redirected to the SCHEDULE_TOPIC_XXXX topic
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            // delayLevel - 1 is used as the queueId
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            // Back up the original topic and queueId in the REAL_TOPIC and REAL_QID properties
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
        // ... rest of the method omitted
    }
}
After this processing, the Consumer cannot immediately consume the delayed message that was just sent, because the message is no longer stored under the topic the Consumer subscribes to.
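The rewrite-and-restore idea can be sketched without any RocketMQ classes. In the sketch below, Msg is a hypothetical stand-in for a broker-side message, and removing the backup properties on restore is an assumption of this sketch, not something the article states about RocketMQ.

```java
import java.util.HashMap;
import java.util.Map;

public class DelayTopicRewriteSketch {
    public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    public static final String REAL_TOPIC = "REAL_TOPIC";
    public static final String REAL_QID = "REAL_QID";

    /** Hypothetical stand-in for a broker-side message. */
    public static class Msg {
        public String topic;
        public int queueId;
        public int delayLevel;
        public final Map<String, String> properties = new HashMap<>();

        public Msg(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    /** Hides a delayed message by moving it to the schedule topic. */
    public static void redirect(Msg msg) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message
        }
        // Back up the real destination before overwriting it.
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QID, String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = msg.delayLevel - 1;
    }

    /** Puts an expired message back on its original topic and queue. */
    public static void restore(Msg msg) {
        String realTopic = msg.properties.get(REAL_TOPIC);
        String realQid = msg.properties.get(REAL_QID);
        if (realTopic == null || realQid == null) {
            return; // message was never redirected
        }
        msg.topic = realTopic;
        msg.queueId = Integer.parseInt(realQid);
        msg.delayLevel = 0;
        // Assumption of this sketch: drop the backup once it has been used.
        msg.properties.remove(REAL_TOPIC);
        msg.properties.remove(REAL_QID);
    }

    public static void main(String[] args) {
        Msg msg = new Msg("OrderTopic", 3, 2);
        redirect(msg);
        System.out.println(msg.topic + " / " + msg.queueId);
        restore(msg);
        System.out.println(msg.topic + " / " + msg.queueId);
    }
}
```

Between redirect and restore, a consumer subscribed to OrderTopic has nothing to pull, which is exactly the hiding effect described above.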
The ConsumeQueue entry's tagsCode is rewritten
When the Broker dispatches a message from the CommitLog to the ConsumeQueue and finds that it is a delayed message, it sets the entry's tagsCode to the time at which the message becomes due.
Code location: CommitLog#checkMessageAndReturnSize
public class CommitLog {
    public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
        final boolean readBody) {
        // Excerpt: delayLevel, storeTimestamp and tagsCode are set up earlier in the method.
        // For a delayed message, store its due time as the tagsCode
        if (delayLevel > 0) {
            tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, storeTimestamp);
        }
        // ... rest of the method omitted
    }
}
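The due time is simply the store timestamp plus the delay configured for the level. A minimal sketch, assuming a hard-coded three-level table; throwing on an unknown level is a choice made for this sketch and may differ from what ScheduleMessageService actually does.

```java
import java.util.HashMap;
import java.util.Map;

public class DeliverTimestampSketch {
    // Assumed subset of the delay level table: level -> delay in milliseconds.
    private final Map<Integer, Long> delayLevelTable = new HashMap<>();

    public DeliverTimestampSketch() {
        delayLevelTable.put(1, 1_000L);   // 1s
        delayLevelTable.put(2, 5_000L);   // 5s
        delayLevelTable.put(3, 10_000L);  // 10s
    }

    /** Returns the time (epoch millis) at which a message stored at storeTimestamp becomes due. */
    public long computeDeliverTimestamp(int delayLevel, long storeTimestamp) {
        Long delayMillis = delayLevelTable.get(delayLevel);
        if (delayMillis == null) {
            // Sketch-only behavior for levels that are not configured.
            throw new IllegalArgumentException("unknown delay level: " + delayLevel);
        }
        return storeTimestamp + delayMillis;
    }

    public static void main(String[] args) {
        DeliverTimestampSketch sketch = new DeliverTimestampSketch();
        long storedAt = System.currentTimeMillis();
        System.out.println("due at: " + sketch.computeDeliverTimestamp(3, storedAt));
    }
}
```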
Message expiration processing
The key question is how the Broker detects that a delayed message is due and can be handed to the Consumer. Let's look at how RocketMQ does it.
The delay levels of RocketMQ can be changed by configuring messageDelayLevel on the Broker.
Here is the default configuration for RocketMQ:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
In other words, RocketMQ does not offer arbitrary delays the way some other MQs do. A message can only use one of the delay levels defined in the Broker's configuration file.
Giving up arbitrary delays is what makes the delayed message feature so simple to implement.
The implementation of delayed messages lives entirely in the ScheduleMessageService class.
Parse the messageDelayLevel configuration
When the Broker starts, the messageDelayLevel configuration is parsed.
Code location: ScheduleMessageService#parseDelayLevel()
The main logic is as follows:
Parse messageDelayLevel and place the values in the delayLevelTable map.
public class ScheduleMessageService extends ConfigManager {
    public boolean parseDelayLevel() {
        // Time unit suffix -> milliseconds
        HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
        timeUnitTable.put("s", 1000L);
        timeUnitTable.put("m", 1000L * 60);
        timeUnitTable.put("h", 1000L * 60 * 60);
        timeUnitTable.put("d", 1000L * 60 * 60 * 24);
        String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
        try {
            // Levels are separated by spaces, e.g. "1s 5s 10s"
            String[] levelArray = levelString.split(" ");
            for (int i = 0; i < levelArray.length; i++) {
                String value = levelArray[i];
                // The last character is the time unit
                String ch = value.substring(value.length() - 1);
                Long tu = timeUnitTable.get(ch);
                // Levels are 1-based
                int level = i + 1;
                if (level > this.maxDelayLevel) {
                    this.maxDelayLevel = level;
                }
                long num = Long.parseLong(value.substring(0, value.length() - 1));
                long delayTimeMillis = tu * num;
                this.delayLevelTable.put(level, delayTimeMillis);
            }
        } catch (Exception e) {
            log.error("parseDelayLevel exception", e);
            log.info("levelString String = {}", levelString);
            return false;
        }
        return true;
    }
}
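To experiment with the parsing logic outside the Broker, the same idea can be written as a standalone method. This is a sketch: the class name DelayLevelParserSketch is hypothetical, and unlike the Broker code it rejects unknown time units with an exception instead of returning false.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class DelayLevelParserSketch {
    /** Parses a string such as "1s 5s 10s" into level (1-based) -> delay in milliseconds. */
    public static Map<Integer, Long> parse(String levelString) {
        Map<String, Long> timeUnitTable = new HashMap<>();
        timeUnitTable.put("s", 1000L);
        timeUnitTable.put("m", 1000L * 60);
        timeUnitTable.put("h", 1000L * 60 * 60);
        timeUnitTable.put("d", 1000L * 60 * 60 * 24);

        Map<Integer, Long> delayLevelTable = new TreeMap<>();
        String[] levelArray = levelString.trim().split("\\s+");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            String unit = value.substring(value.length() - 1);
            Long unitMillis = timeUnitTable.get(unit);
            if (unitMillis == null) {
                throw new IllegalArgumentException("unknown time unit in: " + value);
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            delayLevelTable.put(i + 1, unitMillis * num);
        }
        return delayLevelTable;
    }

    public static void main(String[] args) {
        String defaults = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
        Map<Integer, Long> table = parse(defaults);
        System.out.println("levels: " + table.size());
        System.out.println("level 5 delay (ms): " + table.get(5));
    }
}
```

With the default configuration this yields 18 levels, where level 5 corresponds to 1m, that is 60000 ms.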
Starting the Timer
Code location: ScheduleMessageService#start()
public void start() {
    if (started.compareAndSet(false, true)) {
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        // Iterate over the configured delay levels
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            // The consumption offset for this delay level
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }
            // Schedule the delivery task for this delay level
            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
    }
}
offsetTable is loaded when the Broker starts, by parsing ${storePath}/store/config/delayOffset.json. This JSON file records the consumption offset for each delay level, roughly as follows:
{
    "offsetTable": {1:10}
}
DeliverDelayedMessageTimerTask
The task scheduled on the Timer is DeliverDelayedMessageTimerTask. Let's look at the logic of its run() method.
The logic of run() is as follows:
- Using the consumption offset of the delay level, fetch the next message entries from the corresponding queue
- Read the due time of each message from the tagsCode of its ConsumeQueue entry
- Compare the tagsCode with the current time. If it is less than or equal to the current time, restore the delayed message to its original topic and queueId so that the Consumer can consume it
- Schedule the task again to handle the next delayed message
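The steps above can be sketched as a single scan over an in-memory queue. Everything here is a simplified model rather than the RocketMQ implementation: Entry is a hypothetical stand-in for a ConsumeQueue entry, the delivered list replaces writing the restored message back to the CommitLog, and IDLE_DELAY_MS is an assumed rescan interval.

```java
import java.util.ArrayList;
import java.util.List;

public class DelayQueueScanSketch {
    /** Assumed wait before rescanning when the queue has no pending entries. */
    public static final long IDLE_DELAY_MS = 100L;

    /** Hypothetical queue entry; deliverTimestamp plays the role of tagsCode. */
    public static final class Entry {
        final long deliverTimestamp;
        final String body;

        public Entry(long deliverTimestamp, String body) {
            this.deliverTimestamp = deliverTimestamp;
            this.body = body;
        }
    }

    private final List<Entry> queue;                 // entries of one delay level, in arrival order
    private final List<String> delivered = new ArrayList<>();
    private int offset;                              // consumption offset of this delay level

    public DelayQueueScanSketch(List<Entry> queue, int startOffset) {
        this.queue = queue;
        this.offset = startOffset;
    }

    /** Runs one scan and returns how long to wait (ms) before the next one. */
    public long runOnce(long now) {
        while (offset < queue.size()) {
            Entry entry = queue.get(offset);
            long countdown = entry.deliverTimestamp - now;
            if (countdown > 0) {
                // All entries of a level share the same delay, so later ones are not due either.
                return countdown;
            }
            delivered.add(entry.body);               // stands in for restoring the original message
            offset++;
        }
        return IDLE_DELAY_MS;
    }

    public int getOffset() {
        return offset;
    }

    public List<String> getDelivered() {
        return delivered;
    }

    public static void main(String[] args) {
        List<Entry> queue = new ArrayList<>();
        queue.add(new Entry(1_000L, "a"));
        queue.add(new Entry(2_000L, "b"));
        queue.add(new Entry(3_000L, "c"));
        DelayQueueScanSketch scan = new DelayQueueScanSketch(queue, 0);
        long wait = scan.runOnce(2_000L);
        System.out.println(scan.getDelivered() + ", next scan in " + wait + " ms");
    }
}
```

Because every message in one queue has the same delay, the entries are ordered by due time, and the scan can stop at the first entry that is not yet due. This is why a plain Timer per level is enough and no sorted structure is needed.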
Consumption offset persistence
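By default, the Broker persists the delayed message consumption offsets every 10 seconds. The first run happens 10 seconds after startup, and the interval afterwards comes from the flushDelayOffsetInterval setting.
Code location: ScheduleMessageService#start()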
public class ScheduleMessageService extends ConfigManager {
    public void start() {
        if (started.compareAndSet(false, true)) {
            // By default, persistence is performed every 10 seconds
            this.timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    try {
                        if (started.get()) ScheduleMessageService.this.persist();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e);
                    }
                }
            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
        }
    }
}
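What persist() might boil down to can be sketched as follows. This is an assumption-laden illustration, not the ConfigManager implementation: the class OffsetPersistSketch is hypothetical, the encoder only mimics the sample file format with unquoted integer keys, and writing to a temporary file before replacing the target is a design choice of this sketch.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.TreeMap;

public class OffsetPersistSketch {
    /** Encodes the offset table in the same shape as the sample delayOffset.json. */
    public static String encode(Map<Integer, Long> offsetTable) {
        StringBuilder sb = new StringBuilder("{\"offsetTable\":{");
        boolean first = true;
        // TreeMap gives a stable, level-ordered output.
        for (Map.Entry<Integer, Long> e : new TreeMap<>(offsetTable).entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(e.getKey()).append(':').append(e.getValue());
            first = false;
        }
        return sb.append("}}").toString();
    }

    /** Writes the table to a temp file first, then swaps it in, so a crash never leaves a half-written file. */
    public static void persist(Map<Integer, Long> offsetTable, Path file) throws IOException {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, encode(offsetTable).getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Map<Integer, Long> offsetTable = new TreeMap<>();
        offsetTable.put(1, 10L);
        Path file = Files.createTempFile("delayOffset", ".json");
        persist(offsetTable, file);
        System.out.println(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
    }
}
```

Because offsets are flushed periodically rather than on every delivery, a Broker that crashes between two flushes restarts from a slightly older offset and may scan some entries again.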