Concepts
Codec
A codec is the read/write tool for exchanging network messages.
Purpose: encode HTTP requests and decode HTTP responses.
A codec is created from a healthy connection, so it holds both the encoding and decoding logic and the usable connection itself. To send a request you only need to write to the codec, and to get the response you only need to read from it.
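The idea of "write to the codec to send, read from the codec to receive" can be sketched with a toy model. Everything below (MessageCodec, LoopbackConnection, LoopbackCodec) is a hypothetical stand-in invented for illustration and is not part of OkHttp; the fake connection simply answers every request with a canned status line.

```kotlin
// Hypothetical, simplified model of a codec; these are not OkHttp's real types.
interface MessageCodec {
    fun writeRequest(requestLine: String) // encode and send
    fun readResponse(): String            // receive and decode
}

// A fake "healthy connection" that answers every request with a canned reply.
class LoopbackConnection {
    private val wire = ArrayDeque<String>()

    fun send(data: String) {
        wire.addLast("HTTP/1.1 200 OK (reply to: ${data.trim()})")
    }

    fun receive(): String = wire.removeFirst()
}

// The codec owns the connection, so callers never touch the wire directly.
class LoopbackCodec(private val connection: LoopbackConnection) : MessageCodec {
    override fun writeRequest(requestLine: String) = connection.send("$requestLine\r\n")
    override fun readResponse(): String = connection.receive()
}
```

A caller would create LoopbackCodec(LoopbackConnection()), call writeRequest("GET / HTTP/1.1"), and then call readResponse() to get the reply.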
Exchange
An Exchange is the manager of a network message exchange. Internally it calls the codec to perform the actual reads and writes.
It transmits a single HTTP request and response pair, layering connection management and events on top of ExchangeCodec, which handles the actual I/O.
Source code analysis
Get the codec
As we saw in the previous articles, OkHttp connects to the network in ConnectInterceptor, as shown in the figure below. The connection step eventually reaches ExchangeFinder.find(), which calls RealConnection.newCodec().
fun find(
  client: OkHttpClient,
  chain: RealInterceptorChain
): ExchangeCodec {
  try {
    val resultConnection = findHealthyConnection(
      ...
    )
    return resultConnection.newCodec(client, chain)
  } catch (e: RouteException) {
    ...
  }
}
RealConnection.newCodec() creates a codec from a healthy connection.
internal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {
  val socket = this.socket!!
  val source = this.source!!
  val sink = this.sink!!
  val http2Connection = this.http2Connection
  return if (http2Connection != null) {
    // If HTTP/2 is used,
    // return the HTTP/2 codec
    Http2ExchangeCodec(client, this, chain, http2Connection)
  } else {
    // If HTTP/2 is not used,
    // return the HTTP/1 codec
    socket.soTimeout = chain.readTimeoutMillis()
    source.timeout().timeout(chain.readTimeoutMillis.toLong(), MILLISECONDS)
    sink.timeout().timeout(chain.writeTimeoutMillis.toLong(), MILLISECONDS)
    Http1ExchangeCodec(client, this, source, sink)
  }
}
As the code shows, this method creates either an HTTP/2 or an HTTP/1 codec, depending on which protocol the connection uses.
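The selection logic can be reduced to a small sketch. The types below (CodecSketch, Http1CodecSketch, Http2CodecSketch, ConnectionSketch) are hypothetical names made up for this illustration, not OkHttp classes; the only thing carried over from the source above is the decision itself: a non-null HTTP/2 session yields the HTTP/2 codec, otherwise the timeouts are applied and the HTTP/1 codec is returned.

```kotlin
// Hypothetical stand-ins for the two codec types, reduced to a protocol label.
interface CodecSketch {
    val protocol: String
}

class Http1CodecSketch(val readTimeoutMillis: Int) : CodecSketch {
    override val protocol = "http/1.1"
}

class Http2CodecSketch : CodecSketch {
    override val protocol = "h2"
}

// Models a connection that may or may not have negotiated HTTP/2.
// http2Session is an assumption standing in for an HTTP/2 connection object.
class ConnectionSketch(private val http2Session: Any?) {
    fun newCodec(readTimeoutMillis: Int): CodecSketch =
        if (http2Session != null) {
            Http2CodecSketch()
        } else {
            // In the HTTP/1 branch the timeout is configured when the codec is created.
            Http1CodecSketch(readTimeoutMillis)
        }
}
```

In this sketch only the HTTP/1 branch takes the timeout at creation time, mirroring the source above, where the HTTP/2 timeouts are instead set per stream when the request headers are written.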
Http1ExchangeCodec
Step into Http1ExchangeCodec(client, this, source, sink) to see what the codec actually is. Taking the writeRequest method of Http1ExchangeCodec as an example, you can see that it simply assembles the request according to the HTTP/1 message format (request line, headers, then a blank line) and writes it to the sink.
/** Returns bytes of a request header for sending on an HTTP transport. */
fun writeRequest(headers: Headers, requestLine: String) {
  check(state == STATE_IDLE) { "state: $state" }
  sink.writeUtf8(requestLine).writeUtf8("\r\n")
  for (i in 0 until headers.size) {
    sink.writeUtf8(headers.name(i))
      .writeUtf8(": ")
      .writeUtf8(headers.value(i))
      .writeUtf8("\r\n")
  }
  sink.writeUtf8("\r\n")
  state = STATE_OPEN_REQUEST_BODY
}
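To make the wire format concrete, here is a minimal standard-library sketch of the same assembly step. The function name buildRequestHead and the use of a StringBuilder instead of a sink are assumptions made for illustration; this is not OkHttp code.

```kotlin
// Builds the text of an HTTP/1.1 request head: request line, headers, blank line.
// Hypothetical helper for illustration only.
fun buildRequestHead(requestLine: String, headers: List<Pair<String, String>>): String {
    val out = StringBuilder()
    out.append(requestLine).append("\r\n")
    for ((name, value) in headers) {
        out.append(name).append(": ").append(value).append("\r\n")
    }
    out.append("\r\n") // an empty line terminates the header section
    return out.toString()
}
```

For example, buildRequestHead("GET /index.html HTTP/1.1", listOf("Host" to "example.com")) produces the request line, one Host header line, and the terminating blank line, each ending in CRLF.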
Http2ExchangeCodec
Similarly, taking the writeRequestHeaders method of Http2ExchangeCodec as an example, you can see that it reads and writes through a stream. Unlike HTTP/1, where a connection handles one request and one response at a time, an HTTP/2 connection is multiplexed into multiple streams. Each stream carries one request/response exchange, and the server can open additional streams to push responses (server push).
override fun writeRequestHeaders(request: Request) {
  if (stream != null) return
  val hasRequestBody = request.body != null
  val requestHeaders = http2HeadersList(request)
  stream = http2Connection.newStream(requestHeaders, hasRequestBody)
  // We may have been asked to cancel while creating the new stream and sending the request
  // headers, but there was still no stream to close.
  if (canceled) {
    stream!!.closeLater(ErrorCode.CANCEL)
    throw IOException("Canceled")
  }
  stream!!.readTimeout().timeout(chain.readTimeoutMillis.toLong(), TimeUnit.MILLISECONDS)
  stream!!.writeTimeout().timeout(chain.writeTimeoutMillis.toLong(), TimeUnit.MILLISECONDS)
}
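The two behaviors visible in this method, creating the stream only once and closing it again if the call was canceled in the meantime, can be sketched without any HTTP/2 machinery. MultiplexedConnectionSketch and StreamCodecSketch are hypothetical names invented here. The odd stream ids follow the HTTP/2 rule for client-initiated streams; starting at 1 is a simplifying assumption.

```kotlin
import java.io.IOException

// Hypothetical model of a multiplexed connection that hands out stream ids.
// Client-initiated HTTP/2 streams use odd ids; starting at 1 is an assumption.
class MultiplexedConnectionSketch {
    private var nextStreamId = 1
    val openStreams = mutableListOf<Int>()

    fun newStream(): Int {
        val id = nextStreamId
        nextStreamId += 2
        openStreams += id
        return id
    }

    fun closeStream(id: Int) {
        openStreams.remove(id)
    }
}

// One codec per exchange; many codecs can share the same connection.
class StreamCodecSketch(private val connection: MultiplexedConnectionSketch) {
    var streamId: Int? = null
        private set

    @Volatile
    var canceled = false

    fun writeRequestHeaders() {
        if (streamId != null) return // headers were already sent on this stream
        val id = connection.newStream()
        streamId = id
        // The cancel request may have arrived before a stream existed to close.
        if (canceled) {
            connection.closeStream(id)
            throw IOException("Canceled")
        }
    }
}
```

Two StreamCodecSketch instances created on the same connection end up on different streams, which is the multiplexing described above, while calling writeRequestHeaders() twice on one instance opens only one stream.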
Get the Exchange
Now that we have the codec, look again at the figure above and return to the line val exchange = realChain.call.initExchange(chain). This is where we started, so do not forget why we needed the codec in the first place.
The codec is passed into the Exchange constructor to create the Exchange.
internal fun initExchange(chain: RealInterceptorChain): Exchange {
  ...
  val exchangeFinder = this.exchangeFinder!!
  val codec = exchangeFinder.find(client, chain)
  val result = Exchange(this, eventListener, exchangeFinder, codec)
  ...
  return result
}
Once the codec is stored in the Exchange (facade pattern), the Exchange can be used to write the headers, the body, and so on.
The codec is the tool that actually reads and writes, and the Exchange is its manager. One round trip of request and response is what is called an exchange.
Read and write a message
Looking at the Exchange class below, methods such as finishRequest() and readResponseHeaders() simply call the methods of the same name on ExchangeCodec.
class Exchange(
  internal val call: RealCall,
  internal val eventListener: EventListener,
  internal val finder: ExchangeFinder,
  private val codec: ExchangeCodec
) {
  ...
  @Throws(IOException::class)
  fun writeRequestHeaders(request: Request) {
    try {
      eventListener.requestHeadersStart(call)
      codec.writeRequestHeaders(request)
      eventListener.requestHeadersEnd(call, request)
    } catch (e: IOException) {
      ...
    }
  }
  @Throws(IOException::class)
  fun createRequestBody(request: Request, duplex: Boolean): Sink {
    this.isDuplex = duplex
    val contentLength = request.body!!.contentLength()
    eventListener.requestBodyStart(call)
    val rawRequestBody = codec.createRequestBody(request, contentLength)
    return RequestBodySink(rawRequestBody, contentLength)
  }
  @Throws(IOException::class)
  fun flushRequest() {
    try {
      codec.flushRequest()
    } catch (e: IOException) {
      ...
    }
  }
  @Throws(IOException::class)
  fun finishRequest() {
    try {
      codec.finishRequest()
    } catch (e: IOException) {
      ...
    }
  }
  @Throws(IOException::class)
  fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
    try {
      val result = codec.readResponseHeaders(expectContinue)
      result?.initExchange(this)
      return result
    } catch (e: IOException) {
      ...
    }
  }
  fun responseHeadersEnd(response: Response) {
    eventListener.responseHeadersEnd(call, response)
  }
  @Throws(IOException::class)
  fun openResponseBody(response: Response): ResponseBody {
    try {
      val contentType = response.header("Content-Type")
      val contentLength = codec.reportedContentLength(response)
      val rawSource = codec.openResponseBodySource(response)
      val source = ResponseBodySource(rawSource, contentLength)
      return RealResponseBody(contentType, contentLength, source.buffer())
    } catch (e: IOException) {
      ...
    }
  }
  @Throws(SocketException::class)
  fun newWebSocketStreams(): RealWebSocket.Streams {
    call.timeoutEarlyExit()
    return codec.connection.newWebSocketStreams(this)
  }
  fun noNewExchangesOnConnection() {
    codec.connection.noNewExchanges()
  }
  fun cancel() {
    codec.cancel()
  }
  ...
}
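The pattern in writeRequestHeaders (emit an event, delegate to the codec, emit another event, and react if the codec throws) can be sketched in isolation. WireCodec, RecordingListener and ExchangeSketch are hypothetical names for this illustration. What happens inside the catch block is elided in the excerpt above, so the "requestFailed" event recorded here is only an assumption about a plausible reaction.

```kotlin
import java.io.IOException

// Hypothetical stand-in for the codec: the part that performs the real I/O.
fun interface WireCodec {
    @Throws(IOException::class)
    fun writeRequestHeaders(request: String)
}

// Hypothetical stand-in for an event listener; it just records event names.
class RecordingListener {
    val events = mutableListOf<String>()

    fun on(event: String) {
        events += event
    }
}

// The manager: it adds events and failure handling around the codec call.
class ExchangeSketch(
    private val listener: RecordingListener,
    private val codec: WireCodec
) {
    @Throws(IOException::class)
    fun writeRequestHeaders(request: String) {
        try {
            listener.on("requestHeadersStart")
            codec.writeRequestHeaders(request)
            listener.on("requestHeadersEnd")
        } catch (e: IOException) {
            // Assumption: report the failure, then let the caller see the exception.
            listener.on("requestFailed")
            throw e
        }
    }
}
```

Callers only ever talk to ExchangeSketch, so the codec implementation can be swapped (for instance HTTP/1 versus HTTP/2) without changing the calling code, which is the point of the facade.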
Putting it all together
The Exchange obtained in ConnectInterceptor is passed down the chain of responsibility to CallServerInterceptor. As the last link in the chain, this interceptor sends the request to the server and receives the response, as shown in the source code below. As you can see, the network request is ultimately executed through the Exchange.
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body
    val sentRequestMillis = System.currentTimeMillis()
    exchange.writeRequestHeaders(request)
    var invokeStartEvent = true
    var responseBuilder: Response.Builder? = null
    if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
        exchange.flushRequest()
        responseBuilder = exchange.readResponseHeaders(expectContinue = true)
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
      if (responseBuilder == null) {
        if (requestBody.isDuplex()) {
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest()
          val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
          requestBody.writeTo(bufferedRequestBody)
        } else {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
          requestBody.writeTo(bufferedRequestBody)
          bufferedRequestBody.close()
        }
      } else {
        exchange.noRequestBody()
        if (!exchange.connection.isMultiplexed) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.
          exchange.noNewExchangesOnConnection()
        }
      }
    } else {
      exchange.noRequestBody()
    }
    if (requestBody == null || !requestBody.isDuplex()) {
      exchange.finishRequest()
    }
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
    }
    var response = responseBuilder
      .request(request)
      .handshake(exchange.connection.handshake())
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build()
    var code = response.code
    if (code == 100) {
      // Server sent a 100-continue even though we did not request one. Try again to read the actual
      // response status.
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
      }
      response = responseBuilder
        .request(request)
        .handshake(exchange.connection.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
      code = response.code
    }
    exchange.responseHeadersEnd(response)
    response = if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response.newBuilder()
        .body(EMPTY_RESPONSE)
        .build()
    } else {
      response.newBuilder()
        .body(exchange.openResponseBody(response))
        .build()
    }
    if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
      "close".equals(response.header("Connection"), ignoreCase = true)) {
      exchange.noNewExchangesOnConnection()
    }
    if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
      throw ProtocolException(
        "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
    }
    return response
  }
}
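How an exchange created in one interceptor reaches the last interceptor can be sketched with a toy chain of responsibility. None of the types below (ExchangeStub, InterceptorSketch, ChainSketch) are OkHttp classes; they are hypothetical and heavily simplified, with requests and responses reduced to strings.

```kotlin
// Hypothetical stand-in for an exchange: "sends" a request and returns a reply.
class ExchangeStub {
    fun send(request: String): String = "200 OK for $request"
}

fun interface InterceptorSketch {
    fun intercept(chain: ChainSketch): String
}

class ChainSketch(
    private val interceptors: List<InterceptorSketch>,
    private val index: Int,
    val request: String,
    val exchange: ExchangeStub?
) {
    // Hands the request to the next interceptor, optionally attaching an exchange.
    fun proceed(request: String, exchange: ExchangeStub? = this.exchange): String {
        check(index < interceptors.size) { "no interceptor left to handle the request" }
        val next = ChainSketch(interceptors, index + 1, request, exchange)
        return interceptors[index].intercept(next)
    }
}

// Plays the role of ConnectInterceptor: obtains an exchange and passes it on.
val connectStep = InterceptorSketch { chain ->
    chain.proceed(chain.request, ExchangeStub())
}

// Plays the role of CallServerInterceptor: last in the chain, performs the I/O.
val callServerStep = InterceptorSketch { chain ->
    val exchange = checkNotNull(chain.exchange) { "no exchange was attached" }
    exchange.send(chain.request)
}

fun execute(request: String): String =
    ChainSketch(listOf(connectStep, callServerStep), 0, request, null).proceed(request)
```

Calling execute("GET /") walks both steps in order. If the connect step is left out, the last step fails with an IllegalStateException because no exchange was attached, which is why the real CallServerInterceptor can assert that the exchange is non-null.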