【问题标题】:Event listener for when a file has finished streaming to AWS S3 upload api?文件何时完成流式传输到 AWS S3 上传 api 的事件侦听器?
【发布时间】:2020-12-11 22:56:39
【问题描述】:

我正在 Google Drive 和 AWS S3 之间创建文件备份。我通过使用 Google Get API 下载文件并将数据传输到 AWS S3 来创建可读流承诺。 由于我有很多文件,每个 promise 都会添加到队列中,并且只有新的 promise 才会在它解析时进入。

我正在努力仅在文件完成上传到 AWS S3 时才解决承诺,而不是在文件下载时解决?

我认为使用 .on('finish', () => {resolve()}) 应该可以做到这一点,但它似乎不起作用。

这是我的代码示例:

// download stream of NON gdocs files and pipe to destination
const getGFileContent = async (fileObj) => {  
  let fileExt = fileObj.path.join('/').concat('/',fileObj.name)

  return drive.files.get({fileId: fileObj.id, mimeType: fileObj.mimeType, alt: 'media'}, {responseType: 'stream'})
    .then(res => {
      return new Promise((resolve, reject) => {
        res.data
          .pipe(uploadS3(fileExt))
          .on('end', () => {console.log(`Done downloading file: ${fileExt}`)})
          .on('finish', () => {resolve(console.log(`File Backup Complete: ${fileExt}`))})
          .on('error', err => {reject(console.error(`Error downloading file: ${err}`))})
      })

// upload a file to AWS S3 by passing the file stream from getGFileContent into the 'body' parameter of the upload
const uploadS3 = (filePath) => {
  let pass = new stream.PassThrough()
  let params = {
    Bucket: awsBucketName, // bucket-name
    Key: filePath, // file will be saved as bucket-name/[uniquekey.csv]
    Body: pass  // file data passed through stream
  } 
  new aws.S3().upload(params).promise()
    .then(() => console.log(`Successfully uploaded to S3: ${filePath}`))
    .catch( err => console.log(`Error, unable to upload to S3: ${err}`))
  return pass
}

【问题讨论】:

    标签: javascript amazon-web-services amazon-s3 pipe filestream


    【解决方案1】:

    首先想到的是让uploadS3 函数asyncawait 完成上传,然后返回passThrough 流。 但这行不通。然后它会返回一个 Promise 并且 .pipe() 只接受一个流对象。

    除此之外,您可以重构您的代码,以便getGFileContent 将返回一个可读的流承诺。

    然后,让uploadS3 接受一个可读流作为参数并返回一个 s3 上传承诺。

    要结束它,添加一个异步 backupFile 函数,它将 await 用于 GDrive 蒸汽和上传承诺在继续之前解决。这也将保持函数的整洁和干净,每个函数都有自己的单一职责。

    示例代码:

    const AWS = require('aws-sdk');
    const fs = require('fs');
    
    const s3 = new AWS.S3();
    
    AWS.config.update({
        accessKeyId: '----',
        secretAccessKey: '----',
    });
    
    const backupFile = async (file) => {
        const fileStream = await getGFileStream(file);
        try {
            await uploadStreamToS3(fileStream);
            console.log(`S3 Backup of ${fileStream.path} completed`)
        } catch (err) {
            console.log(`error during file upload ${err}`);
        }
    }
    
    const getGFileStream = async (fileObj) => {
        // TODO: logic to find and get the file. Returns a readableStream promise 
        const fileStream = fs.createReadStream('./largeFile.zip');
        console.log('File ${...} read from Google Drive');
        return fileStream;
    }
    
    const uploadStreamToS3 = (fileStream) => {
        const params = {Bucket: 'test-bucket', Key: 'key', Body: fileStream}
        console.log(`Starting to upload ${fileStream.path} to S3`);
        return s3.upload(params).promise();
    }
    
    backupFile({id: 'mockTestFile'});
    

    【讨论】:

      猜你喜欢
      • 2023-01-09
      • 1970-01-01
      • 2012-10-31
      • 2017-12-29
      • 1970-01-01
      • 1970-01-01
      • 2016-03-14
      • 1970-01-01
      • 2020-02-12
      相关资源
      最近更新 更多