An overview of the

How does the broker load files on disk when it restarts? An important class, DefaultMessageStore, helps broker load disk files.

DefaultMessageStore can be thought of as a file storage control class. This class aggregates important storage classes such as CommitLog, ConsumeQueue, and IndexFile.

DefaultMessageStore initialization process: initialization -> Load -> start

instantiation

DefaultMessageStore is instantiated when the broker starts. DefaultMessageStore initialization is relatively simple, simply instantiating individual properties

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
    final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
    this.messageArrivingListener = messageArrivingListener;
    this.brokerConfig = brokerConfig;
    this.messageStoreConfig = messageStoreConfig;
    this.brokerStatsManager = brokerStatsManager;
    this.allocateMappedFileService = new AllocateMappedFileService(this);
    if (messageStoreConfig.isEnableDLegerCommitLog()) {
        this.commitLog = new DLedgerCommitLog(this);
    } else {
        // Instantiate CommitLog
        this.commitLog = new CommitLog(this);
    }
    this.consumeQueueTable = new ConcurrentHashMap<>(32);
    
    // ConsumeQueue Flush thread
    this.flushConsumeQueueService = new FlushConsumeQueueService();
    // Clear commitlog threads
    this.cleanCommitLogService = new CleanCommitLogService();
    this.cleanConsumeQueueService = new CleanConsumeQueueService();
    this.storeStatsService = new StoreStatsService();
    
    // IndexFile
    this.indexService = new IndexService(this);
    if(! messageStoreConfig.isEnableDLegerCommitLog()) {this.haService = new HAService(this);
    } else {
        this.haService = null;
    }
    // Forward CommitLog threads
    this.reputMessageService = new ReputMessageService();

    this.scheduleMessageService = new ScheduleMessageService(this);

    this.transientStorePool = new TransientStorePool(messageStoreConfig);

    if (messageStoreConfig.isTransientStorePoolEnable()) {
        this.transientStorePool.init();
    }

    this.allocateMappedFileService.start();

    this.indexService.start();
    
    ReputMessageService forwards to ConsumeQueue and IndexFile by default
    this.dispatcherList = new LinkedList<>();
    this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
    this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

    File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
    MappedFile.ensureDirOK(file.getParent());
    lockFile = new RandomAccessFile(file, "rw");
}
Copy the code

Load – Loads disk files

DefaultMessageStore, after being instantiated, will start loading the files on disk. The loading process is as follows:

  1. loadingCommitLog
  2. loadingConsumeQueue
  3. loadingIndexFile
  4. According to theConsumeQueueStored in thecommitlogMaximum offset to memoryCommitLogReset the write offset (I’m going to talk about why do I do this)

Start – Starts each flush thread

Start is the action that is performed after load is executed. The main thing to do is start the thread. The overall process is as follows:

  1. Start theReputMessageServiceThread,CommitLogIs not forwarded toConsumeQueueThe message is forwarded toConsumeQueue
  2. Start theConsumeQueueFlush thread,CommitLogBrush disk threads and so on.

Now in load, why do WE do step 4? In fact, this is to prepare for step 1 of the start process. Because ConsumeQueue doesn’t flush very often. After the process exits, messages are written to the CommitLog and possibly not to the ConsumeQueue. Therefore, we need to know which messages have not been forwarded to the ConsumeQueue by the maximum offset of the CommitLog in the ConsumeQueue, otherwise the consumer will not be able to consume the messages