【问题标题】:Messages lost when consuming from BlockingCollection in batches从 BlockingCollection 批量消费时消息丢失
【发布时间】:2018-08-30 02:01:08
【问题描述】:

我试图发明一种方法来消耗来自BlockingCollection 的批次,但遇到了麻烦。

这是一个最小的复制:

internal class Program
{
    private static readonly BlockingCollection<string> _bc = new BlockingCollection<string>(1000);
    private static int _consumed;

    static void Main()
    {
        Task.Run(() => Producer());
        Task.Run(() => Consumer());
        Console.WriteLine("press [ENTER] to check");
        while (true)
        {
            Console.ReadLine();
            Console.WriteLine("consumed: " + _consumed);
        }
    }

    private static void Producer()
    {
        for (var i = 0; i < 5000; i++)
            _bc.Add("msg");
    }

    private static void Consumer()
    {
        foreach (var s in _bc.GetConsumingEnumerable())
        {
            var batchSize = _bc.Count + 1;
            var batch = new List<string>(batchSize) { s };
            while (_bc.TryTake(out var additionalResult) && batch.Count < batchSize)
                batch.Add(additionalResult);
            _consumed = _consumed + batch.Count;
        }
    }
}

很少有消息丢失(但并不总是相同的数量)。如果无法重现,请尝试增加生成的消息数量。

我想要实现的是在消费者中使用GetConsumingEnumerable 方法(一段时间后,我将调用CompleteAdding)并能够收集一批一定大小的消息,如果它们是已经存在了。

消息丢失的原因是什么,如何正确使用?

【问题讨论】:

  • 如果我删除了while 循环的&amp;&amp; batch.Count &lt; batchSize 部分,它可以正常工作。也就是说,由于它应该重新进入GetConsumingEnumerable 循环,我不确定它为什么会错过掉队者。
  • 是的。那是因为while 消耗了所有东西,foreach 循环在第一个循环后退出。此外,我发现foreach 循环的数量和丢失的消息之间存在明显的相关性。看起来第一个循环之后的每个循环都以某种方式丢失了一条消息

标签: c# multithreading concurrency task-parallel-library blockingcollection


【解决方案1】:

哇。这是一个错误。这一行

while (_bc.TryTake(out var additionalResult) && batch.Count < batchSize)

应该是

while (batch.Count < batchSize && _bc.TryTake(out var additionalResult))

因为第一个条件具有从集合中删除项目的副作用。

【讨论】:

  • 现在看起来多么简单,过去 2 小时我是多么盲目
【解决方案2】:
 [__DynamicallyInvokable]
    public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken)
    {
      ...
        while (!this.IsCompleted)
        {
          T obj;
          if (this.TryTakeWithNoTimeValidation(out obj, -1, cancellationToken, linkedTokenSource))
            yield return obj;
        }
      ...
    }

public bool TryTake(out T item)
{
  ...
  return this.TryTakeWithNoTimeValidation(out item, (int) timeout.TotalMilliseconds, CancellationToken.None, (CancellationTokenSource) null);
}

TryTake 和 GetConsumingEnumerable 都使用方法 TryTakeWithNoTimeValidation 。我假设 GetConsumingEnumerable 从集合中删除了缺失的元素。考虑以下示例:

private static void Producer()
{
    Console.WriteLine($"begin produce isCompleted:{_bc.IsCompleted}");
    for (var i = 0; i < 5000; i++)
        _bc.Add($"msg:{i}");
    _bc.CompleteAdding();
    Console.WriteLine($"end produce isCompleted:{_bc.IsCompleted}");
}
var batch = new List<string>();
foreach (var s in _bc.GetConsumingEnumerable())
{
    batch.Add(s);
    if (_bc.IsCompleted && _bc.Count == 0)
    {
       break;
    }
}
Console.WriteLine($"first:{batch.First()}, last:{batch.Last()}");
Console.WriteLine($"consumed:{batch.Count}");

_bc 为空。 有几种方法可以实现您的算法,其中一种我建议在生产者之前使用 Take 并调用消费者(这会阻止调用线程)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-12-15
    • 2021-04-24
    • 2016-06-14
    • 1970-01-01
    • 2021-01-26
    • 1970-01-01
    • 2019-07-12
    • 2021-06-23
    相关资源
    最近更新 更多