【问题标题】:Nodejs Read very large file(~10GB), Process line by line then write to other fileNodejs读取非常大的文件(~10GB),逐行处理然后写入其他文件
【发布时间】:2015-07-17 15:29:18
【问题描述】:

我有一个 10 GB 特定格式的日志文件,我想逐行处理此文件,然后将输出写入其他文件 应用了一些转换。我正在使用节点进行此操作。

虽然这种方法很好,但要花很多时间才能做到这一点。我能够在 JAVA 中在 30-45 分钟内完成此操作,但在节点中完成相同的工作需要 160 多分钟。以下是代码:

以下是从输入中读取每一行的启动代码。

var path = '../10GB_input_file.txt';
var output_file = '../output.txt';

function fileopsmain(){

    fs.exists(output_file, function(exists){
        if(exists) {
            fs.unlink(output_file, function (err) {
                if (err) throw err;
                console.log('successfully deleted ' + output_file);
            });
        }
    });

    new lazy(fs.createReadStream(path, {bufferSize: 128 * 4096}))
        .lines
        .forEach(function(line){
            var line_arr = line.toString().split(';');
            perform_line_ops(line_arr, line_arr[6], line_arr[7], line_arr[10]);
        }
    );

}

这是对该行执行某些操作的方法,并且 将输入传递给 write 方法以将其写入输出文件。

function perform_line_ops(line_arr, range_start, range_end, daynums){

    var _new_lines = '';
    for(var i=0; i<days; i++){
        //perform some operation to modify line pass it to print
    }

    write_line_ops(_new_lines);
}

以下方法用于将数据写入新文件。

function write_line_ops(line) {
    if(line != null && line != ''){
        fs.appendFileSync(output_file, line);
    }
}

我想把这个时间缩短到 15-20 分钟。有没有可能这样做。

另外,我正在使用 8 GB RAM 的英特尔 i7 处理器 上尝试此操作。

【问题讨论】:

  • 一个有效的问题是lazy 模块是否在处理之前将整个文件读入内存而不是逐行流式传输?您可能会对node-byline 模块感兴趣。
  • 如果我正在处理这个问题,第一步是在一个小得多的文件上对每一步进行计时,看看究竟是什么导致了速度变慢。从那里,您可以开始优化这部分代码。
  • @jfriend00 没有延迟模块不会将整个文件加载到内存中,因为我正在同时监控内存使用情况。
  • @Kevin B 我正在做同样的事情,我正在处理一个 400MB 的文件,该文件在大约 2.5 分钟内得到处理。虽然我不确定是什么导致了这里的问题。
  • 我建议你先把问题绑定到这里。创建一个简单的测试应用程序,它只创建一个读取流并读取整个文件,而无需担心行数和写入磁盘。看看这需要多长时间。如果这很快,那么您可以一次将一块拼图添加到拼图中,并随时跟踪您的进度。接下来将管道添加到新文件名并查看性能。如果原始读取速度很慢,那么问题在 nodejs 流中较低,您将不得不降低级别来修复性能。

标签: node.js file-io large-files file-handling file-processing


【解决方案1】:

您可以在没有模块的情况下轻松完成此操作。例如:

var fs = require('fs');
var inspect = require('util').inspect;

var buffer = '';
var rs = fs.createReadStream('foo.log');
rs.on('data', function(chunk) {
  var lines = (buffer + chunk).split(/\r?\n/g);
  buffer = lines.pop();
  for (var i = 0; i < lines.length; ++i) {
    // do something with `lines[i]`
    console.log('found line: ' + inspect(lines[i]));
  }
});
rs.on('end', function() {
  // optionally process `buffer` here if you want to treat leftover data without
  // a newline as a "line"
  console.log('ended on non-empty buffer: ' + inspect(buffer));
});

【讨论】:

  • 是的,可以自己编写线路处理代码。但是,如果您查看 OP 的需求,他们需要能够一次访问多行,所以现在您的代码需要添加行组的缓冲等等。 OP 方法的重点是尝试使用现有工具为您解决这些问题,而不是从头开始编写自己的工具。而且,目前尚不清楚这如何解决 OP 的问题。
  • OP 的代码正在逐行读取文件,这也正是我的代码正在做的事情。我的观点是,在这种特殊情况下,自己做非常简单,同时还确保整个文件不会在处理之前立即被缓冲。
  • 您已经疯狂猜测导致性能问题的原因并提供了一个替代解决方案。在我看来,如果不进行一些测试,我们还不知道性能问题出在哪里。
  • @mscdex 我现在并排测试此代码,因此现在可以将其保留为一个未解决的问题。此解决方案可能会有所帮助。一定会让大家知道上述做法的结果。
  • @mscdex 它仍然需要相同的时间。我想写入文件需要一些延迟。
【解决方案2】:

我猜不出你的代码中可能的瓶颈在哪里。

  • 能否添加lazy函数的库或源代码?
  • 您的perform_line_ops 做了多少操作? (if/else、switch/case、函数调用)

我已经根据您给定的代码创建了一个示例,我知道这并不能回答您的问题,但可能会帮助您了解节点如何处理这种情况。

const fs = require('fs')
const path = require('path')

const inputFile = path.resolve(__dirname, '../input_file.txt')
const outputFile = path.resolve(__dirname, '../output_file.txt')

function bootstrap() {
    // fs.exists is deprecated
    // check if output file exists
    // https://nodejs.org/api/fs.html#fs_fs_exists_path_callback
    fs.exists(outputFile, (exists) => {
        if (exists) {
            // output file exists, delete it
            // https://nodejs.org/api/fs.html#fs_fs_unlink_path_callback
            fs.unlink(outputFile, (err) => {
                if (err) {
                    throw err
                }

                console.info(`successfully deleted: ${outputFile}`)
                checkInputFile()
            })
        } else {
            // output file doesn't exist, move on
            checkInputFile()
        }
    })
}

function checkInputFile() {
    // check if input file can be read
    // https://nodejs.org/api/fs.html#fs_fs_access_path_mode_callback
    fs.access(inputFile, fs.constants.R_OK, (err) => {
        if (err) {
            // file can't be read, throw error
            throw err
        }

        // file can be read, move on
        loadInputFile()
    })
}

function saveToOutput() {
    // create write stream
    // https://nodejs.org/api/fs.html#fs_fs_createwritestream_path_options
    const stream = fs.createWriteStream(outputFile, {
        flags: 'w'
    })

    // return wrapper function which simply writes data into the stream
    return (data) => {
        // check if the stream is writable
        if (stream.writable) {
            if (data === null) {
                stream.end()
            } else if (data instanceof Array) {
                stream.write(data.join('\n'))
            } else {
                stream.write(data)
            }
        }
    }
}

function parseLine(line, respond) {
    respond([line])
}

function loadInputFile() {
    // create write stream
    const saveOutput = saveToOutput()
    // create read stream
    // https://nodejs.org/api/fs.html#fs_fs_createreadstream_path_options
    const stream = fs.createReadStream(inputFile, {
        autoClose: true,
        encoding: 'utf8',
        flags: 'r'
    })

    let buffer = null

    stream.on('data', (chunk) => {
        // append the buffer to the current chunk
        const lines = (buffer !== null)
            ? (buffer + chunk).split('\n')
            : chunk.split('\n')

        const lineLength = lines.length
        let lineIndex = -1

        // save last line for later (last line can be incomplete)
        buffer = lines[lineLength - 1]

        // loop trough all lines
        // but don't include the last line
        while (++lineIndex < lineLength - 1) {
            parseLine(lines[lineIndex], saveOutput)
        }
    })

    stream.on('end', () => {
        if (buffer !== null && buffer.length > 0) {
            // parse the last line
            parseLine(buffer, saveOutput)
        }

        // Passing null signals the end of the stream (EOF)
        saveOutput(null)
    })
}

// kick off the parsing process
bootstrap()

【讨论】:

    【解决方案3】:

    我知道这是旧的但是......

    猜测appendFileSync() _write()_s 到文件系统并等待响应。大量的小写入通常很昂贵,假设您在 Java 中使用 BufferedWriter,您可能会通过跳过一些 write() 获得更快的结果。

    使用其中一个异步写入并查看节点缓冲区是否合理,或者将行写入大节点缓冲区直到它已满并始终写入一个已满(或几乎满)的缓冲区。通过调整缓冲区大小,您可以验证写入次数是否会影响性能。我怀疑它会。

    【讨论】:

      【解决方案4】:

      执行速度很慢,因为您没有使用节点的异步操作。本质上,您正在执行这样的代码:

      > read some lines
      > transform
      > write some lines
      > repeat
      

      虽然您可以同时做所有事情,或者至少阅读和写作。此处答案中的一些示例可以做到这一点,但语法至少很复杂。使用scramjet,您只需简单几行即可:

      const {StringStream} = require('scramjet');
      
      fs.createReadStream(path, {bufferSize: 128 * 4096})
          .pipe(new StringStream({maxParallel: 128})    // I assume this is an utf-8 file
          .split("\n")                                  // split per line
          .parse((line) => line.split(';'))             // parse line
          .map([line_arr, range_start, range_end, daynums] => {
              return simplyReturnYourResultForTheOtherFileHere(
                  line_arr, range_start, range_end, daynums
              );                                         // run your code, return promise if you're doing some async work
          })
          .stringify((result) => result.toString())
          .pipe(fs.createWriteStream)
          .on("finish", () => console.log("done"))
          .on("error", (e) => console.log("error"))
      

      这可能会运行得更快。

      【讨论】:

        猜你喜欢
        • 2010-09-28
        • 2013-05-16
        • 1970-01-01
        • 2017-03-19
        • 2018-09-21
        • 2021-07-30
        • 1970-01-01
        • 2016-06-22
        • 1970-01-01
        相关资源
        最近更新 更多