【问题标题】:How to handle progress update using ReactiveX Observables/Subjects?如何使用 ReactiveX Observables/Subjects 处理进度更新?
【发布时间】:2017-09-11 08:48:28
【问题描述】:

我正在编写一个使用 ReactiveX API 来处理异步操作的 Angular 应用程序。我之前在一个 Android 项目中使用过这个 API,我真的很喜欢它如何简化并发任务处理。但是有一件事我不确定如何以正确的方式解决。

如何从正在进行的任务中更新观察者?在这种情况下,任务将花费时间来加载/创建一个复杂/大对象,并且我能够返回中间进度,但不能返回对象本身。 observable 只能返回一种数据类型。因此,我知道两种可能性。

  1. 创建一个具有进度字段和数据字段的对象。这个对象可以简单地用 Observable.onNext(object) 返回。进度字段将在每次 onNext 时更新,而数据字段在最后一次 onNext 之前为空,这会将其设置为加载的值。

  2. 创建两个 observable,一个 data observable 和一个 progress observable。观察者必须订阅可观察的进度以进行进度更新,并订阅可观察的数据以在最终加载/创建数据时得到通知。这些也可以选择压缩在一起以进行一次订阅。

我使用了这两种技术,它们都有效,但我想知道是否有统一的标准,一种干净的方式,如何解决这个任务。当然,它也可以是全新的。我对所有解决方案都持开放态度。

【问题讨论】:

  • 选项 2 更简洁一些,但您当然可以使用 EitherPair 执行选项 1,将这两种类型组合成一个统一的数据类型。
  • 您对进度更新做了什么?第二种解决方案在接收最后的进度更新和接收数据对象之间容易出现竞争条件,因此如果您的逻辑对该排序敏感,您可能无法使用它。
  • 进度更新主要用于向用户显示一个进度条。一个用例是遍历目录结构,读取目录中每个 DICOM(医学图像)文件的标题,并在系列 ID 和图像之间创建映射。另一个用例是通过迭代体积从体积数据创建 3d 网格。我还发现在第二种解决方案中更难跟踪事件执行,即使进度只是一个视觉指示器,所以竞争条件并不那么重要。
  • 如果您只使用图形反馈,您可以考虑在视图渲染器中订阅 Timer 并通过文件迭代器对象状态检查进度(例如,维护一个迭代文件堆栈)。这释放了迭代器以维持其单一职责:从类似于 Promise<TResult> 的操作返回单一结果。
  • 更正:*考虑订阅Interval

标签: rxjs rx-java observable progress reactivex


【解决方案1】:

经过仔细考虑,我使用了 解决方案类似于我的问题中的选项二。 主要的 observable 与实际结果有关 操作。 本例中为 http 请求,但 File 迭代示例类似。 它由“工作”函数返回。

可以通过函数参数添加第二个观察者/订阅者。该订阅者只关心 进度信息。这样所有操作都是空安全的,不需要类型检查。

第二个版本的工作函数,没有进度观察者, 如果不需要进度 UI 更新,可以使用。

export class FileUploadService {

 doWork(formData: FormData, url: string): Subject<Response> {
    return this.privateDoWork(formData, url, null);
 }

 doWorkWithProgress(formData: FormData, url: string, progressObserver: Observer<number>): Subject<Response> {
    return this.privateDoWork(formData, url, progressObserver);
 }

 private privateDoWork(formData: FormData, url: string, progressObserver: Observer<number> | null): Subject<Response> {

     return Observable.create(resultObserver => {
     let xhr: XMLHttpRequest = new XMLHttpRequest();
     xhr.open("POST", url);

     xhr.onload = (evt) => {
         if (progressObserver) {
            progressObserver.next(1);
            progressObserver.complete();
            }
         resultObserver.next((<any>evt.target).response);
         resultObserver.complete()
     };
     xhr.upload.onprogress = (evt) => {
         if (progressObserver) {
            progressObserver.next(evt.loaded / evt.total);
         }

     };
     xhr.onabort = (evt) => resultObserver.error("Upload aborted by user");
     xhr.onerror = (evt) => resultObserver.error("Error");

     xhr.send(formData);
     });
 }

这是一个包含进度订阅者的函数调用。使用此解决方案,上传函数的调用者必须 创建/处理/拆除进度订阅者。

 this.fileUploadService.doWorkWithProgress(this.chosenSerie.formData, url, new Subscriber((progress) => console.log(progress * 100)).subscribe(
    (result) => console.log(result),
    (error) => console.log(error),
    () => console.log("request Completed")
    );

总的来说,我更喜欢这个解决方案,而不是一个订阅的“配对”对象。没有 null 处理需要,并且 我有一个清晰的关注点。

该示例是用 Typescript 编写的,但类似的解决方案应该可以使用其他 ReactiveX 实现。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-06-15
    • 1970-01-01
    相关资源
    最近更新 更多