【问题标题】:BufferBlock and ActionBlock with BoundedCapacity does not use max DOP具有 BoundedCapacity 的 BufferBlock 和 ActionBlock 不使用最大 DOP
【发布时间】:2023-03-19 12:53:02
【问题描述】:

我有这个代码:

var data = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

var action = new ActionBlock<int>(async id =>
{
    Console.WriteLine("[{0:T}] #{1}: Start", DateTime.Now, id);

    await Task.Delay(1000);

    Console.WriteLine("[{0:T}] #{1}: End", DateTime.Now, id);
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 1,
    MaxDegreeOfParallelism = -1
});

data.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });

for (var id = 1; id <= 3; id++)
{
    Console.WriteLine("[{0:T}] Sending {1}", DateTime.Now, id);
    data.SendAsync(id).Wait();
    Console.WriteLine("[{0:T}] Sending {1} complete", DateTime.Now, id);
}

data.Complete();

Task.WhenAll(data.Completion, action.Completion).Wait();

这段代码让我得到这个输出:

[22:31:22] Sending 1
[22:31:22] Sending 1 complete
[22:31:22] Sending 2
[22:31:22] #1: Start
[22:31:22] Sending 2 complete
[22:31:22] Sending 3
[22:31:23] #1: End
[22:31:23] #2: Start
[22:31:23] Sending 3 complete
[22:31:24] #2: End
[22:31:24] #3: Start
[22:31:25] #3: End

为什么ActionBlock 没有并行工作,即使它具有无限的 DOP?

【问题讨论】:

    标签: c# .net task-parallel-library async-await tpl-dataflow


    【解决方案1】:

    您的ActionBlock 似乎具有有限的并行度的原因是它的BoundedCapacity 为1。BoundedCapacity(与InputCount 不同)包括目前正在处理的项目。这很容易证明:

    var block = new ActionBlock<int>(_ => Task.Delay(-1), new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 1,
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
    
    await block.SendAsync(4); // Adds a new item
    await block.SendAsync(4); // Blocks forever
    

    这意味着当您设置 MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 时,该块同时不能接受多个项目,因此实际上限制了您的并行度。

    你可以通过设置一个更大的BoundedCapacity来解决这个问题:

    var action = new ActionBlock<int>(async id =>
    {
        Console.WriteLine("[{0:T}] #{1}: Start", DateTime.Now, id);
        await Task.Delay(1000);
        Console.WriteLine("[{0:T}] #{1}: End", DateTime.Now, id);
    }, new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10,
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
    

    【讨论】:

    • 好的,但我也将 DOP 设置为 -1。而且我认为Dataflow应该根据需要创建尽可能多的ActionBlock副本,这意味着即使一个ActionBlock实例得到一个项目并且有另一个项目进入 - 它可以创建另一个ActionBlock实例来处理新项目。它是这样的。没有?
    • @MichaelLogutov 没有块复制自己。除非您创建更多,否则只有一个实例。块使用任务进行并行化,但如果你限制了它的容量,它就无法容纳足够的项目来并行化它们。
    • 谢谢。不知道那个。而文档确实缺乏这一点。
    猜你喜欢
    • 2021-09-20
    • 2016-05-25
    • 1970-01-01
    • 2021-06-22
    • 1970-01-01
    • 2016-04-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多