愿你坚持不懈,努力进步,进阶成自己理想的人

—— 2017.09, 写给3年后的自己

Node.js之Stream的应用与原理探索

为什么使用流?

假设我们现在要读取一个文件,通常情况下,会写成:

const fs = require('fs')
fs.readFile(fileName, (err, body) => {
    console.log(body.toString())
})

这在文件比较小的时候没什么问题,但是当文件很大的时候(比如有几百M),那么就会造成各种各样的问题。如:

<Buffer 64 74 09 75 61 09 63 6f 75 6e 74 0a 0a 64 74 09 75 61 09 63 6f 75 6e 74 0a 32 30 31 35 31 32 30 38 09 4d 6f 7a 69 6c 6c 61 2f 35 2e 30 20 28 63 6f 6d ... >
buffer.js:382
    throw new Error('toString failed');
    ^

Error: toString failed
    at Buffer.toString (buffer.js:382:11)

这是因为Buffer对象长度过大,导致toString()失败。除此之外,由于fs.readFile是一次性将文件内容读取到内存中的,那么当内存中的可用空间不足时,还会有爆内存的问题。
所以,对于大文件的处理,不宜采用一次性获取全部内容的做法。好的做法是采用流来读取文件内容。如:

const fs = require('fs')
fs.createReadStream(fileName).pipe(process.stdout)

如上代码,createReadStream创建了一个可读流,它连接了数据源(上游)和消耗方(下游,stdout)。这样子处理后,文件的内容是被分块取走的,下游获得是一个先后到达的数据序列。

这种模型就像用一条水管去水池里取水,即使水池很大,水管一次也只存储与水管等容积的水,每当用了一点水,水池中的水都会再流入水管中。


一、基础知识

Stream(流)是一个抽象的接口,也是一个抽象的概念。它是一种对有序的、有起点和终点的字节数据的传输手段,它不关心文件的整体内容,而是关心是否读取到了数据、读到数据后如何处理。所有的Stream对象都是EventEmitter的实例。

1、流的类型

在Node中,流具有以下四种类型:

  • Readable:可读流
  • Writeable:可写流
  • Duplex:双工流(可读可写)
  • Transform:转换流(操作被写入的数据,输出转换结果)

而由于Stream对象是EventEmitter的实例,所以它常用的事件有:

  • data 当有数据可读时触发
  • end 当没有更多的数据时触发
  • error 写入/读取过程中报错时触发
  • finish 所有数据已被写入底层系统时触发

2、流的基础操作

我们可以使用流来进行读写操作,常用的操作有:从流中读取数据、将数据写入流、管道流。以下介绍这几种操作:

从流中读取数据

const fs = require('fs')
// 创建可读流
const readable = fs.createReadStream('./input.txt')
readable.setEncoding('UTF8')

let data = ''
readable.on('data', chunk => data += chunk)
readable.on('end', () => console.log(data))
readable.on('error', err => console.log(err.stack))

将数据写入流

const fs = require('fs')
// 创建可写流
const writeable = fs.createWriteStream('./output.txt')
writeable.write('Hello, world', 'UTF8')
writeable.end()

writeable.on('finish', () => console.log('写入完成'))
writeable.on('error', err => console.log(err.stack))

管道流

管道是一种输出流输入流的机制,用于从一个流中获取数据并传递到另一个流中。它就像两个水桶之间连接的管子,沿着管子水可以从上游流入下游。这种机制在复制大文件时格外有用(避免了一次性将大文件读入缓存造成爆内存)

const fs = require('fs')
const readable = fs.createReadStream('./input.txt')
const writeable = fs.createWriteStream('./output.txt')
readable.pipe(writeable)


二、自定义流

我们可以通过实现Stream的抽象接口,来自定义流。Stream提供了以下四种类型的流,为:

const Stream = require('stream')
const Readable = Stream.Readable
const Writeable = Stream.Writeable
const Duplex = Stream.Duplex
const Transform = Stream.Transform

1、创建可读流

创建一个可读流,我们需要做的事情为:继承Readable,并实现_read()方法。其中,_read()方法是从底层系统读取具体数据的逻辑,是生产数据的方法。与之紧密配合的方法,则是push()方法,两者配合起来使用总结如下:

  • _read()中,通过push(data)将数据放入可读流中供下游消耗,push(data)的调用可以是同步的,也可以是异步的
  • 当数据已经生产完成后,调用push(null)终止可读流,一旦终止,就不能在push(data)

所以我们实现一个可读流的示例,如下:

const Readable = require('stream').Readable

class ReadableFromIterator extends Readable {
    constructor(iterator) {
        super()
        this.iterator = iterator()
    }
    _read() {
        const res = this.iterator.next()
        if (res.done) {
            return this.push(null)
        }
        this.push(`${res.value}\n`)
    }
}

module.exports = ReadableFromIterator

那么有了可读流后,我们怎么使用它呢?
可以通过监听data事件的方式消耗可读流,如:

const iterator = function* () {
    yield* [1, 2, 3, 4, 5]
}
const readable = new ReadableFromIterator(iterator)
readable.on('data', data => process.stdout.write(data))
readable.on('end', () => process.stdout.write('DONE'))

总结如下:

  • 监听data事件后,readable就会持续不断地调用_read(),然后触发data事件将数据输出
  • data事件是在nextTick中触发的,所以有:
const iterator = function* () {
    yield* [1, 2, 3, 4, 5]
}
const readable = new ReadableFromIterator(iterator)
readable.on('data', data => process.stdout.write(data))
readable.on('end', () => process.stdout.write('DONE'))
console.log('Now, output');
/*
输出为:
Now, output
1
2
3
4
5
DONE
*/

2、创建可写流

创建可写流,可以使用创建类继承Writable并实现_write(data, enc, next)的方式,也可以Writable()并重写_write()方法,如:

const Writable = require('stream').Writable

const writable = Writable()
writable._write = function(data, enc, next) {
    process.stdout.write(data.toString().toUpperCase())
    process.nextTick(next)
}
writable.on('finish', () => process.stdout.write('DONE'))

writable.write('a\n')
writable.write('b\n')
writable.write('c\n')
writable.end()
/*
输出:
A
B
C
*/

实现了可写流后:

  • 上游会调用writable.write(data)将数据写入可写流中,而write()方法会调用_write()将data写入底层
  • 需要注意的是,在_write()中,当数据成功写入底层后,必须调用next(err)来告诉流可以开始处理下一个数据。而next的调用,既可以是同步的,也可以是异步的。
  • 上游需要调用writable.end(data)来结束可写流,data是可选的,此后便不能再调用write新增数据了。在end方法调用后,当所有的底层写操作完成,便会触发finish事件

3、创建双工流

Duplex(双工)流实际上是继承了Readable和Writable的一类流,一个Duplex对象既可当成可读流来使用,也可以当做可写流来使用。创建一个双工流,需要实现_read()方法和_write()方法。实现了_read()方法后,可以监听data事件来消耗Duplex产生的数据,而实现了_write()方法后,便可以作为下游去消耗数据。所以一个Duplex对象是有两端的,即为:可写端和可读端。

const Duplex = require('stream').Duplex
const duplex = Duplex()
duplex._read = function() {
    this._num = this._num || 0
    if (this._num > 1) {
        this.push(null)
    } else {
        this.push('' + (this._num++))
    }
}
duplex._write = function(buf, enc, next) {
    process.stdout.write(`_write ${buf.toString()}\n`)
    next()
}

duplex.on('data', data => console.log('ondata', data.toString()))

duplex.write('a')
duplex.write('b')
duplex.end()
/*
输出:
_write a
_write b
0
1
*/

4、创建转化流

Duplex流中,可读流中的数据和可写流中的数据是分开的。而Transform流是一种特殊的Duplex流,它继承自Duplex流,其可写端的数据经变换后会自动添加到可读端。要创建一个Transform流,需要实现_transform()方法,如:

const Transform = require('stream').Transform
class Plus extends Transform {
    constructor(step) {
        super()
        this.step = step
    }
    _transform(buf, enc, next) {
        const res = Number(buf.toString()) + this.step
        this.push(''+res)
        next()
    }
}

const t = new Plus(10)
t.on('data', data => process.stdout.write(`${data}\n`))
t.write('100')
t.write('200')
t.end()
/*
输出:
110
210
*/


三、对象模式

在以上例子中,经常有data.toString(),那么,为什么一定要调用toString()呢?
这是因为,在流的操作中,存在以下情形:

  • 对于可读流,push(data)时,data只能是StringBuffer类型
  • 对于可读流,readable.on('data', data => ...)时,输出的data都是Buffer类型
  • 对于可写流,write(data)时,data只能是StringBuffer类型
  • 对于可写流,_write(data)时,传进来的data都是Buffer类型

即,

  • 对可读流:上游 -> on('data') -> 下游,传递的是Buffer
  • 对可写流:上游write(data) -> _write(data) -> 下游,传递的也是Buffer

所以数据 一旦放入流中,便成为了Buffer。那么对象模式,则是指在构造函数中传入objectMode这一参数,实现写入什么,读出什么的效果。如:

const Readable = require('stream').Readable
/* 非对象模式 */ 
const r1 = Readable()
r1.push('a')
r1.push('b')
r1.push(null)

r1.on('data', data => console.log(data))
/*
输出:
<Buffer 61>
<Buffer 62>
*/

/* 对象模式 */
const r2 = Readable({ objectMode: true })
r2.push('a')
r2.push('b')
r2.push({})
r2.push(null)
r2.on('data', data => console.log(data))
/*
输出:
a
b
{}
*/


四、流的原理

1、可读流取数据的原理

可读流的基本原理:

  • 当我们使用Readable创建了可读流对象readable后,就能够得到一个可读流。可读流中通过_read()方法去连接底层数据源
  • 调用_read()方法时,从底层获取到的数据,会通过push方法传递到缓存队列中。
  • 下游使用readable.read(n)向流请求数据,然后使用readable.on('data')从流中取数据,这个过程图示为:

其中read()方法取数据的具体过程,则可以总结如下:

详细解释如下:

  • 在可读流对象readable中有readable._readableState(下文简称state)用来维护状态,其中的字段有:
    • state.buffer 缓存区,用以缓存数据
    • state.length 标志缓存中当前的数据量
    • state.highWaterMark 缓存大小的一个上限阈值,单位为Byte
    • state.reading 标志上次从底层取数据的操作是否完成,一旦push()方法被调用,就会设为false表示本次_read()结束
    • state.ended 标志是否取完底层数据(调用了push(null)
    • state.needReadable 标志是否需要从底层中取数据
  • 判断是否需要去底层读取数据,依据是判断doRead;为true时执行_read()从底层取数据,为false时执行m=howMuchToRead(n)从缓存获取数据,其中doRead的判断逻辑为:
let doRead = state.needReadable
// 当缓存中没数据了,或者剩余的数据未达到缓存大小阈值时,需要从底层读数据
if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true
}
// 如果底层数据已经取完了,或者正在从底层读数据,则不需要从底层读数据
if (state.ended || state.reading) {
    doRead = false
}
// 当需要从底层读数据时:
// 1. 设置state.reading为true表示开始读数据
// 2. 如果缓存为空,则设在state.needReadable表示需要从底层读数据
// 3. 调用_read()去底层读取数据,读取阈值为state.highWaterMark
if (doRead) {
    state.reading = true
    state.sync = true
    if (state.length === 0) {
        state.needReadable = true
    }
    this._read(state.highWaterMark)
    state.sync = false
}
  • m=howMuchToRead(n)表示read(n)时实际从缓存中获取m个数据,其计算逻辑为:
    • state.length === 0 && state.ended 表示缓存已空、数据源已枯竭,无数据可取,此时m=0
    • state.objectMode开启时,若n=0,则m=0,否则m=1
    • n是数字时:
      • 若n <= 0,则m=0
      • 若n > state.length,表示缓存中数据不够:
        • 若数据源未枯竭(state.endedfalse),则m=0,且设置state.needReadable=true,从而在下次执行read()时从底层取数据
        • 若数据源已枯竭,则m=state.length
      • 若0 < n <= state.length,表示缓存中数据够用,m=n
    • state.flowingtrue,表示开启流动模式,m为缓存中第一个元素的长度;为falsem=state.length,将缓存读空
  • m>0的情况下,通过ret = fromList(m)从state.buffer中取得m个数据,成功取到的情况下,通过emit('data', ret)将数据传递出去,read(n)的返回值为ret
  • m <= 0或者fromList(m)返回nullread(n)就都返回null

下游通过read()方法去从缓存区中取数据,而read()方法在底层数据源未干涸且缓存区中无数据的时候,又会通过_read()方法去从底层中获取数据。那么_read()的详细处理过程是怎么样的呢?

  • _read()中会同步或者异步地调用push()方法
  • 调用push(data)方法时,会先对数据解码为Buffer,即chunk = decode(data),然后:
    • 判断是否需要立即输出数据,如果需要立即输出,则执行emit('data', chunk),否则加入到缓存区中,即state.buffer.push(chunk)
    • 其中,判断是否需要立即输出数据的逻辑为:state.flowing && state.length === 0 && !state.sync,这表示同时符合以下条件时,流应该立即输出:
      • 流处于流动模式(流动模式下,下游不需要自行反复调用read(n)
      • 缓存区为空(所以当前数据是马上需要用的)
      • _read()调用push()是异步调用的(异步调用的情况下,数据和read()方法不在同一个EventLoop,不会返回给read()

2、数据的流式消耗

流式数据是指按时间先后到达的数据序列。在流中,对可读流中数据的消耗存在两种模式,即为暂停模式流动模式

  • 流动模式: 数据会不断地生产出来,就像流动一样。监听流的data事件,即可进入流动模式
  • 暂停模式: 暂停模式下,数据不会不断地生产,需要显式调用read()方法来触发data事件。而流内部,使用state.flowing字段来记录流的模式,其取值有:
    • true 流动模式
    • false 暂停模式
    • null 初始状态
  • 调用readable.resume()可使流进入流动模式,state.flowing会被设为true;而调用readable.pause()可使流进入暂停模式,state.flowing会被设为false

3、暂停状态

初始状态下(state.flowing === null),若我们监听data事件,则流会进入流动模式。此时,我们可以调用readable.pause()使流进入暂停模式,在暂停模式下,监听data事件是不会进入流动模式的,而且如果要消耗流中数据,我们得显式地调用read()方法,如:

const Readable = require('stream').Readable
const dataSource = [1, 2, 3, 4, 5]
const readable = Readable()
readable._read = function() {
    if (dataSource.length) {
        this.push(dataSource.shift() + '')
    } else {
        this.push(null)
    }
}
// 进入暂停模式
readable.pause()
readable.on('data', data => process.stdout.write(`emit ${data}\n`))

let data = readable.read()
while (data !== null) {
    process.stdout.write(`read() returns ${data}\n`)
    data = readable.read()
}
/*
输出如下:
emit 1
read() returns 1
emit 2
read() returns 2
emit 3
read() returns 3
emit 4
read() returns 4
emit 5
read() returns 5
*/

由于执行read()时,调用_read()去底层取数据是同步的,所以数据会添加到缓存(state.buffer)中,此后,read()会从缓存中读取数据,触发data事件,并返回数据。
但是,如果_read()去底层取数据这个过程是异步的呢?会怎么样?尝试以下代码:

const Readable = require('stream').Readable
const dataSource = [1, 2, 3, 4, 5]
const readable = Readable()
readable._read = function() {
    process.nextTick(() => {
        if (dataSource.length) {
            this.push(dataSource.shift() + '')
        } else {
            this.push(null)
        }
    })
}
// 进入暂停模式
readable.pause()
readable.on('data', data => process.stdout.write(`emit ${data}\n`))

let data = readable.read()
while (data !== null) {
    process.stdout.write(`read() returns ${data}\n`)
    data = readable.read()
}

执行后会发现,并没有任何输出。这是因为异步的情况下,_read()执行完后,数据还来不及放入缓存,从而导致read(n)取到null
这种情况下,就需要使用 redable事件 了,将上面最后一段代码改为:

readable.on('readable', () => {
    while (null !== readable.read());
})

就会发现输出了:

emit 1
emit 2
emit 3
emit 4
emit 5

首先,我们需要了解下readable事件是怎么样的:

  • 调用_read()从底层取数据后,read(n)会尝试从缓存中取数据
  • _read()是异步调用push()的,则缓存中的数据不会马上增多,所以仍然会出现数据不够的问题
  • 所以read(n)返回null的时候,表示此次未能够 从缓存中读取数据,所以消耗方就需要等新数据到达、缓存中被塞入数据时,再尝试调用read方法
  • 所以,这个新数据到达后,是如何通知消耗方的呢,那就是readable事件了。所以对于push()数据立即输出的场景,消耗方直接监听data事件就能够拿到数据;而数据非立即输出的场景,数据是被放入缓存的,此时就需要通过监听readable事件,再调用read()方法来获得数据了。

所以,当read()返回null时,表示缓存数据不够且底层数据还未加入缓存,这种情况下,state.needReadabletrue,由前面的分析我们可知,在暂停模式下不会立即输出push()进来的数据,而是执行state.buffer.push(chunk)推入缓存。而在数据推入缓存之后,就会触发readable事件。
对于上述例子,我们有个疑问:在监听readable事件之前,我们并没有调用read(),那么为什么会触发readable呢?
这是因为:首次监听readable事件时,会触发read(0)调用,从而引起_read()push()调用,在数据放入缓存中,触发readable事件,从而启动循环。
所以其实,上述例子还可以改为:

readable.on('readable', () => readable.read())

4、 流动模式

流动模式相比暂停模式,使用上要容易些:

  • 在创建流之后,通过监听data事件,可以进入流动模式
  • 通过pipe方法,将数据导向下游可写流,进入流动模式消耗数据

为什么监听data事件能够进入流动模式呢?且看源码:

Readable.prototype.on = function (ev, fn) {
    var res = Stream.prototype.on.call(this, ev, fn)
    if (ev === 'data' && false !== this._readablestate.flowing) {
        this.resume()
    }
    // ...
    return res
}

其中,继承链为:Readable -继承-> Stream -继承-> EventEmitter,在Readable这个子类中,重写了on方法,加入了逻辑:当事件是data事件且处于非暂停模式,就会调用this.resume()方法进入流动模式。方法首先会设state.flowingtrue,然后在下一个tick中执行flow(),试图将缓存读空,flow()的源码为:

if (state.flowing) {
    do {
        var chunk = stream.read()
    } while (null !== chunk && state.flowing)
}

而我们知道,read()中是可能触发push()的调用的,而push()又可能会触发data事件,从而可能继续触发flow(),如此反复,使得数据可以源源不断地流动。这里,再了解一下push()的一个细节,如源码:

// 流动模式下,且缓存为空,且非同步模式
if (state.flowing && state.length === 0 && !state.sync) {
    stream.emit('data', chunk)
    stream.read(0)
} else {
    state.length += state.objectMode ? 1 : chunk.length
    state.buffer.push(chunk)
    if (state.needReadable) {
        emitReadable(stream)
    }
}

第一个if分支即为立即输出的情况,立即输出的情况下,在获得数据后,会执行read(0),从而引起_read()push(),数据便可不断输出。
而非立即输出的情况下,数据是被添加到缓存中的,此时有:

  • state.length为0时,在调用_read()之前,state.needReadable就是true了,所以会执行emitReadable(stream),从而使得下一个tick中触发readable事件,从而再调用flow()形成流动
  • state.length不为0时,缓存中有数据,read()不会返回null,所以flow()中的循环会一直继续

5、背压反馈机制

我们通常在创建一个流后,通过监听data事件来获取吞吐出的数据并对其进行处理,如下:

const fs = require('fs')
fs.createReadStream(file).on('data', doSomething)

但是doSomething中的逻辑可能处理较慢,那么来不及处理的数据就会在内存里面,如果来不及处理的数据一多,那么将占用大量的内存。
所以理想情况下是:下游消耗一个数据,上游产生一个数据。如此整体的内存使用便可保持在一个水平。而Readable中便提供了一种称为pipe的机制来实现这种功能。提供pipe方法,我们可以连接上下游,如:

const fs = require('fs')
fs.createReadStream(file).pipe(writable)

其中,writeable是一个可写流对象,上游会调用其write方法将数据写进去。并且,writable的内部拥有一个写队列,当队列长度达到某个阈值如state.highWaterMark时,执行write()返回false,否则返回true
因此根据这一特性,上游就可以通过write()的反馈,在流动模式暂停模式之间切换,而这便是pipe方法的核心实现逻辑,简化如下:

readable.on('data', data => {
    if (false === writable.write(data)) {
        readable.pause()
    }
})
// writable清空缓存时,会触发drain事件
writable.on('drain', () => {
    readable.resume()
})

通过下列例子,我们可以体会这个过程:

const stream = require('stream')

let c = 0
const readable = stream.Readable({
    highWaterMark: 2,
    read() {
        process.nextTick(() => {
            let data = c < 6 ? String.fromCharCode(c + 65) : null
            console.log('produce', data)
            this.push(data)
            c++
        })
    }
})

const writable = stream.Writable({
    highWaterMark: 2,
    write(chunk, enc, next) {
        console.log('consume', chunk)
    }
})

readable.pipe(writable)

以上例子的实际输出为:

produce A
consume <Buffer 41>
produce B
produce C
produce D

为什么上游可生产了6个数据,实际却只生产了4个,且只消耗了1个呢?这是因为:

  • 下游迟迟未消耗完数据(未调用next()),因此数据A流入下游后,下游消耗了。但是由于下游的高水标为2,所以B、C流入下游的时候,被缓存起来了
  • 下游缓冲区满了,故write()返回了false,上游进入暂停模式
  • 由于上游的高水标为2,所以其缓冲区的大小保持在2,所以缓存了CD两个数据
  • 所以实际生产4个数据

这种机制,便称为背压机制,下游的消耗驱动上游的生产,下游缓冲区的增大会增加上游写数据的阻力。这种机制使得数据的生产和消耗形成了一个闭环,形成了一种拉式流(pull stream),从而具有“按需生产”的特点。


参考资料