【问题标题】:Maintaining order of DataTable rows during parallel processing在并行处理期间维护 DataTable 行的顺序
【发布时间】:2015-07-03 09:06:55
【问题描述】:

以下是当前代码:

 Parallel.ForEach(dataTable.AsEnumerable(),row => {

     // Code to process the data row to Dictionary<object,object>
     // Unique Column name is the Dictionary Key
     // ConcurrentDictionary is used for thread safety      
     });

这里我使用Parallel.ForEachDataTable的行处理成Dictionary&lt;object,object&gt;类型的对象,最终结果是List&lt;Dictionary&lt;object,object&gt;&gt;类型,使用中间线程安全结构ConcurrentQueue&lt;Dictionary&lt;object,object&gt;&gt;实现的,@的来源987654327@ 以给定的顺序对数据进行排序,但在并行处理期间总是会丢失。由于顺序很重要,所以我想出了以下解决方法:

Parallel.For(0,RowCount,index => {

  int rowIndex = index;

  // Access the rows using the Index
  // Final structure will be of type ConcurrentDictionary<Custom>, 
  // with the RowIndex assigned based on original index
});

Class Custom
{
  public int RowIndex { get; set; }

  public Dictionary<object,object> DataDictionary {get; set;}
}

ConcurrentQueue&lt;Dictionary&lt;Custom&gt;&gt; customObj 类型的最终结果使用以下代码进行处理:

customObj.OrderBy(x=>x.RowIndex).Select(y=>y.DataDictionary).ToList()

以下是我的问题:

  1. 有没有更好的方法来实现相同的并行处理,我可以保持原始顺序,这是最重要的业务需求

  2. 在最终解决方案中我是否需要局部变量rowIndex,我的理解是index 是并行循环的一部分,不会导致关闭问题

任何指针?

【问题讨论】:

  • 为什么要保留订单?你不能订购结果吗?
  • @Panagiotis Kanavos 订单是从数据源中保留的,基于可能不会暴露给数据库上方层的列。我们正在构建的分析应用程序的订单保留是一项重要的功能要求

标签: c# parallel-processing task-parallel-library parallel.foreach


【解决方案1】:

您可以将PLINQParallelEnumerable.AsOrdered 扩展方法一起使用

允许将数据源视为已排序,覆盖默认的无序。

在您的示例中,您可以按以下方式使用它:

var result = dataTable.AsEnumerable().AsParallel().AsOrdered()
                      .Select(/*Process the row to dictionary*/).ToList();

【讨论】:

  • 这行得通吗,当我在这种情况下固有地转换为不同的数据结构(如 Dictionary)时,我没有修改 DataTable 数据。据我了解,PLinq 也可能不是面向性能的解决方案
  • @MrinalKamboj 我看不出它为什么不起作用。如果您担心性能,最好简单地衡量和比较。
  • @MrinalKamboj PLINQ 和 Parallel.For 是等价的。事实上,PLINQ 可以并行化它的所有操作,而 Parallel.For 只会并行化进程中的第一步。
  • @Panagiotis Kanavos Stephen Toub 的有趣文章表明,为什么 Parallel.ForEach 和 PLINQ 不一样。相比之下,Parallel.ForEach 是一个更轻量级的实现。 @Dzienny 您可能希望在未来的 PLINQ 实现中添加 WithDegreeOfParallelism。 blogs.msdn.com/b/pfxteam/archive/2009/05/29/9655514.aspx
  • @MrinalKamboj 在比较不同的事物时要小心。 PLINQ 将并行化分区、过滤、选择、排序、聚合。 Parallel.ForEach 只会对数据进行分区,而将其他所有内容(尤其是将并行结果减少为单个结果)留给编码器。
【解决方案2】:

这个呢

var items = new ConcurrentDictionary<DataRow, Dictionary<object,object>>;

Parallel.ForEach(dataTable.AsEnumerable(),row => {
    var result = ...; 
    items.Add(row, result);
});

var finalResult = dataTable.Rows.Cast<DataRow>().Select(r => items[r]).ToList());

【讨论】:

  • 字典不是线程安全的,当您尝试同时修改它们时会抛出异常。此代码将崩溃。您可以使用 ConcurrentDictionary 来避免崩溃,但无论是该集合还是任何其他并发集合都不会保留原始顺序
  • 我可以在这里使用 ConcurrentDictionary,这看起来很有希望,让我检查一下
  • @PanagiotisKanavos 我将更改代码以使用 ConcurrentDictionary。关于第二个声明。顺序无关紧要,因为最终我使用 DataTables 行集合并且只使用字典从字典中获取结果。
【解决方案3】:

首先,您可以在 Parallel.ForEach 中获取索引,而不是使用 Parallel.For

Parallel.ForEach(dataTable.AsEnumerable(), (line, state, index) =>
{
    Console.WriteLine("{0} : {1}", index, line);
});

如我所见,主要目的是避免 OrderBy。 要实现这一点,请在 ForLoop 之前创建您的

var lines =  new YourClass[NumberOfElemnts] ;

在此之后,您可以在此列表中填写您想要的任何循环。让我们使用 Parallel.For

Parallel.For(0, NumberOfElemnts, i =>
    {
        lines[i]=dataTable[i];
    });

根据@Panagiotis Kanavos 的评论进行编辑

【讨论】:

  • 我不是想避免 OrderBy,我需要它,这就是解决方法的原因。我无法理解您提供的解决方案,需要努力才能更好地理解它
  • 列表不是线程安全的,无法修改。您可以使用数组实现同样的目的
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2023-03-31
  • 2015-09-12
  • 1970-01-01
  • 2021-11-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多