“Okhttp3 4.9.3 Simple parsing”

This article has participated in the activity of “New person creation Ceremony”, and started the road of digging gold creation together

First, write first

There are a lot of great posts about okHttp3 on the web, and every time I read them, I feel like I’m enlightened. But wait a few days to look back, or as strange as at the beginning, investigate its root cause, we are just enjoying the results of others follow the ideas of others cloud read source code again. Okhttp has been optimized and updated from earlier Java versions to Kotlin versions, with implementation details tweaked. Rereading the source code together with your own thinking can give you a deep understanding of how OKHTTP is implemented.

Two, from the basic use
  • Introducing dependencies into the project,Githubaddressokhttp
dependencies {
  / /...
  implementation("Com. Squareup. Okhttp3: okhttp: 4.9.3." ")}Copy the code
  • From a simple sync(execute)Start with the request to analyze the overall loading process:
private fun clientNetWork(a) {
     val request: Request = Request.Builder()
         .url("https://www.baidu.com")
         .build()
     OkHttpClient().newCall(request).execute()
}
Copy the code

Execute () is an interface method (Call). Call is a request that needs to be executed, and the parameters for the request have been prepared. Of course, since it is a request, it can be cancelled. The second represents a single request and response flow and therefore cannot be executed again.

A call is a request that has been prepared for execution. A call can be canceled. As this object represents a single request/response pair (stream), it cannot be executed twice.

Call interface specific code implementation, focusing on synchronous execution method execute() and asynchronous request enqueue() :

interface Call : Cloneable {
  fun request(a): Request
  // Synchronize the request
  @Throws(IOException::class)
  fun execute(a): Response
  // Asynchronous request
  fun enqueue(responseCallback: Callback)
  
  fun cancel(a)
  
  fun isExecuted(a): Boolean
  
  fun isCanceled(a): Boolean
  
  fun interface Factory {
   fun newCall(request: Request): Call
  }
}
Copy the code

Execute () is overridden by RealCall. Execute () is overridden by RealCall.

  • RealCall– Is a bridge between the application and the networkexecute()Method implementation:
override fun execute(a): Response {
  check(executed.compareAndSet(false.true)) { "Already Executed" } / / # 1
  timeout.enter()/ / # 2
  callStart()/ / # 3
  try {
    client.dispatcher.executed(this)/ / # 4
    return getResponseWithInterceptorChain()/ / # 5
  } finally {
    client.dispatcher.finished(this)/ / # 6}}Copy the code

Code is very small, step by step analysis, first of all, the synchronous request was checked – to determine whether the request has been executed; Here, atomic CAS optimistic lock under concurrent packet is used. CAS comparison algorithm is also used to improve efficiency. The second is the judgment of the timeout time, which is relatively simple. Look at the implementation of callStart(). The code:

  private fun callStart(a) {
    this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
    eventListener.callStart(this)}Copy the code

Look at the name guess should be something like event listening, possibly including some information to record and print. Back in the RealCall class, let’s see what the eventListener does:

internal val eventListener: EventListener = client.eventListenerFactory.create(this)
Copy the code

The EventListener is an abstract class, and the only implementation class in the project is LoggingEventListener.

abstract class EventListener {
    open fun callStart(
    call: Call
  ){}// Other methods are omitted
}
Copy the code

Implement a concrete implementation of this method in the class LoggingEventListener:

class LoggingEventListener private constructor(
  private val logger: HttpLoggingInterceptor.Logger
) : EventListener() {
    override fun callStart(call: Call) {
    startNs = System.nanoTime()
    logWithTime("callStart: ${call.request()}")}}Copy the code

To summarize the purpose of callStart(), when a synchronous or asynchronous request is added to the queue, callStart() is executed immediately (without reaching the thread limit) to record the time the request was started and some information about the request. As follows:

  override fun callStart(call: Call) {
    startNs = System.nanoTime()
    logWithTime("callStart: ${call.request()}")}private fun logWithTime(message: String) {
    val timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
    logger.log("[$timeMs ms] $message")}Copy the code

Friends’ stuff in the fourth paragraph # 4 client. The dispatcher. Executed (this), it seems that is open to perform here, but actually it is really so? Go back to the OkHttpClient class and see what the dispatcher really is.

@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher
Copy the code

Concrete implementation class Dispatcher code (save important code) :

class Dispatcher constructor() {
  /** Ready async calls in the order they'll be run. */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()
  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()
  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningSyncCalls = ArrayDeque<RealCall>()

  constructor(executorService: ExecutorService) : this() {
    this.executorServiceOrNull = executorService
  }
    @get:Synchronized var maxRequests = 64
    set(maxRequests) {
      require(maxRequests >= 1) { "max < 1: $maxRequests" }
      synchronized(this) {
        field = maxRequests
      }
      promoteAndExecute()
    }
    @get:Synchronized var maxRequestsPerHost = 5
    set(maxRequestsPerHost) {
      require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
      synchronized(this) {
        field = maxRequestsPerHost
      }
      promoteAndExecute()
    }
  	// Asynchronous request
    internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)
      //Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to the same host.
      if(! call.call.forWebSocket) {val existingCall = findExistingCallWithHost(call.host)
        if(existingCall ! =null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }
  // Synchronize the request
  /** Used by [Call.execute] to signal it is in-flight. */
  @Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
  }
    private fun promoteAndExecute(a): Boolean {
    this.assertThreadDoesntHoldLock()
    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()
        //Max capacity.
        if (runningAsyncCalls.size >= this.maxRequests) break 
        //Host max capacity.
        if (asyncCall.callsPerHost.get() > =this.maxRequestsPerHost) continue 
        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }
    return isRunning
  }
}
Copy the code

The Dispatcher is the policy that handles the execution of asynchronous requests, although developers can implement their own policies.

Policy on when async requests are executed. Each dispatcher uses an ExecutorService to run calls internally. If you supply your own executor, it should be able to run the configured maximum number of calls concurrently.

Know the function of the Dispatcher, back to the client. The Dispatcher. Executed (this), namely:

  /** Used by [Call.execute] to signal it is in-flight. */
  @Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
  }
Copy the code

Combine execute() with Dispatcher analysis

  • DispatcherMainly for asynchronous request scheduling function, internal implementation of the thread pool, the request distribution execution.
  • Consists of three asynchronous queues with two ends respectively waitingreadyAsyncCalls, the queue in executionrunningAsyncCalls, the synchronization queue in executionrunningSyncCalls. Used hereArrayDequeWhy notLinkedListIf you’re interested, think about it.
  • It is executed internally when the request is synchronousexecute()willRealCallAdded to therunningSyncCallsIn the queue.

Execute () : execute() : execute() : execute() : execute() : execute() : execute() : execute() : execute() : execute()) : execute() : execute() : execute() : execute()) : execute() : execute() : execute()) : execute() : execute() : execute()) : execute() : execute() : execute()) : execute() : execute();

Invokes the request immediately, and blocks until the response can be processed or is in error.

To step # 4 after execution, return getResponseWithInterceptorChain () / / # 5 the core of this method is the request to advance step by step. It is also the core module of the OKHTTP network request responsibility chain.

Three, the chain of open – getResponseWithInterceptorChain ()
  • GetResponseWithInterceptorChain () implementation
  @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(a): Response {
    //Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if(! forWebSocket) { interceptors += client.networkInterceptors } interceptors += CallServerInterceptor(forWebSocket)val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )
    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")}return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if(! calledNoMoreExchanges) { noMoreExchanges(null)}}}Copy the code

Analysis getResponseWithInterceptorChain () method is necessary to look before OkHttpClient structure parameters, using the Builder pattern, many parameters, can be configured to change a lot of things, refine focuses on several parameters:

class Builder constructor() {
  // Asynchronously request the task scheduler
  internal var dispatcher: Dispatcher = Dispatcher()
  / / link pool
  internal var connectionPool: ConnectionPool = ConnectionPool()
  // Custom interceptor
  internal val interceptors: MutableList<Interceptor> = mutableListOf()
  // Custom network interceptor
  internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
  / / cache
  internal var cache: Cache? = null
  / / agent
  internal var proxy: Proxy? = null
  // Link protocol
  internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
  / /...
}
// Add a custom interceptor method
fun addInterceptor(interceptor: Interceptor) = apply {
  interceptors += interceptor
}
// Add a custom network interceptor method
fun addNetworkInterceptor(interceptor: Interceptor) = apply {
  networkInterceptors += interceptor
}
Copy the code

What’s the difference between adding a custom interceptor and adding a custom web interceptor? The method seems similar, but check the official instructions to find some details. The documentation explains the subtle differences between Application Interceptor and Network Interceptors. Back to the first RealCall view getResponseWithInterceptorChain () is how to blocker combination of assembly:

internal fun getResponseWithInterceptorChain(a): Response {
  // Build a full stack of interceptors.
  val interceptors = mutableListOf<Interceptor>()
  interceptors += client.interceptors / / # 1
  interceptors += RetryAndFollowUpInterceptor(client)
  interceptors += BridgeInterceptor(client.cookieJar)
  interceptors += CacheInterceptor(client.cache)
  interceptors += ConnectInterceptor
  if(! forWebSocket) { interceptors += client.networkInterceptors/ / # 2
  }
  interceptors += CallServerInterceptor(forWebSocket)
}
Copy the code

See #1 and #2 for the location of the custom interceptor and the custom network interceptor, respectively. The custom interceptor is the header of the interceptor chain. The custom network interceptor is added between the ConnectInterceptor and CallServerInterceptor interceptors. To sum up:

  • Custom interceptors do some pre-request work at the head of the interceptor chain, including some Log print-outs, such as the LoggingInterceptor.

  • A custom network interceptor is added between the ConnectInterceptor and CallServerInterceptor. It can be executed twice and gets more information, including the original url and the redirected url.

  • Application interceptors

Don’t need to worry about intermediate responses like redirects and retries.

Are always invoked once, even if the HTTP response is served from the cache.

Observe the application’s original intent. Unconcerned with OkHttp-injected headers like If-None-Match.

Permitted to short-circuit and not call Chain.proceed().

Permitted to retry and make multiple calls to Chain.proceed().

Can adjust Call timeouts using withConnectTimeout, withReadTimeout, withWriteTimeout.

  • Network Interceptors

Able to operate on intermediate responses like redirects and retries.

Not invoked for cached responses that short-circuit the network.

Observe the data just as it will be transmitted over the network.

Access to the Connection that carries the request.

In summary, the sequence structure of the whole chain can be obtained. If both the custom interceptor and the custom network interceptor are included, For custom interceptors – > RetryAndFollowUpInterceptor – > BridgeInterceptor – > CacheInterceptor – > ConnectInterceptor – > custom web blocker – > CallServerInte Rceptor; So how does the chain work in order? Okhttp design is more subtle, here in constructing RealInterceptorChain objects into the index information, the index is recorded the location of a single chain of interceptors, while RealInterceptorChain. Proceed (request: Request) increments index++ step by step all the way to the end of the chain.

val chain = RealInterceptorChain(
  call = this,
  interceptors = interceptors,
  // Record the interceptor's index pass pair
  index = 0,
  exchange = null,
  request = originalRequest,
  connectTimeoutMillis = client.connectTimeoutMillis,
  readTimeoutMillis = client.readTimeoutMillis,
  writeTimeoutMillis = client.writeTimeoutMillis
)
val response = chain.proceed(originalRequest)
Copy the code
Iv. Proceed chain advance
  • RealInterceptorChainIn the wayproceed()The concrete implementation of the method
@Throws(IOException::class)
override fun proceed(request: Request): Response {
  check(index < interceptors.size)
  calls++
  if(exchange ! =null) {
   check(exchange.finder.sameHostAndPort(request.url)) {
    "network interceptor ${interceptors[index - 1]} must retain the same host and port"
   }
   check(calls == 1) {
    "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"}}// Call the next interceptor in the chain.
  val next = copy(index = index + 1, request = request)
  val interceptor = interceptors[index]
  @Suppress("USELESS_ELVIS")
  val response = interceptor.intercept(next) ?: throw NullPointerException(
    "interceptor $interceptor returned null")
  / /...
  return response
}
Copy the code

Simple analysis of the process:

1. The constructor parameter of the RealInterceptorChain contains information about index. The increment of index++ is continuously executed using the PROCEED method.

2. The Interceptor implements the Interceptor interface, where Fun Proceed (request: Request): Response ensures chained links. Of course, the order of interceptors is arranged according to certain rules, and analyzed one by one.

Five, RetryAndFollowUpInterceptor retry and redirection
companion object {
  private const val MAX_FOLLOW_UPS = 20
}
override fun intercept(chain: Interceptor.Chain): Response {
  val realChain = chain as RealInterceptorChain
  var request = chain.request
  val call = realChain.call
  var followUpCount = 0
  var priorResponse: Response? = null
  var newExchangeFinder = true
  var recoveredFailures = listOf<IOException>()
	while (true) {
    //....
    try {
      response = realChain.proceed(request)
      newExchangeFinder = true}}//....
}
Copy the code

1. The retry interceptor specifies a default of 20 retries

2. Proceed (request) with response = realchain.proceed (request) as the cut-off point, including other interceptors, all work done before the chain of responsibility is passed, and then the request is sent to the next interceptor.

3. Response = realchain.proceed (request) Each interceptor is tossed along the chain of responsibility up to the result obtained by the last interceptor to process the response and do some post-processing.

4. Of course, not every request goes through the entire chain. For example, CacheInterceptor when a cached response is received by a CacheInterceptor, subsequent interceptors will not pass the response.

Six, BridgeInterceptor
override fun intercept(chain: Interceptor.Chain): Response {
  val userRequest = chain.request()
  val requestBuilder = userRequest.newBuilder()
  val body = userRequest.body
  if(body ! =null) {
    val contentType = body.contentType()
    if(contentType ! =null) {
      requestBuilder.header("Content-Type", contentType.toString())
    }
    val contentLength = body.contentLength()
    if(contentLength ! = -1L) {
      requestBuilder.header("Content-Length", contentLength.toString())
      requestBuilder.removeHeader("Transfer-Encoding")}else {
      requestBuilder.header("Transfer-Encoding"."chunked")
      requestBuilder.removeHeader("Content-Length")}}/ /...
  // The upper part is divided into the preceding operation
  val networkResponse = chain.proceed(requestBuilder.build())
  // The next part of the code is to get the response
  cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
}
Copy the code

1. The bridge interceptor mainly supplements the information of the requested Hader, including the content length, etc.

2. Pass the request to the next chain and wait for the response message returned.

3. Subsequent operations include Cookie, Gzip compressed information, and user-Agent information supplement.

Seven, CacheInterceptor
override fun intercept(chain: Interceptor.Chain): Response {
  val call = chain.call()
  valcacheCandidate = cache? .get(chain.request())
  val now = System.currentTimeMillis()
  val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
  val networkRequest = strategy.networkRequest
  val cacheResponse = strategy.cacheResponse
  / /...
  var networkResponse: Response? = null
  try {
    networkResponse = chain.proceed(networkRequest)
  } finally {
  // If we're crashing on I/O or otherwise, don't leak the cache body.
  if (networkResponse == null&& cacheCandidate ! =null) { cacheCandidate.body? .closeQuietly() } }/ /...
}
Copy the code
  • OkHttpClientCaching is not enabled in the default construct of theinternal var cache: Cache? = null, which needs to be added manually, but has a default implementation based on the least recently used algorithmDiskLruCacheDisk cache:
private val client: OkHttpClient = OkHttpClient.Builder()
  .cache(Cache(
    directory = File(application.cacheDir, "http_cache"),
    // $0.05 worth of phone storage in 2020
    maxSize = 50L * 1024L * 1024L // 50 MiB
  ))
  .build()
Copy the code

1. Cache interceptor is not enabled by default, you need to specify the cache directory when calling, internal disk cache based on DiskLruCache.

2. When the cache is enabled and the cache is hit, the call of the chain will not continue to be passed down (at this point, the response has been obtained) and the subsequent operation will be directly carried out.

3. If it does not match, it will continue to the next chain, which is ConnectInterceptor.

Eight, ConnectInterceptor
/** * Opens a connection to the target server and proceeds to the next interceptor. * The network might be used for the returned response, * or to validate a cached response with a conditional GET. */
object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
  }
}
Copy the code
  • ConnectInterceptorThe amount of code is small and the main function is clear.

1. Establish a TCP or TCP-TLS connection with the target server.

2. Unlike the previous interceptor, the preorder operation is based on the method realchain.proceed (), but the ConnectInterceptor has no postorder operation, which is delivered to the next interceptor.

Nine, CallServerInterceptor
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  val realChain = chain as RealInterceptorChain
  val exchange = realChain.exchange!!
  val request = realChain.request
  val requestBody = request.body
  val sentRequestMillis = System.currentTimeMillis()
  exchange.writeRequestHeaders(request)
  var invokeStartEvent = true
  var responseBuilder: Response.Builder? = null
  if(HttpMethod.permitsRequestBody(request.method) && requestBody ! =null) {
    if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
      exchange.flushRequest()
      responseBuilder = exchange.readResponseHeaders(expectContinue = true)
      exchange.responseHeadersStart()
      invokeStartEvent = false
     }
     if (responseBuilder == null) {
       if (requestBody.isDuplex()) {
         exchange.flushRequest()
         val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
         requestBody.writeTo(bufferedRequestBody)
        } else {
         val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
         requestBody.writeTo(bufferedRequestBody)
         bufferedRequestBody.close()
        }
      } else {
        exchange.noRequestBody()
        if(! exchange.connection.isMultiplexed) { exchange.noNewExchangesOnConnection() } } }else {
      exchange.noRequestBody()
    }
    if (requestBody == null| |! requestBody.isDuplex()) { exchange.finishRequest() }if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!!!!if (invokeStartEvent) {
        exchange.responseHeadersStart()
        invokeStartEvent = false}}var response = responseBuilder
        .request(request)
        .handshake(exchange.connection.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
    var code = response.code
    if (code == 100) {
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!!!!if (invokeStartEvent) {
        exchange.responseHeadersStart()
      }
      response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      code = response.code
    }
    exchange.responseHeadersEnd(response)
    response = if (forWebSocket && code == 101) {
    response.newBuilder()
          .body(EMPTY_RESPONSE)
          .build()
    } else {
      response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build()
    }
    if ("close".equals(response.request.header("Connection"), ignoreCase = true) | |"close".equals(response.header("Connection"), ignoreCase = true)) {
      exchange.noNewExchangesOnConnection()
    }
    if ((code == 204 || code == 205) && response.body? .contentLength() ? : -1L > 0L) {
      throw ProtocolException(
          "HTTP $code had non-zero Content-Length: ${response.body? .contentLength()}")}return response
  }
}
Copy the code
  • CallServerInterceptorIs the tail of the whole chain of responsibility:

1. Essentially, request and I/O operations are performed to write the requested data to the Socket.

2. The TCP/TCP-TLS port that reads the response data from the Socket is based on OKIO for I/O operations. The efficient okHTTP request also depends on OKIO support.

3. The reponse returns the data to the previous interceptor that contains subsequent operations, except for the ConnectInterceptor, which has no subsequent operations.

The interceptor flowchart is as follows:

Ten, ReaclCall. The execute ()
  • Looking back at the method of synchronizing requests:
  override fun execute(a): Response {
    check(executed.compareAndSet(false.true)) { "Already Executed" }
    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)}}Copy the code
  • inDispatcherThe request for synchronization in theRealCallAdd the instance torunningSyncCalls.add(call)Queue in execution.
  • RealCallisCallIs the only implementation class ofexecute()The method is clearly defined,Executed immediatelyandblockinguntilresponseThe return of the result orerrorMessage interrupt block.
  • After * * getResponseWithInterceptorChain ()The chain advance has now got the result, so what is the subsequent operation? Take a firstJavabasisTry {}finally{}** structure, anddispatcher.finished(call: RealCall).
try{
  client.dispatcher.executed(this)
  return getResponseWithInterceptorChain()
} finally{
  client.dispatcher.finished(this)}/** Used by [Call.execute] to signal completion. */
internal fun finished(call: RealCall) {
  finished(runningSyncCalls, call)
}
Copy the code

1. Barring extreme cases, system.exit () or otherwise, the finally block must execute, whether or not an exception occurs, and whether or not a return precedes it.

2. A finally block is always executed before a return, whether or not a return is included ina try block.

3. If a finally block has a return, then returns in try and catch blocks have no chance to be executed.

Return to the execute() method. Dispatcher.finished (this) is executed before the response result is returned. Look at the finished() implementation.

  /** Used by [Call.execute] to signal completion. */
  internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
  }
  private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      / / # 1
      if(! calls.remove(call))throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }
    / / # 2
    val isRunning = promoteAndExecute()
    if(! isRunning && idleCallback ! =null) {
      idleCallback.run()
    }
  }

  private fun promoteAndExecute(a): Boolean {
    this.assertThreadDoesntHoldLock()
    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      / / # 3
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()
				// Max capacity.
        if (runningAsyncCalls.size >= this.maxRequests) break 
        // Host max capacity.
        if (asyncCall.callsPerHost.get() > =this.maxRequestsPerHost) continue 
        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      / / # 4
      isRunning = runningCallsCount() > 0
    }
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }
    / / # 5
    return isRunning
  }
	
 @Synchronized fun runningCallsCount(a): Int = runningAsyncCalls.size + runningSyncCalls.size

Copy the code

Call. Remove (call) returns true, i.e. the synchronization request is removed from runningSyncCalls. So idleCallback is empty.

#4 asyncCall (asyncCall) asyncCall (asyncCall) asyncCall (asyncCall) The result of isRunning is true. Idlecallback.run () will not be executed, and idleCallback is null.

  • At this point, the synchronization request built in the original example is completed, blocking the result callback for page presentation or other operations.
  • The whole process is as follows:

1. Start the synchronization request task from **OkHttpClient().newCall(request).execute()**.

2. The resulting RealCall object is the only implementation class of Call, in which the synchronous method execute() blocks immediately until a result is returned or an error interrupts the block.

3. The synchronous execute() request method in RealCall is executed and the asynchronous task Dispatcher in OkHttpClient instance adds the requested instance RealCall to the two-end queue runningSyncCalls.

4. Through the method of RealCall getResponseWithInterceptorChain (request) open the interceptor chain of responsibility, will ask one by one, and the index and held by operation, Second, except for ConnectInterceptor and CallServerInterceptor, all default interceptors have pre-order and post-order operations with chain.proceed(request) as the demarcation point. After receiving response, the operations are processed sequentially and then sequentially.

Before 5. Eventually returns the response to the ongoing sync task done to remove the operation of the queue is finally in the client. The dispatcher. The finished (this) method, finally the result the response returned to the client, so the whole process of synchronization request is over.

Enqueue ()
  • The general flow of an asynchronous request is similar to that of a synchronous request, but with the addition of a transition from wait queue to execution queue.
   internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
    / / # 1
    readyAsyncCalls.add(call)
   //Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
   //the same host.
      if(! call.call.forWebSocket) {val existingCall = findExistingCallWithHost(call.host)
        if(existingCall ! =null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    / / # 2
    promoteAndExecute()
  }

 private fun promoteAndExecute(a): Boolean {
    this.assertThreadDoesntHoldLock()
    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        / / # 3
        val asyncCall = i.next()
				// Max capacity.
        if (runningAsyncCalls.size >= this.maxRequests) break 
        // Host max capacity.
        if (asyncCall.callsPerHost.get() > =this.maxRequestsPerHost) continue 
        / / # 4
        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        / / # 5
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }
    return isRunning
  }
Copy the code
  • The #1 method adds asynchronous request tasks to the readyAsyncCalls wait queue, and #2 enables execution tasks (of course Dispatcher maintains a thread pool to handle these requests).
  • In promoteAndExecute(), #3 removes the asynchronous task from the waiting queue readyAsyncCalls and #5 adds it to the executing asynchronous task queue runningAsyncCalls until all requests have been executed.
Twelve, summary
  • This is only a brief analysis of the general flow, ignoring some other implementation details, such as request cache pool, request protocol. Includes implementation details of local cache DiskLruCache, and some threshold judgment processing for asynchronous requests. After all, from Java to the current Kotlin version of OKHTTP has experienced a lot of iteration and more mature, of course, the network request this position is still unshakable 🐶.

Xiii. Documentation

Github

Square