Preface

RocketMQ is known for its strong performance. In this article, we look at the underlying storage principles and code to find out what the secret sauce of RocketMQ's performance is.

Open Source Project Recommendation

Pepper Metrics is an open source tool I developed with my colleagues (github.com/zrbcool/pep…). It collects performance statistics from jedis/mybatis/httpservlet/dubbo/motan, exposes the data in a format compatible with Prometheus and other mainstream time-series databases, and shows the trends through Grafana. Its plug-in architecture also makes it easy for users to extend it and integrate other open source components. Please give us a star, and you are welcome to become a developer, submit PRs, and improve the project together with us.

Overview of underlying storage core principles

Let’s explain it with this image (some of the images in this article are from Eric’s Tech World).

  • In simple terms, RMQ stores all messages in a single file, the CommitLog in the figure. Disks favor sequential writes (on both mechanical disks and SSDs, sequential writes are far faster than random writes), so RMQ makes good use of this operating-system characteristic: it returns success as soon as the message content has been written into memory. Data that has reached memory but has not yet been flushed to disk is what the kernel calls a dirty page (Dirty Page); the operating system writes dirty pages back periodically or when they exceed a threshold, and that write-back amounts to a sequential batch write to disk. This is one of the reasons RMQ writes messages with such high performance (a minimal sketch of this sequential-append pattern follows this list).
  • In the lower right of the picture, messages are logically abstracted into multiple queues called ConsumeQueues. As the name suggests, these queues serve reads. When a message is written to the CommitLog, its metadata (message id, offset, and so on) is distributed evenly across multiple ConsumeQueues, each of which has a corresponding file on disk. Since there are usually multiple consumers (or consumer threads), message positions can be read from these ConsumeQueues concurrently, and the consumers then go to the CommitLog to fetch the actual message content. In the normal case these reads hit the page cache, so the read path touches only memory and is naturally fast. For example, as the graph below shows, disk reads stay very low even while RMQ is handling high read and write throughput.
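
Here is that minimal sketch: a self-contained Java example (not RocketMQ code; the file name commitlog-demo.dat is just for illustration) of an append-only, sequential write whose write() call returns once the bytes are in the OS page cache, leaving the kernel to write the dirty pages back later.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class SequentialAppendDemo {
    public static void main(String[] args) throws IOException {
        // Open a single log file in append mode: every write goes to the end,
        // so the disk access pattern is purely sequential.
        try (FileChannel channel = FileChannel.open(
                Paths.get("commitlog-demo.dat"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {

            byte[] message = "hello rocketmq".getBytes();
            // write() returns as soon as the bytes are in the OS page cache
            // (dirty pages); the kernel writes them back to disk later.
            channel.write(ByteBuffer.wrap(message));

            // An explicit flush would be channel.force(false); RMQ makes this a
            // configurable sync/async choice instead of forcing every write.
        }
    }
}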


Source code analysis

CommitLog

Having described RMQ at a high level, let's dig into the source code to see what the "black technology" really is. org.apache.rocketmq.store.CommitLog is RMQ's encapsulation of the CommitLog abstraction. Let's focus on the putMessage method, which writes messages and comes in two variants:

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Single message
}
public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
    // Batch messages
}

Let’s take a look at the code for a single message

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // ...
        // Get the memory mapped file handle
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        // Acquire the lock
        putMessageLock.lock(); // spin or ReentrantLock, depending on store config
        try {
            // ... some code omitted
            // Call mappedFile.appendMessage to append the message bytes to the
            // memory-mapped region; the OS or the background flush thread
            // handles flushing them to disk
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            // ... some code omitted
        } finally {
            // Release the lock
            putMessageLock.unlock();
        }
        // ... some code omitted
        // Trigger the disk flushing action. Select synchronous or asynchronous disk flushing according to different configurations
        handleDiskFlush(result, putMessageResult, msg);
        handleHA(result, putMessageResult, msg);
        return putMessageResult;
    }
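The handleDiskFlush call above decides between synchronous and asynchronous flushing. Below is a minimal sketch of that idea only; it is not the actual RocketMQ code, and FlushService with its methods is an illustrative stand-in. With synchronous flush the producer thread waits until the data is on disk; with asynchronous flush a background flush service is merely woken up.

// Illustrative only: FlushService and the waiting logic are simplified
// stand-ins for what CommitLog#handleDiskFlush does.
enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

interface FlushService {
    void flushAndWait() throws InterruptedException; // flush now, block until done
    void wakeUp();                                   // nudge the background flusher
}

class FlushDecisionSketch {
    private final FlushDiskType flushDiskType;
    private final FlushService flushService;

    FlushDecisionSketch(FlushDiskType type, FlushService service) {
        this.flushDiskType = type;
        this.flushService = service;
    }

    void handleDiskFlush(boolean messageWantsSyncFlush) throws InterruptedException {
        if (flushDiskType == FlushDiskType.SYNC_FLUSH && messageWantsSyncFlush) {
            // Synchronous flush: the producer only gets its result after the
            // data has been forced to disk.
            flushService.flushAndWait();
        } else {
            // Asynchronous flush: return immediately and let the background
            // thread write the dirty pages back later.
            flushService.wakeUp();
        }
    }
}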

putMessage ultimately calls MappedFile's appendMessage method to append the message bytes to a memory-mapped file.

Memory-mapped files (mmap): in short, a file is mapped directly into user-space memory addresses, so operating on the file is no longer a matter of write/read system calls but of reading and writing memory directly. For background, see the article "What is mmap and why it improves performance".
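
A minimal sketch of the technique, independent of RocketMQ: map a file into memory with FileChannel.map, then write to it as if it were a plain byte array (the file name and size below are only for illustration).

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class MmapDemo {
    public static void main(String[] args) throws IOException {
        int fileSize = 1024 * 1024; // pre-sized, like RMQ's fixed-size CommitLog segments
        try (FileChannel channel = FileChannel.open(
                Paths.get("mmap-demo.dat"),
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {

            // Map the whole file into the process address space.
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);

            // Writing the file is now just writing memory: no write() system call per message.
            buffer.put("hello mmap".getBytes());

            // Optionally flush the dirty pages to disk explicitly.
            buffer.force();
        }
    }
}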

MappedFile

Next we focus on MappedFile, because RMQ's real high performance comes from its proper use of mmap (memory-mapped file) technology and off-heap ByteBuffer operations, both of which are encapsulated in this class.

At initialization, when RMQ starts, the corresponding thread builds a MappedFile to complete the memory-mapping operation. The following two lines are the key code:

this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
// Perform mmap operation to get mappedByteBuffer
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);

As you can see, fileChannel.map ultimately calls the C library's mmap through JNI, as described in the article "Java file mapping (mmap)". The mappedFile.appendMessage(msg, this.appendMessageCallback) call mentioned in the previous section actually follows the chain appendMessage -> appendMessagesInner -> cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, messageExt). Let's look at appendMessagesInner briefly; I'll pick out the key code:

// Get the current write position
int currentPos = this.wrotePosition.get();
// Use the off-heap writeBuffer if it is enabled, otherwise slice the mmap buffer
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
// Locate the write position
byteBuffer.position(currentPos);
// Call the implementation to append messages to memory
if (messageExt instanceof MessageExtBrokerInner) {
    result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
    result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
// Move the write position and increase the number of bytes written to the offset
this.wrotePosition.addAndGet(result.getWroteBytes());

As can be seen, MappedFile simply keeps writing to memory and moving the end pointer forward, which appends the message content to the memory-mapped region. The actual write of the mapped file to disk happens when the operating system flushes it periodically, when too many dirty pages accumulate, or when the program actively calls MappedByteBuffer.force().
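
The pattern boils down to the following sketch (simplified, not the real MappedFile; names mirror the idea only): slice the mapped buffer, write at the current position, advance an atomic write pointer, and leave flushing to the OS or an explicit force() call.

import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of MappedFile's append pattern; thread safety is assumed
// to be provided by an external lock, as in the putMessage code above.
class AppendOnlyMappedBuffer {
    private final MappedByteBuffer mappedByteBuffer;
    private final int fileSize;
    private final AtomicInteger wrotePosition = new AtomicInteger(0);

    AppendOnlyMappedBuffer(MappedByteBuffer buffer, int fileSize) {
        this.mappedByteBuffer = buffer;
        this.fileSize = fileSize;
    }

    boolean append(byte[] data) {
        int currentPos = wrotePosition.get();
        if (currentPos + data.length > fileSize) {
            return false; // segment full; a new mapped file would be created
        }
        // Work on a slice so the original buffer's position is untouched.
        ByteBuffer slice = mappedByteBuffer.slice();
        slice.position(currentPos);
        slice.put(data);
        // Move the end pointer forward by the number of bytes written.
        wrotePosition.addAndGet(data.length);
        return true;
    }

    void flush() {
        // Explicitly write dirty pages back to disk; otherwise the OS does it lazily.
        mappedByteBuffer.force();
    }
}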

The cb callback here is in fact the DefaultAppendMessageCallback defined inside CommitLog. Let's look at the key part of its code:

byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

msgStoreItemMemory is also a ByteBuffer; it temporarily holds the serialized byte array of the current message. The message's metadata layout can be seen in the following figure:
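
Since the figure is not reproduced here, the following simplified sketch shows the general idea of serializing a message into a temporary ByteBuffer before it is copied into the mapped buffer. The fields and their order are illustrative only, not RocketMQ's exact on-disk format.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: a length-prefixed record with a few metadata fields,
// roughly in the spirit of a CommitLog entry, not the exact layout.
class MessageEncoderSketch {
    static ByteBuffer encode(int queueId, long queueOffset, byte[] body) {
        int totalSize = 4      // total size
                      + 4      // queue id
                      + 8      // queue offset
                      + 8      // store timestamp
                      + 4      // body length
                      + body.length;

        ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(totalSize);
        msgStoreItemMemory.putInt(totalSize);
        msgStoreItemMemory.putInt(queueId);
        msgStoreItemMemory.putLong(queueOffset);
        msgStoreItemMemory.putLong(System.currentTimeMillis());
        msgStoreItemMemory.putInt(body.length);
        msgStoreItemMemory.put(body);
        msgStoreItemMemory.flip();
        // The caller then copies the serialized record into the mapped buffer,
        // e.g. byteBuffer.put(msgStoreItemMemory.array(), 0, totalSize).
        return msgStoreItemMemory;
    }

    public static void main(String[] args) {
        ByteBuffer encoded = encode(0, 42L, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println("encoded bytes: " + encoded.remaining());
    }
}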

PutMessageLock

RMQ's PutMessageLock has two implementations: PutMessageReentrantLock and PutMessageSpinLock. PutMessageReentrantLock is a direct wrapper around the JDK's ReentrantLock, so let's focus on PutMessageSpinLock. We know that a Java heavyweight lock ultimately contends via pthread mutex calls into the kernel, which means plenty of context switches, and a thread that fails to acquire the lock is blocked and only woken up later with some delay. PutMessageSpinLock instead uses CAS plus spinning: the current CPU busy-waits until the lock is acquired, avoiding the cost of context switching. The price is that the busy-wait burns CPU time slices and drives CPU usage up. Let's look at the code:

public class PutMessageSpinLock implements PutMessageLock {
    // true: can lock, false: in lock
    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);

    @Override
    public void lock() {
        boolean flag;
        do {
            // spin + CAS
            flag = this.putMessageSpinLock.compareAndSet(true, false);
        } while (!flag);
    }

    @Override
    public void unlock() {
        this.putMessageSpinLock.compareAndSet(false, true);
    }
}
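
For contrast, here is a minimal sketch of what the ReentrantLock-based counterpart looks like (the real PutMessageReentrantLock may differ in detail): no CPU is burned while waiting, but losing the race means the thread is parked and later woken with a context switch.

import java.util.concurrent.locks.ReentrantLock;

// Minimal sketch of the ReentrantLock-based implementation, for contrast
// with the spin lock above.
class PutMessageReentrantLockSketch implements PutMessageLock {
    private final ReentrantLock lock = new ReentrantLock(); // non-fair by default

    @Override
    public void lock() {
        // Blocks the thread if the lock is held: no busy-waiting,
        // at the cost of a park/unpark and context switch under contention.
        lock.lock();
    }

    @Override
    public void unlock() {
        lock.unlock();
    }
}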

Conclusion

To summarize, this article explains RocketMQ's high performance from the perspective of its underlying storage implementation and the techniques behind it. Of course, RMQ's performance optimizations go well beyond what is covered here; I will stop at this point and explore more in later articles.

Other Articles by author

Github.com/zrbcool/blo…

Wechat subscription account