【发布时间】:2021-05-18 18:06:34
【问题描述】:
我创建了一个Writable 流,它与我们系统中的一个大型管道挂钩,该管道在收到BUFFER_SIZE 块(对象)后写入数据库。
getStream() {
const buffer = [];
const stream = new Writable({
objectMode: true,
async write(chunk,enc,next) {
buffer.push(chunk);
if( buffer.length > BUFFER_SIZE ) {
await insertToDB(buffer);
}
next();
}
});
stream.on('finish', async () => {
// insert last batch?
if( buffer.length ) {
await insertToDB(buffer);
}
});
return stream;
}
async consumer() {
await pipeline(...largePipeline, getStream());
closeAll();
}
这工作正常,但我遇到的问题是on('finish', ...) 事件处理程序被调用太晚,在closeAll() 已在consumer() 函数中被调用。
有没有办法让write() 方法知道它刚刚收到了最后一个 chunk?这样我就可以在调用最后一个 next() 之前刷新缓冲区,一切都会同步。
请注意,在这个代码库中,管道、消费者和编写者之间有非常严格的分离,我不能也不会在这些组件之间交换承诺、标志或状态检查。可写流是一个独立的单元!我正在寻找 Node Streams 通过缓冲写入解决此问题的方法,必须有一种方法来检查 Writable 流是否被最后调用并等待它真正完成,但我无法理解它。
【问题讨论】:
-
您如何使用
await pipe?pipe是否返回承诺? -
对不起,它应该从流承诺库中读取
pipeline,它返回一个承诺。
标签: node.js node-streams