【问题标题】:RxJS: (Time) Buffer that starts after next emittionRxJS:(时间)在下一次发射后开始的缓冲区
【发布时间】:2016-10-02 16:36:08
【问题描述】:

我想知道如何使用 RxJs (4/5) 正确实现这一点?

-a-- -b----c----d-----------------------------------------------------------e------f---------------------
-5-sec after-"a"--> [abcd]---new 5 sec timer will start when "e" emited-----5 sec-after-"e"->[ef]-

我认为是这样的:

.buffer(source$.throttleTime(5000).debounceTime(5000))

在 rxjs 5 中完成工作

【问题讨论】:

    标签: rxjs rxjs5


    【解决方案1】:

    最好的办法是使用缓冲区。缓冲区有一个关闭条件,您希望在引入新项目 5 秒后有一个关闭条件。所以,假设你有一个源流,你想要的流是:

    source.buffer(source.throttle(5100).debounce(5000));
    

    这是 rxjs 4。我认为 rxjs 的缓冲区运算符略有不同,但想法是相同的。

    说明: 油门确保在 5100 毫秒内您只会获得第一个“滴答声”。去抖将在 5000 毫秒后传播这个“滴答声”,因为此后没有其他“滴答声”。请注意,我选择了 5100 毫秒,因为时间并不总是完美的,如果你对两者都使用 5000 毫秒,去抖动可能会反复延迟,你会饿死。无论如何,您的缓冲区不会丢失数据,只会将其分组为大于 5000 毫秒的块。

    Rxjs 5 有一个 bufferToggle 操作符,它看起来可能是一个更好的选择,但是,你打开和关闭缓冲区的事实可能会变得有风险,并且由于时间问题让你丢失数据。

    【讨论】:

    • 谢谢,好像.buffer(source$.throttleTime(5000)) 完成了这项工作
    • 我不确定,如果你的流是连续的,它会,但如果你有暂停,最后的元素将等待下一个油门。上升信号(第一个元素)上的油门滴答声,结束时去抖动。
    【解决方案2】:

    我正在使用 RxJS 6,但找不到 5 的文档。但是,这是一个很棒的问题。这是我的结果,在 real example 中也展示了重现 Angular Material 中的错误。

    source$ = source$.pipe(buffer(source$.pipe(debounceTime(5000))));
    

    【讨论】:

      【解决方案3】:

      尝试了所有 Rxjs 5 缓冲区变体,特别是每 n 秒发出空或不发射的 bufferTime,我最终滚动了自己的 bufferTimeLazy:

      function bufferTimeLazy(timeout) {
        return Rx.Observable.create(subscriber => {
          let buffer = [], hdl;
          return this.subscribe(res => {
            buffer.push(res);
            if (hdl) return;
      
            hdl = setTimeout(() => {
              subscriber.next(buffer);
              buffer = [];
              hdl = null;
            }, timeout);
      
          }, err => subscriber.error(err), () => subscriber.complete());
        });
      };
      
      // add operator
      Rx.Observable.prototype.bufferTimeLazy = bufferTimeLazy;
      
      // example
      const click$ = Rx.Observable.fromEvent(document, 'click');
      
      click$.bufferTimeLazy(5000).subscribe(events => {
        console.log(`received ${events.length} events`);
      });
      

      示例: https://jsbin.com/nizidat/6/edit?js,console,output

      这个想法是在缓冲区中收集事件并在第一个事件后 n 秒发出缓冲区。一旦发出,清空缓冲区并保持休眠状态,直到下一个事件到达。

      如果您不想在 Observable.prototype 中添加操作符,只需调用函数即可:

      bufferTimeLazy.bind(source$)(5000)
      

      编辑: 好的,所以 Rxjs 5 还不错:

      var clicks = Rx.Observable.fromEvent(document, 'click').share();
      var buffered = clicks.bufferWhen(() => clicks.delay(5000));
      buffered.subscribe(x => console.log(`got ${x.length} events`));
      

      达到同样的效果。注意 share() 以避免重复点击订阅 - YMMV。

      【讨论】:

        【解决方案4】:

        正如 Trevor 所提到的,在 RXJS 6 中没有官方方法,但显然您需要使用 debounce + buffer 才能实现该结果。

        为了使事情正确,在 Typescript 和 Type Inference 中,我创建了一个名为 bufferDebounce 的自定义 OperatorFunction,这使得该运算符更易于使用和理解。

        带有类型推断的sn-p

        type BufferDebounce = <T>(debounce: number) => OperatorFunction<T, T[]>;
        
        const bufferDebounce: BufferDebounce = debounce => source =>
          new Observable(observer => 
            source.pipe(buffer(source.pipe(debounceTime(debounce)))).subscribe({
              next(x) {
                observer.next(x);
              },
              error(err) {
                observer.error(err);
              },
              complete() {
                observer.complete();
              },
          })
        // [as many sources until no emit during 500ms]
        source.pipe(bufferDebounce(500)).subscribe(console.log) 
        

        您可以在这个工作示例中尝试:https://stackblitz.com/edit/rxjs6-buffer-debounce

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2022-01-11
          • 1970-01-01
          • 1970-01-01
          • 2015-12-04
          • 1970-01-01
          相关资源
          最近更新 更多