1. Overview
RocketMQ is Alibaba's open-source distributed messaging middleware. It borrows from Kafka's implementation and supports message publishing and subscription, ordered messages, transactional messages, scheduled messages, message backtracking, dead-letter queues, and other features. The RocketMQ architecture is divided into four main parts, as shown in the following figure:
- Producer: the message producer. It supports distributed cluster deployment.
- Consumer: the message consumer. It supports distributed cluster deployment.
- NameServer: a very simple topic routing registry that supports dynamic registration and discovery of Brokers. Producers and Consumers use the NameServer to learn the routing information of Brokers dynamically.
- Broker: responsible for storing, forwarding, and querying messages.
This article examines how message storage modules are designed in the Broker based on Apache RocketMQ version 4.9.1.
2. Storage architecture
The figure shows the file paths RocketMQ uses for message storage.
CommitLog
The CommitLog is the main storage file for message bodies and metadata. It holds the message content written by Producers, and messages are variable-length. Each file is 1 GB by default. The file name is 20 digits long: the file's start offset, left-padded with zeros. For example, 00000000000000000000 is the first file, with a start offset of 0 and a size of 1 GB = 1073741824 bytes. When the first file is full, the second file is named 00000000001073741824, with a start offset of 1073741824, and so on.
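The naming rule can be illustrated with a few lines of Java. This is a minimal sketch, not RocketMQ's own code: the class and method names are made up for illustration, and only the 1 GB file size and the 20-digit zero-padded name come from the description above.

```java
import java.util.Locale;

public class CommitLogFileNames {
    /** Default CommitLog file size described in the article: 1 GB. */
    static final long FILE_SIZE = 1024L * 1024 * 1024;

    /** Start offset of the file that contains the given physical offset. */
    static long startOffsetOf(long physicalOffset) {
        return physicalOffset - (physicalOffset % FILE_SIZE);
    }

    /** 20-digit, zero-padded file name for a file's start offset. */
    static String fileNameOf(long startOffset) {
        return String.format(Locale.ROOT, "%020d", startOffset);
    }

    public static void main(String[] args) {
        // A message at physical offset 1,500,000,000 lives in the second file.
        System.out.println(fileNameOf(startOffsetOf(0L)));
        System.out.println(fileNameOf(startOffsetOf(1_500_000_000L)));
    }
}
```

Because the name encodes the start offset, the file holding any physical offset can be found from the offset alone, without a separate lookup table.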
ConsumeQueue
The ConsumeQueue is the message consumption queue. ConsumeQueue files can be thought of as index files built on top of the CommitLog. They use a fixed-length design: each entry is 20 bytes, consisting of an 8-byte CommitLog physical offset, a 4-byte message length, and an 8-byte tag hashcode. A single file holds 300,000 entries, each entry can be accessed randomly like an array element, and each ConsumeQueue file is about 5.72 MB in size.
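The file size and the array-like addressing follow directly from the fixed entry length. The sketch below only works through that arithmetic; the class and method names are hypothetical, and the constants are the 20-byte entry and 300,000 entries per file quoted above.

```java
public class ConsumeQueueLayout {
    static final int ENTRY_SIZE = 8 + 4 + 8;      // offset + length + tag hashcode = 20 bytes
    static final int ENTRIES_PER_FILE = 300_000;  // entries in one ConsumeQueue file

    /** Size of one ConsumeQueue file in bytes. */
    static long fileSizeBytes() {
        return (long) ENTRY_SIZE * ENTRIES_PER_FILE;
    }

    /** Returns {file index, byte position inside that file} for a logical queue offset. */
    static long[] locate(long queueOffset) {
        long byteOffset = queueOffset * ENTRY_SIZE;
        return new long[] {byteOffset / fileSizeBytes(), byteOffset % fileSizeBytes()};
    }

    public static void main(String[] args) {
        System.out.println(fileSizeBytes() + " bytes = "
                + fileSizeBytes() / 1024.0 / 1024.0 + " MiB");
        long[] pos = locate(300_001L);
        System.out.println("file " + pos[0] + ", position " + pos[1]);
    }
}
```

6,000,000 bytes is roughly 5.72 MiB, which matches the file size given above.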
IndexFile
The IndexFile is the index file, which provides a way to query messages by key or by time range. A single IndexFile is about 400 MB in size and can hold 20 million index entries. The underlying storage design of the IndexFile is similar to the JDK HashMap data structure.
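The 400 MB figure can be checked from the layout this article describes later in section 3.2.3: a 40-byte header, 5 million hash slots by default, and up to 20 million index entries. The sketch below assumes 4 bytes per slot and 20 bytes per entry (4 + 8 + 4 + 4), which is what the putKey excerpt in that section writes; the class name is hypothetical.

```java
public class IndexFileSize {
    static final int HEADER_SIZE = 40;              // bytes
    static final int SLOT_SIZE = 4;                 // assumed: one int position per hash slot
    static final int INDEX_ITEM_SIZE = 4 + 8 + 4 + 4; // key hash + offset + time diff + next index

    /** Total file size in bytes for the given slot and entry counts. */
    static long totalBytes(long slots, long items) {
        return HEADER_SIZE + slots * SLOT_SIZE + items * INDEX_ITEM_SIZE;
    }

    public static void main(String[] args) {
        long bytes = totalBytes(5_000_000L, 20_000_000L);
        System.out.println(bytes + " bytes = " + bytes / 1024.0 / 1024.0 + " MiB");
    }
}
```

With the default counts this comes to 420,000,040 bytes, or a little over 400 MiB.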
Other files: these include the config folder, which stores runtime configuration; the abort file, which indicates whether the Broker shut down normally; and the checkpoint file, which stores the timestamps of the last flush of the CommitLog, ConsumeQueue, and IndexFile. They are beyond the scope of this article.
Compare this with Kafka, which keeps one file per partition per topic, written sequentially and flushed periodically. Once a single Broker hosts too many topics, those sequential writes degenerate into random writes. A single RocketMQ Broker instead writes the messages of all topics sequentially into the same CommitLog. The cost is on the read side: RocketMQ takes the message's physical offset from the ConsumeQueue and then reads the message from the CommitLog at that offset, which results in random reads.
2.1 Page Cache and Mmap
Before we introduce the Broker message storage module implementation, let’s explain the concepts of Page Cache and Mmap.
The Page Cache is the OS's cache of file data, used to speed up file reads and writes. Generally speaking, a program's sequential file reads and writes are nearly as fast as memory access. The main reason is that the OS sets aside part of memory as the Page Cache to optimize read and write performance. On a write, the OS first writes the data to the cache, and the pdflush kernel thread later flushes it asynchronously from the cache to the physical disk. On a read that misses the Page Cache, the OS reads the data from the physical disk and also prefetches the adjacent blocks in sequence.
Mmap maps a physical file on disk directly into the address space of a user-mode process. This removes the overhead of traditional I/O, which copies file data back and forth between a buffer in the kernel address space and a buffer in the application's address space. FileChannel in Java NIO provides the map() method to implement mmap. See this article for a comparison of FileChannel and mmap read and write performance.
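As a small illustration of FileChannel.map(), the sketch below writes a string through a MappedByteBuffer and then reads the file back with ordinary file I/O. It is a standalone example, not RocketMQ code: the class name, the temporary file, and the roundTrip helper are all made up for the demonstration.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class MmapDemo {
    /** Writes text through a memory mapping, then reads it back with regular file I/O. */
    static String roundTrip(String text) {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        try {
            Path file = Files.createTempFile("mmap-demo", ".bin");
            try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
                 FileChannel channel = raf.getChannel()) {
                // Mapping in READ_WRITE mode extends the file to the requested size.
                MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, data.length);
                mapped.put(data);  // a plain memory write, no write() system call per message
                mapped.force();    // ask the OS to flush the dirty pages to disk
            }
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello mmap"));
    }
}
```

The put call only dirties pages in the Page Cache; the data becomes durable when the OS writes those pages back, which force() requests explicitly.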
2.2 The Broker module
The Broker storage architecture diagram below shows the business flow of the Broker module from receiving a message to returning a response.
Business access layer: RocketMQ implements its underlying communication on Netty's multithreaded Reactor model. eventLoopGroupBoss accepts TCP connections and has a single thread by default. Once a connection is established, it is handed to the Reactor worker thread pool eventLoopGroupSelector, which handles read and write events.
defaultEventExecutorGroup is responsible for SSL verification, encoding and decoding, idle checking, and network connection management. Then, based on the request code of the RemotingCommand, the corresponding processor is looked up in the local cache variable processorTable, the request is wrapped in a task, and the task is submitted to that processor's thread pool for execution. The Broker module uses this four-tier thread pool structure to improve system throughput.
Business processing layer: processes various business requests through RPC calls, among which:
- SendMessageProcessor handles requests from Producers to send messages.
- PullMessageProcessor handles requests from Consumers to pull messages.
- QueryMessageProcessor handles requests to query messages by message key.
Storage logic layer: DefaultMessageStore is RocketMQ’s storage logic core class that provides the ability to store, read, and delete messages.
File mapping layer: maps CommitLog, ConsumeQueue, and IndexFile files to the storage object MappedFile.
Data transfer layer: supports reading and writing messages through mmap memory mapping, and also supports reading messages through mmap while writing them through off-heap memory.
The following sections examine how RocketMQ implements high-performance storage from the source code's point of view.
3. Message writing
Taking the production of a single message as an example, the sequence of steps for writing a message is shown below. The business logic flows between the layers shown in the Broker storage architecture above.
The core low-level code for writing a message is in the asyncPutMessage method of CommitLog. It consists of three steps: obtaining the MappedFile, writing the message to the buffer, and submitting a flush request. Note that these steps are wrapped in a spin lock or a ReentrantLock, which ensures that a single Broker writes messages serially.
//org.apache.rocketmq.store.CommitLog::asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    ...
    putMessageLock.lock(); // spin or ReentrantLock, depending on store config
    try {
        // Get the latest MappedFile
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        ...
        // Write the message to the buffer
        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
        ...
        // Submit the flush request
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
        ...
    } finally {
        putMessageLock.unlock();
    }
    ...
}
Here’s what the three steps do.
3.1 Initializing MappedFile
When the Broker is initialized, it starts AllocateMappedFileService, an asynchronous thread that manages the creation of MappedFiles. The message processing thread and the AllocateMappedFileService thread are connected through the queue requestQueue.
When a message is written, the putRequestAndReturnMappedFile method of AllocateMappedFileService is called to put a MappedFile creation request into requestQueue. Two AllocateRequests are built and queued at the same time: one for the file needed now and one for the file after it.
The AllocateMappedFileService thread loops, taking AllocateRequests from requestQueue and creating the MappedFiles. The message processing thread waits on a CountDownLatch and returns as soon as the first MappedFile has been created.
The next time the message processing thread needs a new MappedFile, it can directly take the one that was created in advance. Pre-creating MappedFiles in this way reduces the time spent waiting for file creation.
//org.apache.rocketmq.store.AllocateMappedFileService::putRequestAndReturnMappedFile
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    // Request the creation of a MappedFile
    AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
    boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
    ...
    // Request that the following MappedFile be created in advance
    AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
    boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
    ...
    // Get the MappedFile created for this request
    AllocateRequest result = this.requestTable.get(nextFilePath);
    ...
}

//org.apache.rocketmq.store.AllocateMappedFileService::run
public void run() {
    ...
    while (!this.isStopped() && this.mmapOperation()) {
    }
    ...
}

//org.apache.rocketmq.store.AllocateMappedFileService::mmapOperation
private boolean mmapOperation() {
    ...
    // Take an AllocateRequest from the queue
    req = this.requestQueue.take();
    ...
    // Check whether the off-heap memory pool is enabled
    if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        // MappedFile backed by off-heap memory
        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
    } else {
        // Ordinary MappedFile
        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
    }
    ...
    // MappedFile warm-up
    if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
        .getMappedFileSizeCommitLog()
        &&
        this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
            this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
    }
    req.setMappedFile(mappedFile);
    ...
}
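The effect of requesting two files at once can be seen in a stripped-down model. The sketch below is not RocketMQ's implementation: it swaps the request queue and CountDownLatch for a CompletableFuture per file, uses a byte array as a stand-in for a MappedFile, and assumes a single caller at a time (as the write lock guarantees in the Broker). All names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PreallocationSketch {
    // Pending or finished allocations, keyed by file path.
    private final ConcurrentMap<String, CompletableFuture<byte[]>> requestTable = new ConcurrentHashMap<>();
    // Single background thread that plays the role of the allocation service.
    private final ExecutorService allocator = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "allocator");
        t.setDaemon(true);
        return t;
    });
    private final int fileSize;

    PreallocationSketch(int fileSize) {
        this.fileSize = fileSize;
    }

    /** Requests both files, then blocks only until the first one is ready. */
    byte[] putRequestAndReturn(String next, String nextNext) {
        submitIfAbsent(next);
        submitIfAbsent(nextNext);
        return requestTable.remove(next).join();
    }

    private void submitIfAbsent(String path) {
        requestTable.computeIfAbsent(path,
            p -> CompletableFuture.supplyAsync(() -> new byte[fileSize], allocator));
    }

    /** True if a file has been requested (or created) but not yet handed out. */
    boolean isPending(String path) {
        return requestTable.containsKey(path);
    }

    public static void main(String[] args) {
        PreallocationSketch service = new PreallocationSketch(1024);
        service.putRequestAndReturn("file-0", "file-1");
        // file-1 is already being created in the background by the time it is needed.
        System.out.println("file-1 requested ahead of time: " + service.isPending("file-1"));
    }
}
```

The second call for "file-1" finds the allocation already submitted, so the caller waits only for whatever work is still outstanding rather than for a full file creation.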
Each time an ordinary MappedFile is created, its mappedByteBuffer is created as well. The following code shows how mmap is done in Java.
//org.apache.rocketmq.store.MappedFile::init
private void init(final String fileName, final int fileSize) throws IOException {
    ...
    this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
    this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
    ...
}
If off-heap memory is enabled (transientStorePoolEnable = true), mappedByteBuffer is used only to read messages and off-heap memory is used to write them, which separates reads from writes. Off-heap buffers are not created each time a MappedFile is created. Instead, they are initialized at system startup according to the size of the off-heap memory pool. Each off-heap DirectByteBuffer is the same size as a CommitLog file, and the memory is locked so that it cannot be swapped out.
//org.apache.rocketmq.store.TransientStorePool
public void init() {
    for (int i = 0; i < poolSize; i++) {
        // Allocate off-heap memory of the same size as a CommitLog file
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
        final long address = ((DirectBuffer) byteBuffer).address();
        Pointer pointer = new Pointer(address);
        // Lock the off-heap memory so that it is not swapped out
        LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
        availableBuffers.offer(byteBuffer);
    }
}
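The pooling idea itself, allocating the buffers once at startup and then borrowing and returning them, can be sketched without the native mlock call. The example below is a simplified illustration with hypothetical names; it omits memory locking entirely, since that requires a native binding such as the LibC interface used above.

```java
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

public class DirectBufferPool {
    private final Deque<ByteBuffer> available = new ConcurrentLinkedDeque<>();

    /** Allocates every buffer up front, so no allocation happens on the write path. */
    DirectBufferPool(int poolSize, int bufferSize) {
        for (int i = 0; i < poolSize; i++) {
            available.offer(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    /** Returns a buffer, or null when the pool is exhausted. */
    ByteBuffer borrow() {
        return available.pollFirst();
    }

    /** Resets position and limit, then puts the buffer back for reuse. */
    void giveBack(ByteBuffer buffer) {
        buffer.clear();
        available.offerFirst(buffer);
    }

    int availableCount() {
        return available.size();
    }

    public static void main(String[] args) {
        DirectBufferPool pool = new DirectBufferPool(2, 4096);
        ByteBuffer buffer = pool.borrow();
        System.out.println("direct=" + buffer.isDirect() + ", left=" + pool.availableCount());
        pool.giveBack(buffer);
    }
}
```

A caller that gets null from borrow() has to decide what to do when the pool is empty, for example fall back to writing through the mapped buffer; that policy is outside this sketch.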
The mmapOperation method above contains MappedFile warm-up logic. Why is file warm-up needed, and how is it done?
The reason is that mmap only establishes a mapping between the process's virtual address space and the file. It does not load the file's contents into memory. If the data being read or written is not in the Page Cache, a page fault occurs and the data has to be loaded from disk into memory, which hurts read and write performance. To avoid page faults, and to stop the operating system from moving the related memory pages to swap space, RocketMQ warms up files as follows.
//org.apache.rocketmq.store.MappedFile::warmMappedFile
public void warmMappedFile(FlushDiskType type, int pages) {
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    int flush = 0;
    // Write a 0 byte into every page of the 1 GB file so that the operating system
    // actually allocates physical memory. Without this fill, no physical memory is
    // allocated up front and page faults occur while messages are being written
    for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
        byteBuffer.put(i, (byte) 0);
        // force flush when flush disk type is sync
        if (type == FlushDiskType.SYNC_FLUSH) {
            if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                flush = i;
                mappedByteBuffer.force();
            }
        }
        // prevent gc
        if (j % 1000 == 0) {
            Thread.sleep(0);
        }
    }
    // force flush when prepare load finished
    if (type == FlushDiskType.SYNC_FLUSH) {
        mappedByteBuffer.force();
    }
    ...
    this.mlock();
}

//org.apache.rocketmq.store.MappedFile::mlock
public void mlock() {
    final long beginTime = System.currentTimeMillis();
    final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
    Pointer pointer = new Pointer(address);
    {
        // Lock the file's Page Cache with the mlock system call so that it is not moved to swap space
        int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
    }
    {
        // Advise the operating system through the madvise system call that the file will be accessed soon
        int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
    }
}
To sum up, RocketMQ creates the next file ahead of time to reduce file creation delays, and it warms up files to avoid page faults when reading and writing.
3.2 Message Writing
3.2.1 Writing the CommitLog
The logical view of each message stored in the CommitLog is shown in the following figure. TOTALSIZE is the storage space occupied by the entire message.
The following table lists the fields contained in each message, along with the space each field occupies and a brief description.
The message is written by calling the appendMessagesInner method of MappedFile.
//org.apache.rocketmq.store.MappedFile::appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
    PutMessageContext putMessageContext) {
    // Decide whether to write through the DirectBuffer or the MappedByteBuffer
    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
    ...
    byteBuffer.position(currentPos);
    AppendMessageResult result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
        (MessageExtBrokerInner) messageExt, putMessageContext);
    ...
    return result;
}

//org.apache.rocketmq.store.CommitLog::doAppend
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
    ...
    ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
    ...
    // This only writes the message to the buffer
    byteBuffer.put(preEncodeBuffer);
    msgInner.setEncodedBuff(null);
    ...
    return result;
}
At this point the message has been written to the ByteBuffer but has not yet been persisted to disk. Section 3.3 explains the flushing mechanism. This raises a question: how are the ConsumeQueue and IndexFile written?
The answer is ReputMessageService, which sits in the storage logic layer of the storage architecture diagram. When the MessageStore is initialized, it starts an asynchronous thread called ReputMessageService, which calls doReput in a loop to notify the ConsumeQueue and IndexFile of updates. The ConsumeQueue and IndexFile can be updated asynchronously because the CommitLog stores the queue and topic information needed to rebuild them. Even if the Broker fails unexpectedly, it can restore the ConsumeQueue and IndexFile from the CommitLog after it restarts.
//org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService::run
public void run() {
    ...
    while (!this.isStopped()) {
        Thread.sleep(1);
        this.doReput();
    }
    ...
}

//org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService::doReput
private void doReput() {
    ...
    // Get the new messages stored in the CommitLog
    DispatchRequest dispatchRequest =
        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
    if (dispatchRequest.isSuccess()) {
        if (size > 0) {
            // If there is a new message, call CommitLogDispatcherBuildConsumeQueue and
            // CommitLogDispatcherBuildIndex to build the ConsumeQueue and IndexFile
            DefaultMessageStore.this.doDispatch(dispatchRequest);
        }
        ...
    }
    ...
}
3.2.2 Writing the ConsumeQueue
As shown in the figure below, each ConsumeQueue record is 20 bytes in total: an 8-byte CommitLog physical offset, a 4-byte message length, and an 8-byte tag hashcode.
The logic that persists a ConsumeQueue record is as follows.
//org.apache.rocketmq.store.ConsumeQueue::putMessagePositionInfo
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {
    ...
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {
        ...
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}
3.2.3 Writing the IndexFile
The logical structure of an IndexFile is shown in the figure below. It resembles the array-plus-linked-list structure of the JDK's HashMap and consists of three parts: the Header, the Slot Table, and the Index Linked List.
Header: the 40-byte header of the IndexFile. It contains the following fields:
- BeginTimestamp: the earliest store time among the messages in this IndexFile.
- EndTimestamp: the latest store time among the messages in this IndexFile.
- BeginPhyoffset: the smallest CommitLog offset among the messages in this IndexFile.
- EndPhyoffset: the largest CommitLog offset among the messages in this IndexFile.
- HashSlotcount: the number of hash slots in use in this IndexFile.
- IndexCount: the number of index entries in use in this IndexFile.
Slot Table: contains 5 million hash slots by default. Each hash slot stores the position of the first IndexItem in the chain for that hash value.
Index Linked List: holds at most 20 million IndexItems by default. Each IndexItem consists of:
- Key Hash: the hash of the message key. A lookup by key compares this hash first and then compares the key itself.
- CommitLog Offset: the physical offset of the message.
- Timestamp: the difference between the time this message was stored and the timestamp of the first message.
- Next Index Offset: the position of the next IndexItem in the chain, saved when a hash conflict occurs.
Each hash slot in the Slot Table holds the position of an IndexItem in the Index Linked List. When a hash conflict occurs, the new IndexItem is inserted at the head of the chain: its Next Index Offset stores the position of the previous head IndexItem, and the hash slot in the Slot Table is overwritten with the position of the new IndexItem. The code is as follows:
//org.apache.rocketmq.store.index.IndexFile::putKey
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    int keyHash = indexKeyHashMethod(key);
    int slotPos = keyHash % this.hashSlotNum;
    int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    ...
    // Read the current head of this slot's chain from the Slot Table
    int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
    ...
    int absIndexPos =
        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
            + this.indexHeader.getIndexCount() * indexSize;
    this.mappedByteBuffer.putInt(absIndexPos, keyHash);
    this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
    // Store the position of the previous head IndexItem
    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
    // Point the hash slot in the Slot Table at the newest IndexItem
    this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
    if (this.indexHeader.getIndexCount() <= 1) {
        this.indexHeader.setBeginPhyOffset(phyOffset);
        this.indexHeader.setBeginTimestamp(storeTimestamp);
    }
    if (invalidIndex == slotValue) {
        this.indexHeader.incHashSlotCount();
    }
    this.indexHeader.incIndexCount();
    this.indexHeader.setEndPhyOffset(phyOffset);
    this.indexHeader.setEndTimestamp(storeTimestamp);
    return true;
    ...
}
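The head-insertion scheme is easier to follow in a small in-memory model. The sketch below uses plain arrays in place of the mapped file and leaves out timestamps and the header; the class name, the use of 0 as the "empty slot" marker, and the hash function are assumptions made for illustration, not RocketMQ's actual values.

```java
import java.util.ArrayList;
import java.util.List;

public class HashIndexSketch {
    private static final int EMPTY = 0;  // assumed marker for "no entry"; entries are numbered from 1
    private final int[] slots;           // slot table: head entry of each chain
    private final int[] keyHash;         // per-entry key hash
    private final long[] phyOffset;      // per-entry CommitLog offset
    private final int[] prev;            // per-entry link to the previous head of the same slot
    private int count = 0;

    HashIndexSketch(int slotNum, int maxItems) {
        slots = new int[slotNum];
        keyHash = new int[maxItems + 1];
        phyOffset = new long[maxItems + 1];
        prev = new int[maxItems + 1];
    }

    /** Appends an entry and makes it the new head of its slot's chain. */
    boolean put(String key, long offset) {
        if (count >= keyHash.length - 1) {
            return false; // index is full
        }
        int hash = hashOf(key);
        int slot = hash % slots.length;
        int index = ++count;
        keyHash[index] = hash;
        phyOffset[index] = offset;
        prev[index] = slots[slot]; // remember the old head
        slots[slot] = index;       // the slot now points at the newest entry
        return true;
    }

    /** Walks the chain from newest to oldest, collecting offsets whose hash matches. */
    List<Long> find(String key) {
        int hash = hashOf(key);
        List<Long> result = new ArrayList<>();
        for (int i = slots[hash % slots.length]; i != EMPTY; i = prev[i]) {
            if (keyHash[i] == hash) {
                result.add(phyOffset[i]);
            }
        }
        return result;
    }

    static int hashOf(String key) {
        return key.hashCode() & 0x7fffffff; // non-negative hash
    }

    public static void main(String[] args) {
        HashIndexSketch index = new HashIndexSketch(1, 8); // one slot forces every key to collide
        index.put("a", 100L);
        index.put("b", 200L);
        index.put("a", 300L);
        System.out.println(index.find("a"));
    }
}
```

Because new entries go to the head of the chain, a lookup returns the most recently written offsets first. Matching on the hash alone can return entries for a different key with the same hash, so a caller still has to compare the actual key after reading the message.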
To sum up, a complete message write consists of a synchronous write to the CommitLog file cache and an asynchronous build of the ConsumeQueue and IndexFile.
3.3 Message Flushing
RocketMQ supports two ways of flushing messages to disk: synchronous flushing and asynchronous flushing.
(1) Synchronous flush: the Broker returns a successful ACK to the Producer only after the message has actually been persisted to disk. Synchronous flushing gives strong guarantees for message reliability but has a significant performance cost. It is commonly used for financial services.
(2) Asynchronous flush: this takes full advantage of the OS Page Cache. As soon as the message is written to the Page Cache, a successful ACK can be returned to the Producer. A background thread then flushes the messages to disk asynchronously, which reduces read and write latency and improves performance and throughput. Asynchronous flushing comes in two variants, with off-heap memory disabled or enabled.
When the CommitLog submits a flush request, it decides between synchronous and asynchronous flushing based on the current Broker configuration.
//org.apache.rocketmq.store.CommitLog::submitFlushRequest
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
    // Synchronous flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            service.putRequest(request);
            return request.future();
        } else {
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            // Asynchronous flush with off-heap memory enabled
            commitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}
The inheritance relationship among GroupCommitService, FlushRealTimeService, and CommitRealTimeService is shown in the figure below.
GroupCommitService: the synchronous flush thread. As shown in the following figure, messages are written to the Page Cache and then flushed synchronously by GroupCommitService. The message processing thread blocks and waits for the flush result.
//org.apache.rocketmq.store.CommitLog.GroupCommitService::run
public void run() {
    ...
    while (!this.isStopped()) {
        this.waitForRunning(10);
        this.doCommit();
    }
    ...
}

//org.apache.rocketmq.store.CommitLog.GroupCommitService::doCommit
private void doCommit() {
    ...
    for (GroupCommitRequest req : this.requestsRead) {
        boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
        for (int i = 0; i < 2 && !flushOK; i++) {
            CommitLog.this.mappedFileQueue.flush(0);
            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
        }
        // Wake up the message processing thread waiting for the flush to complete
        req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
    }
    ...
}

//org.apache.rocketmq.store.MappedFile::flush
public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        ...
        // When writeBuffer is in use or the fileChannel position is not 0, force the flush through fileChannel
        if (writeBuffer != null || this.fileChannel.position() != 0) {
            this.fileChannel.force(false);
        } else {
            // Otherwise force the flush through the MappedByteBuffer
            this.mappedByteBuffer.force();
        }
        ...
    }
}
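The point of group commit is that one physical flush can acknowledge many waiting writers. The following sketch models only that idea: it tracks positions as plain numbers, replaces the real force() call with an assignment, and uses hypothetical class and method names rather than RocketMQ's.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class GroupCommitSketch {
    /** A writer waiting until everything up to nextOffset is on disk. */
    static final class Request {
        final long nextOffset;
        final CompletableFuture<Boolean> future = new CompletableFuture<>();

        Request(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    private final List<Request> pending = new ArrayList<>();
    private long wrotePosition = 0;
    private long flushedPosition = 0;

    /** Appends bytes (to the cache only) and returns a future completed by a later flush. */
    synchronized CompletableFuture<Boolean> append(int bytes) {
        wrotePosition += bytes;
        Request request = new Request(wrotePosition);
        pending.add(request);
        return request.future;
    }

    /** One pass of the flush thread: a single flush acknowledges every request it covers. */
    synchronized int flushOnce() {
        flushedPosition = wrotePosition; // stand-in for FileChannel.force / MappedByteBuffer.force
        int acknowledged = 0;
        for (Request request : pending) {
            request.future.complete(flushedPosition >= request.nextOffset);
            acknowledged++;
        }
        pending.clear();
        return acknowledged;
    }

    public static void main(String[] args) {
        GroupCommitSketch log = new GroupCommitSketch();
        CompletableFuture<Boolean> first = log.append(100);
        CompletableFuture<Boolean> second = log.append(50);
        System.out.println("acknowledged by one flush: " + log.flushOnce());
        System.out.println(first.join() + " " + second.join());
    }
}
```

In a real broker the flush runs on its own thread and the writers block on their futures with a timeout; here everything runs on one thread to keep the example deterministic.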
FlushRealTimeService: the asynchronous flush thread used when off-heap memory is not enabled. As shown in the figure below, once a message has been written to the Page Cache, the message processing thread returns immediately and the disk flush is done asynchronously by FlushRealTimeService.
//org.apache.rocketmq.store.CommitLog.FlushRealTimeService
public void run() {
    ...
    // Determine whether to flush on a fixed schedule
    if (flushCommitLogTimed) {
        // Sleep for the fixed interval
        Thread.sleep(interval);
    } else {
        // Not on a fixed schedule: flush when woken up, or when the wait times out
        this.waitForRunning(interval);
    }
    ...
    // Uses the same forced flush as GroupCommitService
    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
    ...
}
CommitRealTimeService: the asynchronous flush thread used when off-heap memory is enabled. As shown in the figure below, the message processing thread writes the message to off-heap memory and returns immediately. CommitRealTimeService then commits the messages asynchronously from off-heap memory to the Page Cache, and the FlushRealTimeService thread flushes them to disk asynchronously.
Note: once a message has been committed asynchronously to the Page Cache, the business can read it from the MappedByteBuffer.
After a message is written to the off-heap writeBuffer, the isAbleToCommit method checks whether the uncommitted data has reached the minimum number of pages to commit (4 by default). If it has, the data is committed in a batch. Otherwise it stays in off-heap memory, where there is a risk of message loss. With this batching, reads and writes on the Page Cache stay several pages apart, which lowers the probability of read/write conflicts on the Page Cache and achieves read/write separation. The implementation logic is as follows:
//org.apache.rocketmq.store.CommitLog.CommitRealTimeService
class CommitRealTimeService extends FlushCommitLogService {
    @Override
    public void run() {
        while (!this.isStopped()) {
            ...
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
            ...
            // Commit messages to the memory buffer; this ends up calling MappedFile::commit0.
            // Messages are committed only once the minimum number of pages has accumulated,
            // otherwise they stay in off-heap memory
            boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
            if (!result) {
                // Wake up flushCommitLogService to flush to disk
                flushCommitLogService.wakeup();
            }
            ...
            this.waitForRunning(interval);
        }
    }
}

//org.apache.rocketmq.store.MappedFile::commit0
protected void commit0() {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();
    // Commit the messages to the Page Cache; nothing is flushed to disk here
    if (writePos - lastCommittedPosition > 0) {
        ByteBuffer byteBuffer = writeBuffer.slice();
        byteBuffer.position(lastCommittedPosition);
        byteBuffer.limit(writePos);
        this.fileChannel.position(lastCommittedPosition);
        this.fileChannel.write(byteBuffer);
        this.committedPosition.set(writePos);
    }
}
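The page-count check performed by isAbleToCommit is not part of the excerpt above. A minimal sketch of such a check, assuming 4 KB pages and comparing whole-page counts of the write and commit positions, might look like this; the class name is hypothetical and the real method may handle additional cases such as a full file.

```java
public class CommitThreshold {
    static final int OS_PAGE_SIZE = 4 * 1024; // assumed page size

    /**
     * Returns true when at least commitLeastPages whole pages separate the write
     * position from the committed position. With a threshold of 0, any
     * uncommitted byte is enough.
     */
    static boolean isAbleToCommit(int wrotePosition, int committedPosition, int commitLeastPages) {
        if (commitLeastPages > 0) {
            return (wrotePosition / OS_PAGE_SIZE) - (committedPosition / OS_PAGE_SIZE) >= commitLeastPages;
        }
        return wrotePosition > committedPosition;
    }

    public static void main(String[] args) {
        // Three dirty pages are below the default threshold of four pages.
        System.out.println(isAbleToCommit(3 * OS_PAGE_SIZE, 0, 4));
        System.out.println(isAbleToCommit(4 * OS_PAGE_SIZE, 0, 4));
    }
}
```

Data below the threshold stays in off-heap memory until more accumulates, which is the window in which a crash can lose messages.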
Here is a summary of the usage scenarios, advantages, and disadvantages of the three flush mechanisms.
4. Message reading
The message reading logic is much simpler than the writing logic. This section looks at how messages are queried by offset and by key.
4.1 Querying Messages by Offset
To read a message, the Broker first finds the message's physical offset in the ConsumeQueue and then reads the message content from the CommitLog file at that offset.
//org.apache.rocketmq.store.DefaultMessageStore::getMessage
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
    final int maxMsgNums,
    final MessageFilter messageFilter) {
    long nextBeginOffset = offset;
    GetMessageResult getResult = new GetMessageResult();
    final long maxOffsetPy = this.commitLog.getMaxOffset();
    // Find the ConsumeQueue
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    ...
    // Locate the ConsumeQueue MappedFile data for the given offset
    SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
    status = GetMessageStatus.NO_MATCHED_MESSAGE;
    long maxPhyOffsetPulling = 0;
    int i = 0;
    // Upper bound, in bytes, on the ConsumeQueue data scanned in one call
    // (at least 16000 bytes, i.e. 800 entries of 20 bytes)
    final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
    for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
        // CommitLog physical offset
        long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
        int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
        maxPhyOffsetPulling = offsetPy;
        ...
        // Read the message from the CommitLog using its offset and size
        SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
        ...
        // Add the message to the result set
        getResult.addMessage(selectResult);
        status = GetMessageStatus.FOUND;
    }
    // Update the offset
    nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
    long diff = maxOffsetPy - maxPhyOffsetPulling;
    long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
        * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
    getResult.setSuggestPullingFromSlave(diff > memory);
    ...
    getResult.setStatus(status);
    getResult.setNextBeginOffset(nextBeginOffset);
    return getResult;
}
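The two-step lookup can be reproduced with two in-memory buffers: one standing in for the CommitLog and one for the ConsumeQueue. This is a toy model with hypothetical names and fixed small capacities; it ignores file rolling, tags, and the message header, and stores only the raw body.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class OffsetLookupSketch {
    static final int CQ_UNIT = 20; // 8-byte offset + 4-byte size + 8-byte tag hashcode

    private final ByteBuffer commitLog = ByteBuffer.allocate(1024);
    private final ByteBuffer consumeQueue = ByteBuffer.allocate(CQ_UNIT * 16);

    /** Appends the body to the commit log, indexes it, and returns its logical queue offset. */
    long put(String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        long phyOffset = commitLog.position();
        commitLog.put(bytes);
        long queueOffset = consumeQueue.position() / CQ_UNIT;
        consumeQueue.putLong(phyOffset).putInt(bytes.length).putLong(0L); // tag hashcode unused here
        return queueOffset;
    }

    /** Step 1: read offset and size from the queue entry. Step 2: read the body from the log. */
    String get(long queueOffset) {
        int entryPos = (int) (queueOffset * CQ_UNIT);
        long phyOffset = consumeQueue.getLong(entryPos);
        int size = consumeQueue.getInt(entryPos + 8);
        byte[] bytes = new byte[size];
        ByteBuffer view = commitLog.duplicate(); // independent position, shared content
        view.position((int) phyOffset);
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        OffsetLookupSketch store = new OffsetLookupSketch();
        store.put("first");
        long second = store.put("second");
        System.out.println(store.get(second));
    }
}
```

The queue entry is found by multiplication alone, while the body read jumps to an arbitrary position in the log, which is where the random reads discussed in section 2 come from.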
4.2 Querying Messages by Key
To read a message by key, the Broker finds a record in the IndexFile using the topic and key, and then reads the message content from the CommitLog file at the CommitLog offset stored in that record.
//org.apache.rocketmq.store.DefaultMessageStore::queryMessage
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
    QueryMessageResult queryMessageResult = new QueryMessageResult();
    long lastQueryMsgTime = end;
    for (int i = 0; i < 3; i++) {
        // Get the CommitLog physical offsets of the messages recorded in the IndexFile
        QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
        ...
        for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
            long offset = queryOffsetResult.getPhyOffsets().get(m);
            ...
            MessageExt msg = this.lookMessageByOffset(offset);
            if (0 == m) {
                lastQueryMsgTime = msg.getStoreTimestamp();
            }
            ...
            // Read the message content from the CommitLog file
            SelectMappedBufferResult result = this.commitLog.getData(offset, false);
            ...
            queryMessageResult.addMessage(result);
            ...
        }
    }
    return queryMessageResult;
}
The IndexFile looks up the CommitLog physical offsets as follows:
//org.apache.rocketmq.store.index.IndexFile::selectPhyOffset
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
    final long begin, final long end, boolean lock) {
    int keyHash = indexKeyHashMethod(key);
    int slotPos = keyHash % this.hashSlotNum;
    int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    // Get the position of the first IndexItem for this key hash, that is, the head of the list
    int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
    // Iterate over the list nodes
    for (int nextIndexToRead = slotValue; ; ) {
        if (phyOffsets.size() >= maxNum) {
            break;
        }
        int absIndexPos =
            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                + nextIndexToRead * indexSize;
        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
        if (timeDiff < 0) {
            break;
        }
        timeDiff *= 1000L;
        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
        // Add a matching entry to phyOffsets
        if (keyHash == keyHashRead && timeMatched) {
            phyOffsets.add(phyOffsetRead);
        }
        if (prevIndexRead <= invalidIndex
            || prevIndexRead > this.indexHeader.getIndexCount()
            || prevIndexRead == nextIndexToRead || timeRead < begin) {
            break;
        }
        // Continue walking the list
        nextIndexToRead = prevIndexRead;
    }
    ...
}
5. Summary
This article walked through the implementation of the core modules of the RocketMQ storage system from the source code's point of view, covering the storage architecture, message writing, and message reading.
RocketMQ writes the messages of all topics to the CommitLog, which makes writes strictly sequential. It warms up files and locks them in memory to keep the Page Cache from being moved to swap space, which reduces page faults when reading and writing files. It uses mmap to read and write CommitLog files, so file operations become operations on memory addresses, which greatly improves file read and write efficiency.
In scenarios that demand high performance and can tolerate weaker durability guarantees, you can enable off-heap memory to separate reads from writes and improve disk throughput. In short, studying the storage module requires some understanding of operating system principles, and the way its authors push for extreme performance is well worth studying.
6. References
1. RocketMQ official documentation
Author: Zhang Zhenglin, Vivo Internet Server Team