This analysis is based on Dubbo 2.7.8, the latest version at the time of writing.

Background

In a microservice system, apart from the hardware, the communication framework is usually the first thing to affect performance. Dubbo uses Netty as its default underlying communication framework and builds features such as synchronous-over-asynchronous calls on top of it. Today we look at Dubbo's thread pool model, which differs between the consumer side and the provider side. The figure below shows the general thread model of both sides.

Other custom thread pools and boss thread pools in Netty are omitted from the figure.

The provider-side model is simple; the consumer (client) side is more complex, and so is its call chain. Dubbo ships four thread pool strategies: fixed, cached, limited, and eager. The default is fixed, which uses a fixed number of threads.
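As a rough illustration of what three of these strategies mean, here is a sketch in plain java.util.concurrent terms. It is not Dubbo's source: the class and method names are made up, and the zero-capacity queue is an assumption chosen so that saturation is easy to see. The eager strategy needs a custom queue and is left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolStrategiesSketch {
    // "fixed": core size equals max size, so the pool never grows or shrinks.
    static ThreadPoolExecutor fixed(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>());
    }

    // "cached": starts empty, grows on demand, reclaims threads idle for idleMillis.
    static ThreadPoolExecutor cached(int max, long idleMillis) {
        return new ThreadPoolExecutor(0, max, idleMillis, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>());
    }

    // "limited": grows up to max but, with an effectively infinite keep-alive, never shrinks.
    static ThreadPoolExecutor limited(int core, int max) {
        return new ThreadPoolExecutor(core, max, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>());
    }

    // With one busy thread and no queue capacity, a second task is rejected.
    static boolean rejectsWhenSaturated() {
        ThreadPoolExecutor pool = fixed(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                try {
                    release.await(); // keep the only thread busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> { }); // no free thread, no queue slot
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsWhenSaturated()); // prints true
    }
}
```

The point of the fixed strategy is predictability: the thread count is a hard ceiling, so overload shows up as rejections rather than unbounded thread growth.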

Questions to consider

  • What is the purpose of thread pool design?
  • What are the advantages of Dubbo designing client thread pools this way?
  • How does the thread pool model relate to exception events, such as timeouts?

Analysis

The server side

On the server side, a Netty handler receives the event and passes the received message to the business thread pool. When DubboProtocol exports a service, its requestHandler field is passed down to the Exchangers layer (see Dubbo's layered architecture).

public class DubboProtocol extends AbstractProtocol {
    public static final String NAME = "dubbo";

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        public void received(Channel channel, Object message) { /* ... */ }

        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) { /* ... */ }

        private void invoke(Channel channel, String methodKey) { /* ... */ }
    };

    private ProtocolServer createServer(URL url) {
        // ...
        Exchangers.bind(url, requestHandler); // Pass requestHandler down; it ends up behind the Netty handler
        // ...
    }
}

// After several layers of calls, ChannelEventRunnable.run() ends up calling ExchangeHandler.received()
org....transport.dispatcher.all.AllChannelHandler#received()
public void received(Channel channel, Object message) throws RemotingException {
    // The message is passed to the business thread pool for processing
    ExecutorService executor = getPreferredExecutorService(message);
    executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
}
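
To make the hand-off concrete, here is a minimal sketch using only JDK executors. It assumes a single "I/O" thread standing in for a Netty worker and a small "business" pool; all names are hypothetical and none of this is Dubbo code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class DispatchSketch {
    // Named threads make it visible which pool ran the handler.
    static ThreadFactory named(String prefix) {
        AtomicInteger n = new AtomicInteger();
        return r -> new Thread(r, prefix + n.incrementAndGet());
    }

    static String dispatch(String message) {
        ExecutorService io = Executors.newSingleThreadExecutor(named("io-"));
        ExecutorService biz = Executors.newFixedThreadPool(2, named("biz-"));
        CompletableFuture<String> handledBy = new CompletableFuture<>();
        try {
            // The I/O thread only wraps the event and hands it off,
            // which is the role AllChannelHandler.received() plays above.
            io.execute(() -> biz.execute(() ->
                    handledBy.complete(Thread.currentThread().getName() + " handled " + message)));
            return handledBy.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            biz.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch("ping")); // prints "biz-1 handled ping"
    }
}
```

Keeping the I/O thread free of business logic means a slow handler cannot stall reads on other connections served by the same worker.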

The client side

The client business thread calls through layers of Invoker and finally comes to DubboInvoker.

org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    ExchangeClient currentClient;
    // ... (selection of currentClient omitted)
    boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
    int timeout = calculateTimeout(invocation, methodName);
    if (isOneway) { // Note 1
        boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
        currentClient.send(inv, isSent);
        return AsyncRpcResult.newDefaultAsyncResult(invocation);
    } else {
        ExecutorService executor = getCallbackExecutor(getUrl(), inv); // Note 2
        CompletableFuture<AppResponse> appResponseFuture =
                currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
        FutureContext.getContext().setCompatibleFuture(appResponseFuture);
        AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
        result.setExecutor(executor);
        return result;
    }
}

Note 1 is the one-way mode, in which the caller does not care about the response, much like one-way sending in RocketMQ. Note 2 obtains a thread pool, the one marked by the red box in the figure. Let's look at the method body.

org.apache.dubbo.rpc.protocol.AbstractInvoker#getCallbackExecutor()
protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
    ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class)
            .getDefaultExtension().getExecutor(url); // Note 1
    if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) { // Note 2
        return new ThreadlessExecutor(sharedExecutor);
    } else {
        return sharedExecutor;
    }
}

Note 1 obtains the shared thread pool. Note 2 wraps the shared thread pool in a ThreadlessExecutor when the request is synchronous. The class name is apt: "threadless" because it cuts down on the use of extra threads.

// org.apache.dubbo.common.threadpool.ThreadlessExecutor#execute()
public void execute(Runnable runnable) {
    if (!waiting) {
        sharedExecutor.execute(runnable);
    } else {
        queue.add(runnable);
    }
}

The logic is simple: if the waiting variable is true (the default), the task is added to the queue; otherwise it is submitted to the shared thread pool. The runnable here is a ChannelEventRunnable. AllChannelHandler is the handler that receives Netty events and dispatches CONNECTED, DISCONNECTED, RECEIVED, and CAUGHT, which correspond to the connect, disconnect, read, and exception callbacks.

@Override
public void connected(Channel channel) throws RemotingException {
    ExecutorService executor = getExecutorService();// The shared thread pool obtained directly
    executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
}
public void disconnected(Channel channel) throws RemotingException {
    ExecutorService executor = getExecutorService();// The shared thread pool obtained directly
    executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
}

@Override
public void received(Channel channel, Object message) throws RemotingException {
    // Get the ThreadlessExecutor
    ExecutorService executor = getPreferredExecutorService(message);
    executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
}

Of these four events, only RECEIVED is actually submitted to the ThreadlessExecutor, so the question is whether its task lands in the queue or in the shared thread pool. Let's rewind to the moment the request is sent. AsyncToSyncInvoker.invoke() sends the request and then immediately calls get() on the result.

// AsyncToSyncInvoker#invoke()
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = invoker.invoke(invocation);
    if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
        asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
    }
    // ... (exception handling omitted)
    return asyncResult;
}

// AsyncRpcResult#get()
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (executor != null && executor instanceof ThreadlessExecutor) {
        ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
        threadlessExecutor.waitAndDrain(); // Note 1, the key step
    }
    return responseFuture.get(timeout, unit);
}

get() checks whether the result holds an executor and whether that executor is a ThreadlessExecutor. If so, it calls waitAndDrain(), which is where the answer lies.

public void waitAndDrain() throws InterruptedException {
    if (finished) {
        return;
    }
    Runnable runnable = queue.take(); // Note 1
    synchronized (lock) {
        waiting = false; // Note 2
        runnable.run();
    }
    runnable = queue.poll();
    while (runnable != null) {
        runnable.run();
        runnable = queue.poll();
    }
    finished = true;
}

Note 1 calls the queue's take() method, which blocks while the queue is empty. In other words, the business thread is parked until a Netty worker thread calls AllChannelHandler.received() and submits the task to the ThreadlessExecutor. The task is put into the queue, the business thread is unparked, and it is the business thread itself that runs ChannelEventRunnable.run(), which decodes the message, handles the response, and so on.

Note 2: waiting is set to false as soon as waitAndDrain() has taken its first task, so any task submitted to the ThreadlessExecutor afterwards is dispatched to the shared thread pool rather than to the queue.
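
The behavior described in Notes 1 and 2 can be reproduced with a stripped-down model. This is a sketch, not Dubbo's class: MiniThreadlessExecutor and demo() are hypothetical names, the finished flag is dropped, and a plain thread stands in for the Netty worker.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class MiniThreadlessExecutor implements Executor {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Executor sharedExecutor;
    private final Object lock = new Object();
    private boolean waiting = true; // guarded by lock

    MiniThreadlessExecutor(Executor sharedExecutor) {
        this.sharedExecutor = sharedExecutor;
    }

    @Override
    public void execute(Runnable task) {
        synchronized (lock) {
            if (waiting) {
                queue.add(task);              // the caller will run it
            } else {
                sharedExecutor.execute(task); // the caller has moved on
            }
        }
    }

    // Called by the thread that is waiting for the response.
    void waitAndDrain() throws InterruptedException {
        Runnable task = queue.take();         // parks until a task arrives
        synchronized (lock) {
            waiting = false;
            task.run();
        }
        while ((task = queue.poll()) != null) {
            task.run();
        }
    }

    // Records which thread ran the first task and which ran a late one.
    static List<String> demo() {
        ExecutorService shared = Executors.newSingleThreadExecutor();
        MiniThreadlessExecutor threadless = new MiniThreadlessExecutor(shared);
        Thread caller = Thread.currentThread();
        List<String> ranOn = new CopyOnWriteArrayList<>();
        CountDownLatch lateDone = new CountDownLatch(1);
        try {
            // Stand-in for the Netty worker delivering the response event.
            new Thread(() -> threadless.execute(() ->
                    ranOn.add(Thread.currentThread() == caller ? "caller" : "other"))).start();
            threadless.waitAndDrain();
            // A late event: waiting is now false, so it goes to the shared pool.
            threadless.execute(() -> {
                ranOn.add(Thread.currentThread() == caller ? "caller" : "other");
                lateDone.countDown();
            });
            lateDone.await(5, TimeUnit.SECONDS);
            return ranOn;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            shared.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [caller, other]
    }
}
```

The first task runs on the thread that was blocked anyway, so no extra thread is woken for the common case; only stragglers cost a hand-off to the shared pool.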

org... support.header.HeaderExchangeHandler#handleResponse()
static void handleResponse(Channel channel, Response response) {
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response); // delegates to the three-argument overload below
    }
}

// DefaultFuture#received()
public static void received(Channel channel, Response response, boolean timeout) {
    DefaultFuture future = FUTURES.remove(response.getId());
    if (future != null) {
        Timeout t = future.timeoutCheckTask;
        if (!timeout) {
            t.cancel();
        }
        future.doReceived(response); // Note 1
    }
    CHANNELS.remove(response.getId());
}

Note 1 calls doReceived() on the future of the current request and completes it, so that by the time the business thread reaches responseFuture.get(timeout, unit) the future is already done and the call does not block again. Looking back, once the Netty worker thread has received the read event, all further processing is done by the business thread itself, which avoids a thread switch and takes load off the thread pool.
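
The correlation step in received(), looking up the pending future by request id and completing it, can be sketched in isolation. The names below are hypothetical, and a CompletableFuture<String> stands in for DefaultFuture.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class PendingRequestsSketch {
    // Pending requests keyed by request id, playing the role of FUTURES above.
    private static final Map<Long, CompletableFuture<String>> FUTURES = new ConcurrentHashMap<>();

    // Register a future before the request is sent; the id travels with the request.
    static CompletableFuture<String> register(long id) {
        CompletableFuture<String> future = new CompletableFuture<>();
        FUTURES.put(id, future);
        return future;
    }

    // Called when a response carrying this id has been decoded.
    // Returns false if nobody is waiting (unknown id, duplicate, or already timed out).
    static boolean received(long id, String payload) {
        CompletableFuture<String> future = FUTURES.remove(id);
        return future != null && future.complete(payload);
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = register(1L);
        System.out.println(received(1L, "pong"));  // prints true
        System.out.println(future.join());         // prints pong
        System.out.println(received(1L, "again")); // prints false
    }
}
```

Because remove() is atomic, a response and a timeout racing for the same id cannot both win: whichever removes the entry first completes the future.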

When are tasks submitted to the shared thread pool?

Here is one example. If the client request times out, a timer task in the time wheel fires. The task is defined in DefaultFuture.

private static class TimeoutCheckTask implements TimerTask {
    public void run(Timeout timeout) {
        DefaultFuture future = DefaultFuture.getFuture(requestID);
        if (future.getExecutor() != null) {
            future.getExecutor().execute(() -> notifyTimeout(future)); // Note 1
        } else {
            notifyTimeout(future);
        }
    }
}

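Note 1: the timeout task is submitted to the future's executor. For a synchronous call that is the ThreadlessExecutor, so the waiting business thread picks the task up and waiting becomes false. When the server's response finally arrives, its task is submitted to the shared thread pool.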
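
The timeout path can be modeled with plain JDK classes. In this sketch a ScheduledExecutorService stands in for Dubbo's time wheel and a second scheduler fakes the server; TimeoutSketch, call(), and the "pong" payload are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutSketch {
    // Returns "pong" if the fake server answers in time, "timeout" otherwise.
    static String call(long responseDelayMs, long timeoutMs, Executor callbackExecutor) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ScheduledExecutorService fakeServer = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<String> future = new CompletableFuture<>();
        Runnable notifyTimeout = () -> future.completeExceptionally(
                new TimeoutException("no response within " + timeoutMs + " ms"));
        try {
            // Like TimeoutCheckTask: hand the notification to the executor if there is one.
            ScheduledFuture<?> timeoutCheck = timer.schedule(() -> {
                if (callbackExecutor != null) {
                    callbackExecutor.execute(notifyTimeout);
                } else {
                    notifyTimeout.run();
                }
            }, timeoutMs, TimeUnit.MILLISECONDS);
            // The response cancels the pending timeout check, then completes the future.
            fakeServer.schedule(() -> {
                timeoutCheck.cancel(false);
                future.complete("pong");
            }, responseDelayMs, TimeUnit.MILLISECONDS);
            return future.get();
        } catch (ExecutionException e) {
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            timer.shutdownNow();
            fakeServer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(call(10, 2000, null));          // fast server
        System.out.println(call(2000, 20, Runnable::run)); // slow server
    }
}
```

Unlike Dubbo, this sketch simply drops the late response when the schedulers shut down; it only shows the race between the timer and the response and the optional hand-off to an executor.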

Conclusion

Dubbo's thread pool design is subtle, and the idea of letting the waiting business thread process its own response is worth borrowing in many business scenarios.