Okio深入分析—源码分析部分

1,099 阅读35分钟
原文链接: www.jianshu.com

一、前言

时间过得可真是快,距离上一次更新已经将近 2 个月了,这之间竟然没有任何的文章更新。原因呢,无非就是工作与生活变得紧张了,渐渐的就把这个给落下了。趁着 Flag 还没倒下,咱得把它扶起来不是。
Okio深入分析——基础使用部分主要了解了Okio这个库的特性,以及最基本的使用。但正如最后所说,我们还不知道它为什么好,为什么快。但是在分析之前,我们先过一遍其主体类的框架图,以便我们对其整体有一个粗略的认识。

OkioFramework.jpg

这个图看上去是不是有点简单啊,都不好意思说是一个框架。确实如此,所以说它是短小精悍呀。言归正传,我们还是来展开一下。
(1) Sink 和 Source 分别定义了写入和读取。
(2) BufferedSink 和 BufferedSource 分别定义了各种写入方法和读取方法,同时还定义了 buffer() 方法应该返回一个实际的 Buffer 用于操作写入与读取。
(3) Buffer 兼具实现了读和写的功能,Buffer 中又进一步将读写单元划分成了 Segment。各个 Segment 之间通过 prev 节点以及 next 节点构成一个双向链表。SegmentPool 是避免为了过多的分配 Segment 对象,用以循环利用 Segment。
(4) RealBufferSink 和 RealBufferSource 是唯一返回给用户的写入与读取实例。其分别由 Sink + Buffer 和 Source + Buffer 的组合模式而成,用以完成真正的写入与读取。

二、写入

案例分析

在分析源码之前,我们还是得先来个 demo 的案例分析 。这次我们的 demo 得先做一个对比,使其看起来更有意思些。
通过 Okio 的 BufferSink 进行的写入代码

private void testSink(File file) {
        if(!file.exists()) {
            try {
                file.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            BufferedSink bufferedSink = Okio.buffer(Okio.sink(file));
            StringBuilder stringBuilder = new StringBuilder();
            for(int k = 0;k < 1000;k++) {
                stringBuilder.append("Buffer Sink Test Sample");
            }
            int i = 0;
            System.out.println("Okio sink test start");
            long ts = System.currentTimeMillis();
            long nts = System.nanoTime();
            while(i++ < 100000) {
                bufferedSink.writeUtf8(stringBuilder.toString());
            }
            bufferedSink.close();
            System.out.println("Okio sink ms time " + (System.currentTimeMillis() - ts));
            System.out.println("Okio sink nano time " + (System.nanoTime() - nts));
            System.out.println("Okio sink test end");
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

通过 Java 的 BufferedWriter 进行的写入代码

private void testFileOut(File file) {
        if(!file.exists()) {
            try {
                file.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            FileWriter fileWriter = new FileWriter(file);
            BufferedWriter bufferedWriter = new BufferedWriter(fileWriter);
            StringBuilder stringBuilder = new StringBuilder();
            for(int k = 0;k < 1000;k++) {
                stringBuilder.append("Buffer Sink Test Sample");
            }
            int i = 0;
            System.out.println("Java buffered write test start");
            long ts = System.currentTimeMillis();
            long nts = System.nanoTime();
            while(i++ < 100000) {
                bufferedWriter.write(stringBuilder.toString());
            }

            bufferedWriter.close();
            System.out.println("Java buffered write ms time " + (System.currentTimeMillis() - ts));
            System.out.println("Java buffered write time " + (System.nanoTime() - nts));
            System.out.println("Java buffered write test end");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

这段程序运行之后有 2 个数据是需要我们关注的。
数据大小:通过下面的图,我们可以看到,这个写入数据大小为2.3GB

image.png
写入所花的时间: 如下 BufferSink 与 BufferedWriter 的时间对比,分别进行 10 次读写,应该能很明显的看到 BufferSink 的整体时间分布是要优于 BufferedWriter 的。
次序 BufferSink BufferedWriter
1 4637 4944
2 4228 4299
3 4088 4334
4 4383 4440
5 4467 4375
6 4201 4117
7 4385 4453
8 4149 4439
9 4266 4222
10 4363 4433

上面的结果是PC上跑出来的结果,再来看看手机上的结果。也是基本得到一样的结果。就是 OKIO 的结果基本趋于稳定下降。当然在手机上写10000次太慢了,所以改成了 5000 次。

次序 BufferSink BufferedWriter
1 3530 3139
2 3085 2943
3 2572 2137
4 2501 3192
5 2462 3108
6 3107 4081
7 2047 2377
8 2044 2131
9 2042 3557
10 2055 2099

总结下来就是,Okio 的写入并不总是比 Java 原生 I/O 快。而是在多次反复写入的情况下,Okio 则速度较快,且比较稳定。

源码分析

前面的案例分析,更加增强了我们对 Okio 的认知,接下来就是我们的重点,源码分析与理解了。按照 demo ,画出时序图,并逐层进行分解并深入。所以首先我们还是先来看看时序图。


OkioSink.jpg

构造Sink —— Okio#sink(file)

  /** Returns a sink that writes to {@code file}. */
  // 当传入的是一个文件时所调用的 Sink 构造方法
  public static Sink sink(File file) throws FileNotFoundException {
    if (file == null) throw new IllegalArgumentException("file == null");
    return sink(new FileOutputStream(file));
  }
 
  /** Returns a sink that writes to {@code out}. */
  // 当传入的是一个 OutputStream 时所调用的 Sink 构建方法
  public static Sink sink(OutputStream out) {
    return sink(out, new Timeout());
  }
  // 这个构建方法是私有的,最终的调用都会来到这里
  private static Sink sink(final OutputStream out, final Timeout timeout) {
    if (out == null) throw new IllegalArgumentException("out == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Sink() {
      @Override public void write(Buffer source, long byteCount) throws IOException {
        checkOffsetAndCount(source.size, 0, byteCount);
        while (byteCount > 0) {
          timeout.throwIfReached();
          Segment head = source.head;
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          out.write(head.data, head.pos, toCopy);

          head.pos += toCopy;
          byteCount -= toCopy;
          source.size -= toCopy;

          if (head.pos == head.limit) {
            source.head = head.pop();
            SegmentPool.recycle(head);
          }
        }
      }

      @Override public void flush() throws IOException {
        out.flush();
      }

      @Override public void close() throws IOException {
        out.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "sink(" + out + ")";
      }
    };
  }

上面这段代码中,是构建 Sink 的 3 个重载方法,是Okio 的静态方法,而且他们也是依次调用。第一个告诉了我们其底层的写入也是基于 Java 原生的 I/O ,如 FileOutputStream。第二个加入了 Timeout ,这就使其具备了超时机制。第三个方法是最为关键的,这里才真正生成了 Sink 实例。而在这个实例中,也直接定义出了数据如何写入的,如何关闭流,以及返回哪一个 Timeout 实例。
这一波调用总的来说确定了实际的 Sink 对象,以及 Sink 是如何写入的和其应该返回的超时机制。
以上是 5 个Sink 构造方法中的其中 3 个,还有另外 2 个也一起来看看吧。

  /** Returns a sink that writes to {@code path}. */
  // 通过文件路径构建 Sink 实例
  @IgnoreJRERequirement // Should only be invoked on Java 7+.
  public static Sink sink(Path path, OpenOption... options) throws IOException {
    if (path == null) throw new IllegalArgumentException("path == null");
    return sink(Files.newOutputStream(path, options));
  }
  /**
   * Returns a sink that writes to {@code socket}. Prefer this over {@link
   * #sink(OutputStream)} because this method honors timeouts. When the socket
   * write times out, the socket is asynchronously closed by a watchdog thread.
   */
  // 通过  Socket 进行构建
  public static Sink sink(Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    AsyncTimeout timeout = timeout(socket);
    Sink sink = sink(socket.getOutputStream(), timeout);
    return timeout.sink(sink);
  }

上面这 2 个 Sink 构造方法,第一个是以文件路径来进行构建,而第二个则是以 Socket 进行构建,并且对于 Socket 的构建中,其 Timeout 是 AsyncTimeout。这里说明了 2 个其本特性。第一、除了对文件进行支持,同时还支持 Socket,第二、超时机制有同步的 Timeout 以及异步的 AsyncTimeout。关于超时机制,后面还会详细来讲,这里先有个概念即可。

构造BufferedSink——Okio#buffer(sink)

/**
   * Returns a new sink that buffers writes to {@code sink}. The returned sink
   * will batch writes to {@code sink}. Use this wherever you write to a sink to
   * get an ergonomic and efficient access to data.
   */
  public static BufferedSink buffer(Sink sink) {
    return new RealBufferedSink(sink);
  }

该方法就是构建了一个 RealBufferedSink 实例。那就来看看这个类。

final class RealBufferedSink implements BufferedSink {
  public final Buffer buffer = new Buffer();
  public final Sink sink;
  boolean closed;

  RealBufferedSink(Sink sink) {
    if (sink == null) throw new NullPointerException("sink == null");
    this.sink = sink;
  }
......
}

这个类中两个重要的成员变量一个 sink ,由外部传入,一个是 Buffer 由内部直构建。这里的 sink 是前面通过 Okio.sink()方法最终构建的那个匿名类。那么再来看看 Buffer 这个类。

/**
 * A collection of bytes in memory.
 *
 * <p><strong>Moving data from one buffer to another is fast.</strong> Instead
 * of copying bytes from one place in memory to another, this class just changes
 * ownership of the underlying byte arrays.
 *
 * <p><strong>This buffer grows with your data.</strong> Just like ArrayList,
 * each buffer starts small. It consumes only the memory it needs to.
 *
 * <p><strong>This buffer pools its byte arrays.</strong> When you allocate a
 * byte array in Java, the runtime must zero-fill the requested array before
 * returning it to you. Even if you're going to write over that space anyway.
 * This class avoids zero-fill and GC churn by pooling byte arrays.
 */
public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
  private static final byte[] DIGITS =
      { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
  static final int REPLACEMENT_CHARACTER = '\ufffd';

  @Nullable Segment head;
  long size;

  public Buffer() {
  }

Buffer 类中最重要的成员变量 head 是一个 Segment 类型,Buffer 中的数据结构也正是由 Segment 所构成的一个双向链表。除此之外,关于 Buffer 类,这里的解释也极其重要。大概的意思是说:
(1) 这是一个在内存中的字节集合。
(2)将一个 Buffer 中的数据移到另一个 Buffer 中去是非常快的。原因是它并不是通过复制来实现,而是从底层字节数组去改变与 Buffer 这个类的所属关系。
(3)它就是 ArrayList 一样的,容量自动增长的。一开始是很小的,且仅分配它所需要的内存大小。
(4)Buffer池化了它的字节数组。如果直接从Java中分配一个字节数组,使用其必须为其填充 0。而 Buffer 通过池化字节数组从而避免了 0 填充和GC 抖动的情况。
上面详细的说明了 Buffer 的特性,当然也是 Okio 的特性。在后面的章节中我们还会继续分析其特性的实现细节。而关于 Buffer 就先了解到这里,再进一步看看Segment。

package okio;

import javax.annotation.Nullable;

/**
 * A segment of a buffer.
 *
 * <p>Each segment in a buffer is a circularly-linked list node referencing the following and
 * preceding segments in the buffer.
 *
 * <p>Each segment in the pool is a singly-linked list node referencing the rest of segments in the
 * pool.
 *
 * <p>The underlying byte arrays of segments may be shared between buffers and byte strings. When a
 * segment's byte array is shared the segment may not be recycled, nor may its byte data be changed.
 * The lone exception is that the owner segment is allowed to append to the segment, writing data at
 * {@code limit} and beyond. There is a single owning segment for each byte array. Positions,
 * limits, prev, and next references are not shared.
 */
final class Segment {
  /** The size of all segments in bytes. */
// 每个 segment 共 8 K字节
  static final int SIZE = 8192;
  /** Segments will be shared when doing so avoids {@code arraycopy()} of this many bytes. */
// 是否共享底层数据的阈值
  static final int SHARE_MINIMUM = 1024;
// 存放字节的数组
  final byte[] data;
  /** The next byte of application data byte to read in this segment. */
 // 下一个要读的字节的位置
  int pos;
  /** The first byte of available data ready to be written to. */
 // 写入的起始位置
  int limit;
  /** True if other segments or byte strings use the same byte array. */
 // 表明此 segment 是共享的。那什么时候情况下会被共享呢?
  boolean shared;
  /** True if this segment owns the byte array and can append to it, extending {@code limit}. */
// 表明这个段是可以追加数据的,而追回的位置是基于 limit。一般情况下都是为 true 的
  boolean owner;
  /** Next segment in a linked or circularly-linked list. */
// 双向环形链表的下一个节点
  Segment next;
  /** Previous segment in a circularly-linked list. */
// 双向环形链表的前一个节点
  Segment prev;

  Segment() {
    this.data = new byte[SIZE];
    this.owner = true;
    this.shared = false;
  }

  Segment(Segment shareFrom) {
    this(shareFrom.data, shareFrom.pos, shareFrom.limit);
    shareFrom.shared = true;
  }

  Segment(byte[] data, int pos, int limit) {
    this.data = data;
    this.pos = pos;
    this.limit = limit;
    this.owner = false;
    this.shared = true;
  }

  /**
   * Removes this segment of a circularly-linked list and returns its successor.
   * Returns null if the list is now empty.
   */
// 将自己弹出链表
  public @Nullable Segment pop() {
    ......
  }

  /**
   * Appends {@code segment} after this segment in the circularly-linked list.
   * Returns the pushed segment.
   */
// 压入到当前链表
  public Segment push(Segment segment) {
   ......
  }

  /**
   * Splits this head of a circularly-linked list into two segments. The first
   * segment contains the data in {@code [pos..pos+byteCount)}. The second
   * segment contains the data in {@code [pos+byteCount..limit)}. This can be
   * useful when moving partial segments from one buffer to another.
   *
   * <p>Returns the new head of the circularly-linked list.
   */
// 分割,把数据以 [pos,pos + byteCount] 和 [pos + byteCount,limit] 这两段来进行分割。分割出去的数据依据共享阈值决定数据是否被共享
  public Segment split(int byteCount) {
    ......
  }

  /**
   * Call this when the tail and its predecessor may both be less than half
   * full. This will copy data so that segments can be recycled.
   */
// 压缩存储,当处于尾节点和它的前一节点的数据都少于一半时,把数据进行压缩存储到一个,以便可以回收调一个 Segment。
  public void compact() {
    ......
  }
// 移动 byteCount 个字节数据到另一个段中
  /** Moves {@code byteCount} bytes from this segment to {@code sink}. */
  public void writeTo(Segment sink, int byteCount) {
    ......
  }
}

来解读一下 Segment 的特性:
(1) segment 是 Buffer 中的一个组织单元
(2) segment 在 Buffer 中以环形双向链表的数据结构来存储
(3) segment 在 SegmentPool 中以单链接表数据结构来存储
(4) segment 可以在 Buffer 和 ByteStrings 之间进行共享。对于共享的 segment 不可以回收,也不可以修改其中的数据。但例外的是,段的拥有者可以在 limit 之后追回数据。对于这个段来说,底层的字节数组是共享的,但position,limits,prev以及next这些是不共享的。
此外,注释里有对每一个属性的详细解释以及个人的理解。为了方便更容易理解,简单画了一个 Segment 的数据模型图来帮助我们。如下所示。


image.png

关于 Segment 就先了解到这里,其中的方法 split(),compact()以及 writeTo() 后面还会详细说明。接下来继续看看 SegmentPool。

package okio;

import javax.annotation.Nullable;

/**
 * A collection of unused segments, necessary to avoid GC churn and zero-fill.
 * This pool is a thread-safe static singleton.
 */
// 主要是用于收集未使用的 Segments,避免了内存抖动以及0填充。同时,这个 Pool 也是一个线程安全的静态单例类。
final class SegmentPool {
  /** The maximum number of bytes to pool. */
  // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
// 一个池子最大存储 64 KB 字节
  static final long MAX_SIZE = 64 * 1024; // 64 KiB.

  /** Singly-linked list of segments. */
// 表明其是一个单向链表
  static @Nullable Segment next;

  /** Total bytes in this pool. */
// 当前的总数据大小
  static long byteCount;

  private SegmentPool() {
  }

  static Segment take() {
    synchronized (SegmentPool.class) {
      if (next != null) {
        Segment result = next;
        next = result.next;
        result.next = null;
        byteCount -= Segment.SIZE;
        return result;
      }
    }
    return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
  }

  static void recycle(Segment segment) {
    if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
    if (segment.shared) return; // This segment cannot be recycled.
    synchronized (SegmentPool.class) {
      if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
      byteCount += Segment.SIZE;
      segment.next = next;
      segment.pos = segment.limit = 0;
      next = segment;
    }
  }
}

这个类其实挺简单的,take() 方法中就是获取一个 Segment ,如果池子中有就取一个出来,如果没有就给 new 一个。而 recycle() 方法,如果池子还没涨到 64 KB 的限制,且 Segment 不是共享的,那就把它加到链接里面去。

小结
通过对 Sink 以及 BufferedSink 的构造,可以说是对 Okio 整个使用环境的一个初始化。同时,在这一段也详细介绍了其相关的核心概念,Buffer,Segment,SegmentPool 等。有了这些基本的核心概念,我们就可以进一步分析字节的写入了。

写入数据——RealBufferedSink#writeUtf8()

  @Override public BufferedSink writeUtf8(String string) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.writeUtf8(string);
    return emitCompleteSegments();
  }

进一步调用了 Buffer 的 writeUtf8() 和 emitCompleteSegments(),那么就往下看Buffer#writeUft8() 方法。代码很长,但关键的是 2 个调用,writableSegment(1) 和 writeByte(),其他的都是 utf-8 编码相关。

  @Override public Buffer writeUtf8(String string) {
    return writeUtf8(string, 0, string.length());
  }

  @Override public Buffer writeUtf8(String string, int beginIndex, int endIndex) {
    ......

    // Transcode a UTF-16 Java String to UTF-8 bytes.
    for (int i = beginIndex; i < endIndex;) {
      int c = string.charAt(i);
      if (c < 0x80) {
        Segment tail = writableSegment(1);
        byte[] data = tail.data;
        int segmentOffset = tail.limit - i;
        int runLimit = Math.min(endIndex, Segment.SIZE - segmentOffset);

        // Emit a 7-bit character with 1 byte.
        data[segmentOffset + i++] = (byte) c; // 0xxxxxxx

        // Fast-path contiguous runs of ASCII characters. This is ugly, but yields a ~4x performance
        // improvement over independent calls to writeByte().
        while (i < runLimit) {
          c = string.charAt(i);
          if (c >= 0x80) break;
          data[segmentOffset + i++] = (byte) c; // 0xxxxxxx
        }

        int runSize = i + segmentOffset - tail.limit; // Equivalent to i - (previous i).
        tail.limit += runSize;
        size += runSize;

      } else if (c < 0x800) {
        // Emit a 11-bit character with 2 bytes.
        writeByte(c >>  6        | 0xc0); // 110xxxxx
        writeByte(c       & 0x3f | 0x80); // 10xxxxxx
        i++;

      } else if (c < 0xd800 || c > 0xdfff) {
        // Emit a 16-bit character with 3 bytes.
        writeByte(c >> 12        | 0xe0); // 1110xxxx
        writeByte(c >>  6 & 0x3f | 0x80); // 10xxxxxx
        writeByte(c       & 0x3f | 0x80); // 10xxxxxx
        i++;

      } else {
        // c is a surrogate. Make sure it is a high surrogate & that its successor is a low
        // surrogate. If not, the UTF-16 is invalid, in which case we emit a replacement character.
        int low = i + 1 < endIndex ? string.charAt(i + 1) : 0;
        if (c > 0xdbff || low < 0xdc00 || low > 0xdfff) {
          writeByte('?');
          i++;
          continue;
        }

        // UTF-16 high surrogate: 110110xxxxxxxxxx (10 bits)
        // UTF-16 low surrogate:  110111yyyyyyyyyy (10 bits)
        // Unicode code point:    00010000000000000000 + xxxxxxxxxxyyyyyyyyyy (21 bits)
        int codePoint = 0x010000 + ((c & ~0xd800) << 10 | low & ~0xdc00);

        // Emit a 21-bit character with 4 bytes.
        writeByte(codePoint >> 18        | 0xf0); // 11110xxx
        writeByte(codePoint >> 12 & 0x3f | 0x80); // 10xxxxxx
        writeByte(codePoint >>  6 & 0x3f | 0x80); // 10xxyyyy
        writeByte(codePoint       & 0x3f | 0x80); // 10yyyyyy
        i += 2;
      }
    }

    return this;
  }

Buffer有两个重载版本,第一个调用了第二个,主要就是确定了起始位置为0,而写入的数据为全部数据。而第二个是具体的写入,这段代码比较长,其实不应该贴出来的,但我看到它对 Utf8 的编码十分的优秀,所以还是贴出来,有时间的可以仔细读一读。当然,在这里我们只关注其中最关键的几句即可。 如前面所说,这里面有两个进一步调用的方法分别是 writableSegment(1) 和 writeByte(),而 writeByte() 里面也有对 writableSegment() 的调用。那我们先来看看 writableSegment() 方法的实现。

/**
   * Returns a tail segment that we can write at least {@code minimumCapacity}
   * bytes to, creating it if necessary.
   */
  Segment writableSegment(int minimumCapacity) {
    ......
    if (head == null) {
      head = SegmentPool.take(); // Acquire a first segment.
      return head.next = head.prev = head;
    }
    Segment tail = head.prev;
    if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
      tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
    }
    return tail;
  }

这个方法的主要含义就是如果head为空,则说明当前的链接还是空的,那么就新建一个节点,此时头尾相等。如果不为空就取出尾节点,即头节点的前向节点。而尾节点的空间不能再装下当前要写入的字节数时,就会创建一个新的节点,并把新建的节点作为尾节点,然后继续写入。再来看看 writeByte()。

  @Override public Buffer writeByte(int b) {
    Segment tail = writableSegment(1);
    tail.data[tail.limit++] = (byte) b;
    size += 1;
    return this;
  }

这里就是把数据写入到 Segment 的 data[] 中。对于一次 writeUtf8() 的调用来看,目前为止是把数据全部都写入到了 Segment 中,同时也是内存中,还没有发生实际的 I/O 的。接下来继续看 RealBufferedSink#emitCompleteSegments()方法。

  @Override public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.completeSegmentByteCount();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
  }

这里先是调用了 Buffer#completeSegmentByteCount()确定需要从 buffer 中实际取多少字节写入到实际的输出流中。

  /**
   * Returns the number of bytes in segments that are not writable. This is the
   * number of bytes that can be flushed immediately to an underlying sink
   * without harming throughput.
   */
  public long completeSegmentByteCount() {
    long result = size;
    if (result == 0) return 0;

    // Omit the tail if it's still writable.
    Segment tail = head.prev;
    if (tail.limit < Segment.SIZE && tail.owner) {
      result -= tail.limit - tail.pos;
    }

    return result;
  }

上面代码的意思是用当前总的 size 大小 减去 尾节点已写入的字节数,也就是除尾节点,其他的都会被立即写入到底层的输出流中。这说明了不足 8K 的数据写入,在 close() 之前是不会发生实际 I/O 的。 接下来,进一步调用了 sink#write()。这个 sink 就是我们在前面通过 Okio#sink() 方法所创建出来的匿名类。 为了连贯性,我们还是把写入相关的代码再贴一下以便于进一步的阅读与分析。

@Override public void write(Buffer source, long byteCount) throws IOException {
        ......
        while (byteCount > 0) { // 循环,直到数据全部写入
          timeout.throwIfReached(); // 超时判断
          Segment head = source.head; // 总是取头结点
         // 确定当次需要写入的字节数,当前 segment 中可写入的数据与剩余字节数,取最小的那个
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          out.write(head.data, head.pos, toCopy);//写入数据到输出流
         
         // 更新各位置
          head.pos += toCopy; // 读指针往前移 toCopy 个
          byteCount -= toCopy; // 写入字节数减 toCopy 个
          source.size -= toCopy; // Buffer 中大小减 toCopy 个
          // 如果当前 segment 的数据已经全部写入到输出流,那么将其弹出双向循环链接,并将其回收到 SegmentPool 中
          if (head.pos == head.limit) {
            source.head = head.pop();
            SegmentPool.recycle(head);
          }
        }

贴出来的代码中,已经作了非常详细的说明,因为这里的每一句都很重要,都需要我们去理解。当然,总的来说其实也只有 2 个关键点。 其一,超时机制。

/**
   * Throws an {@link InterruptedIOException} if the deadline has been reached or if the current
   * thread has been interrupted. This method doesn't detect timeouts; that should be implemented to
   * asynchronously abort an in-progress operation.
   */
  public void throwIfReached() throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException("thread interrupted");
    }

    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
      throw new InterruptedIOException("deadline reached");
    }
  }

根据这里的注释,以及分析 hasDeadLine 和 deadlineNanoTime 的赋值情况来看,超时的检测只应该发生在异步的情况下。纳尼!!!也就是说同步超时机制是不检查的。 其二,就是数据的写入以及各位位置标记的置位和Segment的回收。

关闭——RealBufferedSink#close()

@Override public void close() throws IOException {
    if (closed) return;

    // Emit buffered data to the underlying sink. If this fails, we still need
    // to close the sink; otherwise we risk leaking resources.
    Throwable thrown = null;
    try {
      if (buffer.size > 0) {
        sink.write(buffer, buffer.size);
      }
    } catch (Throwable e) {
      thrown = e;
    }

    try {
      sink.close();
    } catch (Throwable e) {
      if (thrown == null) thrown = e;
    }
    closed = true;

    if (thrown != null) Util.sneakyRethrow(thrown);
  }

close() 也就是将 buffer 中剩余的数据都写入到底层输出流中,最后再调用 sink#close()关闭底层的输出流。 至此,写入数据可以说大致分析完了。因为与 writeUtf8() 同级别的其他 write方法,其基本原理都是类似的。通过分析我们知道,其是先将数据都写入到 Buffer 中,然后才将数据写入到实际的 I/O 中,以此减少实际的 I/O。而分析到这里,我们应该也注意到,前面所说的 Segment 的 splite,compact以及share 这些优化内存的方法都没有被提及过。这就接下来要分析的读取与另一不同级别的写入。

三、读取

案例分析

关于读取,我们同样也从案例分析开始。先来看看读取的代码。
通过 Okio 的 BufferSource 进行的读取代码

private void testSource(File file) {
        try {
            BufferedSource bufferedSource = Okio.buffer(Okio.source(file));

            System.out.println("Okio source test start");
            long ts = System.currentTimeMillis();
            byte[] buffer = new byte[8 * 1024];

            while(bufferedSource.read(buffer) > 0) {

            }

            bufferedSource.close();
            System.out.println("Okio source test end");
            System.out.println(" source test " + (System.currentTimeMillis() - ts));

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

通过 Java 的 BufferedReader 进行的读取代码

private void testJava(File file) {

        try {
            BufferedReader bufferedReader = new BufferedReader(new FileReader(file));

            System.out.println("java test start");
            long ts = System.currentTimeMillis();

            char buffer[] = new char[8 * 1024];

            while(bufferedReader.read(buffer) > 0) {
            }
            System.out.println("java test end");
            System.out.println("java test " + (System.currentTimeMillis() - ts));
            bufferedReader.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

同样来看看测试结果,不过这里偷了个懒,只测试了PC端的,没有测试手机端。因为平台不影响结果。文件就是上面写入的文件,大小 2.3 G。

次序 BufferSource BufferedReader
1 1117 2391
2 986 2458
3 979 2252
4 980 2232
5 976 2225
6 977 2202
7 983 2252
8 968 2218
9 979 2223
10 977 2221

对比结果很明显,Okio#BufferSource 的读取速度明显大于 Java 原生的2 倍多。但是,为什么呢?另外有一个细节,上面的 buffer 大小只设置了 8 * 1024 ,也就是 8KiB,这是因为 Okio 的 segment 为 8 KiB,其一次读取最大也只读取 8 KiB

源码分析

有了前面案例分析的基本认知,接下来就是我们的源码分析与理解了。同样,先画出时序图,再逐层进行分解并深入。


OkioSource.jpg

从时序图上可以看到,前面几个步骤都是相同的,如 Buffer,Segment,Timeout 这些概念和构造都是一样的。就不重复介绍了。主要看看 Source 和 RealBufferSource 的构造吧。

构造Source——Okio#source(file)

  /** Returns a source that reads from {@code file}. */
  public static Source source(File file) throws FileNotFoundException {
    if (file == null) throw new IllegalArgumentException("file == null");
    return source(new FileInputStream(file));
  }
  /** Returns a source that reads from {@code in}. */
  public static Source source(InputStream in) {
    return source(in, new Timeout());
  }
private static Source source(final InputStream in, final Timeout timeout) {
    if (in == null) throw new IllegalArgumentException("in == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        try {
          timeout.throwIfReached();
          Segment tail = sink.writableSegment(1);
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          if (bytesRead == -1) return -1;
          tail.limit += bytesRead;
          sink.size += bytesRead;
          return bytesRead;
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        }
      }

      @Override public void close() throws IOException {
        in.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "source(" + in + ")";
      }
    };
  }

上面的代码看着多,但和 Sink 的构造几乎一样。这里的重点也构造了一个匿名的 Source 接口的实例,以完成实际从输入中的读取。同样的,Source 也是支持 Socket 的,顺便也一起来看看吧。

/**
   * Returns a source that reads from {@code socket}. Prefer this over {@link
   * #source(InputStream)} because this method honors timeouts. When the socket
   * read times out, the socket is asynchronously closed by a watchdog thread.
   */
  public static Source source(Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    AsyncTimeout timeout = timeout(socket);
    Source source = source(socket.getInputStream(), timeout);
    return timeout.source(source);
  }

支持 Socket 也是比较简单的,即从 socket 中获取输入流以便读取数据,同时这里也设定了 Timeout 为 AsyncTimeout。关于 AsyncTimeout 就放到后面专门的分节里来讲解。

构造 BufferdSource —— Okio#buffer(Source)

  /**
   * Returns a new source that buffers reads from {@code source}. The returned
   * source will perform bulk reads into its in-memory buffer. Use this wherever
   * you read a source to get an ergonomic and efficient access to data.
   */
  public static BufferedSource buffer(Source source) {
    return new RealBufferedSource(source);
  }

构造 RealBufferedSource 实例,继续看。

final class RealBufferedSource implements BufferedSource {
  public final Buffer buffer = new Buffer();
  public final Source source;
  boolean closed;

  RealBufferedSource(Source source) {
    if (source == null) throw new NullPointerException("source == null");
    this.source = source;
  }
......

同样是组合了 Source 和 Buffer 这两个实例。Source 就是前面通过 Okio.source()方法最终构建的那个匿名类。而 Buffer 和其内部聚合的 Segment 都已经介绍过了。所以直接来看 read() 方法吧。

读取数据——RealBufferedSource#read()

  @Override public int read(byte[] sink) throws IOException {
    return read(sink, 0, sink.length);
  }

 @Override public int read(byte[] sink, int offset, int byteCount) throws IOException {
    checkOffsetAndCount(sink.length, offset, byteCount);

    if (buffer.size == 0) {
      long read = source.read(buffer, Segment.SIZE);
      if (read == -1) return -1;
    }

    int toRead = (int) Math.min(byteCount, buffer.size);
    return buffer.read(sink, offset, toRead);
  }

read() 是个重载方法,我们调用的是第一个,然后它会进一步调用指定参数 offset 和 byteCount 的 read() 方法。
在这个方法里,buffer.size 为 0 说明当前 buffer 中为没有数据,即数据已经被从buffer 中读出来了。如果不为0,说明 buffer 中还有数据那么就进一步读取剩下的或者剩余的数据。这里最初肯定是要走为 0 的情况。这里调用的是 source.read(),也即前面匿名类 Source 的。注意,这里传入的参数中除了 buffer ,还有 Segment.SIZE,即 8KiB。
这里为了方便分析,也重复贴一下 read() 方法的相关代码。

@Override public long read(Buffer sink, long byteCount) throws IOException {
       .....
       ......
        try {
          timeout.throwIfReached();
          Segment tail = sink.writableSegment(1);
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          if (bytesRead == -1) return -1;
          tail.limit += bytesRead;
          sink.size += bytesRead;
          return bytesRead;
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        }
      }

同步 Timeout.throwIfReached() 前面已经分析过了,是不起作用的,其实现在来看看也能明白过来,不管是在读还是写的时候,它都没有设置起始时间,那它又如何能计算出所消耗的时间呢?而 Buffer#writeableSegment() 在前面也分析过了,主要是返回一个 Segment。然后就是实际从文件读出数据并只保存在 Segment 里面。

在 RealBufferedSource#read() 中,走了不为 0 的情况后,就是将Buffer中的数据按需要读到内存中返回给调用者。

有了前面写入的分析,再来看读取显然要简单的多了。至此,读取和写入的最基本流程都分析完了。Okio 为打包了很多的读取和写入的方法,这里只分析了其中最基本的,当然有了这些最基本的认知后,再来理解基本的就容易许多了。

三、读取到写入,写入自读取

除了基本的读取与写入,还可以将读取与写入直接串起来。也即读取出来数据可以直接送进写入,而写入的内容也可以是来自 Source 。先来看看写入自读取吧。

@Override public long writeAll(Source source) throws IOException {
    if (source == null) throw new IllegalArgumentException("source == null");
    long totalBytesRead = 0;
    for (long readCount; (readCount = source.read(buffer, Segment.SIZE)) != -1; ) {
      totalBytesRead += readCount;
      emitCompleteSegments();
    }
    return totalBytesRead;
  }

  @Override public BufferedSink write(Source source, long byteCount) throws IOException {
    while (byteCount > 0) {
      long read = source.read(buffer, byteCount);
      if (read == -1) throw new EOFException();
      byteCount -= read;
      emitCompleteSegments();
    }
    return this;
  }

上面两段代码就是将数据从 Source 读出,再写入到Buffer 中去。这里需要关注的有两个事情:
source实例 :这里的 Source 是 RealBufferdSource 实例。
关键调用 : 这里的关键调用是 RealBufferdSource#read(Buffer,byteCount)。来看看其实现。

@Override public long read(Buffer sink, long byteCount) throws IOException {
    ......
    return buffer.read(sink, toRead);
  }

调用了 Buffer#read(Buffer,toRead)

  @Override public long read(Buffer sink, long byteCount) {
   ......
    sink.write(this, byteCount);
    return byteCount;
  }

看到这里可能会有点晕,所以强调一下,这里的 sink 是 RealBufferdSink 中的 Buffer,而 this 是 RealBufferedSource 中的 buffer。意思就是将 RealBufferdSource 中的 buffer 的数据写入到 RealBufferdSink 中的 buffer 中去。 下面来看看具体的实现,其实代码并不长,注释就占了一大半,为了完整性,我并没有将其删除去。

@Override public void write(Buffer source, long byteCount) {
    // Move bytes from the head of the source buffer to the tail of this buffer
    // while balancing two conflicting goals: don't waste CPU and don't waste
    // memory.
    //
    //
    // Don't waste CPU (ie. don't copy data around).
    //
    // Copying large amounts of data is expensive. Instead, we prefer to
    // reassign entire segments from one buffer to the other.
    //
    //
    // Don't waste memory.
    //
    // As an invariant, adjacent pairs of segments in a buffer should be at
    // least 50% full, except for the head segment and the tail segment.
    //
    // The head segment cannot maintain the invariant because the application is
    // consuming bytes from this segment, decreasing its level.
    //
    // The tail segment cannot maintain the invariant because the application is
    // producing bytes, which may require new nearly-empty tail segments to be
    // appended.
    //
    //
    // Moving segments between buffers
    //
    // When writing one buffer to another, we prefer to reassign entire segments
    // over copying bytes into their most compact form. Suppose we have a buffer
    // with these segment levels [91%, 61%]. If we append a buffer with a
    // single [72%] segment, that yields [91%, 61%, 72%]. No bytes are copied.
    //
    // Or suppose we have a buffer with these segment levels: [100%, 2%], and we
    // want to append it to a buffer with these segment levels [99%, 3%]. This
    // operation will yield the following segments: [100%, 2%, 99%, 3%]. That
    // is, we do not spend time copying bytes around to achieve more efficient
    // memory use like [100%, 100%, 4%].
    //
    // When combining buffers, we will compact adjacent buffers when their
    // combined level doesn't exceed 100%. For example, when we start with
    // [100%, 40%] and append [30%, 80%], the result is [100%, 70%, 80%].
    //
    //
    // Splitting segments
    //
    // Occasionally we write only part of a source buffer to a sink buffer. For
    // example, given a sink [51%, 91%], we may want to write the first 30% of
    // a source [92%, 82%] to it. To simplify, we first transform the source to
    // an equivalent buffer [30%, 62%, 82%] and then move the head segment,
    // yielding sink [51%, 91%, 30%] and source [62%, 82%].

    if (source == null) throw new IllegalArgumentException("source == null");
    if (source == this) throw new IllegalArgumentException("source == this");
    checkOffsetAndCount(source.size, 0, byteCount);

    while (byteCount > 0) {
      // Is a prefix of the source's head segment all that we need to move?
      if (byteCount < (source.head.limit - source.head.pos)) {
        Segment tail = head != null ? head.prev : null;
        if (tail != null && tail.owner
            && (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
          // Our existing segments are sufficient. Move bytes from source's head to our tail.
          source.head.writeTo(tail, (int) byteCount);
          source.size -= byteCount;
          size += byteCount;
          return;
        } else {
          // We're going to need another segment. Split the source's head
          // segment in two, then move the first of those two to this buffer.
          source.head = source.head.split((int) byteCount);
        }
      }

      // Remove the source's head segment and append it to our tail.
      Segment segmentToMove = source.head;
      long movedByteCount = segmentToMove.limit - segmentToMove.pos;
      source.head = segmentToMove.pop();
      if (head == null) {
        head = segmentToMove;
        head.next = head.prev = head;
      } else {
        Segment tail = head.prev;
        tail = tail.push(segmentToMove);
        tail.compact();
      }
      source.size -= movedByteCount;
      size += movedByteCount;
      byteCount -= movedByteCount;
    }
  }

通过注释,可以明白,这个方法的核心是在讲:
(1) 这个方法主要完成的是从 Source 的首部将数据移入到 Sink 的尾部
(2) 移动过程中需要平衡两个重要的指标,优化 CPU 资源,优化内存资源。
(3) 移动数据是通过改变引用指向,而不是数据的复制
(4) Segment 合并,是说前一个 Segment 与当前的 Segment 数据大小之和如果没有超过 100%,那么就会将当前的 Segment 合并到前一个中去。看看下面的代码,你可能会更加的清楚。

/**
   * Call this when the tail and its predecessor may both be less than half
   * full. This will copy data so that segments can be recycled.
   */
  public void compact() {
    if (prev == this) throw new IllegalStateException();
    if (!prev.owner) return; // Cannot compact: prev isn't writable.
    int byteCount = limit - pos;
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
    if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
    writeTo(prev, byteCount);
    pop();
    SegmentPool.recycle(this);
  }

(5) 分割 Segment,对于有一些 Segment ,如果只是部分被读取出来,那么可以通过将其分割成 2 个 Segment ,然后取需要的那个 Segment 加入到当前的 Buffer 中。

/**
   * Splits this head of a circularly-linked list into two segments. The first
   * segment contains the data in {@code [pos..pos+byteCount)}. The second
   * segment contains the data in {@code [pos+byteCount..limit)}. This can be
   * useful when moving partial segments from one buffer to another.
   *
   * <p>Returns the new head of the circularly-linked list.
   */
  public Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    Segment prefix;

    // We have two competing performance goals:
    //  - Avoid copying data. We accomplish this by sharing segments.
    //  - Avoid short shared segments. These are bad for performance because they are readonly and
    //    may lead to long chains of short segments.
    // To balance these goals we only share segments when the copy will be large.
    if (byteCount >= SHARE_MINIMUM) {
      prefix = new Segment(this);
    } else {
      prefix = SegmentPool.take();
      System.arraycopy(data, pos, prefix.data, 0, byteCount);
    }

    prefix.limit = prefix.pos + byteCount;
    pos += byteCount;
    prev.push(prefix);
    return prefix;
  }

明白了这个方法的主要意图,再来分析具体的代码。分两类情况:
其一,当要读取的数据大小小于 source.head 中的字节数时,如果当前 sink.head 不为 null,则将 source.head 的数据都写入到 sink.tail 中去,当然也是其最后的要读取的数据了,即 soruce.head.writeTo();而当 sink.head 为空时,则先将 source.head 再 byteCount 大小进行数据的分割,再进入到第二种情况。而在分割时,如果需要的数据大于分割的阈值,那么被分割出来的 Segment 就会被共享成同一个 Segment。
其二,当要读取的数据大小大于等于 source.head 字节时,如果当前 sink.head 为 null ,那么就让 source.head 从 source.buffer 中取出,并让 sink.head 直接指向 source.head 即可。而如果当前 sink.head 不为空,则将取出来的 segment 并到当前 sink.buffer 的尾部。根据前面的分析,如果尾部的数据与其前一个数据大小之和没有达到 100% ,那就可以进行合并。

以上便是 Segment 的合并与分割了。Okio 在这里同时考虑到了 CPU 和内存的优化,并且实现的非常微妙。

同时上面有部分也提到了 Segment 的共享,从整个源码以及分析来看,共享主要的场景就是发生在复制时以及可能的分割时。当然,最重要的是分割时了。而关于复制时的共享,其实也是非常符合逻辑了。即使被标记成分享的 Segment 不能再写入,但是对于内存的利用来上讲也是非常优化的。

四、异步的超时机制

异步超时机制主要只是用于 Socket 通信,这里也重复贴一下 Source 的代码,Sink 差不多。

  /**
   * Returns a source that reads from {@code socket}. Prefer this over {@link
   * #source(InputStream)} because this method honors timeouts. When the socket
   * read times out, the socket is asynchronously closed by a watchdog thread.
   */
  public static Source source(Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    AsyncTimeout timeout = timeout(socket);
    Source source = source(socket.getInputStream(), timeout);
    return timeout.source(source);
  }

如前面所说,Socket 通信时,主要是拿出了其 InputStream,以便于读取数据。而这里的 Timeout 是其子类 AsyncTimeout。当通过重载方法 source() 构造出了 Source 之后,进一步送进了 AsyncTimeout#source() 方法中

/**
   * Returns a new source that delegates to {@code source}, using this to implement timeouts. This
   * works best if {@link #timedOut} is overridden to interrupt {@code sink}'s current operation.
   */
  public final Source source(final Source source) {
    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        boolean throwOnTimeout = false;
        enter();
        try {
          long result = source.read(sink, byteCount);
          throwOnTimeout = true;
          return result;
        } catch (IOException e) {
          throw exit(e);
        } finally {
          exit(throwOnTimeout);
        }
      }

      @Override public void close() throws IOException {
        boolean throwOnTimeout = false;
        try {
          source.close();
          throwOnTimeout = true;
        } catch (IOException e) {
          throw exit(e);
        } finally {
          exit(throwOnTimeout);
        }
      }

      @Override public Timeout timeout() {
        return AsyncTimeout.this;
      }

      @Override public String toString() {
        return "AsyncTimeout.source(" + source + ")";
      }
    };
  }

Okio#source()主要是定义了如何读取数据,而AsyncTimeout#source()定义了读取数据的超时机制。在 read() 方法中,主要体现在 enter() 以及 exit() 这两个方法上。

思考一下当前的场景,当前线程正在读取数据,那管理超时必然需要另一个线程。在 AsyncTimeout 中就定义了 WatchDog 线程,来监测超时的读写。

WatchDog 线程,俗名 “看门狗”,其在嵌入式方面应用非常多。而Android系统中的SystemServer是否存在耗时操作的监测也是由一个 WatchDog 线程来实现的。此处的 WatchDog 线程是由第一次 enter() 进入所触发的,起用后便被设置成守护线程运行,并且线程以链表的形式来管理。与此同时, enter() 也会添加一个新的 AsyncTimeout 节点到 WatchDog 线程中。而 exit() 则会检查节点是否还在链表中,否则时间到了不在链表中就会被认为是超时状态了,从而抛出超时的异常。具体我们来看看代码的实现。

public final void enter() {
    if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
    long timeoutNanos = timeoutNanos();
    boolean hasDeadline = hasDeadline();
    if (timeoutNanos == 0 && !hasDeadline) {
      return; // No timeout and no deadline? Don't bother with the queue.
    }
    inQueue = true;
    scheduleTimeout(this, timeoutNanos, hasDeadline);
  }

同 enter() 调用 scheduleTimeout() ,调度起一个 Timeout。

private static synchronized void scheduleTimeout(
      AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
    // Start the watchdog thread and create the head node when the first timeout is scheduled.
// 链表为空说明 Watchdog 还没有启动起来,这里添加了头结点,并且启动了 watchdog 线程。
    if (head == null) {
      head = new AsyncTimeout();
      new Watchdog().start();
    }

// 分不同的条件讲计算终止时间
    long now = System.nanoTime();
    if (timeoutNanos != 0 && hasDeadline) {
      // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
      // Math.min() is undefined for absolute values, but meaningful for relative ones.
      node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
    } else if (timeoutNanos != 0) {
      node.timeoutAt = now + timeoutNanos;
    } else if (hasDeadline) {
      node.timeoutAt = node.deadlineNanoTime();
    } else {
      throw new AssertionError();
    }

// 按剩余时间升序排序
    // Insert the node in sorted order.
    long remainingNanos = node.remainingNanos(now);
    for (AsyncTimeout prev = head; true; prev = prev.next) {
      if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
        node.next = prev.next;
        prev.next = node;
        if (prev == head) {
          AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
        }
        break;
      }
    }
  }

以上便是插入了一个 AsyncTimeout 的节点。再来看看 Watchdog 线程是如何监测超时的。

private static final class Watchdog extends Thread {
    Watchdog() {
      super("Okio Watchdog");
      setDaemon(true);
    }

    public void run() {
      while (true) {
        try {
          AsyncTimeout timedOut;
          synchronized (AsyncTimeout.class) {
            timedOut = awaitTimeout();

            // Didn't find a node to interrupt. Try again.
            if (timedOut == null) continue;

            // The queue is completely empty. Let this thread exit and let another watchdog thread
            // get created on the next call to scheduleTimeout().
            if (timedOut == head) {
              head = null;
              return;
            }
          }

          // Close the timed out node.
          timedOut.timedOut();
        } catch (InterruptedException ignored) {
        }
      }
    }
  }

重点在 awaitTimeout (),即等待一下,看看是否有 AsyncTimeout 超时了。

/**
   * Removes and returns the node at the head of the list, waiting for it to time out if necessary.
   * This returns {@link #head} if there was no node at the head of the list when starting, and
   * there continues to be no node after waiting {@code IDLE_TIMEOUT_NANOS}. It returns null if a
   * new node was inserted while waiting. Otherwise this returns the node being waited on that has
   * been removed.
   */
  static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
    // Get the next eligible node.
    AsyncTimeout node = head.next;

    // The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
    if (node == null) {
      long startNanos = System.nanoTime();
      AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
      return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
          ? head  // The idle timeout elapsed.
          : null; // The situation has changed.
    }

    long waitNanos = node.remainingNanos(System.nanoTime());

    // The head of the queue hasn't timed out yet. Await that.
    if (waitNanos > 0) {
      // Waiting is made complicated by the fact that we work in nanoseconds,
      // but the API wants (millis, nanos) in two arguments.
      long waitMillis = waitNanos / 1000000L;
      waitNanos -= (waitMillis * 1000000L);
      AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
      return null;
    }

    // The head of the queue has timed out. Remove it.
    head.next = node.next;
    node.next = null;
    return node;
  }

首先找到剩余时间最小的那个节点,按照前面升序排序,也就是 head.next 节点。然后再检查剩余时间是否大于 0 ,说明还没有超时,这时便将等待到其剩余时间,并且返回空,表示并未超时。这里提一下的是 class.wait(time) 方法,如果时间没有到,收到对应 class 的 notify()/notifyAll() 也是会提前被唤醒的。假设这里并没有超时,最后再来看看 exit()。

/**
   * Throws an IOException if {@code throwOnTimeout} is {@code true} and a timeout occurred. See
   * {@link #newTimeoutException(java.io.IOException)} for the type of exception thrown.
   */
  final void exit(boolean throwOnTimeout) throws IOException {
    boolean timedOut = exit();
    if (timedOut && throwOnTimeout) throw newTimeoutException(null);
  }

  /**
   * Returns either {@code cause} or an IOException that's caused by {@code cause} if a timeout
   * occurred. See {@link #newTimeoutException(java.io.IOException)} for the type of exception
   * returned.
   */
  final IOException exit(IOException cause) throws IOException {
    if (!exit()) return cause;
    return newTimeoutException(cause);
  }

  /** Returns true if the timeout occurred. */
  public final boolean exit() {
    if (!inQueue) return false;
    inQueue = false;
    return cancelScheduledTimeout(this);
  }

exit() 方法又重载了 3 个,很显然,前面 2 个主要是调用第 3 个来判断是否超时以便进一步的抛出超异常。这里主要来看看第 3 个。其又进一步调用了 cancelScheduledTimeout() 方法,这是与 scheduledTimeout()相对应的方法。

/** Returns true if the timeout occurred. */
  private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
    // Remove the node from the linked list.
    for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
      if (prev.next == node) {
        prev.next = node.next;
        node.next = null;
        return false;
      }
    }

    // The node wasn't found in the linked list: it must have timed out!
    return true;
  }

从链表中寻找当前结点,如果找到则从链接中移除并表示没有超时。如果没有找到,则表示已经超时了。

至此,异步超时机制分析完毕。如果看前面的分析还有点晕,不防来看看总结:
(1) 管理超时机制的数据结构是一个升序链表,而管理这个链表的是守护线程 Watchdog ,它会每次选择一个节点,并通过剩余时间的等待来等待当前 AsyncTimeout 的超时与否。
(2) 通过 enter() 方法,再经过 scheduleTimeout() 方法添加一个 AsyncTimeout节点
(3) 通过 exit() 方法,再经过 cancleScheduleTimeut() 方法将未超时的 AsyncTimeout 从链表中移除。移除后,当 Watchdog 进入下一次遍历时就不会再遍历到被它等待超时的那个节点了。

五、总结

感谢你能读到并读完这篇文章,对于 Okio 的全部分析到此为止了。文章确实贴了不少的源码,但我仍然觉得是有必要的,它能使我们读起来更加的通畅以及完整。当然,也许还有更好的表述方式。本文并没有分析到了 Okio 所有的方方面面,但我相信能够认真读完并且理解其中大部分的意思,再去分析其他部分应该也非常简单了。最后,由于本人水平有限,在分析过程中难免会有错误,表述上也会存在不尽人意的地方。对于一切问题,欢迎下方留言讨论,不甚感激。