To borrow a phrase, “The essence of a message queue is the sending, storing, and receiving of messages.” For a message queue, then, how to send and receive messages efficiently is the crux of the design.

Warning: this is a long, dense technical read.

 

Overview of Remoting communication modules in RocketMQ

 

The overall deployment architecture of the RocketMQ message queue is shown below:

Let’s start with a few roles in the RocketMQ message queue cluster:

  • NameServer: provides the naming service for the MQ cluster, handling Broker registration updates and route discovery;
  • Broker-Master: the master Broker server, which hosts messages;
  • Broker-Slave: the slave Broker server;
  • Producer: the message producer;
  • Consumer: the message consumer.

A part of the RocketMQ cluster communicates as follows:

  • Once a Broker starts, it registers itself with the NameServer and then reports its Topic routing information to the NameServer every 30 seconds;
  • When the Producer, acting as a client, sends a message, it looks up routing information in the locally cached TopicPublishInfoTable based on the message’s Topic. If nothing is found, it pulls the latest routing information from the NameServer;
  • The Producer selects a MessageQueue according to the routing information it obtained and sends the message to it. The Broker, as the receiver, accepts and stores the message.

As you can see from the above, communication occurs between message producers, brokers and Nameservers (only part of MQ communication is covered here), so the design of a good network communication module in MQ is critical to determining the overall messaging capability and ultimate performance of a RocketMQ cluster.

The RocketMQ-Remoting module is responsible for network communication in the RocketMQ message queue. Almost every other module that needs network communication depends on it, such as RocketMQ-Client, RocketMQ-Server, and RocketMQ-Namesrv.

To enable efficient data requests and receipt between clients and servers, the RocketMQ message queue customizes the communication protocol and extends the communication module on top of Netty.

The RocketMQ communication module is built on Netty, so RocketMQ’s source code will be easier to follow if you already have some knowledge of Netty’s multithreading model and the Java NIO model.

This article uses RocketMQ 4.2.0, which depends on Netty 4.0.42.Final. The RocketMQ code structure is shown below:

The source code is divided into modules such as RocketMQ-Broker, RocketMQ-Client, RocketMQ-Common, RocketMQ-Filtersrv, RocketMQ-Namesrv and RocketMQ-Remoting. The communication framework is encapsulated in the RocketMQ-Remoting module.

This article covers the RocketMQ protocol format, message encoding and decoding, the communication modes (synchronous/asynchronous/one-way), and the concrete communication flow for sending and receiving messages.

 

RocketMQ Remoting communication module implementation

 

1. Class structure diagram of Remoting communication module

In terms of the class hierarchy:

  • RemotingService: the top-level interface, which provides three methods:
    void start();
    void shutdown();
    void registerRPCHook(RPCHook rpcHook);
  • RemotingClient/RemotingServer: two interfaces that extend the top-level RemotingService interface and provide the methods needed by the Client and the Server respectively. The methods of RemotingServer are listed below:
    /**
     * Same as RemotingClient
     *
     * @param requestCode
     * @param processor
     * @param executor
     */
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

    /**
     * Registers the default processor
     *
     * @param processor
     * @param executor
     */
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

    int localListenPort();

    /**
     * Returns the processor/executor pair registered for a request code
     *
     * @param requestCode
     * @return
     */
    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

    /**
     * As with RemotingClient, synchronous communication; returns the response RemotingCommand
     *
     * @param channel
     * @param request
     * @param timeoutMillis
     * @return
     * @throws InterruptedException
     * @throws RemotingSendRequestException
     * @throws RemotingTimeoutException
     */
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;

    /**
     * As with RemotingClient, asynchronous communication; the response is delivered to the callback
     *
     * @param channel
     * @param request
     * @param timeoutMillis
     * @param invokeCallback
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

    /**
     * As with RemotingClient, one-way communication, for example heartbeat packets
     *
     * @param channel
     * @param request
     * @param timeoutMillis
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;
  • NettyRemotingAbstract: an abstract class for Netty communication processing. It defines and encapsulates the common methods for Netty processing.
  • NettyRemotingClient/NettyRemotingServer: implement RemotingClient and RemotingServer respectively, and both extend the NettyRemotingAbstract abstract class. Other RocketMQ components, such as the client, NameServer, and Broker, use these two classes to send and receive messages.

2. Protocol design and message encoding/decoding

When messages are sent between the Client and Server, both sides need an agreed protocol, so RocketMQ defines its own message protocol. To transmit messages efficiently over the network and to read received messages, messages must also be encoded and decoded. In RocketMQ, the RemotingCommand class encapsulates all data exchanged during message transmission: it holds the data structures and also implements the encoding and decoding operations.

Some of the member variables of the RemotingCommand class are as follows:

| Header field | Type | Request description | Response description |
|---|---|---|---|
| code | int | Request operation code; the responder handles the request according to this code | Response code; 0 indicates success, non-zero values indicate errors |
| language | LanguageCode | Language in which the requester is implemented | Language in which the responder is implemented |
| version | int | Version of the requester program | Version of the responder program |
| opaque | int | Equivalent to a requestId; distinguishes requests on the same connection and is used to match the response to its request | Returned unmodified in the response |
| flag | int | Flag distinguishing a normal RPC from a one-way RPC | Flag distinguishing a normal RPC from a one-way RPC |
| remark | String | Custom text information | Custom text information |
| extFields | HashMap | Custom extension information of the request | Custom extension information of the response |

Here is an example of a Broker sending a heartbeat registration message to a NameServer:

    [
    code=103, // 103 is the request code for registering with the NameServer
    language=JAVA,
    version=137,
    opaque=58, // requestId
    flag(B)=0,
    remark=null,
    extFields={
        brokerId=0,
        clusterName=DefaultCluster,
        brokerAddr=ip1:10911,
        haServerAddr=ip1:10912,
        brokerName=LAPTOP-SMF2CKDN
    },
    serializeTypeCurrentRPC=JSON
    ]
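The flag field from the header table can be read as a small bit set. The sketch below is only an illustration: the class and method names are hypothetical, and the bit positions (bit 0 for "this is a response", bit 1 for "this is a one-way request") are an assumption that should be checked against RemotingCommand in your RocketMQ version. Under that assumption, the flag(B)=0 in the heartbeat example above denotes an ordinary request that expects a response.

```java
/** Illustrative helpers for the flag field; names and bit positions are assumptions. */
final class FlagSketch {
    // Assumption: bit 0 marks a response, bit 1 marks a one-way request.
    static final int RESPONSE_BIT = 1 << 0;
    static final int ONEWAY_BIT = 1 << 1;

    /** Returns the flag with the response bit set. */
    static int markResponse(int flag) {
        return flag | RESPONSE_BIT;
    }

    /** Returns the flag with the one-way bit set. */
    static int markOneway(int flag) {
        return flag | ONEWAY_BIT;
    }

    static boolean isResponse(int flag) {
        return (flag & RESPONSE_BIT) != 0;
    }

    static boolean isOneway(int flag) {
        return (flag & ONEWAY_BIT) != 0;
    }
}
```

Keeping both markers in one int lets the receiver decide with two bit tests whether a command is a request or a response, and whether a reply must be written back.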

Here is the RocketMQ communication protocol format:

As the format shows, the transmitted content is divided into the following four parts:

  • Message length: the total length, stored in four bytes (one int);
  • Serialization type & header length: also one int. The first byte holds the serialization type and the remaining three bytes hold the header length;
  • Header data: the serialized header data;
  • Message body data: the binary content of the message body.

Messages are encoded and decoded in the encode and decode methods of the RemotingCommand class respectively; decoding is the reverse of encoding.
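The four-part layout can be sketched with a plain ByteBuffer. This is a minimal illustration, not the RemotingCommand implementation: FrameSketch and its methods are hypothetical names, and the sketch assumes that the leading length field counts the bytes that follow it (the 4-byte type/length int, the header, and the body).

```java
import java.nio.ByteBuffer;

/** Illustrative frame codec for the four-part layout; not RocketMQ's API. */
final class FrameSketch {

    /** Packs the serialization type (first byte) and header length (last three bytes) into one int. */
    static int markHeader(byte serializeType, int headerLength) {
        return ((serializeType & 0xFF) << 24) | (headerLength & 0x00FFFFFF);
    }

    /** Builds: [length:int][type+headerLength:int][header bytes][body bytes]. */
    static ByteBuffer encode(byte serializeType, byte[] header, byte[] body) {
        // Assumption: the length field covers everything after itself.
        int length = 4 + header.length + body.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + length);
        buf.putInt(length);
        buf.putInt(markHeader(serializeType, header.length));
        buf.put(header);
        buf.put(body);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    /** Reverse of encode: returns {header, body}. */
    static byte[][] decode(ByteBuffer buf) {
        int length = buf.getInt();
        int mark = buf.getInt();
        int headerLength = mark & 0x00FFFFFF; // low three bytes
        byte[] header = new byte[headerLength];
        buf.get(header);
        byte[] body = new byte[length - 4 - headerLength];
        buf.get(body);
        return new byte[][] {header, body};
    }

    /** Extracts the serialization type from the packed int. */
    static byte serializeTypeOf(int mark) {
        return (byte) ((mark >>> 24) & 0xFF);
    }
}
```

Packing the serialization type into the high byte of the header-length int means the decoder knows how to deserialize the header before it reads a single header byte, at the cost of limiting the header to 2^24 - 1 bytes.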

3. Communication mode and process of message

RocketMQ supports three main communication modes: synchronous (sync), asynchronous (async), and one-way (oneway).

The one-way mode is relatively simple. It is generally used for sending heartbeat packets, where the Response does not matter. This article focuses on RocketMQ’s asynchronous communication flow (for reasons of space, readers can analyze the synchronous flow along the same lines).

Here is the overall flow chart of RocketMQ asynchronous communication:

The following subsections cover how the Client sends request messages and how the Server receives and processes them, and then briefly analyze how the callback is executed on the Client side.

3.1 Implementation of Client sending request messages

When the client invokes the asynchronous communication interface invokeAsync, the RemotingClient implementation class, NettyRemotingClient, obtains the channel corresponding to addr (creating one if it is not in the local cache). It then calls the invokeAsyncImpl method, which passes the request on to the NettyRemotingAbstract abstract class. (The request is actually sent in the invokeAsyncImpl method of the NettyRemotingAbstract class.)

The source code for sending the request message is as follows:

    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        // opaque is equivalent to a request ID; RemotingCommand assigns one to each request
        final int opaque = request.getOpaque();
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);

            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
            this.responseTable.put(opaque, responseFuture);
            try {
                // Use Netty's channel to send the request data
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {

                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            // If the request was written to the Server successfully, mark it as sent and return
                            responseFuture.setSendRequestOK(true);
                            return;
                        } else {
                            responseFuture.setSendRequestOK(false);
                        }

                        responseFuture.putResponse(null);
                        responseTable.remove(opaque);
                        try {
                            executeInvokeCallback(responseFuture);
                        } catch (Throwable e) {
                            log.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                        } finally {
                            responseFuture.release();
                        }

                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                responseFuture.release();
                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                        timeoutMillis,
                        this.semaphoreAsync.getQueueLength(),
                        this.semaphoreAsync.availablePermits()
                    );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

There are some important data structures to note when sending request messages from the Client:

  • responseTable: maps each request’s opaque value to its ResponseFuture
    protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer, ResponseFuture>(256);

opaque is the request identifier: the initiator assigns a distinct opaque value to each request on the same connection. Whether a message is sent synchronously (blocking) or asynchronously (non-blocking), its opaque value is stored as a key in responseTable, mapped to the corresponding ResponseFuture.

  • ResponseFuture: holds the returned response (including the callback to execute and the semaphore)
    public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback,
        SemaphoreReleaseOnlyOnce once) {
        this.opaque = opaque;
        this.timeoutMillis = timeoutMillis;
        this.invokeCallback = invokeCallback;
        this.once = once;
    }

For synchronous communication, the third and fourth parameters are null. For asynchronous communication, when a response arrives, the ResponseFuture is looked up in responseTable by the response’s opaque value, and its invokeCallback is the callback to execute for that request. The semaphore parameter provides flow control: when multiple threads write to a connection at the same time, the semaphore limits how many writes may be in flight at once.
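The interplay of opaque, the response table, and the semaphore can be sketched without Netty. Everything in this block is hypothetical (CorrelationSketch, register, complete, and the use of a String as the response); it only illustrates the pattern of acquiring a permit, storing the callback under the request ID, and releasing the permit when the matching response arrives.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Illustrative request/response correlation with semaphore flow control. */
final class CorrelationSketch {
    // opaque -> callback waiting for the response with that opaque
    private final ConcurrentMap<Integer, Consumer<String>> table = new ConcurrentHashMap<>();
    private final Semaphore permits;

    CorrelationSketch(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight, true);
    }

    /** Registers a pending request; returns false if no permit is free within the timeout. */
    boolean register(int opaque, long timeoutMillis, Consumer<String> callback) {
        try {
            if (!permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        table.put(opaque, callback);
        return true;
    }

    /** Called when a response arrives; returns false if no request matches the opaque. */
    boolean complete(int opaque, String response) {
        Consumer<String> callback = table.remove(opaque);
        if (callback == null) {
            return false;
        }
        try {
            callback.accept(response);
        } finally {
            // remove() succeeds at most once per opaque, so the permit is released at most once
            permits.release();
        }
        return true;
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

Because the map removal is atomic, a response and a timeout sweep racing on the same opaque cannot both release the permit, which is the same guarantee the release-once wrapper in the source listing is there to provide.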

  • Handling failed sends: periodically scanning the responseTable local cache

Entries accumulate in the responseTable map described above if something goes wrong after a message is sent (for example, the server never returns a response, or the response is lost in the network). A timed task is therefore needed to clean up responseTable. When a RocketMQ client or server starts, it also starts a timed task that runs once every second, checks every ResponseFuture in responseTable for timeout, and handles expired entries accordingly.

    public void scanResponseTable() {
        final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
        Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Integer, ResponseFuture> next = it.next();
            ResponseFuture rep = next.getValue();

            // Expired: start time + timeout + 1 s of slack has passed
            if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
                rep.release();
                it.remove();
                rfList.add(rep);
                log.warn("remove timeout request, " + rep);
            }
        }

        // Run the callbacks of the expired requests outside the iteration
        for (ResponseFuture rf : rfList) {
            try {
                executeInvokeCallback(rf);
            } catch (Throwable e) {
                log.warn("scanResponseTable, operationComplete Exception", e);
            }
        }
    }
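One way to schedule such a scan every second is sketched below with a ScheduledExecutorService, which is a substitution chosen for this example rather than the mechanism taken from the RocketMQ source. TimeoutScanSketch, Pending, sweep, and schedule are hypothetical names; the current time is passed into sweep so the expiry rule can be exercised deterministically.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Illustrative periodic sweep of a pending-request table. */
final class TimeoutScanSketch {
    static final long GRACE_MILLIS = 1000; // same 1 s slack as the scan listing above

    /** When a request started and how long it is allowed to wait. */
    static final class Pending {
        final long beginMillis;
        final long timeoutMillis;

        Pending(long beginMillis, long timeoutMillis) {
            this.beginMillis = beginMillis;
            this.timeoutMillis = timeoutMillis;
        }
    }

    /** Removes entries whose deadline plus grace has passed; returns their request IDs. */
    static List<Integer> sweep(Map<Integer, Pending> table, long nowMillis) {
        List<Integer> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, Pending>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Pending> entry = it.next();
            Pending p = entry.getValue();
            if (p.beginMillis + p.timeoutMillis + GRACE_MILLIS <= nowMillis) {
                Integer opaque = entry.getKey(); // read the key before removing the entry
                it.remove();
                expired.add(opaque);
            }
        }
        return expired;
    }

    /** Runs sweep once per second on the given scheduler. */
    static ScheduledFuture<?> schedule(ScheduledExecutorService timer, Map<Integer, Pending> table) {
        return timer.scheduleAtFixedRate(
            () -> sweep(table, System.currentTimeMillis()), 1, 1, TimeUnit.SECONDS);
    }
}
```

The table passed to schedule should be a concurrent map, since the sweep runs on the timer thread while other threads add and remove entries.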

3.2 Implementation of Server receiving and processing messages

On the Server side, the entry point for handling a received message is the channelRead0 method of the NettyServerHandler class, which calls the processMessageReceived method (most of the remaining flow and logic of NettyServer message handling is omitted here).

The most important server-side request-processing method is as follows:

    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        // Look up the processor and ExecutorService by the code of the RemotingCommand
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();

        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        // rpc hook
                        RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                        if (rpcHook != null) {
                            rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        }

                        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                        // rpc hook
                        if (rpcHook != null) {
                            rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                        }

                        if (!cmd.isOnewayRPC()) {
                            if (response != null) {
                                response.setOpaque(opaque);
                                response.markResponseType();
                                try {
                                    ctx.writeAndFlush(response);
                                } catch (Throwable e) {
                                    PLOG.error("process request over, but response failed", e);
                                    PLOG.error(cmd.toString());
                                    PLOG.error(response.toString());
                                }
                            } else {

                            }
                        }
                    } catch (Throwable e) {
                        if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
                            .equals(e.getClass().getCanonicalName())) {
                            PLOG.error("process request exception", e);
                            PLOG.error(cmd.toString());
                        }

                        if (!cmd.isOnewayRPC()) {
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };

            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
                // Wrap the Runnable in a task and submit it to the processor's thread pool
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                if ((System.currentTimeMillis() % 10000) == 0) {
                    PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                }

                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        } else {
            String error = " request type " + cmd.getCode() + " not supported";
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }

In the request-processing method above, the request code of the RemotingCommand is used to look up the matching business processor. A new task (a Runnable) is then created and submitted to the corresponding business thread pool for asynchronous processing.

  • processorTable: maps each request code to its business processor and business thread pool
    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

My reading is that RocketMQ does this so that different processors can be assigned to different request codes. The message is not processed on the current thread; instead, the work is wrapped in a task and submitted to the thread pool of the corresponding business processor for asynchronous execution.
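The lookup-then-submit pattern described above can be reduced to a few lines. This is a simplified sketch with hypothetical names (DispatchSketch, register, dispatch), with plain strings standing in for RemotingCommand and a CompletableFuture standing in for writing the response back to the channel.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

/** Illustrative request-code dispatch to per-processor thread pools. */
final class DispatchSketch {
    /** A processor together with the pool that runs it. */
    static final class Pair {
        final Function<String, String> processor;
        final ExecutorService executor;

        Pair(Function<String, String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    // Filled during startup, read-only afterwards, so a plain HashMap is enough here.
    private final Map<Integer, Pair> processorTable = new HashMap<>(64);
    private Pair defaultPair;

    void register(int requestCode, Function<String, String> processor, ExecutorService executor) {
        processorTable.put(requestCode, new Pair(processor, executor));
    }

    void registerDefault(Function<String, String> processor, ExecutorService executor) {
        defaultPair = new Pair(processor, executor);
    }

    /** Finds the pair for the code (or the default) and runs the processor on its own pool. */
    CompletableFuture<String> dispatch(int requestCode, String request) {
        Pair matched = processorTable.get(requestCode);
        Pair pair = matched != null ? matched : defaultPair;
        if (pair == null) {
            return CompletableFuture.completedFuture(
                "request type " + requestCode + " not supported");
        }
        // The calling (I/O) thread returns immediately; the work runs on the business pool.
        return CompletableFuture.supplyAsync(() -> pair.processor.apply(request), pair.executor);
    }
}
```

Giving each request code its own executor means a slow processor can only exhaust its own pool, so other request types keep being served.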

This pattern appears throughout RocketMQ. It is designed to maximize asynchrony and keep each thread focused on its own responsibility.

3.3 Implementation analysis of asynchronous callback execution on the Client

Some of you might wonder where the asynchronous callback on the Client side actually gets executed. As the RocketMQ asynchronous communication diagram above shows, the callback is executed on the Client side; the RocketMQ-Remoting communication module only provides the interface for asynchronous callback processing.

Combining section 3.1 with the processResponseCommand method of the NettyRemotingAbstract class shows how the Client implements asynchronous callbacks. When the Client sends an asynchronous message (the RocketMQ-Client module ends up calling the sendMessageAsync method), it injects an InvokeCallback. After the Server’s business thread pool described above has processed the request and the response has been returned to the Client, the callback is triggered. The code of the processResponseCommand method of the NettyRemotingAbstract class is as follows:

    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        // Get the opaque value from the RemotingCommand
        final int opaque = cmd.getOpaque();
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);

            responseTable.remove(opaque);

            if (responseFuture.getInvokeCallback() != null) {
                // Execute the asynchronous callback injected by the Client
                executeInvokeCallback(responseFuture);
            } else {
                // Otherwise store the response and release the responseFuture
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }

So far we have covered the RocketMQ protocol format, message encoding and decoding, the communication modes (synchronous/asynchronous/one-way), message sending and receiving, and asynchronous callbacks. The rest of the article focuses on the Netty multithreading model used by the RPC communication part of RocketMQ.

 

Why use Netty as a high-performance communication library?

 

Why did RocketMQ choose Netty instead of using NIO directly for network programming? It’s worth a brief introduction to Netty.

Netty is a high-performance open source framework for network communication that encapsulates the JDK’s NIO library. It provides an asynchronous, event-driven network application framework and tools for rapidly developing high-performance, highly reliable network servers and clients.

The following are the main reasons why the RPC communication module of a typical system chooses Netty as its underlying communication library (the author believes RocketMQ chose Netty for the same reasons):

  • Netty’s API is easy to use and has a low barrier to entry, so programmers do not need to know the NIO programming model and its concepts in depth;
  • Programmers can customize the communication framework flexibly through Netty’s ChannelHandler to fit business requirements;
  • Netty itself supports packet splitting and reassembly, exception detection and similar mechanisms, which frees programmers from the tedious details of Java NIO so they can concentrate on business logic;
  • Netty works around the JDK NIO epoll bug, in which the Selector spins on empty polls and drives CPU usage to 100%;
  • Netty optimizes many details of its internal threads and selectors, and its carefully designed Reactor multithreading model achieves very efficient concurrent processing;
  • Netty has proven robust and reliable in several open source projects (for example, Avro, the RPC framework in the Hadoop ecosystem, uses Netty as its communication framework).

 

Netty multithreading model for RPC communication in RocketMQ

 

The RPC communication part of RocketMQ adopts the “1+N+M1+M2” Reactor multithreading model, which extends and optimizes the network communication layer to some extent. This section focuses on the design and implementation of that part.

4.1 Design concept and brief introduction of Netty’s Reactor multithreading model

First, a brief introduction to Netty’s Reactor multithreading model. Its design idea is divide-and-conquer plus event-driven processing.

  • Divide and conquer

Generally speaking, the complete processing process of a network request connection can be divided into accept, read, decode/encode, process and send. The Reactor model maps each step to a task, where the smallest unit of logic performed by a server thread is no longer a complete network request, but the task, executed in a non-blocking manner.

  • Event-driven

Each task corresponds to a specific network event. When a task is ready, the Reactor receives a network event notification and distributes the task to the Handler bound to the network event.
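The two ideas above, splitting a request into small steps and dispatching each step when its event is ready, can be sketched as a toy single-threaded reactor. Nothing here touches real sockets, and all names (MiniReactor, Event, bind, notifyReady, runOnce) are hypothetical; notifyReady stands in for the operating system reporting that an event is ready.

```java
import java.util.ArrayDeque;
import java.util.EnumMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

/** Toy reactor that dispatches ready events to the handlers bound to them. */
final class MiniReactor {
    /** One event type per processing step named in the text. */
    enum Event { ACCEPT, READ, DECODE, PROCESS, SEND }

    /** A ready notification: which event fired, and for which connection. */
    private static final class Ready {
        final Event event;
        final String connection;

        Ready(Event event, String connection) {
            this.event = event;
            this.connection = connection;
        }
    }

    private final Map<Event, Consumer<String>> handlers = new EnumMap<>(Event.class);
    private final Queue<Ready> readyQueue = new ArrayDeque<>();

    /** Binds a handler to an event type. */
    void bind(Event event, Consumer<String> handler) {
        handlers.put(event, handler);
    }

    /** Records that an event is ready for a connection. */
    void notifyReady(Event event, String connection) {
        readyQueue.add(new Ready(event, connection));
    }

    /** One loop iteration: dispatches every ready event; returns how many had a handler. */
    int runOnce() {
        int dispatched = 0;
        Ready ready;
        while ((ready = readyQueue.poll()) != null) {
            Consumer<String> handler = handlers.get(ready.event);
            if (handler != null) {
                handler.accept(ready.connection); // handlers may enqueue follow-up events
                dispatched++;
            }
        }
        return dispatched;
    }
}
```

A handler that finishes its step quickly and enqueues the next event keeps the loop thread free, which is the reason the unit of work is a step rather than a whole request.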

4.2 RocketMQ RPC communication Reactor 1+N+M1+M2 multithreading design and implementation

  • Multithreading design and flow of RPC communication Reactor in RocketMQ

RocketMQ’s RPC communication uses Netty components as the underlying communication library, which also follows the Reactor multithreaded model with some extensions and optimizations. The following is a framework diagram of the Netty multithreading model for RocketMQ’s RPC communication layer to give you a general idea of the multithreading separation design for RocketMQ’s RPC communication.

From the diagram above, you can get a general idea of the Reactor multithreading model of NettyRemotingServer in RocketMQ. A Reactor main thread (eventLoopGroupBoss, the “1” above) listens for TCP connection requests, establishes the connections, and hands them to the Reactor thread pool (eventLoopGroupSelector, the “N” above). That pool registers the connected sockets with a selector (the RocketMQ source selects NIO or Epoll based on the OS type, and this can also be configured with parameters) and then listens for actual network data. Once network data arrives, it is handed to the Worker thread pool (defaultEventExecutorGroup, the “M1” above, set to 8 by default in the source).

To process RPC network requests more efficiently, the Worker thread pool is dedicated to Netty network communication work (including encoding/decoding, idle connection management, network connection management, and network request handling).

Business operations run in the business thread pools. Based on the request code of the RemotingCommand, the corresponding processor is looked up in the local cache variable processorTable, the work is wrapped in a task, and the task is submitted to that processor’s thread pool for execution (for example sendMessageExecutor, the “M2” above).

The 1+N+M1+M2 Reactor model is listed in the table below:

| Thread count | Thread name | Description |
|---|---|---|
| 1 | NettyBoss_%d | Reactor main thread |
| N | NettyServerEPOLLSelector_%d_%d | Reactor thread pool |
| M1 | NettyServerCodecThread_%d | Worker thread pool |
| M2 | RemotingExecutorThread_%d | Thread pool of the business processor |

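The hand-off between the four thread groups in the table can be illustrated without Netty. The sketch below uses plain JDK executors and a CompletableFuture chain purely to show which group runs which step; it performs no socket I/O, the class and method names are hypothetical, and the thread-name prefixes are simplified versions of the ones in the table.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Netty-free illustration of the 1+N+M1+M2 hand-off. */
final class ThreadGroupsSketch {

    /** Fixed pool whose threads are named prefix + running index, e.g. NettyBoss_1. */
    static ExecutorService pool(int threads, String prefix) {
        AtomicInteger index = new AtomicInteger(0);
        return Executors.newFixedThreadPool(threads,
            r -> new Thread(r, prefix + index.incrementAndGet()));
    }

    /** Sends one request through the four groups and records which thread ran each step. */
    static List<String> trace(int n, int m1, int m2) {
        ExecutorService boss = pool(1, "NettyBoss_");
        ExecutorService selector = pool(n, "NettyServerNIOSelector_");
        ExecutorService codec = pool(m1, "NettyServerCodecThread_");
        ExecutorService business = pool(m2, "RemotingExecutorThread_");
        List<String> steps = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture
                .runAsync(() -> steps.add("accept@" + Thread.currentThread().getName()), boss)
                .thenRunAsync(() -> steps.add("read@" + Thread.currentThread().getName()), selector)
                .thenRunAsync(() -> steps.add("decode@" + Thread.currentThread().getName()), codec)
                .thenRunAsync(() -> steps.add("process@" + Thread.currentThread().getName()), business)
                .join(); // wait until the last stage has run
        } finally {
            boss.shutdown();
            selector.shutdown();
            codec.shutdown();
            business.shutdown();
        }
        return steps;
    }
}
```

Calling ThreadGroupsSketch.trace(3, 8, 4) returns four entries, one per stage, each tagged with the name of the pool thread that ran it.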
  • RocketMQ RPC communication Reactor multithreading code concrete implementation

With the Reactor multithreading flow described, you should now have a fairly complete picture of the Netty part of RocketMQ’s RPC communication. Let’s look at some details in the source code (this requires an understanding of Java NIO and Netty concepts).

When a NettyRemotingServer instance is initialized, the related variables are initialized as well, including serverBootstrap, the nettyServerConfig parameter, the channelEventListener listener, and both eventLoopGroupBoss and eventLoopGroupSelector (if the server runs on Linux with native epoll enabled, EpollEventLoopGroup is used; otherwise Java NIO’s NioEventLoopGroup is used). The code is as follows:

    public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;

        // some code omitted

        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        /**
         * Choose NIO or Epoll for the selector thread pool depending on the configuration.
         * On Linux with native epoll enabled, use EpollEventLoopGroup, which calls epoll
         * written in C through JNI; otherwise use Java NIO's NioEventLoopGroup.
         */
        if (useEpoll()) {
            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }
        // some code omitted
    }

After the NettyRemotingServer instance is initialized, it is started. During startup, the Server binds the one acceptor thread (eventLoopGroupBoss), the N I/O threads (eventLoopGroupSelector), and the M1 worker threads (defaultEventExecutorGroup) that were instantiated earlier.

Note that after the Worker threads receive network data, they hand it to Netty’s ChannelPipeline (which follows the chain-of-responsibility design pattern), where the handlers execute one by one from Head to Tail. These handlers are specified when the NettyRemotingServer instance is created. NettyEncoder and NettyDecoder handle the encoding and decoding between network bytes and RemotingCommand. NettyServerHandler receives the decoded RemotingCommand, determines from it whether it is a request or a response, and then, based on the request code, wraps the work in a task and submits it to the thread pool of the corresponding business processor.

From the above description, you can summarize the block diagram of RocketMQ’s RPC communications Reactor thread pool model.

Taken as a whole, RocketMQ’s RPC communication builds on Netty’s multithreaded model: the server’s listener thread is separated from the I/O threads, and the RPC communication layer is further separated from the threads that handle specific business logic. Simple operations with predictable latency are handled directly in the RPC communication layer, while complex operations with unpredictable latency are submitted to the back-end business thread pools, which improves communication efficiency and overall MQ performance.

A NioEventLoop is an abstraction of a thread that runs processing tasks in a continuous loop. Each NioEventLoop has a selector that monitors the socket connections bound to it.

 

Conclusion

 

At first glance, the RPC communication module in the RocketMQ source code may look complicated. Once you trace the flow of the Client sending a request message, the Server receiving and processing it, and the callback being executed, the overall picture turns out to be quite manageable.

RPC communication is an important part of the RocketMQ source code. To understand the flow and its details in more depth, debug it in a local environment and analyze the corresponding logs.

The author’s knowledge is limited, so some points in this article may not be fully accurate. If you find an explanation that seems wrong, please leave a comment so we can discuss it.

Author: Hu Zongtang, senior cloud computing software R&D engineer at China Mobile (Suzhou) Software Technology Co., Ltd., working on public cloud platform development and architecture design. Current focus: high-concurrency, high-availability design for large distributed systems. Previously worked at Ant Financial (Alipay) and the Oracle China R&D Center. Personal official account: ingenious blog.

Disclaimer: The copyright of this article belongs to the author.