It has been a long while since the previous article, RocketMQ Source Code Parsing – The Beginning, and I’m ashamed of that. Partly I was busy with work (okay, slacking off), and partly I kept agonizing over which angle would make the material easiest to understand; pasting too much source code can make an article harder to read, not easier.
So, to make this article more readable, I decided to simplify: less exhaustive source code analysis and more discussion of design. The goal is that you read it straight through and say "nice one!"
In a message queue architecture, the various roles may interact and transfer data at any time.
The communication module is therefore an indispensable core module in message queue design.
A good network communication module largely determines message throughput and overall performance.
This article analyzes the source code of the RocketMQ communication module to see how a high-performance network communication module is actually implemented.
Overall architecture diagram of the RocketMQ message queue 👇
The roles and functions of each role in the RocketMQ architecture can be viewed:
Here we focus on the communication between the roles:
NameServer
- The NameServer scans all connections to live brokers every 10 seconds. If NameServer does not receive a heartbeat for more than 2 minutes, it disconnects from the Broker.
Broker
- Each Broker establishes long connections to all NameServer nodes and reports Topic information to all NameServer nodes every 30 seconds.
Producer
- The Producer establishes a long connection to one randomly selected node in the NameServer cluster and, by default, pulls the latest queue information for all of its Topics from NameServer every 30 seconds. This means that if a Broker becomes unavailable, the Producer may take up to 30 seconds to notice, and messages it sends to that Broker during this window will fail.
- The Producer establishes a long connection to each Broker that serves its Topics and, by default, sends a heartbeat to all associated Brokers every 30 seconds. The Broker scans all live connections every 10 seconds; if a connection has not sent heartbeat data within 2 minutes, the Broker closes the connection to the Producer.
Consumer
- The Consumer establishes a long connection to one randomly selected node in the NameServer cluster and, by default, pulls the latest queue information for its Topics from NameServer every 30 seconds. This means that when a Broker becomes unavailable, the Consumer may take up to 30 seconds to notice.
- The Consumer sends a heartbeat to all associated Brokers every 30 seconds. The Broker scans all live connections every 10 seconds; if a connection has not sent heartbeat data within 2 minutes, the Broker closes it and notifies all consumers in the Consumer Group, which then reallocate the queues among themselves and continue consuming.
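The "scan every 10 seconds, drop after 2 minutes of silence" rule above can be sketched in a few lines. This is only a minimal illustration under my own assumptions: the class `LivenessTable` and its methods are hypothetical names, not RocketMQ code, and the sketch uses logical timestamps instead of a real scheduler so that it stays deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical liveness table; the names and structure are illustrative, not RocketMQ's. */
class LivenessTable {
    static final long SCAN_INTERVAL_MS = 10_000; // scan every 10 seconds
    static final long EXPIRE_MS = 120_000;       // drop a peer after 2 minutes without a heartbeat

    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    /** Records a heartbeat from a peer at the given (logical) time. */
    void onHeartbeat(String peer, long nowMs) {
        lastHeartbeat.put(peer, nowMs);
    }

    /** One scan pass: removes peers whose last heartbeat is older than EXPIRE_MS and returns how many were removed. */
    int scan(long nowMs) {
        int before = lastHeartbeat.size();
        lastHeartbeat.values().removeIf(last -> nowMs - last > EXPIRE_MS);
        return before - lastHeartbeat.size();
    }

    boolean isAlive(String peer) {
        return lastHeartbeat.containsKey(peer);
    }

    public static void main(String[] args) {
        LivenessTable table = new LivenessTable();
        table.onHeartbeat("broker-a", 0);
        table.onHeartbeat("broker-b", 0);
        // broker-a keeps sending a heartbeat every 30 s; broker-b goes silent.
        for (long now = SCAN_INTERVAL_MS; now <= 130_000; now += SCAN_INTERVAL_MS) {
            if (now % 30_000 == 0) {
                table.onHeartbeat("broker-a", now);
            }
            table.scan(now);
        }
        System.out.println("broker-a alive: " + table.isAlive("broker-a")); // true
        System.out.println("broker-b alive: " + table.isAlive("broker-b")); // false
    }
}
```

In a real server the `scan` call would run on a timer (for example a `ScheduledExecutorService`) and would also close the underlying connection; the sketch only shows the bookkeeping.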
As you can see, the roles in the RocketMQ architecture form a complex communication network, and every link in it can affect the communication performance of the whole message queue.
The RocketMQ-Remoting module is the core module responsible for network communication in RocketMQ, and it is the main subject of this walk-through. (This article uses RocketMQ version 4.4.1.)
The RocketMQ communication module is built on top of Netty. Before reading its source code, you should have a basic understanding of Netty: its overall communication model and the NIO threading model. Otherwise the source analysis below may be confusing.
RocketMQ multithreaded model
Remoting’s network communication is based on the Netty implementation, so the entire communication architecture is extended based on the Netty model.
1. Structure of Remoting communication module
Let’s look at the class structure of the Remoting communication module.
- RemotingService: The top-level interface that defines three methods
```java
void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);
```
- RemotingServer: Defines the interface of the server and inherits RemotingService
```java
// Register a processor for a request code
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
    final ExecutorService executor);

// Register the default processor
void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

int localListenPort();

// Get the processor (and its thread pool) for a request code
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

// Synchronous communication; a RemotingCommand is returned
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
    final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
    RemotingTimeoutException;

// Asynchronous communication
void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
    final InvokeCallback invokeCallback) throws InterruptedException,
    RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

// One-way communication
void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
    RemotingSendRequestException;
```
- RemotingClient: Defines the interface of the client and inherits RemotingService. The method defined is similar to that of RemotingServer.
- NettyRemotingAbstract: A Netty communication abstract class that defines and encapsulates server and client public methods.
- NettyRemotingServer: server implementation class, which implements RemotingServer interface and inherits NettyRemotingAbstract class.
- NettyRemotingClient: the implementation class of the client, which implements the RemotingClient interface and inherits the NettyRemotingAbstract class.
Simply put, RocketMQ abstracts and encapsulates the communication Server and Client on the basic framework of Netty communication, making the structure more concise and easy to expand.
2. Netty’s multi-threaded model
Netty is a high-performance, asynchronous event-driven NIO framework that uses the Reactor pattern to build a threading model.
The Reactor pattern has three threading models:
- Single-threaded Reactor model
All I/O operations are done on one NIO thread, which handles both client connections and reads/writes.
Disadvantage: because one NIO thread handles both accepting connections and I/O reads and writes, it can become overloaded. Processing slows down, and there is a risk of CPU spikes and even system downtime.
- Multithreaded Reactor model
The biggest difference from the single-threaded model is that a group of NIO threads handles I/O reads and writes. Accepting connections is separated from reading and writing, which improves I/O throughput.
This model is used in most scenarios and is enough for everyday workloads with many concurrent connections.
- Master-slave Reactor model
With millions of concurrent client connections, a single Acceptor thread may be overwhelmed and become a performance bottleneck. In the master-slave model, accepting connections is handled by a NIO thread pool instead of a single thread.
3. RocketMQ’s threading model
RocketMQ's network communication is designed around a multithreaded Reactor model:
As the diagram shows, RocketMQ extends and optimizes Netty’s native multithreaded Reactor model. The numbers to remember are (1 + N + M1 + M2):
- One Reactor main thread (eventLoopGroupBoss, the "1" above) listens for TCP connection requests, establishes connections, creates SocketChannels, and registers them with a selector.
- The RocketMQ source code automatically chooses between NIO and Epoll according to the OS type (this can also be set through configuration), and then listens for the actual network data.
- Once network data arrives, it is handed to the Reactor thread pool (eventLoopGroupSelector, the "N" above; the default in the source code is 3).
- Before the business logic actually runs, SSL verification, encoding/decoding, idle checking, and network connection management are needed. These tasks are handled by defaultEventExecutorGroup (the "M1" above; the default in the source code is 8).
- Business operations are executed in business thread pools. The request code in the RemotingCommand is used to look up the corresponding processor in the local cache variable processorTable; the work is wrapped in a task and submitted to that processor's thread pool (sendMessageExecutor, taking message sending as an example, is the "M2" above).
The thread pools keep getting larger across the steps from the entry point to the business logic. This follows the complexity of each step: the more complex the logic, the wider the concurrency channel it needs.
Thread count | Thread name | Description |
---|---|---|
1 | NettyBoss_%d | Reactor main thread |
N | NettyServerEPOLLSelector_%d_%d | Reactor thread pool |
M1 | NettyServerCodecThread_%d | Worker thread pool |
M2 | RemotingExecutorThread_%d | Business processor thread pool |
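To make the 1 + N + M1 + M2 idea more concrete, here is a rough sketch of a request hopping through four pools of growing size. It is not how Netty works internally (a Netty channel stays bound to one event loop, and there is no real network here); plain `ExecutorService` pools, the class name `StagedPools`, the thread-name prefixes, and the M2 size of 4 are all my own assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class StagedPools {
    /** Daemon threads named prefix + counter, so the JVM can exit without an explicit shutdown. */
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
    }

    final ExecutorService boss = Executors.newFixedThreadPool(1, named("Boss_"));         // "1"
    final ExecutorService selector = Executors.newFixedThreadPool(3, named("Selector_")); // "N"
    final ExecutorService codec = Executors.newFixedThreadPool(8, named("Codec_"));       // "M1"
    final ExecutorService business = Executors.newFixedThreadPool(4, named("Business_")); // "M2", size chosen arbitrarily

    /** Appends the name of the thread that runs the current stage. */
    private static String hop(String trace) {
        return trace + " -> " + Thread.currentThread().getName();
    }

    /** Pushes one fake request through the four stages and returns the threads it visited. */
    String handle(String request) {
        return CompletableFuture
            .supplyAsync(() -> request + ": " + Thread.currentThread().getName(), boss) // accept
            .thenApplyAsync(StagedPools::hop, selector)                                 // read network data
            .thenApplyAsync(StagedPools::hop, codec)                                    // decode, idle check, ...
            .thenApplyAsync(StagedPools::hop, business)                                 // run the processor
            .join();
    }

    public static void main(String[] args) {
        System.out.println(new StagedPools().handle("send"));
    }
}
```

The printed trace lists one thread from each pool in order, which is the point of the design: each stage can be sized independently according to how heavy its work is.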
For more details, see the design section of the Apache RocketMQ developer guide. You should also be familiar with Netty's overall call chain and design structure.
Protocol design, encoding and decoding of messages
As we know, data is transmitted over the network in binary form. When the Server and Client exchange messages, the data must be serialized and deserialized, which can also be thought of as encoding and decoding.
Encoding and decoding must follow the same message protocol. Think of a binary message as a letter: the writer and the recipient must agree on where it begins, where the content is, and where it ends.
So for messages to be sent and received correctly, both sides must use the same message protocol and the same codec. Otherwise you get something like this:
👳♂️ : shipper shipper d
🤷♂️ :??
RocketMQ customizes communication protocols and message codecs to more efficiently transport messages over the network and read received messages.
Here is the format of RocketMQ’s custom communication protocol:
The transmitted content can be divided into the following four parts:
(1) Message length: the total length, stored in four bytes (one int);
(2) Serialization type & header length: also one int; the first byte is the serialization type, and the last three bytes are the header length;
(3) Message header data: the serialized message header;
(4) Message body data: the binary content of the message body.
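The four parts can be written and read back with a plain `ByteBuffer`. The sketch below is my own minimal illustration, not RocketMQ's codec: the class `FrameSketch`, the `Frame` holder, the sample header text, and the type code 0 are all placeholders. It assumes, as in the `encodeHeader` source shown later in this article, that the total length does not count the four-byte length field itself.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class FrameSketch {
    /** Decoded view of one frame (hypothetical holder type). */
    static final class Frame {
        final int serializeType;
        final String header;
        final String body;

        Frame(int serializeType, String header, String body) {
            this.serializeType = serializeType;
            this.header = header;
            this.body = body;
        }
    }

    /** Writes [total length][type + header length][header][body]. */
    static ByteBuffer encode(byte serializeType, byte[] header, byte[] body) {
        int total = 4 + header.length + body.length; // does not count the length field itself
        ByteBuffer buf = ByteBuffer.allocate(4 + total);
        buf.putInt(total);                                                // (1) message length
        buf.putInt((serializeType << 24) | (header.length & 0x00FFFFFF)); // (2) 1 byte type + 3 bytes header length
        buf.put(header);                                                  // (3) header data
        buf.put(body);                                                    // (4) body data
        buf.flip();
        return buf;
    }

    /** Reads the four parts back in the same order. */
    static Frame decode(ByteBuffer buf) {
        int total = buf.getInt();
        int mark = buf.getInt();
        int serializeType = (mark >> 24) & 0xFF;
        int headerLength = mark & 0x00FFFFFF;
        byte[] header = new byte[headerLength];
        buf.get(header);
        byte[] body = new byte[total - 4 - headerLength];
        buf.get(body);
        return new Frame(serializeType,
            new String(header, StandardCharsets.UTF_8),
            new String(body, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] header = "{\"code\":10}".getBytes(StandardCharsets.UTF_8); // placeholder header text
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        Frame frame = decode(encode((byte) 0, header, body));
        System.out.println(frame.serializeType + " | " + frame.header + " | " + frame.body);
    }
}
```

Because both sides agree on the order and width of the four fields, the reader never has to guess where the header ends and the body begins.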
The RemotingCommand class encapsulates the data of the message protocol, including not only all data structures, but also encoding and decoding operations.
The members of the RemotingCommand class are as follows:
Header field | Type | Request | Response |
---|---|---|---|
code | int | Request operation code; the responder handles the request according to this code | Response code; 0 means success, non-zero means some kind of error |
language | LanguageCode | Language the requester is implemented in | Language the responder is implemented in |
version | int | Version of the requester's program | Version of the responder’s program |
opaque | int | Equivalent to a requestId; distinguishes different requests on the same connection and is matched against the value in the response | Returned unmodified from the request |
flag | int | Flag distinguishing a normal RPC from a oneway RPC | Flag distinguishing a normal RPC from a oneway RPC |
remark | String | Custom text information | Custom text information |
extFields | HashMap<String, String> | Custom extension information for the request | Custom extension information for the response |
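The flag field is a small bit set. The sketch below shows one way such a field can carry "this is a response" and "this is a oneway call" at the same time. The bit positions (bit 0 for response, bit 1 for oneway) are my assumption from memory of the RocketMQ source and should be checked against the version you are reading; the class `FlagBits` and its helper methods are hypothetical.

```java
class FlagBits {
    static final int RPC_TYPE = 0;   // assumed bit 0: 0 = request, 1 = response
    static final int RPC_ONEWAY = 1; // assumed bit 1: 0 = normal RPC, 1 = oneway RPC

    static int markResponse(int flag) {
        return flag | (1 << RPC_TYPE);
    }

    static int markOneway(int flag) {
        return flag | (1 << RPC_ONEWAY);
    }

    static boolean isResponse(int flag) {
        int bits = 1 << RPC_TYPE;
        return (flag & bits) == bits;
    }

    static boolean isOneway(int flag) {
        int bits = 1 << RPC_ONEWAY;
        return (flag & bits) == bits;
    }

    public static void main(String[] args) {
        int onewayRequest = markOneway(0);
        int response = markResponse(0);
        System.out.println("oneway request: flag=" + onewayRequest
            + ", isOneway=" + isOneway(onewayRequest) + ", isResponse=" + isResponse(onewayRequest));
        System.out.println("response:       flag=" + response
            + ", isOneway=" + isOneway(response) + ", isResponse=" + isResponse(response));
    }
}
```

This is why one int is enough: each property gets its own bit, and the checks do not interfere with each other.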
Let's take encoding as an example to get a feel for how RocketMQ defines and processes its custom message format.
The encoder in the NettyEncoder class:
```java
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
    throws Exception {
    try {
        // 1. Message length + serialization type + header length + header data
        ByteBuffer header = remotingCommand.encodeHeader();
        out.writeBytes(header);
        // 2. Message body
        byte[] body = remotingCommand.getBody();
        if (body != null) {
            out.writeBytes(body);
        }
    } catch (Exception e) {
        log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
        if (remotingCommand != null) {
            log.error(remotingCommand.toString());
        }
        RemotingUtil.closeChannel(ctx.channel());
    }
}
```
Compare this with the custom communication protocol format described above:
Step 1: remotingCommand.encodeHeader() converts the first three parts of the protocol into bytes.
Step 2: remotingCommand.getBody() returns the message body as bytes.
Together they form the binary data that is sent over the network. Let's look further into remotingCommand.encodeHeader():
```java
public ByteBuffer encodeHeader(final int bodyLength) {
    // 1> header length size
    int length = 4;

    // 2> header data length
    byte[] headerData;
    headerData = this.headerEncode();
    length += headerData.length;

    // 3> body data length
    length += bodyLength;

    ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

    // 1. message length
    result.putInt(length);

    // 2. serialization type + header length
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

    // 3. header data
    result.put(headerData);

    result.flip();
    return result;
}
```
The logic is fairly clear: the first three parts of the custom protocol are converted into bytes one after another. The second part is produced by markProtocolType(headerData.length, serializeTypeCurrentRPC):
```java
public static byte[] markProtocolType(int source, SerializeType type) {
    byte[] result = new byte[4];

    // First byte: serialization type
    result[0] = type.getCode();
    // Last three bytes: header length, high byte first
    result[1] = (byte) ((source >> 16) & 0xFF);
    result[2] = (byte) ((source >> 8) & 0xFF);
    result[3] = (byte) (source & 0xFF);
    return result;
}
```
Some readers may not follow the bit operations above. I was just as baffled the first time I saw them, but after a bit of research they turn out to be easy to understand. For reference, see: www.cnblogs.com/mcsfx/p/110…
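A worked example may help with the shifts and masks. The sketch below packs a header length into three bytes the same way markProtocolType does and then unpacks it again. The class `ProtocolMark`, the unpacking helpers, and the sample values (header length 0x012345, type code 1) are my own choices for illustration.

```java
class ProtocolMark {
    /** Same packing as markProtocolType, with the type passed as a plain byte. */
    static byte[] mark(int headerLength, byte typeCode) {
        byte[] result = new byte[4];
        result[0] = typeCode;
        result[1] = (byte) ((headerLength >> 16) & 0xFF); // bits 16..23
        result[2] = (byte) ((headerLength >> 8) & 0xFF);  // bits 8..15
        result[3] = (byte) (headerLength & 0xFF);         // bits 0..7
        return result;
    }

    /** Reverses the packing; "& 0xFF" turns a signed Java byte back into 0..255. */
    static int headerLength(byte[] m) {
        return ((m[1] & 0xFF) << 16) | ((m[2] & 0xFF) << 8) | (m[3] & 0xFF);
    }

    static int typeCode(byte[] m) {
        return m[0] & 0xFF;
    }

    public static void main(String[] args) {
        byte[] m = mark(0x012345, (byte) 1); // 0x012345 == 74565
        System.out.println(String.format("%02X %02X %02X %02X", m[0], m[1], m[2], m[3])); // 01 01 23 45
        System.out.println(typeCode(m) + " " + headerLength(m));                           // 1 74565
    }
}
```

Shifting right by 16 or 8 moves the wanted byte to the lowest position, and `& 0xFF` cuts off everything above it. Three bytes are enough for header lengths up to 16,777,215.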
The communication mode and flow of messages
There are three main RocketMQ communication modes:
- Sync
- Async
- Oneway
The following focuses on analyzing the sending process of the client in sync communication mode.
1. The Client sends a request message
When a client (the sender) sends a message, it calls send(Message msg) of the DefaultMQProducerImpl class, which is synchronous by default. This method eventually calls invokeSync in the NettyRemotingClient class, which obtains the Channel to the server and then calls invokeSyncImpl in the NettyRemotingAbstract class to send the message to the server.
The source code of invokeSyncImpl is as follows (with annotations):
```java
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    // Equivalent to a request ID: each request gets a unique ID, incremented by one each time
    final int opaque = request.getOpaque();

    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        // Use Netty's Channel to send the request data to the server
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                // putResponse also calls countDown() on the CountDownLatch
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        // Use the CountDownLatch to wait synchronously for the response
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr),
                    responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}
```
- opaque: the request ID. Each request from the same client gets a unique value.
- ResponseFuture: the wrapper object through which the result of a send is obtained. RocketMQ uses a CountDownLatch here to implement the synchronous communication mode. Creating a ResponseFuture creates countDownLatch = new CountDownLatch(1) by default. After sending the message, the caller invokes responseFuture.waitResponse(timeoutMillis) (which actually calls await on the CountDownLatch) and blocks waiting for the result; the latch is released when putResponse is called, for example in the Channel’s callback shown above.
- responseTable: a mapping table from request identifier to response result. It plays only a small role in synchronous mode. It matters mostly in asynchronous mode, where it is used for compensation when an asynchronous call gets no reply because of network loss.
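The pattern described above (register a pending call under its opaque, send, block on a latch, let another thread fill in the result) can be reduced to a small stand-alone sketch. Everything here is a simplified stand-in of my own: `SyncCallSketch`, `PendingCall`, and the `transport` parameter are hypothetical, strings replace RemotingCommand, and a timeout simply returns null instead of throwing.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

class SyncCallSketch {
    /** Minimal stand-in for ResponseFuture: one latch and one response slot. */
    static final class PendingCall {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String response;

        void putResponse(String value) {
            response = value;
            latch.countDown(); // wakes up the thread blocked in waitResponse
        }

        String waitResponse(long timeoutMillis) throws InterruptedException {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return response; // still null if the timeout expired first
        }
    }

    private final ConcurrentMap<Integer, PendingCall> responseTable = new ConcurrentHashMap<>();

    /** Called from the "network" thread when a response carrying this opaque arrives. */
    void onResponse(int opaque, String body) {
        PendingCall call = responseTable.get(opaque);
        if (call != null) {
            call.putResponse(body);
        }
    }

    /** Sends through the given transport, then blocks until the matching response or the timeout. */
    String invokeSync(int opaque, String request, long timeoutMillis, BiConsumer<Integer, String> transport) {
        PendingCall call = new PendingCall();
        responseTable.put(opaque, call);
        try {
            transport.accept(opaque, request);
            return call.waitResponse(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            responseTable.remove(opaque); // always clean up, as invokeSyncImpl does
        }
    }

    public static void main(String[] args) {
        SyncCallSketch client = new SyncCallSketch();
        // Fake server: answers from another thread, echoing the request.
        BiConsumer<Integer, String> echo =
            (id, req) -> new Thread(() -> client.onResponse(id, "ack:" + req)).start();
        System.out.println(client.invokeSync(1, "hello", 1000, echo));         // ack:hello
        System.out.println(client.invokeSync(2, "hello", 50, (id, req) -> { })); // null (no reply, timed out)
    }
}
```

The opaque is what lets a single connection carry many in-flight requests: whichever thread receives a response only needs the opaque to find the caller that is waiting for it.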
2. The Server receives messages and processes logic
On the Server side, the core entry point for received messages is the channelRead0 method of the NettyServerHandler class, which leads to processRequestCommand, the core method responsible for handling request messages.
```java
/**
 * Process incoming request command issued by remote peer.
 *
 * @param ctx channel handler context.
 * @param cmd request command.
 */
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    // Look up the processor and thread pool for the request code
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    // Pre-processing
                    doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    // Core processing method
                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    // Post-processing
                    doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                log.error("process request over, but response failed", e);
                                log.error(cmd.toString());
                                log.error(response.toString());
                            }
                        }
                    }
                } catch (Throwable e) {
                    log.error("process request exception", e);
                    log.error(cmd.toString());

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(
                            RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        // If the processor rejects the request, reply "system busy" right away
        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            // Wrap the work in a task and run it on the thread pool bound to this processor
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                    + ", too many requests and system thread pool busy, RejectedExecutionException "
                    + pair.getObject2().toString()
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}
```
In the way it handles request commands, RocketMQ uses a number of design patterns to abstract over the business request codes, which keeps the logic of the calling method concise and extensible.
- processorTable: maps each request code to a business processor and its business thread pool
```java
/**
 * This container holds all processors per request code, aka, for each incoming request, we may look up the
 * responding processor in this map to handle the request.
 */
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
    new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
```
As you can see, to modify or extend a piece of business logic you only need to change the corresponding business processor, so the design is highly extensible. Each request is wrapped in a RequestTask and executed asynchronously by the corresponding business thread pool.
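The lookup-then-submit idea can be shown without Netty at all. The following is a minimal sketch under my own assumptions: `DispatchSketch` and `Entry` are hypothetical names, a `Function<String, String>` stands in for NettyRequestProcessor, the request code 10 is arbitrary, and flow control (rejectRequest, rejected executions) is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class DispatchSketch {
    /** A processor together with the thread pool it should run on (stand-in for Pair). */
    static final class Entry {
        final Function<String, String> processor;
        final ExecutorService executor;

        Entry(Function<String, String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> processorTable = new HashMap<>(64);
    private Entry defaultProcessor;

    void registerProcessor(int requestCode, Function<String, String> processor, ExecutorService executor) {
        processorTable.put(requestCode, new Entry(processor, executor));
    }

    void registerDefaultProcessor(Function<String, String> processor, ExecutorService executor) {
        defaultProcessor = new Entry(processor, executor);
    }

    /** Looks up the entry for the code, falls back to the default, and runs it on that entry's pool. */
    CompletableFuture<String> dispatch(int requestCode, String payload) {
        Entry matched = processorTable.get(requestCode);
        Entry entry = matched != null ? matched : defaultProcessor;
        if (entry == null) {
            return CompletableFuture.completedFuture("request type " + requestCode + " not supported");
        }
        return CompletableFuture.supplyAsync(() -> entry.processor.apply(payload), entry.executor);
    }

    public static void main(String[] args) {
        ExecutorService sendPool = Executors.newFixedThreadPool(2);
        DispatchSketch server = new DispatchSketch();
        server.registerProcessor(10, body -> "stored:" + body, sendPool); // 10 is an arbitrary code
        System.out.println(server.dispatch(10, "hello").join()); // stored:hello
        System.out.println(server.dispatch(99, "hello").join()); // request type 99 not supported
        sendPool.shutdown();
    }
}
```

Adding a new kind of request then means one more `registerProcessor` call; the dispatch method itself never changes, and a slow processor only ties up its own pool.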
Conclusion
This article took an in-depth look at the RocketMQ communication module. By covering the multithreaded model, the message protocol design and codec, and the message communication modes, with the code alongside, it aimed at a deeper understanding of the design and interaction flow of the whole communication module.
RocketMQ is an excellent message queue framework, and that rests on a high-performance communication architecture.
Of course, an efficient network communication architecture needs more than good communication design; it also has to keep communication reliable, for example by ensuring that messages sent by the client are not lost and by balancing load on the client side. These topics deserve a detailed explanation of their own.
If anything in this article is off the mark, you are welcome to leave a comment and discuss ~
Ordinary changes will change the ordinary.
I am house xiaonian, a low-key young man on the Internet.
Search WeChat for “house New Year” and follow for more articles worth sharing.