1. Introduction

Theoretically, RocketMQ could run with CommitLog files alone, so why does it also maintain ConsumeQueue files?

ConsumeQueue is a consumption queue introduced to speed up consumption. RocketMQ uses a topic-based subscription model, and consumers usually care only about the messages of the topics they subscribe to. If every consumption had to retrieve data by scanning the CommitLog files, performance would be poor. With ConsumeQueue, consumers can quickly locate the messages to consume by their offsets in the CommitLog file.

As mentioned in the previous article, the Broker writes messages sent by clients to CommitLog files for persistent storage. That process never touches the ConsumeQueue files, so how are the ConsumeQueue files built?

2. ReputMessageService

ReputMessageService is, if I may call it that, a “message replay service”. When the Broker starts, it starts a thread that executes the doReput() method roughly every millisecond.

Its purpose is to “replay” the messages written to the CommitLog files. It has a property called reputFromOffset, which records the replay offset and is initialized when MessageStore starts.

It works by reading the messages to be replayed from the CommitLog, starting at reputFromOffset, building DispatchRequest objects from them, and then handing each DispatchRequest to every CommitLogDispatcher for processing.

MessageStore maintains a list of CommitLogDispatcher objects, and there are currently only three of them:

  1. CommitLogDispatcherBuildConsumeQueue: builds the ConsumeQueue index.
  2. CommitLogDispatcherBuildIndex: builds the Index file.
  3. CommitLogDispatcherCalcBitMap: builds a Bloom filter to speed up SQL92 filtering.
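The fan-out from one DispatchRequest to every registered dispatcher can be sketched as a simple handler chain. This is a simplified illustration, not RocketMQ's code: SimpleDispatchRequest, SimpleDispatcher, and DispatcherChain are hypothetical names, and the request carries only a few of the real fields.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for DispatchRequest: only the fields this sketch needs.
final class SimpleDispatchRequest {
    final String topic;
    final int queueId;
    final long commitLogOffset;
    final int msgSize;

    SimpleDispatchRequest(String topic, int queueId, long commitLogOffset, int msgSize) {
        this.topic = topic;
        this.queueId = queueId;
        this.commitLogOffset = commitLogOffset;
        this.msgSize = msgSize;
    }
}

// Plays the role of CommitLogDispatcher: one callback per replayed message.
interface SimpleDispatcher {
    void dispatch(SimpleDispatchRequest request);
}

final class DispatcherChain {
    private final List<SimpleDispatcher> dispatchers = new ArrayList<>();

    void register(SimpleDispatcher dispatcher) {
        dispatchers.add(dispatcher);
    }

    // Hands the same request to every registered dispatcher, in registration order.
    void doDispatch(SimpleDispatchRequest request) {
        for (SimpleDispatcher dispatcher : dispatchers) {
            dispatcher.dispatch(request);
        }
    }
}
```

In this sketch the replay loop only knows about the list, so supporting another kind of index means registering one more dispatcher rather than changing the loop.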

This article focuses on CommitLogDispatcherBuildConsumeQueue and looks at how RocketMQ builds the ConsumeQueue.

3. Source code analysis

The author drew a sequence diagram of the ConsumeQueue construction process. As the diagram shows, the process is not complicated.

1. The doReput() method runs roughly once every millisecond. Its body is a for loop that keeps replaying messages as long as reputFromOffset has not reached the maximum offset of the CommitLog:

// Unreplayed data remains as long as the replay offset is behind the CommitLog's max offset
private boolean isCommitLogAvailable() {
    return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}
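The replay loop can be approximated with an in-memory buffer. The sketch assumes a hypothetical ReplaySketch class and a simplified log in which each record starts with its 4-byte total size; the real doReput() also deals with file boundaries, failed reads, and the actual message format, none of which is modelled here.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified replay loop over an in-memory "CommitLog".
final class ReplaySketch {
    private final ByteBuffer commitLog;  // stand-in for the CommitLog; position() == bytes written
    private long reputFromOffset;        // replay progress, like ReputMessageService's field
    final List<long[]> dispatched = new ArrayList<>(); // {offset, size} of each replayed record

    ReplaySketch(ByteBuffer commitLog, long reputFromOffset) {
        this.commitLog = commitLog;
        this.reputFromOffset = reputFromOffset;
    }

    // Appends a dummy record whose first 4 bytes hold its total size (assumed >= 4).
    static void append(ByteBuffer log, int totalSize) {
        log.putInt(totalSize);
        log.position(log.position() + totalSize - 4); // leave the rest of the record zeroed
    }

    // Same idea as isCommitLogAvailable(): unreplayed data remains.
    private boolean isCommitLogAvailable() {
        return reputFromOffset < commitLog.position();
    }

    // Replays every record between reputFromOffset and the current end of the log.
    void doReput() {
        while (isCommitLogAvailable()) {
            int size = commitLog.getInt((int) reputFromOffset); // absolute read, position unchanged
            dispatched.add(new long[] {reputFromOffset, size}); // "dispatch" the record
            reputFromOffset += size;                            // move on to the next record
        }
    }

    long getReputFromOffset() {
        return reputFromOffset;
    }
}
```

Calling doReput() again after more records are appended picks up exactly where the previous call stopped, which is why a periodic 1 ms trigger is enough to keep up.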

doReput() first fetches a ByteBuffer from the CommitLog starting at reputFromOffset. The buffer contains the message data to be replayed.

public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
    // Size of a single CommitLog file
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    // Find the file that contains the replay offset; each file is named after its start offset
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
    if (mappedFile != null) {
        // Position of the offset inside that file
        int pos = (int) (offset % mappedFileSize);
        // Derive a ByteBuffer from the MappedFile's MappedByteBuffer:
        // it shares the same block of memory but has its own pointers
        SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
        return result;
    }
    return null;
}
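The offset arithmetic in getData() can be checked with a small worked example. CommitLogLocator is a hypothetical helper, not a RocketMQ class; it assumes a 1 GB CommitLog file size and 20-digit zero-padded file names, which to my knowledge match RocketMQ's defaults, and it skips the search over existing files that findMappedFileByOffset performs.

```java
// Hypothetical helper: maps a CommitLog offset to its file and the position inside that file.
final class CommitLogLocator {
    private final long mappedFileSize; // size of one CommitLog file, e.g. 1 GB

    CommitLogLocator(long mappedFileSize) {
        this.mappedFileSize = mappedFileSize;
    }

    // Start offset of the file holding `offset`; files are named after their start offset.
    long fileStartOffset(long offset) {
        return offset - (offset % mappedFileSize);
    }

    // File name as a 20-digit, zero-padded start offset.
    String fileName(long offset) {
        return String.format("%020d", fileStartOffset(offset));
    }

    // Same computation as `offset % mappedFileSize` in getData().
    int positionInFile(long offset) {
        return (int) (offset % mappedFileSize);
    }
}
```

With a 1 GB file size, offset 1073741900 falls in the second file, 00000000001073741824, at position 76.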

The SelectMappedBufferResult class attributes are as follows:

// Start offset
private final long startOffset;
// The buffer
private final ByteBuffer byteBuffer;
// The length
private int size;
// The associated MappedFile object
private MappedFile mappedFile;
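The “shared memory, separate pointer” behaviour mentioned in getData() comes from java.nio buffer slicing. The helper below is a hypothetical sketch that builds a view of the written bytes from pos onward, which is roughly what a SelectMappedBufferResult's byteBuffer looks like; it is not MappedFile.selectMappedBuffer itself.

```java
import java.nio.ByteBuffer;

final class SliceDemo {
    // Returns a view of `mapped` covering [pos, wrotePosition).
    // The view shares memory with `mapped` but has its own position and limit.
    static ByteBuffer selectFrom(ByteBuffer mapped, int pos, int wrotePosition) {
        ByteBuffer view = mapped.slice();   // independent pointers over the same content
        view.position(pos);
        ByteBuffer result = view.slice();   // index 0 of result corresponds to `pos`
        result.limit(wrotePosition - pos);  // expose only bytes that have been written
        return result;
    }
}
```

Reading from the returned view never moves the original buffer's position, so several readers can work on the same mapped file at once.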

2. With the SelectMappedBufferResult, the message data can be read. Message replay does not need the message body, so the body is not read; only the relevant properties are read and used to build a DispatchRequest object. The following properties are read:

// Topic to which the message belongs
private final String topic;
// ID of the queue to which the message belongs
private final int queueId;
// Message offset in the CommitLog file
private final long commitLogOffset;
// Message size
private int msgSize;
// Hash code of the message Tag
private final long tagsCode;
// Message store time
private final long storeTimestamp;
// Logical offset in the consume queue
private final long consumeQueueOffset;
// Message keys
private final String keys;
// Whether the message was parsed successfully
private final boolean success;
// Message unique key
private final String uniqKey;
// Message system flag
private final int sysFlag;
// Transaction message offset
private final long preparedTransactionOffset;
// Message properties
private final Map<String, String> propertiesMap;
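Skipping the body while still reading the fields that follow it only requires moving the buffer position. The record layout below is entirely hypothetical and much shorter than RocketMQ's real CommitLog message format; HeaderParser and its encode helper exist only for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// HYPOTHETICAL layout (not RocketMQ's real format):
// totalSize(4) | queueId(4) | queueOffset(8) | bodyLength(4) | body | topicLength(1) | topic
final class HeaderParser {
    static final class Header {
        final int totalSize;
        final int queueId;
        final long queueOffset;
        final String topic;

        Header(int totalSize, int queueId, long queueOffset, String topic) {
            this.totalSize = totalSize;
            this.queueId = queueId;
            this.queueOffset = queueOffset;
            this.topic = topic;
        }
    }

    // Reads the header fields and jumps over the body without copying it.
    static Header parse(ByteBuffer buf) {
        int totalSize = buf.getInt();
        int queueId = buf.getInt();
        long queueOffset = buf.getLong();
        int bodyLength = buf.getInt();
        buf.position(buf.position() + bodyLength); // skip the body: replay never needs it
        int topicLength = buf.get() & 0xFF;
        byte[] topicBytes = new byte[topicLength];
        buf.get(topicBytes);
        return new Header(totalSize, queueId, queueOffset, new String(topicBytes, StandardCharsets.UTF_8));
    }

    // Builds one record in the hypothetical layout, ready for reading.
    static ByteBuffer encode(int queueId, long queueOffset, byte[] body, String topic) {
        byte[] t = topic.getBytes(StandardCharsets.UTF_8);
        int totalSize = 4 + 4 + 8 + 4 + body.length + 1 + t.length;
        ByteBuffer buf = ByteBuffer.allocate(totalSize);
        buf.putInt(totalSize).putInt(queueId).putLong(queueOffset).putInt(body.length)
           .put(body).put((byte) t.length).put(t);
        buf.flip();
        return buf;
    }
}
```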

3. With a DispatchRequest object, the next step is to call doDispatch() to dispatch the request. This triggers CommitLogDispatcherBuildConsumeQueue, which hands the request over to DefaultMessageStore:

DefaultMessageStore.this.putMessagePositionInfo(request);

4. MessageStore locates the ConsumeQueue file by the message's Topic and QueueID and appends the index entry to that file.

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    // Locate ConsumeQueue based on Topic and QueueID
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    // Append index to file
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}
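The lookup by Topic and QueueID can be pictured as a two-level map from topic to queue ID to queue. QueueStub and QueueTable are hypothetical stand-ins, and the assumption that a queue is created lazily on its first lookup should be checked against the RocketMQ version in use.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a ConsumeQueue: it only remembers which topic/queue it serves.
final class QueueStub {
    final String topic;
    final int queueId;

    QueueStub(String topic, int queueId) {
        this.topic = topic;
        this.queueId = queueId;
    }
}

final class QueueTable {
    // topic -> (queueId -> queue)
    private final ConcurrentMap<String, ConcurrentMap<Integer, QueueStub>> table = new ConcurrentHashMap<>();

    // Returns the existing queue, or atomically creates it on first use.
    QueueStub findConsumeQueue(String topic, int queueId) {
        ConcurrentMap<Integer, QueueStub> byQueueId =
            table.computeIfAbsent(topic, t -> new ConcurrentHashMap<>());
        return byQueueId.computeIfAbsent(queueId, id -> new QueueStub(topic, id));
    }
}
```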

Before writing the index, ConsumeQueue checks that the store's running flags allow writing to the ConsumeQueue:

boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();

Next, it fills a reusable 20-byte ByteBuffer with the message's offset, size, and tagsCode.

// Each index entry is 20 bytes long; byteBufferIndex is reused for every entry
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
// Index entry layout: offset (8 bytes) + size (4 bytes) + tagsCode (8 bytes)
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
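The 20-byte layout can be verified with a round trip through java.nio.ByteBuffer. CqEntry is a hypothetical helper; encode() repeats the flip/limit/put sequence from the snippet above so that the reuse of a single buffer can be observed.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for one ConsumeQueue index entry.
final class CqEntry {
    static final int CQ_STORE_UNIT_SIZE = 20; // 8 (offset) + 4 (size) + 8 (tagsCode)

    final long offset;
    final int size;
    final long tagsCode;

    CqEntry(long offset, int size, long tagsCode) {
        this.offset = offset;
        this.size = size;
        this.tagsCode = tagsCode;
    }

    // Writes one entry into a reusable 20-byte buffer and returns its backing array.
    // The array is overwritten by the next call, so consume it before reusing the buffer.
    static byte[] encode(ByteBuffer reusable, long offset, int size, long tagsCode) {
        reusable.flip();                       // position back to 0
        reusable.limit(CQ_STORE_UNIT_SIZE);    // allow exactly one entry
        reusable.putLong(offset);
        reusable.putInt(size);
        reusable.putLong(tagsCode);
        return reusable.array();
    }

    // Reads the three fields back in the same order they were written.
    static CqEntry decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return new CqEntry(buf.getLong(), buf.getInt(), buf.getLong());
    }
}
```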

The write position is the consume-queue offset multiplied by the length of a single index entry. Because entries are written sequentially, the latest ConsumeQueue file is used; if it is full, a new file is created and writing continues there.

final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
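Because every entry has a fixed size, finding where an entry lives is plain arithmetic. The sketch assumes a ConsumeQueue file holds 300,000 entries (6,000,000 bytes), which I believe is RocketMQ's default; CqLocator is a hypothetical helper.

```java
// Hypothetical helper: where does the entry with logical offset cqOffset live?
final class CqLocator {
    static final int CQ_STORE_UNIT_SIZE = 20;
    static final long ENTRIES_PER_FILE = 300_000L;                          // assumed default
    static final long FILE_SIZE = ENTRIES_PER_FILE * CQ_STORE_UNIT_SIZE;    // 6,000,000 bytes

    // Byte offset of the entry across all ConsumeQueue files.
    static long expectLogicOffset(long cqOffset) {
        return cqOffset * CQ_STORE_UNIT_SIZE;
    }

    // Start offset (and therefore file name) of the file holding the entry.
    static long fileStartOffset(long cqOffset) {
        long e = expectLogicOffset(cqOffset);
        return e - e % FILE_SIZE;
    }

    // Byte position of the entry inside that file.
    static int positionInFile(long cqOffset) {
        return (int) (expectLogicOffset(cqOffset) % FILE_SIZE);
    }
}
```

For example, logical offset 300,001 maps to expectLogicOffset 6,000,020, which is position 20 in the second file (start offset 6,000,000).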

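Before writing, ConsumeQueue checks that the expected logical offset equals the current logical offset; normally they are equal. If the expected offset is smaller, the entry has already been built and the write is skipped. Any other mismatch means the build may be out of order, and a [BUG] warning is logged.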

if (cqOffset != 0) {
    // Current logical offset: write pointer position in the file + start offset of the file (its name)
    long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

    // expectLogicOffset and currentLogicOffset should normally be equal
    if (expectLogicOffset < currentLogicOffset) {
        // The entry already exists: it is being built again, so skip it
        log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                 expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
        return true;
    }
    if (expectLogicOffset != currentLogicOffset) {
        // A gap between entries: the queue order may be wrong
        LOG_ERROR.warn("[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
            expectLogicOffset,
            currentLogicOffset,
            this.topic,
            this.queueId,
            expectLogicOffset - currentLogicOffset);
    }
}
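The three outcomes of the check can be isolated in a small pure function, which makes the branches easy to test. OffsetCheck and its Result enum are hypothetical names; the branches follow the snippet above, including the fact that the [BUG] branch only logs and then falls through to the write.

```java
// Hypothetical pure-function version of the offset check before a ConsumeQueue write.
final class OffsetCheck {
    enum Result { WRITE, SKIP_DUPLICATE, ORDER_WARNING }

    static Result check(long cqOffset, long unitSize, long fileFromOffset, long wrotePosition) {
        if (cqOffset == 0) {
            return Result.WRITE; // the very first entry is not checked
        }
        long expectLogicOffset = cqOffset * unitSize;
        long currentLogicOffset = wrotePosition + fileFromOffset;
        if (expectLogicOffset < currentLogicOffset) {
            return Result.SKIP_DUPLICATE; // already built: the caller returns without writing
        }
        if (expectLogicOffset != currentLogicOffset) {
            return Result.ORDER_WARNING;  // gap: logged as a possible bug, then written anyway
        }
        return Result.WRITE;
    }
}
```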

Once the check passes, the write proceeds: maxPhysicOffset, the maximum CommitLog offset recorded by this ConsumeQueue, is updated, and the 20-byte index entry is written to the file.

// Update the maximum CommitLog offset recorded by this ConsumeQueue
this.maxPhysicOffset = offset + size;
// Write the 20-byte index entry to the file
return mappedFile.appendMessage(this.byteBufferIndex.array());
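Putting the write step together, a hypothetical in-memory queue can append entries and track maxPhysicOffset. The sketch uses a single fixed-size ByteBuffer in place of a MappedFile and simply reports failure when it is full, whereas the real code moves on to a new file; it also uses clear() instead of the flip()/limit() pair, which has the same effect on a 20-byte buffer.

```java
import java.nio.ByteBuffer;

// Hypothetical in-memory ConsumeQueue "file" that appends 20-byte entries.
final class InMemoryConsumeQueue {
    static final int UNIT = 20;
    private final ByteBuffer file;                        // stands in for one MappedFile
    private final ByteBuffer entry = ByteBuffer.allocate(UNIT);
    private long maxPhysicOffset = -1;

    InMemoryConsumeQueue(int capacityEntries) {
        this.file = ByteBuffer.allocate(capacityEntries * UNIT);
    }

    boolean append(long offset, int size, long tagsCode) {
        if (file.remaining() < UNIT) {
            return false; // "file" is full; the real code would continue in a new file
        }
        entry.clear();    // position 0, limit 20
        entry.putLong(offset).putInt(size).putLong(tagsCode);
        maxPhysicOffset = offset + size; // end of the last indexed message in the CommitLog
        file.put(entry.array());
        return true;
    }

    long getMaxPhysicOffset() {
        return maxPhysicOffset;
    }

    int entryCount() {
        return file.position() / UNIT;
    }
}
```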

At this point, the index synchronization of messages from CommitLog to the ConsumeQueue file is complete.

ConsumeQueue index entry structure:

Length (bytes)   Description
8                Offset of the message in the CommitLog file
4                Length of the message
8                Hash code of the message Tag, used to filter messages by Tag

4. Summary

ConsumeQueue is an index file RocketMQ uses to speed up consumption. It is a logical consume queue that stores only message indexes, not the messages themselves. Each index entry is 20 bytes long and records the message's offset in the CommitLog file, its length, and the hash of its Tag. When consuming, consumers can quickly filter messages by Tag hash, locate them by offset, and read a complete message using its length.
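The consumer-side benefit can be sketched as a scan over index entries followed by a targeted read of the CommitLog. ConsumerReadSketch is hypothetical and works on plain in-memory buffers. It compares tagsCode for exact equality; because hash codes can collide, a match only marks a candidate whose real Tag still has to be checked.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical consumer-side read path over in-memory ConsumeQueue and CommitLog buffers.
final class ConsumerReadSketch {
    static final int UNIT = 20;

    // Scans entries [fromIndex, toIndex) and returns {offset, size} for those whose tagsCode matches.
    // Only the 20-byte entries are touched; the CommitLog is not read here.
    static List<long[]> select(ByteBuffer consumeQueue, long fromIndex, long toIndex, long wantedTagsCode) {
        List<long[]> hits = new ArrayList<>();
        for (long i = fromIndex; i < toIndex; i++) {
            int pos = (int) (i * UNIT);
            long offset = consumeQueue.getLong(pos);
            int size = consumeQueue.getInt(pos + 8);
            long tagsCode = consumeQueue.getLong(pos + 12);
            if (tagsCode == wantedTagsCode) {
                hits.add(new long[] {offset, size});
            }
        }
        return hits;
    }

    // Reads exactly `size` bytes at `offset` without disturbing the shared buffer's position.
    static byte[] readMessage(ByteBuffer commitLog, long offset, int size) {
        byte[] msg = new byte[size];
        ByteBuffer view = commitLog.duplicate();
        view.position((int) offset);
        view.get(msg);
        return msg;
    }
}
```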

ConsumeQueue is not written immediately after the Broker writes a message to the CommitLog. Instead, the asynchronous thread ReputMessageService replays the messages, and during replay CommitLogDispatcherBuildConsumeQueue builds the ConsumeQueue file. Because the build runs roughly every millisecond, it is close to real time, so consumption delay caused by it is generally not a concern.