1 Message store overview
RocketMQ keeps its storage files under ${ROCKET_HOME}/store.
When a producer sends a message, the broker stores it in a commitlog file and asynchronously dispatches it to the consumeQueue and the indexFile.
Commitlog: stores message bodies and their metadata. Every message sent by a producer is stored in the commitlog.
ConsumeQueue: a message consumption queue introduced to improve consumption performance. RocketMQ uses a topic-based subscription model, so messages are consumed per topic, and scanning the commitlog files to retrieve the messages of a single topic would be inefficient.
IndexFile: an index file that lets you query messages by key or by time interval.
Figure: how the three files (commitlog, consumeQueue and indexFile) relate when a message is written to the commitlog.
2 Getting to know commitLog
Commitlog is the file RocketMQ uses to store messages. It has the following characteristics:
- Sequential writes, random reads.
- Once a message has been written to the commitlog, it will not be lost.
- Messages are of variable length.
- Each file is 1 GB by default.
- Each file is named after its start offset in the commitlog.
On disk, the commitlog appears as a series of files named in this way.
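As a rough illustration of the naming rule, the sketch below maps a physical offset to the file that would contain it. It assumes the default 1 GB file size and a 20-digit, zero-padded file name; the class and method names (CommitLogFileNames, fileNameFor, positionInFile) are made up for this example and are not RocketMQ APIs.

```java
class CommitLogFileNames {
    // Assumed default size of one commitlog file: 1 GB.
    static final long FILE_SIZE = 1024L * 1024 * 1024;

    // Name of the file containing the given physical offset:
    // the file's start offset, zero-padded to 20 digits.
    static String fileNameFor(long physicalOffset) {
        long startOffset = physicalOffset - (physicalOffset % FILE_SIZE);
        return String.format("%020d", startOffset);
    }

    // Position of the offset inside its file.
    static long positionInFile(long physicalOffset) {
        return physicalOffset % FILE_SIZE;
    }

    public static void main(String[] args) {
        long offset = FILE_SIZE + 5; // five bytes into the second file
        System.out.println(fileNameFor(offset) + " @ " + positionInFile(offset));
    }
}
```

Because the name encodes the start offset, locating a message by its physical offset only needs a division and a remainder, with no extra lookup table.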
3 commitLog writing process
In this process, we only need to focus on two points.
- appendMessage()
- Handling disk flushing
appendMessage() handles the logic of writing the message to off-heap memory, which is not the focus here. Interested readers can find it in the source code at DefaultAppendMessageCallback#doAppend().
4 Disk flushing
4.1 Synchronous Disk Flushing
Figure: the synchronous disk flushing process from the user's point of view.
The synchronous disk flushing process from the program's point of view:
- CommitLog encapsulates the message into a GroupCommitRequest and puts it into the queue.
- It wakes up the GroupCommitService thread.
- The GroupCommitService thread takes the Request objects out of the List and consumes them.
- It calls MappedFile to flush the data to disk.
- After flushing, it wakes up the CommitLog thread.
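The hand-off between the sending thread and the flushing thread can be sketched as follows. This is a minimal illustration under stated assumptions, not RocketMQ's code: GroupCommitSketch, FlushRequest, submit() and flushedOffset are hypothetical names, and the real MappedFile flush is replaced by a plain field update.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class GroupCommitSketch {
    // One pending flush request; the sender waits on 'done'.
    static final class FlushRequest {
        final long nextOffset;
        final CompletableFuture<Boolean> done = new CompletableFuture<>();
        FlushRequest(long nextOffset) { this.nextOffset = nextOffset; }
    }

    private final Object lock = new Object();
    private List<FlushRequest> pending = new ArrayList<>();
    private volatile boolean running = true;
    private volatile long flushedOffset = 0;
    private final Thread worker = new Thread(this::run, "group-commit-sketch");

    void start() { worker.start(); }

    long flushedOffset() { return flushedOffset; }

    // Sender side: enqueue the request and wake up the flushing thread.
    CompletableFuture<Boolean> submit(long nextOffset) {
        FlushRequest request = new FlushRequest(nextOffset);
        synchronized (lock) {
            pending.add(request);
            lock.notify();
        }
        return request.done;
    }

    // Flushing thread: take the whole batch, "flush", then wake up the senders.
    private void run() {
        while (true) {
            List<FlushRequest> batch;
            synchronized (lock) {
                while (pending.isEmpty() && running) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                if (pending.isEmpty()) {
                    return; // shut down with nothing left to flush
                }
                batch = pending;
                pending = new ArrayList<>();
            }
            for (FlushRequest request : batch) {
                if (flushedOffset < request.nextOffset) {
                    flushedOffset = request.nextOffset; // stand-in for the MappedFile flush
                }
                request.done.complete(true);
            }
        }
    }

    void shutdown() {
        running = false;
        synchronized (lock) { lock.notifyAll(); }
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A sender would call submit(offset).join() and only acknowledge the producer once the future completes. Swapping the list under the lock lets new requests queue up while the current batch is being flushed.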
4.2 Asynchronous Disk Flushing
Figure: the asynchronous disk flushing process from the user's point of view.
When transientStorePoolEnable is disabled, flushing is handled by FlushRealTimeService.
Basic process (code position CommitLog$FlushRealTimeService#run()):
- Every 500 ms, perform a flush().
- Each flush needs at least 4 pages of unflushed data. If more than 10 seconds have passed since the last flush, this condition is ignored and flush() is executed directly.
- After flushing, update the checkpoint.
Configurable parameters:
# false: wait using AQS; true: wait using sleep
flushCommitLogTimed=false
# Flush the disk every 500 ms
flushIntervalCommitLog=500
# Flush at least 4 pages each time
flushCommitLogLeastPages=4
# If no flush has happened within this interval (1000 * 10 ms), force a flush
flushCommitLogThoroughInterval=10000
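The interplay between the least-pages threshold and the thorough interval can be sketched as a small decision function. This is an illustrative sketch only: FlushDecisionSketch and shouldFlush are hypothetical names, and a 4 KiB page size is assumed.

```java
class FlushDecisionSketch {
    // Assumed OS page size of 4 KiB.
    static final int OS_PAGE_SIZE = 4 * 1024;

    // Decides whether a flush should run now.
    // writePos / flushedPos: bytes written so far and bytes already flushed.
    // leastPages: corresponds to flushCommitLogLeastPages.
    // thoroughIntervalMs: corresponds to flushCommitLogThoroughInterval.
    static boolean shouldFlush(long writePos, long flushedPos, int leastPages,
                               long nowMs, long lastFlushMs, long thoroughIntervalMs) {
        // Too long since the last flush: drop the page threshold.
        if (nowMs - lastFlushMs >= thoroughIntervalMs) {
            leastPages = 0;
        }
        if (leastPages > 0) {
            long dirtyPages = writePos / OS_PAGE_SIZE - flushedPos / OS_PAGE_SIZE;
            return dirtyPages >= leastPages;
        }
        // No threshold: flush whenever there is any unflushed data.
        return writePos > flushedPos;
    }

    public static void main(String[] args) {
        // Three dirty pages, last flush 100 ms ago: not enough to flush yet.
        System.out.println(shouldFlush(3 * 4096, 0, 4, 1_000, 900, 10_000));
        // A few dirty bytes, but the last flush was more than 10 s ago: flush anyway.
        System.out.println(shouldFlush(100, 0, 4, 20_000, 900, 10_000));
    }
}
```

The threshold batches small writes into larger flushes, while the interval bounds how long a small amount of data can stay unflushed.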
When transientStorePoolEnable is enabled, RocketMQ's CommitRealTimeService thread calls FileChannel#write() to write the message to the cache. It then wakes up the FlushRealTimeService thread, which executes flush().
Basic process (code position CommitLog$CommitRealTimeService#run()):
- Every 200 ms, perform a write().
- Each write() needs at least 4 pages of data. If more than 200 ms have passed since the last write, this condition is ignored and write() is executed directly.
- Call FileChannel#write().
- Wake up the FlushRealTimeService thread.
- FlushRealTimeService executes flush().
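The two stages (write, then flush) map onto two standard Java NIO calls: FileChannel#write() hands the bytes to the operating system, and FileChannel#force() asks it to persist them to the storage device. The sketch below shows only that sequence on a temporary file. CommitThenFlushSketch and commitThenFlush are hypothetical names, and the direct buffer merely stands in for the off-heap write buffer used when the transient store pool is enabled.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class CommitThenFlushSketch {
    // Writes the message to a temporary file in two stages and
    // returns the resulting file size in bytes.
    static long commitThenFlush(byte[] message) {
        try {
            Path file = Files.createTempFile("commitlog-sketch", ".bin");
            try {
                // Stage 0: the message sits in an off-heap (direct) buffer.
                ByteBuffer writeBuffer = ByteBuffer.allocateDirect(message.length);
                writeBuffer.put(message);
                writeBuffer.flip();

                try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
                    // Stage 1 ("write"): copy the buffer into the file channel.
                    while (writeBuffer.hasRemaining()) {
                        channel.write(writeBuffer);
                    }
                    // Stage 2 ("flush"): force the written data to the storage device.
                    channel.force(false);
                }
                return Files.size(file);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(commitThenFlush(new byte[] {1, 2, 3}));
    }
}
```

In the flow described above the two stages run on different threads at different intervals; here they run back to back only to keep the example short.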
# Execute write() every 200 ms
commitIntervalCommitLog=200
# Write at least 4 pages each time
commitCommitLogLeastPages=4
# If no write has happened within this interval (200 ms), force a write
commitCommitLogThoroughInterval=200
4.3 Summary
RocketMQ provides two flushing modes, selected with flushDiskType. Combined with transientStorePoolEnable, this gives three configurations:
- flushDiskType=SYNC_FLUSH
Flush synchronously by calling flush().
- flushDiskType=ASYNC_FLUSH & transientStorePoolEnable=false
Flush asynchronously by calling flush().
- flushDiskType=ASYNC_FLUSH & transientStorePoolEnable=true
Flush asynchronously by calling write() first, followed by flush().
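The three combinations can be written down as a small lookup. This is purely illustrative: FlushModeSketch and describe() are hypothetical, and the returned strings only restate the summary above.

```java
class FlushModeSketch {
    enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

    // Maps a configuration to the flushing path summarized above.
    static String describe(FlushDiskType type, boolean transientStorePoolEnable) {
        if (type == FlushDiskType.SYNC_FLUSH) {
            return "synchronous: flush";
        }
        return transientStorePoolEnable
                ? "asynchronous: write, then flush"
                : "asynchronous: flush";
    }

    public static void main(String[] args) {
        for (FlushDiskType type : FlushDiskType.values()) {
            for (boolean pool : new boolean[] {false, true}) {
                System.out.println(type + ", pool=" + pool + " -> " + describe(type, pool));
            }
        }
    }
}
```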