【问题标题】:RxJS: deferred inner-observables still run even when the outer-observable has no subscription leftRxJS:即使外部可观察对象没有剩余订阅,延迟的内部可观察对象仍会运行
【发布时间】:2018-08-16 16:06:23
【问题描述】:

我刚刚意识到,即使外部可观察对象没有剩余订阅,内部可观察对象(如在 mergeMap 或 switchMap 运算符中定义的对象)也不会“停止”。

为了更好的例子,让我们展示一些代码:

const {
  Subject,
  of: obsOf,
  concat: obsConcat,
  defer,
} = require("rxjs");
const {
  finalize,
  mergeMap,
  tap,
  takeUntil,
} = require("rxjs/operators");

const subject = new Subject();

obsOf(null).pipe(
  mergeMap(() =>
    obsConcat(
      defer(() => {
        console.log("side-effect 1");
        return obsOf(1);
      }),
      defer(() => {
        console.log("side-effect 2");
        return obsOf(2);
      }),
      defer(() => {
        console.log("side-effect 3");
        return obsOf(3);
      })
    )
  ),
  finalize(() => {
    console.log("finalized");
  })
)
.pipe(
  takeUntil(subject),
  tap((i) => {
    if (i === 2) {
      subject.next();
    }
  })
).subscribe(
  (i) => { console.log("next", i); },
  (e) => { console.log("error", e); },
  () => { console.log("complete"); },
);

// Ouput:
// > side-effect 1
// > next 1
// > side-effect 2
// > complete
// > finalized
// > side-effect 3

side-effect 3 行被记录的事实很奇怪,因为外部 observable 已经调用了finalize

由于所有这些副作用都在 defer 中,因此在取消订阅后可以完全避免它们。在我看来,这些副作用根本没有任何价值。

知道为什么 RxJS 仍然执行这些吗?

【问题讨论】:

    标签: rxjs


    【解决方案1】:

    不幸的是,这是设计使然(从 RxJS 6 开始)-concat 将缓冲 observables,即使在您取消订阅后也会订阅每个缓冲的对象(如果订阅是 closed,它将订阅并立即取消订阅)。

    你必须防止 observables 被缓冲...

    obsOf(null).pipe(
      mergeMap(() => obsOf(
        defer(() => {
          console.log("side-effect 1");
          return obsOf(1);
        }),
        defer(() => {
          console.log("side-effect 2");
          return obsOf(2);
        }),
        defer(() => {
          console.log("side-effect 3");
          return obsOf(3);
        })
      )),
      concatAll(),
      finalize(() => {
        console.log("finalized");
      }),
      takeUntil(subject),
      tap((i) => {
        if (i === 2) {
          subject.next();
        }
      })
    ).subscribe(
      (i) => { console.log("next", i); },
      (e) => { console.log("error", e); },
      () => { console.log("complete"); },
    );
    

    人们可能会认为上面的代码有效,但前提是您延迟了其中一个 observables。将obsOf(1) 替换为timer(100).pipe(mapTo(1));,行为完全相同。

    唯一的解决方法是确保您没有缓冲任何内容(意味着不要使用 concat* 运算符)或以其他方式限制可观察生产(使用单独的主题并手动控制生产)。

    【讨论】:

    • 非常感谢!知道为什么他们改变了 RxJS6 中 concat 的设计吗?懒惰地运行 Observables,甚至是连接的 Observables,看起来更像 Rx。
    猜你喜欢
    • 2019-01-30
    • 2018-06-23
    • 1970-01-01
    • 1970-01-01
    • 2018-07-28
    • 1970-01-01
    • 2012-11-15
    • 2016-11-02
    • 1970-01-01
    相关资源
    最近更新 更多