【问题标题】:rxjs: how to make foreach loop wait for inner observablerxjs:如何让 foreach 循环等待内部 observable
【发布时间】:2020-12-22 23:11:59
【问题描述】:

我有一个 foreach 循环,我可能需要一个 http 请求来获取一些信息。我尝试在 foreach 循环中使用 forkjoin 来“使 foreach 循环等待 observables”(在这个例子中只有一个 observables,但实际上我会有更多......)。但是有了这段代码,foreach 每个都继续运行,而无需等待可观察对象完成,我找不到任何解决方案来防止这种情况发生。

非常感谢您的帮助!!!

    ...
    let wishes: Wish[] = [];
    response.wishes.forEach((wish) => {
          let newWish : Wish = new Wish( wish._id, wish.title, wish.price );
                   
          let personObservables: Observable<any>[] = [];
          if (wish.reservation){
            personObservables.push(this.peopleService.getPersonById(wish.reservation.reservedBy)); 
          } else {
            personObservables.push(of(null));
          }
          forkJoin(personObservables).subscribe( ([reservedBy]) => {
            if (reservedBy) {
              newWish.reservation = {
                ...wish.reservation,
                reservedBy
              };
            }
            wishes.push(newWish);
          } );
        });
    ...

编辑:没有 foreach 循环的完整工作解决方案。在管道和数组上的映射函数中使用映射运算符要容易得多。我了解到,将这种逻辑拆分为多个运算符更容易,而不是尝试在 1 个地图运算符中修复它...

    public getWishlist ( receiver : Person) : Observable<Wish[]> {
        return this.http$.get<IWishlistResponse[]>(
          environment.apiUrl + 'wishlist/' + receiver.id
        ).pipe(
          
          // Create wish instances from each wish in API response and save reservation for later use
          map( wishlistResponse => 
            wishlistResponse[0].wishes.map(wish => ({
              wish: this.createWishInstanceFromResponse(wish, receiver),
              reservation: wish.reservation
            }))),
          
          // For each wish with reservation: get person info for 'reservedBy' id
          map( wishesAndReservationObjects => wishesAndReservationObjects.map( ({wish, reservation}) => 
            !reservation ? 
            of(wish) : 
            this.peopleService.getPersonById(reservation.reservedBy)
            .pipe(
              map ( reservedBy => {
                if (reservedBy) wish.reservation = { 
                  ...reservation, 
                  reservedBy: new Person(reservedBy.id, reservedBy.firstName, reservedBy.lastName)
                }
                return wish;
              })
            )
           )),
          
          // forkJoin all observables, so the result is an array of all the wishes
          switchMap(reservedByObservables => reservedByObservables.length !== 0 ? forkJoin(reservedByObservables) : of(<Wish[]>[])), //https://stackoverflow.com/questions/41723541/rxjs-switchmap-not-emitting-value-if-the-input-observable-is-empty-array
          
          // Call method on each wish (with or without reservation) to set user flags in each instance (must be done after reservedBy is added)
          map ( wishes => wishes.map( wish => {
            wish.setUserIsFlags(this.userService.currentUser);
            return wish;
          })),
          
          // For each wish: get state via API call
          map ( wishesWithoutState => wishesWithoutState.map( wishWithoutState => 
            this.http$.get<wishStatus>(environment.apiUrl + 'wish/' + wishWithoutState.id + '/state')
            .pipe(
              catchError(() => of(null)),
              map( state => {
                wishWithoutState.status = state;
                return wishWithoutState;
              })
            )
          )),
          
          // Combine all stateObservables into 1 array
          switchMap(stateObservables => stateObservables.length !== 0 ? forkJoin(stateObservables) : of(<Wish[]>[]))
        )
      }
    
      private createWishInstanceFromResponse ( wishResponse : IWishResponse, receiver: Person ) : Wish {
        let wish : Wish = new Wish (
          wishResponse._id,
          wishResponse.title,
          wishResponse.price,
          ...
        );
        return wish;
      }

【问题讨论】:

  • 也许完全从循环中删除forkjoin?所以先建数组,再运行fork?

标签: angular foreach rxjs angular2-observables


【解决方案1】:

作为一般规则,除非您是最终消费者并且准备好丢弃 observable(不需要对数据进行更多转换),否则永远不要对 observable 调用 subscribe。这通常只在向用户显示或写入数据库时​​发生。

您的代码在做什么(您的各种调用返回什么)对我来说不是很清楚。所以这是我将如何做你似乎正在尝试的近似值。

按照这些思路应该可以工作。

请注意,在我准备好将最终的愿望清单记录到控制台之前,我是如何从不订阅的?这是设计使然,因此信息如预期一样。

return this.http$.get<IWishlistResponse[]>(
  environment.apiUrl + 
  'wishlist/' + 
  receiver.id
).pipe(
  map(wishlist => wishlist.wishes.map(wish => ({
    oldWish: wish,
    newWish: new Wish( wish._id, wish.title, wish.price)
  }))),
  map(wishes => wishes.map(({oldWish, newWish}) => 
    !oldWish.reservation ? 
    of(newWish) :
    this.peopleService.getPersonById(oldWish.reservation.reservedBy).pipe(
      map(([reservedBy]) => reservedBy ?
        {
          ...newWish, 
          reservation: {
            ...oldWish.reservation,
            reservedBy
          }
        } :
        {
          ...newWish,
          reservation: oldWish.reservation
        }
      )
    )
  )),
  switchMap(newWishObservables => forkJoin(newWishObservables))
).subscribe(newWishes => console.log(newWishes));

更新 #1:清理代码

使代码更易于维护、错误检查等的一些措施是将可观察对象的创建外包到单独的函数中,并保持转换逻辑的“干净(更)”管道。

一旦你这样做了,这个模式:

map(things => things.map(thing => observableThing)),
mergeMap(observableThings => forkJoin(observableThings))

可以组合起来而不会造成太多混乱

mergeMap(things => forkJoin(
  things.map(thing => observableThing)
))

这是我可能会如何整理更新代码中的管道(这只是在运行中快速完成,未经测试)。

您会注意到我删除了处理空forkJoins 的代码。我认为这不是一个真正的问题,您可以在 observable 完成后在管道中解决该问题。

您还会注意到,我在创建 observable 的两个函数中都做了一些快速错误处理。这两次我都只是替换了一个默认值(尽管在管道的不同部分,效果略有不同)。在实践中,您可能需要检查错误的名称或类型并做出相应的响应。

我只是忽略了错误,但有些错误不应该被悄悄地忽略。这取决于你。

public getWishlist ( receiver : Person) : Observable<Wish[]> {
  return this.http$.get<IWishlistResponse[]>(
    environment.apiUrl + 'wishlist/' + receiver.id
  ).pipe(

    // Create wish instances from each wish in API response and save reservation for later use
    map( wishlistResponse => 
      wishlistResponse[0].wishes.map(wish => ({
        wish: this.createWishInstanceFromResponse(wish, receiver),
        reservation: wish.reservation
      }))
    ),
    
    // For each wish with reservation: get person info for 'reservedBy' id
    map(wishesAndReservationObjects => 
      wishesAndReservationObjects.map(({wish, reservation}) => 
        this.addReservationToWish(wish, reservation)
      )
    ),
    
    // forkJoin all observables, so the result is an array of all the wishes
    switchMap(reservedByObservables => 
      forkJoin(reservedByObservables)
    ),
    
    // Call method on each wish (with or without reservation) to set user flags in each instance (must be done after reservedBy is added)
    map (wishes => wishes.map(wish => {
      wish.setUserIsFlags(this.userService.currentUser);
      return wish;
    })),
    
    // For each wish: get state via API call
    map(wishesWithoutState => wishesWithoutState.map(wishWithoutState => 
      this.addStatusToWish(wishWithoutState)
    )),
    
    // Combine all stateObservables into 1 array
    switchMap(stateObservables =>
      forkJoin(stateObservables)
    ),

    // If this.http$.get<IWishlistResponse[]> is empty, this will all
    // complete without emitting anything. So we can just give it a default
    // thing to emit in case that happens.
    defaultIfEmpty(<Wish[]>[])
  );
}

private createWishInstanceFromResponse( 
  wishResponse : IWishResponse, 
  receiver: Person 
): Wish {
  let wish : Wish = new Wish (
    wishResponse._id,
    wishResponse.title,
    wishResponse.price
  );
  return wish;
}

/***
 * Create an observable that tries to add reservation and 
 * reservedBy to the wish
 ***/ 
private addReservationToWish(wish: Wish, reservation): Observable<Wish>{
  return this.peopleService.getPersonById(reservation?.reservedBy).pipe(
    // if peopleService.getPersonById fails (Say, because reservation was 
    // null), convert the error into a null response, the filter will Ignore
    // the failed call and just return wish.
    catchError(_ => of(null)),
    // Since errors become null, this filters errors. Furthermore if peopleService
    // doesn't error and just emits a null/undefined value instead, this will
    // filter that situation as well
    filter(reservedBy => reservedBy != null),
    // Now we know reservedBy isn't null, so map doesn't need to check
    map(reservedBy => { 
      wish.reservation = reservation;
      wish.reservation.reservedBy = new Person(
        reservedBy.id, 
        reservedBy.firstName, 
        reservedBy.lastName,
        ...
      );
      return wish;
    }),
    // If reservedBy was null (due to error or otherwise), then just emit
    // the wish without a reservation
    defaultIfEmpty(wish)
  );
}

/***
 * Create an observable that tries to add status to the wish
 ***/ 
private addStatusToWish(wish: Wish): Observable<Wish>{
  return this.http$.get<WishStatus>(
    environment.apiUrl + 
    'wish/' + 
    wishWithoutState.id + 
    '/state'
  ).pipe(
    map(state => ({
      ...wish,
      status: state
    })),
    // If http$.get<WishStatus> failed, then just return the wish
    // without a status
    catchError(_ => of(wish)),
  );
}

现在 mapforkJoin(mapped) 的转换看起来足够干净,我可能会合并它们,但这取决于你。

如下所示:

public getWishlist ( receiver : Person) : Observable<Wish[]> {
  return this.http$.get<IWishlistResponse[]>(
    environment.apiUrl + 'wishlist/' + receiver.id
  ).pipe(

    // Create wish instances from each wish in API response
    // For each wish with reservation: get person info for 'reservedBy' id
    switchMap(wishlistResponse => forkJoin(
      wishlistResponse[0].wishes.map(wish => 
        this.addReservationToWish(
          this.createWishInstanceFromResponse(wish, receiver), 
          wish.reservation
        )
      )
    )),
    
    // Call method on each wish (with or without reservation) to set user flags in each instance (must be done after reservedBy is added)
    map (wishes => wishes.map(wish => {
      wish.setUserIsFlags(this.userService.currentUser);
      return wish;
    })),
    
    // For each wish: get state via API call
    switchMap(wishesWithoutState => forkJoin(
      wishesWithoutState.map(wishWithoutState => this.addStatusToWish(wishWithoutState))
    )),

    // If this.http$.get<IWishlistResponse[]> is empty, this will all
    // complete without emitting anything. So we can just give it a default
    // thing to emit in case that happens.
    defaultIfEmpty(<Wish[]>[])
  );
}

【讨论】:

  • 我根据您的回答改进了我的代码,现在它运行完美。我还能够按照您的代码风格清理我的代码。所以这是一次很棒的学习经历!我在上面的帖子中添加了完整的解决方案。你能检查一下并提供一些更简洁代码的提示和技巧吗?
  • 您能否告诉我在此管道中添加错误处理的最佳做法是什么?
  • 我已经进行了更新,错误处理是如此具体,以至于您添加的位置很难给出经验法则。一般来说,我喜欢对每个 API 可能返回的常见错误进行一些错误处理,然后再对未知/意外错误进行一些常规处理。
  • 哇,非常感谢您的更新!多亏了你,我获得了很多可观察的知识。非常感谢您付出的努力和时间来如此清楚地解释它(并将我的解决方案重写为更清晰的代码:-D)。请关注我,这样你就可以回答我所有的问题;-)
【解决方案2】:

您使用fromconcatMap 来完成此操作。

let wishes: Wish[] = [];
from(response.wishes).pipe(concatMap(wish) => {
      let newWish : Wish = new Wish( wish._id, wish.title, wish.price );
               
      let personObservables: Observable<any>[] = [];
      if (wish.reservation){
        personObservables.push(this.peopleService.getPersonById(wish.reservation.reservedBy)); 
      } else {
        personObservables.push(of(null));
      }
      return forkJoin(personObservables).pipe(tap([reservedBy]) => {
        if (reservedBy) {
          newWish.reservation = {
            ...wish.reservation,
            reservedBy
          };
        }
        wishes.push(newWish);
      }));
    }).subscribe();

【讨论】:

  • 感谢您的回答,@FanCheung。我试图实现它,但我不知道如何在我的完整代码中工作......这段代码是另一个可观察的管道的一部分:this.http$.get(...).map (response => 上面的代码).switchMap(wishes => ...)。如果我实现上述,switchMap 中的愿望是 void 类型。所以我似乎没有从 map 函数返回任何东西......我的问题中添加了完整代码......
  • 需要知道代码的上下文。好像你是从一个函数返回所有操作。在这种情况下,订阅不应该存在。
  • 是的,我知道,我试过没有订阅,但后来我遇到了新问题。我设法通过整个管道得到愿望,但我无法通过管道得到它们。在我的组件中的订阅中,有几个愿望“到达”但其他愿望没有。我想这与我正在执行的所有异步调用有关。
【解决方案3】:

你可以创建这个函数来处理你的进程:

completeData(items): Observable<any> {
    let $apiCallItems: Observable<any>[] = [];
    const itemsNeedServiceCall: Array<any> = [];
    const itemsCompleted: Array<any> = [];
    items.forEach((item) => {
      if (item.reservation) {
        $apiCallItems.push(this.peopleService.getPersonById(item.reservation.reservedBy));
        itemsNeedServiceCall.push(item);
      } else {
        itemsCompleted.push(item);
      }
    });
    return forkJoin($apiCallItems).pipe(
      map((r) => {
        for (let i = 0; i < r.length; i++) {
          itemsNeedServiceCall[i] = {
            ...itemsNeedServiceCall[i],
            ...r[i],
          };
        }
        return [...itemsCompleted, ...itemsNeedServiceCall];
      })
    );
  }

然后你可以在任何你想使用的地方做这样的事情:

this.completeData(response.wishes).subscribe(r => {
    //some code
})

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-05-09
    • 1970-01-01
    • 2021-04-23
    • 2019-04-15
    • 2019-05-22
    • 1970-01-01
    • 1970-01-01
    • 2021-11-04
    相关资源
    最近更新 更多