【问题标题】:Angular handle multiple dependent subscriptionsAngular 处理多个依赖订阅
【发布时间】:2021-11-19 17:21:02
【问题描述】:

有什么帮助吗?

let notificationsMessages = []
countries.forEach((country: any) => {
    this.isActiveCountry(country.isActive).subscribe((data) => { // // CALL #1 TO API
        country.serverId = data.serverId;
        this.uploadPhotoToApi(country.fileSource).subscribe((response) => { // CALL #2 TO API
          // server return the uploaded file ID
          country.serverFileID = response.serverFileId;
          this.sendCountryToApi(country).subscribe((response) => { // CALL #3 TO API
            this.countriesTable.delete(country.id).then(() => {
              // Delete the uploaded country from local database
              // if is the last country EMIT EVENT
            }, (error) => {
              // if is the last country EMIT EVENT
              notificationsMessages.push(error); // Error to delete country from indexedDB 
            });
          }, (error) => {
              // if is the last country EMIT EVENT
              notificationsMessages.push(error); // // Error to upload country to API
          });
        }, (errorCode) => {
              // if is the last country EMIT EVENT
          notificationsMessages.push(error); // Error on sending file to API
        });
      }, (error) => {
              // if is the last country EMIT EVENT
            notificationsMessages.push(error); // // Error on country identification
      });
  });
  

处理完所有country 列表后,如何发出事件? 我需要知道有多少国家上传成功,有多少没有。

例如,如果我有一个包含 50 个国家/地区的列表,则在处理最后一个国家/地区时,我想发出一个包含 2 个数组的事件……如下所示: 成功:[countryId1, countryId2...] 错误:['Country Id 2 failed on upload', 'Country Id 10 failed on file upload']

所有这 3 个调用都是依赖的,必须按该顺序执行……我无法更改此流程。 我应该在 CALL #3 成功以及所有错误函数上发出事件吗? 谢谢!

【问题讨论】:

  • 尝试 switchMap 运算符
  • 以及我如何知道 forEach 已完成并且所有项目都已处理?
  • 您可以使用 deps 构建一个流,一旦流完成,您的所有项目都会被处理

标签: javascript angular typescript rxjs functional-programming


【解决方案1】:

这是执行此操作的一种方法。这可能有点过头了,因为它可以让您对错误处理进行大量精细控制,但基本上总是以相同的方式处理错误。

即便如此,这可能比最直接的解决方案更容易扩展。

这里:

interface TaggedCountry{
  success: boolean,
  country: any,
  error?: any
}

class ArbiratryClassName {

  processCountry(country: any): Observable<TaggedCountry>{

    return this.isActiveCountry(country.isActive).pipe(
      // country now has serverId set
      map(({serverId}) => ({...country, serverId})),
      catchError(error => throwError(() => ({
        success: false,
        country,
        error
      }) as TaggedCountry)),

      mergeMap((resultCountry: any) => this.uploadPhotoToApi(resultCountry.fileSource).pipe(
        // country now has serverFileId set
        map(({serverFileId}) => ({...resultCountry, serverFileId})),
        catchError(error => throwError(() => ({
          success: false,
          country: resultCountry,
          error
        }) as TaggedCountry))
      )),

      mergeMap((resultCountry: any) => this.sendCountryToApi(resultCountry).pipe(
        // Ignore response from sendCountryToApi
        mapTo(resultCountry),
        catchError(error => throwError(() => ({
          success: false,
          country: resultCountry,
          error
        }) as TaggedCountry))
      )),

      mergeMap((resultCountry: any) => from(this.countriesTable.delete(resultCountry.id)).pipe(
        // Ignore response from countriesTable.delete
        mapTo(resultCountry),
        catchError(error => throwError(() => ({
          success: false,
          country: resultCountry,
          error
        }) as TaggedCountry))
      )),

      map((resultCountry: any) => ({
        success: true,
        country: resultCountry
      }) as TaggedCountry),

      // Convert errors into regular emissions
      catchError((tagged:TaggedCountry) => of(tagged))
    );
  }

  processCountries(countries: any[]): Observable<{success: TaggedCountry[], errors: TaggedCountry[]}>{
    return forkJoin(countries.map(c => this.processCountry(c))).pipe(
      map((tagged: TaggedCountry[]) => ({
        success: tagged.filter(tag => tag.success),
        errors: tagged.filter(tag => !tag.success)
      }))
    )
  }

  doSomethingWith(countries: any[]): void {
    this.processCountries(countries).subscribe({
      next: countries => console.log("All countries processed. Result: ", countries),
      complete: () => console.log("There's only one emission, so this should get called immediately after .next() was called"),
      error: err => console.log("This is a surprise, is there an error we didn't catch earlier? Error: ", err)
    })
  }
}

如果看到以不同的方式完成相同的事情会有所帮助,这里是 processCountry 的较短实现

processCountry(country: any): Observable<TaggedCountry>{

  return this.isActiveCountry(country.isActive).pipe(
    tap((res:any) => country.serverId = res.serverId),

    switchMap(_ => this.uploadPhotoToApi(country.fileSource)),
    tap((res:any) => country.serverFileId = res.serverFileId),

    switchMap(_ => this.sendCountryToApi(country)),
    switchMap(_ => this.countriesTable.delete(country.id)),

    // Tag our result as a success
    map(_ => ({
      success: true,
      country
    }) as TaggedCountry),

    // Tag our errors and convert errors into regular emissions
    catchError(error => of(({
      success: false,
      country,
      error
    }) as TaggedCountry))
  );
}

【讨论】:

  • 谢谢大家!我已经使用了您的第一个示例,并且我在服务中拥有该代码,因为我正在对其进行一些后台同步。当所有同步化完成“下一步:国家=> this.CountrieSyncProcessDoneEvent.emit(countries)”时,我试图做这样的事情来从我的服务向我的组件发出一个事件。但是我已经读过不能从服务中发出事件......我怎样才能让我的组件知道同步何时完成(更新国家表)?谢谢
  • 如果我有 10 个国家/地区处理了事件,我从组件订阅的事件将被调用 10 次:" this.myService.CountrieSyncProcessDoneEvent.subscribe((param: any) => { console.log ('CountrieSyncProcessDoneEvent'); });"
  • 我认为在服务中使用 RxJS 没有问题。不过,你想怎么做这取决于
  • 问题是 this.processCountries(countries).subscribe({ next .... 中的“event.emit”的实现将被调用为不仅处理一次的每个国家/地区.. .. 我想让事件只发出一次
  • 我应该只为所有国家发射一次。这就是forkJoin 的工作原理……不知道发生了什么。它对我有用。
【解决方案2】:

尽量避免将多个.subscribe(s 相互嵌套的诱惑。正如@praveen-soni 提到的,switchMap 可以提供帮助。

然后要在所有国家/地区都已处理后获得状态,我认为forkJoin 非常适合:它接收一个可观察的列表,并在它们全部完成后发出。

如何构建 observables 列表?您最初有一个国家/地区列表,因此您可以将每个国家/地区映射到处理该国家/地区的 observable。我们也可以使用一个catchError,这样错误就不会关闭整个流,而只会关闭该特定国家/地区的一个。

我认为它看起来像:

const result$ = forkJoin(
  countries.map((country) =>
    this.isActiveCountry(country.isActive).pipe(
      switchMap((data) => {
        country.serverId = data.serverId;
        return this.uploadPhotoToApi(country.fileSource);
      }),
      switchMap((response) => {
        country.serverFileId = response.serverFileId;
        return this.sendCountryToApi(country);
      }),
      switchMap(() => {
        return this.countriesTable.delete(country.id);
      }),
      map(() => {
        // Everything went OK, map it to an OK status
        return {
          type: "success",
        };
      }),
      catchError((error) => {
        // If any step fails, map it to an error type
        return of({
          type: "error",
          error,
        });
      }),
      take(1) // Make sure the observable completes
    )
  )
);

// result$ can now be used as any other observable
result$.subscribe(result => {
  console.log(result);
})

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-05-05
    • 2022-01-26
    • 1970-01-01
    • 2020-10-17
    • 1970-01-01
    • 1970-01-01
    • 2016-04-27
    • 1970-01-01
    相关资源
    最近更新 更多