【发布时间】:2017-03-13 10:10:05
【问题描述】:
在将 TPL 数据流移植到我的生产代码之前,我正在试验它。 生产代码是经典的生产者/消费者系统 - 生产者生产消息(与金融领域相关),消费者处理这些消息。
我感兴趣的是,如果在某些时候生产者的生产速度远远超过消费者的处理速度(系统会崩溃,或者会发生什么),更重要的是该怎么做,那么环境将如何保持稳定在那些情况下。
所以为了尝试类似的简单应用程序,我想出了以下内容。
var bufferBlock = new BufferBlock<Item>();
var executiondataflowBlockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
,
BoundedCapacity = 100000
};
var dataFlowLinkOptions = new DataflowLinkOptions
{
PropagateCompletion = true
};
var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t),
executiondataflowBlockOptions);
bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions);
for (int i = 0; i < int.MaxValue; i++)
{
bufferBlock.SendAsync(GenerateItem());
}
bufferBlock.Complete();
Console.ReadLine();
Item 是一个非常简单的类
internal class Item
{
public Item(string itemId)
{
ItemId = itemId;
}
public string ItemId { get; }
}
GenerateItem 只是新闻Item
static Item GenerateItem()
{
return new Item(Guid.NewGuid().ToString());
}
现在,为了模仿没那么快消费者 - 我让ProcessItem 等待100ms。
static async Task ProcessItem(Item item)
{
await Task.Delay(TimeSpan.FromMilliseconds(100));
Console.WriteLine($"Processing #{item.ItemId} item.");
}
执行此操作会在 20 秒左右导致 OOM 异常。
然后我继续添加更多消费者(更多 ActionBlocks 最多 10 个),这赢得了更多时间,但最终导致相同的 OOM 异常。
我还注意到 GC 承受着巨大的压力(VS 2015 诊断工具显示 GC 几乎一直在运行),所以我为Item 引入了对象池(非常简单,本质上是ConcurrentBag 存储项目) ,但我仍然碰壁(抛出 OOM 异常)。
详细说明内存中的内容,以及内存不足的原因。
- 最大尺寸有
SingleProducerSingleConsumerQueue+Segment<TplDataFlow.Item>和ConcurrentQueue+Segment<TplDataFlow.Item>类型的对象 - 我看到
BufferBlock的 InputBuffer 中充满了Items (Count=14,562,296) - 由于我为
ActionBlock(s) 设置了BoundedCapacity,它们的输入缓冲区也接近配置的数字 (InputCount=99,996)
为了确保较慢的生产者可以让消费者跟上,我让生产者在迭代之间休眠:
for (int i = 0; i < int.MaxValue; i++)
{
Thread.Sleep(TimeSpan.FromMilliseconds(50));
bufferBlock.SendAsync(GenerateItem());
}
它工作正常 - 没有抛出异常,内存使用率一直很低,我看不到任何 GC 压力。
所以我有几个问题
- 在尝试使用 TPL Dataflow 构建块重现非常快速的生产者/慢速消费者场景时,我做错了什么吗
- 有什么方法可以使这项工作不会因 OOM 异常而失败。
- 有关如何在 TPL 数据流上下文中处理此类场景(非常快的生产者/慢速消费者)的最佳实践的任何 cmets/链接。
- 我对这个问题的理解是 - 由于消费者跟不上,
BufferBlock的内部缓冲区很快就会被消息填满,并且会一直等待消息,直到一些消费者回来询问下一条消息结果应用程序内存不足(由于BufferBlock的内部缓冲区已满) - 你同意吗?
我正在使用Microsoft.Tpl.Dataflow 包-版本 4.5.24。
.NET 4.5 (C# 6)。进程是 32 位的。
【问题讨论】:
标签: c# .net producer-consumer tpl-dataflow