The following source code is based on RocketMQ 4.7.0.

1 Message storage diagram

Message storage is the most complex and most important part of RocketMQ. This section covers RocketMQ's overall message storage architecture, page cache and mmap memory mapping, and RocketMQ's two disk-flushing modes (synchronous and asynchronous).

  • Overall message store architecture
  • Page caching and memory mapping
  • Flushing messages to disk

2 Overall architecture of message storage

There are three main files related to the message store:

  • CommitLog

    CommitLog stores the message bodies and metadata written by the Producer; message content is variable-length. Each file is 1 GB by default. A file's name is 20 digits long: the file's starting offset, left-padded with zeros. For example, 00000000000000000000 is the first file; its start offset is 0 and its size is 1 GB = 1073741824 bytes. When the first file is full, the second file is named 00000000001073741824 and starts at offset 1073741824, and so on. Messages are written sequentially to the current log file, and when that file is full, writing moves on to the next file.

  • ConsumeQueue

    The consume queue exists mainly to improve consumption performance. Because RocketMQ uses a topic-based subscription model, messages are consumed per topic, and scanning the CommitLog to find the messages of a given topic would be inefficient. Instead, a Consumer looks up the messages to consume through the ConsumeQueue. A ConsumeQueue acts as an index of consumable messages: each entry stores the message's starting physical offset in the CommitLog, the message size, and the hash code of the message Tag. ConsumeQueue files can therefore be regarded as a per-topic index over the CommitLog. The consumequeue folder uses a three-level topic/queue/file structure, and the storage path is $HOME/store/consumequeue/{topic}/{queueId}/{fileName}. ConsumeQueue files also use a fixed-length design: each entry is 20 bytes, made up of an 8-byte CommitLog physical offset, a 4-byte message length, and an 8-byte tag hash code. A single file holds 300,000 entries, and each entry can be accessed randomly like an array element. Each ConsumeQueue file is therefore about 5.72 MiB (300,000 × 20 = 6,000,000 bytes).

  • IndexFile

    IndexFile provides a way to query messages by key or by time range. Each IndexFile is named after the timestamp at which it was created. A single IndexFile is about 400 MB and can hold 20 million index entries. Internally, the index file is implemented as a hash index.
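To make the CommitLog naming rule concrete, here is a small sketch. `CommitLogFileLocator` is a hypothetical helper written for this article, not a RocketMQ class, and it assumes the default 1 GB file size. It only shows how a global physical offset maps to a file name and to a position inside that file.

```java
// Hypothetical helper (not a RocketMQ API): maps a global CommitLog physical
// offset to the file containing it, assuming the default 1 GB file size.
final class CommitLogFileLocator {
    static final long FILE_SIZE = 1024L * 1024 * 1024; // 1073741824 bytes

    // A file is named after its start offset, left-padded with zeros to 20 digits.
    static String fileNameFor(long physicalOffset) {
        long fileStart = physicalOffset - physicalOffset % FILE_SIZE;
        return String.format("%020d", fileStart);
    }

    // Byte position of the offset inside its own file.
    static long positionInFile(long physicalOffset) {
        return physicalOffset % FILE_SIZE;
    }

    public static void main(String[] args) {
        long offset = 1073741824L + 100; // 100 bytes into the second file
        System.out.println(fileNameFor(offset) + " @ " + positionInFile(offset));
        // prints 00000000001073741824 @ 100
    }
}
```

Because every file has the same size, locating a message never requires scanning: one subtraction gives the file and one modulo gives the position within it.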

As the overall message storage architecture diagram above shows, RocketMQ uses a hybrid storage structure: all queues on a single Broker instance share one log file, the CommitLog, which holds the message entities of every topic. Within this structure, data and index are kept separate: the Producer side writes to the data part (the CommitLog), while the Consumer side reads through the index part (ConsumeQueue and IndexFile). The Producer sends messages to the Broker, which persists them to the CommitLog either synchronously or asynchronously. As long as a message has been flushed to the CommitLog file, it is not lost, so consumers are guaranteed a chance to consume it. If a pull request finds no message, the consumer can simply wait for the next pull. The Broker also supports long polling: when a pull request finds no message, the Broker can hold the request for up to 30 seconds and return as soon as new messages arrive. Behind the scenes, a Broker-side background service thread, ReputMessageService, continuously dispatches new CommitLog entries and asynchronously builds the ConsumeQueue and IndexFile data. The following subsections look at how each of these three files is produced in the code.
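The hybrid layout can be illustrated with a toy in-memory model. Everything here (`ToyStore`, its records, list-based storage) is hypothetical and greatly simplified, and it does not reproduce ReputMessageService. It only shows the same division of labor: producers append to one shared log regardless of topic, and a separate dispatch pass builds per-queue index entries afterwards. It uses records, so it assumes Java 16 or later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the hybrid storage layout (hypothetical, not RocketMQ code).
final class ToyStore {
    // A message in the shared log.
    record Stored(long physicalOffset, int size, String topic, int queueId, String tags) {}
    // A consume-queue entry: location in the shared log plus the tag hash code.
    record CqEntry(long physicalOffset, int size, long tagsCode) {}

    private final List<Stored> commitLog = new ArrayList<>();
    private final Map<String, List<CqEntry>> consumeQueues = new HashMap<>();
    private long nextPhysicalOffset = 0; // next write position in the shared log
    private int dispatched = 0;          // number of log records already indexed

    // Producer side: every topic and queue appends to the same log, in order.
    long append(String topic, int queueId, String tags, int size) {
        long offset = nextPhysicalOffset;
        commitLog.add(new Stored(offset, size, topic, queueId, tags));
        nextPhysicalOffset += size;
        return offset;
    }

    // Background side: index every record that has not been dispatched yet.
    void dispatch() {
        while (dispatched < commitLog.size()) {
            Stored s = commitLog.get(dispatched++);
            consumeQueues.computeIfAbsent(s.topic() + "-" + s.queueId(), k -> new ArrayList<>())
                .add(new CqEntry(s.physicalOffset(), s.size(), s.tags().hashCode()));
        }
    }

    // Consumer side: read one queue's index entries.
    List<CqEntry> queue(String topic, int queueId) {
        return consumeQueues.getOrDefault(topic + "-" + queueId, List.of());
    }
}
```

Until `dispatch()` runs, newly appended messages sit in the shared log but are not yet visible through any queue, which mirrors the asynchronous gap between a CommitLog write and the corresponding ConsumeQueue entry.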

2.1 CommitLog

CommitLog corresponds to the CommitLog class in the code:

public class CommitLog {
    // Message MAGIC CODE daa320a7
    public final static int MESSAGE_MAGIC_CODE = -626843481;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // Blank MAGIC CODE cbd43194, written at the end of a file
    protected final static int BLANK_MAGIC_CODE = -875286124;
    protected final MappedFileQueue mappedFileQueue;
    protected final DefaultMessageStore defaultMessageStore;
    private final FlushCommitLogService flushCommitLogService;
    private final FlushCommitLogService commitLogService;
    private final AppendMessageCallback appendMessageCallback;
    private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
    protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
    protected volatile long confirmOffset = -1L;
    private volatile long beginTimeInLock = 0;

    // ...
}

RocketMQ writes messages to the CommitLog sequentially, in the storage format shown below. Besides the properties of the message itself (message length, message body, Topic length, Topic, properties length, and properties), each CommitLog entry also records which consume queue the message belongs to (the consume queue ID and the offset within that queue). Because entries are variable-length, the current CommitLog file may not have enough space left for the next message. In that case, CommitLog appends an entry whose MAGIC CODE is BLANK_MAGIC_CODE at the end of the file as a closing marker, and stores the message in the next CommitLog file.

BLANK_MAGIC_CODE therefore marks that the current CommitLog file is full and that subsequent messages are stored in the next file.
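The end-of-file handling can be sketched as follows. This is a simplified illustration, not RocketMQ's append code: a toy entry format (length, magic code, body) stands in for the full layout, and `END_FILE_MIN_BLANK_LENGTH` is defined locally to express the assumption that 8 bytes must always remain free for the TOTALSIZE and MAGICCODE fields of the blank marker. The two magic values are copied from the CommitLog class above.

```java
import java.nio.ByteBuffer;

// Simplified sketch (not RocketMQ's append code): closing a fixed-size
// CommitLog file with a BLANK_MAGIC_CODE entry when a message no longer fits.
final class BlankMarkerDemo {
    static final int MESSAGE_MAGIC_CODE = -626843481; // 0xdaa320a7, same value as CommitLog
    static final int BLANK_MAGIC_CODE = -875286124;   // 0xcbd43194, same value as CommitLog
    // Space always kept free for the blank marker: TOTALSIZE (4) + MAGICCODE (4).
    static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;

    // Writes a toy entry (TOTALSIZE, MAGICCODE, body) and returns true, or
    // writes the blank marker, fills the file, and returns false so that the
    // caller can move on to the next file.
    static boolean append(ByteBuffer file, byte[] body) {
        int msgLen = 4 + 4 + body.length;
        int maxBlank = file.remaining();
        if (msgLen + END_FILE_MIN_BLANK_LENGTH > maxBlank) {
            file.putInt(maxBlank);         // the blank entry covers the rest of the file
            file.putInt(BLANK_MAGIC_CODE);
            file.position(file.limit());   // nothing more is written to this file
            return false;
        }
        file.putInt(msgLen);
        file.putInt(MESSAGE_MAGIC_CODE);
        file.put(body);
        return true;
    }
}
```

In this sketch, a reader that encounters BLANK_MAGIC_CODE knows the rest of the file is unused and can continue with the next file.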

CommitLog files are stored under ${user.home}/store/commitlog.

The length of a stored entry is computed by the CommitLog.calMsgLength method:

protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
        int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
        int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
        final int msgLen = 4 //TOTALSIZE
            + 4 //MAGICCODE
            + 4 //BODYCRC
            + 4 //QUEUEID
            + 4 //FLAG
            + 8 //QUEUEOFFSET
            + 8 //PHYSICALOFFSET
            + 4 //SYSFLAG
            + 8 //BORNTIMESTAMP
            + bornhostLength //BORNHOST
            + 8 //STORETIMESTAMP
            + storehostAddressLength //STOREHOSTADDRESS
            + 4 //RECONSUMETIMES
            + 8 //PREPARED TRANSACTION OFFSET
            + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
            + 1 + topicLength //TOPIC
            + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertieslength
            + 0;
        return msgLen;
    }

The two host address fields (BORNHOST and STOREHOSTADDRESS) distinguish between IPv4 and IPv6, so each occupies either 8 or 20 bytes.
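As a worked example, the sketch below restates the calMsgLength arithmetic as a standalone method, with the 68 bytes of fixed-width fields (everything except the two host fields and the variable-length parts) summed up front. The flag values `1 << 4` and `1 << 5` are an assumption meant to mirror `MessageSysFlag.BORNHOST_V6_FLAG` and `MessageSysFlag.STOREHOSTADDRESS_V6_FLAG`; check them against your RocketMQ version before relying on them.

```java
// Standalone restatement of the calMsgLength arithmetic (hypothetical class).
final class MsgLengthExample {
    // Assumed to mirror MessageSysFlag.BORNHOST_V6_FLAG / STOREHOSTADDRESS_V6_FLAG.
    static final int BORNHOST_V6_FLAG = 0x1 << 4;
    static final int STOREHOSTADDRESS_V6_FLAG = 0x1 << 5;

    static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
        int bornhostLength = (sysFlag & BORNHOST_V6_FLAG) == 0 ? 8 : 20;
        int storehostAddressLength = (sysFlag & STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
        return 68                                        // fixed-width fields except the two hosts
            + bornhostLength + storehostAddressLength    // 8 (IPv4) or 20 (IPv6) each
            + 4 + Math.max(bodyLength, 0)                // BODY length prefix + body
            + 1 + topicLength                            // TOPIC length prefix (1 byte) + topic
            + 2 + Math.max(propertiesLength, 0);         // PROPERTIES length prefix (2 bytes) + properties
    }

    public static void main(String[] args) {
        // 100-byte body, topic "TopicTest" (9 bytes), no properties, IPv4 hosts.
        System.out.println(calMsgLength(0, 100, 9, 0)); // prints 200
    }
}
```

With IPv4 hosts, the example message takes 68 + 8 + 8 + (4 + 100) + (1 + 9) + (2 + 0) = 200 bytes; with IPv6 hosts the same message takes 224 bytes. The fixed overhead of an empty IPv4 message is 91 bytes.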

An IPv4 address needs only 4 bytes and an IPv6 address 16 bytes, so where do the extra 4 bytes come from? Look at the MessageExt.socketAddress2ByteBuffer method:

    public static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
        InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
        InetAddress address = inetSocketAddress.getAddress();
        if (address instanceof Inet4Address) {
            byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
        } else {
            byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 16);
        }
        byteBuffer.putInt(inetSocketAddress.getPort());
        byteBuffer.flip();
        return byteBuffer;
    }

The extra 4 bytes hold the port. An IPv4 address plus port therefore takes 4 + 4 = 8 bytes, and an IPv6 address plus port takes 16 + 4 = 20 bytes.
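The same encoding can be reproduced with plain JDK classes. `HostEncodingExample` is a hypothetical sketch, not RocketMQ code; unlike the original method, it allocates its own buffer, sized to exactly the address bytes plus the 4-byte port.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

// Sketch of the socketAddress2ByteBuffer layout: raw address bytes
// (4 for IPv4, 16 for IPv6) followed by a 4-byte port.
final class HostEncodingExample {
    static ByteBuffer encode(InetSocketAddress addr) {
        byte[] ip = addr.getAddress().getAddress(); // 4 or 16 bytes
        ByteBuffer buf = ByteBuffer.allocate(ip.length + 4);
        buf.put(ip);
        buf.putInt(addr.getPort());
        buf.flip();                                 // ready for reading
        return buf;
    }
}
```

Encoding `127.0.0.1:10911` yields 8 bytes and `[::1]:10911` yields 20 bytes, matching the two widths used by calMsgLength.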

2.2 ConsumeQueue

ConsumeQueue corresponds to the ConsumeQueue class in the code:

public class ConsumeQueue {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // Size of one ConsumeQueue storage unit (entry), in bytes
    public static final int CQ_STORE_UNIT_SIZE = 20;
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);

    private final DefaultMessageStore defaultMessageStore;

    private final MappedFileQueue mappedFileQueue;
    private final String topic;
    private final int queueId;
    private final ByteBuffer byteBufferIndex;

    private final String storePath;
    private final int mappedFileSize;
    private long maxPhysicOffset = -1;
    private volatile long minLogicOffset = 0;
    private ConsumeQueueExt consumeQueueExt = null;
}

As the code shows, CQ_STORE_UNIT_SIZE is fixed at 20 bytes, as shown in the figure below. To keep entries fixed-length, ConsumeQueue stores the hash code of the message tag instead of the variable-length tag string. Whether a message should be consumed is decided by comparing the hash code of the Tag the Consumer subscribes to with the tag hash code stored in the entry.
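The fixed 20-byte entry layout and the hash-code comparison can be sketched with a plain `ByteBuffer`. The class is hypothetical, and using `String.hashCode()` as the tag code is an assumption of this sketch. Because every entry has the same size, entry n starts at byte n × 20, which is what makes array-like random access possible.

```java
import java.nio.ByteBuffer;

// Sketch of the 20-byte ConsumeQueue entry:
// CommitLog offset (8) | message size (4) | tag hash code (8)
final class ConsumeQueueEntryExample {
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Writes entry number `index` into its fixed slot.
    static void write(ByteBuffer cq, int index, long commitLogOffset, int size, String tag) {
        int pos = index * CQ_STORE_UNIT_SIZE;
        cq.putLong(pos, commitLogOffset);
        cq.putInt(pos + 8, size);
        cq.putLong(pos + 12, tag.hashCode()); // assumption: String.hashCode as the tag code
    }

    static long offsetAt(ByteBuffer cq, int index) {
        return cq.getLong(index * CQ_STORE_UNIT_SIZE);
    }

    static int sizeAt(ByteBuffer cq, int index) {
        return cq.getInt(index * CQ_STORE_UNIT_SIZE + 8);
    }

    static long tagsCodeAt(ByteBuffer cq, int index) {
        return cq.getLong(index * CQ_STORE_UNIT_SIZE + 12);
    }

    // Filtering by hash code: cheap, but equal hash codes do not prove equal tags.
    static boolean matches(ByteBuffer cq, int index, String subscribedTag) {
        return tagsCodeAt(cq, index) == subscribedTag.hashCode();
    }
}
```

Since different strings can share a hash code, a hash match only says the entry is a candidate; it does not by itself prove that the stored tag equals the subscribed one.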

Storage location: {topicName}/{fileName}

Each file stores 300,000 entries.
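Because entries are fixed-size, finding the file and the position for a given queue offset is pure arithmetic. The sketch below assumes that, as with the CommitLog, each ConsumeQueue file is named after its starting byte offset; `ConsumeQueueFileMath` is hypothetical.

```java
// Arithmetic sketch for ConsumeQueue files (hypothetical class).
final class ConsumeQueueFileMath {
    static final int CQ_STORE_UNIT_SIZE = 20;
    static final int ENTRIES_PER_FILE = 300_000;
    // 300,000 x 20 = 6,000,000 bytes, about 5.72 MiB
    static final long FILE_SIZE = (long) ENTRIES_PER_FILE * CQ_STORE_UNIT_SIZE;

    // Name of the file holding logical entry `queueOffset` (assumed naming rule).
    static String fileNameFor(long queueOffset) {
        long bytePos = queueOffset * CQ_STORE_UNIT_SIZE;
        return String.format("%020d", bytePos - bytePos % FILE_SIZE);
    }

    // Byte position of that entry inside its file.
    static long positionInFile(long queueOffset) {
        return queueOffset * CQ_STORE_UNIT_SIZE % FILE_SIZE;
    }
}
```

Entry 299,999 is the last one in the first file, at byte 5,999,980; entry 300,000 starts the second file at byte 0.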

2.3 IndexFile

public class IndexFile {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static int hashSlotSize = 4;
    private static int indexSize = 20;
    private static int invalidIndex = 0;
    private final int hashSlotNum; // 5,000,000 hash slots by default
    private final int indexNum; // 20,000,000 index entries by default
    private final MappedFile mappedFile;
    private final FileChannel fileChannel;
    private final MappedByteBuffer mappedByteBuffer;
    private final IndexHeader indexHeader;
    // ...
}
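The hash-index design can be illustrated with a small in-memory sketch. `MiniIndexFile` is hypothetical and simplified: it uses a heap `ByteBuffer` instead of a memory-mapped file, leaves the header empty, and stores the raw key hash. It follows the layout as I understand it for 4.7.0: a 40-byte header, then `hashSlotNum` 4-byte slots, then `indexNum` 20-byte entries (key hash, CommitLog offset, time difference, previous entry number). Each slot holds the number of the newest entry in its bucket, and each entry points back to the previous one, forming a chain. With the defaults above, the file size is 40 + 5,000,000 × 4 + 20,000,000 × 20 = 420,000,040 bytes, roughly 400 MB.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Simplified in-memory sketch of an IndexFile-style hash index (hypothetical class).
// Layout: [40-byte header][hashSlotNum x 4-byte slots][indexNum x 20-byte entries]
// Entry:  key hash (4) | CommitLog offset (8) | time difference (4) | previous entry number (4)
final class MiniIndexFile {
    static final int HEADER_SIZE = 40;  // header is left empty in this sketch
    static final int HASH_SLOT_SIZE = 4;
    static final int INDEX_SIZE = 20;

    private final int hashSlotNum;
    private final int indexNum;
    private final ByteBuffer buf;
    private int indexCount = 1;         // entry number 0 means "no entry"

    MiniIndexFile(int hashSlotNum, int indexNum) {
        this.hashSlotNum = hashSlotNum;
        this.indexNum = indexNum;
        this.buf = ByteBuffer.allocate((int) sizeFor(hashSlotNum, indexNum));
    }

    static long sizeFor(int hashSlotNum, int indexNum) {
        return HEADER_SIZE + (long) hashSlotNum * HASH_SLOT_SIZE + (long) indexNum * INDEX_SIZE;
    }

    private int slotPos(int keyHash) {
        return HEADER_SIZE + Math.floorMod(keyHash, hashSlotNum) * HASH_SLOT_SIZE;
    }

    private int entryPos(int entryNumber) {
        return HEADER_SIZE + hashSlotNum * HASH_SLOT_SIZE + entryNumber * INDEX_SIZE;
    }

    // Appends an entry and makes it the head of its slot's chain.
    boolean put(String key, long commitLogOffset, int timeDiff) {
        if (indexCount >= indexNum) {
            return false;               // file is full
        }
        int keyHash = key.hashCode();
        int sp = slotPos(keyHash);
        int ep = entryPos(indexCount);
        buf.putInt(ep, keyHash);
        buf.putLong(ep + 4, commitLogOffset);
        buf.putInt(ep + 12, timeDiff);
        buf.putInt(ep + 16, buf.getInt(sp)); // link to the previous head of the chain
        buf.putInt(sp, indexCount);          // slot now points at the newest entry
        indexCount++;
        return true;
    }

    // Walks the chain for the key's slot; returns matching offsets, newest first.
    List<Long> get(String key) {
        List<Long> offsets = new ArrayList<>();
        int keyHash = key.hashCode();
        int entry = buf.getInt(slotPos(keyHash));
        while (entry > 0) {
            int ep = entryPos(entry);
            if (buf.getInt(ep) == keyHash) {
                offsets.add(buf.getLong(ep + 4));
            }
            entry = buf.getInt(ep + 16);
        }
        return offsets;
    }
}
```

Entries are appended in order, so writes stay sequential; the slot array only provides the entry point into each chain.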

Next, let's look at the storage structure of IndexFile: