Since the end of a custom control, there has been a month of time, there has been no what want to write, so back to the original intention of the beginning to write, look at the source code of some mainstream open source framework, in-depth understanding of its principle, rather than only know, and do not know why. This is the first article in the series – OkHttp3 (source version 3.10).
basis
OkHttpClient OK_HTTP_CLIENT = new okHttpClient.builder ().addInterceptor(loggingInterceptor) ConnectTimeout (60, TimeUnit. SECONDS) / / set the cache: parameter 1: cache path (/ storage/emulated / 0 / Android package name/data/XXX/cache) parameters of 2: .cache(new cache(new File(getExternalCacheDir()), 100 * 1024 * 1024)).readTimeout(60, TimeUnit.SECONDS) .writeTimeout(60, TimeUnit.SECONDS) .build(); Request Request = builder.url (mUrl).build(); Call Call = ok_http_client.newCall (request); // Create a Call object in Okhttp and bind request to Client. Response Response = call.execute(); // Execute call.enqueue(new) asynchronouslyCallback() {
@Override
public void onFailure(Call call, IOException e) {
LoggerUtil.d("onFailure : "+e.getMessage()); } @Override public void onResponse(Call call, Response response) { responseProcess(response); }}); @override public Call newCall(Request Request) {return RealCall.newRealCall(this, request, false/ *for web socket */);
}
Copy the code
Summary (OkHttp request) :
- Create the OkHttpClient and Request objects
- Encapsulate the Request as a Call object
- Execute ()/enqueue() of Call is used to send synchronous/asynchronous requests
Note:
1. When building OkHttpClient with Builder(), we initialize an important class called Dispatcher. This class will accept our Request queue, either synchronous or asynchronous, and distribute tasks based on different conditions. 2. Ok_http_client.newcall (request), which actually returns a RealCall, so both synchronous and asynchronous requests are made by realCallsCopy the code
OkHttp3 Synchronous/asynchronous request framework process:
Several concepts
- Request URL: OKHttp3 is the basis for HTTP requests that handle URL requests. The format of the URL follows the standard HTTP protocol. For an HTTP server, multiple URLS are provided. For the URL protocol, the basic format is protocol :// hostname(domain-or-ip)[:port] / path / [;parameters][? Query]#fragment
- Protocol: For example, HTTP (s), FTP, and File (File resources are files on the local computer. Format file:///)
- Address: hostname(domain-or-ip)[:port] indicates the domain name or IP Address of the service, and port indicates the port
- Route: When hostname(domain-or-ip) in the URL is domain, it indicates the domain name of the service. The domain name may be resolved by DNS. In other words, an Address can be mapped to multiple routes, and a Route represents a machine IP Address. It is used to establish a TCP/IP network connection
- Connection: A Connection is an instance of a Socket Connection.
- Connection Pool: Connection instances are centrally maintained in the Connection Pool.
Source analysis of synchronous requests
As you can see from the previous section, synchronous requests execute the execute() method and are all requests made by realCalls
@override public Response Execute () throws IOException {synchronized (this) {if (executed) throw new IllegalStateException("Already Executed");
executed = true; } // Capture the call stack trace (not the focus of this article) captureCallStackTrace(); eventListener.callStart(this); Try {// call the dispatcher to queue client.dispatcher().executed(this); One of the / / OkHttp essence Response were obtained through the interceptor chain (specific follow-up interpretation alone) Response result = getResponseWithInterceptorChain ();if (result == null) throw new IOException("Canceled");
returnresult; } catch (IOException e) { eventListener.callFailed(this, e); throw e; } finally {// Call the dispatcher out of the queue client.dispatcher().finished(this); }}Copy the code
The source code shows that for synchronous requests, the Dispatcher simply queues in and queues out, and the rest is handled through the interceptor chain to get the response information.
Asynchronous request source analysis
Asynchronous calls are made by the Enqueue method of the RealCall class
// RealCall class: @override public void enQueue (Callback responseCallback) {synchronized (this) {if (executed) throw new IllegalStateException("Already Executed");
executed = true; } captureCallStackTrace(); eventListener.callStart(this); Client.dispatcher ().enqueue(new AsyncCall(responseCallback)); } // Dispatcher class: private int maxRequests = 64; Private int maxRequestsPerHost = 5; Private @nullable ExecutorService ExecutorService; // The maximum number of requests per host for which a network request is being made // The thread pool performing asynchronous tasks private @nullable ExecutorService ExecutorService; Private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); /** * The async call queue is running. Including canceled calls that have not yet been completed. */ private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); /** * The synchronous call that is running. Including canceled calls that have not yet been completed. */ private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); Synchronized void enQueue (AsyncCall Call) {synchronized void enQueue (AsyncCall Call) {// Number of running asynchronous queues < 64, and number of running calls to the shared host < 5if(runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {// Add to the running asynchronous queue runningAsyncCalls.add(call); Execute (call); // Start the thread pool to perform asynchronous tasks. }else{// add to queue readyAsyncCalls.add(call); }}Copy the code
As you can see from the above source code, there are two different queues for asynchronous requests, one for the running request queue and one for the preparing asynchronous call. They are queued according to the number of calls in progress and the number of asynchronous queues in progress. A running asynchronous queue executes its asynchronous task through the thread pool at the same time as it enqueues.
Let’s first look at its thread pool initialization:
// Synchronized ExecutorService public synchronized ExecutorServiceexecutorService() {
if(executorService == null) {/* * corePoolSize: Number of core threads in the thread pool 0 * maximumPoolSize: The maximum value of an int is 2 to the 31st * keepAliveTime: duration of an idle thread 60s * unit: time unit * workQueue: buffer queue used by the thread pool * threadFactory: * handler used by thread pools to create threads: ExecutorService = new ThreadPoolExecutor(0, Integer.max_value, 60, timeUnit.seconds, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher".false));
}
return executorService;
}
Copy the code
The number of core threads in the thread pool is 0, and the maximum thread in the thread pool is an integer maximum.
Q: If we have a large number of network requests, up to Integer.MAX_VALUE, is the performance cost of this thread pool particularly high? A: No, because the maximum size of the runningAsyncCalls queue in OkHttp is 64, so OkHttp requests are limited to 64. Even if we set Integer.MAX_VALUE, it will not affect our performance.
We can execute AsyncCall in our executorService thread pool. Let’s take a look at AsyncCall:
Public abstract class NamedRunnable implements Runnable {protected final String name; public NamedRunnable(String format, Object... args) { this.name = Util.format(format, args); } @Override public final voidrun() { String oldName = Thread.currentThread().getName(); Thread.currentThread().setName(name); Try {// the run() method is executed by the execute() method; } finally { Thread.currentThread().setName(oldName); } } protected abstract void execute(); } // It inherits from NamedRunnable, So this Runnable actually executes code in the execute() method Final Class AsyncCall extends NamedRunnable {private Final Callback responseCallback; AsyncCall(Callback responseCallback) { super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override
protected void execute() {
boolean signalledCallback = false; Try {/ / Response were obtained through the interceptor chain, specific subsequent detailed Response Response = getResponseWithInterceptorChain ();if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true; / / failure callback responseCallback. OnFailure (RealCall. This new IOException ("Canceled"));
} else {
signalledCallback = true; / / success callback responseCallback. OnResponse (RealCall. This response). } } catch (IOException e) {if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else{ eventListener.callFailed(RealCall.this, e); / / failure callback responseCallback. OnFailure (RealCall. This, e); }} finally {// The asynchronous task is finished client.dispatcher().finished(this); }}}Copy the code
As you can see from the source code, the AsyncCall is finally called to execute() to initiate the request, and the execute() method executes what we saw above, Also performed for synchronous request getResponseWithInterceptorChain () method through the interceptor chain to get a response.
Let’s look at finished after synchronous/asynchronous request:
// Asynchronous request finished void Finished (AsyncCall Call) {// Note: Parameter 3true
finished(runningAsyncCalls, call, true); } // Synchronized request finished void Finished (RealCall Call) {// Note: Parameter 3false
finished(runningSyncCalls, call, false); } private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) { int runningCallsCount; Runnable idleCallback; Synchronized (this) {synchronized (this) {// remove the task from the running synchronization/asynchronous queue, and throw an exception if none are in the queueif(! calls.remove(call)) throw new AssertionError("Call wasn't in-flight!"); // Synchronization skips this step, and each step executes this stepif (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if(runningCallsCount == 0 && idleCallback ! = null) { idleCallback.run(); }} // Execute asynchronously private voidpromoteCalls() {// The maximum capacity has been run, then returnif (runningAsyncCalls.size() >= maxRequests) return; // Asynchronous tasks that are not ready for execution are returnedif (readyAsyncCalls.isEmpty()) return; // Traverse the queue of asynchronous requests ready for executionfor(Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next(); // Fetch the next asynchronous task // if the number of running calls with the shared host is < 5if(runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); // Remove // add to the running asynchronous queue runningAsyncCalls.add(call); // Execute the asynchronous request from the thread pool right away; }if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
Copy the code
We can see that the runningAsyncCalls and readyAsyncCalls queues are executed using the promoteCalls() method to add the pending task (the element in readyAsyncCalls) to the runningAsyncCalls queue.
Now that the synchronous asynchronous request answer process is complete, let’s look at the beauty of OkHTTP’s design — the interceptor.
getResponseWithInterceptorChain()
/ / RealCall class: Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. List<Interceptor> interceptors = new ArrayList<>(); // We added our own interceptor (ApplicationInterceptor) interceptors.addall (client.interceptors()); / / request redirection interceptor: failure reconnection interceptors, add (retryAndFollowUpInterceptor); Add (new BridgeInterceptor(client.cookieJar()))); // Interceptors. Add (new CacheInterceptor(client.internalCache())); // Interceptors. Add (new ConnectInterceptor(client));if (!forWebSocket) {/ / NetworkInterceptor interceptor (network) interceptors. AddAll (client.net workInterceptors ()); } // The interceptor that actually calls the network request is interceptors.add(new CallServerInterceptor(forWebSocket)); // Interceptor chain, note: Null = null, null, null, null, 0,originalRequest, this, eventListener, client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis());return chain.proceed(originalRequest);
}
Copy the code
From getResponseWithInterceptorChain () method of the source code, you can see that in the interceptor is divided into application blocker, network interceptors, these two types are all our own build OkhttpClient added. The focus of this article is not on these two types of interceptors, but on the five interceptors of OkHttp itself, which are one of the best parts of OkHtp.
Proceed (” Interceptors “); proceed (” interceptors “); Proceed (” interceptors “); Proceed (” interceptors “); proceed (” interceptors “);
@override public Response Proceed (Request Request) throws IOException {returnproceed(request, streamAllocation, httpCodec, connection); } public Response proceed(Request request, StreamAllocation streamAllocation , HttpCodec httpCodec,RealConnection connection) throws IOException { ... // Call the next interceptor in the chain. Note: The fifth parameter index = index + 1 RealInterceptorChain next = new RealInterceptorChain (interceptors, streamAllocation httpCodec, connection, index + 1, request, call, eventListener, connectTimeout,readTimeout,writeTimeout); / / from getResponseWithInterceptorChain () we know that the index initialized to 0 / / get the current position Interceptor Interceptor Interceptor = interceptors. Get (index); Response = Interceptor.intercept (next); .returnresponse; } / / RetryAndFollowUpInterceptor class: @Override public Response intercept(Chain chain) throws IOException { Request request = chain.request(); RealInterceptorChain realChain = (RealInterceptorChain) chain; Call call = realChain.call(); EventListener eventListener = realChain.eventListener(); // Initialize the allocation stream object: StreamAllocation StreamAllocation = New StreamAllocation(client.connectionPool(), createAddress(request.url()), call, eventListener, callStackTrace); this.streamAllocation = streamAllocation; int followUpCount = 0; Response priorResponse = null;while (true) {... Response response; boolean releaseConnection =true; Response = realchain.proceed (request, streamAllocation, null, null); releaseConnection =false; }... }}Copy the code
From this code can be seen in the two parts of two classes each interceptor is composed of the interceptor chain together, the above code RetryAndFollowUpInterceptor blocker, for example, by the method of the interceptor chain proceed, according to the order calls the interceptors, Proceed of the next interceptor chain object is called in each interceptor, thus concatenating all interceptors and finally getting the response information after passing through all interceptors. The request flow chart is as follows:
Take a page from one that feels complete:
RetryAndFollowUpInterceptor
@override public Response intercept(Chain Chain) throws IOException {Request Request = chain-.request (); // 1. Initialize a socket connection allocation stream object streamAllocation = new streamAllocation (client.connectionPool(), createAddress(request.url()), callStackTrace); Int followUpCount = 0; Response priorResponse = null; // Open an infinite loop that executes the first interceptor or request for a failed reconnectionwhile (true) {// If the request has been canceled, release the connection pool resourcesif (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response = null;
boolean releaseConnection = true; Try {// 2. Execute the next interceptor, BridgeInterceptor Response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null); // Do not release the link because releaseConnection = may be reusedfalse; } catch (RouteException e) {// connect address fail exception /** * 3. If there is an exception, determine whether to restore * not to continue the connection: * 1. The application layer configuration is not connected. The default istrue* 2. Request error can not continue to use * 3. Whether can recover * 3.1, ProtocolException * 3.2, InterruptedIOException * 3.3, the SSL handshake errors (SSLHandshakeException && CertificateException) * 3.4, certificate pinning error (SSLPeerUnverifiedException) * 4. No more lines to choose from */if(! recover(e.getLastConnectException(),false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue; } catch (IOException e) {// check whether the network request has started. Boolean requestSendStarted =! (e instanceof ConnectionShutdownException); // Determine whether the recovery is possible, i.e. whether to retryif(! recover(e, requestSendStarted, request)) throw e; releaseConnection =false;
continue; } finally {// Release the connectionif(releaseConnection) { streamAllocation.streamFailed(null); streamAllocation.release(); }} // priorResponse if it exists. The buildif(priorResponse ! = null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build(); } /** * 4. Check whether the request needs to be redirected * or whether the request needs to be improved, such as certificate verification, etc. * Whether the request needs to be redirected depends on the response code of the HTTP request, * therefore, In the followUpRequest method, the response code will be obtained according to the response userResponse, * and the connection will be obtained from the connection pool StreamAllocation, and then the Route configuration parameter will be obtained according to the current connection. * * followUpCount is used to keep track of the number of requests we have made * why is it possible that we have made a single request and okhttp has made it more than once? * For example: HTTPS certificate authentication, we need to go through: initiate -> authenticate -> response, * the three steps need to initiate at least two requests, or our network request is redirected, * after our first request to get a new address, then the network request to the new address. * */ Request followUp = followUpRequest(response);if (followUp == null) {
if (!forWebSocket) { streamAllocation.release(); } // Return the resultreturnresponse; } // 5. Close the response flow closeQuietly(response.body()); // 6. Redirection or failed reconnection, whether the maximum limit MAX_FOLLOW_UPS == 20if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: "+ followUpCount); } // If the body content can only be sent once, release the connectionif (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code()); } // 7. Check whether the redirect request and the current request are connected to the same connectionif(! SameConnection (response, followup.url ())) {// release your url before connecting streamallocation.release (); StreamAllocation StreamAllocation = new StreamAllocation(client.connectionPool(), createAddress(followUp.url()), callStackTrace); }else if(streamAllocation.codec() ! = null) { throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?"); } // Update the next network request object request = followUp; PriorResponse = Response; }}Copy the code
Most of the details are explained in the code, but here’s a brief explanation of what this interceptor does:
1. Initialize a connection flow object 2. Call the next interceptor 3Copy the code
BridgeInterceptor
@Override public Response intercept(Chain chain) throws IOException { Request userRequest = chain.request(); Request.Builder requestBuilder = userRequest.newBuilder(); / / build network can be used to send Request to Request / / -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- the main build complete Request head start -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- RequestBody body = userRequest.body();if(body ! = null) { MediaType contentType = body.contentType();if(contentType ! = null) { requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if(contentLength ! = -1) { requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding"."chunked");
requestBuilder.removeHeader("Content-Length"); }}if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {requestBuilder. Header (requestBuilder. Header (requestBuilder."Connection"."Keep-Alive"); } // If we do not specify the encoding format, the default is gzip Boolean transparentGzip =false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding"."gzip"); List< cookie > cookies = cookiejar.loadForRequest (userRequest.url());if(! cookies.isEmpty()) { requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent()); } / / -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- the main build complete request header end -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- / / call the next interceptor Response networkResponse = chain.proceed(requestBuilder.build()); CookieJar == NULL; // The response header does nothing if there is no custom configuration with cookieJar == null. Have, save the new cookies / / the server returned to the Response into developers use the Response (similar to the decompression process) HttpHeaders. ReceiveHeaders (cookieJar userRequest. Url (), networkResponse.headers()); / / build the Response Response. Builder responseBuilder. = networkResponse newBuilder (.) the request (userRequest); Response * condition: * 1. Check whether the server supports GZIP compression. 2. Check whether the server uses GZIP compression. Whether there is a response body */if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding") && HttpHeaders. HasBody (networkResponse)) {// convert to unzip data source GzipSource responseBody = new GzipSource(networkResponse.body().source()); Headers strippedHeaders = networkResponse.headers().newBuilder() .removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
Copy the code
Most of the details are explained in the code, but here’s a brief explanation of what this interceptor does:
1. Responsible for transforming a Request built by the user into a Request that can be used for network access 2. Make a network request (i.e. call the next interceptor) to this Resquest that matches the network request. 3. Turn the Response from the network request back into a Response available to the user (unzip)Copy the code
CacheInterceptor
This interceptor is used to cache the Request and Response data. This interceptor works by calling new okHttpClient.builder ().cache(new cache(new File(getExternalCacheDir()), 100 * 1024 * 1024)) to set the cache path and size.
Before looking at the source code for interceptors, let’s look at a few concepts:
- InternalCache in the Cache class
- DiskLruCache Disk cache
- OkHttp uses Okio to handle various streams (instead of Io streams) : There are two key interfaces in Okio, Sink and Source, both of which inherit from Closeable; While Sink can be simply regarded as OutputStream, Source can be simply regarded as InputStream. Both interfaces support read/write timeout Settings.
DiskLruCache
DiskLruCache is the work of JakeWharton. It uses the LRU algorithm to manage the cache. It deletes the least recently used data and retains the most recently used data. This algorithm is slightly different from OkHttp(presumably rewritten), but the principle is the same.
2. The generation of journal file 3. The introduction of journal 4. Write cache 5. Read cache 6. Delete cache 7Copy the code
One. Simple use
// Demo example: File directory = getExternalCacheDir(); int appVersion = 1; int valueCount = 1; long maxSize = 10 * 1024; /* * Parameter description: * File directory: cache directory. * int appVersion: indicates the application version number. * int valueCount: The number of cache files for a key *. If we pass an argument greater than 1, then the cache file suffix is.0,.1, etc. * long maxSize: indicates the upper limit of cache capacity. */ DiskLruCache diskLruCache = DiskLruCache.open(directory, appVersion, valueCount, maxSize); Editor Editor = Disklrucache.edit (string.valueof (System.CurrentTimemillis ())); BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(editor.newOutputStream(0)); Bitmap bitmap = BitmapFactory.decodeResource(getResources(), R.drawable.scenery); bitmap.compress(Bitmap.CompressFormat.JPEG, 100, bufferedOutputStream); editor.commit(); diskLruCache.flush(); diskLruCache.close();Copy the code
This is how DiskLruCache is used. A brief look at its file creation:
2. File creation process
public final class DiskLruCache implements Closeable {
public static DiskLruCache open(File directory, int appVersion, int valueCount, long maxSize) throws IOException {
if (maxSize <= 0) {
throw new IllegalArgumentException("maxSize <= 0");
}
if (valueCount <= 0) {
throw new IllegalArgumentException("valueCount <= 0"); } File backupFile = new File(directory, JOURNAL_FILE_BACKUP); // If the backup file existsif(backupFile.exists()) { File journalFile = new File(directory, JOURNAL_FILE); // If the journal file exists, delete the backup file journal. BKPif (journalFile.exists()) {
backupFile.delete();
} else{// if the journal file does not exist, name the backupFile journal renameTo(backupFile, journalFile,false); } } DiskLruCache cache = new DiskLruCache(directory, appVersion, valueCount, maxSize); // Check whether the journal file existsif(cache.journalfile.exists ()) {// If there is a log file, try {// /** * read the journal file, depending on the operation type in the record. * By reading each line of the Journal file, encapsulating it into an Entry object and placing it in the LinkedHashMap collection. * And sets the entry value depending on the beginning of each line. In other words, by reading this file, * we save all the keys in the local cache to the collection, so that we can use the collection to manipulate. */ cache.readJournal(); // This method is mainly used to calculate the total capacity of the current cache and delete illegal cache records and the corresponding files of the records. cache.processJournal(); cache.journalWriter = new BufferedWriter( new OutputStreamWriter(new FileOutputStream(cache.journalFile,true), Util.US_ASCII));
return cache;
} catch (IOException journalIsCorrupt) {
System.out.println("DiskLruCache " + directory + " is corrupt: " + journalIsCorrupt.getMessage() + ", removing"); cache.delete(); }} // Create a new cache directory directory directory.mkdirs(); cache = new DiskLruCache(directory, appVersion, valueCount, maxSize); // Call a new method to create a new journal file cache.rebuildJournal();returncache; }}Copy the code
The open method revolves around creating and reading and writing a journal file, so what is a journal file?
Introduction to journal
If we go to the cache directory, we will find that in addition to the cache file, we will also find a journal file. The journal file is used to record the cached operation, as follows:
libcore.io.DiskLruCache
1
100
2
CLEAN 3400330d1dfc7f3f7f4b8d4d803dfcf6 832 21054
DIRTY 335c4c6028171cfddfbaae1a9c313c52
CLEAN 335c4c6028171cfddfbaae1a9c313c52 3934 2342
REMOVE 335c4c6028171cfddfbaae1a9c313c52
DIRTY 1ab96a171faeeee38496d8b330771a7a
CLEAN 1ab96a171faeeee38496d8b330771a7a 1600 234
READ 335c4c6028171cfddfbaae1a9c313c52
READ 3400330d1dfc7f3f7f4b8d4d803dfcf6
Copy the code
Let’s analyze the contents of this file:
- First line: libcore.io.DiskLruCache, fixed string.
- Second line: 1, DiskLruCache source version number.
- Line 3:1, the version number of the App, passed in by the open() method.
- Line 4:1, each key corresponds to several files, usually 1.
- Line 5: blank line
- Line 6 and subsequent: Cache the operation record.
And the source code has a record of 4 flush commands:
/* * DIRTY Indicates that an entry is being written. * There are two types of writes. If successful, a CLEAN line is written immediately after; * If it fails, a REMOVE record is added. Note that records in the DIRTY state alone are illegal. */ private static final String DIRTY ="DIRTY";
private static final String REMOVE = "REMOVE"; // READ indicates that a record was READ. private static final String READ ="READ"; // CLEAN is followed by the length of the file. Note that there may be multiple files with one key, so there will be multiple numbers. // When the remove(key) method is called manually, a remove record is also written. private static final String CLEAN ="CLEAN";
Copy the code
Iv. Write cache
To get an instance, call DiskLruCache’s edit() method. The interface looks like this:
Public Editor Edit (String key) throws IOException. Simple use)Copy the code
As you can see, the edit() method takes a parameter key, which will become the file name for the cache file, because the image URL may contain special characters that may not be valid when naming the file. Therefore, the key parameter is usually MD5 encoded, and the encoded string must be unique and contain only characters such as 0 -f, which fully conforms to the naming rules of the file.
Read the cache
The method of reading is simpler than that of writing. It is mainly realized by DiskLruCache’s get() method. The interface is as follows:
// Return a snapshot of the cache file, including the cache file size, input stream, and so on. public synchronized Snapshot get(String key) throws IOExceptionCopy the code
This method eventually returns a snapshot of the cache file, including the cache file size, input stream, and so on. With this snapshot we can read the cache file. Just call its getInputStream() method to get the input stream for the cached file. Similarly, the getInputStream() method also needs to pass an index argument, so just pass 0 here.
Delete cache
Remove the cache by means of the Remove () method of DiskLruCache. The interface is as follows:
public synchronized boolean remove(String key) throws IOException
Copy the code
It’s easy to use, but it’s not something you should call very often. Since you don’t need to worry about taking up too much space on your SD card, DiskLruCache will automatically remove the excess cache based on the maximum value we set when we called the open() method. Remove () should only be called if you are sure that the cache for a key has expired and you need to retrieve the latest data from the network.
Vii. Other apis
Size () : returns the total number of bytes of cached data in the current cache path, in bytes. Flush () : synchronizes the operation records in memory to the journal file. Note: You don't need to call flush() every time you write to the cache, and calling it too often doesn't bring any benefit, except for the additional time it takes to synchronize the journal file. The standard approach is to call flush() in the Activity's onPause() method. Close () : closes the DiskLruCache. This is a corresponding method to the open() method. Note: After it is closed, you cannot call any DiskLruCache methods that operate on cached data. In general, you should only call close() in the Activity's onDestroy() method. Delete () : Deletes all cache data, such as manually clearing the cacheCopy the code
InternalCache
// Cache class: Cache(File directory, long maxSize, FileSystem FileSystem) {this.internalCache = newInternalCachePublic Response get(Request Request) throws IOException {returnCache.this.get(request); } public CacheRequest put(Response response) throws IOException { // 2. Save the response data coming back from the requestreturnCache.this.put(response); } public void remove(Request request) throws IOException { // 3. Remove the saved response data cache.this.remove (request); } public void update(Response cached, Response network) { // 4. Update the cached response data cache.this. update(cached, network); } public voidtrackConditionalCacheHit() { Cache.this.trackConditionalCacheHit(); } public void trackResponse(CacheStrategy cacheStrategy) { Cache.this.trackResponse(cacheStrategy); }}; This. Cache = DiskLruCache. Create (fileSystem, directory, 201105, 2, maxSize); }Copy the code
Let’s look at InternalCache’s get and PUT methods. Let’s first look at InternalCache’s PUT method, which holds the Response data from a request. From the code above, we can see that put actually calls the Cache class put:
1. Put method analysis:
// Cache class: @nullable CacheRequest put(Response Response) {String requestMethod = Response.request ().method();if (HttpMethod.invalidatesCache(response.request().method())) {
try {
this.remove(response.request());
} catch (IOException var6) {
}
returnnull; // Do not cache if it is not a response returned by a GET request}else if(! requestMethod.equals("GET")) {
return null;
} else if (HttpHeaders.hasVaryAll(response)) {
return null;
} else{// Encapsulate response in cache.entry and call Edit () of DiskLruCache to return editor cache.entry Entry = new cache.entry (response); Editor editor = null; Try {// cache = md5(); // cache = md5(); Editor = this.cache.edit(key(response.request().url())); // Use the converted key as the LinkHashMap key within DiskLruCache.if (editor == null) {
return null;
} else{// write entry (editor) to the file with Okio sink; // Write the body using CacheRequestImplreturn new Cache.CacheRequestImpl(editor);
}
} catch (IOException var7) {
this.abortQuietly(editor);
returnnull; }}}Copy the code
According to the code above, OkHttp only caches the response returned on a GET request. The response returned from non-GET requests can also be cached, but this is highly complicated and low efficiency. After retrieving the Disklrucache.Editor object Editor, call writeTo() to write the URL, request method, response header field, and so on to the cache, and return a CacheRequestImpl instance. Call cacheWritingResponse() inside the intercept() method of the CacheInterceptor to write the body, Finally, close() of CacheRequestImpl is called to complete the commit(actually calling Editor # commit() internally).
Next let’s look at the edit and writeTo internal implementations:
// DiskLruCache class: public @Nullable Editor Edit (String key) throws IOException {returnedit(key, ANY_SEQUENCE_NUMBER); } synchronized Editor Edit (String key, Long expectedSequenceNumber) throws IOException { If an exception occurs, // a new log file is built at the end, replacing initialize() if the file already exists; // Check whether the cache is closed. CheckNotClosed (); // validateKey(key); // validateKey(key); Entry = lruenen.get (key); //lruEntries are instances of LinkHashMap.if(expectedSequenceNumber ! = ANY_SEQUENCE_NUMBER && (entry == null || entry.sequenceNumber ! = expectedSequenceNumber)) {returnnull; // Snapshot is stale.} // Null is returned if there is an Editor operating on the entryif(entry ! = null && entry.currentEditor ! = null) {returnnull; } // Clean if neededif (mostRecentTrimFailed || mostRecentRebuildFailed) {
executor.execute(cleanupRunnable);
returnnull; } // Empty the log buffer to prevent leakage journalWriter.writeutf8 (DIRTY).writebyte ()' ').writeUtf8(key).writeByte('\n');
journalWriter.flush();
if (hasJournalErrors) {
returnnull; // If the log file cannot be edited} // Create a new instance of disklrucache.entry for the requested URL // and put it in lruEntriesif (entry == null) {
entry = new Entry(key);
lruEntries.put(key, entry);
}
Editor editor = new Editor(entry);
entry.currentEditor = editor;
returneditor; } // cache.entry class: public void writeTo(Editor editor) throws IOException { BufferedSink sink = Okio.buffer(editor.newSink(0)); Sink. WriteUtf8 (this.url). WriteByte (10); sink.writeUtf8(this.requestMethod).writeByte(10); sink.writeDecimalLong((long) this.varyHeaders.size()).writeByte(10); int i = 0; int size;for (size = this.varyHeaders.size(); i < size; ++i) {
sink.writeUtf8(this.varyHeaders.name(i)).writeUtf8(":").writeUtf8(this.varyHeaders.value(i)).writeByte(10);
}
sink.writeUtf8((new StatusLine(this.protocol, this.code, this.message)).toString()).writeByte(10);
sink.writeDecimalLong((long) (this.responseHeaders.size() + 2)).writeByte(10);
i = 0;
for (size = this.responseHeaders.size(); i < size; ++i) {
sink.writeUtf8(this.responseHeaders.name(i)).writeUtf8(":").writeUtf8(this.responseHeaders.value(i)).writeByte(10);
}
sink.writeUtf8(SENT_MILLIS).writeUtf8(":").writeDecimalLong(this.sentRequestMillis).writeByte(10);
sink.writeUtf8(RECEIVED_MILLIS).writeUtf8(":").writeDecimalLong(this.receivedResponseMillis).writeByte(10);
if (this.isHttps()) {
sink.writeByte(10);
sink.writeUtf8(this.handshake.cipherSuite().javaName()).writeByte(10);
this.writeCertList(sink, this.handshake.peerCertificates());
this.writeCertList(sink, this.handshake.localCertificates());
sink.writeUtf8(this.handshake.tlsVersion().javaName()).writeByte(10);
}
sink.close();
}
Copy the code
Next, let’s look at the cache.entry constructor:
Entry(Response response) {
this.url = response.request().url().toString();
this.varyHeaders = HttpHeaders.varyHeaders(response);
this.requestMethod = response.request().method();
this.protocol = response.protocol();
this.code = response.code();
this.message = response.message();
this.responseHeaders = response.headers();
this.handshake = response.handshake();
this.sentRequestMillis = response.sentRequestAtMillis();
this.receivedResponseMillis = response.receivedResponseAtMillis();
}
Copy the code
We find that the cache.entry constructor does not have a Response body(), so where is our body cached? CacheRequest is the key to caching the body of a Response, more on that later.
2. Analysis of GET method:
// Cache class: @nullable Response get(Request Request) {// Convert url to key String key = key(request.url()); DiskLruCache.Snapshot snapshot; Entry entry; Try {// get the DiskLruCache.Snapshot instance Snapshot = cache.get(key);if (snapshot == null) {
return null;
}
} catch (IOException e) {
// Give up because the cache cannot be read.
returnnull; } try {// Get an Okio Source entry from snapshot.getSource() = new Entry(snapshot.getSource(ENTRY_METADATA)); } catch (IOException e) { Util.closeQuietly(snapshot);returnnull; Response response = entry. Response (snapshot);if(! entry.matches(request, response)) { Util.closeQuietly(response.body());return null;
}
returnresponse; } // Synchronized Snapshot: public synchronized Snapshot get(String key) throws IOException {initialize(); checkNotClosed(); validateKey(key); // Find entry from lruEntries, entry entry = lruenen.get (key);if(entry == null || ! entry.readable)returnnull; // Get the snapshot value of Entry. Snapshot snapshot = entry.snapshot();if (snapshot == null) return null;
redundantOpCount++;
journalWriter.writeUtf8(READ).writeByte(' ').writeUtf8(key).writeByte('\n'); // If redundantOpCount exceeds 2000 and the size of lruEntries, delete dataif (journalRebuildRequired()) {
executor.execute(cleanupRunnable);
}
returnsnapshot; } //DiskLruCache.Entry class: Snapshotsnapshot() {
if(! Thread.holdsLock(DiskLruCache.this)) throw new AssertionError(); Source[] sources = new Source[valueCount]; // Defensive copy since these can be zeroed out. long[] lengths = this.lengths.clone(); Try {// Iterate over the cached file and generate the corresponding sourcesfor(int i = 0; i < valueCount; i++) { sources[i] = fileSystem.source(cleanFiles[i]); } // Create Snapshot and returnreturn new Snapshot(key, sequenceNumber, sources, lengths);
} catch (FileNotFoundException e) {
// A file must have been deleted manually!
for (int i = 0; i < valueCount; i++) {
if(sources[i] ! = null) { Util.closeQuietly(sources[i]); }else {
break;
}
}
// Since the entry is no longer valid, remove it so the metadata is accurate (i.e.
// the cache
// size.)
try {
removeEntry(this);
} catch (IOException ignored) {
}
returnnull; }}Copy the code
The GET process is relatively simple compared to the PUT process. DiskLruCache.Snapshot is a Snapshot value of DiskLruCache.Entry, which encapsulates the Source of DiskLruCache. According to the conditions, the corresponding cache file is found from Disklrucache.entry, and the Source is generated. It is encapsulated in Snapshot, and then the Source is obtained through snapshot.getSource() to read the cache file.
Summary: after analyzing InternalCache, we know that Cache is only an upper layer executor, and the true InternalCache is implemented by DiskLruCache. In the DiskLruCache, stream files through FileSystem, based on Okio Sink/Source.
intercept
Let’s go back to the Intercept method of CacheInterceptor and continue our analysis:
/ / we from RealCall getResponseWithInterceptorChain () method, / / in the add (new CacheInterceptor (client) internalCache ())); InternalCache @override public Response intercept(Chain Chain) throws IOException {// If cache is configured: Read from cache first Response Response cacheCandidate = cache! = null ? Cache.get (chain-.request ()) // Get the cache: null; long now = System.currentTimeMillis(); // Cache policy, which determines whether the cache is valid by some rule // 1. CacheStrategy based on Request and previously cached Response // 2. Use the CacheStrategy to decide whether to request the network or return the cache directly // 3. If the request network is decided in 2, the returned network response is compared with the local cache in this step. CacheStrategy Strategy = new cacheStrategy.factory (now, chain.request(), cacheCandidate).get(); Request networkRequest = strategy.networkRequest; Response cacheResponse = strategy.cacheResponse;if(cache ! = null) { cache.trackResponse(strategy); }if(cacheCandidate ! = null && cacheResponse == null) { closeQuietly(cacheCandidate.body()); } // If the network is disabled according to the cache strategy and the cache is invalid, return an empty Responseif (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)") .body(Util.EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); } // If the network is disabled according to the cache strategy and there is a cache, use the cache directlyif (networkRequest == null) {
returncacheResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build(); } Response networkResponse = null; Try {// execute the next interceptor and initiate the networkRequest networkResponse = chain-.proceed (networkRequest); } finally { // If we're crashing on I/O or otherwise, don't leak the cache body.
if(networkResponse == null && cacheCandidate ! = null) { closeQuietly(cacheCandidate.body()); }} // If we also have cached responses, then we are doing conditional fetching.if(cacheResponse ! = null) {// And the server returns a 304 status code (indicating that the cache has not expired or server resources have not been modified)if(networkresponse.code () == HTTP_NOT_MODIFIED) {// build cacheResponse Response = cacheresponse.newbuilder () .headers(combine(cacheResponse.headers(), networkResponse.headers())) .sentRequestAtMillis(networkResponse.sentRequestAtMillis()) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis()) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); networkResponse.body().close(); // Update the cache after combining headers but before stripping the // Content-Encoding header (as performed by initContentStream()). cache.trackConditionalCacheHit(); cache.update(cacheResponse, response);return response;
} else{ closeQuietly(cacheResponse.body()); } // If the network resource has been modified: Using the network Response back to the latest data Response Response. = networkResponse newBuilder () cacheResponse (stripBody (cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); // Cache the latest dataif(cache ! = null) {if(HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, NetworkRequest)) {CacheRequest CacheRequest = cache.put(response); // Write the body of Responsereturn cacheWritingResponse(cacheRequest, response);
}
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
Copy the code
A quick summary of what the above code does:
- If the cache is configured when the OkhttpClient is initialized, the caceResponse is fetched from the cache
- Build a CacheStrategy object from the current request and caceResponse
- CacheStrategy This policy object determines whether caceResponse and Request are valid based on the rules. If not, it sets caceResponse and Request to NULL, respectively
- If request and caceResponse are empty, an empty Respone object with status code 504 and body Util.EMPTY_RESPONSE is returned by CacheStrategy (step 3)
- After CacheStrategy processing (step 3), if resQuest is null and cacheResponse is not null, the cacheResponse object is returned directly
- Execute the next interceptor to initiate a network request,
- If the server resource is not expired (status code 304) and there is a cache, the cache is returned
- The latest resource returned by the network (networkResponse) is cached locally and networkResponse is returned.
We have left a Response body cached above that has not been analyzed, so let’s take a look at the implementation of the cacheWritingResponse method:
/ / CacheInterceptor class: private Response cacheWritingResponse(final CacheRequest cacheRequest, Response Response)throws IOException {// Some applications return null; For compatibility, we treat it as an empty cache request.if (cacheRequest == null) return response;
Sink cacheBodyUnbuffered = cacheRequest.body();
if (cacheBodyUnbuffered == null) returnresponse; // Get BufferedSource final BufferedSource for Response.body ()source= response.body().source(); // Build the BufferedSink final BufferedSink cacheBody = okio.buffer (cacheBodyUnbuffered); // Note: The body Source cacheWritingSource = new used to actually write the ResponseSource() {
boolean cacheRequestClosed;
@Override
public long read(Buffer sink, long byteCount) throws IOException { long bytesRead; Try {// insert byteCount into sink and delete bytesRead = source.read(sink, byteCount); } catch (IOException e) {if(! cacheRequestClosed) { cacheRequestClosed =true;
cacheRequest.abort(); // Failed to write a complete cache response.
}
throw e;
}
if (bytesRead == -1) {
if(! cacheRequestClosed) { cacheRequestClosed =true; cacheBody.close(); // Cache response body complete}return- 1; // return -1} // read in sinksourceCopyTo (cacheBody.buffer(), sink.size() -bytesRead, bytesRead); cacheBody.emitCompleteSegments();return bytesRead;
}
@Override
public Timeout timeout() {
return source.timeout();
}
@Override
public void close() throws IOException {
if(! cacheRequestClosed && ! discard(this, HttpCodec.DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) { cacheRequestClosed =true; cacheRequest.abort(); } source.close(); }}; String contentType = response.header("Content-Type");
long contentLength = response.body().contentLength();
returnresponse.newBuilder() .body(new RealResponseBody(contentType, contentLength, Okio.buffer(cacheWritingSource))) // Notice the last argument, which is described after.build(); }Copy the code
It might be a little confusing from this code to see how source is cached in the read method of the cacheWritingSource inner class, so we’ll have to see where the cacheWritingSource is passed and by whom the read method is called. We can see from the last return that the cacheWritingSource is encapsulated into the Response return. As we mentioned above, the entire chain of interceptors will eventually return the Response to the onResponse callback for asynchronous requests or as a return value for synchronous requests. So we end up calling Response only with the string() method of the body() of Response. So let’s look at what this method does.
// ResponseBody ==> Public final String String () throws IOException {BufferedSourcesource = source(a); try { Charset charset = Util.bomAwareCharset(source, charset()); / / fromsourceReads the result string inreturn source.readString(charset);
} finally {
Util.closeQuietly(source); }}Copy the code
ResponseBody is actually just an abstract class, and its implementation class is RealResponseBody. We can see from the RealResponseBody that the source was initialized from its constructor:
RealResponseBody class: Private Final BufferedSourcesource;
public RealResponseBody(
@Nullable String contentTypeString, long contentLength, BufferedSource source) {
this.contentTypeString = contentTypeString;
this.contentLength = contentLength;
this.source = source;
}
Copy the code
So when did our RealResponseBody get initialized? We’re talking about the cached Response, so the cached Response must have been returned from our CacheInterceptor, so the Response that we returned from the cacheWritingResponse method above, during the build process, That’s actually where we initialize the RealResponseBody. So our source at this point is the okio.buffer (cacheWritingSource) we passed in the return value of the cacheWritingResponse method. The okio.buffer (cacheWritingSource) method returns the RealBufferedSource class (passing in the cacheWritingSource), So source.readString(charset) in response.body ().string() is actually the readString method of RealBufferedSource.
// RealBufferedSource class public final Buffer Buffer = new Buffer(); public final Sourcesource; // Finally see what we want to seesource, thissourceThat's the cacheWritingSource (anonymous inner class used to write to the body cache of Response) that's passed in, RealBufferedSource(Source)source) {
if (source == null) throw new NullPointerException("source == null");
this.source = source;
}
@Override
public String readString(Charset charset) throws IOException {
if (charset == null) throw new IllegalArgumentException("charset == null"); // writeAll, buffer.writeall (source);
returnbuffer.readString(charset); } @override public Long writeAll(Source) @override public long writeAll(Source) @override public long writeAll(Sourcesource) throws IOException {
if (source == null) throw new IllegalArgumentException("source == null"); long totalBytesRead = 0; // Pay attention hereforLoop, ifread! = -1 always polling, so always executing cacheWritingSourcereadWrite the body data of Response until it is finished and return -1for (long readCount; (readCount = source.read(this, Segment.SIZE)) ! = 1) { totalBytesRead +=readCount;
}
return totalBytesRead;
}
Copy the code
To summarize:
- The intercept method in the CacheInterceptor cache the latest Response (execution method: cacheWritingResponse)
- The cacheWritingResponse method has an anonymous inner class (cacheWritingSource) that is actually used to write to the body cache of the Response.
- Since the cacheWritingResponse method returns a Response (where the ResponseBody contains okio.buffer (cacheWritingSource), i.e. BufferedSource) to the cache interceptor, The cache interceptor is passed up to the interceptor until it is returned to the return value of the synchronous request or to the onResponse method of the asynchronous request.
- The readString of the BufferedSource (actually RealBufferedSource) is triggered when we call the Response.body().string() method.
- ReadString, in turn, calls Buffer’s writeAll (passing in cacheWritingSource) and makes a for loop to execute all the read methods in the anonymous inner class (cacheWritingSource) in step 2 to write to the Response cache
RealConnection
RealConnection is the implementation class of Connection. RealConnection encapsulates the underlying Socket Connection and uses OKio (another separate open source project from Square) to read and write data from the server interaction. First take a look at its member properties:
private final ConnectionPool connectionPool; private final Route route; // The following fields are initialized by the connect() method and will never be assigned again. // Tcp Socket private Socket; // Application layer socket // Private Handshake; // Private Protocol Protocol; Private Http2Connection Http2Connection; / / bysourceAnd sink, the input and output streams that interact with the server private BufferedSourcesource; private BufferedSink sink; // If noNewStreams is set to. // If noNewStreams is set totrue, noNewStreams is alwaystruePublic Boolean noNewStreams; public Boolean noNewStreams; Public int successCount; Public int allocationLimit = 1; public int allocationLimit = 1; Allocations is associated StreamAllocation, which counts which streams are set up on a connection, Public final List<Reference<StreamAllocation>> public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();Copy the code
As you can see from its member properties, the RealConnection holds the Socket connection and retains the input and output streams that sink and Source use to interact with the server. So having a RealConnection means we already have a communication link (Socket link) with the server. The three-way handshake is also implemented in this class, in its connect method, which we analyze in the ConnectInterceptor interceptor.
When requesting a URL using OkHttp, RealConnection does the following:
- It uses the URL and configures OkHttpClient to create an address. This address specifies how we connect to the Webserver. It attempts to retrieve the connection with that address from the connection pool.
- If it does not find a connection in the connection pool, it selects the route to try. This usually means making a DNS request to get the IP address of the server. Then select the TLS version and proxy server if necessary.
- If it is a new route, it can be connected by building a direct socket connection, A TLS tunnel (HTTPS over HTTP proxy), or a TLS connection. It will establish a handshake when necessary.
- It sends the HTTP request and reads the response.
- If there is a problem with the connection, OkHttp selects another route and tries again. This allows OkHttp to recover if a subset of server addresses is not available. This is also useful when pooled connections are out of date or when the version of TLS being attempted is not supported.
StreamAllocation
HTTP version background:
From the original 1.0 version, to the subsequent 1.1 version, to Google SPDY, and then to the 2.0 version, the HTTP protocol has been more and more improved. There are two major differences between Http2.0 and Http1.0 and 1.1. 2.0 addresses two of the most important issues of the older versions (1.1 and 1.0) : 2. Using the multiplexing technology, multiple streams can share a socket connection, and each TCP connection is completed through a socket, which corresponds to a host and port. If multiple streams (i.e., requests) are connected to the same host and port, they can share the same socket. This has the advantage of reducing the time required for a TCP three-way handshake. In OKHttp, a RealConnection records a connection. This is a RealConnection. In this class, use socket to connect and use HandShake to handle HandShake.
Three concepts: request, connection, flow
We need to understand that HTTP allocation requires the establishment of a new “stream” on the “connection”. We call StreamAllocation a “stream bridge “, which is responsible for finding a” connection “and establishing a” stream “for a” request “, thus completing the long-distance communication. So StreamAllocation has to do with requests, connections, and streams.
The StreamAllocation annotation explains in detail that a Connection is a logistics channel based on a Socket, a Stream represents a logical Stream, and a Call encapsulates a request. It was also mentioned earlier that a Call can involve multiple streams (such as redirection or Auth authentication). If StreamAllocation wants to solve the above problem, it needs two steps: one is to find the connection, the other is to obtain the stream. So the StreamAllocation should include a Stream(the Stream in OKHttp is HttpCodec); You should also include a Connection. If you want to find a suitable connection, you also need a ConnectionPool property. So there should be a method for fetching streams in StreamAllocation which is newStream(); FindConnection () is the method to find the appropriate stream; There should also be methods of finish() to close the flow object after completing the requested task, methods of termination and cancellation, and methods of freeing resources.
Let’s first look at its member attributes:
The /** * address specifies a Webserver (such as github.com) and all the static configuration required to connect to that server: port number, HTTPS Settings, and preferred network protocol (such as HTTP/2 or SPDY). * urls that share the same address can also share the same underlying TCP socket connection. Shared connections have significant performance advantages: * Lower latency, higher throughput (due to slow TCP startup), and power savings. OkHttp automatically reuses HTTP/1.x connections and HTTP/2 and SPDY multiconnections using ConnectionPool. */ public final Address address; // Address /** * routing provides the dynamic information needed to actually connect to the Web server. This is the specific IP address to try (discovered by a DNS query), * the exact proxy server to use (if using a ProxySelector), and the version of TLS to negotiate (for HTTPS connections). * * An address may have many routes. For example, a Web server hosted in multiple data centers might generate multiple IP addresses in its DNS response. * */ private Route route; // Route private final ConnectionPool ConnectionPool; Private final Object callStackTrace; Private final RouteSelector RouteSelector; Private int refusedStreamCount; Private RealConnection connection; // Private Boolean released; Private Boolean Canceled // Canceled private HttpCodec; // Connect to the required streamCopy the code
From its member properties you can see that StreamAllocation is actually a wrapper class for the various components that OkHtpp requests. StreamAllocation is related to: 1. Find a suitable connection 2. 3. Find the appropriate stream method findConnection(), which we analyze in the ConnectInterceptor interceptor.
HttpCodec
From StreamAllocation we have already mentioned that HttpCodec is actually a stream in “request, connection, stream” and HttpCodec is just an interface. The two implementation classes are Http1Codec and Http2Codec. Corresponding to Http1.1 protocol and Http2.0 protocol respectively. We’ll focus on Http1Codec in this article:
// Configure the client for this flow. For HTTPS proxy tunnels, it can be null. final OkHttpClient client; // Owns the stream allocation for this stream. For HTTPS proxy tunnels, it can be null. final StreamAllocation streamAllocation; // The input and output streams that interact with the server are final BufferedSourcesource; final BufferedSink sink; // State of the current stream, STATE_IDLE: idle connection ready to write request header int state = STATE_IDLE; Private long headerLimit = HEADER_LIMIT; private long headerLimit = HEADER_LIMIT; public Http1Codec(OkHttpClient client , StreamAllocation streamAllocation, BufferedSourcesource, BufferedSink sink) {
this.client = client;
this.streamAllocation = streamAllocation;
this.source = source;
this.sink = sink;
}
Copy the code
As you can see from the Http1Codec members and constructors, when the Http1Codec is initialized, the sink and source of the interaction with the server are passed in. The last interceptor, CallServerInterceptor, actually sends the request and gets the response.
ConnectionPool
Where do we see ConnectionPool in the OkHttp process?
- In the okHttpClient. Builder constructor, the ConnectionPool is initialized
- We’ve also seen ConnectionPool in the StreamAllocation newStream method.
- StreamAllocation calls the findConnection method to find a Connection that can be used. ConnectionPool is also involved here. When the findConnection method looks for a Connection, it first tries to reuse StreamAllocation’s own Connection. If this Connection is not available, It will then go to the ConnectionPool to find the appropriate Connection.
Overall, ConnectionPool is responsible for all connections, including the reuse of connections and the cleanup of unwanted connections. OkHttp abstracts all connections between the client and server as connections (the actual implementation class is RealConnection). ConnectionPool is designed to manage all connections. What it does is: Reuse the Connection as much as time allows and clean it up. The external gets a Connection object by calling the get method, and adds a new Connection by calling the PUT method.
The get method
// ConnectionPool class: // A linear collection that supports inserting and removing elements at both ends. Private final Deque<RealConnection> connections = new ArrayDeque<>(); @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) { assert (Thread.holdsLock(this)); / / traverse connectionsfor(RealConnection Connection: connections) {// Check whether the connection meets the criteriaif (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
returnconnection; }}returnnull; Public final List<Reference<StreamAllocation>> Allocations = new ArrayList<>(); // RealConnection class: // Current stream allocations for this connection public final List<Reference<StreamAllocation>> Allocations = new ArrayList<>(); Public Boolean isEligible(Address Address, @nullable Route Route) {// Eligible Connection has StreamAllocation limitsif (allocations.size() >= allocationLimit || noNewStreams) return false; // Whether the non-host fields of the address overlap (same)if(! Internal.instance.equalsNonHost(this.route.address(), address))return false; // Whether the host matches exactlyif (address.url().host().equals(this.route().address().url().host())) {
return true; } // At this point we have no hostname match. However, if our join merge requirements are met, we can still provide the request. // 1. This connection must be HTTP / 2.if (http2Connection == null) return false; // 2. The route must share the IP address. This requires us to provide DNS addresses for both hosts, which only happens after the route planning. We cannot merge the connections that use the proxy because the proxy does not tell us the IP address of the source server.if (route == null) return false;
if(route.proxy().type() ! = Proxy.Type.DIRECT)return false;
if(this.route.proxy().type() ! = Proxy.Type.DIRECT)return false;
if(! this.route.socketAddress().equals(route.socketAddress()))return false; // 3. The server certificate for this connection must cover the new host.if(route.address().hostnameVerifier() ! = OkHostnameVerifier.INSTANCE)return false;
if(! supportsUrl(address.url()))return false; // 4. The certificate must match the host. try { address.certificatePinner().check(address.url().host(), handshake().peerCertificates()); } catch (SSLPeerUnverifiedException e) {return false;
}
return true; } // StreamAllocation class: public void acquire(RealConnection connection, boolean reportedAcquired) { assert (Thread.holdsLock(connectionPool));if(this.connection ! = null) throw new IllegalStateException(); // Keep the connection this. Connection = connection; this.reportedAcquired = reportedAcquired; // Add this allocations to the allocations, Used for RealConnection. IsEligible method to judge whether the current Connection with StreamAllocation restrictions over the Connection. The allocations. Add (new StreamAllocationReference(this, callStackTrace)); }Copy the code
To summarize:
-
The isEligible method (which determines whether the traversed connection meets the criteria, that is, whether it is reusable) :
1. If the Connection has been allocated more than the allocation limit or has been marked, it is not eligible. 2. Then call equalsNonHost, mainly whether Address china-africa host (host) fields overlap (), if there are different. 3. The next step is to determine whether the host is the same. If the host is the same (and 1 and 2 also match), then the Connection is reusable for the current Address. 4. If 1, 2, and 3 are not met, the connection can still be reused if some conditions are met. For details, see the code comment aboveCopy the code
-
Acquire method (StreamAllocation class) :
1. Save the traversal connections for reusable connection (2) the StreamAllocation class of weak references StreamAllocationReference add add into this reuse connection, Determine whether the current Connection has more than the StreamAllocation limit 3. The connection reserved by this method will be used by the findConnection method (as described in the ConnectInterceptor section above).Copy the code
Put method
void put(RealConnection connection) { assert (Thread.holdsLock(this)); // Whether to enable the asynchronous cleanup taskif(! cleanupRunning) { cleanupRunning =true; executor.execute(cleanupRunnable); } // Add to connections connections.add(connection); }Copy the code
The PUT method is simple, adding a Connection object directly to the Connections dual-end queue. One thing to note here is that if cleanupRunning is false, a cleanupRunnable will be added to the thread pool for the purpose of cleaning up. This cleanup operation will be analyzed immediately.
Cleanup: Cleans up unwanted connections
private final Runnable cleanupRunnable = new Runnable() {
@Override
public void run() {// This cleanupRunnable is an infinite loop that will continue to execute as long as the cleanup method does not return -1.while (true) {// call cleanup to find and cleanup unwanted connections (returns the duration in nanometers) longwaitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return; // When the cleanup method does not return -1, the current Runnable goes to sleep.if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L); Synchronized (connectionPool.this) {try {// synchronized (connectionPool.this) {// synchronized (connectionPool.this) {// synchronized (connectionPool.this) {// synchronized (connectionPool.this) {// synchronized (connectionPool.this) {// synchronized (connectionPool.this) {waitMillis, (int) waitNanos); } catch (InterruptedException ignored) { } } } } } }; /** * Perform maintenance on this pool and expel the maximum number of idle connections if the keep active limit or free connection limit is exceeded. * Returns the duration in nanometers until the next call to the method. If no further cleaning is required, -1 is returned. */ long cleanup(long now) { intinUseConnectionCount = 0; int idleConnectionCount = 0; RealConnection longestIdleConnection = null; long longestIdleDurationNs = Long.MIN_VALUE; // Find the connection to be expelled, or the expiration date of the next expulsion. synchronized (this) {for(Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) { RealConnection connection = i.next(); // If the connection is in use, skip the search. // To clean up potentially leaking streamAllocations and return the number of streamallocations that are using this connectionif (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue; } // idle connections remember idleConnectionCount++; long idleDurationNs = now - connection.idleAtNanos; // Determine whether the connection has the longest idle timeif(idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs; longestIdleConnection = connection; } // If the current Connection has exceeded the maximum idle time // or the number of idle connections exceeds the maximum number of idle connections, the Connection should be reclaimedif(longestIdleDurationNs > = this. KeepAliveDurationNs | | idleConnectionCount > enclosing maxIdleConnections) {/ / it is removed from the list, Then close it below (outside of the synchronization block). connections.remove(longestIdleConnection); }else if(idleConnectionCount > 0) {// Return keepalive - maximum idle time the connection is currently alive (that is, how long the connection needs to be cleaned up)return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {// All connections are in use. Note All connections can be cleared only after their lifetime is at leastreturn keepAliveDurationNs;
} else{// No connection, idle or in use. cleanupRunning =false;
return- 1; }} / / 3. The closing of the Connection socket / / code here shows that the Connection has been more than the maximum idle time, should be recycled closeQuietly (longestIdleConnection. The socket ()); // Continue cleaningreturn0; } private int pruneAndGetAllocationCount(RealConnection connection, long now) { List<Reference<StreamAllocation>> references = connection.allocations; // Iterate over weak references to StreamAllocation stored in the current RealConnectionfor(int i = 0; i < references.size(); ) { Reference<StreamAllocation> reference = references.get(i); // If the weak reference to StreamAllocation is not empty, continuation is skippedif(reference.get() ! = null) { i++;continue; } / / if StreamAllocation weak references is empty StreamAllocation StreamAllocationReference streamAllocRef = (StreamAllocation.StreamAllocationReference) reference; String message ="A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?"; Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace); Remove (I); // Remove the references in this location. connection.noNewStreams =true; // If references is empty, that is, the connection has no StreamAllocation use, then the connection can be clearedif (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return0; }}return references.size();
}
Copy the code
Logical summary:
- Iterate through all the connection of each connection calls pruneAndGetAllocationCount whether it idle connection. If it is in use, it simply traverses one.
- For idle connections, check whether they are idle for the longest time.
- For the connection with the longest idle time, if it exceeds the set maximum idle time (5 minutes) or the maximum number of idle connections (5), the connection will be cleared. Otherwise, calculate the time required for the next cleanup, so that the cycle in the cleanupRunnable will sleep for the corresponding time and continue cleaning after waking up.
ConnectInterceptor
After the CacheInterceptor executes, the next interceptor, ConnectInterceptor, is executed, so let’s take a look at the source code in its intercept method:
@Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); // Get the StreamAllocation object from the interceptor chain // This StreamAllocation object is actually the second parameter of the interceptor chain, Is initialized in the first interceptor StreamAllocation StreamAllocation = realChain. StreamAllocation (); // We need the network to satisfy this request. Possiblyfor validating a conditional GET.
boolean doExtensiveHealthChecks = ! request.method().equals("GET"); It has two subclasses, Http1Codec and Http2Codec, respectively, corresponding to Http1.1 protocol and Http2.0 protocol, this article mainly learn the former. Http1Codec includes two important attributes, namelysource* CallServerInterceptor uses the I/O operation provided by HttpCodec to complete network communication. * */ HttpCodec httpCodec = streamAllocation.newStream(client, chain,doExtensiveHealthChecks); // Get the RealConnetion, the actual network I/O transfer object (actually this step is simple, A return of just one step in the code access to the connection) RealConnection connection = streamAllocation. Connection (); // Execute the next interceptorreturn realChain.proceed(request, streamAllocation, httpCodec, connection);
}
Copy the code
That’s all there is to this interceptor thing? Haha, that is to think more, the interceptor in the things can be hidden deep, there is material very ah. Let’s take a look at the HttpCodec and RealConnection acquisition processes separately.
// StreamAllocation class: public HttpCodec newStream(OkHttpClient client, Interceptor.Chain Chain, Boolean)doExtensiveHealthChecks) {//1. Get the set connection timeout, the read/write timeout, and whether to reconnect. int connectTimeout = chain.connectTimeoutMillis(); intreadTimeout = chain.readTimeoutMillis(); int writeTimeout = chain.writeTimeoutMillis(); int pingIntervalMillis = client.pingIntervalMillis(); boolean connectionRetryEnabled = client.retryOnConnectionFailure(); RealConnection resultConnection = findHealthyConnection(connectTimeout,readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled,
doExtensiveHealthChecks); //3. Initialize the class (HTTP 1.1 and HTTP 2.0) that codec the request and the result through a ResultConnection. // This is mainly initialization, and will be used later in an interceptor. HttpCodec resultCodec = resultConnection.newCodec(client, chain, this); synchronized (connectionPool) { codec = resultCodec; / / return HttpCodecreturnresultCodec; } } catch (IOException e) { throw new RouteException(e); }}Copy the code
From the above code, it looks like this method does two things:
- Call findHealthyConnection to get a RealConnection object.
- Generate an HttpCodec object from the obtained RealConnection and return it.
So let’s move on to the findHealthyConnection method:
// StreamAllocation class: Private RealConnection findHealthyConnection(int connectTimeout, int)readTimeout,
int writeTimeout, int pingIntervalMillis,boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {// Get the RealConnection object RealConnection Candidate = findConnection(connectTimeout,readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled); // If this is a completely new connection, we can skip the extensive health check. synchronized (connectionPool) {if(candidate.successCount == 0) {// Directly returnreturncandidate; }} /** * Destroy unhealthy links in the pool * Unhealthy RealConnection conditions are as follows: * RealConnection object socket not closed * Input stream of socket not closed * Output stream of socket not closed * Http2 connection not closed * */if(! candidate.isHealthy(doExtensiveHealthChecks)) {/ / destruction of resources (this method will be called deallocate method (lift) / / need to release the Socket connection, Close the Socket by executing the closeQuietly method) noNewStreams();continue;
}
returncandidate; }}Copy the code
In this code, you can see that retrieving the RealConnection object is given to the findConnection method:
// StreamAllocation class: private RealConnection findConnection(int connectTimeout, intreadTimeout,
int writeTimeout,int pingIntervalMillis
, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false; RealConnection result = null; Route selectedRoute = null; Connection releasedConnection; Socket toClose; Synchronized (connectionPool) {// 2. Make some judgments about whether it has been released, whether the codec class is empty, and whether the user has cancelledif (released) throw new IllegalStateException("released");
if(codec ! = null) throw new IllegalStateException("codec ! = null");
if (canceled) throw new IOException("Canceled"); // (try to reuse) Try to use an allocated connection. We need to be careful here because the connections we have already allocated may be limited to creating new streams. releasedConnection = this.connection; toClose = releaseIfNoNewStreams();if(this.connection ! = null) { // We had an already-allocated connection and it's good. result = this.connection; releasedConnection = null; } if (! reportedAcquired) { // If the connection was never reported acquired, don't report it as released!
releasedConnection = null;
}
if(result == null) {/** * 4. Try to get a connection from the connection pool. Note that the last argument is null ** Internal is an abstract class, The class is implemented in the static{} block of OkHttpClient (an anonymous inner class) * and its get method actually calls one of the onnectionPool connection pool's GET methodsforLoop through the connection pool looking for a qualified connection * which updates the connection value through the acquire method in StreamAllocation. * */ Internal.instance.get(connectionPool, address, this, null);if(connection ! = null) { foundPooledConnection =true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);
if(releasedConnection ! = null) { eventListener.connectionReleased(call, releasedConnection); }if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if(result ! = null) {// If we find an already allocated or pooled connection, we're done.returnresult; } // If we need to choose a route, please choose one. This is a block operation. boolean newRouteSelection =false;
if(selectedRoute == null && (routeSelection == null || ! routeSelection.hasNext())) { newRouteSelection =true; RouteSeletor routeSelection = RoutesElector.next (); } //5. Synchronized (connectionPool) {if (canceled) throw new IOException("Canceled");
if(newRouteSelection) {// 6. Now that we have a set of IP addresses (Route), try again to get connections from the pool. List<Route> routes = routeSelection.getAll();for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if(connection ! = null) { foundPooledConnection =true;
result = connection;
this.route = route;
break; }}} // Not foundif(! foundPooledConnection) {if(selectedRoute == null) { selectedRoute = routeSelection.next(); } // Create a connection and immediately assign it to the assignment. It is possible that the asynchronous cancel () will interrupt the handshake we are about to do. route = selectedRoute; refusedStreamCount = 0; Result = new RealConnection(connectionPool, selectedRoute); selectedRoute = new RealConnection(connectionPool, selectedRoute); // Update connection acquire(result,false); }} // If we find the merge connection the second time, we are done.if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
returnresult; } /** * 8. Do TCP + TLS handshake. This is a block operation. Open a Socket connection by calling RealConnection's "connect" method. * If it is a normal HTTP request, the Socket will be used for the connection. * If it is an HTTPS request, the handshake will be used to establish the channel. * */ result.connect(connectTimeout,readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true; Put (connectionPool, result); // 9. // Finally add a multiplexing judgment, which is available only in Http2 // if another multiplexing connection is created at the same time, release the connection and use another linkif (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
Copy the code
This code is a bit too much, but it will be explained in the code comments. Here’s a quick summary:
- The first thing to do is to determine whether it has been cancelled, whether the encoding codec is empty, etc
- Reuse connections, if they can be reused
- If step 2 cannot be reused, get one from the ConnectionPool
- Return if the connection obtained in step 3 is not empty
- If the connection obtained in step 3 is null, then the connection is obtained from the connection pool (the difference between step 3 and step 3 is that the Route parameter is passed the second time). If the return is not null, then the connection is returned
- If no available connection is obtained in the previous five steps, create a new connection and add it to the ConnectionPool
- Finally, a call to RealConnection’s connect method opens a Socket link for the next interceptor to use
Let’s move on to RealConnection’s connect operation:
// RealConnection class: public void connect(int connectTimeout, intreadTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, Call call, EventListener EventListener) {// protocol is used to check whether the connection is establishedif(protocol ! = null) throw new IllegalStateException("already connected"); RouteException routeException = null; List<ConnectionSpec> connectionSpecs = rout.address ().connectionSpecs(); // Connection spec selector (used to select connections, such as: ConnectionSpecSelector = New ConnectionSpecSelector(connectionSpecs);if (route.address().sslSocketFactory() == null) {
if(! connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) { throw new RouteException(new UnknownServiceException("CLEARTEXT communication not enabled for client"));
}
String host = route.address().url().host();
if(! Platform.get().isCleartextTrafficPermitted(host)) { throw new RouteException(new UnknownServiceException("CLEARTEXT communication to " + host + " not permitted by network " +
"security policy")); }}while (trueThe requiresTunnel() method implementation is actually simple: it determines whether address's sslSocketFactory is empty and whether the proxy proxy type is Httpif (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
if (rawSocket == null) {
// We were unable to connect the tunnel but properly closed down our
// resources.
break; }}else{// Execute the Socket connection connectSocket(connectTimeout,readTimeout, call, eventListener); } // establishProtocol establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener); eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);break;
} catch (IOException e) {
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
http2Connection = null;
eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);
if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}
if (route.requiresTunnel() && rawSocket == null) {
ProtocolException exception = new ProtocolException("Too many tunnel connections " +
"attempted: "
+ MAX_TUNNEL_ATTEMPTS);
throw new RouteException(exception);
}
if(http2Connection ! = null) { synchronized (connectionPool) { allocationLimit = http2Connection.maxConcurrentStreams(); }}} /** * Whether all work is building HTTPS connections through proxy tunnels. The problem here is that the proxy server can issue an authentication challenge and then close the connection. */ private void connectTunnel(int connectTimeout, intreadTimeout, int writeTimeout , Call Call,EventListener EventListener) throws IOException {//1, createTunnelRequest object Request tunnelRequest = createTunnelRequest(); HttpUrl url = tunnelRequest.url(); //forLoop: MAX_TUNNEL_ATTEMPTS == 21for(int i = 0; i < MAX_TUNNEL_ATTEMPTS; I++) {//2, open socketreadTimeout, call, eventListener); //3. Request to open the tunnel and return tunnelRequestsource)
tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url); //4. Open the tunnel successfullywhilecycleif (tunnelRequest == null) break; // If the tunnel fails to be opened, close related resources and continuewhileLoop closeQuietly (rawSocket); rawSocket = null; sink = null;source= null; eventListener.connectEnd(call, route.socketAddress(), route.proxy(), null); }} /** * Does all the work needed to build a full HTTP or HTTPS connection on the raw socket. */ private void connectSocket(int connectTimeout, intreadTimeout, Call call, EventListener eventListener) throws IOException { Proxy proxy = route.proxy(); Address address = route.address(); / / 1, initialize the Socket rawSocket = proxy. The type () = = proxy. The DIRECT | | proxy. The type () = = proxy. The HTTP? address.socketFactory().createSocket() : new Socket(proxy); / / use SOCKS proxy server eventListener. ConnectStart (call, the route. The socketAddress (), the proxy). rawSocket.setSoTimeout(readTimeout); Get ().connectSocket(rawSocket, rout.socketAddress (), connectTimeout); } catch (ConnectException e) { ConnectException ce = new ConnectException("Failed to connect to "+ route.socketAddress()); ce.initCause(e); throw ce; } try {// Sink = OutputStream; Source = InputStreamsourceIs used to open the tunnel connection and the last interceptor is used for the actual network request to send and get the responsesource = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
private Request createTunnel(int readTimeout, int writeTimeout, Request tunnelRequest, HttpUrl URL) throws IOException {// Concatenate CONNECT command String requestLine ="CONNECT " + Util.hostHeader(url, true) + "HTTP / 1.1";
while (true) {// Another onewhileHttp1Codec tunnelConnection = new Http1Codec(null, null,source, sink); / / send the CONNECT request to open the tunnel links, tunnelConnection. WriteRequest (tunnelRequest. Headers (), requestLine); / / complete link tunnelConnection. FinishRequest (); / / build the response and control is the response inputStream flow response. = tunnelConnection readResponseHeaders (false)
.request(tunnelRequest)
.build();
switch (response.code()) {
case HTTP_OK:
return null;
caseHTTP_PROXY_AUTH:// indicates that the server requires an access certificate to the client, TunnelRequest = rout.address ().proxyAuthenticator().authenticate(route, response); // The proxy authentication failsif (tunnelRequest == null)
throw new IOException("Failed to authenticate with proxy"); // If the proxy authentication succeeds, but the response requires close, the TCP connection is closed. In this case, the client cannot send data on the connectionif ("close".equalsIgnoreCase(response.header("Connection"))) {
return tunnelRequest;
}
break; }}} private void establishProtocol(ConnectionSpecSelector ConnectionSpecSelector) throws IOException {// If SSL is not usedif (route.address().sslSocketFactory() == null) {
protocol = Protocol.HTTP_1_1;
socket = rawSocket;
return; } // If it is SLL connectTls(connectionSpecSelector); // If it is HTTP2if (protocol == Protocol.HTTP_2) {
socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
http2Connection = new Http2Connection.Builder(true)
.socket(socket, route.address().url().host(), source, sink) .listener(this) .build(); http2Connection.start(); }}Copy the code
What is a tunnel? Tunneling is one of the uses of HTTP. The data (or payload) to be passed through tunnels can be data frames or packets of different protocols. In simple terms, Tunneling is to transmit data of another network protocol using one network protocol. For example, the network of host A and host B is an IPv6 network of the same type, but the network connecting A and B is an IPv4 network. For communication,A and B can use the tunneling technology. When the packets pass through the multi-protocol router of IPv4 data, the IPv6 packets are put into the IPv4 packets. Then the IPv4 packet wrapped with IPv6 packets is sent to B. When the packet reaches B’s router, the original IPv6 packet is stripped off and sent to B.
SSL tunnel: The SSL tunnel is used to transmit encrypted SSL data over the firewall. In this case, the tunnel is used to transmit non-HTTP traffic (SSL traffic) over the firewall to the specified server.
How do you open the tunnel? HTTP provides the CONNECT method, which is reserved in THE HTTP/1.1 protocol for proxy servers that can pipe connections, to suggest a Web tunnel. The client sends a CONNECT request to the tunnel gateway to open a TCP link. After the tunnel is opened, all data sent by the client through the HTTP tunnel is forwarded to the TCP link, and all data responded by the server is sent to the client through the tunnel. (Note: For more information, see chapter 8 of the Computer Networks Fifth edition and the HTTP Authoritative Guide.) The format of CONNECT at the head of HTTP can be simply stated as follows: CONNECT hostname: Port HTTP/1.1
This part is not in-depth analysis, interested partners to query it.
CallServerInterceptor
The CallServerInterceptor interceptor is the last interceptor on the Okhttp interceptor chain, followed by the ConnectInterceptor, which is responsible for opening the TCP connection. The main function of CallServerInterceptor is to send a request to the server and finally return a Response object for the client to use.
The 100-continue function is used when the client asks the server whether to process the POST data. If the post data is not processed, the client does not upload the POST data. In normal cases, the server returns 100 or an error code after receiving the request.
@Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; HttpCodec HttpCodec = realchain.httpStream (); StreamAllocation streamAllocation = realChain.streamAllocation(); RealConnection connection = (RealConnection) realChain.connection(); Request request = realChain.request(); long sentRequestMillis = System.currentTimeMillis(); realChain.eventListener().requestHeadersStart(realChain.call()); . / / sends a request to the server httpCodec writeRequestHeaders (request); realChain.eventListener().requestHeadersEnd(realChain.call(), request); Response.Builder responseBuilder = null; // Check whether the body is requestedif(HttpMethod.permitsRequestBody(request.method()) && request.body() ! = null) {// If the request has an "Expect: 100-continue" header, wait for an "HTTP / 1.1 100 continue" response before sending the request body. // If we don't get it, please return what we did (e.g. 4xx response) without transmitting the request body.if ("100-continue".equalsIgnoreCase(request.header("Expect"))) { httpCodec.flushRequest(); realChain.eventListener().responseHeadersStart(realChain.call()); / / build responseBuilder object responseBuilder = httpCodec. ReadResponseHeaders (true);
}
if(responseBuilder == null) {// if "Expect: 100-continue "expect, please send a request to the server body Realchain-eventListener ().requestBodyStart(realchain-call ()); long contentLength = request.body().contentLength(); CountingSink requestBodyOut = new CountingSink(httpCodec.createRequestBody(request, contentLength)); BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut); Request.body ().writeto (bufferedRequestBody); // Write the request body to bufferedRequestBody. // Push all buffered bytes to their final destination and release the resources held by this sink. bufferedRequestBody.close(); realChain.eventListener() .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount); }else if(! Connection.ismultiplexed ()) {// block reuse of HTTP / 1 connections if the "Expect: 100-continue" expectation is not met. Otherwise, we are still obligated to transmit the request body to keep the connection in a consistent state. streamAllocation.noNewStreams(); } // The actual httpcodec.finishRequest () is called with sink.flush(); // Read the response header, status code, etcif (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false); } // Create Response, write this Request, handshake status, Request time, Results the Response time Response = responseBuilder. Request (request) handshake (streamAllocation. Connection (). Handshake ()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); int code = response.code();if(code == 100) {// The server sent a 100-continue, even if we had no request. Also try again to read the actual reply responseBuilder = httpCodec. ReadResponseHeaders (false); response = responseBuilder .request(request) .handshake(streamAllocation.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); code = response.code(); } realChain.eventListener() .responseHeadersEnd(realChain.call(), response); // Return an empty body based on the status code and webSocketif (forWebSocket && code == 101) {
// Connection is upgrading, but
// we need to ensure interceptors see a non-null
// response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else{response = response.newBuilder() // Return the stream that reads the response body, And build the client available RealResponseBody. Body (httpCodec. OpenResponseBody (response)). The build (); } // If close is set, disconnect the connectionif ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) { streamAllocation.noNewStreams(); } // HTTP 204(no content) indicates that the response message contains headers and a status line, but no body content. // HTTP 205(reset content) indicates that the response was executed successfully, and resets the page (Form) for the user to enter next time.if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
Copy the code
From the CallServerInterceptor code, we can see that OkHttp sends the request and receives the response through the HttpCodec, so let’s take a look at each step:
- Send the request
/ / Http1Codec class: @override public void writeRequestHeaders(Request Request) throws IOException {// Returns the Request status line, such as GET/HTTP / 1.1. String requestLine = RequestLine.get( request, streamAllocation.connection().route().proxy().type()); // Write a request to writeRequest(request.headers(), requestLine); } /** * send the request to the server via OkIO's Sink object (which can be viewed as the Socket's OutputStream). */ public void writeRequest(Headers headers, String requestLine) throws IOException {if(state ! = STATE_IDLE) throw new IllegalStateException("state: " + state);
sink.writeUtf8(requestLine).writeUtf8("\r\n");
for (int i = 0, size = headers.size(); i < size; i++) {
sink.writeUtf8(headers.name(i))
.writeUtf8(":")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
sink.writeUtf8("\r\n");
state = STATE_OPEN_REQUEST_BODY;
}
Copy the code
We know that HTTP supports methods such as post,delete,get, and put, which require a RequestBody (in the case of Okhttp, RequestBody). So then Okhttp does the response to the request body after writeRequestHeaders, We also know from the above analysis that the RequestBody is sent via the writeTo method of the RequestBody (actually call the write method of the bufferedRequestBody object). An instance of RequestBody may be FormBody or a custom ReqeustBody:
// Create a RequestBody (FormBody is a RequestBody implementation class) formBody.builder FormBody = new FormBody.Builder();if(mParams ! = null && ! mParams.isEmpty()) {for(Map.Entry<String,String> entry: mParams.entrySet()) { formBody.add(entry.getKey(),entry.getValue()); }} RequestBody Form = formBody.build();}} RequestBody form = formBody.build(); Request.builder Builder = new Request.Builder();if(mHeader ! = null && ! mHeader.isEmpty()) {for(Map.Entry<String,String> entry: mHeader.entrySet()) { builder.addHeader(entry.getKey(),entry.getValue()); Final Request Request = Build.post (form).url(mUrl).build(); Call call = getOkHttpClient().newCall(request); call.enqueue(newCallback() {
@Override
public void onFailure(Call call, IOException e) {
sendFailure();
LoggerUtil.d("onFailure : "+e.getMessage()); } @Override public void onResponse(Call call, Response response) { responseProcess(response); }}); @override public void writeTo(BufferedSink sink) throws IOException {writeOrCountBytes(sink,false);
}
private long writeOrCountBytes(@Nullable BufferedSink sink, boolean countBytes) {
long byteCount = 0L;
Buffer buffer;
if (countBytes) {
buffer = new Buffer();
} else{ buffer = sink.buffer(); } // Write the request body to sink's cachefor (int i = 0, size = encodedNames.size(); i < size; i++) {
if (i > 0) buffer.writeByte('&');
buffer.writeUtf8(encodedNames.get(i));
buffer.writeByte('=');
buffer.writeUtf8(encodedValues.get(i));
}
if (countBytes) {
byteCount = buffer.size();
buffer.clear();
}
return byteCount;
}
Copy the code
Can see request body is by writing sink writeTo method the cache, finally through bufferedRequestBody. Close (); Method to send the request body to the server and release the resource (described in the interceptor logic).
- Get response information
// Http1Codec class: @override public Response.BuilderreadResponseHeaders(boolean expectContinue) throws IOException {
if(state ! = STATE_OPEN_REQUEST_BODY && state ! = STATE_READ_RESPONSE_HEADERS) { throw new IllegalStateException("state: "+ state); } try {// HTTP response StatusLine, such as "HTTP / 1.1 200 OK" StatusLine StatusLine = statusline.parse (readHeaderLine()); Response.builder responseBuilder = new Response.Builder().protocol(statusline.protocol) response.Builder responseBuilder = new Response.Builder().protocol(statusline.protocol) // HTTP protocol version .code(statusline.code) // HTTP response status code.message (statusline.message) // HTTP message :like"OK" or "Not Modified"
.headers(readHeaders()); // Read the response headerif (expectContinue && statusLine.code == HTTP_CONTINUE) {
return null;
} else if (statusLine.code == HTTP_CONTINUE) {
state = STATE_READ_RESPONSE_HEADERS;
return responseBuilder;
}
state = STATE_OPEN_RESPONSE_BODY;
returnresponseBuilder; } catch (EOFException e) {// The server terminates the stream before sending the response. IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
exception.initCause(e);
throw exception;
}
}
private String readThrows IOException {// PassessourceReads a String line = source. ReadUtf8LineStrict (headerLimit); headerLimit -= line.length();return line;
}
public Headers readHeaders() throws IOException { Headers.Builder headers = new Headers.Builder(); // Read the response header data. The response header and the response body data are separated by a blank line. When the read data is empty, the response header is readfor (String line; (line = readHeaderLine()).length() != 0; ) {
Internal.instance.addLenient(headers, line);
}
return headers.build();
}
Copy the code
We can see that the above code only obtains the response header. Let’s take a look at the code that reads the response body:
// CallServerInterceptor# intercept:Response = response. NewBuilder () / / the above analysis is required here to build the client available RealResponseBody response of the body. The body (httpCodec. OpenResponseBody (response)) .build(); / / Http1Codec class: @Override public ResponseBody openResponseBody(Response response) throws IOException { streamAllocation.eventListener.responseBodyStart(streamAllocation.call); String contentType = response.header("Content-Type"); // Determine whether there is a response body (judging from the response header)if(! HttpHeaders.hasBody(response)) { Sourcesource = newFixedLengthSource(0);
return new RealResponseBody(contentType, 0, Okio.buffer(source)); } // The Source object of the InputStream with the response body is constructed for the corresponding Socket (used to obtain the response body later)if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
Source source = newChunkedSource(response.request().url());
return new RealResponseBody(contentType, -1L, Okio.buffer(source));
}
long contentLength = HttpHeaders.contentLength(response);
if(contentLength ! = -1) { Sourcesource = newFixedLengthSource(contentLength);
return new RealResponseBody(contentType, contentLength, Okio.buffer(source));
}
return new RealResponseBody(contentType, -1L, Okio.buffer(newUnknownLengthSource()));
}
Copy the code
The openResponseBody sends the Socket InputStream object to OkIo’s Source object (Sink, Source). It then encapsulates the RealResponseBody (which is a subclass of ResponseBody) as the body of the Response. So how do we use this body to get the string from the server? As we mentioned earlier in our analysis of cache interceptors, the last step in getting network data is actually to call the responseBody.string () method:
Public final String String () throws IOException {BufferedSourcesource = source(a); try { Charset charset = Util.bomAwareCharset(source, charset()); //InputStream reads datareturn source.readString(charset);
} finally {
Util.closeQuietly(source); }}Copy the code
Calling source-readstring at this point not only reads from the server but also cache the response body with a CacheInterceptor (see the CacheInterceptor analyzed above). Note that the last call to this method is closeQuietly to close the InputStream InputStream for the current request, so the string() method can only be called once and will report an error if called again, since the InputStream has already been closed.
So far, after a week, I finally analyzed the whole process, but in fact, there are still some parts I didn’t understand deeply, such as: routing, routing selector, connection specification selector, etc., I will leave it to the follow-up study.
OkHttp3 routing, the agent and may refer to: www.jianshu.com/p/5c98999bc…
Reference links:
Blog.csdn.net/chunqiuwei/…
Juejin. Im/post / 684490…
www.jianshu.com/p/5bcdcfe9e…
www.jianshu.com/p/c963617ea…
www.jianshu.com/p/6166d2898…
.
(Note: If there is any error in the elaboration, please correct me. Welcome to give directions and exchange)