Retrofit, OkHttp, Okio: a source code study of the "three axes" of Square's Android network layer
Based on Okio version 1.13.0 (Okio GitHub repository)
Introduction
Okio is a library designed to replace the complex APIs of java.io and java.nio, making data access, storage, and processing easier.
Okio is the cornerstone of OkHttp, and OkHttp is the cornerstone of Retrofit. Together the three libraries are known as the "three axes" of Square's Android network layer.
Usage
See OkioTest:
...
@Test public void readWriteFile() throws Exception {
  File file = temporaryFolder.newFile();

  BufferedSink sink = Okio.buffer(Okio.sink(file));
  sink.writeUtf8("Hello, java.io file!");
  sink.close();
  assertTrue(file.exists());
  assertEquals(20, file.length());

  BufferedSource source = Okio.buffer(Okio.source(file));
  assertEquals("Hello, java.io file!", source.readUtf8());
  source.close();
}
...
According to OkioTest, Okio mainly includes two types of operations: “read” and “write”. The objects that can be operated on are:
1. File
2. Path (the file path description class)
3. Socket
4. OutputStream
5. InputStream
Okio writes to an object through sink(XXX) and reads from an object through source(XXX).
Overview of Okio read and write framework
Okio.buffer() gets the BufferedSource and BufferedSink for reading and writing
BufferedSink sink = Okio.buffer(Okio.sink(file));
BufferedSource source = Okio.buffer(Okio.source(file));
Take a closer look at the buffer() method
public static BufferedSource buffer(Source source) {
  return new RealBufferedSource(source);
}

public static BufferedSink buffer(Sink sink) {
  return new RealBufferedSink(sink);
}
RealBufferedSink
Look at the RealBufferedSink class
final class RealBufferedSink implements BufferedSink {
  public final Buffer buffer = new Buffer();
  public final Sink sink;
  boolean closed;

  RealBufferedSink(Sink sink) {
    if (sink == null) throw new NullPointerException("sink == null");
    this.sink = sink;
  }
  ……
}
RealBufferedSink implements the BufferedSink interface, and BufferedSink extends the Sink interface.
Sink in turn extends the Closeable and Flushable interfaces.
1. The Flushable interface defines only one method, flush(), which writes data from the cache to the underlying output stream.
2. The Closeable interface defines close(), which closes the stream.
3. The Sink interface defines write(Buffer source, long byteCount) and timeout(), for writing data and for accessing the timeout.
4. The BufferedSink interface defines a set of writeXXX(...) methods for writing different types of data.
Look at the member variables of RealBufferedSink:
public final Buffer buffer = new Buffer();
public final Sink sink;
boolean closed;
There is a Buffer object, a Sink object passed in from the constructor, and a Boolean flag to record whether the stream is closed or not.
Most of RealBufferedSink's writeXXX(...) methods look like this:
@Override public BufferedSink writeXXX(...) throws IOException {
  if (closed) throw new IllegalStateException("closed");
  buffer.writeXXX(...);
  return emitCompleteSegments();
}
As you can see, the actual writing is delegated to the buffer object. The emitXXX() and flush() methods then call sink.write(buffer, byteCount) and sink.flush() to push the buffered data to the underlying sink.
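The delegation pattern described above can be sketched without Okio itself. This is a simplified analogue, not Okio's implementation: MiniSink, CollectingSink, MiniBufferedSink, and the tiny SEGMENT_SIZE of 8 are hypothetical names and values chosen for illustration, a ByteArrayOutputStream stands in for Okio's Buffer, and checked exceptions are left out.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for Okio's Sink (checked exceptions omitted for brevity). */
interface MiniSink {
  void write(byte[] data, int offset, int byteCount);
  void flush();
}

/** Hypothetical sink that simply records everything written to it. */
final class CollectingSink implements MiniSink {
  final ByteArrayOutputStream received = new ByteArrayOutputStream();
  int flushCount;

  @Override public void write(byte[] data, int offset, int byteCount) {
    received.write(data, offset, byteCount);
  }

  @Override public void flush() {
    flushCount++;
  }
}

/** Simplified analogue of RealBufferedSink: every write lands in a buffer first. */
final class MiniBufferedSink {
  static final int SEGMENT_SIZE = 8; // tiny on purpose; Okio segments are much larger

  private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  private final MiniSink sink;

  MiniBufferedSink(MiniSink sink) {
    if (sink == null) throw new NullPointerException("sink == null");
    this.sink = sink;
  }

  MiniBufferedSink writeUtf8(String string) {
    byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
    buffer.write(bytes, 0, bytes.length); // step 1: the buffer does the actual writing
    return emitCompleteSegments();        // step 2: full segments move on to the sink
  }

  /** Forwards whole segments only; a partially filled tail stays buffered. */
  MiniBufferedSink emitCompleteSegments() {
    byte[] pending = buffer.toByteArray();
    int complete = pending.length / SEGMENT_SIZE * SEGMENT_SIZE;
    if (complete > 0) {
      sink.write(pending, 0, complete);
      buffer.reset();
      buffer.write(pending, complete, pending.length - complete);
    }
    return this;
  }

  /** Pushes everything that is still buffered, then flushes the sink. */
  void flush() {
    byte[] pending = buffer.toByteArray();
    if (pending.length > 0) sink.write(pending, 0, pending.length);
    buffer.reset();
    sink.flush();
  }
}
```

Writing the 11-byte string "Hello, Okio" through this sketch forwards one full 8-byte segment immediately and keeps the remaining 3 bytes buffered until flush() is called.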
RealBufferedSource
RealBufferedSource is similar to RealBufferedSink
final class RealBufferedSource implements BufferedSource {
  public final Buffer buffer = new Buffer();
  public final Source source;
  boolean closed;

  RealBufferedSource(Source source) {
    if (source == null) throw new NullPointerException("source == null");
    this.source = source;
  }
}
RealBufferedSource implements the BufferedSource interface, and BufferedSource extends the Source interface.
The Source interface in turn extends Closeable.
1. Source extends the Closeable interface, so a Source provides a close() method to close the stream being read.
2. Source defines read(Buffer sink, long byteCount) to read data and timeout() to access the read timeout.
3. BufferedSource defines many readXXX(...) methods for reading different types of data.
The readXXX(...) methods of RealBufferedSource work like the writeXXX(...) methods of RealBufferedSink: the member variable buffer is used together with the Source object passed to the constructor to read the data.
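To summarize the structure of the whole read-write framework: Okio.sink(...) and Okio.source(...) produce a raw Sink or Source, and Okio.buffer(...) wraps it in a RealBufferedSink or RealBufferedSource that holds a Buffer and delegates to the wrapped object.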
Uniform processing of all read and write objects
Whether the target is a File, Path, InputStream, OutputStream, or Socket, a simple Okio.sink(...) or Okio.source(...) call wrapped in Okio.buffer(...) yields the corresponding output stream (RealBufferedSink) or input stream (RealBufferedSource).
Okio also gives both input and output streams an additional parameter, Timeout, which holds the timeout settings for reads and writes.
All sink(...) overloads end up calling:
private static Sink sink(final OutputStream out, final Timeout timeout) {
  if (out == null) throw new IllegalArgumentException("out == null");
  if (timeout == null) throw new IllegalArgumentException("timeout == null");

  return new Sink() {
    @Override public void write(Buffer source, long byteCount) throws IOException {
      checkOffsetAndCount(source.size, 0, byteCount);
      while (byteCount > 0) {
        timeout.throwIfReached();
        Segment head = source.head;
        int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
        out.write(head.data, head.pos, toCopy);

        head.pos += toCopy;
        byteCount -= toCopy;
        source.size -= toCopy;

        if (head.pos == head.limit) {
          source.head = head.pop();
          SegmentPool.recycle(head);
        }
      }
    }

    @Override public void flush() throws IOException {
      out.flush();
    }

    @Override public void close() throws IOException {
      out.close();
    }

    @Override public Timeout timeout() {
      return timeout;
    }

    @Override public String toString() {
      return "sink(" + out + ")";
    }
  };
}
Okio.sink() creates an instance of an anonymous inner class that implements the Sink interface. Its write method writes data to an OutputStream (files, paths, and sockets are all converted to an OutputStream), and the timeout is checked each time data is written. (The timeout mechanism is covered later.)
Okio.source() similarly creates an instance of an anonymous inner class that implements the Source interface. Its read method reads data from an InputStream.
Okio uses Segment objects whenever it reads or writes data. A Segment is a node in a linked-list structure defined by Okio. Each Segment can hold up to 8 KB of data.
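To make the linked-list idea concrete, here is a minimal sketch of a circular, doubly linked ring of segments. MiniSegment and its single() factory are hypothetical names; push() and pop() are modelled on the tail.push(...) and head.pop() calls visible in the Okio excerpts in this article, but the bodies are an illustration rather than a copy of Okio's Segment class, and the segment pool and owner flag are left out.

```java
/** Hypothetical, simplified analogue of an Okio Segment. */
final class MiniSegment {
  static final int SIZE = 8192; // capacity of one segment in bytes

  final byte[] data = new byte[SIZE];
  int pos;   // index of the next byte to read
  int limit; // index of the next byte to write
  MiniSegment next;
  MiniSegment prev;

  /** Creates a ring that contains only this one segment. */
  static MiniSegment single() {
    MiniSegment segment = new MiniSegment();
    segment.next = segment;
    segment.prev = segment;
    return segment;
  }

  /** Links the given segment into the ring directly after this one and returns it. */
  MiniSegment push(MiniSegment segment) {
    segment.prev = this;
    segment.next = next;
    next.prev = segment;
    next = segment;
    return segment;
  }

  /** Unlinks this segment; returns its successor, or null if the ring is now empty. */
  MiniSegment pop() {
    MiniSegment result = next != this ? next : null;
    prev.next = next;
    next.prev = prev;
    next = null;
    prev = null;
    return result;
  }
}
```

Because the ring is circular, the tail is always reachable as head.prev, which is how the excerpts in this article find the segment to append to without walking the list.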
The universal Buffer
Okio writes data to buffer first
BufferedSink sink = Okio.buffer(Okio.sink(file));
sink.writeUtf8("Hello, java.io file!");
sink.close();
Okio.buffer() returns a RealBufferedSink, whose writeUtf8 looks like this:
@Override public BufferedSink writeUtf8(String string) throws IOException {
  if (closed) throw new IllegalStateException("closed");
  buffer.writeUtf8(string);
  return emitCompleteSegments();
}
Now look at Buffer.writeUtf8:
@Override public Buffer writeUtf8(String string) {
  return writeUtf8(string, 0, string.length());
}
This overload then encodes the String into the Segment list:
@Override public Buffer writeUtf8(String string, int beginIndex, int endIndex) {
  ......
  // Transcode a UTF-16 Java String to UTF-8 bytes.
  for (int i = beginIndex; i < endIndex;) {
    int c = string.charAt(i);

    if (c < 0x80) {
      Segment tail = writableSegment(1);
      byte[] data = tail.data;
      ...
      while (i < runLimit) {
        c = string.charAt(i);
        if (c >= 0x80) break;
        data[segmentOffset + i++] = (byte) c; // 0xxxxxxx
      }
      ...
    } else if (c < 0x800) {
      // Emit a 11-bit character with 2 bytes.
      writeByte(c >> 6 | 0xc0); // 110xxxxx
      writeByte(c & 0x3f | 0x80); // 10xxxxxx
      i++;
    }
    ...
  }
  return this;
}
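The transcoding loop above is easier to follow in a self-contained form. The sketch below is not Okio's code: Utf8Sketch is a hypothetical class that writes into a ByteArrayOutputStream instead of segments, and the three-byte and surrogate-pair branches (elided in the excerpt) are filled in from the standard UTF-8 encoding rules, with an unpaired surrogate replaced by '?' as an assumption.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical stand-alone UTF-16 to UTF-8 transcoder, for illustration only. */
final class Utf8Sketch {
  static byte[] encode(String string) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (int i = 0; i < string.length();) {
      int c = string.charAt(i);

      if (c < 0x80) {
        out.write(c); // 0xxxxxxx
        i++;
      } else if (c < 0x800) {
        out.write(c >> 6 | 0xc0);   // 110xxxxx
        out.write(c & 0x3f | 0x80); // 10xxxxxx
        i++;
      } else if (c < 0xd800 || c > 0xdfff) {
        out.write(c >> 12 | 0xe0);       // 1110xxxx
        out.write(c >> 6 & 0x3f | 0x80); // 10xxxxxx
        out.write(c & 0x3f | 0x80);      // 10xxxxxx
        i++;
      } else {
        // c is a surrogate; a valid pair is a high surrogate followed by a low surrogate.
        int low = i + 1 < string.length() ? string.charAt(i + 1) : 0;
        if (c > 0xdbff || low < 0xdc00 || low > 0xdfff) {
          out.write('?'); // assumption: replace an unpaired surrogate
          i++;
          continue;
        }
        int codePoint = 0x10000 + ((c & 0x3ff) << 10 | (low & 0x3ff));
        out.write(codePoint >> 18 | 0xf0);        // 11110xxx
        out.write(codePoint >> 12 & 0x3f | 0x80); // 10xxxxxx
        out.write(codePoint >> 6 & 0x3f | 0x80);  // 10xxxxxx
        out.write(codePoint & 0x3f | 0x80);       // 10xxxxxx
        i += 2;
      }
    }
    return out.toByteArray();
  }
}
```

For well-formed input the result should match string.getBytes(StandardCharsets.UTF_8), which is a convenient way to check the sketch.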
writableSegment decides whether a new Segment has to be appended to the tail of the queue:
Segment writableSegment(int minimumCapacity) {
  if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();

  if (head == null) {
    head = SegmentPool.take(); // Acquire a first segment.
    return head.next = head.prev = head;
  }

  Segment tail = head.prev;
  if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
    tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
  }
  return tail;
}
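The capacity check in writableSegment can be tried out in isolation. The following sketch is a simplification under stated assumptions: TailAllocationSketch and Seg are hypothetical names, an ArrayList replaces the circular segment ring, and the segment pool and the owner flag are ignored, so only the "does the tail still have room" rule is modelled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the tail-capacity rule; not Okio's Buffer. */
final class TailAllocationSketch {
  static final int SIZE = 8192; // capacity of one segment in bytes

  static final class Seg {
    final byte[] data = new byte[SIZE];
    int limit; // number of bytes already written into this segment
  }

  final List<Seg> segments = new ArrayList<>();

  /** Returns a tail segment with room for at least minimumCapacity more bytes. */
  Seg writableSegment(int minimumCapacity) {
    if (minimumCapacity < 1 || minimumCapacity > SIZE) throw new IllegalArgumentException();
    if (segments.isEmpty()
        || segments.get(segments.size() - 1).limit + minimumCapacity > SIZE) {
      segments.add(new Seg()); // the tail cannot hold the request, so append a fresh segment
    }
    return segments.get(segments.size() - 1);
  }

  /** Copies source into the buffer, spilling into new segments when the tail fills up. */
  void write(byte[] source) {
    int offset = 0;
    while (offset < source.length) {
      Seg tail = writableSegment(1);
      int toCopy = Math.min(source.length - offset, SIZE - tail.limit);
      System.arraycopy(source, offset, tail.data, tail.limit, toCopy);
      tail.limit += toCopy;
      offset += toCopy;
    }
  }
}
```

Writing exactly 8192 bytes fills one segment; one more byte forces a second segment, and a later request for 8192 contiguous bytes forces a third even though the second is almost empty.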
Now look at emitCompleteSegments():
@Override
public BufferedSink emitCompleteSegments() throws IOException {
  if (closed) throw new IllegalStateException("closed");
  long byteCount = buffer.completeSegmentByteCount();
  if (byteCount > 0) sink.write(buffer, byteCount);
  return this;
}
buffer.completeSegmentByteCount() calculates the number of bytes held in the completed Segments of the buffer:
public long completeSegmentByteCount() {
  long result = size;
  if (result == 0) return 0;

  // Omit the tail if it's still writable.
  Segment tail = head.prev;
  if (tail.limit < Segment.SIZE && tail.owner) {
    result -= tail.limit - tail.pos;
  }

  return result;
}
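A short worked example shows what this count means in practice. The sketch below is a hypothetical reduction (CompleteBytesSketch is not part of Okio): each segment is represented only by the number of bytes it holds, every pos is assumed to be 0, and the tail is assumed to be owned by the buffer.

```java
/** Hypothetical arithmetic-only model of completeSegmentByteCount(). */
final class CompleteBytesSketch {
  static final int SEGMENT_SIZE = 8192;

  /** fills[i] is the number of bytes in segment i, head first. */
  static long completeSegmentByteCount(int[] fills) {
    long result = 0;
    for (int fill : fills) result += fill;
    if (result == 0) return 0;

    // Omit the tail if it is still writable, i.e. not yet full.
    int tail = fills[fills.length - 1];
    if (tail < SEGMENT_SIZE) result -= tail;
    return result;
  }
}
```

For the 20-byte "Hello, java.io file!" example the buffer holds a single, partly filled segment, so the count is 0 and emitCompleteSegments() writes nothing at that point; the bytes presumably stay buffered until the sink is flushed or closed. With one full segment followed by a 20-byte tail, the count is 8192.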
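sink.write(buffer, byteCount) invokes the anonymous Sink implementation that was passed in earlier.
To summarize the write path: writeUtf8 encodes the String into Segments held by the Buffer, and emitCompleteSegments() hands the completed Segments to the underlying Sink, which writes them to the OutputStream.
Buffer plays a similar role when reading data.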
Okio timeout mechanism
Okio can attach a timeout setting to its OutputStream and InputStream wrappers. A default Timeout is used when reading or writing files; with no deadline configured, it never expires.
Okio also shows how to set up an asynchronous timeout mechanism when reading or writing a Socket: if a Socket read or write times out, the Socket is closed.
public static Sink sink(Socket socket) throws IOException {
  if (socket == null) throw new IllegalArgumentException("socket == null");
  AsyncTimeout timeout = timeout(socket);
  Sink sink = sink(socket.getOutputStream(), timeout);
  return timeout.sink(sink);
}
Look at timeout(socket):
private static AsyncTimeout timeout(final Socket socket) {
  return new AsyncTimeout() {
    @Override protected IOException newTimeoutException(@Nullable IOException cause) {
      ......
    }

    @Override protected void timedOut() {
      try {
        socket.close();
      } ...
    }
  };
}
As you can see, an anonymous AsyncTimeout object is returned, whose main job is to close the Socket in timedOut().
The sink(socket.getOutputStream(), timeout) method has already been shown above:
private static Sink sink(final OutputStream out, final Timeout timeout) {
  ......
  return new Sink() {
    @Override public void write(Buffer source, long byteCount) throws IOException {
      ......
      while (byteCount > 0) {
        timeout.throwIfReached();
        ...
      }
    }
    ...
  };
}
Take a look at the throwIfReached method:
public void throwIfReached() throws IOException {
  if (Thread.interrupted()) {
    throw new InterruptedIOException("thread interrupted");
  }

  if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
    throw new InterruptedIOException("deadline reached");
  }
}
The deadline counts as reached when hasDeadline is true and deadlineNanoTime is no later than System.nanoTime(). The method also throws if the current thread has been interrupted.
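The deadline test above is written as a subtraction compared with zero rather than as a direct comparison of the two timestamps. A likely reason is that System.nanoTime() values may wrap around, and the difference of two such values stays meaningful even then. The sketch below illustrates this; DeadlineSketch, reached, and naiveReached are hypothetical names, and the current time is passed in as a parameter so the behaviour can be checked deterministically.

```java
/** Hypothetical, side-effect-free version of the deadline check. */
final class DeadlineSketch {
  /** Same shape as the check in throwIfReached(): compare the difference with zero. */
  static boolean reached(boolean hasDeadline, long deadlineNanoTime, long nowNanos) {
    return hasDeadline && deadlineNanoTime - nowNanos <= 0;
  }

  /** A direct comparison, shown only to contrast its behaviour across a wrap-around. */
  static boolean naiveReached(long deadlineNanoTime, long nowNanos) {
    return deadlineNanoTime <= nowNanos;
  }
}
```

With now = Long.MAX_VALUE - 10 and a deadline 20 ns later, the deadline value overflows to a negative number. The subtraction still yields 20, so reached(...) correctly reports that the deadline has not passed, while the direct comparison would report a timeout.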
Now look at timeout.sink(sink):
public final Sink sink(final Sink sink) {
  return new Sink() {
    @Override public void write(Buffer source, long byteCount) throws IOException {
      checkOffsetAndCount(source.size, 0, byteCount);

      while (byteCount > 0L) {
        // Count how many bytes to write. This loop guarantees we split on a segment boundary.
        long toWrite = 0L;
        for (Segment s = source.head; toWrite < TIMEOUT_WRITE_SIZE; s = s.next) {
          int segmentSize = s.limit - s.pos;
          toWrite += segmentSize;
          if (toWrite >= byteCount) {
            toWrite = byteCount;
            break;
          }
        }

        // Emit one write. Only this section is subject to the timeout.
        boolean throwOnTimeout = false;
        enter();
        try {
          sink.write(source, toWrite);
          byteCount -= toWrite;
          throwOnTimeout = true;
        } catch (IOException e) {
          throw exit(e);
        } finally {
          exit(throwOnTimeout);
        }
      }
    }

    @Override public void flush() throws IOException {
      boolean throwOnTimeout = false;
      enter();
      try {
        sink.flush();
        throwOnTimeout = true;
      } catch (IOException e) {
        throw exit(e);
      } finally {
        exit(throwOnTimeout);
      }
    }

    @Override public void close() throws IOException {
      boolean throwOnTimeout = false;
      enter();
      try {
        sink.close();
        throwOnTimeout = true;
      } catch (IOException e) {
        throw exit(e);
      } finally {
        exit(throwOnTimeout);
      }
    }

    @Override public Timeout timeout() {
      return AsyncTimeout.this;
    }

    @Override public String toString() {
      return "AsyncTimeout.sink(" + sink + ")";
    }
  };
}
You can see that timeout.sink(sink) wraps the sink again so that write, flush, and close are each bracketed by a call to enter() before the operation and exit() after it.
public final void enter() {
  if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
  long timeoutNanos = timeoutNanos();
  boolean hasDeadline = hasDeadline();
  if (timeoutNanos == 0 && !hasDeadline) {
    return; // No timeout and no deadline? Don't bother with the queue.
  }
  inQueue = true;
  scheduleTimeout(this, timeoutNanos, hasDeadline);
}
Here you can see that scheduleTimeout is executed only if a timeout or a deadline has been set. By default neither is set, so enter() returns immediately.
Take a look at SocketTimeoutTest:
@Test
public void readWithoutTimeout() throws Exception {
  Socket socket = socket(ONE_MB, 0);
  BufferedSource source = Okio.buffer(Okio.source(socket));
  source.timeout().timeout(5000, TimeUnit.MILLISECONDS);
  source.require(ONE_MB);
  socket.close();
}
The call source.timeout().timeout(5000, TimeUnit.MILLISECONDS) is implemented as follows:
public Timeout timeout(long timeout, TimeUnit unit) {
  if (timeout < 0) throw new IllegalArgumentException("timeout < 0: " + timeout);
  if (unit == null) throw new IllegalArgumentException("unit == null");
  this.timeoutNanos = unit.toNanos(timeout);
  return this;
}
You can see that timeoutNanos is assigned here, so enter() now goes on to call scheduleTimeout(this, timeoutNanos, hasDeadline):
private static synchronized void scheduleTimeout(
    AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
  // Start the watchdog thread and create the head node when the first timeout is scheduled.
  if (head == null) {
    head = new AsyncTimeout();
    new Watchdog().start();
  }
  ...
  // Insert the node in sorted order.
  long remainingNanos = node.remainingNanos(now);
  for (AsyncTimeout prev = head; true; prev = prev.next) {
    if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
      node.next = prev.next;
      prev.next = node;
      if (prev == head) {
        AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
      }
      break;
    }
  }
}
The method is synchronized, and it starts a Watchdog thread when the first timeout is scheduled. It then inserts the AsyncTimeout into a task queue that is kept sorted by remaining time.
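The sorted insertion described above can be sketched on its own, without threads or locking. TimeoutQueueSketch, Node, insert, and order are hypothetical names; the loop has the same shape as the one in scheduleTimeout, and the boolean return value marks the case in which Okio would notify the watchdog because the new node became the first in the queue.

```java
/** Hypothetical single-threaded model of the sorted timeout queue. */
final class TimeoutQueueSketch {
  static final class Node {
    final String name;
    final long remainingNanos;
    Node next;

    Node(String name, long remainingNanos) {
      this.name = name;
      this.remainingNanos = remainingNanos;
    }
  }

  private final Node head = new Node("head", 0L); // sentinel; never times out itself

  /** Inserts in ascending order of remaining time; returns true if node is now first. */
  boolean insert(Node node) {
    for (Node prev = head; true; prev = prev.next) {
      if (prev.next == null || node.remainingNanos < prev.next.remainingNanos) {
        node.next = prev.next;
        prev.next = node;
        return prev == head; // the real code wakes the watchdog in this case
      }
    }
  }

  /** Lists the queued node names from soonest to latest timeout. */
  String order() {
    StringBuilder names = new StringBuilder();
    for (Node node = head.next; node != null; node = node.next) {
      if (names.length() > 0) names.append(',');
      names.append(node.name);
    }
    return names.toString();
  }
}
```

Keeping the queue sorted means the watchdog only ever has to look at the first node: if that one has not timed out, none of the others has either.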
private static final class Watchdog extends Thread {
  Watchdog() {
    super("Okio Watchdog");
    setDaemon(true);
  }

  public void run() {
    while (true) {
      try {
        AsyncTimeout timedOut;
        synchronized (AsyncTimeout.class) {
          timedOut = awaitTimeout();

          // Didn't find a node to interrupt. Try again.
          if (timedOut == null) continue;

          // The queue is completely empty. Let this thread exit and let another watchdog thread
          // get created on the next call to scheduleTimeout().
          if (timedOut == head) {
            head = null;
            return;
          }
        }

        // Close the timed out node.
        timedOut.timedOut();
      } catch (InterruptedException ignored) {
      }
    }
  }
}
The Watchdog thread loops forever, calling awaitTimeout() while holding the AsyncTimeout.class lock:
static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
  // Get the next eligible node.
  AsyncTimeout node = head.next;

  // The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
  if (node == null) {
    long startNanos = System.nanoTime();
    AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
    return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
        ? head  // The idle timeout elapsed.
        : null; // The situation has changed.
  }

  long waitNanos = node.remainingNanos(System.nanoTime());

  // The head of the queue hasn't timed out yet. Await that.
  if (waitNanos > 0) {
    // Waiting is made complicated by the fact that we work in nanoseconds,
    // but the API wants (millis, nanos) in two arguments.
    long waitMillis = waitNanos / 1000000L;
    waitNanos -= (waitMillis * 1000000L);
    AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
    return null;
  }

  // The head of the queue has timed out. Remove it.
  head.next = node.next;
  node.next = null;
  return node;
}
If the first node in the queue has not timed out yet, the thread blocks in AsyncTimeout.class.wait(waitMillis, (int) waitNanos).
Note that wait(timeout) can be woken early by AsyncTimeout.class.notify().
If the task queue is empty, the thread waits in AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS) for up to one minute, then checks whether any new tasks have arrived.