OkHttp request flow: source code analysis (OkHttp 4.9.1)
I previously started a source code analysis of OkHttp 3 (never finished), which was implemented entirely in Java. From OkHttp 4 onward the library is written in Kotlin. The overall flow is roughly the same, but some details have changed, so let's see what is different.
Simple use of synchronous and asynchronous requests (using GET requests as an example)
// The following are GET requests
// Synchronous request
private fun executeTest() {
  // 1. Create an OkHttpClient
  val okHttpClient = OkHttpClient()
  // 2. Create a Request object
  val request = Request.Builder()
    .url(BASE_URL)
    .build()
  // 3. Execute the call; execute() blocks and returns the Response
  okHttpClient.newCall(request).execute()
}

// Asynchronous request
private fun asyncTest() {
  // 1. Create an OkHttpClient as well
  val okHttpClient = OkHttpClient()
  // 2. Create a Request object
  val request = Request.Builder()
    .url(BASE_URL)
    .build()
  // 3. Call enqueue(); the result is delivered to the callback
  okHttpClient.newCall(request).enqueue(object : Callback {
    // Called when the request fails
    override fun onFailure(call: Call, e: IOException) {}

    // Called when a response is received
    override fun onResponse(call: Call, response: Response) {}
  })
}
Creating the OkHttpClient
val okHttpClient = OkHttpClient()
constructor() : this(Builder())
As you can see, OkHttpClient is created with the Builder pattern. The Builder pattern is generally recommended when an object takes many parameters. The Builder's default configuration below shows how many parameters are involved
// Parameters configured by default
class Builder constructor() {
  // Dispatcher (core)
  internal var dispatcher: Dispatcher = Dispatcher()
  // Connection pool
  internal var connectionPool: ConnectionPool = ConnectionPool()
  // Application interceptors (core)
  internal val interceptors: MutableList<Interceptor> = mutableListOf()
  // Network interceptors
  internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
  internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
  // Retry when a connection fails
  internal var retryOnConnectionFailure = true
  internal var authenticator: Authenticator = Authenticator.NONE
  // Follow redirects; defaults to true
  internal var followRedirects = true
  // Follow redirects between HTTP and HTTPS
  internal var followSslRedirects = true
  internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
  internal var cache: Cache? = null
  internal var dns: Dns = Dns.SYSTEM
  internal var proxy: Proxy? = null
  internal var proxySelector: ProxySelector? = null
  // Proxy authentication
  internal var proxyAuthenticator: Authenticator = Authenticator.NONE
  internal var socketFactory: SocketFactory = SocketFactory.getDefault()
  // Socket factory for HTTPS
  internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
  internal var x509TrustManagerOrNull: X509TrustManager? = null
  // TLS versions and cipher suites, and application protocols (defaults are used here)
  internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
  internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
  // Verifies that the certificate in the response applies to the host name of the HTTPS connection
  internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
  // Certificate pinning: a CertificatePinner constrains which certificates are trusted
  internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
  internal var certificateChainCleaner: CertificateChainCleaner? = null
  // Timeouts and ping interval in milliseconds; 0 disables the timeout or the pings
  internal var callTimeout = 0
  internal var connectTimeout = 10_000
  internal var readTimeout = 10_000
  internal var writeTimeout = 10_000
  internal var pingInterval = 0
  internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
  internal var routeDatabase: RouteDatabase? = null

  // A Builder can also be initialized from an existing client's configuration
  internal constructor(okHttpClient: OkHttpClient) : this() {
    ... // Copies the same fields as above from okHttpClient
  }
}
Synchronous request flow
okHttpClient.newCall(request).execute()
newCall() returns a Call, which is only an interface; RealCall is the class that actually executes the request
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
Implementation of the execute() method in RealCall
override fun execute(): Response {
  check(executed.compareAndSet(false, true)) { "Already Executed" }
  timeout.enter()
  callStart()
  try {
    client.dispatcher.executed(this)
    return getResponseWithInterceptorChain()
  } finally {
    client.dispatcher.finished(this)
  }
}
- check() verifies that the call has not been executed before. If it has, an IllegalStateException with the message "Already Executed" is thrown, so each call can be executed only once
- Inside the try block, client.dispatcher.executed(this) tells the dispatcher that the call is now running
- getResponseWithInterceptorChain() passes the request through a series of interceptors, which process the request and the response, and returns the final result
- The finally block always notifies the dispatcher that the call has completed by calling the Dispatcher's finished() method
private fun <T> finished(calls: Deque<T>, call: T) {
  val idleCallback: Runnable?
  synchronized(this) {
    if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
    idleCallback = this.idleCallback
  }
  val isRunning = promoteAndExecute()
  if (!isRunning && idleCallback != null) {
    idleCallback.run()
  }
}
finished() removes the call from its queue and then calls promoteAndExecute(). If no calls are running any more and an idleCallback has been set, the idleCallback's run() method is executed
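The execute-once guard and the try/finally bookkeeping described in this section can be sketched in isolation. This is a minimal illustration, not OkHttp's code: TinyDispatcher and OneShotCall are hypothetical names, and the "work" lambda stands in for getResponseWithInterceptorChain().

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the dispatcher: it only tracks running calls.
class TinyDispatcher {
    val running = mutableListOf<OneShotCall>()

    @Synchronized fun executed(call: OneShotCall) {
        running.add(call)
    }

    @Synchronized fun finished(call: OneShotCall) {
        if (!running.remove(call)) throw AssertionError("Call wasn't in-flight!")
    }
}

// Hypothetical call that may run only once, mirroring the shape of RealCall.execute().
class OneShotCall(private val dispatcher: TinyDispatcher, private val work: () -> String) {
    private val executed = AtomicBoolean(false)

    fun execute(): String {
        // compareAndSet succeeds only for the first caller; later callers fail the check.
        check(executed.compareAndSet(false, true)) { "Already Executed" }
        try {
            dispatcher.executed(this)
            return work()
        } finally {
            // Runs whether work() returned normally or threw.
            dispatcher.finished(this)
        }
    }
}
```

Calling execute() a second time on the same OneShotCall throws IllegalStateException before the dispatcher is touched, which is why the guard sits outside the try block.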
The getResponseWithInterceptorChain() method
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
  // Build a full stack of interceptors.
  val interceptors = mutableListOf<Interceptor>()
  // Application interceptors configured on the OkHttpClient
  interceptors += client.interceptors
  // Handles retries after failures and follow-ups such as redirects
  interceptors += RetryAndFollowUpInterceptor(client)
  // Bridges application code to network code: adds required request headers and post-processes the response
  interceptors += BridgeInterceptor(client.cookieJar)
  // Serves responses from the cache and writes responses to it
  interceptors += CacheInterceptor(client.cache)
  // Establishes the connection to the server
  interceptors += ConnectInterceptor
  if (!forWebSocket) {
    // Network interceptors configured on the OkHttpClient
    interceptors += client.networkInterceptors
  }
  // Sends the request to the server and reads the response from it
  interceptors += CallServerInterceptor(forWebSocket)
  val chain = RealInterceptorChain(
    call = this,
    interceptors = interceptors,
    index = 0,
    exchange = null,
    request = originalRequest,
    connectTimeoutMillis = client.connectTimeoutMillis,
    readTimeoutMillis = client.readTimeoutMillis,
    writeTimeoutMillis = client.writeTimeoutMillis
  )
  var calledNoMoreExchanges = false
  // Start the chain call (chain of responsibility pattern)
  try {
    val response = chain.proceed(originalRequest)
    if (isCanceled()) {
      response.closeQuietly()
      throw IOException("Canceled")
    }
    return response
  } catch (e: IOException) {
    calledNoMoreExchanges = true
    throw noMoreExchanges(e) as Throwable
  } finally {
    if (!calledNoMoreExchanges) {
      noMoreExchanges(null)
    }
  }
}
In summary, getResponseWithInterceptorChain() does two things. First, it builds the list of interceptors, adding them one by one in order. Second, it calls proceed() on a RealInterceptorChain, which uses the chain of responsibility pattern to run the chain and obtain the final response
Implementation of the proceed() method
@Throws(IOException::class)
override fun proceed(request: Request): Response {
  check(index < interceptors.size)
  calls++
  if (exchange != null) {
    check(exchange.finder.sameHostAndPort(request.url)) {
      "network interceptor ${interceptors[index - 1]} must retain the same host and port"
    }
    check(calls == 1) {
      "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
    }
  }
  // Call the next interceptor in the chain.
  // Create the RealInterceptorChain that points at the next interceptor
  val next = copy(index = index + 1, request = request)
  // Get the current interceptor
  val interceptor = interceptors[index]
  // Call the current interceptor, passing it the chain for the next interceptor
  @Suppress("USELESS_ELVIS")
  val response = interceptor.intercept(next) ?: throw NullPointerException(
    "interceptor $interceptor returned null")
  if (exchange != null) {
    check(index + 1 >= interceptors.size || next.calls == 1) {
      "network interceptor $interceptor must call proceed() exactly once"
    }
  }
  check(response.body != null) { "interceptor $interceptor returned a response with no body" }
  return response
}
proceed() creates the RealInterceptorChain for the next interceptor, gets the current interceptor, and calls the current interceptor's intercept() method with that next chain. Each interceptor in turn calls proceed() on the chain it receives, which is how the chain call is implemented; the outermost call finally returns the actual Response
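To make the index-advancing mechanism concrete, here is a minimal sketch of the same chain of responsibility shape. MiniInterceptor, MiniChain and runChain are hypothetical names, and plain strings stand in for OkHttp's Request and Response; the real classes carry far more state.

```kotlin
// Hypothetical, simplified types; strings stand in for Request and Response.
fun interface MiniInterceptor {
    fun intercept(chain: MiniChain): String
}

class MiniChain(
    private val interceptors: List<MiniInterceptor>,
    private val index: Int,
    val request: String
) {
    fun proceed(request: String): String {
        check(index < interceptors.size) { "no interceptor left to handle the request" }
        // The chain handed to the current interceptor points at the next one.
        val next = MiniChain(interceptors, index + 1, request)
        return interceptors[index].intercept(next)
    }
}

fun runChain(request: String): String {
    val interceptors = listOf(
        // Wraps the downstream result on the way back up.
        MiniInterceptor { chain -> "[" + chain.proceed(chain.request) + "]" },
        // Rewrites the request on the way down, much as a bridge adds headers.
        MiniInterceptor { chain -> chain.proceed(chain.request + "+headers") },
        // Last link: produces the response and never calls proceed().
        MiniInterceptor { chain -> "response to " + chain.request }
    )
    return MiniChain(interceptors, 0, request).proceed(request)
}
```

With these three interceptors, runChain("GET /") returns "[response to GET /+headers]": the request is modified on the way down and the response is wrapped on the way back.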
Asynchronous request flow
The first steps are the same as for a synchronous request. RealCall.enqueue() wraps the callback in an AsyncCall and hands it to the Dispatcher's enqueue() method, which is where we start
internal fun enqueue(call: AsyncCall) {
  synchronized(this) {
    readyAsyncCalls.add(call)
    // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
    // the same host.
    if (!call.call.forWebSocket) {
      val existingCall = findExistingCallWithHost(call.host)
      if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
    }
  }
  promoteAndExecute()
}
Here is a quick look at the three queues in the Dispatcher (ready asynchronous calls, running asynchronous calls, and running synchronous calls)
/** Ready async calls in the order they'll be run. */
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private val runningSyncCalls = ArrayDeque<RealCall>()
As you can see, enqueue() first adds the call to the ready asynchronous queue inside a synchronized block and then calls promoteAndExecute()
The promoteAndExecute() method
private fun promoteAndExecute(): Boolean {
  this.assertThreadDoesntHoldLock()
  val executableCalls = mutableListOf<AsyncCall>()
  val isRunning: Boolean
  synchronized(this) {
    val i = readyAsyncCalls.iterator()
    while (i.hasNext()) {
      val asyncCall = i.next()
      // A call is promoted to runningAsyncCalls only while the total number of running calls and
      // the number of calls to its host are both below their limits; otherwise it stays in readyAsyncCalls.
      if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
      if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
      i.remove()
      asyncCall.callsPerHost.incrementAndGet()
      executableCalls.add(asyncCall)
      runningAsyncCalls.add(asyncCall)
    }
    isRunning = runningCallsCount() > 0
  }
  // Execute the promoted calls on the thread pool
  for (i in 0 until executableCalls.size) {
    val asyncCall = executableCalls[i]
    asyncCall.executeOn(executorService)
  }
  return isRunning
}
In short, this method returns whether any calls (synchronous or asynchronous) are currently running. Along the way it walks the ready queue: as long as runningAsyncCalls is below maxRequests and the call's host is below maxRequestsPerHost, the call is moved into the running asynchronous queue and then executed on the thread pool; otherwise it stays in the ready queue
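The promotion rule can be tried out on its own with a small model. This is a sketch under simplifying assumptions: MiniDispatcher and PendingCall are hypothetical names, there is no locking and no thread pool, and the shared per-host counter is kept in a map rather than copied from an existing call.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for AsyncCall: just a host and a per-host counter.
class PendingCall(val host: String, val callsPerHost: AtomicInteger)

// Not thread-safe; the real Dispatcher guards this state with synchronized blocks.
class MiniDispatcher(private val maxRequests: Int, private val maxRequestsPerHost: Int) {
    val ready = ArrayList<PendingCall>()
    val running = ArrayList<PendingCall>()
    private val perHost = HashMap<String, AtomicInteger>()

    fun enqueue(host: String) {
        // Calls to the same host share one counter.
        ready.add(PendingCall(host, perHost.getOrPut(host) { AtomicInteger(0) }))
    }

    // Moves eligible calls from ready to running and returns the promoted calls.
    fun promote(): List<PendingCall> {
        val promoted = ArrayList<PendingCall>()
        val i = ready.iterator()
        while (i.hasNext()) {
            val call = i.next()
            if (running.size >= maxRequests) break // Global limit reached.
            if (call.callsPerHost.get() >= maxRequestsPerHost) continue // Host limit reached.
            i.remove()
            call.callsPerHost.incrementAndGet()
            promoted.add(call)
            running.add(call)
        }
        return promoted
    }
}
```

Note the difference between break and continue: once the global limit is hit nothing further can be promoted, but a call blocked only by its host limit is skipped so that calls to other hosts behind it still get a chance.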
Now look at AsyncCall
internal inner class AsyncCall(
  private val responseCallback: Callback
) : Runnable {
  @Volatile var callsPerHost = AtomicInteger(0)
    private set

  fun reuseCallsPerHostFrom(other: AsyncCall) {
    this.callsPerHost = other.callsPerHost
  }

  val host: String
    get() = originalRequest.url.host

  val request: Request
    get() = originalRequest

  val call: RealCall
    get() = this@RealCall

  /**
   * Attempt to enqueue this async call on [executorService]. This will attempt to clean up
   * if the executor has been shut down by reporting the call as failed.
   */
  // Submit this call to the thread pool
  fun executeOn(executorService: ExecutorService) {
    client.dispatcher.assertThreadDoesntHoldLock()
    var success = false
    try {
      executorService.execute(this)
      success = true
    } catch (e: RejectedExecutionException) {
      val ioException = InterruptedIOException("executor rejected")
      ioException.initCause(e)
      noMoreExchanges(ioException)
      responseCallback.onFailure(this@RealCall, ioException)
    } finally {
      if (!success) {
        client.dispatcher.finished(this) // This call is no longer running!
      }
    }
  }

  override fun run() {
    threadName("OkHttp ${redactedUrl()}") {
      var signalledCallback = false
      timeout.enter()
      try {
        // As with synchronous execution, this ends up calling getResponseWithInterceptorChain()
        val response = getResponseWithInterceptorChain()
        signalledCallback = true
        responseCallback.onResponse(this@RealCall, response)
      } catch (e: IOException) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
        } else {
          responseCallback.onFailure(this@RealCall, e)
        }
      } catch (t: Throwable) {
        cancel()
        if (!signalledCallback) {
          val canceledException = IOException("canceled due to $t")
          canceledException.addSuppressed(t)
          responseCallback.onFailure(this@RealCall, canceledException)
        }
        throw t
      } finally {
        client.dispatcher.finished(this)
      }
    }
  }
}
As you can see, AsyncCall implements Runnable so that the asynchronous task can run on the thread pool. Like a synchronous request, it ends up calling getResponseWithInterceptorChain()
Interceptor analysis
To sum up, the interceptor chain includes five interceptors that OkHttp adds by default: RetryAndFollowUpInterceptor, BridgeInterceptor, CacheInterceptor, ConnectInterceptor, and CallServerInterceptor
The retry and redirect interceptor: RetryAndFollowUpInterceptor
Let's walk through the main logic of RetryAndFollowUpInterceptor alongside the code
- First, get and initialize the related instances.
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  val realChain = chain as RealInterceptorChain
  var request = chain.request
  val call = realChain.call
  var followUpCount = 0
  var priorResponse: Response? = null
  var newExchangeFinder = true
  var recoveredFailures = listOf<IOException>()
  // Infinite loop; its body is shown in the next listing
  while (true) {
    ...
  }
}
- This obtains the RealInterceptorChain and the RealCall object and then enters an infinite loop. Let's look at what happens inside the loop
- Call the next interceptor and get its result. If an exception is thrown, recover() decides whether the request can be retried: if so, the loop restarts; otherwise the exception is rethrown.
// At the top of each loop iteration the call prepares its exchange finder,
// and the two variables used below are declared
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response
var closeActiveExchange = true
try {
  if (call.isCanceled()) {
    throw IOException("Canceled")
  }
  try {
    response = realChain.proceed(request)
    newExchangeFinder = true
  } catch (e: RouteException) {
    // The attempt to connect via a route failed. The request will not have been sent.
    if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
      throw e.firstConnectException.withSuppressed(recoveredFailures)
    } else {
      recoveredFailures += e.firstConnectException
    }
    newExchangeFinder = false
    continue
  } catch (e: IOException) {
    // An attempt to communicate with a server failed. The request may have been sent.
    if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
      throw e.withSuppressed(recoveredFailures)
    } else {
      recoveredFailures += e
    }
    newExchangeFinder = false
    continue
  }
  // Attach the prior response if it exists. Such responses never have a body.
  if (priorResponse != null) {
    response = response.newBuilder()
      .priorResponse(priorResponse.newBuilder()
        .body(null)
        .build())
      .build()
  }
  val exchange = call.interceptorScopedExchange
  val followUp = followUpRequest(response, exchange)
  if (followUp == null) {
    if (exchange != null && exchange.isDuplex) {
      call.timeoutEarlyExit()
    }
    closeActiveExchange = false
    return response
  }
  val followUpBody = followUp.body
  if (followUpBody != null && followUpBody.isOneShot()) {
    closeActiveExchange = false
    return response
  }
  response.body?.closeQuietly()
  if (++followUpCount > MAX_FOLLOW_UPS) {
    throw ProtocolException("Too many follow-up requests: $followUpCount")
  }
  request = followUp
  priorResponse = response
} finally {
  call.exitNetworkInterceptorExchange(closeActiveExchange)
}
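- This is where the result of the downstream interceptors is obtained. followUpRequest() then inspects the response to decide whether a follow-up request (for example a redirect) is needed
- If no follow-up is needed (followUp == null), or the follow-up body can only be sent once (isOneShot()), the response is returned as is
- Otherwise the response body is closed, and if the number of follow-ups exceeds MAX_FOLLOW_UPS = 20, a ProtocolException is thrown
- If the limit has not been exceeded, the follow-up request becomes the new request and the loop passes it to the downstream interceptors again
- In every case the finally block calls exitNetworkInterceptorExchange(), which releases the active exchange unless the response is being returned to the caller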
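The follow-up part of the loop can be sketched without any networking. This is an illustration only: MiniResponse and followRedirects are hypothetical names, the `send` parameter stands in for realChain.proceed(request), and the "3xx with a location" test is a deliberately crude substitute for followUpRequest(), which in OkHttp also handles authentication challenges and some retryable status codes.

```kotlin
import java.net.ProtocolException

// Same limit as the MAX_FOLLOW_UPS value quoted above.
val MAX_FOLLOW_UPS = 20

// Hypothetical response: a status code plus an optional redirect target.
data class MiniResponse(val code: Int, val location: String?)

// Follows redirects until a final response arrives or the limit is exceeded.
fun followRedirects(startUrl: String, send: (String) -> MiniResponse): MiniResponse {
    var url = startUrl
    var followUpCount = 0
    while (true) {
        val response = send(url)
        // Stand-in for followUpRequest(): null means no follow-up is needed.
        val followUp = if (response.code in 300..399) response.location else null
        if (followUp == null) return response
        if (++followUpCount > MAX_FOLLOW_UPS) {
            throw ProtocolException("Too many follow-up requests: $followUpCount")
        }
        url = followUp
    }
}
```

A server that keeps redirecting to itself is cut off after the limit is exceeded instead of looping forever, which is the purpose of the counter.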
The BridgeInterceptor
override fun intercept(chain: Interceptor.Chain): Response {
  val userRequest = chain.request()
  val requestBuilder = userRequest.newBuilder()
  val body = userRequest.body
  if (body != null) {
    val contentType = body.contentType()
    if (contentType != null) {
      requestBuilder.header("Content-Type", contentType.toString())
    }
    val contentLength = body.contentLength()
    if (contentLength != -1L) {
      requestBuilder.header("Content-Length", contentLength.toString())
      requestBuilder.removeHeader("Transfer-Encoding")
    } else {
      requestBuilder.header("Transfer-Encoding", "chunked")
      requestBuilder.removeHeader("Content-Length")
    }
  }
  if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", userRequest.url.toHostHeader())
  }
  if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive")
  }
  // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
  // the transfer stream.
  var transparentGzip = false
  if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true
    requestBuilder.header("Accept-Encoding", "gzip")
  }
  val cookies = cookieJar.loadForRequest(userRequest.url)
  if (cookies.isNotEmpty()) {
    requestBuilder.header("Cookie", cookieHeader(cookies))
  }
  if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", userAgent)
  }
  val networkResponse = chain.proceed(requestBuilder.build())
  cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
  val responseBuilder = networkResponse.newBuilder()
    .request(userRequest)
  if (transparentGzip &&
    "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
    networkResponse.promisesBody()) {
    val responseBody = networkResponse.body
    if (responseBody != null) {
      val gzipSource = GzipSource(responseBody.source())
      val strippedHeaders = networkResponse.headers.newBuilder()
        .removeAll("Content-Encoding")
        .removeAll("Content-Length")
        .build()
      responseBuilder.headers(strippedHeaders)
      val contentType = networkResponse.header("Content-Type")
      responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
    }
  }
  return responseBuilder.build()
}
- If the interceptor itself requested gzip and the response is gzip-encoded, the body is decompressed through GzipSource before the response is handed to the user.
- In short, the bridge interceptor converts a user-built request into a request that can be sent over the network, and converts the network response back into a response the user can consume
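The request half of that conversion is mostly a set of "fill in if missing" rules, which can be sketched over a plain map. This is an assumption-laden illustration: bridgeHeaders, its parameters, and the "mini-client" default are hypothetical, cookies are left out, and header names are matched case-sensitively here whereas real HTTP header names are case-insensitive.

```kotlin
// Fills in the headers a user request is missing, in the spirit of BridgeInterceptor.
// contentLength: null means no body, -1 means a body of unknown length.
fun bridgeHeaders(
    userHeaders: Map<String, String>,
    host: String,
    contentLength: Long?,
    userAgent: String = "mini-client"
): Map<String, String> {
    val headers = LinkedHashMap(userHeaders)
    if (contentLength != null) {
        if (contentLength != -1L) {
            headers["Content-Length"] = contentLength.toString()
            headers.remove("Transfer-Encoding")
        } else {
            headers["Transfer-Encoding"] = "chunked"
            headers.remove("Content-Length")
        }
    }
    headers.getOrPut("Host") { host }
    headers.getOrPut("Connection") { "Keep-Alive" }
    // Ask for gzip only when the caller expressed no preference and wants the whole body.
    if ("Accept-Encoding" !in userHeaders && "Range" !in userHeaders) {
        headers["Accept-Encoding"] = "gzip"
    }
    headers.getOrPut("User-Agent") { userAgent }
    return headers
}
```

Values the caller already set, such as a custom User-Agent, are left untouched; only the length-related headers are overwritten, because they must agree with the actual body.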
CacheInterceptor
The OkHttp cache is simple to use: just call the builder's cache() method when creating the OkHttpClient
override fun intercept(chain: Interceptor.Chain): Response {
  val call = chain.call()
  // Look up the cached response for this request
  val cacheCandidate = cache?.get(chain.request())
  val now = System.currentTimeMillis()
  // The strategy decides whether to use the network, the cache, or both
  val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
  val networkRequest = strategy.networkRequest
  val cacheResponse = strategy.cacheResponse
  cache?.trackResponse(strategy)
  val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
  if (cacheCandidate != null && cacheResponse == null) {
    // The cache candidate wasn't applicable. Close it.
    cacheCandidate.body?.closeQuietly()
  }
  // If we're forbidden from using the network and the cache is insufficient, fail.
  if (networkRequest == null && cacheResponse == null) {
    return Response.Builder()
      .request(chain.request())
      .protocol(Protocol.HTTP_1_1)
      .code(HTTP_GATEWAY_TIMEOUT)
      .message("Unsatisfiable Request (only-if-cached)")
      .body(EMPTY_RESPONSE)
      .sentRequestAtMillis(-1L)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build().also {
        listener.satisfactionFailure(call, it)
      }
  }
  // If we don't need the network, we're done.
  if (networkRequest == null) {
    return cacheResponse!!.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .build().also {
        listener.cacheHit(call, it)
      }
  }
  if (cacheResponse != null) {
    listener.cacheConditionalHit(call, cacheResponse)
  } else if (cache != null) {
    listener.cacheMiss(call)
  }
  var networkResponse: Response? = null
  try {
    // Call the next interceptor to get the response from the network
    networkResponse = chain.proceed(networkRequest)
  } finally {
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) {
      cacheCandidate.body?.closeQuietly()
    }
  }
  // If we have a cache response too, then we're doing a conditional get.
  // Compare it with networkResponse to decide whether the cached copy can be reused and updated
  if (cacheResponse != null) {
    if (networkResponse?.code == HTTP_NOT_MODIFIED) {
      val response = cacheResponse.newBuilder()
        .headers(combine(cacheResponse.headers, networkResponse.headers))
        .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
        .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build()
      networkResponse.body!!.close()
      // Update the cache after combining headers but before stripping the
      // Content-Encoding header (as performed by initContentStream()).
      cache!!.trackConditionalCacheHit()
      cache.update(cacheResponse, response)
      return response.also {
        listener.cacheHit(call, it)
      }
    } else {
      cacheResponse.body?.closeQuietly()
    }
  }
  val response = networkResponse!!.newBuilder()
    .cacheResponse(stripBody(cacheResponse))
    .networkResponse(stripBody(networkResponse))
    .build()
  if (cache != null) {
    if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
      // Offer this request to the cache.
      val cacheRequest = cache.put(response)
      return cacheWritingResponse(cacheRequest, response).also {
        if (cacheResponse != null) {
          // This will log a conditional cache miss only.
          listener.cacheMiss(call)
        }
      }
    }
    if (HttpMethod.invalidatesCache(networkRequest.method)) {
      try {
        cache.remove(networkRequest)
      } catch (_: IOException) {
        // The cache cannot be written.
      }
    }
  }
  return response
}
- If a cache is configured, the cacheCandidate for the request is looked up in it
- CacheStrategy.Factory computes the cache strategy, which yields the networkRequest and the cacheResponse
- The cache's synchronized trackResponse() method is called to record statistics (request, network, and hit counts)
- If the network may not be used and there is no usable cached response, a 504 response (HTTP_GATEWAY_TIMEOUT) is returned
- If no network request is needed, the response is built directly from the cached response and returned
- If there is a cached response (cacheResponse != null) and the network answers 304 (networkResponse.code == HTTP_NOT_MODIFIED), the cached copy is still valid: a Response is built from cacheResponse with the combined headers, the cache entry is updated, and the response is returned
- If a cache is configured and the network response has a body and is cacheable, it is written to the cache through put()
The cache interceptor decides, based on the request and the cached response, whether a cached response can be used. If so, it returns the cached response to the user; otherwise it continues down the chain to fetch the response from the server, and once the response arrives it is written to the disk cache.
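The branch structure of intercept() boils down to a small decision table. The sketch below is a hypothetical condensation: CacheOutcome and decide are invented names, and the boolean inputs stand in for whether strategy.networkRequest and strategy.cacheResponse are non-null. It does not model freshness calculation, which CacheStrategy performs before this point.

```kotlin
enum class CacheOutcome { FAIL_504, SERVE_CACHE, SERVE_CACHE_REVALIDATED, SERVE_NETWORK }

// needsNetwork: the strategy produced a network request.
// hasCachedResponse: the strategy produced a usable cached response.
// networkCode: status code from the network, or null if the network was not used.
fun decide(needsNetwork: Boolean, hasCachedResponse: Boolean, networkCode: Int?): CacheOutcome =
    when {
        // Network forbidden (only-if-cached) and nothing usable in the cache.
        !needsNetwork && !hasCachedResponse -> CacheOutcome.FAIL_504
        // The cached response is sufficient; the network is never touched.
        !needsNetwork -> CacheOutcome.SERVE_CACHE
        // Conditional GET answered with 304 Not Modified: reuse the cached body.
        hasCachedResponse && networkCode == 304 -> CacheOutcome.SERVE_CACHE_REVALIDATED
        // Otherwise the network response wins and may be written to the cache.
        else -> CacheOutcome.SERVE_NETWORK
    }
```

The order of the branches matters: the two "no network" cases are settled before any network status code is consulted, just as the interceptor returns early before calling chain.proceed().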
The connection interceptor: ConnectInterceptor
Take a look at the new version of ConnectInterceptor, which has changed significantly. Its main purpose is to establish the connection
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
}
- intercept() calls initExchange(), a RealCall method, to obtain an Exchange, and then calls proceed() on a copy of the chain that carries it; proceed() returns the response
- The initExchange() method
internal fun initExchange(chain: RealInterceptorChain): Exchange {
  synchronized(this) {
    check(expectMoreExchanges) { "released" }
    check(!responseBodyOpen)
    check(!requestBodyOpen)
  }
  val exchangeFinder = this.exchangeFinder!!
  val codec = exchangeFinder.find(client, chain)
  val result = Exchange(this, eventListener, exchangeFinder, codec)
  this.interceptorScopedExchange = result
  this.exchange = result
  synchronized(this) {
    this.requestBodyOpen = true
    this.responseBodyOpen = true
  }
  if (canceled) throw IOException("Canceled")
  return result
}
As you can see, it returns an Exchange. The Exchange constructor takes the exchangeFinder, which was set up while RetryAndFollowUpInterceptor was running. Before that, the exchangeFinder's find() method is called, so let's follow it into the source
- The ExchangeFinder.find() method
fun find(
  client: OkHttpClient,
  chain: RealInterceptorChain
): ExchangeCodec {
  try {
    val resultConnection = findHealthyConnection(
      connectTimeout = chain.connectTimeoutMillis,
      readTimeout = chain.readTimeoutMillis,
      writeTimeout = chain.writeTimeoutMillis,
      pingIntervalMillis = client.pingIntervalMillis,
      connectionRetryEnabled = client.retryOnConnectionFailure,
      doExtensiveHealthChecks = chain.request.method != "GET"
    )
    return resultConnection.newCodec(client, chain)
  } catch (e: RouteException) {
    trackFailure(e.lastConnectException)
    throw e
  } catch (e: IOException) {
    trackFailure(e)
    throw RouteException(e)
  }
}
- find() returns an ExchangeCodec ("Encodes HTTP requests and decodes HTTP responses"). It is created by resultConnection.newCodec(), which checks whether the connection is HTTP/2 and builds the matching codec
- Put simply, ExchangeCodec is an abstraction over HTTP protocol operations with two implementations, Http1ExchangeCodec and Http2ExchangeCodec, which as the names suggest correspond to HTTP/1.1 and HTTP/2. Reuse of pooled connections is handled further down this call path
- resultConnection itself comes from findHealthyConnection(), so we continue into that method
- The findHealthyConnection() method
@Throws(IOException::class)
private fun findHealthyConnection(
  connectTimeout: Int,
  readTimeout: Int,
  writeTimeout: Int,
  pingIntervalMillis: Int,
  connectionRetryEnabled: Boolean,
  doExtensiveHealthChecks: Boolean
): RealConnection {
  while (true) {
    val candidate = findConnection(
      connectTimeout = connectTimeout,
      readTimeout = readTimeout,
      writeTimeout = writeTimeout,
      pingIntervalMillis = pingIntervalMillis,
      connectionRetryEnabled = connectionRetryEnabled
    )
    // Confirm that the connection is good.
    if (candidate.isHealthy(doExtensiveHealthChecks)) {
      return candidate
    }
    // If it isn't, take it out of the pool.
    candidate.noNewExchanges()
    // Make sure we have some routes left to try. One example where we may exhaust all the routes
    // would happen if we made a new connection and it immediately is detected as unhealthy.
    if (nextRouteToTry != null) continue
    val routesLeft = routeSelection?.hasNext() ?: true
    if (routesLeft) continue
    val routesSelectionLeft = routeSelector?.hasNext() ?: true
    if (routesSelectionLeft) continue
    throw IOException("exhausted all routes")
  }
}
This returns a RealConnection. The loop itself does little: it repeatedly calls findConnection() and only checks the health of the RealConnection it gets back, so the real work happens in findConnection()
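The shape of that loop, stripped of routes and timeouts, looks roughly like the following. Candidate and firstHealthy are hypothetical names; the `next` lambda stands in for findConnection() and, as a simplification, returns null when nothing is left to try, whereas the real method checks the route selectors instead.

```kotlin
import java.io.IOException

// Hypothetical connection with a precomputed health flag.
class Candidate(val id: Int, val healthy: Boolean) {
    var noNewExchanges = false
}

// Keeps asking `next` for candidates until a healthy one turns up.
fun firstHealthy(next: () -> Candidate?): Candidate {
    while (true) {
        val candidate = next() ?: throw IOException("exhausted all routes")
        if (candidate.healthy) return candidate
        // Mark the unhealthy connection so that nothing else reuses it, then try again.
        candidate.noNewExchanges = true
    }
}
```

An unhealthy candidate is not simply discarded: it is flagged first, which in OkHttp is what keeps the pool from handing the same broken connection out again.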
- FindConnection () method
@Throws(IOException::class)
private fun findConnection( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean ): RealConnection {
if (call.isCanceled()) throw IOException("Canceled")
// Attempt to reuse the connection from the call.
// Try to use a connection that has already been allocated. The connection that has already been allocated may have been restricted
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if(callConnection ! =null) {
var toClose: Socket? = null
If the connection has been restricted to create new streams, or if it is not the same host and port, return a Socket to close the connection
synchronized(callConnection) {
if(callConnection.noNewExchanges || ! sameHostAndPort(callConnection.route().address.url)) { toClose = call.releaseConnectionNoEvents() } }// If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
// because we already acquired it.
if(call.connection ! =null) {
check(toClose == null)
return callConnection
}
// The call's connection was released.
// Close the connectiontoClose? .closeQuietly() eventListener.connectionReleased(call, callConnection) }// We need a new connection. Give it fresh stats.
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
// Attempt to get a connection from the pool.
// Try to get a connection from the connection pool
if (connectionPool.callAcquirePooledConnection(address, call, null.false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
// Nothing in the pool. Figure out what route we'll try next.
val routes: List<Route>?
val route: Route
if (nextRouteToTry ! =null) {
// Use a route from a preceding coalesced connection.
routes = null
route = nextRouteToTry!!
nextRouteToTry = null
} else if(routeSelection ! =null&& routeSelection!! .hasNext()) {// Use a route from an existing route selection.
routes = nullroute = routeSelection!! .next() }else {
// Compute a new route selection. This is a blocking operation!
var localRouteSelector = routeSelector
if (localRouteSelector == null) {
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector
}
val localRouteSelection = localRouteSelector.next()
routeSelection = localRouteSelection
routes = localRouteSelection.routes
if (call.isCanceled()) throw IOException("Canceled")
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. We have a better chance of matching thanks to connection coalescing.
// Get a link from the connection pool based on a list of IP addresses
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
route = localRouteSelection.next()
}
// Connect. Tell the call about the connecting call so async cancels work.
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())
// If we raced another call connecting to this host, coalesce the connections. This makes for 3
// different lookups in the connection pool!
// Third lookup: if another call connected to this host in the meantime, use its connection instead
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
}
synchronized(newConnection) {
// Add the connection to the connection pool
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}
eventListener.connectionAcquired(call, newConnection)
return newConnection
}
- The comments in the code make the overall flow fairly clear: findConnection() tries to get a connection from the connection pool and returns it, and the comments mention three different lookups. Let's step into callAcquirePooledConnection() to see what it actually does
- The callAcquirePooledConnection() method
fun callAcquirePooledConnection(
address: Address,
call: RealCall,
routes: List<Route>?,
requireMultiplexed: Boolean
): Boolean {
for (connection in connections) {
synchronized(connection) {
if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
if (!connection.isEligible(address, routes)) return@synchronized
call.acquireConnectionNoEvents(connection)
return true
}
}
return false
}
- This method iterates over all connections in the pool. When it finds an eligible one, it assigns that connection to the call and returns true; if none is eligible, it returns false
- The arguments passed to this method are address (the target host and port), the RealCall instance, routes (the list of candidate routes, which may be null), and requireMultiplexed (whether only multiplexed connections, that is HTTP/2 connections, are acceptable)
- Going back to findConnection(), let's look at the three lookups it makes
// First lookup
connectionPool.callAcquirePooledConnection(address, call, null, false)
// Second lookup
connectionPool.callAcquirePooledConnection(address, call, routes, false)
// Third lookup
connectionPool.callAcquirePooledConnection(address, call, routes, true)
Let's compare the three calls
- The first call passes routes = null and requireMultiplexed = false, so it accepts any pooled connection, multiplexed or not, as long as the connection is eligible for the address
- At first I thought the second call was just another lookup that does not require multiplexing, but it also passes in the route list. Looking at isEligible, which callAcquirePooledConnection calls: when the host does not match exactly, a connection that is not HTTP/2 returns false immediately, and the routes must share an IP address with the connection. So the route list only matters for HTTP/2 connection coalescing; it has little to do with the requireMultiplexed flag. See the relevant part of isEligible below:
// The method is long, so only the relevant part is shown here
// 1. This connection must be HTTP/2.
if (http2Connection == null) return false
// 2. The routes must share an IP address.
if (routes == null || !routeMatchesAny(routes)) return false
- The third call passes requireMultiplexed = true, so it only accepts multiplexed (HTTP/2) connections
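The difference between the three lookups can be illustrated with a small standalone model. This is only a sketch and not OkHttp's code: PooledConn, SimplePool, and their fields are hypothetical names made up for illustration, and the eligibility check is reduced to the rules discussed above (an exact host match, or HTTP/2 plus a shared IP address). The real isEligible performs further checks that are left out here.

```kotlin
// Simplified, hypothetical model of a pooled connection (not an OkHttp class).
data class PooledConn(val host: String, val ip: String, val isMultiplexed: Boolean)

class SimplePool(private val connections: List<PooledConn>) {

    // Reduced version of the eligibility rules discussed above.
    private fun isEligible(conn: PooledConn, host: String, routeIps: List<String>?): Boolean {
        // An exact host match can always carry the request.
        if (conn.host == host) return true
        // Otherwise coalescing applies: the connection must be HTTP/2 (multiplexed) ...
        if (!conn.isMultiplexed) return false
        // ... and one of the candidate routes must share its IP address.
        return routeIps != null && conn.ip in routeIps
    }

    // Mirrors the shape of callAcquirePooledConnection, but returns the connection (or null).
    fun acquire(host: String, routeIps: List<String>?, requireMultiplexed: Boolean): PooledConn? =
        connections.firstOrNull { conn ->
            (!requireMultiplexed || conn.isMultiplexed) && isEligible(conn, host, routeIps)
        }
}
```

In this model, a lookup with routeIps = null can only succeed on an exact host match, a lookup with a route list can additionally pick up a multiplexed connection that shares an IP address, and a lookup with requireMultiplexed = true skips every non-multiplexed connection.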
- Now look at when each of the three calls happens
- The first call tries to get a connection from the pool right away. What if that fails? As you can see, a set of routes is taken from localRouteSelector and passed to the second lookup
// Compute a new route selection. This is a blocking operation!
var localRouteSelector = routeSelector
if (localRouteSelector == null) {
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector
}
val localRouteSelection = localRouteSelector.next()
routeSelection = localRouteSelection
routes = localRouteSelection.routes
- What if the second lookup also fails? At this point you can see that a new connection, newConnection, is created and connected
// Connect. Tell the call about the connecting call so async cancels work.
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())
Here I had a question: why look in the connection pool a third time when a connection has already been created? Others online have raised the same question and analyzed it briefly. The reason is that if two requests to the same IP address start at the same time and each creates its own connection, there will be two connections where one multiplexed connection would do, which wastes resources. So after connecting, each call checks the pool once more. If another call has already put a multiplexed connection into the pool, this call uses the pooled connection, closes the connection it just created, and saves its own route in nextRouteToTry so that the route can be used later if it turns out to be needed
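The race described above can be sketched with a toy pool. This is an assumption-laden illustration and not how OkHttp is implemented: FakeConnection, CoalescingPool, and putOrCoalesce are hypothetical names, and the lookup and the insertion are made atomic with a single lock to keep the sketch short.

```kotlin
// Hypothetical stand-in for a connection; not an OkHttp class.
class FakeConnection(val owner: String, val isMultiplexed: Boolean) {
    var closed = false
        private set

    fun close() {
        closed = true
    }
}

class CoalescingPool {
    private val connections = mutableListOf<FakeConnection>()

    // Called after a call has finished connecting.
    // Returns the connection the call should actually use.
    @Synchronized
    fun putOrCoalesce(fresh: FakeConnection): FakeConnection {
        // "Third lookup": did another call already pool a multiplexed connection?
        val existing = connections.firstOrNull { it.isMultiplexed }
        if (existing != null) {
            // The freshly created connection is a duplicate, so close it.
            fresh.close()
            return existing
        }
        connections += fresh
        return fresh
    }
}
```

If two calls each create a multiplexed connection and hand it to putOrCoalesce, the first one is pooled and returned, while the second call gets the first call's connection back and its own connection is closed.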
- A quick summary of the ConnectInterceptor
Basically, as I understand it, it finds an available connection and creates the corresponding codec for it
The server request interceptor: CallServerInterceptor
The purpose of this interceptor is to send the actual network request and then read the response returned by the server. Let's take a brief look at the code
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}
- This part writes the request, and its body if there is one, to the server
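The branching in the code above can be condensed into a small decision function. This is a simplified sketch under stated assumptions, not OkHttp code: BodyAction and decideBodyAction are hypothetical names, permitsRequestBody is assumed here to allow every method except GET and HEAD, and statusAfterHeaders stands for the status code the server sends after the request headers have been flushed (it is only consulted when the Expect header is present).

```kotlin
// Hypothetical outcome type for this sketch; OkHttp has no such enum.
enum class BodyAction { NO_BODY, SEND_BODY, SKIP_BODY }

// Assumption: every method except GET and HEAD may carry a request body.
fun permitsRequestBody(method: String): Boolean = method != "GET" && method != "HEAD"

fun decideBodyAction(
    method: String,
    hasBody: Boolean,
    expectHeader: String?,
    statusAfterHeaders: Int
): BodyAction {
    // Nothing to send for body-less methods or when no body was supplied.
    if (!permitsRequestBody(method) || !hasBody) return BodyAction.NO_BODY

    val expectsContinue = "100-continue".equals(expectHeader, ignoreCase = true)
    // Without the Expect header the body is sent right away.
    if (!expectsContinue) return BodyAction.SEND_BODY

    // With it, the body is sent only if the server answered "100 Continue".
    return if (statusAfterHeaders == 100) BodyAction.SEND_BODY else BodyAction.SKIP_BODY
}
```

For example, a POST with "Expect: 100-continue" that receives a 417 ends up in SKIP_BODY, which corresponds to the branch above that calls exchange.noRequestBody().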
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
- This part attaches the response body to the response so that it can be read
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
- If either the request or the response carries "Connection: close", the connection is marked so that it carries no new exchanges
In this way, the interceptor completes the process of making the network request. (Figure: flow chart of the overall framework.)
Closing remarks
To sum up: besides the internal request flow of OkHttp, the cache, the connections, and the interceptors are all very important points. After this rough walk through the OkHttp source, everyone will probably have formed their own opinions. If I got something wrong, corrections from more experienced readers are welcome. Some parts are still unclear to me, and I hope to keep improving this analysis