关于node中的流,只想说让上天知道我不认输!

2,113 阅读17分钟

大ga好,刚写完EventEmitter的我此时十分的兴奋,甚至想来上一篇新的文章!说写就写,让上天知道我不认输!

我们先来介绍介绍今天的主角,流。

流(Stream)

流(stream)是一种在 Node.js 中处理流式数据的抽象接口。 stream 模块提供了一些基础的 API,用于构建实现了流接口的对象。Node.js 提供了多种流对象。 例如,发送到 HTTP 服务器的请求和 process.stdout 都是流的实例。流可以是可读的、可写的、或是可读写的。 所有的流都是 EventEmitter 的实例。

上面是nodejs官网中给出的关于流的一些定义,我们可以从上面得出几个概念

  • 流是一个抽象接口,其实就是一个可继承的类
  • 流是可读的、可写的、或是可读写的(严格上来说分为,读流,写流,双工流,转换流)
  • 并且流是EventEmitter的实例,也就是说流具有发布订阅模式的特点

那么流到底是什么呢,我们先来通过fs中的creatReadStream和createWriteStream来了解一下。

fs.createReadStream

我们通过名字就能知道这个方法是一个继承了可读流的方法,这个方法和我们的fs.read其实作用一样都是用来读取文件的,它的底层也就是利用了fs.read实现的。我们先来看看如何使用

let fs = require('fs');
// 这个方法接受两个参数 一个是读取文件的路径,一个是我们的一些可选参数
let rs = fs.createReadStream('1.txt', {
    flags: 'r', // 读取时候的标注 r表示读取 默认是r
    encoding: 'utf8',// 以什么格式读取 默认是null 那么输出的就是buffer
    mode: 0o666,// 操作权限 一般无需改动 默认0o666
    autoClose: true,// 读取完成之后是否自动关闭文件 默认true
    start: 0,// 从文件的哪个位置开始读取 默认是0
    end: null,// 读取到文件的哪个位置结束 默认是null 全部读完
    highWaterMark: 3 // 一次性读取的水位线 也就是一次最多读多少 默认是64*1024个字节
})
// 可读流继承EventEmitter 因此这个方法就通过on来进行一些操作的监听

// 监听文件打开 open事件会在fs.createReadStream调用的时候就被触发
rs.on('open', function () {
    console.log('open')
});
// 监听data事件,一旦监听了data事件,就会不停进行文件读取 一次读取highWaterMark大小 每次读完都把数据通过data事件进行发射
rs.on('data', function (data) {
    console.log(data);
});
// 监听end事件, 读取完毕之后会触发end事件
rs.on('end', function () {
    console.log('end')
});
// 监听close事件 也就是当文件关闭时候会触发
rs.on('close', function () {
    console.log('close')
});
// 监听error事件,操作过程中只要出现错误就是触发这个事件并且把错误发射出来
rs.on('error', function () {
    console.log('error')
});

我的1.txt文件中是987654321,那么我们执行上面的代码就会输出

假如我们把encoding改成null,会发现输出结果变成了buffer
再假个如,我们把rs.on('data')这个监听事件注释掉,我们可以发现代码输入就变成了只有一个open

ok,我们做了这么多的测试,当然又到了总结的时候了

  • 文件流中有open事件,这个事件只要是fs.createReadStream执行了就会触发
  • 文件流中所有的错误都会把错误发射到error事件中
  • 文件流中只要有监听了data事件,那么文件水流的阀门就相当于被打开了,数据就会不停的被读取,直到读取完毕,读取的时候会根据encoding返回读取的内容,如果encoding为null那么就会返回buffer,
  • 文件流读取完毕后会先触发end事件接着 如果设置了autoClose为true那么就会自动关闭文件就会触发close事件
  • hightWaterMark属性决定了阀门的大小也就是一次性读取多少字节的数据
  • start和end决定了从文件的哪个部分读取

另外 我们的可读流还又pause()和resume()两个方法,pause()是暂停读取,resume()是恢复读取~ 惯例,我们根据特点依然自己来实现一个我们的文件可读流

let fs = require('fs');
let EventEmitter = require('events');
// 文件可读流继承了EventEmitter
class MyReadStream extends EventEmitter{
    /**
     * 构造函数
     * @param {String} path 读取的文件路径
     * @param {object} options 可选参数
     */
    constructor(path,options={}){
        super();
        this.path = path;

        this.flags = options.flags || 'r'; // 文件的读取标志 默认r
        this.encoding = options.encoding || null;// 按照什么编码读取 默认是null也就是buffer
        this.fd = options.fd || ''; // 文件描述符 一般不指定 读取文件的时候会拿到这个值再把结果赋给this.fd 如果文件被打开了 fd一定是一个数字类型
        this.mode = options.mode || 0o666;// 文件的权限 一般都是0o666
        this.autoClose = options.autoClose || true;// 文件是否自动关闭 默认true
        this.start = options.start || 0;// 从文件的哪个位置开始读取 默认是0
        this.end = options.end || null;// 读到文件的哪个位置结束 默认是null 需要注意这里是包前也包后的 也就是说 如果start=0 end=3 就是读取0 1 2 3 共四个位置
        this.highWaterMark = options.highWaterMark || 64*1024;// 一次性读取多少字节数据 默认64*1024

        this.flowing = false;//是否是流动状态
        this.pos = this.start;//记录读取时候的位置

        // 初始化的时候就会触发一次open open的回调我们会拿到文件的描述符也就是fd 不过open 是一个异步方法
        this.open();
        // 当用户监听了data事件后 我们就自动的源源不断的进行文件数据读取 也就是打开了阀门 
        // 因此在这里我们需要去利用eventEmitter中的newListener事件,判断如果当前监听了data 那么我们就执行read方法
        this.on('newListener',(type)=>{
            if(type === 'data'){
                this.flowing = true;
                this.read();
            }
        })
    }
    /**
     * 关闭文件方法
     * @description 关闭文件的时候可能文件被打开了 那么就要调用close方法 如果文件还没有打开就报错了 那么就直接发射close事件
     */
    destroy(){
        if(typeof this.fd === 'number'){
            fs.close(this.fd,()=>{
                this.emit('close')
            })
        }else{
            this.emit('close')
        }
    }
    /**
     * 打开文件
     */
    open(){
        fs.open(this.path,this.flags,(err,fd)=>{
            // 打开文件如果出错了 就把错误发射出去 并且如果设置了自动关闭就调用destroy方法
            if(err){
                this.emit('error',err)
                if(this.autoClose){
                    this.destroy()
                }
                return
            }
            // 如果没有错误 那我们就拿到了fd并且把open事件发射 注意 这个过程是异步的
            this.fd = fd;
            this.emit('open');
        })
    }
    /**
     * 读取文件
     * @description 当用户监听了data就触发
     */
    read(){
        // 因为open是异步的方法 所以当read的时候可能还没有拿到fd 因此这个需要做一次处理
        if(typeof this.fd !== 'number'){
            // 当发现fd还没有读到的时候 我们监听open事件,因为我们当文件读取完毕的时候会触发一次open事件 所以我们在这里监听了open,那么触发的时候也会把我们这个监听器函数执行 为了重复执行 我们用once 保证只执行一次 那么就确保下次进来的时候一定是拿到fd的
            this.once('open',()=>{this.read()})
        }else{
            // 声明一个highWaterMark长度的buffer 用来存储读取到的数据
            let buffer = Buffer.alloc(this.highWaterMark)
            // 我们知道我们传入的参数中可能指定start和end 当指定end的时候 我们就需要判断我们读到那里结束 因此我们需要处理一次每次读取的长度 如果没有指定end 那么每次都读取highWaterMark长度就行了 指定了  我们就需要判断现在读取的位置距离end还有多少 如果小于highWaterMark那么就不能读highWaterMark长度了 如果大于 那么就继续读取highWaterMark长度
            let howMuchToRead = this.end? Math.min(this.highWaterMark,this.end-this.pos+1):this.highWaterMark;
            // this.pos 指定了从文件的那个位置读取
            fs.read(this.fd,buffer,0,howMuchToRead,this.pos,(err,bytesRead)=>{
                // 发生了错误就发射错误 并且关闭文件
                if(err){
                    this.emit('error',err)
                    if(this.autoClose){
                        this.destroy()
                    }
                    return
                }
                // bytesRead是每次实际读取的个数 大于0 说明读到了 等于0 说明读完了
                if(bytesRead>0){
                    // 读取了之后 移动pos
                    this.pos += bytesRead;
                    // 取出buffer有效长度
                    let r =buffer.slice(0,bytesRead);
                    // 如果有编码就返回编码后的值
                    r= this.encoding?r.toString(this.encoding):r;
                    this.emit('data',r);
                    if(this.flowing){
                        // 如果当前是流动状态就继续执行 不停读取
                        this.read();
                    }
                }else{
                    // 读取完毕 发射end事件 然后如果指定autoClose true就关闭文件
                    this.emit('end')
                    if(this.autoClose){
                        this.destroy();
                      }
                }
            })

        }
    }
    /**
     * 暂停
     */
    pause(){
        // 修改流动状态为flase 阻止不停读取
        this.flowing = false;
    }
    /**
     * 恢复
     */
    resume(){
        //修改流动状态为true 并且手动调用一次read 恢复读取
        this.flowing = true;
        this.read()
    }
}

module.exports = MyReadStream

这样我们就手动实现了一个文件可读流~

fs.createWriteStream

同样,顾名思义,这个方法就是文件可写流,继承了写流,底层呢是利用fs.write来实现的,那么我们也来看一下这个用法

let fs = require('fs');
let ws = fs.createWriteStream('2.txt',{
    flags:'w',// 写入标识  默认是w
    encoding:'utf8',// 写入的编码 默认是utf8
    mode:0o666,// 写入的权限 默认是0o666
    autoClose:true,// 是否自动关闭 默认是true
    start:0,// 从哪个位置开始写入
    highWaterMark:3 // 水位线 这里和读流中的highWatermark不同 这里是指期望读多少
})
// 写流第一次 会直接写入文件 接下来都会存入到内存空间 内存空间的大小就是hightwatermark大 当内存空间清空之前满了 并且全部清空了之后 会emit('drain') 接着 重复这个步骤
let flag = ws.write('1','utf8',()=>{})
console.log(flag)
flag = ws.write('1','utf8',()=>{})
console.log(flag)
flag = ws.write('1','utf8',()=>{})
console.log(flag)
setTimeout(()=>{
    flag = ws.write('1','utf8',()=>{})
    console.log(flag)
    flag = ws.write('1','utf8',()=>{})
    console.log(flag)
    flag = ws.write('1','utf8',()=>{})
    console.log(flag)
},2000)
ws.on('drain',()=>{
    console.log('drain')
})

可能大家听起来有点晕,我们先看一下上面代码的输入结果

首先我们会发现,我们调用了write方法后会立即给我们一个布尔类型的返回值,这个返回值的含义就是当前正在写入以及待写入数据长度的和 如果 小于 我们设置的highWaterMark那么就会返回true,如果大于或者等于就会返回fasle,并且当正在写入的以及待写入的数据大于或者等于highWaterMark后,如果这些数据全部写完了,也就是说待写入为空了,这时候会触发一次drain事件,表示 内存区空了。

举一个例子,我们有一个房间,容纳最好小于3个人,进来一个人,我们告知外界true,表示还能再进来,再进来一个人,现在两个人告知外界 还是true,再来一个人,现在已经是3个人了,已经不是最好的状态了 就告诉外界false,接下来无论再来多少人 ,这个房间的人数只要大于或者等于3就返回外界false,如果房间里只有一个通道可以出去,每次只允许一个人出去,那么如果大家排着队离开房间,离开到房间没人了,再进来一个人的话,就告诉外界true,也就是说只要是留在房间里的 以及 还没有离开房间的人数小于3那么就返回true,并且如果所有人都离开了房间,这时候会触发drain事件,告诉外界 房子没人了。

同样,我们来实现一个可写流

let fs = require('fs');
let EventEmitter = require('events');
// 可写流也是继承EventEmitter
class MyWriteStream extends EventEmitter {
    /**
     * 构造函数
     * @param {String} path 读取的文件路径
     * @param {object} options 可选参数
     */
    constructor(path,options={}){
        super();
        this.path = path;

        this.flags = options.flags || 'w';// 文件的读取标志 默认w
        this.encoding = options.encoding || 'utf8';// 按照什么编码写入 默认是utf8
        this.fd = options.fd || '';// 文件描述符 一般不指定 打开文件的时候会拿到这个值再把结果赋给this.fd 如果文件被打开了 fd一定是一个数字类型
        this.mode = options.mode || 0o666;// 文件的权限 一般都是0o666
        this.autoClose = options.autoClose || true;// 文件是否自动关闭 默认true
        this.start = options.start || 0;// 从文件的哪个位置开始读取 默认是0
        this.highWaterMark = options.highWaterMark || 16*1024;// 期望写入的数据 默认16*1024字节

        this.cache = [];//缓存数组
        this.pos = this.start;//写入的位置
        this.needDrain = false;//默认下不触发drain事件 只有待写入的大于highWaterMark并且缓存清空了才会触发
        this.writing = false;//是否正在写入 如果是的话 其他的就放到缓存中待写入
        this.len = 0;// 当前正在写入和待写入的长度 用来和highWaterMark比较
        this.open();// 先打开文件
    }
    /**
     * 关闭文件方法
     * @description 关闭文件的时候可能文件被打开了 那么就要调用close方法 如果文件还没有打开就报错了 那么就直接发射close事件
     */
    destory(){
        if(typeof this.fd === 'number'){
            fs.close(this.fd,()=>{
                this.emit('close');
            })
        }else{
            this.emit('close')
        }
    }
    /**
     * 打开文件
     */
    open(){
        fs.open(this.path,this.flags,(err,fd)=>{
            // 打开文件如果出错了 就把错误发射出去 并且如果设置了自动关闭就调用destroy方法
            if(err){
                this.emit('error',err);
                if(this.autoClose){
                    this.destory();
                }
            }else{
                // 如果没有错误 那我们就拿到了fd并且把open事件发射 注意 这个过程是异步的
                this.fd = fd;
                this.emit('open')
            }
        })
    }
    /**
     * 外部调用的写入函数
     * @param {any} chunk 写入的数据
     * @param {string} encoding 编码
     * @param {function} callback 回调函数
     */
    write(chunk,encoding="utf8", callback=()=>{}){
        // 首先判断写入的数据是不是buffer 不是buffer就转换成buffer 
        chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk);
        // 将写入的数据加到待写入的长度中
        this.len += chunk.length;
        // 判断现在待写入的长度是不是大于等于我们设置的highWaterMark 如果是 说明我们如果全部写完了就该发射drain
        if(this.len >= this.highWaterMark){
            // 达到水位线
            this.needDrain = true
        }
        // 如果当前没有正在写入的 那么就可以直接把该数据写入到文件中 如果有的话就先缓存下来
        if(!this.writing){
            // 将写入状态改成true  表示有正在写入的数据
            this.writing = true;
            //调用真正写入的方法 并且包装一下回调函数 如果写完了就该继续去缓存中查找 把缓存里的数据写入
            this._write(chunk,encoding,()=>{
                // 当一个写入完了之后 调用回调 然后取缓存中的继续写
                callback();
                // 清空缓存
                this.clearCache();
            })
        }else{
            // 放入缓存
            this.cache.push({
                chunk,encoding,callback
            })
        }
        // 返回当前待写入的长度 是不是小于我们设置的水位线
        return this.len < this.highWaterMark
    }
    /**
     * 清空缓存中的数据
     */
    clearCache(){
        // 取出缓存中的第一项 如果第一项有值说明缓存中有数据 没有值 说明 现在缓存空了 判读现在如果满足触发drain事件的条件就触发
        let data = this.cache.shift();
        if(data){
            // 同样 调用写入方法
            this._write(data.chunk,data.encoding,()=>{
                data.callback();
                this.clearCache();
            })
        }else{
            // 如果满足 就emit drain事件 并且将状态重置
            if(this.needDrain){
                this.writing = false;
                this.needDrain = false;
                this.emit('drain')
            }
        }
    }
    /**
     * 内部调用的写入函数 真正的写入
     * @param {any} chunk 写入的数据
     * @param {string} encoding 编码
     * @param {function} callback 回调函数
     */
    _write(chunk,encoding,callback){
        // 因为open是异步的方法 所以当_write的时候可能还没有拿到fd 因此这个需要做一次处理
        if(typeof this.fd !== 'number'){
            // 当发现fd还没有读到的时候 我们监听open事件,因为我们当文件读取完毕的时候会触发一次open事件 所以我们在这里监听了open,那么触发的时候也会把我们这个监听器函数执行 为了重复执行 我们用once 保证只执行一次 那么就确保下次进来的时候一定是拿到fd的
            return this.once('open',()=>this._write(chunk,encoding,callback))
        }
        // this.pos 写入的位置
        fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,writeBytes)=>{
            // 发生了错误就发射错误 并且关闭文件
            if(err){
                this.emit('error',err);
                if(this.autoClose){
                    this.destory()
                }
                return
            }
            // 将位置移动当前写入数据长度
            this.pos += writeBytes;
            // 在待写入的长度中把 真正写入数据的长度 减去
            this.len -= writeBytes;
            // 执行回调
            callback();
        })
    }
}
module.exports = MyWriteStream;

好了,现在我们既实现了fs中的文件可读流也实现了可写流,那么我们如何利用可读流和可写流来实现一个文件的拷贝功能呢?思考思考~~~~

pipe

我们先看一下fs中pipe的用法

let fs = require('fs');
let from = fs.createReadStream('./1.txt');
let to = fs.createWriteStream('./2.txt');
from.pipe(to);

竟竟竟竟竟竟竟竟竟竟然如此简单????

难以置信的我决定也实现一下这个方法!

首先我们可以看出来我们是在一个可读流上有一个pipe的方法,这个方法接受一个可写流作为参数,那么我们就在我们刚刚写的可写流类中添加这个方法

class MyReadStream extends EventEmitter{
    ..... 
    /**
     * 管道方法
     * @param {*} ws 可写流实例
     */
    pipe(ws){
        // this 是一个可读流实例 那么他监听了data之后就会打开阀门 不停的读取数据 我们将读取的数据写入到文件中
        this.on('data',function (data) {
            // 利用可写流返回的flag去判断当前是不是待写入的已经超过了我们设置的水位线 如果是的那我们就调用pause暂停读取
            let flag = ws.write(data);
            if(!flag){
              this.pause();
            }
          });
        //   当待写入数据全部清空触发drain事件 这时候通知可读流继续读取 
        ws.on('drain', () => {
            this.resume();
        });
    }
}

搜嘎,原来我们实现了之后这么简单,都是利用我们已经写好了api就可以实现了,舒服~~

Stream

我们上面所实现的文件可读流和可写流其实都是读流写流的子集,都是基于node中的stream模块实现的。

Readable

Readable接口就是用来实现可读流的类,我们可以通过这个类来实现一个自己的可读流,并且这个可读流内部有_read方法用来读取数据,并且可以通过内部的this.push方法触发on('data')来把数据输入发射出去,默认fs.createReadStream自己实现了一个 _read 调用的fs.read

// 所有的流都基于这个模块
let {Readable} = require('stream');
class MyStream extends Readable{
  constructor(){
    super();
  }
  _read() { // 如果想自己实现可读流 需要继承Readable,并且重写_read方法,方法中调用push 就是把结果传递给on('data')事件
    this.push('发送数据')
    this.push(null);// push null就是告诉流现在没数据了全部读取完毕 可以发射end事件了,如果没有这个方法 就会陷入无限循环当中
  }
}
let myStream = new MyStream();
myStream.on('data',(data)=>{
  console.log(data.toString());
});
myStream.on('end',function () {
  console.log('end');
})

Writable

同样,所有的可写流都是继承了Writable这个类,通过这个类来实现的,如果使用这个类,就要重写_write方法,去实现数据的写入,需要注意的是在_write中执行完操作后必须要执行一次callback,不然之后的数据就不会再写入了,原理就是我们自己手动实现的文件可写流中的callback是被我们包裹了一层()=>{callback();this.clearCache()},可以看到我们其实在实现这个callback的时候还加入了一个清空缓存的方法,因此在这里我们调用callback就是相当于要执行清空缓存,继续把缓存中的输入写入。

let {Writable} = require('stream');
class MyStream extends Writable{
  constructor(){
    super();
  }
  _write(chunk,encoding,callback){ 
    console.log(chunk,encoding);
    // 可以借助可写流实现自己写入的逻辑
    callback(); // 不调用callback 后面的write就不会执行
  }
}
let myStream = new MyStream();
myStream.write('1','utf8');
myStream.write('1', 'utf8');

Duplex

Duplex双工流,我们上面实现了可读流是通过重写_read方法,实现可写流是通过重写_write方法,那么我们实现双工流其实就是即重写_read方法同时也重写_write方法。

let {Duplex} = require('stream');
let fs = require('fs');
class MyStream extends Duplex{
  constructor(){
    super();
  }
  _read(){
    this.push('1');
    this.push(null)
  }
  _write(chunk,encoding,callback){
    console.log(chunk)
    callback();
  }
}
let myStream = new MyStream
myStream.write('ok');
myStream.on('data',(data)=>{
  console.log(data.toString())
})

Transform

转化流就是在可读流和可写流之间 做转化操作,常见就是拿到数据后在我们的转换流中做一定的操作后再输出出去。 实现一个转化流就是重写_transform方法,这个方法类似于_write,但是我们可以在这个方法里进行数据操作然后再进行可写流的this.push操作,最后还要执行callback()

let {Transform} = require('stream');
class MyStream extends Transform{
  _transform(chunk,encoding,callback){
  // 拿到可写流数据后 将数据转换成大写然后push出去
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}
let myStream = new MyStream();
// 监听用户的输入 然后把这个可读流通过我们的转化流把用户的输入转化为大写字母再输出出去
process.stdin.pipe(myStream).pipe(process.stdout);

这就是我们node中关于流的一些用法以及实现,这些都是在node中觉得比较有意思的地方,所以呢就分享出来和大家一起学习一下~~(此处留下了被流折磨的眼泪......)