Retrofit, OkHttp, Okio: a source-code study of the "three axes" of Square's Android network layer
Based on Okio version 1.13.0. Okio GitHub address

Introduction

Okio is designed to replace the complex APIs of java.io and java.nio, making data access, storage, and processing easier.

Okio is the cornerstone of OkHttp, and OkHttp is the cornerstone of Retrofit. Together, the three frameworks are known as the "three axes" of Square's Android network layer.

Usage

See OkioTest:

...
@Test
public void readWriteFile() throws Exception {
    File file = temporaryFolder.newFile();

    BufferedSink sink = Okio.buffer(Okio.sink(file));
    sink.writeUtf8("Hello, java.io file!");
    sink.close();
    assertTrue(file.exists());
    assertEquals(20, file.length());

    BufferedSource source = Okio.buffer(Okio.source(file));
    assertEquals("Hello, java.io file!", source.readUtf8());
    source.close();
}
...

According to OkioTest, Okio mainly supports two kinds of operation, "read" and "write". The objects it can operate on are:

1. File
2. Path (the file path description class)
3. Socket
4. OutputStream
5. InputStream

Okio writes to an object through sink(XXX) and reads from an object through source(XXX).

Overview of Okio read and write framework

Okio.buffer() gets the BufferedSource and BufferedSink for reading and writing

BufferedSink sink = Okio.buffer(Okio.sink(file));
BufferedSource source = Okio.buffer(Okio.source(file));

Take a closer look at the buffer() method

public static BufferedSource buffer(Source source) {
    return new RealBufferedSource(source);
}

public static BufferedSink buffer(Sink sink) {
    return new RealBufferedSink(sink);
}
RealBufferedSink

Look at the RealBufferedSink class

final class RealBufferedSink implements BufferedSink {
    public final Buffer buffer = new Buffer();
    public final Sink sink;
    boolean closed;

    RealBufferedSink(Sink sink) {
        if (sink == null) throw new NullPointerException("sink == null");
        this.sink = sink;
    }
    ……
}

RealBufferedSink implements the BufferedSink interface, and BufferedSink extends the Sink interface.

Sink extends the Closeable and Flushable interfaces.

1. The Flushable interface defines a single method, flush(), which writes buffered data to the underlying output stream.
2. The Closeable interface defines close(), which closes the stream.
3. The Sink interface defines write(Buffer source, long byteCount) for writing data and timeout() for accessing the timeout that governs writes.
4. The BufferedSink interface defines a set of writeXXX(...) methods for writing different types of data.

Look at the fields of RealBufferedSink:

public final Buffer buffer = new Buffer();
public final Sink sink;
boolean closed;

There is a Buffer object, a Sink object passed in from the constructor, and a Boolean flag to record whether the stream is closed or not.

Most of RealBufferedSink's writeXXX(…) methods look like this:

@Override
public BufferedSink writeXXX(...) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.writeXXX(...);
    return emitCompleteSegments();
}

As you can see, the data is written by the buffer object. The emitXXX() and flush() methods then call sink.write(buffer, byteCount) and sink.flush().
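The write path described here can be sketched with plain JDK classes. This is only an illustration under stated assumptions: ChunkedBufferedSink, CHUNK_SIZE, emitCompleteChunks, and buffered are hypothetical names, a ByteArrayOutputStream stands in for Okio's Buffer, and the chunk size is deliberately tiny. It is not Okio's implementation.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified stand-in for RealBufferedSink; not Okio's code. */
final class ChunkedBufferedSink {
    static final int CHUNK_SIZE = 8; // assumption: tiny so the demo emits quickly

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final OutputStream sink;
    private boolean closed;

    ChunkedBufferedSink(OutputStream sink) {
        if (sink == null) throw new NullPointerException("sink == null");
        this.sink = sink;
    }

    /** Same shape as writeXXX(): check state, write to the buffer, emit full chunks. */
    ChunkedBufferedSink writeUtf8(String s) throws IOException {
        if (closed) throw new IllegalStateException("closed");
        buffer.write(s.getBytes(StandardCharsets.UTF_8));
        return emitCompleteChunks();
    }

    /** Forwards only whole chunks; a partial tail stays in the buffer. */
    ChunkedBufferedSink emitCompleteChunks() throws IOException {
        byte[] data = buffer.toByteArray();
        int complete = (data.length / CHUNK_SIZE) * CHUNK_SIZE;
        if (complete > 0) {
            sink.write(data, 0, complete);
            buffer.reset();
            buffer.write(data, complete, data.length - complete);
        }
        return this;
    }

    /** Pushes everything, including the partial tail, then closes the sink. */
    void close() throws IOException {
        if (closed) return;
        buffer.writeTo(sink);
        buffer.reset();
        sink.close();
        closed = true;
    }

    /** Number of bytes still waiting in the buffer. */
    int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ChunkedBufferedSink sink = new ChunkedBufferedSink(out);
        sink.writeUtf8("Hello, Okio"); // 11 bytes: one 8-byte chunk is emitted
        System.out.println(out.size() + " emitted, " + sink.buffered() + " buffered");
        sink.close();
        System.out.println(out.toString("UTF-8"));
    }
}
```

The point of the design is that callers always write into memory first, and the underlying stream only sees large, complete blocks until close() pushes the remainder.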

RealBufferedSource

RealBufferedSource is similar to RealBufferedSink:

final class RealBufferedSource implements BufferedSource {
    public final Buffer buffer = new Buffer();
    public final Source source;
    boolean closed;

    RealBufferedSource(Source source) {
        if (source == null) throw new NullPointerException("source == null");
        this.source = source;
    }
}

RealBufferedSource implements the BufferedSource interface, and BufferedSource extends the Source interface.

The Source interface in turn extends Closeable.

1. Source extends the Closeable interface, so it provides a close() method to close the stream being read.
2. Source defines read(Buffer sink, long byteCount) for reading data and timeout() for accessing the read timeout.
3. BufferedSource defines many readXXX(...) methods for reading data.

The readXXX(…) methods of RealBufferedSource mirror the writeXXX(…) methods of RealBufferedSink: they read data through the buffer field together with the Source object passed in at construction.

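The structure of the whole read-write framework can be summarized as follows: Okio.sink() and Okio.source() produce a Sink or Source, Okio.buffer() wraps it in a RealBufferedSink or RealBufferedSource, and each wrapper holds a Buffer through which all reads and writes pass.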

Uniform processing of all read and write objects

Whether the target is a File, Path, InputStream, OutputStream, or Socket, a single Okio.sink(…) or Okio.source(…) call, wrapped in Okio.buffer(), yields the corresponding output stream (RealBufferedSink) or input stream (RealBufferedSource).

Okio also provides an additional parameter for both input and output streams: Timeout, which sets the timeout for reads and writes.

All sink() overloads end up calling:

private static Sink sink(final OutputStream out, final Timeout timeout) {
    if (out == null) throw new IllegalArgumentException("out == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Sink() {
        @Override
        public void write(Buffer source, long byteCount) throws IOException {
            checkOffsetAndCount(source.size, 0, byteCount);
            while (byteCount > 0) {
                timeout.throwIfReached();
                Segment head = source.head;
                int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
                out.write(head.data, head.pos, toCopy);

                head.pos += toCopy;
                byteCount -= toCopy;
                source.size -= toCopy;

                if (head.pos == head.limit) {
                    source.head = head.pop();
                    SegmentPool.recycle(head);
                }
            }
        }

        @Override
        public void flush() throws IOException {
            out.flush();
        }

        @Override
        public void close() throws IOException {
            out.close();
        }

        @Override
        public Timeout timeout() {
            return timeout;
        }

        @Override
        public String toString() {
            return "sink(" + out + ")";
        }
    };
}

Okio.sink() creates an instance of an anonymous inner class that implements Sink. Its write method writes data to an OutputStream (files, paths, and sockets are all converted to an OutputStream first) and checks for a timeout on every pass through the write loop. (The timeout mechanism is covered later.)

Okio.source() similarly creates an instance of an anonymous inner class that implements the Source interface; its read method reads data from an InputStream.

Okio uses Segment objects whenever it reads or writes data. A Segment is a node in a linked-list structure defined by Okio. Each Segment can hold up to 8 KiB (8192 bytes).
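A minimal sketch of such a segment list, assuming a circular doubly linked ring in which head.prev is the tail (the same shape the writableSegment() and completeSegmentByteCount() code in this article relies on). MiniSegment and MiniSegmentDemo are hypothetical names for illustration; pooling and the owner flag are left out.

```java
/** Hypothetical miniature of a segment list; not Okio's Segment class. */
final class MiniSegment {
    static final int SIZE = 8192; // capacity per segment, as stated in the text

    final byte[] data = new byte[SIZE];
    int pos;   // index of the next byte to read
    int limit; // index of the next byte to write
    MiniSegment next;
    MiniSegment prev;

    /** Creates a one-element ring: the segment is its own next and prev. */
    static MiniSegment single() {
        MiniSegment s = new MiniSegment();
        s.next = s.prev = s;
        return s;
    }

    /** Links {@code segment} right after this one and returns it. */
    MiniSegment push(MiniSegment segment) {
        segment.prev = this;
        segment.next = next;
        next.prev = segment;
        next = segment;
        return segment;
    }

    /** Unlinks this segment; returns its successor, or null if it was alone. */
    MiniSegment pop() {
        MiniSegment result = next != this ? next : null;
        prev.next = next;
        next.prev = prev;
        next = null;
        prev = null;
        return result;
    }
}

final class MiniSegmentDemo {
    /** Counts segments by walking the ring once, starting from head. */
    static int count(MiniSegment head) {
        if (head == null) return 0;
        int n = 1;
        for (MiniSegment s = head.next; s != head; s = s.next) n++;
        return n;
    }

    public static void main(String[] args) {
        MiniSegment head = MiniSegment.single();
        head.prev.push(new MiniSegment()); // append after the tail (head.prev)
        head.prev.push(new MiniSegment());
        System.out.println(count(head));   // 3
        head = head.pop();                 // drop a fully consumed head
        System.out.println(count(head));   // 2
    }
}
```

Because the ring is circular, the tail is always reachable as head.prev, so appending for writes and popping for reads are both constant-time operations.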

The universal Buffer

Okio writes data to buffer first

BufferedSink sink = Okio.buffer(Okio.sink(file));
sink.writeUtf8("Hello, java.io file!");
sink.close();

Okio.buffer() returns a RealBufferedSink, whose writeUtf8() is:

@Override public BufferedSink writeUtf8(String string) throws IOException {
  if (closed) throw new IllegalStateException("closed");
  buffer.writeUtf8(string);
  return emitCompleteSegments();
}

Now look at Buffer.writeUtf8():

@Override public Buffer writeUtf8(String string) {
  return writeUtf8(string, 0, string.length());
}

This overload then turns the String into a Segment list:

@Override
public Buffer writeUtf8(String string, int beginIndex, int endIndex) {
    ......
    // Transcode a UTF-16 Java String to UTF-8 bytes.
    for (int i = beginIndex; i < endIndex;) {
        int c = string.charAt(i);

        if (c < 0x80) {
            Segment tail = writableSegment(1);
            byte[] data = tail.data;
            ...
            while (i < runLimit) {
                c = string.charAt(i);
                if (c >= 0x80) break;
                data[segmentOffset + i++] = (byte) c; // 0xxxxxxx
            }
            ...
        } else if (c < 0x800) {
            // Emit a 11-bit character with 2 bytes.
            writeByte(c >> 6 | 0xc0); // 110xxxxx
            writeByte(c & 0x3f | 0x80); // 10xxxxxx
            i++;
        }
        ...
    }
    return this;
}
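The two branches visible in this excerpt (one byte for characters below 0x80, two bytes for characters below 0x800) can be tried in isolation. The sketch below is hypothetical: Utf8Sketch and encode are illustrative names, it writes into a ByteArrayOutputStream rather than a Segment, and it rejects anything outside those two ranges instead of handling the elided cases.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical sketch of the 1-byte and 2-byte UTF-8 branches only. */
final class Utf8Sketch {
    /** Encodes chars below U+0800; other chars are rejected in this sketch. */
    static byte[] encode(String s) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < s.length(); i++) {
            int c = s.charAt(i);
            if (c < 0x80) {
                out.write(c);               // 0xxxxxxx
            } else if (c < 0x800) {
                out.write(c >> 6 | 0xc0);   // 110xxxxx
                out.write(c & 0x3f | 0x80); // 10xxxxxx
            } else {
                throw new IllegalArgumentException("outside the range this sketch covers");
            }
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        String s = "caf\u00e9"; // U+00E9 needs two bytes: 0xC3 0xA9
        byte[] expected = s.getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.equals(encode(s), expected)); // true
    }
}
```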

writableSegment() returns the tail Segment if it still has enough room; otherwise it appends a new Segment to the end of the list:

Segment writableSegment(int minimumCapacity) {
    if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();

    if (head == null) {
        head = SegmentPool.take(); // Acquire a first segment.
        return head.next = head.prev = head;
    }

    Segment tail = head.prev;
    if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
        tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
    }
    return tail;
}

Next, look at emitCompleteSegments():

@Override
public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.completeSegmentByteCount();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
}

Buffer.completeSegmentByteCount() computes how many buffered bytes sit in completed Segments, that is, everything except a tail Segment that can still be written to:

public long completeSegmentByteCount() {
    long result = size;
    if (result == 0) return 0;

    // Omit the tail if it's still writable.
    Segment tail = head.prev;
    if (tail.limit < Segment.SIZE && tail.owner) {
        result -= tail.limit - tail.pos;
    }

    return result;
}
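To make the arithmetic concrete, here is a small sketch that applies the same rule to plain numbers. CompleteBytesSketch and its parameters are hypothetical; the real method reads these values from the Buffer and its tail Segment.

```java
/** Hypothetical restatement of the completeSegmentByteCount() arithmetic. */
final class CompleteBytesSketch {
    static final int SEGMENT_SIZE = 8192;

    /**
     * @param size      total bytes currently buffered
     * @param tailPos   read position inside the tail segment
     * @param tailLimit write position inside the tail segment
     * @param tailOwner whether the buffer may still append to the tail
     */
    static long completeByteCount(long size, int tailPos, int tailLimit, boolean tailOwner) {
        if (size == 0) return 0;
        long result = size;
        // Omit the tail if it is still writable.
        if (tailLimit < SEGMENT_SIZE && tailOwner) {
            result -= tailLimit - tailPos;
        }
        return result;
    }

    public static void main(String[] args) {
        // 20 bytes in a single, partly filled segment: nothing is complete yet.
        System.out.println(completeByteCount(20, 0, 20, true));             // 0
        // Two full segments plus a 100-byte tail: only the full ones count.
        System.out.println(completeByteCount(2 * 8192 + 100, 0, 100, true)); // 16384
        // A tail that is exactly full counts as complete.
        System.out.println(completeByteCount(8192, 0, 8192, true));          // 8192
    }
}
```

The first case matches the 20-byte "Hello, java.io file!" example: emitCompleteSegments() sees a byte count of 0 and writes nothing, so the data only reaches the file when the sink is flushed or closed.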

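sink.write(buffer, byteCount) calls the write method of the anonymous Sink class created by Okio.sink() earlier.

To summarize the process: writeUtf8() encodes the String into the Buffer's Segment list, emitCompleteSegments() passes the completed Segments to the underlying Sink, and the Sink writes their bytes to the OutputStream.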

Buffer plays a similar role when reading data

Okio timeout mechanism

Okio can attach a timeout setting to its OutputStream and InputStream wrappers. When reading or writing files, a default Timeout is used; with no timeout or deadline configured, it is effectively an empty implementation.

Okio shows how to set up an asynchronous timeout mechanism when reading from or writing to a Socket: if a Socket read or write times out, the Socket is closed.

public static Sink sink(Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    AsyncTimeout timeout = timeout(socket);
    Sink sink = sink(socket.getOutputStream(), timeout);
    return timeout.sink(sink);
}

Look at timeout(socket):

private static AsyncTimeout timeout(final Socket socket) {
    return new AsyncTimeout() {
        @Override
        protected IOException newTimeoutException(@Nullable IOException cause) {
            ......
        }

        @Override
        protected void timedOut() {
            try {
                socket.close();
            }
            ...
        }
    };
}

As you can see, an anonymous AsyncTimeout object is returned whose main job is to close the Socket in timedOut().

The sink(socket.getOutputStream(), timeout) method has been seen above:

private static Sink sink(final OutputStream out, final Timeout timeout) {
    ......
    return new Sink() {
        @Override
        public void write(Buffer source, long byteCount) throws IOException {
            ......
            while (byteCount > 0) {
                timeout.throwIfReached();
                ...
            }
        }
        ...
    };
}

Take a look at the throwIfReached method:

public void throwIfReached() throws IOException {
    if (Thread.interrupted()) {
        throw new InterruptedIOException("thread interrupted");
    }

    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
        throw new InterruptedIOException("deadline reached");
    }
}

A timeout has been reached if hasDeadline is true and deadlineNanoTime is no later than System.nanoTime(), that is, deadlineNanoTime - System.nanoTime() <= 0. The method also throws if the current thread has been interrupted.
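The deadline test in throwIfReached() is written as a subtraction compared with zero rather than a direct comparison of the two values. A hedged sketch of why that form is useful follows; DeadlineSketch, reached, and the explicit nowNanos parameter are hypothetical, introduced so the check can be exercised without waiting on a real clock.

```java
import java.io.InterruptedIOException;

/** Hypothetical sketch of the deadline comparison; not Okio's Timeout class. */
final class DeadlineSketch {
    /**
     * True when the deadline is at or before now. Comparing the difference
     * with zero stays correct even if the nanosecond counter wraps around,
     * which a direct {@code deadline <= now} comparison would not.
     */
    static boolean reached(long deadlineNanoTime, long nowNanos) {
        return deadlineNanoTime - nowNanos <= 0;
    }

    /** Mirrors the shape of the deadline branch, with the clock passed in. */
    static void throwIfReached(boolean hasDeadline, long deadlineNanoTime, long nowNanos)
            throws InterruptedIOException {
        if (hasDeadline && reached(deadlineNanoTime, nowNanos)) {
            throw new InterruptedIOException("deadline reached");
        }
    }

    public static void main(String[] args) {
        System.out.println(reached(100, 50));  // false: deadline is still ahead
        System.out.println(reached(100, 100)); // true: deadline is now
        long now = Long.MAX_VALUE - 10;
        long deadline = now + 20;              // overflows to a negative value
        System.out.println(reached(deadline, now)); // false: still 20 ns ahead
    }
}
```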

Now look at timeout.sink(sink):

public final Sink sink(final Sink sink) {
    return new Sink() {
        @Override
        public void write(Buffer source, long byteCount) throws IOException {
            checkOffsetAndCount(source.size, 0, byteCount);

            while (byteCount > 0L) {
                // Count how many bytes to write. This loop guarantees we split on a segment boundary.
                long toWrite = 0L;
                for (Segment s = source.head; toWrite < TIMEOUT_WRITE_SIZE; s = s.next) {
                    int segmentSize = s.limit - s.pos;
                    toWrite += segmentSize;
                    if (toWrite >= byteCount) {
                        toWrite = byteCount;
                        break;
                    }
                }

                // Emit one write. Only this section is subject to the timeout.
                boolean throwOnTimeout = false;
                enter();
                try {
                    sink.write(source, toWrite);
                    byteCount -= toWrite;
                    throwOnTimeout = true;
                } catch (IOException e) {
                    throw exit(e);
                } finally {
                    exit(throwOnTimeout);
                }
            }
        }

        @Override
        public void flush() throws IOException {
            boolean throwOnTimeout = false;
            enter();
            try {
                sink.flush();
                throwOnTimeout = true;
            } catch (IOException e) {
                throw exit(e);
            } finally {
                exit(throwOnTimeout);
            }
        }

        @Override
        public void close() throws IOException {
            boolean throwOnTimeout = false;
            enter();
            try {
                sink.close();
                throwOnTimeout = true;
            } catch (IOException e) {
                throw exit(e);
            } finally {
                exit(throwOnTimeout);
            }
        }

        @Override
        public Timeout timeout() {
            return AsyncTimeout.this;
        }

        @Override
        public String toString() {
            return "AsyncTimeout.sink(" + sink + ")";
        }
    };
}

You can see that timeout.sink(sink) wraps the sink again so that each of its methods calls enter() before delegating and exit() afterwards.

public final void enter() {
    if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
    long timeoutNanos = timeoutNanos();
    boolean hasDeadline = hasDeadline();
    if (timeoutNanos == 0 && !hasDeadline) {
        return; // No timeout and no deadline? Don't bother with the queue.
    }
    inQueue = true;
    scheduleTimeout(this, timeoutNanos, hasDeadline);
}

Here you can see that scheduleTimeout() runs only when a timeout or a deadline has been set. By default neither is set, so enter() returns early.

Take a look at SocketTimeoutTest

@Test
public void readWithoutTimeout() throws Exception {
    Socket socket = socket(ONE_MB, 0);
    BufferedSource source = Okio.buffer(Okio.source(socket));
    source.timeout().timeout(5000, TimeUnit.MILLISECONDS);
    source.require(ONE_MB);
    socket.close();
}

The call source.timeout().timeout(5000, TimeUnit.MILLISECONDS) is implemented as:

public Timeout timeout(long timeout, TimeUnit unit) {
    if (timeout < 0) throw new IllegalArgumentException("timeout < 0: " + timeout);
    if (unit == null) throw new IllegalArgumentException("unit == null");
    this.timeoutNanos = unit.toNanos(timeout);
    return this;
}

You can see that timeoutNanos is assigned here, so the condition in enter() is now met and scheduleTimeout(this, timeoutNanos, hasDeadline) runs:

private static synchronized void scheduleTimeout(
        AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
    // Start the watchdog thread and create the head node when the first timeout is scheduled.
    if (head == null) {
        head = new AsyncTimeout();
        new Watchdog().start();
    }
    ...
    // Insert the node in sorted order.
    long remainingNanos = node.remainingNanos(now);
    for (AsyncTimeout prev = head; true; prev = prev.next) {
        if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
            node.next = prev.next;
            prev.next = node;
            if (prev == head) {
                AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
            }
            break;
        }
    }
}

The method is synchronized and starts a Watchdog thread when the first timeout is scheduled. It then inserts the AsyncTimeout into a queue sorted by remaining time.
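The sorted insertion can be looked at on its own. The sketch below is a simplified illustration under stated assumptions: TimeoutQueueSketch, Node, insert, and order are hypothetical names, remaining time is a fixed number rather than being computed from a clock, and there is no watchdog thread. The boolean returned by insert marks the case in which the code above would call notify().

```java
/** Hypothetical sketch of a timeout queue kept sorted by remaining time. */
final class TimeoutQueueSketch {
    static final class Node {
        final String name;
        final long remainingNanos;
        Node next;

        Node(String name, long remainingNanos) {
            this.name = name;
            this.remainingNanos = remainingNanos;
        }
    }

    // Sentinel head node: it never times out and only anchors the list.
    private final Node head = new Node("head", 0);

    /**
     * Inserts the node so the list stays ordered by remaining time.
     * Returns true if the node became the first entry, which is the
     * situation where a waiting watchdog would need to be woken up.
     */
    synchronized boolean insert(Node node) {
        for (Node prev = head; true; prev = prev.next) {
            if (prev.next == null || node.remainingNanos < prev.next.remainingNanos) {
                node.next = prev.next;
                prev.next = node;
                return prev == head;
            }
        }
    }

    /** Lists node names in queue order, comma separated. */
    synchronized String order() {
        StringBuilder sb = new StringBuilder();
        for (Node n = head.next; n != null; n = n.next) {
            if (sb.length() > 0) sb.append(',');
            sb.append(n.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        TimeoutQueueSketch queue = new TimeoutQueueSketch();
        queue.insert(new Node("b", 500));
        queue.insert(new Node("a", 100));
        queue.insert(new Node("c", 900));
        System.out.println(queue.order()); // a,b,c
    }
}
```

Keeping the queue sorted means a single watchdog only ever has to wait for the first entry, however many timeouts are pending.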

private static final class Watchdog extends Thread {
    Watchdog() {
        super("Okio Watchdog");
        setDaemon(true);
    }

    public void run() {
        while (true) {
            try {
                AsyncTimeout timedOut;
                synchronized (AsyncTimeout.class) {
                    timedOut = awaitTimeout();

                    // Didn't find a node to interrupt. Try again.
                    if (timedOut == null) continue;

                    // The queue is completely empty. Let this thread exit and let another watchdog thread
                    // get created on the next call to scheduleTimeout().
                    if (timedOut == head) {
                        head = null;
                        return;
                    }
                }

                // Close the timed out node.
                timedOut.timedOut();
            } catch (InterruptedException ignored) {
            }
        }
    }
}

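The Watchdog thread loops forever: holding the AsyncTimeout.class lock, it calls awaitTimeout() to obtain the next timed-out node, then calls that node's timedOut() outside the lock. If awaitTimeout() returns head, the queue has stayed empty and the thread exits.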

static @Nullable
AsyncTimeout awaitTimeout() throws InterruptedException {
    // Get the next eligible node.
    AsyncTimeout node = head.next;

    // The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
    if (node == null) {
        long startNanos = System.nanoTime();
        AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
        return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
                ? head  // The idle timeout elapsed.
                : null; // The situation has changed.
    }

    long waitNanos = node.remainingNanos(System.nanoTime());

    // The head of the queue hasn't timed out yet. Await that.
    if (waitNanos > 0) {
        // Waiting is made complicated by the fact that we work in nanoseconds,
        // but the API wants (millis, nanos) in two arguments.
        long waitMillis = waitNanos / 1000000L;
        waitNanos -= (waitMillis * 1000000L);
        AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
        return null;
    }

    // The head of the queue has timed out. Remove it.
    head.next = node.next;
    node.next = null;
    return node;
}

awaitTimeout() blocks in AsyncTimeout.class.wait(waitMillis, (int) waitNanos) until the first node in the queue is due.

Note that a wait(timeout) can be woken early by AsyncTimeout.class.notify(), which scheduleTimeout() calls when a node is inserted at the front of the queue.

If the task queue is empty, awaitTimeout() calls AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS) to wait for up to one minute, then checks whether any new task has arrived.
