1. Introduction
MQProducer is the producer interface provided by RocketMQ, and DefaultMQProducer is its default implementation. To send transactional messages, the corresponding implementation class is TransactionMQProducer. This article does not discuss transactional messages; it focuses on how the default producer starts up and sends messages.
DefaultMQProducer is simply a facade class that RocketMQ exposes to the user. It holds an internal producer implementation, DefaultMQProducerImpl, which does the actual work. The advantage of this structure is that RocketMQ can change the implementation class freely in later releases without users noticing.
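The facade arrangement can be pictured with a minimal sketch. FacadeProducer and ProducerImpl are hypothetical stand-ins invented for illustration, not RocketMQ classes; the point is only that the user-facing class forwards every call to an internal implementation.

```java
// Sketch only: FacadeProducer and ProducerImpl are hypothetical stand-ins.
final class FacadeSketch {

    // The concrete performer; free to change between releases.
    static final class ProducerImpl {
        private boolean started;

        void start() {
            started = true;
        }

        String send(String body) {
            if (!started) {
                throw new IllegalStateException("producer not started");
            }
            return "SEND_OK:" + body;
        }
    }

    // The user-facing class: every call is forwarded to the implementation.
    static final class FacadeProducer {
        private final ProducerImpl impl = new ProducerImpl();

        void start() {
            impl.start();
        }

        String send(String body) {
            return impl.send(body);
        }
    }

    public static void main(String[] args) {
        FacadeProducer producer = new FacadeProducer();
        producer.start();
        System.out.println(producer.send("hello"));
    }
}
```

Because callers only ever touch FacadeProducer, ProducerImpl could be replaced without any change to user code.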
RocketMQ's network communication is built on Netty. Because the Producer interacts with both NameServer and Broker, it is also a Netty client. When the Producer starts, it starts the Netty client as well. It then pulls Broker information from the NameServer and sends heartbeats to the Brokers. When a message is sent, the Topic's routing information is pulled from the NameServer, a MessageQueue is selected in round-robin fashion, and the message is sent to the Broker that hosts that queue.
2. Source code analysis
[Figure: sequence diagram of Producer startup and message sending]
2.1 Starting the Client
Before sending a message, you first need to create a Producer and then start it.
DefaultMQProducer producer = new DefaultMQProducer(GroupName);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
1. In the constructor of DefaultMQProducer, the producer implementation class DefaultMQProducerImpl is created.
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
    // Namespace
    this.namespace = namespace;
    // The producer group name
    this.producerGroup = producerGroup;
    // Producer implementation
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
2. Start the Producer. Because DefaultMQProducer is only a facade, starting it essentially means starting DefaultMQProducerImpl. If message tracing is enabled, the TraceDispatcher service is started as well, which is not discussed here.
public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
    this.defaultMQProducerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}
3. Starting DefaultMQProducerImpl does the following:
- Check GroupName
- Set the InstanceName
- Create MQClientInstance
- Register Producer to producerTable
- Start the MQClientInstance
- Heartbeat to all brokers
The checkConfig() method checks that GroupName is valid: it cannot be too long, cannot contain special characters, cannot be DEFAULT_PRODUCER, and so on.
If InstanceName is not set, it is automatically set to the process PID, followed by # and the value of System.nanoTime().
public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
    }
}
Next, the MQClientInstance is looked up by ClientID; if none exists, a new client instance is created. The ProducerGroup is then registered with the MQClientInstance.
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
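The get-or-create lookup and the registration step can be sketched as follows. This is a simplified illustration, not the RocketMQ implementation: ClientInstance, getOrCreate, and the sample client ID are hypothetical, and rejecting a duplicate group name is an assumption of the sketch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ClientRegistrySketch {

    // Hypothetical stand-in for MQClientInstance.
    static final class ClientInstance {
        final String clientId;
        private final ConcurrentMap<String, Object> producerTable = new ConcurrentHashMap<>();

        ClientInstance(String clientId) {
            this.clientId = clientId;
        }

        // Returns false when the group is already registered (assumed rule).
        boolean registerProducer(String group, Object producer) {
            return producerTable.putIfAbsent(group, producer) == null;
        }
    }

    private static final ConcurrentMap<String, ClientInstance> TABLE = new ConcurrentHashMap<>();

    // One shared instance per client ID; created only on first request.
    static ClientInstance getOrCreate(String clientId) {
        return TABLE.computeIfAbsent(clientId, ClientInstance::new);
    }

    public static void main(String[] args) {
        ClientInstance first = getOrCreate("10.0.0.1@1234");
        ClientInstance second = getOrCreate("10.0.0.1@1234");
        System.out.println("same instance: " + (first == second));
        System.out.println("registered: " + first.registerProducer("groupA", new Object()));
        System.out.println("registered again: " + first.registerProducer("groupA", new Object()));
    }
}
```

Sharing one instance per client ID is what keeps the heavyweight network client from being created repeatedly.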
After successful registration, the MQClientInstance can be started.
4. An MQClientInstance represents a client instance. Internally it maintains a Netty client, NettyRemotingClient, to communicate with the server. This object is heavyweight, so frequent creation should be avoided. Starting an MQClientInstance does the following:
- Send a request for NameServerAddr
- Start the Netty client
- Start scheduled tasks
- Start the message pull service (consumer)
- Start the load balancing service (consumer)
First, if NameServerAddr is not set manually, the client tries to read the JVM system property rocketmq.namesrv.domain, which is expected to point to a URL, and sends an HTTP request every 2 minutes to update NameServerAddr. Why do it this way? A hardcoded NameServerAddr is inflexible, because NameServer is likely to be a cluster of several machines whose IP addresses are not fixed. Serving the addresses from a configuration center lets you change them flexibly.
if (null == this.clientConfig.getNamesrvAddr()) {
    this.mQClientAPIImpl.fetchNameServerAddr();
}
With NameServerAddr, you can start the Netty client for communication. MQClientAPIImpl has the NettyRemotingClient built in.
this.mQClientAPIImpl.start();
After the Netty client is started, it can communicate with the servers: pulling Topic routing information from NameServer and sending heartbeats to the Brokers. Both are performed by scheduled tasks, so the next step is to start the various scheduled tasks.
this.startScheduledTask();
5. Start the various scheduled tasks, as follows.
Update NameServerAddr every two minutes:
if (null == this.clientConfig.getNamesrvAddr()) {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
            } catch (Exception e) {
                log.error("ScheduledTask fetchNameServerAddr exception", e);
            }
        }
    }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
Update Topic routing information periodically from NameServer:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.updateTopicRouteInfoFromNameServer();
        } catch (Exception e) {
            log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
        }
    }
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
Clean up offline Brokers and send heartbeats:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
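Each of the tasks above wraps its body in try/catch. With ScheduledExecutorService.scheduleAtFixedRate, an exception that escapes run() suppresses all later executions of that task, so catching inside the task keeps the schedule alive. The sketch below illustrates this with the standard library only; runPeriodically and its parameters are hypothetical names chosen for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ScheduledTaskSketch {

    // Runs the task at a fixed rate until it has executed `times` times.
    // Returns the number of executions observed, or -1 on timeout.
    static int runPeriodically(Runnable task, int times, long periodMillis)
            throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);
        executor.scheduleAtFixedRate(() -> {
            try {
                runs.incrementAndGet();
                task.run();
            } catch (Exception e) {
                // Log and continue; letting this escape would stop the schedule.
                System.err.println("scheduled task failed: " + e.getMessage());
            } finally {
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        boolean finished = done.await(5, TimeUnit.SECONDS);
        executor.shutdownNow();
        return finished ? runs.get() : -1;
    }

    public static void main(String[] args) throws InterruptedException {
        // The task always fails, yet it keeps being rescheduled.
        int runs = runPeriodically(() -> {
            throw new IllegalStateException("simulated heartbeat failure");
        }, 3, 10);
        System.out.println("executions: " + runs);
    }
}
```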
6. The Netty client is started and so are the scheduled tasks. Finally, the Producer sends a heartbeat to all Brokers. After the heartbeat is sent, if the client is a consumer, its FilterClassSource is uploaded to the Broker, which uses it to perform message filtering.
public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            // Send heartbeat to all brokers
            this.sendHeartbeatToAllBroker();
            // Consumer: upload FilterClassSource, which is used by Broker message filtering
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed. [{}]", this.clientId);
    }
}
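The tryLock() call in the method above means that a heartbeat round is skipped, rather than queued, when another thread is already sending one. A minimal sketch of that behavior follows; HeartbeatLockSketch and its members are hypothetical, and the counter stands in for the real network call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

final class HeartbeatLockSketch {
    private final ReentrantLock lock = new ReentrantLock();
    int sent; // number of heartbeat rounds actually performed

    // Returns true if this call sent a heartbeat, false if it was skipped.
    boolean sendHeartbeatWithLock() {
        if (lock.tryLock()) {
            try {
                sent++; // stand-in for sending heartbeats to all brokers
                return true;
            } finally {
                lock.unlock();
            }
        }
        return false; // another thread is mid-heartbeat; do not wait
    }

    // Attempts a heartbeat while a second thread is holding the lock.
    boolean attemptWhileBusy() throws InterruptedException {
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        held.await();
        boolean result = sendHeartbeatWithLock();
        release.countDown();
        holder.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        HeartbeatLockSketch sketch = new HeartbeatLockSketch();
        System.out.println("uncontended: " + sketch.sendHeartbeatWithLock());
        System.out.println("while busy: " + sketch.attemptWhileBusy());
    }
}
```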
At this point, the service is started.
2.2 Message Sending
There are three modes of sending messages: synchronous, asynchronous, and one-way. Synchronous sending blocks until the Broker responds with an ACK before proceeding. Asynchronous sending is non-blocking: a callback is registered when the message is sent and is executed on success or failure. One-way sending just sends out the request without caring whether it succeeds; it is the fastest, at the microsecond level.
For the moment, we will only discuss synchronous sending. The corresponding API is as follows:
Message msg = new Message("Topic Name", "message body".getBytes());
SendResult result = producer.send(msg);
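The difference between the three modes can be illustrated without RocketMQ at all. In the sketch below, transmit is a hypothetical transport that acknowledges on another thread; sendSync, sendAsync, and sendOneway are illustrative names and are not the RocketMQ API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

final class SendModesSketch {

    // Hypothetical transport: completes with an ACK string on another thread.
    static CompletableFuture<String> transmit(String body) {
        return CompletableFuture.supplyAsync(() -> "ACK:" + body);
    }

    // Synchronous: block until the ACK arrives or the timeout expires.
    static String sendSync(String body, long timeoutMillis) throws Exception {
        return transmit(body).get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Asynchronous: return immediately; the callback gets the ACK or the error.
    static void sendAsync(String body, BiConsumer<String, Throwable> callback) {
        transmit(body).whenComplete(callback);
    }

    // One-way: hand the request to the transport and ignore the outcome.
    static void sendOneway(String body) {
        transmit(body);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sendSync("a", 3000));

        CompletableFuture<String> seen = new CompletableFuture<>();
        sendAsync("b", (ack, error) -> {
            if (error != null) {
                seen.completeExceptionally(error);
            } else {
                seen.complete(ack);
            }
        });
        System.out.println(seen.get(3, TimeUnit.SECONDS));

        sendOneway("c"); // no result to inspect
    }
}
```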
The message sending process can be divided into three layers: the interface layer, the core message processing layer, and the network transmission layer. The API called by the client is the interface layer, which is very simple. Behind it, the Producer performs a series of processing steps on the message and finally sends it to the server through Netty.
1. There is little to see at the interface layer, which just calls the RocketMQ API, so look directly at DefaultMQProducerImpl's sendDefaultImpl method. It mainly does the following:
- Check the message
- Get Topic publishing information
- Handle message retries
The message to be sent is validated first. The Topic must be valid, and messages cannot be sent to a Topic predefined by the system. The message Body must not be empty, since empty messages are meaningless to RocketMQ.
Validators.checkMessage(msg, this.defaultMQProducer);
Once the message is verified, it can be sent, but where should it be sent?
The Producer reads the TopicPublishInfo for the Topic from its cache. This is the Topic's publishing information, including the queues under the Topic and its routing information. If the cache has no data, the Producer pulls it from NameServer.
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // Look in the local cache first
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        // Not cached, or the cached entry is invalid
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // Send a request to pull from NameServer and update the cache
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
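The cache-first lookup can be reduced to a small sketch. RouteCacheSketch, findQueues, and the nameServer function are hypothetical; the remote lookup is modeled as a plain function so the example stays self-contained, and it omits the second fallback lookup that the method above performs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class RouteCacheSketch {
    private final ConcurrentMap<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Function<String, List<String>> nameServer; // hypothetical remote lookup
    final AtomicInteger remoteCalls = new AtomicInteger();

    RouteCacheSketch(Function<String, List<String>> nameServer) {
        this.nameServer = nameServer;
    }

    // Returns the queues for a topic, asking the remote side only on a cache miss.
    List<String> findQueues(String topic) {
        List<String> queues = cache.get(topic);
        if (queues == null || queues.isEmpty()) {
            remoteCalls.incrementAndGet();
            queues = nameServer.apply(topic);
            if (queues != null && !queues.isEmpty()) {
                cache.put(topic, queues); // only usable results are cached
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        RouteCacheSketch routes =
                new RouteCacheSketch(topic -> Arrays.asList(topic + "-q0", topic + "-q1"));
        System.out.println(routes.findQueues("orders"));
        System.out.println(routes.findQueues("orders"));
        System.out.println("remote calls: " + routes.remoteCalls.get());
    }
}
```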
Once the TopicPublishInfo is available, the total number of send attempts is calculated. In synchronous mode it is 1 plus the configured number of retries (retryTimesWhenSendFailed); in the other modes the message is sent only once.
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
A String array records the Brokers to which sending failed, so that they can be skipped on retry.
String[] brokersSent = new String[timesTotal];
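A bounded retry loop of this kind might look like the sketch below. It is an illustration under simplifying assumptions, not the sendDefaultImpl logic: sendWithRetry and trySend are hypothetical names, and brokers are simply tried in list order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class RetrySketch {

    // Tries at most 1 + retries brokers in order. Returns the broker that
    // accepted the message, or null if every attempt failed. Each broker
    // that was tried is appended to brokersSent.
    static String sendWithRetry(List<String> brokers, int retries,
                                Predicate<String> trySend, List<String> brokersSent) {
        int timesTotal = 1 + retries;
        for (int times = 0; times < timesTotal; times++) {
            String broker = brokers.get(times % brokers.size());
            brokersSent.add(broker);
            if (trySend.test(broker)) {
                return broker;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> brokersSent = new ArrayList<>();
        // Only broker-b accepts the message in this simulation.
        String accepted = sendWithRetry(
                Arrays.asList("broker-a", "broker-b", "broker-c"),
                2, broker -> broker.equals("broker-b"), brokersSent);
        System.out.println("accepted by: " + accepted);
        System.out.println("tried: " + brokersSent);
    }
}
```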
To improve the efficiency of message production and consumption, and to store messages in shards, a single Topic can have multiple MessageQueues. Which MessageQueue should the message be sent to?
The Producer calls the selectOneMessageQueue method to select a MessageQueue from the list; by default it uses a round-robin algorithm.
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
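Round-robin selection that avoids the Broker used in the previous failed attempt can be sketched as follows. QueueRef and selectOne are hypothetical names; the fallback to plain round-robin when every queue lives on the failed Broker is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {

    // Hypothetical, trimmed-down stand-in for MessageQueue.
    static final class QueueRef {
        final String brokerName;
        final int queueId;

        QueueRef(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<QueueRef> queues;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinSketch(List<QueueRef> queues) {
        this.queues = queues;
    }

    private QueueRef next() {
        // floorMod keeps the position valid even after the counter overflows.
        return queues.get(Math.floorMod(index.getAndIncrement(), queues.size()));
    }

    // Picks the next queue in rotation, skipping queues on lastBrokerName.
    QueueRef selectOne(String lastBrokerName) {
        for (int i = 0; i < queues.size(); i++) {
            QueueRef candidate = next();
            if (lastBrokerName == null || !candidate.brokerName.equals(lastBrokerName)) {
                return candidate;
            }
        }
        return next(); // every queue is on the failed broker; use it anyway
    }

    public static void main(String[] args) {
        RoundRobinSketch selector = new RoundRobinSketch(Arrays.asList(
                new QueueRef("broker-a", 0),
                new QueueRef("broker-a", 1),
                new QueueRef("broker-b", 0)));
        QueueRef first = selector.selectOne(null);
        QueueRef retry = selector.selectOne(first.brokerName);
        System.out.println(first.brokerName + ":" + first.queueId);
        System.out.println(retry.brokerName + ":" + retry.queueId);
    }
}
```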
The selected MessageQueue contains the Broker on which the queue is stored and its QueueID.
// The Topic it belongs to
private String topic;
// The Broker that stores the queue
private String brokerName;
// Queue ID
private int queueId;
With MessageQueue, the Producer knows which Broker to send the message to, and then calls sendKernelImpl, the core message sending method.
2. sendKernelImpl is the core method for sending messages. It does the following:
- Find the Master address according to BrokerName
- Compress the message
- Set the message sysFlag
- Execute hook functions
- Build the Header and call the API to send it
Find the Broker's Master address based on the BrokerName of the MessageQueue. Brokers can be deployed as a cluster, but messages are only sent to the Master, so the machine with brokerId=0 must be found.
public String findBrokerAddressInPublish(final String brokerName) {
    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        // Find the Broker address with brokerId 0
        return map.get(MixAll.MASTER_ID);
    }
    return null;
}
If it is not found in the local cache, it is pulled from NameServer.
if (null == brokerAddr) {
    // Not found in the cache; try to pull from NameServer
    tryToFindTopicPublishInfo(mq.getTopic());
    brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
With the Broker's Master address in hand, the Producer can send the message, but it does some processing first. For example, it calls the tryToCompressMessage method to try to compress the message Body and save bandwidth. The Zip compression algorithm is used, and compression only happens when the Body is larger than 4KB. If the Body is compressed, the Broker must be told so, because the message has to be decompressed before it can be read; otherwise it would be unreadable. How does the Broker find out? Through sysFlag, an integer in which each bit has its own meaning. The first bit indicates whether the message Body is compressed.
if (this.tryToCompressMessage(msg)) {
    // The message body was compressed, so set the first bit of sysFlag to 1
    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
    msgBodyCompressed = true;
}
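Threshold-based compression of the Body can be sketched with the JDK's java.util.zip streams. This is an assumption-laden illustration rather than the tryToCompressMessage implementation: CompressSketch, THRESHOLD, and maybeCompress are hypothetical, and the sketch signals "not compressed" by returning the original array unchanged.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

final class CompressSketch {
    static final int THRESHOLD = 4 * 1024; // the 4KB limit described above

    static byte[] compress(byte[] body) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DeflaterOutputStream deflater = new DeflaterOutputStream(out)) {
            deflater.write(body);
        }
        return out.toByteArray();
    }

    static byte[] decompress(byte[] data) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(data))) {
            byte[] buffer = new byte[1024];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        }
        return out.toByteArray();
    }

    // Compresses only bodies larger than the threshold; smaller bodies are
    // returned as the same array instance.
    static byte[] maybeCompress(byte[] body) throws IOException {
        return body.length > THRESHOLD ? compress(body) : body;
    }

    public static void main(String[] args) throws IOException {
        byte[] big = new byte[8 * 1024]; // highly compressible: all zeros
        byte[] wire = maybeCompress(big);
        System.out.println("compressed: " + (wire != big));
        System.out.println("round trip ok: " + Arrays.equals(decompress(wire), big));
    }
}
```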
The third bit indicates whether the message is transactional, and if so, set it to 1.
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
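Packing several booleans into one integer is plain bit masking. In the sketch below the two constants are given values that match the bit positions described above (first bit and third bit); the class and method names are hypothetical, and only these two flags are modeled.

```java
final class SysFlagSketch {
    static final int COMPRESSED_FLAG = 0x1;                // first bit
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2; // third bit

    // Builds a flag word by OR-ing in each bit that applies.
    static int build(boolean compressed, boolean prepared) {
        int sysFlag = 0;
        if (compressed) {
            sysFlag |= COMPRESSED_FLAG;
        }
        if (prepared) {
            sysFlag |= TRANSACTION_PREPARED_TYPE;
        }
        return sysFlag;
    }

    // The receiver tests a bit by AND-ing with the same mask.
    static boolean isCompressed(int sysFlag) {
        return (sysFlag & COMPRESSED_FLAG) != 0;
    }

    static boolean isPrepared(int sysFlag) {
        return (sysFlag & TRANSACTION_PREPARED_TYPE) != 0;
    }

    public static void main(String[] args) {
        int sysFlag = build(true, true);
        System.out.println(Integer.toBinaryString(sysFlag));
        System.out.println(isCompressed(sysFlag) + " " + isPrepared(sysFlag));
    }
}
```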
Once the message has been processed and is ready to send, the pre-send hook function is executed. Then comes the important part: creating the request header object for sending a message, SendMessageRequestHeader, which tells the Broker which producer group produced the message, which Topic it belongs to, what properties it has, and so on.
public class SendMessageRequestHeader implements CommandCustomHeader {
    @CFNotNull
    // The producer group the message comes from
    private String producerGroup;
    @CFNotNull
    // Topic to which the message belongs
    private String topic;
    @CFNotNull
    // The default Topic
    private String defaultTopic;
    @CFNotNull
    // The default number of queues
    private Integer defaultTopicQueueNums;
    @CFNotNull
    // ID of the MessageQueue the message is sent to
    private Integer queueId;
    @CFNotNull
    // System flag; the first bit indicates whether the Body is compressed
    private Integer sysFlag;
    @CFNotNull
    // When the message was created
    private Long bornTimestamp;
    @CFNotNull
    // The message Flag
    private Integer flag;
    @CFNullable
    // Custom attributes, a HashMap concatenated into a string
    private String properties;
    @CFNullable
    // Number of times the message has been reconsumed
    private Integer reconsumeTimes;
    @CFNullable
    private boolean unitMode = false;
    @CFNullable
    // Whether this is a batch message
    private boolean batch = false;
    // Maximum number of reconsume attempts
    private Integer maxReconsumeTimes;
}
With the request header set up, you can create the RemotingCommand object. RemotingCommand is RocketMQ's communication protocol object: every request or response transmitted via Netty is a byte sequence serialized from a RemotingCommand. It can be created quickly from a RequestCode and a RequestHeader. The RequestCode tells the server what kind of request this is, and the RequestHeader carries the request parameters. The RequestCode for sending a message is SEND_MESSAGE.
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
There is also an optimized V2 RequestCode for sending messages. It merely shortens the RequestHeader property names so that FastJson serialization performs better.
Once the RemotingCommand is created, it can be sent to the Broker over Netty with the invokeSyncImpl method. RPC hook functions are called before and after the network request.
doBeforeRpcHooks(addr, request);
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
invokeSyncImpl ultimately writes the RemotingCommand to the server through the Channel and registers a listener; the outcome of the write is recorded in the ResponseFuture.
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture f) throws Exception {
        if (f.isSuccess()) {
            responseFuture.setSendRequestOK(true);
            return;
        } else {
            responseFuture.setSendRequestOK(false);
        }
        responseTable.remove(opaque);
        responseFuture.setCause(f.cause());
        responseFuture.putResponse(null);
        log.warn("send a request command to channel <" + addr + "> failed.");
    }
});
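How a blocking call can sit on top of an asynchronous channel is easier to see in isolation. The sketch below is a simplified model, not the Netty or RocketMQ code: PendingResponse, invokeSync, and onResponse are hypothetical names, strings stand in for RemotingCommand, and a short-lived thread stands in for the server's reply arriving on the channel.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class ResponseFutureSketch {

    // Hypothetical stand-in for ResponseFuture: one latch per in-flight request.
    static final class PendingResponse {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String response;

        void putResponse(String response) {
            this.response = response;
            latch.countDown(); // wakes up the waiting sender
        }

        // Returns the response, or null if none arrived before the timeout.
        String waitResponse(long timeoutMillis) throws InterruptedException {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return response;
        }
    }

    // In-flight requests keyed by their request ID (opaque).
    private final ConcurrentMap<Integer, PendingResponse> responseTable = new ConcurrentHashMap<>();

    // Called by the I/O side when a reply for the given request ID arrives.
    void onResponse(int opaque, String body) {
        PendingResponse pending = responseTable.get(opaque);
        if (pending != null) {
            pending.putResponse(body);
        }
    }

    // Registers the request, "writes" it, then blocks until the reply or timeout.
    String invokeSync(int opaque, String request, long timeoutMillis) throws InterruptedException {
        PendingResponse pending = new PendingResponse();
        responseTable.put(opaque, pending);
        try {
            // Stand-in for the channel write: a "server" thread replies later.
            new Thread(() -> onResponse(opaque, "ACK:" + request)).start();
            return pending.waitResponse(timeoutMillis);
        } finally {
            responseTable.remove(opaque);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ResponseFutureSketch client = new ResponseFutureSketch();
        System.out.println(client.invokeSync(1, "hello", 3000));
    }
}
```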
At this point, the whole process of sending the message ends.
3. Summary
DefaultMQProducer is the producer facade class provided by RocketMQ; the core implementation is DefaultMQProducerImpl. The purpose of this design is that the producer implementation can be changed at any time without users noticing. The producer communicates with the Broker and is therefore a Netty client. When the producer is started, the Netty client is started too, and a series of scheduled tasks then pull routing information from NameServer and send heartbeats to the Broker.
When a message is sent, the Producer first validates the message, looks up the Topic's routing information (asking NameServer if it is not cached), selects one of the MessageQueues in round-robin fashion, and finds the Master address by the BrokerName of that MessageQueue. It then builds the SendMessageRequestHeader and RemotingCommand objects and sends the request to the Broker via Netty.