【发布时间】:2015-02-12 18:12:07
【问题描述】:
我有以下场景。
我将数据库中的 50 个作业放入阻塞集合中。
每个作业都是长期运行的。 (可能是)。所以我想在一个单独的线程中运行它们。 (我知道 - 最好将它们作为 Task.WhenAll 运行并让 TPL 解决 - 但我想控制同时运行的数量)
假设我想同时运行 5 个(可配置)
我创建了 5 个任务 (TPL),每个任务一个并并行运行它们。
我想要做的是在第 4 步中的一个作业完成后立即在阻塞集合中选择下一个作业,并继续进行,直到完成所有 50 个作业。
我正在考虑创建一个静态blockingCollection 和一个TaskCompletionSource,它们将在作业完成时被调用,然后它可以再次调用消费者从队列中一次选择一个作业。我还想在每个作业上调用 async/await - 但除此之外 - 不确定这是否会对方法产生影响。
这是完成我想做的事情的正确方法吗?
类似于this 链接,但要注意的是,我想在前 N 项中的一项完成后立即处理下一个作业。不是在所有 N 都完成后。
更新:
好的,我有这个代码 sn-p 做我想要的,如果有人想稍后使用它。正如您在下面看到的,创建了 5 个线程,每个线程在完成当前作业后开始下一个作业。在任何给定时间只有 5 个线程处于活动状态。我知道这可能无法始终像这样 100% 工作,并且如果与一个 cpu/核心一起使用,则会出现上下文切换的性能问题。
var block = new ActionBlock<Job>(
job => Handler.HandleJob(job),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
foreach (Job j in GetJobs())
block.SendAsync(j);
作业 2 开始于线程 :13。等待时间:3600000ms。时间:8/29/2014 下午 3 点 14 分 43 秒
作业 4 开始于线程 :14。等待时间:15000ms。时间:8/29/2014 下午 3 点 14 分 43 秒
作业 0 开始于线程 :7。等待时间:600000 毫秒。时间:8/29/2014 下午 3 点 14 分 43 秒
作业 1 开始于线程 :12。等待时间:900000 毫秒。时间:8/29/2014 下午 3 点 14 分 43 秒
作业 3 开始于线程 :11。等待时间:120000ms。时间:8/29/2014 下午 3 点 14 分 43 秒
作业 4 在线程 :14 上完成。 2014 年 8 月 29 日下午 3:14:58
作业 5 开始于线程 :14。等待时间:1800000ms。时间:8/29/2014 下午 3 点 14 分 58 秒
作业 3 在线程 :11 上完成。 2014 年 8 月 29 日下午 3:16:43
作业 6 开始于线程 :11。等待时间:1200000ms。时间:8/29/2014 下午 3:16:43
作业 0 在线程 :7 上完成。 2014 年 8 月 29 日下午 3:24:43
作业 7 开始于线程 :7。等待时间:30000ms。时间:8/29/2014 3:24:43 下午
作业 7 在线程 7 上完成。 2014 年 8 月 29 日下午 3:25:13
作业 8 开始于线程 :7。等待时间:100000 毫秒。时间:8/29/2014 下午 3:25:13
作业 8 在线程 7 上完成。 2014 年 8 月 29 日下午 3:26:53
作业 9 开始于线程 :7。等待时间:900000 毫秒。时间:8/29/2014 下午 3 点 26 分 53 秒
作业 1 在线程 :12 上完成。 2014 年 8 月 29 日下午 3:29:43
作业 10 开始于线程 :12。等待时间:300000 毫秒。时间:8/29/2014 下午 3 点 29 分 43 秒
作业 10 在线程 :12 上完成。 2014 年 8 月 29 日下午 3:34:43
作业 11 开始于线程 :12。等待时间:600000 毫秒。时间:8/29/2014 下午 3:34:43
作业 6 在线程 :11 上完成。 2014 年 8 月 29 日下午 3:36:43
作业 12 开始于线程 :11。等待时间:300000 毫秒。时间:8/29/2014 下午 3 点 36 分 43 秒
作业 12 在线程 :11 上完成。 2014 年 8 月 29 日下午 3:41:43
作业 13 开始于线程 :11。等待时间:100000 毫秒。时间:8/29/2014 下午 3 点 41 分 43 秒
作业 9 在线程 7 上完成。 2014 年 8 月 29 日下午 3:41:53
作业 14 开始于线程 :7。等待时间:300000 毫秒。时间:8/29/2014 下午 3 点 41 分 53 秒
作业 13 在线程 :11 上完成。 2014 年 8 月 29 日下午 3:43:23
作业 11 在线程 :12 上完成。 2014 年 8 月 29 日下午 3:44:43
作业 5 在线程 :14 上完成。 2014 年 8 月 29 日下午 3:44:58
作业 14 在线程 :7 上完成。 2014 年 8 月 29 日下午 3:46:53
作业 2 在线程 :13 上完成。 2014 年 8 月 29 日下午 4:14:43
【问题讨论】:
-
关于您的更新:我的建议在单核机器上应该没有问题,因为 TPL 可以优化并选择比最大 (5) 更低的并行度来减少上下文切换。
-
另一个注意事项:我使用
block.Post(item)是有原因的。当您没有在ActionBlock上设置 BoundedCapcity 时,使用await block.SendAsync(item)是多余的,并且它(非常轻微地)会损害性能。 -
是的,但是如果您在我的代码示例中注意到,我不再使用 async job => await job.ProcessAsync() 并且我认为使用 block.SendAsync 可能会有所帮助?
-
它没有。消费者和生产者在异步方面是不相关的。您可以拥有异步/同步生产者/消费者的所有 4 个选项。你刚才说你想使用 async-await,所以我在示例中使用。
标签: c# .net task-parallel-library async-await blockingcollection