1. Message store Overview

RocketMQ storage files, in ${ROCKET_HOME}/store.

When a producer sends a message, the broker stores it in a COMMIT file and asynchronously forwards it to a consumeQueue and indexFile.

Commitlog message body and metadata storage body. The messages sent by the Producer are stored in the Commitlog.

ConsumeQueue is a message consumption queue introduced to improve message consumption performance. Since RocketMQ is a topic-based subscription model, message consumption is performed on a topic, and it is inefficient to retrieve messages by topic through commitlog files.

IndexFile An indexFile provides a way to query messages by key or time interval.

The following figure explains when writing commitlog. Commitlog, consumeQueue, indexFile 3

2 know commitLog

Commitlog is a file used by RocketMQ to store messages. Commitlog has the following characteristics

  1. Sequential write, random read and write.
  2. The message just needs to be writtencommitlogThen the message will not be lost
  3. The message is of indefinite length
  4. Each file size defaults1GB
  5. The file is named aftercommitlogNamed by the start offset

Listed below, commitlog representation on disk

3 commitLog writing process

In this process, we only need to focus on 2 points.

  1. appendMessage()
  2. Handle brush set

AppendMessage () handles the logic of writing the message to off-heap memory, which is of little concern here. Interested readers can view the source code, the location of the source DefaultAppendMessageCallback# doAppend ()

Brush set

4.1 Synchronous Disk Flushing

From the user’s point of view of the synchronous brush disk process

From the point of view of the program synchronous brush disk process

  1. CommitLogEncapsulate the message intoGroupCommitRequestAnd put it into the queue.
  2. Wake up theGroupCommitServiceThreadthread
  3. GroupCommitServiceThreadListTo deriveRequestAnd consumption
  4. callMappedFileFlush the data to disk
  5. After brushing the plate, wake upCommitLogThe thread

4.2 Asynchronous Disk Flushing

From the user’s point of view of the asynchronous disk brush process

If disabled, transientStorePoolEnable is the flushing logic of FlushRealTimeService

Basic process:

CommitLog$FlushRealTimeService#run()

  1. every500msPerform a flush()
  2. At least 4 pages are needed for each swipe. If the interval between two flush operations is greater than 10 seconds, flush() is ignored.
  3. When you’ve finished, check to checkPoint

Configurable parameter

# false Wait with AQS; True uses sleep to wait
flushCommitLogTimed=false
Brush the disk every 500ms
flushIntervalCommitLog=500
At least 4 pages per swipe
flushCommitLogLeastPages=4
# If the disk is not flushed, the disk will be forcibly flushed
flushCommitLogThoroughInterval=1000 * 10
Copy the code

TransientStorePoolEnable Enable. RocketMQ calls the FileChannel#write() method through the CommitRealTimeService thread to write the message to the cache. It then wakes up the FlushRealTimeService thread, which executes Flush ()

The basic flow

Code position CommitLog$CommitRealTimeService#run()

  1. every200msPerform write() once
  2. You need at least four pages each time you execute write(). If the interval between two writes is greater than 200ms, ignore this condition and execute write() directly.
  3. Call FileChannel# write ()
  4. Wake up the FlushRealTimeService thread
  5. FlushRealTimeService Executes flush
Execute write every 200ms
commitIntervalCommitLog=200
At least 4 pages per write run
commitCommitLogLeastPages
# If the disk is not flushed, the disk will be forcibly flushed
commitCommitLogThoroughInterval=200
Copy the code

4.3 summarize

RocketMQ provides two flushing modes, which can be configured using flushDiskType

  1. flushDiskType=SYNC_FLUSH

Flush the disk synchronously and call Flush

  1. flushDiskType=ASYNC_FLUSH & transientStorePoolEnable=false

Flush disks asynchronously and call Flush.

  1. flushDiskType=ASYNC_FLUSH & transientStorePoolEnable=true

Asynchronously flush, call write, followed by flush

5 reference

  • RocketMQ Design Guide
  • Java NIO FileChannel FileChannel usage and principle
  • ByteBuffer,