【问题标题】:How do I signal completion of my dataflow?如何发出数据流完成的信号?
【发布时间】:2015-11-04 09:27:01
【问题描述】:

我有一个使用 TPL 数据流实现由 3 个步骤组成的数据流的类。

在构造函数中,我将步骤创建为 TransformBlocks,并使用 LinkTo 将它们链接起来,并将 DataflowLinkOptions.PropagateCompletion 设置为 true。该类公开了一个方法,该方法通过在第一步调用 SendAsync 来启动工作流。该方法返回工作流最后一步的“完成”属性。

目前,工作流中的步骤似乎按预期执行,但最后一步永远不会完成,除非我明确调用 Complete。但是这样做会缩短工作流程并且没有执行任何步骤?我做错了什么?

public class MessagePipeline {
   private TransformBlock<object, object> step1;
   private TransformBlock<object, object> step2;
   private TransformBlock<object, object> step3;

   public MessagePipeline() {
      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
      step1 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step1...");
        return x;
      });
      step2 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step2...");
        return x;
      });
      step3 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step3...");
        return x;
      });

      step1.LinkTo(step2, linkOptions);
      step2.LinkTo(step3, linkOptions);
   }

   public Task Push(object message) {
      step1.SendAsync(message);
      step1.Complete();
      return step3.Completion;
   }
}
...
public class Program {
  public static void Main(string[] args) {
    var pipeline = new MessagePipeline();
    var result = pipeline.Push("Hello, world!");
    result.ContinueWith(_ => Console.WriteLine("Completed"));
    Console.ReadLine();
  }
}

【问题讨论】:

  • 这是数据流,不是工作流
  • @PanagiotisKanavos:感谢您的更正;-)
  • 代码似乎与问题无关。调用Complete() 的代码在哪里?您在哪里/如何等待完成?步骤是如何关联的?
  • @PanagiotisKanavos:好的。我现在把代码放好了。
  • 问题是 step3 永远不会完成 - 它是一个 TransformBlock 并且没有人检索到它的输出消息。因此,该块永远无法完成。应该是ActionBlock

标签: c# task-parallel-library tpl-dataflow


【解决方案1】:

当您链接这些步骤时,您需要传递一个将PropagateCompletion 属性设置为 true 的 DataflowLinkOptions 以传播完成和错误。一旦你这样做了,在第一个块上调用 Complete() 将促进下游块的完成。

一旦块接收到完成事件,它就会完成处理,然后通知其链接的下游目标。

这样您就可以将所有数据发布到第一步并致电Complete()。最终区块只有在所有上游区块都完成后才会完成。

例如,

var linkOptions=new DataflowLinkOptions { PropagateCompletion = true};
myFirstBlock.LinkTo(mySecondBlock,linkOptions);
mySecondBlock.LinkTo(myFinalBlock,linkOptions);

foreach(var message in messages)
{
    myFirstBlock.Post(message);
}
myFirstBlock.Complete();
......
await myFinalBlock.Completion;

PropagateCompletion 默认情况下不正确,因为在更复杂的场景(例如非线性流或动态变化的流)中,您不希望完成和错误自动传播。如果您想在不终止整个流程的情况下处理错误,您可能还希望避免自动完成。

当 TPL 数据流处于测试阶段时,默认 true 但在 RTM 上已更改

更新

代码永远不会完成,因为最后一步是TransformBlock,没有链接目标来接收其输出。这意味着即使块收到了完成信号,它还没有完成所有的工作,不能改变自己的完成状态。

将其更改为 ActionBlock&lt;object&gt; 即可解决此问题。

【讨论】:

  • 谢谢。可悲的是,这并没有像我预期的那样工作。我尝试过使用 Post 而不是我最初使用的 SendAsync(我现在添加了一些代码),但这并没有改变任何东西。所有步骤都已执行,但最后一步完成时的状态显示为 WaitingForActivation。如果在它上面调用 Wait() 我会被无限期地阻止......
  • 您还没有发布相关代码,因此无法提供帮助。这与 completion 无关,但是,如果您在第一步调用 Complete 并启用传播,则完成 传播到最后一步。使用 Post 还是 SendAsync 也无关紧要。另一方面,您不能指望在发布到第一步后立即完成最后一步,它可能还没有收到任何数据
  • 或者,我可以将其保留为转换块,而不是返回 Completion,而是返回 ReceiveAsync。
  • 如果我可以超级支持这篇文章我会的,我对我看到的行为感到非常困惑,直到我读到你的更新调用 ActionBlock 应该用作终结者而不是 Transform 因为它想要传播.说的很有道理
【解决方案2】:

您需要显式调用 Complete。

【讨论】:

  • 我是否按顺序在 3 个步骤上调用 Complete?只是第一个还是最后一个?
  • 调用Complete 是不够的,必须在链接选项中指定完成传播。完成后,在第一个块上调用 Complete 将传播到所有下游块
猜你喜欢
  • 1970-01-01
  • 2015-11-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-07-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多