【问题标题】:Rxjs observable with Inner subscription taking long time and outer observable executed firstly in subsequent requestsRxjs 可观察,内部订阅需要很长时间,外部可观察在后续请求中首先执行
【发布时间】:2020-02-26 00:08:30
【问题描述】:

我有一个启动 Observable 的按钮,然后我订阅结果并嵌套我有一个依赖于之前的 Observable 结果的订阅。

this.getData(params).subscribe(result => {
    // Make some first level process
    console.log('[Outter Execution]: ',result);

    this.getInnerData(result).subscribe(res => {
        // Make some inner level process
        console.log('[Inner Execution]: ',res);
    });
});

取决于用户的点击速度,顺序不一样:

//User click slowly

[Outer Execution]
[Inner Execution]

[Outer Execution]
[Inner Execution]

//User start clicking quickly

[Outer Execution]
[Inner Execution]

[Outer Execution]
[Inner Execution]

[Outer Execution]
[Outer Execution]
[Outer Execution]

[Inner Execution]
[Inner Execution]
[Inner Execution]

[Outer Execution]
[Inner Execution]
[Outer Execution]
[Inner Execution]

如您所见,如果嵌套订阅耗时较长,并且用户再次点击内订阅未解决,则在解决内执行之前,第一个[Outer Execution]消息被注销。一段时间后,解决了之前长时间的内部订阅,并返回了记录的消息。

我尝试使用 switchMapmergeMap 运算符但没有成功。

[已编辑]:我需要的功能,是作为一个块执行,并且需要在第一次执行完成后执行下一个外部执行(外部和内部订阅)。

【问题讨论】:

  • 问题和预期行为是什么?
  • 我需要同时执行(外部和内部),并且需要在第一次完整执行后执行下一个外部执行(外部和内部订阅)
  • 您在使用 switchMap 时走在了正确的轨道上。在订阅中拥有订阅通常不是一个好主意,因为很难管理订阅并确保它们被正确取消订阅。

标签: rxjs nested observable subscription


【解决方案1】:

使用 RxJS 的方法有很多,如果没有具体的示例,很难为您提供具体的答案。我能做的最好的就是从我的代码中为您提供一个示例,并希望它接近您想要完成的目标。

我与他们的供应商有产品。当我得到一个产品时,它有一系列供应商 ID。所以我需要获取产品,然后在该操作中获取供应商。 (听起来和你的场景相似?)

这是我的代码:

  selectedProductSuppliers$ = this.selectedProduct$
    .pipe(
      filter(selectedProduct => Boolean(selectedProduct)),
      switchMap(selectedProduct =>
        forkJoin(selectedProduct.supplierIds.map(supplierId => this.http.get<Supplier>(`${this.suppliersUrl}/${supplierId}`)))
      ),
      tap(suppliers => console.log('product suppliers', JSON.stringify(suppliers)))
    );

此代码使用switchMap 来确保如果用户单击多次,它将切换到最新的选择。

然后它使用forkJoin 处理每个供应商 ID,发出 http get 请求以检索每个供应商数据并将它们加入一个发出的数组中。

返回值为Observable&lt;Supplier[]&gt;

你可以在这里找到一个完整的例子:https://github.com/DeborahK/Angular-RxJS

更新:

如果您将上述代码更改为使用concatMap 而不是switchMap,它将处理每个请求,等待一个完成后再执行下一个。

【讨论】:

  • 我刚刚应用了这个过程,但我需要完成所有前面的块(主要和依赖的可观察对象,在你的示例中,主要是 selectedProduct$,依赖是每个供应商Ids 的 HTTP 获取)。顺序应该是,等待第一个序列(主要和依赖)完成,然后再继续第二个 clic 进程(我不能取消前一个或延迟依赖进程)。但是当用户快速点击时,main observable 会被执行,稍后,当依赖完成时,它会显示依赖结果。
  • 因此,如果他们快速点击,您仍想处理每个点击,而不仅仅是最后一次(最近的)点击?如果是这样,那么您可以尝试concatMap 而不是switchMap
  • 我刚刚创建了一个所需用法的示例,其中包含一些数据来说明此过程stackblitz.com/edit/angular-b6hia7(app.component.ts)如果您单击一个并等待,则该过程是正确的,但是如果您尽管我使用了 mergeMap 或 concatMap (如您更新它),但快速单击几次该过程不是连续的。
  • 你的依赖进程是否只发出一项?如果是这样,您不需要forkJoin。在我上面的示例中,每个产品选择都发出 多个 http 请求以获取产品的供应商列表。此外,您的流程是同步的还是异步的? of 同步发射。
  • 依赖进程以异步方式发出一个项目或数组。我更新了 stackblitz 以避免使用 of 。 main 和dependent 是异步的,但完整的过程需要作为一个块。
【解决方案2】:

为确保处理等待,您需要在主 Observable 上使用concatMap,而不是在依赖的 Observable 上。否则不等待集合。

我在您的最新更改之前进行了分叉,但这是我想出的似乎可行的方法:

  private clickSubject = new Subject<number>();
  clickAction$ = this.clickSubject.asObservable();

  ngOnInit() {
    this.clickAction$
      .pipe(
        concatMap(value => this.mainProcess(value)
          .pipe(
            mergeMap(x => this.dependantProcess(x))
          )
        )
      )
      .subscribe();
  }

  onClick(value) {
    // Emits a value into the action stream
    this.clickSubject.next(value);
  }

  mainProcess(value) {
    console.log("[Emitted] Main", value);
    return of(value).pipe(delay(10));
  }

  dependantProcess(value) {
    console.log("[Emitted] Dependent", value);
    return of(value).pipe(delay(2000));
  }

注意concatMap是用来等待mainProcess的。

  • 此代码通过使用 RxJS Subject 定义操作流来响应用户点击。

  • 每次用户单击按钮时,都会发出操作流。

  • 动作流管道使用concatMap缓存请求并等待处理,直到处理完之前的请求。

  • 当主进程发出时,从属进程执行。

  • 当管道完成(主进程及其依赖进程)后,下一个缓存请求将由concatMap 处理。

有意义吗?

你可以在这里找到更新的代码:

https://stackblitz.com/edit/angular-dependent-order-deborahk

【讨论】:

  • 它就像一个魅力。我将审查您的流程和课程,以便更好地了解一些复杂的用例。
猜你喜欢
  • 2018-06-23
  • 1970-01-01
  • 1970-01-01
  • 2017-04-13
  • 2019-06-21
  • 1970-01-01
  • 1970-01-01
  • 2022-12-18
  • 1970-01-01
相关资源
最近更新 更多