1. Introduction

While the previous article looked at the Provider threading model, this article starts with the Consumer threading model, and there are many similarities between the two.

The IO thread is responsible for establishing connections and reading and writing IO data to the server. The business thread handles Body deserialization and should also include the logic for the server to call back to the client. Compared to the server side, the business thread on the client side does little but parse the response.

When a Consumer creates a NettyClient, it also wraps ChannelHandler, which includes thread-dispatch capabilities. This means that the Consumer has the same thread-dispatch capabilities as the Provider, and the policy can be customized.

2. IO thread

As analyzed in the previous article, when a Consumer references a remote service, it creates a RegistryDirectory, goes to the registry to subscribe to the service, gets the ProviderUrls, and converts the ProviderUrls into the corresponding Invoker via Protocol. Take DubboInvoker as an example. When DubboInvoker objects are created, Dubbo creates an ExchangeClient client. ExchangeClient establishes a connection with the server for sending subsequent RPC call requests.

By default, a Consumer creates only one ExchangeClient instance for the same Provider node and establishes a long connection with the server, which is shared by all. Of course, you can also establish multiple shared connections. Set the parameter shareConnections. Dubbo also supports the establishment of additional exclusive connections for a Service. The connections parameter is set, with unmatched or 0 indicating that a shared connection is used. In general, use the default shared connection and you are not advised to change this configuration unless network transport is a performance bottleneck.

private ExchangeClient[] getClients(URL url) {
    // Whether to use shared connection
    boolean useShareConnect = false;
    If the number of connections =0, the shared connection is used; otherwise, an exclusive connection is created
    int connections = url.getParameter(CONNECTIONS_KEY, 0);
    List<ReferenceCountExchangeClient> shareClients = null;
    if (connections == 0) {
        useShareConnect = true;
        // If a shared connection is used, the same batch of clients are shared for the same address
        String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
        connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
        shareClients = getSharedClient(url, connections);
    }
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (useShareConnect) {
            clients[i] = shareClients.get(i);
        } else {
            // Initialize the clientclients[i] = initClient(url); }}return clients;
}
Copy the code

The initClient() method creates the client and establishes a connection with the server. In this example, NettyClient creates Bootstrap, binds the EventLoopGroup, and then establishes a connection with the server.

private static final EventLoopGroup NIO_EVENT_LOOP_GROUP = eventLoopGroup(Constants.DEFAULT_IO_THREADS, "NettyClientWorker");

bootstrap = new Bootstrap();
bootstrap.group(NIO_EVENT_LOOP_GROUP)
Copy the code

NIO_EVENT_LOOP_GROUP is static and can be thought of as a thread pool with DEFAULT_IO_THREADS, which has a value of CPU cores +1 but not more than 32. The EventLoopGroup is shared by all nettyclients, meaning that no matter how many remote nodes a Consumer communicates with, it will create at most one IO thread for DEFAULT_IO_THREADS.

At this point, the IO thread has been created, which is mainly responsible for establishing connections with the server, encoding and decoding messages, Body serialization, heartbeat processing and other operations.

3. Service threads

The Consumer business thread is created in the NettyClient constructor, and the parent constructor calls initExecutor() to initialize the business thread pool, which by default uses the cached thread pool.

public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
    // Initialize the thread pool
    initExecutor(url);
    / / create the Bootstrap
    doOpen();
    // Connect to remote
    connect();
}
Copy the code

Rely on DefaultExecutorRepository Dubbo create a thread pool class, it will create good thread pool cache to Map containers, the Key is the remote service Port Port, this means that different Port will create a thread pool alone. I’m a little confused here, why create a separate thread pool? Address should be used as the Key for thread isolation, and the maximum number of cached threads should be set. By default, the number of cached threads in the cached thread pool is unlimited. Using CachedThreadPool as an example, here is the code to create a thread pool:

public Executor getExecutor(URL url) {
    // Thread name prefix
    String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
    // Number of core threads
    int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
    // Maximum number of threads
    int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
    // The number of queues defaults to 0
    int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
    // The thread active time is 60 seconds by default
    int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
    return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                                  queues == 0 ? new SynchronousQueue<Runnable>() :
                                  (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                   : new LinkedBlockingQueue<Runnable>(queues)),
                                  new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
Copy the code

The business thread on the client side does much less than the server side, mainly deserializing the data returned by the server side. Then Dubbo’s advanced “parameter callback” feature, where the server can call back the client’s logic, should also be executed on the business thread.

4. Dispatcher

The Consumer sends the RPC call request by IO thread. After receiving the data from the server, the client decodes the message to get the Response, and then sends the message to the business thread pool. This operation is completed by the Dispatcher. The Dispatcher interface definition is very simple. It does not have the ability of thread dispatch itself. Instead, it loads the ChannelHandler with the ability of thread dispatch through SPI adaptive mode, and then hands it to process.

@SPI(AllDispatcher.NAME)
public interface Dispatcher {

    @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
    ChannelHandler dispatch(ChannelHandler handler, URL url);
}
Copy the code

When was ChannelHandler created with thread-dispatch capability? The answer is in the NettyClient constructor, which wraps ChannelHandler.

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                                                        .getAdaptiveExtension().dispatch(handler, url)));
}
Copy the code

Dubbo provides five thread dispatch policies. The default policy is ALL, where all messages are dispatched to the business thread pool for execution.

strategy instructions
all All messages are dispatched to the thread pool, including requests, responses, connection events, disconnect events, and so on
direct None of the messages are dispatched to the thread pool and are executed directly on the IO thread
message Only request and response messages are dispatched to the thread pool; all other messages are executed on the IO thread
execution Only the request message is dispatched to the thread pool; all other messages are executed on the IO thread
connection On an IO thread, disconnection events are queued, executed one by one, and other messages are dispatched to the thread pool

The AllChannelHandler, for example, creates a ChannelEventRunnable for all events and submits them to the business thread pool for execution through an asynchronous task. At this point, the I/O thread finishes its work. Here is the received() method code:

public void received(Channel channel, Object message) throws RemotingException {
    // Get the business thread pool
    ExecutorService executor = getPreferredExecutorService(message);
    try {
        // Submit the asynchronous task, processing the message IO thread ends here
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); }}Copy the code

The packaging process for ChannelHandler on the Dubbo Consumer side is as follows:

NettyClientHandler#channelRead
>>MultiMessageHandler#received
>>>>HeartbeatHandler#received
>>>>>>AllChannelHandler#received
>>>>>>>>DecodeHandler#received
>>>>>>>>>>HeaderExchangeHandler#received
Copy the code

We now know that before AllChannelHandler, messages were processed on the IO thread, and then on the business thread.

5. To summarize

The Consumer server is also divided into IO threads and business threads. The IO thread is responsible for establishing connections with the server, IO reading and writing, message encoding and decoding, and heartbeat processing. For Netty, all connections share the same EventLoopGroup. The default thread count is CPU core +1 but not more than 32. The default business thread pool uses the cached thread pool. There is no limit to the number of threads, but the good news is that the business thread on the client side is not doing much. The Consumer sends an RPC Request, the Request object is encoded and sent by the IO thread, and the server responds to the result. The IO thread receives the data and decodes it into a Response object, which is then handed to the AllChannelHandler, which sends the Response to the business thread. The business thread deserializes the Result in Response and finally returns the Result to the calling thread.