Overview
In the previous CommitLog article, we analyzed how the CommitLog is loaded and stored. Messages in the CommitLog are not organized by topic, so it would be very expensive for a consumer to find the next message of a given topic by scanning forward from its current CommitLog offset. ConsumeQueue is therefore designed as an index file over the CommitLog. In addition, a consumer can only consume a message once it has been written to the ConsumeQueue, because consumers read messages through the ConsumeQueue rather than directly from the CommitLog.
ConsumeQueue introduction
ConsumeQueue File structure
1. File organization. consumeQueue files are organized in three layers: topic/queueId/file. For example, test%TopicTest is the topic, 2 is the queueId, and 00000000000000000000 is the file.
2. Entry structure. Each consumeQueue entry is a fixed 20 bytes. The fixed size makes it easy to compute an entry's position from a consumption offset, and lets the file serve as an index.
3. File features. Every file holds 300,000 entries of 20 bytes each, so a file is approximately 5.7 MB.
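To make the fixed-size arithmetic concrete, here is a minimal sketch of how an entry could be encoded and located. The class and method names are hypothetical. The field widths (8-byte CommitLog offset, 4-byte size, 8-byte tag hashcode) are an assumption chosen to match the 20-byte total, and naming a file after the byte offset of its first entry is also an assumption, consistent with the example file name 00000000000000000000.

```java
import java.nio.ByteBuffer;

public class ConsumeQueueEntrySketch {
    // Sizes taken from the text: 20-byte entries, 300,000 entries per file.
    static final int ENTRY_SIZE = 20;
    static final int ENTRIES_PER_FILE = 300_000;
    static final long FILE_SIZE = (long) ENTRY_SIZE * ENTRIES_PER_FILE; // 6,000,000 bytes

    // Assumed field widths: 8-byte CommitLog offset, 4-byte size, 8-byte tag hashcode.
    static ByteBuffer encode(long commitLogOffset, int size, long tagHashCode) {
        ByteBuffer buf = ByteBuffer.allocate(ENTRY_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(size);
        buf.putLong(tagHashCode);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    // Assumption: a file is named after the byte offset of its first entry,
    // zero-padded to 20 digits.
    static String fileNameFor(long entryIndex) {
        long byteOffset = entryIndex * ENTRY_SIZE;
        long fileStart = byteOffset - (byteOffset % FILE_SIZE);
        return String.format("%020d", fileStart);
    }

    // Byte position of the entry inside its file.
    static int positionInFile(long entryIndex) {
        return (int) ((entryIndex * ENTRY_SIZE) % FILE_SIZE);
    }

    public static void main(String[] args) {
        System.out.println(encode(1024L, 256, 42L).remaining());
        System.out.println(fileNameFor(300_000));
        System.out.println(positionInFile(300_001));
    }
}
```

Because every entry has the same size, finding entry number n is a multiplication and a modulo, with no scanning required.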
ConsumeQueue class abstractions
The ConsumeQueue class can be understood as an abstraction of the topic/queueId/file storage structure. There are multiple ConsumeQueue instances, and they are held in DefaultMessageStore.
public class ConsumeQueue {
// ...
// Each queue is backed by many files, so they are managed as a queue of mapped files
private final MappedFileQueue mappedFileQueue;
private final String topic;
private final int queueId;
// ...
}
Message forwarding thread – ReputMessageService
As mentioned in the overview, consumers can consume CommitLog messages only after they have been forwarded to the ConsumeQueue. RocketMQ hands this job to ReputMessageService. Here is how ReputMessageService works.
This thread executes the following logic every 1 ms. See ReputMessageService#run.
- The ReputMessageService thread records the current forwarding offset, reputFromOffset.
- While reputFromOffset is less than the CommitLog's in-memory write offset (the CommitLog may not have been flushed to disk at this point), forwarding continues.
- Using reputFromOffset, the thread finds the corresponding message in the CommitLog and forwards it in turn to the ConsumeQueue and the IndexFile.
- The ConsumeQueue calls its write() method to write the message (without flushing it) so that consumers can pull it.
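The steps above can be sketched as a small loop. This is a simplified model under stated assumptions, not RocketMQ's actual code: StoredMessage, Dispatcher and doReput are hypothetical names, and the CommitLog is modelled as an in-memory list ordered by offset.

```java
import java.util.ArrayList;
import java.util.List;

public class ReputLoopSketch {
    /** Hypothetical stand-in for a message located in the CommitLog. */
    static final class StoredMessage {
        final long commitLogOffset;
        final int size;

        StoredMessage(long commitLogOffset, int size) {
            this.commitLogOffset = commitLogOffset;
            this.size = size;
        }
    }

    /** Hypothetical dispatch target, standing in for ConsumeQueue or IndexFile. */
    interface Dispatcher {
        void dispatch(StoredMessage msg);
    }

    private final List<StoredMessage> commitLog;  // messages in offset order
    private final List<Dispatcher> dispatchers;   // called in turn for each message
    private long reputFromOffset;

    ReputLoopSketch(List<StoredMessage> commitLog, List<Dispatcher> dispatchers, long reputFromOffset) {
        this.commitLog = commitLog;
        this.dispatchers = dispatchers;
        this.reputFromOffset = reputFromOffset;
    }

    /** One pass: forward every message that lies fully below writeOffset. */
    int doReput(long writeOffset) {
        int forwarded = 0;
        for (StoredMessage msg : commitLog) {
            if (msg.commitLogOffset < reputFromOffset) {
                continue; // already forwarded in an earlier pass
            }
            if (msg.commitLogOffset + msg.size > writeOffset) {
                break; // not fully written to the CommitLog yet
            }
            for (Dispatcher d : dispatchers) {
                d.dispatch(msg);
            }
            reputFromOffset = msg.commitLogOffset + msg.size; // advance past this message
            forwarded++;
        }
        return forwarded;
    }

    long getReputFromOffset() {
        return reputFromOffset;
    }

    public static void main(String[] args) {
        List<StoredMessage> log = new ArrayList<>();
        log.add(new StoredMessage(0, 100));
        log.add(new StoredMessage(100, 50));
        log.add(new StoredMessage(150, 70));
        List<Long> consumeQueue = new ArrayList<>();
        List<Dispatcher> targets = new ArrayList<>();
        targets.add(m -> consumeQueue.add(m.commitLogOffset));
        ReputLoopSketch reput = new ReputLoopSketch(log, targets, 0);
        System.out.println(reput.doReput(150));
        System.out.println(reput.getReputFromOffset());
    }
}
```

In the real service this pass would be repeated by a background thread with a short sleep between rounds; the sketch leaves the threading out to keep the offset bookkeeping visible.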
When ReputMessageService is initialized, reputFromOffset is set to the maximum CommitLog offset recorded across all ConsumeQueues. Recall the structure of a ConsumeQueue entry: CommitLog offset, size, tag hashcode. For the last entry of a queue, the offset to resume from is therefore CommitLog offset + size.
See DefaultMessageStore#start().
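As a rough illustration of that initialization, the sketch below takes the last entry of each queue and returns the largest value of offset + size. The method name and the array representation of an entry are hypothetical and chosen only for brevity.

```java
public class InitialReputOffsetSketch {
    /**
     * Each row is the last entry of one ConsumeQueue, given as
     * {commitLogOffset, size}. Returns the largest end offset, or 0 if
     * there are no queues yet.
     */
    static long initialReputFromOffset(long[][] lastEntries) {
        long max = 0L;
        for (long[] entry : lastEntries) {
            long endOffset = entry[0] + entry[1]; // CommitLog offset + size
            max = Math.max(max, endOffset);
        }
        return max;
    }

    public static void main(String[] args) {
        long[][] lastEntries = { {0, 100}, {500, 80}, {300, 40} };
        System.out.println(initialReputFromOffset(lastEntries));
    }
}
```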
Flush thread – FlushConsumeQueueService
ConsumeQueue calls write() to write messages, but write() does not flush them to disk. RocketMQ therefore provides the FlushConsumeQueueService thread to flush the ConsumeQueue.
FlushConsumeQueueService attempts a flush every second.
FlushConsumeQueueService flushes as follows:
- When the new content is less than 2 pages, no flush is performed.
- When the JVM exits, a flush is forced.
- When no flush has been performed for 60 s, a flush is forced.
- It traverses all queues and flushes each in turn.
From the perspective of the flush process, the ConsumeQueue flush thread is very similar to the CommitLog flush thread. The difference lies in the parameter configuration.
See FlushConsumeQueueService#run().
The flush behaviour can be configured with the following parameters:
Parameter | Default value | Role |
---|---|---|
flushIntervalConsumeQueue | 1000 ms | Interval between flush attempts |
flushConsumeQueueLeastPages | 2 | Minimum number of unflushed pages required for a regular flush |
flushConsumeQueueThoroughInterval | 60000 ms | Maximum time without a flush before one is forced |
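The flush rules and defaults above can be summarized in a small decision sketch. The class and method names are hypothetical, the 4 KiB page size is an assumption, and the logic is a simplified reading of the rules listed in this section rather than the actual FlushConsumeQueueService code.

```java
public class FlushPolicySketch {
    // Defaults taken from the parameter table in the text.
    static final long FLUSH_INTERVAL_MS = 1000;      // flushIntervalConsumeQueue
    static final int LEAST_PAGES = 2;                // flushConsumeQueueLeastPages
    static final long THOROUGH_INTERVAL_MS = 60_000; // flushConsumeQueueThoroughInterval
    static final int PAGE_SIZE = 4096;               // assumption: 4 KiB OS page

    private long lastThoroughFlushMs;

    FlushPolicySketch(long nowMs) {
        this.lastThoroughFlushMs = nowMs;
    }

    /**
     * Minimum number of unflushed pages required in this round.
     * 0 means "flush whatever is pending", used on shutdown and when the
     * thorough interval has elapsed.
     */
    int requiredPages(long nowMs, boolean shuttingDown) {
        if (shuttingDown) {
            return 0;
        }
        if (nowMs >= lastThoroughFlushMs + THOROUGH_INTERVAL_MS) {
            lastThoroughFlushMs = nowMs;
            return 0;
        }
        return LEAST_PAGES;
    }

    /** Decides whether one queue should be flushed given its write and flush positions. */
    static boolean shouldFlush(long writtenBytes, long flushedBytes, int requiredPages) {
        if (requiredPages == 0) {
            return writtenBytes > flushedBytes; // forced: flush any pending data
        }
        long unflushedPages = writtenBytes / PAGE_SIZE - flushedBytes / PAGE_SIZE;
        return unflushedPages >= requiredPages;
    }

    public static void main(String[] args) {
        FlushPolicySketch policy = new FlushPolicySketch(0);
        int pages = policy.requiredPages(FLUSH_INTERVAL_MS, false);
        System.out.println(shouldFlush(4096, 0, pages)); // one page pending
        System.out.println(shouldFlush(8192, 0, pages)); // two pages pending
    }
}
```

A flush thread built on this sketch would call requiredPages once per round and then apply shouldFlush to every queue in turn.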
ConsumeQueue flush design rationale
Earlier, we mentioned that a consumer cannot pull a message until it has been written to the ConsumeQueue. RocketMQ lets consumers see written messages as quickly as possible by making them readable from memory as soon as they are written, and leaving persistence to disk to a separate flush thread. This kind of design is common in middleware that has to interact with disks.
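The idea of "readable before flushed" can be demonstrated with a memory-mapped file from the JDK. This is only an illustration of the general technique, assuming the queue files are memory-mapped as the MappedFileQueue field suggests; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class WriteThenFlushSketch {
    /** Writes a value through a mapping and reads it back before forcing it to disk. */
    static long writeAndReadBack(long value) {
        try {
            Path file = Files.createTempFile("cq-sketch", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel ch = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_WRITE, 0, 20);
                buf.putLong(0, value);      // the write lands in memory first
                long seen = buf.getLong(0); // a reader of the mapping sees it immediately
                buf.force();                // persisting is a separate, later step
                return seen;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeAndReadBack(1024L));
    }
}
```

The trade-off is that data written but not yet forced can be lost if the machine crashes, which is why a periodic flush with a forced upper bound is still needed.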