【Question title】: Concurrent threads calling same method leads to duplicates in ConcurrentBag<DataRow> collection
【Posted】: 2020-04-02 15:54:14
【Question】:

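I have to process a large dataset that takes well over an hour on a single thread, so I implemented some multithreading to speed it up. Each thread handles a specific range of the data with no overlap, but when the threads insert their results into the ConcurrentBag<DataRow> collection I created, some of the rows end up duplicated.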

How is this possible? Any suggestions on what I could do better are much appreciated!

Main method:

public static ConcurrentBag<DataRow> finalRowList = new ConcurrentBag<DataRow>(); //Create a concurrent collection of datarows so we can thread these calculations
public static DataTable results = new DataTable(); //Final datatable the datarows are added to

static void Main(string[] args)
{
    //The goal is to calculate correlation between each item in list 1 against each item in list 2
    List<string> Variable1List = populateVariable1List(); //Primary List of distinct items to iterate over
    List<string> Variable2List = populateVariable2List(); //Secondary list of distinct items

    DateTime endDate = new DateTime(2020, 3, 31);

    //Separate threads based on alphabetic ranges so there is no overlap
    Thread t1 = new Thread(() => CalculatePairCorrelation(Variable1List.Where(s => string.Compare(s, "G") < 0), Variable2List, endDate));
    Thread t2 = new Thread(() => CalculatePairCorrelation(Variable1List.Where(s => string.Compare(s, "G") >= 0 && string.Compare(s, "M") < 0), Variable2List, endDate));
    Thread t3 = new Thread(() => CalculatePairCorrelation(Variable1List.Where(s => string.Compare(s, "M") >= 0 && string.Compare(s, "S") < 0), Variable2List, endDate));
    Thread t4 = new Thread(() => CalculatePairCorrelation(Variable1List.Where(s => string.Compare(s, "S") >= 0), Variable2List, endDate));

    List<Thread> threads = new List<Thread>();
    threads.Add(t1);
    threads.Add(t2);
    threads.Add(t3);
    threads.Add(t4);

    foreach (Thread t in threads)
    {
        t.Start();
    }

    foreach (Thread t in threads)
    {
        t.Join();
    }

    //Add rows from finalRowList to final datatable
    foreach (var dr in finalRowList)
    {
        results.Rows.Add(dr);
    }
}

CalculatePairCorrelation() code:

public static void CalculatePairCorrelation(IEnumerable<string> list1, IEnumerable<string> list2, DateTime endDate) //Unused "int rows" parameter dropped so the three-argument calls in Main compile
{
    foreach (var item1 in list1)
    {
        foreach (var item2 in list2)
        {                
            double r10 = CalculateCorrelation(item1, item2, endDate, 10);
            double r30 = CalculateCorrelation(item1, item2, endDate, 30);

            var dr = results.NewRow();
            dr["Item1"] = item1;
            dr["Item2"] = item2;
            dr["R10"] = r10;
            dr["R30"] = r30;

            finalRowList.Add(dr); //Add to thread-safe collection
        }
    }
}


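【Question comments】:

  • If you have duplicates, the first thing to check is the splitting mechanism. You should use a queue and your issue will be solved.
  • Can you elaborate?
  • You can create a BlockingCollection queue and, in one thread, do Task.Run(() => {load your queue}). Then create 1-n consumer threads with Task.Run(() => {get from queue and process}). At the end, wait with Task.WaitAll(producer, consumer1, consumer2, ...). This gives you fast and reliable multithreaded processing.
  • I implemented a global ConcurrentQueue<string> for the Item1 list and a new instance of the Item2 list for each thread. I still get the same number of duplicates. I need to dig further...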
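
The producer/consumer layout described in the comment above could look roughly like the following. This is only a sketch under assumptions: the class name QueueSketch, the Run method and the CalculateCorrelation body are hypothetical stand-ins, not code from the question. The workers collect plain value tuples rather than DataRow objects, because, as the last comment reports, a queue alone did not remove the duplicates.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class QueueSketch
{
    // Hypothetical stand-in for the question's CalculateCorrelation.
    static double CalculateCorrelation(string a, string b, DateTime endDate, int days)
        => (a.Length + b.Length) / (double)days;

    public static List<(string Item1, string Item2, double R10, double R30)> Run(
        IEnumerable<string> list1, IReadOnlyList<string> list2, DateTime endDate, int consumers)
    {
        var queue = new BlockingCollection<string>(boundedCapacity: 100);
        var output = new ConcurrentBag<(string Item1, string Item2, double R10, double R30)>();

        // Producer: load the queue, then signal that no more items are coming.
        var producer = Task.Run(() =>
        {
            foreach (var item in list1) queue.Add(item);
            queue.CompleteAdding();
        });

        // Consumers: GetConsumingEnumerable hands each queued item to exactly one consumer.
        var workers = Enumerable.Range(0, consumers).Select(_ => Task.Run(() =>
        {
            foreach (var item1 in queue.GetConsumingEnumerable())
                foreach (var item2 in list2)
                    output.Add((item1, item2,
                        CalculateCorrelation(item1, item2, endDate, 10),
                        CalculateCorrelation(item1, item2, endDate, 30)));
        })).ToArray();

        producer.Wait();
        Task.WaitAll(workers);
        return output.ToList();
    }
}
```

The returned list can then be copied into the DataTable on a single thread once all tasks have finished.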

Tags: c# multithreading concurrency duplicates


【Solution 1】:

The problem is probably related to this line, which is called on the parallel paths:

var dr = results.NewRow();

Creating a DataRow may mutate the underlying DataTable, which is not a thread-safe class.
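
To illustrate the point, here is a minimal sketch that keeps the question's manual threads but never touches the DataTable from a worker thread. The workers collect plain value tuples, and the rows are created only after Join. The class name, the two-way split and the CalculateCorrelation body are assumptions made for this example, not the asker's code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading;

public static class DetachedResultsSketch
{
    // Hypothetical stand-in for the question's CalculateCorrelation.
    static double CalculateCorrelation(string a, string b, DateTime endDate, int days)
        => (a.Length + b.Length) / (double)days;

    public static DataTable Run(List<string> list1, List<string> list2, DateTime endDate)
    {
        var bag = new ConcurrentBag<(string Item1, string Item2, double R10, double R30)>();

        // Worker body: only reads the input lists and writes to the thread-safe bag.
        void Work(IEnumerable<string> part)
        {
            foreach (var item1 in part)
                foreach (var item2 in list2)
                    bag.Add((item1, item2,
                        CalculateCorrelation(item1, item2, endDate, 10),
                        CalculateCorrelation(item1, item2, endDate, 30)));
        }

        // Disjoint slices, one thread each (two slices here for brevity).
        int half = list1.Count / 2;
        var threads = new[]
        {
            new Thread(() => Work(list1.Take(half))),
            new Thread(() => Work(list1.Skip(half))),
        };
        foreach (var t in threads) t.Start();
        foreach (var t in threads) t.Join();

        // The DataTable is created and filled on the calling thread only.
        var results = new DataTable();
        results.Columns.Add("Item1", typeof(string));
        results.Columns.Add("Item2", typeof(string));
        results.Columns.Add("R10", typeof(double));
        results.Columns.Add("R30", typeof(double));
        foreach (var r in bag)
            results.Rows.Add(r.Item1, r.Item2, r.R10, r.R30);
        return results;
    }
}
```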

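My suggestion is to stay away from concurrent collections and manual partitioning of the data, and to use PLINQ instead, which is easy to use and harder to get wrong: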

var resultsList = Variable1List
    .SelectMany(_ => Variable2List, (Item1, Item2) => (Item1, Item2))
    .AsParallel()
    .AsOrdered() // Optional
    .WithDegreeOfParallelism(4) // Optional
    .Select(pair => (
        Item1: pair.Item1,
        Item2: pair.Item2,
        R10: CalculateCorrelation(pair.Item1, pair.Item2, endDate, 10),
        R30: CalculateCorrelation(pair.Item1, pair.Item2, endDate, 30)
    ))
    .ToList();

foreach (var result in resultsList)
{
    var dr = results.NewRow();
    dr["Item1"] = result.Item1;
    dr["Item2"] = result.Item2;
    dr["R10"] = result.R10;
    dr["R30"] = result.R30;
    results.Rows.Add(dr);
}

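【Comments】:

  • I thought creating a data row just created a new row with the same schema as the table. I didn't know it would actually mutate the table. I'll give it a try and report back.
  • The only problem with Parallel is that you don't have much control. You never know how many threads are running.
  • @eek You are right. Creating a new row does not add the row to the table.
  • Here is the source code of the DataTable.NewRow method. At first glance it does seem to mutate the internal state of the class.
  • You're right, the NewRow() call is not thread-safe! I changed my code to generate a concurrent collection of empty rows up front, and then call TryTake() as each row is populated. Thanks!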
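
On the concern about not knowing how many threads are running: PLINQ's WithDegreeOfParallelism, already used in the answer, is documented as setting the maximum number of concurrently executing tasks for a query. The sketch below is one way to observe that bound. The class and method names are hypothetical, and Thread.Sleep stands in for real work.

```csharp
using System;
using System.Linq;
using System.Threading;

public static class ParallelismSketch
{
    // Returns the highest number of loop bodies seen running at the same time.
    public static int MaxObservedConcurrency(int items, int degree)
    {
        int running = 0, maxSeen = 0;

        Enumerable.Range(0, items)
            .AsParallel()
            .WithDegreeOfParallelism(degree)
            .ForAll(_ =>
            {
                int now = Interlocked.Increment(ref running);

                // Lock-free "maxSeen = max(maxSeen, now)".
                int seen;
                while (now > (seen = Volatile.Read(ref maxSeen)) &&
                       Interlocked.CompareExchange(ref maxSeen, now, seen) != seen)
                {
                }

                Thread.Sleep(5); // stand-in for real work
                Interlocked.Decrement(ref running);
            });

        return maxSeen;
    }
}
```

Calling ParallelismSketch.MaxObservedConcurrency(200, 4) should return a value no greater than 4. The actual number can be lower, since PLINQ may use fewer tasks than the limit allows.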