Preface

RocketMQ is an excellent distributed messaging middleware that provides high-performance, low-latency, stable, and reliable messaging services to business systems. Its core strengths are reliable message storage, high-throughput and low-latency sending, and a strong capacity for accumulating and processing messages.

In terms of storage, message middleware generally has three options:

  • The file system
  • Distributed KV storage
  • Relational database

In terms of efficiency, the file system outperforms KV storage, which in turn outperforms a relational database. Mainstream message queue middleware such as RocketMQ, RabbitMQ, and Kafka all store messages in the file system, because operating on files directly is the fastest option.

Today, we’ll explore the mechanics of the RocketMQ message store, starting with its store files.

I. CommitLog

Messages for all topics are stored in the CommitLog file.

Our business system sends a message to RocketMQ, and however complex the intermediate steps are, the message is ultimately persisted to a CommitLog file.

As we know, a Broker server has only one CommitLog (a group of files). RocketMQ stores the messages of all topics in it, appending them sequentially in the order they arrive.

Sometimes, you might want to take a look at what the contents of your CommitLog file look like.

1. Message sending

Of course, we need to write something to the CommitLog file first, so let’s look at an example of sending a message.

public static void main(String[] args) throws Exception {
    MQProducer producer = getProducer();
    for (int i = 0; i < 10; i++) {
        Message message = new Message();
        message.setTopic("topic" + i);
        message.setBody(("A blog in a quiet place.").getBytes());
        SendResult sendResult = producer.send(message);
    }
    producer.shutdown();
}

We send messages to 10 different topics. If there is only one Broker machine, they are all saved to the same CommitLog file. In this example, that file is located at C:\Users\shiqizhen\store\commitlog\00000000000000000000.

2. Read the contents of the file

This file cannot be opened directly because it is a binary file, so we need to programmatically read its byte array.

public static ByteBuffer read(String path) throws Exception {
    File file = new File(path);
    byte[] bytes = new byte[(int) file.length()];
    // try-with-resources closes the stream; readFully keeps reading until the array is full
    try (DataInputStream in = new DataInputStream(new FileInputStream(file))) {
        in.readFully(bytes);
    }
    return ByteBuffer.wrap(bytes);
}

The code above reads the entire contents of the file at the given path. To make the next step easier, we wrap the byte array in a java.nio.ByteBuffer object.
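Reading the whole file into a heap array works for an experiment, but a file can also be memory-mapped so that the operating system pages it in on demand. The following is a minimal sketch using only the JDK; the class name MappedRead and the temporary demo file are hypothetical and are not part of the article's code or of RocketMQ.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: maps a file read-only instead of copying it into a byte[].
final class MappedRead {

    static ByteBuffer map(Path path) throws IOException {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            // A mapping, once established, does not depend on the channel staying open.
            return channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
        }
    }

    public static void main(String[] args) throws IOException {
        // Demo file with a single big-endian int, standing in for a real store file.
        Path tmp = Files.createTempFile("commitlog-demo", ".bin");
        tmp.toFile().deleteOnExit();
        Files.write(tmp, new byte[] {0, 0, 0, 42});
        System.out.println(map(tmp).getInt());
    }
}
```

The returned buffer can be passed to the same parsing code as a wrapped byte array, since both are ByteBuffer instances.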

3. Parsing

Before we parse it, we need to understand two things:

  • The format of the message, that is, what fields a message contains;
  • The size in bytes of each field.

The figure above shows the message format, which contains 19 fields. Some fields are 4 bytes and some are 8 bytes; we won't go over them one by one, so let's go straight to the code.

/**
 * CommitLog file parsing
 * @param byteBuffer
 * @return
 * @throws Exception
 */
public static MessageExt decodeCommitLog(ByteBuffer byteBuffer) throws Exception {

    MessageExt msgExt = new MessageExt();

    // 1 TOTALSIZE
    int storeSize = byteBuffer.getInt();
    msgExt.setStoreSize(storeSize);

    if (storeSize <= 0) {
        return null;
    }

    // 2 MAGICCODE
    byteBuffer.getInt();

    // 3 BODYCRC
    int bodyCRC = byteBuffer.getInt();
    msgExt.setBodyCRC(bodyCRC);

    // 4 QUEUEID
    int queueId = byteBuffer.getInt();
    msgExt.setQueueId(queueId);

    // 5 FLAG
    int flag = byteBuffer.getInt();
    msgExt.setFlag(flag);

    // 6 QUEUEOFFSET
    long queueOffset = byteBuffer.getLong();
    msgExt.setQueueOffset(queueOffset);

    // 7 PHYSICALOFFSET
    long physicOffset = byteBuffer.getLong();
    msgExt.setCommitLogOffset(physicOffset);

    // 8 SYSFLAG
    int sysFlag = byteBuffer.getInt();
    msgExt.setSysFlag(sysFlag);

    // 9 BORNTIMESTAMP
    long bornTimeStamp = byteBuffer.getLong();
    msgExt.setBornTimestamp(bornTimeStamp);

    // 10 BORNHOST
    int bornhostIPLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 : 16;
    byte[] bornHost = new byte[bornhostIPLength];
    byteBuffer.get(bornHost, 0, bornhostIPLength);
    int port = byteBuffer.getInt();
    msgExt.setBornHost(new InetSocketAddress(InetAddress.getByAddress(bornHost), port));

    // 11 STORETIMESTAMP
    long storeTimestamp = byteBuffer.getLong();
    msgExt.setStoreTimestamp(storeTimestamp);

    // 12 STOREHOST
    int storehostIPLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 : 16;
    byte[] storeHost = new byte[storehostIPLength];
    byteBuffer.get(storeHost, 0, storehostIPLength);
    port = byteBuffer.getInt();
    msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByAddress(storeHost), port));

    // 13 RECONSUMETIMES
    int reconsumeTimes = byteBuffer.getInt();
    msgExt.setReconsumeTimes(reconsumeTimes);

    // 14 Prepared Transaction Offset
    long preparedTransactionOffset = byteBuffer.getLong();
    msgExt.setPreparedTransactionOffset(preparedTransactionOffset);

    // 15 BODY
    int bodyLen = byteBuffer.getInt();
    if (bodyLen > 0) {
        byte[] body = new byte[bodyLen];
        byteBuffer.get(body);
        msgExt.setBody(body);
    }

    // 16 TOPIC
    byte topicLen = byteBuffer.get();
    byte[] topic = new byte[(int) topicLen];
    byteBuffer.get(topic);
    msgExt.setTopic(new String(topic, CHARSET_UTF8));

    // 17 properties
    short propertiesLength = byteBuffer.getShort();
    if (propertiesLength > 0) {
        byte[] properties = new byte[propertiesLength];
        byteBuffer.get(properties);
        String propertiesString = new String(properties, CHARSET_UTF8);
        Map<String, String> map = string2messageProperties(propertiesString);
    }
    int msgIDLength = storehostIPLength + 4 + 8;
    ByteBuffer byteBufferMsgId = ByteBuffer.allocate(msgIDLength);
    String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
    msgExt.setMsgId(msgId);

    return msgExt;
}
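The field sizes that the decoder above reads can be tallied to see how long one stored record is. This is a minimal sketch derived only from the reads in that decoder, assuming IPv4 born and store hosts (the 4-byte branch); the class name RecordLength is hypothetical.

```java
// Hypothetical helper: sums the bytes consumed by the decoder for one record.
final class RecordLength {

    // Fixed-size part, in the order the decoder reads it (IPv4 hosts assumed).
    static final int FIXED =
              4      // 1  TOTALSIZE
            + 4      // 2  MAGICCODE
            + 4      // 3  BODYCRC
            + 4      // 4  QUEUEID
            + 4      // 5  FLAG
            + 8      // 6  QUEUEOFFSET
            + 8      // 7  PHYSICALOFFSET
            + 4      // 8  SYSFLAG
            + 8      // 9  BORNTIMESTAMP
            + 4 + 4  // 10 BORNHOST (IPv4 address + port)
            + 8      // 11 STORETIMESTAMP
            + 4 + 4  // 12 STOREHOST (IPv4 address + port)
            + 4      // 13 RECONSUMETIMES
            + 8      // 14 Prepared Transaction Offset
            + 4      // 15 body length prefix
            + 1      // 16 topic length prefix
            + 2;     // 17 properties length prefix

    static int of(int bodyLength, int topicLength, int propertiesLength) {
        return FIXED + bodyLength + topicLength + propertiesLength;
    }

    public static void main(String[] args) {
        int body = "A blog in a quiet place.".getBytes().length;
        int topic = "topic0".getBytes().length;
        // Properties are left at 0 here; real messages usually carry some.
        System.out.println(of(body, topic, 0));
    }
}
```

The variable-length parts (body, topic, properties) are each preceded by their own length prefix, which is why the decoder can walk the file record by record.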

4. Output message content

public static void main(String[] args) throws Exception {
    String filePath = "C:\\Users\\shiqizhen\\store\\commitlog\\00000000000000000000";
    ByteBuffer buffer = read(filePath);
    List<MessageExt> messageList = new ArrayList<>();
    while (true) {
        MessageExt message = decodeCommitLog(buffer);
        if (message == null) {
            break;
        }
        messageList.add(message);
    }
    for (MessageExt ms : messageList) {
        System.out.println("Topic: " + ms.getTopic() + " Message: " +
            new String(ms.getBody()) + " Queue ID: " + ms.getQueueId() + " Storage address: " + ms.getStoreHost());
    }
}

By running this code, we can directly see the contents of the CommitLog file:

Topic: topic1 Message: A blog in a quiet place. Queue ID: 0 Storage address: /192.168.44.1:10911
Topic: topic2 Message: A blog in a quiet place. Queue ID: 1 Storage address: /192.168.44.1:10911
Topic: topic3 Message: A blog in a quiet place. Queue ID: 0 Storage address: /192.168.44.1:10911
Topic: topic4 Message: A blog in a quiet place. Queue ID: 3 Storage address: /192.168.44.1:10911
Topic: topic5 Message: A blog in a quiet place. Queue ID: 1 Storage address: /192.168.44.1:10911
Topic: topic6 Message: A blog in a quiet place. Queue ID: 2 Storage address: /192.168.44.1:10911
Topic: topic7 Message: A blog in a quiet place. Queue ID: 3 Storage address: /192.168.44.1:10911
Topic: topic8 Message: A blog in a quiet place. Queue ID: 2 Storage address: /192.168.44.1:10911
Topic: topic9 Message: A blog in a quiet place. Queue ID: 0 Storage address: /192.168.44.1:10911

With the code above, you should have a clearer picture of the CommitLog file than a long text description could give.

Now, let's consider another question:

The CommitLog file holds the messages of all topics, but a consumer usually subscribes to one particular topic. How does RocketMQ retrieve the messages of that topic efficiently?

II. ConsumeQueue

To solve that problem, RocketMQ introduced the ConsumeQueue consumption queue file.

Before we move on to ConsumeQueue, we must first understand another concept, namely MessageQueue.

1. MessageQueue

We know that a Topic must be specified when sending a message. When a Topic is created, one important parameter is the number of MessageQueues, that is, how many queues the Topic has (the default is 4). So what are they for?

It is a data sharding mechanism. For example, if a Topic holds 100 messages and uses the default of 4 queues, each queue holds about 25 messages. These MessageQueues are bound to Brokers, so each MessageQueue may live on a different Broker machine, depending on the number of queues and the size of your Broker cluster.

Let's look at the picture above. The Topic named order has 4 MessageQueues, each holding 25 messages. Because my local environment has only one Broker, their brokerName values all refer to the same machine.

Since there are multiple MessageQueues, one of them has to be selected when a message is sent. By default, the queue is chosen by round-robin polling.

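public MessageQueue selectOneMessageQueue() {
    // Take the next counter value and map it onto the queue list
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    // Math.abs(Integer.MIN_VALUE) is still negative, so guard against it
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}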

Of course, RocketMQ also has a latency fault-tolerance mechanism that makes queue selection a little more complicated, but we won't discuss it today.
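Setting that mechanism aside, plain round-robin selection can be exercised in isolation to see the sharding effect described earlier (100 messages over 4 queues). This is a minimal standalone sketch; the class RoundRobinDemo is hypothetical, and it uses Math.floorMod in place of the abs-and-guard approach in the snippet above.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the producer's queue selector.
final class RoundRobinDemo {
    private final AtomicInteger counter = new AtomicInteger();
    private final int queueCount;

    RoundRobinDemo(int queueCount) {
        this.queueCount = queueCount;
    }

    // floorMod keeps the result in [0, queueCount) even after the counter overflows.
    int nextQueueId() {
        return Math.floorMod(counter.getAndIncrement(), queueCount);
    }

    // Sends `messages` messages and counts how many land in each queue.
    static int[] distribute(int messages, int queues) {
        RoundRobinDemo selector = new RoundRobinDemo(queues);
        int[] perQueue = new int[queues];
        for (int i = 0; i < messages; i++) {
            perQueue[selector.nextQueueId()]++;
        }
        return perQueue;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(distribute(100, 4))); // [25, 25, 25, 25]
    }
}
```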

2. ConsumeQueue

With MessageQueue out of the way, let's move on to ConsumeQueue. As mentioned above, it exists to retrieve the messages of a topic efficiently.

ConsumeQueue is also a group of files, located at C:\Users\shiqizhen\store\consumequeue. Under this directory there is one folder per Topic name, one level below that a folder named after each MessageQueue ID, and finally one or more files.

With this layering, RocketMQ can at least do the following:

  • Locate the specific folder based on the topic name;
  • Then find the specific file based on the message queue ID;
  • Finally, find the specific message according to the content of the file.
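The first two steps above amount to building a path from the topic name and the queue ID. Here is a minimal sketch; the class ConsumeQueuePath is hypothetical, and the 20-digit zero-padded file name is an assumption based on the file name 00000000000000000000 seen in this article.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: resolves <storeRoot>/consumequeue/<topic>/<queueId>/<file>.
final class ConsumeQueuePath {

    static Path fileFor(String storeRoot, String topic, int queueId, long startOffset) {
        return Paths.get(storeRoot, "consumequeue", topic,
                String.valueOf(queueId),
                String.format("%020d", startOffset)); // assumed naming: 20-digit start offset
    }

    public static void main(String[] args) {
        // Prints the path of the first file of queue 0 of the topic "order".
        System.out.println(fileFor("store", "order", 0, 0L));
    }
}
```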

So, what does this file store?

3. Parse the file

To speed up retrieval and save disk space, the ConsumeQueue file does not store the full message. Each entry is a fixed 20 bytes: an 8-byte CommitLog offset, a 4-byte message length, and an 8-byte tag hash code.
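The entry layout that the parsing code in this section reads (an 8-byte long, a 4-byte int, then another 8-byte long) can be sketched as a small encode/decode pair. The class CqEntry and its field names are hypothetical; treating the last field as a tag hash code is an assumption that goes beyond the variable name code used in the article.

```java
import java.nio.ByteBuffer;

// Hypothetical model of one fixed-size ConsumeQueue entry.
final class CqEntry {
    static final int SIZE = 8 + 4 + 8; // 20 bytes per entry

    final long commitLogOffset; // where the full message starts in the CommitLog
    final int size;             // length of the full message in bytes
    final long tagsCode;        // assumed meaning: hash code of the message tag

    CqEntry(long commitLogOffset, int size, long tagsCode) {
        this.commitLogOffset = commitLogOffset;
        this.size = size;
        this.tagsCode = tagsCode;
    }

    ByteBuffer encode() {
        ByteBuffer buffer = ByteBuffer.allocate(SIZE);
        buffer.putLong(commitLogOffset).putInt(size).putLong(tagsCode);
        buffer.flip(); // switch from writing to reading
        return buffer;
    }

    static CqEntry decode(ByteBuffer buffer) {
        return new CqEntry(buffer.getLong(), buffer.getInt(), buffer.getLong());
    }

    public static void main(String[] args) {
        CqEntry entry = decode(new CqEntry(2003L, 173, 0L).encode());
        System.out.println(entry.commitLogOffset + " " + entry.size);
    }
}
```

Because every entry has the same size, the n-th entry of a queue can be found by simple multiplication, without scanning.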

Again, let's write a piece of code that prints the contents of a ConsumeQueue file in this format.

public static void main(String[] args) throws Exception {
    String path = "C:\\Users\\shiqizhen\\store\\consumequeue\\order\\0\\00000000000000000000";
    ByteBuffer buffer = read(path);
    // Each entry is 20 bytes; stop before reading past the end of the file
    while (buffer.remaining() >= 20) {
        long offset = buffer.getLong();
        int size = buffer.getInt();
        long code = buffer.getLong();
        // A zero size marks the unused, pre-allocated part of the file
        if (size == 0) {
            break;
        }
        System.out.println("Message length: " + size + " Message offset: " + offset);
    }
    System.out.println("--------------------------------------------------");
}

Previously, we wrote 100 messages to the order topic, so MessageQueue 0 of order has 25 entries.

Message length: 173 Message offset: 2003
Message length: 173 Message offset: 2695
Message length: 173 Message offset: 3387
Message length: 173 Message offset: 4079
Message length: 173 Message offset: 4771
Message length: 173 Message offset: 5463
Message length: 173 Message offset: 6155
Message length: 173 Message offset: 6847
Message length: 173 Message offset: 7539
Message length: 173 Message offset: 8231
Message length: 173 Message offset: 8923
Message length: 173 Message offset: 9615
Message length: 173 Message offset: 10307
Message length: 173 Message offset: 10999
Message length: 173 Message offset: 11691
Message length: 173 Message offset: 12383
Message length: 173 Message offset: 13075
Message length: 173 Message offset: 13767
Message length: 173 Message offset: 14459
Message length: 173 Message offset: 15151
Message length: 173 Message offset: 15843
Message length: 173 Message offset: 16535
Message length: 173 Message offset: 17227
Message length: 173 Message offset: 17919
Message length: 173 Message offset: 18611
--------------------------------------------------

Attentive readers will have noticed that, in the output above, the difference between consecutive message offsets equals message length × number of queues (173 × 4 = 692).
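That arithmetic can be checked with a few lines of code. The sketch below regenerates the expected offsets for one queue, assuming every message has the same length and that messages were spread strictly round-robin over the queues; the class OffsetStride is hypothetical.

```java
// Hypothetical helper: recomputes the CommitLog offsets seen by a single queue.
final class OffsetStride {

    static long[] expectedOffsets(long first, int messageSize, int queueCount, int entries) {
        // Between two messages of the same queue, one message goes to every other queue.
        long stride = (long) messageSize * queueCount;
        long[] offsets = new long[entries];
        for (int i = 0; i < entries; i++) {
            offsets[i] = first + i * stride;
        }
        return offsets;
    }

    public static void main(String[] args) {
        long[] offsets = expectedOffsets(2003L, 173, 4, 25);
        System.out.println(offsets[1] + " ... " + offsets[24]); // 2695 ... 18611
    }
}
```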

4. Query messages

Now that we know the length and offset of a message from the ConsumeQueue, finding the message itself is easy.

public static MessageExt getMessageByOffset(ByteBuffer commitLog, long offset, int size) throws Exception {
    // duplicate() shares the content but has its own position and limit,
    // so the offset is absolute and the caller's buffer is left untouched
    ByteBuffer slice = commitLog.duplicate();
    slice.limit((int) (offset + size));
    slice.position((int) offset);
    return CommitLogTest.decodeCommitLog(slice);
}

We can then use this method to fetch the full message for each entry in the ConsumeQueue.

public static void main(String[] args) throws Exception {
    // ConsumeQueue root directory
    String consumerPath = "C:\\Users\\shiqizhen\\store\\consumequeue";
    // CommitLog file
    String commitLogPath = "C:\\Users\\shiqizhen\\store\\commitlog\\00000000000000000000";
    ByteBuffer commitLogBuffer = CommitLogTest.read(commitLogPath);

    File file = new File(consumerPath);
    File[] files = file.listFiles();
    for (File f : files) {
        if (f.isDirectory()) {
            File[] listFiles = f.listFiles();
            for (File queuePath : listFiles) {
                String path = queuePath + "/00000000000000000000";
                ByteBuffer buffer = CommitLogTest.read(path);
                while (true) {
                    // Read the message offset and the message length
                    long offset = (int) buffer.getLong();
                    int size = buffer.getInt();
                    long code = buffer.getLong();
                    if (size == 0) {
                        break;
                    }
                    // Use the offset and length to read the message from the CommitLog file
                    MessageExt message = getMessageByOffset(commitLogBuffer, offset, size);
                    if (message != null) {
                        System.out.println("Message topic: " + message.getTopic() + " MessageQueue: " +
                            message.getQueueId() + " Message body: " + new String(message.getBody()));
                    }
                }
            }
        }
    }
}

Running this code gives you all the messages for the 10 topics in the previous test sample.

Message topic: topic0 MessageQueue: 1 Message body: A blog in a quiet place.
Message topic: topic1 MessageQueue: 0 Message body: A blog in a quiet place.
Message topic: topic2 MessageQueue: 1 Message body: A blog in a quiet place.
Message topic: topic3 MessageQueue: 0 Message body: A blog in a quiet place.
Message topic: topic4 MessageQueue: 3 Message body: A blog in a quiet place.
Message topic: topic5 MessageQueue: 1 Message body: A blog in a quiet place.
Message topic: topic6 MessageQueue: 2 Message body: A blog in a quiet place.
Message topic: topic7 MessageQueue: 3 Message body: A blog in a quiet place.
Message topic: topic8 MessageQueue: 2 Message body: A blog in a quiet place.
Message topic: topic9 MessageQueue: 0 Message body: A blog in a quiet place.

5. Consume messages

When a message is consumed, the lookup process is similar. Note, however, that there can be multiple ConsumeQueue files and multiple CommitLog files, so the right file has to be located first. Let's look at the source code.

First, the ConsumeQueue file is located from the consumption progress, and the relevant part of its contents is returned.

public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
    // ConsumeQueue file size
    int mappedFileSize = this.mappedFileSize;
    // Offset of the entry within the ConsumeQueue, based on the consumption progress
    long offset = startIndex * CQ_STORE_UNIT_SIZE;
    if (offset >= this.getMinLogicOffset()) {
        // Find the ConsumeQueue mapped file that contains this offset
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
        if (mappedFile != null) {
            // Return the part of the file starting at that position
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
            return result;
        }
    }
    return null;
}

The message is then retrieved from the CommitLog using the offset and length recorded in the ConsumeQueue entry.

public SelectMappedBufferResult getMessage(final long offset, final int size) {
    // CommitLog file size
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    // Locate the specific CommitLog file based on the message offset
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
    if (mappedFile != null) {
        // Position of the message within that file
        int pos = (int) (offset % mappedFileSize);
        return mappedFile.selectMappedBuffer(pos, size);
    }
    return null;
}
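The file-location step in both methods comes down to integer division and modulo on the global offset. The following is a minimal sketch; the class FileLocator is hypothetical, the 1 GB file size is only an illustrative value, and naming each file after its 20-digit starting offset is an assumption suggested by the file name 00000000000000000000 used throughout this article.

```java
// Hypothetical helper: maps a global offset to a file name and a position inside it.
final class FileLocator {

    // Name of the file that contains `offset` (assumed: its zero-padded start offset).
    static String fileName(long offset, long fileSize) {
        long fileStart = offset - (offset % fileSize);
        return String.format("%020d", fileStart);
    }

    // Position of `offset` relative to the start of its file.
    static int positionInFile(long offset, long fileSize) {
        return (int) (offset % fileSize);
    }

    public static void main(String[] args) {
        long oneGb = 1L << 30; // illustrative file size
        long offset = oneGb + 100;
        System.out.println(fileName(offset, oneGb) + " @ " + positionInFile(offset, oneGb));
    }
}
```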

6. Query by Message Id

We saw above how to look up messages by message offsets, but RocketMQ provides several other ways to look up messages.

  • Query by Message Key;
  • Query by Unique Key;
  • Query by Message Id.

Of these, the Message Key and the Unique Key are both generated on the client before the message is sent, while the Message Id is generated by the Broker when it stores the message.

For an IPv4 address, the Message Id consists of 16 bytes: the address of the host that stores the message (IP and port) and the offset in the CommitLog file. The source code confirms this:

/**
 * Create the message ID
 * @param input
 * @param addr Broker server address
 * @param offset offset in the CommitLog of the message being stored
 * @return
 */
public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
    input.flip();
    int msgIDLength = addr.limit() == 8 ? 16 : 28;
    input.limit(msgIDLength);
    input.put(addr);
    input.putLong(offset);
    return UtilAll.bytes2string(input.array());
}

When we query the Broker for messages based on MessageId, we first parse out the Broker address and Message offset using a decodeMessageId method.

public static MessageId decodeMessageId(final String msgId) throws Exception {
    SocketAddress address;
    long offset;
    int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;
    byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
    byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
    ByteBuffer bb = ByteBuffer.wrap(port);
    int portInt = bb.getInt(0);
    // Broker address
    address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);
    // Offset
    byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
    bb = ByteBuffer.wrap(data);
    offset = bb.getLong(0);
    return new MessageId(address, offset);
}

Therefore, a query by Message Id goes directly to the CommitLog on a specific Broker, so it is an exact lookup.

That covers the Message Id, but how does RocketMQ handle queries by Message Key and Unique Key?
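To make the ID layout concrete, here is a standalone sketch that builds and reads such an ID using only the JDK. It mirrors the layout described in this section (IP, port, CommitLog offset, rendered as hex), but the class MsgIdSketch and its methods are hypothetical and are not RocketMQ's API; uppercase hex output is an assumption.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

// Hypothetical re-implementation of the ID layout: IP + port (4 bytes) + offset (8 bytes).
final class MsgIdSketch {

    static String encode(InetSocketAddress host, long commitLogOffset) {
        byte[] ip = host.getAddress().getAddress(); // 4 bytes for IPv4, 16 for IPv6
        ByteBuffer buffer = ByteBuffer.allocate(ip.length + 4 + 8);
        buffer.put(ip).putInt(host.getPort()).putLong(commitLogOffset);
        StringBuilder hex = new StringBuilder();
        for (byte b : buffer.array()) {
            hex.append(String.format("%02X", b));
        }
        return hex.toString();
    }

    // A 32-character ID means an IPv4 host (8 hex chars); otherwise IPv6 (32 hex chars).
    private static int ipHexLength(String msgId) {
        return msgId.length() == 32 ? 8 : 32;
    }

    static int decodePort(String msgId) {
        int start = ipHexLength(msgId);
        return Integer.parseUnsignedInt(msgId.substring(start, start + 8), 16);
    }

    static long decodeOffset(String msgId) {
        int start = ipHexLength(msgId) + 8;
        return Long.parseUnsignedLong(msgId.substring(start, start + 16), 16);
    }

    public static void main(String[] args) throws Exception {
        InetAddress ip = InetAddress.getByAddress(new byte[] {(byte) 192, (byte) 168, 44, 1});
        String id = encode(new InetSocketAddress(ip, 10911), 2003L);
        System.out.println(id + " -> port " + decodePort(id) + ", offset " + decodeOffset(id));
    }
}
```

Since the ID already carries the Broker address and the physical offset, no index lookup is needed to resolve it.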

III. Index

1. The index file

The ConsumeQueue is an index file built specifically for message subscription, to speed up retrieval of messages by topic and message queue.

In addition, RocketMQ introduces a Hash indexing mechanism to index messages with keys such as Message Key and Unique Key.

So, let’s look at the structure of the index file:

To make it easier to understand, we’ll parse this file in code.

public static void main(String[] args) throws Exception {
    // Index file path
    String path = "C:\\Users\\shiqizhen\\store\\index\\20200506224547616";
    ByteBuffer buffer = CommitLogTest.read(path);
    // Minimum store timestamp of the messages covered by this index file
    long beginTimestamp = buffer.getLong();
    // Maximum store timestamp of the messages covered by this index file
    long endTimestamp = buffer.getLong();
    // Minimum physical offset (CommitLog offset) of the messages covered
    long beginPhyOffset = buffer.getLong();
    // Maximum physical offset (CommitLog offset) of the messages covered
    long endPhyOffset = buffer.getLong();
    // Number of hash slots
    int hashSlotCount = buffer.getInt();
    // Number of index entries
    int indexCount = buffer.getInt();

    // 5 million hash slots; each slot is 4 bytes and stores an index entry number
    for (int i = 0; i < 5000000; i++) {
        buffer.getInt();
    }
    // 20 million index entries
    for (int j = 0; j < 20000000; j++) {
        // Hash code of the message key
        int hashcode = buffer.getInt();
        // Message offset
        long offset = buffer.getLong();
        // Time difference
        int timedif = buffer.getInt();
        // Number of the previous index entry in the same slot
        int pre_no = buffer.getInt();
    }
    System.out.println(buffer.position() == buffer.capacity());
}

If the final output is true, then the parsing process is correct.
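The check works because the three regions read by the parser add up to the whole file. The sketch below redoes that arithmetic with the sizes taken from the parsing code (a header of four longs and two ints, 4-byte slots, 20-byte entries); the class IndexFileLayout is hypothetical.

```java
// Hypothetical helper: sizes and positions inside an index file.
final class IndexFileLayout {
    static final int HEADER_SIZE = 4 * 8 + 2 * 4;   // four longs + two ints = 40 bytes
    static final int SLOT_SIZE = 4;                 // one int per hash slot
    static final int ENTRY_SIZE = 4 + 8 + 4 + 4;    // hash, offset, time diff, previous entry

    static long fileSize(int slots, int entries) {
        return HEADER_SIZE + (long) slots * SLOT_SIZE + (long) entries * ENTRY_SIZE;
    }

    // Byte position of a hash slot (slots follow the header).
    static long slotPosition(int slot) {
        return HEADER_SIZE + (long) slot * SLOT_SIZE;
    }

    // Byte position of an index entry (entries follow all the slots).
    static long entryPosition(int slots, int entryNo) {
        return HEADER_SIZE + (long) slots * SLOT_SIZE + (long) entryNo * ENTRY_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(fileSize(5_000_000, 20_000_000)); // 420000040
    }
}
```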

2. Build indexes

If a message we send carries a Message Key or a Unique Key, each of those keys is indexed.

There are two key points here:

  • Calculate the hash slot position from the hash of the message key;
  • Calculate the starting position of the index entry from the number of hash slots and the current index count.

The details of the index entry (key hash code, message offset, time difference, previous hash slot value) are written byte by byte starting at the offset absIndexPos. The current index count is then written into the hash slot at absSlotPos.

public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        // Calculate the hash of the key
        int keyHash = indexKeyHashMethod(key);
        // Calculate the hash slot position
        int slotPos = keyHash % this.hashSlotNum;
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
        // Value currently in the slot: the previous entry with the same slot, if any
        int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
        // Calculate the time difference, in seconds
        long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
        timeDiff = timeDiff / 1000;
        // Calculate the starting offset of the index entry
        int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
            + this.indexHeader.getIndexCount() * indexSize;
        // Write the hash code, message offset, time difference, and previous slot value in turn
        this.mappedByteBuffer.putInt(absIndexPos, keyHash);
        this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
        this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
        this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
        // Write the current index count into the hash slot
        this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
        return true;
    }
    return false;
}

Once the index is built, querying messages by Message Key or Unique Key becomes straightforward.

For example, we use the RocketMQ client tool to query messages against Unique keys.

adminImpl.queryMessageByUniqKey("order", "FD88E3AB24F6980059FDC9C3620464741BCC18B4AAC220FDFE890007");

On the Broker side, the Unique Key is hashed to find the hash slot, which leads to the index entry. The physical offset of the message is read from the index entry, and the message is then fetched directly from the CommitLog at that offset.
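The build and query steps fit together as a hash table with chained entries stored in one flat buffer. The following is a deliberately small, self-contained sketch of that idea, not RocketMQ's implementation: the class MiniIndex is hypothetical, the time-difference field is left at zero, entry numbers start at 1 so that 0 can mean "empty slot" (an assumption of this sketch), and String.hashCode() stands in for the real key hash.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of a slot-plus-chained-entry index in a single buffer.
final class MiniIndex {
    static final int HEADER = 40, SLOT_SIZE = 4, ENTRY_SIZE = 20;

    private final int slotNum;
    private final int maxEntries;
    private final ByteBuffer buf;
    private int indexCount = 1; // next entry number; 0 is reserved for "empty slot"

    MiniIndex(int slotNum, int maxEntries) {
        this.slotNum = slotNum;
        this.maxEntries = maxEntries;
        this.buf = ByteBuffer.allocate(HEADER + slotNum * SLOT_SIZE + (maxEntries + 1) * ENTRY_SIZE);
    }

    private static int hash(String key) {
        return key.hashCode() & 0x7fffffff; // force a non-negative hash
    }

    private int slotPos(int keyHash) {
        return HEADER + (keyHash % slotNum) * SLOT_SIZE;
    }

    private int entryPos(int entryNo) {
        return HEADER + slotNum * SLOT_SIZE + entryNo * ENTRY_SIZE;
    }

    boolean putKey(String key, long phyOffset) {
        if (indexCount > maxEntries) {
            return false; // index is full
        }
        int keyHash = hash(key);
        int absSlotPos = slotPos(keyHash);
        int previous = buf.getInt(absSlotPos);   // head of the existing chain, or 0
        int absIndexPos = entryPos(indexCount);
        buf.putInt(absIndexPos, keyHash);
        buf.putLong(absIndexPos + 4, phyOffset);
        buf.putInt(absIndexPos + 12, 0);         // time difference, unused in this sketch
        buf.putInt(absIndexPos + 16, previous);  // link to the older entry in this slot
        buf.putInt(absSlotPos, indexCount++);    // slot now points at the newest entry
        return true;
    }

    // Returns matching physical offsets, newest first, by walking the slot's chain.
    List<Long> query(String key) {
        int keyHash = hash(key);
        List<Long> offsets = new ArrayList<>();
        int entryNo = buf.getInt(slotPos(keyHash));
        while (entryNo != 0) {
            int pos = entryPos(entryNo);
            if (buf.getInt(pos) == keyHash) {
                offsets.add(buf.getLong(pos + 4));
            }
            entryNo = buf.getInt(pos + 16);
        }
        return offsets;
    }

    public static void main(String[] args) {
        MiniIndex index = new MiniIndex(1, 8); // a single slot forces every key to collide
        index.putKey("a", 100L);
        index.putKey("b", 200L);
        index.putKey("a", 300L);
        System.out.println(index.query("a")); // [300, 100]
    }
}
```

Note that only the hash is stored, so two different keys with the same hash code would both match; a real lookup has to compare the actual key after loading the message.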

Conclusion

This article explored the basic ideas behind message storage and message lookup in RocketMQ. The intermediate steps in the source code are quite complex, but by working bottom-up, starting directly from the files and analyzing their structure, we can clarify how they relate to each other and what role each plays. I hope this is helpful.

Original writing takes effort. If you found this useful, please leave a like before you go; it motivates the author to keep writing.