This is the third day of my participation in the August Text Challenge. For details, see: August Text Challenge

Having briefly touched on task scheduling in the earlier article on the basic use of synchronous and asynchronous requests, let's take a look at the core dispatcher class

OkHttp Dispatcher

A brief description

We all know that synchronous and asynchronous calls differ clearly at the API level: execute() is used for synchronous requests and enqueue() for asynchronous ones. Before diving in, consider two questions

  1. How does OkHttp implement synchronous and asynchronous requests?

This is done primarily through the Dispatcher class: the state of every request being sent, whether synchronous or asynchronous, is managed in the Dispatcher

  2. What exactly is a Dispatcher?

The role of the Dispatcher is to maintain the state of requests and to maintain a thread pool for executing them

Dispatcher source implementation

Expanding on the answers above: the Dispatcher maintains the state of our requests (both synchronous and asynchronous), so every network request, wrapped in a Call, is sent through the Dispatcher and may land in the ready request queue. By maintaining a thread pool internally, OkHttp handles network requests more efficiently than many other networking frameworks

Define the queue

The definitions of these queues have been mentioned before, but they are worth reviewing here because they matter

/** Executes calls. Created lazily. */
  private @Nullable ExecutorService executorService;

  /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
  • First look at readyAsyncCalls, the queue of ready asynchronous requests. When an asynchronous request does not yet meet the conditions for running, it is cached in this ready queue; once the conditions are met, it is moved to the running asynchronous queue for execution

  • Then we have runningAsyncCalls, the queue of asynchronous requests currently executing. Note from its comment that it also contains requests that have been canceled but not yet finished

  • Finally there is the important thread pool, executorService: it is the pool the Dispatcher uses to run asynchronous requests efficiently

With that said, why do asynchronous requests need two queues?

The Dispatcher and its thread pool form a producer/consumer model: the Dispatcher is the producer and the ExecutorService is the consumer. In such a model there must be two queues for asynchronous requests: one Deque caches the requests that are waiting, and the other Deque holds the requests currently being executed
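The two-queue idea can be sketched with plain collections. This is a hypothetical miniature, not OkHttp's actual code; MAX_RUNNING stands in for maxRequests, and the "hand to the executor" step is elided:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class TwoQueueDemo {
    public static final int MAX_RUNNING = 2; // stand-in for maxRequests
    public static final Deque<String> ready = new ArrayDeque<>();
    public static final Deque<String> running = new ArrayDeque<>();

    // Producer side: admit a task if capacity allows, otherwise buffer it.
    public static synchronized void enqueue(String task) {
        if (running.size() < MAX_RUNNING) {
            running.add(task);   // would also be handed to the executor here
        } else {
            ready.add(task);     // waits until a running task finishes
        }
    }

    // Consumer side: on completion, promote the oldest waiting task.
    public static synchronized void finished(String task) {
        running.remove(task);
        String next = ready.poll();
        if (next != null) running.add(next);
    }

    public static void main(String[] args) {
        enqueue("a"); enqueue("b"); enqueue("c");
        System.out.println(running.size() + " running, " + ready.size() + " ready"); // 2 running, 1 ready
        finished("a");
        System.out.println(running.size() + " running, " + ready.size() + " ready"); // 2 running, 0 ready
    }
}
```

The third task waits in the ready deque until a running slot frees up, which is exactly the hand-off the two real queues perform.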

Now look at how the Dispatcher is used for synchronous and asynchronous requests

Synchronous request (call.execute())

  /** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }
  • Here we can see that for a synchronous request the Dispatcher does something very simple: it adds the call to a collection
  • Whenever there is a new synchronous request, the Dispatcher simply adds it to the running synchronous queue
Asynchronous requests (call.enqueue())

The Dispatcher's handling of asynchronous requests is a little more involved than for synchronous ones

  1. Asynchronous requests are submitted through the Dispatcher's enqueue() method, as you can see in the source code
  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }
  • If two conditions are met (the number of running asynchronous requests is below maxRequests, and the number of running requests to this call's host is below maxRequestsPerHost), the wrapped AsyncCall is added to the running asynchronous queue and handed to the thread pool. The thread pool is responsible for creating, destroying, and managing threads automatically.
  • If the conditions are not met, the call is added directly to the ready asynchronous queue, where it waits in the cache.
  • Asynchronous requests are not complicated, and encapsulating request management in the Dispatcher keeps them that way
  2. Next, let's look at executorService(), which creates the thread pool
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

This is a fairly ordinary thread pool; the first three parameters are the interesting part

  • The first is the core pool size, set to 0 so that after a period of idleness every thread is destroyed
  • The second is the maximum pool size, set to Integer.MAX_VALUE so the pool can grow without bound as requests come in
  • The third says that threads beyond the core count may stay idle for at most 60 seconds before being reclaimed

What is the effect of setting these three parameters for a thread pool

In practice, if we need 20 concurrent requests, the thread pool creates 20 threads; when the work is done, each thread is shut down after it has been idle for 60 seconds. That is what these three parameters achieve
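A pool with these same three parameters can be built directly against the JDK; this sketch omits OkHttp's custom thread factory but keeps the behavior described above. SynchronousQueue holds no tasks itself, so every submitted task is handed straight to a (possibly new) thread:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DispatcherPoolDemo {
    public static ExecutorService newDispatcherPool() {
        return new ThreadPoolExecutor(
                0,                    // corePoolSize: all threads may die when idle
                Integer.MAX_VALUE,    // maximumPoolSize: grow as far as demand requires
                60, TimeUnit.SECONDS, // keepAliveTime: idle threads live for 60 seconds
                new SynchronousQueue<Runnable>()); // direct hand-off, no task buffering
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = newDispatcherPool();
        Future<Integer> f = pool.submit(() -> 1 + 1);
        System.out.println(f.get()); // prints 2
        pool.shutdown();
    }
}
```

Because the core size is 0, an application that goes quiet for a minute ends up holding no threads at all, which suits the bursty traffic pattern of HTTP clients.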

  3. A call must be removed from the runningAsyncCalls queue after it finishes executing, so when do the calls waiting in the readyAsyncCalls queue get executed?

Back to the source code: look at the AsyncCall that enqueue() wraps the request in. We already know it is a Runnable, so when it executes it must be running on the thread pool

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
The finished method in the Dispatcher

The earlier parts of this code were explained in previous articles. Looking at it from the Dispatcher's point of view, the key is the finally block, which executes whether or not an exception is thrown, and which calls the Dispatcher's finished method

 private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

The first step is calls.remove(call), which removes the request that just finished from its queue. The second step, promoteCalls(), adjusts the task queues. Since both the asynchronous and synchronous request queues are thread-unsafe, these steps run inside a synchronized block. Finally, if the number of running requests has dropped to zero and an idleCallback has been set, the idleCallback is run
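The remove-then-notify bookkeeping can be sketched on its own. This is a hypothetical simplification where a "call" is just a string and the promote step is dropped:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class FinishedDemo {
    public static final Deque<String> runningCalls = new ArrayDeque<>();
    public static Runnable idleCallback;
    public static boolean wentIdle = false;

    public static synchronized void finished(String call) {
        // A finished call must have been in the running queue.
        if (!runningCalls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
        // Only notify once the dispatcher has gone completely idle.
        if (runningCalls.isEmpty() && idleCallback != null) idleCallback.run();
    }

    public static void main(String[] args) {
        idleCallback = () -> wentIdle = true;
        runningCalls.add("a");
        runningCalls.add("b");
        finished("a");
        System.out.println(wentIdle); // false: one call still running
        finished("b");
        System.out.println(wentIdle); // true: the queue drained
    }
}
```

The callback fires exactly once, at the transition from busy to idle, which is why the real code snapshots runningCallsCount inside the lock before invoking it.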

Look at the promoteCalls() implementation invoked from finished

We know that finished does the bookkeeping; looking at the promoteCalls() method, it turns out that this is where the cached queue of ready asynchronous requests gets scheduled

  private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

As you can see, it iterates over the ready asynchronous queue in order; for each call whose host is still under the per-host limit, it removes the call through the iterator, adds it to the running asynchronous queue, and submits it to the thread pool, stopping once the maximum number of running requests is reached. That is all the promoteCalls method does
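The promotion loop, including the per-host cap, can be modeled with plain deques. In this hypothetical sketch a "call" is just its host name, and submission to the pool is elided:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

public class PromoteDemo {
    public static final int MAX_REQUESTS = 3;  // stand-in for maxRequests
    public static final int MAX_PER_HOST = 1;  // stand-in for maxRequestsPerHost
    public static final Deque<String> ready = new ArrayDeque<>();
    public static final Deque<String> running = new ArrayDeque<>();

    static long runningForHost(String host) {
        return running.stream().filter(host::equals).count();
    }

    public static void promote() {
        if (running.size() >= MAX_REQUESTS) return;
        for (Iterator<String> i = ready.iterator(); i.hasNext(); ) {
            String call = i.next();
            if (runningForHost(call) < MAX_PER_HOST) {
                i.remove();        // iterator removal, as in the real code
                running.add(call); // would also be submitted to the pool here
            }
            if (running.size() >= MAX_REQUESTS) return;
        }
    }

    public static void main(String[] args) {
        ready.add("a.com"); ready.add("a.com"); ready.add("b.com");
        promote();
        System.out.println(running); // [a.com, b.com] — the second a.com stays ready
    }
}
```

Note that a call blocked by its host limit is skipped, not dequeued, so a later call to a different host can still be promoted ahead of it.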

Closing remarks

So far, that's my summary of the task Dispatcher in OkHttp. Next, I'll look at the interceptors in OkHttp, which are its core and a powerful mechanism it provides

To be continued