【问题标题】:RxJS v5 Pausable Observable IntervalRxJS v5 Pausable Observable Interval
【发布时间】:2017-09-20 00:57:37
【问题描述】:

在 RxJS v5 中没有实现 pausable 操作符,有没有更好的方法来创建一个可暂停的间隔?下面的代码有效,但通过跟踪最后发出的值作为偏移量来实现。看来应该有更好的办法了……

const source = Rx.Observable.interval(100).share()
const offset = new Rx.BehaviorSubject(0)
let subscription;
let currentValue;

function start() {    
    subscription = source
        .subscribe(i => {
            currentValue = i + offset.value
      })
}



function pause() {
    source.take(1).subscribe(i => offset.next(i + offset.value))
    subscription.unsubscribe()
}

【问题讨论】:

标签: rxjs rxjs5


【解决方案1】:

share() 运算符是an alias for .publish().refCount()refCount() 意味着当没有其他订阅者存在时,observable 将自行清理。因为您正在从源取消订阅,所以它会自行清理,然后在 subscribed 再次重新启动时。将publish()connect() 一起使用。代码如下:

const source = Observable.interval(100).publish();
source.connect();

// Start with false, change to true after 200ms, then false again
// after another 200ms
const pauser = Observable.timer(200)
  .mapTo(true)
  .concat(Observable.timer(200).mapTo(false))
  .startWith(false);

const pausable = pauser
  .switchMap(paused => (paused ? Observable.never() : source))
  .take(10);

pausable.subscribe(x => console.log(x));

查看此 jsbin 以获取运行示例:http://jsbin.com/jomusiy/3/edit?js,console

【讨论】:

    【解决方案2】:

    没有通用的方法可以做到这一点。这取决于您暂停的确切含义以及您要暂停的内容。 (您是想停止发射然后重新开始,是缓冲和排放,还是实际上需要向上游添加延迟,但要及时保持上游值的分布?)

    当上游专门是一个计时器时,我有一种方法可以有效地执行此操作,就像在您的示例中一样。这是我自己的问题的答案。

    RxJS (5.0rc4): Pause and resume an interval timer

    这个有很大的优势,可以及时保留source的值分布,但只是增加了延迟。

    对于更一般的情况:

    1. 对于冷的 observables:switchnever 和上游之间。暂停时,取消订阅上游。 skip 你见过的,然后switchskiped 流。在未暂停时,您必须保持对已发出的值的计数,以便下次有人取消暂停时您可以skip 那么多值。你只需要记住你以前见过多少。然而,每次暂停都会导致冷的 observable 从头开始​​重播。在一般情况下,这可能非常效率低下。代码看起来像这样。 pauser 这里将是一个主题,您可以将其设置为 true 或 false 以暂停上游。

      function pausableCold(pauser, upstream) {
          var seen = 0;
          return pauser.switch(paused => {
              if (paused) {
                  return Observable.never();
              }
              else {
                  return upstream.skip(seen).do(() => seen++);
              }
          });
      }
      
    2. 对于热或冷的 observable,您可以使用缓冲。 buffer 暂停时,然后在未暂停时排空并连接到热上游。 (这会保留所有值,但不会及时保留它们的分布。此外,如果可能是冷的,您应该使用 publish 在上游热。)

    3. 最有效的方法实际上并不是 Rx 的一部分。你真正想要的是告诉源停止发射,然后重新开始。您执行此操作的方式非常特定于源是什么以及源如何产生价值。

    【讨论】:

      猜你喜欢
      • 2021-08-28
      • 2017-09-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多