【问题标题】:How can I avoid multiple nested subscriptions using RxJS operators?如何避免使用 RxJS 运算符的多个嵌套订阅?
【发布时间】:2019-08-20 07:27:44
【问题描述】:

我正在使用 Angular 进行文件加密和上传类。其中许多操作都是异步的,因此我编写的方法返回的是 RxJS Observables。

// 1.
private prepareUpload(file): Observable<T>;

// 2.
private encryptData(data, filekey): Observable<T>

// 3.
private uploadEncryptedData(formData, token, range): Observable<T>

// 4.
private completeUpload(updatedFilekey, token): Observable<T>

我想将这个逻辑封装在一个公共的upload(file) 方法中,我最终使用了嵌套订阅并且它有效,但我知道它是错误的,并且出于多种原因在 RxJS 中是一种反模式。这是代码的简化版本:

public upload(file) {
    const gen = this.indexGenerator(); // generator function

    this.prepareUpload(file).subscribe(values => {
    const [response, filekey, data] = values;

    this.encryptData(data, filekey).subscribe(encryptedDataContainer => {
      const formData = this.prepareEncDataUpload(encryptedDataContainer.data, file.name)
      const range = this.getRange(file.size, gen.next().value);

      this.uploadEncryptedData(formData, response.token, range).subscribe(() => {
        if (range.isFinalPart) {
            this.completeUpload(encryptedDataContainer.updatedFilekey, response.token).subscribe(console.log);
        }
      });

    });

  });

}

我未能使用多个 RxJS 运算符的组合来清理此代码。我的目标是避免嵌套订阅,而是在工作流完成时从公共 upload() 方法返回单个 Observable。

谢谢!

【问题讨论】:

标签: angular rxjs6


【解决方案1】:

您可以使用 RxJs 中的 mergeMapfilter 运算符并链接您的调用。您需要创建一些函数级变量以在链接期间使用。

import { mergeMap, filter, catchError } from 'rxjs/operators`
public upload(file) {
    const gen = this.indexGenerator(); // generator function
    let range, token;
    this.prepareUpload(file)
      .pipe(
        mergeMap((values) => {
          const [response, filekey, data] = values;
          token = response.token;
          return this.encryptData(data, filekey);
        }),
        mergeMap(encryptedDataContainer => {
          const formData = this.prepareEncDataUpload(encryptedDataContainer.data, file.name)
          range = this.getRange(file.size, gen.next().value);

          return this.uploadEncryptedData(formData, token, range);
        }),
        filter(() => !!range.isFinalPart),
        mergeMap(() => {
          return this.completeUpload(encryptedDataContainer.updatedFilekey, token);
        })
        catchError((error) => {
          console.log(error);
          // handle the error accordingly.
        })
      )
      .subscribe(() => {
        console.log('success');
      });

  }

【讨论】:

  • 这里没有错误处理 - 否则看起来相当健康
  • 也添加了错误处理。感谢您的建议。
  • 感谢@MuhammadAhsanAyaz,这正是我想要的!我只需要在rangetoken 变量范围旁边声明encryptedDataContainer 变量即可使其工作。我还需要用concatMap 替换mergeMap。但这是由于加密上传是如何工作的,与通用解决方案无关。
  • 真棒@Benny1158。很高兴我能帮上忙。
【解决方案2】:

您想在订阅前使用管道。管道允许您在流发出之前对流中的值进行更改。另外,使用mergeMap 来扁平化订阅链。这是一个概述。这并没有提供一个完整的解决方案——没有给我足够的报酬;)——但足以为您指明正确的方向:

this.prepareUpload(file).pipe(
  tap(values => console.log('hello1', values)),
  map(values => [response, filekey, data]),
  tap(values => console.log('hello2', values)),
  mergeMap(() =>
      // essential to catchError else an HTTP error response will disable this effect - if it uses HTTP - catchError essentially prevents the stream from erroring in which case it will never emit again
      this.encryptData(data, filekey).pipe(
        map(res => res), // do something with res here if you want
        catchError(() => {
          return of(null)
        })
      )
    ),
    filter(res => !!res)
    // more mergemap stuff here
  ).subscribe(values => {
    console.log(values)
  })

提示:使用 tap 操作符来 console.log 值,因为它们正在向下传递

PS:语法未检查,可能缺少逗号或括号或 2

PPS:管道中的函数都是RxJS操作符

【讨论】:

    【解决方案3】:

    您可以使用 mergeMap rxjs 运算符合并这些可观察对象,并摆脱嵌套订阅。

    虽然有一个问题, 请注意,由于 mergeMap 一次维护多个活动的内部订阅,因此可能会通过长期存在的内部订阅造成内存泄漏。

    供参考和示例: https://www.learnrxjs.io/operators/transformation/mergemap.html

    【讨论】:

      【解决方案4】:

      我认为链接你的 observables 就可以了,你可以用 flatMap(mergeMap 的别名)可能 - https://stackoverflow.com/a/37777382/9176461RxJS Promise Composition (passing data)

      正如我在评论中提到的,类似以下的内容应该可以工作(伪代码):

      public upload(file) {
          const gen = this.indexGenerator(); // generator function
      
          return Rx.Observable.just(file).pipe(
               mergeMap(this.prepareUpload),
               mergeMap(this.encryptData),
               mergeMap(this.prepareEncDataUpload),
               mergeMap(this.prepareEncDataUpload),
               .... )
      }
      

      【讨论】:

      • 使用管道是新的 RxJS 格式 - 这看起来像旧的学校格式 - 而且 flatMap 不是已弃用吗?
      猜你喜欢
      • 2020-08-18
      • 2019-10-03
      • 2020-05-31
      • 2022-06-22
      • 2023-01-11
      • 2023-01-30
      • 2018-11-27
      • 2021-03-05
      • 2022-08-25
      相关资源
      最近更新 更多