【发布时间】:2015-06-02 01:44:39
【问题描述】:
我正在关注这个问题的答案:Rx extensions: Where is Parallel.ForEach?,以便使用 Rx 并行运行多个操作。
我遇到的问题是它似乎为 每个 请求分配了一个新线程,而使用 Parallel.ForEach 的效果要少得多。
我正在并行运行的进程非常占用内存,因此如果我尝试一次处理数百个项目,则提供给链接问题的答案很快就会发现我的内存不足。
有没有办法可以修改该答案以限制在任何给定时间完成的项目数量?
我查看了Window 和Buffer 操作,我的代码如下所示:
return inputs.Select(i => new AccountViewModel(i))
.ToObservable()
.ObserveOn(RxApp.MainThreadScheduler)
.ToList()
.Do(l =>
{
using (Accounts.SuppressChangeNotifications())
{
Accounts.AddRange(l);
}
})
.SelectMany(x => x)
.SelectMany(acc => Observable.StartAsync(async () =>
{
var res = await acc.ProcessAsync(config, m, outputPath);
processed++;
var prog = ((double) processed/inputs.Count())*100.0;
OverallProgress.Message.OnNext(string.Format("Processing Accounts ({0:000}%)", prog));
OverallProgress.Progress.OnNext(prog);
return res;
}))
.All(x => x);
理想情况下,我希望能够将其批处理成多个帐户视图模型,然后我调用 ProcessAsync 方法,并且只有在所有批处理完成后才能继续。
理想情况下,我希望即使只有一个批次完成,它也会继续前进,但始终保持相同的批次大小。
因此,如果我有一批 5 和 1 完成,我希望开始另一个,但只有一个,直到有更多可用空间。
【问题讨论】:
标签: c# parallel-processing system.reactive reactiveui