The following source code is based on RocketMQ 4.7.0.
How does RocketMQ recover its data when the broker restarts after a normal or an abnormal exit? This article walks through the process in the code.
Broker Fault recovery
This code is called when the broker is first started or restarted:
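// BrokerController#initialize method (excerpt)
public boolean initialize() throws CloneNotSupportedException {
    // ... earlier initialization steps omitted ...
    result = result && this.messageStore.load();
    // ... remaining initialization steps omitted ...
}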
As the code above shows, broker initialization calls MessageStore#load, and the default MessageStore implementation is DefaultMessageStore. Its load method is the entry point for broker recovery:
public boolean load() {
    boolean result = true;
    try {
        // Determine whether the last exit was normal by checking whether the abort file exists
        boolean lastExitOK = !this.isTempFileExist();
        if (null != scheduleMessageService) {
            result = result && this.scheduleMessageService.load();
        }
        // Load CommitLog
        result = result && this.commitLog.load();
        // Load ConsumeQueue
        result = result && this.loadConsumeQueue();
        if (result) {
            this.storeCheckpoint =
                new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            this.indexService.load(lastExitOK);
            // Entry point of the recovery logic
            this.recover(lastExitOK);
            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        result = false;
    }
    if (!result) {
        this.allocateMappedFileService.shutdown();
    }
    return result;
}
The load method shows that recovery is handled by the recover method:
private void recover(final boolean lastExitOK) {
    // Get the maximum physical offset recorded in ConsumeQueue -- this is a physical offset in the CommitLog (verified with test logging later)
    long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
    if (lastExitOK) {
        // Recovery after a normal exit
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    } else {
        // Recovery after an abnormal exit
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    }
    this.recoverTopicQueueTable();
}
As of RocketMQ 4.7.0, the CommitLog#recoverAbnormally method is marked as deprecated, so it is not analyzed here. The rest of this article follows the normal recovery path.
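The choice between the two recovery paths rests on a marker file: if the abort file is still present at startup, the previous run did not shut down cleanly. The sketch below illustrates that idea with plain java.nio. The class AbortMarker and its method names are hypothetical, and the create-on-start, delete-on-shutdown lifecycle is an assumption about how such a marker is typically managed, not a copy of RocketMQ's implementation.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration of an "abort file" marker; not RocketMQ code.
final class AbortMarker {
    private final Path abortFile;

    AbortMarker(Path storeDir) {
        this.abortFile = storeDir.resolve("abort");
    }

    /** True if no marker was left behind, i.e. the previous run exited cleanly. */
    boolean lastExitOk() {
        return !Files.exists(abortFile);
    }

    /** Assumed lifecycle step: leave a marker while the store is running. */
    void onStart() throws IOException {
        if (!Files.exists(abortFile)) {
            Files.createFile(abortFile);
        }
    }

    /** Assumed lifecycle step: remove the marker during an orderly shutdown. */
    void onCleanShutdown() throws IOException {
        Files.deleteIfExists(abortFile);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("store");
        AbortMarker marker = new AbortMarker(dir);
        System.out.println(marker.lastExitOk());               // true: nothing ran before
        marker.onStart();
        // A crash here would skip onCleanShutdown(), so the marker survives.
        System.out.println(new AbortMarker(dir).lastExitOk()); // false: take the abnormal path
        marker.onCleanShutdown();
        System.out.println(new AbortMarker(dir).lastExitOk()); // true: take the normal path
    }
}
```

The result of lastExitOk() plays the role of the lastExitOK flag that recover uses to pick recoverNormally or recoverAbnormally.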
Restore CommitLog and ConsumeQueue
To find out what value maxPhyOffsetOfConsumeQueue holds, add a log statement that prints it inside the recover method, then repackage the source code.
Then start the broker. In my run, the value printed at startup was 384.
Next, a client sends one message to the broker.
The broker log (from another log statement I added myself) shows that 192 bytes were written to the CommitLog.
After restarting the broker, maxPhyOffsetOfConsumeQueue has become 576, which is 384 + 192.
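The arithmetic 384 + 192 = 576 is what one would expect if the value is taken from the last ConsumeQueue entry as the CommitLog offset plus the message size. The sketch below assumes the 20-byte ConsumeQueue entry layout (8-byte CommitLog offset, 4-byte message size, 8-byte tag hash code). The class CqEntry and its helpers are hypothetical names used only to show the calculation.

```java
import java.nio.ByteBuffer;

// Hypothetical helper that decodes one ConsumeQueue-style entry; not RocketMQ code.
final class CqEntry {
    // Assumed layout: 8 bytes CommitLog offset + 4 bytes message size + 8 bytes tags code.
    static final int UNIT_SIZE = 20;

    static ByteBuffer encode(long commitLogOffset, int msgSize, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(UNIT_SIZE);
        buf.putLong(commitLogOffset).putInt(msgSize).putLong(tagsCode);
        buf.flip();
        return buf;
    }

    /** CommitLog position just past the message that this entry points to. */
    static long endPhyOffset(ByteBuffer entry) {
        long offset = entry.getLong(0); // absolute read, position unchanged
        int size = entry.getInt(8);
        return offset + size;
    }

    public static void main(String[] args) {
        // The message from the experiment: written at offset 384, 192 bytes long.
        ByteBuffer last = encode(384L, 192, 0L);
        System.out.println(endPhyOffset(last)); // 576
    }
}
```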
The log output shows that maxPhyOffsetOfConsumeQueue is a physical offset in the CommitLog. Now look at CommitLog#recoverNormally:
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
    // Whether to run CRC checks during recovery -- default true
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    // Get the list of CommitLog files
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    // Non-empty means this is not the first startup
    if (!mappedFiles.isEmpty()) {
        // Start from the third-to-last CommitLog file; with fewer than three files, start from the first one
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;
        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            // Validate the next message and return a DispatchRequest
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            // Valid message: advance within the current file
            if (dispatchRequest.isSuccess() && size > 0) {
                mappedFileOffset += size;
            }
            // End of the current file reached: move on to the next file
            else if (dispatchRequest.isSuccess() && size == 0) {
                index++;
                // All of the files have been processed
                if (index >= mappedFiles.size()) {
                    break;
                } else {
                    // Switch to the next file
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                }
            }
            // Validation failed: stop reading at this point
            else if (!dispatchRequest.isSuccess()) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
        }
        processOffset += mappedFileOffset;
        // Set the flush position
        this.mappedFileQueue.setFlushedWhere(processOffset);
        // Set the commit position
        this.mappedFileQueue.setCommittedWhere(processOffset);
        // Truncate dirty data beyond processOffset
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
        // Remove ConsumeQueue data that points beyond processOffset
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    } else {
        // No CommitLog files exist (first startup, or the files were deleted):
        // reset the positions and delete the ConsumeQueue files
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}
The code above uses two variables: processOffset and mappedFileOffset. To observe them, add log statements that print both values and repackage the corresponding module.
Then start the broker and check the printed values.
In my run, processOffset starts at 0, and mappedFileOffset is the amount of data processed within each CommitLog file.
Normal recovery covers two cases:
- CommitLog files exist
  - Check every message in the last three files
  - Set the flushedWhere and committedWhere values
  - Truncate the CommitLog data that lies beyond the recovered offset
- No CommitLog file exists (first startup, or the files were deleted)
  - Set flushedWhere and committedWhere to 0 and delete the ConsumeQueue files
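The scanning loop of the first case can be sketched with in-memory buffers. This is a simplified model, not RocketMQ's on-disk format: each message is assumed to be a 4-byte length prefix followed by payload, a hypothetical END_OF_FILE marker stands in for the end-of-file padding record, and zero-filled space counts as unwritten data. The class RecoverScanSketch, the toy FILE_SIZE, and the helper names are all illustrative assumptions.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Simplified model of the recoverNormally scan; not RocketMQ code.
final class RecoverScanSketch {
    static final int FILE_SIZE = 64;    // toy file size for the example
    static final int END_OF_FILE = -1;  // hypothetical marker: rest of the file is padding

    /** Builds a zero-filled file and writes the given length prefixes back to back. */
    static ByteBuffer file(int... sizes) {
        ByteBuffer f = ByteBuffer.allocate(FILE_SIZE);
        for (int s : sizes) {
            f.putInt(f.position(), s);
            if (s >= 4) {
                f.position(f.position() + s); // skip over the message body
            }
        }
        return f;
    }

    /** Independent read view starting at position 0. */
    static ByteBuffer view(ByteBuffer file) {
        ByteBuffer v = file.duplicate();
        v.clear(); // position = 0, limit = capacity; contents are untouched
        return v;
    }

    /** Returns the offset up to which data is valid, scanning at most the last three files. */
    static long recover(List<ByteBuffer> files) {
        if (files.isEmpty()) {
            return 0L; // nothing to recover
        }
        int index = Math.max(files.size() - 3, 0);
        ByteBuffer buf = view(files.get(index));
        long processOffset = (long) index * FILE_SIZE; // start offset of the current file
        long mappedFileOffset = 0;                     // valid bytes inside the current file
        while (true) {
            int size = buf.remaining() >= 4 ? buf.getInt(buf.position()) : END_OF_FILE;
            if (size >= 4 && size <= buf.remaining()) {
                buf.position(buf.position() + size);   // valid message: keep going
                mappedFileOffset += size;
            } else if (size == END_OF_FILE) {
                index++;
                if (index >= files.size()) {
                    break;                             // all files processed
                }
                buf = view(files.get(index));          // switch to the next file
                processOffset = (long) index * FILE_SIZE;
                mappedFileOffset = 0;
            } else {
                break;                                 // unwritten or corrupt data: stop here
            }
        }
        return processOffset + mappedFileOffset;
    }

    public static void main(String[] args) {
        List<ByteBuffer> files = new ArrayList<>();
        files.add(file(20, 20, END_OF_FILE)); // two messages, then padding
        files.add(file(16, 8));               // two messages, rest unwritten
        System.out.println(recover(files));   // 88 = 64 (second file start) + 24
    }
}
```

The returned value corresponds to processOffset after the loop, the position that flushedWhere and committedWhere are set to and beyond which data is truncated.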
The recovery of TopicQueue
// Rebuild the mapping from "topic-queueId" to the queue's maximum offset
public void recoverTopicQueueTable() {
    HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
    long minPhyOffset = this.commitLog.getMinOffset();
    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            String key = logic.getTopic() + "-" + logic.getQueueId();
            table.put(key, logic.getMaxOffsetInQueue());
            logic.correctMinOffset(minPhyOffset);
        }
    }
    this.commitLog.setTopicQueueTable(table);
}
This mapping is used when messages are stored in the CommitLog, in the DefaultAppendMessageCallback#doAppend method:
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
queueOffset++;
CommitLog.this.topicQueueTable.put(key, queueOffset);
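To make the role of this table concrete, the sketch below rebuilds a topic-queueId map from per-queue maximum offsets and then hands out the next queue offset on each append. QueueOffsetTable and its methods are hypothetical names used only for illustration, and defaulting a missing key to 0 is an assumption of the sketch; the real logic lives in DefaultMessageStore and CommitLog.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the topic-queueId -> next queue offset table; not RocketMQ code.
final class QueueOffsetTable {
    private final Map<String, Long> table = new HashMap<>();

    static String key(String topic, int queueId) {
        return topic + "-" + queueId;
    }

    /** Recovery step: record the maximum offset found in one consume queue. */
    void recover(String topic, int queueId, long maxOffsetInQueue) {
        table.put(key(topic, queueId), maxOffsetInQueue);
    }

    /** Append step: return the offset for the new message, then advance the table. */
    long nextOffset(String topic, int queueId) {
        String k = key(topic, queueId);
        long queueOffset = table.getOrDefault(k, 0L); // assumed default for a new queue
        table.put(k, queueOffset + 1);
        return queueOffset;
    }

    public static void main(String[] args) {
        QueueOffsetTable t = new QueueOffsetTable();
        t.recover("TopicTest", 0, 3L);                     // queue already holds offsets 0..2
        System.out.println(t.nextOffset("TopicTest", 0));  // 3
        System.out.println(t.nextOffset("TopicTest", 0));  // 4
        System.out.println(t.nextOffset("TopicTest", 1));  // 0 (queue not seen during recovery)
    }
}
```

Rebuilding the table at startup is what lets messages appended after a restart continue from the correct queue offset instead of starting again at 0.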