
Concepts

Codec

A codec is the read/write tool for exchanging network messages.

Purpose: encode HTTP requests and decode HTTP responses.

A codec is created from a healthy connection, so it holds not only the encoding and decoding logic but also the usable connection itself. To send a request you only need to write to the codec, and to get the response you only need to read from it.
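
To make the "write to send, read to receive" idea concrete, here is a minimal sketch. It is not OkHttp's ExchangeCodec: ToyCodec, toyRoundTrip and the in-memory streams standing in for a socket are all hypothetical names invented for illustration.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.io.InputStream
import java.io.OutputStream

// Hypothetical toy codec (not OkHttp API): it owns both halves of a connection.
class ToyCodec(private val sink: OutputStream, source: InputStream) {
    private val reader = source.bufferedReader(Charsets.UTF_8)

    // Encode: write the request line plus the blank line that ends the header section.
    fun writeRequest(requestLine: String) {
        sink.write("$requestLine\r\n\r\n".toByteArray(Charsets.UTF_8))
        sink.flush()
    }

    // Decode: read the status line back and extract the numeric status code.
    fun readStatusCode(): Int {
        val statusLine = reader.readLine() ?: throw IOException("unexpected end of stream")
        return statusLine.split(" ")[1].toInt()
    }
}

// One round trip over in-memory streams that stand in for a real socket.
fun toyRoundTrip(): Pair<String, Int> {
    val sent = ByteArrayOutputStream()
    val canned = ByteArrayInputStream("HTTP/1.1 200 OK\r\n\r\n".toByteArray(Charsets.UTF_8))
    val codec = ToyCodec(sent, canned)
    codec.writeRequest("GET / HTTP/1.1")
    return sent.toString("UTF-8") to codec.readStatusCode()
}
```

Calling toyRoundTrip() returns the bytes that were "sent" together with the decoded status code. The caller never touches the streams directly, which is the point of having a codec.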

Exchange

An Exchange manages the reads and writes of a single network message exchange. Internally it calls the codec to perform the actual read and write operations.

It transmits a single HTTP request and response pair, layering connection management and events on top of ExchangeCodec, which handles the actual I/O.

Source code analysis

Get the codec

As we've seen in the previous articles, OkHttp connects to the network in ConnectInterceptor, as shown in the figure below. The connection step eventually calls the RealConnection.newCodec() method from the find() method of ExchangeFinder.

  fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
  ): ExchangeCodec {
    try {
      val resultConnection = findHealthyConnection(
          ...
      )
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      ...
    }
  }

RealConnection.newCodec() creates a codec from a healthy connection.

internal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {
    val socket = this.socket!!
    val source = this.source!!
    val sink = this.sink!!
    val http2Connection = this.http2Connection

    return if (http2Connection != null) {
      // If HTTP/2 is in use,
      // return the HTTP/2 codec
      Http2ExchangeCodec(client, this, chain, http2Connection)
    } else {
      // If not HTTP/2,
      // return the HTTP/1 codec
      socket.soTimeout = chain.readTimeoutMillis()
      source.timeout().timeout(chain.readTimeoutMillis.toLong(), MILLISECONDS)
      sink.timeout().timeout(chain.writeTimeoutMillis.toLong(), MILLISECONDS)
      Http1ExchangeCodec(client, this, source, sink)
    }
  }

As the code shows, this method creates either an HTTP/2 or an HTTP/1 codec, depending on which protocol the connection uses.

Http1ExchangeCodec

Step into Http1ExchangeCodec(client, this, source, sink) to see what the codec actually is. Take the writeRequest method of Http1ExchangeCodec as an example: it simply assembles the request according to the HTTP/1 message format (request line, headers, blank line) and writes it to the sink.

  /** Returns bytes of a request header for sending on an HTTP transport. */
  fun writeRequest(headers: Headers, requestLine: String) {
    check(state == STATE_IDLE) { "state: $state" }
    sink.writeUtf8(requestLine).writeUtf8("\r\n")
    for (i in 0 until headers.size) {
      sink.writeUtf8(headers.name(i))
          .writeUtf8(": ")
          .writeUtf8(headers.value(i))
          .writeUtf8("\r\n")
    }
    sink.writeUtf8("\r\n")
    state = STATE_OPEN_REQUEST_BODY
  }
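
The same stitching can be tried without OkHttp or Okio. The sketch below is an assumption-laden illustration, not the library's code: buildRequestHead is a hypothetical helper, and a StringBuilder plus a list of pairs stand in for the sink and the Headers class.

```kotlin
// Hypothetical helper (not OkHttp API): builds the text of an HTTP/1.1 request head.
fun buildRequestHead(requestLine: String, headers: List<Pair<String, String>>): String {
    val out = StringBuilder()
    // The request line comes first, terminated by CRLF.
    out.append(requestLine).append("\r\n")
    // Each header is written as "Name: value" followed by CRLF.
    for ((name, value) in headers) {
        out.append(name).append(": ").append(value).append("\r\n")
    }
    // An empty line marks the end of the header section.
    out.append("\r\n")
    return out.toString()
}
```

For example, buildRequestHead("GET /users HTTP/1.1", listOf("Host" to "example.com")) produces the request line, one Host header line, and the terminating blank line, each ending in CRLF.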

Http2ExchangeCodec

Likewise, take the writeRequestHeaders method of Http2ExchangeCodec as an example: it reads and writes through a stream. HTTP/2 differs from HTTP/1, where a connection handles one request and one response at a time. Instead, an HTTP/2 connection is multiplexed into multiple streams, each carrying its own request and response, and the server can also send additional responses on its own initiative (server push).

override fun writeRequestHeaders(request: Request) {
    if (stream != null) return

    val hasRequestBody = request.body != null
    val requestHeaders = http2HeadersList(request)
    stream = http2Connection.newStream(requestHeaders, hasRequestBody)
    // We may have been asked to cancel while creating the new stream and sending the request
    // headers, but there was still no stream to close.
    if (canceled) {
      stream!!.closeLater(ErrorCode.CANCEL)
      throw IOException("Canceled")
    }
    stream!!.readTimeout().timeout(chain.readTimeoutMillis.toLong(), TimeUnit.MILLISECONDS)
    stream!!.writeTimeout().timeout(chain.writeTimeoutMillis.toLong(), TimeUnit.MILLISECONDS)
}
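
The multiplexing idea can be sketched roughly as follows. This is a toy model rather than OkHttp's Http2Connection: ToyFrame and ToyHttp2Connection are hypothetical names, and real HTTP/2 framing is binary and far more involved. The only protocol fact relied on is that client-initiated streams use odd identifiers.

```kotlin
// Hypothetical frame: a chunk of data tagged with the stream it belongs to.
data class ToyFrame(val streamId: Int, val payload: String)

// Hypothetical connection that hands out stream ids and regroups interleaved frames.
class ToyHttp2Connection {
    private var nextStreamId = 1 // client-initiated HTTP/2 streams use odd ids

    // Opens a new logical stream on the same underlying connection.
    fun newStream(): Int {
        val id = nextStreamId
        nextStreamId += 2
        return id
    }

    // Frames from different streams arrive interleaved; join them back per stream.
    fun demultiplex(frames: List<ToyFrame>): Map<Int, String> =
        frames.groupBy { it.streamId }
            .mapValues { (_, streamFrames) -> streamFrames.joinToString("") { it.payload } }
}
```

Two streams opened on one ToyHttp2Connection can have their frames arrive in any interleaving, and demultiplex still reassembles each stream's payload independently. That is what lets several requests share one connection.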

Get the Exchange

Now that we have the codec, look again at the figure above and go back to the line val exchange = realChain.call.initExchange(chain). Don't forget that this is where we set out from, and it is the reason we needed the codec in the first place.

We need to pass the codec into the Exchange constructor to create an Exchange.

 internal fun initExchange(chain: RealInterceptorChain): Exchange {
    ...
    val exchangeFinder = this.exchangeFinder!!
    val codec = exchangeFinder.find(client, chain)
    val result = Exchange(this, eventListener, exchangeFinder, codec)
    ...
    return result
  }

Once the codec is stored in the Exchange (facade pattern), the Exchange can be used to write the headers, the body, and so on.

The codec is the actual read/write tool and the Exchange is its manager. An Exchange represents one network message exchange: a request going out and its response coming back.
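
The division of labour can be sketched in a few lines. The types below are hypothetical and heavily simplified (ToyWireCodec, ToyExchange and RecordingCodec are not OkHttp classes). They only illustrate a manager wrapping a codec call with bookkeeping such as events.

```kotlin
// Hypothetical minimal codec interface (not OkHttp's ExchangeCodec).
interface ToyWireCodec {
    fun writeRequestHeaders(requestLine: String)
}

// Hypothetical manager: adds event bookkeeping around the codec call that does the I/O.
class ToyExchange(
    private val codec: ToyWireCodec,
    private val events: MutableList<String>
) {
    fun writeRequestHeaders(requestLine: String) {
        events += "requestHeadersStart"
        codec.writeRequestHeaders(requestLine) // delegate the actual write
        events += "requestHeadersEnd"
    }
}

// A codec that records what it was asked to write instead of touching a socket.
class RecordingCodec(private val events: MutableList<String>) : ToyWireCodec {
    override fun writeRequestHeaders(requestLine: String) {
        events += "codec wrote: $requestLine"
    }
}
```

After ToyExchange(RecordingCodec(events), events).writeRequestHeaders("GET / HTTP/1.1"), the shared events list holds the start event, the codec's write, and the end event, in that order.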

Read and write a message

Looking at the Exchange class below, methods such as finishRequest() and readResponseHeaders() simply call the methods of the same name on ExchangeCodec.


class Exchange(
  internal val call: RealCall,
  internal val eventListener: EventListener,
  internal val finder: ExchangeFinder,
  private val codec: ExchangeCodec
) {
  ...

  @Throws(IOException::class)
  fun writeRequestHeaders(request: Request) {
    try {
      eventListener.requestHeadersStart(call)
      codec.writeRequestHeaders(request)
      eventListener.requestHeadersEnd(call, request)
    } catch (e: IOException) {
      ...
    }
  }

  @Throws(IOException::class)
  fun createRequestBody(request: Request, duplex: Boolean): Sink {
    this.isDuplex = duplex
    val contentLength = request.body!!.contentLength()
    eventListener.requestBodyStart(call)
    val rawRequestBody = codec.createRequestBody(request, contentLength)
    return RequestBodySink(rawRequestBody, contentLength)
  }

  @Throws(IOException::class)
  fun flushRequest() {
    try {
      codec.flushRequest()
    } catch (e: IOException) {
      ...
    }
  }

  @Throws(IOException::class)
  fun finishRequest() {
    try {
      codec.finishRequest()
    } catch (e: IOException) {
      ...
    }
  }

  @Throws(IOException::class)
  fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
    try {
      val result = codec.readResponseHeaders(expectContinue)
      result?.initExchange(this)
      return result
    } catch (e: IOException) {
      ...
    }
  }

  fun responseHeadersEnd(response: Response) {
    eventListener.responseHeadersEnd(call, response)
  }

  @Throws(IOException::class)
  fun openResponseBody(response: Response): ResponseBody {
    try {
      val contentType = response.header("Content-Type")
      val contentLength = codec.reportedContentLength(response)
      val rawSource = codec.openResponseBodySource(response)
      val source = ResponseBodySource(rawSource, contentLength)
      return RealResponseBody(contentType, contentLength, source.buffer())
    } catch (e: IOException) {
      ...
    }
  }

  @Throws(SocketException::class)
  fun newWebSocketStreams(): RealWebSocket.Streams {
    call.timeoutEarlyExit()
    return codec.connection.newWebSocketStreams(this)
  }

  fun noNewExchangesOnConnection() {
    codec.connection.noNewExchanges()
  }

  fun cancel() {
    codec.cancel()
  }
...
}

Where it all comes together

ConnectInterceptor obtains the Exchange, and the chain of responsibility carries it down to CallServerInterceptor. As the last link in the chain, that interceptor sends the request to the server and receives the response, as shown in the source code below. As you can see, the network request that ConnectInterceptor set up is ultimately executed through the Exchange.

class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body
    val sentRequestMillis = System.currentTimeMillis()

    exchange.writeRequestHeaders(request)

    var invokeStartEvent = true
    var responseBuilder: Response.Builder? = null
    if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
        exchange.flushRequest()
        responseBuilder = exchange.readResponseHeaders(expectContinue = true)
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
      if (responseBuilder == null) {
        if (requestBody.isDuplex()) {
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest()
          val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
          requestBody.writeTo(bufferedRequestBody)
        } else {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
          requestBody.writeTo(bufferedRequestBody)
          bufferedRequestBody.close()
        }
      } else {
        exchange.noRequestBody()
        if (!exchange.connection.isMultiplexed) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.
          exchange.noNewExchangesOnConnection()
        }
      }
    } else {
      exchange.noRequestBody()
    }

    if (requestBody == null || !requestBody.isDuplex()) {
      exchange.finishRequest()
    }
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
    }
    var response = responseBuilder
        .request(request)
        .handshake(exchange.connection.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
    var code = response.code
    if (code == 100) {
      // Server sent a 100-continue even though we did not request one. Try again to read the actual
      // response status.
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
      }
      response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      code = response.code
    }

    exchange.responseHeadersEnd(response)

    response = if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response.newBuilder()
          .body(EMPTY_RESPONSE)
          .build()
    } else {
      response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build()
    }
    if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
        "close".equals(response.header("Connection"), ignoreCase = true)) {
      exchange.noNewExchangesOnConnection()
    }
    if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
      throw ProtocolException(
          "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
    }
    return response
  }
}
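
The interceptor above is long, so it may help to see only the order of Exchange calls on the simplest path. The sketch below is a simplification under stated assumptions: no "Expect: 100-continue" header, no duplex body, and no WebSocket upgrade. ToyCallLog and callServer are hypothetical names, and the recorder only logs step names instead of performing I/O.

```kotlin
// Hypothetical recorder standing in for Exchange; it only logs the order of calls.
class ToyCallLog {
    val steps = mutableListOf<String>()
    fun step(name: String) { steps += name }
}

// Simplified happy path of the interceptor: headers, optional body, finish, then read.
fun callServer(log: ToyCallLog, hasBody: Boolean): List<String> {
    log.step("writeRequestHeaders")
    if (hasBody) {
        log.step("createRequestBody") // the body is written through the returned sink
    } else {
        log.step("noRequestBody")
    }
    log.step("finishRequest")
    log.step("readResponseHeaders")
    log.step("responseHeadersStart")
    log.step("responseHeadersEnd")
    log.step("openResponseBody")
    return log.steps
}
```

Calling callServer(ToyCallLog(), hasBody = false) returns the steps a body-less request such as a GET goes through. Every one of them is a method on Exchange, which in turn delegates the I/O to the codec.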
