【问题标题】:How to use ES8 async/await with streams?如何在流中使用 ES8 异步/等待?
【发布时间】:2016-02-09 13:00:17
【问题描述】:

https://stackoverflow.com/a/18658613/779159 中的示例说明了如何使用内置的加密库和流计算文件的 md5。

var fs = require('fs');
var crypto = require('crypto');

// the file you want to get the hash    
var fd = fs.createReadStream('/some/file/name.txt');
var hash = crypto.createHash('sha1');
hash.setEncoding('hex');

fd.on('end', function() {
    hash.end();
    console.log(hash.read()); // the desired sha1sum
});

// read all file and pipe it (write it) to the hash object
fd.pipe(hash);

但是是否可以将其转换为使用 ES8 async/await 而不是使用上面看到的回调,同时仍然保持使用流的效率?

【问题讨论】:

  • async/await 只不过是对 Promise 的语法级支持。如果你可以把这段代码放在一个 Promise 中,那么你就完成了。
  • Node.js 10.x 支持使用 for-await-of 读取流 (nodejs.org/docs/latest-v10.x/api/…) 但我认为这不是您的问题的正确概念。留给其他可能会遇到可能会有所帮助的情况的人。

标签: javascript node.js async-await ecmascript-2017


【解决方案1】:

async/await 仅适用于承诺,不适用于流。有一些想法可以制作一个额外的类似流的数据类型,它会拥有自己的语法,但如果有的话,这些都是高度实验性的,我不会详细介绍。

无论如何,您的回调只是在等待流的结束,这非常适合 promise。你只需要包装流:

var fd = fs.createReadStream('/some/file/name.txt');
var hash = crypto.createHash('sha1');
hash.setEncoding('hex');
// read all file and pipe it (write it) to the hash object
fd.pipe(hash);

var end = new Promise(function(resolve, reject) {
    hash.on('end', () => resolve(hash.read()));
    fd.on('error', reject); // or something like that. might need to close `hash`
});

现在你可以等待这个承诺了:

(async function() {
    let sha1sum = await end;
    console.log(sha1sum);
}());

【讨论】:

  • 恭喜!现在这不是一个正确的答案,现在您可以对流2ality.com/2018/04/async-iter-nodejs.html 使用异步循环。不幸的是,它现在是一个实验性功能。
  • @MiF 所以你的意思是答案是错误的,因为它不是“高度实验性的”而只是“实验性的”? :-D
  • JS 在所有异步情况下都使用 Promise 和 async/await。太奇怪了,这个功能没有在生产中发布。我认为,现在是一个很好的 sintacsys,不会改变。
  • 你为什么要等到end的ReadStream?在哈希函数有机会对数据执行之前,Promise 不会解决吗?或者,至少,它不会造成竞争条件吗?我假设您希望将fd.pipe(hash) 的值存储在一个变量中并监听finish 事件,因为这表明WriteableStream(散列)已经完成。
  • @adam-beck 听起来你是对的。为什么我这样做?因为OP在他们的问题中也这样做了。欢迎提出修改建议。
【解决方案2】:

如果您使用的是节点版本 >= v10.0.0,那么您可以使用 stream.pipelineutil.promisify

const fs = require('fs');
const crypto = require('crypto');
const util = require('util');
const stream = require('stream');

const pipeline = util.promisify(stream.pipeline);

const hash = crypto.createHash('sha1');
hash.setEncoding('hex');

async function run() {
  await pipeline(
    fs.createReadStream('/some/file/name.txt'),
    hash
  );
  console.log('Pipeline succeeded');
}

run().catch(console.error);

【讨论】:

  • 是否也应该在某处调用 unpipe?
  • 我不得不挖了好几个小时才找到这个答案。为什么这不包含在 Node Streams 文档中作为示例...?
  • 但它在文档中。 nodejs.org/api/…
  • 这是一个更好的答案
  • 请注意,节点 github.com/nodejs/node/issues/34274 中的管道流刷新存在问题
【解决方案3】:

这样的工作:

for (var res of fetchResponses){ //node-fetch package responses
    const dest = fs.createWriteStream(filePath,{flags:'a'});
    totalBytes += Number(res.headers.get('content-length'));
    await new Promise((resolve, reject) => {
        res.body.pipe(dest);
        res.body.on("error", (err) => {
            reject(err);
        });
        dest.on("finish", function() {
            resolve();
        });
    });         
}

【讨论】:

  • 很好的解释,我使用这个承诺来包装我使用内置在 https 模块中的 NodeJS 发出的 https 请求。谢谢!
【解决方案4】:

节点 V15 现在在 stream/promises 中有一个 promisfiy 管道。 这是最干净、最正式的方式。

const { pipeline } = require('stream/promises');

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

我们都应该感谢它在这里完成了多少工作:

  • 捕获所有流中的错误。
  • 出现错误时销毁未完成的流。
  • 仅在最后一个可写流完成时返回。

这个管道是 Node.JS 最强大的功能之一。使其完全异步并不容易。现在我们有了它。

【讨论】:

    【解决方案5】:

    我会发表评论,但没有足够的声誉。

    注意事项: 如果您有一个应用程序正在传递流并执行异步/等待,请在等待之前非常小心地连接所有管道。您最终可能会得到不包含您认为他们所做的内容的流。这是最小的例子

    const { PassThrough } = require('stream');
    
    async function main() {
        const initialStream = new PassThrough();
    
        const otherStream = new PassThrough();
        const data = [];
        otherStream.on('data', dat => data.push(dat));
        const resultOtherStreamPromise = new Promise(resolve => otherStream.on('end', () => { resolve(Buffer.concat(data)) }));
    
        const yetAnotherStream = new PassThrough();
        const data2 = [];
        yetAnotherStream.on('data', dat => data2.push(dat));
        const resultYetAnotherStreamPromise = new Promise(resolve => yetAnotherStream.on('end', () => { resolve(Buffer.concat(data2)) }));
    
        initialStream.pipe(otherStream);
        initialStream.write('some ');
    
        await Promise.resolve(); // Completely unrelated await
    
        initialStream.pipe(yetAnotherStream);
        initialStream.end('data');
        const [resultOtherStream, resultYetAnotherStream] = await Promise.all([
            resultOtherStreamPromise,
            resultYetAnotherStreamPromise,
        ]);
    
        console.log('other stream:', resultOtherStream.toString()); // other stream: some data
        console.log('yet another stream:', resultYetAnotherStream.toString()); // yet another stream: data
    }
    main();
    

    【讨论】:

      【解决方案6】:

      2021 年更新:

      Node 文档中的新示例:

      async function print(readable) {
        readable.setEncoding('utf8');
        let data = '';
        for await (const chunk of readable) {
          data += chunk;
        }
        console.log(data);
      }
      

      https://nodejs.org/api/stream.html#stream_readable_symbol_asynciterator

      【讨论】:

        猜你喜欢
        • 2020-07-11
        • 1970-01-01
        • 1970-01-01
        • 2021-12-16
        • 1970-01-01
        • 1970-01-01
        • 2023-01-15
        • 2021-10-22
        相关资源
        最近更新 更多