【发布时间】:2016-05-24 03:40:56
【问题描述】:
我正在尝试使用TPL Dataflow 实现数据处理管道。但是,我对数据流比较陌生,不完全确定如何正确使用它来解决我要解决的问题。
问题:
我正在尝试遍历文件列表并处理每个文件以读取一些数据,然后进一步处理该数据。每个文件的大小大致为700MB 到1GB。每个文件都包含JSON 数据。为了并行处理这些文件而不是耗尽内存,我尝试将IEnumerable<> 与yield return 一起使用,然后进一步处理数据。
获得文件列表后,我想一次最多并行处理 4-5 个文件。我的困惑来自:
- 如何将
IEnumerable<>和yeild return与async/await和数据流一起使用。通过svick 遇到this answer,但仍然不确定如何将IEnumerable<>转换为ISourceBlock,然后将所有块链接在一起并跟踪完成情况。 - 就我而言,
producer会非常快(遍历文件列表),但consumer会非常慢(处理每个文件 - 读取数据,反序列化JSON)。在这种情况下,如何跟踪完成情况。 - 我应该使用数据块的
LinkTo功能来连接各个块吗?或使用OutputAvailableAsync()和ReceiveAsync()等方法将数据从一个块传播到另一个块。
代码:
private const int ProcessingSize= 4;
private BufferBlock<string> _fileBufferBlock;
private ActionBlock<string> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;
public Task ProduceAsync()
{
PrepareDataflow(token);
var bufferTask = ListFilesAsync(_fileBufferBlock, token);
var tasks = new List<Task> { bufferTask, _processingBlock.Completion };
return Task.WhenAll(tasks);
}
private async Task ListFilesAsync(ITargetBlock<string> targetBlock, CancellationToken token)
{
...
// Get list of file Uris
...
foreach(var fileNameUri in fileNameUris)
await targetBlock.SendAsync(fileNameUri, token);
targetBlock.Complete();
}
private async Task ProcessFileAsync(string fileNameUri, CancellationToken token)
{
var httpClient = new HttpClient();
try
{
using (var stream = await httpClient.GetStreamAsync(fileNameUri))
using (var sr = new StreamReader(stream))
using (var jsonTextReader = new JsonTextReader(sr))
{
while (jsonTextReader.Read())
{
if (jsonTextReader.TokenType == JsonToken.StartObject)
{
try
{
var data = _jsonSerializer.Deserialize<DataType>(jsonTextReader)
await _messageBufferBlock.SendAsync(data, token);
}
catch (Exception ex)
{
_logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
}
}
}
}
}
catch(Exception ex)
{
// Should throw?
// Or if converted to block then report using Fault() method?
}
finally
{
httpClient.Dispose();
buffer.Complete();
}
}
private void PrepareDataflow(CancellationToken token)
{
_fileBufferBlock = new BufferBlock<string>(new DataflowBlockOptions
{
CancellationToken = token
});
var actionExecuteOptions = new ExecutionDataflowBlockOptions
{
CancellationToken = token,
BoundedCapacity = ProcessingSize,
MaxMessagesPerTask = 1,
MaxDegreeOfParallelism = ProcessingSize
};
_processingBlock = new ActionBlock<string>(async fileName =>
{
try
{
await ProcessFileAsync(fileName, token);
}
catch (Exception ex)
{
_logger.Fatal(ex, $"Failed to process fiel: {fileName}, Error: {ex.Message}");
// Should fault the block?
}
}, actionExecuteOptions);
_fileBufferBlock.LinkTo(_processingBlock, new DataflowLinkOptions { PropagateCompletion = true });
_messageBufferBlock = new BufferBlock<DataType>(new ExecutionDataflowBlockOptions
{
CancellationToken = token,
BoundedCapacity = 50000
});
_messageBufferBlock.LinkTo(DataflowBlock.NullTarget<DataType>());
}
在上面的代码中,我没有使用IEnumerable<DataType> 和yield return,因为我不能将它与async/await 一起使用。所以我将输入缓冲区链接到ActionBlock<DataType>,后者又发布到另一个队列。但是,通过使用ActionBlock<>,我无法将其链接到下一个块进行处理,并且必须手动将Post/SendAsync 从ActionBlock<> 到BufferBlock<>。另外,在这种情况下,不确定如何跟踪完成情况。
此代码有效,但是,我确信可能有比这更好的解决方案,我可以链接所有块(而不是 ActionBlock<DataType>,然后将消息从它发送到 BufferBlock<DataType>)
另一种选择是使用Rx 将IEnumerable<> 转换为IObservable<>,但我对Rx 不太熟悉,也不知道如何混合使用TPL Dataflow 和Rx
【问题讨论】:
-
您的处理受 CPU 限制。因此,异步 IO 毫无意义。它不会为您节省一毫秒的处理时间。异步删除所有内容,问题就变得简单了。
-
@usr:我没有仔细研究过这个具体场景;这个问题的表述过于笼统,并没有提供一个很好的minimal reproducible example,可以让人们真正完全理解上下文。很可能这里的异步操作没有用。但是,恕我直言,认为仅仅因为处理受 CPU 限制,异步 I/O 就“毫无意义”是一种谬论。异步操作提供了独立于可能的性能优势的架构优势,缺少后者并不排除前者的可能性。
-
@PeterDuniho 有哪些架构优势?您始终可以使用线程模拟任何形式的并发性或并行性。异步 IO 的唯一要点是无线程(并且在异步 IO 和等待的情况下,GUI 场景非常棒)。然而,代码质量的损失是显着的。
-
要避免关闭它。不知何故,我认为这个问题有一个很好的核心。由于它是新颖的材料,而不是每天 100 个死记硬背的异步问题(“哦,我的应用程序被锁住了,因为我调用了 Result 或 Wait!”),我会给这个怀疑的好处。 @PeterDuniho
-
“异步 IO 的唯一要点是无线程”——我想我们必须同意不同意。首先,异步 I/O 甚至不是“无线程的”;它只是碰巧使用了 IOCP 线程池,而不是需要额外的显式创建的线程。其次,C# 中的
async习惯用法提供了一种非常好的、简洁的方式来以几乎非异步的形式实现异步代码,无论性能优势如何,这都是有用的。 YMMV。
标签: c# async-await ienumerable tpl-dataflow yield-return