【问题标题】:How to customize a ReadablerStreamReader to handle the Array JSON如何自定义 ReadablerStreamReader 来处理数组 JSON
【发布时间】:2021-10-12 21:17:44
【问题描述】:

我有 user.json(假设这将是一个大文件,我想流式读取此文件,但限制块大小)。

[
  {
    "name": "John Doe",
    "occupation": "gardener",
    "born": "1992-03-02"
  },
  {
    "name": "Brian Flemming",
    "occupation": "teacher",
    "born": "1967-11-22"
  },
  {
    "name": "Lucy Black",
    "occupation": "accountant",
    "born": "1995-04-07"
  },
  {
    "name": "William Bean",
    "occupation": "pilot",
    "born": "1977-10-31"
  }
]

我的示例代码。

const fs = require('fs');
const stream = require('stream');

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log('---------start')
    console.log(chunk.toString());
    console.log('---------end')
  }
}

const readStream = fs.createReadStream('users.json', {highWaterMark: 120 })
logChunks(readStream)

输出看起来像

---------start
[
  {
    "name": "John Doe",
    "occupation": "gardener",
    "born": "1992-03-02"
  }
  ,
  {
    "name": "Brian Flem
---------end
---------start
ming",
    "occupation": "teacher",
    "born": "1967-11-22"
  }
  ,
  {
    "name": "Lucy Black",
    "occupation": "ac
---------end
---------start
countant",
    "born": "1995-04-07"
  }
  ,
  {
    "name": "William Bean",
    "occupation": "pilot",
    "born": "1977
---------end
---------start
-10-31"
  }
]

---------end

我的目标是从多个chunck中提取json对象,使其可以是JSON.parse。

我没有找到 node.js 的任何 JSONStreamParse,所以我希望我能在这里得到一些专业的想法。谢谢


更新:

我有一个选项解决方案是使用第 3 方解决方案。 stream-json

await util.promisify(stream.pipeline)(
    readStream,
    StreamArray.withParser(),
    async function( parsedArrayEntriesIterable ){
      for await (const {key: arrIndex, value: arrElem} of parsedArrayEntriesIterable) {
        console.log("Parsed array element:", arrElem);
      }
    }
  )

【问题讨论】:

  • 重新发布此评论,因为我评论了错误的问题。是否有任何特殊原因不将所有块保存在缓冲区中并在最后解析整个 json 字符串?我可以很容易地告诉你这个答案,否则,我们必须编写一个自定义解析器来将不完整的 json 字符串分成两个,例如有效部分和不完整部分。等待整个 json 字符串并不是一个坏主意,因为用户在整个阅读过程中不会被阻塞。由于每次迭代都是异步的,因此 JavaScript 事件循环的主线程可以控制循环的每次迭代。
  • 我也对该解决方案感兴趣,我目前没有用例,但我很好奇该 praser 将如何工作。 (以及如何扩展它以使用数组/嵌套对象)
  • @Summer 感谢您的更新,我意识到您发布的库有更好的解决方案。您也可以将此库用于您的其他问题stackoverflow.com/questions/68705813/…。有时间我也会更新这个答案。

标签: node.js parsing jsonstream


【解决方案1】:

我阅读了您对您的问题的更新,并意识到我对您的问题留下的评论完全不合时宜。由于您使用的是流,因此您不想等待所有数据以避免内存耗尽。我应该在一开始就注意到这一点。

让我举一些例子来表示我的道歉。我希望这有助于理解如何使用流。

为了使示例更真实,让我们模拟从远程服务器获取 json,就像 node-fetch 所做的那样。 node-fetch 返回 ReadableStream 的实例,也就是 asyncIterable。我们可以通过将异步生成器函数传递给stream.Readable.from() 来轻松创建它,如下所示。

fetch()的定义

async function* asyncGenerator (chunks) {
  let counter = 1;
  for (const chunk of chunks) {
    await new Promise(resolve => setTimeout(resolve, 1000));
    console.log(`==== chunk ${counter++} transmitted =====================`);
    yield chunk;
  }
}

const stream = require('stream');

// simulates node-fetch
async function fetch (json) {
  const asyncIterable = asyncGenerator(json);
  // let the client wait for 0.5 sec.
  await new Promise(resolve => setTimeout(resolve, 500));
  return new Promise(resolve => {
    // returns the response object
    resolve({ body: stream.Readable.from(asyncIterable) });
  });
}

fetch() 需要 0.5 秒来获取响应对象。它返回Promise,它解析为body 提供ReadableStream 的对象。这个可读流每秒不断地向下游发送大块 json 数据,如asyncGenerator() 中所定义。

我们的fetch() 函数将一个分块的 json 数组作为参数而不是 URL。让我们使用您提供的那个,但我们在稍有不同的位置拆分它,所以在收到第二个块后,我们得到了两个完整的对象。

const chunkedJson = [
  // chunk 1
  `[
  {
    "name": "John Doe",
    "occupation": "gardener",
    "born": "1992-03-02"
  }
  ,
  {
    "name": "Brian Flem`,
  // chunk 2
  `ming",
    "occupation": "teacher",
    "born": "1967-11-22"
  }
  ,
  {
    "name": "Lucy Black",
    "occupation": "accountant",
    "born": "1995-04-07"
  }`,
  // chunk 3
  `,
  {
    "name": "William Bean",
    "occupation": "pilot",
    "born": "1977`,
  // chunk 4
  `-10-31"
  }
]`
];

现在,有了这些数据,您可以确认fetch() 的工作原理如下。

示例 1:测试 fetch()

async function example1 () {
  const response = await fetch(chunkedJson);
  for await (const chunk of response.body) {
    console.log(chunk);
  }
}

example1();
console.log("==== Example 1 Started ==============");

示例 1 的输出。

==== Example 1 Started ==============
==== chunk 1 transmitted =====================
[
  {
    "name": "John Doe",
    "occupation": "gardener",
    "born": "1992-03-02"
  }
  ,
  {
    "name": "Brian Flem
==== chunk 2 transmitted =====================
ming",
    "occupation": "teacher",
    "born": "1967-11-22"
  }
  ,
  {
    "name": "Lucy Black",
    "occupation": "accountant",
    "born": "1995-04-07"
  }
==== chunk 3 transmitted =====================
,
  {
    "name": "William Bean",
    "occupation": "pilot",
    "born": "1977
==== chunk 4 transmitted =====================
-10-31"
  }
]

现在,让我们处理这个 json 数据的每个元素,而不是等待整个数据到达。

StraemArraystream.Transform 的子类。所以它有ReadableStreamWritableStream的接口。如果流实例与pipe() 连接,您不必担心backpressure,因此我们通过管道传输两个流,即。从fetch() 获得的ReadableStreamStreamArray 的实例一起作为下面示例2 中的response.body.pipe(StreamArray.withParser())

pipe(StreamArray.withParser()) 返回StreamArray 本身的实例以用于方法链接,因此pipeline 变量现在保存对转换流的引用,该转换流也是一个可读流。我们可以将事件监听器附加到它以使用转换后的数据。

StreamArray 在从可读源解析单个对象时发出 data 事件。所以pipiline.on('data', callback) 逐块处理,无需等待整个 json 数据。

当事件侦听器使用pipiline.on('data', callback) 注册到data 事件时,流开始流动。

由于我们模拟的是异步取数据,所以在数据传输过程中可以在控制台看到!!!! MAIN THREAD !!!!。您可以确认主线程在等待解析数据时没有被阻塞。

示例 2:测试 stream-json 在到达时将每个数组元素加一处理

const StreamArray = require('stream-json/streamers/StreamArray');

async function example2 () {
  const response = await fetch(chunkedJson);
  const pipeline = response.body.pipe(StreamArray.withParser());
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
  pipeline.on('data', ({ key, value }) => {
    console.log("====== stream-json StreamArray() RESULT ========");
    console.log(value); // do your data processing here
  }).on('close', () => {
    clearInterval(timer); // stop the main thread console.log
  });
}

example2();
console.log("==== Example 2 Started ==============");

示例 2 的输出。

==== Example 2 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
====== stream-json StreamArray() RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 4 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }

由于所有流都是 EventEmitter 的实例,您可以简单地将回调附加到 data 事件以使用示例 2 中的最终数据。但是,最好使用 pipe(),即使是最终数据消耗,因为pipe() 处理背压。

当下游的数据消耗慢于上游的数据馈送时,就会出现背压问题。例如,当您的数据处理需要时间时,您可能希望异步处理每个块。如果处理下一个块在前一个块之前完成,则下一个块在第一个块之前被推送到下游。如果下游在处理下一个块之前依赖于第一个块,则会导致麻烦。

当你使用事件监听器时,你必须手动控制暂停和恢复以避免背压(见this as an example)。但是,如果您使用pipe() 连接流,则会在内部处理背压问题。这意味着当下游比上游慢时,pipe() 会自动暂停对下游的馈送。

所以让我们创建自己的WritableStream,以便通过pipe() 连接到StreamArray。在我们的例子中,我们从上游(即StreamArray)而不是字符串接收二进制数据,我们必须将objectMode 设置为true。我们覆盖了_write() 函数,该函数将在内部从write() 调用。你把所有的数据处理逻辑都放在这里,完成后调用callback()。当流与pipe() 连接时,上游不会提供下一个数据,直到调用回调。

为了模拟背压,我们将块 1 和 3 处理 1.5 秒,将块 0 和 4 处理为零秒。

示例 3:管道我们自己的流实例

class MyObjectConsumerStream extends stream.Writable {
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  _write(chunk, encoding, callback) {
    const { key, value } = chunk; // receive from StreamArray of stream-json
    console.log("===== started to processing the chunk ........... ");
    setTimeout(() => {
      console.log("====== Example 3 RESULT ========");
      console.log(value); // do your data processing here
      callback(); // pipe() will pause the upstream until callback is called
    }, key % 2 === 0 ? 1500 : 0); // for second and fourth chunk it processes 0 sec!
  }
}

//--- Example 3: We write our own WritableStream to consume chunked data ------
async function example3 () {
  const response = await fetch(chunkedJson);
  response.body.pipe(StreamArray.withParser()).pipe(new MyObjectConsumerStream()).on('finish', () => {
    clearInterval(timer); // stop the main thread console.log
  });
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
}

example3();
console.log("==== Example 3 Started ==============");

示例 3 的输出。

==== Example 3 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
===== started to processing the chunk ........... 
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
==== chunk 4 transmitted =====================
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }

您可以确认接收到的数据是有序的。您还可以看到第二个块的传输在处理第一个对象时开始,因为我们将其设置为 1.5 秒。现在,让我们使用事件监听器做同样的事情,如下所示。

示例 4:简单回调的背压问题

async function example4 () {
  const response = await fetch(chunkedJson);
  const pipeline = response.body.pipe(StreamArray.withParser());
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
  pipeline.on('data', ({ key, value }) => {
    console.log("===== started to processing the chunk ........... ");
    setTimeout(() => {
      console.log(`====== Example 4 RESULT ========`);
      console.log(value); // do your data processing here
    }, key % 2 === 0 ? 1500 : 0); // for second and thrid chunk it processes 0 sec!
  }).on('close', () => {
    clearInterval(timer); // stop the main thread console.log
  });
}

example4();
console.log("==== Example 4 Started ==============");

示例 4 的输出。

==== Example 4 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
===== started to processing the chunk ........... 
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
====== Example 4 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
====== Example 4 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
!!!! MAIN THREAD !!!!
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 4 RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
!!!! MAIN THREAD !!!!
==== chunk 4 transmitted =====================
===== started to processing the chunk ........... 
====== Example 4 RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }

现在,我们看到第二个元素“Brian”在“John”之前到达。如果块 1 和 3 的处理时间增加到 3 秒,则最后一个元素“William”也会在第三个元素“Lucy”之前到达。

因此,当数据到达的顺序很重要时,使用pipe() 而不是事件侦听器来消费数据是一个好习惯。

您可能想知道为什么the API doc 中的示例代码使用他们自己的chain() 函数来制作管道。它是 Node.js 流式编程中错误处理的推荐设计模式。如果错误在管道的下游抛出,它不会将错误传播到上游。所以你必须在管道中的每个流上附加回调,如下所示(这里我们假设有三个流abc)。

a.on('error', callbackForA)
 .pipe(b).on('error', callbackForB)
 .pipe(c).on('error', callbackForC)

与 Promise 链相比,它看起来很麻烦,可以简单地在链尾添加.catch()。即使我们如上所述设置了所有错误处理程序,但仍然不够。

当下游抛出错误时,错误导致的流会使用unpipe() 从管道中分离出来,但是上游不会自动销毁。这是因为有可能将多个流连接到上游以分支流线。所以当你使用pipe()时,你必须自己关闭每个错误处理程序的所有上游。

为了解决这些问题,社区提供了管道构建库。我认为来自stream-chainchain() 就是其中之一。从 Node ver.10 开始,为此功能添加了 stream.pipeline。我们可以使用这个官方的管道构造函数,因为stream-json 中的所有流都是常规流实例的子类。

在展示stream.pipiline 的用法之前,让我们修改MyObjectConsumerStream 类以在处理第二个对象时抛出错误。

引发错误的自定义流

class MyErrorStream extends MyObjectConsumerStream {
  _write(chunk, encoding, callback) {
    const { key, value } = chunk; // receive from StreamArray of stream-json
    console.log("===== started to processing the chunk ........... ");
    if (key === 2)
      throw new Error("Error in key 2");
    setTimeout(() => {
      console.log("====== Example 5 RESULT ========");
      console.log(value); // do your data processing here
      callback(); // pipe() will pause the upstream until callback is called
    }, key % 2 === 0 ? 1500 : 0); // for second and fourth chunk it processes 0 sec!
  };
}

stream.pipeline 按顺序接收多个流以及最后的错误处理程序。错误处理程序在抛出错误时接收Error 的实例,并在成功完成时接收null

示例 5:stream.pipeline 的使用

async function example5 () {
  const response = await fetch(chunkedJson);
  const myErrorHandler = (timerRef) => (error) => {
    if (error)
      console.log("Error in the pipiline", error.message);
    else
      console.log("Finished Example 5 successfully");
    clearInterval(timerRef); // stop the main thread console.log
  }
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
  stream.pipeline(
    response.body,
    StreamArray.withParser(),
    new MyErrorStream(),
    myErrorHandler(timer)
  );
  console.log("==== Example 5 Started ==============");
}

example5();

示例 5 的输出

==== Example 5 Started ==============
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 5 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
===== started to processing the chunk ........... 
====== Example 5 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
===== started to processing the chunk ........... 
/Users/shito/Documents/git-repositories/javascript/stackoverflow/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:211
      throw new Error("Error in key 2");
      ^

Error: Error in key 2
    at MyErrorStream._write (/Users/shito/Documents/git-repositories/javascript/stackoverflow/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:211:13)
    at doWrite (internal/streams/writable.js:377:12)
    at clearBuffer (internal/streams/writable.js:529:7)
    at onwrite (internal/streams/writable.js:430:7)
    at Timeout._onTimeout (/Users/shito/Documents/git-repositories/javascript/stackoverflow/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:215:7)
    at listOnTimeout (internal/timers.js:554:17)
    at processTimers (internal/timers.js:497:7)

当抛出错误时,stream.pipeline() 对所有未关闭或未正确完成的流调用 stream.destroy(error)。所以我们不必担心内存泄漏。

【讨论】:

  • 哇,感谢您在这里尝试不同的选项。但我在这里的目的是摆脱使用第 3 方库“StreamArray.withParser()”。我不想在这里使用依赖项,我尝试编写自己的实现以节省更多时间,因为我不需要解析 JSON 对象中的名称/值。
猜你喜欢
  • 1970-01-01
  • 2021-07-23
  • 1970-01-01
  • 2021-05-26
  • 2019-11-17
  • 2013-02-09
  • 2020-06-22
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多