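【Posted】: 2020-04-02 15:54:14
【Question】:
I have to process a large dataset that takes over an hour on a single thread. I have implemented some multithreading to speed this up. Each thread handles a specific range of the data with no overlap, but when the threads insert their results into a ConcurrentBag<DataRow> collection I created, some duplicates appear.
How is this possible? Any suggestions on what I could do better are much appreciated!
Main method: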
public static ConcurrentBag<DataRow> finalRowList = new ConcurrentBag<DataRow>(); //Create a concurrent collection of datarows so we can thread these calculations
public static DataTable results = new DataTable(); //Final datatable the datarows are added to

static void Main(string[] args)
{
    //The goal is to calculate correlation between each item in list 1 against each item in list 2
    List<string> Variable1List = populateVariable1List(); //Primary List of distinct items to iterate over
    List<string> Variable2List = populateVariable2List(); //Secondary list of distinct items
    DateTime endDate = new DateTime(2020, 3, 31);

    //Separate threads based on alphabetic ranges so there is no overlap
    Thread t1 = new Thread(() => CalculatePairCorrelation(Variable1List.Where(s => string.Compare(s, "G") < 0), Variable2List, endDate));
    Thread t2 = new Thread(() => CalculatePairCorrelation(Variable1List.Where(s => string.Compare(s, "G") >= 0 && string.Compare(s, "M") < 0), Variable2List, endDate));
    Thread t3 = new Thread(() => CalculatePairCorrelation(Variable1List.Where(s => string.Compare(s, "M") >= 0 && string.Compare(s, "S") < 0), Variable2List, endDate));
    Thread t4 = new Thread(() => CalculatePairCorrelation(Variable1List.Where(s => string.Compare(s, "S") >= 0), Variable2List, endDate));

    List<Thread> threads = new List<Thread>();
    threads.Add(t1);
    threads.Add(t2);
    threads.Add(t3);
    threads.Add(t4);

    foreach (Thread t in threads)
    {
        t.Start();
    }
    foreach (Thread t in threads)
    {
        t.Join();
    }

    //Add rows from finalRowList to final datatable
    foreach (var dr in finalRowList)
    {
        results.Rows.Add(dr);
    }
}
CalculatePairCorrelation() code:
//Signature matches the three-argument calls in Main (the unused "int rows" parameter is dropped)
public static void CalculatePairCorrelation(IEnumerable<string> list1, IEnumerable<string> list2, DateTime endDate)
{
    foreach (var item1 in list1)
    {
        foreach (var item2 in list2)
        {
            double r10 = CalculateCorrelation(item1, item2, endDate, 10);
            double r30 = CalculateCorrelation(item1, item2, endDate, 30);
            var dr = results.NewRow(); //Called on the shared DataTable from every thread
            dr["Item1"] = item1;
            dr["Item2"] = item2;
            dr["R10"] = r10;
            dr["R30"] = r30;
            finalRowList.Add(dr); //Add to thread-safe collection
        }
    }
}
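One thing worth checking, offered as a possibility rather than a confirmed diagnosis: DataTable is documented as safe for multithreaded reads only, and the code above calls results.NewRow() on the shared table from all four threads at once. The ConcurrentBag protects only the Add call, not the creation of the row. The sketch below avoids the shared table inside the workers by collecting plain tuples and building the DataRow objects on a single thread afterwards. It swaps the manual alphabetic ranges for Parallel.ForEach, which is a substitution and not the original design; CorrelationSketch, BuildResults, and the dummy CalculateCorrelation body are hypothetical names and stand-ins introduced for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;

public static class CorrelationSketch
{
    // Hypothetical stand-in for the question's CalculateCorrelation; returns a dummy value.
    public static double CalculateCorrelation(string item1, string item2, DateTime endDate, int days)
        => (item1.Length + item2.Length) / (double)days;

    public static DataTable BuildResults(IReadOnlyList<string> list1, IReadOnlyList<string> list2, DateTime endDate)
    {
        // Workers add only plain tuples; no DataTable member is touched concurrently.
        var bag = new ConcurrentBag<(string Var1, string Var2, double R10, double R30)>();

        // Parallel.ForEach hands each element of list1 to exactly one worker.
        Parallel.ForEach(list1, item1 =>
        {
            foreach (var item2 in list2)
            {
                double r10 = CalculateCorrelation(item1, item2, endDate, 10);
                double r30 = CalculateCorrelation(item1, item2, endDate, 30);
                bag.Add((item1, item2, r10, r30));
            }
        });

        // All DataTable writes happen here, on the calling thread only.
        var results = new DataTable();
        results.Columns.Add("Item1", typeof(string));
        results.Columns.Add("Item2", typeof(string));
        results.Columns.Add("R10", typeof(double));
        results.Columns.Add("R30", typeof(double));
        foreach (var r in bag)
        {
            results.Rows.Add(r.Var1, r.Var2, r.R10, r.R30);
        }
        return results;
    }
}
```

If the four explicit threads have to stay, an alternative under the same assumption would be to wrap the results.NewRow() call in a lock on a shared object, at the cost of some contention.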
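【Comments】:
-
If you are getting duplicates, the first thing to look at is the splitting mechanism. You should use a queue, and your problem will be solved.
-
Can you elaborate?
-
You can create a BlockingCollection queue and fill it on one thread with Task.Run(() => {Load your queue}). Then create 1-n consumer threads with Task.Run(() => {get from queue and process}). Wait for everything to finish with Task.WaitAll(producer, consumer1, consumer2, ....). This gives you fast and reliable multithreaded processing.
-
I implemented a global ConcurrentQueue<string> for the Item1 list, and a new instance of the Item2 list for each thread. I still get the same number of duplicates. I need to dig further...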
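The producer/consumer arrangement described in the comments can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the commenter's actual code: QueueSketch, Run, and Process are hypothetical names, and Process is a dummy stand-in for the correlation work. Each queued item is taken by exactly one consumer, so the split itself cannot produce duplicates. The consumers here also emit plain tuples instead of touching a shared DataTable, which may be relevant given the report above that switching to a queue alone did not change the number of duplicates.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class QueueSketch
{
    // Hypothetical per-item work standing in for the correlation calculation.
    private static int Process(string item) => item.Length;

    public static List<(string Input, int Result)> Run(IEnumerable<string> items, int consumerCount)
    {
        var output = new ConcurrentBag<(string Input, int Result)>();

        using (var queue = new BlockingCollection<string>(boundedCapacity: 100))
        {
            // Producer: load the queue, then signal that no more items will arrive.
            Task producer = Task.Run(() =>
            {
                try
                {
                    foreach (var item in items) queue.Add(item);
                }
                finally
                {
                    queue.CompleteAdding();
                }
            });

            // Consumers: GetConsumingEnumerable removes each item as it is handed out,
            // so no two consumers ever receive the same item.
            Task[] consumers = Enumerable.Range(0, consumerCount)
                .Select(_ => Task.Run(() =>
                {
                    foreach (var item in queue.GetConsumingEnumerable())
                    {
                        output.Add((item, Process(item)));
                    }
                }))
                .ToArray();

            // Wait for the producer and every consumer before reading the output.
            Task.WaitAll(consumers.Append(producer).ToArray());
        }

        return output.ToList();
    }
}
```

The bounded capacity of 100 is an arbitrary choice that keeps the producer from running far ahead of the consumers; an unbounded queue would work as well for a list that already fits in memory.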
Tags: c# multithreading concurrency duplicates