【问题标题】:Stream data from Cassandra to file considering backpressure考虑背压,将数据从 Cassandra 流式传输到文件
【发布时间】:2017-07-23 00:21:06
【问题描述】:

我有一个节点应用程序,它收集投票提交并将它们存储在 Cassandra 中。投票存储为 base64 编码的加密字符串。 API 有一个名为 /export 的端点,它应该获取所有这些投票字符串(可能 > 100 万),将它们转换为二进制文件,并将它们一个接一个地附加到 votes.egd 文件中。然后应该压缩该文件并将其发送给客户端。我的想法是从 Cassandra 流式传输行,将每个投票字符串转换为二进制并写入 WriteStream。 我想将此功能包装在 Promise 中以便于使用。我有以下内容:

streamVotesToFile(query, validVotesFileBasename) {
  return new Promise((resolve, reject) => {
    const writeStream = fs.createWriteStream(`${validVotesFileBasename}.egd`);

    writeStream.on('error', (err) => {
      logger.error(`Writestream ${validVotesFileBasename}.egd error`);
      reject(err);
    });

    writeStream.on('drain', () => {
      logger.info(`Writestream ${validVotesFileBasename}.egd error`);
    })

    db.client.stream(query)
    .on('readable', function() {
      let row = this.read();
      while (row) {
        const envelope = new Buffer(row.vote, 'base64');
        if(!writeStream.write(envelope + '\n')) {
          logger.error(`Couldn't write vote`);
        }
        row = this.read()
      }
    })
    .on('end', () => { // No more rows from Cassandra
      writeStream.end();
      writeStream.on('finish', () => {
        logger.info(`Stream done writing`);
        resolve();
      });
    })
    .on('error', (err) => { // err is a response error from Cassandra
      reject(err);
    });
  });
}

当我运行它时,它会将所有投票附加到一个文件并下载正常。但是我有很多问题/疑问:

    1234563我猜是因为事件循环被 Cassandra 流中的所有这些事件占用(每秒数千个)?
  1. 所有投票似乎都可以正常写入文件,但我几乎每次调用 writeStream.write() 都会收到 false 并查看相应的记录消息(参见代码)?

  2. 我知道我需要考虑 WritableStream 的背压和“排水”事件,所以理想情况下我会使用 pipe() 并将投票传送到一个文件,因为它内置了背压支持(对吗?)但是由于我需要处理每一行(转换为二进制并可能在将来从其他行字段添加其他数据),我将如何使用管道来做到这一点?

【问题讨论】:

    标签: node.js cassandra stream node-streams


    【解决方案1】:

    这是TransformStream 的完美用例:

    const myTransform = new Transform({
      readableObjectMode: true,
      transform(row, encoding, callback) {
        // Transform the row into something else
        const item = new Buffer(row['vote'], 'base64');
        callback(null, item);
      }
    });
    
    client.stream(query, params, { prepare: true })
      .pipe(myTransform)
      .pipe(fileStream);
    

    Node.js API Docs 中查看有关如何实现TransformStream 的更多信息。

    【讨论】:

    • 有道理。但是,如果我使用您的代码 ^^ 我会收到一个错误:TypeError: Invalid non-string/buffer chunk at validChunk (_stream_writable.js:216:10) at Transform.Writable.write (_stream_writable.js:245:12) at ResultStream .ondata (_stream_readable.js:555:20) 在 emitOne (events.js:96:13) 在 ResultStream.emit (events.js:188:7) 在 ResultStream.Readable.read (_stream_readable.js:381:10)
    • 啊,myTransform 需要是 objectMode 中的 Transform 流,因为客户端流在 objectMode 中返回 Readable Streams2 对象。 (将 objectMode 设置为 true)。谢谢!
    猜你喜欢
    • 1970-01-01
    • 2016-08-09
    • 1970-01-01
    • 2014-08-07
    • 2019-08-10
    • 2021-02-03
    • 1970-01-01
    • 2021-08-01
    • 1970-01-01
    相关资源
    最近更新 更多