【问题标题】:RxJS operator that waits for quiet period in event stream, but in case of busy event stream does not wait for ever在事件流中等待静默期的 RxJS 运算符,但在事件流繁忙的情况下不会永远等待
【发布时间】:2018-11-11 14:19:12
【问题描述】:

场景:

  • 我有一个事件流,每个事件都应该导致信息的更新显示(事件流来自 websockets,显示在 highcharts 图表中,但这并不重要)
  • 出于性能原因,我不想为每个事件触发 UI 更新。
  • 我宁愿做以下事情:

    • 当我收到一个事件时,我只想进行 UI 更新,距离上次更新超过 X 毫秒
    • 但是每隔 Y 毫秒 (Y > X) 我还是想做一次更新,如果有任何传入事件
    • 所以我正在寻找某种(组合)RxJS 运算符,它会限制事件流的速率,仅在出现安静期(或等待安静期最大时间)时才发出事件已超过)。
    • 即我想等待一段安静的时期,但不是永远。

如何实现我上面描述的内容?

我看过:

【问题讨论】:

  • 我会使用debounce,传递一个通过将延迟源与计时器合并而成的可观察通知。以后我可以看看写一个答案。

标签: javascript rxjs


【解决方案1】:

您可以将timerdebounceTime 结合起来,并使用它来采样原始事件流:

let $tick = Rx.Observable.timer(100, 100);
let $updates = $events
                  .sample($tick.merge($events.debounceTime(30))
                  .distinctUntilChanged();

这将每 100 毫秒发生一次事件,但如果事件发生在 30 毫秒间隙之前。

使用sample,采样流中的值将被忽略。因此,该技术创建了一个采样流,其中包括基于时间的要求和去抖动。每当发生其中任何一种情况时,它将从原始流中获取最新值。

使用distinctUntilChanged 可以防止事件以相同的值连续重复,如果没有发生任何变化。如果您的数据是结构化的或无法与=== 进行比较,您可能需要将比较函数作为参数添加到distinctUntilChanged

【讨论】:

  • 我猜是否有任何点取决于 X 和 Y 的值之间的差异,但我很想给你 +1 只是因为拉弗格瓶。
【解决方案2】:

你可以编写一个运算符来做你想做的事,方法是使用debounce 并在可观察的通知器的组合中使用两个计时器:

  • 在源发出值后发出 X 毫秒的计时器;和
  • 一个计时器,在操作员返回的可观察对象发出一个值后发出 Y 毫秒。

请参阅下面的 sn-p。里面的评论应该解释它是如何工作的。

const {
  ConnectableObservable,
  merge,
  MonoTypeOperatorFunction,
  Observable,
  of,
  Subject,
  Subscription,
  timer
} = rxjs;

const {
  concatMap,
  debounce,
  mapTo,
  publish,
  startWith,
  switchMap
} = rxjs.operators;

// The pipeable operator:

function waitUntilQuietButNotTooLong(
  quietDuration,
  tooLongDuration
) {

  return source => new Observable(observer => {

    let tooLongTimer;
    
    // Debounce the source using a notifier that emits after `quietDuration`
    // milliseconds since the last source emission or `tooLongDuration`
    // milliseconds since the observable returned by the operator last
    // emitted.

    const debounced = source.pipe(
      debounce(() => merge(
        timer(quietDuration),
        tooLongTimer
      ))
    );

    // Each time the source emits, `debounce` will subscribe to the notifier.
    // Use `publish` to create a `ConnectableObservable` so that the too-long
    // timer will continue independently of the subscription from `debounce`
    // implementation.

    tooLongTimer = debounced.pipe(
      startWith(undefined),
      switchMap(() => timer(tooLongDuration)),
      publish()
    );

    // Connect the `tooLongTimer` observable and subscribe the observer to
    // the `debounced` observable. Compose a subscription so that
    // unsubscribing from the observable returned by the operator will
    // disconnect from `tooLongTimer` and unsubscribe from `debounced`.

    const subscription = new Subscription();
    subscription.add(tooLongTimer.connect());
    subscription.add(debounced.subscribe(observer));
    return subscription;
  });
}

// For a harness, create a subject and apply the operator:

const since = Date.now();
const source = new Subject();
source.pipe(
  waitUntilQuietButNotTooLong(100, 500)
).subscribe(value => console.log(`received ${value} @ ${Date.now() - since} ms`));

// And create an observable that emits at a particular time and subscribe
// the subject to it:

const emissions = of(0, 50, 100, 300, 350, 400, 450, 500, 550, 600, 650, 700, 750, 800, 850, 900, 950, 1000, 1050, 1100, 1150);
emissions.pipe(
  concatMap((value, index) => timer(new Date(since + value)).pipe(
    mapTo(index)
  ))
).subscribe(source);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-12-27
    • 1970-01-01
    相关资源
    最近更新 更多