Overview

In the previous CommitLog article, we analyzed how CommitLog files are loaded and stored. Messages in the CommitLog are not separated by topic, so it would be very expensive for a consumer to find the next message of a given topic by scanning forward from the current CommitLog offset. ConsumeQueue is therefore designed as an index file over the CommitLog. In addition, a consumer can consume a message only after it has been written to the ConsumeQueue, because consumers read from the ConsumeQueue rather than directly from the CommitLog.

ConsumeQueue introduction

ConsumeQueue File structure

  1. File organization

The consumeQueue files are organized in three layers: topic/queueId/file. For example, test%TopicTest is the topic, 2 is the queueId, and 00000000000000000000 is the file.

  2. Subentry structure

Each consumeQueue subentry is a fixed 20 bytes, which makes it easy to calculate the consumption offset and to use the file as an index.

  3. File features

Every file holds 300,000 (30w) subentries of 20 bytes each, so a file is approximately 5.7 MB.
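
The fixed entry size is what makes the offset arithmetic cheap. The following is a minimal sketch, not RocketMQ's own code. It assumes the 20 bytes are split as an 8-byte CommitLog offset, a 4-byte size and an 8-byte tag hashcode, and that each file is named after its starting byte offset, zero-padded to 20 digits. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Locale;

final class ConsumeQueueEntrySketch {
    // Assumed layout: 8-byte CommitLog offset + 4-byte size + 8-byte tag hashcode.
    static final int ENTRY_SIZE = 20;
    // 300,000 entries per file, as described in the text.
    static final int ENTRIES_PER_FILE = 300_000;
    static final int FILE_SIZE = ENTRY_SIZE * ENTRIES_PER_FILE; // 6,000,000 bytes

    // Packs one entry into a 20-byte buffer that is ready to be read.
    static ByteBuffer encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(ENTRY_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(size);
        buf.putLong(tagsCode);
        buf.flip();
        return buf;
    }

    // Byte position of the n-th entry, counted across all files of the queue.
    static long bytePosition(long consumeOffset) {
        return consumeOffset * ENTRY_SIZE;
    }

    // Name of the file holding the n-th entry (assumed naming convention).
    static String fileNameFor(long consumeOffset) {
        long fileStart = bytePosition(consumeOffset) / FILE_SIZE * FILE_SIZE;
        return String.format(Locale.ROOT, "%020d", fileStart);
    }

    // Byte position of the n-th entry inside its own file.
    static int positionInFile(long consumeOffset) {
        return (int) (bytePosition(consumeOffset) % FILE_SIZE);
    }

    public static void main(String[] args) {
        System.out.println(fileNameFor(300_000L));
        System.out.println(positionInFile(300_001L));
        System.out.println(FILE_SIZE / (1024.0 * 1024.0) + " MiB per file");
    }
}
```

Because every entry has the same size, locating entry n needs only a multiplication and a division, with no scanning of the file.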

ConsumeQueue class abstractions

The ConsumeQueue class can be understood as an abstraction of the storage structure topic/queueId/file. There are multiple ConsumeQueue instances, and they are held in DefaultMessageStore.

public class ConsumeQueue {
    // ...
    // Each queue directory holds many files, so they are managed as a queue of mapped files
    private final MappedFileQueue mappedFileQueue;
    private final String topic;
    private final int queueId;
    // ...
}
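
To illustrate how one store can hold many queues addressed by topic and queueId, here is a minimal sketch using a nested concurrent map. It is an assumption made for illustration, not a copy of DefaultMessageStore. QueueHandle and findOrCreate are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ConsumeQueueTableSketch {
    // Hypothetical stand-in for a ConsumeQueue instance.
    static final class QueueHandle {
        final String topic;
        final int queueId;

        QueueHandle(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    // topic -> (queueId -> queue), mirroring the topic/queueId directory layers.
    private final ConcurrentMap<String, ConcurrentMap<Integer, QueueHandle>> table =
            new ConcurrentHashMap<>();

    // Returns the existing queue for (topic, queueId) or creates it on first use.
    QueueHandle findOrCreate(String topic, int queueId) {
        return table
                .computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
                .computeIfAbsent(queueId, id -> new QueueHandle(topic, id));
    }
}
```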

Message forwarding thread – ReputMessageService

As mentioned in the overview, a consumer can consume a CommitLog message only after it has been forwarded to the ConsumeQueue. RocketMQ hands this job to ReputMessageService. Here is how ReputMessageService works.

This thread executes the following logic every 1 ms. See ReputMessageService#run().

  1. The ReputMessageService thread records the current forwarding offset, reputFromOffset. As long as reputFromOffset is less than the CommitLog's in-memory write offset (the CommitLog data may not have been flushed to disk yet), forwarding continues.
  2. Starting from reputFromOffset, it looks up the corresponding messages in the CommitLog and forwards them, in order, to the ConsumeQueue and the IndexFile.
  3. The ConsumeQueue calls its write() method to write the entry (not yet flushed to disk) so that consumers can pull it.
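
The steps above can be sketched as a small in-memory simulation. This is only an illustration under stated assumptions: the CommitLog is modelled as a map from offset to message, a ConsumeQueue as a list of {offset, size, tagsCode} triples, and forwarding to the IndexFile is left out. All names except reputFromOffset are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ReputSketch {
    // Hypothetical stand-in for one message stored in the CommitLog.
    static final class StoredMessage {
        final String topic;
        final int queueId;
        final int size;
        final long tagsCode;

        StoredMessage(String topic, int queueId, int size, long tagsCode) {
            this.topic = topic;
            this.queueId = queueId;
            this.size = size;
            this.tagsCode = tagsCode;
        }
    }

    // Simulated CommitLog: physical offset -> message.
    private final Map<Long, StoredMessage> commitLog = new HashMap<>();
    long commitLogMaxOffset = 0; // in-memory write offset
    long reputFromOffset = 0;    // next offset to forward
    // "topic/queueId" -> entries of {commitLogOffset, size, tagsCode}.
    final Map<String, List<long[]>> consumeQueues = new HashMap<>();

    // Appends a message to the simulated CommitLog and returns its offset.
    long append(String topic, int queueId, int size, long tagsCode) {
        long offset = commitLogMaxOffset;
        commitLog.put(offset, new StoredMessage(topic, queueId, size, tagsCode));
        commitLogMaxOffset += size;
        return offset;
    }

    // One pass of the forwarding loop; returns how many messages were forwarded.
    int doReput() {
        int forwarded = 0;
        while (reputFromOffset < commitLogMaxOffset) {
            StoredMessage msg = commitLog.get(reputFromOffset);
            String key = msg.topic + "/" + msg.queueId;
            consumeQueues.computeIfAbsent(key, k -> new ArrayList<>())
                    .add(new long[] {reputFromOffset, msg.size, msg.tagsCode});
            reputFromOffset += msg.size; // advance past the forwarded message
            forwarded++;
        }
        return forwarded;
    }

    public static void main(String[] args) {
        ReputSketch store = new ReputSketch();
        store.append("TopicTest", 2, 100, 1L);
        store.append("TopicTest", 2, 50, 1L);
        System.out.println("forwarded: " + store.doReput());
        System.out.println("reputFromOffset: " + store.reputFromOffset);
    }
}
```

In the real broker a pass like this would be repeated by the service thread roughly every millisecond. Here doReput() is simply called by hand.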

When ReputMessageService is initialized, its reputFromOffset is set to the maximum CommitLog offset recorded across all ConsumeQueues. Recall the structure of a ConsumeQueue subentry: CommitLog offset, size, tag hashcode. For the last entry of a queue, the candidate value is therefore CommitLog offset + size.

See DefaultMessageStore#start().
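
A minimal sketch of that initialization, assuming each queue is available as an ordered list of entries. The Entry class and the method name are hypothetical, and the real start-up code may apply further checks.

```java
import java.util.List;

final class ReputStartOffsetSketch {
    // Hypothetical view of one ConsumeQueue subentry (tag hashcode omitted).
    static final class Entry {
        final long commitLogOffset;
        final int size;

        Entry(long commitLogOffset, int size) {
            this.commitLogOffset = commitLogOffset;
            this.size = size;
        }
    }

    // Takes the last entry of every queue and returns the largest
    // "commitLogOffset + size", i.e. the first byte not yet forwarded.
    static long initialReputFromOffset(List<List<Entry>> queues) {
        long max = 0;
        for (List<Entry> queue : queues) {
            if (queue.isEmpty()) {
                continue;
            }
            Entry last = queue.get(queue.size() - 1);
            max = Math.max(max, last.commitLogOffset + last.size);
        }
        return max;
    }
}
```

For example, if one queue ends with an entry at offset 150 of size 30 and another ends at offset 100 of size 50, forwarding resumes at 180.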

Flush thread – FlushConsumeQueueService

ConsumeQueue calls write() to write messages, but write() does not flush them to disk. RocketMQ therefore provides the FlushConsumeQueueService thread to handle flushing of the ConsumeQueue.

FlushConsumeQueueService flushes to disk every second.

FlushConsumeQueueService flushes as follows:

  1. When the new content is less than 2 pages, the disk is not flushed.
  2. When the JVM exits, a flush is forced.
  3. When no flush has been performed for 60s, a flush is forced.
  4. All queues are traversed and flushed in turn.
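
The rules above can be sketched as a small decision helper. This is an illustration only: it assumes a 4 KiB page size, and the class, field and method names are hypothetical rather than taken from FlushConsumeQueueService.

```java
final class FlushPolicySketch {
    static final int OS_PAGE_SIZE = 4096; // assumed page size (4 KiB)

    private final int leastPages;          // e.g. 2
    private final long thoroughIntervalMs; // e.g. 60000
    private long lastThoroughFlushMs;

    FlushPolicySketch(int leastPages, long thoroughIntervalMs, long nowMs) {
        this.leastPages = leastPages;
        this.thoroughIntervalMs = thoroughIntervalMs;
        this.lastThoroughFlushMs = nowMs;
    }

    // Returns the page threshold for this round; 0 means "force a flush".
    int requiredPages(long nowMs, boolean shuttingDown) {
        if (shuttingDown) {
            return 0; // rule 2: force a flush on exit
        }
        if (nowMs >= lastThoroughFlushMs + thoroughIntervalMs) {
            lastThoroughFlushMs = nowMs;
            return 0; // rule 3: no forced flush for too long
        }
        return leastPages; // rule 1: wait until enough pages are dirty
    }

    // Decides whether one file should be flushed under the given threshold.
    static boolean shouldFlush(long writePos, long flushedPos, int requiredPages) {
        if (requiredPages <= 0) {
            return writePos > flushedPos; // forced: flush any unflushed bytes
        }
        long dirtyPages = writePos / OS_PAGE_SIZE - flushedPos / OS_PAGE_SIZE;
        return dirtyPages >= requiredPages;
    }
}
```

A flush round would call requiredPages(...) once and then apply shouldFlush(...) to each queue in turn, which corresponds to rule 4.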

From the perspective of the flush process, the ConsumeQueue flush thread is very similar to the CommitLog flush thread. The difference lies in the parameter configuration.

FlushConsumeQueueService#run()

The disk flushing parameters are configurable:

Parameter                            Default value   Role
flushIntervalConsumeQueue            1000 ms         Interval between flush attempts
flushConsumeQueueLeastPages          2               Minimum number of new pages required before a flush is performed
flushConsumeQueueThoroughInterval    60000 ms        If no flush has happened within this interval, a flush is forced

ConsumeQueue flush design considerations

Earlier, we mentioned that a consumer cannot pull a message until it has been written to the ConsumeQueue. RocketMQ lets consumers see written messages as quickly as possible by writing them to memory first rather than waiting for them to be persisted to disk. This kind of design is common in middleware that needs to interact with disks.
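
The idea of "write to memory first, persist later" can be illustrated with a memory-mapped file from the JDK. This is a minimal sketch, not RocketMQ code: the temporary file and the method name are hypothetical. It only shows that a value written to the mapping is readable before force() is called.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class WriteThenFlushSketch {
    // Writes a value into a mapped file, reads it back before flushing,
    // then flushes. Returns the value that was visible before the flush.
    static long writeReadThenFlush(long value) {
        try {
            Path file = Files.createTempFile("cq-sketch", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(
                    file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                MappedByteBuffer buf = channel.map(FileChannel.MapMode.READ_WRITE, 0, 20);
                buf.putLong(0, value);      // write: lands in memory, not yet forced to disk
                long seen = buf.getLong(0); // a reader of the mapping sees it immediately
                buf.force();                // persistence step, which a flush thread can do later
                return seen;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The trade-off is that data written but not yet forced can be lost if the machine crashes, which is why the flush interval and page thresholds are tunable.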