OKio - 重新定义了“短小精悍”的IO框架

13,778 阅读14分钟

前言

其实接触Square的这款IO框架还是因为okHttp这个网络框架,因为他的网络IO是通过OKio来完成的。不过,对于Java原生IO体系我却是早已心怀不满。基本上我很排斥写Java的IO部分,因为写起来很麻烦和笨重,有多排斥呢?

我记得大学那会儿,准备写一个编译器,在读取代码的那个IO部分用的python来完成的,然后在Java层来接收字符。

我就是这么不喜欢Java原生IO体系。

我一直都想自己对Java IO的API做一个彻底的封装,和原生IO接口来个了断,结果一直因为各种原因没去做。在了解了OKio之后,就更加没有动力去封装原生接口了。

今天借着这个机会,向大家介绍这个短小精悍的IO框架,顺便也和大家探讨一下封装的相关问题,希望通过这篇文章,大家能够乐于放弃原生的IO接口,转而使用这款IO框架来作为自己日常开发的工具。


原生IO:没那么简单

在聊OKio之前,我们还是先复习一下Java原生IO体系。

下面是Java IO输入部分的架构

需要说明的是,以上并不是Java IO框架的全部,只是例举一些大家可能有印象的类,并且省去了很多继承自这些类的的子类。看一看上面的结构图,你就知道什么叫复杂了。观察上图,我们至少可以吐槽以下几点:

  • IO接口的实现类太多
  • 每个类基本对应一种IO需求,导致它的体系十分庞大

当然,Java中出现这种庞大的IO体系是有它的历史原因的,这是使用装饰者模式来构建和拓展的Java IO体系的必然结果。因此我们也不必过分苛责。


OKio:就是这么简单

说完了Java原生IO接口的种种问题之后,我们可以开始来聊一聊OKio这个框架了。那么,它到底是一种什么样的框架呢?

俗话说得好, 文字定义终觉浅,绝知此事要上图

从上面可以看到,其实OKio是对于Java原生IO接口的一次封装。一次成功的封装。

那么,在OKio 的帮助下,完成一次读写操作又是怎样的呢?

// 写入数据
 String fileName="test.txt";
        String path= Environment.getExternalStorageDirectory().getPath();
        File file=null;
        BufferedSink bufferSink=null;
        try{
            file=new File(path,fileName);
            if (!file.exists()){
                file.createNewFile();
            }
            bufferSink=Okio.buffer(Okio.sink(file));
            bufferSink.writeString("this is some thing import \n", Charset.forName("utf-8"));
            bufferSink.writeString("this is also some thing import \n", Charset.forName("utf-8"));
            bufferSink.close();

        }catch(Exception e){

        }


//读取数据
 try {
            BufferedSource bufferedSource=Okio.buffer(Okio.source(file));
            String str=bufferedSource.readByteString().string(Charset.forName("utf-8"));
            Log.e("TAG","--->"+str);
        } catch (Exception e) {
            e.printStackTrace();
        }

以上是我随手写的一个文件的写入和读取操作,可以看到,整个过程其实是非常简单的,不过这并不是重点,重点是写入和读取的方式和数据类型都十分的灵活,

是的,十分灵活

比如,读取数据可以很轻松的一行一行的读取:


//一行一行的读出数据
        try {
            BufferedSource bufferedSource=Okio.buffer(Okio.source(file));
            Log.e("TAG-string","--->"+bufferedSource.readUtf8Line());
            Log.e("TAG-string","--->"+bufferedSource.readUtf8Line());
            Log.e("TAG-string","--->"+bufferedSource.readUtf8Line());
            bufferedSource.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

再比如,你可以直接读写Java数据类型等等,可以说,OKio非常优雅的满足了Java IO的绝大部分需求。却有没有Java原生IO的繁琐。


OKio详解

上文写的一些实例代码解释不多,当你仔细的了解了OKio这个框架之后,你就会理解上面每一行示例代码所代表的意思。

好了,我们还是从这张图来切入

上面可以看到,实际上Sink和Source是OKio中的最基本的接口,大概相当于OutputStream和InputStream在原生接口中的地位。

我们以输出相关的Sink接口为例

public interface Sink extends Closeable, Flushable {
  //通过缓冲区写入数据
  void write(Buffer source, long byteCount) throws IOException;
//刷新 (缓冲区)
  @Override void flush() throws IOException;
//超时机制
  Timeout timeout();
//关闭写操作
  @Override void close() throws IOException;
}

上面的写入操作最基础的接口,当然,你看到了Buffer和flush()这个方法,这也就意味着写入操作很可能是围绕缓冲区来进行的,事实上确实是这样,我们往后看。

Sink下面一层接口是BufferedSink:

public interface BufferedSink extends Sink {
  Buffer buffer();
  BufferedSink write(ByteString byteString) throws IOException;
  BufferedSink write(byte[] source) throws IOException;
  BufferedSink write(byte[] source, int offset, int byteCount) throws IOException;
  long writeAll(Source source) throws IOException;
  BufferedSink write(Source source, long byteCount) throws IOException;
  BufferedSink writeUtf8(String string) throws IOException;
  BufferedSink writeUtf8(String string, int beginIndex, int endIndex) throws IOException;
  BufferedSink writeUtf8CodePoint(int codePoint) throws IOException;
  BufferedSink writeString(String string, Charset charset) throws IOException;
  BufferedSink writeString(String string, int beginIndex, int endIndex, Charset charset)
      throws IOException;
  BufferedSink writeByte(int b) throws IOException;
  BufferedSink writeShort(int s) throws IOException;
  BufferedSink writeShortLe(int s) throws IOException;
  BufferedSink writeInt(int i) throws IOException;
  BufferedSink writeIntLe(int i) throws IOException;
  BufferedSink writeLong(long v) throws IOException;
  BufferedSink writeLongLe(long v) throws IOException;
  BufferedSink writeDecimalLong(long v) throws IOException;
  BufferedSink writeHexadecimalUnsignedLong(long v) throws IOException;
  BufferedSink emitCompleteSegments() throws IOException;
  BufferedSink emit() throws IOException;
  OutputStream outputStream();
}

其实上面的接口也很明了,就是在基本接口的基础上,定义各式各样的写入方式。

真正实现上面这些接口的类则是RealBufferedSink,我摘取部分代码作为说明


final class RealBufferedSink implements BufferedSink {
//实例化一个缓冲区,用于保存需要写入的数据。
  public final Buffer buffer = new Buffer();
  public final Sink sink;
  boolean closed;
  RealBufferedSink(Sink sink) {
    if (sink == null) throw new NullPointerException("sink == null");
    this.sink = sink;
  }

  @Override public Buffer buffer() {
    return buffer;
  }
    //通过缓冲区把ByteString类型的数据写入
  @Override public BufferedSink write(ByteString byteString) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.write(byteString);
    //完成写入
    return emitCompleteSegments();
  }

//通过缓冲区把String类型的数据写入
  @Override public BufferedSink writeString(String string, Charset charset) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.writeString(string, charset);
    return emitCompleteSegments();
  }
...
...

//通过缓冲区把byte数组中的数据写入
  @Override public BufferedSink write(byte[] source, int offset, int byteCount) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.write(source, offset, byteCount);
    //完成写入
    return emitCompleteSegments();
  }
//完成写入
  @Override public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.completeSegmentByteCount();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
  }
...
...
...

}

ByteString内部可以保存byte类型的数据,作为一个工具类,它可以把byte转为String,这个String可以是utf8的值,也可以是base64后的值,也可以是md5的值等等

上面只是一部分吗的代码,但是大家也能看到buffer这个变量反复出现,而且深度参与了写入数据的过程,我们可以一起去看看,着重看上面涉及到的几个方法

public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
  private static final byte[] DIGITS =
      { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
  static final int REPLACEMENT_CHARACTER = '\ufffd';

  Segment head;
  long size;

  public Buffer() {
  }

  /** Returns the number of bytes currently in this buffer. */
  public long size() {
    return size;
  }


 //写入String类型的数据
@Override 
public Buffer writeString(String string, Charset charset) {
    //调用下面的方法
    return writeString(string, 0, string.length(), charset);
  }
//准备写入String数据
  @Override
  public Buffer writeString(String string, int beginIndex, int endIndex, Charset charset) {
    if (string == null) throw new IllegalArgumentException("string == null");
    if (beginIndex < 0) throw new IllegalAccessError("beginIndex < 0: " + beginIndex);
    if (endIndex < beginIndex) {
      throw new IllegalArgumentException("endIndex < beginIndex: " + endIndex + " < " + beginIndex);
    }
    if (endIndex > string.length()) {
      throw new IllegalArgumentException(
          "endIndex > string.length: " + endIndex + " > " + string.length());
    }
    if (charset == null) throw new IllegalArgumentException("charset == null");
    //假如是utf-8编码的数据,则调用writeUtf8()
    if (charset.equals(Util.UTF_8)) return writeUtf8(string, beginIndex, endIndex);
    //否则,将String转化为byte类型的数据
    byte[] data = string.substring(beginIndex, endIndex).getBytes(charset);
    //然后执行write(),写入byte数组
    return write(data, 0, data.length);
  }


  //offset:写入数据的数组下标起点,
  //byteCount :写入数据的长度
    @Override 
public Buffer write(byte[] source, int offset, int byteCount) {
    if (source == null) throw new IllegalArgumentException("source == null");
    //做一些检查工作
    checkOffsetAndCount(source.length, offset, byteCount);

    int limit = offset + byteCount;
    //开始循环写入数据
    while (offset < limit) {
    //Segment??黑人问号脸??
    //我们不妨把Segment先看成一种类似数组结构的容器
    //这个方法就是获取一个数据容器
      Segment tail = writableSegment(1);
    // limit - offset是代写入的数据的长度
    // Segment.SIZE - tail.limit是这个容器剩余空间的长度
      int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit);
      //调用Java方法把数据复制到容器中。
      System.arraycopy(source, offset, tail.data, tail.limit, toCopy);
      //记录相关偏移量
      offset += toCopy;
      tail.limit += toCopy;
    }
    //增加buffer的size
    size += byteCount;
    return this;
  }

  //获取一个Segment
Segment writableSegment(int minimumCapacity) {
    if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
    if (head == null) {
    假如当前Segment为空,则从Segment池中拿到一个
      head = SegmentPool.take(); // Acquire a first segment.
      return head.next = head.prev = head;
    }
    //获取当前Segment的前一个Segment
    //看来这是一个链表结构没跑了
    Segment tail = head.prev;
    //检查这个Segment容器是否有剩余空间可供写入 
    if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
      //假如没有,则拿一个新的的Segment来代替这个(即链表的下一个)
      tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
    }
    return tail;
  }

  }

好了,现在我们基本上揭开了OKio框架中隐藏的最重要的一个东西,数据缓存机制,主要包括Buffer,Segment,SegmentPool,

后两者主要集中在Buffer类中运用,数据是通过Buffer写入一个叫Segment容器中的。

关于SegmentPool,其实它的存在很简单,保存暂时不用的数据容器,防止频繁GC,基本上所有的XX池的作用的是这样,防止已申请的资源被回收,增加资源的重复利用,提高效率,减少GC,避免内存抖动....

关于Segment,我们已经知道它是一个数据容器,而且是一个链表结构,根据它有prev和next两个引用变量可以推测,其实它是一个双向链表,为了照顾某些数据结构比较弱的同学,特意画了一下

大概就是这个样子。下面我们在深入去了解这个Segment的代码细节

final class Segment {
  /** The size of all segments in bytes. */
  static final int SIZE = 8192;

  /** Segments will be shared when doing so avoids {@code arraycopy()} of this many bytes. */
  static final int SHARE_MINIMUM = 1024;

//segment中保存数据的数组
  final byte[] data;

  /** The next byte of application data byte to read in this segment. */
  int pos;

  /** The first byte of available data ready to be written to. */
  int limit;

  /** True if other segments or byte strings use the same byte array. */
  boolean shared;

  /** True if this segment owns the byte array and can append to it, extending {@code limit}. */
  boolean owner;

  /** Next segment in a linked or circularly-linked list. */
  Segment next;

  /** Previous segment in a circularly-linked list. */
  Segment prev;

  Segment() {
    this.data = new byte[SIZE];
    this.owner = true;
    this.shared = false;
  }

  Segment(Segment shareFrom) {
    this(shareFrom.data, shareFrom.pos, shareFrom.limit);
    shareFrom.shared = true;
  }

  //创建一个Segment
  Segment(byte[] data, int pos, int limit) {
    this.data = data;
    this.pos = pos;
    this.limit = limit;
    this.owner = false;
    this.shared = true;
  }

    //从链表中移除一个segment
  /**
   * Removes this segment of a circularly-linked list and returns its successor.
   * Returns null if the list is now empty.
   */
  public Segment pop() {
    Segment result = next != this ? next : null;
    prev.next = next;
    next.prev = prev;
    next = null;
    prev = null;
    return result;
  }

//从链表中添加一个segment
  /**
   * Appends {@code segment} after this segment in the circularly-linked list.
   * Returns the pushed segment.
   */
  public Segment push(Segment segment) {
    segment.prev = this;
    segment.next = next;
    next.prev = segment;
    next = segment;
    return segment;
  }


//下面这些方法主要是在Segment内部做一些存储的优化用的
  /**
   * Splits this head of a circularly-linked list into two segments. The first
   * segment contains the data in {@code [pos..pos+byteCount)}. The second
   * segment contains the data in {@code [pos+byteCount..limit)}. This can be
   * useful when moving partial segments from one buffer to another.
   *
   * <p>Returns the new head of the circularly-linked list.
   */
  public Segment split(int byteCount) {
    ...
    ...
    ...
  }

  /**
   * Call this when the tail and its predecessor may both be less than half
   * full. This will copy data so that segments can be recycled.
   */
  public void compact() {
    ...
    ...
    ...
  }

  /** Moves {@code byteCount} bytes from this segment to {@code sink}. */
  public void writeTo(Segment sink, int byteCount) {
   ...
   ...
   ...
  }
}

其实整体来看,Segment的结构还是非常简单的。SegmentPool我们也可以顺手看了,因为也很简单

final class SegmentPool {
  /** The maximum number of bytes to pool. */
  // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
  static final long MAX_SIZE = 64 * 1024; // 64 KiB.

  /** Singly-linked list of segments. */
  static Segment next;

  /** Total bytes in this pool. */
  static long byteCount;

  private SegmentPool() {
  }
//获取一个闲置的Segment
  static Segment take() {
    synchronized (SegmentPool.class) {
      if (next != null) {
        Segment result = next;
        next = result.next;
        result.next = null;
        byteCount -= Segment.SIZE;
        return result;
      }
    }
    return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
  }
    //回收一个闲置的Segment
  static void recycle(Segment segment) {
    if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
    if (segment.shared) return; // This segment cannot be recycled.
    synchronized (SegmentPool.class) {
      if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
      byteCount += Segment.SIZE;
      segment.next = next;
      segment.pos = segment.limit = 0;
      next = segment;
    }
  }
}

SegmentPool是通过一个单向的链表结构构成的池,你问我为啥他不用双向链表?因为没必要,Segment池中所有闲置的对象都是一样的,只要保证每次能从其中获取到一个对象即可,因此不必用双向链表结构来实现。

那么Segment中使用双向链表的结构来构造节点是为什么呢?那是因为使用双向链表结构的话,数据的复制和转移,以及Segment内部做相关的优化都十分方便和高效。

好了,我们现在可以理一理了,在RealBufferedSink这个实现类中,数据从以各种形式写入到其Buffer里,而Buffer通过Segment和SegmentPool来管理这些缓存的数据,目前为止,数据还没有真正写入到文件中,只是保存在缓存里,

那么数据真正写入文件是在什么时候呢?

答案是在Close()方法中,我们可以看看RealBufferedSink这个类的close()方法

  //每次写入完,我们会调用close()方法,最终都会调用到这里
  @Override public void close() throws IOException {
    //如果已经关闭,则直接返回
    if (closed) return;

    // Emit buffered data to the underlying sink. If this fails, we still need
    // to close the sink; otherwise we risk leaking resources.
    Throwable thrown = null;
    try {
    //只要buffer中有数据,就一次性写入
      if (buffer.size > 0) {
        sink.write(buffer, buffer.size);
      }
    } catch (Throwable e) {
      thrown = e;
    }

    try {
      sink.close();
    } catch (Throwable e) {
      if (thrown == null) thrown = e;
    }
    closed = true;

    if (thrown != null) Util.sneakyRethrow(thrown);
  }

sink.write(buffer, buffer.size);这个方法才是真正的写入数据到文件,这个sink只是一个接口,那么它的实现类在哪里呢?

我们在回看最开头关于写入数据的示例代码:

        String fileName="test.txt";
        String path= Environment.getExternalStorageDirectory().getPath();
        File file=null;
        BufferedSink bufferSink=null;
        try{
            file=new File(path,fileName);
            if (!file.exists()){
                file.createNewFile();
            }
            //这是非常关键的一步,Okio.sink(file)就是创建Sink的实现类
            bufferSink=Okio.buffer(Okio.sink(file));

            bufferSink.writeString("this is some thing import \n", Charset.forName("utf-8"));
            bufferSink.writeString("this is also some thing import \n", Charset.forName("utf-8"));
            bufferSink.close();

        }catch(Exception e){

        }

我们在进入OKio类中看看这个sink(file)方法:

    //会往下调用
  /** Returns a sink that writes to {@code file}. */
  public static Sink sink(File file) throws FileNotFoundException {
    if (file == null) throw new IllegalArgumentException("file == null");
    //构建一个输出流
    return sink(new FileOutputStream(file));
  }

  //会往下调用
    /** Returns a sink that writes to {@code out}. */
  public static Sink sink(OutputStream out) {
    return sink(out, new Timeout());
  }

  //在这里创建一个sink的实现类
   private static Sink sink(final OutputStream out, final Timeout timeout) {
    if (out == null) throw new IllegalArgumentException("out == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Sink() {
      @Override public void write(Buffer source, long byteCount) throws IOException {
        checkOffsetAndCount(source.size, 0, byteCount);
        while (byteCount > 0) {
          timeout.throwIfReached();
          Segment head = source.head;
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          //最后使用的依然是Java 原生的api来实现数据的真正写入
          out.write(head.data, head.pos, toCopy);

          head.pos += toCopy;
          byteCount -= toCopy;
          source.size -= toCopy;

          if (head.pos == head.limit) {
            source.head = head.pop();
            SegmentPool.recycle(head);
          }
        }
      }

      @Override public void flush() throws IOException {
        out.flush();
      }

      @Override public void close() throws IOException {
        out.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "sink(" + out + ")";
      }
    };
  }

好了,关于数据写入,整个来龙去脉我们基本上都讲完了。

读取的过程以此类推,先读入缓存区,在从缓存区中读,没有太大的区别。

总结

我们可以再回顾一下:

  • 通过外部传入File,Socket,或者OutputStream类型来构建一个输入流
  • OKio内部创建一个缓存区,并返回一个BufferSink
  • 通过这个BufferSink来实现写入各种数据,实际上都存入了缓存区
  • 最终调用close()方法,一次定把缓存区的数据写入到文件中

虽然它内部对于数据类型的转换,数据缓存的优化我并没有提到,但是也无伤大雅,因为只要你;理解了它的缓冲区的设计,那么这个IO框架的优点和高效的地方就一目了然了;当然,OKio也号称能高效的使用NIO来进行读写,不过客户端基本上用不上这样的功能,所以也不做考究。

我们再回过头来看OKio的框架,就可以明白一些事情,为什么他可以这么简单的实现多种数据类型的读写?原因就在于它实现了一个缓存区,整个IO是基于缓存的。我们的操作都是针对缓存区的,所以可以非常灵活的实现多种数据类型的读写。而我们也看到,最终数据还是通过字节流写入到了文件。

我不知道你从中是否看到了什么关于封装的一些东西,不过我倒是有几分感触想分享给大家。

封装并不应该仅仅局限于把几步重复的代码放在一个方法里然后统一调用,更多的时候,我们应该思考原来框架的缺陷,以解决这些缺陷为目的进行封装,如果在原来的架构上难以解决,则应该在适当的时候往前跨一步,跳出原来框架的局限。

有的时候,一次成功的封装,相当于一次完美的重构。


后记

去年年末没有凑热闹发《年度总结》,所以没机会祝大家新年快乐,在这里祝大家新的一年里工作顺利
!!


勘误

暂无