参考:

https://blog.csdn.net/eeewwwddd/article/details/81042225

http://nodejs.cn/api/stream.html#stream_writable_write_chunk_encoding_callback

 

流(stream)是 Node.js 中处理流式数据的抽象接口。 stream 模块提供了一些 API,用于构建实现了流接口的对象。

Node.js 提供了多种流对象。 例如,HTTP 服务器的请求 process.stdout 都是流的实例。

流可以是可读的、可写的、或者可读可写的。 所有的流都是 EventEmitter 的实例,即可以通过事件的监听得以触发事件并执行一定的操作,如:

  req.on('data', (chunk) => {
    body += chunk;
  });

stream 模块可以通过以下方式使用:

const stream = require('stream');

尽管理解流的工作方式很重要,但是 stream 模块本身主要用于开发者创建新类型的流实例。

对于以消费流对象为主的开发者,极少需要直接使用 stream 模块

 

Node.js 中有四种基本的流类型:

    Writable - 可写入数据的流(例如 fs.createWriteStream())。
    Readable - 可读取数据的流(例如 fs.createReadStream())。
    Duplex - 可读又可写的流(例如 net.Socket)。
    Transform - 在读写过程中可以修改或转换数据的 Duplex 流(例如 zlib.createDeflate())

 

两种模式

 二进制模式

每个分块都是buffer、string对象

对象模式

Node.js 创建的流都是运作在字符串和 Buffer(或 Uint8Array)上。 当然,流的实现也可以使用其它类型的 JavaScript 值(除了 null)。 这些流会以“对象模式”进行操作。

创建流时,可以使用 objectMode 选项把流实例切换到对象模式。 将已存在的流切换到对象模式是不安全的。

 

 比如如果想创建一个的可以压入任意形式数据的可读流,只要在创建流的时候设置参数objectModetrue即可,例如:Readable({ objectMode: true })

如果readable stream写入的是字符串,那么字符串会默认转换为Buffer,如果在创建流的时候设置Writable({ decodeStrings: false })参数,那么不会做转换。

如果readable stream写入的数据是对象,那么需要这样创建writable stream,Writable({ objectMode: true })

⚠️就是如果输入的数据并不是Buffer(或 Uint8Array格式的时候,那么在创建这个流的时候就要将其设置为对象模式,即设置其的objectMode: true,举例:

const DuplexStream = require('readable-stream').Duplex
const inherits = require('util').inherits

module.exports = PostMessageStream

inherits(PostMessageStream, DuplexStream)

function PostMessageStream (opts) {
  DuplexStream.call(this, {
    objectMode: true,
  })
...
}

 

缓冲

可写流可读流都会在内部的缓冲器中存储数据,可以分别使用的 writable.writableBuffer 或 readable.readableBuffer 来获取。

可缓冲的数据大小取决于传入流构造函数的 highWaterMark 选项。 对于普通的流,highWaterMark 指定了字节的总数。 对于对象模式的流,highWaterMark 指定了对象的总数。

当调用 stream.push(chunk) 时,数据会被缓冲在可读流中。 如果流的消费者没有调用 stream.read(),则数据会保留在内部队列中直到被消费。

一旦内部的可读缓冲的总大小达到 highWaterMark 指定的阈值时,流会暂时停止从底层资源读取数据,直到当前缓冲的数据被消费 (也就是说,流会停止调用内部的用于填充可读缓冲的 readable._read())。

当调用 writable.write(chunk) 时,数据会被缓冲在可写流中。 当内部的可写缓冲的总大小小于 highWaterMark 设置的阈值时,调用 writable.write() 会返回 true。 一旦内部缓冲的大小达到或超过 highWaterMark 时,则会返回 false

stream API 的主要目标,特别是 stream.pipe(),是为了限制数据的缓冲到可接受的程度,也就是读写速度不一致的源头与目的地不会压垮内存。

因为 Duplex 和 Transform 都是可读又可写的,所以它们各自维护着两个相互独立的内部缓冲器用于读取和写入, 这使得它们在维护数据流时,读取和写入两边可以各自独立地运作。 例如,net.Socket 实例是 Duplex 流,它的可读端可以消费从 socket 接收的数据,而可写端则可以将数据写入到 socket。 因为数据写入到 socket 的速度可能比接收数据的速度快或者慢,所以在读写两端独立地进行操作(或缓冲)就显得很重要了。

 

【1】用于消费流的 API(即读取流中数据)

test.js

const http = require('http');

const server = http.createServer((req, res) => {
  // req 是一个 http.IncomingMessage 实例,它是可读流// res 是一个 http.ServerResponse 实例,它是可写流

  let body = '';
  // 接收数据为 utf8 字符串,
  // 如果没有设置字符编码,则会接收到 Buffer 对象。
  req.setEncoding('utf8');

  // 如果添加了监听器,则可读流会触发 'data' 事件。
  req.on('data', (chunk) => {
    body += chunk;
  });

  // 'end' 事件表明整个请求体已被接收。 
  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // 响应信息给用户。
      res.write(typeof data);
      res.end();//end()表示写结束
    } catch (er) {
      // json 解析失败。
      res.statusCode = 400;
      return res.end(`错误: ${er.message}`);
    }
  });
});

server.listen(1337);

然后在终端使用node test.js运行该服务器

然后在另一个终端使用curl localhost:1337 -d "{}" 连接服务器localhost:1337 ,-d即post数据data为{} ,返回object

curl localhost:1337 -d "{}"       返回object
curl localhost:1337 -d "\"foo\""  返回string
curl localhost:1337 -d "not json" 返回 错误: Unexpected token o in JSON at position 1

 

可写流(比如例子中的 res)会暴露了一些方法,比如 write() 和 end() 用于写入数据到流。

当数据可以从流读取时,可读流会使用 EventEmitter API 来通知应用程序。 从流读取数据的方式有很多种。

可写流可读流都通过多种方式使用 EventEmitter API 来通讯流的当前状态。

Duplex 流和 Transform 流都是可写又可读的。

对于只需写入数据到流或从流消费数据的应用程序,并不需要直接实现流的接口,通常也不需要调用 require('stream')

 

《1》可写流

可写流是对数据要被写入的目的地的一种抽象。

可写流的例子包括:

上面的一些例子事实上是实现了可写流接口的 Duplex 流。

所有可写流都实现了 stream.Writable 类定义的接口。

尽管可写流的具体实例可能略有差别,但所有的可写流都遵循同一基本的使用模式,如以下例子所示:

const myStream = getWritableStreamSomehow();
myStream.write('一些数据');
myStream.write('更多数据');
myStream.end('完成写入数据');//说明完成写入

 

stream.Writable 类

下面介绍几类事件:

'close' 事件

当流或其底层资源(比如文件描述符)被关闭时触发。 表明不会再触发其他事件,也不会再发生操作。

不是所有可写流都会触发 'close' 事件。

'drain' 事件

如果调用 stream.write(chunk) 返回 false,可能缓冲区已满,需要等待,则当有空间可以继续写入数据到流时会触发 'drain' 事件。

// 向可写流中写入数据一百万次。
// 留意背压(back-pressure)。
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // 最后一次写入。
        writer.write(data, encoding, callback);
      } else {
        // 检查是否可以继续写入。 
        // 不要传入回调,因为写入还没有结束。
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // 被提前中止。
      // 当触发 'drain' 事件时继续写入,继续运行write()函数。
      writer.once('drain', write);
    }
  }
}
'error' 事件

当写入数据发生错误时触发。

当触发 'error' 事件时,流还未被关闭

'finish' 事件

调用 stream.end() 且缓冲数据都已传给底层系统之后触发。

const http = require('http');

const server = http.createServer((req, res) => {
  // req 是一个 http.IncomingMessage 实例,它是可读流。
  // res 是一个 http.ServerResponse 实例,它是可写流。

  let body = '';
  // 接收数据为 utf8 字符串,
  // 如果没有设置字符编码,则会接收到 Buffer 对象。
  req.setEncoding('utf8');

  // 如果添加了监听器,则可读流会触发 'data' 事件。
  req.on('data', (chunk) => {
    body += chunk;
  });

  // 'end' 事件表明整个请求体已被接收。 
  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // 响应信息给用户。
      res.write(typeof data);
      res.end();//会触发finish事件
      res.on('finish', () => {
          console.error('写入已完成');
      });
    } catch (er) {
      // json 解析失败。
      res.statusCode = 400;
      return res.end(`错误: ${er.message}`);
    }
  });
});

server.listen(1337);

运行结果:

nodejs-stream部分

'pipe' 事件

当在可读流上调用 stream.pipe() 时触发。

var assert = require('assert');
const writer = process.stdout;
const reader = process.stdin;
writer.on('pipe', (src) => {
  console.error('有数据正通过管道流入写入器');
  assert.equal(src,reader);//两者相等
  console.log(src);
});
reader.pipe(writer);

返回:

有数据正通过管道流入写入器
ReadStream {
  connecting: false,
  _hadError: false,
  _handle:
   TTY { owner: [Circular], onread: [Function: onread], reading: false },
  _parent: null,
  _host: null,
  _readableState:
   ReadableState {
     objectMode: false,//非对象模式
     highWaterMark: 0,
     buffer: BufferList { length: 0 },
     length: 0,
     pipes:
      WriteStream {
        connecting: false,
        _hadError: false,
        _handle: [TTY],
        _parent: null,
        _host: null,
        _readableState: [ReadableState],
        readable: false,
        _events: [Object],
        _eventsCount: 7,
        _maxListeners: undefined,
        _writableState: [WritableState],
        writable: true,
        allowHalfOpen: false,
        _sockname: null,
        _writev: null,
        _pendingData: null,
        _pendingEncoding: '',
        server: null,
        _server: null,
        columns: 80,
        rows: 24,
        _type: 'tty',
        fd: 1,
        _isStdio: true,
        destroySoon: [Function: destroy],
        _destroy: [Function],
        [Symbol(asyncId)]: 2,
        [Symbol(lastWriteQueueSize)]: 0,
        [Symbol(timeout)]: null,
        [Symbol(kBytesRead)]: 0,
        [Symbol(kBytesWritten)]: 0 },
     pipesCount: 1,
     flowing: true,
     ended: false,
     endEmitted: false,
     reading: false,
     sync: false,
     needReadable: true,
     emittedReadable: false,
     readableListening: false,
     resumeScheduled: true,
     emitClose: false,
     destroyed: false,
     defaultEncoding: 'utf8',
     awaitDrain: 0,
     readingMore: false,
     decoder: null,
     encoding: null },
  readable: true,
  _events:
   { end: [ [Function: onReadableStreamEnd], [Function] ],
     pause: [Function],
     data: [Function: ondata] },
  _eventsCount: 3,
  _maxListeners: undefined,
  _writableState:
   WritableState {
     objectMode: false,
     highWaterMark: 0,
     finalCalled: false,
     needDrain: false,
     ending: false,
     ended: false,
     finished: false,
     destroyed: false,
     decodeStrings: false,
     defaultEncoding: 'utf8',
     length: 0,
     writing: false,
     corked: 0,
     sync: true,
     bufferProcessing: false,
     onwrite: [Function: bound onwrite],
     writecb: null,
     writelen: 0,
     bufferedRequest: null,
     lastBufferedRequest: null,
     pendingcb: 0,
     prefinished: false,
     errorEmitted: false,
     emitClose: false,
     bufferedRequestCount: 0,
     corkedRequestsFree:
      { next: null,
        entry: null,
        finish: [Function: bound onCorkedFinish] } },
  writable: false,
  allowHalfOpen: false,
  _sockname: null,
  _writev: null,
  _pendingData: null,
  _pendingEncoding: '',
  server: null,
  _server: null,
  isRaw: false,
  isTTY: true,
  fd: 0,
  [Symbol(asyncId)]: 5,
  [Symbol(lastWriteQueueSize)]: 0,
  [Symbol(timeout)]: null,
  [Symbol(kBytesRead)]: 0,
  [Symbol(kBytesWritten)]: 0 }
View Code

相关文章: