【问题标题】:C# ActionBlock Equivalent that collects results收集结果的 C# ActionBlock 等效项
【发布时间】:2018-03-20 16:45:51
【问题描述】:

我目前正在使用 ActionBlock 来处理串行启动的异步作业。它非常适合处理发布到它的每个项目,但无法收集每个作业的结果列表。

我可以使用什么以线程安全的方式收集作业的结果?

我的代码目前是这样的:

var actionBlock = new ActionBlock<int> (async i => await Process(i));
for(int i = 0; i < 100; i++)
{
    actionBlock.Post(i);
}
actionBlock.Complete();
await actionBlock.Completion;

我尝试使用 TransformBlock 代替,但在等待完成时它会无限期挂起。完成的状态是“WaitingForActivation”。

我的 TransformBlock 代码是这样的:

var transformBlock = new TransformBlock<int, string> (async i => await Process(i));
for(int i = 0; i < 100; i++)
{
    actionBlock.Post(i);
}
actionBlock.Complete();
await actionBlock.Completion;
transformBlock.TryReceiveAll(out IList<string> strings);

【问题讨论】:

  • 您的TransformBlock 根本无法卸载它的输出缓冲区,所以它会挂起。一个简单的解决方案是让您的 ActionBlockConcurrentBag 一样将项目添加到并发集合中。
  • 谢谢,这几乎就是我最终得到的结果,但我先闲逛了一下,所以我使用了 Parallel.Foreach,这使得代码更加简洁。
  • 要将TransformBlock 的所有结果收集为Task&lt;List&lt;T&gt;&gt;,您可以使用ToListAsync 找到的方法here

标签: c# asynchronous task-parallel-library tpl-dataflow


【解决方案1】:

原来 ConcurrentBag 就是答案

var bag = new ConcurrentBag<string>();
var actionBlock = new ActionBlock<int> (async i => 
   bag.Add(await Process(i))
);
for(int i = 0; i < 100; i++)
{
    actionBlock.Post(i);
}
actionBlock.Complete();
await actionBlock.Completion;

现在 'bag' 中包含所有结果,可以作为 IEnumerable 访问。

我实际上最终使用的代码使用 Parallel.ForEach 而不是 ActionBlock。

Parallel.ForEach
(
    inputData, 
    i => bag.Add(await Process(i))
);

这要简单得多,但似乎对性能有好处,并且仍然可以选择限制并行度等。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-06-03
    • 2015-02-02
    • 1970-01-01
    • 1970-01-01
    • 2015-03-18
    • 2012-07-31
    • 2010-11-25
    相关资源
    最近更新 更多