Node.js Streams(流)

524 阅读7分钟

流的概念

  • 流是一组有序的、有起点和终点的字节数据传输手段
  • 流不关心文件的整体内容,只关注是否从文件中读到了数据,以及读到数据之后的处理
  • 流是一个抽象接口,被 Node 中的很多对象所实现。比如 HTTP 服务器 request 和 response 对象都是流
  • 流 是 Node.js 的核心模块,基本上都是 stream的实例,比如 process.stdout、http.clientRequest

流的好处

  • 流是基于事件的 API,用于管理和处理数据,而且有不错的效率
  • 借助事件和非阻塞 I/O 库,流模块允许在其可用的时候动态处理,在其不需要的时候释放掉

流中的数据有两种模式,二进制模式和对象模式

  • 二进制模式, 每个分块都是 buffer 或者 string 对象
  • 对象模式, 流内部处理的是一系列普通对象

所有使用 Node.js API 创建的流对象都只能操作 strings 和 Buffer对象。但是,通过一些第三方流的实现,你依然能够处理其它类型的 JavaScript 值 (除了 null,它在流处理中有特殊意义)。 这些流被认为是工作在 “对象模式”(object mode)。 在创建流的实例时,可以通过 objectMode 选项使流的实例切换到对象模式。试图将已经存在的流切换到对象模式是不安全的。

Node.js 中有四种基本的流类型

  1. Readable-可读流 (例如 fs.createReadStream() )
  2. Writable-可写的流(例如 fs.createWriteStreame() )
  3. Duplex-可读写的流(例如 net.Socket )
  4. Transform-在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate() )

第一种类型:可读流 createReadStream

创建一个可读流
// 引入 fs(读取文件) 模块
let fs = require('fs');
// 创建一个可读流
let rs = fs.createReadStream('./1.txt',{
    flags:'r',
    encoding:'utf8',
    start:0,
    autoClose:true,
    end: 3,
    highWaterMark:3 
});

API:createReadStream(path, [options]);

  1. path 是读取文件的路径
  2. options 里面有
    • flags:打开文件要做的操作,默认为 'r'
    • encoding:默认是null,null 代表的是 buffer
    • start:开始读取的索引位置
    • autoClose:读取完毕后自动关闭
    • end:结束读取的索引位置(包括结束位置)
    • highWaterMark:读取缓存区默认的默认的大小 64kb (64*1024b)

    如果指定 encoding 为 utf8 编码, highWaterMark 要大于 3 个字节

可读流的一些监听事件
  1. data 事件
  2. end 事件
  3. error 事件
  4. open 事件
  5. close 事件

各个写法如下:

// 流切换到流动模式,数据会被尽可能快的读出
rs.on('data',function(data){ // 暂停模式 -> 流动模式
    console.log(data);
});

// 该事件会在读完数据后被触发
rs.on('end', function () {
    console.log('读取完成');
});

// 读文件失败后被触发
rs.on('error', function (err) {
    console.log(err);
});

// 文件打开后被触发
rs.on('open', function () {
    console.log('文件打开了');
});

// 文件关闭后被触发
rs.on('close', function () {
    console.log('关闭');
});
设置编码

与指定 {encoding:'utf8'} 效果相同,设置编码

rs.setEncoding('utf8');
暂停和恢复触发 data

通过 pause() 方法和 resume() 方法

rs.on('data', function (data) {
    console.log(data);
    rs.pause(); // 暂停方法 表示暂停读取,暂停data事件触发
});
setTimeout(function () {
    rs.resume(); // 恢复方法
},2000);

第二种类型:可写流 createWriteStream

创建一个可写流
// 引入 fs(读取文件) 模块
let fs = require('fs');
// 创建一个可写流
let ws = fs.createWriteStream('./1.txt',{
    flags:'w',
    encoding:'utf8',
    highWaterMark:3 
});

API:createWriteStream(path, [options]);

  1. path 是读取文件的路径
  2. options 里面有
    • flags:打开文件要做的操作,默认为 'w'
    • encoding:默认是 utf8
    • highWaterMark:写入缓存区的,默认大小 16kb
可写流的一些方法
1. write 方法
ws.write(chunk, [encoding], [callback]);
  • chunk 写入的数据 buffer/string
  • encoding 编码格式,chunk 为字符串时有用,是个可选参数
  • callback 写入成功后的回调

返回值为布尔值,系统缓存区满时为 false,未满时为 true

2. end 方法
ws.end(chunk, [encoding], [callback]);

表明接下来没有数据要被写入 Writable 通过传入可选的 chunk 和 encoding 参数,可以在关闭流之前再写入一段数据 如果传入了可选的 callback 函数,它将作为 'finish' 事件的回调函数

3. drain 方法
ws.on('drain',function(){
    console.log('drain')
});
  • 当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false
  • 当前所有缓存的数据块满了,满了之后情况才会出发 drain
  • 一旦 write() 返回 false, 在 'drain' 事件触发前, 不能写入任何数据块
4. finish 方法
ws.end('结束');
ws.on('finish',function(){
    console.log('drain')
});
  • 在调用 end 方法,且缓冲区数据都已经传给底层系统之后, 'finish' 事件将被触发

第三种类型:可读写的流,也叫双工流(Duplex)

双工流,可以在同一个对象上同时实现可读、可写,就好像同时继承这两个接口。而且读取可以没关系(互不干扰)

// 引入双工流模块
let {Duplex} =  require('stream');
let d = Duplex({
    read(){
        this.push('hello');
        this.push(null)
    },
    write(chunk,encoding,callback){
        console.log(chunk);
        callback();
    }
});
d.on('data',function(data){
    console.log(data);
});
d.write('hello');

第四种类型:转换流(Transform)

  • 转换流输出是从输入中计算出来的
  • 转换流中,不需要实现 read 和 write 方法,只需要实现一个 transform 方法,就可以结合两者。
// 引入转换流
let {Transform} =  require('stream');
// 转换流的参数和可写流一样
let tranform1 = Transform({
    transform(chunk,encoding,callback){
        this.push(chunk.toString().toUpperCase()); 
        callback();
    }
});
let tranform2 = Transform({
    transform(chunk,encoding,callback){
        console.log(chunk.toString());
        callback();
    }
});
process.stdin.pipe(tranform1).pipe(tranform2);

pipe 方法

大家都知道,想把 Readable 的数据 写到 Writable,需要手动将数据读入内存中,然后在写入 Writable。也就是每次传递数据的时候,都需要写一下的代码:

readable.on('readable', (err) => {
 if(err) throw err
 writable.write(readable.read())
})

为了方便使用,Node.js 提供了 pipe() 方法

readable.pipe(writable)
pipe 方法的原理
var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
    var flag = ws.write(data);
    if(!flag)
    rs.pause();
});
ws.on('drain', function () {
    rs.resume();
});
rs.on('end', function () {
    ws.end();
});
unpipe 用法
  • readable.unpipe() 方法将之前通过 stream.pipe() 方法绑定的流分离
  • 如果 destination 没有传入, 则所有绑定的流都会被分离
let fs = require('fs');
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
setTimeout(() => {
console.log('关闭向2.txt的写入');
from.unpipe(writable);
console.log('手工关闭文件流');
to.end();
}, 1000);
cork & uncork
  • 调用 writable.cork() 方法将强制所有写入数据都存到内存中的缓冲区里。 直到调用 stream.uncork() 或 stream.end() 方法时,缓冲区里的数据才会被输出
  • writable.uncork() 将输出在 stream.cork() 方法被调用之后缓冲在内存中的所有数据
stream.cork();
stream.write('1');
stream.write('2');
process.nextTick(() => stream.uncork());

readable

'readable' 事件将在流中有数据可供读取时才触发。在某些情况下,为 'readable' 事件添加回调将会导致一些数据被读取到内部缓存中

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  // 某些数据可读
});
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
  start:3,
  end:8,
  encoding:'utf8',
  highWaterMark:3
});
rs.on('readable',function () {
  console.log('readable');
  console.log('rs._readableState.buffer.length',rs._readableState.length);
  let d = rs.read(1);
  console.log('rs._readableState.buffer.length',rs._readableState.length);
  console.log(d);
  setTimeout(()=>{
      console.log('rs._readableState.buffer.length',rs._readableState.length);
  },500)
});
  • 当流数据到达尾部时, 'readable' 事件会触发。触发顺序在 'end' 事件之前
  • 事实上, 'readable' 事件表明流有了新的动态:要么是有了新的数据,要么是到了流的尾部。 对于前者, stream.read() 将返回可用的数据。而对于后者, stream.read() 将返回 null。

可读流的两种模式

  1. 可读流的两种工作模式:flowing 和 paused
  2. flowing 模式下,可读流自动从系统底层读取数据,通过 EventEmitter 接口的事件尽快将数据提供给应用
  3. paused 模式下,调用 stream.read() 方法来从流中读取数据片段
  4. 所有初始工作模式为 paused 的 Readable 流,可以通过下面三种途径切换到 flowing 模式
    • 监听 'data' 事件
    • 调用 stream.resume() 方法
    • 调用 stream.pipe() 方法将数据发送到 Writable
  5. 可读流可以通过下面途径切换到 paused 模式:
    • 如果不存在管道目标(pipe destination),可以通过调用 stream.pause() 方法实现。
    • 如果存在管道目标,可以通过取消 'data' 事件监听,并调用 stream.unpipe() 方法移除所有管道目标来实现。

如果 Readable 切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。 比如, 调用了 readable.resume() 方法却没有监听 'data' 事件,或是取消了 'data' 事件监听,就有可能出现这种情况。

可读流的三种状态

在任意时刻,任意可读流应确切处于下面三种状态之一:

  1. readable._readableState.flowing = null
  2. readable._readableState.flowing = false
  3. readable._readableState.flowing = true
  • 若 readable._readableState.flowing 为 null,由于不存在数据消费者,可读流将不会产生数据。 在这个状态下,监听 'data' 事件,调用 readable.pipe() 方法,或者调用 readable.resume() 方法, readable._readableState.flowing 的值将会变为 true 。这时,随着数据生成,可读流开始频繁触发事件。

  • 调用 readable.pause() 方法, readable.unpipe() 方法, 或者接收 “背压”(back pressure), 将导致 readable._readableState.flowing 值变为 false。 这将暂停事件流,但 不会 暂停数据生成。 在这种情况下,为 'data' 事件设置监听函数不会导致 readable._readableState.flowing 变为 true。

  • 当 readable._readableState.flowing 值为 false 时, 数据可能堆积到流的内部缓存中。