【问题标题】:Download a file from S3 and split by number of lines从 S3 下载文件并按行数拆分
【发布时间】:2020-03-03 11:32:44
【问题描述】:

我正在尝试从 S3 存储桶下载文件并将其拆分为 500000 行的块,每行另存为单独的文件。

我在下面写的代码总是因为这个错误而崩溃:

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory
data-import-engine_1      | 
data-import-engine_1      | <--- Last few GCs --->
data-import-engine_1      | 
data-import-engine_1      | [298:0x5637f5f1d000]   629519 ms: Mark-sweep 1452.6 (1633.7) -> 1452.5 (1596.2) MB, 1100.7 / 0.0 ms  last resort GC in old space requested
data-import-engine_1      | [298:0x5637f5f1d000]   630627 ms: Mark-sweep 1452.5 (1596.2) -> 1452.5 (1591.7) MB, 1107.6 / 0.0 ms  last resort GC in old space requested
data-import-engine_1      | 
data-import-engine_1      | 
data-import-engine_1      | <--- JS stacktrace --->
data-import-engine_1      | 
data-import-engine_1      | ==== JS stack trace =========================================
data-import-engine_1      | 
data-import-engine_1      | Security context: 0xee0238255e9 <JSObject>
data-import-engine_1      |     1: _copyArray [/home/nodeusr/app/node_modules/denque/index.js:~395] [pc=0x33c25bba7a4b](this=0x3336cceeb8a1 <Denque map = 0x23eafa0916c1>,fullCopy=0x20ed2d882371 <true>)
data-import-engine_1      |     2: _growArray [/home/nodeusr/app/node_modules/denque/index.js:416] [bytecode=0x48d2d8f8429 offset=19](this=0x3336cceeb8a1 <Denque map = 0x23eafa0916c1>)
data-import-engine_1      |     3: /* anonymous */ [/home/nodeusr/app/node_modules/ioredis/built...

我在 docker 容器上运行此代码,使用 alpine-node:9 .n 我正在使用 Bull Queue 在沙盒进程中处理这些作业 (https://github.com/OptimalBits/bull#documentation)

我已经尝试增加 docker 引擎的内存并增加了节点进程的内存限制,但我无法解决这个问题。

const AWS = require('aws-sdk')
const fs = require('fs')
// var LineByLineReader = require('line-by-line')
const es = require('event-stream')

var s3 = new AWS.S3({
  region: process.env.DEFAULT_REGION || 'eu-west-2',
  accessKeyId: process.env.STORAGE_API_KEY || 'somekey',
  secretAccessKey: process.env.STORAGE_API_SECRET || 'somesecret',
  endpoint: (process.env.STORAGE_HOST && process.env.STORAGE_PORT) ? process.env.STORAGE_HOST + ':' + process.env.STORAGE_PORT : 'http://localstack:4572'
})

module.exports = (job) => {
  var dsConfig = job.data.dsConfig
  var totalBytes = 0

  var params = {
    Bucket: process.env.STORAGE_BUCKET_NAME || 'fcd_bucket',
    Key: dsConfig.resourceId
  }

  return new Promise((resolve, reject) => {
    s3.headObject(params, function (err, data) {
      if (err) reject(err)

      totalBytes = data.ContentLength
      var d = job.data
      if (typeof (job.data.progress) === 'undefined') {
        d.progress = 0
      }
      if (typeof (job.data.params) === 'undefined') {
        params.Range = 'bytes=0-' + totalBytes.toString()
        d.params = params
      }
      let progress = d.progress

      var chunkCounter = 0
      var totalCounter = 0
      var rowCounter = 0
      var chunkSize = 500000

      const filewriter = []
      filewriter[chunkCounter] = fs.createWriteStream('./tmp/' + d.procId + '.tmp', {
        flags: 'a', // 'a' means appending (old data will be preserved)
        encoding: 'utf8'
      })
      var outputFiles = ['./tmp/' + d.procId + '.tmp']

      function writeLine (line) {
        if (totalCounter > 0) filewriter[chunkCounter].write(line)
        if (rowCounter > chunkSize) {
          rowCounter = 0
          chunkCounter++
          filewriter[chunkCounter] = fs.createWriteStream('./tmp/' + d.procId + '-' + chunkCounter + '.tmp', {
            flags: 'a', // 'a' means appending (old data will be preserved)
            encoding: 'utf8'
          })
          outputFiles.push('./tmp/' + d.procId + '-' + chunkCounter + '.tmp')
        }
        rowCounter++
        totalCounter++
        progress += Buffer.byteLength(line, 'utf-8')
        d.params.Range = 'bytes=' + progress.toString() + '-' + totalBytes.toString()
        d.progress = progress
        job.progress(parseFloat(progress / totalBytes).toFixed(3) * 100)
        // pipeline.resume()
      }

      s3.getObject(params).createReadStream({
        encoding: 'utf8'
      }).pipe(es.split(/(\r?\n)/)).pipe(es.map((line, callback) => {
        callback(null, writeLine(line))
      }))
        .on('error', err => {
          job.update(d).then(() => {
            console.log('Error occurred during Job ' + job.id + ' execution, progress data stored so to restart from the same point')
          }).catch(err => {
            console.log(err)
          })
          reject(err)
        })
        .on('end', () => {
          filewriter.forEach(writer => {
            writer.end()
          })
          d.tempFiles = outputFiles
          job.update(d).then(() => {
            resolve()
          }).catch(err => {
            reject(err)
          })
        })
    })
  })
}

你对如何解决这个问题有什么建议吗?

谢谢, 齐射

【问题讨论】:

  • 你的writeLine() 函数对我来说看起来很可疑......你为什么一直在创建一个新的写入流?这可以通过实现一个 DuplexStream 来清除,一旦满足行数,它就会透明地输出到不同的文件。然后,您将受益于内置的背压。

标签: node.js amazon-s3 aws-sdk event-stream


【解决方案1】:

将 CloudFront 用于对象的部分请求:

更多详情请关注:https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/RangeGETs.html

【讨论】:

  • S3 自己支持远程请求...并不是说这是解决这个问题的方法。
猜你喜欢
  • 1970-01-01
  • 2018-03-14
  • 2013-07-29
  • 1970-01-01
  • 2011-09-25
  • 2021-07-15
  • 2015-04-19
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多