【问题标题】:Nodejs `fs.createReadStream` as promiseNodejs `fs.createReadStream` 作为承诺
【发布时间】:2021-09-15 14:30:16
【问题描述】:

我试图让fs.createReadStream 作为一个承诺工作,所以在读取整个文件后,它将被解析。

在下面的情况下,我暂停流,执行等待方法并恢复。

  1. 如何使.on('end'...最终被执行。
  2. 如果 1. 不可能,为什么 `.on('不会被解雇',也许我可以用它来解决承诺。
function parseFile<T>(filePath: string, row: (x: T) => void, err: (x) => void, end: (x) => void) {
        return new Promise((resolve, reject) => {
            const stream = fs.createReadStream(filePath);
            stream.on('data', async data => {
                    try {
                        stream.pause();
                        await row(data);
                    } finally {
                        stream.resume();
                    }
                })
                .on('end', (rowCount: number) => {
                    resolve();// NOT REALLY THE END row(data) is still being called after this
                })
                .on('close', () => {
                    resolve();// NEVER BEING CALLED
                })
                .on('error', (rowCount: number) => {
                    reject();// NEVER GETS HERE, AS EXPECTED
                })
        })
}

更新
在这里你可以实际测试一下:https://stackblitz.com/edit/node-czktjh?file=index.js

运行node index.js

输出应该是 1000 而不是 1

谢谢

【问题讨论】:

  • 这里可能出现的问题之一是您正在暂停输入流,而不是 csv 解析流。 .on('data', ...) 来自 csv 流,因此您不会暂停它。
  • 请记住,来自 readStream 的单个数据块可能会导致多行 csv,从而导致来自 csv 流的多个 data 事件。您还必须测试 csv 流是否注意流暂停。有些流有,有些则没有。
  • @jfriend00 谢谢,我正在创建一个公共容器以便您可以运行它,移除管道没有帮助
  • @jfriend00 - 请看我的更新
  • 在您将问题编辑为的简化代码中,我无法重现您描述的问题。

标签: node.js fs


【解决方案1】:

需要注意的事项。您已从问题的当前版本中删除了行处理,因此正在以大块的形式读取流。它似乎只读取了两个块中的整个文件,因此只有两个 data 事件,所以这里的预期计数是 2,而不是 1000。

我认为出现此代码的问题是因为 stream.pause() 不会暂停 end 事件的生成 - 它只会暂停未来的 data 事件。如果最后一个data 事件已被触发,然后您在await 处理该data 事件(这会导致您的data 事件处理程序立即返回一个promise,则流将认为它已完成并且@987654329在您完成等待处理最后一个data 事件中的函数之前,@ 事件仍然会触发。请记住,数据事件处理程序不是承诺感知的。而且,stream.pause() 似乎只影响data 事件,而不是 end 事件。

我可以想象一种解决方法,它使用一个标记来跟踪您是否仍在处理数据事件并推迟处理 end 事件,直到您完成最后一个 data 事件。稍后我将为此添加代码,说明如何使用该标志。

仅供参考,缺少的close 事件是另一个流怪异事件。您的 nodejs 程序实际上在 close 事件触发之前终止。如果你把它放在程序的开头:

setTimeout(() => { console.log('done with timer');}, 5000);

然后,您将看到关闭事件,因为计时器将阻止您的 nodejs 程序在关闭事件触发之前退出。我并不是建议将此作为任何问题的解决方案,只是为了说明close 事件仍然存在,并且如果您的程序在获得机会之前没有退出,它想要触发。


这里的代码演示了如何使用标志来解决暂停问题。当您运行此代码时,您只会看到 2 个 data 事件,而不是 1000 个,因为此代码没有读取行,它正在读取更大的块。所以,这个的预期结果不是 1000。

// run `node index.js` in the terminal
const fs = require('fs');

const parseFile = row => {
  let  paused = true;
  let ended = false;
  let dataCntr = 0;
  return new Promise((resolve, reject) => {
    const stream = fs.createReadStream('./generated.data.csv');
    stream
      .on('data', async data => {
        ++dataCntr;
        try {
          stream.pause();
          paused = true;
          await row(data);
        } finally {
          paused = false;
          stream.resume();
          if (ended) {
            console.log(`received ${dataCntr} data events`);
            resolve();
          }
        }
      })
      .on('end', rowCount => {
        ended = true;
        if (!paused) {
          console.log(`received ${dataCntr} data events`);
          resolve();
        }
      })
      .on('close', () => {
        //resolve();
      })
      .on('error', rowCount => {
        reject();
      });
  });
};
(async () => {
  let count = 0;
  await parseFile(async row => {
    await new Promise(resolve => setTimeout(resolve, 50)); //sleep
    count++;
  });
  console.log(`lines executed: ${count}, the expected is more than 1`);
})();

仅供参考,我仍然认为您的原始版本存在我在第一条评论中提到的问题 - 您没有暂停正确的流。这里记录的还有另一个问题(在最后一个data 事件完成之前,您可以在await 之前获得end)。

【讨论】:

    猜你喜欢
    • 2016-07-06
    • 2019-09-29
    • 2019-03-19
    • 1970-01-01
    • 2020-08-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多