Preface
I plan to split this analysis of the RocketMQ producer source code into three articles. This first article covers the producer startup process and the synchronous message sending process. My knowledge is limited, so if you spot a mistake, please point it out in the comments. If you find the article useful, please give it a like; your support is what keeps me writing.
Terminology
Producer
Responsible for producing messages; this role is generally played by business systems. A message producer sends the messages generated in the business application to the Broker server. RocketMQ provides multiple sending modes: synchronous, asynchronous, ordered, and one-way. Both synchronous and asynchronous sending require the Broker to return an acknowledgement; one-way sending does not.
From RocketMQ
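To make the difference in acknowledgement handling concrete, here is a minimal sketch that uses only the JDK. ToyBroker and ToyProducer are hypothetical names invented for this illustration; they are not RocketMQ classes, and the real client is considerably more involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical stand-in for a broker connection; not a RocketMQ class. */
class ToyBroker {
    /** Simulates storing a message and producing an acknowledgement. */
    CompletableFuture<String> store(String body) {
        return CompletableFuture.supplyAsync(() -> "ACK:" + body);
    }
}

/** Hypothetical producer showing the three acknowledgement styles. */
class ToyProducer {
    private final ToyBroker broker = new ToyBroker();

    /** Synchronous: block the calling thread until the ack arrives. */
    String sendSync(String body) {
        return broker.store(body).join();
    }

    /** Asynchronous: return at once; the ack is handed to the callback later. */
    CompletableFuture<Void> sendAsync(String body, Consumer<String> onAck) {
        return broker.store(body).thenAccept(onAck);
    }

    /** One-way: fire and forget; the caller never observes an ack. */
    void sendOneway(String body) {
        broker.store(body);
    }
}
```

In this sketch the three methods differ only in what the caller does with the acknowledgement: wait for it, receive it in a callback, or ignore it.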
Producer Group
A collection of producers of the same kind, which send the same kind of messages with the same sending logic. If a transactional message is sent and the original producer crashes after sending, the Broker server contacts another producer instance in the same producer group to commit or roll back the transaction.
From RocketMQ
Topic
Represents a collection of messages of one kind. Each topic contains a number of messages, and each message belongs to exactly one topic. The topic is the basic unit of message subscription in RocketMQ.
From RocketMQ
Message
The physical carrier of the information transmitted by the messaging system and the smallest unit of data that is produced and consumed. Each message must belong to a topic. Each message in RocketMQ has a unique Message ID and can carry a Key with a business identifier. The system supports querying messages by Message ID and by Key.
From RocketMQ
Guiding questions
- What are the differences between synchronous, asynchronous, ordered, and one-way sending, and how is each one implemented?
- In a Broker cluster with primary and secondary nodes, how does the producer make sure messages are still delivered correctly when a primary node goes down (in other words, can messages still be sent smoothly when a primary node is down)?
Introduction to core classes
- org.apache.rocketmq.client.producer.DefaultMQProducer: the default implementation of the message producer; it provides the methods for sending messages.
- org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl: encapsulates the logic of sending messages.
- org.apache.rocketmq.client.producer.TransactionMQProducer: the transactional message producer; it inherits from DefaultMQProducer. What a transactional message is and how it is implemented is not covered in this article; the next article will cover it.
- org.apache.rocketmq.client.producer.MessageQueueSelector: the interface for selecting a MessageQueue when sending messages.
- org.apache.rocketmq.client.impl.factory.MQClientInstance: encapsulates the implementation shared by clients (producers and consumers).
Learning points
- Looking at the DefaultMQProducer source code, we find that DefaultMQProducerImpl is one of its member variables: DefaultMQProducer enhances itself through composition rather than inheritance (favor composition over inheritance, because inheritance is inflexible, and code with too many inheritance layers is hard to understand and maintain).
- DefaultMQProducer implements the interface org.apache.rocketmq.client.producer.MQProducer. MQProducer provides simple, easy-to-understand method definitions, so users can use it out of the box without worrying about the internal complexity of the implementation. This is the Facade design pattern, and MQProducer is the facade (the main job of a facade is to give clients one interface for accessing the system while hiding the system's internal complexity).
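The two points above can be sketched in a few lines. All three types below are hypothetical and only mirror the shape of the relationship between MQProducer, DefaultMQProducer, and DefaultMQProducerImpl; they are not the real classes.

```java
/** Facade: the small interface that users program against (hypothetical). */
interface SimpleProducer {
    String send(String body);
}

/** Internal class that holds the real sending logic (hypothetical). */
class SimpleProducerImpl {
    private int sent = 0;

    String doSend(String group, String body) {
        sent++;
        return group + "#" + sent + ":" + body;
    }
}

/** Public entry point: it owns an impl object instead of extending it. */
class DefaultSimpleProducer implements SimpleProducer {
    private final String group;
    // Composition: the impl is a member, so it can change without touching callers.
    private final SimpleProducerImpl impl = new SimpleProducerImpl();

    DefaultSimpleProducer(String group) {
        this.group = group;
    }

    @Override
    public String send(String body) {
        return impl.doSend(group, body); // pure delegation
    }
}
```

Callers only ever see SimpleProducer, so the internals of SimpleProducerImpl can be reworked freely, which is the benefit the facade is meant to provide.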
Source code analysis
Startup process
Let's start with the producer startup process (some non-critical code, such as log printing and exception handling, is omitted).
// DefaultMQProducerImpl#start
public void start(final boolean startFactory) throws MQClientException {
    // ①
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            this.checkConfig();
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // ②
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // ③
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
            // ④
            if (startFactory) {
                mQClientFactory.start();
            }
            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    // ⑤
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // ⑥
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                RequestFutureTable.scanExpiredRequest();
            } catch (Throwable e) {
                log.error("scan RequestFutureTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
①: Perform different actions depending on the current service state (effectively a variant of the State pattern).
②: Stepping into getOrCreateMQClientInstance, we find that an MQClientInstance is created only once per client ID and then reused, so in the usual setup there is a single instance per process (a simple factory). org.apache.rocketmq.client.impl.MQClientManager itself is a singleton.
③: Register the producer itself with the MQClientInstance.
④: Start the client instance service.
⑤: Send heartbeats to all brokers.
⑥: Periodically scan for expired requests sent by the client.
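The state check in ① is worth isolating. The following is a minimal sketch with hypothetical types (ToyState, ToyService) that reuses the state names from the listing; the failOnInit flag is an assumption added only so that a failed startup can be simulated.

```java
/** Hypothetical lifecycle states, named after the ones in the listing. */
enum ToyState { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

class ToyService {
    private ToyState state = ToyState.CREATE_JUST;
    private final boolean failOnInit; // illustration only: simulates a startup error

    ToyService(boolean failOnInit) {
        this.failOnInit = failOnInit;
    }

    ToyState state() {
        return state;
    }

    void start() {
        switch (state) {
            case CREATE_JUST:
                // Mark as failed first; only a complete start flips it to RUNNING.
                state = ToyState.START_FAILED;
                if (failOnInit) {
                    throw new IllegalStateException("init failed");
                }
                state = ToyState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new IllegalStateException("service state not OK: " + state);
            default:
                break;
        }
    }
}
```

Setting the state to START_FAILED before doing any work means that an exception halfway through startup leaves the object in a state that refuses a second start() call.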
Let’s look at how the client instance sends heartbeat information to all brokers.
// MQClientInstance#sendHeartbeatToAllBrokerWithLock
public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed. [{}]", this.clientId);
    }
}

private void sendHeartbeatToAllBroker() {
    // Prepare the heartbeat data
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
    if (producerEmpty && consumerEmpty) {
        log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
        return;
    }
    if (!this.brokerAddrTable.isEmpty()) {
        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
        Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, HashMap<Long, String>> entry = it.next();
            String brokerName = entry.getKey();
            HashMap<Long, String> oneTable = entry.getValue();
            if (oneTable != null) {
                for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                    Long id = entry1.getKey();
                    String addr = entry1.getValue();
                    if (addr != null) {
                        if (consumerEmpty) {
                            if (id != MixAll.MASTER_ID)
                                continue;
                        }
                        try {
                            // Communicate with the Broker and send the heartbeat data
                            int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                            if (!this.brokerVersionTable.containsKey(brokerName)) {
                                this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                            }
                            this.brokerVersionTable.get(brokerName).put(addr, version);
                            if (times % 20 == 0) {
                                log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                log.info(heartbeatData.toString());
                            }
                        } catch (Exception e) {
                            if (this.isBrokerInNameServer(addr)) {
                                log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
                            } else {
                                log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, id, addr, e);
                            }
                        }
                    }
                }
            }
        }
    }
}
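The lock handling in sendHeartbeatToAllBrokerWithLock is a common "skip if busy" pattern: tryLock never blocks, so a heartbeat round that overlaps with a running one is simply dropped. Here is a minimal sketch of that pattern on its own; HeartbeatGuard is a hypothetical class, and unlike the listing it lets exceptions from the heartbeat propagate instead of logging them.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical helper isolating the tryLock pattern; not a RocketMQ class. */
class HeartbeatGuard {
    private final ReentrantLock lock = new ReentrantLock();
    private final AtomicInteger sent = new AtomicInteger();
    private final AtomicInteger skipped = new AtomicInteger();

    /** Runs the heartbeat only if no other thread is already running one. */
    boolean heartbeatWithLock(Runnable heartbeat) {
        if (lock.tryLock()) {          // returns immediately, never waits
            try {
                heartbeat.run();
                sent.incrementAndGet();
                return true;
            } finally {
                lock.unlock();         // always release, even if the heartbeat throws
            }
        }
        skipped.incrementAndGet();     // another round is in flight; skip this one
        return false;
    }

    int sent() {
        return sent.get();
    }

    int skipped() {
        return skipped.get();
    }
}
```

Skipping is acceptable here because heartbeats are periodic: a dropped round is made up for by the next one.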
That completes the analysis of the producer startup process. To recap:
- Obtain the MQClientInstance instance.
- Register the producer with the MQClientInstance instance.
- Start the MQClientInstance service.
- Send heartbeats to all brokers.
- Start the scheduled task that scans for expired requests.
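The first two steps of the recap (obtain a shared client instance, then register the producer with it) can be sketched as follows. ToyClientManager and ToyClientInstance are hypothetical names; the sketch assumes instances are keyed by a client ID string and that a producer group name may be registered only once per instance, which is how the error message in the start() listing reads.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical shared client: one per client ID, shared by all producers. */
class ToyClientInstance {
    final String clientId;
    private final ConcurrentMap<String, Object> producers = new ConcurrentHashMap<>();

    ToyClientInstance(String clientId) {
        this.clientId = clientId;
    }

    /** Returns false if the group name is already taken on this client. */
    boolean registerProducer(String group, Object producer) {
        if (group == null || producer == null) {
            return false;
        }
        return producers.putIfAbsent(group, producer) == null;
    }
}

/** Singleton manager acting as a simple factory for client instances. */
final class ToyClientManager {
    private static final ToyClientManager INSTANCE = new ToyClientManager();
    private final ConcurrentMap<String, ToyClientInstance> table = new ConcurrentHashMap<>();

    private ToyClientManager() {
    }

    static ToyClientManager getInstance() {
        return INSTANCE;
    }

    /** Creates the instance on first use and returns the same one afterwards. */
    ToyClientInstance getOrCreate(String clientId) {
        return table.computeIfAbsent(clientId, ToyClientInstance::new);
    }
}
```

Because both maps are concurrent and use atomic insert-if-absent operations, two threads starting producers at the same time still end up with one shared instance and at most one producer per group name.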
Synchronous messages
Next, we analyze the sending process of synchronous messages.
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// ①
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// ②
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// ③
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// ④
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
// ⑤
if (timeout < costTime) {
callTimeout = true;
break;
}
// ⑥
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime /* subtract the time already spent from the timeout */);
endTimestamp = System.currentTimeMillis();
// ⑦
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
exception = e;
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
exception = e;
switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
throw e;
}
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times,
System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(),
Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
// ⑧
validateNameServerSetting();
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
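①: Confirm that the producer is in the running state and validate the message.
②: Look up the publish (route) information for the topic.
③: Compute the total number of attempts: for a synchronous send this is 1 plus the configured retry count (retryTimesWhenSendFailed); for the other modes it is 1.
④: Select a MessageQueue, passing in the name of the broker used by the previous attempt.
⑤: Check for a timeout. The code checks for a timeout at several points; this staged checking makes it easier to locate where the time was spent.
⑥: Send the message.
⑦: Record the outcome of the attempt for the broker through updateFaultItem (the elapsed time, plus a flag that is true when the attempt failed with a remoting, client, or broker exception).
⑧: Check whether the Namesrv address is configured.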
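Steps ③, ⑤, and ⑥ together form a retry loop that shares one time budget across all attempts. The following is a minimal sketch of that idea only; RetryWithBudget and its Attempt interface are hypothetical, the clock is injected so the behaviour is easy to verify, and exception handling is reduced to treating any RuntimeException as a failed attempt.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper illustrating a retry loop with a shared time budget. */
class RetryWithBudget {
    /** One send attempt; returns true when the send is reported as successful. */
    interface Attempt {
        boolean send(int attemptIndex, long remainingMillis);
    }

    /**
     * Tries up to 1 + retries times, but gives up as soon as the elapsed
     * time exceeds the overall timeout.
     * Returns the number of attempts used on success, or -1 on failure.
     */
    static int send(Attempt attempt, int retries, long timeoutMillis, LongSupplier clock) {
        long begin = clock.getAsLong();
        int timesTotal = 1 + retries;
        for (int times = 0; times < timesTotal; times++) {
            long cost = clock.getAsLong() - begin;
            if (timeoutMillis < cost) {
                return -1; // budget used up before this attempt could start
            }
            try {
                // Each attempt receives only what is left of the budget.
                if (attempt.send(times, timeoutMillis - cost)) {
                    return times + 1;
                }
            } catch (RuntimeException e) {
                // Treat as a failed attempt and move on to the next one.
            }
        }
        return -1;
    }
}
```

Passing the remaining time to each attempt is what keeps the total wall-clock time of a synchronous send bounded by the caller's timeout, no matter how many retries are configured.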
Next, we will analyze ⑥ in detail.
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
// Get the broker address
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
// Get the broker address
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
// ①
MessageClientIDSetter.setUniqID(msg);
}
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
int sysFlag = 0;
boolean msgBodyCompressed = false;
// ②
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}
// Determine whether the message is a transaction message
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
// Hook
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
// Hook
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
// Determine if it is a transaction half-message
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
// Determine whether the message is delayed
context.setMsgType(MessageType.Delay_Msg);
}
// Message context is handled by the hooks before the message is sent
this.executeSendMessageHookBefore(context);
}
// Construct the message request header
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
SendResult sendResult = null;
switch (communicationMode) {
// Asynchronous messages
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
// Check the timeout once more; checking at several points helps with troubleshooting
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
// One-way message
case ONEWAY:
// Synchronous message
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
} catch (RemotingException e) {
// ③
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
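①: Set a unique ID on the message. Batch messages are skipped here because their ID was already set when the batch was generated.
②: Check whether the message body needs to be compressed, and set the compression flag in sysFlag if it was. Batch messages in the current version do not support compression.
③: Whichever exception is caught, it is recorded in the context, the after-send hook is executed (if hooks are registered), and the exception is rethrown.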
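Step ② combines two things: compressing the body when it is large enough, and recording that fact as a bit in sysFlag. The sketch below shows that combination using java.util.zip.Deflater. CompressSketch is a hypothetical class, and the flag value 0x1 and the 4096-byte threshold are assumptions chosen for illustration rather than values taken from the listing.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

/** Hypothetical helper; the constants are illustrative assumptions. */
class CompressSketch {
    static final int COMPRESSED_FLAG = 0x1;  // assumed bit position
    static final int THRESHOLD_BYTES = 4096; // assumed size threshold

    /** Deflates the input and returns the compressed bytes. */
    static byte[] compress(byte[] raw) {
        Deflater deflater = new Deflater();
        deflater.setInput(raw);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        while (!deflater.finished()) {
            int n = deflater.deflate(buf);
            out.write(buf, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    /**
     * Returns the bytes to put on the wire. If the body was compressed,
     * the compression bit is OR-ed into sysFlagOut[0]. The input array is
     * never modified, so the caller's message keeps its original body.
     */
    static byte[] prepareBody(byte[] body, boolean isBatch, int[] sysFlagOut) {
        if (!isBatch && body.length >= THRESHOLD_BYTES) {
            sysFlagOut[0] |= COMPRESSED_FLAG;
            return compress(body);
        }
        return body;
    }

    /** Tests whether the compression bit is set in the given flags. */
    static boolean isCompressed(int sysFlag) {
        return (sysFlag & COMPRESSED_FLAG) == COMPRESSED_FLAG;
    }
}
```

Using a bit flag lets other properties, such as the transaction type set a few lines later in the listing, share the same integer without interfering with each other.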
Conclusion
With the source code analysis above, we can try to answer the questions raised at the beginning.
- When a node in the Broker cluster becomes unavailable, the producer retries according to its failure policy, selecting a message queue again for each retry, so that the message can still be delivered correctly.
- From the analysis above, we can see that in the main flow of a synchronous send, a single thread does all the work (fetch the route information -> assemble the message -> wait for the send result).
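The retry behaviour in the first answer depends on how the next queue is chosen. In sendDefaultImpl the name of the broker used by the previous attempt is passed to selectOneMessageQueue; the body of that method is not shown in this article, so the following is only a plausible sketch of such a selection (round-robin, preferring a different broker), not the actual implementation. QueuePicker and QueueRef are hypothetical names, and the queue list is assumed to be non-empty.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical selector: round-robin that avoids the broker that just failed. */
class QueuePicker {
    /** Hypothetical queue descriptor: broker name plus queue id. */
    static final class QueueRef {
        final String broker;
        final int id;

        QueueRef(String broker, int id) {
            this.broker = broker;
            this.id = id;
        }
    }

    private final List<QueueRef> queues; // assumed non-empty
    private final AtomicInteger counter = new AtomicInteger();

    QueuePicker(List<QueueRef> queues) {
        this.queues = queues;
    }

    /** Returns the next queue in round-robin order. */
    private QueueRef next() {
        int pos = Math.floorMod(counter.getAndIncrement(), queues.size());
        return queues.get(pos);
    }

    /** Round-robin, skipping queues on the last failed broker when possible. */
    QueueRef select(String lastFailedBroker) {
        if (lastFailedBroker != null) {
            for (int i = 0; i < queues.size(); i++) {
                QueueRef q = next();
                if (!q.broker.equals(lastFailedBroker)) {
                    return q;
                }
            }
        }
        // First attempt, or every queue lives on the failed broker.
        return next();
    }
}
```

With queues spread over several brokers, a retry in this sketch lands on a different broker than the failed attempt, which is what lets a send succeed while one broker is down.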