【问题标题】:rxjs: keep infinite stream alive after errorrxjs:错误后保持无限流活着
【发布时间】:2021-02-13 16:14:30
【问题描述】:

我想知道我是否可以在我的无限主流中合并映射的每个外部流的管道中改为 catchError,在我的主流末尾使用一个 catchError。此 catchError 返回对主流本身的引用以进行救援。可能会导致内存泄漏或任何其他问题?

这是示例代码:

import { Observable, fromEvent } from "rxjs";
import { catchError, mergeMap } from "rxjs/operators";

function foreignStream() {
  return new Observable(observer => {
    observer.next(0);
    observer.next(1);
    observer.next(2);
    observer.error("error");
  });
}

const stream$ = fromEvent(document, "click").pipe(
  mergeMap(foreignStream),
  catchError(x => {
    console.log(x);
    return stream$;
  })
);
stream$.subscribe(
  console.log,
  x => console.log("err" + x),
  () => console.log("complete")
);

【问题讨论】:

    标签: angular rxjs rxjs-pipeable-operators


    【解决方案1】:

    虽然该解决方案看起来确实很有趣,但它实际上会导致在每个错误上都有新的订阅,而旧的订阅一直处于打开状态。您可以改用retryretryWhen 来重新启动可观察源,以防出现错误。

    试试下面的

    RxJS retry

    const stream$ = fromEvent(document, "click").pipe(
      mergeMap(foreignStream),
      mergeMap(foreignStream2),
      mergeMap(foreignStream3),
      retry() // <-- retry immediately infinite times
    );
    
    const stream$ = fromEvent(document, "click").pipe(
      mergeMap(foreignStream),
      mergeMap(foreignStream2),
      mergeMap(foreignStream3),
      retry(5) // <-- retry immediately 5 times on errors and complete
    );
    

    RxJS retryWhen

    const stream$ = fromEvent(document, "click").pipe(
      mergeMap(foreignStream),
      mergeMap(foreignStream2),
      mergeMap(foreignStream3),
      retryWhen(error => error.pipe(delay(5000))) // <-- retry after 5 seconds infinite times
    );
    

    【讨论】:

    • 重试运算符绝对是更好的解决方案。我不怀疑它可以在没有参数的情况下用于无限重试,但这是我在阅读文档时的疏忽。关于返回对主流的引用,正如您所说。来自文档的引用:“一个函数将 err 作为参数,它是错误,并被捕获,它是源 observable,以防你想通过再次返回它来“重试”那个 observable。”谢谢回答
    猜你喜欢
    • 1970-01-01
    • 2020-10-07
    • 2021-09-04
    • 1970-01-01
    • 2011-05-08
    • 2019-02-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多