【问题标题】:RXJS Combining multiple observables inside a pipeRXJS 在管道内组合多个可观察对象
【发布时间】:2019-10-31 04:32:39
【问题描述】:

我有一个返回一定数量 id 的 API 调用。 这些 id 中的每一个都用于进行新的 api 调用。这些 API 调用的结果需要合并到一个对象中。

起初我在第一个 api 调用的 .pipe(map) 运算符中使用了一个循环。在这个循环中,我进行了第二次 api 调用,并且在每个调用中的 .pipe(map) 运算符中,我将在我的角度组件中编辑一个变量。

这不是很漂亮,我实际上想知道这是否是线程安全的。我知道 javascript 是单线程的,但是让多个异步进程处理同一个全局变量似乎不太安全。

之后,我通过 apiCall1 循环返回的 Id 将第二个 api 调用返回的 observable 存储在一个数组中,并使用 forkJoin 相应地订阅和处理每个结果(参见示例)。

但这不是很漂亮,我想知道是否有一个操作符可以在我的管道中使用?

所以代替(伪代码):

  .pipe(
      map(ids=> {

        let observables = []
        for (const id of ids) {
         observables.push(this.service.getSomeStuff(id));
        }

        forkJoin(...observables).subscribe(dataArray) => {
          for (data of dataArray) {
            //Do something
          }
        });

      }),
      takeWhile(() => this.componentActive),
      catchError(error => {
        console.log(error);
        return throwError(error);
      })
    )
    .subscribe();

有没有这样的操作符:

  .pipe(
      map(ids=> {

        let observables = []
        for (const id of ids) {
         observables.push(this.service.getSomeStuff(id));
        }

      return observables
      }),
      forkJoin(dataArray => {
          for (data of dataArray) {
            //Do something
          }
        });
      takeWhile(() => this.componentActive),
      catchError(error => {
        console.log(error);
        return throwError(error);
      })
    )
    .subscribe();

【问题讨论】:

    标签: angular rxjs rxjs-pipeable-operators


    【解决方案1】:
    combineLatest(...observables)
    

    只有在所有 observables 都发射后才会发射,并且您将拥有一个结果数组。

    【讨论】:

      【解决方案2】:

      与其使用 for 立即修改数据,不如在收到数据时这样做?像这样。

      source$.pipe(
        mergeMap(ids => from(ids)),
        mergeMap(id => this.service.getSomeStuff(id)),
        tap(data => //do someting with the data);
        takeWhile(() => this.componentActive),
        catchError(error => {
          console.log(error);
          return throwError(error);
        }),
        toArray(), --> this create the array of all the values received.
      )
      .subscribe(data => //returns array of modified data);
      

      【讨论】:

        【解决方案3】:

        这是你可以做的:

        sourceObservable$.pipe(
          // depends on your need here you can use mergeMap as well
          switchMap(ids => {
            const observables = ids.map(i => this.service.getSomeStuff(id));
            return forkJoin(observables);
          }),
          tap(joined => {
            // joined will be an array of values of the observables in the same
            // order as you pushed in forkJoin
            for (data of joined) {
              // do something
            }
          }),
          takeWhile(() => this.componentActive),
          catchError(error => {
            console.log(error);
            return throwError(error);
          })
        )
        .subscribe();
        

        【讨论】:

        • 最好去掉 tap 并将该代码移到对 subscribe 的调用中。
        • @Brandon 是的,这也是一回事。这只是为了演示在可观察管道中可以做什么。就个人而言,在 Angular 应用程序中,我很少订阅。我大量使用async 管道让 Angular 自己处理订阅管理。尽管在 subscribe() 方法中也绝对没问题。顺便说一句,感谢您的建议..:)
        • 我同意 Brandon 的观点,你不应该使用纯函数的理由很少
        • @Davy 使用tap 操作符回调也是一个纯函数。正如我所说的 - 使用 subscribe() 也绝对没问题。
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2016-06-18
        • 1970-01-01
        • 2018-02-04
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-02-01
        相关资源
        最近更新 更多