【问题标题】:TPL Dataflow Blocks using LinkTo Predicate使用 LinkTo 谓词的 TPL 数据流块
【发布时间】:2014-09-07 03:03:35
【问题描述】:

我有一些块最终从一个 TransformBlock 到基于 LinkTo 谓词的其他三个转换块之一。我正在使用 DataflowLinkOptions 来传播完成。问题是,当满足谓词并且启动该块时,我的管道的其余部分将继续。看起来管道应该先等待这个块完成。

这个代码是这样的:

var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
mainBlock.LinkTo(block1, linkOptions, x => x.Status = Status.Complete);
mainBlock.LinkTo(block2, linkOptions, x => x.Status = Status.Cancelled);
mainBlock.LinkTo(block3, linkOptions, x => x.Status = Status.Delayed);
mainBlock.LinkTo(DataflowBlock.NullTarget<Thing>(), linkOptions);

现在,正如我所说的那样,这并不像我期望的那样工作,所以我发现获得我想要的行为的唯一方法是取出 linkOptions 并将以下内容添加到 mainBlock 的 lambda 中。

mainBlock = new TransformBlock<Thing,Thing>(input =>
{
    DoMyStuff(input);

    if (input.Status = Status.Complete)
    {
        mainBlock.Completion.ContinueWith(t => block1.Complete());
    }
    if (input.Status = Status.Cancelled)
    {
        mainBlock.Completion.ContinueWith(t => block2.Complete());
    }
    if (input.Status = Status.Delayed)
    {
        mainBlock.Completion.ContinueWith(t => block3.Complete());
    }

    return input;
});

所以问题是,这是让它工作的唯一方法吗?

顺便说一句,这已在我的单元测试中运行,其中运行了一个数据项以尝试调试管道行为。每个块都已通过多个单元测试单独测试。所以在我的流水线单元测试中发生的是断言在块完成执行之前被命中,因此失败。

如果我删除 block2 和 block3 链接并使用 linkOptions 调试测试,它可以正常工作。

【问题讨论】:

  • 你能发布重现问题的真实代码吗?你的“这样的东西”甚至不会编译。
  • TPL 数据流,如果用于基于角色的编程。看来您正在尝试使用它,而不是仅使用 if 语句编写异步方法。
  • 嗯,这只是给我带来麻烦的管道的一部分。但是,在尝试提出一个我可以发布的完整测试示例时,我发现我遇到的问题与在我的块中使用异步委托更相关。我会在进行更多调查后发布更多信息。

标签: c# unit-testing task-parallel-library


【解决方案1】:

您的问题不在于您问题中的代码,它可以正常工作:当主块完成时,所有三个后续块也都标记为完成。

问题出在结束块上:您在那里也使用了PropagateCompletion,这意味着当前三个块中的任何块完成时,结束块被标记为完成。您想要的是在所有三个块都完成时将其标记为完成,并且您的答案中的 Task.WhenAll().ContinueWith() 组合可以做到这一点(尽管该 sn-p 的第一部分是不必要的,但它的作用与 PropagateCompletion 完全相同)。

事实证明,链接选项传播(至少这是我的猜测)将为不满足 linkTo 中的谓词的块传播完成。

是的,它总是传播完成。完成没有任何关联的项目,因此将谓词应用于它没有任何意义。也许您总是只有一件物品(这并不常见)这一事实让您更加困惑?

如果我的猜测是正确的,我觉得这是链接选项完成传播中的错误或设计错误。如果一个块从未使用过,为什么它应该是完整的?

为什么不应该呢?对我来说,这很有意义:即使这次没有带有Status.Delayed 的项目,您仍然希望完成处理这些项目的块,以便任何后续代码都可以知道所有延迟的项目都已处理.没有任何东西的事实并不重要。


无论如何,如果您经常遇到这种情况,您可能希望创建一个辅助方法,将多个源块同时链接到单个目标块并正确传播完成:

public static void LinkTo<T>(
    this IReadOnlyCollection<ISourceBlock<T>> sources, ITargetBlock<T> target,
    bool propagateCompletion)
{
    foreach (var source in sources)
    {
        source.LinkTo(target);
    }

    if (propagateCompletion)
        Task.WhenAll(sources.Select(source => source.Completion))
            .ContinueWith(_ => target.Complete());
}

用法:

new[] { block1, block2, block3 }.LinkTo(endBlock, propagateCompletion: true);

【讨论】:

  • 感谢您解决这个问题。我想你解释下游块完成的方式是有道理的,即使它们从未有任何数据流过它们。我想我是数据流块的新手,我描绘了一个工作方式不同的“管道”,其中只有数据项采用的路径是“完整的”。
【解决方案2】:

好的。所以我必须首先感谢科里。当我第一次阅读他的评论时,我有点生气,因为我觉得我的代码很好地说明了这个概念,并且可以很容易地变成一个工作版本。但无论如何,我觉得有必要做一个完整的可测试版本,因为他的评论我可以发布。

在我的测试中,令人惊讶的部分是即使它模仿了我的真实代码,我认为会失败的路径通过了,而我认为会通过的路径失败了。这让我有点头晕。所以我开始对原始代码做更多的排列。基本上,我创建了同步的块和异步的块,并制作了两种管道。总共四个,2 个同步和 2 个异步,每个使用链接选项传播,另一个使用 MainBlock 中的完成任务,如图所示。

在为异步任务添加一些任务延迟后,我发现同步版本通过了测试,而异步版本失败了。

因此,问题的最终解决方案并非上述任何一种。事实证明,链接选项传播(至少这是我的猜测)将为不满足 linkTo 中的谓词的块传播完成。因此,当状态为 Complete 的事物下降时,它会转到 Block1。

哦,我应该在完整的测试代码中指出,我让所有块 1,2 和 3 都连接到同一个 EndBlock,这在原始代码中没有显示。

无论如何,在满足谓词并且事物进入 Block1 之后,我相信第 2 和第 3 块将完成。这会导致我们在单元测试中等待的 EndBlock 完成,并且 Assert 失败,因为 Block1 还没有完成它的工作。

如果我的猜测是正确的,我觉得这是链接选项完成传播中的错误或设计错误。如果一个块从未使用过,为什么它应该是完整的?

所以,这就是我为解决问题所做的。我取出链接选项并手动连接完成事件。像这样:

MainBlock.Completion.ContinueWith(t =>
{
Block1.Complete();
Block2.Complete();
Block3.Complete();
});

Task.WhenAll(Block1.Completion, Block2.Completion, Block3.Completion)
.ContinueWith(t =>
{
    EndBlock.Complete();
});

这很好用,当移动到我的真实代码时也能正常工作。 Task.WhenAll 让我相信未使用的块已设置为完成,以及为什么自动传播是问题所在。

我希望这对某人有所帮助。当我发布所有测试代码时,我会回来添加一个链接。

编辑: 这是测试代码要点的链接https://gist.github.com/jmichas/bfab9cec84f0d1e40e12

【讨论】:

  • 我将其标记为答案,因为它是答案,但 svicks 的解释清除了我的一些问题,并通过将传播完成和所有内容时的任务结合起来提供了一种更简洁的方法。跨度>
猜你喜欢
  • 1970-01-01
  • 2012-08-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-08-29
相关资源
最近更新 更多