background

Recently, I reorganized the source code of Okhttp. I also wrote an article about Okhttp source code before, I think that will not be in-depth understanding of Okhttp, so this is still like fried salted rice? No – no – no, this time I will arrange some essence part, let everybody to learn something, as shown in the title, the topic to be discussed is how the connection pool Okhttp work, as well as its work principle, why want to finishing this article, because the connection pool Okhttp big may be asked during the interview process, thus summed up here, For your reference.

In order to better understand the process of initiating synchronization and asynchrony, I have drawn a sketch for you. If there are any inaccuracies, please point out:

  • We know that the Okhttp object passed through okhttpClient is initialized from the Builder object,The Builder is used here in Builder mode. The Builder mode basically isolates the initialization of properties from the external class, while the initialization of properties is handed over to the inner class Buidler class. The advantage of this is that the external class does not care about the initialization of properties.And there are at initializationinterceptors,networkInterceptorsInitialization of both interceptors, andDispatcher (dispatcher)Initialization, and what needs to be covered laterThe cache (cache)Initialization, etc.
  • After initialization through the builder build method to constructokhttpClientObject, which is called a client class, returns a RealCall object through its newCall method. Request information is required during the newCall process. Request information wraps the URL, Method, headers, and body information. Finally delivered via RealCall’s synchronous or asynchronous methodsokhttpClientthedispatcherIs checked before processing synchronous or asynchronousexecuted, so we cannot call asynchronous or synchronous methods on the same RealCall.
  • RealCall is wrapped as one when asynchronousAsyncCall, which is a runnable object. Then it comes to the dispenser asynchronous processing section, which will start with theAsyncCallTo join thereadyAsyncCallsWhich represents the set of requests for the preparation phase, followed by theRunningAsyncCalls (this collection holds collections to be requested soon)andreadyAsyncCallsSet to find the same hostAsyncCallIf found, the number of the same host recorded in the record will be given to theAsyncCall.Note that the number of hosts is recorded using the AtomicInteger
  • It then determines whether the maximum number of requests is greater than 64 and whether there are more than 5 hosts of the same host.This is also an okHTTP interviewIf both pass, the currentAsyncCallIncrement the number of host records of the same value by one, and then add torunningAsyncCallsIn the set, and then iterate over what just met the conditionAsyncCall, through the thread poolAsyncCallNotice hereThe thread pool configuration is not the core thread, there is no limit to the total number of threads, are the core thread that is to say, there is no limit to the number and, non-core threads waiting time is 60 seconds, and use task queue is SynchronousQueue will, it is a blocking queue, there is no capacity only be inside have no task, To the task in there, when it finished, can only be given away to put, such as its mission. Is this the Executors JDK provided inside newCachedThreadPool thread pool, may be okhttp want to define the parameters of the thread factory, define the name of the thread.
  • And that’s why I’m going into the child thread here, becauseAsyncCallIs a runnable, so eventually the execution comes to its run method, and the run method will eventually go toexecuteMethod, which goes to the interceptor part of okHTTP’s most interesting single-list structure, which assembs all interceptors into a collection and passes it toRealInterceptorChaintheprocessMethod, in which the next one is handled firstRealInterceptorChainInitialize it out, and then take the next oneRealInterceptorChainTo the current InterceptorinterceptMethod, and eventually a response is returned toAsyncCalltheexecuteMethods.
  • Process the currentAsyncCallAfter, will hand overdispatcher, it will put theAsyncCallSubtract one from the host number of therunningAsyncCallsFrom the set, and then from the setreadyAsyncCallsTake the rest of the setAsyncCallContinue until the execution is completereadyAsyncCallsThe inside of theAsyncCall.

For okHTTP making asynchronous and synchronous requests, see okHTTP source code here

Or look at the okhttpOkHttp grand process analysis I analyzed earlier

This is how okHTTP is executed, but the most important part is the interceptor part. I will introduce the connection pool part of the interceptor in this chapter, and introduce it in source code form:

Meaning of connection pooling

  • Frequent attempts to establish Sokcet (TCP three-way handshake) and disconnect sockets (TCP four-way breakup) are costly and time consuming. Keepalive connections in HTTP play an important role in reducing latency and increasing speed.
  • Reusing connections requires connection management, which introduces the concept of connection pooling.
  • Okhttp supports five concurrent KeepAlive connections. The default link life is 5 minutes (the time that a link remains alive after it is idle). The connection pool is implemented by ConectionPool to recycle and manage connections.

The profile

The connection pool part is mainly in the RealConnectionPool class, which stores all connections using Connections. CleanupRunnable is a RealConnection specifically designed to clear timeouts. If there is a cleanup task, there must be a cleanup thread pool. KeepAliveDurationNs indicates the keep-alive time of each connection, which is 5 minutes by default. MaxIdleConnections specifies the maximum number of connections in the pool, which is 5 by default. The fpga field in RealConnection is used to store the number of transmitters in the connection, and the number of transmitters in it can be used to mark whether the RealConnection is in use.

Note: If you just want to know how to ask questions about the connection pool during the interview, you can read the end of the article

Source code analysis

1.RealCall#getResponseWithInterceptorChain

Interceptors assembly is in the RealCall getResponseWithInterceptorChain method into the collection:

Response getResponseWithInterceptorChain () throws IOException {/ / all set of interceptors assembly List < Interceptor > interceptors = new ArrayList<>(); interceptors.addAll(client.interceptors()); interceptors.add(new RetryAndFollowUpInterceptor(client)); interceptors.add(new BridgeInterceptor(client.cookieJar())); interceptors.add(new CacheInterceptor(client.internalCache())); Add (new ConnectInterceptor(client)); if (! forWebSocket) { interceptors.addAll(client.networkInterceptors()); } interceptors.add(new CallServerInterceptor(forWebSocket)); Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this, client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis()); try { Response response = chain.proceed(originalRequest); return response; } catch (IOException e) { } finally { } } }Copy the code

2.ConnectInterceptor#intercept

The connection pool is wrapped up in the ConnectInterceptor, so let’s take a look inside:

@Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); Realchain.transmitter (); // Connect to link = realChain.transmitter(); Boolean doExtensiveHealthChecks =! request.method().equals("GET"); Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks); return realChain.proceed(request, transmitter, exchange); }Copy the code

3.Transmitter#newExchange

Get the Transmitter object of chain, which is the wrapper class of RealCall, okhttpClient, connectionPool, etc., and pass the identification of whether the Transmitter is not a GET request to the newExchange method:

Exchange newExchange(Interceptor.Chain chain, Boolean doExtensiveHealthChecks) {//exchangeFinder is a package for connectionPool, connectionPool, request, etc. It is initialized in RetryAndFollowUpInterceptor blocker ExchangeCodec codec = exchangeFinder. Find (client, chain, doExtensiveHealthChecks); Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec); synchronized (connectionPool) { this.exchange = result; return result; }}Copy the code

4.ExchangeCodec#find

This method hands the acquisition of exchange ecodec to an exchangeFinder object, which is an interface and implements classes such as http2exchange ecodec and Http1ExchangeCodec, These two classes of http1 and http2 said the connection is established, which realized the writeRequestHeaders and createRequestBody method, these two methods are used in CallServerInterceptor blocker. ExchangeFinder is the connectionPool, connectionPool, request information such as packaging, it is initialized in RetryAndFollowUpInterceptor blocker. Let’s move on to exchangeFinder’s find method:

public ExchangeCodec find(
    OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
  int connectTimeout = chain.connectTimeoutMillis();
  int readTimeout = chain.readTimeoutMillis();
  int writeTimeout = chain.writeTimeoutMillis();
  int pingIntervalMillis = client.pingIntervalMillis();
  boolean connectionRetryEnabled = client.retryOnConnectionFailure();

  try {
    RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
        writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
    return resultConnection.newCodec(client, chain);
  } catch (RouteException e) {
  } catch (IOException e) {

  }
}
Copy the code

5.ExchangeCodec#findHealthyConnection

There’s nothing to say about this method, just look at the findHealthyConnection method:

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, Boolean doExtensiveHealthChecks) throws IOException {// Start a loop to search for qualified RealConnection while (true) {// Search for connections within the current keep-alive period RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled); // If this is a brand new connection, We can skip the extensive health checks. Synchronized (connectionPool) { If (candidate.successCount == 0) {return candidate; }} // If the socket connection breaks or fails, it is considered not a valid connection. candidate.isHealthy(doExtensiveHealthChecks)) { candidate.noNewExchanges(); continue; } return candidate; }}Copy the code

6.ExchangeCodec#findConnection

This method starts a loop to find connections that meet the keep-alive duration, and then determines whether the connection failed during the CallServerInterceptor interceptor’s processing or if the socket connection for the connection was broken and is not considered a valid connection. We’ll focus on how findConnection handles keep-alive connections:

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException { boolean foundPooledConnection = false; RealConnection result = null; Route selectedRoute = null; RealConnection releasedConnection; Socket toClose; synchronized (connectionPool) { if (transmitter.isCanceled()) throw new IOException("Canceled"); hasStreamFailure = false; Route previousRoute = retryCurrentRoute()? transmitter.connection.route() : null; ReleasedConnection = transmitter. Connection; toClose = transmitter.connection ! = null && transmitter.connection.noNewExchanges ? transmitter.releaseConnectionNoEvents() : null; // Check whether there is a transmitter. Connection! = null) { result = transmitter.connection; releasedConnection = null; } / / if the above didn't find from the connection pool for connecting the if (result = = null) {if (connectionPool. TransmitterAcquirePooledConnection (address, transmitter, null, false)) { foundPooledConnection = true; result = transmitter.connection; } else { selectedRoute = previousRoute; } } } closeQuietly(toClose); // If (result! = null) { return result; } // Check for new routes Boolean newRouteSelection = false; if (selectedRoute == null && (routeSelection == null || ! routeSelection.hasNext())) { newRouteSelection = true; routeSelection = routeSelector.next(); } List<Route> routes = null; Synchronized (connectionPool) {if (newRouteSelection) {routes = routeselection.getall (); / / again from the connection pool for the if (connectionPool. TransmitterAcquirePooledConnection (address, transmitter, routes, false)) { foundPooledConnection = true; result = transmitter.connection; }} // The connection was not found from the pool. FoundPooledConnection result = new RealConnection(connectionPool, selectedRoute); connectingConnection = result; }} / / find return the connection from the connection pool if (foundPooledConnection) {eventListener. ConnectionAcquired (call, result); return result; } result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener); connectionPool.routeDatabase.connected(result.route()); Socket socket = null; synchronized (connectionPool) { connectingConnection = null; / / to continue from the connection pool to find a time to see if any eligible connection pool (connectionPool. TransmitterAcquirePooledConnection (address, transmitter, routes, true)) { result.noNewExchanges = true; socket = result.socket(); result = transmitter.connection; } else {// add the current connection to the connectionPool if the condition is not met connectionpool.put (result); } } closeQuietly(socket); return result; }Copy the code

First will determine whether you are assigned first connection, if not, from the connection pool to find qualified connection, through connectionPool. TransmitterAcquirePooledConnection to find, if found, return true After that, the connection will be put into the transmitter. If there is no connection in the connection pool, the connection will be created. After that, the TCP connection will be prepared, and then the connection will be obtained again from the connection pool. This involves obtaining okHTTP proxies and routes, which I do not understand very well, so I will not describe here

7.RealConnectionPool#transmitterAcquirePooledConnection

We know above, through the connectionPool. TransmitterAcquirePooledConnection get connected, through the connectionPool. Put puts the connection to the connection pool, we first to see how to obtain the effective connection:

boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter, @Nullable List<Route> routes, Boolean requireed) {// Connections is a Deque queue, for (RealConnection connection: Connections) {// default requireMultiplexed to false if (requireMultiplexed &&! connection.isMultiplexed()) continue; // Check whether the current connection is qualified if (! connection.isEligible(address, routes)) continue; // Assign a value to the transmitter connection object // Add the transmitter object to the connection for short contact transmitter.acquireConnectionNoEvents(connection); return true; } return false; }Copy the code

The connection pool storage space in okHTTP is a queue, and the queue is an ArrayDeque, which is a double-endian queue:

private final Deque<RealConnection> connections = new ArrayDeque<>();
Copy the code

8.RealConnection#isEligible

If the connection is not qualified, just skip the connection, then the operation of the transmitter acquireConnectionNoEvents method, respectively, we see the isEligible (acceptance) and acquireConnectionNoEvents method:

boolean isEligible(Address address, @nullable List<Route> routes) {// If the host of the url is the same as the host of the Route of the current connection, the connection is considered qualified (address.url().host().equals(this.route().address().url().host())) { return true; } return true; }Copy the code

I have removed the other criteria to determine whether the connection is qualified, leaving only the host comparison code. If two connection hosts are the same, it is considered to be the connection to be found, otherwise it is not.

9.Transmitter#acquireConnectionNoEvents

Void acquireConnectionNoEvents (RealConnection connection) {/ / this. In the current connection to the Transmitter connection = connection; // Add a transmitter for the current connected CCD, note that this is packaged by weak reference, Pay attention to the collection will be useful to the connection. Behind the clean transmitters. The add (new TransmitterReference (this, callStackTrace)); }Copy the code

The current connection is attached to the current Transmitter connection, because it will be returned to Result later. Then add a weakly referenced CCD object to the current connected CCD set, It has been said in the beginning that the number of CCD set of realConnection is for determining whether the realConnection is being used when clean.

If the above conditions are met, the transmitterAcquirePooledConnection returns true, says from the connection pool to find realConnection succeeded.

10.RealConnectionPool#put

RealConnection = RealConnectionPool = RealConnectionPool = RealConnectionPool = RealConnectionPool = RealConnectionPool = RealConnectionPool = RealConnectionPool = RealConnectionPool

void put(RealConnection connection) { assert (Thread.holdsLock(this)); // If there is no work being cleared if (! cleanupRunning) { cleanupRunning = true; Executor. Execute (cleanupRunnable); } // Add the connection to the queue. }Copy the code

11.RealConnectionPool#cleanupRunnable

The put process is very simple. It does the cleanup and then the add. Let’s take a look at the cleanup and go straight to the definition of cleanupRunnable:

Private Final Runnable cleanupRunnable = () -> {while (true) { And returns how long to wait for the next cleanup long waitNanos = cleanup(system.nanotime ()); If (waitNanos == -1) return; if (waitNanos > 0) { long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); Synchronized (RealConnectionPool. This) {try {// synchronized (RealConnectionPool. Waiting time is waitMillis then continue cleaning operation RealConnectionPool. This. Wait (waitMillis waitNanos (int)); } catch (InterruptedException ignored) { } } } } };Copy the code

This method returns the time to wait for the next cleanup operation. If it returns -1, there are no realconnections to cleanup. Specify how many seconds to wait for the thread to continue the cleanup operation by synchronizing the code block.

12.RealConnectionPool#cleanup

Ok, let’s focus on the cleanup method:

long cleanup(long now) { int inUseConnectionCount = 0; int idleConnectionCount = 0; RealConnection longestIdleConnection = null; long longestIdleDurationNs = Long.MIN_VALUE; synchronized (this) { for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) { RealConnection connection = i.next(); / / have to judge the current connection is using the if (pruneAndGetAllocationCount (connection, now) > 0) {/ / if true when using inUseConnectionCount + 1, And directly out of the loop inUseConnectionCount++; continue; } // if the connection is not in use, add 1 to the variable idleConnectionCount++; Long idleDurationNs = now-connection. idleAtNanos; If (idleDurationNs > longestIdleDurationNs) {longestIdleDurationNs = idleDurationNs; longestIdleConnection = connection; }} // Remove if (longestIdleDurationNs >=) from the connection pool if the maximum number of connections alive exceeds the keep-alive time or the number of connections not in use exceeds maxIdleConnections this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections) { connections.remove(longestIdleConnection); } else if (idleConnectionCount > 0) {// If there are still connections that are not in use, but the keep-alive time does not exceed the maximum number of keep-alive connections, Return keepAliveDurationNs - longestIdleDurationNs; } else if (inUseConnectionCount > 0) {return keepAliveDurationNs;} else if (inUseConnectionCount > 0) {keepAliveDurationNs; } else {// If there is no connection in use, return -1, indicating that the next cleanup operation is not required. return -1; } } closeQuietly(longestIdleConnection.socket()); return 0; }Copy the code

The cleanup method is fairly clear:

  • First of all, according to thepruneAndGetAllocationCountThe idleConnectionCount () method is used to determine whether the connection is in use. If it is, it jumps out of the loop. If not, the idleConnectionCount is increased by 1.
  • Figure out how long the current connection has been alive, and then find the connection that has survived the longest.
  • If the oldest connection is longer than the keep-alive time or the number of connections not in use exceeds maxIdleConnections, it is removed from the connection pool. Note that the method returns 0, and proceed with the next cleanup operation.
  • If there are no connections in use, but the keep-alive time does not exceed the keep-alive time and the number does not exceed the maximum number, returns how long it will take before the next cleanup operation.
  • If all connections are in use, the time set by keep-alive is returned directly, which is 5 minutes.
  • If no connection is in use, -1 is returned, indicating that the next cleanup operation is not required.

13.RealConnectionPool#pruneAndGetAllocationCount

When removing pruneAndGetAllocationCount method use to judge whether the connection is used, do you remember we have talked about above realConnection transmitters of collection, it is to store the connection object of transmitter, Here’s a look:

private int pruneAndGetAllocationCount(RealConnection connection, long now) { List<Reference<Transmitter>> references = connection.transmitters; for (int i = 0; i < references.size(); ) { Reference<Transmitter> reference = references.get(i); // If the Transmitter object is not empty, the link is used. If (reference. Get ()! = null) { i++; continue; } // If it is empty, the current weak reference is removed from the collection. connection.noNewExchanges = true; // If the RealConnection non-contact set is empty, it is not in use and the time for non-contact investigation is set to 0. If (references. IsEmpty ()) {connection.idleatnanos = now-KeepAliveDurationns; return 0; }}Copy the code

14.Transmitter#releaseConnectionNoEvents

In the above analysis, we found that if the CCD Transmitter in RealConnection is empty, it means that the connection is not in use. When is it empty? We can found in releaseConnectionNoEvents method to remove the operation of the Transmitter:

Socket releaseConnectionNoEvents() { int index = -1; / / if the current connection and inside of any of the Transmitter is the same, is found to remove Transmitter for (int I = 0, size = this. Connection. The transmitters. The size (); i < size; i++) { Reference<Transmitter> reference = this.connection.transmitters.get(i); if (reference.get() == this) { index = i; break; } } RealConnection released = this.connection; / / delete the current Transmitter released. Transmitters. Remove (index); this.connection = null; if (released.transmitters.isEmpty()) { released.idleAtNanos = System.nanoTime(); if (connectionPool.connectionBecameIdle(released)) { return released.socket(); } } return null; }Copy the code

This method is a method for determining the deletion of the Transmitter for CCD in realConnection, which will trigger the deletion of Transmitter every time it enters the closed connection. Therefore, it can be seen that the number of CCD in connection decreases when the connection is closed. If the number of non-contact fields in the connection decreases to 0, the connection is not in use and it can be removed from the connection pool during the cleaning process. In we create RealConnection connection of and access to the connection pool will be called when the Transmitter acquireConnectionNoEvents method, This method will add a Transmitter to the transmitters RealConnection set a weak reference object, this also happens to echo and releaseConnectionNoEvents method.

conclusion

So much for the source code part of the connection pool, let’s summarize the connection pool related issues:

  • Connection pooling is designed to solve the problem of frequently establishing Sokcet connections (TCP three-way handshake) and disconnecting sockets (TCP four-way breakup).
  • Okhttp connection pools support keep-alive connections for up to five links, and the default keep-alive duration is 5 minutes.
  • The connection pool implementation class is RealConnectionPool, which is responsible for the storage and cleanup through ArrayDeque’s two-ended queue, removing tasks assigned to the thread pool to process cleanupRunnable.
  • RealConnection is given to RealConnection each time it is created or fetched from the connection pool

For the CCD set, a reference to the transmitter object is added mainly for determining later whether the connection is in use

  • Connections with the same host in the connection pool are compared when looking for connections in the connection pool.
  • If a connection cannot be found in the connection pool, it is created and stored in the connection pool.
  • When the connection is placed into the connection pool, the cleaning task will be put into the thread pool for execution. In the deletion task, it will determine whether the current connection is in use and whether the size of the CCD set based on RealConnection is 0. If not, Locate the connection with the longest idle time. If the connection with the longest idle time exceeds the default keep-alive value of 5 minutes or the number of idle connections exceeds the maximum keep-alive value of 5 minutes, the connection with the longest alive time will be deleted from the connection pool. Ensure the maximum idle time and maximum number of connections for keep-alive.