【问题标题】:Rx extensions Parallel.ForEach throttlingRx 扩展 Parallel.ForEach 节流
【发布时间】:2015-06-02 01:44:39
【问题描述】:

我正在关注这个问题的答案:Rx extensions: Where is Parallel.ForEach?,以便使用 Rx 并行运行多个操作。

我遇到的问题是它似乎为 每个 请求分配了一个新线程,而使用 Parallel.ForEach 的效果要少得多。

我正在并行运行的进程非常占用内存,因此如果我尝试一次处理数百个项目,则提供给链接问题的答案很快就会发现我的内存不足。

有没有办法可以修改该答案以限制在任何给定时间完成的项目数量?

我查看了WindowBuffer 操作,我的代码如下所示:

return inputs.Select(i => new AccountViewModel(i))
    .ToObservable()
    .ObserveOn(RxApp.MainThreadScheduler)
    .ToList()
    .Do(l =>
    {
        using (Accounts.SuppressChangeNotifications())
        {
            Accounts.AddRange(l);
        }
    })
    .SelectMany(x => x)
    .SelectMany(acc => Observable.StartAsync(async () =>
    {
        var res = await acc.ProcessAsync(config, m, outputPath);
        processed++;
        var prog = ((double) processed/inputs.Count())*100.0;
        OverallProgress.Message.OnNext(string.Format("Processing Accounts ({0:000}%)", prog));
        OverallProgress.Progress.OnNext(prog);
        return res;
    }))
    .All(x => x);

理想情况下,我希望能够将其批处理成多个帐户视图模型,然后我调用 ProcessAsync 方法,并且只有在所有批处理完成后才能继续。

理想情况下,我希望即使只有一个批次完成,它也会继续前进,但始终保持相同的批次大小。

因此,如果我有一批 5 和 1 完成,我希望开始另一个,但只有一个,直到有更多可用空间。

【问题讨论】:

    标签: c# parallel-processing system.reactive reactiveui


    【解决方案1】:

    像往常一样,Paul Betts 回答了一个类似的问题,解决了我的问题:

    问题:Reactive Extensions Parallel processing based on specific number

    有一些关于使用Observable.Defer 然后合并成批次的信息,使用我已经修改了我以前的代码,如下所示:

    return inputs.Select(i => new AccountViewModel(i))
        .ToObservable()
        .ObserveOn(RxApp.MainThreadScheduler)
        .ToList()
        .Do(l =>
        {
            using (Accounts.SuppressChangeNotifications())
            {
                Accounts.AddRange(l);
            }
        })
        .SelectMany(x => x)
        .Select(x => Observable.DeferAsync(async _ =>
        {
            var res = await x.ProcessAsync(config, m, outputPath);
            processed++;
            var prog = ((double) processed/inputs.Count())*100.0;
            OverallProgress.Message.OnNext(string.Format("Processing Accounts ({0:000}%)", prog));
            OverallProgress.Progress.OnNext(prog);
            return Observable.Return(res);
        }))
        .Merge(5)
        .All(x => x);
    

    果然,我得到了滚动完成行为(例如,如果 1/5 完成,那么只有一个开始)。

    显然我还有一些基础知识要掌握,但这太棒了!

    【讨论】:

    • 仅供参考,您可以使用 FromAsync 而不是 DeferAsync,这可以让您的异步 lambda 更自然地返回其结果,而无需使用 Return 包装。
    • 另请注意,您用于跟踪和报告进度的代码存在一些多线程竞争条件,并且可能违反某些 Rx 并发规则。我建议在Merge 子句之后将所有进度跟踪代码移到Do 子句中。这将消除比赛。
    • @Brandon 不会在合并后将其移至 Do 只是意味着进度报告仅在每批之后进行?我想在每个项目之后报告进度。
    • Merge 让项目在完成时一个接一个地通过。请注意,merge(5) 不是 批处理操作。它只是确保只有 5 个同时运行。一旦单个操作完成,它就会让另一个排队的操作开始。如果你想要真正的批处理,你需要使用 BufferWindow
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-12-23
    • 1970-01-01
    • 1970-01-01
    • 2023-03-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多