【问题标题】:NodeJS Streams behaviour pipeline vs promiseNodeJS Streams 行为管道与承诺
【发布时间】:2021-08-08 05:42:31
【问题描述】:

我正在实现一些代码来获取图像,使用 sharp 库将其转换为 png 和 jpg 两种格式,然后返回两个流以稍后上传到 S3 存储桶。

我提供了两种不同的解决方案,一种使用 Promise,另一种使用 stream.pipeline。 但是,由于某种原因,管道版本的运行速度比承诺要慢得多。

这是重现行为的代码(使用节点 14 运行)

const sharp = require('sharp')
const fs = require('fs')
const util = require('util')
const stream = require('stream')
const pipeline = util.promisify(stream.pipeline);

console.time('resize')

const resizeJobPipeline = async (readableStream) => {
  const sharpStream = sharp({
    failOnError: false
  }).resize({width: 800, height: 800, fit: 'inside'})

  // using PassThrough here because in the final code will have to pass this stream to s3 upload
  const memoryPng = new stream.PassThrough()
  const memoryJpg = new stream.PassThrough()

  // Have to await each pipeline sepparately,
  // if wrap them in a Promise.all, then the images don't get fully processed/become corrupted
  await pipeline(readableStream, sharpStream.clone().png(), memoryPng)
  await pipeline(readableStream, sharpStream.clone().jpeg(), memoryJpg)

  return [memoryPng, memoryJpg]
}


const resizeJobPromise = async (readableStream) => {
  const sharpStream = sharp({
    failOnError: false
  }).resize({width: 800, height: 800, fit: 'inside'})

  const promises = []
  promises.push(sharpStream.clone().png().pipe(new stream.PassThrough()))
  promises.push(sharpStream.clone().jpeg().pipe(new stream.PassThrough()))
  readableStream.pipe(sharpStream)

  return await Promise.all(promises)
}

const readStream = fs.createReadStream('big_img.jpg')

// resizeJobPromise(readStream).then(res => {
//   res[0].pipe(fs.createWriteStream('resized.png'))
//   res[1].pipe(fs.createWriteStream('resized.jpg'))
//   console.timeEnd('resize')

// }).catch(err => {
//   console.log(err)
// })

resizeJobPipeline(readStream).then(res => {
  res[0].pipe(fs.createWriteStream('resized.png'))
  res[1].pipe(fs.createWriteStream('resized.jpg'))
  console.timeEnd('resize')

}).catch(err => {
  console.log(err)
})

如果我运行 resizeJobPipeline 版本,使用大约 20mb 的图像,我的平均执行时间约为 500 毫秒

但是,如果评论这个版本并运行 resizeJobPromise 版本,使用相同的图像,我得到的平均时间只有 ~7 毫秒!

通过依次等待两条管道,我预计可能会获得双倍的时间,但不是 100 倍。

我读到管道版本使用起来更安全,因为它会自动处理可读错误并关闭可写流以防止内存泄漏,而在 promise 版本上我必须手动处理这些错误。

我在承诺版本中做错了什么吗?代码背后会发生什么才能使其具有如此高的性能?

【问题讨论】:

    标签: node.js promise node-streams


    【解决方案1】:

    我在 Promise 版本中做错了什么?

    是的,您没有测量流的执行时间。请注意

    promises.push(sharpStream.clone().png().pipe(new stream.PassThrough()))
    promises.push(sharpStream.clone().jpeg().pipe(new stream.PassThrough()))
    

    只是将流对象推入一个数组,将它们传递给Promise.all 不会等待流完成,而是立即使用流对象完成。你也可以从这个函数中省略承诺的东西。

    你应该做的是将pipeline流到file/s3写入流:

    const sharp = require('sharp')
    const fs = require('fs')
    const util = require('util')
    const stream = require('stream')
    const pipeline = util.promisify(stream.pipeline)
    
    function resizeJob() {
      const sharpStream = sharp({
        failOnError: false
      }).resize({width: 800, height: 800, fit: 'inside'})
    
      const source = fs.createReadStream('big_img.jpg')
      // using writeStream here, the final code will do s3 upload instead
      const pngTarget = fs.createWriteStream('resized.png')
      const jpgTarget = fs.createWriteStream('resized.jpg')
    
      const promises = [
        pipeline(readableStream, sharpStream), // don't do this piping twice!
        pipeline(sharpStream.clone().png(), memoryPng),
        pipeline(sharpStream.clone().jpeg(), memoryJpg),
      ]
      return Promise.all(promises)
    }
    
    console.time('resize')
    resizeJob().catch(err => {
      console.log(err)
    }).then(() => {
      console.timeEnd('resize')
    })
    

    【讨论】:

    • 谢谢!现在我明白为什么最初的承诺版本返回得如此之快,管道方法没有返回承诺:) 你的解决方案运行良好,我只是适应返回 memoryStreams,因为我必须将它们传递给另一个模块并且不要使用fs 因为它是一个快速应用程序
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-07-01
    • 1970-01-01
    • 1970-01-01
    • 2019-09-29
    • 2019-03-19
    • 1970-01-01
    • 2020-06-16
    相关资源
    最近更新 更多