【问题标题】:Using async/await and yield return with TPL Dataflow在 TPL 数据流中使用 async/await 和 yield return
【发布时间】:2016-05-24 03:40:56
【问题描述】:

我正在尝试使用TPL Dataflow 实现数据处理管道。但是,我对数据流比较陌生,不完全确定如何正确使用它来解决我要解决的问题。

问题

我正在尝试遍历文件列表并处理每个文件以读取一些数据,然后进一步处理该数据。每个文件的大小大致为700MB1GB。每个文件都包含JSON 数据。为了并行处理这些文件而不是耗尽内存,我尝试将IEnumerable<>yield return 一起使用,然后进一步处理数据。

获得文件列表后,我想一次最多并行处理 4-5 个文件。我的困惑来自:

  • 如何将IEnumerable<>yeild returnasync/await 和数据流一起使用。通过svick 遇到this answer,但仍然不确定如何将IEnumerable<> 转换为ISourceBlock,然后将所有块链接在一起并跟踪完成情况。
  • 就我而言,producer 会非常快(遍历文件列表),但consumer 会非常慢(处理每个文件 - 读取数据,反序列化 JSON)。在这种情况下,如何跟踪完成情况。
  • 我应该使用数据块的LinkTo 功能来连接各个块吗?或使用OutputAvailableAsync()ReceiveAsync()等方法将数据从一个块传播到另一个块。

代码

private const int ProcessingSize= 4;
private BufferBlock<string> _fileBufferBlock;
private ActionBlock<string> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;

public Task ProduceAsync()
{
    PrepareDataflow(token);
    var bufferTask = ListFilesAsync(_fileBufferBlock, token);

    var tasks = new List<Task> { bufferTask, _processingBlock.Completion };
    return Task.WhenAll(tasks);
}

private async Task ListFilesAsync(ITargetBlock<string> targetBlock, CancellationToken token)
{
    ...
    // Get list of file Uris
    ...
    foreach(var fileNameUri in fileNameUris)
        await targetBlock.SendAsync(fileNameUri, token);

    targetBlock.Complete();
}

private async Task ProcessFileAsync(string fileNameUri, CancellationToken token)
{
    var httpClient = new HttpClient();
    try
    {
        using (var stream = await httpClient.GetStreamAsync(fileNameUri))
        using (var sr = new StreamReader(stream))
        using (var jsonTextReader = new JsonTextReader(sr))
        {
            while (jsonTextReader.Read())
            {
                if (jsonTextReader.TokenType == JsonToken.StartObject)
                {
                    try
                    {
                        var data = _jsonSerializer.Deserialize<DataType>(jsonTextReader)
                        await _messageBufferBlock.SendAsync(data, token);
                    }
                    catch (Exception ex)
                    {
                        _logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
                    }
                }
            }
        }
    }
    catch(Exception ex)
    {
        // Should throw?
        // Or if converted to block then report using Fault() method?
    }
    finally
    {
        httpClient.Dispose();
        buffer.Complete();
    }
}

private void PrepareDataflow(CancellationToken token)
{
    _fileBufferBlock = new BufferBlock<string>(new DataflowBlockOptions
    {
        CancellationToken = token
    });

    var actionExecuteOptions = new ExecutionDataflowBlockOptions
    {
        CancellationToken = token,
        BoundedCapacity = ProcessingSize,
        MaxMessagesPerTask = 1,
        MaxDegreeOfParallelism = ProcessingSize
    };
    _processingBlock = new ActionBlock<string>(async fileName =>
    {
        try
        {
            await ProcessFileAsync(fileName, token);
        }
        catch (Exception ex)
        {
            _logger.Fatal(ex, $"Failed to process fiel: {fileName}, Error: {ex.Message}");
            // Should fault the block?
        }
    }, actionExecuteOptions);

    _fileBufferBlock.LinkTo(_processingBlock, new DataflowLinkOptions { PropagateCompletion = true });

    _messageBufferBlock = new BufferBlock<DataType>(new ExecutionDataflowBlockOptions
    {
        CancellationToken = token,
        BoundedCapacity = 50000
    });
    _messageBufferBlock.LinkTo(DataflowBlock.NullTarget<DataType>());
}

在上面的代码中,我没有使用IEnumerable&lt;DataType&gt;yield return,因为我不能将它与async/await 一起使用。所以我将输入缓冲区链接到ActionBlock&lt;DataType&gt;,后者又发布到另一个队列。但是,通过使用ActionBlock&lt;&gt;,我无法将其链接到下一个块进行处理,并且必须手动将Post/SendAsyncActionBlock&lt;&gt;BufferBlock&lt;&gt;。另外,在这种情况下,不确定如何跟踪完成情况。

此代码有效,但是,我确信可能有比这更好的解决方案,我可以链接所有块(而不是 ActionBlock&lt;DataType&gt;,然后将消息从它发送到 BufferBlock&lt;DataType&gt;

另一种选择是使用RxIEnumerable&lt;&gt; 转换为IObservable&lt;&gt;,但我对Rx 不太熟悉,也不知道如何混合使用TPL DataflowRx

【问题讨论】:

  • 您的处理受 CPU 限制。因此,异步 IO 毫无意义。它不会为您节省一毫秒的处理时间。异步删除所有内容,问题就变得简单了。
  • @usr:我没有仔细研究过这个具体场景;这个问题的表述过于笼统,并没有提供一个很好的minimal reproducible example,可以让人们真正完全理解上下文。很可能这里的异步操作没有用。但是,恕我直言,认为仅仅因为处理受 CPU 限制,异步 I/O 就“毫无意义”是一种谬论。异步操作提供了独立于可能的性能优势的架构优势,缺少后者并不排除前者的可能性。
  • @PeterDuniho 有哪些架构优势?您始终可以使用线程模拟任何形式的并发性或并行性。异步 IO 的唯一要点是无线程(并且在异步 IO 和等待的情况下,GUI 场景非常棒)。然而,代码质量的损失是显着的。
  • 要避免关闭它。不知何故,我认为这个问题有一个很好的核心。由于它是新颖的材料,而不是每天 100 个死记硬背的异步问题(“哦,我的应用程序被锁住了,因为我调用了 Result 或 Wait!”),我会给这个怀疑的好处。 @PeterDuniho
  • “异步 IO 的唯一要点是无线程”——我想我们必须同意不同意。首先,异步 I/O 甚至不是“无线程的”;它只是碰巧使用了 IOCP 线程池,而不是需要额外的显式创建的线程。其次,C# 中的async 习惯用法提供了一种非常好的、简洁的方式来以几乎非异步的形式实现异步代码,无论性能优势如何,这都是有用的。 YMMV。

标签: c# async-await ienumerable tpl-dataflow yield-return


【解决方案1】:

问题 1

您可以直接在消费者块上使用PostSendAsyncIEnumerable&lt;T&gt; 生产者插入到您的TPL 数据流链中,如下所示:

foreach (string fileNameUri in fileNameUris)
{
    await _processingBlock.SendAsync(fileNameUri).ConfigureAwait(false);
}

您也可以使用BufferBlock&lt;TInput&gt;,但在您的情况下,它实际上似乎相当不必要(甚至有害 - 请参阅下一部分)。

问题 2

您什么时候更喜欢SendAsync 而不是Post?如果您的生产者运行速度快于 URI 可以处理的速度(并且您已经指出是这种情况),并且您选择给您的 _processingBlock 一个 BoundedCapacity,那么当块的内部缓冲区达到指定容量时,您的 @ 987654330@ 将“挂起”,直到缓冲区插槽被释放,并且您的 foreach 循环将被限制。这种反馈机制会产生背压并确保您不会耗尽内存。

问题 3

您绝对应该使用LinkTo 方法在大多数 情况下链接您的块。不幸的是,由于IDisposable 和非常大(可能)的序列的相互作用,你的情况是一个极端情况。所以你的完成将在缓冲区和处理块之间自动流动(由于LinkTo),但在那之后 - 你需要手动传播它。这很棘手,但可行。

我将通过一个“Hello World”示例来说明这一点,其中生产者迭代每个字符,而消费者(这真的很慢)将每个字符输出到调试窗口。

注意:LinkTo 不存在。

// REALLY slow consumer.
var consumer = new ActionBlock<char>(async c =>
{
    await Task.Delay(100);

    Debug.Print(c.ToString());
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

var producer = new ActionBlock<string>(async s =>
{
    foreach (char c in s)
    {
        await consumer.SendAsync(c);

        Debug.Print($"Yielded {c}");
    }
});

try
{
    producer.Post("Hello world");
    producer.Complete();

    await producer.Completion;
}
finally
{
    consumer.Complete();
}

// Observe combined producer and consumer completion/exceptions/cancellation.
await Task.WhenAll(producer.Completion, consumer.Completion);

这个输出:

产生的 H H 产量 e 产量 l l 产量 l l 产量 ○ 产量 产量 w w 产量 ○ 产量 r 产量 l l 产量d d

从上面的输出可以看出,生产者受到限制,块之间的切换缓冲区永远不会变得太大。

编辑

您可能会发现通过传播完成更简洁

producer.Completion.ContinueWith(
    _ => consumer.Complete(), TaskContinuationOptions.ExecuteSynchronously
);

...在producer 定义之后。这可以让您稍微减少生产者/消费者的耦合 - 但最后你仍然必须记住观察Task.WhenAll(producer.Completion, consumer.Completion)

【讨论】:

    【解决方案2】:

    Rx 值得一看。除非我遗漏了您需要的整个代码(除了您现有的 ProcessFileAsync 方法),否则您的代码将如下所示:

    var query =
        fileNameUris
            .Select(fileNameUri =>
                Observable
                    .FromAsync(ct => ProcessFileAsync(fileNameUri, ct)))
            .Merge(maxConcurrent : 4);
    
    var subscription =
        query
            .Subscribe(
                u => { },
                () => { Console.WriteLine("Done."); });
    

    完成。它是异步运行的。可致电subscription.Dispose(); 取消。并且您可以指定最大并行度。

    【讨论】:

      【解决方案3】:

      为了并行处理这些文件而不是耗尽内存,我尝试将 IEnumerable 与 yield return 一起使用,然后进一步处理数据。

      我认为这一步没有必要。您实际上在这里避免的只是文件名列表。即使您有 数百万 个文件,文件名列表也不会占用大量内存。

      我将输入缓冲区链接到 ActionBlock,后者又发布到另一个队列。但是,通过使用 ActionBlock,我无法将其链接到下一个块进行处理,并且必须手动将 ActionBlock 的 Post/SendAsync 发送到 BufferBlock。另外,在这种情况下,不确定如何跟踪完成情况。

      ActionBlock&lt;TInput&gt; 是“行尾”块。它只接受输入,不产生任何输出。在你的情况下,你不想要ActionBlock&lt;TInput&gt;;你想要TransformManyBlock&lt;TInput, TOutput&gt;,它接受输入,在其上运行一个函数,并产生输出(每个输入项有任意数量的输出项)。

      要记住的另一点是 所有 缓冲区块都有一个输入缓冲区。所以多余的BufferBlock 是不必要的。

      最后,如果您已经处于“数据流领域”,通常最好以实际执行某些操作的数据流块结束(例如,ActionBlock 而不是 BufferBlock)。在这种情况下,您可以BufferBlock 用作有界生产者/消费者队列,其中一些其他代码正在使用结果。就个人而言,我认为将消费代码重写为ActionBlock 的操作可能会更简洁,但让消费者独立于数据流也可能更简洁。对于下面的代码,我保留了最终的有界BufferBlock,但如果您使用此解决方案,请考虑将最终块更改为有界ActionBlock

      private const int ProcessingSize= 4;
      private static readonly HttpClient HttpClient = new HttpClient();
      private TransformBlock<string, DataType> _processingBlock;
      private BufferBlock<DataType> _messageBufferBlock;
      
      public Task ProduceAsync()
      {
        PrepareDataflow(token);
        ListFiles(_fileBufferBlock, token);
        _processingBlock.Complete();
        return _processingBlock.Completion;
      }
      
      private void ListFiles(ITargetBlock<string> targetBlock, CancellationToken token)
      {
        ... // Get list of file Uris, occasionally calling token.ThrowIfCancellationRequested()
        foreach(var fileNameUri in fileNameUris)
          _processingBlock.Post(fileNameUri);
      }
      
      private async Task<IEnumerable<DataType>> ProcessFileAsync(string fileNameUri, CancellationToken token)
      {
        return Process(await HttpClient.GetStreamAsync(fileNameUri), token);
      }
      
      private IEnumerable<DataType> Process(Stream stream, CancellationToken token)
      {
        using (stream)
        using (var sr = new StreamReader(stream))
        using (var jsonTextReader = new JsonTextReader(sr))
        {
          while (jsonTextReader.Read())
          {
            token.ThrowIfCancellationRequested();
            if (jsonTextReader.TokenType == JsonToken.StartObject)
            {
              try
              {
                yield _jsonSerializer.Deserialize<DataType>(jsonTextReader);
              }
              catch (Exception ex)
              {
                _logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
              }
            }
          }
        }
      }
      
      private void PrepareDataflow(CancellationToken token)
      {
        var executeOptions = new ExecutionDataflowBlockOptions
        {
          CancellationToken = token,
          MaxDegreeOfParallelism = ProcessingSize
        };
        _processingBlock = new TransformManyBlock<string, DataType>(fileName =>
            ProcessFileAsync(fileName, token), executeOptions);
      
        _messageBufferBlock = new BufferBlock<DataType>(new DataflowBlockOptions
        {
          CancellationToken = token,
          BoundedCapacity = 50000
        });
      }
      

      或者,您可以使用 Rx。不过,学习 Rx 可能相当困难,尤其是对于混合的异步 并行数据流情况,您可以在这里找到。

      至于你的其他问题:

      如何使用 IEnumerable 并通过 async/await 和数据流返回 yield。

      asyncyield 根本不兼容。至少在今天的语言中。在您的情况下,JSON 读取器无论如何都必须从流中同步读取(它们不支持异步读取),因此实际的流处理是同步的,可以与yield 一起使用。执行初始来回获取流本身仍然可以是异步的,并且可以与async 一起使用。这是我们今天所能得到的一样好,直到 JSON 阅读器支持异步阅读并且语言支持async yield。 (Rx 今天可以做一个“异步输出”,但是 JSON 读取器仍然不支持异步读取,所以在这种特殊情况下它没有帮助)。

      在这种情况下,如何跟踪完成情况。

      如果 JSON 读取器确实支持异步读取,那么上面的解决方案将不是最好的解决方案。在这种情况下,您想要使用手动SendAsync 调用,并且只需要链接这些块的完成,可以这样做:

      _processingBlock.Completion.ContinueWith(
          task =>
          {
            if (task.IsFaulted)
              ((IDataflowBlock)_messageBufferBlock).Fault(task.Exception);
            else if (!task.IsCanceled)
              _messageBufferBlock.Complete();
          },
          CancellationToken.None,
          TaskContinuationOptions.DenyChildAttach | TaskContinuationOptions.ExecuteSynchronously,
          TaskScheduler.Default);
      

      我应该使用数据块的 LinkTo 功能来连接各个块吗?或使用 OutputAvailableAsync() 和 ReceiveAsync() 等方法将数据从一个块传播到另一个块。

      尽可能使用LinkTo。它为您处理所有极端情况。

      // 应该扔吗? // 应该让块出错吗?

      这完全取决于您。默认情况下,当任何项目的任何处理失败时,块都会出错,如果你正在传播完成,整个块链都会出错。

      故障块相当严重;他们丢弃任何正在进行的工作并拒绝继续处理。如果要重试,则必须构建新的数据流网格。

      如果您更喜欢“更温和”的错误策略,您可以catch 异常并执行类似记录它们的操作(您的代码当前会这样做),或者您可以更改数据流块的性质以传递异常作为数据项。

      【讨论】:

      • 问题:为什么我不能使用SendAsync而必须在这里使用Post
      • @TejasVora:让我扭转局面。在这种情况下,SendAsyncPost 有什么好处?
      • 没什么特别的。只是一个信息性问题。
      猜你喜欢
      • 2018-06-05
      • 2013-10-11
      • 2017-05-23
      • 2020-10-23
      • 2019-05-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多