【问题标题】:How to manually complete the angular http observable如何手动完成 angular http observable
【发布时间】:2020-01-24 17:42:49
【问题描述】:

我正在开发能够删除/取消正在进行的 http 调用的多个并行文件上传功能。一旦所有呼叫完成/取消,我就会通知消费者。

为此,我使用forkJoin 组合单个http observables。但是如果用户点击取消按钮,我不应该等待实际的 http 响应完成。

takeUntill 不会优雅地处理它,因为它只会在从底层源 http 流接收到下一个值后才会执行。

this.uploadFiles().subscribe(data => {
  console.warn("Upload completed now!!!!!", data);
});

uploadFiles() {
  return forkJoin(
    this.files.map(file => // returns array of observable
        this.uploadFile(file).pipe( catchError(() => of("Error re-emitted as success to prevent early exit"))))
  ).pipe(map(() => {
       // logic for some user friendly statistic
      return data;
    }));
}

【问题讨论】:

    标签: angular rxjs httpclient angular8


    【解决方案1】:

    使用 takeUntil 和 Subject 作为通知者来完成 Observables。您可以将文件 id 传递给 Subject 并在 takeUntil 中使用 filter 以仅取消具有给定 id 的文件的文件上传。

    使用defaultIfEmpty 提供一个指示已取消请求的值。这也可以防止外部 forkJoin 在内部空请求完成时立即完成。

    private cancelUpload$ = new Subject<number>();
    
    uploadFiles() {
      let errorCount = 0, cancelledCount = 0, successCount = 0;
      return forkJoin(this.dummyFiles.map(file =>
        this.uploadFile(file).pipe(
          // react to CANCEL event
          map(response => response == 'CANCEL' ? ++cancelledCount : ++successCount),
          catchError(() => of(++errorCount))
        )
      )).pipe(map(() => ({ errorCount, successCount, cancelledCount })));
    }
    
    uploadFile(file: any) {
      http$.pipe(
        ...
        takeUntil(this.cancelUpload$.pipe(filter(id => id == file.id))), // cancel
        defaultIfEmpty('CANCEL'), // provide value when cancelled
      )
    }
    
    cancelUpload(file: any) {
      file.uploadStatus = "cancelled";
      this.cancelUpload$.next(file.id) // cancel action
    }
    

    https://stackblitz.com/edit/angular-zteeql-e1zacp

    【讨论】:

    • Take until 用于忽略源 observable 并将等待源 observable(在这种情况下为 http.post),因此我们不能立即聚合取消状态。
    • 在我的 stackblitz 中,forkJoin 总是成功完成,因为我将单个可观察到的错误作为成功发出。
    • @Amitesh takeUntil 立即完成源 observable 并且不等待 http 请求发出一些东西。如果takeUntil 完成源defaultIfEmpty 立即发出一个默认值,指示取消事件。您会立即收到有关取消事件的通知,并可以在此处this.uploadFile(file).pipe(tap(response =&gt; response == 'CANCEL' ? /* react to CANCEL event */ )) 采取行动。 forkJoin 当然仍然只会在所有内部文件上传完成后发出。
    • 您对takeUntil 的看法是正确的。我在没有验证的情况下被误解了:P
    【解决方案2】:

    将订阅分配给变量并使用按钮取消它:

    $http: Subscription;
    this.$http = this.http.post(this.uploadUrl, formData, {
          reportProgress: false
          // observe: 'events',
    });
    

    而在取消功能中:

    cancelUpload(file: any) {
      file.uploadStatus = "cancelled"; 
      this.$http.unsubscribe();
    }
    

    【讨论】:

      【解决方案3】:

      除了takeUntil,我们还可以使用mergerace 运算符来处理这种情况。但是它不会改变底层逻辑。

      第 1 步:为个人上传创建 Subject

      file.cancelUpload$ = new Subject();
      

      第 2 步:将此主题与实际的 http 调用合并

      如果任何 observable 发出错误,merge 将完成流。即当我们从cancelUpload$ 主题发出错误时,http 请求将被自动取消(查看网络选项卡)。

      return merge(file.cancelUpload$, $http.pipe(...
      

      第 3 步:实际取消代码

      cancelUpload(file: any) {
        file.uploadStatus = "cancelled";
        file.cancelUpload$.error(file.uploadStatus);// implicitly subject gets completed
      }
      

      第四步:填写cancelUpload$主题,以防上传错误/成功

      这将确保merge 操作将完成,因为两个流现在都已完成。因此forkJoin 将收到响应。

      参考https://stackblitz.com/edit/angular-zteeql?file=src%2Fapp%2Fhttp-example.ts

        uploadFiles() {
          let errorCount = 0,cancelledCount = 0, successCount = 0;
          return forkJoin(
            this.dummyFiles
              .map(file =>
                this.uploadFile(file).pipe(
                  catchError(() => of("Error re-emitted as success")) // value doesn't matter
                )
              )
          ).pipe(
            map(() => { // map would receive array of files in the order it was subscribed
              this.dummyFiles.forEach(file => {
                switch (file.uploadStatus) {
                  case "success": successCount++; break;
                  case "cancelled": cancelledCount++; break;
                  case "error": errorCount++; break;
                }
              });
              return { errorCount, successCount, cancelledCount };
            })
          );
        }
      
        uploadFile(file: any) {
          const formData = new FormData();
          const binaryContent = new Blob([Array(1000).join("some random text")], {
            type: "text/plain"
          }); // dummy data to upload
          formData.append("file", binaryContent);
          const $http = this.http.post(this.uploadUrl, formData, {
            reportProgress: false
            // observe: 'events',
            // withCredentials: true
          });
          file.cancelUpload$ = new Subject();
          file.uploadStatus = "inProgress";
          return merge(
            file.cancelUpload$,
            $http.pipe(
              tap(data => {
                file.uploadStatus = "uploaded";
                file.cancelUpload$.complete();
              }),
              catchError(event => {
                file.uploadStatus = "error";
                file.cancelUpload$.complete();
                return throwError("error");
              })
            )
          );
        }
      
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2020-11-05
        • 2018-09-18
        • 2015-06-14
        • 2019-06-03
        • 2018-12-18
        • 1970-01-01
        • 2020-03-26
        • 1970-01-01
        相关资源
        最近更新 更多