【问题标题】:TPL Dataflow, can I query whether a data block is marked complete but has not yet completed?TPL Dataflow,我可以查询数据块是否标记为完成但尚未完成?
【发布时间】:2012-11-28 13:50:08
【问题描述】:

鉴于以下情况:

BufferBlock<int> sourceBlock = new BufferBlock<int>();
TransformBlock<int, int> targetBlock = new TransformBlock<int, int>(element =>
{
    return element * 2;
});

sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });

//feed some elements into the buffer block
for(int i = 1; i <= 1000000; i++)
{
    sourceBlock.SendAsync(i);
}

sourceBlock.Complete();

targetBlock.Completion.ContinueWith(_ =>
{
    //notify completion of the target block
});

targetBlock 似乎永远不会完成,我认为原因是TransformBlock targetBlock 中的所有项目都在输出队列中等待,因为我没有将targetBlock 链接到任何其他数据流块。但是,我真正想要实现的是当 (A) targetBlock 被通知完成并且 (B) 输入队列为空时的通知。我不想关心项目是否仍然位于TransformBlock 的输出队列中。我该怎么办?获得我想要查询sourceBlock 的完成状态并确保targetBlockInputCount 为零的唯一方法是什么?我不确定这是否非常稳定(sourceBlock 是否真的只有在sourceBlock 中的最后一项已传递给targetBlock 时才标记为已完成?)。有没有更优雅、更有效的方法来实现相同的目标?

编辑:我刚刚注意到,即使是“肮脏”的方式来检查 sourceBlockInputCount 的完成情况,targetBlock 为零也并非易事。那个街区会坐在哪里?它不能在targetBlock 内,因为一旦满足以上两个条件,显然targetBlock 内将不再处理任何消息。还要检查sourceBlock 的完成状态会带来很多低效率。

【问题讨论】:

  • 你为什么需要知道这个?
  • 因为一旦 targetBlock 中的所有项目完成处理,即使项目保留在 outQueue 中,我也会完成一个进程。一个原因是该过程的完成,另一个原因是延迟、吞吐量性能的测量。链接的下一个数据块可能需要更长的时间来处理 outQueue 中的所有剩余项目
  • 为什么不等待源块的任务呢?您不需要附加一个块,只需在源块的Completion 属性上继续。

标签: c# concurrency task-parallel-library message-passing tpl-dataflow


【解决方案1】:

我相信你不能直接这样做。您可以使用反射从某些 private 字段中获取此信息,但我不建议这样做。

但是您可以通过创建自定义块来做到这一点。在Complete() 的情况下,它很简单:只需创建一个将每个方法转发到原始块的块。除了Complete(),它还会记录它。

在确定所有项目的处理何时完成的情况下,您可以将您的块链接到中间BufferBlock。这样,输出队列将被快速清空,因此检查内部块的Completed 将为您提供相当准确的处理何时完成的测量。这会影响您的测量结果,但希望不会显着。

另一种选择是在块的委托末尾添加一些日志记录。这样,您可以看到最后一个项目的处理完成时间。

【讨论】:

    【解决方案2】:

    如果TransformBlock 有一个ProcessingCompleted 事件会在块完成处理其队列中的所有消息时触发,但没有这样的事件,那就太好了。以下是纠正这一遗漏的尝试。 CreateTransformBlockEx 方法接受一个 Action&lt;Exception&gt; 处理程序,该处理程序在此“事件”发生时被调用。

    目的是始终在块最终完成之前调用处理程序。不幸的是,在提供的CancellationToken 被取消的情况下,完成(取消)首先发生,并且处理程序在几毫秒后被调用。要解决这种不一致,需要一些棘手的解决方法,并且可能会产生其他不需要的副作用,所以我将保持原样。

    public static IPropagatorBlock<TInput, TOutput>
        CreateTransformBlockEx<TInput, TOutput>(Func<TInput, Task<TOutput>> transform,
        Action<Exception> onProcessingCompleted,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
    {
        if (onProcessingCompleted == null)
            throw new ArgumentNullException(nameof(onProcessingCompleted));
        dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();
    
        var transformBlock = new TransformBlock<TInput, TOutput>(transform,
            dataflowBlockOptions);
        var bufferBlock = new BufferBlock<TOutput>(dataflowBlockOptions);
    
        transformBlock.LinkTo(bufferBlock);
        PropagateCompletion(transformBlock, bufferBlock, onProcessingCompleted);
        return DataflowBlock.Encapsulate(transformBlock, bufferBlock);
    
        async void PropagateCompletion(IDataflowBlock block1, IDataflowBlock block2,
            Action<Exception> completionHandler)
        {
            try
            {
                await block1.Completion.ConfigureAwait(false);
            }
            catch { }
            var exception = 
                block1.Completion.IsFaulted ? block1.Completion.Exception : null;
            try
            {
                // Invoke the handler before completing the second block
                completionHandler(exception);
            }
            finally
            {
                if (exception != null) block2.Fault(exception); else block2.Complete();
            }
        }
    }
    
    // Overload with synchronous lambda
    public static IPropagatorBlock<TInput, TOutput>
        CreateTransformBlockEx<TInput, TOutput>(Func<TInput, TOutput> transform,
        Action<Exception> onProcessingCompleted,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
    {
        return CreateTransformBlockEx<TInput, TOutput>(
            x => Task.FromResult(transform(x)), onProcessingCompleted,
            dataflowBlockOptions);
    }
    

    当使用PropagateCompletion = true 选项调用时,本地函数PropagateCompletion 的代码模仿了LinkTo 内置方法的source code

    使用示例:

    var httpClient = new HttpClient();
    var downloader = CreateTransformBlockEx<string, string>(async url =>
    {
        return await httpClient.GetStringAsync(url);
    }, onProcessingCompleted: ex =>
    {
        Console.WriteLine($"Download completed {(ex == null ? "OK" : "Error")}");
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 10
    });
    

    【讨论】:

      【解决方案3】:

      首先,将 IPropagator 块用作叶终端是不正确的。但是您的要求仍然可以通过异步检查 TargetBlock 的输出缓冲区中的输出消息然后消费然后清空缓冲区来满足。

          `  BufferBlock<int> sourceBlock = new BufferBlock<int>();
             TransformBlock<int, int> targetBlock = new TransformBlock<int, int> 
             (element =>
             {
      
              return element * 2;
              });
              sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { 
              PropagateCompletion = true });
      
              //feed some elements into the buffer block
              for (int i = 1; i <= 100; i++)
              {
                   sourceBlock.SendAsync(i);
              }
      
              sourceBlock.Complete();
      
              bool isOutputAvailable = await targetBlock.OutputAvailableAsync();
              while(isOutputAvailable)
              {
                  int value = await targetBlock.ReceiveAsync();
      
                  isOutputAvailable = await targetBlock.OutputAvailableAsync();
              }
      
      
              await targetBlock.Completion.ContinueWith(_ =>
              {
                  Console.WriteLine("Target Block Completed");//notify completion of the target block
              });
      

      `

      【讨论】:

      • 如果你想清空TransformBlock的缓冲区,你可以像这样链接到NullTargettargetBlock.LinkTo(DataflowBlock.NullTarget&lt;int&gt;());
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多