【问题标题】:How to run Finally() when all Observable async items are finished processing?当所有 Observable 异步项都完成处理后,如何运行 finally()?
【发布时间】:2019-10-28 16:30:23
【问题描述】:

为了让 UI 响应更快,我分批输出项目。这里的问题是OnFinally() 在最后一个OutputItems() 完成之前被调用。

    IObservable<IList<xx>> obs = Observable
     .Interval(TimeSpan.FromSeconds(.1), Scheduler.Default)
       .Zip(dirEnum.ToObservable(NewThreadScheduler.Default)
         .Buffer(100), (a, b) => b)
     .ObserveOn(syncContext).Finally(OnFinally);
    ...
    obs.Subscribe(async x => await OutputItems(x));

有没有办法在所有项目都用完时调用 OnFinally?

【问题讨论】:

  • 我认为这是预期的执行。 .Finally() 每批运行,因为这是每个订阅的延续。也许您不想使用.Finally(),而是想捕获一组任务并使用Task.WaitAll() 等待所有任务完成。

标签: c# linq async-await system.reactive


【解决方案1】:

将缓冲区直接投影到您的 OutPutItems 并且不要在 Subscription 中使用 await

IObservable<IList<xx>> obs = Observable
     .Interval(TimeSpan.FromSeconds(.1), Scheduler.Default)
       .Zip(dirEnum.ToObservable(NewThreadScheduler.Default)
         .Buffer(100), (a, b) => b)
     .ObserveOn(syncContext).Finally(OnFinally)
     .SelectManay(X=>OutputItmes().ToObservable())
     .Finally(OnFinally);
    ...
    obs.Subscribe();

【讨论】:

    【解决方案2】:

    Subscribe 方法不接受异步委托,因此您的 lambda 是异步无效的。 Async void 方法不可观察或不可等待,并且它们的异常不可捕获(它们未处理并使进程崩溃)。一种解决方案是将您的批次投影到Tasks 批次,然后使用Merge 方法返回到批次。当您Subscribe 时,您将按照完成的顺序(可能不是原始顺序)获得任务的结果。如果您对结果不感兴趣,只需拨打Subscribe 不带参数即可。

    IObservable<IList<xx>> obs = Observable
        .Interval(TimeSpan.FromSeconds(.1), Scheduler.Default)
        .Zip(dirEnum.ToObservable(NewThreadScheduler.Default)
        .Buffer(100), (a, b) => b)
        .Select(x => OutputItemsAsync(x))
        .Merge()
        .ObserveOn(syncContext)
        .Finally(OnFinally);
    
    obs.Subscribe();
    

    【讨论】:

      猜你喜欢
      • 2016-12-14
      • 1970-01-01
      • 1970-01-01
      • 2012-08-10
      • 1970-01-01
      • 2015-08-08
      • 2023-03-12
      • 1970-01-01
      • 2020-02-22
      相关资源
      最近更新 更多