The purpose of this article

1. Walk through the overall request flow of OkHttp

2. Compare the Java and Kotlin versions (the Java version is 3.14.x)

3. The flow is traced through the Java source; the Kotlin source is shown for comparison

Requirements

OkHttp works on Android 5.0+ (API level 21+) and Java 8+.

Usage

  • Add the dependency

    // The newer versions of the library are written in Kotlin

    implementation("com.squareup.okhttp3:okhttp:4.9.1")
  • Send the request

    Take the GET request as an example

    1. Create an OkHttpClient object

      OkHttpClient client = new OkHttpClient();
    2. Create a Request object (needed for both synchronous and asynchronous calls)

      Request request = new Request.Builder()
            .url(url)
            .build();
    3. Execute the request

      // Synchronous request
      Response response = client.newCall(request).execute();

      // Asynchronous request
      client.newCall(request).enqueue(new Callback() {
          @Override
          public void onFailure(Call call, IOException e) {}

          @Override
          public void onResponse(Call call, Response response) throws IOException {}
      });

Source code walkthrough (Java & Kotlin)

About interceptors

Getting to know the interceptors

The interceptors below are listed in the order in which they are called

  1. User-defined interceptors (implement the Interceptor interface and its intercept method)

  2. RetryAndFollowUpInterceptor: retries failed requests and follows redirects

  3. BridgeInterceptor: bridge interceptor (adds the necessary headers to the request and removes headers that no longer apply from the response)

  4. CacheInterceptor: cache interceptor

    1. Look up a candidate response in the cache for the request

    2. Determine the cache strategy for the request: use the network, the cache, or both

    3. Call the next interceptor to get a response from the network when one is needed

    4. If a cacheResponse already exists, compare it with the networkResponse to decide whether to update the cached response

    5. Cache the response if it is not cached yet

    The cache interceptor decides whether a cached response is usable based on the request and the cached response. If it is, the cached response is returned to the caller; otherwise the request continues down the chain of responsibility to fetch the response from the server, and the response is written to the disk cache once it arrives

  5. ConnectInterceptor: connect interceptor

    1. Check whether the current connection is usable: whether it has been closed or is restricted from creating new streams

    2. If the current connection is not usable, obtain a connection from the connection pool

    3. If the connection pool has no usable connection, create a new connection, perform the handshake, and add it to the connection pool

  6. networkInterceptors: network interceptors (set when configuring OkHttpClient)

  7. CallServerInterceptor: call-server interceptor (sends the request data to the server and reads the response data from it)

The core
    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, streamAllocation, httpCodec, connection, index + 1, request);    // (1)
    Interceptor interceptor = interceptors.get(index);     // (2)
    Response response = interceptor.intercept(next);    // (3)

1. Instantiate the RealInterceptorChain for the next interceptor; it will be passed to the current interceptor

2. Get the current interceptor: interceptors is the ArrayList of interceptors

3. Call the intercept() method of the current interceptor, passing in the RealInterceptorChain for the next interceptor

Apart from the interceptors the user sets on the client, the first interceptor called is RetryAndFollowUpInterceptor

Summary

1. Interceptors use the chain of responsibility design pattern: the request is passed down layer by layer until one layer produces a Response

2. The response is then passed back up through the interceptors above, each of which can process it. Finally, the response is returned to the RealCall class

In short: each interceptor is paired with a RealInterceptorChain, and each chain creates the next RealInterceptorChain until the list has been fully traversed
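
To make the three core steps concrete, here is a minimal, self-contained sketch of the same pattern. It is not OkHttp code: MiniInterceptor, MiniChain and ChainDemo are hypothetical names, and requests and responses are plain strings so the example runs without any dependency.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for OkHttp's Interceptor interface.
interface MiniInterceptor {
    String intercept(MiniChain chain);
}

// Hypothetical, simplified stand-in for RealInterceptorChain.
final class MiniChain {
    private final List<MiniInterceptor> interceptors;
    private final int index;
    private final String request;

    MiniChain(List<MiniInterceptor> interceptors, int index, String request) {
        this.interceptors = interceptors;
        this.index = index;
        this.request = request;
    }

    String request() {
        return request;
    }

    String proceed(String request) {
        if (index >= interceptors.size()) {
            throw new IllegalStateException("no interceptor produced a response");
        }
        // (1) Build the chain that the next interceptor will see.
        MiniChain next = new MiniChain(interceptors, index + 1, request);
        // (2) Fetch the current interceptor.
        MiniInterceptor current = interceptors.get(index);
        // (3) Run it; it may call next.proceed(...) to continue down the chain.
        return current.intercept(next);
    }
}

class ChainDemo {
    static String run(String request) {
        List<MiniInterceptor> interceptors = new ArrayList<>();
        // Decorates the request on the way down and the response on the way back up.
        interceptors.add(chain -> "bridge(" + chain.proceed(chain.request() + "+headers") + ")");
        // Terminal interceptor: produces the response and never calls proceed().
        interceptors.add(chain -> "response-for[" + chain.request() + "]");
        return new MiniChain(interceptors, 0, request).proceed(request);
    }

    public static void main(String[] args) {
        System.out.println(run("GET /"));
    }
}
```

The first interceptor sees the response only after the second one has produced it, which is the "down layer by layer, then back up" behaviour described in the summary.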

About the Dispatcher

Before we begin, here is a brief introduction to the Dispatcher 👇👇👇

public final class Dispatcher {
  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;

  /** Executes calls. Created lazily. */
  private @Nullable ExecutorService executorService;

  /** Ready async calls in the order they'll be run. */
  // Asynchronous calls waiting to run
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  // Asynchronous calls currently running
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  // Synchronous calls currently running
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher() {
  }

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
  ...
}

1. There is a maximum number of concurrent requests: 64

2. There is a maximum number of concurrent requests per host: 5

3. There is a lazily created thread pool, which is created the first time executorService() is called

4. There are three queues (asynchronous calls waiting to run | asynchronous calls running | synchronous calls running)
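
Point 3 can be tried out in isolation. The sketch below is an illustration, not the Dispatcher itself: LazyPoolHolder and LazyPoolDemo are hypothetical names, and the thread factory from the original is left out so that only the JDK is needed. The ThreadPoolExecutor arguments are the same as in the source above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical holder that creates its thread pool on first use.
final class LazyPoolHolder {
    private ExecutorService executorService; // stays null until first requested

    synchronized ExecutorService executorService() {
        if (executorService == null) {
            // No core threads, no upper bound, idle threads are reclaimed after 60 seconds.
            // A SynchronousQueue stores nothing, so every task is handed straight to a thread.
            executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                new SynchronousQueue<>());
        }
        return executorService;
    }

    synchronized boolean isCreated() {
        return executorService != null;
    }
}

class LazyPoolDemo {
    public static void main(String[] args) {
        LazyPoolHolder holder = new LazyPoolHolder();
        System.out.println("created before first use: " + holder.isCreated());
        ExecutorService pool = holder.executorService();
        System.out.println("created after first use: " + holder.isCreated());
        System.out.println("same instance on second call: " + (pool == holder.executorService()));
        pool.shutdown();
    }
}
```

Because the pool itself is unbounded, the limits of 64 and 5 have to be enforced elsewhere, which is what the queues in point 4 are for.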

1. Create OkHttpClient

OkHttpClient client = new OkHttpClient();

There is no difference between Java and Kotlin in this part: both use the Builder pattern, and the configurable parameters in the Builder are the same

public OkHttpClient() {
    this(new Builder());
}

OkHttpClient(Builder builder) {
    ....
}

public static final class Builder {
    Dispatcher dispatcher; // Dispatcher
    @Nullable Proxy proxy;
    List<Protocol> protocols;
    List<ConnectionSpec> connectionSpecs; // TLS versions and cipher suites allowed for connections
    final List<Interceptor> interceptors = new ArrayList<>(); // Interceptors
    final List<Interceptor> networkInterceptors = new ArrayList<>();
    EventListener.Factory eventListenerFactory;
    ProxySelector proxySelector;
    CookieJar cookieJar;
    @Nullable Cache cache;
    @Nullable InternalCache internalCache; // Internal cache
    SocketFactory socketFactory;
    @Nullable SSLSocketFactory sslSocketFactory; // Socket factory for HTTPS
    @Nullable CertificateChainCleaner certificateChainCleaner; // Cleans up the certificate chain returned by the TLS APIs
    HostnameVerifier hostnameVerifier; // Verifies that the response certificate applies to the host name of the HTTPS connection
    CertificatePinner certificatePinner; // Certificate pinning: constrains which certificates are trusted
    Authenticator proxyAuthenticator; // Proxy authentication
    Authenticator authenticator; // Authentication
    ConnectionPool connectionPool; // Connection pool
    Dns dns;
    boolean followSslRedirects; // Follow redirects between HTTPS and HTTP
    boolean followRedirects; // Follow redirects
    boolean retryOnConnectionFailure; // Retry when a connection fails
    int callTimeout;
    int connectTimeout;
    int readTimeout;
    int writeTimeout;
    int pingInterval;

    // Build parameters configured by default
    public Builder() {
        dispatcher = new Dispatcher();
        protocols = DEFAULT_PROTOCOLS;
        connectionSpecs = DEFAULT_CONNECTION_SPECS;
        ...
    }

    // Copy the build parameters from an existing client
    Builder(OkHttpClient okHttpClient) {
        this.dispatcher = okHttpClient.dispatcher;
        this.proxy = okHttpClient.proxy;
        this.protocols = okHttpClient.protocols;
        this.connectionSpecs = okHttpClient.connectionSpecs;
        this.interceptors.addAll(okHttpClient.interceptors);
        this.networkInterceptors.addAll(okHttpClient.networkInterceptors);
        ...
    }
}
Kotlin version
open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {

  // ...
  constructor() : this(Builder())

  // ...
  class Builder constructor() {
    // ...
    internal constructor(okHttpClient: OkHttpClient) : this() {
      this.dispatcher = okHttpClient.dispatcher
      this.connectionPool = okHttpClient.connectionPool
      this.interceptors += okHttpClient.interceptors
      this.networkInterceptors += okHttpClient.networkInterceptors
      this.eventListenerFactory = okHttpClient.eventListenerFactory
      this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure
      this.authenticator = okHttpClient.authenticator
      this.followRedirects = okHttpClient.followRedirects
      this.followSslRedirects = okHttpClient.followSslRedirects
      this.cookieJar = okHttpClient.cookieJar
      this.cache = okHttpClient.cache
      this.dns = okHttpClient.dns
      this.proxy = okHttpClient.proxy
      this.proxySelector = okHttpClient.proxySelector
      this.proxyAuthenticator = okHttpClient.proxyAuthenticator
      this.socketFactory = okHttpClient.socketFactory
      this.sslSocketFactoryOrNull = okHttpClient.sslSocketFactoryOrNull
      this.x509TrustManagerOrNull = okHttpClient.x509TrustManager
      this.connectionSpecs = okHttpClient.connectionSpecs
      this.protocols = okHttpClient.protocols
      this.hostnameVerifier = okHttpClient.hostnameVerifier
      this.certificatePinner = okHttpClient.certificatePinner
      this.certificateChainCleaner = okHttpClient.certificateChainCleaner
      this.callTimeout = okHttpClient.callTimeoutMillis
      this.connectTimeout = okHttpClient.connectTimeoutMillis
      this.readTimeout = okHttpClient.readTimeoutMillis
      this.writeTimeout = okHttpClient.writeTimeoutMillis
      this.pingInterval = okHttpClient.pingIntervalMillis
      this.minWebSocketMessageToCompress = okHttpClient.minWebSocketMessageToCompress
      this.routeDatabase = okHttpClient.routeDatabase
    }
  }
}

2. Execute the request

A synchronous request
OkHttpClient client = new OkHttpClient();
// Synchronous request
Response response = client.newCall(request).execute();
The whole process

After the OkHttpClient object is created, its newCall() method is called and the request is finally handed to RealCall's execute() method, which does the following

1. Ensure that the Call is executed only once (see below for the version differences)

2. Notify the Dispatcher that the call is executing

3. Run the request through the chain of interceptors, which process the request and the response, to get the final result

4. Notify the Dispatcher that the call has finished

Java version
/**
 * Prepares the {@code request} to be executed at some point in the future.
 */
@Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
}

// RealCall is what actually executes the request
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
}

private boolean executed;

@Override public Response execute() throws IOException {
    synchronized (this) {
        // Each Call can be executed only once
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
    }
    captureCallStackTrace();
    timeout.enter();
    eventListener.callStart(this);
    try {
        // Notify the dispatcher that the call is executing
        client.dispatcher().executed(this);
        // Run the interceptor chain, which processes the request and the response, to get the final result
        Response result = getResponseWithInterceptorChain();
        if (result == null) throw new IOException("Canceled");
        return result;
    } catch (IOException e) {
        e = timeoutExit(e);
        eventListener.callFailed(this, e);
        throw e;
    } finally {
        // Notify the dispatcher that the call has finished
        client.dispatcher().finished(this);
    }
}

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    // Interceptors set when configuring OkHttpClient
    interceptors.addAll(client.interceptors());
    // Handles failure retries and redirects
    interceptors.add(retryAndFollowUpInterceptor);
    // Adds the necessary headers to the request and removes headers that no longer apply from the response
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    // Serves responses from the cache and updates the cache
    interceptors.add(new CacheInterceptor(client.internalCache()));
    // Establishes a connection with the server
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
        // networkInterceptors set when configuring OkHttpClient
        interceptors.addAll(client.networkInterceptors());
    }
    // Sends the request data to the server and reads the response data from it
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    // Start the chain of responsibility
    return chain.proceed(originalRequest);
}

// StreamAllocation acts as a manager that maintains the connection to the server and its concurrent streams.
// It also sets up the Socket connection from which the input/output streams are obtained.
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException {
    ...
    // Call the next interceptor in the chain.
    // Instantiate the RealInterceptorChain for the next interceptor
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    // Get the current interceptor
    Interceptor interceptor = interceptors.get(index);
    // Call the intercept() method of the current interceptor, passing in the chain for the next interceptor, to get the response
    Response response = interceptor.intercept(next);
    ...
    return response;
}
Kotlin version
// newCall simply creates a RealCall, which does the actual work
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

RealCall.kt

  private val executed = AtomicBoolean()

  override fun execute(): Response {
    check(executed.compareAndSet(false, true)) { "Already Executed" }
    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }

  @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

In the Java version, the synchronized keyword guarantees thread safety and ensures that the call is executed only once

In the Kotlin version, synchronized is dropped: the executed field becomes an AtomicBoolean, and a CAS (compare-and-set) operation ensures that the call is executed only once
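
The two guard styles can be put side by side in a small runnable sketch. SynchronizedOnce, AtomicOnce and OnceDemo are hypothetical names used only for this illustration; they mirror the shape of the two execute() guards rather than reproduce RealCall.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical class mirroring the Java version's guard: a plain flag protected by a lock.
final class SynchronizedOnce {
    private boolean executed;

    void markExecuted() {
        synchronized (this) {
            if (executed) throw new IllegalStateException("Already Executed");
            executed = true;
        }
    }
}

// Hypothetical class mirroring the Kotlin version's guard: a lock-free compare-and-set.
final class AtomicOnce {
    private final AtomicBoolean executed = new AtomicBoolean();

    void markExecuted() {
        // compareAndSet flips false to true atomically and returns false if it was already true.
        if (!executed.compareAndSet(false, true)) {
            throw new IllegalStateException("Already Executed");
        }
    }
}

class OnceDemo {
    // Runs the guard twice and reports whether the second attempt was rejected.
    static boolean secondCallRejected(Runnable guard) {
        guard.run();
        try {
            guard.run();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        SynchronizedOnce locked = new SynchronizedOnce();
        AtomicOnce atomic = new AtomicOnce();
        System.out.println("synchronized guard rejects second call: " + secondCallRejected(locked::markExecuted));
        System.out.println("atomic guard rejects second call: " + secondCallRejected(atomic::markExecuted));
    }
}
```

Both guards give the same result to the caller; the atomic version just avoids taking a monitor lock for a single flag.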

An asynchronous request
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
    .url(url)
    .build();
client.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {}

    @Override
    public void onResponse(Call call, Response response) throws IOException {}
});
The whole process
  • Create a Request object with the Request builder, then call OkHttpClient's newCall() method; the request is finally handed to RealCall's enqueue() method, which handles the rest

Let's go straight to the RealCall code

Java version
public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

As shown above, locking ensures that the request is executed only once. Next, let's look at the Dispatcher's enqueue() method, to which the current request is passed

Dispatcher.java

void enqueue(AsyncCall call) {
    synchronized (this) {
      readyAsyncCalls.add(call);
      if (!call.get().forWebSocket) {
        AsyncCall existingCall = findExistingCallWithHost(call.host());
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
      }
    }
    promoteAndExecute();
  }

This method first adds the request to the readyAsyncCalls queue. How do I know it's a queue? Remember what was said above? 👆 👆 👆

After the request is added to the queue, the promoteAndExecute() method runs

  private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; // The total number of running calls has reached the maximum
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // The number of running calls to this host has reached the maximum

        // Both limits leave room: move the call from readyAsyncCalls to runningAsyncCalls
        // so that the thread pool can execute it. Otherwise the call stays in readyAsyncCalls.
        i.remove();
        asyncCall.callsPerHost().incrementAndGet();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);

      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }
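
The effect of the two limits is easier to see with small numbers. The following sketch models only the promotion rule; MiniDispatcher and PromoteDemo are hypothetical names, a call is represented by its host name, and locking and the thread pool are left out on purpose.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, single-threaded model of the promotion rule; not OkHttp's Dispatcher.
final class MiniDispatcher {
    private final int maxRequests;
    private final int maxRequestsPerHost;
    private final Deque<String> readyCalls = new ArrayDeque<>();
    private final Deque<String> runningCalls = new ArrayDeque<>();
    private final Map<String, Integer> runningPerHost = new HashMap<>();

    MiniDispatcher(int maxRequests, int maxRequestsPerHost) {
        this.maxRequests = maxRequests;
        this.maxRequestsPerHost = maxRequestsPerHost;
    }

    void enqueue(String host) {
        readyCalls.add(host);
    }

    // Moves eligible calls from the ready queue to the running queue and returns them.
    List<String> promote() {
        List<String> promoted = new ArrayList<>();
        for (Iterator<String> i = readyCalls.iterator(); i.hasNext(); ) {
            String host = i.next();
            if (runningCalls.size() >= maxRequests) break;                            // global limit reached
            if (runningPerHost.getOrDefault(host, 0) >= maxRequestsPerHost) continue; // this host is full
            i.remove();
            runningPerHost.merge(host, 1, Integer::sum);
            runningCalls.add(host);
            promoted.add(host);
        }
        return promoted;
    }

    int readyCount() {
        return readyCalls.size();
    }
}

class PromoteDemo {
    public static void main(String[] args) {
        MiniDispatcher dispatcher = new MiniDispatcher(3, 2); // 3 calls in total, 2 per host
        for (String host : new String[] {"a", "a", "a", "b", "b"}) {
            dispatcher.enqueue(host);
        }
        System.out.println("promoted: " + dispatcher.promote());
        System.out.println("still waiting: " + dispatcher.readyCount());
    }
}
```

With these limits the third call to host a is skipped by the per-host rule, the first call to b is promoted, and the second call to b is stopped by the global rule, so three calls run and two keep waiting.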

The call is handed to the thread pool for execution. Now look at the code for AsyncCall, which is an inner class of RealCall

final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }

    /**
     * Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        executorService.execute(this);
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        eventListener.callFailed(RealCall.this, ioException);
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      timeout.enter();
      try {
        // As with synchronous execution, the call ends up here
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        e = timeoutExit(e);
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
}
Kotlin version
  override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }
    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)
      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

  private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }
 inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {
    @Volatile var callsPerHost = AtomicInteger(0)
      private set

    fun reuseCallsPerHostFrom(other: AsyncCall) {
      this.callsPerHost = other.callsPerHost
    }

    val host: String
      get() = originalRequest.url.host

    val request: Request
        get() = originalRequest

    val call: RealCall
        get() = this@RealCall

    /**
     * Attempt to enqueue this async call on [executorService]. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }

    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          client.dispatcher.finished(this)
        }
      }
    }
  }

In AsyncCall's execute() method, the response is likewise obtained through Response response = getResponseWithInterceptorChain();, so an asynchronous call also passes through the interceptor chain

Conclusion

First, the request is wrapped in a Call instance; then either its execute() method or its enqueue() method is called, and both eventually reach the getResponseWithInterceptorChain() method

This method uses the chain of responsibility pattern: the request passes in turn through the user-defined application interceptors, the retry interceptor (RetryAndFollowUpInterceptor), the bridge interceptor (BridgeInterceptor), the cache interceptor (CacheInterceptor), the connect interceptor (ConnectInterceptor), the user-defined network interceptors, and the call-server interceptor (CallServerInterceptor), and finally the response is returned to the user

Connect interceptor

The two most important interceptors in OkHttp, the cache interceptor and the connect interceptor, were briefly described 👆👆👆 at the beginning of this article

Now let's look more closely at one of them, the connect interceptor

Java version
@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    // HttpCodec abstracts the HTTP protocol operations. It has two implementations, Http1Codec and Http2Codec,
    // which correspond to HTTP/1.1 and HTTP/2 respectively. Connection pool reuse happens inside newStream()
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

Calling streamAllocation's newStream() method leads, after a series of checks, to the findConnection() method of the StreamAllocation class

private RealConnection findConnection(int   connectTimeout, int readTimeout, int writeTimeout,
    int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {...// Attempt to use an already-allocated connection. We need to be careful here because our
      // already-allocated connection may have been restricted from creating new streams.
      // Try to use the allocated connection. The allocated connection may have been restricted to create a new stream
      releasedConnection = this.connection;
      // Release resources from the current connection. If the connection has been restricted to create new streams, return a Socket to close the connection
      toClose = releaseIfNoNewStreams();
      if (this.connection ! =null) {
        // We had an already-allocated connection and it's good.
        result = this.connection;
        releasedConnection = null;
      }
      if(! reportedAcquired) {// If the connection was never reported acquired, don't report it as released!
        // If the connection has never been marked as acquired, do not mark it as published and reportedAcquired is modified through acquire()
        releasedConnection = null;
      }
    
      if (result == null) {
        // Attempt to get a connection from the pool.
        // Try to get a connection from the connection pool
        Internal.instance.get(connectionPool, address, this.null);
        if(connection ! =null) {
          foundPooledConnection = true;
          result = connection;
        } else{ selectedRoute = route; }}}// Close the connection
    closeQuietly(toClose);
    
    if(releasedConnection ! =null) {
      eventListener.connectionReleased(call, releasedConnection);
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
    }
    if(result ! =null) {
      // If we found an already-allocated or pooled connection, we're done.
      // If a connection has been retrieved from the connection pool, it is returned
      return result;
    }
    
    // If we need a route selection, make one. This is a blocking operation.
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null| |! routeSelection.hasNext())) { newRouteSelection =true;
      routeSelection = routeSelector.next();
    }
    
    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");
    
      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
         // Get a link from the connection pool based on a list of IP addresses
        List<Route> routes = routeSelection.getAll();
        for (int i = 0, size = routes.size(); i < size; i++) { Route route = routes.get(i);// Get a connection from the connection pool
          Internal.instance.get(connectionPool, address, this, route);
          if(connection ! =null) {
            foundPooledConnection = true;
            result = connection;
            this.route = route;
            break; }}}if(! foundPooledConnection) {if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }
    
        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        // If the pool has no usable connection, create a new one and assign it right away so that an asynchronous cancel() can interrupt the upcoming handshake
        route = selectedRoute;
        refusedStreamCount = 0;
        result = new RealConnection(connectionPool, selectedRoute);
        acquire(result, false);
      }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
      // If we find a pooled connection the second time, we return it
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // Do TCP + TLS handshakes. This is a blocking operation.
    // Perform the TCP and TLS handshakes
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
      connectionRetryEnabled, call, eventListener);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      reportedAcquired = true;

      // Pool the connection.
      // Put the connection into the connection pool
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      // If another multiplexed connection to the same address is created at the same time, release this connection and retrieve that connection
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    eventListener.connectionAcquired(call, result);
    return result;
}

The source code above confirms the conclusion we stated at the beginning: a newly created connection is added to the connection pool. Put simply, connection reuse skips the TCP and TLS handshakes. Because establishing a connection takes time, reusing connections makes network access more efficient.
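
To make the benefit of reuse concrete, here is a minimal sketch of the "look in the pool first, otherwise create and pool" idea. It is not OkHttp's code: `FakeConnection`, `SimplePool`, and the `handshakes` counter are hypothetical names invented for illustration, and the sketch ignores routes, HTTP/2 multiplexing, and whether a connection is currently in use.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class PoolReuseSketch {
  /** Hypothetical stand-in for RealConnection: only remembers its host. */
  static final class FakeConnection {
    final String host;

    FakeConnection(String host) {
      this.host = host;
    }
  }

  /** Hypothetical, heavily simplified pool; not OkHttp's ConnectionPool. */
  static final class SimplePool {
    private final Deque<FakeConnection> connections = new ArrayDeque<>();
    int handshakes = 0; // counts simulated TCP/TLS handshakes

    /** Returns a pooled connection to the host if one exists, otherwise creates and pools one. */
    synchronized FakeConnection findConnection(String host) {
      for (FakeConnection c : connections) {
        if (c.host.equals(host)) {
          return c; // reuse: no handshake needed
        }
      }
      handshakes++; // a real client would perform the TCP and TLS handshakes here
      FakeConnection created = new FakeConnection(host);
      connections.add(created);
      return created;
    }
  }

  public static void main(String[] args) {
    SimplePool pool = new SimplePool();
    FakeConnection first = pool.findConnection("example.com");
    FakeConnection second = pool.findConnection("example.com");
    System.out.println("same connection reused: " + (first == second));
    System.out.println("handshakes performed: " + pool.handshakes);
  }
}
```

Two requests to the same host cost only one simulated handshake here, which is the saving the real findConnection() is after when it checks the pool before creating a RealConnection.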

Finally, let's look at what ConnectionPool does

public final class ConnectionPool {
  private final Deque<RealConnection> connections = new ArrayDeque<>();
  // ...
  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }
  
  private final Runnable cleanupRunnable = () -> {
    while (true) {
      long waitNanos = cleanup(System.nanoTime());
      if (waitNanos == -1) return;
      if (waitNanos > 0) {
        long waitMillis = waitNanos / 1000000L;
        waitNanos -= (waitMillis * 1000000L);
        synchronized (ConnectionPool.this) {
          try {
            ConnectionPool.this.wait(waitMillis, (int) waitNanos);
          } catch (InterruptedException ignored) {
          }
        }
      }
    }
  };
}

When a new connection is created, it has to be put into the cache, and the cache also has to be cleaned up. In ConnectionPool, caching a connection simply means calling add() on the double-ended queue (Deque). Cleaning up the cached connections is handed to a thread pool, which runs the cleanup task periodically.
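
The cleanup loop above relies on cleanup() returning either a wait time or -1. The following is a simplified model of that contract, under stated assumptions: `IdleConnection`, `SimplePool`, and `keepAliveNanos` are hypothetical names, the eviction rule (drop everything idle for at least the keep-alive time) is a simplification chosen for this sketch, and the real ConnectionPool applies additional rules such as a limit on idle connections.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

class IdleCleanupSketch {
  /** Hypothetical stand-in for a pooled connection; only tracks when it became idle. */
  static final class IdleConnection {
    final long idleAtNanos;

    IdleConnection(long idleAtNanos) {
      this.idleAtNanos = idleAtNanos;
    }
  }

  /** Hypothetical simplified pool: evicts connections idle for at least keepAliveNanos. */
  static final class SimplePool {
    private final Deque<IdleConnection> connections = new ArrayDeque<>();
    private final long keepAliveNanos;

    SimplePool(long keepAliveNanos) {
      this.keepAliveNanos = keepAliveNanos;
    }

    synchronized void put(IdleConnection connection) {
      connections.add(connection);
    }

    synchronized int size() {
      return connections.size();
    }

    /**
     * Evicts expired connections. Returns the nanoseconds until the next cleanup
     * is due, or -1 if the pool is empty and the cleanup loop can stop.
     */
    synchronized long cleanup(long nowNanos) {
      long longestIdleNanos = Long.MIN_VALUE;
      for (Iterator<IdleConnection> i = connections.iterator(); i.hasNext(); ) {
        long idleNanos = nowNanos - i.next().idleAtNanos;
        if (idleNanos >= keepAliveNanos) {
          i.remove(); // idle for too long: drop it from the pool
        } else {
          longestIdleNanos = Math.max(longestIdleNanos, idleNanos);
        }
      }
      if (connections.isEmpty()) {
        return -1;
      }
      // The connection that has been idle longest is the next one to expire
      return keepAliveNanos - longestIdleNanos;
    }
  }

  public static void main(String[] args) {
    SimplePool pool = new SimplePool(100);
    pool.put(new IdleConnection(0));
    pool.put(new IdleConnection(60));
    System.out.println(pool.cleanup(90));  // nothing has expired yet
    System.out.println(pool.cleanup(100)); // the connection idle since 0 is evicted
    System.out.println(pool.cleanup(160)); // the last connection is evicted, pool is empty
  }
}
```

Returning the time until the next expiry lets the background task sleep with wait() instead of polling, and returning -1 lets it exit once nothing is left to clean.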

Kotlin version
object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
  }
}

Do you know?

1. Why does the Java version use locks, while the Kotlin version uses atomic values and CAS operations?

2. Why is the data stored in queues? Would a linked list work?

3. How does the interceptor work, and how does it pass and respond to data?

4. How do I customize interceptors? How do I add a configuration?
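
As a starting point for questions 3 and 4, here is a minimal chain-of-responsibility sketch in plain Java. It is an illustration only, not OkHttp's implementation: the `Interceptor` and `Chain` types below are hypothetical simplifications that use plain strings for the request and the response.

```java
import java.util.ArrayList;
import java.util.List;

class InterceptorChainSketch {
  /** Hypothetical simplified interceptor: request and response are plain strings. */
  interface Interceptor {
    String intercept(Chain chain);
  }

  /** Hypothetical simplified chain: the interceptor list plus the current position. */
  static final class Chain {
    private final List<Interceptor> interceptors;
    private final int index;
    final String request;

    Chain(List<Interceptor> interceptors, int index, String request) {
      this.interceptors = interceptors;
      this.index = index;
      this.request = request;
    }

    /** Calls the interceptor at the current index, handing it a chain that points at the next one. */
    String proceed(String request) {
      Chain next = new Chain(interceptors, index + 1, request);
      return interceptors.get(index).intercept(next);
    }
  }

  static String run(String request) {
    List<Interceptor> interceptors = new ArrayList<>();
    // A custom interceptor: changes the request on the way down and the response on the way back
    interceptors.add(chain -> {
      String response = chain.proceed(chain.request + " +header");
      return response + " (logged)";
    });
    // The last interceptor produces the response instead of calling proceed()
    interceptors.add(chain -> "response to [" + chain.request + "]");
    return new Chain(interceptors, 0, request).proceed(request);
  }

  public static void main(String[] args) {
    System.out.println(run("GET /"));
  }
}
```

Each interceptor sees the request before calling proceed() and the response after it returns, which is how data is passed down and back up the chain. In OkHttp itself, a custom interceptor is registered with addInterceptor() or addNetworkInterceptor() on OkHttpClient.Builder.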