【问题标题】:Node - Closing Streams Properly after Pipeline节点 - 在管道之后正确关闭流
【发布时间】:2020-07-19 04:47:46
【问题描述】:

假设我有以下代码:

try {
    let size = 0;

    await pipeline(
        fs.createReadStream('lowercase.txt'),
        async function* (source) {
            for await (const chunk of source) {
                size += chunk.length;
           
                if (size >= 1000000) {
                    throw new Error('File is too big');
                }

                yield String(chunk).toUpperCase();
            }
        },
        fs.createWriteStream('uppercase.txt')
    );

    console.log('Pipeline succeeded.');
} catch (error) {
    console.log('got error:', error);
}

如何确保在每种情况下都正确关闭流? node docs 并没有多大帮助——他们只是告诉我,我将有悬空的事件监听器:

stream.pipeline() 将对所有流调用 stream.destroy(err),除了:

发出 'end' 或 'close' 的可读流。

发出“finish”或“close”的可写流。

stream.pipeline() 在调用回调后将悬空事件侦听器留在流上。在失败后重用流的情况下,这可能会导致事件侦听器泄漏和吞噬错误。

【问题讨论】:

  • 我还注意到该文档在这个主题上奇怪地不完整,这就是我自己从未使用过pipeline() 的原因。我认为我只需将自己的承诺包装在我可以完全控制错误处理的事情上。我是那种宁愿自己编写代码也不愿与预制但记录不充分的代码搏斗的人。鬼鬼祟祟的控制与你不明白如何正确使用的预制代码。
  • @jfriend00 这对我来说很有意义。如果问得不算多,您能否使用 Promise 和事件侦听器编写上述代码的一个版本?将不胜感激并接受它作为答案:)

标签: javascript node.js stream pipe pipeline


【解决方案1】:

因此,我发现许多 node.js 流复合操作(例如 pipeline().pipe())在错误处理方面非常糟糕/不完整。例如,如果你这样做:

fs.createReadStream("input.txt")
  .pipe(fs.createWriteStream("output.txt"))
  .on('error', err => {
      console.log(err);
  }).on('finish', () => {
      console.log("all done");
  });

您会期望,如果打开 readStream 时出现错误,您会在此处的错误处理程序中收到该错误,但“否”并非如此。打开该输入文件的错误将未处理。这有一些逻辑,因为.pipe() 返回输出流并且输入错误不是输出流上的错误,但是当它没有通过时,很容易错过输入流上的错误。 .pipe() 操作可以侦听输入流上的错误并传递错误(即使它是pipeErr 或其他东西),然后它也可以在读取错误时正确清理 writeStream。但是,.pipe() 并没有彻底实现。它似乎想假设输入流永远不会出错。

相反,您必须单独保存 readStream 对象并直接为其附加错误处理程序才能看到该错误。所以,我只是不再相信这种复合的东西,而且文档从来没有真正解释过如何进行正确的错误处理。我试图查看pipeline() 的代码,看看我是否能理解错误处理,但这并没有证明是一项富有成效的努力。

所以,您的特定问题似乎可以通过转换流来解决:

const fs = require('fs');
const { Transform } = require('stream');

const myTransform = new Transform({
    transform: function(chunk, encoding, callback) {
        let str = chunk.toString('utf8');
        this.push(str.toUpperCase());
        callback();
    }
});

function upperFile(input, output) {
    return new Promise((resolve, reject) => {
        // common function for cleaning up a partial output file
        function errCleanup(err) {
            fs.unlink(output, function(e) {
                if (e) console.log(e);
                reject(err);
            });
        }

        let inputStream = fs.createReadStream(input, {encoding: 'utf8'});
        let outputStream = fs.createWriteStream(output, {emitClose: true});

        // have to separately listen for read/open errors
        inputStream.on("error", err => {
            // have to manually close writeStream when there was an error reading
            if (outputStream) outputStream.destroy();
            errCleanup(err);
        });
        inputStream.pipe(myTransform)
            .pipe(outputStream)
            .on("error", errCleanup)
            .on("close", resolve);        
    });
}

// sample usage
upperFile("input.txt", "output.txt").then(() => {
    console.log("all done");
}).catch(err => {
    console.log("got error", err);
});

如您所见,大约 2/3 的代码以稳健的方式处理错误(内置操作无法正确执行的部分)。

【讨论】:

    猜你喜欢
    • 2013-07-11
    • 1970-01-01
    • 1970-01-01
    • 2019-02-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多