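I read the update to your question and realized that the comment I left on it was completely off the mark. Since you are using streams, you don't want to wait for all the data, because that could exhaust memory. I should have noticed that from the beginning.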
Let me give you some examples by way of apology. I hope they help you understand how to use streams.
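To make the examples more realistic, let's simulate fetching JSON from a remote server, as node-fetch does. With node-fetch, the response body is a readable stream, which is also an async iterable. We can easily create one by passing the async iterable returned by an async generator function to stream.Readable.from(), as shown below.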
Definition of fetch()
const stream = require('stream');

// Emits one chunk per second, like a slow network connection.
async function* asyncGenerator (chunks) {
  let counter = 1;
  for (const chunk of chunks) {
    await new Promise(resolve => setTimeout(resolve, 1000));
    console.log(`==== chunk ${counter++} transmitted =====================`);
    yield chunk;
  }
}

// simulates node-fetch: takes an array of JSON chunks instead of a URL
async function fetch (json) {
  const asyncIterable = asyncGenerator(json);
  // let the client wait for 0.5 sec.
  await new Promise(resolve => setTimeout(resolve, 500));
  // returns the response object (an async function already wraps it in a Promise)
  return { body: stream.Readable.from(asyncIterable) };
}
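fetch() takes 0.5 seconds to get the response object. It returns a Promise that resolves to an object whose body is a ReadableStream. This readable stream sends one chunk of JSON data downstream every second, as defined in asyncGenerator().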
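Our fetch() takes an array of JSON chunks as its argument instead of a URL. Let's use the JSON you provided, but split it at slightly different places so that the second chunk completes two objects at once.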
const chunkedJson = [
// chunk 1
`[
{
"name": "John Doe",
"occupation": "gardener",
"born": "1992-03-02"
}
,
{
"name": "Brian Flem`,
// chunk 2
`ming",
"occupation": "teacher",
"born": "1967-11-22"
}
,
{
"name": "Lucy Black",
"occupation": "accountant",
"born": "1995-04-07"
}`,
// chunk 3
`,
{
"name": "William Bean",
"occupation": "pilot",
"born": "1977`,
// chunk 4
`-10-31"
}
]`
];
Now, with this data, you can confirm that fetch() works as follows.
Example 1: Testing fetch()
async function example1 () {
  const response = await fetch(chunkedJson);
  for await (const chunk of response.body) {
    console.log(chunk);
  }
}
example1();
console.log("==== Example 1 Started ==============");
Output of Example 1:
==== Example 1 Started ==============
==== chunk 1 transmitted =====================
[
{
"name": "John Doe",
"occupation": "gardener",
"born": "1992-03-02"
}
,
{
"name": "Brian Flem
==== chunk 2 transmitted =====================
ming",
"occupation": "teacher",
"born": "1967-11-22"
}
,
{
"name": "Lucy Black",
"occupation": "accountant",
"born": "1995-04-07"
}
==== chunk 3 transmitted =====================
,
{
"name": "William Bean",
"occupation": "pilot",
"born": "1977
==== chunk 4 transmitted =====================
-10-31"
}
]
Now, let's process each element of this JSON data as it arrives instead of waiting for all of it.
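StreamArray is a subclass of stream.Transform, so it has both the ReadableStream and the WritableStream interfaces. When stream instances are connected with pipe(), you don't have to worry about backpressure. So in Example 2 below we pipe two streams together, the ReadableStream obtained from fetch() and a StreamArray instance, as response.body.pipe(StreamArray.withParser()).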
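pipe() returns its destination to allow method chaining, so response.body.pipe(StreamArray.withParser()) returns the stream created by StreamArray.withParser() (a JSON parser connected to a StreamArray). The pipeline variable therefore holds a reference to a transform stream, which is also a readable stream, and we can attach event listeners to it to consume the transformed data.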
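StreamArray emits a data event each time it finishes parsing a single object from the readable source. So pipeline.on('data', callback) processes the objects one by one, without waiting for the whole JSON data.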
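To make "one data event per parsed element" more concrete, here is a deliberately naive sketch of what such a streamer has to do. NaiveArrayStreamer is a hypothetical class built only on Node.js core modules. It is not how stream-json is implemented. It assumes the input is a single top-level JSON array whose elements are all objects, and it only tracks the { } nesting depth (ignoring braces inside strings) to notice when an element is complete. Like StreamArray, it pushes { key, value } objects in object mode, even when an element is split across chunks.

```javascript
const { Readable, Transform } = require('stream');
const { StringDecoder } = require('string_decoder');

// Hypothetical, simplified stand-in for StreamArray.withParser(); NOT stream-json's code.
// Assumption: the input is one top-level JSON array whose elements are all objects.
class NaiveArrayStreamer extends Transform {
  constructor() {
    super({ readableObjectMode: true }); // text in, { key, value } objects out
    this.decoder = new StringDecoder('utf8'); // keeps multi-byte characters intact across chunks
    this.buffer = '';      // text of the element currently being collected
    this.depth = 0;        // { } nesting depth inside the array
    this.inString = false; // inside a JSON string literal?
    this.escaped = false;  // previous character was a backslash inside a string?
    this.key = 0;          // index of the next element, like StreamArray's key
  }

  _transform(chunk, encoding, callback) {
    for (const ch of this.decoder.write(chunk)) {
      if (this.depth > 0) this.buffer += ch;
      if (this.inString) {
        // Braces inside strings must not change the depth.
        if (this.escaped) this.escaped = false;
        else if (ch === '\\') this.escaped = true;
        else if (ch === '"') this.inString = false;
      } else if (ch === '"') {
        this.inString = true;
      } else if (ch === '{') {
        if (this.depth === 0) this.buffer = '{';
        this.depth++;
      } else if (ch === '}') {
        this.depth--;
        if (this.depth === 0) {
          // One element is complete: emit it without waiting for the rest of the array.
          let value;
          try {
            value = JSON.parse(this.buffer);
          } catch (err) {
            return callback(err);
          }
          this.push({ key: this.key++, value });
          this.buffer = '';
        }
      }
    }
    callback();
  }
}

// Usage: the second element is split across two chunks, as in chunkedJson.
Readable.from(['[{"name": "John Doe"}, {"name": "Brian Flem', 'ming"}]'])
  .pipe(new NaiveArrayStreamer())
  .on('data', ({ key, value }) => console.log(key, value));
```

A real streaming parser such as stream-json tokenizes the input properly and handles any JSON value. This sketch only shows why each object can be emitted as soon as its closing brace arrives.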
The stream starts flowing as soon as a listener is registered for the data event with pipeline.on('data', callback).
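Here is a quick way to observe this with core streams only, where Readable.from() over a small array stands in for the StreamArray pipeline. readableFlowing stays null until a consumer is attached, and it becomes true as soon as a 'data' listener is registered.

```javascript
const { Readable } = require('stream');

// Readable.from() creates the stream in paused mode; nothing is consumed yet.
const source = Readable.from(['a', 'b', 'c']);
console.log(source.readableFlowing); // null: no consumer attached

const received = [];
source.on('data', (item) => received.push(item)); // attaching 'data' starts the flow
console.log(source.readableFlowing); // true

source.on('end', () => console.log(received)); // [ 'a', 'b', 'c' ]
```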
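Since we simulate fetching the data asynchronously, you can see !!!! MAIN THREAD !!!! in the console while the data is being transmitted. This confirms that the main thread is not blocked while it waits for the parsed data.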
Example 2: Testing stream-json by processing each array element one by one as it arrives
const StreamArray = require('stream-json/streamers/StreamArray');

async function example2 () {
  const response = await fetch(chunkedJson);
  const pipeline = response.body.pipe(StreamArray.withParser());
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
  pipeline.on('data', ({ key, value }) => {
    console.log("====== stream-json StreamArray() RESULT ========");
    console.log(value); // do your data processing here
  }).on('close', () => {
    clearInterval(timer); // stop the main thread console.log
  });
}
example2();
console.log("==== Example 2 Started ==============");
Output of Example 2:
==== Example 2 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
====== stream-json StreamArray() RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 4 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }
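Since all streams are instances of EventEmitter, you can simply attach a callback to the data event to consume the final data, as in Example 2. However, it is better to use pipe() even for the final consumption of the data, because pipe() handles backpressure.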
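Backpressure becomes an issue when the downstream consumes data more slowly than the upstream supplies it. For example, if your data processing takes time, you may want to process each chunk asynchronously. If the processing of a chunk finishes before the processing of the chunk before it, the later chunk is pushed downstream first. That causes trouble if the downstream needs the earlier chunk before it can handle the later one.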
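When you use event listeners, you have to pause and resume the stream manually to deal with backpressure (see this as an example). If you connect streams with pipe(), backpressure is handled internally: pipe() automatically pauses the upstream when the downstream cannot keep up.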
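So let's create our own WritableStream and connect it to StreamArray with pipe(). We receive JavaScript objects from the upstream (StreamArray), not strings or Buffers, so we have to set objectMode to true. We override the _write() method, which is called internally from write(). Put all your data processing logic there and call callback() when it is done. The Writable does not call _write() with the next object until the callback has been called, and pipe() pauses the upstream once the Writable's internal buffer fills up.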
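This guarantee can be observed in isolation. TrackingWriter is a hypothetical object-mode Writable that counts how many _write() calls are in progress at the same time. Even with uneven, asynchronous processing times, Node.js does not call _write() again until the previous callback() has run. At most one call is in flight, and the objects finish in the order they were written.

```javascript
const { Writable } = require('stream');

// Hypothetical Writable that records how many _write() calls overlap.
class TrackingWriter extends Writable {
  constructor() {
    super({ objectMode: true }); // accept plain JavaScript objects
    this.inFlight = 0;    // _write() calls that have not called back yet
    this.maxInFlight = 0; // highest overlap observed
    this.finished = [];   // keys in the order their processing completed
  }

  _write(item, encoding, callback) {
    this.inFlight++;
    this.maxInFlight = Math.max(this.maxInFlight, this.inFlight);
    // Even keys take longer, so any out-of-order completion would show up here.
    setTimeout(() => {
      this.finished.push(item.key);
      this.inFlight--;
      callback(); // only now is the next buffered object handed to _write()
    }, item.key % 2 === 0 ? 50 : 0);
  }
}

const writer = new TrackingWriter();
writer.on('finish', () => {
  console.log(writer.maxInFlight); // 1
  console.log(writer.finished);    // [ 0, 1, 2, 3 ]
});
for (let key = 0; key < 4; key++) writer.write({ key });
writer.end();
```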
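To simulate backpressure in Example 3, processing takes 1.5 seconds for the first and third objects (keys 0 and 2) and zero seconds for the second and fourth objects (keys 1 and 3).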
Example 3: Piping into our own stream instance
class MyObjectConsumerStream extends stream.Writable {
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  _write(chunk, encoding, callback) {
    const { key, value } = chunk; // receive from StreamArray of stream-json
    console.log("===== started to processing the chunk ........... ");
    setTimeout(() => {
      console.log("====== Example 3 RESULT ========");
      console.log(value); // do your data processing here
      callback(); // the next object is not written until callback is called
    }, key % 2 === 0 ? 1500 : 0); // keys 0 and 2 take 1.5 sec; the 2nd and 4th objects take 0 sec
  }
}

//--- Example 3: We write our own WritableStream to consume chunked data ------
async function example3 () {
  const response = await fetch(chunkedJson);
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
  response.body
    .pipe(StreamArray.withParser())
    .pipe(new MyObjectConsumerStream())
    .on('finish', () => {
      clearInterval(timer); // stop the main thread console.log
    });
}
example3();
console.log("==== Example 3 Started ==============");
Output of Example 3:
==== Example 3 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
===== started to processing the chunk ...........
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
==== chunk 4 transmitted =====================
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }
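You can confirm that the data is processed in order. You can also see that the second chunk is transmitted while the first object is still being processed, because we set its processing time to 1.5 seconds. Now let's do the same thing with an event listener, as follows.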
Example 4: The backpressure problem with a simple callback
async function example4 () {
  const response = await fetch(chunkedJson);
  const pipeline = response.body.pipe(StreamArray.withParser());
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
  pipeline.on('data', ({ key, value }) => {
    console.log("===== started to processing the chunk ........... ");
    setTimeout(() => {
      console.log(`====== Example 4 RESULT ========`);
      console.log(value); // do your data processing here
    }, key % 2 === 0 ? 1500 : 0); // keys 0 and 2 take 1.5 sec; the 2nd and 4th objects take 0 sec
  }).on('close', () => {
    clearInterval(timer); // stop the main thread console.log
  });
}
example4();
console.log("==== Example 4 Started ==============");
Output of Example 4:
==== Example 4 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
===== started to processing the chunk ...........
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
====== Example 4 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
====== Example 4 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
!!!! MAIN THREAD !!!!
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 4 RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
!!!! MAIN THREAD !!!!
==== chunk 4 transmitted =====================
===== started to processing the chunk ...........
====== Example 4 RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }
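Now we see that the second element, "Brian", is processed before "John". If the processing time of the first and third objects were increased to 3 seconds, the last element, "William", would also be processed before the third element, "Lucy".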
So when the order of the data matters, it is good practice to consume it with pipe() rather than with event listeners.
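Readable streams are also async iterables, so for await is another way to keep the order. It is not what the examples in this answer use. The loop only asks the stream for the next object after the loop body has finished, so awaiting the processing inside the loop handles one object at a time. Here is a minimal sketch. Readable.from() stands in for the StreamArray pipeline, and processItem() is a hypothetical slow consumer.

```javascript
const { Readable } = require('stream');

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical slow consumer: even keys take longer, as in Example 4.
async function processItem({ key, value }) {
  await sleep(key % 2 === 0 ? 100 : 0);
  return value.name;
}

async function consume(source) {
  const names = [];
  for await (const item of source) {
    // The next item is not requested until this await resolves.
    names.push(await processItem(item));
  }
  return names;
}

const source = Readable.from([
  { key: 0, value: { name: 'John Doe' } },
  { key: 1, value: { name: 'Brian Flemming' } },
  { key: 2, value: { name: 'Lucy Black' } },
  { key: 3, value: { name: 'William Bean' } },
]);
consume(source).then((names) => console.log(names)); // the four names in their original order
```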
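You may wonder why the sample code in the API doc uses its own chain() function to build the pipeline. A pipeline builder is the recommended design pattern for error handling in Node.js stream programming, because pipe() does not forward errors: an error thrown downstream is not propagated upstream. With plain pipe() you have to attach a callback to every stream in the pipeline, as follows (here we assume three streams a, b and c).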
a.on('error', callbackForA)
  .pipe(b).on('error', callbackForB)
  .pipe(c).on('error', callbackForC);
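This looks cumbersome compared with a Promise chain, where you can simply add .catch() at the end. And even with all these error handlers in place, it is still not enough.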
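When a downstream stream throws an error, it is detached from the pipeline with unpipe(), but the upstream is not destroyed automatically. This is because several streams may be connected to the same upstream to branch the flow. So when you use pipe(), you have to close all the upstream streams yourself in each error handler.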
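Here is a small demonstration of this behavior with core streams only. Both streams are hypothetical stand-ins. The downstream fails on the first object, pipe() unpipes the source, and the source stays alive until the error handler destroys it.

```javascript
const { Readable, Writable } = require('stream');

// A source that never ends on its own, standing in for a long-running upstream.
const source = new Readable({ objectMode: true, read() {} });
source.push('a');

// Hypothetical downstream that fails on the first object it receives.
const failingSink = new Writable({
  objectMode: true,
  write(item, encoding, callback) {
    callback(new Error(`cannot handle ${item}`));
  },
});

source.pipe(failingSink);

failingSink.on('error', (err) => {
  console.log(err.message);      // cannot handle a
  console.log(source.destroyed); // false: pipe() only unpiped the source
  source.destroy();              // with pipe(), cleaning up the upstream is our job
  console.log(source.destroyed); // true
});
```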
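To solve these problems, the community provides pipeline-building libraries, and I think chain() from stream-chain is one of them. Since Node.js v10, stream.pipeline has also been available for this purpose. We can use this official pipeline builder, because all the streams in stream-json are subclasses of the regular stream classes.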
Before showing how to use stream.pipeline, let's modify the MyObjectConsumerStream class so that it throws an error when processing the third object (key 2).
Custom stream that throws an error
class MyErrorStream extends MyObjectConsumerStream {
  _write(chunk, encoding, callback) {
    const { key, value } = chunk; // receive from StreamArray of stream-json
    console.log("===== started to processing the chunk ........... ");
    if (key === 2) {
      // NOTE: the Node.js docs ask _write() to report errors with callback(error);
      // a synchronous throw like this one is not guaranteed to reach pipeline().
      throw new Error("Error in key 2");
    }
    setTimeout(() => {
      console.log("====== Example 5 RESULT ========");
      console.log(value); // do your data processing here
      callback(); // the next object is not written until callback is called
    }, key % 2 === 0 ? 1500 : 0); // keys 0 and 2 take 1.5 sec; the 2nd and 4th objects take 0 sec
  }
}
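stream.pipeline takes the streams in order, followed by an error handler as the last argument. The handler receives an Error instance if something fails, and no error (a falsy value) when the pipeline completes successfully.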
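Besides the final callback, newer Node.js versions (15 and later) also provide a promise-returning pipeline() in the stream/promises module. It gives you the single .catch()-style error handling that plain pipe() lacks. Here is a minimal sketch with hypothetical core-stream stages. A Transform rejects the value 3 by passing an error to its callback.

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises'); // promise form, Node.js 15+

async function run(values) {
  const collected = [];
  const source = Readable.from(values);
  // Hypothetical transform that rejects the value 3 by passing an error to callback().
  const check = new Transform({
    objectMode: true,
    transform(n, encoding, callback) {
      if (n === 3) return callback(new Error(`bad value ${n}`));
      callback(null, n * 10);
    },
  });
  // Hypothetical sink that just collects what reaches it.
  const sink = new Writable({
    objectMode: true,
    write(n, encoding, callback) {
      collected.push(n);
      callback();
    },
  });

  try {
    await pipeline(source, check, sink); // one try/catch covers every stage
    return { ok: true, collected };
  } catch (err) {
    // By now pipeline() has torn the streams down, including the upstream source.
    return { ok: false, message: err.message, sourceDestroyed: source.destroyed };
  }
}

run([1, 2, 4]).then(console.log);    // succeeds: collected holds 10, 20, 40
run([1, 2, 3, 4]).then(console.log); // fails with "bad value 3"
```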
Example 5: Using stream.pipeline
async function example5 () {
  const response = await fetch(chunkedJson);
  // final handler: receives an Error on failure, nothing (falsy) on success
  const myErrorHandler = (timerRef) => (error) => {
    if (error)
      console.log("Error in the pipeline", error.message);
    else
      console.log("Finished Example 5 successfully");
    clearInterval(timerRef); // stop the main thread console.log
  };
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
  stream.pipeline(
    response.body,
    StreamArray.withParser(),
    new MyErrorStream(),
    myErrorHandler(timer)
  );
  console.log("==== Example 5 Started ==============");
}
example5();
Output of Example 5:
==== Example 5 Started ==============
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 5 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
===== started to processing the chunk ...........
====== Example 5 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
===== started to processing the chunk ...........
/Users/shito/Documents/git-repositories/javascript/stackoverflow/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:211
throw new Error("Error in key 2");
^
Error: Error in key 2
at MyErrorStream._write (/Users/shito/Documents/git-repositories/javascript/stackoverflow/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:211:13)
at doWrite (internal/streams/writable.js:377:12)
at clearBuffer (internal/streams/writable.js:529:7)
at onwrite (internal/streams/writable.js:430:7)
at Timeout._onTimeout (/Users/shito/Documents/git-repositories/javascript/stackoverflow/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:215:7)
at listOnTimeout (internal/timers.js:554:17)
at processTimers (internal/timers.js:497:7)
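When an error is reported, stream.pipeline() calls stream.destroy(error) on every stream that has not already closed or finished properly, so we don't have to worry about memory leaks. Note, however, that in the output above the error thrown synchronously from _write() escaped as an uncaught exception and terminated the process before myErrorHandler could run. The Node.js documentation asks _write() implementations to pass errors to callback() instead of throwing them. Errors reported that way are routed to the pipeline's final handler.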
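Here is a minimal sketch of reporting the error through callback() instead of throwing it. It uses only core Node.js streams, so it runs without stream-json. ReportingErrorStream and runExample() are hypothetical names. Readable.from() over { key, value } objects stands in for response.body.pipe(StreamArray.withParser()), and the delays are shortened. With this change the error should reach the pipeline's final callback instead of crashing the process.

```javascript
const stream = require('stream');

// Hypothetical variant of MyErrorStream that reports the error via callback().
class ReportingErrorStream extends stream.Writable {
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  _write({ key, value }, encoding, callback) {
    if (key === 2) {
      callback(new Error(`Error in key ${key}`)); // hand the error to the stream machinery
      return;
    }
    setTimeout(() => {
      console.log(value); // do your data processing here
      callback();
    }, key % 2 === 0 ? 150 : 0);
  }
}

function runExample(items, done) {
  // Readable.from() stands in for response.body.pipe(StreamArray.withParser()).
  const source = stream.Readable.from(items.map((value, key) => ({ key, value })));
  stream.pipeline(source, new ReportingErrorStream(), (error) => {
    done(error, source.destroyed);
  });
}

runExample(['John', 'Brian', 'Lucy', 'William'], (error, sourceDestroyed) => {
  console.log(error ? `Error in the pipeline: ${error.message}` : 'Finished successfully');
  console.log('source destroyed:', sourceDestroyed);
});
```

On current Node.js versions this logs John and Brian, then the error message, and reports the source as destroyed.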