除了takeUntil,我们还可以使用merge 或race 运算符来处理这种情况。但是它不会改变底层逻辑。
第 1 步:为个人上传创建 Subject
file.cancelUpload$ = new Subject();
第 2 步:将此主题与实际的 http 调用合并
如果任何 observable 发出错误,merge 将完成流。即当我们从cancelUpload$ 主题发出错误时,http 请求将被自动取消(查看网络选项卡)。
return merge(file.cancelUpload$, $http.pipe(...
第 3 步:实际取消代码
cancelUpload(file: any) {
file.uploadStatus = "cancelled";
file.cancelUpload$.error(file.uploadStatus);// implicitly subject gets completed
}
第四步:填写cancelUpload$主题,以防上传错误/成功
这将确保merge 操作将完成,因为两个流现在都已完成。因此forkJoin 将收到响应。
参考https://stackblitz.com/edit/angular-zteeql?file=src%2Fapp%2Fhttp-example.ts
uploadFiles() {
let errorCount = 0,cancelledCount = 0, successCount = 0;
return forkJoin(
this.dummyFiles
.map(file =>
this.uploadFile(file).pipe(
catchError(() => of("Error re-emitted as success")) // value doesn't matter
)
)
).pipe(
map(() => { // map would receive array of files in the order it was subscribed
this.dummyFiles.forEach(file => {
switch (file.uploadStatus) {
case "success": successCount++; break;
case "cancelled": cancelledCount++; break;
case "error": errorCount++; break;
}
});
return { errorCount, successCount, cancelledCount };
})
);
}
uploadFile(file: any) {
const formData = new FormData();
const binaryContent = new Blob([Array(1000).join("some random text")], {
type: "text/plain"
}); // dummy data to upload
formData.append("file", binaryContent);
const $http = this.http.post(this.uploadUrl, formData, {
reportProgress: false
// observe: 'events',
// withCredentials: true
});
file.cancelUpload$ = new Subject();
file.uploadStatus = "inProgress";
return merge(
file.cancelUpload$,
$http.pipe(
tap(data => {
file.uploadStatus = "uploaded";
file.cancelUpload$.complete();
}),
catchError(event => {
file.uploadStatus = "error";
file.cancelUpload$.complete();
return throwError("error");
})
)
);
}