Stress test reports show that, compared with Kafka, RocketMQ's performance indicators (TPS and RT) remain stable as the number of topics grows while Producers and Consumers coexist. This article discusses the reasons for RocketMQ's high performance from the perspective of message storage, focusing on four areas: the message file storage structure, the flow of a message from the Broker to the persistent disk, the flush strategy, and the memory-mapping optimization mechanism.
Message file storage structure
Like Kafka, RocketMQ operates directly on the file system to improve storage efficiency, but RocketMQ goes further and turns message persistence into sequential writes as much as possible. To better understand the message storage structure, the author deployed RocketMQ on a single machine and sent a number of messages. The default RocketMQ storage path is $HOME/store, and the related directory structure is as follows.
$ tree ~/store/commitlog ~/store/consumequeue ~/store/index
/Users/chenyang/store/commitlog
├── 00000000000000000000
└── 00000000001073741824
/Users/chenyang/store/consumequeue
└── TopicTest
    ├── 0
    │   ├── 00000000000000000000
    │   ├── 00000000000006000000
    │   ├── 00000000000012000000
    │   ├── 00000000000018000000
    │   └── 00000000000024000000
    ├── 1
    │   └── ...
    ├── 2
    │   └── ...
    └── 3
        └── ...
/Users/chenyang/store/index
└── 20190626213710317
Kafka uses the Topic as the basic unit of file storage, meaning that each Topic has its own data files and index files. When there are a large number of topics, message persistence degenerates into random disk writes, and disk I/O becomes the main factor limiting system throughput. RocketMQ addresses this by first converting message writes into sequential writes: messages of all topics are written to the same CommitLog. Since messages still need to be consumed per Topic, RocketMQ asynchronously builds multiple logical queues and indexes for each Topic based on the CommitLog: the ConsumeQueue records the position of each message in the CommitLog, and the Index file supports retrieving a message given a Topic and a message Key, which is mainly used for troubleshooting and statistics. ConsumeQueue and Index are also written sequentially.
RocketMQ converts file I/O into memory operations by mapping files directly into user-space memory with mmap. Because the mapping size must be specified when mmap is called, RocketMQ fixes the size of a single CommitLog file at 1 GB; each message and its metadata are appended to the file sequentially, and there may be unused space at the end of the file. A single ConsumeQueue file is 6,000,000 bytes and stores 300,000 records of 20 bytes each. A single Index file is 420,000,040 bytes and consists of an IndexHeader, HashSlots, and MessageIndexes. File names encode the byte offset of the file's first record: the second CommitLog file is named 00000000001073741824 (1024 × 1024 × 1024 = 1073741824), and the second ConsumeQueue file is named 00000000000006000000 (20 × 300,000 = 6,000,000).
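To make the naming rule concrete, the following is a minimal sketch (not RocketMQ's own code; the project's UtilAll.offset2FileName plays this role) that derives the name of the segment file containing a given byte offset, assuming 20-digit zero-padded decimal names as seen in the directory listing above.

// A minimal sketch: map a byte offset to the name of the segment file that contains it.
public final class SegmentNameSketch {
    static final long COMMITLOG_FILE_SIZE = 1024L * 1024 * 1024; // 1 GB per CommitLog file
    static final long CONSUMEQUEUE_FILE_SIZE = 20L * 300_000;    // 20 B per record x 300,000 records

    static String fileNameOf(long offset, long fileSize) {
        long fromOffset = offset - (offset % fileSize); // start offset of the segment = file name
        return String.format("%020d", fromOffset);      // 20-digit zero-padded decimal
    }

    public static void main(String[] args) {
        System.out.println(fileNameOf(1_500_000_000L, COMMITLOG_FILE_SIZE));  // 00000000001073741824
        System.out.println(fileNameOf(7_000_000L, CONSUMEQUEUE_FILE_SIZE));   // 00000000000006000000
    }
}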
RocketMQ writes messages sequentially to the CommitLog in the storage format shown in the following figure. In addition to the attributes of the message itself (message length, message body, Topic length, Topic, message property length, and message properties), each CommitLog entry also records the consume queue the message belongs to (consume queue ID and offset). Because entries are variable-length, if the remaining space of the current CommitLog file cannot hold the next message, the CommitLog appends an entry whose MAGIC CODE equals BLANK_MAGIC_CODE at the end as an end-of-file marker and stores the message in the next CommitLog file.
Unlike the CommitLog, the ConsumeQueue uses fixed-length storage entries, as shown in the following figure. To keep entries fixed-length, the ConsumeQueue stores the hash code of the message Tag rather than the Tag itself; whether a message should be consumed is determined by comparing the hash code of the Tag the Consumer subscribed to with the Tag hash code in the entry.
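As an illustration, the sketch below (not RocketMQ's own code) reads one fixed-length entry and applies Tag filtering. The layout assumed here is an 8-byte CommitLog offset, a 4-byte message size, and an 8-byte tag hash code, which together make up the fixed 20-byte record size mentioned above.

import java.nio.ByteBuffer;

// A sketch of a 20-byte ConsumeQueue entry and consumer-side Tag filtering.
final class ConsumeQueueEntrySketch {
    static final int ENTRY_SIZE = 20;

    final long commitLogOffset; // where the full message lives in the CommitLog
    final int size;             // length of the stored message
    final long tagsCode;        // hash code of the message Tag

    ConsumeQueueEntrySketch(ByteBuffer buf) {
        this.commitLogOffset = buf.getLong();
        this.size = buf.getInt();
        this.tagsCode = buf.getLong();
    }

    // Filtering: compare the hash code of the subscribed Tag with the stored tag hash code.
    boolean matches(long subscribedTagHashCode) {
        return this.tagsCode == subscribedTagHashCode;
    }
}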
With the CommitLog and ConsumeQueue in place, the message sending and consuming logic of the messaging middleware is already covered. RocketMQ additionally introduces the Index file to index messages for troubleshooting: quickly locating messages given a Topic and a Key. The following figure shows the Index file storage structure. The overall design is similar to persisting a HashMap on disk, and hash conflicts are resolved by chaining: each Hash Slot points to a list of Message Indexes, and multiple Message Indexes are linked together via preIndexOffset.
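The lookup path can be sketched as follows. This is a simplified illustration rather than the actual IndexFile code; it assumes a fixed-size header followed by 4-byte hash slots and 20-byte index entries, where each entry stores the key hash, the message's physical offset in the CommitLog, and the position of the previous entry that hashed to the same slot.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// A simplified "HashMap on disk" lookup over an Index file mapped into a ByteBuffer.
final class IndexLookupSketch {
    static final int HEADER_SIZE = 40; // assumed IndexHeader size
    static final int SLOT_SIZE = 4;    // one int per hash slot
    static final int INDEX_SIZE = 20;  // one fixed-length entry per message key

    static List<Long> findPhyOffsets(ByteBuffer indexFile, int hashSlotNum, String key) {
        int keyHash = key.hashCode() & 0x7fffffff;
        int slotPos = keyHash % hashSlotNum;
        int indexPos = indexFile.getInt(HEADER_SIZE + slotPos * SLOT_SIZE); // head of the chain (0 = empty)

        List<Long> phyOffsets = new ArrayList<>();
        while (indexPos > 0) { // walk the chain built by hash collisions
            int entryOffset = HEADER_SIZE + hashSlotNum * SLOT_SIZE + (indexPos - 1) * INDEX_SIZE;
            int storedHash = indexFile.getInt(entryOffset);
            long phyOffset = indexFile.getLong(entryOffset + 4);
            int prevIndexPos = indexFile.getInt(entryOffset + 16); // the preIndexOffset link
            if (storedHash == keyHash) {
                phyOffsets.add(phyOffset); // candidate position of the message in the CommitLog
            }
            indexPos = prevIndexPos;
        }
        return phyOffsets;
    }
}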
This section has covered the core file storage structures behind the RocketMQ message store. Whether it is the CommitLog, ConsumeQueue, or Index, RocketMQ uses a uniform MappedFile abstraction. The rest of this article discusses how RocketMQ builds the CommitLog around MappedFile in combination with memory mapping (the asynchronous building of ConsumeQueue and Index is not covered).
Message storage process
At startup, the Broker registers message processors with the BrokerController and routes each request to the corresponding processor based on its RequestCode. Whereas the NameServer routes all network requests to a single processor, the Broker defines eight message processors (AdminBrokerProcessor, ClientManageProcessor, ConsumerManageProcessor, EndTransactionProcessor, ForwardRequestProcessor, PullMessageProcessor, QueryMessageProcessor, and SendMessageProcessor). SendMessageProcessor is responsible for handling message sending requests; the core code related to its registration is condensed as follows.
// org.apache.rocketmq.broker.BrokerStartup#main
public static void main(String[] args) {
    start(createBrokerController(args));
}
// org.apache.rocketmq.broker.BrokerStartup#createBrokerController
public static BrokerController createBrokerController(String[] args) {
    ...
    try {
        // Configuration initialization.
        final BrokerConfig brokerConfig = new BrokerConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        final NettyClientConfig nettyClientConfig = new NettyClientConfig();
        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig);
        ...
        boolean initResult = controller.initialize();
        ...
        return controller;
    } catch (Throwable e) {
        ...
    }
    return null;
}
// org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {
    ...
    // Create several thread pool services.
    this.registerProcessor();
    // Create and execute periodic tasks.
    ...
}
// org.apache.rocketmq.broker.BrokerController#registerProcessor
public void registerProcessor() {
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE,
        sendProcessor,
        this.sendMessageExecutor);
    ...
}
SendMessageProcessor implements the NettyRequestProcessor interface, and its core message handling logic lives in the processRequest method. RocketMQ supports sending both single messages and batch messages; this section takes sending a single message as an example. Messages of different topics are eventually persisted sequentially to the shared CommitLog, which is backed by a queue of fixed-size files defined as MappedFileQueue. Each file in the MappedFileQueue is a MappedFile, and each MappedFile corresponds to a concrete file used to persist messages to disk. The dependencies between CommitLog, MappedFileQueue, and MappedFile are shown below.
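Schematically, the containment can be sketched as follows (field names simplified, not the full class definitions): a CommitLog owns one MappedFileQueue, which owns an ordered list of MappedFiles, each backed by one fixed-size file on disk.

import java.util.concurrent.CopyOnWriteArrayList;

// A schematic sketch of the containment between CommitLog, MappedFileQueue, and MappedFile.
final class CommitLogSketch {
    final MappedFileQueueSketch mappedFileQueue = new MappedFileQueueSketch();
}

final class MappedFileQueueSketch {
    final CopyOnWriteArrayList<MappedFileSketch> mappedFiles = new CopyOnWriteArrayList<>();
}

final class MappedFileSketch {
    long fileFromOffset; // byte offset of the file's first message, also its file name
    int fileSize;        // 1 GB for a CommitLog segment
}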
DefaultMessageStore is initialized when the Broker starts. It is the top-level abstraction of the RocketMQ message store and is responsible for, among other things, maintaining the CommitLog, asynchronously building the ConsumeQueue and Index (ReputMessageService), allocating memory-mapped MappedFiles (AllocateMappedFileService), and HA (HAService). DefaultMessageStore stores messages into the CommitLog via its putMessage method; the core code is condensed as follows.
// org.apache.rocketmq.store.DefaultMessageStore#putMessage
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    if (this.shutdown) {
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }
    if (!this.runningFlags.isWriteable()) {
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }
    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }
    if (msg.getPropertiesString() != null
            && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }
    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }
    PutMessageResult result = this.commitLog.putMessage(msg);
    ...
    return result;
}
Before a message is stored in the CommitLog, several checks are performed: the state of DefaultMessageStore, the current Broker role, whether DefaultMessageStore allows writes, whether the Topic or Properties are too long, and whether the PageCache is busy. After these checks pass, the putMessage method of CommitLog appends the message to the last MappedFile in the MappedFileQueue. The core flow of putMessage (delayed messages are not considered here) is: try to get the last MappedFile, then lock the CommitLog so that appending to it is a serial operation; if no MappedFile is obtained or the MappedFile is full, create a new MappedFile; append the message to the MappedFile; if END_OF_FILE is returned, the MappedFile does not have enough space left, so create a new MappedFile and re-append the message to it; finally, release the CommitLog lock. The core code of this logic is condensed as follows.
// org.apache.rocketmq.store.CommitLog#putMessage
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    ...
    AppendMessageResult result = null;
    ...
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    putMessageLock.lock(); // spin or ReentrantLock, depending on store config
    try {
        ...
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0);
        }
        ...
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                // Create a new file and re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                ...
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case ...
        }
        ...
    } finally {
        putMessageLock.unlock();
    }
    // This section does not discuss the Broker flush strategy and HA mechanism
    handleDiskFlush(result, putMessageResult, msg);
    handleHA(result, putMessageResult, msg);
    return putMessageResult;
}
Let's look more closely at how a MappedFile is created. DefaultMessageStore starts the AllocateMappedFileService thread during initialization. When a MappedFile needs to be created, its file name is first computed from startOffset, covering two scenarios: if no MappedFile exists (the first message is being sent, or historical MappedFiles have been cleaned up), createOffset cannot simply equal startOffset; it is aligned to the file size as startOffset - (startOffset % mappedFileSize), so that file names stay consistent with file boundaries. If the last MappedFile exists and is full, createOffset equals the fileFromOffset of the last MappedFile plus mappedFileSize. The message-processing thread then builds two consecutive AllocateRequests based on createOffset and inserts them into the requestQueue maintained by the AllocateMappedFileService thread. The AllocateMappedFileService thread reads AllocateRequests from the requestQueue and creates the corresponding MappedFiles asynchronously; during creation, the message-processing thread waits synchronously, via a CountDownLatch, for the MappedFile to be created. Why not have the message-processing thread create the MappedFile synchronously itself, instead of submitting an AllocateRequest and delegating the work to the AllocateMappedFileService thread? The core code that builds AllocateRequests and inserts them into the requestQueue is condensed below.
// org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile
public MappedFile getLastMappedFile(final long startOffset) {
    return getLastMappedFile(startOffset, true);
}
// org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    long createOffset = -1;
    MappedFile mappedFileLast = getLastMappedFile();
    if (mappedFileLast == null) {
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    }
    if (mappedFileLast != null && mappedFileLast.isFull()) {
        createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    }
    if (createOffset != -1 && needCreate) {
        String nextFilePath = this.storePath + File.separator +
            UtilAll.offset2FileName(createOffset);
        String nextNextFilePath = this.storePath + File.separator
            + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
        MappedFile mappedFile = null;
        if (this.allocateMappedFileService != null) {
            mappedFile =
                this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
        }
        ...
        return mappedFile;
    }
    return mappedFileLast;
}
// org.apache.rocketmq.store.AllocateMappedFileService#putRequestAndReturnMappedFile
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    int canSubmitRequests = 2;
    ...
    AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
    boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
    if (nextPutOK) {
        ...
        boolean offerOK = this.requestQueue.offer(nextReq);
        ...
        canSubmitRequests--;
    }
    AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
    boolean nextNextPutOK =
        this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
    if (nextNextPutOK) {
        if (canSubmitRequests <= 0) {
            ...
            this.requestTable.remove(nextNextFilePath);
        } else {
            boolean offerOK = this.requestQueue.offer(nextNextReq);
            ...
        }
    }
    ...
    AllocateRequest result = this.requestTable.get(nextFilePath);
    try {
        if (result != null) {
            boolean waitOK = result.getCountDownLatch().await(waitTimeOut,
                TimeUnit.MILLISECONDS);
            if (!waitOK) {
                return null;
            } else {
                this.requestTable.remove(nextFilePath);
                return result.getMappedFile();
            }
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
    }
    return null;
}
To answer the question above: the AllocateMappedFileService thread continuously takes AllocateRequests from the requestQueue; AllocateRequest implements the Comparable interface, so the request with the smaller file name is processed first. When a MappedFile needs to be created, two AllocateRequests are built at the same time. The message-processing thread waits on a CountDownLatch for the AllocateMappedFileService thread to finish creating the first MappedFile, turning the asynchronous creation into a synchronous call (RocketMQ uses CountDownLatch to convert asynchronous operations into synchronous ones in many places), while the second MappedFile is still created asynchronously by the AllocateMappedFileService thread. The next time the message-processing thread needs a MappedFile, it can directly pick up the one that has already been created. The core logic of the AllocateMappedFileService thread that creates MappedFiles is condensed below.
// org.apache.rocketmq.store.AllocateMappedFileService#run
public void run() {
    while (!this.isStopped() && this.mmapOperation()) {
    }
}
// org.apache.rocketmq.store.AllocateMappedFileService#mmapOperation
private boolean mmapOperation() {
    boolean isSuccess = false;
    AllocateRequest req = null;
    try {
        req = this.requestQueue.take();
        AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
        ...
        if (req.getMappedFile() == null) {
            MappedFile mappedFile;
            if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                try {
                    mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                    mappedFile.init(req.getFilePath(), req.getFileSize(),
                        messageStore.getTransientStorePool());
                } catch (RuntimeException e) {
                    ...
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(),
                        messageStore.getTransientStorePool());
                }
            } else {
                mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
            }
            ...
            req.setMappedFile(mappedFile);
            ...
        }
    } catch (InterruptedException e) {
        ...
    } catch (IOException e) {
        ...
    } finally {
        ...
        req.getCountDownLatch().countDown();
    }
    return true;
}
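The async-to-sync pattern used above can be illustrated in isolation. The following is a generic sketch (not RocketMQ code): the requesting thread hands work to a dedicated worker thread through a queue and then blocks on a CountDownLatch until the worker has produced the result.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// A generic sketch of turning asynchronous work into a synchronous call with CountDownLatch.
final class AsyncToSyncSketch {
    static final class Request {
        final String filePath;
        final CountDownLatch latch = new CountDownLatch(1);
        volatile Object result;
        Request(String filePath) { this.filePath = filePath; }
    }

    private final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();

    // Worker thread: consumes requests and fulfils them asynchronously.
    void workerLoop() throws InterruptedException {
        while (true) {
            Request req = queue.take();
            req.result = "created:" + req.filePath; // stand-in for creating a MappedFile
            req.latch.countDown();                  // wake up the waiting requester
        }
    }

    // Requesting thread: submits the request, then waits synchronously for completion.
    Object submitAndWait(String filePath, long timeoutMillis) throws InterruptedException {
        Request req = new Request(filePath);
        queue.offer(req);
        return req.latch.await(timeoutMillis, TimeUnit.MILLISECONDS) ? req.result : null;
    }
}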
Continuing with how a MappedFile is created: if isTransientStorePoolEnable is true, the MappedFile requests a block of off-heap memory (DirectByteBuffer) from the TransientStorePool to use as its writeBuffer; messages are written to the writeBuffer first, then committed to the fileChannel and flushed. Otherwise, the MappedFile creates a memory-mapped byte buffer, mappedByteBuffer; messages are written to the mappedByteBuffer and then flushed. After a message is written, wrotePosition is updated (the data has not yet been flushed to disk). Why RocketMQ writes messages in these two different ways is not discussed in this article. The core code for appending a message to a MappedFile is condensed as follows.
// org.apache.rocketmq.store.MappedFile#appendMessage
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg,
        final AppendMessageCallback cb) {
    return appendMessagesInner(msg, cb);
}
// org.apache.rocketmq.store.MappedFile#appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt,
        final AppendMessageCallback cb) {
    int currentPos = this.wrotePosition.get();
    if (currentPos < this.fileSize) {
        ByteBuffer byteBuffer = writeBuffer != null ?
            writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result = null;
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer,
                this.fileSize - currentPos,
                (MessageExtBrokerInner) messageExt);
        } else {
            ...
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());
        ...
        return result;
    }
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
// org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend
public AppendMessageResult doAppend(final long fileFromOffset,
        final ByteBuffer byteBuffer,
        final int maxBlank,
        final MessageExtBrokerInner msgInner) {
    // PHY OFFSET
    long wroteOffset = fileFromOffset + byteBuffer.position();
    ...
    final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
    ...
    // Determine whether there is sufficient free space
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
        this.msgStoreItemMemory.putInt(maxBlank);
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
        return new AppendMessageResult(...);
    }
    // Initialization of storage space
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
    AppendMessageResult result = new AppendMessageResult(...);
    ...
    return result;
}
If there is enough space left for the message, set the position of the byteBuffer (writeBuffer or mappedByteBuffer) to wrotePosition and call its put method to write the byte array. If the MappedFile does not have enough space left (msgLen + END_FILE_MIN_BLANK_LENGTH > maxBlank), write BLANK_MAGIC_CODE into the byteBuffer and return END_OF_FILE; the message-processing thread then creates a new MappedFile and appends the message to it, as shown in the following figure.
Flush strategy
The RocketMQ flush process is divided into a commit phase and a flush phase, handled by commitLogService and flushCommitLogService respectively, as shown in the following figure. In the commit phase, if isTransientStorePoolEnable is true, data is written from the writeBuffer into the fileChannel; otherwise the data simply stays in the mappedByteBuffer. In the flush phase, data is persisted to disk from the fileChannel or the mappedByteBuffer.
When the CommitLog is constructed, commitLogService is instantiated as CommitRealTimeService; for synchronous flushing, flushCommitLogService is instantiated as GroupCommitService, and for asynchronous flushing it is instantiated as FlushRealTimeService. CommitRealTimeService, GroupCommitService, and FlushRealTimeService all extend ServiceThread and implement the Runnable interface; their threads are created after the Broker starts and execute their work in a loop. The CommitLog constructor and startup code are condensed below.
// org.apache.rocketmq.store.CommitLog#CommitLog
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    ...
    if (FlushDiskType.SYNC_FLUSH ==
            defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        this.flushCommitLogService = new GroupCommitService();
    } else {
        this.flushCommitLogService = new FlushRealTimeService();
    }
    this.commitLogService = new CommitRealTimeService();
    ...
}
// org.apache.rocketmq.store.CommitLog#start
public void start() {
    this.flushCommitLogService.start();
    if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        this.commitLogService.start();
    }
}
The CommitRealTimeService thread commits writeBuffer data to the fileChannel. A commit is triggered by either of the following two conditions.
- The number of pages of data pending commit in the writeBuffer is greater than or equal to commitCommitLogLeastPages (default 4 pages of 4 KB each); that is, a commit is performed once more than 16 KB of data is pending.
- No commit has been performed within the last commitCommitLogThoroughInterval (default 200 ms); a commit is then performed proactively regardless of how much data is pending (see the sketch after this list).
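The first condition is a page-count threshold. A condensed, self-contained sketch of the check (modelled on MappedFile's commitLeastPages logic, simplified here) looks like this:

// A sketch of the page-threshold check: commit only when enough whole OS pages are pending,
// unless the threshold is 0, which commits whatever is pending.
final class CommitThresholdSketch {
    static final int OS_PAGE_SIZE = 1024 * 4;

    static boolean shouldCommit(int wrotePosition, int committedPosition, int commitLeastPages) {
        if (commitLeastPages > 0) {
            return (wrotePosition / OS_PAGE_SIZE) - (committedPosition / OS_PAGE_SIZE) >= commitLeastPages;
        }
        return wrotePosition > committedPosition; // threshold 0: commit any pending data
    }
}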
These two conditions allow writeBuffer data to be batched before being written to the fileChannel, improving I/O performance. After the commit completes, the CommitRealTimeService thread wakes up the flushCommitLogService thread to flush. The code corresponding to this process is condensed as follows.
// org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run
@Override
public void run() {
    while (!this.isStopped()) {
        ...
        int commitDataLeastPages = ...;
        int commitDataThoroughInterval = ...;
        long begin = System.currentTimeMillis();
        if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
            this.lastCommitTimestamp = begin;
            commitDataLeastPages = 0;
        }
        try {
            boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
            long end = System.currentTimeMillis();
            if (!result) {
                this.lastCommitTimestamp = end; // result = false means some data committed.
                // now wake up flush thread.
                flushCommitLogService.wakeup();
            }
            ...
        } catch (Throwable e) {
            ...
        }
    }
    ...
}
This section does not dwell on every detail of the commit operation. The committedPosition and wrotePosition maintained by MappedFile mark the start and end offsets of the writeBuffer data yet to be committed; the commit operation builds a ByteBuffer over this range and writes it to the fileChannel. The core code of this process is as follows.
// org.apache.rocketmq.store.MappedFile#commit0
protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();
    if (writePos - this.committedPosition.get() > 0) {
        try {
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            ...
        }
    }
}
Unlike Kafka, RocketMQ offers synchronous flushing in addition to asynchronous flushing. The implementation of synchronous flushing is similar to MappedFile creation: a GroupCommitRequest is written to a request queue, and the GroupCommitService thread consumes the request asynchronously. For asynchronous flushing, if isTransientStorePoolEnable is true, the CommitRealTimeService thread is woken up to commit data from the writeBuffer to the fileChannel; otherwise, the FlushRealTimeService thread is woken up to flush the mappedByteBuffer. The core code that initiates a flush is condensed below.
// org.apache.rocketmq.store.CommitLog#handleDiskFlush
public void handleDiskFlush(AppendMessageResult result,
        PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronous flush
    if (FlushDiskType.SYNC_FLUSH ==
            this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request =
                new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            boolean flushOK = request.waitForFlush(...);
            if (!flushOK) {
                ...
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}
To avoid lock contention over GroupCommitRequests, the GroupCommitService thread maintains a read queue requestsRead and a write queue requestsWrite, so that submitting and consuming GroupCommitRequests do not block each other. After consuming the requestsRead queue, the GroupCommitService thread clears requestsRead and swaps requestsRead with requestsWrite. The core code of this logic is condensed as follows.
class GroupCommitService extends FlushCommitLogService {
    private volatile List<GroupCommitRequest> requestsWrite =
        new ArrayList<GroupCommitRequest>();
    private volatile List<GroupCommitRequest> requestsRead =
        new ArrayList<GroupCommitRequest>();

    public synchronized void putRequest(final GroupCommitRequest request) {
        synchronized (this.requestsWrite) {
            this.requestsWrite.add(request);
        }
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown(); // notify
        }
    }

    private void swapRequests() {
        List<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    }

    private void doCommit() {
        synchronized (this.requestsRead) {
            if (!this.requestsRead.isEmpty()) {
                for (GroupCommitRequest req : this.requestsRead) {
                    // Invoke force() to flush fileChannel/mappedByteBuffer to disk.
                }
                ...
                this.requestsRead.clear();
            } else {
                ...
            }
        }
    }

    public void run() {
        while (!this.isStopped()) {
            try {
                this.waitForRunning(10);
                this.doCommit();
            } catch (Exception e) {
                ...
            }
        }
        ...
        synchronized (this) {
            this.swapRequests();
        }
        this.doCommit();
    }
    ...
}
Regardless of whether isTransientStorePoolEnable is enabled, asynchronous flushing is ultimately handled by the FlushRealTimeService thread. In handleDiskFlush, if isTransientStorePoolEnable is true only the CommitRealTimeService thread is woken up, but after committing, the CommitRealTimeService thread in turn wakes up the FlushRealTimeService thread. The FlushRealTimeService thread maintains lastFlushTimestamp to record the time of the last flush. The FlushRealTimeService thread triggers a flush in the same way the CommitRealTimeService thread triggers a commit: the number of pages of data pending flush in the fileChannel or mappedByteBuffer is greater than or equal to flushPhysicQueueLeastPages (default 4 pages), or the interval since the last flush exceeds flushPhysicQueueThoroughInterval (default 10 s). Finally, the FlushRealTimeService thread calls the force method of the fileChannel or mappedByteBuffer to flush the data to disk.
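For reference, the asynchronous flush loop can be sketched as follows. This is a condensed sketch (simplified from FlushRealTimeService#run and mirroring the CommitRealTimeService code shown earlier), showing how the two triggers above combine: the thread wakes up periodically, drops the page threshold to 0 once flushPhysicQueueThoroughInterval has elapsed, and then flushes the mappedFileQueue.

// org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run (condensed sketch)
public void run() {
    while (!this.isStopped()) {
        int interval = ...;                         // wake-up interval
        int flushPhysicQueueLeastPages = ...;       // default 4 pages
        int flushPhysicQueueThoroughInterval = ...; // default 10 s

        long begin = System.currentTimeMillis();
        if (begin >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
            this.lastFlushTimestamp = begin;
            flushPhysicQueueLeastPages = 0;         // force a flush regardless of pending pages
        }
        try {
            this.waitForRunning(interval);          // also woken up by the commit thread
            CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
        } catch (Throwable e) {
            ...
        }
    }
    ...
}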
Memory mapping optimization
RocketMQ uses mmap to map a region of kernel-space memory into user space. Once the mapping is established, changes made by the application to this region are reflected directly in kernel space, and vice versa. Compared with read/write system calls, mmap reduces data copying between kernel space and user space and can improve I/O efficiency when large amounts of data are transferred. However, mmap only establishes the mapping between the file's disk address and the virtual address; physical memory is not yet populated with the file contents. When the file is actually read or written, a page fault occurs and the thread traps into the kernel, which then loads the corresponding file contents into physical memory. RocketMQ designed the MappedFile warm-up mechanism for this scenario.
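As a minimal, self-contained illustration of such a mapping from Java (MappedFile's initialization does something along these lines; the file name and size below are illustrative only):

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

// Establish an mmap-backed buffer over a fixed-size file; pages are populated lazily on first touch.
final class MmapSketch {
    public static void main(String[] args) throws Exception {
        long fileSize = 1024L * 1024 * 1024; // 1 GB, matching a CommitLog segment
        try (RandomAccessFile raf = new RandomAccessFile("/tmp/00000000000000000000", "rw")) {
            FileChannel fileChannel = raf.getChannel();
            // map() only sets up the virtual-memory mapping; no physical pages are filled yet.
            MappedByteBuffer mappedByteBuffer =
                fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            // The first write to a page triggers a page fault that allocates/loads the page.
            mappedByteBuffer.put(0, (byte) 0);
        }
    }
}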
Recall the MappedFile creation process: the AllocateMappedFileService thread polls the AllocateRequest queue and creates MappedFiles, at which point a fixed-size file already exists in the file system. If MappedFile warm-up (warmMapedFileEnable) is enabled and the mapped size of the MappedFile is greater than or equal to mapedFileSizeCommitLog (1 GB), RocketMQ calls the warmMappedFile method to warm up the MappedFile. The core code of this logic is condensed as follows.
// org.apache.rocketmq.store.AllocateMappedFileService#mmapOperation
private boolean mmapOperation() {
    boolean isSuccess = false;
    AllocateRequest req = null;
    try {
        req = this.requestQueue.take();
        ...
        if (req.getMappedFile() == null) {
            MappedFile mappedFile;
            if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                mappedFile.init(req.getFilePath(), req.getFileSize(),
                    messageStore.getTransientStorePool());
            } else {
                mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
            }
            ...
            // pre write mappedFile
            if (mappedFile.getFileSize() >= getMapedFileSizeCommitLog()
                    && isWarmMapedFileEnable()) {
                mappedFile.warmMappedFile(getFlushDiskType(),
                    getFlushLeastPagesWhenWarmMapedFile());
            }
            ...
        }
    } catch (InterruptedException e) {
        ...
    } catch (IOException e) {
        ...
    } finally {
        ...
    }
    return true;
}
warmMappedFile writes a 0 byte into the mappedByteBuffer every OS_PAGE_SIZE bytes, causing a page fault on each page so that the operating system allocates physical memory for it. If the flush policy is synchronous, a force is also performed every few pages. Finally, the mlock method is called via JNA to lock the physical memory backing the mappedByteBuffer, preventing the operating system from swapping the related pages out to swap space and thereby improving read and write performance when accessing the MappedFile. The core code of warmMappedFile is condensed as follows.
// org.apache.rocketmq.store.MappedFile#warmMappedFile
public void warmMappedFile(FlushDiskType type, int pages) {
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    int flush = 0;
    for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
        byteBuffer.put(i, (byte) 0);
        if (type == FlushDiskType.SYNC_FLUSH) {
            if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                flush = i;
                mappedByteBuffer.force();
            }
        }
        // prevent gc
        if (j % 1000 == 0) {
            ...
        }
    }
    // force flush when prepare load finished
    if (type == FlushDiskType.SYNC_FLUSH) {
        mappedByteBuffer.force();
    }
    this.mlock();
}
// org.apache.rocketmq.store.MappedFile#mlock
public void mlock() {
    final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
    Pointer pointer = new Pointer(address);
    {
        int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
    }
    ...
}
Conclusion
RocketMQ makes numerous optimizations to achieve high-performance messaging. This article analyzed the design and implementation of RocketMQ message storage from the source code, covering the sequential-write storage structure, MappedFile creation, using an asynchronous thread together with CountDownLatch to turn asynchronous task execution into a synchronous call, off-heap memory (TransientStorePool), MappedFile warm-up, and memory locking via JNA.
References
[1] https://github.com/apache/rocketmq
[2] http://jm.taobao.org/2016/04/07/kafka-vs-rocketmq-topic-amout/
[3] https://rocketmq.apache.org/docs/quick-start/
[4] http://man7.org/linux/man-pages/man2/mmap.2.html
[5] http://man7.org/linux/man-pages/man2/mlock.2.html