【问题标题】:Parallel.ForEach losing dataParallel.ForEach 丢失数据
【发布时间】:2016-07-05 06:30:13
【问题描述】:

Parallel.ForEach 有助于提高性能,但是,我看到数据丢失。

已尝试 - 变量结果,已处理数据为 ConcurrentBag<IwrRows>

1)

Parallel.ForEach(results, () => new ConcurrentBag<IwrRows>(), (n, loopState, localData)    =>
{
 return ProcessData(n); // ProcessData complicated business logic
}, (localData) => AddRows(localData, processedData, obj)
);

2)

await Task.Run(() => Parallel.ForEach(results, item =>
        {
            ProcessData(item, processedData);  
        }));

3)

Parallel.ForEach(results, item =>
 {
 ProcessData(item, processedData);
 });

他们都丢失了一些行。

当我使用 foreach 块时,它始终返回相同的值,但是速度慢了 4 倍。

foreach (var item in results)
        {
            // ProcessData returns a List<IwrRows>
            processedData.AddRange(ProcessData(item));
        }

不确定我在这里缺少什么。

结果 - 51112 Foreach 返回 41316 行。 ForeachParallel 返回 41308 或 41313 或 41314 因每次运行而异

【问题讨论】:

  • ProcessData()的定义是什么?
  • 我同意 makro88 回答中的一点:您至少应该解释 ProcessData() 函数的通常预期返回值是什么,以便其他人可以更好地帮助您。正如你现在写的那样,这个函数似乎返回了一个处理过的数据的集合,其中包含 0-N 个项目。您能否编辑问题并澄清这一点?项目数量的预期数学不清楚,其他人无法重现。如前所述,尝试在MCVE 中隔离您的问题,否则应标记并关闭此问题。

标签: c# parallel.foreach


【解决方案1】:

您似乎很难将结果重新编入一个连贯的列表中。您可以使用 PlinQ,因此您不必担心结果容器是线程安全的:

var processedData = yourData.AsParallel().Select(ProcessData).ToList();

【讨论】:

  • 与 foreach 循环相比,这会始终返回相同的数据行。它比 Parallel.Foreach 慢两倍,但比 foreach 快。在这种情况下,Parallel.Foreach 和 AsParallel 有什么区别?为什么即使在使用 ConcurrentBag 之后我还是会丢失数据?
【解决方案2】:

您的问题似乎出在:AddRows(localData, processesData, obj)。此方法可能会将数据添加到不是线程安全的列表中。您应该将其添加到线程安全列表或围绕添加数据进行一些同步。

【讨论】:

  • 其实processedData提到的OP是ConcurrentBag&lt;IwrRows&gt;类型的,理论上应该可以处理并发问题。
【解决方案3】:

我认为在 2) 中使用 await Task.Run 是没用的。

如果Foreach returns 41316 rows backResults - 51112 问题不在于Parallel.ForEach,而在于您的添加/处理机制。请记住,即使ConcurrentBag 保证对它的每个操作都是线程安全的,它也不会处理重复。

【讨论】:

    【解决方案4】:

    嗯,您的业务逻辑 (ProcessData) 肯定有问题。
    也许不是 pararell.foreach,但我认为这可能会加快您的代码速度,使用 LINQ 也是异步的。
    就是这样,我正在处理一些数据的并行异步操作。
    您可能需要展平 taskList 的结果(它的完整伪代码是从头编写的)。您始终可以始终使用 yield return 来实现您的列表,这可能会更加固定它。但谨慎使用 yield :)

    var taskList = results.Select(async item =>
        {
            return await ProcessData(item, processedData);  
        });
    
    await Task.WhenAll(taskList);
    

    使用WhenAll或WaitAll取决于你想要的情况

    Task.WaitAll:

    At least one of the Task instances was canceled -or- an exception was thrown during the execution of  
    at least one of the Task instances.If a task was canceled, the AggregateException contains an  
    OperationCanceledException in its InnerExceptions collection.
    

    Task.WhenAll:

    If any of the supplied tasks completes in a faulted state, the returned task will also complete in a   
    Faulted state, where its exceptions will contain the aggregation of the set of unwrapped exceptions  
    from each of the supplied tasks.
    

    【讨论】:

      猜你喜欢
      • 2023-04-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-11-13
      • 2019-01-28
      相关资源
      最近更新 更多