Hello, I’m N0tExpectErr0r, an Android developer who loves technology

My personal blog: blog.n0tExpecterr0r.cn

OkHttp uses a library called Okio to generate the input/output streams it performs I/O with. Let's analyze its source code.

Okio has two very important interfaces, Sink and Source, both of which extend Closeable. Sink corresponds to the OutputStream we are used to, and Source corresponds to the familiar InputStream.

The entry point for Okio is the Okio class, which is a factory class that creates Sink, Source, and other objects through its internal static methods.
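For orientation, here is a minimal usage sketch (the file name and string are invented for illustration): Okio.sink/Okio.source create the stream objects, and Okio.buffer wraps them with buffering:

import java.io.File;
import java.io.IOException;
import okio.BufferedSink;
import okio.BufferedSource;
import okio.Okio;

public final class OkioDemo {
    public static void main(String[] args) throws IOException {
        File file = new File("demo.txt"); // hypothetical file
        // Okio.sink / Okio.source create Sink / Source objects;
        // Okio.buffer wraps them with buffering.
        try (BufferedSink sink = Okio.buffer(Okio.sink(file))) {
            sink.writeUtf8("Hello Okio");
        }
        try (BufferedSource source = Okio.buffer(Okio.source(file))) {
            System.out.println(source.readUtf8()); // prints "Hello Okio"
        }
    }
}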

Sink

Sink is really just an interface, so let’s look at the methods in Sink:

public interface Sink extends Closeable, Flushable {
  void write(Buffer source, long byteCount) throws IOException;

  @Override void flush() throws IOException;

  Timeout timeout();

  @Override void close() throws IOException;
}

It contains write, flush, timeout, and close methods. We can obtain a Sink based on an OutputStream via the Okio.sink method:

private static Sink sink(final OutputStream out, final Timeout timeout) {
    if (out == null) throw new IllegalArgumentException("out == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");
    return new Sink() {
        @Override public void write(Buffer source, long byteCount) throws IOException {
            checkOffsetAndCount(source.size, 0, byteCount);
            while (byteCount > 0) {
                timeout.throwIfReached();
                Segment head = source.head;
                int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
                out.write(head.data, head.pos, toCopy);
                head.pos += toCopy;
                byteCount -= toCopy;
                source.size -= toCopy;
                if (head.pos == head.limit) {
                    source.head = head.pop();
                    SegmentPool.recycle(head);
                }
            }
        }
        @Override public void flush() throws IOException {
            out.flush();
        }
        @Override public void close() throws IOException {
            out.close();
        }
        @Override public Timeout timeout() {
            return timeout;
        }
        @Override public String toString() {
            return "sink(" + out + ")";
        }
    };
}

This builds and returns an anonymous implementation of Sink. The main logic is in its write method; the remaining methods simply delegate to the corresponding methods of the OutputStream.

In the write method, some state checking is done first; the timeout handling appears to live in the Timeout class, which we'll examine later. The method then takes a Segment from the Buffer, works out how much of it to write, and writes that data to the OutputStream backing the Sink.

The Segments are connected in a form resembling a linked list: the Buffer evidently maintains a list of Segments, each representing one chunk of the data. Here the Buffer is drained segment by segment into the OutputStream.

Finally, once a Segment has been fully consumed, it is popped from the list and recycled via the SegmentPool.recycle method.

From the above code we can get the following information:

  1. Buffer is an abstraction of a piece of data in memory; it stores that data in Segments organized as a linked list.
  2. Data is stored in segments, so reading data means fetching it Segment by Segment.
  3. A SegmentPool object pool enables Segment reuse.
  4. The way Segments are linked together resembles a linked list.

Source

Source, like Sink, is just an interface:

public interface Source extends Closeable {
  long read(Buffer sink, long byteCount) throws IOException;

  Timeout timeout();

  @Override void close() throws IOException;
}

In Okio, you can create a Source from an InputStream using the Okio.source method:

private static Source source(final InputStream in, final Timeout timeout) {
    if (in == null) {
        throw new IllegalArgumentException("in == null");
    } else if (timeout == null) {
        throw new IllegalArgumentException("timeout == null");
    } else {
        return new Source() {
            public long read(Buffer sink, long byteCount) throws IOException {
                if (byteCount < 0L) {
                    throw new IllegalArgumentException("byteCount < 0: " + byteCount);
                } else if (byteCount == 0L) {
                    return 0L;
                } else {
                    try {
                        timeout.throwIfReached();
                        Segment tail = sink.writableSegment(1);
                        int maxToCopy = (int)Math.min(byteCount, (long)(8192 - tail.limit));
                        int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
                        if (bytesRead == -1) {
                            return -1L;
                        } else {
                            tail.limit += bytesRead;
                            sink.size += (long)bytesRead;
                            return (long)bytesRead;
                        }
                    } catch (AssertionError var7) {
                        if (Okio.isAndroidGetsocknameError(var7)) {
                            throw new IOException(var7);
                        } else {
                            throw var7;
                        }
                    }
                }
            }
            public void close() throws IOException {
                in.close();
            }
            public Timeout timeout() {
                return timeout;
            }
            public String toString() {
                return "source(" + in + ")";
            }
        };
    }
}

Here too an anonymous inner class of Source is built and returned; this is the default implementation of Source.

Except for the read method, it simply calls the corresponding InputStream method. Let’s focus on the read method:

First it does some state checking, then obtains a writable Segment via sink.writableSegment. Data is read from the InputStream and written into that Segment; a single read is limited to the free space of the tail Segment, at most 8192 bytes.

Buffer

Buffer plays a very important role in both Sink and Source: it abstracts the data held in memory. Let's analyze it below.

Buffer is an abstraction over data in memory, but the bytes are not actually stored in the Buffer itself: internally it maintains a circular linked list of Segments, which is where the data really lives. The data is divided into segments joined by the list, and Buffer encapsulates many I/O operations that process the data segment by segment.

Why use Segments instead of storing the data whole? Because the data is segmented, a segment may hold exactly the same bytes as a segment in another Buffer, and then it can simply be shared: rather than copying the data into the other Buffer, we just point its list at the shared Segment. Likewise, when transferring data from a Source to a Sink, there is no need to copy anything; relinking the Segments is enough. This greatly improves efficiency and saves memory.
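As a small illustration of the difference this makes, the sketch below (strings invented) contrasts Buffer.copyTo, which shares the underlying Segments rather than duplicating their bytes, with Buffer.write, which moves Segments from one buffer to the other outright:

import okio.Buffer;

public final class BufferTransferDemo {
    public static void main(String[] args) {
        Buffer a = new Buffer().writeUtf8("hello okio");
        Buffer b = new Buffer();

        // Copy: 'a' keeps its data; segments are shared, not duplicated.
        a.copyTo(b, 0, a.size());

        // Move: write() transfers segments from 'a' to 'c', emptying 'a'.
        Buffer c = new Buffer();
        c.write(a, a.size());

        System.out.println(a.size()); // 0  -- moved out
        System.out.println(b.size()); // 10 -- copied
        System.out.println(c.size()); // 10 -- moved in
    }
}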

Segment

Segments form a doubly linked circular list. Each Segment has the following fields:

final class Segment {
    // The size of each segment in bytes.
    static final int SIZE = 8192;

    // Segments are shared (instead of copied) only above this size.
    static final int SHARE_MINIMUM = 1024;

    final byte[] data;

    // The next starting position for the user to read data from.
    int pos;

    // The next starting position that can be written to.
    int limit;

    // Whether the data has been shared.
    boolean shared;

    // Whether this byte array belongs to this Segment.
    boolean owner;

    // The next Segment in the list.
    Segment next;

    // The previous Segment in the list.
    Segment prev;

    // ...
}

(pos is the start position for the next read; limit is the start position for the next write.)

These two indices divide the byte array into three regions: the bytes before pos have already been read and will not be used again; the bytes between pos and limit have been written and are waiting to be read; the bytes after limit are free space that can still be written to.

Sharing mechanism

Segment also supports data sharing. The shared and owner fields indicate, respectively, whether the data is shared and whether the byte array belongs to the current Segment. Segment provides two copy operations: sharedCopy and unsharedCopy.

unsharedCopy returns a new Segment whose data array is cloned from the original:

/** Returns a new segment that owns its own private copy of the underlying byte array. */
final Segment unsharedCopy() {
    return new Segment(data.clone(), pos, limit, false, true);
}

sharedCopy also returns a new Segment, but the new Segment shares the original's data array:

/**
 * Returns a new segment that shares the underlying byte array with this. Adjusting pos and limit
 * are safe but writes are forbidden. This also marks the current segment as shared, which
 * prevents it from being pooled.
 */
final Segment sharedCopy() {
    shared = true;
    return new Segment(data, pos, limit, true, false);
}

As the comment explains, when data is shared, writes to it are forbidden for safety, and the original segment is marked as shared, which prevents it from being recycled into the pool.

This is also designed to reduce copying and thus increase I/O efficiency.

Merging and splitting

Segments also support merging with previous segments and splitting themselves, giving users more flexibility.

The compact operation merges the current Segment into its predecessor when neither holds more than half a segment of data, then recycles the current Segment, improving memory utilization:

/**
 * Call this when the tail and its predecessor may both be less than half
 * full. This will copy data so that segments can be recycled.
 */
public final void compact() {
    if (prev == this) throw new IllegalStateException();
    // The previous node's data is not writable (it is shared): cancel the merge.
    if (!prev.owner) return;
    // Compute this segment's data size and the space available in the previous node.
    int byteCount = limit - pos;
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
    // Not enough writable space: do not merge.
    if (byteCount > availableByteCount) return;
    // Write this segment's data into the previous node.
    writeTo(prev, byteCount);
    // Remove the current node from the list and recycle it.
    pop();
    SegmentPool.recycle(this);
}

The split operation splits a Segment in two: a new prefix Segment holding the data in [pos, pos + byteCount), and the current Segment, which keeps the data in [pos + byteCount, limit):

public final Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    Segment prefix;
    // We have two competing performance goals:
    //  - Avoid copying data. We accomplish this by sharing segments.
    //  - Avoid short shared segments. These are bad for performance because they are readonly and
    //    may lead to long chains of short segments.
    // To balance these goals we only share segments when the copy will be large.
    if (byteCount >= SHARE_MINIMUM) {
        // Copies of at least 1024 bytes share the underlying array.
        prefix = sharedCopy();
    } else {
        // Smaller copies are made with System.arraycopy into a pooled segment.
        prefix = SegmentPool.take();
        System.arraycopy(data, pos, prefix.data, 0, byteCount);
    }

    // Adjust pos and limit on both segments.
    prefix.limit = prefix.pos + byteCount;
    pos += byteCount;
    prev.push(prefix);
    return prefix;
}

If the split-off prefix is at least 1024 bytes, the new Segment shares the same data array with the original; otherwise, a fresh Segment is filled by copying.

Why treat the two sizes differently? Sharing segments avoids the performance cost of copying data, but shared data is read-only, so a long chain of short shared segments would hurt performance. Segments are therefore only shared when the amount of data to copy is large.

Afterwards, pos and limit are adjusted on both segments. Since the bytes before pos and after limit never affect normal reads and writes, their contents don't matter, and there is no need to zero-fill them.

SegmentPool

Okio also uses a SegmentPool to implement a pool of objects to avoid the performance overhead of frequently creating and destroying segments.

The implementation of SegmentPool is very simple. It maintains a singly linked list of recycled Segments, and its total capacity is capped at 64 KiB.

When a Segment is needed, we can use the take method to get a reclaimed object:

static Segment take() {
    synchronized (SegmentPool.class) {
        if (next != null) {
            Segment result = next;
            next = result.next;
            result.next = null;
            byteCount -= Segment.SIZE;
            return result;
        }
    }
    return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
}

It takes a free Segment from the singly linked list and returns it; if the list is empty, it allocates a new Segment instead.

When a Segment is no longer in use, remove it from its list with pop, then return it to the pool with the SegmentPool.recycle method:

static void recycle(Segment segment) {
    if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
    if (segment.shared) return; // This segment cannot be recycled.
    synchronized (SegmentPool.class) {
        if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
        byteCount += Segment.SIZE;
        segment.next = next;
        segment.pos = segment.limit = 0;
        next = segment;
    }
}

Note that shared (read-only) Segments are never recycled, and once the pool has reached its size limit, further Segments are simply dropped rather than pooled.

Data transfer

One big difference between Okio and java.io is how Buffer data is transferred, which is done through the copyTo method. It deserves to be called a transfer rather than a copy because it is a significant improvement over plain copying. Let's look at how data moves between two Buffers:

public final Buffer copyTo(Buffer out, long offset, long byteCount) {
    if (out == null) throw new IllegalArgumentException("out == null");
    checkOffsetAndCount(size, offset, byteCount);
    if (byteCount == 0) return this;

    out.size += byteCount;

    // Skip segments that precede the requested offset.
    Segment s = head;
    for (; offset >= (s.limit - s.pos); s = s.next) {
    	offset -= (s.limit - s.pos);
    }

    // Copy one segment at a time, sharing the underlying byte arrays.
    for (; byteCount > 0; s = s.next) {
        Segment copy = s.sharedCopy();
        copy.pos += offset;
        copy.limit = Math.min(copy.pos + (int) byteCount, copy.limit);
        // Insert the shared copy into out's circular list.
        if (out.head == null) {
          out.head = copy.next = copy.prev = copy;
        } else {
          out.head.prev.push(copy);
        }
        byteCount -= copy.limit - copy.pos;
        offset = 0;
    }
    return this;
}

As the code shows, the transfer is implemented through Segment sharing, so no byte arrays need to be copied, which greatly improves the efficiency of moving data around.

BufferedSource

We can wrap a plain Source with the Okio.buffer method and get a BufferedSource with buffering capability. It is an interface that defines a series of reading methods:

public interface BufferedSource extends Source, ReadableByteChannel {
    @Deprecated
    Buffer buffer();

    Buffer getBuffer();

    boolean exhausted() throws IOException;

    void require(long byteCount) throws IOException;

    boolean request(long byteCount) throws IOException;

    byte readByte() throws IOException;

    short readShort() throws IOException;

    short readShortLe() throws IOException;

    // ...

    long indexOf(byte b, long fromIndex) throws IOException;

    long indexOf(byte b, long fromIndex, long toIndex) throws IOException;

    long indexOf(ByteString bytes) throws IOException;

    long indexOf(ByteString bytes, long fromIndex) throws IOException;

    long indexOfElement(ByteString targetBytes) throws IOException;

    long indexOfElement(ByteString targetBytes, long fromIndex) throws IOException;

    boolean rangeEquals(long offset, ByteString bytes) throws IOException;

    boolean rangeEquals(long offset, ByteString bytes, int bytesOffset, int byteCount)
        throws IOException;

    BufferedSource peek();

    InputStream inputStream();
}

It has two main implementation classes: Buffer and RealBufferedSource. RealBufferedSource is evidently the wrapper returned by Okio.buffer. Buffer actually implements BufferedSource as well, with a series of read methods that pull data out of its Segments. RealBufferedSource wraps a Source and maintains an internal Buffer to make input more efficient. Let's trace the code first, then discuss why this improves efficiency.

Let's start with RealBufferedSource's readByteArray method:

@Override public byte[] readByteArray(long byteCount) throws IOException {
    require(byteCount);
    return buffer.readByteArray(byteCount);
}

The require method is called first, and then the data is read from the internal buffer.

Here is the require method:

@Override public void require(long byteCount) throws IOException {
    if (!request(byteCount)) throw new EOFException();
}

It simply delegates to the request method:

@Override public boolean request(long byteCount) throws IOException {
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    if (closed) throw new IllegalStateException("closed");
    while (buffer.size < byteCount) {
    	if (source.read(buffer, Segment.SIZE) == -1) return false;
    }
    return true;
}

Each pass of the loop reads up to Segment.SIZE (8192) bytes from the underlying source until the buffer holds at least byteCount bytes, so the total amount read is effectively byteCount rounded up to segment-sized chunks. Why read full 8192-byte chunks rather than exactly byteCount bytes?

This is prefetching. I/O operations tend to come in bursts: after one read, another is likely to follow. By reading ahead, the data that will probably be needed next is already in the buffer, so the next read can be served directly from memory instead of issuing another request to the system. This is why adding a buffer improves I/O efficiency.
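A sketch of the effect (data.bin is a hypothetical file assumed to hold at least 8 bytes): the first small read triggers one underlying read of up to a full segment, and the following read is served from memory:

import java.io.File;
import java.io.IOException;
import okio.BufferedSource;
import okio.Okio;

public final class PrefetchDemo {
    public static void main(String[] args) throws IOException {
        try (BufferedSource source = Okio.buffer(Okio.source(new File("data.bin")))) {
            int first = source.readInt();  // one underlying read of up to 8192 bytes
            int second = source.readInt(); // served from the buffer, no further I/O
            System.out.println(first + " " + second);
        }
    }
}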

You might still ask: why does this improve I/O efficiency if the total amount read is the same? This involves some operating-system knowledge. In a modern OS, our programs run in user mode, which has no I/O privileges, so each I/O request is handed to the operating system, the CPU switches to kernel mode, performs the I/O, and then switches back to user mode. These mode switches are time-consuming and involve copying, so batching reads into a buffer effectively reduces the number of system calls and speeds things up.

BufferedSink

We can likewise get a BufferedSink with buffering capability by wrapping an ordinary Sink with the Okio.buffer method. BufferedSink is also an interface, defining a set of writing methods:

public interface BufferedSink extends Sink, WritableByteChannel {
    Buffer buffer();

    BufferedSink write(ByteString byteString) throws IOException;

    BufferedSink write(byte[] source) throws IOException;

    BufferedSink write(byte[] source, int offset, int byteCount) throws IOException;

    long writeAll(Source source) throws IOException;

    BufferedSink write(Source source, long byteCount) throws IOException;

    // ... various convenience write methods ...

    @Override void flush() throws IOException;

    BufferedSink emit() throws IOException;

    BufferedSink emitCompleteSegments() throws IOException;

    OutputStream outputStream();
}
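Typical usage looks like this sketch (file name invented): writes accumulate in the internal buffer, complete segments are written through as they fill, and the rest is flushed when the sink is closed:

import java.io.File;
import java.io.IOException;
import okio.BufferedSink;
import okio.Okio;

public final class BufferedSinkDemo {
    public static void main(String[] args) throws IOException {
        try (BufferedSink sink = Okio.buffer(Okio.sink(new File("out.bin")))) {
            sink.writeUtf8("header\n"); // buffered, not yet written through
            sink.writeInt(42);          // still buffered
        } // close() flushes the remaining buffered bytes to the underlying sink
    }
}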

BufferedSink also has two implementation classes: Buffer and RealBufferedSink. Let's look at RealBufferedSink first: it wraps a Sink and maintains an internal Buffer.

write

Let’s first look at its write method:

@Override public BufferedSink write(byte[] source, int offset, int byteCount) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.write(source, offset, byteCount);
    return emitCompleteSegments();
}

This simple byte[] overload first writes the data into the buffer and then calls emitCompleteSegments. Notice that nothing has been written to the underlying sink yet, so where does that happen? Let's see what emitCompleteSegments does:

@Override public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.completeSegmentByteCount();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
}

It first calls buffer.completeSegmentByteCount to get the number of bytes that have been written but not yet read, counting only Segments that are completely filled, and then calls sink.write to push those bytes to the underlying sink.

At first sight this is odd: the point of the buffer is to optimize through caching, yet this method writes data into the buffer and then immediately writes it out to the sink, which costs more than writing to the sink directly. Why do it this way?

It puzzled me too, but considering Okio's overall design, Buffer is meant to be the unified staging area for data, with all read/write optimizations implemented uniformly in Buffer. For overall consistency, the real writes to the sink are also relayed through the Buffer; it is a compromise. A further benefit of this design is that the same piece of data can serve both reading and writing.

flush

RealBufferedSink also supports flush, which writes all data in the buffer to sink:

@Override public void flush() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    if (buffer.size > 0) {
      sink.write(buffer, buffer.size);
    }
    sink.flush();
}

emit

RealBufferedSink also has emit functionality: the emitCompleteSegments method and the emit method. The former writes to the sink only the written-but-unread bytes sitting in completely filled Segments; the latter writes all written-but-unread bytes in the buffer to the sink (similar to flush, but without flushing the underlying sink):

@Override public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.completeSegmentByteCount();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
}

@Override public BufferedSink emit() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.size();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
}

The Timeout mechanism

Okio implements timeouts for Sink and Source through the Timeout class. Sink writes and Source reads check whether the timeout has elapsed and, if so, abort the operation. A plain Timeout is used when wrapping an ordinary InputStream/OutputStream, while AsyncTimeout is used when wrapping a Socket.

Timeout

Let's study the plain Timeout first. It carries two main values, timeout and deadline, which represent, respectively, the maximum time a single wait may block and the total time allowed for a piece of work to complete.

deadline

The deadline can be set with the deadline method:

/** Set a deadline of now plus {@code duration} time. */
public final Timeout deadline(long duration, TimeUnit unit) {
    if (duration <= 0) throw new IllegalArgumentException("duration <= 0: " + duration);
    if (unit == null) throw new IllegalArgumentException("unit == null");
    return deadlineNanoTime(System.nanoTime() + unit.toNanos(duration));
}

After that, throwIfReached needs to be called at every point where the deadline should be checked (for example, in Sink's write method):

public void throwIfReached() throws IOException {
    if (Thread.interrupted()) {
      Thread.currentThread().interrupt(); // Retain interrupted status.
      throw new InterruptedIOException("interrupted");
    }
    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
      throw new InterruptedIOException("deadline reached"); }}Copy the code

This simply checks the clock: if the deadline has been reached, an exception is thrown and the subsequent operations are interrupted.
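A hedged sketch of the pattern (the work-loop helpers are invented stubs): set a deadline, then call throwIfReached inside the loop so the work is abandoned once the deadline passes:

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import okio.Timeout;

public final class DeadlineDemo {
    static void copyWithDeadline() throws IOException {
        Timeout timeout = new Timeout().deadline(5, TimeUnit.SECONDS);
        while (hasMoreChunks()) {       // hypothetical helper
            timeout.throwIfReached();   // throws InterruptedIOException after 5s
            writeNextChunk();           // hypothetical helper
        }
    }

    static boolean hasMoreChunks() { return false; } // stub for illustration
    static void writeNextChunk() {}                  // stub for illustration
}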

timeout

Timeout also implements a timed wait on a monitor. The waitUntilNotified method waits for the monitor to be notified; if the wait exceeds the timeout or the current thread is interrupted, an exception is thrown, so we never wait forever. The method must be called inside a synchronized block on the monitor to be thread-safe.

After constructing a Timeout, we can set its wait timeout with the timeout method:

public Timeout timeout(long timeout, TimeUnit unit) {
    if (timeout < 0) throw new IllegalArgumentException("timeout < 0: " + timeout);
    if (unit == null) throw new IllegalArgumentException("unit == null");
    this.timeoutNanos = unit.toNanos(timeout);
    return this;
}

This mainly just stores timeoutNanos. Now let's look at the waitUntilNotified method:

/**
 * Waits on {@code monitor} until it is notified. Throws {@link InterruptedIOException} if either
 * the thread is interrupted or if this timeout elapses before {@code monitor} is notified. The
 * caller must be synchronized on {@code monitor}.
 */
public final void waitUntilNotified(Object monitor) throws InterruptedIOException {
    try {
        boolean hasDeadline = hasDeadline();
        long timeoutNanos = timeoutNanos();

        // If no timeout is set, call monitor.wait() directly.
        if (!hasDeadline && timeoutNanos == 0L) {
            monitor.wait(); // There is no timeout: wait forever.
            return;
        }

        // Compute how long we'll wait: the smaller of timeout and deadline.
        long waitNanos;
        long start = System.nanoTime();
        if (hasDeadline && timeoutNanos != 0) {
            long deadlineNanos = deadlineNanoTime() - start;
            waitNanos = Math.min(timeoutNanos, deadlineNanos);
        } else if (hasDeadline) {
            waitNanos = deadlineNanoTime() - start;
        } else {
            waitNanos = timeoutNanos;
        }

        // Wait for the computed time.
        long elapsedNanos = 0L;
        if (waitNanos > 0L) {
            long waitMillis = waitNanos / 1000000L;
            monitor.wait(waitMillis, (int) (waitNanos - waitMillis * 1000000L));
            elapsedNanos = System.nanoTime() - start;
        }

        // If the monitor was never notified, throw.
        if (elapsedNanos >= waitNanos) {
            throw new InterruptedIOException("timeout");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new InterruptedIOException("interrupted");
    }
}

AsyncTimeout

AsyncTimeout is a subclass of Timeout, so let’s look at how AsyncTimeout handles timeouts in sockets.

AsyncTimeout instances form a queue: there is a static head reference, and each node has a next reference:

// The head of the queue.
static @Nullable AsyncTimeout head;

// Whether the current node is in the queue.
private boolean inQueue;

// The next node in the queue.
private @Nullable AsyncTimeout next;

This feels a bit like MessageQueue; presumably the AsyncTimeouts are stored in the queue ordered by when they time out.

As the AsyncTimeout JavaDoc explains, callers must invoke the enter method when an asynchronous event begins and the exit method when it ends. A background thread periodically checks for timeouts.

enter & exit

Let’s look at the Enter method first:

public final void enter() {
    if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
    long timeoutNanos = timeoutNanos();
    boolean hasDeadline = hasDeadline();
    // Neither a timeout nor a deadline is set: nothing to schedule.
    if (timeoutNanos == 0 && !hasDeadline) {
        return;
    }
    inQueue = true;
    scheduleTimeout(this, timeoutNanos, hasDeadline);
}

It sets inQueue to true and then calls scheduleTimeout to schedule the timeout check. We'll set aside scheduleTimeout's implementation for a moment.

Next, the exit method:

/** Returns true if the timeout occurred. */
public final boolean exit() {
    if (!inQueue) return false;
    inQueue = false;
    return cancelScheduledTimeout(this);
}

This is also very simple: it sets inQueue back to false and calls cancelScheduledTimeout to remove the node from the timeout queue.
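Putting enter and exit together, here is a hedged sketch of the protocol (the socket handling is schematic): subclass AsyncTimeout, override timedOut to unblock the pending operation, and bracket the blocking call with enter/exit:

import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
import okio.AsyncTimeout;

public final class AsyncTimeoutDemo {
    static int readWithTimeout(final Socket socket, byte[] dst) throws IOException {
        AsyncTimeout timeout = new AsyncTimeout() {
            @Override protected void timedOut() {
                try {
                    socket.close(); // closing the socket unblocks the pending read
                } catch (IOException ignored) {
                }
            }
        };
        timeout.timeout(10, TimeUnit.SECONDS);

        timeout.enter();
        try {
            InputStream in = socket.getInputStream();
            return in.read(dst); // may block for up to 10 seconds
        } finally {
            boolean timedOut = timeout.exit(); // true if timedOut() fired
            if (timedOut) throw new IOException("read timed out");
        }
    }
}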

scheduleTimeout

Let’s take a look at the implementation of this timing check, starting with the scheduleTimeout method:

private static synchronized void scheduleTimeout(
        AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
    // If the queue is empty, construct a head node and start the Watchdog.
    if (head == null) {
        head = new AsyncTimeout();
        new Watchdog().start();
    }

    long now = System.nanoTime();
    // Compute when the node times out: the smaller of timeout and deadline.
    if (timeoutNanos != 0 && hasDeadline) {
        // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap
        // around, Math.min() is undefined for absolute values, but meaningful for relative ones.
        node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
    } else if (timeoutNanos != 0) {
        node.timeoutAt = now + timeoutNanos;
    } else if (hasDeadline) {
        node.timeoutAt = node.deadlineNanoTime();
    } else {
        throw new AssertionError();
    }

    // Insert the node into the queue in ascending order of remaining time.
    long remainingNanos = node.remainingNanos(now);
    for (AsyncTimeout prev = head; true; prev = prev.next) {
        if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
            node.next = prev.next;
            prev.next = node;
            if (prev == head) {
                // Inserted at the front of the queue: wake up the watchdog.
                AsyncTimeout.class.notify();
            }
            break;
        }
    }
}

The above logic is mainly divided into the following steps:

  1. If there are no nodes in the queue, construct a head node and start the Watchdog. Watchdog is a Thread subclass: the periodic scanning thread.
  2. Compute the moment the Timeout expires, taking the minimum of timeout and deadline.
  3. Insert the node into the queue in ascending order of remaining time.
  4. If the node was inserted at the head of the queue, call notify (the intent isn't obvious yet; we'll come back to it).

cancelScheduledTimeout

Now let’s see what cancelScheduledTimeout does:

private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
    // Remove the node from the linked list.
    for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
        if (prev.next == node) {
            prev.next = node.next;
            node.next = null;
            return false;
        }
    }

    // The node wasn't found in the linked list: it must have timed out!
    return true;
}

This is simple: remove the AsyncTimeout from the queue. Returning true means the timeout already fired (the watchdog had removed the node); returning false means the node was removed before it timed out. This value surfaces as the return value of our exit method.

Watchdog

Now let’s look at how the Watchdog actually checks for timeouts:

private static final class Watchdog extends Thread {
    public void run() {
        while (true) {
            try {
                AsyncTimeout timedOut;
                synchronized (AsyncTimeout.class) {
                    timedOut = awaitTimeout();

                    // No node has timed out yet: keep looking.
                    if (timedOut == null) continue;

                    // The queue is completely empty: stop this thread.
                    if (timedOut == head) {
                        head = null;
                        return;
                    }
                }

                // Notify the node that it has timed out.
                timedOut.timedOut();
            } catch (InterruptedException ignored) {
            }
        }
    }
}

Watchdog repeatedly calls awaitTimeout to find a node that has timed out, then calls that node's timedOut method to notify the outside world.

awaitTimeout

We can look at what awaitTimeout does:

static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
    AsyncTimeout node = head.next;

    // The queue is empty: wait until a new node is enqueued or the idle timeout elapses.
    if (node == null) {
        long startNanos = System.nanoTime();
        AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
        return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
            ? head  // The idle timeout elapsed.
            : null; // The situation has changed.
    }

    // Compute how long this node still has before it times out.
    long waitNanos = node.remainingNanos(System.nanoTime());

    // The head of the queue hasn't timed out yet: wait that long.
    if (waitNanos > 0) {
        long waitMillis = waitNanos / 1000000L;
        waitNanos -= (waitMillis * 1000000L);
        AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
        return null;
    }

    // The head of the queue has timed out: remove it and return it.
    head.next = node.next;
    node.next = null;
    return node;
}

There are mainly the following steps:

  1. If the queue is empty, wait until a new node joins the queue.
  2. Compute how long the head node still has before it times out, and wait for that long.
  3. When the time is up, the head node has timed out: remove it from the queue and return it.

This code also shows why we need to notify when a node is inserted at the head of the queue. There are two purposes:

  1. If the queue was empty, notify signals that a new element has been enqueued.
  2. A node inserted at the head times out sooner than the node the watchdog is currently waiting on, so the current wait must be cut short and the wait time recomputed for the new Timeout, allowing it to time out on schedule.

This is similar to MessageQueue in Android; there is a lot we can learn from it.

sink & source

AsyncTimeout also provides sink and source methods that produce a Sink or Source governed by the AsyncTimeout mechanism, mainly by calling enter and exit around each operation. Take sink as an example:

public final Sink sink(final Sink sink) {
  return new Sink() {
    @Override public void write(Buffer source, long byteCount) throws IOException {
      checkOffsetAndCount(source.size, 0, byteCount);
      while (byteCount > 0L) {
        // Count how many bytes to write. This loop guarantees we split on a segment boundary.
        long toWrite = 0L;
        for (Segment s = source.head; toWrite < TIMEOUT_WRITE_SIZE; s = s.next) {
          int segmentSize = s.limit - s.pos;
          toWrite += segmentSize;
          if (toWrite >= byteCount) {
            toWrite = byteCount;
            break;
          }
        }
        // Emit one write. Only this section is subject to the timeout.
        boolean throwOnTimeout = false;
        enter();
        try {
          sink.write(source, toWrite);
          byteCount -= toWrite;
          throwOnTimeout = true;
        } catch (IOException e) {
          throw exit(e);
        } finally {
          exit(throwOnTimeout);
        }
      }
    }
    
    @Override public void flush() throws IOException {
      boolean throwOnTimeout = false;
      enter();
      try {
        sink.flush();
        throwOnTimeout = true;
      } catch (IOException e) {
        throw exit(e);
      } finally {
        exit(throwOnTimeout);
      }
    }
    
    @Override public void close() throws IOException {
      boolean throwOnTimeout = false;
      enter();
      try {
        sink.close();
        throwOnTimeout = true;
      } catch (IOException e) {
        throw exit(e);
      } finally {
        exit(throwOnTimeout);
      }
    }
    
    @Override public Timeout timeout() {
      return AsyncTimeout.this;
    }
    
    @Override public String toString() {
      return "AsyncTimeout.sink(" + sink + ")"; }}Copy the code

It’s pretty simple, so I won’t explain it too much here.

Conclusion

Okio is an excellent I/O library built as an optimization over java.io. By introducing the Segment mechanism, it greatly reduces the cost of moving data around, cutting down on copies, and it streamlines java.io's cumbersome class hierarchy, making the whole library easier to use. Okio also implements many other kinds of Source and Sink; interested readers can browse the source code. You can also revisit the earlier OkHttp source-code analysis: OkHttp uses Okio to read and write Socket data.