Producer Sending process
Before sending a message, a producer must first know the information about the topic and which broker the topic is on. How does a producer do this?
Remember at the beginning of 01? Namesrv provides an interface for obtaining routing information from topic (RouteInfoManager#pickupTopicRouteData). Producer returns TopicRouteData based on this interface. Know which broker to send the Topic to.
TopicRouteData
private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
Copy the code
From the structure of the return, you know the broker address and what queues are available under the topic, and then you can send a message.
After obtaining the information of the topic, the producer will encapsulate the information into another data structure
class TopicPublishInfo {...private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
private volatile ThreadLocalIndex sendWhichQueue = newThreadLocalIndex(); . }class MessageQueue {
private String topic;
private String brokerName;
private int queueId;
}
Copy the code
A messageQueueList looks something like this
[{"brokerName":"broker-a"."queueId":0."topic":"TBW102%zouLeTopic"
},
{
"brokerName":"broker-a"."queueId":1."topic":"TBW102%zouLeTopic"
},
{
"brokerName":"broker-a"."queueId":2."topic":"TBW102%zouLeTopic"
},
{
"brokerName":"broker-a"."queueId":3."topic":"TBW102%zouLeTopic"
},
{
"brokerName":"broker-b"."queueId":0."topic":"TBW102%zouLeTopic"
},
{
"brokerName":"broker-b"."queueId":1."topic":"TBW102%zouLeTopic"
},
{
"brokerName":"broker-b"."queueId":2."topic":"TBW102%zouLeTopic"
},
{
"brokerName":"broker-b"."queueId":3."topic":"TBW102%zouLeTopic"}]Copy the code
As you can see, topic TBW102%zouLeTopic above exists in both broker-B and broker-A
We know that topic can have many queues, so how does a producer choose which queue to send messages to?
The answer is to poll from messageQueueList to select a queue to send (each thread has its own counter).
TopicPublishInfo#selectOneMessageQueue(final String lastBrokerName)
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
// In most cases, this function is used
return selectOneMessageQueue();
} else {
// If lastBrokerName is not null, a broker with a different name than lastBrokerName will be selected.
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if(! mq.getBrokerName().equals(lastBrokerName)) {returnmq; }}// If not, return
returnselectOneMessageQueue(); }}public MessageQueue selectOneMessageQueue(a) {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
Copy the code
What happens if a producer fails to send a message?
Code from the TopicPublishInfo#selectOneMessageQueue(Final String lastBrokerName) method. As you can see, if lastBrokerName is not null, a broker with a different name than lastBrokerName is returned. So when will lastBrokerName not be null? If the message fails to be sent and is retried, it will not be null. (Default: 2 retries at most)
The send delay fails on the producer
In fact, on the producer side, sendLatencyFaultEnable controls whether send delay failover is enabled. However, this parameter defaults to false. When this parameter is enabled, the preceding logic is not performed.
The logic implementation is under this class: MQFaultStrategy What this class does is determine how long a producer should not select the broker to send messages based on the time it takes to send messages to the broker.
The producer already knows which broker to send to, so what types of sending methods does rocketMQ provide? What are the differences?
The producer provides three modes: synchronous, asynchronous, and unidirectional
Synchronization: The producer waits for the broker to trigger the producer’s retry mechanism
Asynchronous: The producer is in the thread pool. The logic of sending messages can trigger the retry mechanism of the producer
In fact, when sending asynchronously, you can only retry errors that occur on the broker side. If there is a network problem, it is impossible to retry
One way: One way to send means to send, regardless of whether the message was sent successfully or not. The retry mechanism of the producer cannot be triggered
Asynchronous send retry logic MQClientAPIImpl#sendMessageAsync
private void sendMessageAsync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final AtomicInteger times,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
// Todo has no callback set
if (null== sendCallback && response ! =null) {
try {
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
if(context ! =null&& sendResult ! =null) { context.setSendResult(sendResult); context.getProducer().executeSendMessageHookAfter(context); }}catch (Throwable e) {
}
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
return;
}
if(response ! =null) {
try{... Business error in}catch (Exception e) {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
// Todo has an exception, and this method will call message sending again
onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, e, context, false, producer); }}else {
/ /...
if(! responseFuture.isSendRequestOK()) {// ..
// Todo has an exception, and this method will call message sending again
onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
} else if (responseFuture.isTimeout()) {
// ...
// Todo has an exception, and this method will call message sending again
onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
} else {
// ...
// Todo has an exception, and this method will call message sending again
onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer); }}}}); }Copy the code
Finally, the producer will retry in the onExceptionImpl method (default: 2 retries at most)
conclusion
Message Sending Process
- Gets the broker to which the topic belongs
- Polling to select a queue
- If the send fails (synchronous, asynchronous), retry (default: 2 times)
other
- If a network fault occurs during asynchronous transmission, retry cannot be performed
- The producer side has parameters
sendLatencyFaultEnable
Can be openedSend delay failover mechanism - when
producer
When a message fails to be sent, the default policy is to avoid the broker that failed last time (if it is a two-master structure)
The design of the rocketMQ message protocol
Finished, the sending process of producer. When communicating with the broker, the producer must follow certain protocols. How is the message protocol designed in rocketMQ?
Before we talk about that, let’s think about, how do you design the protocol? What elements are needed?
A protocol normally contains the following information
- Magic number (not required), used to determine if the packet is invalid in the first place
- The version number can support protocol upgrade
- The serialization algorithm, which serialization and deserialization of the message body, can be extended from this, for example: JSON, Protobuf, Hessian, JDK
- Instruction type, business related. For example, obtain routing information and register broker information
- Request number (not required) to provide asynchronous capability for duplex communication
- The length of the message
- The message body
RocketMQ message protocol design
NettyEncoder#encode()
RemotingCommand
RocketMQ message protocol class
public class RemotingCommand {
// Instruction type
private int code;
private LanguageCode language = LanguageCode.JAVA;
/ / version number
private int version = 0;
private int opaque = requestId.getAndIncrement();
private int flag = 0;
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
// Message body
private transient byte[] body;
// Netty calls this method to encode the message
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData;
headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
length += bodyLength;
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
result.flip();
returnresult; }}Copy the code
Client Netty thread model
NettyRemotingClient
Netty Bootstrap build
this.bootstrap
.group(this.eventLoopGroupWorker)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// ...
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
// ...}});Copy the code
Work Number of EventLoopGroup threads
// Write dead 1, immutable
this.eventLoopGroupWorker = new NioEventLoopGroup(1.new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClientSelector_%d".this.threadIndex.incrementAndGet())); }});Copy the code
IO number of threads
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
// Write dead 4, configurable
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet()); }});Copy the code
Code appreciation
ThreadLocalIndex
public class ThreadLocalIndex {
private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
private final Random random = new Random();
public int getAndIncrement(a) {
Integer index = this.threadLocalIndex.get();
if (null == index) {
index = Math.abs(random.nextInt());
if (index < 0)
index = 0;
this.threadLocalIndex.set(index);
}
/ / todo details
index = Math.abs(index + 1);
if (index < 0)
index = 0;
this.threadLocalIndex.set(index);
return index;
}
@Override
public String toString(a) {
return "ThreadLocalIndex{" +
"threadLocalIndex=" + threadLocalIndex.get() +
'} '; }}Copy the code