【问题标题】:Processing large CSV uploads in Node.js在 Node.js 中处理大型 CSV 上传
【发布时间】:2020-09-29 18:30:57
【问题描述】:

根据之前的帖子:

Node async loop - how to make this code run in sequential order?

...我正在寻找有关处理大型数据上传文件的更广泛建议。

场景:

用户上传一个非常大的 CSV 文件,其中包含数十万到数百万行。它正在使用 multer 流入端点:

const storage = multer.memoryStorage();
const upload = multer({ storage: storage });

router.post("/", upload.single("upload"), (req, res) => {
    //...
});

每一行都转换为一个 JSON 对象。然后将该对象映射到几个较小的对象中,这些对象需要插入到几个不同的表中,分布在各种微服务容器中并由它们访问。

async.forEachOfSeries(data, (line, key, callback) => {
    let model = splitData(line);
    //save model.record1, model.record2, etc. sequentially
});

很明显,使用这种方法我会遇到内存限制。这样做最有效的方法是什么?

【问题讨论】:

标签: javascript node.js asynchronous


【解决方案1】:

为避免内存问题,您需要使用streams 处理文件 - 简单来说,增量。您无需将整个文件加载到内存中,而是读取每个,它会得到相应的处理,然后在符合Garbage Collection 的条件后立即进行处理。

在 Node 中,您可以结合使用 CSV stream parser 将二进制内容流式传输为 CSV 行和 through2,这是一个流实用程序,可让您控制 溪流;在这种情况下暂停它以允许将行保存在数据库中。

过程

流程如下:

  • 您获取数据流
  • 您通过 CSV 解析器对其进行管道传输
  • 你通过一个 through2 管道它
  • 您将每一行保存在数据库中
  • 保存完成后,请致电cb() 继续下一项。

我不熟悉multer,但这里有一个使用来自文件的流的示例。

const fs = require('fs')
const csv = require('csv-stream')
const through2 = require('through2')

const stream = fs.createReadStream('foo.csv')
  .pipe(csv.createStream({
      endLine : '\n',
      columns : ['Year', 'Make', 'Model'],
      escapeChar : '"',
      enclosedChar : '"'
  }))
  .pipe(through2({ objectMode: true }, (row, enc, cb) => {
    // - `row` holds the first row of the CSV,
    //   as: `{ Year: '1997', Make: 'Ford', Model: 'E350' }`
    // - The stream won't process the *next* item unless you call the callback
    //  `cb` on it.
    // - This allows us to save the row in our database/microservice and when
    //   we're done, we call `cb()` to move on to the *next* row.
    saveIntoDatabase(row).then(() => {
      cb(null, true)
    })
    .catch(err => {
      cb(err, null)
    })
  }))
  .on('data', data => {
    console.log('saved a row')
  })
  .on('end', () => {
    console.log('end')
  })
  .on('error', err => {
    console.error(err)
  })

// Mock function that emulates saving the row into a database,
// asynchronously in ~500 ms
const saveIntoDatabase = row =>
  new Promise((resolve, reject) =>
    setTimeout(() => resolve(), 500))

foo.csv CSV 的例子是这样的:

1997,Ford,E350
2000,Mercury,Cougar
1998,Ford,Focus
2005,Jaguar,XKR
1991,Yugo,LLS
2006,Mercedes,SLK
2009,Porsche,Boxter
2001,Dodge,Viper

为什么?

这种方法避免了在内存中加载整个 CSV。一旦处理了row,它就会超出范围/变得无法访问,因此它有资格进行垃圾收集。这就是使这种方法如此高效的原因。理论上,这允许您处理无限大小的文件。阅读Streams Handbook 了解有关流的更多信息。

一些提示

  • 您可能希望在每个循环中保存/处理多于 1 行(在相同大小的 中)。在这种情况下,将一些 rows 推入一个数组,处理/保存整个数组(块),然后调用 cb 以继续下一个块 - 重复该过程。
  • 流会发出您可以监听的事件。 end/error 事件对于响应操作是成功还是失败特别有用。
  • Express 默认使用流 - 我几乎可以肯定您根本不需要 multer

【讨论】:

  • 阅读other question Tsar 时似乎想使用http 请求保存到数据库。使这些请求串行可能太慢,但在不等待请求完成的情况下调用回调可能太快。如果太快,Tsar 可以使用带有限制承诺的代码。
  • 您有最好的方法,通过流式传输 csv,可能会采用批量大小并将其处理throttled 到服务器可以处理的范围,当批处理在另一批中读取时(调用回调)。
  • 以上或多或少说明了这个概念。它需要调整 - 我要做的第一件事是在 Array 中累积 5000-10000 行,并尽可能一次性保存/处理该块。当然,为每一行触摸数据库是非常低效的。
  • 是的,如果接触数据库实际上意味着使用数据库驱动程序。最初的问题是沙皇使用 http 请求插入记录,所以可能是第 3 方服务。如果该服务没有批量插入,则可能应该应用限制。
【解决方案2】:

大型 .csv 数据解析和导入

我使用上述模型通过以下代码将 csv 数据的 1.7mm x 200 矩阵导入 mongo。诚然,这很慢,我可以在学习如何更好地分块数据以提高效率方面使用一些帮助,即不是在每次读取后插入,而是将行累积到 5,10,25k 行的数组中,然后 insertMany 或更好熟练使用 through2-map 或 through2-filter 方法。如果有人愿意分享一个例子,请提前致谢。

require('dotenv').config();
const parse = require('csv-parser');
const fs = require("fs");
const through2 = require('through2')
const db = require('../models');

const file = "myFile.csv"
const rows = [];

//========Constructor Function for Mongo Import after each read======//
function Hotspot(variable1, variable2,...) {
this.variable1 = variable1;
this.variable2 = variable2;
...}

//========Counter so I can monitor progress in console============//
let counter = 0;
const rows = [];

//This function is imported & run in server.js from './scripts' after mongoose connection established//

exports.importCsvData = () => {
    fs.createReadStream(myFile)
        .pipe(parse())  
        .pipe(through2({ objectMode: true }, (row, enc, cb) => {
            let hotspot = new Hotspot(
                `${row["ROW_VARIABLE_COLUMN_1"]}`,
                `${row["ROW_VARIABLE_COLUMN_2"]}`,...)

     db.MongoModel.create(hotspot)
                .then(result => console.log('created', counter++))
                .then(() => { cb(null, true) })
                .catch(err => {
                    cb(err, null)
                })
        }))
        .on('data', (row) => {
            rows.push(row);
        })
        .on('end', () => {
            console.log('read complete')
        })
}

我使用了以下帖子和链接:

作为编写此脚本的基础和参考。似乎工作“很好”,除了我昨晚 10 点开始,到今天早上 7 点 45 分还不到一半。这比 "event": "Allocation failed - JavaScript heap out of memory" 在我尝试将所有“热点”对象累积到一个热点数组中以便批量插入 mongoDB 后收到错误。我对 Node 和学习中的 readStream/through2/csv-parser 相当陌生,但想分享一些有效的东西,并且目前正在运作。

【讨论】:

    猜你喜欢
    • 2018-04-23
    • 1970-01-01
    • 1970-01-01
    • 2022-06-12
    • 2015-02-20
    • 2012-11-30
    • 2014-03-03
    • 1970-01-01
    相关资源
    最近更新 更多