【问题标题】:rxjs - buffer stream until function returns truerxjs - 缓冲流直到函数返回 true
【发布时间】:2019-12-13 01:30:39
【问题描述】:

我有一个以恒定数量增加的数字流,我想对其进行子采样。给定一个恒定样本interval,我想缓冲流,直到第一个和最后一个缓冲值之间的差异大于或等于interval。然后它会发出这个数组,与buffer 运算符非常相似。

我已经搜索了不同的 rxjs 运算符,但不知道如何使其工作。 bufferUntil 运营商将是完美的,但似乎不存在。我该如何实现?

例如:

const interval = 15;
//example stream would be: 5, 10 , 15, 20, 25, 30..

Observable.pipe(
   bufferUntil(bufferedArray => {
       let last = bufferedArray.length - 1;
       return (bufferedArray[last] - bufferedArray[0] >= interval);
   })
).subscribe(x => console.log(x));

//With an expected output of [5, 10, 15, 20], [ 25, 30, 35, 40],..

【问题讨论】:

    标签: javascript rxjs


    【解决方案1】:

    您可以实现一个维护自定义缓冲区的运算符。将所有传入值推入缓冲区,当满足给定条件时发出缓冲区并重置它。使用defer 为每个订阅者提供自己的缓冲区。

    function bufferUntil<T>(emitWhen: (currentBuffer: T[]) => boolean): OperatorFunction<T, T[]> {
      return (source: Observable<T>) => defer(() => {
        let buffer: T[] = []; // custom buffer
        return source.pipe(
          tap(v => buffer.push(v)), // add values to buffer
          switchMap(() => emitWhen(buffer) ? of(buffer) : EMPTY), // emit the buffer when the condition is met
          tap(() => buffer = []) // clear the buffer
        )
      });
    }
    

    https://stackblitz.com/edit/rxjs-7awqmv

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-03-07
      • 2020-09-10
      • 1970-01-01
      • 2010-12-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多