【问题标题】:How to create a readable stream with an async data source in NodeJs?如何在 NodeJs 中使用异步数据源创建可读流?
【发布时间】:2016-06-25 13:23:03
【问题描述】:

环境: NodeJS、Express、DynamoDB(但实际上可以是任何数据库)

场景: 需要读取大量记录并作为可下载文件返回给用户。这意味着我不能一次缓冲所有内容,然后在 Express 的响应中发送它。另外,我可能需要多次执行查询,因为一次查询可能不会返回所有数据。

建议的解决方案: 使用可通过管道传输到 Express 中的响应流的可读流。

我首先创建了一个继承自 stream.Readable 的对象,并实现了一个推送查询结果的 _read() 方法。问题是在 _read() 中调用的数据库查询是异步的,但 stream.read() 是一个同步方法。

当流通过管道传输到服务器的响应时,在 db 查询甚至有机会执行之前调用了多次读取。 因此,查询被多次调用,即使当查询的第一个实例完成并执行 push(null) 时,其他查询也完成并且我收到“EOF 后的 push()”错误。

  1. 有没有办法使用 _read() 正确执行此操作?
  2. 我应该忘记 _read() 并在构造函数中执行查询和 push() 结果吗?
  3. 我应该执行查询并发出数据事件而不是 push() 吗?

谢谢

function DynamoDbResultStream(query, options){
    if(!(this instanceof DynamoDbResultStream)){
        return new DynamoDbResultStream(query, options);
    }

    Readable.call(this, options);

    this.dbQuery = query;
    this.done = false;
}
util.inherits(DynamoDbResultStream, Readable);

DynamoDbResultStream.prototype._read = function(){
    var self = this;
    if(!this.done){
        dynamoDB.query(this.dbQuery, function(err, data) {
            if (!err) {
                try{
                    for(i=0;i<data.Items.length;i++){
                        self.push(data.Items[i]);
                    }
                }catch(err){
                    console.log(err);
                }
                if (data.LastEvaluatedKey) {
                    //Next read() should invoke the query with a new start key
                    self.dbQuery.ExclusiveStartKey = data.LastEvaluatedKey;
                }else{
                    self.done=true;
                    self.push(null);
                }
            }else{
                 console.log(err);
                 self.emit('error',err);
            }
        });
    }else{
        self.push(null);
    }
};

编辑: 发布这个问题后,我发现这篇文章的答案显示了如何在不使用继承的情况下做到这一点:How to call an asynchronous function inside a node.js readable stream

有一条评论指出,在 _read() 内部应该只有一个 push()。而且每次 push() 通常都会产生另一个 read() 调用。

【问题讨论】:

  • 你能提供一个你正在编写的代码的例子吗?
  • 我已经添加了到目前为止的代码
  • 我会把你指向我的scramjet 模块,但我还没有这么简单的可读界面。如果您仍然感兴趣,我可以向您展示如何进行非常适合上述场景的异步流映射。

标签: javascript node.js database express asynchronous


【解决方案1】:

注意Stream的不同模式:https://nodejs.org/api/stream.html#stream_two_modes

const Readable = require('stream').Readable;

// starts in paused mode
const readable = new Readable();

let i = 0;
fetchMyAsyncData() {
  setTimeout(() => {
    // still remains in paused mode
    readable.push(++i);

    if (i === 5) {
      return readable.emit('end');
    }

    fetchMyAsyncData();
  }, 500);    
}

// "The res object is an enhanced version of Node’s own response object and supports all built-in fields and methods."
app.get('/mystreamingresponse', (req, res) => {

  // remains in paused mode
  readable.on('readable', () => res.write(readable.read()));

  fetchMyAsyncData();

  // closes the response stream once all external data arrived
  readable.on('end', () => res.end());
})

【讨论】:

    【解决方案2】:

    我在 NodeJs 12+ 上找到了答案(甚至更低?)。

    实现这一点的最佳方法是通过生成器/迭代器函数。

    这是我对 CosmosDb 所做的示例,它提供了一个用于迭代查询的令牌。但是,您可以像这样执行任何异步调用。

    这个想法是生成器函数在第一次执行时创建一个迭代器,yield 调用将为每次迭代提供结果。该方法在每次 yield 时暂停,直到它返回一个值(最后一个 return true)。

        async function* reader() {
            let continuationToken = null;
            do {
                const result = await myAsyncCall(filter, continuationToken);
                continuationToken = result.continuationToken;
    
                // return the resources to the writer
                yield result.resources;
            } while (continuationToken);
    
            // finish the iterator
            return true;
        }
    
        await pipeline(
            // note this is indeed the first call to reader(), not the method pointer.
            Readable.from(reader()),
            ws);
    

    通过此设置,Writable 将接收资源数组作为块,并可以根据需要对其进行处理。

    需要注意的是,无法将 Writable highwatermark 连接到 Readable highwatermark(当我使用 Readable 子类时,它曾经可以工作)。

    但是,这应该不是什么大问题,因为 Writable 仍然控制着流程,而您作为开发人员也有控制权。

    【讨论】:

      猜你喜欢
      • 2016-03-08
      • 1970-01-01
      • 2020-06-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-01-26
      相关资源
      最近更新 更多