【问题标题】:NodeJS stream parse and write json line to line upon Promise resultNodeJS流解析并根据Promise结果逐行写入json
【发布时间】:2016-04-23 19:51:15
【问题描述】:

我有一个大的 json 文件,看起来像这样:

[
 {"name": "item1"},
 {"name": "item2"},
 {"name": "item3"}
]

我想流式传输这个文件(到目前为止非常简单),在解析/拒绝调用时为每一行运行异步函数(返回一个承诺)编辑这一行。

输入文件的结果可能是:

[
 {"name": "item1", "response": 200},
 {"name": "item2", "response": 404},
 {"name": "item3"} // not processed yet
]

我不想创建另一个文件,我想动态编辑相同的文件(如果可能的话!)。

谢谢:)

【问题讨论】:

  • 输入不是您在问题中提到的 JSON。对有效 JSON 的一项要求是在 name 周围引用,如下所示:{"name": 'item1'}
  • 你是对的,哈哈,这只是一个例子,这确实不是有效的 json
  • 我改了,现在好点了吗?

标签: javascript json node.js stream


【解决方案1】:

根据this answer 在读取时写入同一个文件是不可靠的。正如那里的评论者所说,最好写入一个临时文件,然后删除原始文件并重命名临时文件。

要创建行流,您可以使用byline。然后对于每一行,应用一些操作并将其输出到输出文件。

类似这样的:

var fs = require('fs');
var stream = require('stream');
var util = require('util');
var LineStream = require('byline').LineStream;

function Modify(options) {
    stream.Transform.call(this, options);
}
util.inherits(Modify, stream.Transform);

Modify.prototype._transform = function(chunk, encoding, done) {
    var self = this;
    setTimeout(function() {
        // your modifications here, note that the exact regex depends on 
        // your json format and is probably the most brittle part of this
        var modifiedChunk = chunk.toString();
        if (modifiedChunk.search('response:[^,}]+') === -1) {
            modifiedChunk = modifiedChunk
                .replace('}', ', response: ' + new Date().getTime() + '}') + '\n';
        }      
        self.push(modifiedChunk);
        done();
    }, Math.random() * 2000 + 1000); // to simulate an async modification
};

var inPath = './data.json';
var outPath = './out.txt';
fs.createReadStream(inPath)
    .pipe(new LineStream())
    .pipe(new Modify())
    .pipe(fs.createWriteStream(outPath))
    .on('close', function() {
        // replace input with output
        fs.unlink(inPath, function() {
           fs.rename(outPath, inPath);
        });
    });

请注意,上述结果一次只发生一个异步操作。您还可以将修改保存到数组中,并在所有修改完成后将数组中的行写入文件,如下所示:

var fs = require('fs');
var stream = require('stream');
var LineStream = require('byline').LineStream;

var modifiedLines = [];
var modifiedCount = 0;
var inPath = './data.json';
var allModified = new Promise(function(resolve, reject) {

    fs.createReadStream(inPath).pipe(new LineStream()).on('data', function(chunk) {
       modifiedLines.length++;
       var index = modifiedLines.length - 1;
       setTimeout(function() {
           // your modifications here
           var modifiedChunk = chunk.toString();
           if (modifiedChunk.search('response:[^,}]+') === -1) {
               modifiedChunk = modifiedChunk
                   .replace('}', ', response: ' + new Date().getTime() + '}');
           }                      
           modifiedLines[index] = modifiedChunk;
           modifiedCount++;
           if (modifiedCount === modifiedLines.length) {
              resolve();
           }
       }, Math.random() * 2000 + 1000);
    });

}).then(function() {
    fs.writeFile(inPath, modifiedLines.join('\n'));
}).catch(function(reason) {
    console.error(reason);
});

如果您希望流式传输有效 json 块而不是行,这将是一种更强大的方法,请查看 JSONStream

【讨论】:

    【解决方案2】:

    如评论中所述,您拥有的文件不是正确的 JSON,尽管在 Javascript 中有效。为了生成正确的 JSON,可以使用 JSON.stringify()。我认为其他人也很难解析非标准 JSON,因此我建议提供一个新的输出文件而不是保留原始文件。

    但是,仍然可以将原始文件解析为 JSON。这可以通过eval('(' + procline + ')'); 实现,但是像这样将外部数据带入 node.js 是不安全的。

    const fs = require('fs');
    const readline = require('readline');
    const fr = fs.createReadStream('file1');
    const rl = readline.createInterface({
        input: fr
    });
    
    
    rl.on('line', function (line) {
        if (line.match(new RegExp("\{name"))) {
            var procline = "";
            if (line.trim().split('').pop() === ','){
                procline = line.trim().substring(0,line.trim().length-1);
            }
            else{
                procline = line.trim();
            }
            var lineObj = eval('(' + procline + ')');
            lineObj.response = 200;
            console.log(JSON.stringify(lineObj));
        }
    });
    

    输出会是这样的:

    {"name":"item1","response":200}
    {"name":"item2","response":200}
    {"name":"item3","response":200}
    

    它是line-delimited JSON (LDJSON),可用于流式传输内容,无需前导和尾随 [],。还有一个ldjson-stream 包。

    【讨论】:

      【解决方案3】:

      我并没有真正回答这个问题,但我认为无论如何都不能以令人满意的方式回答,所以这是我的 2 美分。

      我假设您知道如何逐行流式传输并运行该函数,并且您遇到的唯一问题是编辑正在读取的文件。

      插入的后果

      不可能将数据以本地方式插入到任何文件中(这是您想要通过实时更改 JSON 来实现的)。文件只能在其末尾增长。

      所以在 1GB 文件的开头插入 10 个字节的数据意味着您需要将 1GB 写入磁盘(将所有数据进一步移动 10 个字节)。

      您的文件系统不理解 JSON,只是看到您在一个大文件的中间插入字节,所以这会非常慢。

      所以,是的,可以这样做。 使用 insert() 方法在 NodeJS 中的文件 API 上编写一个包装器。

      然后再编写一些代码,以便能够知道将字节插入 JSON 文件的位置,而无需加载整个文件,也不会在最后生成无效的 JSON。

      现在我不会推荐它:)

      => 阅读这个问题:Is it possible to prepend data to an file without rewriting?

      那为什么要这样做呢?

      我想要么

      • 能够随时终止您的进程,并通过再次读取文件轻松恢复工作。
      • 重试部分处理的文件以仅填充缺失的位。

      第一个解决方案:使用数据库

      抽象在随机位置实时编辑文件所需完成的工作是数据库存在的唯一目的。

      它们的存在只是为了抽象UPDATE mytable SET name = 'a_longer_name_that_the_name_that_was_there_before' where name = 'short_name'背后的魔力。

      看看LevelUP/Down、sqlite等...

      他们将抽象出所有需要在您的 JSON 文件中完成的魔法!

      第二种解决方案:使用多个文件

      当您流式传输文件时,写入两个新文件!

      • 包含输入文件中的当前位置和需要重试的行
      • 另一个是预期的结果。

      您还可以随时终止您的进程并重新启动

      【讨论】:

      • 这正是我正在寻找的答案!非常感谢:)
      • 我会先尝试第二种解决方案。
      • 祝你好运:)
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-09-24
      • 1970-01-01
      • 2013-06-05
      • 2015-07-18
      • 1970-01-01
      相关资源
      最近更新 更多