【问题标题】:Simple buffering duplex stream in Node - how to?Node中的简单缓冲双工流 - 如何?
【发布时间】:2021-03-12 04:17:40
【问题描述】:

我正在尝试实现具有缓冲功能的双工流。

它应该积累大量数据,直到它们足够多,然后才将它们进一步发送。

例如,在播放流式音频/视频数据时可以使用它:不能简单地及时获取帧,对吧?

下面是我创建这种缓冲双工流的愚蠢尝试。有一个源流,它将x\n 字符发送到缓冲流,而缓冲流又应该将数据进一步发送到process.stdout

唉,这行不通。具体来说,read() 函数似乎没有任何方法可以暂停停止,例如:

“嘿,我现在没有任何数据给你,稍后再回来”

不,一旦我返回 undefinednull,故事就结束了,标准输出中什么也没有出现。

var {Readable, Duplex} = require('stream');

// Source stream, seeds: x\n, x\n, x\n, ...
let c = 10;
var rs = new Readable({
  read () {
    if (c > 0) {
      c--;
      console.log('rs reading:', 'x');
      this.push('x\n');
    } 
    else {
      this.push(null)
    }
  },
});

// Buffering duplex stream
// I want it to cache 3 items and only then to proceed
const queue = [];
const limit = 3;
var ds = new Duplex({
  writableHighWaterMark: 0,
  write (chunk, encoding, callback) {
    console.log('ds writing:', chunk, 'paused: ', ds.isPaused());
    queue.push(chunk);
    callback();
  },
  readableHighWaterMark: 0,
  read () {
    // We don't want to output anything
    // until there's enough elements in the `queue`.
    if (queue.length >= limit) {
      const chunk = queue.shift();
      console.log('ds reading:', chunk);
      this.push(chunk);
    }
    else {
      // So how to wait here?
      this.push(undefined)
    }
  },
});

// PROBLEM: nothing is coming out of the "ds" and printed on the stdout
rs.pipe(ds).pipe(process.stdout);

这是我的回复:https://repl.it/@OnkelTem/BufferingStream1-1#index.js

我检查了双工的状态,它甚至没有处于暂停状态。所以它没有暂停,它在流动,然而——什么也不返回。

我还花了几个小时重新阅读有关 Node 流的文档,但实际上并不觉得它是为了理解而创建的。

【问题讨论】:

    标签: node.js audio-streaming node-streams


    【解决方案1】:

    缓冲流只是一种转换流。如果我理解您要正确执行的操作,那么实现应该不会比这更复杂:

    const { Transform } = require('stream');
    
    const DEFAULT_CAPACITY = 10;
    
    class BufferingTransform extends Transform {
      constructor(options = {}) {
        super(options);
    
        this.capacity = options.capacity || DEFAULT_CAPACITY ;
        this.pending = [] ;
    
        return;
      }
    
      get atCapacity() {
        return this.pending.length >= this.capacity;
      }
    
      _transform(chunk, encoding, cb) {
    
        if ( this.atCapacity ) {
          this.push( ...this.pending.shift() );
        }
    
        this.pending.push( [chunk, encoding] );
    
        cb();
      }
    
      _flush(cb) {
    
        while (this.pending.length > 0) {
          this.push( ...this.pending.shift() );
        }
    
        cb();
      }
    
    }
    
    

    一旦你有了它,它应该只是通过BufferingStream and reading from the BufferingStream`管道你的源代码的问题:

    async function readFromSource() {
      const source = openSourceForReading();
      const buffer = new BufferingStream();
    
      source.pipe(buffer);
    
      for await (const chunk of buffer) {
        console.log(chunk);
      }
    
    }
    

    【讨论】:

    • 感谢 Nicholas,这正是我所需要的,但您使用的是 Transform 流而不是 Duplex。好的!带有您的版本的沙箱:repl.it/@OnkelTem/BufferingStream1-2。还有一个,没有类:repl.it/@OnkelTem/BufferingStream1-3 仍然很有趣如何只使用双工流。
    • 转换流只是一种特殊的双工流,它为您处理一些逻辑,因此他们的解决方案确实使用双工流;)
    • @tjjfvi - 是的,我知道。我最初的困难是实现 [更通用] 双工流的read() 方法。基本上我不知道如何在那里“等待”,因为没有任何callback。所以我很好奇它会是什么样子,一个 Duplex 流和 Nicholas 的 Transform one 完全一样。
    • 我有一个想法。实现write() 中的所有逻辑,并在read() 中进行简单的阅读。但我不确定。
    • @Onkeltem — Transform 子类型 Duplex
    【解决方案2】:

    这是一个使用异步迭代的实现:

    function bufferStream(stream, bufferCount){
        stream = normalizeAsyncIterable(stream);
        const iterator = stream[Symbol.asyncIterator]();
        const queue = []
    
        while(queue.length < bufferCount)
            queue.push(iterator.next());
    
        return normalizeAsyncIterable({
            [Symbol.asyncIterator]: () => ({
                next: async () => {
                    const promise = queue.shift() ?? iterator.next();
                    while(queue.length < bufferCount)
                        queue.push(iterator.next());
                    return promise;
                }
            })
        });
    }
    
    // Ensures that calls to .next() while the generator is paused are handled correctly
    async function* normalizeAsyncIterable(iterable){
        for await(const value of iterable)
            yield value;
    }
    

    TS Playground Link

    【讨论】:

      猜你喜欢
      • 2012-09-01
      • 2013-05-12
      • 2013-08-14
      • 2015-03-17
      • 1970-01-01
      • 2019-06-19
      • 1970-01-01
      • 2010-10-23
      • 1970-01-01
      相关资源
      最近更新 更多