preface
In service discovery, we know that the Broker will start a scheduled task to register its routing information to each NameServer when starting, and the Producer will pull and update the routing information corresponding to the Topic to the local NameServer when starting. This routing information, which we know from the previous article is TopicRouteData, contains BrokerDatas and QueueDatas. When a Producer sends a message, it is clear that the Producer gets the local route for the Topic first. What if the Producer can’t find the route locally? If TopicRouteData is found, a topic corresponds to multiple QueueData, and each Queue records the Broker to which it belongs. How does a topic select routing information and what load-balancing strategy is used to select a Queue? Find out with you.
Explore message sending
Our code for sending a message using RocketMq is as follows:
DefaultMQProducer producer = new DefaultMQProducer(topic); Producer. SetNamesrvAddr (127.0.0.1: "9876"); producer.start(); Message msg = new Message(topic,"testTag","testKey","testMessage".getBytes()); SendResult result = producer.send(msg);Copy the code
DefaultMQProducerImpl send(); defaultmQproducer.send ();
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
Validators.checkMessage(msg, this);
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg);
}
Copy the code
2, the real message is sent in DefaultMqProducerImpl. SendDefaultImpl () method
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException{ ... Are in front of the check TopicPublishInfo TopicPublishInfo = this. TryToFindTopicPublishInfo (MSG) getTopic ()); if (topicPublishInfo ! = null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected ! = null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() ! = SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; }... Following this is some exception handling}Copy the code
3, we see the first call tryToFindTopicPublishInfo (topic), according to the message topic topic for routing information. Let’s see what’s going on in this method
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
Copy the code
- This method takes TopicPublishInfo from the topicPublishInfoTable, If not call mQClientFactory. UpdateTopicRouteInfoFromNameServer (topic) from the remote nameServer topic for routing information, the analysis has been made in the article the service discovery. Then put the remote ones into the topicPublishInfoTable.
- NameServer does not have routing information for a topic. If you have not configured a topic and the message is sent for the first time, nameServer has no routing information for that topic. Here is executed this. MQClientFactory. UpdateTopicRouteInfoFromNameServer (topic, true, enclosing defaultMQProducer); Look at the familiar methods updateTopicRouteInfoFromNameServer (topic, isDefault, defaultMQProducer)
So isDefault=true and defaultMQProducer! = null, so I went to this branch, call getDefaultTopicRouteInfoFromNameServer topic by default routing information.
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
}
Copy the code
The createTopicKey we see is the specified Topic, called “TBW102”, which is automatically created when the broker allows automatic creation of a Topic.
Obtain routing information based on topic. Get it from the local directory first and return it when it is obtained locally. Otherwise, go through the service discovery logic again, fetch it from the remote NameServer, and return it. If the TopicX message reaches the Broker, it automatically creates a route for the TopicX Topic. If the TopicX message reaches the Broker, it automatically creates a route for the TopicX Topic. Then synchronize to NameServer.
4. Select a Queue to send messages. In service discovery we know that NameServer returns TopicRouteData with queueDatas and brokerDatas inside. QueueData contains information about all queues corresponding to a Topic. The structure is as follows:
public class QueueData implements Comparable<QueueData> { private String brokerName; //Queue owning Broker private int readQueueNums; Private int writeQueueNums; // The number of read queues configured for this Topic on this Broker. // The number of write queues configured for this Topic on the Broker. private int topicSynFlag; }Copy the code
For RocketMq, a Queue is an abstract concept, not a specific Queue. Topic, QueuData, and Broker are 1:1:1, and QueueData is essentially a record of all routing information for a Topic on a Broker.
In the previous step, when the producer retrieves TopicRouteData for a Topic from NameServer, it converts it to TopicPublishInfo, which is stored in the local routing table.
{ TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData); publishInfo.setHaveTopicRouterInfo(true); Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQProducerInner> entry = it.next(); MQProducerInner impl = entry.getValue(); if (impl ! = null) { impl.updateTopicPublishInfo(topic, publishInfo); }}}Copy the code
Inside the topicRouteData2TopicPublishInfo, traverses TopicRouteData QueueData, according to the configuration of the read and write the number of queue, generate MessageQueue, in TopicPublishInfo.
List<QueueData> qds = route.getQueueDatas(); Collections.sort(qds); for (QueueData qd : qds) { if (PermName.isWriteable(qd.getPerm())) { BrokerData brokerData = null; for (BrokerData bd : route.getBrokerDatas()) { if (bd.getBrokerName().equals(qd.getBrokerName())) { brokerData = bd; break; } } if (null == brokerData) { continue; } if (! brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) { continue; } for (int i = 0; i < qd.getWriteQueueNums(); i++) { MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i); info.getMessageQueueList().add(mq); }}}Copy the code
5. Go back to where the message was sent. As long as times < timesTotal does not exceed the number of retries and timeout < costTime, retries can be sent within the timeout period.
6, enclosing selectOneMessageQueue (topicPublishInfo lastBrokerName); The load balancer selects a MessageQueue to send. The logic for selecting a Queue is in the selectOneMessageQueue(lastBrokerName) method of TopicPublishInfo
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
Copy the code
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
Copy the code
It can be seen that the load balancing strategy is the modulus taking algorithm of the counter. When lastBrokerName is empty, the counter is incremented and its value modulates the number of messaeQueue lists to obtain the corresponding subscript MessageQueue. When lastBrokerName is not empty, the counter is incremented and then traversed through the messaeQueue list. The counter value is modulated to the number of messageQueue lists to obtain the corresponding index. If the corresponding index is still lastBroker, the loop is repeated. Otherwise return the Broker with the corresponding subscript.
7, then there is the core of the message flow, in DefaultMqProducerImpl. SendKernelImpl () method First of all to get the Broker Queue corresponding address
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
Copy the code
Once you have the Broker address, encapsulate the message content and other messages into the request header
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); .Copy the code
The sendMessage() method of MQClientAPIImpl is then called
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
Copy the code
In sendMessage(), the encapsulated request is called to the encapsulated Netty for network transmission. The first is to encapsulate the request RemotingCommand and set the message content.
RemotingCommand request = null; String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE); boolean isReply = msgType ! = null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG); if (isReply) { if (sendSmartMsg) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2); } else { request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader); } } else { if (sendSmartMsg || msg instanceof MessageBatch) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2); } else { request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); } } request.setBody(msg.getBody());Copy the code
The calls are then made separately depending on whether the sending mode is one-way, asynchronous, or synchronous.
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
Copy the code
You can see that one-way is invokeOneway and then null is returned, and asynchronous is sendMessageAsync() passing in sendCallback. Synchronization is when sendMessageSync waits for the result and returns it. At this point, the sending of the message is complete.
Let’s review the entire process of sending a message
conclusion
We saw several keywords DefaultMqProducerImpl messages, topicPublishInfoTable, updateTopicPublicInfoFromNameServer, the default Topic (TBW102), TopicPublishInfo. SelectOneMessageQueue and MQClientAPIImpl. SendMessage (). By the end of this article, you can answer the following questions
- What is the process of obtaining the routing for the topic in the first step? What if I don’t get it from Nameserver?
- After obtaining TopicPublicInfo and receiving multiple MessageQueue, how to choose one MessageQueue for load balancing to send?
- What is the general process of sending messages? What are the last three ways to make a network call?