【问题标题】:Use RxJS pipe() to turn array into stream of asynchronous values in Angular使用 RxJS pipe() 将数组转换为 Angular 中的异步值流
【发布时间】:2022-01-16 07:55:40
【问题描述】:
type Movie = {id: string};
type FullMovie = {id: string, picture: string};

我有一个返回 Movie 类型数组的 url:

http.get(url).subscribe(res: Movie[])

我将http.get(movie.id) 用于数组中的每个 电影,返回FullMovie

http.get(movie.id).subscribe(res: FullMovie)

所以本质上我想创建一个返回 FullMovie 对象流的方法,因为请求解析:getAll = (url): Observable<FullMovie>

getAll = (url): Observable<FullMovie> => {
  return http.get(url)
    //must pipe the array into a stream of FullMovies but not a stream of FullMovie Observables. I don't want to subscribe to each of the returned FullMovies
    //something like
   .pipe(//map(array => array.forEach(movie => return http.get(movie.id))))
}

目前我有以下可行的解决方案,但我想要一个更简洁的解决方案:

 private getFull = (queryGroup: string): Observable<TMDBMovie> =>
    new Observable<TMDBMovie>((observer) => {
      //get movie array
      this.httpGet(queryGroup).subscribe((movies) => {
        var j = 0;

        if (movies.length === 0) return observer.complete();

        //loop through elements
        movies.forEach(movie => {
          this.getById(movie.id).subscribe(
            (res) => complete(observer.next(res)),
            (error) => complete()
          );
        });
          
        }

        const complete = (arg: any = 0) => {
          if (++j === len) observer.complete();
        };
      });
    });

编辑:

这行得通

newGetFull = (queryGroup: string) =>
    this.httpGet(queryGroup)
      .pipe(concatMap((arr) => from(arr)))
      .pipe(
        mergeMap((movie) => this.getById(movie.id).pipe(catchError(() => of())))
      );

【问题讨论】:

    标签: javascript rxjs observable rxjs-pipeable-operators


    【解决方案1】:

    你可能想尝试这些方面的东西

    getAll = (url): Observable<FullMovie> => {
      return http.get(url)
       .pipe(
          // turn the array Movie[] into a stream of Movie, i.e. an Obsevable<Movie>
          concatMap(arrayOfMovies => from(arrayOfMovies)),
          // then use mergeMap to "flatten" the various Obaservable<FullMovie> that you get calling http.get(movie.id)
          // in other words, with mergeMap, you turn a stream of Observables into a stream of the results returned when each Observable is resolved
          mergeMap(movie => http.get(movie.id))
       )
    }
    

    考虑到使用上面的mergeMap,您不能保证最终流的顺序与您从第一次调用中获得的Movies 数组的顺序相同。这是因为每个http.get(movie.id)可能需要不同的时间才能返回,因此无法保证订单。

    如果需要保证顺序,请使用concatMap而不是mergeMap(实际上concatMapmergeMap并发设置为1)。

    如果您希望在返回结果之前完成所有http.get(movie.id),请使用forkJoin,而不是像这样使用mergeMap

    getAll = (url): Observable<FullMovie> => {
      return http.get(url)
       .pipe(
          // turn the array Movie[] into an array of Observable<Movie>
          map(arrayOfMovies => arrayOfMovies.map(movie => http.get(movie.id))),
          // then use forkJoin to resolve all the Observables in parallel
          concatMap(arrayOfObservables => forkJoin(arrayOfObservables))
       ).subscribe(
          arrayOfFullMovies => {
            // the result notified by forkJoin is an array of FullMovie objects
          }
       )
    }
    

    【讨论】:

    • 这很好用,谢谢!还有一件事,如果其中一个 http 请求返回 404 错误,整个 Observable 将停止,我应该在哪里使用 catchError() 以便它将继续,因为我编辑中的代码不能以正确的方式工作?
    • 没关系,我把它添加到了正确的位置,谢谢
    猜你喜欢
    • 1970-01-01
    • 2021-10-03
    • 1970-01-01
    • 1970-01-01
    • 2019-01-27
    • 2020-03-26
    • 1970-01-01
    • 1970-01-01
    • 2022-12-20
    相关资源
    最近更新 更多