【问题标题】:JSON to csv to gzip to s3 while streaming without using too much memoryJSON 到 csv 到 gzip 到 s3,同时流式传输而不使用太多内存
【发布时间】:2017-05-30 17:06:06
【问题描述】:

我们需要在node js中实现一个cron服务,遵循这个流程:

  1. 从 postgres 大量数据查询(大约 500mb)
  2. 将json数据转换成另一个json
  3. 将 json 转换为 csv
  4. 压缩包
  5. 使用“上传”方法上传到 s3

显然,我们需要使用流来实现这个过程,而不会产生内存开销。

我们遇到了很多问题:

  1. 我们正在使用 sequelize,一种 SQL 形式。有了它,我们就无法流式传输查询。因此,我们将查询返回的 JSON 转换为可读的 Stream
  2. 我们找不到一种优雅而聪明的方法来实现转换查询返回的 json 的转换流。 (例如输入-> [{a:1,b:2}..] --> 输出->[{a1:1,b1:2}..]
  3. 在记录并尝试写入 fs 而不是 s3(使用 fs.createWriteStream)时,似乎该文件是在管道启动的同时创建的,但它的大小约为 10 字节,并且仅在流处理时才变得一致完成的。此外,使用了大量的 RAM,并且流式处理在内存使用方面似乎毫无用处。

你会如何在 node js 中编写这个流程? 我在实验过程中使用了以下库:

  • json2csv 流
  • JSON 流
  • 双簧管
  • zlib
  • fs
  • aws-sdk

【问题讨论】:

    标签: json node.js csv amazon-s3 stream


    【解决方案1】:

    由于 Sequelize 结果无论如何都被读入内存,我看不出设置流来转换 JSON 的意义(而不是直接操作内存中的数据),但说你会移植将查询续集到mysql,它确实提供了流,你可以使用这样的东西:

    const es       = require('event-stream');
    const csv      = require('fast-csv');
    const gzip     = require('zlib').createGzip();
    const AWS      = require('aws-sdk');
    const s3Stream = require('s3-upload-stream')(new AWS.S3());
    
    // Assume `connection` is a MySQL connection.
    let sqlStream = connection.query(...).stream();
    
    // Create the mapping/transforming stream.
    let mapStream = es.map(function(data, cb) {
      ...modify `data`...
      cb(null, data);
    });
    
    // Create the CSV outputting stream.
    let csvStream = csv.createWriteStream();
    
    // Create the S3 upload stream.
    let upload = s3Stream.upload(...);
    
    // Let the processing begin.
    sqlStream.pipe(mapStream).pipe(csvStream).pipe(gzip).pipe(upload);
    

    如果“输入流”发出 JSON,您可以将 sqlStream 替换为以下内容:

    const JSONStream = require('JSONStream');
    
    someJSONOutputtingStream.pipe(JSONStream.parse('*'))
    

    (管道的其余部分保持不变)

    【讨论】:

    • 您说如果数据已经在内存中,您看不到设置流的意义。也许我错了,但假设这种情况:我有 json 格式的结果查询,存储在内存中。然后我可以直接转换这个对象。但是,如果我将转换后的 JSON 转换为 CSV,然后我 gzip CSV(从不使用流),我将使用大约 3 倍于第一个 json 使用的内存。也许我错了?不管怎样,我会在两个小时内测试你的代码,我会告诉你的!
    • 流式传输的想法还不错,但是专门通过流来修改数据似乎毫无意义(尽管我猜这取决于数据需要修改的具体方式)。
    • 您的代码运行良好。无论如何,我需要通过流修改数据,因为我不能使用文件系统并且使用内存会花费太多。
    • 您好,我收到错误 csv.createWriteStream() 未定义。
    猜你喜欢
    • 1970-01-01
    • 2018-12-07
    • 2020-02-03
    • 2021-07-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-11-08
    相关资源
    最近更新 更多