Producer Sending process

Before sending a message, a producer must first know the information about the topic and which broker the topic is on. How does a producer do this?

Remember at the beginning of 01? Namesrv provides an interface for obtaining routing information from topic (RouteInfoManager#pickupTopicRouteData). Producer returns TopicRouteData based on this interface. Know which broker to send the Topic to.

TopicRouteData

private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
Copy the code

From the structure of the return, you know the broker address and what queues are available under the topic, and then you can send a message.

After obtaining the information of the topic, the producer will encapsulate the information into another data structure

class TopicPublishInfo {...private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = newThreadLocalIndex(); . }class MessageQueue {
    private String topic;
    private String brokerName;
    private int queueId;
}
Copy the code

A messageQueueList looks something like this

[{"brokerName":"broker-a"."queueId":0."topic":"TBW102%zouLeTopic"
    },
    {
        "brokerName":"broker-a"."queueId":1."topic":"TBW102%zouLeTopic"
    },
    {
        "brokerName":"broker-a"."queueId":2."topic":"TBW102%zouLeTopic"
    },
    {
        "brokerName":"broker-a"."queueId":3."topic":"TBW102%zouLeTopic"
    },
    {
        "brokerName":"broker-b"."queueId":0."topic":"TBW102%zouLeTopic"
    },
    {
        "brokerName":"broker-b"."queueId":1."topic":"TBW102%zouLeTopic"
    },
    {
        "brokerName":"broker-b"."queueId":2."topic":"TBW102%zouLeTopic"
    },
    {
        "brokerName":"broker-b"."queueId":3."topic":"TBW102%zouLeTopic"}]Copy the code

As you can see, topic TBW102%zouLeTopic above exists in both broker-B and broker-A

We know that topic can have many queues, so how does a producer choose which queue to send messages to?

The answer is to poll from messageQueueList to select a queue to send (each thread has its own counter).

TopicPublishInfo#selectOneMessageQueue(final String lastBrokerName)

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        // In most cases, this function is used
        return selectOneMessageQueue();
    } else {
        // If lastBrokerName is not null, a broker with a different name than lastBrokerName will be selected.
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if(! mq.getBrokerName().equals(lastBrokerName)) {returnmq; }}// If not, return
        returnselectOneMessageQueue(); }}public MessageQueue selectOneMessageQueue(a) {
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}
Copy the code

What happens if a producer fails to send a message?

Code from the TopicPublishInfo#selectOneMessageQueue(Final String lastBrokerName) method. As you can see, if lastBrokerName is not null, a broker with a different name than lastBrokerName is returned. So when will lastBrokerName not be null? If the message fails to be sent and is retried, it will not be null. (Default: 2 retries at most)

The send delay fails on the producer

In fact, on the producer side, sendLatencyFaultEnable controls whether send delay failover is enabled. However, this parameter defaults to false. When this parameter is enabled, the preceding logic is not performed.

The logic implementation is under this class: MQFaultStrategy What this class does is determine how long a producer should not select the broker to send messages based on the time it takes to send messages to the broker.

The producer already knows which broker to send to, so what types of sending methods does rocketMQ provide? What are the differences?

The producer provides three modes: synchronous, asynchronous, and unidirectional

Synchronization: The producer waits for the broker to trigger the producer’s retry mechanism

Asynchronous: The producer is in the thread pool. The logic of sending messages can trigger the retry mechanism of the producer

In fact, when sending asynchronously, you can only retry errors that occur on the broker side. If there is a network problem, it is impossible to retry

One way: One way to send means to send, regardless of whether the message was sent successfully or not. The retry mechanism of the producer cannot be triggered

Asynchronous send retry logic MQClientAPIImpl#sendMessageAsync

private void sendMessageAsync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final AtomicInteger times,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws InterruptedException, RemotingException {
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();

            // Todo has no callback set
            if (null== sendCallback && response ! =null) {

                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    if(context ! =null&& sendResult ! =null) { context.setSendResult(sendResult); context.getProducer().executeSendMessageHookAfter(context); }}catch (Throwable e) {
                }

                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                return;
            }

            if(response ! =null) {
                try{... Business error in}catch (Exception e) {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);

                    // Todo has an exception, and this method will call message sending again
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, e, context, false, producer); }}else {
                / /...
                if(! responseFuture.isSendRequestOK()) {// .. 
                     // Todo has an exception, and this method will call message sending again
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else if (responseFuture.isTimeout()) {
                    // ...
                    // Todo has an exception, and this method will call message sending again
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else {
                     // ...
                     // Todo has an exception, and this method will call message sending again
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer); }}}}); }Copy the code

Finally, the producer will retry in the onExceptionImpl method (default: 2 retries at most)

conclusion

Message Sending Process
  1. Gets the broker to which the topic belongs
  2. Polling to select a queue
  3. If the send fails (synchronous, asynchronous), retry (default: 2 times)
other
  1. If a network fault occurs during asynchronous transmission, retry cannot be performed
  2. The producer side has parameterssendLatencyFaultEnableCan be openedSend delay failover mechanism
  3. whenproducerWhen a message fails to be sent, the default policy is to avoid the broker that failed last time (if it is a two-master structure)

The design of the rocketMQ message protocol

Finished, the sending process of producer. When communicating with the broker, the producer must follow certain protocols. How is the message protocol designed in rocketMQ?

Before we talk about that, let’s think about, how do you design the protocol? What elements are needed?

A protocol normally contains the following information

  • Magic number (not required), used to determine if the packet is invalid in the first place
  • The version number can support protocol upgrade
  • The serialization algorithm, which serialization and deserialization of the message body, can be extended from this, for example: JSON, Protobuf, Hessian, JDK
  • Instruction type, business related. For example, obtain routing information and register broker information
  • Request number (not required) to provide asynchronous capability for duplex communication
  • The length of the message
  • The message body

RocketMQ message protocol design

NettyEncoder#encode() RemotingCommandRocketMQ message protocol class

public class RemotingCommand {
    
    // Instruction type
    private int code;
    private LanguageCode language = LanguageCode.JAVA;
     
    / / version number
    private int version = 0;
    
    
    private int opaque = requestId.getAndIncrement();
    private int flag = 0;
    private String remark;
    private HashMap<String, String> extFields;
    private transient CommandCustomHeader customHeader;

    private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
    
    // Message body
    private transient byte[] body;
    
    // Netty calls this method to encode the message
    public ByteBuffer encodeHeader(final int bodyLength) {
        // 1> header length size
        int length = 4;

        // 2> header data length
        byte[] headerData;
        headerData = this.headerEncode();

        length += headerData.length;

        // 3> body data length
        length += bodyLength;

        ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

        // length
        result.putInt(length);

        // header length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

        // header data
        result.put(headerData);

        result.flip();

        returnresult; }}Copy the code

Client Netty thread model

NettyRemotingClient

Netty Bootstrap build

this.bootstrap
.group(this.eventLoopGroupWorker)
.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // ...
        pipeline.addLast(
            defaultEventExecutorGroup,
            new NettyEncoder(),
            new NettyDecoder(),
           // ...}});Copy the code

Work Number of EventLoopGroup threads

// Write dead 1, immutable
this.eventLoopGroupWorker = new NioEventLoopGroup(1.new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyClientSelector_%d".this.threadIndex.incrementAndGet())); }});Copy the code

IO number of threads

this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            // Write dead 4, configurable
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet()); }});Copy the code

Code appreciation

ThreadLocalIndex

public class ThreadLocalIndex {
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    private final Random random = new Random();

    public int getAndIncrement(a) {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            index = Math.abs(random.nextInt());
            if (index < 0)
                index = 0;
            this.threadLocalIndex.set(index);
        }
            
        / / todo details
        index = Math.abs(index + 1);
        if (index < 0)
            index = 0;

        this.threadLocalIndex.set(index);
        return index;
    }

    @Override
    public String toString(a) {
        return "ThreadLocalIndex{" +
            "threadLocalIndex=" + threadLocalIndex.get() +
            '} '; }}Copy the code