【问题标题】:Using streams in NodeJS在 NodeJS 中使用流
【发布时间】:2014-07-23 22:58:40
【问题描述】:

我正在尝试在 Nodejs 中构建一个 API,它将流式传输针对 vertica DB 执行的查询的输出。

Nodejs 中的 vertica db 驱动程序公开了我正在使用的无缓冲查询接口。有关这方面的更多详细信息,请参阅: https://github.com/wvanbergen/node-vertica

以下是我的代码:

var vertica = require('vertica');
var Readable = require('stream').Readable;
var rs = new Readable;

var conn = vertica.connect( {
    host: 'hostname',
    user: 'user',
    password: 'password',
    database: 'verticadb'    
});


var q = conn.query('select * from table');

q.on('row', function(row) {
    rs.push(row.join(',') + "\n");
});

q.on('end', function(status) {
    rs.push(null);
    rs.pipe(process.stdout);
    conn.disconnect();
});

q.on('error', function(err) {
    conn.disconnect();
});

它确实返回了适当的输出,但我的理解是它实际上缓冲了row.join(',') + "\n" 的输出,并且只有在读取所有行后才将其输出到标准输出。我的目标是在读取每一行后立即将其输出。我应该如何修改我的代码以使其工作?您可以将 vertica "row" 事件替换为任何可比较的事件。

附录

我已经设法使用所谓的“经典可读流”使其工作,基于以下文档:https://github.com/substack/stream-handbook

代码:

var vertica = require('vertica');
var Stream = require('stream');
var stream = new Stream;
stream.readable = true;

var conn = vertica.connect( {
    host: 'hostname',
    user: 'user',
    password: 'password',
    database: 'verticadb'
});


var q = conn.query('select * from affiliate_manager_2');

q.on('row', function(row) {
    stream.emit('data', row.join(',') + "\n");
});

q.on('end', function(status) {
    stream.emit('end'); 
    conn.disconnect();
});

q.on('error', function(err) {
    conn.disconnect();
});

stream.pipe(process.stdout);

但是这是“旧”方式,我想知道如何使用“新方式”。

【问题讨论】:

  • end 只发出一次,即在查询结束时。你应该在每一行发射。
  • 我明白那部分,但我不知道该怎么做。

标签: node.js stream


【解决方案1】:

Readable 是“抽象的”。它正在寻找一个名为 _read 的函数,该函数未在默认实现中定义。没有它,它只会缓冲每个push(chunk),直到它看到push(null)。这就是您在示例中看到的行为。

要获得你想要的行为,只需添加一个_read 函数!

这是一个您可以适应您的数据库的示例:

var Readable = require('stream').Readable;

var stream = new Readable;

stream._read = function () {
  var query = …;
  query.on('row', function (row) {
    stream.push(JSON.stringify(row) + '\n');
  });
  query.on('end', function () {
    stream.push(null);
  });
  stream._read = function () {};
};

stream.pipe(process.stdout);

进一步阅读:

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-03-18
    • 1970-01-01
    • 1970-01-01
    • 2014-06-02
    • 1970-01-01
    • 1970-01-01
    • 2014-12-05
    • 1970-01-01
    相关资源
    最近更新 更多