【发布时间】:2016-06-25 13:23:03
【问题描述】:
环境: NodeJS、Express、DynamoDB(但实际上可以是任何数据库)
场景: 需要读取大量记录并作为可下载文件返回给用户。这意味着我不能一次缓冲所有内容,然后在 Express 的响应中发送它。另外,我可能需要多次执行查询,因为一次查询可能不会返回所有数据。
建议的解决方案: 使用可通过管道传输到 Express 中的响应流的可读流。
我首先创建了一个继承自 stream.Readable 的对象,并实现了一个推送查询结果的 _read() 方法。问题是在 _read() 中调用的数据库查询是异步的,但 stream.read() 是一个同步方法。
当流通过管道传输到服务器的响应时,在 db 查询甚至有机会执行之前调用了多次读取。 因此,查询被多次调用,即使当查询的第一个实例完成并执行 push(null) 时,其他查询也完成并且我收到“EOF 后的 push()”错误。
- 有没有办法使用 _read() 正确执行此操作?
- 我应该忘记 _read() 并在构造函数中执行查询和 push() 结果吗?
- 我应该执行查询并发出数据事件而不是 push() 吗?
谢谢
function DynamoDbResultStream(query, options){
if(!(this instanceof DynamoDbResultStream)){
return new DynamoDbResultStream(query, options);
}
Readable.call(this, options);
this.dbQuery = query;
this.done = false;
}
util.inherits(DynamoDbResultStream, Readable);
DynamoDbResultStream.prototype._read = function(){
var self = this;
if(!this.done){
dynamoDB.query(this.dbQuery, function(err, data) {
if (!err) {
try{
for(i=0;i<data.Items.length;i++){
self.push(data.Items[i]);
}
}catch(err){
console.log(err);
}
if (data.LastEvaluatedKey) {
//Next read() should invoke the query with a new start key
self.dbQuery.ExclusiveStartKey = data.LastEvaluatedKey;
}else{
self.done=true;
self.push(null);
}
}else{
console.log(err);
self.emit('error',err);
}
});
}else{
self.push(null);
}
};
编辑: 发布这个问题后,我发现这篇文章的答案显示了如何在不使用继承的情况下做到这一点:How to call an asynchronous function inside a node.js readable stream
有一条评论指出,在 _read() 内部应该只有一个 push()。而且每次 push() 通常都会产生另一个 read() 调用。
【问题讨论】:
-
你能提供一个你正在编写的代码的例子吗?
-
我已经添加了到目前为止的代码
-
我会把你指向我的
scramjet模块,但我还没有这么简单的可读界面。如果您仍然感兴趣,我可以向您展示如何进行非常适合上述场景的异步流映射。
标签: javascript node.js database express asynchronous