Preface

In the previous article we mentioned that when a message is sent to a Broker, the Broker calls CommitLog#putMessage()->MappedFile#appendMessage()->MappedFile#appendMessageInner()->CommitLog#doAppend() to append the message to the buffer of a MappedFile. When the CommitLog is initialized, it starts a different flush service depending on the flushDiskType setting in MessageStoreConfig.

  • Synchronous flush: This means that when a message is appended to memory, it is immediately flushed to a file for storage.
  • Asynchronous flush: When a message is appended to memory, it is not immediately flushed to disk. Instead, a separate asynchronous thread performs the flush.

RocketMQ uses asynchronous flush by default. After the CommitLog finishes appending a message to the MappedFile buffer, it calls handleDiskFlush() to flush the message to a file. handleDiskFlush() behaves differently in each of the two flush modes.
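To make the two branches concrete, here is a minimal sketch of how a method like handleDiskFlush() could dispatch on the flush mode. It is not the RocketMQ source: the class FlushDispatchSketch, the PutStatus enum, and the two hooks (syncFlush, asyncWakeup) are hypothetical names introduced only for this illustration.

```java
import java.util.function.LongPredicate;

// Simplified sketch of dispatching on the flush mode; not the RocketMQ source.
final class FlushDispatchSketch {
    enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

    // Hypothetical, reduced result type for this example.
    enum PutStatus { PUT_OK, FLUSH_DISK_TIMEOUT }

    /**
     * @param syncFlush   hypothetical hook: blocks until nextOffset is on disk,
     *                    returns false if the flush failed or timed out
     * @param asyncWakeup hypothetical hook: wakes the background flush thread
     */
    static PutStatus handleDiskFlush(FlushDiskType type, long nextOffset,
                                     LongPredicate syncFlush, Runnable asyncWakeup) {
        if (type == FlushDiskType.SYNC_FLUSH) {
            // Synchronous mode: the caller waits for the flush result.
            return syncFlush.test(nextOffset) ? PutStatus.PUT_OK
                                              : PutStatus.FLUSH_DISK_TIMEOUT;
        }
        // Asynchronous mode: wake the flush thread and return immediately.
        asyncWakeup.run();
        return PutStatus.PUT_OK;
    }
}
```

The point of the sketch is that only the synchronous branch can turn a flush problem into a failed send. The asynchronous branch reports success as soon as the message is in memory.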

Exploring synchronous flush

1. handleDiskFlush() submits the flush request and waits synchronously for the flush result. A failed flush is also treated as a message storage failure, and FLUSH_DISK_TIMEOUT is returned.

2. When a request is submitted to GroupCommitService, it is not processed immediately. Instead, the request is placed on an internal queue, and the GroupCommitService thread is notified of the new request through waitPoint.

3. Once GroupCommitService is woken up, it swaps the requests in requestWrite into requestRead, so that submitting requests and processing them do not contend for the same lock.

4. Once started, GroupCommitService calls doCommit() in an infinite loop, and doCommit() iterates over the requests in requestRead.

5. In the end, doCommit() calls CommitLog.this.mappedFileQueue.flush(0) to flush to disk.

Although synchronous flush tasks are also performed in asynchronous threads, the main flow of the message store waits for the flush results synchronously, so the operation is essentially synchronous.
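The steps above can be sketched as a small worker with two request lists that are swapped under a lock. This is a simplified illustration, not the RocketMQ source: GroupCommitSketch, FlushRequest, and the flushedWhere counter that stands in for the real disk flush are assumptions made for this example, and a plain lock with wait/notify stands in for waitPoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified sketch of a group-commit flush worker; not the RocketMQ source.
final class GroupCommitSketch implements Runnable {
    // Hypothetical request type: the sending thread blocks on the latch.
    static final class FlushRequest {
        final long nextOffset;
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile boolean flushOk;

        FlushRequest(long nextOffset) { this.nextOffset = nextOffset; }

        void complete(boolean ok) { flushOk = ok; done.countDown(); }

        // Returns false on timeout or failed flush (the FLUSH_DISK_TIMEOUT case).
        boolean waitForFlush(long timeoutMillis) {
            try {
                return done.await(timeoutMillis, TimeUnit.MILLISECONDS) && flushOk;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    private final Object lock = new Object();
    private List<FlushRequest> requestsWrite = new ArrayList<>(); // filled by senders
    private List<FlushRequest> requestsRead = new ArrayList<>();  // drained by worker
    private volatile boolean stopped;
    private long flushedWhere; // stand-in for the real flushed position on disk

    void putRequest(FlushRequest request) {
        synchronized (lock) {
            requestsWrite.add(request);
            lock.notify(); // wake the worker if it is waiting
        }
    }

    void shutdown() {
        stopped = true;
        synchronized (lock) { lock.notify(); }
    }

    @Override
    public void run() {
        while (!stopped) {
            synchronized (lock) {
                if (requestsWrite.isEmpty()) {
                    try {
                        lock.wait(10);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                // Swap the lists so senders never wait on the flush itself.
                List<FlushRequest> tmp = requestsWrite;
                requestsWrite = requestsRead;
                requestsRead = tmp;
            }
            doCommit();
        }
    }

    private void doCommit() {
        for (FlushRequest request : requestsRead) {
            if (flushedWhere < request.nextOffset) {
                // Placeholder for the real flush call, e.g. mappedFileQueue.flush(0).
                flushedWhere = request.nextOffset;
            }
            request.complete(flushedWhere >= request.nextOffset);
        }
        requestsRead.clear();
    }
}
```

Because the worker only touches requestsRead outside the lock, a sender holds the lock just long enough to append to requestsWrite. The sending thread would then call waitForFlush() with its timeout, which is what makes the overall operation synchronous from the caller's point of view.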

Exploring asynchronous flush

The asynchronous flush service is FlushRealTimeService. However, when the memory buffer pool TransientStorePool is enabled, messages are first appended to a writeBuffer borrowed from TransientStorePool and then committed to the FileChannel of the MappedFile. That commit step is handled by CommitRealTimeService, which, like FlushRealTimeService, extends FlushCommitLogService.

1. handleDiskFlush() simply wakes up the asynchronous flush service.

2. Once started, FlushRealTimeService flushes to disk periodically in an infinite loop.
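A periodic flush loop of this kind can be sketched as follows. This is an illustration under stated assumptions rather than the RocketMQ implementation: PeriodicFlushSketch and its parameters (leastPages, thoroughIntervalMs, intervalMs) are hypothetical names, and the idea of forcing a full flush after a longer interval is an assumption about how such a loop avoids leaving a few dirty pages unflushed indefinitely.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntConsumer;

// Simplified sketch of a periodic flush loop; not the RocketMQ source.
final class PeriodicFlushSketch {
    private final int leastPages;          // assumed threshold for routine flushes
    private final long thoroughIntervalMs; // assumed max time between full flushes
    private long lastThoroughFlushMs;

    PeriodicFlushSketch(int leastPages, long thoroughIntervalMs, long startMs) {
        this.leastPages = leastPages;
        this.thoroughIntervalMs = thoroughIntervalMs;
        this.lastThoroughFlushMs = startMs;
    }

    // Picks the page threshold for this iteration: 0 means "flush everything".
    int nextFlushLeastPages(long nowMs) {
        if (nowMs >= lastThoroughFlushMs + thoroughIntervalMs) {
            lastThoroughFlushMs = nowMs;
            return 0;
        }
        return leastPages;
    }

    // The loop itself: sleep, pick a threshold, flush. The flush callback
    // stands in for a call such as mappedFileQueue.flush(leastPages).
    void run(long intervalMs, BooleanSupplier keepRunning, IntConsumer flush) {
        while (keepRunning.getAsBoolean()) {
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            flush.accept(nextFlushLeastPages(System.currentTimeMillis()));
        }
    }
}
```

In this sketch a wake-up from handleDiskFlush() would simply shorten the sleep. The loop never reports back to the sender, which is the key difference from the synchronous path.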

Exploring the flush of MappedFile

Either way, the following method is eventually called to perform the flush:

CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);

1. Among all the MappedFiles held by MappedFileQueue, find the MappedFile that needs to be flushed. If a matching MappedFile is found, call its flush() method and update flushedWhere.

2. The actual flush to disk is performed in MappedFile#flush().

isAbleToFlush() allows a flush when the MappedFile is full, or when the number of unflushed pages reaches the required minimum, which avoids unnecessary flushing.
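The check described above can be written as a small pure function. The following is a minimal sketch, not the RocketMQ source: the class name FlushCheckSketch, the explicit parameters, and the 4 KB page size are assumptions made for this example.

```java
// Simplified sketch of the flush-eligibility check; not the RocketMQ source.
final class FlushCheckSketch {
    static final int OS_PAGE_SIZE = 4 * 1024; // assumed page size of 4 KB

    // fileSize, writePosition and flushedPosition are byte offsets within one file.
    static boolean isAbleToFlush(int fileSize, int writePosition,
                                 int flushedPosition, int flushLeastPages) {
        // A full file is always flushed, regardless of the page threshold.
        if (writePosition == fileSize) {
            return true;
        }
        // With a threshold, flush only once enough dirty pages have accumulated.
        if (flushLeastPages > 0) {
            int dirtyPages = writePosition / OS_PAGE_SIZE - flushedPosition / OS_PAGE_SIZE;
            return dirtyPages >= flushLeastPages;
        }
        // A threshold of 0 means: flush whenever any data is still unflushed.
        return writePosition > flushedPosition;
    }
}
```

With a threshold of 0, as in the flush(0) call on the synchronous path, any unflushed byte is enough to trigger a flush. A positive threshold lets the asynchronous path batch small writes into fewer disk operations.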

The overall flow for a message is as follows: