【发布时间】:2018-08-30 02:01:08
【问题描述】:
我试图发明一种方法来消耗来自BlockingCollection 的批次,但遇到了麻烦。
这是一个最小的复制:
internal class Program
{
private static readonly BlockingCollection<string> _bc = new BlockingCollection<string>(1000);
private static int _consumed;
static void Main()
{
Task.Run(() => Producer());
Task.Run(() => Consumer());
Console.WriteLine("press [ENTER] to check");
while (true)
{
Console.ReadLine();
Console.WriteLine("consumed: " + _consumed);
}
}
private static void Producer()
{
for (var i = 0; i < 5000; i++)
_bc.Add("msg");
}
private static void Consumer()
{
foreach (var s in _bc.GetConsumingEnumerable())
{
var batchSize = _bc.Count + 1;
var batch = new List<string>(batchSize) { s };
while (_bc.TryTake(out var additionalResult) && batch.Count < batchSize)
batch.Add(additionalResult);
_consumed = _consumed + batch.Count;
}
}
}
很少有消息丢失(但并不总是相同的数量)。如果无法重现,请尝试增加生成的消息数量。
我想要实现的是在消费者中使用GetConsumingEnumerable 方法(一段时间后,我将调用CompleteAdding)并能够收集一批一定大小的消息,如果它们是已经存在了。
消息丢失的原因是什么,如何正确使用?
【问题讨论】:
-
如果我删除了
while循环的&& batch.Count < batchSize部分,它可以正常工作。也就是说,由于它应该重新进入GetConsumingEnumerable循环,我不确定它为什么会错过掉队者。 -
是的。那是因为
while消耗了所有东西,foreach循环在第一个循环后退出。此外,我发现foreach循环的数量和丢失的消息之间存在明显的相关性。看起来第一个循环之后的每个循环都以某种方式丢失了一条消息
标签: c# multithreading concurrency task-parallel-library blockingcollection