Of the three processes, message sending is the simplest and the easiest to start with, which makes it a good entry point for beginner and intermediate learners studying MQ. This article therefore focuses on the process and details of sending a normal message in RocketMQ, a distributed message queue, from the message delivery point of view.

1. RocketMQ network architecture diagram

The network deployment architecture of the RocketMQ distributed message queue is shown in the following figure (where the Producer sends ordinary messages to the cluster).

[Figure: RocketMQ deployment architecture]

A few notes on the roles in the figure above:

(1) NameServer: the name server (registry) of the RocketMQ cluster. It is stateless (the NameServer instances may be temporarily inconsistent with one another, but periodic updates keep them consistent most of the time) and manages the cluster's metadata (for example, KV configuration, Topic, and Broker registration information).

(2) Broker (Master): the master node of the RocketMQ message broker server. It links message sending by the Producer with message consumption by the Consumer, and it stores the messages.

(3) Broker (Slave): the backup node of the RocketMQ message broker server. It copies messages from the master node, synchronously or asynchronously, to ensure the high availability of the RocketMQ cluster.

(4) Producer: here, the producer of ordinary messages. It uses the RocketMQ-Client module to send messages to a RocketMQ master node.

The communication links in the figure above are related as follows:

(1) Producer and NameServer: each Producer establishes a TCP connection with one instance in the NameServer cluster and pulls Topic routing information from it.

(2) Producer and Broker: the Producer establishes a TCP connection with the Master Broker associated with the Topic it wants to send to. This connection carries both messages and periodic heartbeats.

(3) Broker and NameServer: each Broker (Master or Slave) establishes a TCP connection with every NameServer instance. At startup, the Broker registers its configured Topic information with each machine in the NameServer cluster, so every NameServer holds that Broker's Topic routing configuration. Masters are not connected to one another, but a Master is connected to its Slave.

2. Demo method for sending ordinary messages

The example package of the RocketMQ source project contains the simplest sample code for sending ordinary messages (PS: for those new to RocketMQ, this sample code is a good basis for systematic learning and debugging).

Running the main method of the Producer class in the "org.apache.rocketmq.example.simple" package is enough to send an ordinary message (local NameServer and Broker instances must be deployed first).

3. Full walkthrough of how RocketMQ sends ordinary messages

As the previous section shows, the demo code for sending a message is simple, with only a few lines at its core. Inside RocketMQ's Client module, however, the core sending process is more complex. The following analyzes the startup process of DefaultMQProducer, the send method, and the message processing on the Broker server.

3.1 Startup process of DefaultMQProducer

In the demo code, starting the DefaultMQProducer instance calls the start() method of DefaultMQProducerImpl, the default producer implementation class.

@Override
public void start() throws MQClientException {
    this.defaultMQProducerImpl.start();
}

DefaultMQProducerImpl, the default producer implementation class, starts as follows:

(1) Initialize the MQClientInstance object and register the producer with it in the local cache variable producerTable;

(2) Save the default Topic ("TBW102") to the local cache variable topicPublishInfoTable;

(3) MQClientInstance invokes its own start() method, which starts the client's local service threads, such as the pull-message service, the client network communication service, the rebalance service, and several scheduled tasks (updating routes, cleaning up offline Brokers, sending heartbeats, persisting consumerOffset, and adjusting the thread pool), and then invokes start once more (this time with the argument false);

(4) Finally, send heartbeat packets to all Broker nodes.

To sum up, DefaultMQProducer's main startup process is as follows:

[Figure: flow of the DefaultMQProducer start method]

Here are a few points to note:

(1) Within one client, a producerGroup can have only one instance;

(2) MQClientManager returns a different MQClientInstance for each clientId;

(3) MQClientInstance returns a different MQProducer or MQConsumer for each producerGroup (stored in the local cache variables producerTable and consumerTable).
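The three points above can be pictured with a small, self-contained sketch. It is not RocketMQ's code: ClientManagerSketch and ClientInstanceSketch are hypothetical stand-ins for MQClientManager and MQClientInstance, and the sketch assumes that both lookups are backed by concurrent maps keyed by clientId and producerGroup respectively.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for MQClientInstance: holds producers keyed by producerGroup. */
final class ClientInstanceSketch {
    final String clientId;
    private final ConcurrentMap<String, Object> producerTable = new ConcurrentHashMap<>();

    ClientInstanceSketch(String clientId) {
        this.clientId = clientId;
    }

    /** Returns false if a producer is already registered under this group. */
    boolean registerProducer(String producerGroup, Object producer) {
        return producerTable.putIfAbsent(producerGroup, producer) == null;
    }
}

/** Hypothetical stand-in for MQClientManager: hands out one instance per clientId. */
final class ClientManagerSketch {
    private final ConcurrentMap<String, ClientInstanceSketch> factoryTable = new ConcurrentHashMap<>();

    ClientInstanceSketch getOrCreate(String clientId) {
        // The same clientId always maps to the same instance.
        return factoryTable.computeIfAbsent(clientId, ClientInstanceSketch::new);
    }
}
```

With this shape, asking the manager twice for the same clientId yields the same object, and a second registration under an existing producerGroup is rejected (here simply by returning false).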

3.2 Core process of the send method

There are three main ways to send messages through RocketMQ's client module:

(1) Synchronous mode

(2) Asynchronous mode

(3) Oneway mode

Modes (1) and (2) are the most commonly used; which one to choose depends on the business scenario. This section describes the core sending process using the synchronous mode. In synchronous mode, if a send fails, the client makes up to three attempts in total (the number of retries is configurable). In the other modes, only one attempt is made. The core sending process in synchronous mode enters through the sendDefaultImpl() method.
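The retry behaviour can be sketched as a bounded loop. This is a simplified illustration, not the client's implementation: SyncRetrySketch, sendWithRetry and the constant are hypothetical names, and the sketch assumes that the attempt budget is one initial send plus two retries in synchronous mode and a single attempt otherwise.

```java
import java.util.function.Supplier;

final class SyncRetrySketch {
    /** Assumed retry count: 2 retries give 3 attempts in total. */
    static final int RETRY_TIMES_WHEN_SEND_FAILED = 2;

    /** Calls sendOnce until it succeeds or the attempts are used up, then rethrows the last failure. */
    static <T> T sendWithRetry(Supplier<T> sendOnce, boolean synchronous) {
        int timesTotal = synchronous ? 1 + RETRY_TIMES_WHEN_SEND_FAILED : 1;
        RuntimeException last = null;
        for (int times = 0; times < timesTotal; times++) {
            try {
                return sendOnce.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and retry while attempts remain
            }
        }
        throw last; // timesTotal >= 1, so at least one failure was recorded
    }
}
```

In the real client each new attempt would also pick a queue again, which is where the queue selection described in section 3.2.2 comes in.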

3.2.1 Try to obtain the TopicPublishInfo routing information

Stepping through in a debugger shows that the sendDefaultImpl() method first validates the message to be sent. If the Topic and Body of the message are valid, it calls the tryToFindTopicPublishInfo() method, which looks up the message's Topic in the client's local cache variable topicPublishInfoTable. If the Topic is not there, the routing information is updated from the NameServer (by calling the updateTopicRouteInfoFromNameServer method of the MQClientInstance instance, which eventually executes the getTopicRouteInfoFromNameServer method of the MQClientAPIImpl instance). There are two scenarios:

(1) The producer sends a message to the Topic for the first time (the Topic does not yet exist in the NameServer): the first attempt fails to pull the route from the remote NameServer, so the local cache variable topicPublishInfoTable cannot be updated. A second attempt is therefore needed, which constructs the TopicPublishInfo object from the TopicRouteData of the default Topic, TBW102, and updates topicPublishInfoTable, the local cache variable of the DefaultMQProducerImpl instance.

In addition, in this scenario, when the message reaches the Broker server, the super.msgCheck(ctx, requestHeader, response) pre-check, invoked from the sendMessage/sendBatchMessage method of the SendMessageProcessor business processor, calls the createTopicInSendMessageMethod method of TopicConfigManager. This creates the new Topic on the Broker and persists it to the configuration file (path: {rocketmq.home.dir}/store/config/topics.json). (PS: this part belongs to the Broker side, but it is mentioned briefly here because it involves the creation of a new Topic.)

(2) The producer sends a message to an existing Topic: because the Topic already exists in the NameServer, the first attempt retrieves the route and updates the local cache variable topicPublishInfoTable, after which the tryToFindTopicPublishInfo method can return directly.
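The two scenarios boil down to a three-step lookup: local cache, then the Topic's own route, then the default Topic's route. The sketch below models only that order. RouteLookupSketch is a hypothetical class, the route is reduced to a plain String, and the NameServer is replaced by a function passed in by the caller.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class RouteLookupSketch {
    static final String DEFAULT_TOPIC = "TBW102";

    private final Map<String, String> topicPublishInfoTable = new ConcurrentHashMap<>();
    private final Function<String, Optional<String>> nameServer; // hypothetical remote lookup

    RouteLookupSketch(Function<String, Optional<String>> nameServer) {
        this.nameServer = nameServer;
    }

    /** Local cache first, then the topic's own route, then the default topic's route. */
    Optional<String> tryToFindTopicPublishInfo(String topic) {
        String cached = topicPublishInfoTable.get(topic);
        if (cached != null) {
            return Optional.of(cached);
        }
        Optional<String> route = nameServer.apply(topic);
        if (!route.isPresent()) {
            // First send to a new topic: fall back to the default topic's route.
            route = nameServer.apply(DEFAULT_TOPIC);
        }
        // Cache whatever was found under the requested topic name.
        route.ifPresent(r -> topicPublishInfoTable.put(topic, r));
        return route;
    }
}
```

A first send to an unknown Topic costs two remote lookups in this model; every later send is served from the cache.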

The source code for this part of the core method in RocketMQ is as follows (annotated):

/**
 * Pull Topic routing information from the remote NameServer registry when it does not exist in the local cache
 *
 * @param topic
 * @param timeoutMillis
 * @param allowTopicNotExist
 * @return
 * @throws MQClientException
 * @throws InterruptedException
 * @throws RemotingTimeoutException
 * @throws RemotingSendRequestException
 * @throws RemotingConnectException
 */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);
    // After setting the Topic parameter in the request header, send the request for Topic routing information to the NameServer
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
    // Return the response directly
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        // If the NameServer does not have the Topic the message is sent to
        case ResponseCode.TOPIC_NOT_EXIST: {
            if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
            }
            break;
        }
        // If the Topic exists and the fetch succeeds, decode the body with TopicRouteData and return it directly
        case ResponseCode.SUCCESS: {
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        default:
            break;
    }
    throw new MQClientException(response.getCode(), response.getRemark());
}

The mapping of TopicRouteData to TopicPublishInfo is as follows:

[Figure: mapping of TopicRouteData to TopicPublishInfo in the client]

The TopicRouteData and TopicPublishInfo routing variables above are roughly as follows:

[Figure: contents of the TopicRouteData variable]

[Figure: contents of the TopicPublishInfo variable]

3.2.2 Selecting a queue for sending messages

After obtaining the TopicPublishInfo routing information, the RocketMQ client by default uses the selectOneMessageQueue() method to select a queue (MessageQueue) from the messageQueueList in TopicPublishInfo and send the message to it. The fault-tolerance strategy is defined in the MQFaultStrategy class:

public class MQFaultStrategy {
    // Tracks the observed send latency of each Broker
    private final LatencyFaultTolerance latencyFaultTolerance = new LatencyFaultToleranceImpl();
    // Switch for latency-based fault tolerance when sending messages
    private boolean sendLatencyFaultEnable = false;
    // Latency levels
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    // Unavailable duration for each latency level
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    ...
}

The sendLatencyFaultEnable switch selects one of the following behaviors:

(1) sendLatencyFaultEnable on: on top of round-robin selection (a randomly initialized, incrementing counter taken modulo the number of queues), Brokers that are currently unavailable are filtered out. latencyFaultTolerance makes the client back off for a fixed period from Brokers that failed or were slow before. For example, if the latency of the last request reaches 550 ms, the Broker is avoided for 30000 ms; if it reaches 1000 ms, it is avoided for 60000 ms.

(2) sendLatencyFaultEnable off (the default): a queue (MessageQueue) is selected by round-robin (a randomly initialized, incrementing counter taken modulo the number of queues).
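The relationship between the two arrays in MQFaultStrategy can be made concrete with a short sketch. It assumes the arrays are paired by index and that the highest latency level reached by the observed latency decides the back-off. LatencyBackoffSketch is a hypothetical class written for illustration, with values copied from the arrays quoted above and interpreted as milliseconds.

```java
final class LatencyBackoffSketch {
    // Values copied from the latencyMax / notAvailableDuration arrays quoted above (assumed to be ms).
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    /** Returns the back-off for the highest latency level that the observed latency reaches. */
    static long computeNotAvailableDuration(long currentLatencyMillis) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatencyMillis >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0L; // faster than the lowest level: no back-off
    }
}
```

Under these assumptions a request that took 549 ms causes no back-off, one that took 550 ms takes the Broker out of rotation for 30000 ms, and one that took 1000 ms does so for 60000 ms.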

/**
 * Select a queue to send the message to, depending on whether sendLatencyFaultEnable is on
 *
 * @param tpInfo
 * @param lastBrokerName
 * @return
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            // 1. Filter out unavailable Brokers on top of the incrementing-counter modulus; back off for a certain time from Brokers that failed before
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    // 2. Select a queue (MessageQueue) by the incrementing-counter modulus to send the message
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
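For the default path (switch off), the selection can be pictured as a round-robin that tries to avoid the Broker used in the previous failed attempt. The sketch below is an illustration under that assumption, not the TopicPublishInfo source. RoundRobinQueueSketch and its nested Queue class are hypothetical, and the counter starts at 0 for predictability, whereas the text above describes a randomly initialized one.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinQueueSketch {
    /** Minimal stand-in for MessageQueue: a broker name and a queue id. */
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<Queue> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    RoundRobinQueueSketch(List<Queue> queues) {
        this.queues = queues;
    }

    /** Picks the next queue in rotation, skipping queues on lastBrokerName when possible. */
    Queue selectOne(String lastBrokerName) {
        for (int i = 0; i < queues.size(); i++) {
            Queue candidate = next();
            if (lastBrokerName == null || !candidate.brokerName.equals(lastBrokerName)) {
                return candidate;
            }
        }
        return next(); // every queue lives on the broker we wanted to avoid
    }

    private Queue next() {
        // floorMod keeps the position non-negative even after the counter overflows.
        int pos = Math.floorMod(sendWhichQueue.getAndIncrement(), queues.size());
        return queues.get(pos);
    }
}
```

Math.floorMod is used here instead of Math.abs followed by %, because Math.abs(Integer.MIN_VALUE) is still negative; the "if (pos < 0) pos = 0" guard in the quoted source covers the same corner case.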

3.2.3 Sending the Encapsulated RemotingCommand Packet

After selecting the queue, RocketMQ calls the sendKernelImpl() method to send the message (this is the core method that actually sends the message through RocketMQ's Remoting communication module). The method completes the following steps:

(1) Using the brokerName of the MessageQueue selected earlier, call the findBrokerAddressInPublish() method of the MQClientInstance instance to obtain the address of the Broker server to send to. If the address is not found, the routing information is updated first;

(2) If send-message hooks are registered, they are executed before and after the send (the executeSendMessageHookBefore() / executeSendMessageHookAfter() methods);

(3) Encapsulate the message-related information into a RemotingCommand packet, whose RequestCode is one of the following:

A. SEND_MESSAGE (ordinary message sending)

B. SEND_MESSAGE_V2 (optimized network packet sending)

C. SEND_BATCH_MESSAGE (sending messages in batches)

(4) Using the Broker server address obtained above, send the encapsulated RemotingCommand packet to the corresponding Broker. The default send timeout is 3 s;

(5) The actual call into RocketMQ's Remoting communication module happens in the sendMessageSync() method of the MQClientAPIImpl instance. The code is as follows:

private SendResult sendMessageSync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    return this.processSendResponse(brokerName, msg, response);
}

(6) The processSendResponse method handles the normal and abnormal outcomes of the send and returns the sendResult object;

(7) After the send returns, updateFaultItem is called to update the availability time of the Broker server;

(8) In abnormal cases, if the retryAnotherBrokerWhenNotStoreOK flag is set to true, the client switches to another Broker when a send fails.
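Step (2) of the list above, running hooks before and after the actual send, follows a common wrapping pattern. The sketch below shows that pattern only. SendHookSketch and its Hook interface are hypothetical and much simpler than RocketMQ's hook types, and the transport function stands in for the Remoting call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class SendHookSketch {
    /** Hypothetical hook interface with one callback on each side of the send. */
    interface Hook {
        void before(String message);

        void after(String message, String result);
    }

    private final List<Hook> hooks = new ArrayList<>();

    void register(Hook hook) {
        hooks.add(hook);
    }

    /** Runs all hooks around the transport call; with no hooks registered this is a plain send. */
    String send(String message, Function<String, String> transport) {
        for (Hook hook : hooks) {
            hook.before(message);
        }
        String result = transport.apply(message);
        for (Hook hook : hooks) {
            hook.after(message, result);
        }
        return result;
    }
}
```

A hook registered this way sees the message before it leaves the client and the result after the transport returns, which is the kind of place where tracing or metrics code would typically sit.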

After the producer sends the message, the client log is printed as follows:

SendResult [sendStatus=SEND_OK, msgId=020003670EC418B4AAC208AD46930000, offsetMsgId=AC1415A200002A9F000000000000017A, messageQueue=MessageQueue [topic=TopicTest, brokerName=HQSKCJJIDRRD6KC, queueId=2], queueOffset=1]

3.3 Brief analysis of message processing on the Broker server

A Broker server contains many Processor business processors, each handling a different type of request; one or more of these processors share a Processor thread pool. Received messages are handled by the SendMessageProcessor business processor, which does the following in turn:

(1) Pre-check the message: whether the Broker is writable, whether the queueId exceeds the allowed range, and whether the Topic routing information of the message exists (if not, it is created). This corresponds to section 3.2.1, "Try to obtain the TopicPublishInfo routing information", above. If the Topic routing information does not exist, the Broker logs are as follows:

2018-06-14 17:17:24 INFO SendMessageThread_1 - receive SendMessage request command, RemotingCommand [code=310, language=JAVA, version=252, opaque=6, flag(B)=0, remark=null, extFields={a=ProducerGroupName, b=TopicTest, c=TBW102, d=4, e=2, f=0, g=1528967815569, h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC208AD46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON]

2018-06-14 17:17:24 WARN SendMessageThread_1 - The topic TopicTest not exist, producer: /172.20.21.162:62661

2018-06-14 17:17:24 INFO SendMessageThread_1 - Create new topic by default topic:[TBW102] config:[TopicConfig [topicName=TopicTest, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]] producer:[172.20.21.162:62661]

After Topic routing information is created and the second message is sent, the Broker logs are as follows:

2018-08-02 16:26:13 INFO SendMessageThread_1 - receive SendMessage request command, RemotingCommand [code=310, language=JAVA, version=253, opaque=6, flag(B)=0, remark=null, extFields={a=ProducerGroupName, b=TopicTest, c=TBW102, d=4, e=2, f=0, g=1533198373524, h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC208AD46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON]

2018-08-02 16:26:13 INFO SendMessageThread_1 - the msgInner's content is:MessageExt [queueId=2, storeSize=0, queueOffset=0, sysFlag=0, bornTimestamp=1533198373524, bornHost=/172.20.21.162:53914, storeTimestamp=0, storeHost=/172.20.21.162:10911, msgId=null, commitLogOffset=0, bodyCRC=0, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={KEYS=OrderID188, UNIQ_KEY=020003670EC418B4AAC208AD46930000, WAIT=true, TAGS=TagA}, body=11body's content is:Hello world]]

(2) Build the MessageExtBrokerInner;

(3) Call "brokerController.getMessageStore().putMessage()" to persist the MessageExtBrokerInner to disk;

(4) BrokerStatsManager updates its statistics according to the result of the write (normal/abnormal), then the Response is set and returned.
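The pre-check in step (1) can be summarized as three guards applied in order. The sketch below is a simplified model, not the SendMessageProcessor code: BrokerPreCheckSketch is hypothetical, the Topic table is reduced to a map from Topic name to queue count, and new Topics are assumed to inherit 4 queues, matching readQueueNums=4 and writeQueueNums=4 in the log above.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

final class BrokerPreCheckSketch {
    /** Assumed queue count for topics created on the fly (the log above shows 4). */
    static final int DEFAULT_QUEUE_NUMS = 4;

    private final boolean writable;
    private final Map<String, Integer> topicQueueNums = new ConcurrentHashMap<>();

    BrokerPreCheckSketch(boolean writable) {
        this.writable = writable;
    }

    /** Returns an empty Optional when the message passes, otherwise a rejection reason. */
    Optional<String> msgCheck(String topic, int queueId) {
        if (!writable) {
            return Optional.of("broker not writable");
        }
        // Unknown topic: create it with the default queue count instead of rejecting.
        int queueNums = topicQueueNums.computeIfAbsent(topic, t -> DEFAULT_QUEUE_NUMS);
        if (queueId < 0 || queueId >= queueNums) {
            return Optional.of("queueId out of range");
        }
        return Optional.empty();
    }

    boolean hasTopic(String topic) {
        return topicQueueNums.containsKey(topic);
    }
}
```

In this model the first message for TopicTest both passes the check and leaves the Topic registered, which mirrors the "Create new topic by default topic" log line.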

4. Summary

This roughly completes the flow of sending a normal message with the RocketMQ client. Ordered messages, distributed transaction messages, and so on will be covered in later articles, so stay tuned. The author's knowledge is limited, so parts of this article may be inaccurate or incomplete; if any explanation seems unreasonable, please leave a comment so we can discuss it.