1. Introduction

The Broker writes messages sent by the Producer to CommitLog. Theoretically, RocketMQ needs only a CommitLog file to run properly. The extra ConsumeQueue is built to speed up consumer consumption, but what is the purpose of building an Index file?

The presence or absence of the Index file does not affect the normal operation of RocketMQ producers and consumers; its only purpose is to make message queries more efficient. If we need to query messages by key or by time range, scanning the CommitLog performs poorly. Fast queries are achieved by trading space for time: after a message is written to the CommitLog, index data for it is written to the Index file.

As with the ConsumeQueue build process, the Index is not written immediately after messages are written to the CommitLog. Instead, it is built when the asynchronous thread ReputMessageService replays the message.

2. Index design

Index files are stored at $HOME/store/index/{fileName}, where fileName is the timestamp at which the file was created. An IndexFile has a fixed length of about 400 MB and can hold 20 million index entries. Its underlying storage structure is a hash table with linked lists, which borrows from the design of HashMap.

2.1 IndexFile

Index file; the corresponding class is org.apache.rocketmq.store.index.IndexFile. An index file is composed of:

Length (bytes)    Description
40                Index header
5000000 * 4       Hash slots
Remainder         Index entries
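The three regions add up to the fixed file size. Below is a minimal sketch of that arithmetic, using the slot count and entry count quoted in this section and a 20-byte entry. The class, constant, and method names are illustrative and are not taken from RocketMQ.

```java
// Illustrative sketch only: names are hypothetical, sizes come from this section.
final class IndexFileLayoutSketch {
    static final int HEADER_SIZE = 40;           // index header, in bytes
    static final int HASH_SLOT_SIZE = 4;         // one hash slot, in bytes
    static final int INDEX_SIZE = 20;            // one index entry, in bytes
    static final int HASH_SLOT_NUM = 5_000_000;  // hash slots per file
    static final int INDEX_NUM = 20_000_000;     // index entries per file

    /** Total file size in bytes: header + hash slot table + entry area. */
    static long fileSize() {
        return HEADER_SIZE
                + (long) HASH_SLOT_NUM * HASH_SLOT_SIZE
                + (long) INDEX_NUM * INDEX_SIZE;
    }

    /** Byte offset of the hash slot at position slotPos. */
    static long slotOffset(int slotPos) {
        return HEADER_SIZE + (long) slotPos * HASH_SLOT_SIZE;
    }

    /** Byte offset of the index entry at position indexPos. */
    static long entryOffset(int indexPos) {
        return HEADER_SIZE
                + (long) HASH_SLOT_NUM * HASH_SLOT_SIZE
                + (long) indexPos * INDEX_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(fileSize()); // prints 420000040
    }
}
```

420,000,040 bytes is roughly 400 MB, which matches the size stated earlier.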

2.2 IndexHeader

Index header; the corresponding class is org.apache.rocketmq.store.index.IndexHeader. The index header contains the following information:

Length (bytes)    Description
8                 Begin timestamp of the index
8                 End timestamp of the index
8                 Begin CommitLog offset of the index
8                 End CommitLog offset of the index
4                 Number of hash slots
4                 Number of index entries
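To make the 40-byte layout concrete, here is a minimal sketch that writes and reads the six header fields with java.nio.ByteBuffer. It assumes the fields are stored back to back in the order listed above; the class, constant, and method names are hypothetical and do not reproduce IndexHeader itself.

```java
import java.nio.ByteBuffer;

// Illustrative sketch only: assumes the fields are packed in the listed order.
final class IndexHeaderSketch {
    static final int HEADER_SIZE = 40;
    static final int BEGIN_TIMESTAMP = 0;    // 8 bytes
    static final int END_TIMESTAMP = 8;      // 8 bytes
    static final int BEGIN_PHY_OFFSET = 16;  // 8 bytes
    static final int END_PHY_OFFSET = 24;    // 8 bytes
    static final int HASH_SLOT_COUNT = 32;   // 4 bytes
    static final int INDEX_COUNT = 36;       // 4 bytes

    /** Encodes the six header fields into a new 40-byte buffer. */
    static ByteBuffer write(long beginTimestamp, long endTimestamp,
                            long beginPhyOffset, long endPhyOffset,
                            int hashSlotCount, int indexCount) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE);
        buf.putLong(BEGIN_TIMESTAMP, beginTimestamp);
        buf.putLong(END_TIMESTAMP, endTimestamp);
        buf.putLong(BEGIN_PHY_OFFSET, beginPhyOffset);
        buf.putLong(END_PHY_OFFSET, endPhyOffset);
        buf.putInt(HASH_SLOT_COUNT, hashSlotCount);
        buf.putInt(INDEX_COUNT, indexCount);
        return buf;
    }

    static long endPhyOffset(ByteBuffer header) {
        return header.getLong(END_PHY_OFFSET);
    }

    static int indexCount(ByteBuffer header) {
        return header.getInt(INDEX_COUNT);
    }
}
```

Absolute put and get calls are used so that reading one field never depends on the buffer's current position.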

2.3 Index Entries

A single index entry has a fixed length of 20 bytes and is composed as follows:

Length (bytes)    Description
4                 Hash value of the key
8                 Offset of the message in the CommitLog
4                 Difference between the message store time and the file's begin timestamp (seconds)
4                 Pointer to the next index entry
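The 20-byte entry can be sketched as four absolute writes into a buffer. The example below also shows how the store time can be recovered, to second precision, from the stored difference. It is a simplified illustration with hypothetical names, not the IndexFile code.

```java
import java.nio.ByteBuffer;

// Illustrative sketch only: hypothetical helper for one 20-byte index entry.
final class IndexEntrySketch {
    static final int INDEX_SIZE = 20;

    /** Writes one entry at byte position pos. Timestamps are in milliseconds. */
    static void write(ByteBuffer buf, int pos, int keyHash, long phyOffset,
                      long storeTimestamp, long beginTimestamp, int nextIndex) {
        // Store only the difference to the file's begin timestamp, in seconds,
        // clamped so that it fits into a non-negative int.
        long timeDiff = (storeTimestamp - beginTimestamp) / 1000;
        timeDiff = Math.max(0, Math.min(timeDiff, Integer.MAX_VALUE));

        buf.putInt(pos, keyHash);              // 4 bytes: key hash
        buf.putLong(pos + 4, phyOffset);       // 8 bytes: CommitLog offset
        buf.putInt(pos + 12, (int) timeDiff);  // 4 bytes: time difference (s)
        buf.putInt(pos + 16, nextIndex);       // 4 bytes: next entry, 0 = none
    }

    /** Recovers the store time in milliseconds, truncated to whole seconds. */
    static long storeTimestamp(ByteBuffer buf, int pos, long beginTimestamp) {
        return beginTimestamp + buf.getInt(pos + 12) * 1000L;
    }
}
```

Storing a 4-byte difference instead of an 8-byte timestamp saves 4 bytes per entry, at the cost of sub-second precision.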

The default value of the pointer to the next index entry is 0. When a hash collision occurs, head insertion is used: the hash slot points to the newest index entry, and that entry's pointer links to the previous head of the chain. Why head insertion? Because RocketMQ cares most about the latest messages.
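Head insertion can be illustrated with a small in-memory model. The sketch below keeps slots and entries in plain arrays rather than in a mapped file, and reserves entry number 0 so that 0 can mean "no entry". All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative in-memory model of hash slots with head insertion.
final class HeadInsertionSketch {
    private final int[] slots;        // slot -> number of the newest entry, 0 = empty
    private final int[] keyHashes;    // per-entry key hash
    private final long[] phyOffsets;  // per-entry CommitLog offset
    private final int[] next;         // per-entry link to the older entry, 0 = end
    private int indexCount = 1;       // entry 0 is never used

    HeadInsertionSketch(int slotNum, int capacity) {
        slots = new int[slotNum];
        keyHashes = new int[capacity + 1];
        phyOffsets = new long[capacity + 1];
        next = new int[capacity + 1];
    }

    /** Adds an entry at the head of its slot's chain; returns false when full. */
    boolean put(int keyHash, long phyOffset) {
        if (indexCount >= keyHashes.length) {
            return false;
        }
        int slot = Math.floorMod(keyHash, slots.length);
        keyHashes[indexCount] = keyHash;
        phyOffsets[indexCount] = phyOffset;
        next[indexCount] = slots[slot];  // new entry links to the old head
        slots[slot] = indexCount;        // slot now points to the new entry
        indexCount++;
        return true;
    }

    /** Walks the chain from the newest entry to the oldest, filtering by hash. */
    List<Long> lookup(int keyHash) {
        List<Long> result = new ArrayList<>();
        int slot = Math.floorMod(keyHash, slots.length);
        for (int i = slots[slot]; i != 0; i = next[i]) {
            if (keyHashes[i] == keyHash) {
                result.add(phyOffsets[i]);
            }
        }
        return result;
    }
}
```

With 4 slots, hashes 1 and 5 land in the same slot. After put(1, 100), put(5, 200), put(1, 300), a lookup for hash 1 visits the newest entry first and returns the offsets 300 and then 100.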

3. Source code analysis

The author has drawn a sequence diagram of the Index build process; the process is not complicated.

1. ReputMessageService runs the message replay once every 1 ms. Starting from reputFromOffset, it reads messages from the CommitLog file, builds a DispatchRequest object for each one, and dispatches it to every CommitLogDispatcher for processing. CommitLogDispatcherBuildIndex is one of them; it is used to build the Index data.

class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
    @Override
    public void dispatch(DispatchRequest request) {
        // Is message indexing enabled?
        if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            // Build the index
            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }
}

Since the Index file is optional and is worthless if messages never need to be queried, you can choose whether to enable message indexing; the Index is built only when it is enabled.
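The fan-out to dispatchers, with the index dispatcher guarded by a switch, can be sketched as follows. This is a simplified model with hypothetical names: it records offsets in lists instead of building real ConsumeQueue or Index data.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: a replayed message is handed to every dispatcher,
// and the index dispatcher does nothing when indexing is switched off.
final class DispatchSketch {
    interface Dispatcher {
        void dispatch(long commitLogOffset);
    }

    final List<Long> consumeQueue = new ArrayList<>();  // stands in for ConsumeQueue data
    final List<Long> index = new ArrayList<>();         // stands in for Index data
    private final List<Dispatcher> dispatchers = new ArrayList<>();

    DispatchSketch(boolean messageIndexEnable) {
        dispatchers.add(offset -> consumeQueue.add(offset));
        dispatchers.add(offset -> {
            if (messageIndexEnable) {
                index.add(offset);
            }
        });
    }

    /** Replays one message by passing it to every registered dispatcher. */
    void replay(long commitLogOffset) {
        for (Dispatcher dispatcher : dispatchers) {
            dispatcher.dispatch(commitLogOffset);
        }
    }
}
```

With indexing disabled, replaying a message still feeds the consume queue but leaves the index untouched, which is why producers and consumers are unaffected.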

2. If message indexing is enabled, IndexService is called to build the index. Before index data can be written, the IndexFile to write to must be located. The method retryGetAndCreateIndexFile tries to get the latest index file; if that file is full, it automatically creates a new index file and writing continues there.

IndexFile indexFile = retryGetAndCreateIndexFile();

Next, the end CommitLog offset recorded in the IndexFile is fetched and compared with the CommitLog offset of the message. If the index already covers this message, nothing more needs to be done.
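The check can be sketched as a single comparison. The example assumes that a message is skipped only when its offset is strictly smaller than the end offset recorded in the index file; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: assumes a strict less-than comparison for skipping.
final class IndexCatchUpSketch {
    /** Returns true when the message at commitLogOffset still needs an index entry. */
    static boolean needsIndex(long commitLogOffset, long indexEndPhyOffset) {
        return commitLogOffset >= indexEndPhyOffset;
    }

    /** Filters replayed offsets down to those that still have to be indexed. */
    static List<Long> toIndex(long[] replayedOffsets, long indexEndPhyOffset) {
        List<Long> pending = new ArrayList<>();
        for (long offset : replayedOffsets) {
            if (needsIndex(offset, indexEndPhyOffset)) {
                pending.add(offset);
            }
        }
        return pending;
    }
}
```

This makes replay safe to repeat: messages that were indexed before a restart are passed over when the CommitLog is replayed again.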

3. If the CommitLog still has messages that have not been indexed, putKey is called to append the index entry. The index key has the form Topic + "#" + uniqKey:

private String buildKey(final String topic, final String key) {
    return topic + "#" + key;
}

The IndexFile is stored as a hash table with linked lists. The hash value of the key is computed first and then taken modulo the number of hash slots to get the slot position. The file offset at which the new index entry should be written is then computed from the current index count. Finally, the index data is written and the header information is updated.

/**
 * @param key            unique key of the message
 * @param phyOffset      offset of the message in the CommitLog
 * @param storeTimestamp time at which the message was stored
 * @return true if the index entry was written
 */
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        // Compute the Key hash value
        int keyHash = indexKeyHashMethod(key);
        // Compute the slot position
        int slotPos = keyHash % this.hashSlotNum;
        // Address offset of the slot in the index file
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
        FileLock fileLock = null;
        try {
            // The value of the current slot
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }

            // Difference between the message store time and the index file's begin timestamp (seconds)
            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
            timeDiff = timeDiff / 1000;

            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }

            // Index data write position: index header + hash slot + index number * index size
            int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                            + this.indexHeader.getIndexCount() * indexSize;
            /*
             * Index entry layout:
             * 1. 4 bytes: hash value of the key
             * 2. 8 bytes: CommitLog offset
             * 3. 4 bytes: store time difference (seconds)
             * 4. 4 bytes: pointer to the next index entry
             */
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            // The Next pointer points to the current slot index
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

            // Update the value of the slot, which always stores the latest index and, for MQ, always cares about the latest data
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }

            if (invalidIndex == slotValue) {
                this.indexHeader.incHashSlotCount();
            }
            // Update the index count, end offset, and end timestamp in the header
            this.indexHeader.incIndexCount();
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);

            return true;
        } catch (Exception e) {
            log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();
                } catch (IOException e) {
                    log.error("Failed to release the lock", e);
                }
            }
        }
    } else {
        log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
    }

    return false;
}
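putKey takes the slot position as keyHash % hashSlotNum, which only yields a valid position when keyHash is non-negative. The body of indexKeyHashMethod is not shown in this article, so the following is only a sketch of one way to obtain such a hash from String.hashCode(); the class and method names are hypothetical. Note that Math.abs(Integer.MIN_VALUE) is still negative, so that case needs an explicit guard.

```java
// Illustrative sketch only: one way to derive a non-negative key hash.
final class KeyHashSketch {
    /** Maps any int hash to a non-negative value. */
    static int nonNegativeHash(int hash) {
        int positive = Math.abs(hash);
        // Math.abs(Integer.MIN_VALUE) overflows and stays negative.
        return positive < 0 ? 0 : positive;
    }

    /** Non-negative hash of an index key such as "TopicTest#someKey". */
    static int keyHash(String key) {
        return nonNegativeHash(key.hashCode());
    }

    /** Slot position in the range [0, hashSlotNum). */
    static int slotPos(String key, int hashSlotNum) {
        return keyHash(key) % hashSlotNum;
    }
}
```

Only the hash is stored in the entry, not the key itself, so two different keys can share a hash value and a query has to verify the key against the message read from the CommitLog.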

When a hash collision occurs, RocketMQ uses head insertion, so the hash slot always points to the newest index entry, because RocketMQ always cares most about the latest messages.

The first 40 bytes of an IndexFile are header information, which stores the start and end times and offsets of the index. Using the header information, you can quickly determine whether the current file has messages within a specified time range.
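The time-range check amounts to an interval overlap test between the query window and the file's begin and end timestamps. A minimal sketch, with hypothetical names and with all bounds treated as inclusive (an assumption):

```java
// Illustrative sketch only: interval overlap between a query and a file's time span.
final class TimeRangeSketch {
    /**
     * Returns true when [queryBegin, queryEnd] overlaps [fileBegin, fileEnd],
     * meaning the file may contain messages in the queried range.
     */
    static boolean mayContain(long fileBegin, long fileEnd,
                              long queryBegin, long queryEnd) {
        return queryBegin <= fileEnd && queryEnd >= fileBegin;
    }
}
```

Files whose time span does not overlap the query can be skipped without reading any hash slot or index entry.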

4. Summary

The Index file is what RocketMQ uses to retrieve messages by key and by time range. Its presence or absence does not affect the normal operation of RocketMQ producers and consumers. If message retrieval is not needed, the Index function can be turned off.

Like the ConsumeQueue, the Index is built when the ReputMessageService thread replays messages. During replay, the hash value is computed from the message key, the slot position is obtained by taking it modulo the number of hash slots, and the index entry is inserted at the head of the chain.

A single index entry is 20 bytes: a 4-byte hash code, an 8-byte message offset, a 4-byte store time difference, and a 4-byte next pointer.