【问题标题】:Preventing multiple executions of observable防止多次执行 observable
【发布时间】:2018-02-13 16:23:32
【问题描述】:

我有一个屏幕需要显示一些来自 api 的不断变化的数据。在我的 api 调用的 observable 上,我使用 repeatWhen() 来实现轮询。此外,当我检测到传入数据的某些值变化时,我需要进行额外的 api 调用。我目前的实现是这样的:

    let queueStatusObservable = this.enumService.getOptions('WorkQueueStatus');
    let queueItemObservable = this.enumService.getOptions('WorkItemStatus');

    // retrieve status every 3000ms
    let summaryObservable = this.service
        .getSummary(this.id)
        .repeatWhen(completed => completed.delay(3000));

    this.summaryModel$ = Observable.combineLatest(queueStatusObservable, queueItemObservable, summaryObservable)
        .map(results => {
            let workQueueStatusList = results[0];
            let workItemStatusList = results[1];
            let summary = results[2].status > -1 ? results[2] : null // convert empty {} object to null;

            let model = {
                workQueueStatusList: workQueueStatusList,
                workItemStatusList: workItemStatusList,
                summary: summary
            };

            return model;
        });

    // fetch validationmessages every time the count changes
    this.validationMessages$ = Observable.merge(
        summaryObservable.first((value, index) => value.statusCount['Invalid'] > 0),
        summaryObservable.pairwise().filter((value, index) => value[0].statusCount['Invalid'] != value[1].statusCount['Invalid'])
    ).flatMap(i => me.service.getMessages());

    // fetch errors every time the count changes
    this.errorMessages$ = Observable.merge(
        summaryObservable.first((value, index) => value.statusCount['Error'] > 0),
        summaryObservable.pairwise().filter((value, index) => value[0].statusCount['Error'] != value[1].statusCount['Error'])
    ).flatMap(i => me.service.getErrors());

observables 上没有 subscribe() 调用,因为这些调用发生在我使用异步管道的角度模板中,如下所示:

    <div *ngIf="summaryModel$|async; let summaryModel">

我希望我每 3 秒会收到一个 api 调用,并且所有在 summaryObservable 上工作的语句都将简单地由这个 api 调用响应触发。

它似乎不是那样工作的。当我在 chrome 中打开网络选项卡时,我看到我每 3 秒收到 4 个 api 调用。这是 rxjs 应该如何工作,还是我以错误的方式使用 rxjs?

我最终得到的解决方案是:

let summaryObservable = this.service
    .getSummary(this.id)
    .repeatWhen(completed => completed.delay(3000))
    .shareReplay();

使用共享方法,我不需要自己管理连接/断开连接,shareReplay 确保每个值都可以被多次请求。

【问题讨论】:

  • 所以你的模板中有多个summaryModel$|async
  • 首先,如果它在同一个模板中,您可以使用 as 语法。就像*ngIf="obs$ | async as obs" 一样,然后只需订阅一次就可以在 obs 上工作。另外,您可能想搜索如何使用 rxjs 中的 share 运算符。

标签: angular rxjs


【解决方案1】:

您使用相同的summaryObservable 5 次。其中 2 个是一次性的 first 呼叫。一般来说,这会给您留下 3 种不同的用途。

当你想多次重复使用同一个 observable 时,你需要使用类似 publish 的东西来“共享”一个 observable 的订阅。

试试这样的:

let summaryObservable = this.service
  .getSummary(this.id)
  .repeatWhen(completed => completed.delay(3000))
  .publish(); // creates a ConnectableObservable

// ...
// all the rest of your code goes here
// ...

// Finally "connect" the observable to start the polling
// that will be shared by all the other usages
this.summaryConnection = summaryObservable.connect()

并确保您定义 ngOnDestroy 以在您的组件被销毁时停止轮询:

ngOnDestroy() {
    this.summaryConnection.unsubscribe(); // .dispose() if using older version of RXJS
}

【讨论】:

  • 我有这样的条件:map(counter => counter + 1), startWith(0), map(counter => ({counter})), takeUntil(this.stop$), repeatWhen(() => this.start$), publish() 但这不起作用 repeatWhen 仍然启动多个 observables
猜你喜欢
  • 2013-03-11
  • 2010-09-30
  • 2012-12-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-04-19
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多