【问题标题】:Understanding TPL Dataflow Degree of Parallelism ordering理解 TPL 数据流并行度排序
【发布时间】:2023-03-21 00:48:01
【问题描述】:

我正在阅读Dataflow (Task Parallel Library),其中有一段说:

当您指定大于 1 的最大并行度时,会同时处理多条消息,因此,消息可能不会按照接收顺序进行处理。但是,从块中输出消息的顺序将是正确的。

什么意思?

例如,我将我的动作块设置为并行度 = 5:

testActionBlock = new ActionBlock<int>(i => Consumer(i),
            new ExecutionDataflowBlockOptions()
            {
                MaxDegreeOfParallelism = 5
            });

await Producer();
testActionBlock.Completion.Wait();

我的 Producer() 基本上将数字排入块中:

private async Task Producer()
{
    for (int i=0; i<= 1000; i++)
    {
        await testActionBlock.SendAsync(i);
    }
    testActionBlock.Complete();
}

而我的 Consumer(i) 只需写出以下几行:

private async Task Consumer(int i)
{
    if (i == 1)
    {
        await Task.Delay(5000);
    }
    Console.WriteLine(i);
}

这是否意味着 Consumer(2) 将被阻塞,直到 Consumer(1) 完成处理(因为有 5 秒的延迟)?我测试了代码,但似乎并非如此。即使我消除了 5 秒的延迟,我也没有看到输出是有序的。

[更新]

bBlock = new BufferBlock<int>(option);

testActionBlock = new ActionBlock<int>(i => Consumer(i),
    new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 5
    });

bBlock.LinkTo(testActionBlock);

await Producer();
testActionBlock.Completion.Wait();

我的 Producer() 现在将添加到 bBlock:

private async Task Producer()
{
    for (int i=0; i<= 1000; i++)
    {
        await bBlock.SendAsync(i);
    }
    bBlock.Complete();
}

那么,在这种情况下,Consumer(1) 将等待 5 秒,然后 Consumer(2) 才能继续?

【问题讨论】:

    标签: c# task-parallel-library tpl-dataflow


    【解决方案1】:

    没有。您可以将 DoP 视为线程(不完全是简单的方式)

    所以在 5 时,它会尝试一次处理 5 个。由于#1 需要 5 秒,因此#2 肯定会先完成。 #3、#4 和 #5 可能也是如此。甚至可能是 #6(因为 #2 已经完成,DoP 将允许它从 #6 开始)

    即使没有延迟,也无法保证处理的顺序。所以永远不要依赖他们执行的命令。话虽如此,当您使用消息输出(NOT 打印,因为这是它们执行的顺序)时,它们将按照它们进入的顺序重新排序,即使它们以任意方式执行顺序。

    【讨论】:

    • 感谢您的澄清。但是我仍然不清楚何时说“消息输出......将被重新排序”。我已经更新了我原来的问题,看看我的理解是否正确?
    • TPL 数据流提供了一个完整的消息传递机制,旨在将数据传入和传出块。看看这个来自 MS 的很棒的指南,里面有确切的例子。 msdn.microsoft.com/en-us/library/hh228597(v=vs.110).aspx
    【解决方案2】:

    DataflowBlockOptions 类包含一个可配置的属性EnsureOrdered

    获取或设置一个值,该值指示是否应对块的消息处理强制执行有序处理。

    这个属性决定了块是否会按照它收到消息的顺序输出处理过的消息,默认是true。因此,像TransformBlockTransformManyBlock 这样产生输出的块(实现ISourceBlock&lt;TOutput&gt; 接口)在将接收到的消息传播到它们的目标块(ITargetBlock&lt;TInput&gt; 块它们是@987654326 @to)。

    EnsureOrdered 选项与处理消息的顺序无关。例如,将属性MaxDegreeOfParallelism 设置为值DataflowBlockOptions.Unbounded 意味着所有接收到的消息将在到达后立即安排执行,并且——假设ThreadPool 有足够的可用线程——所有消息的执行将立即开始.将EnsureOrdered 设置为false 的效果是,一旦消息的执行完成,它将有资格向下游传播,即使之前收到的消息的执行尚未完成。

    EnsureOrdered 选项对ActionBlocks 无效,因为这些块不会产生输出。它对BufferBlocks 也没有影响,因为尽管这些块产生输出,但它们不进行任何处理,因此不会发生任何可能扭曲它们接收到的消息的原始顺序的事情。简而言之,此属性仅对产生输出和执行处理的块有效。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2014-02-26
      • 1970-01-01
      • 2015-11-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多