【问题标题】:Implement Batch Process Using Producer Consumer使用生产者消费者实现批处理
【发布时间】:2012-10-17 02:09:19
【问题描述】:

批次

  1. 从文件或 SQL 中读取文本
  2. 将文本解析成单词
  3. 将单词加载到 SQL 中

今天
.NET 4.0
第 1 步非常快。 对于相同大小的文件,第 2 步和第 3 步的长度大致相同(平均为 0.1 秒)。
在第 3 步中,使用 BackGroundWorker 插入并等待最后一个完成。
其他一切都在主线程上。

在大负载上会执行数百万次。

需要步骤 3 是连续的,并且与 1 的顺序相同。
这是为了防止 SQL 表 PK 索引破裂。
并行尝试了第 3 步并破坏索引将其杀死。
此数据按 PK 排序提供。
其他索引在加载开始时被删除,然后在加载结束时重建。

当文本大小发生变化时,此过程无效。
而且文件之间的文本大小确实发生了巨大变化。
我想要的是让 1 和 2 排队,以便 3 尽可能保持忙碌。

需要第 3 步以使文件出列,以便它们在 1 中排队(即使它等待)。

需要内存管理的最大队列大小(例如 4-10)。

希望第 2 步并行,最多 4 个并发。

迁移到 .NET 4.5。

寻求有关如何实施此功能的一般指导?

我知道这是一种生产者消费者模式。
如果这不是生产者消费者模式,请告诉我,以便我更改标题。

【问题讨论】:

  • 你试过简单地使用files.AsParallel().AsOrdered()吗?
  • @DaxFohl Step 1. 已经非常快了。文本可以来自 SQL 或文件。

标签: c# .net task-parallel-library producer-consumer


【解决方案1】:

我认为 TPL 数据流是实现此目的的好方法:

对于第 2 步,您将使用 TransformBlock 并将 MaxDegreeOfParallelism 设置为 4 并且 BoundedCapacity 也设置为 4,以便其队列在工作时为空。它会按照它们进来的顺序生产物品,你不必为此做任何特别的事情。对于第 3 步,使用ActionBlock,并将BoundedCapacity 设置为您的限制。然后将两者链接在一起并开始向TransformBlock 发送项目,最好使用await stepTwoBlock.SendAsync(…) 之类的东西,如果队列已满,则异步等待。

在代码中,它看起来像:

async Task ProcessData()
{
    var stepTwoBlock = new TransformBlock<OriginalText, ParsedText>(
        text => Parse(text),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            BoundedCapacity = 4
        });
    var stepThreeBlock = new ActionBlock<ParsedText>(
        text => LoadIntoDatabase(text),
        new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });
    stepTwoBlock.LinkTo(
        stepThreeBlock, new DataflowLinkOptions { PropagateCompletion = true });

    // this is step one:
    foreach (var id in IdsToProcess)
    {
        OriginalText text = ReadText(id);
        await stepTwoBlock.SendAsync(text);
    }

    stepTwoBlock.Complete();
    await stepThreeBlock.Completion;
}

【讨论】:

  • 谢谢,我需要一段时间来测试一下。 stepTwoBlock 一次接收 4 个,然后当所有四个都完成后,它会抓取下一个 4?那么它会花费4个中最长的那个吗?如果这能让他们保持秩序,那么它是值得的。
  • 它不完全那样工作。如果当前在步骤 2 中的第一个项目(具有最低 id)首先完成,那么新的项目将立即开始处理(假设在步骤 3 块中有可用空间)。但是,如果您不走运,您所描述的情况可能会发生,因为该块将始终最多保留 4 个项目,即使它现在无法对其中的一些做任何事情。如果这是一个问题,您可以尝试增加第 2 步块的容量(并可能减少第 3 步块的容量,以保持内存使用量不变)。
  • 谢谢。抱歉,在我测试之前不能给你检查,这是我想学习的东西,所以我会花点时间。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-08-26
  • 1970-01-01
  • 1970-01-01
  • 2019-12-31
  • 1970-01-01
  • 2018-11-17
相关资源
最近更新 更多