【问题标题】:Execute threads in parrallel, but calling events in the same order并行执行线程,但以相同的顺序调用事件
【发布时间】:2021-02-02 23:57:15
【问题描述】:

我有一个需要处理的数据队列的场景。所有数据都被一个线程读入内存并存储到ConcurrentQueue<T> 中,而另一个线程开始出列并处理数据。 T 是一个自定义类,需要处理大量数据。

读取器线程填充队列的一侧,而处理器线程在队列的另一侧工作。使用两个线程,这可以完美地工作。处理数据所花费的工作/时间大约是将数据加载到内存中的 4 倍。所以我一直在尝试增加处理线程的数量。问题是处理后的数据需要按照读取的顺序保存。显然,让多个线程并行处理不同的数据意味着它们不会同时完成。由于 ConcurrentQueue,线程确实按顺序读取数据,并且它们以正确的顺序将数据出队,但我还没有找到一种方法来同步线程的“保存”功能,以确保每个线程将“保存”在他们出列的顺序相同。

我知道 .NET 包含大量线程帮助程序,并且我研究过诸如 Monitor 和 Barrier 之类的东西,但它们的差异如此之大,以至于我不确定哪个帮助程序类或哪种方法最适合。

有人有什么建议或想法吗?

【问题讨论】:

  • 塑造你的东西,让它可以无序工作?
  • 不幸的是,我不能。我正在读取一个需要在处理后按顺序保存的顺序文件。
  • 但是没有什么能阻止你处理/保存它的“块”,对吧?
  • 只需在数据中添加一个序列号。问题解决了。

标签: c# .net .net-5


【解决方案1】:

有很多方法可以做到这一点。这是一个 TPL 数据流示例

DataFlow 有几个优点

  1. 它可以处理同步和异步工作负载。
  2. 您可以创建更大的管道。
  3. 支持任务计划程序和取消令牌。
  4. 可以永久运行,也可以强制完成。
  5. 每个区块可以支持多个生产者和消费者

它确实有一些缺点

  1. 对于初学者来说,这是一个学习曲线。
  2. 它是围绕管道设计的,而不是线性集合本身,因此使用它们可能有点不直观。
  3. 创建您自己的自定义块需要深入研究 Stephen Toub 的 Twisted TPL 大脑。
  4. 它不像其他生产者消费者框架那样轻巧,但它以灵活性弥补了这一点

示例

// multi threaded Consumer
var processor = new TransformBlock<Data, Data>(
   ProcessAsync,
   new ExecutionDataflowBlockOptions() 
   {
      EnsureOrdered = true, 
      MaxDegreeOfParallelism = 3

   });

var saver = new ActionBlock<Data>(
   SaveAsync, 
   new ExecutionDataflowBlockOptions()
   {
      MaxDegreeOfParallelism = 1, 
      SingleProducerConstrained = true
   });

processor.LinkTo(saver,new DataflowLinkOptions() {PropagateCompletion = true});

// Producer
for (int i = 0; i < 15; i++)
{
   Console.WriteLine($"Queueing : {i}");
   await processor.SendAsync(new Data() {Id = i});
}

processor.Complete();
await saver.Completion;

工作负载

public static Random r = new Random();
private static async Task<Data> SaveAsync(Data arg)
{
   await Task.Delay(r.Next(100, 1000));
   Console.WriteLine($"Saving : {arg.Id}");
   return arg;
}
private static async Task<Data> ProcessAsync(Data arg)
{
   await Task.Delay(r.Next(100, 1000));
   Console.WriteLine($"Processing : {arg.Id}");
   return arg;
}

输出

Queueing : 0
Queueing : 1
Queueing : 2
Queueing : 3
Queueing : 4
Queueing : 5
Queueing : 6
Queueing : 7
Queueing : 8
Queueing : 9
Queueing : 10
Queueing : 11
Queueing : 12
Queueing : 13
Queueing : 14
Processing : 2
Processing : 3
Processing : 0
Processing : 1
Processing : 4
Saving : 0
Processing : 5
Processing : 8
Saving : 1
Processing : 7
Processing : 6
Processing : 10
Saving : 2
Processing : 9
Processing : 11
Processing : 12
Processing : 13
Saving : 3
Processing : 14
Saving : 4
Saving : 5
Saving : 6
Saving : 7
Saving : 8
Saving : 9
Saving : 10
Saving : 11
Saving : 12
Saving : 13
Saving : 14

免责声明,还有许多其他方式,例如反应式扩展,或者只是常规的 TPL 和循环,都有优点和缺点,有些可能比其他方式更适合您的用例。这只是 DataFlow Pipeline 的一个基本示例。

【讨论】:

  • 虽然这是一个了不起的答案,而且我实际上试图围绕它来调整我的代码,但它在我的特定场景中不起作用,因为处理函数在不同的点“退出”取决于它的深度和深度需要处理数据块,并且在每次退出时,它都会调用一个结果事件(它没有相同的类也没有继承的基)。以及我需要同步的那些不同的“点”。如果我看到一种方法可以重新编写代码以在同一端点“完成”,我会记住这一点。
  • @jscarle 是的,我担心情况会如此......也许你可以更详细地编辑你的问题,这样其他人就不会陷入类似的陷阱。我也许可以根据编辑写出不同的答案。祝你好运
  • 我将不得不花时间详细说明。感谢您的努力。
猜你喜欢
  • 1970-01-01
  • 2016-01-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-03-25
  • 1970-01-01
相关资源
最近更新 更多