Preface
This article focuses on RocketMQ’s maxMessageSize
DefaultMQProducer
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java
public class DefaultMQProducer extends ClientConfig implements MQProducer {
private final InternalLogger log= ClientLogger.getLog(); / /... /** * Maximum allowed message sizein bytes.
*/
private int maxMessageSize = 1024 * 1024 * 4; // 4M
public int getMaxMessageSize() {
return maxMessageSize;
}
public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
Validators.checkMessage(msg, this);
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg);
}
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
MessageBatch msgBatch;
try {
msgBatch = MessageBatch.generateFromList(msgs);
for (Message message : msgBatch) {
Validators.checkMessage(message, this);
MessageClientIDSetter.setUniqID(message);
message.setTopic(withNamespace(message.getTopic()));
}
msgBatch.setBody(msgBatch.encode());
} catch (Exception e) {
throw new MQClientException("Failed to initiate the MessageBatch", e);
}
msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
returnmsgBatch; } / /... }Copy the code
- DefaultMQProducer defines maxMessageSize, which defaults to 4M; both the send and batch methods call Validators.checkMessage(message, this) to validate messages
Validators
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/Validators.java
public class Validators {
public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$"; public static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR); public static final int CHARACTER_MAX_LENGTH = 255; / /... /** * Validate message */ public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
// topic
Validators.checkTopic(msg.getTopic());
// body
if (null == msg.getBody()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
}
if (0 == msg.getBody().length) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: "+ defaultMQProducer.getMaxMessageSize()); }} / /... }Copy the code
- The checkMessage method first calls checkTopic, then checks whether msg.getBody() is null or has a length of 0, and finally checks whether the body length is greater than defaultMQProducer.getMaxMessageSize(); an MQClientException is thrown if any check fails
MessageStoreConfig
rocketmq/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
public class MessageStoreConfig {
//The root directory in which the log data is kept
@ImportantField
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
//The directory in which the commitlog is kept
@ImportantField
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "commitlog"; / /... // The maximum size of message,default is 4M private int maxMessageSize = 1024 * 1024 * 4; public intgetMaxMessageSize() {
return maxMessageSize;
}
public void setMaxMessageSize(int maxMessageSize) { this.maxMessageSize = maxMessageSize; } / /... }Copy the code
- MessageStoreConfig defines the maxMessageSize property, which defaults to 4M
- The value can be set in conf/broker.conf under the RocketMQ installation directory, for example maxMessageSize=65536
DefaultMessageStore
rocketmq/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
public class DefaultMessageStore implements MessageStore {
private static final InternalLogger log= InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private final MessageStoreConfig messageStoreConfig; // CommitLog private final CommitLog commitLog; / /... public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {if (this.shutdown) {
log.warn("DefaultMessageStore has shutdown, so putMessages is forbidden");
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("DefaultMessageStore is in slave mode, so putMessages is forbidden ");
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
if(! this.runningFlags.isWriteable()) { long value = this.printTimes.getAndIncrement();if ((value % 50000) == 0) {
log.warn("DefaultMessageStore is not writable, so putMessages is forbidden " + this.runningFlags.getFlagBits());
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else {
this.printTimes.set(0);
}
if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
log.warn("PutMessages topic length too long " + messageExtBatch.getTopic().length());
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) {
log.warn("PutMessages body length too long " + messageExtBatch.getBody().length);
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
if (this.isOSPageCacheBusy()) {
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if(null == result || ! result.isOk()) { this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); }returnresult; } / /... }Copy the code
- The putMessages method of DefaultMessageStore checks whether messageExtBatch.getBody().length is greater than messageStoreConfig.getMaxMessageSize(); if so, it returns new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)
summary
Both the RocketMQ client and the broker verify that the message body size does not exceed maxMessageSize. On the client side, DefaultMQProducer defines maxMessageSize, which is 4M by default, and both the send and batch methods call Validators.checkMessage(message, this) to validate the message. On the server side, maxMessageSize can be specified in conf/broker.conf. If maxMessageSize needs to be changed, it must be changed on both the client and the server
doc
- DefaultMQProducer