【问题标题】:Take all items from ConcurrentBag using a swap使用交换从 ConcurrentBag 中取出所有项目
【发布时间】:2015-10-15 15:29:47
【问题描述】:

我正试图一口气从ConcurrentBag 获取所有项目。由于集合中没有像 TryEmpty 这样的东西,因此我采用了与此处所述相同的方式使用 Interlocked.ExchangeHow to remove all Items from ConcurrentBag?

我的代码如下所示:

private ConcurrentBag<Foo> _allFoos; //Initialized in constructor.

public bool LotsOfThreadsAccessingThisMethod(Foo toInsert)
{
    this._allFoos.Add(toInsert);
    return true;
}

public void SingleThreadProcessingLoopAsALongRunningTask(object state)
{
    var token = (CancellationToken) state;
    var workingSet = new List<Foo>();

    while (!token.IsCancellationRequested)
    {
        if (!workingSet.Any())
        {
            workingSet = Interlocked.Exchange(ref this._allFoos, new ConcurrentBag<Foo>).ToList();
        }

        var processingCount = (int)Math.Min(workingSet.Count, TRANSACTION_LIMIT);

        if (processingCount > 0)
        {
            using (var ctx = new MyEntityFrameworkContext())
            {
                ctx.BulkInsert(workingSet.Take(processingCount));
            }
            workingSet.RemoveRange(0, processingCount);
        }
    }
}

问题是这有时会遗漏添加到列表中的项目。我编写了一个测试应用程序,它将数据提供给我的ConcurrentBag.Add 方法并验证它正在发送所有数据。当我在Add 调用上设置断点并检查ConcurrentBag 之后的计数时,它为零。该项目只是没有被添加。

我相当肯定这是因为 Interlocked.Exchange 调用没有使用 ConcurrentBag 的内部锁定机制,所以它在交换的某个地方丢失了数据,但我不知道实际发生了什么。

如何在不借助自己的锁定机制的情况下一次从ConcurrentBag 中取出所有项目?还有为什么Add 会忽略这个项目?

【问题讨论】:

  • 看起来您正在实现生产者-消费者模式。您是否考虑过使用 TPL 数据流
  • 不,实际上我以前从未见过。现在阅读它。
  • _"无需求助于我自己的锁定机制" -- 既然ConcurrentBag 一开始就没有提供这样的机制,为什么你认为甚至可以在不实现自己的情况下这样做?用lock 包围代码并完成它有什么问题?请注意,InterlockedExchange 仅在所有其他访问使用相同时才保护变量;否则,您仍然有可能以不同步的方式更改变量(即,即使您尝试从集合中提取项目,其他一些代码也会获取旧值并使用它)。
  • 如果您只是尝试实现生产者-消费者,请注意BlockingCollection&lt;T&gt;(您可以使用ConcurrentBag&lt;T&gt; 对其进行初始化)提供易于使用、直接的生产者-消费者语义。如果你真正想要的只是生产者-消费者,你可以使用它。
  • 那么如果在访问元素时没有内部锁定机制,ConcurrentBag 相对于列表有什么好处?

标签: c# multithreading concurrency interlocked


【解决方案1】:

我认为不需要从ConcurentBag 获取所有项目。您可以通过如下更改处理逻辑(无需自己的同步或互锁交换)来实现您尝试实现的完全相同的行为:

public void SingleThreadProcessingLoopAsALongRunningTask(object state)
{
    var token = (CancellationToken)state;
    var buffer = new List<Foo>(TRANSACTION_LIMIT);
    while (!token.IsCancellationRequested)
    {
        Foo item;
        if (!this._allFoos.TryTake(out item))
        {
            if (buffer.Count == 0) continue;
        }
        else
        {
            buffer.Add(item);
            if (buffer.Count < TRANSACTION_LIMIT) continue;
        }
        using (var ctx = new MyEntityFrameworkContext())
        {
            ctx.BulkInsert(buffer);
        }
        buffer.Clear();
    }
}

【讨论】:

  • 我喜欢这个。但是,对于由于某种原因数据停止的情况,我必须进行一些修改。这将无限期地等待,直到达到事务限制...我会看看我是否可以使用它。谢谢
  • @Brandon 它不会。请参阅if 的正文。如果包里什么都没有,而缓冲区里有东西,它就会提交。
  • 是的……是的,它会的……那是我的错。度过了漫长的一天。
  • 我为我的 3 个 ConcurrentBag 实例对此做了一些调整,就像一个魅力。现在,这解决了我的问题,但没有回答问题。本着 SO 的精神,我还要将此标记为答案吗??
  • @Brandon 恐怕我无法回答这个问题,这似乎更适合meta。但是,如果我们认为您的原始问题是 XY problem,我想您可以:-)
猜你喜欢
  • 2011-07-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-11-07
  • 2012-04-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多