An overview of the optimizations
RocketMQ 4.9.1 includes a batch of broker-side performance optimizations; the corresponding PRs are collected under ISSUE#2883.
Compared with version 4.9.0, real-time produce TPS for small messages is roughly 28% higher.
The issue collects the following commits to improve produce performance in M/S (master/slave) mode:
- Change log level to debug: “Half offset {} has been committed/rolled back”
- Optimise lock in WaitNotifyObject
- Remove lock in HAService
- Remove lock in GroupCommitService
- Eliminate array copy in HA
- Remove putMessage/putMessages method in CommitLog which has too many duplicated code.
- Change default value of some parameters: sendMessageThreadPoolNums/useReentrantLockWhenPutMessage/flushCommitLogTimed/endTransactionThreadPoolNums
- Optimise performance of asyncPutMessage (extract some code out of putMessage lock)
- extract generation of msgId out of lock in CommitLog (now only for single message processor)
- extract generation of topicQueueTable key out of sync code
- extract generation of msgId out of lock in CommitLog (for batch)
- fix ipv6 problem introduced in commit “Optimise performance of asyncPutMessage (extract some code out of putMessage lock)”
- Remove an duplicate MessageDecoder.string2messageProperties for each message, and prevent store “WAIT=true” property (in most case) to save 9 bytes for each message.
- Improve performance of string2messageProperties/messageProperties2String, and save 1 byte for each message.
- Optimise parse performance for SendMessageRequestHeaderV2
The following sections analyze these optimization points, and the reasoning behind them, at the source-code level. Understanding them requires some familiarity with the RocketMQ source; where it helps, the necessary background is introduced before each optimization.
Optimization analysis
Transaction Message Log Optimization (1)
- Change log level to debug: “Half offset {} has been committed/rolled back”
With the default configuration, one log line was printed for each transactional message processed here. The change lowers the statement from INFO to DEBUG, which effectively removes this logging from the transactional message path.
```java
while (true) {
    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
        break;
    }
    if (removeMap.containsKey(i)) {
        log.debug("Half offset {} has been committed/rolled back", i); // previously log.info
        Long removedOpOffset = removeMap.remove(i);
        doneOpOffset.add(removedOpOffset);
    }
```
The lesson from this relatively simple optimization is to be careful with logging on hot paths: for high-performance middleware such as RocketMQ, the logging itself can consume a noticeable amount of CPU.
In addition, if the log statement involves work such as string concatenation, the cost is even higher and should be avoided.
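As a small illustration (using the SLF4J-style API rather than RocketMQ's own logging classes), parameterized logging at an appropriate level avoids paying the formatting cost on every call:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingCostExample {
    private static final Logger log = LoggerFactory.getLogger(LoggingCostExample.class);

    void markDone(long offset) {
        // Costly: the string is concatenated on every call, even when DEBUG output is disabled.
        log.debug("Half offset " + offset + " has been committed/rolled back");

        // Cheaper: with a placeholder, formatting only happens if the DEBUG level is enabled.
        log.debug("Half offset {} has been committed/rolled back", offset);
    }
}
```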
Optimize/remove locks in master/slave replication and synchronous flush (2-4)
Improve produce performance in M/S mode
- Optimise lock in WaitNotifyObject
- Remove lock in HAService
- Remove lock in GroupCommitService
Before looking at the individual tricks, it helps to understand how master/slave replication and synchronous flush work in RocketMQ; the two follow essentially the same pattern.
Background: the producer/consumer pattern behind master/slave replication and synchronous flush
Within RocketMQ, both master/slave replication and synchronous flush are handled cooperatively by multiple threads. Take GroupTransferService as an example: the message-processing threads (many of them) keep accepting messages and producing replication requests, while a single ServiceThread consumes the replication results. RocketMQ batches this work using a pair of buffers (double buffering).
As shown in the figure below, the consumer can process data while producers keep adding more without interference. In the first phase there are three requests in the producer buffer and two in the consumer buffer; since the consumer is a single thread with nothing competing against it, it can handle its two requests as a batch. When it finishes, it swaps the references of the two buffers, so in the second phase it can batch-process the three requests that accumulated.
Optimization 1: replace synchronized with a spin lock
RocketMQ previously used the heavyweight synchronized keyword to make the producer-side putRequest(), the buffer-swapping swapRequests(), and the internal processing thread-safe.
In fact putRequest() does nothing more than append a request to a list, and swapRequests() only swaps two references; both critical sections are very short, so a spin lock is enough. Each acquire/release then costs only a couple of CAS operations and involves no thread switching, as in the sketch below.
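The pattern looks roughly like the following sketch. This is a simplified illustration with hypothetical names, not the actual GroupCommitService/GroupTransferService code: producers append to the write list inside a very short spin-locked section, and the single consumer swaps the two lists and then processes its batch without contention.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class DoubleBufferedService<T> {
    private final AtomicBoolean spinLock = new AtomicBoolean(false);
    private List<T> requestsWrite = new ArrayList<>();
    private List<T> requestsRead = new ArrayList<>();

    private void lock() {
        // Spin: the critical sections below are only a few instructions long.
        while (!spinLock.compareAndSet(false, true)) { }
    }

    private void unlock() {
        spinLock.set(false);
    }

    // Called by many message-processing threads (the producers).
    public void putRequest(T request) {
        lock();
        try {
            requestsWrite.add(request);
        } finally {
            unlock();
        }
    }

    // Called only by the single service thread (the consumer).
    private void swapRequests() {
        lock();
        try {
            List<T> tmp = requestsWrite;
            requestsWrite = requestsRead;
            requestsRead = tmp;
        } finally {
            unlock();
        }
    }

    // One round of the consumer loop: grab everything produced so far, then
    // process the batch without contention (only this thread reads requestsRead).
    public void doService(Consumer<T> handler) {
        swapRequests();
        requestsRead.forEach(handler);
        requestsRead.clear();
    }
}
```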
Optimization 2: The WaitNotifyObject class
WaitNotifyObject implements asynchronous notification between threads and is used in the master/slave replication logic. Its usage resembles the wait()/notifyAll() wait-notification mechanism of synchronized.
The master/slave replication threads transfer data in a loop; when there is no data to transfer, they wait by calling WaitNotifyObject#allWaitForRunning().
After the CommitLog saves the message, the WaitNotifyObject#wakeUpAll() method is called to wake up the master and slave replication threads.
This optimization reduces the number of times you need to enter a synchronized code block.
Change points: waitingThreadTable becomes a ConcurrentHashMap so that operations on it can move out of the synchronized block, and the volatile boolean hasNotified becomes an AtomicBoolean, as sketched below.
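The effect of these changes can be sketched as follows. This is a simplified illustration rather than the full WaitNotifyObject, but it shows how an AtomicBoolean lets the common cases complete with a single CAS instead of entering the synchronized block:

```java
import java.util.concurrent.atomic.AtomicBoolean;

class NotifySketch {
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);

    // Producer side: enters the synchronized block only when a wakeup is really needed.
    public void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            synchronized (this) {
                this.notify();
            }
        }
    }

    // Consumer side: a pending notification is consumed with a single CAS, no monitor needed.
    public void waitForRunning(long timeoutMillis) throws InterruptedException {
        if (hasNotified.compareAndSet(true, false)) {
            return; // fast path: work was already signalled
        }
        synchronized (this) {
            if (hasNotified.compareAndSet(true, false)) {
                return; // signalled between the first check and entering the monitor
            }
            this.wait(timeoutMillis);
            hasNotified.set(false);
        }
    }
}
```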
Eliminate unnecessary array copying in master/slave replication (5)
- Eliminate array copy in HA
To understand this optimization, you need to learn the basics, including the memory-mapped files used by the CommitLog in RocketMQ, and the master/slave replication process.
Memory-mapped files (mmap)
The CommitLog in RocketMQ is a memory-mapped file (mmap). The following diagram compares ordinary I/O with memory-mapped I/O.
mmap maps the file's page cache directly into the user address space, so file reads and writes no longer copy data between a user-space buffer and the PageCache; they become operations on the mapped addresses. Random reads and writes of the file then approach memory speed (for addresses that are already mapped in).
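As a minimal illustration of the idea (not RocketMQ's MappedFile implementation), a file can be mapped and written through a MappedByteBuffer like this:

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class MmapExample {
    public static void main(String[] args) throws IOException {
        int fileSize = 1024 * 1024; // fixed-size region, pre-allocated like a CommitLog segment
        try (FileChannel channel = FileChannel.open(Paths.get("demo-commitlog"),
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            // Writes go straight through the mapped page cache; no write() syscall per message.
            buffer.put("hello".getBytes(StandardCharsets.UTF_8));
            buffer.force(); // explicit flush to disk, analogous to a commit-log flush
        }
    }
}
```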
Master/slave replication process summary
The RocketMQ Master/Slave replication mechanism sends messages to the Slave after they are written to the CommitLog.
The change is on the slave side of master/slave replication; HAClient is the class through which the slave connects to the master.
The HAClient#run() method does the following:
- The slave connects to the master and reports its current CommitLog offset
- After receiving the offset, the master determines the position from which to start sending data
- The master looks up the MappedFile corresponding to that start position
- The master sends the data it found to the slave
- The slave receives the data and appends it to its own CommitLog
In steps 4 and 5, the data received by the slave sits in a ByteBuffer. Before appending it to the CommitLog, the original code allocated a new byte array and copied the data out of that read ByteBuffer.
Optimization: Reduce byte array copying
The array copy in the master/slave replication logic can be omitted: the ByteBuffer read from the master can be handed to the CommitLog together with the start position and length of the data, so the bytes in the ByteBuffer are passed along without being copied into a new array.
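Conceptually the change looks like the sketch below; the method names and signatures here are hypothetical stand-ins for the HAClient/CommitLog code, chosen only to contrast the two approaches:

```java
import java.nio.ByteBuffer;

class HaAppendSketch {
    interface CommitLogLike {
        void appendData(byte[] data, int offset, int length);
        void appendData(ByteBuffer data, int offset, int length);
    }

    // Before: an extra byte[] is allocated and filled for every transferred chunk.
    void appendWithCopy(ByteBuffer readBuffer, int start, int size, CommitLogLike commitLog) {
        byte[] bodyData = new byte[size];
        readBuffer.position(start);
        readBuffer.get(bodyData);
        commitLog.appendData(bodyData, 0, size);
    }

    // After: the read ByteBuffer is passed through directly with a start offset and length.
    void appendWithoutCopy(ByteBuffer readBuffer, int start, int size, CommitLogLike commitLog) {
        commitLog.appendData(readBuffer, start, size);
    }
}
```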
Remove putMessage/putMessages methods with duplicate code from CommitLog (6)
- Remove putMessage/putMessages method in CommitLog which has too many duplicated code.
This optimization focuses on reducing redundant code.
The original CommitLog had the following methods for saving messages:
- putMessage: saves a single message synchronously
- asyncPutMessage: saves a single message asynchronously
- putMessages: saves a batch of messages synchronously
- asyncPutMessages: saves a batch of messages asynchronously
The synchronous and asynchronous save paths are very similar, but instead of sharing code each method was implemented separately, leaving a lot of duplication between the synchronous and asynchronous variants.
This patch merges putMessage with asyncPutMessage and putMessages with asyncPutMessages: the synchronous methods now call the asynchronous ones and wait for the result, which removes a lot of repeated code, as in the sketch below.
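The shape of the merged code is roughly as follows. This is a sketch with simplified types, not the actual CommitLog source: the synchronous variant just delegates to the asynchronous one and waits on the returned future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class PutMessageSketch {
    // The asynchronous path contains the real logic.
    CompletableFuture<String> asyncPutMessage(String msg) {
        return CompletableFuture.supplyAsync(() -> "PUT_OK:" + msg);
    }

    // The synchronous path is now just "call async and wait", removing the duplicated body.
    String putMessage(String msg) {
        try {
            return asyncPutMessage(msg).get(3, TimeUnit.SECONDS);
        } catch (Exception e) {
            return "FLUSH_DISK_TIMEOUT"; // simplified error handling
        }
    }
}
```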
Adjust the default values of several parameters for sending messages (7)
- Change default value of some parameters: sendMessageThreadPoolNums/useReentrantLockWhenPutMessage/flushCommitLogTimed/endTransactionThreadPoolNums
Message save/send parameter optimization
When RocketMQ saves a message, the CommitLog must be written sequentially, which can only be done by one thread at a time; a lock has to be acquired before writing, and this lock is the most performance-critical one in RocketMQ.
Before version 3.2.x that lock was synchronized. Starting with RocketMQ 4.x a spin lock was introduced and made the default, and at the same time the parameter sendMessageThreadPoolNums (the number of threads in the pool that handles client send requests) was changed to 1. With a single thread writing messages to the CommitLog, the overhead of entering and leaving a heavyweight lock is avoided entirely.
However, message processing is more than the CommitLog write (which cannot be parallelized); there is also CPU-heavy work that does benefit from multiple threads. Practical tests showed that 4 threads is a reasonable value, so this parameter now defaults to MIN(number of logical processors, 4).
With four threads, a spin lock is no longer a good choice, because a thread that fails to acquire the lock burns CPU while spinning. Switching to a reentrant lock by setting useReentrantLockWhenPutMessage to true works better, and that is now the default.
Thread pool size for the second phase of transactional messages
endTransactionThreadPoolNums is the size of the thread pool that handles the second phase (commit/rollback) of transactional messages, while sendMessageThreadPoolNums controls the first phase. If the second phase cannot keep up with the first, large numbers of half messages end up being checked back, so endTransactionThreadPoolNums should be larger than sendMessageThreadPoolNums; at least 4 times larger is suggested.
Enable timed flushing
The flushCommitLogTimed parameter controls whether flushing to disk is timed; it previously defaulted to false, meaning a (near) real-time flush.
The flush-related defaults were adjusted as well. RocketMQ flushes asynchronously by default, but an asynchronous flush request used to be triggered for every message processed. With flushCommitLogTimed set to true, the disk is flushed on a timer (every 500 ms by default), which greatly reduces I/O pressure and does not reduce reliability when master/slave synchronous replication is in place.
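Put together, the relevant broker.conf entries look like the snippet below. The property names come from the commit above; the values shown only reflect the defaults and recommendations described in this section and should be checked against the version you actually run.

```properties
# Threads handling client send requests; 4.9.1 defaults to MIN(logical processors, 4)
sendMessageThreadPoolNums=4
# With several send threads, a reentrant lock beats spinning on the CommitLog lock
useReentrantLockWhenPutMessage=true
# Timed (batched) flush instead of one flush request per message
flushCommitLogTimed=true
# Second-phase (end-transaction) pool; suggested to be several times sendMessageThreadPoolNums
endTransactionThreadPoolNums=16
```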
Optimize operations under the putMessage lock (8-12)
Improve produce performance in M/S mode.
- Optimise performance of asyncPutMessage (extract some code out of putMessage lock)
- extract generation of msgId out of lock in CommitLog (now only for single message processor)
- extract generation of topicQueueTable key out of sync code
- extract generation of msgId out of lock in CommitLog (for batch)
- fix ipv6 problem introduced in commit “Optimise performance of asyncPutMessage (extract some code out of putMessage lock)”
CommitLog is the RocketMQ message storage file. All messages on a single Broker are stored sequentially in CommitLog.
CommitLog writing can only be performed on a single thread, and a lock must be acquired before writing. This lock is the most critical for RocketMQ performance.
In theory it’s just a matter of writing to the MappedByteBuffer, but practice is often more complicated than theory, because there are a lot of things going on inside the lock for a variety of reasons.
Because of the complexity of the existing code, this optimization is the largest change in the batch, but its idea is very simple: whatever is done inside the lock should, wherever possible, be done outside the lock, preparing the data in advance. It includes the following changes:
- Most of the preparation of the buffer (message encoding) is moved out of the lock and done ahead of time.
- The MessageId is initialized lazily (outside the lock); generating it involves a fair amount of encoding and data copying, which is surprisingly expensive.
- The key used to look up the topicQueueTable inside the lock is a concatenated string; building it is also moved outside the lock.
- Along the way, missing IPv6 handling is fixed.
- Useless code is removed.
Optimize asyncPutMessage: move preparation out of the lock
Let's look at the code changes. In the diff, the new code (shown in green on the right) is code that used to run inside the lock and now runs outside it.
The new call putMessageThreadLocal.getEncoder().encode(msg) performs a large amount of the preparation up front, moving work out of the original CommitLog#DefaultAppendMessageCallback#doAppend() method, which runs inside the lock.
The first of the two snippets below shows the code before the change, where this work happened inside doAppend() (that is, inside the lock); the second shows the code after the change, with the encode() method extracted so that it runs before the lock is taken.
```java
// CommitLog.java, before the change
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner) {
// ...
/** * Serialize message */
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
// ... Determines whether there is sufficient free space
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(bornHostHolder, bornHostLength);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(storeHostHolder, storeHostLength);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
```
```java
// CommitLog.java, after the change
protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
/** * Serialize message */
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
// Initialization of storage space
this.resetByteBuffer(encoderBuffer, msgLen);
// 1 TOTALSIZE
this.encoderBuffer.putInt(msgLen);
// 2 MAGICCODE
this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.encoderBuffer.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.encoderBuffer.putInt(msgInner.getQueueId());
// 5 FLAG
this.encoderBuffer.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET, need update later
this.encoderBuffer.putLong(0);
// 7 PHYSICALOFFSET, need update later
this.encoderBuffer.putLong(0);
// 8 SYSFLAG
this.encoderBuffer.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.encoderBuffer.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);
// 11 STORETIMESTAMP
this.encoderBuffer.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);
// 13 RECONSUMETIMES
this.encoderBuffer.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.encoderBuffer.putInt(bodyLength);
if (bodyLength > 0)
this.encoderBuffer.put(msgInner.getBody());
// 16 TOPIC
this.encoderBuffer.put((byte) topicLength);
this.encoderBuffer.put(topicData);
// 17 PROPERTIES
this.encoderBuffer.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.encoderBuffer.put(propertiesData);
encoderBuffer.flip();
return null;
}
```
The pre-encoded data is then stored in the private ByteBuffer encodedBuff field of MessageExtBrokerInner, where the doAppend() method picks it up later.
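The overall flow can be sketched as follows. This is an illustrative outline with stand-in types, not the real CommitLog code; it only shows the shape of the change: encode into a thread-local buffer before taking the lock, and keep only the strictly sequential append inside it.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.locks.ReentrantLock;

class AsyncPutMessageSketch {
    private final ReentrantLock putMessageLock = new ReentrantLock();
    // One encoder buffer per thread, so encoding needs no lock at all.
    private final ThreadLocal<ByteBuffer> encoderBuffer =
            ThreadLocal.withInitial(() -> ByteBuffer.allocate(4 * 1024 * 1024));

    static class Msg {
        byte[] body;
        ByteBuffer encodedBuff; // pre-encoded bytes, consumed later inside the lock
        Msg(byte[] body) { this.body = body; }
    }

    void asyncPutMessage(Msg msg) {
        // 1. Outside the lock: encode the on-disk representation into a thread-local buffer.
        ByteBuffer buf = encoderBuffer.get();
        buf.clear();
        buf.putInt(msg.body.length);
        buf.put(msg.body);
        buf.flip();
        msg.encodedBuff = buf;

        // 2. Inside the lock: only the cheap, strictly sequential part remains
        //    (assigning offsets and copying the prepared bytes into the mapped file).
        putMessageLock.lock();
        try {
            appendToCommitLog(msg.encodedBuff);
        } finally {
            putMessageLock.unlock();
        }
    }

    private void appendToCommitLog(ByteBuffer encoded) {
        // stand-in for MappedFile#appendMessage / DefaultAppendMessageCallback#doAppend
    }
}
```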
MessageId lazy loading
Using the functional interface Supplier, the logic for computing the MessageId is wrapped in a Supplier that is passed into the result object when it is created, instead of computing the MessageId eagerly.
The Supplier is only evaluated when getMsgId() is called on the result.
```java
// CommitLog#DefaultAppendMessageCallback
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET
long wroteOffset = fileFromOffset + byteBuffer.position();
Supplier<String> msgIdSupplier = () -> {
int sysflag = msgInner.getSysFlag();
int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
return UtilAll.bytes2string(msgIdBuffer.array());
};
// ...
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
// ...
return result;
}
```
```java
// AppendMessageResult.java
public String getMsgId() {
    // msgId lazy loading
    if (msgId == null && msgIdSupplier != null) {
        msgId = msgIdSupplier.get();
    }
    return msgId;
}
```
Optimize message property and header parsing performance (13-15)
Remove placeholders at the end of strings to save message transfer size
RocketMQ serializes the message properties Map into a string for transfer and storage, and parses the string back into a Map when the message is received.
Strings store maps in this format:
```
key1\u0001value1\u0002key2\u0001value2\u0002
```
The patch drops the trailing \u0002 at the end of the string, saving 1 byte per message.
Optimize the performance of converting between String and Map
Effects before and after optimization:
```
Benchmark                               Mode  Cnt     Score  Error  Units   (10000 loops in each op)
TempTest.messageProperties2String      thrpt    2  2257.276         ops/s
TempTest.messageProperties2String_old  thrpt    2  1464.342         ops/s
TempTest.string2messageProperties      thrpt    2  1590.499         ops/s
TempTest.string2messageProperties_old  thrpt    2   605.118         ops/s
```
- Map to String optimization
The optimization is to pre-compute the length of the resulting string and pass it as the initial capacity of the StringBuilder.
StringBuilder can grow its internal storage dynamically; its default capacity is 16 and its underlying structure is a char[].
In high-TPS scenarios, with the default capacity of 16, building the property string of a normal message forces the StringBuilder to grow at least twice, needlessly creating two extra arrays and performing two array copies.
The fix is therefore to calculate the required length up front and pass it to the StringBuilder constructor, as in the sketch below.
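A simplified sketch of the idea follows; the real messageProperties2String is more defensive (null handling, special properties), but the capacity pre-computation and the dropped trailing separator are the same:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PropertiesEncodeSketch {
    public static final char NAME_VALUE_SEPARATOR = 1; // \u0001
    public static final char PROPERTY_SEPARATOR = 2;   // \u0002

    // Map -> String: size the StringBuilder up front so it never has to grow.
    public static String messageProperties2String(Map<String, String> properties) {
        if (properties == null || properties.isEmpty()) {
            return "";
        }
        int len = 0;
        for (Map.Entry<String, String> e : properties.entrySet()) {
            len += e.getKey().length() + e.getValue().length() + 2; // two separators per entry
        }
        StringBuilder sb = new StringBuilder(len);
        for (Map.Entry<String, String> e : properties.entrySet()) {
            sb.append(e.getKey()).append(NAME_VALUE_SEPARATOR)
              .append(e.getValue()).append(PROPERTY_SEPARATOR);
        }
        sb.setLength(sb.length() - 1); // drop the trailing \u0002, saving 1 byte per message
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("TAGS", "TagA");
        props.put("KEYS", "order-1");
        System.out.println(messageProperties2String(props)
                .replace(NAME_VALUE_SEPARATOR, '=').replace(PROPERTY_SEPARATOR, ';'));
    }
}
```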
- String to Map optimization
In the new code, the split() call is replaced with indexOf() and substring().
split() uses indexOf() and substring() internally as well, but it creates an ArrayList to hold the pieces and copies them into a String[] before returning.
The new method puts each parsed key/value pair directly into the Map, skipping the intermediate ArrayList, which reduces copying and avoids the cost of growing the ArrayList. A sketch follows.
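A simplified sketch of the split-free parsing follows; the real string2messageProperties handles more edge cases, but the indexOf/substring walk is the same idea:

```java
import java.util.HashMap;
import java.util.Map;

public class PropertiesDecodeSketch {
    public static final char NAME_VALUE_SEPARATOR = 1; // \u0001
    public static final char PROPERTY_SEPARATOR = 2;   // \u0002

    // String -> Map: walk the string with indexOf/substring; no String[] or ArrayList is created.
    public static Map<String, String> string2messageProperties(String properties) {
        Map<String, String> map = new HashMap<>();
        if (properties == null || properties.isEmpty()) {
            return map;
        }
        int len = properties.length();
        int index = 0;
        while (index < len) {
            int end = properties.indexOf(PROPERTY_SEPARATOR, index);
            if (end < 0) {
                end = len; // the trailing \u0002 may have been omitted
            }
            int mid = properties.indexOf(NAME_VALUE_SEPARATOR, index);
            if (mid > index && mid < end - 1) {
                map.put(properties.substring(index, mid), properties.substring(mid + 1, end));
            }
            index = end + 1;
        }
        return map;
    }

    public static void main(String[] args) {
        System.out.println(string2messageProperties("TAGS\u0001TagA\u0002KEYS\u0001order-1"));
    }
}
```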
Optimize Broker request header decoding performance (15)
- Optimise parse performance for SendMessageRequestHeaderV2
RocketMQ's communication protocol defines many commands, each with its own header; they previously shared a generic, reflection-based method for parsing and setting header fields.
This reflection-based parsing is quite slow. The optimization defines a dedicated decoding method for the send-message request header that reads each attribute directly from the extFields map, which is far more efficient.
A send-message request header looks roughly like this:
```json
{
    "code": 310,
    "extFields": {
        "f": "0",
        "g": "1482158310125",
        "d": "4",
        "e": "0",
        "b": "TopicTest",
        "c": "TBW102",
        "a": "please_rename_unique_group_name",
        "j": "0",
        "k": "false",
        "h": "0",
        "i": "TAGS\u0001TagA\u0002WAIT\u0001true\u0002"
    },
    "flag": 0,
    "language": "JAVA",
    "opaque": 206,
    "version": 79
}
```
```java
public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
@CFNotNull
private String a; // producerGroup;
@CFNotNull
private String b; // topic;
@CFNotNull
private String c; // defaultTopic;
@CFNotNull
private Integer d; // defaultTopicQueueNums;
@CFNotNull
private Integer e; // queueId;
@CFNotNull
private Integer f; // sysFlag;
@CFNotNull
private Long g; // bornTimestamp;
@CFNotNull
private Integer h; // flag;
@CFNullable
private String i; // properties;
@CFNullable
private Integer j; // reconsumeTimes;
@CFNullable
private boolean k; // unitMode = false;
private Integer l; // consumeRetryTimes
@CFNullable
private boolean m; //batch
```
When a send request is received, its header is decoded into the SendMessageRequestHeaderV2 class. The generic, reflection-based decoder looks like this:
```java
public CommandCustomHeader decodeCommandCustomHeader(Class<? extends CommandCustomHeader> classHeader)
throws RemotingCommandException {
CommandCustomHeader objectHeader;
try {
objectHeader = classHeader.newInstance();
} catch (InstantiationException e) {
return null;
} catch (IllegalAccessException e) {
return null;
}
if (this.extFields != null) {
Field[] fields = getClazzFields(classHeader);
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String fieldName = field.getName();
if (!fieldName.startsWith("this")) {
try {
String value = this.extFields.get(fieldName);
if (null == value) {
Annotation annotation = getNotNullAnnotation(field);
if (annotation != null) {
throw new RemotingCommandException("the custom field <" + fieldName + "> is null");
}
continue;
}
field.setAccessible(true);
String type = getCanonicalName(field.getType());
Object valueParsed;
if (type.equals(StringCanonicalName)) {
valueParsed = value;
} else if (type.equals(IntegerCanonicalName1) || type.equals(IntegerCanonicalName2)) {
valueParsed = Integer.parseInt(value);
} else if (type.equals(LongCanonicalName1) || type.equals(LongCanonicalName2)) {
valueParsed = Long.parseLong(value);
} else if (type.equals(BooleanCanonicalName1) || type.equals(BooleanCanonicalName2)) {
valueParsed = Boolean.parseBoolean(value);
} else if (type.equals(DoubleCanonicalName1) || type.equals(DoubleCanonicalName2)) {
valueParsed = Double.parseDouble(value);
} else {
throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported");
}
field.set(objectHeader, valueParsed);
} catch (Throwable e) {
}
}
}
}
objectHeader.checkFields();
}
return objectHeader;
}
```
The specialized decoding method introduced by this optimization:
```java
static SendMessageRequestHeaderV2 decodeSendMessageHeaderV2(RemotingCommand request)
throws RemotingCommandException {
SendMessageRequestHeaderV2 r = new SendMessageRequestHeaderV2();
HashMap<String, String> fields = request.getExtFields();
if (fields == null) {
throw new RemotingCommandException("the ext fields is null");
}
String s = fields.get("a");
checkNotNull(s, "the custom field <a> is null");
r.setA(s);
s = fields.get("b");
checkNotNull(s, "the custom field <b> is null");
r.setB(s);
s = fields.get("c");
checkNotNull(s, "the custom field <c> is null");
r.setC(s);
s = fields.get("d");
checkNotNull(s, "the custom field <d> is null");
r.setD(Integer.parseInt(s));
s = fields.get("e");
checkNotNull(s, "the custom field <e> is null");
r.setE(Integer.parseInt(s));
s = fields.get("f");
checkNotNull(s, "the custom field <f> is null");
r.setF(Integer.parseInt(s));
s = fields.get("g");
checkNotNull(s, "the custom field <g> is null");
r.setG(Long.parseLong(s));
s = fields.get("h");
checkNotNull(s, "the custom field <h> is null");
r.setH(Integer.parseInt(s));
s = fields.get("i");
if (s != null) {
r.setI(s);
}
s = fields.get("j");
if (s != null) {
r.setJ(Integer.parseInt(s));
}
s = fields.get("k");
if (s != null) {
r.setK(Boolean.parseBoolean(s));
}
s = fields.get("l");
if (s != null) {
r.setL(Integer.parseInt(s));
}
s = fields.get("m");
if (s != null) {
r.setM(Boolean.parseBoolean(s));
}
return r;
}
```
The first of the two methods is the generic decoder; the second is the decoding method specialized for the send-message command's SendMessageRequestHeaderV2. Instead of going through the common reflection-based parser, it simply reads and sets each attribute one by one, which yields roughly a four-fold improvement in header-parsing performance.