Preface
I recently decided to analyze the mainstream Android open source frameworks in depth and write a series of articles covering both the detailed usage and the source code of each framework. The goal is to understand the underlying principles of each framework by reading code written by masters, that is, to know not only how it works but also why.
Here is how I approach reading source code. I follow the same flow as everyday use of a framework or system: first learn how to use it, then dig into what each step does underneath, which design patterns it uses, and why it was designed that way.
Series of articles:
- Android mainstream open source frameworks (1): OkHttp - HttpClient and HttpURLConnection usage details
- Android mainstream open source frameworks (2): OkHttp usage details
- Android mainstream open source frameworks (3): OkHttp source code analysis
- Android mainstream open source frameworks (4): Retrofit usage details
- Android mainstream open source frameworks (5): Retrofit source code analysis
- Android mainstream open source frameworks (6): Glide execution process source code analysis
- More frameworks to come…
Check out AndroidNotes for more practical content.
1. Basic usage examples of OkHttp
1.1 Synchronous GET request
// (1) Create the OkHttpClient object
OkHttpClient client = new OkHttpClient();
// (2) Create the Request object
Request request = new Request.Builder()
    .url(url)
    .build();
// (3) Create the Call object
Call call = client.newCall(request);
// (4) Send the request and get the data returned by the server
Response response = call.execute();
// (5) Read the response data
String data = response.body().string();
1.2 Asynchronous GET request
// (1) Create the OkHttpClient object
OkHttpClient client = new OkHttpClient();
// (2) Create the Request object
Request request = new Request.Builder()
    .url(url)
    .build();
// (3) Create the Call object
Call call = client.newCall(request);
// (4) Send the request and get the data returned by the server
call.enqueue(new Callback() {
  @Override
  public void onFailure(Call call, IOException e) {
  }

  @Override
  public void onResponse(Call call, Response response) throws IOException {
    // (5) Read the response data
    String data = response.body().string();
  }
});
As you can see, basic use of OkHttp takes only 5 steps, whether the request is synchronous or asynchronous. The only difference is step (4): a synchronous request calls execute(), while an asynchronous request calls enqueue(). Below, we read the source code following these 5 steps.
For more details on using OkHttp, see my article Android mainstream open source frameworks (2): OkHttp usage details.
2. OkHttp source code analysis
Source version: 3.11.0
2.1 (1) Create the OkHttpClient object
OkHttpClient client = new OkHttpClient();
An OkHttpClient object is created here. Click into the constructor:
/*OkHttpClient*/
public OkHttpClient() {
  this(new Builder());
}
It then calls the constructor that takes a Builder:
/*OkHttpClient*/
OkHttpClient(Builder builder) {
this.dispatcher = builder.dispatcher;
this.proxy = builder.proxy;
this.protocols = builder.protocols;
this.connectionSpecs = builder.connectionSpecs;
this.interceptors = Util.immutableList(builder.interceptors);
this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
this.eventListenerFactory = builder.eventListenerFactory;
this.proxySelector = builder.proxySelector;
this.cookieJar = builder.cookieJar;
this.cache = builder.cache;
this.internalCache = builder.internalCache;
this.socketFactory = builder.socketFactory;
boolean isTLS = false;
for (ConnectionSpec spec : connectionSpecs) {
isTLS = isTLS || spec.isTls();
}
if (builder.sslSocketFactory != null || !isTLS) {
this.sslSocketFactory = builder.sslSocketFactory;
this.certificateChainCleaner = builder.certificateChainCleaner;
} else {
X509TrustManager trustManager = Util.platformTrustManager();
this.sslSocketFactory = newSslSocketFactory(trustManager);
this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
}
if (sslSocketFactory != null) {
Platform.get().configureSslSocketFactory(sslSocketFactory);
}
this.hostnameVerifier = builder.hostnameVerifier;
this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
certificateChainCleaner);
this.proxyAuthenticator = builder.proxyAuthenticator;
this.authenticator = builder.authenticator;
this.connectionPool = builder.connectionPool;
this.dns = builder.dns;
this.followSslRedirects = builder.followSslRedirects;
this.followRedirects = builder.followRedirects;
this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
this.connectTimeout = builder.connectTimeout;
this.readTimeout = builder.readTimeout;
this.writeTimeout = builder.writeTimeout;
this.pingInterval = builder.pingInterval;
if (interceptors.contains(null)) {
throw new IllegalStateException("Null interceptor: " + interceptors);
}
if (networkInterceptors.contains(null)) {
throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
}
}
As you can see, the constructor initializes a lot of fields. The Builder pattern is used here, so these fields can be configured through OkHttpClient.Builder before calling build(). If nothing is configured, the defaults set in the Builder's no-argument constructor are used. The meaning of each field is as follows:
/*OkHttpClient*/
public Builder() {
  dispatcher = new Dispatcher(); // Dispatcher
  protocols = DEFAULT_PROTOCOLS; // HTTP protocols
  connectionSpecs = DEFAULT_CONNECTION_SPECS; // Transport layer (TLS) versions and connection specs
  eventListenerFactory = EventListener.factory(EventListener.NONE); // Event listener factory
  proxySelector = ProxySelector.getDefault(); // Proxy selector
  cookieJar = CookieJar.NO_COOKIES; // Cookie store
  socketFactory = SocketFactory.getDefault(); // Socket factory
  hostnameVerifier = OkHostnameVerifier.INSTANCE; // Host name verifier
  certificatePinner = CertificatePinner.DEFAULT; // Certificate pinning
  proxyAuthenticator = Authenticator.NONE; // Proxy authentication
  authenticator = Authenticator.NONE; // Origin server authentication
  connectionPool = new ConnectionPool(); // Connection pool
  dns = Dns.SYSTEM; // DNS
  followSslRedirects = true; // Whether to follow SSL redirects
  followRedirects = true; // Whether to follow redirects
  retryOnConnectionFailure = true; // Whether to retry when the connection fails
  connectTimeout = 10_000; // Connect timeout (ms)
  readTimeout = 10_000; // Read timeout (ms)
  writeTimeout = 10_000; // Write timeout (ms)
  pingInterval = 0; // Interval between HTTP/2 and web socket pings
}
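To make the role of the Builder concrete, here is a minimal sketch of the same pattern. MiniClient and its three fields are hypothetical stand-ins, not OkHttp classes. Only the idea is taken from the code above: defaults are set in the builder, and the client copies them into fields that never change afterwards.

```java
// Hypothetical miniature of the Builder pattern; MiniClient is not an OkHttp class.
final class MiniClient {
    final int connectTimeoutMillis;
    final int readTimeoutMillis;
    final boolean followRedirects;

    // The client copies every value from the builder and is immutable afterwards.
    private MiniClient(Builder builder) {
        this.connectTimeoutMillis = builder.connectTimeoutMillis;
        this.readTimeoutMillis = builder.readTimeoutMillis;
        this.followRedirects = builder.followRedirects;
    }

    // Counterpart of "new OkHttpClient()": a builder with every default untouched.
    MiniClient() {
        this(new Builder());
    }

    static final class Builder {
        // Defaults live in the builder, mirroring the values listed above.
        int connectTimeoutMillis = 10_000;
        int readTimeoutMillis = 10_000;
        boolean followRedirects = true;

        Builder connectTimeoutMillis(int millis) {
            if (millis < 0) throw new IllegalArgumentException("timeout < 0");
            this.connectTimeoutMillis = millis;
            return this;
        }

        Builder followRedirects(boolean follow) {
            this.followRedirects = follow;
            return this;
        }

        MiniClient build() {
            return new MiniClient(this);
        }
    }
}
```

With this shape, new MiniClient() gets every default, while new MiniClient.Builder().connectTimeoutMillis(3_000).build() overrides one value and keeps the rest.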
2.2 (2) Creating a Request Object
Request request = new Request.Builder()
.url(url)
.build();
As you can see, the Builder pattern is used here as well. Click into Request to see:
/*Request*/
// ...
final HttpUrl url;
final String method;
final Headers headers;
final @Nullable RequestBody body;
final Map<Class<?>, Object> tags;
// ...
Request turns out to be simple: it just holds the URL, method, headers, body and tags of a request.
2.3 (3) Creating a Call object
Call call = client.newCall(request);
Let’s click on the newCall() method to see:
/*OkHttpClient*/
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
newCall() calls RealCall's newRealCall() method, passing in the OkHttpClient and the Request object.
Follow it into the newRealCall() method:
/*RealCall*/
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
This creates a RealCall object and returns it to the caller. RealCall is the implementation of the Call interface, which defines the request-related operations such as synchronous and asynchronous execution and cancellation. All later request operations therefore go through the methods declared by Call, and the work is actually done by its implementation class, RealCall.
Finally, look at the RealCall constructor. It is fairly simple: it only assigns a few fields and creates the retry and redirect interceptor (RetryAndFollowUpInterceptor), which is covered later:
/*RealCall*/
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}
2.4 (4) Send the request and get the data returned by the server
As we said earlier, the only difference between synchronous and asynchronous requests is step (4): the former calls the synchronous method execute() and the latter calls the asynchronous method enqueue(). So we analyze the two cases separately.
2.4.1 Synchronous request
Response response = call.execute();
Let’s click on the execute() method to see:
/*RealCall*/
@Override public Response execute() throws IOException {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  eventListener.callStart(this);
  try {
    client.dispatcher().executed(this); // (1)
    Response result = getResponseWithInterceptorChain(); // (2)
    if (result == null) throw new IOException("Canceled");
    return result;
  } catch (IOException e) {
    eventListener.callFailed(this, e);
    throw e;
  } finally {
    client.dispatcher().finished(this); // (3)
  }
}
Marker (1) calls executed(), which adds the RealCall to a double-ended queue:
/*Dispatcher*/
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
Here runningSyncCalls is a double-ended queue (deque) that records the synchronous calls currently running:
/*Dispatcher*/
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
Marker (2) returns a Response, that is, the data returned by the server, so this is where the request is actually executed. It is the focus of the later sections.
Click into the finished() method at marker (3):
/*Dispatcher*/
void finished(RealCall call) {
  finished(runningSyncCalls, call, false);
}

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
  int runningCallsCount;
  Runnable idleCallback;
  synchronized (this) {
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!"); // (1)
    if (promoteCalls) promoteCalls();
    runningCallsCount = runningCallsCount();
    idleCallback = this.idleCallback;
  }
  if (runningCallsCount == 0 && idleCallback != null) {
    idleCallback.run();
  }
}
As you can see, calls.remove(call) at marker (1) simply removes the current RealCall from the queue of running synchronous calls, which marks the request as finished.
You may have noticed the dispatcher, which has not been explained yet. It is the Dispatcher, the class that dispatches requests. As just analyzed, for synchronous requests the dispatcher only records the call in the queue of running synchronous calls and removes it when the request finishes. The dispatcher is mainly used for asynchronous requests, which are covered next.
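The synchronous path described so far can be modelled in a few lines. The sketch below is only an illustration under simplifying assumptions: MiniCall and MiniDispatcher are hypothetical names, and a Supplier stands in for the interceptor chain. It shows the two behaviours seen in the source: a call can be executed only once, and it sits in the running queue only while it executes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical model of the synchronous path; none of these types are OkHttp classes.
final class SyncCallDemo {
    static final class MiniDispatcher {
        private final Deque<MiniCall> runningSyncCalls = new ArrayDeque<>();

        synchronized void executed(MiniCall call) {
            runningSyncCalls.add(call);
        }

        synchronized void finished(MiniCall call) {
            if (!runningSyncCalls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
        }

        synchronized int runningCallsCount() {
            return runningSyncCalls.size();
        }
    }

    static final class MiniCall {
        private final MiniDispatcher dispatcher;
        private final Supplier<String> work; // stands in for the interceptor chain
        private boolean executed;

        MiniCall(MiniDispatcher dispatcher, Supplier<String> work) {
            this.dispatcher = dispatcher;
            this.work = work;
        }

        String execute() {
            synchronized (this) {
                // A call is single-use, exactly like the "executed" flag in RealCall.
                if (executed) throw new IllegalStateException("Already Executed");
                executed = true;
            }
            dispatcher.executed(this);     // record the call as running
            try {
                return work.get();         // do the actual work
            } finally {
                dispatcher.finished(this); // always remove it, even on failure
            }
        }
    }
}
```

Calling execute() a second time on the same MiniCall throws IllegalStateException, which is why a new Call has to be created for every request.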
2.4.2 Asynchronous Request
call.enqueue(new Callback() {
  @Override
  public void onFailure(Call call, IOException e) {
  }

  @Override
  public void onResponse(Call call, Response response) throws IOException {
  }
});
Let’s click on the enqueue() method to see:
/*RealCall*/
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback)); // (1)
}
The first few lines are the same as in the synchronous request. Click into the enqueue() method at marker (1):
/*Dispatcher*/
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
The Dispatcher has several fields devoted to asynchronous calls, which confirms that it is mainly used for asynchronous requests. Let's look at the fields of the Dispatcher:
/*Dispatcher*/
// Maximum number of concurrent requests
private int maxRequests = 64;
// Maximum number of requests per host
private int maxRequestsPerHost = 5;
// The callback that is called each time the scheduler becomes idle
private @Nullable Runnable idleCallback;
// The thread pool used to execute asynchronous tasks
private @Nullable ExecutorService executorService;
// Queue of asynchronous calls waiting to run
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
// Queue of running asynchronous calls
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
// Queue of running synchronous calls
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
With these fields in mind, the enqueue() method at marker (1) above is easy to follow: if the number of running asynchronous calls is below the maximum number of concurrent requests, and the number of running calls to the same host is below the per-host maximum, the call is added to the queue of running asynchronous calls and executed on the thread pool. Otherwise it is added to the queue of asynchronous calls waiting to run.
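The admission rule just described can be tried out in isolation. The following is a minimal sketch, assuming a call is identified only by its host. MiniDispatcher and MiniAsyncCall are hypothetical names, and the thread pool is left out so that only the queueing decision remains.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the dispatcher's admission rule; not OkHttp code.
final class AsyncQueueDemo {
    static final class MiniAsyncCall {
        final String host;

        MiniAsyncCall(String host) {
            this.host = host;
        }
    }

    static final class MiniDispatcher {
        private final int maxRequests;
        private final int maxRequestsPerHost;
        final Deque<MiniAsyncCall> readyAsyncCalls = new ArrayDeque<>();
        final Deque<MiniAsyncCall> runningAsyncCalls = new ArrayDeque<>();

        MiniDispatcher(int maxRequests, int maxRequestsPerHost) {
            this.maxRequests = maxRequests;
            this.maxRequestsPerHost = maxRequestsPerHost;
        }

        // Returns true if the call may start right away, false if it has to wait.
        synchronized boolean enqueue(MiniAsyncCall call) {
            if (runningAsyncCalls.size() < maxRequests
                    && runningCallsForHost(call) < maxRequestsPerHost) {
                // The real dispatcher would also hand the call to its thread pool here.
                runningAsyncCalls.add(call);
                return true;
            }
            readyAsyncCalls.add(call);
            return false;
        }

        // Counts the running calls that target the same host as the given call.
        private int runningCallsForHost(MiniAsyncCall call) {
            int result = 0;
            for (MiniAsyncCall running : runningAsyncCalls) {
                if (running.host.equals(call.host)) result++;
            }
            return result;
        }
    }
}
```

With the default limits of 64 and 5, a sixth simultaneous call to the same host goes to the waiting queue, while a call to a different host still starts immediately.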
What the thread pool executes is an AsyncCall, so let's look at AsyncCall next:
/*RealCall*/
final class AsyncCall extends NamedRunnable {
  private final Callback responseCallback;

  AsyncCall(Callback responseCallback) {
    super("OkHttp %s", redactedUrl());
    this.responseCallback = responseCallback;
  }

  String host() {
    return originalRequest.url().host();
  }

  Request request() {
    return originalRequest;
  }

  RealCall get() {
    return RealCall.this;
  }

  @Override protected void execute() {
    boolean signalledCallback = false;
    try {
      Response response = getResponseWithInterceptorChain(); // (1)
      if (retryAndFollowUpInterceptor.isCanceled()) {
        signalledCallback = true;
        responseCallback.onFailure(RealCall.this, new IOException("Canceled")); // (2)
      } else {
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response); // (3)
      }
    } catch (IOException e) {
      if (signalledCallback) {
        // Do not signal the callback twice!
        Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
      } else {
        eventListener.callFailed(RealCall.this, e);
        responseCallback.onFailure(RealCall.this, e); // (4)
      }
    } finally {
      client.dispatcher().finished(this); // (5)
    }
  }
}
AsyncCall turns out to be an inner class of RealCall. It extends NamedRunnable, which implements Runnable, so the thread pool ends up running its execute() method. This method is similar to the execute() of the synchronous request: markers (1) and (5) are the same. The difference is the extra callbacks. Because the request is asynchronous, the final result has to be delivered through responseCallback to the place where we made the call: (2) and (4) are the failure callbacks and (3) is the success callback.
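The callback handling in AsyncCall can be reduced to a small model. In the sketch below all names (MiniAsyncCall, MiniCallback, Work) are hypothetical, and Work stands in for getResponseWithInterceptorChain(). It keeps the signalledCallback guard, whose purpose is that an exception thrown from onResponse() is never reported through onFailure() as well.

```java
import java.io.IOException;

// Hypothetical model of an asynchronous call; none of these types are OkHttp classes.
final class AsyncCallbackDemo {
    interface MiniCallback {
        void onResponse(String response) throws IOException;

        void onFailure(IOException e);
    }

    // Stands in for getResponseWithInterceptorChain().
    interface Work {
        String run() throws IOException;
    }

    static final class MiniAsyncCall implements Runnable {
        private final Work work;
        private final MiniCallback callback;

        MiniAsyncCall(Work work, MiniCallback callback) {
            this.work = work;
            this.callback = callback;
        }

        @Override public void run() {
            boolean signalledCallback = false;
            try {
                String response = work.run();
                signalledCallback = true;
                callback.onResponse(response); // success callback
            } catch (IOException e) {
                if (signalledCallback) {
                    // onResponse() itself failed; do not signal the callback twice.
                    System.err.println("Callback failure: " + e.getMessage());
                } else {
                    callback.onFailure(e); // failure callback
                }
            }
        }
    }
}
```

Because MiniAsyncCall is a Runnable, it can be handed to an ExecutorService with execute(), which is what the dispatcher does with the real AsyncCall.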
At this point, everything in step (4) of the basic usage has been covered except the getResponseWithInterceptorChain() method, which the rest of this section focuses on.
2.4.3 Interceptors
Click into the getResponseWithInterceptorChain() method:
/*RealCall*/
Response getResponseWithInterceptorChain() throws IOException {
// Create a collection of interceptors
List<Interceptor> interceptors = new ArrayList<>();
// Add a user-defined interceptor
interceptors.addAll(client.interceptors());
// Add retry and redirection interceptors
interceptors.add(retryAndFollowUpInterceptor);
// Add bridge interceptor
interceptors.add(new BridgeInterceptor(client.cookieJar()));
// Add a cache interceptor
interceptors.add(new CacheInterceptor(client.internalCache()));
// Add connection interceptor
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
// Add user-defined network interceptors
interceptors.addAll(client.networkInterceptors());
}
// Add server request interceptor
interceptors.add(new CallServerInterceptor(forWebSocket));
// (1) Establish a chain of responsibility
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
// (2) Handle interceptors in the chain of responsibility
return chain.proceed(originalRequest);
}
As you can see, several interceptors are used here. They are assembled into a chain and then processed one by one. This is the chain of responsibility pattern: each interceptor is responsible for one function, and when it has done its part it hands over to the next interceptor, until the last interceptor finishes and the Response is returned back up layer by layer.
Let's verify that the chain of responsibility works as described, and then look at what each interceptor does. Two markers are placed in the code above. Marker (1) builds the chain and passes in the parameters it needs. The fifth parameter is the index into the chain, and passing 0 means that processing starts with the first interceptor.
Marker (2) processes the interceptors in the chain. Click into the proceed() method:
/*RealInterceptorChain*/
@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// (1) start
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// (1) end
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}
return response;
}
As you can see, apart from some checks, only marker (1) matters. It builds a new chain whose index is increased by one (so that the next interceptor is fetched from the list next time), then fetches the current interceptor from the list and calls its intercept() method with the new chain. If that interceptor can produce a Response by itself, it returns it right away. Otherwise its intercept() method calls proceed() on the chain it was given, which passes processing on to the next interceptor. The source code confirms what we expected, so next let's look at what each interceptor does.
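The index-plus-one mechanism is easier to see in a stripped-down model. The sketch below is an illustration only: requests and responses are plain strings, and MiniChain, MiniInterceptor, tagging() and terminal() are hypothetical names. What it keeps from RealInterceptorChain is the core of proceed(): build the chain for the next index, then call the current interceptor with it.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical miniature of the interceptor chain; not OkHttp code.
final class ChainDemo {
    interface MiniInterceptor {
        String intercept(MiniChain chain);
    }

    static final class MiniChain {
        private final List<MiniInterceptor> interceptors;
        private final int index;
        private final String request;

        MiniChain(List<MiniInterceptor> interceptors, int index, String request) {
            this.interceptors = interceptors;
            this.index = index;
            this.request = request;
        }

        String request() {
            return request;
        }

        String proceed(String request) {
            if (index >= interceptors.size()) throw new AssertionError();
            // Build the chain for the next interceptor, then call the current one with it.
            MiniChain next = new MiniChain(interceptors, index + 1, request);
            MiniInterceptor interceptor = interceptors.get(index);
            String response = interceptor.intercept(next);
            if (response == null) throw new NullPointerException("interceptor returned null");
            return response;
        }
    }

    // Does work before and after handing the request on to the rest of the chain.
    static MiniInterceptor tagging(final String name, final List<String> log) {
        return chain -> {
            log.add(name + " in");
            String response = chain.proceed(chain.request() + ">" + name);
            log.add(name + " out");
            return response;
        };
    }

    // Last interceptor: produces the response itself and never calls proceed().
    static MiniInterceptor terminal(final List<String> log) {
        return chain -> {
            log.add("server");
            return "response to [" + chain.request() + "]";
        };
    }

    static String run(List<String> log) {
        List<MiniInterceptor> interceptors = Arrays.asList(
                tagging("retry", log), tagging("bridge", log), terminal(log));
        return new MiniChain(interceptors, 0, "GET /").proceed("GET /");
    }
}
```

Running ChainDemo.run() with an empty list records the calls in the order retry in, bridge in, server, bridge out, retry out: the request travels down the chain and the response travels back up through the same interceptors in reverse.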
2.4.3.1 Retry and redirect interceptor (RetryAndFollowUpInterceptor)
This interceptor is responsible for retrying after a failure and for following redirects. As the proceed() method above shows, every interceptor is invoked through its intercept() method, so that method is the entry point for reading an interceptor.
The intercept() method of the retry and redirect interceptor is as follows:
/*RetryAndFollowUpInterceptor*/
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
// (1) Create StreamAllocation object to coordinate Connections, Streams, and Calls
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
// Redirection times
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response;
boolean releaseConnection = true;
try {
// (2) Execute the next interceptor
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// (3) If the Route is abnormal, try to restore it
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getFirstConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// (4) An I/O exception occurs
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// If an exception was thrown and not recovered from, release all resources
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// Attach the prior response if there is one; such responses never have a body
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
Request followUp;
try {
// (5) Check if redirection is required. If not, followUp returns null
followUp = followUpRequest(response, streamAllocation.route());
} catch (IOException e) {
streamAllocation.release();
throw e;
}
// (6) If no redirection is required, return the previous response
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
// Close the resource
closeQuietly(response.body());
// If the number of redirects is greater than the maximum, StreamAllocation is released and an exception is thrown
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
// If the request cannot reuse the previous connection, it is released and recreated
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
request = followUp;
priorResponse = response;
}
}
The method is commented in some detail, so let's focus on the markers:
- (1): StreamAllocation is a management class that coordinates the relationships between Connections, Streams, and Calls. It is also given client.connectionPool(), the connection pool that was created together with the OkHttpClient object in step (1). Neither is actually used until the ConnectInterceptor, which is covered later.
Connections: physical socket connections to remote servers. Streams: logical HTTP request/response pairs layered on connections. Calls: a logical sequence of streams, typically an initial request and its follow-up requests.
- (2): Executes the next interceptor. Following the order in which they were added, that is the BridgeInterceptor.
- (3) (4): If a RouteException or an IOException occurs, a retry is attempted. Let's look at the retry-related methods:
/*RetryAndFollowUpInterceptor*/
private boolean recover(IOException e, StreamAllocation streamAllocation,
boolean requestSendStarted, Request userRequest) {
streamAllocation.streamFailed(e);
// The application layer has forbidden retries
if (!client.retryOnConnectionFailure()) return false;
// The request body cannot be sent again
if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;
// The exception is fatal, as decided by isRecoverable()
if (!isRecoverable(e, requestSendStarted)) return false;
// There are no more routes to try
if (!streamAllocation.hasMoreRoutes()) return false;
// For failure recovery, use the same route selector with a new connection.
return true;
}
private boolean isRecoverable(IOException e, boolean requestSendStarted) {
// Protocol exception
if (e instanceof ProtocolException) {
return false;
}
// Interruption: only a socket timeout before the request was sent is retried
if (e instanceof InterruptedIOException) {
return e instanceof SocketTimeoutException && !requestSendStarted;
}
// SSL handshake exception
if (e instanceof SSLHandshakeException) {
// If the problem was a CertificateException from the X509TrustManager,
// do not retry.
if (e.getCause() instanceof CertificateException) {
return false;
}
}
// SSL peer could not be verified
if (e instanceof SSLPeerUnverifiedException) {
// e.g. a certificate pinning error.
return false;
}
// An example of one we might want to retry with a different route is a problem connecting to a
// proxy and would manifest as a standard IOException. Unless it is one we know we should not
// retry, we return true and try a new route.
return true;
}
As you can see, a retry is not attempted in any of the following cases:
  - The client is configured not to retry (retryOnConnectionFailure is false)
  - The request body cannot be sent again
  - The exception is a ProtocolException, an InterruptedIOException (other than a SocketTimeoutException raised before the request was sent), an SSLHandshakeException caused by a CertificateException, or an SSLPeerUnverifiedException
  - There are no more routes to try
- (5) (6): Check whether a redirect is needed. If not, return the current response. If so, go around the loop again with the follow-up request. Whether a redirect is needed depends on the response code; see the followUpRequest() method for the details, which are not reproduced here.
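The retry conditions listed above depend only on JDK exception types, so they can be exercised outside OkHttp. The sketch below restates recover() and isRecoverable() as two static methods. It rests on one stated assumption: the client setting, the one-shot request body and the remaining routes are passed in as plain booleans (hypothetical parameter names) instead of being read from OkHttp objects.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ProtocolException;
import java.net.SocketTimeoutException;
import java.security.cert.CertificateException;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLPeerUnverifiedException;

// Sketch of the retry decision; the boolean parameters are hypothetical stand-ins
// for client.retryOnConnectionFailure(), the request body type and the route selector.
final class RetryPolicyDemo {
    static boolean shouldRetry(IOException e, boolean retryOnConnectionFailure,
            boolean requestSendStarted, boolean bodyIsOneShot, boolean hasMoreRoutes) {
        if (!retryOnConnectionFailure) return false;             // retries forbidden by configuration
        if (requestSendStarted && bodyIsOneShot) return false;   // body cannot be sent again
        if (!isRecoverable(e, requestSendStarted)) return false; // fatal exception
        if (!hasMoreRoutes) return false;                        // nothing left to try
        return true;
    }

    static boolean isRecoverable(IOException e, boolean requestSendStarted) {
        if (e instanceof ProtocolException) return false;
        if (e instanceof InterruptedIOException) {
            // Only a socket timeout that happened before the request was sent is retried.
            return e instanceof SocketTimeoutException && !requestSendStarted;
        }
        if (e instanceof SSLHandshakeException && e.getCause() instanceof CertificateException) {
            return false; // a rejected certificate will be rejected again
        }
        if (e instanceof SSLPeerUnverifiedException) return false;
        return true; // anything else may succeed on another route
    }
}
```

Note that a plain SSLHandshakeException without a CertificateException cause is still considered recoverable, and so is a generic IOException, for example a failure to connect to a proxy.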
PS: If you want to step through the redirect flow in the source code, you can try Guo Lin's domain (http://guolin.tech), which redirects to his CSDN blog (https://blog.csdn.net/guolin_blog). Walking through that flow gives a better understanding of how redirection works in the source.
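The redirect loop at markers (5) and (6) can also be sketched without a network. The example below is a simplification under stated assumptions: the "server" is a hypothetical in-memory map from URL to response, and followUpRequest() only handles redirect status codes with a location, whereas the real method also deals with other response codes. The cap of 20 follow-ups matches the MAX_FOLLOW_UPS constant checked in the loop above.

```java
import java.net.ProtocolException;
import java.util.Map;

// Simplified model of the follow-up loop; MiniResponse and the map-based server are hypothetical.
final class FollowUpDemo {
    static final int MAX_FOLLOW_UPS = 20;

    static final class MiniResponse {
        final int code;
        final String location; // redirect target, or null
        final String body;

        MiniResponse(int code, String location, String body) {
            this.code = code;
            this.location = location;
            this.body = body;
        }
    }

    // Returns the URL to request next, or null if no follow-up is needed.
    static String followUpRequest(MiniResponse response) {
        switch (response.code) {
            case 300: case 301: case 302: case 303: case 307: case 308:
                return response.location;
            default:
                return null;
        }
    }

    static MiniResponse execute(Map<String, MiniResponse> server, String url)
            throws ProtocolException {
        int followUpCount = 0;
        String request = url;
        while (true) {
            MiniResponse response = server.get(request);
            if (response == null) throw new IllegalArgumentException("unknown URL: " + request);
            String followUp = followUpRequest(response);
            if (followUp == null) return response; // no redirect needed: done
            if (++followUpCount > MAX_FOLLOW_UPS) {
                throw new ProtocolException("Too many follow-up requests: " + followUpCount);
            }
            request = followUp; // go around the loop again with the new request
        }
    }
}
```

A chain of redirects resolves to the final response, while a URL that redirects to itself ends with a ProtocolException once the cap is exceeded instead of looping forever.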
2.4.3.2 BridgeInterceptor
The interceptor acts as a bridge, first converting the user’s request into a request to the server, then using that request to access the network, and finally converting the response returned by the server into a response available to the user.
Let’s look at the intercept() method in this interceptor:
/*BridgeInterceptor*/
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
// (1) Convert the user's request into a request sent to the server -start
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// If we add the "Accept-Encoding: gzip" header ourselves, we are also responsible for decompressing the response stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
// Request gzip compression by default
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
// (1) Convert the user's request into a request sent to the server -end
// (2) Execute the next interceptor to make the network request
Response networkResponse = chain.proceed(requestBuilder.build());
// (3) Convert the response returned by the server into a response available to the user -start
// Parse the header returned by the server
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
// gzip decompression
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
// (3) Convert the response returned by the server into a response available to the user -end
return responseBuilder.build();
}
Following the markers, the method does roughly this:
- (1): Converts the user's request into a request sent to the server, mainly by adding default headers such as Content-Type, Content-Length, Transfer-Encoding, Host, Connection, Accept-Encoding, Cookie and User-Agent. Since we may create a Request without setting any headers, the request could not be completed without these defaults.
- (2): Executes the next interceptor to make the network request.
- (3): Converts the response returned by the server into a response usable by the caller, mainly by processing the returned headers (saving cookies) and decompressing a gzip body.
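Step (1) is essentially "fill in whatever the caller left out". The sketch below illustrates that rule on a plain map of headers. BridgeHeadersDemo and withDefaults() are hypothetical names, and Content-Type, Cookie and User-Agent are left out to keep it short. A case-insensitive TreeMap is used because HTTP header names are case-insensitive.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of the default-header rule on a plain map; not OkHttp code.
final class BridgeHeadersDemo {
    // contentLength is only consulted when hasBody is true; -1 means "length unknown".
    static Map<String, String> withDefaults(Map<String, String> userHeaders, String host,
            boolean hasBody, long contentLength) {
        Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        headers.putAll(userHeaders);

        if (hasBody) {
            if (contentLength != -1) {
                // Known length: send Content-Length and never chunk.
                headers.put("Content-Length", Long.toString(contentLength));
                headers.remove("Transfer-Encoding");
            } else {
                // Unknown length: fall back to chunked transfer encoding.
                headers.put("Transfer-Encoding", "chunked");
                headers.remove("Content-Length");
            }
        }
        // Headers set by the caller always win; defaults only fill the gaps.
        if (!headers.containsKey("Host")) headers.put("Host", host);
        if (!headers.containsKey("Connection")) headers.put("Connection", "Keep-Alive");
        if (!headers.containsKey("Accept-Encoding") && !headers.containsKey("Range")) {
            headers.put("Accept-Encoding", "gzip");
        }
        return headers;
    }
}
```

The last rule is the reason for the transparentGzip flag in the real interceptor: gzip is only decompressed automatically when the interceptor itself added the Accept-Encoding header. If the caller sets Accept-Encoding, the response body is passed through untouched.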
2.4.3.3 CacheInterceptor
This interceptor reads and stores the cache. When a request reaches the cache interceptor, it first checks whether a usable cached response exists. If so, the cached response is returned directly and the later interceptors are not run, so no network request is made. Otherwise the request goes on to the network, and a successful response is then cached.
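As a rough mental model of this behaviour, consider the sketch below. It is deliberately naive and rests on loud assumptions: the cache is a hypothetical in-memory map keyed by URL, a null result from the network function means failure, and freshness or validation rules from the HTTP caching headers are ignored entirely, whereas the real interceptor delegates those decisions to a cache strategy.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Naive cache-first model; CachingFetcher is hypothetical and ignores HTTP cache headers.
final class CacheFirstDemo {
    static final class CachingFetcher {
        private final Map<String, String> cache = new HashMap<>();
        private final Function<String, String> network; // returns null on failure
        int networkCalls;

        CachingFetcher(Function<String, String> network) {
            this.network = network;
        }

        String fetch(String url) {
            String cached = cache.get(url);
            if (cached != null) return cached; // cache hit: the network is never touched

            networkCalls++;
            String response = network.apply(url);
            if (response != null) cache.put(url, response); // only successful responses are stored
            return response;
        }
    }
}
```

Fetching the same URL twice reaches the network once, while a failing URL is retried on every call because nothing was stored for it.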
Let’s look at the intercept() method in this interceptor:
/*CacheInterceptor*/
@Override public Response intercept(Chain chain) throws IOException {
// (1) Get the cached response for this request
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
// (2) Use the cache policy to determine whether to use the cache or network request, or both or neither
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
cache.trackResponse(strategy);
}
// There is a cache, but the policy does not use the cache, so resources need to be freed
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body());
}
// (3) If the policy uses neither the network nor the cache, return a failure response
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// (4) If the network request is not used in the policy, then the cache is returned directly
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null;
try {
// (5) Execute the next interceptor for the network request
networkResponse = chain.proceed(networkRequest);
} finally {
// In the event of an IO or other crash, resources need to be freed in order not to leak the cache body
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// (6) If the policy uses the cache and the response code is 304, return the cache
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
cache.trackConditionalCacheHit();
// Update the cache
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// (7) Store the result of the request in the cache
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
According to the concerns I marked, the general flow is:
- (1) : Get the cached response for the Request. The cache here is an InternalCache; InternalCache is an interface whose only implementation is provided by the Cache class, so this cache is effectively a Cache. Cache is built on DiskLruCache, that is, it stores data on disk and evicts entries with a least-recently-used policy.
- (2) : Use the cache policy to decide whether to use the cache, the network, both, or neither. A null networkRequest means the network is not used; a null cacheResponse means the cache is not used.
- (3) : If the policy uses neither the network nor the cache, return a failure response (504). This stops the execution of subsequent interceptors and ends the request.
- (4) : If the policy does not use the network, then reaching this point means the cache is used, so the cached response is returned directly. This also stops the execution of subsequent interceptors and ends the request.
- (5) : Reaching this point means data must be obtained from the network, so the next interceptor is executed to perform the network request.
- (6) : If the policy uses the cache and the response code is 304, return the cached response and update the cache.
- (7) : Finally, if the response is cacheable, store it in the cache.
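The branching in (3) to (7) can be condensed into a small decision table. The sketch below is my own simplification, not OkHttp code: CacheDecisionSketch, Outcome and decide() are hypothetical names, and the two booleans stand in for strategy.networkRequest != null and strategy.cacheResponse != null.

```java
// Hypothetical names: this enum and method only mirror the branches listed above.
class CacheDecisionSketch {

    enum Outcome { FAIL_504, RETURN_CACHE, CONDITIONAL_NETWORK, NETWORK_ONLY }

    // useNetwork corresponds to strategy.networkRequest != null,
    // useCache corresponds to strategy.cacheResponse != null.
    static Outcome decide(boolean useNetwork, boolean useCache) {
        if (!useNetwork && !useCache) return Outcome.FAIL_504;   // (3) nothing may be used
        if (!useNetwork) return Outcome.RETURN_CACHE;            // (4) cache only
        if (useCache) return Outcome.CONDITIONAL_NETWORK;        // (5), then (6) if the server answers 304
        return Outcome.NETWORK_ONLY;                             // (5), then (7)
    }
}
```

Reading the table this way makes it clear that only the first two outcomes end the request inside the interceptor; the other two always call the next interceptor.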
2.4.3.4 ConnectInterceptor
The interceptor is primarily used to open a connection to the target server and then proceed to the next interceptor.
Let’s look at the intercept() method in this interceptor:
/*ConnectInterceptor*/
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
// (1) Get StreamAllocation
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
// (2) create HttpCodec
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
// (3) Obtain a RealConnection
RealConnection connection = streamAllocation.connection();
// (4) Execute the next interceptor
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
According to the concerns I marked, the general flow is:
- (1) : Get the StreamAllocation, which was actually created in the first interceptor, RetryAndFollowUpInterceptor.
- (2) : Create the HttpCodec, which is obtained through StreamAllocation's newStream() method:
/*StreamAllocation*/
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
// (5) Find available connections
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
// (6) Create HttpCodec with this available connection
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
Let’s look at the findHealthyConnection() method in concern (5) :
/*StreamAllocation*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
// (7) find a connection
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, there is no need for a later health check and the connection is returned directly here
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// If the connection is not healthy, forbid new streams on it and keep looping to find a usable connection
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
Next, let's look at the findConnection() method in concern (7):
/*StreamAllocation*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// (8) start
// Try to use the allocated connection
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
// A connection that has been allocated, and is available, is assigned as an available connection
result = this.connection;
releasedConnection = null;
}
// (8) end
if (!reportedAcquired) {
// If the connection was never reported as acquired, do not report it as released
releasedConnection = null;
}
// (9) Try to get a connection from the connection pool
if (result == null) {
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// If an available connection is found, return directly
return result;
}
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
// (10) Obtain the available connections from the connection pool again based on different routes
if (newRouteSelection) {
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
// (11) If no connection was found, create a new one
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}
// If a pooled connection was found on the second attempt, return it directly
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// (12) Perform TCP/TLS handshake
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
// (13) Add the newly created connection to the pool
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address is created at the same time, this connection is released and that multiplexed connection is acquired.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
Based on the above code analysis, the findConnection() method works roughly as follows:
- (8) : Check whether the currently allocated connection is usable. If so, assign it to result, and it is returned directly later.
- (9) : If the current connection is not usable, try to get a usable connection from the connection pool.
- (10) : If no usable connection is found in the pool, select a new set of routes and try the connection pool again.
- (11) : If there is still no usable connection, create a new one.
- (12) : Perform the TCP/TLS handshake on the new connection.
- (13) : Finally, add the newly created connection to the connection pool.
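The lookup order in (8) to (13) can be sketched as follows. This is a heavily simplified model of my own, not OkHttp code: ConnectionLookupSketch is a hypothetical class, connections are plain strings keyed by host, the two pool lookups (9) and (10) are merged into one, and the handshake in (12) is omitted.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, heavily simplified model: a "connection" is just the string "host#n".
class ConnectionLookupSketch {
    private final Deque<String> pool = new ArrayDeque<>();
    private String allocated; // the connection already held by this call, if any

    String find(String host) {
        // (8) Reuse the connection that is already allocated.
        if (allocated != null) return allocated;
        // (9)/(10) Look for a pooled connection to the same host.
        for (String pooled : pool) {
            if (pooled.startsWith(host + "#")) {
                allocated = pooled;
                return pooled;
            }
        }
        // (11)-(13) Create a new connection (handshake omitted) and add it to the pool.
        String created = host + "#" + (pool.size() + 1);
        pool.add(created);
        allocated = created;
        return created;
    }

    // Gives up the allocated connection; it stays in the pool for later reuse.
    void release() {
        allocated = null;
    }
}
```

The point of the ordering is cost: each later step is more expensive than the one before it, so a new connection is only created when nothing cheaper is available.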
As you can see, concerns (9) and (13) fetch connections from and store connections in the connection pool by calling Internal.instance.get() and Internal.instance.put(), respectively. Let's look at the get() method. Internal turns out to be an abstract class with a static instance, which is initialized in OkHttpClient's static initializer block:
/*OkHttpClient*/
static {
Internal.instance = new Internal() {
// omit some code...
@Override public RealConnection get(ConnectionPool pool, Address address, StreamAllocation streamAllocation, Route route) {
return pool.get(address, streamAllocation, route);
}
// omit some code...
};
}
Internal's get() method calls ConnectionPool's get() method, so we can be sure that ConnectionPool is what actually manages these connections. For now, all you need to know is that it is used to store and fetch connections.
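The Internal indirection is easier to see in miniature. The sketch below is my own illustration of the pattern with hypothetical names (InternalSketch, PoolSketch, ClientSketch); in OkHttp the real classes live in different packages, which is why the bridge is needed, whereas here everything sits in one package for brevity.

```java
// Hypothetical names throughout; this only illustrates the pattern, not OkHttp's real classes.
abstract class InternalSketch {
    static InternalSketch instance; // installed once by ClientSketch's static block

    abstract String get(PoolSketch pool, String address);
}

class PoolSketch {
    // Not public on purpose: outside callers are expected to go through InternalSketch.
    String get(String address) {
        return "connection to " + address;
    }
}

class ClientSketch {
    static {
        // Mirrors the static block in OkHttpClient: install the bridge when the class loads.
        InternalSketch.instance = new InternalSketch() {
            @Override String get(PoolSketch pool, String address) {
                return pool.get(address);
            }
        };
    }

    // Calling any static method forces class initialization, which runs the block above.
    static void init() { }
}
```

The design choice is that the non-public method stays hidden from application code, while library-internal code can still reach it through the one static instance.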
Concern (12) is essentially the core code for establishing a connection to the server. Let’s look at this method:
/*RealConnection*/
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
EventListener eventListener) {
if (protocol != null) throw new IllegalStateException("already connected");
/* Route selection */
RouteException routeException = null;
List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
if (route.address().sslSocketFactory() == null) {
if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication not enabled for client"));
}
String host = route.address().url().host();
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication to " + host + " not permitted by network security policy"));
}
} else {
if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
throw new RouteException(new UnknownServiceException(
"H2_PRIOR_KNOWLEDGE cannot be used with HTTPS"));
}
}
while (true) {
try {
// (14) If a tunnel connection is required, perform the tunnel connection
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
if (rawSocket == null) {
// We were unable to connect the tunnel but properly closed down our resources.
break;
}
} else {
// (15) No tunnel connection is required
connectSocket(connectTimeout, readTimeout, call, eventListener);
}
// Establish a protocol
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
// The connection ends
eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
break;
} catch (IOException e) {
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
http2Connection = null;
// Connection failed
eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);
if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}
if (route.requiresTunnel() && rawSocket == null) {
ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
+ MAX_TUNNEL_ATTEMPTS);
throw new RouteException(exception);
}
if (http2Connection != null) {
synchronized (connectionPool) {
allocationLimit = http2Connection.maxConcurrentStreams();
}
}
}
Concerns (14) and (15) will eventually call the connectSocket() method:
/*RealConnection*/
private void connectSocket(int connectTimeout, int readTimeout, Call call,
EventListener eventListener) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
// Create a socket
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
eventListener.connectStart(call, route.socketAddress(), proxy);
// Set the socket timeout period
rawSocket.setSoTimeout(readTimeout);
try {
// (16) Connect to the socket
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
try {
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
As you can see, OkHttp ultimately connects through sockets.
Having looked at the findHealthyConnection() method in concern (5), let’s go back to the method for concern (6) :
/*RealConnection*/
public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain, StreamAllocation streamAllocation) throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, chain, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(chain.readTimeoutMillis());
source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
This method creates the HttpCodec, which encodes HTTP requests and decodes HTTP responses. It has two implementation classes, Http1Codec and Http2Codec: if HTTP/2 is used, an Http2Codec is created; otherwise an Http1Codec is created.
- (3) : Back at concern (3), opening the connection() method shows that the RealConnection obtained here is the connection that findConnection() in concern (7) either fetched from the connection pool or newly created.
- (4) : Concern (4) takes the connection and proceeds to the next interceptor.
2.4.3.5 CallServerInterceptor
The interceptor is mainly used to make requests to the server and obtain data. It is the last interceptor in the responsibility chain. After obtaining data from the server, it will directly return to the previous interceptor.
Let’s look at the intercept() method in this interceptor:
/*CallServerInterceptor*/
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
// Get the HttpCodec created in ConnectInterceptor
HttpCodec httpCodec = realChain.httpStream();
// Get the StreamAllocation created in RetryAndFollowUpInterceptor
StreamAllocation streamAllocation = realChain.streamAllocation();
// Get the new RealConnection created by the ConnectInterceptor or retrieved from the connection pool
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
realChain.eventListener().requestHeadersStart(realChain.call());
// (1) write the request header
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);
}
// (2) Write the request body
if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
streamAllocation.noNewStreams();
}
}
httpCodec.finishRequest();
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
// (3) Read response headers
responseBuilder = httpCodec.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
responseBuilder = httpCodec.readResponseHeaders(false);
response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);
// (4) Read response body
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
As you can see, this interceptor is relatively simple. The previous interceptor, ConnectInterceptor, has already connected to the server and created an HttpCodec object. HttpCodec wraps the output stream (BufferedSink) and input stream (BufferedSource) provided by Okio, so here we mainly use the HttpCodec object to exchange data with the server: writing the request headers and request body, and reading the response headers and response body.
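For HTTP/1.1, "writing the request headers" and "reading the response headers" come down to plain text on the socket. The sketch below is my own illustration of that text format, not Http1Codec itself: Http1TextSketch, requestHead() and statusCode() are hypothetical names.

```java
import java.util.Map;

// Hypothetical helper: it only shows the text format that an HTTP/1.1 codec writes and reads.
class Http1TextSketch {

    // Builds the request line and headers, each terminated by CRLF, followed by a blank line.
    static String requestHead(String method, String path, Map<String, String> headers) {
        StringBuilder head = new StringBuilder();
        head.append(method).append(' ').append(path).append(" HTTP/1.1\r\n");
        for (Map.Entry<String, String> header : headers.entrySet()) {
            head.append(header.getKey()).append(": ").append(header.getValue()).append("\r\n");
        }
        return head.append("\r\n").toString();
    }

    // Extracts the status code from a status line such as "HTTP/1.1 200 OK".
    static int statusCode(String statusLine) {
        String[] parts = statusLine.split(" ", 3);
        if (parts.length < 2 || !parts[0].startsWith("HTTP/")) {
            throw new IllegalArgumentException("Unexpected status line: " + statusLine);
        }
        return Integer.parseInt(parts[1]);
    }
}
```

With HTTP/2 the same information travels as binary frames instead, which is why OkHttp hides both variants behind the HttpCodec interface.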
2.4.4 ConnectionPool
- Connection pooling manages the reuse of HTTP and HTTP/2 connections to reduce network latency. If a usable connection is available in the pool, no new connection needs to be created, and the TCP/TLS handshake is skipped.
- The main fields of the ConnectionPool class
/*ConnectionPool*/
// Thread pool, used to clear expired connections
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
// Maximum number of connections allowed to be idle
private final int maxIdleConnections;
// The connection lifetime
private final long keepAliveDurationNs;
// Clean up the invalid connection
private final Runnable cleanupRunnable = new Runnable() {
// ...
};
// The queue is used to record the connection
private final Deque<RealConnection> connections = new ArrayDeque<>();
- The constructor
/*ConnectionPool*/
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
You can see that the constructor sets the default maximum number of connections allowed to be idle to 5 and the connection lifetime to 5 minutes.
- The main methods
This section focuses on the get() and put() methods used in the connection interceptor.
The get() method:
/*ConnectionPool*/
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
This method fetches a reusable connection from the connection pool. The logic is to traverse the deque that records connections and return the first one that is eligible for reuse.
The put() method:
/*ConnectionPool*/
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
// Perform a cleanup task
executor.execute(cleanupRunnable);
}
// Add the newly created connection to the two-way queue that records connections
connections.add(connection);
}
This method puts the newly created connection into the connection pool. The logic is to start the cleanup task for invalid connections if it is not already running, and then add the newly created connection to the deque that records connections.
Let’s start with the cleanup task:
/*ConnectionPool*/
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
// Clean up invalid connections
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
This is a blocking cleanup task that runs in an infinite loop. Each iteration calls cleanup() to remove invalid connections; cleanup() returns the interval until the next cleanup, or -1 if no connections remain, in which case the task exits. The task then calls wait(), which releases the lock and blocks for that interval; when the wait time is up, the next cleanup iteration runs.
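The only arithmetic in this loop is splitting the nanosecond interval into the (milliseconds, nanoseconds) pair that Object.wait(long, int) expects. Here is a small worked sketch of that split; WaitSplitSketch and split() are hypothetical names of my own.

```java
// Hypothetical helper mirroring the arithmetic in cleanupRunnable.
class WaitSplitSketch {

    // Splits a nanosecond interval into {millis, remaining nanos} for Object.wait(long, int).
    static long[] split(long waitNanos) {
        long waitMillis = waitNanos / 1_000_000L;
        long remainingNanos = waitNanos - waitMillis * 1_000_000L;
        return new long[] {waitMillis, remainingNanos};
    }
}
```

For example, an interval of 1,500,000,123 ns splits into 1500 ms plus 123 ns, and the default keep-alive of 5 minutes (300,000,000,000 ns) splits into 300,000 ms plus 0 ns. The remainder is always below one million, which is the range the second argument of wait() accepts.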
Let’s look at the cleanup() method:
/*ConnectionPool*/
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Traverse connections to find invalid connections for cleaning
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// (1) Count the StreamAllocations on this connection: if greater than 0 the connection is in use, so increment inUseConnectionCount; otherwise increment idleConnectionCount
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// Mark idle connections
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// If the longest idle time has reached the keep-alive duration (5 minutes by default), or the number of idle connections exceeds the maximum (5 by default), remove that connection from the queue
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// If the number of free connections is greater than 0, return the time when this connection is about to expire
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// If there is no idle connection, 5 minutes is returned, that is, the interval between the next cleaning is 5 minutes
return keepAliveDurationNs;
} else {
// If there is no connection, the loop is broken
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket());
// Do the next cleanup soon
return 0;
}
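In short, cleanup() checks whether the longest idle time is greater than or equal to the keep-alive duration (5 minutes by default) or whether the number of idle connections exceeds the maximum (5 by default), and if so evicts the connection that has been idle the longest. Whether a connection is idle is determined by the pruneAndGetAllocationCount() method in concern (1). Let's look at this method: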
/*ConnectionPool*/
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
// Get a list of weak references allocations
List<Reference<StreamAllocation>> references = connection.allocations;
// Traverse the list of weak references allocations
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
// If the StreamAllocation is still referenced (in use), continue with the next one
if(reference.get() ! =null) {
i++;
continue;
}
// We've discovered a leaked allocation. This is an application bug.
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
// The StreamAllocation is no longer in use, so remove it from the list
references.remove(i);
connection.noNewStreams = true;
// If the list is now empty, the connection is idle, so return 0
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
// The list is not empty: return its size; a value greater than 0 means the connection is in use
return references.size();
}
This method is simple: it traverses the connection's list of weak references to StreamAllocation and removes every entry whose StreamAllocation has already been garbage collected (a leaked allocation, which OkHttp logs as a probable unclosed response body). It then returns the size of the list: a value greater than 0 means the connection is in use, and 0 means it is idle.
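The pruning loop is worth isolating, because it removes elements while iterating by index. The sketch below is my own reduction of it: AllocationPruneSketch and pruneAndCount() are hypothetical names, and plain Objects stand in for StreamAllocation.

```java
import java.lang.ref.Reference;
import java.util.List;

// Hypothetical helper: plain Objects stand in for StreamAllocation.
class AllocationPruneSketch {

    // Removes references whose target is gone and returns how many live ones remain.
    static int pruneAndCount(List<Reference<Object>> references) {
        for (int i = 0; i < references.size(); ) {
            if (references.get(i).get() != null) {
                i++; // still in use, keep it and move on
                continue;
            }
            // Cleared reference: remove it without advancing i,
            // because the next element has shifted into this slot.
            references.remove(i);
        }
        return references.size();
    }
}
```

Note that the index only advances for live entries, which is the same trick the original loop uses by leaving the increment out of the for header.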
2.5 (5) Fetch the corresponding data
String data = response.body().string();
In step (4), a Response is returned once the synchronous or asynchronous request has been executed. This is the final returned data, from which the code, message, headers and body can be obtained.
Here we focus on the body. Opening the body() method shows:
/*Response*/
public @Nullable ResponseBody body() {
return body;
}
As you can see, the body here is a ResponseBody. ResponseBody is an abstract class and cannot be instantiated directly, so its subclass RealResponseBody is instantiated instead. It is assigned in the earlier section “2.4.3.5 CallServerInterceptor”:
/*CallServerInterceptor*/
if (forWebSocket && code == 101) {
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
// openResponseBody() creates and returns the RealResponseBody object
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
If there is a cache, the value is assigned in the CacheInterceptor.
The following methods are commonly used in ResponseBody:
/*ResponseBody*/
public final String string() throws IOException {
BufferedSource source = source();
try {
Charset charset = Util.bomAwareCharset(source, charset());
return source.readString(charset);
} finally {
Util.closeQuietly(source);
}
}
public final InputStream byteStream() {
return source().inputStream();
}
public final byte[] bytes() throws IOException {
long contentLength = contentLength();
if (contentLength > Integer.MAX_VALUE) {
throw new IOException("Cannot buffer entire body for content length: " + contentLength);
}
BufferedSource source = source();
byte[] bytes;
try {
bytes = source.readByteArray();
} finally {
Util.closeQuietly(source);
}
if (contentLength != -1 && contentLength != bytes.length) {
throw new IOException("Content-Length ("
+ contentLength
+ ") and stream length ("
+ bytes.length
+ ") disagree");
}
return bytes;
}
As you can see, all three methods call source() internally to get a BufferedSource, which is the input stream provided by Okio. With this input stream, the body can be converted into the type you need. To get a String, call response.body().string(), which is suitable for data no larger than 1 MB. To get an input stream, call response.body().byteStream(), which is suitable for data larger than 1 MB, such as file downloads. To get a byte array, call response.body().bytes().
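For the large-body case, the usual way to consume an input stream is to copy it in fixed-size chunks instead of loading everything at once. The following is a minimal JDK-only sketch of that idea; StreamCopySketch and copy() are hypothetical names, and any InputStream stands in for the one returned by response.body().byteStream().

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical helper: any InputStream stands in for response.body().byteStream().
class StreamCopySketch {

    // Copies the stream in 8 KiB chunks so only one buffer is held in memory at a time.
    static long copy(InputStream in, OutputStream out) {
        byte[] buffer = new byte[8192];
        long total = 0;
        try {
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
                total += read;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total; // number of bytes copied
    }
}
```

When downloading a file, the OutputStream would typically be a FileOutputStream, so memory use stays constant regardless of the body size.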
Note that response.body().string() can only be called once, otherwise the following exception will be raised:
W/System.err: java.lang.IllegalStateException: closed
W/System.err: at okio.RealBufferedSource.rangeEquals(RealBufferedSource.java:408)
W/System.err: at okio.RealBufferedSource.rangeEquals(RealBufferedSource.java:402)
W/System.err: at okhttp3.internal.Util.bomAwareCharset(Util.java:469)
W/System.err: at okhttp3.ResponseBody.string(ResponseBody.java:175)
The stack trace points to line 408 of the RealBufferedSource class. Opening it shows:
/*RealBufferedSource*/
@Override
public boolean rangeEquals(long offset, ByteString bytes, int bytesOffset, int byteCount)
    throws IOException {
  if (closed) throw new IllegalStateException("closed");
  // ...
}
The exception is thrown when closed is true, so let's trace where closed is assigned:
/*RealBufferedSource*/
@Override
public void close() throws IOException {
  if (closed) return;
  closed = true;
  source.close();
  buffer.clear();
}
As you can see, closed is only set to true in close(), and close() is exactly what the Util.closeQuietly(source) call in string() ends up invoking:
/*Util*/
public static void closeQuietly(Closeable closeable) {
  if (closeable != null) {
    try {
      closeable.close();
    } catch (RuntimeException rethrown) {
      throw rethrown;
    } catch (Exception ignored) {
    }
  }
}
This is why response.body().string() can only be called once: string() calls Util.closeQuietly(source), which closes the input source and sets closed to true. On the second call to string(), RealBufferedSource.rangeEquals() checks that flag, finds it true, and throws the exception.
The reason for this design is that the body returned by the server can be very large, so OkHttp does not keep it in memory. It reads the data from the stream only when you ask for it, and once the stream has been consumed it cannot be read a second time without making a new request.
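The read-once behavior can be modeled with a few lines of plain Java. The sketch below is a simplified stand-in, not OkHttp code: OneShotBody is a hypothetical class that mimics the closed flag traced above, and it assumes JDK 9 or later for InputStream.readAllBytes(). It also shows the usual workaround, which is to read the body once into a local variable and reuse that variable.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

class OneShotBodyDemo {

    // Hypothetical, simplified stand-in for ResponseBody: the data can be consumed once.
    static final class OneShotBody {
        private final InputStream source;
        private boolean closed;

        OneShotBody(String content) {
            this.source = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
        }

        // Reads everything, then closes the source, mirroring the finally block in string().
        String string() throws IOException {
            if (closed) throw new IllegalStateException("closed");
            try {
                return new String(source.readAllBytes(), StandardCharsets.UTF_8);
            } finally {
                closed = true;
                source.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        OneShotBody body = new OneShotBody("{\"ok\":true}");
        String data = body.string();        // read once and keep the result
        System.out.println(data);
        System.out.println(data.length());  // reuse the cached value, not body.string()
        try {
            body.string();                  // a second read fails
        } catch (IllegalStateException e) {
            System.out.println("second call failed: " + e.getMessage());
        }
    }
}
```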
3. Summary
Having read the source, it is clear that OkHttp is a very well designed framework. It makes use of several design patterns, such as the Builder pattern and the Chain of Responsibility pattern. The core of OkHttp is its interceptors, which are organized as a chain of responsibility: each interceptor handles one concern, the request passes through the interceptors from top to bottom, and the response is passed back up layer by layer.
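To make the top-down request and bottom-up response flow concrete, here is a minimal chain-of-responsibility sketch in plain Java. It is not OkHttp's implementation: the Interceptor and Chain types below are hypothetical, simplified versions that pass plain strings instead of Request and Response objects, and the last interceptor plays the role of the server.

```java
import java.util.ArrayList;
import java.util.List;

class InterceptorChainDemo {

    // Hypothetical, simplified interceptor: receives a chain and returns a response string.
    interface Interceptor {
        String intercept(Chain chain);
    }

    static final class Chain {
        private final List<Interceptor> interceptors;
        private final int index;
        final String request;

        Chain(List<Interceptor> interceptors, int index, String request) {
            this.interceptors = interceptors;
            this.index = index;
            this.request = request;
        }

        // Hands the request to the interceptor at the current index and gives it
        // a new chain that points at the next interceptor.
        String proceed(String request) {
            if (index >= interceptors.size()) {
                throw new IllegalStateException("no interceptor produced a response");
            }
            Chain next = new Chain(interceptors, index + 1, request);
            return interceptors.get(index).intercept(next);
        }
    }

    static String run(String request) {
        List<Interceptor> interceptors = new ArrayList<>();
        // Each interceptor tags the request on the way down and wraps the response on the way up.
        interceptors.add(chain -> "A(" + chain.proceed(chain.request + "+A") + ")");
        interceptors.add(chain -> "B(" + chain.proceed(chain.request + "+B") + ")");
        // Last in the chain: stands in for the network call and does not call proceed().
        interceptors.add(chain -> "response to " + chain.request);
        return new Chain(interceptors, 0, request).proceed(request);
    }

    public static void main(String[] args) {
        System.out.println(run("req")); // prints "A(B(response to req+A+B))"
    }
}
```

The output shows both directions: the request picks up +A and then +B on the way down, while the response is wrapped by B first and A last on the way back up.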
References:
- Detaching wheel series: Detaching OkHttp
- Android open source framework source appreciation: Okhttp
About me
I am Wildma, a CSDN certified blog expert and an outstanding author in the programmer section of Jianshu, specializing in screen adaptation. If this article helped you, a “like” is the best recognition I could ask for!