【问题标题】:Detect stream's Writable last chunk检测流的可写最后一个块
【发布时间】:2021-05-18 18:06:34
【问题描述】:

我创建了一个Writable 流,它与我们系统中的一个大型管道挂钩,该管道在收到BUFFER_SIZE 块(对象)后写入数据库。

getStream() {
    const buffer = [];

    const stream = new Writable({
        objectMode: true,
        async write(chunk,enc,next) {
            buffer.push(chunk);
            if( buffer.length > BUFFER_SIZE ) {
                await insertToDB(buffer);
            }
            next();
        }
    });

    stream.on('finish', async () => {
        // insert last batch?
        if( buffer.length ) {
            await insertToDB(buffer);
        }
    });

    return stream;
}

async consumer() {
    await pipeline(...largePipeline, getStream());
    closeAll();
}

这工作正常,但我遇到的问题是on('finish', ...) 事件处理程序被调用太晚,在closeAll() 已在consumer() 函数中被调用。

有没有办法让write() 方法知道它刚刚收到了最后一个 chunk?这样我就可以在调用最后一个 next() 之前刷新缓冲区,一切都会同步。

请注意,在这个代码库中,管道、消费者和编写者之间有非常严格的分离,我不能也不会在这些组件之间交换承诺、标志或状态检查。可写流是一个独立的单元!我正在寻找 Node Streams 通过缓冲写入解决此问题的方法,必须有一种方法来检查 Writable 流是否被最后调用并等待它真正完成,但我无法理解它。

【问题讨论】:

  • 您如何使用await pipepipe 是否返回承诺?
  • 对不起,它应该从流承诺库中读取pipeline,它返回一个承诺。

标签: node.js node-streams


【解决方案1】:

好的,我找到了正确的方法,当数据耗尽时,Node Streams 会调用final(next) 方法,这就是你应该在“释放”流之前完成写作的地方:

getStream() {
    let buffer = [];

    return new Writable({
        objectMode: true,
        async write(chunk,enc,next) {
            buffer.push(chunk);
            if( buffer.length > BUFFER_SIZE ) {
                await insertToDB(buffer);
            }
            next();
        },
        async final(next) {
            // insert last batch?
            if( buffer.length ) {
                await insertToDB(buffer);
                buffer = [];
            }
            next();
        }

    });
}

async consumer() {
    await pipeline(...largePipeline, getStream());
    closeAll();
}

我还发现writev() 函数是编写缓冲块的更好方法,而不是在Writable 流中实现自己的缓冲区:

getStream() {
    return new Writable({
        objectMode: true,
        highWaterMark: BUFFER_SIZE,
        async writev(chunks, next) {
            await insertToDB( chunks.map( chunk => chunk.chunk ) );
            next();
        }
    });
}

它利用 highWaterMark 配置设置以及每次发送给您的块对象的数量,这允许 Node 更好地控制整个流管道的背压并简化您的 Writable 设计。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-03-11
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多