我一直都想自己对Java IO的API做一个彻底的封装,和原生IO接口来个了断,结果一直因为各种原因没去做。在了解了OKio之后,就更加没有动力去封装原生接口了。
下面是Java IO输入部分的架构
需要说明的是,以上并不是Java IO框架的全部,只是例举一些大家可能有印象的类,并且省去了很多继承自这些类的的子类。看一看上面的结构图,你就知道什么叫复杂了。观察上图,我们至少可以吐槽以下几点:
- IO接口的实现类太多
- 每个类基本对应一种IO需求,导致它的体系十分庞大
当然,Java中出现这种庞大的IO体系是有它的历史原因的,这是使用装饰者模式来构建和拓展的Java IO体系的必然结果。因此我们也不必过分苛责。
俗话说得好, 文字定义终觉浅,绝知此事要上图
那么,在OKio 的帮助下,完成一次读写操作又是怎样的呢?
// 写入数据 String fileName="test.txt"; String path= Environment.getExternalStorageDirectory().getPath(); File file=null; BufferedSink bufferSink=null; try{ file=new File(path,fileName); if (!file.exists()){ file.createNewFile(); } bufferSink=Okio.buffer(Okio.sink(file)); bufferSink.writeString("this is some thing import \n", Charset.forName("utf-8")); bufferSink.writeString("this is also some thing import \n", Charset.forName("utf-8")); bufferSink.close(); }catch(Exception e){ }//读取数据 try { BufferedSource bufferedSource=Okio.buffer(Okio.source(file)); String str=bufferedSource.readByteString().string(Charset.forName("utf-8")); Log.e("TAG","--->"+str); } catch (Exception e) { e.printStackTrace(); }复制代码
//一行一行的读出数据 try { BufferedSource bufferedSource=Okio.buffer(Okio.source(file)); Log.e("TAG-string","--->"+bufferedSource.readUtf8Line()); Log.e("TAG-string","--->"+bufferedSource.readUtf8Line()); Log.e("TAG-string","--->"+bufferedSource.readUtf8Line()); bufferedSource.close(); } catch (Exception e) { e.printStackTrace(); }复制代码
再比如,你可以直接读写Java数据类型等等,可以说,OKio非常优雅的满足了Java IO的绝大部分需求。却有没有Java原生IO的繁琐。
public interface Sink extends Closeable, Flushable { //通过缓冲区写入数据 void write(Buffer source, long byteCount) throws IOException;//刷新 (缓冲区) @Override void flush() throws IOException;//超时机制 Timeout timeout();//关闭写操作 @Override void close() throws IOException;}复制代码
public interface BufferedSink extends Sink { Buffer buffer(); BufferedSink write(ByteString byteString) throws IOException; BufferedSink write(byte[] source) throws IOException; BufferedSink write(byte[] source, int offset, int byteCount) throws IOException; long writeAll(Source source) throws IOException; BufferedSink write(Source source, long byteCount) throws IOException; BufferedSink writeUtf8(String string) throws IOException; BufferedSink writeUtf8(String string, int beginIndex, int endIndex) throws IOException; BufferedSink writeUtf8CodePoint(int codePoint) throws IOException; BufferedSink writeString(String string, Charset charset) throws IOException; BufferedSink writeString(String string, int beginIndex, int endIndex, Charset charset) throws IOException; BufferedSink writeByte(int b) throws IOException; BufferedSink writeShort(int s) throws IOException; BufferedSink writeShortLe(int s) throws IOException; BufferedSink writeInt(int i) throws IOException; BufferedSink writeIntLe(int i) throws IOException; BufferedSink writeLong(long v) throws IOException; BufferedSink writeLongLe(long v) throws IOException; BufferedSink writeDecimalLong(long v) throws IOException; BufferedSink writeHexadecimalUnsignedLong(long v) throws IOException; BufferedSink emitCompleteSegments() throws IOException; BufferedSink emit() throws IOException; OutputStream outputStream();}复制代码
final class RealBufferedSink implements BufferedSink {//实例化一个缓冲区,用于保存需要写入的数据。 public final Buffer buffer = new Buffer(); public final Sink sink; boolean closed; RealBufferedSink(Sink sink) { if (sink == null) throw new NullPointerException("sink == null"); this.sink = sink; } @Override public Buffer buffer() { return buffer; } //通过缓冲区把ByteString类型的数据写入 @Override public BufferedSink write(ByteString byteString) throws IOException { if (closed) throw new IllegalStateException("closed"); buffer.write(byteString); //完成写入 return emitCompleteSegments(); }//通过缓冲区把String类型的数据写入 @Override public BufferedSink writeString(String string, Charset charset) throws IOException { if (closed) throw new IllegalStateException("closed"); buffer.writeString(string, charset); return emitCompleteSegments(); }......//通过缓冲区把byte数组中的数据写入 @Override public BufferedSink write(byte[] source, int offset, int byteCount) throws IOException { if (closed) throw new IllegalStateException("closed"); buffer.write(source, offset, byteCount); //完成写入 return emitCompleteSegments(); }//完成写入 @Override public BufferedSink emitCompleteSegments() throws IOException { if (closed) throw new IllegalStateException("closed"); long byteCount = buffer.completeSegmentByteCount(); if (byteCount > 0) sink.write(buffer, byteCount); return this; }.........}复制代码
public final class Buffer implements BufferedSource, BufferedSink, Cloneable { private static final byte[] DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }; static final int REPLACEMENT_CHARACTER = '\ufffd'; Segment head; long size; public Buffer() { } /** Returns the number of bytes currently in this buffer. */ public long size() { return size; } //写入String类型的数据@Override public Buffer writeString(String string, Charset charset) { //调用下面的方法 return writeString(string, 0, string.length(), charset); }//准备写入String数据 @Override public Buffer writeString(String string, int beginIndex, int endIndex, Charset charset) { if (string == null) throw new IllegalArgumentException("string == null"); if (beginIndex < 0) throw new IllegalAccessError("beginIndex < 0: " + beginIndex); if (endIndex < beginIndex) { throw new IllegalArgumentException("endIndex < beginIndex: " + endIndex + " < " + beginIndex); } if (endIndex > string.length()) { throw new IllegalArgumentException( "endIndex > string.length: " + endIndex + " > " + string.length()); } if (charset == null) throw new IllegalArgumentException("charset == null"); //假如是utf-8编码的数据,则调用writeUtf8() if (charset.equals(Util.UTF_8)) return writeUtf8(string, beginIndex, endIndex); //否则,将String转化为byte类型的数据 byte[] data = string.substring(beginIndex, endIndex).getBytes(charset); //然后执行write(),写入byte数组 return write(data, 0, data.length); } //offset:写入数据的数组下标起点, //byteCount :写入数据的长度 @Override public Buffer write(byte[] source, int offset, int byteCount) { if (source == null) throw new IllegalArgumentException("source == null"); //做一些检查工作 checkOffsetAndCount(source.length, offset, byteCount); int limit = offset + byteCount; //开始循环写入数据 while (offset < limit) { //Segment??黑人问号脸?? //我们不妨把Segment先看成一种类似数组结构的容器 //这个方法就是获取一个数据容器 Segment tail = writableSegment(1); // limit - offset是代写入的数据的长度 // Segment.SIZE - tail.limit是这个容器剩余空间的长度 int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit); //调用Java方法把数据复制到容器中。 System.arraycopy(source, offset, tail.data, tail.limit, toCopy); //记录相关偏移量 offset += toCopy; tail.limit += toCopy; } //增加buffer的size size += byteCount; return this; } //获取一个SegmentSegment writableSegment(int minimumCapacity) { if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException(); if (head == null) { 假如当前Segment为空,则从Segment池中拿到一个 head = SegmentPool.take(); // Acquire a first segment. return head.next = head.prev = head; } //获取当前Segment的前一个Segment //看来这是一个链表结构没跑了 Segment tail = head.prev; //检查这个Segment容器是否有剩余空间可供写入 if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) { //假如没有,则拿一个新的的Segment来代替这个(即链表的下一个) tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up. } return tail; } }复制代码
final class Segment { /** The size of all segments in bytes. */ static final int SIZE = 8192; /** Segments will be shared when doing so avoids {@code arraycopy()} of this many bytes. */ static final int SHARE_MINIMUM = 1024;//segment中保存数据的数组 final byte[] data; /** The next byte of application data byte to read in this segment. */ int pos; /** The first byte of available data ready to be written to. */ int limit; /** True if other segments or byte strings use the same byte array. */ boolean shared; /** True if this segment owns the byte array and can append to it, extending {@code limit}. */ boolean owner; /** Next segment in a linked or circularly-linked list. */ Segment next; /** Previous segment in a circularly-linked list. */ Segment prev; Segment() { this.data = new byte[SIZE]; this.owner = true; this.shared = false; } Segment(Segment shareFrom) { this(shareFrom.data, shareFrom.pos, shareFrom.limit); shareFrom.shared = true; } //创建一个Segment Segment(byte[] data, int pos, int limit) { this.data = data; this.pos = pos; this.limit = limit; this.owner = false; this.shared = true; } //从链表中移除一个segment /** * Removes this segment of a circularly-linked list and returns its successor. * Returns null if the list is now empty. */ public Segment pop() { Segment result = next != this ? next : null; prev.next = next; next.prev = prev; next = null; prev = null; return result; }//从链表中添加一个segment /** * Appends {@code segment} after this segment in the circularly-linked list. * Returns the pushed segment. */ public Segment push(Segment segment) { segment.prev = this; segment.next = next; next.prev = segment; next = segment; return segment; }//下面这些方法主要是在Segment内部做一些存储的优化用的 /** * Splits this head of a circularly-linked list into two segments. The first * segment contains the data in {@code [pos..pos+byteCount)}. The second * segment contains the data in {@code [pos+byteCount..limit)}. This can be * useful when moving partial segments from one buffer to another. * *Returns the new head of the circularly-linked list. */ public Segment split(int byteCount) { ... ... ... } /** * Call this when the tail and its predecessor may both be less than half * full. This will copy data so that segments can be recycled. */ public void compact() { ... ... ... } /** Moves {@code byteCount} bytes from this segment to {@code sink}. */ public void writeTo(Segment sink, int byteCount) { ... ... ... }}复制代码
final class SegmentPool { /** The maximum number of bytes to pool. */ // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments? static final long MAX_SIZE = 64 * 1024; // 64 KiB. /** Singly-linked list of segments. */ static Segment next; /** Total bytes in this pool. */ static long byteCount; private SegmentPool() { }//获取一个闲置的Segment static Segment take() { synchronized (SegmentPool.class) { if (next != null) { Segment result = next; next = result.next; result.next = null; byteCount -= Segment.SIZE; return result; } } return new Segment(); // Pool is empty. Don't zero-fill while holding a lock. } //回收一个闲置的Segment static void recycle(Segment segment) { if (segment.next != null || segment.prev != null) throw new IllegalArgumentException(); if (segment.shared) return; // This segment cannot be recycled. synchronized (SegmentPool.class) { if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full. byteCount += Segment.SIZE; segment.next = next; segment.pos = segment.limit = 0; next = segment; } }}复制代码
//每次写入完,我们会调用close()方法,最终都会调用到这里 @Override public void close() throws IOException { //如果已经关闭,则直接返回 if (closed) return; // Emit buffered data to the underlying sink. If this fails, we still need // to close the sink; otherwise we risk leaking resources. Throwable thrown = null; try { //只要buffer中有数据,就一次性写入 if (buffer.size > 0) { sink.write(buffer, buffer.size); } } catch (Throwable e) { thrown = e; } try { sink.close(); } catch (Throwable e) { if (thrown == null) thrown = e; } closed = true; if (thrown != null) Util.sneakyRethrow(thrown); }复制代码
sink.write(buffer, buffer.size);这个方法才是真正的写入数据到文件,这个sink只是一个接口,那么它的实现类在哪里呢?
String fileName="test.txt"; String path= Environment.getExternalStorageDirectory().getPath(); File file=null; BufferedSink bufferSink=null; try{ file=new File(path,fileName); if (!file.exists()){ file.createNewFile(); } //这是非常关键的一步,Okio.sink(file)就是创建Sink的实现类 bufferSink=Okio.buffer(Okio.sink(file)); bufferSink.writeString("this is some thing import \n", Charset.forName("utf-8")); bufferSink.writeString("this is also some thing import \n", Charset.forName("utf-8")); bufferSink.close(); }catch(Exception e){ }复制代码
//会往下调用 /** Returns a sink that writes to {@code file}. */ public static Sink sink(File file) throws FileNotFoundException { if (file == null) throw new IllegalArgumentException("file == null"); //构建一个输出流 return sink(new FileOutputStream(file)); } //会往下调用 /** Returns a sink that writes to {@code out}. */ public static Sink sink(OutputStream out) { return sink(out, new Timeout()); } //在这里创建一个sink的实现类 private static Sink sink(final OutputStream out, final Timeout timeout) { if (out == null) throw new IllegalArgumentException("out == null"); if (timeout == null) throw new IllegalArgumentException("timeout == null"); return new Sink() { @Override public void write(Buffer source, long byteCount) throws IOException { checkOffsetAndCount(source.size, 0, byteCount); while (byteCount > 0) { timeout.throwIfReached(); Segment head = source.head; int toCopy = (int) Math.min(byteCount, head.limit - head.pos); //最后使用的依然是Java 原生的api来实现数据的真正写入 out.write(head.data, head.pos, toCopy); head.pos += toCopy; byteCount -= toCopy; source.size -= toCopy; if (head.pos == head.limit) { source.head = head.pop(); SegmentPool.recycle(head); } } } @Override public void flush() throws IOException { out.flush(); } @Override public void close() throws IOException { out.close(); } @Override public Timeout timeout() { return timeout; } @Override public String toString() { return "sink(" + out + ")"; } }; }复制代码
- 通过外部传入File,Socket,或者OutputStream类型来构建一个输入流
- OKio内部创建一个缓存区,并返回一个BufferSink
- 通过这个BufferSink来实现写入各种数据,实际上都存入了缓存区
- 最终调用close()方法,一次定把缓存区的数据写入到文件中