参考:
https://blog.csdn.net/eeewwwddd/article/details/81042225
http://nodejs.cn/api/stream.html#stream_writable_write_chunk_encoding_callback
流(stream)是 Node.js 中处理流式数据的抽象接口。 stream 模块提供了一些 API,用于构建实现了流接口的对象。
Node.js 提供了多种流对象。 例如,HTTP 服务器的请求和 process.stdout 都是流的实例。
流可以是可读的、可写的、或者可读可写的。 所有的流都是 EventEmitter 的实例,即可以通过事件的监听得以触发事件并执行一定的操作,如:
req.on('data', (chunk) => { body += chunk; });
stream 模块可以通过以下方式使用:
const stream = require('stream');
尽管理解流的工作方式很重要,但是 stream 模块本身主要用于开发者创建新类型的流实例。
对于以消费流对象为主的开发者,极少需要直接使用 stream 模块。
Node.js 中有四种基本的流类型:
Writable - 可写入数据的流(例如 fs.createWriteStream())。 Readable - 可读取数据的流(例如 fs.createReadStream())。 Duplex - 可读又可写的流(例如 net.Socket)。 Transform - 在读写过程中可以修改或转换数据的 Duplex 流(例如 zlib.createDeflate())
两种模式
二进制模式
每个分块都是buffer、string对象
对象模式
Node.js 创建的流都是运作在字符串和 Buffer(或 Uint8Array)上。 当然,流的实现也可以使用其它类型的 JavaScript 值(除了 null)。 这些流会以“对象模式”进行操作。
当创建流时,可以使用 objectMode 选项把流实例切换到对象模式。 将已存在的流切换到对象模式是不安全的。
比如如果想创建一个的可以压入任意形式数据的可读流,只要在创建流的时候设置参数objectMode为true即可,例如:Readable({ objectMode: true })
如果readable stream写入的是字符串,那么字符串会默认转换为Buffer,如果在创建流的时候设置Writable({ decodeStrings: false })参数,那么不会做转换。
如果readable stream写入的数据是对象,那么需要这样创建writable stream,Writable({ objectMode: true })
⚠️就是如果输入的数据并不是Buffer(或 Uint8Array)格式的时候,那么在创建这个流的时候就要将其设置为对象模式,即设置其的objectMode: true,举例:
const DuplexStream = require('readable-stream').Duplex const inherits = require('util').inherits module.exports = PostMessageStream inherits(PostMessageStream, DuplexStream) function PostMessageStream (opts) { DuplexStream.call(this, { objectMode: true, }) ... }
缓冲
可写流和可读流都会在内部的缓冲器中存储数据,可以分别使用的 writable.writableBuffer 或 readable.readableBuffer 来获取。
可缓冲的数据大小取决于传入流构造函数的 highWaterMark 选项。 对于普通的流,highWaterMark 指定了字节的总数。 对于对象模式的流,highWaterMark 指定了对象的总数。
当调用 stream.push(chunk) 时,数据会被缓冲在可读流中。 如果流的消费者没有调用 stream.read(),则数据会保留在内部队列中直到被消费。
一旦内部的可读缓冲的总大小达到 highWaterMark 指定的阈值时,流会暂时停止从底层资源读取数据,直到当前缓冲的数据被消费 (也就是说,流会停止调用内部的用于填充可读缓冲的 readable._read())。
当调用 writable.write(chunk) 时,数据会被缓冲在可写流中。 当内部的可写缓冲的总大小小于 highWaterMark 设置的阈值时,调用 writable.write() 会返回 true。 一旦内部缓冲的大小达到或超过 highWaterMark 时,则会返回 false。
stream API 的主要目标,特别是 stream.pipe(),是为了限制数据的缓冲到可接受的程度,也就是读写速度不一致的源头与目的地不会压垮内存。
因为 Duplex 和 Transform 都是可读又可写的,所以它们各自维护着两个相互独立的内部缓冲器用于读取和写入, 这使得它们在维护数据流时,读取和写入两边可以各自独立地运作。 例如,net.Socket 实例是 Duplex 流,它的可读端可以消费从 socket 接收的数据,而可写端则可以将数据写入到 socket。 因为数据写入到 socket 的速度可能比接收数据的速度快或者慢,所以在读写两端独立地进行操作(或缓冲)就显得很重要了。
【1】用于消费流的 API(即读取流中数据)
test.js
const http = require('http'); const server = http.createServer((req, res) => { // req 是一个 http.IncomingMessage 实例,它是可读流。 // res 是一个 http.ServerResponse 实例,它是可写流。 let body = ''; // 接收数据为 utf8 字符串, // 如果没有设置字符编码,则会接收到 Buffer 对象。 req.setEncoding('utf8'); // 如果添加了监听器,则可读流会触发 'data' 事件。 req.on('data', (chunk) => { body += chunk; }); // 'end' 事件表明整个请求体已被接收。 req.on('end', () => { try { const data = JSON.parse(body); // 响应信息给用户。 res.write(typeof data); res.end();//end()表示写结束 } catch (er) { // json 解析失败。 res.statusCode = 400; return res.end(`错误: ${er.message}`); } }); }); server.listen(1337);
然后在终端使用node test.js运行该服务器
然后在另一个终端使用curl localhost:1337 -d "{}" 连接服务器localhost:1337 ,-d即post数据data为{} ,返回object
curl localhost:1337 -d "{}" 返回object curl localhost:1337 -d "\"foo\"" 返回string curl localhost:1337 -d "not json" 返回 错误: Unexpected token o in JSON at position 1
可写流(比如例子中的 res)会暴露了一些方法,比如 write() 和 end() 用于写入数据到流。
当数据可以从流读取时,可读流会使用 EventEmitter API 来通知应用程序。 从流读取数据的方式有很多种。
可写流和可读流都通过多种方式使用 EventEmitter API 来通讯流的当前状态。
Duplex 流和 Transform 流都是可写又可读的。
对于只需写入数据到流或从流消费数据的应用程序,并不需要直接实现流的接口,通常也不需要调用 require('stream')
《1》可写流
可写流是对数据要被写入的目的地的一种抽象。
可写流的例子包括:
- 客户端的 HTTP 请求
- 服务器的 HTTP 响应
- fs 的写入流
- zlib 流
- crypto 流
- TCP socket
- 子进程 stdin
-
process.stdout、process.stderr
上面的一些例子事实上是实现了可写流接口的 Duplex 流。
所有可写流都实现了 stream.Writable 类定义的接口。
尽管可写流的具体实例可能略有差别,但所有的可写流都遵循同一基本的使用模式,如以下例子所示:
const myStream = getWritableStreamSomehow(); myStream.write('一些数据'); myStream.write('更多数据'); myStream.end('完成写入数据');//说明完成写入
stream.Writable 类
下面介绍几类事件:
'close' 事件
当流或其底层资源(比如文件描述符)被关闭时触发。 表明不会再触发其他事件,也不会再发生操作。
不是所有可写流都会触发 'close' 事件。
'drain' 事件
如果调用 stream.write(chunk) 返回 false,可能缓冲区已满,需要等待,则当有空间可以继续写入数据到流时会触发 'drain' 事件。
// 向可写流中写入数据一百万次。 // 留意背压(back-pressure)。 function writeOneMillionTimes(writer, data, encoding, callback) { let i = 1000000; write(); function write() { let ok = true; do { i--; if (i === 0) { // 最后一次写入。 writer.write(data, encoding, callback); } else { // 检查是否可以继续写入。 // 不要传入回调,因为写入还没有结束。 ok = writer.write(data, encoding); } } while (i > 0 && ok); if (i > 0) { // 被提前中止。 // 当触发 'drain' 事件时继续写入,继续运行write()函数。 writer.once('drain', write); } } }
'error' 事件
当写入数据发生错误时触发。
当触发 'error' 事件时,流还未被关闭
'finish' 事件
调用 stream.end() 且缓冲数据都已传给底层系统之后触发。
const http = require('http'); const server = http.createServer((req, res) => { // req 是一个 http.IncomingMessage 实例,它是可读流。 // res 是一个 http.ServerResponse 实例,它是可写流。 let body = ''; // 接收数据为 utf8 字符串, // 如果没有设置字符编码,则会接收到 Buffer 对象。 req.setEncoding('utf8'); // 如果添加了监听器,则可读流会触发 'data' 事件。 req.on('data', (chunk) => { body += chunk; }); // 'end' 事件表明整个请求体已被接收。 req.on('end', () => { try { const data = JSON.parse(body); // 响应信息给用户。 res.write(typeof data); res.end();//会触发finish事件 res.on('finish', () => { console.error('写入已完成'); }); } catch (er) { // json 解析失败。 res.statusCode = 400; return res.end(`错误: ${er.message}`); } }); }); server.listen(1337);
运行结果:
'pipe' 事件
-
src<stream.Readable> 通过管道流入到可写流的来源流。
当在可读流上调用 stream.pipe() 时触发。
var assert = require('assert'); const writer = process.stdout; const reader = process.stdin; writer.on('pipe', (src) => { console.error('有数据正通过管道流入写入器'); assert.equal(src,reader);//两者相等 console.log(src); }); reader.pipe(writer);
返回:
有数据正通过管道流入写入器 ReadStream { connecting: false, _hadError: false, _handle: TTY { owner: [Circular], onread: [Function: onread], reading: false }, _parent: null, _host: null, _readableState: ReadableState { objectMode: false,//非对象模式 highWaterMark: 0, buffer: BufferList { length: 0 }, length: 0, pipes: WriteStream { connecting: false, _hadError: false, _handle: [TTY], _parent: null, _host: null, _readableState: [ReadableState], readable: false, _events: [Object], _eventsCount: 7, _maxListeners: undefined, _writableState: [WritableState], writable: true, allowHalfOpen: false, _sockname: null, _writev: null, _pendingData: null, _pendingEncoding: '', server: null, _server: null, columns: 80, rows: 24, _type: 'tty', fd: 1, _isStdio: true, destroySoon: [Function: destroy], _destroy: [Function], [Symbol(asyncId)]: 2, [Symbol(lastWriteQueueSize)]: 0, [Symbol(timeout)]: null, [Symbol(kBytesRead)]: 0, [Symbol(kBytesWritten)]: 0 }, pipesCount: 1, flowing: true, ended: false, endEmitted: false, reading: false, sync: false, needReadable: true, emittedReadable: false, readableListening: false, resumeScheduled: true, emitClose: false, destroyed: false, defaultEncoding: 'utf8', awaitDrain: 0, readingMore: false, decoder: null, encoding: null }, readable: true, _events: { end: [ [Function: onReadableStreamEnd], [Function] ], pause: [Function], data: [Function: ondata] }, _eventsCount: 3, _maxListeners: undefined, _writableState: WritableState { objectMode: false, highWaterMark: 0, finalCalled: false, needDrain: false, ending: false, ended: false, finished: false, destroyed: false, decodeStrings: false, defaultEncoding: 'utf8', length: 0, writing: false, corked: 0, sync: true, bufferProcessing: false, onwrite: [Function: bound onwrite], writecb: null, writelen: 0, bufferedRequest: null, lastBufferedRequest: null, pendingcb: 0, prefinished: false, errorEmitted: false, emitClose: false, bufferedRequestCount: 0, corkedRequestsFree: { next: null, entry: null, finish: [Function: bound onCorkedFinish] } }, writable: false, allowHalfOpen: false, _sockname: null, _writev: null, _pendingData: null, _pendingEncoding: '', server: null, _server: null, isRaw: false, isTTY: true, fd: 0, [Symbol(asyncId)]: 5, [Symbol(lastWriteQueueSize)]: 0, [Symbol(timeout)]: null, [Symbol(kBytesRead)]: 0, [Symbol(kBytesWritten)]: 0 }