图片来源

Node.js中的 streams 给大家的印象是难以使用甚至难以理解的。我有好消息告诉你——以后就不是这种情况了。

多年来,开发人员创建了许多包,唯一的目的是使streams更易于使用。但是在这篇文章中,我将专注于讲原生的Node.js stream API.

“Streams 是Node’s 中最好的也是最被误解的主意。”

— Dominic Tarr

streams的本质?

Streams 是一个数据集——和数组、字符串一样。不同的是streams的数据可能不能马上全部获取到,他们不需要在内存中。这样使得streams在处理大数据集或者来自外部的数据源的数据块上能发挥很大的作用。

然而,streams不仅是能用在大数据上,也给我们在代码中的可组合的能力。就像通过发送其他较小的Linux命令组成强大的Linux命令一样,我们可以在Node中用streams做同样的事情。

Linux命令的组合

const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input
grep.pipe(wc)

Node中很多内建模块实现了流式接口:

截图来自我的Pluralsight课程——高级Node.js

上面的列表中的原生Node.js对象就是可读流和可写流的对象。有些对象是可读流也是可写流,如TCP sockets,zlib 和 crypto streams。

注意,这些对象是密切相关。当一个HTTP响应在客户端上是一个可读流,相应的在服务端是一个可写流。这是因为在HTTP的情况下,我们基于从一对象(http.IncomingMessage)读而从另外一个对象(http.ServerResponse)写

同样需要注意的是,stdio流(stdinstdoutstderr)在子进程中会有反向流类型。这样的话就能用非常简单的方式管道传送给其他流或者主进程的stdio流。

streams实战的例子

理论很有用,但是通常不能100%的使人信服。让我们来看一个例子展示当说到内存消耗时streams给写代码带来的不同。

首先创建一个大文件:

const fs = require('fs');
const file = fs.createWriteStream('./big.file');

for(let i=0; i<= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}

file.end();

看一下我用了什么来创建这个大文件——一个可写流!

fs模块可以用流式接口来读或者写文件。在上面的例子里,我们通过一个可写流向big.file文件循环写了100万行。

运行以上代码生成了一个大概400多MB的文件。

这有一个简单的Node服务器,被设计的来单一的传输big.file这个文件:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;

    res.end(data);
  });
});

server.listen(8000);

当服务器收到一个请求,它将用异步方法——fs.readFile传输这个大文件。但是,嘿,我们好像没有阻止事件循环或其他任何事情。每件事都很好,真的吧?真的吗?

好吧,我们来看一下会发生什么,当服务器跑起来,发起请求并且监视内存的时候。

当跑起这个服务器,开始在一个正常的内存消耗8.7 MB:

然后,我请求了这个服务器,注意内存消耗上发生了什么:

哇哦——内存消耗跳到了434.8 MB。

我们把big.file的文件内容整个的读进内存,然后再写进返回对象。这样效率非常低。

HTTP响应对象(上面代码中的res)是一个可写流,这意味着如果我们有一个可读流代表big.file的文件数据,我们只要用管道连接这两个流,达到了同样的效果而且不用消耗400多MB的内存。

Node的 fs 模块用createReadStream方法可以得到任何一个文件的可读流,我们把它管道给响应对象:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  **const src = fs.createReadStream('./big.file');**
  **src.pipe(res);**
});

server.listen(8000);

现在再去请求这个服务器,神奇的事情发生了(看内存消耗):

发生了什么?

当客户端请求一个大文件,一次流式的返回一块文件,这意味着不用将整个文件缓存在内存中。这就是为什么内存占用只涨到了大约25MB。

你可以把这个例子推到极限。用500万行重新生成big.file代替之前的100万行,这样会导致文件大小会超过2GB,比Node中默认缓存大小的限制还大。

如果你还尝试用fs.readFile,默认情况(你可以改变默认限制)下显然是不能的。但是用fs.createReadStream的话,给请求传输2GB的数据也是完全没问题的,而且,更好的是,进程的内存使用不会变大。

准备好了一起来学习streams了吗?

这篇文章是我关于Node.js的Pluralsight教程中的一部分. 我在视频中讲了类似的内容。

Streams 101

Node.js中有4个基本的流类型:可读流(Readable)、可写流(Writable)、双工流(Duplex)和转换流(Transform streams)。

  • 可读流是可以被消耗的数据源的抽象,典型例子就是fs.createReadStream方法。

  • 可写流是可以写入数据的目的地的抽象,典型例子就是fs.createWriteStream方法。

  • 双工流既是可读的也是可写的,典型例子是TCP套接字。

  • 转换流是基于双工流的,它可以用来修改或转换数据,因为它是写入和读取的。zlib.createGzip就是一个用gzip来压缩数据的转换流例子。你可以认为转换流就是一个函数,这个函数的输入是一个可写流,输出是一个可读流,你可能也听说过把转换流叫做“通过流”。

所有的流都是EventEmitter的实例。他们在数据可读或者可写的时候发出事件。然而,我们也可以简单的通过pipe方法来使用流数据。

pipe方法

你需要记住这行神奇的代码:

**readableSrc**.pipe(**writableDest**)

这简单的一行,连接了可读流的输出——源数据和可写流的输入——目标。源必须是可读流,目标必须是可写流。当然也可以是双工流或者转换流,事实上,如果连接的是一个双工流,我们可以像Linux中一样链式调用pipe:

readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWrtitableDest)

pipe方法返回目标流,这使我们能够执行上面的链式调用。对于流a(可读)、bc(双工)和d(可写),我们可以:The pipe method returns the destination stream, which enabled us to do the chaining above. For streams a (readable), b and c (duplex), and d (writable), we can:

a.pipe(b).pipe(c).pipe(d)
# Which is equivalent to:
a.pipe(b)
b.pipe(c)
c.pipe(d)
# Which, in Linux, is equivalent to:
$ a | b | c | d

pipe方法是最简单的方式去使用流,一般建议使用pipe方法或使用事件来处理流,但是要避免两个混合使用。通常,当你使用pipe方法时,你不需要使用事件,但是如果你需要用更多定制的方式来处理流,那你可以只用事件。

流事件

除了从可读流里读取数据和向可写流目标写数据外,pipe方法将自动管理沿途的一些事情。例如,它处理错误、文件结束以及当一个流比另一个流慢或更快时的情况。

然而,我们也可以直接使用事件来操作流。下面是pipe方法主要用于读取和写入数据的事件的简化等效代码:

# readable.pipe(writable)
readable.on('data', (chunk) => {
  writable.write(chunk);
});
readable.on('end', () => {
  writable.end();
});

以下是可读流可写流的重要事件以及可用方法:

截图捕捉自我的Pluralsight课程——Advanced Node.js

这些事件和函数在某种程度上是相关的,因为它们通常一起使用。

可读流中最重要的事件是:

  • data事件,每当流将数据块传递给消费者时,它就会触发。

  • end事件,当没有更多的数据从流中被消耗时触发。

可写流中最重要的事件是:

  • drain事件,这是可写流可以接收更多数据的信号。

  • finish事件,当所有数据都给到底层系统时触发。

可以结合事件和函数来定制和优化流的使用。使用一个可读的流,我们可以用pipe / unpipe方法,或read / Unshift / resume方法。使用一个可写流,我们可以把它pipe / unpipe目的地,或是写它的write方法调用end方法当我们完成。

可读流的暂停和流(flowing)模式

可读流有两种主要模式,这影响我们可以使用它们的方式:

  • 它们可以是暂停(paused)模式

  • 或是流(flowing)模式

这些模式有时被称为拉和推模式。

所有可读的流默认情况下都是在暂停模式下启动的,但在需要时可以轻松切换到流模式或者返回到暂停状态。有时,转换是自动发生。

当一个可读流处于暂停模式,我们可以使用read()方法按需的从流中读取数据,然而,在流模式下的可读流,数据是不断流动的,我们要监听事件来使用这些数据。

在流模式下,如果没有用户处理数据,那么实际上数据会丢失。这就是为什么当我们在流模式中有可读的流时,我们需要一个data事件。事实上,只要添加一个data事件,就可以将暂停模式转换为流模式,删除data事件,流将切换回暂停模式。其中一些这样做事为了与旧的节点流接口向后兼容。

这两个流模式之间手动开关,可以使用resume()pause()方法。

截图捕捉自我的Pluralsight课程—Advanced Node.js

当使用pipe方法读取可读流时,我们不必担心这些模式,因为pipe自动管理它们。

实现流

当我们谈论Node.js中的流,主要有两种不同的任务:

  • 实现流。

  • 使用流。

到目前为止,我们一直只谈论使用流。让我们实现一些!

流的实现通常会引入(require)``stream模块。

实现可写流

为了实现可写流,我们需要使用流模块中的Writable构造函数。

const { Writable } = require('stream');

我们有很多方式来实现一个可写流。例如,如果我们想要的话,我们可以继承Writable构造函数。

class myWritableStream extends Writable {
}

然而,我更喜欢简单的构造函数的方法。我们只需给Writable构造函数传递一些选项并创建一个对象。唯一需要的选项是write函数,该函数揭露数据块要往哪里写。

const { Writable } = require('stream');
const outStream = new Writable({
  **write**(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);

这个write函数有3个参数:

  • chunk通常是一个buffer,除非我们配置不同的流。

  • encoding是在特定情况下需要的参数,通常我们可以忽略它。

  • callback是在完成处理数据块后需要调用的函数。这是写数据成功与否的标志。若要发出故障信号,请用错误对象调用回调函数。

outstream,我们只是用console.log把数据块作为一个字符串打印到控制台,然后不用错误对象调用callback表示成功。这是一个非常简单的可能也不那么有用的echo流,它把收到的所有数据打印到控制台。

为了使用这个流,我们可以直接用process.stdin这个可读流,就可以把process.stdinpipe给outStream.

执行上面的代码,任何我们输入给process.stdin的内容都会被outStreamconsole.log输出到控制台。

实现这个流不怎么有用,因为它实际上被实现了而且node内置了,它等同于process.stdout。以下一行代码,就是把stdinpipe给stdout,就能实现之前的效果:

process.stdin.pipe(process.stdout);

实现可读流

为了实现可读流,引用Readable接口并用它构造新对象:

const { Readable } = require('stream');
const inStream = new Readable({});

有一个简单的方法来实现可读流。我们可以直接把供使用的数据push出去。

const { Readable } = require('stream');
const inStream = new Readable();
inStream.**push**('ABCDEFGHIJKLM');
inStream.**push**('NOPQRSTUVWXYZ');
inStream.**push**(null); // No more data
inStream.pipe(process.stdout);

push一个null对象就意味着我们想发出信号——这个流没有更多数据了。

使用这个可写流,可以直接把它pipe给process.stdout这个可写流。

执行以上代码,会读取inStream中所有的数据,并输出在标准输出流。很简单,也不是很有用。

我们基本上在pipe给process.stdout之前把所有的数据都推到流里了。更好的方法是按需推送。我们可以通过在一个可读流的配置实现read()方法来做这件事情:

const inStream = new Readable({
  **read**(size) {
    // there is a demand on the data... Someone wants to read it.
  }
});

当在可读的流上调用读方法时,实现可以将部分数据推到队列中。例如,我们可以一次推送一个字母,从字符代码65(代表A),并且每推一次增加1:

const inStream = new Readable({
  read(size) {
    **this.push**(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      **this.push**(null);
    }
  }
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);

当从可读流里读数据,read方法将被持续调用,我们就会推送更多的字母。我们需要停止这个循环的条件,这就是为什么会一个if语句当currentcharcode大于90(代表Z)是推送null。

这段代码相当于我们开始使用的更简单的代码,但是当用户要求时,我们正在按需推送数据。你应该经常这样做。

实现双工/转换流

有了双工流,我们可以在同一个对象上同时实现可读和可写,就好像同时继承这两个接口。

这里是一个双工流例子,结合了上面可读可写流的实现:

const { Duplex } = require('stream');

const inoutStream = new **Duplex**({
  **write**(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },

  **read**(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inoutStream.currentCharCode = 65;
**process.stdin.pipe(inoutStream).pipe(process.stdout);**

通过组合这些方法,我们可以使用这个双工流来读取从A到Z的字母,我们还可以像echo一样使用它来。把可读流stdinpipe给这个双工流来echo,再把双工流pipe给可写流stdout来打印从A到Z的字母。

重要的是双工流的可读性和可写性操作完全独立于彼此。这仅仅是将两个特性组合成一个对象。

转换流是更有趣的双工流,因为它的输出是从输入中计算出来的。

对于转换流,我们不必实现readwrite的方法,我们只需要实现一个transform方法,将两者结合起来。它有write方法的意思,我们也可以用它来push数据。

下面是一个简单的转换流,在收到任何你输入的东西,转换成大写后再返回:

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  **transform**(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

在该例子中,可以像之前使用双工流一样的去使用转换流。我们只实现了一个transform()方法,在该方法中,我们将chunk转换为大写,然后将它们push作为可读流部分。

流对象模式

默认情况下,流处理的数据是Buffer/String类型的值。有一个objectMode标志,我们可以设置它让流可以接受任何JavaScript对象。

这是一个简单的例子来实现。下面的转换流组合使一个特性将一个逗号分隔的字符串映射到一个JavaScript对象中。所以"a, b, c, d"成为{a: b, c: d}

const { Transform } = require('stream');
const commaSplitter = new Transform({
  **readableObjectMode**: true,
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().trim().split(','));
    callback();
  }
});
const arrayToObject = new Transform({
  **readableObjectMode**: true,
  **writableObjectMode**: true,
  transform(chunk, encoding, callback) {
    const obj = {};
    for(let i=0; i < chunk.length; i+=2) {
      obj[chunk[i]] = chunk[i+1];
    }
    this.push(obj);
    callback();
  }
});
const objectToString = new Transform({
  **writableObjectMode**: true,
  transform(chunk, encoding, callback) {
    this.push(JSON.stringify(chunk) + '\n');
    callback();
  }
});
process.stdin
  .pipe(commaSplitter)
  .pipe(arrayToObject)
  .pipe(objectToString)
  .pipe(process.stdout)

我们输入字符串(例如:“a,b,c,d”)通过commaSplitter后可读流数据变成[“a”, “b”, “c”, “d”]。添加readableObjectMode配置是很有必要的,因为这里传输的是对象而不是字符串。

拿到数组后把数组pipe给arrayToObject流,需要writableObjectMode设置来让流能接收一个对象。因为我们的输出也是一个对象(输入的数组会被转成对象),所以同时也需要readableObjectMode配置。最后一个流objectToString接收一个对象但是返回的是字符串,可读流部分是一个普通的字符串(字符串化的对象),所以我们只需要writableObjectMode配置。

上面例子的使用结果

Node内置的转换流

Node有一些非常有用的内置转换流,例如zlib和crypto。

以下是一个结合fs的可读和可写流,使用zlib.createGzip()流来创建压缩文件的例子:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'));

您可以使用该脚本你作为参数传入的文件gzip压缩。我们通过文件创建的可读流pipe给内置的zlib变换流然后pipe给一个新的压缩文件的可写流。是不是超级简单。

使用pipe最酷的地方是,如果需要的话,我们可以把它们与事件结合起来。例如,我希望用户在脚本运行时看到进度和脚本完成时的“已完成”消息。由于pipe方法返回目标流,所以我们也可以注册监听事件处理程序:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
 **.on('data', () => process.stdout.write('.'))**
  .pipe(fs.createWriteStream(file + '.zz'))
  **.on('finish', () => console.log('Done'));**

因此,通过pipe方法,我们可以轻松地使用流,我们仍然可以在需要的地方使用事件进一步定制与这些流的交互。

使用pipe最方便的是一块一块的组合我们的程序,这样可读性会大大提高。例如,我们可以不像上面一样去监听data事件,只是简单是创建一个转换流来报告进度,用另外的一个.pipe()来代替.on()

  标签:
        javascript
      
0
你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!