preface
When I looked at the Okhttp source code, I found an AsyncTimeout object in the Transmitter class. After looking at the code, this class is used to do some timeout detection. This paper mainly summarizes the author’s research on AsyncTimeout mechanism.
This article is based on OKHTTP 3.14.9
Github: github.com/square/okht…
Gradle dependency: implementation group: ‘com.squareup.okhttp3’, name: ‘okhttp’, version: ‘3.14.9’
AsyncTimeout
The AsyncTimeout class is located in the Okio library and is integrated from Timeout. The class has the following comment:
/**
* This timeout uses a background thread to take action exactly when the timeout occurs. Use this to
* implement timeouts where they aren't supported natively, such as to sockets that are blocked on
* writing.
*
* <p>Subclasses should override {@link #timedOut} to take action when a timeout occurs. This method
* will be invoked by the shared watchdog thread so it should not do any long-running operations.
* Otherwise we risk starving other timeouts from being triggered.
*
* <p>Use {@link #sink} and {@link#source} to apply this timeout to a stream. The returned value * will apply the timeout to each operation on the wrapped stream. * * <p>Callers should call {@link #enter} before doing work that is subject to timeouts, and {@link
* #exit} afterwards. The return value of {@link #exit} indicates whether a timeout was triggered.
* Note that the call to {@link #timedOut} is asynchronous, and may be called after {@link #exit}.
*/
public class AsyncTimeout extends Timeout {
Copy the code
Here are a few useful bits of information:
- This is a tool that uses uniform subthreads to detect timeouts, mainly for classes that do not support timeouts natively.
- It provides a
timedOut()
Method, asA callback that detects a timeout. - Internally supplied
sink()
andsource()
Method can be adapted to stream read/write timeout detection, which can correspond to stream reads/writes on network requests, as discussed later. - provide
enter()
andexit()
As a call to start and end the timer. That is to say,The starting point for execution timing will beenter()
happen.
Timeout
With all this talk about timeout detection, where does the timeout time come from? Take a look at the following definition in Timeout:
/**
* True if {@code deadlineNanoTime} is defined. There is no equivalent to null
* or 0 for {@link System#nanoTime}.
*/
private boolean hasDeadline;
private long deadlineNanoTime;
private long timeoutNanos;
Copy the code
Timeout defines deadlineNanoTime, the deadline time; TimeoutNanos Specifies the timeout period. Specifically, the subclass AsyncTimeout uses timeoutNanos to compute timeouts.
The AsyncTimeout property is defined
Let’s look at some of the property definitions of AsyncTimeout,
private static final int TIMEOUT_WRITE_SIZE = 64 * 1024;
private static final long IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60);
private static final long IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS);
static @Nullable AsyncTimeout head;
private boolean inQueue;
private @Nullable AsyncTimeout next;
private long timeoutAt;
Copy the code
- TimeoutAt: Records the specific time of the timeout. This is calculated by the current time at which the timer started + the above timeoutNanos.
- One of the above codes appears
head
andnext
The definition of theta, in front ofAsyncTimeout
As mentioned in the comment, it uses a uniform child thread for timeout detection. And thishead
andnext
The definition of is oneThe structure of a linked listWhich is used to put eachAsyncTimeout
Objects form a queue to facilitate traversal each time timeout detection is triggered. We’ll talk about that later. - InQueue:
AsyncTimeout
Once the objectAdd to the listIs set to true.
Use of AsyncTimeout in the network request process
Let’s look at the application of AsyncTimeout in the network request process.
-
There is a built-in AsyncTimeout attribute in the Transmitter, which has a timeoutNanos timeout set in the Transmitter constructor. This is the custom callTimeout set when the OkHttpClient is initialized. The timeout here checks for the total time of the entire request.
private final AsyncTimeout timeout = new AsyncTimeout() { @Override protected void timedOut(a) { cancel(); }}; .public Transmitter(OkHttpClient client, Call call) { this.client = client; this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool()); this.call = call; this.eventListener = client.eventListenerFactory().create(call); this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS); } Copy the code
CallTimeout: Specifies the timeout period of the entire request process. The default value is 0
-
Called when a connection is established to RealConnection. ConnectSocket (), after establishing a connection will create two Okio BufferedSource and BufferedSink object.
// RealConnection.connectSocket() / / RealConnection. Java 275 rows source = Okio.buffer(Okio.source(rawSocket)); sink = Okio.buffer(Okio.sink(rawSocket)); / / Okio. Java 221 rows public static Source source(Socket socket) throws IOException { if (socket == null) throw new IllegalArgumentException("socket == null"); if (socket.getInputStream() == null) throw new IOException("socket's input stream == null"); AsyncTimeout timeout = timeout(socket); Source source = source(socket.getInputStream(), timeout); return timeout.source(source); } / / Okio. Java 115 rows public static Sink sink(Socket socket) throws IOException { if (socket == null) throw new IllegalArgumentException("socket == null"); if (socket.getOutputStream() == null) throw new IOException("socket's output stream == null"); AsyncTimeout timeout = timeout(socket); Sink sink = sink(socket.getOutputStream(), timeout); return timeout.sink(sink); } / / RealConnection. Java 542 rows ExchangeCodec newCodec(OkHttpClient client, Interceptor.Chain chain) throws SocketException { if(http2Connection ! =null) { return new Http2ExchangeCodec(client, this, chain, http2Connection); } else { socket.setSoTimeout(chain.readTimeoutMillis()); source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS); sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS); return new Http1ExchangeCodec(client, this, source, sink); }}Copy the code
When you create a BufferedSource or a BufferedSink object, you need to create an AsyncTimeout, and then you can create a BufferedSource or a BufferedSink object, so this code uses the decorator idea, Then source and sink have timeout ability. Later, when you create an ExchangeCodec, you will set readTimeout and writeTimeout that are customized during OkHttpClient initialization, corresponding to read and write timeouts.
ReadTimeout: indicates the readTimeout period. The default value is 10 seconds.
WriteTimeout: writeTimeout. The default value is 10 seconds.
Ps: Because the socket itself has the detection of connection timeout, so connectTimeout does not need to use the AsyncTimeout scheme.
AsyncTimeout Indicates the timeout detection
Join the queue and start the detection
/ / AsyncTimeout. Java 72 rows
public final void enter(a) {
if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
long timeoutNanos = timeoutNanos();
boolean hasDeadline = hasDeadline();
if (timeoutNanos == 0 && !hasDeadline) {
return; // No timeout and no deadline? Don't bother with the queue.
}
inQueue = true;
scheduleTimeout(this, timeoutNanos, hasDeadline);
}
Copy the code
The asyncTimeout.Enter () method is shown above, and the timeout detection is officially entered after the call. Focus on the final scheduleTimeout(this, timeoutNanos, hasDeadline); So a static method, remember that AsyncTimeout has a static member variable called head? Let’s look at this method.
/ / AsyncTimeout. Java 83 rows
private static synchronized void scheduleTimeout(
AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
// Start the watchdog thread and create the head node when the first timeout is scheduled.
if (head == null) {
head = new AsyncTimeout();
new Watchdog().start();
}
long now = System.nanoTime();
if(timeoutNanos ! =0 && hasDeadline) {
// Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
// Math.min() is undefined for absolute values, but meaningful for relative ones.
node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
} else if(timeoutNanos ! =0) {
node.timeoutAt = now + timeoutNanos;
} else if (hasDeadline) {
node.timeoutAt = node.deadlineNanoTime();
} else {
throw new AssertionError();
}
// Insert the node in sorted order.
long remainingNanos = node.remainingNanos(now);
for (AsyncTimeout prev = head; true; prev = prev.next) {
if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
node.next = prev.next;
prev.next = node;
if (prev == head) {
AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
}
break; }}}Copy the code
Ps: the node remainingNanos (now); The interval between the current time and the timeout time is calculated.
The method mainly does three things:
- A static variable
head
If the value is left blank, global detection is not enabled. You need to enable the detection threadWatchdog
. Ps:head
It is actually just a sign of the start of a queue, and does not itself belong to a timeout detection. - Calculates the timeout period of the current node that is added to the detection queue
timeoutAt
- The global detection queue is carried outreorder.In accordance with the
timeoutAt
Order from smallest to largest. Ensure the follow-upWatchdog
Detection mechanism. Because it is a linked list structure, you only need to change the point of the next node. The specific order can be seen in the following figure (use the journey map instead of the timeline to understand) :
Journey Title AsyncTimeout Queue order (unit: NS) Now: 0 timeoutAt: 0 timeoutAt2: 0
Watchdog
The Watchdog is the detection thread of the entire AsyncTimeout timeout detection mechanism.
private static final class Watchdog extends Thread {
Watchdog() {
super("Okio Watchdog");
setDaemon(true);
}
public void run(a) {
while (true) {
try {
AsyncTimeout timedOut;
synchronized (AsyncTimeout.class) {
timedOut = awaitTimeout();
// Didn't find a node to interrupt. Try again.
if (timedOut == null) continue;
// The queue is completely empty. Let this thread exit and let another watchdog thread
// get created on the next call to scheduleTimeout().
if (timedOut == head) {
head = null;
return; }}// Close the timed out node.
timedOut.timedOut();
} catch (InterruptedException ignored) {
}
}
}
}
Copy the code
- through
awaitTimeout()
Find out which ones are out of timeAsyncTimeout
Object. - if
timedOut
If the object is empty, the detection continues. - if
timedOut
forhead
In the linked listNo detection object exists. Can be linked list. - if
timedOut
Is a valid timeout object, thenCall ittimedOut()
Method to the registered listener. - It is worth mentioning that in the
WatchDog
Is set in the constructor ofsetDaemon(true);
Indicates that it is a daemon. See about daemonsSetDaemon,. And the good thing about this is,It can be closed by relying on the thread that started it to close. - because
WatchDog
isDetection of natureOf the thread, sotimedOut()
In the wayTime-consuming operations should not be performed, so as not to affect the follow-up testing.
awaitTimeout()
Find the expired AsyncTimeout in the Watchdog thread by calling awaitTimeout().
static @Nullable AsyncTimeout awaitTimeout(a) throws InterruptedException {
// Get the next eligible node.
AsyncTimeout node = head.next;
// The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
if (node == null) {
long startNanos = System.nanoTime();
AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
? head // The idle timeout elapsed.
: null; // The situation has changed.
}
long waitNanos = node.remainingNanos(System.nanoTime());
// The head of the queue hasn't timed out yet. Await that.
if (waitNanos > 0) {
// Waiting is made complicated by the fact that we work in nanoseconds,
// but the API wants (millis, nanos) in two arguments.
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
return null;
}
// The head of the queue has timed out. Remove it.
head.next = node.next;
node.next = null;
return node;
}
Copy the code
From the code, awaitTimeout() will only detect head.next.
- if
head.next
fornull
, will enter first60sTimeout waiting status.- If there is still none, the timeout detection queue is considered empty,
Watchdog
The thread will end. It won’t be on until the next time there’s a new test. - If it exists, it returns
null
, the external to the next loop.
- If there is still none, the timeout detection queue is considered empty,
- if
head.next
If the timeout period is earlier than the current time, theThe difference between the current time and the timeout timeTimeout waiting state. Wake up will also returnnull
, the external to the next loop. awaitTimeout()
Java multithreaded wait(), notify/notifyAll() mechanism is used. The abovescheduleTimeout(this, timeoutNanos, hasDeadline);
Methods in theThe new node is inserted into the linked listWill call afterAsyncTimeout.class.notify();
. The purpose of this is toAllocates resources without a timeout.
Wait, notify, and notifyAll
Exit inspection
When the process finishes, you need to call exit() to move the bound AsyncTimeout object off the list. If it is not found in the list, it is time out…
/** Returns true if the timeout occurred. */
public final boolean exit(a) {
if(! inQueue)return false;
inQueue = false;
return cancelScheduledTimeout(this);
}
/** Returns true if the timeout occurred. */
private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
// Remove the node from the linked list.
for(AsyncTimeout prev = head; prev ! =null; prev = prev.next) {
if (prev.next == node) {
prev.next = node.next;
node.next = null;
return false; }}// The node wasn't found in the linked list: it must have timed out!
return true;
}
Copy the code