RocketMQ source code interpretation — message order
Preface
RocketMQ provides two levels of message ordering:
- Plain sequential messages: the Producer sends related messages to the same message queue.
- Strictly sequential messages: on top of plain sequential messages, the Consumer also consumes them in strict order.
As far as is known, strictly sequential messages are used only for database binlog synchronization.
Producer order
Plain sequential messages are implemented using MessageQueueSelector:
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // arg is the orderId passed as the last argument of send()
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);
The example above, taken from the official RocketMQ documentation, uses the orderId to select the queue, so that messages with the same orderId always enter the same queue.
We will not walk through the source here: the Producer simply uses the MessageQueueSelector we provide to pick the queue we want, and then sends the message to it.
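The selection rule can be tried out without a broker. The sketch below is not RocketMQ code: QueueSelectorSketch, selectIndex and select are hypothetical names, queues are plain strings, and Math.floorMod is swapped in for % on the assumption that keys may be negative (for example a hashCode()).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a MessageQueueSelector: maps a business key to one queue.
final class QueueSelectorSketch {

    // Returns the queue index for the key.
    // Math.floorMod never returns a negative value when queueCount is positive.
    static int selectIndex(Object key, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        return Math.floorMod(key.hashCode(), queueCount);
    }

    // Picks the queue that all messages with this key are sent to.
    static <Q> Q select(List<Q> queues, Object key) {
        return queues.get(selectIndex(key, queues.size()));
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");
        // The same orderId is always mapped to the same queue.
        System.out.println(select(queues, 10001));
        System.out.println(select(queues, 10001));
        // A negative key still yields a valid index.
        System.out.println(select(queues, -7));
    }
}
```

The mapping is stable only while the number of queues stays the same; if the queue count changes, the same key can land on a different queue.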
Consumer order
When a consumer consumes in strict order, three locks are used to guarantee the ordering:
- Broker message queue lock (distributed lock): in clustered mode, a Consumer can pull and consume messages only after it obtains this lock from the Broker. In broadcast mode, consumers do not need this lock, because every queue belongs to every consumer.
- Consumer message queue lock (local lock): the Consumer obtains this lock to operate on the message queue.
- Consumer message processing queue consumption lock (local lock): the Consumer obtains this lock to consume the message queue.
As mentioned earlier, RocketMQ pulls messages by continually taking pull requests off a queue. The first pull request for a message queue is created during rebalance, so let's look at updateProcessQueueTableInRebalance:
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
    if (!this.processQueueTable.containsKey(mq)) {
        // For strictly ordered consumption, try to lock the queue on the Broker first.
        // If the lock fails, skip this queue; the next rebalance will try again.
        if (isOrder && !this.lock(mq)) {
            log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
            continue;
        }

        this.removeDirtyOffset(mq);
        ProcessQueue pq = new ProcessQueue();
        long nextOffset = this.computePullFromWhere(mq);
        if (nextOffset >= 0) {
            ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
            if (pre != null) {
                log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
            } else {
                log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                // Initialize the PullRequest.
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(consumerGroup);
                pullRequest.setNextOffset(nextOffset);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(pq);
                pullRequestList.add(pullRequest);
                changed = true;
            }
        } else {
            log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
        }
    }
}

// Start pulling
this.dispatchPullRequest(pullRequestList);
Continue to look at how queues are locked:
public boolean lock(final MessageQueue mq) {
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
    if (findBrokerResult != null) {
        LockBatchRequestBody requestBody = new LockBatchRequestBody();
        requestBody.setConsumerGroup(this.consumerGroup);
        requestBody.setClientId(this.mQClientFactory.getClientId());
        requestBody.getMqSet().add(mq);

        try {
            // Ask the Broker for the distributed lock on the specified MessageQueue
            Set<MessageQueue> lockedMq =
                this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
            // Mark the local ProcessQueue of every queue that was locked
            for (MessageQueue mmqq : lockedMq) {
                ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                if (processQueue != null) {
                    processQueue.setLocked(true);
                    processQueue.setLastLockTimestamp(System.currentTimeMillis());
                }
            }

            // Success if the set of remotely locked queues contains the queue we wanted to lock
            boolean lockOK = lockedMq.contains(mq);
            return lockOK;
        } catch (Exception e) {
        }
    }

    return false;
}
This raises a question: success is ultimately judged by whether the set of remotely locked queues contains the queue we want to lock. Is that check sound? What if some other consumer, rather than the current one, holds the remote lock on that queue?
Look at what the lock request carries: the consumerGroup, this.mQClientFactory.getClientId(), and the MessageQueue. The Broker locks queues on behalf of the given clientId, so the returned set contains only the queues locked by the current consumer, and the problem above cannot occur.
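To make that argument concrete, here is a toy model of a lock table keyed by consumer group and queue. It is only a sketch under assumptions: BrokerLockTableSketch, tryLockBatch and the maxLiveMillis parameter are hypothetical names, queues are plain strings, and the real Broker-side code is not shown in this article.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of a broker-side lock table; not RocketMQ's actual classes.
final class BrokerLockTableSketch {

    private static final class Entry {
        String clientId;
        long lastUpdateMillis;
    }

    private final long maxLiveMillis;
    // key: consumerGroup + "@" + queue name
    private final Map<String, Entry> table = new HashMap<>();

    BrokerLockTableSketch(long maxLiveMillis) {
        this.maxLiveMillis = maxLiveMillis;
    }

    // Tries to lock every requested queue for clientId and returns only the
    // queues that this clientId holds afterwards.
    synchronized Set<String> tryLockBatch(String group, Set<String> queues, String clientId, long nowMillis) {
        Set<String> locked = new LinkedHashSet<>();
        for (String queue : queues) {
            String key = group + "@" + queue;
            Entry entry = table.get(key);
            // A lock is free if nobody holds it or the holder stopped renewing it.
            boolean free = entry == null || nowMillis - entry.lastUpdateMillis > maxLiveMillis;
            if (free) {
                entry = new Entry();
                entry.clientId = clientId;
                table.put(key, entry);
            }
            if (entry.clientId.equals(clientId)) {
                entry.lastUpdateMillis = nowMillis; // acquire or renew
                locked.add(queue);
            }
        }
        return locked;
    }
}
```

Because the returned set is computed per clientId, a queue held by another client never appears in it, which is why a contains(mq) check on the result is enough.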
Locks have expiration time:
public void start() {
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.lockMQPeriodically();
            }
        }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}
The client starts a scheduled task that periodically renews the lock. If a consumer that holds the lock on a queue crashes, the lock expires and is released, so the queue does not stay locked forever. Let's take a look at lockAll:
Set<MessageQueue> lockOKMQSet =
    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);

for (MessageQueue mq : lockOKMQSet) {
    ProcessQueue processQueue = this.processQueueTable.get(mq);
    if (processQueue != null) {
        if (!processQueue.isLocked()) {
        }
        processQueue.setLocked(true);
        processQueue.setLastLockTimestamp(System.currentTimeMillis());
    }
}
for (MessageQueue mq : mqs) {
    if (!lockOKMQSet.contains(mq)) {
        // The lock is no longer held on the Broker, but the local ProcessQueue
        // may still be marked as locked: clear the flag
        ProcessQueue processQueue = this.processQueueTable.get(mq);
        if (processQueue != null) {
            processQueue.setLocked(false);
        }
    }
}
It obtains the Broker address, locks the queues in one batch, and then clears the local locked flag of every queue whose lock the Broker did not grant.
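The two loops amount to synchronizing local flags with the Broker's answer. A minimal sketch of that step follows; LocalLockSyncSketch, LocalState and apply are hypothetical names, and the isExpired helper with its maxIdleMillis parameter is an assumption about how a local expiry check could look.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: keep local "locked" flags in sync with the set the broker granted.
final class LocalLockSyncSketch {

    static final class LocalState {
        boolean locked;
        long lastLockMillis;

        // Assumed expiry rule: the flag is stale if it was not renewed recently enough.
        boolean isExpired(long nowMillis, long maxIdleMillis) {
            return nowMillis - lastLockMillis > maxIdleMillis;
        }
    }

    static void apply(Set<String> requested, Set<String> granted,
                      Map<String, LocalState> states, long nowMillis) {
        // Queues the broker granted: mark as locked and refresh the timestamp.
        for (String queue : granted) {
            LocalState state = states.get(queue);
            if (state != null) {
                state.locked = true;
                state.lastLockMillis = nowMillis;
            }
        }
        // Queues we asked for but did not get: drop the local flag.
        for (String queue : requested) {
            if (!granted.contains(queue)) {
                LocalState state = states.get(queue);
                if (state != null) {
                    state.locked = false;
                }
            }
        }
    }
}
```

A consumer thread would check both the locked flag and the expiry before consuming, which matches the two checks shown later in the consumption code.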
Removing message queues
In clustered mode, when a Consumer removes its message queue, it tells the Broker to unlock it:
// Persist the consumption progress of the queue, then remove it
this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
// In clustered mode with strictly ordered consumption, release the queue lock
if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
    && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
    try {
        // Try to acquire the local consumption lock of the process queue
        if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
            try {
                return this.unlockDelay(mq, pq);
            } finally {
                // Release the local consumption lock of the process queue
                pq.getLockConsume().unlock();
            }
        } else {
            // The process queue is currently being consumed
            pq.incTryUnlockTimes();
        }
    } catch (Exception e) {
    }

    return false;
}
return true;
The message queue consumption lock is acquired first, to avoid colliding with consumption that is in progress. If the lock cannot be acquired, removing the message queue fails, and removal is attempted again at the next rebalance.
If the queue were removed without taking the lock, it could be removed while its messages are still being consumed, and if that consumption then went wrong, strict ordering could no longer be guaranteed.
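The interplay between consuming and removing can be sketched with a plain ReentrantLock. Everything below is hypothetical (RemoveQueueSketch, consume, tryRemove, demo); it only shows the tryLock-with-timeout pattern, not RocketMQ's implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of "take the consumption lock before removing the queue".
final class RemoveQueueSketch {

    private final ReentrantLock consumeLock = new ReentrantLock();
    private int failedAttempts; // touched only by the removing thread

    // Consumer side: hold the lock for as long as the listener runs.
    void consume(Runnable listener) {
        consumeLock.lock();
        try {
            listener.run();
        } finally {
            consumeLock.unlock();
        }
    }

    // Removing side: give up if a consumer still holds the lock after the timeout.
    boolean tryRemove(long timeoutMillis, Runnable releaseRemoteLock) {
        try {
            if (consumeLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                try {
                    releaseRemoteLock.run();
                    return true;
                } finally {
                    consumeLock.unlock();
                }
            }
            failedAttempts++;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

    int failedAttempts() {
        return failedAttempts;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns {result while a consumer is running, result after it finished}.
    static boolean[] demo() {
        RemoveQueueSketch queue = new RemoveQueueSketch();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finish = new CountDownLatch(1);
        Thread consumer = new Thread(() -> queue.consume(() -> {
            started.countDown();
            awaitQuietly(finish);
        }));
        consumer.start();
        awaitQuietly(started);
        boolean whileConsuming = queue.tryRemove(50, () -> { });
        finish.countDown();
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean afterConsuming = queue.tryRemove(50, () -> { });
        return new boolean[] {whileConsuming, afterConsuming};
    }
}
```

In the demo, the first removal attempt times out because the consumer thread holds the lock, and the second succeeds once consumption has finished.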
Now look at unlockDelay, which releases the Broker message queue lock immediately if the message processing queue holds no messages, and after a delay otherwise:
// Release the Broker message queue lock. If messages remain in the
// message processing queue, release the lock after a delay instead.
if (pq.hasTempMessage()) {
    this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
        @Override
        public void run() {
            RebalancePushImpl.this.unlock(mq, true);
        }
    }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
} else {
    this.unlock(mq, true);
}
return true;
Message consumption
The main logic is in ConsumeRequest (in ConsumeMessageOrderlyService). There is a fair amount of code, so we go through it in sections. The first step is to get the MessageQueue lock:
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
This is again a local lock. MessageQueue is the client-side abstraction of the remote queue, and holding this lock ensures that the queue is consumed only by the current thread.
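One common way to hand out one lock object per queue is a concurrent map from queue to monitor. The sketch below assumes that approach; QueueLockTableSketch is a hypothetical name, and the real MessageQueueLock class may be implemented differently.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of a per-queue lock table.
final class QueueLockTableSketch<K> {

    private final ConcurrentMap<K, Object> locks = new ConcurrentHashMap<>();

    // Always returns the same monitor object for equal keys, creating it on first use.
    Object fetchLockObject(K queue) {
        return locks.computeIfAbsent(queue, k -> new Object());
    }

    public static void main(String[] args) {
        QueueLockTableSketch<String> table = new QueueLockTableSketch<>();
        // Only one thread at a time can be inside this block for "topicA-0".
        synchronized (table.fetchLockObject("topicA-0")) {
            System.out.println("consuming topicA-0");
        }
    }
}
```

Threads consuming different queues get different monitors and do not block each other; threads that target the same queue serialize on the shared object.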
Next, check the lock status:
// If the queue is not locked, request the distributed lock later and consume next time
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
    && !this.processQueue.isLocked()) {
    log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
    break;
}

// If the lock on this queue has expired, likewise request the distributed lock later and consume next time
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
    && this.processQueue.isLockExpired()) {
    log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
    break;
}
Next comes the limit on continuous consumption time. A single consume request should not run for too long: once it has been consuming for longer than MAX_TIME_CONSUME_CONTINUOUSLY, it stops and submits a delayed consume request for the same queue:
Long interval = System.currentTimemillis () - beginTime; if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); break; }Copy the code
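The effect of this check is easier to see in isolation. The following sketch uses hypothetical names (BoundedConsumeLoopSketch, run) and an injected clock so the behaviour is deterministic; it is an illustration of the time-bounded loop, not the actual ConsumeRequest code.

```java
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical sketch of a consume loop that yields after a maximum continuous run time.
final class BoundedConsumeLoopSketch {

    // Consumes pending messages until the queue is empty or the time budget is used up.
    // Returns how many messages were consumed; the caller would resubmit the rest later.
    static int run(Queue<String> pending, long maxContinuousMillis,
                   LongSupplier clock, Consumer<String> listener) {
        long beginTime = clock.getAsLong();
        int consumed = 0;
        while (!pending.isEmpty()) {
            long interval = clock.getAsLong() - beginTime;
            if (interval > maxContinuousMillis) {
                break; // give other queues a turn on this thread
            }
            listener.accept(pending.poll());
            consumed++;
        }
        return consumed;
    }
}
```

For example, with a 100 ms budget and a listener that takes 40 ms per message, the loop stops after the third message and leaves the rest in the queue for the resubmitted request.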
Then get the message to consume:
// This differs from concurrent consumption, where the messages to consume
// are fixed when the consume request is created.
final int consumeBatchSize =
    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
Initialize the context, etc., and finally execute the consumption logic. Let’s look at the code:
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
        this.messageQueue);
    break;
}
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
Consumption happens only after the consumption lock has been acquired (this.processQueue.getLockConsume().lock()). This is why, as described in the section on removing queues, removal must acquire the same lock first: if the lock cannot be acquired during removal, the queue is being consumed and has to be removed later.
Processing consumption results
The result is handled mainly in ConsumeMessageOrderlyService.this.processConsumeResult. ConsumeOrderlyStatus has four states:
- SUCCESS: consumed successfully, but not committed.
- ROLLBACK: consumption failed, roll back the consumption.
- COMMIT: consumed successfully and committed.
- SUSPEND_CURRENT_QUEUE_A_MOMENT: consumption failed, suspend the consumption queue for a while and continue consuming later.
In concurrent consumption, when a message fails, the Consumer sends it back to the Broker's retry queue, skips it, and consumes it again when it is pulled the next time.
Strictly sequential consumption obviously cannot do this, because skipping a message would break the order. Instead, when a message fails, the queue is suspended for a while and consumption continues later.
A message that keeps failing cannot be retried forever, though. Once the consumption retry limit is exceeded, the Consumer sends the failed message back to the Broker's dead-letter queue.
Take a look at the source:
if (context.isAutoCommit()) {
    switch (status) {
        case COMMIT:
        case ROLLBACK:
            log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
                consumeRequest.getMessageQueue());
        case SUCCESS:
            // Commit the consumed messages
            commitOffset = consumeRequest.getProcessQueue().commit();
            // Statistics
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
            break;
        case SUSPEND_CURRENT_QUEUE_A_MOMENT:
            // Statistics
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
            // Messages consumed more than the maximum number of times are sent to the dead-letter queue in the Broker.
            if (checkReconsumeTimes(msgs)) {
                // Mark the messages for re-consumption (put them back into the main map)
                consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                // Submit a delayed consume request, i.e. suspend the queue for a while
                this.submitConsumeRequestLater(
                    consumeRequest.getProcessQueue(),
                    consumeRequest.getMessageQueue(),
                    context.getSuspendCurrentQueueTimeMillis());
                continueConsume = false;
            } else {
                // The retry limit was exceeded and the messages were sent to the dead-letter queue
                // successfully: commit the progress, no need to suspend
                commitOffset = consumeRequest.getProcessQueue().commit();
            }
            break;
        default:
            break;
    }
} else {
    switch (status) {
        case SUCCESS:
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
            break;
        case COMMIT:
            commitOffset = consumeRequest.getProcessQueue().commit();
            break;
        case ROLLBACK:
            // Put the messages back so that they are consumed again
            consumeRequest.getProcessQueue().rollback();
            this.submitConsumeRequestLater(
                consumeRequest.getProcessQueue(),
                consumeRequest.getMessageQueue(),
                context.getSuspendCurrentQueueTimeMillis());
            continueConsume = false;
            break;
        case SUSPEND_CURRENT_QUEUE_A_MOMENT:
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
            if (checkReconsumeTimes(msgs)) {
                consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                this.submitConsumeRequestLater(
                    consumeRequest.getProcessQueue(),
                    consumeRequest.getMessageQueue(),
                    context.getSuspendCurrentQueueTimeMillis());
                continueConsume = false;
            }
            break;
        default:
            break;
    }
}
Here, there is little difference between auto-commit and non-auto-commit. Let’s look at the checkReconsumeTimes method:
boolean suspend = false;
if (msgs != null && !msgs.isEmpty()) {
    for (MessageExt msg : msgs) {
        if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
            MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
            // If sending the message back fails, suspend and handle it again next time
            if (!sendMessageBack(msg)) {
                suspend = true;
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
            }
        } else {
            suspend = true;
            msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
        }
    }
}
return suspend;
This method checks whether each message may still be re-consumed. If a message has reached the limit, it tries to send the message back to the Broker's dead-letter queue. It returns suspend = true if any message can still be re-consumed, or if sending a message back fails.
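The decision can be reproduced with plain objects. In the sketch below, RetryLimitSketch, Msg and shouldSuspend are hypothetical names, and the sendBack predicate stands in for sendMessageBack; it mirrors the branch structure of the method above rather than its real types.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the "retry or send back" decision for a failed batch.
final class RetryLimitSketch {

    static final class Msg {
        final String id;
        int reconsumeTimes;

        Msg(String id, int reconsumeTimes) {
            this.id = id;
            this.reconsumeTimes = reconsumeTimes;
        }
    }

    // Returns true when the queue must be suspended and the batch consumed again.
    static boolean shouldSuspend(List<Msg> msgs, int maxReconsumeTimes, Predicate<Msg> sendBack) {
        boolean suspend = false;
        for (Msg msg : msgs) {
            if (msg.reconsumeTimes >= maxReconsumeTimes) {
                // Limit reached: hand the message off; keep retrying only if that fails.
                if (!sendBack.test(msg)) {
                    suspend = true;
                    msg.reconsumeTimes++;
                }
            } else {
                // Limit not reached: retry locally.
                suspend = true;
                msg.reconsumeTimes++;
            }
        }
        return suspend;
    }
}
```

Note that a single message that still needs a retry is enough to suspend the whole batch, since ordered consumption cannot skip ahead.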
Let’s look at the sendMessageBack method:
try {
    // max reconsume times exceeded then send to dead letter queue.
    Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
    String originMsgId = MessageAccessor.getOriginMessageId(msg);
    MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
    newMsg.setFlag(msg.getFlag());
    MessageAccessor.setProperties(newMsg, msg.getProperties());
    MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
    MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
    MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
    newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
    this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
    return true;
} catch (Exception e) {
    log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
}
return false;
Note that the message is not sent directly to the dead-letter queue here, but to the RETRY topic. The rest of the logic lives in the Broker and I still need to confirm it, but presumably the Broker recognizes that the message's consumption count is >= the maximum number of consumptions and puts it directly into the dead-letter queue rather than the RETRY queue.
ProcessQueue: related methods
First look at attributes:
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
/**
* A subset of msgTreeMap, will only be used when orderly consume
*/
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
msgTreeMap holds all the messages; consumingMsgOrderlyTreeMap keeps a backup of the messages that are currently being consumed.
Rollback:
this.lockTreeMap.writeLock().lockInterruptibly();
try {
    // Roll back: put the messages being consumed back into the main map, then clear the backup map
    this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap);
    this.consumingMsgOrderlyTreeMap.clear();
} finally {
    this.lockTreeMap.writeLock().unlock();
}
Commit:
// Return the next offset and clear consumingMsgOrderlyTreeMap
Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
    msgSize.addAndGet(0 - msg.getBody().length);
}
this.consumingMsgOrderlyTreeMap.clear();
if (offset != null) {
    return offset + 1;
}
makeMessageToCosumeAgain:
for (MessageExt msg : msgs) {
    // Remove each message of the batch from the backup map and put it back into the main map
    this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());
    this.msgTreeMap.put(msg.getQueueOffset(), msg);
}
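Putting the three operations together, the two-map idea can be sketched as a small standalone class. OrderlyBufferSketch and its method names are hypothetical, message bodies are plain strings, and commit() returning -1 for an empty consuming map is a choice made for this sketch, not something taken from the source above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a process queue with a main map and a "being consumed" map.
final class OrderlyBufferSketch {

    private final TreeMap<Long, String> pending = new TreeMap<>();
    private final TreeMap<Long, String> consuming = new TreeMap<>();

    synchronized void put(long offset, String body) {
        pending.put(offset, body);
    }

    // Moves up to batchSize messages, lowest offset first, into the consuming map.
    synchronized List<Long> take(int batchSize) {
        List<Long> taken = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            Map.Entry<Long, String> entry = pending.pollFirstEntry();
            if (entry == null) {
                break;
            }
            consuming.put(entry.getKey(), entry.getValue());
            taken.add(entry.getKey());
        }
        return taken;
    }

    // Drops the consumed messages and returns the next offset, or -1 if nothing was being consumed.
    synchronized long commit() {
        if (consuming.isEmpty()) {
            return -1L;
        }
        long next = consuming.lastKey() + 1;
        consuming.clear();
        return next;
    }

    // Puts everything that was being consumed back into the main map.
    synchronized void rollback() {
        pending.putAll(consuming);
        consuming.clear();
    }

    // Puts only the given offsets back into the main map.
    synchronized void consumeAgain(List<Long> offsets) {
        for (Long offset : offsets) {
            String body = consuming.remove(offset);
            if (body != null) {
                pending.put(offset, body);
            }
        }
    }

    synchronized int pendingSize() {
        return pending.size();
    }
}
```

Because both maps are sorted by offset, messages that are rolled back or put back for re-consumption are taken again in their original order.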