Source code for OkHttp request flow (OkHttp version 4.9.1)

I previously started (but never finished) a source code analysis of OkHttp 3, which was implemented entirely in Java. Starting with OkHttp 4, the library is written in Kotlin. The overall framework and request flow are roughly the same, but there are some changes, so let's see what is different.

Simple use of synchronous and asynchronous requests (with Get requests as an example)

    // The following examples use GET requests.
    // Synchronous request
    private fun executeTest() {

        // 1. Create an OkHttpClient
        val okHttpClient = OkHttpClient()
        // 2. Create a Request object
        val request = Request.Builder()
            .url(BASE_URL)
            .build()

        // 3. Execute the call; execute() blocks and returns the Response
        okHttpClient.newCall(request).execute()
    }


    // Asynchronous request
    private fun asyncTest() {

        // 1. Create an OkHttpClient as before
        val okHttpClient = OkHttpClient()
        // 2. Create a Request object
        val request = Request.Builder()
            .url(BASE_URL)
            .build()

        // 3. Call the enqueue method with a Callback
        okHttpClient.newCall(request).enqueue(object : Callback {
            // Called when the request fails
            override fun onFailure(call: Call, e: IOException) {}

            // Called when a response is received
            override fun onResponse(call: Call, response: Response) {}
        })
    }

Creating the OkHttpClient

val okHttpClient = OkHttpClient()
constructor() : this(Builder())

As you can see, the no-argument constructor delegates to a Builder, so OkHttpClient is created with the builder pattern. The builder pattern is generally recommended when an object has many parameters to configure. The Builder's default configuration below shows how many parameters are involved.


// Build parameters configured by default
class Builder constructor() {
    // Dispatcher (core): schedules asynchronous calls
    internal var dispatcher: Dispatcher = Dispatcher()
    // Connection pool
    internal var connectionPool: ConnectionPool = ConnectionPool()
    // Interceptor (core)
    internal val interceptors: MutableList<Interceptor> = mutableListOf()
    // Network interceptor
    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
    internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
    // Whether to retry when a connection fails
    internal var retryOnConnectionFailure = true
    internal var authenticator: Authenticator = Authenticator.NONE
    // Whether to follow redirects. Default is true
    internal var followRedirects = true
    // Whether to follow redirects between HTTP and HTTPS
    internal var followSslRedirects = true
    internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
    internal var cache: Cache? = null
    internal var dns: Dns = Dns.SYSTEM
    internal var proxy: Proxy? = null
    internal var proxySelector: ProxySelector? = null
    // Proxy authentication
    internal var proxyAuthenticator: Authenticator = Authenticator.NONE
    internal var socketFactory: SocketFactory = SocketFactory.getDefault()
    // Socket factory for HTTPS
    internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
    internal var x509TrustManagerOrNull: X509TrustManager? = null
    // Supported TLS configurations and application protocols (defaults are used here)
    internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
    internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
    // Verifies that the response certificate applies to the host name of the HTTPS connection
    internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
    // Certificate pinning: a CertificatePinner constrains which certificates are trusted
    internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
    internal var certificateChainCleaner: CertificateChainCleaner? = null
    internal var callTimeout = 0
    internal var connectTimeout = 10_000
    internal var readTimeout = 10_000
    internal var writeTimeout = 10_000
    internal var pingInterval = 0
    internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
    internal var routeDatabase: RouteDatabase? = null

    // A Builder can also be initialized from an existing, already configured OkHttpClient
    internal constructor(okHttpClient: OkHttpClient) : this() {
        ... // Copies each of the settings above from okHttpClient
    }

Synchronous request flow

okHttpClient.newCall(request).execute()

newCall returns a Call, which is only an interface; RealCall is the class that actually executes the request.

 override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

Implementation of the execute() method in RealCall

override fun execute(): Response {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
}
  • The check() call verifies that this Call has not been executed before. If it has, an IllegalStateException with the message "Already Executed" is thrown, so each Call can be executed only once.

  • Inside the try block, dispatcher.executed(this) tells the dispatcher that the call is now running; the dispatcher adds it to its queue of running synchronous calls.

  • getResponseWithInterceptorChain() then passes the request through a series of interceptors, which process the request and the response and produce the final result.

  • The finally block always notifies the dispatcher that the call has completed by calling the Dispatcher's finished method, whether or not an exception was thrown:

private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }

    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
      idleCallback.run()
    }
  }

The finished method removes the call from its running queue and then calls promoteAndExecute() to start any waiting asynchronous calls. If no calls are running afterwards and an idleCallback has been set, it runs the idleCallback.
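The two guarantees seen in execute() (a call runs at most once, and the completion hook always fires once it has started) can be illustrated with a minimal sketch. OneShotCall and onFinished are hypothetical names for illustration only; this is not OkHttp's implementation.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical one-shot call that mirrors the check()/try/finally shape of execute().
class OneShotCall(private val onFinished: () -> Unit) {
    private val executed = AtomicBoolean(false)

    fun execute(work: () -> String): String {
        // compareAndSet succeeds only for the first caller; later callers get IllegalStateException.
        check(executed.compareAndSet(false, true)) { "Already Executed" }
        try {
            return work()
        } finally {
            // Runs whether work() returned normally or threw.
            onFinished()
        }
    }
}
```

Because the guard sits before the try block, a rejected second execution never reaches the completion hook, so the hook fires exactly once per call.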

The getResponseWithInterceptorChain() method

  @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    // Application interceptors set in the OkHttpClient configuration
    interceptors += client.interceptors
    // Handles retries after failures and follow-up requests such as redirects
    interceptors += RetryAndFollowUpInterceptor(client)
    // Adds the required request headers before sending, and post-processes the response (for example, gzip decompression)
    interceptors += BridgeInterceptor(client.cookieJar)
    // Serves responses from the cache and writes new responses to it
    interceptors += CacheInterceptor(client.cache)
    // Establishes the connection to the server
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      // Network interceptors set in the OkHttpClient configuration
      interceptors += client.networkInterceptors
    }
    // Sends the request data to the server and reads the response data from the server
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    // Start the chained call (chain of responsibility pattern)
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

In summary, getResponseWithInterceptorChain() does two things. First, it builds the list of interceptors, adding them one by one in order. Second, it calls the proceed method of RealInterceptorChain, which starts the chained call (chain of responsibility pattern) and yields the final response.

Implementation of the proceed() method

 @Throws(IOException::class)
  override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    calls++

    if (exchange != null) {
      check(exchange.finder.sameHostAndPort(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
      }
      check(calls == 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
      }
    }

    // Call the next interceptor in the chain.
    // Create the RealInterceptorChain object for the next interceptor
    val next = copy(index = index + 1, request = request)
    // Get the current interceptor
    val interceptor = interceptors[index]

    // Call the current interceptor's intercept method, passing in the chain for the next interceptor
    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    if (exchange != null) {
      check(index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
      }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }

The proceed method creates the RealInterceptorChain for the next interceptor, obtains the current interceptor, and calls the current interceptor's intercept method with that next chain. Each interceptor in turn calls proceed on the chain it receives, which is how the chained call is implemented; the value that finally comes back is the actual Response.
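To make the index-based hand-off easier to follow, here is a minimal sketch of the same chain of responsibility idea using plain strings instead of real requests and responses. SimpleInterceptor, SimpleChain, and runChain are hypothetical names and not part of OkHttp.

```kotlin
// Hypothetical stand-ins for OkHttp's Interceptor and RealInterceptorChain.
fun interface SimpleInterceptor {
    fun intercept(chain: SimpleChain): String
}

class SimpleChain(
    private val interceptors: List<SimpleInterceptor>,
    private val index: Int,
    val request: String
) {
    fun proceed(request: String): String {
        check(index < interceptors.size) { "no interceptor left to handle the request" }
        // Chain object that points at the next interceptor in the list.
        val next = SimpleChain(interceptors, index + 1, request)
        // Run the current interceptor and hand it the next chain.
        return interceptors[index].intercept(next)
    }
}

fun runChain(request: String): String {
    val interceptors = listOf(
        // Rewrites the request on the way in and wraps the response on the way out.
        SimpleInterceptor { chain -> "bridge(" + chain.proceed(chain.request + "+headers") + ")" },
        // Terminal interceptor: produces a response without calling proceed().
        SimpleInterceptor { chain -> "response-for[" + chain.request + "]" }
    )
    return SimpleChain(interceptors, 0, request).proceed(request)
}
```

Calling runChain("GET /") sends the request down through both interceptors and back up again, so the first interceptor sees both the outgoing request and the returning response. This is the property that lets an interceptor such as a cache or retry layer short-circuit or repeat the rest of the chain.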

Asynchronous request flow

The earlier steps are the same as for a synchronous request. The difference starts when RealCall.enqueue() wraps the callback in an AsyncCall and hands it to the Dispatcher's enqueue method:

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

Here is a quick look at the three queues in the Dispatcher: ready asynchronous calls, running asynchronous calls, and running synchronous calls.

  /** Ready async calls in the order they'll be run. */

  // Ready asynchronous queue
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  // A running asynchronous request queue
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  // Running synchronous request queue
  private val runningSyncCalls = ArrayDeque<RealCall>()

As you can see, the enqueue method first adds the call to the ready asynchronous queue inside a synchronized block, and then calls the promoteAndExecute() method.

The promoteAndExecute() method

private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        // If runningAsyncCalls is below maxRequests and the number of running calls to this host is below
        // maxRequestsPerHost, move the call to runningAsyncCalls and run it on the thread pool; otherwise it stays in readyAsyncCalls.
        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

In short, this method walks the ready queue and promotes every call that satisfies the limits: the number of running asynchronous calls must be below maxRequests, and the number of running calls to the same host must be below maxRequestsPerHost. A promoted call is moved to the running asynchronous queue and then executed on the thread pool; a call that does not qualify stays in the ready queue. The return value reports whether any calls, synchronous or asynchronous, are currently running.
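The promotion rule can be tried out in isolation with a simplified, single-threaded sketch. MiniDispatcher and PendingCall are hypothetical names; unlike the real Dispatcher, this version has no locking, counts per-host calls by scanning the running queue, and returns the promoted calls instead of submitting them to a thread pool. The defaults of 64 and 5 match OkHttp's default maxRequests and maxRequestsPerHost.

```kotlin
import java.util.ArrayDeque

// Hypothetical, simplified model of a queued asynchronous call.
data class PendingCall(val id: Int, val host: String)

class MiniDispatcher(
    private val maxRequests: Int = 64,
    private val maxRequestsPerHost: Int = 5
) {
    private val ready = ArrayDeque<PendingCall>()
    private val running = ArrayDeque<PendingCall>()

    // Queues the call, then promotes whatever is eligible.
    fun enqueue(call: PendingCall): List<PendingCall> {
        ready.add(call)
        return promote()
    }

    // Frees a slot, then promotes whatever is eligible.
    fun finished(call: PendingCall): List<PendingCall> {
        running.remove(call)
        return promote()
    }

    // Moves eligible calls from ready to running and returns the promoted ones.
    private fun promote(): List<PendingCall> {
        val promoted = mutableListOf<PendingCall>()
        val i = ready.iterator()
        while (i.hasNext()) {
            val call = i.next()
            if (running.size >= maxRequests) break // Global limit reached.
            if (running.count { it.host == call.host } >= maxRequestsPerHost) continue // Host limit reached.
            i.remove()
            running.add(call)
            promoted.add(call)
        }
        return promoted
    }

    fun runningIds(): List<Int> = running.map { it.id }
    fun readyIds(): List<Int> = ready.map { it.id }
}
```

Note the difference between break and continue: once the global limit is hit nothing further can start, but a call blocked only by its host limit is skipped so that calls to other hosts behind it can still be promoted.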

Now look at AsyncCall

internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {
    @Volatile var callsPerHost = AtomicInteger(0)
      private set

    fun reuseCallsPerHostFrom(other: AsyncCall) {
      this.callsPerHost = other.callsPerHost
    }

    val host: String
      get() = originalRequest.url.host

    val request: Request
        get() = originalRequest

    val call: RealCall
        get() = this@RealCall

    /**
     * Attempt to enqueue this async call on [executorService]. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    // Submit this call to the thread pool
    fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }

    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          // As with synchronous execution, this ends up calling getResponseWithInterceptorChain()
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          client.dispatcher.finished(this)
        }
      }
    }
  }

As you can see, AsyncCall implements Runnable, so asynchronous tasks are managed by a thread pool. In the end, just like a synchronous request, it calls the getResponseWithInterceptorChain() method.

Interceptor analysis

To sum up, the interceptor chain runs five interceptors that OkHttp provides by default: RetryAndFollowUpInterceptor, BridgeInterceptor, CacheInterceptor, ConnectInterceptor, and CallServerInterceptor.

The retry and redirect interceptor: RetryAndFollowUpInterceptor

Let's walk through the main logic of RetryAndFollowUpInterceptor alongside the code.

  1. First get and initialize the associated instances.
   @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    var request = chain.request
    val call = realChain.call
    var followUpCount = 0
    var priorResponse: Response? = null
    var newExchangeFinder = true
    var recoveredFailures = listOf<IOException>()
    // Loop until a response is returned or an exception is thrown
    while (true) {
      ...
    }
  }
  • The interceptor first obtains the RealInterceptorChain and RealCall objects and then enters an infinite loop. Let's look at what happens inside the loop.
  2. Get the response from the downstream interceptors. If an exception occurs, decide whether it can be recovered from: either retry by restarting the loop, or abort by rethrowing.
  // This code runs inside the while (true) loop; response and closeActiveExchange are declared just above it.
  try {
        if (call.isCanceled()) {
          throw IOException("Canceled")
        }

        try {
          response = realChain.proceed(request)
          newExchangeFinder = true
        } catch (e: RouteException) {
          // The attempt to connect via a route failed. The request will not have been sent.
          if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
            throw e.firstConnectException.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e.firstConnectException
          }
          newExchangeFinder = false
          continue
        } catch (e: IOException) {
          // An attempt to communicate with a server failed. The request may have been sent.
          if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
            throw e.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e
          }
          newExchangeFinder = false
          continue
        }

        // Attach the prior response if it exists. Such responses never have a body.
        if (priorResponse != null) {
          response = response.newBuilder()
              .priorResponse(priorResponse.newBuilder()
                  .body(null)
                  .build())
              .build()
        }

        val exchange = call.interceptorScopedExchange
        val followUp = followUpRequest(response, exchange)

        if (followUp == null) {
          if (exchange != null && exchange.isDuplex) {
            call.timeoutEarlyExit()
          }
          closeActiveExchange = false
          return response
        }

        val followUpBody = followUp.body
        if (followUpBody != null && followUpBody.isOneShot()) {
          closeActiveExchange = false
          return response
        }

        response.body?.closeQuietly()

        if (++followUpCount > MAX_FOLLOW_UPS) {
          throw ProtocolException("Too many follow-up requests: $followUpCount")
        }

        request = followUp
        priorResponse = response
      } finally {
        call.exitNetworkInterceptorExchange(closeActiveExchange)
      }
  • This is where the response from the downstream interceptors is obtained. followUpRequest() then inspects it to decide whether a follow-up request, such as a redirect, is needed.
    • If no follow-up is needed, or the follow-up request body can be sent only once, the response is returned as is.
    • Otherwise the current response body is closed. If the number of follow-ups exceeds MAX_FOLLOW_UPS (20), a ProtocolException is thrown.
    • If the limit has not been reached, the follow-up request becomes the new request and the loop runs again, passing it back down to the downstream interceptors.
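The shape of this loop (retry on a recoverable failure, follow redirects up to a limit) can be sketched without any networking. Everything below is a hypothetical simplification: SimpleResponse, fetchWithFollowUps, the transport lambda, and the maxRetries counter are illustrative names, and the real recover() applies richer rules than a simple retry count.

```kotlin
import java.io.IOException
import java.net.ProtocolException

// Hypothetical, simplified response: a status code plus an optional redirect target.
data class SimpleResponse(val code: Int, val location: String? = null)

const val MAX_FOLLOW_UPS = 20

fun fetchWithFollowUps(
    startUrl: String,
    maxRetries: Int = 1,
    transport: (String) -> SimpleResponse
): SimpleResponse {
    var url = startUrl
    var followUpCount = 0
    var failures = 0
    while (true) {
        val response = try {
            transport(url)
        } catch (e: IOException) {
            // Stand-in for recover(): retry a limited number of times, otherwise rethrow.
            if (++failures > maxRetries) throw e
            continue
        }
        // Stand-in for followUpRequest(): only 3xx responses with a Location are followed.
        val next = if (response.code in 300..399) response.location else null
        if (next == null) return response
        if (++followUpCount > MAX_FOLLOW_UPS) {
            throw ProtocolException("Too many follow-up requests: $followUpCount")
        }
        url = next
    }
}
```

A transport that fails once, then redirects, then succeeds is called three times in total, while a transport that always redirects to itself ends with a ProtocolException once the follow-up limit is exceeded.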

The BridgeInterceptor

 override fun intercept(chain: Interceptor.Chain): Response {
    val userRequest = chain.request()
    val requestBuilder = userRequest.newBuilder()

    val body = userRequest.body
    if (body != null) {
      val contentType = body.contentType()
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString())
      }

      val contentLength = body.contentLength()
      if (contentLength != -1L) {
        requestBuilder.header("Content-Length", contentLength.toString())
        requestBuilder.removeHeader("Transfer-Encoding")
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked")
        requestBuilder.removeHeader("Content-Length")
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", userRequest.url.toHostHeader())
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive")
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    var transparentGzip = false
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true
      requestBuilder.header("Accept-Encoding", "gzip")
    }

    val cookies = cookieJar.loadForRequest(userRequest.url)
    if (cookies.isNotEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies))
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", userAgent)
    }

    val networkResponse = chain.proceed(requestBuilder.build())

    cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

    val responseBuilder = networkResponse.newBuilder()
        .request(userRequest)

    if (transparentGzip &&
        "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
        networkResponse.promisesBody()) {
      val responseBody = networkResponse.body
      if (responseBody != null) {
        val gzipSource = GzipSource(responseBody.source())
        val strippedHeaders = networkResponse.headers.newBuilder()
            .removeAll("Content-Encoding")
            .removeAll("Content-Length")
            .build()
        responseBuilder.headers(strippedHeaders)
        val contentType = networkResponse.header("Content-Type")
        responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
      }
    }

    return responseBuilder.build()
  }
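  • If gzip was requested transparently and the response comes back gzip-encoded, the body is wrapped in a GzipSource so that the Response handed to the user is already decompressed.
  • In short, the bridge interceptor converts the user-built request into a request that can be sent over the network, and converts the network response back into a user-friendly Response.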
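The request half of this conversion is essentially "fill in whatever the user left out". A minimal sketch over a plain header map follows; bridgeHeaders and the "example-agent" default are hypothetical, cookies are omitted, and header names are matched case-sensitively for brevity even though real HTTP header names are case-insensitive.

```kotlin
// Hypothetical helper: fills in the headers a user request is missing.
fun bridgeHeaders(
    userHeaders: Map<String, String>,
    host: String,
    contentLength: Long? = null, // null: no request body; -1: body of unknown length
    userAgent: String = "example-agent"
): Map<String, String> {
    val headers = userHeaders.toMutableMap()
    if (contentLength != null) {
        if (contentLength != -1L) {
            // Known length: send Content-Length, never chunked.
            headers["Content-Length"] = contentLength.toString()
            headers.remove("Transfer-Encoding")
        } else {
            // Unknown length: fall back to chunked transfer encoding.
            headers["Transfer-Encoding"] = "chunked"
            headers.remove("Content-Length")
        }
    }
    headers.getOrPut("Host") { host }
    headers.getOrPut("Connection") { "Keep-Alive" }
    // Only ask for gzip when the caller did not set Accept-Encoding or Range.
    if ("Accept-Encoding" !in userHeaders && "Range" !in userHeaders) {
        headers["Accept-Encoding"] = "gzip"
    }
    headers.getOrPut("User-Agent") { userAgent }
    return headers
}
```

Headers the caller set explicitly are left alone, which is why a request that already carries Accept-Encoding or Range does not get transparent gzip.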

CacheInterceptor

The OkHttp cache is simple to use: pass a Cache to the builder's cache method when creating the OkHttpClient.

override fun intercept(chain: Interceptor.Chain): Response {
    val call = chain.call()
    // Look up the cached response for this request
    val cacheCandidate = cache?.get(chain.request())

    val now = System.currentTimeMillis()

    // Compute the cache strategy for this request: use the network, the cache, or both
    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse

    cache?.trackResponse(strategy)
    val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE

    if (cacheCandidate != null && cacheResponse == null) {
      // The cache candidate wasn't applicable. Close it.
      cacheCandidate.body?.closeQuietly()
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(HTTP_GATEWAY_TIMEOUT)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build().also {
            listener.satisfactionFailure(call, it)
          }
    }

    // If we don't need the network, we're done.
    if (networkRequest == null) {
      return cacheResponse!!.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build().also {
            listener.cacheHit(call, it)
          }
    }

    if (cacheResponse != null) {
      listener.cacheConditionalHit(call, cacheResponse)
    } else if (cache != null) {
      listener.cacheMiss(call)
    }

    var networkResponse: Response? = null
    try {
      // Call the next interceptor to get the Response from the network
      networkResponse = chain.proceed(networkRequest)
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        cacheCandidate.body?.closeQuietly()
      }
    }

    // If we have a cache response too, then we're doing a conditional get.
    // A cached response exists, so compare it with networkResponse to decide whether to refresh the cached entry
    if (cacheResponse != null) {
      if (networkResponse?.code == HTTP_NOT_MODIFIED) {
        val response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers, networkResponse.headers))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()

        networkResponse.body!!.close()

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache!!.trackConditionalCacheHit()
        cache.update(cacheResponse, response)
        return response.also {
          listener.cacheHit(call, it)
        }
      } else {
        cacheResponse.body?.closeQuietly()
      }
    }

    val response = networkResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build()

    if (cache != null) {
      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        // Cache those uncached responses
        val cacheRequest = cache.put(response)
        return cacheWritingResponse(cacheRequest, response).also {
          if (cacheResponse != null) {
            // This will log a conditional cache miss only.
            listener.cacheMiss(call)
          }
        }
      }

      if (HttpMethod.invalidatesCache(networkRequest.method)) {
        try {
          cache.remove(networkRequest)
        } catch (_: IOException) {
          // The cache cannot be written.
        }
      }
    }

    return response
  }
  1. If a cache is configured, look up the cached candidate response (cacheCandidate) for the request.
  2. CacheStrategy.Factory computes the cache strategy, which yields the network request (networkRequest) and the cached response (cacheResponse) to use.
  3. Call the cache's synchronized trackResponse method, which records the outcome of the strategy in the cache's statistics.
  4. If the network may not be used and there is no usable cached response, return a 504 error response (HTTP_GATEWAY_TIMEOUT).
  5. If no network request is needed, build the Response directly from the cached response and return it.
  6. If there is a cached response (cacheResponse != null) and the network response code is 304 (networkResponse.code == HTTP_NOT_MODIFIED), the cached copy is still valid. Build a Response from cacheResponse, update the cache entry, and return it.
  7. Otherwise, if a cache is configured and the network response has a body and is cacheable, store it in the cache via the put method.

The cache interceptor decides whether a cached response is usable based on the request and the cached response. If one is usable, it is returned to the user; otherwise the chain of responsibility continues and the response is fetched from the server. Once that response arrives, it is written to the disk cache if it is cacheable.
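The branching in intercept() boils down to a small decision table over two inputs from the strategy plus the network status code. The sketch below only classifies the outcome; CacheOutcome and cacheOutcome are hypothetical names, and the real interceptor also builds responses, closes bodies, and writes to the cache.

```kotlin
// Hypothetical labels for the possible outcomes of the cache interceptor.
enum class CacheOutcome { UNSATISFIABLE_504, CACHE_HIT, CONDITIONAL_HIT, NETWORK }

/**
 * needsNetwork: the strategy produced a network request (networkRequest != null).
 * hasCached:    the strategy kept a cached response (cacheResponse != null).
 * networkCode:  status code returned by the network, or null if the network was not used.
 */
fun cacheOutcome(needsNetwork: Boolean, hasCached: Boolean, networkCode: Int?): CacheOutcome = when {
    // Network forbidden and nothing usable in the cache: synthesize a 504.
    !needsNetwork && !hasCached -> CacheOutcome.UNSATISFIABLE_504
    // Cache alone is enough.
    !needsNetwork -> CacheOutcome.CACHE_HIT
    // Conditional GET answered with 304 Not Modified: reuse the cached body.
    hasCached && networkCode == 304 -> CacheOutcome.CONDITIONAL_HIT
    // Everything else is served from the network response.
    else -> CacheOutcome.NETWORK
}
```

The order of the branches matters: the two "no network" cases are checked first, so the status code is only consulted when a network request was actually made.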

The connection interceptor: ConnectInterceptor

Next is ConnectInterceptor, which changed considerably in the new version. Its main job is to establish the connection.

object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
  }
}
  • The interceptor calls initExchange(), a RealCall method, copies the chain with the resulting Exchange, and calls proceed, which returns the response.
  1. The initExchange() method
 internal fun initExchange(chain: RealInterceptorChain): Exchange {
    synchronized(this) {
      check(expectMoreExchanges) { "released" }
      check(!responseBodyOpen)
      check(!requestBodyOpen)
    }

    val exchangeFinder = this.exchangeFinder!!
    val codec = exchangeFinder.find(client, chain)
    val result = Exchange(this, eventListener, exchangeFinder, codec)
    this.interceptorScopedExchange = result
    this.exchange = result
    synchronized(this) {
      this.requestBodyOpen = true
      this.responseBodyOpen = true
    }

    if (canceled) throw IOException("Canceled")
    return result
  }

As you can see, it returns an Exchange. Its constructor takes the exchangeFinder, which was created earlier by the RetryAndFollowUpInterceptor, and the codec returned by the finder's find method. Let's continue into that source.

  2. The ExchangeFinder.find() method
fun find(
   client: OkHttpClient,
   chain: RealInterceptorChain
 ): ExchangeCodec {
   try {
     val resultConnection = findHealthyConnection(
         connectTimeout = chain.connectTimeoutMillis,
         readTimeout = chain.readTimeoutMillis,
         writeTimeout = chain.writeTimeoutMillis,
         pingIntervalMillis = client.pingIntervalMillis,
         connectionRetryEnabled = client.retryOnConnectionFailure,
         doExtensiveHealthChecks = chain.request.method != "GET"
     )
     return resultConnection.newCodec(client, chain)
   } catch (e: RouteException) {
     trackFailure(e.lastConnectException)
     throw e
   } catch (e: IOException) {
     trackFailure(e)
     throw RouteException(e)
   }
 }
  • The find() method returns an ExchangeCodec, which encodes HTTP requests and decodes HTTP responses. The newCodec method of resultConnection checks whether the connection uses HTTP/2 and creates the matching codec.
  • In short, ExchangeCodec is an abstraction over HTTP protocol operations with two implementations, Http1ExchangeCodec and Http2ExchangeCodec, which correspond to HTTP/1.1 and HTTP/2. Connection pool reuse is also handled inside this method.
  • resultConnection itself is produced by findHealthyConnection(), so let's continue into that source.
  3. The findHealthyConnection() method
  @Throws(IOException::class)
  private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    while (true) {
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )

      // Confirm that the connection is good.
      if (candidate.isHealthy(doExtensiveHealthChecks)) {
        return candidate
      }

      // If it isn't, take it out of the pool.
      candidate.noNewExchanges()

      // Make sure we have some routes left to try. One example where we may exhaust all the routes
      // would happen if we made a new connection and it immediately is detected as unhealthy.
      if (nextRouteToTry != null) continue

      val routesLeft = routeSelection?.hasNext() ?: true
      if (routesLeft) continue

      val routesSelectionLeft = routeSelector?.hasNext() ?: true
      if (routesSelectionLeft) continue

      throw IOException("exhausted all routes")
    }
  }

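This method returns a RealConnection. It loops, calling findConnection() to obtain a candidate and returning the candidate as soon as it passes the health check. An unhealthy candidate is marked so that it accepts no new exchanges, and the loop tries again while routes remain.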
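The "keep trying until a healthy connection turns up" loop can be sketched with a plain iterator standing in for findConnection() and route selection. FakeConnection and findHealthy are hypothetical names, and the exhaustion check is simplified: here it happens before each attempt, whereas the real method checks the remaining routes after rejecting a candidate.

```kotlin
import java.io.IOException

// Hypothetical stand-in for RealConnection.
class FakeConnection(val id: Int, private val healthy: Boolean) {
    var noNewExchanges = false
        private set

    fun isHealthy(): Boolean = healthy

    fun markNoNewExchanges() {
        noNewExchanges = true
    }
}

// Keeps asking `candidates` for connections until a healthy one appears.
fun findHealthy(candidates: Iterator<FakeConnection>): FakeConnection {
    while (true) {
        if (!candidates.hasNext()) throw IOException("exhausted all routes")
        val candidate = candidates.next()
        if (candidate.isHealthy()) return candidate
        // Unhealthy: make sure it is never handed out again, then try the next one.
        candidate.markNoNewExchanges()
    }
}
```

Marking the rejected candidate matters as much as skipping it: without that step a pooled but broken connection could be offered again on the next request.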

  4. The findConnection() method
@Throws(IOException::class)
  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    if (call.isCanceled()) throw IOException("Canceled")

    // Attempt to reuse the connection from the call.
    // Try to use a connection that has already been allocated. The connection that has already been allocated may have been restricted
    val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
    if (callConnection != null) {
      var toClose: Socket? = null
      // If the connection can no longer create new exchanges, or it points at a different host
      // and port, release it and keep the returned Socket so that it can be closed.
      synchronized(callConnection) {
        if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
          toClose = call.releaseConnectionNoEvents()
        }
      }

      // If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
      // because we already acquired it.
      if (call.connection != null) {
        check(toClose == null)
        return callConnection
      }

      // The call's connection was released.
      // Close the connection
      toClose?.closeQuietly()
      eventListener.connectionReleased(call, callConnection)
    }

    // We need a new connection. Give it fresh stats.
    refusedStreamCount = 0
    connectionShutdownCount = 0
    otherFailureCount = 0

    // Attempt to get a connection from the pool.
    // Try to get a connection from the connection pool
    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
      val result = call.connection!!
      eventListener.connectionAcquired(call, result)
      return result
    }

    // Nothing in the pool. Figure out what route we'll try next.
    val routes: List<Route>?
    val route: Route
    if (nextRouteToTry != null) {
      // Use a route from a preceding coalesced connection.
      routes = null
      route = nextRouteToTry!!
      nextRouteToTry = null
    } else if (routeSelection != null && routeSelection!!.hasNext()) {
      // Use a route from an existing route selection.
      routes = null
      route = routeSelection!!.next()
    } else {
      // Compute a new route selection. This is a blocking operation!
      var localRouteSelector = routeSelector
      if (localRouteSelector == null) {
        localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
        this.routeSelector = localRouteSelector
      }
      val localRouteSelection = localRouteSelector.next()
      routeSelection = localRouteSelection
      routes = localRouteSelection.routes

      if (call.isCanceled()) throw IOException("Canceled")

      // Now that we have a set of IP addresses, make another attempt at getting a connection from
      // the pool. We have a better chance of matching thanks to connection coalescing.
      // Get a link from the connection pool based on a list of IP addresses
      if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
        val result = call.connection!!
        eventListener.connectionAcquired(call, result)
        return result
      }

      route = localRouteSelection.next()
    }

    // Connect. Tell the call about the connecting call so async cancels work.
    val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } finally {
      call.connectionToCancel = null
    }
    call.client.routeDatabase.connected(newConnection.route())

    // If we raced another call connecting to this host, coalesce the connections. This makes for 3
    // different lookups in the connection pool!

    // If another call connected to this host while we were connecting, coalesce onto its
    // connection. This is the third lookup in the connection pool
    if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
      val result = call.connection!!
      nextRouteToTry = route
      newConnection.socket().closeQuietly()
      eventListener.connectionAcquired(call, result)
      return result
    }

    synchronized(newConnection) {
      // Add the connection to the connection pool
      connectionPool.put(newConnection)
      call.acquireConnectionNoEvents(newConnection)
    }

    eventListener.connectionAcquired(call, newConnection)
    return newConnection
  }
  • With the comments in place, the overall picture is clear: the method gets a connection from the connection pool and returns it. The comments also mention three different lookups, so let's step into callAcquirePooledConnection() to see what it does
  1. The callAcquirePooledConnection() method
fun callAcquirePooledConnection(
    address: Address,
    call: RealCall,
    routes: List<Route>?,
    requireMultiplexed: Boolean
  ): Boolean {
    for (connection in connections) {
      synchronized(connection) {
        if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
        if (!connection.isEligible(address, routes)) return@synchronized
        call.acquireConnectionNoEvents(connection)
        return true
      }
    }
    return false
  }
  • Clearly, this iterates over all connections in the pool. It returns true as soon as it finds a usable one, or false if none is available
  • The main arguments are address (host and port), the RealCall instance, routes, and requireMultiplexed, which says whether a multiplexed connection is required (this applies to HTTP/2)
  1. Going back to findConnection(), let’s discuss the three different lookups it makes
// First lookup
connectionPool.callAcquirePooledConnection(address, call, null, false)
// Second lookup
connectionPool.callAcquirePooledConnection(address, call, routes, false)
// Third lookup
connectionPool.callAcquirePooledConnection(address, call, routes, true)

Let’s compare the difference between the three calls

  • The first call passes no routes and does not require multiplexing, so it accepts any pooled connection whose address matches, multiplexed or not
  • At first I thought the second call also just fetched a connection without requiring multiplexing, but it additionally passes in the route list. Inside isEligible (called from callAcquirePooledConnection), when the host does not match exactly, a connection that is not HTTP/2 returns false directly. So the route list only matters for HTTP/2 connections, and it has little to do with the requireMultiplexed flag. See the isEligible excerpt below:
// The code is too long. Here I have taken only a partial representation of the code
  // 1. This connection must be HTTP/2.
    if (http2Connection == null) return false
  // 2. The routes must share an IP address.
    if (routes == null || !routeMatchesAny(routes)) return false
  • The third call only accepts multiplexed connections (requireMultiplexed is true)
  1. Now look at when each of the three calls happens
  • The first call tries to get a connection straight away. What if it does not get one? As you can see, a set of routes is taken from localRouteSelector, and the second call tries again with those routes
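To make the difference between the three lookups concrete, here is a small model of the pool scan. It is only a sketch under simplifying assumptions: PooledConnection, isEligible and acquire are hypothetical stand-ins, a route is reduced to an IP string, and the other checks that the real isEligible performs (it is only partially quoted above) are left out.

```kotlin
// Hypothetical, simplified model of a pooled connection. Not OkHttp's RealConnection.
data class PooledConnection(
    val host: String,     // host the connection was opened for
    val ip: String,       // IP address it is actually connected to
    val isHttp2: Boolean  // in this model, HTTP/2 connections are the multiplexed ones
)

// Simplified eligibility rule modelled on the isEligible excerpt:
// an exact host match is accepted; otherwise the connection must be HTTP/2
// and one of the candidate route IPs must match (connection coalescing).
fun isEligible(c: PooledConnection, host: String, routeIps: List<String>?): Boolean {
    if (c.host == host) return true
    if (!c.isHttp2) return false
    if (routeIps == null || c.ip !in routeIps) return false
    return true
}

// Same shape as callAcquirePooledConnection: scan the pool, return the first match.
fun acquire(
    pool: List<PooledConnection>,
    host: String,
    routeIps: List<String>?,
    requireMultiplexed: Boolean
): PooledConnection? = pool.firstOrNull { c ->
    (!requireMultiplexed || c.isHttp2) && isEligible(c, host, routeIps)
}

fun main() {
    val pool = listOf(
        PooledConnection("a.example.com", "10.0.0.1", isHttp2 = true),
        PooledConnection("b.example.com", "10.0.0.2", isHttp2 = false)
    )
    // First lookup: no routes yet, so only an exact host match can succeed.
    println(acquire(pool, "img.example.com", null, false))
    // Second lookup: routes are known, so an HTTP/2 connection to the same IP can be shared.
    println(acquire(pool, "img.example.com", listOf("10.0.0.1"), false))
    // Third lookup: only multiplexed connections are accepted, even for an exact host match.
    println(acquire(pool, "b.example.com", listOf("10.0.0.2"), true))
}
```

In this model the first lookup finds nothing for img.example.com, the second one reuses the HTTP/2 connection to a.example.com because the IP matches, and the third one rejects the HTTP/1 connection to b.example.com because it is not multiplexed.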
  // Compute a new route selection. This is a blocking operation!
      var localRouteSelector = routeSelector
      if (localRouteSelector == null) {
        localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
        this.routeSelector = localRouteSelector
      }
      val localRouteSelection = localRouteSelector.next()
      routeSelection = localRouteSelection
      routes = localRouteSelection.routes
  • What if the second call does not get a connection either? At that point a new connection, newConnection, is created
   // Connect. Tell the call about the connecting call so async cancels work.
    val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } finally {
      call.connectionToCancel = null
    }
    call.client.routeDatabase.connected(newConnection.route())

Here I had a question: why look in the connection pool again when a connection has just been created? Others have raised the same question online and analysed it briefly. The reason is that if two requests to the same IP address start at the same time and each creates its own connection, there will be two connections, which wastes resources. So after a call has created its connection, it checks the pool once more. If another call has already put a multiplexed connection into the pool, the call uses that one, closes the socket it just opened, and saves its own route in nextRouteToTry so that the route can be tried later if the pooled connection turns out to be unusable
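The effect of that last lookup can be illustrated with a toy pool. This is a minimal single-threaded sketch: Conn, TinyPool and settle are hypothetical names, and real locking, sockets and routes are deliberately left out.

```kotlin
// Hypothetical model: a connection is identified by the call that created it.
data class Conn(val owner: String, val multiplexed: Boolean)

// A toy pool. Single-threaded on purpose; the real pool is shared between threads.
class TinyPool {
    private val connections = mutableListOf<Conn>()

    // Returns a pooled multiplexed connection if one exists, else null.
    fun acquireMultiplexed(): Conn? = connections.firstOrNull { it.multiplexed }

    fun put(c: Conn) {
        connections += c
    }

    fun size(): Int = connections.size
}

// What a call does right after it has finished connecting `fresh`:
// look in the pool once more; if another call got there first, use that
// connection and drop our own, otherwise pool ours.
fun settle(pool: TinyPool, fresh: Conn): Conn {
    val existing = pool.acquireMultiplexed()
    if (existing != null) {
        // In findConnection() this is where the fresh socket is closed and its
        // route is saved in nextRouteToTry.
        return existing
    }
    pool.put(fresh)
    return fresh
}

fun main() {
    val pool = TinyPool()
    val a = settle(pool, Conn("call-A", multiplexed = true))
    val b = settle(pool, Conn("call-B", multiplexed = true))
    println("A uses ${a.owner}, B uses ${b.owner}, pool size ${pool.size()}")
}
```

Call B ends up on call A's connection and the pool holds a single connection, which is the saving the extra lookup is meant to achieve.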

  1. A quick summary of the ConnectInterceptor interceptor

Basically, as I understand it, it finds an available connection and creates the corresponding codec

The server request interceptor CallServerInterceptor

The purpose of this interceptor is to send the actual network request and then read the response returned by the server. Take a brief look at the code

  if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
        exchange.flushRequest()
        responseBuilder = exchange.readResponseHeaders(expectContinue = true)
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
      if (responseBuilder == null) {
        if (requestBody.isDuplex()) {
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest()
          val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
          requestBody.writeTo(bufferedRequestBody)
        } else {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
          requestBody.writeTo(bufferedRequestBody)
          bufferedRequestBody.close()
        }
      } else {
        exchange.noRequestBody()
        if (!exchange.connection.isMultiplexed) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.
          exchange.noNewExchangesOnConnection()
        }
      }
    } else {
      exchange.noRequestBody()
    }
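  • At this point the request has been written out: the body is sent if the method permits one and one was supplied, unless an "Expect: 100-continue" expectation was not met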
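The branching in that snippet comes down to three outcomes for the request body. The following is a minimal sketch of the decision only: BodyAction and decide are hypothetical names, and the three booleans are assumptions standing in for the checks on the request, the Expect header and responseBuilder.

```kotlin
// The three outcomes the snippet above can reach for the request body.
enum class BodyAction { NO_BODY, SEND_BODY, SKIP_BODY }

// hasBody: the method permits a request body and one was supplied.
// expectContinue: the request carries "Expect: 100-continue".
// serverSentFinalResponse: the server answered with something other than
//   "100 Continue" (corresponds to responseBuilder != null in the snippet).
fun decide(
    hasBody: Boolean,
    expectContinue: Boolean,
    serverSentFinalResponse: Boolean
): BodyAction = when {
    !hasBody -> BodyAction.NO_BODY
    // The expectation was not met, so the body is never transmitted.
    expectContinue && serverSentFinalResponse -> BodyAction.SKIP_BODY
    else -> BodyAction.SEND_BODY
}

fun main() {
    println(decide(hasBody = false, expectContinue = false, serverSentFinalResponse = false))
    println(decide(hasBody = true, expectContinue = true, serverSentFinalResponse = false))
    println(decide(hasBody = true, expectContinue = true, serverSentFinalResponse = true))
}
```

In the SKIP_BODY case the original code additionally prevents an HTTP/1 connection from being reused, because the request body that the server may still be waiting for was never sent.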
 response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build()
  • This builds the response with its body opened, so that the response body can be read
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
    "close".equals(response.header("Connection"), ignoreCase = true)) {
      exchange.noNewExchangesOnConnection()
    }
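  • If either the request or the response carries "Connection: close", the connection is marked so that no new exchanges are created on it, which means it will not be reused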
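The same check can be tried in isolation. This is a small sketch that models headers as a plain map; header and mustNotReuse are hypothetical helpers written for this example, not OkHttp APIs.

```kotlin
// Headers modelled as a plain map. Header names are compared case-insensitively,
// as HTTP header names are. Hypothetical helper, not an OkHttp API.
fun header(headers: Map<String, String>, name: String): String? =
    headers.entries.firstOrNull { it.key.equals(name, ignoreCase = true) }?.value

// True when either side asked for the connection to be closed after this exchange.
fun mustNotReuse(
    requestHeaders: Map<String, String>,
    responseHeaders: Map<String, String>
): Boolean =
    "close".equals(header(requestHeaders, "Connection"), ignoreCase = true) ||
        "close".equals(header(responseHeaders, "Connection"), ignoreCase = true)

fun main() {
    println(mustNotReuse(mapOf("Connection" to "close"), emptyMap()))
    println(mustNotReuse(emptyMap(), mapOf("connection" to "Close")))
    println(mustNotReuse(mapOf("Connection" to "keep-alive"), emptyMap()))
}
```

Only the third case leaves the connection eligible for reuse, since neither side asked for it to be closed.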

In this way, the interceptor completes the process of sending the network request.

[Figure: general flow chart of the framework]

Closing remarks

To sum up: besides the overall request flow inside OkHttp, the cache, the connections and the interceptors are the most important parts. After this rough walk through the OkHttp source, everyone should have formed their own view. Corrections from more experienced readers are welcome wherever I got something wrong. I still have doubts in some places and hope to keep improving this