【问题标题】:blocking collection process n items at a time - continuing as soon as 1 is done一次阻止收集过程 n 个项目 - 一旦完成 1 就继续
【发布时间】:2015-02-12 18:12:07
【问题描述】:

我有以下场景。

  1. 我将数据库中的 50 个作业放入阻塞集合中。

  2. 每个作业都是长期运行的。 (可能是)。所以我想在一个单独的线程中运行它们。 (我知道 - 最好将它们作为 Task.WhenAll 运行并让 TPL 解决 - 但我想控制同时运行的数量)

  3. 假设我想同时运行 5 个(可配置)

  4. 我创建了 5 个任务 (TPL),每个任务一个并并行运行它们。

我想要做的是在第 4 步中的一个作业完成后立即在阻塞集合中选择下一个作业,并继续进行,直到完成所有 50 个作业。

我正在考虑创建一个静态blockingCollection 和一个TaskCompletionSource,它们将在作业完成时被调用,然后它可以再次调用消费者从队列中一次选择一个作业。我还想在每个作业上调用 async/await - 但除此之外 - 不确定这是否会对方法产生影响。

这是完成我想做的事情的正确方法吗?

类似于this 链接,但要注意的是,我想在前 N 项中的一项完成后立即处理下一个作业。不是在所有 N 都完成后。

更新:

好的,我有这个代码 sn-p 做我想要的,如果有人想稍后使用它。正如您在下面看到的,创建了 5 个线程,每个线程在完成当前作业后开始下一个作业。在任何给定时间只有 5 个线程处于活动状态。我知道这可能无法始终像这样 100% 工作,并且如果与一个 cpu/核心一起使用,则会出现上下文切换的性能问题。

var block = new ActionBlock<Job>(
                job => Handler.HandleJob(job), 
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

              foreach (Job j in GetJobs())
                  block.SendAsync(j);

作业 2 开始于线程 :13。等待时间:3600000ms。时间:8/29/2014 下午 3 点 14 分 43 秒

作业 4 开始于线程 :14。等待时间:15000ms。时间:8/29/2014 下午 3 点 14 分 43 秒

作业 0 开始于线程 :7。等待时间:600000 毫秒。时间:8/29/2014 下午 3 点 14 分 43 秒

作业 1 开始于线程 :12。等待时间:900000 毫秒。时间:8/29/2014 下午 3 点 14 分 43 秒

作业 3 开始于线程 :11。等待时间:120000ms。时间:8/29/2014 下午 3 点 14 分 43 秒

作业 4 在线程 :14 上完成。 2014 年 8 月 29 日下午 3:14:58

作业 5 开始于线程 :14。等待时间:1800000ms。时间:8/29/2014 下午 3 点 14 分 58 秒

作业 3 在线程 :11 上完成。 2014 年 8 月 29 日下午 3:16:43

作业 6 开始于线程 :11。等待时间:1200000ms。时间:8/29/2014 下午 3:16:43

作业 0 在线程 :7 上完成。 2014 年 8 月 29 日下午 3:24:43

作业 7 开始于线程 :7。等待时间:30000ms。时间:8/29/2014 3:24:43 下午

作业 7 在线程 7 上完成。 2014 年 8 月 29 日下午 3:25:13

作业 8 开始于线程 :7。等待时间:100000 毫秒。时间:8/29/2014 下午 3:25:13

作业 8 在线程 7 上完成。 2014 年 8 月 29 日下午 3:26:53

作业 9 开始于线程 :7。等待时间:900000 毫秒。时间:8/29/2014 下午 3 点 26 分 53 秒

作业 1 在线程 :12 上完成。 2014 年 8 月 29 日下午 3:29:43

作业 10 开始于线程 :12。等待时间:300000 毫秒。时间:8/29/2014 下午 3 点 29 分 43 秒

作业 10 在线程 :12 上完成。 2014 年 8 月 29 日下午 3:34:43

作业 11 开始于线程 :12。等待时间:600000 毫秒。时间:8/29/2014 下午 3:34:43

作业 6 在线程 :11 上完成。 2014 年 8 月 29 日下午 3:36:43

作业 12 开始于线程 :11。等待时间:300000 毫秒。时间:8/29/2014 下午 3 点 36 分 43 秒

作业 12 在线程 :11 上完成。 2014 年 8 月 29 日下午 3:41:43

作业 13 开始于线程 :11。等待时间:100000 毫秒。时间:8/29/2014 下午 3 点 41 分 43 秒

作业 9 在线程 7 上完成。 2014 年 8 月 29 日下午 3:41:53

作业 14 开始于线程 :7。等待时间:300000 毫秒。时间:8/29/2014 下午 3 点 41 分 53 秒

作业 13 在线程 :11 上完成。 2014 年 8 月 29 日下午 3:43:23

作业 11 在线程 :12 上完成。 2014 年 8 月 29 日下午 3:44:43

作业 5 在线程 :14 上完成。 2014 年 8 月 29 日下午 3:44:58

作业 14 在线程 :7 上完成。 2014 年 8 月 29 日下午 3:46:53

作业 2 在线程 :13 上完成。 2014 年 8 月 29 日下午 4:14:43

【问题讨论】:

  • 关于您的更新:我的建议在单核机器上应该没有问题,因为 TPL 可以优化并选择比最大 (5) 更低的并行度来减少上下文切换。
  • 另一个注意事项:我使用block.Post(item) 是有原因的。当您没有在 ActionBlock 上设置 BoundedCapcity 时,使用 await block.SendAsync(item) 是多余的,并且它(非常轻微地)会损害性能。
  • 是的,但是如果您在我的代码示例中注意到,我不再使用 async job => await job.ProcessAsync() 并且我认为使用 block.SendAsync 可能会有所帮助?
  • 它没有。消费者和生产者在异步方面是不相关的。您可以拥有异步/同步生产者/消费者的所有 4 个选项。你刚才说你想使用 async-await,所以我在示例中使用。

标签: c# .net task-parallel-library async-await blockingcollection


【解决方案1】:

您可以使用TPL Dataflow 轻松实现您的需求。

您可以做的是使用BufferBlock&lt;T&gt;,这是一个用于存储数据的缓冲区,并将其与ActionBlock&lt;T&gt; 链接在一起,当这些请求来自BufferBlock&lt;T&gt; 时,它将使用这些请求。

现在,这里的美妙之处在于您可以使用ExecutionDataflowBlockOptions 类指定您希望ActionBlock&lt;T&gt; 同时处理多少个请求。

这是一个简化的控制台版本,当他们进入时处理一堆数字,打印他们的名字和Thread.ManagedThreadID

private static void Main(string[] args)
{
    var bufferBlock = new BufferBlock<int>();

    var actionBlock =
        new ActionBlock<int>(i => Console.WriteLine("Reading number {0} in thread {1}",
                                  i, Thread.CurrentThread.ManagedThreadId),
                             new ExecutionDataflowBlockOptions 
                                 {MaxDegreeOfParallelism = 5});

    bufferBlock.LinkTo(actionBlock);
    Produce(bufferBlock);

    Console.ReadKey();
}

private static void Produce(BufferBlock<int> bufferBlock)
{
    foreach (var num in Enumerable.Range(0, 500))
    {
        bufferBlock.Post(num);
    }
}

如果需要,您还可以使用可等待的 BufferBlock.SendAsync 异步发布它们

这样,您可以让TPL 为您处理所有限制,而无需手动执行。

【讨论】:

  • BufferBlock 在这里是多余的。
  • @l3arnon 你会如何发布进来的数据?
  • @l3arnon - 所以你委托实际对象处理的工作?如果他决定流需要多个接收器会发生什么?
  • 没关系,只是为了举例。处理代码可以位于块中。如果接收者指的是在动作块之后,那么那里没有问题。如果您指的是不同的操作块,那么这是一个非常不同的问题。
  • 为什么需要确保每个线程都有自己的线程?在单核上运行它只会导致影响性能的上下文切换。我建议你让TPL 平衡线程使用。 MaxDegreeOfParallalism 确保它不会超过那些最大线程数,它不保证它将使用该数字来处理请求
【解决方案2】:

您可以使用BlockingCollection,它可以正常工作,但它是在async-await 之前构建的,因此它会同步阻塞,在大多数情况下可能不太可扩展。

正如 Yuval Itzchakov 建议的那样,你最好使用 async 准备好的 TPL Dataflow。您所需要的只是一个ActionBlock,它可以同时处理每个项目,MaxDegreeOfParallelism 为 5,然后您将工作同步 (block.Post(item)) 或异步 (await block.SendAsync(item)) 发布到它:

private static void Main()
{
    var block = new ActionBlock<Job>(
        async job => await job.ProcessAsync(),
        new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 5});

    for (var i = 0; i < 50; i++)
    {
        block.Post(new Job());
    }

    Console.ReadKey();
}

【讨论】:

  • 好的,这是一个简洁的示例,我认为这对我有用,因为我不需要多个接收器。问题。因此,如果每个作业都从数据库中获取一些数据,这可能需要 5 到 15 分钟,而我执行 job.ProcessAsync ,这是否意味着动作块将开始处理前 5 个,等待从数据库中获取所有数据他们,然后将接下来的 5 个排队,或者等到 5 个中的任何一个实际完成,然后开始下一个 1 并一次继续 1 个?。
  • 好的,谢谢。此外,如果我向 ExecuteDataFlowBlockOptions 引入 LongRunning 选项,除了在单独的线程上启动这些选项之外,它不会有任何区别,对吗?有些可能会运行很长时间(比如一个小时),我听说如果我不这样做,TPL 有时可能会等待其他工作。
  • @AlexJ LongRunning 是您自己运行任务时的一个选项。 TPL Dataflow 不需要,因为它自己处理调度。你只需给它一个可以达到的最大值(即MaxDegreeOfParallelism),但它也可以在适当的时候选择一个较低的度数。
【解决方案3】:

您可以使用SemaphoreSlim (如this answer)或使用ForEachAsync (如this answer)来完成此操作。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-12-28
    • 1970-01-01
    • 2021-05-17
    • 2020-11-12
    • 1970-01-01
    • 1970-01-01
    • 2021-02-22
    相关资源
    最近更新 更多