这部分的内容并不实用,这是真的,我们不太可能去自己实现流子类,而是去使用流的子类,比如 fs 、net、http等I/O相关的模块就是流的子类,这部分内容又很重要。通过对流理解,在使用具体子类时才能得心应手。

什么是流?

单刀流和双刀流?还是其他什么流派吗?god

fuck小时候在后院,最喜欢在地上挖洞,高的地方和低的地方之间,挖出一个沟渠来,把水从高的地方倒下,顺着沟渠流到低的地方,这就是流。高的点为上游,低点为下游,压力把水从上游导入下游。

在Node.js中,通过.pipe() 方法连接上下游,把上游数据导入下游。

什么时候应该使用流?

下面的例子,是当客户请求服务器资源,服务器会读取data.txt文件到内存,然后把数据再推倒客户端,如果文件很大,就会占用很多内存。更糟的是,如果高并发,性能会更差。

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
   fs.readFile(__dirname + '/data.txt', function (err, data) {
       res.end(data);
   });
});
server.listen(3000);

流可以解决这个问题,把文件流和响应流通过pipe连起来,这样上游的数据就会“缓缓的”流道客户端,不会出现大量内存被消耗的情况。在这里,用fs.createReadStream方法代替fs.readFile方法。

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
   var stream = fs.createReadStream(__dirname + '/data.txt');
   stream.pipe(res);
});
server.listen(3000);

方法,内部通过监听 stream 的dataend时间,把接收到的文件块第一时间write到客户端,节省内存。

流分类别吗?

是的,虽然不是帅气的双刀流。

在Node.js中,流分4种类别: readable可读流, writable可写流, transform转换流, duplex双工流。

pipe 管道方法

无论哪种流,都可通过pipe方法进行连接。pipe只是个函数,它接受一个可读的流作为输入源,和一个输出源的写入流。

src.pipe(target)

.pipe(target) 返回target ,所以可以多次调用pipe方法。

a.pipe(b).pipe(c).pipe(d)

readableStream 可读流

readableStream通过调用.pipe() 方法,可以把数据送入到  writable、transform、duplex 流。god

readableStream.pipe(writable / transform / duplex  Stream)

创建一个可读流

var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('jsera');
rs.push('.net');
rs.push('\n leo is me \n')
rs.push(null);

rs.pipe(process.stdout);
rs.pipe(process.stdout);

god终端结果:googog

jsera.net
leo is me

rs.push(null); 表示rs已完成输出数据。

调用.push方法后,并不能马上把数据传递到process.stdout,因为这时只是推到可读流的缓冲区,直到消费者愿意读它。

如果我们能够避免缓冲区,只生成消费者需要的数据,将是更好的方式,可以通过重写可读流的._read方法做到这一点。

下面来做个例子:

var Readable = require('stream').Readable;
var rs = Readable();

var c = 100;
rs._read = function () {
   if (c > 'z'.charCodeAt(0)){
       rs.push("\n");
       rs.push(null);
   }else{
       setTimeout(function(){
           rs.push(String.fromCharCode(c++));
       },100)
   }
};

rs.pipe(process.stdout);

结果是,缓慢的打印出:

defghijklmnopqrstuvwxyz

值得注意的一点是,rs.push(null) 表示数据已完毕,这时后面再调用 rs.push(data),就会抛出下面的异常:

Error: stream.push() after EOF

所以这里用了一个 return;

这里的setTimeout延迟,是模拟真实的情况,因为网络流都是有延迟的。

objectMode & readable事件

如果加入 Readable({ objectMode: true }) 参数创建可读流,那么调用.read(n) 时,将以内部每次调用.push(data)时的data为返回数据,而不会根据n个字节数返回数据。

重写以上程序:

var Readable = require('stream').Readable;
var rs = new Readable();

var c = 100;
rs._read = function () {
   if (c > 'z'.charCodeAt(0)){
       rs.push("\n");
       rs.push(null);
       return;
   }
   setTimeout(function () {
       rs.push(String.fromCharCode(c++));
   }, 100)

};

rs.on("readable",function(){
   console.log(rs.read(3));
})

输出结果是:

null
null
<Buffer 64 65 66>
null
null
<Buffer 67 68 69>
null
null
<Buffer 6a 6b 6c>
null
null
<Buffer 6d 6e 6f>
null
null
<Buffer 70 71 72>
null
null
<Buffer 73 74 75>
null
null
<Buffer 76 77 78>
null
<Buffer 79 7a 0a>
null

会发现,read(3) 每次返回3个字节的数据,如果不够3个字节,那么会打印出null,直到积攒到3个字节返回3个字节的数据,这时候的 objectMode 为 false。

如果我们设置 objectMode为true,这时:

var Readable = require('stream').Readable;
var rs = new Readable({objectMode:true});

var c = 100;
rs._read = function () {
   if (c > 'z'.charCodeAt(0)){
       rs.push("\n");
       rs.push(null);
       return;
   }
   setTimeout(function () {
       rs.push(String.fromCharCode(c++));
   }, 100)

};

rs.on("readable",function(){
   console.log(rs.read(3));
})

打印结果为:

d
e
f
g
h
… ...

创建 writableStream 可写流

仅需要重写 ._write 方法即可。

var Writable = require('stream').Writable;
var ws = new Writable();
ws._write = function (chunk, enc, next) {
   console.dir(chunk.toString());
   next();
};

process.stdin.pipe(ws);

是提供的数据,enc是数据编码,调用next()表示可以继续写入。大概了解一下即可。

写入数据到一个writableStream

_write方法是实现子类必须要实现的方法,而writableStream.write 方法并不是它,这点要弄清,write是用户调用的方法。

下面举例说明:

var fs = require('fs');
var ws = fs.createWriteStream('file.txt');

ws.write('hello ');

setTimeout(function () {
   ws.end('world!\n');
}, 2000);

运行后等待两秒,打开生成的file.txt 文件会出现 hello world!

这里调用了write方法,写入数据,2秒后调用end方法,把剩余语句写入后,关闭可写流。

流的内容就介绍到这里,可参看本书的 实例讲解Node.js API 的流相关内容,进一步深化理解。

在具体应用方面,留意流的使用规律,这是一个漫长的修行过程,流是不好理解的,只有去实践才能真正领悟,有任何问题可在线提问,如果哪里讲解不到位,也可提出来。