阅读 1146

简单理解 backpressure(背压)机制

前言

本文仅是作者自己的见解,如有错误还请即使指正

为什么存在背压机制?

我们首先来看一段代码, 这段代码存在什么问题?

乍一看,感觉没啥大毛病,但是如果writable.write()写入数据比较慢,但是可读流又在不断的传输数据,就会造成内存溢出,形成阻塞。


const fs = require('fs')

const readable = fs.createReadStream('./小妇人.mp4')
const writable = fs.createWriteStream('./小妇人(1).mp4')

readable.on('data', (chunk) => {
    // 这里存在问题↓↓↓↓↓↓↓
    writable.write(chunk);
})

readable.on('end', () => {
    writable.end()
})
复制代码

流的错误处理

如果可写流,无法正确的处理大量由可读流传输的数据,可读流并不会被销毁,这会导致我们写入的文件被损坏。我们必须添加适当的错误处理程序,在当流发生故障的时候,销毁管道中的所有流。

const gzip = require('zlib').createGzip();
const fs = require('fs');

const readable = fs.createReadStream('好莱坞往事.1080p.mkv');
const writable = fs.createWriteStream('好莱坞往事.1080p.mkv.gz');

// 如果可写流发生故障,压缩文件会压缩失败
readable.pipe(gzip).pipe(writable);
复制代码

在 Node 8.x 版本之前我们使用 pump。对于更高版本的 Node, 可以使用pipeline


const gzip = require('zlib').createGzip();
const { pipeline } = require('stream')
const fs = require('fs');

const readable = fs.createReadStream('好莱坞往事.1080p.mkv');
const writable = fs.createWriteStream('好莱坞往事.1080p.mkv.gz');

pipeline(
    readable,
    gzip,
    writable,
    error => {
        if (error) {
            console.log('电影压缩失败')
        } else {
            console.log('电影压缩成功')
        }
    }
)
复制代码

我们也可以使用 promisify 将其改造成 async/await 的形式。


const gzip = require('zlib').createGzip()
const { pipeline } = require('stream')
const { promisify } = require('util')
const fs = require('fs')
const readable = fs.createReadStream('好莱坞往事.1080p.mkv')
const writable = fs.createWriteStream('好莱坞往事.1080p.mkv.gz')
const asyncPipeline = promisify(pipeline)

async function start () {
    try {
        await asyncPipeline(
            readable,
            gzip,
            writable
        )
        console.log('电影压缩成功')
    } catch (error) {
        console.log('电影压缩失败')
    }
}

start()
复制代码

可读流太快了

硬盘的写入速度,远远小于硬盘的读取速度。如果可读流太快,而可写流的无法迅速的消费可读流传输的数据,写入流将会把 chunk,push 到写队列中方便之后使用,这样就会造成数据在内存中的累积。这个时候将会触发 backpressur(背压) 机制。如果没有 backpressur(背压) 机制,系统将会出现如下的问题:

  1. 内存溢出
  2. 其他进程变得缓慢
  3. 垃圾收集器将超负荷运作

背压机制是如何解决这些问题的?

在代码中调用pipe时,它会向可写流发出信号,表示有数据准备传输。当我们的可写流使用 write() 写入数据时,如果写队列繁忙,或者内部缓存区已经溢出了,write() 将会返回false。

这个时候,背压机制就会启动,它会暂停任何数据传入到可写流中,并等待可写流准备好,清空内部缓存区后。将会发出 drain 事件,并恢复可读流的传输。

这就意味着 pipe 只会使用固定大小的内存,不会存在内存泄漏的问题。

为什么我们平时很少关注背压的问题呢?那是因为在你调用 pipe 时,Node.js已经自动处理了这些问题。但是如果我们需要实现自定义流,则需要考虑到这些问题。

参考